Skip to main content

Derivations

A derivation is a collection that continuously derives its documents from transformations that are applied to one or more other source collections. It defines the collection of derived documents as well as a catalog task. The task processes documents of source collections as they become available, transforming them into updates of the derived collection.

In addition to their collection, derivations are defined by their transformations and registers.

Specification

A derivation is specified as a regular collection with an additional derivation stanza:

collections:
# The unique name of the derivation.
acmeCo/my/derivation:
schema: my-schema.yaml
key: [/key]

# Presence of a `derivation` stanza makes this collection a derivation.
# Type: object
derivation:

# Register definition of the derivation.
# If not provided, registers have an unconstrained schema
# and initialize to the `null` value.
# Optional, type: object
register:

# JSON Schema of register documents. As with collection schemas,
# this is either an inline definition or a relative URL reference.
# Required, type: string (relative URL form) or object (inline form)
schema:
type: integer

# Initial value taken by a register which has never been updated before.
# Optional, default: null
initial: 0

# Transformations of the derivation,
# specified as a map of named transformations.
transform:

# Unique name of the transformation, containing only Unicode
# Letters and Numbers (no spaces or punctuation).
myTransformName:

# Source collection read by this transformation.
# Required, type: object
source:
# Name of the collection to be read.
# Required.
name: acmeCo/my/source/collection
# JSON Schema to validate against the source collection.
# If not set, the schema of the source collection is used.
# Optional, type: string (relative URL form) or object (inline form)
schema: {}
# Partition selector of the source collection.
# Optional. Default is to read all partitions.
partitions: {}

# Delay applied to sourced documents before being processed
# by this transformation.
# Default: No delay, pattern: ^\\d+(s|m|h)$
readDelay: "48h"

# Shuffle determines the key by which source documents are
# shuffled (mapped) to a register.
# Optional, type: object.
# If not provided, documents are shuffled on the source collection key.
shuffle:
# Key is a composite key which is extracted from documents
# of the source.
key: [/shuffle/key/one, /shuffle/key/two]

# Update lambda of the transformation.
# Optional, type: object
update: {lambda: typescript}

# Publish lambda of the transformation.
# Optional, type: object
publish: {lambda: typescript}

# Priority applied to processing documents of this transformation
# relative to other transformations of the derivation.
# Default: 0, integer >= 0
priority: 0

Background

The following sections will refer to the following common example to illustrate concepts.

Suppose you have an application through which users send one another some amount of currency, like in-game tokens or dollars or digital kittens:

collections:
# Collection of 💲 transfers between accounts:
# {id: 123, sender: alice, recipient: bob, amount: 32.50}
acmeBank/transfers:
schema: transfers.schema.yaml
key: [/id]

There are many views over this data that you might require, such as summaries of sender or receiver activity, or current account balances within your application.

Transformations

A transformation binds a source collection to a derivation. As documents of the source collection arrive, the transformation processes the document to publish new documents, update a register, or both.

Read source documents are shuffled on a shuffle key to co-locate the processing of documents that have equal shuffle keys. The transformation then processes documents by invoking lambdas: user-defined functions that accept documents as arguments and return documents in response.

A derivation may have many transformations, and each transformation has a long-lived and stable name. Each transformation independently reads documents from its source collection and tracks its own read progress. More than one transformation can read from the same source collection, and transformations may also source from their own derivation, enabling cyclic data-flows and graph algorithms.

Transformations may be added to or removed from a derivation at any time. This makes it possible to, for example, add a new collection into an existing multi-way join, or gracefully migrate to a new source collection without incurring downtime. However, renaming a running transformation is not possible. If attempted, the old transformation is dropped and a new transformation under the new name is created, which begins reading its source collection all over again.

graph LR; d[Derivation]; t[Transformation]; r[Registers]; p[Publish λ]; u[Update λ]; c[Sourced Collection]; d-- has many -->t; t-- reads from -->c; t-- invokes -->u; t-- invokes -->p; u-- updates -->r; r-- reads -->p; d-- indexes -->r;

Sources

The source of a transformation is a collection. As documents are published into the source collection, they are continuously read and processed by the transformation.

A partition selector may be provided to process only a subset of the source collection's logical partitions. Selectors are efficient: only partitions that match the selector are read, and Flow can cheaply skip over partitions that don't.

Derivations re-validate their source documents against the source collection's schema as they are read. This is because collection schemas may evolve over time, and could have inadvertently become incompatible with historical documents of the source collection. Upon a schema error, the derivation will pause and give you an opportunity to correct the problem.

