Die Art der Eventverarbeitung bestimmt maßgeblich, wie flexibel und skalierbar ein System werden kann. Während stateless Verarbeitung maximale Flexibilität bietet, ermöglicht stateful Verarbeitung komplexere Geschäftslogik. Die richtige Wahl hängt von den spezifischen Anforderungen ab.
Functional Event Processing behandelt jedes Event isoliert, ohne Kontext aus vorherigen Events. Ein Handler empfängt ein Event, verarbeitet es und produziert möglicherweise neue Events – alles ohne persistenten Zustand zwischen den Aufrufen.
@Component
public class PaymentNotificationHandler {
@KafkaListener(topics = "payment.processed.v1")
public void handlePaymentProcessed(PaymentProcessedEvent event) {
// Stateless: Jedes Event wird unabhängig verarbeitet
EmailNotification notification = EmailNotification.builder()
.recipientId(event.getCustomerId())
.subject("Zahlung erhalten")
.content("Ihre Zahlung über " + event.getAmount() + " wurde verarbeitet.")
.build();
emailService.send(notification);
// Kein persistenter Zustand zwischen Events
// Jeder Handler-Aufruf ist vollständig unabhängig
}
}Die funktionale Verarbeitung eignet sich besonders für:
import asyncio
from confluent_kafka import Consumer
class PaymentNotificationHandler:
def __init__(self, email_service):
self.email_service = email_service
async def handle_payment_processed(self, event):
"""Stateless Event Handler - kein persistenter Zustand"""
notification = {
'recipient_id': event['customerId'],
'subject': 'Zahlung erhalten',
'content': f"Ihre Zahlung über {event['amount']} wurde verarbeitet."
}
await self.email_service.send(notification)
# Keine Zustandsspeicherung zwischen EventsSobald die Verarbeitung Kontext aus mehreren Events benötigt, entstehen verschiedene Patterns für Zustandsmanagement. Diese reichen von einfacher In-Memory-Speicherung bis zu persistenten Event Stores.
| Pattern | Anwendungsfall | Vorteile | Nachteile |
|---|---|---|---|
| Event Aggregation | Summenbildung, Zähler | Einfach zu verstehen | Zustand kann verloren gehen |
| Session Windows | Benutzerverhalten, Shopping Carts | Zeitbasierte Gruppierung | Komplexe Timeout-Behandlung |
| Entity State Tracking | Bestell-Workflows, Zustandsmaschinen | Vollständiges Objektmodell | Höhere Speicheranforderungen |
Ein typisches Beispiel ist die Verfolgung von Bestellzuständen, wo verschiedene Events den aktuellen Status einer Bestellung beeinflussen:
@Component
public class OrderStatusTracker {
private final Map<String, OrderStatus> orderStates = new ConcurrentHashMap<>();
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event) {
OrderStatus status = OrderStatus.builder()
.orderId(event.getOrderId())
.status("PLACED")
.timestamp(event.getTimestamp())
.items(event.getItems())
.build();
orderStates.put(event.getOrderId(), status);
}
@KafkaListener(topics = "payment.processed.v1")
public void handlePaymentProcessed(PaymentProcessedEvent event) {
OrderStatus currentStatus = orderStates.get(event.getOrderId());
if (currentStatus != null && "PLACED".equals(currentStatus.getStatus())) {
currentStatus.setStatus("PAID");
currentStatus.setPaymentTimestamp(event.getTimestamp());
// Event für nachgelagerte Services
orderStatusService.publishStatusUpdate(event.getOrderId(), "PAID");
}
}
public OrderStatus getOrderStatus(String orderId) {
return orderStates.get(orderId);
}
}Entscheidend ist die Wahl des richtigen Patterns basierend auf den Geschäftsanforderungen:
Event Aggregation sammelt quantitative Daten über Zeit oder Ereignisse. Ein Beispiel ist die Berechnung des täglichen Umsatzes aus einzelnen Zahlungsereignissen.
Session Windows gruppieren zusammengehörige Events zeitlich. Shopping-Cart-Ereignisse werden beispielsweise zu Sitzungen zusammengefasst, um Kaufverhalten zu analysieren.
Entity State Tracking verwaltet den vollständigen Zustand von Geschäftsobjekten über ihren Lebenszyklus hinweg.
from collections import defaultdict
import asyncio
from datetime import datetime
class OrderStatusTracker:
def __init__(self):
self.order_states = {}
async def handle_order_placed(self, event):
"""Stateful: Zustand wird für spätere Events gespeichert"""
order_status = {
'order_id': event['orderId'],
'status': 'PLACED',
'timestamp': event['timestamp'],
'items': event['items']
}
self.order_states[event['orderId']] = order_status
async def handle_payment_processed(self, event):
"""Stateful: Bestehender Zustand wird aktualisiert"""
order_id = event['orderId']
if order_id in self.order_states:
current_status = self.order_states[order_id]
if current_status['status'] == 'PLACED':
current_status['status'] = 'PAID'
current_status['payment_timestamp'] = event['timestamp']
# Weiteres Event für nachgelagerte Services
await self.publish_status_update(order_id, 'PAID')
def get_order_status(self, order_id):
return self.order_states.get(order_id)Die Wahl zwischen stateless und stateful Verarbeitung hat direkten Einfluss auf die Skalierbarkeit des Systems. Stateless Handler können beliebig horizontal skaliert werden, während stateful Handler zusätzliche Koordination benötigen.
Stateless Skalierung ist linear und vorhersagbar. Jede neue Handler-Instanz kann sofort Events verarbeiten, ohne Rücksicht auf andere Instanzen. Die Skalierung erfolgt typischerweise über die Anzahl der Consumer-Instanzen und die Partitionierung der Topics.
// Stateless Handler - beliebig skalierbar
@Component
public class InvoiceGenerator {
@KafkaListener(topics = "order.completed.v1",
containerFactory = "kafkaListenerContainerFactory")
public void generateInvoice(OrderCompletedEvent event) {
Invoice invoice = invoiceService.createInvoice(event);
pdfService.generatePdf(invoice);
storageService.store(invoice);
// Keine Koordination mit anderen Handler-Instanzen nötig
// Jede Instanz kann unabhängig arbeiten
}
}Stateful Skalierung erfordert Partitionierung des Zustands. Der Zustand muss entweder sharded oder über mehrere Instanzen synchronisiert werden.
| Skalierungsansatz | Stateless Handler | Stateful Handler |
|---|---|---|
| Instanz hinzufügen | Sofort produktiv | Zustand muss umverteilt werden |
| Lastverteilung | Beliebig möglich | Nur innerhalb der Partition |
| Fehlerbehandlung | Einfaches Restart | Zustandsrekonstruktion nötig |
| Deployment | Rolling Update möglich | Koordiniertes Update erforderlich |
Bei stateful Verarbeitung wird üblicherweise nach einem Partitionierungsschlüssel (z.B. Kunden-ID oder Bestell-ID) aufgeteilt:
import hashlib
class PartitionedOrderTracker:
def __init__(self, partition_count=4):
self.partition_count = partition_count
self.order_states = {}
def get_partition(self, order_id):
"""Bestimmt Partition basierend auf Order-ID"""
hash_value = int(hashlib.md5(order_id.encode()).hexdigest(), 16)
return hash_value % self.partition_count
async def handle_order_event(self, event):
order_id = event['orderId']
partition = self.get_partition(order_id)
# Nur Events der eigenen Partition verarbeiten
if partition == self.assigned_partition:
await self.process_order_event(event)
# Events anderer Partitionen werden ignoriertMemory Management wird bei stateful Verarbeitung kritisch. Ohne geeignete Strategien kann der Speicherverbrauch unbegrenzt wachsen:
@Component
public class ManagedOrderTracker {
private final Map<String, OrderStatus> orderStates = new ConcurrentHashMap<>();
@Scheduled(fixedRate = 300000) // Alle 5 Minuten
public void cleanupExpiredOrders() {
Instant cutoff = Instant.now().minus(Duration.ofHours(24));
orderStates.entrySet().removeIf(entry -> {
OrderStatus status = entry.getValue();
return status.isCompleted() &&
status.getCompletionTime().isBefore(cutoff);
});
}
@EventListener
public void handleOrderCompleted(OrderCompletedEvent event) {
OrderStatus status = orderStates.get(event.getOrderId());
if (status != null) {
status.markCompleted(event.getTimestamp());
// Status bleibt für 24h verfügbar, dann automatische Bereinigung
}
}
}Die Entscheidung zwischen stateless und stateful Verarbeitung sollte bewusst getroffen werden. Stateless Handler bieten maximale Flexibilität und einfache Skalierung, während stateful Handler komplexere Geschäftslogik ermöglichen, aber zusätzliche Architekturentscheidungen erfordern. Hybride Ansätze, bei denen verschiedene Handler unterschiedliche Strategien verwenden, sind in der Praxis häufig die beste Lösung.