Skip to main content

Amazon RDS for MySQL

This connector materializes Flow collections into tables in a MySQL database.

It is available for use in the Flow web application. For local development or open-source workflows, ghcr.io/estuary/materialize-mysql:dev provides the latest version of the connector as a Docker image. You can also follow the link in your browser to see past image versions.

Prerequisites

To use this connector, you'll need:

  • A MySQL database to which to materialize, and user credentials.
    • MySQL versions 5.7 and later are supported
    • The connector will create new tables in the database per your specification, so user credentials must have access to create new tables.
    • The local_infile global variable must be enabled. You can enable this setting by running SET GLOBAL local_infile = true in your database.
  • At least one Flow collection

Setup

You must configure your database to allow connections from Estuary. There are two ways to do this: by granting direct access to Flow's IP or by creating an SSH tunnel.

Connecting Directly With Amazon RDS

  1. Edit the VPC security group associated with your database instance, or create a new VPC security group and associate it with the database instance.

    1. Modify the instance, choosing Publicly accessible in the Connectivity settings.

    2. Per the steps in the Amazon documentation, create a new inbound rule and a new outbound rule that allow all traffic from the Estuary Flow IP addresses.

Connect With SSH Tunneling

To allow SSH tunneling to a database instance hosted on AWS, you'll need to create a virtual computing environment, or instance, in Amazon EC2.

  1. Begin by finding your public SSH key on your local machine. In the .ssh subdirectory of your user home directory, look for the PEM file that contains the private SSH key. Check that it starts with -----BEGIN RSA PRIVATE KEY-----, which indicates it is an RSA-based file.

    • If no such file exists, generate one using the command:
       ssh-keygen -m PEM -t rsa
    • If a PEM file exists, but starts with -----BEGIN OPENSSH PRIVATE KEY-----, convert it with the command:
       ssh-keygen -p -N "" -m pem -f /path/to/key
  2. Import your SSH key into AWS.

  3. Launch a new instance in EC2. During setup:

    • Configure the security group to allow SSH connection from anywhere.
    • When selecting a key pair, choose the key you just imported.
  4. Connect to the instance, setting the user name to ec2-user.

  5. Find and note the instance's public DNS. This will be formatted like: ec2-198-21-98-1.compute-1.amazonaws.com.

  • Connect with SSH tunneling

    1. Refer to the guide to configure an SSH server on the cloud platform of your choice.

    2. Configure your connector as described in the configuration section above, with the additional of the networkTunnel stanza to enable the SSH tunnel, if using. See Connecting to endpoints on secure networks for additional details and a sample.

Configuration Tip

To configure the connector, you must specify the database address in the format host:port. (You can also supply host only; the connector will use the port 3306 by default, which is correct in many cases.) You can find the host and port in the following locations in each platform's console:

  • Amazon RDS: host as Endpoint; port as Port.

Configuration

To use this connector, begin with data in one or more Flow collections. Use the below properties to configure a MySQL materialization, which will direct one or more of your Flow collections to your desired tables, or views, in the database.

Properties

Endpoint

PropertyTitleDescriptionTypeRequired/Default
/databaseDatabaseName of the logical database to materialize to.stringRequired
/addressAddressHost and port of the database. If only the host is specified, port will default to 3306.stringRequired
/passwordPasswordPassword for the specified database user.stringRequired
/userUserDatabase user to connect as.stringRequired
/timezoneTimezoneTimezone to use when materializing datetime columns. Should normally be left blank to use the database's 'time_zone' system variable. Only required if the 'time_zone' system variable cannot be read. Must be a valid IANA time zone name or +HH:MM offset. Takes precedence over the 'time_zone' system variable if both are set.string
/advancedAdvanced OptionsOptions for advanced users. You should not typically need to modify these.object
/advanced/sslmodeSSL ModeOverrides SSL connection behavior by setting the 'sslmode' parameter.string
/advanced/ssl_server_caSSL Server CAOptional server certificate authority to use when connecting with custom SSL modestring
/advanced/ssl_client_certSSL Client CertificateOptional client certificate to use when connecting with custom SSL mode.string
/advanced/ssl_client_keySSL Client KeyOptional client key to use when connecting with custom SSL mode.string

Setting the MySQL time zone

MySQL's time_zone server system variable is set to SYSTEM by default.

If you intend to materialize collections including fields of with format: date-time or format: time, and time_zone is set to SYSTEM, Flow won't be able to detect the time zone and convert datetimes to the appropriate timezone when materializing. To avoid this, you must explicitly set the time zone for your database.

You can:

  • Specify a numerical offset from UTC.

    • For MySQL version 8.0.19 or higher, values from -13:59 to +14:00, inclusive, are permitted.
    • Prior to MySQL 8.0.19, values from -12:59 to +13:00, inclusive, are permitted
  • Specify a named timezone in IANA timezone format.

  • If you're using Amazon Aurora, create or modify the DB cluster parameter group associated with your MySQL database. Set the time_zone parameter to the correct value.

