May 13, 2011

Nephele and scheduling in the cloud

A summary of "Nephele: Efficient Parallel Data Processing in the Cloud".

Goal: a data processing framework with support for dynamic allocation and de-allocation of different computational resources in the cloud.

Compute resources available in a cloud environment are highly dynamic and possibly heterogeneous. In addition, the network topology is hidden so scheduling optimizations based on knowledge of the distance to a particular rack or server are impossible.


A job graph is a DAG of tasks connected with edges. Tasks process records implementing a common interface. A task may have an arbitrary number of input and output gates though which records enter and leave the task. A task can be seen as a set of parallel subtasks processing different partitions of the data. By default each subtask is assigned to a dedicated server.

A job graph is transformed into execution graph by the job manager. The execution graph has two levels of detail:

  • the abstract level describes the job execution on a task level (without parallelization) and the scheduling of instance allocation/deallocation. A Group Vertex is created for every Job Graph vertex to control the set of subtasks. The edges between Group Vertices are ephemeral and do not represent any physical communication paths.
  • the concrete level defines the mapping of subtasks to servers and the communication channels between them. An Execution Vertex is created for each subtask. Each Execution Vertex is always controlled by its corresponding Group Vertex. Execution vertices are connected by channels.

Channel types

All edges of an Execution Graph are replaced by a channel before processing can begin. There are three channel types:
  • A network channel is based on a TCP socket connection. Two subtasks connected via a network channel can be executed on different instances. Since they must be executed at the same time, they are required to run in the same Execution Stage.
  • An in-memory channel uses the server memory to buffer data. The two connected subtasks must be scheduled to run on the same instance and in the same Execution Stage.
  • A file channel allows two subtasks to exchange records via the local file system. Two subtasks are assigned to the same instance and the consuming group vertex must be scheduled to run in a later Execution Stage than the producing group vertex. Subtasks must exchange records across different stages via file channels because they are the only channel types which store the intermediate records in a persistent manner.

Execution Stage Scheduling

The requested server types may be temporarily unavailable in the cloud but for cost-efficiency servers should be ideally allocated just before they can be used. The Execution Graph is split into one or more Execution Stages.

  • when the processing of a stage begins, all servers required within the stage are allocated. 
  • all subtasks included in this stage are sent to the corresponding Task Managers and ready to receive records. 
  • before the processing of a new stage, all intermediate results of its preceding stages are stored in a persistent manner. So the execution stage is similar to a checkpoint because a job can be interrupted and resumed later after a stage is completed.

The user can provide manual hints to change the default scheduling behavior:

  • into how many parallel subtasks should a task be split at runtime
  • how many subtasks can share the same server
  • which execution groups can share servers
  • channel type of each edge
  • server type required by a task (to characterize the hardware requirements)
Server type support

Server types are simple string identifiers such as "m1.small". The scheduler is given a list of available server types and their cost per time unit. Each task can be executed on its own server type. To support it, each subtask must be mapped to an Execution Instance. An Execution Instance has an ID and an server type representing the hardware characteristics. 

Before beginning to process a new Execution Stage, the scheduler collects all Execution Instances from that stage and tries to replace them with matching cloud instances. If all required instances could be allocated the subtasks are sent to the corresponding server s and set up for execution.

Nephele keeps track of server allocation time to minimize costs when usage is charged by the hour. An idle server of a particular type is not immediately deallocated if a server of the same type is required in an upcoming Execution Stage. It is kept allocated until the end of its current lease period. If the next Execution Stage begins before the end of that period, the server is reassigned to the Execution Vertex of that stage. Otherwise the server is deallocated in time not to cause any additional cost.

Nephele is an open source project.

No comments: