Skip to main content

Google Cloud SQL for PostgreSQL

This connector materializes Flow collections into tables in a Google Cloud SQL for PostgreSQL database.

It is available for use in the Flow web application. For local development or open-source workflows, ghcr.io/estuary/materialize-postgres:dev provides the latest version of the connector as a Docker image. You can also follow the link in your browser to see past image versions.

Prerequisites

To use this connector, you'll need:

  • A Postgres database to which to materialize, and user credentials. The connector will create new tables in the database per your specification. Tables created manually in advance are not supported.
  • At least one Flow collection

Setup

You must configure your database to allow connections from Estuary. There are two ways to do this: by granting direct access to Flow's IP or by creating an SSH tunnel.

Conenecting Directly to Google Cloud SQL

  1. Enable public IP on your database and add 34.121.207.128 as an authorized IP address.

Connect With SSH Tunneling

To allow SSH tunneling to a database instance hosted on Google Cloud, you must set up a virtual machine (VM).

  1. Begin by finding your public SSH key on your local machine. In the .ssh subdirectory of your user home directory, look for the PEM file that contains the private SSH key. Check that it starts with -----BEGIN RSA PRIVATE KEY-----, which indicates it is an RSA-based file.

    • If no such file exists, generate one using the command:
       ssh-keygen -m PEM -t rsa
    • If a PEM file exists, but starts with -----BEGIN OPENSSH PRIVATE KEY-----, convert it with the command:
       ssh-keygen -p -N "" -m pem -f /path/to/key
    • If your Google login differs from your local username, generate a key that includes your Google email address as a comment:
       ssh-keygen -m PEM -t rsa -C user@domain.com
  2. Create and start a new VM in GCP, choosing an image that supports OS Login.

  3. Add your public key to the VM.

  4. Reserve an external IP address and connect it to the VM during setup. Note the generated address.

Configuration Tip

To configure the connector, you must specify the database address in the format host:port. (You can also supply host only; the connector will use the port 5432 by default, which is correct in many cases.) You can find the host and port in the following location:

  • Host as Private IP Address; port is always 5432. You may need to configure private IP on your database.

Configuration

To use this connector, begin with data in one or more Flow collections. Use the below properties to configure a Postgres materialization, which will direct one or more of your Flow collections to your desired tables, or views, in the database.

Properties

Endpoint

PropertyTitleDescriptionTypeRequired/Default
/databaseDatabaseName of the logical database to materialize to.string
/addressAddressHost and port of the database. If only the host is specified, port will default to 5432.stringRequired
/passwordPasswordPassword for the specified database user.stringRequired
/schemaDatabase SchemaDatabase schema to use for materialized tables (unless overridden within the binding resource configuration) as well as associated materialization metadata tablesstring"public"
/userUserDatabase user to connect as.stringRequired
/advancedAdvanced OptionsOptions for advanced users. You should not typically need to modify these.object
/advanced/sslmodeSSL ModeOverrides SSL connection behavior by setting the 'sslmode' parameter.string

Bindings

PropertyTitleDescriptionTypeRequired/Default
/additional_table_create_sqlAdditional Table Create SQLAdditional SQL statement(s) to be run in the same transaction that creates the table.string
/delta_updatesDelta UpdateShould updates to this table be done via delta updates.booleanfalse
/schemaAlternative SchemaAlternative schema for this table (optional). Overrides schema set in endpoint configuration.string
/tableTableTable name to materialize to. It will be created by the connector, unless the connector has previously created it.stringRequired

Sample

materializations:
${PREFIX}/${mat_name}:
endpoint:
connector:
image: ghcr.io/estuary/materialize-postgres:dev
config:
database: flow
address: localhost:5432
password: flow
user: flow
bindings:
- resource:
table: ${TABLE_NAME}
source: ${PREFIX}/${COLLECTION_NAME}

Delta updates

This connector supports both standard (merge) and delta updates. The default is to use standard updates.

Reserved words

PostgreSQL has a list of reserved words that must be quoted in order to be used as an identifier. Flow considers all the reserved words that are marked as "reserved" in any of the columns in the official PostgreSQL documentation.

These reserve words are listed in the table below. Flow automatically quotes fields that are in this list.

