Jun 29, 2016

Single producer multiple consumer queue with Disruptor

1. Disruptor with WorkerPool

While the first use case that comes to mind when you think about the Disruptor is "multiple highly concurrent readers seeing every event", there is another, less frequently mentioned feature. The WorkerPool is an abstraction that allows you to configure multiple concurrent readers, with the Disruptor load balancing among them. In such a Disruptor setup, every event written to the ring buffer is processed by exactly one reader.

Imagine that you want to process a stream of records on as many CPU cores as you have. One easy way to set up this configuration is to use the Disruptor. If nothing else, you should expect reasonable concurrency out of the box.
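As a minimal sketch of such a setup (event and handler names here are made up for illustration), the Disruptor DSL lets you attach one WorkHandler per worker via handleEventsWithWorkerPool; each published event is then handed to exactly one of them:

```java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerPoolExample {

    // Mutable event instance reused by the ring buffer (hypothetical name)
    static class DataRowEvent {
        long row;
    }

    public static void main(String[] args) throws Exception {
        int workers = Runtime.getRuntime().availableProcessors();
        AtomicInteger processed = new AtomicInteger();

        Disruptor<DataRowEvent> disruptor = new Disruptor<>(
                DataRowEvent::new, 1024, DaemonThreadFactory.INSTANCE);

        // One WorkHandler per worker thread; each event goes to exactly one handler
        @SuppressWarnings("unchecked")
        WorkHandler<DataRowEvent>[] handlers = new WorkHandler[workers];
        for (int i = 0; i < workers; i++) {
            handlers[i] = event -> processed.incrementAndGet();
        }
        disruptor.handleEventsWithWorkerPool(handlers);

        RingBuffer<DataRowEvent> ringBuffer = disruptor.start();
        for (long i = 0; i < 1000; i++) {
            final long row = i;
            ringBuffer.publishEvent((event, sequence) -> event.row = row);
        }

        // Drains the outstanding events before stopping the worker threads
        disruptor.shutdown(10, TimeUnit.SECONDS);
        System.out.println("processed " + processed.get() + " events");
    }
}
```

Note that the handlers implement WorkHandler rather than EventHandler; that is what turns the consumers into a load-balanced pool instead of independent readers that each see every event.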

2. Disruptor initialization and shutdown 

The Disruptor was not designed to have a complicated lifecycle. Its original architecture pretty much assumes that you start it, and then it runs until you shut down the entire thing. So if your input is not really a continuous stream but something more like a large file iterator, some attention should be paid to stopping the Disruptor safely.

If you have workers stateful enough to require a new group for every input stream, you'll need to stop the Disruptor processing the previous stream before setting up a new one with a different WorkerPool. Moreover, the Disruptor is very good at saturating CPUs, so if you allow it to use all of them you will probably want a request queue in front of it to prevent the creation of multiple concurrent Disruptors.

In the case of a WorkerPool, you need a producer thread and a ThreadFactory that the Disruptor calls to create consumer threads. If you represent your producer as a Runnable instance, you can re-use the same single-threaded executor for multiple requests. As you have no direct control over the threads created by the Disruptor through the ThreadFactory, you will rely on the Disruptor shutdown call to stop them.

The Disruptor shutdown procedure is quite straightforward. Once you know that all the input events have been put into the ring buffer, you are supposed to shut down the Disruptor with a timeout long enough to allow the remaining events to be processed by the workers. One way to do it is to create and wait for a latch on the client thread and make the producer thread count down the latch when it's done with the input.
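The latch idea can be sketched with plain JDK primitives (the ring buffer call is stubbed out with a print statement here):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownCoordination {
    public static void main(String[] args) throws Exception {
        // Single-threaded executor that can be reused for the producer of each request
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch producerDone = new CountDownLatch(1);
        List<String> input = List.of("row1", "row2", "row3");

        producerExecutor.submit(() -> {
            try {
                for (String row : input) {
                    // stand-in for ringBuffer.publishEvent(...)
                    System.out.println("published " + row);
                }
            } finally {
                producerDone.countDown(); // all input is now in the ring buffer
            }
        });

        // The client thread blocks here until the producer has published everything;
        // only then is it safe to call disruptor.shutdown(timeout, unit).
        producerDone.await();
        System.out.println("producer finished, safe to shut down the Disruptor");
        producerExecutor.shutdown();
    }
}
```

The countDown sits in a finally block so the client thread is released even if the producer fails mid-stream; otherwise a producer exception would leave the client waiting forever.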

3. Disruptor WorkerPool example

While experimenting with a real-life prototype, I came up with a complete flow illustrating Disruptor WorkerPool usage in a backend service.

  • Client - represents a backend service; owns the producer executor, catches exceptions
  • ParallelRequestRunner - hides a few steps required to process one input stream; calls a ParallelProcessor to process input, waits for the producer to finish
  • ParallelProcessor - creates a new Disruptor with a WorkerPool of a given size; submits a Runnable wrapper of producer for execution
  • DataRowEventConsumer - processes a single event at a time
  • DataRowEventProducer - reads input and puts one event at a time into the ring buffer
  • ParallelProcessingContext - keeps track of the participants to allow waiting for the producer to finish, triggers Disruptor shutdown, collects statistics from the consumers
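To make the coordination role concrete, here is a hedged sketch of what a class like ParallelProcessingContext might look like; the class and method names are my guesses based on the list above (the real prototype also collects consumer statistics, which is omitted here), and the Disruptor shutdown is abstracted behind a Runnable so the sketch stays self-contained:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical coordination object; in the real flow the shutdownAction
// would be something like () -> disruptor.shutdown(10, TimeUnit.SECONDS)
class ParallelProcessingContext {
    private final CountDownLatch producerDone = new CountDownLatch(1);
    private final Runnable shutdownAction;

    ParallelProcessingContext(Runnable shutdownAction) {
        this.shutdownAction = shutdownAction;
    }

    // Called by the producer when the input stream is exhausted
    void producerFinished() {
        producerDone.countDown();
    }

    // Called on the client thread to wait for the producer and stop the Disruptor
    void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        if (!producerDone.await(timeout, unit)) {
            throw new IllegalStateException("producer did not finish in time");
        }
        shutdownAction.run(); // drains the ring buffer and stops the workers
    }
}

public class ContextDemo {
    public static void main(String[] args) throws Exception {
        ParallelProcessingContext ctx = new ParallelProcessingContext(
                () -> System.out.println("disruptor shut down"));
        new Thread(ctx::producerFinished).start(); // producer signals completion
        ctx.awaitCompletion(5, TimeUnit.SECONDS);
    }
}
```

The point of funneling both the latch and the shutdown call through one context object is that neither the producer nor the client needs a direct reference to the Disruptor instance.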
