
SSIS in Azure #1 – Periodically Ingesting Data into Azure Data Lake from SQL Database using SSIS

The low cost and schema-less storage of Azure Data Lake Store, along with the large number of supported analytic engines (e.g., Azure Data Lake Analytics, Hive and Spark), makes it a perfect store-everything repository for enterprise data. We can offload copies of business data from various LOB data sources into Azure Data Lake Store for all sorts of batch analysis.

Microsoft provides us with Azure Data Factory, the cloud-based ETL service in the Azure Cortana Intelligence Suite, to support data ingestion into Azure Data Lake Store. However, many data engineers working with the Microsoft BI stack may prefer SSIS, the tool they are familiar with, which offers an easy-to-use visual editor and a rich collection of transformation components, over Azure Data Factory, where they have to author JSON files to define data source links, datasets and pipelines (at least for Azure Data Factory V1; a visual editor is planned for Azure Data Factory V2 but is not yet available).

Thanks to the Azure-SSIS integration runtime, available in public preview in Azure Data Factory V2, we can now deploy and execute our SSIS packages in Azure, which provides an alternative option for cloud-based ETL.

This blog post introduces how to move data in the cloud using SSIS, with an example of a common use case: periodically ingesting data from an Azure SQL database into Azure Data Lake Store. There are two key requirements for this use case:

  • SSIS needs to be able to connect to the Azure SQL database and load the data into a csv file in a specified folder in Azure Data Lake Store.
  • SSIS needs to be able to periodically and incrementally load data from the Azure SQL database into a csv file for that period. The csv files need to be organised in a date hierarchy for optimised performance of Azure Data Lake Store.


For the first requirement, we need to use the SSIS Feature Pack for Azure, an SSIS extension for connecting to Azure services and moving data between Azure data sources or between on-premises data sources and Azure data sources. For the second requirement, we need to use an SSIS trick for dynamically setting attributes on the data flow destination component. We will cover the details to fulfil those two requirements in the rest of the blog post.

Firstly, we need to install the SSIS Feature Pack for Azure into Visual Studio (the right version of SSDT should already be installed in Visual Studio). We should be able to see the Azure connection components in the SSIS toolbox after the feature pack is installed.


Before starting to build the SSIS package, we need to create an Azure AD service principal as the service account for accessing the Azure Data Lake Store and assign the principal read/write permission to the folder in the Azure Data Lake Store where the output csv files will be stored.

We then create an SSIS project in SSDT and add a Data Flow Task.


Open the Data Flow tab and add an ADO.NET source, which will connect to the Azure SQL database from which the data will be extracted. In this example, we use the AdventureWorks sample database as the data source and transfer the sale orders data into Azure Data Lake Store. To extract the sale orders periodically, we first define two variables in the SSIS package, "StartDate" and "EndDate". In this example, we want to load the sale orders at a daily interval: the SSIS package is expected to run early every morning to load the data of the previous day. Therefore, the expression for the StartDate variable is DATEADD("day", -1, ((DT_DATE)(DT_DBDATE)GETDATE())) and the expression for EndDate is (DT_DATE)(DT_DBDATE)GETDATE(). For example, if the package runs in the early hours of 2017-11-11, StartDate evaluates to 2017-11-10 and EndDate to 2017-11-11.


Then we want to extract the sale order records with a LastModified datetime between the StartDate and the EndDate. In this example, we first create a stored procedure, uspGetSaleOrders, in the source SQL database that takes the StartDate and EndDate as parameters and returns the sale orders between those dates. In your environment, if you do not have permission to create stored procedures in your data sources, you can put the SQL script into an SSIS variable instead.
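A minimal sketch of what uspGetSaleOrders might look like is shown below. The table and column names ([Sales].[SalesOrderHeader], its selected columns and the ModifiedDate filter column) are assumptions based on the AdventureWorks sample schema, so adjust them to whatever table and last-modified column you actually use:

CREATE PROCEDURE [dbo].[uspGetSaleOrders]
    @StartDate DATETIME,
    @EndDate   DATETIME
AS
BEGIN
    SET NOCOUNT ON;

    -- Return the sale orders modified within the requested period
    -- (inclusive of @StartDate, exclusive of @EndDate).
    SELECT [SalesOrderID]
          ,[OrderDate]
          ,[CustomerID]
          ,[SubTotal]
          ,[TotalDue]
          ,[ModifiedDate]
    FROM [Sales].[SalesOrderHeader]
    WHERE [ModifiedDate] >= @StartDate
      AND [ModifiedDate] < @EndDate;
END

Filtering with >= StartDate and < EndDate keeps consecutive daily loads from overlapping or missing rows on the day boundary.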

We then move to the Control Flow tab, open the properties panel of the Data Flow Task and open the Expressions editor.


In the Expressions editor, we add an expression to dynamically set the SqlCommand property of the SQL database source as: "EXEC [dbo].[uspGetSaleOrders] @StartDate ='" + (DT_WSTR, 50)@[User::StartDate] + "', @EndDate = '" + (DT_WSTR, 50)@[User::EndDate] + "'". This command executes the stored procedure we created earlier with the StartDate and EndDate variables passed in.


Now we have the data source set up and we can move on to add and configure the Azure Data Lake Store Destination.


We add an Azure Data Lake Store Destination component to the Data Flow tab and add a data flow path from the SQL database source to the destination. In the Azure Data Lake Store Destination Editor window, we need to create a connection manager to manage the connection (including the store location and the authentication) to the Azure Data Lake Store and specify the output file path and the format of the file. As we will output the file in csv format, we need to select Text as the file format and "," as the column delimiter character.


The interesting part is the File Path attribute. As we discussed earlier, we want to organise the files into a date hierarchy based on the modified date of the sale order records, so the file path will look like: "/{project folder}/{Year}/{Month}/{Day}/SaleOrders_{date string}.csv".

To dynamically set the file path of the Azure Data Lake Store Destination, we can add an expression on the parent Data Flow Task as we did for the SqlCommand attribute of the SQL database source.


We define the expression for the file path as:

"/Samples/Demo/" + (DT_WSTR, 4)YEAR(@[User::EndDate]) + "/" + RIGHT("0" + (DT_WSTR, 2)MONTH(@[User::EndDate]), 2) + "/" + RIGHT("0" + (DT_WSTR, 2)DAY(@[User::EndDate]), 2) + "/SaleOrders_" + (DT_WSTR, 4)YEAR(@[User::EndDate]) + RIGHT("0" + (DT_WSTR, 2)MONTH(@[User::EndDate]), 2) + RIGHT("0" + (DT_WSTR, 2)DAY(@[User::EndDate]), 2) + ".csv"

Now we have the Azure Data Lake Store Destination set up and the data flow is ready to run. We can test the data flow in SSDT. As the sample AdventureWorks database does not contain sale order records for the period when this blog post was written, I manually set the StartDate and EndDate variables to a day for which there are sale order records in the AdventureWorks database for testing purposes.



Now we can see the data flow is working when running on our local machine through SSDT. The next blog post will provision the Azure-SSIS Integration Runtime and deploy and run the SSIS package in the cloud.


End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 4)

This is the last part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT.


In the previous part we created the U-SQL job that incrementally extracts the machine cycle rows and stores them in a staging area in the ADLS. In this part of the blog series we will create the LoadFactMachineCycle pipeline, which loads the machine cycle rows from the staging area in the ADLS to the [stg].[MachineCycle] staging table in the target DW database and then calls a stored procedure that looks up the reference ids of the dimension tables (DimMachine, DimCustomer, DimDate), creates the measures and loads the transformed machine cycle data into the fact table, FactMachineCycle.

We start by creating this stored procedure, namely [stg].[uspLoadFactMachineCycle], for loading the machine cycle fact table. This stored procedure joins the [stg].[MachineCycle] table and the dimension tables to get the ids of those tables and calculates the machine cycle duration measure from the cycle StartDateTime and EndDateTime columns.

 

CREATE PROC [stg].[uspLoadFactMachineCycle] AS
BEGIN
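   -- Look up the dimension surrogate keys for each staged machine cycle (defaulting
   -- to 0 when no match is found), derive the cycle duration in seconds, and insert
   -- only the cycles that do not already exist in the fact table.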
   
   INSERT INTO [prod].[FactMachineCycle]
       SELECT S.[CycleId]
          ,CASE 
            WHEN DM.Id IS NULL THEN 0
            ELSE DM.Id
           END AS MachineId
          ,CASE 
            WHEN DC.Id IS NULL THEN 0
            ELSE DC.Id
           END as CustomerId
          ,Year(S.[StartDateTime])*10000+Month(S.[StartDateTime])*100+Day(S.[StartDateTime]) AS DateKey
          ,DATEDIFF(s, S.[StartDateTime], S.[EndDateTime]) AS Duration
          ,GETDATE() AS RowAdded
      FROM [stg].[MachineCycle] S
      LEFT JOIN [prod].[FactMachineCycle] F ON S.CycleId = F.CycleId
      LEFT JOIN [prod].[DimCustomer] DC ON S.CustomerName = DC.Code AND DC.CurrentRow=1 
      LEFT JOIN [prod].[DimMachine] DM ON S.MachineName = DM.Code AND DM.CurrentRow=1
      WHERE F.CycleId IS NULL

END

We then move on to create the ADF pipeline for the machine cycle fact table loading. The following ADF files will be created (highlighted in yellow colour).


The snapshot below shows the pipeline diagram:

[Figure: LoadFactMachineCycle pipeline diagram]

Firstly we need to create a linked service (AzureDataLakeStoreLinkedService) that points to the ADLS where the staging machine cycle csv files are stored.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
  "name": "AzureDataLakeStoreLinkedService",
  "properties": {
    "type": "AzureDataLakeStore",
    "typeProperties": {
      "dataLakeStoreUri": "adl://{store account name}.azuredatalakestore.net",
      "servicePrincipalId": "{service principal id}",
      "servicePrincipalKey": "{service principal key}",
      "tenant": "{tenant},
      "subscriptionId": "{subscription id}",
      "resourceGroupName": "{resource group name}"
    }
  }
}

Next, we need to create the input and output ADF tables: InputMachineCycle, OutputStgMachineCycle and OutputFactMachineCycle. The InputMachineCycle ADF table points to the staging machine cycle csv file, stgMachineCycles.csv, in the ADLS. The columns in the csv file need to be explicitly declared in the structure property. It is important to specify the type of the non-string columns, e.g., CycleId (Int32) and StartDateTime (Datetime), so that those columns can be converted from the string type in the csv file to the correct type in the DW database.

 

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
    "name": "InputMachineCycle",
  "properties": {
    "type": "AzureDataLakeStore",
    "linkedServiceName": "AzureDataLakeStoreLinkedService",
    "structure": [
      { "name": "CycleId","type": "Int32"},
      { "name": "CustomerCode"},
      { "name": "MachineCode"},
      { "name": "StartDateTime", "type": "Datetime" },
      { "name": "EndDateTime", "type": "Datetime" },
      { "name": "EventTime", "type": "Datetime" }
    ],
    "typeProperties": {
      "folderPath": "IoT/Staging",
      "fileName": "stgMachineCycles.csv",
      "partitionedBy": []
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputStgMachineCycle ADF table points to the [stg].[MachineCycle] staging table in the target DW database.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "CycleId" },
      { "name": "CustomerName" },
      { "name": "MachineName" },
      { "name": "StartDateTime" },
      { "name": "EndDateTime" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[MachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputFactMachineCycle ADF table points to the FactMachineCycle table in the target DW database.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputFactMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[FactMachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

After the input and output ADF tables have been created, we can start to build the LoadFactMachineCycle pipeline, which contains two activities. The first activity, LoadStgMachineCycle, copies the machine cycle data from the staging csv file stored in the ADLS to the staging machine cycle table, [stg].[MachineCycle], in the target DB. The TabularTranslator is configured to map the columns from the csv file to the staging database table. The second activity in the pipeline is a SqlServerStoredProcedure activity that calls the stored procedure we created earlier to load the machine cycle data from [stg].[MachineCycle] into the target [prod].[FactMachineCycle] table.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadFactMachineCycle",
  "properties": {
    "description": "Load machine cycles from azure data lake store to staging table, and then call stored procedure to load the fact table",
    "activities": [
      {
        "name": "LoadStgMachineCycle",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "AzureDataLakeStoreSource"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[MachineCycle]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[MachineCycle]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "CycleId: CycleId, CustomerCode: CustomerName, MachineCode: MachineName, StartDateTime: StartDateTime, EndDateTime:EndDateTime, EventTime:LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },

      {
        "name": "LoadFactMachineCycle",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputFactMachineCycle"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadFactMachineCycle",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

The snapshot below shows the [prod].[FactMachineCycle] table after the machine cycle data has been loaded by the ADF pipeline.

[Screenshot: [prod].[FactMachineCycle] table after loading]

Up to this point, we have completed the end-to-end ADF pipeline that extracts data from Azure SQL DB and ADLS and loads it into type 2 SCD dimension tables and a fact table in an incremental loading mode.


The snapshot below shows all the Visual Studio files we have created for building the pipeline.

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 3)

This is the third part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT. This part describes how to build an ADLA U-SQL job for the incremental extraction of machine cycle data from Azure Data Lake Store and goes through the steps for scheduling and triggering the U-SQL job using ADF.

In the first part of the blog series we created an Azure Data Lake Store directory to store the machine cycle data in csv format.


We are going to create a U-SQL job to query the machine cycle records based on the SliceStart datetime and the SliceEnd datetime. The ADF pipeline will pass the start time and the end time of the current ADF ingestion slice into the U-SQL job, and the U-SQL job will only return the machine cycles that ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario; there is another blog post I have written specifically for the across-day scenario here.

Instead of creating a U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure to simplify script organisation and deployment. Two parameters are defined in the stored procedure to take the SliceStart and SliceEnd datetimes from ADF. The year, month and day parts are extracted from the SliceStart datetime to build the folder path that points to the location where the machine cycle csv files are stored.

The U-SQL script will then extract the rows from the csv files and SELECT the rows within the current ADF execution slice. The selected rows will be output into a staging area in the Azure Data Lake store.

CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN
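    // Derive the year/month/day folder parts from the slice start date so that only
    // the csv files under the current slice's day folder are read.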

    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    DECLARE @InputPath string = @"/IoT/Curated%20Zone/Demo/ADFDW/"
                         + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";

    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);
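    // Keep only the machine cycles that ended within the current ADF execution slice.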

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;


    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;

After we have created the U-SQL script, we need to create the ADF pipeline to schedule and trigger the U-SQL job. As the machine cycle fact table needs to look up the machine and customer dimension tables, we need to schedule the execution of the U-SQL job immediately after the dimension tables are loaded:

[Figure: pipeline diagram showing the U-SQL job scheduled after the dimension table loads]

The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.

Firstly, we need to create an AAD service principal for ADF and ADLA to use to access the csv files stored in the ADLS. You can find the detailed steps to create the service principal and to assign ADLS access in the official Microsoft documentation (don't forget to turn on the option allowing access from Azure services in the ADLS firewall settings).

With the service principal, we can create the AzureDataLakeAnalyticsLinkedService file in ADF that configures the link to the ADLA service where the U-SQL job will be executed.

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
          "accountName": "ninjago0786demp",
          "subscriptionId": "{subscription id}",
          "resourceGroupName": "{resource group}",
          "dataLakeAnalyticsUri": "azuredatalakeanalytics.net",
          "servicePrincipalId": "{serivce principal id}",
          "servicePrincipalKey": "{service principal key}",
          "tenant": "tenant"
      }
    }
}

We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and add a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the "script" property to the command that calls the ADFDW.dbo.uspGetMachineCycles procedure we created earlier, passing SliceStart and SliceEnd in as parameters.

As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both the OutputDimCustomer and OutputDimMachine ADF tables (the outputs of the LoadDimCustomer and LoadDimMachine pipelines) as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we set it to the InputMachineCycle ADF table, which is the input for the pipeline that loads the machine cycle data from the ADLS staging area to the target data warehouse. The implementation of the InputMachineCycle ADF table will be covered in the next part of the blog series.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "IncrementalExtractMachineCycle",
  "properties": {
    "description": "Extract machine cycles within current time slice",
    "activities": [
      {
        "name": "ExtractMachineCycle",
        "linkedServiceName": "AzureDataLakeAnalyticsLinkedService",
        "type": "DataLakeAnalyticsU-SQL",
        "typeProperties": {
          "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));",
          "parameters": {
            "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)",
            "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)"
          }
        },
        "inputs": [
          {
            "name": "OutputDimCustomer"
          },
          {
            "name": "OutputDimMachine"
          }
        ],
        "outputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have created and scheduled the U-SQL job that extracts the machine cycle rows within the current ADF execution slice and stores them into the staging area in the ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline that loads the machine cycle rows in csv format from the ADLS to the staging table in the target DW and then calls a stored procedure to transform and load the data into the machine cycle fact table.

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 2)

This is the second part of the blog series demonstrating how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading it into a star-schema data warehouse database with considerations for SCD (slowly changing dimensions) and incremental loading.

The first part of the blog series described the example used in the demonstration and set up the required Azure SQL DB/Azure Data Lake directory and the sample data. In this second part of the blog series we are going to build the ADF pipeline for the incremental loading of customer and machine related data from the reference database to the staging tables, and then transform them into the type 2 SCD tables.

In this demonstration we are going to create two dimension tables, DimMachine and DimCustomer (DimDate was created in Part 1 of the blog series). We start by building the ADF pipeline for DimMachine. Firstly, we need to create the stored procedure for loading the type 2 SCD DimMachine table from the staging table ([stg].[Machine]). The stored procedure will be called by the ADF pipeline after the staging table has been loaded from the reference database. In the Visual Studio DW project we created in the first part of the blog series, we add an ETL folder and create a sql file named uspLoadDimMachine.sql.


In this file we create the stored procedure [stg].[uspLoadDimMachine], which uses the SQL MERGE statement to implement the type 2 SCD for the DimMachine table.

CREATE PROC [stg].[uspLoadDimMachine] AS
BEGIN
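    -- Type 2 SCD load: the inner MERGE closes off changed rows (CurrentRow = 0,
    -- ValidTo = GETDATE()) and inserts brand-new machines; the outer INSERT then adds
    -- a new current row for each machine whose [Condition] has changed.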
    INSERT INTO [prod].[DimMachine]
        ([Code]
        ,[Name]
        ,[Condition]
        ,[CurrentRow]
        ,[ValidTo]
        ,[LastModified]
        )
    SELECT
         M.[Code]
        ,M.[Name]
        ,M.[Condition]
        ,1
        ,'9999-12-31'
        , GETDATE()
    FROM (
        MERGE [prod].[DimMachine] AS [Target]
            USING [stg].[Machine] AS [Source] ON Target.Code = Source.Code
        WHEN MATCHED AND Target.[Condition] <> Source.[Condition]
             THEN UPDATE SET
                [CurrentRow] = 0
               ,[LastModified] = GETDATE()
               ,[ValidTo] = GETDATE()
        WHEN NOT MATCHED BY TARGET
             THEN INSERT (
                 [Code]
                ,[Name]
                ,[Condition]
                ,[CurrentRow]
                ,[ValidTo]
                ,[LastModified]
               ) VALUES (
                 Source.Code
                ,Source.Name
                ,Source.Condition
                ,1
                ,'9999-12-31'
                ,GETDATE()
               )
         OUTPUT $action AS Action, [Source].*
        ) AS M
        WHERE M.Action = 'UPDATE' AND M.[Code] IS NOT NULL;
END

After the stored procedure is ready, we can start to build the ADF pipeline for the machine dimension loading. The snapshot below shows the complete pipeline diagram for the DimMachine data loading.

[Figure: LoadDimMachine pipeline diagram]

We will author the ADF components in Visual Studio. The snapshot below shows the files we need to create for the DimMachine data loading (highlighted in yellow colour):

[Screenshot: ADF project files created for the DimMachine loading, highlighted in yellow]

ReferenceDBLinkedService and DWLinkedService configure the links to the Azure SQL reference database and the target data warehouse.

ReferenceDBLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "ReferenceDBLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

DWLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "DWLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

The InputMachine table is an AzureSqlTable type table that points to the [dbo].[Machine] table in our ReferenceDB Azure SQL database. The columns to be mapped to the columns of the output table, i.e. [stg].[Machine], are declared in the structure attribute. The input table is set to be available every hour, which means the table will be refreshed from the [dbo].[Machine] table every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "InputMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "ReferenceDBLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[Machine]"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputStgMachine ADF table points to the [stg].[Machine] table in our target DW Azure SQL database. The output columns mapped to the input columns (defined in the InputMachine table earlier) are also declared in the structure attribute. The output table is set to be available every hour, which means the Copy Activity that moves data from the source Machine table to the staging Machine table will be triggered and will write its results every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[Machine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputDimMachine ADF table points to the [prod].[DimMachine] table in our target DW Azure SQL database.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputDimMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[DimMachine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The LoadDimMachine pipeline contains two activities. The first activity, LoadStgMachine, is a Copy Activity which copies data from the reference machine table to the staging machine table. The data queried from the reference machine table is filtered by the SliceStart and SliceEnd datetimes so that only the machine records updated during that period are returned. The TabularTranslator is configured to map the columns from the reference machine table to the staging table.

The second activity is a SqlServerStoredProcedure activity which calls the stored procedure we created earlier to implement the type 2 SCD for DimMachine.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadDimMachine",
  "properties": {
    "description": "Load Machine table to staging table, and then call stored procedure to load to dimention table with SCD 2 ",
    "activities": [
      {
        "name": "LoadStgMachine",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "$$Text.Format('SELECT [Code],[Name],[Condition],[LastModified] FROM [dbo].[Machine] WHERE [LastModified] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [LastModified] < \\'{1:yyyy-MM-dd HH:mm}\\'', SliceStart, SliceEnd)"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[Machine]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[Machine]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "Code: Code, Name: Name, Condition: Condition, LastModified: LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },
      {
        "name": "LoadDimMachine",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputDimMachine"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadDimMachine",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }

    ],
    "start": "2017-11-10T21:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have completed our pipeline for the DimMachine table loading. We can publish the pipeline to our Azure tenant.


The periodic execution of the pipeline gives us the incremental loading of the type 2 SCD DimMachine table.


Next we can move on to create the LoadDimCustomer pipeline, which follows the same steps as those described above for creating the LoadDimMachine pipeline.
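The LoadDimCustomer pipeline also needs its own stored procedure for the type 2 SCD load of DimCustomer. A sketch following the same MERGE pattern as [stg].[uspLoadDimMachine] could look like the following (this sketch is not listed in the original project and assumes [Name] is the only tracked attribute for customers):

CREATE PROC [stg].[uspLoadDimCustomer] AS
BEGIN
    -- Type 2 SCD load for DimCustomer, mirroring uspLoadDimMachine: the inner MERGE
    -- closes off changed rows and inserts new customers, and the outer INSERT adds
    -- a new current row for each changed customer.
    INSERT INTO [prod].[DimCustomer]
        ([Code]
        ,[Name]
        ,[CurrentRow]
        ,[ValidTo]
        ,[LastModified]
        )
    SELECT
         C.[Code]
        ,C.[Name]
        ,1
        ,'9999-12-31'
        ,GETDATE()
    FROM (
        MERGE [prod].[DimCustomer] AS [Target]
            USING [stg].[Customer] AS [Source] ON Target.Code = Source.Code
        WHEN MATCHED AND Target.[Name] <> Source.[Name]
             THEN UPDATE SET
                [CurrentRow] = 0
               ,[LastModified] = GETDATE()
               ,[ValidTo] = GETDATE()
        WHEN NOT MATCHED BY TARGET
             THEN INSERT (
                 [Code]
                ,[Name]
                ,[CurrentRow]
                ,[ValidTo]
                ,[LastModified]
               ) VALUES (
                 Source.Code
                ,Source.Name
                ,1
                ,'9999-12-31'
                ,GETDATE()
               )
         OUTPUT $action AS Action, [Source].*
        ) AS C
        WHERE C.Action = 'UPDATE' AND C.[Code] IS NOT NULL;
END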


In the next part of the blog series, I am going to cover the steps for building the ADLA U-SQL job for the incremental extraction of machine cycle data, and also how to schedule and trigger the U-SQL job from ADF.

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 1)

This blog series demonstrates how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading it into a star-schema data warehouse database with considerations for SCD (slowly changing dimensions) and incremental loading.

The final pipeline will look as follows:

[Figure: the complete end-to-end ADF pipeline]

The machine cycle records will be loaded from the csv files stored in an Azure Data Lake Store, and the reference data for customers and machines will be loaded from the reference DB (Azure SQL DB). The ADF pipeline will first load the data into the staging tables in the target DW, and will then execute the SQL stored procedures to transform the data into the star schema and implement the type 2 SCD.

A number of services within the Azure Cortana Intelligence Suite will be employed to build the end-to-end pipeline, including Azure Data Factory, Azure Data Lake Store, Azure Data Lake Analytics (U-SQL), Azure SQL Database and the development tools (Visual Studio + SSDT). Four Visual Studio projects will be created: ReferenceDB for provisioning the Azure SQL reference database, DW for provisioning the target data warehouse, ADLA for the incremental extraction of the machine cycle data, and ADF for building the pipeline. You can download the source code here.

This blog post covers the first part of the series, giving a brief introduction to the example used in this demonstration and providing the steps for preparing the required demo databases/data lake directory and the sample data.

Preparation

1. Create Visual Studio solution

Create a Visual Studio solution with the SSDT tool to host all the projects required in this demonstration, including the DB projects for ReferenceDB and the target DW, and the ADF project.

2. Create ReferenceDB project


Create ReferenceDB project and add the scripts for creating Customer table and Machine table.

Customer table:

CREATE TABLE [dbo].[Customer]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Machine table:

CREATE TABLE [dbo].[Machine]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Create a post deployment script to add the sample data:

INSERT INTO [dbo].[Customer]
           ([Code]
           ,[Name]
           ,[LastModified])
     VALUES
           ('C0001','Customer1', '2017-11-08 21:21:30'),
           ('C0002','Customer2', '2017-11-08 21:21:30')

INSERT INTO [dbo].[Machine]
           ([Code]
           ,[Name]
           ,[Condition]
           ,[LastModified])
     VALUES
           ('M0001','Machine1', 'Perfect', '2017-11-08 21:21:30'),
           ('M0002','Machine2', 'Good', '2017-11-08 21:21:30')

Publish the project to the Azure tenant to provision the Azure SQL DB.

3. Create the target DW project


Create the target DW project and add the scripts for schema, staging tables, dimension tables and fact tables:

Schema – prod:

CREATE SCHEMA [prod]

Schema – stg:

CREATE SCHEMA [stg]

Staging – Customer:

CREATE TABLE [stg].[Customer]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – Machine:

CREATE TABLE [stg].[Machine]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – MachineCycle:

CREATE TABLE [stg].[MachineCycle]
(
    [CycleId] int NOT NULL,
    [MachineName] varchar(50) NOT NULL,
    [CustomerName] varchar(50) NOT NULL,
    [StartDateTime] datetime2 NOT NULL,
    [EndDateTime] datetime2 NOT NULL,
        [LastModified] DATETIME2 NOT NULL
)

Dimension – DimCustomer:

CREATE TABLE [prod].[DimCustomer]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL, 
    [Name] VARCHAR(50) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Dimension – DimMachine:

CREATE TABLE [prod].[DimMachine]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Facts – FactMachineCycle:

CREATE TABLE [prod].[FactMachineCycle]
(
    [CycleId] int NOT NULL PRIMARY KEY,
    [MachineId] int NOT NULL,
    [CustomerId] int NOT NULL,
    [DateKey] int NOT NULL,
    [Duration] float NOT NULL,
        [RowAdded] DATETIME2 NOT NULL
)

In addition to the tables above we also need a Date dimension table and a script to generate it. There are plenty of scripts available online, e.g., https://www.codeproject.com/Articles/647950/Create-and-Populate-Date-Dimension-for-Data-Wareho. The generated DimDate table should look like:

[Screenshot: generated DimDate table]
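For a quick start, a minimal DimDate could be created and populated along the following lines (the column list here is a deliberately small assumption; the linked script produces a much richer table). The key point is that DateKey uses the same Year*10000 + Month*100 + Day format that the fact table loading relies on:

CREATE TABLE [prod].[DimDate]
(
    [DateKey] int NOT NULL PRIMARY KEY,   -- e.g. 20171110
    [Date] date NOT NULL,
    [Year] int NOT NULL,
    [Month] int NOT NULL,
    [Day] int NOT NULL
)

-- Populate a simple date range; extend the range and columns as needed.
DECLARE @d date = '2017-01-01';
WHILE @d <= '2020-12-31'
BEGIN
    INSERT INTO [prod].[DimDate] ([DateKey], [Date], [Year], [Month], [Day])
    VALUES (YEAR(@d) * 10000 + MONTH(@d) * 100 + DAY(@d), @d, YEAR(@d), MONTH(@d), DAY(@d));
    SET @d = DATEADD(DAY, 1, @d);
END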

Finally, we publish the project to Azure to get the DW database deployed.


4. Create Azure Data Lake directory

Create a directory in Azure Data Lake, e.g., “ADFDW”, and organise the sub-directory hierarchy as year -> month -> day.


The sample csv data includes the columns CycleId, CustomerCode, MachineCode, StartDateTime, EndDateTime and EventTime.


Now that we have completed our preparation work, the next blog post will cover the steps to build the ADF pipelines for the dimension tables ETL.