Sep 3, 2015

Customizing Lucene scoring loop

By default, one thinks about Lucene as a full-text search engine. You give it many text files and it will be able to find the most relevant ones for a query. The API is quite straightforward and there is no need to know anything about what happens underneath it.

There is another, arguably less orthodox, way to think about it. Lucene is widely known for its search speed. So instead of looking for the most relevant text documents one could use it as a more generic data storage with excellent indices. Some people used it this way for storing time series.

Once you start this kind of development you quickly recognize the need for calling Lucene efficiently. It's trivial to have millions of matching documents for your query. So if you want to extract data from all of them and not just a few top-scoring ones you will be wise to pay attention to new object allocation. It starts with simple things such as caching and re-using Lucene document and field instances. Then you stop calling search methods returning arrays and switch to callback-based ones.

And then there is the next stage when you find out that it's possible to go a level down and abuse Lucene a little. This approach is usually referred to as a "custom scoring loop". Oddly enough, by far the best explanation of it I have seen is a blog post. I can't recommend that description enough: the author knows his stuff and goes quite deep into Lucene internals.

The idea is to:

  • create a special kind of Query that can register a listener for matching documents
  • when the listener is called, instead of scoring a document in a different way, read the fields required for processing; if possible, process them right there
  • ignore the results returned by IndexSearcher

It's actually surprisingly simple to implement this idea. It's enough to extend three classes. The sequence diagram below shows a representative control flow:

  • extend StoredFieldVisitor to have a Visitor that knows the Lucene document fields you want to read
  • create a Processor that owns the Visitor and can be notified about availability of document field values
  • extend CustomScoreProvider to have a matching document listener; when called, apply the Visitor to read field values and make the Processor use them
  • extend CustomScoreQuery to register the listener with Lucene searcher
  • call IndexSearcher with an instance of the new query type; once finished, enjoy the data gathered by the Processor
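
To make the control flow above a bit more tangible, here is a minimal plain-Java model of it. It deliberately uses no Lucene classes: Visitor, Processor, search() and the double[] "documents" are hypothetical stand-ins for the StoredFieldVisitor subclass, the processor, IndexSearcher and stored fields. Treat it as a sketch of who calls whom, not as the real API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.Predicate;

// Plain-Java model of the control flow only; none of these types are Lucene classes.
final class ScoringLoopSketch {

    static final int TENANT = 0; // hypothetical field positions in a toy "document"
    static final int METRIC = 1;

    // Plays the role of the StoredFieldVisitor subclass: knows which field to read.
    static final class Visitor {
        double lastValue;

        void read(double[] storedFields) {
            lastValue = storedFields[METRIC];
        }
    }

    // Owns the Visitor and is told when field values are available.
    static final class Processor {
        final Visitor visitor = new Visitor();
        double sum;
        int count;

        void onFieldsAvailable() {
            sum += visitor.lastValue;
            count++;
        }
    }

    // Stand-in for the searcher: calls the listener once per matching document.
    static int search(List<double[]> index, Predicate<double[]> query, IntConsumer listener) {
        int hits = 0;
        for (int docId = 0; docId < index.size(); docId++) {
            if (query.test(index.get(docId))) {
                listener.accept(docId);
                hits++;
            }
        }
        return hits;
    }

    static Processor run(List<double[]> index, Predicate<double[]> query) {
        Processor processor = new Processor();
        // The listener reads fields and processes them on the spot.
        // The value returned by search() is ignored on purpose.
        search(index, query, docId -> {
            processor.visitor.read(index.get(docId));
            processor.onFieldsAvailable();
        });
        return processor;
    }

    public static void main(String[] args) {
        List<double[]> index = Arrays.asList(
                new double[] {1, 2.5},
                new double[] {2, 100},
                new double[] {1, 4});
        Processor p = run(index, doc -> doc[TENANT] == 1.0);
        System.out.println(p.count + " docs, sum = " + p.sum);
    }
}
```

The point of the shape is that nothing is allocated per matching document: one Visitor and one Processor are reused for the whole search.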

Aug 11, 2015

Writing Parquet file

Parquet is a column-oriented binary file format very popular in big data analytics circles. Nowadays it's probably impossible to find a sql-on-hadoop engine that does not support this format. The Parquet library makes it trivial to write Avro and Protocol Buffers records to a file. Most of the time you can't beat the simplicity of a "df.write().parquet("some-file.par")"-like Spark one-liner.

But it's also possible to find yourself in a situation when you want to export data from an existing system which does not use Avro-like records. It's actually quite easy to do. One would need to:
  • extend the WriteSupport class responsible for writing a record instance to Parquet output
  • extend the ParquetWriter class to instantiate the new WriteSupport subclass
  • create a schema representing your record fields
  • for each record instance, write the field values to RecordConsumer

For simplicity's sake let's assume a record type without nested structures. The high-level writing sequence looks roughly like this:

Here Record Writer represents a collection of Field Writers. Each Field Writer implementation knows how to write a field of a particular primitive type. The sequence is straightforward; please look at the example source code for details. Notice that to write a null value for a field it's enough to just skip the field for that record.
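
The Record Writer / Field Writer split and the "skip the field to write a null" rule can be sketched in plain Java. RecordingConsumer below is a hypothetical stand-in that only logs calls; it loosely mirrors the start/add/end call shape of Parquet's RecordConsumer but is not the Parquet interface, and all other names are made up for illustration. In real Parquet the skipped field would also have to be declared optional in the schema.

```java
import java.util.ArrayList;
import java.util.List;

final class NullSkippingWriterSketch {

    // Hypothetical stand-in: records the calls it receives, nothing more.
    static final class RecordingConsumer {
        final List<String> events = new ArrayList<>();

        void startMessage() { events.add("startMessage"); }
        void startField(String name, int index) { events.add("startField " + name); }
        void addLong(long value) { events.add("long " + value); }
        void addDouble(double value) { events.add("double " + value); }
        void endField(String name, int index) { events.add("endField " + name); }
        void endMessage() { events.add("endMessage"); }
    }

    // Each Field Writer knows how to emit one primitive type.
    interface FieldWriter {
        void write(RecordingConsumer out, Object value);
    }

    static final FieldWriter LONG = (out, v) -> out.addLong((Long) v);
    static final FieldWriter DOUBLE = (out, v) -> out.addDouble((Double) v);

    // The Record Writer is just an ordered collection of Field Writers.
    static final class RecordWriter {
        private final String[] names;
        private final FieldWriter[] writers;

        RecordWriter(String[] names, FieldWriter[] writers) {
            this.names = names;
            this.writers = writers;
        }

        void write(RecordingConsumer out, Object[] record) {
            out.startMessage();
            for (int i = 0; i < names.length; i++) {
                if (record[i] == null) {
                    continue; // a null is written by emitting nothing for the field
                }
                out.startField(names[i], i);
                writers[i].write(out, record[i]);
                out.endField(names[i], i);
            }
            out.endMessage();
        }
    }

    public static void main(String[] args) {
        RecordWriter writer = new RecordWriter(
                new String[] {"id", "clicks", "revenue"},
                new FieldWriter[] {LONG, LONG, DOUBLE});
        RecordingConsumer out = new RecordingConsumer();
        writer.write(out, new Object[] {42L, null, 1.5});
        out.events.forEach(System.out::println);
    }
}
```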

One unexpected complication is that for no particular reason the Parquet library uses java.util.logging. This is the first time in my life I have seen anybody using it. You are not likely to have a logging configuration for it in your code base. You will definitely want to separate Parquet logs from the rest because they could be quite verbose. I actually had to use a rather unpleasant way to configure logging properly.
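
For reference, one way to fence off a library's java.util.logging output using only the JDK is sketched below. It is not necessarily the workaround mentioned above. The logger name "parquet" is an assumption (check which names your Parquet version actually logs under), and CollectingHandler is a hypothetical in-memory handler; in practice you would more likely attach a FileHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class JulIsolationSketch {

    // java.util.logging holds loggers weakly; pin configured ones so the settings survive GC.
    private static final List<Logger> PINNED = new ArrayList<>();

    // Hypothetical handler that keeps messages in memory instead of printing them.
    static final class CollectingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getLevel() + " " + record.getMessage());
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    // Detaches the named logger subtree from the root handlers and gives it its own sink.
    static CollectingHandler isolate(String loggerName, Level threshold) {
        Logger logger = Logger.getLogger(loggerName);
        CollectingHandler handler = new CollectingHandler();
        logger.setUseParentHandlers(false); // stop records from reaching the root logger
        logger.setLevel(threshold);         // child loggers inherit this level
        logger.addHandler(handler);
        PINNED.add(logger);
        return handler;
    }

    public static void main(String[] args) {
        CollectingHandler captured = isolate("parquet", Level.WARNING); // assumed logger name
        Logger child = Logger.getLogger("parquet.hypothetical.Writer");
        child.info("row group flushed");        // dropped: below the threshold
        child.warning("something worth reading");
        System.out.println(captured.messages);
    }
}
```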

Jul 2, 2015

Running SparkSQL in standalone mode for development experiments

Spark has very decent documentation in general. Cloudera and Databricks blogs cover the rest of the finer points concerning configuration optimizations. But it took me a few passes through the documentation to learn how to configure my Spark development cluster. It'll probably take you longer to copy and unpack Spark archive to your boxes than to configure all you need for experimenting in standalone mode. But to save you a few minutes, I created this gist which summarizes my experience. 

To install Spark, on each server:
  • unpack Spark archive
  • "cp conf/ conf/"
  • "vi conf/"

To run Spark:
  • on master, "./sbin/"
  • on worker(s), "./bin/spark-class org.apache.spark.deploy.worker.Worker spark://"

The gist shows how to configure a few most useful things:
  • Spark Executor memory, in case you want to quickly experiment with it without changing on all the workers
  • recommended non-default serializer for more realistic performance
  • remote JMX access to Spark Executor (NOT secure)
  • data frame cache batch size

Jun 30, 2015

Insufficient support for sparse data in SparkSQL

I was experimenting with sparse data in SparkSQL and I learned at least one surprising thing. I'm really interested in OLAP-style analytics use cases. If you squint enough SparkSQL looks like an in-memory analytics database. So even though SparkSQL is not a data storage product, its DataFrame cache can be abused for doing in-memory filtering and aggregation of multidimensional data.

In my experiment I started from an 820,000 rows by 900 columns Parquet file that simulated a very sparse denormalized OLAP dataset. I had a dozen or so dense columns of the long type representing dimension member ids. The rest were very sparse columns of the double type pretending to be metric values. I was impressed to see that Parquet compressed it into a 22 MB binary file.

The next thing I tried was actually loading that file as a DataFrame and caching it in memory to accelerate future processing. The good news is that reading such a file is a one-liner in Spark. The bad news is that it took 3 GB of SparkSQL cache storage. Initially I thought that I had misconfigured something. After all, to me 3 GB very much looks like 800K * 900 * 8 bytes with a modicum of compression. After looking at my executor's debug-level logs I could see that this is indeed what happened. Double-type columns were "compressed" with the PassThrough compression scheme which is a no-op.

I dug just a little bit deeper to see that no compression scheme is actually supported for floating point types. Well, who knows, even assertEquals treats floating types differently. Maybe data science is all about integer-type feature vectors. But while looking at the NullableColumnBuilder I also noticed that it is biased towards dense data and is rather unhelpful if what you have is sparse. It encodes a null value by remembering its int (i.e. 4-byte) position. So if you have an empty long/double (i.e. 8-byte) column, the best you can expect is to spend half of the memory a fully populated column would take.
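
A back-of-the-envelope check of those numbers, as a small Java sketch. The assumptions are mine: exactly 12 dense columns ("a dozen or so"), every sparse column treated as completely empty, 4 bytes per recorded null position, and no per-batch overhead or compression of the dense columns. Under those assumptions the estimate lands very close to the 3 GB I observed, although that may not be the whole explanation.

```java
final class CacheSizeEstimate {

    static final long ROWS = 820_000L;
    static final long COLUMNS = 900L;
    static final long DENSE_COLUMNS = 12L;       // assumption: "a dozen or so"
    static final long VALUE_BYTES = 8L;          // long and double are both 8 bytes
    static final long NULL_POSITION_BYTES = 4L;  // one int position per null value

    // Every cell stored as a raw 8-byte value.
    static long fullyPopulatedBytes() {
        return ROWS * COLUMNS * VALUE_BYTES;
    }

    // Dense dimension columns stored uncompressed.
    static long denseBytes() {
        return ROWS * DENSE_COLUMNS * VALUE_BYTES;
    }

    // Sparse columns treated as entirely null: 4 bytes per null position.
    static long emptySparseBytes() {
        return ROWS * (COLUMNS - DENSE_COLUMNS) * NULL_POSITION_BYTES;
    }

    static long estimatedCacheBytes() {
        return denseBytes() + emptySparseBytes();
    }

    public static void main(String[] args) {
        System.out.println("fully populated: " + fullyPopulatedBytes() + " bytes");
        System.out.println("dense columns:   " + denseBytes() + " bytes");
        System.out.println("empty sparse:    " + emptySparseBytes() + " bytes");
        System.out.println("estimated cache: " + estimatedCacheBytes() + " bytes");
    }
}
```

With these inputs a fully populated table would need about 5.9 GB, while the estimate for the mostly empty one is about 2.99 GB, i.e. roughly half, which is what the 4-byte null positions predict.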

I actually couldn't make the suggested workaround work in Java but I was not really trying. So I am still convinced that currently SparkSQL is rather sparse data-unfriendly. The corresponding compression functionality is not pluggable. I guess one would need to implement a new NullableColumnBuilder trait, extend that trait by new DoubleColumnBuilder-style classes, and instantiate those classes in ColumnBuilder::apply(). I feel like trying it to see if I can make something simple work.

May 3, 2015

Data storage for SparkSQL-based analytics

I have been mulling over the question of Spark-friendly data storage recently. My favorite use-case here is still the one where:
  • in a multi-tenant environment 
  • you can provision enough memory for all/most of the data to reside in RAM
  • your target system is expected to have response time of a few seconds, preferably less
When I imagine an ideal data storage in this context I see things such as:
  • In a steady state, all data is in memory, backed by some disk-based cold storage.
  • Datasets are split into multiple partitions which are held on different servers. This makes it possible to execute a query in parallel on multiple servers. In case of a crash, only a smallish data subset is lost and has to be recovered from the cold storage.
  • A catalog service shared between the storage component and the Spark scheduler. It would allow the scheduler to honor data locality for expensive computations.
  • Obvious features such as efficient column compression and support for predicate push-down
  • It's too much to ask, but using the same storage (or at least transfer) format as the internal Spark cache storage would be nice for server recovery time. 
Curiously enough, I don't see storage options discussed much in Spark documentation and books. Probably because the default Spark positioning is against MapReduce and so one is expected to have ORC/Parquet files on HDFS.

My impression is that there are three reasonable alternatives. All of them are work in progress to such an extent that only your own experiments can show how far they are from production-quality. It is not entirely clear how stable they are but there are companies running them in production despite the official experimental/beta status. So I can imagine some startups betting on those technologies with the expectation of growing together.
  • Spark cache itself. On the positive side, it's easy to use, it's integrated into the engine and it's believed to be memory-efficient because of column-orientation and compression. The problem is that it's not a real storage (try to imagine updates), it's called a cache for a reason. What is much worse is that if you lose your driver then you lose every single cached DataFrame it owned. It's a really big deal and there are only tepid answers right now.
  • Tachyon. You would think that for a project originated at the same lab the integration (and PR around it) would be top-notch. Apparently there are missing pieces and Tachyon is not explicitly mentioned in what passes for a roadmap.
  • Cassandra. With an official connector on github, the story here seems to be clearer. For people who already run it I would expect it to be not a bad idea. You already have an in-memory columnar storage that can process predicates and serve column subsets. Co-locate Spark workers and Cassandra nodes and data transfer might not be that expensive.
  • GridGain Ignite. Frankly, I guess I am not being serious here. I have never looked at their in-memory file system anyway. But according to some not-so-fringe voices..
When I just started thinking about it, serving Parquet files via Tachyon as your SparkSQL data storage sounded like a buzzword-compliant joke. I am not so sure anymore even though it still looks strange to me. I would say that the Cassandra option looks much more traditional and so likely to be production-worthy sooner than anything else. But I admit to having little certainty as to which one is solid enough to be used this year.

Apr 19, 2015

SparkSQL as a foundation for analytics server

Imagine that you want to build an analytics server for the enterprise-scale data. In contrast to web-scale analytics with sql-on-hadoop, you probably have datasets which are of moderate size. Let's say high double-digit GBs. With 244 GB RAM instances available even on AWS (e.g. r3.8xlarge, or i2.8xlarge) you can fully expect to fit the largest dataset in memory. It's hardly a surprise that people have been talking about it for years [1]. Now a Java heap space of that size is arguably not a good idea because of garbage collection. But then again, with off-heap storage or rarely allocated primitive type arrays it could be manageable.

Query engine techniques we know and love

When you mention analytics I think of OLAP. When I think of OLAP I see columns and column-oriented storage. Column-oriented always rhymes with efficient compression in my book. After some initial Hadoop-inspired confusion even the sql-on-hadoop world is converging on essentially MPP relational query engine architecture. By now we got used to expecting from an analytics query engine typical features such as:
  • relational operators as the core computational model
  • optimizations biased towards in-memory processing
  • column oriented storage and data transfer; lots of compression [2]
  • once in memory, data is held in primitive type arrays; no Java auto-boxing ever
  • operators implemented to operate on batches of rows; hot loops compiled into byte code [3][4]
  • support for predicate pushdown and retrieval of a subset of columns only
  • query engine aware of dataset partitions and capable of locality-sensitive scheduling (aka "send computation to where the data is and not the other way around")

Why Spark?

So let's see how well Spark meets these expectations. By Spark here we definitely mean the new shining Spark SQL component [5] and its DataFrames API in particular. From our OLAP perspective we can think about Spark SQL as a relational MPP engine wrapped into the DataFrames API on the top and the Data Sources SPI on the bottom.

The DataFrames API accepts both SQL expressions and normal Scala method calls. All the usual relational algebra fun, nothing to see here. Except for what is known as data frame caching. In which case the data frame in question is put into in-memory column-oriented storage with appropriate compression applied.

Did you know Scala has quasiquotes? Neither did I. It turns out Scala macros can be useful in real life :) So the Spark optimizer will generate byte-code for your query. Something an average sql-on-hadoop engine can do only if it's called Impala :) And quasiquotes look so much cleaner than ASM.

The Data Source SPI is another important enabler. It's a means of plugging real data storage into Spark SQL engine. Among its few simple interfaces there is one which supports both predicate push down and retrieval of only a subset of all columns.

In general, it's safe to say that Spark is making real progress. It's already a very promising query engine and they are clearly paying attention to efficiency of in-memory execution. If they lack certain common capabilities now it is very likely the gap will be closed soon enough.

Why not?

The question I am the least comfortable with currently is data storage and the corresponding data transfer. When you have an essentially in-memory query engine it's only natural to expect an in-memory data storage closely integrated with it. 

Ideally, when you read from the storage you want to use the same binary format as the DataFrame cache. Whether you marshal data between the JVMs on the same node or run against out of heap storage you want to avoid/minimize any kind of transcoding.

For scalability and fault tolerance you want to split your dataset into multiple partitions. You want the query engine to be aware of those partitions so that it can send queries to the corresponding servers. Each partition could be replicated to reduce server crash recovery latency.

As of now, I don't quite understand the whole story about the most promising Spark data storage. Clearly, Tachyon is on its way. I'm not entirely convinced that "a very fast HDFS" is how I imagine my data storage API though. GOOG will tell you that people also use Cassandra as a column storage with a good Spark connector.

It really feels like those are important considerations but not show-stoppers. Personally, I would expect more emphasis on data storage from the Spark team this year. It's just not clear to me right now how I would architect storage for a real analytics system on SparkSQL.


[1] SanssouciDB: An In-Memory Database for Processing Enterprise Workloads
[2] Integrating Compression and Execution in Column-Oriented Database Systems
[3] Vectorization vs. Compilation in Query Execution
[4] Hive Vectorized Query Execution Design
[5] Spark SQL: Relational Data Processing in Spark

Mar 11, 2015

Scala is hot in analytics companies

It is hard to believe it's been six years since I started worrying that Java is obsolete :) We lived long enough to see Java8 in production. A couple of years ago I thought that Scala flat-lined. I remember keynotes on never getting to mainstream and compiler folks leaving. But this year I can see a very different picture. Most of the companies dealing with analytics are writing Scala. A great many are either already running Spark in production or prototyping it. At this point it feels like a very good time to join such a company. There are too few people around with real Scala experience to make it a strict requirement.

It is interesting to observe that some arguments against Scala sound so familiar from the good old C++ days. Some companies ban use of a certain feature subset (e.g. implicits). Some folks have trouble with a multi-paradigm language where there is so much choice. Few people are excited about function signatures in the standard library (remember STL? partially specialized template anyone?). I know good engineers are supposed to value simplicity. I must be a bad one, I am attracted to complexity. Scala reminds me of C++ in this respect. There is always some language tidbit to pick up every time you read a book. In Java only the j.u.c package could fill one with so much joy for years :)

Mar 10, 2015

Research projects inch towards mainstream big data analytics

Just recently I wrote about a previously small research prototype that grew up into a much larger system. It took me some time to recognize that another university project from a few years ago got even closer to the world of real software. After rebranding it is known as Flink and resides at Apache Incubator.

It raises a few interesting questions about the evolution of what could be liberally called sql-on-hadoop query engines. A growing number of open source systems used for analytics look architecturally very similar. If you squint enough, each and every one of Presto/Impala/Hive/Drill/Spark/AsterixDB/Flink and their ilk is fundamentally a relational MPP database. Or at least its query engine half.

Some of them started with a poor computational model. To be more precise, the mapreduce revolution of Hadoop 1.0 allowed people access to the kind of computational infrastructure previously absent from the open source world. Without alternatives, people were forced to abuse a simple model invented for log batch processing. Others didn't limit themselves with two kinds of jobs and went for real relational operators. 

Impala was arguably the first wake up call from the database universe. Its technological stack alone was a clear indication of the kind of gun real database folks bring to a Hadoop knife fight. Nowadays it's Spark that is so bright and shiny that every other day people suggest it could replace MapReduce in Hadoop stack.

I reckon it'll take years to see how it all plays out. If Spark, with its academic pedigree, can challenge Hadoop/MR could it be the case that AsterixDB or Flink mature enough in a couple of years to do something similar? They also started at respected universities, roughly at the same time, had the right relational MPP core from day one, have some Scala in the code base. Are they too late to the game and will never be able to acquire enough traction? Do they need a company to provide support (and aggressive marketing) as a precondition? Will they be able to run on YARN/Mesos well enough to compete in a common Hadoop environment?

Feb 18, 2015

GridGain is now an Apache project

It's not exactly news but still quite a recent stage in its evolution. Years ago, say in 2009, it was quite a unique product. There were a few popular data grids/distributed caches to choose from. There was no alternative open source computational grid though. Moreover, GG was easy to use and well documented. Its code base was well structured as a consequence of a highly pluggable architecture. For a small team with roots in a peculiar location it was a very impressive product. 

I am not sure why GG is not more popular. It's not exactly obscure but it certainly feels like a niche system. From the core API it's clear that the original MapReduce had some influence. Taking into account how generic most of GG core is it probably became a limiting factor later. One really important piece missing from their architecture was an explicit scheduler. Without it GG could not be re-purposed as a really generic basis for something akin to "sql-on-hadoop".

Which also raises a question. Did they fail to pivot in that direction? They had a lot of experience with middleware. They were smart enough to appreciate Scala extremely early. Is it conceivable that they might have come up with something like Spark? Did they lack database internals expertise? Did not have enough resources to compete with Internet companies? There is very little reuse in this space. Netty and ANTLR are probably the only libraries shared by the Prestos/Hives/Drills of this world. So apparently nobody needs common middleware. The Impalas of this world try to skip even most of Hadoop.

I am curious about GG's future. They went back and forth on their open source/community edition strategy and at times it looked like desperation. I believe they initially released the community edition simultaneously with the enterprise one. Then separated them by almost a year. Their github repository was just a backup for development done somewhere else. And now it's all an Apache project. Could it be a new beginning?

It would be interesting to know how people use GG in 2015 when the market is so over-saturated with "grid/cluster" frameworks. For example, Analytics and Big Data space is big but owned by Hadoop&Co and being seriously attacked by Spark. Event-oriented/stream processing is a contested space now but I never heard GG mentioned in that context. If you squint enough, distributed actors in Akka look similar to GG jobs. I hardly ever see GG in job ads or people's resumes. Is there a market segment large enough left for GG and what is it?