31 Fehlertoleranz, Dead Letter Topics, Wiederholungen

Robuste Event-Consumer-Systeme müssen mit verschiedenen Fehlersituationen umgehen können. Eine durchdachte Fehlerbehandlung unterscheidet zwischen temporären und permanenten Problemen und implementiert angemessene Recovery-Strategien. Dies verhindert, dass einzelne defekte Events die gesamte Verarbeitung blockieren.

31.1 Error Classification

Die Klassifizierung von Fehlern ist fundamental für eine effektive Fehlerbehandlung. Verschiedene Fehlertypen erfordern unterschiedliche Behandlungsstrategien.

31.1.1 Transiente Fehler

Transiente Fehler sind vorübergehend und können durch Wiederholung behoben werden.

/**
 * Kafka listener for {@code order.placed.v1} that classifies every processing
 * failure and routes the event either to a retry or to the dead letter topic.
 */
@Component
public class ErrorClassifyingPaymentService {

    /** HTTP status code an upstream service returns when it is rate limiting. */
    private static final int HTTP_TOO_MANY_REQUESTS = 429;

    /**
     * Processes one order event and acknowledges it on success. Any exception
     * is classified and dispatched to the matching handling strategy.
     *
     * @param event          the order event to process
     * @param acknowledgment manual acknowledgment handle of the consumed record
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        try {
            processPayment(event);
            acknowledgment.acknowledge();

        } catch (Exception e) {
            ErrorType errorType = classifyError(e);
            handleErrorByType(event, e, errorType, acknowledgment);
        }
    }

    /**
     * Maps an exception to an {@link ErrorType}. Checks run top to bottom and
     * the first match wins; anything unmatched is {@code UNKNOWN_TRANSIENT}.
     *
     * @param e the exception raised while processing the event
     * @return the error category, never {@code null}
     */
    private ErrorType classifyError(Exception e) {
        // Network-level failures (usually transient). HTTP 5xx responses are
        // deliberately NOT listed here: they are answers from a reachable
        // service and belong to the TRANSIENT_SERVICE check below.
        if (e instanceof ConnectTimeoutException ||
            e instanceof SocketTimeoutException) {
            return ErrorType.TRANSIENT_NETWORK;
        }

        // Service temporarily unavailable (covers every HTTP 5xx response)
        if (e instanceof ServiceUnavailableException ||
            (e instanceof HttpStatusCodeException &&
             ((HttpStatusCodeException) e).getStatusCode().is5xxServerError())) {
            return ErrorType.TRANSIENT_SERVICE;
        }

        // Database deadlocks and temporary locks
        if (e instanceof CannotAcquireLockException ||
            e instanceof DeadlockLoserDataAccessException) {
            return ErrorType.TRANSIENT_DATABASE;
        }

        // Rate limiting. Compare the numeric code instead of using == on the
        // enum so the check does not depend on the concrete status type.
        if (e instanceof HttpClientErrorException &&
            ((HttpClientErrorException) e).getStatusCode().value() == HTTP_TOO_MANY_REQUESTS) {
            return ErrorType.TRANSIENT_RATE_LIMIT;
        }

        // Validation errors (permanent)
        if (e instanceof ValidationException ||
            e instanceof IllegalArgumentException) {
            return ErrorType.PERMANENT_VALIDATION;
        }

        // Authorization errors (permanent)
        if (e instanceof UnauthorizedException ||
            e instanceof ForbiddenException) {
            return ErrorType.PERMANENT_AUTHORIZATION;
        }

        // Business logic errors (permanent)
        if (e instanceof InsufficientFundsException ||
            e instanceof InvalidPaymentMethodException) {
            return ErrorType.PERMANENT_BUSINESS;
        }

        // Unknown errors are treated as transient for now
        return ErrorType.UNKNOWN_TRANSIENT;
    }

