
Snowflake

This connector materializes Flow collections into tables in a Snowflake database. It supports both standard and delta updates. Snowpipe is additionally available for delta updates bindings.

The connector first uploads data changes to a Snowflake table stage. From there, it transactionally applies the changes to the Snowflake table.

ghcr.io/estuary/materialize-snowflake:dev provides the latest connector image. You can also browse the registry to see past image versions.

Prerequisites

To use this connector, you'll need:

  • A Snowflake account that includes:
    • A target database, to which you'll materialize data
    • A schema — a logical grouping of database objects — within the target database
    • A virtual warehouse
    • A user with a role assigned that grants the appropriate access levels to these resources. See the script below for details.
  • Your Snowflake account's host URL. This is formatted using your Snowflake account identifier, for example, orgname-accountname.snowflakecomputing.com.
  • At least one Flow collection
tip

If you haven't yet captured your data from its external source, start at the beginning of the guide to create a dataflow. You'll be referred back to this connector-specific documentation at the appropriate steps.

Setup

To meet the prerequisites, copy and paste the following script into the Snowflake SQL editor, replacing the variable values in the first six lines.

If you'd like to use an existing database, warehouse, and/or schema, be sure to set database_name, warehouse_name, and estuary_schema accordingly. If you specify a new name, the script will create the item for you. You can set estuary_role, estuary_user, and estuary_password to whatever you'd like.

Check the All Queries checkbox, and click Run.

set database_name = 'ESTUARY_DB';
set warehouse_name = 'ESTUARY_WH';
set estuary_role = 'ESTUARY_ROLE';
set estuary_user = 'ESTUARY_USER';
set estuary_password = 'secret';
set estuary_schema = 'ESTUARY_SCHEMA';
-- create the role for Estuary
create role if not exists identifier($estuary_role);
grant role identifier($estuary_role) to role SYSADMIN;
-- create the database and schema
create database if not exists identifier($database_name);
use database identifier($database_name);
create schema if not exists identifier($estuary_schema);
-- create a user for Estuary
create user if not exists identifier($estuary_user)
  password = $estuary_password
  default_role = $estuary_role
  default_warehouse = $warehouse_name;
grant role identifier($estuary_role) to user identifier($estuary_user);
grant all on schema identifier($estuary_schema) to role identifier($estuary_role);
-- create a warehouse for Estuary
create warehouse if not exists identifier($warehouse_name)
  warehouse_size = xsmall
  warehouse_type = standard
  auto_suspend = 60
  auto_resume = true
  initially_suspended = true;
-- grant the Estuary role access to the warehouse
grant USAGE
  on warehouse identifier($warehouse_name)
  to role identifier($estuary_role);
-- grant the Estuary role access to the database
grant CREATE SCHEMA, MONITOR, USAGE on database identifier($database_name) to role identifier($estuary_role);
-- switch to ACCOUNTADMIN to grant STORAGE INTEGRATION support to Estuary (only needed for Snowflake on GCP)
use role ACCOUNTADMIN;
grant CREATE INTEGRATION on account to role identifier($estuary_role);
use role sysadmin;
COMMIT;
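
As an optional sanity check, you can confirm the grants took effect. These are standard Snowflake commands, not part of the Estuary setup script; the names below assume you kept the script's default values:

SHOW GRANTS TO ROLE ESTUARY_ROLE;
SHOW GRANTS TO USER ESTUARY_USER;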

Key-pair Authentication & Snowpipe

To enable Snowpipe for delta updates bindings, you must authenticate using key-pair authentication, also known as JWT authentication.

To set up your user for key-pair authentication, first generate a key-pair in your shell:

# generate a private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# generate a public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# read the public key and copy it to clipboard
cat rsa_key.pub

-----BEGIN PUBLIC KEY-----
MIIBIj...
-----END PUBLIC KEY-----

Then assign the public key to your Snowflake user with this SQL command, pasting the key contents without the BEGIN/END delimiter lines:

ALTER USER $estuary_user SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';

Verify that the public key fingerprint in Snowflake matches the one you have locally:

DESC USER $estuary_user;
SELECT TRIM((SELECT "value" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" = 'RSA_PUBLIC_KEY_FP'), 'SHA256:');

Then compare with the local version:

openssl rsa -pubin -in rsa_key.pub -outform DER | openssl dgst -sha256 -binary | openssl enc -base64

Now you can use the generated private key when configuring your Snowflake connector. Once you have key-pair authentication enabled, delta updates bindings will use Snowpipe for loading data.

Configuration

To use this connector, begin with data in one or more Flow collections. Use the properties below to configure a Snowflake materialization, which will direct one or more of your Flow collections to new Snowflake tables.

Properties

Endpoint

| Property | Title | Description | Type | Required/Default |
| --- | --- | --- | --- | --- |
| /account | Account | The Snowflake account identifier | string | Required |
| /database | Database | Name of the Snowflake database to which to materialize | string | Required |
| /host | Host URL | The Snowflake host used for the connection. Example: orgname-accountname.snowflakecomputing.com (do not include the protocol). | string | Required |
| /role | Role | Role assigned to the user | string | |
| /schema | Schema | Database schema for bound collection tables (unless overridden within the binding resource configuration) as well as associated materialization metadata tables | string | Required |
| /warehouse | Warehouse | Name of the data warehouse that contains the database | string | |
| /credentials | Credentials | Credentials for authentication | object | Required |
| /credentials/auth_type | Authentication type | One of user_password or jwt | string | Required |
| /credentials/user | User | Snowflake username | string | Required |
| /credentials/password | Password | Required if using user_password authentication | string | Required |
| /credentials/privateKey | Private Key | Required if using jwt authentication | string | Required |
| /advanced | Advanced Options | Options for advanced users. You should not typically need to modify these. | object | |
| /advanced/updateDelay | Update Delay | Potentially reduce active warehouse time by increasing the delay between updates. | string | |

Bindings

| Property | Title | Description | Type | Required/Default |
| --- | --- | --- | --- | --- |
| /table | Table | Table name | string | Required |
| /schema | Alternative Schema | Alternative schema for this table | string | |
| /delta_updates | Delta updates | Whether to use standard or delta updates | boolean | |
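
For example, a single binding's resource configuration might look like the following sketch, where the table and schema names are hypothetical:

bindings:
  - resource:
      table: user_events            # table to create in Snowflake
      schema: ANALYTICS_SCHEMA      # overrides the endpoint-level schema for this table
      delta_updates: true           # use delta rather than standard updates
    source: ${PREFIX}/${source_collection}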

Sample

User and password authentication:

materializations:
  ${PREFIX}/${mat_name}:
    endpoint:
      connector:
        config:
          account: acmeCo
          database: acmeCo_db
          host: orgname-accountname.snowflakecomputing.com
          schema: acmeCo_flow_schema
          warehouse: acmeCo_warehouse
          credentials:
            auth_type: user_password
            user: snowflake_user
            password: secret
        image: ghcr.io/estuary/materialize-snowflake:dev
    # If you have multiple collections you need to materialize, add a binding for each one
    # to ensure complete data flow-through
    bindings:
      - resource:
          table: ${table_name}
        source: ${PREFIX}/${source_collection}

Key-pair authentication:

materializations:
  ${PREFIX}/${mat_name}:
    endpoint:
      connector:
        config:
          account: acmeCo
          database: acmeCo_db
          host: orgname-accountname.snowflakecomputing.com
          schema: acmeCo_flow_schema
          warehouse: acmeCo_warehouse
          credentials:
            auth_type: jwt
            user: snowflake_user
            privateKey: |
              -----BEGIN PRIVATE KEY-----
              MIIEv....
              ...
              ...
              ...
              ...
              ...
              -----END PRIVATE KEY-----
        image: ghcr.io/estuary/materialize-snowflake:dev
    # If you have multiple collections you need to materialize, add a binding for each one
    # to ensure complete data flow-through
    bindings:
      - resource:
          table: ${table_name}
        source: ${PREFIX}/${source_collection}

Delta updates

This connector supports both standard (merge) and delta updates. The default is to use standard updates.

Enabling delta updates will prevent Flow from querying for documents in your Snowflake table, which can reduce latency and costs for large datasets. If you're certain that all events will have unique keys, enabling delta updates is a simple way to improve performance with no effect on the output. However, enabling delta updates is not suitable for all workflows, as the resulting table in Snowflake won't be fully reduced.

You can enable delta updates on a per-binding basis:

bindings:
  - resource:
      table: ${table_name}
      delta_updates: true
    source: ${PREFIX}/${source_collection}

Performance considerations

Optimizing performance for standard updates

When using standard updates for a large dataset, the collection key you choose can have a significant impact on materialization performance and efficiency.

Snowflake uses micro-partitions to physically arrange data within tables. Each micro-partition includes metadata, such as the minimum and maximum values for each column. If you choose a collection key that takes advantage of this metadata to help Snowflake prune irrelevant micro-partitions, you'll see dramatically better performance.

For example, a collection materialized with a key of /user_id will tend to perform far worse than one with a key of /date, /user_id. This is because most materializations are roughly chronological over time, which means data is written to Snowflake in roughly /date order.

As a result, updates for a key of /date, /user_id need to physically read far fewer rows than updates for a key of /user_id alone, because the relevant rows tend to live in the same micro-partitions, and Snowflake can cheaply prune micro-partitions that aren't relevant to the transaction.
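
As an illustration, a Flow collection keyed to match this write pattern would declare the date component first. This is a sketch only; the collection name, schema file, and field names are hypothetical:

collections:
  acmeCo/user-events:
    schema: user-events.schema.yaml
    # Leading with /date clusters each day's rows into the same micro-partitions,
    # so standard-update merges touch far fewer partitions.
    key: [/date, /user_id]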

Reducing active warehouse time

Snowflake compute is priced per second of activity, with a minimum of 60 seconds. Inactive warehouses don't incur charges. To keep costs down, you'll want to minimize your warehouse's active time.

Like other Estuary connectors, this is a real-time connector that materializes documents using continuous transactions. Every time a Flow materialization commits a transaction, your warehouse becomes active.

If your source data collection or collections don't change much, this shouldn't cause an issue; Flow only commits transactions when data has changed. However, if your source data is frequently updated, your materialization may have frequent transactions that result in excessive active time in the warehouse, and thus a higher bill from Snowflake.

To mitigate this, we recommend a two-pronged approach:

  • Configure your Snowflake warehouse to auto-suspend after 60 seconds.

    This ensures that after each transaction completes, you'll only be charged for one minute of compute, Snowflake's smallest granularity.

    Use a query like the one shown below, being sure to substitute your warehouse name:

    ALTER WAREHOUSE ESTUARY_WH SET auto_suspend = 60;
  • Configure the materialization's update delay by setting a value in the advanced configuration.

For example, if you set the warehouse to auto-suspend after 60 seconds and set the materialization's update delay to 30 minutes, the materialization commits at most 48 transactions per day; at Snowflake's one-minute minimum charge per wake-up, that comes to as little as 48 minutes per day of active warehouse time.

Update Delay

The Update Delay parameter in Estuary materializations offers a flexible approach to data ingestion scheduling. This advanced option allows users to control when the materialization or capture tasks pull in new data by specifying a delay period. By incorporating an update delay into your workflow, you can effectively manage and optimize your active warehouse time, leading to potentially lower costs and more efficient data processing.

An update delay is configured in the advanced settings of a materialization's configuration. It represents the amount of time the system will wait before it begins materializing the latest data, and it can be adjusted to fit the needs of your data pipeline.

For example, if an update delay is set to 2 hours, the materialization task will pause for 2 hours before processing the latest available data. This delay ensures that data is not pulled in immediately after it becomes available, allowing for batching and other optimizations that can reduce warehouse load and processing time.

To configure an update delay, navigate to the Advanced Options section of the materialization's configuration and select a value from the drop-down. The default update delay in Estuary materializations is 30 minutes.
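
In a materialization specification, the same setting lives under the endpoint configuration's advanced options. A minimal sketch, assuming a duration-string value such as 2h matching the drop-down options:

materializations:
  ${PREFIX}/${mat_name}:
    endpoint:
      connector:
        config:
          # ...account, host, credentials, and other settings as in the samples above...
          advanced:
            updateDelay: 2h   # wait 2 hours between materialization updates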

Snowpipe

Snowpipe allows loading data into target tables without waking up the warehouse, which can be cheaper and more performant. Snowpipe can be used for delta updates bindings, and it requires authenticating with a private key. See Key-pair Authentication & Snowpipe above for setup instructions.

Timestamp Data Type Mapping

Flow uses the TIMESTAMP type alias in Snowflake to materialize timestamp data types. This alias points to either TIMESTAMP_NTZ (the default), TIMESTAMP_TZ, or TIMESTAMP_LTZ. The default TIMESTAMP_NTZ mapping means timestamps are normalized to UTC upon materialization. If you want timezone data preserved as part of the timestamp, set the TIMESTAMP_TYPE_MAPPING parameter to TIMESTAMP_TZ. See the Snowflake documentation on TIMESTAMP_TYPE_MAPPING for more information.
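
For example, to change the mapping account-wide (TIMESTAMP_TYPE_MAPPING is a standard Snowflake parameter that can also be set at the session or user level; ALTER ACCOUNT requires the ACCOUNTADMIN role):

ALTER ACCOUNT SET TIMESTAMP_TYPE_MAPPING = TIMESTAMP_TZ;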

Reserved words

Snowflake has a list of reserved words that must be quoted in order to be used as identifiers. Flow automatically quotes fields that are on the reserved words list. You can find this list in Snowflake's documentation and in the table below.

caution

In Snowflake, objects created with quoted identifiers must always be referenced exactly as created, including the quotes. Otherwise, SQL statements and queries can result in errors. See the Snowflake docs.
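
For example, suppose Flow quoted a column for a field named order (a reserved word; the table name below is hypothetical). Queries must then reference it exactly as created:

-- Works: matches the quoted identifier exactly as it was created.
SELECT "order" FROM my_table;
-- Fails: ORDER is reserved, and unquoted identifiers resolve to uppercase anyway.
SELECT order FROM my_table;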

| Reserved words | | |
| --- | --- | --- |
| account | from | qualify |
| all | full | regexp |
| alter | grant | revoke |
| and | group | right |
| any | gscluster | rlike |
| as | having | row |
| between | ilike | rows |
| by | in | sample |
| case | increment | schema |
| cast | inner | select |
| check | insert | set |
| column | intersect | some |
| connect | into | start |
| connection | is | table |
| constraint | issue | tablesample |
| create | join | then |
| cross | lateral | to |
| current | left | trigger |
| current_date | like | true |
| current_time | localtime | try_cast |
| current_timestamp | localtimestamp | union |
| current_user | minus | unique |
| database | natural | update |
| delete | not | using |
| distinct | null | values |
| drop | of | view |
| else | on | when |
| exists | or | whenever |
| false | order | where |
| following | organization | with |
| for | | |