Showing posts with label nephele. Show all posts

Mar 10, 2015

Research projects inch towards mainstream big data analytics

Just recently I wrote about a previously small research prototype that grew up into a much larger system. It took me some time to recognize that another university project from a few years ago got even closer to the world of real software. After rebranding, it is known as Flink and resides in the Apache Incubator.

It raises a few interesting questions about the evolution of what could be liberally called SQL-on-Hadoop query engines. A growing number of open source systems used for analytics look architecturally very similar. If you squint enough, each and every one of Presto/Impala/Hive/Drill/Spark/AsterixDB/Flink and their ilk is fundamentally a relational MPP database. Or at least its query engine half.

Some of them started with a poor computational model. To be more precise, the MapReduce revolution of Hadoop 1.0 gave people access to the kind of computational infrastructure previously absent from the open source world. Without alternatives, people were forced to abuse a simple model invented for batch processing of logs. Others didn't limit themselves to two kinds of jobs and went for real relational operators.

Impala was arguably the first wake-up call from the database universe. Its technological stack alone was a clear indication of the kind of gun real database folks bring to a Hadoop knife fight. Nowadays it's Spark that is so bright and shiny that every other day people suggest it could replace MapReduce in the Hadoop stack.

I reckon it'll take years to see how it all plays out. If Spark, with its academic pedigree, can challenge Hadoop/MR, could AsterixDB or Flink mature enough in a couple of years to do something similar? They also started at respected universities at roughly the same time, had the right relational MPP core from day one, and have some Scala in the code base. Are they too late to the game to ever acquire enough traction? Do they need a company to provide support (and aggressive marketing) as a precondition? Will they be able to run on YARN/Mesos well enough to compete in a common Hadoop environment?

May 27, 2011

Ideas from Hadoop/MapReduce alternatives

Hadoop as we know it today is synonymous with open-source MapReduce (which is an internal GOOG system). So currently its computational model requires chaining pairs of map and reduce jobs into more realistic workflows by means of Cascading or a comparable framework.

Dryad, which was announced after MapReduce, promotes an alternative model based on the execution of directed acyclic graphs of jobs. While MapReduce is hugely popular among practitioners, Dryad seems to be preferred by academics. There is a growing number of research frameworks based on a few ideas first popularized in this context by Dryad. What is even more interesting is that academics have recently started open-sourcing their work (consider for example Nephele and Hyracks).

Smart people noticed this alternative trend quite some time ago. It remains to be seen if any of the wannabe contenders ever reaches maturity. Most likely, if they fail to somehow merge into "Hadoop 2 (if not 3)" they will have only a very limited user base. But at least the design ideas will be useful one way or another. To this end, I would like to summarize a few of them that I found particularly relevant.

Programs as DAGs of computational jobs
  • Custom jobs can be added to the set of standardized reusable jobs
  • When data is partitioned, a job can be cloned and executed in parallel on all partitions
  • Jobs are connected with logical channels. Typically, there are at least local file- and socket-based implementations. 
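
To make the DAG idea concrete, here is a minimal sketch in Python. The names Job and run_dag are hypothetical and not taken from Dryad, Nephele or Hyracks. It assumes the jobs are already listed in dependency order and that every job sees the same number of partitions; a job with several inputs simply concatenates the matching partitions. "Cloning" a job is modelled by applying it once per partition.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Job:
    """A node of the DAG (hypothetical structure, for illustration only)."""
    name: str
    run: Callable[[list], list]                       # processes one partition
    inputs: List[str] = field(default_factory=list)   # names of upstream jobs


def run_dag(jobs: List[Job], source: List[list]) -> Dict[str, List[list]]:
    """Run jobs in the given order, which is assumed to respect dependencies.

    Each job is applied once per partition, standing in for cloned
    instances that would run in parallel on a real cluster.
    """
    results: Dict[str, List[list]] = {}
    for job in jobs:
        if not job.inputs:
            partitions = source
        else:
            upstream = [results[name] for name in job.inputs]
            # Concatenate the i-th partition of every upstream job.
            partitions = [sum(parts, []) for parts in zip(*upstream)]
        results[job.name] = [job.run(p) for p in partitions]
    return results


jobs = [
    Job("parse", lambda p: [int(x) for x in p]),
    Job("double", lambda p: [2 * x for x in p], inputs=["parse"]),
]
out = run_dag(jobs, source=[["1", "2"], ["3"]])
```

The list-of-lists passed between jobs plays the role of a logical channel here; a real system would put a file or a socket behind the same interface.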

Cloud-based v cluster-based
  • In traditional cluster systems, distance-based heuristics can be applied to minimize data traffic between racks. In the cloud, topology information is unavailable and the actual topology can change significantly.
  • In contrast to cluster environments, in the cloud it is possible to allocate and deallocate servers dynamically, on demand. There is usually a limited number of available server types such as "small", "medium" and "large". Servers can be pooled by type and reused. Costs can be reduced further by keeping an idle server allocated until the end of the billing period it has already been charged for (typically, an hour).
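
Pooling by server type could look roughly like the sketch below. ServerPool is a hypothetical class, and the generated id string merely stands in for a call to a cloud provider's allocation API, which is not modelled here.

```python
from collections import defaultdict


class ServerPool:
    """Keeps released servers around, grouped by type, so they can be reused."""

    def __init__(self):
        self._idle = defaultdict(list)   # server type -> idle server ids
        self._next_id = 0

    def acquire(self, server_type):
        # Prefer an idle server of the requested type over a new allocation.
        if self._idle[server_type]:
            return self._idle[server_type].pop()
        self._next_id += 1
        # Placeholder for a real cloud allocation request (assumption).
        return f"{server_type}-{self._next_id}"

    def release(self, server_type, server_id):
        # The server stays allocated but idle until someone reuses it.
        self._idle[server_type].append(server_id)


pool = ServerPool()
first = pool.acquire("small")
pool.release("small", first)
second = pool.acquire("small")    # reuses the released server
other = pool.acquire("large")     # no idle "large" server, so a new one
```

The sketch never gives servers back to the provider; deciding when to do that is where the billing period comes in.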

Job scheduling
  • When a job has a preferred set of servers (e.g. because of data locality) it could be more efficient to wait a little longer for one of the servers to become available than to immediately schedule the job on the first idle server
  • Block (don't execute new tasks of) jobs that already are using more than a fair share of the servers
  • In a cluster, have a dedicated job queue for each server, each rack and the entire cluster. Queue a job (e.g. on the basis of its data locality) to the queues of its preferred servers and racks. When a job is scheduled, remove it from all the queues.
  • Have dedicated capacity for long-running jobs on each server
  • Have a root task to keep track of the job state machine and resubmit, for a limited number of times, tasks that failed
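
The per-server, per-rack and cluster-wide queues can be sketched as follows. LocalityQueues and its method names are hypothetical. One assumption goes beyond the text above: every job is also placed in the cluster-wide queue so that it can still run somewhere when none of its preferred locations frees up.

```python
from collections import deque


class LocalityQueues:
    """Job queues at three levels: server, rack, whole cluster."""

    def __init__(self):
        self.server_q = {}          # server name -> deque of jobs
        self.rack_q = {}            # rack name -> deque of jobs
        self.cluster_q = deque()    # fallback queue (assumption, see lead-in)

    def submit(self, job, servers=(), racks=()):
        for s in servers:
            self.server_q.setdefault(s, deque()).append(job)
        for r in racks:
            self.rack_q.setdefault(r, deque()).append(job)
        self.cluster_q.append(job)

    def next_for(self, server, rack):
        """Pick a job for an idle server: most local queue first."""
        for q in (self.server_q.get(server), self.rack_q.get(rack), self.cluster_q):
            if q:
                job = q[0]
                self._remove_everywhere(job)
                return job
        return None

    def _remove_everywhere(self, job):
        # A scheduled job must disappear from all queues it was added to.
        for q in [*self.server_q.values(), *self.rack_q.values(), self.cluster_q]:
            if job in q:
                q.remove(job)


queues = LocalityQueues()
queues.submit("a", servers=["s1"], racks=["r1"])
queues.submit("b", servers=["s2"], racks=["r1"])
picked = [
    queues.next_for("s2", "r1"),   # "b" is server-local on s2
    queues.next_for("s3", "r1"),   # "a" is rack-local on r1
    queues.next_for("s3", "r2"),   # nothing left
]
```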

DAG execution stages
  • Split the entire DAG into stages in a way reminiscent of topological sort
  • Only one stage is executed at a time. 
  • It limits the number of required servers and simplifies scheduling because only a subset of tasks needs to run simultaneously.
  • When a stage is completed, all the provisional results are "materialized" as local files. This has a side benefit of having checkpoints for free.
  • It simplifies automatic choice of channel types. Only jobs from the same stage can be connected with sockets. Jobs from different stages must be connected with file-based channels.
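
One possible way to derive stages and channel types is sketched below. It is only an illustration under an assumption that is not stated above: each edge is flagged as blocking (the consumer needs the complete output of the producer) or pipelined, and a blocking edge pushes the consumer into the next stage. The function names are hypothetical; graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter


def assign_stages(edges):
    """edges: list of (producer, consumer, blocking) triples.

    Returns {job: stage number}. A job's stage is the highest stage among
    its producers, plus one when the connecting edge is blocking.
    """
    preds = {}
    for src, dst, blocking in edges:
        preds.setdefault(src, [])
        preds.setdefault(dst, []).append((src, blocking))
    graph = {node: [p for p, _ in ps] for node, ps in preds.items()}
    stage = {}
    # static_order() yields every job after all of its producers.
    for node in TopologicalSorter(graph).static_order():
        stage[node] = max(
            (stage[p] + (1 if blocking else 0) for p, blocking in preds[node]),
            default=0,
        )
    return stage


def channel_type(stage, src, dst):
    """Sockets only within a stage; files across stages."""
    return "socket" if stage[src] == stage[dst] else "file"


edges = [
    ("read", "map", False),
    ("map", "sort", True),      # sort needs all of map's output
    ("sort", "write", False),
]
stages = assign_stages(edges)
```

With these edges, read and map share stage 0 and can talk over a socket, while the map-to-sort edge crosses into stage 1 and therefore gets a file channel.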

Multistage job startup
  • Instantiate tasks on the chosen servers, create network endpoints, send their descriptions to the scheduler
  • Merge the descriptions into a single descriptor and share it with all the tasks
  • On each server, resolve task peers using the merged descriptor
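
The three startup steps above can be sketched in a few functions. Everything here is hypothetical: the function names, the descriptor layout and the port numbering are made up for illustration, and no real sockets are opened.

```python
def start_tasks(placement):
    """Step 1: each task on its chosen server creates an endpoint and
    reports a description of it (here: a host name and a made-up port)."""
    descriptions = []
    for offset, (task, server) in enumerate(sorted(placement.items())):
        descriptions.append({"task": task, "endpoint": (server, 5000 + offset)})
    return descriptions


def merge_descriptions(descriptions):
    """Step 2: the scheduler merges all reports into one descriptor
    that is then shared with every task."""
    return {d["task"]: d["endpoint"] for d in descriptions}


def resolve_peers(peers, merged):
    """Step 3: a task looks up the endpoints of the peers it talks to."""
    return {peer: merged[peer] for peer in peers}


placement = {"map-0": "host-a", "reduce-0": "host-b"}
merged = merge_descriptions(start_tasks(placement))
peers_of_map = resolve_peers(["reduce-0"], merged)
```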

May 13, 2011

Nephele and scheduling in the cloud

A summary of "Nephele: Efficient Parallel Data Processing in the Cloud".


Goal: a data processing framework with support for dynamic allocation and de-allocation of different computational resources in the cloud.

Compute resources available in a cloud environment are highly dynamic and possibly heterogeneous. In addition, the network topology is hidden so scheduling optimizations based on knowledge of the distance to a particular rack or server are impossible.

Topology

A job graph is a DAG of tasks connected with edges. Tasks process records implementing a common interface. A task may have an arbitrary number of input and output gates through which records enter and leave the task. A task can be seen as a set of parallel subtasks processing different partitions of the data. By default each subtask is assigned to a dedicated server.

A job graph is transformed into an execution graph by the job manager. The execution graph has two levels of detail:

  • the abstract level describes the job execution on a task level (without parallelization) and the scheduling of instance allocation/deallocation. A Group Vertex is created for every Job Graph vertex to control the set of subtasks. The edges between Group Vertices are ephemeral and do not represent any physical communication paths.
  • the concrete level defines the mapping of subtasks to servers and the communication channels between them. An Execution Vertex is created for each subtask. Each Execution Vertex is always controlled by its corresponding Group Vertex. Execution vertices are connected by channels.
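
The two levels can be pictured with a pair of small data classes. This is a sketch, not Nephele's actual class layout: the field names and the expand function are assumptions, and the job graph is reduced to a mapping from task name to degree of parallelism.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ExecutionVertex:
    """Concrete level: one subtask, later mapped to a server."""
    subtask_index: int
    # Back-reference to the controlling group; excluded from repr and
    # comparison to avoid walking the cycle.
    group: "GroupVertex" = field(repr=False, compare=False)
    server: str = ""      # filled in once the subtask is scheduled


@dataclass
class GroupVertex:
    """Abstract level: one per job graph vertex, controls its subtasks."""
    task_name: str
    members: List[ExecutionVertex] = field(default_factory=list)


def expand(job_graph: Dict[str, int]) -> List[GroupVertex]:
    """Create one Group Vertex per task and one Execution Vertex per subtask."""
    groups = []
    for name, parallelism in job_graph.items():
        gv = GroupVertex(name)
        gv.members = [ExecutionVertex(i, gv) for i in range(parallelism)]
        groups.append(gv)
    return groups


groups = expand({"map": 3, "reduce": 1})
```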

Channel types

All edges of an Execution Graph are replaced by a channel before processing can begin. There are three channel types:
  • A network channel is based on a TCP socket connection. Two subtasks connected via a network channel can be executed on different instances. Since they must be executed at the same time, they are required to run in the same Execution Stage.
  • An in-memory channel uses the server memory to buffer data. The two connected subtasks must be scheduled to run on the same instance and in the same Execution Stage.
  • A file channel allows two subtasks to exchange records via the local file system. The two subtasks must be assigned to the same instance, and the consuming group vertex must be scheduled to run in a later Execution Stage than the producing group vertex. Subtasks must exchange records across different stages via file channels because file channels are the only channel type that stores intermediate records persistently.
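
The placement and stage constraints of the three channel types can be restated as a single check. The function and its parameter names are hypothetical; the rules are taken directly from the list above.

```python
def channel_allowed(channel, same_instance, producer_stage, consumer_stage):
    """Return True if the channel type is compatible with where and when
    the two connected subtasks are scheduled."""
    if channel == "network":
        # Both ends run at the same time, possibly on different instances.
        return producer_stage == consumer_stage
    if channel == "in-memory":
        # Both ends share one instance and run at the same time.
        return same_instance and producer_stage == consumer_stage
    if channel == "file":
        # Same instance, and the consumer runs in a later stage.
        return same_instance and consumer_stage > producer_stage
    raise ValueError(f"unknown channel type: {channel}")


checks = [
    channel_allowed("network", False, 0, 0),
    channel_allowed("in-memory", False, 0, 0),
    channel_allowed("file", True, 0, 1),
    channel_allowed("file", True, 1, 1),
]
```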

Execution Stage Scheduling

The requested server types may be temporarily unavailable in the cloud, but for cost-efficiency servers should ideally be allocated just before they can be used. To reconcile the two, the Execution Graph is split into one or more Execution Stages.

  • when the processing of a stage begins, all servers required within the stage are allocated. 
  • all subtasks included in this stage are sent to the corresponding Task Managers and are ready to receive records.
  • before the processing of a new stage, all intermediate results of its preceding stages are stored in a persistent manner. So the execution stage is similar to a checkpoint because a job can be interrupted and resumed later after a stage is completed.

The user can provide manual hints to change the default scheduling behavior:

  • how many parallel subtasks a task should be split into at runtime
  • how many subtasks can share the same server
  • which execution groups can share servers
  • channel type of each edge
  • server type required by a task (to characterize the hardware requirements)

Server type support

Server types are simple string identifiers such as "m1.small". The scheduler is given a list of available server types and their cost per time unit. Each task can be executed on its own server type. To support this, each subtask must be mapped to an Execution Instance. An Execution Instance has an ID and a server type representing the hardware characteristics.

Before beginning to process a new Execution Stage, the scheduler collects all Execution Instances from that stage and tries to replace them with matching cloud instances. If all required instances could be allocated, the subtasks are sent to the corresponding servers and set up for execution.

Nephele keeps track of server allocation time to minimize costs when usage is charged by the hour. An idle server of a particular type is not immediately deallocated if a server of the same type is required in an upcoming Execution Stage. It is kept allocated until the end of its current lease period. If the next Execution Stage begins before the end of that period, the server is reassigned to the Execution Vertex of that stage. Otherwise the server is deallocated in time not to cause any additional cost.
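
The lease-based decision can be sketched with a little arithmetic. The function names are hypothetical and times are plain seconds. One assumption: a new lease period is considered started (and charged) the moment the previous one ends.

```python
def lease_end(allocated_at, now, period=3600):
    """End of the lease period that has already been charged at time `now`."""
    periods_started = (now - allocated_at) // period + 1
    return allocated_at + periods_started * period


def keep_or_release(allocated_at, idle_since, next_stage_start, period=3600):
    """Decide what happens to an idle server.

    next_stage_start is the time the next stage needing this server type
    begins, or None if no upcoming stage needs it.
    """
    end = lease_end(allocated_at, idle_since, period)
    if next_stage_start is not None and next_stage_start < end:
        return "reuse"      # the stage starts within the paid period
    return "release"        # deallocate before another period is charged


decisions = [
    keep_or_release(allocated_at=0, idle_since=1000, next_stage_start=3000),
    keep_or_release(allocated_at=0, idle_since=1000, next_stage_start=5000),
    keep_or_release(allocated_at=0, idle_since=1000, next_stage_start=None),
]
```

In the first case the next stage starts at 3000 seconds, before the paid hour ends at 3600, so the server is reused at no extra cost; in the other two it is released.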

Nephele is an open source project.