
Azure Stream Analytics Patterns & Implementations

Thanks to the increased popularity of IoT and social networks, streaming analytics has become a hot topic and attracted more and more attention in the data analytics community. Many people (e.g., this and this) believe streaming analytics is the future that will take over the use cases traditionally targeted by batch-oriented analytics.

Azure Stream Analytics is Microsoft’s real-time analytics offering and one of the major services in the Azure Cortana Intelligence Suite. When designing data analytics solutions on the Azure platform, we need to know what role Azure Stream Analytics can play in our solutions and in which scenarios we can use it. Dr. Srinath Perera, an expert on CEP and streaming analytics, has summarised 13 patterns for streaming real-time analytics. Those patterns can be a very useful guide for making design decisions in our data analytics solutions.

In this blog post, I will discuss those patterns in the Azure Stream Analytics context, evaluate Azure Stream Analytics’ strengths and weaknesses for those patterns, and explore how to implement them using Azure Stream Analytics coupled with support from other Azure services (e.g., Event Hub, Azure Functions, and Azure Machine Learning).

Firstly, I am going to give a summary of Dr. Srinath Perera’s 13 streaming real-time analytics patterns and then discuss the Azure Stream Analytics implementation for each pattern. In addition, I am going to add an additional pattern, Edge Analytics, to the list, which is specific to Azure Stream Analytics.

Dr. Perera’s 13 stream analytics patterns

  • Pattern 1 – Preprocessing
  • Pattern 2 – Alerts and Thresholds
  • Pattern 3 – Simple Counting and Counting with Windows
  • Pattern 4 – Joining Event Streams
  • Pattern 5 – Data Correlation, Missing Events, and Erroneous Data
  • Pattern 6 – Interacting with Databases
  • Pattern 7 – Detecting Temporal Event Sequence Patterns
  • Pattern 8 – Tracking
  • Pattern 9 – Detecting Trends
  • Pattern 10 – Running the same Query in Batch and Realtime Pipelines
  • Pattern 11 – Detecting and switching to Detailed Analysis
  • Pattern 12 – Using a Model
  • Pattern 13 – Online Control
  • Pattern 14 (additional) – Edge Analytics

Pattern 1 – Preprocessing

One basic and common task for streaming analytics is data preprocessing that filters, reshapes, splits/combines and transforms incoming raw data into a format suitable for further processing and analysis.

Azure Stream Analytics provides good support for data preprocessing tasks. The Stream Analytics Query Language is a SQL-like language using a subset of T-SQL syntax. Developers with T-SQL skills can easily create scripts for those common data preprocessing tasks in Azure Stream Analytics with the SQL knowledge they already have. The Stream Analytics Query Language allows them to preprocess streaming data in the same way as they preprocess batch-oriented data.
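For illustration, here is a minimal preprocessing sketch in the Stream Analytics Query Language; the input name RawTelemetryInput, the output name CleanedTelemetry and the column names are hypothetical and would need to match your own job configuration.

-- Filter out test devices, cast the temperature reading to a float and
-- supply a default humidity value before further processing.
SELECT
    DeviceID,
    TRY_CAST(Temperature AS float) AS Temperature,
    CASE WHEN Humidity IS NULL THEN 0 ELSE Humidity END AS Humidity
INTO CleanedTelemetry
FROM RawTelemetryInput TIMESTAMP BY EventTime
WHERE DeviceID IS NOT NULL AND DeviceType <> 'test'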

*This snapshot is from Microsoft

Pattern 2 – Alerts and Thresholds

This is a very common streaming analytics pattern, especially in many industrial IoT use cases. In this pattern, the streaming analytics program detects an abnormal condition based on a pre-defined threshold and generates an alert when the condition occurs.

Anomaly detection using “WHERE” clause

We can use the “WHERE” clause of the Stream Analytics Query Language in Azure Stream Analytics to detect the abnormal condition, and then output the queried events under that condition to an “Alert” output, e.g.,

SELECT  DeviceID, Temperature, "Over Temperature"  AS ErrorStatus
INTO AlertOutput
FROM TelemetryInput
WHERE Temperature >100

Anomaly detection using “ANOMALYDETECTION” Operator

The machine learning-based “ANOMALYDETECTION” operator is a new feature recently added to Azure Stream Analytics and is currently in preview. This operator takes advantage of a machine learning algorithm to detect events or observations that do not conform to the expected patterns.

The “ANOMALYDETECTION” operator is very easy to use, similar to the way the LAG operator is used.


You can find more details about the “ANOMALYDETECTION” operator here.

Handling Alerts

When an abnormal condition is detected and output to the AlertOutput stream, we can handle the alert output in a number of ways on the Azure platform:

  1. Output the alerts to a live dashboard
  2. Send alert notifications
  3. Automatically handle the alert by adjusting the settings of the equipment

Azure Stream Analytics supports outputting a stream to a real-time Power BI dashboard. With this feature we can show the real-time alerts on a Power BI dashboard monitored by the maintenance engineers.


The alerts can also be sent to the maintenance engineers in push mode. Thanks to the recently added Azure Functions output target in Azure Stream Analytics, it is much easier for developers to send out alerts through email or notifications without the need to first output the stream to a Service Bus queue and then access Azure Functions from there. Developers can now egress the alert stream directly to Azure Functions, where they can implement the logic for alert delivery.


When combined with Azure IoT Hub, we can also make the monitored equipment automatically adjust its settings based on the alerts. For example, Microsoft has created a real-time data processing solution for KingwayTek that takes advantage of Azure Stream Analytics, Azure Functions and Azure IoT Hub to proactively raise alerts on vehicle status, and those alerts trigger vehicle reconfiguration.


*This snapshot is from Microsoft

Pattern 3 – Simple Counting and Counting with Windows

In this pattern, the raw, atomic stream events are aggregated over a time window to reveal potential patterns and behaviours. For example, the raw message of a single website visit event may not provide much meaningful insight, but the average view count per hour or per day can reveal the pattern of website visits, e.g., the website gets more visits in the evening than in the morning.

To implement this pattern, the streaming analytics service needs to support two types of functions: aggregation and time windows. Azure Stream Analytics provides good support for both.

The Stream Analytics Query Language provides a list of built-in aggregate functions that cover most common aggregation requirements.


In addition, Azure Stream Analytics supports user-defined aggregates (UDA) written in JavaScript, which gives developers extra power and flexibility to implement complicated aggregation rules.

Azure Stream Analytics also provides good support for time windows. Three time window functions are supported: tumbling window, hopping window and sliding window.

The tumbling window function, TumblingWindow, segments a data stream into repeated, non-overlapping, distinct time windows.
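As an illustration, the following sketch counts events and averages the temperature per device for each distinct hour using TumblingWindow; the input and output names are hypothetical.

SELECT
    DeviceID,
    COUNT(*) AS EventCount,
    AVG(Temperature) AS AvgTemperature
INTO HourlyStats
FROM TelemetryInput TIMESTAMP BY EventTime
GROUP BY DeviceID, TumblingWindow(hour, 1)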

*This image is from Microsoft

The hopping window function, HoppingWindow, generates time windows that hop forward in time by a fixed period. Compared to tumbling windows, hopping windows can overlap with each other, so the same events may fall into more than one hopping window.

*This image is from Microsoft

The sliding window function, SlidingWindow, generates a time window when an event occurs. The window ends at the time the event happens, and the start of the window is defined by the duration parameter specified in the SlidingWindow function.
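For example, the sketch below (with hypothetical input and output names) uses SlidingWindow to produce an output whenever a device reports more than three errors within any 10-minute window.

SELECT DeviceID, COUNT(*) AS ErrorCount
INTO FrequentErrorAlerts
FROM ErrorInput TIMESTAMP BY EventTime
GROUP BY DeviceID, SlidingWindow(minute, 10)
HAVING COUNT(*) > 3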

*This image is from Microsoft

Pattern 4 – Joining Event Streams

This pattern is used for the scenarios where multiple data streams need to be processed to create a new event stream. For example, we may have multiple sensors that collect data for different aspects of an object or event.

Azure Stream Analytics supports multiple inputs from a variety of stream data sources.

*This image is from Microsoft

After the inputs are defined in Azure Stream Analytics, you can reference them by name and join them using the Stream Analytics Query Language, as the sketch below shows.
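This is a minimal sketch with hypothetical stream names; note that a stream-to-stream join in Azure Stream Analytics requires a DATEDIFF condition that bounds how far apart in time the matching events can be.

-- Combine temperature and pressure readings from the same machine that
-- arrive within 60 seconds of each other.
SELECT
    T.MachineID,
    T.Temperature,
    P.Pressure
INTO CombinedTelemetry
FROM TemperatureInput T TIMESTAMP BY EventTime
JOIN PressureInput P TIMESTAMP BY EventTime
    ON T.MachineID = P.MachineID
    AND DATEDIFF(second, T, P) BETWEEN 0 AND 60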

Pattern 5 – Data Correlation, Missing Events, and Erroneous Data

This pattern correlates data from different streams or within the same stream. Dr. Perera has given some use cases of this pattern in his article, such as matching up two data streams that send events at different speeds, detecting a customer request that has not been responded to within one hour, and detecting failed sensors by comparing a set of sensors that monitor overlapping regions.

In Azure Stream Analytics we can take advantage of the T-SQL syntax of the Stream Analytics Query Language to implement this pattern. For example, we can use the JOIN clause to join different streams on the id of the monitored object (e.g., the id of a machine on which different sensors are installed) and use the operators provided by T-SQL to find the correlation.
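As a sketch of the missing-event case (the stream names and the one-hour window are hypothetical), a LEFT OUTER JOIN plus an IS NULL check can flag requests that received no response within the allowed time:

-- Flag customer requests that have not been responded to within 60 minutes.
SELECT R.RequestID, R.CustomerID
INTO UnansweredRequests
FROM RequestInput R TIMESTAMP BY RequestTime
LEFT OUTER JOIN ResponseInput P TIMESTAMP BY ResponseTime
    ON R.RequestID = P.RequestID
    AND DATEDIFF(minute, R, P) BETWEEN 0 AND 60
WHERE P.RequestID IS NULL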

Pattern 6 – Interacting with Databases

In many use cases the streaming data alone is not enough for us to dig out meaningful insight for the business. The data from the streaming source only becomes useful when combined with historical, business-oriented data. The streaming analytics service therefore needs to be able to fetch data from other business databases and combine it with the streaming data. For example, we may need to check a blacklist when processing a real-time service request.

Azure Stream Analytics does provide support for reference data joins in the Stream Analytics Query Language. To use this feature, we need to create a Reference type input that fetches the reference data from Azure Blob storage.
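A minimal sketch of a reference data join is shown below, assuming a hypothetical Reference type input named BlacklistReference backed by a blob and a streaming input named RequestInput; unlike a stream-to-stream join, no DATEDIFF condition is needed here.

SELECT
    S.RequestID,
    S.CallerID,
    'Blacklisted' AS Status
INTO FlaggedRequests
FROM RequestInput S TIMESTAMP BY EventTime
JOIN BlacklistReference B
    ON S.CallerID = B.CallerID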

Up to this point, only Azure Blob storage is supported as the reference data source for Azure Stream Analytics. We need to use Azure Data Factory to move the reference data from where it is originally stored into an Azure Blob storage instance. The reference data is modelled as a sequence of blobs in ascending order of the datetime specified in the blob name.

As most reference data is slowly changing, the streaming analytics solution also needs to ensure that the reference data it combines with the streaming data is up to date. Azure Stream Analytics does provide an approach to support slowly changing reference data, but with some limitations.

Firstly, a reference data blob stored in Azure Blob storage cannot be updated in place, as that would cause the Stream Analytics job to fail. Instead, we can only add a new blob storing the updated reference data, using the same container and path pattern defined in the job input, with a date/time greater than the one specified by the last blob in the sequence. Secondly, the old reference data blobs must not be altered or removed.

Pattern 7 – Detecting Temporal Event Sequence Patterns

In this pattern, streaming analytics is used to detect temporal event sequence patterns. For example, a machine may fail after showing a sequence of statuses in a certain order. The streaming analytics solution needs to be able to detect the sequence pattern so that an alert can be sent to engineers when the pattern occurs.

In the example provided by Dr. Perera, he used Storm and Siddhi (a CEP engine) to detect the temporal event sequence patterns. We can use the Stream Analytics Query Language in Azure Stream Analytics to implement the example, for instance with the LAG operator as sketched below. However, I think a better solution that can cope with more complicated use cases is to use a machine learning algorithm to detect the pattern and make the prediction. Azure Stream Analytics provides good support for Azure Machine Learning integration; I will provide more details about it when discussing Pattern 12.
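For a simple two-step sequence, a sketch using the LAG operator could look like the following; the input name, status values and the 10-minute look-back window are hypothetical.

-- Raise an event when a machine reports 'Overheating' and its previous
-- status within the last 10 minutes was 'HighVibration'.
SELECT MachineID, Status, EventTime
INTO SequenceAlerts
FROM StatusInput TIMESTAMP BY EventTime
WHERE Status = 'Overheating'
  AND LAG(Status, 1) OVER (PARTITION BY MachineID LIMIT DURATION(minute, 10)) = 'HighVibration'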

Pattern 8 – Tracking

This pattern refers to streaming analytics use cases for tracking something over space and time under one or more given conditions. Those use cases often overlap with IoT use cases that monitor the real-time status or movement of something, for example, tracking the movement of missing airline luggage.

Azure Stream Analytics comes with real-time geospatial analytics capabilities, providing native functions for geospatial operations that represent geospatial data as points, lines and polygons, and supporting joins of multiple geospatial data streams to solve more complicated use cases.
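As a hedged sketch of a simple geofencing check, the built-in CreatePoint and ST_DISTANCE functions can be combined with a reference input; the inputs, the SiteReference lookup and the 1000-metre threshold are all hypothetical.

-- Alert when a tracked asset reports a position more than 1000 metres
-- away from its assigned site.
SELECT
    A.AssetID,
    ST_DISTANCE(CreatePoint(A.Latitude, A.Longitude),
                CreatePoint(S.SiteLatitude, S.SiteLongitude)) AS DistanceInMetres
INTO GeofenceAlerts
FROM AssetPositionInput A TIMESTAMP BY EventTime
JOIN SiteReference S
    ON A.AssetID = S.AssetID
WHERE ST_DISTANCE(CreatePoint(A.Latitude, A.Longitude),
                  CreatePoint(S.SiteLatitude, S.SiteLongitude)) > 1000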

Pattern 9 – Detecting Trends

This pattern detects trends over time series data, e.g., usage increases and drops, peaks, outliers, etc. As with Pattern 8, this pattern is often used in IoT use cases.

In Azure Stream Analytics, for simple use cases, we can use the Stream Analytics Query Language to query the peak (MAX) value, outliers (ANOMALYDETECTION), and the start and end values in a time window to compute the trends. When combined with a Power BI dashboard, we can provide time-series-based charts to visualise the trends.
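As an illustration (the input and output names and the window sizes are hypothetical), a two-step query can compute hourly aggregates and then compare each window with the previous one to expose rises and drops:

WITH HourlyAvg AS (
    SELECT
        DeviceID,
        System.Timestamp AS WindowEnd,
        AVG(Temperature) AS AvgTemperature,
        MAX(Temperature) AS PeakTemperature
    FROM TelemetryInput TIMESTAMP BY EventTime
    GROUP BY DeviceID, TumblingWindow(hour, 1)
)
SELECT
    DeviceID,
    WindowEnd,
    AvgTemperature,
    PeakTemperature,
    AvgTemperature - LAG(AvgTemperature, 1)
        OVER (PARTITION BY DeviceID LIMIT DURATION(hour, 2)) AS HourOverHourChange
INTO TrendOutput
FROM HourlyAvg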

For more complicated use cases, we may need functions outside of the Stream Analytics Query Language (e.g., is_monotonic_decreasing/is_monotonic_increasing in Python) or we may need a time-series analysis model (e.g., ARIMA) for forecasting use cases. At this moment, Azure Stream Analytics does not support Python or R. However, we can work around this by implementing the algorithm in Azure Machine Learning Studio with Python or R scripts, publishing it as a REST API, and then integrating it with Azure Stream Analytics.

Pattern 10 – Running the same Query in Batch and Realtime Pipelines

I find the title of this pattern, “Running the same Query in Batch and Realtime Pipelines”, a bit confusing, but from Dr. Perera’s explanation, this pattern refers to the Lambda Architecture, which is the most popular data analytics architecture used in IoT use cases at the moment.

The Lambda Architecture separates IoT data analytics into two paths, the hot path (also known as the speed layer) and the cold path (batch layer). The hot path refers to the stream data processing path and the cold path refers to the batch-oriented data processing path. The Microsoft Azure Cortana Intelligence Suite provides good support for the Lambda Architecture. More details can be found here.


*This snapshot is from Microsoft

Pattern 11 – Detecting and switching to Detailed Analysis

This pattern is used for the use cases where an anomaly or behaviour can be identified by the streaming analytics and further detailed analysis is required against the historical data. I think this pattern can be viewed as a sub-pattern of Pattern 10.

This pattern can be supported on the Azure platform using the Lambda Architecture introduced above.

Pattern 12 – Using a Model

This pattern refers to using a machine learning model in stream analytics. I have mentioned some use cases in previous patterns where a machine learning model needs to be used.

Azure Stream Analytics provides an Azure ML function type to support integration with Azure Machine Learning.


Machine learning developers can implement the model in Azure Machine Learning Studio and publish it as a REST API. An Azure Stream Analytics job can then call the API through the Azure ML function, as sketched below.
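The sketch below assumes a hypothetical Azure ML function named scoreAnomaly has been added to the Stream Analytics job; its parameters and the fields of the returned record depend entirely on the inputs and outputs of the published web service.

WITH Scored AS (
    SELECT
        DeviceID,
        Temperature,
        scoreAnomaly(Temperature) AS result
    FROM TelemetryInput TIMESTAMP BY EventTime
)
SELECT
    DeviceID,
    Temperature,
    -- 'result' holds the web service outputs; the field name 'Score' is hypothetical.
    result.[Score] AS AnomalyScore
INTO ScoredOutput
FROM Scored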


*This snapshot is from Microsoft

Pattern 13 – Online Control

This pattern refers to AI-related use cases such as autopilot, self-driving and robotics. Dr. Perera does not provide much detail about this pattern in his article and presentation slides. I think Azure Stream Analytics is not designed for this type of application.

Pattern 14 (additional) – Edge Analytics

I have added this pattern to Dr. Perera’s list as edge computing has become more and more important in IoT use cases, and Azure Stream Analytics, along with Azure Machine Learning, is a main component in Microsoft’s edge computing offering.

With Azure Stream Analytics on IoT Edge, the real-time analytics intelligence can be deployed close to IoT devices to achieve low latency, resiliency, efficient use of bandwidth and compliance.


*This snapshot is from Microsoft

 


End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 4)

This is the last part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT.


In the previous part we created the U-SQL job that incrementally extracts the machine cycle rows and stores them in a staging area in the ADLS. In this part of the blog series we will create the LoadFactMachineCycle pipeline that loads the machine cycle rows from the staging area in the ADLS to the [stg].[MachineCycle] staging table in the target DW database and then calls a stored procedure that looks up the reference ids of the dimension tables (DimMachine, DimCustomer, DimDate), creates the measures and loads the transformed machine cycle data into the fact table, FactMachineCycle.

We start by creating this stored procedure, namely [stg].[uspLoadFactMachineCycle], for loading the machine cycle fact table. This stored procedure joins the [stg].[MachineCycle] table and the dimension tables to get the ids from those tables and calculates the machine cycle duration measure from the cycle StartDateTime and EndDateTime columns.

 

CREATE PROC [stg].[uspLoadFactMachineCycle] AS
BEGIN
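   -- Insert only the machine cycles that are not yet in the fact table (see the
   -- anti-join on F.CycleId below); unmatched dimension lookups fall back to the
   -- unknown member id 0.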
   
   INSERT INTO [prod].[FactMachineCycle]
       SELECT S.[CycleId]
          ,CASE 
            WHEN DM.Id IS NULL THEN 0
            ELSE DM.Id
           END AS MachineId
          ,CASE 
            WHEN DC.Id IS NULL THEN 0
            ELSE DC.Id
           END as CustomerId
          ,Year(S.[StartDateTime])*10000+Month(S.[StartDateTime])*100+Day(S.[StartDateTime]) AS DateKey
          ,DATEDIFF(s, S.[StartDateTime], S.[EndDateTime]) AS Duration
          ,GETDATE() AS RowAdded
      FROM [stg].[MachineCycle] S
      LEFT JOIN [prod].[FactMachineCycle] F ON S.CycleId = F.CycleId
      LEFT JOIN [prod].[DimCustomer] DC ON S.CustomerName = DC.Code AND DC.CurrentRow=1 
      LEFT JOIN [prod].[DimMachine] DM ON S.MachineName = DM.Code AND DM.CurrentRow=1
      WHERE F.CycleId IS NULL

END

We then move on to create the ADF pipeline for the machine cycle fact table loading. The following ADF files will be created (highlighted in yellow colour).


The snapshot below shows the pipeline diagram:


Firstly we need to create a linked service (AzureDataLakeStoreLinkedService) that points to the ADLS where the staging machine cycle csv files are stored.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
  "name": "AzureDataLakeStoreLinkedService",
  "properties": {
    "type": "AzureDataLakeStore",
    "typeProperties": {
      "dataLakeStoreUri": "adl://{store account name}.azuredatalakestore.net",
      "servicePrincipalId": "{service principal id}",
      "servicePrincipalKey": "{service principal key}",
      "tenant": "{tenant}",
      "subscriptionId": "{subscription id}",
      "resourceGroupName": "{resource group name}"
    }
  }
}

Next, we need to create the input and output ADF tables: InputMachineCycle, OutputStgMachineCycle and OutputFactMachineCycle. The InputMachineCycle ADF table points to the staging machine cycle csv file, stgMachineCycles.csv, in the ADLS. The columns in the csv file need to be explicitly declared in the structure property. It is important to specify the type of the non-string columns, e.g., CycleId (Int32) and StartDateTime (Datetime), so that those columns can be converted from the string type in the csv file to the correct type in the DW database.

 

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
    "name": "InputMachineCycle",
  "properties": {
    "type": "AzureDataLakeStore",
    "linkedServiceName": "AzureDataLakeStoreLinkedService",
    "structure": [
      { "name": "CycleId","type": "Int32"},
      { "name": "CustomerCode"},
      { "name": "MachineCode"},
      { "name": "StartDateTime", "type": "Datetime" },
      { "name": "EndDateTime", "type": "Datetime" },
      { "name": "EventTime", "type": "Datetime" }
    ],
    "typeProperties": {
      "folderPath": "IoT/Staging",
      "fileName": "stgMachineCycles.csv",
      "partitionedBy": []
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
The OutputStgMachineCycle ADF table points to the [stg].[MachineCycle] staging table in the target DW database.
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "CycleId" },
      { "name": "CustomerName" },
      { "name": "MachineName" },
      { "name": "StartDateTime" },
      { "name": "EndDateTime" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[MachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
 The OutputFactMachineCycle ADF table points to the FactMachineCycle table in the target DW database.
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputFactMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[FactMachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

After the input and output ADF tables have been created we can start to build the LoadFactMachineCycle pipeline which contains two activities. The first activity, LoadStgMachineCycle, copies machine cycle data from the staging csv file stored in the ADLS to the staging machine cycle table, [stg].[MachineCycle], in the target DB. The TabularTranslator is configured to map the columns from the csv file to the staging database table. The second activity in the pipeline is a SqlServerStoredProcedure activity that calls the stored procedure we created earlier to load the machine cycle data from [stg].[MachineCycle] to the target [prod].[FactMachineCycle] table.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadFactMachineCycle",
  "properties": {
    "description": "Load machine cycles from azure data lake store to staging table, and then call stored procedure to load the fact table",
    "activities": [
      {
        "name": "LoadStgMachineCycle",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "AzureDataLakeStoreSource"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[MachineCycle]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[MachineCycle]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "CycleId: CycleId, CustomerCode: CustomerName, MachineCode: MachineName, StartDateTime: StartDateTime, EndDateTime:EndDateTime, EventTime:LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },

      {
        "name": "LoadFactMachineCycle",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputFactMachineCycle"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadFactMachineCycle",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

The snapshot below shows the [prod].[FactMachineCycle] table after the machine cycle data has been loaded by the ADF pipeline.


Up to this point, we have completed the end-to-end ADF pipeline that extracts data from Azure SQL DB and ADLS, and loads it into the type 2 SCD dimension tables and the fact table in an incremental loading mode.


The snapshot below shows all the Visual Studio files we have created for building the pipeline.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 3)

This is the third part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT. This part will describe how to build an ADLA U-SQL job for the incremental extraction of machine cycle data from the Azure Data Lake Store and go through the steps for scheduling and triggering the U-SQL job using ADF.

In the first part of the blog series we created an Azure Data Lake Store directory to store the machine cycle data in csv format.


We are going to create a U-SQL job to query the machine cycle records based on the SliceStart and SliceEnd datetimes. The ADF pipeline will pass the start time and the end time of the current ADF ingestion slice into the U-SQL job, and the U-SQL job will only return the machine cycles that ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario. There is another blog post I have written specifically for the across-day scenario here.

Instead of creating a U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure to simplify script organisation and deployment. Two parameters are defined in the stored procedure that take the SliceStart and SliceEnd datetimes from ADF. The year, month and day parts will be extracted from the SliceStart datetime to build the folder path that points to the location where the machine cycle csv files are stored.

The U-SQL script will then extract the rows from the csv files and SELECT the rows within the current ADF execution slice. The selected rows will be output into a staging area in the Azure Data Lake store.

CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN

    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    DECLARE @InputPath string = @"/IoT/Curated%20Zone/Demo/ADFDW/"
                         + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";

    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;


    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;

After we have created the U-SQL script, we need to create the ADF pipeline to schedule and trigger the U-SQL job. As the machine cycle fact table needs to look up the machine and customer dimension tables, we need to schedule the execution of the U-SQL job immediately after the dimension tables are loaded:


The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.

Firstly, we need to create an AAD service principal for ADF and ADLA to use to access the csv files stored in the ADLS. You can find the detailed steps for creating a service principal and assigning ADLS access in the official Microsoft documentation (don’t forget to turn on the option allowing Azure services access in the ADLS firewall settings).

With the service principal we can create the AzureDataLakeAnalyticsLinkedService file in ADF, which configures the link to the ADLA service where the U-SQL job will be executed.

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
          "accountName": "ninjago0786demp",
          "subscriptionId": "{subscription id}",
          "resourceGroupName": "{resource group}",
          "dataLakeAnalyticsUri": "azuredatalakeanalytics.net",
          "servicePrincipalId": "{service principal id}",
          "servicePrincipalKey": "{service principal key}",
          "tenant": "{tenant}"
      }
    }
}

We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and create a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the “script” property to the command that calls the ADFDW.dbo.uspGetMachineCycles procedure we created earlier and passes SliceStart and SliceEnd as parameters.

As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both the OutputDimCustomer and OutputDimMachine ADF tables (the outputs of the LoadDimCustomer pipeline and the LoadDimMachine pipeline) as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we set it to the InputMachineCycle ADF table, which is the input of the pipeline that loads the machine cycle data from the ADLS staging area to the target data warehouse. The implementation of the InputMachineCycle ADF table will be covered in the next part of the blog series.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "IncrementalExtractMachineCycle",
  "properties": {
    "description": "Extract machine cycles within current time slice",
    "activities": [
      {
        "name": "ExtractMachineCycle",
        "linkedServiceName": "AzureDataLakeAnalyticsLinkedService",
        "type": "DataLakeAnalyticsU-SQL",
        "typeProperties": {
          "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));",
          "parameters": {
            "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)",
            "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)"
          }
        },
        "inputs": [
          {
            "name": "OutputDimCustomer"
          },
          {
            "name": "OutputDimMachine"
          }
        ],
        "outputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have the U-SQL job created and scheduled; it will extract the machine cycle rows within the current ADF execution slice and store them in the staging area in the ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline to load the machine cycle rows in csv format from the ADLS to the staging table in the target DW, and then create a stored procedure to transform and load the data into the machine cycle fact table.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 2)

This is the second part of the blog series demonstrating how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading it to a star-schema data warehouse database with considerations for SCD (slowly changing dimensions) and incremental loading.

The first part of the blog series describes the example used in the demonstration and sets up the required Azure SQL DB/Azure Data Lake directory and the sample data. In this second part of the blog series we are going to build the ADF pipelines for the incremental loading of customer and machine related data from the reference database to the staging database and then transform them into the type 2 SCD tables.

In this demonstration we are going to create two dimension tables, DimMachine and DimCustomer (DimDate was created in Part 1 of the blog series). We start by building the ADF pipeline for DimMachine. Firstly we need to create the stored procedure for loading the type 2 SCD DimMachine table from the staging table ([stg].[Machine]). The stored procedure will be called by the ADF pipeline after the staging table is loaded from the reference database. In the Visual Studio DW project we created in the first part of the blog series, we add an ETL folder and create a sql file named uspLoadDimMachine.sql.


In this file we create the stored procedure [stg].[uspLoadDimMachine], which uses the SQL MERGE statement to implement the type 2 SCD for the DimMachine table.

CREATE PROC [stg].[uspLoadDimMachine] AS
BEGIN
    INSERT INTO [prod].[DimMachine]
        ([Code]
        ,[Name]
        ,[Condition]
        ,[CurrentRow]
        ,[ValidTo]
        ,[LastModified]
        )
    SELECT
         M.[Code]
        ,M.[Name]
        ,M.[Condition]
        ,1
        ,'9999-12-31'
        , GETDATE()
    FROM (
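        -- The MERGE closes off the current row of any changed machine (and inserts brand new machines);
        -- its OUTPUT feeds the outer INSERT, which adds a fresh current row for each changed machine.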
        MERGE [prod].[DimMachine] AS [Target]
            USING [stg].[Machine] AS [Source] ON Target.Code = Source.Code
        WHEN MATCHED AND Target.[Condition] <> Source.[Condition]
             THEN UPDATE SET
                [CurrentRow] = 0
               ,[LastModified] = GETDATE()
               ,[ValidTo] = GETDATE()
        WHEN NOT MATCHED BY TARGET
             THEN INSERT (
                 [Code]
                ,[Name]
                ,[Condition]
                ,[CurrentRow]
                ,[ValidTo]
                ,[LastModified]
               ) VALUES (
                 Source.Code
                ,Source.Name
                ,Source.Condition
                ,1
                ,'9999-12-31'
                ,GETDATE()
               )
         OUTPUT $action AS Action, [Source].*
        ) AS M
        WHERE M.Action = 'UPDATE' AND M.[Code]  IS NOT NULL;
END

After the stored procedure is ready we can start to build the ADF pipeline for machine dimension loading. The snapshot below shows the complete pipeline diagram for DimMachine data loading.


We will author the ADF components in Visual Studio. The snapshot below shows the files we need to create for DimMachine data loading (highlighted in yellow colour):


ReferenceDBLinkedService and DWLinkedService configure the links to the Azure SQL reference database and the target data warehouse.

ReferenceDBLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "ReferenceDBLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

DWLinkedService:

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "DWLinkedService",
    "properties": {
        "type": "AzureSqlDatabase",
      "typeProperties": {
        "connectionString": "{connection string}"
      }
    }
}

The InputMachine table is an AzureSqlTable type table that points to the [dbo].[Machine] table in our ReferenceDB Azure SQL database. The columns to be mapped to the columns of the output data table, i.e. [stg].[Machine], are declared in the structure attribute. The input table is set to be available every hour, which means the table will be refreshed from the [dbo].[Machine] table every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "InputMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "ReferenceDBLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[dbo].[Machine]"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputStgMachine ADF table points to the [stg].[Machine] table in our target DW Azure SQL database. The output columns mapped to the input columns (the input columns were defined in the InputMachine table earlier) are also declared in the structure attribute. The output table is set to be available every hour, which means the Copy Activity that moves data from the source Machine table to the staging Machine table will be triggered and write its results every hour.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "Code" },
      { "name": "Name" },
      { "name": "Condition" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[Machine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The OutputDimMachine ADF table points to the [prod].[DimMachine] table in our target DW Azure SQL database.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputDimMachine",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[DimMachine]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

The LoadDimMachine pipeline contains two activities. The first activity, LoadStgMachine, is a Copy Activity which copies data from the reference machine table to the staging machine table. The data queried from the reference machine table is filtered by the SliceStart and SliceEnd datetimes so that only the machine records updated during that period are returned. A TabularTranslator is configured to map the columns from the reference machine table to the staging table.

The second activity is a SqlServerStoredProcedure activity which calls the stored procedure we created earlier to implement the type 2 SCD for DimMachine.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadDimMachine",
  "properties": {
    "description": "Load Machine table to staging table, and then call stored procedure to load to dimension table with SCD 2",
    "activities": [
      {
        "name": "LoadStgMachine",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "$$Text.Format('SELECT [Code],[Name],[Condition],[LastModified] FROM [dbo].[Machine] WHERE [LastModified] >= \\'{0:yyyy-MM-dd HH:mm}\\' AND [LastModified] < \\'{1:yyyy-MM-dd HH:mm}\\'', SliceStart, SliceEnd)"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[Machine]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[Machine]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "Code: Code, Name: Name, Condition: Condition, LastModified: LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },
      {
        "name": "LoadDimMachine",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachine"
          }
        ],
        "outputs": [
          {
            "name": "OutputDimMachine"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadDimMachine",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }

    ],
    "start": "2017-11-10T21:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have done our pipeline for DimMachine table loading. We can publish the pipeline to our Azure tenant:


The periodic execution of the pipeline will provide us with incremental loading of the type 2 SCD DimMachine table.


Next we can move on to create the LoadDimCustomer pipeline, which follows the same steps as described above for creating the LoadDimMachine pipeline.


In the next part of the blog series, I am going to cover the steps for building the ADLA U-SQL job for the incremental extraction of machine cycle data and also how to schedule and trigger the U-SQL job from ADF.

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 1)

This blog series demonstrates how to build an end-to-end ADF pipeline for extracting data from Azure SQL DB/Azure Data Lake Store and loading it to a star-schema data warehouse database with considerations for SCD (slowly changing dimensions) and incremental loading.

The final pipeline will look like this:


The machine cycle records will be loaded from the csv files stored in an Azure Data Lake Store, and the reference data for customers and machines will be loaded from the Reference DB (Azure SQL DB). The ADF pipeline will first load the data into the staging tables in the target DW, and will then execute SQL stored procedures to transform the data into the star schema and implement the type 2 SCD.

A number of services within the Azure Cortana Intelligence Suite will be employed to build the end-to-end pipeline, including Azure Data Factory, Azure Data Lake Store, Azure Data Lake Analytics (U-SQL), Azure SQL Database, and the development tools (Visual Studio + SSDT). Four Visual Studio projects will be created: the ReferenceDB project for provisioning the Azure SQL reference database, the DW project for provisioning the target data warehouse, the ADLA project for incrementally extracting the machine cycle data, and the ADF project for building the pipeline. You can download the source code here.

This blog post covers the first part of the series to give a brief introduction of the example used in this demonstration and provides the steps for preparing the required demo databases/data lake directory and the sample data.

Preparation

1. Create Visual Studio solution

Create a Visual Studio solution with the SSDT tool to host all the projects required in this demonstration, including the DB projects for ReferenceDB and the target DW, and the ADF project.

2. Create ReferenceDB project


Create ReferenceDB project and add the scripts for creating Customer table and Machine table.

Customer table:

CREATE TABLE [dbo].[Customer]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Machine table:

CREATE TABLE [dbo].[Machine]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Create a post deployment script to add the sample data:

INSERT INTO [dbo].[Customer]
           ([Code]
           ,[Name]
           ,[LastModified])
     VALUES
           ('C0001','Customer1', '2017-11-08 21:21:30'),
           ('C0002','Customer2', '2017-11-08 21:21:30')

INSERT INTO [dbo].[Machine]
           ([Code]
           ,[Name]
           ,[Condition]
           ,[LastModified])
     VALUES
           ('M0001','Machine1', 'Perfect', '2017-11-08 21:21:30'),
           ('M0002','Machine2', 'Good', '2017-11-08 21:21:30')

Publish the project to the Azure tenant to provision the Azure SQL DB.

3. Create the target DW project


Create the target DW project and add the scripts for schema, staging tables, dimension tables and fact tables:

Schema – prod:

CREATE SCHEMA [prod]

Schema – stg:

CREATE SCHEMA [stg]

Staging – Customer:

CREATE TABLE [stg].[Customer]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – Machine:

CREATE TABLE [stg].[Machine]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – MachineCycle:

CREATE TABLE [stg].[MachineCycle]
(
    [CycleId] int NOT NULL,
    [MachineName] varchar(50) NOT NULL,
    [CustomerName] varchar(50) NOT NULL,
    [StartDateTime] datetime2 NOT NULL,
    [EndDateTime] datetime2 NOT NULL,
        [LastModified] DATETIME2 NOT NULL
)

Dimension – DimCustomer

CREATE TABLE [prod].[DimCustomer]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL, 
    [Name] VARCHAR(50) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Dimension – DimMachine

CREATE TABLE [prod].[DimMachine]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Facts – FactMachineCycle

CREATE TABLE [prod].[FactMachineCycle]
(
    [CycleId] int NOT NULL PRIMARY KEY,
    [MachineId] int NOT NULL,
    [CustomerId] int NOT NULL,
    [DateKey] int NOT NULL,
    [Duration] float NOT NULL,
        [RowAdded] DATETIME2 NOT NULL
)

In addition to the tables above we also need a date dimension table, DimDate, and a script to generate it. There are plenty of scripts you can find online, e.g., https://www.codeproject.com/Articles/647950/Create-and-Populate-Date-Dimension-for-Data-Wareho. The generated DimDate table should look like:


Finally, we publish the project to Azure to get the DB deployed.


4. Create Azure Data Lake directory

Create a directory in Azure Data Lake, e.g., “ADFDW”, and organise the sub-directory hierarchy as year -> month -> day.


The sample csv data includes the columns of cycleID, CustomerCode, MachineCode, StartDateTime, EndDateTime, and EventTime.


Now that we have done our preparation work, the next blog post will cover the steps to build the ADF pipelines for the dimension tables’ ETL.

Handling Across-Day Cycle Issue in Daily Usage Analysis using U-SQL

When analysing the daily usage of a machine that can run across days, we need to split the time of a single machine running cycle into the right days. As the example below shows, the second machine (D00002) ran across two days and the third machine (D00003) ran across three days.


To analyse the daily usage of those machines we need to transform the data in the example above into the following structure.


As you can see, the record of the second machine, which ran across two days, has been split into two records, one for each day. Likewise, the record of the third machine, which ran across three days, has been split into three records. The UsageHours column stores the time the machine ran on that day. For example, the second machine started to run at 17:33:21 on 22nd of May and stopped at 14:29:22 on 23rd of May. The original record of the machine running cycle is split into two rows: the row for 22nd of May with UsageHours of 6.4 (between 17:33:21 and 23:59:59), and the row for 23rd of May with UsageHours of 14.5 (between 00:00:00 and 14:29:22).

In this blog post, I am going to introduce how to conduct this kind of data transformation using U-SQL with Azure Data Lake Analytics (ADLA) jobs. The snapshot below shows the U-SQL code.


Firstly, we need to split the machine running cycle into the days it spans. To achieve that we need to join the machine cycle table with a date dimension table (here is a way to generate a date dimension table using U-SQL, introduced by Paul Andrew).


The join operation can give us something like:


We use a CROSS JOIN with a WHERE clause to get the days within the span of the cycle StartDateTime and the cycle EndDateTime (as the U-SQL JOIN does not support non-equality comparisons at this moment, we cannot use an INNER JOIN with an ON clause).

After we split a single cycle into a segment for each day in the time span of the cycle, we can use the CASE statement to decide the type of each day segment (i.e. the day when the machine starts, the day when the machine stops, and the day(s) in the middle that run the full 24 hours) and calculate the time in each day segment.


In addition, we need to use another CASE statement to flag the first day of the machine running cycle as 1 and the other days as 0. This is the trick for aggregating the total count of machine running cycles within a period. After we split a single machine cycle into multiple records, the count of records in the machine cycle table no longer reflects the count of machine cycles. Therefore, we need to use the cycle flag in each record to aggregate the count of machine cycles.
