Tag: Azure Data Lake Analytics

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 3)

This is the third part of the blog series to demonstrate how to build an end-to-end ADF pipeline for data warehouse ELT. The part will describe how to build an ADLA U-SQL job for incremental extraction of machine cycle data from Azure Data Lake store and go through the steps for scheduling and triggering the U-SQL job using ADF.

In the first part of the blog series we have created a Azure Data Lake store directory to store the machine cycle data in csv format.

3

We are going to create a U-SQL job to query the machine cycle records based on the SliceStart datatime and the SliceEnd datatime. The ADF pipeline will pass the start time and the end time of current ADF ingestion slice into the U-SQL job, and the U-SQL job will only returns the machine cycle ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario. There is another blog post I have written specifically for the across-day scenario here.

Instead of creating an U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure to simplify the scripts organisation and deploying. Two parameters are defined in the stored procedure that take the SliceStart datatime and SliceEnd datatime from ADF. The year, month, day parts will be extracted from the SliceStart datatime for building the folder path that points to the location where the machine cycle csv files are stored.

The U-SQL script will then extract the rows from the csv files and SELECT the rows within the current ADF execution slice. The selected rows will be output into a staging area in the Azure Data Lake store.

CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN

    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    DECLARE @InputPath = @"/IoT/Curated%20Zone/Demo/ADFDW/"
                         + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";

    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;


    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;

After we created the U-SQL script, we need to create the ADF pipeline to schedule and trigger the U-SQL job. As the machine cycle fact table need to look up for the machine and customer dimension tables. We need to schedule the execution of the U-SQL job immediately after the dimension tables are load:

4

The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.

Firstly, we need to create a AAD service principal for ADF and ADLA using to access the csv files stored in the ADLS. You can find the detailed steps to create service principal and to assign ADLS access in Microsoft official documents (don’t forget to turn the allowing service access option on in the ADLS firewall settings).

With the service principal we can create the AzureDataLakeAnalyticsLinkedService file in ADF that configure the link to the ADLA service where the U-SQL job will be executed.

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
          "accountName": "ninjago0786demp",
          "subscriptionId": "{subscription id}",
          "resourceGroupName": "{resource group}",
          "dataLakeAnalyticsUri": "azuredatalakeanalytics.net",
          "servicePrincipalId": "{serivce principal id}",
          "servicePrincipalKey": "{service principal key}",
          "tenant": "tenant"
      }
    }
}

We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and create a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the “script” property as the commend to call the ADFDW.dbo.uspGetMachineCycles we created earlier and pass the SliceStart and SliceEnd as parameters.

