Example: Customer Data Integration
This example wires CRM customer records end to end: define the target dataset, read from Postgres over JDBC, normalize the columns, clean the values, stream changes to Kafka, and trigger the load.
1. Define Dataset
The target dataset declares the canonical customer schema. The id and email columns are mandatory and flagged as lookup columns, so they can be used to match existing records.
JSON
{
  "type": "dataset",
  "id": "customer_dataset",
  "columns": [
    {"key": "id", "type": "TOKEN", "token": "TEXT", "mandatory": true, "lookup": true},
    {"key": "email", "type": "TOKEN", "token": "EMAIL", "mandatory": true, "lookup": true},
    {"key": "full_name", "type": "TOKEN", "token": "TEXT", "mandatory": true},
    {"key": "phone", "type": "TOKEN", "token": "PHONE"},
    {"key": "status", "type": "TOKEN", "token": "TEXT"}
  ]
}
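Steps 2 and 3 also reference a second dataset, raw_customer_dataset, which holds the source-shaped columns before transformation. Its definition is not shown in this section; the sketch below is an assumption, with the column list inferred from the mappings in step 3.
JSON
{
  "type": "dataset",
  "id": "raw_customer_dataset",
  "columns": [
    {"key": "cust_id", "type": "TOKEN", "token": "TEXT", "mandatory": true},
    {"key": "fname", "type": "TOKEN", "token": "TEXT"},
    {"key": "lname", "type": "TOKEN", "token": "TEXT"},
    {"key": "email", "type": "TOKEN", "token": "EMAIL"}
  ]
}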
2. Create Source
The JDBC source pulls the customers table from the CRM's Postgres database into raw_customer_dataset. Timestamp filtering on updated_at limits each run to rows changed since the previous load.
JSON
{
  "type": "source-jdbc",
  "id": "crm_source",
  "dataset": "raw_customer_dataset",
  "url": "jdbc:postgresql://crm:5432/sales",
  "credentials": "crm_db_creds",
  "table": "customers",
  "timestampFilteringFlag": true,
  "timestampFilteringColumn": "updated_at"
}
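The source references stored credentials by id (crm_db_creds) rather than embedding a username and password. The credentials schema does not appear in this section, so the object below is purely illustrative; the type and field names are assumptions.
JSON
{
  "type": "credentials",
  "id": "crm_db_creds",
  "username": "etl_reader",
  "password": "<secret>"
}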
3. Create Transformation
The transformation reshapes raw rows into the target schema: cust_id becomes id, fname and lname are concatenated with a space into full_name, email passes through unchanged, and status is filled with the constant ACTIVE.
JSON
{
  "type": "transformation",
  "id": "normalize_crm",
  "source": "raw_customer_dataset",
  "target": "customer_dataset",
  "mappings": [
    {"mappingType": "COLUMN", "source": ["cust_id"], "target": ["id"]},
    {"mappingType": "CONCATENATE", "source": ["fname", "lname"], "target": ["full_name"], "delimiter": " "},
    {"mappingType": "COLUMN", "source": ["email"], "target": ["email"]},
    {"mappingType": "CONSTANT", "constantValue": "ACTIVE", "target": ["status"]}
  ]
}
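To make the mappings concrete, here is a hypothetical raw row and the target row it would produce. Note the email is still messy at this point; cleaning happens in step 4.
JSON
{
  "input": {"cust_id": "C-1001", "fname": "Ada", "lname": "Lovelace", "email": " ADA@Example.com "},
  "output": {"id": "C-1001", "full_name": "Ada Lovelace", "email": " ADA@Example.com ", "status": "ACTIVE"}
}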
4. Create Pipeline
The pipeline attaches a CLEANER processor to customer_dataset: trim strips surrounding whitespace from values, and lowercase normalizes the email column.
JSON
{
  "type": "pipeline",
  "id": "customer_cleaning",
  "dataset": "customer_dataset",
  "processors": [
    {
      "processorType": "CLEANER",
      "properties": [
        {"key": "trim", "value": "true"},
        {"key": "lowercase", "value": "email"}
      ]
    }
  ]
}
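Continuing the hypothetical row from step 3, the cleaner would trim and lowercase the email:
JSON
{
  "before": {"email": " ADA@Example.com "},
  "after": {"email": "ada@example.com"}
}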
5. Create Sink
The Kafka sink publishes changes to customer_dataset onto the customer-changes topic through the broker at kafka:9092.
JSON
{
  "type": "sink-kafka",
  "id": "customer_events",
  "dataset": "customer_dataset",
  "topicName": "customer-changes",
  "bootstrapServers": "kafka:9092"
}
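The exact event envelope is platform-specific and not shown here. Assuming the sink emits the cleaned record as the message value, a published event might look like:
JSON
{
  "topic": "customer-changes",
  "value": {
    "id": "C-1001",
    "email": "ada@example.com",
    "full_name": "Ada Lovelace",
    "status": "ACTIVE"
  }
}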
6. Execute ETL
Finally, trigger the load by POSTing to /tables/load. The request ties the pieces together: read through crm_source, reshape with normalize_crm, clean with customer_cleaning, and UPSERT the result into customer_table. ($BASE_URL is a placeholder for wherever the service is listening.)
BASH
curl -X POST "$BASE_URL/tables/load" \
  -H "Content-Type: application/json" \
  -d '{
    "source": "crm_source",
    "transformation": "normalize_crm",
    "pipeline": "customer_cleaning",
    "sinkTable": "customer_table",
    "operation": "UPSERT"
  }'
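Note that the request writes to customer_table, while the Kafka sink from step 5 is attached to the dataset and presumably publishes changes independently of this call. Since id and email are marked lookup in step 1, a plausible reading is that UPSERT matches existing rows on those columns; confirm the match-key semantics against the platform's documentation.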