Apr 19, 2015

SparkSQL as a foundation for analytics server

Imagine that you want to build an analytics server for the enterprise-scale data. In contrast to web-scale analytics with sql-on-hadoop, you probably have datasets which are of moderate size. Let's say high double-digit GBs. With 244 GB RAM instances available even on AWS (e.g. r3.8xlarge, or i2.8xlarge) you can fully expect to fit the largest dataset in memory. It's hardly a surprise that people have been talking about it for years [1]. Now a Java heap space of that size is arguably not a good idea because of garbage collection. But then again, with off-heap storage or rarely allocated primitive type arrays it could be manageable.

Query engine techniques we know and love

When you mention analytics I think of OLAP. When I think of OLAP I see columns and column-oriented storage. Column-oriented always rhymes with efficient compression in my book. After some initial Hadoop-inspired confusion even the sql-on-hadoop world is converging on essentially MPP relational query engine architecture. By now we got used to expecting from an analytics query engine typical features such as:
  • relational operators as the core computational model
  • optimizations biased towards in-memory processing
  • column oriented storage and data transfer; lots of compression [2]
  • once in memory, data is held in primitive type arrays; no Java auto-boxing ever
  • operators implemented to operate on batches of rows; hot loops compiled into byte code [3][4]
  • support for predicate pushdown and retrieval of a subset of columns only
  • query engine aware of dataset partitions and capable of locality-sensitive scheduling (aka "send computation to where the data is and not the other way around")

Why Spark?

So let's see how well Spark meets these expectations. By Spark here we definitely mean the new shining Spark SQL component [5] and its DataFrames API in particular. From our OLAP perspective we can think about Spark SQL as a relational MPP engine wrapped into the DateFrames API on the top and the Data Sources SPI on the bottom.

The DataFrames API accepts both SQL expressions and normal Scala method calls. All the usual relational algebra fun, nothing to see here. Except for what is known as data frame caching. In which case the data frame in question is put into in-memory column-oriented storage with appropriate compression applied.

Did you know Scala has quasiquotes? Neither did I. It turns out Scala macros can be useful in real life :) So the Spark optimizer will generate byte-code for your query. Something an average sql-on-hadoop engine can do only if it's called Impala :) And quasiquotes look so much cleaner than ASM.

The Data Source SPI is another important enabler. It's a means of plugging real data storage into Spark SQL engine. Among its few simple interfaces there is one which supports both predicate push down and retrieval of only a subset of all columns.

In general, it's safe to say that Spark is making real progress. It's already a very promising query engine and they are clearly paying attention to efficiency of in-memory execution. If they lack certain common capabilities now it is very likely the gap will be closed soon enough.

Why not?

The question I am the least comfortable with currently is data storage and the corresponding data transfer. When you have an essentially in-memory query engine it's only natural to expect an in-memory data storage closely integrated with it. 

Ideally, when you read from the storage you want to use the same binary format as the DataFrame cache. Whether you marshal data between the JVMs on the same node or run against out of heap storage you want to avoid/minimize any kind of transcoding.

For scalability and fault tolerance you want to split your dataset into multiple partitions. You want the query engine to be aware of those partitions so that it can send queries to the corresponding servers. Each partition could be replicated to reduce server crash recovery latency.

As of now, I don't quite understand the whole story about most promising Spark data storage. Clearly, Tachyon is on its way. I'm not entirely convinced that "a very fast HDFS" is how I imagine my data storage API though. GOOG will tell you that people also use Cassandra as a column storage with a good Spark connector.

It really feels like those are important considerations but not show-stoppers. Personally, I would expect more emphasis on data storage from the Spark team this year. It's just not clear to me right now how I would architect storage for a real analytics system on SparkSQL.

References

[1] SanssouciDB: An In-Memory Database for Processing Enterprise Workloads
[2] Integrating Compression and Execution in Column-Oriented Database Systems
[3] Vectorization vs. Compilation in Query Execution
[4] Hive Vectorized Query Execution Design
[5] Spark SQL: Relational Data Processing in Spark