Dec 11, 2017

Spark Data Source API V2 - write query planning and execution

Before you can read anything, you need to have it written in the first place. The read path considered previously is a little simpler, but it certainly shares a lot with the write path we'll delve into today. Initially I am going to concentrate on the basic flow and ignore predicate push-down and column pruning. As my running example I assume a trivial Spark expression that uses some existing Dataset, such as "df.write.format("someDSV2").option("path", hdfsDir).save()".

A data source is represented in a logical plan by a WriteToDataSourceV2 instance created by the DataFrameWriter. During planning, DataSourceV2Strategy transforms that node into a WriteToDataSourceV2Exec instance. The latter uses the Spark context to execute the plan associated with the Dataset. The resultant RDD determines the number of input partitions and so the number of required write tasks. In addition, WriteToDataSourceV2Exec keeps track of the WriterCommitMessages used to make writing data source partitions transactional.

Query planning


In this phase the goal is to create a WriteToDataSourceV2 from a Dataset and client-supplied options and then transform it into a WriteToDataSourceV2Exec. The process is initiated by saving the Dataset, which triggers both the logical and physical planning stages.


  • DataFrameWriter searches for an implementation of the DataSourceV2 marker interface. The implementation is expected to also implement the WriteSupport mixin interface, which is the actual entry point into the writing functionality
  • WriteSupport is a factory of DataSourceV2Writer instances
  • DataSourceV2Writer can create a DataWriterFactory and is ultimately responsible for committing or rolling back the partition write tasks (see the sketch after this list)
  • WriteToDataSourceV2 is the logical plan node that represents a data source
  • A QueryExecution is associated with the WriteToDataSourceV2 to coordinate query planning
  • SparkPlanner uses DataSourceV2Strategy to transform logical plan nodes into physical ones
  • DataSourceV2Strategy recognizes data source logical nodes and transforms them into physical ones
  • WriteToDataSourceV2 carries a DataSourceV2Writer and the logical plan associated with the Dataset; both are given to WriteToDataSourceV2Exec
  • WriteToDataSourceV2Exec is the physical plan node that represents a data source
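
To make the contract more tangible, here is a minimal sketch of the writer-side entry points a custom data source might implement. It uses the interface names listed above, but the method signatures and import locations are simplified and may not match the evolving API exactly; SomeDataSourceV2 and SomeWriter are hypothetical names, and SomeWriterFactory is sketched in the Executor-side section below.

  // A minimal sketch, assuming simplified signatures; not the exact API
  import java.util.Optional
  import org.apache.spark.sql.{Row, SaveMode}
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, WriteSupport}
  import org.apache.spark.sql.sources.v2.writer._

  class SomeDataSourceV2 extends DataSourceV2 with WriteSupport {
    // WriteSupport: the factory of DataSourceV2Writer instances
    override def createWriter(jobId: String, schema: StructType, mode: SaveMode,
                              options: DataSourceV2Options): Optional[DataSourceV2Writer] =
      Optional.of(new SomeWriter(options.get("path").get, schema))
  }

  class SomeWriter(path: String, schema: StructType) extends DataSourceV2Writer {
    // Called on the driver; the factory is later serialized to the executors
    override def createWriterFactory(): DataWriterFactory[Row] = new SomeWriterFactory(path, schema)
    // Called once every write task has reported success
    override def commit(messages: Array[WriterCommitMessage]): Unit = { /* publish the written partitions */ }
    // Called when the job fails; undo whatever the tasks managed to write
    override def abort(messages: Array[WriterCommitMessage]): Unit = { /* clean up partial output */ }
  }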

Query execution - Driver side


In this phase the goal is to obtain data partitions from the RDD associated with the Dataset and trigger execution of a write task for each partition. In this section we look at the Driver-side half of the process.


  • WriteToDataSourceV2Exec creates a DataWriterFactory and a write task for each partition
  • The Spark context executes the tasks on worker nodes and collects WriterCommitMessages from the ones that finish successfully
  • WriteToDataSourceV2Exec uses the DataSourceV2Writer to commit or roll back the write transaction based on the collected WriterCommitMessages (sketched below)
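
Conceptually, the driver-side coordination boils down to the outline below. This is not the actual WriteToDataSourceV2Exec code, just a simplified sketch of its control flow in which writer is the DataSourceV2Writer, query is the physical plan of the Dataset, and DataWritingSparkTask.run stands for the per-partition write task.

  // Simplified outline of the driver-side control flow (not the real source)
  val writerFactory = writer.createWriterFactory()
  val rdd = query.execute()                        // RDD produced by the child plan
  val messages = new Array[WriterCommitMessage](rdd.partitions.length)
  try {
    sparkContext.runJob(
      rdd,
      (context: TaskContext, rows: Iterator[InternalRow]) =>
        DataWritingSparkTask.run(writerFactory, context, rows),   // one write task per partition
      rdd.partitions.indices,
      (index: Int, message: WriterCommitMessage) => messages(index) = message)
    writer.commit(messages)                        // all tasks succeeded
  } catch {
    case cause: Throwable =>
      writer.abort(messages)                       // roll back the partial write
      throw cause
  }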

Query execution - Executor side


The second half of the execution phase takes place on the Executor side. 


  • The DataWriterFactory is serialized to worker nodes where DataWritingSparkTasks are executed
  • The Executor provides each DataWritingSparkTask with a Row iterator and a partition index
  • The task acquires a DataWriter from the DataWriterFactory and writes all partition Rows with the DataWriter
  • The task calls the DataWriter to commit the write and propagates a WriterCommitMessage to the driver (see the sketch below)
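
On the Executor side a custom data source mainly has to supply the DataWriterFactory and DataWriter shown in the sketch below. It writes Rows to hypothetical per-partition files; the names are made up and the signatures simplified, so treat it as an illustration of the shape of the interfaces rather than a reference implementation.

  // A minimal Executor-side sketch, assuming simplified signatures
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.StructType
  import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

  class SomeWriterFactory(path: String, schema: StructType) extends DataWriterFactory[Row] {
    // Called on the Executor for every partition (and every task attempt)
    override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
      new SomeDataWriter(s"$path/part-$partitionId-$attemptNumber")
  }

  // Carries whatever the driver needs to finalize the write, e.g. the file name
  case class FileCommitted(file: String) extends WriterCommitMessage

  class SomeDataWriter(file: String) extends DataWriter[Row] {
    private val out = new java.io.PrintWriter(file)
    // Invoked once for every Row of the partition
    override def write(row: Row): Unit = out.println(row.mkString(","))
    // Report success; the message travels back to the driver
    override def commit(): WriterCommitMessage = { out.close(); FileCommitted(file) }
    // The attempt failed; remove the partial file
    override def abort(): Unit = { out.close(); new java.io.File(file).delete() }
  }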

Dec 7, 2017

Spark Data Source API V2 - read query planning and execution

Without much marketing noise the Spark team decided to dramatically overhaul the current data connector API in v2.3. The new version was quietly merged into master recently. In comparison with the original approach, the new one definitely looks like a real API. Even though nobody will be surprised by further evolution of the version one can see in master, it already looks irresistibly promising. So I decided to spend some time digging into it.

You probably remember the high-level architecture of the Spark query compilation process. It follows the traditional DBMS approach of parsing a query into a logical plan, optimizing the logical plan and transforming it into a physical plan, and executing the physical plan on worker nodes. I am going to structure my investigation along similar lines.

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

The following discussion does not shy away from relevant Spark query execution internals. Those are always fascinating even when not directly actionable. Moreover, the Data Source API implementation is a nice vertical slice of the generic Spark infrastructure. So this is a convenient opportunity to refresh my memory. I will omit fine details of the Spark internals in an attempt to highlight the big picture.

As my running example I assume a trivial Spark expression such as "SparkSession.read.format("someDSV2").load.count()".

Query plans and Data Sources in 100 words or less


Logical and physical plan nodes are subtypes of QueryPlan. The SparkPlanner is responsible for transforming a logical plan tree into a physical one. The planner relies on a configurable set of strategies for the actual transformation rules.

A data source is represented in a logical plan by a DataSourceV2Relation instance created by the DataFrameReader. During planning, DataSourceV2Strategy transforms the relation into a DataSourceV2ScanExec instance. The latter creates a DataSourceRDD instance that is used to actually compute the results in parallel from multiple partitions.

Logical query planning


In this phase the goal is to create a DataSourceV2Relation from client-supplied options. The process is initiated by loading a Dataset.

  • DataFrameReader searches for an implementation of the DataSourceV2 marker interface. The implementation is expected to also implement one of the ReadSupport mixin interfaces, which let the engine know the optional capabilities of the data source
  • ReadSupport is a factory of DataSourceV2Reader instances
  • DataSourceV2Reader knows the data source schema and can create ReadTasks (see the sketch after this list)
  • DataSourceV2Relation is the logical plan node that represents a data source
  • Dataset owns a QueryExecution and is associated with the DataSourceV2Relation
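
A minimal reader-side counterpart might look like the sketch below. Again, the method signatures and import locations are simplified and may not match the evolving API exactly; SomeDataSourceV2 and SomeReader are hypothetical names, and SomeReadTask is sketched in the execution section at the end.

  // A minimal sketch of the reader-side entry points, assuming simplified signatures
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{StringType, StructField, StructType}
  import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport}
  import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, ReadTask}

  class SomeDataSourceV2 extends DataSourceV2 with ReadSupport {
    // ReadSupport: the factory of DataSourceV2Reader instances
    override def createReader(options: DataSourceV2Options): DataSourceV2Reader =
      new SomeReader(options.get("path").get)
  }

  class SomeReader(path: String) extends DataSourceV2Reader {
    // The engine needs to know the schema before any data is read
    override def readSchema(): StructType = StructType(Seq(StructField("value", StringType)))
    // One ReadTask per data partition
    override def createReadTasks(): java.util.List[ReadTask[Row]] =
      java.util.Arrays.asList(new SomeReadTask(s"$path/part-0"), new SomeReadTask(s"$path/part-1"))
  }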

Physical query planning


In this phase the goal is to transform a DataSourceV2Relation into a DataSourceV2ScanExec. The process is initiated by executing an action on the Dataset created by the previous phase.



  • Dataset owns a QueryExecution
  • QueryExecution coordinates query planning
  • SparkPlanner uses DataSourceV2Strategy to transform logical plan nodes into physical ones
  • DataSourceV2Strategy recognizes data source logical nodes and transforms them into physical ones (roughly sketched below)
  • DataSourceV2Relation carries a DataSourceV2Reader that is given to DataSourceV2ScanExec
  • DataSourceV2ScanExec is the physical plan node that represents a data source
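
Stripped of details, the strategy essentially pattern-matches the data source logical node and emits the corresponding physical node. The following is a simplified sketch of what it boils down to, not the actual Spark source:

  // Roughly what DataSourceV2Strategy does for a read (simplified)
  object DataSourceV2Strategy extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case relation: DataSourceV2Relation =>
        // reuse the reader created during logical planning
        DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
      case _ => Nil
    }
  }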

Query execution


In this phase the goal is to obtain data partitions from the RDD created by DataSourceV2ScanExec and collect data Rows from all the partitions.

  • DataSourceV2ScanExec creates ReadTasks using its DataSourceV2Reader and converts the tasks into a DataSourceRDD
  • DataSourceRDD wraps each task into a DataSourceRDDPartition that knows its id
  • ReadTask represents a single data partition. A ReadTask is serialized and sent to a Spark worker.
  • After DataSourceRDD execution is triggered, on the executor side the DataSourceRDD obtains a DataReader from the ReadTask
  • DataReader is an iterator of Rows from one data partition (see the sketch below)
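
The partition-level pieces a data source supplies for this phase can be sketched as follows. The DataReader contract is iterator-like: next() advances to the next record and get() returns the current one. The names are hypothetical and the signatures simplified.

  // A minimal sketch of one data partition, assuming simplified signatures
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}

  class SomeReadTask(file: String) extends ReadTask[Row] {
    // Serialized to the worker; creates the actual reader there
    override def createDataReader(): DataReader[Row] = new SomeDataReader(file)
  }

  class SomeDataReader(file: String) extends DataReader[Row] {
    private val lines = scala.io.Source.fromFile(file).getLines()
    private var current: String = _
    // Advance to the next Row; return false once the partition is exhausted
    override def next(): Boolean = if (lines.hasNext) { current = lines.next(); true } else false
    // Return the Row the reader is currently positioned on
    override def get(): Row = Row(current)
    // Release resources once the partition has been consumed
    override def close(): Unit = ()
  }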