Sep 17, 2016

Spark SQL connector design

Now that we have discussed the Data Source API in general, it's time to consider what happens when you actually start coding. This post will rely on the first iteration of my experiment. I cannot promise that its design follows the only possible allocation of responsibilities. It will also not work in a real cluster because, for the time being, I am ignoring the need for a DFS.

What is real are the interactions between the abstractions comprising the connector, a few useful RDD/SparkContext API tidbits, and the problems one faces when trying to write a connector for a non-trivial storage system from scratch.

As I mentioned before, I am using Lucene to represent the data storage to integrate into Spark. For simplicity's sake I am making a few assumptions here: only point fields are supported, a field visitor is used to retrieve values, and no effort is made to implement a scalable iterator-based approach to search.

Squeezing performance from Lucene quickly leads away from the standard API and so remains out of scope. I am actually curious about the latency achievable when millions of documents with DocValue fields are retrieved and one day I could end up trying it.

Connector structure

There are many Spark connectors like it, but this one is mine :) To begin with let me introduce key abstractions and their responsibilities:
  • DefaultSource - the connector entry point that is essentially a factory of LuceneRelation instances
  • LuceneRelation - represents an application-specific dataset (e.g. the data that belongs to a particular version of some tenant's data); technically, it encapsulates a LuceneRDD and its schema
  • LuceneRDD - an RDD of LucenePartitions; one way to think about it is a facade for multiple storage partitions. Every store or retrieve operation is delegated to all the partitions.
  • LucenePartition - a subset of original data operated on by Spark as an RDD partition (i.e. iterator of Rows); an entire partition can be retrieved from storage or its subset can be specified with a combination of required columns and row filters
  • Spark API extensions - a package object that makes it possible to add methods to DataFrame and SQLContext

In addition to the aforementioned classes that mostly implement Data Source API interfaces, there are a couple of less generic classes:
  • LuceneSchema - the data schema representation used by the connector and comprised of a Spark schema (field types) and a Lucene schema (can a field be retrieved and/or filtered on?). The latter is necessary because not all Lucene documents are required to have all the fields and so reverse engineering the schema from a Lucene index would be prohibitively expensive
  • LucenePartitionStorage - a means of using different storage implementations in read and write paths. In the read path the storage is an actual Lucene index. In the write path the storage is a temporary in-memory data structure holding DataFrame rows before they are written to Lucene index.
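The composite schema idea can be pictured with a small dependency-free sketch. All names here are hypothetical; the real connector would pair Spark's StructType with per-field Lucene metadata rather than roll its own types:

```scala
// Hypothetical stand-in for the composite schema: a Spark-style field type plus
// Lucene metadata (indexed => filterable, stored => retrievable) per field.
object LuceneSchemaSketch {
  sealed trait FieldType
  case object LongType extends FieldType
  case object StringType extends FieldType

  final case class LuceneFieldInfo(indexed: Boolean, stored: Boolean)

  final case class LuceneSchema(fields: Map[String, (FieldType, LuceneFieldInfo)]) {
    // can this field appear in a row filter?
    def filterable(name: String): Boolean = fields.get(name).exists(_._2.indexed)
    // can this field's value be read back from a matching document?
    def retrievable(name: String): Boolean = fields.get(name).exists(_._2.stored)
  }
}
```

The point of keeping the two halves together is that neither alone answers both questions the connector needs: Spark cares about value types, Lucene cares about what was indexed or stored.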
To better understand how these abstractions fit together, let's consider the interactions among them in the write path and the read path separately. Both paths are exercised by a unit test and can be seen in action.

Write path

The write path starts from a DataFrame instance that you want to write to the storage. You also need to come up with a file system path (e.g. a tenant-specific subdirectory under some root location) and a Lucene schema (to let Lucene know which fields to index and/or store).

  • DefaultSource is asked for a new LuceneRelation instance
  • DefaultSource calls the extended DataFrame API to save the data frame
  • For each partition of the RDD wrapped by the data frame a new LucenePartition is created
  • A new LuceneRDD instance is created from the LucenePartitions using the data frame schema and the client-provided Lucene schema
  • The composite schema is written to a file (with the expectation that it will be shared by all partitions)
  • For each LucenePartition a subdirectory is created 
  • Every LucenePartition is written to a Lucene index created in the subdirectory
  • A new LuceneRelation is created to represent the written data
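The on-disk layout produced by these steps can be sketched with plain stdlib code. The naming scheme below is hypothetical; the actual connector may use different file and directory names:

```scala
// Hypothetical layout produced by the write path: one shared schema file at the
// root plus one subdirectory per LucenePartition holding that partition's index.
object WriteLayoutSketch {
  def schemaFile(root: String): String = s"$root/schema"
  def partitionDir(root: String, index: Int): String = s"$root/partition-$index"
  def layout(root: String, numPartitions: Int): Seq[String] =
    schemaFile(root) +: (0 until numPartitions).map(partitionDir(root, _))
}
```

Keeping the schema at the root (instead of inside each partition directory) is what lets the read path load it once before any partition is opened.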

In the diagram above:
  • The RDD[Row] is the RDD wrapped by the data frame
  • The RDD[LP] stands for an RDD[LucenePartition]
  • When a LucenePartition instance is created from DataFrame rows it is backed by an in-memory storage implementation; currently it simply remembers the original row iterator
  • The in-memory storage only holds data temporarily before it's actually written to the Lucene index
  • The details of Lucene index writing are omitted because they are quite generic and have nothing Spark-specific

Read path

The read path is a mirror image of the write path. It starts with the client calling the SQLContext to load a data frame from a given location.
  • DefaultSource is asked for a new LuceneRelation instance
  • A new LuceneRelation instance is created with a schema loaded from the given location
  • The number of LucenePartitions is determined by scanning the location for subdirectories
  • The sequence of discovered partition identifiers is parallelized so that all partitions can be loaded in parallel
  • A new LucenePartition is created for each found partition
  • A LuceneRDD instance is created from the LucenePartitions and the loaded schema
  • At this point the LuceneRelation is created and so is the data frame the client can see
  • The unit test demonstrates a few different ways to query after the loaded data frame is registered as a temporary SparkSQL table. All of them are internally translated into the same three method calls seen in the diagram above. The most interesting of the three is the buildScan method
  • The LuceneRDD delegates the call to its partitions and then flatMaps partial results into a single row iterator
  • Each partition compiles the required column list and the filters into a Lucene query, executes the query, collects the values from the matching documents, and converts every Lucene document into a Spark Row
As another side note, the Lucene query API is iterator-unfriendly, so in this version every partition collects all the Rows into an array, which would probably kill it in real life if the query selectivity is low.
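The delegation in buildScan can be mimicked without Spark. The names below are hypothetical; the real method receives required columns and an Array of Spark Filter objects rather than a plain predicate:

```scala
// Hypothetical stand-in for LuceneRDD.buildScan: ask every partition to run the
// query, then flatMap the per-partition iterators into a single row iterator.
object ReadPathSketch {
  type Row = Map[String, Any]

  final case class LucenePartitionSketch(docs: Seq[Row]) {
    // the real partition compiles columns + filters into a Lucene query;
    // here we just filter in memory and project the required columns
    def search(requiredColumns: Set[String], predicate: Row => Boolean): Iterator[Row] =
      docs.iterator.filter(predicate).map(_.filter { case (name, _) => requiredColumns(name) })
  }

  def buildScan(partitions: Seq[LucenePartitionSketch],
                requiredColumns: Set[String],
                predicate: Row => Boolean): Iterator[Row] =
    partitions.iterator.flatMap(_.search(requiredColumns, predicate))
}
```

The flatMap is the whole trick: each partition stays responsible for its own query execution, and the RDD only concatenates results.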

Sep 15, 2016

Spark SQL Data Source design considerations

I like learning about query engine internals. The Spark SQL Data Source API seems to be a reasonable gateway drug on the way to really understanding the Spark executor and optimizer. In more practical terms, the efficiency of data storage integration is likely to dominate any high-performance Spark-based architecture, especially with the core engine getting so many optimizations in v2.0.

Curiously enough, there is no official documentation other than the API scaladocs. Even though the API itself is comprised of just a handful of abstractions, in my limited experience getting a clear picture of what it takes to implement a reasonable data source integration is not easy. I have not seen too many descriptions and even my favorite ones tend to concentrate on the read path or rely on remote access to some actual storage. 

Browsing a few real integrations made me think that not all of them follow the same design. I am still trying to understand if the differences are purely cosmetic or there are indeed multiple ways to structure a Spark connector. With this question in mind I embarked on a little journey through the world of Spark data sources. I am trying to answer questions such as:
  • Assuming co-locating computations with the data is the shortest path to low latency, how do I integrate a data storage into Spark without making (too many) remote calls from the workers?
  • Can it be done without a distributed file system (assuming, say, AWS S3 instead)?
While reading the Succinct tech report I liked their connector so much that I decided to follow its design in my first iteration. I chose Lucene as my data storage because (A) it will force realistic constraints on my prototype, (B) I am interested in using Lucene in an OLAP context, and (C) apparently nobody has tried it before, so it could be fun.


Spark is a distributed query engine that by default expects an HDFS-style distributed file system to be available on worker nodes. A popular alternative is to call a remote storage using some TCP-based RPC protocol (e.g. ElasticSearch or SOLR). Some people use S3 instead of a DFS but that approach seems to be less prominent.

The Spark API is somewhat confusing because it attempts to hide the complexity of distributed processing behind a rather generic RDD abstraction, and then builds even more abstractions on top of the RDD. In addition, the older low-level RDD API and the higher-level modern DataFrame API are closely related. A DataFrame is an RDD[Row] with a schema. RDDs have partitions. The Data Source API operates on Relations.
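That abstraction tower can be restated in a dependency-free miniature (hypothetical shapes, not Spark's actual classes):

```scala
// Hypothetical miniature of the abstraction tower: an RDD is partitioned data,
// and a DataFrame is an RDD of rows plus a schema describing the columns.
object AbstractionSketch {
  type Row = Seq[Any]

  final case class RDD[A](partitions: Seq[Seq[A]]) {
    def collect(): Seq[A] = partitions.flatten
  }

  // a DataFrame pairs row data with column names (real schemas also carry types)
  final case class DataFrame(rdd: RDD[Row], schema: Seq[String])
}
```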


  • Lucene is a library that relies on the usual local file system API to write and read index files
  • Documents do not have a schema because different documents can use different fields (the set of fields could be small enough to treat it as a schema conceptually but it's application-specific)
  • Document fields can be indexed (so you can filter on their values), stored (so you can retrieve a field value from a particular document), or both.
  • There are at least two very different flavors of numeric fields (i.e. LongPoint and NumericDocValuesField)
  • When you expect your queries to return millions of documents there are a few optimized ways to retrieve those efficiently. The more efficient, the less standard.
  • The standard search API can take a callback but there is no iterator-friendly entry point
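The last point is the awkward part for Spark, which wants to pull rows from an iterator. In stdlib terms the mismatch looks roughly like this (hypothetical shapes, not Lucene's actual API):

```scala
// A callback-style search API (in the spirit of Lucene's collector) bridged to
// an iterator by buffering every hit first; fine for small result sets,
// costly when a query matches millions of documents.
object CallbackToIterator {
  // stand-in for a search method that pushes hits into a callback
  def searchWithCallback(hits: Seq[Long])(collect: Long => Unit): Unit =
    hits.foreach(collect)

  // the bridge: collect everything into a buffer, then expose an iterator
  def searchAsIterator(hits: Seq[Long]): Iterator[Long] = {
    val buffer = scala.collection.mutable.ArrayBuffer.empty[Long]
    searchWithCallback(hits)(buffer += _)
    buffer.iterator
  }
}
```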

First Spark Lucene connector iteration

I am learning some crucial details as I go. So the first iteration will be more of an educational exercise demonstrating internal data source interactions rather than a usable piece of software. It remains to be seen how far I'll be able to evolve it from there.

I don't have a DFS. I also believe that trying to write a "Lucene Directory implementation using HDFS I/O Stream API" is not a good idea. Regardless of whether you use HDFS or S3 I would expect that a more promising approach is to read/write Lucene index files locally and then upload/download them in the background.

Initially I will co-locate everything in my prototype and simply use the local file system for any files. My hypothesis is that having a Lucene index for each Spark partition is a reasonable approach. Once I have the simplest possible connector implementation I will see:

  • If there are alternative connector designs 
  • What can be done about storing index files in S3 
  • What kind of performance is achievable with DocValues fields (that will likely require a fixed schema but latency should be worth it)

Sep 4, 2016

Asynchronous RPC server with GRPC

After a severe disappointment with the state of the Thrift RPC framework I was looking forward to the first stable release of GRPC. Of the original big three, only the protobuf-based ecosystem seems to still be evolving enough to be a viable choice. With the arrival of v1.0 it is finally ready for production.

For better or worse despite using a binary transport protocol GRPC forces things with "HTTP" in their names upon us. That is unfortunate because a protocol designed for connecting to browsers outside of the data center will incur some penalties in complexity and probably latency on backend services not exposed to the public Internet. 

The question I want to investigate today is pretty much the same one I had playing with Thrift last year: what does it take to configure and run a backend service exposing multiple end points with no blocking calls? Spoiler alert: it's so trivial I should probably double-check my understanding of the threading model to confirm GRPC is as reactive as it seems to be :)

To begin with I would like to illustrate a typical GRPC call. Let's assume we have a service API with one method. GRPC uses protobuf3 as its IDL and, other than not having required fields anymore, I find it hard to see any difference from protobuf2 or even Thrift. 
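As a concrete illustration, a one-method service in protobuf3 could look like this (the Greeter/SayHello naming follows the standard GRPC hello-world example):

```proto
syntax = "proto3";

// A single-method service definition in the spirit of the GRPC examples
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}
```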

The first pleasant surprise is that a single maven plugin with a simple, mostly OS-specific configuration is all it takes to compile IDL into Java classes. For each XXX service you basically end up with two classes:
  • xxxProto - request and response classes
  • xxxGrpc - request handler base class and client-side factory method for different IO styles

The second pleasant surprise is how uniform the server-side API is. When processing a request you are given an Observer with the same onNext/onComplete/onError methods used for all four currently supported RPC types. On the client side it's even simpler: all you need is a request instance and a guava callback instance.

  • a Channel is created for a TCP destination
  • a service stub using the new Channel is obtained
  • a callback instance is created to process server response
  • an RPC method is called on the stub
  • a mirror copy method is called on the request handler but with an observer instead of a callback
  • the handler calls either onNext/onSuccess or onError observer methods to finish processing
  • once the server response is received, GRPC invokes the callback on a thread from a pre-configured executor
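The observer/callback symmetry in this sequence can be mimicked with plain Scala. The shapes below mirror GRPC's StreamObserver but are hypothetical stand-ins with no real networking or executor involved:

```scala
// Hypothetical in-process model of a unary call: the server-side handler
// receives an observer, the client supplies a callback invoked on completion.
object GrpcFlowSketch {
  trait StreamObserver[A] {
    def onNext(value: A): Unit
    def onCompleted(): Unit
    def onError(t: Throwable): Unit
  }

  // server side: mirrors the client-facing method but takes an observer
  def sayHello(request: String, responseObserver: StreamObserver[String]): Unit = {
    responseObserver.onNext(s"Hello, $request")
    responseObserver.onCompleted()
  }

  // client side: adapt a success/failure callback pair into an observer
  def callAsync(request: String)(onSuccess: String => Unit, onFailure: Throwable => Unit): Unit =
    sayHello(request, new StreamObserver[String] {
      private var last: Option[String] = None
      def onNext(value: String): Unit = last = Some(value)
      def onCompleted(): Unit = last.foreach(onSuccess)
      def onError(t: Throwable): Unit = onFailure(t)
    })
}
```

In real GRPC the callback would fire on a thread from the pre-configured executor; here everything runs inline for clarity.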

Server-side initialization is equally terse. You give it a handler instance for each service end point, an executor, and a port to bind to. Four lines of code and you are in business.

In my example I created two nearly identical protobuf files to describe two service end points. There are two abstractions to establish a connection on the client-side and bootstrap the server on the server side. I followed the same CompletableFuture-based approach to implementing request handlers as discussed in the Thrift post. A unit test wires everything together and calls both end points in parallel.

So far so good. But what ointment would be without a fly? There are a couple of things that I either don't understand or need more time to sort out, and sorting out at least one of them is probably necessary for production deployments.

A trivial note is that GRPC is only the second library I am aware of that depends on JUL (Parquet being the first, but only temporarily).

One odd question I still have is how asynchronous the Java GRPC implementation really is. What confuses me is that I can see an "asynchronous basics tutorial" for C++ only. Is it some special kind of asynchronicity attainable only by those who manage memory manually? Or is there some blocking still left in the Java GRPC library?

A real question is the complexity of GRPC security. My example follows the lead of the GRPC Java tutorial when it calls "usePlaintext()" on the channel builder. At first glance I am not even sure whether SSL/TLS is necessary for traffic inside the data center or whether AWS ELB could interfere with it. A topic for another day I guess.