Java Concurrency Part 5 – Blocking Queues

As discussed in Part 3, the thread pools introduced in Java 1.5 provided core support that was quickly a favourite of many java developers.

Internally, the implementations make smart use of another concurrency feature introduced in java 1.5 – Blocking Queues.

Queue
First, a brief review of what a standard queue is. In computer science, a queue is simply a collection that always adds elements to the end and always takes elements from the beginning. The expression First-In-First-Out (FIFO) is commonly used to describe a standard queue. Introduced in java 1.6 is Deque or double-ended queue and this interface is now implemented on LinkedList. Some queues in java allow for alternate ordering, such as using a Comparator or even writing your own ordering implementation. While that extended functionality is nice, what we’re focusing on today is how BlockingQueues really shine in concurrent development.

Blocking Queue
Blocking queues are queues that also expose functionality for blocking on requests to retrieve an element when no element is available with the additional option to limit the amount of time spent waiting. On a constrained size queue, this same blocking functionality is available when trying to add. Let’s dive right in and consider an example of BlockingQueue usage.

Let’s assume a simple scenario. You have a processing thread whose function is simply to execute commands.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();

public void addCommand(Command command) {
    workQueue.offer(command);
}

public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        command.execute();
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}

Granted, that is a very simple example, but it shows you the basics of using a BlockingQueue for multiple threads. Let’s try something a little more involved. In this example, we have a need to create a connection pool with limit. It should only create connections as needed. No client will wait longer than 5 seconds for an available connection.

private BlockingQueue<Connection> pool = new ArrayBlockingQueue<Connection>(10);
private AtomicInteger connCount = new AtomicInteger();

public Connection getConnection() {
    Connection conn = pool.poll(5, TimeUnit.SECONDS);
    if (conn == null) {
        synchronized (connCount) {
            if (connCount.get() < 10) {
                conn = getNewConnection();
                pool.offer(conn);
                connCount.incrementAndGet();
            }
        }
        if (conn == null) {
            throw new ConnUnavailException();
        } else {
            return conn;
        }
    }
}

Finally let’s consider a sample usage of an interesting implementation, the SynchronousQueue.

In this example, similar to our first, we want to execute a Command but need to know when it is done, waiting at most 2 minutes.

private BlockingQueue workQueue = new LinkedBlockingQueue();
private Map> commandQueueMap = new ConcurrentHashMap>(); 
	
public SynchronousQueue addCommand(Command command) {
    SynchronousQueue queue = new SynchronousQueue();
    commandQueueMap.put(command, queue);
    workQueue.offer(command);
    return queue;
}

public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        Result result = command.execute();
        SynchronousQueue queue = commandQueueMap.get(command);
        queue.offer(result);
        return null;
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}

Now the consumer can safely poll with timeout on its request to have its Command executed.

Command command;
SynchronousQueue queue = commandRunner.addCommand(command);
Result result = queue.poll(2, TimeUnit.MINUTES);
if (result == null) {
	throw new CommandTooLongException(command);
} else {
	return result;
}

As you’re starting to see, the BlockingQueues in java provide a lot of flexibility and give you relatively easy structures to serve many, if not all, of your needs in a multi-threaded application. There are some really neat BlockingQueues that we haven’t even reviewed such as PriorityBlockingQueue and DelayQueue. Take a look at them and get in touch. We love talking shop with fellow developers.

Java Concurrency Part 4 – Callable, Future

One of the beautiful things about Java from its very first release was the ease with which we could write multi-threaded programs and introduce asynchronous processing into our designs. The Thread class and Runnable interface combined with Java’s memory management model meant for straightforward thread programming. But as discussed in Part 3, neither the Thread class nor the Runnable interface allowed for thrown Exceptions or returned values. The lack of returned values was mildly annoying.

