Building Real-time CDC Pipelines at Scale: Lessons from Production
Practical insights from operating CDC systems processing billions of events. Learn about architecture patterns, performance optimization, and operational best practices.
Introduction
Change Data Capture at scale is fundamentally different from small deployments. When your CDC pipeline processes billions of events daily, decisions that seem minor in development become critical in production. This article shares hard-won lessons from building and operating large-scale CDC systems.
Architecture Patterns That Scale
The Log-Based CDC Architecture
For high-volume systems, log-based CDC consistently outperforms trigger-based or polling approaches:
Source Database (PostgreSQL/MySQL)
│
▼
Transaction Log (WAL/Binlog)
│
▼
CDC Connector (Savegress/Debezium)
│
▼
Message Queue (Kafka/Kinesis)
│
▼
Consumers (Data Warehouse, Search, Cache)
Why Log-Based Wins at Scale
- Minimal source impact: Reading logs adds negligible load to the database
- Complete change history: Captures every change, including intermediate states
- Transactional ordering: Changes are captured in commit order
- No schema modifications: No triggers or additional columns needed
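To make this concrete, the sketch below streams decoded changes from a PostgreSQL logical replication slot using psycopg2. The connection string, slot name, and the downstream handoff are assumptions for illustration; the slot must already exist with a logical decoding plugin configured.
# Minimal sketch: consume a PostgreSQL logical replication slot with psycopg2.
# The DSN and slot name are assumptions; the slot must already exist.
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    "dbname=orders user=cdc_reader",  # hypothetical connection string
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
cur = conn.cursor()
cur.start_replication(slot_name="cdc_slot", decode=True)

def handle_change(msg):
    # msg.payload is the decoded change; hand it to the message queue here
    print(msg.payload)
    # Acknowledge the position so the server can recycle WAL segments
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(handle_change)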
Handling High-Volume Scenarios
Partitioning Strategies
Proper partitioning is essential for parallel processing:
# Partition by primary key for even distribution
partition_key = hash(record.primary_key) % num_partitions
# Partition by table for schema isolation
partition_key = hash(record.table_name) % num_partitions
# Partition by tenant for multi-tenant systems
partition_key = hash(record.tenant_id) % num_partitions
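One caveat with the snippets above: Python's built-in hash() is randomized per process for strings, so it will not route the same key to the same partition across restarts or workers. A stable hash, for example CRC32, is a safer sketch:
import zlib

def partition_for(key, num_partitions):
    # CRC32 is stable across processes and restarts, unlike the
    # built-in hash() for strings, so routing stays deterministic.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

# e.g. partition_for(record.primary_key, 32)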
Backpressure Management
When consumers cannot keep up with producers, you need backpressure mechanisms:
import time

class BackpressureHandler:
    def __init__(self, max_lag_seconds=300):
        self.max_lag = max_lag_seconds

    def check_and_throttle(self, current_lag):
        if current_lag > self.max_lag:
            # Slow down source reading; pause longer as lag grows,
            # but never for more than 60 seconds at a time
            sleep_time = min(current_lag / 10, 60)
            time.sleep(sleep_time)
            return True
        return False
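In the read loop, the handler sits between lag measurement and the next fetch; the measure_lag, read_next_changes, and publish helpers below are hypothetical placeholders:
handler = BackpressureHandler(max_lag_seconds=300)

while True:
    lag = measure_lag()            # hypothetical: end-to-end lag in seconds
    handler.check_and_throttle(lag)
    changes = read_next_changes()  # hypothetical: read the next chunk of the log
    publish(changes)               # hypothetical: write to the message queue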
Schema Evolution Challenges
Schema changes are inevitable. Your CDC pipeline must handle them gracefully.
Backward Compatible Changes
These changes are safe and require no special handling:
- Adding nullable columns
- Adding columns with defaults
- Removing unused columns (if consumers ignore unknown fields)
Breaking Changes
These require coordination:
- Renaming columns
- Changing column types
- Removing columns still in use
Schema Registry Pattern
# Register a new schema version with the changes
schema_registry.register(
    subject="orders-value",
    schema={
        "type": "record",
        "name": "Order",
        "fields": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "currency", "type": "string", "default": "USD"}
        ]
    }
)
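On the consumer side, backward compatibility mostly means tolerating missing and unknown fields. A minimal sketch, mirroring the Order schema above:
# Apply defaults for fields older producers do not write yet,
# and drop fields this consumer does not know about.
KNOWN_FIELDS = {"id", "amount", "currency"}
DEFAULTS = {"currency": "USD"}

def normalize_order(event):
    order = {k: v for k, v in event.items() if k in KNOWN_FIELDS}
    for field, default in DEFAULTS.items():
        order.setdefault(field, default)
    return order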
Exactly-Once Delivery
Achieving exactly-once semantics in distributed systems requires careful design.
Idempotent Consumers
class IdempotentConsumer:
    def __init__(self, processed_store):
        self.processed = processed_store

    def process(self, event):
        # Source plus log position uniquely identifies a change event
        event_id = f"{event.source}:{event.lsn}"
        if self.processed.contains(event_id):
            return  # Already processed, skip
        try:
            self.handle_event(event)  # implemented by the concrete consumer
            self.processed.add(event_id)
        except Exception:
            # Leave the event unmarked so it will be retried
            raise
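The processed-ID store just needs fast membership checks and durable writes. One possible sketch backed by Redis; the key prefix and TTL are assumptions, with the TTL bounding memory while still covering realistic redelivery windows:
import redis

class RedisProcessedStore:
    def __init__(self, client=None, ttl_seconds=7 * 24 * 3600):
        self.client = client or redis.Redis()
        self.ttl = ttl_seconds

    def contains(self, event_id):
        return self.client.exists(f"cdc:processed:{event_id}") == 1

    def add(self, event_id):
        # Expire old entries so the store does not grow without bound
        self.client.set(f"cdc:processed:{event_id}", 1, ex=self.ttl)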
Transactional Outbox Pattern
For producing events reliably, write the event to an outbox table in the same transaction as the business change. The CDC connector captures the outbox insert, so the event is published if and only if the transaction commits:
BEGIN TRANSACTION;
-- Business logic
UPDATE accounts SET balance = balance - 100 WHERE id = 123;
UPDATE accounts SET balance = balance + 100 WHERE id = 456;
-- Write to outbox (CDC captures this)
INSERT INTO outbox (aggregate_id, event_type, payload)
VALUES (123, 'transfer', '{"from": 123, "to": 456, "amount": 100}');
COMMIT;
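Downstream, a relay turns captured outbox inserts into domain events. A minimal sketch of routing by event type; the CDC envelope field names and the publish stub are assumptions:
import json

def publish_transfer_event(payload):
    # Stub: in a real pipeline this would produce to a topic or call a service
    print("transfer", payload)

HANDLERS = {"transfer": publish_transfer_event}

def relay_outbox_change(change):
    # 'change' is the CDC event for an INSERT into the outbox table;
    # the "table", "op", and "after" keys are assumed envelope fields
    if change.get("table") != "outbox" or change.get("op") != "insert":
        return
    row = change["after"]
    HANDLERS[row["event_type"]](json.loads(row["payload"]))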
Monitoring and Alerting
Critical Metrics
# Metric definitions (using prometheus_client here as one concrete option)
from prometheus_client import Counter, Gauge

# Replication lag - the most important metric
replication_lag_seconds = Gauge(
    "cdc_replication_lag_seconds",
    "Time between change in source and delivery to destination"
)

# Throughput metrics
events_processed = Counter(
    "cdc_events_processed_total",
    "Total events processed",
    ["table", "operation"]
)

# Error tracking
processing_errors = Counter(
    "cdc_processing_errors_total",
    "Processing errors by type",
    ["error_type"]
)
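Instrumentation then happens inline in the consumer; the lag computation below assumes the source commit timestamp is carried on each event:
def record_metrics(event, processed_at):
    # Lag = wall-clock delay between source commit and delivery downstream
    replication_lag_seconds.set(processed_at - event.source_commit_ts)
    events_processed.labels(table=event.table, operation=event.operation).inc()

def record_error(error_type):
    processing_errors.labels(error_type=error_type).inc()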
Alert Thresholds
- Replication lag > 5 minutes: Warning - investigate cause
- Replication lag > 30 minutes: Critical - immediate action required
- Error rate > 1%: Warning - check error patterns
- Slot inactive > 1 hour: Critical - risk of WAL accumulation
Disaster Recovery
Checkpoint Strategy
import time

class CheckpointManager:
    def __init__(self, store, checkpoint_interval=10000):
        self.store = store  # durable key-value store for checkpoints
        self.interval = checkpoint_interval
        self.events_since_checkpoint = 0

    def maybe_checkpoint(self, current_position):
        self.events_since_checkpoint += 1
        if self.events_since_checkpoint >= self.interval:
            self.save_checkpoint(current_position)
            self.events_since_checkpoint = 0

    def save_checkpoint(self, position):
        # Persist to durable storage
        self.store.put("checkpoint", {
            "position": position,
            "timestamp": time.time()
        })
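On restart, the connector reads the last checkpoint and resumes from there; the store interface, snapshot step, and streaming entry point below are hypothetical:
def resume(store):
    checkpoint = store.get("checkpoint")       # None on a fresh deployment
    if checkpoint is None:
        position = take_initial_snapshot()     # hypothetical: snapshot, then stream
    else:
        position = checkpoint["position"]
    start_streaming(from_position=position)    # hypothetical streaming entry point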
Recovery Procedures
- Consumer failure: Resume from last checkpoint
- Connector failure: Resume from replication slot position
- Complete disaster: Initial snapshot + streaming from snapshot point
Performance Optimization
Batching
import time

class BatchProcessor:
    def __init__(self, batch_size=1000, max_wait_ms=100):
        self.batch_size = batch_size
        self.max_wait = max_wait_ms
        self.batch = []
        self.last_flush = time.time()

    def add(self, event):
        self.batch.append(event)
        if len(self.batch) >= self.batch_size:
            return self.flush()
        if (time.time() - self.last_flush) * 1000 > self.max_wait:
            return self.flush()
        return None

    def flush(self):
        if not self.batch:
            return []
        result = self.batch
        self.batch = []
        self.last_flush = time.time()
        return result
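A typical consumer loop feeds events through the processor and writes whatever add returns. Note that the time-based flush only fires when add is called, so a quiet stream should also be flushed explicitly; the poll_events and write_to_destination helpers are hypothetical:
processor = BatchProcessor(batch_size=1000, max_wait_ms=100)

while True:
    for event in poll_events():           # hypothetical: fetch events from the queue
        batch = processor.add(event)
        if batch:
            write_to_destination(batch)   # hypothetical: bulk write downstream
    # Flush stragglers so low-traffic periods do not delay delivery indefinitely
    remaining = processor.flush()
    if remaining:
        write_to_destination(remaining)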
Conclusion
Building CDC pipelines at scale requires attention to architecture, operational practices, and failure handling that goes far beyond basic implementations. The patterns and practices shared here come from real production experience processing billions of events.
Key takeaways:
- Use log-based CDC for minimal source impact and complete change capture
- Design for schema evolution from the start
- Implement idempotent consumers for exactly-once semantics
- Monitor replication lag as your primary health indicator
- Plan disaster recovery before you need it