MICROSOFT BI TOOLS

Blog about Microsoft Business Intelligence: Azure, Power Platform, SQL Server
and related matter





SUNDAY, 22 MARCH 2020


BUILD DYNAMIC PIPELINES IN AZURE DATA FACTORY


Case
I want to create multiple Azure Data Factory (ADF) pipelines that use the same
source and sink location, but with different files and tables. How can we do
this in ADF while taking the extensibility and maintainability of our pipelines
into account?

Solution
Let's say we have a scenario where all the source files are stored in our
Azure Storage, for example Blob Storage. We want to load these files into a
traditional Data Warehouse (DWH) using an Azure SQL Database that contains a
separate schema for the Staging layer. Earlier we showed you how you can do this
using a Foreach Loop in one single pipeline. For this scenario we will not do
that, because:

 * We do not load all Staging tables at the same time every day.
    * Sometimes only 3 of the 5 files are loaded, because not all files are
      delivered on a certain day, and this can differ per day.
    * The load can be spread over several moments during the day.

 * We want to be more flexible per Staging load.
    * In most cases you want to have the same Staging load for every table, but
      sometimes transformations can be different or you need an extra table to
      prepare the dataset.

 * We use technical columns in the Staging tables.
    * In most cases you want to store some metadata, for example "inserted date"
      and "inserted by". When using a "Copy data activity", you have to
      configure the mapping section when the source and sink fields are not
      equal.

One of the solutions is building dynamic pipelines. This will be a combination
of parameters, variables and a naming convention. The result will be a dynamic
pipeline that we can clone to create multiple pipelines using the same source
and sink dataset. In this blog post we will focus on the Staging layer. Of
course you can implement this for every layer of your DWH.

Note:
When working in a team, it is important to have a consistent way of development
and a proper naming convention in place. This contributes to the extensibility
and maintainability of your application. Click here for an example of a naming
convention in ADF.




PIPELINE

For this solution, we will create a pipeline that contains the following
activities:
 * Copy Data activity (load data from source to sink)
 * Stored Procedure activity (update technical columns)

The source is a CSV file stored in a Blob container. The data is based on the
customer sales table from WideWorldImporters-Standard. The file will be
delivered daily.


Storage account   bitoolssa
Blob container    sourcefiles
Blob folder       WWI
File name*        SalesCustomers20200322.csv

* This is the date of the blog post; in practice it should be today's date.

Note:
Blob storage containers only have virtual folders, which means that the folder
name is stored in the file name. Microsoft Azure Storage Explorer will show them
as if they were real folders.
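
Put together, the full path of the example file follows the standard Blob
endpoint format of the storage account and would look like this:

https://bitoolssa.blob.core.windows.net/sourcefiles/WWI/SalesCustomers20200322.csv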



1) LINKED SERVICE

Open the ADF portal, go to Connections - Linked services and click on New.
Select Azure Blob Storage and give it a suitable (generic) name. Make a
connection with your storage account. Create another Linked service for Azure
SQL Database, because that will be our destination (sink). Click here to see how
to make a connection using Azure Key Vault.

ADF portal - Create Linked Service

2) DATASET (SOURCE)

Click New dataset and select Azure Blob Storage. The format is DelimitedText.
Give it a suitable (generic) name and select the Linked Service for Blob Storage
that we created earlier. 


Once you click 'OK', it will open the dataset automatically. Go to
Parameters and add the following:
 * Container -> type "String"
 * Directory -> type "String"
 * File -> type "String"
 * ColumnDelimiter -> type "String"

Go to Connection and now use the applicable parameters to fill in the File path.
You can apply dynamic content for each setting; see the sketch below. For now,
we also added a parameter for "Column delimiter". Finally, we use First row as
header.
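
As a sketch, the dynamic content of the connection settings simply references
the dataset parameters created above:

Container        : @dataset().Container
Directory        : @dataset().Directory
File name        : @dataset().File
Column delimiter : @dataset().ColumnDelimiter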

Click Publish all to save your changes directly to the Data Factory service. For
now we did not configure a Git repository, but of course we recommend that.

ADF portal - Create Source Dataset

Note:
Use Preview data to verify that the source input is as expected by filling in
the correct parameter values.



3) DATASET (SINK)

Create another dataset for the sink. Choose Azure SQL Database and give it a
suitable (generic) name. Select the Linked Service for Azure SQL Database that
we created earlier and click 'OK'.

Add the following Parameters:

 * SchemaName (String)
 * TableName (String)

Go to Connection and click Edit. Fill in both parameters using dynamic content,
as shown in the sketch below.
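
As a sketch, the Table setting of the sink dataset references both dataset
parameters:

Schema name : @dataset().SchemaName
Table name  : @dataset().TableName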

ADF portal - Create Sink Dataset

4) PIPELINE - COPY DATA

Create a new pipeline and include the schema and table name in the name, like
"PL_Load_Stg_Customer". Go to Variables and add the following, including values:
 * Container -> type "String" and value "sourcefiles"
 * Directory -> type "String" and value "WWI"
 * File -> type "String" and value "SalesCustomers"
 * ColumnDelimiter -> type "String" and value ","

Add the Copy data activity, go to Source and use dynamic content to assign the
variables to the input parameters, as shown in the sketch below. For the file,
we also use expression language to retrieve the correct file name dynamically:
"@{variables('File')}@{formatDateTime(utcnow(), 'yyyyMMdd')}.csv"

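A sketch of the source settings of the Copy data activity, assigning the
pipeline variables (created above) to the dataset parameters:

Container       : @variables('Container')
Directory       : @variables('Directory')
File            : @{variables('File')}@{formatDateTime(utcnow(), 'yyyyMMdd')}.csv
ColumnDelimiter : @variables('ColumnDelimiter')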

Go to Sink and fill in the schema and table name. We use the SPLIT function to
retrieve these from the pipeline name, as shown below. We use a Pre-copy script
to truncate the table before loading.
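
For a pipeline named "PL_Load_Stg_Customer", a sketch of the sink settings could
look like this (the array positions depend on your own naming convention):

SchemaName      : @split(pipeline().Pipeline, '_')[2]
TableName       : @split(pipeline().Pipeline, '_')[3]
Pre-copy script : TRUNCATE TABLE [@{split(pipeline().Pipeline, '_')[2]}].[@{split(pipeline().Pipeline, '_')[3]}]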

Finally, go to Mapping and click on Import schemas. This will automatically map
the columns with the same names (source and sink). We will remove the columns
that do not exist in the source, in this case our technical columns
"InsertedDate" and "InsertedBy". We will fill those columns in the next
activity.
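
For reference, a minimal sketch of such a Staging table with the two technical
columns (the source columns shown here are hypothetical and abbreviated):

CREATE TABLE [Stg].[Customer]
(
  [CustomerID]     INT            NULL
, [CustomerName]   NVARCHAR(100)  NULL
-- ... remaining columns from the source file ...
, [InsertedDate]   DATETIME       NULL  -- technical column, filled by the Stored Procedure activity
, [InsertedBy]     NVARCHAR(255)  NULL  -- technical column, filled by the Stored Procedure activity
)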



ADF portal - Create Copy data activity

5) PIPELINE - STORED PROCEDURE

Add the Stored Procedure activity and give it a suitable name. In SQL Account,
select the Linked service for Azure SQL Database that we created earlier. We
created a stored procedure that contains dynamic SQL to fill the columns
"InsertedDate" and "InsertedBy" for every Staging table. See the code below.


/*
==========================================================================================================================
Stored Procedure [dbo].[uspPostLoadStaging]
==========================================================================================================================
Description:

This query will load the following columns:
- InsertedDate
- InsertedBy

In ADF, without using Data Flows (Mapping), you can combine a Copy data activity with a Stored Procedure
in order to fill those (technical) columns during execution of the pipeline. In SSIS this was done by the
Derived Column task.

==========================================================================================================================
Parameter       Parameter description
--------------------------------------------------------------------------------------------------------------------------
@InsertedDate   Date when the data was inserted into the Staging table.
@InsertedBy     Name of a service / account that has inserted the data into the Staging table.
==========================================================================================================================

==========================================================================================================================
Change history

Date        Who                  Remark
--------------------------------------------------------------------------------------------------------------------------
2020-03-22  Ricardo Schuurman    Initial creation.
==========================================================================================================================
*/

CREATE PROCEDURE [dbo].[uspPostLoadStaging]
  @SchemaName     AS NVARCHAR(255)
, @TableName      AS NVARCHAR(255)
, @InsertedDate   AS DATETIME
, @InsertedBy     AS NVARCHAR(255)

AS
BEGIN

    SET NOCOUNT ON;
    BEGIN TRY

        DECLARE
          @QueryStep1   AS NVARCHAR(MAX)
        , @QueryStep2   AS NVARCHAR(MAX)

        -- ERROR variables
        , @errorMsg     AS NVARCHAR(MAX) = ''
        , @errorLine    AS INT = 0

        /* Example values of the SP parameters (for testing)

        SET @SchemaName   = N'Stg'
        SET @TableName    = N'Customer'
        SET @InsertedDate = GETDATE()
        SET @InsertedBy   = N'Test'

        */

        /* ================================================================================================================
           Step 1: Extract schema and table name (based on the ADF pipeline naming convention)
           ================================================================================================================ */

        -- Add LIKE condition for schema
        SET @SchemaName = '%' + @SchemaName + '%'

        SELECT
            @QueryStep1 = '[' + [TABLE_SCHEMA] + '].[' + [TABLE_NAME] + ']'
        FROM [INFORMATION_SCHEMA].[TABLES]
        WHERE TABLE_TYPE = 'BASE TABLE'
        AND TABLE_SCHEMA LIKE @SchemaName
        AND TABLE_NAME = @TableName

        /* ================================================================================================================
           Step 2: Execute the UPDATE statement using the fixed (technical) columns
           ================================================================================================================ */

        SET @QueryStep2 = N'
            UPDATE ' + @QueryStep1 + '
            SET
              [InsertedDate] = ''' + CONVERT(NVARCHAR(30), @InsertedDate, 120) + '''
            , [InsertedBy]   = ''' + @InsertedBy + ''';
        '

        EXEC sp_executesql
            @stmt = @QueryStep2

    END TRY

    -------------------- Error Handling --------------------

    BEGIN CATCH

        SELECT
          @errorMsg  = ERROR_MESSAGE()
        , @errorLine = ERROR_LINE()

        SET @errorMsg = 'Error occurred with the following message: ' + @errorMsg
                      + ' Error line number: ' + CAST(@errorLine AS NVARCHAR(10));

        THROW 50001, @errorMsg, 1;

    END CATCH

END
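
For testing the stored procedure outside ADF, a call with example values
(matching the scenario above) could look like this:

EXEC [dbo].[uspPostLoadStaging]
  @SchemaName   = N'Stg'
, @TableName    = N'Customer'
, @InsertedDate = GETDATE()
, @InsertedBy   = N'PL_Load_Stg_Customer'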



Create the SP in the database, go to Stored Procedure and select the SP. Click
Import parameter and fill in the parameters, as shown in the sketch below. We
use the system variables 'Pipeline trigger time' and 'Pipeline Name' for
"InsertedDate" and "InsertedBy". Reuse the values of "SchemaName" and
"TableName" from the sink of the Copy data activity.
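
A sketch of the parameter values of the Stored Procedure activity, again
assuming the "PL_Load_Stg_Customer" naming convention:

SchemaName   : @split(pipeline().Pipeline, '_')[2]
TableName    : @split(pipeline().Pipeline, '_')[3]
InsertedDate : @pipeline().TriggerTime
InsertedBy   : @pipeline().Pipeline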


ADF portal - Create Stored Procedure activity

Result

Execute the pipeline in ADF (by clicking Debug) and check the result in SQL
Server Management Studio. It works!

Note:
In case of a "SqlFailedToConnect" error, make sure the firewall of the Azure SQL
Database allows the Integration Runtime of ADF to access the database. Go to the
Azure portal or use sp_set_firewall_rule to create a firewall rule.
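
A minimal sketch of such a firewall rule, executed in the master database of the
logical SQL server (this example is the special 0.0.0.0 rule that allows access
from Azure services, including the Azure Integration Runtime; a narrower IP
range is of course more secure):

EXECUTE sp_set_firewall_rule
  @name             = N'AllowAllWindowsAzureIps'
, @start_ip_address = '0.0.0.0'
, @end_ip_address   = '0.0.0.0'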


Summary
In this blog post you saw how you can create a dynamic pipeline using parameters
and variables in combination with a proper naming convention in ADF, for example
in the names of your pipelines. This way of working contributes to a consistent
solution and code base. In case of a new Staging table to load, clone the
existing pipeline and use the schema and table name in the pipeline name. You
only have to change the mapping in the Copy data activity.

In a next post we will continue with dynamic pipelines, but then using metadata
that is stored in a database instead of in ADF itself.



Posted by Ricardo Schuurman
Labels: ADF, RICARDO
