Category: Azure Data Platform

How Azure Storage Cheats Over the CAP Theorem

Microsoft claims that Azure Storage provides both high availability and strong consistency. That sounds good, but it appears to violate the CAP theorem, as the ‘P’ (network partitioning) is not avoidable in the real world. In theory, you can only achieve either high availability or strong consistency in a distributed storage system. I have done a bit of research and found out how Azure Storage works around the CAP theorem.

The figure below shows the high-level architecture of Azure Storage (from the SOSP paper authored by the Microsoft product team).

Azure Storage stores data in Storage Stamps. Each Storage Stamp physically consists of a cluster of N racks of storage nodes, and each rack is built as a separate fault domain. There are three functional layers in each Storage Stamp: the Front-Ends, the Partition Layer and the Stream Layer. The Stream Layer is where the data is stored; it can be thought of as a distributed file system layer in which “Streams” are ordered lists of storage chunks, namely Extents. Extents are the units of replication and are distributed across different nodes in a storage stamp to support fault tolerance. Similar to Hadoop, the default replication policy in Azure Storage is to keep three replicas of each extent.

For Azure Storage to successfully cheat the CAP theorem and provide high availability and strong consistency at the same time under network partitioning, the stream layer needs to respond to client requests immediately, and reads from any replica of an extent need to return the same data. In the SOSP paper, Microsoft describes the solution implemented in Azure Storage as a layered design, with the stream layer responsible for high availability and the partition layer responsible for strong consistency. However, I have to admit I was confused by the explanation presented in the SOSP paper.

Instead, I found the core idea behind the Azure Storage implementation to be rather simple: offer strong consistency at the cost of higher latency; in other words, enforce data replication on the critical path of write requests. The figure below shows the journey of a write request at the stream layer.

In a nutshell, extents are append-only and immutable. For an extent, every append is replicated three times across the extent replicas synchronously. A write request can only be marked as successful when the writes to all replicas have succeeded. Compared to the asynchronous replication that is often used to offer eventual consistency, synchronous replication will no doubt cause higher write latency. However, the append-only data model should be able to compensate for that.
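To make the idea concrete, here is a toy Python sketch (not Azure Storage code, just an illustration of the principle) of an append that is only acknowledged after all three replicas have accepted it:

class Replica:
    def __init__(self):
        self.extent = []  # append-only list of blocks

    def append(self, block):
        self.extent.append(block)
        return True

def append_synchronously(block, replicas):
    # The append sits on the critical write path: it succeeds only if every
    # replica accepts it, so a subsequent read from any replica returns the
    # same data (strong consistency, at the cost of extra write latency).
    return all(replica.append(block) for replica in replicas)

replicas = [Replica() for _ in range(3)]
assert append_synchronously("block-1", replicas)
assert all(r.extent == ["block-1"] for r in replicas)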

It is worth mentioning that synchronous replication is only supported within a storage stamp. Inter-stamp replication still takes an asynchronous approach. This design decision is understandable, as having geo-replication on the synchronous critical write path would cause much worse write latency.

dqops – Query Databricks Database Schema through SQL Connector for Python

dqops Data Quality Studio (DQS) is one of the R&D projects I have been working on in my spare time. I plan to note down some of the tips & tricks I use in this project in future blog posts from time to time.

Databricks is one of the main data services that the dqops DQS is going to support. This blog post notes down the approach for querying Databricks database schemas using the “Databricks SQL Connector for Python” library.

The “Databricks SQL Connector for Python” library supports querying Databricks databases through the Spark Thrift Server (STS), which allows JDBC/ODBC clients to execute SQL queries over the JDBC or ODBC protocols on Apache Spark. A similar approach has been used by dqops DQS to connect to on-premises Spark clusters with the jaydebeapi library. However, the “Databricks SQL Connector for Python” library simplifies the setup and does not need a JDBC driver installed on the client environment.

Setup

The setup of the “Databricks SQL Connector for Python” library is simple. You just need to include the Python library reference “databricks-sql-connector==0.9.0” in the requirements.txt of the client environment from where your code needs to query the Databricks databases. Alternatively, you can manually install the library using “pip install databricks-sql-connector”.

Databricks Connection

The “Databricks SQL Connector for Python” library uses the same connection interface as the other JDBC/ODBC Python libraries, such as pyodbc and jaydebeapi. That makes it very easy to build the Databricks connector on top of the existing dqops data connection framework I have built, based on the open-source sodasql Dialect module, to support different types of data repositories, such as Azure SQL Database, Databricks, Azure Synapse, and Snowflake.

To create a Databricks connection, we just need to import the “sql” module and run the “connect” function.

The “connect” function requires the hostname and http_path of the Thrift server deployed on the target Databricks cluster, as well as a Databricks access token. The hostname and http_path can be found in the “Advanced Options” section of the cluster settings page.

A Databricks access token can be generated on the “User Settings” page of the Databricks workspace.
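Putting those three settings together, a minimal connection sketch looks like this (the hostname, HTTP path and token below are placeholders, not real values):

from databricks import sql

connection = sql.connect(
    server_hostname="adb-1234567890123456.7.azuredatabricks.net",      # placeholder
    http_path="sql/protocolv1/o/1234567890123456/0123-456789-abc123",  # placeholder
    access_token="<personal-access-token>",                            # placeholder
)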

As dqops DQS needs to allow end users to create and test connections based on a specific data source type, a UI component has been created that generates the data connection form based on predefined connection type metadata.

Query Metadata

After the connection instance is created, we can run SQL commands against the Databricks databases using the “execute_sql” function, with the SQL statement passed in as the parameter.

Spark provides a set of SQL commands for reading database/table/column metadata. In dqops DQS, the “SHOW TABLES” command is used to read the list of tables in a specified database, and the “DESCRIBE TABLE” command is used to read the column names and types of a data table.
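Using the connector’s cursor API directly (rather than the dqops “execute_sql” wrapper) and reusing the connection created above, a minimal sketch of reading the metadata looks like this, with “my_database” and “my_table” as placeholder names:

cursor = connection.cursor()

cursor.execute("SHOW TABLES IN my_database")
tables = cursor.fetchall()    # one row per table in the database

cursor.execute("DESCRIBE TABLE my_database.my_table")
columns = cursor.fetchall()   # column name and type per row

cursor.close()
connection.close()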

More Spark SQL commands related to metadata can be found here.

The snapshot below shows the metadata UI in the dqops DQS that is built using the Databricks SQL Connector for Python.

Setup a Dockerised Spark Development Environment with VS code and Docker

Databricks is not cheap, especially when I need to use it for my personal R&D work (where, unfortunately, the money has to come out of my own pocket). Therefore, I have been developing in a dockerised Spark environment for a while, and I found this way actually works well. Here I list the steps to set the environment up with VS code and Docker.

1. Setup VS code remote development in Docker containers

Please refer to this guide for setting up VS code remote development in Docker containers. In addition, docker-compose needs to be installed.

2. Setup base Spark cluster docker images

There are a number of open-source Spark cluster docker images available online that can be used as the base for our dev environment. After some hands-on tests, I found that this one from André Perez suits best; it consists of a Spark base image, a Spark master image, a Spark worker image, and a JupyterLab image. A simulated Hadoop distributed file system is also configured as the storage.

[Figure: cluster architecture of the base Spark docker images]

Apart from the Dockerfiles, there are also a build.sh and a build.yml for configuring and building the images. In addition, a docker-compose file is provided to spin up the cluster.

3. Customise the Spark cluster

We can then customise the Spark cluster to suit the requirements of our dev environment. To give an example, here is a list of customisations I made for my dev environment:

Add the additional components/services required for the development, such as Microsoft SQL Server (docker image), Hadoop (docker image), and OpenLDAP (docker image). You can add the build commands for those additional images to the build.sh and build.yml scripts to simplify the image build process. In addition, you need to add entries (corresponding to each image) in the docker-compose file.

Customise the JupyterLab container as the primary dev container. The Dockerfile of the JupyterLab image already contains the installation of the libraries necessary for Spark development. You can install the additional libraries required for your development by extending this Dockerfile. For example, I have added pyodbc to connect to MS SQL Server, the Spark JDBC jar to connect to Spark Thrift Server, and a requirements.txt file containing all the dependency Python libraries I need.

# copy the Python dependency list and the Spark JDBC driver into the image
COPY docker/jupyterlab/requirements.txt /opt/req/requirements.txt
COPY docker/jupyterlab/jdbc/SparkJDBC42.jar /opt/req/SparkJDBC42.jar

# register the Microsoft package feed, install the MS SQL ODBC driver,
# then install pyodbc and the remaining Python dependencies
RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
    curl https://packages.microsoft.com/config/debian/9/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
    apt-get update && \
    ACCEPT_EULA=Y apt-get -y install msodbcsql17 && \
    apt-get -y install unixodbc-dev && \
    pip3 install pyodbc && \
    pip3 install -r /opt/req/requirements.txt

# expose the Spark JDBC driver on the Java classpath (same path as the COPY above)
ENV CLASSPATH /opt/req/SparkJDBC42.jar

4. (Optional) Push images to Azure Container Registry

Please refer to this doc.

5. Build images

Extend the build.sh and build.yml to include entries for the additional images you have added, and then run the build.sh shell script.

6. Map the source file folder to the working folder in the dev container

Map your source file folder (on your local machine) to a folder in the dev Docker container (extended from the original JupyterLab container) by configuring the “volumes” of the dev container entry in your docker-compose file.

7. Run docker-compose up to start containers
8. Attach VS code to the dev container

Start VS code window, open the “command palette”, and select “Remote-Containers: Attach to Running Container…”.

After the VS code window is attached to the dev container, install the “Python Extension for Visual Studio Code” (install it inside the dev container, not on the local host machine).

Open the working folder you mapped in step 6.

Then you have a dev environment in a docker container where you can author, run, and debug your code.

Here is the code for creating a SparkSession instance with support for Delta Lake.
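A minimal sketch of that code, assuming the standard Delta Lake Spark configurations (the Delta package version and the master URL of the dockerised cluster are examples and should match your own setup):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("spark-dev")
    .master("spark://spark-master:7077")  # example master URL of the dockerised cluster
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")  # example version
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)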

Why I Prefer Hand-Coded Transformations over ADF Mapping Data Flow

Firstly, I need to clarify that what I am discussing in this blog post is only ADF Mapping Data Flow, not the whole ADF service. I am not going to challenge ADF’s role as the superb orchestration service in the Azure data ecosystem. In fact, I love ADF. At the control flow level, it is neat, easy to use, and can be very flexible if you are a little bit creative and imaginative. To avoid potential confusion, I define the “Hand-Coded Transformations” and the “ADF Mapping Data Flow” in this blog post as:

  • Hand-Coded Transformations – Programming transformation logic in code (PySpark, SQL, or others) and orchestrating the execution of that code using ADF data transformation activities in the control flow, such as the Databricks activities, the Stored Procedure activity, and the Custom activity.
  • ADF Mapping Data Flow – Programming transformation logic using the ADF Mapping Data Flow visual editor.

I totally understand why people prefer using a GUI-based, ready-to-use, code-free ETL tool over manually crafting data wrangling/transformation code. Intuitively, this is a no-brainer choice for people who view it from 10,000 feet. However, the devil is in the details. Here is the list of reasons why I prefer hand-coded transformations to ADF Mapping Data Flow:

  • One of the main drivers for using ADF Mapping Data Flow is probably its simplicity and flat learning curve. This may be true for simple, generic transformation logic. However, for more complicated, domain-specific requirements, ADF Mapping Data Flow is not necessarily simpler than writing code in SQL or Python. It either requires you to create complex data flow logic or you eventually have to embed custom code in the data flow anyway. On the other hand, the hand-coded approach can be much simpler thanks to the flexibility of custom coding and the huge number of ready-to-use data wrangling/transformation libraries.
  • You may argue that ADF Mapping Data Flow is for people who don’t know how to code. However, ADF does not play the role of a self-service ETL tool in the Azure data ecosystem (Power Query does). The target users of ADF should be data engineers or DataOps, not data analysts. I understand some junior data engineers may resist coding and shy away without even trying. ADF Mapping Data Flow gives them an opportunity to escape and actually deprives them of the motivation to learn coding.
  • We can implement some validation logic in an ADF Mapping Data Flow, but it would be very high-level and simple. That does not allow us to conduct comprehensive unit tests on the transformation logic.
  • One of the main reasons I prefer coding over a visual editor is reusability, including the reusability of domain-specific business logic and the reusability of domain-independent non-functional logic, such as auditing, QA and exception handling. As ADF Mapping Data Flow works in a declarative paradigm, it is difficult to abstract and generalise the logic implemented in it. On the other hand, abstraction and generalisation can be easily implemented with custom coding.
  • Custom coding allows you to build configurable, metadata-driven pipelines. A configurable, metadata-driven design can significantly improve the maintainability of your ADF pipelines. One of my previous blog posts describes some of my early designs for metadata-driven ADF pipelines with custom coding in SQL. If you are interested, you can find the blog post here.
  • The last but also the most important reason for me to favour the hand-coded approach is the control and flexibility it gives me to withstand the uncertainty in my projects. Just like any other IT project, it is not realistic to predict and control all the possible situations in a data engineering project. There are always surprises popping up, or user requirements change. A code-free tool like ADF Mapping Data Flow hides the implementation under the hood and requires you to work its way. If an unpredicted situation arises that the code-free tool cannot handle, it is expensive to change tools at a late stage of a project. Compared to a code-free tool, hand-coding gives you more flexibility and resilience to handle unpredicted situations.

Create Custom Partitioner for Spark Dataframe

Spark dataframes provide the repartition function to partition a dataframe by a specified column and/or into a specified number of partitions. However, for some use cases, the repartition function doesn’t work in the way that is required. For example, in the previous blog post, Handling Embarrassing Parallel Workload with PySpark Pandas UDF, we want to repartition the traveller dataframe so that the travellers from a travel group are placed into the same partition. As it takes a decent amount of time to run the group travel planning algorithm on each travel group, we want the travel groups to be evenly spread across the partitions so that the tasks running in parallel take a similar amount of time.

However, when running the repartition function on the traveller dataframe, the travel groups are not evenly distributed.
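For reference, the repartition call in question is along these lines (“traveller_df”, “num_groups” and the “GroupID” column are names assumed from the earlier post):

# request one partition per travel group, partitioned by the GroupID column
repartitioned_df = traveller_df.repartition(num_groups, "GroupID")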

[Figures: repartitioning the traveller dataframe by GroupID and the resulting partitions]

After printing the content of each partition, we can see that the travellers from the same travel group are indeed placed into the same partition; however, some partitions are allocated more travel groups than others, while some partitions are empty with no travel group allocated.

[Figure: partition contents showing some partitions with multiple travel groups and others empty]

The reason for that is the default partitioner, HashPartitioner, used by the dataframe. The HashPartitioner decides the partition by calculating the mod of the hash code of the partition key (e.g., “GroupID” in our example) and the number of partitions. Depending on the hash codes of the travel group ids, multiple travel groups may end up with the same mod value.

[Figure: hash-and-mod partition assignment of the travel group ids]
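A toy illustration of the problem (using Python’s built-in hash rather than Spark’s hash function, purely to show the mod behaviour):

num_partitions = 4
for group_id in ["G1", "G2", "G3", "G4"]:
    # different group ids can share the same remainder and therefore the same
    # partition, which leaves other partitions empty
    print(group_id, "->", hash(group_id) % num_partitions)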

To enforce evenly distributed partitions, we can create a custom partitioner that returns a unique integer value for each travel group. For example, as the following snapshot shows, we can first define a dictionary to map the id of a travel group to the index of that travel group in the list of all travel groups. In this way, the mapped index is unique for each travel group. We then create a custom partitioner that takes the group id as a parameter, uses it to get the mapped index from the dictionary, and returns the index as the output of the custom partitioner.

[Figure: the group-id-to-index dictionary and the custom partitioner]
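A minimal sketch of that idea, assuming “df” is the traveller dataframe with a “GroupID” column (the names are illustrative, not the exact code in the snapshot):

# map each travel group id to its index in the list of distinct group ids,
# so every group gets its own unique partition number
group_ids = [row["GroupID"] for row in df.select("GroupID").distinct().collect()]
group_index = {group_id: index for index, group_id in enumerate(group_ids)}

def group_partitioner(group_id):
    # the custom partitioner: returns a unique integer per travel group
    return group_index[group_id]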

Now we have the custom partitioner created; however, there is no function available for applying a custom partitioner to a dataframe. Instead, we can use the partitionBy function of the RDD associated with the dataframe, i.e. df.rdd.partitionBy({number of partitions}, {custom partitioner}), to apply the custom partitioner. As the partitionBy function only supports key-value paired RDDs, we first need to map the data rows in the RDD to key-value pairs, where the key is the group id and the value is the data row.

[Figure: mapping the data rows to key-value pairs keyed by group id]

We can then call the partitionBy function on the key-value paired RDD, where the RDD is partitioned by the key (group id). Lastly, we need to convert the partitioned RDD back to a dataframe with the createDataFrame function. Before converting the partitioned RDD, we need to map the key-value paired RDD back to a row-based RDD.

[Figure: applying partitionBy and converting the partitioned RDD back to a dataframe]
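Continuing the sketch above (reusing “df”, “group_ids” and “group_partitioner”, and assuming an active “spark” session):

num_partitions = len(group_ids)

row_rdd = (
    df.rdd
    .map(lambda row: (row["GroupID"], row))          # key-value pairs keyed by group id
    .partitionBy(num_partitions, group_partitioner)  # apply the custom partitioner
    .map(lambda pair: pair[1])                       # map back to a row-based RDD
)
partitioned_df = spark.createDataFrame(row_rdd)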

After printing the content of the new RDD, we can see that the travel groups are now evenly distributed across the partitions.

[Figure: partition contents showing the travel groups evenly distributed]

Configuration-Driven Azure Data Factory Pipelines

In this blog post, I will introduce two configuration-driven Azure Data Factory pipeline patterns I have used in my previous projects: the Source-Sink pattern and the Key-Value pattern.

The Source-Sink pattern is primarily used for parameterising and configuring data movement activities, with the source and sink locations of the data movement configured in a database table. In Azure Data Factory, we can use the Copy activity to copy a data entity (a database table or a text-based file) from one location to another, and we need to create an ADF dataset for the source and an ADF dataset for the sink. This can easily turn into a nightmare when we have a large number of data entities to copy and have to create a source dataset and a sink dataset for each of them. With the Source-Sink pattern, we only need to create a generic ADF dataset for each data source type and dynamically refer to the data entity to copy, with the source and sink settings configured in a separate configuration database.

The Key-Value pattern is used for configuring the settings of activities in Azure Data Factory, such as the notebook file path of a Databricks Notebook activity or the URL of a Logic App HTTPS event. Instead of hard-coding the settings in the ADF pipeline itself, the Key-Value pattern allows us to configure the activity settings in a database table, load the settings, and dynamically apply them to the ADF activities at runtime.

In the rest of this blog post, I will introduce the implementation of those two patterns in Azure Data Factory.

Source-Sink Pattern

To implement the Source-Sink pattern in Azure Data Factory, the following steps need to be followed:

  • Step 1 – create data movement source-sink configuration table
  • Step 2 – create generic source and sink datasets
  • Step 3 – use Lookup activity to fetch the source-sink settings of the data entities for copy from the configuration table
  • Step 4 – use ForEach activity to loop through the data entities for copy
  • Step 5 – add a generic Copy activity inside the ForEach activity

Step 1 – create data movement source-sink configuration table

Create a configuration database (an Azure SQL database is a good option), and create a table with columns to define the data movement category, the source location, and the sink location.

[Figure: the source-sink configuration table]

Along with the configuration table, we need to create a stored procedure for the ADF Lookup activity to retrieve the source-sink settings of the data entities in a specified copy category.

[Figure: the stored procedure that retrieves the source-sink settings]

Step 2 – create generic source and sink datasets

Create generic source and sink datasets. To make the datasets generic, we need to create a number of dataset parameters that define the location of the table or file referred to by the dataset. Those parameters take the values fetched from the configuration database table at runtime.

[Figure: parameters of the generic dataset]

On the Connection tab of the generic dataset, we need to specify the table name or file container/path/name using the dataset parameters we just defined.

[Figure: the Connection tab of the generic dataset using the dataset parameters]

Step 3 – use Lookup activity to fetch the source-sink settings 

Add a Lookup activity and set it to use the stored procedure we created earlier for retrieving the source-sink settings of all the data entities to copy in a specified category.

[Figures: the Lookup activity configured to call the stored procedure]

The output of the Lookup activity will contain the source-sink settings of all the data entities in the specified category, as defined in the configuration database table.

[Figure: output of the Lookup activity]

Step 4 – use ForEach activity to loop through the data entities for copy

Add and chain a ForEach activity after the Lookup activity, and set the Items property of the ForEach activity to the output of the Lookup activity, @activity(‘Lookup Configuration’).output.value, so that the ForEach activity loops through the source-sink settings of all the data entities fetched from the configuration database table.

[Figure: the Items property of the ForEach activity]

Step 5 – add a generic Copy activity inside the ForEach activity

Inside the ForEach activity, add a Copy activity with the Source and Sink pointing to the generic source dataset and sink dataset we created earlier, and pass in the corresponding source-sink settings of the current data entity item, such as @item().Sink_Item for the file name of the sink.

[Figure: the Copy activity using the generic datasets and the current item settings]

At this point, the Source-Sink pattern is all set. When the pipeline runs, a copy operation is created and executed for each data entity defined in the configuration database table.

[Figure: pipeline run showing a copy executed per configured data entity]

Key-Value Pattern

To implement the Key Value pattern in Azure Data Factory, the following steps need to be followed:

  • Step 1 – create activity settings key-value configuration table
  • Step 2 – create stored procedure to transform the key-value table
  • Step 3 – use Lookup activity to retrieve activity settings from configuration table
  • Step 4 – apply settings on ADF activities

Step 1 – create activity settings key-value configuration table

Create a database table with columns defining the pipeline name, activity type, activity name, activity setting key, and activity setting value.

[Figure: the key-value configuration table]

Step 2 – create stored procedure to transform the key-value table 

The ADF Lookup activity does not work well with a key-value shaped dataset. Therefore, we need to convert the key-value table into a single data row, with the column headers being the activity setting keys and the column values being the activity setting values.

[Figure: the key-value table converted into a single row]

We can use the T-SQL PIVOT operator to convert the key-value table into a single-row table. However, as the number of records in the key-value table is not fixed, we cannot hard-code the keys to pivot on. Instead, we need to fetch the keys from the key-value table first and author the PIVOT operation in dynamic SQL. The snapshot below shows the stored procedure I have created for converting the key-value table.

[Figure: the stored procedure that pivots the key-value table]

Step 3 – use Lookup activity to retrieve activity settings from configuration table

In the Azure Data Factory pipelines you are creating, at the start of the activity chain, add a Lookup activity and use the stored procedure created earlier to retrieve the key-value table and convert it into the single-row format readable by the Lookup activity. On the Lookup activity, tick ‘First row only’, as the result of the Lookup activity will be a single-row dataset.

[Figure: the Lookup activity with ‘First row only’ ticked]

The snapshot below shows what the output of the Lookup activity looks like.

[Figure: output of the Lookup activity]

Step 4 – apply settings on ADF activities

From the output of the Lookup activity, the other ADF activities chained after it can access the value of an activity setting as @activity(‘{lookup activity name}’).output.firstRow.{column name for the key}.

[Figure: referencing the Lookup output in a downstream activity setting]