Starting with this blog post, I am going to write about Spark SQL Catalyst. Catalyst is the core of Spark SQL and there are many topics to cover. I don’t have a formal writing plan for this; I want to keep my blog casual, informal and, more importantly, stress-free for myself. So there will be no fixed plan or organisation for the blog posts in this Catalyst series.
However, I think I do need to give a global view of how Catalyst works instead of just jumping into the details of a specific Catalyst component. Therefore, in this blog post, I am going to write about the QueryExecution class in Spark SQL, which wraps the primary workflow for executing relational queries with Catalyst.
QueryExecution encapsulates the methods for executing each of the primary stages involved in a query execution flow. (*Those methods are lazy and are executed only when an action is triggered on the dataset being queried.)
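To make the laziness concrete, here is a minimal sketch in plain Python. It is not Spark code: the class ToyQueryExecution, its attribute names and the string "plans" are all made up for illustration. It only shows the pattern in which each stage is computed on first access, pulls in the stage before it, and is then cached.

```python
from functools import cached_property


class ToyQueryExecution:
    """Toy stand-in for a query execution wrapper; NOT Spark's actual class."""

    def __init__(self, logical):
        self.logical = logical
        self.log = []  # records the order in which stages actually run

    @cached_property
    def analyzed(self):
        self.log.append("analyze")
        return f"Analyzed({self.logical})"

    @cached_property
    def optimized_plan(self):
        child = self.analyzed  # first access triggers the analysis stage
        self.log.append("optimize")
        return f"Optimized({child})"

    @cached_property
    def spark_plan(self):
        child = self.optimized_plan
        self.log.append("plan")
        return f"Physical({child})"

    @cached_property
    def executed_plan(self):
        child = self.spark_plan
        self.log.append("prepare")
        return f"Prepared({child})"


qe = ToyQueryExecution("UnresolvedPlan")
stages_before = list(qe.log)  # nothing has run yet
plan = qe.executed_plan       # pulls every earlier stage exactly once
again = qe.executed_plan      # cached: no stage runs a second time
stages_after = list(qe.log)
```

Creating the object costs nothing; the whole chain runs only when the last stage is requested, and a second request reuses the cached result.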
In the rest of the blog post, I am going to talk about the QueryExecution class and give a high-level view of each stage involved in the query execution. I will dig into the technical details of those stages in the following blog posts.
In addition, I will use an example query to explain the query execution flow, from a SQL statement in text form through the parsed, analyzed and optimized logical plans up to the physical plan. The example queries a Databricks sample dataset: it joins the ‘sales’ table to the ‘items’ table and selects the items whose ‘i_price’ column value is smaller than 10.
The internal data structure of query plans in Catalyst is a tree, which is transformed by rules at each stage. The input SQL statement in text format first needs to be parsed into a tree. When the sql method of the Spark session is called, the SQL parser first parses the SQL statement into an ANTLR ParseTree and then converts the ANTLR ParseTree into an unresolved logical plan. The unresolved logical plan is then passed into the ofRows method of Dataset to create a QueryExecution instance.
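The idea of "a tree transformed by rules" can be sketched in a few lines of plain Python. This is a toy model, not Catalyst's API: the node classes, transform_up and the constant_folding rule are illustrative names, and the tree holds a small expression rather than a full query plan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def transform_up(node, rule):
    """Rewrite the children first, then offer the rebuilt node to the rule."""
    if isinstance(node, Add):
        node = Add(transform_up(node.left, rule), transform_up(node.right, rule))
    return rule(node)


def constant_folding(node):
    """Toy rule: replace an Add of two literals with one literal; else no change."""
    if (
        isinstance(node, Add)
        and isinstance(node.left, Literal)
        and isinstance(node.right, Literal)
    ):
        return Literal(node.left.value + node.right.value)
    return node


# The expression x + (1 + 2) as a tree
tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
folded = transform_up(tree, constant_folding)
```

The trees are immutable, so a rule never edits a node in place; it returns a new node, and nodes the rule does not match are returned as they are.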
For our SQL example, here is the unresolved logical plan parsed from our SQL statement. As you can see, both the ‘sales’ table and the ‘items’ table are unresolved. The column names are not validated and there is no type information on the columns.
Spark SQL Catalyst provides a logical query plan analyzer, which retrieves table and attribute information from the SessionCatalog and translates UnresolvedAttributes and UnresolvedRelations into fully typed objects. The SessionCatalog is a proxy to the underlying metastore, such as the Hive Metastore. The metastore is an important topic, and I will dedicate a whole blog post to the Hive Metastore. For now, you can think of a metastore as the place where you can look up the full type information of tables and columns.
According to the paper “Spark SQL: Relational Data Processing in Spark”, published by the Databricks guys, the logical query plan analyzer conducts the following actions during the analysis stage:
- Looking up relations in SessionCatalog by name
- Mapping named attributes, such as column names, to the input provided by an operator’s children
- Determining which attributes refer to the same value and giving them a unique id
- Propagating and coercing types through expressions
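The first three actions above can be sketched with a toy analyzer in plain Python. Everything here is an assumption made for illustration: the CATALOG dictionary stands in for the SessionCatalog, the column names other than i_price are made up, and AttributeReference is just a small record type rather than Spark's class. Type coercion is left out to keep the sketch short.

```python
from dataclasses import dataclass
from itertools import count

# Hypothetical catalog: table name -> {column name: data type}.
# Only 'i_price' comes from the example query; the other columns are made up.
CATALOG = {
    "sales": {"s_item_id": "int", "s_quantity": "int"},
    "items": {"i_item_id": "int", "i_price": "double"},
}


@dataclass(frozen=True)
class AttributeReference:
    name: str
    data_type: str
    expr_id: int


_ids = count(1)  # source of unique ids


def resolve_relation(table):
    """Look the table up by name; give each column a type and a unique id."""
    if table not in CATALOG:
        raise LookupError(f"Table or view not found: {table}")
    return [
        AttributeReference(col, dtype, next(_ids))
        for col, dtype in CATALOG[table].items()
    ]


def resolve_attribute(name, inputs):
    """Map a column name onto exactly one attribute produced by the children."""
    matches = [a for a in inputs if a.name == name]
    if len(matches) != 1:
        raise LookupError(f"cannot resolve '{name}' ({len(matches)} candidates)")
    return matches[0]


sales = resolve_relation("sales")
items = resolve_relation("items")
price = resolve_attribute("i_price", sales + items)
```

After resolution, the name i_price is no longer just a string: it points at one specific attribute with a data type and an id, so later stages can tell two columns with the same name apart.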
Back to our example, after the unresolved logical plan is analysed, we get the following analyzed logical plan. As you can see, the logical plan now knows the full name and file format of a relation. The data types of columns are recognised, and a unique id has been associated with each column.
After the logical plan is analysed, the logical plan optimizer applies standard rule-based optimisations to it. Catalyst ships with around 70 pre-built logical plan optimisation rules, each of which transforms a fragment of the query plan into an optimised form. The rules are grouped and executed in rule batches. I plan to take one or a couple of blog posts to cover all the rules and rule batches.
In our example, the PushDownPredicate rule is applied to the analysed logical plan and moves the filter from after the join to before the join, directly after the data load. This optimisation is expected to reduce the size of the datasets to join and to ensure that the predicate is pushed down to the data source at the physical planning stage.
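Here is a toy version of that rewrite in plain Python. It is a sketch of the idea only, not the PushDownPredicate implementation: the plan node classes are made up, the predicate is limited to one column compared with a constant, the join is assumed to be an inner join, and the join key column names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Relation:
    name: str
    columns: tuple


@dataclass(frozen=True)
class Filter:
    column: str   # toy predicate: a single column compared with a constant
    op: str
    value: float
    child: object


@dataclass(frozen=True)
class Join:       # assumed to be an inner join
    left: object
    right: object


def output(plan):
    """Set of column names a plan node produces."""
    if isinstance(plan, Relation):
        return set(plan.columns)
    if isinstance(plan, Filter):
        return output(plan.child)
    return output(plan.left) | output(plan.right)


def push_filter_through_join(plan):
    """If a Filter sits on a Join and only touches one side, move it onto that side."""
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        if plan.column in output(join.left):
            return Join(Filter(plan.column, plan.op, plan.value, join.left), join.right)
        if plan.column in output(join.right):
            return Join(join.left, Filter(plan.column, plan.op, plan.value, join.right))
    return plan


# Column names other than 'i_price' are made up for the example
sales = Relation("sales", ("s_item_id",))
items = Relation("items", ("i_item_id", "i_price"))
before = Filter("i_price", "<", 10, Join(sales, items))
after = push_filter_through_join(before)
```

Before the rule, every joined row is filtered; after it, only the ‘items’ rows with a small price take part in the join at all.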
The logical plan is platform-independent and cannot be interpreted and executed by Spark directly. It needs to be converted to a SparkPlan by the SparkPlanner. QueryExecution provides a createSparkPlan method that takes the optimised logical plan as input and calls the plan method of SparkPlanner, which applies node-matched strategies to return a list of candidate physical plans.
A cost-based model is expected to analyse the cost of each candidate physical plan and choose the best one. As of Spark version 3.0.0, the cost-based model was not yet available, and a temporary solution is used instead, which takes the first plan returned by the SparkPlanner.
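The "take the first candidate" behaviour can be pictured with a small Python sketch. The strategies, the dictionary-based logical plan and the small_side flag are all hypothetical; the sketch only illustrates strategies producing candidates in order and the planner picking the first one without comparing costs.

```python
def broadcast_join_strategy(logical):
    # Hypothetical strategy: only applies when one side is marked as small
    if logical["op"] == "join" and logical.get("small_side"):
        yield "BroadcastHashJoin"


def sort_merge_join_strategy(logical):
    # Hypothetical fallback strategy: applies to any join
    if logical["op"] == "join":
        yield "SortMergeJoin"


STRATEGIES = [broadcast_join_strategy, sort_merge_join_strategy]


def plan(logical):
    """Lazily yield every candidate physical plan, in strategy order."""
    for strategy in STRATEGIES:
        yield from strategy(logical)


def create_physical_plan(logical):
    # No cost comparison: simply take the first candidate
    return next(plan(logical))


chosen = create_physical_plan({"op": "join"})
chosen_small = create_physical_plan({"op": "join", "small_side": "items"})
```

Because the first candidate wins, the order of the strategies matters: here the broadcast strategy is tried before the sort-merge strategy.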
Back to our example, the logical operators in the optimised logical plan are converted to their physical counterparts. The relation is transformed by the DataSource strategy into a FileScan node, which has the filter predicates pushed down to the data source so that only the data required by the query is read. The Join Inner logical operator has been converted to SortMergeJoin by the Join strategy (ideally, we would encourage the Join strategy to choose a more efficient join algorithm, such as a broadcast join; I will go into the details of the Join strategy and optimisation methods in future blog posts).
After the SparkPlanner converts the logical plan to the physical plan, a list of preparation rules needs to be executed to prepare the physical plan for execution.
For example, the EnsureRequirements rule ensures that all the requirements of a physical operator are met (if they are not, it adds the actions required to meet them). In our example, the SortMergeJoin operator is planned for the join operation in our query. The SortMergeJoin operator requires its inputs to be shuffled and sorted before it executes. The EnsureRequirements rule will add those required actions in.
Apart from ensuring requirements, the preparation rules also ensure that subqueries are planned, that data partitioning and ordering are used correctly, and that exchanges and subqueries are reused where applicable. In addition, WholeStageCodegen is inserted at this stage.
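As a rough sketch of what "adding the required actions" means, here is a toy rule in plain Python. It is not the EnsureRequirements implementation: the node classes, the partitioned_by and sorted_by fields and the join key column names are made up, and partitioning is reduced to a single key name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Scan:
    table: str
    partitioned_by: Optional[str] = None  # None = no known partitioning
    sorted_by: Optional[str] = None       # None = no known ordering


@dataclass(frozen=True)
class Exchange:  # shuffle so rows with equal keys land in the same partition
    key: str
    child: object


@dataclass(frozen=True)
class Sort:
    key: str
    child: object


@dataclass(frozen=True)
class SortMergeJoin:
    left_key: str
    right_key: str
    left: object
    right: object


def partitioning(plan):
    if isinstance(plan, Exchange):
        return plan.key
    if isinstance(plan, Sort):
        return partitioning(plan.child)  # sorting keeps the partitioning
    return plan.partitioned_by


def ordering(plan):
    if isinstance(plan, Sort):
        return plan.key
    if isinstance(plan, Exchange):
        return None                      # a shuffle destroys any ordering
    return plan.sorted_by


def meet(child, key):
    """Add a shuffle and/or sort only where the child does not already comply."""
    if partitioning(child) != key:
        child = Exchange(key, child)
    if ordering(child) != key:
        child = Sort(key, child)
    return child


def ensure_requirements(plan):
    if isinstance(plan, SortMergeJoin):
        return SortMergeJoin(
            plan.left_key,
            plan.right_key,
            meet(plan.left, plan.left_key),
            meet(plan.right, plan.right_key),
        )
    return plan


# Hypothetical join keys; 'items' is assumed to be already partitioned and sorted
raw_join = SortMergeJoin(
    "s_item_id",
    "i_item_id",
    Scan("sales"),
    Scan("items", partitioned_by="i_item_id", sorted_by="i_item_id"),
)
prepared = ensure_requirements(raw_join)
```

The ‘sales’ side gets a shuffle followed by a sort, while the ‘items’ side is left alone because, in this made-up setup, it already satisfies both requirements.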