Oct 31, 2011

Parsing queries with ANTLR using embedment helper

ANTLR is the most popular Java-based parser generator used by many products from Hibernate to Hive. There are two extremely well written and highly practical books written by the ANTLR developer. The most recent Martin Fowler's book also relies on ANTLR for external DSL examples.

So much high quality information could be confusing because not only each language can be described by a few different grammars but there are different options for parser designs. The rule of thumb seems to be that building and then walking the AST is appropriate when "compilation" is expected to have multiple passes (e.g. for tree rewriting for optimization purposes). In case of a one-pass "compilation" it could be more beneficial to use what Martin calls Embedment Helper.

The idea is to have a Builder-like object called by parser on detection of every interesting script element. The object is implemented in target language and embedded into the generated parser as Foreign Code. Thus, neither resources are spent on AST processing nor ANTLR grammar becomes unintelligible because of a lot of injected code.

A typical use case implies building for future use of a semantic model as opposed to executing the script while parsing. The model is populated by embedment helper. This post is intended to highlight a few simple idioms for developing a query parser with this approach.

Grammar headers
  • Remember that if you name your grammar TestQuery then ANTLR will generate classes TestQueryParser and TestQueryLexer
  • You will want to put them into some package using @header and @lexer::header sections
  • Embedment helper-related code will go to the @member section
