Produktive Event-Producer benötigen umfassende Observability, robuste Fehlerbehandlung und intelligente Retry-Mechanismen. Diese operationellen Aspekte sind entscheidend für die Wartbarkeit, Debugging-Fähigkeit und Ausfallsicherheit von Event-Driven Systemen.
Structured Logging ermöglicht maschinelle Auswertung von Log-Daten und erleichtert das Debugging komplexer Event-Flows. Correlation IDs und strukturierte Datenformate sind essentiell für das Tracing von Events durch verteilte Systeme.
// Logback Konfiguration für Structured Logging
// logback-spring.xml
/*
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<springProfile name="!local">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp/>
<logLevel/>
<loggerName/>
<message/>
<mdc/>
<arguments/>
<stackTrace/>
</providers>
</encoder>
</appender>
</springProfile>
<springProfile name="local">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level [%X{correlationId:-}] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
</springProfile>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
<!-- Event-specific logging -->
<logger name="de.eda.training.events" level="DEBUG"/>
<logger name="org.springframework.kafka" level="INFO"/>
</configuration>
*/
// Event Logging Service
//
// Emits structured log entries (logstash-logback key/value arguments) and
// records Micrometer metrics for every stage of the event-publishing
// lifecycle. Correlation tracking is handled through the SLF4J MDC.
@Component
@Slf4j
public class EventLoggingService {

    private final MeterRegistry meterRegistry;
    private final Timer eventPublishLatency;

