Java 7 – Fork/Join

While we lamented how feature-poor Java 7 turned out to be, one thing that made it in and turns out to be a boon to high-performance concurrent development is the new Fork/Join framework. This framework is targeted at multi-processor systems (really, almost all hardware today) in situations where a batch of work can be broken into smaller recursive calls. More than just that, it also uses a work-stealing algorithm where threads with no work can steal available work from other threads that are busy. What makes that so useful is that you can break your work into fairly small, roughly equal pieces, and even when some pieces take longer than others, you'll still get good use of your processing resources.

The Fork/Join framework builds on the ExecutorService we discussed in our Concurrency series. The implementation is ForkJoinPool, which executes ForkJoinTask instances, usually implemented as subclasses of either RecursiveTask (returns data) or RecursiveAction (no returned data).

As is our custom, let's use a sample task and some applicable code. Let's assume you want to write a class that calculates the size on disk of a given directory and all its children. And in our case we'll just assume we're running some wicked fast SSDs, so we can actually benefit from concurrent scans.

Here’s our code:

import java.util.concurrent.*;
import java.io.*;
import java.util.*;

public class DirectorySizer extends RecursiveTask<Long> {
  
  private List<File> mFiles;
  private boolean mAllFiles = true;
  
  public DirectorySizer(List<File> files) {
    mFiles = files;
    for (File file : files) {
      if (file.isDirectory()) {
        mAllFiles = false;
      }
    }
  }
  
  protected Long compute() {
    if (mFiles.size() <= 4 && mAllFiles) {
      return computeLocal();
    } else {
      return forkAndJoin();
    }  
  }
  
  private Long computeLocal() {
    long length = 0;
    for (File file : mFiles) {
      length += file.length();
    }
    return length;
  }
  
  private Long forkAndJoin() {
    List<File> dirsAndFiles = new ArrayList<File>();
    for (File file : mFiles) {
      if (file.isFile()) {
        dirsAndFiles.add(file);
      } else {
        File[] children = file.listFiles();
        if (children != null) { // listFiles() returns null for unreadable dirs
          dirsAndFiles.addAll(Arrays.asList(children));
        }
      }
    }
    int rightSize = dirsAndFiles.size() / 2;
    int leftSize = dirsAndFiles.size() - rightSize;
    List<File> leftList = dirsAndFiles.subList(0, leftSize);
    List<File> rightList = dirsAndFiles.subList(leftSize, leftSize + rightSize);
    DirectorySizer d1 = new DirectorySizer(leftList);
    d1.fork();
    DirectorySizer d2 = new DirectorySizer(rightList);
    return d2.compute() + d1.join();
  }
  
  public static void main(String[] args) throws Exception {
    List<File> files = Arrays.asList(new File(args[0]).listFiles());
    DirectorySizer sizer = new DirectorySizer(files);
    ForkJoinPool pool = new ForkJoinPool();
    Long size = pool.invoke(sizer);
    System.out.println(args[0] + " is " + size + " bytes ");
  }
}

Let's break down the usage of the Fork/Join framework. By extending RecursiveTask, all we really need to do is implement the compute method: we calculate the size directly when the chunk of work is small enough to suit our needs, and fork and join subtasks when it is still too large. In our main method, we get all the benefit of the framework simply by creating a new ForkJoinPool and passing our top-level instance to its invoke method.

While it's true the potential uses of this framework are limited and require a fairly narrow problem scope, it's nice to see continued advancement in Java in the concurrency space to build on all the goodies we got in Java 5. If you have any concurrency problems you'd like us to tackle in our blog, drop us a line. We enjoy talking shop with fellow developers.

Java Concurrency Part 6 – CountDownLatch

Some concurrency utilities in Java naturally get more attention than others just because they serve general purpose problems instead of more specific ones. Most of us encounter things like executor services and concurrent collections fairly often. Other utilities are less common, so sometimes they may escape us, but it’s good to keep them in mind. CountDownLatch is one of those tools.

CountDownLatch – a more general wait/notify mechanism

Java developers of all sorts should be familiar with the wait/notify approach to blocking until a condition is reached. Here is a little sample of how it works:

public void testWaitNotify() throws Exception {
   final Object mutex = new Object();
   Thread t = new Thread() {
      public void run() {
         // we must acquire the lock before waiting to be notified
         synchronized (mutex) {
            System.out.println("Going to wait " +
                               "(lock held by " + Thread.currentThread().getName() + ")");
            try {
               mutex.wait(); // releases the lock while waiting (an optional timeout can be supplied)
            } catch (InterruptedException e) {
               e.printStackTrace();
            }

            System.out.println("Done waiting " +
                               "(lock held by " + Thread.currentThread().getName() + ")");
         }
      }
   };

   t.start(); // start her up and let her wait()

   // not normally how we do things, but good enough for demonstration purposes
   Thread.sleep(1000);

   // we acquire the lock released by wait(), and notify()
   synchronized (mutex) {
      System.out.println("Going to notify " +
                         "(lock held by " + Thread.currentThread().getName() + ")");
      mutex.notify();
      System.out.println("Done notify " +
                         "(lock held by " + Thread.currentThread().getName() + ")");
   }

}

Output

Going to wait (lock held by Thread-0)
Going to notify (lock held by main)
Done notify (lock held by main)
Done waiting (lock held by Thread-0)

A CountDownLatch can actually be used much like a one-shot wait/notify, with one important difference: it won't stall if countDown() is called before you get around to awaiting, whereas a notify() issued before wait() is simply lost. It is more forgiving because of this, and in some cases, it's just what you want. Here's a sample:

public void testWaitNotify() throws Exception {
   final CountDownLatch latch = new CountDownLatch(1); // just one time
   Thread t = new Thread() {
      public void run() {
         // no lock to acquire!
         System.out.println("Going to count down...");
         latch.countDown();
      }
   };
	
   t.start(); // start her up and let her count down
   System.out.println("Going to await...");
   latch.await();
   System.out.println("Done waiting!");
}

As you can see, it is simpler than wait/notify, and requires less code. It also allows the releasing condition to fire before we call await(). This can mean safer code.
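
Here's a minimal sketch of that forgiveness (InterruptedException handling elided):

CountDownLatch latch = new CountDownLatch(1);
latch.countDown(); // the "notify" happens before anyone is waiting...
latch.await();     // ...yet await() returns immediately - nothing is lost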

A Real World Example

So we know we can use it as a simpler wait/notify mechanism, but you probably noticed the constructor argument we used above. In the constructor, you specify the number of times the latch needs to be counted down before waiting threads are released. What possible uses are there for this? Well, it can be used to make a process wait until a certain number of actions have been taken.

For example, if you have an asynchronous process that you can hook into via listeners or something similar, you can create unit tests that verify a certain number of invocations were made. This lets us block only as long as we need to in the normal case (or up to some limit before we bail and assume failure).

I recently ran into a case where I had to verify that JMS messages were being pulled off the queue and handled correctly. This was naturally asynchronous and outside of my control, and mocks weren’t an option since it was a fully assembled application with Spring context, etc. To test this, I made minor changes to the consuming services to allow listeners to be invoked when the messages were processed. I was then able to temporarily add a listener, which used a CountDownLatch to keep my tests as close to synchronous as possible.

Here’s an example that shows the concept:


public void testSomeProcessing() throws Exception {
   // should be called twice
   final CountDownLatch testLatch = new CountDownLatch(2);
   ExecutorService executor = Executors.newFixedThreadPool(1);
   AsyncProcessor processor = new AsyncProcessor(new Observer() {
      // this observer would be the analogue for a listener in your async process
      public void update(Observable o, Object arg) {
         System.out.println("Counting down...");
         testLatch.countDown();
      }
   });
	
   // submit two tasks to be processed
   // (in my real world example, these were JMS messages)
   executor.submit(processor);
   executor.submit(processor);
	
   System.out.println("Submitted tasks. Time to wait...");
   long time = System.currentTimeMillis();
   testLatch.await(5000, TimeUnit.MILLISECONDS); // bail after a reasonable time
   long totalTime = System.currentTimeMillis() - time;
	
   System.out.println("I awaited for " + totalTime + 
                      "ms. Did latch count down? " + (testLatch.getCount() == 0));
	
   executor.shutdown();
}

// just a process that takes a random amount of time 
// (up to 2 seconds) and calls its listener
public class AsyncProcessor implements Callable<Object> {
   private Observer listener;

   public AsyncProcessor(Observer listener) {
      this.listener = listener;
   }

   public Object call() throws Exception {
      // some processing here which can take all kinds of time...
      int sleepTime = new Random().nextInt(2000);
      System.out.println("Sleeping for " + sleepTime + "ms");
      Thread.sleep(sleepTime);
      listener.update(null, null); // not standard usage, but good for a demo
      return null;
   }
}

Output

Submitted tasks. Time to wait...
Sleeping for 739ms
Counting down...
Sleeping for 1742ms
Counting down...
I awaited for 2481ms. Did latch count down? true

Conclusion

That's about it for the CountDownLatch. It is not a complicated subject and it has limited uses, but it's good to see examples and know it's there in your tool chest when you hit a problem like I did. In the future, I'll definitely keep it in mind for a simpler wait/notify, if nothing else. If you have questions or comments about this post or others in the series, just leave a message.

Concurrent Collections – Map Time!

Java has boasted various collections classes for many years now, all to deal with common programming problems.

When we need synchronized collections, we used to just wrap our regular collections with a call to java.util.Collections.synchronizedList() or one of the other similar methods. Sometimes, though, these wrappers don't scale, as they are a very primitive and unoptimized way of controlling access.

In this post we will cover the implementation details of ConcurrentHashMap, and we'll also discuss ConcurrentSkipListMap later on. ConcurrentHashMap implements the normal Map interface and behaves like HashMap (though it disallows null keys and values), so it is a nice drop-in replacement for other maps. Most importantly, it offers vastly superior concurrent performance over Collections.synchronizedMap() or even your own customized synchronized blocks.

How does it do this? Well, in the average application, reads typically outnumber writes by a huge margin, and this applies to most critical sections too – we end up locking and blocking other threads for a lot of reads where nothing is going on. How can this be improved?

Well, the designers of ConcurrentHashMap realized this problem. Rather than stopping at a certain number of buckets, they split the map into several segments, each with its own buckets. Hashing the key determines the appropriate segment, just as it determines the bucket within it. The segment buckets also take advantage of volatile fields to ensure consistent reads during get() and other read operations.

Two nice properties come out of this:

  • Each segment can actually be written to without locking the rest of the hash table
  • Reads which are not concurrent with a write to the same bucket will not require a lock (due to volatile fields)

Tip: If you want the equivalent of a ConcurrentHashSet, use Collections.newSetFromMap() with a ConcurrentHashMap as the argument.
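
To make that tip concrete, here's a minimal sketch (the String element type is just for illustration):

Set<String> concurrentSet =
      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
concurrentSet.add("first"); // safe from any thread; backed by the concurrent map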

Tuning ConcurrentHashMap

Now that we understand the mechanics behind the ConcurrentHashMap, how do we tune it for specific cases?

Well, as a general rule, the sparser the map, the less contention there will be. The main way to improve concurrency within the map is to adjust the concurrency level argument supplied at construction. This controls how many segments are created, which directly corresponds to how well the map will scale under many concurrent writes (or reads that happen while writes are in progress). The constructions below will result in different performance characteristics.

// default settings: allows for 16 concurrent segments,
// with the default load factor of 0.75 and an initial capacity of 16
Map<String, String> map = new ConcurrentHashMap<String, String>();

// for a larger number of threads, you can specify a higher concurrency level
// (the constructor takes initialCapacity, loadFactor, concurrencyLevel - in that order)
int concurrencyLevel = 50;
Map<String, String> moreConcurrentMap =
      new ConcurrentHashMap<String, String>(16, 0.75f, concurrencyLevel);

// a sparser map will use more memory but result in faster puts and lookups;
// it won't reduce contention for writes, since those still lock each segment
float sparse = 0.25f;
Map<String, String> sparserMap =
     new ConcurrentHashMap<String, String>(16, sparse);

ConcurrentSkipListMap

Java 1.6 introduced the ConcurrentSkipListMap, which is a good alternative to the ConcurrentHashMap when you need a NavigableMap implementation. NavigableMap instances are sorted maps, and provide operations to traverse and navigate through the entries in the map. When considering ConcurrentHashMap versus ConcurrentSkipListMap, your choice should depend on whether you need a sorted map or not.
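
For a taste of what a sorted, navigable map gives you, here's a small sketch (the keys are arbitrary):

ConcurrentNavigableMap<Integer, String> m =
      new ConcurrentSkipListMap<Integer, String>();
m.put(30, "thirty");
m.put(10, "ten");
m.put(20, "twenty");
m.firstKey();     // 10 - entries are kept in key order
m.ceilingKey(15); // 20 - the smallest key >= 15
m.headMap(30);    // a view containing {10=ten, 20=twenty}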

The ConcurrentSkipListMap should not in general be preferred over ConcurrentHashMap, but it is ideal when you need a sorted map. The skip list, the data structure underlying this implementation, uses probabilistic balancing to avoid the rebalancing operations that tree-based structures require. Real-life performance of skip lists is exceptional, and having a scalable concurrent sorted map is a boon for those designing high-volume applications in Java.

If you have a bit of time, I suggest you read up on how skip lists work – it is a fascinating computer science subject, and they are a relatively new invention, dating back only to 1990.

Java Concurrency Part 5 – Blocking Queues

As discussed in Part 3, the thread pools introduced in Java 1.5 provided core support that quickly became a favourite of many Java developers.

Internally, the implementations make smart use of another concurrency feature introduced in Java 1.5 – blocking queues.

Queue
First, a brief review of what a standard queue is. In computer science, a queue is simply a collection that always adds elements to the end and always takes elements from the beginning. The expression First-In-First-Out (FIFO) is commonly used to describe a standard queue. Java 1.6 introduced Deque, a double-ended queue interface, which LinkedList now implements. Some queues in Java allow for alternate ordering, such as using a Comparator or even writing your own ordering implementation. While that extended functionality is nice, what we're focusing on today is how BlockingQueues really shine in concurrent development.
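
As a quick refresher in code (plain fragments using JDK types):

Queue<String> q = new LinkedList<String>();
q.offer("first");
q.offer("second");
q.poll(); // returns "first" - first in, first out

Deque<String> d = new ArrayDeque<String>(); // double-ended, new in 1.6
d.addFirst("front");
d.addLast("back");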

Blocking Queue
Blocking queues are queues that also expose functionality for blocking on requests to retrieve an element when no element is available with the additional option to limit the amount of time spent waiting. On a constrained size queue, this same blocking functionality is available when trying to add. Let’s dive right in and consider an example of BlockingQueue usage.

Let’s assume a simple scenario. You have a processing thread whose function is simply to execute commands.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();

public void addCommand(Command command) {
    workQueue.offer(command);
}

public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        command.execute();
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}

Granted, that is a very simple example, but it shows you the basics of using a BlockingQueue to hand work between threads. Let's try something a little more involved. In this example, we need a connection pool with a limit. It should only create connections as needed. No client will wait longer than 5 seconds for an available connection.

private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>(10);
private AtomicInteger connCount = new AtomicInteger();

public Connection getConnection() throws InterruptedException {
    Connection conn = pool.poll(5, TimeUnit.SECONDS);
    if (conn == null) {
        synchronized (connCount) {
            // under the limit? create a new connection rather than waiting again
            if (connCount.get() < 10) {
                conn = getNewConnection();
                connCount.incrementAndGet();
            }
        }
        if (conn == null) {
            throw new ConnUnavailException();
        }
    }
    return conn;
}

// clients hand connections back when done, e.g. via pool.offer(conn)

Finally let’s consider a sample usage of an interesting implementation, the SynchronousQueue.

In this example, similar to our first, we want to execute a Command but need to know when it is done, waiting at most 2 minutes.

private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();
private Map<Command, SynchronousQueue<Result>> commandQueueMap =
        new ConcurrentHashMap<Command, SynchronousQueue<Result>>();

public SynchronousQueue<Result> addCommand(Command command) {
    SynchronousQueue<Result> queue = new SynchronousQueue<Result>();
    commandQueueMap.put(command, queue);
    workQueue.offer(command);
    return queue;
}

public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        Result result = command.execute();
        SynchronousQueue<Result> queue = commandQueueMap.get(command);
        queue.offer(result); // succeeds only if the consumer is already waiting in poll()
        return null;
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}

Now the consumer can safely poll with timeout on its request to have its Command executed.

Command command;
SynchronousQueue<Result> queue = commandRunner.addCommand(command);
Result result = queue.poll(2, TimeUnit.MINUTES);
if (result == null) {
	throw new CommandTooLongException(command);
} else {
	return result;
}

As you're starting to see, the BlockingQueues in Java provide a lot of flexibility and give you relatively easy structures to serve many, if not all, of your needs in a multi-threaded application. There are some really neat BlockingQueues that we haven't even reviewed, such as PriorityBlockingQueue and DelayQueue. Take a look at them and get in touch. We love talking shop with fellow developers.

Java Concurrency Part 4 – Callable, Future

One of the beautiful things about Java from its very first release was the ease with which we could write multi-threaded programs and introduce asynchronous processing into our designs. The Thread class and Runnable interface, combined with Java's memory management model, made for straightforward thread programming. But as discussed in Part 3, neither the Thread class nor the Runnable interface allowed for thrown exceptions or returned values. The lack of returned values was mildly annoying.

The lack of thrown checked exceptions was a little more serious. The contract is public void run(), which means you have to catch checked exceptions inside run() and do something with them. Even if you were careful and stored these for later verification, you couldn't force all uses of the class to check for the exception. You could go through all your getters and throw the exception from each one if it existed. Besides being cumbersome, even that wasn't foolproof; you couldn't enforce calls to any of those getters. Thread programmers would correctly call join() to wait for the thread to complete and then go on their merry way.
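
A sketch of that workaround, with class and field names we've invented purely for illustration:

public class CheckedExceptionRunnable implements Runnable {
    private volatile Exception caught; // stashed for later inspection

    public void run() {
        try {
            // ... work that throws a checked exception ...
            throw new java.io.IOException("simulated failure");
        } catch (Exception e) {
            caught = e; // callers must remember to check this - nothing enforces it
        }
    }

    public Exception getCaught() {
        return caught;
    }
}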

Not to worry though, after many years, this was finally addressed in the 1.5 release. With the introduction of the Callable and Future interfaces and their support in the thread pools discussed in our last post, both of these issues have been addressed quite elegantly.

Callable
The Callable interface declares public T call() throws Exception. Now we can return a result, have it strongly typed as declared in our implementation, and even throw Exceptions. While there are some utility methods in the Executors class to convert your Runnable instances, as discussed in Part 3, you would do well to review your current implementations of Runnable or subclasses of Thread. Why bother? Primarily to double check and remove the workaround you may have implemented to address the lack of support for thrown Exceptions. At the same time, you may wish to make use of the ability to return results right from the execution method, eliminating any need to cast to retrieve values.
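
A minimal sketch of both points (the work inside the tasks is made up):

// a typed result plus the freedom to throw checked exceptions
Callable<Integer> task = new Callable<Integer>() {
    public Integer call() throws Exception {
        return 42; // imagine a checked-exception-throwing computation here
    }
};

// Executors.callable() adapts a legacy Runnable; its call() returns null
Callable<Object> adapted = Executors.callable(new Runnable() {
    public void run() { System.out.println("legacy task"); }
});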

Future
Here's where the combined power of the thread pools and Callable come together. Future is another new interface introduced in 1.5. When you submit a Callable to one of the thread pools, you are provided an instance of Future that is typed to the Callable you passed in. This object substitutes for an actual Thread instance that you would have used prior to 1.5. Whereas you previously had to call Thread.join() or Thread.join(long millis), now you may simply use the Future, as in this example.

public class ServerAcceptingRequestsVerifier implements Callable<Boolean> {
	/**
	 * @return Boolean.TRUE if server is accepting requests,
	 * Boolean.FALSE otherwise
	 */
	public Boolean call() throws Exception {
		Boolean isAcceptingRequests = null;
		// ... ask server about taking requests here
		return isAcceptingRequests;
	}
}
public Boolean isServerTakingRequests(String server) 
			throws UnresponsiveException, InterruptedException {
	ServerAcceptingRequestsVerifier acceptingRequestsVerifier = 
		new ServerAcceptingRequestsVerifier();
	Future<Boolean> future = 
		THREAD_POOL.submit(acceptingRequestsVerifier);
	try {
		Boolean isAcceptingRequests = future.get();
		//waits for the thread to complete, even if it hasn't started
		return isAcceptingRequests;
	} catch (ExecutionException e) {
		throw new UnresponsiveException(e.getCause());
	}

}

It's also nice that we now have an explicit TimeoutException if we decide to limit how long we're willing to wait for completion.

try {
	Boolean isAcceptingRequests = future.get(5, TimeUnit.SECONDS);
	//this waits for 5 seconds, throwing TimeoutException if not done
	return isAcceptingRequests;
} catch (TimeoutException e) {
	LOGGER.warn("Timed out waiting for server check thread. " +
		"We'll try to interrupt it.");
	future.cancel(true);
	return Boolean.FALSE;
} catch (ExecutionException e) {
	throw new UnresponsiveException(e.getCause());
}

In our next post, we'll get into some of the new interfaces and classes that are used to make the thread pools work and that are available for our own use too.

Java Concurrency Part 3 – Thread Pools

One of the most generally useful concurrency enhancements delivered in Java 1.5 was the introduction of customizable thread pools. These thread pools give you quite a bit of control over things such as number of threads, reuse of threads, scheduling and thread construction. Let’s review these.

First, thread pools. Let’s dive right into java.util.concurrent.ExecutorService, which provides us the basic interface for a thread pool. All thread pools allow submitting Callable or Runnable instances for future execution. They also provide various pool management methods.

Pool Management

Various management methods exist for the pools. You can shutdown() the pool, which will reject any future submissions but complete processing of in-progress executions, and even of tasks that were submitted before the shutdown was initiated but had not yet started. You can also more aggressively perform a shutdownNow(). This will also prevent any future submissions, but it has a few different, notable behaviours. It will not start execution of submitted but unstarted tasks; those are returned in a list. It will also attempt to stop, or more precisely Thread.interrupt(), currently executing tasks. This is a best effort, with no guarantee that these tasks will be successfully interrupted.
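
A quick sketch of the two shutdown styles (InterruptedException handling elided):

ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(new Runnable() {
    public void run() { /* some work */ }
});
pool.shutdown(); // no new submissions; queued work still completes
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
    // grace period expired: interrupt running tasks, collect unstarted ones
    List<Runnable> neverStarted = pool.shutdownNow();
}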

ThreadFactory

In a moment we will get into the java.util.concurrent.Executors builder class which can create various thread pool configurations, but first let's focus for a second on using ThreadFactory. You'll want to take advantage of ThreadFactory support in Executors and be in the habit of providing your own. The default ThreadFactory will give you an incrementing numbered pool naming scheme, which is not all that helpful in logs or other monitoring. For the first pool created you'll get threads named pool-1-thread-1, pool-1-thread-2, while the second pool starts with pool-2-thread-1, etc. By providing your own ThreadFactory, you can have threads named like ReportProcessingThread1 and HttpThread1. Here's a simple example:

public class NamedThreadFactory implements ThreadFactory {
	private AtomicLong counter = new AtomicLong();
	private String name;

	public NamedThreadFactory(String name) {
		this.name = name;
	}
	public Thread newThread(Runnable r) {
		Thread t = new Thread(r);
		t.setName(name + counter.incrementAndGet());
		return t;
	}
}
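
With a factory like the one above (the class name is ours, chosen for the sketch), wiring it in is one line:

ExecutorService reportPool =
	Executors.newFixedThreadPool(4, new NamedThreadFactory("ReportProcessingThread"));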

ThreadFactory is only called when a new Thread is created. Given that the JDK thread pools reuse threads whenever possible, it cannot be used to hook into the start of each submitted task.

Executors Builder Methods

Now back to the Executors utility builder methods. They are:

  • newCachedThreadPool() will give you a thread pool that will reuse threads when possible, creating new ones as needed with no configured limit.
  • newFixedThreadPool(int nThreads) will give you a thread pool that will use only up to the number of threads specified, but will accept as many tasks as are submitted for execution, running them in submission order.
  • newScheduledThreadPool(int corePoolSize) is used specifically for scheduling threads with delayed execution, on a recurring schedule, or with a recurring delay; a brief usage sketch follows this list. The returned thread pool implements ScheduledExecutorService, which exposes the additional scheduling methods schedule(Runnable command, long delay, TimeUnit unit), scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) and scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit).
  • newSingleThreadExecutor() and newSingleThreadScheduledExecutor(). These impose no limit on the number of tasks that can be submitted, only ensuring that a single thread/task is executing at a time.
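
Here's that usage sketch (pollTask is a made-up Runnable, just for illustration):

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
Runnable pollTask = new Runnable() {
    public void run() { System.out.println("polling..."); }
};
// first run after 1 minute, then every 10 minutes measured start-to-start
scheduler.scheduleAtFixedRate(pollTask, 1, 10, TimeUnit.MINUTES);
// first run after 1 minute, then 10 minutes after each run completes
scheduler.scheduleWithFixedDelay(pollTask, 1, 10, TimeUnit.MINUTES);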

Finally, there are a few helper methods for creating Callable instances from Runnable. This gets us into the newly created constructs for allowing threads to throw Exceptions and return values, something we had to work around quite painfully before. We’ll consider these and how they are used with these thread pools in our next post.

Java Concurrency Part 2 – Reentrant Locks

Java’s synchronized keyword is a wonderful tool – it allows us a simple and reliable way to synchronize access to critical sections and it’s not too hard to understand.

But sometimes we need more control over synchronization. Either we need to control the types of access (read and write) separately, or synchronized is cumbersome to use because there is no obvious mutex, or we need to maintain multiple mutexes.

Thankfully, lock utility classes were added in Java 1.5 and make these problems easier to solve.

Java Reentrant Locks

Java has a few lock implementations in the java.util.concurrent.locks package.

The general classes of locks are nicely laid out as interfaces:

  • Lock - the simplest case of a lock which can be acquired and released
  • ReadWriteLock - a lock implementation that has both read and write lock types – multiple read locks can be held at a time unless the exclusive write lock is held

Java provides two implementations of these locks that we care about – both of which are reentrant (this just means a thread can reacquire the same lock multiple times without any issue).

  • ReentrantLock - as you’d expect, a reentrant Lock implementation
  • ReentrantReadWriteLock - a reentrant ReadWriteLock implementation

Now, let’s see some examples.

A Read/Write Lock Example

So how does one use a lock? It’s pretty simple: just acquire and release (and never forget to release – finally is your friend!).

Imagine we have a very simple case where we need to synchronize access to a pair of variables. One is a simple value and another is derived based on some lengthy calculation. First, this is how we would perform that with the synchronized keyword.

public class Calculator {
    private int calculatedValue;
    private int value;

    public synchronized void calculate(int value) {
        this.value = value;
        this.calculatedValue = doMySlowCalculation(value);
    }

    public synchronized int getCalculatedValue() {
        return calculatedValue;
    }

    public synchronized int getValue() {
        return value;
    }
}

Simple, but if we have a lot of contention, or if we perform many reads and few writes, synchronization can hurt performance. Since reads typically occur far more often than writes, using a ReadWriteLock helps us minimize the issue:

public class Calculator {
    private int calculatedValue;
    private int value;
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    public void calculate(int value) {
        lock.writeLock().lock();
        try {
            this.value = value;
            this.calculatedValue = doMySlowCalculation(value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int getCalculatedValue() {
        lock.readLock().lock();
        try {
            return calculatedValue;
        } finally {
            lock.readLock().unlock();
        }
    }

    public int getValue() {
        lock.readLock().lock();
        try {
            return value;
        } finally {
            lock.readLock().unlock();
        }
    }
}

This example actually shows one big advantage of using synchronized: it is concise and more foolproof than using explicit locks. But locks give us flexibility we wouldn't otherwise have.

In the example above, we can have hundreds of threads reading the same value at once with no issue, and we only block readers when we acquire the write lock. Remember that: many readers can acquire the read lock at the same time, but there are no readers OR writers allowed when acquiring the write lock.

A More Typical Use

Our first example may leave you confused or not totally convinced that explicit locks are useful. Aren’t there other uses for them that aren’t so contrived? Certainly!

We at Carfey have used explicit locks to solve many problems. One example is when you have various tasks which can run concurrently, but you don’t want more than one of the same type running at the same time. One clean way to implement it is with locks. It could be done with synchronized, but locks give us the ability to fail after timing out.

As a bonus, you’ll note we used a mix of synchronized and explicit locks – sometimes one is just cleaner and simpler than the other.

public class TaskRunner {
    private Map<Class<? extends Runnable>,  Lock> mLocks =
            new HashMap<Class<? extends Runnable>,  Lock>();

    public void runTaskUniquely(Runnable r, int secondsToWait)
            throws InterruptedException {
        Lock lock = getLock(r.getClass());
        boolean acquired = lock.tryLock(secondsToWait, TimeUnit.SECONDS);
        if (acquired) {
            try {
                r.run();
            } finally {
                lock.unlock();
            }
        } else {
            // failure code here
        }
    }

    private synchronized Lock getLock(Class<? extends Runnable> clazz) {
        Lock l = mLocks.get(clazz);
        if (l == null) {
            l = new ReentrantLock();
            mLocks.put(clazz, l);
        }
        return l;
    }
}

These two examples should give you a pretty good idea of how to use both plain Locks and ReadWriteLocks. As with synchronized, don't worry about reacquiring the same lock – the locks provided in the JDK are reentrant, so there will be no issue.

Whenever you’re dealing with concurrency, there are dangers. Always remember the following:

  • Release all locks in a finally block. This is rule 1 for a reason.
  • Beware of thread starvation! The fairness setting in ReentrantLock may be useful if you have many readers and occasional writers that you don't want waiting forever. It's possible a writer could wait a very long time (maybe forever) if there are constantly read locks held by other threads.
  • Use synchronized where possible. You will avoid bugs and keep your code cleaner.
  • Use tryLock() if you don't want a thread waiting indefinitely to acquire a lock – this is similar to the wait lock timeouts that databases have. (A quick sketch of the fairness flag and tryLock() follows this list.)
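
Here's that sketch (plain fragments, not a full class):

// fair = true hands the lock to the longest-waiting thread, trading
// throughput for protection against starvation
ReadWriteLock fairLock = new ReentrantReadWriteLock(true);

Lock lock = new ReentrantLock();
if (lock.tryLock()) { // returns immediately instead of blocking
    try {
        // guarded work here
    } finally {
        lock.unlock();
    }
} else {
    // couldn't get the lock - fail or retry rather than hang
}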

That’s about it! If you have questions or comments, feel free to leave them below.

Java Concurrency Part 1 – Semaphores

This is the first part in a series that we're going to be doing on Java concurrency. Specifically, we are going to dive into the concurrency tools built into Java 1.5 and beyond. We're going to assume you have a basic understanding of the synchronized and volatile keywords.

The first post will cover semaphores – specifically, counting semaphores. Semaphores are an often misunderstood and underused tool for restricting access to resources, frequently passed over in favour of other ways of controlling access. But semaphores give us a toolset that goes beyond what normal synchronization and other tools can offer.

So what is a semaphore? The simplest way to think of a semaphore is as an abstraction that allows n units to be acquired, offering acquire and release mechanisms. It safely ensures that only n processes can access a certain resource at a given time.

That’s all well and good, but what purpose would this serve? Well, here is one example that will help explain its uses. It uses the nicely designed Semaphore class introduced in 1.5, located in the java.util.concurrent package.

Limiting connections

Perhaps we have a process that downloads resources for us periodically via HTTP. We don’t want to spam any of the hosts and at the same time, we want to limit how many connections we are making so we don’t exhaust the limited file handles or outbound connections we are permitted. A simple way to do this would be with a semaphore:

public class ConnectionLimiter {
   private final Semaphore semaphore;

   private ConnectionLimiter(int maxConcurrentRequests) {
       semaphore = new Semaphore(maxConcurrentRequests);
   }

   public URLConnection acquire(URL url) throws InterruptedException,
                                                IOException {
       semaphore.acquire();
       try {
           return url.openConnection();
       } catch (IOException e) {
           semaphore.release(); // don't leak the permit if opening the connection fails
           throw e;
       }
   }

   public void release(URLConnection conn) {
       try {
           /*
           * ... clean up here
           */
       } finally {
           semaphore.release();
       }
   }
}

This is a nice elegant solution to a problem of limited resources. The call to acquire() will block until permits are available. The beauty of the semaphore is that it hides all the complexity of managing access control, counting permits and, of course, getting the thread-safety right.

Dangers

As with most methods of locking or synchronization, there are some potential issues.

The number one thing to remember is, always release what you acquire. This is done by using try..finally constructs.
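
In code, that pattern looks like this (a bare fragment):

semaphore.acquire();
try {
    // use the guarded resource
} finally {
    semaphore.release(); // runs even if an exception was thrown
}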

There are other less obvious problems that can befall you when using semaphores. The following class shows a deadlock that only the luckiest of you will avoid. You’ll notice that the two threads which acquire the two semaphore permits do so in opposite order. (try..finally is left out for the sake of brevity).

public static void main(String[] args) throws Exception {
   Semaphore s1 = new Semaphore(1);
   Semaphore s2 = new Semaphore(1);

   Thread t = new Thread(new DoubleResourceGrabber(s1, s2));
   // now reverse them ... here comes trouble!
   Thread t2 = new Thread(new DoubleResourceGrabber(s2, s1));

   t.start();
   t2.start();

   t.join();
   t2.join();
   System.out.println("We got lucky!");
}

private static class DoubleResourceGrabber implements Runnable {
   private Semaphore first;
   private Semaphore second;

   public DoubleResourceGrabber(Semaphore s1, Semaphore s2) {
       first = s1;
       second = s2;
   }

   public void run() {
       try {
           Thread t = Thread.currentThread();

           first.acquire();
           System.out.println(t + " acquired " + first);

           Thread.sleep(200); // demonstrate deadlock

           second.acquire();
           System.out.println(t + " acquired " + second);

           second.release();
           System.out.println(t + " released " + second);

           first.release();
           System.out.println(t + " released " + first);
       } catch (InterruptedException ex) {
           ex.printStackTrace();
       }
   }
}

If you run this, you will more than likely have a hung process. Issues of lock ordering apply to semaphores as much as regular mutexes or synchronization in Java. In some cases, timeouts (see note on tryAcquire() later in the article) can be used to prevent deadlocks from causing a process to hang up, but typically a deadlock is a symptom of a logic error which can be avoided. If you’re unfamiliar with deadlocks, I recommend you read up on them. Wikipedia has a decent article on deadlocks which applies to all languages equally.

The main things that you should be careful of when using semaphores (including binary semaphores, i.e. mutexes) are:

  • Not releasing after acquire (either missing release call or an exception is thrown and there is no finally block)
  • Long held semaphores, causing thread starvation
  • Deadlocks (as seen above)

Useful Tricks with Semaphores

One interesting property of Semaphores in Java is that release doesn’t have to be called by the same thread as acquire. This means you could have a thread limiter that pools or creates threads based on a semaphore by calling acquire(). Then, the running thread could release its own semaphore permit when it completes. This is a useful property that we don’t have with normal mutexes in Java.
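
Here's a minimal sketch of that property (InterruptedException handling elided):

final Semaphore threadLimit = new Semaphore(10);
threadLimit.acquire(); // the submitting thread takes the permit
new Thread(new Runnable() {
    public void run() {
        try {
            // ... do the limited work ...
        } finally {
            threadLimit.release(); // released by the worker thread - perfectly legal
        }
    }
}).start();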

Another trick is to increase the number of permits at runtime. Contrary to what you might guess, the number of permits in a semaphore isn’t fixed, and a call to release() will always increment the number of permits, even if no corresponding acquire() call was made. Note that this can also result in bugs if you are incorrectly calling release() when no acquire() was made.
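
For example:

Semaphore permits = new Semaphore(0); // start with no permits at all
permits.release(5); // five permits now exist, though nothing was ever acquired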

Finally, there are a few useful methods to be familiar with in Java's Semaphore. The method acquireUninterruptibly() will wait for a permit and keep waiting even if the thread is interrupted. This means no outside handling of InterruptedException. The method tryAcquire() allows us to limit how long we will wait for a permit – we can either return immediately if there is no permit to obtain, or wait a specified timeout. If you somehow have known deadlocks that you can't fix easily or track down, you could help prevent locking up processes by using tryAcquire() with suitable timeouts.
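
A sketch of the timed variant (InterruptedException handling elided):

if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
    try {
        // ... use the resource ...
    } finally {
        semaphore.release();
    }
} else {
    // timed out - recover instead of hanging forever
}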

Uses

What are some possible uses for counting semaphores? The following come to mind:

  • Limiting concurrent access to disk (this can kill performance due to competing disk seeks)
  • Thread creation limiting
  • JDBC connection pooling / limiting
  • Network connection throttling
  • Throttling CPU or memory intensive tasks

Of course, a semaphore is a pretty low-level building block for access control and synchronization. Java provides us a wealth of concurrency mechanisms and strategies which were introduced in Java 1.5 and beyond. In the coming posts, we will be covering some of the more abstract methods of managing concurrency including Executors, BlockingQueues, Barriers, Futures and also some of the new concurrent Collection classes.

What uses have you found for semaphores? Let us know by leaving a comment – we love talking software.