Store streaming data to Azure Synapse Pool (Part 3)
In my previous posts, I showed you how to stream real-time tweets to Power BI using Azure Databricks, Azure Stream Analytics and Power BI, and then how to store those tweets for long-term storage in your data lake.
But how do we bring that data to our data warehouse?
First of all: why would you want that? The reason might be that you want to analyze this data on a permanent basis. If you have data scientists who once in a while like to do ad hoc querying using Databricks, then you might be fine leaving it in your data lake (Azure Data Lake Storage Gen2). But when you need to relate this data to your data warehouse data, you might want to store it relationally as well, so you can load it into Power BI or Analysis Services.
"Hang on..." I hear you say. "Didn't we already load this data to Power BI the last time?" Yes, but back then we streamed it to Power BI, and streaming data is gone after a while unless you fetch it and store it for keeps 🙂
How to get started
There are several ways to do it. The easiest one might be to give your Azure Stream Analytics job a second output, so that it not only sends the tweets to Power BI for live streaming but also stores them directly in your Synapse database. How to do that can be read here. I found it a bit easier to do it in Databricks directly, because that gives me more control over which part of the tweet I store. So in this blog, I am going to show the Databricks way.
Azure key vault
To connect to the Synapse database and the data lake, we want an Azure Key Vault, unless you are fine with storing passwords in the code itself. The key vault makes it possible to store those keys safely.
If you do not already have a key vault, create a new resource, choose "Key Vault", pick a subscription and location, and review whether the other options suit you as they are.
Once the vault is created, go to it and click Secrets in the menu on the left side.
Click + Generate/Import to create a new secret. Now, what value do we need to store so that we can connect to the data lake? It is the access key, which you can find by navigating to your storage account and clicking Access keys; it is the value of key1. Copy this value.
Back in the other window, while creating the new secret: set the upload option to Manual, give the secret a name like "accesstodatalakekey" and paste the key into the value field. Click Create, and we have our first secret. If you already have a Synapse server in place, create another secret for the password of the user you want to use to connect to the data warehouse.
Next: let's make sure Databricks can actually use these secrets. You do this in the Databricks UI. Navigate to
https://<databricks-instance>#secrets/createScope
Note that this URL is case sensitive. Give the scope a name and choose whether only the creator (you, in this case) or all users can use it. Next, fill in the DNS name of your key vault (the vault URI), which is shown when you navigate to your key vault in the Azure portal, and the resource ID, which can be found on the Properties page of the key vault. Once you click Create, you are good to go.
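To check that the scope really exposes the secrets you created, a small helper like the one below may be handy. It is only a sketch: `missing_secrets` is a name I made up, and it assumes the object you pass in behaves like `dbutils.secrets`, whose `list(scope)` call returns entries with a `key` attribute. The stand-in object exists only so the example also runs outside Databricks.

```python
from types import SimpleNamespace


def missing_secrets(secrets, scope, required_keys):
    """Return the required keys that are not present in the secret scope.

    `secrets` is expected to behave like `dbutils.secrets` (assumption).
    """
    available = {meta.key for meta in secrets.list(scope)}
    return [key for key in required_keys if key not in available]


# Stand-in for dbutils.secrets: a scope that only holds the data lake key
fake_secrets = SimpleNamespace(
    list=lambda scope: [SimpleNamespace(key="accesstodatalakekey")]
)

still_missing = missing_secrets(
    fake_secrets, "demo-scope", ["accesstodatalakekey", "synapseww"]
)
print(still_missing)
```

In a notebook you would pass `dbutils.secrets` and your own scope name instead of the stand-in. An empty list means both secrets are reachable.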
Setting up Synapse
If you already have a Synapse database, you can skip this part.
To create a Synapse database, create a new resource of type Azure Synapse Analytics (formerly SQL DW). This creates a logical server that can contain several databases.
You need to create a SQL server and a SQL pool, which will be your database. You also create a SQL user to log in to the database. Remember the password, because we need to enter it in the Azure Key Vault as a new secret. Once you are done, click Create and you will have a server.
You can connect to your Synapse server with SQL Server Management Studio. The server name is <yoursynapsename>.database.windows.net. Choose SQL Server authentication and use the credentials of the user you created while setting up the server.
Getting the data to Synapse
So we have set up the key vault and connected it to Databricks. How do we load the data into Synapse? As so often, I find that Azure Databricks is the answer. It makes me wonder whether backend BI developers like myself must learn how to code properly instead of relying on Data Factory, SSIS and other graphical ETL tools, especially after reading https://medium.com/free-code-camp/the-rise-of-the-data-engineer-91be18f1e603. With the setup done, let's get started with a notebook. I assume you have completed the last two posts and know how to create a notebook in Databricks.
We start out with the following block in Python:
# Read the data lake access key from the Key Vault-backed secret scope
sa_key = dbutils.secrets.get(scope="<yoursecretscope>", key="<datalakesecret>")

# Let Spark authenticate against the data lake with that key
spark.conf.set("fs.azure.account.key.<yourdatalake>.dfs.core.windows.net", sa_key)

# Load all captured Avro files; each * matches one folder level below the base path
twitter_df_1 = spark.read.format("avro").load(
    "abfss://twitter@<yourdatalake>.dfs.core.windows.net/<pathtoyouravrofiles>/*/*/*/*/*/*/*.avro"
)

# Keep only the enqueue timestamp and the raw message body
twitter_df_2 = twitter_df_1.select("EnqueuedTimeUtc", "Body")
In the previous post, I named the blob container that the event hub stores tweets in 'twitter'. If you named it differently, change abfss://twitter to abfss://<yourblobname>.
Now replace <yoursecretscope> with the name of your secret scope, <datalakesecret> with the name of the data lake key in the Azure Key Vault, and <yourdatalake> with the name of your Data Lake Storage Gen2 account. Also make sure the path to your Avro files is right. If you followed the last blog post, your event hub is storing Avro files in your data lake. If you are unsure where it does that, check with Azure Storage Explorer.
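If you want to avoid typos in that long path, you could build it with a small helper. This is a sketch under assumptions: `capture_glob` is a name I made up, `mydatalake` and `mynamespace/myeventhub` are placeholder values, and the six wildcard folder levels assume the files sit in a partition/year/month/day/hour/minute layout below the base path, which is what the six `*` folders in the path above suggest.

```python
def capture_glob(container, account, base_path, folder_levels=6):
    """Build an abfss:// glob that matches every Avro file below base_path."""
    wildcards = "/".join(["*"] * folder_levels)
    return (
        f"abfss://{container}@{account}.dfs.core.windows.net/"
        f"{base_path.strip('/')}/{wildcards}/*.avro"
    )


# Placeholder names; use your own container, storage account and base path
path = capture_glob("twitter", "mydatalake", "mynamespace/myeventhub")
print(path)
```

The resulting string can be passed straight to `spark.read.format("avro").load(...)`. If your folders are nested differently, adjust `folder_levels` after checking the layout in Azure Storage Explorer.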
Next up we have the following block:
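from pyspark.sql.functions import udf

# Solve some encoding issues with the binary string
def bin_to_utf(binstring):
    # Cut out the part that holds the tweet text and decode it as UTF-8
    return binstring[110:290].decode("utf-8")

# Wrap the function so Spark can apply it to a column (returns a string)
convertedudf = udf(bin_to_utf)

twitter_df_3 = twitter_df_2.withColumn("Tweet", convertedudf(twitter_df_2.Body)).select("EnqueuedTimeUtc", "Tweet")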
I added this step to make sure the Tweet column starts with the tweet text. The Body of each Avro record contains a lot of extra information before the tweet text begins, so I start reading at position 110 and stop at position 290. You can tweak these offsets to fit your data. The function also decodes the bytes as UTF-8, so the column ends up as a regular string.
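To see what the slice does without starting a cluster, you can try it on a made-up payload in plain Python. The sketch below is not the notebook code: the `start`, `end` and `errors` parameters are my additions, and the sample bytes are invented. It also shows one thing to watch for: a fixed cut-off can land in the middle of a multi-byte character, in which case a strict decode fails, while `errors="ignore"` drops the incomplete character.

```python
def bin_to_utf(binstring, start=110, end=290, errors="strict"):
    """Decode the slice binstring[start:end] as UTF-8."""
    return bytes(binstring[start:end]).decode("utf-8", errors=errors)


# Invented payload: 110 bytes of header followed by the tweet text
header = b"#" * 110
body = bytearray(header + "Hello Synapse, caf\u00e9!".encode("utf-8"))

full_text = bin_to_utf(body)

# Stopping 19 bytes into the text splits the two-byte character at the end of "cafe"
try:
    bin_to_utf(body, end=110 + 19)
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True

lenient_text = bin_to_utf(body, end=110 + 19, errors="ignore")
print(full_text, strict_failed, lenient_text)
```

Whether you prefer a failing job or a silently shortened tweet is a judgment call. If your tweets contain emoji or accented characters, the lenient variant may be the safer choice inside the UDF.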
new_table_name = "dbo.tweets"
servername = "<yoursynapseserver>"
dwh_user = "<yourusername>@<yoursynapseserver>"
dwh_pwd = dbutils.secrets.get(scope = "<yoursecretscope>", key = "synapseww")
sql_dw_connection_string = (
    "jdbc:sqlserver://<yoursynapseserver>.database.windows.net:1433;"
    "database=<yoursqlpool>;encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=50;"
)
What I do here is set the connection info for your Synapse database. Replace synapseww with the name of the secret that contains the password of the SQL user you created for Synapse. <yoursqlpool> must be the name of the SQL pool you created.
The last step is to write the DataFrame to Synapse, which creates or updates a table named dbo.tweets. Run this, and your data warehouse contains the tweet information 🙂 From here on, we can further integrate it with your other facts and dimensions.
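A write step along those lines might look like the sketch below. It assumes the Azure Synapse connector that ships with Databricks (format `com.databricks.spark.sqldw`) and a staging folder in the data lake for `tempDir`. The helper names `synapse_write_options` and `write_tweets`, the staging path and the sample values are my own placeholders, so treat this as one possible way to do it rather than the only one.

```python
def synapse_write_options(jdbc_url, user, password, table, temp_dir):
    """Collect the options for the Azure Synapse connector in one dict."""
    return {
        # Credentials are appended to the JDBC URL, which must end with ';'
        "url": f"{jdbc_url}user={user};password={password};",
        # Target table in the SQL pool
        "dbTable": table,
        # Staging folder in the data lake that the connector loads from
        "tempDir": temp_dir,
        # Reuse the storage key that was set on the Spark session earlier
        "forwardSparkAzureStorageCredentials": "true",
    }


def write_tweets(df, options, mode="overwrite"):
    """Write a Spark DataFrame to Synapse; needs a Databricks cluster to run."""
    df.write.format("com.databricks.spark.sqldw").options(**options).mode(mode).save()


# Placeholder values for illustration only
options = synapse_write_options(
    jdbc_url="jdbc:sqlserver://myserver.database.windows.net:1433;database=mypool;",
    user="loader@myserver",
    password="not-a-real-password",
    table="dbo.tweets",
    temp_dir="abfss://twitter@mydatalake.dfs.core.windows.net/tmp/synapse",
)
print(sorted(options))
```

In the notebook you would call `write_tweets(twitter_df_3, synapse_write_options(sql_dw_connection_string, dwh_user, dwh_pwd, new_table_name, "<yourtempdir>"))`. With mode "overwrite" the table is recreated on every run; "append" adds the new rows to the existing table instead.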
In the next part (part 4), I will integrate the Twitter data with fictional sales data, so we can set up a scenario where a company can tell whether tweets about it are related to its sales.