Stateful transformations and testing
Use a stateful transformation to implement a reactive join and test your catalog

Introduction

This tutorial will introduce some more powerful tools for creating derivations. It builds on the output of the Transformations basics tutorial, so make sure you complete that one first.
As in the previous tutorial, we're working with data from a network of temperature sensors. At this point, you should already have a working catalog spec that creates two collections: temperature/readings and temperature/averageTemps.
We'll build on this by capturing data about the sensor locations. We'll join it to temperature/averageTemps to create a new derived collection that contains the average temperatures for each location.
Flow must keep track of various sensor locations and temperature readings in order to join appropriate records together as they appear. For that reason, the join is considered a stateful transformation. In contrast, the derivation in our previous tutorial was stateless; the reduction was applied equally to all records as they flowed through.
We'll finish our workflow by adding a few tests. Testing is always necessary to ensure that any data product is functioning properly. We'll create functional tests that check our data quality by comparing input fixtures to output documents.
Here's an outline of our workflow:
  • Create a new collection for sensor locations
  • Join the sensor locations with our existing collection of temperature statistics. This creates a derived collection showing average temperatures by location
    • Add the required schemas and lambdas for the join
  • Create and run tests to verify that everything works

Capturing sensor locations

Let's first create a collection for sensor locations called temperature/sensors, like we did for temperature/readings in the last tutorial. We'll define the collection in the catalog spec and add the corresponding schema to our schema file.
Copy the updates to both files below and save the files.
temp-tutorial.flow.yaml

    # Paste the following under collections:

    temperature/sensors:
      schema: schemas.yaml#/$defs/tempSensor
      # Each unique id represents a sensor that may produce readings
      key: [/id]

schemas.yaml

    # Paste the following under $defs:

    tempSensor:
      description: "A sensor that produces temperature readings"
      type: object
      properties:
        id:
          type: integer
          description: "The unique id of this sensor"
        locationName:
          type: string
          description: "Human readable name of the sensor location"
          examples: ["Behind the couch"]
        location:
          $ref: https://geojson.org/schema/Point.json
          description: "The precise geographic location of the sensor"
      required: [id]

Joining with register states

We want to join this dataset to the temperature/averageTemps collection we created in the previous tutorial to enrich that collection with each sensor's location information. We'd like this join to be fully reactive, meaning that it won't matter whether you add the sensors before the readings or the other way around. You can think of our goal as creating an outer join: records from both collections will be returned, even if the join condition hasn't yet been met.
The join process is another example of a derivation, because it transforms data and produces a new collection. To work, it relies on the concept of a register. A register is a piece of state that Flow maintains for each key so that you can enrich collections when particular events happen. In this case, the key is the sensor ID.
  • If a sensor location arrives first, Flow keeps the location in the register and keys off its ID. When a reading occurs with the same key, Flow performs the join immediately.
  • If a reading arrives first, Flow keeps the statistics about that reading in the register and keys off the ID. When a location with the same ID arrives, Flow performs the join immediately.
Here's what it looks like. Add the following to your catalog spec and save the file:
temp-tutorial.flow.yaml

    # Paste the following under collections:

    temperature/averageByLocation:
      schema: schemas.yaml#/$defs/avgTempsWithLocation
      key: [/sensorId]
      derivation:
        register:
          schema: schemas.yaml#/$defs/tempToLocationRegister
          # All registers will start out with a default locationName. This allows us to publish
          # aggregates based on readings that are received before the sensor information has been
          # processed.
          initial: { locationName: null }

        transform:
          avgTempLocationSensors:
            source:
              name: temperature/sensors
            # Update the register when a new sensor location arrives.
            update: { lambda: typescript }
            # Update the collection if a new sensor arrived which had readings.
            publish: { lambda: typescript }
          avgTempLocationTemps:
            source:
              name: temperature/averageTemps
            # Update the register when a new reading arrives.
            update: { lambda: typescript }
            # Update the collection when a new reading arrives.
            publish: { lambda: typescript }

Let's dive deeper into how this works.
A register represents a user-defined JSON document that is shared amongst all the individual transforms that make up a derivation. Each transform may update the register value using an update lambda, read the current and previous values, and then publish to the collection using a publish lambda.
As mentioned, this derivation is an outer join. When Flow reads a document from the sensors collection, it saves the location name in the register keyed by the sensor ID. The avgTempLocationTemps transform later reads that same register, which is the equivalent of the SQL join condition tempSensor.id = tempReadings.sensorId. This gives the transform access to the locationName in its publish lambda. The two collections are thus joined, and Flow publishes a document that includes data from both. This works regardless of which event happens first.
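The register mechanics described above can be sketched outside of Flow. The following is a plain TypeScript simulation, not Flow's runtime or API: the names Register, Joined, JoinSketch, onSensor, and onStats are hypothetical, and it assumes each statistics document carries the cumulative values for its sensor.

```typescript
// Illustrative simulation in plain TypeScript; this is not Flow's runtime or API.
// Every name below (Register, Joined, JoinSketch, onSensor, onStats) is hypothetical.

interface Register {
  locationName: string | null;
  numReadings?: number;
  totalC?: number;
}

interface Joined {
  sensorId: number;
  locationName: string | null;
  numReadings: number;
  totalC: number;
}

class JoinSketch {
  // One register per key (the sensor ID), created on first use with the initial value.
  private registers: Record<number, Register | undefined> = {};
  // The derived collection keyed by sensorId; a later document replaces an earlier one.
  readonly output: Record<number, Joined | undefined> = {};

  private registerFor(key: number): Register {
    let reg = this.registers[key];
    if (reg === undefined) {
      reg = { locationName: null };
      this.registers[key] = reg;
    }
    return reg;
  }

  // Sensor document: update the register, then publish only if statistics are already known.
  onSensor(sensor: { id: number; locationName: string }): void {
    const reg = this.registerFor(sensor.id);
    reg.locationName = sensor.locationName;
    if (reg.numReadings !== undefined && reg.totalC !== undefined) {
      this.output[sensor.id] = {
        sensorId: sensor.id,
        locationName: reg.locationName,
        numReadings: reg.numReadings,
        totalC: reg.totalC,
      };
    }
  }

  // Statistics document: update the register, then always publish with whatever location is known.
  onStats(stats: { sensorId: number; numReadings: number; totalC: number }): void {
    const reg = this.registerFor(stats.sensorId);
    reg.numReadings = stats.numReadings;
    reg.totalC = stats.totalC;
    this.output[stats.sensorId] = {
      sensorId: stats.sensorId,
      locationName: reg.locationName,
      numReadings: stats.numReadings,
      totalC: stats.totalC,
    };
  }
}

// Order 1: the sensor location arrives before the statistics.
const sensorFirst = new JoinSketch();
sensorFirst.onSensor({ id: 1, locationName: "Office" });
sensorFirst.onStats({ sensorId: 1, numReadings: 3, totalC: 61 });

// Order 2: the statistics arrive first, so the first published document has no location yet.
const statsFirst = new JoinSketch();
statsFirst.onStats({ sensorId: 1, numReadings: 3, totalC: 61 });
const beforeSensor = statsFirst.output[1];
statsFirst.onSensor({ id: 1, locationName: "Office" });

console.log(beforeSensor?.locationName); // null
console.log(JSON.stringify(sensorFirst.output[1]) === JSON.stringify(statsFirst.output[1])); // true
```

Both orderings end with the same joined document. The only difference is the intermediate document with a null locationName that is published when the statistics arrive first.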
Though an outer join is appropriate for this example, the types of join you use for different workflows will vary. You can find more examples here.
Let's take a closer look at how the registers are updated. Every time a new document appears in one of the source collections, the appropriate update lambda returns a value that is reduced into the previous register value. The register schema merges objects at the top level, and each individual field uses the default reduction behavior, last write wins, so the most recent value of each field is retained.
To understand this, we'll look at the register's JSON schema. Paste the following into your schemas.yaml file and save it. Note that the schema for the derived collection we're creating is also included.
schemas.yaml

    avgTempsWithLocation:
      description: "Average temperature with location added"
      type: object
      $ref: schemas.yaml#/$defs/tempToLocationRegister
      properties:
        sensorId: { type: integer }
        # locationName may be null if we've received readings before the corresponding sensor
        # documents. That's OK because we'll later update the locationName once it becomes known.
      required: [sensorId]

    tempToLocationRegister:
      type: object
      properties:
        numReadings:
          type: integer
        totalC:
          type: number
        minTempC:
          type: number
        maxTempC:
          type: number
        avgC:
          type: number
        lastReading:
          type: string
          format: timestamp
          description: "Timestamp of the most recent reading for this named location"
          # Since our timestamps are in RFC 3339 format, the lexicographic comparison done by
          # maximize will pick the most recent time.
        locationName: { type: [string, "null"] }
        location:
          $ref: https://geojson.org/schema/Point.json
          description: "The precise geographic location of the sensor"
      reduce: { strategy: merge }

The update lambdas set the location values when a new document arrives from the temperature/sensors collection, and set everything else when a document arrives from the temperature/averageTemps collection. The return values from these functions are reduced into the current register value using the reduce annotations in the schema.
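To make the reduction step concrete, here is a minimal sketch of how successive update values could fold into one register. It is a simplified stand-in written for illustration, not Flow's implementation: RegisterDoc and reduceMerge are hypothetical names, and the merge is shallow, which is enough for the flat fields used here.

```typescript
// Simplified, hypothetical stand-in for reducing update values into a register.
// This is not Flow's implementation; it only mirrors the behavior described in the text.
interface RegisterDoc {
  locationName?: string | null;
  numReadings?: number;
  totalC?: number;
}

// Shallow merge: fields present in the update replace the register's fields
// (last write wins per field); fields absent from the update are kept.
function reduceMerge(current: RegisterDoc, update: RegisterDoc): RegisterDoc {
  return { ...current, ...update };
}

let register: RegisterDoc = { locationName: null }; // the register's initial value
register = reduceMerge(register, { numReadings: 2, totalC: 40 }); // a statistics update
register = reduceMerge(register, { locationName: "Office" }); // a sensor update
register = reduceMerge(register, { numReadings: 3, totalC: 61 }); // newer statistics

console.log(JSON.stringify(register));
// {"locationName":"Office","numReadings":3,"totalC":61}
```

Because each update only names the fields it knows about, the sensor update never erases the statistics, and the statistics updates never erase the location.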
Normally, you'd run flowctl test --source temp-tutorial.flow.yaml to stub out a TypeScript file, and you'd simply fill in the function definitions. Since you already have a TypeScript file from the previous tutorial, just copy and paste the following into that same file, temp-tutorial.flow.ts.
temp-tutorial.flow.ts

    export class TemperatureAverageByLocation implements interfaces.TemperatureAverageByLocation {
        avgTempLocationSensorsUpdate(
            source: collections.TemperatureSensors,
        ): registers.TemperatureAverageByLocation[] {
            // Update the register when a new location arrives.
            return [{ locationName: source.locationName }];
        }
        avgTempLocationSensorsPublish(
            source: collections.TemperatureSensors,
            register: registers.TemperatureAverageByLocation,
            _previous: registers.TemperatureAverageByLocation,
        ): collections.TemperatureAverageByLocation[] {
            // If we have a reading for a new location, update the collection. Else, don't update it
            // but keep it around in the register for when a reading arrives.
            if (register.numReadings) {
                const avg = Math.round(register.totalC! / register.numReadings! * 100) / 100.0;
                return [{
                    sensorId: source.id,
                    numReadings: register.numReadings,
                    avgC: avg,
                    totalC: register.totalC,
                    minTempC: register.minTempC,
                    maxTempC: register.maxTempC,
                    lastReading: register.lastReading,
                    locationName: source.locationName
                }];
            } else {
                return [];
            }
        }
        avgTempLocationTempsUpdate(
            source: collections.TemperatureAverageTemps,
        ): registers.TemperatureAverageByLocation[] {
            // Update the register with stats when a new reading comes in. This can be used later
            // if a location arrives in for this sensor to ensure a fully reactive join.
            return [{
                numReadings: source.numReadings,
                totalC: source.totalC,
                minTempC: source.minTempC,
                maxTempC: source.maxTempC,
                lastReading: source.lastReading,
            }];
        }
        avgTempLocationTempsPublish(
            source: collections.TemperatureAverageTemps,
            register: registers.TemperatureAverageByLocation,
            _previous: registers.TemperatureAverageByLocation,
        ): collections.TemperatureAverageByLocation[] {
            const avg = Math.round(source.totalC! / source.numReadings! * 100) / 100.0;
            // Always update the collection when a new reading comes in.
            return [{
                sensorId: source.sensorId,
                numReadings: source.numReadings,
                avgC: avg,
                totalC: source.totalC,
                minTempC: source.minTempC,
                maxTempC: source.maxTempC,
                lastReading: source.lastReading,
                locationName: register.locationName
            }];
        }
    }

Let's walk through what this class is doing. It contains four lambdas.
  • Sensors update updates the register with the location value when a new sensor location arrives.
  • Sensors publish publishes combined data (including the register's statistical data) to the collection if there have already been readings for that sensor.
  • Temps update updates the register with statistical data when a new reading is taken.
  • Temps publish publishes combined data (including the register's location if one exists) to the collection whenever a new reading arrives.
As you can see, unlike joins in SQL, real-time joins need to consider the time domain. However, it's possible to model these similarly to SQL joins by simply deciding what the register stores, when it's updated, and what you publish to collections. Registers effectively expose the inner workings of joins to you, which provides a lot of power.
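Both publish lambdas compute avgC with the same rounding expression, which can be checked in isolation. The helper name roundedAverage below is hypothetical; the arithmetic is copied from the lambdas, and the inputs are the totals that appear in this tutorial's test fixtures.

```typescript
// Hypothetical helper isolating the rounding used in the publish lambdas:
// scale by 100, round to the nearest integer, then scale back to two decimal places.
function roundedAverage(totalC: number, numReadings: number): number {
  return Math.round((totalC / numReadings) * 100) / 100;
}

console.log(roundedAverage(61, 3)); // 20.33
console.log(roundedAverage(44.1, 2)); // 22.05
```

A numReadings of zero would make the division meaningless, which is why the sensors publish lambda checks register.numReadings before computing an average.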

Testing

In the previous tutorial, we performed a materialization to a database. In this tutorial, we're going to take advantage of Flow's built-in testing to ensure everything is working as intended.
Flow allows you to define ingestions and expected results as contract tests. This means that as catalogs evolve, you can be confident that their data pipelines are working as intended. And because the data is defined in the test itself, we still don't need to capture data from real sensors to use our catalog in this tutorial.
Add the following to the end of your catalog spec file:
temp-tutorial.flow.yaml

    tests:
      "test average temperature, then by location":
        - ingest: &readings
            collection: temperature/readings
            documents:
              - { sensorId: 1, timestamp: "2020-08-26T06:30:31Z", tempC: 18.9 }
              - { sensorId: 1, timestamp: "2020-08-26T11:39:57Z", tempC: 21.1 }
              - { sensorId: 2, timestamp: "2020-08-26T13:32:44Z", tempC: 23.2 }
              - { sensorId: 1, timestamp: "2020-08-26T17:19:00Z", tempC: 21.0 }
              - { sensorId: 2, timestamp: "2020-08-26T19:26:53Z", tempC: 20.9 }

        - verify: &expected
            collection: temperature/averageTemps
            documents:
              - sensorId: 1
                numReadings: 3
                totalC: 61
                minTempC: 18.9
                maxTempC: 21.1
                lastReading: "2020-08-26T17:19:00Z"
              - sensorId: 2
                numReadings: 2
                totalC: 44.1
                minTempC: 20.9
                maxTempC: 23.2
                lastReading: "2020-08-26T19:26:53Z"

This follows the basic structure of testing in Flow. It allows us to ingest some temperature readings and verify that the temperature/averageTemps collection produces expected and valid results.
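The expected documents in the verify step can be derived by hand from the ingested fixture. The sketch below does that in plain TypeScript, assuming (from the shape of the expected documents) that temperature/averageTemps holds a count, sum, minimum, maximum, and latest timestamp per sensor. The names Reading, Stats, fixture, and summarize are hypothetical and not part of Flow.

```typescript
// Hypothetical recomputation of the expected statistics from the test fixture.
interface Reading {
  sensorId: number;
  timestamp: string;
  tempC: number;
}

interface Stats {
  sensorId: number;
  numReadings: number;
  totalC: number;
  minTempC: number;
  maxTempC: number;
  lastReading: string;
}

// The same documents the test ingests into temperature/readings.
const fixture: Reading[] = [
  { sensorId: 1, timestamp: "2020-08-26T06:30:31Z", tempC: 18.9 },
  { sensorId: 1, timestamp: "2020-08-26T11:39:57Z", tempC: 21.1 },
  { sensorId: 2, timestamp: "2020-08-26T13:32:44Z", tempC: 23.2 },
  { sensorId: 1, timestamp: "2020-08-26T17:19:00Z", tempC: 21.0 },
  { sensorId: 2, timestamp: "2020-08-26T19:26:53Z", tempC: 20.9 },
];

// Fold each reading into a running summary for its sensor.
function summarize(docs: Reading[]): Record<number, Stats | undefined> {
  const bySensor: Record<number, Stats | undefined> = {};
  for (const doc of docs) {
    const prev = bySensor[doc.sensorId];
    bySensor[doc.sensorId] =
      prev === undefined
        ? {
            sensorId: doc.sensorId,
            numReadings: 1,
            totalC: doc.tempC,
            minTempC: doc.tempC,
            maxTempC: doc.tempC,
            lastReading: doc.timestamp,
          }
        : {
            sensorId: doc.sensorId,
            numReadings: prev.numReadings + 1,
            totalC: prev.totalC + doc.tempC,
            minTempC: Math.min(prev.minTempC, doc.tempC),
            maxTempC: Math.max(prev.maxTempC, doc.tempC),
            // Timestamps in the same RFC 3339 form and zone compare correctly as strings.
            lastReading: doc.timestamp > prev.lastReading ? doc.timestamp : prev.lastReading,
          };
  }
  return bySensor;
}

const stats = summarize(fixture);
console.log(stats[1]?.numReadings, stats[2]?.numReadings); // 3 2
console.log(stats[1]?.lastReading); // 2020-08-26T17:19:00Z
```

The sums are floating-point values, so compare totalC against 61 and 44.1 with a small tolerance rather than strict equality if you adapt this check.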
Run the test with the following command in the terminal:

    flowctl test --source temp-tutorial.flow.yaml

The terminal output should tell you that the test passed.
Now let's test our fully reactive join to make sure it's working as expected. We should be able to ingest documents to either our temperature/readings or temperature/sensors collection first and end up with the same result. Add this to the end of your catalog spec and save the file:
temp-tutorial.flow.yaml

    # Paste the following under tests

    "test ingestion of sensors, then readings":
      - ingest: &sensors
          collection: temperature/sensors
          documents:
            - { id: 1, locationName: "Office" }
            - { id: 2, locationName: "Workshop" }
      - ingest: *readings

      - verify: &expected1
          collection: temperature/averageByLocation
          documents:
            - sensorId: 1
              locationName: "Office"
              numReadings: 3
              totalC: 61
              avgC: 20.33
              minTempC: 18.9
              maxTempC: 21.1
              lastReading: "2020-08-26T17:19:00Z"
            - sensorId: 2
              locationName: "Workshop"
              numReadings: 2
              totalC: 44.1
              avgC: 22.05
              minTempC: 20.9
              maxTempC: 23.2
              lastReading: "2020-08-26T19:26:53Z"

    "test readings produced before sensor info":
      - ingest: *readings
      # Expect that the same results are produced, only without the location name
      - verify:
          collection: temperature/averageByLocation
          documents:
            - sensorId: 1
              locationName: null
              numReadings: 3
              totalC: 61
              minTempC: 18.9
              maxTempC: 21.1
              avgC: 20.33
              lastReading: "2020-08-26T17:19:00Z"
            - sensorId: 2
              locationName: null
              numReadings: 2
              avgC: 22.05
              totalC: 44.1
              minTempC: 20.9
              maxTempC: 23.2
              lastReading: "2020-08-26T19:26:53Z"
      - ingest: *sensors
      # Expect that the locationNames have been added
      - verify: *expected1

As you can see, this section first ingests temperature/sensors, then temperature/readings, and verifies that temperature/averageByLocation contains location and statistical data. It then ingests only temperature/readings and verifies that temperature/averageByLocation has statistical data but null location names. Finally, it ingests temperature/sensors and verifies that location information was added to the collection.
Run the command again to give it a try:

    flowctl test --source temp-tutorial.flow.yaml

The terminal output should indicate that all three tests passed.
Derivations and reduction annotations are powerful tools, which together can handle virtually any transformation use case. Testing is one of Flow's most important features and can help you guarantee that the data products you create always meet business requirements.