As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both OutputDimCustomer and OutputDimMachine ADF tables (the output of the LoadDimCustomer pipeline and the LoadDimMachine pipeline)  as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we set it as the InputMachineCycle ADF table which is the input for the pipeline to load the machine cycle data from ADLS staging area to the target data warehouse. The implementation of  inputMachineCycle ADF table will be covered in the next part of the blog series.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "IncrementalExtractMachineCycle",
  "properties": {
    "description": "Extract machine cycles within current time slice",
    "activities": [
      {
        "name": "ExtractMachineCycle",
        "linkedServiceName": "AzureDataLakeAnalyticsLinkedService",
        "type": "DataLakeAnalyticsU-SQL",
        "typeProperties": {
          "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));",
          "parameters": {
            "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)",
            "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)"
          }
        },
        "inputs": [
          {
            "name": "OutputDimCustomer"
          },
          {
            "name": "OutputDimMachine"
          }
        ],
        "outputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have the U-SQL job created and scheduled that will extract the machine cycle rows within the current ADF execution slice and store them into the staging area in the ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline to load the machine cycle rows in csv file format from ADLS to the staging table in the target DW and then create a store procedure to transform and load the data into the machine cycle fact table.

Advertisements

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 1)

This blog series demonstrates how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and load to a star-schema data warehouse database with considerations of  SCD (slow changing dimensions) and incremental loading.

The final pipeline will look as:

1

The machine cycle records will be load from the csv files stored in a Azure Data Lake store, and the reference data of customers and machines will be load form the Reference DB (Azure SQL DB). The ADF pipeline will first load the data into the staging tables in the target DW, and the ADF pipeline will then execute the SQL stored procedures to transform the data into star schema and implement the type 2 SCD.

A number of services within the Azure Cortana Intelligence Suite will be employed to build the end-to-end pipeline, including Azure Data Factory, Azure Data Lake Store, Azure Data Lake Analytics (U-SQL), Azure SQL Database, and the development tools (Visual Studio + SSDT). Four Visual Studio projects will be created, including the ReferenceDB for provisioning the Azure SQL reference database, the DW for provisioning the target Data Wareshourse, the ADLA for incrementally extracting the machine cycle data, and the ADF for the build of the pipeline. You can download the source code here.

This blog post covers the first part of the series to give a brief introduction of the example used in this demonstration and provides the steps for preparing the required demo databases/data lake directory and the sample data.

Preparation

1.Create Visual Studio solution

Create a Visual Studio solution with the SSDT tool to host all the projects required in this demonstration, including the DB projects for ReferenceDB and the target DW, and the ADF project.

2. Create ReferenceDB project

1

Create ReferenceDB project and add the scripts for creating Customer table and Machine table.

Customer table:

CREATE TABLE [dbo].[Customer]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Machine table:

CREATE TABLE [dbo].[Machine]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Create a post deployment script to add the sample data:

INSERT INTO [dbo].[Customer]
           ([Code]
           ,[Name]
           ,[LastModified])
     VALUES
           ('C0001','Customer1', '2017-11-08 21:21:30'),
           ('C0002','Customer2', '2017-11-08 21:21:30')

INSERT INTO [dbo].[Machine]
           ([Code]
           ,[Name]
           ,[Condition]
           ,[LastModified])
     VALUES
           ('M0001','Machine1', 'Perfect', '2017-11-08 21:21:30'),
           ('M0002','Machine2', 'Good', '2017-11-08 21:21:30')

Publish the project to the Azure tenant to provision the Azure SQL DB.

3. Create the target DW project

2

Create the target DW project and add the scripts for schema, staging tables, dimension tables and fact tables:

Schema – prod:

CREATE SCHEMA [prod]

Schema – stg:

CREATE SCHEMA [stg]

Staging – Customer:

CREATE TABLE [stg].[Customer]
(
    [Code] int NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – Machine:

CREATE TABLE [stg].[Machine]
(
    [Code] int NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – MachineCycle:

CREATE TABLE [stg].[MachineCycle]
(
    [CycleId] int NOT NULL,
    [MachineName] varchar(50) NOT NULL,
    [CustomerName] varchar(50) NOT NULL,
    [StartDateTime] datetime2 NOT NULL,
    [EndDateTime] datetime2 NOT NULL,
        [LastModified] DATETIME2 NOT NULL
)

Dimension – DimCustomer

CREATE TABLE [prod].[DimCustomer]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] int NOT NULL PRIMARY KEY, 
    [Name] VARCHAR(50) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Dimension – DimMachine

CREATE TABLE [prod].[DimMachine]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] int NOT NULL PRIMARY KEY,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Facts – FactMachineCycle

CREATE TABLE [prod].[FactMachineCycle]
(
    [CycleId] int NOT NULL PRIMARY KEY,
    [MachineId] int NOT NULL,
    [CustomerId] int NOT NULL,
    [DateKey] int NOT NULL,
    [Duration] float NOT NULL,
        [RowAdded] DATETIME2 NOT NULL
)

In addition to the tables above we also need a Date Dimension table and the scripts to generate the table. There are plenty of scripts you can find from Google, e.g., https://www.codeproject.com/Articles/647950/Create-and-Populate-Date-Dimension-for-Data-Wareho. The generated DimDate table should look like:

6

Finally, we need to publish the project to Azure and will get the DB deployed.

7

4. Create Azure Data Lake directory

Create a directory in Azure Data Lake, e.g., “ADFDW”, and organise the sub-directory hierarchy as year -> month -> day.

3

The sample csv data includes the columns of cycleID, CustomerCode, MachineCode, StartDateTime, EndDateTime, and EventTime.

3

Now we have done our preparation work, the next blog post will cover the steps to build ADF pipelines for dimensional tables ETL.

Handling Across-Day Cycle Issue in Daily Usage Analysis using U-SQL

When analysing daily usage of a machine that can run across days, we need to split the time of a single machine running cycle into right days. As the example below shows, the second machine (D00002) run across two days and the third machine (D00003) run across three days.

t2

To analyse the daily usage of those machines we need to transform data in the example above into the following structure.

t22

As you can see, the record of second machine that run across two days has been split into two records, one for each day. Same as the record of third machine that run across three days, the record has been split into three days. The UsageHours column stores the time the machine run on that day. For example, the second machine started to run at 17:33:21 on 22nd of May and stopped at 14:29:22 on 23rd of May. The original record of the machine running cycle will be split into two rows: the row for day on 22nd of May with UsageHours as 6.4 (between 17:33:21 and 23:59:59) , and the row for day on 23rd of May with UsageHours as 14.5 (between 00:00:00 and 14:29:22).

In this blog post, I am going to introduce how to conduct this kind of data transformation using U-SQL with Azure Data Lake Analytics (ADLA) jobs . The snapshot below shows the U-SQL code.

t4

Firstly, we need to split the machine running cycle into the days it spans. To achieve that we need to join the machine cycle table withe a Date dimension table (here is a way to generate data dimension table using U-SQL introduced by Paul Andrew).

t5

The join operation can give us something like:

t1

We use the CROSS JOIN with WHERE clause to get the days within the span of cycle StartDateTime and the cycle EndDateTime (As the U-SQL JOIN does not support non-equality comparison at this moment, we cannot use INNER JOIN with ON clause)

After we split a single cycle into each day segment in the time span of the cycle, we can use the CASE statement to decide the type of the day segment (i.e. the day when machine starts, the day when machine stops, and the day(s) in middle that run the full 24 hours) and calculate the time in each day segment.

t7

In addition, we need to use another CASE statement to flag the first day of the machine running cycle as 1 and the other days as 0. This is the trick for aggregating the count of total machine running cycles within a period. After we split a single machine cycle into multiple records, the count of records in the machine cycle table will not reflect the count of machine cycles. Therefore, we need to use the Cycle flag in each record to aggregate the count of machine cycles.

t6

 

Generate Device Cycle Records from Raw Telemetry Message using Azure Data Lake Analytics

The raw telemetry data collected from IoT sensor is normally event-based, e.g., a “Device On” message when the device starts to run, and a “Device Off” message when the device stops.

a1

One common data preprocessing task is to transform the raw “On/Off” telemetry data into device cycle records with the start time, end time and the duration of a device’s running cycle.

a2

This post introduces how to implement this kind of data preprocessing task using Azure Data Lake Analytics (ADLA) job.

Firstly, we need to create an U-SQL file in Visual Studio. U-SQL is the SQL-like language for creating ADLA jobs, which takes advantage of both C# and SQL language features.

In the U-SQL file, we can use Extract expression to load the raw telemetry data file.

a3

U-SQL provides the LAG function that accesses to a row at a given offset that comes before the current row in a given order. For the “Device Off” event message, we can use the LAG function, LAG(EventDateTime, 1) OVER(ORDER BY DeviceId, EventDateTime), to location the corresponding “Device On” event message and get the EventDataTime (the time when the event happens, i.e. cycle start time). The cycle end time is the EventDataTime of the “Device Off” message, and cycle duration is the time difference between the cycle start time and the cycle end time.

a4

After the data is processed, we can use U-SQL Output  expression to output the results.

a5