Event-driven Systeme sind inhärent verteilt und müssen mit verschiedenen Arten von Fehlern umgehen können. Ein durchdachtes Fehlerbehandlungskonzept unterscheidet zwischen temporären und permanenten Fehlern und implementiert entsprechende Resilience Patterns.
Fehlerklassifizierung ist der erste Schritt für effektive Resilience. Temporäre Fehler wie Netzwerkprobleme oder überlastete Services rechtfertigen Wiederholungsversuche, während permanente Fehler wie Validierungsfehler oder fehlende Berechtigungen sofort behandelt werden müssen.
@Component
public class OrderEventHandler {
@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
processOrder(event);
} catch (ValidationException e) {
// Permanenter Fehler - keine Wiederholung
handlePermanentFailure(event, e);
} catch (ServiceUnavailableException e) {
// Temporärer Fehler - Retry sinnvoll
throw new RetryableException("Service temporarily unavailable", e);
} catch (DatabaseException e) {
// Temporärer Fehler - könnte sich erholen
throw new RetryableException("Database connection failed", e);
}
}
}Exponential Backoff verhindert, dass fehlerhafte Consumer ein bereits überlastetes System weiter belasten. Die Wartezeit zwischen Wiederholungsversuchen steigt exponentiell an, wodurch dem System Zeit zur Erholung gegeben wird.
import time
import random
from typing import Callable, Any
class ExponentialBackoffRetry:
def __init__(self, max_attempts=5, base_delay=1.0, max_delay=60.0):
self.max_attempts = max_attempts
self.base_delay = base_delay
self.max_delay = max_delay
def execute(self, func: Callable, *args, **kwargs) -> Any:
for attempt in range(self.max_attempts):
try:
return func(*args, **kwargs)
except TransientError as e:
if attempt == self.max_attempts - 1:
raise e
# Exponential Backoff mit Jitter
delay = min(self.base_delay * (2 ** attempt), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
time.sleep(delay + jitter)
logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")Dead Letter Topics sind entscheidend für die Behandlung von Events, die nach mehreren Versuchen nicht verarbeitet werden können. Diese Events werden in spezielle Topics umgeleitet, wo sie später analysiert oder manuell bearbeitet werden können.
@Component
public class DeadLetterHandler {
private final KafkaTemplate<String, String> kafkaTemplate;
@Retryable(value = {TransientException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2))
public void processEvent(OrderPlacedEvent event) {
// Verarbeitung mit automatischem Retry
orderService.processOrder(event);
}
@Recover
public void handleFailedEvent(TransientException ex, OrderPlacedEvent event) {
// Nach allen Retry-Versuchen: Ab ins Dead Letter Topic
DeadLetterEvent deadLetter = new DeadLetterEvent(
event,
ex.getMessage(),
Instant.now(),
"order-processing-service"
);
kafkaTemplate.send("order.placed.dead-letter",
event.getOrderId(),
toJson(deadLetter));
}
}Poison Message Detection erkennt Events, die systematisch Fehler verursachen. Solche Events werden nach wenigen Versuchen direkt in Dead Letter Topics geleitet, um zu verhindern, dass sie den gesamten Consumer blockieren.
Circuit Breaker schützen nachgelagerte Services vor Überlastung, indem sie bei einer kritischen Fehlerrate die Verbindung temporär unterbrechen. In EDA werden Circuit Breaker typischerweise in Consumern eingesetzt, die externe Services aufrufen.
@Component
public class PaymentServiceClient {
@CircuitBreaker(name = "payment-service",
fallbackMethod = "fallbackPayment")
@TimeLimiter(name = "payment-service")
@Retry(name = "payment-service")
public CompletableFuture<PaymentResult> processPayment(PaymentRequest request) {
return CompletableFuture.supplyAsync(() -> {
// Call to external payment service
return paymentApiClient.processPayment(request);
});
}
public CompletableFuture<PaymentResult> fallbackPayment(PaymentRequest request, Exception ex) {
// Fallback: Event für späteren Retry erzeugen
publishPaymentRetryEvent(request);
return CompletableFuture.completedFuture(
PaymentResult.deferred("Payment service unavailable")
);
}
}Bulkhead Pattern isoliert verschiedene Arten von Events voneinander, sodass Probleme in einem Bereich nicht das gesamte System beeinträchtigen. Dies wird durch separate Consumer Groups oder Thread Pools erreicht.
@Configuration
public class BulkheadConfig {
// Kritische Events - dedizierte Ressourcen
@Bean("criticalEventExecutor")
public TaskExecutor criticalEventExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("critical-events-");
return executor;
}
// Bulk Events - getrennte Ressourcen
@Bean("bulkEventExecutor")
public TaskExecutor bulkEventExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("bulk-events-");
return executor;
}
}
@KafkaListener(topics = "order.placed.v1",
containerFactory = "criticalEventContainer")
public void handleCriticalOrder(OrderPlacedEvent event) {
// Kritische Bestellverarbeitung
}
@KafkaListener(topics = "analytics.events.v1",
containerFactory = "bulkEventContainer")
public void handleAnalyticsEvent(AnalyticsEvent event) {
// Unkritische Analytics-Verarbeitung
}Rate Limiting verhindert, dass Consumer mit zu hoher Geschwindigkeit Events verarbeiten und nachgelagerte Systeme überlasten. Dies ist besonders wichtig bei der Integration mit Legacy-Systemen oder externen APIs.
import asyncio
from asyncio import Semaphore
from datetime import datetime, timedelta
class RateLimitedConsumer:
def __init__(self, max_concurrent=10, requests_per_minute=100):
self.semaphore = Semaphore(max_concurrent)
self.requests_per_minute = requests_per_minute
self.request_times = []
async def process_event(self, event):
await self.semaphore.acquire()
try:
await self._check_rate_limit()
await self._handle_event(event)
finally:
self.semaphore.release()
async def _check_rate_limit(self):
now = datetime.now()
cutoff = now - timedelta(minutes=1)
# Alte Requests entfernen
self.request_times = [t for t in self.request_times if t > cutoff]
if len(self.request_times) >= self.requests_per_minute:
sleep_time = (self.request_times[0] + timedelta(minutes=1) - now).total_seconds()
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.request_times.append(now)Key Performance Indicators für Event-driven Systeme unterscheiden sich von traditionellen Request-Response-Systemen. Consumer Lag ist oft wichtiger als Response Time, und Event Processing Rate wichtiger als Request Rate.
| Metrik | Beschreibung | Kritischer Wert | Action |
|---|---|---|---|
| Consumer Lag | Events zwischen Producer und Consumer | >1000 Events | Scale Consumer |
| Processing Rate | Events/Sekunde verarbeitet | <Expected Rate | Investigate Bottlenecks |
| Error Rate | Fehlgeschlagene Events/Gesamt | >1% | Check Dead Letter Topics |
| Dead Letter Volume | Events in Dead Letter Topics | >0 consistently | Manual Intervention |
| Circuit Breaker State | Offen/Geschlossen/Halb-Offen | Open >5min | Check Downstream Services |
Prometheus Metrics bieten sich für Event-driven Monitoring an, da sie gut mit zeitreihenbasierten Daten umgehen können:
@Component
public class EventMetrics {
private final Counter eventsProcessed = Counter.build()
.name("events_processed_total")
.help("Total processed events")
.labelNames("topic", "consumer_group", "status")
.register();
private final Histogram processingDuration = Histogram.build()
.name("event_processing_duration_seconds")
.help("Event processing duration")
.labelNames("topic", "consumer_group")
.register();
private final Gauge consumerLag = Gauge.build()
.name("consumer_lag_events")
.help("Consumer lag in events")
.labelNames("topic", "partition", "consumer_group")
.register();
public void recordEventProcessed(String topic, String group, String status,
double duration) {
eventsProcessed.labels(topic, group, status).inc();
processingDuration.labels(topic, group).observe(duration);
}
public void updateConsumerLag(String topic, int partition, String group,
double lag) {
consumerLag.labels(topic, String.valueOf(partition), group).set(lag);
}
}Alerting Rules sollten auf Business Impact fokussiert sein, nicht nur auf technische Metriken. Ein hoher Consumer Lag bei Analytics-Events ist weniger kritisch als bei Payment-Events.
# Prometheus Alerting Rules
groups:
- name: event_processing_alerts
rules:
- alert: HighConsumerLag
expr: consumer_lag_events{topic=~"order.*|payment.*"} > 1000
for: 2m
labels:
severity: warning
annotations:
summary: "High consumer lag on critical topic"
- alert: DeadLetterAccumulation
expr: increase(events_processed_total{status="dead_letter"}[5m]) > 10
for: 1m
labels:
severity: critical
annotations:
summary: "Events accumulating in dead letter topics"
- alert: CircuitBreakerOpen
expr: circuit_breaker_state{state="open"} == 1
for: 5m
labels:
severity: warning
annotations:
summary: "Circuit breaker has been open for 5 minutes"Distributed Tracing wird in Event-driven Systemen besonders wichtig, da ein Business-Vorgang oft mehrere asynchrone Schritte umfasst. Correlation IDs müssen durch alle Events propagiert werden.
@Component
public class TracingEventHandler {
@NewSpan("order-processing")
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(@SpanTag("orderId") String orderId,
OrderPlacedEvent event,
@Header Map<String, Object> headers) {
// Tracing Context aus Headers extrahieren
SpanContext spanContext = extractSpanContext(headers);
try (Scope scope = tracer.activateSpan(spanContext)) {
processOrder(event);
// Correlation ID für nachgelagerte Events setzen
publishInventoryReservationEvent(event.getOrderId(),
getCurrentTraceId());
}
}
private void publishInventoryReservationEvent(String orderId, String traceId) {
Headers headers = new RecordHeaders();
headers.add("traceId", traceId.getBytes());
headers.add("correlationId", orderId.getBytes());
kafkaTemplate.send("inventory.reservation.v1", orderId, event, headers);
}
}Health Checks für Event-Consumer sollten nicht nur die technische Verfügbarkeit prüfen, sondern auch die funktionale Gesundheit. Ein Consumer, der Events empfängt aber nicht verarbeitet, ist technisch verfügbar aber funktional defekt.
@Component
public class EventConsumerHealthIndicator implements HealthIndicator {
private volatile Instant lastEventProcessed = Instant.now();
private final Duration maxIdleTime = Duration.ofMinutes(5);
@EventListener
public void onEventProcessed(EventProcessedEvent event) {
lastEventProcessed = Instant.now();
}
@Override
public Health health() {
Duration idleTime = Duration.between(lastEventProcessed, Instant.now());
if (idleTime.compareTo(maxIdleTime) > 0) {
return Health.down()
.withDetail("lastEventProcessed", lastEventProcessed)
.withDetail("idleTime", idleTime.toString())
.build();
}
return Health.up()
.withDetail("lastEventProcessed", lastEventProcessed)
.build();
}
}Resiliente Event-driven Systeme erfordern eine Kombination aus präventiven Maßnahmen (Circuit Breaker, Rate Limiting), reaktiven Strategien (Retry, Dead Letter Topics) und umfassendem Monitoring. Die Balance zwischen Robustheit und Komplexität muss für jeden Anwendungsfall individuell gefunden werden.