Sep 15, 2016

Spark SQL Data Source design considerations

I like learning about query engine internals. The Spark SQL Data Source API seems to be a reasonable gateway drug on the way to really understanding the Spark executor and optimizer. In more practical terms, the efficiency of the data storage integration is likely to dominate the performance of any Spark-based architecture, especially now that the core engine has received so many optimizations in v2.0.

Curiously enough, there is no official documentation other than the API scaladocs. Even though the API itself consists of just a handful of abstractions, in my limited experience it is not easy to get a clear picture of what it takes to implement a reasonable data source integration. I have not seen many descriptions, and even my favorite ones tend to concentrate on the read path or rely on remote access to some actual storage.

Browsing a few real integrations made me think that not all of them follow the same design. I am still trying to understand whether the differences are purely cosmetic or whether there are indeed multiple ways to structure a Spark connector. With this question in mind I embarked on a little journey through the world of Spark data sources. I am trying to answer questions such as:
  • Assuming co-locating computations with the data is the shortest path to low latency, how do I integrate a data storage into Spark without making (too many) remote calls from the workers?
  • Can it be done without a distributed file system (assuming, say, AWS S3 instead)?

While reading the Succinct tech report I liked their connector so much that I decided to follow its design in my first iteration. I decided to use Lucene as my data storage because (A) it will force realistic constraints on my prototype, (B) I am interested in using Lucene in an OLAP context, and (C) apparently nobody has tried it before, so it could be fun.


Spark is a distributed query engine that by default expects an HDFS-style distributed file system to be available on worker nodes. A popular alternative is to call a remote storage system using some TCP-based RPC protocol (e.g. Elasticsearch or Solr). Some people use S3 instead of a DFS, but that approach seems to be less prominent.

The Spark API is somewhat confusing because it attempts to hide the complexity of distributed processing behind a rather generic RDD abstraction and then builds even more abstractions on top of the RDD. In addition, the older low-level RDD API and the higher-level modern DataFrame API are closely related. Conceptually, a DataFrame is an RDD<Row> with a schema. RDDs have partitions. The Data Source API operates on Relations.
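To keep these terms apart, here is a toy model in plain Java. It is not Spark code: `ToyRelation` and all of its methods are hypothetical names made up for illustration. In Spark itself the corresponding pieces are `BaseRelation` (which exposes a `schema`) and `TableScan.buildScan()` (which returns an `RDD[Row]`). The sketch only shows how a schema, rows, and partitions fit together.

```java
import java.util.Iterator;
import java.util.List;

/** Toy model, NOT the Spark API: a relation is a schema plus rows split into partitions. */
final class ToyRelation {
    private final List<String> schema;             // ordered column names
    private final List<List<Object[]>> partitions; // each partition holds its own rows

    ToyRelation(List<String> schema, List<List<Object[]>> partitions) {
        this.schema = schema;
        this.partitions = partitions;
    }

    List<String> schema() {
        return schema;
    }

    int numPartitions() {
        return partitions.size();
    }

    /** Scans one partition; in a distributed engine each partition could be scanned on a different worker. */
    Iterator<Object[]> scanPartition(int index) {
        return partitions.get(index).iterator();
    }

    /** Looks up a value by column name, which is what a schema adds on top of raw rows. */
    Object value(Object[] row, String column) {
        int position = schema.indexOf(column);
        if (position < 0) {
            throw new IllegalArgumentException("Unknown column: " + column);
        }
        return row[position];
    }
}
```

The point of the model is that the engine only needs two things from a data source: a schema it can plan against, and a way to iterate over the rows of each partition independently.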


  • Lucene is a library that relies on the usual local file system API to write and read index files
  • Documents do not have a schema because different documents can use different fields (the set of fields could be small enough to treat it as a schema conceptually but it's application-specific)
  • Document fields can be indexed (so you can filter on their values), stored (so you can retrieve a field value from a particular document), or both.
  • There are at least two very different flavors of numeric fields (e.g. LongPoint and NumericDocValuesField)
  • When you expect your queries to return millions of documents there are a few optimized ways to retrieve those efficiently. The more efficient, the less standard.
  • The standard search API can take a callback but there is no iterator-friendly entry point
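The last point matters because a Spark partition is computed as an iterator (`RDD.compute` returns one), while Lucene pushes hits into a callback (`IndexSearcher.search` accepts a `Collector`). One generic way to bridge the two, sketched below with the JDK only, is to run the callback-driven producer on a background thread and hand items over through a bounded queue. `CallbackIterator` and `CallbackSource` are hypothetical names, and `CallbackSource` merely stands in for the callback-based search call; none of this is Lucene or Spark API. The sketch assumes non-null items, does not propagate producer errors, and leaves the producer blocked if the consumer stops early.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Adapts a push-style (callback) producer to a pull-style Iterator. */
final class CallbackIterator<T> implements Iterator<T> {

    /** Hypothetical stand-in for a callback-based search call. */
    interface CallbackSource<T> {
        void run(Consumer<T> callback);
    }

    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue;
    private Object lookahead; // next element if already fetched, otherwise null

    CallbackIterator(CallbackSource<T> source, int bufferSize) {
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        Thread producer = new Thread(() -> {
            try {
                source.run(this::put); // every callback invocation enqueues one item
            } finally {
                put(END);              // always signal completion, even on failure
            }
        });
        producer.setDaemon(true);      // do not keep the JVM alive for an abandoned scan
        producer.start();
    }

    private void put(Object item) {
        try {
            queue.put(item);           // blocks while the buffer is full (back-pressure)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (lookahead == null) {
            try {
                lookahead = queue.take(); // blocks until the producer delivers something
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return lookahead != END;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = (T) lookahead;
        lookahead = null;
        return result;
    }
}
```

The bounded queue keeps memory use flat even when a query matches millions of documents, at the cost of one extra thread per scan. Buffering all hits into a list first would be simpler but defeats the purpose for large result sets.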

First Spark Lucene connector iteration

I am learning some crucial details as I go, so the first iteration will be more of an educational exercise demonstrating internal data source interactions than a usable piece of software. It remains to be seen how far I'll be able to evolve it from there.

I don't have a DFS. I also believe that trying to write a "Lucene Directory implementation using HDFS I/O Stream API" is not a good idea. Regardless of whether you use HDFS or S3, I would expect a more promising approach to be reading and writing Lucene index files locally and then uploading or downloading them in the background.
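A minimal sketch of the "write locally, upload in the background" idea, using only the JDK. The class `BackgroundIndexSync` is a hypothetical name, and the "remote" side is just another directory standing in for an S3 or HDFS client, which is an assumption made to keep the example self-contained. It also ignores the question of which files make up a consistent index commit.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Copies locally written index files to a "remote" location off the calling thread. */
final class BackgroundIndexSync {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Schedules the upload and returns immediately; the Future yields the number of files copied. */
    Future<Integer> upload(Path localDir, Path remoteDir) {
        return executor.submit(() -> copyAll(localDir, remoteDir));
    }

    /** Copies every regular file in 'from' into 'to' (assumption: a directory stands in for S3/HDFS). */
    static int copyAll(Path from, Path to) throws IOException {
        Files.createDirectories(to);
        int copied = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(from)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    Path target = to.resolve(file.getFileName().toString());
                    Files.copy(file, target, StandardCopyOption.REPLACE_EXISTING);
                    copied++;
                }
            }
        }
        return copied;
    }

    /** Stops accepting new uploads; already submitted ones still run to completion. */
    void shutdown() {
        executor.shutdown();
    }
}
```

The writer keeps working against the fast local file system while the copy runs elsewhere. A download path for readers would mirror `copyAll` with the arguments swapped.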

Initially I will co-locate everything in my prototype and simply use the local file system for all files. My hypothesis is that having a Lucene index for each Spark partition is a reasonable approach. Once I have the simplest possible connector implementation I will see:

  • If there are alternative connector designs 
  • What can be done about storing index files in S3 
  • What kind of performance is achievable with DocValues fields (that will likely require a fixed schema but latency should be worth it)
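On the local file system, the one-index-per-partition hypothesis above largely comes down to a directory naming convention. The sketch below shows one possible layout; the class `PartitionIndexLayout`, the `partition-` prefix, and the zero-padded numbering are all assumptions made for illustration, not something prescribed by Spark or Lucene.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

/** Maps partition ids to per-partition index directories under a common root. */
final class PartitionIndexLayout {
    private static final String PREFIX = "partition-"; // hypothetical naming convention

    /** Returns the directory that would hold the index for the given partition. */
    static Path indexDir(Path root, int partitionId) {
        if (partitionId < 0) {
            throw new IllegalArgumentException("Negative partition id: " + partitionId);
        }
        return root.resolve(String.format(Locale.ROOT, "%s%05d", PREFIX, partitionId));
    }

    /** Lists the partition ids that already have a directory, in ascending order. */
    static List<Integer> discoverPartitions(Path root) throws IOException {
        List<Integer> ids = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(root, PREFIX + "*")) {
            for (Path dir : dirs) {
                if (Files.isDirectory(dir)) {
                    // Assumes everything after the prefix is a number written by indexDir.
                    String suffix = dir.getFileName().toString().substring(PREFIX.length());
                    ids.add(Integer.parseInt(suffix));
                }
            }
        }
        Collections.sort(ids);
        return ids;
    }
}
```

On the read path, the number of discovered directories would determine how many partitions the relation reports, and each partition would open only its own index.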