    public EventLoggingService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // The latency timer carries no per-event tags, so a single
        // pre-registered instance suffices.
        this.eventPublishLatency = Timer.builder("events.publish.duration")
            .description("Event publishing duration")
            .register(meterRegistry);
    }

    /**
     * Marks the start of a publish attempt and seeds the MDC for correlation
     * tracking; the MDC is cleared again by the success/failure loggers.
     */
    public void logEventPublishStart(String eventType, String businessKey, String correlationId) {
        // Set MDC for correlation tracking
        MDC.put("correlationId", correlationId);
        MDC.put("eventType", eventType);
        MDC.put("businessKey", businessKey);
        log.info("Event publishing started",
            kv("eventType", eventType),
            kv("businessKey", businessKey),
            kv("correlationId", correlationId),
            kv("stage", "publish_start"));
    }

    /** Records success metrics (tagged counter + latency) and logs broker metadata. */
    public void logEventPublishSuccess(String eventType, String businessKey,
                                       RecordMetadata metadata, Duration duration) {
        // BUGFIX: Counter.increment() accepts only a double amount, not Tags.
        // Tagged counters must be resolved through the MeterRegistry, which
        // caches one counter per unique tag combination.
        meterRegistry.counter("events.published",
                Tags.of("eventType", eventType, "topic", metadata.topic()))
            .increment();
        eventPublishLatency.record(duration);
        log.info("Event published successfully",
            kv("eventType", eventType),
            kv("businessKey", businessKey),
            kv("topic", metadata.topic()),
            kv("partition", metadata.partition()),
            kv("offset", metadata.offset()),
            kv("timestamp", metadata.timestamp()),
            kv("duration_ms", duration.toMillis()),
            kv("stage", "publish_success"));
        // Clear MDC set by logEventPublishStart
        MDC.clear();
    }

    /** Records a tagged failure counter and logs the error with full stack trace. */
    public void logEventPublishFailure(String eventType, String businessKey,
                                       String topic, Exception error, Duration duration) {
        // BUGFIX: same registry-based tagged counter resolution as above.
        meterRegistry.counter("events.failed",
                Tags.of("eventType", eventType, "topic", topic,
                        "errorType", error.getClass().getSimpleName()))
            .increment();
        log.error("Event publishing failed",
            kv("eventType", eventType),
            kv("businessKey", businessKey),
            kv("topic", topic),
            kv("duration_ms", duration.toMillis()),
            kv("errorType", error.getClass().getSimpleName()),
            kv("errorMessage", error.getMessage()),
            kv("stage", "publish_failure"),
            error);
        // Clear MDC set by logEventPublishStart
        MDC.clear();
    }

    /** Logs a retry attempt with backoff information (no metrics recorded here). */
    public void logRetryAttempt(String eventType, String businessKey, int attempt,
                                int maxAttempts, Duration nextRetryDelay) {
        log.warn("Event publishing retry attempt",
            kv("eventType", eventType),
            kv("businessKey", businessKey),
            kv("attempt", attempt),
            kv("maxAttempts", maxAttempts),
            kv("nextRetryDelay_ms", nextRetryDelay.toMillis()),
            kv("stage", "retry_attempt"));
    }

    /** Logs the hand-off of an event to the dead letter topic. */
    public void logDeadLetterSend(String eventType, String businessKey,
                                  String originalTopic, String deadLetterTopic, String reason) {
        log.error("Event sent to dead letter topic",
            kv("eventType", eventType),
            kv("businessKey", businessKey),
            kv("originalTopic", originalTopic),
            kv("deadLetterTopic", deadLetterTopic),
            kv("reason", reason),
            kv("stage", "dead_letter"));
    }

    // Utility method for structured key-value logging
    private static KeyValue kv(String key, Object value) {
        return KeyValue.of(key, value);
    }
}
// Correlation ID Filter für Web Requests
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class CorrelationIdFilter implements Filter {

    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    private static final String CORRELATION_ID_MDC_KEY = "correlationId";

    /**
     * Propagates (or creates) the X-Correlation-ID for each request: the id is
     * stored in the MDC for logging and echoed back on the response header.
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;
        // Get or generate correlation ID
        String correlationId = httpRequest.getHeader(CORRELATION_ID_HEADER);
        if (correlationId == null || correlationId.trim().isEmpty()) {
            correlationId = UUID.randomUUID().toString();
        }
        // Set in MDC and response header
        MDC.put(CORRELATION_ID_MDC_KEY, correlationId);
        httpResponse.setHeader(CORRELATION_ID_HEADER, correlationId);
        try {
            chain.doFilter(request, response);
        } finally {
            // BUGFIX: remove only this filter's key instead of MDC.clear(),
            // which would also wipe MDC entries installed by earlier filters
            // on the same thread.
            MDC.remove(CORRELATION_ID_MDC_KEY);
        }
    }
}
// Event Tracing Aspect
@Aspect
@Component
@Slf4j
public class EventTracingAspect {

    private final EventLoggingService eventLoggingService;

    // BUGFIX: the final field was never initialized — without this constructor
    // (or Lombok's @RequiredArgsConstructor) the class does not compile.
    public EventTracingAspect(EventLoggingService eventLoggingService) {
        this.eventLoggingService = eventLoggingService;
    }

    /**
     * Wraps methods annotated with {@code @EventTrace}: logs publish start,
     * success (only when the method returns a SendResult) and failure,
     * including call duration.
     */
    @Around("@annotation(EventTrace)")
    public Object traceEventPublishing(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        Object[] args = joinPoint.getArgs();
        // Extract event information from method arguments (by naming convention)
        String eventType = extractEventType(args);
        String businessKey = extractBusinessKey(args);
        String correlationId = MDC.get("correlationId");
        eventLoggingService.logEventPublishStart(eventType, businessKey, correlationId);
        try {
            Object result = joinPoint.proceed();
            Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
            // Log success only when the result carries Kafka record metadata
            if (result instanceof SendResult) {
                @SuppressWarnings("unchecked")
                SendResult<String, Object> sendResult = (SendResult<String, Object>) result;
                eventLoggingService.logEventPublishSuccess(eventType, businessKey,
                    sendResult.getRecordMetadata(), duration);
            }
            return result;
        } catch (Exception e) {
            Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
            // The target topic is not known at this pointcut level
            eventLoggingService.logEventPublishFailure(eventType, businessKey,
                "unknown", e, duration);
            throw e;
        }
    }

    /** Returns the class name of the first argument whose type name ends in "Event". */
    private String extractEventType(Object[] args) {
        for (Object arg : args) {
            if (arg != null && arg.getClass().getSimpleName().endsWith("Event")) {
                return arg.getClass().getSimpleName();
            }
        }
        return "UnknownEvent";
    }

    /** Uses the first String argument as the business key (order ID, customer ID, etc.). */
    private String extractBusinessKey(Object[] args) {
        for (Object arg : args) {
            if (arg instanceof String) {
                return (String) arg;
            }
        }
        return "unknown";
    }
}
// Event Trace Annotation
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface EventTrace {
}
# Python Structured Logging mit structlog
import contextvars
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import structlog
from prometheus_client import Counter, Histogram, generate_latest
# Correlation ID Context Variable
# BUGFIX: the default is None, so the value type is Optional[str] — the
# original annotation ContextVar[str] contradicted the declared default.
correlation_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    'correlation_id', default=None
)
# Configure structlog
# Global pipeline: every log call is enriched with contextvars (e.g. the
# correlation id), a log level and optional stack info, then rendered as a
# single JSON line to stdout.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # pulls correlation_id etc. from contextvars
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.dev.set_exc_info,                # honors exc_info=True kwargs
        structlog.processors.JSONRenderer()        # final output format: one JSON object per line
    ],
    # Drop sub-INFO entries before the processor chain runs (cheap early filter).
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)
# Metrics
# Module-level Prometheus instruments shared by all EventLoggingService
# instances. The counter is labelled by outcome ('success'/'failure'), so a
# single metric covers both paths.
event_published_counter = Counter(
    'events_published_total',
    'Total events published',
    ['event_type', 'topic', 'status']
)
# Publish latency histogram (seconds), labelled per event type and topic.
event_publish_duration = Histogram(
    'event_publish_duration_seconds',
    'Event publishing duration',
    ['event_type', 'topic']
)
class EventLoggingService:
    """Structured logging plus Prometheus metrics for the event lifecycle.

    Mirrors the Java EventLoggingService: one method per lifecycle stage
    (start, success, failure, retry, dead letter), each emitting a JSON log
    line enriched with the current correlation id from ``correlation_id_var``.
    """

    def __init__(self):
        self.logger = structlog.get_logger()

    def log_event_publish_start(self, event_type: str, business_key: str,
                                correlation_id: Optional[str] = None) -> str:
        """Log the start of event publishing.

        Generates a correlation id when none is supplied, stores it in the
        context variable for downstream log calls, and returns it.
        """
        if not correlation_id:
            correlation_id = str(uuid.uuid4())
        # Set correlation ID in context so all later stages can pick it up
        correlation_id_var.set(correlation_id)
        self.logger.info(
            "Event publishing started",
            event_type=event_type,
            business_key=business_key,
            correlation_id=correlation_id,
            stage="publish_start",
            # BUGFIX: datetime.utcnow() is deprecated (3.12+) and yields naive
            # timestamps; emit an explicit timezone-aware UTC timestamp instead.
            timestamp=datetime.now(timezone.utc).isoformat()
        )
        return correlation_id

    def log_event_publish_success(self, event_type: str, business_key: str,
                                  topic: str, partition: int, offset: int,
                                  duration_seconds: float):
        """Log successful event publishing and record success metrics."""
        # Record metrics
        event_published_counter.labels(
            event_type=event_type,
            topic=topic,
            status='success'
        ).inc()
        event_publish_duration.labels(
            event_type=event_type,
            topic=topic
        ).observe(duration_seconds)
        self.logger.info(
            "Event published successfully",
            event_type=event_type,
            business_key=business_key,
            topic=topic,
            partition=partition,
            offset=offset,
            duration_seconds=duration_seconds,
            stage="publish_success",
            correlation_id=correlation_id_var.get()
        )

    def log_event_publish_failure(self, event_type: str, business_key: str,
                                  topic: str, error: Exception,
                                  duration_seconds: float):
        """Log failed event publishing and count the failure."""
        # Same counter as success, distinguished by the 'status' label
        event_published_counter.labels(
            event_type=event_type,
            topic=topic,
            status='failure'
        ).inc()
        self.logger.error(
            "Event publishing failed",
            event_type=event_type,
            business_key=business_key,
            topic=topic,
            duration_seconds=duration_seconds,
            error_type=type(error).__name__,
            error_message=str(error),
            stage="publish_failure",
            correlation_id=correlation_id_var.get(),
            exc_info=True
        )

    def log_retry_attempt(self, event_type: str, business_key: str,
                          attempt: int, max_attempts: int,
                          next_retry_delay_seconds: float):
        """Log a retry attempt together with its backoff delay."""
        self.logger.warning(
            "Event publishing retry attempt",
            event_type=event_type,
            business_key=business_key,
            attempt=attempt,
            max_attempts=max_attempts,
            next_retry_delay_seconds=next_retry_delay_seconds,
            stage="retry_attempt",
            correlation_id=correlation_id_var.get()
        )

    def log_dead_letter_send(self, event_type: str, business_key: str,
                             original_topic: str, dead_letter_topic: str,
                             reason: str):
        """Log the hand-off of an event to the dead letter topic."""
        self.logger.error(
            "Event sent to dead letter topic",
            event_type=event_type,
            business_key=business_key,
            original_topic=original_topic,
            dead_letter_topic=dead_letter_topic,
            reason=reason,
            stage="dead_letter",
            correlation_id=correlation_id_var.get()
        )
