Apr 28, 2011

Mesos and inspiration for next generation Hadoop

The notion of Next Generation Hadoop (NGH) is somewhat blurred at this point. Recent announcements by YHOO and Facebook could be construed as at least two independent branches, each referred to as NGH. So even now that YHOO is merging all its development into the main Apache codeline, I am not sure how many NGHs are being developed and how convergent that process is.

The only technical description of the NGH I am aware of was written by YHOO engineers. Even though that post does not mention Mesos, I am pretty sure it is not coincidental that the NGH shares so much with it. It is also noteworthy that Mesos itself is now an Apache project.

A summary of "Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center":

Goal: when running multiple frameworks in the same cluster
  • improve utilization through statistical multiplexing
  • share datasets that are too expensive to replicate
Two-stage scheduling with resource offers
  • Mesos decides how many resources to offer each framework
  • Frameworks decide which resources to accept and which computations to run on them
  • A fault-tolerant Zookeeper-based master process manages slave daemons running on each cluster node
  • Slaves report to the master which vacant resources they have (e.g. {2CPUs,16GB})
  • A framework scheduler registers with the master
  • A framework scheduler is offered resources, decides which ones to use and describes the tasks to launch on those resources
  • A framework executor is launched on slave nodes to execute tasks
  • Each resource offer is a list of free resources on multiple slaves
  • A pluggable master strategy decides how many resources to offer to each framework
  • Supports long tasks by allowing a set of resources on a slave to be designated for their use
  • Linux containers are used to isolate frameworks
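
The two-stage offer cycle described above can be sketched roughly as follows. This is only an illustration, not the Mesos API: `offer_round`, `greedy_scheduler` and the resource names are hypothetical, and the master's first-stage policy is reduced to "offer everything that is free".

```python
def offer_round(free, scheduler):
    """One offer cycle. `free` maps slave -> free resources, as reported by slaves.

    `scheduler` is the framework's callback: it receives the offer and returns
    a list of (slave, task_name, resources_needed) for the tasks it wants to run.
    """
    # Stage 1 (master): this sketch simply offers everything that is free.
    offer = {slave: dict(res) for slave, res in free.items() if any(res.values())}
    launched = []
    # Stage 2 (framework): it picks resources and describes tasks to launch.
    for slave, task, need in scheduler(offer):
        have = free.get(slave, {})
        if all(have.get(k, 0) >= v for k, v in need.items()):
            for k, v in need.items():
                have[k] -= v  # resources are now in use on that slave
            launched.append((slave, task))
    return launched


def greedy_scheduler(task_need, pending):
    """Hypothetical framework scheduler: pack pending tasks onto offered slaves."""
    def on_offer(offer):
        tasks = []
        for slave, res in sorted(offer.items()):
            avail = dict(res)
            while pending and all(avail.get(k, 0) >= v for k, v in task_need.items()):
                for k, v in task_need.items():
                    avail[k] -= v
                tasks.append((slave, pending.pop(0), dict(task_need)))
        return tasks
    return on_offer


free = {"s1": {"cpus": 2, "mem_gb": 16}, "s2": {"cpus": 1, "mem_gb": 4}}
pending = ["t1", "t2", "t3"]
launched = offer_round(free, greedy_scheduler({"cpus": 1, "mem_gb": 8}, pending))
print(launched)
```

The framework, not the master, decides that `s2` is useless for its 8 GB tasks; the master only checks that accepted resources were really free.
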
Dominant Resource Fairness
  • Equalize each framework's fractional share of its dominant resource (i.e. the resource that it has the largest fractional share of)
  • Example: make F1's share of CPU equal F2's share of RAM if F1 is CPU-bound and F2 needs mostly memory
  • Filters registered by a framework with the master to short-circuit the rejection process (e.g. only from nodes from a given list or at least as many resources free)
  • For the purpose of allocation, count offered resources as used by the framework to encourage the framework to respond quickly 
  • Rescind an offer if the framework does not respond for too long
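
A minimal sketch of Dominant Resource Fairness as progressive filling: repeatedly give one more task to the framework with the smallest dominant share. The function name, the cluster size and the per-task demands are illustrative assumptions, and unlike a real allocator this sketch assumes every framework has an unlimited supply of identical tasks.

```python
def drf_allocate(capacity, demands):
    """capacity: resource -> total; demands: framework -> per-task resource needs.

    Returns the number of tasks each framework gets to run.
    """
    used = {r: 0 for r in capacity}
    allocated = {f: {r: 0 for r in capacity} for f in demands}
    tasks = {f: 0 for f in demands}
    active = set(demands)

    def dominant_share(f):
        # Largest fractional share the framework holds of any single resource.
        return max(allocated[f][r] / capacity[r] for r in capacity)

    while active:
        # Serve the framework that is currently furthest behind.
        f = min(sorted(active), key=dominant_share)
        need = demands[f]
        if all(used[r] + need[r] <= capacity[r] for r in capacity):
            for r in capacity:
                used[r] += need[r]
                allocated[f][r] += need[r]
            tasks[f] += 1
        else:
            active.remove(f)  # its next task no longer fits
    return tasks


result = drf_allocate(
    {"cpu": 9, "mem": 18},
    {"F1": {"cpu": 3, "mem": 1},   # CPU-bound
     "F2": {"cpu": 1, "mem": 4}},  # needs mostly memory
)
print(result)  # {'F1': 2, 'F2': 3}
```

F1 ends up with 6/9 of the CPUs and F2 with 12/18 of the memory, so both dominant shares are 2/3.
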

Apr 27, 2011

Quincy and scheduling in Dryad

A summary of "Quincy: Fair Scheduling for Distributed Computing Clusters"

Each job is managed by a root task that contains a state machine managing the workflow of that job. Actual work is done by worker tasks which may be executed multiple times and will always generate the same result.

There is a single centralized scheduling service responsible for a queue of jobs for the cluster. A root task sends a list of ready workers and their input data summaries to the scheduler. The scheduler chooses computers for the tasks and has the root task start them. The root task is responsible for back-tracking through the dependency graph and resubmitting in case of a failure. If the scheduler decides to kill a worker task before it completes, it notifies the root task.

A worker is not submitted to the scheduler until all of its input files have been written to the cluster. When a worker is ready its root task computes, for each computer, the amount of data that the worker would read across the network. The root then creates for it a list of preferred computers and a list of preferred racks.
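
As a rough illustration of that bookkeeping, the sketch below computes the bytes a worker would have to read over the network on each computer and derives preferred lists from it. The threshold `min_local_fraction` and all names are my assumptions; the summary above does not say how the lists are cut off.

```python
def remote_bytes(blocks, computers):
    """blocks: list of (size, set of computers holding a replica).

    Returns computer -> bytes the worker would read across the network there.
    """
    total = sum(size for size, _ in blocks)
    return {
        c: total - sum(size for size, holders in blocks if c in holders)
        for c in computers
    }


def preferred(blocks, rack_of, min_local_fraction=0.5):
    """rack_of: computer -> rack. Returns (preferred computers, preferred racks)."""
    total = sum(size for size, _ in blocks)
    cost = remote_bytes(blocks, rack_of)
    computers = sorted(
        c for c in rack_of if total - cost[c] >= min_local_fraction * total
    )
    # Bytes that have at least one replica somewhere inside each rack.
    in_rack = {}
    for size, holders in blocks:
        for rack in {rack_of[h] for h in holders}:
            in_rack[rack] = in_rack.get(rack, 0) + size
    racks = sorted(r for r, b in in_rack.items() if b >= min_local_fraction * total)
    return computers, racks


blocks = [(100, {"c1", "c2"}), (50, {"c3"})]
rack_of = {"c1": "r1", "c2": "r1", "c3": "r2", "c4": "r2"}
print(remote_bytes(blocks, rack_of))  # {'c1': 50, 'c2': 50, 'c3': 100, 'c4': 150}
print(preferred(blocks, rack_of))     # (['c1', 'c2'], ['r1'])
```
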

Fairness: a job which takes T seconds when executed alone should take no more than N*T seconds when there are N concurrent jobs. There is a hard limit on the total number of jobs in the cluster and when it's reached new jobs are queued (and started later in order of submission time). Each computer runs only one task at a time and each job is allowed to run on a certain number of servers.

Old queue-based scheduling

  • One queue for each server, one queue for each rack, one queue for the entire cluster
  • A new job is added to the queues of its preferred servers, preferred racks and the cluster-wide queue
  • When the job is scheduled it's removed from all the queues
  • When a job is started its root task is executed on a server not running another root task; if there's a worker task it is killed and resubmitted
  • Basic algorithm: when a server becomes idle, assign it a task from its server queue, its rack queue or the cluster queue.
  • Greedy fairness: block jobs that have more tasks running than min([cluster size/number of jobs], number of workers). When a job is blocked its waiting tasks will not be scheduled.
  • Fairness with preemption: starting with the most recently scheduled, kill tasks of jobs that have more than their quota of tasks.
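
The queue-based algorithm with greedy fairness might look like the sketch below. Assumptions on my part: the square brackets in the quota formula mean integer division, a job counts as blocked as soon as it reaches its quota, and the queues hold (job, task) pairs. All names are hypothetical.

```python
def quota(cluster_size, num_jobs, num_workers):
    # min([cluster size / number of jobs], number of workers)
    return min(cluster_size // num_jobs, num_workers)


def pick_task(server, rack, queues, running, quotas):
    """Choose a task for an idle server: server queue, then rack, then cluster.

    queues maps ("server", name), ("rack", name) and ("cluster",) to lists of
    (job, task) pairs; running maps job -> number of running tasks.
    """
    for key in (("server", server), ("rack", rack), ("cluster",)):
        for job, task in queues.get(key, []):
            if running.get(job, 0) >= quotas[job]:
                continue  # blocked job: its waiting tasks are not scheduled
            # A scheduled task is removed from every queue it was added to.
            for q in queues.values():
                if (job, task) in q:
                    q.remove((job, task))
            running[job] = running.get(job, 0) + 1
            return job, task
    return None


queues = {
    ("server", "s1"): [("A", "a1")],
    ("rack", "r1"): [("A", "a1"), ("B", "b1")],
    ("cluster",): [("A", "a1"), ("B", "b1")],
}
quotas = {"A": 1, "B": 1}
running = {"A": 1}  # job A is already at its quota
first = pick_task("s1", "r1", queues, running, quotas)
second = pick_task("s1", "r1", queues, running, quotas)
print(first, second)  # ('B', 'b1') None
```
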
New flow-based scheduling

Graph topology
  • Represent instantaneous scheduling as a min-cost flow network problem
  • Each task has one unit of flow as its supply
  • There are nodes in the graph for each root and worker task, an unscheduled node for each job, a node for each server, a rack aggregator node for each rack, and a cluster aggregator node. 
  • There is the sink node through which all flows drain from the graph. Only unscheduled and server nodes are connected to it
  • Each root task has a single edge to the server where it is running. 
  • Each worker task has an edge to its job's unscheduled node, to the cluster-aggregator node, and to every rack and server in its preferred lists.
  • Workers that are executing have an edge to the server on which they are running
Scheduling behavior and parameters
  • Computer/rack/cluster edge cost is a function of the amount of data that would be transferred across rack and core switches
  • Unscheduled edge represents the penalty for leaving a task unscheduled (increases over time)
  • When a task is started, an additional cost (increasing with time) is added to its edges to nodes other than the server it is running on.
  • Tradeoffs are controlled with only three parameters: the cost of waiting in the queue, the cost of transferring data across the core switch, the cost of transferring data across a rack switch
  • The scheduler updates the graph when a job-related event occurs and on a regular timer event (some costs are time-dependent)
  • When the graph is changed the scheduler computes a new min-cost flow and then starts or kills tasks as necessary
  • Multidimensional capacities (e.g. CPU and memory) cannot be easily represented and so Mesos-style fine-grained resource allocation is infeasible
  • Correlated constraints (e.g. run two tasks in the same rack) are equally challenging
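
To make the graph construction concrete, here is a toy instance solved with a small successive-shortest-paths min-cost flow. This is a sketch under my own assumptions, not Quincy's solver: the costs are made-up integers, there is one job with two workers, and a hypothetical `src` node feeds each task its one unit of supply. `U` is the job's unscheduled node, `X` the cluster aggregator and `R` a rack aggregator.

```python
def min_cost_flow(edges, source, sink):
    """edges: list of (u, v, capacity, cost). Returns (flow, cost, flow per edge)."""
    names = sorted({n for u, v, _, _ in edges for n in (u, v)})
    idx = {name: i for i, name in enumerate(names)}
    n, inf = len(names), float("inf")
    graph, to, cap, cost = [[] for _ in range(n)], [], [], []
    for u, v, c, w in edges:  # edge 2i is forward, edge 2i+1 its residual twin
        graph[idx[u]].append(len(to)); to.append(idx[v]); cap.append(c); cost.append(w)
        graph[idx[v]].append(len(to)); to.append(idx[u]); cap.append(0); cost.append(-w)
    s, t, total_flow, total_cost = idx[source], idx[sink], 0, 0
    while True:
        # Bellman-Ford: cheapest augmenting path in the residual graph.
        dist, prev = [inf] * n, [-1] * n
        dist[s] = 0
        for _ in range(n - 1):
            changed = False
            for u in range(n):
                if dist[u] == inf:
                    continue
                for e in graph[u]:
                    if cap[e] > 0 and dist[u] + cost[e] < dist[to[e]]:
                        dist[to[e]], prev[to[e]], changed = dist[u] + cost[e], e, True
            if not changed:
                break
        if dist[t] == inf:
            break
        push, v = inf, t
        while v != s:  # bottleneck capacity along the path
            push, v = min(push, cap[prev[v]]), to[prev[v] ^ 1]
        v = t
        while v != s:  # augment
            e = prev[v]
            cap[e] -= push
            cap[e ^ 1] += push
            v = to[e ^ 1]
        total_flow += push
        total_cost += push * dist[t]
    flows = {(u, v): cap[2 * i + 1] for i, (u, v, _, _) in enumerate(edges)}
    return total_flow, total_cost, flows


edges = [
    ("src", "t1", 1, 0), ("src", "t2", 1, 0),  # one unit of flow per task
    ("t1", "U", 1, 10), ("t2", "U", 1, 10),    # penalty for staying unscheduled
    ("t1", "X", 1, 5), ("t2", "X", 1, 5),      # run anywhere: worst-case transfer
    ("t1", "s1", 1, 1), ("t2", "s1", 1, 2),    # preferred server: little transfer
    ("X", "R", 2, 0), ("R", "s1", 1, 0), ("R", "s2", 1, 0),
    ("s1", "sink", 1, 0), ("s2", "sink", 1, 0),  # one task per server
    ("U", "sink", 2, 0),
]
flow, total, flows = min_cost_flow(edges, "src", "sink")
print(flow, total)  # 2 6
```

Both tasks prefer `s1`, but only one fits there; the solver puts `t1` on `s1` and routes `t2` through the aggregators to `s2`, because that is cheaper than leaving either task unscheduled.
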

Apr 24, 2011

Starfish and Hadoop self-tuning

A summary of "Starfish: A Self-tuning System for Big Data Analytics".

Most Hadoop features must be managed manually through multiple obscure parameters. As an example, Hadoop supports dynamic cluster membership changes but has no support for deciding when to add or remove nodes or when to rebalance the data layout. Instead of aiming for peak performance, the Starfish project aims to provide good Hadoop performance automatically.

Three levels of Hadoop workload optimization:
  • Individual MR jobs
  • MR jobs assembled into a workflow (e.g. generated from HiveQL or by a Cascading-style framework)
  • Collections of workflows
Job-level tuning
  • JIT optimizer (instead of manual configuration of 190 parameters) based on 
  • Profiler (dynamic instrumentation to learn performance models) and 
  • Sampler (statistics about input/intermediate/output key-value spaces of a job)
The Profiler creates a job profile for the different phases of an MR job
  • Timings view: where wall-clock time is spent in each phase
  • Data flow view: how much data is processed in each phase
  • Resource-level view: how much of each resource, such as CPU and memory, is used in each phase
Workflow-level tuning
  • Workflow-aware Scheduler ("global optimization")
  • WhatIf engine (uses performance models and a job profile to estimate a new profile for different configuration parameters)
  • Data Manager (rebalance HDFS data blocks using different block placement policies)
Workload-level tuning
  • Workload Optimizer to generate an equivalent, but optimized, collection of workflows using 
  • Data-flow sharing (reusing the same job on behalf of different workflows)
  • Materialization (caching intermediate data for later reuse, probably by other workflows; also helps avoid cascading reexecution)
  • Reorganization (automatically chosen alternative means of keeping intermediate data such as key-value and column stores)
Starfish is built on top of Hadoop. Its input is expressed in a new language called Lastword. The language is not supposed to be used directly by humans. Instead, there are translators from HiveQL-style languages that submit a collection of MR workflows. Those workflows can be DAGs of MR jobs, select-project-join-aggregate logical specifications or user-defined functions. Workflows can be annotated with metadata such as scheduling hints, data statistics and data layouts.

Starfish is an open-source project.

Apr 23, 2011

Delay Scheduling and Hadoop Fair Scheduler

I am blessed with a job which is in a domain sophisticated enough to encourage reading research papers. I am planning to post my notes on the most interesting ones at least as a means of keeping key notes readily available.

So without further ado, here goes a summary of "Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling":

The key trade-off is fairness (in allocation of resources) vs. data locality (executing a job on a node that already has input data for the job). The scheduler's goal is to share a cluster between multiple users with a mix of long batch jobs and short interactive queries over a common data set. Fair scheduling requires resource reallocation when the number of jobs changes.

The original Hadoop FIFO scheduler:

  • assign tasks in response to heartbeats sent by slaves which report the number of free map and reduce slots on the slave
  • scan through jobs in order of priority and submit time to find one with a task of the required type
  • for maps, after selecting a job greedily pick the map task in the job with data closest to the slave
Locality problems with naive fair sharing (assign free slots to the job that has the fewest running tasks):

  • Head-of-line scheduling: small jobs are likely to have their tasks sent to random nodes 
  • Sticky Slots: a tendency for a job to be assigned the same slot repeatedly (a task completes, its job has fewer tasks than the others, the slot is reassigned to the job again)
The Hadoop Fair Scheduler: 

  • divide resources using max-min fair sharing to achieve statistical multiplexing
  • place computations near their input data to maximize system throughput
A two-level scheduling hierarchy: 

  • allocate task slots across pools using weighted fair sharing
  • let each pool allocate its slots using either FIFO with priorities or a second level of fair sharing (each pool can be given a minimum share guaranteed to be given as long as the pool contains jobs)
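
Weighted max-min fair sharing across pools can be sketched as a water-filling loop: split the remaining slots in proportion to the weights, cap any pool at its demand, and redistribute what is left. This sketch leaves out the guaranteed minimum shares, allows fractional slots, and uses names that are mine rather than Hadoop's.

```python
from fractions import Fraction


def weighted_max_min(capacity, demands, weights):
    """capacity: total slots; demands, weights: pool -> number.

    Returns pool -> allocated slots (exact fractions).
    """
    alloc = {p: Fraction(0) for p in demands}
    active = {p for p in demands if demands[p] > 0}
    remaining = Fraction(capacity)
    while active and remaining > 0:
        total_w = sum(weights[p] for p in active)
        share = {p: remaining * weights[p] / total_w for p in active}
        # Pools that need no more than their share are fully satisfied.
        satisfied = {p for p in active if share[p] >= demands[p]}
        if not satisfied:
            for p in active:
                alloc[p] = share[p]
            break
        for p in satisfied:
            alloc[p] = Fraction(demands[p])
            remaining -= demands[p]
        active -= satisfied  # their unused share is redistributed next round
    return alloc


equal = weighted_max_min(10, {"A": 2, "B": 8, "C": 8}, {"A": 1, "B": 1, "C": 1})
weighted = weighted_max_min(12, {"A": 100, "B": 100}, {"A": 2, "B": 1})
print({p: int(v) for p, v in equal.items()})     # {'A': 2, 'B': 4, 'C': 4}
print({p: int(v) for p, v in weighted.items()})  # {'A': 8, 'B': 4}
```

Pool A only wants 2 slots, so the 8 it leaves behind are split evenly between B and C.
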
Delay Scheduling

  • each job has two timeouts
  • a job with no tasks local to a node is not scheduled for the duration of the first timeout. 
  • the job is not scheduled for the duration of the second timeout if it does not have tasks local to the rack of the node.
Key enablers: 

  • most tasks are short compared to jobs
  • multiple locations in which a task can run to read a given data block (including multiple task slots per node)
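
A minimal sketch of the delay scheduling rule with its two timeouts, assuming the second timeout starts counting once the first has expired and that jobs arrive already sorted in fair-share order. The job dictionaries and function names are hypothetical, and the sketch only chooses a job; it does not launch or remove tasks.

```python
LEVELS = {"node": 0, "rack": 1, "any": 2}


def allowed_locality(waited, node_timeout, rack_timeout):
    """Worst locality level a job may accept after being skipped for `waited`."""
    if waited < node_timeout:
        return "node"
    if waited < node_timeout + rack_timeout:
        return "rack"
    return "any"


def best_locality(job, node, rack):
    """Best locality any pending task of the job would get on this node."""
    best = "any"
    for nodes, racks in job["pending"]:
        if node in nodes:
            return "node"
        if rack in racks:
            best = "rack"
    return best


def pick_job(jobs, node, rack, now, node_timeout, rack_timeout):
    """Return the name of the job that gets the free slot on `node`, or None."""
    for job in jobs:
        if not job["pending"]:
            continue
        since = job["skipped_since"]
        waited = 0 if since is None else now - since
        allowed = allowed_locality(waited, node_timeout, rack_timeout)
        if LEVELS[best_locality(job, node, rack)] <= LEVELS[allowed]:
            job["skipped_since"] = None  # the job runs, so its wait is reset
            return job["name"]
        if since is None:
            job["skipped_since"] = now  # start the clock on the first skip
    return None


jobs = [
    {"name": "A", "pending": [({"n2"}, {"r2"})], "skipped_since": None},
    {"name": "B", "pending": [({"n1"}, {"r1"})], "skipped_since": None},
]
picks = [pick_job(jobs, "n1", "r1", now, 5, 5) for now in (0, 6, 10)]
print(picks)  # ['B', 'B', 'A']
```

Job A is first in line but has no data on `n1` or in `r1`, so it is skipped until both timeouts have passed; meanwhile B, which has local data, takes the slot.
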