Jun 29, 2016

Single producer multiple consumer queue with Disruptor

1. Disruptor with WorkerPool


While the first use case that comes to mind when you think about the Disruptor is "multiple highly concurrent readers seeing every event", there is another, less frequently mentioned feature. The WorkerPool is an abstraction that allows you to configure multiple concurrent readers with the Disruptor load balancing among them. In such a setup, every event written to the ring buffer is processed by exactly one reader.
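The one-event-one-consumer behavior can be sketched with nothing but the JDK. This is not the Disruptor API, just a `java.util.concurrent` analogue: a bounded `BlockingQueue` stands in for the ring buffer and plain pooled threads stand in for the WorkerPool's handlers; all names are made up.

```java
import java.util.Set;
import java.util.concurrent.*;

public class WorkerPoolAnalogue {
    public static void main(String[] args) throws Exception {
        // Bounded queue standing in for the ring buffer.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1024);
        int workers = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Set<Integer> seen = ConcurrentHashMap.newKeySet();

        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        Integer event = queue.take();
                        if (event < 0) break; // poison pill ends this worker
                        seen.add(event);      // each event lands on exactly one worker
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int i = 0; i < 10_000; i++) queue.put(i);   // producer
        for (int w = 0; w < workers; w++) queue.put(-1); // one pill per worker
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(seen.size()); // 10000: every event consumed exactly once
    }
}
```

The real WorkerPool gives you the same guarantee without the per-event lock contention of a `BlockingQueue`, which is the whole point of using the Disruptor here.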

Imagine that you want to process a stream of records on as many CPU cores as you have. One easy way to set up such a configuration is to use the Disruptor. If nothing else, you should expect reasonable concurrency out of the box.

2. Disruptor initialization and shutdown 


The Disruptor was not designed to have a complicated lifecycle. Its original architecture pretty much assumes that you start it, and then it runs until you shut down the entire thing. So if your input is not really a continuous stream but something more like a large file iterator, some attention should be paid to stopping the Disruptor safely.

If your workers are stateful enough to require a new group for every input stream, you'll need to stop the Disruptor processing the previous stream before setting up a new one with a different WorkerPool. Moreover, the Disruptor is very good at saturating CPUs, so if you allow it to use all of them you will probably want a request queue in front of it to prevent the creation of multiple concurrent Disruptors.

In the case of a WorkerPool, you need a producer thread and a consumer ThreadFactory to be called by the Disruptor. If you represent your producer as a Runnable instance, you can re-use the same single-threaded executor for multiple requests. As you have no direct control over the threads created by the Disruptor via the ThreadFactory, you rely on the Disruptor shutdown call to stop them.

The Disruptor shutdown procedure is quite straightforward. Once you know that all the input events have been put into the ring buffer, you shut down the Disruptor with a timeout long enough to allow the remaining events to be processed by the workers. One way to do it is to create and wait for a latch on the client thread and have the producer thread count down the latch when it's done with the input.
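The latch handoff described above can be sketched with the JDK alone. The actual Disruptor calls are left as comments because they would need the Disruptor dependency; everything else (the hypothetical class name, the 1000-event loop, the timeouts) is made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch producerDone = new CountDownLatch(1);
        // Re-usable single-threaded executor owned by the client,
        // as described above; one producer Runnable per input stream.
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();

        producerExecutor.submit(() -> {
            for (int i = 0; i < 1000; i++) {
                // ringBuffer.publishEvent(...) would go here in the real flow
            }
            producerDone.countDown(); // signal: all input is in the ring buffer
        });

        // The client thread blocks until the producer has published everything...
        producerDone.await(30, TimeUnit.SECONDS);
        // ...and only then stops the Disruptor, giving workers time to drain
        // the remaining events:
        // disruptor.shutdown(10, TimeUnit.SECONDS);
        producerExecutor.shutdown();
        System.out.println("done");
    }
}
```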

3. Disruptor WorkerPool example


While experimenting with a real-life prototype I came up with a complete flow illustrating Disruptor WorkerPool usage in a backend service.



  • Client - represents a backend service; owns the producer executor, catches exceptions
  • ParallelRequestRunner - hides a few steps required to process one input stream; calls a ParallelProcessor to process input, waits for the producer to finish
  • ParallelProcessor - creates a new Disruptor with a WorkerPool of a given size; submits a Runnable wrapper of producer for execution
  • DataRowEventConsumer - processes a single event at a time
  • DataRowEventProducer - reads input and puts one event at a time into the ring buffer
  • ParallelProcessingContext - keeps track of the participants to allow waiting for the producer to finish, triggers Disruptor shutdown, collects statistics from the consumers

Jun 12, 2016

Druid and Pinot low latency OLAP design ideas

0. Introduction

Imagine a data schema that can be described as (M, D1, .. , Dn, {(t,v)} ) where every entry has
  • M - a measure (e.g. Site Visits)
  • Di - a few dimensions (e.g. Country, Traffic Source)
  • {(t,v)} - a time series with a measure value for each timestamp

Calling it a time series OLAP could be a misnomer. Time is just another dimension in a typical OLAP schema, and "time series" usually implies just a metric without dimensions (e.g. server CPU usage as a function of time). Nevertheless, real-life systems dealing with such data are quite common, and they face similar challenges. Chief among them is the need for low-latency, interactive queries, so traditional Hadoop-style scalability is not enough. There are a couple of systems out there that are less well known than Spark or Flink but seem to be good at solving this time series OLAP puzzle.

1. Druid ideas 
  • For event streams, pre-aggregate small batches (at some minimum granularity e.g. 1 min)
  • Time is a dimension to be treated differently because all the queries use it
  • Partition on time into batches (+ versions), each to become a file
  • Each batch to have (IR-style): a dictionary of terms, a compressed bitmap for each term (and so query filters are compiled into efficient binary AND/OR operations on large bitmaps)
  • Column-oriented format for partitions
  • A separate service to decide which partitions are to be stored/cached by which historical data server
  • Metadata is stored as BLOBs in a relational DB
  • ZK for service discovery and segment mappings
  • Historical and real-time nodes, queried by coordinators separately to produce final results
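The bitmap-index idea above (one compressed bitmap per dictionary term, with query filters compiled into bitwise AND/OR) can be illustrated with plain `java.util.BitSet`; the dimensions, terms, and row ids below are invented, and a real system would use compressed bitmaps such as Roaring or Concise rather than `BitSet`.

```java
import java.util.BitSet;

public class BitmapFilterSketch {
    public static void main(String[] args) {
        // Hypothetical segment of 8 rows; one bitmap per dictionary term,
        // bit i set iff row i contains that term.
        BitSet countryUS = bits(0, 2, 5, 7); // Country = "US"
        BitSet countryDE = bits(1, 4);       // Country = "DE"
        BitSet sourceAds = bits(2, 3, 5);    // TrafficSource = "Ads"

        // WHERE (Country = 'US' OR Country = 'DE') AND TrafficSource = 'Ads'
        BitSet filter = (BitSet) countryUS.clone();
        filter.or(countryDE);  // OR across terms of the same dimension
        filter.and(sourceAds); // AND across dimensions
        System.out.println(filter); // {2, 5} — the matching row ids
    }

    static BitSet bits(int... ids) {
        BitSet b = new BitSet();
        for (int i : ids) b.set(i);
        return b;
    }
}
```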

2. Pinot ideas
  • Kafka and Hadoop M/R based
  • Historical and real-time nodes, queried by brokers
  • Data segment: fixed-size blocks of records stored as a single file
  • Time is special. For example, brokers know how to serve one query from both realtime and historical nodes by sending sub-queries with different time filters. Realtime is slower because it aggregates at a different granularity ("min/hour range instead of day")
  • ZK for service discovery
  • Realtime has segments in-memory, flushes to disk periodically, queries both.
  • Multiple segments (with the same schema) constitute a table
  • Column-oriented format for segments
  • A few indices for a segment (forward, single-value sorted, …)
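
The broker behavior from the list above (serving one query by fanning out sub-queries with different time filters) can be sketched in a few lines. The cutoff timestamp, node names, and interval notation are all hypothetical; real brokers work from segment metadata rather than a single handoff point.

```java
import java.util.List;

public class TimeSplitSketch {
    // Split a query's time range at the handoff point: data older than
    // `handoff` lives on historical nodes, newer data on realtime nodes.
    static List<String> split(long from, long to, long handoff) {
        if (to <= handoff) return List.of("historical[" + from + "," + to + ")");
        if (from >= handoff) return List.of("realtime[" + from + "," + to + ")");
        return List.of("historical[" + from + "," + handoff + ")",
                       "realtime[" + handoff + "," + to + ")");
    }

    public static void main(String[] args) {
        // Query [0, 100) with handoff at t=70: the broker issues two
        // sub-queries and merges their results.
        System.out.println(split(0, 100, 70));
    }
}
```
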
3. Common threads
  • Column oriented storage formats
  • Partition on time
  • Information Retrieval/Search Engine techniques for data storage (inverted index)
  • ZK is a popular choice of discovery service
4. In other news

There are also smaller-scale products using search engine technology for OLAP data storage.