Change Data Capture (CDC) makes it possible to detect data changes in existing systems automatically and publish them as events without modifying the application code. CDC acts as a bridge between data-centric legacy systems and event-driven architectures.
The most efficient CDC pattern taps the database's transaction log. PostgreSQL, for example, records every change in a Write-Ahead Log (WAL) that is normally used only for replication and recovery; other databases keep equivalent logs, such as MySQL's binlog.
How this works, using PostgreSQL as an example:
```sql
-- Create a PostgreSQL logical replication slot
SELECT pg_create_logical_replication_slot('order_changes', 'pgoutput');

-- Configure the WAL level for logical replication
-- postgresql.conf: wal_level = logical
-- postgresql.conf: max_replication_slots = 10
```

CDC tools read these logs continuously and transform the low-level changes into business events:
| Database Change | CDC Event | Business Meaning |
|---|---|---|
| `INSERT INTO orders(id, customer_id, total) VALUES (123, 'C1', 99.99)` | OrderCreated | New order received |
| `UPDATE orders SET status='PAID' WHERE id=123` | OrderPaymentConfirmed | Payment successful |
| `UPDATE orders SET status='SHIPPED' WHERE id=123` | OrderShipped | Order shipped |
| `DELETE FROM orders WHERE id=123` | OrderCancelled | Order cancelled |
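The mapping in the table above can be sketched as a small function. This is a minimal illustration, not a specific tool's API; the change dict with `op`, `before`, and `after` keys merely mirrors the shape that log-based CDC tools typically emit.

```python
# Derive a business event name from a low-level CDC change.
# The dict shape (op/before/after) is an assumption modeled on common
# CDC tool output; the event names follow the table above.

STATUS_EVENTS = {
    "PAID": "OrderPaymentConfirmed",
    "SHIPPED": "OrderShipped",
}

def to_business_event(change):
    op = change["op"]
    if op == "INSERT":
        return "OrderCreated"
    if op == "DELETE":
        return "OrderCancelled"
    if op == "UPDATE":
        # Only a status transition is business-relevant here
        before, after = change["before"], change["after"]
        if before["status"] != after["status"]:
            return STATUS_EVENTS.get(after["status"])
    return None
```

Note that an UPDATE that does not change the status yields no event: not every row change is a business event.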
Less performant, but universally applicable, are database triggers:
```sql
-- Trigger for order changes
CREATE OR REPLACE FUNCTION order_change_trigger()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO cdc_events (table_name, operation, new_data, timestamp)
        VALUES ('orders', 'INSERT', row_to_json(NEW), CURRENT_TIMESTAMP);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO cdc_events (table_name, operation, old_data, new_data, timestamp)
        VALUES ('orders', 'UPDATE', row_to_json(OLD), row_to_json(NEW), CURRENT_TIMESTAMP);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO cdc_events (table_name, operation, old_data, timestamp)
        VALUES ('orders', 'DELETE', row_to_json(OLD), CURRENT_TIMESTAMP);
        RETURN OLD;
    END IF;
    RETURN NULL;  -- not reached for row-level INSERT/UPDATE/DELETE
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER order_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION order_change_trigger();
```

The most direct approach integrates CDC into the application logic itself:
```java
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final CDCEventPublisher cdcPublisher;

    public Order updateOrderStatus(String orderId, OrderStatus newStatus) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));

        OrderStatus oldStatus = order.getStatus();
        order.setStatus(newStatus);
        Order savedOrder = orderRepository.save(order);

        // Publish the CDC event
        cdcPublisher.publishStatusChange(orderId, oldStatus, newStatus);
        return savedOrder;
    }
}
```

| Approach | Performance | Implementation Effort | Invasiveness | Use Case |
|---|---|---|---|---|
| Log-based | Very high | Medium | Non-invasive | Production systems |
| Trigger-based | Low | Low | Moderately invasive | Legacy integration |
| Application-level | High | High | Invasive | New development |
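With the trigger-based approach, a separate process still has to drain the `cdc_events` table and publish its rows. The sketch below shows such a poller; `sqlite3` stands in for PostgreSQL so the example is self-contained, and a real implementation would use a PostgreSQL driver and delete rows only once the broker acknowledges the publish.

```python
import json
import sqlite3

# Drain the cdc_events table written by the trigger above and hand each
# row to a publish callback. sqlite3 is a stand-in for PostgreSQL here.

def drain_cdc_events(conn, publish):
    rows = conn.execute(
        "SELECT rowid, operation, new_data FROM cdc_events ORDER BY rowid"
    ).fetchall()
    for rowid, operation, new_data in rows:
        publish({
            "operation": operation,
            "data": json.loads(new_data) if new_data else None,
        })
        # Delete only after a successful publish (at-least-once delivery)
        conn.execute("DELETE FROM cdc_events WHERE rowid = ?", (rowid,))
    conn.commit()
```

Run on a schedule, this gives at-least-once delivery; consumers must therefore be prepared for duplicates.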
CDC frees data from legacy systems and makes it available for event-driven processing. In doing so, it turns data-centric thinking into event-oriented workflows.
Traditional (pull-based):

```text
[Order DB] ← poll ← [Inventory Service] ← poll ← [Shipping Service]
```

With CDC (push-based):

```text
[Order DB] → CDC → [Events] → [Inventory Service] → [Shipping Service]
```
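The push-based flow boils down to a fan-out: the CDC stream delivers each change to every subscriber, so downstream services no longer poll the order database. A minimal sketch, with illustrative names:

```python
# A change feed fans CDC events out to subscribed handlers (push model).
# ChangeFeed and the handler lambdas are illustrative, not a real library.

class ChangeFeed:
    def __init__(self):
        self._handlers = []

    def subscribe(self, handler):
        self._handlers.append(handler)

    def emit(self, change):
        for handler in self._handlers:
            handler(change)

# Inventory and shipping subscribe once; every change is pushed to them
feed = ChangeFeed()
reserved, shipped = [], []
feed.subscribe(lambda c: reserved.append(c["order_id"]))  # inventory service
feed.subscribe(lambda c: shipped.append(c["order_id"]))   # shipping service
feed.emit({"order_id": 123, "status": "PAID"})
```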
An e-commerce system uses CDC to inform various services about order changes:
```python
# CDC event processor
class OrderCDCProcessor:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher
        self.status_mapping = {
            'NEW': 'OrderPlaced',
            'PAID': 'PaymentConfirmed',
            'SHIPPED': 'OrderShipped',
            'DELIVERED': 'OrderDelivered',
            'CANCELLED': 'OrderCancelled'
        }

    def process_cdc_event(self, cdc_event):
        if cdc_event['table'] == 'orders':
            self._handle_order_change(cdc_event)
        elif cdc_event['table'] == 'payments':
            self._handle_payment_change(cdc_event)

    def _handle_order_change(self, cdc_event):
        operation = cdc_event['operation']
        if operation == 'INSERT':
            self._publish_order_placed(cdc_event['after'])
        elif operation == 'UPDATE':
            self._publish_order_status_change(
                cdc_event['before'],
                cdc_event['after']
            )
        elif operation == 'DELETE':
            self._publish_order_cancelled(cdc_event['before'])

    def _publish_order_status_change(self, before, after):
        if before['status'] != after['status']:
            event_type = self.status_mapping.get(after['status'])
            if event_type:
                event = {
                    'eventType': event_type,
                    'orderId': after['id'],
                    'previousStatus': before['status'],
                    'newStatus': after['status'],
                    'timestamp': after['updated_at']
                }
                self.event_publisher.publish(
                    topic=f"order.{event_type.lower()}.v1",
                    key=after['id'],
                    value=event
                )
```

CDC consumers must absorb schema changes in the source database:
```java
@Component
public class OrderCDCSchemaAdapter {

    public OrderEvent adaptCDCEvent(CDCEvent rawEvent) {
        JsonNode data = rawEvent.getAfter();

        // Handle the legacy schema
        if (data.has("customer_info")) {
            // Old monolithic customer_info string
            return adaptLegacyCustomerFormat(data);
        }

        // Current schema
        return OrderEvent.builder()
            .orderId(data.get("id").asText())
            .customerId(data.get("customer_id").asText())
            .status(data.get("status").asText())
            .totalAmount(data.get("total_amount").asDouble())
            .items(parseItems(data.get("items")))
            .build();
    }

    private OrderEvent adaptLegacyCustomerFormat(JsonNode data) {
        String customerInfo = data.get("customer_info").asText();
        // Transformation: "123,John,Doe" -> structured data
        String[] parts = customerInfo.split(",");
        return OrderEvent.builder()
            .orderId(data.get("id").asText())
            .customerId(parts[0])
            .customerName(parts[1] + " " + parts[2])
            .status(data.get("status").asText())
            // ... further fields
            .build();
    }
}
```

Debezium is the de facto standard solution for log-based CDC with Kafka integration:
```yaml
# Debezium PostgreSQL connector configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: order-database-source
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres-db
    database.port: 5432
    database.user: debezium
    database.password: secret
    database.dbname: ecommerce
    database.server.name: order-db
    table.include.list: public.orders,public.payments,public.inventory
    # Event routing
    transforms: route
    transforms.route.type: io.debezium.transforms.ByLogicalTableRouter
    transforms.route.topic.regex: order-db.public.(.*)
    transforms.route.topic.replacement: $1.changes.v1
```

For special requirements, a tailor-made CDC pipeline can be built:
```java
@Component
public class CustomCDCPipeline {

    private final PostgreSQLCDCConnector cdcConnector;
    private final EventTransformer transformer;
    private final KafkaEventPublisher publisher;

    @PostConstruct
    public void startCDCStream() {
        cdcConnector.onChanges(this::processDatabaseChange);
    }

    private void processDatabaseChange(DatabaseChange change) {
        try {
            // 1. Filter: only relevant tables
            if (!isRelevantTable(change.getTable())) {
                return;
            }
            // 2. Transform: DB change -> business event
            List<BusinessEvent> events = transformer.transform(change);
            // 3. Publish: events -> Kafka topics
            for (BusinessEvent event : events) {
                publisher.publish(event);
            }
        } catch (Exception e) {
            log.error("Failed to process CDC change: {}", change, e);
            // Dead letter queue or retry logic
        }
    }

    private boolean isRelevantTable(String tableName) {
        return Set.of("orders", "payments", "inventory", "customers")
            .contains(tableName);
    }
}
```

In microservice-oriented environments, multiple databases often need to be monitored:
```python
import asyncio

# Multi-source CDC coordinator
# (the connector classes and EventRouter are application-specific)
class MultiSourceCDCCoordinator:
    def __init__(self):
        self.connectors = {
            'orders': PostgreSQLCDCConnector('orders_db'),
            'payments': MySQLCDCConnector('payments_db'),
            'inventory': PostgreSQLCDCConnector('inventory_db')
        }
        self.event_router = EventRouter()

    async def start_all_streams(self):
        tasks = []
        for source, connector in self.connectors.items():
            task = asyncio.create_task(
                self._monitor_source(source, connector)
            )
            tasks.append(task)
        await asyncio.gather(*tasks)

    async def _monitor_source(self, source_name, connector):
        async for change in connector.stream_changes():
            await self._route_change(source_name, change)

    async def _route_change(self, source, change):
        # Source-specific event transformation
        events = await self.event_router.transform_change(source, change)
        for event in events:
            await self._publish_unified_event(event)
```

CDC pipelines need robust error handling:
```java
@Component
public class CDCErrorHandler {

    private final CircuitBreaker circuitBreaker;
    private final RetryTemplate retryTemplate;
    private final DeadLetterQueue deadLetterQueue;

    public void handleCDCEvent(CDCEvent event) {
        try {
            circuitBreaker.executeSupplier(() -> {
                return retryTemplate.execute(context -> {
                    processEvent(event);
                    return null;
                });
            });
        } catch (CircuitBreakerOpenException e) {
            log.warn("Circuit breaker open for CDC processing");
            deadLetterQueue.send(event, "circuit-breaker-open");
        } catch (Exception e) {
            log.error("Failed to process CDC event after retries: {}", event, e);
            deadLetterQueue.send(event, e.getMessage());
        }
    }

    @RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2.0),
        dltStrategy = DltStrategy.FAIL_ON_ERROR
    )
    @KafkaListener(topics = "cdc.dead-letter-queue")
    public void handleDeadLetterEvent(CDCEvent event) {
        // Manual handling or escalation
        log.info("Processing dead letter CDC event: {}", event);
    }
}
```

CDC can also run as a sidecar container alongside the main application:
```yaml
# Docker Compose CDC sidecar
version: '3.8'
services:
  order-service:
    image: order-service:latest
    environment:
      - DATABASE_URL=jdbc:postgresql://postgres:5432/orders

  order-cdc-sidecar:
    image: debezium/connect:latest
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=order-cdc-group
      - CONFIG_STORAGE_TOPIC=connect-configs
      - OFFSET_STORAGE_TOPIC=connect-offsets
    volumes:
      - ./cdc-connector-config.json:/kafka/connect/connector-config.json
    depends_on:
      - postgres
      - kafka
```

Alternatively, a central CDC service monitors multiple databases:
```java
@Configuration
public class CentralCDCConfiguration {

    @Bean
    public CDCHub cdcHub() {
        return CDCHub.builder()
            .addSource("orders", postgresConfig("orders_db"))
            .addSource("payments", mysqlConfig("payments_db"))
            .addSource("inventory", mongoConfig("inventory_db"))
            .eventRouter(new BusinessEventRouter())
            .errorHandler(new CDCErrorHandler())
            .build();
    }
}
```

Change Data Capture turns existing systems into event producers without changing their application logic. The choice between log-based, trigger-based, and application-level CDC depends on performance requirements, legacy constraints, and implementation effort.