25 Logging, Error Handling, and Retry Behavior in the Producer

Production-grade event producers require comprehensive observability, robust error handling, and intelligent retry mechanisms. These operational concerns are decisive for the maintainability, debuggability, and resilience of event-driven systems.
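
A minimal sketch of such a retry mechanism, before the detailed sections below: exponential backoff with jitter around a generic producer call. The producer interface (send_event) and the parameter values are assumptions for illustration, not part of a specific library.

# Minimal sketch: exponential backoff with jitter for event publishing (assumed producer API)
import asyncio
import random

async def publish_with_retry(producer, topic, key, event,
                             max_attempts=5, base_delay=0.5, max_delay=10.0):
    for attempt in range(1, max_attempts + 1):
        try:
            # send_event is an assumed async producer interface, see the sections below
            return await producer.send_event(topic=topic, event=event, key=key)
        except Exception:
            if attempt == max_attempts:
                raise  # retries exhausted: caller decides, e.g. dead letter topic (25.2)
            # full jitter: sleep between 0 and the capped exponential delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))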

25.1 Structured Logging for Events

Structured logging makes log data machine-readable and simplifies debugging of complex event flows. Correlation IDs and structured data formats are essential for tracing events across distributed systems.
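
For orientation, a single structured log record for a successfully published event might look roughly like this (illustrative only; the exact field names depend on the encoder configuration shown below):

{"@timestamp": "2024-05-01T10:15:30.123Z", "level": "INFO",
 "logger_name": "de.eda.training.events.OrderEventPublisher",
 "message": "Event published successfully",
 "correlationId": "9f1c2a7e-...", "eventType": "OrderCreatedEvent",
 "topic": "orders", "partition": 2, "offset": 4711, "stage": "publish_success"}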

25.1.1 Spring Boot Structured Logging

// Logback configuration for structured logging
// logback-spring.xml
/*
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <springProfile name="!local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers>
                    <timestamp/>
                    <logLevel/>
                    <loggerName/>
                    <message/>
                    <mdc/>
                    <arguments/>
                    <stackTrace/>
                </providers>
            </encoder>
        </appender>
    </springProfile>
    
    <springProfile name="local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level [%X{correlationId:-}] %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    </springProfile>
    
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
    
    <!-- Event-specific logging -->
    <logger name="de.eda.training.events" level="DEBUG"/>
    <logger name="org.springframework.kafka" level="INFO"/>
</configuration>
*/

// Event Logging Service
@Component
@Slf4j
public class EventLoggingService {
    
    private final MeterRegistry meterRegistry;
    private final Timer eventPublishLatency;
    
    public EventLoggingService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // Counters carry per-event tags, so they are resolved lazily via
        // meterRegistry.counter(name, tags...) instead of being pre-registered here.
        this.eventPublishLatency = Timer.builder("events.publish.duration")
            .description("Event publishing duration")
            .register(meterRegistry);
    }
    
    public void logEventPublishStart(String eventType, String businessKey, String correlationId) {
        // Set MDC for correlation tracking
        MDC.put("correlationId", correlationId);
        MDC.put("eventType", eventType);
        MDC.put("businessKey", businessKey);
        
        log.info("Event publishing started",
                kv("eventType", eventType),
                kv("businessKey", businessKey),
                kv("correlationId", correlationId),
                kv("stage", "publish_start"));
    }
    
    public void logEventPublishSuccess(String eventType, String businessKey, 
                                     RecordMetadata metadata, Duration duration) {
        // Micrometer counters are bound to a fixed tag set, so resolve them with tags per call
        meterRegistry.counter("events.published",
                "eventType", eventType, "topic", metadata.topic()).increment();
        eventPublishLatency.record(duration);
        
        log.info("Event published successfully",
                kv("eventType", eventType),
                kv("businessKey", businessKey),
                kv("topic", metadata.topic()),
                kv("partition", metadata.partition()),
                kv("offset", metadata.offset()),
                kv("timestamp", metadata.timestamp()),
                kv("duration_ms", duration.toMillis()),
                kv("stage", "publish_success"));
        
        // Clear MDC
        MDC.clear();
    }
    
    public void logEventPublishFailure(String eventType, String businessKey, 
                                     String topic, Exception error, Duration duration) {
        meterRegistry.counter("events.failed",
                "eventType", eventType, "topic", topic,
                "errorType", error.getClass().getSimpleName()).increment();
        
        log.error("Event publishing failed",
                kv("eventType", eventType),
                kv("businessKey", businessKey),
                kv("topic", topic),
                kv("duration_ms", duration.toMillis()),
                kv("errorType", error.getClass().getSimpleName()),
                kv("errorMessage", error.getMessage()),
                kv("stage", "publish_failure"),
                error);
        
        // Clear MDC
        MDC.clear();
    }
    
    public void logRetryAttempt(String eventType, String businessKey, int attempt, 
                              int maxAttempts, Duration nextRetryDelay) {
        log.warn("Event publishing retry attempt",
                kv("eventType", eventType),
                kv("businessKey", businessKey),
                kv("attempt", attempt),
                kv("maxAttempts", maxAttempts),
                kv("nextRetryDelay_ms", nextRetryDelay.toMillis()),
                kv("stage", "retry_attempt"));
    }
    
    public void logDeadLetterSend(String eventType, String businessKey, 
                                String originalTopic, String deadLetterTopic, String reason) {
        log.error("Event sent to dead letter topic",
                kv("eventType", eventType),
                kv("businessKey", businessKey),
                kv("originalTopic", originalTopic),
                kv("deadLetterTopic", deadLetterTopic),
                kv("reason", reason),
                kv("stage", "dead_letter"));
    }
    
    // Utility method for structured key-value logging (logstash-logback-encoder)
    private static StructuredArgument kv(String key, Object value) {
        return StructuredArguments.kv(key, value);
    }
}

// Correlation ID filter for web requests
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class CorrelationIdFilter implements Filter {
    
    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    private static final String CORRELATION_ID_MDC_KEY = "correlationId";
    
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, 
                        FilterChain chain) throws IOException, ServletException {
        
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;
        
        // Get or generate correlation ID
        String correlationId = httpRequest.getHeader(CORRELATION_ID_HEADER);
        if (correlationId == null || correlationId.trim().isEmpty()) {
            correlationId = UUID.randomUUID().toString();
        }
        
        // Set in MDC and response header
        MDC.put(CORRELATION_ID_MDC_KEY, correlationId);
        httpResponse.setHeader(CORRELATION_ID_HEADER, correlationId);
        
        try {
            chain.doFilter(request, response);
        } finally {
            MDC.clear();
        }
    }
}

// Event Tracing Aspect
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class EventTracingAspect {
    
    private final EventLoggingService eventLoggingService;
    
    @Around("@annotation(EventTrace)")
    public Object traceEventPublishing(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        String methodName = joinPoint.getSignature().getName();
        Object[] args = joinPoint.getArgs();
        
        // Extract event information from method arguments
        String eventType = extractEventType(args);
        String businessKey = extractBusinessKey(args);
        String correlationId = MDC.get("correlationId");
        
        eventLoggingService.logEventPublishStart(eventType, businessKey, correlationId);
        
        try {
            Object result = joinPoint.proceed();
            Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
            
            // Log success if result indicates successful publishing
            if (result instanceof SendResult) {
                SendResult<String, Object> sendResult = (SendResult<String, Object>) result;
                eventLoggingService.logEventPublishSuccess(eventType, businessKey, 
                                                         sendResult.getRecordMetadata(), duration);
            }
            
            return result;
        } catch (Exception e) {
            Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
            eventLoggingService.logEventPublishFailure(eventType, businessKey, 
                                                     "unknown", e, duration);
            throw e;
        }
    }
    
    private String extractEventType(Object[] args) {
        // Extract event type from method arguments
        for (Object arg : args) {
            if (arg != null && arg.getClass().getSimpleName().endsWith("Event")) {
                return arg.getClass().getSimpleName();
            }
        }
        return "UnknownEvent";
    }
    
    private String extractBusinessKey(Object[] args) {
        // Extract business key (usually order ID, customer ID, etc.)
        for (Object arg : args) {
            if (arg instanceof String) {
                return (String) arg;
            }
        }
        return "unknown";
    }
}

// Event Trace Annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface EventTrace {
}

25.1.2 Python Structured Logging

# Python structured logging with structlog
import structlog
import logging
import json
import uuid
import time
import contextvars
from datetime import datetime
from typing import Optional, Dict, Any
from prometheus_client import Counter, Histogram, generate_latest

# Correlation ID context variable (None until a request or publish sets it)
correlation_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    'correlation_id', default=None
)

# Configure structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.dev.set_exc_info,
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)

# Metrics
event_published_counter = Counter(
    'events_published_total',
    'Total events published',
    ['event_type', 'topic', 'status']
)

event_publish_duration = Histogram(
    'event_publish_duration_seconds',
    'Event publishing duration',
    ['event_type', 'topic']
)

class EventLoggingService:
    def __init__(self):
        self.logger = structlog.get_logger()
    
    def log_event_publish_start(self, event_type: str, business_key: str, 
                              correlation_id: Optional[str] = None):
        """Log the start of event publishing"""
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
        
        # Set correlation ID in context
        correlation_id_var.set(correlation_id)
        
        self.logger.info(
            "Event publishing started",
            event_type=event_type,
            business_key=business_key,
            correlation_id=correlation_id,
            stage="publish_start",
            timestamp=datetime.utcnow().isoformat()
        )
        
        return correlation_id
    
    def log_event_publish_success(self, event_type: str, business_key: str,
                                topic: str, partition: int, offset: int,
                                duration_seconds: float):
        """Log successful event publishing"""
        
        # Record metrics
        event_published_counter.labels(
            event_type=event_type, 
            topic=topic, 
            status='success'
        ).inc()
        
        event_publish_duration.labels(
            event_type=event_type,
            topic=topic
        ).observe(duration_seconds)
        
        self.logger.info(
            "Event published successfully",
            event_type=event_type,
            business_key=business_key,
            topic=topic,
            partition=partition,
            offset=offset,
            duration_seconds=duration_seconds,
            stage="publish_success",
            correlation_id=correlation_id_var.get()
        )
    
    def log_event_publish_failure(self, event_type: str, business_key: str,
                                topic: str, error: Exception, 
                                duration_seconds: float):
        """Log failed event publishing"""
        
        # Record metrics
        event_published_counter.labels(
            event_type=event_type,
            topic=topic,
            status='failure'
        ).inc()
        
        self.logger.error(
            "Event publishing failed",
            event_type=event_type,
            business_key=business_key,
            topic=topic,
            duration_seconds=duration_seconds,
            error_type=type(error).__name__,
            error_message=str(error),
            stage="publish_failure",
            correlation_id=correlation_id_var.get(),
            exc_info=True
        )
    
    def log_retry_attempt(self, event_type: str, business_key: str,
                         attempt: int, max_attempts: int, 
                         next_retry_delay_seconds: float):
        """Log retry attempt"""
        self.logger.warning(
            "Event publishing retry attempt",
            event_type=event_type,
            business_key=business_key,
            attempt=attempt,
            max_attempts=max_attempts,
            next_retry_delay_seconds=next_retry_delay_seconds,
            stage="retry_attempt",
            correlation_id=correlation_id_var.get()
        )
    
    def log_dead_letter_send(self, event_type: str, business_key: str,
                           original_topic: str, dead_letter_topic: str,
                           reason: str):
        """Log sending event to dead letter topic"""
        self.logger.error(
            "Event sent to dead letter topic",
            event_type=event_type,
            business_key=business_key,
            original_topic=original_topic,
            dead_letter_topic=dead_letter_topic,
            reason=reason,
            stage="dead_letter",
            correlation_id=correlation_id_var.get()
        )

# Event Tracing Decorator
import functools
import asyncio

def event_trace(func):
    """Decorator for tracing event publishing methods"""
    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start_time = time.time()
        logging_service = EventLoggingService()
        
        # Extract event info from arguments
        event_type = _extract_event_type(args, kwargs)
        business_key = _extract_business_key(args, kwargs)
        
        correlation_id = logging_service.log_event_publish_start(
            event_type, business_key
        )
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            
            duration = time.time() - start_time
            
            # Extract result info
            if isinstance(result, dict) and 'topic' in result:
                logging_service.log_event_publish_success(
                    event_type, business_key,
                    result['topic'],
                    result.get('partition', -1),
                    result.get('offset', -1),
                    duration
                )
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            logging_service.log_event_publish_failure(
                event_type, business_key, "unknown", e, duration
            )
            raise
    
    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        # Sync version of the wrapper
        start_time = time.time()
        logging_service = EventLoggingService()
        
        event_type = _extract_event_type(args, kwargs)
        business_key = _extract_business_key(args, kwargs)
        
        correlation_id = logging_service.log_event_publish_start(
            event_type, business_key
        )
        
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            
            if isinstance(result, dict) and 'topic' in result:
                logging_service.log_event_publish_success(
                    event_type, business_key,
                    result['topic'],
                    result.get('partition', -1),
                    result.get('offset', -1),
                    duration
                )
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            logging_service.log_event_publish_failure(
                event_type, business_key, "unknown", e, duration
            )
            raise
    
    # Return appropriate wrapper based on function type
    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    else:
        return sync_wrapper

def _extract_event_type(args, kwargs) -> str:
    """Extract event type from function arguments"""
    # Look for event objects in arguments
    for arg in args:
        if hasattr(arg, '__class__') and arg.__class__.__name__.endswith('Event'):
            return arg.__class__.__name__
    
    # Look in kwargs
    for key, value in kwargs.items():
        if key == 'event_type':
            return str(value)
        if hasattr(value, '__class__') and value.__class__.__name__.endswith('Event'):
            return value.__class__.__name__
    
    return "UnknownEvent"

def _extract_business_key(args, kwargs) -> str:
    """Extract business key from function arguments"""
    # Look for common business key names
    business_key_names = ['order_id', 'customer_id', 'business_key', 'key']
    
    for key, value in kwargs.items():
        if key in business_key_names:
            return str(value)
    
    # Look for string arguments that might be business keys
    for arg in args:
        if isinstance(arg, str) and len(arg) > 0:
            return arg
    
    return "unknown"

# Correlation ID Middleware for FastAPI
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

class CorrelationIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Get or generate correlation ID
        correlation_id = request.headers.get('x-correlation-id')
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
        
        # Set in context variable
        correlation_id_var.set(correlation_id)
        
        # Process request
        response = await call_next(request)
        
        # Add correlation ID to response
        response.headers['x-correlation-id'] = correlation_id
        
        return response
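
Registering the middleware in a FastAPI application is a one-liner; a minimal sketch:

# Minimal sketch: wiring the correlation ID middleware into a FastAPI app
from fastapi import FastAPI

app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

@app.post("/orders")
async def create_order(order: dict):
    # correlation_id_var is now populated and flows into all structured log calls
    return {"correlation_id": correlation_id_var.get()}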

25.2 Dead Letter Topics

Dead letter topics capture events that cannot be processed successfully. They are essential for managing problematic events and enable manual reprocessing.
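
Conceptually, the producer hands an event to the dead letter topic only after all retries are exhausted. A minimal sketch of that flow, building on the publish_with_retry sketch from the chapter introduction and the DeadLetterService interface shown below (assumed interfaces):

# Minimal sketch: retry first, then hand the event over to the dead letter service
async def publish_or_dead_letter(producer, dead_letter_service, topic, key, event,
                                 max_attempts=5):
    try:
        return await publish_with_retry(producer, topic, key, event,
                                        max_attempts=max_attempts)
    except Exception as error:
        # Retries exhausted: park the event for analysis and manual recovery
        await dead_letter_service.send_to_dead_letter(
            original_topic=topic, key=key, original_event=event,
            error=error, retry_attempts=max_attempts
        )
        return {"status": "dead_letter"}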

25.2.1 Spring Boot Dead Letter Implementation

// Dead Letter Topic Configuration
@Configuration
public class DeadLetterTopicConfig {
    
    @Value("${app.events.dead-letter.enabled:true}")
    private boolean deadLetterEnabled;
    
    @Value("${app.events.dead-letter.retention-hours:168}") // 7 days
    private int retentionHours;
    
    @Bean
    public NewTopic createDeadLetterTopics(EventProperties eventProperties) {
        if (!deadLetterEnabled) {
            return null;
        }
        
        // Create dead letter topics for each main topic
        return TopicBuilder.name("dead-letter-events")
            .partitions(3)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, 
                   String.valueOf(retentionHours * 60L * 60 * 1000))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
            .build();
    }
}

// Dead Letter Service
@Service
@Slf4j
@RequiredArgsConstructor
public class DeadLetterService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final EventLoggingService eventLoggingService;
    private final DeadLetterMetrics deadLetterMetrics;
    
    private static final String DEAD_LETTER_TOPIC = "dead-letter-events";
    
    public void sendToDeadLetter(String originalTopic, String key, Object originalEvent, 
                               Exception error, int retryAttempts) {
        
        DeadLetterEvent deadLetterEvent = DeadLetterEvent.builder()
            .deadLetterEventId(UUID.randomUUID().toString())
            .originalTopic(originalTopic)
            .originalKey(key)
            .originalEvent(originalEvent)
            .failureReason(error.getMessage())
            .failureType(error.getClass().getSimpleName())
            .retryAttempts(retryAttempts)
            .firstFailureTime(Instant.now())
            .deadLetterTime(Instant.now())
            .correlationId(MDC.get("correlationId"))
            .stackTrace(ExceptionUtils.getStackTrace(error))
            .build();
        
        try {
            kafkaTemplate.send(DEAD_LETTER_TOPIC, key, deadLetterEvent)
                .addCallback(
                    result -> {
                        deadLetterMetrics.incrementDeadLetterSuccess(originalTopic);
                        eventLoggingService.logDeadLetterSend(
                            extractEventType(originalEvent),
                            key,
                            originalTopic,
                            DEAD_LETTER_TOPIC,
                            error.getMessage()
                        );
                    },
                    failure -> {
                        deadLetterMetrics.incrementDeadLetterFailure(originalTopic);
                        log.error("Failed to send event to dead letter topic", failure);
                        // Fallback: log to file or database
                        logToFileSystem(deadLetterEvent);
                    }
                );
        } catch (Exception e) {
            log.error("Critical failure: Cannot send to dead letter topic", e);
            // Emergency fallback
            logToFileSystem(deadLetterEvent);
        }
    }
    
    private void logToFileSystem(DeadLetterEvent event) {
        // Emergency fallback: write to local file
        try {
            String filename = String.format("dead-letter-%s.json", 
                                           event.getDeadLetterEventId());
            String json = new ObjectMapper().writeValueAsString(event);
            Files.write(Paths.get("/var/log/dead-letters", filename), 
                       json.getBytes(), StandardOpenOption.CREATE);
        } catch (Exception e) {
            log.error("Emergency dead letter logging failed", e);
        }
    }
    
    @Data
    @Builder
    public static class DeadLetterEvent {
        private String deadLetterEventId;
        private String originalTopic;
        private String originalKey;
        private Object originalEvent;
        private String failureReason;
        private String failureType;
        private int retryAttempts;
        private Instant firstFailureTime;
        private Instant deadLetterTime;
        private String correlationId;
        private String stackTrace;
        
        // Additional metadata for debugging (requires @Builder.Default, otherwise
        // Lombok's builder would leave the field null)
        @Builder.Default
        private Map<String, String> systemInfo = Map.of(
            "hostname", getHostname(),
            "applicationVersion", getApplicationVersion(),
            "javaVersion", System.getProperty("java.version")
        );
    }
    
    private String extractEventType(Object event) {
        return event != null ? event.getClass().getSimpleName() : "Unknown";
    }
    
    private static String getHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            return "unknown";
        }
    }
    
    private static String getApplicationVersion() {
        String version = DeadLetterService.class.getPackage().getImplementationVersion();
        return version != null ? version : "unknown"; // null when not running from a packaged jar
    }
}

// Dead Letter Recovery Service
@Service
@Slf4j
@RequiredArgsConstructor
public class DeadLetterRecoveryService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DeadLetterRepository deadLetterRepository;
    
    @KafkaListener(topics = "dead-letter-events")
    public void handleDeadLetterEvent(DeadLetterEvent deadLetterEvent) {
        // Store dead letter event for later analysis
        deadLetterRepository.save(convertToEntity(deadLetterEvent));
    }
    
    @Scheduled(fixedDelay = 300000) // Every 5 minutes
    public void retryRecoverableEvents() {
        List<DeadLetterEntity> recoverableEvents = 
            deadLetterRepository.findRecoverableEvents();
        
        for (DeadLetterEntity deadLetter : recoverableEvents) {
            if (isRecoverable(deadLetter)) {
                try {
                    // Attempt to republish
                    kafkaTemplate.send(deadLetter.getOriginalTopic(), 
                                     deadLetter.getOriginalKey(), 
                                     deadLetter.getOriginalEvent());
                    
                    deadLetter.setRecoveryAttempted(true);
                    deadLetter.setRecoveryTime(Instant.now());
                    deadLetterRepository.save(deadLetter);
                    
                } catch (Exception e) {
                    log.warn("Dead letter recovery failed", e);
                }
            }
        }
    }
    
    private boolean isRecoverable(DeadLetterEntity deadLetter) {
        // Define business rules for when events can be recovered
        return !deadLetter.isRecoveryAttempted() &&
               deadLetter.getRetryAttempts() < 5 &&
               deadLetter.getDeadLetterTime().isAfter(
                   Instant.now().minus(Duration.ofHours(24))
               );
    }
}

25.2.2 Python Dead Letter Implementation

# Python Dead Letter Service
import json
import traceback
import uuid
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional
import structlog

# correlation_id_var is the context variable defined in the structured logging module above

logger = structlog.get_logger()

@dataclass
class DeadLetterEvent:
    dead_letter_event_id: str
    original_topic: str
    original_key: str
    original_event: Dict[str, Any]
    failure_reason: str
    failure_type: str
    retry_attempts: int
    first_failure_time: str
    dead_letter_time: str
    correlation_id: Optional[str]
    stack_trace: str
    system_info: Dict[str, str]

class DeadLetterService:
    def __init__(self, producer, metrics_service=None):
        self.producer = producer
        self.metrics_service = metrics_service
        self.logger = logger.bind(component="DeadLetterService")
        self.dead_letter_topic = "dead-letter-events"
    
    async def send_to_dead_letter(self, original_topic: str, key: str, 
                                original_event: Any, error: Exception, 
                                retry_attempts: int):
        """Send event to dead letter topic"""
        
        dead_letter_event = DeadLetterEvent(
            dead_letter_event_id=str(uuid.uuid4()),
            original_topic=original_topic,
            original_key=key,
            original_event=self._serialize_event(original_event),
            failure_reason=str(error),
            failure_type=type(error).__name__,
            retry_attempts=retry_attempts,
            first_failure_time=datetime.utcnow().isoformat(),
            dead_letter_time=datetime.utcnow().isoformat(),
            correlation_id=correlation_id_var.get(),
            stack_trace=traceback.format_exc(),
            system_info=self._get_system_info()
        )
        
        try:
            # Send to dead letter topic
            result = await self.producer.send_event(
                topic=self.dead_letter_topic,
                event=asdict(dead_letter_event),
                key=key
            )
            
            if self.metrics_service:
                self.metrics_service.increment_dead_letter_success(original_topic)
            
            self.logger.error(
                "Event sent to dead letter topic",
                dead_letter_event_id=dead_letter_event.dead_letter_event_id,
                original_topic=original_topic,
                original_key=key,
                failure_reason=dead_letter_event.failure_reason,
                retry_attempts=retry_attempts
            )
            
        except Exception as dead_letter_error:
            if self.metrics_service:
                self.metrics_service.increment_dead_letter_failure(original_topic)
            
            self.logger.critical(
                "Failed to send event to dead letter topic",
                original_topic=original_topic,
                original_key=key,
                dead_letter_error=str(dead_letter_error)
            )
            
            # Emergency fallback: write to file
            await self._write_to_filesystem(dead_letter_event)
    
    def _serialize_event(self, event: Any) -> Dict[str, Any]:
        """Serialize event for dead letter storage"""
        if hasattr(event, '__dict__'):
            return event.__dict__
        elif hasattr(event, '__dataclass_fields__'):
            return asdict(event)
        elif isinstance(event, dict):
            return event
        else:
            return {"serialized_value": str(event)}
    
    def _get_system_info(self) -> Dict[str, str]:
        """Get system information for debugging"""
        import platform
        import os
        
        return {
            "hostname": platform.node(),
            "python_version": platform.python_version(),
            "platform": platform.platform(),
            "pid": str(os.getpid())
        }
    
    async def _write_to_filesystem(self, dead_letter_event: DeadLetterEvent):
        """Emergency fallback: write to local filesystem"""
        try:
            import aiofiles
            import os
            
            dead_letter_dir = "/var/log/dead-letters"
            os.makedirs(dead_letter_dir, exist_ok=True)
            
            filename = f"dead-letter-{dead_letter_event.dead_letter_event_id}.json"
            filepath = os.path.join(dead_letter_dir, filename)
            
            async with aiofiles.open(filepath, 'w') as f:
                await f.write(json.dumps(asdict(dead_letter_event), indent=2))
            
            self.logger.warning(
                "Dead letter event written to filesystem",
                filepath=filepath
            )
            
        except Exception as fs_error:
            self.logger.critical(
                "Emergency filesystem write failed",
                error=str(fs_error)
            )

class DeadLetterRecoveryService:
    def __init__(self, producer, consumer, database):
        self.producer = producer
        self.consumer = consumer
        self.database = database
        self.logger = logger.bind(component="DeadLetterRecoveryService")
    
    async def handle_dead_letter_event(self, dead_letter_event: Dict[str, Any]):
        """Handle incoming dead letter events"""
        # Store in database for analysis
        await self.database.store_dead_letter_event(dead_letter_event)
        
        self.logger.info(
            "Dead letter event stored for analysis",
            dead_letter_event_id=dead_letter_event.get('dead_letter_event_id'),
            original_topic=dead_letter_event.get('original_topic')
        )
    
    async def retry_recoverable_events(self):
        """Periodically retry recoverable events"""
        recoverable_events = await self.database.get_recoverable_events()
        
        for event in recoverable_events:
            if await self._is_recoverable(event):
                try:
                    # Attempt republish
                    await self.producer.send_event(
                        topic=event['original_topic'],
                        event=event['original_event'],
                        key=event['original_key']
                    )
                    
                    # Mark as recovery attempted
                    await self.database.mark_recovery_attempted(
                        event['dead_letter_event_id']
                    )
                    
                    self.logger.info(
                        "Dead letter event recovery successful",
                        dead_letter_event_id=event['dead_letter_event_id']
                    )
                    
                except Exception as recovery_error:
                    self.logger.warning(
                        "Dead letter recovery failed",
                        dead_letter_event_id=event['dead_letter_event_id'],
                        error=str(recovery_error)
                    )
    
    async def _is_recoverable(self, event: Dict[str, Any]) -> bool:
        """Determine if event is recoverable"""
        # Business rules for recovery
        return (
            not event.get('recovery_attempted', False) and
            event.get('retry_attempts', 0) < 5 and
            datetime.fromisoformat(event['dead_letter_time']) > 
            datetime.utcnow() - timedelta(hours=24)
        )
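
The Spring variant schedules recovery via @Scheduled; on the Python side a simple periodic background task can do the same. A sketch, assuming the service runs inside an asyncio application:

# Minimal sketch: run the recovery pass every 5 minutes as a background task
async def run_dead_letter_recovery_loop(recovery_service: DeadLetterRecoveryService,
                                        interval_seconds: int = 300):
    while True:
        try:
            await recovery_service.retry_recoverable_events()
        except Exception as loop_error:
            logger.warning("Dead letter recovery pass failed", error=str(loop_error))
        await asyncio.sleep(interval_seconds)

# Started during application startup, for example:
# asyncio.create_task(run_dead_letter_recovery_loop(recovery_service))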

25.3 Circuit Breaker Implementation

The circuit breaker pattern prevents cascading failures by isolating failing services. For event publishing, it protects against broker outages and reduces system load during disruptions.

25.3.1 Spring Boot Circuit Breaker

// Spring Boot circuit breaker with Resilience4j
// pom.xml dependency
/*
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.7.1</version>
</dependency>
*/

// Circuit Breaker Configuration
// (the configuration class gets its own name to avoid clashing with
//  io.github.resilience4j.circuitbreaker.CircuitBreakerConfig used inside)
@Configuration
public class EventCircuitBreakerConfiguration {
    
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)                    // Open at 50% failure rate
            .waitDurationInOpenState(Duration.ofSeconds(30))  // Wait 30s in open state
            .slidingWindowSize(10)                       // Consider last 10 calls
            .minimumNumberOfCalls(5)                     // Minimum 5 calls before evaluation
            .permittedNumberOfCallsInHalfOpenState(3)    // 3 trial calls in half-open state
            .automaticTransitionFromOpenToHalfOpenEnabled(true)
            .build();
        
        return CircuitBreakerRegistry.of(config);
    }
    
    @Bean
    public CircuitBreaker eventPublishingCircuitBreaker(CircuitBreakerRegistry registry) {
        return registry.circuitBreaker("eventPublishing");
    }
}

// application.yml
/*
resilience4j:
  circuitbreaker:
    instances:
      eventPublishing:
        failure-rate-threshold: 50
        wait-duration-in-open-state: 30s
        sliding-window-size: 10
        minimum-number-of-calls: 5
        permitted-number-of-calls-in-half-open-state: 3
        automatic-transition-from-open-to-half-open-enabled: true
        record-exceptions:
          - org.apache.kafka.common.errors.TimeoutException
          - org.apache.kafka.common.errors.RetriableException
        ignore-exceptions:
          - org.apache.kafka.common.errors.SerializationException
  metrics:
    enabled: true
  events:
    enabled: true
*/

// Circuit Breaker Event Publisher
@Component
@Slf4j
public class CircuitBreakerEventPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final CircuitBreaker circuitBreaker;
    private final EventLoggingService eventLoggingService;
    private final DeadLetterService deadLetterService;
    
    public CircuitBreakerEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
                                      CircuitBreakerRegistry circuitBreakerRegistry,
                                      EventLoggingService eventLoggingService,
                                      DeadLetterService deadLetterService) {
        this.kafkaTemplate = kafkaTemplate;
        this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("eventPublishing");
        this.eventLoggingService = eventLoggingService;
        this.deadLetterService = deadLetterService;
        
        // Register event listeners
        registerCircuitBreakerEventListeners();
    }
    
    public CompletableFuture<SendResult<String, Object>> publishWithCircuitBreaker(
            String topic, String key, Object event) {
        
        // decorateCompletionStage records the asynchronous outcome of the Kafka send and
        // surfaces CallNotPermittedException as a failed future when the breaker is OPEN
        Supplier<CompletionStage<SendResult<String, Object>>> publishSupplier =
            CircuitBreaker.decorateCompletionStage(circuitBreaker,
                () -> kafkaTemplate.send(topic, key, event).completable());
        
        return publishSupplier.get()
            .exceptionally(throwable -> {
                Throwable cause = throwable instanceof CompletionException
                    ? throwable.getCause() : throwable;
                
                // Handle circuit breaker open state
                if (cause instanceof CallNotPermittedException) {
                    log.warn("Circuit breaker is OPEN, event publishing blocked",
                            kv("topic", topic),
                            kv("key", key),
                            kv("eventType", event.getClass().getSimpleName()));
                    
                    // Send to dead letter or queue for later
                    handleCircuitBreakerOpen(topic, key, event);
                    return null;
                }
                
                log.error("Event publishing failed", cause);
                throw new CompletionException(cause);
            })
            .toCompletableFuture();
    }
    
    private void handleCircuitBreakerOpen(String topic, String key, Object event) {
        // Strategy 1: Send to dead letter topic
        deadLetterService.sendToDeadLetter(topic, key, event, 
                                         new CircuitBreakerOpenException("Circuit breaker is open"), 0);
        
        // Strategy 2: Queue for later retry (alternative)
        // queueForLaterRetry(topic, key, event);
    }
    
    private void registerCircuitBreakerEventListeners() {
        circuitBreaker.getEventPublisher()
            .onStateTransition(event -> {
                log.info("Circuit breaker state transition",
                        kv("from", event.getStateTransition().getFromState()),
                        kv("to", event.getStateTransition().getToState()),
                        kv("circuitBreaker", "eventPublishing"));
            })
            .onFailureRateExceeded(event -> {
                log.warn("Circuit breaker failure rate exceeded",
                        kv("failureRate", event.getFailureRate()),
                        kv("threshold", circuitBreaker.getCircuitBreakerConfig().getFailureRateThreshold()));
            })
            .onCallNotPermitted(event -> {
                log.warn("Circuit breaker call not permitted - OPEN state");
            });
    }
    
    // Health check endpoint for circuit breaker state
    @Component
    public static class CircuitBreakerHealthIndicator implements HealthIndicator {
        
        private final CircuitBreaker circuitBreaker;
        
        public CircuitBreakerHealthIndicator(CircuitBreakerRegistry registry) {
            this.circuitBreaker = registry.circuitBreaker("eventPublishing");
        }
        
        @Override
        public Health health() {
            CircuitBreaker.State state = circuitBreaker.getState();
            
            switch (state) {
                case CLOSED:
                    return Health.up()
                        .withDetail("circuitBreaker", "CLOSED")
                        .withDetail("failureRate", circuitBreaker.getMetrics().getFailureRate())
                        .build();
                case OPEN:
                    return Health.down()
                        .withDetail("circuitBreaker", "OPEN")
                        .withDetail("failureRate", circuitBreaker.getMetrics().getFailureRate())
                        .build();
                case HALF_OPEN:
                    return Health.up()
                        .withDetail("circuitBreaker", "HALF_OPEN")
                        .withDetail("failureRate", circuitBreaker.getMetrics().getFailureRate())
                        .build();
                default:
                    return Health.unknown()
                        .withDetail("circuitBreaker", state.toString())
                        .build();
            }
        }
    }
    
    public static class CircuitBreakerOpenException extends RuntimeException {
        public CircuitBreakerOpenException(String message) {
            super(message);
        }
    }
}

25.3.2 Python Circuit Breaker Implementation

# Python Circuit Breaker Implementation
import asyncio
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
import structlog

logger = structlog.get_logger()

class CircuitBreakerState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5           # Failures before opening
    failure_rate_threshold: float = 0.5  # 50% failure rate
    timeout_seconds: int = 30            # Time to wait in OPEN state
    success_threshold: int = 3           # Successes to close from HALF_OPEN
    sliding_window_size: int = 10        # Size of sliding window

@dataclass 
class CircuitBreakerMetrics:
    total_calls: int = 0
    success_calls: int = 0
    failure_calls: int = 0
    last_failure_time: Optional[float] = None
    last_success_time: Optional[float] = None
    consecutive_failures: int = 0
    consecutive_successes: int = 0
    call_times: list = field(default_factory=list)
    
    @property
    def failure_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return self.failure_calls / self.total_calls

class CircuitBreakerOpenException(Exception):
    pass

class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = CircuitBreakerState.CLOSED
        self.metrics = CircuitBreakerMetrics()
        self.logger = logger.bind(component="CircuitBreaker")
        self._lock = asyncio.Lock()
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        async with self._lock:
            if self.state == CircuitBreakerState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitBreakerState.HALF_OPEN
                    self.logger.info("Circuit breaker transitioning to HALF_OPEN")
                else:
                    self.logger.warning("Circuit breaker is OPEN, call not permitted")
                    raise CircuitBreakerOpenException("Circuit breaker is OPEN")
        
        start_time = time.time()
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            
            # Record success
            await self._record_success(time.time() - start_time)
            return result
            
        except Exception as e:
            # Record failure
            await self._record_failure(time.time() - start_time)
            raise e
    
    async def _record_success(self, call_duration: float):
        """Record successful call"""
        async with self._lock:
            self.metrics.total_calls += 1
            self.metrics.success_calls += 1
            self.metrics.consecutive_successes += 1
            self.metrics.consecutive_failures = 0
            self.metrics.last_success_time = time.time()
            
            # Maintain sliding window
            self._update_sliding_window(call_duration, success=True)
            
            # State transitions on success
            if self.state == CircuitBreakerState.HALF_OPEN:
                if self.metrics.consecutive_successes >= self.config.success_threshold:
                    self.state = CircuitBreakerState.CLOSED
                    self.logger.info("Circuit breaker CLOSED after successful calls")
    
    async def _record_failure(self, call_duration: float):
        """Record failed call"""
        async with self._lock:
            self.metrics.total_calls += 1
            self.metrics.failure_calls += 1
            self.metrics.consecutive_failures += 1
            self.metrics.consecutive_successes = 0
            self.metrics.last_failure_time = time.time()
            
            # Maintain sliding window
            self._update_sliding_window(call_duration, success=False)
            
            # State transitions on failure
            if self.state == CircuitBreakerState.CLOSED:
                if self._should_open_circuit():
                    self.state = CircuitBreakerState.OPEN
                    self.logger.warning(
                        "Circuit breaker OPENED",
                        failure_rate=self.metrics.failure_rate,
                        consecutive_failures=self.metrics.consecutive_failures
                    )
            elif self.state == CircuitBreakerState.HALF_OPEN:
                self.state = CircuitBreakerState.OPEN
                self.logger.warning("Circuit breaker back to OPEN from HALF_OPEN")
    
    def _should_open_circuit(self) -> bool:
        """Determine if circuit should be opened"""
        return (
            self.metrics.consecutive_failures >= self.config.failure_threshold or
            (self.metrics.total_calls >= self.config.sliding_window_size and
             self.metrics.failure_rate >= self.config.failure_rate_threshold)
        )
    
    def _should_attempt_reset(self) -> bool:
        """Determine if we should attempt to reset from OPEN state"""
        if not self.metrics.last_failure_time:
            return True
        
        return (time.time() - self.metrics.last_failure_time) >= self.config.timeout_seconds
    
    def _update_sliding_window(self, call_duration: float, success: bool):
        """Update sliding window for metrics"""
        self.metrics.call_times.append({
            'timestamp': time.time(),
            'duration': call_duration,
            'success': success
        })
        
        # Keep only recent calls within sliding window
        cutoff_time = time.time() - (self.config.sliding_window_size * 60)  # Assume 1 minute per window slot
        self.metrics.call_times = [
            call for call in self.metrics.call_times 
            if call['timestamp'] > cutoff_time
        ]
    
    def get_state(self) -> CircuitBreakerState:
        """Get current circuit breaker state"""
        return self.state
    
    def get_metrics(self) -> CircuitBreakerMetrics:
        """Get current metrics"""
        return self.metrics

class CircuitBreakerEventPublisher:
    def __init__(self, producer, dead_letter_service=None):
        self.producer = producer
        self.dead_letter_service = dead_letter_service
        self.logger = logger.bind(component="CircuitBreakerEventPublisher")
        
        # Create circuit breaker
        config = CircuitBreakerConfig(
            failure_threshold=5,
            failure_rate_threshold=0.5,
            timeout_seconds=30,
            success_threshold=3
        )
        self.circuit_breaker = CircuitBreaker(config)
    
    async def publish_with_circuit_breaker(self, topic: str, key: str, event: Any) -> dict:
        """Publish event with circuit breaker protection"""
        try:
            result = await self.circuit_breaker.call(
                self.producer.send_event,
                topic=topic,
                event=event,
                key=key
            )
            
            self.logger.debug(
                "Event published successfully with circuit breaker",
                topic=topic,
                key=key,
                circuit_state=self.circuit_breaker.get_state().value
            )
            
            return result
            
        except CircuitBreakerOpenException:
            self.logger.warning(
                "Circuit breaker is OPEN, handling event",
                topic=topic,
                key=key
            )
            
            # Handle circuit breaker open state
            await self._handle_circuit_breaker_open(topic, key, event)
            return {"status": "circuit_breaker_open"}
            
        except Exception as e:
            self.logger.error(
                "Event publishing failed through circuit breaker",
                topic=topic,
                key=key,
                error=str(e),
                circuit_state=self.circuit_breaker.get_state().value
            )
            raise
    
    async def _handle_circuit_breaker_open(self, topic: str, key: str, event: Any):
        """Handle circuit breaker open state"""
        if self.dead_letter_service:
            # Send to dead letter topic
            await self.dead_letter_service.send_to_dead_letter(
                original_topic=topic,
                key=key,
                original_event=event,
                error=CircuitBreakerOpenException("Circuit breaker is open"),
                retry_attempts=0
            )
        else:
            # Log for manual recovery
            self.logger.critical(
                "Circuit breaker open and no dead letter service configured",
                topic=topic,
                key=key,
                event=event
            )
    
    def get_health_status(self) -> dict:
        """Get health status including circuit breaker state"""
        state = self.circuit_breaker.get_state()
        metrics = self.circuit_breaker.get_metrics()
        
        return {
            "circuit_breaker_state": state.value,
            "failure_rate": metrics.failure_rate,
            "total_calls": metrics.total_calls,
            "consecutive_failures": metrics.consecutive_failures,
            "healthy": state != CircuitBreakerState.OPEN
        }
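
Wiring it together, a sketch assuming an async Kafka producer wrapper with a send_event coroutine (KafkaEventProducer is a hypothetical name) and the DeadLetterService from section 25.2.2:

# Minimal sketch: wiring producer, dead letter service, and circuit breaker publisher
async def main():
    producer = KafkaEventProducer(bootstrap_servers="localhost:9092")  # hypothetical wrapper
    dead_letter_service = DeadLetterService(producer)
    publisher = CircuitBreakerEventPublisher(producer, dead_letter_service)

    result = await publisher.publish_with_circuit_breaker(
        topic="orders",
        key="order-4711",
        event={"event_type": "OrderCreatedEvent", "order_id": "order-4711"}
    )
    print(result, publisher.get_health_status())

# asyncio.run(main())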

This comprehensive implementation of logging, error handling, and the circuit breaker pattern demonstrates production-ready approaches for robust event producers. The combination of structured logging, dead letter topics, and the circuit breaker pattern provides a high degree of observability and fault tolerance in event-driven systems.

