Implementing Derivations for AcmeBank
The following tutorial sections use an illustrative example to introduce you to derivations, how you might use them, and their common components. We'll discuss each component in depth in subsequent sections of this page, but we recommend you start here to get your bearings.
Suppose you have an application through which users send one another
some amount of currency, like in-game tokens or dollars or digital kittens.
You have a transfers
collection of user-requested transfers,
each sending funds from one account to another:
- transfers.flow.yaml
- transfers.schema.yaml
collections:
# Collection of 💲 transfers between accounts:
# {id: 123, sender: alice, recipient: bob, amount: 32.50}
acmeBank/transfers:
schema: transfers.schema.yaml
key: [/id]
type: object
properties:
id: { type: integer }
sender: { type: string }
recipient: { type: string }
amount: { type: number }
required: [id, sender, recipient, amount]
There are many views over this data that you might require, such as summaries of sender or receiver activity, or current account balances within your application.
Filtering Large Transfers​
This section introduces SQLite derivations, SQL lambda blocks and $parameters
.
Your compliance department has reached out, and they require an understanding of the last large transfer (if any) made by each user account.
You create a SQL derivation to help them out.
The transfers
collection is keyed on the transfer /id
,
so you'll need to re-key your derivation on the /sender
account.
You also need to filter out transfers that aren't large enough.
Putting this all together:
- last-large-send.flow.yaml
- last-large-send-test.flow.yaml
collections:
acmeBank/last-large-send:
schema: transfers.schema.yaml
key: [/sender]
derive:
using:
sqlite: {}
transforms:
- name: filterTransfers
source: acmeBank/transfers
shuffle: any
lambda: SELECT $id, $sender, $recipient, $amount WHERE $amount > 100;
tests:
acmeBank/tests/last-large-send:
- ingest:
collection: acmeBank/transfers
description: Initial set of transfers amongst users
documents:
- { id: 1, sender: alice, recipient: bob, amount: 125.10 }
- { id: 2, sender: bob, recipient: alice, amount: 10.22 }
- { id: 3, sender: carol, recipient: bob, amount: 327.00 }
- ingest:
collection: acmeBank/transfers
description: Alice and Carol later send additional transfers.
documents:
- { id: 4, sender: alice, recipient: carol, amount: 32.50 }
- { id: 5, sender: carol, recipient: alice, amount: 226.73 }
- verify:
collection: acmeBank/last-large-send
description:
Expect the most-recent of Carol's large transfers is tracked,
along with Alice's only large transfer.
documents:
- { id: 1, sender: alice, recipient: bob, amount: 125.10 }
- { id: 5, sender: carol, recipient: alice, amount: 226.73 }
derive: using: sqlite: {}
tells Flow that collection
acmeBank/last-large-send
is derived using Flow's SQLite derivation connector.
This derivation has just one transform, which sources from the transfers
collection.
As source documents become available, they're evaluated by the SQL lambda
and its SELECT
output is published to the derived collection.
Your SQL queries access locations of source documents through $parameter bindings.
The compliance department then materializes this collection to their preferred destination, for an always up-to-date view indexed by each account.
Finding New Account Pairs​
This section introduces SQLite migrations and internal task tables.
The fraud team needs your help: they have a new process they must run the first time some sending account sends funds to a receiving account. They would like to see only those transfers which reflect a new account pair of (sender, recipient). To tackle this you need to know which account pairs have been seen before.
SQLite derivations run within the context of a persistent, managed SQLite database.
You can apply database migrations that create whatever tables, triggers, or views you might need.
Then, the statements of your SQL lambda code can INSERT
, UPDATE
, or DELETE
from those tables, query from them, or any other operation supported by SQLite.
The tables and other schema you create through your migrations
are the internal state of your task.
- first-send.flow.yaml
- first-send-test.flow.yaml
collections:
acmeBank/first-send:
schema: transfers.schema.yaml
key: [/id]
derive:
using:
sqlite:
migrations:
- CREATE TABLE seen_pairs (
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
PRIMARY KEY (sender, recipient)
);
transforms:
- name: fromTransfers
source: acmeBank/transfers
shuffle:
key: [/sender, /recipient]
lambda:
INSERT INTO seen_pairs (sender, recipient) VALUES ($sender, $recipient)
ON CONFLICT DO NOTHING
RETURNING $id, $sender, $recipient, $amount;
tests:
acmeBank/tests/first-send:
- ingest:
collection: acmeBank/transfers
documents:
- { id: 1, sender: alice, recipient: bob, amount: 10.25 }
- { id: 2, sender: alice, recipient: bob, amount: 13.40 }
- { id: 3, sender: carol, recipient: alice, amount: 12.50 }
- { id: 4, sender: alice, recipient: carol, amount: 16.96 }
- { id: 5, sender: carol, recipient: alice, amount: 2.36 }
- { id: 6, sender: alice, recipient: carol, amount: 7.13 }
- verify:
collection: acmeBank/first-send
description: Expect to see only the first interaction of each account pair.
documents:
- { id: 1, sender: alice, recipient: bob, amount: 10.25 }
- { id: 3, sender: carol, recipient: alice, amount: 12.50 }
- { id: 4, sender: alice, recipient: carol, amount: 16.96 }
This time, the derivation attempts to INSERT
into the seen_pairs
table,
and uses SQLite's RETURNING
syntax to only publish documents for rows which were successfully inserted.
You can evolve the internal SQLite tables of your derivation as needed,
by appending SQL blocks which perform a database migration to the migrations
array.
Any migrations appended to the list are automatically applied by Flow.
Grouped Windows of Transfers​
This section introduces delayed reads, and applies them to implement a custom window policy.
The fraud team is back, and now needs to know the other transfers which an account has made in the last day. They want you to enrich each transfer with the grouping of all transfers initiated by that account in the prior 24 hours.
You may have encountered "windowing" in other tools for stream processing. Some systems even require that you define a window policy in order to function. Flow does not use windows, but sometimes you do want a time-bound grouping of recent events.
All collection documents contain a wall-clock timestamp of when they were published. The transforms of a derivation will generally process source documents in ascending wall-time order. You can augment this behavior by using a read delay to refine the relative order in which source documents are read, which is useful for implementing arbitrary window policies:
- grouped.flow.yaml
- enrichAndAddToWindow.sql
- grouped-test.flow.yaml
collections:
acmeBank/grouped-transfers:
schema:
# Enrich transfer with a window of *other* transfers.
$ref: transfers.schema.yaml
required: [window]
properties:
window: { type: array }
key: [/id]
derive:
using:
sqlite:
migrations:
- CREATE TABLE transfers (
id INTEGER PRIMARY KEY NOT NULL,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
amount REAL NOT NULL
);
CREATE INDEX idx_transfers_sender ON transfers (sender);
transforms:
- name: enrichAndAddToWindow
source: acmeBank/transfers
shuffle: { key: [/sender] }
lambda: enrichAndAddToWindow.sql
- name: removeFromWindow
source: acmeBank/transfers
shuffle: { key: [/sender] }
readDelay: 24h
lambda: DELETE FROM transfers WHERE id = $id;
-- Enrich the transfer with a nested array of other transfers in the window.
WITH w AS (
SELECT JSON_GROUP_ARRAY(JSON_OBJECT(
'id', id,
'recipient', recipient,
'amount', amount
)) AS window
FROM transfers WHERE sender = $sender
)
SELECT $id, $sender, $recipient, $amount, w.window FROM w;
-- Add the current transfer to the window.
INSERT INTO transfers (id, sender, recipient, amount)
VALUES ($id, $sender, $recipient, $amount);
tests:
acmeBank/tests/grouped-transfers:
- ingest:
description: Initial transfers.
collection: acmeBank/transfers
documents:
- { id: 1, sender: alice, recipient: bob, amount: 6.45 }
- { id: 2, sender: bob, recipient: carol, amount: 9.72 }
- { id: 3, sender: bob, recipient: alice, amount: 12.50 }
- verify:
description: Expect transfers were enriched with their current window.
Note that a verify step will advance test time forward until all
source documents have been read by all transforms,
meaning that the window has been reset.
collection: acmeBank/grouped-transfers
documents:
- { amount: 6.45, id: 1, sender: alice, recipient: bob, window: [] }
- { amount: 9.72, id: 2, sender: bob, recipient: carol, window: [] }
- {
id: 3,
sender: bob,
recipient: alice,
amount: 12.5,
window: [{ id: 2, recipient: carol, amount: 9.72 }],
}
- ingest:
collection: acmeBank/transfers
documents:
- { id: 4, sender: alice, recipient: bob, amount: 1.96 }
- { id: 5, sender: alice, recipient: carol, amount: 2.36 }
- { id: 6, sender: bob, recipient: alice, amount: 7.13 }
- { id: 7, sender: alice, recipient: bob, amount: 2.57 }
- verify:
collection: acmeBank/grouped-transfers
documents:
- { id: 1 }
- { id: 2 }
- { id: 3 }
- { id: 4, amount: 1.96, sender: alice, recipient: bob, window: [] }
- {
id: 5,
amount: 2.36,
sender: alice,
recipient: carol,
window: [{ id: 4, amount: 1.96, recipient: bob }],
}
- { id: 6, sender: bob, recipient: alice, amount: 7.13, window: [] }
- {
id: 7,
amount: 2.57,
sender: alice,
recipient: bob,
window:
[
{ id: 4, amount: 1.96, recipient: bob },
{ id: 5, amount: 2.36, recipient: carol },
],
}
Approving Transfers​
This section expands usage of SQLite task tables and introduces a recursive data flow.
Your users don't always check if they have sufficient funds before starting a transfer, and account overdrafts are becoming common. The product team has tapped you to fix this by enriching each transfer with an approve or deny outcome based on the account balance of the sender.
To do this, you first need to track the sender's current account balance. Clearly an account balance is debited when it's used to sends funds. It's also credited when it receives funds.
But there's a catch: an account can only be credited for funds received from approved transfers! This implies you need a collection of transfer outcomes in order to derive your collection of transfer outcomes 🤯.
This is an example of a self-referential, recursive data-flow. You may have used tools which require that data flow in a Directed Acyclic Graph (DAG). Flow does not require that your data flows are acyclic, and it also supports a derivation that reads from itself, which lets you tackle this task:
- outcomes.flow.yaml
- debitSender.sql
- outcomes-test.flow.yaml
collections:
acmeBank/transfer-outcomes:
schema:
# Enrich transfer schema with outcome and the sender's balance.
$ref: transfers.schema.yaml
required: [outcome, sender_balance]
properties:
outcome:
description: Transfer was approved, or denied for insufficient funds.
enum: [approve, deny]
sender_balance: { type: number }
key: [/id]
derive:
using:
sqlite:
migrations:
- CREATE TABLE current_balances (
account TEXT PRIMARY KEY NOT NULL,
balance REAL NOT NULL
);
transforms:
- name: debitSender
source: acmeBank/transfers
# Shuffle on the sender, as we'll debit their balance.
shuffle: { key: [/sender] }
lambda: debitSender.sql
- name: creditRecipient
# When a transfer is approved, we've debited the sender but still need to
# credit the recipient. Read approved transfers from ourselves to do so.
source:
name: acmeBank/transfer-outcomes
partitions:
include:
outcome: [approve]
shuffle: { key: [/recipient] }
lambda:
INSERT INTO current_balances (account, balance) VALUES ($recipient, $amount)
ON CONFLICT DO UPDATE SET balance = balance + $amount;
# Partition output based on the transfer outcome.
projections:
outcome:
location: /outcome
partition: true
-- Debit the sender if they have an account with sufficient funds.
UPDATE current_balances
SET balance = balance - $amount
WHERE account = $sender AND balance >= $amount;
-- Publish the transfer enriched with outcome and sender balance.
-- Use SQLite's CHANGES() function to check if the prior UPDATE matched any rows.
-- Or, a special sweep account 'DEPOSIT' is always approved.
WITH t AS (SELECT $id, $sender, $recipient, $amount)
SELECT t.*,
CASE WHEN CHANGES() OR $sender = 'DEPOSIT'
THEN 'approve' ELSE 'deny' END AS outcome,
COALESCE(b.balance, 0) AS sender_balance
FROM t
LEFT OUTER JOIN current_balances b ON $sender = b.account;
tests:
acmeBank/tests/transfer-outcomes:
- ingest:
description: Initial deposits.
collection: acmeBank/transfers
documents:
- { id: 1, sender: DEPOSIT, recipient: Alice, amount: 20 }
- { id: 2, sender: DEPOSIT, recipient: Bob, amount: 20 }
- ingest:
description: Transfers between users.
collection: acmeBank/transfers
documents:
- { id: 3, sender: Alice, recipient: Bob, amount: 32.50 }
- { id: 4, sender: Bob, recipient: Carol, amount: 10.75 }
- verify:
description: Expect transfers were enriched with outcome and balance.
collection: acmeBank/transfer-outcomes
documents:
- {
id: 1,
sender: DEPOSIT,
recipient: Alice,
amount: 20,
outcome: approve,
}
- {
id: 2,
sender: DEPOSIT,
recipient: Bob,
amount: 20,
outcome: approve,
}
- {
id: 3,
sender: Alice,
recipient: Bob,
amount: 32.50,
outcome: deny,
}
- {
id: 4,
sender: Bob,
recipient: Carol,
amount: 10.75,
outcome: approve,
sender_balance: 9.25,
}
Current Account Balances​
This section introduces TypeScript derivations and reduction annotations.
Your product team is back, and they want a database table keyed by account that contains its up-to-date current balance.
As shown in the previous section, you could create
a task table which aggregates each account balance,
and then SELECT
the current balance after every transfer.
For most use cases, this is a great place to start.
For interest and variety, you'll solve this problem using TypeScript.
TypeScript derivations require a module
which you write.
You don't know how to write that module yet,
so first implement the derivation specification in balances.flow.yaml
.
Next run the flowctl generate
command, which generates two files:
- A module stub for you to fill out.
- A file of TypeScript interfaces which are used by your module.
- balances.flow.yaml
- Module Stub
- Interfaces
collections:
acmeBank/balances:
schema: balances.schema.yaml
key: [/user]
derive:
using:
typescript:
module: balances.ts
transforms:
- name: fromOutcomes
source:
name: acmeBank/transfer-outcomes
partitions:
include:
outcome: [approve]
shuffle: any
import { IDerivation, Document, SourceFromOutcomes } from 'flow/acmeBank/balances.ts';
// Implementation for derivation acmeBank/balances.
export class Derivation extends IDerivation {
fromOutcomes(_read: { doc: SourceFromOutcomes }): Document[] {
throw new Error("Not implemented"); // 👈 Your implementation goes here.
}
}
// Generated for published documents of derived collection acmeBank/balances.
export type Document = {
balance: number;
user: string;
};
// Generated for read documents of sourced collection acmeBank/transfer-outcomes.
export type SourceFromOutcomes = {
amount: number;
id: number;
outcome: /* Transfer was approved, or denied for insufficient funds. */ "approve" | "deny";
recipient: string;
sender: string;
sender_balance: number;
};
export abstract class IDerivation {
// Construct a new Derivation instance from a Request.Open message.
constructor(_open: { state: unknown }) { }
// flush awaits any remaining documents to be published and returns them.
// deno-lint-ignore require-await
async flush(): Promise<Document[]> {
return [];
}
// reset is called only when running catalog tests, and must reset any internal state.
async reset() { }
// startCommit is notified of a runtime commit in progress, and returns an optional
// connector state update to be committed.
startCommit(_startCommit: { runtimeCheckpoint: unknown }): { state?: { updated: unknown, mergePatch: boolean } } {
return {};
}
abstract fromOutcomes(read: { doc: SourceFromOutcomes }): Document[];
}
Next fill out the body of your TypeScript module and write a test:
- balances.ts
- balances-test.flow.yaml
import { IDerivation, Document, SourceFromOutcomes } from 'flow/acmeBank/balances.ts';
// Implementation for derivation acmeBank/balances.
export class Derivation extends IDerivation {
fromOutcomes(read: { doc: SourceFromOutcomes }): Document[] {
const doc = read.doc;
return [
// Debit the sender.
{ user: doc.sender, balance: -doc.amount },
// Credit the recipient.
{ user: doc.recipient, balance: doc.amount },
];
}
}
tests:
acmeBank/tests/balances:
- ingest:
collection: acmeBank/transfers
description: Initial deposits into user accounts.
documents:
- { id: 1, sender: DEPOSIT, recipient: alice, amount: 100 }
- { id: 2, sender: DEPOSIT, recipient: bob, amount: 100 }
- { id: 3, sender: DEPOSIT, recipient: carol, amount: 100 }
- ingest:
collection: acmeBank/transfers
description: Transfers between users, and a withdraw.
documents:
- { id: 4, sender: alice, recipient: bob, amount: 20 }
- { id: 5, sender: bob, recipient: carol, amount: 40 }
- { id: 6, sender: carol, recipient: alice, amount: 90 }
- { id: 7, sender: bob, recipient: DEPOSIT, amount: 40 }
- verify:
collection: acmeBank/balances
description: Verify expected balances.
documents:
- { user: DEPOSIT, balance: -260 }
- { user: alice, balance: 170 }
- { user: bob, balance: 40 }
- { user: carol, balance: 50 }
One piece is still missing. Your TypeScript module is publishing the change in account balance for each transfer. That's not the same thing as the current balance for each account.
You can ask Flow to sum up the balance changes into a current account balance
through reduction annotations.
Here's the balances schema, with reduce
annotations for summing the account balance:
type: object
required: [user, balance]
reduce: { strategy: merge }
properties:
user: { type: string }
balance:
type: number
reduce: { strategy: sum }
This section has more moving parts that the previous SQL-based examples. You might be wondering, why bother? Fair question! This is just an illustrative example, after all.
While they're more verbose, TypeScript derivations do have certain advantages:
- TypeScript derivations are strongly typed, and those checks often catch meaningful bugs and defects before they're deployed. Your derivation modules also play nicely with VSCode and other developer tooling.
- TypeScript derivations can use third-party libraries, as well as your native code compiled to WASM.
- TypeScript can be easier when working with nested or complex document structures.
Reduction annotations also have some benefits over task state (like SQLite tables):
- Internal task state is managed by Flow. If it grows to be large (say, you have a lot of accounts), then your task must be scaled and could require performance tuning. Reduction annotations, on the other hand, require no internal state and are extremely efficient.
- Certain aggregations, such as recursive merging of tree-like structures, are much simpler to express through reduction annotations vs implementing yourself.
[See "Where to Accumulate?" for more discussion]../../concepts/derivations.md(#where-to-accumulate).