# Event Tracing Decorator
import functools
import asyncio
def event_trace(func):
    """Decorator for tracing event publishing methods.

    Wraps sync and async callables alike: logs publish start before the call,
    then success (only when the result is a dict carrying broker metadata,
    i.e. a 'topic' key) or failure afterwards. Exceptions are re-raised.
    """
    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start_time = time.time()
        logging_service = EventLoggingService()
        # Extract event info from arguments (by naming convention)
        event_type = _extract_event_type(args, kwargs)
        business_key = _extract_business_key(args, kwargs)
        # The generated correlation id is stored in correlation_id_var by the
        # service; the original's unused local binding was dropped.
        logging_service.log_event_publish_start(event_type, business_key)
        try:
            # This wrapper is only returned for coroutine functions (see the
            # dispatch below), so the original's redundant inner
            # iscoroutinefunction re-check was removed.
            result = await func(*args, **kwargs)
            duration = time.time() - start_time
            # Only dict results with broker metadata count as publish success
            if isinstance(result, dict) and 'topic' in result:
                logging_service.log_event_publish_success(
                    event_type, business_key,
                    result['topic'],
                    result.get('partition', -1),
                    result.get('offset', -1),
                    duration
                )
            return result
        except Exception as e:
            duration = time.time() - start_time
            # Target topic is unknown at this decorator level
            logging_service.log_event_publish_failure(
                event_type, business_key, "unknown", e, duration
            )
            raise

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        # Synchronous twin of async_wrapper
        start_time = time.time()
        logging_service = EventLoggingService()
        event_type = _extract_event_type(args, kwargs)
        business_key = _extract_business_key(args, kwargs)
        logging_service.log_event_publish_start(event_type, business_key)
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            if isinstance(result, dict) and 'topic' in result:
                logging_service.log_event_publish_success(
                    event_type, business_key,
                    result['topic'],
                    result.get('partition', -1),
                    result.get('offset', -1),
                    duration
                )
            return result
        except Exception as e:
            duration = time.time() - start_time
            logging_service.log_event_publish_failure(
                event_type, business_key, "unknown", e, duration
            )
            raise

    # Return the wrapper matching the wrapped function's kind
    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper
def _extract_event_type(args, kwargs) -> str:
"""Extract event type from function arguments"""
# Look for event objects in arguments
for arg in args:
if hasattr(arg, '__class__') and arg.__class__.__name__.endswith('Event'):
return arg.__class__.__name__
# Look in kwargs
for key, value in kwargs.items():
if key == 'event_type':
return str(value)
if hasattr(value, '__class__') and value.__class__.__name__.endswith('Event'):
return value.__class__.__name__
return "UnknownEvent"
def _extract_business_key(args, kwargs) -> str:
"""Extract business key from function arguments"""
# Look for common business key names
business_key_names = ['order_id', 'customer_id', 'business_key', 'key']
for key, value in kwargs.items():
if key in business_key_names:
return str(value)
# Look for string arguments that might be business keys
for arg in args:
if isinstance(arg, str) and len(arg) > 0:
return arg
return "unknown"
# Correlation ID Middleware for FastAPI
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
class CorrelationIdMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Get or generate correlation ID
correlation_id = request.headers.get('x-correlation-id')
if not correlation_id:
correlation_id = str(uuid.uuid4())
# Set in context variable
correlation_id_var.set(correlation_id)
# Process request
response = await call_next(request)
# Add correlation ID to response
response.headers['x-correlation-id'] = correlation_id
        return response
