This is the third part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT. This part describes how to build an ADLA U-SQL job for incremental extraction of machine cycle data from Azure Data Lake Store and goes through the steps for scheduling and triggering the U-SQL job using ADF.
- Introduction & Preparation
- Build ADF pipeline for dimensional tables ELT
- Build ADLA U-SQL job for incremental extraction of machine cycle data
- Build ADF pipeline for facts table ELT
In the first part of the blog series we created an Azure Data Lake Store directory to store the machine cycle data in csv format.
We are going to create a U-SQL job to query the machine cycle records based on the SliceStart and SliceEnd datetimes. The ADF pipeline will pass the start time and the end time of the current ADF ingestion slice into the U-SQL job, and the U-SQL job will only return the machine cycles that ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario. I have written another blog post specifically for the across-day scenario here.
Instead of creating a U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure to simplify script organisation and deployment. The stored procedure defines two parameters that take the SliceStart and SliceEnd datetimes from ADF. The year, month and day parts are extracted from the SliceStart datetime to build the folder path pointing to the location where the machine cycle csv files are stored.
The U-SQL script will then extract the rows from the csv files and SELECT those within the current ADF execution slice. The selected rows are output to a staging area in the Azure Data Lake Store.
CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN

    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    DECLARE @InputPath string = @"/IoT/Curated%20Zone/Demo/ADFDW/" + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";
    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;

    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;
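Before wiring the stored procedure into ADF, it can be worth smoke-testing it with a standalone ADLA job. The snippet below is only a minimal sketch that invokes the procedure with a hard-coded one-hour slice; the sample timestamps are placeholder values for illustration.

// Minimal test job: invoke the stored procedure for a sample one-hour slice.
// The timestamps below are placeholder values, not real slice boundaries.
DECLARE @testSliceStart DateTime = DateTime.Parse("2017-11-10T20:00:00");
DECLARE @testSliceEnd DateTime = DateTime.Parse("2017-11-10T21:00:00");

ADFDW.dbo.uspGetMachineCycles(@testSliceStart, @testSliceEnd);

If the procedure works as expected, the filtered rows will appear in the staging csv file declared inside the procedure.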
After the U-SQL stored procedure is created, we need to create the ADF pipeline to schedule and trigger the U-SQL job. As the machine cycle fact table needs to look up the machine and customer dimension tables, we need to schedule the execution of the U-SQL job immediately after the dimension tables are loaded:
The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.
Firstly, we need to create an AAD service principal that ADF and ADLA will use to access the csv files stored in ADLS. You can find the detailed steps to create a service principal and to assign ADLS access in the official Microsoft documentation (don’t forget to turn on the option that allows access from Azure services in the ADLS firewall settings).
With the service principal in place, we can create the AzureDataLakeAnalyticsLinkedService file in ADF that configures the link to the ADLA service where the U-SQL job will be executed.
{ "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json", "name": "AzureDataLakeAnalyticsLinkedService", "properties": { "type": "AzureDataLakeAnalytics", "typeProperties": { "accountName": "ninjago0786demp", "subscriptionId": "{subscription id}", "resourceGroupName": "{resource group}", "dataLakeAnalyticsUri": "azuredatalakeanalytics.net", "servicePrincipalId": "{serivce principal id}", "servicePrincipalKey": "{service principal key}", "tenant": "tenant" } } }
We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and add a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the “script” property to the command that calls the ADFDW.dbo.uspGetMachineCycles stored procedure we created earlier, passing SliceStart and SliceEnd as parameters.
As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both the OutputDimCustomer and OutputDimMachine ADF tables (the outputs of the LoadDimCustomer pipeline and the LoadDimMachine pipeline) as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we set it to the InputMachineCycle ADF table, which is the input for the pipeline that loads the machine cycle data from the ADLS staging area into the target data warehouse. The implementation of the InputMachineCycle ADF table will be covered in the next part of the blog series.
{ "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json", "name": "IncrementalExtractMachineCycle", "properties": { "description": "Extract machine cycles within current time slice", "activities": [ { "name": "ExtractMachineCycle", "linkedServiceName": "AzureDataLakeAnalyticsLinkedService", "type": "DataLakeAnalyticsU-SQL", "typeProperties": { "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));", "parameters": { "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)", "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)" } }, "inputs": [ { "name": "OutputDimCustomer" }, { "name": "OutputDimMachine" } ], "outputs": [ { "name": "InputMachineCycle" } ], "policy": { "concurrency": 1, "executionPriorityOrder": "OldestFirst", "retry": 3, "timeout": "01:00:00" }, "scheduler": { "frequency": "Hour", "interval": 1 } } ], "start": "2017-11-10T20:00:00", "end": "2017-11-13T01:00:00" } }
Now we have the U-SQL job created and scheduled to extract the machine cycle rows within the current ADF execution slice and store them in the staging area in ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline to load the machine cycle rows in csv format from ADLS into a staging table in the target DW, and then create a stored procedure to transform and load the data into the machine cycle fact table.