End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 2)

This is the second part of the blog series demonstrating how to build an end-to-end ADF pipeline that extracts data from Azure SQL DB/Azure Data Lake Store and loads it into a star-schema data warehouse database, with considerations for SCD (slowly changing dimensions) and incremental loading.

The first part of the blog series described the example used in the demonstration and set up the required Azure SQL DB, the Azure Data Lake directory and the sample data. In this second part we are going to build the ADF pipeline for the incremental loading of customer and machine related data from the reference database to the staging database, and then transform it into the type 2 SCD tables.

In this demonstration we are going to create two dimension tables, DimMachine and DimCustomer (DimDate was created in Part 1 of the blog series). We start by building the ADF pipeline for DimMachine. Firstly, we need to create the stored procedure for loading the type 2 SCD DimMachine table from the staging table ([stg].[Machine]). The stored procedure will be called by the ADF pipeline after the staging table has been loaded from the reference database. In the Visual Studio DW project we created in the first part of the blog series, we add an ETL folder and create a SQL file named uspLoadDimMachine.sql.


In this file we create the stored procedure [stg].[uspLoadDimMachine], which uses a SQL MERGE statement to implement type 2 SCD for the DimMachine table.

CREATE PROC [stg].[uspLoadDimMachine] AS
BEGIN
    -- Insert a new current row for every machine whose [Condition] has
    -- changed; the rows to insert are captured from the MERGE output below.
    INSERT INTO [prod].[DimMachine]
        ([Code]
        ,[Name]
        ,[Condition]
        ,[CurrentRow]
        ,[ValidTo]
        ,[LastModified]
        )
    SELECT
         M.[Code]
        ,M.[Name]
        ,M.[Condition]
        ,1
        ,'9999-12-31'
        ,GETDATE()
    FROM (
        MERGE [prod].[DimMachine] AS [Target]
            USING [stg].[Machine] AS [Source] ON Target.Code = Source.Code
        -- Existing machine with a changed condition: expire the existing row.
        WHEN MATCHED AND Target.[Condition] <> Source.[Condition]
             THEN UPDATE SET
                [CurrentRow] = 0
               ,[LastModified] = GETDATE()
               ,[ValidTo] = GETDATE()
        -- Brand new machine: insert its first (current) row.
        WHEN NOT MATCHED BY TARGET
             THEN INSERT (
                 [Code]
                ,[Name]
                ,[Condition]
                ,[CurrentRow]
                ,[ValidTo]
                ,[LastModified]
               ) VALUES (
                 Source.Code
                ,Source.Name
                ,Source.Condition
                ,1
                ,'9999-12-31'
                ,GETDATE()
               )
         OUTPUT $action AS Action, [Source].*
        ) AS M
        -- Only the UPDATE branch (expired rows) needs a replacement row.
        WHERE M.Action = 'UPDATE' AND M.[Code] IS NOT NULL;
END
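The stored procedure works against the [stg].[Machine] staging table and the [prod].[DimMachine] dimension table created in the first part of the blog series. As a reference, a minimal sketch of what their definitions could look like is shown below (the column types and the surrogate key name here are assumptions; the actual DDL from Part 1 may differ):

-- Hypothetical sketch of the tables the stored procedure assumes.
CREATE TABLE [stg].[Machine] (
    [Code]         int            NOT NULL,
    [Name]         nvarchar(100)  NULL,
    [Condition]    nvarchar(100)  NULL,
    [LastModified] datetime       NULL
);

CREATE TABLE [prod].[DimMachine] (
    [MachineKey]   int IDENTITY(1,1) PRIMARY KEY,  -- surrogate key (assumed name)
    [Code]         int            NOT NULL,        -- business key from the source
    [Name]         nvarchar(100)  NULL,
    [Condition]    nvarchar(100)  NULL,
    [CurrentRow]   bit            NOT NULL,        -- 1 = current version of the machine
    [ValidTo]      datetime       NOT NULL,        -- '9999-12-31' while current
    [LastModified] datetime       NOT NULL
);

Once the staging table has been populated, the procedure can be tested manually with EXEC [stg].[uspLoadDimMachine];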

After the stored procedure is ready, we can start building the ADF pipeline for the machine dimension loading. The snapshot below shows the complete pipeline diagram for DimMachine data loading.

[Figure: the complete LoadDimMachine pipeline diagram]

We will author the ADF components in Visual Studio. The snapshot below shows the files we need to create for DimMachine data loading (highlighted in yellow colour):

[Figure: the ADF project files to create for DimMachine data loading]

ReferenceDBLinkedService and DWLinkedService configure the connections to the Azure SQL reference database and the target data warehouse database respectively.

ReferenceDBLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "ReferenceDBLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

DWLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "DWLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}
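The {connection string} placeholders need to be replaced with the actual connection strings of the two Azure SQL databases. Purely as an illustration (the server, database and user names below are made up), an Azure SQL Database connection string takes this shape:

"typeProperties": {
  "connectionString": "Server=tcp:myserver.database.windows.net,1433;Database=ReferenceDB;User ID=myuser@myserver;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
}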

The InputMachine table is an AzureSqlTable-type table that points to the [dbo].[Machine] table in our ReferenceDB Azure SQL database. The columns to be mapped to the columns of the output table, i.e. [stg].[Machine], are declared in the structure attribute. The external attribute is set to true because the table's data is produced outside of ADF rather than by another pipeline. The input table is set to be available every hour, which means ADF expects a new one-hour slice of data from the [dbo].[Machine] table every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "InputMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "ReferenceDBLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[Machine]"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputStgMachine ADF table points to the [stg].[Machine] table in our target DW Azure SQL database. The output columns mapped to the input columns (the input columns were defined in the InputMachine table earlier) are also declared in the structure attribute. The output table is set to be available every hour, which means the Copy Activity that moves data from the source Machine table to the staging Machine table will be triggered to produce an output slice every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[Machine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputDimMachine ADF table points to the [prod].[DimMachine] table in our target DW Azure SQL database. It serves as the output of the stored procedure activity, which ADF requires to drive the activity's schedule, so its structure attribute can be left empty.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputDimMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[DimMachine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The LoadDimMachine pipeline contains two activities. The first activity, LoadStgMachine, is a Copy Activity which copies data from the reference machine table to the staging machine table. The data queried from the reference machine table is filtered by the SliceStart datetime and the SliceEnd datetime so that only the machine records updated during that period are returned. A TabularTranslator is configured to map the columns from the reference machine table to the staging table.

The second activity is a SqlServerStoredProcedure activity which calls the stored procedure we created earlier to implement the type 2 SCD for DimMachine.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadDimMachine",
  "properties": {
    "description": "Load Machine table to staging table, and then call stored procedure to load to dimention table with SCD 2 ",
    "activities": [
      {
        "name": "LoadStgMachine",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "$$Text.Format('SELECT [Code],[Name],[Condition],[LastModified] FROM [dbo].[Machine] WHERE [LastModified] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [LastModified] < \\'{1:yyyy-MM-dd HH:mm}\\'', SliceStart, SliceEnd)"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[Machine]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[Machine]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "Code: Code, Name: Name, Condition: Condition, LastModified: LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },
      {
        "name": "LoadDimMachine",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputDimMachine"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadDimMachine",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }

    ],
    "start": "2017-11-10T21:00:00",
    "end": "2017-11-13T01:00:00"
  }
}
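To make the $$Text.Format expression in the sqlReaderQuery concrete: for the pipeline's first one-hour slice (starting at the pipeline start time above), ADF substitutes SliceStart and SliceEnd so the query sent to the reference database would be:

-- Rendered sqlReaderQuery for the slice 2017-11-10 21:00 to 22:00
SELECT [Code],[Name],[Condition],[LastModified]
FROM [dbo].[Machine]
WHERE [LastModified] >= '2017-11-10 21:00' AND [LastModified] < '2017-11-10 22:00'

Combined with the sqlWriterCleanupScript, each slice first truncates [stg].[Machine] and then loads only the rows modified during that hour, which gives us the incremental extraction.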

Now we have completed our pipeline for DimMachine table loading, and we can publish it to our Azure tenant:

[Figure: publishing the ADF project to the Azure tenant]

The periodic execution of the pipeline will provide us with incremental loading of the type 2 SCD DimMachine table.
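A quick way to verify the type 2 behaviour is to query the history of a single machine after a few pipeline runs; a query along these lines (the Code value 1001 is just an example) should show one current row per machine, with the expired versions closed off by [ValidTo]:

-- Inspect the SCD 2 history kept for one machine (Code 1001 is illustrative)
SELECT [Code], [Name], [Condition], [CurrentRow], [ValidTo], [LastModified]
FROM [prod].[DimMachine]
WHERE [Code] = 1001
ORDER BY [LastModified];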


Next we can move on to create the LoadDimCustomer pipeline, which follows the same steps as described above for the LoadDimMachine pipeline.


In the next part of the blog series, I am going to cover the steps for building the ADLA U-SQL job for incremental extraction of machine cycle data, and also how to schedule and trigger the U-SQL job from ADF.