Dead Letter Topics fangen Events auf, die nicht erfolgreich verarbeitet werden können. Sie sind essentiell für das Management von problematischen Events und ermöglichen manuelle Nachbearbeitung.
// Dead Letter Topic Configuration
@Configuration
public class DeadLetterTopicConfig {

    /** Feature flag: when disabled, no dead-letter topic bean is created. */
    @Value("${app.events.dead-letter.enabled:true}")
    private boolean deadLetterEnabled;

    /** Retention for dead-letter events in hours; default 168 = 7 days. */
    @Value("${app.events.dead-letter.retention-hours:168}")
    private int retentionHours;

    @Bean
    public NewTopic createDeadLetterTopics(EventProperties eventProperties) {
        if (!deadLetterEnabled) {
            // NOTE(review): returning null from a @Bean method suppresses the
            // bean but is discouraged; @ConditionalOnProperty would be the
            // cleaner mechanism here.
            return null;
        }
        // BUGFIX: compute retention in long arithmetic — the original int
        // expression (hours * 60 * 60 * 1000) overflows for values above
        // roughly 596 hours.
        long retentionMs = retentionHours * 3_600_000L;
        return TopicBuilder.name("dead-letter-events")
            .partitions(3)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
            .build();
    }
}
// Dead Letter Service
@Service
@Slf4j
public class DeadLetterService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final EventLoggingService eventLoggingService;
private final DeadLetterMetrics deadLetterMetrics;
private static final String DEAD_LETTER_TOPIC = "dead-letter-events";
public void sendToDeadLetter(String originalTopic, String key, Object originalEvent,
Exception error, int retryAttempts) {
DeadLetterEvent deadLetterEvent = DeadLetterEvent.builder()
.deadLetterEventId(UUID.randomUUID().toString())
.originalTopic(originalTopic)
.originalKey(key)
.originalEvent(originalEvent)
.failureReason(error.getMessage())
.failureType(error.getClass().getSimpleName())
.retryAttempts(retryAttempts)
.firstFailureTime(Instant.now())
.deadLetterTime(Instant.now())
.correlationId(MDC.get("correlationId"))
.stackTrace(ExceptionUtils.getStackTrace(error))
.build();
try {
kafkaTemplate.send(DEAD_LETTER_TOPIC, key, deadLetterEvent)
.addCallback(
result -> {
deadLetterMetrics.incrementDeadLetterSuccess(originalTopic);
eventLoggingService.logDeadLetterSend(
extractEventType(originalEvent),
key,
originalTopic,
DEAD_LETTER_TOPIC,
error.getMessage()
);
},
failure -> {
deadLetterMetrics.incrementDeadLetterFailure(originalTopic);
log.error("Failed to send event to dead letter topic", failure);
// Fallback: log to file or database
logToFileSystem(deadLetterEvent);
}
);
} catch (Exception e) {
log.error("Critical failure: Cannot send to dead letter topic", e);
// Emergency fallback
logToFileSystem(deadLetterEvent);
}
}
private void logToFileSystem(DeadLetterEvent event) {
// Emergency fallback: write to local file
try {
String filename = String.format("dead-letter-%s.json",
event.getDeadLetterEventId());
String json = new ObjectMapper().writeValueAsString(event);
Files.write(Paths.get("/var/log/dead-letters", filename),
json.getBytes(), StandardOpenOption.CREATE);
} catch (Exception e) {
log.error("Emergency dead letter logging failed", e);
}
}
@Data
@Builder
public static class DeadLetterEvent {
private String deadLetterEventId;
private String originalTopic;
private String originalKey;
private Object originalEvent;
private String failureReason;
private String failureType;
private int retryAttempts;
private Instant firstFailureTime;
private Instant deadLetterTime;
private String correlationId;
private String stackTrace;
// Additional metadata for debugging
private Map<String, String> systemInfo = Map.of(
"hostname", getHostname(),
"applicationVersion", getApplicationVersion(),
"javaVersion", System.getProperty("java.version")
);
}
private String extractEventType(Object event) {
return event != null ? event.getClass().getSimpleName() : "Unknown";
}
private static String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
return "unknown";
}
}
private static String getApplicationVersion() {
return DeadLetterService.class.getPackage().getImplementationVersion();
}
}
// Dead Letter Recovery Service
@Service
public class DeadLetterRecoveryService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final DeadLetterRepository deadLetterRepository;
    /**
     * Consumes envelopes from the dead-letter topic and persists them for
     * offline analysis and later automated recovery.
     * NOTE(review): convertToEntity(...) is not visible in this file — confirm
     * it exists on this class or a collaborator.
     */
    @KafkaListener(topics = "dead-letter-events")
    public void handleDeadLetterEvent(DeadLetterEvent deadLetterEvent) {
        // Store dead letter event for later analysis
        deadLetterRepository.save(convertToEntity(deadLetterEvent));
    }
    /**
     * Scheduled sweep: republishes dead-letter events that still qualify for
     * automatic recovery (see isRecoverable) and marks them as attempted.
     * NOTE(review): this class uses {@code log} but carries no @Slf4j
     * annotation and declares no logger — confirm a logger is in scope.
     * NOTE(review): send() is asynchronous; the event is marked as recovered
     * even though the broker may still reject it later.
     */
    @Scheduled(fixedDelay = 300000) // Every 5 minutes
    public void retryRecoverableEvents() {
        List<DeadLetterEntity> recoverableEvents =
            deadLetterRepository.findRecoverableEvents();
        for (DeadLetterEntity deadLetter : recoverableEvents) {
            if (isRecoverable(deadLetter)) {
                try {
                    // Attempt to republish to the original topic with the original key
                    kafkaTemplate.send(deadLetter.getOriginalTopic(),
                        deadLetter.getOriginalKey(),
                        deadLetter.getOriginalEvent());
                    // Record the attempt so the event is not retried again
                    deadLetter.setRecoveryAttempted(true);
                    deadLetter.setRecoveryTime(Instant.now());
                    deadLetterRepository.save(deadLetter);
                } catch (Exception e) {
                    log.warn("Dead letter recovery failed", e);
                }
            }
        }
    }