The lack of thrown checked exceptions was a little more serious. The contract was public void run() which meant you had to catch checked exceptions and do something with them. Even if you were careful and you stored these for later verification, you couldn’t force all uses of the class to check the exception. You could go through all your getters and throw the Exception if it existed on each one. Besides being cumbersome, even that wasn’t foolproof. You couldn’t enforce calls to any of these. Thread programmers would correctly call join() to wait for it complete and may then have gone on their merry way.

Not to worry though, after many years, this was finally addressed in the 1.5 release. With the introduction of the Callable and Future interfaces and their support in the thread pools discussed in our last post, both of these issues have been addressed quite elegantly.

Callable
The Callable interface declares public T call() throws Exception. Now we can return a result, have it strongly typed as declared in our implementation and even throw Exceptions. While there are some utility methods in the Executors class to convert your Runnable instances as discussed in Part 3, you would do well to review your current implementations of Runnable or subclasses of Thread. Why bother? Primarily to double check and remove the workaround you may have implemented to address the lack of support for thrown Exceptions. At the same time, you may wish to make use of the ability to return results right in the execution method eliminating any need to cast to retrieve values.

Future
Here’s where the combined power of the thread pools and Callable come together. Future is another new interface introduced in 1.5. When you submit a Callable to one of the thread pools, you are provided an instance of Future that is typed to the Callable you passed in. This object substitutes for an actual Thread instance that you would have used prior to 1.5. Whereas you previously had to do Thread.join() or Thread.join(long millis), now you may use them as in this example.

public class ServerAcceptingRequestsVerifier implements Callable {
	/**
	 * @return Boolean.TRUE is server is accepting requests
	 * Boolean.FALSE otherwise
	 */
	public Boolean call() throws Exception {
		Boolean isAcceptingRequests = null;
		... ask server about taking requests here
		return isAcceptingRequests;
	}
}
public Boolean isServerTakingRequests(String server) 
			throws UnresponsiveException, InterruptedException {
	ServerAcceptingRequestsVerifier acceptingRequestsVerifier = 
		new ServerAcceptingRequestsVerifier();
	Future future = 
		THREAD_POOL.submit(acceptingRequestsVerifier);
	try {
		Boolean isAcceptingRequests = future.get();
		//waits for the thread to complete, even if it hasn't started
		return isAcceptingRequests;
	} catch (ExecutionException e) {
		throw new UnresponsiveException(e.getCause());
	}

}

It’s also nice that we now have explicit TimeoutException if we decide to limit how long we’re willing to wait for completion.

try {
	Boolean isAcceptingRequests = future.get(5, TimeUnit.SECONDS);
	//this waits for 5 seconds, throwing TimeoutException if not done
	return isAcceptingRequests;
} catch (TimeoutException e) {
	LOGGER.warn("Timed out waiting for server check thread." +
		"We'll try to interrupt it.");
	future.cancel(true);
	return Boolean.FALSE;
} catch (ExecutionException e) {
	throw new UnresponsiveException(e.getCause());
}

In our next post, we’ll get into some of the new interfaces/classes that are used to make the thread pools work that are available for our use too.

Java Concurrency Part 3 – Thread Pools

One of the most generally useful concurrency enhancements delivered in Java 1.5 was the introduction of customizable thread pools. These thread pools give you quite a bit of control over things such as number of threads, reuse of threads, scheduling and thread construction. Let’s review these.

First, thread pools. Let’s dive right into java.util.concurrent.ExecutorService, which provides us the basic interface for a thread pool. All thread pools allow submitting Callable or Runnable instances for future execution. They also provide various pool management methods.

Pool Management

Various management methods exist for the pools. You can shutdown() the pool, which will reject any future submissions but complete processing of in-process executions and even those that had not yet started but were submitted before the shutdown was initiated. You can also more aggressively perform a shutdownNow(). This will also prevent any future submissions, but it has a few different, notable behaviours. It will not start execution of submitted but unstarted tasks. They will be in the returned list. It will also attempt to stop, or more precisely, Thread.interrupt() currently executing tasks. This is a best effort with no guarantee that these tasks will be successfully interrupted.

ThreadFactory

