Tag: Azure Data Factory

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 3)

This is the third part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT. This part describes how to build an ADLA U-SQL job for the incremental extraction of machine cycle data from Azure Data Lake Store and goes through the steps for scheduling and triggering the U-SQL job with ADF.

In the first part of the blog series we created an Azure Data Lake Store directory to hold the machine cycle data in csv format.

We are going to create a U-SQL job that queries the machine cycle records based on the SliceStart datetime and the SliceEnd datetime. The ADF pipeline passes the start time and the end time of the current ADF ingestion slice into the U-SQL job, and the U-SQL job returns only the machine cycles that ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario. I have written another blog post specifically for the across-day scenario here.

Instead of creating a U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure, which simplifies script organisation and deployment. The stored procedure defines two parameters that take the SliceStart datetime and SliceEnd datetime from ADF. The year, month and day parts are extracted from the SliceStart datetime to build the folder path that points to the location where the machine cycle csv files are stored.

The U-SQL script will then extract the rows from the csv files and SELECT the rows within the current ADF execution slice. The selected rows will be output into a staging area in the Azure Data Lake store.

CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN

    // Date parts of the slice start (month and day zero-padded), used to locate the day folder.
    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    // File set pattern that matches every csv file in that day's folder.
    DECLARE @InputPath string = @"/IoT/Curated%20Zone/Demo/ADFDW/"
                         + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";

    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;


    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;
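
The slice logic of the procedure can also be sketched outside U-SQL. The Python below is only an illustration and not part of the original solution: the function names `slice_input_folder` and `in_slice` are hypothetical, and the cycle end times are made up. It shows the zero-padded year/month/day folder path and the half-open interval (start inclusive, end exclusive), which keeps a cycle that ends exactly on a slice boundary from being picked up by two adjacent slices.

```python
from datetime import datetime

BASE_PATH = "/IoT/Curated%20Zone/Demo/ADFDW/"  # same root folder as in the U-SQL procedure


def slice_input_folder(slice_start: datetime) -> str:
    """Build the year/month/day folder for the slice, zero-padding month and day."""
    return f"{BASE_PATH}{slice_start.year}/{slice_start.month:02d}/{slice_start.day:02d}/"


def in_slice(end_time: datetime, slice_start: datetime, slice_end: datetime) -> bool:
    """Half-open check: slice_start <= end_time < slice_end."""
    return slice_start <= end_time < slice_end


slice_start = datetime(2017, 11, 10, 20, 0)
slice_end = datetime(2017, 11, 10, 21, 0)

# Made-up cycle end times: one inside the slice, one exactly on the upper boundary.
cycle_ends = [datetime(2017, 11, 10, 20, 15), datetime(2017, 11, 10, 21, 0)]
selected = [t for t in cycle_ends if in_slice(t, slice_start, slice_end)]

print(slice_input_folder(slice_start))
print(selected)
```

Only the 20:15 cycle is selected here; the one ending at 21:00 belongs to the next slice.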

After creating the U-SQL script, we need to create the ADF pipeline that schedules and triggers the U-SQL job. Because the machine cycle fact table needs to look up the machine and customer dimension tables, we need to schedule the execution of the U-SQL job immediately after the dimension tables are loaded:

The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.

Firstly, we need to create an AAD service principal that ADF and ADLA use to access the csv files stored in ADLS. You can find the detailed steps for creating a service principal and assigning ADLS access in the official Microsoft documentation (don’t forget to turn on the option that allows service access in the ADLS firewall settings).

With the service principal in place, we can create the AzureDataLakeAnalyticsLinkedService file in ADF, which configures the link to the ADLA service where the U-SQL job will be executed.

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
            "accountName": "ninjago0786demp",
            "subscriptionId": "{subscription id}",
            "resourceGroupName": "{resource group}",
            "dataLakeAnalyticsUri": "azuredatalakeanalytics.net",
            "servicePrincipalId": "{service principal id}",
            "servicePrincipalKey": "{service principal key}",
            "tenant": "tenant"
        }
    }
}

We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and add a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the “script” property to the command that calls the ADFDW.dbo.uspGetMachineCycles procedure we created earlier and pass SliceStart and SliceEnd as parameters.

As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both the OutputDimCustomer and OutputDimMachine ADF tables (the outputs of the LoadDimCustomer pipeline and the LoadDimMachine pipeline) as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we use the InputMachineCycle ADF table, which is the input for the pipeline that loads the machine cycle data from the ADLS staging area into the target data warehouse. The implementation of the InputMachineCycle ADF table will be covered in the next part of the blog series.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "IncrementalExtractMachineCycle",
  "properties": {
    "description": "Extract machine cycles within current time slice",
    "activities": [
      {
        "name": "ExtractMachineCycle",
        "linkedServiceName": "AzureDataLakeAnalyticsLinkedService",
        "type": "DataLakeAnalyticsU-SQL",
        "typeProperties": {
          "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));",
          "parameters": {
            "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)",
            "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)"
          }
        },
        "inputs": [
          {
            "name": "OutputDimCustomer"
          },
          {
            "name": "OutputDimMachine"
          }
        ],
        "outputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}
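
To make the scheduling concrete, here is a rough Python sketch of the hourly slices this pipeline definition implies and of the two parameter strings produced for one slice. It is a model for illustration only, assuming slices are aligned to the pipeline start time; the helper names `hourly_slices` and `format_param` are hypothetical and nothing here calls ADF.

```python
from datetime import datetime, timedelta

PIPELINE_START = datetime(2017, 11, 10, 20, 0, 0)  # "start" in the pipeline JSON
PIPELINE_END = datetime(2017, 11, 13, 1, 0, 0)     # "end" in the pipeline JSON
SLICE_LENGTH = timedelta(hours=1)                   # frequency "Hour", interval 1


def hourly_slices(start, end, step=SLICE_LENGTH):
    """Yield (slice_start, slice_end) pairs covering [start, end)."""
    current = start
    while current + step <= end:
        yield current, current + step
        current += step


def format_param(value):
    """Python equivalent of the .NET format string yyyy-MM-ddTHH:mm:ssZ."""
    return value.strftime("%Y-%m-%dT%H:%M:%SZ")


slices = list(hourly_slices(PIPELINE_START, PIPELINE_END))
first_start, first_end = slices[0]
parameters = {"sliceStart": format_param(first_start), "sliceEnd": format_param(first_end)}

print(len(slices))
print(parameters)
```

Under that assumption the active period contains 53 one-hour slices, and each slice triggers one call of the stored procedure with its own start and end time.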

We now have the U-SQL job created and scheduled; it extracts the machine cycle rows within the current ADF execution slice and stores them in the staging area in ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline, which loads the machine cycle rows in csv format from ADLS into the staging table in the target DW, and then create a stored procedure to transform and load the data into the machine cycle fact table.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 2)

This is the second part of the blog series demonstrating how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading it into a star-schema data warehouse database, with consideration of SCD (slowly changing dimensions) and incremental loading.

The first part of the blog series describes the example used in the demonstration and sets up the required Azure SQL DB/Azure Data Lake directory and the sample data. In this second part we are going to build the ADF pipeline for the incremental loading of customer and machine related data from the reference database to the staging database, and then transform it into type 2 SCD tables.

In this demonstration we are going to create two dimension tables, DimMachine and DimCustomer (DimDate was created in Part 1 of the blog series). We start by building the ADF pipeline for DimMachine. Firstly, we need to create the stored procedure for loading the type 2 SCD DimMachine table from the staging table ([stg].[Machine]). The stored procedure will be called by the ADF pipeline after the staging table is loaded from the reference database. In the Visual Studio DW project we created in the first part of the blog series, we add an ETL folder and create a sql file named uspLoadDimMachine.sql.

In this file we create the stored procedure [stg].[uspLoadDimMachine], which uses the SQL MERGE statement to implement the type 2 SCD for the DimMachine table.

CREATE PROC [stg].[uspLoadDimMachine] AS
BEGIN
    INSERT INTO [prod].[DimMachine]
        ([Code]
        ,[Name]
        ,[Condition]
        ,[CurrentRow]
        ,[ValidTo]
        ,[LastModified]
        )
    SELECT
         M.[Code]
        ,M.[Name]
        ,M.[Condition]
        ,1
        ,'9999-12-31'
        , GETDATE()
    FROM (
        MERGE [prod].[DimMachine] AS [Target]
            USING [stg].[Machine] AS [Source] ON Target.Code = Source.Code
        WHEN MATCHED AND Target.[Condition] <> Source.[Condition]
             THEN UPDATE SET
                [CurrentRow] = 0
               ,[LastModified] = GETDATE()
               ,[ValidTo] = GETDATE()
        WHEN NOT MATCHED BY TARGET
             THEN INSERT (
                 [Code]
                ,[Name]
                ,[Condition]
                ,[CurrentRow]
                ,[ValidTo]
                ,[LastModified]
               ) VALUES (
                 Source.Code
                ,Source.Name
                ,Source.Condition
                ,1
                ,'9999-12-31'
                ,GETDATE()
               )
         OUTPUT $action AS Action, [Source].*
        ) AS M
        WHERE M.Action = 'UPDATE' AND M.[Code]  IS NOT NULL;
END
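
The type 2 behaviour the procedure is aiming for can be illustrated with a small in-memory Python sketch. This is not a translation of the MERGE statement: it is a simplified model with a hypothetical function `apply_scd2`, and it assumes that only the row flagged CurrentRow = 1 is compared against the staged row. A changed Condition expires the current version and adds a new one, and an unknown Code is simply inserted.

```python
from datetime import datetime

OPEN_ENDED = datetime(9999, 12, 31)  # same "valid forever" marker as the procedure


def apply_scd2(dim_rows, staged_rows, now):
    """Apply staged machine rows to the dimension rows in place (type 2 SCD)."""
    for staged in staged_rows:
        current = next(
            (r for r in dim_rows if r["Code"] == staged["Code"] and r["CurrentRow"] == 1),
            None,
        )
        if current is not None and current["Condition"] == staged["Condition"]:
            continue  # nothing changed, keep the current version
        if current is not None:
            # Expire the existing version instead of overwriting it.
            current.update(CurrentRow=0, ValidTo=now, LastModified=now)
        # Add the new current version (covers both new codes and changed ones).
        dim_rows.append({**staged, "CurrentRow": 1, "ValidTo": OPEN_ENDED, "LastModified": now})
    return dim_rows


dim = [{"Code": "M0001", "Name": "Machine1", "Condition": "Perfect",
        "CurrentRow": 1, "ValidTo": OPEN_ENDED, "LastModified": datetime(2017, 11, 8)}]
staged = [{"Code": "M0001", "Name": "Machine1", "Condition": "Good"},
          {"Code": "M0002", "Name": "Machine2", "Condition": "Good"}]

apply_scd2(dim, staged, now=datetime(2017, 11, 10, 21, 0))
for row in dim:
    print(row["Code"], row["Condition"], row["CurrentRow"])
```

After the run, M0001 has an expired "Perfect" row and a current "Good" row, while M0002 has a single current row.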

After the stored procedure is ready we can start to build the ADF pipeline for machine dimension loading. The snapshot below shows the complete pipeline diagram for DimMachine data loading.

We will author the ADF components in Visual Studio. The snapshot below shows the files we need to create for DimMachine data loading (highlighted in yellow):

ReferenceDBLinkedService and DWLinkedService configure the links to the Azure SQL reference database and the target data warehouse.

ReferenceDBLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "ReferenceDBLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "{connection string}"
        }
    }
}

DWLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "DWLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
        "typeProperties": {
            "connectionString": "{connection string}"
        }
    }
}

The InputMachine table is an AzureSqlTable type table that points to the [dbo].[Machine] table in our ReferenceDB Azure SQL database. The columns to be mapped to the columns in the output data table, i.e. [stg].[Machine], are declared in the structure attribute. The input table is set to be available every hour, which means the table will be refreshed from the [dbo].[Machine] table every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "InputMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "ReferenceDBLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[Machine]"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputStgMachine ADF table points to the [stg].[Machine] table in our target DW Azure SQL database. The output columns mapped to the input columns (defined in the InputMachine table earlier) are also declared in the structure attribute. The output table is set to be available every hour, which means the Copy Activity that moves data from the source Machine table to the staging Machine table will be triggered and write its results to the output table every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[Machine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputDimMachine ADF table points to the [prod].[DimMachine] table in our target DW Azure SQL database.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputDimMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[DimMachine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The LoadDimMachine pipeline contains two activities. The first activity, LoadStgMachine, is a Copy Activity that copies data from the reference machine table to the staging machine table. The query against the reference machine table is filtered by the SliceStart datetime and the SliceEnd datetime so that it returns only the machine records updated during that period. A TabularTranslator is configured to map the columns from the reference machine table to the staging table.

The second activity is a SqlServerStoredProcedure activity that calls the stored procedure we created earlier to implement the type 2 SCD for DimMachine.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadDimMachine",
  "properties": {
    "description": "Load Machine table to staging table, and then call stored procedure to load to dimension table with SCD 2",
    "activities": [
      {
        "name": "LoadStgMachine",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "$$Text.Format('SELECT [Code],[Name],[Condition],[LastModified] FROM [dbo].[Machine] WHERE [LastModified] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [LastModified] < \\'{1:yyyy-MM-dd HH:mm}\\'', SliceStart, SliceEnd)"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[Machine]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[Machine]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "Code: Code, Name: Name, Condition: Condition, LastModified: LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },
      {
        "name": "LoadDimMachine",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputDimMachine"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadDimMachine",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }

    ],
    "start": "2017-11-10T21:00:00",
    "end": "2017-11-13T01:00:00"
  }
}
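
As a rough check of the incremental filter, the Python sketch below renders the text we would expect the `sqlReaderQuery` expression to expand to for one slice. It only mimics the string formatting; the function name `render_reader_query` is hypothetical and no database or ADF service is involved.

```python
from datetime import datetime

QUERY_TEMPLATE = (
    "SELECT [Code],[Name],[Condition],[LastModified] FROM [dbo].[Machine] "
    "WHERE [LastModified] >= '{start}' AND [LastModified] < '{end}'"
)


def render_reader_query(slice_start: datetime, slice_end: datetime) -> str:
    """Fill in the slice boundaries using the yyyy-MM-dd HH:mm layout."""
    layout = "%Y-%m-%d %H:%M"
    return QUERY_TEMPLATE.format(
        start=slice_start.strftime(layout), end=slice_end.strftime(layout)
    )


query = render_reader_query(datetime(2017, 11, 10, 21, 0), datetime(2017, 11, 10, 22, 0))
print(query)
```

Because the lower bound is inclusive and the upper bound exclusive, a row modified exactly at 22:00 is left for the following slice rather than being copied twice.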

We have now finished the pipeline for loading the DimMachine table. We can publish the pipeline to our Azure tenant:

The periodic execution of the pipeline gives us incremental loading of the type 2 SCD DimMachine table.

Next we can move on to create the LoadDimCustomer pipeline, which follows the same steps as described above for the LoadDimMachine pipeline.

In the next part of the blog series, I am going to cover the steps for building an ADLA U-SQL job for the incremental extraction of machine cycle data, and how to schedule and trigger the U-SQL job from ADF.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 1)

This blog series demonstrates how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading it into a star-schema data warehouse database, with consideration of SCD (slowly changing dimensions) and incremental loading.

The final pipeline will look like this:

The machine cycle records will be loaded from the csv files stored in an Azure Data Lake store, and the reference data for customers and machines will be loaded from the Reference DB (Azure SQL DB). The ADF pipeline will first load the data into the staging tables in the target DW, and will then execute the SQL stored procedures to transform the data into a star schema and implement the type 2 SCD.

A number of services within the Azure Cortana Intelligence Suite will be employed to build the end-to-end pipeline, including Azure Data Factory, Azure Data Lake Store, Azure Data Lake Analytics (U-SQL), Azure SQL Database, and the development tools (Visual Studio + SSDT). Four Visual Studio projects will be created: ReferenceDB for provisioning the Azure SQL reference database, DW for provisioning the target data warehouse, ADLA for incrementally extracting the machine cycle data, and ADF for building the pipeline. You can download the source code here.

This blog post is the first part of the series. It gives a brief introduction to the example used in this demonstration and provides the steps for preparing the required demo databases, the data lake directory and the sample data.

Preparation

1. Create Visual Studio solution

Create a Visual Studio solution with the SSDT tool to host all the projects required in this demonstration, including the DB projects for ReferenceDB and the target DW, and the ADF project.

2. Create ReferenceDB project

Create the ReferenceDB project and add the scripts for creating the Customer table and the Machine table.

Customer table:

CREATE TABLE [dbo].[Customer]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Machine table:

CREATE TABLE [dbo].[Machine]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Create a post deployment script to add the sample data:

INSERT INTO [dbo].[Customer]
           ([Code]
           ,[Name]
           ,[LastModified])
     VALUES
           ('C0001','Customer1', '2017-11-08 21:21:30'),
           ('C0002','Customer2', '2017-11-08 21:21:30')

INSERT INTO [dbo].[Machine]
           ([Code]
           ,[Name]
           ,[Condition]
           ,[LastModified])
     VALUES
           ('M0001','Machine1', 'Perfect', '2017-11-08 21:21:30'),
           ('M0002','Machine2', 'Good', '2017-11-08 21:21:30')

Publish the project to the Azure tenant to provision the Azure SQL DB.

3. Create the target DW project

Create the target DW project and add the scripts for schema, staging tables, dimension tables and fact tables:

Schema – prod:

CREATE SCHEMA [prod]

Schema – stg:

CREATE SCHEMA [stg]

Staging – Customer:

CREATE TABLE [stg].[Customer]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – Machine:

CREATE TABLE [stg].[Machine]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – MachineCycle:

CREATE TABLE [stg].[MachineCycle]
(
    [CycleId] int NOT NULL,
    [MachineName] varchar(50) NOT NULL,
    [CustomerName] varchar(50) NOT NULL,
    [StartDateTime] datetime2 NOT NULL,
    [EndDateTime] datetime2 NOT NULL,
    [LastModified] DATETIME2 NOT NULL
)

Dimension – DimCustomer

CREATE TABLE [prod].[DimCustomer]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL, -- business key; repeats across SCD versions, so not a key here
    [Name] VARCHAR(50) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Dimension – DimMachine

CREATE TABLE [prod].[DimMachine]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL, -- business key; repeats across SCD versions, so not a key here
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Facts – FactMachineCycle

CREATE TABLE [prod].[FactMachineCycle]
(
    [CycleId] int NOT NULL PRIMARY KEY,
    [MachineId] int NOT NULL,
    [CustomerId] int NOT NULL,
    [DateKey] int NOT NULL,
    [Duration] float NOT NULL,
    [RowAdded] DATETIME2 NOT NULL
)

In addition to the tables above we also need a Date Dimension table and the script to generate it. Plenty of such scripts can be found online, e.g., https://www.codeproject.com/Articles/647950/Create-and-Populate-Date-Dimension-for-Data-Wareho. The generated DimDate table should look like:

Finally, we publish the project to Azure to get the DB deployed.

4. Create Azure Data Lake directory

Create a directory in Azure Data Lake, e.g., “ADFDW”, and organise the sub-directory hierarchy as year -> month -> day.

The sample csv data includes the columns CycleId, CustomerCode, MachineCode, StartDateTime, EndDateTime, and EventTime.
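
For local experiments, a file with this layout can be produced with a few lines of Python. This is a sketch under assumptions: a temporary local folder stands in for the Data Lake directory, the file name `cycles.csv`, the helper `write_sample_file` and the sample row are all made up, and uploading to Azure Data Lake is not covered.

```python
import csv
import tempfile
from datetime import datetime
from pathlib import Path

COLUMNS = ["CycleId", "CustomerCode", "MachineCode", "StartDateTime", "EndDateTime", "EventTime"]


def write_sample_file(root: Path, day: datetime, rows: list) -> Path:
    """Write rows to <root>/ADFDW/<year>/<month>/<day>/cycles.csv with a header line."""
    folder = root / "ADFDW" / f"{day.year}" / f"{day.month:02d}" / f"{day.day:02d}"
    folder.mkdir(parents=True, exist_ok=True)
    target = folder / "cycles.csv"  # hypothetical file name
    with target.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(COLUMNS)  # header row
        writer.writerows(rows)
    return target


# Made-up sample row; the real values come from the demo data set.
sample_rows = [[1, "C0001", "M0001", "2017-11-10 20:05:00", "2017-11-10 20:15:00", "2017-11-10 20:15:05"]]

root = Path(tempfile.mkdtemp())
written = write_sample_file(root, datetime(2017, 11, 10), sample_rows)
print(written.relative_to(root).as_posix())
```

Zero-padding the month and day folders keeps the hierarchy sortable and gives every day a path of the same shape.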

Now that the preparation work is done, the next blog post will cover the steps to build the ADF pipelines for the dimension tables ETL.

Trigger Azure Analysis Service Processing in Azure Data Factory

There is one important feature missing from Azure Data Factory. In SSIS, at the end of the ETL process, when the new data has been transformed and loaded into the data warehouse, the SSAS processing task can be run to process the cube immediately after the new data has flowed into the data warehouse. However, Azure Data Factory does not ship with an out-of-the-box Azure Analysis Services processing activity.

Thanks to Analysis Management Objects (AMO), a few lines of simple .NET code can fill the gap and process the Azure Analysis Services model automatically.

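using Microsoft.AnalysisServices.Tabular;  // tabular object model of the AMO client library

Server asServer = new Server();
String asConnectionString = "{connection string to the azure analysis service}";
asServer.Connect(asConnectionString);
Database asDatabase = asServer.Databases.FindByName("{Azure analysis service DB name}");
// Queue a refresh of the whole model; the argument is a RefreshType value.
asDatabase.Model.RequestRefresh({Processing Mode});
// Send the queued refresh to the server, which then processes the model.
asDatabase.Model.SaveChanges();
asServer.Disconnect();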

We can host and run the code in Azure Functions, which allows us to schedule the AS model processing periodically. However, the problem with this approach is that the AS model is not processed in sequence, immediately after the data warehouse has been refreshed.

Another approach is to create a DotNetActivity and run the .NET model processing code on Azure Batch. You can find sample code and deployment instructions for this approach from Microsoft here.

You can then add the DotNetActivity at the end of your Azure Data Factory pipeline.