private boolean isRecoverable(DeadLetterEntity deadLetter) {
// Define business rules for when events can be recovered
return !deadLetter.isRecoveryAttempted() &&
deadLetter.getRetryAttempts() < 5 &&
deadLetter.getDeadLetterTime().isAfter(
Instant.now().minus(Duration.ofHours(24))
);
}
}
# Python Dead Letter Service
import json
import traceback
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional
import structlog
logger = structlog.get_logger()
@dataclass
class DeadLetterEvent:
    """Envelope persisted to the dead-letter topic for a failed event."""
    dead_letter_event_id: str  # unique id of this dead-letter record
    original_topic: str
    original_key: str
    original_event: Dict[str, Any]  # serialized payload of the failed event
    failure_reason: str  # str(error) of the triggering exception
    failure_type: str  # exception class name
    retry_attempts: int  # attempts made before dead-lettering
    first_failure_time: str  # ISO-8601 timestamp
    dead_letter_time: str  # ISO-8601 timestamp
    correlation_id: Optional[str]  # propagated from correlation_id_var, if any
    stack_trace: str
    system_info: Dict[str, str]  # hostname / python version / platform / pid
class DeadLetterService:
def __init__(self, producer, metrics_service=None):
self.producer = producer
self.metrics_service = metrics_service
self.logger = logger.bind(component="DeadLetterService")
self.dead_letter_topic = "dead-letter-events"
async def send_to_dead_letter(self, original_topic: str, key: str,
original_event: Any, error: Exception,
retry_attempts: int):
"""Send event to dead letter topic"""
dead_letter_event = DeadLetterEvent(
dead_letter_event_id=str(uuid.uuid4()),
original_topic=original_topic,
original_key=key,
original_event=self._serialize_event(original_event),
failure_reason=str(error),
failure_type=type(error).__name__,
retry_attempts=retry_attempts,
first_failure_time=datetime.utcnow().isoformat(),
dead_letter_time=datetime.utcnow().isoformat(),
correlation_id=correlation_id_var.get(),
stack_trace=traceback.format_exc(),
system_info=self._get_system_info()
)
try:
# Send to dead letter topic
result = await self.producer.send_event(
topic=self.dead_letter_topic,
event=asdict(dead_letter_event),
key=key
)
if self.metrics_service:
self.metrics_service.increment_dead_letter_success(original_topic)
self.logger.error(
"Event sent to dead letter topic",
dead_letter_event_id=dead_letter_event.dead_letter_event_id,
original_topic=original_topic,
original_key=key,
failure_reason=dead_letter_event.failure_reason,
retry_attempts=retry_attempts
)
except Exception as dead_letter_error:
if self.metrics_service:
self.metrics_service.increment_dead_letter_failure(original_topic)
self.logger.critical(
"Failed to send event to dead letter topic",
original_topic=original_topic,
original_key=key,
dead_letter_error=str(dead_letter_error)
)
# Emergency fallback: write to file
await self._write_to_filesystem(dead_letter_event)
def _serialize_event(self, event: Any) -> Dict[str, Any]:
"""Serialize event for dead letter storage"""
if hasattr(event, '__dict__'):
return event.__dict__
elif hasattr(event, '__dataclass_fields__'):
return asdict(event)
elif isinstance(event, dict):
return event
else:
return {"serialized_value": str(event)}
def _get_system_info(self) -> Dict[str, str]:
"""Get system information for debugging"""
import platform
import os
return {
"hostname": platform.node(),
"python_version": platform.python_version(),
"platform": platform.platform(),
"pid": str(os.getpid())
}
async def _write_to_filesystem(self, dead_letter_event: DeadLetterEvent):
"""Emergency fallback: write to local filesystem"""
try:
import aiofiles
import os
dead_letter_dir = "/var/log/dead-letters"
os.makedirs(dead_letter_dir, exist_ok=True)
filename = f"dead-letter-{dead_letter_event.dead_letter_event_id}.json"
filepath = os.path.join(dead_letter_dir, filename)
async with aiofiles.open(filepath, 'w') as f:
await f.write(json.dumps(asdict(dead_letter_event), indent=2))
self.logger.warning(
"Dead letter event written to filesystem",
filepath=filepath
)
except Exception as fs_error:
self.logger.critical(
"Emergency filesystem write failed",
error=str(fs_error)
)
class DeadLetterRecoveryService:
    def __init__(self, producer, consumer, database):
        """Wire the recovery service to its collaborators.

        producer/consumer are Kafka-style clients; database stores dead-letter
        events for analysis and recovery bookkeeping.
        """
        self.producer = producer
        self.consumer = consumer
        self.database = database
        # Component-scoped logger so every entry carries this service's name
        self.logger = logger.bind(component="DeadLetterRecoveryService")
    async def handle_dead_letter_event(self, dead_letter_event: Dict[str, Any]):
        """Handle incoming dead letter events"""
        # Store in database for analysis; the dict shape is whatever
        # DeadLetterService produced via asdict() on its envelope.
        await self.database.store_dead_letter_event(dead_letter_event)
        self.logger.info(
            "Dead letter event stored for analysis",
            dead_letter_event_id=dead_letter_event.get('dead_letter_event_id'),
            original_topic=dead_letter_event.get('original_topic')
        )
    async def retry_recoverable_events(self):
        """Periodically retry recoverable events"""
        # Candidate set is pre-filtered by the database query; _is_recoverable
        # applies the final per-event business rules.
        recoverable_events = await self.database.get_recoverable_events()
        for event in recoverable_events:
            if await self._is_recoverable(event):
                try:
                    # Attempt republish to the original topic with the original key
                    await self.producer.send_event(
                        topic=event['original_topic'],
                        event=event['original_event'],
                        key=event['original_key']
                    )
                    # Mark as recovery attempted so it is not retried again
                    await self.database.mark_recovery_attempted(
                        event['dead_letter_event_id']
                    )
                    self.logger.info(
                        "Dead letter event recovery successful",
                        dead_letter_event_id=event['dead_letter_event_id']
                    )
                except Exception as recovery_error:
                    # Failure is logged only; the event stays eligible until
                    # recovery_attempted is set or it ages past the cutoff.
                    self.logger.warning(
                        "Dead letter recovery failed",
                        dead_letter_event_id=event['dead_letter_event_id'],
                        error=str(recovery_error)
                    )
