A step-by-step guide to continuous data loading in Snowflake with Snowpipe and Azure Storage
Snowpipe enables loading data from files as soon as they’re available. This means you can load data from files in micro-batches, making it available to users within minutes, rather than manually executing COPY statements on a schedule to load larger batches.
Event-driven, automated data ingestion into a scalable cloud data platform is one of the many things we as data engineers love, and so do our clients.
After having set this up for a few Snowflake customers now, I thought it might be useful to create a step-by-step guide with lots of helpful screenshots to assist you in setting up this awesome functionality.
While the Snowflake documentation is very good, I like to have a few pretty pictures to follow along with. I hope you do too.
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-azure.html
The following are the steps to create Snowpipe.
Create a Storage Account
If you need a new Resource Group in Azure, you can create one from the Azure Portal.
In the newly created Resource Group (anil-test-resource-group), click New and select Storage account, as per the image below.
Storage Container
Go into the storage account that you have just created, click on Containers, and create a container with an appropriate name.
Storage Queues
When events are raised (in this case, when a new file arrives), they are written to a Storage Queue. The messages raised by the Event Grid subscription we create shortly contain pointers to the files in the Blob Storage Container.
In the same Storage Account where you created the Blob Container, create a Storage Queue.
Events
Whenever a file is loaded into the container, an event creates a message in the queue. From the Snowflake documentation:
“Your client application calls a public REST endpoint with the name of a pipe object and a list of data filenames. If new data files matching the list are discovered in the stage referenced by the pipe object, they are queued for loading. Snowflake-provided compute resources load data from the queue into a Snowflake table based on parameters defined in the pipe.”
In the same Storage Account where you created the Blob Container and the Storage Queue, create an event subscription as follows.
Click on Add Event Subscription and select the following values:
Event Schema--> Event Grid Schema
Filter to Event Types--> Blob created
Endpoint Type-->Storage Queues
For the Endpoint, select the storage account and the storage queue that you created earlier.
Notification Integration
The following Snowflake script creates a Notification Integration:
create notification integration SNOWPIPE_TEST_EVENT enabled = true type = queue notification_provider = azure_storage_queue azure_storage_queue_primary_uri = '<<storage queue uri>>' azure_tenant_id = '<<tenant id>>';
azure_storage_queue_primary_uri is the URL of the storage queue that you have just created.
Go to the storage account and then click on Queues. I have created a queue named "test-snowflake-queue", and its URL is https://anillteststorageaccount.queue.core.windows.net/test-snowflake-queue
For azure_tenant_id, go to the Azure portal --> open the menu --> click Azure Active Directory and copy the Directory ID. Then run the command.
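For example, using the queue URL above, the statement would look something like this (the tenant ID below is just a placeholder; substitute your own Directory ID):
create notification integration SNOWPIPE_TEST_EVENT
  enabled = true
  type = queue
  notification_provider = azure_storage_queue
  azure_storage_queue_primary_uri = 'https://anillteststorageaccount.queue.core.windows.net/test-snowflake-queue'
  azure_tenant_id = '00000000-0000-0000-0000-000000000000'; -- placeholder, use your own Directory ID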
Snowflake permissions on Azure
Once the Notification Integration is created, run the following command and you will see the AZURE_CONSENT_URL. Copy the URL, paste it into the browser, and click on the Accept button.
desc notification integration SNOWPIPE_TEST_EVENT;
Once you have granted consent, the Snowflake application can be seen under Enterprise applications.
Go to Azure Active Directory--> Enterprise applications
Add Role Assignment to Azure storage account
Go to the storage account --> Access Control (IAM) --> Add a role assignment.
Select the following values:
Role--> Storage Queue Data Contributor
Assign access to --> Azure AD user, group, or service principal
Then type "snowflake" in the Select field. You will see the Snowflake account; select it and hit the Save button.
Create Stage
To create a stage on Azure Blob Storage, the following command is needed:
create or replace stage my_azure_stage url='azure://xxxxxxx.blob.core.windows.net/mycontainer/load/files' credentials=(azure_sas_token='?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
To get the azure_sas_token, you have two options: the Azure portal or Microsoft Azure Storage Explorer.
I am using Azure Storage Explorer. Right-click on the blob container and click on "Get Shared Access Signature", then select the expiry time and permissions.
Create database command
create database SNOWPIPE_TEST
Now run the stage creation command shown above in Snowflake to create the stage.
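For example, a stage pointing at the storage account used in this walkthrough could look something like this (the container name and SAS token below are placeholders; use your own values):
create or replace stage "SNOWPIPE_TEST"."PUBLIC"."TEST_STAGE"
  url = 'azure://anillteststorageaccount.blob.core.windows.net/test-snowflake-container' -- placeholder container name
  credentials = (azure_sas_token = '?sv=xxxxxxxxxx'); -- paste your own SAS token here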
To view the files in the stage, run the following command:
LIST @"SNOWPIPE_TEST"."PUBLIC"."TEST_STAGE"
Create Table to load JSON Data
CREATE OR REPLACE TABLE "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA" ( "API_DATA" VARIANT );
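Each JSON document loaded will land in the API_DATA VARIANT column, so you can later pull individual attributes out with path notation, for example (the field name below is hypothetical):
select API_DATA:field_name::string as field_name
from "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA";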
To create the Snowpipe, run the following command:
CREATE OR REPLACE pipe "SNOWPIPE_TEST"."PUBLIC"."TEST_SNOWPIPE" auto_ingest = true integration = 'SNOWPIPE_TEST_EVENT' as copy into "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA" from @"SNOWPIPE_TEST"."PUBLIC"."TEST_STAGE" file_format = (type = 'JSON');
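Once the pipe exists, you can check that it is running and listening for notifications with SYSTEM$PIPE_STATUS, for example:
select system$pipe_status('SNOWPIPE_TEST.PUBLIC.TEST_SNOWPIPE');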
Run the following command to check whether the Snowpipe is working:
SELECT * FROM "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA"
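If the table stays empty after a new file arrives in the container, the COPY_HISTORY table function is a handy way to see whether (and why) files were or were not loaded, for example:
use database SNOWPIPE_TEST;
select *
from table(information_schema.copy_history(
  table_name => 'CROPTRACKER_API_DATA',
  start_time => dateadd(hours, -1, current_timestamp())));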
If you log in as ACCOUNTADMIN, you can see the Snowpipe usage and billing.
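For example, as ACCOUNTADMIN you can query the PIPE_USAGE_HISTORY account usage view to see Snowpipe credit consumption (note that account usage views can lag behind by an hour or two):
select pipe_name,
       sum(credits_used) as credits_used,
       sum(bytes_inserted) as bytes_inserted,
       sum(files_inserted) as files_inserted
from snowflake.account_usage.pipe_usage_history
where start_time >= dateadd(day, -7, current_timestamp())
group by pipe_name;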