Tag: Azure Data Factory

SSIS in Azure #3 – Schedule and Monitor SSIS Package Execution using ADF V2

*The source code created for this blog post can be found here.

In the previous blog posts in the SSIS in Azure series, we created a SSIS package to periodically ingests data from Azure SQL database to Azure Data Lake Store and deployed the package in the Azure-SSIS Integrated Runtime. Up to this point, we have achieved two goals in the SSIS in Azure series:

  • Using SSIS to move data between cloud storages
  • Host and run SSIS packages in cloud

The last goal we need to achieve is to schedule the execution of the SSIS package in cloud. Traditionally, we can schedule the SSIS package execution by creating a SQL Server Agent job. However, we can only do that through an on-premises SQL Server instance or provision a SQL Server vm in Azure. Thanks to the support of Stored Procedure activity in ADF, we can now schedule the SSIS package execution using a cloud-based ADF pipeline. This blog post will walk through the steps to achieve that.

You can find the source code created for this blog post in my Github. The ningago.demo.adf.ssis.adfv2 project contains the json and powershell files to create the ADF pipeline.

4.PNG

 

The key to this solution is to call the sp_executesql stored procedure in the RunSSISPackagePipeline pipeline that execute the sql script for triggering the SSIS package execution. We can define a Stored Procedure activity in the pipeline to call the sp_executesql stored procedure and pass in the sql script for triggering the SSIS package execution as parameter.

5

We don’t have to manually author the sql script from scratch, but instead we can generate the script using SSMS. First, we connect to the SSISDB catalog in our Azure-SSIS integrated runtime (please refer to SSIS in Azure #2 for how to do that), select the SSIS project or package we can execute, and click the Execute button to open the Execute Package dialog window.

6

On the Execute Package dialog window, we select the package to execute and also set a value (can be any value) for the package parameter (the reason is set any value to the parameter is to ensure the parameter setting statement will be generated in the sql script).

2

We then click the “Script” button on the top-left that will generate the sql script for executing the selected SSIS package.

3

We can then copy the script into the stored procedure activity defined in the RunSSISPackagePipeline pipeline.

5

As you may have noticed that the SSIS package parameter, DateScliceToLoad, has been manually set by us with a random value. The next step we need to take is to pass the date of pipeline run into the sql script in order to make the SSIS package only move the data in the give day. Firstly, we need to define the “scheduledRunTime” parameter in the RunSSISPipeline which will receive the scheduled pipeline run time from the pipeline trigger we will create later.

6.PNG

In the sql script we just generated to trigger SSIS package execution, we replace the random date we have manually set with the ADF expression which points to the scheduleRunTime parameter in the pipeline.

7.PNG

Next, we will need to create the trigger json file to schedule the execution of the pipeline. After we specify the pipeline to execute in the trigger file, we need to pass the scheduledTime of the trigger which is a system variable to the scheduledRunTime parameter defined in the pipeline.

{
  "properties": {
    "name": "RunSSISPackageScheduler",
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Hour",
        "interval": 1,
        "startTime": "2018-01-07T00:00:00-22:00"
      }
    },
    "pipelines": [
      {
        "pipelineReference": {
          "type": "PipelineReference",
          "referenceName": "RunSSISPackagePipeline"
        },
        "parameters": {
          "scheduledRunTime": "@trigger().scheduledTime"
        }
      }
    ]
  }
}

After we have created the json files to define linked service, pipeline and the trigger, we need to deploy them into our data factory instance. As the V2 version of ADF does not support deployment through UI yet, we need to create a deployment scripts using PowerShell.

$DataFactoryName = "ninjago3843adf"
$ResourceGroupName = "SSIS"

Set-AzureRmDataFactoryV2LinkedService -DataFactoryName $DataFactoryName -ResourceGroupName $ResourceGroupName -Name "SSISDBLinkedService" -File ".\SSISDBLinkedService.json"
Set-AzureRmDataFactoryV2Pipeline -DataFactoryName $DataFactoryName -ResourceGroupName $ResourceGroupName -Name "RunSSISPackagePipeline" -DefinitionFile ".\RunSSISPackagePipeline.json"