In a moment we will get into the java.util.concurrent.Executors builder class which can create various thread pool configurations, but first let’s focus for a second on using ThreadFactory. You’ll want to take advantage of ThreadFactory support in Executors and be in the habit of providing your own. The default ThreadFactory will give you give you an incrementing numbered pool naming scheme which is not all that helpful in logs or other monitoring. For the first pool created you’ll get threads named pool-1-thread-1, pool-1-thread-2 and the second one starts with pool-2-thread-1, etc. By providing your own ThreadFactory, you can have threads named like ReportProcessingThread1 and HttpThread1. Here’s a simple example:

private AtomicLong counter = new AtomicLong();
private String name;
public Thread newThread(Runnable r) {
	Thread t = new Thread(r);
	t.setName(name + counter.incrementAndGet());
	return t;
}

ThreadFactory will only be called when a new Thread is created. Given that the JDK thread pools will reuse threads whenever possible, this class cannot be used to manage the beginning of execution.

Executors Builder Methods

Now back to the Executors utility builder methods. They are:

  • newCachedThreadPool() will give you a thread pool that will reuse threads when possible, creating new ones as needed with no configured limit.
  • newFixedThreadPool(int nThreads) will give you a thread pool that will use only up to the number of threads specified but will accept as many tasks as submitted for execution running them in submission order.
  • newScheduledThreadPool(int corePoolSize) is used specifically for scheduling threads with delayed execution, on a recurring schedule on with recurring delay. The returned thread pool implements ScheduledExecutorService which exposes the additional scheduling methods schedule(Runnable command, long delay, TimeUnit unit), scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) and scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit).
  • newSingleThreadExecutor() and newSingleThreadScheduledExecutor(). These impose no limit on the number of tasks that can be submitted, only ensuring that a single thread/task is executing at a time.

Finally, there are a few helper methods for creating Callable instances from Runnable. This gets us into the newly created constructs for allowing threads to throw Exceptions and return values, something we had to work around quite painfully before. We’ll consider these and how they are used with these thread pools in our next post.

Java Concurrency Part 1 – Semaphores

This is the first part in a series that we’re going to be doing on Java concurrency. Specifically, we are going to dive into the concurrency tools built into Java 1.5 and beyond. We’re going to assume you have a basic understanding of synchronization and volatile keywords.

The first post will cover semaphores - specifically counting semaphores. Semaphores are an often misunderstood and underused tool for restricting access to resources. They are ignored for other ways of controlling access to resources. But semaphores give us a toolset that goes beyond what normal synchronization and other tools can give us.

So what is semaphore? The simplest way to think of a semaphore is to consider it an abstraction that allows n units to be acquired, and offers acquire and release mechanisms. It safely allows us to ensure that only n processes can access a certain resource at a given time.

That’s all well and good, but what purpose would this serve? Well, here is one example that will help explain its uses. It uses the nicely designed Semaphore class introduced in 1.5, located in the java.util.concurrent package.

Limiting connections

Perhaps we have a process that downloads resources for us periodically via HTTP. We don’t want to spam any of the hosts and at the same time, we want to limit how many connections we are making so we don’t exhaust the limited file handles or outbound connections we are permitted. A simple way to do this would be with a semaphore:

public class ConnectionLimiter {
   private final Semaphore semaphore;

   private ConnectionLimiter(int maxConcurrentRequests) {
       semaphore = new Semaphore(maxConcurrentRequests);
   }

   public URLConnection acquire(URL url) throws InterruptedException,
                                                IOException {
       semaphore.acquire();
       return url.openConnection();
   }

   public void release(URLConnection conn) {
       try {
           /*
           * ... clean up here
           */
       } finally {
           semaphore.release();
       }
   }
}

This is a nice elegant solution to a problem of limited resources. The call to acquire() will block until permits are available. The beauty of the semaphore is that it hides all the complexity of managing access control, counting permits and, of course, getting the thread-safety right.

Dangers

As with most methods of locking or synchronization, there are some potential issues.

