Showing posts with label dryad. Show all posts

May 27, 2011

Ideas from Hadoop/MapReduce alternatives

Hadoop as we know it today is synonymous with open-source MapReduce (the original MapReduce being an internal Google system). Consequently, its computational model requires chaining pairs of map and reduce jobs into more realistic workflows by means of Cascading or a comparable framework.

Dryad, which was announced after MapReduce, promotes an alternative model based on the execution of directed acyclic graphs (DAGs) of jobs. Whereas MapReduce is hugely popular among practitioners, Dryad seems to be preferred by academics. There is a growing number of research frameworks based on a few ideas first popularized in this context by Dryad. What is even more interesting is that academics have recently started open-sourcing their work (consider, for example, Nephele and Hyracks).

Smart people noticed this alternative trend quite some time ago. It remains to be seen if any of the wannabe contenders ever reaches maturity. Most likely, if they fail to somehow merge into "Hadoop 2 (if not 3)", they will have only a very limited user base. But at least the design ideas will be useful one way or another. To this end, I would like to summarize a few of them that I found particularly relevant.

Programs as DAGs of computational jobs
  • Custom jobs can be added to the set of standardized reusable jobs
  • When data is partitioned, a job can be cloned and executed in parallel on all partitions
  • Jobs are connected with logical channels. Typically, there are at least local file- and socket-based implementations. 

Cloud-based vs. cluster-based
  • In traditional cluster systems, distance-based heuristics can be applied to minimize data traffic between racks. In the cloud, topology information is unavailable and the actual topology can change significantly.
  • In contrast to cluster environments, in the cloud it is possible to allocate and deallocate servers dynamically, on demand. There is usually a limited number of available server types such as "small", "medium" and "large". Servers can be pooled by type and reused. Costs can be further reduced by keeping an idle server allocated until the end of the billing period that has already been charged for (typically, an hour).
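
The pooling idea from the last bullet can be sketched roughly as follows. This is only an illustration under assumptions: the `Server` and `ServerPool` names, the `margin` parameter and the fixed one-hour `BILLING_PERIOD` are all hypothetical and do not come from any of the frameworks mentioned above.

```python
from dataclasses import dataclass, field

BILLING_PERIOD = 3600  # seconds; assumes the hourly billing mentioned above


@dataclass
class Server:
    kind: str            # e.g. "small", "medium", "large"
    allocated_at: float  # time at which the provider started charging


@dataclass
class ServerPool:
    idle: dict = field(default_factory=dict)  # kind -> list of idle servers

    def acquire(self, kind, now):
        """Reuse an idle server of the requested type, else allocate a new one."""
        servers = self.idle.get(kind, [])
        if servers:
            return servers.pop()
        return Server(kind, allocated_at=now)

    def release(self, server):
        """Keep the server in the pool instead of deallocating it right away."""
        self.idle.setdefault(server.kind, []).append(server)

    def reap(self, now, margin=60):
        """Deallocate idle servers whose already-paid period is about to end."""
        released = []
        for kind, servers in list(self.idle.items()):
            keep = []
            for s in servers:
                elapsed = (now - s.allocated_at) % BILLING_PERIOD
                if BILLING_PERIOD - elapsed <= margin:
                    released.append(s)
                else:
                    keep.append(s)
            self.idle[kind] = keep
        return released
```

A server released half an hour into its paid hour stays in the pool and can be handed out again by `acquire`; only a call to `reap` within the last `margin` seconds of the hour gives it back.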

Job scheduling
  • When a job has a preferred set of servers (e.g. because of data locality) it could be more efficient to wait a little longer for one of the servers to become available than to immediately schedule the job on the first idle server
  • Block (don't execute new tasks of) jobs that are already using more than their fair share of the servers
  • In a cluster, have a dedicated job queue for each server, for each rack and for the entire cluster. Add a job (e.g. on the basis of its data locality) to the queues of its preferred servers and racks. When a job is scheduled, remove it from all the queues.
  • Have dedicated capacity for long-running jobs on each server
  • Have a root task to keep track of the job state machine and resubmit, for a limited number of times, tasks that failed
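
The first bullet (waiting a little for a preferred server) could look roughly like the sketch below. The function name `pick_task`, the task dictionaries and the `max_wait` threshold are assumptions made for illustration only.

```python
def pick_task(idle_server, pending, now, max_wait=5.0):
    """Pick a task for idle_server, preferring tasks whose data is local.

    pending: list of dicts with keys "name", "preferred" (set of servers)
    and "submitted" (time at which the task became ready).
    """
    # First choice: a task that prefers this server.
    for task in pending:
        if idle_server in task["preferred"]:
            return task
    # Otherwise give up locality only for tasks that have waited long enough.
    for task in pending:
        if now - task["submitted"] >= max_wait:
            return task
    return None  # leave the server idle for now


pending = [
    {"name": "a", "preferred": {"s1"}, "submitted": 0.0},
    {"name": "b", "preferred": {"s2"}, "submitted": 0.0},
]
local = pick_task("s2", pending, now=1.0)     # task "b": its data is on s2
too_early = pick_task("s3", pending, now=1.0)  # None: nobody has waited enough
fallback = pick_task("s3", pending, now=6.0)   # task "a": waited past max_wait
```

The trade-off sits entirely in `max_wait`: a larger value buys more locality at the price of idle servers.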

DAG execution stages
  • Split the entire DAG into stages in a way reminiscent of topological sort
  • Only one stage is executed at a time. 
  • Running one stage at a time limits the number of required servers and simplifies scheduling because only a subset of tasks needs to run simultaneously.
  • When a stage is completed, all the provisional results are "materialized" as local files. This has a side benefit of having checkpoints for free.
  • Staging also simplifies the automatic choice of channel types. Only jobs from the same stage can be connected with sockets. Jobs from different stages must be connected with file-based channels.
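
As a rough sketch of the channel rule, assume the stage of every job is already known (how a particular framework assigns stages is not covered here). The names `plan_channels` and `execution_order` and the example jobs are hypothetical.

```python
def plan_channels(edges, stage_of):
    """Choose a channel type for every edge from a given stage assignment."""
    channels = {}
    for src, dst in edges:
        if stage_of[src] > stage_of[dst]:
            raise ValueError(f"edge {src}->{dst} goes back to an earlier stage")
        same_stage = stage_of[src] == stage_of[dst]
        channels[(src, dst)] = "socket" if same_stage else "file"
    return channels


def execution_order(stage_of):
    """Group jobs by stage; stages run one at a time, in increasing order."""
    stages = {}
    for job, stage in stage_of.items():
        stages.setdefault(stage, []).append(job)
    return [sorted(stages[s]) for s in sorted(stages)]


edges = [("read", "parse"), ("parse", "join"), ("join", "write")]
stage_of = {"read": 0, "parse": 0, "join": 1, "write": 1}
channels = plan_channels(edges, stage_of)
order = execution_order(stage_of)
```

Here only the `parse` to `join` edge crosses a stage boundary, so it is the only one that gets a file-based channel and the only place where results are materialized.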

Multistage job startup
  • Instantiate tasks on the chosen servers, create network endpoints, send their descriptions to the scheduler
  • Merge the descriptions and share the merged descriptor with all the tasks
  • On each server, resolve task peers using the merged descriptor
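
The three startup steps can be mimicked in a few lines. This is a toy model, not any framework's protocol: the function names, the host names and the port numbers are made up for illustration.

```python
def start_tasks(placement):
    """Step 1: every task creates an endpoint and reports its description."""
    # Port numbers are invented; a real task would report the port it bound.
    return [
        {"task": task, "host": host, "port": 5000 + i}
        for i, (task, host) in enumerate(sorted(placement.items()))
    ]


def merge_descriptions(descriptions):
    """Step 2: the scheduler merges all descriptions into one descriptor."""
    return {d["task"]: (d["host"], d["port"]) for d in descriptions}


def resolve_peers(task, peers_of, merged):
    """Step 3: a task looks up the endpoints of the peers it talks to."""
    return {peer: merged[peer] for peer in peers_of.get(task, [])}


placement = {"map": "host-a", "reduce": "host-b"}
merged = merge_descriptions(start_tasks(placement))
peers = resolve_peers("map", {"map": ["reduce"]}, merged)
```

The point of the extra round trip is that no task needs to know where its peers live before it starts; it only needs the merged descriptor.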

Apr 27, 2011

Quincy and scheduling in Dryad

A summary of "Quincy: Fair Scheduling for Distributed Computing Clusters"

Each job is managed by a root task that contains a state machine managing the workflow of that job. Actual work is done by worker tasks which may be executed multiple times and will always generate the same result.

There is a single centralized scheduling service responsible for a queue of jobs for the cluster. A root task sends a list of ready workers and their input data summaries to the scheduler. The scheduler chooses computers for tasks and instructs the root task to start them. The root task is responsible for back-tracking through the dependency graph and resubmitting in case of a failure. If the scheduler decides to kill a worker task before it completes, it will notify the root task.

A worker is not submitted to the scheduler until all of its input files have been written to the cluster. When a worker is ready its root task computes, for each computer, the amount of data that the worker would read across the network. The root then creates for it a list of preferred computers and a list of preferred racks.

Fairness: a job which takes T seconds when executed alone should take no more than N*T seconds when there are N concurrent jobs. There is a hard limit on the total number of jobs in the cluster and when it's reached new jobs are queued (and started later in order of submission time). Each computer runs only one task at a time and each job is allowed to run on a certain number of servers.

Old queue-based scheduling

Architecture
  • One queue for each server, one queue for each rack, one queue for the entire cluster
  • A new job is added to the queues of its preferred servers, preferred racks and the cluster-wide queue
  • When the job is scheduled it's removed from all the queues
  • When a job is started, its root task is executed on a server not running another root task; if that server is running a worker task, the worker is killed and resubmitted
  • Basic algorithm: when a server becomes idle, assign it a task from its server queue, its rack queue or the cluster queue.
  • Greedy fairness: block jobs that have more tasks running than min([cluster size/number of jobs], number of workers). When a job is blocked its waiting tasks will not be scheduled.
  • Fairness with preemption: starting with the most recently scheduled, kill tasks of jobs that have more than their quota of tasks.
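
The queue architecture with greedy fairness can be sketched as below. Several details are assumptions: the class and method names are hypothetical, a job is treated as blocked once it reaches its quota (one reading of the rule above), and the number of jobs is taken to be the number of jobs submitted so far.

```python
from collections import deque


class QueueScheduler:
    def __init__(self, servers_by_rack):
        self.rack_of = {s: r for r, ss in servers_by_rack.items() for s in ss}
        self.server_q = {s: deque() for s in self.rack_of}
        self.rack_q = {r: deque() for r in servers_by_rack}
        self.cluster_q = deque()
        self.running = {}  # job -> number of running tasks
        self.workers = {}  # job -> number of submitted worker tasks

    def submit(self, task, job, servers=(), racks=()):
        """Add a task to its preferred server and rack queues and the cluster queue."""
        self.workers[job] = self.workers.get(job, 0) + 1
        self.running.setdefault(job, 0)
        entry = (task, job)
        for s in servers:
            self.server_q[s].append(entry)
        for r in racks:
            self.rack_q[r].append(entry)
        self.cluster_q.append(entry)

    def blocked(self, job):
        """Greedy fairness: min(floor(cluster size / jobs), workers of the job)."""
        quota = min(len(self.rack_of) // len(self.workers), self.workers[job])
        return self.running[job] >= quota

    def on_idle(self, server):
        """Try the server queue, then its rack queue, then the cluster queue."""
        rack = self.rack_of[server]
        for q in (self.server_q[server], self.rack_q[rack], self.cluster_q):
            for entry in q:
                if not self.blocked(entry[1]):
                    self._remove_everywhere(entry)
                    self.running[entry[1]] += 1
                    return entry[0]
        return None  # nothing schedulable: the server stays idle

    def _remove_everywhere(self, entry):
        """A scheduled task disappears from all the queues."""
        for q in [*self.server_q.values(), *self.rack_q.values(), self.cluster_q]:
            if entry in q:
                q.remove(entry)


sched = QueueScheduler({"r1": ["s1", "s2"], "r2": ["s3", "s4"]})
sched.submit("a1", "A", servers=["s1"], racks=["r1"])
sched.submit("a2", "A", servers=["s2"], racks=["r1"])
sched.submit("a3", "A", servers=["s3"], racks=["r2"])
sched.submit("b1", "B", servers=["s3"], racks=["r2"])
picked = [sched.on_idle(s) for s in ("s1", "s2", "s3", "s4")]
```

With four servers and two jobs, job A is blocked after two tasks, so `s3` runs `b1` rather than `a3`, and `s4` stays idle even though `a3` is still waiting. This is exactly the kind of waste that preemption and the flow-based approach try to address.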

New flow-based scheduling

Graph topology
  • Represent instantaneous scheduling as a min-cost flow network problem
  • Each task has one unit of flow as its supply
  • There are nodes in the graph for each root and worker task, an unscheduled node for each job, a node for each server, a rack aggregator node for each rack, and a cluster aggregator node. 
  • There is a sink node through which all flows drain from the graph. Only unscheduled and server nodes are connected to it
  • Each root task has a single edge to the server where it is running. 
  • Each worker task has an edge to its job's unscheduled node, to the cluster-aggregator node, and to every rack and server in its preferred lists.
  • Workers that are executing have an edge to the server on which they are running

Scheduling behavior and parameters
  • Computer/rack/cluster edge cost is a function of the amount of data that would be transferred across rack and core switches
  • Unscheduled edge represents the penalty for leaving a task unscheduled (increases over time)
  • When a task is started, an additional cost (increasing with time) is added to its edges to nodes other than the server it is running on.
  • Tradeoffs are controlled with only three parameters: the cost of waiting in the queue, the cost of transferring data across the core switch, the cost of transferring data across a rack switch
  • The scheduler updates the graph when a job-related event occurs and on a regular timer event (some costs are time-dependent)
  • When the graph is changed the scheduler computes a new min-cost flow and then starts or kills tasks as necessary
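
To make the graph construction concrete, here is a toy instance solved with a plain successive-shortest-paths solver. Everything specific is an assumption: the costs are invented, the extra `src` node with a capacity-1 edge per task is just one way to model "one unit of supply", and this solver is a textbook algorithm, not the one used by Quincy.

```python
def min_cost_flow(n, edges, source, sink):
    """Successive shortest paths (Bellman-Ford); edges are (u, v, cap, cost)."""
    graph = [[] for _ in range(n)]  # adjacency lists of edge indices
    to, cap, cost = [], [], []
    for u, v, c, w in edges:  # forward edge at even index, reverse at odd
        graph[u].append(len(to)); to.append(v); cap.append(c); cost.append(w)
        graph[v].append(len(to)); to.append(u); cap.append(0); cost.append(-w)
    total = 0
    while True:
        dist = [float("inf")] * n
        prev = [-1] * n  # edge used to reach each node
        dist[source] = 0
        for _ in range(n - 1):
            changed = False
            for u in range(n):
                if dist[u] == float("inf"):
                    continue
                for e in graph[u]:
                    if cap[e] > 0 and dist[u] + cost[e] < dist[to[e]]:
                        dist[to[e]] = dist[u] + cost[e]
                        prev[to[e]] = e
                        changed = True
            if not changed:
                break
        if dist[sink] == float("inf"):
            break  # no augmenting path left
        v = sink
        while v != source:  # push one unit along the cheapest path
            e = prev[v]
            cap[e] -= 1
            cap[e ^ 1] += 1
            v = to[e ^ 1]
        total += dist[sink]
    flow = [cap[2 * i + 1] for i in range(len(edges))]  # flow per original edge
    return total, flow


def solve(arcs, tasks, source="src", sink="sink"):
    """Return the total cost and the first hop chosen for every task."""
    names = sorted({n for u, v, _, _ in arcs for n in (u, v)})
    idx = {name: i for i, name in enumerate(names)}
    edges = [(idx[u], idx[v], c, w) for u, v, c, w in arcs]
    total, flow = min_cost_flow(len(names), edges, idx[source], idx[sink])
    choice = {u: v for (u, v, _, _), f in zip(arcs, flow) if u in tasks and f > 0}
    return total, choice


# One job with workers w1..w3, one rack r1 with servers s1 and s2.
# U is the job's unscheduled node, X the cluster aggregator.
ARCS = [
    ("src", "w1", 1, 0), ("src", "w2", 1, 0), ("src", "w3", 1, 0),
    ("w1", "s1", 1, 0), ("w1", "X", 1, 5), ("w1", "U", 1, 4),
    ("w2", "s1", 1, 0), ("w2", "r1", 1, 2), ("w2", "X", 1, 5), ("w2", "U", 1, 4),
    ("w3", "X", 1, 5), ("w3", "U", 1, 1),
    ("X", "r1", 3, 0), ("r1", "s1", 3, 0), ("r1", "s2", 3, 0),
    ("s1", "sink", 1, 0), ("s2", "sink", 1, 0), ("U", "sink", 3, 0),
]
total, choice = solve(ARCS, {"w1", "w2", "w3"})
```

In this instance `w1` takes its preferred server `s1`, `w2` falls back to the rack (and therefore lands on `s2`), and `w3` is cheaper to leave unscheduled than to place anywhere in the cluster. Raising the cost of the `w3` to `U` edge over time, as described above, would eventually flip that decision.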

Restrictions
  • Multidimensional capacities (e.g. CPU and memory) cannot be easily represented, and so Mesos-style fine-grained resource allocation is infeasible
  • Correlated constraints (e.g. run two tasks in the same rack) are equally challenging