Snowflake Driven Events

In many data pipelines, we want to react to data being available to the user. Typically, we can do this in a pub/sub model, but there are times when we want data to be extracted into an eventing system from a data store. In something traditional like a MS SQL Server database, we'd use SSIS for this, but with Snowflake, because of the power of unloading data, we can do this a different way.

Architecture Overview

This is roughly what we'll be doing.

/articleimages/snowflake-driven-events.png

Here, we have a stream created against our table. There are a number of ways you can set up a stream, but at a top level, it records changes to your table, which you can then query. The stream advances when you perform a write command.

We then have a task, set to execute every minute, that reads from the stream and calls a procedure. In this procedure, we read from the stream, getting the latest changes to the table, transform it into our event structure, and unload this data into an S3 bucket.

From here, a lambda function is called, triggered by writes to the S3, which extracts the data from the newly written JSON file, and raises the events into a topic. Depending on your needs here, you could do a batch of events, or split this into a single event per record.

This might seem like a Rube Goldberg machine, but from my experience, this set up is incredibly resilient, easily handling ~20,000 messages every five minutes, and actually, all we're really doing here is using Snowflake's data unloading feature to load data into an external file, and then AWS to take it the rest of the way.

Now we've got an idea of what we're trying to achieve, let's look into the individual components

Snowflake Streams

The stream object in Snowflake records DML changes made to a table in a process referred to as Change Data Capture. Streams in Snowflake can be used to track the following:

  • Standard tables (including shared)
  • Views
  • Directory Tables
  • External Tables

Streams in Snowflake work by taking an initial snapshot of the table at the time of stream creation and adding a pair of hidden tracking columns to the table, and then relying on the streams offset information, and the tracking columns to build a picture of changes to the table. Tables in Snowflake have version histories that are added to when a DML operation occurs on a row. The stream will store the version it was at, and the new version (the offset) for each row that has changed.

Creating a stream is simple:

CREATE OR REPLACE STREAM stream_facts ON TABLE fact;

Now to query the stream we can treat it like a table:

SELECT * FROM stream_facts;

Finally you can advance the stream (effectively removing the offset) by performing a DML operation on the stream. Note: querying the stream alone does not advance the stream. Snowflake streams use repeatable-read isolation. When performing a DML operation on a stream in a transaction, the contents of that stream are locked until the transaction is committed. In this example the stream will be cleared up to the point the transaction starts:

INSERT INTO temp table myStreamData
       SELECT * FROM fact_stream;

Another factor to consider with streams is staleness.

A stream becomes stale when its offset is longer than the underlying table's retention period, therefore the ETL process should ensure that the stream is consumed regularly, and notify if there are problems in good time. 14 days is the minimum and if a tables retention period is shorter than 14 days the stream will extend it to match up to the minimum regardless of edition.

To start tracking a stale stream again, simply recreate it

Procedures and Tasks

Next we need to create our task. Rather than including all of the code in the task, it's better to create a procedure that gets called from the task. This way, we can change the procedure, without having to recreate the task which will leave it suspended. The amount of times I've deployed a task and forgetten to resume it is high enough to justify doing this!

create or replace task task_export_data
	warehouse=my_warehouse
	schedule='1 minute'
	when system$stream_has_data('stream_facts')
	as begin
        call data_unload();
    end;

Here, we're creating a task that runs every minute. The when, allows you to skip an execution, in this case when the stream doesn't have any data. Doing it this way, rather than in the body of the task doesn't use any warehouse credits, instead using the application layer of Snowflake to query the metadata of the stream. When the stream does have data, it calls a procedure called data_unload().

Next, let's create the procedure:

CREATE OR REPLACE PROCEDURE DATA_UNLOAD()
RETURNS VARCHAR(16777216)
LANGUAGE SQL
AS
BEGIN
COPY INTO '@my_stage/egress_folder/'
  FROM (
    SELECT object_construct(*)
    FROM fact_stream
  )
  max_file_size = 32000
  single = false
  file_format = format_json_egress
  INCLUDE_QUERY_ID = true;
  RETURN 'SUCCESS';
END
';

This procedure simply uses the COPY INTO command to load the data into a stage. For me, that's normally an S3 bucket, but you'll want to read up on stages if you're not sure what this means. We're also specifying some extra options to help out with some debugging scenarios. Firstly, we're setting the max file size to something sensible. The default is 16MB, but I found that the next part had difficulty extracting that much data, so I reduced it. Single indicates that we don't want to enforce the data to be extracted into a single file. There's a file format specified there as well, that I'm specifying as some sort of JSON format, as it's generally easier to work with in code, but there are a few options here if you're interested. Finally we've got the INCLUDE_QUERY_ID parameter, that adds the ID of the query that produced the files into the file name. This can be useful for debugging.

Triggering the Lambda and Extracting the Data