async def _is_recoverable(self, event: Dict[str, Any]) -> bool:
"""Determine if event is recoverable"""
# Business rules for recovery
return (
not event.get('recovery_attempted', False) and
event.get('retry_attempts', 0) < 5 and
datetime.fromisoformat(event['dead_letter_time']) >
datetime.utcnow() - timedelta(hours=24)
        )
Circuit Breaker Pattern verhindert kaskadierende Ausfälle, indem es fehlerhafte Services isoliert. Bei Event-Produktion schützt es vor Broker-Ausfällen und reduziert die Systemlast während Störungen.
// Spring Boot Circuit Breaker mit Resilience4j
// pom.xml dependency
/*
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.7.1</version>
</dependency>
*/
// Circuit Breaker Configuration
@Configuration
public class CircuitBreakerConfig {
@Bean
public CircuitBreaker eventPublishingCircuitBreaker() {
return CircuitBreaker.ofDefaults("eventPublishing");
}
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 50% failure rate
.waitDurationInOpenState(Duration.ofSeconds(30)) // Wait 30s in open state
.slidingWindowSize(10) // Consider last 10 calls
.minimumNumberOfCalls(5) // Minimum 5 calls before evaluation
.permittedNumberOfCallsInHalfOpenState(3) // 3 calls in half-open state
.automaticTransitionFromOpenToHalfOpenEnabled(true)
.build();
return CircuitBreakerRegistry.of(config);
}
}
// application.yml
/*
resilience4j:
circuitbreaker:
instances:
eventPublishing:
failure-rate-threshold: 50
wait-duration-in-open-state: 30s
sliding-window-size: 10
minimum-number-of-calls: 5
permitted-number-of-calls-in-half-open-state: 3
automatic-transition-from-open-to-half-open-enabled: true
record-exceptions:
- org.apache.kafka.common.errors.TimeoutException
- org.apache.kafka.common.errors.RetriableException
ignore-exceptions:
- org.apache.kafka.common.errors.SerializationException
metrics:
enabled: true
events:
enabled: true
*/
// Circuit Breaker Event Publisher
@Component
@Slf4j
public class CircuitBreakerEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final CircuitBreaker circuitBreaker;
private final EventLoggingService eventLoggingService;
private final DeadLetterService deadLetterService;
    /**
     * Wires the publisher to Kafka and resolves the shared "eventPublishing"
     * circuit breaker from the registry so its configuration stays centralized.
     */
    public CircuitBreakerEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
                                        CircuitBreakerRegistry circuitBreakerRegistry,
                                        EventLoggingService eventLoggingService,
                                        DeadLetterService deadLetterService) {
        this.kafkaTemplate = kafkaTemplate;
        this.circuitBreaker = circuitBreakerRegistry.circuitBreaker("eventPublishing");
        this.eventLoggingService = eventLoggingService;
        this.deadLetterService = deadLetterService;
        // Register event listeners for state-transition observability
        registerCircuitBreakerEventListeners();
    }
public CompletableFuture<SendResult<String, Object>> publishWithCircuitBreaker(
String topic, String key, Object event) {
// Wrap Kafka publishing with circuit breaker
Supplier<CompletableFuture<SendResult<String, Object>>> publishSupplier =
CircuitBreaker.decorateSupplier(circuitBreaker, () -> {
return kafkaTemplate.send(topic, key, event).completable();
});
return publishSupplier.get()
.exceptionally(throwable -> {
// Handle circuit breaker open state
if (throwable instanceof CallNotPermittedException) {
log.warn("Circuit breaker is OPEN, event publishing blocked",
kv("topic", topic),
kv("key", key),
kv("eventType", event.getClass().getSimpleName()));
// Send to dead letter or queue for later
handleCircuitBreakerOpen(topic, key, event);
return null;
}
log.error("Event publishing failed", throwable);
throw new CompletionException(throwable);
});
}
    /**
     * Fallback for events rejected while the breaker is OPEN: currently
     * diverts them to the dead letter topic with zero prior retry attempts.
     */
    private void handleCircuitBreakerOpen(String topic, String key, Object event) {
        // Strategy 1: Send to dead letter topic
        deadLetterService.sendToDeadLetter(topic, key, event,
            new CircuitBreakerOpenException("Circuit breaker is open"), 0);
        // Strategy 2: Queue for later retry (alternative)
        // queueForLaterRetry(topic, key, event);
    }
