65 Logging Strategies in Event-Driven Systems

Event-driven architecture places special demands on logging. Unlike in classic request-response systems, processing flows span multiple services and events that can occur at staggered points in time. An effective logging setup must make these asynchronous, distributed flows traceable.

65.1 Structured Logging

65.1.1 JSON-Based Log Structure

Structured logging is indispensable in EDA, because events and services carry context-rich information that needs to be captured in structured form:

Spring Boot Logback configuration (the JSON encoder comes from the logstash-logback-encoder library, which must be on the classpath):

<!-- logback-spring.xml -->
<configuration>
    <springProfile name="!local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers>
                    <timestamp/>
                    <logLevel/>
                    <loggerName/>
                    <message/>
                    <mdc/>
                    <arguments/>
                    <stackTrace/>
                </providers>
            </encoder>
        </appender>
    </springProfile>
    
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

Event-specific logging in Java:

import static net.logstash.logback.argument.StructuredArguments.kv;

@Service
public class OrderEventLogger {
    
    private static final Logger log = LoggerFactory.getLogger(OrderEventLogger.class);
    
    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        MDC.put("event.type", "OrderPlaced");
        MDC.put("event.id", event.getEventId());
        MDC.put("order.id", event.getOrderId());
        MDC.put("customer.id", event.getCustomerId());
        MDC.put("order.amount", event.getTotalAmount().toString());
        
        log.info("Order placed successfully", 
            kv("business.context", "order-processing"),
            kv("order.items.count", event.getItems().size()),
            kv("order.payment.method", event.getPaymentMethod()));
        
        MDC.clear();
    }
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(@Payload OrderPlaced order, 
                                  @Headers Map<String, Object> headers) {
        
        // Log the Kafka-specific metadata
        MDC.put("kafka.topic", (String) headers.get("kafka_receivedTopic"));
        MDC.put("kafka.partition", headers.get("kafka_receivedPartitionId").toString());
        MDC.put("kafka.offset", headers.get("kafka_offset").toString());
        
        try {
            processOrder(order);
            
            log.info("Order processing completed",
                kv("processing.duration.ms", getDurationMs()),
                kv("processing.result", "success"));
                
        } catch (Exception e) {
            log.error("Order processing failed", 
                kv("error.type", e.getClass().getSimpleName()),
                kv("error.message", e.getMessage()),
                e);
        } finally {
            MDC.clear();
        }
    }
}

Python Structured Logging:

import logging
import json
import time
from typing import Dict, Any
from contextvars import ContextVar

# Context for request/event correlation
request_context: ContextVar[Dict[str, Any]] = ContextVar('request_context', default={})

class StructuredFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # Add context information
        context = request_context.get({})
        if context:
            log_entry['context'] = context
            
        # Exception information
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
            
        # Custom attributes from the LogRecord
        for key, value in record.__dict__.items():
            if key.startswith('event_') or key.startswith('business_'):
                log_entry[key] = value
                
        return json.dumps(log_entry, default=str)

class OrderProcessor:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
    async def process_order_event(self, order_data: dict, kafka_metadata: dict):
        # Set the event context
        context = {
            'event_type': 'OrderPlaced',
            'event_id': order_data.get('eventId'),
            'order_id': order_data.get('orderId'),
            'customer_id': order_data.get('customerId'),
            'kafka_topic': kafka_metadata.get('topic'),
            'kafka_partition': kafka_metadata.get('partition'),
            'kafka_offset': kafka_metadata.get('offset')
        }
        request_context.set(context)
        
        start_time = time.time()
        
        try:
            self.logger.info(
                "Starting order processing",
                extra={
                    'business_event': 'order_processing_started',
                    'event_order_amount': order_data.get('totalAmount'),
                    'event_items_count': len(order_data.get('items', []))
                }
            )
            
            await self.validate_order(order_data)
            await self.reserve_inventory(order_data)
            await self.process_payment(order_data)
            
            processing_duration = time.time() - start_time
            
            self.logger.info(
                "Order processing completed successfully",
                extra={
                    'business_event': 'order_processing_completed',
                    'event_processing_duration_ms': processing_duration * 1000,
                    'event_result': 'success'
                }
            )
            
        except ValidationError as e:  # assumed domain-specific exception type
            self.logger.warning(
                "Order validation failed",
                extra={
                    'business_event': 'order_validation_failed',
                    'event_error_type': 'validation_error',
                    'event_error_details': str(e)
                }
            )
            raise
            
        except Exception as e:
            self.logger.error(
                "Order processing failed",
                extra={
                    'business_event': 'order_processing_failed',
                    'event_error_type': type(e).__name__,
                    'event_error_message': str(e)
                },
                exc_info=True
            )
            raise
        finally:
            request_context.set({})
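
To make these entries actually reach stdout, the StructuredFormatter has to be attached to a handler. A minimal sketch of the wiring (where you place this setup code is up to your application):

import logging

# Attach the StructuredFormatter defined above to a stdout handler
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)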

65.1.2 Event Schema Logging

For event schema evolution and debugging, you should log event structures systematically:

@Component
public class EventSchemaLogger {
    
    private final ObjectMapper objectMapper;
    private final Logger log = LoggerFactory.getLogger(EventSchemaLogger.class);
    
    public EventSchemaLogger(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }
    
    @EventListener
    public void onAnyEvent(ApplicationEvent event) {
        if (event instanceof DomainEvent) {
            DomainEvent domainEvent = (DomainEvent) event;
            
            try {
                JsonNode eventSchema = objectMapper.valueToTree(domainEvent);
                
                log.debug("Event schema captured",
                    kv("event.type", event.getClass().getSimpleName()),
                    kv("event.schema.fields", getFieldNames(eventSchema)),
                    kv("event.schema.version", domainEvent.getVersion()),
                    kv("event.schema.hash", calculateSchemaHash(eventSchema)));
                    
            } catch (Exception e) {
                log.warn("Failed to capture event schema", e);
            }
        }
    }
    
    private List<String> getFieldNames(JsonNode node) {
        List<String> fields = new ArrayList<>();
        node.fieldNames().forEachRemaining(fields::add);
        return fields;
    }
    
    private String calculateSchemaHash(JsonNode node) {
        // Stable fingerprint over the sorted field names; enough to
        // detect structural drift between event versions
        List<String> fields = getFieldNames(node);
        Collections.sort(fields);
        return Integer.toHexString(fields.hashCode());
    }
}

65.2 Correlation Across Services

65.2.1 Distributed Tracing Integration

In event-driven systems, you have to follow traces across service boundaries:

Spring Boot with Micrometer Tracing:

@Service
public class TracedEventProcessor {
    
    private final Tracer tracer;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public TracedEventProcessor(Tracer tracer, KafkaTemplate<String, Object> kafkaTemplate) {
        this.tracer = tracer;
        this.kafkaTemplate = kafkaTemplate;
    }
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(@Payload OrderPlaced order,
                                  @Headers Map<String, Object> headers) {
        
        // Build the span for this processing step
        Span.Builder spanBuilder = tracer.spanBuilder()
                .name("order-processing")
                .tag("service.name", "payment-service")
                .tag("event.type", "OrderPlaced")
                .tag("order.id", order.getOrderId());
                
        // Link to the parent span if a trace context was propagated
        String traceId = (String) headers.get("X-Trace-Id");
        String spanId = (String) headers.get("X-Span-Id");
        
        if (traceId != null && spanId != null) {
            TraceContext parentContext = tracer.traceContextBuilder()
                    .traceId(traceId)
                    .spanId(spanId)
                    .sampled(true)
                    .build();
            spanBuilder.setParent(parentContext);
        }
        
        Span span = spanBuilder.start();
        
        try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
            MDC.put("trace.id", span.context().traceId());
            MDC.put("span.id", span.context().spanId());
            
            processPayment(order);
            
            // Publish the next event with the trace context attached
            PaymentProcessed paymentEvent = createPaymentEvent(order);
            publishWithTraceContext(paymentEvent, span.context());
            
        } catch (Exception e) {
            span.tag("error", "true");
            span.error(e);
            throw e;
        } finally {
            span.end();
            MDC.clear();
        }
    }
    
    private void publishWithTraceContext(PaymentProcessed event, TraceContext context) {
        ProducerRecord<String, Object> record = new ProducerRecord<>(
                "payment.processed.v1", 
                event.getOrderId(), 
                event);
                
        // Trace and span IDs are hex strings in Micrometer Tracing; for
        // interoperability, prefer a Propagator with W3C traceparent headers
        record.headers().add("X-Trace-Id", context.traceId().getBytes());
        record.headers().add("X-Span-Id", context.spanId().getBytes());
                
        kafkaTemplate.send(record);
    }
}

Python correlation with OpenTelemetry:

from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import logging
import json

class CorrelatedEventProcessor:
    def __init__(self):
        self.tracer = trace.get_tracer(__name__)
        self.logger = logging.getLogger(__name__)
        self.propagator = TraceContextTextMapPropagator()
        # self.kafka_producer (an async producer, e.g. from aiokafka) is
        # assumed to be created or injected elsewhere
        
    async def process_order_event(self, message_value: bytes, message_headers: dict):
        # Extract the trace context from the Kafka headers
        carrier = {}
        for key, value in message_headers.items():
            if isinstance(value, bytes):
                carrier[key] = value.decode('utf-8')
            else:
                carrier[key] = str(value)
                
        # Restore the parent context
        parent_context = self.propagator.extract(carrier)
        
        with self.tracer.start_as_current_span(
            "order-event-processing",
            context=parent_context,
            attributes={
                "service.name": "payment-processor",
                "messaging.system": "kafka",
                "messaging.destination": "order.placed.v1"
            }
        ) as span:
            
            # Put the correlation IDs into the logging context
            trace_id = format(span.get_span_context().trace_id, '032x')
            span_id = format(span.get_span_context().span_id, '016x')
            
            # request_context is the ContextVar from the structured-logging
            # example above
            request_context.set({
                'trace_id': trace_id,
                'span_id': span_id,
                'service': 'payment-processor'
            })
            
            try:
                order_data = json.loads(message_value)
                
                self.logger.info(
                    "Processing order event",
                    extra={
                        'business_event': 'order_received',
                        'event_order_id': order_data.get('orderId')
                    }
                )
                
                result = await self.process_payment(order_data)
                
                # Publish the next event with the trace context
                await self.publish_payment_processed(result, span.get_span_context())
                
                span.set_attribute("processing.result", "success")
                
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                self.logger.error(
                    "Order processing failed",
                    extra={'business_event': 'order_processing_failed'},
                    exc_info=True
                )
                raise
                
    async def publish_payment_processed(self, payment_result: dict, span_context):
        # Propagate the trace context to the next service. The propagator
        # reads the active span from a Context object, so wrap the
        # SpanContext in a NonRecordingSpan first.
        carrier = {}
        ctx = trace.set_span_in_context(trace.NonRecordingSpan(span_context))
        self.propagator.inject(carrier, context=ctx)
        
        headers = {}
        for key, value in carrier.items():
            headers[key] = value.encode('utf-8')
            
        await self.kafka_producer.send(
            topic='payment.processed.v1',
            value=json.dumps(payment_result),
            headers=headers
        )
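
How the processor is fed depends on the Kafka client. With aiokafka, for example, headers arrive as a list of (key, bytes) tuples and must be converted into the dict that process_order_event expects; a sketch under that assumption (topic and bootstrap servers as in the examples above):

from aiokafka import AIOKafkaConsumer

async def consume(processor: CorrelatedEventProcessor):
    consumer = AIOKafkaConsumer(
        "order.placed.v1",
        bootstrap_servers="kafka-1:9092")
    await consumer.start()
    try:
        async for message in consumer:
            # aiokafka delivers headers as [(key, bytes), ...]
            headers = {key: value for key, value in (message.headers or [])}
            await processor.process_order_event(message.value, headers)
    finally:
        await consumer.stop()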

65.2.2 Business Process Correlation

Alongside technical trace correlation, you should also use business process IDs:

@Component
public class BusinessProcessLogger {
    
    private final Logger log = LoggerFactory.getLogger(BusinessProcessLogger.class);
    
    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        String processId = "order-fulfillment-" + event.getOrderId();
        
        MDC.put("business.process.id", processId);
        MDC.put("business.process.stage", "order-placed");
        MDC.put("business.process.customer", event.getCustomerId());
        
        log.info("Business process started: Order fulfillment",
            kv("process.expected.stages", List.of("order-placed", "payment-processed", "inventory-reserved", "shipped")),
            kv("process.sla.minutes", 60));
    }
    
    @EventListener  
    public void onPaymentProcessed(PaymentProcessed event) {
        String processId = "order-fulfillment-" + event.getOrderId();
        
        MDC.put("business.process.id", processId);
        MDC.put("business.process.stage", "payment-processed");
        
        log.info("Business process progressed: Payment completed");
    }
    
    @EventListener
    public void onOrderShipped(OrderShipped event) {
        String processId = "order-fulfillment-" + event.getOrderId();
        
        MDC.put("business.process.id", processId);
        MDC.put("business.process.stage", "shipped");
        MDC.put("business.process.completed", true);
        
        log.info("Business process completed: Order fulfillment",
            kv("process.duration.minutes", calculateProcessDuration(event.getOrderId())));
    }
}

65.3 Log Aggregation

65.3.1 ELK Stack Integration

Logstash configuration for event logs:

# logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    topics => ["application-logs"]
    codec => "json"
  }
}

filter {
  # Extract event-specific fields
  if [message] =~ /Order placed successfully/ {
    mutate {
      add_field => { "log_category" => "business_event" }
      add_field => { "business_event_type" => "order_placed" }
    }
  }
  
  # Trace ID for correlation
  if [context][trace_id] {
    mutate {
      add_field => { "trace_id" => "%{[context][trace_id]}" }
    }
  }
  
  # Error categorization
  if [level] == "ERROR" {
    if [event_error_type] == "ValidationError" {
      mutate {
        add_field => { "error_category" => "business_validation" }
        add_field => { "error_severity" => "medium" }
      }
    } else {
      mutate {
        add_field => { "error_category" => "technical" }
        add_field => { "error_severity" => "high" }
      }
    }
  }
  
  # Extract performance metrics
  if [event_processing_duration_ms] {
    ruby {
      code => "
        duration = event.get('event_processing_duration_ms')
        if duration && duration.to_f > 5000
          event.set('performance_issue', true)
        end
      "
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "eda-logs-%{+YYYY.MM.dd}"
    template_name => "eda-logs"
    template => "/etc/logstash/templates/eda-logs-template.json"
  }
}

Elasticsearch Index Template:

{
  "index_patterns": ["eda-logs-*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "message": { "type": "text" },
      "service": { "type": "keyword" },
      "trace_id": { "type": "keyword" },
      "business_process_id": { "type": "keyword" },
      "event_type": { "type": "keyword" },
      "order_id": { "type": "keyword" },
      "customer_id": { "type": "keyword" },
      "event_processing_duration_ms": { "type": "float" },
      "error_category": { "type": "keyword" },
      "performance_issue": { "type": "boolean" }
    }
  }
}
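
The template can be registered via the REST API or, as a convenience, with the Python Elasticsearch client. A minimal sketch (host and file path are assumptions; put_template targets the legacy template API used above):

import json
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Register the template so that new eda-logs-* indices pick up the mappings
with open("eda-logs-template.json") as f:
    es.indices.put_template(name="eda-logs", body=json.load(f))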

65.3.2 Log Analytics and Queries

Business Process Tracking (the query below is a heuristic; a more robust variant follows it):

# Kibana Query: Incomplete Business Processes
{
  "query": {
    "bool": {
      "must": [
        { "term": { "business_process_stage": "order-placed" } },
        { "range": { "timestamp": { "gte": "now-1h" } } }
      ],
      "must_not": [
        { "term": { "business_process_completed": true } }
      ]
    }
  },
  "aggs": {
    "incomplete_processes": {
      "terms": {
        "field": "business_process_id",
        "size": 100
      }
    }
  }
}
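
Note that this query is only a heuristic: the "order-placed" entry and the completion flag live in separate log documents, so a single bool query cannot reliably detect incomplete processes. A more robust variant compares the set of started process IDs against the completed ones; a minimal sketch with the Python Elasticsearch client (index and field names as in the template above):

from elasticsearch import Elasticsearch

def find_incomplete_processes(es: Elasticsearch, window: str = "now-1h") -> set:
    """Process IDs that logged 'order-placed' but no completion in the window."""
    def process_ids(term: dict) -> set:
        query = {
            "size": 0,
            "query": {"bool": {"must": [
                term,
                {"range": {"timestamp": {"gte": window}}}]}},
            "aggs": {"pids": {"terms": {
                "field": "business_process_id", "size": 10000}}}
        }
        result = es.search(index="eda-logs-*", body=query)
        return {bucket["key"] for bucket in result["aggregations"]["pids"]["buckets"]}

    started = process_ids({"term": {"business_process_stage": "order-placed"}})
    completed = process_ids({"term": {"business_process_completed": True}})
    return started - completed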

Error Analysis Dashboard:

# Python script for log analytics
import elasticsearch

class EDALogAnalyzer:
    def __init__(self, es_host="http://localhost:9200"):
        self.es = elasticsearch.Elasticsearch([es_host])
        
    def analyze_error_patterns(self, hours_back=24):
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"level": "ERROR"}},
                        {"range": {"timestamp": {"gte": f"now-{hours_back}h"}}}
                    ]
                }
            },
            "aggs": {
                "error_by_service": {
                    "terms": {"field": "service"},
                    "aggs": {
                        "error_by_type": {
                            "terms": {"field": "error_category"}
                        }
                    }
                },
                "error_timeline": {
                    "date_histogram": {
                        "field": "timestamp",
                        "interval": "1h"
                    }
                }
            }
        }
        
        result = self.es.search(index="eda-logs-*", body=query)
        # format_error_analysis is an assumed presentation helper
        return self.format_error_analysis(result)
    
    def find_slow_processes(self, threshold_ms=5000):
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"range": {"event_processing_duration_ms": {"gte": threshold_ms}}},
                        {"range": {"timestamp": {"gte": "now-1h"}}}
                    ]
                }
            },
            "sort": [{"event_processing_duration_ms": {"order": "desc"}}],
            "size": 50
        }
        
        result = self.es.search(index="eda-logs-*", body=query)
        return [hit["_source"] for hit in result["hits"]["hits"]]
        
    def trace_business_process(self, process_id):
        query = {
            "query": {
                "term": {"business_process_id": process_id}
            },
            "sort": [{"timestamp": {"order": "asc"}}]
        }
        
        result = self.es.search(index="eda-logs-*", body=query)
        # build_process_timeline is an assumed presentation helper
        return self.build_process_timeline(result)
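
A typical use of the analyzer during an incident might look like this (values are illustrative):

analyzer = EDALogAnalyzer(es_host="http://elasticsearch:9200")

# Which events are currently taking longer than two seconds?
for entry in analyzer.find_slow_processes(threshold_ms=2000):
    print(entry.get("business_process_id"), entry.get("event_processing_duration_ms"))

# Reconstruct the timeline of one suspicious business process
timeline = analyzer.trace_business_process("order-fulfillment-4711")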

These logging strategies let you keep an overview even in complex event-driven architectures and diagnose problems efficiently. The key lies in capturing context information in a structured way and systematically correlating events across service boundaries.