Tag: Databricks/Spark

dqops – Query Databricks Database Schema through SQL Connector for Python

dqops Data Quality Studio (DQS) is one of the R&D projects I have been working on in my spare time. I plan to note down some of the tips & tricks I use in this project in future blog posts from time to time.

Databricks is one of the main data services that the dqops DQS is going to support. This blog post notes down the approach for querying Databricks database schemas using the “Databricks SQL Connector for Python” library.

The “Databricks SQL Connector for Python” library supports querying Databricks databases through the Spark Thrift Server (STS), which allows JDBC/ODBC clients to execute SQL queries on Apache Spark over the JDBC or ODBC protocols. A similar approach has been used by dqops DQ studio to connect to an on-premises Spark cluster with the jaydebeapi library. However, the “Databricks SQL Connector for Python” library simplifies the setup and does not need a JDBC driver to be installed in the client environment.

Setup

The setup of the “Databricks SQL Connector for Python” library is simple. You just need to include the Python library reference “databricks-sql-connector==0.9.0” in the requirements.txt of the client environment from which your code needs to query the Databricks databases. Alternatively, you can manually install the library using “pip install databricks-sql-connector”.

Databricks Connection

The “Databricks SQL Connector for Python” library uses the same connection interface as other JDBC/ODBC Python libraries, such as pyodbc and jaydebeapi. That makes it very easy to build the Databricks connector on top of the existing dqops data connection framework that I have built, based on the open-source sodasql Dialect module, to support different types of data repositories, such as Azure Database, Databricks, Azure Synapse, and Snowflake.

To create a Databricks connection, we just need to import the “sql” module and call the “connect” function (lines 11-15 in the following figure).

The “connect” function requires the hostname and http_path of the Thrift server deployed in the target Databricks cluster, as well as a Databricks access token. The hostname and http_path can be found in the “Advanced Options” section of the cluster settings page.

The Databricks access token can be generated in the “User Settings” page of the Databricks workspace.
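
As a minimal sketch of the connection step described above, the three values can be collected in one place and passed to the “connect” function. The environment variable names and the two helper functions are hypothetical and only for illustration; the keyword arguments are the ones the connector expects.

```python
import os


def connection_params_from_env(env=os.environ):
    """Collect the three values that the connect function needs.

    The environment variable names are hypothetical; use whatever your
    deployment provides.
    """
    return {
        "server_hostname": env["DATABRICKS_SERVER_HOSTNAME"],
        "http_path": env["DATABRICKS_HTTP_PATH"],
        "access_token": env["DATABRICKS_TOKEN"],
    }


def open_connection(params):
    """Open a connection with the Databricks SQL Connector for Python."""
    # Imported lazily so this module still loads where the connector is absent.
    from databricks import sql

    return sql.connect(**params)
```

Keeping the token in an environment variable, or a secret store, avoids hard-coding it in the source files.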

As dqops DQS needs to allow end-users to create and test connections for a specific data source type, a UI component has been created that generates the data connection form from predefined connection type metadata.

Query Metadata

After the connection instance is created, we can run SQL commands against the Databricks databases using the “execute_sql” function, passing in the SQL statement as the parameter.

Spark provides a set of SQL commands to read database/table/column metadata. In dqops DQS, the “SHOW TABLES” command is used to read the list of tables in a specified database, and the “DESCRIBE TABLE” command is used to read the column names and types of a data table.

More Spark SQL commands related to metadata can be found here.
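
The two metadata commands can be sketched with the connector's cursor interface as follows. The helper names (run_query, list_tables, list_columns) are hypothetical and are not the dqops “execute_sql” function; the sketch assumes that “SHOW TABLES” returns the table name in the second column and that “DESCRIBE TABLE” returns the column name and data type in the first two columns.

```python
def run_query(connection, statement):
    """Execute one SQL statement and return all result rows."""
    cursor = connection.cursor()
    try:
        cursor.execute(statement)
        return cursor.fetchall()
    finally:
        cursor.close()


def list_tables(connection, database):
    """Return the table names in a database."""
    # Assumed row layout: (database, tableName, isTemporary).
    rows = run_query(connection, f"SHOW TABLES IN `{database}`")
    return [row[1] for row in rows]


def list_columns(connection, database, table):
    """Return (column name, data type) pairs for a table."""
    # Assumed row layout: (col_name, data_type, comment).
    rows = run_query(connection, f"DESCRIBE TABLE `{database}`.`{table}`")
    columns = []
    for row in rows:
        name = row[0]
        # Partitioned tables append a blank row and "# ..." header rows,
        # followed by the partition columns again, so stop at the first one.
        if not name or name.startswith("#"):
            break
        columns.append((name, row[1]))
    return columns
```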

The snapshot below shows the metadata UI in the dqops DQS that is built using the Databricks SQL Connector for Python.

Setup a Dockerised Spark Development Environment with VS code and Docker

Databricks is not cheap, especially when I need to use it for my personal R&D work (where unfortunately the money has to come out of my own pocket). Therefore, I have been developing in a dockerised Spark environment for a while, and I have found that it works well. Here I list the steps to set the environment up with VS code and Docker.

1. Setup VS code remote development in Docker containers

Please refer to this guide for setting up VS code remote development in Docker containers. In addition, docker-compose needs to be installed.

2. Setup base Spark cluster docker images

There are a number of open-source Spark cluster docker images available online that can be used as the base for our dev environment. After some hands-on tests, I found that this one from André Perez suits best. It consists of a Spark base image, a Spark master image, a Spark worker image, and a JupyterLab image. A simulated Hadoop distributed file system is also configured as the storage.

cluster-architecture.png

Apart from the Dockerfiles for the images, there are also a build.sh and a build.yml for configuring and building the images. In addition, a docker-compose file is provided to start the cluster.

3. Customise the Spark cluster

We can then customise the Spark cluster to suit the requirements of our dev environment. To give an example, here is a list of customisations I made for my dev environment:

Add additional components/services required for the development, such as Microsoft SQL Server (docker image), Hadoop (docker image), OpenLDAP (docker image). You can add the build commands of those additional images to the build.sh and build.yml scripts to simplify the image build process. In addition, you need to add an entry for each image in the docker-compose file.

Customise the JupyterLab container as the primary dev container. The Dockerfile of the JupyterLab image already installs the libraries necessary for Spark development. You can install the additional libraries required for your development by extending this Dockerfile. For example, I have added pyodbc to connect to mssql, the SparkJDBC jar to connect to Spark Thrift Server, and the requirements.txt file containing all the Python libraries I depend on.

COPY docker/jupyterlab/requirements.txt /opt/req/requirements.txt
COPY docker/jupyterlab/jdbc/SparkJDBC42.jar /opt/req/SparkJDBC42.jar

RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
    curl https://packages.microsoft.com/config/debian/9/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
    apt-get update && \
    ACCEPT_EULA=Y apt-get -y install msodbcsql17 && \
    apt-get -y install unixodbc-dev && \
    pip3 install pyodbc && \
    pip3 install -r /opt/req/requirements.txt

# Point CLASSPATH at the jar copied above
ENV CLASSPATH=/opt/req/SparkJDBC42.jar

4. (Optional) Push images to Azure Container Registry

Please refer to this doc.

5. Build images

Extend build.sh and build.yml to include the entries for the additional images you have added, and then run the build.sh script.

6. Map the source file folder to the working folder in the dev container

Map your source file folder (on your local machine) to a folder in the dev Docker container (extended from the original jupyterlab container) by configuring “volumes” of the dev container entry in your docker-compose file.

7. Run docker-compose up to start containers

8. Attach VS code to the dev container

Start a VS code window, open the “command palette”, and select “Remote-Containers: Attach to Running Container…”.

After the VS code window is attached to the dev container, install the “Python Extension for Visual Studio Code” (install it inside the dev container, not on the local host machine).

Open the working folder you mapped in step 6.

You now have a dev environment in a Docker container where you can author, run, and debug your code.

Here is the code for creating a SparkSession instance with the support of Delta Lake.
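
The original listing is not reproduced on this page, so what follows is only a minimal sketch of one common way to do it. It assumes the pyspark and delta-spark pip packages are installed in the dev container; the function name, app name and master URL are hypothetical defaults.

```python
# Spark settings that enable Delta Lake's SQL extension and catalog.
DELTA_SPARK_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def create_spark_session(app_name="dev", master="local[*]"):
    """Create a SparkSession with Delta Lake support (a sketch)."""
    # Imported lazily so the module loads even where Spark is not installed.
    from delta import configure_spark_with_delta_pip
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).master(master)
    for key, value in DELTA_SPARK_CONF.items():
        builder = builder.config(key, value)
    # Adds the Delta Lake jars that match the installed delta-spark package.
    return configure_spark_with_delta_pip(builder).getOrCreate()
```

To run against the dockerised cluster instead of local mode, pass the URL of the Spark master container as the master argument.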

Create Custom Partitioner for Spark Dataframe

Spark dataframe provides the repartition function to partition the dataframe by a specified column and/or a specified number of partitions. However, for some use cases, the repartition function doesn’t work in the way required. For example, in the previous blog post, Handling Embarrassing Parallel Workload with PySpark Pandas UDF, we want to repartition the traveller dataframe so that the travellers from a travel group are placed into the same partition. As it takes a decent amount of time to run the group travel planning algorithm on each travel group, we also want the travel groups to be evenly spread across the partitions so that the tasks running in parallel take a similar amount of time.

However, when running the repartition function on the traveller dataframe, the travel groups are not evenly distributed.

spark_04_02

spark_04_04

After printing the content of each partition, we can see that the travellers from the same travel group are indeed placed into the same partition. However, some partitions are allocated more than one travel group while other partitions are empty.

spark_04_05

The reason is the default partitioner, HashPartitioner, used by the dataframe. The HashPartitioner decides the partition by calculating the hash code of the partition key (e.g., “GroupID” in our example) modulo the number of partitions. Depending on the hash codes of the travel group ids, multiple travel groups may end up with the same modulo value and therefore in the same partition.
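
The effect can be illustrated in plain Python. This sketch uses Python's built-in hash as a stand-in for Spark's hash function and hypothetical integer group ids (small integers hash to themselves in CPython, which keeps the result deterministic); it is not Spark's actual implementation.

```python
from collections import defaultdict


def assign_by_hash(keys, num_partitions):
    """Place each key in partition hash(key) % num_partitions."""
    partitions = defaultdict(list)
    for key in keys:
        partitions[hash(key) % num_partitions].append(key)
    return dict(partitions)


# Eight hypothetical group ids spread over four partitions.
group_ids = [3, 7, 11, 15, 4, 8, 2, 6]
print(assign_by_hash(group_ids, 4))
# → {3: [3, 7, 11, 15], 0: [4, 8], 2: [2, 6]}
```

Four groups collide in partition 3 while partition 1 receives none, which is the same kind of skew seen with the travel groups.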

spark_04_10

To enforce evenly distributed partitions, we can create a custom partitioner that returns a unique integer value for each travel group. For example, as the following snapshot shows, we can first define a dictionary that maps the id of a travel group to the index of that travel group in a list of all the travel groups. In this way, the mapped index is unique for each travel group. We then create a custom partitioner that takes the group id as its parameter, looks up the mapped index in the dictionary, and returns the index as its output.
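
A minimal sketch of such a partitioner, with hypothetical group ids and a hypothetical factory function name:

```python
def make_group_partitioner(group_ids):
    """Build a partitioner that maps each group id to its list index."""
    group_index = {group_id: index for index, group_id in enumerate(group_ids)}

    def group_partitioner(group_id):
        # The index is unique per group, so no two groups share a value.
        return group_index[group_id]

    return group_partitioner


travel_group_ids = ["Group-1", "Group-2", "Group-3", "Group-4"]
partitioner = make_group_partitioner(travel_group_ids)
print([partitioner(group_id) for group_id in travel_group_ids])
# → [0, 1, 2, 3]
```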

spark_04_06

Now that we have the custom partitioner, the next problem is that there is no function for applying a custom partitioner to a dataframe. Instead, we can use the partitionBy function of the RDD associated with the dataframe, i.e. df.rdd.partitionBy({number of partitions}, {custom partitioner}), to apply the custom partitioner. PySpark takes the value returned by the partitioner modulo the number of partitions, so the number of partitions needs to be at least the number of travel groups for each group to get its own partition. As the partitionBy function only supports key-value paired RDDs, we first need to map the data rows in the RDD to key-value pairs where the key is the group id and the value is the data row.

spark_04_09

We can then call the partitionBy function on the key-value paired RDD so that the RDD is partitioned by the key (group id). Lastly, we need to convert the partitioned RDD back to a dataframe with the createDataFrame function. Before the conversion, we need to map the key-value paired RDD back to a row-based RDD.
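
The three steps (key the rows, partition by key, unwrap and rebuild the dataframe) can be sketched as one function. The function and parameter names are hypothetical; spark is assumed to be an existing SparkSession, df a PySpark dataframe, and partitioner a function that maps a key to an integer.

```python
def repartition_by_group(spark, df, key_column, partitioner, num_partitions):
    """Repartition a dataframe with a custom partitioner via its RDD."""
    # 1. Map each row to a (key, row) pair, as partitionBy needs pairs.
    keyed = df.rdd.map(lambda row: (row[key_column], row))
    # 2. Partition by key; PySpark applies partitioner(key) % num_partitions.
    partitioned = keyed.partitionBy(num_partitions, partitioner)
    # 3. Drop the keys again and rebuild a dataframe with the same schema.
    rows = partitioned.map(lambda pair: pair[1])
    return spark.createDataFrame(rows, schema=df.schema)
```

A call could look like repartition_by_group(spark, travellers_df, "GroupID", group_partitioner, number_of_groups), where all four argument names are placeholders for your own objects.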

spark_04_07

After printing the content of the new RDD, we can see that the travel groups are now evenly distributed across the partitions.

spark_04_08

Handling Embarrassing Parallel Workload with PySpark Pandas UDF

Introduction

In the previous post, I walked through the approach to handle embarrassing parallel workload with Databricks notebook workflows. However, as all the parallel workloads run on a single node (the cluster driver), that approach can only scale up to a certain point, depending on the capability of the driver VM, and cannot split the workload across multiple worker nodes. In this blog post, I will walk through another approach to handle embarrassing parallel workload, using PySpark Pandas UDFs.

Embarrassing parallel workload refers to the type of problems that are easy to separate into parallel tasks, with no communication needed between those parallel tasks. An embarrassing parallel workload normally calculates similar things many times with different groups of data or different sets of parameters. The calculations are independent of each other and each calculation takes a fair amount of time. The datasets involved in embarrassing parallel workloads can be big, but in most use cases they are not at the “Big Data” scale.

The native parallelism mechanism of Apache Spark might not be an efficient way to handle embarrassing parallel workload due to the overhead of serialization and inter-process communication. In addition, many libraries commonly used in the Embarrassing Parallel use cases, such as numpy and scikit-learn, do not operate natively on distributed Spark dataframes. However, the Pandas UDFs supported by PySpark can be a feasible way to handle embarrassing parallel workload.

Approach

Unlike the PySpark UDFs, which operate row-at-a-time, grouped map Pandas UDFs operate in the split-apply-combine pattern: a Spark dataframe is split into groups based on the conditions specified in the groupBy operator, a user-defined Pandas UDF is applied to each group, and the results from all groups are combined and returned as a new Spark dataframe. Embarrassing parallel workload fits into this pattern well, because the calculation of an embarrassing parallel workload can be encapsulated into a Pandas UDF. The dataset involved in the workload is loaded into a PySpark dataframe and split into groups, and the calculation on each group of data is executed in the Pandas UDF, with Spark tasks running on separate executors in parallel.
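
As a rough sketch of the pattern, the following uses applyInPandas, the grouped map form available in Spark 3.0 and later, which may differ from the decorator style used in the sample notebook. The column names, the output schema and the placeholder calculation are assumptions for illustration only.

```python
def plan_one_group(pdf):
    """Apply step: runs once per travel group on a pandas DataFrame."""
    # Placeholder calculation (an assumption): record the size of the group.
    # A real workload would run the optimisation algorithm here.
    return pdf.assign(GroupSize=len(pdf))


def plan_all_groups(df):
    """Split by GroupID, apply plan_one_group, and combine the results."""
    return df.groupBy("GroupID").applyInPandas(
        plan_one_group,
        schema="GroupID string, Name string, Origin string, GroupSize long",
    )
```

The schema string must describe the columns returned by the per-group function, because Spark cannot infer them from the Python code.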

This blog post demonstrates the Pandas UDF approach with the same example I used in the previous blog post to explain the approach for handling embarrassing parallel workload with Databricks notebook workflows.

The sample notebook I have created for this blog post can be found here in my Github repository. The snapshot below shows the core part of the notebook:

databricks_2_002

A Pandas UDF, “planGroupTravelling” (Line 6-12), is created to execute the group travel planning optimisation algorithm from Toby Segaran’s “Collective Intelligence” book for one travel group.

The dataset of all travel groups used in this example is converted into a Spark dataframe where each row contains the fields travel group id, traveler name, and travel origin (Line 14). For this example, a utility function, “travelGroupsToDataframe“, is created to convert the original dataset in Python dictionary format to a Spark dataframe (the code for the “travelGroupsToDataframe” function can be found in the sample notebook for this blog post here). The snapshot below shows the converted Spark dataframe, i.e. the input dataframe for the parallel workload.

databricks_2_004

The dataframe is split by travel group id and the “planGroupTraveling” Pandas UDF is applied to each group (Line 15). The results from each UDF, the optimised travelling arrangement for each traveler, are combined into a new Spark dataframe.

databricks_2_003

Conclusion

This blog post describes another approach for handling embarrassing parallel workload using PySpark Pandas UDFs. Grouped map Pandas UDFs operate in the split-apply-combine pattern, which distributes calculation logic written in native Python to separate executors and runs it in parallel. This pattern fits well with embarrassing parallel workload.

Handling Embarrassing Parallel Workload with Databricks Notebook Workflows

Introduction

Embarrassing Parallel refers to problems where little or no effort is needed to separate the problem into parallel tasks, and no communication is needed between the parallel tasks. Embarrassing parallel problems are very common, with typical examples including group-by analyses, simulations, optimisations, cross-validations and feature selections. Normally, an Embarrassing Parallel workload has the following characteristics:

  • Calculate similar things many times with different groups of data or different parameter sets
  • Calculations are independent of each other
  • Each calculation takes a fair amount of time
  • The datasets involved in the calculation can be big, but in most use cases, the datasets are not at the “Big Data” scale.

The native parallelism mechanism of Apache Spark might not be an efficient way to handle the Embarrassing Parallel workload due to the overhead of serialization and inter-process communication, especially when the dataset is not big but the calculation is complex. Based on benchmark tests conducted by Dorian Beganovic, Spark performed much worse on small or medium-sized datasets than Pandas running on a single machine. Dorian suggested >500GB as the dataset size at which Spark becomes suitable. In addition, many libraries commonly used in the Embarrassing Parallel use cases, such as numpy and scikit-learn, do not operate natively on distributed Spark dataframes.

One alternative solution is to take advantage of Databricks Notebook workflows to handle the Embarrassing Parallel workloads. Basically, one generic Databricks Notebook is created to run the calculation for one parallel task against one partition of the entire dataset or one set of parameters. The partition of the dataset or the set of parameters is specified by Notebook parameters. In this blog post, these generic Databricks notebooks that run the calculation logic in parallel are referred to as Parallel Notebooks. The Parallel Notebooks are triggered by another Databricks Notebook, which is named the Master Notebook in this blog post. The Master Notebook encapsulates the logic for partitioning the dataset or creating the parameter sets, and launches the Parallel Notebooks to execute the calculation logic against the partitioned dataset or parameter sets in parallel.

Approach

This blog post walks through the detailed steps to handle Embarrassing Parallel workloads using Databricks Notebook workflows. You can find the sample Databricks notebooks I created for this blog post here.

Baseline Test

We first run a baseline test that executes the calculation workload in sequence, and then create notebook workflows to run the workload in parallel and compare the total execution time.

For the calculation task, I will use the group travel planning optimisation example from Toby Segaran’s “Collective Intelligence” book. You can find the Python code encapsulating the optimisation algorithms here. In this baseline test, I have created eight travel groups and stored them in a dictionary with the group ID as the dictionary key and the travel group data as the dictionary value. A for loop is created to loop through the travel groups and run the optimisation algorithm for each group one by one. The result shows that it took 4.99 minutes to run through all eight groups.

ADB_1_001

Now that we have the execution time for the sequential workload, we can start to handle the workload in parallel with Databricks notebook workflows to see if the total execution time can be reduced with the parallel approach.

Parallel Notebook

We first create the parallel notebook and include the optimisation algorithm code snippet from Toby Segaran’s “Collective Intelligence” book, which will be used for finding the optimised travel plan for a travel group.

We then create a “groupIDs” parameter which takes the ids of the travel groups to process in this parallel task.

ADB_1_009

The value of the “groupIDs” parameter is used to retrieve the data of the specified travel groups. In this example, for simplicity, I just used a Python dictionary to store the data in the file. In real-world scenarios, the data is more likely to be stored externally, such as in a database or data lake. In the generic notebook, the optimisation algorithm is only run on the travel groups specified by the parameter.
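
A minimal sketch of how the parameter value could be turned into the travel groups to process. The function names and the shape of the travel group dictionary are hypothetical; the parameter is assumed to be a comma-separated string such as 'Group-1,Group-2'.

```python
def parse_group_ids(raw_group_ids):
    """Split a comma-separated parameter value into a list of group ids."""
    return [part.strip() for part in raw_group_ids.split(",") if part.strip()]


def select_travel_groups(all_groups, raw_group_ids):
    """Keep only the travel groups named in the notebook parameter."""
    return {gid: all_groups[gid] for gid in parse_group_ids(raw_group_ids)}


# In a Databricks notebook the raw value would come from the widget, e.g.
#   dbutils.widgets.text("groupIDs", "")
#   raw_group_ids = dbutils.widgets.get("groupIDs")
all_groups = {"Group-1": ["Seymour"], "Group-2": ["Franny"], "Group-3": ["Zooey"]}
print(select_travel_groups(all_groups, "Group-1,Group-3"))
# → {'Group-1': ['Seymour'], 'Group-3': ['Zooey']}
```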

ADB_1_004

Master Notebook

We then create the Master notebook to partition the travel groups and launch the parallel notebooks. I have written a GroupTravelPlanning class which encapsulates the travel group partition function and the function to launch multiple notebooks in parallel using the built-in multiprocessing library of Python.

##################################################################################
# The master class that partitions the travel groups and triggers the sub notebooks 
# execution for each partition in parallel
##################################################################################
class GroupTravelPlanning:
  
  def __init__(self, partitionNum, travelGroups):
    self.partitionNum = partitionNum
    self.travelGroups = travelGroups
    self.groupPartitionList = self.PartitionTravelGroups()
  
  #Partition travel groups
  def PartitionTravelGroups(self):

    #Initialise a dictionary for hosting the partitions of travel groups
    partitions={}
    for partitionID in range(0, self.partitionNum):
      partitions[partitionID]=[]
      
    #Partition the travel groups 
    for index, groupID in enumerate(self.travelGroups.keys(), start=0):
      partitionID = index % self.partitionNum
      partitions[partitionID].append(groupID)
  
    #Convert the travel groups partition dictionary into a list of travel 
    #group strings, such as ['Group-1,Group-2', 'Group-3,Group-4',...] 
    groupPartitionList = []
    for group in partitions.values():
      groupPartition = ','.join(group)
      groupPartitionList.append(groupPartition)
    return groupPartitionList
  
  #Run group travel planning sub-notebooks in parallel
  def RunGroupTravelPlanningNotebooks(self):
    from multiprocessing.pool import ThreadPool
    pool = ThreadPool(self.partitionNum)
    
    pool.map(
       lambda groupIDs: dbutils.notebook.run(
          '/Users/malinxiao@hotmail.com/Group Travel/GroupTravelPlanning_Parallel',
           timeout_seconds=0,
           arguments = {"groupIDs":groupIDs}),
       self.groupPartitionList)

You can find more details on using the Python multiprocessing library for concurrent Databricks notebook workflows in this doc. Basically, a thread pool is created with the same number of threads as the number of travel group partitions, so that all the partitions can be processed at the same time without any of them waiting in a queue.
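
The thread pool behaviour can be tried outside Databricks with a stand-in for the notebook call. In this sketch, run_partition is a hypothetical placeholder that sleeps instead of calling dbutils.notebook.run.

```python
import time
from multiprocessing.pool import ThreadPool


def run_partition(group_ids):
    """Stand-in for dbutils.notebook.run on one partition of group ids."""
    time.sleep(0.2)  # simulate a long-running notebook
    return f"done: {group_ids}"


def run_all(partitions):
    """Run every partition at once, one thread per partition."""
    with ThreadPool(len(partitions)) as pool:
        # map blocks until all threads finish and keeps the input order.
        return pool.map(run_partition, partitions)


print(run_all(["Group-1,Group-5", "Group-2,Group-6"]))
# → ['done: Group-1,Group-5', 'done: Group-2,Group-6']
```

Threads are sufficient here because each thread mostly waits for its notebook run to return rather than doing computation itself.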

ADB_1_007

Apart from the GroupTravelPlanning class, we also need to add the code to create an instance of the GroupTravelPlanning class in the Master notebook and trigger the main method.

ADB_1_011

After the Master notebook starts to run, the list of sub notebooks (from the generic notebook template with the partitioned travel group ids as parameter) is launched.

ADB_1_002

The result shows that running the eight test travel groups as four partitions in parallel took 1.63 minutes in total, compared to 4.99 minutes for running them in sequence.
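
The two measurements can be put into proportion with a quick calculation; the efficiency figure assumes four parallel partitions, as in this test.

```python
sequential_minutes = 4.99
parallel_minutes = 1.63
num_partitions = 4

# Speed-up is the ratio of sequential to parallel run time.
speedup = sequential_minutes / parallel_minutes
# Efficiency is the fraction of the ideal 4x speed-up that was achieved.
efficiency = speedup / num_partitions

print(f"speed-up: {speedup:.2f}x, efficiency: {efficiency:.0%}")
# → speed-up: 3.06x, efficiency: 77%
```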

ADB_1_001

Limitations

This blog post walks through an approach to handle Embarrassing Parallel workload with Databricks notebook workflows. This approach is logically simple to understand, and the parallelism is easy to control with the built-in Python multiprocessing package. However, the limitation of this approach is that all the parallel tasks run on a single machine (the cluster driver) instead of being distributed to worker nodes. That means the solution can only scale up to a certain point, although the largest VM supported by Databricks at the time this blog post was written reaches 80 cores/640 GB (Standard_L80s_V2). To support Embarrassing Parallel workload across multiple nodes, an alternative approach is to use Pandas UDFs for PySpark. I will introduce the Pandas UDF approach in the next blog post.