32 Stateless vs. stateful Eventverarbeitung

Die Art der Eventverarbeitung bestimmt maßgeblich, wie flexibel und skalierbar ein System werden kann. Während stateless Verarbeitung maximale Flexibilität bietet, ermöglicht stateful Verarbeitung komplexere Geschäftslogik. Die richtige Wahl hängt von den spezifischen Anforderungen ab.

32.1 Functional Event Processing

Functional Event Processing behandelt jedes Event isoliert, ohne Kontext aus vorherigen Events. Ein Handler empfängt ein Event, verarbeitet es und produziert möglicherweise neue Events – alles ohne persistenten Zustand zwischen den Aufrufen.

@Component
public class PaymentNotificationHandler {
    
    @KafkaListener(topics = "payment.processed.v1")
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        // Stateless: Jedes Event wird unabhängig verarbeitet
        EmailNotification notification = EmailNotification.builder()
            .recipientId(event.getCustomerId())
            .subject("Zahlung erhalten")
            .content("Ihre Zahlung über " + event.getAmount() + " wurde verarbeitet.")
            .build();
            
        emailService.send(notification);
        
        // Kein persistenter Zustand zwischen Events
        // Jeder Handler-Aufruf ist vollständig unabhängig
    }
}

Die funktionale Verarbeitung eignet sich besonders für:

import asyncio
from confluent_kafka import Consumer

class PaymentNotificationHandler:
    def __init__(self, email_service):
        self.email_service = email_service
    
    async def handle_payment_processed(self, event):
        """Stateless Event Handler - kein persistenter Zustand"""
        notification = {
            'recipient_id': event['customerId'],
            'subject': 'Zahlung erhalten',
            'content': f"Ihre Zahlung über {event['amount']} wurde verarbeitet."
        }
        
        await self.email_service.send(notification)
        # Keine Zustandsspeicherung zwischen Events

32.2 State Management Patterns

Sobald die Verarbeitung Kontext aus mehreren Events benötigt, entstehen verschiedene Patterns für Zustandsmanagement. Diese reichen von einfacher In-Memory-Speicherung bis zu persistenten Event Stores.

Pattern Anwendungsfall Vorteile Nachteile
Event Aggregation Summenbildung, Zähler Einfach zu verstehen Zustand kann verloren gehen
Session Windows Benutzerverhalten, Shopping Carts Zeitbasierte Gruppierung Komplexe Timeout-Behandlung
Entity State Tracking Bestell-Workflows, Zustandsmaschinen Vollständiges Objektmodell Höhere Speicheranforderungen

Ein typisches Beispiel ist die Verfolgung von Bestellzuständen, wo verschiedene Events den aktuellen Status einer Bestellung beeinflussen:

@Component
public class OrderStatusTracker {
    private final Map<String, OrderStatus> orderStates = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        OrderStatus status = OrderStatus.builder()
            .orderId(event.getOrderId())
            .status("PLACED")
            .timestamp(event.getTimestamp())
            .items(event.getItems())
            .build();
            
        orderStates.put(event.getOrderId(), status);
    }
    
    @KafkaListener(topics = "payment.processed.v1") 
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        OrderStatus currentStatus = orderStates.get(event.getOrderId());
        if (currentStatus != null && "PLACED".equals(currentStatus.getStatus())) {
            currentStatus.setStatus("PAID");
            currentStatus.setPaymentTimestamp(event.getTimestamp());
            
            // Event für nachgelagerte Services
            orderStatusService.publishStatusUpdate(event.getOrderId(), "PAID");
        }
    }
    
    public OrderStatus getOrderStatus(String orderId) {
        return orderStates.get(orderId);
    }
}

Entscheidend ist die Wahl des richtigen Patterns basierend auf den Geschäftsanforderungen:

Event Aggregation sammelt quantitative Daten über Zeit oder Ereignisse. Ein Beispiel ist die Berechnung des täglichen Umsatzes aus einzelnen Zahlungsereignissen.

Session Windows gruppieren zusammengehörige Events zeitlich. Shopping-Cart-Ereignisse werden beispielsweise zu Sitzungen zusammengefasst, um Kaufverhalten zu analysieren.

Entity State Tracking verwaltet den vollständigen Zustand von Geschäftsobjekten über ihren Lebenszyklus hinweg.

from collections import defaultdict
import asyncio
from datetime import datetime

