Now that we have discussed the Data Source API in general it's time to consider what happens when you actually start coding. This post will rely on the first iteration of my experiment. I cannot promise that its design follows the only possible allocation of responsibilities. It will also not work in a real cluster because for the time being I am ignoring the need for a DFS.
What is real are interactions between the abstractions comprising the connector, a few useful RDD/SparkContext API tidbits, and the problems one faces trying to write a connector for a non-trivial storage from scratch.
As I mentioned before, I am using Lucene to represent the data storage to integrate into Spark. For simplicity sake I am making a few assumptions here: only point fields are supported, a field visitor is used to retrieve values, no effort is made to implement a scalable iterator-based approach to search.
Squeezing performance from Lucene quickly leads away from the standard API and so remains out of scope. I am actually curious about the latency achievable when millions of documents with DocValue fields are retrieved and one day I could end up trying it.
Connector structure
There are many Spark connectors like it, but this one is mine :) To begin with let me introduce key abstractions and their responsibilities:
- DefaultSource - the connector entry point that is essentially a factory of LuceneRelation instances
- LuceneRelation - represents an application-specific dataset (e.g. the data that belongs to a particular version of some tenant's data); technically, it incapsulates a LuceneRDD and its schema
- LuceneRDD - an RDD of LucenePartitions; one way to think about it is a facade for multiple storage partitions. Every store or retrieve operation is delegated to all the partitions.
- LucenePartition - a subset of original data operated on by Spark as an RDD partition (i.e. iterator of Rows); an entire partition can be retrieved from storage or its subset can be specified with a combination of required columns and row filters
- Spark API extensions - a package object making possible to add methods to DataFrame and SQLContext
In addition to the aforementioned classes that mostly implement Data Source API interfaces there are a couple less generic classes:
- LuceneSchema - the data schema representation used by the connector and comprised of a Spark schema (fields types) and a Lucene schema (can a field be retrieved and/or filtered on?). The latter is necessary because not all Lucene documents are required to have all the fields and so reverse engineering the schema from a Lucene index would be prohibitively expensive
- LucenePartitionStorage - a means of using different storage implementations in read and write paths. In the read path the storage is an actual Lucene index. In the write path the storage is a temporary in-memory data structure holding DataFrame rows before they are written to Lucene index.
To better understand how these abstractions fit each other let's consider interactions among them in the write path and the read path separately. Both paths are used by a unit test and can be seen in action.
Write path
The write path starts from a DataFrame instance that you want to write to the storage. You also need to come up with a file system path (e.g. a tenant-specific subdirectory under some root location) and a Lucene schema (to let Lucene know which fields to index and/or store).
- DefaultSource is asked for a new LuceneRelation instance
- DefaultSource calls the extended DataFrame API to save the data frame
- For each partition of the RDD wrapped by the data frame a new LucenePartition is created
- A new LuceneRDD instance is created from the LucenePartitions using the data frame schema and the client-provided Lucene schema
- The composite schema is written to a file (with the expectation to be shared by all partitions)
- For each LucenePartition a subdirectory is created
- Every LucenePartition is written to a Lucene index created in the subdirectory
- A new LuceneRelation is created to represent the written data
In the diagram above:
- The RDD[Row] is the RDD wrapped by the data frame
- The RDD[LP] stands for an RDD[LucenePartition]
- When a LucenePartition instance is created from DataFrame rows it is backed by an in-memory storage implementation; currently it simply remembers the original row iterator
- The in-memory storage only hold data temporarily before it's actually written to the Lucene index
- The details of Lucene index writing are omitted because they are quite generic and have nothing Spark-specific
Read path
The read path is a mirror copy of the write path. It starts from the client calling the SparkContext to load a data frame from a given location.
- DefaultSource is asked for a new LuceneRelation instance
- A new LuceneRelation instance is created with a schema loaded from the given location
- The number of LucenePartition is found by scanning the location for subdirectories
- The sequence of found partition identifiers is parallelized to make loading of all partitions in parallel possible
-
A new LucenePartition is created for each found partition
- A LuceneRDD instance is created from the LucenePartitions and the loaded schema
- At this point the LuceneRelation is created and so is the data frame the client can see
- The unit test demonstrates a few different ways to query after the loaded data frame is registered as a temporary SparkSQL table. All of them are internally translated into the same three method calls seen in the diagram above. The most interesting of the three is the buildScan method
- The LuceneRDD delegates the call to its partitions and than flatMaps partial results into a single row iterator
- Each partition compiles the required column list and the filters into a Lucene query, executes the query, collects the values from the matching documents, and converts every Lucene document into a Spark Row