
Example: Customer Data Integration

This example walks you through setting up a complete data integration pipeline to load customer data from a CRM system into Golden for deduplication.

Overview

| Item | Value |
|---|---|
| Scenario | Load customer data from a PostgreSQL CRM database |
| Source | CRM system with a customers table |
| Goal | Clean, normalize, and prepare data for deduplication |
| Skill Level | Intermediate |
| Time Required | 30-45 minutes |


What You'll Create

CODE
CRM Database → Source → Transformation → Pipeline → Table/Sink

| Component | Purpose |
|---|---|
| Dataset | Defines the structure of customer data |
| Source | Connects to the CRM database |
| Transformation | Maps and normalizes fields |
| Pipeline | Cleans and standardizes data |
| Sink | Publishes changes to downstream systems |


Prerequisites

Before starting, ensure you have:

  • Access to Golden with STEWARD or ADMIN role

  • Database credentials for the CRM system

  • Network connectivity between Golden and the CRM database
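
The API examples later in this guide assume two environment variables, GOLDEN_URL and GOLDEN_TOKEN. As a small setup sketch (the actual values are specific to your deployment):

BASH
# Point the API examples at your Golden instance and API token
export GOLDEN_URL="https://golden.example.com"   # replace with your Golden base URL
export GOLDEN_TOKEN="<your API token>"           # replace with a valid bearer token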


Step 1: Define the Dataset

The dataset defines the structure of your customer data. This schema will be used throughout the pipeline.

What Each Field Means

| Field | Type | Purpose |
|---|---|---|
| id | TOKEN/TEXT | Unique customer identifier |
| email | TOKEN/EMAIL | Primary contact, used for matching |
| full_name | TOKEN/TEXT | Customer's display name |
| phone | TOKEN/PHONE | Secondary contact |
| status | TOKEN/TEXT | Customer status (active, inactive) |

Configuration

JSON
{
  "type": "dataset",
  "id": "customer_dataset",
  "columns": [
    {
      "key": "id",
      "type": "TOKEN",
      "token": "TEXT",
      "mandatory": true,
      "lookup": true
    },
    {
      "key": "email",
      "type": "TOKEN",
      "token": "EMAIL",
      "mandatory": true,
      "lookup": true
    },
    {
      "key": "full_name",
      "type": "TOKEN",
      "token": "TEXT",
      "mandatory": true
    },
    {
      "key": "phone",
      "type": "TOKEN",
      "token": "PHONE"
    },
    {
      "key": "status",
      "type": "TOKEN",
      "token": "TEXT"
    }
  ]
}

Key Options Explained

| Option | Description |
|---|---|
| mandatory: true | Field must have a value |
| lookup: true | Field can be used for searching records |
| token: EMAIL | Applies email-specific normalization |
| token: PHONE | Applies phone number normalization |


Step 2: Create the Source

The source defines how to connect to your CRM database and which data to extract.

Configuration

JSON
{
  "type": "source-jdbc",
  "id": "crm_source",
  "dataset": "raw_customer_dataset",
  "url": "jdbc:postgresql://crm-server:5432/sales",
  "credentials": "crm_db_creds",
  "table": "customers",
  "timestampFilteringFlag": true,
  "timestampFilteringColumn": "updated_at"
}
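
Note that this source writes into raw_customer_dataset, a dataset describing the raw CRM columns before transformation. Its definition is not shown in this guide; as a minimal sketch, assuming only the column names used by the transformation in Step 3 and the same column structure as Step 1, it might look like:

JSON
{
  "type": "dataset",
  "id": "raw_customer_dataset",
  "columns": [
    {"key": "cust_id", "type": "TOKEN", "token": "TEXT", "mandatory": true},
    {"key": "fname", "type": "TOKEN", "token": "TEXT"},
    {"key": "lname", "type": "TOKEN", "token": "TEXT"},
    {"key": "email", "type": "TOKEN", "token": "EMAIL"},
    {"key": "phone_number", "type": "TOKEN", "token": "PHONE"}
  ]
}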

Configuration Options

| Option | Description | Example |
|---|---|---|
| url | JDBC connection string | jdbc:postgresql://host:5432/db |
| credentials | Reference to stored credentials | crm_db_creds |
| table | Source table name | customers |
| timestampFilteringFlag | Enable incremental loads | true |
| timestampFilteringColumn | Column to track changes | updated_at |

Why Incremental Loading?

Setting timestampFilteringFlag: true enables incremental loading:

  • First run: Loads all records

  • Subsequent runs: Only loads records modified since the last successful run (conceptually, rows whose updated_at value is newer than that run's timestamp)

  • Benefit: Faster processing, reduced load on source system


Step 3: Create the Transformation

Transformations map fields from the source schema to your target dataset and apply data manipulations.

Configuration

JSON
{
  "type": "transformation",
  "id": "normalize_crm",
  "source": "raw_customer_dataset",
  "target": "customer_dataset",
  "mappings": [
    {
      "mappingType": "COLUMN",
      "source": ["cust_id"],
      "target": ["id"]
    },
    {
      "mappingType": "CONCATENATE",
      "source": ["fname", "lname"],
      "target": ["full_name"],
      "delimiter": " "
    },
    {
      "mappingType": "COLUMN",
      "source": ["email"],
      "target": ["email"]
    },
    {
      "mappingType": "COLUMN",
      "source": ["phone_number"],
      "target": ["phone"]
    },
    {
      "mappingType": "CONSTANT",
      "constantValue": "ACTIVE",
      "target": ["status"]
    }
  ]
}

Mapping Types Explained

| Type | Purpose | Example |
|---|---|---|
| COLUMN | Direct field mapping | cust_id → id |
| CONCATENATE | Combine multiple fields | fname + lname → full_name |
| CONSTANT | Set a fixed value | Always set status = "ACTIVE" |
| EXPRESSION | Apply a formula or logic | Calculate derived values (see the sketch below) |
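
The configuration above does not use an EXPRESSION mapping, and the exact expression syntax is not covered in this example. Purely as a hypothetical sketch (the expression attribute, its syntax, and the account_status column are assumptions, not confirmed Golden configuration), a conditional mapping might look something like:

JSON
{
  "mappingType": "EXPRESSION",
  "source": ["account_status"],
  "target": ["status"],
  "expression": "account_status == 'closed' ? 'INACTIVE' : 'ACTIVE'"
}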

Common Transformation Patterns

| Pattern | Configuration |
|---|---|
| Rename field | COLUMN with different source/target names |
| Combine names | CONCATENATE with a space delimiter |
| Default value | CONSTANT for missing data |
| Conditional | EXPRESSION with logic |


Step 4: Create the Pipeline

Pipelines apply data quality operations like cleaning, standardization, and validation.

Configuration

JSON
{
  "type": "pipeline",
  "id": "customer_cleaning",
  "dataset": "customer_dataset",
  "processors": [
    {
      "processorType": "CLEANER",
      "properties": [
        {"key": "trim", "value": "true"},
        {"key": "lowercase", "value": "email"}
      ]
    },
    {
      "processorType": "VALIDATOR",
      "properties": [
        {"key": "email", "value": "email"},
        {"key": "required", "value": "id,email,full_name"}
      ]
    }
  ]
}

Available Processors

| Processor | Purpose | Common Properties |
|---|---|---|
| CLEANER | Remove whitespace, normalize case | trim, lowercase, uppercase |
| VALIDATOR | Check data quality rules | email, required, pattern |
| ENRICHER | Add derived data | lookup, calculate |
| FILTER | Remove unwanted records | condition, exclude (see the sketch below) |
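
The example pipeline above uses only CLEANER and VALIDATOR. As an illustration of a FILTER processor, the sketch below drops records without an email address; only the processorType and the condition property key come from the table above, and the condition value syntax is an assumption:

JSON
{
  "processorType": "FILTER",
  "properties": [
    {"key": "condition", "value": "email != null"}
  ]
}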

Cleaning Operations

| Operation | Effect |
|---|---|
| trim: true | Remove leading/trailing whitespace from all fields |
| lowercase: email | Convert the email field to lowercase |
| uppercase: country | Convert the country field to uppercase |


Step 5: Create the Sink (Optional)

Sinks publish processed data to external systems like Kafka, webhooks, or files.

Kafka Sink Example

JSON
{
  "type": "sink-kafka",
  "id": "customer_events",
  "dataset": "customer_dataset",
  "topicName": "customer-changes",
  "bootstrapServers": "kafka:9092"
}

Available Sink Types

| Type | Use Case |
|---|---|
| sink-kafka | Stream events to Kafka topics |
| sink-webhook | POST to HTTP endpoints (sketched below) |
| sink-jdbc | Write to database tables |
| sink-file | Export to CSV/JSON files |
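
For comparison, a webhook sink might be configured as follows. This is a sketch modeled on the Kafka example above; the url field name and the endpoint value are assumptions, so check the sink-webhook reference for the exact option names:

JSON
{
  "type": "sink-webhook",
  "id": "customer_webhook",
  "dataset": "customer_dataset",
  "url": "https://<your-endpoint>/customer-changes"
}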


Step 6: Execute the ETL Load

Run the complete pipeline to load and process customer data.

Using the API

BASH
curl -X 'POST' "${GOLDEN_URL}/api/table/load" \
    -H "Authorization: Bearer ${GOLDEN_TOKEN}" \
    -H "Content-Type: application/json" \
    -d '{
      "source": "crm_source",
      "transformation": "normalize_crm",
      "pipeline": "customer_cleaning",
      "sinkTable": "customer_table",
      "operation": "UPSERT"
    }'

Load Operations

| Operation | Behavior |
|---|---|
| UPSERT | Insert new records, update existing (recommended) |
| INSERT | Insert only, fail on duplicates |
| REPLACE | Delete all existing records, then insert |

Expected Response

JSON
{
  "taskId": "task-12345",
  "status": "QUEUED",
  "message": "Load task queued for execution"
}

Step 7: Monitor the Task

Track the progress of your data load.

Check Task Status

BASH
curl -X 'GET' "${GOLDEN_URL}/api/task/task-12345" \
    -H "Authorization: Bearer ${GOLDEN_TOKEN}"

Task States

| State | Meaning |
|---|---|
| QUEUED | Waiting to start |
| RUNNING | Currently processing |
| COMPLETED | Finished successfully |
| FAILED | Error occurred |
| CANCELLED | Manually stopped |

Successful Completion

JSON
{
  "taskId": "task-12345",
  "status": "COMPLETED",
  "statistics": {
    "recordsRead": 10000,
    "recordsWritten": 9850,
    "recordsSkipped": 150,
    "duration": "00:02:34"
  }
}

Next Steps

After loading data, you can:

  1. Set up deduplication: Create an entity to find duplicates

  2. Schedule recurring loads: Configure automatic synchronization

  3. Add more sources: Integrate additional systems

  4. Configure sinks: Publish golden records downstream

Related Documentation

| Topic | Link |
|---|---|
| Entity Configuration | Entities |
| Resource Types | Resources |
| Task Management | Tasks |