You may also provide an alternative source schema. Source schemas aide in processing third-party sources of data that you don't control, which can have unexpected schema changes without notice. You may want to capture this data with a minimal and very permissive schema. Then, a derivation can apply a significantly stricter source schema, which verifies your current expectations of what the data should be. If those expectations turn out to be wrong, little harm is done: your derivation is paused but the capture continues to run. You must simply update your transformations to account for the upstream changes and then continue without any data loss.

Shuffles

As each source document is read, it's shuffled — or equivalently, mapped — on an extracted key.

If you're familiar with data shuffles in tools like MapReduce, Apache Spark, or Flink, the concept is very similar. Flow catalog tasks scale to run across many machines at the same time, where each machine processes a subset of source documents. Shuffles let Flow know how to group documents so that they're co-located, which can increase processing efficiency and reduce data volumes. They are also used to map source documents to registers.

graph LR; subgraph s1 [Source Partitions] p1>acmeBank/transfers/part-1]; p2>acmeBank/transfers/part-2]; end subgraph s2 [Derivation Task Shards] t1([task/shard-1]); t2([task/shard-2]); end p1-- sender: alice -->t1; p1-- sender: bob -->t2; p2-- sender: alice -->t1; p2-- sender: bob -->t2;

If you don't provide a shuffle key, Flow will shuffle on the source collection key, which is typically what you want.

If a derivation has more than one transformation, the shuffle keys of all transformations must align with one another in terms of the extracted key types (string or integer) as well as the number of components in a composite key. For example, one transformation couldn't shuffle transfers on [/id] while another shuffles on [/sender], because sender is a string and id an integer. Similarly mixing a shuffle of [/sender] alongside [/sender, /recipient] is prohibited because the keys have different numbers of components. However, one transformation can shuffle on [/sender] while another shuffles on [/recipient], as in the examples below.

Publish lambdas

A publish lambda publishes documents into the derived collection.

To illustrate first with an example, suppose you must know the last transfer from each sender that was over $100:

import:
- transfers.flow.yaml

collections:
acmeBank/last-large-send:
schema: transfers.schema.yaml
key: [/sender]

derivation:
transform:
fromTransfers:
source:
name: acmeBank/transfers
publish:
lambda: typescript

This transformation defines a TypeScript publish lambda, which is implemented in an accompanying TypeScript module. The lambda is invoked as each source transfer document arrives. It is given the source document, and also includes the a _register and _previous register, which are not used here. Registers are discussed in depth below. The lambda outputs zero or more documents, each of which must conform to the derivation's schema.

As this derivation's collection is keyed on /sender, the last published document (the last large transfer) of each sender is retained. If it were instead keyed on /id, then all transfers with large amounts would be retained. In SQL terms, the collection key acts as a GROUP BY.

tip

Flow will initialize a TypeScript module for your lambdas if one doesn't exist, with stubs of the required interfaces and TypeScript types that match your schemas. You just write the function body.

Learn more about TypeScript generation


Derivation collection schemas may have reduction annotations, and publish lambdas can be combined with reductions in interesting ways.

You may be familiar with map and reduce functions built into languages like Python, JavaScript; and many others, or have used tools like MapReduce or Spark. In functional terms, lambdas you write within Flow are "mappers," and reductions are always done by the Flow runtime using your schema annotations.

Suppose you need to know the running account balances of your users given all of their transfers thus far. Tackle this by reducing the final account balance for each user from all of the credit and debit amounts of their transfers:

import:
- transfers.flow.yaml

collections:
acmeBank/balances:
schema:
type: object
required: [user]
reduce: { strategy: merge }
properties:
user: { type: string }
balance:
type: number
reduce: { strategy: sum }
key: [/user]

derivation:
transform:
fromTransfers:
source:
name: acmeBank/transfers
publish:
lambda: typescript

Registers

Registers are the internal memory of a derivation. They are a building block that enable derivations to tackle advanced stateful streaming computations like multi-way joins, windowing, and transaction processing. As we've already seen, not all derivations require registers, but they are essential for a variety of important use cases.

Each register is a document with a user-defined schema. Registers are keyed, and every derivation maintains an index of keys and their corresponding register documents. Every source document is mapped to a specific register document through its extracted shuffle key.

For example, when shuffling acmeBank/transfers on [/sender], each account ("alice", "bob", or "carol") is allocated its own register. If you instead shuffle on [/sender, /recipient] then each pair of accounts ("alice -> bob", "alice -> carol", "bob -> carol") is allocated a register.

Registers are best suited for relatively small, fast-changing documents that are shared within and across the transformations of a derivation. The number of registers indexed within a derivation may be very large, and if a register has never before been used, it starts with a user-defined initial value. From there, registers may be modified through an update lambda.

