Category: Azure Cortana Intelligence Suite

Anomaly Detection with Azure Stream Analytics

Anomaly detection is a very common use case in IoT-related deployments. A new ANOMALYDETECTION operator has recently been added to Azure Stream Analytics and is currently in public preview.

The ANOMALYDETECTION operator detects anomalies based on Exchangeability Martingales (EM), which support online testing of whether a sequence of event values is exchangeable. When the distribution of the sequence of event values is invariant, the sequence is exchangeable; when the distribution changes, a potential anomaly has occurred.

This is the syntax of the ANOMALYDETECTION operator, which checks whether the current event value is anomalous against a sliding window whose duration is defined by the OVER clause.

ANOMALYDETECTION(<scalar_expression>) OVER ([PARTITION BY <partition key>] LIMIT DURATION(<unit>, <length>) [WHEN boolean_expression])

The ANOMALYDETECTION operator returns three scores (BiLevelChangeScore, SlowPosTrendScore, and SlowNegTrendScore) corresponding to the three types of anomalies:

  • Bidirectional level change
  • Slow positive trend
  • Slow negative trend

This blog post demonstrates the ANOMALYDETECTION operator with an example that detects anomalies in a stream of temperature sensor events. The sensor data will be generated by the Raspberry Pi Azure IoT Online Simulator and sent to the IoT hub.

An Azure Stream Analytics input will be created to consume the temperature data from the IoT hub, and a Power BI output will be created to output the temperature anomaly alerts.

A prerequisite for the ANOMALYDETECTION operator to work is that the input time series needs to be uniform. We can use a tumbling window to make the time series uniform by averaging the temperature within an n-second window.

To fill windows where no sensor data flows in, we can carry forward the value from the last window where sensor data is available.

We can then use the ANOMALYDETECTION operator to compute the anomaly scores within the time window of the last n minutes/hours and extract the BiLevelChangeScore, SlowPosTrendScore, and SlowNegTrendScore.

Finally, we can check the scores against the threshold set for alerting. The threshold range recommended by Microsoft is between 3.25 and 5.

The full code can be found below:
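
Below is a minimal sketch of what the full query could look like, assuming an IoT Hub input aliased as TelemetryInput and Power BI outputs aliased as PowerBIOutput and AlertOutput; the window sizes, field names and the 3.5 threshold are illustrative and not the exact code used in the original demo:

WITH UniformInput AS (
    -- Step 1: make the time series uniform by averaging the temperature
    -- over a 5-second tumbling window
    SELECT
        System.Timestamp AS WindowEnd,
        AVG(CAST(temperature AS float)) AS AvgTemperature
    FROM TelemetryInput TIMESTAMP BY EventEnqueuedUtcTime
    GROUP BY TumblingWindow(second, 5)
),
FilledInput AS (
    -- Step 2: fill windows with no sensor data by carrying forward the
    -- latest available averaged event (up to 5 minutes back)
    SELECT
        System.Timestamp AS WindowEnd,
        TopOne() OVER (ORDER BY WindowEnd DESC) AS LastEvent
    FROM UniformInput
    GROUP BY HoppingWindow(second, 300, 5)
),
AnomalyDetectionStep AS (
    -- Step 3: compute the anomaly scores over the last 30 minutes
    SELECT
        WindowEnd,
        LastEvent.AvgTemperature AS Temperature,
        ANOMALYDETECTION(LastEvent.AvgTemperature) OVER (LIMIT DURATION(minute, 30)) AS Scores
    FROM FilledInput
),
ScoredTemperature AS (
    -- Extract the individual scores from the record returned by ANOMALYDETECTION
    SELECT
        WindowEnd,
        Temperature,
        CAST(GetRecordPropertyValue(Scores, 'BiLevelChangeScore') AS float) AS BiLevelChangeScore,
        CAST(GetRecordPropertyValue(Scores, 'SlowPosTrendScore') AS float) AS SlowPosTrendScore,
        CAST(GetRecordPropertyValue(Scores, 'SlowNegTrendScore') AS float) AS SlowNegTrendScore
    FROM AnomalyDetectionStep
)
-- Send the scored temperature readings to the Power BI dashboard
SELECT WindowEnd, Temperature, BiLevelChangeScore, SlowPosTrendScore, SlowNegTrendScore
INTO PowerBIOutput
FROM ScoredTemperature

-- Raise an alert only when a score exceeds the chosen threshold
SELECT WindowEnd, Temperature, BiLevelChangeScore, SlowPosTrendScore, SlowNegTrendScore
INTO AlertOutput
FROM ScoredTemperature
WHERE BiLevelChangeScore > 3.5 OR SlowPosTrendScore > 3.5 OR SlowNegTrendScore > 3.5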

After starting the Stream Analytics job, the temperature measurement data with the anomaly scores will flow into Power BI.
We can create anomalies by changing the temperature value generated by the simulator.

SSIS in Azure #1 – Periodically Ingesting Data into Azure Data Lake from SQL Database using SSIS

The low cost and schema-less storage of Azure Data Lake Store, along with the large number of supported analytic engines (e.g., Azure Data Lake Analytics, Hive and Spark), makes it a perfect store-everything repository for enterprise data. We can offload copies of business data from various LOB data sources into Azure Data Lake Store for all sorts of batch analysis.

Microsoft provides Azure Data Factory, the cloud-based ETL service in the Azure Cortana Intelligence Suite, to support data ingestion into Azure Data Lake Store. However, many data engineers working with the Microsoft BI stack may prefer SSIS, the tool they are familiar with, which offers an easy-to-use visual editor and a rich collection of transformation components, over Azure Data Factory, where they have to author JSON files to define data source links, datasets, and pipelines (at least for Azure Data Factory V1; a visual editor for Azure Data Factory V2 is planned but not yet available).

Thanks to the Azure-SSIS integration runtime that is available for public preview in Azure Data Factory V2, we can now deploy and execute our SSIS packages in Azure, which provides an alternative option for cloud-based ETL.

This blog post introduces how to move data in the cloud using SSIS, with an example of a common use case that periodically ingests data from a SQL database into Azure Data Lake Store. There are two key requirements for this use case:

  • SSIS needs to be able to connect to the Azure SQL database and load the data into a csv file in a specified folder in Azure Data Lake Store
  • SSIS needs to be able to periodically and incrementally load the data from the Azure SQL database into a csv file for that period. The csv files need to be organised in a date hierarchy for optimised performance of Azure Data Lake Store.

For the first requirement, we need the SSIS Feature Pack for Azure, an SSIS extension that connects to Azure services and moves data between Azure data sources or between on-premises data sources and Azure data sources. For the second requirement, we need an SSIS trick for dynamically setting attributes on the data flow destination component. We will cover the details of fulfilling those two requirements in the rest of the blog post.

Firstly, we need to install the SSIS Feature Pack for Azure into Visual Studio (the right version of SSDT should already be installed in Visual Studio). We should be able to see the Azure connection components in the SSIS toolbox after the feature pack is installed.

Before starting to build the SSIS package, we need to create an Azure AD service principal as the service account for accessing the Azure Data Lake Store and assign the principal read/write permission on the folder in the Azure Data Lake Store where the output csv files will be stored.

We then create an SSIS project in SSDT and add a Data Flow Task.

Open the Data Flow tab and add an ADO.NET source that will connect to the Azure SQL database from which the data will be extracted. In this example, we will use the AdventureWorks sample database as the data source and transfer the sale orders data into Azure Data Lake Store. To extract the sale orders periodically, we first define two variables in the SSIS package, "StartDate" and "EndDate". In this example, we want to load the sale orders at a daily interval. The SSIS package is expected to run early every morning to load the data of the previous day. Therefore, the value of the StartDate variable will be: DATEADD("day", -1, ((DT_DATE)(DT_DBDATE)GETDATE())) and the value of EndDate will be: (DT_DATE)(DT_DBDATE)GETDATE().

Then we want to extract the sale order records with a LastModified datetime between the StartDate and the EndDate. In this example, we first create a stored procedure, uspGetSaleOrders, in the source SQL database that takes the StartDate and EndDate as parameters and returns the sale orders between those dates. In your environment, if you do not have permission to create stored procedures in your data sources, you can put the sql script into an SSIS variable instead.
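
As a rough sketch (the selected columns and the ModifiedDate filter column are illustrative and may differ in your AdventureWorks version), the stored procedure could look like this:

CREATE PROCEDURE [dbo].[uspGetSaleOrders]
    @StartDate datetime,
    @EndDate   datetime
AS
BEGIN
    SET NOCOUNT ON;

    -- Return the sale orders modified within the given period
    SELECT
        SalesOrderID,
        OrderDate,
        CustomerID,
        SubTotal,
        ModifiedDate
    FROM Sales.SalesOrderHeader
    WHERE ModifiedDate >= @StartDate
      AND ModifiedDate < @EndDate;
END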

We then move to the Control Flow tab, open the properties panel of the Data Flow Task, and open the Expressions editor.

In the Expressions editor, we add an expression to dynamically set the SqlCommand property of the SQL database source as: "EXEC [dbo].[uspGetSaleOrders] @StartDate = '" + (DT_WSTR, 50)@[User::StartDate] + "', @EndDate = '" + (DT_WSTR, 50)@[User::EndDate] + "'". This command executes the stored procedure we created earlier with the StartDate and EndDate variables passed in.

Now we have the data source set up, and we can move on to add and configure the Azure Data Lake Store Destination.

We add an Azure Data Lake Store Destination component to the Data Flow tab and connect a data flow path from the SQL database source to the destination. In the Azure Data Lake Store Destination Editor window, we need to create a connection manager to manage the connection (including the store location and the authentication) to the Azure Data Lake Store and specify the output file path and the format of the file. As we will output the file in csv format, we need to set the file format to Text and the column delimiter character to ",".

The interesting part is the File Path attribute. As we discussed earlier, we want to organise the files into a date hierarchy based on the modified date of the sale order records, so the file path will look like: "/{project folder}/{Year}/{Month}/{Day}/SaleOrders_{date string}.csv".

To dynamically set the file path of the Azure Data Lake Store Destination, we can add an expression on the parent Data Flow Task, as we did for the SqlCommand attribute of the SQL database source.

We define the expression for the file path as:

"/Samples/Demo/" + (DT_WSTR, 4)YEAR(@[User::EndDate]) + "/" + RIGHT("0" + (DT_WSTR, 2)MONTH(@[User::EndDate]), 2) + "/" + RIGHT("0" + (DT_WSTR, 2)DAY(@[User::EndDate]), 2) + "/SaleOrders_" + (DT_WSTR, 4)YEAR(@[User::EndDate]) + RIGHT("0" + (DT_WSTR, 2)MONTH(@[User::EndDate]), 2) + RIGHT("0" + (DT_WSTR, 2)DAY(@[User::EndDate]), 2) + ".csv"

Now we have the Azure Data Lake Store Destination set up and the data flow is ready to run. We can test the data flow in SSDT. As the sample AdventureWorks database does not contain sale order records for the period when this blog post was written, I manually set the StartDate and EndDate variables to a day when there are sale order records in the AdventureWorks database for testing purposes.

Now we can see the data flow works when running on our local machine through SSDT. The next blog post will provision the Azure-SSIS Integration Runtime and deploy and run the SSIS package in the cloud.

Azure Stream Analytics Patterns & Implementations

Thanks to the increased popularity of IoT and social networks, streaming analytics has become a hot topic and has attracted more and more attention in the data analytics community. Many people (e.g., this and this) believe streaming analytics is the future that will take over the use cases traditionally targeted by batch-oriented analytics.

Azure Stream Analytics is Microsoft's real-time analytics offering and one of the major services in the Azure Cortana Intelligence Suite. When designing data analytics solutions on the Azure platform, we need to know what role Azure Stream Analytics can play in our solutions and in which scenarios we can use it. Dr. Srinath Perera, an expert on CEP and streaming analytics, has summarised 13 patterns for streaming real-time analytics. Those patterns can be a very useful guide for us when making design decisions in our data analytics solutions.

In this blog post, I will discuss those patterns in the Azure Stream Analytics context, evaluate Azure Stream Analytics' strengths and weaknesses for those patterns, and explore how to implement them using Azure Stream Analytics coupled with the support of other Azure services (e.g., Event Hub, Azure Functions, and Azure Machine Learning).

Firstly, I am going to give a summary of Dr. Srinath Perera's 13 streaming real-time analytics patterns and then discuss the Azure Stream Analytics implementation for each pattern. In addition, I am going to add one more pattern, Edge Analytics, to the list, which is specific to Azure Stream Analytics.

Dr. Perera’s 13 stream analytics patterns

  • Pattern 1 – Preprocessing
  • Pattern 2 – Alerts and Thresholds
  • Pattern 3 – Simple Counting and Counting with Windows
  • Pattern 4 – Joining Event Streams
  • Pattern 5 – Data Correlation, Missing Events, and Erroneous Data
  • Pattern 6 – Interacting with Databases
  • Pattern 7 – Detecting Temporal Event Sequence Patterns
  • Pattern 8 – Tracking
  • Pattern 9 – Detecting Trends
  • Pattern 10 – Running the same Query in Batch and Realtime Pipelines
  • Pattern 11 – Detecting and switching to Detailed Analysis
  • Pattern 12 – Using a Model
  • Pattern 13 – Online Control
  • Pattern 14 (additional) – Edge Analytics

Pattern 1 – Preprocessing

One basic and common task for streaming analytics is data preprocessing that filters, reshapes, splits/combines and transforms incoming raw data into a format suitable for further processing and analysis.

Azure Stream Analytics provides good support for data preprocessing tasks. The Stream Analytics Query Language is a SQL-like language using a subset of T-SQL syntax. Developers with T-SQL skills can easily create scripts for common data preprocessing tasks in Azure Stream Analytics with the SQL knowledge they already have, preprocessing streaming data in much the same way as they preprocess batch-oriented data.

*This snapshot is from Microsoft
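
As an illustration, a preprocessing query might filter malformed readings, cast types and reshape the payload like this (a sketch; the input/output aliases and field names are illustrative):

-- Filter out malformed readings, cast types and reshape the payload
SELECT
    DeviceId,
    CAST(Temperature AS float) AS Temperature,
    CAST(Humidity AS float) AS Humidity,
    System.Timestamp AS EventTime
INTO PreprocessedOutput
FROM TelemetryInput TIMESTAMP BY EventEnqueuedUtcTime
WHERE Temperature IS NOT NULL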

Pattern 2 – Alerts and Thresholds

This is a very common streaming analytics pattern, especially in many industrial IoT use cases. In this pattern, the streaming analytics program detects an abnormal condition based on a pre-defined threshold and generates alerts based on that condition.

Anomaly detection using “WHERE” clause

We can use the “WHERE” clause of the Stream Analytics Query Language in Azure Stream Analytics to detect the abnormal condition and then output the qualifying events to an “Alert” output, e.g.:

SELECT  DeviceID, Temperature, "Over Temperature"  AS ErrorStatus
INTO AlertOutput
FROM TelemetryInput
WHERE Temperature >100

Anomaly detection using “ANOMALYDETECTION” Operator

The machine learning-based “ANOMALYDETECTION” operator is a new feature recently added to Azure Stream Analytics and is currently in preview. This operator takes advantage of a machine learning algorithm to detect events or observations that do not conform to the expected patterns.

The “ANOMALYDETECTION” operator is very easy to use, similar to the way the LAG operator is used.

You can find more details about the “ANOMALYDETECTION” operator here.

Handling Alerts

When an abnormal condition is detected and output to the AlertOutput stream, we can handle the alert output in a number of ways on the Azure platform:

  1. Output the alerts to a live dashboard
  2. Send alert notifications
  3. Automatically handle the alert by adjusting the equipment settings

Azure Stream Analytics supports outputting a stream to a real-time Power BI dashboard. With this feature we can show the real-time alerts on a Power BI dashboard monitored by the maintenance engineers.

The alerts can also be sent to the maintenance engineers in push mode. Thanks to the recently added Azure Functions output target in Azure Stream Analytics, it is much easier for developers to send out alerts through email or notifications without the need to first output the stream to Service Bus queues and then access Azure Functions from there. Developers can now directly egress the alert stream to Azure Functions, where they can implement the logic for alert delivery.

When combined with Azure IoT Hub, we can also make the monitored equipment automatically adjust its settings based on the alerts. For example, Microsoft has created a real-time data processing solution for KingwayTek that takes advantage of Azure Stream Analytics, Azure Functions and Azure IoT Hub to proactively raise an alert on the vehicle status, and the alert will trigger vehicle reconfiguration.

*This snapshot is from Microsoft

Pattern 3 – Simple Counting and Counting with Windows

In this pattern, the raw, atomic stream events are aggregated in a time window to reveal potential patterns and behaviours. For example, the raw message of a single website visit event may not provide much meaningful insight, but the average view counts per hour or per day can reveal the pattern of the website visits, e.g., the website has more visits in the evening than in the morning.

To implement this pattern, the streaming analytics service needs to support two types of functions: aggregation and time windows. Azure Stream Analytics provides good support for both.

The Stream Analytics Query Language provides a list of built-in aggregate functions that can cover most common aggregation requirements.

In addition, Azure Stream Analytics supports user-defined aggregates (UDA) written in JavaScript, which gives developers extra power and flexibility to implement complicated aggregation rules.

Azure Stream Analytics also provides good support for time windows. Three window functions are supported: tumbling window, hopping window and sliding window.

The tumbling window function, TumblingWindow, segments a data stream into repeated, non-overlapping, and distinct time windows.

*This image is from Microsoft
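
For example, a sketch of simple counting with a tumbling window (the input alias, field names and window size are illustrative):

-- Count website visit events per hour
SELECT
    System.Timestamp AS WindowEnd,
    COUNT(*) AS VisitCount
FROM VisitInput TIMESTAMP BY VisitTime
GROUP BY TumblingWindow(hour, 1)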

The hopping window function, HoppingWindow, generates time windows that hop forward in time by a fixed period. Compared to tumbling windows, hopping windows can overlap with each other, so the same events may fall into more than one hopping window.

*This image is from Microsoft

The sliding window function, SlidingWindow, generates a time window when an event occurs. The time window ends at the time the event happens, and the start of the window is defined by the duration parameter specified in the SlidingWindow function.

*This image is from Microsoft

Pattern 4 – Joining Event Streams

This pattern is used for the scenarios where multiple data streams need to be processed to create a new event stream. For example, we may have multiple sensors that collect data for different aspects of an object or event.

Azure Stream Analytics supports multiple inputs from a variety of stream data sources.

*This image is from Microsoft

After the inputs are defined in Azure Stream Analytics, you can reference them by name in the Stream Analytics Query Language.
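
For example, two sensor streams reporting on the same machine can be joined on the machine id within a time bound (a sketch; the input aliases and field names are illustrative):

-- Join temperature and vibration readings for the same machine
-- that arrive within 5 seconds of each other
SELECT
    T.MachineId,
    T.Temperature,
    V.Vibration,
    System.Timestamp AS EventTime
FROM TemperatureInput T TIMESTAMP BY EventTime
JOIN VibrationInput V TIMESTAMP BY EventTime
    ON T.MachineId = V.MachineId
    AND DATEDIFF(second, T, V) BETWEEN 0 AND 5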

Pattern 5 – Data Correlation, Missing Events, and Erroneous Data

This pattern correlates data from different streams or within the same stream. Dr. Perera has given some use cases for this pattern in his article, such as matching up two data streams that send events at different speeds, detecting a customer request that has not been responded to within one hour, and detecting failed sensors by comparing a set of sensors that monitor overlapping regions.

In Azure Stream Analytics we can take advantage of the T-SQL syntax of the Stream Analytics Query Language to implement this pattern. For example, we can use the JOIN clause to join different streams on the id of the monitored object (e.g., the id of a machine on which different sensors are installed) and use the operators provided by T-SQL to find the correlation.
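
For example, a customer request with no matching response within one hour can be detected with a LEFT JOIN over a time bound (a sketch; the input aliases and field names are illustrative):

-- Detect requests that have not been responded to within one hour
SELECT
    R.RequestId,
    R.CustomerId,
    System.Timestamp AS DetectedAt
FROM RequestInput R TIMESTAMP BY RequestTime
LEFT OUTER JOIN ResponseInput P TIMESTAMP BY ResponseTime
    ON R.RequestId = P.RequestId
    AND DATEDIFF(hour, R, P) BETWEEN 0 AND 1
WHERE P.RequestId IS NULL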

Pattern 6 – Interacting with Databases

In many use cases the streaming data alone is not enough for us to dig out meaningful insight for the business. The data from the streaming source only becomes useful when combined with historical, business-oriented data. The streaming analytics service needs to be able to fetch data from other business databases and combine it with the streaming data. For example, we need to check blacklists when processing a real-time service request.

Azure Stream Analytics does provide support for reference data joins in the Stream Analytics Query Language. To use this feature, we need to create a Reference type input that fetches the reference data from Azure Blob storage.
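
For example, a sketch of checking incoming service requests against a blacklist reference input (the aliases and field names are illustrative):

-- Flag service requests coming from blacklisted customers
SELECT
    S.RequestId,
    S.CustomerId,
    B.Reason AS BlacklistReason
FROM RequestInput S TIMESTAMP BY RequestTime
JOIN BlacklistReference B    -- Reference type input backed by Azure Blob storage
    ON S.CustomerId = B.CustomerId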

Up to this point, only Azure Blob storage is supported as the reference data source for Azure Stream Analytics. We need to use Azure Data Factory to move the reference data from where it is originally stored into an Azure Blob storage instance. The reference data is modelled as a sequence of blobs in ascending order of the datetime specified in the blob name.

As most reference data is slowly changing, streaming analytics solutions also need to ensure that the reference data they combine with the streaming data is up to date. Azure Stream Analytics does provide an approach to support slowly changing reference data, but it has some limitations.

Firstly, a reference data blob stored in Azure Blob storage cannot be updated, as that would cause the Stream Analytics jobs to fail. Therefore, we can only add a new blob to store the updated reference data, using the same container and path pattern defined in the job input, with a date/time greater than the one specified by the last blob in the sequence. Secondly, the old reference data blobs must not be altered or removed.

Pattern 7 – Detecting Temporal Event Sequence Patterns

In this pattern, streaming analytics is used to detect temporal event sequence patterns. For example, a machine may fail to work after showing a sequence of statuses in a certain order. The streaming analytics solution needs to be able to detect the sequence pattern so that an alert can be sent to engineers when the pattern occurs.

In the example provided by Dr. Perera, he used Storm and Siddhi (a CEP engine) to detect the temporal event sequence patterns. We can use the Stream Analytics Query Language in Azure Stream Analytics to implement the example. However, I think a better solution that can cope with more complicated use cases is to use a machine learning algorithm to detect the pattern and make the prediction. Azure Stream Analytics provides good support for Azure Machine Learning. I will provide more details about the Azure Stream Analytics and Azure Machine Learning integration when discussing Pattern 12.

Pattern 8 – Tracking

This pattern refers to streaming analytics use cases that track something over space and time under one or more given conditions. Those use cases are often combined with IoT use cases that monitor the real-time status or movement of something, for example, tracking the movement of missing airline luggage.

Azure Stream Analytics comes with real-time geospatial analytics capability that provides native functions for geospatial operations, such as computing geospatial data as points, lines and polygons, and also supports joining multiple geospatial data streams to solve more complicated use cases.
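
For example, a sketch of checking whether a tracked asset reports a position inside a geofence (the input alias, field names and polygon coordinates are illustrative):

-- Detect assets reporting a position inside the geofence polygon
SELECT
    A.AssetId,
    A.Latitude,
    A.Longitude,
    System.Timestamp AS DetectedAt
FROM AssetPositionInput A TIMESTAMP BY EventTime
WHERE ST_WITHIN(
          CreatePoint(A.Latitude, A.Longitude),
          CreatePolygon(
              CreatePoint(50.0, -1.0),
              CreatePoint(50.0, -2.0),
              CreatePoint(51.0, -2.0),
              CreatePoint(50.0, -1.0))) = 1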

Pattern 9 – Detecting Trends

This pattern detects trends over time series data, e.g., usage increases and drops, peaks, outliers, etc. As with Pattern 8, this pattern is often used in IoT use cases.

In Azure Stream Analytics, for simple use cases, we can use the Stream Analytics Query Language to query the peak (MAX) value, outliers (ANOMALYDETECTION), and the start and end values in a time window for computing trends. When combined with a Power BI dashboard, we can provide time series based charts to visualise the trends.
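
For example, a sketch that computes per-hour peaks and the change from the previous hour using LAG (the input alias, field names and window sizes are illustrative):

WITH HourlyTemperature AS (
    -- Aggregate the raw readings into hourly peak and average values
    SELECT
        System.Timestamp AS WindowEnd,
        MAX(Temperature) AS PeakTemperature,
        AVG(Temperature) AS AvgTemperature
    FROM TelemetryInput TIMESTAMP BY EventTime
    GROUP BY TumblingWindow(hour, 1)
)
-- Compare each hour against the previous hour to expose the trend direction
SELECT
    WindowEnd,
    PeakTemperature,
    AvgTemperature,
    AvgTemperature - LAG(AvgTemperature) OVER (LIMIT DURATION(hour, 3)) AS ChangeFromPreviousHour
FROM HourlyTemperature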

For more complicated use cases, we may need functions outside of the Stream Analytics Query Language (e.g., is_monotonic_decreasing / is_monotonic_increasing in Python) or a time-series analysis model (e.g., ARIMA) for forecasting use cases. At the moment, Azure Stream Analytics does not support Python or R. However, we can take a workaround that implements the algorithm in Azure Machine Learning Studio with Python or R scripts, publishes it as a REST API and then integrates it with Azure Stream Analytics.

Pattern 10 – Running the same Query in Batch and Realtime Pipelines

I find the title of this pattern, "Running the same Query in Batch and Realtime Pipelines", a bit confusing, but from Dr. Perera's explanation, this pattern refers to the Lambda Architecture, which is the most popular data analytics architecture used in IoT use cases at this moment.

The Lambda Architecture separates IoT data analytics into two paths: the hot path (also known as the speed layer) and the cold path (the batch layer). The hot path refers to the stream data processing path and the cold path refers to the batch-oriented data processing path. The Microsoft Azure Cortana Intelligence Suite provides good support for the Lambda Architecture. More details can be found here.

*This snapshot is from Microsoft

Pattern 11 – Detecting and switching to Detailed Analysis

This pattern is used for the use cases where an anomaly or behaviour is identified by the streaming analytics and further detailed analysis is required against the historical data. I think this pattern can be viewed as a sub-pattern of Pattern 10.

This pattern can be supported on the Azure platform using the Lambda Architecture introduced above.

Pattern 12 – Using a Model

This pattern refers to using machine learning models in stream analytics. I have mentioned some use cases in the previous patterns where a machine learning model needs to be used.

Azure Stream Analytics provides an Azure ML function type to support the integration with Azure Machine Learning.

The machine learning developers can implement the model using Azure Machine Learning Studio and publish it as a REST API. An Azure Stream Analytics job can then call the API using the Azure ML function.

*This snapshot is from Microsoft
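
For example, once an Azure ML function has been added to the job (here assumed to be registered under the alias scoreMachineFailure), it can be called like a built-in function (a sketch; the function alias, its inputs and the name of the returned score column are illustrative):

-- Call the Azure ML web service function registered as scoreMachineFailure
WITH Scored AS (
    SELECT
        MachineId,
        Temperature,
        Vibration,
        scoreMachineFailure(Temperature, Vibration) AS Result
    FROM TelemetryInput TIMESTAMP BY EventTime
)
SELECT
    MachineId,
    Temperature,
    Vibration,
    Result.[Scored Probabilities] AS FailureProbability
INTO AlertOutput
FROM Scored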

Pattern 13 – Online Control

This pattern refers to AI-related use cases such as autopilot, self-driving and robotics. Dr. Perera does not provide many details about this pattern in his article and presentation slides. I think Azure Stream Analytics is not designed for this type of application.

Pattern 14 (additional) – Edge Analytics

I have added this pattern to Dr. Perera's list as edge computing has become more and more important in IoT use cases, and Azure Stream Analytics along with Azure Machine Learning are the main components in Microsoft's edge computing offering.

With Azure Stream Analytics on IoT Edge, the real-time analytics intelligence can be deployed close to IoT devices to achieve low latency, resiliency, efficient use of bandwidth and compliance.

*This snapshot is from Microsoft

 

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 4)

This is the last part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT.

In the previous part we created the U-SQL job that incrementally extracts the machine cycle rows and stores them in a staging area in the ADLS. In this part of the blog series we will create the LoadFactMachineCycle pipeline, which loads the machine cycle rows from the staging area in the ADLS to the [stg].[MachineCycle] staging table in the target DW database and then calls a stored procedure that looks up the reference ids of the dimension tables (DimMachine, DimCustomer, DimDate), creates the measures and loads the transformed machine cycle data into the fact table, FactMachineCycle.

We start by creating this stored procedure, [stg].[uspLoadFactMachineCycle], for loading the machine cycle fact table. This stored procedure joins the [stg].[MachineCycle] table and the dimension tables to get the ids of those tables and calculates the machine cycle duration measure from the cycle StartDateTime and EndDateTime columns.

 

CREATE PROC [stg].[uspLoadFactMachineCycle] AS
BEGIN
   
   INSERT INTO [prod].[FactMachineCycle]
       SELECT S.[CycleId]
          ,CASE 
            WHEN DM.Id IS NULL THEN 0
            ELSE DM.Id
           END AS MachineId
          ,CASE 
            WHEN DC.Id IS NULL THEN 0
            ELSE DC.Id
           END as CustomerId
          ,Year(S.[StartDateTime])*10000+Month(S.[StartDateTime])*100+Day(S.[StartDateTime]) AS DateKey
          ,DATEDIFF(s, S.[StartDateTime], S.[EndDateTime]) AS Duration
          ,GETDATE() AS RowAdded
      FROM [stg].[MachineCycle] S
      LEFT JOIN [prod].[FactMachineCycle] F ON S.CycleId = F.CycleId
      LEFT JOIN [prod].[DimCustomer] DC ON S.CustomerName = DC.Code AND DC.CurrentRow=1 
      LEFT JOIN [prod].[DimMachine] DM ON S.MachineName = DM.Code AND DM.CurrentRow=1
      WHERE F.CycleId IS NULL

END

We then move on to create the ADF pipeline for loading the machine cycle fact table. The following ADF files will be created (highlighted in yellow).

The snapshot below shows the pipeline diagram:

Firstly we need to create a linked service (AzureDataLakeStoreLinkedService) that points to the ADLS where the staging machine cycle csv files are stored.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
  "name": "AzureDataLakeStoreLinkedService",
  "properties": {
    "type": "AzureDataLakeStore",
    "typeProperties": {
      "dataLakeStoreUri": "adl://{store account name}.azuredatalakestore.net",
      "servicePrincipalId": "{service principal id}",
      "servicePrincipalKey": "{service principal key}",
      "tenant": "{tenant},
      "subscriptionId": "{subscription id}",
      "resourceGroupName": "{resource group name}"
    }
  }
}

Next, we need to create the input and output ADF tables: InputMachineCycle, OutputStgMachineCycle and OutputFactMachineCycle. The InputMachineCycle ADF table points to the staging machine cycle csv file, stgMachineCycles.csv, in the ADLS. The columns in the csv file need to be explicitly declared in the structure property. It is important to specify the type of the non-string columns, e.g., CycleId (Int32) and StartDateTime (Datetime), so that those columns can be converted from the string type in the csv file to the correct type in the DW database.

 

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
    "name": "InputMachineCycle",
  "properties": {
    "type": "AzureDataLakeStore",
    "linkedServiceName": "AzureDataLakeStoreLinkedService",
    "structure": [
      { "name": "CycleId","type": "Int32"},
      { "name": "CustomerCode"},
      { "name": "MachineCode"},
      { "name": "StartDateTime", "type": "Datetime" },
      { "name": "EndDateTime", "type": "Datetime" },
      { "name": "EventTime", "type": "Datetime" }
    ],
    "typeProperties": {
      "folderPath": "IoT/Staging",
      "fileName": "stgMachineCycles.csv",
      "partitionedBy": []
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
The OutputStgMachineCycle ADF table points to the [stg].[MachineCycle] staging table in the target DW database.
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputStgMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [
      { "name": "CycleId" },
      { "name": "CustomerName" },
      { "name": "MachineName" },
      { "name": "StartDateTime" },
      { "name": "EndDateTime" },
      { "name": "LastModified" }
    ],
    "typeProperties": {
      "tableName": "[stg].[MachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
 The OutputFactMachineCycle ADF table points to the FactMachineCycle table in the target DW database.
{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Table.json",
  "name": "OutputFactMachineCycle",
  "properties": {
    "type": "AzureSqlTable",
    "linkedServiceName": "DWLinkedService",
    "structure": [

    ],
    "typeProperties": {
      "tableName": "[prod].[FactMachineCycle]"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

After the input and output ADF tables have been created, we can start to build the LoadFactMachineCycle pipeline, which contains two activities. The first activity, LoadStgMachineCycle, copies machine cycle data from the staging csv file stored in the ADLS to the staging machine cycle table, [stg].[MachineCycle], in the target DB. The TabularTranslator is configured to map the columns from the csv file to the staging database table. The second activity in the pipeline is a SqlServerStoredProcedure activity that calls the stored procedure we created earlier to load the machine cycle data from [stg].[MachineCycle] to the target [prod].[FactMachineCycle] table.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "LoadFactMachineCycle",
  "properties": {
    "description": "Load machine cycles from azure data lake store to staging table, and then call stored procedure to load the fact table",
    "activities": [
      {
        "name": "LoadStgMachineCycle",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "AzureDataLakeStoreSource"
          },
          "sink": {
            "type": "SqlSink",
            "SqlWriterTableType": "[stg].[MachineCycle]",
            "sqlWriterCleanupScript": "TRUNCATE TABLE [stg].[MachineCycle]"
          },
          "translator": {
            "type": "TabularTranslator",
            "ColumnMappings": "CycleId: CycleId, CustomerCode: CustomerName, MachineCode: MachineName, StartDateTime: StartDateTime, EndDateTime:EndDateTime, EventTime:LastModified"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      },

      {
        "name": "LoadFactMachineCycle",
        "type": "SqlServerStoredProcedure",
        "inputs": [
          {
            "name": "OutputStgMachineCycle"
          }
        ],
        "outputs": [
          {
            "name": "OutputFactMachineCycle"
          }
        ],
        "typeProperties": {
          "storedProcedureName": "stg.uspLoadFactMachineCycle",
          "storedProcedureParameters": {}
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

The snapshot below shows the [prod].[FactMachineCycle] table after the machine cycle data has been loaded by the ADF pipeline.

Up to this point, we have completed the end-to-end ADF pipeline that extracts data from Azure SQL DB and ADLS and loads it into type 2 SCD dimension tables and a fact table in an incremental loading mode.

The snapshot below shows all the visual studio files we have created for building the pipeline.

End-to-End Azure Data Factory (V1) Pipeline for Star Schema ETL (Part 3)

This is the third part of the blog series demonstrating how to build an end-to-end ADF pipeline for data warehouse ELT. This part will describe how to build an ADLA U-SQL job for the incremental extraction of machine cycle data from Azure Data Lake Store and go through the steps for scheduling and triggering the U-SQL job using ADF.

In the first part of the blog series we created an Azure Data Lake Store directory to store the machine cycle data in csv format.

We are going to create a U-SQL job to query the machine cycle records based on the SliceStart datetime and the SliceEnd datetime. The ADF pipeline will pass the start time and the end time of the current ADF ingestion slice into the U-SQL job, and the U-SQL job will only return the machine cycles that ended within this period. This example focuses on the same-day machine cycle scenario and does not cover the across-day machine cycle scenario. There is another blog post I have written specifically for the across-day scenario here.

Instead of creating a U-SQL script file to host the code of the U-SQL job, we prefer to create an ADLA catalog stored procedure to simplify script organisation and deployment. Two parameters are defined in the stored procedure that take the SliceStart datetime and SliceEnd datetime from ADF. The year, month and day parts will be extracted from the SliceStart datetime to build the folder path that points to the location where the machine cycle csv files are stored.

The U-SQL script will then extract the rows from the csv files and SELECT the rows within the current ADF execution slice. The selected rows will be output into a staging area in the Azure Data Lake store.

CREATE DATABASE IF NOT EXISTS ADFDW;

DROP PROCEDURE IF EXISTS ADFDW.dbo.uspGetMachineCycles;
CREATE PROCEDURE IF NOT EXISTS ADFDW.dbo.uspGetMachineCycles(@sliceStart DateTime, @sliceEnd DateTime)
AS
BEGIN

    DECLARE @sliceYear string = @sliceStart.Year.ToString();
    DECLARE @sliceMonth string = @sliceStart.Month.ToString().PadLeft(2, '0');
    DECLARE @sliceDay string = @sliceStart.Day.ToString().PadLeft(2, '0');

    DECLARE @InputPath = @"/IoT/Curated%20Zone/Demo/ADFDW/"
                         + @sliceYear + "/" + @sliceMonth + "/" + @sliceDay + "/{*}.csv";

    DECLARE @OutputFile string = "/IoT/Staging/stgMachineCycles.csv";

    @machineCycles =
        EXTRACT CycleId int,
                CustomerCode string,
                MachineCode string,
                StartDateTime DateTime,
                EndDateTime DateTime,
                EventTime DateTime
        FROM @InputPath
        USING Extractors.Csv(skipFirstNRows : 1);

    @result =
        SELECT *
        FROM @machineCycles
        WHERE EndDateTime >= @sliceStart AND EndDateTime < @sliceEnd;


    OUTPUT @result
    TO @OutputFile
    USING Outputters.Csv();

END;

After we have created the U-SQL script, we need to create the ADF pipeline to schedule and trigger the U-SQL job. As the machine cycle fact table needs to look up the machine and customer dimension tables, we need to schedule the execution of the U-SQL job immediately after the dimension tables are loaded:

The rest of this blog post will go through the steps to build the IncrementalExtractMachineCycle ADF pipeline for triggering the U-SQL job.

Firstly, we need to create an AAD service principal for ADF and ADLA to use to access the csv files stored in the ADLS. You can find the detailed steps to create a service principal and to assign ADLS access in the Microsoft official documents (don't forget to turn on the option allowing service access in the ADLS firewall settings).

With the service principal we can create the AzureDataLakeAnalyticsLinkedService file in ADF that configures the link to the ADLA service where the U-SQL job will be executed.

{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.LinkedService.json",
    "name": "AzureDataLakeAnalyticsLinkedService",
    "properties": {
        "type": "AzureDataLakeAnalytics",
        "typeProperties": {
          "accountName": "ninjago0786demp",
          "subscriptionId": "{subscription id}",
          "resourceGroupName": "{resource group}",
          "dataLakeAnalyticsUri": "azuredatalakeanalytics.net",
          "servicePrincipalId": "{serivce principal id}",
          "servicePrincipalKey": "{service principal key}",
          "tenant": "tenant"
      }
    }
}

We can then create the IncrementalExtractMachineCycle pipeline linked to the AzureDataLakeAnalyticsLinkedService and create a DataLakeAnalyticsU-SQL type activity to trigger the U-SQL job. In the activity, we set the "script" property to the command that calls the ADFDW.dbo.uspGetMachineCycles procedure we created earlier, passing SliceStart and SliceEnd as parameters.

As the U-SQL job is expected to run after both the DimMachine and DimCustomer tables are loaded, we set both the OutputDimCustomer and OutputDimMachine ADF tables (the outputs of the LoadDimCustomer pipeline and the LoadDimMachine pipeline) as the inputs of the DataLakeAnalyticsU-SQL activity. For the output of the activity, we set it to the InputMachineCycle ADF table, which is the input for the pipeline that loads the machine cycle data from the ADLS staging area to the target data warehouse. The implementation of the InputMachineCycle ADF table will be covered in the next part of the blog series.

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
  "name": "IncrementalExtractMachineCycle",
  "properties": {
    "description": "Extract machine cycles within current time slice",
    "activities": [
      {
        "name": "ExtractMachineCycle",
        "linkedServiceName": "AzureDataLakeAnalyticsLinkedService",
        "type": "DataLakeAnalyticsU-SQL",
        "typeProperties": {
          "script": "ADFDW.dbo.uspGetMachineCycles(System.DateTime.Parse(@sliceStart), System.DateTime.Parse(@sliceEnd));",
          "parameters": {
            "sliceStart": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceStart)",
            "sliceEnd": "$$Text.Format('{0:yyyy-MM-ddTHH:mm:ssZ}', SliceEnd)"
          }
        },
        "inputs": [
          {
            "name": "OutputDimCustomer"
          },
          {
            "name": "OutputDimMachine"
          }
        ],
        "outputs": [
          {
            "name": "InputMachineCycle"
          }
        ],
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        }
      }
    ],
    "start": "2017-11-10T20:00:00",
    "end": "2017-11-13T01:00:00"
  }
}

Now we have the U-SQL job created and scheduled; it will extract the machine cycle rows within the current ADF execution slice and store them in the staging area in the ADLS. The next part of the blog series will create the FactMachineCycle loading pipeline, which loads the machine cycle rows in csv format from the ADLS to the staging table in the target DW and then calls a stored procedure to transform and load the data into the machine cycle fact table.