I always thought that the Java Thread API is something… strange. If you work in a frontend application, things like running long-running tasks in the back without having the GUI ugly frozen and not responding should be somewhat simpler.
SwingWorker has been around for quite a time but made it just recently into the core API (Java6). Furthermore i don’t think that it’s the right thing for performing enduring tasks like checking for mail and pushing a result with a second thread into a database, for example.
Don’t get me wrong, i use SwingWorker quite often, but it didn’t fit my needs and furthermore, i wanted to learn more about Java Threads.
My goal / task was a little daemon that regularly checks an email account and a samba share for some files, load them into an Oracle Database and executes a longer running db procedure. The checking should be suspendable and stoppable independently, the configuration should be reloadable.
I throw some interfaces and abstract classes at the vm and boom, it was that simple 😉
Things i’ve learned:
- Always start the GUI in it’s own thread, never use the main thread. Sure, most programs will work fine, but it can get confusing. Use SwingUtilities to do so.
- A thread once terminated is not reusable. Never ever. So don’t interrupt them if you plan on resume them later.
- Know the primitives, i.e. build in locks (synchronized and wait())
- Know the task scheduling frameworks (Executors and ExecutorServices)
- Read the tutorials here, here and maybe here
The following demo can start 7 producers and one consumer, both are synchronized via a BlockingQueue (i actually used a SynchronousQueue at work, as the files must only be removed if they were taken by the db). Both the producers and consumers can be suspended, resumed and stopped. I never ever will start a thread by hand again if not necessary. The threads are managed by an ExecutorService.
For the tasked mentioned above this thing works fine. If anyone comes up with a better idea, let me know.
I had fun to write it, maybe you have fun to read. Be aware, the program is not a good example of organizing classes, i put everything in one file just for the sake of being a demo.
But apart from that, the demo could serve as an example of what came to Java with Java 5 and 6 as i use a lot of generics, enums and enhanced for loops, in case you haven’t seen this.
To compile and run the stuff you need at least a Java5 JDK (get it here, download this file JThreadDemo.zip, unzip it and type
javac snow/white/JThreadDemo.java java snow.white.JThreadDemo
If you read this, i’ll guess you’re familiar with the JDK, java packages and the whole crap.
Get the whole fun after the click:
package snow.white; /* * This little fairytale was written by * Michael Simons * http://michael-simons.eu * * If you like it, you can drop me line :) * * It's published under the Beerware License: * http://en.wikipedia.org/wiki/Beerware * * ---------------------------------------------------------------------------- * "THE BEER-WARE LICENSE": * <misi@planet-punk.de> wrote this file. As long as you retain this notice you * can do whatever you want with this stuff. If we meet some day, and you think * this stuff is worth it, you can buy me a beer in return * Michael Simons * ---------------------------------------------------------------------------- */ import java.awt.BorderLayout; import java.awt.Container; import java.awt.Cursor; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.JToolBar; import javax.swing.SwingUtilities; import javax.swing.event.DocumentEvent; import javax.swing.event.DocumentListener; import javax.swing.text.Document; /** * This abstract working thingy represents my special runnable * which is suspendable as well as resumable and stoppable. * @author michael */ abstract class AbstractWorkingThingy implements Runnable { /** A flag to indicate wether the task ended or not */ protected boolean ended = true; /** A flag to indicate wether the task should stop */ protected boolean stopped = true; /** A flag to indicate wether the task should be suspended */ protected boolean suspended = false; /** The interval in secs in which to perform the duty */ protected int intervall = 10; /** The executing thread, can be null */ protected Thread currentThread; /** * This methods called from anywhere (i.e. the event dispatching * thread, edt) suspends the little working thing. * * The run method must use the mutex/semaphore "this" after * isSuspend() is true as.. */ public void suspend() { this.suspended = true; } /** * ..resume is synchronized to the same mutex and * notifies all threads that are waiting for this */ public synchronized void resume() { this.suspended = false; this.notify(); } /** * stop sets isStopped() to true. If the thing is suspended, * it also wakes it as the run loop wouldn't stop otherwise * * If the executing thread was safed in run, the thread * gets also interrupted, so that any sleeping method * wakes up from their dreams. */ public void stop() { this.stopped = true; // Wake the sleeping for that they come to an end if(suspended) this.resume(); // Interrupt anything that is waiting if(currentThread != null) currentThread.interrupt(); } /** * This thing just indicates isStopped() is false... * Can be overriden to do some configuration work i.e. * bringing in some cookies and cream. */ public synchronized void start() { this.stopped = false; } public boolean isEnded() { return this.ended; } public boolean isSuspended() { return this.suspended && !this.ended; } public boolean isStopped() { return this.stopped; } public boolean isRunning() { return !(isStopped() || isSuspended()); } public int getIntervall() { return intervall; } public void setIntervall(int intervall) { this.intervall = intervall; } } /** * Just a little something to pass along, like chips with beer. * @author michael */ class Item { private String diggedby; private int val; public String getDiggedby() { return diggedby; } public Item setDiggedby(String diggedby) { this.diggedby = diggedby; return this; } public int getVal() { return val; } public Item setVal(int val) { this.val = val; return this; } } /** * The greedy one... it takes all the cookies from the dwarfs. * @author michael */ class Consumer extends AbstractWorkingThingy { /** The synchronizing queue is the source for the consumer */ private final BlockingQueue<Item> source; private final PrintStream log; public Consumer(final BlockingQueue<Item> source, final PrintStream log) { this.source = source; this.log = log; } public void run() { // I'm not dead yet... super.ended = false; // In case somebody wants to wake me super.currentThread = Thread.currentThread(); // For now and ever... and ever... while(!isStopped()) { try { // Check my source for new cookies for intervall secs. // The queue will then return a null value if nothings there. // The wait is as good as a sleep. final Item item = this.source.poll(intervall, TimeUnit.SECONDS); if(item != null) log.println(currentThread.getName() +": " + "Hey, i got " + item.getVal() + " from " + item.getDiggedby() ); // Ups, somebody wants me suspended... if (isSuspended()) { // ... so i wait til someone notifies me synchronized(this) { wait(); } } } catch (InterruptedException e) { // Ok, they don't need me... // i go (i was stopped and interrupted) log.println("I was interrupted, quitting..."); break; } } // Now i'm dead! super.ended = true; } } /** * My little working class heroe. * @author michael */ class Producer extends AbstractWorkingThingy { private final String name; /** The synchronizing queue is the target for the producer */ private final BlockingQueue<Item> target; private final PrintStream log; public Producer( final String name, final BlockingQueue<Item> target, final PrintStream log ) { this.name = name; this.target = target; this.log = log; } /** * @see java.lang.Runnable#run() */ public void run() { // Same blah blah as above... super.ended = false; super.currentThread = Thread.currentThread(); final Random r = new Random(System.currentTimeMillis()); while(!isStopped()) { try { // I do my duty and try to put something in the basket... // be aware: the queue can be synchron: one man in, one man out, // it can be full or maybe the world has ended on the other // side, either way, put can be block for a loooong time... // // An alternative is just offering something like so: // target.offer(o, timeout, unit) target.put(new Item().setDiggedby(name).setVal(r.nextInt())); log.println( currentThread.getName() + ": " + name + " digged something..." ); // Same blah blah as above... Thread.sleep(intervall * 1000); if (isSuspended()) { synchronized(this) { wait(); } } } catch (InterruptedException e) { log.println(name + " was interrupted, quitting..."); break; } catch (Exception e) { log.println(name + " got an error while digging: " + e.getMessage() ); break; } } // I guess, you know the game... super.ended = true; } public String getName() { return name; } } public class JThreadDemo extends JFrame implements ActionListener { public enum Command { START_PRODUCERS("Start or resume producers"), SUSPEND_PRODUCERS("Suspend producers"), STOP_PRODUCERS("Stop producers"), START_CONSUMER("Start consumer"), STOP_CONSUMER("Stop consumer"); private String name; Command(String name) { this.name = name; } public String getName() { return name; } } private static final long serialVersionUID = -307930103825328797L; /** My Actions */ private final Map<Command, JButton> actions; /** Something to log to */ private final JTextArea textArea; /** The FIFO buffer for producers and consumers */ private final BlockingQueue<Item> sync; /** The one and only consumer */ private final Consumer consumer; /** The dwarfs that dig the earth */ private final List<Producer> producers; /** One executor to rule them all */ private final ExecutorService executor; private final PrintStream log; public JThreadDemo() { super("Michaels funny little thread demo"); // I want the demo to exit without any additional exit listener super.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); // Initializing the toolbar this.actions = new HashMap<Command, JButton>(); final JToolBar tb = new JToolBar(); for(Command command : Command.values()) { final JButton b = new JButton(command.getName()); b.setActionCommand(command.toString()); b.addActionListener(this); tb.add(b); this.actions.put(command, b); } // Our log pane this.textArea = new JTextArea(); this.log = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { JThreadDemo.this.textArea.append( // Yeah, i know, i couldn't think of anything // less performant... new String(new byte[]{(byte)b}) ); } }); // Doing some layout crap final Container cp = super.getContentPane(); cp.setLayout(new BorderLayout()); cp.add(tb, BorderLayout.PAGE_START); cp.add(new JScrollPane(this.textArea), BorderLayout.CENTER); // I want to tail the messages this.textArea.getDocument().addDocumentListener(new DocumentListener() { private void tail(final Document document) { JThreadDemo.this.textArea.setCaretPosition( document.getLength() ); } public void changedUpdate(DocumentEvent e) { this.tail(e.getDocument()); } public void insertUpdate(DocumentEvent e) { this.tail(e.getDocument()); } public void removeUpdate(DocumentEvent e) { this.tail(e.getDocument()); } }); super.pack(); super.setSize(super.getSize().width, 480); // Building the fairytale // The means of communicate this.sync = new ArrayBlockingQueue<Item>(100); // The consumer... this.consumer = new Consumer(this.sync, this.log); this.consumer.setIntervall(5); // ...and somewhat more than a handfull of producers this.producers = new ArrayList<Producer>(); // Yeah, the 7 dwarfes get ready for work... for(String name : new String[] { "Bashful", "Doc", "Dopey", "Grumpy", "Happy", "Sleepy", "Sneezy" }) this.producers.add(new Producer(name, sync, this.log)); this.executor = Executors.newCachedThreadPool(); this.checkActions(); } public void actionPerformed(ActionEvent e) { Command cmd = null; try { cmd = Command.valueOf(e.getActionCommand()); } catch(IllegalArgumentException ex) { // We can safely ignore this... } // If you want to use a switch statement, do it like this: // Command.START_PRODUCERS can for whatever reason not // to be used in the case /* switch(cmd) { case START_PRODUCERS: this.startOrResumeAllProducers(); break; case SUSPEND_PRODUCERS: this.suspendAllProducers(); break; // etc.pp. } */ if(cmd == null) log.println("Invalid command..."); else if(cmd == Command.START_PRODUCERS) this.startOrResumeAllProducers(); else if(cmd == Command.SUSPEND_PRODUCERS) this.suspendAllProducers(); else if(cmd == Command.STOP_PRODUCERS) this.stopAllProducers(); else if(cmd == Command.START_CONSUMER) this.startConsumer(); else if(cmd == Command.STOP_CONSUMER) this.stopConsumer(); } /** * Checks every producer for its state and * brings it back to work or to a fresh start... */ private void startOrResumeAllProducers() { log.println(Thread.currentThread().getName() + ": Starting the producers..." ); final Random r = new Random(System.currentTimeMillis()); int running = 0; for(Producer producer : producers) { // Let's see... suspended? than it's allready be executed... if(producer.isSuspended()) producer.resume(); // Otherwise, start it and hand it over to the executor. else if(producer.isEnded()) { // I like surprises... so let them do the work in some // random intervalls producer.setIntervall(Math.max(1,r.nextInt(20))); producer.start(); log.println("Producer " + producer.getName() + " has an " + "interval of " + producer.getIntervall() + "sec" ); this.executor.execute(producer); } running += producer.isStopped() || producer.isSuspended() ? 0 : 1; } log.println(Thread.currentThread().getName() + ": Started " + running + " producers..." ); this.checkActions(); } /** * Suspends all producers. This method doesn't need to be synchronized * as no-one is activly waiting to be notified. */ private void suspendAllProducers() { for(Producer producer : producers) producer.suspend(); this.checkActions(); } /** * Stopping all producers. * This methods tries to stop all producers, giving them time to end * their jobs */ @SuppressWarnings("unchecked") private void stopAllProducers() { super.setEnabled(false); // Can anyone tell me why the heck this works ... randomly?? super.getGlassPane().setCursor( Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR) ); // So this can take a while, if a producer doesn't immediate respond to // stop or ignores the interrupt (maybe it needs to disconnect from a // database, his wife or just the fridge) // So here comes the future task, with the callable and a rudimentary // callback method: this.executor.execute( // The future tasks gets executed... new FutureTask ( // ...and delegates to a callable // This things are generics, so if they produce // something, type them and return the corresponding // result. new Callable() { public Object call() throws Exception { for(Producer producer : producers) { // Oh, no need to get involved. if(producer.isEnded()) continue; // Try to stop producer.stop(); // ... as long as you wait for me ... while(!producer.isEnded()) { // Give them a chance to free resources synchronized (this) { log.println( Thread.currentThread() .getName() + ": " + "Waiting for " + "producers to end..." ); try { // Some arbitrary wait time this.wait(2 * 1000); } catch (InterruptedException e) { } } } } return null; } }) { /** * This belongs to future tasks and acts like a callback * when the callable is done. * If the callable isn't void, one can get the result with * "get()" * @see java.util.concurrent.FutureTask#done() */ @Override protected void done() { super.done(); JThreadDemo.super.getGlassPane().setCursor( Cursor.getPredefinedCursor( Cursor.DEFAULT_CURSOR ) ); JThreadDemo.super.setEnabled(true); JThreadDemo.this.checkActions(); } } ); } private void startConsumer() { if(consumer.isSuspended()) consumer.resume(); else if(consumer.isEnded()) { consumer.start(); this.executor.execute(consumer); } this.checkActions(); } @SuppressWarnings("unchecked") private void stopConsumer() { super.setEnabled(false); super.getContentPane().setCursor( Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR)); this.executor.execute( new FutureTask(new Callable() { public Object call() throws Exception { if(!consumer.isEnded()) { consumer.stop(); while(!consumer.isEnded()) { synchronized (this) { log.println(Thread.currentThread().getName() + ": Waiting for consumer to end..." ); try { this.wait(1000); } catch (InterruptedException e) { } } } } return null; } }) { @Override protected void done() { super.done(); JThreadDemo.super.getGlassPane().setCursor( Cursor.getPredefinedCursor(Cursor.DEFAULT_CURSOR ) ); JThreadDemo.super.setEnabled(true); JThreadDemo.this.checkActions(); } } ); } private void checkActions() { int running = 0; for(Producer producer : producers) running += producer.isRunning() ? 1 : 0; this.actions.get(Command.START_PRODUCERS).setEnabled(running == 0); this.actions.get(Command.SUSPEND_PRODUCERS).setEnabled(running > 0); this.actions.get(Command.STOP_PRODUCERS).setEnabled(running > 0); this.actions.get(Command.START_CONSUMER).setEnabled( !consumer.isRunning() ); this.actions.get(Command.STOP_CONSUMER).setEnabled( consumer.isRunning() ); } public static void main(String[] args) { final JThreadDemo demo = new JThreadDemo(); // First of all: Start the application in the right thread. // Swing got its one event dispatching thread, leave the // system thread alone. SwingUtilities.invokeLater(new Runnable() { public void run() { demo.setVisible(true); } }); } } |
No comments yet
One Trackback/Pingback
[…] kann er sich mal angucken, mit was für Kram ich mir so meine Nachmittage um die Ohren schlage: Snow White and the 7 threads. Achtung, Java und so. Nicht direkt wieder mit geschweiften Klammern nach mir […]
Post a Comment