info

Under the hood, registers are backed by replicated, embedded RocksDB instances, which co-locate with the lambda execution contexts that Flow manages. As contexts are assigned and re-assigned, their register databases travel with them.

If any single RocksDB instance becomes too large, Flow is able to perform an online split, which subdivides its contents into two new databases — and paired execution contexts — which are re-assigned to other machines.

Update lambdas

An update lambda transforms a source document into an update of the source document's register.

To again illustrate through an example, suppose your compliance department wants you to flag the first transfer a sender sends to a new recipient. You achieve this by shuffling on pairs of [/sender, /recipient] and using a register to track whether this account pair has been seen before:

import:
- transfers.flow.yaml

collections:
acmeBank/first-send:
schema: transfers.schema.yaml
key: [/id]

derivation:
# We'll store a `true/false` boolean in our register documents,
# which is initially `false` and becomes `true` after the first transfer.
register:
schema: { type: boolean }
initial: false

transform:
fromTransfers:
source:
name: acmeBank/transfers
# Shuffle so that each account pair
# is allocated its own register.
shuffle:
key: [/sender, /recipient]
update:
lambda: typescript
publish:
lambda: typescript

This transformation uses both a publish and an update lambda, implemented in an accompanying TypeScript module. The update lambda is invoked first for each source document, and it returns zero or more documents, which each must conform to the derivation's register schema (in this case, a simple boolean).

The publish lambda is invoked next, and is given the source document as well as the before (previous) and after (_register) values of the updated register. In this case, we don't need the after value: our update lambda implementation implies that it's always true. The before value, however, tells us whether this was the very first update of this register, and by implication was the first transfer for this pair of accounts.

sequenceDiagram autonumber Derivation->>Update λ: update({sender: alice, recipient: bob})? Update λ-->>Derivation: return "true" Derivation->>Registers: lookup(key = [alice, bob])? Registers-->>Derivation: not found, initialize as "false" Derivation-->>Derivation: Register: "false" => "true" Derivation-)Registers: store(key = [alice, bob], value = "true") Derivation->>Publish λ: publish({sender: alice, recipient: bob}, register = "true", previous = "false")? Publish λ-->>Derivation: return {sender: alice, recipient: bob}
FAQ

Why not have one lambda that can return a register update and derived documents?

Performance. Update and publish are designed to be parallelized and pipelined over many source documents simultaneously, while still giving the appearance and correctness of lambdas are invoked in strict serial order. Notice that (1) above doesn't depend on actually knowing the register value, which doesn't happen until (4). Many calls like (1) can also happen in parallel, so long as their applications to the register value (5) happen in the correct order. In comparison, a single-lambda design would require Flow to await each invocation before it can begin the next.


Register schemas may also have reduction annotations, and documents returned by update lambdas are reduced into the current register value.

The compliance department reached out again, and this time they need you to identify transfers where the sender's account had insufficient funds.

You manage this by tracking the running credits and debits of each account in a register. Then, you enrich each transfer with the account's current balance and whether the account was overdrawn:

import:
- transfers.flow.yaml

collections:
acmeBank/flagged-transfers:
schema:
# Extend transfer schema with `balance` and `overdrawn` fields.
$ref: transfers.schema.yaml
required: [balance, overdrawn]
properties:
balance: { type: number }
overdrawn: { type: boolean }
key: [/id]

projections:
# Logically partition on transfers which are flagged as overdrawn.
overdrawn:
location: /overdrawn
partition: true

derivation:
# Registers track the current balance of each account.
register:
schema:
type: number
reduce: { strategy: sum }
initial: 0

transform:
fromTransferSender:
source: { name: acmeBank/transfers }
shuffle: { key: [/sender] }
# Debit the sender's register balance.
update: { lambda: typescript }
# Publish transfer enriched with current sender balance.
publish: { lambda: typescript }

fromTransferRecipient:
source: { name: acmeBank/transfers }
shuffle: { key: [/recipient] }
# Credit the recipient's register balance.
update: { lambda: typescript }

Source transfers are read twice. The first read shuffles on /recipient to track account credits, and the second shuffles on /sender to track account debits and to publish enriched transfer events. Update lambdas return the amount of credit or debit, and these amounts are summed into a derivation register keyed on the account.

sequenceDiagram autonumber Derivation->>Registers: lookup(key = alice)? Registers-->>Derivation: not found, initialize as 0 Derivation->>Update λ: update({recipient: alice, amount: 50, ...})? Update λ-->>Derivation: return +50 Derivation->>Update λ: update({sender: alice, amount: 75, ...})? Update λ-->>Derivation: return -75 Derivation-->>Derivation: Register: 0 + 50 => 50 Derivation-->>Derivation: Register: 50 - 75 => -25 Derivation->>Publish λ: publish({sender: alice, amount: 75, ...}, register = -25, previous = 50)? Publish λ-->>Derivation: return {sender: alice, amount: 75, balance: -25, overdrawn: true}