private void registerCircuitBreakerEventListeners() {
circuitBreaker.getEventPublisher()
.onStateTransition(event -> {
log.info("Circuit breaker state transition",
kv("from", event.getStateTransition().getFromState()),
kv("to", event.getStateTransition().getToState()),
kv("circuitBreaker", "eventPublishing"));
})
.onFailureRateExceeded(event -> {
log.warn("Circuit breaker failure rate exceeded",
kv("failureRate", event.getFailureRate()),
kv("threshold", circuitBreaker.getCircuitBreakerConfig().getFailureRateThreshold()));
})
.onCallNotPermitted(event -> {
log.warn("Circuit breaker call not permitted - OPEN state");
});
}
// Health check endpoint for circuit breaker state
@Component
public static class CircuitBreakerHealthIndicator implements HealthIndicator {
private final CircuitBreaker circuitBreaker;
public CircuitBreakerHealthIndicator(CircuitBreakerRegistry registry) {
this.circuitBreaker = registry.circuitBreaker("eventPublishing");
}
@Override
public Health health() {
CircuitBreaker.State state = circuitBreaker.getState();
switch (state) {
case CLOSED:
return Health.up()
.withDetail("circuitBreaker", "CLOSED")
.withDetail("failureRate", circuitBreaker.getMetrics().getFailureRate())
.build();
case OPEN:
return Health.down()
.withDetail("circuitBreaker", "OPEN")
.withDetail("failureRate", circuitBreaker.getMetrics().getFailureRate())
.build();
case HALF_OPEN:
return Health.up()
.withDetail("circuitBreaker", "HALF_OPEN")
.withDetail("failureRate", circuitBreaker.getMetrics().getFailureRate())
.build();
default:
return Health.unknown()
.withDetail("circuitBreaker", state.toString())
.build();
}
}
}
    /** Thrown/dead-lettered as the failure reason when the breaker blocks a publish. */
    public static class CircuitBreakerOpenException extends RuntimeException {
        public CircuitBreakerOpenException(String message) {
            super(message);
        }
    }
}
# Python Circuit Breaker Implementation
import asyncio
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
import structlog
logger = structlog.get_logger()
class CircuitBreakerState(Enum):
    """The three canonical circuit breaker states."""
    CLOSED = "CLOSED"        # normal operation, calls pass through
    OPEN = "OPEN"            # failing fast, calls rejected
    HALF_OPEN = "HALF_OPEN"  # probing: limited calls allowed to test recovery
@dataclass
class CircuitBreakerConfig:
    """Tuning knobs for CircuitBreaker (mirrors resilience4j-style settings)."""
    failure_threshold: int = 5  # Failures before opening
    failure_rate_threshold: float = 0.5  # 50% failure rate
    timeout_seconds: int = 30  # Time to wait in OPEN state
    success_threshold: int = 3  # Successes to close from HALF_OPEN
    sliding_window_size: int = 10  # Size of sliding window
@dataclass
class CircuitBreakerMetrics:
    """Mutable call statistics, maintained by CircuitBreaker under its lock."""
    total_calls: int = 0
    success_calls: int = 0
    failure_calls: int = 0
    last_failure_time: Optional[float] = None  # time.time() of last failure
    last_success_time: Optional[float] = None  # time.time() of last success
    consecutive_failures: int = 0
    consecutive_successes: int = 0
    call_times: list = field(default_factory=list)  # sliding window of recent calls

    @property
    def failure_rate(self) -> float:
        # Lifetime failure ratio; 0.0 before any call has been recorded.
        if self.total_calls == 0:
            return 0.0
        return self.failure_calls / self.total_calls
class CircuitBreakerOpenException(Exception):
    """Raised when a call is rejected because the breaker is OPEN."""
    pass
class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig):
        """Create a breaker in CLOSED state with zeroed metrics."""
        self.config = config
        self.state = CircuitBreakerState.CLOSED
        self.metrics = CircuitBreakerMetrics()
        self.logger = logger.bind(component="CircuitBreaker")
        # Guards state + metrics. asyncio.Lock is NOT reentrant, so the
        # _record_* helpers must never be awaited while this lock is held.
        self._lock = asyncio.Lock()
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        # State check/transition happens under the lock; the protected call
        # itself must run OUTSIDE the lock — the _record_* helpers re-acquire
        # it, and asyncio.Lock is not reentrant.
        # NOTE(review): indentation of the try block relative to the lock is
        # ambiguous in the source layout — confirm the call is not executed
        # while the lock is held (that would deadlock on _record_success).
        async with self._lock:
            if self.state == CircuitBreakerState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitBreakerState.HALF_OPEN
                    self.logger.info("Circuit breaker transitioning to HALF_OPEN")
                else:
                    self.logger.warning("Circuit breaker is OPEN, call not permitted")
                    raise CircuitBreakerOpenException("Circuit breaker is OPEN")
        start_time = time.time()
        try:
            # Support both coroutine functions and plain callables
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            # Record success
            await self._record_success(time.time() - start_time)
            return result
        except Exception as e:
            # Record failure
            await self._record_failure(time.time() - start_time)
            raise e