The number one thing to remember is, always release what you acquire. This is done by using try..finally constructs.

There are other less obvious problems that can befall you when using semaphores. The following class shows a deadlock that only the luckiest of you will avoid. You’ll notice that the two threads which acquire the two semaphore permits do so in opposite order. (try..finally is left out for the sake of brevity).

public static void main(String[] args) throws Exception {
   Semaphore s1 = new Semaphore(1);
   Semaphore s2 = new Semaphore(1);

   Thread t = new Thread(new DoubleResourceGrabber(s1, s2));
   // now reverse them ... here comes trouble!
   Thread t2 = new Thread(new DoubleResourceGrabber(s2, s1));

   t.start();
   t2.start();

   t.join();
   t2.join();
   System.out.println("We got lucky!");
}

private static class DoubleResourceGrabber implements Runnable {
   private Semaphore first;
   private Semaphore second;

   public DoubleResourceGrabber(Semaphore s1, Semaphore s2) {
       first = s1;
       second = s2;
   }

   public void run() {
       try {
           Thread t = Thread.currentThread();

           first.acquire();
           System.out.println(t + " acquired " + first);

           Thread.sleep(200); // demonstrate deadlock

           second.acquire();
           System.out.println(t + " acquired " + second);

           second.release();
           System.out.println(t + " released " + second);

           first.release();
           System.out.println(t + " released " + first);
       } catch (InterruptedException ex) {
           ex.printStackTrace();
       }
   }
}

If you run this, you will more than likely have a hung process. Issues of lock ordering apply to semaphores as much as regular mutexes or synchronization in Java. In some cases, timeouts (see note on tryAcquire() later in the article) can be used to prevent deadlocks from causing a process to hang up, but typically a deadlock is a symptom of a logic error which can be avoided. If you’re unfamiliar with deadlocks, I recommend you read up on them. Wikipedia has a decent article on deadlocks which applies to all languages equally.

The main things that you should be careful of when using semaphores (including binary semaphores, i.e. mutexes) are:

  • Not releasing after acquire (either missing release call or an exception is thrown and there is no finally block)
  • Long held semaphores, causing thread starvation
  • Deadlocks (as seen above)

Useful Tricks with Semaphores

One interesting property of Semaphores in Java is that release doesn’t have to be called by the same thread as acquire. This means you could have a thread limiter that pools or creates threads based on a semaphore by calling acquire(). Then, the running thread could release its own semaphore permit when it completes. This is a useful property that we don’t have with normal mutexes in Java.

Another trick is to increase the number of permits at runtime. Contrary to what you might guess, the number of permits in a semaphore isn’t fixed, and a call to release() will always increment the number of permits, even if no corresponding acquire() call was made. Note that this can also result in bugs if you are incorrectly calling release() when no acquire() was made.

Finally, there are a few useful methods to be familiar with in Java’s Semaphore. The method acquireInterruptibly() will acquire a resource, reattempting if it is interrupted. This means no outside handling of InterruptedException. The method tryAcquire() allows us to limit how long we will wait for a permit – we can either return immediately if there is no permit to obtain, or wait a specified timeout. If you somehow have known deadlocks that you can’t fix easily or track down, you could help prevent locking up processes by using tryAcquire() with suitable timeouts.

Uses

What are some possible uses for counting semaphores? The following come to mind:

  • Limiting concurrent access to disk (this can kill performance due to competing disk seeks)
  • Thread creation limiting
  • JDBC connection pooling / limiting
  • Network connection throttling
  • Throttling CPU or memory intensive tasks

Of course, a semaphore is a pretty low-level building block for access control and synchronization. Java provides us a wealth of concurrency mechanisms and strategies which were introduced in Java 1.5 and beyond. In the coming posts, we will be covering some of the more abstract methods of managing concurrency including Executors, BlockingQueues, Barriers, Futures and also some of the new concurrent Collection classes.

What uses have you found for semaphores? Let us know by leaving a comment – we love talking software.