May 4, 2011

Hyracks and parallel dataflow scheduling in stages


Goal: a parallel platform that serves as a compilation target for Hive-style declarative data processing languages. Collections of data items are stored as local partitions distributed across the nodes of the cluster. A Hyracks job processes one or more collections of data to produce one or more output collections.

Job topology

  • A Hyracks job is a dataflow DAG composed of operators (vertices) and connectors (edges).
  • An individual operator (e.g. HashJoin) consists of one or more activities (e.g. JoinBuild to build a hashtable from one input and JoinProbe to probe it with the other input).
  • Each activity is executed as a set of cloned tasks operating on different partitions of the data flowing through the activity. Each task consumes a partition of the activity's inputs and produces a partition of its output.
  • An operator descriptor describes the operator's output record format and its activities.
  • The tasks corresponding to activities of the same partition of the same operator may need to share state (e.g. JoinBuild and JoinProbe share a hashtable), so Hyracks co-locates them. Even though the earlier task is no longer active when the next one starts, a shared context is provided to pass the required information between the two tasks.
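
The topology above can be sketched in a few lines of Python. This is only an illustration, not the Hyracks API: Task, expand, shared_context, join_build and join_probe are made-up names, and the shared context is modeled as a plain dictionary keyed by (operator, partition).

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    operator: str
    activity: str
    partition: int


def expand(operator, activities, num_partitions):
    """Clone each activity of an operator into one task per partition."""
    return [Task(operator, a, p) for a in activities for p in range(num_partitions)]


# Hypothetical shared context: tasks with the same (operator, partition) key
# are assumed to be co-located, so they can hand state over through it.
shared_context = defaultdict(dict)


def join_build(task, rows, key):
    """JoinBuild: build a hash table from one partition of the first input."""
    table = defaultdict(list)
    for row in rows:
        table[row[key]].append(row)
    shared_context[(task.operator, task.partition)]["hashtable"] = table


def join_probe(task, rows, key):
    """JoinProbe: probe the table left behind by the matching JoinBuild task."""
    table = shared_context[(task.operator, task.partition)]["hashtable"]
    return [(b, r) for r in rows for b in table.get(r[key], [])]


# Two activities x two partitions = four tasks.
tasks = expand("HashJoin", ["JoinBuild", "JoinProbe"], 2)
build0, probe0 = tasks[0], tasks[2]

join_build(build0, [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}], "id")
matches = join_probe(probe0, [{"id": 2, "qty": 7}, {"id": 3, "qty": 9}], "id")
```

The build task has already finished when the probe task runs; only the entry in the shared context survives between them.
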
Standard building blocks

Available operators include file readers/writers, sorters (in-memory and external), joiners (hash-based, GRACE, hybrid hash) and aggregators (e.g. hash-based).

Connectors distribute data produced by a set of sender operators to a set of receiver operators. Available connectors include:

  • M:N Hash-Partitioner: hashes every tuple to generate the receiver number. Tuples from the same sender keep their initial order on the receiver side.
  • M:N Hash-Partitioning Merger: hashes each tuple to find the receiver. On the receiver side, it merges the streams coming from different senders based on a given comparator.
  • M:N Range-Partitioner: associates one receiver with each range of the partitioning field in a set of disjoint ranges.
  • M:N Replicator: copies the data produced by every sender to every receiver.
  • 1:1 Connector: connects exactly one sender to one receiver.
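
A rough sketch of the routing logic behind these connectors, written as plain Python functions. The function names, the use of crc32 as the hash, and the representation of ranges as a sorted list of upper bounds are all assumptions made for the example, not Hyracks code.

```python
import bisect
import heapq
import zlib


def hash_partition(tuples, key, num_receivers):
    """M:N hash partitioner: pick the receiver by hashing the key field."""
    out = [[] for _ in range(num_receivers)]
    for t in tuples:
        # crc32 is used only because it is deterministic across runs.
        r = zlib.crc32(repr(t[key]).encode()) % num_receivers
        out[r].append(t)  # appending keeps the sender's order
    return out


def range_partition(tuples, key, upper_bounds):
    """M:N range partitioner: receiver i takes keys <= upper_bounds[i];
    one extra receiver takes everything above the last bound."""
    out = [[] for _ in range(len(upper_bounds) + 1)]
    for t in tuples:
        out[bisect.bisect_left(upper_bounds, t[key])].append(t)
    return out


def replicate(tuples, num_receivers):
    """M:N replicator: every receiver gets a full copy."""
    return [list(tuples) for _ in range(num_receivers)]


def merge_sorted(streams, key):
    """Receive side of the hash-partitioning merger: merge pre-sorted
    per-sender streams into one sorted stream."""
    return list(heapq.merge(*streams, key=lambda t: t[key]))


ranges = range_partition([(5,), (10,), (15,), (25,)], 0, [10, 20])
# ranges == [[(5,), (10,)], [(15,)], [(25,)]]
merged = merge_sorted([[(1,), (4,)], [(2,), (3,)]], 0)
# merged == [(1,), (2,), (3,), (4,)]
```
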
Job execution

  • Typically, connections between activities of one operator are blocking (e.g. between JoinBuild and JoinProbe) while connections between operators are not (e.g. between input file reading operator and JoinBuild activity). 
  • Activities that are transitively connected to other activities in a job only through dataflow edges are said to form a stage. So a stage is a set of activities that can be co-scheduled. 
  • Stages are independently scheduled and executed in the order in which they become ready. A stage is ready to execute when all of its dependencies have successfully completed execution. Each stage is expanded into tasks just prior to its execution.
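
One way to picture stage formation: treat non-blocking (dataflow) edges as undirected links, take the connected components as stages, and let blocking edges define the dependencies between stages. The sketch below does exactly that; find_stages and schedule are hypothetical helpers, and the actual Hyracks scheduler may work differently.

```python
def find_stages(activities, dataflow_edges):
    """Group activities into connected components over dataflow edges."""
    parent = {a: a for a in activities}

    def find(a):
        while parent[a] != a:
            parent[a] = parent[parent[a]]  # path halving
            a = parent[a]
        return a

    for src, dst in dataflow_edges:
        parent[find(src)] = find(dst)

    groups = {}
    for a in activities:
        groups.setdefault(find(a), set()).add(a)
    return [frozenset(g) for g in groups.values()]


def schedule(stages, blocking_edges):
    """Order stages so that each runs only after its dependencies are done."""
    stage_of = {a: s for s in stages for a in s}
    deps = {s: set() for s in stages}
    for src, dst in blocking_edges:
        if stage_of[src] != stage_of[dst]:
            deps[stage_of[dst]].add(stage_of[src])

    done, order = set(), []
    while len(order) < len(stages):
        ready = [s for s in stages if s not in done and deps[s] <= done]
        if not ready:
            raise ValueError("cyclic blocking dependencies")
        for s in ready:
            done.add(s)
            order.append(s)
    return order


activities = ["Scan1", "JoinBuild", "Scan2", "JoinProbe", "Writer"]
dataflow = [("Scan1", "JoinBuild"), ("Scan2", "JoinProbe"), ("JoinProbe", "Writer")]
blocking = [("JoinBuild", "JoinProbe")]

order = schedule(find_stages(activities, dataflow), blocking)
# order[0] holds Scan1 and JoinBuild; order[1] holds Scan2, JoinProbe and Writer
```
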

Currently, there are two scheduling choices:

  • configure the exact number of partitions of an operator and the server on which each partition is created
  • configure just the number of partitions and let Hyracks place them automatically

A third option, automatic partitioning and placement based on the estimated resource requirements and the current availability of resources, is being developed.
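
To make the difference between the two choices concrete, here is a small sketch. The notes do not say how automatic placement picks nodes, so the round-robin policy below is purely an assumption, as are the function names and node names.

```python
from itertools import cycle


def place_explicit(locations):
    """Choice 1: the caller names the node for every partition."""
    return dict(enumerate(locations))


def place_by_count(num_partitions, nodes):
    """Choice 2: only the partition count is given.
    Round-robin over the nodes is an assumed stand-in policy."""
    node_cycle = cycle(nodes)
    return {p: next(node_cycle) for p in range(num_partitions)}


explicit = place_explicit(["nc1", "nc1", "nc3"])
# explicit == {0: "nc1", 1: "nc1", 2: "nc3"}
automatic = place_by_count(5, ["nc1", "nc2"])
# automatic == {0: "nc1", 1: "nc2", 2: "nc1", 3: "nc2", 4: "nc1"}
```
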
A Hyracks cluster consists of a cluster controller (CC) and node controllers (NCs). When a stage is ready to run, the cluster controller starts the stage's tasks on a set of node controllers. It then waits until the stage completes or a node controller failure is detected.

A task is started in three steps:

1) Activation

  • In response to a request from the CC, each NC creates its designated tasks.
  • For each task accepting input from other tasks, a network endpoint is created. The mapping of tasks to endpoints is sent back to the CC.
2) Pairing

  • The CC merges all the responses to create a job-wide address map.
  • The map is sent to all the NCs so that each sender knows its receivers' addresses.
3) Connecting

  • Once pairing is completed, the CC tells the NCs to start their tasks.
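
The three steps can be simulated in a single process. In this sketch the NodeController class, the start_stage function, and the "name:port" strings standing in for network endpoints are all invented for illustration; no real networking takes place.

```python
class NodeController:
    def __init__(self, name):
        self.name = name
        self.tasks = []
        self.address_map = {}
        self.started = False

    def activate(self, task_ids, receiving):
        """Step 1: create the tasks and open an endpoint for each task
        that accepts input. Returns this node's task-to-endpoint mapping."""
        self.tasks = list(task_ids)
        return {
            t: f"{self.name}:{9000 + i}"  # fake endpoint address
            for i, t in enumerate(self.tasks)
            if t in receiving
        }

    def pair(self, address_map):
        """Step 2: learn where every receiving task in the job listens."""
        self.address_map = dict(address_map)

    def start(self):
        """Step 3: start running the tasks."""
        self.started = True


def start_stage(assignment, receiving):
    """Cluster-controller side of the three-step startup."""
    ncs = {name: NodeController(name) for name in assignment}

    job_map = {}
    for name, nc in ncs.items():          # 1) activation
        job_map.update(nc.activate(assignment[name], receiving))
    for nc in ncs.values():               # 2) pairing
        nc.pair(job_map)
    for nc in ncs.values():               # 3) connecting
        nc.start()
    return ncs, job_map


ncs, job_map = start_stage(
    {"nc1": ["scan-0", "join-0"], "nc2": ["scan-1", "join-1"]},
    receiving={"join-0", "join-1"},
)
# job_map == {"join-0": "nc1:9001", "join-1": "nc2:9001"}
```

No task starts before every node holds the full address map, which is why pairing has to finish first.
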
Data stream management

  • A dataflow is a stream of records with an arbitrary number of fields. A fixed-size chunk of contiguous bytes representing a sequence of serialized records is called a frame. 
  • A frame is the unit of data transfer between tasks. To avoid excessive garbage churn, Hyracks provides interfaces for comparing and hashing fields that can be implemented to operate directly on the binary data in a frame.
  • A task is implemented as a push-based iterator that receives a frame at a time from its inputs and pushes result frames to its consumers. Any repartitioning is achieved by using a Connector.
  • There are as many send-side instances of a connector as tasks in the producing activity and as many receive-side instances as tasks in the consuming activity.
  • When a send-side instance of a connector receives a frame from the producer, it applies its redistribution logic (e.g. hash-partitioning on a field) to move records to the relevant receive-side connector instances. Each record is copied to the target frame meant for the appropriate receive-side instance. When a target frame is full, the send-side instance sends the frame to the receive-side instance.
  • The buffering strategy for received frames is configurable and can use one network buffer either for all senders (e.g. M:N hash-partitioning) or for each sender (e.g. M:N sort-merge, which expects pre-sorted frames).
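
A minimal sketch of a send-side connector instance doing hash partitioning on frames. Several things here are assumptions for the example: the tiny FRAME_SIZE, the fixed 8-byte record layout, the crc32 hash, the class name SendSideConnector, and the simplification that a frame carries only its used bytes instead of being padded to a fixed size.

```python
import struct
import zlib

FRAME_SIZE = 32                  # bytes; deliberately tiny for the example
RECORD = struct.Struct("<iI")    # assumed layout: 4-byte key, 4-byte value


class SendSideConnector:
    def __init__(self, num_receivers, send):
        self.send = send         # callback(receiver_index, frame_bytes)
        self.targets = [bytearray() for _ in range(num_receivers)]

    def next_frame(self, frame):
        """Copy each record of an incoming frame into the target frame
        of the receiver chosen by hashing the key field."""
        for off in range(0, len(frame), RECORD.size):
            record = frame[off:off + RECORD.size]
            # Hash the serialized key bytes directly; the record is never
            # turned into Python objects.
            r = zlib.crc32(record[:4]) % len(self.targets)
            self.targets[r].extend(record)
            if len(self.targets[r]) + RECORD.size > FRAME_SIZE:
                self._flush(r)   # target frame is full

    def _flush(self, r):
        if self.targets[r]:
            self.send(r, bytes(self.targets[r]))
            self.targets[r].clear()

    def close(self):
        """Send whatever is left in partially filled target frames."""
        for r in range(len(self.targets)):
            self._flush(r)


received = {0: [], 1: []}
conn = SendSideConnector(2, lambda r, f: received[r].append(f))

records = [RECORD.pack(k, k * 10) for k in range(10)]
for i in range(0, len(records), 4):          # producer pushes 4 records per frame
    conn.next_frame(b"".join(records[i:i + 4]))
conn.close()
```

Hashing the raw key bytes rather than a deserialized value is the same idea as the binary comparators and hash functions mentioned above: it avoids creating short-lived objects per record.
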

For backward compatibility, Hyracks can execute Hadoop jobs.