class OrderStatusTracker:
    def __init__(self):
        self.order_states = {}
    
    async def handle_order_placed(self, event):
        """Stateful: Zustand wird für spätere Events gespeichert"""
        order_status = {
            'order_id': event['orderId'],
            'status': 'PLACED',
            'timestamp': event['timestamp'],
            'items': event['items']
        }
        
        self.order_states[event['orderId']] = order_status
        
    async def handle_payment_processed(self, event):
        """Stateful: Bestehender Zustand wird aktualisiert"""
        order_id = event['orderId']
        
        if order_id in self.order_states:
            current_status = self.order_states[order_id]
            if current_status['status'] == 'PLACED':
                current_status['status'] = 'PAID'
                current_status['payment_timestamp'] = event['timestamp']
                
                # Weiteres Event für nachgelagerte Services
                await self.publish_status_update(order_id, 'PAID')
    
    def get_order_status(self, order_id):
        return self.order_states.get(order_id)

32.3 Scalability Implications

Die Wahl zwischen stateless und stateful Verarbeitung hat direkten Einfluss auf die Skalierbarkeit des Systems. Stateless Handler können beliebig horizontal skaliert werden, während stateful Handler zusätzliche Koordination benötigen.

Stateless Skalierung ist linear und vorhersagbar. Jede neue Handler-Instanz kann sofort Events verarbeiten, ohne Rücksicht auf andere Instanzen. Die Skalierung erfolgt typischerweise über die Anzahl der Consumer-Instanzen und die Partitionierung der Topics.

// Stateless Handler - beliebig skalierbar
@Component
public class InvoiceGenerator {
    
    @KafkaListener(topics = "order.completed.v1", 
                   containerFactory = "kafkaListenerContainerFactory")
    public void generateInvoice(OrderCompletedEvent event) {
        Invoice invoice = invoiceService.createInvoice(event);
        pdfService.generatePdf(invoice);
        storageService.store(invoice);
        
        // Keine Koordination mit anderen Handler-Instanzen nötig
        // Jede Instanz kann unabhängig arbeiten
    }
}

Stateful Skalierung erfordert Partitionierung des Zustands. Der Zustand muss entweder sharded oder über mehrere Instanzen synchronisiert werden.

Skalierungsansatz Stateless Handler Stateful Handler
Instanz hinzufügen Sofort produktiv Zustand muss umverteilt werden
Lastverteilung Beliebig möglich Nur innerhalb der Partition
Fehlerbehandlung Einfaches Restart Zustandsrekonstruktion nötig
Deployment Rolling Update möglich Koordiniertes Update erforderlich

Bei stateful Verarbeitung wird üblicherweise nach einem Partitionierungsschlüssel (z.B. Kunden-ID oder Bestell-ID) aufgeteilt:

import hashlib

class PartitionedOrderTracker:
    def __init__(self, partition_count=4):
        self.partition_count = partition_count
        self.order_states = {}
    
    def get_partition(self, order_id):
        """Bestimmt Partition basierend auf Order-ID"""
        hash_value = int(hashlib.md5(order_id.encode()).hexdigest(), 16)
        return hash_value % self.partition_count
    
    async def handle_order_event(self, event):
        order_id = event['orderId']
        partition = self.get_partition(order_id)
        
        # Nur Events der eigenen Partition verarbeiten
        if partition == self.assigned_partition:
            await self.process_order_event(event)
        # Events anderer Partitionen werden ignoriert

Memory Management wird bei stateful Verarbeitung kritisch. Ohne geeignete Strategien kann der Speicherverbrauch unbegrenzt wachsen:

@Component
public class ManagedOrderTracker {
    private final Map<String, OrderStatus> orderStates = new ConcurrentHashMap<>();
    
    @Scheduled(fixedRate = 300000) // Alle 5 Minuten
    public void cleanupExpiredOrders() {
        Instant cutoff = Instant.now().minus(Duration.ofHours(24));
        
        orderStates.entrySet().removeIf(entry -> {
            OrderStatus status = entry.getValue();
            return status.isCompleted() && 
                   status.getCompletionTime().isBefore(cutoff);
        });
    }
    
    @EventListener
    public void handleOrderCompleted(OrderCompletedEvent event) {
        OrderStatus status = orderStates.get(event.getOrderId());
        if (status != null) {
            status.markCompleted(event.getTimestamp());
            // Status bleibt für 24h verfügbar, dann automatische Bereinigung
        }
    }
}

Die Entscheidung zwischen stateless und stateful Verarbeitung sollte bewusst getroffen werden. Stateless Handler bieten maximale Flexibilität und einfache Skalierung, während stateful Handler komplexere Geschäftslogik ermöglichen, aber zusätzliche Architekturentscheidungen erfordern. Hybride Ansätze, bei denen verschiedene Handler unterschiedliche Strategien verwenden, sind in der Praxis häufig die beste Lösung.