Processing order

Derivations may have multiple transformations that simultaneously read from different source collections, or even multiple transformations that read from the same source collection.

Roughly speaking, the derivation will globally process transformations and their source documents in the time-based order in which the source documents were originally written to their source collections. This means that a derivation started a month ago and a new copy of the derivation started today, will process documents in the same order and arrive at the same result. Derivations are repeatable.

More precisely, processing order is stable for each individual shuffle key, though different shuffle keys may process in different orders if more than one task shard is used.

Processing order can be attenuated through a read delay or differing transformation priority.

Read delay

A transformation can define a read delay, which will hold back the processing of its source documents until the time delay condition is met. For example, a read delay of 15 minutes would mean that a source document cannot be processed until it was published at least 15 minutes ago. If the derivation is working through a historical backlog of source documents, than a delayed transformation will respect its ordering delay relative to the publishing times of other historical documents also being read.

Event-driven workflows are a great fit for reacting to events as they occur, but aren’t terribly good at taking action when something hasn’t happened:

  • A user adds a product to their cart, but then doesn’t complete a purchase.
  • A temperature sensor stops producing its expected, periodic measurements.

A common pattern for tackling these workflows in Flow is to read a source collection without a delay and update a register. Then, read a collection with a read delay and determine whether the desired action has happened or not. For example, source from a collection of sensor readings and index the last timestamp of each sensor in a register. Then, source the same collection again with a read delay: if the register timestamp isn't more recent than the delayed source reading, the sensor failed to produce a measurement.

Flow read delays are very efficient and scale better than managing very large numbers of fine-grain timers.

Learn more from the Citi Bike idle bikes example

Read priority

Sometimes it's necessary for all documents of a source collection to be processed by a transformation before any documents of some other source collection are processed, regardless of their relative publishing time. For example, a collection may have corrections that should be applied before the historical data of another collection is re-processed.

Transformation priorities allow you to express the relative processing priority of a derivation's various transformations. When priorities are not equal, all available source documents of a higher-priority transformation are processed before any source documents of a lower-priority transformation.

Where to accumulate?

When you build a derived collection, you must choose where accumulation will happen: whether Flow will reduce into documents held within your materialization endpoint, or within the derivation's registers. These two approaches can produce equivalent results, but they do so in very different ways.

Accumulate in your database

To accumulate in your materialization endpoint, such as a database, you define a derivation with a reducible schema and use only publish lambdas and no registers. The Flow runtime uses your reduction annotations to combine published documents, which are written to the collection. It then fully reduces collection documents into the values stored in the database. This keeps the materialized table up to date.

A key insight is that the database is the only stateful system in this scenario, and Flow uses reductions in two steps:

  1. To combine many published documents into intermediate delta documents, which are the documents written to collection storage.
  2. To reduce delta states into the final database-stored document.

For example, consider a collection that’s summing a value:

TimeDBLambdasDerived Document
T00publish(2, 1, 2)5
T15publish(-2, 1)-1
T24publish(3, -2, 1)2
T36publish()

This works especially well when materializing into a transactional database. Flow couples its processing transactions with corresponding database transactions, ensuring end-to-end “exactly once” semantics.

When materializing into a non-transactional store, Flow is only able to provide weaker “at least once” semantics; it’s possible that a document may be combined into a database value more than once. Whether that’s a concern depends a bit on the task at hand. Some reductions can be applied repeatedly without changing the result (they're "idempotent"), while in other use cases approximations are acceptable. For the summing example above, "at-least-once" semantics could give an incorrect result.

Accumulate in registers

To accumulate in registers, you use a derivation that defines a reducible register schema that's updated through update lambdas. The Flow runtime allocates, manages, and scales durable storage for registers; you don’t have to. Then you use publish lambdas to publish a snapshot of your register value into your collection.

Returning to our summing example:

TimeRegisterLambdasDerived Document
T00update(2, 1, 2), publish(register)5
T15update(-2, 1), publish(register)4
T24update(3, -2, 1), publish(register)6
T36update()

Register derivations are a great solution for materializations into non-transactional stores because the documents they produce can be applied multiple times without breaking correctness.

They’re also well-suited for materializations into endpoints that aren't stateful, such as pub/sub systems or Webhooks, because they can produce fully reduced values as stand-alone updates.

Learn more in the derivation pattern examples of Flow's repository