Derivations

At times, the collections generated by a capture may not be suitable for your needs. For instance, you might want to filter certain documents or add calculations to them. Perhaps you need to unpack an array nested inside or aggregate data from many documents. Alternatively, you might need to merge across several collections using a common key, or employ business logic to arrive at a real-time decision. With Flow derivations, you can perform a wide range of transformations, from a simple remapping to complicated, self-referential, and stateful transaction processing.

In essence, a derivation is a collection that is constructed by applying transformations to one or more source collections. Derivations operate continuously, keeping up with updates to the source collections as they happen.

A derivation consists of three primary elements:

  • A collection that stores the output.
  • A catalog task that applies transformations to source documents as they become available and writes the resulting documents into the derived collection.
  • An internal task state which enables aggregations, joins, and windowing.

Today, Flow enables you to write derivations using either SQLite or TypeScript. Additional language support is in the works.

Tutorial

Introducing AcmeBank

The following tutorial sections use an illustrative example to introduce you to derivations, how you might use them, and their common components. We'll discuss each component in depth in subsequent sections of this page, but we recommend you start here to get your bearings.

Suppose you have an application through which users send one another some amount of currency, like in-game tokens or dollars or digital kittens. You have a transfers collection of user-requested transfers, each sending funds from one account to another:

collections:
  # Collection of 💲 transfers between accounts:
  #   {id: 123, sender: alice, recipient: bob, amount: 32.50}
  acmeBank/transfers:
    schema: transfers.schema.yaml
    key: [/id]

There are many views over this data that you might require, such as summaries of sender or receiver activity, or current account balances within your application.

Filtering Large Transfers

note

This section introduces SQLite derivations, SQL lambda blocks and $parameters.

Your compliance department has reached out, and they require an understanding of the last large transfer (if any) made by each user account.

You create a SQL derivation to help them out. The transfers collection is keyed on the transfer /id, so you'll need to re-key your derivation on the /sender account. You also need to filter out transfers that aren't large enough.

Putting this all together:

collections:
  acmeBank/last-large-send:
    schema: transfers.schema.yaml
    key: [/sender]

    derive:
      using:
        sqlite: {}
      transforms:
        - name: filterTransfers
          source: acmeBank/transfers
          shuffle: any
          lambda: SELECT $id, $sender, $recipient, $amount WHERE $amount > 100;

derive: using: sqlite: {} tells Flow that collection acmeBank/last-large-send is derived using Flow's SQLite derivation connector.

This derivation has just one transform, which sources from the transfers collection. As source documents become available, they're evaluated by the SQL lambda and its SELECT output is published to the derived collection. Your SQL queries access locations of source documents through $parameter bindings.

The compliance department then materializes this collection to their preferred destination, for an always up-to-date view indexed by each account.
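To make the behavior concrete, here is a minimal TypeScript sketch of what this derivation computes. It is not how Flow evaluates the SQL lambda. The `Transfer` interface, both function names, and the `Map` standing in for a materialized view keyed on the sender are illustrative assumptions.

```typescript
// Hypothetical shape of a transfer document, based on the example comment above.
interface Transfer {
  id: number;
  sender: string;
  recipient: string;
  amount: number;
}

// Mirrors `SELECT ... WHERE $amount > 100`: emit the document, or nothing.
function filterTransfers(doc: Transfer): Transfer[] {
  return doc.amount > 100 ? [doc] : [];
}

// Stand-in for a view keyed on /sender. This sketch assumes that a later
// document for the same key simply replaces the earlier one.
function lastLargeSend(transfers: Transfer[]): Map<string, Transfer> {
  const view = new Map<string, Transfer>();
  for (const transfer of transfers) {
    for (const out of filterTransfers(transfer)) {
      view.set(out.sender, out);
    }
  }
  return view;
}
```

Re-keying on the sender is what turns a stream of large transfers into "the last large transfer per account": every new large transfer overwrites the previous entry for that sender.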

Finding New Account Pairs

note

This section introduces SQLite migrations and internal task tables.

The fraud team needs your help: they have a new process they must run the first time some sending account sends funds to a receiving account. They would like to see only those transfers which reflect a new account pair of (sender, recipient). To tackle this you need to know which account pairs have been seen before.

SQLite derivations run within the context of a persistent, managed SQLite database. You can apply database migrations that create whatever tables, triggers, or views you might need. Then, the statements of your SQL lambda code can INSERT, UPDATE, or DELETE from those tables, query from them, or any other operation supported by SQLite. The tables and other schema you create through your migrations are the internal state of your task.

collections:
  acmeBank/first-send:
    schema: transfers.schema.yaml
    key: [/id]

    derive:
      using:
        sqlite:
          migrations:
            - CREATE TABLE seen_pairs (
                sender TEXT NOT NULL,
                recipient TEXT NOT NULL,
                PRIMARY KEY (sender, recipient)
              );

      transforms:
        - name: fromTransfers
          source: acmeBank/transfers
          shuffle:
            key: [/sender, /recipient]
          lambda:
            INSERT INTO seen_pairs (sender, recipient) VALUES ($sender, $recipient)
            ON CONFLICT DO NOTHING
            RETURNING $id, $sender, $recipient, $amount;

This time, the derivation attempts to INSERT into the seen_pairs table, and uses SQLite's RETURNING syntax to only publish documents for rows which were successfully inserted.
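The insert-or-skip behavior can be sketched in TypeScript as follows. This is only an in-memory analogy: the `Set` stands in for the `seen_pairs` table, and the `Transfer` interface and `fromTransfers` function are illustrative names rather than anything Flow generates for a SQLite derivation.

```typescript
// Hypothetical shape of a transfer document.
interface Transfer {
  id: number;
  sender: string;
  recipient: string;
  amount: number;
}

// In-memory stand-in for the `seen_pairs` table and its composite primary key.
const seenPairs = new Set<string>();

// Emulates INSERT ... ON CONFLICT DO NOTHING RETURNING: a document is
// returned only when the (sender, recipient) pair was newly inserted.
function fromTransfers(doc: Transfer): Transfer[] {
  const pair = JSON.stringify([doc.sender, doc.recipient]);
  if (seenPairs.has(pair)) {
    return []; // Conflict: no row inserted, so nothing is returned.
  }
  seenPairs.add(pair);
  return [doc];
}
```

Note that the pair is ordered: a transfer from bob to alice counts as new even after alice has sent funds to bob.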

You can evolve the internal SQLite tables of your derivation as needed, by appending SQL blocks which perform a database migration to the migrations array. Any migrations appended to the list are automatically applied by Flow.

Grouped Windows of Transfers

note

This section introduces delayed reads, and applies them to implement a custom window policy.

The fraud team is back, and now needs to know the other transfers which an account has made in the last day. They want you to enrich each transfer with the grouping of all transfers initiated by that account in the prior 24 hours.

You may have encountered "windowing" in other tools for stream processing. Some systems even require that you define a window policy in order to function. Flow does not use windows, but sometimes you do want a time-bound grouping of recent events.

All collection documents contain a wall-clock timestamp of when they were published. The transforms of a derivation will generally process source documents in ascending wall-time order. You can augment this behavior by using a read delay to refine the relative order in which source documents are read, which is useful for implementing arbitrary window policies:

collections:
  acmeBank/grouped-transfers:
    schema:
      # Enrich transfer with a window of *other* transfers.
      $ref: transfers.schema.yaml
      required: [window]
      properties:
        window: { type: array }
    key: [/id]

    derive:
      using:
        sqlite:
          migrations:
            - CREATE TABLE transfers (
                id INTEGER PRIMARY KEY NOT NULL,
                sender TEXT NOT NULL,
                recipient TEXT NOT NULL,
                amount REAL NOT NULL
              );
              CREATE INDEX idx_transfers_sender ON transfers (sender);
      transforms:
        - name: enrichAndAddToWindow
          source: acmeBank/transfers
          shuffle: { key: [/sender] }
          lambda: enrichAndAddToWindow.sql

        - name: removeFromWindow
          source: acmeBank/transfers
          shuffle: { key: [/sender] }
          readDelay: 24h
          lambda: DELETE FROM transfers WHERE id = $id;
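The contents of enrichAndAddToWindow.sql are not shown here, so the following TypeScript sketch only models the overall idea under stated assumptions: each source document is read twice, once immediately (to enrich it and add it to the window) and once 24 hours later (to remove it). The `publishedAt` field is a hypothetical stand-in for the document's wall-clock timestamp, and the `Map` stands in for the `transfers` table.

```typescript
interface Transfer {
  id: number;
  sender: string;
  amount: number;
  publishedAt: number; // Milliseconds; hypothetical stand-in for wall-clock time.
}
interface Enriched extends Transfer {
  window: Transfer[];
}
type Read = { at: number; kind: "enrich" | "remove"; doc: Transfer };

const DAY_MS = 24 * 60 * 60 * 1000;

function groupedTransfers(transfers: Transfer[]): Enriched[] {
  // Every document is read by both transforms; the second read is delayed.
  const reads: Read[] = [];
  for (const doc of transfers) {
    reads.push({ at: doc.publishedAt, kind: "enrich", doc });
    reads.push({ at: doc.publishedAt + DAY_MS, kind: "remove", doc });
  }
  // Process reads in ascending, delay-adjusted time order.
  reads.sort((a, b) => a.at - b.at);

  const table = new Map<number, Transfer>(); // Stand-in for the `transfers` table.
  const out: Enriched[] = [];
  for (const read of reads) {
    if (read.kind === "remove") {
      table.delete(read.doc.id);
      continue;
    }
    // The window holds the sender's *other* transfers still in the table.
    const window = Array.from(table.values()).filter(
      (t) => t.sender === read.doc.sender
    );
    out.push({ ...read.doc, window });
    table.set(read.doc.id, read.doc);
  }
  return out;
}
```

Because removal is driven by the delayed read of the same source document, no timers are needed: a transfer leaves the window exactly when its delayed copy is processed.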

Approving Transfers

note

This section expands usage of SQLite task tables and introduces a recursive data flow.

Your users don't always check if they have sufficient funds before starting a transfer, and account overdrafts are becoming common. The product team has tapped you to fix this by enriching each transfer with an approve or deny outcome based on the account balance of the sender.

To do this, you first need to track the sender's current account balance. Clearly an account balance is debited when it's used to send funds. It's also credited when it receives funds.

But there's a catch: an account can only be credited for funds received from approved transfers! This implies you need a collection of transfer outcomes in order to derive your collection of transfer outcomes 🤯.

This is an example of a self-referential, recursive data-flow. You may have used tools which require that data flow in a Directed Acyclic Graph (DAG). Flow does not require that your data flows are acyclic, and it also supports a derivation that reads from itself, which lets you tackle this task:

collections:
  acmeBank/transfer-outcomes:
    schema:
      # Enrich transfer schema with outcome and the sender's balance.
      $ref: transfers.schema.yaml
      required: [outcome, sender_balance]
      properties:
        outcome:
          description: Transfer was approved, or denied for insufficient funds.
          enum: [approve, deny]
        sender_balance: { type: number }
    key: [/id]

    derive:
      using:
        sqlite:
          migrations:
            - CREATE TABLE current_balances (
                account TEXT PRIMARY KEY NOT NULL,
                balance REAL NOT NULL
              );

      transforms:
        - name: debitSender
          source: acmeBank/transfers
          # Shuffle on the sender, as we'll debit their balance.
          shuffle: { key: [/sender] }
          lambda: debitSender.sql

        - name: creditRecipient
          # When a transfer is approved, we've debited the sender but still need to
          # credit the recipient. Read approved transfers from ourselves to do so.
          source:
            name: acmeBank/transfer-outcomes
            partitions:
              include:
                outcome: [approve]
          shuffle: { key: [/recipient] }
          lambda:
            INSERT INTO current_balances (account, balance) VALUES ($recipient, $amount)
            ON CONFLICT DO UPDATE SET balance = balance + $amount;

    # Partition output based on the transfer outcome.
    projections:
      outcome:
        location: /outcome
        partition: true
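The file debitSender.sql is not reproduced here, so the sketch below assumes a plausible rule for it: approve and debit only when the sender's balance covers the amount. The `creditRecipient` function mirrors the upsert shown above. A single `Map` stands in for the `current_balances` table, and all names are illustrative.

```typescript
interface Transfer {
  id: number;
  sender: string;
  recipient: string;
  amount: number;
}
interface Outcome extends Transfer {
  outcome: "approve" | "deny";
  sender_balance: number;
}

// Stand-in for the `current_balances` table.
const balances = new Map<string, number>();

// Assumed logic for debitSender.sql: approve and debit only when the
// sender can cover the amount; otherwise deny and leave the balance as-is.
function debitSender(doc: Transfer): Outcome {
  const balance = balances.get(doc.sender) ?? 0;
  if (balance < doc.amount) {
    return { ...doc, outcome: "deny", sender_balance: balance };
  }
  balances.set(doc.sender, balance - doc.amount);
  return { ...doc, outcome: "approve", sender_balance: balance - doc.amount };
}

// Mirrors the creditRecipient upsert, which only reads approved outcomes.
function creditRecipient(doc: Outcome): void {
  if (doc.outcome !== "approve") return;
  balances.set(doc.recipient, (balances.get(doc.recipient) ?? 0) + doc.amount);
}
```

The recursion is visible in the data flow: `debitSender` produces outcomes, and `creditRecipient` consumes those same outcomes to update the balances that later calls to `debitSender` depend on. In this sketch an account starts at zero, so some initial funding (not modeled here) is needed before any transfer can be approved.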

Current Account Balances

note

This section introduces TypeScript derivations and reduction annotations.

Your product team is back, and they want a database table keyed by account that contains its up-to-date current balance.

As shown in the previous section, you could create a task table which aggregates each account balance, and then SELECT the current balance after every transfer. For most use cases, this is a great place to start. For interest and variety, you'll solve this problem using TypeScript.

TypeScript derivations require a module which you write. You don't know how to write that module yet, so first implement the derivation specification in balances.flow.yaml. Next run the flowctl generate command, which generates two files:

  • A module stub for you to fill out.
  • A file of TypeScript interfaces which are used by your module.

collections:
  acmeBank/balances:
    schema: balances.schema.yaml
    key: [/user]

    derive:
      using:
        typescript:
          module: balances.ts
      transforms:
        - name: fromOutcomes
          source:
            name: acmeBank/transfer-outcomes
            partitions:
              include:
                outcome: [approve]
          shuffle: any

Next fill out the body of your TypeScript module and write a test:

import { IDerivation, Document, SourceFromOutcomes } from 'flow/acmeBank/balances.ts';

// Implementation for derivation acmeBank/balances.
export class Derivation extends IDerivation {
    fromOutcomes(read: { doc: SourceFromOutcomes }): Document[] {
        const doc = read.doc;
        return [
            // Debit the sender.
            { user: doc.sender, balance: -doc.amount },
            // Credit the recipient.
            { user: doc.recipient, balance: doc.amount },
        ];
    }
}

One piece is still missing. Your TypeScript module is publishing the change in account balance for each transfer. That's not the same thing as the current balance for each account.

You can ask Flow to sum up the balance changes into a current account balance through reduction annotations. Here's the balances schema, with reduce annotations for summing the account balance:

type: object
required: [user, balance]
reduce: { strategy: merge }
properties:
  user: { type: string }
  balance:
    type: number
    reduce: { strategy: sum }
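As a rough mental model of what these annotations ask for, the TypeScript sketch below folds a sequence of balance changes into one document per user. It is an analogy only: Flow applies reductions itself, and the `reduce` and `currentBalances` functions are hypothetical helpers written for this illustration.

```typescript
interface BalanceDoc {
  user: string;
  balance: number;
}

// Combine two documents that share a key. `merge` reduces the object
// property by property, and `sum` adds the numeric `balance` values.
function reduce(lhs: BalanceDoc, rhs: BalanceDoc): BalanceDoc {
  return { user: rhs.user, balance: lhs.balance + rhs.balance };
}

// Fold a sequence of balance changes into one document per /user key.
function currentBalances(deltas: BalanceDoc[]): Map<string, BalanceDoc> {
  const out = new Map<string, BalanceDoc>();
  for (const doc of deltas) {
    const prior = out.get(doc.user);
    out.set(doc.user, prior ? reduce(prior, doc) : doc);
  }
  return out;
}
```

The derivation itself stays stateless: it only publishes deltas, and the running total emerges when documents with equal keys are combined.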

This section has more moving parts than the previous SQL-based examples. You might be wondering, why bother? Fair question! This is just an illustrative example, after all.

While they're more verbose, TypeScript derivations do have certain advantages:

  • TypeScript derivations are strongly typed, and those checks often catch meaningful bugs and defects before they're deployed. Your derivation modules also play nicely with VSCode and other developer tooling.
  • TypeScript derivations can use third-party libraries, as well as your native code compiled to WASM.
  • TypeScript can be easier when working with nested or complex document structures.

Reduction annotations also have some benefits over task state (like SQLite tables):

  • Internal task state is managed by Flow. If it grows to be large (say, you have a lot of accounts), then your task must be scaled and could require performance tuning. Reduction annotations, on the other hand, require no internal state and are extremely efficient.
  • Certain aggregations, such as recursive merging of tree-like structures, are much simpler to express through reduction annotations than to implement yourself.

See "Where to Accumulate?" for more discussion.

Specification

A derivation is specified as a regular collection with an additional derive stanza:

collections:
  # The unique name of the derivation.
  acmeCo/my/derivation:
    schema: my-schema.yaml
    key: [/key]

    # Presence of a `derive` stanza makes this collection a derivation.
    # Type: object
    derive:
      # Connector which this derivation uses.
      # One of `typescript` or `sqlite`.
      using:
        # Derivation is using the SQLite connector.
        # Optional, type: object
        sqlite:
          # SQL migrations to apply as inline SQL or file references.
          # If a referenced file does not exist
          # a stub can be generated using `flowctl generate`.
          # Optional, type: array of strings
          migrations:
            - CREATE TABLE foobar (id INTEGER PRIMARY KEY NOT NULL);
            - ../path/to/other/migration.sql

        # Derivation is using the TypeScript connector.
        # Optional, type: object
        typescript:
          # TypeScript module implementing this derivation,
          # as inline TypeScript or a relative file reference.
          # If a referenced file does not exist
          # a stub can be generated using `flowctl generate`.
          module: acmeModule.ts

      # The array of transformations that build this derived collection.
      transforms:
        # Unique name of the transformation, containing only Unicode
        # Letters, Numbers, `-`, or `_` (no spaces or other punctuation).
        - name: myTransformName
          # Source collection read by this transformation.
          # Required, type: object or string.
          source:
            # Name of the collection to be read.
            # Required.
            name: acmeCo/my/source/collection
            # Partition selector of the source collection.
            # Optional. Default is to read all partitions.
            partitions: {}
            # Lower bound date-time for documents which should be processed.
            # Source collection documents published before this date-time are filtered.
            # `notBefore` is *only* a filter. Updating its value will not cause Flow
            # to re-process documents that have already been read.
            # Optional. Default is to process all documents.
            notBefore: 2023-01-23T01:00:00Z
            # Upper bound date-time for documents which should be processed.
            # Source collection documents published after this date-time are filtered.
            # Like `notBefore`, `notAfter` is *only* a filter. Updating its value will
            # not cause Flow to re-process documents that have already been read.
            # Optional. Default is to process all documents.
            notAfter: 2023-01-23T02:00:00Z

          # Lambda of this transform, with a meaning which depends
          # on the derivation connector:
          #  * SQLite derivation lambdas are blocks of SQL code.
          #  * TypeScript does not use `lambda`, as implementations
          #    are provided by the derivation's TypeScript module.
          # Lambdas can be either inline or a relative file reference.
          lambda: SELECT $foo, $bar;
          # Delay applied to sourced documents before being processed
          # by this transformation.
          # Default: No delay, pattern: ^\d+(s|m|h)$
          readDelay: "48h"
          # Key by which source documents are shuffled to task shards.
          # Optional, type: object.
          # If not set, the source collection key is used.
          shuffle:
            # Composite key of JSON pointers which are extracted from
            # source documents.
            key: [/shuffle/key/one, /shuffle/key/two]
          # Priority applied to documents of this transformation
          # relative to other transformations of the derivation.
          # Default: 0, integer >= 0
          priority: 0
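The `readDelay` pattern above accepts a whole number followed by a unit of seconds, minutes, or hours. As a small illustration of what values such as "48h" mean, the sketch below converts them to milliseconds. The `parseReadDelay` helper is hypothetical and is not part of Flow or flowctl.

```typescript
// Convert a readDelay string such as "48h" into milliseconds.
// Throws if the value does not match the documented pattern.
function parseReadDelay(delay: string): number {
  const match = /^(\d+)(s|m|h)$/.exec(delay);
  if (match === null) {
    throw new Error("invalid readDelay: " + delay);
  }
  const unitMs: { [unit: string]: number } = {
    s: 1000,
    m: 60 * 1000,
    h: 60 * 60 * 1000,
  };
  return Number(match[1]) * unitMs[match[2]];
}
```

Under this pattern a one-day delay is written as "24h", since there is no unit for days.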

Supported Languages

As with captures and materializations, Flow derivations are built around a pluggable connector architecture. Derivation connectors encapsulate the details of how documents are transformed, and integrate with Flow's runtime through a common protocol.

At present, Flow supports transformations in SQL using SQLite, and TypeScript.

SQLite

Flow's SQLite connector lets you write plain SQL which is evaluated with each source collection document:

derive:
  using:
    sqlite: {}
  transforms:
    - name: fromOrders
      source: acmeCo/orders
      shuffle: any
      lambda:
        SELECT $customer,
        DATE($timestamp) AS date,
        PRINTF('$%.2f', $item_price + $sales_tax) AS cost;

Given an input document:

{
  "customer": "Wile E. Coyote",
  "timestamp": "2023-04-17T16:45:31Z",
  "item_price": 11.5,
  "sales_tax": 0.8
}

The derivation will produce an output document like:

{
  "customer": "Wile E. Coyote",
  "date": "2023-04-17",
  "cost": "$12.30"
}

SQLite derivations run within the context of a persistent, managed SQLite database. Most anything you can do within SQLite, you can do within a SQLite derivation.

SQL Lambdas

Lambdas are blocks of one or more SQL statements. They can be defined inline within a Flow specification, or they can be provided as a relative file reference to a file of SQL.

Your SQL lambda code can include any number of statements, and your statements are evaluated in the context of your applied database migrations. Use regular INSERT, UPDATE, and DELETE statements in your SQL blocks to manipulate your internal tables as required.

Any rows which are returned by SQL statements, such as SELECT and also variations like INSERT ... RETURNING, are mapped into documents that are published into your derived collection. Published documents must conform to your collection schema or your derivation task will stop due to the schema violation.

tip

The SQLite connector wraps your lambdas in an enclosing transaction. Do not include BEGIN or COMMIT statements in your lambdas. You may use a SAVEPOINT or ROLLBACK TO.

Document Mapping

In most cases, each named output column of your query becomes a top-level property of its corresponding document.

When you directly select a $parameter its corresponding field name is used. For example, a projection with field name my-field would be queried as SELECT $my_field; and map into a document like {"my-field":"value"}.

A named column, such as SELECT $x * 100 AS foo;, maps to a property using the provided name: {"foo": 200}.

Your selected columns may include nested JSON documents, such as SELECT 'hello' AS greeting, JSON_ARRAY(1, 'two', 3) AS items;. The connector looks for SQLite TEXT values which can be parsed into JSON arrays or objects and embeds them into the mapped document: {"greeting": "hello", "items": [1, "two", 3]}. If parsing fails, the raw string is used instead.

As a special case, if your query selects a single column having a name that begins with json or JSON, as is common when working with SQLite's JSON functions, then that column will become the output document. For example, SELECT JSON_OBJECT('a', 1, 'b', JSON('true')); maps into document {"a": 1, "b": true}. This can be used to build documents with dynamic top-level properties.
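The mapping rules above can be summarized with a short TypeScript sketch. It is a simplified model written from the description in this section, not the connector's code. The `SqlValue` type and the `mapValue` and `mapRow` helpers are hypothetical, and the sketch ignores how directly selected $parameters recover their original field names.

```typescript
type SqlValue = string | number | null;

// Embed TEXT values that parse as JSON arrays or objects; keep all else as-is.
function mapValue(value: SqlValue): unknown {
  if (typeof value !== "string") return value;
  try {
    const parsed: unknown = JSON.parse(value);
    return typeof parsed === "object" && parsed !== null ? parsed : value;
  } catch (_err) {
    return value; // Not JSON: the raw string is used instead.
  }
}

// Map one result row, given as ordered [column name, value] pairs.
function mapRow(columns: Array<[string, SqlValue]>): unknown {
  // Special case: a single column named json... or JSON... is the document.
  if (columns.length === 1 && /^(json|JSON)/.test(columns[0][0])) {
    return mapValue(columns[0][1]);
  }
  const doc: { [name: string]: unknown } = {};
  for (const [name, value] of columns) {
    doc[name] = mapValue(value);
  }
  return doc;
}
```

For instance, a row with columns `greeting` and `items` maps to an object with those two properties, while a lone `JSON_OBJECT(...)` column is unwrapped so that its parsed value is the whole document.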

Parameters

Your SQL lambda will execute with every source document of the collection it transforms. To access locations within the document, you utilize $parameter placeholders in your SQL code, which bind to projections of the source document. You can use both your defined projections as well as projections which are statically inferred from your source collection's schema.

You can access projected fields that are top-level as well as those which are nested within a source document. Consider the following schematized document:

{"top-level": true, "object": {"foo": 42}, "arr": ["bar"]}

In your SQL code, you can use parameters like $top_level, $object$foo, or $arr$0. If you're unsure of what parameter to use for a given field, try typing something approximate and Flow will suggest the appropriate $parameter.

Migrations

The SQLite connector offers a managed, persistent SQLite database that can accommodate any number of tables, indices, views, triggers, and other schema, as defined by your database migrations. To add a migration, simply append it to the migrations array, either as a block of inline SQL statements or as a relative path to a file of SQL statements:

derive:
  using:
    sqlite:
      migrations:
        - CREATE TABLE foo (thing INTEGER NOT NULL);
          CREATE INDEX idx_foo_thing ON foo (thing);
        - ../path/to/another/migration.sql
        - ALTER TABLE foo ADD COLUMN other_thing TEXT NOT NULL DEFAULT '';
        - https://example.com/yet/another/migration.sql

caution

You cannot change an existing migration once it has been published. Instead, add a new migration which applies your desired schema.

The tables and other schema you create through database migrations are the internal state of your derivation. They don't directly cause any documents to be published into your derived collection, but changes made to tables in one SQL lambda execution are immediately visible to others. Changes are also durable and transactional: a Flow derivation transaction commits new documents to the derived collection in lockstep with committing changes made to your task tables.

Flow is responsible for the persistence and replication of your SQLite database, and the SQLite connector tracks and will apply your migrations as needed.
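The append-only rule for migrations can be pictured with the following TypeScript sketch. How the connector actually tracks applied migrations is not described here, so the comparison by position and content is an assumption, and `pendingMigrations` is a hypothetical helper.

```typescript
// Given the migrations already applied and the currently declared list,
// return the migrations that still need to run. Applied migrations must
// remain as an unchanged prefix of the declared list.
function pendingMigrations(applied: string[], declared: string[]): string[] {
  for (let i = 0; i < applied.length; i++) {
    if (declared[i] !== applied[i]) {
      throw new Error("migration " + i + " was changed or removed after being applied");
    }
  }
  return declared.slice(applied.length);
}
```

Appending a new entry yields exactly one pending migration, whereas editing an earlier entry is rejected. This is why schema changes are expressed as additional migrations rather than edits.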

Performance

Your observed performance will of course depend on the specifics of your use case, including the size of your task states and the complexity of your source documents and transformations.

Generally speaking, SQLite is very performant and Flow's SQLite connector strives to drive it as efficiently as possible. Real-world use cases are observed to process many tens of thousands of documents per second on a single core.

Flow can also scale your task without downtime by creating point-in-time clones of the database that subdivide the overall workload and storage of the task. Once created, these subdivisions process in parallel across multiple physical machines to enhance performance.

TypeScript

Flow's TypeScript derivation connector transforms your source documents by executing methods of a TypeScript class which you implement. TypeScript derivations are executed using Deno and let you take advantage of the broad ecosystem of available third-party JavaScript and TypeScript libraries, as well as native code compiled to WASM.

TypeScript derivations are strongly typed: Flow maps the JSON schemas of your source and output collections into corresponding TypeScript types, which are type-checked as you develop and test your derivation. This helps catch a wide variety of potential bugs and avoid accidental violations of your collection data contracts.

Modules

The bulk of a TypeScript derivation lives in its associated module, which is a TypeScript source file that exports the class that implements your derivation.

Each derivation also has an accompanying, generated interfaces module. Interface modules are managed by Flow and are purely advisory: they're generated to improve your development experience, but any changes you make are ignored.

The flowctl generate --source path/to/my/derivation.flow.yaml CLI command generates interface modules under paths like flow_generated/typescript/acmeCo/my-derivation.ts. These paths are relative to the top-level directory of --source, meaning the one that has a flow.yaml or flow.json file.

It will also generate a deno.json file in your top-level directory, which is designed to work with developer tooling like VSCode's Deno extension.

See the Current Account Balances tutorial for a concrete example of modules.

State

The abstract IDerivation class generated within the interfaces module includes additional, experimental methods which can be used for persisting and recovering internal state of the connector.

Consult the generated implementation and feel free to reach out to support if you'd like more information on building stateful TypeScript derivations.

Transformations

A transformation binds a source collection to a derivation, causing its documents to be read and processed by the derivation connector.

Read source documents are first shuffled on a shuffle key to co-locate the processing of documents that have equal shuffle keys. The transformation then processes documents by invoking lambdas: user-defined functions that accept documents as arguments, return documents in response, and potentially update internal task state.

A derivation may have many transformations, and each transformation has a long-lived and stable name. Each transformation independently reads documents from its source collection and tracks its own read progress. More than one transformation can read from the same source collection, and transformations may also source from their own derivation, enabling cyclic data-flows and graph algorithms.

Transformations may be added to or removed from a derivation at any time. This makes it possible to, for example, add a new collection into an existing multi-way join, or gracefully migrate to a new source collection without incurring downtime. However, renaming a running transformation is not possible. If attempted, the old transformation is dropped and a new transformation under the new name is created, which begins reading its source collection all over again.

graph LR;
  d[Derivation];
  t[Transformation];
  s[Internal State];
  l[Lambda];
  c[Sourced Collection];
  o[Derived Collection];
  d-- has many -->t;
  d-- has one -->s;
  d-- has one -->o;
  c-- reads from -->t;
  t-- invokes -->l;
  l-- updates -->s;
  s-- queries -->l;
  l-- publishes to -->o;

Sources

The source of a transformation is a collection. As documents are published into the source collection, they are continuously read and processed by the transformation.

A partition selector may be provided to process only a subset of the source collection's logical partitions. Selectors are efficient: only partitions that match the selector are read, and Flow can cheaply skip over partitions that don't.

Derivations re-validate their source documents against the source collection's schema as they are read. This is because collection schemas may evolve over time, and could have inadvertently become incompatible with historical documents of the source collection. Upon a schema error, the derivation will pause and give you an opportunity to correct the problem.

Shuffles

As each source document is read, it's shuffled — or equivalently, mapped — on an extracted key.

If you're familiar with data shuffles in tools like MapReduce, Apache Spark, or Flink, the concept is very similar. Flow catalog tasks scale out into multiple shards, each running in parallel on different physical machines, where each shard processes a subset of source documents.

Shuffles let Flow identify the shard that should process a particular source document, in order to co-locate that processing with other documents it may need to know about.

For example, transforms of the Approving Transfers example shuffle on either /sender or /recipient in order to process documents that debit or credit accounts on the specific shard that is uniquely responsible for maintaining the balance of a given account.

graph LR;
  subgraph s1 [Source Partitions]
    p1>acmeBank/transfers/part-1];
    p2>acmeBank/transfers/part-2];
  end
  subgraph s2 [Derivation Task Shards]
    t1([derivation/shard-1]);
    t2([derivation/shard-2]);
  end
  p1-- sender: alice -->t1;
  p1-- recipient: bob -->t2;
  p2-- recipient: alice -->t1;
  p2-- sender: bob -->t2;
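The routing idea can be sketched in TypeScript as a deterministic function from an extracted shuffle key to a shard. Flow's actual key-to-shard assignment is not described in this section, so the hash (an FNV-1a-style hash over UTF-16 code units) and the modulo step are assumptions chosen purely for simplicity, and both function names are hypothetical.

```typescript
// FNV-1a-style hash over the UTF-16 code units of a string.
// Used here only as a simple, deterministic hash function.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Pick the shard for a document from its extracted shuffle key.
function shardFor(shuffleKey: Array<string | number>, shardCount: number): number {
  return fnv1a(JSON.stringify(shuffleKey)) % shardCount;
}
```

A debit shuffled on /sender and a credit shuffled on /recipient both extract the key ["alice"] for alice's account, so `shardFor` sends them to the same shard, which is what lets one shard own that account's balance.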

Flow offers three modes for configuring document shuffles: key, any, and lambda.

shuffle: key

Shuffle keys are defined as an array of JSON pointers to locations that should be extracted from your source documents. This array forms the composite key over which your documents are shuffled:

transforms:
  - name: fromOrders
    source: acmeCo/orders
    shuffle:
      key: [/item/product_id, /customer_id]
    # Flow guarantees that the same shard will process the user's lambda
    # for all instances of a specific (product ID, customer ID) tuple.
    lambda: ...

If a derivation has more than one transformation, the shuffle keys of all transformations must align with one another in terms of the extracted key types (string vs integer) as well as the number of components in a composite key.

For example, one transformation couldn't shuffle transfers on [/id] while another shuffles on [/sender], because sender is a string and id an integer.

Similarly mixing a shuffle of [/sender] alongside [/sender, /recipient] is prohibited because the keys have different numbers of components.
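The two alignment rules can be expressed as a small check over the extracted key types of two transformations. This TypeScript sketch only restates the rules above; the `KeyType` union and the `checkAlignment` helper are hypothetical and do not reflect how Flow reports such errors.

```typescript
// Hypothetical set of scalar types that a shuffle key component may have.
type KeyType = "string" | "integer" | "boolean";

// Returns a description of the mismatch, or null if the keys align.
function checkAlignment(a: KeyType[], b: KeyType[]): string | null {
  if (a.length !== b.length) {
    return "keys have " + a.length + " and " + b.length + " components";
  }
  for (let i = 0; i < a.length; i++) {
    if (a[i] !== b[i]) {
      return "component " + i + " is " + a[i] + " in one key and " + b[i] + " in the other";
    }
  }
  return null;
}
```

Shuffling on [/sender] in one transformation and [/recipient] in another passes this check, because both keys have a single string component.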

shuffle: any

If your lambda doesn't rely on any task state then it may not matter which task shard processes a given source document. In these instances you can use shuffle: any, which allows source documents to be processed by any available task shard.

This is common for transformation lambdas which perform basic filtering or mapping of source documents and which don't require any joined task state.

transforms:
  - name: fromOrders
    source: acmeCo/orders
    shuffle: any
    # The user's lambda is a pure function and can be evaluated by any available shard.
    lambda:
      SELECT $customer_id, $item_price WHERE $item_price > 100;

shuffle: lambda

Warning

Computed shuffles are in active development and are not yet functional.

Your source documents may not always contain an appropriate value to shuffle upon. For instance, you might want to shuffle on product ID and order date, but your source documents contain only an order timestamp field.

You can use shuffle: lambda to define a function that maps your source document into the appropriate shuffle key:

transforms:
  - name: fromOrders
    source: acmeCo/orders
    shuffle:
      lambda: SELECT $product_id, DATE($order_timestamp);
    # Flow guarantees that the same shard will process the user's lambda
    # for all instances of a specific (product ID, date) tuple.
    lambda: ...

Your shuffle lambda must return exactly one row, and its columns and types must align with the other shuffles of your derivation transformations.

Flow must know the types of your composite shuffle key. In most cases it will infer these types from the shuffle: key of another transformation. If you have no shuffle: key transformations, Flow will ask that you explicitly tell it your shuffle types:

derive:
  using:
    sqlite: {}
  shuffleKeyTypes: [integer, string]
  transforms:
    - name: fromOrders
      source: acmeCo/orders
      shuffle:
        lambda: SELECT $product_id, DATE($order_timestamp);
      lambda: ...

Lambdas

Lambdas are user-defined functions that are invoked by transformations. They accept documents as arguments and return transformed documents in response. Lambdas can update internal task state, publish documents into the derived collection, or both.

Lambdas are "serverless": Flow manages the execution and scaling of your transformation lambdas on your behalf.

Processing order

Transformations may simultaneously read from many source collections, or even read from the same source collection multiple times.

Roughly speaking, the derivation will globally process transformations and their source documents in the time-based order in which the source documents were originally written to their source collections. This means that a derivation started a month ago and a new copy of the derivation started today will process documents in the same order and arrive at the same result. Derivations are repeatable.

More precisely, processing order is stable for each individual shuffle key, though different shuffle keys may process in different orders if more than one task shard is used.

Processing order can be adjusted through a read delay or a differentiated read priority.

Read delay

A transformation can define a read delay, which will hold back the processing of its source documents until the time delay condition is met. For example, a read delay of 15 minutes would mean that a source document cannot be processed until it was published at least 15 minutes ago. If the derivation is working through a historical backlog of source documents, then a delayed transformation will respect its ordering delay relative to the publishing times of other historical documents also being read.

Event-driven workflows are a great fit for reacting to events as they occur, but aren’t terribly good at taking action when something hasn’t happened:

  • A user adds a product to their cart, but then doesn’t complete a purchase.
  • A temperature sensor stops producing its expected, periodic measurements.

A common pattern for tackling these workflows in Flow is to read a source collection without a delay and update an internal state. Then, read a collection with a read delay and determine whether the desired action has happened or not. For example, source from a collection of sensor readings and index the last timestamp of each sensor. Then, source the same collection again with a read delay: if the indexed timestamp isn't more recent than the delayed source reading, the sensor failed to produce a measurement.
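The sensor pattern above can be sketched as a small standalone simulation. This is a minimal illustration, not Flow's API: the `Reading` type, the `detectSilentSensors` function, and the use of minutes as the time unit are all assumptions made for the example. Each reading is handled twice, once immediately to index the latest timestamp, and once after the delay to check whether anything newer has arrived.

```typescript
// Illustrative simulation only; none of these names come from Flow's API.
interface Reading {
  sensorId: string;
  publishedAt: number; // publish time, in minutes (assumed unit)
}

// Returns one alert per reading that was not followed by a newer reading
// from the same sensor within `delayMinutes`.
function detectSilentSensors(readings: Reading[], delayMinutes: number): string[] {
  // Stand-in for internal task state: last timestamp seen per sensor.
  const lastSeen = new Map<string, number>();

  // Every reading is read twice: undelayed at its publish time,
  // and delayed at publish time + delayMinutes.
  const events = readings.flatMap((reading) => [
    { at: reading.publishedAt, delayed: false, reading },
    { at: reading.publishedAt + delayMinutes, delayed: true, reading },
  ]);
  // Process in time order; on ties, undelayed reads go first.
  events.sort((a, b) => a.at - b.at || Number(a.delayed) - Number(b.delayed));

  const alerts: string[] = [];
  for (const { delayed, reading } of events) {
    const latest = lastSeen.get(reading.sensorId) ?? -Infinity;
    if (!delayed) {
      // Undelayed read: index the most recent timestamp.
      lastSeen.set(reading.sensorId, Math.max(latest, reading.publishedAt));
    } else if (!(latest > reading.publishedAt)) {
      // Delayed read: nothing newer was indexed, so the sensor went quiet.
      alerts.push(`sensor ${reading.sensorId}: no reading since minute ${reading.publishedAt}`);
    }
  }
  return alerts;
}

const demoAlerts = detectSilentSensors(
  [
    { sensorId: "a", publishedAt: 0 },
    { sensorId: "a", publishedAt: 10 },
    { sensorId: "a", publishedAt: 20 },
    { sensorId: "b", publishedAt: 0 },
  ],
  15,
);
console.log(demoAlerts);
```

In this finite simulation the final reading of each sensor always raises an alert, because no later reading exists; in a continuously running derivation that is exactly the case you want to detect.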

Flow read delays are very efficient and scale better than managing very large numbers of fine-grain timers.

See Grouped Windows of Transfers for an example using a read delay.

Learn more from the Citi Bike "idle bikes" example.

Read priority

Sometimes it's necessary for all documents of a source collection to be processed by a transformation before any documents of some other source collection are processed, regardless of their relative publishing time. For example, a collection may have corrections that should be applied before the historical data of another collection is re-processed.

Transformation priorities allow you to express the relative processing priority of a derivation's various transformations. When priorities are not equal, all available source documents of a higher-priority transformation are processed before any source documents of a lower-priority transformation.
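As a rough sketch of the ordering rule just described, the snippet below sorts a batch of already-available documents by priority first and publish time second. The `Pending` type and `processingOrder` function are hypothetical names for this illustration, and the model is simplified: it assumes all documents are available at once, whereas a running derivation applies the rule continuously.

```typescript
// Illustrative model only; not part of Flow's API.
interface Pending {
  transform: string;   // name of the transformation reading the document
  priority: number;    // higher values are processed first
  publishedAt: number; // original publish time of the source document
}

// Higher-priority documents come first; within the same priority,
// documents keep their publish-time order.
function processingOrder(docs: Pending[]): Pending[] {
  return [...docs].sort(
    (a, b) => b.priority - a.priority || a.publishedAt - b.publishedAt,
  );
}

const ordered = processingOrder([
  { transform: "fromHistory", priority: 0, publishedAt: 1 },
  { transform: "fromCorrections", priority: 1, publishedAt: 9 },
  { transform: "fromHistory", priority: 0, publishedAt: 2 },
  { transform: "fromCorrections", priority: 1, publishedAt: 5 },
]);
console.log(ordered.map((d) => `${d.transform}@${d.publishedAt}`));
```

Here both corrections are processed before any historical document, even though the historical documents were published earlier.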

Internal State

Derivation tasks often require an internal state, perhaps to hold a partial aggregation or join result. Internal state is not a direct part of the output of a derivation. Instead, transformation lambdas query and update internal state as they process source documents and return derived documents.

For SQLite derivations, the entire SQLite database is the internal state of the task. TypeScript derivations can use in-memory states with a recovery and checkpoint mechanism. Estuary intends to offer additional mechanisms for automatic internal state snapshot and recovery in the future.

The exact nature of internal task states varies, but under the hood they're backed by a replicated embedded RocksDB instance which is co-located with the task shard execution contexts that Flow manages. As contexts are assigned and re-assigned, their state databases travel with them.

If a task shard needs to be scaled out, Flow is able to perform an online split, which cheaply clones its state database into two new databases — and paired shards — which are re-assigned to other machines.

Where to accumulate?

Derivation collection schemas may have reduction annotations, and lambdas can be combined with reductions in interesting ways. You may be familiar with map and reduce functions built into languages like Python, JavaScript, and many others, or have used tools like MapReduce or Spark. In functional terms, lambdas you write within Flow are "mappers," and reductions are always done by the Flow runtime using your schema annotations.

This means that, when you implement a derivation, you get to choose where accumulation will happen:

  1. Your lambdas can update and query aggregates stored in internal task state. Approving Transfers is an example that maintains account balances in a SQLite table.
  2. Or, your lambdas can compute changes of an aggregate, which are then reduced by Flow using reduction annotations. Current Account Balances is an example that combines a lambda with a reduce annotation.

These two approaches can produce equivalent results, but they do so in very different ways.

Accumulate in Internal Task State

You can accumulate using the internal state of your derivation: for instance, by using an internal table within your SQLite derivation. You then write lambdas which update that state, or query it to publish derived documents.

For example, consider a collection that’s summing a value:

Time  State  Lambdas                            Derived Document
T0    0      UPDATE val = val + 5; SELECT val;  5
T1    5      UPDATE val = val - 1; SELECT val;  4
T2    4      UPDATE val = val + 2; SELECT val;  6
T3    6
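The summing example can be mimicked in a few lines of TypeScript. This is a sketch under stated assumptions: a local variable stands in for the SQLite table, and `accumulateInState` and `applyLastWriteWins` are hypothetical helper names, not Flow functions. It also shows why complete answers tolerate redelivery: overwriting a stored value with the same document twice changes nothing.

```typescript
// Illustrative only: `val` stands in for a column of the task's internal state.
function accumulateInState(deltas: number[]): number[] {
  let val = 0; // internal state, starts at 0
  const published: number[] = [];
  for (const delta of deltas) {
    val += delta;        // UPDATE val = val + delta;
    published.push(val); // SELECT val;
  }
  return published;
}

// A non-transactional store that simply overwrites its value with each
// document it receives (hypothetical last-write-wins destination).
function applyLastWriteWins(docs: number[]): number | undefined {
  let stored: number | undefined;
  for (const doc of docs) stored = doc;
  return stored;
}

const totals = accumulateInState([5, -1, 2]);
console.log(totals); // running totals: 5, 4, 6

// Redelivering the middle document (at-least-once) leaves the final value unchanged.
console.log(applyLastWriteWins([5, 4, 4, 6]));
```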

Using a derivation's internal state is a great solution if you expect to materialize the derived collection into a non-transactional store. That's because its documents are complete statements of the current answer, and can be correctly applied to systems that support only at-least-once semantics.

They’re also well-suited for materializations into endpoints that aren't stateful, such as Pub/Sub systems or Webhooks.

Accumulate in a Database

To accumulate in your materialization endpoint, such as a database, you define a derivation with a reducible schema and implement lambdas which publish the changes to a current answer. The Flow runtime then uses your reduction annotations to combine the documents published into your derived collection.

Later, when the collection is materialized, your reduction annotations are applied again to reduce each collection document into a final, fully-reduced value for each collection key that's kept up to date in the materialized table.

A key insight is that the database is the only stateful system in this scenario. The derivation itself is stateless, with lambdas that are pure functions, which is typically extremely performant.

Returning to our summing example:

Time  DB  Lambdas     Derived Document
T0    0   SELECT 5;   5
T1    5   SELECT -1;  -1
T2    4   SELECT 2;   2
T3    6
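The same sum, accumulated in the database instead, might be modeled as follows. This is a simplified sketch, not Flow's runtime: `publishDelta`, `reduceSum`, and `materializeSum` are assumed names, with `reduceSum` standing in for a sum reduction annotation on the collection schema. The lambda holds no state and only publishes the change.

```typescript
// Illustrative only: the lambda is a pure function that publishes a change.
const publishDelta = (delta: number): number => delta; // e.g. SELECT 5;

// Stand-in for a sum reduction annotation applied by the runtime.
const reduceSum = (lhs: number, rhs: number): number => lhs + rhs;

// Returns the database value after each derived document is reduced into it.
function materializeSum(deltas: number[]): number[] {
  let db = 0; // the only stateful piece in this scenario
  const history: number[] = [];
  for (const delta of deltas) {
    db = reduceSum(db, publishDelta(delta));
    history.push(db);
  }
  return history;
}

console.log(materializeSum([5, -1, 2])); // database values: 5, 4, 6
```

The derived documents are 5, -1, and 2, yet the database ends at the same value of 6 as the internal-state approach.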

This works especially well when materializing into a transactional database. Flow couples its processing transactions with corresponding database transactions, ensuring end-to-end “exactly once” semantics.

When materializing into a non-transactional store, Flow is only able to provide weaker “at least once” semantics; it’s possible that a document may be combined into a database value more than once. Whether that’s a concern depends a bit on the task at hand. Some reductions like merge can be applied repeatedly without changing the result, while in other use cases approximations are acceptable. For the summing example above, "at-least-once" semantics could give an incorrect result.
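To make the difference concrete, the sketch below redelivers one document under two reductions. It is an illustration under assumptions: `sumAll` models a sum reduction, and `mergeAll` uses a shallow object spread as a simplified stand-in for a merge-style reduction, which is not necessarily how Flow's merge strategy is implemented.

```typescript
// Illustrative only: simplified stand-ins for reduction strategies.
type Doc = Record<string, number>;

const sumAll = (docs: number[]): number =>
  docs.reduce((lhs, rhs) => lhs + rhs, 0);

// Shallow merge where later fields overwrite earlier ones.
const mergeAll = (docs: Doc[]): Doc =>
  docs.reduce<Doc>((lhs, rhs) => ({ ...lhs, ...rhs }), {});

// Exactly-once delivery versus a delivery where one document arrives twice.
console.log(sumAll([5, -1, 2]));     // 6
console.log(sumAll([5, -1, -1, 2])); // 5: the duplicate corrupts the sum

console.log(mergeAll([{ a: 1 }, { b: 2 }]));           // { a: 1, b: 2 }
console.log(mergeAll([{ a: 1 }, { b: 2 }, { b: 2 }])); // same result despite the duplicate
```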

Learn more in the derivation pattern examples in Flow's repository.