MICROSOFT BI TOOLS
Blog about Microsoft Business Intelligence: Azure, Power Platform, SQL Server and related matters

SUNDAY, 22 MARCH 2020

BUILD DYNAMIC PIPELINES IN AZURE DATA FACTORY

Case
I want to create multiple Azure Data Factory (ADF) pipelines that use the same source and sink location, but with different files and tables. How can we do this in ADF while taking the extensibility and maintainability of the pipelines into account?

Solution
Let's say we have a scenario where all the source files are stored in Azure Storage, for example Blob Storage. We want to load these files into a traditional Data Warehouse (DWH) on an Azure SQL Database that contains a separate schema for the Staging layer. Earlier we showed you how you can do this using a Foreach Loop in one single pipeline. For this scenario we will not do that, because:
* Not all Staging tables are loaded at the same time every day.
  * Sometimes only 3 of the 5 files are loaded, because not all files arrive every day and the set of files can differ per day.
  * The load can be spread over several moments during the day.
* We want to be more flexible per Staging load.
  * In most cases you want the same Staging load for every table, but sometimes the transformations are different or you need an extra table to prepare the dataset.
* We use technical columns in the Staging tables.
  * In most cases you want to store some metadata, for example "inserted date" and "inserted by". When using a Copy data activity, you have to configure the mapping section when the source and sink fields are not equal.

One of the solutions is building dynamic pipelines. This is a combination of parameters, variables and a naming convention. The result is a dynamic pipeline that we can clone to create multiple pipelines using the same source and sink dataset. In this blog post we will focus on the Staging layer. Of course you can implement this for every layer of your DWH.

Note: When working in a team, it is important to have a consistent way of development and a proper naming convention in place. This contributes to the extensibility and maintainability of your application. Click here for an example of a naming convention in ADF.

PIPELINE
For this solution, we will create a pipeline that contains the following activities:
* Copy data activity (load data from source to sink)
* Stored Procedure activity (update the technical columns)

The source is a CSV file stored in a Blob container. The data is based on the customer sales table from WideWorldImporters-Standard. The file is delivered daily.
* Storage account: bitoolssa
* Blob container: sourcefiles
* Blob folder: WWI
* File name: SalesCustomers20200322.csv (this date is the blog post date; in practice it should be the date of today)

Note: Blob storage containers only have virtual folders, which means that the folder name is stored in the file name. Microsoft Azure Storage Explorer shows them as if they were real folders.
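The sink is a Staging table in the Azure SQL Database, including the two technical columns that the Copy data activity will not fill. As a minimal sketch (the schema and table name match the example used later in this post, and the business columns are only an illustrative subset of the WideWorldImporters customer data), the target table could look like this:

-- Minimal sketch of a Staging table with the two technical columns.
-- CustomerID and CustomerName are only illustrative; use the columns of your real source file.
CREATE TABLE [Stg].[Customer]
(
    [CustomerID]    INT             NULL
,   [CustomerName]  NVARCHAR(100)   NULL
,   [InsertedDate]  DATETIME        NULL   -- filled afterwards by the Stored Procedure activity
,   [InsertedBy]    NVARCHAR(255)   NULL   -- filled afterwards by the Stored Procedure activity
);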
1) LINKED SERVICE
Open the ADF portal, go to Connections - Linked services and click on New. Select Azure Blob Storage and give it a suitable (generic) name. Create the connection to your storage account. Create another Linked Service for Azure SQL Database, because that will be our destination (sink). Click here to see how to make the connection using Azure Key Vault.

ADF portal - Create Linked Service

2) DATASET (SOURCE)
Click New dataset and select Azure Blob Storage. The format is DelimitedText. Give it a suitable (generic) name and select the Linked Service for Blob Storage that we created earlier. Once you click 'OK', it will open the dataset automatically. Go to Parameters and add the following:
* Container -> type "String"
* Directory -> type "String"
* File -> type "String"
* ColumnDelimiter -> type "String"

Go to Connection and use the applicable parameters to fill in the File path. You can apply dynamic content for each setting. For now, we also added a parameter for "Column delimiter". Finally, we enable First row as header. Click Publish to save your changes. For now we did not configure a Git repository, but of course we recommend doing so.

ADF portal - Create Source Dataset

Note: Use Preview data to verify that the source input is as expected by filling in the correct parameter values.

3) DATASET (SINK)
Create another dataset for the sink. Choose Azure SQL Database and give it a suitable (generic) name. Select the Linked Service for Azure SQL Database that we created earlier and click 'OK'. Add the following parameters:
* SchemaName -> type "String"
* TableName -> type "String"

Go to Connection and click Edit. Fill in both parameters using dynamic content.

ADF portal - Create Sink Dataset

4) PIPELINE - COPY DATA
Create a new pipeline and include the schema and table name in the name, like "PL_Load_Stg_Customer". Go to Variables and add the following, including values:
* Container -> type "String" and value "sourcefiles"
* Directory -> type "String" and value "WWI"
* File -> type "String" and value "SalesCustomers"
* ColumnDelimiter -> type "String" and value ","

Add the Copy data activity, go to Source and use dynamic content to assign the variables to the input parameters. For the file, we also use the expression language to build the correct file name dynamically:
"@{variables('File')}@{formatDateTime(utcnow(), 'yyyyMMdd')}.csv"

Go to Sink and fill in the schema and table name. We use the SPLIT function to retrieve these from the pipeline name, as shown in the sketch after this section. We use a Pre-copy data script to truncate the table before loading. Finally, go to Mapping and click on Import schemas. This will automatically map the columns with the same names (source and sink). We will remove the columns that do not exist in the source, in this case our technical columns "InsertedDate" and "InsertedBy". We will fill those columns in the next activity.

ADF portal - Create Copy data activity
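The exact expressions are not shown in the screenshots. As a sketch, assuming the naming convention PL_Load_<Schema>_<Table> from above, the sink parameters and the Pre-copy data script could look like this (pipeline().Pipeline returns the pipeline name, split() turns it into an array):

SchemaName:      @split(pipeline().Pipeline, '_')[2]
TableName:       @split(pipeline().Pipeline, '_')[3]
Pre-copy script: TRUNCATE TABLE [@{split(pipeline().Pipeline, '_')[2]}].[@{split(pipeline().Pipeline, '_')[3]}]

For "PL_Load_Stg_Customer" this resolves to schema "Stg" and table "Customer", so a cloned pipeline only needs a different name (and mapping) to load another table.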
5) PIPELINE - STORED PROCEDURE
Add the Stored Procedure activity and give it a suitable name. In SQL Account, select the Linked Service for Azure SQL Database that we created earlier. We created an SP that contains dynamic SQL to fill the columns "InsertedDate" and "InsertedBy" for every Staging table. See the code below.

/*
==========================================================================================================
Stored Procedure [dbo].[uspPostLoadStaging]
==========================================================================================================
Description:   This query will load the following columns:
               - InsertedDate
               - InsertedBy

               In ADF, without using Data Flows (Mapping), you can combine a Copy data activity with a
               Stored Procedure in order to fill those (technical) columns during execution of the
               pipeline. In SSIS this was done by the Derived Column task.
==========================================================================================================
Parameter        Parameter description
----------------------------------------------------------------------------------------------------------
@SchemaName      Name of the schema of the Staging table.
@TableName       Name of the Staging table.
@InsertedDate    Date when the data was inserted into the Staging table.
@InsertedBy      Name of a service / account that has inserted the data into the Staging table.
==========================================================================================================
Change history
Date          Who                 Remark
----------------------------------------------------------------------------------------------------------
2020-03-22    Ricardo Schuurman   Initial creation.
==========================================================================================================
*/
CREATE PROCEDURE [dbo].[uspPostLoadStaging]
    @SchemaName   AS NVARCHAR(255)
,   @TableName    AS NVARCHAR(255)
,   @InsertedDate AS DATETIME
,   @InsertedBy   AS NVARCHAR(255)
AS
BEGIN
    SET NOCOUNT ON;

    BEGIN TRY

        DECLARE
            @QueryStep1 AS NVARCHAR(MAX)
        ,   @QueryStep2 AS NVARCHAR(MAX)
        -- ERROR variables
        ,   @errorMsg   AS NVARCHAR(MAX) = ''
        ,   @errorLine  AS NVARCHAR(MAX) = ''

        /* Example values of SP parameters
        ,   @SchemaName   AS NVARCHAR(255)
        ,   @TableName    AS NVARCHAR(255)
        ,   @InsertedDate AS DATETIME
        ,   @InsertedBy   AS NVARCHAR(255)

        SET @SchemaName   = N'Stg'
        SET @TableName    = N'Customer'
        SET @InsertedDate = GETDATE()
        SET @InsertedBy   = N'Test'
        */

        /*
        ==================================================================================================
        Step 1: Extract schema and table name (based on the ADF pipeline naming convention)
        ==================================================================================================
        */

        -- Add LIKE condition for schema
        SET @SchemaName = '%' + @SchemaName + '%'

        SELECT @QueryStep1 = '[' + [TABLE_SCHEMA] + '].[' + [TABLE_NAME] + ']'
        FROM [INFORMATION_SCHEMA].[TABLES]
        WHERE TABLE_TYPE = 'BASE TABLE'
            AND TABLE_SCHEMA LIKE @SchemaName
            AND TABLE_NAME = @TableName

        /*
        ==================================================================================================
        Step 2: Execute Update statement using the fixed (technical) columns
        ==================================================================================================
        */

        SET @QueryStep2 = N'
            UPDATE ' + @QueryStep1 + '
            SET [InsertedDate] = ''' + CONVERT(NVARCHAR(30), @InsertedDate, 120) + '''
            ,   [InsertedBy]   = ''' + @InsertedBy + ''';
            '

        EXEC sp_executesql @QueryStep2

    END TRY

    -------------------- Error Handling --------------------
    BEGIN CATCH

        SELECT
            @errorMsg  = ERROR_MESSAGE()
        ,   @errorLine = ERROR_LINE()

        SET @errorMsg = 'Error occurred with the following message: ' + @errorMsg + ' Error line number: ' + @errorLine;

        THROW 50001, @errorMsg, 1;

    END CATCH
END

Create the SP in the database. Then, in the activity, go to Stored Procedure and select the SP. Click Import parameter and fill in the parameters. We use the system variables 'Pipeline trigger time' and 'Pipeline Name' for "InsertedDate" and "InsertedBy". Reuse the values of "SchemaName" and "TableName" from the sink of the Copy data activity.

ADF portal - Create Stored Procedure activity

Result
Execute the pipeline in ADF (by clicking Debug) and check the result in SQL Server Management Studio. It works!

Note: In case of a "SqlFailedToConnect" error, make sure the firewall of the Azure SQL Database allows the Integration Runtime of ADF to connect. Go to the Azure portal or use sp_set_firewall_rule to create a firewall rule.
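If the Stored Procedure activity fails, it can also help to test the procedure on its own in SQL Server Management Studio first. A minimal test call, using illustrative values that match the example block in the procedure's comments, could look like this:

EXEC [dbo].[uspPostLoadStaging]
    @SchemaName   = N'Stg'
,   @TableName    = N'Customer'
,   @InsertedDate = '2020-03-22 09:00:00'    -- in ADF: the 'Pipeline trigger time' system variable
,   @InsertedBy   = N'PL_Load_Stg_Customer'  -- in ADF: the 'Pipeline Name' system variable

After running it, the columns "InsertedDate" and "InsertedBy" of [Stg].[Customer] should contain these values for every row.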
Summary
In this blog post you saw how to create a dynamic pipeline using parameters and variables in combination with a proper naming convention in ADF, for example for the names of your pipelines. This way of working contributes to a consistent solution and code base. In case of a new Staging load table, clone the existing pipeline and use the schema and table name in the pipeline name. You only have to change the mapping in the Copy data activity. In a next post we will continue with dynamic pipelines, but then using metadata that is stored in a database instead of in ADF itself.

Posted by Ricardo Schuurman
Labels: ADF, RICARDO