Each job is managed by a root task that runs a state machine controlling the workflow of that job. The actual work is done by worker tasks, which may be executed multiple times and will always generate the same result, so re-running one after a failure is safe.
There is a single centralized scheduling service that maintains a queue of jobs for the cluster. A root task sends the scheduler a list of its ready workers together with summaries of their input data. The scheduler chooses computers for the tasks and directs the root task to start them. The root task is responsible for back-tracking through the dependency graph and resubmitting tasks in case of a failure. If the scheduler decides to kill a worker task before it completes, it notifies the root task.
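A minimal sketch of that back-tracking step, assuming a hypothetical Worker type that records its upstream dependencies and whether its output files are still available on the cluster (the paper doesn't give this code):

```python
from dataclasses import dataclass, field

@dataclass
class Worker:
    name: str
    deps: list["Worker"] = field(default_factory=list)  # upstream workers
    output_available: bool = False

def resubmit_after_failure(failed: Worker, scheduler_queue: list[Worker]) -> None:
    """Back-track through the dependency graph: any upstream worker whose
    output was lost must run again before `failed` can be retried.
    Workers are deterministic, so re-execution yields the same result."""
    for dep in failed.deps:
        if not dep.output_available:
            resubmit_after_failure(dep, scheduler_queue)  # recurse upstream
    scheduler_queue.append(failed)  # retry once inputs will be regenerated
```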
A worker is not submitted to the scheduler until all of its input files have been written to the cluster. When a worker becomes ready, its root task computes, for each computer, the amount of data the worker would have to read across the network, and from these figures builds the worker's list of preferred computers and list of preferred racks.
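A sketch of how those lists might be derived, assuming a hypothetical layout where we know how many bytes of the worker's input each computer stores (the exact computation isn't spelled out in these notes):

```python
from collections import defaultdict

def preferred_lists(input_blocks, rack_of, total_input, top_k=3):
    """input_blocks: {computer: bytes of this worker's input stored there}
    rack_of:      {computer: rack id}
    total_input:  total bytes the worker must read."""
    # Data the worker would pull over the network if run on each computer.
    remote_read = {c: total_input - local for c, local in input_blocks.items()}
    # Aggregate per rack: reading within a rack avoids the core switch.
    rack_local = defaultdict(int)
    for c, local in input_blocks.items():
        rack_local[rack_of[c]] += local
    preferred_computers = sorted(remote_read, key=remote_read.get)[:top_k]
    preferred_racks = sorted(rack_local, key=rack_local.get, reverse=True)[:top_k]
    return preferred_computers, preferred_racks
```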
Fairness: a job that takes T seconds when executed alone should take no more than N*T seconds when there are N concurrent jobs. There is a hard limit on the total number of jobs in the cluster; when it is reached, new jobs are queued and started later in order of submission time. Each computer runs only one task at a time, and each job is allowed to run on a bounded number of servers.
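As a worked example of that per-job bound (using the quota that greedy fairness applies later in these notes; the function name is illustrative):

```python
def server_allocation(cluster_size: int, num_jobs: int, num_workers: int) -> int:
    # A job never needs more servers than it has workers, and an even
    # split of the cluster gives each concurrent job its fair share.
    return min(cluster_size // num_jobs, num_workers)

# 240 servers shared by 4 jobs: a job with 100 runnable workers gets 60 servers.
assert server_allocation(240, 4, 100) == 60
```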
Old queue-based scheduling
Architecture
- One queue for each server, one queue for each rack, one queue for the entire cluster
- A new task is added to the queues of its preferred servers, its preferred racks, and the cluster-wide queue
- When a task is scheduled it is removed from all of these queues
- When a job is started its root task is executed on a server not already running another root task; if that server is running a worker task, the worker is killed and resubmitted
- Basic algorithm: when a server becomes idle, assign it a task from its server queue, its rack queue, or the cluster queue (see the sketch after this list).
- Greedy fairness: block jobs that have more tasks running than min(cluster size / number of jobs, number of the job's workers). When a job is blocked its waiting tasks will not be scheduled.
- Fairness with preemption: starting with the most recently scheduled, kill tasks of jobs that have more than their quota of tasks.
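A condensed sketch of the basic algorithm plus greedy-fairness blocking, with hypothetical queue and bookkeeping structures:

```python
from collections import deque

def assign_task(server, rack_queue, cluster_queue, quota, running):
    """server.queue / rack_queue / cluster_queue: deques of (job, task) pairs.
    quota[job]: max concurrent tasks under greedy fairness;
    running[job]: the job's current number of running tasks."""
    # Prefer local work: server queue first, then rack, then whole cluster.
    for queue in (server.queue, rack_queue, cluster_queue):
        for job, task in list(queue):
            if running[job] < quota[job]:      # skip blocked jobs
                # Scheduling removes the task from every queue it sits on.
                for q in (server.queue, rack_queue, cluster_queue):
                    try:
                        q.remove((job, task))
                    except ValueError:
                        pass                    # task wasn't in this queue
                running[job] += 1
                return task
    return None  # nothing runnable; server stays idle
```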
Graph topology
- Represent instantaneous scheduling as a min-cost flow network problem (a toy construction follows this list)
- Each task has one unit of flow as its supply
- There are nodes in the graph for each root and worker task, an unscheduled node for each job, a node for each server, a rack aggregator node for each rack, and a cluster aggregator node.
- There is a single sink node through which all flow drains from the graph; only unscheduled and server nodes are connected to it
- Each root task has a single edge to the server where it is running.
- Each worker task has an edge to its job's unscheduled node, to the cluster-aggregator node, and to every rack and server in its preferred lists.
- Workers that are executing have an edge to the server on which they are running
- Computer/rack/cluster edge cost is a function of the amount of data that would be transferred across rack and core switches
- Unscheduled edge represents the penalty for leaving a task unscheduled (increases over time)
- When a task is started, an additional cost (increasing with time) is added to its edges to nodes other than the server it is running on.
- Tradeoffs are controlled with only three parameters: the cost of waiting in the queue, the cost of transferring data across the core switch, and the cost of transferring data across a rack switch
- The scheduler updates the graph when a job-related event occurs and on a regular timer event (some costs are time-dependent)
- When the graph is changed the scheduler computes a new min-cost flow and then starts or kills tasks as necessary
- Multidimensional capacities (e.g. CPU and memory) cannot be easily represented and so Mesos-style fine-grained resource allocation is infeasible
- Correlated constraints (e.g. run two tasks in the same rack) are equally challenging
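A toy construction of the flow network using networkx (illustrative costs and topology, not the paper's actual parameter values): each task supplies one unit of flow, which reaches the sink either through a server (the task is scheduled there) or through its job's unscheduled node (it stays queued), and the min-cost flow picks the cheapest assignment.

```python
import networkx as nx

ALPHA_QUEUE, BETA_CORE, GAMMA_RACK = 100, 10, 2   # the three cost knobs

G = nx.DiGraph()
tasks = ["t1", "t2"]                   # worker tasks of one job
servers = {"s1": "rack1", "s2": "rack2"}

for t in tasks:
    G.add_node(t, demand=-1)           # one unit of supply per task
G.add_node("sink", demand=len(tasks))  # all flow drains here

G.add_node("unsched", demand=0)        # the job's unscheduled node
G.add_edge("unsched", "sink", capacity=len(tasks), weight=0)
G.add_node("cluster", demand=0)        # cluster-wide aggregator
for rack in set(servers.values()):
    G.add_node(rack, demand=0)         # rack aggregators
    G.add_edge("cluster", rack, capacity=len(tasks), weight=0)
for s, rack in servers.items():
    G.add_edge(rack, s, capacity=1, weight=0)      # one task per server
    G.add_edge(s, "sink", capacity=1, weight=0)

for t in tasks:
    G.add_edge(t, "unsched", capacity=1, weight=ALPHA_QUEUE)  # stay queued
    G.add_edge(t, "cluster", capacity=1, weight=BETA_CORE)    # any server
    G.add_edge(t, "rack1", capacity=1, weight=GAMMA_RACK)     # preferred rack
    G.add_edge(t, "s1", capacity=1, weight=0)                 # preferred server

flow = nx.min_cost_flow(G)             # {u: {v: units}} = the assignment
```

Here t1 flows directly to s1 (cost 0) and t2, finding s1 full, routes via the cluster aggregator to s2 (cost 10): cheaper than waiting unscheduled (cost 100).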