grammar TestQuery;
tokens {
 FROM = 'FROM' ;

@header {
package net.ndolgov.antlrtest;

import org.antlr.runtime.*;
import java.io.IOException;

@members {
private EmbedmentHelper helper;

@lexer::header {
package net.ndolgov.antlrtest;

Embedment helper interface

The helper is a reference to an interface that defines a callback method for each interesting grammar element.

 * Embedment Helper object (see http://martinfowler.com/dslCatalog/embedmentHelper.html) called by ANTLR-generated query parser.
interface EmbedmentHelper {
     * Set storage id
     * @param id storage id
    void onStorage(String id);

     * Add variable definition
     * @param type variable type
     * @param name variable name
    void onVariable(Type type, String name);

Production rules

Now let's look at the rest of the grammar file to see how the helper is invoked. Note that the top-level production rule name "query" will result in a method called "query()" generated by ANTLR in the parser.

query : SELECT variable (',' variable )*
   FROM ID {helper.onStorage($ID.text);};
variable : type=varType id=ID {helper.onVariable(type, $id.text);};
varType returns [Type type]
 : LONG_TYPE {$type = Type.LONG;}
 | DOUBLE_TYPE {$type = Type.DOUBLE;};
  • Notice how Java code is embedded into the grammar using curly brackets e.g. "{helper.onStorage($ID.text);}"
  • Notice how to refer to a parsed piece of the query e.g. "$ID.text" in case of storage id
  • Notice how an alias can be used for the same purpose e.g. "type=varType" and later "{helper.onVariable(type ..."
  • Instead of returning just a substring of the original query a rule can return a Java object type e.g. "varType returns [Type type]" where Type is enumeration type. To create its instance we assign "{$type = Type.LONG;}". This also shows how to use Java enumerations in ANTLR grammars.

Embedment helper injection
Let's look again at the member section, this time updated to include helper injection.
@members {
    private EmbedmentHelper helper;

    public  EHT parseWithHelper(EHT helper) throws RecognitionException {
        this.helper = helper;
        return helper;

The parseWithHelper method shows three convenient points:
  • the method returns exactly the helper type it is given
  • it assigns given helper to private variable so that it can be called from the code embedded into the grammar
  • it hides the call to the top-level query method of the generated parser

Parser facade
It could be also convenient to hide parser instantiation from the client code with a facade class:

 * Parse a given query and return extracted execution-time representation of the parsed query
public final class QueryParser {
     * Parse a query expression and return the extracted request configuration
     * @param expr query expression
     * @return extracted request configuration
    public static QueryDescriptor parse(String expr) {
        try {
            final TestQueryParser parser = new TestQueryParser(new CommonTokenStream(new TestQueryLexer(new ANTLRStringStream(expr))));
            final EmbedmentHelperImpl helper = parser.parseWithHelper(new EmbedmentHelperImpl());

            return helper.queryDescriptor();
        } catch (RecognitionException e) {
            throw new RuntimeException("Could not parse query: " + expr, e);

Parsing error processing
One last thing to remember is error processing. In our case we just throw a runtime exception.

@members {

    public void emitErrorMessage(String msg) {
        throw new IllegalArgumentException("Query parser error: " + msg);

Complete source code is available as a maven project on GitHub

May 30, 2011

Zookeeper-based implementation of GridGain Discovery SPI

The GridGain team is not particularly fond of Zookeeper. Their argument is based on the need for a rather static configuration which is somewhat at odds with the original vision for the highly dynamic GridGain framework. Nevertheless, nowadays a growing number of products use ZK as their discovery service to enable SOA-style architecture. Once you have ZK installed it's quite natural to try to reuse it to keep track of all kinds of servers in your system including GridGain nodes.

GridGain is a pretty interesting niche player. To the best of my knowledge they were the only open source Java product started as a computational grid framework when everyone was implementing K/V-store-style distributed caches. Curiously enough, nowadays everyone is bolting on something similar while GridGain is working on a distributed cache of their own.

When I first learned about GridGain a couple of years ago the two most striking features for me were their SPI-based architecture and the quality of their javadocs. Except for the kernal, pretty much every piece of functionality in GridGain belongs to one of a dozen well-defined SPI interfaces. So theoretically if you feel like you can provide your own implementation for a slice of the system. On the flip side, the entire framework is still shipped as one jar file and so Maven-based configuration management is complicated by the need to explicitly exclude at least half of transitive dependencies pulled in by default.

The Discovery SPI has a few obvious implementations out of the box. The recommended one is a recently rebranded and updated TCP-based implementation. Its protocol seems to have gossip characteristics but there is also the notion of coordinator. Under the hood, this implementation relies on an auxiliary notion of IP finder to determine the initial set of peers to try to connect to. If a connection succeeds, the peer server will forward the connection request to the coordinator. The coordinator is responsible for keeping a versioned view of the current topology.

The easy way

The TCP Discovery implementation internals are pretty complicated because the connection state machine, the corresponding message processing logic and socket IO all reside in the same Java class. Nevertheless it looks like the easiest way to integrate with ZK is to implement a new IP Finder. At a high level:
  • Choose a persistent znode with a sufficiently unique name/path (especially if you share a set of servers among multiple Gridgain clusters)
  • When a new instance of your Finder is being created: connect to ZK, check whether the base znode exists and create it if necessary, set a watcher for children of the base znode
  • When your ChildrenCallback is notified extract urls from the returned list of strings
  • When the local GridTcpDiscoverySpi instance registers an address with its Finder, create an ephemeral child node with a name based on the address (e.g. following the "hostname:port" convention) under the base znode
  • There is no need to remove the children created by a GridGain instance when the instance is stopped. ZK will detect a closed session and delete the corresponding nodes automatically.
I had a positive experience with an implementation along these lines with the previous version (3.0.5c). My understanding though is that this is more of a hack because the TCP SPI implementation will have all the socket-based machinery running redundantly. The DRY principle is clearly broken here. So this approach is probably more of a prototype to see how much you like ZK-based Discovery.

The right way

As they say, no pain no GridGain ;) And apparently the only correct way is to create a brand new Discovery SPI implementation. The good new is that there are at least two detailed examples: the JMS-based implementation and the Coherence-based one. The latter seems to be more relevant because Coherence is certainly more similar to GridGain than JMS is.

The bad news is that the Discovery SPI itself is not as trivial as one might think. You will need to keep in mind a few design details pertaining to the way the SPI is used by the rest of the system. Even when you think that some of them might be optional you will likely discover (ha-ha) that in reality other parts of the system actually use them.

The notion of GridGain Node includes the following elements:
  • a UUID used to uniquely identify a node (so you'll need to map them on whatever you put into ZK)
  • one or more IP addresses associated with the node (certainly to be held in ZK)
  • a set of node attributes created when the GridGain instance starts and given to the SPI by the kernal (another thing to put into ZK). Attributes are frequently used by SPI implementations to exchange configuration information via a generic mechanism
  • an implicit set of periodically refreshed node metrics. Here comes the pain - they change and so a chunk of data will need to be written to ZK periodically (and trigger a bunch of updates in ZK server and other GridGain instances).
  • a discovery listener to be notified about nodes that joined or left for internal event-based bookkeeping.
In comparison to the previous approach, the node representation in ZK is not only larger but also more dynamic. So a more promising ZK node structure could be borrowed from Norbert:

  • Create two persistent children of the base znode called "members" and "available"
  • The "members" node will have an ephemeral child node for each GridGain instance with the node state (attributes, metrics, probably UUID) kept as data 
  • The "available" node will have an ephemeral node for each IP address of all currently available instances (e.g. assuming the same "hostname:port"  name format). Each "available" node would refer to the corresponding "members" node (to simplify support for multiple IP adresses for the same instance).
  • The SPI would listen for changes to "available" nodes to detect joined and left nodes and to "members" nodes to detect updated metrics
The middle way

I am wary of frequent updates written to ZK. So another option would be to keep only URLs in ZK as in the first approach and come up with a metric exchange protocol of your own (with manually created sockets or even Netty). My inner child thinks it could be fun. My inner engineer thinks that enough already of reimplementing the same wheel over and over again. So this middle way would trade fewer ZK updates for a lot of coding and very real potential for subtle bugs.

May 27, 2011

Ideas from Hadoop/MapReduce alternatives

Hadoop as we know it today is synonymous with open-source MapReduce (which is an internal GOOG system). So currently its computational model requires chaining pairs of map and reduce jobs into more realistic workflows by means of Cascading or comparable framework.

Dryad, which was announced after MapReduce, promotes an alternative model based on execution of direct acyclic graphs of jobs. In contrast to hugely popular among the practitioners MapReduce, Dryad seems to be preferred by the academicians. There is a growing number of research frameworks based on a few ideas first popularized in this context by Dryad. What is even more interesting is that the academicians started open-sourcing their work recently (consider for example Nephele and Hyracks).

Smart people noticed this alternative trend quite some time ago. It remains to be seen if any of the wannabe contenders ever reaches maturity. Most likely if they fail to somehow merge into "Hadoop 2 (if not 3)" they will have only very limited user base. But at least the design ideas will be useful one way or another. To this end, I would like to summarize a few of them I found to be particularly relevant.

Programs as DAGs of computational jobs
  • Custom jobs can be added to the set of standardized reusable jobs
  • When data is partitioned, a job can be cloned and executed in parallel on all partitions
  • Jobs are connected with logical channels. Typically, there are at least local file- and socket-based implementations. 

Cloud-based v cluster-based
  • In traditional cluster systems distance-based heuristics can be applied to minimize data traffic between racks. In the cloud topology information is unavailable and the actual topology can change significantly.
  • In contrast to cluster environments, in the cloud it is possible to dynamically allocate and deallocate servers on demand. There are usually a limited number of available server types such as "small", "medium" and "large". Servers can be pooled by type and reused. The costs could be further minimized by keeping an idle server allocated until the end of its current period it was already charged for (typically, an hour).

Job scheduling
  • When a job has a preferred set of servers (e.g. because of data locality) it could be more efficient to wait a little longer for one of the servers to become available than to immediately schedule the job on the first idle server
  • Block (don't execute new tasks of) jobs that already are using more than a fair share of the servers
  • In a cluster, have a dedicated job queue for each server, rack and the entire cluster. Queue a  job (e.g. on the basis of its data locality) to the queues of its preferred servers and racks. When a job is scheduled remove it from all the queues.
  • Have dedicated capacity for long-running jobs on each server
  • Have a root task to keep track of the job state machine and resubmit, for a limited number of times, tasks that failed

DAG execution stages
  • Split the entire DAG into stages in a way reminiscent of topological sort
  • Only one stage is executed at a time. 
  • It limits the number of required servers and simplifies scheduling because only a subset of tasks needs to run simultaneously.
  • When a stage is completed, all the provisional results are "materialized" as local files. This has a side benefit of having checkpoints for free.
  • It simplifies automatic choice of channel types. Only jobs from the same stage can be connected with sockets. Jobs from different staged must be connected with file-based channels.

Multistage job startup
  • Instantiate tasks on chosen the servers, create network endpoints, send their descriptions to the scheduler
  • Merge descriptions and share it with all the tasks
  • On each server, resolve task peers using the merged descriptor

May 13, 2011

Nephele and scheduling in the cloud

A summary of "Nephele: Efficient Parallel Data Processing in the Cloud".

Goal: a data processing framework with support for dynamic allocation and de-allocation of different computational resources in the cloud.

Compute resources available in a cloud environment are highly dynamic and possibly heterogeneous. In addition, the network topology is hidden so scheduling optimizations based on knowledge of the distance to a particular rack or server are impossible.


A job graph is a DAG of tasks connected with edges. Tasks process records implementing a common interface. A task may have an arbitrary number of input and output gates though which records enter and leave the task. A task can be seen as a set of parallel subtasks processing different partitions of the data. By default each subtask is assigned to a dedicated server.

A job graph is transformed into execution graph by the job manager. The execution graph has two levels of detail:

  • the abstract level describes the job execution on a task level (without parallelization) and the scheduling of instance allocation/deallocation. A Group Vertex is created for every Job Graph vertex to control the set of subtasks. The edges between Group Vertices are ephemeral and do not represent any physical communication paths.
  • the concrete level defines the mapping of subtasks to servers and the communication channels between them. An Execution Vertex is created for each subtask. Each Execution Vertex is always controlled by its corresponding Group Vertex. Execution vertices are connected by channels.

Channel types

All edges of an Execution Graph are replaced by a channel before processing can begin. There are three channel types:
  • A network channel is based on a TCP socket connection. Two subtasks connected via a network channel can be executed on different instances. Since they must be executed at the same time, they are required to run in the same Execution Stage.
  • An in-memory channel uses the server memory to buffer data. The two connected subtasks must be scheduled to run on the same instance and in the same Execution Stage.
  • A file channel allows two subtasks to exchange records via the local file system. Two subtasks are assigned to the same instance and the consuming group vertex must be scheduled to run in a later Execution Stage than the producing group vertex. Subtasks must exchange records across different stages via file channels because they are the only channel types which store the intermediate records in a persistent manner.

Execution Stage Scheduling

The requested server types may be temporarily unavailable in the cloud but for cost-efficiency servers should be ideally allocated just before they can be used. The Execution Graph is split into one or more Execution Stages.

  • when the processing of a stage begins, all servers required within the stage are allocated. 
  • all subtasks included in this stage are sent to the corresponding Task Managers and ready to receive records. 
  • before the processing of a new stage, all intermediate results of its preceding stages are stored in a persistent manner. So the execution stage is similar to a checkpoint because a job can be interrupted and resumed later after a stage is completed.

The user can provide manual hints to change the default scheduling behavior:

  • into how many parallel subtasks should a task be split at runtime
  • how many subtasks can share the same server
  • which execution groups can share servers
  • channel type of each edge
  • server type required by a task (to characterize the hardware requirements)
Server type support

Server types are simple string identifiers such as "m1.small". The scheduler is given a list of available server types and their cost per time unit. Each task can be executed on its own server type. To support it, each subtask must be mapped to an Execution Instance. An Execution Instance has an ID and an server type representing the hardware characteristics. 

Before beginning to process a new Execution Stage, the scheduler collects all Execution Instances from that stage and tries to replace them with matching cloud instances. If all required instances could be allocated the subtasks are sent to the corresponding server s and set up for execution.

Nephele keeps track of server allocation time to minimize costs when usage is charged by the hour. An idle server of a particular type is not immediately deallocated if a server of the same type is required in an upcoming Execution Stage. It is kept allocated until the end of its current lease period. If the next Execution Stage begins before the end of that period, the server is reassigned to the Execution Vertex of that stage. Otherwise the server is deallocated in time not to cause any additional cost.

Nephele is an open source project.

May 4, 2011

Hyracks and parallel dataflow scheduling in stages

Goal: a parallel platform to be a target for compiling Hive-style declarative data processing languages. Collections of data items are stored as local partitions distributed across the nodes of the cluster. A Hyracks job processes one or more collections of data to produce one or more output collections.

Job topology

  • A Hyracks job is a dataflow DAG composed of operators (vertices) and connectors (edges).
  • An individual operator (e.g. HashJoin) consists of one or more activities (e.g. JoinBuild to build a hashtable from one input and JoinProbe to probe it with the other input).
  • Each activity is executed as a set of cloned tasks operating on different partitions of the data flowing through the activity. Each task consumes a partition of the activity's inputs and produces a partition of its output.
  • An operator descriptor knows about output record format and operator activities. 
  • The tasks corresponding to activities of the same partition of the same operator may need to share state (e.g. JoinBuild and JoinProbe ahers a hashtable) and so they are co-located by Haracks. Even though the previous task is not active anymore when the next is started a shared context is provided to exchange the required information between the two tasks.
Standard building blocks

Available operators include file readers/writers, sorters (in-memory and external), joiners (hash-based, GRACE, hybrid hash) and aggregators (e.g. hash-based).

Connectors distribute data produced by a set of sender operators to a set of receiver operators. Available connectors include:

  • M:N Hash-Partitioner: hashes every tuple to generate the receiver number. Tuples  keep their initial order on
  • the receiver side.
  • M:N Hash-Partitioning Merger: hashes each tuple to find the receiver. On the receiver side merges streams coming from different senders based on a given comparator.
  • M:N Range-Partitioner: associates one receiver with each range of partitioning field in a set of disjoint ranges
  • M:N Replicator: copies the data produced by every sender to every receiver.
  • 1:1 Connector: connects exactly one sender to one receiver.
Job execution

  • Typically, connections between activities of one operator are blocking (e.g. between JoinBuild and JoinProbe) while connections between operators are not (e.g. between input file reading operator and JoinBuild activity). 
  • Activities that are transitively connected to other activities in a job only through dataflow edges are said to form a stage. So a stage is a set of activities that can be co-scheduled. 
  • Stages are independently scheduled and executed in the order in which they become ready. A stage is ready to execute when all of its dependencies have successfully completed execution. Each stage is expanded into tasks just prior to its execution.

Currently, there are two scheduling choices:

  • configure the exact number of partitions of an operator to be created and server for each partition
  • configure just the number of partitions to allow automatic placement of the partitions
  • automatic partitioning and placement based on the estimated resource requirements and the current availability of resources is being developed
A Hyracks cluster is comprised of the cluster controller and node controllers. When a stage is ready to run, the cluster controller starts the stage tasks on a set of node controllers. It then waits until the stage completes or an node controller failure is detected.

A task is started in three steps:

1) Activation

  • In response to a request from the CC each NC creates its designated tasks.
  • For each task accepting input from other tasks a network endpoint is created. The mapping of tasks on endpoints is sent back to the CC
2) Pairing

  • The CC merges all the responses to create a job-wise address map.
  • The map is sent to all the NCs so that each sender knows its receiver addresses
3) Connecting

  • Once pairing is completed the CC makes the NCs start their tasks
Data stream management

  • A dataflow is a stream of records with an arbitrary number of fields. A fixed-size chunk of contiguous bytes representing a sequence of serialized records is called a frame. 
  • A frame is the unit of data transfer between tasks. To avoid excessive garbage churn Hyracks provides interfaces for comparing and hashing fields that can be implemented to work off of the binary data in a frame.
  • A task is implemented as a push-based iterator that receives a frame at a time from its inputs and pushes result frames to its consumers. Any repartitioning is achieved by using a Connector.
  • There are as many send-side instances of a connector as tasks in the producing activity and as many receive-side instances as tasks in the consuming activity.
  • When a send-side instance of a connector receives a frame from the producer, it applies its redistribution logic (e.g. hash-partitioning on a field) to move records to the relevant receive-side connector instances. Each record is copied to the target frame meant for the appropriate receive-side instance. When a target frame is full, the send-side instance sends the frame to the receive-side instance.
  • The buffering strategy for received frames is configurable and can use one network buffer either for all senders (e.g. M:N hash-partitioning) or for each sender (e.g. M:N sort-mergewhich expects pre-sorted frames).

For backward compatibility, Hyracks can execute Hadoop jobs.

Apr 28, 2011

Mesos and inspiration for next generation Hadoop

The notion of Next Generation Hadoop (NGH) is somewhat blurred at this point. Recent announcements by YHOO and Facebook could be construed as at least two independent branches referred to as NGH. So even now that YHOO is merging all its development to main Apache codeline I am not sure how many NGHs are being developed right now and how convergent that process is.

The only technical description of the NGH I am aware of was written by YHOO engineers. Even though that posts does not mention Mesos I am pretty sure it is not coincidental that the NGH shares so much with it. It is also noteworthy that Mesos itself is now an Apache project.

A summary of "Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center":

Goal: when running multiple frameworks in the same cluster
  • improve utilization through statistical multiplexing
  • share datasets that are too expensive to replicate
Two-stage scheduling with resource offers
  • Mesos decides how many resources to offer each framework
  • Frameworks decide which resources to accept and which computations to run on them
  • A fault-tolerant Zookeeper-based master process manages slave daemons running on each cluster node
  • Slaves report to the master which vacant resources they have (e.g. {2CPUs,16GB})
  • A framework scheduler registers with the master
  • A framework scheduler is offered resources, decides which ones to use and describe tasks to launch on those resources
  • A framework executor is launched on slave nodes to execute tasks
  • Each resource offer is a list of free resources on multiple slaves
  • A pluggable master strategy decides how many resources to offer to teach framework
  • Supports long tasks by allowing to designate a set of resources on a slave for use by long tasks
  • Linux containers are used to isolate frameworks
Dominant Resource Fairness
  • Equalize each framework's fractional share of its dominant resource (i.e. the resource that it has the largest fractional share of)
  • Example: make F1's share of CPU equals F2's share of RAM if F1 is CPU-pound and F2 needs mostly memory
  • Filters registered by a framework with the master to short-circuit the rejection process (e.g. only from nodes from a given list or at least as many resources free)
  • For the purpose of allocation, count offered resources as used by the framework to encourage the framework to respond quickly 
  • Rescind an offer if the framework does not respond for too long

Apr 27, 2011

Quincy and scheduling in Dryad

A summary of "Quincy: Fair Scheduling for Distributed Computing Clusters"

Each job is managed by a root task that contains a state machine managing the workflow of that job. Actual work is done by worker tasks which may be executed multiple times and will always generate the same result.

There is a single centralized scheduling service responsible for a queue of jobs for the cluster. A root task sends a list of ready workers and their input data summaries to the scheduler. The scheduler chooses computers for tasks and makes the root task to start them. The root task is responsible for back-tracking through the dependency graph and resubmitting in case of a failure. If the scheduler decides to kill a worker task before it completes it will notify the root task.

A worker is not submitted to the scheduler until all of its input files have been written to the cluster. When a worker is ready its root task computes, for each computer, the amount of data that the worker would read across the network. The root then creates for it a list of preferred computers and a list of preferred racks.

Fairness: a job which takes T seconds when executed alone should take no more than N*T seconds when there are N concurrent jobs. There is a hard limit on the total number of jobs in the cluster and when it's reached new jobs are queued (and started later in order of submission time). Each computer runs only one task at a time and each job is allowed to run on a certain number of servers.

Old queue-based scheduling

  • One queue for each server, one queue for each rack, one queue for the entire cluster
  • A new job is added to the queues of its preferred servers, preferred racks and the cluster-wide queue
  • When the job is scheduled it's removed from all the queues
  • When a job is started its root task is executed on a server not running another root task; if there's a worker task it is killed and resubmitted
  • Basic algorithm: when a server becomes idle, assign it a task from its server queue, its rack queue or the cluster queue.
  • Greedy fairness: block jobs that have more tasks running than min([cluster size/number of jobs], number of workers). When a job is blocked its waiting tasks will not be scheduled.
  • Fairness with preemption: starting with the most recently scheduled, kill tasks of jobs that have more than their quota of tasks.
New flow-based scheduling

Graph topology
  • Represent instantaneous scheduling as a min-cost flow network problem
  • Each task has one unit of flow as its supply
  • There are nodes in the graph for each root and worker task, an unscheduled node for each job, a node for each server, a rack aggregator node for each rack, and a cluster aggregator node. 
  • There is the sink node through which all flows drain from the graph. Only unscheduled and server nodes are connected to it
  • Each root task has a single edge to the server where it is running. 
  • Each worker task has an edge to its job's unscheduled node, to the cluster-aggregator node, and to every rack and server in its preferred lists.
  • Workers that are executing have an edge to the server on which they are running
Scheduling behavior and parameters
  • Computer/rack/cluster edge cost is a function of the amount of data that would be transferred across rack and core switches
  • Unscheduled edge represents the penalty for leaving a task unscheduled (increases over time)
  • When a task is started, an additional cost (increasing with time) is added to its edges to nodes other than the server it is running on.
  • Tradeoffs are controlled with only three parameters: the cost of waiting in the queue, the cost of transferring data across the core switch, the cost of transferring data across a rack switch
  • The scheduler updates the graph when a job-related event occurs and on a regular timer event (some costs are time-dependent)
  • When the graph is changed the scheduler computes a new min-cost flow and then starts or kills tasks as necessary
  • Multidimensional capacities (e.g. CPU and memory) cannot be easily represented and so Mesos-style fine-grained resource allocation is infeasible
  • Correlated constraints (e.g. run two task in the same rack) are equally challenging

Apr 24, 2011

Starfish and Hadoop self-tuning

A summary of "Starfish: A Self-tuning System for Big Data Analytics".

Most of Hadoop features must be managed manually with multiple obscure parameters. As an example, Hadoop supports dynamic cluster membership changes but it has no support for deciding when to add/remove nodes or when to rebalance the data layout. Instead of aiming for peak performance, Starfish project wants to provide good Hadoop performance automatically.

Three levels of Hadoop workload optimization:
  • Individual MR jobs
  • MR jobs assembled into a workflow (e.g. generated from HiveQL or by a Cascading-style framework)
  • Collections of workflows
Job-level tuning
  • JIT optimizer (instead of manual configuration of 190 parameters) based on 
  • Profiler (dynamic instrumentation to learn performance models) and 
  • Sampler (statistics about input/intermediate/output key-value spaces of a job)
The Profiler creates a job profile for different phases of MR job
  • Timings view: where wall-clock time is spent in each phase
  • Data flow view: how much data is processed in each phase
  • Resource-level view: how many resources such as CPU and memory is used in each phase
Workflow-level tuning
  • Workflow-aware Scheduler ("global optimization")
  • WhatIf engine (uses performance models and a job profile to estimate a new profile for different configuration parameters)
  • Data Manager (rebalance HDFS data blocks using different block placement policies)
Workload-level tuning
  • Workload Optimizer to generate an equivalent, but optimized, collection of workflows using 
  • Data-flow sharing (reusing the same job on behalf of different workflows)
  • Materialization (caching intermediate data for later reuse, probably by other workflows; also helps avoid cascading reexecution)
  • Reorganization (automatically chosen alternative means of keeping intermediate data such as key-value and column stores)
Starfish is built on top of Hadoop. Its input is expressed in a new language called Lastword. The language is not supposed to be used directly by humans. Instead there are translators from HiveQL-style languages to submit a collection of MR workflows. Those workflows can be DAGs of MR jobs, select-project-join-aggregate logical specification or user-defined functions. Workflows can be annotated with metadata such as scheduling hints, data statistics and data layouts.

Starfish is an open-source project.

Apr 23, 2011

Delay Scheduling and Hadoop Fair Scheduler

I am blessed with a job which is in a domain sophisticated enough to encourage reading research papers. I am planning to post my notes on the most interesting ones at least as a means of keeping key notes readily available.

So without further ado, here goes a summary of "Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling":

The key trade-off is fairness (in allocation of resources) v data locality (executing a job on a node that already has input data for the job). The scheduler goal is sharing a cluster between multiple users with a mix of long batch jobs and short interactive queries over a common data set. Fair scheduling requires resource reallocation when the number of jobs changes.

The original Hadoop FIFO scheduler:

  • assign tasks in response to heartbeats sent by slaves which report the number of free map and reduce slots on the slave
  • scan through jobs in order of priority and submit time to find one with a task of the required type
  • for maps, after selecting a job greedily pick the map task in the job with data closest to the slave
Locality problems with naive fair sharing (assign free slots to the job that has the fewest running tasks):

  • Head-of-line scheduling: small jobs are likely to have their tasks sent to random nodes 
  • Sticky Slots: a tendency for a job to be assigned the same slot repeatedly (a task completes, its job has fewer tasks than the others, the slot is reassigned to the job again)
The Hadoop Fair Scheduler: 

  • divide resources using max-min fair sharing to achieve statistical multiplexing
  • place computations near their input data to maximize system throughput
A two-level scheduling hierarchy: 

  • allocate task slots across pools using weighted fair sharing
  • let each pool allocate its slots using either FIFO with priorities or a second level of fair sharing (each pool can be given a minimum share guaranteed to be given as long as the pool contains jobs)
Delay Scheduling

  • each job has two timeouts
  • a job with no tasks local to a node is not scheduled for the duration of the first timeout. 
  • the job is not scheduled for the duration of the second timeout if it does not have tasks local to the rack of the node.
Key enablers: 

  • most tasks are short compared to jobs
  • multiple locations in which a task can run to read a given data block (including multiple task slots per node)

Jan 14, 2011

Concurrent frameworks

Newer languages such as Scala re-introduced not as new concurrency models such as Actors. In a few recent posts people suggest different ways to compare frameworks available on the JVM.

I believe juxtaposing j.u.c and the rest of the frameworks mentioned by both authors is fundamentally wrong. Concurrency goes all the way down to real hardware (just think about CMPXCHG on x86 for example and its role in support of CAS on which virtually all non-blocking algorithms are based). Theoretically, there could be a different concurrency model implemented at that level but it is not the case in our commodity PC world. Even if there's bus-based message passing deep inside of your CPU you have no exposure to it, just barrier-like instructions.

Thus, everything starts from whatever hardware supports. If you implement a drastically different model on top of it you will have to pay a price. Which people who really care about concurrency will not be comfortable with. So you are likely to squeeze the most concurrency from a software approach which mirrors the hardware approach the most (think about all the Unsafe support Mr. Lea has from the JVM guys or read locks Azul guys have in their CPU to enable their miraculous pauseless garbage collector). 

It's only natural to consider everything else to be just additional levels of indirection in the concurrent stack. The core primitives exposed by your operating system will be quite close to the memory model of actual hardware. Then there's the tricky question of memory models in language runtime libraries, something which we on the JVM are lucky to have solved since JSR-133. All this complexity means that we are not likely to see anything comparable to j.u.c which is tightly integrated with the JVM, which in turn attempts via native means to get as close as possible to the hardware.

On the flip side, there are people who don't need every last drop of concurrency, are too young to be proficient  in even something as high-level as j.u.c or have access to unreasonable number of CPU cores and so can trade CPU cycles for code simplicity. The latter category is like to be increasingly dominant provided all the multi-core forecasts we have heard in the last few years are correct. And so Actors and similar models are extremely likely to be just j.u.c. queues and executors in disguise. 

It's good engineering to reuse higher-level components built on top of more challenging foundational blocks. Once we have enough CPU cores to waste very few people will probably care about old school concurrent primitives. But it will still be as good to be able to see j.u.c. behind some Actor facade as it is to see C code behind C++ abstractions or Java behind Scala. It allows you to better predict the amount of garbage and the performance penalty for using higher levels of abstraction.

On a related note,  I am not even entirely convinced we can separate F/J from j.u.c. proper. The former seems to be just a different set of trade-offs implemented within the same framework to solve a different class of problems. You want IO concurrency, you use the classical j.u.c. You want embarrassingly parallel fine-grained computations, you rely on F/J.