Handling Embarrassing Parallel Workload with Databricks Notebook Workflows

Introduction

Embarrassing Parallel refers to the problem where little or no effort is needed to separate the problem into parallel tasks, and there is no dependency for communication needed between the parallel tasks. Embarrassing parallel problem is very common with some typical examples like group-by analyses, simulations, optimisations, cross-validations or feature selections. Normally, an Embarrassing Parallel workload has the following characteristics:

  • Calculate similar things many times with different groups of data or different parameter sets
  • Calculation are independent of each other
  • Each calculation take a fair amount of time
  • The datasets involved in the calculation can be big, but in most of use cases, the datasets are not at  the “Big Data” scale.

The native parallelism mechanism of Apache Spark might not be an efficient way for the Embarrassing Parallel workload due to the overhead of serialization and inter-process communication, especially when the dataset is not big but the calculation is complex. Based on benchmark tests conducted by Dorian Beganovic, Spark performed much more poorly on small or medium size of datasets compared to Pandas running on single machine. Dorian suggested a >500GB mark as the suitable size for spark. In addition, many libraries commonly used in the Embarrassing Parallel use cases, such as numpy and scikit-learn, are not supported by PySpark.

One alternative solution is to take advantage of Databricks Notebook workflows to handle the Embarrassing Parallel workloads. Basically, one generic Databricks Notebook is created to run the calculation for one parallel task against one partition of the entire dataset or one set of parameters. The partition of the dataset or set of parameters are specified by Notebook parameters. In this blog post, these generic Databricks notebooks to run the calculation logic in parallel are referred as Parallel Notebooks.  The Parallel Notebooks are triggered by another Databricks Notebook, which is named as Master Notebook in this blog post. The Master Notebook encapsulates the logic for partitioning the dataset or creating the parameters set and launches the parallel notebooks to execute the calculation logic against the partitioned dataset or parameter set in parallel.

Approach

This blog post walks through the detailed steps to handle Embarrassing Parallel workloads using Databricks Notebook workflows. You can find the sample Databricks notebooks I created for this blog post here.

Baseline Test

We first take a baseline test to run some calculation logic workload in sequence, and then we create notebook workflows to run the workload in parallel to compare the total execution time.

For the calculation task, I will use the group travel planning optimisation example from Toby Segaran’s “Collective Intelligence” book. You can find the Python code encapsulated the optimisation algorithms from here. In this baseline test, I have created eight travel groups and stored them into a dictionary with the dictionary key as the group ID and the dictionary value as the travel group data. A for loop is crated to loop through each travel group and run the optimisation algorithm for each group one by one. The result shows that it took 4.99 minutes to run through all eight groups.

ADB_1_001

After we have the execution time for the sequential workload, we can start to handle the workload in parallel with Databricks notebook workflows to see if the total execution time can be reduced with parallel approach.

Parallel Notebook

We first create the parallel notebook and include the optimisation algorithm code snippet from  Toby Segaran’s “Collective Intelligence” book which will be used for finding the optimised  travel plan for a travel group.

We then create a “groupIDs” parameter which takes the ids of the travel groups to process in this parallel task.

ADB_1_009

The value of the “groupIDs”  parameter is used to retrieve the data of the specified travel groups. In this example, for the simplicity, I just used a Python dictionary to store the data in the file. In real-world scenarios, the data is more likely stored externally such as a database or data lake. In the generic notebook, the optimisation algorithm is only run on the travel groups specified by the parameters.

ADB_1_004

Master Notebook

We then create the Master notebook to partition the travel groups and launch the parallel notebooks. I have written a GroupTravelPlanning class which encapsulates the travel group partition function and the function to launch multiple notebooks in parallel using the built-in multiprocessing library of Python.

##################################################################################
# The master class that partitions the travel groups and trigger the sub notebooks 
# execution for each partition in parallel
##################################################################################
class GroupTravelPlanning:
  
  def __init__(self, partitionNum, travelGroups):
    self.partitionNum = partitionNum
    self.travelGroups = travelGroups
    self.groupPartitionList = self.PartitionTravelGroups()
  
  #Partition travel groups
  def PartitionTravelGroups(self):

    #Initialise a dictionary for hosting the partitions of travel groups
    partitions={}
    for partitionID in range(0, self.partitionNum):
      partitions[partitionID]=[]
      
    #Partition the travel groups 
    for index, groupID in enumerate(self.travelGroups.keys(), start=0):
      partitionID = index % self.partitionNum
      partitions[partitionID].append(groupID)
  
    #Convert the travel groups partition dictionary into an list of travel 
    #group strings, such as ['Group-1,Group-2', 'Group-3,Group4',...] 
    groupPartitionList = []
    for group in partitions.values():
      groupPartition = ','.join(group)
      groupPartitionList.append(groupPartition)
    return groupPartitionList
  
  #Run group travel planning sub-notebooks in parallel
  def RunGroupTravelPlanningNotebooks(self):
    from multiprocessing.pool import ThreadPool
    pool = ThreadPool(self.partitionNum)
    
    pool.map(
       lambda groupIDs: dbutils.notebook.run(
          '/Users/malinxiao@hotmail.com/Group Travel/GroupTravelPlanning_Parallel',
           timeout_seconds=0,
           arguments = {"groupIDs":groupIDs}),
       self.groupPartitionList)

You can find more details for using Python multiprocessing library for concurrent Databricks notebook workflows from this doc. Basically, a thread pool is crated with the number of threads to be same as the number of partition of travel groups to process so that all the partitions of travel groups can be processed at the same time and no waiting queue.

ADB_1_007

Apart from the GroupTravelPlanning class, we also need to add the code to create an instance of the GroupTravelPlanning class in the Master notebook and trigger the main method.

ADB_1_011

After the Master notebook starts to run, the list of sub notebooks (from the generic notebook template with the partitioned travel group ids as parameter) is launched.

ADB_1_002

The result shows that the total time for running through the eight test travel groups in four groups in parallel took 1.63 minutes compared to 4.99 minutes for running in sequence.

ADB_1_001

Limitations

This blog post walks through an approach to handle Embarrassing Parallel workload with Databricks notebook workflows. This approach is logically simple to understand and is easy to control the parallelism with built-in Python multiprocessing package.  However, the limitation of this approach is that all the parallel tasks are running on a single machine (the cluster driver) instead of being distributed to worker nodes. That limits the solution only being able to scale up to a certain point although the maximum VM supported by Databricks at the time when the blog post was written can reach to 80 cores/640 GB (Standard_L80s_V2). To support Embarrassing Parallel workload across multiple nodes,  the alternative approaches can use Pandas UDF for PySpark. I will introduce the Pandas UDF approach in next blog post.

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s