Now we have some files getting populated with our data into an s3 bucket we need to trigger the extraction of this data into an SNS topic, for the rest of our system to consume. First up we'll create a queue that will be populated with a message whenever there is another file to process. Your CloudFormation file might look something like this:

AWSTemplateFormatVersion: '2010-09-09'
Resources:
  # S3 Bucket
  SnowflakeStageBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: Snowflake-stage

  # SQS Queue
  ExtractFactsQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: extract-facts

  # S3 Bucket Policy to allow S3 to send messages to the SQS queue
  SnowflakeStageBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref SnowflakeStageBucket
      PolicyDocument:
        Statement:
          - Effect: Allow
            Principal: "*"
            Action: "s3:PutObject"
            Resource: !Sub "arn:aws:s3:::${SnowflakeStageBucket}/*"
          - Effect: Allow
            Principal:
              Service: s3.amazonaws.com
            Action: "sqs:SendMessage"
            Resource: !GetAtt ExtractFactsQueue.Arn
            Condition:
              ArnLike:
                aws:SourceArn: !GetAtt SnowflakeStageBucket.Arn

  # S3 Bucket Notification Configuration
  SnowflakeStageBucketNotificationConfiguration:
    Type: AWS::S3::BucketNotification
    Properties:
      Bucket: !Ref SnowflakeStageBucket
      NotificationConfiguration:
        QueueConfigurations:
          - Event: s3:ObjectCreated:*
            Queue: !GetAtt ExtractFactsQueue.Arn

This is creating the resources needed, creating a policy to allow s3 to send messages to our queue, and then finally creaing a BucketNoticiation, with the queue as the target.

The next piece of the puzzle is a lambda to process the data. Remember, at this point we only have a file full of lines of JSON, so we need to process these files. Set this up however you want, using CloudFormation or SAM, or even directly in the console. The main thing here is that you want the lambda triggered by the queue.

The message you're being sent will be an array of records that have landed in the bucket.

const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const sns = new AWS.SNS();
const readline = require('readline');

const SNS_TOPIC_ARN = process.env.SNS_TOPIC_ARN;

exports.handler = async (event) => {
    for (const record of event.Records) {
        const bucket = record.s3.bucket.name;
        const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));

        try {
            const params = {
                Bucket: bucket,
                Key: key,
            };

            const s3Stream = s3.getObject(params).createReadStream();
            const rl = readline.createInterface({
                input: s3Stream,
                crlfDelay: Infinity
            });

            for await (const line of rl) {
                const jsonObject = JSON.parse(line);
                const message = JSON.stringify(jsonObject);
                const snsParams = {
                    Message: message,
                    TopicArn: SNS_TOPIC_ARN,
                };

                await sns.publish(snsParams).promise();
                console.log(`Message sent to SNS: ${message}`);
            }

            console.log(`Successfully processed ${key} from ${bucket}`);
        } catch (error) {
            console.error(`Error processing ${key} from ${bucket}:`, error);
        }
    }
};

Here we're looping through each file record in the message, then reading the file from S3. For each line, we then check it's valid JSON with a parse, then add it as a payload to our topic in the message parameter. We're then using the SNS topic to publish the message to the topic.

There are some additional things to consider here, that the code has made some assumptions about. A key consideration here is how should we handle errors? In this code, we may end up extracting part of the file, coming across an error, and then failing. This would lead to us emitting the same messages multiple times which might not be desirable. An alternative here could be to bulk send the messages, so this becomes a more ACIDic transaction. The downside of this approach is that a single record in the file might lead to the whole file being unprocessable.

What's Next?

Now that the events are in your system, you can process them however you need! If you're using lambdas, it's easy to subscribe directly to the topic, but you'll probably want a queue in between to do some error handling and to be able to recover nicely from any back pressure. Because they're now events in a topic, each subscriber has full control of the mechanism for handling, so you can safely pass this off to whoever needs to process them, and your solution will be unaffected by any of their choices. The beauty of the publish subscriber model in action!

Conclusion

This pattern works specifically when you have already set up the process of getting data into the table you want to drive the events, and there's no way of getting this data out earlier and ensuring data has landed in the table before emitting the event. But it has some downsides:

  • It's fundamentally a batch process, so expect spikey event emission.
  • It will lead to a more coupled system.
  • It's not particularly easy to understand without some documentation to go along with it.

A more traditional approach would be something like this:

/articleimages/snowflake-driven-events.png

Here, we're already working with an event driven system, rather than a batched system, and we can simply transform the events as they're coming in with a Snowflake connector (e.g. the firehose Snowflake connector), whilst other services are subscribed to the same event. As long as you're ok with some eventual consistency, this tends to be a more extensible, less coupled solution.

But, if you really need a guarantee that your events are triggered by data landing in your Snowflake table and you've already got a data ingress system set up, then this could get you out of a bind.

Comments

contact

Adam Drew adamjustdrewit