Showing posts with label hadoop. Show all posts

Aug 11, 2015

Writing Parquet file

Parquet is a column-oriented binary file format that is very popular in big data analytics circles. Nowadays it's probably impossible to find a SQL-on-Hadoop engine that does not support this format. The Parquet library makes it trivial to write Avro and Protocol Buffers records to a file. Most of the time you can't beat the simplicity of a Spark one-liner like df.write().parquet("some-file.par").

But you may also find yourself in a situation where you want to export data from an existing system that does not use Avro-like records. It's actually quite easy to do. One would need to:
  • extend the WriteSupport class responsible for writing a record instance to Parquet output
  • extend the ParquetWriter class to instantiate the new WriteSupport subclass
  • create a schema representing your record fields
  • for each record instance, write the field values to RecordConsumer

For simplicity's sake, let's assume a record type without nested structures. The high-level writing sequence looks roughly like this:

Here Record Writer represents a collection of Field Writers. Each Field Writer implementation knows how to write a field of a particular primitive type. The sequence is straightforward; please look at the example source code for details. Notice that to write a null value for a field it's enough to just skip the field for that record.
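
The Record Writer / Field Writer split can be illustrated with a small toy model. This is a sketch in Python, not the Parquet Java API: RecordingConsumer is a hypothetical stand-in that only logs calls, its method names merely echo RecordConsumer's startMessage/startField/endField/endMessage, and a single add_value replaces the type-specific add methods. The point it shows is that a null is written by emitting nothing at all for that field.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple


class RecordingConsumer:
    """Hypothetical stand-in for a record consumer; it only logs the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple] = []

    def start_message(self) -> None:
        self.calls.append(("start_message",))

    def start_field(self, name: str, index: int) -> None:
        self.calls.append(("start_field", name, index))

    def add_value(self, value: Any) -> None:
        self.calls.append(("add_value", value))

    def end_field(self, name: str, index: int) -> None:
        self.calls.append(("end_field", name, index))

    def end_message(self) -> None:
        self.calls.append(("end_message",))


@dataclass
class FieldWriter:
    """Writes one field of a record; `getter` extracts the value (assumed helper)."""
    name: str
    index: int
    getter: Callable[[Any], Optional[Any]]

    def write(self, record: Any, consumer: RecordingConsumer) -> None:
        value = self.getter(record)
        if value is None:
            return  # null value: skip the field entirely
        consumer.start_field(self.name, self.index)
        consumer.add_value(value)
        consumer.end_field(self.name, self.index)


class RecordWriter:
    """A record writer is just an ordered collection of field writers."""

    def __init__(self, field_writers: List[FieldWriter]) -> None:
        self.field_writers = field_writers

    def write(self, record: Any, consumer: RecordingConsumer) -> None:
        consumer.start_message()
        for field_writer in self.field_writers:
            field_writer.write(record, consumer)
        consumer.end_message()
```

In real Parquet, skipping a field is only valid when the schema declares that field as optional.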

One unexpected complication is that, for no particular reason, the Parquet library uses java.util.logging. This is the first time in my life I have seen anybody use it. You are not likely to have a logging configuration for it in your code base. You will definitely want to separate Parquet logs from the rest because they can be quite verbose. I actually had to use a rather unpleasant way to configure logging properly.

Jan 29, 2012

Scheduler design in YARN / MapReduce2 / Hadoop 0.23

YARN

The Hadoop branch that started with version 0.23 introduced a few significant changes to the Hadoop implementation. From the middleware perspective, the most salient one is a brand new approach to scheduling. Previously, Map and Reduce slots were explicitly distinguished for scheduling purposes, which limited the potential for using the Hadoop framework in non-MapReduce environments. In YARN, one of the key goals was to make the entire framework more generic, Mesos-style, so that the internal distributed machinery can be used for other computational models such as MPI.

I am particularly curious about the possibility of heavily customizing YARN for running distributed computations based on a non-standard computational model. Now that even MSFT has dropped their arguably more powerful alternative, Hadoop is destined to become the most advanced open source distributed infrastructure. So many in-house computational platforms could potentially benefit from reusing its core components.

There is still an interesting tension between fundamental Hadoop trade-offs and low latency requirements. To guarantee control over resource consumption and allow termination of misbehaving tasks, Hadoop starts a new JVM instance for each allocated container. JVM startup and loading of task classes take a few seconds, which would be spared if JVMs were reused. This approach is taken, for example, by GridGain at the price of being unable to restrict the resource consumption of a particular task on a given node. We'll see how Hadoop developers extend resource allocation to CPUs, which Mesos achieves with Linux Containers.

Scheduler API

The way the Scheduler API is defined has a significant influence on how general, and therefore how reusable, YARN will be. Currently, YARN is shipped with the same schedulers as before, but if it gets used outside of the default MapReduce world, custom schedulers will likely be one of the most popular extension points.

If you look at the diagram below, you can see key abstractions of YARN Scheduler design.

  • Scheduler is notified about topology changes via an event-based mechanism
  • Scheduler operates on one or more hierarchical task queues. A queue can be chosen in Job Configuration using the mapred.job.queue.name property.
  • Scheduler allocates resources on the basis of resource requests that are currently limited to memory only. A request includes an optional preference for a particular host or rack.
  • Scheduler supports fault tolerance with the ability to recover itself from a state snapshot provided by Resource Manager
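
To make the first and third points above more concrete, here is a minimal sketch of an event-driven scheduler that serves memory-only requests with an optional host or rack preference. It is a toy model and not the YARN API: the class name, the event names, the method signatures and the tie-breaking by host name are all assumptions made for illustration.

```python
from typing import Dict, Optional


class ToyScheduler:
    """Hypothetical scheduler: tracks free memory per node and serves memory-only requests."""

    def __init__(self) -> None:
        self.free_mb: Dict[str, int] = {}   # host -> free memory in MB
        self.rack_of: Dict[str, str] = {}   # host -> rack name

    def handle(self, event: str, host: str, rack: Optional[str] = None, memory_mb: int = 0) -> None:
        """React to a topology change (event names are illustrative)."""
        if event == "NODE_ADDED":
            self.free_mb[host] = memory_mb
            self.rack_of[host] = rack or "default-rack"
        elif event == "NODE_REMOVED":
            self.free_mb.pop(host, None)
            self.rack_of.pop(host, None)
        else:
            raise ValueError(f"unknown event: {event}")

    def allocate(self, memory_mb: int, host: Optional[str] = None,
                 rack: Optional[str] = None) -> Optional[str]:
        """Pick a node with enough memory, preferring the requested host, then the rack."""
        candidates = [h for h, free in self.free_mb.items() if free >= memory_mb]
        if not candidates:
            return None

        def rank(h: str) -> int:
            if host is not None and h == host:
                return 0
            if rack is not None and self.rack_of[h] == rack:
                return 1
            return 2

        chosen = min(candidates, key=lambda h: (rank(h), h))
        self.free_mb[chosen] -= memory_mb
        return chosen
```

A request for 2048 MB that prefers a host with only 1024 MB free falls back to another node on the preferred rack, and to any node with enough memory after that.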

Scheduler events include notifications about changes to the topology of available nodes and the set of running applications:

Side notes

It remains to be seen how quickly YARN takes over classical Hadoop. Even though the new code base is more interesting, it seems to be rather immature despite the three years that have passed since the previous major version. Some things which surprised me:

  • There are important pieces of code, such as Scheduler recovery from failures, which are currently commented out. I also saw a TODO item stating something like "synchronization approach is broken here" in another place. Probably it's me, but I was not able to build the original v0.23 until I merged a couple of files with trunk.
  • The good news is that they finally migrated to Maven. The bad news is that for a project of this magnitude they have only a handful of Maven modules and those are strangely nested and interleaved with multiple non-maven directories.
  • It's quite odd not to have any dependency injection framework used in a large server-side Java system.
  • Not only do they have their own tiny web framework, but it is also mixed with pure server-side code
  • Even though Guava is declared as a dependency, it is used only sporadically. As an example, they even have their own Service hierarchy

The proliferation of Hadoop branches does not make it any easier, I guess.

May 27, 2011

Ideas from Hadoop/MapReduce alternatives

Hadoop as we know it today is synonymous with an open-source implementation of MapReduce (the original being an internal GOOG system). So currently its computational model requires chaining pairs of map and reduce jobs into more realistic workflows by means of Cascading or a comparable framework.

Dryad, which was announced after MapReduce, promotes an alternative model based on the execution of directed acyclic graphs of jobs. In contrast to MapReduce, which is hugely popular among practitioners, Dryad seems to be preferred by academics. There is a growing number of research frameworks based on a few ideas first popularized in this context by Dryad. What is even more interesting is that academics have recently started open-sourcing their work (consider for example Nephele and Hyracks).

Smart people noticed this alternative trend quite some time ago. It remains to be seen if any of the wannabe contenders ever reaches maturity. Most likely, if they fail to somehow merge into "Hadoop 2 (if not 3)", they will have only a very limited user base. But at least the design ideas will be useful one way or another. To this end, I would like to summarize a few of them that I found to be particularly relevant.

Programs as DAGs of computational jobs
  • Custom jobs can be added to the set of standardized reusable jobs
  • When data is partitioned, a job can be cloned and executed in parallel on all partitions
  • Jobs are connected with logical channels. Typically, there are at least local file- and socket-based implementations. 

Cloud-based v cluster-based
  • In traditional cluster systems, distance-based heuristics can be applied to minimize data traffic between racks. In the cloud, topology information is unavailable and the actual topology can change significantly.
  • In contrast to cluster environments, in the cloud it is possible to dynamically allocate and deallocate servers on demand. There is usually a limited number of available server types such as "small", "medium" and "large". Servers can be pooled by type and reused. The costs could be further minimized by keeping an idle server allocated until the end of the billing period it has already been charged for (typically, an hour).
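
The pooling idea in the last point can be sketched as follows. This is a minimal illustration under stated assumptions: billing is per started period of 3600 seconds counted from the allocation time, and ServerPool, park, acquire and expire are hypothetical names, not part of any cloud SDK.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

PERIOD = 3600.0  # assumed billing period in seconds (one hour)


def release_time(allocated_at: float, idle_since: float, period: float = PERIOD) -> float:
    """End of the already-paid billing period that contains `idle_since`."""
    periods_started = int((idle_since - allocated_at) // period) + 1
    return allocated_at + periods_started * period


class ServerPool:
    """Keeps idle servers grouped by type until their paid period runs out."""

    def __init__(self) -> None:
        # server type -> list of (server_id, allocated_at, idle_since)
        self._idle: Dict[str, List[Tuple[str, float, float]]] = defaultdict(list)

    def park(self, server_type: str, server_id: str, allocated_at: float, now: float) -> None:
        """Return a server to the pool instead of deallocating it right away."""
        self._idle[server_type].append((server_id, allocated_at, now))

    def acquire(self, server_type: str) -> Optional[str]:
        """Reuse an idle server of the requested type, if there is one."""
        servers = self._idle[server_type]
        return servers.pop()[0] if servers else None

    def expire(self, now: float) -> List[str]:
        """Remove and return the servers whose paid period has ended."""
        released: List[str] = []
        for servers in self._idle.values():
            keep = []
            for entry in servers:
                server_id, allocated_at, idle_since = entry
                if now >= release_time(allocated_at, idle_since):
                    released.append(server_id)
                else:
                    keep.append(entry)
            servers[:] = keep
        return released
```

A server that went idle ten minutes into its first hour stays available for reuse for the remaining fifty minutes at no extra cost, and is only released once that hour is over.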

Job scheduling
  • When a job has a preferred set of servers (e.g. because of data locality) it could be more efficient to wait a little longer for one of the servers to become available than to immediately schedule the job on the first idle server
  • Block (don't execute new tasks of) jobs that already are using more than a fair share of the servers
  • In a cluster, have a dedicated job queue for each server, rack and the entire cluster. Queue a job (e.g. on the basis of its data locality) to the queues of its preferred servers and racks. When a job is scheduled, remove it from all the queues.
  • Have dedicated capacity for long-running jobs on each server
  • Have a root task to keep track of the job state machine and resubmit, for a limited number of times, tasks that failed
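
The per-server, per-rack and cluster-wide queues described above could look roughly like this. It is a sketch under assumptions: every job is also placed in the cluster-wide queue as a fallback, queues are served in arrival order, and the class and method names are hypothetical.

```python
from collections import OrderedDict, defaultdict
from typing import Dict, List, Optional


class LocalityQueues:
    """One queue per server, one per rack, and one for the whole cluster."""

    def __init__(self, rack_of: Dict[str, str]) -> None:
        self.rack_of = rack_of
        # OrderedDict is used as an ordered set: keys are job names
        self.by_server: Dict[str, OrderedDict] = defaultdict(OrderedDict)
        self.by_rack: Dict[str, OrderedDict] = defaultdict(OrderedDict)
        self.cluster: OrderedDict = OrderedDict()

    def submit(self, job: str, preferred_servers: List[str]) -> None:
        """Queue the job on its preferred servers, their racks, and the cluster."""
        for server in preferred_servers:
            self.by_server[server][job] = None
            self.by_rack[self.rack_of[server]][job] = None
        self.cluster[job] = None  # assumption: every job can fall back to any server

    def next_for(self, server: str) -> Optional[str]:
        """Pick a job for an idle server: server queue first, then rack, then cluster."""
        queues = (self.by_server[server], self.by_rack[self.rack_of[server]], self.cluster)
        for queue in queues:
            if queue:
                job = next(iter(queue))
                self._remove_everywhere(job)
                return job
        return None

    def _remove_everywhere(self, job: str) -> None:
        """A scheduled job disappears from all the queues it was placed in."""
        for queue in self.by_server.values():
            queue.pop(job, None)
        for queue in self.by_rack.values():
            queue.pop(job, None)
        self.cluster.pop(job, None)
```

An idle server therefore gets the job with the best locality available to it, and a job never runs twice because scheduling removes it from every queue at once.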

DAG execution stages
  • Split the entire DAG into stages in a way reminiscent of topological sort
  • Only one stage is executed at a time. 
  • It limits the number of required servers and simplifies scheduling because only a subset of tasks needs to run simultaneously.
  • When a stage is completed, all the provisional results are "materialized" as local files. This has a side benefit of having checkpoints for free.
  • It simplifies automatic choice of channel types. Only jobs from the same stage can be connected with sockets. Jobs from different stages must be connected with file-based channels.
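
A minimal sketch of the staging idea, assuming the caller marks the edges where results must be materialized (the `cuts` argument is a hypothetical input, as are the function names). Stage numbers are propagated in topological order, and the channel type of every edge then follows from the stages of its two endpoints.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

Edge = Tuple[str, str]


def assign_stages(edges: Dict[str, List[str]], cuts: Set[Edge]) -> Dict[str, int]:
    """Give each job a stage number; crossing a cut edge starts a later stage."""
    jobs = set(edges) | {v for targets in edges.values() for v in targets}
    indegree = {job: 0 for job in jobs}
    for targets in edges.values():
        for v in targets:
            indegree[v] += 1

    stage = {job: 0 for job in jobs}
    ready = deque(sorted(job for job in jobs if indegree[job] == 0))
    visited = 0
    while ready:  # Kahn's topological traversal
        u = ready.popleft()
        visited += 1
        for v in edges.get(u, ()):
            step = 1 if (u, v) in cuts else 0
            stage[v] = max(stage[v], stage[u] + step)
            indegree[v] -= 1
            if indegree[v] == 0:
                ready.append(v)
    if visited != len(jobs):
        raise ValueError("the job graph contains a cycle")
    return stage


def channel_types(edges: Dict[str, List[str]], stage: Dict[str, int]) -> Dict[Edge, str]:
    """Sockets inside a stage, files between stages."""
    return {
        (u, v): "socket" if stage[u] == stage[v] else "file"
        for u, targets in edges.items()
        for v in targets
    }
```

Note that an edge which was not marked as a cut can still end up file-based when its two jobs land in different stages because of another path through the graph.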

Multistage job startup
  • Instantiate tasks on the chosen servers, create network endpoints, send their descriptions to the scheduler
  • Merge the descriptions and share the result with all the tasks
  • On each server, resolve task peers using the merged descriptor
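
The three startup steps amount to a small rendezvous protocol. The sketch below models only the data flow; the Endpoint type and both function names are hypothetical, and real network endpoints are replaced by plain host/port pairs.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Endpoint:
    """Description a task sends to the scheduler after creating its endpoint."""
    task: str
    host: str
    port: int


def merge_descriptions(descriptions: Iterable[Endpoint]) -> Dict[str, Endpoint]:
    """Scheduler side: combine all reported endpoints into one descriptor."""
    merged: Dict[str, Endpoint] = {}
    for description in descriptions:
        if description.task in merged:
            raise ValueError(f"duplicate endpoint for task {description.task}")
        merged[description.task] = description
    return merged


def resolve_peers(peers: List[str], merged: Dict[str, Endpoint]) -> List[Tuple[str, int]]:
    """Task side: look up the addresses of the peers this task must connect to."""
    return [(merged[peer].host, merged[peer].port) for peer in peers]
```

A task can only connect to its peers after the second step, which is why the startup has to proceed in stages.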

Apr 24, 2011

Starfish and Hadoop self-tuning

A summary of "Starfish: A Self-tuning System for Big Data Analytics".

Most Hadoop features must be managed manually through multiple obscure parameters. As an example, Hadoop supports dynamic cluster membership changes, but it has no support for deciding when to add/remove nodes or when to rebalance the data layout. Instead of aiming for peak performance, the Starfish project aims to provide good Hadoop performance automatically.

Three levels of Hadoop workload optimization:
  • Individual MR jobs
  • MR jobs assembled into a workflow (e.g. generated from HiveQL or by a Cascading-style framework)
  • Collections of workflows
Job-level tuning
  • JIT optimizer (instead of manual configuration of 190 parameters) based on 
  • Profiler (dynamic instrumentation to learn performance models) and 
  • Sampler (statistics about input/intermediate/output key-value spaces of a job)
The Profiler creates a job profile for the different phases of an MR job
  • Timings view: where wall-clock time is spent in each phase
  • Data flow view: how much data is processed in each phase
  • Resource-level view: how much of each resource, such as CPU and memory, is used in each phase
Workflow-level tuning
  • Workflow-aware Scheduler ("global optimization")
  • WhatIf engine (uses performance models and a job profile to estimate a new profile for different configuration parameters)
  • Data Manager (rebalance HDFS data blocks using different block placement policies)
Workload-level tuning
  • Workload Optimizer to generate an equivalent, but optimized, collection of workflows using 
  • Data-flow sharing (reusing the same job on behalf of different workflows)
  • Materialization (caching intermediate data for later reuse, probably by other workflows; also helps avoid cascading reexecution)
  • Reorganization (automatically chosen alternative means of keeping intermediate data such as key-value and column stores)

Starfish is built on top of Hadoop. Its input is expressed in a new language called Lastword. The language is not supposed to be used directly by humans. Instead, there are translators from HiveQL-style languages that submit a collection of MR workflows. Those workflows can be DAGs of MR jobs, select-project-join-aggregate logical specifications or user-defined functions. Workflows can be annotated with metadata such as scheduling hints, data statistics and data layouts.

Starfish is an open-source project.

Apr 23, 2011

Delay Scheduling and Hadoop Fair Scheduler

I am blessed with a job which is in a domain sophisticated enough to encourage reading research papers. I am planning to post my notes on the most interesting ones at least as a means of keeping key notes readily available.

So without further ado, here goes a summary of "Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling":

The key trade-off is fairness (in allocation of resources) v data locality (executing a job on a node that already has input data for the job). The scheduler's goal is to share a cluster between multiple users with a mix of long batch jobs and short interactive queries over a common data set. Fair scheduling requires resource reallocation when the number of jobs changes.

The original Hadoop FIFO scheduler:

  • assign tasks in response to heartbeats sent by slaves which report the number of free map and reduce slots on the slave
  • scan through jobs in order of priority and submit time to find one with a task of the required type
  • for maps, after selecting a job greedily pick the map task in the job with data closest to the slave
Locality problems with naive fair sharing (assign free slots to the job that has the fewest running tasks):

  • Head-of-line scheduling: small jobs are likely to have their tasks sent to random nodes 
  • Sticky Slots: a tendency for a job to be assigned the same slot repeatedly (a task completes, its job has fewer tasks than the others, the slot is reassigned to the job again)
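
The sticky-slots effect follows directly from the naive rule. The sketch below is a minimal simulation under assumptions: jobs are identified by name, ties are broken alphabetically, and both function names are made up for illustration.

```python
from typing import Dict, Optional


def pick_job(running: Dict[str, int], pending: Dict[str, int]) -> Optional[str]:
    """Naive fair sharing: give the free slot to the job with the fewest running tasks."""
    candidates = [job for job, left in pending.items() if left > 0]
    if not candidates:
        return None
    return min(candidates, key=lambda job: (running.get(job, 0), job))


def on_task_finished(job: str, running: Dict[str, int], pending: Dict[str, int]) -> Optional[str]:
    """A task of `job` finished and freed its slot; return the job that receives the slot."""
    running[job] -= 1
    winner = pick_job(running, pending)
    if winner is not None:
        running[winner] += 1
        pending[winner] -= 1
    return winner
```

With two jobs running two tasks each, whichever job finishes a task immediately drops below the other and wins the same slot back, so it keeps running on the same node regardless of where its input data lives.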
The Hadoop Fair Scheduler: 

  • divide resources using max-min fair sharing to achieve statistical multiplexing
  • place computations near their input data to maximize system throughput
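
Max-min fair sharing itself can be computed with a simple "water-filling" loop. This is a generic sketch of the textbook technique rather than the Fair Scheduler's code: it is unweighted and allows fractional shares, whereas Hadoop hands out whole task slots.

```python
from typing import Dict


def max_min_shares(capacity: float, demands: Dict[str, float]) -> Dict[str, float]:
    """Split `capacity` so that no job gets more than it demands and leftovers are shared equally."""
    shares: Dict[str, float] = {}
    remaining = capacity
    pending = sorted(demands.items(), key=lambda item: item[1])  # smallest demand first
    while pending:
        equal_share = remaining / len(pending)
        name, demand = pending[0]
        if demand <= equal_share:
            # The smallest demand fits: satisfy it fully and redistribute the rest
            shares[name] = demand
            remaining -= demand
            pending.pop(0)
        else:
            # Nobody left can be fully satisfied: split what remains equally
            for other, _ in pending:
                shares[other] = equal_share
            break
    return shares
```

For example, with 10 slots and demands of 2, 4 and 10, the first two jobs are fully satisfied and the third job receives the 4 slots that are left.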
A two-level scheduling hierarchy: 

  • allocate task slots across pools using weighted fair sharing
  • let each pool allocate its slots using either FIFO with priorities or a second level of fair sharing (each pool can be given a minimum share guaranteed to be given as long as the pool contains jobs)
Delay Scheduling

  • each job has two timeouts
  • a job with no tasks local to a node is not scheduled for the duration of the first timeout. 
  • the job is not scheduled for the duration of the second timeout if it does not have tasks local to the rack of the node.
Key enablers: 

  • most tasks are short compared to jobs
  • multiple locations in which a task can run to read a given data block (including multiple task slots per node)
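
The two-timeout rule from the Delay Scheduling list can be sketched as follows. It is a simplified model and not the Fair Scheduler implementation: each pending task is assumed to have a single data host, the wait timer is assumed to reset only after a node-local launch, and the Job type and try_schedule function are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Job:
    name: str
    pending: List[str]                  # data host of each pending task (simplified)
    wait_started: Optional[float] = None


def try_schedule(job: Job, node: str, rack_of: Dict[str, str], now: float,
                 node_timeout: float, rack_timeout: float) -> Optional[Tuple[str, str]]:
    """Offer a free slot on `node` to `job`.

    Returns (data_host, locality) for the launched task, or None if the job is skipped.
    """
    if not job.pending:
        return None
    if node in job.pending:
        job.pending.remove(node)
        job.wait_started = None         # a local launch resets the wait
        return node, "node-local"

    if job.wait_started is None:
        job.wait_started = now          # first skip: start waiting
    waited = now - job.wait_started

    if waited >= node_timeout:          # first timeout passed: rack-local is acceptable
        for host in job.pending:
            if rack_of[host] == rack_of[node]:
                job.pending.remove(host)
                return host, "rack-local"
    if waited >= node_timeout + rack_timeout:
        return job.pending.pop(0), "off-rack"   # second timeout passed: run anywhere
    return None
```

Because most tasks are short, a slot on a node holding the right data usually frees up before the first timeout expires, so the job gives up very little fairness in exchange for locality.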