Snowflake streams and tasks provide a mechanism for automatically updating one table shortly after data is loaded into another. A Snowflake stream tracks all data manipulation language (DML) changes to a table, and by calling a stored procedure from a Snowflake task, we can update one table based on the DML changes to another.
Something I commonly see in a production environment is views created from another view, simply to move the dataset to a different database or schema:
create or replace view destination_schema.my_2nd_view
as
select
...
from source_schema.my_first_view
Nested views can hurt performance, because views are computed at runtime. If you simply need to move a dataset from one location to another within Snowflake (for example, from staging to prod, or to the database you expose to end users), a streams and tasks workflow is a much better alternative and lets you create a table rather than a view.
To start, let’s set a few session variables and initialize our session:
/* Set session variables */
set role_name = 'sysadmin';
set wh = 'my_wh';
set db = 'my_db';
set sch = 'my_schema';
set dest_table = 'my_table';
set stream_name = 'my_stream';
set source_table = 'source_db.source_schema.source_table'; --Use the fully qualified name here if your source table is in a different db/schema
set proc_name = 'my_procedure';
set task_name = 'push_my_table';
/* Initialize Environment */
use role identifier($role_name);
use warehouse identifier($wh);
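Before moving on, it can help to confirm that the variables resolved as expected. The following is a minimal sketch that uses only the variable names defined above. Session variables live only for the current session, so this needs to run in the same worksheet or connection:

```sql
/* List every variable defined in the current session */
show variables;

/* Read individual variables back as ordinary values */
select $db as db_name,
       $sch as schema_name,
       $source_table as source_table;
```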
Next, let’s create our database, table, and stream objects. To reference a variable when creating a Snowflake object, wrap it in identifier(), as in identifier($my_variable):
/* Create database, table and stream objects */
create database if not exists identifier($db);
use database identifier($db); --switch first so the schema is created in this database
create schema if not exists identifier($sch);
use schema identifier($sch);
create table if not exists identifier($dest_table)
comment='JSON data from API, streaming from the source database'
clone identifier($source_table);
create stream if not exists identifier($stream_name)
on table identifier($source_table)
comment = 'CDC stream from source table to prod table';
/* quick diagnostic check */
show streams;
select * from identifier($stream_name);
A Snowflake stream is a schema level object that contains the same fields as your source table, but also includes the following fields:
- METADATA$ACTION: Specifies whether the action is an INSERT or a DELETE
- METADATA$ISUPDATE: Specifies whether the INSERT or DELETE is part of an UPDATE
- METADATA$ROW_ID: A unique row ID. This ID can be used to track changes to specific rows over time
So, in our basic example above, here is how the results of select * from identifier($stream_name); would look:
NAME | AMOUNT | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID |
---|---|---|---|---|
John | 5 | INSERT | FALSE | 3d5ttsht47wssy8bv |
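To make the metadata columns more concrete, here is a rough sketch of how they can be combined to separate the different kinds of change. It assumes the stream created above and that you are still in the same session, so that $stream_name resolves. Querying a stream with a plain select does not consume it. The offset only advances when the stream is used in a DML statement that commits.

```sql
/* Brand-new rows in the source table */
select *
from identifier($stream_name)
where metadata$action = 'INSERT'
  and metadata$isupdate = false;

/* New values of updated rows (the INSERT half of each update pair) */
select *
from identifier($stream_name)
where metadata$action = 'INSERT'
  and metadata$isupdate = true;

/* Rows deleted from the source table */
select *
from identifier($stream_name)
where metadata$action = 'DELETE'
  and metadata$isupdate = false;
```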
Next, let’s create a stored procedure to house our code. In this case, we’ll use a MERGE statement, which incrementally loads only new or changed records from our source table into our destination table. Note that session variables can be used inside a stored procedure if the procedure is executed as caller. We don’t want to do that here, however, because the task that later calls the procedure runs in its own session, where the session variables won’t be set. That is why the object names are hard-coded below. A unique identifier at the row level is required here, shown below as json_data:ID:
/*
Create stored procedure -
tailor this to your needs
*/
create or replace procedure identifier($proc_name)()
returns varchar
language sql
execute as owner
as
$$
begin
merge into my_table DEST using (
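select * from my_stream
where metadata$action = 'INSERT' --skip the DELETE half of each update pair so it cannot win the row_number() pick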
qualify row_number() over (
partition by json_data:ID order by insert_date) = 1
) SOURCE
on DEST.json_data:ID = SOURCE.json_data:ID
when matched and metadata$action = 'INSERT' then
update set DEST.json_data = SOURCE.json_data,
DEST.insert_date = current_timestamp()
when not matched and metadata$action = 'INSERT' then
insert (DEST.json_data, DEST.insert_date)
values(SOURCE.json_data, current_timestamp());
return 'CDC records successfully inserted';
end;
$$;
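Before scheduling anything, it may be worth running the procedure once by hand. This is a small sketch, assuming your current database and schema are the ones where my_stream and my_procedure were created:

```sql
/* Returns TRUE when the stream holds unconsumed change records */
select system$stream_has_data('my_stream');

/* Run the merge manually; on success it returns the message from the procedure */
call my_procedure();

/* Once the merge has committed, the stream should be empty again */
select count(*) from my_stream;
```

If the first query returns FALSE, there is nothing for the merge to pick up yet, which is the same check the task will use to decide whether to run.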
Now, let’s create a task, which calls our stored procedure once a minute:
/* Create task */
create or replace task identifier($task_name)
warehouse = my_wh --matches the wh session variable set earlier
schedule = '1 minute'
comment = 'Change data capture task that pulls over new data once a minute'
when system$stream_has_data('my_stream')
as
call my_procedure();
Tasks are created in a suspended state by default, so we need to activate them. To do that, use the ACCOUNTADMIN role to GRANT the EXECUTE TASK privilege on the account to your role of choice, then ALTER the task to resume:
/* grant execute task privileges to role sysadmin */
use role accountadmin;
grant execute task
on account
to role identifier($role_name);
/* tasks are created in a suspended state by default, you must 'resume' them to schedule them */
use role identifier($role_name);
alter task identifier($task_name) resume;
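If you later want to change the task, for example to run it less often, suspend it first and resume it afterwards. The sketch below uses the task name from this walkthrough; the hourly cron expression is only an illustrative assumption, not a recommendation:

```sql
/* Suspend the task before changing it */
alter task push_my_table suspend;

/* Example only: run at the top of every hour (UTC) instead of once a minute */
alter task push_my_table set schedule = 'USING CRON 0 * * * * UTC';

/* Resume so the new schedule takes effect */
alter task push_my_table resume;
```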
Finally, we can confirm everything is working:
select * from identifier($dest_table);
show tasks;
select *
from table(information_schema.task_history())
order by scheduled_time;
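On a busy account the unfiltered task history can be noisy. As a rough sketch, the query below narrows it to this task and the last hour. The one-hour window and the limit of 20 rows are arbitrary choices for illustration:

```sql
/* Recent runs of this task only, newest first */
select name,
       state,
       scheduled_time,
       completed_time,
       error_message
from table(information_schema.task_history(
       task_name => 'PUSH_MY_TABLE',
       scheduled_time_range_start => dateadd('hour', -1, current_timestamp()),
       result_limit => 20))
order by scheduled_time desc;
```

Runs where the stream had no data are recorded with a state of SKIPPED, so a long list of skipped runs usually just means nothing changed in the source table.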
You will now have a table in a different database or schema (modify the code to suit your needs, of course) that is synced once a minute with a source table of your choosing.
Here is the full code, also located on my GitHub:
/* Set session variables */
set role_name = 'sysadmin';
set wh = 'my_wh';
set db = 'my_db';
set sch = 'my_schema';
set dest_table = 'my_table';
set stream_name = 'my_stream';
set source_table = 'source_db.source_schema.source_table'; --Use the fully qualified name here if your source table is in a different db/schema
set proc_name = 'my_procedure';
set task_name = 'push_my_table';
/* Initialize Environment */
use role identifier($role_name);
use warehouse identifier($wh);
/* Create database, table and stream objects */
create database if not exists identifier($db);
use database identifier($db); --switch first so the schema is created in this database
create schema if not exists identifier($sch);
use schema identifier($sch);
create table if not exists identifier($dest_table)
comment='JSON data from API, streaming from the source database'
clone identifier($source_table);
create stream if not exists identifier($stream_name)
on table identifier($source_table)
comment = 'CDC stream from source table to prod table';
/* quick diagnostic check */
show streams;
select * from identifier($stream_name);
/*
Create stored procedure -
tailor this to your needs
*/
create or replace procedure identifier($proc_name)()
returns varchar
language sql
execute as owner
as
$$
begin
merge into my_table DEST using (
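select * from my_stream
where metadata$action = 'INSERT' --skip the DELETE half of each update pair so it cannot win the row_number() pick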
qualify row_number() over (
partition by json_data:ID order by insert_date) = 1
) SOURCE
on DEST.json_data:ID = SOURCE.json_data:ID
when matched and metadata$action = 'INSERT' then
update set DEST.json_data = SOURCE.json_data,
DEST.insert_date = current_timestamp()
when not matched and metadata$action = 'INSERT' then
insert (DEST.json_data, DEST.insert_date)
values(SOURCE.json_data, current_timestamp());
return 'CDC records successfully inserted';
end;
$$;
/* Create task */
create or replace task identifier($task_name)
warehouse = my_wh --matches the wh session variable set earlier
schedule = '1 minute'
comment = 'Change data capture task that pulls over new data once a minute'
when system$stream_has_data('my_stream')
as
call my_procedure();
/* grant execute task privileges to role sysadmin */
use role accountadmin;
grant execute task
on account
to role identifier($role_name);
/* tasks are created in a suspended state by default, you must 'resume' them to schedule them */
use role identifier($role_name);
alter task identifier($task_name) resume;
select * from identifier($dest_table);
show tasks;
select *
from table(information_schema.task_history())
order by scheduled_time;