Skip to main content
Skip table of contents

Example: Customer Data Integration

1. Define Dataset

JSON
{
  "type": "dataset",
  "id": "customer_dataset",
  "columns": [
    {"key": "id", "type": "TOKEN", "token": "TEXT", "mandatory": true, "lookup": true},
    {"key": "email", "type": "TOKEN", "token": "EMAIL", "mandatory": true, "lookup": true},
    {"key": "full_name", "type": "TOKEN", "token": "TEXT", "mandatory": true},
    {"key": "phone", "type": "TOKEN", "token": "PHONE"},
    {"key": "status", "type": "TOKEN", "token": "TEXT"}
  ]
}

2. Create Source

JSON
{
  "type": "source-jdbc",
  "id": "crm_source",
  "dataset": "raw_customer_dataset",
  "url": "jdbc:postgresql://crm:5432/sales",
  "credentials": "crm_db_creds",
  "table": "customers",
  "timestampFilteringFlag": true,
  "timestampFilteringColumn": "updated_at"
}

3. Create Transformation

JSON
{
  "type": "transformation",
  "id": "normalize_crm",
  "source": "raw_customer_dataset",
  "target": "customer_dataset",
  "mappings": [
    {"mappingType": "COLUMN", "source": ["cust_id"], "target": ["id"]},
    {"mappingType": "CONCATENATE", "source": ["fname", "lname"],
     "target": ["full_name"], "delimiter": " "},
    {"mappingType": "COLUMN", "source": ["email"], "target": ["email"]},
    {"mappingType": "CONSTANT", "constantValue": "ACTIVE", "target": ["status"]}
  ]
}

4. Create Pipeline

JSON
{
  "type": "pipeline",
  "id": "customer_cleaning",
  "dataset": "customer_dataset",
  "processors": [
    {
      "processorType": "CLEANER",
      "properties": [
        {"key": "trim", "value": "true"},
        {"key": "lowercase", "value": "email"}
      ]
    }
  ]
}

5. Create Sink

JSON
{
  "type": "sink-kafka",
  "id": "customer_events",
  "dataset": "customer_dataset",
  "topicName": "customer-changes",
  "bootstrapServers": "kafka:9092"
}

6. Execute ETL

HTTP
POST /tables/load
{
  "source": "crm_source",
  "transformation": "normalize_crm",
  "pipeline": "customer_cleaning",
  "sinkTable": "customer_table",
  "operation": "UPSERT"
}
JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.