# Example: Customer Data Integration

This example walks you through setting up a complete data integration pipeline to load customer data from a CRM system into Golden for deduplication.
## Overview

| Item | Value |
|---|---|
| Scenario | Load customer data from a PostgreSQL CRM database |
| Source | CRM system with a `customers` table |
| Goal | Clean, normalize, and prepare data for deduplication |
| Skill Level | Intermediate |
| Time Required | 30-45 minutes |
## What You'll Create

CRM Database → Source → Transformation → Pipeline → Table/Sink

| Component | Purpose |
|---|---|
| Dataset | Defines the structure of customer data |
| Source | Connects to the CRM database |
| Transformation | Maps and normalizes fields |
| Pipeline | Cleans and standardizes data |
| Sink | Publishes changes to downstream systems |
## Prerequisites

Before starting, ensure you have:

- Access to Golden with the STEWARD or ADMIN role
- Database credentials for the CRM system
- Network connectivity between Golden and the CRM database
## Step 1: Define the Dataset

The dataset defines the structure of your customer data. This schema will be used throughout the pipeline.
### What Each Field Means

| Field | Type | Purpose |
|---|---|---|
| `id` | TOKEN/TEXT | Unique customer identifier |
| `email` | TOKEN/EMAIL | Primary contact, used for matching |
| `full_name` | TOKEN/TEXT | Customer's display name |
| `phone` | TOKEN/PHONE | Secondary contact |
| `status` | TOKEN/TEXT | Customer status (active, inactive) |
### Configuration

```json
{
  "type": "dataset",
  "id": "customer_dataset",
  "columns": [
    {
      "key": "id",
      "type": "TOKEN",
      "token": "TEXT",
      "mandatory": true,
      "lookup": true
    },
    {
      "key": "email",
      "type": "TOKEN",
      "token": "EMAIL",
      "mandatory": true,
      "lookup": true
    },
    {
      "key": "full_name",
      "type": "TOKEN",
      "token": "TEXT",
      "mandatory": true
    },
    {
      "key": "phone",
      "type": "TOKEN",
      "token": "PHONE"
    },
    {
      "key": "status",
      "type": "TOKEN",
      "token": "TEXT"
    }
  ]
}
```
### Key Options Explained

| Option | Description |
|---|---|
| `mandatory: true` | Field must have a value |
| `lookup: true` | Field can be used for searching records |
| `token: EMAIL` | Applies email-specific normalization |
| `token: PHONE` | Applies phone number normalization |
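The `mandatory` and `lookup` options can be sketched with a small helper. The `COLUMNS` list below mirrors the `customer_dataset` configuration; `missing_mandatory` and `lookup_keys` are illustrative helpers, not part of the Golden API:

```python
# The column list mirrors the customer_dataset configuration above.
COLUMNS = [
    {"key": "id", "token": "TEXT", "mandatory": True, "lookup": True},
    {"key": "email", "token": "EMAIL", "mandatory": True, "lookup": True},
    {"key": "full_name", "token": "TEXT", "mandatory": True},
    {"key": "phone", "token": "PHONE"},
    {"key": "status", "token": "TEXT"},
]

def missing_mandatory(record: dict) -> list[str]:
    """Return keys of mandatory columns that are absent or empty."""
    return [
        c["key"]
        for c in COLUMNS
        if c.get("mandatory") and not record.get(c["key"])
    ]

def lookup_keys() -> list[str]:
    """Columns flagged `lookup` can be used to search for records."""
    return [c["key"] for c in COLUMNS if c.get("lookup")]

record = {"id": "C-1001", "email": "jane@example.com"}
print(missing_mandatory(record))  # ['full_name']
print(lookup_keys())              # ['id', 'email']
```

A record missing `full_name` would be rejected under this schema, while `id` and `email` are the fields available for lookups.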
## Step 2: Create the Source

The source defines how to connect to your CRM database and which data to extract.

### Configuration

```json
{
  "type": "source-jdbc",
  "id": "crm_source",
  "dataset": "raw_customer_dataset",
  "url": "jdbc:postgresql://crm-server:5432/sales",
  "credentials": "crm_db_creds",
  "table": "customers",
  "timestampFilteringFlag": true,
  "timestampFilteringColumn": "updated_at"
}
```
### Configuration Options

| Option | Description | Example |
|---|---|---|
| `url` | JDBC connection string | `jdbc:postgresql://crm-server:5432/sales` |
| `credentials` | Reference to stored credentials | `crm_db_creds` |
| `table` | Source table name | `customers` |
| `timestampFilteringFlag` | Enable incremental loads | `true` |
| `timestampFilteringColumn` | Column to track changes | `updated_at` |
### Why Incremental Loading?

Setting `timestampFilteringFlag: true` enables incremental loading:

- **First run:** Loads all records
- **Subsequent runs:** Only loads records modified since the last run
- **Benefit:** Faster processing, reduced load on the source system
## Step 3: Create the Transformation

Transformations map fields from the source schema to your target dataset and apply data manipulations.

### Configuration

```json
{
  "type": "transformation",
  "id": "normalize_crm",
  "source": "raw_customer_dataset",
  "target": "customer_dataset",
  "mappings": [
    {
      "mappingType": "COLUMN",
      "source": ["cust_id"],
      "target": ["id"]
    },
    {
      "mappingType": "CONCATENATE",
      "source": ["fname", "lname"],
      "target": ["full_name"],
      "delimiter": " "
    },
    {
      "mappingType": "COLUMN",
      "source": ["email"],
      "target": ["email"]
    },
    {
      "mappingType": "COLUMN",
      "source": ["phone_number"],
      "target": ["phone"]
    },
    {
      "mappingType": "CONSTANT",
      "constantValue": "ACTIVE",
      "target": ["status"]
    }
  ]
}
```
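The effect of these mappings on one source row can be sketched in Python. This is illustrative only; Golden applies mappings server-side, and `apply_mappings` is not part of its API:

```python
# Mirrors the mappings list from the transformation configuration above.
MAPPINGS = [
    {"mappingType": "COLUMN", "source": ["cust_id"], "target": ["id"]},
    {"mappingType": "CONCATENATE", "source": ["fname", "lname"],
     "target": ["full_name"], "delimiter": " "},
    {"mappingType": "COLUMN", "source": ["email"], "target": ["email"]},
    {"mappingType": "COLUMN", "source": ["phone_number"], "target": ["phone"]},
    {"mappingType": "CONSTANT", "constantValue": "ACTIVE", "target": ["status"]},
]

def apply_mappings(row: dict) -> dict:
    out = {}
    for m in MAPPINGS:
        target = m["target"][0]
        if m["mappingType"] == "COLUMN":
            out[target] = row.get(m["source"][0])      # direct field mapping
        elif m["mappingType"] == "CONCATENATE":
            out[target] = m["delimiter"].join(          # combine fields
                row[s] for s in m["source"])
        elif m["mappingType"] == "CONSTANT":
            out[target] = m["constantValue"]            # fixed value
    return out

row = {"cust_id": "42", "fname": "Jane", "lname": "Doe",
       "email": "JANE@EXAMPLE.COM", "phone_number": "555-0100"}
print(apply_mappings(row))
```

The result has `full_name` set to `"Jane Doe"` and `status` always set to `"ACTIVE"`, regardless of the source row.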
### Mapping Types Explained

| Type | Purpose | Example |
|---|---|---|
| `COLUMN` | Direct field mapping | `cust_id` → `id` |
| `CONCATENATE` | Combine multiple fields | `fname` + `lname` → `full_name` |
| `CONSTANT` | Set fixed value | Always set `status` to `"ACTIVE"` |
| | Apply formula or logic | Calculate derived values |
### Common Transformation Patterns

| Pattern | Configuration |
|---|---|
| Rename field | `COLUMN` mapping with different source and target keys |
| Combine names | `CONCATENATE` mapping with a `delimiter` |
| Default value | `CONSTANT` mapping with a `constantValue` |
| Conditional | |
## Step 4: Create the Pipeline

Pipelines apply data quality operations like cleaning, standardization, and validation.

### Configuration

```json
{
  "type": "pipeline",
  "id": "customer_cleaning",
  "dataset": "customer_dataset",
  "processors": [
    {
      "processorType": "CLEANER",
      "properties": [
        {"key": "trim", "value": "true"},
        {"key": "lowercase", "value": "email"}
      ]
    },
    {
      "processorType": "VALIDATOR",
      "properties": [
        {"key": "email", "value": "email"},
        {"key": "required", "value": "id,email,full_name"}
      ]
    }
  ]
}
```
### Available Processors

| Processor | Purpose | Common Properties |
|---|---|---|
| `CLEANER` | Remove whitespace, normalize case | `trim`, `lowercase` |
| `VALIDATOR` | Check data quality rules | `email`, `required` |
| | Add derived data | |
| | Remove unwanted records | |
### Cleaning Operations

| Operation | Effect |
|---|---|
| `trim: true` | Remove leading/trailing whitespace from all fields |
| `lowercase: email` | Convert the email field to lowercase |
| `uppercase: country` | Convert a country field to uppercase |
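The two processors configured above behave roughly like the following sketch. The `clean` and `validate` helpers, including the email regex, are illustrative approximations rather than Golden's actual implementation:

```python
import re

def clean(record: dict) -> dict:
    """CLEANER sketch: trim every string field, lowercase the email."""
    cleaned = {k: v.strip() if isinstance(v, str) else v
               for k, v in record.items()}
    if "email" in cleaned:
        cleaned["email"] = cleaned["email"].lower()
    return cleaned

def validate(record: dict) -> list[str]:
    """VALIDATOR sketch: required fields plus a basic email-format check."""
    errors = []
    for key in ("id", "email", "full_name"):  # required: id,email,full_name
        if not record.get(key):
            errors.append(f"missing {key}")
    email = record.get("email", "")
    if email and not re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", email):
        errors.append("invalid email")
    return errors

record = clean({"id": " C-1 ", "email": " Jane@Example.COM ",
                "full_name": "Jane Doe"})
print(record["email"])   # jane@example.com
print(validate(record))  # []
```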
## Step 5: Create the Sink (Optional)

Sinks publish processed data to external systems like Kafka, webhooks, or files.

### Kafka Sink Example

```json
{
  "type": "sink-kafka",
  "id": "customer_events",
  "dataset": "customer_dataset",
  "topicName": "customer-changes",
  "bootstrapServers": "kafka:9092"
}
```
### Available Sink Types

| Type | Use Case |
|---|---|
| Kafka | Stream events to Kafka topics |
| Webhook | POST to HTTP endpoints |
| JDBC | Write to database tables |
| File | Export to CSV/JSON files |
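A change event published to the `customer-changes` topic might look like the following sketch. The envelope shape (`op`, `payload`) and keying by `id` are assumptions for illustration, not a documented Golden message format:

```python
import json

def change_event(record: dict, op: str = "UPSERT") -> tuple[str, str]:
    """Return an assumed (message key, message value) pair for the topic.

    Keying by record id ensures all changes for one customer land in the
    same Kafka partition, preserving per-customer ordering.
    """
    return record["id"], json.dumps({"op": op, "payload": record})

key, value = change_event({"id": "C-1", "email": "jane@example.com"})
print(key)    # C-1
print(value)
```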
## Step 6: Execute the ETL Load

Run the complete pipeline to load and process customer data.

### Using the API

```shell
curl -X POST "${GOLDEN_URL}/api/table/load" \
  -H "Authorization: Bearer ${GOLDEN_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{
    "source": "crm_source",
    "transformation": "normalize_crm",
    "pipeline": "customer_cleaning",
    "sinkTable": "customer_table",
    "operation": "UPSERT"
  }'
```
### Load Operations

| Operation | Behavior |
|---|---|
| `UPSERT` | Insert new records, update existing (recommended) |
| | Insert only, fail on duplicates |
| | Delete all existing records, then insert |
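The three load behaviors can be sketched on an in-memory table keyed by `id`. Only `UPSERT` is named in the example request above, so the other function names below describe behavior rather than Golden operation names:

```python
def upsert(table: dict, records: list[dict]) -> None:
    """Insert new records, update existing ones."""
    for r in records:
        table[r["id"]] = r

def insert_only(table: dict, records: list[dict]) -> None:
    """Insert only; fail on duplicates."""
    for r in records:
        if r["id"] in table:
            raise ValueError(f"duplicate id {r['id']}")
        table[r["id"]] = r

def replace_all(table: dict, records: list[dict]) -> None:
    """Delete all existing records, then insert."""
    table.clear()
    for r in records:
        table[r["id"]] = r

table = {"C-1": {"id": "C-1", "status": "INACTIVE"}}
upsert(table, [{"id": "C-1", "status": "ACTIVE"}, {"id": "C-2"}])
print(sorted(table))           # ['C-1', 'C-2']
print(table["C-1"]["status"])  # ACTIVE
```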
### Expected Response

```json
{
  "taskId": "task-12345",
  "status": "QUEUED",
  "message": "Load task queued for execution"
}
```
## Step 7: Monitor the Task

Track the progress of your data load.

### Check Task Status

```shell
curl -X GET "${GOLDEN_URL}/api/task/task-12345" \
  -H "Authorization: Bearer ${GOLDEN_TOKEN}"
```
### Task States

| State | Meaning |
|---|---|
| `QUEUED` | Waiting to start |
| | Currently processing |
| `COMPLETED` | Finished successfully |
| | Error occurred |
| | Manually stopped |
### Successful Completion

```json
{
  "taskId": "task-12345",
  "status": "COMPLETED",
  "statistics": {
    "recordsRead": 10000,
    "recordsWritten": 9850,
    "recordsSkipped": 150,
    "duration": "00:02:34"
  }
}
```
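A client typically polls the task endpoint until it reaches a terminal state. A minimal sketch with a stubbed status fetch follows; the exact set of terminal state names is an assumption, and a real client would replace the stub with a GET to `/api/task/{taskId}`:

```python
import time

# Assumed terminal states; QUEUED and COMPLETED appear in the example
# responses, the rest are placeholders for the error/cancel states.
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}

def poll(fetch_status, interval: float = 1.0, max_polls: int = 100) -> str:
    """Call fetch_status() until a terminal state or the poll limit."""
    for _ in range(max_polls):
        status = fetch_status()
        if status in TERMINAL:
            return status
        time.sleep(interval)
    raise TimeoutError("task did not finish within the poll limit")

# Stubbed sequence standing in for successive API responses.
responses = iter(["QUEUED", "RUNNING", "COMPLETED"])
print(poll(lambda: next(responses), interval=0))  # COMPLETED
```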
## Next Steps

After loading data, you can:

- **Set up deduplication:** Create an entity to find duplicates
- **Schedule recurring loads:** Configure automatic synchronization
- **Add more sources:** Integrate additional systems
- **Configure sinks:** Publish golden records downstream