For example, if you're located in New Jersey, USA, you could set time_zone to -05:00 or -04:00, depending on the time of year. Because this region observes daylight savings time, you'd be responsible for changing the offset. Alternatively, you could set time_zone to America/New_York, and time changes would occur automatically.

If using IANA time zones, your database must include time zone tables. Learn more in the MySQL docs.

Materialize Timezone Configuration

If you are unable to set the time_zone in the database and need to materialize collections with date-time or time fields, the materialization can be configured to assume a time zone using the timezone configuration property (see above). The timezone configuration property can be set as a numerical offset or IANA timezone format.

SSL Mode

Possible values:

  • disabled: A plain unencrypted connection is established with the server
  • preferred: Only use SSL connection if the server asks for it
  • required: Connect using an SSL connection, but do not verify the server's certificate.
  • verify_ca: Connect using an SSL connection, and verify the server's certificate against the given SSL Server CA, but does not verify the server's hostname. This option is most commonly used when connecting to an IP address which does not have a hostname to be verified. When using this mode, SSL Server CA must be provided.
  • verify_identity: Connect using an SSL connection, verify the server's certificate and the server's hostname. This is the most secure option. When using this mode, SSL Server CA must be provided.

Optionally, SSL Client Certificate and Key can be provided if necessary to authorize the client.

Bindings

PropertyTitleDescriptionTypeRequired/Default
/tableTableTable name to materialize to. It will be created by the connector, unless the connector has previously created it.stringRequired
/delta_updatesDelta UpdateShould updates to this table be done via delta updates.booleanfalse

Sample

materializations:
${PREFIX}/${mat_name}:
endpoint:
connector:
image: ghcr.io/estuary/materialize-mysql:dev
config:
database: flow
address: localhost:5432
password: flow
user: flow
bindings:
- resource:
table: ${TABLE_NAME}
source: ${PREFIX}/${COLLECTION_NAME}

Delta updates

This connector supports both standard (merge) and delta updates. The default is to use standard updates.

Date & times

Date and time fields that are part of collections, which specify a format: date-time for the field, are automatically converted to UTC and persisted as UTC DATETIME in MySQL.

Reserved words

MySQL has a list of reserved words that must be quoted in order to be used as an identifier. Flow considers all the reserved words in the official MySQL documentation.

These reserved words are listed in the table below. Flow automatically quotes fields that are in this list.