Reserved words
abscurrent_transform_group_for_typeindicatorordersqlexception
absolutecurrent_userinitialoutsqlstate
acoscursorinitiallyoutersqlwarning
actioncycleinneroutputsqrt
adddatalinkinoutoverstart
alldateinputoverlapsstatic
allocatedayinsensitiveoverlaystddev_pop
alterdeallocateinsertpadstddev_samp
analysedecintparametersubmultiset
analyzedecfloatintegerpartialsubset
anddecimalintersectpartitionsubstring
anydeclareintersectionpatternsubstring_regex
aredefaultintervalpersucceeds
arraydeferrableintopercentsum
array_aggdeferredispercentile_contsymmetric
array_max_cardinalitydefineisnullpercentile_discsystem
asdeleteisolationpercent_ranksystem_time
ascdense_rankjoinperiodsystem_user
asensitivederefjson_arraypermutetable
asindescjson_arrayaggplacingtablesample
assertiondescribejson_existsportiontan
asymmetricdescriptorjson_objectpositiontanh
atdeterministicjson_objectaggposition_regextemporary
atandiagnosticsjson_querypowerthen
atomicdisconnectjson_tableprecedestime
authorizationdistinctjson_table_primitiveprecisiontimestamp
avgdlnewcopyjson_valuepreparetimezone_hour
begindlpreviouscopykeypreservetimezone_minute
begin_framedlurlcompletelagprimaryto
begin_partitiondlurlcompleteonlylanguagepriortrailing
betweendlurlcompletewritelargeprivilegestransaction
bigintdlurlpathlastproceduretranslate
binarydlurlpathonlylast_valueptftranslate_regex
bitdlurlpathwritelateralpublictranslation
bit_lengthdlurlschemeleadrangetreat
blobdlurlserverleadingranktrigger
booleandlvalueleftreadtrim
bothdolevelreadstrim_array
bydomainlikerealtrue
calldoublelike_regexrecursivetruncate
calleddroplimitrefuescape
cardinalitydynamiclistaggreferencesunion
cascadeeachlnreferencingunique
cascadedelementlocalregr_avgxunknown
caseelselocaltimeregr_avgyunmatched
castemptylocaltimestampregr_countunnest
catalogendlogregr_interceptupdate
ceilend-execlog10regr_r2upper
ceilingend_framelowerregr_slopeusage
charend_partitionmatchregr_sxxuser
characterequalsmatchesregr_sxyusing
character_lengthescapematch_numberregr_syyvalue
char_lengtheverymatch_recognizerelativevalues
checkexceptmaxreleasevalue_of
classifierexceptionmeasuresrestrictvarbinary
clobexecmemberresultvarchar
closeexecutemergereturnvariadic
coalesceexistsmethodreturningvarying
collateexpminreturnsvar_pop
collationexternalminuterevokevar_samp
collectextractmodrightverbose
columnfalsemodifiesrollbackversioning
commitfetchmodulerollupview
concurrentlyfiltermonthrowwhen
conditionfirstmultisetrowswhenever
connectfirst_valuenamesrow_numberwhere
connectionfloatnationalrunningwidth_bucket
constraintfloornaturalsavepointwindow
constraintsforncharschemawith
containsforeignnclobscopewithin
continuefoundnewscrollwithout
convertframe_rownextsearchwork
copyfreenosecondwrite
corrfreezenonesectionxml
correspondingfromnormalizeseekxmlagg
cosfullnotselectxmlattributes
coshfunctionnotnullsensitivexmlbinary
countfusionnth_valuesessionxmlcast
covar_popgetntilesession_userxmlcomment
covar_sampglobalnullsetxmlconcat
creategonullifshowxmldocument
crossgotonumericsimilarxmlelement
cubegrantoccurrences_regexsinxmlexists
cume_distgroupoctet_lengthsinhxmlforest
currentgroupingofsizexmliterate
current_cataloggroupsoffsetskipxmlnamespaces
current_datehavingoldsmallintxmlparse
current_default_transform_groupholdomitsomexmlpi
current_pathhouronspacexmlquery
current_roleidentityonespecificxmlserialize
current_rowilikeonlyspecifictypexmltable
current_schemaimmediateopensqlxmltext
current_timeimportoptionsqlcodexmlvalidate
current_timestampinorsqlerroryear

Changelog

The changelog includes a list of breaking changes made to this connector. Backwards-compatible changes are not listed.

Proceed with caution when editing materializations created with previous versions of this connector; editing always upgrades your materialization to the latest connector version.

V4: 2022-11-30

This version includes breaking changes to materialized table columns. These provide more consistent column names and types, but tables created from previous versions of the connector may not be compatible with this version.

  • Capitalization is now preserved when fields in Flow are converted to Postgres column names. Previously, fields containing uppercase letters were converted to lowercase.

  • Field names and values of types date, duration, ipv4, ipv6, macaddr, macaddr8, and time are now converted into their corresponding Postgres types. Previously, only date-time was converted, and all others were materialized as strings.