41 Eventification of REST / Database Actions

Transforming existing synchronous systems into event-driven architectures requires systematically converting direct database operations and REST calls into event streams. This process makes implicit business events explicitly visible and enables loosely coupled system architectures.

41.0.1 Database Trigger Approaches

Database triggers offer the most direct way to turn database changes into events automatically. They react immediately to DML operations (INSERT, UPDATE, DELETE) and can generate business events without modifying application code.

41.0.1.1 Trigger-based Event Generation

A typical e-commerce scenario uses triggers to turn order status changes into events:

-- Event table for outgoing events
CREATE TABLE order_events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    published BOOLEAN DEFAULT FALSE
);

-- Trigger function for order changes
CREATE OR REPLACE FUNCTION generate_order_events()
RETURNS TRIGGER AS $$
DECLARE
    event_data JSONB;
BEGIN
    -- On INSERT: OrderPlaced event
    IF TG_OP = 'INSERT' THEN
        event_data := jsonb_build_object(
            'orderId', NEW.id,
            'customerId', NEW.customer_id,
            'items', NEW.items,
            'totalAmount', NEW.total_amount,
            'status', NEW.status,
            'timestamp', EXTRACT(EPOCH FROM NEW.created_at)
        );
        
        INSERT INTO order_events (order_id, event_type, event_data)
        VALUES (NEW.id, 'OrderPlaced', event_data);
        
        RETURN NEW;
    END IF;
    
    -- On UPDATE: status-specific events
    IF TG_OP = 'UPDATE' THEN
        -- Only generate an event when the status actually changed
        IF OLD.status != NEW.status THEN
            event_data := jsonb_build_object(
                'orderId', NEW.id,
                'previousStatus', OLD.status,
                'newStatus', NEW.status,
                'timestamp', EXTRACT(EPOCH FROM NEW.updated_at)
            );
            
            -- Status-specific event types
            CASE NEW.status
                WHEN 'PAYMENT_CONFIRMED' THEN
                    INSERT INTO order_events (order_id, event_type, event_data)
                    VALUES (NEW.id, 'PaymentConfirmed', event_data);
                    
                WHEN 'SHIPPED' THEN
                    INSERT INTO order_events (order_id, event_type, event_data)
                    VALUES (NEW.id, 'OrderShipped', event_data);
                    
                WHEN 'DELIVERED' THEN
                    INSERT INTO order_events (order_id, event_type, event_data)
                    VALUES (NEW.id, 'OrderDelivered', event_data);
                    
                WHEN 'CANCELLED' THEN
                    INSERT INTO order_events (order_id, event_type, event_data)
                    VALUES (NEW.id, 'OrderCancelled', event_data);
                ELSE
                    -- Other statuses produce no event; without an ELSE branch,
                    -- PL/pgSQL raises CASE_NOT_FOUND for unmatched values
                    NULL;
            END CASE;
        END IF;
        
        RETURN NEW;
    END IF;
    
    -- On DELETE: OrderCancelled event
    IF TG_OP = 'DELETE' THEN
        event_data := jsonb_build_object(
            'orderId', OLD.id,
            'reason', 'HARD_DELETE',
            'timestamp', EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
        );
        
        INSERT INTO order_events (order_id, event_type, event_data)
        VALUES (OLD.id, 'OrderCancelled', event_data);
        
        RETURN OLD;
    END IF;
    
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Attach the trigger to the orders table
CREATE TRIGGER order_event_trigger
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION generate_order_events();

41.0.1.2 The Event Publisher as a Separate Service

A background service publishes the events generated by the triggers:

@Slf4j                     // Lombok supplies the 'log' field used below
@Service
@RequiredArgsConstructor   // Lombok generates constructor injection for the final fields
public class DatabaseEventPublisher {
    
    private final JdbcTemplate jdbcTemplate;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void publishPendingEvents() {
        List<DatabaseEvent> pendingEvents = fetchUnpublishedEvents();
        
        for (DatabaseEvent event : pendingEvents) {
            try {
                publishEvent(event);
                markAsPublished(event.getId());
                
            } catch (Exception e) {
                log.error("Failed to publish event {}: {}", event.getId(), e.getMessage());
                // The event stays unpublished and is retried on the next run
            }
        }
    }
    
    private List<DatabaseEvent> fetchUnpublishedEvents() {
        String sql = """
            SELECT id, order_id, event_type, event_data, created_at
            FROM order_events 
            WHERE published = false 
            ORDER BY created_at 
            LIMIT 100
            """;
        
        return jdbcTemplate.query(sql, (rs, rowNum) -> 
            DatabaseEvent.builder()
                .id(UUID.fromString(rs.getString("id")))
                .orderId(rs.getString("order_id"))
                .eventType(rs.getString("event_type"))
                .eventData(rs.getString("event_data"))
                .createdAt(rs.getTimestamp("created_at").toInstant())
                .build()
        );
    }
    
    private void publishEvent(DatabaseEvent dbEvent) {
        String topic = determineTopicFromEventType(dbEvent.getEventType());
        
        // Transform the stored JSON into the event payload
        Object eventPayload = transformToEventPayload(dbEvent);
        
        kafkaTemplate.send(topic, dbEvent.getOrderId(), eventPayload);
        
        log.info("Published {} event for order {}", 
            dbEvent.getEventType(), dbEvent.getOrderId());
    }
    
    private String determineTopicFromEventType(String eventType) {
        return switch (eventType) {
            case "OrderPlaced" -> "order.placed.v1";
            case "PaymentConfirmed" -> "payment.confirmed.v1";
            case "OrderShipped" -> "order.shipped.v1";
            case "OrderDelivered" -> "order.delivered.v1";
            case "OrderCancelled" -> "order.cancelled.v1";
            default -> "order.unknown.v1";
        };
    }
    
    private Object transformToEventPayload(DatabaseEvent dbEvent) {
        // The JSONB column already contains the full event payload; forward it as-is
        return dbEvent.getEventData();
    }
    
    private void markAsPublished(UUID eventId) {
        jdbcTemplate.update(
            "UPDATE order_events SET published = true WHERE id = ?", eventId);
    }
}
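
One operational detail, assuming Spring Boot: the fixedDelay polling only runs if task scheduling is enabled in the application context. A minimal sketch (class name is illustrative):

@SpringBootApplication
@EnableScheduling  // activates the fixedDelay polling in DatabaseEventPublisher
public class OutboxPublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(OutboxPublisherApplication.class, args);
    }
}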

41.0.1.3 Python-based Trigger Event Processing

import psycopg2
import json
from kafka import KafkaProducer
import asyncio
from typing import List, Dict

class DatabaseEventProcessor:
    def __init__(self, db_config, kafka_config):
        self.db_config = db_config
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_config['servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8')
        )
        
        self.topic_mapping = {
            'OrderPlaced': 'order.placed.v1',
            'PaymentConfirmed': 'payment.confirmed.v1',
            'OrderShipped': 'order.shipped.v1',
            'OrderDelivered': 'order.delivered.v1',
            'OrderCancelled': 'order.cancelled.v1'
        }
    
    async def process_pending_events(self):
        while True:
            try:
                events = self._fetch_unpublished_events()
                
                for event in events:
                    await self._publish_event(event)
                    self._mark_as_published(event['id'])
                
                await asyncio.sleep(1)  # pause for one second
                
            except Exception as e:
                print(f"Error processing events: {e}")
                await asyncio.sleep(5)
    
    def _fetch_unpublished_events(self) -> List[Dict]:
        conn = psycopg2.connect(**self.db_config)
        try:
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT id, order_id, event_type, event_data, created_at
                    FROM order_events 
                    WHERE published = false 
                    ORDER BY created_at 
                    LIMIT 100
                """)
                
                events = []
                for row in cursor.fetchall():
                    events.append({
                        'id': str(row[0]),       # uuid.UUID -> str for JSON serialization
                        'order_id': str(row[1]), # ditto, used as the Kafka key
                        'event_type': row[2],
                        # psycopg2 may already decode JSONB columns to a dict
                        'event_data': row[3] if isinstance(row[3], dict) else json.loads(row[3]),
                        'created_at': row[4]
                    })
                
                return events
        finally:
            conn.close()
    
    async def _publish_event(self, event):
        topic = self.topic_mapping.get(event['event_type'], 'order.unknown.v1')
        
        # Assemble the event payload
        payload = {
            'eventId': event['id'],
            'eventType': event['event_type'],
            'timestamp': event['created_at'].isoformat(),
            'data': event['event_data']
        }
        
        future = self.producer.send(
            topic=topic,
            key=event['order_id'],
            value=payload
        )
        
        # Wait for Kafka's acknowledgement
        record_metadata = future.get(timeout=10)
        print(f"Published {event['event_type']} to {topic}:{record_metadata.partition}")

41.0.2 Application-level Event Generation

Integrating event generation directly into the application logic provides more control over business semantics and event timing, but requires changes to existing code.

41.0.2.1 REST Controller Event Extension

Existing REST endpoints are extended with event publication:

@RestController
@RequestMapping("/api/v1/orders")
public class OrderController {
    
    private final OrderService orderService;
    private final OrderEventPublisher eventPublisher;
    
    @PostMapping
    public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
        // 1. Execute the business logic
        Order order = orderService.createOrder(request);
        
        // 2. Publish the event
        OrderPlacedEvent event = OrderPlacedEvent.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .items(order.getItems())
            .totalAmount(order.getTotalAmount())
            .timestamp(order.getCreatedAt())
            .build();
        
        eventPublisher.publishOrderPlaced(event);
        
        // 3. Return the HTTP response
        return ResponseEntity.ok(OrderResponse.from(order));
    }
    
    @PutMapping("/{orderId}/status")
    public ResponseEntity<OrderResponse> updateOrderStatus(
            @PathVariable String orderId,
            @RequestBody UpdateStatusRequest request) {
        
        Order order = orderService.updateStatus(orderId, request.getStatus());
        
        // Publish status-specific events
        publishStatusChangeEvent(order, request.getStatus());
        
        return ResponseEntity.ok(OrderResponse.from(order));
    }
    
    private void publishStatusChangeEvent(Order order, OrderStatus newStatus) {
        switch (newStatus) {
            case PAYMENT_CONFIRMED:
                eventPublisher.publishPaymentConfirmed(
                    PaymentConfirmedEvent.from(order)
                );
                break;
                
            case SHIPPED:
                eventPublisher.publishOrderShipped(
                    OrderShippedEvent.from(order)
                );
                break;
                
            case DELIVERED:
                eventPublisher.publishOrderDelivered(
                    OrderDeliveredEvent.from(order)
                );
                break;
                
            case CANCELLED:
                eventPublisher.publishOrderCancelled(
                    OrderCancelledEvent.from(order)
                );
                break;
        }
    }
}

41.0.2.2 Service-Layer Event Integration

Events are generated in the service layer to keep business logic and event publication together:

@Service
@Transactional
public class OrderService {
    
    private final OrderRepository orderRepository;
    private final ApplicationEventPublisher applicationEventPublisher;
    
    public Order createOrder(CreateOrderRequest request) {
        Order order = Order.builder()
            .id(UUID.randomUUID().toString())
            .customerId(request.getCustomerId())
            .items(request.getItems())
            .totalAmount(calculateTotal(request.getItems()))
            .status(OrderStatus.PLACED)
            .createdAt(Instant.now())
            .build();
        
        Order savedOrder = orderRepository.save(order);
        
        // Publish a domain event (Spring ApplicationEvent)
        applicationEventPublisher.publishEvent(
            new OrderPlacedDomainEvent(savedOrder)
        );
        
        return savedOrder;
    }
    
    public Order processPayment(String orderId, PaymentInfo paymentInfo) {
        Order order = findOrderById(orderId);
        
        // Business logic: process the payment
        order.confirmPayment(paymentInfo);
        Order savedOrder = orderRepository.save(order);
        
        // Publish the domain event
        applicationEventPublisher.publishEvent(
            new PaymentConfirmedDomainEvent(savedOrder, paymentInfo)
        );
        
        return savedOrder;
    }
}

// Adapter translating domain events into Kafka events
@Component
public class DomainEventToKafkaAdapter {
    
    private final OrderEventPublisher kafkaEventPublisher;
    
    @EventListener
    @Async
    public void handleOrderPlaced(OrderPlacedDomainEvent domainEvent) {
        OrderPlacedEvent kafkaEvent = transformToKafkaEvent(domainEvent);
        kafkaEventPublisher.publishOrderPlaced(kafkaEvent);
    }
    
    @EventListener
    @Async
    public void handlePaymentConfirmed(PaymentConfirmedDomainEvent domainEvent) {
        PaymentConfirmedEvent kafkaEvent = transformToKafkaEvent(domainEvent);
        kafkaEventPublisher.publishPaymentConfirmed(kafkaEvent);
    }
}
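
A caveat of the @Async @EventListener combination above: the listener can fire before the surrounding database transaction commits, so a rolled-back order may still emit a Kafka event. Where publication must wait for the commit, Spring's @TransactionalEventListener is the usual remedy. A minimal sketch; the transformToKafkaEvent helper and the getOrder() accessor are assumptions mirroring the adapter above:

@Component
public class AfterCommitEventAdapter {
    
    private final OrderEventPublisher kafkaEventPublisher;
    
    public AfterCommitEventAdapter(OrderEventPublisher kafkaEventPublisher) {
        this.kafkaEventPublisher = kafkaEventPublisher;
    }
    
    // Invoked only after the publishing transaction committed successfully,
    // so rolled-back orders never reach Kafka
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleOrderPlaced(OrderPlacedDomainEvent domainEvent) {
        kafkaEventPublisher.publishOrderPlaced(transformToKafkaEvent(domainEvent));
    }
    
    private OrderPlacedEvent transformToKafkaEvent(OrderPlacedDomainEvent domainEvent) {
        // Hypothetical mapping, following the builder pattern used earlier
        return OrderPlacedEvent.from(domainEvent.getOrder());
    }
}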

41.0.2.3 Python FastAPI Event Integration

import uuid
from datetime import datetime

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# order_service and event_publisher are assumed to be wired up at startup

class CreateOrderRequest(BaseModel):
    customer_id: str
    items: list
    total_amount: float

class OrderService:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher
        self.orders = {}  # Simplified in-memory storage
    
    async def create_order(self, request: CreateOrderRequest):
        order = {
            'id': str(uuid.uuid4()),
            'customer_id': request.customer_id,
            'items': request.items,
            'total_amount': request.total_amount,
            'status': 'PLACED',
            'created_at': datetime.utcnow()
        }
        
        # Persist the business data
        self.orders[order['id']] = order
        
        # Publish the event
        await self.event_publisher.publish_order_placed(order)
        
        return order

@app.post("/api/v1/orders")
async def create_order(request: CreateOrderRequest):
    try:
        order = await order_service.create_order(request)
        return {"order": order}
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.put("/api/v1/orders/{order_id}/status")
async def update_order_status(order_id: str, status: str):
    try:
        order = await order_service.update_status(order_id, status)
        
        # Publish status-specific events
        await publish_status_event(order, status)
        
        return {"order": order}
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

async def publish_status_event(order, status):
    event_mapping = {
        'PAYMENT_CONFIRMED': event_publisher.publish_payment_confirmed,
        'SHIPPED': event_publisher.publish_order_shipped,
        'DELIVERED': event_publisher.publish_order_delivered,
        'CANCELLED': event_publisher.publish_order_cancelled
    }
    
    publisher_func = event_mapping.get(status)
    if publisher_func:
        await publisher_func(order)

41.0.3 Transaction Log Mining

Transaction log mining extracts events directly from the database's write-ahead log (WAL) files, without triggers or application changes. This technique is the least invasive while offering high performance.

41.0.3.1 PostgreSQL WAL-based Event Extraction

@Slf4j                     // Lombok supplies the 'log' field used below
@Component
@RequiredArgsConstructor   // Lombok generates constructor injection for the final fields
public class PostgreSQLWALMiner {
    
    private final PGReplicationStream replicationStream;
    private final EventTransformer eventTransformer;
    private final KafkaEventPublisher eventPublisher;
    
    @PostConstruct
    public void startWALMining() {
        CompletableFuture.runAsync(this::processWALChanges);
    }
    
    private void processWALChanges() {
        try {
            while (true) {
                // readPending() returns null when no message is available;
                // read() would block and never reach the null branch below
                ByteBuffer walMessage = replicationStream.readPending();
                
                if (walMessage != null) {
                    processWALMessage(walMessage);
                }
                
                Thread.sleep(100); // short pause between reads
            }
        } catch (Exception e) {
            log.error("WAL mining failed", e);
            // restart logic would go here
        }
    }
    
    private void processWALMessage(ByteBuffer walMessage) {
        try {
            // Decode the raw WAL bytes; LogicalReplicationMessage and
            // PgOutputMessage are assumed decoder helpers here -- pgjdbc
            // itself only delivers raw ByteBuffers from the pgoutput plugin
            LogicalReplicationMessage message = LogicalReplicationMessage
                .decode(walMessage);
            
            if (message instanceof PgOutputMessage) {
                PgOutputMessage pgOutput = (PgOutputMessage) message;
                processTableChange(pgOutput);
            }
            
        } catch (Exception e) {
            log.error("Failed to process WAL message", e);
        }
    }
    
    private void processTableChange(PgOutputMessage message) {
        String tableName = message.getTableName();
        
        // Only process business-relevant tables
        if (!isBusinessRelevantTable(tableName)) {
            return;
        }
        
        switch (message.getOperation()) {
            case INSERT:
                handleInsert(tableName, message.getNewValues());
                break;
            case UPDATE:
                handleUpdate(tableName, message.getOldValues(), message.getNewValues());
                break;
            case DELETE:
                handleDelete(tableName, message.getOldValues());
                break;
        }
    }
    
    private void handleUpdate(String tableName, Map<String, Object> oldValues, 
                             Map<String, Object> newValues) {
        if ("orders".equals(tableName)) {
            String oldStatus = (String) oldValues.get("status");
            String newStatus = (String) newValues.get("status");
            
            // Only generate an event when the status changed
            if (!Objects.equals(oldStatus, newStatus)) {
                OrderStatusChangedEvent event = eventTransformer
                    .createStatusChangeEvent(oldValues, newValues);
                
                eventPublisher.publishStatusChange(event);
            }
        }
    }
}
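
The PGReplicationStream injected above must first be opened on a logical replication slot. A minimal construction sketch using the PostgreSQL JDBC driver's replication API; connection details, slot name, and publication name are illustrative:

@Configuration
public class ReplicationStreamConfig {
    
    @Bean
    public PGReplicationStream replicationStream() throws SQLException {
        // PGConnection, PGProperty, and the replication API come from org.postgresql
        Properties props = new Properties();
        PGProperty.USER.set(props, "replicator");
        PGProperty.PASSWORD.set(props, "secret");
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "10");
        PGProperty.REPLICATION.set(props, "database");   // open a replication connection
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
        
        Connection connection = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/shop", props);
        PGConnection pgConnection = connection.unwrap(PGConnection.class);
        
        return pgConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("order_events_slot")
            .withSlotOption("proto_version", 1)                    // pgoutput options
            .withSlotOption("publication_names", "order_events_pub")
            .withStatusInterval(10, TimeUnit.SECONDS)
            .start();
    }
}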

41.0.3.2 Python-based WAL Mining with psycopg2

import asyncio
import json

import psycopg2
from psycopg2.extras import LogicalReplicationConnection

class PostgreSQLWALMiner:
    def __init__(self, db_config, event_publisher):
        self.db_config = db_config
        self.event_publisher = event_publisher
        self.slot_name = 'order_events_slot'
    
    async def start_wal_mining(self):
        conn = psycopg2.connect(
            connection_factory=LogicalReplicationConnection,
            **self.db_config
        )
        
        try:
            # Create the replication slot (if it does not exist yet)
            await self._ensure_replication_slot(conn)
            
            # Consume the WAL stream
            await self._consume_wal_stream(conn)
            
        finally:
            conn.close()
    
    async def _ensure_replication_slot(self, conn):
        # Create the logical slot on first run; ignore if it already exists
        with conn.cursor() as cursor:
            try:
                cursor.create_replication_slot(
                    self.slot_name, output_plugin='test_decoding')
            except psycopg2.errors.DuplicateObject:
                pass
    
    async def _consume_wal_stream(self, conn):
        cursor = conn.cursor()
        cursor.start_replication(
            slot_name=self.slot_name,
            decode=True  # test_decoding emits text; decode it to str
        )
        
        # The replication cursor is not (async) iterable; poll read_message()
        while True:
            message = cursor.read_message()
            if message is None:
                await asyncio.sleep(0.1)
                continue
            await self._process_wal_message(message)
            message.cursor.send_feedback(flush_lsn=message.data_start)
    
    async def _process_wal_message(self, message):
        try:
            # Parse the WAL message (format depends on the output plugin)
            change_data = self._parse_wal_message(message.payload)
            
            if self._is_business_relevant(change_data):
                events = self._transform_to_events(change_data)
                
                for event in events:
                    await self.event_publisher.publish(event)
                    
        except Exception as e:
            print(f"Failed to process WAL message: {e}")
    
    def _parse_wal_message(self, payload):
        # Example for the textual test_decoding plugin format
        # (pgoutput is a binary protocol and would need a different parser)
        lines = payload.strip().split('\n')
        
        if lines[0].startswith('BEGIN'):
            return None  # transaction start, ignore
        elif lines[0].startswith('COMMIT'):
            return None  # transaction end, ignore
        elif lines[0].startswith('table'):
            return self._parse_table_change(lines)
        
        return None
    
    def _transform_to_events(self, change_data):
        events = []
        
        if change_data['table'] == 'orders':
            if change_data['operation'] == 'INSERT':
                events.append(self._create_order_placed_event(change_data))
            elif change_data['operation'] == 'UPDATE':
                events.extend(self._create_order_update_events(change_data))
        
        return events

41.0.4 Comparison of the Eventification Approaches

Approach                  Invasiveness   Performance   Flexibility   Maintenance
Database triggers         Low            Medium        Medium        High (DB-dependent)
Application-level         High           High          Very high     Low
Transaction log mining    Very low       Very high     Low           Medium

41.0.4.1 Selection Criteria

Database triggers are a good fit for:
- legacy systems that cannot be modified
- simple event scenarios
- prototyping and migration

Application-level event generation is a good fit for:
- greenfield development
- complex business event logic
- tight control over event timing

Transaction log mining is a good fit for:
- high-performance requirements
- minimally invasive integration
- pure data replication

The choice of approach depends on legacy constraints, performance requirements, and the desired degree of control over event semantics. The approaches are often combined: transaction log mining for data replication, application-level events for complex business logic.