Derivations

At times, the collections generated by a capture may not be suitable for your needs. For instance, you might want to filter certain documents or add calculations to them. Perhaps you need to unpack an array nested inside or aggregate data from many documents. Alternatively, you might need to merge across several collections using a common key, or employ business logic to arrive at a real-time decision. With Flow derivations, you can perform a wide range of transformations, from a simple remapping to complicated, self-referential, and stateful transaction processing.

In essence, a derivation is a collection that is constructed by applying transformations to one or more source collections. Derivations operate continuously, keeping up with updates to the source collections as they happen.

A derivation consists of three primary elements:

  • A collection that stores the output.
  • A catalog task that applies transformations to source documents as they become available and writes the resulting documents into the derived collection.
  • An internal task state which enables aggregations, joins, and windowing.

Today, Flow enables you to write derivations using either SQLite or TypeScript. Additional language support is in the works.

note

If you would like a more hands-on approach to learning derivations, check out this tutorial!

Specification

A derivation is specified as a regular collection with an additional derive stanza:

collections:
  # The unique name of the derivation.
  acmeCo/my/derivation:
    schema: my-schema.yaml
    key: [/key]

    # Presence of a `derive` stanza makes this collection a derivation.
    # Type: object
    derive:
      # Connector which this derivation uses.
      # One of `typescript` or `sqlite`.
      using:
        # Derivation is using the SQLite connector.
        # Optional, type: object
        sqlite:
          # SQL migrations to apply as inline SQL or file references.
          # If a referenced file does not exist
          # a stub can be generated using `flowctl generate`.
          # Optional, type: array of strings
          migrations:
            - CREATE TABLE foobar (id INTEGER PRIMARY KEY NOT NULL);
            - ../path/to/other/migration.sql

        # Derivation is using the TypeScript connector.
        # Optional, type: object
        typescript:
          # TypeScript module implementing this derivation,
          # as inline TypeScript or a relative file reference.
          # If a referenced file does not exist
          # a stub can be generated using `flowctl generate`.
          module: acmeModule.ts

      # The array of transformations that build this derived collection.
      transforms:
        # Unique name of the transformation, containing only Unicode
        # Letters, Numbers, `-`, or `_` (no spaces or other punctuation).
        - name: myTransformName
          # Source collection read by this transformation.
          # Required, type: object or string.
          source:
            # Name of the collection to be read.
            # Required.
            name: acmeCo/my/source/collection
            # Partition selector of the source collection.
            # Optional. Default is to read all partitions.
            partitions: {}
            # Lower bound date-time for documents which should be processed.
            # Source collection documents published before this date-time are filtered.
            # `notBefore` is *only* a filter. Updating its value will not cause Flow
            # to re-process documents that have already been read.
            # Optional. Default is to process all documents.
            notBefore: 2023-01-23T01:00:00Z
            # Upper bound date-time for documents which should be processed.
            # Source collection documents published after this date-time are filtered.
            # Like `notBefore`, `notAfter` is *only* a filter. Updating its value will
            # not cause Flow to re-process documents that have already been read.
            # Optional. Default is to process all documents.
            notAfter: 2023-01-23T02:00:00Z

          # Lambda of this transform, with a meaning which depends
          # on the derivation connector:
          # * SQLite derivation lambdas are blocks of SQL code.
          # * TypeScript does not use `lambda`, as implementations
          #   are provided by the derivation's TypeScript module.
          # Lambdas can be either inline or a relative file reference.
          lambda: SELECT $foo, $bar;
          # Delay applied to sourced documents before being processed
          # by this transformation.
          # Default: No delay, pattern: ^\\d+(s|m|h)$
          readDelay: "48h"
          # Key by which source documents are shuffled to task shards.
          # Optional, type: object.
          # If not set, the source collection key is used.
          shuffle:
            # Composite key of JSON pointers which are extracted from
            # source documents.
            key: [/shuffle/key/one, /shuffle/key/two]
          # Priority applied to documents of this transformation
          # relative to other transformations of the derivation.
          # Default: 0, integer >= 0
          priority: 0
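
To make the `readDelay` format and the `notBefore` / `notAfter` bounds concrete, here is a minimal TypeScript sketch. The helper names `parseReadDelay` and `withinBounds` are hypothetical and not part of Flow, which applies these rules internally. The sketch assumes a document published exactly on a bound is kept.

```typescript
// Hypothetical helpers for illustration; Flow applies these rules internally.

// Convert a read delay such as "48h" into seconds. Accepts a whole number
// followed by s, m, or h, matching the pattern documented for `readDelay`.
function parseReadDelay(delay: string): number {
  const match = /^(\d+)(s|m|h)$/.exec(delay);
  if (match === null) {
    throw new Error(`invalid readDelay: ${delay}`);
  }
  const amount = Number(match[1]);
  const unit = match[2];
  const secondsPerUnit = unit === "s" ? 1 : unit === "m" ? 60 : 3600;
  return amount * secondsPerUnit;
}

// Decide whether a document published at `publishedAt` passes the optional
// bounds. ASSUMPTION: a document published exactly on a bound is kept.
function withinBounds(publishedAt: string, notBefore?: string, notAfter?: string): boolean {
  const at = Date.parse(publishedAt);
  if (notBefore !== undefined && at < Date.parse(notBefore)) return false;
  if (notAfter !== undefined && at > Date.parse(notAfter)) return false;
  return true;
}
```

With the values from the specification above, `parseReadDelay("48h")` yields 172800 seconds, and a document published at 2023-01-23T01:30:00Z falls inside the one-hour window.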

Supported Languages

As with captures and materializations, Flow derivations are built around a pluggable connector architecture. Derivation connectors encapsulate the details of how documents are transformed, and integrate with Flow's runtime through a common protocol.

At present, Flow supports transformations written in SQL (using SQLite) and in TypeScript.

SQLite

Flow's SQLite connector lets you write plain SQL which is evaluated with each source collection document:

derive:
  using:
    sqlite: {}
  transforms:
    - name: fromOrders
      source: acmeCo/orders
      shuffle: any
      lambda:
        SELECT $customer,
          DATE($timestamp) AS date,
          PRINTF('$%.2f', $item_price + $sales_tax) AS cost;

Given an input document:

{
  "customer": "Wile E. Coyote",
  "timestamp": "2023-04-17T16:45:31Z",
  "item_price": 11.5,
  "sales_tax": 0.8
}

The derivation will produce an output document like:

{
  "customer": "Wile E. Coyote",
  "date": "2023-04-17",
  "cost": "$12.30"
}
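
For readers more at home in TypeScript, the computation performed by this SQL lambda can be sketched as a plain function. This is only an illustration of what the query computes, not how a TypeScript derivation module is structured; the `Order` and `Receipt` types and the function name are assumptions made for the example.

```typescript
// Illustrative types; a real derivation uses types generated from your schemas.
interface Order {
  customer: string;
  timestamp: string;
  item_price: number;
  sales_tax: number;
}

interface Receipt {
  customer: string;
  date: string;
  cost: string;
}

// Mirrors the SQL lambda above as a pure function of one source document.
function fromOrdersSketch(order: Order): Receipt {
  return {
    customer: order.customer,
    // Like DATE($timestamp): keep only the UTC calendar date.
    date: new Date(order.timestamp).toISOString().slice(0, 10),
    // Like PRINTF('$%.2f', ...): a dollar sign and two decimal places.
    cost: "$" + (order.item_price + order.sales_tax).toFixed(2),
  };
}

const receipt = fromOrdersSketch({
  customer: "Wile E. Coyote",
  timestamp: "2023-04-17T16:45:31Z",
  item_price: 11.5,
  sales_tax: 0.8,
});
console.log(JSON.stringify(receipt));
```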

SQLite derivations run within the context of a persistent, managed SQLite database. Most anything you can do within SQLite, you can do within a SQLite derivation.

SQL Lambdas

Lambdas are blocks of one or more SQL statements. They can be defined inline within a Flow specification, or they can be provided as a relative file reference to a file of SQL.

Your SQL lambda code can include any number of statements, and your statements are evaluated in the context of your applied database migrations. Use regular INSERT, UPDATE, and DELETE statements in your SQL blocks to manipulate your internal tables as required.

Any rows which are returned by SQL statements, such as SELECT and also variations like INSERT ... RETURNING, are mapped into documents that are published into your derived collection. Published documents must conform to your collection schema or your derivation task will stop due to the schema violation.

tip

The SQLite connector wraps your lambdas in an enclosing transaction. Do not include BEGIN or COMMIT statements in your lambdas. You may use a SAVEPOINT or ROLLBACK TO.

Document Mapping

In most cases, each named output column of your query becomes a top-level property of its corresponding document.

When you directly select a $parameter, its corresponding field name is used. For example, a projection with field name my-field would be queried as SELECT $my_field; and map into a document like {"my-field":"value"}.

A named column such as SELECT $x * 100 AS foo;, maps to a property using the provided name: {"foo": 200}.

Your selected columns may include nested JSON documents, such as SELECT 'hello' AS greeting, JSON_ARRAY(1, 'two', 3) AS items;. The connector looks for SQLite TEXT values which can be parsed into JSON arrays or objects and embeds them into the mapped document: {"greeting": "hello", "items": [1, "two", 3]}. If parsing fails, the raw string is used instead.

If you would like to select all columns of the input collection, rather than select *, use select JSON($flow_document), e.g. select JSON($flow_document) where $status = 'open';.

As a special case, if your query selects a single column having a name that begins with json or JSON, as is common when working with SQLite's JSON functions, then that column will become the output document. For example, SELECT JSON_OBJECT('a', 1, 'b', JSON('true')); maps into document {"a": 1, "b": true}. This can be used to build documents with dynamic top-level properties.
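
The mapping rules in this section can be summarized in a short TypeScript sketch. It is a model of the documented behavior, not the connector's actual code; `mapRow` and `embedJson` are hypothetical names, and a row is represented here as a list of (column name, value) pairs.

```typescript
// A result row as ordered (column name, value) pairs. Hypothetical shape.
type Row = Array<[string, unknown]>;

// TEXT values that parse as a JSON array or object are embedded;
// anything else (including text like "42") is left as-is.
function embedJson(value: unknown): unknown {
  if (typeof value !== "string") return value;
  try {
    const parsed: unknown = JSON.parse(value);
    return typeof parsed === "object" && parsed !== null ? parsed : value;
  } catch {
    return value; // Parsing failed: keep the raw string.
  }
}

function mapRow(row: Row): unknown {
  const first = row[0];
  // Special case: a single column whose name begins with json or JSON
  // becomes the whole output document.
  if (
    row.length === 1 &&
    first !== undefined &&
    (first[0].startsWith("json") || first[0].startsWith("JSON"))
  ) {
    return embedJson(first[1]);
  }
  // General case: each named column becomes a top-level property.
  const doc: Record<string, unknown> = {};
  for (const [name, value] of row) {
    doc[name] = embedJson(value);
  }
  return doc;
}

console.log(JSON.stringify(mapRow([["greeting", "hello"], ["items", '[1,"two",3]']])));
```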

Parameters

Your SQL lambda will execute with every source document of the collection it transforms. To access locations within the document, you utilize $parameter placeholders in your SQL code, which bind to projections of the source document. You can use both your defined projections as well as projections which are statically inferred from your source collection's schema.

You can access projected fields that are top-level as well as those which are nested within a source document. Consider the following schematized document:

{"top-level": true, "object": {"foo": 42}, "arr": ["bar"]}

In your SQL code, you can use parameters like $top_level, $object$foo, or $arr$0. If you're unsure of what parameter to use for a given field, try typing something approximate and Flow will suggest the appropriate $parameter.
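
As a rough mental model, the three examples above are consistent with a simple naming rule: each JSON pointer segment is prefixed with `$`, and characters that are not letters, digits, or underscores become `_`. The sketch below encodes only that guess. The rule is inferred from these examples and is an assumption, not Flow's documented algorithm, so rely on Flow's suggestions for the authoritative parameter name.

```typescript
// ASSUMED rule, inferred from the examples in this section only.
// `pointerToParameter` is a hypothetical helper, not a Flow API.
function pointerToParameter(pointer: string): string {
  return pointer
    .split("/")
    .slice(1) // Drop the empty string before the leading "/".
    .map((segment) => "$" + segment.replace(/[^A-Za-z0-9_]/g, "_"))
    .join("");
}

console.log(pointerToParameter("/top-level"));
console.log(pointerToParameter("/object/foo"));
console.log(pointerToParameter("/arr/0"));
```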

Migrations

The SQLite connector offers a managed, persistent SQLite database that can accommodate any number of tables, indices, views, triggers, and other schema, as defined by your database migrations. To add a migration, simply append it to the migrations array, either as a block of inline SQL statements or as a relative path to a file of SQL statements:

derive:
  using:
    sqlite:
      migrations:
        - CREATE TABLE foo (thing INTEGER NOT NULL);
          CREATE INDEX idx_foo_thing ON foo (thing);
        - ../path/to/another/migration.sql
        - ALTER TABLE foo ADD COLUMN other_thing TEXT NOT NULL DEFAULT '';
        - https://example.com/yet/another/migration.sql

caution

You cannot change an existing migration once it has been published. Instead, add a new migration which applies your desired schema.

The tables and other schema you create through database migrations are the internal state of your derivation. They don't directly cause any documents to be published into your derived collection, but changes made to tables in one SQL lambda execution are immediately visible to others. Changes are also durable and transactional: a Flow derivation transaction commits new documents to the derived collection in lockstep with committing changes made to your task tables.

Flow is responsible for the persistence and replication of your SQLite database, and the SQLite connector tracks and will apply your migrations as needed.

Performance

Your observed performance will of course depend on the specifics of your use case, including the size of your task states and the complexity of your source documents and transformations.

Generally speaking, SQLite is very performant and Flow's SQLite connector strives to drive it as efficiently as possible. Real-world use cases are observed to process many tens of thousands of documents per second on a single core.

Flow can also scale your task without downtime by creating point-in-time clones of the database that subdivide the overall workload and storage of the task. Once created, these subdivisions process in parallel across multiple physical machines to enhance performance.

TypeScript

Flow's TypeScript derivation connector transforms your source documents by executing methods of a TypeScript class which you implement. TypeScript derivations are executed using Deno and let you take advantage of the broad ecosystem of available third-party JavaScript and TypeScript libraries, as well as native code compiled to WASM.

TypeScript derivations are strongly typed: Flow maps the JSON schemas of your source and output collections into corresponding TypeScript types, which are type-checked as you develop and test your derivation. This helps catch a wide variety of potential bugs and avoid accidental violations of your collection data contracts.

Modules

The bulk of a TypeScript derivation lives in its associated module, which is a TypeScript source file that exports the class that implements your derivation.

Each derivation also has an accompanying, generated interfaces module. Interface modules are managed by Flow and are purely advisory: they're generated to improve your development experience, but any changes you make are ignored.

The flowctl generate --source path/to/my/derivation.flow.yaml CLI command generates interface modules at paths like flow_generated/typescript/acmeCo/my-derivation.ts. It places them under the top-most directory of --source that contains a flow.yaml or flow.json file.

It will also generate a deno.json file in your top-level directory, which is designed to work with developer tooling like VSCode's Deno extension.

See the Current Account Balances tutorial for a concrete example of modules.

State

The abstract IDerivation class generated within the interfaces module includes additional, experimental methods which can be used for persisting and recovering internal state of the connector.

Consult the generated implementation and feel free to reach out to support if you'd like more information on building stateful TypeScript derivations.

Transformations

A transformation binds a source collection to a derivation, causing its documents to be read and processed by the derivation connector.

Read source documents are first shuffled on a shuffle key to co-locate the processing of documents that have equal shuffle keys. The transformation then processes documents by invoking lambdas: user-defined functions that accept documents as arguments, return documents in response, and potentially update internal task state.

A derivation may have many transformations, and each transformation has a long-lived and stable name. Each transformation independently reads documents from its source collection and tracks its own read progress. More than one transformation can read from the same source collection, and transformations may also source from their own derivation, enabling cyclic data-flows and graph algorithms.

Transformations may be added to or removed from a derivation at any time. This makes it possible to, for example, add a new collection into an existing multi-way join, or gracefully migrate to a new source collection without incurring downtime. However, renaming a running transformation is not possible. If attempted, the old transformation is dropped and a new transformation under the new name is created, which begins reading its source collection all over again.

graph LR;
  d[Derivation];
  t[Transformation];
  s[Internal State];
  l[Lambda];
  c[Sourced Collection];
  o[Derived Collection];
  d-- has many -->t;
  d-- has one -->s;
  d-- has one -->o;
  c-- reads from -->t;
  t-- invokes -->l;
  l-- updates -->s;
  s-- queries -->l;
  l-- publishes to -->o;

Sources

The source of a transformation is a collection. As documents are published into the source collection, they are continuously read and processed by the transformation.

A partition selector may be provided to process only a subset of the source collection's logical partitions. Selectors are efficient: only partitions that match the selector are read, and Flow can cheaply skip over partitions that don't.

Derivations re-validate their source documents against the source collection's schema as they are read. This is because collection schemas may evolve over time, and could have inadvertently become incompatible with historical documents of the source collection. Upon a schema error, the derivation will pause and give you an opportunity to correct the problem.

Shuffles

As each source document is read, it's shuffled — or equivalently, mapped — on an extracted key.

If you're familiar with data shuffles in tools like MapReduce, Apache Spark, or Flink, the concept is very similar. Flow catalog tasks scale out into multiple shards, each running in parallel on different physical machines, where each shard processes a subset of source documents.

Shuffles let Flow identify the shard that should process a particular source document, in order to co-locate that processing with other documents it may need to know about.

For example, transforms of the Approving Transfers example shuffle on either /sender or /recipient in order to process documents that debit or credit accounts on the specific shard that is uniquely responsible for maintaining the balance of a given account.

graph LR;
  subgraph s1 [Source Partitions]
    p1>acmeBank/transfers/part-1];
    p2>acmeBank/transfers/part-2];
  end
  subgraph s2 [Derivation Task Shards]
    t1([derivation/shard-1]);
    t2([derivation/shard-2]);
  end
  p1-- sender: alice -->t1;
  p1-- recipient: bob -->t2;
  p2-- recipient: alice -->t1;
  p2-- sender: bob -->t2;
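
The routing idea can be sketched in TypeScript: extract the shuffle key from each document, hash it, and use the hash to choose a shard. This is a simplified stand-in. Flow's actual hash function and the way it assigns key ranges to shards are not described here, so the FNV-1a hash, the modulo assignment, and every function name below are assumptions for illustration.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Resolve a JSON pointer such as "/item/product_id" within a document.
// Missing locations resolve to null.
function resolvePointer(doc: Json, pointer: string): Json {
  let node: Json = doc;
  for (const raw of pointer.split("/").slice(1)) {
    const token = raw.replace(/~1/g, "/").replace(/~0/g, "~");
    if (Array.isArray(node)) {
      node = node[Number(token)] ?? null;
    } else if (node !== null && typeof node === "object") {
      node = node[token] ?? null;
    } else {
      return null;
    }
  }
  return node;
}

// Build the composite shuffle key from an array of JSON pointers.
function extractKey(doc: Json, pointers: string[]): Json[] {
  return pointers.map((pointer) => resolvePointer(doc, pointer));
}

// 32-bit FNV-1a over UTF-16 code units. A stand-in hash, not Flow's.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Equal keys always map to the same shard index.
function shardFor(key: Json[], shardCount: number): number {
  return fnv1a(JSON.stringify(key)) % shardCount;
}

const debit = { sender: "alice", recipient: "bob" };
const credit = { sender: "carol", recipient: "alice" };
// One transform shuffles on /sender and another on /recipient, so both
// documents that touch alice's balance land on the same shard.
const debitShard = shardFor(extractKey(debit, ["/sender"]), 2);
const creditShard = shardFor(extractKey(credit, ["/recipient"]), 2);
console.log(debitShard === creditShard);
```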

Flow offers three modes for configuring document shuffles: key, any, and lambda.

shuffle: key

Shuffle keys are defined as an array of JSON pointers to locations that should be extracted from your source documents. This array forms the composite key over which your documents are shuffled:

transforms:
  - name: fromOrders
    source: acmeCo/orders
    shuffle:
      key: [/item/product_id, /customer_id]
    # Flow guarantees that the same shard will process the user's lambda
    # for all instances of a specific (product ID, customer ID) tuple.
    lambda: ...

If a derivation has more than one transformation, the shuffle keys of all transformations must align with one another in terms of the extracted key types (string vs integer) as well as the number of components in a composite key.

For example, one transformation couldn't shuffle transfers on [/id] while another shuffles on [/sender], because sender is a string and id an integer.

Similarly mixing a shuffle of [/sender] alongside [/sender, /recipient] is prohibited because the keys have different numbers of components.
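
The alignment rule amounts to comparing the lists of key types component by component. A minimal TypeScript sketch follows; the `transferTypes` table is a hypothetical summary of a transfers schema, and the function names are illustrative rather than part of Flow, which derives key types from your collection schemas.

```typescript
type KeyType = "string" | "integer" | "boolean";

// Hypothetical schema summary: the type found at each JSON pointer of a
// transfer document.
const transferTypes: Record<string, KeyType> = {
  "/id": "integer",
  "/sender": "string",
  "/recipient": "string",
};

// Look up the type of each component of a composite shuffle key.
function shuffleKeyTypesOf(pointers: string[]): KeyType[] {
  return pointers.map((pointer) => {
    const found = transferTypes[pointer];
    if (found === undefined) throw new Error(`unknown location: ${pointer}`);
    return found;
  });
}

// Two shuffle keys align when they have the same number of components
// and the same type at every position.
function keysAlign(a: KeyType[], b: KeyType[]): boolean {
  return a.length === b.length && a.every((type, i) => type === b[i]);
}

console.log(keysAlign(shuffleKeyTypesOf(["/sender"]), shuffleKeyTypesOf(["/recipient"])));
console.log(keysAlign(shuffleKeyTypesOf(["/id"]), shuffleKeyTypesOf(["/sender"])));
console.log(keysAlign(shuffleKeyTypesOf(["/sender"]), shuffleKeyTypesOf(["/sender", "/recipient"])));
```

The first check passes because both keys are a single string. The second fails on type, and the third fails on the number of components.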

shuffle: any

If your lambda doesn't rely on any task state then it may not matter which task shard processes a given source document. In these instances you can use shuffle: any, which allows source documents to be processed by any available task shard.

This is common for transformation lambdas which perform basic filtering or mapping of source documents and which don't require any joined task state.

transforms:
  - name: fromOrders
    source: acmeCo/orders
    shuffle: any
    # The user's lambda is a pure function and can be evaluated by any available shard.
    lambda:
      SELECT $customer_id, $item_price WHERE $item_price > 100;

shuffle: lambda

Warning

Computed shuffles are in active development and are not yet functional.

Your source documents may not always contain an appropriate value to shuffle upon. For instance, you might want to shuffle on product ID and order date, but your source documents contain only an order timestamp field.

You can use shuffle: lambda to define a function that maps your source document into the appropriate shuffle key:

transforms:
  - name: fromOrders
    source: acmeCo/orders
    shuffle:
      lambda: SELECT $product_id, DATE($order_timestamp);
    # Flow guarantees that the same shard will process the user's lambda
    # for all instances of a specific (product ID, date) tuple.
    lambda: ...

Your shuffle lambda must return exactly one row, and its columns and types must align with the other shuffles of your derivation transformations.

Flow must know the types of your composite shuffle key. In most cases it will infer these types from the shuffle: key of another transformation. If you have no shuffle: key transformations, Flow will ask that you explicitly tell it your shuffle types:

derive:
  using:
    sqlite: {}
  shuffleKeyTypes: [integer, string]
  transforms:
    - name: fromOrders
      source: acmeCo/orders
      shuffle:
        lambda: SELECT $product_id, DATE($order_timestamp);
      lambda: ...

Lambdas

Lambdas are user-defined functions that are invoked by transformations. They accept documents as arguments and return transformed documents in response. Lambdas can update internal task state, publish documents into the derived collection, or both.

Lambdas are "serverless": Flow manages the execution and scaling of your transformation lambdas on your behalf.

Processing order

Transformations may simultaneously read from many source collections, or even read from the same source collection multiple times.

Roughly speaking, the derivation will globally process transformations and their source documents in the time-based order in which the source documents were originally written to their source collections. This means that a derivation started a month ago and a new copy of the derivation started today, will process documents in the same order and arrive at the same result. Derivations are repeatable.

More precisely, processing order is stable for each individual shuffle key, though different shuffle keys may process in different orders if more than one task shard is used.

Processing order can be adjusted through a read delay or differentiated read priority.

Read delay

A transformation can define a read delay, which holds back the processing of its source documents until the time delay condition is met. For example, a read delay of 15 minutes means that a source document cannot be processed until at least 15 minutes after it was published. If the derivation is working through a historical backlog of source documents, then a delayed transformation will respect its ordering delay relative to the publishing times of other historical documents also being read.

Event-driven workflows are a great fit for reacting to events as they occur, but aren’t terribly good at taking action when something hasn’t happened:

  • A user adds a product to their cart, but then doesn’t complete a purchase.
  • A temperature sensor stops producing its expected, periodic measurements.

A common pattern for tackling these workflows in Flow is to read a source collection without a delay and update an internal state. Then, read a collection with a read delay and determine whether the desired action has happened or not. For example, source from a collection of sensor readings and index the last timestamp of each sensor. Then, source the same collection again with a read delay: if the indexed timestamp isn't more recent than the delayed source reading, the sensor failed to produce a measurement.
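
The sensor pattern can be simulated in a few lines of TypeScript. This is a batch simulation under stated assumptions, not a Flow derivation: timestamps are plain minute counts, each reading is seen once immediately and once after the delay, and `findSilences` and the `Reading` type are hypothetical names.

```typescript
interface Reading {
  sensor: string;
  ts: number; // Publish time, in minutes.
}

// Return the readings that were NOT followed by a newer reading from the
// same sensor by the time their delayed copy was processed.
function findSilences(readings: Reading[], delay: number): Reading[] {
  type Event = { at: number; delayed: boolean; reading: Reading };
  const events: Event[] = [];
  for (const reading of readings) {
    events.push({ at: reading.ts, delayed: false, reading });
    events.push({ at: reading.ts + delay, delayed: true, reading });
  }
  // Process in effective-time order; on ties, the undelayed read goes first.
  events.sort((a, b) => a.at - b.at || Number(a.delayed) - Number(b.delayed));

  const lastSeen = new Map<string, number>(); // Internal state: sensor -> latest ts.
  const silent: Reading[] = [];
  for (const { delayed, reading } of events) {
    const latest = lastSeen.get(reading.sensor) ?? -Infinity;
    if (!delayed) {
      // Undelayed transform: index the last timestamp of each sensor.
      lastSeen.set(reading.sensor, Math.max(latest, reading.ts));
    } else if (latest <= reading.ts) {
      // Delayed transform: nothing newer arrived within the delay.
      silent.push(reading);
    }
  }
  return silent;
}

const readings: Reading[] = [0, 10, 20, 50].map((ts) => ({ sensor: "a", ts }));
const silences = findSilences(readings, 15);
console.log(silences.map((r) => r.ts));
```

With readings at minutes 0, 10, 20, and 50 and a 15-minute delay, the readings at 20 and 50 are flagged: no newer reading had arrived when their delayed copies were processed at minutes 35 and 65.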

Flow read delays are very efficient and scale better than managing very large numbers of fine-grain timers.

See Grouped Windows of Transfers for an example using a read delay.

Learn more from the Citi Bike "idle bikes" example.

Read priority

Sometimes it's necessary for all documents of a source collection to be processed by a transformation before any documents of some other source collection are processed, regardless of their relative publishing time. For example, a collection may have corrections that should be applied before the historical data of another collection is re-processed.

Transformation priorities allow you to express the relative processing priority of a derivation's various transformations. When priorities are not equal, all available source documents of a higher-priority transformation are processed before any source documents of a lower-priority transformation.
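
The combined effect of publish-time ordering and read priority can be sketched as a sort over the currently available documents. This is a batch approximation of a streaming process, and the `Pending` shape and `processingOrder` name are hypothetical.

```typescript
interface Pending {
  transform: string;
  priority: number; // Higher values are processed first.
  publishedAt: number; // Publish time of the source document.
}

// Order the available documents: higher priority first, then by publish time.
function processingOrder(available: Pending[]): Pending[] {
  return [...available].sort(
    (a, b) => b.priority - a.priority || a.publishedAt - b.publishedAt,
  );
}

const ordered = processingOrder([
  { transform: "fromHistory", priority: 0, publishedAt: 10 },
  { transform: "fromCorrections", priority: 1, publishedAt: 30 },
  { transform: "fromHistory", priority: 0, publishedAt: 20 },
  { transform: "fromCorrections", priority: 1, publishedAt: 40 },
]);
console.log(ordered.map((d) => `${d.transform}@${d.publishedAt}`));
```

Both corrections are processed before any historical document, even though they were published later.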

Internal State

Derivation tasks often require an internal state, perhaps to hold a partial aggregation or join result. Internal state is not a direct part of the output of a derivation. Instead, transformation lambdas query and update internal state as they process source documents and return derived documents.

For SQLite derivations, the entire SQLite database is the internal state of the task. TypeScript derivations can use in-memory states with a recovery and checkpoint mechanism. Estuary intends to offer additional mechanisms for automatic internal state snapshot and recovery in the future.

The exact nature of internal task states varies, but under the hood they're backed by a replicated embedded RocksDB instance which is co-located with the task shard execution contexts that Flow manages. As contexts are assigned and re-assigned, their state databases travel with them.

If a task shard needs to be scaled out, Flow is able to perform an online split, which cheaply clones its state database into two new databases — and paired shards — which are re-assigned to other machines.

Where to accumulate?

Derivation collection schemas may have reduction annotations, and lambdas can be combined with reductions in interesting ways. You may be familiar with map and reduce functions built into languages like Python, JavaScript, and many others or have used tools like MapReduce or Spark. In functional terms, lambdas you write within Flow are "mappers," and reductions are always done by the Flow runtime using your schema annotations.

This means that, when you implement a derivation, you get to choose where accumulation will happen:

  1. Your lambdas can update and query aggregates stored in internal task state. Approving Transfers is an example that maintains account balances in a SQLite table.
  2. Or, your lambdas can compute changes of an aggregate, which are then reduced by Flow using reduction annotations. Current Account Balances is an example that combines a lambda with a reduce annotation.

These two approaches can produce equivalent results, but they do so in very different ways.

Accumulate in Internal Task State

You can accumulate using the internal state of your derivation: for instance, by using an internal table within your SQLite derivation. You then write lambdas which update that state, or query it to publish derived documents.

For example, consider a collection that’s summing a value:

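| Time | State | Lambdas | Derived Document |
|------|-------|---------|------------------|
| T0 | 0 | UPDATE val = val + 5; SELECT val; | 5 |
| T1 | 5 | UPDATE val = val - 1; SELECT val; | 4 |
| T2 | 4 | UPDATE val = val + 2; SELECT val; | 6 |
| T3 | 6 | | |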

Using a derivation's internal state is a great solution if you expect to materialize the derived collection into a non-transactional store. That's because its documents are complete statements of the current answer, and can be correctly applied to systems that support only at-least-once semantics.

They’re also well-suited for materializations into endpoints that aren't stateful, such as Pub/Sub systems or Webhooks.
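
A small TypeScript simulation of the summing example shows why complete statements tolerate at-least-once delivery. It is a sketch with hypothetical names: a plain variable stands in for the SQLite table, and the store simply keeps the last document it receives.

```typescript
// Each lambda invocation updates internal state and publishes the full,
// current answer.
function runStateful(deltas: number[]): number[] {
  let val = 0; // Stand-in for a row in the derivation's internal table.
  return deltas.map((delta) => {
    val += delta;
    return val;
  });
}

// A non-transactional store that overwrites its value with each document.
function lastWriteWins(delivered: number[]): number | undefined {
  let stored: number | undefined;
  for (const doc of delivered) {
    stored = doc;
  }
  return stored;
}

const published = runStateful([5, -1, 2]);
console.log(published);
// Redelivering some documents does not change the final value.
const afterRedelivery = lastWriteWins([5, 4, 4, 6, 6]);
console.log(afterRedelivery);
```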

Accumulate in a Database

To accumulate in your materialization endpoint, such as a database, you define a derivation with a reducible schema and implement lambdas which publish the changes to a current answer. The Flow runtime then uses your reduction annotations to combine the documents published into your derived collection.

Later, when the collection is materialized, your reduction annotations are applied again to reduce each collection document into a final, fully-reduced value for each collection key that's kept up to date in the materialized table.

A key insight is that the database is the only stateful system in this scenario. The derivation itself is stateless, with lambdas that are pure functions, which is typically extremely performant.

Returning to our summing example:

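| Time | DB | Lambdas | Derived Document |
|------|----|---------|------------------|
| T0 | 0 | SELECT 5; | 5 |
| T1 | 5 | SELECT -1; | -1 |
| T2 | 4 | SELECT 2; | 2 |
| T3 | 6 | | |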

This works especially well when materializing into a transactional database. Flow couples its processing transactions with corresponding database transactions, ensuring end-to-end “exactly once” semantics.

When materializing into a non-transactional store, Flow is only able to provide weaker “at least once” semantics; it’s possible that a document may be combined into a database value more than once. Whether that’s a concern depends a bit on the task at hand. Some reductions like merge can be applied repeatedly without changing the result, while in other use cases approximations are acceptable. For the summing example above, "at-least-once" semantics could give an incorrect result.
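
The difference between reductions can be seen in a short TypeScript simulation. The `materializeWith` helper is hypothetical, and the shallow object merge below is only a stand-in for Flow's merge reduction; the point is how each reduction behaves when a document is delivered twice.

```typescript
// Fold every delivered document into the stored value with a reduction,
// as a materialization does for each collection key.
function materializeWith<T>(
  delivered: T[],
  reduce: (stored: T, doc: T) => T,
  initial: T,
): T {
  return delivered.reduce(reduce, initial);
}

const sum = (stored: number, doc: number): number => stored + doc;
// Shallow merge as a simplified stand-in for a merge reduction.
const merge = (
  stored: Record<string, number>,
  doc: Record<string, number>,
): Record<string, number> => ({ ...stored, ...doc });

// Summing: a repeated delivery of -1 changes the answer.
const exactlyOnce = materializeWith<number>([5, -1, 2], sum, 0);
const withRepeat = materializeWith<number>([5, -1, -1, 2], sum, 0);
console.log(exactlyOnce, withRepeat);

// Merging: a repeated delivery leaves the answer unchanged.
const mergedOnce = materializeWith<Record<string, number>>([{ a: 1 }, { b: 2 }], merge, {});
const mergedTwice = materializeWith<Record<string, number>>([{ a: 1 }, { b: 2 }, { b: 2 }], merge, {});
console.log(JSON.stringify(mergedOnce) === JSON.stringify(mergedTwice));
```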

Learn more in the derivation pattern examples of Flow's repository.