Tag: U-SQL

End-to-End Azure Data Factory Pipeline for Star Schema ETL (Part 1)

This blog series demonstrates how to build an end-to-end ADF pipeline that extracts data from Azure SQL DB and Azure Data Lake Store and loads it into a star-schema data warehouse database, taking into account slowly changing dimensions (SCD) and incremental loading.

The final pipeline will look like this:

[Figure: the completed ADF pipeline]

The machine cycle records will be loaded from csv files stored in an Azure Data Lake Store, and the reference data for customers and machines will be loaded from the reference database (Azure SQL DB). The ADF pipeline will first load the data into staging tables in the target DW, and will then execute SQL stored procedures to transform the data into the star schema and implement the type 2 SCD.

A number of services within the Azure Cortana Intelligence Suite will be employed to build the end-to-end pipeline, including Azure Data Factory, Azure Data Lake Store, Azure Data Lake Analytics (U-SQL), Azure SQL Database, and the development tools (Visual Studio + SSDT). Four Visual Studio projects will be created: the ReferenceDB project for provisioning the Azure SQL reference database, the DW project for provisioning the target data warehouse, the ADLA project for incrementally extracting the machine cycle data, and the ADF project for building the pipeline. You can download the source code here.

This blog post covers the first part of the series: it gives a brief introduction to the example used in this demonstration and walks through the steps for preparing the required demo databases, data lake directory and sample data.

Preparation

1. Create Visual Studio solution

Create a Visual Studio solution with SSDT to host all the projects required in this demonstration, including the database projects for the ReferenceDB and the target DW, and the ADF project.

2. Create ReferenceDB project

[Figure: the ReferenceDB project in Visual Studio]

Create the ReferenceDB project and add the scripts for creating the Customer and Machine tables.

Customer table:

CREATE TABLE [dbo].[Customer]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Machine table:

CREATE TABLE [dbo].[Machine]
(
    [Id] INT NOT NULL PRIMARY KEY IDENTITY, 
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Create a post-deployment script to add the sample data:

INSERT INTO [dbo].[Customer]
           ([Code]
           ,[Name]
           ,[LastModified])
     VALUES
           ('C0001','Customer1', '2017-11-08 21:21:30'),
           ('C0002','Customer2', '2017-11-08 21:21:30')

INSERT INTO [dbo].[Machine]
           ([Code]
           ,[Name]
           ,[Condition]
           ,[LastModified])
     VALUES
           ('M0001','Machine1', 'Perfect', '2017-11-08 21:21:30'),
           ('M0002','Machine2', 'Good', '2017-11-08 21:21:30')

Publish the project to the Azure tenant to provision the Azure SQL DB.

3. Create the target DW project

[Figure: the target DW project in Visual Studio]

Create the target DW project and add the scripts for the schemas, staging tables, dimension tables and fact table:

Schema – prod:

CREATE SCHEMA [prod]

Schema – stg:

CREATE SCHEMA [stg]

Staging – Customer:

CREATE TABLE [stg].[Customer]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – Machine:

CREATE TABLE [stg].[Machine]
(
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [LastModified] DATETIME2 NOT NULL
)

Staging – MachineCycle:

CREATE TABLE [stg].[MachineCycle]
(
    [CycleId] int NOT NULL,
    [MachineName] varchar(50) NOT NULL,
    [CustomerName] varchar(50) NOT NULL,
    [StartDateTime] datetime2 NOT NULL,
    [EndDateTime] datetime2 NOT NULL,
    [LastModified] datetime2 NOT NULL
)

Dimension – DimCustomer:

CREATE TABLE [prod].[DimCustomer]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL, 
    [Name] VARCHAR(50) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Dimension – DimMachine:

CREATE TABLE [prod].[DimMachine]
(
    [Id] int NOT NULL PRIMARY KEY IDENTITY,
    [Code] VARCHAR(10) NOT NULL,
    [Name] VARCHAR(50) NOT NULL, 
    [Condition] VARCHAR(10) NOT NULL, 
    [CurrentRow] bit NOT NULL,
    [ValidTo] DATETIME2 NULL,
    [LastModified] DATETIME2 NOT NULL
)

Facts – FactMachineCycle:

CREATE TABLE [prod].[FactMachineCycle]
(
    [CycleId] int NOT NULL PRIMARY KEY,
    [MachineId] int NOT NULL,
    [CustomerId] int NOT NULL,
    [DateKey] int NOT NULL,
    [Duration] float NOT NULL,
    [RowAdded] DATETIME2 NOT NULL
)

In addition to the tables above, we also need a Date dimension table and a script to generate and populate it. There are plenty of scripts available online, e.g., https://www.codeproject.com/Articles/647950/Create-and-Populate-Date-Dimension-for-Data-Wareho. The generated DimDate table should look like this:

[Figure: sample rows of the generated DimDate table]

Finally, we publish the project to Azure to get the database deployed.

[Figure: the deployed DW database in Azure]

4. Create Azure Data Lake directory

Create a directory in Azure Data Lake Store, e.g., “ADFDW”, and organise the sub-directories into a year -> month -> day hierarchy (e.g., /ADFDW/2017/11/08/).

[Figure: the ADFDW directory hierarchy in Azure Data Lake Store]

The sample csv data includes the columns cycleID, CustomerCode, MachineCode, StartDateTime, EndDateTime, and EventTime.

[Figure: the sample csv machine cycle data]
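
Since the snapshot above is an image, here are a couple of hypothetical rows in that format (reusing the customer and machine codes from the reference data) purely for illustration:

cycleID,CustomerCode,MachineCode,StartDateTime,EndDateTime,EventTime
1,C0001,M0001,2017-11-08 09:12:01,2017-11-08 17:35:44,2017-11-08 17:35:50
2,C0002,M0002,2017-11-08 10:02:13,2017-11-09 02:15:30,2017-11-09 02:15:36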

Now that the preparation work is done, the next blog post will cover the steps to build the ADF pipelines for the dimension tables ETL.

Handling Across-Day Cycle Issue in Daily Usage Analysis using U-SQL

When analysing the daily usage of a machine that can run across days, we need to split the time of a single machine running cycle into the right days. In the example below, the second machine (D00002) ran across two days and the third machine (D00003) ran across three days.

[Figure: sample machine running cycles, some spanning multiple days]

To analyse the daily usage of those machines, we need to transform the data in the example above into the following structure.

[Figure: the machine cycle records split by day, with a UsageHours column]

As you can see, the record of the second machine, which ran across two days, has been split into two records, one for each day. Likewise, the record of the third machine, which ran across three days, has been split into three records. The UsageHours column stores the time the machine ran on that day. For example, the second machine started to run at 17:33:21 on 22nd of May and stopped at 14:29:22 on 23rd of May. The original record of that machine running cycle is split into two rows: the row for 22nd of May with UsageHours of 6.4 (between 17:33:21 and 23:59:59), and the row for 23rd of May with UsageHours of 14.5 (between 00:00:00 and 14:29:22).

In this blog post, I am going to introduce how to conduct this kind of data transformation using U-SQL in an Azure Data Lake Analytics (ADLA) job. The snapshot below shows the U-SQL code.

[Figure: snapshot of the full U-SQL script]

Firstly, we need to split each machine running cycle into the days it spans. To achieve that, we join the machine cycle table with a Date dimension table (here is a way to generate a date dimension table using U-SQL, introduced by Paul Andrew).

[Figure: the U-SQL CROSS JOIN with the Date dimension table]

The join operation gives us something like this:

[Figure: the result of joining machine cycles with the Date dimension]

We use a CROSS JOIN with a WHERE clause to get the days that fall between the cycle StartDateTime and the cycle EndDateTime (as the U-SQL JOIN does not support non-equality comparisons at this moment, we cannot use an INNER JOIN with an ON clause).
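
Since the snapshot above is an image, here is a rough sketch of that CROSS JOIN; the rowset names (@machineCycles, @dimDate) and the DimDate column are assumptions for illustration:

// Fan each machine cycle out to every calendar day it touches.
// U-SQL joins only support equality conditions, so the date range
// check goes into a WHERE clause after a CROSS JOIN.
@cycleDays =
    SELECT c.CycleId,
           c.MachineName,
           c.CustomerName,
           c.StartDateTime,
           c.EndDateTime,
           d.Date AS CycleDate
    FROM @machineCycles AS c
         CROSS JOIN @dimDate AS d
    WHERE d.Date >= c.StartDateTime.Date
          AND d.Date <= c.EndDateTime.Date;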

After we have split a single cycle into one segment for each day in the time span of the cycle, we can use a CASE expression to decide the type of each day segment (i.e. the day when the machine starts, the day when the machine stops, and the day(s) in the middle that run the full 24 hours) and calculate the usage time in each day segment.

[Figure: the expression that calculates the usage hours for each day segment]

In addition, we need another CASE expression to flag the first day of the machine running cycle as 1 and the other days as 0. This is the trick for aggregating the count of total machine running cycles within a period: after we split a single machine cycle into multiple records, the count of records in the machine cycle table no longer reflects the count of machine cycles, so we aggregate the cycle flag in each record instead.

[Figure: the expression that flags the first day of each machine cycle]
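
The two expressions described above could be sketched roughly as follows, continuing from the @cycleDays rowset; since U-SQL scalar expressions are C# expressions, the sketch uses conditional operators, and all names are assumptions:

// For each day segment, work out how many hours of the cycle fall on that day,
// and flag the first day of the cycle so that cycle counts can still be aggregated.
@dailyUsage =
    SELECT CycleId,
           MachineName,
           CustomerName,
           CycleDate,
           (StartDateTime.Date == EndDateTime.Date
               ? (EndDateTime - StartDateTime).TotalHours              // starts and ends on the same day
               : CycleDate == StartDateTime.Date
                   ? (CycleDate.AddDays(1) - StartDateTime).TotalHours // the day the machine starts
                   : CycleDate == EndDateTime.Date
                       ? (EndDateTime - CycleDate).TotalHours          // the day the machine stops
                       : 24.0) AS UsageHours,                          // a full day in the middle of the cycle
           (CycleDate == StartDateTime.Date ? 1 : 0) AS CycleFlag
    FROM @cycleDays;

The daily usage per machine can then be aggregated with SUM(UsageHours), and the number of cycles with SUM(CycleFlag), grouped by CycleDate and MachineName.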

Generate Device Cycle Records from Raw Telemetry Message using Azure Data Lake Analytics

The raw telemetry data collected from IoT sensors is normally event-based, e.g., a “Device On” message when the device starts to run, and a “Device Off” message when the device stops.

[Figure: sample raw On/Off telemetry messages]

One common data preprocessing task is to transform the raw “On/Off” telemetry data into device cycle records with the start time, end time and duration of each device running cycle.

[Figure: the device cycle records with start time, end time and duration]

This post introduces how to implement this kind of data preprocessing task as an Azure Data Lake Analytics (ADLA) job.

Firstly, we need to create a U-SQL file in Visual Studio. U-SQL is the SQL-like language for creating ADLA jobs, which takes advantage of both C# and SQL language features.

In the U-SQL file, we can use the EXTRACT expression to load the raw telemetry data file.

[Figure: the U-SQL EXTRACT expression]
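
As the snapshot is an image, here is a minimal sketch of such an EXTRACT expression; the file path and the column names (DeviceId, EventType, EventDateTime) are assumptions for illustration:

// Load the raw On/Off telemetry messages from a csv file in the Data Lake store.
@telemetry =
    EXTRACT DeviceId string,
            EventType string,        // e.g. "Device On" / "Device Off"
            EventDateTime DateTime
    FROM "/IoT/raw_telemetry.csv"
    USING Extractors.Csv(skipFirstNRows: 1);   // skip the header row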

U-SQL provides the LAG function, which accesses a row at a given offset before the current row in a given order. For each “Device Off” event message, we can use the LAG function, LAG(EventDateTime, 1) OVER(ORDER BY DeviceId, EventDateTime), to locate the corresponding “Device On” event message and get its EventDateTime (the time when the event happens, i.e. the cycle start time). The cycle end time is the EventDateTime of the “Device Off” message, and the cycle duration is the time difference between the cycle start time and the cycle end time.

[Figure: the U-SQL query using LAG to derive cycle start and end times]
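
Continuing from the @telemetry rowset above, the logic described here might be sketched as follows (again, the names are assumptions, and the sketch assumes the On/Off messages strictly alternate for each device):

// For every event, look up the EventDateTime of the previous event in
// (DeviceId, EventDateTime) order; for a "Device Off" event this is
// assumed to be the matching "Device On" event, i.e. the cycle start time.
@paired =
    SELECT DeviceId,
           EventType,
           LAG(EventDateTime, 1) OVER(ORDER BY DeviceId, EventDateTime) AS CycleStartDateTime,
           EventDateTime AS CycleEndDateTime
    FROM @telemetry;

// Keep only the "Device Off" rows and derive the cycle duration in hours.
@deviceCycles =
    SELECT DeviceId,
           CycleStartDateTime,
           CycleEndDateTime,
           ((TimeSpan)(CycleEndDateTime - CycleStartDateTime)).TotalHours AS DurationHours
    FROM @paired
    WHERE EventType == "Device Off";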

After the data is processed, we can use the U-SQL OUTPUT expression to write out the results.

[Figure: the U-SQL OUTPUT expression]
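
A matching sketch of the OUTPUT step, writing the generated device cycle records back to the Data Lake store (the output path is again an assumption):

// Write the device cycle records to a csv file with a header row.
OUTPUT @deviceCycles
TO "/IoT/device_cycles.csv"
USING Outputters.Csv(outputHeader: true);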