How to transform data using TypeScript
This guide will teach you how to write and publish a simple TypeScript derivation.
Introduction
This tutorial will show you how to implement a stateless transformation using TypeScript. You’ll learn how to implement a flow that filters events coming from the live, real-time Wikipedia API.
Setting up your development environment
To implement transformations through derivations, you’ll need to set up your development environment. You’ll need a text editor and flowctl, the CLI tool for Flow, installed on your machine. Check out the docs page for installation instructions.
Before continuing, sign in to the Estuary Flow dashboard and make sure you enable access to the Wikipedia demo. Then use flowctl to quickly verify that you can view the demo collections used in this guide.
Execute the command below to display the documents in the demo/wikipedia/recentchange-sampled collection:
flowctl collections read --collection demo/wikipedia/recentchange-sampled --uncommitted
This collection is a 3% sample of the enormous demo/wikipedia/recentchange collection, which contains millions of documents. Since the purpose of this tutorial is to demonstrate a proof of concept, we avoid publishing a derivation that processes hundreds of gigabytes of data.
If you see a stream of JSON documents in your terminal, you’re all set. Feel free to cancel the process by pressing Ctrl+C.
Examine a sample JSON document from the demo collection, as this is the data you’ll be using as the input for your derivation.
{
  "$schema": "/mediawiki/recentchange/1.0.0",
  "_meta": {
    "file": "recentchange",
    "offset": 12837,
    "uuid": "f8f07d87-f5bf-11ee-8401-4fdf95f7b91a"
  },
  "bot": false,
  "comment": "[[:File:Jeton. Ordinaire des guerres - btv1b10405460g (1 of 2).jpg]] added to category",
  "id": 2468434138,
  "meta": {
    "domain": "commons.wikimedia.org",
    "dt": "2024-04-08T15:52:13Z",
    "id": "d9e8698f-4eac-4262-a451-b7ca247e401c",
    "offset": 5008568732,
    "partition": 0,
    "request_id": "b5372124-63fa-45e1-b35e-86784f1692bc",
    "stream": "mediawiki.recentchange",
    "topic": "eqiad.mediawiki.recentchange",
    "uri": "https://commons.wikimedia.org/wiki/Category:Jetons"
  },
  "namespace": 14,
  "notify_url": "https://commons.wikimedia.org/w/index.php?diff=866807860&oldid=861559382&rcid=2468434138",
  "parsedcomment": "<a href=\"/wiki/File:Jeton._Ordinaire_des_guerres_-_btv1b10405460g_(1_of_2).jpg\" title=\"File:Jeton. Ordinaire des guerres - btv1b10405460g (1 of 2).jpg\">File:Jeton. Ordinaire des guerres - btv1b10405460g (1 of 2).jpg</a> added to category",
  "server_name": "commons.wikimedia.org",
  "server_script_path": "/w",
  "server_url": "https://commons.wikimedia.org",
  "timestamp": 1712591533,
  "title": "Category:Jetons",
  "title_url": "https://commons.wikimedia.org/wiki/Category:Jetons",
  "type": "categorize",
  "user": "DenghiùComm",
  "wiki": "commonswiki"
}
There’s a bunch of fields available, but the transformation in this tutorial only looks at one of them, domain, which is nested inside the meta object.
{
  ...
  "meta": {
    ...
    "domain": "commons.wikimedia.org",
    ...
  },
  ...
}
This field holds the domain that served the event; the organization uses different domains to serve its different sites. This is what you’ll use as the basis of the filter derivation. Let's say that the goal is to keep only events that originate from the English-language Wikipedia, which runs under the domain en.wikipedia.org.
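The filter rule described above boils down to a single predicate. Here is a minimal sketch in plain TypeScript, independent of Flow. The RecentChange type, the isEnglishWikipedia helper, and the sample events (other than the Category:Jetons one shown earlier) are hypothetical and exist only for illustration.

```typescript
// Hypothetical, trimmed-down shape of a source document; only the
// fields needed for this illustration are modeled.
type RecentChange = {
  meta?: { domain?: string };
  title?: string;
};

// Domain of the English-language Wikipedia, as stated in this guide.
const ENGLISH_WIKIPEDIA = "en.wikipedia.org";

// Returns true only for events whose nested meta.domain matches.
function isEnglishWikipedia(doc: RecentChange): boolean {
  return doc.meta?.domain === ENGLISH_WIKIPEDIA;
}

// Made-up sample events for illustration.
const events: RecentChange[] = [
  { meta: { domain: "commons.wikimedia.org" }, title: "Category:Jetons" },
  { meta: { domain: "en.wikipedia.org" }, title: "Example" },
  { title: "No meta object" },
];

const kept = events.filter(isEnglishWikipedia);
console.log(kept.map((e) => e.title));
```

Documents without a meta object or a domain simply fail the check, so they are dropped rather than causing an error.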
Writing the derivation
Set up your folder structure so you can organize the resources required for the derivation. Create a working directory to follow along, and inside it, create a flow.yaml file.
Inside your flow.yaml file, add the following contents:
---
collections:
  Dani/derivation-tutorial/recentchange-filtered-typescript:
    schema: recentchange-filtered.schema.yaml
    key:
      - /_meta/file
      - /_meta/offset
    derive:
      using:
        typescript:
          module: recentchange-filtered.ts
      transforms:
        - name: filter_values_typescript
          source: demo/wikipedia/recentchange-sampled
          shuffle: any
The Flow consists of just one collection, which is what you define here, called Dani/derivation-tutorial/recentchange-filtered-typescript.
Let’s go over this in a bit more detail.
First of all, the collection needs a schema. The schema of the incoming data (also called the “write” schema) is already defined by the demo, so you only have to define the schema of the documents the transformation will output, which is the “read” schema.
Let’s define what the final documents will look like.
---
$schema: "http://json-schema.org/draft-07/schema#"
properties:
  _meta:
    properties:
      file:
        type: string
      offset:
        type: integer
      uuid:
        type: string
    required:
      - file
      - offset
    type: object
  domain:
    type: string
  title:
    type: string
  user:
    type: string
type: object
Save this schema as recentchange-filtered.schema.yaml next to your flow.yaml file.
As you can see, this schema definition includes far fewer fields than the incoming documents contain. This is expected; if you wish to include more fields in the output, this is where you would add them first.
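To make the constraints in the schema concrete, here is a hand-written check that mirrors them in plain TypeScript. This is only a sketch for illustration and is not how Flow validates documents; the conformsToSchema helper is a hypothetical name.

```typescript
// Hand-written check mirroring recentchange-filtered.schema.yaml.
// Hypothetical helper for illustration only.
function conformsToSchema(doc: unknown): boolean {
  // The top level must be an object.
  if (typeof doc !== "object" || doc === null || Array.isArray(doc)) return false;
  const d = doc as Record<string, unknown>;

  // domain, title, and user are optional, but must be strings when present.
  for (const field of ["domain", "title", "user"]) {
    if (field in d && typeof d[field] !== "string") return false;
  }

  // _meta is optional, but when present it must be an object that
  // carries the required file and offset properties.
  if ("_meta" in d) {
    const meta = d._meta;
    if (typeof meta !== "object" || meta === null || Array.isArray(meta)) return false;
    const m = meta as Record<string, unknown>;
    if (typeof m.file !== "string") return false;
    if (typeof m.offset !== "number" || !Number.isInteger(m.offset)) return false;
    if ("uuid" in m && typeof m.uuid !== "string") return false;
  }
  return true;
}

console.log(
  conformsToSchema({
    _meta: { file: "recentchange", offset: 12837 },
    domain: "en.wikipedia.org",
  }),
);
console.log(conformsToSchema({ _meta: { file: "recentchange" } }));
```

Note that the schema does not forbid extra properties, so this check ignores fields it does not know about.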
In the collection YAML definition, the next section defines the key of the documents.
key:
  - /_meta/file
  - /_meta/offset
Every Flow collection must declare a key, which is used to group its documents. Keys are specified as an array of JSON pointers to document locations. The important detail is that a collection key tells Flow how the documents of a collection are to be reduced, for example while being materialized to an endpoint. For this tutorial, you are just going to reuse the key definition of the base collection.
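To illustrate how an array of JSON pointers identifies a document's key, here is a small sketch in plain TypeScript. It is not Flow's implementation; resolvePointer and extractKey are hypothetical helpers, and the sample documents are made up apart from the file and offset values taken from the example above.

```typescript
// Resolve a JSON pointer such as "/_meta/offset" against a document.
// This sketch ignores the "~0" and "~1" escapes defined by RFC 6901,
// which the pointers in this guide do not use.
function resolvePointer(doc: unknown, pointer: string): unknown {
  let current: unknown = doc;
  for (const token of pointer.split("/").slice(1)) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[token];
  }
  return current;
}

// Build a composite key by resolving each pointer in order.
function extractKey(doc: unknown, pointers: string[]): string {
  return JSON.stringify(pointers.map((p) => resolvePointer(doc, p)));
}

const keyPointers = ["/_meta/file", "/_meta/offset"];

// Made-up sample documents; the first two share the same key.
const docs = [
  { _meta: { file: "recentchange", offset: 12837 }, title: "Category:Jetons" },
  { _meta: { file: "recentchange", offset: 12837 }, title: "Category:Jetons (later version)" },
  { _meta: { file: "recentchange", offset: 12838 }, title: "Another page" },
];

// Group documents by key; documents sharing a key are the ones that
// would be reduced together.
const groups = new Map<string, typeof docs>();
for (const doc of docs) {
  const key = extractKey(doc, keyPointers);
  groups.set(key, [...(groups.get(key) ?? []), doc]);
}
console.log([...groups.keys()]);
```

The first two documents end up in the same group because both pointers resolve to the same values, which is the sense in which the key groups documents.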
The final section is where you specify that this collection is derived from another collection.
derive:
  using:
    typescript:
      module: recentchange-filtered.ts
  transforms:
    - name: filter_values_typescript
      source: demo/wikipedia/recentchange-sampled
      shuffle: any
Here you configure the name of the TypeScript file that will contain the code for the actual transformation (don’t worry about the file not existing yet!) and give a name to the transformation.
The source: demo/wikipedia/recentchange-sampled property lets Flow know that the source collection is the demo collection mentioned at the beginning of the tutorial, while shuffle tells Flow how to colocate documents while processing. In this case it is set to any, meaning source documents can be processed by any available compute.
Alright, the configuration required for the derivation is in place. All that’s left is to write some TypeScript!
The transformation code
The next step is to use flowctl to generate TypeScript stubs you can use as an aid when writing the transformation code.
Execute the following command:
flowctl generate --source flow.yaml
If everything went well, you’ll see a bunch of new files that flowctl generated for you in your working directory.
➜ tree
.
├── deno.json
├── flow.yaml
├── flow_generated
│   └── typescript
│       └── Dani
│           └── derivation-tutorial
│               └── recentchange-filtered-typescript.ts
├── recentchange-filtered.schema.yaml
└── recentchange-filtered.ts

5 directories, 5 files
The flow_generated folder and the deno.json file are two things you won’t have to modify during this tutorial. If you take a look at the file that flowctl generated under flow_generated/typescript/<your_working_directory>/<your_prefix>/recentchange-filtered-typescript.ts, you can see the types you are able to use in your transformations.
// Generated for published documents of derived collection Dani/derivation-tutorial/recentchange-filtered-typescript.
export type Document = {
    "_meta"?: {
        file: string;
        offset: number;
        uuid?: string;
    };
    domain?: string;
    title?: string;