Stop-AzureRmDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "RunSSISPackageScheduler"
Set-AzureRmDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "RunSSISPackageScheduler" -DefinitionFile ".\RunSSISPackageScheduler.json"
Start-AzureRmDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "RunSSISPackageScheduler"

After the pipeline is deployed and scheduled by the trigger, the pipeline will execute our SSIS package based on the schedule.

8.PNG

 

Advertisements

SSIS in Azure #2 – Deploy SSIS Packages to Azure-SSIS Integration Runtime in ADF V2

In the first blog post of the SSIS in Azure series, I gave a demonstration on how to create SSIS packages to move data in cloud, using a common use case that periodically ingests data from Azure SQL database to Azure Data Lake Store.  In the pre-ADF V2 era, we can only deploy SSIS packages in on-premises SQL Servers or SQL Servers in Azure VM. Thanks to the Azure-SSIS integration runtime that is available for public preview in Azure Data Factory V2, we can now directly deploy and execute our SSIS packages in Azure without needing to provision and manage an Azure SQL Server VM (One thing to note, under the hood, Azure-SSIS integrated runtime itself is a cluster of Azure VMs that are dedicated to run SSIS packages).

This blog post will walk through the steps to deploy, execute and monitor the SSIS package we have created in previous blog post in the Azure-SSIS integration runtime.

First of all, we need to have an Azure-SSIS integration runtime created in our Azure tenant. This can be achieved using the PowerShell scripts provided by Microsoft.

I have met an issue when running the scripts. Here is the hint in case you run into the same issue. If the Set-AzureRmDataFactoryV2 and the Set-AzureRmDataFactoryV2IntegrationRuntime cmdlet in the PowerShell scripts fail to execute and raise the “HTTP Status Code: NotFound” error even after you have installed the Azure PowerShell modules (we can see the AzureRM.DataFactoryV2 module is installed after checking the C:\Program Files\WindowsPowerShell\Modules folder), it may be caused by the incompatible versions between AzureRM modules. As the “Install-Module AzureRM -AllowClobber” cmdlet only installs the Azure modules that are not already installed in your computer, the Azure modules on which the AzureRM.DataFactoryV2 module is dependent are not updated. The simplest solution to this issue is to remove all AzureRM modules in the C:\Program Files\WindowsPowerShell\Modules folder and then run the Install-Module AzureRM -AllowClobber cmdlet.

After the PowerShell scripts is successfully run, an Azure Data Factory V2 instance and a SSISDB Azure SQL database are created in the resource group we specified in the scripts.

2

The SSISDB is the Azure-hosted SSIS catalog where you can deploy your SSIS packaged into. We can use SSMS to connect to the SSISDB. On the Connect to Server dialogue in the SSMS, we need to specify the Server name as the Azure SQL server where the SSISDB SQL database is hosted.

3

After we specified the server name and the login details, don’t click the Connect button to connect to the SQL Server, but instead click the Options button to open the Connection Properties tab. On the Connection Properties tab, set “Connect to database” field as SSISDB database. This settings will allow you to connect to the SSISDB SSIS catalog after clicked the Connect button.

4

5

We can deploy our SSIS package into the SSISdB catalog either by using Deploy Project option in SSMS

6

or using Deploy option in you SSIS SSDT project in Visual Studio.

7

Both options will launch the Integration Services Deployment Wizard dialog to guide us through the steps for the SSIS package deployment.

8

13

We can execute the deployed SSIS packages in SSMS by right-clicking a SSIS project or SSIS package to launch the Execute Package dialog. The SSIS package we have created to periodically load data from Azure SQL database to Azure Data Lake requires a DateSclieToLoad parameter passed in that specifies which day of data to move. We can manually set the value of this parameter in the Execute Package dialog.