    /**
     * Applies the handling strategy for the given error category.
     *
     * @param event          the event that failed
     * @param e              the failure
     * @param errorType      category returned by {@link #classifyError}
     * @param acknowledgment manual acknowledgment handle of the consumed record
     */
    private void handleErrorByType(OrderPlacedEvent event, Exception e,
                                 ErrorType errorType, Acknowledgment acknowledgment) {
        switch (errorType) {
            case TRANSIENT_NETWORK:
            case TRANSIENT_SERVICE:
            case TRANSIENT_DATABASE:
                // NOTE(review): the record is not acknowledged on the retry
                // paths — presumably the scheduleRetry* helpers own redelivery;
                // confirm against their implementation.
                scheduleRetry(event, e, getRetryDelay(errorType));
                break;

            case TRANSIENT_RATE_LIMIT:
                scheduleRetryWithBackoff(event, e, Duration.ofMinutes(5));
                break;

            case PERMANENT_VALIDATION:
            case PERMANENT_AUTHORIZATION:
            case PERMANENT_BUSINESS:
                sendToDeadLetterTopic(event, e, "Permanent error: " + errorType);
                acknowledgment.acknowledge(); // mark the event as processed
                break;

            case UNKNOWN_TRANSIENT:
            default:
                // Also covers categories added to ErrorType later, so no
                // failure is ever silently ignored.
                scheduleRetryWithLimit(event, e, 3);
                break;
        }
    }
}

/**
 * Failure categories used to choose between retrying an event and sending it
 * to the dead letter topic. TRANSIENT_* and UNKNOWN_TRANSIENT are retried,
 * PERMANENT_* are dead-lettered.
 */
enum ErrorType {
    // Transient: network-level failure such as a connect or socket timeout
    TRANSIENT_NETWORK,
    // Transient: upstream service temporarily unavailable
    TRANSIENT_SERVICE, 
    // Transient: database lock contention (deadlock, lock not acquired)
    TRANSIENT_DATABASE,
    // Transient: upstream service is rate limiting (HTTP 429)
    TRANSIENT_RATE_LIMIT,
    // Permanent: the event failed validation
    PERMANENT_VALIDATION,
    // Permanent: the request was not authorized
    PERMANENT_AUTHORIZATION,
    // Permanent: a business rule rejected the payment
    PERMANENT_BUSINESS,
    // Unclassified failure; retried a limited number of times
    UNKNOWN_TRANSIENT
}

Python Implementierung:

import asyncio
from enum import Enum
from typing import Optional
import logging

class ErrorType(Enum):
    """Failure categories that decide between retry and dead-lettering."""

    TRANSIENT_NETWORK = "transient_network"
    TRANSIENT_SERVICE = "transient_service"
    TRANSIENT_DATABASE = "transient_database"
    PERMANENT_VALIDATION = "permanent_validation"
    PERMANENT_BUSINESS = "permanent_business"
    UNKNOWN = "unknown"


class ErrorClassifyingPaymentService:
    """Consumer for ``order.placed.v1`` that classifies processing failures."""

    # Keyword groups, evaluated in order; the first group with a hit wins.
    # Business keywords must come before the validation keywords: the generic
    # 'invalid' would otherwise swallow 'invalid payment method'.
    _ERROR_KEYWORDS = (
        (ErrorType.TRANSIENT_NETWORK,
         ('timeout', 'connection', 'network', 'unreachable')),
        (ErrorType.TRANSIENT_SERVICE,
         ('internal server', '500', '502', '503', '504')),
        (ErrorType.PERMANENT_BUSINESS,
         ('insufficient funds', 'invalid payment method')),
        (ErrorType.PERMANENT_VALIDATION,
         ('validation', 'invalid', 'missing required')),
    )

    _TRANSIENT_TYPES = frozenset({
        ErrorType.TRANSIENT_NETWORK,
        ErrorType.TRANSIENT_SERVICE,
        ErrorType.TRANSIENT_DATABASE,
    })

    _PERMANENT_TYPES = frozenset({
        ErrorType.PERMANENT_VALIDATION,
        ErrorType.PERMANENT_BUSINESS,
    })

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.logger = logging.getLogger(__name__)

    def classify_error(self, exception: Exception) -> ErrorType:
        """Classify *exception* by keywords found in its message.

        The match is a case-insensitive substring search, so this is a
        heuristic. Messages without any known keyword yield
        ``ErrorType.UNKNOWN``.
        """
        error_message = str(exception).lower()

        for error_type, keywords in self._ERROR_KEYWORDS:
            if any(keyword in error_message for keyword in keywords):
                return error_type

        return ErrorType.UNKNOWN

    async def handle_error_by_type(self, event: dict, error: Exception,
                                 error_type: ErrorType) -> bool:
        """Returns True if event should be committed, False for retry"""

        # Every transient category is retried. TRANSIENT_DATABASE used to fall
        # through to the final ``return True`` and was committed unprocessed.
        if error_type in self._TRANSIENT_TYPES:
            await self.schedule_retry(event, error)
            return False  # do not commit - retry

        if error_type in self._PERMANENT_TYPES:
            await self.send_to_dead_letter_topic(event, error)
            return True  # commit - permanently failed

        if error_type == ErrorType.UNKNOWN:
            retry_count = event.get('_retry_count', 0)
            if retry_count < 3:
                await self.schedule_retry_with_limit(event, error, retry_count + 1)
                return False
            await self.send_to_dead_letter_topic(event, error)
            return True

        # Only reachable for categories this method does not know about.
        self.logger.warning("Unhandled error type %s, committing event", error_type)
        return True

