Derivations
How to transform and join data in Flow
This documentation covers most use cases of transforms. You can find additional information about specific features not discussed here by using flowctl json-schema on the command line.
A derivation is any transformation or join that you can perform in Flow, aside from schema-based reduction annotations. Derivations define how the documents of a collection are derived from other collections. A collection without a derivation is referred to as a captured collection.
Derivations in Flow are objects that use the following entities:
# To be nested within a collection definition in a catalog spec.
derivation:

  # A register holds the internal state of a derivation, which can be read and updated by
  # all of its transforms. Any stateful transformation (joins, for example) must use a register
  # to store its state.
  # When reading source documents, each document is mapped to a single register using the
  # shuffle key, or the key of the source collection if a shuffle key is not explicitly defined.
  # "Update" lambdas of the transformation produce updates which are reduced into the register,
  # and a "publish" lambda reads the current (and previous, if updated) register value.
  # Optional, type: object
  register:

    # A register always has an associated JSON schema, which may also use reduction annotations.
    # Schemas may be specified either as URIs or inline, just like for collection schemas.
    # However, it is recommended to use URIs as a best practice and store schemas separately.
    # Details can be found on the "Schemas" page.
    # Required, type: object | string
    schema: {type: [integer, 'null']}

    # Registers allow an initial value to be passed in for a document that has never been updated.
    # default: null
    initial: null

  # Defines the set of transformations that produce the documents in this collection.
  # Each transformation reads and shuffles documents of a single source collection, and processes each document
  # through either one or both of a register "update" lambda and a derived document "publish" lambda.
  transform:

    # Name of the transformation, which is used in determining the name of associated TypeScript
    # functions.
    title:

      # Update lambda that maps a source document to register updates.
      # Optional, type: object
      update: {lambda: typescript}

      # Publish lambda that maps a source document and registers into derived documents of the
      # collection.
      # Optional, type: object
      publish: {lambda: typescript}

      # Source collection read by this transformation.
      # Required, type: object
      source:
        # Name of the source collection used by this transformation.
        name: example/collection

      # Shuffle by which source documents are mapped to registers. Composite key of JSON pointers.
      # default: key of source collection.
      shuffle: [/fieldA, /field/B]

      # Delay applied to documents processed by this transformation.
      # Delays are applied as an adjustment to the UUID clock encoded within each document, which
      # is then used to impose a relative ordering of all documents read by this derivation. This
      # means that read delays are applied in a consistent way, even when back-filling over
      # historical documents. When caught up and tailing the source collection, delays also "gate"
      # documents such that they aren't processed until the current wall-time reflects the delay.
      # default: null, pattern: ^\\d+(s|m|h)$
      readDelay: "48h"

      # When all transforms are of equal priority, Flow processes documents according to their
      # associated publishing time, as encoded in the document UUID.
      # However, when one transform has a higher priority than others, then *all* ready documents
      # are processed through the transform before *any* documents of other transforms are processed.
      # default: null, integer >= 0
      priority: 0
The update and publish entries each reference a TypeScript lambda, which is implemented in an accompanying TypeScript file. Learn more on the lambdas page.
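The ordering rules described in the readDelay and priority comments can be illustrated with a small simulation. This is a minimal sketch under stated assumptions, not Flow's implementation: `PendingDoc`, `parseDelayMs`, and `nextDoc` are hypothetical names, and plain millisecond timestamps stand in for the clock encoded in each document's UUID.

```typescript
// Hypothetical model of a document waiting to be processed by a transform.
interface PendingDoc {
  transform: string;   // name of the transform that will process the document
  clockMs: number;     // publish time in ms (stand-in for the UUID clock)
  readDelayMs: number; // the transform's readDelay in ms (0 if unset)
  priority: number;    // the transform's priority (0 if unset)
}

// Parse a readDelay string such as "48h", "2m", or "30s" into milliseconds.
function parseDelayMs(delay: string): number {
  const match = /^(\d+)(s|m|h)$/.exec(delay);
  if (match === null) {
    throw new Error(`invalid readDelay: ${delay}`);
  }
  const unitMs = { s: 1000, m: 60000, h: 3600000 };
  const unit = match[2] as keyof typeof unitMs;
  return Number(match[1]) * unitMs[unit];
}

// Pick the next document to process at wall-clock time `nowMs`.
// A document is "ready" once its delay-adjusted clock is not in the future.
// Among ready documents, higher priority wins; ties are broken by the
// delay-adjusted clock, earliest first.
function nextDoc(pending: PendingDoc[], nowMs: number): PendingDoc | undefined {
  const ready = pending.filter((d) => d.clockMs + d.readDelayMs <= nowMs);
  ready.sort(
    (a, b) =>
      b.priority - a.priority ||
      a.clockMs + a.readDelayMs - (b.clockMs + b.readDelayMs),
  );
  return ready[0];
}
```

In this model a document read through a transform with `readDelay: "48h"` is ordered as if it had been published 48 hours later, which is why the delay behaves the same way during a back-fill as it does when tailing live data.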
Let's take a look at a simple real-life derivation that uses all of the above concepts. This example is part of a catalog that works with Citi-bike data and calls an API to create a list of bikes that haven't moved in two days.
derivation:
  register:
    # Store the most-recent ride timestamp for each bike_id in a register,
    # and default to null if the bike hasn't been ridden before.
    schema: { type: [string, "null"] }
    initial: null

  transform:
    liveRides:
      source:
        name: examples/citi-bike/rides
        # This is an arbitrary pointer to an example collection in our
        # Flow git repository.
      shuffle: { key: [/bike_id] }
      update: { lambda: typescript }

    delayedRides:
      source:
        name: examples/citi-bike/rides
      shuffle: { key: [/bike_id] }
      # Use a 2-day read delay, relative to the document's ingestion.
      # To see read delays in action within a short-lived
      # testing context, try using a smaller value (e.g., 2m).
      readDelay: "48h"
      publish: { lambda: typescript }
The above derivation makes use of all available entities except for priority.
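The YAML only declares that liveRides has an update lambda and delayedRides has a publish lambda; the logic itself lives in TypeScript. The following is a minimal sketch of what those two lambdas might look like. The `Ride` shape, the `end_time` and `idle_since` field names, and the function signatures are assumptions made for illustration (the real types and signatures are described on the lambdas page), and the sketch assumes that a register update simply replaces the previous value.

```typescript
// Hypothetical shape of a ride document; the real schema is defined by the
// examples/citi-bike/rides collection.
interface Ride {
  bike_id: number;
  end_time: string; // timestamp at which the ride ended
}

// Register value: the most recent ride timestamp for a bike, or null.
type Register = string | null;

// Hypothetical shape of a derived "idle bike" document.
interface IdleBike {
  bike_id: number;
  idle_since: string;
}

// Update lambda for liveRides: record this ride's end time in the register
// of its bike_id. Assumes the newest update replaces the stored value.
function liveRidesUpdate(source: Ride): Register[] {
  return [source.end_time];
}

// Publish lambda for delayedRides: sees the same ride, but 48 hours later.
// If the register still holds this ride's timestamp, no later ride has
// updated it, so the bike has not moved during the delay.
function delayedRidesPublish(source: Ride, register: Register): IdleBike[] {
  if (register === source.end_time) {
    return [{ bike_id: source.bike_id, idle_since: source.end_time }];
  }
  return [];
}
```

Because both transforms shuffle on /bike_id, the delayed read of a ride consults the same register that the live read of that bike's later rides has been updating, which is what makes the comparison meaningful.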

Registers

Under the hood, registers are backed by replicated, embedded RocksDB instances which co-locate one-to-one with the lambda execution contexts that Flow manages. As contexts are assigned and re-assigned, their DBs travel with them.
If any single RocksDB instance becomes too large, Flow can perform an online split, which subdivides its contents into two new databases (and paired execution contexts) that are re-assigned to other machines.
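Storage aside, the register behavior described in the spec comments (one register per shuffle key, updates reduced into it, current and previous values available to publish) can be modeled in memory. This is only a conceptual sketch: `RegisterStore`, `shuffleKey`, and `resolvePointer` are hypothetical names, the reducer is supplied by the caller rather than taken from schema annotations, and a `Map` stands in for the RocksDB instance.

```typescript
// Resolve a JSON pointer such as "/field/B" against a document.
function resolvePointer(doc: unknown, pointer: string): unknown {
  let value: unknown = doc;
  for (const token of pointer.split("/").slice(1)) {
    if (value === null || typeof value !== "object") {
      return undefined;
    }
    // Unescape "~1" and "~0" as defined for JSON pointers.
    const key = token.replace(/~1/g, "/").replace(/~0/g, "~");
    value = (value as Record<string, unknown>)[key];
  }
  return value;
}

// Build a composite shuffle key from a list of JSON pointers.
function shuffleKey(doc: unknown, pointers: string[]): string {
  return JSON.stringify(pointers.map((p) => resolvePointer(doc, p) ?? null));
}

// In-memory stand-in for the registers of one derivation.
class RegisterStore<R> {
  private registers = new Map<string, R>();
  private initial: R;
  private reduce: (current: R, update: R) => R;

  constructor(initial: R, reduce: (current: R, update: R) => R) {
    this.initial = initial;
    this.reduce = reduce;
  }

  // Reduce `updates` into the register for `key`, then return the values a
  // publish lambda would see: the register before and after the updates.
  apply(key: string, updates: R[]): { previous: R; current: R } {
    const previous = this.registers.has(key)
      ? (this.registers.get(key) as R)
      : this.initial;
    const current = updates.reduce((acc, u) => this.reduce(acc, u), previous);
    this.registers.set(key, current);
    return { previous, current };
  }
}
```

For example, `new RegisterStore<number>(0, (a, b) => a + b)` behaves like a register with a summing reduction and an initial value of 0, with one running total per shuffle key.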