Showing posts with label research. Show all posts

Jun 12, 2016

Druid and Pinot low latency OLAP design ideas

0. Introduction

Imagine a data schema that can be described as (M, D1, .. , Dn, {(t,v)} ) where every entry has
  • M -  a measure (e.g. Site Visits)
  • Di - a few dimensions (e.g. Country, Traffic Source)
  • {(t,v)} - a time series with a measure value for each timestamp

Calling it a time series OLAP could be a misnomer. Time is just another dimension in a typical OLAP schema, while a time series usually implies just a metric without dimensions (e.g. server CPU usage as a function of time). Nevertheless, real-life systems dealing with such data are quite common, and they face similar challenges. Chief among them is the need for low-latency, interactive queries, so traditional Hadoop scalability is not enough. There are a couple of systems out there that are less well known than Spark or Flink but seem to be good at solving the time series OLAP puzzle.

1. Druid ideas 
  • For event streams, pre-aggregate small batches (at some minimum granularity e.g. 1 min)
  • Time is a dimension to be treated differently because all the queries use it
  • Partition on time into batches (+ versions), each to become a file
  • Each batch to have (IR-style) a dictionary of terms and a compressed bitmap for each term, so query filters are compiled into efficient bitwise AND/OR operations on large bitmaps
  • Column-oriented format for partitions
  • A separate service to decide which partitions are to be stored/cached by which historical data server
  • Metadata is stored as BLOBs in a relational DB
  • ZK for service discovery and segment mappings
  • Historical and real-time nodes, queried separately by brokers, which merge the partial results into the final result
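
The dictionary-plus-bitmap idea from the list above can be illustrated with a minimal sketch. It assumes plain Python integers as uncompressed bitmaps (the real systems compress them), and the rows, dimension names and helper functions are made up for illustration.

```python
from collections import defaultdict

def build_bitmap_index(rows, dimension):
    """Map each distinct value of `dimension` to an int used as a bitmap:
    bit i is set when row i holds that value."""
    bitmaps = defaultdict(int)
    for i, row in enumerate(rows):
        bitmaps[row[dimension]] |= 1 << i
    return dict(bitmaps)

def matching_rows(bitmap):
    """Decode a bitmap back into the list of row positions it covers."""
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]

# Hypothetical pre-aggregated rows of one time-partitioned batch.
rows = [
    {"country": "US", "source": "search", "visits": 10},
    {"country": "DE", "source": "search", "visits": 4},
    {"country": "US", "source": "email", "visits": 3},
    {"country": "FR", "source": "email", "visits": 7},
]
country = build_bitmap_index(rows, "country")
source = build_bitmap_index(rows, "source")

# Filter: (country = US OR country = DE) AND source = search
selected = (country["US"] | country["DE"]) & source["search"]
total_visits = sum(rows[i]["visits"] for i in matching_rows(selected))
```

The filter never touches the rows themselves until the final aggregation, which is the point of compiling it into bitmap operations.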

2. Pinot ideas
  • Kafka and Hadoop M/R based
  • Historical and real-time nodes, queried by brokers
  • Data segment: fixed-size blocks of records stored as a single file
  • Time is special. E.g. brokers know how to query realtime and historical nodes for the same query by sending queries with different time filters. Realtime is slower because it aggregates at a different granularity ("min/hour range instead of day")
  • ZK for service discovery
  • Realtime has segments in-memory, flushes to disk periodically, queries both.
  • Multiple segments (with the same schema) constitute a table
  • Column-oriented format for segments
  • A few indices for a segment (e.g. forward, single-value sorted)
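
The broker behaviour described above (one logical query sent to historical and realtime nodes with different time filters) might look roughly like the sketch below. The `boundary` timestamp, the function names and the result format are assumptions for illustration, not Pinot's actual API.

```python
def split_by_time_boundary(query_start, query_end, boundary):
    """Split the half-open range [query_start, query_end) into sub-ranges.

    `boundary` is a hypothetical timestamp up to which historical segments
    are assumed to be complete; everything later is served by realtime nodes.
    """
    plan = {}
    if query_start < boundary:
        plan["historical"] = (query_start, min(query_end, boundary))
    if query_end > boundary:
        plan["realtime"] = (max(query_start, boundary), query_end)
    return plan

def merge_counts(partials):
    """Combine per-node partial aggregates (dimension value -> sum)."""
    total = {}
    for partial in partials:
        for key, value in partial.items():
            total[key] = total.get(key, 0) + value
    return total

plan = split_by_time_boundary(0, 100, boundary=60)
merged = merge_counts([{"US": 1}, {"US": 2, "DE": 3}])
```

Because the two sub-ranges never overlap, the partial aggregates can simply be added together.
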
3. Common threads
  • Column oriented storage formats
  • Partition on time
  • Information Retrieval/Search Engine techniques for data storage (inverted index)
  • ZK is a popular choice of discovery service
4. In other news

There are also smaller-scale products using search engine technology for OLAP data storage.

May 27, 2011

Ideas from Hadoop/MapReduce alternatives

Hadoop as we know it today is synonymous with open-source MapReduce (the original MapReduce being an internal GOOG system). So currently its computational model requires chaining pairs of map and reduce jobs into more realistic workflows by means of Cascading or a comparable framework.

Dryad, which was announced after MapReduce, promotes an alternative model based on the execution of directed acyclic graphs of jobs. While MapReduce is hugely popular among practitioners, Dryad seems to be preferred by academics. There is a growing number of research frameworks based on a few ideas first popularized in this context by Dryad. What is even more interesting is that academics have recently started open-sourcing their work (consider for example Nephele and Hyracks).

Smart people noticed this alternative trend quite some time ago. It remains to be seen if any of the wannabe contenders ever reaches maturity. Most likely, if they fail to somehow merge into "Hadoop 2 (if not 3)", they will have only a very limited user base. But at least the design ideas will be useful one way or another. To this end, I would like to summarize a few of them that I found to be particularly relevant.

Programs as DAGs of computational jobs
  • Custom jobs can be added to the set of standardized reusable jobs
  • When data is partitioned, a job can be cloned and executed in parallel on all partitions
  • Jobs are connected with logical channels. Typically, there are at least local file- and socket-based implementations. 

Cloud-based v cluster-based
  • In traditional cluster systems, distance-based heuristics can be applied to minimize data traffic between racks. In the cloud, topology information is unavailable and the actual topology can change significantly.
  • In contrast to cluster environments, in the cloud it is possible to dynamically allocate and deallocate servers on demand. There is usually a limited number of available server types such as "small", "medium" and "large". Servers can be pooled by type and reused. The costs could be further minimized by keeping an idle server allocated until the end of the billing period that has already been charged for (typically, an hour).

Job scheduling
  • When a job has a preferred set of servers (e.g. because of data locality) it could be more efficient to wait a little longer for one of the servers to become available than to immediately schedule the job on the first idle server
  • Block (don't execute new tasks of) jobs that already are using more than a fair share of the servers
  • In a cluster, have a dedicated job queue for each server, rack and the entire cluster. Queue a job (e.g. on the basis of its data locality) to the queues of its preferred servers and racks. When a job is scheduled remove it from all the queues.
  • Have dedicated capacity for long-running jobs on each server
  • Have a root task to keep track of the job state machine and resubmit, for a limited number of times, tasks that failed

DAG execution stages
  • Split the entire DAG into stages in a way reminiscent of topological sort
  • Only one stage is executed at a time. 
  • It limits the number of required servers and simplifies scheduling because only a subset of tasks needs to run simultaneously.
  • When a stage is completed, all the provisional results are "materialized" as local files. This has a side benefit of having checkpoints for free.
  • It simplifies automatic choice of channel types. Only jobs from the same stage can be connected with sockets. Jobs from different stages must be connected with file-based channels.
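
As a rough sketch of the staging idea above: walk the DAG in topological order, start a later stage whenever an edge is marked as materialized, and then derive each channel type from the stages of its endpoints. The `materialized` flag and both function names are assumptions made for this example, not taken from any of the frameworks.

```python
from collections import defaultdict, deque

def assign_stages(edges):
    """edges: list of (producer, consumer, materialized) triples.

    A consumer behind a materialized edge lands in a later stage than its
    producer; otherwise it inherits at least the producer's stage.
    """
    successors = defaultdict(list)
    indegree = defaultdict(int)
    nodes = set()
    for producer, consumer, materialized in edges:
        successors[producer].append((consumer, materialized))
        indegree[consumer] += 1
        nodes.update((producer, consumer))

    stage = {node: 0 for node in nodes}
    ready = deque(sorted(node for node in nodes if indegree[node] == 0))
    while ready:  # Kahn-style topological traversal
        node = ready.popleft()
        for consumer, materialized in successors[node]:
            stage[consumer] = max(stage[consumer], stage[node] + int(materialized))
            indegree[consumer] -= 1
            if indegree[consumer] == 0:
                ready.append(consumer)
    return stage

def choose_channels(edges, stage):
    """Sockets only within a stage; file-based channels across stages."""
    return {
        (producer, consumer): "socket" if stage[producer] == stage[consumer] else "file"
        for producer, consumer, _ in edges
    }

edges = [("read", "map", False), ("map", "sort", True), ("sort", "write", False)]
stage = assign_stages(edges)
channels = choose_channels(edges, stage)
```

Note that an edge not marked as materialized still falls back to a file channel if its endpoints end up in different stages.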

Multistage job startup
  • Instantiate tasks on the chosen servers, create network endpoints, send their descriptions to the scheduler
  • Merge the descriptions and share the result with all the tasks
  • On each server, resolve task peers using the merged descriptor

May 13, 2011

Nephele and scheduling in the cloud

A summary of "Nephele: Efficient Parallel Data Processing in the Cloud".


Goal: a data processing framework with support for dynamic allocation and de-allocation of different computational resources in the cloud.

Compute resources available in a cloud environment are highly dynamic and possibly heterogeneous. In addition, the network topology is hidden so scheduling optimizations based on knowledge of the distance to a particular rack or server are impossible.

Topology

A job graph is a DAG of tasks connected with edges. Tasks process records implementing a common interface. A task may have an arbitrary number of input and output gates through which records enter and leave the task. A task can be seen as a set of parallel subtasks processing different partitions of the data. By default each subtask is assigned to a dedicated server.

A job graph is transformed into an execution graph by the job manager. The execution graph has two levels of detail:

  • the abstract level describes the job execution on a task level (without parallelization) and the scheduling of instance allocation/deallocation. A Group Vertex is created for every Job Graph vertex to control the set of subtasks. The edges between Group Vertices are ephemeral and do not represent any physical communication paths.
  • the concrete level defines the mapping of subtasks to servers and the communication channels between them. An Execution Vertex is created for each subtask. Each Execution Vertex is always controlled by its corresponding Group Vertex. Execution vertices are connected by channels.

Channel types

All edges of an Execution Graph are replaced by a channel before processing can begin. There are three channel types:
  • A network channel is based on a TCP socket connection. Two subtasks connected via a network channel can be executed on different instances. Since they must be executed at the same time, they are required to run in the same Execution Stage.
  • An in-memory channel uses the server memory to buffer data. The two connected subtasks must be scheduled to run on the same instance and in the same Execution Stage.
  • A file channel allows two subtasks to exchange records via the local file system. The two subtasks must be assigned to the same instance and the consuming group vertex must be scheduled to run in a later Execution Stage than the producing group vertex. Subtasks must exchange records across different stages via file channels because it is the only channel type that stores the intermediate records in a persistent manner.

Execution Stage Scheduling

The requested server types may be temporarily unavailable in the cloud but for cost-efficiency servers should be ideally allocated just before they can be used. The Execution Graph is split into one or more Execution Stages.

  • when the processing of a stage begins, all servers required within the stage are allocated. 
  • all subtasks included in this stage are sent to the corresponding Task Managers and are ready to receive records.
  • before the processing of a new stage, all intermediate results of its preceding stages are stored in a persistent manner. So the execution stage is similar to a checkpoint because a job can be interrupted and resumed later after a stage is completed.

The user can provide manual hints to change the default scheduling behavior:

  • into how many parallel subtasks should a task be split at runtime
  • how many subtasks can share the same server
  • which execution groups can share servers
  • channel type of each edge
  • server type required by a task (to characterize the hardware requirements)
Server type support

Server types are simple string identifiers such as "m1.small". The scheduler is given a list of available server types and their cost per time unit. Each task can be executed on its own server type. To support it, each subtask must be mapped to an Execution Instance. An Execution Instance has an ID and a server type representing the hardware characteristics.

Before beginning to process a new Execution Stage, the scheduler collects all Execution Instances from that stage and tries to replace them with matching cloud instances. If all required instances could be allocated, the subtasks are sent to the corresponding servers and set up for execution.

Nephele keeps track of server allocation time to minimize costs when usage is charged by the hour. An idle server of a particular type is not immediately deallocated if a server of the same type is required in an upcoming Execution Stage. It is kept allocated until the end of its current lease period. If the next Execution Stage begins before the end of that period, the server is reassigned to the Execution Vertex of that stage. Otherwise the server is deallocated in time not to cause any additional cost.
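
The lease-aware reuse rule can be sketched in a few lines. This is an illustration only: it assumes timestamps in seconds, a fixed hourly billing period, and hypothetical function names that are not part of Nephele.

```python
def current_lease_end(allocated_at, now, lease_seconds=3600):
    """End of the billing period that is already paid for at time `now`."""
    periods_started = (now - allocated_at) // lease_seconds + 1
    return allocated_at + periods_started * lease_seconds

def keep_idle_server(allocated_at, now, next_stage_start, lease_seconds=3600):
    """Keep an idle server only if the next stage needing its type starts
    before the current lease period runs out; otherwise release it."""
    return next_stage_start < current_lease_end(allocated_at, now, lease_seconds)

# A server allocated at t=0 goes idle at t=1000; its paid hour ends at t=3600.
reuse = keep_idle_server(allocated_at=0, now=1000, next_stage_start=3000)
release = not keep_idle_server(allocated_at=0, now=1000, next_stage_start=4000)
```

A real scheduler would also have to deallocate slightly before the lease end to leave time for the shutdown itself.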

Nephele is an open source project.

May 4, 2011

Hyracks and parallel dataflow scheduling in stages


Goal: a parallel platform to be a target for compiling Hive-style declarative data processing languages. Collections of data items are stored as local partitions distributed across the nodes of the cluster. A Hyracks job processes one or more collections of data to produce one or more output collections.

Job topology

  • A Hyracks job is a dataflow DAG composed of operators (vertices) and connectors (edges).
  • An individual operator (e.g. HashJoin) consists of one or more activities (e.g. JoinBuild to build a hashtable from one input and JoinProbe to probe it with the other input).
  • Each activity is executed as a set of cloned tasks operating on different partitions of the data flowing through the activity. Each task consumes a partition of the activity's inputs and produces a partition of its output.
  • An operator descriptor knows about output record format and operator activities. 
  • The tasks corresponding to activities of the same partition of the same operator may need to share state (e.g. JoinBuild and JoinProbe share a hashtable) and so they are co-located by Hyracks. Even though the previous task is no longer active when the next one is started, a shared context is provided to exchange the required information between the two tasks.
Standard building blocks

Available operators include file readers/writers, sorters (in-memory and external), joiners (hash-based, GRACE, hybrid hash) and aggregators (e.g. hash-based).

Connectors distribute data produced by a set of sender operators to a set of receiver operators. Available connectors include:

  • M:N Hash-Partitioner: hashes every tuple to generate the receiver number. Tuples keep their initial order on the receiver side.
  • M:N Hash-Partitioning Merger: hashes each tuple to find the receiver. On the receiver side merges streams coming from different senders based on a given comparator.
  • M:N Range-Partitioner: associates one receiver with each range of partitioning field in a set of disjoint ranges
  • M:N Replicator: copies the data produced by every sender to every receiver.
  • 1:1 Connector: connects exactly one sender to one receiver.
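
To make the difference between the first two connectors concrete, here is a small in-memory sketch. It is not Hyracks code: the record layout `(user_id, timestamp)`, the function names and the use of Python's built-in `hash` are assumptions for illustration.

```python
import heapq

def hash_partition(sender_outputs, num_receivers, key):
    """M:N hash partitioner: each tuple goes to receiver hash(key) % N.
    Tuples of one sender keep their relative order at the receiver."""
    receivers = [[] for _ in range(num_receivers)]
    for output in sender_outputs:
        for record in output:
            receivers[hash(key(record)) % num_receivers].append(record)
    return receivers

def hash_partition_merge(sender_outputs, num_receivers, key, sort_key):
    """M:N hash-partitioning merger: same routing, but every receiver merges
    the (already sorted) per-sender streams using the comparator `sort_key`."""
    streams = [[[] for _ in sender_outputs] for _ in range(num_receivers)]
    for sender, output in enumerate(sender_outputs):
        for record in output:
            streams[hash(key(record)) % num_receivers][sender].append(record)
    return [list(heapq.merge(*per_sender, key=sort_key)) for per_sender in streams]

# Two senders, each already sorted by timestamp (second field).
senders = [
    [(1, 10), (2, 20), (1, 40)],
    [(2, 15), (1, 30), (2, 50)],
]
plain = hash_partition(senders, 2, key=lambda r: r[0])
merged = hash_partition_merge(senders, 2, key=lambda r: r[0], sort_key=lambda r: r[1])
```

With the plain partitioner each receiver sees the senders' tuples interleaved in arrival order; with the merger each receiver gets a single stream ordered by timestamp.
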
Job execution

  • Typically, connections between activities of one operator are blocking (e.g. between JoinBuild and JoinProbe) while connections between operators are not (e.g. between input file reading operator and JoinBuild activity). 
  • Activities that are transitively connected to other activities in a job only through dataflow edges are said to form a stage. So a stage is a set of activities that can be co-scheduled. 
  • Stages are independently scheduled and executed in the order in which they become ready. A stage is ready to execute when all of its dependencies have successfully completed execution. Each stage is expanded into tasks just prior to its execution.
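
One way to picture how stages fall out of the activity graph is to treat them as connected components over the non-blocking edges, with blocking edges turning into dependencies between stages. The sketch below does that with a small union-find; the activity names and the `(source, target, blocking)` edge format are assumptions for illustration.

```python
def find_stages(activities, edges):
    """edges: (source, target, blocking) triples between activities.
    Returns (stages, deps), where deps[i] holds the indices of the stages
    that must complete before stage i can run."""
    parent = {activity: activity for activity in activities}

    def find(activity):
        while parent[activity] != activity:
            parent[activity] = parent[parent[activity]]  # path halving
            activity = parent[activity]
        return activity

    for source, target, blocking in edges:
        if not blocking:
            parent[find(source)] = find(target)

    groups = {}
    for activity in activities:
        groups.setdefault(find(activity), []).append(activity)
    stages = list(groups.values())

    index = {a: i for i, stage in enumerate(stages) for a in stage}
    deps = {i: set() for i in range(len(stages))}
    for source, target, blocking in edges:
        if blocking:
            deps[index[target]].add(index[source])
    return stages, deps

activities = ["ScanA", "JoinBuild", "ScanB", "JoinProbe", "Write"]
edges = [
    ("ScanA", "JoinBuild", False),
    ("JoinBuild", "JoinProbe", True),   # blocking: probe waits for the build
    ("ScanB", "JoinProbe", False),
    ("JoinProbe", "Write", False),
]
stages, deps = find_stages(activities, edges)
```

In this example the build side forms the first stage and the probe side, which depends on it, forms the second.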

Currently, there are two scheduling choices:

  • configure the exact number of partitions of an operator to be created and the server for each partition
  • configure just the number of partitions to allow automatic placement of the partitions
  • automatic partitioning and placement based on the estimated resource requirements and the current availability of resources is being developed
A Hyracks cluster is comprised of the cluster controller and node controllers. When a stage is ready to run, the cluster controller starts the stage tasks on a set of node controllers. It then waits until the stage completes or a node controller failure is detected.

A task is started in three steps:

1) Activation

  • In response to a request from the CC each NC creates its designated tasks.
  • For each task accepting input from other tasks a network endpoint is created. The mapping of tasks on endpoints is sent back to the CC
2) Pairing

  • The CC merges all the responses to create a job-wide address map.
  • The map is sent to all the NCs so that each sender knows its receiver addresses
3) Connecting

  • Once pairing is completed the CC makes the NCs start their tasks
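
The three steps can be mimicked in a single process to show what information flows where. This is only a sketch: the port numbers, task names and function names are invented, and for simplicity every task gets an endpoint, not just the ones accepting input.

```python
def activate(node_tasks, base_port=9000):
    """Step 1: every NC creates its tasks and reports task -> endpoint."""
    responses = {}
    for node, tasks in node_tasks.items():
        responses[node] = {task: (node, base_port + i) for i, task in enumerate(tasks)}
    return responses

def pair(responses):
    """Step 2: the CC merges the per-node responses into one address map."""
    address_map = {}
    for endpoints in responses.values():
        address_map.update(endpoints)
    return address_map

def connect(address_map, links):
    """Step 3: each sender resolves the addresses of its receivers."""
    return {
        sender: [address_map[receiver] for receiver in receivers]
        for sender, receivers in links.items()
    }

responses = activate({"nc1": ["scan-0", "join-0"], "nc2": ["scan-1", "join-1"]})
address_map = pair(responses)
routes = connect(address_map, {"scan-0": ["join-0", "join-1"]})
```

The merged map is what lets a sender on one node reach receivers on any other node without asking the CC again.
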
Data stream management

  • A dataflow is a stream of records with an arbitrary number of fields. A fixed-size chunk of contiguous bytes representing a sequence of serialized records is called a frame. 
  • A frame is the unit of data transfer between tasks. To avoid excessive garbage churn Hyracks provides interfaces for comparing and hashing fields that can be implemented to work off of the binary data in a frame.
  • A task is implemented as a push-based iterator that receives a frame at a time from its inputs and pushes result frames to its consumers. Any repartitioning is achieved by using a Connector.
  • There are as many send-side instances of a connector as tasks in the producing activity and as many receive-side instances as tasks in the consuming activity.
  • When a send-side instance of a connector receives a frame from the producer, it applies its redistribution logic (e.g. hash-partitioning on a field) to move records to the relevant receive-side connector instances. Each record is copied to the target frame meant for the appropriate receive-side instance. When a target frame is full, the send-side instance sends the frame to the receive-side instance.
  • The buffering strategy for received frames is configurable and can use one network buffer either for all senders (e.g. M:N hash-partitioning) or for each sender (e.g. M:N sort-merge, which expects pre-sorted frames).
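
The send-side behaviour described above (copy each record into the target frame of its receiver, ship the frame when it is full) can be sketched as follows. The class is hypothetical, the "network" is just a list per receiver, and unlike real fixed-size frames the last frame of a stream may be shorter.

```python
import struct

class SendSideConnector:
    """Buffers serialized records into one target frame per receiver."""

    def __init__(self, num_receivers, frame_size, partition):
        self.frame_size = frame_size      # frame capacity in bytes
        self.partition = partition        # serialized record -> receiver index
        self.frames = [bytearray() for _ in range(num_receivers)]
        self.sent = [[] for _ in range(num_receivers)]  # stands in for the network

    def _flush(self, receiver):
        if self.frames[receiver]:
            self.sent[receiver].append(bytes(self.frames[receiver]))
            self.frames[receiver] = bytearray()

    def push(self, record):
        if len(record) > self.frame_size:
            raise ValueError("record does not fit into a frame")
        receiver = self.partition(record)
        if len(self.frames[receiver]) + len(record) > self.frame_size:
            self._flush(receiver)
        self.frames[receiver].extend(record)
        if len(self.frames[receiver]) == self.frame_size:
            self._flush(receiver)

    def close(self):
        for receiver in range(len(self.frames)):
            self._flush(receiver)

def pack(n):
    return struct.pack(">i", n)

# Partition directly on the binary form: decode only the key field.
connector = SendSideConnector(
    num_receivers=2, frame_size=8,
    partition=lambda rec: struct.unpack(">i", rec)[0] % 2,
)
for n in range(5):
    connector.push(pack(n))
connector.close()
```

The partition function works on the serialized bytes, in the spirit of comparing and hashing fields without materializing objects.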

For backward compatibility, Hyracks can execute Hadoop jobs.

Apr 28, 2011

Mesos and inspiration for next generation Hadoop

The notion of Next Generation Hadoop (NGH) is somewhat blurred at this point. Recent announcements by YHOO and Facebook could be construed as at least two independent branches referred to as NGH. So even now that YHOO is merging all its development into the main Apache codeline, I am not sure how many NGHs are being developed right now and how convergent that process is.

The only technical description of the NGH I am aware of was written by YHOO engineers. Even though that post does not mention Mesos, I am pretty sure it is not coincidental that the NGH shares so much with it. It is also noteworthy that Mesos itself is now an Apache project.

A summary of "Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center":

Goal: when running multiple frameworks in the same cluster
  • improve utilization through statistical multiplexing
  • share datasets that are too expensive to replicate
Two-stage scheduling with resource offers
  • Mesos decides how many resources to offer each framework
  • Frameworks decide which resources to accept and which computations to run on them
Architecture
  • A fault-tolerant Zookeeper-based master process manages slave daemons running on each cluster node
  • Slaves report to the master which vacant resources they have (e.g. {2CPUs,16GB})
  • A framework scheduler registers with the master
  • A framework scheduler is offered resources, decides which ones to use and describes the tasks to launch on those resources
  • A framework executor is launched on slave nodes to execute tasks
  • Each resource offer is a list of free resources on multiple slaves
  • A pluggable master strategy decides how many resources to offer to each framework
  • Supports long tasks by allowing a set of resources on each slave to be designated for use by long tasks
  • Linux containers are used to isolate frameworks
Dominant Resource Fairness
  • Equalize each framework's fractional share of its dominant resource (i.e. the resource that it has the largest fractional share of)
  • Example: make F1's share of CPU equal to F2's share of RAM if F1 is CPU-bound and F2 needs mostly memory
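
A small simulation may help to see how dominant shares get equalized. The sketch below repeatedly gives one more task to the framework with the lowest dominant share; the cluster size and per-task demands are example numbers, and trying the next framework when the lowest one no longer fits is a simplification of mine.

```python
from fractions import Fraction

def drf_allocate(capacity, demands):
    """capacity: total amount of each resource, e.g. (cpus, gb).
    demands: framework -> per-task demand in the same order.
    Returns framework -> number of tasks launched."""
    used = [0] * len(capacity)
    tasks = {name: 0 for name in demands}

    def dominant_share(name):
        return max(
            Fraction(tasks[name] * d, c) for d, c in zip(demands[name], capacity)
        )

    while True:
        # Offer to the framework with the lowest dominant share first.
        for name in sorted(demands, key=dominant_share):
            demand = demands[name]
            if all(u + d <= c for u, d, c in zip(used, demand, capacity)):
                used = [u + d for u, d in zip(used, demand)]
                tasks[name] += 1
                break
        else:
            return tasks  # nothing fits any more

# 9 CPUs and 18 GB; F1 tasks are memory-heavy, F2 tasks are CPU-heavy.
allocation = drf_allocate((9, 18), {"F1": (1, 4), "F2": (3, 1)})
```

Here F1 ends up with 12 of 18 GB and F2 with 6 of 9 CPUs, so both hold two thirds of their dominant resource.
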
Optimizations
  • Filters registered by a framework with the master to short-circuit the rejection process (e.g. accept offers only from nodes on a given list, or only from nodes with at least a given amount of free resources)
  • For the purpose of allocation, count offered resources as used by the framework to encourage the framework to respond quickly 
  • Rescind an offer if the framework does not respond for too long

Apr 27, 2011

Quincy and scheduling in Dryad

A summary of "Quincy: Fair Scheduling for Distributed Computing Clusters"

Each job is managed by a root task that contains a state machine managing the workflow of that job. Actual work is done by worker tasks which may be executed multiple times and will always generate the same result.

There is a single centralized scheduling service responsible for a queue of jobs for the cluster. A root task sends a list of ready workers and their input data summaries to the scheduler. The scheduler chooses computers for tasks and has the root task start them. The root task is responsible for back-tracking through the dependency graph and resubmitting in case of a failure. If the scheduler decides to kill a worker task before it completes, it will notify the root task.

A worker is not submitted to the scheduler until all of its input files have been written to the cluster. When a worker is ready its root task computes, for each computer, the amount of data that the worker would read across the network. The root then creates for it a list of preferred computers and a list of preferred racks.

Fairness: a job which takes T seconds when executed alone should take no more than N*T seconds when there are N concurrent jobs. There is a hard limit on the total number of jobs in the cluster and when it's reached new jobs are queued (and started later in order of submission time). Each computer runs only one task at a time and each job is allowed to run on a certain number of servers.

Old queue-based scheduling

Architecture
  • One queue for each server, one queue for each rack, one queue for the entire cluster
  • A new job is added to the queues of its preferred servers, preferred racks and the cluster-wide queue
  • When the job is scheduled it's removed from all the queues
  • When a job is started its root task is executed on a server not running another root task; if there's a worker task it is killed and resubmitted
  • Basic algorithm: when a server becomes idle, assign it a task from its server queue, its rack queue or the cluster queue.
  • Greedy fairness: block jobs that have more tasks running than min([cluster size/number of jobs], number of workers). When a job is blocked its waiting tasks will not be scheduled.
  • Fairness with preemption: starting with the most recently scheduled, kill tasks of jobs that have more than their quota of tasks.
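
The queue layout and the basic algorithm from the list above can be sketched as a small class. It is an illustration only: it leaves out root tasks, fairness and preemption, and the class and method names are made up.

```python
class QueueScheduler:
    """One queue per server, one per rack and one for the whole cluster."""

    def __init__(self, rack_of):
        self.rack_of = rack_of  # server -> rack
        self.server_queues = {server: [] for server in rack_of}
        self.rack_queues = {rack: [] for rack in set(rack_of.values())}
        self.cluster_queue = []

    def submit(self, task, preferred_servers=(), preferred_racks=()):
        for server in preferred_servers:
            self.server_queues[server].append(task)
        for rack in preferred_racks:
            self.rack_queues[rack].append(task)
        self.cluster_queue.append(task)

    def _remove_everywhere(self, task):
        queues = [*self.server_queues.values(), *self.rack_queues.values(),
                  self.cluster_queue]
        for queue in queues:
            if task in queue:
                queue.remove(task)

    def on_idle(self, server):
        """Pick work for an idle server: its own queue first, then its
        rack's queue, then the cluster-wide queue."""
        candidates = (self.server_queues[server],
                      self.rack_queues[self.rack_of[server]],
                      self.cluster_queue)
        for queue in candidates:
            if queue:
                task = queue[0]
                self._remove_everywhere(task)
                return task
        return None

scheduler = QueueScheduler({"s1": "r1", "s2": "r1", "s3": "r2"})
scheduler.submit("t1", preferred_servers=["s1"], preferred_racks=["r1"])
scheduler.submit("t2", preferred_servers=["s3"], preferred_racks=["r2"])
first = scheduler.on_idle("s2")   # no server-local work, takes rack-local t1
second = scheduler.on_idle("s1")  # t1 is gone, falls back to the cluster queue
third = scheduler.on_idle("s3")   # nothing left
```

The second call shows the weakness of the greedy approach: t2 runs far from its data simply because s1 became idle first.
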
New flow-based scheduling

Graph topology
  • Represent instantaneous scheduling as a min-cost flow network problem
  • Each task has one unit of flow as its supply
  • There are nodes in the graph for each root and worker task, an unscheduled node for each job, a node for each server, a rack aggregator node for each rack, and a cluster aggregator node. 
  • There is the sink node through which all flows drain from the graph. Only unscheduled and server nodes are connected to it
  • Each root task has a single edge to the server where it is running. 
  • Each worker task has an edge to its job's unscheduled node, to the cluster-aggregator node, and to every rack and server in its preferred lists.
  • Workers that are executing have an edge to the server on which they are running
Scheduling behavior and parameters
  • Computer/rack/cluster edge cost is a function of the amount of data that would be transferred across rack and core switches
  • Unscheduled edge represents the penalty for leaving a task unscheduled (increases over time)
  • When a task is started, an additional cost (increasing with time) is added to its edges to nodes other than the server it is running on.
  • Tradeoffs are controlled with only three parameters: the cost of waiting in the queue, the cost of transferring data across the core switch, the cost of transferring data across a rack switch
  • The scheduler updates the graph when a job-related event occurs and on a regular timer event (some costs are time-dependent)
  • When the graph is changed the scheduler computes a new min-cost flow and then starts or kills tasks as necessary
Restrictions:
  • Multidimensional capacities (e.g. CPU and memory) cannot be easily represented and so Mesos-style fine-grained resource allocation is infeasible
  • Correlated constraints (e.g. run two tasks in the same rack) are equally challenging

Apr 24, 2011

Starfish and Hadoop self-tuning

A summary of "Starfish: A Self-tuning System for Big Data Analytics".

Most Hadoop features must be managed manually with multiple obscure parameters. As an example, Hadoop supports dynamic cluster membership changes but it has no support for deciding when to add/remove nodes or when to rebalance the data layout. Instead of aiming for peak performance, the Starfish project wants to provide good Hadoop performance automatically.

Three levels of Hadoop workload optimization:
  • Individual MR jobs
  • MR jobs assembled into a workflow (e.g. generated from HiveQL or by a Cascading-style framework)
  • Collections of workflows
Job-level tuning
  • JIT optimizer (instead of manual configuration of 190 parameters) based on 
  • Profiler (dynamic instrumentation to learn performance models) and 
  • Sampler (statistics about input/intermediate/output key-value spaces of a job)
The Profiler creates a job profile for different phases of an MR job
  • Timings view: where wall-clock time is spent in each phase
  • Data flow view: how much data is processed in each phase
  • Resource-level view: how much of each resource, such as CPU and memory, is used in each phase
Workflow-level tuning
  • Workflow-aware Scheduler ("global optimization")
  • WhatIf engine (uses performance models and a job profile to estimate a new profile for different configuration parameters)
  • Data Manager (rebalance HDFS data blocks using different block placement policies)
Workload-level tuning
  • Workload Optimizer to generate an equivalent, but optimized, collection of workflows using 
  • Data-flow sharing (reusing the same job on behalf of different workflows)
  • Materialization (caching intermediate data for later reuse, probably by other workflows; also helps avoid cascading reexecution)
  • Reorganization (automatically chosen alternative means of keeping intermediate data such as key-value and column stores)
Starfish is built on top of Hadoop. Its input is expressed in a new language called Lastword. The language is not supposed to be used directly by humans. Instead there are translators from HiveQL-style languages to submit a collection of MR workflows. Those workflows can be DAGs of MR jobs, select-project-join-aggregate logical specifications or user-defined functions. Workflows can be annotated with metadata such as scheduling hints, data statistics and data layouts.

Starfish is an open-source project.

Apr 23, 2011

Delay Scheduling and Hadoop Fair Scheduler

I am blessed with a job which is in a domain sophisticated enough to encourage reading research papers. I am planning to post my notes on the most interesting ones at least as a means of keeping key notes readily available.

So without further ado, here goes a summary of "Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling":

The key trade-off is fairness (in allocation of resources) v data locality (executing a job on a node that already has input data for the job). The scheduler goal is sharing a cluster between multiple users with a mix of long batch jobs and short interactive queries over a common data set. Fair scheduling requires resource reallocation when the number of jobs changes.

The original Hadoop FIFO scheduler:

  • assign tasks in response to heartbeats sent by slaves which report the number of free map and reduce slots on the slave
  • scan through jobs in order of priority and submit time to find one with a task of the required type
  • for maps, after selecting a job greedily pick the map task in the job with data closest to the slave
Locality problems with naive fair sharing (assign free slots to the job that has the fewest running tasks):

  • Head-of-line scheduling: small jobs are likely to have their tasks sent to random nodes 
  • Sticky Slots: a tendency for a job to be assigned the same slot repeatedly (a task completes, its job has fewer tasks than the others, the slot is reassigned to the job again)
The Hadoop Fair Scheduler: 

  • divide resources using max-min fair sharing to achieve statistical multiplexing
  • place computations near their input data to maximize system throughput
A two-level scheduling hierarchy: 

  • allocate task slots across pools using weighted fair sharing
  • let each pool allocate its slots using either FIFO with priorities or a second level of fair sharing (each pool can be given a minimum share guaranteed to be given as long as the pool contains jobs)
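
Weighted max-min fair sharing across pools can be computed with a simple water-filling loop, sketched below. It is a simplification: it returns fractional slot counts and ignores the guaranteed minimum shares mentioned above, and the pool names and numbers are made up.

```python
def weighted_fair_shares(total_slots, demands, weights):
    """demands: pool -> number of slots it could use; weights: pool -> weight.
    Pools that need less than their weighted share get exactly their demand;
    what they leave over is redistributed among the remaining pools."""
    shares = {}
    remaining = float(total_slots)
    active = dict(demands)
    while active:
        total_weight = sum(weights[pool] for pool in active)
        satisfied = {
            pool for pool, demand in active.items()
            if demand <= remaining * weights[pool] / total_weight
        }
        if not satisfied:
            # Everyone left wants more than a fair share: split by weight.
            for pool in active:
                shares[pool] = remaining * weights[pool] / total_weight
            break
        for pool in satisfied:
            shares[pool] = float(active.pop(pool))
            remaining -= shares[pool]
    return shares

shares = weighted_fair_shares(
    100,
    demands={"adhoc": 10, "etl": 60, "reports": 80},
    weights={"adhoc": 1, "etl": 1, "reports": 1},
)
```

The small pool gets everything it asks for, and the slots it does not need are split evenly between the two larger pools.
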
Delay Scheduling

  • each job has two timeouts
  • a job with no tasks local to a node is not scheduled for the duration of the first timeout. 
  • the job is not scheduled for the duration of the second timeout if it does not have tasks local to the rack of the node.
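
The two timeouts translate into a locality level that relaxes the longer a job has been skipped. The sketch below assumes the second timeout starts counting once the first has expired; the task representation and function names are hypothetical.

```python
def allowed_locality(waited, node_timeout, rack_timeout):
    """Locality level a job may launch at after being skipped for `waited`."""
    if waited < node_timeout:
        return "node"
    if waited < node_timeout + rack_timeout:
        return "rack"
    return "any"

def pick_task(job, node, rack, waited, node_timeout, rack_timeout):
    """job: list of tasks, each a dict with the sets 'nodes' and 'racks'
    that hold its input data. Returns a task to launch on `node` or None
    if the job should be skipped for now."""
    level = allowed_locality(waited, node_timeout, rack_timeout)
    for task in job:
        if node in task["nodes"]:
            return task  # node-local tasks are always fine
    if level in ("rack", "any"):
        for task in job:
            if rack in task["racks"]:
                return task
    if level == "any" and job:
        return job[0]
    return None

job = [
    {"id": "t1", "nodes": {"n1"}, "racks": {"r1"}},
    {"id": "t2", "nodes": {"n2"}, "racks": {"r2"}},
]
local = pick_task(job, "n2", "r2", waited=0, node_timeout=5, rack_timeout=5)
skipped = pick_task(job, "n3", "r1", waited=0, node_timeout=5, rack_timeout=5)
rack_local = pick_task(job, "n3", "r1", waited=6, node_timeout=5, rack_timeout=5)
anywhere = pick_task(job, "n9", "r9", waited=10, node_timeout=5, rack_timeout=5)
```

A scheduler would reset the job's wait time whenever it launches a task at the desired locality level.
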
Key enablers: 

  • most tasks are short compared to jobs
  • multiple locations in which a task can run to read a given data block (including multiple task slots per node)