9

We can also monitor the SSIS package executions in SSMS through the SSIS catalog reports.

14.PNG

1112

Up to this point, we have created the Azure-SSIS Integration Runtime and deployed our SSIS package into the runtime. We can execute the package and monitor the execution results manually in SSMS. In next blog post, I will walk through the steps to schedule the SSIS package execution using an ADF V2 pipeline.  The ADF V2 pipeline will be scheduled to execute every day. The date of the current pipeline execution will be passed into the SSIS package, and the SSIS package will move the data by the given date parameter.

 

 

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 4)

This is the last part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT.

2

In the previous part we created the U-SQL job that incrementally extracts the machine cycle rows and then stores them into a staging area in the ADLS. In this part of the blog series we will create the LoadFactMachineCycle pipeline that loads the machine cycle rows from the staging area in the ADLS to the [stg].[MachineCycle] staging table in target DW database and then call a stored procedure that looks up the reference id of the dimension tables (DimMachine, DimCustomer, DimDate), create measures and load the transformed machine cycle data into the fact table, FactMachineCycle.

We start from creating this Stored Procedure, namely [stg].[uspLoadFactMachineCycle] for loading machine cycle fact table. This stored procedure joins the [stg].[MachineCycle] table and the  dimension tables to get the id of those tables and calculates the machine cycle duration measure from the cycle StartDateTime column and EndDateTime column.

 

CREATE PROC [stg].[uspLoadFactMachineCycle] AS
BEGIN
   
   INSERT INTO [prod].[FactMachineCycle]
       SELECT S.[CycleId]
          ,CASE 
            WHEN DM.Id IS NULL THEN 0
            ELSE DM.Id
           END AS MachineId
          ,CASE 
            WHEN DC.Id IS NULL THEN 0
            ELSE DC.Id
           END as CustomerId
          ,Year(S.[StartDateTime])*10000+Month(S.[StartDateTime])*100+Day(S.[StartDateTime]) AS DateKey
          ,DATEDIFF(s, S.[StartDateTime], S.[EndDateTime]) AS Duration
          ,GETDATE() AS RowAdded
      FROM [stg].[MachineCycle] S
      LEFT JOIN [prod].[FactMachineCycle] F ON S.CycleId = F.CycleId
      LEFT JOIN [prod].[DimCustomer] DC ON S.CustomerName = DC.Code AND DC.CurrentRow=1 
      LEFT JOIN [prod].[DimMachine] DM ON S.MachineName = DM.Code AND DM.CurrentRow=1
      WHERE F.CycleId IS NULL

END

We then move on to create the ADF pipeline for the machine cycle fact table loading. the following ADF files will be crated (highlighted in yellow colour).

3

The snapshot below shows the pipeline diagram:

1

Firstly we need to create a linked service (AzureDataLakeStoreLinkedService) that points to the ADLS where the staging machine cycle csv files are stored.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
  "name": "AzureDataLakeStoreLinkedService",
  "properties": {
    "type": "AzureDataLakeStore",
    "typeProperties": {
      "dataLakeStoreUri": "adl://{store account name}.azuredatalakestore.net",
      "servicePrincipalId": "{service principal id}",
      "servicePrincipalKey": "{service principal key}",
      "tenant": "{tenant},
      "subscriptionId": "{subscription id}",
      "resourceGroupName": "{resource group name}"
    }
  }
}