async def _record_success(self, call_duration: float):
"""Record successful call"""
async with self._lock:
self.metrics.total_calls += 1
self.metrics.success_calls += 1
self.metrics.consecutive_successes += 1
self.metrics.consecutive_failures = 0
self.metrics.last_success_time = time.time()
# Maintain sliding window
self._update_sliding_window(call_duration, success=True)
# State transitions on success
if self.state == CircuitBreakerState.HALF_OPEN:
if self.metrics.consecutive_successes >= self.config.success_threshold:
self.state = CircuitBreakerState.CLOSED
self.logger.info("Circuit breaker CLOSED after successful calls")
async def _record_failure(self, call_duration: float):
"""Record failed call"""
async with self._lock:
self.metrics.total_calls += 1
self.metrics.failure_calls += 1
self.metrics.consecutive_failures += 1
self.metrics.consecutive_successes = 0
self.metrics.last_failure_time = time.time()
# Maintain sliding window
self._update_sliding_window(call_duration, success=False)
# State transitions on failure
if self.state == CircuitBreakerState.CLOSED:
if self._should_open_circuit():
self.state = CircuitBreakerState.OPEN
self.logger.warning(
"Circuit breaker OPENED",
failure_rate=self.metrics.failure_rate,
consecutive_failures=self.metrics.consecutive_failures
)
elif self.state == CircuitBreakerState.HALF_OPEN:
self.state = CircuitBreakerState.OPEN
self.logger.warning("Circuit breaker back to OPEN from HALF_OPEN")
def _should_open_circuit(self) -> bool:
"""Determine if circuit should be opened"""
return (
self.metrics.consecutive_failures >= self.config.failure_threshold or
(self.metrics.total_calls >= self.config.sliding_window_size and
self.metrics.failure_rate >= self.config.failure_rate_threshold)
)
def _should_attempt_reset(self) -> bool:
"""Determine if we should attempt to reset from OPEN state"""
if not self.metrics.last_failure_time:
return True
return (time.time() - self.metrics.last_failure_time) >= self.config.timeout_seconds
def _update_sliding_window(self, call_duration: float, success: bool):
"""Update sliding window for metrics"""
self.metrics.call_times.append({
'timestamp': time.time(),
'duration': call_duration,
'success': success
})
# Keep only recent calls within sliding window
cutoff_time = time.time() - (self.config.sliding_window_size * 60) # Assume 1 minute per window slot
self.metrics.call_times = [
call for call in self.metrics.call_times
if call['timestamp'] > cutoff_time
]
def get_state(self) -> CircuitBreakerState:
"""Get current circuit breaker state"""
return self.state
def get_metrics(self) -> CircuitBreakerMetrics:
"""Get current metrics"""
return self.metrics
class CircuitBreakerEventPublisher:
    """Event publisher wrapped in a CircuitBreaker.

    While the breaker is OPEN, events are routed to an optional dead-letter
    service instead of failing outright; without one they are only logged.
    """

    def __init__(self, producer, dead_letter_service=None):
        # producer is expected to expose an awaitable
        # send_event(topic=..., event=..., key=...) -- TODO confirm against
        # the concrete producer implementation.
        self.producer = producer
        self.dead_letter_service = dead_letter_service
        self.logger = logger.bind(component="CircuitBreakerEventPublisher")
        # Create circuit breaker
        config = CircuitBreakerConfig(
            failure_threshold=5,
            failure_rate_threshold=0.5,
            timeout_seconds=30,
            success_threshold=3
        )
        self.circuit_breaker = CircuitBreaker(config)

    async def publish_with_circuit_breaker(self, topic: str, key: str, event: Any) -> dict:
        """Publish event with circuit breaker protection.

        Returns the producer result on success, or a
        {"status": "circuit_breaker_open"} marker when the breaker rejects
        the call. Any other publish error is logged and re-raised.
        """
        try:
            result = await self.circuit_breaker.call(
                self.producer.send_event,
                topic=topic,
                event=event,
                key=key
            )
            self.logger.debug(
                "Event published successfully with circuit breaker",
                topic=topic,
                key=key,
                circuit_state=self.circuit_breaker.get_state().value
            )
            return result
        except CircuitBreakerOpenException:
            self.logger.warning(
                "Circuit breaker is OPEN, handling event",
                topic=topic,
                key=key
            )
            # Handle circuit breaker open state
            await self._handle_circuit_breaker_open(topic, key, event)
            return {"status": "circuit_breaker_open"}
        except Exception as e:
            self.logger.error(
                "Event publishing failed through circuit breaker",
                topic=topic,
                key=key,
                error=str(e),
                circuit_state=self.circuit_breaker.get_state().value
            )
            raise

    async def _handle_circuit_breaker_open(self, topic: str, key: str, event: Any):
        """Route a rejected event to the dead-letter service, or log for manual recovery."""
        if self.dead_letter_service:
            # Send to dead letter topic
            await self.dead_letter_service.send_to_dead_letter(
                original_topic=topic,
                key=key,
                original_event=event,
                error=CircuitBreakerOpenException("Circuit breaker is open"),
                retry_attempts=0
            )
        else:
            # Log for manual recovery
            self.logger.critical(
                "Circuit breaker open and no dead letter service configured",
                topic=topic,
                key=key,
                event=event
            )

    def get_health_status(self) -> dict:
        """Get health status including circuit breaker state."""
        state = self.circuit_breaker.get_state()
        metrics = self.circuit_breaker.get_metrics()
        return {
            "circuit_breaker_state": state.value,
            "failure_rate": metrics.failure_rate,
            "total_calls": metrics.total_calls,
            "consecutive_failures": metrics.consecutive_failures,
            "healthy": state != CircuitBreakerState.OPEN
        }

Diese umfassende Implementierung von Logging, Error Handling und Circuit Breaker Pattern zeigt production-ready Ansätze für robuste Event-Producer. Die Kombination aus strukturiertem Logging, Dead Letter Topics und Circuit Breaker Pattern bietet maximale Observability und Ausfallsicherheit in Event-Driven Systemen.
Status-Tracking: ✅ Kapitel “Logging, Fehlerbehandlung und Retry-Verhalten im Producer” erstellt - Verwendet Standards v1.0 - [Datum] - Fokus auf Production-Ready Observability und Resilience Patterns