flowctl in Automated and Programmatic Contexts

While Estuary's UI is a convenient way to create and manage resources, some users may prefer to treat their captures, materializations, and other resources as infrastructure-as-code. This allows resource specifications to be checked into your own version control system with a clearly logged history of changes. You can then set up your infra-as-code repositories with a CI/CD pipeline to automate deployment.

This guide will show you how to configure Estuary Flow resources programmatically for use in CI/CD workflows or other automation.

For instructions on using the UI instead, see the documentation for Estuary's web application.

Creating Estuary Resources Locally

During development, you can locally create, manage, and test your resources before committing your changes.

Before creating these resources, you will need:

  • The flowctl CLI installed on your machine
  • An Estuary account and access token

You can authenticate your flowctl session in one of two ways:

  • Set the FLOW_AUTH_TOKEN environment variable to your Estuary access token. This is the recommended way to handle a CI or automation setup.

  • Or run the flowctl auth login command and paste in your token. This is handy for local development.
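
For example, a CI job can export the token from a pipeline secret before invoking flowctl, while a developer can log in interactively on a local machine (ESTUARY_ACCESS_TOKEN below is a hypothetical secret name):

export FLOW_AUTH_TOKEN="$ESTUARY_ACCESS_TOKEN"

flowctl auth login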

You will then be able to connect with Estuary to set up your resources.

Programmatically, all Estuary resources start with a flow.yaml configuration file. You can create and test this file locally, then upload it to Estuary when you're ready to create your resources.

You can specify all of your resources (captures, collections, and materializations) in one flow.yaml file or separate them out based on resource type, schema definition, or desired data plane.
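
If you do split them up, a root flow.yaml can reference the other files with the import keyword. A minimal sketch, with hypothetical file names:

import:
  - captures.flow.yaml
  - collections.flow.yaml
  - materializations.flow.yaml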

The sections below provide example configurations.

Capture Configuration

To create a capture, start with a local flow.yaml file. Consult the capture connector's reference for details on the available settings, authorization methods, and required fields for the configuration. The connector's reference page includes an example specification you can use to get started.

At a minimum, the configuration will need to specify:

  • The capture name
  • The connector image for the capture
  • Any credentials needed for source system authentication
  • The resource streams from the source system you wish to use

Consider this example specification:

captures:
  Artificial-Industries/ci-cd/source-stripe-native:
    endpoint:
      connector:
        image: ghcr.io/estuary/source-stripe-native:dev
        config:
          credentials:
            credentials_title: Private App Credentials
            access_token: <Access Token>
    bindings:
      - resource:
          stream: charges
          syncMode: incremental
        target: Artificial-Industries/ci-cd/stripe_charges
      - resource:
          stream: plans
          syncMode: full_refresh
        target: Artificial-Industries/ci-cd/stripe_plans

Note that you will not be able to successfully publish a capture by itself. You will also need to define the collections that relate to the capture's bindings.

Collection Configuration

When you create a capture configuration, you will also need to create associated collection configurations. Your resource streams will flow into these collections as a staging area before final materialization.

Collections provide an opportunity to enforce schemas and transform data with derivations. When using the UI, Estuary will intelligently infer these schemas for you. When you're creating your own specifications from scratch, however, you will need to be very aware of your source system's schema in order to replicate it accurately.

Create a collection specification for each target you identified in your capture bindings. The collection specification should, at a minimum, include:

  • The schema, with its properties and their types
  • Any required fields in the schema, including the key field
  • The key field used to identify and order documents
    • Since the key can be composite, JSON Pointers are used to identify the relevant fields, so field names should begin with / (see the sketch after this list)
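
For instance, a hypothetical composite key over a customer ID and an order ID would be written as:

key:
  - /customer_id
  - /order_id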

Consider these example specifications:

collections:
  Artificial-Industries/ci-cd/stripe_charges:
    schema:
      type: object
      required:
        - id
      properties:
        id:
          type: string
        created:
          type: string
        amount:
          type: integer
        {...}
    key:
      - /id
  Artificial-Industries/ci-cd/stripe_plans:
    schema:
      type: object
      required:
        - id
      properties:
        id:
          type: string
        active:
          type: boolean
        interval:
          type: string
        {...}
    key:
      - /id

Derivation Configuration

Derivations are often more complex than other Estuary resources. Besides a flow.yaml configuration file, you may also need a TypeScript or SQL script to define the transformation. Not all pipelines will require derivation resources.

A derivation is a type of collection: one that is derived from one or more existing collections. To create a specification for a derivation, you will therefore need to define the key, schema properties, and required fields as you would for any collection.

In addition, you will need to specify how to derive this schema:

  • Provide a pointer to the TypeScript or SQL file that handles the transformation
    • Or, for simple transformations, you may opt to define a lambda function directly within flow.yaml (see the sketch after this list)
  • List the existing collections that provide source data for the derivation
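
As a minimal sketch of the inline option, a SQL-based derivation can embed its transformation as a lambda in the derive section of the collection spec. The collection name and SQL below are hypothetical:

    derive:
      using:
        sqlite: {}
      transforms:
        - name: fromLineItems
          source: Artificial-Industries/line-items
          shuffle: any
          lambda: SELECT $id, $price + $sales_tax AS total;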

A full example specification may therefore look like:

collections:
  Artificial-Industries/customers-with-orders:
    schema:
      type: object
      properties:
        customer_id:
          type: string
        name:
          type: string
        orders:
          type: array
          items:
            type: object
            properties:
              order_id:
                type: string
          reduce:
            strategy: merge
            key:
              - /order_id
      required:
        - customer_id
      reduce:
        strategy: merge
    key:
      - /customer_id

    derive:
      using:
        typescript:
          module: full-outer-join.flow.ts
      transforms:
        - name: fromOrders
          source:
            name: Artificial-Industries/join-collections/orders
          shuffle:
            key:
              - /customer_id
        - name: fromCustomers
          source:
            name: Artificial-Industries/join-collections/customers
          shuffle:
            key:
              - /customer_id

If you specify a separate module for a transformation in your flow.yaml, you can generate stub files to help get started with your derivation. When you're finished with the specification, run the following command:

flowctl generate --source path/to/your/flow.yaml

This flowctl command requires Docker. Successfully running it will generate relevant stub files, which you can modify to return your expected schema.

For more on configuring transformations, see the derivation guides.

Materialization Configuration

Creating a new materialization resource is similar to creating a capture. In a local flow.yaml file, you can fill out a specification according to the materialization connector's reference guide. The reference will indicate which fields are required and how you can authenticate. It will also provide an example specification you can use to get started.

The configuration, at a minimum, will need to specify:

  • The materialization name
  • The connector image for the materialization
  • Any credentials needed for destination system authentication
  • The data collections to use as sources and which tables they should map to

Consider this example specification:

materializations:
  Artificial-Industries/ci-cd/materialize-snowflake:
    endpoint:
      connector:
        image: ghcr.io/estuary/materialize-snowflake:dev
        config:
          host: orgname-accountname.snowflakecomputing.com
          database: estuary_db
          schema: estuary_schema
          credentials:
            auth_type: jwt
            user: estuary_user
            privateKey: |
              -----BEGIN PRIVATE KEY-----
              MIIEv....
              ...
              ...
              -----END PRIVATE KEY-----
    bindings:
      - resource:
          table: shipments
        source: Artificial-Industries/ci-cd/postgres_shipments

Encrypting Secrets

Passwords, authentication tokens, and other sensitive information in your specifications should be encrypted. If you create resources through the UI, Estuary will do this for you. In an automation setting, you will need to encrypt sensitive information yourself.

See the section on sops for guidance and examples.
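
As an illustrative sketch, encrypting the credentials in a connector config with sops and a GCP KMS key might look like the following (the key resource ID is a placeholder):

sops --encrypt --in-place --gcp-kms <your-kms-key-resource-id> --encrypted-suffix _sops config.yaml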

Testing Specifications

You can add tests to your specifications to ensure baseline expected behavior. Tests are defined like any other resource: you can specify ingest and verify steps to provide and evaluate test documents.

While Estuary performs basic tests by default, it's best practice to define your own tests when working in a programmatic context. That way, you can incorporate testing into your CI/CD workflow to ensure you only publish changes that conform with your requirements for your data.

Consider this example specification:

tests:
  Artificial-Industries/tests/example:
    - ingest:
        collection: Artificial-Industries/line-items
        documents:
          - { id: "1", item: "popcorn", price: 499, sales_tax: 25 }
          - { id: "2", item: "hot dog", price: 650, sales_tax: 32 }
    - verify:
        collection: Artificial-Industries/line-item-totals
        documents:
          - { id: "1", item: "popcorn", total: "$5.24" }
          - { id: "2", item: "hot dog", total: "$6.82" }

Run tests using:

flowctl catalog test --source <SOURCE>

Your output will be similar to the following:

test:1> Running  1  tests...
test:1> ✔️ flow://test/Artificial-Industries/tests/example :: Artificial-Industries/tests/example
test:1>
test:1> Ran 1 tests, 1 passed, 0 failed
Tests successful

Publishing Resources

Once you're happy with your resources and any tests have passed, your automation can publish your changes to Estuary.

Your automation will need an authenticated flowctl session to act on your resources on your behalf. You can provide one by setting the FLOW_AUTH_TOKEN environment variable:

export FLOW_AUTH_TOKEN=your_access_token

The session will then be authenticated to use the catalog publish command:

flowctl catalog publish --source <SOURCE>

The command's default behavior is to summarize the resource configurations to publish and prompt for confirmation. You can skip this prompt with the --auto-approve option.
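
A full non-interactive run in a CI pipeline might therefore look like:

flowctl catalog publish --source ./flow.yaml --auto-approve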

Choosing a Data Plane

The catalog publish command defaults to publishing resources to the ops/dp/public/gcp-us-central1-c1 data plane. You can also specify a different public data plane or your own private or BYOC data plane.

Public data planes begin with the ops/dp/public/ prefix. If you are using a private data plane and are unsure of its full name, ask your Estuary representative.

When publishing resources to a data plane besides the default, make sure to specify the data plane with the --default-data-plane option:

flowctl catalog publish --default-data-plane ops/dp/public/aws-eu-west-1-c1 --source ./flow.yaml

All resources that interact with each other (such as derivations or materializations along with their relevant sources) must be part of the same data plane. If you wish to publish resources to different data planes, you will need to save the specifications in different files and run separate commands for each.
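
For example, with hypothetical per-region specification files, you would run one publish per data plane:

flowctl catalog publish --default-data-plane ops/dp/public/gcp-us-central1-c1 --source ./us/flow.yaml

flowctl catalog publish --default-data-plane ops/dp/public/aws-eu-west-1-c1 --source ./eu/flow.yaml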

Editing Existing Resources Locally

If you are starting from a published resource, you can pull the latest version from Estuary to your local directory with the flowctl catalog pull-specs command.

You can add options to the pull-specs command to target certain resources or customize your experience:

  • --captures, --collections, --materializations: Only pull specifications from a specific type of resource
  • --name <NAME>: Pull the specification for a single, named resource
  • --target <TARGET>: Local root specification to write to (defaults to flow.yaml)
  • --overwrite: Determine whether existing specs are overwritten by copies from the Flow control plane; useful if existing local copies have gotten out of date
  • --flat: Determine whether specs are written to a single specification file or follow a canonical layout
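
For example, to refresh your local copy of the capture from the earlier examples, overwriting any stale local spec:

flowctl catalog pull-specs --name Artificial-Industries/ci-cd/source-stripe-native --overwrite
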
Tip: When you begin local development to update a resource specification, you may want to pull a fresh copy of the spec directly from Estuary, even if you check changes into your own infra-as-code repo. This ensures that any changes made via the UI or during an extended support session are captured.

See more on how to edit your specifications locally.