MySQL
This is a change data capture (CDC) connector that captures change events from a MySQL database via the Binary Log.
It is available for use in the Flow web application. For local development or open-source workflows, `ghcr.io/estuary/source-mysql:dev` provides the latest version of the connector as a Docker image. You can also follow the link in your browser to see past image versions.
Supported platforms
This connector supports MySQL on major cloud providers, as well as self-hosted instances.
Setup instructions are provided for the following platforms:
- Self-hosted MySQL
- Amazon RDS
- Amazon Aurora
- Google Cloud SQL
- Azure Database for MySQL
Prerequisites
To use this connector, you'll need a MySQL database set up with the following:
- The `binlog_format` system variable set to `ROW` (the default value).
- Binary log expiration period set to MySQL's default value of 30 days (2592000 seconds) if at all possible.
  - This value may be set lower if necessary, but we strongly discourage going below 7 days, as this may increase the likelihood of unrecoverable failures.
- A watermarks table. The watermarks table is a small "scratch space" to which the connector occasionally writes a small amount of data (a UUID, specifically) to ensure accuracy when backfilling preexisting table contents.
  - The default name is `"flow.watermarks"`, but this can be overridden in `config.json`.
- A database user with appropriate permissions:
  - `REPLICATION CLIENT` and `REPLICATION SLAVE` privileges.
  - Permission to insert, update, and delete on the watermarks table.
  - Permission to read the tables being captured.
  - Permission to read from `information_schema` tables, if automatic discovery is used.
- If the table(s) to be captured include columns of type `DATETIME`, the `time_zone` system variable must be set to an IANA zone name or numerical offset, or the capture must be configured with a `timezone` to use by default.
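Before moving on, you can sanity-check these settings from any MySQL client. A minimal sketch of the relevant queries (verification only; the setup steps below show how to change each value):
SHOW VARIABLES LIKE 'binlog_format';              -- should be ROW
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds'; -- ideally 2592000 (30 days)
SELECT @@GLOBAL.time_zone;                        -- should not be SYSTEM if capturing DATETIME columns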
Setup
To meet these requirements, follow the steps for your hosting type.
Self-hosted MySQL
- Create the watermarks table. This table can have any name and be in any database, so long as the capture's `config.json` file is modified accordingly.
CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
- Create the `flow_capture` user with replication permission, the ability to read all tables, and the ability to read and write the watermarks table. The `SELECT` permission can be restricted to just the tables that need to be captured, but automatic discovery requires `information_schema` access as well.
CREATE USER IF NOT EXISTS flow_capture
IDENTIFIED BY 'secret'
COMMENT 'User account for Flow MySQL data capture';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'flow_capture';
GRANT SELECT ON *.* TO 'flow_capture';
GRANT INSERT, UPDATE, DELETE ON flow.watermarks TO 'flow_capture';
- Configure the binary log to retain data for the default MySQL setting of 30 days, if previously set lower.
SET PERSIST binlog_expire_logs_seconds = 2592000;
- Configure the database's time zone. See below for more information.
SET PERSIST time_zone = '-05:00';
Amazon RDS
Estuary recommends creating a read replica in RDS for use with Flow; however, it's not required. You may apply the connector directly to the primary instance if you'd like.
Allow connections to the database from the Estuary Flow IP address.
Modify the database, setting Public accessibility to Yes.
Edit the VPC security group associated with your database, or create a new VPC security group and associate it with the database. Refer to the steps in the Amazon documentation. Create a new inbound rule and a new outbound rule that allow all traffic from the IP address `34.121.207.128`.
Alternatively, you can allow secure connections via SSH tunneling. To do so:
- Follow the guide to configure an SSH server for tunneling.
- When you configure your connector as described in the configuration section below, include the additional `networkTunnel` configuration to enable the SSH tunnel. See Connecting to endpoints on secure networks for additional details and a sample.
Create an RDS parameter group to enable replication in MySQL.
Create a parameter group. Create a unique name and description and set the following properties:
- Family: mysql8.0
- Type: DB Parameter group
Modify the new parameter group and update the following parameters:
- binlog_format: ROW
- binlog_row_metadata: FULL
- read_only: 0
If using the primary instance (not recommended), associate the parameter group with the database and set Backup Retention Period to 7 days. Reboot the database to allow the changes to take effect.
Create a read replica with the new parameter group applied (recommended).
Create a read replica of your MySQL database.
Modify the replica and set the following:
- DB parameter group: choose the parameter group you created previously
- Backup retention period: 7 days
- Public access: Publicly accessible
Reboot the replica to allow the changes to take effect.
Switch to your MySQL client. Run the following commands to create a new user for the capture with appropriate permissions, and set up the watermarks table:
CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
CREATE USER IF NOT EXISTS flow_capture
IDENTIFIED BY 'secret'
COMMENT 'User account for Flow MySQL data capture';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'flow_capture';
GRANT SELECT ON *.* TO 'flow_capture';
GRANT INSERT, UPDATE, DELETE ON flow.watermarks TO 'flow_capture';
- Run the following command to set the binary log retention to 7 days, the maximum value which RDS MySQL permits:
CALL mysql.rds_set_configuration('binlog retention hours', 168);
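If you'd like to confirm the new retention value took effect, RDS provides a companion stored procedure that displays the current configuration:
CALL mysql.rds_show_configuration;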
- In the RDS console, note the instance's Endpoint and Port. You'll need these for the `address` property when you configure the connector.
Amazon Aurora
You must apply some of the settings to the entire Aurora DB cluster, and others to a database instance within the cluster (we recommend you use a replica, or reader instance, to connect with Flow). For each step, take note of which entity you're working with.
Allow connections to the database from the Estuary Flow IP address.
Modify the instance, choosing Publicly accessible in the Connectivity settings.
Edit the VPC security group associated with your instance, or create a new VPC security group and associate it with the instance. Refer to the steps in the Amazon documentation. Create a new inbound rule and a new outbound rule that allow all traffic from the IP address `34.121.207.128`.
Alternatively, you can allow secure connections via SSH tunneling. To do so:
- Follow the guide to configure an SSH server for tunneling.
- When you configure your connector as described in the configuration section below, include the additional `networkTunnel` configuration to enable the SSH tunnel. See Connecting to endpoints on secure networks for additional details and a sample.
Create an RDS parameter group to enable replication on your Aurora DB cluster.
Create a parameter group. Create a unique name and description and set the following properties:
- Family: aurora-mysql8.0
- Type: DB Cluster Parameter group
Modify the new parameter group and update the following parameters:
- binlog_format: ROW
- binlog_row_metadata: FULL
- read_only: 0
Associate the parameter group with the DB cluster. While you're modifying the cluster, also set Backup Retention Period to 7 days.
Reboot the cluster to allow the changes to take effect.
Switch to your MySQL client. Run the following commands to create a new user for the capture with appropriate permissions, and set up the watermarks table:
CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
CREATE USER IF NOT EXISTS flow_capture
IDENTIFIED BY 'secret'
COMMENT 'User account for Flow MySQL data capture';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'flow_capture';
GRANT SELECT ON *.* TO 'flow_capture';
GRANT INSERT, UPDATE, DELETE ON flow.watermarks TO 'flow_capture';
- Run the following command to set the binary log retention to 7 days, the maximum value Aurora permits:
CALL mysql.rds_set_configuration('binlog retention hours', 168);
- In the RDS console, note the instance's Endpoint and Port. You'll need these for the `address` property when you configure the connector.
Google Cloud SQL
Allow connections to the DB instance from the Estuary Flow IP address.
- Enable public IP on your database and add `34.121.207.128` as an authorized IP address.
Alternatively, you can allow secure connections via SSH tunneling. To do so:
- Follow the guide to configure an SSH server for tunneling.
- When you configure your connector as described in the configuration section below, include the additional `networkTunnel` configuration to enable the SSH tunnel. See Connecting to endpoints on secure networks for additional details and a sample.
Set the instance's `binlog_expire_logs_seconds` flag to `2592000`.
Using Google Cloud Shell or your preferred client, create the watermarks table.
CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
Create the `flow_capture` user with replication permission, the ability to read all tables, and the ability to read and write the watermarks table. The `SELECT` permission can be restricted to just the tables that need to be captured, but automatic discovery requires `information_schema` access as well.
CREATE USER IF NOT EXISTS flow_capture
IDENTIFIED BY 'secret'
COMMENT 'User account for Flow MySQL data capture';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'flow_capture';
GRANT SELECT ON *.* TO 'flow_capture';
GRANT INSERT, UPDATE, DELETE ON flow.watermarks TO 'flow_capture';
- In the Cloud Console, note the instance's host under Public IP Address. Its port will always be `3306`. Together, you'll use the host:port as the `address` property when you configure the connector.
Azure Database for MySQL
Allow connections to the database from the Estuary Flow IP address.
- Create a new firewall rule that grants access to the IP address `34.121.207.128`.
Alternatively, you can allow secure connections via SSH tunneling. To do so:
- Follow the guide to configure an SSH server for tunneling.
- When you configure your connector as described in the configuration section below, include the additional `networkTunnel` configuration to enable the SSH tunnel. See Connecting to endpoints on secure networks for additional details and a sample.
Set the `binlog_expire_logs_seconds` server parameter to `2592000`.
Using MySQL Workbench or your preferred client, create the watermarks table. Your username must be specified in the format `username@servername`.
CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
Create the `flow_capture` user with replication permission, the ability to read all tables, and the ability to read and write the watermarks table. The `SELECT` permission can be restricted to just the tables that need to be captured, but automatic discovery requires `information_schema` access as well.
CREATE USER IF NOT EXISTS flow_capture
IDENTIFIED BY 'secret'
COMMENT 'User account for Flow MySQL data capture';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'flow_capture';
GRANT SELECT ON *.* TO 'flow_capture';
GRANT INSERT, UPDATE, DELETE ON flow.watermarks TO 'flow_capture';
- Note the instance's host under Server name, and the port under Connection Strings (usually `3306`). Together, you'll use the host:port as the `address` property when you configure the connector.
Setting the MySQL time zone
MySQL's `time_zone` server system variable is set to `SYSTEM` by default.
If you intend to capture tables including columns of type `DATETIME`, and `time_zone` is set to `SYSTEM`, Flow won't be able to detect the time zone and convert the column to RFC3339 format. To avoid this, you must explicitly set the time zone for your database.
You can:
- Specify a numerical offset from UTC.
  - For MySQL version 8.0.19 or higher, values from `-13:59` to `+14:00`, inclusive, are permitted.
  - Prior to MySQL 8.0.19, values from `-12:59` to `+13:00`, inclusive, are permitted.
- Specify a named time zone in IANA time zone format.
- If you're using Amazon Aurora, create or modify the DB cluster parameter group associated with your MySQL database, setting the `time_zone` parameter to the correct value.
For example, if you're located in New Jersey, USA, you could set `time_zone` to `-05:00` or `-04:00`, depending on the time of year. Because this region observes daylight saving time, you'd be responsible for changing the offset. Alternatively, you could set `time_zone` to `America/New_York`, and time changes would occur automatically.
If using IANA time zones, your database must include time zone tables. Learn more in the MySQL docs.
If you are unable to set the `time_zone` in the database and need to capture tables with `DATETIME` columns, the capture can be configured to assume a time zone using the `timezone` configuration property (see below). The `timezone` configuration property can be set as a numerical offset or in IANA time zone format.
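As a concrete sketch for a self-hosted instance, the following checks the current setting and switches to a named IANA zone (this assumes your database includes the time zone tables mentioned above):
-- SYSTEM means Flow cannot infer the zone for DATETIME columns.
SELECT @@GLOBAL.time_zone;
-- A named zone handles daylight saving transitions automatically.
SET PERSIST time_zone = 'America/New_York';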
Backfills and performance considerations
When a MySQL capture is initiated, by default, the connector first backfills, or captures the targeted tables in their current state. It then transitions to capturing change events on an ongoing basis.
This is desirable in most cases, as it ensures that a complete view of your tables is captured into Flow. However, you may find it appropriate to skip the backfill, especially for extremely large tables.
In this case, you may turn off backfilling on a per-table basis. See properties for details, and the sketch below.
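For instance, skipping the backfill of one large table might look like the following endpoint configuration fragment (a sketch only; the table name is a placeholder, and the `advanced` nesting mirrors the `/advanced/skip_backfills` property path documented below):
config:
  address: "127.0.0.1:3306"
  user: "flow_capture"
  password: "secret"
  advanced:
    skip_backfills: "flow_db.huge_events_table"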
Configuration
You configure connectors either in the Flow web app, or by directly editing the catalog specification file. See connectors to learn more about using connectors. The values and specification sample below provide configuration details specific to the MySQL source connector.
Properties
Endpoint
Property | Title | Description | Type | Required/Default |
---|---|---|---|---|
/address | Server Address | The host or host:port at which the database can be reached. | string | Required |
/user | Login User | The database user to authenticate as. | string | Required, "flow_capture" |
/password | Login Password | Password for the specified database user. | string | Required |
/timezone | Timezone | Timezone to use when capturing datetime columns. Should normally be left blank to use the database's 'time_zone' system variable. Only required if the 'time_zone' system variable cannot be read and columns with type datetime are being captured. Must be a valid IANA time zone name or +HH:MM offset. Takes precedence over the 'time_zone' system variable if both are set. | string | |
/advanced/watermarks_table | Watermarks Table Name | The name of the table used for watermark writes. Must be fully-qualified in '<schema>.<table>' form. | string | "flow.watermarks" |
/advanced/dbname | Database Name | The name of the database to connect to. In general this shouldn't matter. The connector can discover and capture from all databases it's authorized to access. | string | "mysql" |
/advanced/node_id | Node ID | Node ID for the capture. Each node in a replication cluster must have a unique 32-bit ID. The specific value doesn't matter so long as it is unique. If unset or zero the connector will pick a value. | integer | |
/advanced/skip_backfills | Skip Backfills | A comma-separated list of fully-qualified table names which should not be backfilled. | string | |
/advanced/backfill_chunk_size | Backfill Chunk Size | The number of rows which should be fetched from the database in a single backfill query. | integer | 131072 |
/advanced/skip_binlog_retention_check | Skip Binlog Retention Sanity Check | Bypasses the 'dangerously short binlog retention' sanity check at startup. Only do this if you understand the danger and have a specific need. | boolean |
Bindings
Property | Title | Description | Type | Required/Default |
---|---|---|---|---|
/namespace | Namespace | The database/schema in which the table resides. | string | Required |
/stream | Stream | Name of the table to be captured from the database. | string | Required |
/syncMode | Sync mode | Connection method. Always set to incremental. | string | Required |
When you configure this connector in the web application, the automatic discovery process sets up a binding for most tables it finds in your database, but there are exceptions.
Tables in the MySQL system schemas `information_schema`, `mysql`, `performance_schema`, and `sys` will not be discovered. You can add bindings for such tables manually.
Sample
A minimal capture definition will look like the following:
captures:
${PREFIX}/${CAPTURE_NAME}:
endpoint:
connector:
image: ghcr.io/estuary/source-mysql:dev
config:
address: "127.0.0.1:3306"
user: "flow_capture"
password: "secret"
bindings:
- resource:
namespace: ${TABLE_NAMESPACE}
stream: ${TABLE_NAME}
syncMode: incremental
target: ${PREFIX}/${COLLECTION_NAME}
Your capture definition will likely be more complex, with additional bindings for each table in the source database.
Learn more about capture definitions.
Troubleshooting Capture Errors
The `source-mysql` connector is designed to halt immediately if something wrong or unexpected happens, instead of continuing on and potentially outputting incorrect data. What follows is a non-exhaustive list of some potential failure modes, and what action should be taken to fix these situations:
Unsupported Operations
If your capture is failing with an "unsupported operation {ALTER,DROP,TRUNCATE,etc} TABLE" error, this indicates that such an operation has taken place impacting a table which is currently being captured.
In the case of `DROP TABLE` and other destructive operations, this is not supported, and can only be resolved by removing the offending table(s) from the capture bindings list, after which you may recreate the capture if desired (causing the latest state of the table to be recaptured in its entirety).
In the case of `ALTER TABLE`, we currently support table alterations to add or drop columns from a table. This error indicates that whatever alteration took place is not currently supported. Practically speaking, the immediate resolution is the same as for a `DROP` or `TRUNCATE TABLE`, but if you frequently perform schema migrations it may be worth reaching out to see if we can add support for whatever table alteration you just did.
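For reference, simple column additions and drops like the following are the kinds of alterations the connector handles (table and column names here are hypothetical):
-- Supported: adding or dropping a column on a captured table.
ALTER TABLE flow_db.orders ADD COLUMN note TEXT;
ALTER TABLE flow_db.orders DROP COLUMN note;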
Data Manipulation Queries
If your capture is failing with an "unsupported DML query" error, this means that an `INSERT`, `UPDATE`, `DELETE`, or other data manipulation query is present in the MySQL binlog. This should generally not happen if `binlog_format = 'ROW'` as described in the Prerequisites section.
Resolving this error requires fixing the `binlog_format` system variable, and then either tearing down and recreating the entire capture so that it restarts at a later point in the binlog, or, in the case of an `INSERT`/`DELETE` query, it may suffice to remove the capture binding for the offending table and then re-add it.
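The underlying setting can be corrected as described in the Prerequisites section; on a self-hosted instance that is simply:
-- Ensure row-based replication so changes appear as row events rather than SQL statements.
SET PERSIST binlog_format = 'ROW';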
Unhandled Queries
If your capture is failing with an "unhandled query" error, some SQL query is present in the binlog which the connector does not (currently) understand.
In general, this error suggests that the connector should be modified to at least recognize this type of query, and most likely categorize it as either an unsupported DML Query, an unsupported Table Operation, or something that can safely be ignored. Until such a fix is made the capture cannot proceed, and you will need to tear down and recreate the entire capture so that it restarts from a later point in the binlog.
Metadata Errors
If your capture is failing with a "metadata error", then something has gone badly wrong with the capture's tracking of table metadata, such as column names or datatypes.
This should never happen, and most likely means that the MySQL binlog itself is corrupt in some way. If this occurs, it can be resolved by removing the offending table(s) from the capture bindings list and then recreating the capture (generally into a new collection, as this process will cause the table to be re-captured in its entirety).
Insufficient Binlog Retention
If your capture fails with a "binlog retention period is too short" error, it is informing you that the MySQL binlog retention period is set to a dangerously low value, and your capture would risk unrecoverable failure if it were paused or the server became unreachable for a nontrivial amount of time, such that the database expired a binlog segment that the capture was still reading from.
(If this were to happen, then change events would be permanently lost and that particular capture would never be able to make progress without potentially producing incorrect data. Thus the capture would need to be torn down and recreated so that each table could be re-captured in its entirety, starting with a complete backfill of current contents.)
The "binlog retention period is too short"
error should normally be fixed by setting binlog_expire_logs_seconds = 2592000
as described in the Prerequisites section (and when running on a managed cloud platform additional steps may be required, refer to the managed cloud setup instructions above). However, advanced users who understand the risks can use the skip_binlog_retention_check
configuration option to disable this safety.
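On a self-hosted instance, checking and restoring the recommended retention looks like this (managed platforms use their own procedures, as shown in the setup sections above):
-- Inspect the current retention period, in seconds (0 means logs are kept indefinitely).
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';
-- Restore the recommended 30-day retention.
SET PERSIST binlog_expire_logs_seconds = 2592000;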
Empty Collection Key
Every Flow collection must declare a key, which is used to group its documents. When testing your capture, if you encounter an error indicating that the collection key cannot be empty, you will need to either add a key to the table in your source, or manually edit the generated specification and specify keys for the collection before publishing to the catalog, as documented here.
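If you opt to add a key in the source database, declaring a primary key on the table resolves this. A hypothetical example (choose a column, or combination of columns, that uniquely identifies each row):
-- Table and column names are illustrative.
ALTER TABLE flow_db.events ADD PRIMARY KEY (event_id);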