Reserved words
accessibleclonedescribefloatint
accountclosedescriptionfloat4int1
actioncoalescedes_key_filefloat8int2
activecodedeterministicflushint3
addcollatediagnosticsfollowingint4
admincollationdirectoryfollowsint8
aftercolumndisableforinteger
againstcolumnsdiscardforceintersect
aggregatecolumn_formatdiskforeigninterval
algorithmcolumn_namedistinctformatinto
allcommentdistinctrowfoundinvisible
altercommitdivfrominvoker
alwayscommitteddofullio
analysecompactdoublefulltextio_after_gtid
analyzecompletiondropfunctionio_before_gti
andcomponentdualgeneralio_thread
anycompresseddumpfilegenerateipc
arraycompressionduplicategeneratedis
asconcurrentdynamicgeomcollectioisolation
ascconditioneachgeometryissuer
asciiconnectionelsegeometrycolleiterate
asensitiveconsistentelseifgetjoin
atconstraintemptyget_formatjson
attributeconstraint_caenableget_master_pujson_table
authenticatioconstraint_naenclosedget_source_pujson_value
autoextend_siconstraint_scencryptionglobalkey
auto_incremencontainsendgrantkeyring
avgcontextendsgrantskeys
avg_row_lengtcontinueenforcedgroupkey_block_siz
backupconvertenginegroupingkill
beforecpuenginesgroupslag
begincreateengine_attribgroup_replicalanguage
betweencrossenumgtid_onlylast
bigintcubeerrorhandlerlast_value
binarycume_disterrorshashlateral
binlogcurrentescapehavinglead
bitcurrent_dateescapedhelpleading
blobcurrent_timeeventhigh_priorityleave
blockcurrent_timeseventshistogramleaves
boolcurrent_usereveryhistoryleft
booleancursorexcepthostless
bothcursor_nameexchangehostslevel
btreedataexcludehourlike
bucketsdatabaseexecutehour_microseclimit
bulkdatabasesexistshour_minutelinear
bydatafileexithour_secondlines
bytedateexpansionidentifiedlinestring
cachedatetimeexpireiflist
calldayexplainignoreload
cascadeday_hourexportignore_serverlocal
cascadedday_microsecoextendedimportlocaltime
caseday_minuteextent_sizeinlocaltimestam
catalog_nameday_secondfactorinactivelock
chaindeallocatefailedloginindexlocked
challenge_resdecfalseindexeslocks
changedecimalfastinfilelogfile
changeddeclarefaultsinitiallogs
channeldefaultfetchinitial_sizelong
chardefault_authfieldsinitiatelongblob
characterdefinerfileinnerlongtext
charsetdefinitionfile_block_siinoutloop
checkdelayedfilterinsensitivelow_priority
checksumdelay_key_wrifinishinsertmaster
cipherdeletefirstinsert_methodmaster_auto_p
class_origindense_rankfirst_valueinstallmaster_bind
clientdescfixedinstancemaster_compre
master_connecneverpreserverestrictsource_host
master_delaynewprevresumesource_log_fi
master_heartbnextprimaryretainsource_log_po
master_hostnoprivilegesreturnsource_passwo
master_log_finodegroupprivilege_chereturned_sqlssource_port
master_log_pononeprocedurereturningsource_public
master_passwonotprocessreturnssourceretry
master_portnowaitprocesslistreusesource_ssl
master_publicno_waitprofilereversesource_ssl_ca
masterretryno_write_to_bprofilesrevokesource_ssl_ca
master_servernth_valueproxyrightsource_ssl_ce
master_sslntilepurgerlikesource_ssl_ci
master_ssl_canullquarterrolesource_ssl_cr
master_ssl_canullsqueryrollbacksource_ssl_cr
master_ssl_cenumberquickrollupsource_ssl_ke
master_ssl_cinumericrandomrotatesource_ssl_ve
master_ssl_crnvarcharrangeroutinesource_tls_ci
master_ssl_crofrankrowsource_tls_ve
master_ssl_keoffreadrowssource_user
master_ssl_veoffsetreadsrow_countsource_zstd_c
master_tls_ciojread_onlyrow_formatspatial
master_tls_veoldread_writerow_numberspecific
master_useronrealrtreesql
master_zstd_conerebuildsavepointsqlexception
matchonlyrecoverschedulesqlstate
maxvalueopenrecursiveschemasqlwarning
max_connectiooptimizeredofileschemassql_after_gti
max_queries_poptimizer_cosredo_buffer_sschema_namesql_after_mts
max_rowsoptionredundantsecondsql_before_gt
max_sizeoptionalreferencesecondarysql_big_resul
max_updates_poptionallyreferencessecondary_engsql_buffer_re
max_user_connoptionsregexpsecondary_engsql_cache
mediumorregistrationsecondary_loasql_calc_foun
mediumbloborderrelaysecondary_unlsql_no_cache
mediumintordinalityrelaylogsecond_microssql_small_res
mediumtextorganizationrelay_log_filsecuritysql_thread
memberothersrelay_log_posselectsql_tsi_day
memoryoutrelay_threadsensitivesql_tsi_hour
mergeouterreleaseseparatorsql_tsi_minut
message_textoutfilereloadserialsql_tsi_month
microsecondoverremoteserializablesql_tsi_quart
middleintownerremoveserversql_tsi_secon
migratepack_keysrenamesessionsql_tsi_week
minutepagereorganizesetsql_tsi_year
minute_microsparserrepairsharesrid
minute_secondpartialrepeatshowssl
min_rowspartitionrepeatableshutdownstacked
modpartitioningreplacesignalstart
modepartitionsreplicasignedstarting
modifiespasswordreplicassimplestarts
modifypassword_lockreplicatedoskipstats_auto_re
monthpathreplicatedoslavestats_persist
multilinestripercent_rankreplicate_ignslowstatssample
multipointpersistreplicate_ignsmallintstatus
multipolygonpersist_onlyreplicate_rewsnapshotstop
mutexphasereplicate_wilsocketstorage
mysql_errnopluginreplicate_wilsomestored
namepluginsreplicationsonamestraight_join
namesplugin_dirrequiresoundsstream
nationalpointrequire_row_fsourcestring
naturalpolygonresetsource_auto_psubclass_orig
ncharportresignalsource_bindsubject
ndbprecedesresourcesource_compresubpartition
ndbclusterprecedingrespectsource_connecsubpartitions
nestedprecisionrestartsource_delaysuper
network_namespreparerestoresource_heartbsuspend
swapstimestampdiffundo_buffer_sutc_datewhen
switchestinyblobunicodeutc_timewhere
systemtinyintuninstallutc_timestampwhile
tabletinytextunionvalidationwindow
tablestlsuniquevaluewith
tablespacetounknownvalueswithout
table_checksutrailingunlockvarbinarywork
table_nametransactionunregistervarcharwrapper
temporarytriggerunsignedvarcharacterwrite
temptabletriggersuntilvariablesx509
terminatedtrueupdatevaryingxa
texttruncateupgradevcpuxid
thantypeurlviewxml
thentypesusagevirtualxor
thread_prioriunboundedusevisibleyear
tiesuncommitteduserwaityear_month
timeundefineduser_resourcewarningszerofill
timestampundouse_frmweekzone
timestampaddundofileusingweight_string

Changelog

The changelog includes a list of breaking changes made to this connector. Backwards-compatible changes are not listed.

Proceed with caution when editing materializations created with previous versions of this connector; editing always upgrades your materialization to the latest connector version.

V1: 2023-08-21

  • First version