Dec 7, 2017

Spark Data Source API V2 - read query planning and execution

Without much marketing noise the Spark team decided to dramatically overhaul the current data connector API in v2.3. The new version was quietly merged into master recently. In comparison with the original approach, the new one definitely looks like a real API. Even though nobody will be surprised by further evolution of the version currently in master, it already looks irresistibly promising. So I decided to spend some time digging into it.

You probably remember the high-level architecture of the Spark query compilation process. It follows the traditional DBMS approach of parsing a query into a logical plan, optimizing the logical plan, transforming it into a physical plan, and executing the physical plan on worker nodes. I am going to structure my investigation along similar lines.

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

The following discussion does not shy away from relevant Spark query execution internals. Those are always fascinating even when not directly actionable. Moreover, the Data Source API implementation is a nice vertical slice of the generic Spark infrastructure. So this is a convenient opportunity to refresh my memory. I will omit fine details of the Spark internals in an attempt to highlight the big picture.

As my running example I assume a trivial Spark expression such as spark.read.format("someDSV2").load().count().
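
Spelled out, it looks like the snippet below ("someDSV2" stands for the fully qualified class name of some DataSourceV2 implementation; the rest of the names are placeholders):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("dsv2-demo").getOrCreate()
  // load() builds a Dataset backed by the data source, count() triggers actual execution
  val count = spark.read.format("someDSV2").load().count()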

Query plans and Data Sources in 100 words or less


Logical and physical plan nodes are subtypes of QueryPlan. The SparkPlanner is responsible for transforming a logical plan tree into a physical one. The planner relies on a configurable set of strategies for the actual transformation rules.

A data source is represented in a logical plan by a DataSourceV2Relation instance created by the data frame reader. During planning, DataSourceV2Strategy transforms the relation into a DataSourceV2ScanExec instance. The latter creates a DataSourceRDD instance that is used to actually compute the results in parallel from multiple partitions.
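
You can watch this transformation yourself. Using the running example (exact node names in the output may render slightly differently, so this is only indicative):

  val df = spark.read.format("someDSV2").load()
  // With extended output the logical plans contain the relation node
  // and the physical plan contains the scan node
  df.explain(extended = true)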

Logical query planning


In this phase the goal is to create a DataSourceV2Relation from client-supplied options. The process is initiated by loading a Dataset. A minimal sketch of a matching data source follows the list.

  • DataFrameReader searches for an implementation of the DataSourceV2 marker interface. The implementation is expected to also implement one of the ReadSupport mixin interfaces that let the engine discover optional capabilities of the data source
  • ReadSupport is a factory of DataSourceV2Reader instances
  • DataSourceV2Reader knows the data source schema and can create ReadTasks
  • DataSourceV2Relation is the logical plan node that represents a data source
  • Dataset owns a QueryExecution and is associated with the DataSourceV2Relation
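
To make the moving parts concrete, here is a minimal sketch of a toy data source, assuming the interface shapes currently in master; all the Some*/Range* names are mine (RangeReadTask is sketched in the query execution section below):

  import java.util.{List => JList}
  import scala.collection.JavaConverters._

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, ReadSupport}
  import org.apache.spark.sql.sources.v2.reader.{DataSourceV2Reader, ReadTask}
  import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

  // A toy read-only data source with a single integer column
  class SomeDSV2 extends DataSourceV2 with ReadSupport {
    override def createReader(options: DataSourceV2Options): DataSourceV2Reader =
      new SomeDSV2Reader
  }

  // Knows the schema and how to split the scan into ReadTasks
  class SomeDSV2Reader extends DataSourceV2Reader {
    override def readSchema(): StructType =
      StructType(StructField("value", IntegerType) :: Nil)

    // One ReadTask per data partition
    override def createReadTasks(): JList[ReadTask[Row]] =
      List[ReadTask[Row]](new RangeReadTask(0, 5), new RangeReadTask(5, 10)).asJava
  }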

Physical query planning


In this phase the goal is to transform a DataSourceV2Relation into a DataSourceV2ScanExec. The process is initiated by executing an action on the Dataset created by the previous phase. The heart of the strategy is sketched after the list.



  • Dataset owns a QueryExecution
  • QueryExecution coordinates query planning
  • SparkPlanner uses DataSourceV2Strategy to transform logical plan nodes into physical ones
  • DataSourceV2Strategy recognizes data source logical nodes and transforms them into physical ones
  • DataSourceV2Relation carries a DataSourceV2Reader that is given to DataSourceV2ScanExec
  • DataSourceV2ScanExec is the physical plan node that represents a data source
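
Stripped of details, the strategy amounts to a single pattern match; this paraphrase of the current master code conveys the idea:

  import org.apache.spark.sql.Strategy
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.execution.SparkPlan
  import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}

  object DataSourceV2Strategy extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // the scan node inherits the relation's output attributes and its reader
      case DataSourceV2Relation(output, reader) =>
        DataSourceV2ScanExec(output, reader) :: Nil
      case _ => Nil
    }
  }

SparkPlanner simply includes this strategy in its list, so any DataSourceV2Relation encountered during planning is replaced with the corresponding scan node.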

Query execution


In this phase the goal is to obtain data partitions from the RDD created by DataSourceV2ScanExec and collect Rows from all the partitions. The partition-side classes of the toy data source are sketched after the list.

  • DataSourceV2ScanExec creates ReadTasks using its DataSourceV2Reader and converts the tasks into a DataSourceRDD
  • DataSourceRDD wraps each task into a DataSourceRDDPartition that knows its partition index
  • ReadTask represents a single data partition. A ReadTask is serialized and sent to a Spark worker.
  • Once DataSourceRDD execution is triggered, the DataSourceRDD on the executor side obtains a DataReader from the ReadTask
  • DataReader is an iterator of Rows from one data partition
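
To complete the toy data source from the logical planning section, here is a sketch of the partition side, again assuming the current master interfaces (the Range* names are mine):

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask}

  // Describes one partition as the half-open range [start, end).
  // Instances are created on the driver, serialized and sent to executors.
  class RangeReadTask(start: Int, end: Int) extends ReadTask[Row] {
    override def createReader(): DataReader[Row] = new RangeDataReader(start, end)
  }

  // Executor-side iterator over the Rows of a single partition
  class RangeDataReader(start: Int, end: Int) extends DataReader[Row] {
    private var current = start - 1

    override def next(): Boolean = { current += 1; current < end }
    override def get(): Row = Row(current)
    override def close(): Unit = ()
  }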

