
PostgreSQL

This connector uses change data capture (CDC) to continuously capture updates in a PostgreSQL database into one or more Flow collections.

It is available for use in the Flow web application. For local development or open-source workflows, ghcr.io/estuary/source-postgres:dev provides the latest version of the connector as a Docker image. You can also follow the link in your browser to see past image versions.

Prerequisites

This connector supports PostgreSQL versions 10.0 and later.

You'll need a PostgreSQL database set up with the following:

  • Logical replication enabled (wal_level=logical)
  • User role with REPLICATION attribute
  • A replication slot. This represents a “cursor” into the PostgreSQL write-ahead log from which change events can be read.
    • Optional; if none exist, one will be created by the connector.
    • If you wish to run multiple captures from the same database, each must have its own slot. You can create these slots yourself, or by specifying a name other than the default in the advanced configuration.
  • A publication. This represents the set of tables for which change events will be reported.
    • In more restricted setups, this must be created manually, but can be created automatically if the connector has suitable permissions.
  • A watermarks table. The watermarks table is a small “scratch space” to which the connector occasionally writes a small amount of data to ensure accuracy when backfilling preexisting table contents.
    • In more restricted setups, this must be created manually, but can be created automatically if the connector has suitable permissions.
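If you're unsure whether your database already meets these prerequisites, you can inspect its current state with a few catalog queries. This is a quick check, assuming you can connect with a sufficiently privileged role:

```sql
-- The WAL level must report 'logical' for CDC to work.
SHOW wal_level;

-- List existing replication slots, if any.
SELECT slot_name, plugin, active FROM pg_replication_slots;

-- List existing publications and the tables they cover.
SELECT pubname FROM pg_publication;
SELECT * FROM pg_publication_tables;
```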

Setup

info

These setup instructions are for PostgreSQL instances you manage yourself. If you use a cloud-based managed service for your database, see below.

The simplest way to meet the above prerequisites is to change the WAL level and have the connector use a database superuser role.

For a more restricted setup, create a new user with just the required permissions as detailed in the following steps:

  1. Connect to your instance and create a new user and password:
CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;
  2. Assign the appropriate role.

    1. If using PostgreSQL v14 or later:
    GRANT pg_read_all_data TO flow_capture;
    2. If using an earlier version:
    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
    GRANT SELECT ON ALL TABLES IN SCHEMA public, <others> TO flow_capture;
    GRANT SELECT ON ALL TABLES IN SCHEMA information_schema, pg_catalog TO flow_capture;

    where <others> lists all schemas that will be captured from.

    info

    If an even more restricted set of permissions is desired, you can also grant SELECT on just the specific table(s) which should be captured from. The ‘information_schema’ and ‘pg_catalog’ access is required for stream auto-discovery, but not for capturing already configured streams.

  3. Create the watermarks table, grant privileges, and create the publication:

CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
CREATE PUBLICATION flow_publication FOR ALL TABLES;
  4. Set the WAL level to logical:
ALTER SYSTEM SET wal_level = logical;
  5. Restart PostgreSQL to allow the WAL level change to take effect.
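After the restart, you can confirm the change took effect, and optionally create the replication slot yourself rather than letting the connector create it. The slot name flow_slot below matches the connector's default; pgoutput is PostgreSQL's built-in logical decoding plugin used with publications:

```sql
-- Confirm the WAL level change took effect after the restart.
SHOW wal_level;

-- Optionally pre-create the replication slot the connector will use.
SELECT pg_create_logical_replication_slot('flow_slot', 'pgoutput');
```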

Backfills and performance considerations

When a PostgreSQL capture is initiated, the connector by default first backfills, or captures the targeted tables in their current state. It then transitions to capturing change events on an ongoing basis.

This is desirable in most cases, as it ensures that a complete view of your tables is captured into Flow. However, you may find it appropriate to skip the backfill, especially for extremely large tables.

In this case, you may turn off backfilling on a per-table basis. See properties for details.
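For example, to skip backfilling two large tables (the table names here are hypothetical), you could set the advanced skip_backfills option in the endpoint configuration:

```yaml
config:
  address: "localhost:5432"
  database: "postgres"
  user: "flow_capture"
  password: "secret"
  advanced:
    # Comma-separated, fully-qualified table names to skip backfilling.
    skip_backfills: "public.huge_log_table,public.archived_events"
```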

Configuration

You configure connectors either in the Flow web app, or by directly editing the catalog specification file. See connectors to learn more about using connectors. The values and specification sample below provide configuration details specific to the PostgreSQL source connector.

Properties

Endpoint

| Property | Title | Description | Type | Required/Default |
|---|---|---|---|---|
| /address | Address | The host or host:port at which the database can be reached. | string | Required |
| /database | Database | Logical database name to capture from. | string | Required, "postgres" |
| /user | User | The database user to authenticate as. | string | Required, "flow_capture" |
| /password | Password | Password for the specified database user. | string | Required |
| /advanced | Advanced Options | Options for advanced users. You should not typically need to modify these. | object | |
| /advanced/backfill_chunk_size | Backfill Chunk Size | The number of rows which should be fetched from the database in a single backfill query. | integer | 4096 |
| /advanced/publicationName | Publication Name | The name of the PostgreSQL publication to replicate from. | string | "flow_publication" |
| /advanced/skip_backfills | Skip Backfills | A comma-separated list of fully-qualified table names which should not be backfilled. | string | |
| /advanced/slotName | Slot Name | The name of the PostgreSQL replication slot to replicate from. | string | "flow_slot" |
| /advanced/watermarksTable | Watermarks Table | The name of the table used for watermark writes during backfills. Must be fully-qualified in '<schema>.<table>' form. | string | "public.flow_watermarks" |

Bindings

| Property | Title | Description | Type | Required/Default |
|---|---|---|---|---|
| /namespace | Namespace | The namespace/schema of the table. | string | Required |
| /stream | Stream | Table name. | string | Required |
| /syncMode | Sync mode | Connection method. Always set to incremental. | string | Required |

Sample

A minimal capture definition will look like the following:

captures:
  ${PREFIX}/${CAPTURE_NAME}:
    endpoint:
      connector:
        image: "ghcr.io/estuary/source-postgres:dev"
        config:
          address: "localhost:5432"
          database: "postgres"
          user: "flow_capture"
          password: "secret"
    bindings:
      - resource:
          stream: ${TABLE_NAME}
          namespace: ${TABLE_NAMESPACE}
          syncMode: incremental
        target: ${PREFIX}/${COLLECTION_NAME}

Your capture definition will likely be more complex, with additional bindings for each table in the source database.
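For instance, capturing two tables from the public schema would add one binding per table. The table and collection names below are hypothetical:

```yaml
bindings:
  - resource:
      stream: customers
      namespace: public
      syncMode: incremental
    target: ${PREFIX}/customers
  - resource:
      stream: orders
      namespace: public
      syncMode: incremental
    target: ${PREFIX}/orders
```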

Learn more about capture definitions.

PostgreSQL on managed cloud platforms

In addition to standard PostgreSQL, this connector supports cloud-based PostgreSQL instances on certain platforms.

Amazon RDS

You can use this connector for PostgreSQL instances on Amazon RDS using the following setup instructions. For Amazon Aurora, see below.

Setup

  1. Allow connections to the database from the Estuary Flow IP address.

    1. Modify the database, setting Public accessibility to Yes.

    2. Edit the VPC security group associated with your database, or create a new VPC security group and associate it with the database. Refer to the steps in the Amazon documentation. Create a new inbound rule and a new outbound rule that allow all traffic from the IP address 34.121.207.128.

    info

    Alternatively, you can allow secure connections via SSH tunneling.

  2. Enable logical replication on your RDS PostgreSQL instance.

    1. Create a parameter group. Create a unique name and description and set the following properties:

      • Family: postgres13
      • Type: DB Parameter group
    2. Modify the new parameter group and set rds.logical_replication=1.

    3. Associate the parameter group with the database.

    4. Reboot the database to allow the new parameter group to take effect.

  3. In the PostgreSQL client, connect to your instance and run the following commands to create a new user for the capture with appropriate permissions, and set up the watermarks table and publication.

    CREATE USER flow_capture WITH PASSWORD 'secret';
    GRANT rds_replication TO flow_capture;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture;
    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
    CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
    GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
    CREATE PUBLICATION flow_publication FOR ALL TABLES;
  4. In the RDS console, note the instance's Endpoint and Port. You'll need these for the address property when you configure the connector.

Amazon Aurora

You can use this connector for PostgreSQL-compatible Amazon Aurora instances using the following setup instructions.

You must apply some of the settings to the entire Aurora DB cluster, and others to a database instance within the cluster (typically, you'll want to use a replica, or reader instance). For each step, take note of which entity you're working with.

Setup

  1. Allow connections to the DB instance from the Estuary Flow IP address.

    1. Modify the instance, choosing Publicly accessible in the Connectivity settings.

    2. Edit the VPC security group associated with your instance, or create a new VPC security group and associate it with the instance. Refer to the steps in the Amazon documentation. Create a new inbound rule and a new outbound rule that allow all traffic from the IP address 34.121.207.128.

    info

    Alternatively, you can allow secure connections via SSH tunneling.

  2. Enable logical replication on your Aurora DB cluster.

    1. Create a parameter group. Create a unique name and description and set the following properties:

      • Family: aurora-postgresql13, or substitute the version of Aurora PostgreSQL used for your cluster.
      • Type: DB Cluster Parameter group
    2. Modify the new parameter group and set rds.logical_replication=1.

    3. Associate the parameter group with the DB cluster.

    4. Reboot the cluster to allow the new parameter group to take effect.

  3. In the PostgreSQL client, connect to your instance and run the following commands to create a new user for the capture with appropriate permissions, and set up the watermarks table and publication.

    CREATE USER flow_capture WITH PASSWORD 'secret';
    GRANT rds_replication TO flow_capture;
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture;
    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
    CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
    GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
    CREATE PUBLICATION flow_publication FOR ALL TABLES;
  4. In the RDS console, note the instance's Endpoint and Port. You'll need these for the address property when you configure the connector.

Google Cloud SQL

You can use this connector for PostgreSQL instances on Google Cloud SQL using the following setup instructions.

Setup

  1. Allow connections to the database from the Estuary Flow IP address.

    1. Enable public IP on your database and add 34.121.207.128 as an authorized IP address.
    info

    Alternatively, you can allow secure connections via SSH tunneling.

  2. Set the cloudsql.logical_decoding flag to on to enable logical replication on your Cloud SQL PostgreSQL instance.

  3. In your PostgreSQL client, connect to your instance and issue the following commands to create a new user for the capture with appropriate permissions, and set up the watermarks table and publication.

    CREATE USER flow_capture WITH REPLICATION
    IN ROLE cloudsqlsuperuser LOGIN PASSWORD 'secret';
    GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture;
    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
    CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
    GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
    CREATE PUBLICATION flow_publication FOR ALL TABLES;
  4. In the Cloud Console, note the instance's host under Public IP Address. Its port will always be 5432. Together, you'll use the host:port as the address property when you configure the connector.

Azure Database for PostgreSQL

You can use this connector for instances on Azure Database for PostgreSQL using the following setup instructions.

Setup

  1. Allow connections to the database from the Estuary Flow IP address.

    1. Create a new firewall rule that grants access to the IP address 34.121.207.128.
    info

    Alternatively, you can allow secure connections via SSH tunneling.

  2. In your Azure PostgreSQL instance's support parameters, set replication to logical to enable logical replication.

  3. In the PostgreSQL client, connect to your instance and run the following commands to create a new user for the capture with appropriate permissions.

CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;
  • If using PostgreSQL v14 or later:
GRANT pg_read_all_data TO flow_capture;
  • If using an earlier version:

    ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
    GRANT SELECT ON ALL TABLES IN SCHEMA public, <others> TO flow_capture;
    GRANT SELECT ON ALL TABLES IN SCHEMA information_schema, pg_catalog TO flow_capture;

    where <others> lists all schemas that will be captured from.

    info

    If an even more restricted set of permissions is desired, you can also grant SELECT on just the specific table(s) which should be captured from. The ‘information_schema’ and ‘pg_catalog’ access is required for stream auto-discovery, but not for capturing already configured streams.

  4. Set up the watermarks table and publication.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
GRANT SELECT ON ALL TABLES IN SCHEMA public, <others> TO flow_capture;
GRANT SELECT ON information_schema.columns, information_schema.tables, pg_catalog.pg_attribute, pg_catalog.pg_class, pg_catalog.pg_index, pg_catalog.pg_namespace TO flow_capture;
CREATE TABLE IF NOT EXISTS public.flow_watermarks (slot TEXT PRIMARY KEY, watermark TEXT);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
CREATE PUBLICATION flow_publication FOR TABLE schema.table1, schema.table2;
  5. Note the following important items for configuration:

    • Find the instance's host under Server Name, and the port under Connection Strings (usually 5432). Together, you'll use the host:port as the address property when you configure the connector.
    • Format user as username@servername; for example, flow_capture@myazuredb.
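Putting those items together, the endpoint configuration for a hypothetical Azure server named myazuredb might look like the following sketch:

```yaml
config:
  # Server Name plus the port from Connection Strings.
  address: "myazuredb.postgres.database.azure.com:5432"
  database: "postgres"
  # Azure requires the username@servername login format.
  user: "flow_capture@myazuredb"
  password: "secret"
```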

TOASTed values

PostgreSQL has a hard page size limit, usually 8 KB, for performance reasons. If your tables contain values that exceed the limit, those values can't be stored directly. PostgreSQL uses TOAST (The Oversized-Attribute Storage Technique) to store them separately.
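To see which columns of a table are even eligible for TOAST storage, you can inspect pg_attribute. The table name public.my_table below is a hypothetical placeholder; storage mode 'x' (extended) allows compression and out-of-line storage, while 'm' (main) prefers compression but can still be moved out of line as a last resort:

```sql
-- List columns whose storage mode permits TOASTing.
SELECT attname, atttypid::regtype, attstorage
FROM pg_attribute
WHERE attrelid = 'public.my_table'::regclass
  AND attnum > 0
  AND NOT attisdropped
  AND attstorage IN ('x', 'm');
```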

TOASTed values can sometimes present a challenge for systems that rely on the PostgreSQL write-ahead log (WAL), like this connector. If a change event occurs on a row that contains a TOASTed value, but the TOASTed value itself is unchanged, it is omitted from the WAL. As a result, the connector emits a row update with the value omitted, which might cause unexpected results in downstream catalog tasks if adjustments are not made.

The PostgreSQL connector handles TOASTed values for you when you follow the standard discovery workflow or use the Flow UI to create your capture. It uses merge reductions to fill in the previous known TOASTed value in cases when that value is omitted from a row update.

However, due to the event-driven nature of certain tasks in Flow, it's still possible to see unexpected results in your data flow, specifically:

  • When you materialize the captured data to another system using a connector that requires delta updates
  • When you perform a derivation that uses TOASTed values

Troubleshooting

If you encounter an issue that you suspect is due to TOASTed values, try the following: