Robuste Event-Consumer-Systeme müssen mit verschiedenen Fehlersituationen umgehen können. Eine durchdachte Fehlerbehandlung unterscheidet zwischen temporären und permanenten Problemen und implementiert angemessene Recovery-Strategien. Dies verhindert, dass einzelne defekte Events die gesamte Verarbeitung blockieren.
Die Klassifizierung von Fehlern ist fundamental für eine effektive Fehlerbehandlung. Verschiedene Fehlertypen erfordern unterschiedliche Behandlungsstrategien.
Transiente Fehler sind vorübergehend und können durch Wiederholung behoben werden.
/**
 * Kafka listener that processes payments for placed orders and routes
 * failures by error category (transient vs. permanent).
 *
 * NOTE(review): processPayment, scheduleRetry*, sendToDeadLetterTopic,
 * getRetryDelay and the referenced exception types are declared elsewhere
 * in the project and are not visible in this excerpt.
 */
@Component
public class ErrorClassifyingPaymentService {
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
try {
processPayment(event);
// Success: commit the offset so the event is not redelivered
acknowledgment.acknowledge();
} catch (Exception e) {
ErrorType errorType = classifyError(e);
handleErrorByType(event, e, errorType, acknowledgment);
}
}
/**
 * Maps an exception to an ErrorType. Checks are ordered; anything
 * unmatched is treated as UNKNOWN_TRANSIENT and retried with a limit.
 */
private ErrorType classifyError(Exception e) {
// Network and service errors (usually transient)
if (e instanceof ConnectTimeoutException ||
e instanceof SocketTimeoutException ||
e instanceof HttpServerErrorException) {
return ErrorType.TRANSIENT_NETWORK;
}
// Service temporarily unavailable (explicit exception or any HTTP 5xx)
if (e instanceof ServiceUnavailableException ||
(e instanceof HttpStatusCodeException &&
((HttpStatusCodeException) e).getStatusCode().is5xxServerError())) {
return ErrorType.TRANSIENT_SERVICE;
}
// Database deadlocks and temporary locks
if (e instanceof CannotAcquireLockException ||
e instanceof DeadlockLoserDataAccessException) {
return ErrorType.TRANSIENT_DATABASE;
}
// Rate limiting (HTTP 429)
if (e instanceof HttpClientErrorException &&
((HttpClientErrorException) e).getStatusCode() == HttpStatus.TOO_MANY_REQUESTS) {
return ErrorType.TRANSIENT_RATE_LIMIT;
}
// Validation errors (permanent)
if (e instanceof ValidationException ||
e instanceof IllegalArgumentException) {
return ErrorType.PERMANENT_VALIDATION;
}
// Authorization errors (permanent)
if (e instanceof UnauthorizedException ||
e instanceof ForbiddenException) {
return ErrorType.PERMANENT_AUTHORIZATION;
}
// Business logic errors (permanent)
if (e instanceof InsufficientFundsException ||
e instanceof InvalidPaymentMethodException) {
return ErrorType.PERMANENT_BUSINESS;
}
// Treat unknown errors as transient for now (bounded retry below)
return ErrorType.UNKNOWN_TRANSIENT;
}
/**
 * Applies the per-category strategy. The transient branches deliberately
 * do NOT acknowledge, so the offset stays uncommitted while a retry is
 * scheduled — NOTE(review): confirm the scheduleRetry* helpers deduplicate
 * against the broker redelivery this causes.
 */
private void handleErrorByType(OrderPlacedEvent event, Exception e,
ErrorType errorType, Acknowledgment acknowledgment) {
switch (errorType) {
case TRANSIENT_NETWORK:
case TRANSIENT_SERVICE:
case TRANSIENT_DATABASE:
scheduleRetry(event, e, getRetryDelay(errorType));
break;
case TRANSIENT_RATE_LIMIT:
// Rate limits back off for longer than ordinary transient errors
scheduleRetryWithBackoff(event, e, Duration.ofMinutes(5));
break;
case PERMANENT_VALIDATION:
case PERMANENT_AUTHORIZATION:
case PERMANENT_BUSINESS:
sendToDeadLetterTopic(event, e, "Permanent error: " + errorType);
acknowledgment.acknowledge(); // mark the event as processed
break;
case UNKNOWN_TRANSIENT:
scheduleRetryWithLimit(event, e, 3);
break;
}
}
}
/**
 * Error categories used by ErrorClassifyingPaymentService to select a
 * recovery strategy: TRANSIENT_* values are retried, PERMANENT_* values
 * go to the dead letter topic, UNKNOWN_TRANSIENT is retried with a limit.
 */
enum ErrorType {
TRANSIENT_NETWORK,
TRANSIENT_SERVICE,
TRANSIENT_DATABASE,
TRANSIENT_RATE_LIMIT,
PERMANENT_VALIDATION,
PERMANENT_AUTHORIZATION,
PERMANENT_BUSINESS,
UNKNOWN_TRANSIENT
}

Python Implementierung:
import asyncio
from enum import Enum
from typing import Optional
import logging
class ErrorType(Enum):
    """Coarse failure categories used to select a recovery strategy."""
    TRANSIENT_NETWORK = "transient_network"
    TRANSIENT_SERVICE = "transient_service"
    TRANSIENT_DATABASE = "transient_database"
    PERMANENT_VALIDATION = "permanent_validation"
    PERMANENT_BUSINESS = "permanent_business"
    UNKNOWN = "unknown"


class ErrorClassifyingPaymentService:
    """Consumes order events and routes failures by error category.

    NOTE(review): ``Consumer`` and the retry/DLT helpers
    (``schedule_retry``, ``schedule_retry_with_limit``,
    ``send_to_dead_letter_topic``) are defined elsewhere in the project.
    """

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.logger = logging.getLogger(__name__)

    def classify_error(self, exception: Exception) -> ErrorType:
        """Map an exception to an ErrorType via keyword heuristics.

        Matching is ordered, so earlier categories win when several
        keywords appear in the same message.
        """
        error_message = str(exception).lower()
        # Network errors
        if any(keyword in error_message for keyword in
               ['timeout', 'connection', 'network', 'unreachable']):
            return ErrorType.TRANSIENT_NETWORK
        # Service errors (HTTP 5xx)
        if any(keyword in error_message for keyword in
               ['internal server', '500', '502', '503', '504']):
            return ErrorType.TRANSIENT_SERVICE
        # Database deadlocks / temporary locks.
        # Fix: previously no path ever produced TRANSIENT_DATABASE, leaving
        # the category dead even though the handler distinguishes it.
        if any(keyword in error_message for keyword in
               ['deadlock', 'could not acquire lock']):
            return ErrorType.TRANSIENT_DATABASE
        # Business errors — checked BEFORE validation because the phrase
        # 'invalid payment method' also contains 'invalid' and would
        # otherwise be misclassified as a validation error.
        if any(keyword in error_message for keyword in
               ['insufficient funds', 'invalid payment method']):
            return ErrorType.PERMANENT_BUSINESS
        # Validation errors
        if any(keyword in error_message for keyword in
               ['validation', 'invalid', 'missing required']):
            return ErrorType.PERMANENT_VALIDATION
        return ErrorType.UNKNOWN

    async def handle_error_by_type(self, event: dict, error: Exception,
                                   error_type: ErrorType) -> bool:
        """Handle a failed event.

        Returns True if the offset should be committed (the event is
        finished, successfully or permanently failed), False when the
        event must be retried.
        """
        # Fix: TRANSIENT_DATABASE was missing from this tuple, so database
        # errors fell through to the final fallback and were committed
        # (i.e. silently dropped) instead of retried.
        transient = (ErrorType.TRANSIENT_NETWORK,
                     ErrorType.TRANSIENT_SERVICE,
                     ErrorType.TRANSIENT_DATABASE)
        if error_type in transient:
            await self.schedule_retry(event, error)
            return False  # do not commit - retry
        if error_type in (ErrorType.PERMANENT_VALIDATION,
                          ErrorType.PERMANENT_BUSINESS):
            await self.send_to_dead_letter_topic(event, error)
            return True  # commit - permanently failed
        # UNKNOWN (and any future category): bounded retries, then DLT.
        retry_count = event.get('_retry_count', 0)
        if retry_count < 3:
            await self.schedule_retry_with_limit(event, error, retry_count + 1)
            return False
        await self.send_to_dead_letter_topic(event, error)
        return True
Exponential Backoff verhindert, dass fehlerhafte Services durch aggressive Wiederholungen überlastet werden.
/**
 * Payment consumer with per-order retry bookkeeping in Redis and
 * exponential backoff with jitter for transient failures.
 *
 * NOTE(review): processPayment, isTransientError, getOrCreateRetryInfo,
 * saveRetryInfo, sendToDeadLetterTopic, RetryInfo and the log field are
 * declared elsewhere in the project.
 */
@Component
public class ExponentialBackoffPaymentService {
private final RedisTemplate<String, Object> redisTemplate;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
String retryKey = "retry:payment:" + event.getOrderId();
try {
processPayment(event);
// On success delete the retry bookkeeping
redisTemplate.delete(retryKey);
acknowledgment.acknowledge();
} catch (Exception e) {
if (isTransientError(e)) {
RetryInfo retryInfo = getOrCreateRetryInfo(retryKey);
if (retryInfo.getAttempts() < getMaxRetries(e)) {
// No acknowledge here: the offset stays uncommitted while the
// retry event is scheduled — NOTE(review): confirm the retry
// consumer deduplicates against the broker redelivery.
scheduleRetryWithBackoff(event, retryInfo);
} else {
log.error("Max retries exceeded for order {}", event.getOrderId());
sendToDeadLetterTopic(event, e, "Max retries exceeded");
acknowledgment.acknowledge();
}
} else {
sendToDeadLetterTopic(event, e, "Permanent error");
acknowledgment.acknowledge();
}
}
}
/** Persists updated retry state and publishes a delayed-retry event. */
private void scheduleRetryWithBackoff(OrderPlacedEvent event, RetryInfo retryInfo) {
int attempt = retryInfo.getAttempts();
long delay = calculateBackoffDelay(attempt);
// Update the retry bookkeeping
retryInfo.setAttempts(attempt + 1);
retryInfo.setLastAttempt(Instant.now());
retryInfo.setNextRetry(Instant.now().plusMillis(delay));
saveRetryInfo(retryInfo);
// Schedule the event for delayed processing
ScheduledRetryEvent retryEvent = ScheduledRetryEvent.builder()
.originalEvent(event)
.scheduleTime(Instant.now().plusMillis(delay))
.attempt(attempt + 1)
.build();
kafkaTemplate.send("payment.retry.v1", retryEvent);
log.info("Scheduled retry {} for order {} in {}ms",
attempt + 1, event.getOrderId(), delay);
}
/**
 * Exponential backoff with jitter: base 1s doubled per attempt,
 * randomized by ±25% and capped at 5 minutes.
 */
private long calculateBackoffDelay(int attempt) {
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, ...
long baseDelay = 1000; // 1 second
long maxDelay = 300000; // 5 minute cap
long delay = (long) (baseDelay * Math.pow(2, attempt));
// Add jitter (±25% randomization) to avoid synchronized retry storms
double jitterFactor = 0.75 + (Math.random() * 0.5);
delay = (long) (delay * jitterFactor);
return Math.min(delay, maxDelay);
}
/** Per-exception retry budget; rate limits get the most attempts. */
private int getMaxRetries(Exception e) {
if (e instanceof ConnectTimeoutException) return 5;
if (e instanceof ServiceUnavailableException) return 3;
if (e instanceof RateLimitException) return 10;
return 3; // default
}
}
// Separate consumer for retry events
/**
 * Re-processes scheduled retry events once their due time has passed.
 * Events that are not yet due are re-published to the same topic, which
 * effectively polls the partition in a loop — NOTE(review): confirm this
 * requeue churn is acceptable for the expected retry volume.
 *
 * NOTE(review): processPayment and kafkaTemplate are declared elsewhere.
 */
@Component
public class RetryEventProcessor {
@KafkaListener(topics = "payment.retry.v1")
public void handleRetryEvent(ScheduledRetryEvent retryEvent, Acknowledgment acknowledgment) {
if (Instant.now().isAfter(retryEvent.getScheduleTime())) {
// The retry is due
try {
processPayment(retryEvent.getOriginalEvent());
acknowledgment.acknowledge();
} catch (Exception e) {
// Forward to the normal error handler via the original topic
kafkaTemplate.send("order.placed.v1", retryEvent.getOriginalEvent());
acknowledgment.acknowledge();
}
} else {
// Not yet due - put the event back into the topic
kafkaTemplate.send("payment.retry.v1", retryEvent);
acknowledgment.acknowledge();
}
}
}

@Component
/**
 * Payment consumer that routes the external gateway call through a
 * Resilience4j circuit breaker so the service fails fast while the
 * gateway is unhealthy.
 *
 * NOTE(review): paymentGateway, kafkaTemplate, log, createPaymentRequest,
 * publishPaymentProcessedEvent, handlePaymentFailure and
 * handlePaymentError are declared elsewhere in the project.
 */
public class CircuitBreakerPaymentService {
private final CircuitBreaker paymentGatewayCircuitBreaker;
public CircuitBreakerPaymentService() {
this.paymentGatewayCircuitBreaker = CircuitBreaker.ofDefaults("paymentGateway");
// Circuit breaker configuration: log every state transition
paymentGatewayCircuitBreaker.getEventPublisher()
.onStateTransition(event ->
log.info("Circuit breaker state transition: {} -> {}",
event.getStateTransition().getFromState(),
event.getStateTransition().getToState()));
}
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
try {
PaymentResult result = paymentGatewayCircuitBreaker.executeSupplier(() ->
paymentGateway.processPayment(createPaymentRequest(event))
);
if (result.isSuccessful()) {
publishPaymentProcessedEvent(event.getOrderId(), result);
acknowledgment.acknowledge();
} else {
handlePaymentFailure(event, result);
}
} catch (CallNotPermittedException e) {
// Circuit breaker is open: fail fast without calling the gateway.
// No acknowledge here, so the broker will also redeliver the event —
// NOTE(review): confirm the retry consumer deduplicates.
log.warn("Circuit breaker open for order {}, scheduling retry",
event.getOrderId());
scheduleRetryWhenCircuitClosed(event);
} catch (Exception e) {
handlePaymentError(event, e, acknowledgment);
}
}
/** Records the event for a retry roughly one minute later. */
private void scheduleRetryWhenCircuitClosed(OrderPlacedEvent event) {
// Record the event for a later retry
CircuitBreakerRetryEvent retryEvent = CircuitBreakerRetryEvent.builder()
.originalEvent(event)
.reason("Circuit breaker open")
.scheduleTime(Instant.now().plusMinutes(1))
.build();
kafkaTemplate.send("payment.circuit-breaker-retry.v1", retryEvent);
}
}

Python Asyncio Circuit Breaker:
import asyncio
import json
import time
from enum import Enum
from typing import Callable, Any
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Minimal async circuit breaker.

    After ``failure_threshold`` consecutive failures the circuit opens and
    rejects calls for ``timeout`` seconds; the first call after the timeout
    runs in HALF_OPEN state and closes the circuit again on success (a
    failure in HALF_OPEN immediately re-opens it, since the counter is
    still at the threshold).
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout          # seconds the circuit stays open
        self.failure_count = 0          # consecutive failures so far
        self.last_failure_time = None   # time.time() of the last failure
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Invoke ``func`` through the breaker; raises when the circuit is open."""
        if self.state == CircuitState.OPEN:
            # last_failure_time is always set once the circuit has opened
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker half-open, trying request")
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise  # bare raise preserves the original traceback
        self.on_success()
        return result

    def on_success(self):
        """Reset the failure counter and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure; open the circuit once the threshold is hit."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} failures")
class CircuitBreakerPaymentService:
    """Kafka consumer that calls the external payment step through a circuit breaker.

    NOTE(review): ``Consumer``, ``process_payment_external``,
    ``schedule_circuit_breaker_retry`` and ``handle_payment_error`` are
    defined elsewhere in the project; ``json`` must be imported at module
    level (it was missing from the import block).
    """

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

    async def process_events(self):
        """Poll, process each event through the breaker, commit on success."""
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            # Fix: 'event' was referenced in the except handler but unbound
            # whenever json.loads itself raised (NameError shadowed the
            # real parse error). Initialize it before the try block.
            event = None
            try:
                event = json.loads(msg.value().decode('utf-8'))
                # Process the payment through the circuit breaker
                result = await self.circuit_breaker.call(
                    self.process_payment_external, event
                )
                if result['success']:
                    self.consumer.commit(msg)
                    print(f"Payment processed for order: {event['orderId']}")
            except Exception as e:
                if "Circuit breaker is open" in str(e):
                    await self.schedule_circuit_breaker_retry(event)
                else:
                    await self.handle_payment_error(event, e)
@Component
/**
 * Publishes permanently failed events to the dead letter topic (DLT),
 * persists them for a management UI, and consumes the DLT for analysis
 * and optional automatic remediation.
 *
 * NOTE(review): objectMapper, meterRegistry, log, getStackTrace,
 * getRetryCount, mapToRecord, categorizeError, updateDeadLetterStatistics,
 * isRemediable and enrichAndCorrectEvent are declared elsewhere.
 * NOTE(review): objectMapper.writeValueAsString throws a checked
 * JsonProcessingException that is not handled here — confirm how the
 * project compiles this (e.g. Lombok @SneakyThrows or a wrapper).
 */
public class DeadLetterTopicHandler {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DeadLetterRepository deadLetterRepository;
/** Wraps the failed event with error context and publishes it to the DLT. */
public void sendToDeadLetterTopic(OrderPlacedEvent originalEvent,
Exception error, String reason) {
DeadLetterEvent deadLetterEvent = DeadLetterEvent.builder()
.originalEventId(originalEvent.getEventId())
.originalTopic("order.placed.v1")
.orderId(originalEvent.getOrderId())
.originalPayload(objectMapper.writeValueAsString(originalEvent))
.errorMessage(error.getMessage())
.errorStackTrace(getStackTrace(error))
.reason(reason)
.timestamp(Instant.now())
.retryCount(getRetryCount(originalEvent))
.build();
// Send to the dead letter topic
kafkaTemplate.send("order.placed.v1.DLT", deadLetterEvent);
// Local persistence for the management UI
DeadLetterRecord record = mapToRecord(deadLetterEvent);
deadLetterRepository.save(record);
// Monitoring and alerting
meterRegistry.counter("dead.letter.events",
"topic", "order.placed.v1",
"reason", reason).increment();
log.error("Sent event to Dead Letter Topic: orderId={}, reason={}",
originalEvent.getOrderId(), reason);
}
// Dead letter topic consumer for analysis
@KafkaListener(topics = "order.placed.v1.DLT")
public void handleDeadLetterEvent(DeadLetterEvent deadLetterEvent) {
log.info("Received dead letter event: {}", deadLetterEvent.getOriginalEventId());
// Categorize for analysis
String errorCategory = categorizeError(deadLetterEvent.getErrorMessage());
// Update statistics
updateDeadLetterStatistics(deadLetterEvent, errorCategory);
// Attempt automatic remediation for certain error categories
if (isRemediable(errorCategory)) {
attemptAutomaticRemediation(deadLetterEvent);
}
}
/**
 * Tries to enrich/correct the original payload and re-publish it to the
 * source topic; marks the dead letter record as resolved on success.
 */
private void attemptAutomaticRemediation(DeadLetterEvent deadLetterEvent) {
try {
OrderPlacedEvent originalEvent = objectMapper.readValue(
deadLetterEvent.getOriginalPayload(), OrderPlacedEvent.class
);
// Attempt enrichment or correction
OrderPlacedEvent correctedEvent = enrichAndCorrectEvent(originalEvent);
if (correctedEvent != null) {
// Send back to the original topic
kafkaTemplate.send("order.placed.v1", correctedEvent);
// Mark the dead letter record as resolved
deadLetterRepository.markAsResolved(
deadLetterEvent.getOriginalEventId(),
"Automatic remediation successful"
);
log.info("Successfully remediated dead letter event: {}",
deadLetterEvent.getOriginalEventId());
}
} catch (Exception e) {
log.error("Automatic remediation failed for event {}",
deadLetterEvent.getOriginalEventId(), e);
}
}
}

@Component
/**
 * Detects "poison messages" (records that fail repeatedly) via a Redis
 * failure counter and diverts them to a quarantine topic before they can
 * block the partition.
 *
 * NOTE(review): processPayment, isTransientError, sendToQuarantineTopic,
 * sendToDeadLetterTopic and log are declared elsewhere in the project.
 */
public class PoisonMessageDetector {
private final RedisTemplate<String, Object> redisTemplate;
private static final String POISON_KEY_PREFIX = "poison:";
private static final int POISON_THRESHOLD = 5;
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event,
ConsumerRecord<String, OrderPlacedEvent> record,
Acknowledgment acknowledgment) {
String messageKey = generateMessageKey(record);
// Poison message check: quarantine instead of processing yet again
if (isPoisonMessage(messageKey)) {
log.warn("Detected poison message, sending to quarantine: {}", messageKey);
sendToQuarantineTopic(event, "Poison message detected");
acknowledgment.acknowledge();
return;
}
try {
processPayment(event);
// On success reset the poison counter
redisTemplate.delete(POISON_KEY_PREFIX + messageKey);
acknowledgment.acknowledge();
} catch (Exception e) {
// Increase the failure count for this message
incrementPoisonCounter(messageKey);
if (isTransientError(e)) {
// Normal retry for transient errors: rethrow without acknowledging
throw e;
} else {
sendToDeadLetterTopic(event, e, "Permanent error");
acknowledgment.acknowledge();
}
}
}
/**
 * Unique key based on the message content AND its topic/partition/offset,
 * so the counter tracks redeliveries of the exact same record. MD5 is a
 * cheap fingerprint here, not a security mechanism.
 */
private String generateMessageKey(ConsumerRecord<String, OrderPlacedEvent> record) {
// Unique key based on the message content
OrderPlacedEvent event = record.value();
return DigestUtils.md5Hex(
event.getOrderId() + ":" +
event.getCustomerId() + ":" +
event.getTotalAmount() + ":" +
record.topic() + ":" +
record.partition() + ":" +
record.offset()
);
}
/** True once the failure counter has reached POISON_THRESHOLD. */
private boolean isPoisonMessage(String messageKey) {
String counterKey = POISON_KEY_PREFIX + messageKey;
Integer count = (Integer) redisTemplate.opsForValue().get(counterKey);
return count != null && count >= POISON_THRESHOLD;
}
/**
 * Increments the counter and refreshes a 24h TTL.
 * NOTE(review): increment + expire is not atomic; a crash in between
 * leaves a counter without TTL — confirm this is acceptable.
 */
private void incrementPoisonCounter(String messageKey) {
String counterKey = POISON_KEY_PREFIX + messageKey;
redisTemplate.opsForValue().increment(counterKey);
redisTemplate.expire(counterKey, Duration.ofHours(24));
}
}

class MessageQuarantineService:
def __init__(self, kafka_config):
self.consumer = Consumer(kafka_config)
self.producer = Producer(kafka_config)
self.consumer.subscribe(['order.placed.v1'])
self.quarantine_map = {} # In-Memory für Demo
async def process_with_quarantine(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
message_id = self.generate_message_id(msg)
try:
# Quarantine-Check
if self.is_quarantined(message_id):
print(f"Message {message_id} is quarantined, skipping")
self.consumer.commit(msg)
continue
event = json.loads(msg.value().decode('utf-8'))
success = await self.process_payment(event)
if success:
# Bei Erfolg Quarantine-Info löschen
self.remove_from_quarantine(message_id)
self.consumer.commit(msg)
else:
self.handle_processing_failure(msg, message_id, "Processing failed")
except Exception as e:
self.handle_processing_failure(msg, message_id, str(e))
def handle_processing_failure(self, msg, message_id: str, error_reason: str):
failure_count = self.increment_failure_count(message_id)
if failure_count >= 3:
print(f"Message {message_id} failed {failure_count} times, quarantining")
self.quarantine_message(message_id, error_reason)
self.send_to_quarantine_topic(msg, error_reason)
# Kein Commit - Message wird wiederholt
def quarantine_message(self, message_id: str, reason: str):
self.quarantine_map[message_id] = {
'reason': reason,
'quarantined_at': time.time(),
'failure_count': self.get_failure_count(message_id)
}
def send_to_quarantine_topic(self, msg, reason: str):
quarantine_event = {
'original_topic': msg.topic(),
'original_partition': msg.partition(),
'original_offset': msg.offset(),
'original_payload': msg.value().decode('utf-8'),
'quarantine_reason': reason,
'quarantined_at': time.time()
}
self.producer.produce(
'quarantine.messages.v1',
key=msg.key(),
value=json.dumps(quarantine_event).encode('utf-8')
)
self.producer.flush()
# Recovery service for quarantined messages
class QuarantineRecoveryService:
    """Periodically reviews quarantined messages and retries recoverable ones.

    NOTE(review): ``Consumer``/``Producer`` are the project's Kafka client
    classes and ``log_for_manual_review`` is defined elsewhere.
    """

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.producer = Producer(kafka_config)
        self.consumer.subscribe(['quarantine.messages.v1'])

    async def review_quarantined_messages(self):
        """Poll the quarantine topic; auto-recover or flag for manual review."""
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                await asyncio.sleep(5)
                continue
            try:
                quarantine_event = json.loads(msg.value().decode('utf-8'))
                # Attempt automatic recovery where the failure reason allows it
                if await self.can_auto_recover(quarantine_event):
                    await self.attempt_auto_recovery(quarantine_event)
                else:
                    # Flag for manual review
                    await self.log_for_manual_review(quarantine_event)
                self.consumer.commit(msg)
            except Exception as e:
                print(f"Error processing quarantined message: {e}")

    async def can_auto_recover(self, quarantine_event: dict) -> bool:
        """Return True when the quarantine reason looks transient."""
        reason = quarantine_event['quarantine_reason']
        # Examples of auto-recoverable problems
        auto_recoverable_reasons = [
            'timeout',
            'service unavailable',
            'temporary network error'
        ]
        return any(recoverable in reason.lower()
                   for recoverable in auto_recoverable_reasons)

    async def attempt_auto_recovery(self, quarantine_event: dict):
        """Re-publish the original payload to its source topic."""
        try:
            original_payload = quarantine_event['original_payload']
            original_topic = quarantine_event['original_topic']
            # Send the message back to its original topic
            self.producer.produce(
                original_topic,
                value=original_payload.encode('utf-8')
            )
            self.producer.flush()
            print(f"Auto-recovered quarantined message to {original_topic}")
        except Exception as e:
            print(f"Auto-recovery failed: {e}")
Effektive Fehlertoleranz in Event-Driven-Systemen erfordert eine Kombination aus intelligenter Fehlerklassifizierung, angemessenen Retry-Strategien und robusten Dead-Letter-Mechanismen. Die Implementierung sollte zwischen temporären und permanenten Fehlern unterscheiden und entsprechende Recovery-Strategien anwenden.
Das Kapitel zu “Beispiele mit Zustandsänderung und Geschäftslogik” wird zeigen, wie diese Fehlerbehandlungsstrategien in komplexeren stateful Event-Processing-Szenarien angewendet werden, während das Kapitel zu “Verarbeitung und Zustandsmanagement” die Auswirkungen von Fehlern auf die Zustandskonsistenz behandelt.