How to transform data using SQL
This guide will teach you how to write and publish a simple SQL derivation that you can use to transform data from one collection to another.
Introduction
This tutorial will show you how to implement a stateless transformation using SQL. You’ll learn how to implement a flow that transforms events coming from the live, real-time Wikipedia API.
Setting up your development environment
To implement transformations through derivations, you’ll need to set up your development environment. You’ll need a text editor and flowctl, the CLI tool for Flow, installed on your machine. Check out the docs page for installation instructions.
Before continuing, sign in to the Estuary Flow dashboard and make sure you enable access to the Wikipedia demo. Using flowctl, quickly verify that you are able to view the demo collections used in this guide.
Execute the command below to display the documents in the demo/wikipedia/recentchange-sampled collection:
flowctl collections read --collection demo/wikipedia/recentchange-sampled --uncommitted
This collection is a 3% sample of the enormous demo/wikipedia/recentchange collection, which contains millions of documents. Since the purpose of this tutorial is to demonstrate a proof of concept, we avoid publishing a derivation that processes hundreds of gigabytes of data.
If you see a stream of JSON documents on your terminal, you’re all good. Feel free to cancel the process by pressing Ctrl+C.
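To get a feel for the stream before writing any SQL, the documents printed by the command above can be tallied by event type. The following is a minimal sketch and not part of flowctl: the count_event_types helper is hypothetical, the example lines are invented, and the only assumption about the real data is that each line of output is one JSON document with a type field, as in the sample in this guide.

```python
import json
from collections import Counter


def count_event_types(lines):
    """Count documents per value of their "type" field (hypothetical helper)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between documents
        doc = json.loads(line)
        counts[doc.get("type", "<missing>")] += 1
    return counts


# Invented, heavily trimmed documents standing in for real flowctl output.
example_lines = [
    '{"type": "edit", "user": "alice"}',
    '{"type": "categorize", "user": "bob"}',
    '{"type": "edit", "user": "carol"}',
]

for event_type, n in count_event_types(example_lines).most_common():
    print(event_type, n)
```

Any iterable of lines works as input, so passing sys.stdin instead of the example list would let the function consume piped flowctl output. Because the stream does not end on its own, limit it first, for example with head.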
Examine a sample JSON document from the demo collection, as this is the data you’ll be using as the input for your derivation.
{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "_meta": {
    "file": "recentchange",
    "offset": 12837,
    "uuid": "f8f07d87-f5bf-11ee-8401-4fdf95f7b91a"
  },
  "bot": false,
  "comment": "[[:File:Jeton. Ordinaire des guerres - btv1b10405460g (1 of 2).jpg]] added to category",
  "id": 2468434138,
  "meta": {
    "domain": "commons.wikimedia.org",
    "dt": "2024-04-08T15:52:13Z",
    "id": "d9e8698f-4eac-4262-a451-b7ca247e401c",
    "offset": 5008568732,
    "partition": 0,
    "request_id": "b5372124-63fa-45e1-b35e-86784f1692bc",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "uri": "https://commons.wikimedia.org/wiki/Category:Jetons"
  },
  "namespace": 14,
  "notify_url": "https://commons.wikimedia.org/w/index.php?diff=866807860&oldid=861559382&rcid=2468434138",
  "parsedcomment": "<a href=\"/wiki/File:Jeton._Ordinaire_des_guerres_-_btv1b10405460g_(1_of_2).jpg\" title=\"File:Jeton. Ordinaire des guerres - btv1b10405460g (1 of 2).jpg\">File:Jeton. Ordinaire des guerres - btv1b10405460g (1 of 2).jpg</a> added to category",
  "server_name": "commons.wikimedia.org",
  "server_script_path": "/w",
  "server_url": "https://commons.wikimedia.org",
  "timestamp": 1712591533,
  "title": "Category:Jetons",
  "title_url": "https://commons.wikimedia.org/wiki/Category:Jetons",
  "type": "categorize",
  "user": "DenghiùComm",
  "wiki": "commonswiki"
}
The transformation in this tutorial will make use of the length, bot and user_id (taken from the source document's user field) fields to calculate how many lines a given non-bot user has modified on a day.
{
  ...
  "user_id": "User",
  "bot": 0,
  "length": 1253,
  ...
}
Writing the derivation
Set up your folder structure so you can organize the resources required for the derivation. Create a working directory to follow along, and inside, create a flow.yaml file.
Inside your flow.yaml file, add the following contents:
---
collections:
  Dani/derivation-tutorial/edits-by-users:
    schema:
      type: object
      properties:
        user_id:
          type: string
        date:
          format: date
          type: string
        total_edits:
          reduce:
            strategy: sum
          type: number
        total_new_lines:
          reduce:
            strategy: sum
          type: number
      reduce:
        strategy: merge
      required:
        - date
        - user_id
    key:
      - /date
      - /user_id
    derive:
      using:
        sqlite: {}
      transforms:
        - name: edits_by_users
          source: demo/wikipedia/recentchange-sampled
          shuffle: any
          lambda: |
            select
              $user as user_id,
              substr($meta$dt,1,10) as date,
              1 as total_edits,
              coalesce($length$new - $length$old, 0) as total_new_lines
            where $type = 'edit' and $user is not null and $bot = 0;
The Flow consists of just one collection, which is what you define here, called edits-by-users.
Let’s go over this in a bit more detail.
First of all, the collection needs a schema. The schema of the incoming data (also called the “write” schema) is already defined by the demo, so you only have to define the schema of the documents the transformation will output, which is the “read” schema.
In the flow.yaml file, the schema is defined in-line with the rest of the configuration.
schema:
  type: object
  properties:
    user_id:
      type: string
    date:
      format: date
      type: string
    total_edits:
      reduce:
        strategy: sum
      type: number
    total_new_lines:
      reduce:
        strategy: sum
      type: number
  reduce:
    strategy: merge
  required:
    - date
    - user_id
As you can see, this schema includes fewer fields than are available in the incoming documents. This is expected, but if you wish to include more, this is where you would add them first.
The user_id and date fields carry no reduction annotations of their own, but the other two have their reduction strategy set to sum. This strategy reduces two numbers or integers by adding their values.
To learn more about how reduction strategies work, check out the documentation page.
Moving on, the next section in the YAML file defines the key of the documents.
key:
  - /date
  - /user_id
Every Flow collection must declare a key which is used to group its documents. Keys are specified as an array of JSON pointers to document locations. The important detail here is to know that a collection key instructs Flow how documents of a collection are to be reduced, such as while being materialized to an endpoint.
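The interplay between the key and the sum strategy can be sketched in a few lines of Python. This is an illustration only, not Flow's implementation: the reduce_documents helper is hypothetical, and the user names and numbers are invented for the example. Documents sharing the same (date, user_id) key collapse into one document whose counters are added together.

```python
def reduce_documents(docs):
    """Combine documents that share the (date, user_id) key.

    Illustrative only: mimics the `merge` strategy on the object and the
    `sum` strategy on the two counters. This is not Flow's implementation.
    """
    combined = {}
    for doc in docs:
        key = (doc["date"], doc["user_id"])  # mirrors key: [/date, /user_id]
        if key not in combined:
            combined[key] = dict(doc)  # first document seen for this key
            continue
        current = combined[key]
        # `sum` strategy: add the numeric values together.
        current["total_edits"] += doc["total_edits"]
        current["total_new_lines"] += doc["total_new_lines"]
    return list(combined.values())


# Hypothetical per-event documents (values invented for the example).
events = [
    {"date": "2024-04-08", "user_id": "alice", "total_edits": 1, "total_new_lines": 40},
    {"date": "2024-04-08", "user_id": "alice", "total_edits": 1, "total_new_lines": -10},
    {"date": "2024-04-08", "user_id": "bob", "total_edits": 1, "total_new_lines": 5},
]

for doc in reduce_documents(events):
    print(doc)
```

The two documents for alice end up as a single document with total_edits of 2 and total_new_lines of 30, while the single document for bob passes through unchanged.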
The final section is where you specify that this collection is derived from another collection.
derive:
  using:
    sqlite: {}
  transforms:
    - name: edits_by_users
      source: demo/wikipedia/recentchange-sampled
      shuffle: any
      lambda: |
        select
          $user as user_id,
          substr($meta$dt,1,10) as date,
          1 as total_edits,
          coalesce($length$new - $length$old, 0) as total_new_lines
        where $type = 'edit' and $user is not null and $bot = 0;
Here you define the SQL statement that gets executed on the documents of the source collection.
The source: demo/wikipedia/recentchange-sampled property lets Flow know that the source collection is the demo collection mentioned at the beginning of the tutorial, while shuffle tells Flow how to colocate documents while processing. In this case it is set to any, meaning source documents can be processed by any available compute.
The SQL is straightforward:
select
  $user as user_id,
  substr($meta$dt,1,10) as date,
  1 as total_edits,
  coalesce($length$new - $length$old, 0) as total_new_lines
where $type = 'edit' and $user is not null and $bot = 0
We select the user_id, take the event date from the first ten characters of the meta.dt timestamp, and calculate the number of changed lines. We also select 1 for the value of total_edits. This is important because during the reduction phase, due to having selected sum as the strategy, these values will get added together to form the total number of edits in the result. We also filter out non-edit events, bot users, and events without a user_id to have a somewhat clean dataset.
Verify
You can use flowctl
to quickly verify your derivation before publishing it. Use the preview
command to get an idea of the resulting collections.
flowctl preview --source flow.yaml --name Dani/derivation-tutorial/edits-by-users
{"date":"2024-04-08","total_edits":3,"total_new_lines":110,"user_id":"Renamerr"}
{"date":"2024-04-08","total_edits":1,"total_new_lines":769,"user_id":"Sebring12Hrs"}
{"date":"2024-04-08","total_edits":5,"total_new_lines":3360,"user_id":"Sic19"}
{"date":"2024-04-08","total_edits":1,"total_new_lines":82,"user_id":"Simeon"}
^C
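The shape of these lines can also be checked mechanically. The sketch below is a hypothetical helper, not a flowctl feature, and it hand-codes only a few rules from the schema in this guide (required fields, the date format, numeric totals) instead of performing full JSON Schema validation.

```python
import json
from datetime import date

REQUIRED = ("date", "user_id")


def check_output_line(line):
    """Return a list of problems found in one line of preview output."""
    doc = json.loads(line)
    problems = [f"missing {name}" for name in REQUIRED if name not in doc]
    if "date" in doc:
        try:
            date.fromisoformat(doc["date"])  # schema says format: date
        except (TypeError, ValueError):
            problems.append("date is not an ISO date")
    for name in ("total_edits", "total_new_lines"):
        value = doc.get(name)
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if value is not None and not is_number:
            problems.append(f"{name} is not a number")
    if "user_id" in doc and not isinstance(doc["user_id"], str):
        problems.append("user_id is not a string")
    return problems


# Two of the preview lines shown above.
sample = [
    '{"date":"2024-04-08","total_edits":3,"total_new_lines":110,"user_id":"Renamerr"}',
    '{"date":"2024-04-08","total_edits":1,"total_new_lines":769,"user_id":"Sebring12Hrs"}',
]
for line in sample:
    print(check_output_line(line) or "ok")
```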
As you can see, the output format matches the defined schema. The last step would be to publish your derivation to Flow, which you can also do using flowctl.
Publishing the derivation will initialize the transformation on the live, real-time Wikipedia stream, so make sure to delete it after completing the tutorial.
flowctl catalog publish --source flow.yaml
After successfully publishing your derivation, head over to the Collections page on the Web UI and you will be able to see your derivation in action!
Wrapping up
In this guide you learned how to write your first stateless SQL derivation to filter data in a collection.