January 22, 2009

Java Concurrency

Writing a thread-safe application is a challenging task, but with Java 1.5 and 1.6 a lot of the plumbing has moved into the Java platform itself (the java.util.concurrent packages). Before looking at these new features, however, one must first be able to write thread-safe classes. Writing thread-safe classes is mostly about sharing an object's state in a safe way. That can be achieved in several ways:
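  • Use thread confinement, that is, ensure that the data is accessed by only one thread (or, for pooled objects, by only one thread at a time). Examples of that are:
    • The Swing Event Dispatch Thread, which other threads reach via javax.swing.SwingUtilities.invokeLater or javax.swing.SwingWorker.
    • A java.sql.Connection retrieved from an application server's connection pool, which is used by a single thread until it is returned to the pool.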
  • Use read-only (immutable) data.
  • Make the object thread safe by building it on classes that are already thread safe, such as:
    • java.util.concurrent.atomic.AtomicLong
    • java.util.concurrent.ConcurrentHashMap
    • java.util.concurrent.ConcurrentLinkedQueue
    • java.util.concurrent.CopyOnWriteArrayList, the concurrent replacement for a list wrapped with java.util.Collections.synchronizedList(). It should only be used when iteration is far more common than modification, because every modification copies the underlying array, which becomes very costly as the list grows.
  • Guard the object with a lock:
    • synchronized methods, which lock on the object's own intrinsic lock.
    • A private lock: synchronized blocks on a private final field, so that code outside the class cannot take the same lock.
  • If a write does not depend on the previous value of a variable, you can get away with declaring the field volatile. Volatile ensures that the most recent value is always visible to all threads. Using volatile can simplify implementing thread safety, but beware that volatile only guarantees visibility, not atomicity.
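As a minimal sketch of the last three techniques in the list above, assuming a hypothetical class SafeStats (the class and its method names are made up for illustration): a volatile flag whose writes do not depend on its previous value, an AtomicLong for a counter that does, and a private lock object guarding two fields that must change together.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example class, not part of the JDK.
class SafeStats {

    // volatile: a plain write that never depends on the previous value
    private volatile boolean running = true;

    // atomic: increment is a read-modify-write, so volatile alone would not be enough
    private final AtomicLong hits = new AtomicLong();

    // private lock guarding two fields that must be updated together
    private final Object lock = new Object();
    private long sum;   // guarded by lock
    private long count; // guarded by lock

    void stop() { running = false; }
    boolean isRunning() { return running; }

    long hit() { return hits.incrementAndGet(); }
    long hits() { return hits.get(); }

    void record(long value) {
        synchronized (lock) {
            sum += value;
            count++;
        }
    }

    double average() {
        synchronized (lock) {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final SafeStats stats = new SafeStats();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int n = 0; n < 1000; n++) {
                        stats.hit();
                        stats.record(2);
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join(); // join makes the workers' updates visible here
        }
        stats.stop();
        System.out.println(stats.hits() + " hits, average " + stats.average()
                + ", running=" + stats.isRunning());
    }
}
```

Run as is, this should print "4000 hits, average 2.0, running=false". Replacing the AtomicLong with a volatile long and hits++ would be the classic mistake: the value would stay visible to all threads, but increments could be lost.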
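Now for some of the goodies that recent Java versions come with. If you want to move a task out of existing code so that it runs asynchronously, you can use a producer-consumer design. The producer is the original code, and the asynchronous task goes into the consumer. In the example below, the producer puts the first line of every file in a directory on a java.util.concurrent.BlockingQueue, and the consumer takes the lines off the queue and prints them.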
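import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {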

 private final File root;

 private final BlockingQueue<String> queue;

 public Producer(String path, BlockingQueue<String> queue) {
  this.root = new File(path);
  this.queue = queue;
 }

 @Override
 public void run() {
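  File[] files = root.listFiles(); // null if root is not a readable directory
  if (files == null) {
   System.out.println("Cannot list '" + root.getAbsolutePath() + "'.");
   return;
  }
  for (File file : files) {
   if (file.isFile()) {
    readHeadOfFile(file);
   }
  }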
  System.out.println("Finished producing!");
 }

 public void readHeadOfFile(File file) {
  System.out.println("Reading '" + file.getName() + "'...");
  BufferedReader reader = null;
  try {
   reader = new BufferedReader(new FileReader(file));
   String line = reader.readLine();
   if (line != null) { // null means the file is empty, and the queue rejects null elements
    queue.add(line);
   }
  } catch (IOException e) {
   // Report and move on to the next file; an I/O failure is not an interruption
   System.out.println("Failed to read '" + file.getAbsolutePath() + "'. " + e.getMessage());
  } finally {
   closeQuietly(reader);
  }
 }

 public void closeQuietly(Reader reader) {
  if (reader != null) {
   try {
    reader.close();
   } catch (IOException e) {
    // Ignored: nothing useful can be done if close fails
   }
  }
 }
}

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

 private final BlockingQueue<String> queue;

 public Consumer(BlockingQueue<String> queue) {
  this.queue = queue;
 }

 @Override
 public void run() {
  try {
   while (true) {
    String str = queue.take();
    System.out.println("Consuming '" + str + "'.");
   }
  } catch (InterruptedException e) {
   System.out.println("Consumer interrupted!");
   Thread.currentThread().interrupt();
  }
 }
}

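import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ProducerConsumerTest {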

 private final ExecutorService pool = Executors.newFixedThreadPool(10);

 private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
 
 @Before
 public void setUp() {

 }

 @After
 public void tearDown() {
  pool.shutdown(); // Disable new tasks from being submitted
  try {
   System.out.println("Wait 30s for existing tasks to terminate...");
   if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
    System.out.println("Cancel currently executing tasks");
    pool.shutdownNow();  
    // Wait a while for tasks to respond to being cancelled
    if (!pool.awaitTermination(60, TimeUnit.SECONDS))
     System.err.println("Pool did not terminate");
   }
  } catch (InterruptedException ie) {
   // (Re-)Cancel if current thread also interrupted
   pool.shutdownNow();
   // Preserve interrupt status
   Thread.currentThread().interrupt();
  }
 }

 @Test
 public void test() {
  try {
   pool.execute(new Producer("/home/magnus/documents/", queue));
   pool.execute(new Consumer(queue));
  } catch (Exception e) {
   e.printStackTrace();
   pool.shutdown();
  }
 }
}
Examples of applicable scenarios are:
  • Moving logic operations out of the I/O code.
  • Moving logging code out of the logic code.
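The logging scenario can be sketched with the same BlockingQueue approach. This is only an illustration under assumptions: AsyncLogger, its methods and the STOP sentinel are hypothetical names, and the "slow" write is simulated by appending to a list and printing. The sentinel (often called a poison pill) lets the consumer drain the queue and exit on its own instead of waiting to be interrupted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example class: the logic code produces, a single writer thread consumes.
class AsyncLogger implements Runnable {

    // Sentinel telling the writer to stop; new String(...) gives it a unique
    // identity, so a real message with the same text is never mistaken for it.
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    private final List<String> written =
            Collections.synchronizedList(new ArrayList<String>());
    private final Thread writer = new Thread(this, "async-logger");

    void start() {
        writer.start();
    }

    // Called by the logic code; only enqueues, so it returns immediately.
    void log(String message) {
        queue.add(message);
    }

    // Queues the sentinel and waits until everything before it has been written.
    void close() {
        queue.add(STOP);
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }

    List<String> written() {
        return written;
    }

    public void run() {
        try {
            while (true) {
                String message = queue.take();
                if (message == STOP) { // identity comparison on purpose
                    return;
                }
                written.add(message); // stand-in for the slow I/O
                System.out.println("LOG " + message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncLogger logger = new AsyncLogger();
        logger.start();
        for (int i = 1; i <= 3; i++) {
            logger.log("step " + i + " done");
        }
        logger.close();
        System.out.println("Logged " + logger.written().size() + " messages");
    }
}
```

Because there is a single consumer and the queue is FIFO, messages are written in the order they were logged, and close() returns only after all of them have been handled.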
Another convenient tool is the latch. I used one when writing a test harness, where I wanted to make sure that all the test threads were released at the same time and then wait for them all to finish before exiting the harness. See the java.util.concurrent.CountDownLatch API documentation for a good example.
If you want a reusable latch, look at java.util.concurrent.CyclicBarrier. I used that one for a calculation that had to take place once every time unit. Again, see the API documentation for a good example.
One of the best things that comes out of the box is the set of thread pools:
  • java.util.concurrent.Executors.newFixedThreadPool
  • java.util.concurrent.Executors.newCachedThreadPool
  • java.util.concurrent.Executors.newSingleThreadExecutor. Good because if its single thread dies because of a failure in a task, a new one is automatically started to run the subsequent tasks.
  • java.util.concurrent.Executors.newScheduledThreadPool
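The test-harness use of a latch described earlier combines naturally with a fixed thread pool. The following is a rough sketch, not the harness I actually wrote: LatchHarness and runTogether are hypothetical names, and the 10 second timeout is an arbitrary choice. A start gate holds all workers back until it is opened, and an end gate lets the caller wait for all of them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class.
class LatchHarness {

    // Runs task once on each of `threads` pool threads, released together.
    // Returns the number of runs that completed, or -1 on timeout or interrupt.
    static int runTogether(int threads, final Runnable task) {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(threads);
        final AtomicInteger completed = new AtomicInteger();
        // One pool thread per worker, so every worker can wait at the gate
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            startGate.await(); // block until the gate opens
                            task.run();
                            completed.incrementAndGet();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            endGate.countDown(); // always report in
                        }
                    }
                });
            }
            startGate.countDown(); // release all waiting workers at once
            if (!endGate.await(10, TimeUnit.SECONDS)) {
                return -1;
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow(); // workers are done or abandoned at this point
        }
    }

    public static void main(String[] args) {
        final AtomicInteger calls = new AtomicInteger();
        int done = runTogether(4, new Runnable() {
            public void run() {
                calls.incrementAndGet();
            }
        });
        System.out.println(done + " workers finished, " + calls.get() + " calls");
    }
}
```

Run as is, this should print "4 workers finished, 4 calls". A CountDownLatch cannot be reset, which is why the gates are created anew on every call; for a gate that is reused round after round, CyclicBarrier is the better fit.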
