
Parquet for Spark Deep Dive (3) – Parquet Encoding


As promised in the last blog post, I am going to dedicate a whole blog post to explore Parquet encoding, focusing on finding answers to the following questions:

  • Why does Parquet use encoding?
  • What encoding algorithms are used in Parquet?
  • How does Parquet implement encoding?
  • How does Parquet choose encoding algorithms?
Why does Parquet use encoding?

Short answer: Parquet encoding makes the data smaller, so both the IO volume and the memory usage are smaller.

Although extra CPU cycles are required to decode the Parquet data, trading CPU costs for IO costs is still a good deal. For modern computers, it is much cheaper to increase computing power than to increase IO performance, and for big data workloads in particular, IO is the bottleneck of query performance.

Memory is cheap, very cheap actually. The cost to worry about is not the memory size but the cost of scanning memory. A smaller data size reduces the memory footprint of the data required for your query, and therefore reduces the amount of RAM that has to be scanned.

From another perspective, encoding is more efficient when it is applied to a columnar storage format such as Parquet, where data of the same data type is stored together.

What encoding algorithms are used in Parquet?

Here I am going to use a simple dataset as an example to explain the encoding algorithms.

Dictionary Encoding

Dictionary encoding builds a dictionary of all the distinct values of a column and then replaces the column values with the indexes of the distinct values in the dictionary. Like the example shown below, the “Product” column holds the names of the products. With Dictionary encoding, all the distinct product names are added into the “Product” dictionary, each of which has a unique index referring to it. The column now holds the index of the product name in the dictionary instead of the actual product name.
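
To make the idea concrete, here is a minimal Python sketch of the same idea (the product names and the helper function are made up for illustration; this is not the Parquet implementation):

def dictionary_encode(values):
    """Replace each value with its index in a dictionary of distinct values."""
    dictionary = {}          # distinct value -> index
    encoded = []             # column values replaced by dictionary indexes
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary)
        encoded.append(dictionary[value])
    return dictionary, encoded

# Example "Product" column
products = ["Apple", "Banana", "Apple", "Cherry", "Banana", "Apple"]
dictionary, encoded = dictionary_encode(products)
print(dictionary)  # {'Apple': 0, 'Banana': 1, 'Cherry': 2}
print(encoded)     # [0, 1, 0, 2, 1, 0]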

As you might have noticed, the performance of Dictionary encoding depends on the cardinality of the column: the lower the cardinality, the higher the compression ratio and the smaller the encoded data. Therefore, you could normally expect the dictionary encoding of a “Gender” column to achieve a higher compression ratio than that of a “Postcode” column.

You could also expect similar query performance across different data types once they have been dictionary encoded, whether string, integer or date. The size of the column values (which are now index values) is the same; the only difference is the size of the dictionary. As long as the cardinality of the column is the same, and therefore the number of unique values in the dictionary is the same, the workload of scanning the column is similar for different data types.

Run Length Encoding (RLE) 

Run Length Encoding (RLE) replaces repeating values in a column with the number of contiguous rows holding the same value. In our example, the “Quantity” column holds the repeated data. Instead of storing the values one by one, RLE counts how many times the same value appears consecutively and stores the value together with its repetition count.
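
Conceptually (again as an illustrative Python sketch rather than the Parquet implementation), an RLE pass over the “Quantity” column looks like this:

def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    runs = []
    for value in values:
        if runs and runs[-1][0] == value:
            runs[-1][1] += 1
        else:
            runs.append([value, 1])
    return [(v, n) for v, n in runs]

quantities = [1, 1, 1, 1, 2, 2, 5, 5, 5]
print(run_length_encode(quantities))  # [(1, 4), (2, 2), (5, 3)]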

The performance of RLE depends on the repetition pattern of a column. The more rows with the same value stored consecutively, the higher the compression ratio RLE can achieve. On the other hand, RLE can perform pretty badly on frequently changing column values; the encoded column could even be bigger than the unencoded one. Therefore, the sorting of a column makes a very big difference to the compression ratio. In addition, the cardinality of the column also plays a big role in RLE efficiency; one extreme example is a column with a unique constraint, where there is no repeated data at all.

RLE could also be used as a supplement to Dictionary encoding: Dictionary encoding converts column values into integer indexes, and RLE can further compress the column if the original column shows a repetition pattern.

Delta Encoding

Delta encoding stores the differences (deltas) between consecutive values instead of the values themselves. The idea is that the deltas, along with the starting value, carry enough information to reconstruct the full column.

Delta encoding performs best when consecutive column values have small or constant variations. A good example is a time-series column: a DateTime value is normally stored internally as milliseconds, occupying 64 bits per value. The deltas can occupy far fewer bits when the variations are small, and large but constant variations can be processed further to store only the differences between the deltas.
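
As an illustrative sketch (the timestamps below are made-up millisecond values, one second apart):

def delta_encode(values):
    """Store the first value plus the differences between consecutive values."""
    if not values:
        return None, []
    deltas = [values[i] - values[i - 1] for i in range(1, len(values))]
    return values[0], deltas

timestamps = [1_632_000_000_000, 1_632_000_001_000, 1_632_000_002_000, 1_632_000_003_000]
start, deltas = delta_encode(timestamps)
print(start)   # 1632000000000 (needs 64 bits)
print(deltas)  # [1000, 1000, 1000] (each fits in far fewer bits)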

How does Parquet implement encoding?

As discussed in the previous blog post, the implementations of the ValuesWriter abstract class conduct the encoding and write the values into the column store. Each encoding implementation in Parquet needs to inherit from ValuesWriter.

Dictionary Encoding

DictionaryValuesWriter (inherited from the ValuesWriter class) and its subclasses for each Parquet primitive data type are the Dictionary encoding implementation in Parquet.

Dictionary encoding supports all the primitive data types in Parquet. For each primitive data type, Parquet creates a dictionary values writer subclass, and Parquet chooses the corresponding dictionary values writer based on the data type of the current column.

Let’s look into one of the dictionary values writer subclasses, such as PlainBinaryDictionaryValuesWriter, to see how Parquet conducts dictionary encoding.

Firstly, a hash map (fastutil) is created to be the dictionary: the key of the hash map stores a unique column value, and the value of the hash map stores the index of the column value in the dictionary.

In the value write method, the dictionary is first looked up to check whether the column value to write already exists. If it does not, it is added to the hash map with the next available index. In either case, the index of the column value in the dictionary is appended to the encoded values list, which will later be flushed into the data page, while the hash map holding the dictionary will be flushed into the dictionary page.

When the number or the size of the unique column values exceeds the allowed thresholds, the values writer will fall back to a predefined fallback encoding method (which is configured in the ValuesWriter factory).
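
Purely as a conceptual sketch of this write-then-fall-back behaviour (in Python, with a made-up class name and threshold; the real implementation is the Java DictionaryValuesWriter and its subclasses, which also check the dictionary byte size):

class PlainValuesWriterSketch:
    """Stand-in for a fallback (plain) encoding writer."""
    def __init__(self):
        self.values = []
    def write(self, value):
        self.values.append(value)

class DictionaryValuesWriterSketch:
    MAX_DICTIONARY_SIZE = 1000  # made-up threshold for illustration

    def __init__(self, fallback_writer):
        self.dictionary = {}       # distinct value -> index (flushed to the dictionary page)
        self.encoded_values = []   # indexes (flushed to the data page)
        self.fallback_writer = fallback_writer
        self.fell_back = False

    def write(self, value):
        if self.fell_back:
            self.fallback_writer.write(value)
            return
        if value not in self.dictionary:
            if len(self.dictionary) >= self.MAX_DICTIONARY_SIZE:
                self._fall_back()
                self.fallback_writer.write(value)
                return
            self.dictionary[value] = len(self.dictionary)
        self.encoded_values.append(self.dictionary[value])

    def _fall_back(self):
        # re-encode the values written so far with the fallback encoding
        index_to_value = {i: v for v, i in self.dictionary.items()}
        for index in self.encoded_values:
            self.fallback_writer.write(index_to_value[index])
        self.fell_back = True

writer = DictionaryValuesWriterSketch(PlainValuesWriterSketch())
for v in ["A", "B", "A", "C"]:
    writer.write(v)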

Run Length Encoding (RLE)

Parquet implements RLE in the RunLengthBitPackingHybridEncoder, which is used by the RunLengthBitPackingHybridValuesWriter for encoding and writing column values. This encoder only supports dictionary indices, Boolean values, and the repetition and definition levels in the data pages.

The RunLengthBitPackingHybridEncoder combines RLE and bit packing. When Parquet writes a column value using the RunLengthBitPackingHybridValuesWriter, the repeated value is cached until a different value occurs, and its repetitions are counted. If the value repeats at least 8 times, RLE encoding is used with the following grammar.

rle-run := <rle-header> <repeated-value>
rle-header := varint-encode((rle-run-len) << 1)
repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)
*varint-encode is ULEB-128 coding

If the condition for an RLE run is not met and the number of buffered, unprocessed column values reaches 8, bit packing is applied with the following grammar:

bit-packed-run := <bit-packed-header> <bit-packed-values>
bit-packed-header := varint-encode(<bit-pack-scaled-run-len> << 1 | 1)
bit-pack-scaled-run-len := (bit-packed-run-len) / 8
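
To illustrate the rle-run branch of the grammar above, here is a small hedged Python sketch that builds an rle-run for a repeated dictionary index (the helper names, the example values, and the simplified bit width handling are my own, not Parquet code):

def uleb128_encode(value):
    """ULEB-128 (unsigned LEB128) varint encoding."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def rle_run(repeated_value, run_length, bit_width):
    """<rle-header> <repeated-value>: header is varint(run_length << 1) (LSB 0 marks an
    RLE run), the value is stored using round-up-to-next-byte(bit_width) bytes."""
    header = uleb128_encode(run_length << 1)
    value_width = (bit_width + 7) // 8
    return header + repeated_value.to_bytes(value_width, "little")

# e.g. dictionary index 3 repeated 100 times, with a 2-bit dictionary index width
print(rle_run(3, 100, 2).hex())  # 'c80103': header 0xC8 0x01 (varint 200), value 0x03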

Delta

There are a number of implementations of Delta encoding in Parquet, including the DeltaBinaryPackingValuesWriter for Int32 and Int64, the DeltaLengthByteArrayValuesWriter and the DeltaByteArrayWriter. The ideas behind the implementations are very similar so I will only cover the DeltaBinaryPackingValuesWriter implementation in this blog post as an example.

When Parquet writes a column value using the DeltaBinaryPackingValuesWriter, the delta between this value and the previous value is calculated. At the same time, the first value and the minimum delta of the current block are cached to support the next step of the encoding.

When the current block reaches the block size threshold, each delta is converted into its difference from the minimum delta. This further compresses the data, especially when the deltas are close to each other. Finally, the converted deltas are encoded into binary. You can find the header and block definitions in the Parquet official doc.
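
A simplified sketch of the min-delta step (ignoring the bit packing of the miniblocks and the real header layout; the example values are made up):

def delta_block_encode(values):
    """Sketch: first value, min delta, and deltas rebased against the min delta."""
    first = values[0]
    deltas = [values[i] - values[i - 1] for i in range(1, len(values))]
    min_delta = min(deltas)
    rebased = [d - min_delta for d in deltas]  # all >= 0 and close to zero when deltas are similar
    return first, min_delta, rebased

print(delta_block_encode([100, 103, 105, 108, 110]))
# (100, 2, [1, 0, 1, 0]) -- deltas 3, 2, 3, 2 rebased against min delta 2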

How does Parquet choose encoding algorithms?

Parquet associates a values writer, implemented with a specific encoding algorithm, with each primitive data type. When a column writer writes a column value to the column store, the appropriate values writer implementation is created by the values writer factory's newValuesWriter method.

Default ValuesWriter Factory

A default values writer factory, DefaultValuesWriterFactory, which handles ValuesWriter creation based on the column types and the writer version, is built into the Parquet package and used by default. It contains two implementations, one for the version 1 Parquet writer and one for the version 2 Parquet writer.

In each implementation, the newValuesWriter method defines the ValuesWriter for each primitive data type, along with a fallback ValuesWriter. The fallback ValuesWriter is applied when the conditions for using the main ValuesWriter are not met, for example when a column has too many unique values to use the dictionary ValuesWriter.
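
From Spark you do not normally interact with the values writer factory directly, but you can influence which writers are eligible through the parquet-hadoop configuration keys. A hedged PySpark sketch (the values are only examples, the output path is a placeholder, and setting the Hadoop configuration through the internal _jsc handle is just the common PySpark idiom):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-encoding-demo").getOrCreate()

# parquet-hadoop configuration keys picked up by the Parquet writer via the Hadoop conf
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("parquet.writer.version", "v2")        # use the version 2 writer implementation
hadoop_conf.set("parquet.enable.dictionary", "false")  # disable dictionary encoding for this write

df = spark.range(1_000_000).withColumnRenamed("id", "quantity")
df.write.mode("overwrite").parquet("/tmp/parquet-encoding-demo")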

Custom ValuesWriter Factory

When the default values writer factory cannot meet the requirements and you want to implement your own encoding selection strategy, you could implement the ValuesWriterFactory interface and override the newValuesWriter method to define the ValuesWriter for each primitive data type.

Once you have created the custom values writer factory, you can put it to use by setting it through the withValuesWriterFactory method of the ParquetProperties Builder.

Parquet for Spark Deep Dive (2) – Parquet Write Internal


This blog post continues the Delta Lake table write journey into the Parquet file write internals.

As described in the last blog post, a ParquetOutputWriter instance is created and calls the Parquet API to write a partition of the Spark SQL dataframe into a Parquet file. From this point on, the Delta table write journey steps into Parquet territory.

To understand this blog post, some fundamental knowledge about the Parquet format is required, such as Row Group, Column Chunk, and Page. You can find plenty of articles covering those topics, so I will not repeat them here; please look them up if needed.

Parquet Write Initiation

The ParquetOutputWriter object, executed within a Spark executor task, creates a Parquet ParquetOutputFormat object and calls its getRecordWriter method.

The getRecordWriter method mainly initialises the Parquet file write context, including the creation and initialisation of the ParquetWriteSupport object (the Spark SQL implementation of the Parquet WriteSupport that writes Spark SQL Catalyst InternalRows as Parquet messages), the ParquetFileWriter object (which encapsulates the file-level operations), the ParquetRecordWriter (the entry point for record writing operations), and the MemoryManager (which keeps a global context of the memory usage of the Parquet writers).

The start method of the ParquetFileWriter object is also triggered at this point, which writes the magic number to the file output stream to mark the start of the Parquet file.

When the ParquetRecordWriter object is created, an InternalParquetRecordWriter object is created and its initStore method is called, which initialises a new column store and a new page store.

Parquet Record Writer

Although the data is ultimately stored in Parquet column by column, the Spark SQL dataframe data is written into Parquet row by row.

The ParquetRecordWriter object created in the getRecordWriter method above offers a write method that is called to write a single record into Parquet. In each SQL executor task, an Iterator[InternalRow] of the current partition of dataframe data is looped through and the ParquetRecordWriter write method is called for each InternalRow one by one.

The ParquetRecordWriter write method then calls the InternalParquetRecordWriter write method, which triggers the ParquetWriteSupport write method defined in the Spark SQL framework for converting a Spark SQL dataframe InternalRow into the message format consumable by the Parquet API.

The ParquetWriteSupport write method splits an InternalRow into fields and writes them one by one.


On the Parquet side, the Parquet record consumer, which hosts a column writer for each column of the input dataframe, calls the column writer of the current field, which triggers the corresponding ValuesWriter write method to encode and write the value into the column store. ValuesWriter is the base class for implementing an encoding for a given column; here is a list of the encoding implementations supported by Parquet. As encoding is an important topic for columnar storage, I am going to dedicate a separate blog post to the Parquet encoding implementations.

The column writer also writes the repetition level and definition level of a field. Because Delta tables mainly store flat schemas at the moment, the implementation of nested schemas in Parquet is out of scope for this blog post.

After the predefined minimum number of rows has been added to the column store, the column store starts to check whether the size of the added (and encoded) column values for a column has reached the predefined page size threshold.

If the page size threshold is reached, the ColumnWriter of the current column writes the page into the page store, which internally is an array list of byte[] (List<byte[]>). Each page contains a page header that includes the metadata of the page, such as the page size (compressed and uncompressed), the value count, the null value count, and the min/max values.

Parquet compresses data at the page level. If a compression format such as Snappy is specified, a page is compressed before being written into the page store.

Both the column store and the page store cache data in memory; the actual write to disk happens at the row group level. When the data size exceeds the specified row group size, Parquet first flushes the current data in the column store into the page store for each column, and then writes the data in the page store, column by column, into the output stream.

After that, Parquet discards the current column store and page store, reinitialises new ones, and continues reading the remaining dataframe rows, repeating the process.
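
The page and row group size thresholds mentioned above can be tuned from Spark through the standard parquet-hadoop configuration keys. A hedged PySpark sketch (the sizes and output path are only examples, and the _jsc handle is simply the common PySpark idiom for reaching the Hadoop configuration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-write-demo").getOrCreate()

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("parquet.block.size", str(128 * 1024 * 1024))  # row group size threshold in bytes
hadoop_conf.set("parquet.page.size", str(1024 * 1024))         # page size threshold in bytes

df = spark.range(10_000_000)
df.write.mode("overwrite").option("compression", "snappy").parquet("/tmp/parquet-write-demo")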

FileMetaData Write

Apart from the PageHeader metadata stored in each page, the Parquet file also encapsulates the file schema and the RowGroup metadata for each row group, which are appended to the end of the Parquet file. I am going to dive into the details and usage of this metadata when I cover the Parquet file reading part.

Parquet for Spark Deep Dive (1) – Table Writing Journey Overview

In this blog post, I am going to explore the full Delta table writing journey, from the Spark SQL dataframe down to the underlying Parquet files.

The diagram above shows the whole journey of a table writing operation (some steps have been simplified or abstracted in order to make the whole table writing journey presentable in the diagram).

df.write.format("delta").save(path)

When a table writing operation is started, a new instance of the DataFrameWriter class is created. The DataFrameWriter class encapsulates the configurations and methods related to a table writing operation, such as partitioning, bucketing, and saving.

DataFrameWriter offers an interface for developers to specify the target file format; in the case of Delta Lake tables, the format needs to be set to “delta”. On a side note, Spark SQL implements data source integration in a loosely coupled, “plug-in” style. Taking the Delta Lake library as an example, the DeltaDataSource class implements the DataSourceRegister trait so that it can register the alias “delta” as the format type. Once the full name of the DeltaDataSource class is specified in the DataSourceRegister meta file in the delta-core package, the DeltaDataSource can be “plugged into” the Spark SQL framework when the delta-core package is loaded.

At the time of writing, Delta Lake supports the V1 version of the Spark SQL data source API. The save method of the DataFrameWriter class redirects the execution flow to the saveToV1Source method, which calls the planForWriting method in the DataSource class. The SaveIntoDataSourceCommand is then run, which eventually triggers the createRelation method in the DeltaDataSource class.

An instance of the WriteIntoDelta class is created within the createRelation method. WriteIntoDelta is a RunnableCommand that creates and executes a dataframe writing action to write the data files and the Delta metadata within an OptimisticTransaction. I will elaborate on the Delta metadata writing process in a separate blog post after I cover the Delta Lake transaction logs; this blog post focuses on the data file writing journey.

The writing journey proceeds to execute the write method of the FileFormatWriter object, a helper object for writing FileFormat data out to a storage location.

The write method issues a write job consisting of one or more executor tasks, each of which writes all rows within an RDD partition. The number of executor tasks depends on the number of partitions in the dataframe to write, and the tasks are executed in parallel (depending on the number of executors). For each task execution, the task is committed if no exception is thrown, otherwise it is aborted. If all tasks are committed successfully, the job is committed, otherwise it is aborted; the job is also aborted if any exception is thrown during the job commit. Post-commit operations, such as processing statistics, are performed if the job is committed successfully.

In each task, an instance of ParquetOutputWriter is created that bridges the Spark SQL API and Parquet API.

The ParquetOutputWriter instance initialises the ParquetOutputFormat object, an instance of the ParquetOutputFormat class from the org.apache.parquet.hadoop package. The Parquet RecordWriter for writing data into a Parquet file can be obtained from the ParquetOutputFormat object.

Now the writing journey steps into the Parquet territory.

To understand Delta Lake data storage, it is very helpful to understand the Parquet storage model and its data writing and reading internals, which cover many topics such as the columnar storage format, predicate pushdown, and vectorised decoding. I plan to cover those in the next few blog posts.

Databricks Lakehouse Breaks Data Warehousing Performance Record – Time to Forget about Data Warehouse?

Lakehouse is faster than Data Warehouse! This is a big deal! Now, what could stop Lakehouse from replacing Data Warehouse? This is the news I have been waiting for since Databricks SQL, the full suite of data warehousing capabilities, was announced last year.

In the 100TB TPC-DS performance benchmark for data warehousing, Databricks SQL beat the previous record by 2.2x. In a separate test conducted by the Barcelona Supercomputing Center, Databricks SQL was shown to be 2.7x faster and 12x cheaper than Snowflake on the same data warehousing workloads.

Now, what could stop Lakehouse from replacing Data Warehouse?

Functionality-Wise

Databricks SQL, powered by Delta Lake, offers the full suite of data warehousing capabilities, such as ACID transactions, fine-grained data security, scalable metadata handling, first-class SQL support, and BI reporting. In addition, new capabilities are being added continuously at a fast pace, such as the latest Low Shuffle Merge feature and the SQL custom function feature. It is impressive to see those data warehousing capabilities on top of data lakes. Just a few years ago, we had to write rather awkward code as a workaround for the lack of merge capability when updating data in the data lake.

Regarding the ‘Lake’ side of the workloads, there is no doubt that the Lakehouse outperforms data warehouses (of course, that is why it is called a “lakehouse”) and offers capabilities that data warehouses cannot achieve, such as native support for unstructured or semi-structured data and machine-learning workloads.

Capability-Wise

The birth of the data lake was driven by the difficulties businesses face in handling data of greater volume, variety and velocity with a classic data warehouse. The Lakehouse is capable of storing and processing very large volumes of data, a scale that is impossible, or too expensive, for a classic data warehouse to handle. It also natively supports unstructured and semi-structured data as well as streaming data.

Flexibility-Wise

I personally consider the Lakehouse to be more flexible than the classic data warehouse. Databricks Lakehouse originated from open-source initiatives and adopts an open architecture instead of being built as a closed black box like most classic data warehouses. It offers data engineers more options and flexibility to integrate with or extend their lakehouse.

Reliability-Wise

The data warehouse has been around for more than 30 years, and there is no doubt that it is normally more reliable and robust than the Lakehouse under most conditions. However, the situation is changing at a fast pace.

Cost-Wise

Databricks Lakehouse is not cheap, especially when you need to pay for both the Databricks Units and the VMs provisioned to support them. However, for similar data warehousing workloads and data volumes, the Lakehouse has its advantages: low-cost cloud-based storage, elastic pay-as-you-go compute, and the latest serverless SQL feature (Databricks claims a 40% cost saving).

Performance-Wise

Now we get to the decisive factor: performance, more specifically interactive query performance. Manageability/reliability and interactive query performance are two of the biggest hurdles for the Lakehouse to handle data warehousing workloads competently. With the rapid advance of Delta Lake capabilities, the manageability/reliability hurdle is no longer that formidable.

Databricks and its open-source cousin, Apache Spark, were originally designed for offline processing of big data workloads. In other words, the design favours high throughput over low latency, and people did not have high expectations for its interactive query performance. However, with the complete rewrite of its processing engine and a set of performance optimisation techniques (caching, a cost-based query optimiser, data skipping, data compaction, and so on), Databricks Lakehouse has reached the same performance level as a data warehouse.

How about Snowflake, the Most Promising Data Warehouse?

Yes, Snowflake started as a data warehousing company, but it has been adding more and more data lake features. Even though it takes the opposite route to Databricks, which started as a big data company and has been adding more and more data warehousing features, they are becoming more and more alike. Eventually, a new name may be given to them. It might be “Lakehouse” or not (at the end of the day, they might be called “Data Warehouse 2.0” or something else); either way, we can expect the new “thing” to make “big” data (big volume, variety of structures, and high velocity) no longer “big” in the future.

dqops – Query Databricks Database Schema through SQL Connector for Python

dqops Data Quality Studio (DQS) is one of the R&D projects I have been doing in my spare time. I plan to note down some of the tips & tricks I use in this project in future blog posts from time to time.

Databricks is one of the main data services that the dqops DQS is going to support. This blog post notes down the approach for querying Databricks database schemas using the “Databricks SQL Connector for Python” library.

The “Databricks SQL Connector for Python” library supports querying Databricks databases through the Spark Thrift Server (STS), which allows JDBC/ODBC clients to execute SQL queries over the JDBC or ODBC protocols on Apache Spark. A similar approach has been used by dqops DQS to connect to an on-premises Spark cluster with the jaydebeapi library; however, the “Databricks SQL Connector for Python” library simplifies the setup and does not require the installation of a JDBC driver on the client environment.

Setup

The setup of the “Databricks SQL Connector for Python” library is simple. You just need to include the Python library reference “databricks-sql-connector==0.9.0” in the requirements.txt of the client environment from which your code needs to query the Databricks databases. Alternatively, you can install the library manually using “pip install databricks-sql-connector”.

Databricks Connection

The “Databricks SQL Connector for Python” library uses the same connection interface as the other JDBC/ODBC Python libraries, such as pyodbc and jaydebeapi. That makes it very easy to build the Databricks connector on top of the existing dqops data connection framework I have built, based on the open-source sodasql Dialect module, to support different types of data repositories, such as Azure Database, Azure Synapse, and Snowflake.

To create a Databricks connection, we just need to import the “sql” module and run the “connect” function (lines 11-15 in the following figure).

The “connect” function requires the hostname and http_path of the Thrift server deployed in the target Databricks cluster, as well as a Databricks access token. The hostname and http_path can be found in the “Advanced Options” section of the cluster settings page.

A Databricks access token can be generated on the “User Settings” page of the Databricks workspace.
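
Since the original code figure is not reproduced here, a minimal hedged sketch of the connection code looks like this (the hostname, HTTP path, and token values are placeholders to be replaced with your own):

from databricks import sql

connection = sql.connect(
    server_hostname="adb-1234567890123456.7.azuredatabricks.net",          # placeholder hostname
    http_path="sql/protocolv1/o/1234567890123456/0123-456789-abcde123",    # placeholder HTTP path
    access_token="dapiXXXXXXXXXXXXXXXXXXXXXXXXXXXX",                       # placeholder access token
)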

As dqops DQS needs to allow end-users to create and test connections for a specific data source type, a UI component has been created that generates the data connection form based on predefined connection type metadata.

Query Metadata

After the connection instance is created, we can run SQL commands against the Databricks databases using the “execute_sql” function, passing in the SQL statement as the parameter.

Spark provides a set of SQL commands for reading database/table/column metadata. In dqops DQS, the “SHOW TABLES” command is used to read the list of tables in a specified database, and the “DESCRIBE TABLE” command is used to read the column names and types of a data table.

More Spark SQL commands related to metadata can be found here.
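
As a minimal sketch using the connector's standard cursor API (the database and table names are placeholders, and the connection object is the one created above):

cursor = connection.cursor()

cursor.execute("SHOW TABLES IN my_database")            # placeholder database name
tables = cursor.fetchall()

cursor.execute("DESCRIBE TABLE my_database.my_table")   # placeholder table name
columns = cursor.fetchall()

cursor.close()
connection.close()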

The snapshot below shows the metadata UI in the dqops DQS that is built using the Databricks SQL Connector for Python.

Setup a Dockerised Spark Development Environment with VS code and Docker


Databricks is not cheap, especially when I need to use it for my personal R&D work (where, unfortunately, the money has to come out of my own pocket). Therefore, I have been developing in a dockerised Spark environment for a while, and I have found that this approach actually works well. Here I list the steps to set the environment up with VS Code and Docker.

1. Setup VS code remote development in Docker containers

Please refer to this guide for setting up VS Code remote development in Docker containers. In addition, docker-compose needs to be installed.

2. Setup base Spark cluster docker images

There are a number of open-source Spark cluster Docker images available online that can be used as the base of our dev environment. After some hands-on tests, I found that this one from André Perez suits best; it consists of a Spark base image, a Spark master image, a Spark worker image, and a JupyterLab image. A simulated Hadoop distributed file system is also configured as the storage.

[Figure: cluster architecture]

Apart from the Dockerfiles for the images, there are also a build.sh and a build.yml for configuring and building the images. In addition, a docker-compose file is provided to bring up the cluster.

3. Customise the Spark cluster

We can then customise the Spark cluster to suit the requirements of our dev environment. To give an example, here is a list of customisations I made for my dev environment:

Add the additional components/services required for development, such as Microsoft SQL Server (Docker image), Hadoop (Docker image), and OpenLDAP (Docker image). You can add the build commands for those additional images to the build.sh and build.yml scripts to simplify the image build process. In addition, you need to add an entry for each image to the docker-compose file.

Customise the JupyterLab container as the primary dev container. The Dockerfile of the JupyterLab image already contains the installation of the libraries necessary for Spark development. You can install the additional libraries required for your development by extending this Dockerfile. For example, I have added pyodbc to connect to MS SQL Server, the SparkJDBC jar to connect to the Spark Thrift Server, and a requirements.txt file containing all the Python dependencies I need.

# Copy the Python dependency list and the Spark JDBC driver into the image
COPY docker/jupyterlab/requirements.txt /opt/req/requirements.txt
COPY docker/jupyterlab/jdbc/SparkJDBC42.jar /opt/req/SparkJDBC42.jar

# Install the Microsoft ODBC driver for SQL Server, pyodbc, and the Python dependencies
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
    curl https://packages.microsoft.com/config/debian/9/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
    apt-get update && \
    ACCEPT_EULA=Y apt-get -y install msodbcsql17 && \
    apt-get -y install unixodbc-dev && \
    pip3 install pyodbc && \
    pip3 install -r /opt/req/requirements.txt

# Expose the Spark JDBC driver on the Java classpath (pointing at the location it was copied to)
ENV CLASSPATH /opt/req/SparkJDBC42.jar
4. (Optional) Push images to Azure Container Registry

Please refer to this doc.

5. Build images

Extend the build.sh and build.yml to include the entries for the additional images you have added, and then run the build.sh shell script.

6. Map the source file folder to the working folder in the dev container

Map your source file folder (on your local machine) to a folder in the dev Docker container (extended from the original JupyterLab container) by configuring the “volumes” of the dev container entry in your docker-compose file.

7. Run docker-compose up to start containers
8. Attach VS code to the dev container

Start a VS Code window, open the Command Palette, and select “Remote-Containers: Attach to Running Container…”.

After the VS Code window is attached to the dev container, install the “Python Extension for Visual Studio Code” (install it inside the dev container, not on the local host machine).

Open the working folder you mapped in step 6.

Then you have a dev environment in a Docker container where you can author, run, and debug your code.

Here is the code for creating a SparkSession instance with the support of Delta Lake.
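
The original snippet is not reproduced here, so below is a minimal PySpark sketch of what it might look like (the master URL is a placeholder for the dockerised cluster, and the delta-core version is illustrative and must match your Spark version):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-dev")
    .master("spark://spark-master:7077")  # placeholder master URL for the dockerised cluster
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")  # illustrative version
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Quick check: write and read back a small Delta table
spark.range(10).write.format("delta").mode("overwrite").save("/tmp/delta-check")
spark.read.format("delta").load("/tmp/delta-check").show()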