31.2 Retry Policies

31.2.1 Exponential Backoff

Exponential Backoff verhindert, dass fehlerhafte Services durch aggressive Wiederholungen überlastet werden.

/**
 * Kafka listener for {@code order.placed.v1} that retries transient failures
 * with exponential backoff via the {@code payment.retry.v1} topic and sends
 * everything else to the dead letter topic.
 */
@Component
public class ExponentialBackoffPaymentService {
    private final RedisTemplate<String, Object> redisTemplate;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Processes one order event. Every path ends with an acknowledgment: the
     * event was either processed, handed to the retry topic, or dead-lettered.
     *
     * @param event          the order event to process
     * @param acknowledgment manual acknowledgment handle of the consumed record
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        String retryKey = "retry:payment:" + event.getOrderId();

        try {
            processPayment(event);

            // Clear the retry bookkeeping on success
            redisTemplate.delete(retryKey);
            acknowledgment.acknowledge();

        } catch (Exception e) {
            if (isTransientError(e)) {
                RetryInfo retryInfo = getOrCreateRetryInfo(retryKey);

                if (retryInfo.getAttempts() < getMaxRetries(e)) {
                    scheduleRetryWithBackoff(event, retryInfo);
                    // The retry topic now owns the event. Without this
                    // acknowledgment the original record would be consumed
                    // again after a restart or rebalance, in addition to the
                    // scheduled retry.
                    // NOTE(review): send() is asynchronous; wait for the send
                    // result before acknowledging if losing a retry is not
                    // acceptable.
                    acknowledgment.acknowledge();
                } else {
                    log.error("Max retries exceeded for order {}", event.getOrderId());
                    sendToDeadLetterTopic(event, e, "Max retries exceeded");
                    acknowledgment.acknowledge();
                }
            } else {
                sendToDeadLetterTopic(event, e, "Permanent error");
                acknowledgment.acknowledge();
            }
        }
    }

    /**
     * Records the next attempt in the retry bookkeeping and publishes a
     * {@code ScheduledRetryEvent} to {@code payment.retry.v1}.
     *
     * @param event     the event to retry
     * @param retryInfo retry state for this order; its attempt counter is incremented
     */
    private void scheduleRetryWithBackoff(OrderPlacedEvent event, RetryInfo retryInfo) {
        int attempt = retryInfo.getAttempts();
        long delay = calculateBackoffDelay(attempt);

        // One timestamp for both records, so the stored nextRetry and the
        // published scheduleTime are guaranteed to be identical.
        Instant now = Instant.now();
        Instant retryAt = now.plusMillis(delay);

        // Update the retry bookkeeping
        retryInfo.setAttempts(attempt + 1);
        retryInfo.setLastAttempt(now);
        retryInfo.setNextRetry(retryAt);

        saveRetryInfo(retryInfo);

        // Schedule the event for delayed processing
        ScheduledRetryEvent retryEvent = ScheduledRetryEvent.builder()
            .originalEvent(event)
            .scheduleTime(retryAt)
            .attempt(attempt + 1)
            .build();

        kafkaTemplate.send("payment.retry.v1", retryEvent);

        log.info("Scheduled retry {} for order {} in {}ms",
                attempt + 1, event.getOrderId(), delay);
    }

    /**
     * Computes the delay before the next retry.
     *
     * @param attempt number of attempts already made (0 yields about one second)
     * @return delay in milliseconds, at most five minutes
     */
    private long calculateBackoffDelay(int attempt) {
        // Exponential backoff: 1s, 2s, 4s, 8s, 16s, ...
        long baseDelay = 1000; // 1 second
        long maxDelay = 300000; // 5 minutes maximum

        long delay = (long) (baseDelay * Math.pow(2, attempt));

        // Add jitter (±25% randomisation)
        double jitterFactor = 0.75 + (Math.random() * 0.5);
        delay = (long) (delay * jitterFactor);

        return Math.min(delay, maxDelay);
    }

    /**
     * Returns the retry budget for the given failure.
     *
     * @param e the failure that triggered the retry
     * @return maximum number of retries before the event is dead-lettered
     */
    private int getMaxRetries(Exception e) {
        if (e instanceof ConnectTimeoutException) return 5;
        if (e instanceof ServiceUnavailableException) return 3;
        if (e instanceof RateLimitException) return 10;
        return 3; // default
    }
}

