Feb 26, 2012

Basic document and event analytics

Imagine that you want to add to your system the ability to find documents and events related to the "current context". Such a context could include important words, periods of time or topics from a preconfigured catalogue. The documents, such as PDF and Word files, could be found automatically by a web crawler or ingested by placing them in a periodically scanned location. The events could be Twitter or blog postings, or Google Alerts.

Ultimately, and especially if you expect to find most of the input data by crawling at scale, you will likely want to apply data mining techniques for classification, clustering or recommending. Those are vast areas, so in the first version you will probably limit yourself to a simplified approach where some useful functionality can be provided without a Hadoop server farm and a room of data scientists.

Strictly speaking, document and event analytics are two distinctly different capabilities, each with its own data flow and set of available open source technologies. Nevertheless, there are certain architectural similarities between the two.

It is common in Big Data systems to have two independent workflows sharing the same data storage. One, typically batch-oriented, is responsible for data collection and preprocessing. The other, essentially real-time, is responsible for responding to user requests with the most relevant subset of preprocessed data. The batch-oriented workflow is often made scalable by running it on MapReduce-style middleware. The request-time workflow is frequently based on a NoSQL in-memory database to support scalability and high availability.

Document analytics
Preprocessing workflow:
  • A new document is found by a crawler (web, or file-system for non-public documents) 
  • The document is parsed into plain text and some accompanying metadata, the text is analyzed 
  • The filtered text is indexed and saved to persistent storage
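
The three preprocessing steps above can be sketched end-to-end. This is a deliberately toy stand-in for what Tika (parsing) and Lucene (indexing) actually do; all function and class names here are hypothetical, and the "index" is just an in-memory dictionary:

```python
import re
from collections import defaultdict

STOP_WORDS = {"the", "a", "an", "and", "or", "of", "to"}

def parse_document(raw_bytes, metadata=None):
    """Pretend-parse a document into plain text plus metadata (Tika's job in a real system)."""
    text = raw_bytes.decode("utf-8", errors="ignore")
    return text, dict(metadata or {})

def analyze(text):
    """Tokenize, lowercase and drop stop words (a minimal analysis pipeline)."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return [t for t in tokens if t not in STOP_WORDS]

class InvertedIndex:
    """Minimal in-memory inverted index: term -> set of document ids."""
    def __init__(self):
        self.postings = defaultdict(set)
        self.docs = {}

    def add(self, doc_id, raw_bytes, metadata=None):
        text, meta = parse_document(raw_bytes, metadata)
        for term in analyze(text):
            self.postings[term].add(doc_id)
        self.docs[doc_id] = meta

index = InvertedIndex()
index.add("doc1", b"The quick brown fox", {"source": "crawler"})
index.add("doc2", b"A quick overview of indexing")
print(sorted(index.postings["quick"]))  # both documents mention "quick"
```

In a real deployment each of these steps is replaced by a library call (Tika parser, Lucene analyzer chain, Lucene index writer), but the data flow stays the same.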

Request processing workflow: 
  • A user sends a request with a context descriptor 
  • The request is converted to a query against the index shared with the preprocessing workflow 
  • The returned results are ranked using information from the context descriptor
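
A minimal sketch of the request-time side, assuming a hypothetical context descriptor with keywords and per-term boosts (the shared index is faked as a term-frequency map):

```python
from collections import defaultdict

# term -> {doc_id: term frequency}; in a real system this is the index
# shared with the preprocessing workflow, not a hardcoded dict.
INDEX = {
    "hadoop": {"doc1": 3, "doc2": 1},
    "scheduler": {"doc1": 1},
    "lucene": {"doc3": 2},
}

def search(context):
    """context: {'keywords': [...], 'boost': {term: weight}} -- a hypothetical descriptor."""
    scores = defaultdict(float)
    for term in context.get("keywords", []):
        weight = context.get("boost", {}).get(term, 1.0)
        for doc_id, tf in INDEX.get(term, {}).items():
            scores[doc_id] += weight * tf  # rank by boosted term frequency
    return sorted(scores, key=scores.get, reverse=True)

print(search({"keywords": ["hadoop", "scheduler"], "boost": {"scheduler": 2.0}}))
```

The ranking step is where the context descriptor earns its keep: the same query returns a different ordering depending on which terms the context boosts.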

Event analytics
Preprocessing workflow:
  • The feed funnel receives new events such as Twitter notifications, or detects newly posted RSS updates
  • Relevant fields are extracted from the original raw data and packaged into the event format used by the system
  • The events are filtered by relevance criteria such as subject or period of time 
  • The filtered events are aggregated by correlating with previously received events, known consumers or according to some preconfigured rules. In the process, the original events are often converted into new events and fed back into the system.
  • The final leaf-level events are placed into persistent storage. The storage is likely to behave like a cache to allow expiration of obsolete events. 
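
The extract/filter/aggregate stages above can be sketched as plain functions. All field names (subject, ts, body) are illustrative, not a real event schema:

```python
import time

def extract(raw):
    """Pull relevant fields out of a raw feed item into the system's event format."""
    return {"subject": raw.get("topic", "unknown"),
            "ts": raw.get("timestamp", time.time()),
            "body": raw.get("text", "")}

def relevant(event, subjects, since):
    """Filter by subject and time window."""
    return event["subject"] in subjects and event["ts"] >= since

def aggregate(events):
    """Correlate events by subject and emit one derived event per subject."""
    by_subject = {}
    for e in events:
        by_subject.setdefault(e["subject"], []).append(e)
    return [{"subject": s, "count": len(es)} for s, es in by_subject.items()]

raw_feed = [
    {"topic": "hadoop", "timestamp": 100, "text": "0.23 released"},
    {"topic": "hadoop", "timestamp": 200, "text": "YARN scheduler"},
    {"topic": "cooking", "timestamp": 150, "text": "irrelevant"},
]
events = [e for e in map(extract, raw_feed) if relevant(e, {"hadoop"}, since=50)]
print(aggregate(events))  # one derived "hadoop" event covering two originals
```

The derived events produced by aggregate() are exactly the kind that get fed back into the pipeline in the step above.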

Request processing workflow: 
  • In a pull-style design, a user sends a request with a context descriptor. In a push-style design, a long running session registers its interest in the events satisfying some predicates.
  • The query parser uses the context descriptor to build filters and retrieve matching events from the storage.
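
The push-style variant can be sketched as a tiny subscription registry: a session registers a predicate, and matching events are delivered as they arrive. Names are illustrative only:

```python
class EventBus:
    """Toy push-style delivery: long-running sessions register predicates."""
    def __init__(self):
        self.subscriptions = []  # (predicate, callback) pairs

    def subscribe(self, predicate, callback):
        self.subscriptions.append((predicate, callback))

    def publish(self, event):
        # Deliver the event to every session whose predicate it satisfies.
        for predicate, callback in self.subscriptions:
            if predicate(event):
                callback(event)

received = []
bus = EventBus()
bus.subscribe(lambda e: e["subject"] == "hadoop", received.append)
bus.publish({"subject": "hadoop", "body": "YARN merged"})
bus.publish({"subject": "cooking", "body": "ignored"})
print(len(received))  # only the matching event was delivered
```

The pull-style design is the same picture inverted: instead of callbacks firing on publish, the predicates are evaluated against storage at request time.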

Available open source components

Assuming no need for classification, document analytics can be considered an exercise in full text search. Pretty much all open source products rely on the Lucene indexing engine for this purpose. Tika is another almost universally used library for parsing complex formats such as PDF. 

Even though the two libraries meet the core functional requirements, they do not address concerns such as scalability, preconfigured text processing pipelines or a remote API. Those are addressed by what are known as text search servers. They mostly provide support for distributed indexing and replace coding against the Lucene API with XML file-style configuration of prepackaged functionality. The most popular alternatives:
  • Lucene core - a low-level approach which could make sense for a proof of concept project but would require some coding to productize the result.
  • SOLR - the flagship text search engine that requires a lot of XML configuration but has exhaustive documentation and a big community.
  • ElasticSearch - advertised as a more scalable SOLR (and until recently, with better near-real-time support) with significantly less documentation but no need for XML. It is likely to appeal to non-Java people.
  • Zoie - a niche player originally designed for near-real-time search

Event analytics can be treated as a classical ESP problem. The world of open-source ESP/CEP is more obscure, probably because ESP originated in rich domains such as finance and telecoms. Until very recently Esper had been the only option. Unfortunately, Esper does not support distributed processing in its community edition. Now there are two promising alternatives:
  • Esper - the gold standard of Java ESP, equipped with a nice SQL-like DSL. The lack of scalability makes it appropriate only for proof of concept projects.
  • Yahoo S4 - still quite immature but with a good pedigree from a large enough company
  • Twitter Storm - probably even less mature, uses rather exotic technologies (Clojure, ZeroMQ), was written by what looks like a team of one. Functionality-wise, it is advertised to have delivery guarantees (which S4 does not have). In addition, it currently has a limited choice of integration plugins.

Functionality required for persistent storage of events is general enough to allow using any in-memory product, from Infinispan (simple and well-known) to Cassandra (popular at scale).
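
Whatever product is chosen, the cache-like behaviour mentioned above boils down to entries carrying a TTL. A minimal sketch of such an expiring store, with lazy expiration on read (real products like Infinispan or Cassandra provide this via per-entry TTL settings):

```python
import time

class ExpiringStore:
    """Toy event store that expires obsolete events after a fixed TTL."""
    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.entries = {}  # key -> (event, stored_at)

    def put(self, key, event, now=None):
        self.entries[key] = (event, now if now is not None else time.time())

    def get(self, key, now=None):
        now = now if now is not None else time.time()
        item = self.entries.get(key)
        if item is None:
            return None
        event, stored_at = item
        if now - stored_at > self.ttl:
            del self.entries[key]  # lazily expire obsolete events on read
            return None
        return event

store = ExpiringStore(ttl_seconds=60)
store.put("e1", {"subject": "hadoop"}, now=0)
print(store.get("e1", now=30))   # still fresh
print(store.get("e1", now=120))  # expired -> None
```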

Jan 29, 2012

Scheduler design in YARN / MapReduce2 / Hadoop 0.23


The Hadoop branch started with version 0.23 introduced a few significant changes to the Hadoop implementation. From the middleware perspective, the most salient one is a brand new approach to scheduling. Previously, Map and Reduce slots were explicitly distinguished for scheduling purposes, which limited the potential for using the Hadoop framework in non-MapReduce environments. In YARN, one of the key goals was to make the entire framework more generic, Mesos-style, so that the internal distributed machinery can be used for other computational models such as MPI.

I am particularly curious about the possibility of heavily customizing YARN for running distributed computations based on a non-standard computational model. Now that even MSFT has dropped their arguably more powerful alternative, Hadoop is destined to become the most advanced open source distributed infrastructure. Many in-house computational platforms could potentially benefit from reusing its core components.

There is still an interesting tension between fundamental Hadoop trade-offs and low-latency requirements. To guarantee control over resource consumption and allow termination of misbehaving tasks, Hadoop starts a new JVM instance for each allocated container. JVM startup and loading of task classes take a few seconds, which would be spared if JVMs were reused. This approach is taken, for example, by GridGain, at the price of an inability to restrict resource consumption of a particular task on a given node. We'll see how Hadoop developers extend resource allocation to CPUs, which Mesos achieves with Linux Containers.

Scheduler API

The way the Scheduler API is defined has a significant influence on how general, and therefore how reusable, YARN will be. Currently, YARN is shipped with the same schedulers as before, but if it gets used outside of the default MapReduce world, custom schedulers will likely be one of the most popular extension points.

The diagram below shows the key abstractions of the YARN Scheduler design.

  • Scheduler is notified about topology changes via an event-based mechanism
  • Scheduler operates on one or more hierarchical task queues. A queue can be chosen in Job Configuration using the mapred.job.queue.name property.
  • Scheduler allocates resources on the basis of resource requests that are currently limited to memory only. A request includes an optional preference for a particular host or rack.
  • Scheduler supports fault tolerance with the ability to recover itself from a state snapshot provided by the Resource Manager
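
The memory-only resource request with a host/rack preference can be modelled in a few lines. This is a conceptual sketch, not the actual YARN API; all class and field names are made up:

```python
class Node:
    """A cluster node with some free memory."""
    def __init__(self, host, rack, free_mb):
        self.host, self.rack, self.free_mb = host, rack, free_mb

def allocate(nodes, request):
    """request: {'memory_mb': int, 'preferred_host': str, 'preferred_rack': str}
    (the last two are optional). Returns the chosen host or None."""
    fits = [n for n in nodes if n.free_mb >= request["memory_mb"]]
    # Prefer the requested host, then the requested rack, then anything that fits.
    for match in (lambda n: n.host == request.get("preferred_host"),
                  lambda n: n.rack == request.get("preferred_rack"),
                  lambda n: True):
        for node in fits:
            if match(node):
                node.free_mb -= request["memory_mb"]
                return node.host
    return None  # nothing fits; the request stays pending

nodes = [Node("h1", "r1", 2048), Node("h2", "r2", 4096)]
print(allocate(nodes, {"memory_mb": 1024, "preferred_host": "h2"}))  # h2
print(allocate(nodes, {"memory_mb": 8192}))  # None: no node has that much memory
```

The host-then-rack-then-anywhere fallback is the essence of locality-aware scheduling; the real scheduler adds queues, fairness and capacity constraints on top of it.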

Scheduler events include notifications about changes to the topology of available nodes and the set of running applications.

Side notes

It remains to be seen how quickly YARN takes over classical Hadoop. Even though the new code base is more interesting, it seems rather immature despite the three years since the previous major version. Some things that surprised me:

  • There are important pieces of code, such as Scheduler recovery from failures, which are currently commented out. I also saw a TODO item in another place stating something like "synchronization approach is broken here". Perhaps it's just me, but I was not able to build the original v0.23 until I merged a couple of files with trunk.
  • The good news is that they finally migrated to Maven. The bad news is that for a project of this magnitude they have only a handful of Maven modules, and those are strangely nested and interleaved with multiple non-Maven directories.
  • It's quite odd not to have any dependency injection framework used in a large server-side Java system.
  • Not only do they have their own tiny web framework, but it is mixed with pure server-side code
  • Even though Guava is declared as a dependency, it is used only sporadically. For example, they even have their own Service hierarchy

Proliferation of Hadoop branches does not make it any easier I guess.