
Apr 16, 2018

Spark Data Source V2 - core interactions

With the general idea of Spark Data Source API v2 understood, it's time to make another step towards working code. Quite some time ago I was experimenting with the original Data Source API. It's only natural to see how that idea would look after the major API overhaul. I make the same assumptions as before so it's more of a prototype illustrating key interactions. 

Write path


In the write path a new Dataset is created from a few data rows generated by the client. An application-specific Lucene document schema is used to mark which columns to index, store, or both. The Dataset is repartitioned and each partition is stored as an independent Lucene index directory (a code sketch follows the list of participants below).



  • Client - creates a new Dataset from a set of rows; configures data source-specific properties (Lucene document schema and the root directory for all index partitions)
  • LuceneDataSourceV2 - plugs LuceneDataSourceV2Writer into the standard API
  • LuceneDataSourceV2Writer - creates a new LuceneDataWriterFactory; on commit saves the Lucene schema into a file common for all partitions; on rollback would be responsible for deleting created files
  • LuceneDataWriterFactory - is sent to worker nodes where it creates a LuceneDataWriter for every partition and configures it to use a dedicated directory
  • LuceneDataWriter - creates a new Lucene index in given directory and writes every partition Row to the index
  • LuceneIndexWriter - is responsible for building a Lucene index represented by a directory in the local file system; it's basically the same class as used in my old experiment
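A rough Scala sketch of how the pieces listed above could hang together. The interface names follow the pre-2.3 master snapshot discussed in the posts below (they were still being renamed on the way to the release), and LuceneIndexWriter/LuceneSchema are placeholders for my own helpers, so treat the signatures as approximate rather than definitive:

import java.util.Optional
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType

// Sketch only: the DSv2 interfaces were still in flux at the time of writing.
class LuceneDataSourceV2 extends DataSourceV2 with WriteSupport {
  override def createWriter(jobId: String, schema: StructType, mode: SaveMode,
                            options: DataSourceOptions): Optional[DataSourceV2Writer] =
    Optional.of(new LuceneDataSourceV2Writer(options.get("path").get, schema))
}

class LuceneDataSourceV2Writer(rootDir: String, schema: StructType) extends DataSourceV2Writer {
  override def createWriterFactory(): DataWriterFactory[Row] =
    new LuceneDataWriterFactory(rootDir, schema)
  // Called once on the driver after all partitions succeeded:
  override def commit(messages: Array[WriterCommitMessage]): Unit =
    LuceneSchema.save(schema, rootDir)            // one schema file shared by all partitions
  override def abort(messages: Array[WriterCommitMessage]): Unit =
    ()                                            // would delete the partition directories created so far
}

class LuceneDataWriterFactory(rootDir: String, schema: StructType)
    extends DataWriterFactory[Row] {
  // Runs on a worker node, once per partition:
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new LuceneDataWriter(s"$rootDir/$partitionId", schema)
}

class LuceneDataWriter(partitionDir: String, schema: StructType) extends DataWriter[Row] {
  private val index = new LuceneIndexWriter(partitionDir, schema)   // placeholder helper
  override def write(row: Row): Unit = index.addRow(row)
  override def commit(): WriterCommitMessage = { index.close(); new WriterCommitMessage {} }
  override def abort(): Unit = index.close()
}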

Read path


In the read path a previously created directory structure is parsed to find the Lucene schema and then load a Dataset partition from every subdirectory. Optionally, (A) the data schema is pruned to retrieve only requested columns and (B) supported filters are pushed to the Lucene engine to retrieve only matching rows.


  • Client - loads a previously written Dataset from a given root directory
  • LuceneDataSourceV2 - plugs LuceneDataSourceV2Reader into the standard API
  • LuceneDataSourceV2Reader - loads Lucene schema from the root directory, prunes it to align with the schema requested by Spark; in case of predicate pushdown, recognizes supported filter types and lets the Spark engine know; scans the root directory for partition subdirectories, creates LuceneReadTask for each
  • LuceneReadTask - is sent to worker nodes where it uses a LuceneIndexReader to read data Rows from the corresponding Lucene index
  • LuceneIndexReader - wraps Lucene index access; pushes the predicates chosen by the LuceneDataSourceV2Reader to the Lucene engine
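The read path admits a similar sketch covering both (A) column pruning and (B) filter pushdown. Again, the names track the snapshot described here, the mixin signatures are approximate, and LuceneSchema, LuceneQueries, PartitionDirs and LuceneIndexReader are placeholder helpers of my own:

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

class LuceneDataSourceV2Reader(rootDir: String) extends DataSourceV2Reader
    with SupportsPushDownRequiredColumns with SupportsPushDownFilters {

  private var schema = LuceneSchema.load(rootDir)      // placeholder helper
  private var pushed = Array.empty[Filter]

  override def readSchema(): StructType = schema.sparkSchema
  // (A) column pruning: keep only the columns Spark asked for
  override def pruneColumns(requiredSchema: StructType): Unit =
    schema = schema.prune(requiredSchema)
  // (B) filter pushdown: accept the filter types Lucene can evaluate,
  // return the rest so that Spark re-applies them itself
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(LuceneQueries.isSupported)
    pushed = supported
    unsupported
  }
  override def pushedFilters(): Array[Filter] = pushed

  override def createReadTasks(): java.util.List[ReadTask[Row]] =
    PartitionDirs.list(rootDir)                        // one subdirectory per partition
      .map(dir => new LuceneReadTask(dir, schema, pushed): ReadTask[Row])
      .asJava
}

class LuceneReadTask(dir: String, schema: LuceneSchema, filters: Array[Filter])
    extends ReadTask[Row] {
  // Executed on a worker node; LuceneIndexReader turns matching documents into Rows
  override def createDataReader(): DataReader[Row] = new LuceneIndexReader(dir, schema, filters)
}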

The first impression is that the new API is actually an API. It has a clear structure and can be extended with additional functionality. We can already see exciting work happening in 2.4 master. 

Dec 11, 2017

Spark Data Source API V2 - write query planning and execution

Before you can read anything you need to have it written in the first place. The previously considered read path is a little simpler but certainly shares a lot with the write path we'll delve into today. Initially I am going to concentrate on the basic flow and ignore predicate push-down and column pruning. As my running example I assume a trivial Spark expression that uses some existing Dataset such as "df.write.format("someDSV2").option("path", hdfsDir).save()".

A data source is represented in the logical plan by a WriteToDataSourceV2 instance created by the DataFrameWriter. During planning DataSourceV2Strategy transforms it into a WriteToDataSourceV2Exec instance. The latter uses the Spark context to execute the plan associated with the Dataset. The resultant RDD determines the number of input partitions and so the number of required write tasks. In addition, WriteToDataSourceV2Exec helps with keeping track of the WriterCommitMessages used to make writing data source partitions transactional.

Query planning


In this phase the goal is to create a WriteToDataSourceV2 from a Dataset and client-supplied options and then transform it into a WriteToDataSourceV2Exec. The process is initiated by saving a Dataset. It triggers both logical and physical planning stages.


  • DataFrameWriter searches for an implementation of the DataSourceV2 marker interface. The implementation is expected to also implement the WriteSupport mixin interface that is the actual entry point into writing functionality
  • WriteSupport is a factory of DataSourceV2Writer instances
  • DataSourceV2Writer can create a DataWriterFactory and is ultimately responsible for committing or rolling back write partition tasks
  • WriteToDataSourceV2 is the logical plan node that represents a data source
  • A QueryExecution is associated with the WriteToDataSourceV2 to help with coordination of query planning
  • SparkPlanner uses DataSourceV2Strategy to transform logical plan nodes into physical ones
  • DataSourceV2Strategy recognizes data source logical nodes and transforms them into physical ones
  • WriteToDataSourceV2 carries a DataSourceV2Writer and logical plan associated with the Dataset; both are given to WriteToDataSourceV2Exec
  • WriteToDataSourceV2Exec is the physical plan node that represents a data source

Query execution - Driver side


In this phase the goal is to obtain data partitions from the RDD associated with the Dataset and trigger execution of a write task for each partition. In this section we look at the Driver-side half of the process.


  • WriteToDataSourceV2Exec creates a DataWriterFactory and a write task for each partition
  • The Spark context executes the tasks on worker nodes and collects WriteCommitMessages from successfully finished ones 
  • WriteToDataSourceV2Exec uses DataSourceV2Writer to commit or rollback the write transaction using collected WriteCommitMessages
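In pseudo-Scala, the driver-side half listed above boils down to roughly the following. This is a simplification of what WriteToDataSourceV2Exec does, not the actual Spark code; writer is the DataSourceV2Writer, rdd is the RDD obtained from the Dataset's plan, and runWriteTask is sketched in the next section:

// Simplified view of the driver-side commit protocol:
val factory = writer.createWriterFactory()
val messages: Array[WriterCommitMessage] =
  sparkContext.runJob(rdd, (context: TaskContext, rows: Iterator[Row]) =>
    runWriteTask(factory, context.partitionId(), rows))   // one commit message per partition
try {
  writer.commit(messages)     // every partition reported success
} catch {
  case t: Throwable =>
    writer.abort(messages)    // give the data source a chance to clean up partial output
    throw t
}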

Query execution - Executor side


The second half of the execution phase takes place on the Executor side. 


  • DataWriterFactory is serialized to worker nodes where DataWritingSparkTasks are executed
  • Executor provides a DataWritingSparkTask with a Row iterator and a partition index
  • The task acquires a DataWriter from the DataWriterFactory and writes all partition Rows with the DataWriter
  • The task calls the DataWriter to commit writing and propagates a WriteCommitMessage to the driver
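The runWriteTask helper used in the previous section corresponds roughly to DataWritingSparkTask. A simplified sketch, with approximate signatures:

// Roughly what happens inside a single write task on an executor:
def runWriteTask(factory: DataWriterFactory[Row], partitionId: Int,
                 rows: Iterator[Row]): WriterCommitMessage = {
  val writer = factory.createDataWriter(partitionId, 0 /* attemptNumber */)
  try {
    rows.foreach(writer.write)   // write every Row of this partition
    writer.commit()              // the message is shipped back to the driver
  } catch {
    case t: Throwable =>
      writer.abort()
      throw t
  }
}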

Dec 7, 2017

Spark Data Source API V2 - read query planning and execution

Without much marketing noise the Spark team decided to dramatically overhaul the current data connector API in v2.3. The new version was quietly merged into master recently. In comparison with the original approach the new one definitely looks like a real API. Even though nobody will be surprised by further evolution of the version one can see in master, it already looks irresistibly promising. So I decided to spend some time digging into it.

You probably remember the high-level architecture of the Spark query compilation process. It follows the traditional DBMS approach of parsing a query into a logical plan, optimizing the logical plan into a physical plan, and executing the physical plan on worker nodes. I am going to structure my investigation along similar lines.

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

The following discussion does not shy away from relevant Spark query execution internals. Those are always fascinating even when not directly actionable. Moreover, the Data Source API implementation is a nice vertical slice of the generic Spark infrastructure. So this is a convenient opportunity to refresh my memory. I will omit fine details of the Spark internals in an attempt to highlight the big picture.

As my running example I assume a trivial Spark expression such as "SparkSession.read.format("someDSV2").load.count()".

Query plans and Data Sources in 100 words or less


Logical and physical plan nodes are subtypes of QueryPlan. The SparkPlanner is responsible for transforming a logical plan tree into a physical one. The planner relies on a configurable set of strategies for the actual transformation rules.

A data source is represented in the logical plan by a DataSourceV2Relation instance created by the DataFrameReader. During planning DataSourceV2Strategy transforms the relation into a DataSourceV2ScanExec instance. The latter creates a DataSourceRDD instance that is used to actually compute the results in parallel from multiple partitions.

Logical query planning


In this phase the goal is to create a DataSourceV2Relation from client-supplied options. The process is initiated by loading a Dataset.

  • DataFrameReader searches for an implementation of the DataSourceV2 marker interface. The implementation is expected to also implement one of the ReadSupport mixin interfaces that let the engine know the optional capabilities of the data source
  • ReadSupport is a factory of DataSourceV2Reader instances
  • DataSourceV2Reader knows the data source schema and can create ReadTasks
  • DataSourceV2Relation is the logical plan node that represents a data source
  • Dataset owns a QueryExecution and is associated with the DataSourceV2Relation

Physical query planning


In this phase the goal is to transform a DataSourceV2Relation into a DataSourceV2ScanExec. The process is initiated by executing an action on the Dataset created by the previous phase.



  • Dataset owns a QueryExecution
  • QueryExecution coordinates query planning
  • SparkPlanner uses DataSourceV2Strategy to transform logical plan nodes into physical ones
  • DataSourceV2Strategy recognizes data source logical nodes and transforms them into physical ones
  • DataSourceV2Relation carries a DataSourceV2Reader that is given to DataSourceV2ScanExec
  • DataSourceV2ScanExec is the physical plan node that represents a data source

Query execution


In this phase the goal is to obtain data partitions from the RDD created by DataSourceV2ScanExec and collect data Rows from all the partitions.

  • DataSourceV2ScanExec creates ReadTasks using its DataSourceV2Reader and converts the tasks into a DataSourceRDD
  • DataSourceRDD wraps each task into a DataSourceRDDPartition that knows its id
  • ReadTask represents a single data partition. A ReadTask is serialized and sent to a Spark worker.
  • After DataSourceRDD execution is triggered, the DataSourceRDD on the executor side obtains a DataReader from the ReadTask
  • DataReader is an iterator of Rows from one data partition  
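As a minimal illustration of the last two points, a ReadTask is just a serializable recipe for a DataReader, and a DataReader is just a closeable iterator over one partition's rows. My sketch, with the class names as they appeared in the snapshot described here:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}

// A trivial ReadTask/DataReader pair serving a fixed set of rows as one partition
class FixedRowsReadTask(rows: Seq[Row]) extends ReadTask[Row] {
  override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    private val it = rows.iterator
    private var current: Row = _
    override def next(): Boolean = { val has = it.hasNext; if (has) current = it.next(); has }
    override def get(): Row = current
    override def close(): Unit = ()
  }
}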


Sep 7, 2017

Tantalizing promise of GPU for analytics workload

For the last few years the trend in analytics systems has been to implement traditional MPP-style architecture as an open source, usually JVM-based product. Impala was the only exception. Surprisingly, its C++/LLVM foundation did not make it the leader of the "SQL on Hadoop" space which is probably a testament to modern Java performance. 

More recently, everyone began converging on the triad of columnar storage (off-heap when in-memory), vectorized operators, and operator byte code generation. Parquet / Arrow-style columnar format helps with compression and reducing disk-to-memory data transfer volume. Vectorization can take advantage of SIMD (e.g. see Lecture #21 - Vectorized Execution) or at the very least alleviate CPU cache line thrashing. 

By v2.0, Spark had covered most of the ground. It's interesting to note that even though cached dataframes are stored in a compressed columnar format, physical operators are still not vectorized and so process a row at a time (e.g. see a rare glimpse of the internal machinery in the design doc attached to SPARK-14098 "Generate Java code to build CachedColumnarBatch").

Judging from SPARK-15687 "Columnar execution engine", the need for vectorized operators vis-a-vis whole stage code generation seems to be a topic to seriously debate this year. Nevertheless, it would still be a well-understood approach that other features, such as SPARK-19489 "Stable serialization format", could benefit from.

What is easy to miss behind all this progress is that the GPU revolution ignited by DL/ML quietly arrived in the analytics world too. My high-level understanding of the GPU is essentially "very fast operations on columnar data once you copy it from main memory to the GPU". Which almost looks like very advanced SIMD with high latency. So my initial expectation was that analytics queries would not be much faster because of the sheer data volume required by an average query.

I was clearly wrong. My favorite source of query engine benchmarking statistics shows that MapD and Brytlyt score very high. Both were designed from the ground up for the GPU. One challenge with GPU-related technology is that it's way too low-level in comparison with any JVM-based development.

So most people will probably end up just learning the big picture. For a reasonable overview of GPU usage in the OLAP context, I would recommend the Red Fox research project and their "Red Fox: An Execution Environment for Relational Query Processing on GPUs" paper in particular.

To the best of my knowledge, this topic is of limited interest for the Spark project currently. At least partially because it would take columnar data formats and vectorized operators to align Spark internals with GPU coding idioms and data layouts. It probably makes sense to keep an eye on SPARK-12620 "Proposal of GPU exploitation for Spark" and whatever else its author does even if that prototype did not succeed.

Sep 17, 2016

Spark SQL connector design

Now that we have discussed the Data Source API in general it's time to consider what happens when you actually start coding. This post will rely on the first iteration of my experiment. I cannot promise that its design follows the only possible allocation of responsibilities. It will also not work in a real cluster because for the time being I am ignoring the need for a DFS. 

What is real are the interactions among the abstractions comprising the connector, a few useful RDD/SparkContext API tidbits, and the problems one faces trying to write a connector for non-trivial storage from scratch.

As I mentioned before, I am using Lucene to represent the data storage to integrate into Spark. For simplicity's sake I am making a few assumptions here: only point fields are supported, a field visitor is used to retrieve values, and no effort is made to implement a scalable iterator-based approach to search.

Squeezing performance from Lucene quickly leads away from the standard API and so remains out of scope. I am actually curious about the latency achievable when millions of documents with DocValue fields are retrieved and one day I could end up trying it.

Connector structure


There are many Spark connectors like it, but this one is mine :) To begin with let me introduce key abstractions and their responsibilities:
  • DefaultSource - the connector entry point that is essentially a factory of LuceneRelation instances
  • LuceneRelation - represents an application-specific dataset (e.g. the data that belongs to a particular version of some tenant's data); technically, it encapsulates a LuceneRDD and its schema
  • LuceneRDD - an RDD of LucenePartitions; one way to think about it is as a facade for multiple storage partitions. Every store or retrieve operation is delegated to all the partitions.
  • LucenePartition - a subset of original data operated on by Spark as an RDD partition (i.e. iterator of Rows); an entire partition can be retrieved from storage or its subset can be specified with a combination of required columns and row filters
  • Spark API extensions - a package object making it possible to add methods to DataFrame and SQLContext

In addition to the aforementioned classes that mostly implement Data Source API interfaces there are a couple less generic classes:
  • LuceneSchema - the data schema representation used by the connector and comprised of a Spark schema (field types) and a Lucene schema (can a field be retrieved and/or filtered on?). The latter is necessary because not all Lucene documents are required to have all the fields and so reverse engineering the schema from a Lucene index would be prohibitively expensive
  • LucenePartitionStorage - a means of using different storage implementations in read and write paths. In the read path the storage is an actual Lucene index. In the write path the storage is a temporary in-memory data structure holding DataFrame rows before they are written to Lucene index.
To better understand how these abstractions fit together, let's consider the interactions among them in the write path and the read path separately. Both paths are exercised by a unit test and can be seen in action.
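Expressed against the Data Source API interfaces, the skeleton looks roughly like this. LuceneSchema, LuceneRDD and its save/search helpers are placeholders standing in for the classes described above, so treat it as a sketch of the wiring rather than the actual code:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType

// DefaultSource is the entry point Spark discovers by name
class DefaultSource extends RelationProvider with CreatableRelationProvider {
  // read path: build a relation for data previously written under "path"
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new LuceneRelation(parameters("path"))(sqlContext)

  // write path: persist the data frame, then return a relation over what was written
  override def createRelation(sqlContext: SQLContext, mode: SaveMode,
                              parameters: Map[String, String], data: DataFrame): BaseRelation = {
    LuceneRDD.save(data, parameters("path"))           // one Lucene index per partition
    createRelation(sqlContext, parameters)
  }
}

class LuceneRelation(path: String)(@transient val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {
  private val luceneSchema = LuceneSchema.load(path)   // Spark schema + Lucene schema
  override def schema: StructType = luceneSchema.sparkSchema
  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] =
    new LuceneRDD(sqlContext.sparkContext, path, luceneSchema)
      .search(requiredColumns, filters)                // delegated to every LucenePartition
}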

Write path


The write path starts from a DataFrame instance that you want to write to the storage. You also need to come up with a file system path (e.g. a tenant-specific subdirectory under some root location) and a Lucene schema (to let Lucene know which fields to index and/or store).

  • DefaultSource is asked for a new LuceneRelation instance
  • DefaultSource calls the extended DataFrame API to save the data frame
  • For each partition of the RDD wrapped by the data frame a new LucenePartition is created
  • A new LuceneRDD instance is created from the LucenePartitions using the data frame schema and the client-provided Lucene schema
  • The composite schema is written to a file (with the expectation to be shared by all partitions)
  • For each LucenePartition a subdirectory is created 
  • Every LucenePartition is written to a Lucene index created in the subdirectory
  • A new LuceneRelation is created to represent the written data

In the diagram above:
  • The RDD[Row] is the RDD wrapped by the data frame
  • The RDD[LP] stands for an RDD[LucenePartition]
  • When a LucenePartition instance is created from DataFrame rows it is backed by an in-memory storage implementation; currently it simply remembers the original row iterator
  • The in-memory storage only holds data temporarily before it's actually written to the Lucene index
  • The details of Lucene index writing are omitted because they are quite generic and have nothing Spark-specific

Read path


The read path is a mirror image of the write path. It starts with the client calling the SparkContext to load a data frame from a given location.
  • DefaultSource is asked for a new LuceneRelation instance
  • A new LuceneRelation instance is created with a schema loaded from the given location
  • The number of LucenePartitions is found by scanning the location for subdirectories
  • The sequence of found partition identifiers is parallelized so that all partitions can be loaded in parallel
  • A new LucenePartition is created for each found partition
  • A LuceneRDD instance is created from the LucenePartitions and the loaded schema
  • At this point the LuceneRelation is created and so is the data frame the client can see
  • The unit test demonstrates a few different ways to query after the loaded data frame is registered as a temporary SparkSQL table. All of them are internally translated into the same three method calls seen in the diagram above. The most interesting of the three is the buildScan method
  • The LuceneRDD delegates the call to its partitions and then flatMaps partial results into a single row iterator
  • Each partition compiles the required column list and the filters into a Lucene query, executes the query, collects the values from the matching documents, and converts every Lucene document into a Spark Row
As another side note, the Lucene query API is iterator-unfriendly. So in this version every partition collects all the Rows into an array which would probably kill it in real life if the query selectivity is low. 
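To make the last step concrete, here is one way a partition could compile the pushed-down filters into a Lucene query under the point-fields-only assumption. This is illustrative only and handles just a couple of filter types; anything it skips is re-applied by Spark on the returned Rows anyway:

import org.apache.lucene.document.LongPoint
import org.apache.lucene.search.{BooleanClause, BooleanQuery, MatchAllDocsQuery, Query}
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

def toLuceneQuery(filters: Array[Filter]): Query = {
  val builder = new BooleanQuery.Builder()
  var clauses = 0
  filters.foreach {
    case EqualTo(attr, value: Long) =>
      builder.add(LongPoint.newExactQuery(attr, value), BooleanClause.Occur.MUST); clauses += 1
    case GreaterThan(attr, value: Long) =>
      builder.add(LongPoint.newRangeQuery(attr, value + 1, Long.MaxValue), BooleanClause.Occur.MUST); clauses += 1
    case _ => // unsupported filter types are ignored here; Spark evaluates them again on the Rows
  }
  if (clauses == 0) new MatchAllDocsQuery() else builder.build()
}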

Sep 15, 2016

Spark SQL Data Source design considerations

I like learning about query engine internals. Spark SQL Data Source API seems to be a reasonable gateway drug on the way to really understanding Spark executor and optimizer. In more practical terms, the efficiency of data storage integration is likely to dominate any high-performance Spark-based architecture. Especially with the core engine getting so many optimizations in v2.0.

Curiously enough, there is no official documentation other than the API scaladocs. Even though the API itself is comprised of just a handful of abstractions, in my limited experience getting a clear picture of what it takes to implement a reasonable data source integration is not easy. I have not seen too many descriptions and even my favorite ones tend to concentrate on the read path or rely on remote access to some actual storage. 

Browsing a few real integrations made me think that not all of them follow the same design. I am still trying to understand if the differences are purely cosmetic or there are indeed multiple ways to structure a Spark connector. With this question in mind I embarked on a little journey through the world of Spark data sources. I am trying to answer questions such as:
  • Assuming co-locating computations with the data is the shortest path to low latency, how do I integrate a data storage into Spark without making (too many) remote calls from the workers?
  • Can it be done without a distributed file system (assuming, say, AWS S3 instead)?
While reading the Succinct tech report I liked their connector so much that I decided to follow its design in my first iteration. I decided to use Lucene as my data storage because (A) it will force realistic constraints on my prototype (B) I am interested in using Lucene in an OLAP context (C) apparently nobody has tried it before so it could be fun.

Spark


Spark is a distributed query engine that by default expects an HDFS-style distributed file system to be available on worker nodes. A popular alternative is to call a remote storage using some TCP-based RPC protocol (e.g. ElasticSearch or SOLR). Some people use S3 instead of a DFS but that approach seems to be less prominent.

The Spark API is somewhat confusing because it attempts to hide the complexity of distributed processing behind a rather generic RDD abstraction. And then it builds even more abstractions on top of the RDD. In addition, the older low-level RDD API and the higher-level modern DataFrame API are closely related. The DataFrame is an RDD[Row] with a schema. RDDs have partitions. The Data Source API operates on Relations.
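For example, the relationship is visible directly in the API (assuming an existing sqlContext and any Parquet file at hand):

val df = sqlContext.read.parquet("some-file.par")  // a DataFrame
df.schema                  // the StructType describing its columns
df.rdd                     // the underlying RDD[Row]
df.rdd.partitions.length   // how many partitions Spark will operate on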

Lucene


  • Lucene is a library that relies on the usual local file system API to write and read index files
  • Documents do not have a schema because different documents can use different fields (the set of fields could be small enough to treat it as a schema conceptually but it's application-specific)
  • Document fields can be indexed (so you can filter on their values), stored (so you can retrieve a field value from a particular document), or both.
  • There are at least two very different flavors of numeric fields (i.e. LongPoint and NumericDocValuesField)
  • When you expect your queries to return millions of documents there are a few optimized ways to retrieve those efficiently. The more efficient, the less standard.
  • The standard search API can take a callback but there is no iterator-friendly entry point
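A minimal end-to-end illustration of the points above, roughly as it looks with the Lucene 6.x API: index one document with a point field (filterable) and a stored field (retrievable), then run a range query. The field name and directory are made up for the example:

import java.nio.file.Paths
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, LongPoint, StoredField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
import org.apache.lucene.search.IndexSearcher
import org.apache.lucene.store.FSDirectory

val dir = FSDirectory.open(Paths.get("/tmp/lucene-demo"))

val writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))
val doc = new Document()
doc.add(new LongPoint("price", 42L))    // indexed: can be filtered on
doc.add(new StoredField("price", 42L))  // stored: can be retrieved from a hit
writer.addDocument(doc)
writer.close()

val searcher = new IndexSearcher(DirectoryReader.open(dir))
val hits = searcher.search(LongPoint.newRangeQuery("price", 0L, 100L), 10)
hits.scoreDocs.foreach(sd => println(searcher.doc(sd.doc).getField("price").numericValue()))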

First Spark Lucene connector iteration


I am learning some crucial details as I go. So the first iteration will be more of an educational exercise demonstrating internal data source interactions rather than a usable piece of software. It remains to be seen how far I'll be able to evolve it from there.

I don't have a DFS. I also believe that trying to write a "Lucene Directory implementation using HDFS I/O Stream API" is not a good idea. Regardless of whether you use HDFS or S3 I would expect that a more promising approach is to read/write Lucene index files locally and then upload/download them in the background.

Initially I will co-locate everything in my prototype and simply use the local file system for any files. My hypothesis is that having a Lucene index for each Spark partition is a reasonable approach. Once I have the simplest possible connector implementation I will see

  • If there are alternative connector designs 
  • What can be done about storing index files in S3 
  • What kind of performance is achievable with DocValues fields (that will likely require a fixed schema but latency should be worth it)

Aug 11, 2015

Writing Parquet file

Parquet is a column-oriented binary file format very popular in big data analytics circles. Nowadays it's probably impossible to find a sql-on-hadoop engine that does not support this format. The Parquet library makes it trivial to write Avro and Protocol Buffers records to a file. Most of the time you can't beat the simplicity of a "df.write().parquet("some-file.par")"-like Spark one-liner.

But it's also possible to find yourself in a situation when you want to export data from an existing system which does not use Avro-like records. It's actually quite easy to do. One would need to:
  • extend the WriteSupport class responsible for writing a record instance to Parquet output
  • extend the ParquetWriter class to instantiate the new WriteSupport subclass
  • create a schema representing your record fields
  • for each record instance, write the field values to RecordConsumer

For simplicity's sake let's assume a record type without nested structures. The high-level writing sequence looks roughly like this:

Here Record Writer represents a collection of Field Writers. Each Field Writer implementation knows how to write a field of a particular primitive type. The sequence is straightforward; please look at the example source code for details. Notice that to write a null value for a field it's enough to just skip the field for that record.
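Under those assumptions, a hypothetical WriteSupport for a flat record type might look roughly like this. Record and its field layout are made up for illustration, and the package names are those of the org.apache.parquet artifacts; treat it as a sketch rather than the example project's actual code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageTypeParser

case class Record(id: Long, price: Option[Double])   // hypothetical record type

class RecordWriteSupport extends WriteSupport[Record] {
  private val schema = MessageTypeParser.parseMessageType(
    "message record { required int64 id; optional double price; }")
  private var consumer: RecordConsumer = _

  override def init(conf: Configuration): WriteContext =
    new WriteContext(schema, new java.util.HashMap[String, String]())
  override def prepareForWrite(rc: RecordConsumer): Unit = consumer = rc

  override def write(r: Record): Unit = {
    consumer.startMessage()
    consumer.startField("id", 0); consumer.addLong(r.id); consumer.endField("id", 0)
    // writing a null is just skipping the field for this record
    r.price.foreach { p =>
      consumer.startField("price", 1); consumer.addDouble(p); consumer.endField("price", 1)
    }
    consumer.endMessage()
  }
}

// The writer subclass only exists to plug in the WriteSupport above
// (one of the older convenience constructors; newer versions prefer a Builder)
class RecordParquetWriter(path: Path)
  extends ParquetWriter[Record](path, new RecordWriteSupport)

Writing then amounts to creating a RecordParquetWriter for a Path, calling write() for every record, and close() at the end.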

One unexpected complication is that, for no apparent reason, the Parquet library uses java.util.logging. This is the first time in my life I have seen anybody use it. You are not likely to have a logging configuration for it in your code base. You will definitely want to separate Parquet logs from the rest because they could be quite verbose. I actually had to use a rather unpleasant way to configure logging properly.

Jul 2, 2015

Running SparkSQL in standalone mode for development experiments

Spark has very decent documentation in general. Cloudera and Databricks blogs cover the rest of the finer points concerning configuration optimizations. But it took me a few passes through the documentation to learn how to configure my Spark development cluster. It'll probably take you longer to copy and unpack the Spark archive to your boxes than to configure all you need for experimenting in standalone mode. But to save you a few minutes, I created this gist which summarizes my experience.

To install Spark, on each server:
  • unpack Spark archive
  • "cp conf/spark-env.sh.template conf/spark-env.sh"
  • "vi conf/spark-env.sh"

To run Spark:
  • on master, "./sbin/start-master.sh"
  • on worker(s), "./bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:8091"

The gist shows how to configure a few most useful things:
  • Spark Executor memory, in case you want to quickly experiment with it without changing spark-env.sh on all the workers
  • recommended non-default serializer for more realistic performance
  • remote JMX access to Spark Executor (NOT secure)
  • data frame cache batch size
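For reference, roughly the same knobs expressed programmatically instead of via spark-env.sh and the gist. The values and the JMX port are just examples, and the master URL matches the one used above:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:8091")
  .setAppName("standalone-experiments")
  .set("spark.executor.memory", "2g")                     // per-executor memory
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")   // data frame cache batch size
  // remote JMX access to the executor JVM; NOT secure, development only
  .set("spark.executor.extraJavaOptions",
       "-Dcom.sun.management.jmxremote.port=8090 " +
       "-Dcom.sun.management.jmxremote.authenticate=false " +
       "-Dcom.sun.management.jmxremote.ssl=false")
val sc = new SparkContext(conf)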

Jun 30, 2015

Insufficient support for sparse data in SparkSQL

I was experimenting with sparse data in SparkSQL and I learned at least one surprising thing. I'm really interested in OLAP-style analytics use cases. If you squint enough SparkSQL looks like an in-memory analytics database. So even though SparkSQL is not a data storage product its DataFrame cache can be abused for doing in-memory filtering and aggregation of multidimensional data.

In my experiment I started from an 820,000 rows by 900 columns Parquet file that simulated a very sparse denormalized OLAP dataset. I had a dozen or so dense columns of the long type representing dimension member ids. The rest were very sparse columns of the double type pretending to be metric values. I was impressed to see that Parquet compressed it into a 22 MB binary file.

The next thing I tried was actually loading that file as a DataFrame and caching it in memory to accelerate future processing. The good news is that reading such a file is a one-liner in Spark. The bad news is that it took 3 GB of SparkSQL cache storage. Initially I thought that I had misconfigured something. After all, to me 3 GB very much looks like 800K * 900 * 8 bytes with a modicum of compression. After looking at my executor's debug-level logs I could see that this is indeed what happened. Double-type columns were "compressed" with the PassThrough compression scheme which is a no-op.

I dug just a little bit deeper to see that no compression scheme is actually supported for floating point types. Well, who knows, even assertEquals treats floating types differently. Maybe data science is all about integer-type feature vectors. But while looking at the NullableColumnBuilder I also noticed that it is biased towards dense data and is rather unhelpful if what you have is sparse. It encodes a null value by remembering its int (i.e. 4-byte) position. So if you have an empty long/double (i.e. 8-byte) column, the best you can expect is to spend half of the memory a fully populated column would take.
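A quick back-of-the-envelope check makes the observed 3 GB plausible (approximate, ignoring the dozen dense columns and per-batch overhead):

val rows = 820000L
val cols = 900L
val dense  = rows * cols * 8   // ≈ 5.9 GB if every 8-byte cell were fully materialized
val sparse = rows * cols * 4   // ≈ 3.0 GB if every null still costs a 4-byte position marker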

I actually couldn't make the suggested workaround work in Java but I was not really trying. So I am still convinced that currently SparkSQL is rather sparse data-unfriendly. The corresponding compression functionality is not pluggable. I guess one would need to implement a new NullableColumnBuilder trait, extend that trait with new DoubleColumnBuilder-style classes, and instantiate those classes in ColumnBuilder::apply(). I feel like trying it to see if I can make something simple work.

May 3, 2015

Data storage for SparkSQL-based analytics

I have been mulling over the question of Spark-friendly data storage recently. My favorite use-case here is still the one where:
  • in a multi-tenant environment 
  • you can provision enough memory for all/most of the data to reside in RAM
  • your target system is expected to have response time of a few seconds, preferably less
When I imagine an ideal data storage in this context I see things such as:
  • In a steady state, all data is in memory, backed by some disk-based cold storage.
  • Datasets are split into multiple partitions which are held on different servers. It makes it possible to execute a query in parallel on multiple servers. In case of a crash, only a smallish data subset is lost and has to be recovered from the cold storage.
  • A catalog service shared between the storage component and the Spark scheduler. It would allow the scheduler to honor data locality for expensive computations.
  • Obvious features such as efficient column compression and support for predicate push-down
  • It's too much to ask, but using the same storage (or at least transfer) format as the internal Spark cache storage would be nice for server recovery time. 
Curiously enough, I don't see storage options discussed much in Spark documentation and books. Probably because the default Spark positioning is against MapReduce and so one is expected to have ORC/Parquet files on HDFS.

My impression is that there are three reasonable alternatives. All of them are work in progress to such an extent that only your own experiments can show how far they are from production quality. It is not entirely clear how stable they are but there are companies running them in production despite the official experimental/beta status. So I can imagine some startups betting on those technologies with the expectation of growing together.
  • Spark cache itself. On the positive side, it's easy to use, it's integrated into the engine and it's believed to be memory-efficient because of column-orientation and compression. The problem is that it's not real storage (try to imagine updates); it's called a cache for a reason. What is much worse is that if you lose your driver then you lose every single cached DataFrame it owned. It's a really big deal and there are only tepid answers right now.
  • Tachyon. You would think that for a project originated at the same lab the integration (and PR around it) would be top-notch. Apparently there are missing pieces and Tachyon is not explicitly mentioned in what passes for a roadmap.
  • Cassandra. With an official connector on github, the story here seems to be more clear. For people who already run it I would expect it to be not a bad idea. You already have an in-memory columnar storage that can process predicates and serve column subsets. Co-locate Spark workers and Cassandra nodes and data transfer might be not that expensive.
  • GridGain Ignite. Frankly, I guess I am not being serious here. I have never looked at their in-memory file system anyway. But according to some not-so-fringe voices...
When I just started thinking about it, serving Parquet files via Tachyon as your SparkSQL data storage sounded like a buzzword-compliant joke. I am not so sure anymore even though it still looks strange to me. I would say that the Cassandra option looks much more traditional and so likely to be production-worthy sooner than anything else. But I admit to having little certainty as to which one is solid enough to be used this year.

Apr 19, 2015

SparkSQL as a foundation for analytics server

Imagine that you want to build an analytics server for the enterprise-scale data. In contrast to web-scale analytics with sql-on-hadoop, you probably have datasets which are of moderate size. Let's say high double-digit GBs. With 244 GB RAM instances available even on AWS (e.g. r3.8xlarge, or i2.8xlarge) you can fully expect to fit the largest dataset in memory. It's hardly a surprise that people have been talking about it for years [1]. Now a Java heap space of that size is arguably not a good idea because of garbage collection. But then again, with off-heap storage or rarely allocated primitive type arrays it could be manageable.

Query engine techniques we know and love

When you mention analytics I think of OLAP. When I think of OLAP I see columns and column-oriented storage. Column-oriented always rhymes with efficient compression in my book. After some initial Hadoop-inspired confusion even the sql-on-hadoop world is converging on an essentially MPP relational query engine architecture. By now we have got used to expecting typical features from an analytics query engine, such as:
  • relational operators as the core computational model
  • optimizations biased towards in-memory processing
  • column oriented storage and data transfer; lots of compression [2]
  • once in memory, data is held in primitive type arrays; no Java auto-boxing ever
  • operators implemented to operate on batches of rows; hot loops compiled into byte code [3][4]
  • support for predicate pushdown and retrieval of a subset of columns only
  • query engine aware of dataset partitions and capable of locality-sensitive scheduling (aka "send computation to where the data is and not the other way around")

Why Spark?

So let's see how well Spark meets these expectations. By Spark here we definitely mean the shiny new Spark SQL component [5] and its DataFrames API in particular. From our OLAP perspective we can think about Spark SQL as a relational MPP engine wrapped into the DataFrames API on top and the Data Sources SPI on the bottom.

The DataFrames API accepts both SQL expressions and normal Scala method calls. All the usual relational algebra fun, nothing to see here. Except for what is known as data frame caching. In which case the data frame in question is put into in-memory column-oriented storage with appropriate compression applied.
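For example, with some DataFrame df at hand, caching is a two-step affair:

df.cache()    // mark the data frame for the in-memory columnar cache
df.count()    // the first action materializes the compressed column batches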

Did you know Scala has quasiquotes? Neither did I. It turns out Scala macros can be useful in real life :) So the Spark optimizer will generate byte-code for your query. Something an average sql-on-hadoop engine can do only if it's called Impala :) And quasiquotes look so much cleaner than ASM.

The Data Source SPI is another important enabler. It's a means of plugging real data storage into the Spark SQL engine. Among its few simple interfaces there is one that supports both predicate pushdown and retrieval of only a subset of all columns.

In general, it's safe to say that Spark is making real progress. It's already a very promising query engine and they are clearly paying attention to efficiency of in-memory execution. If they lack certain common capabilities now it is very likely the gap will be closed soon enough.

Why not?

The question I am the least comfortable with currently is data storage and the corresponding data transfer. When you have an essentially in-memory query engine it's only natural to expect an in-memory data storage closely integrated with it. 

Ideally, when you read from the storage you want to use the same binary format as the DataFrame cache. Whether you marshal data between the JVMs on the same node or run against off-heap storage, you want to avoid or minimize any kind of transcoding.

For scalability and fault tolerance you want to split your dataset into multiple partitions. You want the query engine to be aware of those partitions so that it can send queries to the corresponding servers. Each partition could be replicated to reduce server crash recovery latency.

As of now, I don't quite understand the whole story about the most promising Spark data storage. Clearly, Tachyon is on its way. I'm not entirely convinced that "a very fast HDFS" is how I imagine my data storage API though. GOOG will tell you that people also use Cassandra as column storage with a good Spark connector.

It really feels like those are important considerations but not show-stoppers. Personally, I would expect more emphasis on data storage from the Spark team this year. It's just not clear to me right now how I would architect storage for a real analytics system on SparkSQL.

References

[1] SanssouciDB: An In-Memory Database for Processing Enterprise Workloads
[2] Integrating Compression and Execution in Column-Oriented Database Systems
[3] Vectorization vs. Compilation in Query Execution
[4] Hive Vectorized Query Execution Design
[5] Spark SQL: Relational Data Processing in Spark