Next, We need to create the input and output ADF tables, InputMachineCycle, OutputStgMachineCycle and the OutputFactMachineCycleInputMachineCycle ADF table points to the staging machine cycle csv file, stgMachineCycles.scv, in the ADLS. The columns in the csv file need to be explicitly declared in the structure property. It is important to specify the type of the non-string columns, e.g., CycleId (Int32) and StartDateTime (Datetime), so that those columns can be converted from a string type in the csv file to the correct type in the DW database.

 

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
    "name": "InputMachineCycle",
  "properties": {
    "type": "AzureDataLakeStore",
    "linkedServiceName": "AzureDataLakeStoreLinkedService",
    "structure": [
      { "name": "CycleId","type": "Int32"},
      { "name": "CustomerCode"},
      { "name": "MachineCode"},
      { "name": "StartDateTime", "type": "Datetime" },
      { "name": "EndDateTime", "type": "Datetime" },
      { "name": "EventTime", "type": "Datetime" }
    ],
    "typeProperties": {
      "folderPath": "IoT/Staging",
      "fileName": "stgMachineCycles.csv",
      "partitionedBy": []
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
The OutputStgMachine ADF table points to the [stg].[MachineCycle] staging table in the target DW database.
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "CycleId" },
      { "name": "CustomerName" },
      { "name": "MachineName" },
      { "name": "StartDateTime" },
      { "name": "EndDateTime" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[MachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
 The OutputFactMachineCycle ADF table points to the FactMachineCycle table in the target DW database.
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputFactMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[FactMachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

After the input and output ADF tables have been created we can start to build the LoadFactMachineCycle pipeline which contains two activities. The first activity, LoadStgMachineCycle, copies machine cycle data from the staging csv file stored in the ADLS to the staging machine cycle table, [stg].[MachineCycle], in the target DB. The TabularTranslator is configured to map the columns from the csv file to the staging database table. The second activity in the pipeline is a SqlServerStoredProcedure activity that calls the stored procedure we created earlier to load the machine cycle data from [stg].[MachineCycle] to the target [prod].[FactMachineCycle] table.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadFactMachineCycle",
  "properties": {
    "description": "Load machine cycles from azure data lake store to staging table, and then call stored procedure to load the fact table",
    "activities": [
      {
        "name": "LoadStgMachineCycle",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "AzureDataLakeStoreSource"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[MachineCycle]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[MachineCycle]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "CycleId: CycleId, CustomerCode: CustomerName, MachineCode: MachineName, StartDateTime: StartDateTime, EndDateTime:EndDateTime, EventTime:LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },

      {
        "name": "LoadFactMachineCycle",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputFactMachineCycle"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadFactMachineCycle",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

The snapshot below shows the [prod].[FactMachineCycle] table after the machine cycle data has been load by the ADF pipeline.

2

Up to this point, we have completed the end-to-end ADF pipeline that extracts data from Azure SQL DB and ADLS, and load to type 2 SCD dimension tables and fact table in a incremental loading mode.

1

The snapshot below shows all the visual studio files we have created for building the pipeline.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 3)

This is the third part of the blog series to demonstrate how to build an end-to-end ADF pipeline for data warehouse ELT. The part will describe how to build an ADLA U-SQL job for incremental extraction of machine cycle data from Azure Data Lake store and go through the steps for scheduling and triggering the U-SQL job using ADF.

In the first part of the blog series we have created a Azure Data Lake store directory to store the machine cycle data in csv format.

3

We are going to create a U-SQL job to query the machine cycle records based on the SliceStart datatime and the SliceEnd datatime. The ADF pipeline will pass the start time and the end time of current ADF ingestion slice into the U-SQL job, and the U-SQL job will only returns the machine cycle ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario. There is another blog post I have written specifically for the across-day scenario here.

Instead of creating an U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure to simplify the scripts organisation and deploying. Two parameters are defined in the stored procedure that take the SliceStart datatime and SliceEnd datatime from ADF. The year, month, day parts will be extracted from the SliceStart datatime for building the folder path that points to the location where the machine cycle csv files are stored.

The U-SQL script will then extract the rows from the csv files and SELECT the rows within the current ADF execution slice. The selected rows will be output into a staging area in the Azure Data Lake store.

CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN

    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    DECLARE @InputPath = @"/IoT/Curated%20Zone/Demo/ADFDW/"
                         + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";

    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;


    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;

After we created the U-SQL script, we need to create the ADF pipeline to schedule and trigger the U-SQL job. As the machine cycle fact table need to look up for the machine and customer dimension tables. We need to schedule the execution of the U-SQL job immediately after the dimension tables are load:

4

The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.

Firstly, we need to create a AAD service principal for ADF and ADLA using to access the csv files stored in the ADLS. You can find the detailed steps to create service principal and to assign ADLS access in Microsoft official documents (don’t forget to turn the allowing service access option on in the ADLS firewall settings).

With the service principal we can create the AzureDataLakeAnalyticsLinkedService file in ADF that configure the link to the ADLA service where the U-SQL job will be executed.

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
          "accountName": "ninjago0786demp",
          "subscriptionId": "{subscription id}",
          "resourceGroupName": "{resource group}",
          "dataLakeAnalyticsUri": "azuredatalakeanalytics.net",
          "servicePrincipalId": "{serivce principal id}",
          "servicePrincipalKey": "{service principal key}",
          "tenant": "tenant"
      }
    }
}

We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and create a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the “script” property as the commend to call the ADFDW.dbo.uspGetMachineCycles we created earlier and pass the SliceStart and SliceEnd as parameters.

As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both OutputDimCustomer and OutputDimMachine ADF tables (the output of the LoadDimCustomer pipeline and the LoadDimMachine pipeline)  as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we set it as the InputMachineCycle ADF table which is the input for the pipeline to load the machine cycle data from ADLS staging area to the target data warehouse. The implementation of  inputMachineCycle ADF table will be covered in the next part of the blog series.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "IncrementalExtractMachineCycle",
  "properties": {
    "description": "Extract machine cycles within current time slice",
    "activities": [
      {
        "name": "ExtractMachineCycle",
        "linkedServiceName": "AzureDataLakeAnalyticsLinkedService",
        "type": "DataLakeAnalyticsU-SQL",
        "typeProperties": {
          "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));",
          "parameters": {
            "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)",
            "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)"
          }
        },
        "inputs": [
          {
            "name": "OutputDimCustomer"
          },
          {
            "name": "OutputDimMachine"
          }
        ],
        "outputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have the U-SQL job created and scheduled that will extract the machine cycle rows within the current ADF execution slice and store them into the staging area in the ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline to load the machine cycle rows in csv file format from ADLS to the staging table in the target DW and then create a store procedure to transform and load the data into the machine cycle fact table.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 2)

This is the second part of the blog series to demonstrate how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading to a star-schema data warehouse database with considerations on  SCD (slow changing dimensions) and incremental loading.

The first part of the blog series describes the example used by the demonstration and setup the required Azure SQL DB/Azure Data Lake directory and the sample data. In the second part of the blog series we are going to build the ADF pipeline for the incremental loading of customer and machine related data from the reference database to the staging database and then transform them to the type 2 SCD tables.

In this demonstration we are going to create two Dimension tables, DimMachine and DimCustomer (the DimDate has been created in Part 1 of the blog series). We start from building the ADF pipeline for DimMachine. Firstly we need to create the stored procedure for loading the type 2 SCD DimMachine table from the staging table ([stg].[Machine). The stored procedure will be called by the ADF pipeline after the staging table is loaded from the reference database. In the Visual Studio DW project we have created in the the first part of the blog series, we add a ETL folder and create a sql file naming: uspLoadDimMachine.sql.

2

In this file we create the Stored Procedure [stg].[uspLoadDimMachine] with the use of  SQL Merge statement to implement the type 2 SCD for DimMachine table.

CREATE PROC [stg].[uspLoadDimMachine] AS
BEGIN
    INSERT INTO [prod].[DimMachine]
        ([Code]
        ,[Name]
        ,[Condition]
        ,[CurrentRow]
        ,[ValidTo]
        ,[LastModified]
        )
    SELECT
         M.[Code]
        ,M.[Name]
        ,M.[Condition]
        ,1
        ,'9999-12-31'
        , GETDATE()
    FROM (
        MERGE [prod].[DimMachine] AS [Target]
            USING [stg].[Machine] AS [Source] ON Target.Code = Source.Code
        WHEN MATCHED AND Target.[Condition]  Source.[Condition]
             THEN UPDATE SET
                [CurrentRow] = 0
               ,[LastModified] = GETDATE()
               ,[ValidTo] = GETDATE()
        WHEN NOT MATCHED BY TARGET
             THEN INSERT (
                 [Code]
                ,[Name]
                ,[Condition]
                ,[CurrentRow]
                ,[ValidTo]
                ,[LastModified]
               ) VALUES (
                 Source.Code
                ,Source.Name
                ,Source.Condition
                ,1
                ,'9999-12-31'
                ,GETDATE()
               )
         OUTPUT $action AS Action, [Source].*
        ) AS M
        WHERE M.Action = 'UPDATE' AND M.[Code]  IS NOT NULL;
END

After the stored procedure is ready we can start to build the ADF pipeline for machine dimension loading. The snapshot below shows the complete pipeline diagram for DimMachine data loading.

1

We will author the ADF components in Visual Studio. The snapshot below show the files we need to create for DimMachine data loading (highlighted in yellow colour):

1

ReferenceDBLinkedService and DWLinkedService configure the links to the Azure SQL reference database and the target data warehouse.

ReferenceDBLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "ReferenceDBLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

DWLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "DWLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

InputMachine table is an AzureSQLTable type table that points to the [dbo].[Machine] table in our ReferenceDB Azure SQL database. The columns used to mapped to the columns in the output data table, i.e. [stg].[Machine] is declared in the structure attribute. The input table is set to be available every one hour which means the table will be refreshed from the [dbo].[Machine] table every one hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "InputMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "ReferenceDBLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[Machine]"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputStgMachine ADF table points to the [stg].[Machine] table in our target DW Azure SQL database. The output columns mapped to the input columns (the input columns were defined in the InputMachine table earlier) is also declared in the structure attribute. The output table is set to be available every one hour that means the Copy Activity for moving data from source Machine table to the staging Machine table will be triggered and output the results to the output data every one hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[Machine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputDimMachine ADF table points to the [prod].[DimMachine] table in our target DW Azure SQL database.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputDimMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[DimMachine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

LoadDimMachine pipleline contains two activities. The first activity LoadStgMachine is a Copy Activity which copies data from the reference machine table to the staging machine table. The data queried from the reference machine table is filtered by the SliceStart datatime and the SliceEnd datatime that only returns the machine recorded updated during that period. TabularTranslator is configured to map the columns from reference machine table to the staging table.

The second activity is a SqlServerStoredProcedure activity which calls the stored procedure we have created earlier to implementing type 2 SCD for DimMachine.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadDimMachine",
  "properties": {
    "description": "Load Machine table to staging table, and then call stored procedure to load to dimention table with SCD 2 ",
    "activities": [
      {
        "name": "LoadStgMachine",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "$$Text.Format('SELECT [Code],[Name],[Condition],[LastModified] FROM [dbo].[Machine] WHERE [LastModified] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [LastModified] < \\'{1:yyyy-MM-dd HH:mm}\\'', SliceStart, SliceEnd)"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[Machine]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[Machine]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "Code: Code, Name: Name, Condition: Condition, LastModified: LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },
      {
        "name": "LoadDimMachine",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputDimMachine"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadDimMachine",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }

    ],
    "start": "2017-11-10T21:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have done our pipeline for DimMachine table loading. We can publish the pipeline to our Azure tenant:

1

The periodically execution of the pipeline will provide us the incremental loading of the type 2 SCD DimMachine table.

2

Next we can move on to create the LoadDimCustomer pipeline that takes same steps as the steps described above for creating LoadDimMachine pipeline

3

In the next part of the blog series, I am going to cover the steps for building ADLA U-SQL job for incremental extraction of machine cycle data and also how to schedule and trigger the U-SQL job from ADF.