// Dedicated consumer for retry events
@Component
public class RetryEventProcessor {

    /**
     * Handles one scheduled retry from {@code payment.retry.v1}. A due event
     * is processed; an event that is not yet due is put back onto the retry
     * topic. The consumed record is acknowledged on every path.
     *
     * @param retryEvent     the scheduled retry wrapping the original event
     * @param acknowledgment manual acknowledgment handle of the consumed record
     */
    @KafkaListener(topics = "payment.retry.v1")
    public void handleRetryEvent(ScheduledRetryEvent retryEvent, Acknowledgment acknowledgment) {
        boolean due = Instant.now().isAfter(retryEvent.getScheduleTime());

        if (!due) {
            // Not time for the retry yet - send the event back to the topic.
            // NOTE(review): the event is re-published immediately, so it
            // circulates through the topic without pause until it is due.
            kafkaTemplate.send("payment.retry.v1", retryEvent);
            acknowledgment.acknowledge();
            return;
        }

        // The retry is due
        try {
            processPayment(retryEvent.getOriginalEvent());
            acknowledgment.acknowledge();

        } catch (Exception failure) {
            // Hand the event back to the regular error handling
            kafkaTemplate.send("order.placed.v1", retryEvent.getOriginalEvent());
            acknowledgment.acknowledge();
        }
    }
}

31.2.2 Circuit Breaker Pattern

/**
 * Kafka listener for {@code order.placed.v1} that calls the payment gateway
 * through a circuit breaker and defers events while the circuit is open.
 */
@Component
public class CircuitBreakerPaymentService {
    private final CircuitBreaker paymentGatewayCircuitBreaker;

    /** Creates the circuit breaker with default settings and logs its state transitions. */
    public CircuitBreakerPaymentService() {
        this.paymentGatewayCircuitBreaker = CircuitBreaker.ofDefaults("paymentGateway");

        // Circuit breaker configuration
        paymentGatewayCircuitBreaker.getEventPublisher()
            .onStateTransition(event ->
                log.info("Circuit breaker state transition: {} -> {}",
                        event.getStateTransition().getFromState(),
                        event.getStateTransition().getToState()));
    }

    /**
     * Processes one order event through the circuit breaker.
     *
     * @param event          the order event to process
     * @param acknowledgment manual acknowledgment handle of the consumed record
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        try {
            PaymentResult result = paymentGatewayCircuitBreaker.executeSupplier(() ->
                paymentGateway.processPayment(createPaymentRequest(event))
            );

            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
                acknowledgment.acknowledge();
            } else {
                // NOTE(review): handlePaymentFailure does not receive the
                // acknowledgment, so this path never acknowledges the record
                // here — confirm that this is intended.
                handlePaymentFailure(event, result);
            }

        } catch (CallNotPermittedException e) {
            // Circuit breaker is open
            log.warn("Circuit breaker open for order {}, scheduling retry",
                    event.getOrderId());
            scheduleRetryWhenCircuitClosed(event);
            // The retry topic now owns the event; acknowledge the original so
            // it is not consumed a second time after a restart or rebalance.
            acknowledgment.acknowledge();

        } catch (Exception e) {
            handlePaymentError(event, e, acknowledgment);
        }
    }

    /**
     * Publishes the event to {@code payment.circuit-breaker-retry.v1} with a
     * schedule time one minute in the future.
     *
     * @param event the event that could not be processed while the circuit was open
     */
    private void scheduleRetryWhenCircuitClosed(OrderPlacedEvent event) {
        // Mark the event for a later retry. Instant has no plusMinutes();
        // plusSeconds(60) expresses the same one-minute delay.
        CircuitBreakerRetryEvent retryEvent = CircuitBreakerRetryEvent.builder()
            .originalEvent(event)
            .reason("Circuit breaker open")
            .scheduleTime(Instant.now().plusSeconds(60))
            .build();

        kafkaTemplate.send("payment.circuit-breaker-retry.v1", retryEvent);
    }
}

