May 30, 2011

Zookeeper-based implementation of GridGain Discovery SPI

The GridGain team is not particularly fond of Zookeeper. Their argument is based on the need for a rather static configuration which is somewhat at odds with the original vision for the highly dynamic GridGain framework. Nevertheless, nowadays a growing number of products use ZK as their discovery service to enable SOA-style architecture. Once you have ZK installed it's quite natural to try to reuse it to keep track of all kinds of servers in your system including GridGain nodes.

GridGain is a pretty interesting niche player. To the best of my knowledge they were the only open source Java product that started as a computational grid framework when everyone else was implementing K/V-store-style distributed caches. Curiously enough, nowadays everyone is bolting on something similar while GridGain is working on a distributed cache of their own.

When I first learned about GridGain a couple of years ago, the two most striking features for me were their SPI-based architecture and the quality of their javadocs. Except for the kernal, pretty much every piece of functionality in GridGain belongs to one of a dozen well-defined SPI interfaces. So, theoretically, if you feel like it, you can provide your own implementation for any slice of the system. On the flip side, the entire framework is still shipped as one jar file, so Maven-based configuration management is complicated by the need to explicitly exclude at least half of the transitive dependencies pulled in by default.

The Discovery SPI has a few obvious implementations out of the box. The recommended one is a recently rebranded and updated TCP-based implementation. Its protocol seems to have gossip characteristics, but there is also the notion of a coordinator. Under the hood, this implementation relies on an auxiliary notion of an IP finder to determine the initial set of peers to try to connect to. If a connection succeeds, the peer server forwards the connection request to the coordinator, which is responsible for keeping a versioned view of the current topology.

The easy way

The TCP Discovery implementation internals are pretty complicated because the connection state machine, the corresponding message processing logic and socket IO all reside in the same Java class. Nevertheless, it looks like the easiest way to integrate with ZK is to implement a new IP Finder. At a high level:
  • Choose a persistent znode with a sufficiently unique name/path (especially if you share a set of ZK servers among multiple GridGain clusters)
  • When a new instance of your Finder is being created: connect to ZK, check whether the base znode exists and create it if necessary, and set a watcher on the children of the base znode
  • When your ChildrenCallback is notified, extract URLs from the returned list of strings
  • When the local GridTcpDiscoverySpi instance registers an address with its Finder, create an ephemeral child node with a name based on the address (e.g. following the "hostname:port" convention) under the base znode
  • There is no need to remove the children created by a GridGain instance when the instance is stopped. ZK will detect a closed session and delete the corresponding nodes automatically.
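The bookkeeping part of such a finder is small: an address becomes the name of an ephemeral child, and the children list handed to the ChildrenCallback becomes a list of socket addresses again. Below is a minimal sketch of just that mapping, using only the JDK so it runs without a ZooKeeper client. The class name `ZkAddressCodec` and the base path are hypothetical, and the actual ZooKeeper calls (creating the ephemeral child, registering the children watcher) are deliberately left out.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: maps socket addresses to znode child names ("hostname:port") and back. */
final class ZkAddressCodec {
    /** Hypothetical base znode; keep it unique per GridGain cluster sharing the ZK ensemble. */
    static final String BASE_PATH = "/gridgain/cluster-a/addresses";

    private ZkAddressCodec() {}

    /** Child name for a registered address, following the "hostname:port" convention. */
    static String toChildName(InetSocketAddress addr) {
        return addr.getHostString() + ":" + addr.getPort();
    }

    /** Full path of the ephemeral child to create when the SPI registers an address. */
    static String toChildPath(InetSocketAddress addr) {
        return BASE_PATH + "/" + toChildName(addr);
    }

    /**
     * Converts the children list passed to a ChildrenCallback into unresolved addresses.
     * Malformed names are skipped instead of failing the whole callback.
     */
    static List<InetSocketAddress> parseChildren(List<String> children) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String child : children) {
            int colon = child.lastIndexOf(':');
            if (colon <= 0 || colon == child.length() - 1) continue;
            try {
                int port = Integer.parseInt(child.substring(colon + 1));
                if (port < 0 || port > 65535) continue; // createUnresolved would reject it
                result.add(InetSocketAddress.createUnresolved(child.substring(0, colon), port));
            } catch (NumberFormatException e) {
                // not a "hostname:port" child; ignore it
            }
        }
        return Collections.unmodifiableList(result);
    }
}
```

Unresolved addresses are used so that parsing a callback never blocks on DNS; resolution can happen later, when the TCP SPI actually connects.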
I had a positive experience with an implementation along these lines with the previous version (3.0.5c). My understanding, though, is that this is more of a hack because the TCP SPI implementation will have all its socket-based machinery running redundantly. The DRY principle is clearly broken here. So this approach is probably best treated as a prototype to see how much you like ZK-based Discovery.

The right way

As they say, no pain no GridGain ;) And apparently the only correct way is to create a brand new Discovery SPI implementation. The good news is that there are at least two detailed examples: the JMS-based implementation and the Coherence-based one. The latter seems to be more relevant because Coherence is certainly more similar to GridGain than JMS is.

The bad news is that the Discovery SPI itself is not as trivial as one might think. You will need to keep in mind a few design details pertaining to the way the SPI is used by the rest of the system. Even when you think that some of them might be optional you will likely discover (ha-ha) that in reality other parts of the system actually use them.

The notion of GridGain Node includes the following elements:
  • a UUID used to uniquely identify a node (so you'll need to map UUIDs to whatever you put into ZK)
  • one or more IP addresses associated with the node (certainly to be held in ZK)
  • a set of node attributes created when the GridGain instance starts and given to the SPI by the kernal (another thing to put into ZK). Attributes are frequently used by SPI implementations to exchange configuration information via a generic mechanism
  • an implicit set of periodically refreshed node metrics. Here comes the pain: they change, and so a chunk of data will need to be written to ZK periodically (triggering a bunch of updates in the ZK servers and other GridGain instances).
  • a discovery listener to be notified about nodes that joined or left for internal event-based bookkeeping.
In comparison to the previous approach, the node representation in ZK is not only larger but also more dynamic. So a more promising ZK node structure could be borrowed from Norbert:

  • Create two persistent children of the base znode called "members" and "available"
  • The "members" node will have an ephemeral child node for each GridGain instance with the node state (attributes, metrics, probably UUID) kept as data 
  • The "available" node will have an ephemeral node for each IP address of all currently available instances (e.g. assuming the same "hostname:port" name format). Each "available" node would refer to the corresponding "members" node (to simplify support for multiple IP addresses for the same instance).
  • The SPI would listen for changes to "available" nodes to detect joined and left nodes and to "members" nodes to detect updated metrics
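Because a ZooKeeper children watch only reports that the children of a znode changed, the SPI has to work out joined and left nodes itself by comparing snapshots. Here is a minimal JDK-only sketch, assuming each "available" child stores the UUID of its owning "members" node as data; the class and method names are hypothetical, and reading the children and their data from ZK is left out.

```java
import java.util.*;

/**
 * Hypothetical sketch of the Norbert-style layout:
 *   <base>/members/<uuid>        ephemeral, data = node state (attributes, metrics)
 *   <base>/available/<host:port> ephemeral, data = uuid of the owning member
 * ZooKeeper watchers only say "children changed", so the SPI diffs snapshots itself.
 */
final class AvailableNodesTracker {
    private final String base;
    // last known mapping of "available" child name -> member UUID
    private Map<String, UUID> known = new HashMap<>();

    AvailableNodesTracker(String base) { this.base = base; }

    String memberPath(UUID nodeId) { return base + "/members/" + nodeId; }
    String availablePath(String hostPort) { return base + "/available/" + hostPort; }

    /** Result of comparing two snapshots: node ids that joined and node ids that left. */
    static final class Diff {
        final Set<UUID> joined = new TreeSet<>();
        final Set<UUID> left = new TreeSet<>();
    }

    /**
     * Called with the current "available" children (name -> uuid read from each child's data).
     * A node counts as joined when its first address appears and as left when its last one goes.
     */
    Diff update(Map<String, UUID> current) {
        Set<UUID> before = new HashSet<>(known.values());
        Set<UUID> after = new HashSet<>(current.values());
        Diff diff = new Diff();
        for (UUID id : after) if (!before.contains(id)) diff.joined.add(id);
        for (UUID id : before) if (!after.contains(id)) diff.left.add(id);
        known = new HashMap<>(current);
        return diff;
    }
}
```

Treating a node as joined on its first address and as left only when its last address disappears is what the indirection through "members" buys for instances with several IP addresses.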
The middle way

I am wary of frequent updates written to ZK. So another option would be to keep only URLs in ZK, as in the first approach, and come up with a metrics exchange protocol of your own (with manually created sockets or even Netty). My inner child thinks it could be fun. My inner engineer thinks we have had enough of reinventing the same wheel over and over again. So this middle way would trade fewer ZK updates for a lot of coding and a very real potential for subtle bugs.

May 27, 2011

Ideas from Hadoop/MapReduce alternatives

Hadoop as we know it today is synonymous with open-source MapReduce (MapReduce itself being an internal GOOG system). So currently its computational model requires chaining pairs of map and reduce jobs into more realistic workflows by means of Cascading or a comparable framework.

Dryad, which was announced after MapReduce, promotes an alternative model based on the execution of directed acyclic graphs of jobs. In contrast to MapReduce, which is hugely popular among practitioners, Dryad seems to be preferred by academics. There is a growing number of research frameworks based on a few ideas first popularized in this context by Dryad. What is even more interesting is that the academics have recently started open-sourcing their work (consider, for example, Nephele and Hyracks).

Smart people noticed this alternative trend quite some time ago. It remains to be seen whether any of the wannabe contenders ever reaches maturity. Most likely, if they fail to somehow merge into "Hadoop 2 (if not 3)", they will have only a very limited user base. But at least the design ideas will be useful one way or another. To this end, I would like to summarize a few of them that I found to be particularly relevant.

Programs as DAGs of computational jobs
  • Custom jobs can be added to the set of standardized reusable jobs
  • When data is partitioned, a job can be cloned and executed in parallel on all partitions
  • Jobs are connected with logical channels. Typically, there are at least local file- and socket-based implementations. 

Cloud-based v cluster-based
  • In traditional cluster systems, distance-based heuristics can be applied to minimize data traffic between racks. In the cloud, topology information is unavailable and the actual topology can change significantly.
  • In contrast to cluster environments, in the cloud it is possible to dynamically allocate and deallocate servers on demand. There is usually a limited number of available server types such as "small", "medium" and "large". Servers can be pooled by type and reused. Costs can be further reduced by keeping an idle server allocated until the end of the billing period it has already been charged for (typically, an hour).

Job scheduling
  • When a job has a preferred set of servers (e.g. because of data locality), it could be more efficient to wait a little longer for one of those servers to become available than to immediately schedule the job on the first idle server
  • Block (don't execute new tasks of) jobs that are already using more than their fair share of the servers
  • In a cluster, have a dedicated job queue for each server, each rack and the entire cluster. Add a job (e.g. on the basis of its data locality) to the queues of its preferred servers and racks. When a job is scheduled, remove it from all the queues.
  • Have dedicated capacity for long-running jobs on each server
  • Have a root task to keep track of the job state machine and resubmit, for a limited number of times, tasks that failed
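The first and third scheduling ideas combine naturally. Here is a sketch of a scheduler with per-server, per-rack and cluster-wide queues in which a job may only be placed off-rack after it has waited for a configurable time. All names and the waiting rule are illustrative assumptions rather than any particular framework's code; fair-share blocking, long-running capacity and retries are left out.

```java
import java.util.*;

/** Hypothetical locality-aware scheduler: one queue per server, per rack and for the cluster. */
final class LocalityScheduler {
    private final Map<String, String> rackOf;   // server -> rack
    private final long maxWaitMillis;           // assumed delay before giving up on locality
    private final Map<String, LinkedHashSet<String>> serverQueues = new HashMap<>();
    private final Map<String, LinkedHashSet<String>> rackQueues = new HashMap<>();
    private final LinkedHashSet<String> clusterQueue = new LinkedHashSet<>();
    private final Map<String, Long> enqueuedAt = new HashMap<>();

    LocalityScheduler(Map<String, String> rackOf, long maxWaitMillis) {
        this.rackOf = rackOf;
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Queues a job on its preferred servers, their racks and the cluster-wide queue. */
    void submit(String job, Collection<String> preferredServers, long now) {
        for (String server : preferredServers) {
            serverQueues.computeIfAbsent(server, s -> new LinkedHashSet<>()).add(job);
            rackQueues.computeIfAbsent(rackOf.get(server), r -> new LinkedHashSet<>()).add(job);
        }
        clusterQueue.add(job);
        // Jobs without preferences have nothing to wait for, so they count as already "expired".
        enqueuedAt.put(job, preferredServers.isEmpty() ? now - maxWaitMillis : now);
    }

    /** Picks a job for an idle server, or returns null to keep waiting for a better match. */
    String onServerIdle(String server, long now) {
        String job = first(serverQueues.get(server));
        if (job == null) job = first(rackQueues.get(rackOf.get(server)));
        if (job == null) {
            // Off-rack placement only for jobs that have already waited long enough.
            for (String candidate : clusterQueue) {
                if (now - enqueuedAt.get(candidate) >= maxWaitMillis) { job = candidate; break; }
            }
        }
        if (job != null) remove(job);
        return job;
    }

    private static String first(LinkedHashSet<String> queue) {
        return (queue == null || queue.isEmpty()) ? null : queue.iterator().next();
    }

    /** A scheduled job disappears from every queue it was added to. */
    private void remove(String job) {
        serverQueues.values().forEach(q -> q.remove(job));
        rackQueues.values().forEach(q -> q.remove(job));
        clusterQueue.remove(job);
        enqueuedAt.remove(job);
    }
}
```

`LinkedHashSet` keeps FIFO order while making "remove it from all the queues" a cheap operation per queue.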

DAG execution stages
  • Split the entire DAG into stages in a way reminiscent of topological sort
  • Only one stage is executed at a time. 
  • It limits the number of required servers and simplifies scheduling because only a subset of tasks needs to run simultaneously.
  • When a stage is completed, all the provisional results are "materialized" as local files. This has a side benefit of having checkpoints for free.
  • It simplifies the automatic choice of channel types. Only jobs from the same stage can be connected with sockets. Jobs from different stages must be connected with file-based channels.
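One way to make the "topological sort" analogy concrete, assuming the edges where results must be materialized are explicitly marked as stage boundaries (the marking itself and all names below are hypothetical): walk the DAG in topological (Kahn) order, give each job the highest stage required by its producers, and derive each channel type from the stage numbers of its two ends.

```java
import java.util.*;

/** Hypothetical sketch: split a job DAG into stages and derive channel types from them. */
final class StagePlanner {
    enum Channel { SOCKET, FILE }

    /** Directed edge between two jobs; 'boundary' marks an edge whose output is materialized. */
    static final class Edge {
        final String from, to;
        final boolean boundary;
        Edge(String from, String to, boolean boundary) { this.from = from; this.to = to; this.boundary = boundary; }
    }

    /** Assigns stages in topological (Kahn) order: a job runs no earlier than its producers. */
    static Map<String, Integer> assignStages(Collection<String> jobs, List<Edge> edges) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<Edge>> outgoing = new HashMap<>();
        for (String job : jobs) { inDegree.put(job, 0); outgoing.put(job, new ArrayList<>()); }
        for (Edge e : edges) { inDegree.merge(e.to, 1, Integer::sum); outgoing.get(e.from).add(e); }

        Map<String, Integer> stage = new HashMap<>();
        Deque<String> ready = new ArrayDeque<>();
        for (String job : jobs) if (inDegree.get(job) == 0) { ready.add(job); stage.put(job, 0); }
        int visited = 0;
        while (!ready.isEmpty()) {
            String job = ready.poll();
            visited++;
            for (Edge e : outgoing.get(job)) {
                // crossing a boundary edge pushes the consumer into a later stage
                int candidate = stage.get(job) + (e.boundary ? 1 : 0);
                stage.merge(e.to, candidate, Math::max);
                if (inDegree.merge(e.to, -1, Integer::sum) == 0) ready.add(e.to);
            }
        }
        if (visited != jobs.size()) throw new IllegalArgumentException("job graph has a cycle");
        return stage;
    }

    /** Jobs in the same stage may stream over sockets; anything crossing stages goes through files. */
    static Channel channelFor(Edge e, Map<String, Integer> stage) {
        return stage.get(e.from).equals(stage.get(e.to)) ? Channel.SOCKET : Channel.FILE;
    }
}
```

Note that an unmarked edge can still end up as a file channel: if its consumer is pushed into a later stage by another boundary edge, the two ends no longer run concurrently.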

Multistage job startup
  • Instantiate tasks on the chosen servers, create network endpoints and send their descriptions to the scheduler
  • Merge the descriptions and share the result with all the tasks
  • On each server, resolve task peers using the merged descriptor

May 13, 2011

Nephele and scheduling in the cloud

A summary of "Nephele: Efficient Parallel Data Processing in the Cloud".

Goal: a data processing framework with support for dynamic allocation and de-allocation of different computational resources in the cloud.

Compute resources available in a cloud environment are highly dynamic and possibly heterogeneous. In addition, the network topology is hidden so scheduling optimizations based on knowledge of the distance to a particular rack or server are impossible.


A job graph is a DAG of tasks connected with edges. Tasks process records implementing a common interface. A task may have an arbitrary number of input and output gates through which records enter and leave the task. A task can be seen as a set of parallel subtasks processing different partitions of the data. By default each subtask is assigned to a dedicated server.

A job graph is transformed into an execution graph by the job manager. The execution graph has two levels of detail:

  • the abstract level describes the job execution on a task level (without parallelization) and the scheduling of instance allocation/deallocation. A Group Vertex is created for every Job Graph vertex to control the set of subtasks. The edges between Group Vertices are ephemeral and do not represent any physical communication paths.
  • the concrete level defines the mapping of subtasks to servers and the communication channels between them. An Execution Vertex is created for each subtask. Each Execution Vertex is always controlled by its corresponding Group Vertex. Execution vertices are connected by channels.
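The two levels can be illustrated with a small sketch that expands each job vertex into a group vertex with one execution vertex per subtask, and replaces each job-graph edge with channels between execution vertices. The names are hypothetical, and wiring every producer subtask to every consumer subtask is an assumption made for brevity; other wiring patterns (e.g. pointwise) are equally conceivable.

```java
import java.util.*;

/** Hypothetical sketch: expand a job graph into the two levels of an execution graph. */
final class ExecutionGraphBuilder {
    /** Abstract level: one group vertex per job vertex, controlling its subtasks. */
    static final class GroupVertex {
        final String name;
        final List<String> executionVertices = new ArrayList<>();
        GroupVertex(String name) { this.name = name; }
    }

    final Map<String, GroupVertex> groups = new LinkedHashMap<>();
    /** Concrete level: channels between execution vertices, as {producer, consumer} pairs. */
    final List<String[]> channels = new ArrayList<>();

    /** Creates a group vertex and one execution vertex per parallel subtask. */
    void addTask(String name, int parallelism) {
        GroupVertex group = new GroupVertex(name);
        for (int i = 0; i < parallelism; i++) group.executionVertices.add(name + "[" + i + "]");
        groups.put(name, group);
    }

    /** Replaces a job-graph edge with channels; this sketch wires all producers to all consumers. */
    void connect(String producer, String consumer) {
        for (String out : groups.get(producer).executionVertices)
            for (String in : groups.get(consumer).executionVertices)
                channels.add(new String[] {out, in});
    }
}
```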

Channel types

All edges of an Execution Graph are replaced by a channel before processing can begin. There are three channel types:
  • A network channel is based on a TCP socket connection. Two subtasks connected via a network channel can be executed on different instances. Since they must be executed at the same time, they are required to run in the same Execution Stage.
  • An in-memory channel uses the server memory to buffer data. The two connected subtasks must be scheduled to run on the same instance and in the same Execution Stage.
  • A file channel allows two subtasks to exchange records via the local file system. The two subtasks must be assigned to the same instance, and the consuming group vertex must be scheduled to run in a later Execution Stage than the producing group vertex. Subtasks must exchange records across different stages via file channels because file channels are the only channel type that stores intermediate records in a persistent manner.
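The placement constraints of the three channel types fit in a single predicate. A sketch, where the `Placement` type (instance plus execution stage) is a hypothetical simplification of Nephele's execution vertices:

```java
/** Hypothetical sketch of the placement rules for Nephele's three channel types. */
final class ChannelRules {
    enum ChannelType { NETWORK, IN_MEMORY, FILE }

    /** Where a subtask runs: the instance it is assigned to and its execution stage. */
    static final class Placement {
        final String instance;
        final int stage;
        Placement(String instance, int stage) { this.instance = instance; this.stage = stage; }
    }

    /** Returns true if a channel of the given type may connect producer to consumer. */
    static boolean isValid(ChannelType type, Placement producer, Placement consumer) {
        boolean sameInstance = producer.instance.equals(consumer.instance);
        switch (type) {
            case NETWORK:   // both ends run concurrently, possibly on different instances
                return producer.stage == consumer.stage;
            case IN_MEMORY: // shared memory buffer: same instance, same time
                return sameInstance && producer.stage == consumer.stage;
            case FILE:      // persisted on the local disk, consumed in a later stage
                return sameInstance && consumer.stage > producer.stage;
            default:
                throw new AssertionError(type);
        }
    }
}
```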

Execution Stage Scheduling

The requested server types may be temporarily unavailable in the cloud, but for cost-efficiency servers should ideally be allocated just before they are used. The Execution Graph is therefore split into one or more Execution Stages.

  • when the processing of a stage begins, all servers required within the stage are allocated.
  • all subtasks included in this stage are sent to the corresponding Task Managers and made ready to receive records.
  • before the processing of a new stage begins, all intermediate results of its preceding stages are stored in a persistent manner. So an execution stage is similar to a checkpoint: after a stage is completed, a job can be interrupted and resumed later.

The user can provide manual hints to change the default scheduling behavior:

  • into how many parallel subtasks a task should be split at runtime
  • how many subtasks can share the same server
  • which execution groups can share servers
  • channel type of each edge
  • server type required by a task (to characterize the hardware requirements)
Server type support

Server types are simple string identifiers such as "m1.small". The scheduler is given a list of available server types and their cost per time unit. Each task can be executed on its own server type. To support this, each subtask must be mapped to an Execution Instance. An Execution Instance has an ID and a server type representing the hardware characteristics.

Before beginning to process a new Execution Stage, the scheduler collects all Execution Instances from that stage and tries to replace them with matching cloud instances. If all required instances can be allocated, the subtasks are sent to the corresponding servers and set up for execution.

Nephele keeps track of server allocation time to minimize costs when usage is charged by the hour. An idle server of a particular type is not immediately deallocated if a server of the same type is required in an upcoming Execution Stage. It is kept allocated until the end of its current lease period. If the next Execution Stage begins before the end of that period, the server is reassigned to the Execution Vertex of that stage. Otherwise the server is deallocated in time not to cause any additional cost.
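The lease-aware reuse described above can be sketched as a small instance pool. The one-hour billing period, the safety margin and all names are assumptions; allocation is simulated with an in-memory list instead of a cloud API.

```java
import java.util.*;

/** Hypothetical sketch of cost-aware instance reuse for hourly-billed cloud servers. */
final class InstancePool {
    static final long LEASE_MILLIS = 60L * 60 * 1000;  // assumed hourly billing period

    static final class Instance {
        final String id, type;
        final long allocatedAt;
        boolean idle;
        Instance(String id, String type, long allocatedAt) { this.id = id; this.type = type; this.allocatedAt = allocatedAt; }

        /** Milliseconds until the end of the billing period that contains 'now'. */
        long timeToLeaseEnd(long now) {
            long elapsedInPeriod = (now - allocatedAt) % LEASE_MILLIS;
            return LEASE_MILLIS - elapsedInPeriod;
        }
    }

    private final List<Instance> instances = new ArrayList<>();
    private int nextId;

    /** At stage start: reuse an idle instance of the required type, otherwise allocate one. */
    Instance acquire(String type, long now) {
        for (Instance i : instances) {
            if (i.idle && i.type.equals(type)) { i.idle = false; return i; }
        }
        Instance fresh = new Instance("i-" + nextId++, type, now);  // stands in for a cloud API call
        instances.add(fresh);
        return fresh;
    }

    void release(Instance i) { i.idle = true; }

    /**
     * Deallocates idle instances that no upcoming stage needs, and keeps the others only until
     * their paid period is about to roll over ('margin' covers the deallocation latency).
     */
    List<Instance> reap(long now, Set<String> upcomingTypes, long margin) {
        List<Instance> freed = new ArrayList<>();
        for (Iterator<Instance> it = instances.iterator(); it.hasNext(); ) {
            Instance i = it.next();
            if (!i.idle) continue;
            if (!upcomingTypes.contains(i.type) || i.timeToLeaseEnd(now) <= margin) {
                it.remove();
                freed.add(i);
            }
        }
        return freed;
    }
}
```

`reap` would be called periodically; an instance reused by the next stage simply never shows up as idle when its period ends.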

Nephele is an open source project.

May 4, 2011

Hyracks and parallel dataflow scheduling in stages

Goal: a parallel platform to be a target for compiling Hive-style declarative data processing languages. Collections of data items are stored as local partitions distributed across the nodes of the cluster. A Hyracks job processes one or more collections of data to produce one or more output collections.

Job topology

  • A Hyracks job is a dataflow DAG composed of operators (vertices) and connectors (edges).
  • An individual operator (e.g. HashJoin) consists of one or more activities (e.g. JoinBuild to build a hashtable from one input and JoinProbe to probe it with the other input).
  • Each activity is executed as a set of cloned tasks operating on different partitions of the data flowing through the activity. Each task consumes a partition of the activity's inputs and produces a partition of its output.
  • An operator descriptor knows about output record format and operator activities. 
  • The tasks corresponding to activities of the same partition of the same operator may need to share state (e.g. JoinBuild and JoinProbe share a hashtable), so Hyracks co-locates them. Even though the previous task is no longer active when the next one is started, a shared context is provided to exchange the required information between the two tasks.
Standard building blocks

Available operators include file readers/writers, sorters (in-memory and external), joiners (hash-based, GRACE, hybrid hash) and aggregators (e.g. hash-based).

Connectors distribute data produced by a set of sender operators to a set of receiver operators. Available connectors include:

  • M:N Hash-Partitioner: hashes every tuple to generate the receiver number. Tuples keep their initial order on the receiver side.
  • M:N Hash-Partitioning Merger: hashes each tuple to find the receiver. On the receiver side merges streams coming from different senders based on a given comparator.
  • M:N Range-Partitioner: associates one receiver with each range of partitioning field in a set of disjoint ranges
  • M:N Replicator: copies the data produced by every sender to every receiver.
  • 1:1 Connector: connects exactly one sender to one receiver.
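For illustration, the receiver choice of the hash, range and replicating connectors can be written as plain functions. This is a sketch with hypothetical names, not the Hyracks API; in particular, the range split points are assumed to be given as ascending exclusive upper bounds.

```java
import java.util.*;

/** Hypothetical sketch of how M:N connectors choose receivers for a tuple's partitioning field. */
final class ConnectorRouting {
    /** M:N hash partitioning: every tuple goes to exactly one receiver. */
    static int hashReceiver(Object field, int receivers) {
        return Math.floorMod(Objects.hashCode(field), receivers);  // floorMod keeps negatives in range
    }

    /**
     * M:N range partitioning over disjoint ranges. 'splits' holds ascending upper bounds
     * (exclusive): receiver 0 gets values < splits[0], the last receiver gets the rest.
     */
    static int rangeReceiver(int field, int[] splits) {
        int pos = Arrays.binarySearch(splits, field);
        // an exact hit on a bound belongs to the next range; otherwise use the insertion point
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    /** M:N replication: every receiver gets a copy. */
    static int[] replicateReceivers(int receivers) {
        int[] all = new int[receivers];
        for (int i = 0; i < receivers; i++) all[i] = i;
        return all;
    }
}
```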
Job execution

  • Typically, connections between activities of one operator are blocking (e.g. between JoinBuild and JoinProbe) while connections between operators are not (e.g. between input file reading operator and JoinBuild activity). 
  • Activities that are transitively connected to other activities in a job only through dataflow edges are said to form a stage. So a stage is a set of activities that can be co-scheduled. 
  • Stages are independently scheduled and executed in the order in which they become ready. A stage is ready to execute when all of its dependencies have successfully completed execution. Each stage is expanded into tasks just prior to its execution.
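Stage formation is essentially a connected-components computation over the non-blocking edges. A sketch using union-find, where blocking edges only record dependencies between stages (the class and its methods are hypothetical, not Hyracks code):

```java
import java.util.*;

/** Hypothetical sketch: group activities into stages and report which stages are ready. */
final class StageBuilder {
    private final Map<String, String> parent = new HashMap<>();
    private final List<String[]> blockingEdges = new ArrayList<>();

    private String find(String a) {
        parent.putIfAbsent(a, a);
        String root = a;
        while (!parent.get(root).equals(root)) root = parent.get(root);
        parent.put(a, root);  // partial path compression
        return root;
    }

    /** Pipelined (non-blocking) edge: both activities end up in the same stage. */
    void dataflow(String from, String to) { parent.put(find(from), find(to)); }

    /** Blocking edge: 'to' cannot start before the stage containing 'from' has finished. */
    void blocking(String from, String to) { find(from); find(to); blockingEdges.add(new String[] {from, to}); }

    /** Stage id (the union-find root) of an activity. */
    String stageOf(String activity) { return find(activity); }

    /** Stages whose every upstream stage (via blocking edges) is already completed. */
    Set<String> readyStages(Set<String> completed) {
        Set<String> all = new TreeSet<>();
        for (String a : new ArrayList<>(parent.keySet())) all.add(find(a));
        Set<String> ready = new TreeSet<>();
        for (String stage : all) {
            if (completed.contains(stage)) continue;
            boolean ok = true;
            for (String[] e : blockingEdges) {
                String from = find(e[0]);
                if (find(e[1]).equals(stage) && !from.equals(stage) && !completed.contains(from)) ok = false;
            }
            if (ok) ready.add(stage);
        }
        return ready;
    }
}
```

For a hash join, the scan feeding JoinBuild forms one stage, while JoinProbe, its other input scan and any downstream writer form a second stage that becomes ready once the first completes.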

Currently, there are two scheduling choices, with a third one under development:

  • configure the exact number of partitions of an operator to be created and the server for each partition
  • configure just the number of partitions to allow automatic placement of the partitions
  • automatic partitioning and placement based on the estimated resource requirements and the current availability of resources is being developed
A Hyracks cluster consists of a cluster controller (CC) and node controllers (NCs). When a stage is ready to run, the cluster controller starts the stage tasks on a set of node controllers. It then waits until the stage completes or a node controller failure is detected.

A task is started in three steps:

1) Activation

  • In response to a request from the CC each NC creates its designated tasks.
  • For each task accepting input from other tasks, a network endpoint is created. The mapping of tasks to endpoints is sent back to the CC.
2) Pairing

  • The CC merges all the responses to create a job-wide address map.
  • The map is sent to all the NCs so that each sender knows its receiver addresses
3) Connecting

  • Once pairing is completed the CC makes the NCs start their tasks
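The pairing step amounts to merging per-NC endpoint maps and looking receivers up in the result. A minimal sketch, with endpoints represented as "host:port" strings and all names hypothetical:

```java
import java.util.*;

/** Hypothetical sketch of the pairing step: the CC merges per-NC endpoint maps into one job-wide map. */
final class Pairing {
    /** Each activation response maps a receiving task id to its "host:port" endpoint. */
    static Map<String, String> merge(List<Map<String, String>> activationResponses) {
        Map<String, String> jobWide = new HashMap<>();
        for (Map<String, String> response : activationResponses) {
            for (Map.Entry<String, String> e : response.entrySet()) {
                String previous = jobWide.putIfAbsent(e.getKey(), e.getValue());
                if (previous != null && !previous.equals(e.getValue()))
                    throw new IllegalStateException("task " + e.getKey() + " has two endpoints");
            }
        }
        return Collections.unmodifiableMap(jobWide);
    }

    /** On each NC: a sending task looks up the endpoints of its receivers in the broadcast map. */
    static List<String> resolve(Map<String, String> jobWide, List<String> receiverTasks) {
        List<String> endpoints = new ArrayList<>();
        for (String task : receiverTasks) {
            String endpoint = jobWide.get(task);
            if (endpoint == null) throw new IllegalStateException("no endpoint for " + task);
            endpoints.add(endpoint);
        }
        return endpoints;
    }
}
```

Failing fast on a missing endpoint matters here: the connecting step should only be triggered once every sender can resolve all of its receivers.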
Data stream management

  • A dataflow is a stream of records with an arbitrary number of fields. A fixed-size chunk of contiguous bytes representing a sequence of serialized records is called a frame. 
  • A frame is the unit of data transfer between tasks. To avoid excessive garbage churn Hyracks provides interfaces for comparing and hashing fields that can be implemented to work off of the binary data in a frame.
  • A task is implemented as a push-based iterator that receives a frame at a time from its inputs and pushes result frames to its consumers. Any repartitioning is achieved by using a Connector.
  • There are as many send-side instances of a connector as tasks in the producing activity and as many receive-side instances as tasks in the consuming activity.
  • When a send-side instance of a connector receives a frame from the producer, it applies its redistribution logic (e.g. hash-partitioning on a field) to move records to the relevant receive-side connector instances. Each record is copied to the target frame meant for the appropriate receive-side instance. When a target frame is full, the send-side instance sends the frame to the receive-side instance.
  • The buffering strategy for received frames is configurable and can use one network buffer either for all senders (e.g. M:N hash-partitioning) or for each sender (e.g. M:N sort-merge, which expects pre-sorted frames).
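The send-side behavior can be sketched with `java.nio.ByteBuffer` frames. The assumptions are deliberately simple: records are single ints, the "network" is an in-memory list of frames per receiver, frames are at least 4 bytes, and the class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.*;

/**
 * Hypothetical sketch of a send-side M:N hash-partitioning connector. Records are plain ints
 * serialized into fixed-size frames; a full target frame is shipped to its receiver.
 */
final class HashPartitioningSender {
    private final ByteBuffer[] targetFrames;
    private final List<List<ByteBuffer>> delivered;  // stands in for the network, one inbox per receiver

    HashPartitioningSender(int receivers, int frameBytes) {
        targetFrames = new ByteBuffer[receivers];
        delivered = new ArrayList<>();
        for (int r = 0; r < receivers; r++) {
            targetFrames[r] = ByteBuffer.allocate(frameBytes);
            delivered.add(new ArrayList<>());
        }
    }

    /** Consumes one frame from the producer and copies each record into its target frame. */
    void nextFrame(ByteBuffer producerFrame) {
        while (producerFrame.remaining() >= Integer.BYTES) {
            int record = producerFrame.getInt();
            int r = Math.floorMod(Integer.hashCode(record), targetFrames.length);
            if (targetFrames[r].remaining() < Integer.BYTES) flush(r);
            targetFrames[r].putInt(record);
        }
    }

    /** Sends the (possibly partial) target frame to receiver r and starts a new one. */
    private void flush(int r) {
        ByteBuffer frame = targetFrames[r];
        if (frame.position() == 0) return;
        frame.flip();
        ByteBuffer copy = ByteBuffer.allocate(frame.remaining());
        copy.put(frame).flip();
        delivered.get(r).add(copy);
        frame.clear();
    }

    /** End of stream: push out whatever is left in partially filled frames. */
    void close() {
        for (int r = 0; r < targetFrames.length; r++) flush(r);
    }

    List<ByteBuffer> framesFor(int receiver) { return delivered.get(receiver); }
}
```

Working directly on the serialized bytes, rather than materializing record objects, is the point of the frame design: the only per-record allocation-free work is a read, a hash and a copy.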

For backward compatibility, Hyracks can execute Hadoop jobs.