Combining Microsoft Graph Data Connect data sets in Azure Synapse Analytics

This article covers the process of combining two data sets extracted via an Azure Synapse pipeline using Microsoft Graph Data Connect (MGDC). The steps to deploy the baseline Azure Synapse Analytics workspace needed to follow this demo are described in my blog here. For readers who are not familiar with Azure Synapse Analytics, it is a solution that provides a full Extract/Transform/Load (ETL) stack for your data.

We will demo two different approaches to combining data sets via Azure Synapse Analytics pipelines. The first approach will use a Synapse Spark Notebook with PySpark scripts, while the second one will use the no-code Data flow approach instead. In both cases we will demo how to effectively combine the Microsoft 365 Messages data set, which contains emails, with the Microsoft 365 Users data set. We will be joining the two data sets on the pUser property, which is common to both sets and identifies the record of a specific user.
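As a quick, self-contained illustration of that idea (the values below are made up and do not come from MGDC), joining two small PySpark DataFrames on a shared puser column looks like this:

from pyspark.sql import SparkSession

# Toy example: both data sets share a puser column identifying the user.
spark = SparkSession.builder.getOrCreate()

messages = spark.createDataFrame(
    [("user-1", "Quarterly report"), ("user-2", "Lunch tomorrow?")],
    ["puser", "subject"])

users = spark.createDataFrame(
    [("user-1", "Adele Vance"), ("user-2", "Alex Wilber")],
    ["puser", "displayName"])

# An inner join on puser attaches the user attributes to each message row.
messages.join(users, on="puser", how="inner").show()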

Option 1 – Using a Synapse Spark Notebook

To get started, we will need to create a new Synapse pipeline. To do so, navigate to your Azure Synapse workspace and open Synapse Studio. From the main workbench, click on the Integrate button in the left navigation bar.

Azure Synapse Analytics' Integrate feature

Figure 1 – Azure Synapse Analytics' Integrate feature

Beside the Integrate header, click on the + button and select Pipeline from the drop-down menu.

Creating a new Azure Synapse Analytics pipeline

Figure 2 – Creating a new Azure Synapse Analytics pipeline

We now need to add two Copy Data activities to our pipeline: one that will copy the Users (BasicDataSet_v0.User_v1) data set and one that will copy the Messages (BasicDataSet_v0.Message_v1) one. In our case, we have defined both activities so that they will return all fields from both data sets. For more details on how to configure Microsoft Graph Data Connect via the Copy Data activity, you can refer to the following article: Build your first Microsoft Graph Data Connect application.

For our demos, the extracted data will be copied to an Azure Data Lake Storage Gen2 location. Also, it is important to note that the activity that extracts information about messages will have a dynamic filter on the sentDateTime column so that it only extracts data from the past 24 hours. Figure 3 below shows the new pipeline with the two Copy Data activities on the workbench.

Azure Synapse Analytics pipeline with MGDC copy data activities

Figure 3 – Azure Synapse Analytics pipeline with MGDC Copy Data activities
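For reference, the 24-hour window on the Messages activity is driven by dynamic pipeline expressions. The snippet below is only a sketch of what such a configuration could look like; the property names and exact expressions are assumptions and may differ from your Copy Data setup.

# Hedged sketch: illustrative values for a dynamic 24-hour window on sentDateTime.
# The property names below are assumptions, not copied from the article.
messages_date_filter = {
    "dateFilterColumn": "sentDateTime",
    # Pipeline expressions: start of the window is "now minus one day", end is "now".
    "startTime": "@adddays(utcnow(), -1)",
    "endTime": "@utcnow()",
}
print(messages_date_filter)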

Every time our pipeline executes, it will retrieve all emails that were sent in the past 24 hours (based on the dynamic filter mentioned previously), retrieve information about all users, and store the results in our storage account as binary files with JSON rows. We've configured our pipeline so that each activity copies its extracted data into an appropriately named folder (e.g., messages and users folders). For our scenario, we will schedule our pipeline to be triggered automatically every 24 hours, which means that every day it will generate a new binary file containing all emails from the past day. The consequence, however, is that the same thing will happen with the users data extract, which will result in duplicate users being listed across files. We want to make sure that every time we run our pipeline we start with a fresh, up-to-date list of all users in our environment. To enforce this, we will add a new Delete activity (under the General category) and ensure all files are deleted from our users folder as a prerequisite to extracting the users data set.

Adding a new Delete activity to our pipeline

Figure 4 – Adding a new Delete activity to our pipeline

We will configure the Delete activity to delete every file under the specified location, as shown in Figure 5 below.

Delete all files in the specified location

Figure 5 – Delete all files in the specified location

Under the Logging settings tab, we will simply uncheck the Enable logging checkbox.

Disabling logging on the delete activity

Figure 6 – Disabling logging on the Delete activity

We are now ready to tackle the bulk of the pipeline's logic. From the Activities list, under the Synapse category, drag and drop a new Notebook activity onto the workbench, and make the two Copy Data activities prerequisites (see Figure 7).

Add a new Synapse Notebook to the pipeline

Figure 7 – Add a new Synapse Notebook to the pipeline

From the workbench, select the newly added Notebook activity and, in the property panel at the bottom, select the Settings tab. From there, click on the + New button to create a new notebook.

Creating a new Azure Synapse Notebook

Figure 8 – Creating a new Azure Synapse Notebook

This will launch the Synapse Notebook editor. In the top menu bar, click on Attach to and select your Apache Spark pool from the list.

Selecting Apache Spark Pool to run an Azure Synapse Notebook

Figure 9 – Selecting an Apache Spark Pool to run an Azure Synapse Notebook

In the script editor, copy and paste the following PySpark script, making sure to update the values of the filePathMessages and filePathUsers variables with the path to the location where your data is being extracted, as well as the storageAccountName and storageAccountKey variables for your storage account.

from pyspark.sql import functions as f

filePathMessages = "wasbs://<messages folder location>"
filePathUsers = "wasbs://<users folder location>"
storageAccountName = "<storage account name>"
storageAccountKey = "<storage account key>"
fileStorageUrl = "fs.azure.account.key." + storageAccountName + ".blob.core.windows.net"

spark.conf.set(fileStorageUrl, storageAccountKey)

messageDataframe = spark.read.json(filePathMessages) \
    .withColumn("sender_name", f.col("sender.emailAddress.name")) \
    .withColumn("sender_address", f.col("sender.emailAddress.address")) \
    .withColumn("from_name", f.col("from.emailAddress.name")) \
    .withColumn("from_address", f.col("from.emailAddress.address")) \
    .withColumn("to_name", f.col("toRecipients.emailAddress.name")) \
    .withColumn("to_address", f.col("toRecipients.emailAddress.address"))

try:
    # There is a chance that all entries in the ccRecipients column are empty,
    # which will cause the schema to be detected as a string array.
    # If that is the case, simply add the derived name and address columns as empty values.
    data_type = dict(messageDataframe.dtypes)['ccRecipients']
    if data_type == "array<string>":
        messageDataframe = messageDataframe.withColumn("cc_name", f.lit(""))
        messageDataframe = messageDataframe.withColumn("cc_address", f.lit(""))
    else:
        messageDataframe = messageDataframe.withColumn("cc_name", f.col("ccRecipients.emailAddress.name"))
        messageDataframe = messageDataframe.withColumn("cc_address", f.col("ccRecipients.emailAddress.address"))

    # There is a chance that all entries in the bccRecipients column are empty,
    # which will cause the schema to be detected as a string array.
    # If that is the case, simply add the derived name and address columns as empty values.
    data_type = dict(messageDataframe.dtypes)['bccRecipients']
    if data_type == "array<string>":
        messageDataframe = messageDataframe.withColumn("bcc_name", f.lit(""))
        messageDataframe = messageDataframe.withColumn("bcc_address", f.lit(""))
    else:
        messageDataframe = messageDataframe.withColumn("bcc_name", f.col("bccRecipients.emailAddress.name"))
        messageDataframe = messageDataframe.withColumn("bcc_address", f.col("bccRecipients.emailAddress.address"))
finally:
    messageDataframe = messageDataframe.drop("from", "sender", "toRecipients", "ccRecipients", "bccRecipients")

usersDataframe = spark.read.json(filePathUsers) \
    .withColumn("skuIds", f.col("assignedLicenses.skuId")) \
    .withColumn("assignedPlanDates", f.col("assignedPlans.assignedDateTime")) \
    .withColumn("assignedPlanServices", f.col("assignedPlans.service")) \
    .withColumn("puser2", f.col("puser")) \
    .drop("createddatetime", "datarow", "id", "padditionalInfo", "ptenant", "rowInformation", "userrow", "puser", "pagerow")

results = messageDataframe.join(usersDataframe, messageDataframe.puser == usersDataframe.puser2, "inner").drop("puser2")

results.write.mode("overwrite").saveAsTable("Results")

The script will flatten the JSON hierarchy and rename some fields that are duplicated between the two data sets. It will then merge the two data sets and store the result in a new table named Results in our default Synapse lake database. We are now ready to publish our pipeline and then trigger its execution.

Manually trigger an Azure Synapse pipeline

Figure 10 – Manually trigger an Azure Synapse pipeline

The pipeline can take from several minutes to an hour to complete its execution once approvers have approved the request, depending on the size of the data you are trying to export. Make sure you do not have any pending approvals for your pipeline so that the pipeline can successfully execute. Once its execution succeeds, you can browse the default Synapse lake database to visualize the combined data. To do so, click on the Data button in the left navigation (database icon), expand the Lake database section, then the default database. Expand the Tables folder and click on the ellipsis beside the newly created Results table. From the flyout menu, select New SQL script and then Select TOP 100 rows.

Select Top 100 Rows in Lake Database

Figure 11 – Select Top 100 Rows in Lake Database

Next, execute the query by clicking on the Run button at the top of the query editor.

Executing a SQL query in Azure Synapse Analytics

Figure 12 – Executing a SQL query in Azure Synapse Analytics

This will take a few seconds to execute. Once it succeeds, you should be able to review the merged data in your lake database.

Reviewing merged data in the query editor

Figure 13 – Reviewing Merged Data in the Query Editor
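As an alternative to the SQL script above, the merged table can also be inspected straight from a notebook cell attached to the same Spark pool. The following is a minimal sketch, assuming the table was saved to the default lake database as in the script from Option 1:

# Quick sanity check on the merged table created by the notebook script.
row_count = spark.sql("SELECT COUNT(*) AS row_count FROM Results").collect()[0]["row_count"]
print(f"Results table contains {row_count} rows")

# Preview a handful of merged rows.
spark.sql("SELECT * FROM Results LIMIT 100").show(truncate=False)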

Option 2 – Using the No Code Data Flow Activity

If you are less familiar with Spark coding or if you simply want a no-code solution for your pipeline, you can use the Data flow activity inside your pipeline to perform transformations on your extracted data. For this demo, we will clone the pipeline we created in Option 1 above, with the one difference that we will remove the Notebook activity and replace it with a Data flow activity instead. The figure below shows an overview of what the pipeline will look like.

Replacing the Notebook activity with a Data flow activity

Figure 14 – Replacing the Notebook activity with a Data flow activity

To edit the flow's logic, select the new Data flow activity from the workbench and, in the property panel at the bottom, navigate to the Settings tab. From there, click on the + New button to create a new flow.

Creating a new Data flow

Figure 15 – Creating a new data flow

This will launch the data flow editor. The first thing we want to do is load the data we've exported as binary files and flatten their hierarchies. Let us start with the Messages data set. From the workbench, click on the arrow beside the Add Source shape and select Add Source from the flyout menu.

Adding a new data source to our flow

Figure 16 – Adding a new data source to our flow

From the property panel at the bottom, click on the + New button beside the Dataset field.

Creating a new data set link

Figure 17 – Creating a new data set link

From the Integration dataset blade, select Azure Data Lake Storage Gen2 and click on the Continue button at the bottom.

New Azure Data Lake Storage Gen2 integration data set

Figure 18 – New Azure Data Lake Storage Gen2 integration data set

On the data format selection blade, select JSON and click on the Continue button at the bottom.

Selecting JSON format for our integration data set

Figure 19 – Selecting JSON format for our integration data set

On the Set properties blade, select the linked service you previously created to your Azure Data Lake Storage location, specify the path to the folder where your messages data is being exported, and click on the OK button at the bottom.

Selecting files location for our integration data set

Figure 20 – Selecting files location for our integration data set

Repeat the same process, but this time for the Users data set. Make sure you set the path to point to the users folder. At this stage, you should have a data flow that looks like the following:

Data flow with just two sources

Figure 21 – Data flow with just two sources

Next, we need to combine the two data sources. To do so, click on the + sign to the right of the Messages source in the workbench. From the list of actions, select Join.

Adding a Join Action to a data flow

Figure 22 – Adding a Join Action to a data flow

In the editor panel at the bottom, select the Users source in the Right stream drop-down. For the Join condition, make sure to select the pUser field for both sources.

Configuring a Join Action on the pUser Field

Figure 23 – Configuring a Join Action on the pUser Field

Because both data sets contain fields that are named the same, running the flow as-is would throw errors complaining about duplicate columns (e.g., pageRow). To remediate the issue, we will need to add a Select action after the Join action to skip duplicate columns, since we do not need both copies for our scenario. In the workbench, beside the Join action added above, click on the + sign and select the Select action from the list.

Adding a Select Action to a data flow

Figure 24 – Adding a Select Action to a data flow

Simply leave the default values, which will skip the duplicate columns (a rough code equivalent of this step is sketched after Figure 25).

Configuring the Select Action

Figure 25 – Configuring the Select Action
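For readers who prefer to reason in code, the following PySpark sketch roughly mirrors what the Join and Select steps do: an inner join on pUser followed by dropping one copy of each column that exists in both inputs. This is illustrative only; the folder paths are placeholders and the duplicate column names listed are examples drawn from the two data sets, not an exhaustive list.

# Rough PySpark equivalent of the Join + Select data flow steps (illustrative only).
messages_df = spark.read.json("<messages folder location>")
users_df = spark.read.json("<users folder location>")

# Inner join on the shared pUser column.
joined = messages_df.join(users_df, messages_df["puser"] == users_df["puser"], "inner")

# Drop the user-side copy of columns that appear in both data sets (e.g., pagerow).
for duplicated in ["puser", "ptenant", "pagerow", "datarow", "rowInformation"]:
    if duplicated in users_df.columns:
        joined = joined.drop(users_df[duplicated])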

The last step in our data flow is to determine where to store the combined data. In our case, we will be selecting a workspace database as the data sink. In the workbench, beside the newly added Select action, click on the + sign and select Sink from the list.

Adding a Sink Action to a data flow

Figure 26 – Adding a Sink Action to a data flow

In the editor panel at the bottom, select Workspace DB from the Sink type list, select the default database from the database list, and type in Results as the name of the table. Note that this will create a new table named Results in the default database and store the combined data sets in it.

Configuring the Sink Action for our data flow

Figure 27 – Configuring the Sink Action for our data flow

This completes the steps required to combine our data sets using a Data flow activity. You can now publish your pipeline and execute it. Once it successfully completes, you can follow the steps from the previous section (Option 1) to check the merged data using the Select TOP 100 rows query.

Conclusion

In this article, we have shown you two ways of combining data sets extracted from Microsoft Graph Data Connect using Azure Synapse Analytics. While developers may tend to prefer the Spark Notebook approach (Option 1) over the low-code approach described in Option 2, note that both approaches produce the same result. You should choose the approach you and your organization are most familiar with. The data we have exported and combined could then be connected to a Power BI dashboard using its Azure Synapse Analytics connector to generate valuable insights and analysis for your organization.