40 Change Data Capture (CDC) – A Conceptual Introduction

Change Data Capture makes it possible to detect data changes in existing systems automatically and publish them as events without modifying the application code. CDC acts as a bridge between data-centric legacy systems and event-driven architectures.

40.0.1 CDC Patterns and Tools

40.0.1.1 Log-based CDC

The most efficient CDC pattern taps into the database's transaction log. Most databases record every change in a write-ahead log (WAL), which is normally used only for replication and recovery.

How this works, using PostgreSQL as an example:

-- Create a PostgreSQL logical replication slot
SELECT pg_create_logical_replication_slot('order_changes', 'pgoutput');

-- Configure the WAL level for logical replication
-- postgresql.conf: wal_level = logical
-- postgresql.conf: max_replication_slots = 10
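
To illustrate what CDC tools do under the hood, the following sketch reads a replication slot with the PostgreSQL JDBC driver's replication API. For readability it assumes a second slot created with the text-based 'test_decoding' output plugin rather than the binary 'pgoutput' protocol; connection details and credentials are illustrative.

// Minimal WAL reader sketch (classes from org.postgresql and org.postgresql.replication)
// Assumed slot: SELECT pg_create_logical_replication_slot('order_changes_text', 'test_decoding');
public class WalReaderSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        PGProperty.USER.set(props, "debezium");
        PGProperty.PASSWORD.set(props, "secret");
        // Required settings for a logical replication connection
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
        PGProperty.REPLICATION.set(props, "database");
        PGProperty.PREFERRED_QUERY_MODE.set(props, "simple");

        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/ecommerce", props)) {

            PGConnection pgConn = conn.unwrap(PGConnection.class);
            PGReplicationStream stream = pgConn.getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName("order_changes_text")
                .withStatusInterval(10, TimeUnit.SECONDS)
                .start();

            while (true) {
                ByteBuffer buffer = stream.readPending();   // non-blocking read
                if (buffer == null) {
                    TimeUnit.MILLISECONDS.sleep(10);
                    continue;
                }
                String change = StandardCharsets.UTF_8.decode(buffer).toString();
                System.out.println(change);                 // e.g. "table public.orders: UPDATE: ..."

                // Acknowledge progress so PostgreSQL can recycle WAL segments
                stream.setAppliedLSN(stream.getLastReceiveLSN());
                stream.setFlushedLSN(stream.getLastReceiveLSN());
            }
        }
    }
}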

CDC tools read these logs continuously and transform the low-level changes into business events:

Database Change | CDC Event | Business Meaning
--- | --- | ---
INSERT INTO orders(id, customer_id, total) VALUES (123, 'C1', 99.99) | OrderCreated | New order received
UPDATE orders SET status='PAID' WHERE id=123 | OrderPaymentConfirmed | Payment confirmed
UPDATE orders SET status='SHIPPED' WHERE id=123 | OrderShipped | Order shipped
DELETE FROM orders WHERE id=123 | OrderCancelled | Order cancelled

40.0.1.2 Trigger-based CDC

Database triggers are less performant but universally applicable:

-- Trigger function for order changes
CREATE OR REPLACE FUNCTION order_change_trigger() 
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO cdc_events (table_name, operation, new_data, timestamp)
        VALUES ('orders', 'INSERT', row_to_json(NEW), CURRENT_TIMESTAMP);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO cdc_events (table_name, operation, old_data, new_data, timestamp)
        VALUES ('orders', 'UPDATE', row_to_json(OLD), row_to_json(NEW), CURRENT_TIMESTAMP);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO cdc_events (table_name, operation, old_data, timestamp)
        VALUES ('orders', 'DELETE', row_to_json(OLD), CURRENT_TIMESTAMP);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER order_changes_trigger
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION order_change_trigger();
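
The trigger only records changes in the cdc_events table; a separate relay still has to hand them over to the messaging layer. A minimal polling sketch, assuming Spring's JdbcTemplate and KafkaTemplate plus a serial id primary key on cdc_events (all other column names as in the trigger above):

@Component
public class CdcEventsRelay {

    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public CdcEventsRelay(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
        this.jdbcTemplate = jdbcTemplate;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Scheduled(fixedDelay = 1000)
    public void relayPendingChanges() {
        List<Map<String, Object>> rows = jdbcTemplate.queryForList(
            "SELECT id, table_name, operation, new_data, old_data FROM cdc_events ORDER BY id LIMIT 100");

        for (Map<String, Object> row : rows) {
            // DELETEs only carry old_data, all other operations carry new_data
            Object payload = row.get("new_data") != null ? row.get("new_data") : row.get("old_data");

            kafkaTemplate.send(row.get("table_name") + ".changes.v1",
                String.valueOf(row.get("id")), String.valueOf(payload));

            // Remove the row once handed over; for strict at-least-once delivery you
            // would wait for the broker acknowledgement before deleting
            jdbcTemplate.update("DELETE FROM cdc_events WHERE id = ?", row.get("id"));
        }
    }
}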

40.0.1.3 Application-level CDC

The simplest approach integrates CDC directly into the application logic:

@Service
@Transactional
public class OrderService {
    
    private final OrderRepository orderRepository;
    private final CDCEventPublisher cdcPublisher;
    
    public OrderService(OrderRepository orderRepository, CDCEventPublisher cdcPublisher) {
        this.orderRepository = orderRepository;
        this.cdcPublisher = cdcPublisher;
    }
    
    public Order updateOrderStatus(String orderId, OrderStatus newStatus) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
        
        OrderStatus oldStatus = order.getStatus();
        order.setStatus(newStatus);
        
        Order savedOrder = orderRepository.save(order);
        
        // Publish the CDC event
        cdcPublisher.publishStatusChange(orderId, oldStatus, newStatus);
        
        return savedOrder;
    }
}
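
The CDCEventPublisher used above is not a framework class; a minimal sketch, assuming Spring Kafka with a JSON-serializing KafkaTemplate and an illustrative OrderStatusChangedEvent record:

@Component
public class CDCEventPublisher {
    
    // Illustrative event payload; in a real system this would be a versioned schema
    public record OrderStatusChangedEvent(String orderId, OrderStatus previousStatus,
                                          OrderStatus newStatus, Instant occurredAt) {}
    
    private final KafkaTemplate<String, OrderStatusChangedEvent> kafkaTemplate;
    
    public CDCEventPublisher(KafkaTemplate<String, OrderStatusChangedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void publishStatusChange(String orderId, OrderStatus oldStatus, OrderStatus newStatus) {
        // Key by orderId so all events for one order stay in the same partition
        kafkaTemplate.send("order.status-changed.v1",
            orderId,
            new OrderStatusChangedEvent(orderId, oldStatus, newStatus, Instant.now()));
    }
}

Note that the event is published before the surrounding transaction commits; if the commit fails afterwards, a change event has already been sent. This dual-write risk is one reason the log-based and trigger-based variants are often preferred for systems with strict consistency requirements.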

40.0.2 Comparison of CDC Approaches

Approach | Performance | Implementation Effort | Invasiveness | Use Case
--- | --- | --- | --- | ---
Log-based | Very high | Medium | Non-invasive | Production systems
Trigger-based | Low | Low | Moderately invasive | Legacy integration
Application-level | High | High | Invasive | New development

40.0.3 Event-driven Data Liberation

CDC liberates data from legacy systems and makes it available for event-driven processing. The process turns data-centric thinking patterns into event-oriented workflows.

40.0.3.1 Traditional Data Flow vs. CDC-based Event Flow

Traditional (pull-based):

[Order DB] ← poll ← [Inventory Service] ← poll ← [Shipping Service]

With CDC (push-based):

[Order DB] → CDC → [Events] → [Inventory Service] → [Shipping Service]

40.0.3.2 Transforming Database Changes in Practice

An e-commerce system uses CDC to inform various services about order changes:

# CDC Event Processor
class OrderCDCProcessor:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher
        self.status_mapping = {
            'NEW': 'OrderPlaced',
            'PAID': 'PaymentConfirmed', 
            'SHIPPED': 'OrderShipped',
            'DELIVERED': 'OrderDelivered',
            'CANCELLED': 'OrderCancelled'
        }
    
    def process_cdc_event(self, cdc_event):
        if cdc_event['table'] == 'orders':
            self._handle_order_change(cdc_event)
        elif cdc_event['table'] == 'payments':
            self._handle_payment_change(cdc_event)
    
    def _handle_order_change(self, cdc_event):
        operation = cdc_event['operation']
        
        if operation == 'INSERT':
            self._publish_order_placed(cdc_event['after'])
        elif operation == 'UPDATE':
            self._publish_order_status_change(
                cdc_event['before'], 
                cdc_event['after']
            )
        elif operation == 'DELETE':
            self._publish_order_cancelled(cdc_event['before'])
    
    def _publish_order_status_change(self, before, after):
        if before['status'] != after['status']:
            event_type = self.status_mapping.get(after['status'])
            if event_type:
                event = {
                    'eventType': event_type,
                    'orderId': after['id'],
                    'previousStatus': before['status'],
                    'newStatus': after['status'],
                    'timestamp': after['updated_at']
                }
                
                self.event_publisher.publish(
                    topic=f"order.{event_type.lower()}.v1",
                    key=after['id'],
                    value=event
                )

40.0.3.3 Schema Evolution and Backwards Compatibility

CDC events must absorb schema changes in the source database:

@Component
public class OrderCDCSchemaAdapter {
    
    public OrderEvent adaptCDCEvent(CDCEvent rawEvent) {
        JsonNode data = rawEvent.getAfter();
        
        // Handle the legacy schema
        if (data.has("customer_info")) {
            // Old monolithic customer_info string
            return adaptLegacyCustomerFormat(data);
        }
        
        // Current schema
        return OrderEvent.builder()
            .orderId(data.get("id").asText())
            .customerId(data.get("customer_id").asText())
            .status(data.get("status").asText())
            .totalAmount(data.get("total_amount").asDouble())
            .items(parseItems(data.get("items")))
            .build();
    }
    
    private OrderEvent adaptLegacyCustomerFormat(JsonNode data) {
        String customerInfo = data.get("customer_info").asText();
        String[] parts = customerInfo.split(",");
        
        // Transformation: "123,John,Doe" -> structured data
        return OrderEvent.builder()
            .orderId(data.get("id").asText())
            .customerId(parts[0])
            .customerName(parts[1] + " " + parts[2])
            .status(data.get("status").asText())
            // ... remaining fields
            .build();
    }
}
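
A usage sketch for the adapter as a unit test; JUnit 5 and Jackson are assumed, and the CDCEvent constructor and the OrderEvent accessors are assumptions made for illustration:

class OrderCDCSchemaAdapterTest {
    
    @Test
    void adaptsLegacyCustomerFormat() throws Exception {
        JsonNode legacyRow = new ObjectMapper().readTree(
            "{\"id\": \"123\", \"customer_info\": \"C1,John,Doe\", \"status\": \"PAID\"}");
        
        // Constructing the raw CDC event directly; the constructor signature is assumed
        CDCEvent rawEvent = new CDCEvent("orders", "UPDATE", null, legacyRow);
        
        OrderEvent event = new OrderCDCSchemaAdapter().adaptCDCEvent(rawEvent);
        
        assertEquals("C1", event.getCustomerId());
        assertEquals("John Doe", event.getCustomerName());
    }
}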

40.0.4 Integration Strategies

40.0.4.1 Debezium as a CDC Platform

Debezium is the de facto standard solution for log-based CDC with Kafka integration:

# Debezium PostgreSQL Connector Configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: order-database-source
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    database.hostname: postgres-db
    database.port: 5432
    database.user: debezium
    database.password: secret
    database.dbname: ecommerce
    database.server.name: order-db
    table.include.list: public.orders,public.payments,public.inventory
    
    # Event routing
    transforms: route
    transforms.route.type: io.debezium.transforms.ByLogicalTableRouter
    transforms.route.topic.regex: order-db.public.(.*)
    transforms.route.topic.replacement: $1.changes.v1
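
On the consuming side, services read the routed topics and unwrap Debezium's change envelope (op, before, after). A minimal listener sketch, assuming Spring Kafka and Jackson; the topic and consumer group names are illustrative:

@Component
public class OrderChangeListener {
    
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @KafkaListener(topics = "orders.changes.v1", groupId = "inventory-service")
    public void onOrderChange(String message) throws Exception {
        JsonNode root = objectMapper.readTree(message);
        // With the default JsonConverter (schemas enabled) the envelope sits under "payload"
        JsonNode envelope = root.has("payload") ? root.get("payload") : root;
        
        String op = envelope.path("op").asText();   // c = create, u = update, d = delete, r = snapshot read
        JsonNode before = envelope.path("before");
        JsonNode after = envelope.path("after");
        
        switch (op) {
            case "c" -> handleOrderCreated(after);
            case "u" -> handleOrderUpdated(before, after);
            case "d" -> handleOrderDeleted(before);
            default -> { /* snapshot reads are usually ignored here */ }
        }
    }
    
    private void handleOrderCreated(JsonNode after) { /* ... */ }
    private void handleOrderUpdated(JsonNode before, JsonNode after) { /* ... */ }
    private void handleOrderDeleted(JsonNode before) { /* ... */ }
}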

40.0.4.2 Custom CDC Pipeline

For special requirements, a custom CDC pipeline can be built:

@Component
public class CustomCDCPipeline {
    
    private final PostgreSQLCDCConnector cdcConnector;
    private final EventTransformer transformer;
    private final KafkaEventPublisher publisher;
    
    @PostConstruct
    public void startCDCStream() {
        cdcConnector.onChanges(this::processDatabaseChange);
    }
    
    private void processDatabaseChange(DatabaseChange change) {
        try {
            // 1. Filter: only relevant tables
            if (!isRelevantTable(change.getTable())) {
                return;
            }
            
            // 2. Transform: DB change → business event
            List<BusinessEvent> events = transformer.transform(change);
            
            // 3. Publish: events → Kafka topics
            for (BusinessEvent event : events) {
                publisher.publish(event);
            }
            
        } catch (Exception e) {
            log.error("Failed to process CDC change: {}", change, e);
            // Dead letter queue or retry logic
        }
    }
    
    private boolean isRelevantTable(String tableName) {
        return Set.of("orders", "payments", "inventory", "customers")
            .contains(tableName);
    }
}

40.0.4.3 Multi-Database CDC Aggregation

In microservice-oriented environments, several databases often have to be monitored:

# Multi-Source CDC Coordinator
import asyncio

class MultiSourceCDCCoordinator:
    def __init__(self):
        self.connectors = {
            'orders': PostgreSQLCDCConnector('orders_db'),
            'payments': MySQLCDCConnector('payments_db'), 
            'inventory': PostgreSQLCDCConnector('inventory_db')
        }
        self.event_router = EventRouter()
    
    async def start_all_streams(self):
        tasks = []
        for source, connector in self.connectors.items():
            task = asyncio.create_task(
                self._monitor_source(source, connector)
            )
            tasks.append(task)
        
        await asyncio.gather(*tasks)
    
    async def _monitor_source(self, source_name, connector):
        async for change in connector.stream_changes():
            await self._route_change(source_name, change)
    
    async def _route_change(self, source, change):
        # Source-specific event transformation
        events = await self.event_router.transform_change(source, change)
        
        for event in events:
            await self._publish_unified_event(event)

40.0.4.4 CDC Error Handling and Monitoring

CDC pipelines need robust error handling:

@Component
public class CDCErrorHandler {
    
    private final CircuitBreaker circuitBreaker;
    private final RetryTemplate retryTemplate;
    private final DeadLetterQueue deadLetterQueue;
    
    public void handleCDCEvent(CDCEvent event) {
        try {
            circuitBreaker.executeSupplier(() -> {
                return retryTemplate.execute(context -> {
                    processEvent(event);
                    return null;
                });
            });
            
        } catch (CircuitBreakerOpenException e) {
            log.warn("Circuit breaker open for CDC processing");
            deadLetterQueue.send(event, "circuit-breaker-open");
            
        } catch (Exception e) {
            log.error("Failed to process CDC event after retries: {}", event, e);
            deadLetterQueue.send(event, e.getMessage());
        }
    }
    
    @RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2.0),
        dltStrategy = DltStrategy.FAIL_ON_ERROR
    )
    @KafkaListener(topics = "cdc.dead-letter-queue")
    public void handleDeadLetterEvent(CDCEvent event) {
        // Manual processing or escalation
        log.info("Processing dead letter CDC event: {}", event);
    }
}
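
The DeadLetterQueue used above is an application-level helper rather than a library class. A minimal sketch, assuming Spring Kafka with a KafkaTemplate that can serialize CDCEvent; the getKey() accessor is an assumption:

@Component
public class DeadLetterQueue {
    
    private static final String DLQ_TOPIC = "cdc.dead-letter-queue";
    
    private final KafkaTemplate<String, CDCEvent> kafkaTemplate;
    
    public DeadLetterQueue(KafkaTemplate<String, CDCEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void send(CDCEvent event, String reason) {
        ProducerRecord<String, CDCEvent> dlqRecord =
            new ProducerRecord<>(DLQ_TOPIC, event.getKey(), event);
        // Attach the failure reason as a header so operators can triage later
        dlqRecord.headers().add("cdc-failure-reason", reason.getBytes(StandardCharsets.UTF_8));
        kafkaTemplate.send(dlqRecord);
    }
}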

40.0.5 CDC Deployment Patterns

40.0.5.1 Sidecar Pattern

CDC as a sidecar container alongside the main application:

# Docker Compose CDC Sidecar
version: '3.8'
services:
  order-service:
    image: order-service:latest
    environment:
      - DATABASE_URL=jdbc:postgresql://postgres:5432/orders
  
  order-cdc-sidecar:
    image: debezium/connect:latest
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=order-cdc-group
      - CONFIG_STORAGE_TOPIC=connect-configs
      - OFFSET_STORAGE_TOPIC=connect-offsets
    volumes:
      - ./cdc-connector-config.json:/kafka/connect/connector-config.json
    depends_on:
      - postgres
      - kafka
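
Kafka Connect in distributed mode does not pick up the mounted connector-config.json on its own; the connector is registered through the Connect REST API (port 8083 by default). A minimal registration sketch using Java's built-in HttpClient; the URL and file path are illustrative:

// Registers the Debezium connector with the sidecar's Kafka Connect REST API.
// Assumes cdc-connector-config.json contains the usual {"name": ..., "config": {...}} payload.
public class ConnectorRegistration {
    
    public static void main(String[] args) throws Exception {
        String connectorJson = Files.readString(Path.of("cdc-connector-config.json"));
        
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
            .build();
        
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        
        System.out.println("Connect responded with HTTP " + response.statusCode());
    }
}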

40.0.5.2 Centralized CDC Hub

A central CDC service monitors several databases:

@Configuration
public class CentralCDCConfiguration {
    
    @Bean
    public CDCHub cdcHub() {
        return CDCHub.builder()
            .addSource("orders", postgresConfig("orders_db"))
            .addSource("payments", mysqlConfig("payments_db"))
            .addSource("inventory", mongoConfig("inventory_db"))
            .eventRouter(new BusinessEventRouter())
            .errorHandler(new CDCErrorHandler())
            .build();
    }
}

Change Data Capture turns existing systems into event producers without changing their application logic. The choice between log-based, trigger-based, and application-level CDC depends on performance requirements, legacy constraints, and implementation effort.