On Java Threads: A fairytale of a tutorial

July 12, 2007 by Michael

I always thought that the Java Thread API is somewhat… strange. If you work on a frontend application, things like running long-running tasks in the background without the GUI freezing and not responding should be somewhat simpler.

SwingWorker has been around for quite some time but only recently made it into the core API (Java 6). Furthermore, I don’t think that it’s the right thing for long-lived tasks like checking for mail and pushing a result into a database with a second thread, for example.

Don’t get me wrong, I use SwingWorker quite often, but it didn’t fit my needs and furthermore, I wanted to learn more about Java Threads.

My goal / task was a little daemon that regularly checks an email account and a samba share for some files, loads them into an Oracle Database and executes a longer running db procedure. The checking should be suspendable and stoppable independently, and the configuration should be reloadable.

I threw some interfaces and abstract classes at the VM and boom, it was that simple 😉

Things I’ve learned:

  • Always start the GUI on its own thread (the event dispatching thread), never on the main thread. Sure, most programs will work fine, but it can get confusing. Use SwingUtilities.invokeLater to do so.
  • A thread once terminated is not reusable. Never ever. So don’t end threads with an interrupt if you plan on resuming them later.
  • Know the primitives, i.e. the built-in locks (synchronized, wait() and notify())
  • Know the task scheduling frameworks (Executors and ExecutorServices)
  • Read the tutorials here, here and maybe here
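
The built-in locks from the list above are easy to get subtly wrong: a wait() belongs in a loop that rechecks its condition while holding the monitor, otherwise a resume that comes a moment too early (or a spurious wakeup) goes unnoticed. A minimal sketch of such a guarded wait follows; the class PauseGate and its method names are made up for illustration and are not part of the demo.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PauseGate {
    private boolean paused = false;

    public synchronized void pause() {
        paused = true;
    }

    public synchronized void resume() {
        paused = false;
        notifyAll(); // wake everybody who waits on this monitor
    }

    /** Blocks the calling thread for as long as the gate is paused. */
    public synchronized void awaitResumed() throws InterruptedException {
        while (paused) { // recheck the condition after every wakeup
            wait();
        }
    }

    /** Returns true if a worker was held back while paused and let through after resume(). */
    public static boolean demo() {
        final PauseGate gate = new PauseGate();
        final AtomicBoolean passed = new AtomicBoolean(false);
        gate.pause();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    gate.awaitResumed();
                    passed.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        try {
            Thread.sleep(100);
            boolean heldBack = !passed.get(); // the gate is still closed here
            gate.resume();
            worker.join(2000);
            return heldBack && passed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Gate worked: " + demo());
    }
}
```

Because the flag is only read and written inside synchronized methods, it needs no volatile here.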

The following demo can start 7 producers and one consumer, which are synchronized via a BlockingQueue (I actually used a SynchronousQueue at work, as the files must only be removed if they were taken by the db). Both the producers and the consumer can be suspended, resumed and stopped. I never ever will start a thread by hand again if not necessary. The threads are managed by an ExecutorService.
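
Stripped of the GUI and the suspend/resume logic, the hand-off between producers and a consumer boils down to something like the following. This is only a sketch under my own assumptions: the class QueueSketch, the tiny queue capacity and the numbers being passed around are invented for illustration, and the consumer uses take() where the demo polls with a timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueSketch {
    /**
     * Starts the given number of producers, each putting the numbers
     * 0..perProducer-1 into a shared queue, and one consumer that takes
     * everything out again. Returns the sum of all consumed numbers.
     */
    public static int run(final int producers, final int perProducer) {
        // A deliberately small buffer, so that put() really has to block
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(2);
        final ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (int p = 0; p < producers; p++) {
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            for (int i = 0; i < perProducer; i++) {
                                queue.put(i); // blocks while the queue is full
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            Future<Integer> consumed = executor.submit(new Callable<Integer>() {
                public Integer call() throws InterruptedException {
                    int sum = 0;
                    for (int n = 0; n < producers * perProducer; n++) {
                        sum += queue.take(); // blocks while the queue is empty
                    }
                    return sum;
                }
            });
            return consumed.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Consumed sum: " + run(7, 3));
    }
}
```

Swapping the ArrayBlockingQueue for a java.util.concurrent.SynchronousQueue should give the strict one-in, one-out hand-off mentioned above, since that queue has no capacity at all.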

For the task mentioned above this thing works fine. If anyone comes up with a better idea, let me know.

I had fun writing it, maybe you’ll have fun reading it. Be aware, the program is not a good example of organizing classes, I put everything in one file just for the sake of being a demo.

But apart from that, the demo could serve as an example of what came to Java with Java 5 and 6, as I use a lot of generics, enums and enhanced for loops, in case you haven’t seen these yet.
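
In case you really haven’t seen them, here is a tiny sketch of those three features together. The class Java5Sketch and its two-constant enum are made up for illustration; the demo’s own Command enum follows the same pattern.

```java
import java.util.EnumMap;
import java.util.Map;

public class Java5Sketch {
    /** An enum whose constants carry a label, like the demo's Command. */
    public enum Command {
        START("Start"), STOP("Stop");

        private final String label;

        Command(String label) {
            this.label = label;
        }

        public String getLabel() {
            return label;
        }
    }

    /** A switch over an enum: the case labels are the bare constant names. */
    public static String describe(Command cmd) {
        switch (cmd) {
            case START:
                return "starting";
            case STOP:
                return "stopping";
            default:
                return "unknown";
        }
    }

    /** Generics plus the enhanced for loop: no casts and no index variable. */
    public static Map<Command, String> labels() {
        Map<Command, String> result = new EnumMap<Command, String>(Command.class);
        for (Command c : Command.values()) {
            result.put(c, c.getLabel());
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<Command, String> entry : labels().entrySet()) {
            System.out.println(entry.getValue() + ": " + describe(entry.getKey()));
        }
    }
}
```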

To compile and run the stuff you need at least a Java 5 JDK (get it here). Download the file JThreadDemo.zip, unzip it and type

javac snow/white/JThreadDemo.java
java snow.white.JThreadDemo

If you’re reading this, I guess you’re familiar with the JDK, Java packages and the whole crap.

Here’s the whole fun:

package snow.white;
 
/*
 * This little fairytale was written by
 * Michael Simons
 * http://michael-simons.eu
 * 
 * If you like it, you can drop me line :)
 * 
 * It's published under the Beerware License: 
 * http://en.wikipedia.org/wiki/Beerware
 *
 * ----------------------------------------------------------------------------
 * "THE BEER-WARE LICENSE":
 * <misi@planet-punk.de> wrote this file. As long as you retain this notice you
 * can do whatever you want with this stuff. If we meet some day, and you think
 * this stuff is worth it, you can buy me a beer in return
 * Michael Simons
 * ----------------------------------------------------------------------------
 */
 
import java.awt.BorderLayout;
import java.awt.Container;
import java.awt.Cursor;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
 
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JToolBar;
import javax.swing.SwingUtilities;
import javax.swing.event.DocumentEvent;
import javax.swing.event.DocumentListener;
import javax.swing.text.Document;
 
 
/**
 * This abstract working thingy represents my special runnable
 * which is suspendable as well as resumable and stoppable.
 * @author michael
 */
abstract class AbstractWorkingThingy implements Runnable {	
	// All fields are written by one thread (i.e. the edt) and read by
	// another (the worker), hence they are volatile.
	/** A flag to indicate whether the task ended or not */
	protected volatile boolean ended      = true;	
	/** A flag to indicate whether the task should stop */
	protected volatile boolean stopped    = true;
	/** A flag to indicate whether the task should be suspended */
	protected volatile boolean suspended  = false;
	/** The interval in secs in which to perform the duty */
	protected volatile int intervall = 10;
	/** The executing thread, can be null */
	protected volatile Thread currentThread;
 
	/**
	 * This method, called from anywhere (i.e. the event dispatching
	 * thread, edt), suspends the little working thing.
	 *
	 * The run method must use the mutex/semaphore "this" after
	 * isSuspended() is true as..	 
	 */
	public void suspend() {
		this.suspended = true;
	}
 
	/**
	 * ..resume is synchronized to the same mutex and
	 * notifies all threads that are waiting for this	 
	 */
	public synchronized void resume() {
		this.suspended = false;		
		this.notifyAll();
	}		
 
	/**
	 * stop sets isStopped() to true. If the thing is suspended,
	 * it also wakes it as the run loop wouldn't stop otherwise
	 * 
	 * If the executing thread was saved in run, the thread
	 * also gets interrupted, so that any sleeping method
	 * wakes up from its dreams.
	 */
	public void stop() {
		this.stopped = true;
		// Wake the sleepers so that they can come to an end
		if(suspended)
			this.resume();
		// Interrupt anything that is waiting
		if(currentThread != null)
			currentThread.interrupt();
	}
 
	/**
	 * This thing just indicates isStopped() is false...
	 * Can be overridden to do some configuration work, i.e.
	 * bringing in some cookies and cream.	 
	 */
	public synchronized void start() {
		this.stopped = false;		
	}	
 
	public boolean isEnded() {
		return this.ended;
	}
 
	public boolean isSuspended() {
		return this.suspended && !this.ended;
	}
 
	public boolean isStopped() {
		return this.stopped;
	}
 
	public boolean isRunning() {
		return !(isStopped() ||  isSuspended()); 
	}
 
	public int getIntervall() {
		return intervall;
	}
 
	public void setIntervall(int intervall) {
		this.intervall = intervall;
	}			
}
 
/**
 * Just a little something to pass along, like chips with beer.
 * @author michael
 */
class Item {
	private String diggedby;
	private int val;
 
	public String getDiggedby() {
		return diggedby;		
	}
 
	public Item setDiggedby(String diggedby) {
		this.diggedby = diggedby;
		return this;
	}
 
	public int getVal() {
		return val;
	}
 
	public Item setVal(int val) {
		this.val = val;
		return this;
	}		
}
 
/**
 * The greedy one... it takes all the cookies from the dwarfs.
 * @author michael 
 */
class Consumer extends AbstractWorkingThingy {
	/** The synchronizing queue is the source for the consumer */
	private final BlockingQueue<Item> source;
	private final PrintStream log;
 
	public Consumer(final BlockingQueue<Item> source, final PrintStream log) {
		this.source = source;	
		this.log = log;
	}
 
	public void run() {
		// I'm not dead yet...
		super.ended = false;
		// In case somebody wants to wake me
		super.currentThread = Thread.currentThread();
 
		// For now and ever... and ever... 
		while(!isStopped()) {					
			try {							
				// Check my source for new cookies for intervall secs.
				// The queue will then return a null value if nothings there. 
				// The wait is as good as a sleep.
				final Item item = this.source.poll(intervall, TimeUnit.SECONDS);
				if(item != null) 
					log.println(currentThread.getName() +":  " + 
							"Hey, i got " + item.getVal() + 
							" from " + item.getDiggedby()
					);
 
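				// Oops, somebody wants me suspended...
				// ... so i wait until someone notifies me. The flag is
				// rechecked in a loop while holding the lock, so neither
				// an early resume() nor a spurious wakeup does any harm.
				synchronized(this) {
					while (isSuspended() && !isStopped())
						wait();
				}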
			} catch (InterruptedException e) {
				// Ok, they don't need me... 
				// i go (i was stopped and interrupted)
				log.println("I was interrupted, quitting...");
				break;
			}
		}	
		// Now i'm dead!
		super.ended = true;				
	}	
}
 
/**
 * My little working class hero.
 * @author michael 
 */
class Producer extends AbstractWorkingThingy {
	private final String name;
	/** The synchronizing queue is the target for the producer */
	private final BlockingQueue<Item> target;		
	private final PrintStream log;
 
	public Producer(
			final String name, 
			final BlockingQueue<Item> target, 
			final PrintStream log
	) {
		this.name = name;
		this.target = target;
		this.log = log;
	}
 
	/** 
	 * @see java.lang.Runnable#run()
	 */
	public void run() {
		// Same blah blah as above...
		super.ended = false;		
		super.currentThread = Thread.currentThread();
		final Random r = new Random(System.currentTimeMillis());
 
		while(!isStopped()) {
			try {					
				// I do my duty and try to put something in the basket...
				// be aware: the queue can be synchronous (one man in, one
				// man out), it can be full or maybe the world has ended on
				// the other side; either way, put can block for a loooong
				// time...
				//
				// An alternative is just offering something like so:
				// target.offer(o, timeout, unit)				
				target.put(new Item().setDiggedby(name).setVal(r.nextInt()));
				log.println(
						currentThread.getName() + ": " + 
						name + " digged something..."
				);
 
				// Same blah blah as above...
				Thread.sleep(intervall * 1000);
				// Recheck the flag in a loop while holding the lock
				synchronized(this) {
					while (isSuspended() && !isStopped())
						wait();
				}
			} catch (InterruptedException e) {
				log.println(name + " was interrupted, quitting...");
				break;
			} catch (Exception e) {
				log.println(name + 
						" got an error while digging: " + e.getMessage()
				);				
				break;
			}
		}		
 
		// I guess, you know the game...
		super.ended = true;
	}
 
	public String getName() {
		return name;
	}		
}
 
public class JThreadDemo extends JFrame implements ActionListener {
	public enum Command {
		START_PRODUCERS("Start or resume producers"), 
		SUSPEND_PRODUCERS("Suspend producers"), 
		STOP_PRODUCERS("Stop producers"), 
		START_CONSUMER("Start consumer"), 
		STOP_CONSUMER("Stop consumer");
 
		private String name;
 
		Command(String name) {
			this.name = name;
		}
 
		public String getName() {	
			return name;
		}		
	}		
 
	private static final long serialVersionUID = -307930103825328797L;
 
	/** My Actions */
	private final Map<Command, JButton> actions;
	/** Something to log to */
	private final JTextArea textArea;
	/** The FIFO buffer for producers and consumers */
	private final BlockingQueue<Item> sync;
	/** The one and only consumer */	
	private final Consumer consumer;
	/** The dwarfs that dig the earth */
	private final List<Producer> producers;	
	/** One executor to rule them all */
	private final ExecutorService executor;
 
	private final PrintStream log;
 
	public JThreadDemo() {
		super("Michaels funny little thread demo");
		// I want the demo to exit without any additional exit listener
		super.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);			
 
		// Initializing the toolbar
		this.actions = new HashMap<Command, JButton>();
		final JToolBar tb = new JToolBar();
		for(Command command : Command.values()) {
			final JButton b = new JButton(command.getName());		
			b.setActionCommand(command.toString());
			b.addActionListener(this);			
			tb.add(b);
 
			this.actions.put(command, b);
		}		
 
		// Our log pane
		this.textArea = new JTextArea();
		this.log = new PrintStream(new OutputStream() {
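			@Override
			public void write(int b) throws IOException {
				// Yeah, i know, i couldn't think of anything
				// less performant...
				final String s = new String(new byte[]{(byte)b});
				// The workers log from their own threads, but Swing
				// components should only be touched on the edt
				SwingUtilities.invokeLater(new Runnable() {
					public void run() {
						JThreadDemo.this.textArea.append(s);
					}
				});
			}			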
		});		
 
		// Doing some layout crap
		final Container cp = super.getContentPane();
		cp.setLayout(new BorderLayout());
		cp.add(tb,                             BorderLayout.PAGE_START);
		cp.add(new JScrollPane(this.textArea), BorderLayout.CENTER);
 
		// I want to tail the messages
		this.textArea.getDocument().addDocumentListener(new DocumentListener() {
			private void tail(final Document document) {
				JThreadDemo.this.textArea.setCaretPosition(
						document.getLength()
				);				
			}
 
			public void changedUpdate(DocumentEvent e) {
				this.tail(e.getDocument());
			}
 
			public void insertUpdate(DocumentEvent e) {
				this.tail(e.getDocument());				
			}
 
			public void removeUpdate(DocumentEvent e) {
				this.tail(e.getDocument());				
			}			
		});		
		super.pack();
		super.setSize(super.getSize().width, 480);
 
		// Building the fairytale		
		// The means of communication
		this.sync = new ArrayBlockingQueue<Item>(100);
		// The consumer...
		this.consumer = new Consumer(this.sync, this.log);
		this.consumer.setIntervall(5);
		// ...and somewhat more than a handful of producers
		this.producers = new ArrayList<Producer>();
 
		// Yeah, the 7 dwarfs get ready for work...
		for(String name : new String[] {
				"Bashful", "Doc", "Dopey", 
				"Grumpy", "Happy", "Sleepy", 
				"Sneezy"
		}) 			
			this.producers.add(new Producer(name, sync, this.log));						
 
		this.executor = Executors.newCachedThreadPool();
 
		this.checkActions();
	}
 
	public void actionPerformed(ActionEvent e) {
		Command cmd = null;
		try {
			cmd = Command.valueOf(e.getActionCommand());
		} catch(IllegalArgumentException ex) {
			// We can safely ignore this...
		}
 
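		// If you want to use a switch statement, do it like this:
		// the case labels have to be the bare constant names, the
		// compiler (Java 5 and 6 at least) rejects a qualified
		// Command.START_PRODUCERS there. And mind that switching
		// on a null cmd throws a NullPointerException.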
		/*
		switch(cmd) {
		case START_PRODUCERS:
			this.startOrResumeAllProducers();
			break;
		case SUSPEND_PRODUCERS:
			this.suspendAllProducers();
			break;
		// etc.pp.
		}
		*/
 
		if(cmd == null)
			log.println("Invalid command...");
		else if(cmd == Command.START_PRODUCERS)
			this.startOrResumeAllProducers();
		else if(cmd == Command.SUSPEND_PRODUCERS)
			this.suspendAllProducers();
		else if(cmd == Command.STOP_PRODUCERS)
			this.stopAllProducers();
		else if(cmd == Command.START_CONSUMER)
			this.startConsumer();
		else if(cmd == Command.STOP_CONSUMER)
			this.stopConsumer();				
	}
 
	/**
	 * Checks every producer for its state and 
	 * brings it back to work or to a fresh start...
	 */
	private void startOrResumeAllProducers() {
		log.println(Thread.currentThread().getName() + 
				": Starting the producers..."
		);
		final Random r = new Random(System.currentTimeMillis());
 
		int running = 0;
		for(Producer producer : producers) {
			// Let's see... suspended? then it's already being executed...		
			if(producer.isSuspended())
				producer.resume();
			// Otherwise, start it and hand it over to the executor.
			else if(producer.isEnded()) {			
				// I like surprises... so let them do the work in some
				// random intervals												
				producer.setIntervall(Math.max(1,r.nextInt(20)));							
				producer.start();
 
				log.println("Producer " + producer.getName() + " has an " +
						"interval of " + producer.getIntervall() + "sec"
				);
				this.executor.execute(producer);
			}
 
			running += producer.isStopped() || producer.isSuspended() ? 0 : 1;
		}
 
		log.println(Thread.currentThread().getName()  + ": Started " + running +
				" producers..."
		);		
		this.checkActions();
	}
 
	/**
	 * Suspends all producers. This method doesn't need to be synchronized
	 * as no one is actively waiting to be notified.
	 */
	private void suspendAllProducers() {
		for(Producer producer : producers) 
			producer.suspend();		
		this.checkActions();
	}
 
	/**
	 * Stopping all producers.
	 * This method tries to stop all producers, giving them time to end 
	 * their jobs	 
	 */
	@SuppressWarnings("unchecked")
	private void stopAllProducers() {
		super.setEnabled(false);		
		// Can anyone tell me why the heck this works ... randomly??
		super.getGlassPane().setCursor(
				Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR)
		);
 
		// So this can take a while, if a producer doesn't immediately respond
		// to stop or ignores the interrupt (maybe it needs to disconnect from
		// a database, his wife or just the fridge)
		// So here comes the future task, with the callable and a rudimentary 
		// callback method:
		this.executor.execute(
				// The future task gets executed...
				new FutureTask (
						// ...and delegates to a callable
						// These things are generics, so if they produce 
						// something, type them and return the corresponding 
						// result.
						new Callable() {												
							public Object call() throws Exception {
								for(Producer producer : producers) {
									// Oh, no need to get involved.
									if(producer.isEnded())
										continue;	
									// Try to stop
									producer.stop();
									// ... as long as you wait for me ...
									while(!producer.isEnded()) {
										// Give them a chance to free resources
										synchronized (this) {
											log.println(
													Thread.currentThread()
													.getName()  + ": " +
													"Waiting for " +
													"producers to end..."
											);
											try {			
												// Some arbitrary wait time									
												this.wait(2 * 1000);
											} catch (InterruptedException e) {							
											}	
										}					
									}											
								}
								return null;
							}
 
						}) {
 
					/**
					 * This belongs to future tasks and acts like a callback 
					 * when the callable is done.
					 * If the callable isn't void, one can get the result with 
					 * "get()"
					 * @see java.util.concurrent.FutureTask#done()
					 */
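					@Override
					protected void done() {
						super.done();
						// done() is called on the executor's thread, so
						// the Swing work is handed back to the edt
						SwingUtilities.invokeLater(new Runnable() {
							public void run() {
								JThreadDemo.super.getGlassPane().setCursor(
										Cursor.getPredefinedCursor(
												Cursor.DEFAULT_CURSOR
										)
								);
								JThreadDemo.super.setEnabled(true);
								JThreadDemo.this.checkActions();
							}
						});
					}			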
				}
		);		
	}
 
	private void startConsumer() {
		if(consumer.isSuspended())
			consumer.resume();
		else if(consumer.isEnded()) {
			consumer.start();
			this.executor.execute(consumer);
		}
 
		this.checkActions();
	}
 
	@SuppressWarnings("unchecked")
	private void stopConsumer() {
		super.setEnabled(false);
		super.getContentPane().setCursor(
				Cursor.getPredefinedCursor(Cursor.WAIT_CURSOR));
		this.executor.execute(
				new FutureTask(new Callable() {
					public Object call() throws Exception {
						if(!consumer.isEnded()) {
							consumer.stop();
							while(!consumer.isEnded()) {
								synchronized (this) {
									log.println(Thread.currentThread().getName()
											+ ": Waiting for consumer to end..."
									);
									try {
										this.wait(1000);
									} catch (InterruptedException e) {							
									}	
								}					
							}											
						}
						return null;
					}
 
				}) {
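					@Override
					protected void done() {
						super.done();
						// done() is called on the executor's thread, so
						// the Swing work is handed back to the edt.
						// The wait cursor was set on the content pane,
						// so that's where it gets reset.
						SwingUtilities.invokeLater(new Runnable() {
							public void run() {
								JThreadDemo.super.getContentPane().setCursor(
										Cursor.getPredefinedCursor(
												Cursor.DEFAULT_CURSOR
										)
								);
								JThreadDemo.super.setEnabled(true);
								JThreadDemo.this.checkActions();
							}
						});
					}			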
				}
		);	
	}
 
	private void checkActions() {
		int running = 0;
		for(Producer producer : producers) 
			running += producer.isRunning() ? 1 : 0;		
 
		this.actions.get(Command.START_PRODUCERS).setEnabled(running == 0);
		this.actions.get(Command.SUSPEND_PRODUCERS).setEnabled(running > 0);
		this.actions.get(Command.STOP_PRODUCERS).setEnabled(running > 0);
		this.actions.get(Command.START_CONSUMER).setEnabled(
				!consumer.isRunning()
		);
		this.actions.get(Command.STOP_CONSUMER).setEnabled(
				consumer.isRunning()
		);		
	}
 
	public static void main(String[] args) {
		// First of all: Start the application in the right thread.
		// Swing got its own event dispatching thread, leave the 
		// system thread alone. That goes for building the frame, too.
		SwingUtilities.invokeLater(new Runnable() {
			public void run() {								
				final JThreadDemo demo = new JThreadDemo();
				demo.setVisible(true);												
			}
		});				
	}	
}
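
The stop methods above hang a rudimentary callback onto a FutureTask by overriding done(). In isolation, and typed with generics instead of raw types, the pattern looks roughly like this. It is only a sketch: the class DoneCallbackSketch, the latch and the dummy computation are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class DoneCallbackSketch {
    /** Runs a typed FutureTask and returns what its done() callback recorded. */
    public static String run() {
        final CountDownLatch finished = new CountDownLatch(1);
        final StringBuffer trace = new StringBuffer();

        FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
            public Integer call() {
                return 6 * 7; // stands in for the long running work
            }
        }) {
            @Override
            protected void done() {
                // Called on the worker thread once the callable has finished;
                // get() returns right away here as the result is already there
                try {
                    trace.append("done:").append(get());
                } catch (Exception e) {
                    trace.append("failed");
                } finally {
                    finished.countDown();
                }
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(task);
            finished.await(5, TimeUnit.SECONDS);
            return trace.toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a Swing program the body of done() would typically hand its GUI work over to SwingUtilities.invokeLater, as done() itself runs on the executor’s thread and not on the edt.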
