Skip to main content

Collections

The documents of your Data Flows are stored in collections: real-time data lakes of JSON documents in cloud storage.

The data in a collection may be captured from an external system, or derived as a transformation of one or more other collections. When you create a new capture in a typical workflow, you define one or more new collections as part of that process. Materializations then read data from collections.

Every collection has a key and an associated schema that its documents must validate against.

Documents

Flow processes and stores data in terms of documents: JSON files that consist of multiple key-value pair objects. Collections are comprised of documents; Flow tasks (captures, materializations, and derivations) process data in terms of documents.

A Flow document corresponds to different units of data in different types of endpoint systems. For example, it might map to a table row, a pub/sub message, or an API response. The structure of a given collection’s documents is determined by that collection’s schema and the way in which tasks handle documents is determined by the collection key.

The size of a document depends on the complexity of the source data. Flow allows documents up to 16 MB in size, but it's rare for documents to approach this limit.

An example document for a collection with two fields, name and count is shown below.

{
"_meta": {
"uuid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
"count": 5954,
"message": "Hello #5954"
}

System Fields and Metadata

The _meta object is present in all Flow documents, and contains metadata added by Flow. Minimally, every document _meta always has a uuid, which is a globally unique id for each document. Some capture connectors may add additional _meta properties to tie each document to a specific record within the source system. Documents that were captured from cloud storage connectors, for example, will contain /_meta/file and /_meta/offset properties that tell you where the document came from within your cloud storage bucket.

_meta/uuid

The _meta/uuid field is a system-generated globally unique identifier for each document within Estuary Flow.

flow_published_at

The flow_published_at field is a system-generated timestamp within Estuary Flow, derived from the runtime environment. It captures the exact moment a document is published to a collection, offering a reliable proxy for when the document was last modified or inserted.

  • Source: The flow_published_at field is generated by the runtime environment of Estuary Flow.
  • Definition: This field represents the timestamp when a document is captured and subsequently published to a collection. Essentially, it is a projection of the _meta/uuid field, where the UUID contains an encoded timestamp component.
  • Availability: The flow_published_at field is available in every collection, as it is a derived projection from the _meta/uuid field.

For a given document identified by a unique key, the flow_published_at field can be used as a proxy for the last time the document was modified. This is particularly useful when performing incremental updates or transformations, such as in a data warehouse environment.

When dealing with materializations that are not delta updates:

  • A document in Estuary Flow is any JSON object emitted by a capture connector. The flow_published_at field provides the timestamp for when this JSON object was captured and inserted into the collection.
  • If the collection is reduced with a strategy like lastWriteWins or merge on the capture side, flow_published_at becomes the timestamp for the last event that updated the document.

Viewing collection documents

In many cases, it's not necessary to view your collection data — you're able to materialize it directly to a destination in the correct shape using a connector.

However, it can be helpful to view collection documents to confirm the source data was captured as expected, or verify a schema change.

In the web application

Sign into the Flow web application and click the Collections tab. The collections to which you have access are listed. Click the Details drop down to show a sample of collection documents as well as the collection specification.

The collection documents are displayed by key. Click the desired key to preview it in its native JSON format.

Using the flowctl CLI

In your authenticated flowctl session, issue the command flowctl collections read --collection <full/collection-name> --uncommitted. For example, flowctl collections read --collection acmeCo/inventory/anvils --uncommitted.

Options are available to read a subset of data from collections. For example, --since allows you to specify an approximate start time from which to read data, and --include-partition allows you to read only data from a specified logical partition. Use flowctl collections read --help to see documentation for all options.

Beta

While in beta, this command currently has the following limitations. They will be removed in a later release:

  • The --uncommitted flag is required. This means that all collection documents are read, regardless of whether they were successfully committed or not. In the future, reads of committed documents will be the default.

  • Only reads of a single partition are supported. If you need to read from a partitioned collection, use --include-partition or --exclude-partition to narrow down to a single partition.

  • The --output flag is not usable for this command. Only JSON data can be read from collections.

Specification

Collections are defined in Flow specification files per the following format:

# A set of collections to include in the catalog.
# Optional, type: object
collections:
# The unique name of the collection.
acmeCo/products/anvils:

# The schema of the collection, against which collection documents
# are validated. This may be an inline definition or a relative URI
# reference.
# Required, type: string (relative URI form) or object (inline form)
schema: anvils.schema.yaml

# The key of the collection, specified as JSON pointers of one or more
# locations within collection documents. If multiple fields are given,
# they act as a composite key, equivalent to a SQL table PRIMARY KEY
# with multiple table columns.
# Required, type: array
key: [/product/id]

# Projections and logical partitions for this collection.
# Optional, type: object
projections:

# Derivation that builds this collection from others through transformations.
# See the "Derivations" concept page to learn more.
# Optional, type: object
derive: ~

Schemas

Every Flow collection must declare a schema, and will never accept documents that do not validate against the schema. This helps ensure the quality of your data products and the reliability of your derivations and materializations. Schema specifications are flexible: yours could be exactingly strict, extremely permissive, or somewhere in between. For many source types, Flow is able to generate a basic schema during discovery.

Schemas may either be declared inline, or provided as a reference to a file. References can also include JSON pointers as a URL fragment to name a specific schema of a larger schema document:

collections:
acmeCo/collection:
schema:
type: object
required: [id]
properties:
id: string
key: [/id]

Learn more about schemas

Keys

Every Flow collection must declare a key which is used to group its documents. Keys are specified as an array of JSON pointers to document locations. For example:

collections:
acmeCo/users:
schema: schema.yaml
key: [/userId]

Suppose the following JSON documents are captured into acmeCo/users:

{"userId": 1, "name": "Will"}
{"userId": 1, "name": "William"}
{"userId": 1, "name": "Will"}

As its key is [/userId], a materialization of the collection into a database table will reduce to a single row:

userId | name
1 | Will

If its key were instead [/name], there would be two rows in the table:

userId | name
1 | Will
1 | William

Schema restrictions

Keyed document locations may be of a limited set of allowed types:

  • boolean
  • integer
  • string

Excluded types are:

  • array
  • null
  • object
  • Fractional number

Keyed fields also must always exist in collection documents. Flow performs static inference of the collection schema to verify the existence and types of all keyed document locations, and will report an error if the location could not exist, or could exist with the wrong type.

Flow itself doesn't mind if a keyed location has multiple types, so long as they're each of the allowed types: an integer or string for example. Some materialization connectors, however, may impose further type restrictions as required by the endpoint. For example, SQL databases do not support multiple types for a primary key.

Composite Keys

A collection may have multiple locations which collectively form a composite key. This can include locations within nested objects and arrays:

collections:
acmeCo/compound-key:
schema: schema.yaml
key: [/foo/a, /foo/b, /foo/c/0, /foo/c/1]

Key behaviors

A collection key instructs Flow how documents of a collection are to be reduced, such as while being materialized to an endpoint. Flow also performs opportunistic local reductions over windows of documents to improve its performance and reduce the volumes of data at each processing stage.

An important subtlety is that the underlying storage of a collection will potentially retain many documents of a given key.

In the acmeCo/users example, each of the "Will" or "William" variants is likely represented in the collection's storage — so long as they didn't arrive so closely together that they were locally combined by Flow. If desired, a derivation could re-key the collection on [/userId, /name] to materialize the various /names seen for a /userId.

This property makes keys less lossy than they might otherwise appear, and it is generally good practice to choose a key that reflects how you wish to query a collection, rather than an exhaustive key that's certain to be unique for every document.

Empty keys

When a specification is automatically generated, there may not be an unambiguously correct key for all collections. This could occur, for example, when a SQL database doesn't have a primary key defined for some table.

In cases like this, the generated specification will contain an empty collection key. However, every collection must have a non-empty key, so you'll need to manually edit the generated specification and specify keys for those collections before publishing to the catalog.

Projections

Projections are named locations within a collection document that may be used for logical partitioning or directly exposed to databases into which collections are materialized.

Many projections are automatically inferred from the collection schema. The projections stanza can be used to provide additional projections, and to declare logical partitions:

collections:
acmeCo/products/anvils:
schema: anvils.schema.yaml
key: [/product/id]

# Projections and logical partitions for this collection.
# Keys name the unique projection field, and values are its JSON Pointer
# location within the document and configure logical partitioning.
# Optional, type: object
projections:
# Short form: define a field "product_id" with document pointer /product/id.
product_id: "/product/id"

# Long form: define a field "metal" with document pointer /metal_type
# which is a logical partition of the collection.
metal:
location: "/metal_type"
partition: true

Learn more about projections.

Storage

Collections are real-time data lakes. Historical documents of the collection are stored as an organized layout of regular JSON files in your cloud storage bucket. Reads of that history are served by directly reading files from your bucket.

Your storage mappings determine how Flow collections are mapped into your cloud storage buckets.

Unlike a traditional data lake, however, it's very efficient to read collection documents as they are written. Derivations and materializations that source from a collection are notified of its new documents within milliseconds of their being published.

Learn more about journals, which provide storage for collections