Flow documents and collections always have an associated schema that defines the structure, representation, and constraints of your documents. Collections must have one schema, but may have two distinct schemas: one for when documents are added to the collection, and one for when documents are read from that collection.
Schemas are a powerful tool for data quality. Flow verifies every document against its schema whenever it's read or written, which provides a strong guarantee that your collections hold only "clean" data, and that bugs and invalid documents are caught before they can impact downstream data products.
In most cases, Flow generates a functioning schema on your behalf during the discovery phase of capture. In advanced use cases, however, customizing your schema becomes more important.
JSON Schema
JSON Schema is an expressive open standard for defining the schema and structure of documents. Flow uses it for all schemas defined in Flow specifications.
JSON Schema goes well beyond basic type information and can model tagged unions, recursion, and other complex, real-world composite types. Schemas can also define rich data validations like minimum and maximum values, regular expressions, dates, timestamps, email addresses, and other formats.
Together, these features let schemas represent structure as well as expectations and constraints that are evaluated and must hold true for every collection document before it’s added to the collection. They’re a powerful tool for ensuring end-to-end data quality: for catching data errors and mistakes early, before they can impact your production data products.
When capturing data from an external system, Flow can usually generate suitable JSON schemas on your behalf.
Learn more about using connectors
For systems like relational databases, Flow will typically generate a complete JSON schema by introspecting the table definition.
For systems that store unstructured data, Flow will typically generate a very minimal schema, and will rely on schema inference to fill in the details. See continuous schema inference for more information.
You must only provide Flow a model of a given dataset one time, as a JSON schema. Having done that, Flow leverages static inference over your schemas to perform many build-time validations of your catalog entities, helping you catch potential problems early.
Schema inference is also used to provide translations into other schema flavors:
- Most projections of a collection are automatically inferred from its schema. Materializations use your projections to create appropriate representations in your endpoint system. A SQL connector will create table definitions with appropriate columns, types, and constraints.
- Flow generates TypeScript definitions from schemas to provide compile-time type checks of user lambda functions. These checks are immensely helpful for surfacing mismatched expectations around, for example, whether a field could ever be null or is misspelt — which, if not caught, might otherwise fail at runtime.
The JSON Schema standard introduces the concept of
which are keywords that attach metadata to a location within a validated JSON document.
For example, title
and description
can be used to annotate a schema with its meaning:
title: My Field
description: A description of myField
Flow extends JSON Schema with additional annotation keywords,
which provide Flow with further instruction for how documents should be processed.
In particular, the reduce
and default
help you define merge behaviors and avoid null values at your destination systems, respectively.
What’s especially powerful about annotations is that they respond to conditionals within the schema. Consider a schema validating a positive or negative number:
type: number
- exclusiveMinimum: 0
description: A positive number.
- exclusiveMaximum: 0
description: A negative number.
- const: 0
description: Zero.
Here, the activated description
of this schema location depends
on whether the integer is positive, negative, or zero.
Writing schemas
Your schema can be quite permissive or as strict as you wish. There are a few things to know, however.
The top-level type must be
. Flow adds a bit of metadata to each of your documents under the_meta
property, which can only be done with a top-level object. -
Any fields that are part of the collection's
must provably exist in any document that validates against the schema. Put another way, every document within a collection must include all of the fields of the collection's key, and the schema must guarantee that.
For example, the following collection schema would be invalid because
the id
field, which is used as its key, is not required
so it might not actually exist in all documents:
type: object
required: [value]
id: {type: integer}
value: {type: string}
key: [/id]
To fix the above schema, change required
to [id, value]
Learn more of how schemas can be expressed within collections.
JSON schema has a $ref
keyword which is used to reference a schema stored elsewhere.
Flow resolves $ref
as a relative URL of the current file,
and also supports
JSON fragment pointers
for referencing a specific schema within a larger schema document,
such as ../my/widget.schema.yaml#/path/to/schema
It's recommended to use references in order to organize your schemas for reuse.
can also be used in combination with other schema keywords
to further refine a base schema.
Here's an example that uses references to organize and
further tighten the constraints of a reused base schema:
- flow.yaml
- schemas.yaml
key: [/id]
schema: schemas.yaml#/definitions/coordinate
key: [/id]
schema: schemas.yaml#/definitions/integer-coordinate
key: [/id]
# Compose a restriction that `x` & `y` must be positive.
$ref: schemas.yaml#/definitions/coordinate
x: {exclusiveMinimum: 0}
y: {exclusiveMinimum: 0}
type: object
required: [id, x, y]
id: {type: string}
description: The X axis value of the coordinate.
type: number
description: The Y axis value of the coordinate.
type: number
$ref: "#/definitions/coordinate"
# Compose a restriction that `x` & `y` cannot be fractional.
x: {type: integer}
y: {type: integer}
You can write your JSON schemas as either YAML or JSON across any number of files, all referenced from Flow catalog files or other schemas.
Schema references are always resolved as URLs relative to the current file, but you can also use absolute URLs to a third-party schema like
Write and read schemas
In some cases, you may want to impose different constraints to data that is being added (written) to the collection and data that is exiting (read from) the collection.
For example, you may need to start capturing data now from a source system; say, a pub-sub system with short-lived historical data support or an HTTP endpoint, but don't know or don't control the endpoint's schema. You can capture the data with a permissive write schema, and impose a stricter read schema on the data as you need to perform a derivation or materialization. You can safely experiment with the read schema at your convenience, knowing the data has already been captured.
To achieve this, edit the collection, re-naming the standard schema
to writeSchema
and adding a readSchema
Make sure that the field used as the collection key is defined in both schemas.
You can either perform this manually, or use Flow's Schema Inference tool to infer a read schema. Schema Inference is available in the web app when you edit a capture or materialization and create a materialization.
Before separating your write and read schemas, have the following in mind:
The write schema comes from the capture connector that produced the collection and shouldn't be modified. Always apply your schema changes to the read schema.
Separate read and write schemas are typically useful for collections that come from a source system with a flat or loosely defined data structure, such as cloud storage or pub-sub systems. Collections sourced from databases and most SaaS systems come with an explicitly defined data structure and shouldn't need a different read schema.
If you're using standard projections, you must only define them in the read schema. However, if your projections are logical partitions, you must define them in both schemas.
Here's a simple example in which you don't know how purchase prices are formatted when capturing them,
but find out later that number
is the appropriate data type:
type: object
title: Store price as strings
description: Not sure if prices are formatted as numbers or strings.
id: { type: integer}
price: {type: [string, number]}
type: object
title: Prices as numbers
id: { type: integer}
price: {type: number}
key: [/id]
Flow collections have keys, and multiple documents may be added to collections that share a common key. When this happens, Flow will opportunistically merge all such documents into a single representative document for that key through a process known as reduction.
Flow's default is simply to retain the most recent document of a given key,
which is often the behavior that you're after.
Schema reduce
annotations allow for far more powerful behaviors.
The Flow runtime performs reductions frequently and continuously to reduce the overall movement and cost of data transfer and storage. A torrent of input collection documents can often become a trickle of reduced updates that must be stored or materialized into your endpoints.
Flow never delays processing in order to batch or combine more documents, as some systems do (commonly known as micro-batches, or time-based polling). Every document is processed as quickly as possible, from end to end.
Instead Flow uses optimistic transaction pipelining to do as much useful work as possible, while it awaits the commit of a previous transaction. This natural back-pressure affords plenty of opportunity for data reductions while minimizing latency.
Reduction behaviors are defined by reduce
JSON schema annotations
within your document schemas.
These annotations provide Flow with the specific reduction strategies
to use at your various document locations.
If you're familiar with the map and reduce primitives present in
Python, Javascript, and many other languages, this should feel familiar.
When multiple documents map into a collection with a common key,
Flow reduces them on your behalf by using your reduce
Here's an example that sums an integer:
type: integer
reduce: { strategy: sum }
# 1, 2, -1 => 2
Or deeply merges a map:
type: object
reduce: { strategy: merge }
# {"a": "b"}, {"c": "d"} => {"a": "b", "c": "d"}
Learn more in the reduction strategies reference documentation.
Reductions and collection keys
Reduction annotations change the common patterns for how you think about collection keys.
Suppose you are building a reporting fact table over events of your business. Today you would commonly consider a unique event ID to be its natural key. You would load all events into your warehouse and perform query-time aggregation. When that becomes too slow, you periodically refresh materialized views for fast-but-stale queries.
With Flow, you instead use a collection key of your fact table dimensions,
and use reduce
annotations to define your metric aggregations.
A materialization of the collection then maintains a
database table which is keyed on your dimensions,
so that queries are both fast and up to date.
Composition with conditionals
Like any other JSON Schema annotation,
annotations respond to schema conditionals.
Here we compose append
and lastWriteWins
strategies to
reduce an appended array which can also be cleared:
type: array
# If the array is non-empty, reduce by appending its items.
- minItems: 1
reduce: { strategy: append }
# Otherwise, if the array is empty, reset the reduced array to be empty.
- maxItems: 0
reduce: { strategy: lastWriteWins }
# [1, 2], [3, 4, 5] => [1, 2, 3, 4, 5]
# [1, 2], [], [3, 4, 5] => [3, 4, 5]
# [1, 2], [3, 4, 5], [] => []
You can combine schema conditionals with annotations to build rich behaviors.
Continuous schema inference
Flow automatically infers a JSON schema for every captured collection. This schema is updated automatically as data is captured.
For some systems, like relational databases, Flow is able to determine a complete JSON schema for each collection up front, before even starting the capture. But many other systems are not able to provide detailed and accurate information about the data before it's captured. Often, this is because the source system data is unstructured or loosely structured. For these systems, the schema can only be known after the data is captured. Continuous schema inference is most useful in these scenarios.
For example, say you're capturing from MongoDB. MongoDB documents must all have an _id
field, but that is essentially the only requirement. You can't know what other fields may exist on MongoDB documents until you've read them. When you set up a capture from MongoDB using the Flow web app, the collection specifications will look something like this:
key: [ /_id ]
type: object
_id: { type: string }
required: [ _id ]
- $ref: flow://write-schema
- $ref: flow://inferred-schema
Note that this spec uses separate read and write schemas. The writeSchema
is extremely permissive, and only requires an _id
property with a string value. The readSchema
references flow://inferred-schema
, which expands to the current inferred schema when the collection is published.
Note that $ref: flow://write-schema
expands to the current writeSchema
. Whenever you use $ref: flow://inferred-schema
, you should always include the flow://write-schema
as well, so that you don't need to repeat any fields that are defined in the writeSchema
or wait for those fields to be observed by schema inference.
When you first publish a collection using the inferred schema, flow://inferred-schema
expands to a special placeholder schema that rejects all documents. This is to ensure that a non-placeholder inferred schema has been published before allowing any documents to be materialized. Once data is captured to the collection, the inferred schema immediately updates to strictly and minimally describe the captured.
Because the effective readSchema
is only ever updated when the collection is published, the best option is usually to use the inferred schema in conjunction with autoDiscover.
You can use default
annotations to prevent null values from being materialized to your endpoint system.
When this annotation is absent for a non-required field, missing values in that field are materialized as null
When the annotation is present, missing values are materialized with the field's default
type: object
required: [id]
id: {type: integer}
anvils_dropped: {type: integer}
reduce: {strategy: sum }
default: 0
key: [/id]
annotations are only used for materializations; they're ignored by captures and derivations.
If your collection has both a write and read schema, make sure you add this annotation to the read schema.