Python Asyncio Circuit Breaker:

import asyncio
from enum import Enum
import time
from typing import Callable, Any

class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    Subclasses ``Exception`` and keeps the original message, so callers that
    catch ``Exception`` or inspect the message text continue to work.
    """

    def __init__(self, message: str = "Circuit breaker is open"):
        super().__init__(message)


class CircuitBreaker:
    """Circuit breaker for coroutine calls.

    After ``failure_threshold`` failures without an intervening success the
    circuit opens and rejects calls. Once ``timeout`` seconds have passed
    since the last failure, the next call is let through as a trial
    (half-open): success closes the circuit, failure opens it again.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        # time.monotonic() reading of the most recent failure, None before
        # the first failure.
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)`` under circuit breaker protection.

        Returns whatever ``func`` returns.

        Raises:
            CircuitBreakerOpenError: the circuit is open and the cool-down
                has not elapsed; ``func`` is not called.
            Exception: any exception raised by ``func`` is counted as a
                failure and re-raised unchanged.
        """
        if self.state == CircuitState.OPEN:
            if self._cooldown_elapsed():
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker half-open, trying request")
            else:
                raise CircuitBreakerOpenError()

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.on_failure()
            raise  # bare raise keeps the original traceback intact

        self.on_success()
        return result

    def _cooldown_elapsed(self) -> bool:
        """Return True once ``timeout`` seconds have passed since the last failure."""
        if self.last_failure_time is None:
            # Opened without a recorded failure (state set from outside).
            return True
        return time.monotonic() - self.last_failure_time > self.timeout

    def on_success(self):
        """Reset the failure counter and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        """Record a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        # Monotonic clock: wall-clock adjustments must neither shorten nor
        # extend the cool-down.
        self.last_failure_time = time.monotonic()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} failures")

class CircuitBreakerPaymentService:
    """Consumer for ``order.placed.v1`` that pays through a circuit breaker."""

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

    async def process_events(self):
        """Poll and process events forever.

        A message is committed only after a successful payment. Failures are
        handed to ``schedule_circuit_breaker_retry`` (circuit open) or
        ``handle_payment_error`` (anything else).
        """
        # NOTE(review): poll() is a synchronous call inside a coroutine and
        # presumably blocks the event loop for up to one second — confirm.
        while True:
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue

            # Decode in its own try block. In a shared block a decoding
            # failure would reach the error handlers with ``event`` unbound
            # (first message) or still holding the previous message's event.
            try:
                event = json.loads(msg.value().decode('utf-8'))
            except (AttributeError, ValueError) as e:
                # AttributeError: message without payload (value() is None).
                # ValueError: invalid UTF-8 or invalid JSON.
                # NOTE(review): the payload is only reported here; route it to
                # a dead letter topic if undecodable messages must be kept.
                print(f"Skipping undecodable message: {e}")
                continue

            try:
                # Process the payment through the circuit breaker
                result = await self.circuit_breaker.call(
                    self.process_payment_external, event
                )

                # NOTE(review): an unsuccessful result is neither committed
                # nor passed to an error handler.
                if result['success']:
                    self.consumer.commit(msg)
                    print(f"Payment processed for order: {event['orderId']}")

            except Exception as e:
                if "Circuit breaker is open" in str(e):
                    await self.schedule_circuit_breaker_retry(event)
                else:
                    await self.handle_payment_error(event, e)

31.3 Poison Message Handling

31.3.1 Dead Letter Topic Implementation

/**
 * Publishes failed order events to the dead letter topic
 * {@code order.placed.v1.DLT}, persists them for the management UI, and
 * consumes the dead letter topic for analysis and automatic remediation.
 */
@Component
public class DeadLetterTopicHandler {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DeadLetterRepository deadLetterRepository;

    /**
     * Wraps the failed event in a {@code DeadLetterEvent}, publishes it,
     * stores a local record and increments the dead letter metric.
     *
     * @param originalEvent the event that could not be processed
     * @param error         the failure that caused the dead-lettering
     * @param reason        short reason, also used as metric tag
     */
    public void sendToDeadLetterTopic(OrderPlacedEvent originalEvent,
                                    Exception error, String reason) {

        // Exceptions may carry no message; fall back to the class name so the
        // dead letter event always has a usable error description.
        String errorMessage = error.getMessage() != null
            ? error.getMessage()
            : error.getClass().getName();

        DeadLetterEvent deadLetterEvent = DeadLetterEvent.builder()
            .originalEventId(originalEvent.getEventId())
            .originalTopic("order.placed.v1")
            .orderId(originalEvent.getOrderId())
            .originalPayload(serializePayload(originalEvent))
            .errorMessage(errorMessage)
            .errorStackTrace(getStackTrace(error))
            .reason(reason)
            .timestamp(Instant.now())
            .retryCount(getRetryCount(originalEvent))
            .build();

        // Send to the dead letter topic
        kafkaTemplate.send("order.placed.v1.DLT", deadLetterEvent);

        // Local persistence for the management UI
        DeadLetterRecord record = mapToRecord(deadLetterEvent);
        deadLetterRepository.save(record);

        // Monitoring and alerting
        meterRegistry.counter("dead.letter.events",
            "topic", "order.placed.v1",
            "reason", reason).increment();

        log.error("Sent event to Dead Letter Topic: orderId={}, reason={}",
                 originalEvent.getOrderId(), reason);
    }

    /**
     * Serializes the event for the dead letter payload. Serialization can
     * fail with a checked exception; dead-lettering must still go ahead, so
     * the event's {@code toString()} is used as fallback.
     */
    private String serializePayload(OrderPlacedEvent originalEvent) {
        try {
            return objectMapper.writeValueAsString(originalEvent);
        } catch (Exception e) {
            log.warn("Could not serialize event for Dead Letter Topic, using toString(): orderId={}",
                    originalEvent.getOrderId(), e);
            return String.valueOf(originalEvent);
        }
    }

    /**
     * Dead letter topic consumer for analysis: categorizes the error, updates
     * statistics and attempts automatic remediation for remediable categories.
     *
     * @param deadLetterEvent the dead letter event to analyse
     */
    @KafkaListener(topics = "order.placed.v1.DLT")
    public void handleDeadLetterEvent(DeadLetterEvent deadLetterEvent) {
        log.info("Received dead letter event: {}", deadLetterEvent.getOriginalEventId());

        // Categorize for analysis
        String errorCategory = categorizeError(deadLetterEvent.getErrorMessage());

        // Update statistics
        updateDeadLetterStatistics(deadLetterEvent, errorCategory);

        // Attempt automatic remediation for certain errors
        if (isRemediable(errorCategory)) {
            attemptAutomaticRemediation(deadLetterEvent);
        }
    }

    /**
     * Tries to correct the original event and re-publish it to
     * {@code order.placed.v1}. Failures are logged and otherwise ignored.
     *
     * @param deadLetterEvent the dead letter event to remediate
     */
    private void attemptAutomaticRemediation(DeadLetterEvent deadLetterEvent) {
        try {
            OrderPlacedEvent originalEvent = objectMapper.readValue(
                deadLetterEvent.getOriginalPayload(), OrderPlacedEvent.class
            );

            // Attempt enrichment or correction
            OrderPlacedEvent correctedEvent = enrichAndCorrectEvent(originalEvent);

            if (correctedEvent != null) {
                // Send back to the original topic.
                // NOTE(review): nothing here limits how often an event can
                // travel between the topic and the dead letter topic.
                kafkaTemplate.send("order.placed.v1", correctedEvent);

                // Mark the dead letter record as resolved
                deadLetterRepository.markAsResolved(
                    deadLetterEvent.getOriginalEventId(),
                    "Automatic remediation successful"
                );

                log.info("Successfully remediated dead letter event: {}",
                        deadLetterEvent.getOriginalEventId());
            }

        } catch (Exception e) {
            log.error("Automatic remediation failed for event {}",
                     deadLetterEvent.getOriginalEventId(), e);
        }
    }
}

31.3.2 Poison Message Detection

/**
 * Kafka listener for {@code order.placed.v1} that counts failures per record
 * in Redis and quarantines records that reached the poison threshold.
 */
@Component
public class PoisonMessageDetector {
    private final RedisTemplate<String, Object> redisTemplate;
    private static final String POISON_KEY_PREFIX = "poison:";
    private static final int POISON_THRESHOLD = 5;

    /**
     * Processes one order event unless its record is already known as poison.
     *
     * @param event          the order event to process
     * @param record         the consumed record, used to derive the counter key
     * @param acknowledgment manual acknowledgment handle of the consumed record
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event,
                                ConsumerRecord<String, OrderPlacedEvent> record,
                                Acknowledgment acknowledgment) {

        String messageKey = generateMessageKey(record);

        // Check for a poison message
        if (isPoisonMessage(messageKey)) {
            log.warn("Detected poison message, sending to quarantine: {}", messageKey);
            sendToQuarantineTopic(event, "Poison message detected");
            acknowledgment.acknowledge();
            return;
        }

        try {
            processPayment(event);

            // Reset the poison counter on success
            redisTemplate.delete(POISON_KEY_PREFIX + messageKey);
            acknowledgment.acknowledge();

        } catch (Exception e) {
            // Increase the failure count for this message
            incrementPoisonCounter(messageKey);

            if (isTransientError(e)) {
                // Regular retry for transient errors: rethrow so that whatever
                // invoked the listener can redeliver the record
                throw e;
            } else {
                sendToDeadLetterTopic(event, e, "Permanent error");
                acknowledgment.acknowledge();
            }
        }
    }

    /**
     * Builds a key from the event content plus topic, partition and offset of
     * the record, hashed with MD5 (used as identifier only).
     */
    private String generateMessageKey(ConsumerRecord<String, OrderPlacedEvent> record) {
        // Unique key based on the message content
        OrderPlacedEvent event = record.value();
        return DigestUtils.md5Hex(
            event.getOrderId() + ":" +
            event.getCustomerId() + ":" +
            event.getTotalAmount() + ":" +
            record.topic() + ":" +
            record.partition() + ":" +
            record.offset()
        );
    }

    /**
     * Returns whether the failure counter of the message reached
     * {@code POISON_THRESHOLD}. A missing or unreadable counter counts as
     * "not poison".
     */
    private boolean isPoisonMessage(String messageKey) {
        String counterKey = POISON_KEY_PREFIX + messageKey;
        Object raw = redisTemplate.opsForValue().get(counterKey);
        if (raw == null) {
            return false;
        }

        // The Java type of a counter written by increment() depends on the
        // configured value serializer (Integer, Long or String are all
        // possible), so a hard cast to Integer can fail.
        long count;
        if (raw instanceof Number) {
            count = ((Number) raw).longValue();
        } else {
            try {
                count = Long.parseLong(raw.toString().trim());
            } catch (NumberFormatException e) {
                log.warn("Unreadable poison counter for key {}: {}", counterKey, raw);
                return false;
            }
        }
        return count >= POISON_THRESHOLD;
    }

    /** Increments the failure counter of the message and (re)sets its TTL to 24 hours. */
    private void incrementPoisonCounter(String messageKey) {
        String counterKey = POISON_KEY_PREFIX + messageKey;
        redisTemplate.opsForValue().increment(counterKey);
        redisTemplate.expire(counterKey, Duration.ofHours(24));
    }
}

31.3.3 Message Quarantine und Recovery

class MessageQuarantineService:
    """Consumer for ``order.placed.v1`` that quarantines repeatedly failing messages."""

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.producer = Producer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.quarantine_map = {}  # in-memory, for demonstration only

    async def process_with_quarantine(self):
        """Poll and process messages forever, skipping quarantined ones."""
        while True:
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                continue

            message_id = self.generate_message_id(msg)

            try:
                # Quarantine check
                if self.is_quarantined(message_id):
                    print(f"Message {message_id} is quarantined, skipping")
                    self.consumer.commit(msg)
                    continue

                event = json.loads(msg.value().decode('utf-8'))
                success = await self.process_payment(event)

                if success:
                    # Clear the quarantine bookkeeping on success
                    self.remove_from_quarantine(message_id)
                    self.consumer.commit(msg)
                else:
                    self.handle_processing_failure(msg, message_id, "Processing failed")

            except Exception as e:
                self.handle_processing_failure(msg, message_id, str(e))

    def handle_processing_failure(self, msg, message_id: str, error_reason: str):
        """Count the failure and quarantine the message on the third failure."""
        failure_count = self.increment_failure_count(message_id)

        if failure_count >= 3:
            print(f"Message {message_id} failed {failure_count} times, quarantining")
            self.quarantine_message(message_id, error_reason)
            self.send_to_quarantine_topic(msg, error_reason)

        # No commit here - the message is meant to be processed again.
        # NOTE(review): presumably poll() moves on to the next message even
        # without a commit, so redelivery only happens after a restart or
        # rebalance — confirm against the consumer client in use.

    def quarantine_message(self, message_id: str, reason: str):
        """Record the message as quarantined in the in-memory map."""
        self.quarantine_map[message_id] = {
            'reason': reason,
            'quarantined_at': time.time(),
            'failure_count': self.get_failure_count(message_id)
        }

    def send_to_quarantine_topic(self, msg, reason: str):
        """Publish the message and its origin to ``quarantine.messages.v1``."""
        # This runs on the failure path, possibly because the payload is not
        # valid UTF-8. Decode leniently so quarantining cannot fail for the
        # same reason the message failed; invalid bytes become U+FFFD.
        raw_payload = msg.value()
        original_payload = (
            raw_payload.decode('utf-8', errors='replace')
            if raw_payload is not None else None
        )

        quarantine_event = {
            'original_topic': msg.topic(),
            'original_partition': msg.partition(),
            'original_offset': msg.offset(),
            'original_payload': original_payload,
            'quarantine_reason': reason,
            'quarantined_at': time.time()
        }

        self.producer.produce(
            'quarantine.messages.v1',
            key=msg.key(),
            value=json.dumps(quarantine_event).encode('utf-8')
        )
        self.producer.flush()

# Recovery service for quarantined messages
class QuarantineRecoveryService:
    """Consumer for ``quarantine.messages.v1`` that re-publishes recoverable messages."""

    # Substrings of a quarantine reason that mark a problem as auto-recoverable
    _AUTO_RECOVERABLE_REASONS = (
        'timeout',
        'service unavailable',
        'temporary network error',
    )

    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.producer = Producer(kafka_config)
        self.consumer.subscribe(['quarantine.messages.v1'])

    async def review_quarantined_messages(self):
        """Periodically review quarantined messages.

        Each message is auto-recovered when possible and otherwise logged for
        manual review; it is committed only after one of the two succeeded.
        """
        while True:
            msg = self.consumer.poll(timeout=1.0)

            if msg is None:
                await asyncio.sleep(5)
                continue

            try:
                quarantine_event = json.loads(msg.value().decode('utf-8'))

                # Automatic recovery attempt
                recovered = False
                if await self.can_auto_recover(quarantine_event):
                    recovered = await self.attempt_auto_recovery(
                        quarantine_event, key=msg.key()
                    )

                if not recovered:
                    # Not recoverable, or recovery failed: flag for manual
                    # review so the message is not lost by the commit below.
                    await self.log_for_manual_review(quarantine_event)

                self.consumer.commit(msg)

            except Exception as e:
                print(f"Error processing quarantined message: {e}")

    async def can_auto_recover(self, quarantine_event: dict) -> bool:
        """Return True if the quarantine reason names an auto-recoverable problem.

        A missing or empty ``quarantine_reason`` is not auto-recoverable.
        """
        reason = str(quarantine_event.get('quarantine_reason') or '').lower()

        return any(recoverable in reason
                   for recoverable in self._AUTO_RECOVERABLE_REASONS)

    async def attempt_auto_recovery(self, quarantine_event: dict, key=None) -> bool:
        """Re-publish the original payload to its original topic.

        Args:
            quarantine_event: Event with ``original_payload`` (str) and
                ``original_topic``.
            key: Message key for the re-published message; the default
                ``None`` publishes without a key.

        Returns:
            True if the message was produced and flushed, False otherwise.
        """
        # NOTE(review): a message that fails again is quarantined and
        # recovered again; nothing here bounds that cycle.
        try:
            original_payload = quarantine_event['original_payload']
            original_topic = quarantine_event['original_topic']

            # Send the message back to its original topic
            self.producer.produce(
                original_topic,
                key=key,
                value=original_payload.encode('utf-8')
            )
            self.producer.flush()

            print(f"Auto-recovered quarantined message to {original_topic}")
            return True

        except Exception as e:
            print(f"Auto-recovery failed: {e}")
            return False

Effektive Fehlertoleranz in Event-Driven-Systemen erfordert eine Kombination aus intelligenter Fehlerklassifizierung, angemessenen Retry-Strategien und robusten Dead-Letter-Mechanismen. Die Implementierung sollte zwischen temporären und permanenten Fehlern unterscheiden und entsprechende Recovery-Strategien anwenden.

Das Kapitel „Beispiele mit Zustandsänderung und Geschäftslogik“ zeigt, wie diese Fehlerbehandlungsstrategien in komplexeren, zustandsbehafteten Event-Processing-Szenarien angewendet werden, während das Kapitel „Verarbeitung und Zustandsmanagement“ die Auswirkungen von Fehlern auf die Zustandskonsistenz behandelt.