Event-Driven Architecture stellt besondere Anforderungen an das Logging. Anders als bei klassischen Request-Response-Systemen verlaufen Verarbeitungsflows über mehrere Services und Events, die zeitlich versetzt auftreten können. Ein effektives Logging-System muss diese asynchronen, verteilten Abläufe nachvollziehbar machen.
Structured Logging ist in EDA unverzichtbar, da Events und Services kontextreiche Informationen enthalten, die strukturiert erfasst werden müssen:
Spring Boot Logback-Konfiguration:
<!-- logback-spring.xml -->
<configuration>
    <!--
      Every profile except "local": one JSON document per log event so that
      MDC entries and structured arguments stay machine-readable.
    -->
    <springProfile name="!local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers>
                    <timestamp/>
                    <logLevel/>
                    <loggerName/>
                    <message/>
                    <mdc/>
                    <arguments/>
                    <stackTrace/>
                </providers>
            </encoder>
        </appender>
    </springProfile>
    <!--
      "local" profile: human-readable console output. This appender must
      exist because <root> references STDOUT unconditionally; without it the
      local profile would have a dangling appender-ref and log nothing.
    -->
    <springProfile name="local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg %mdc%n</pattern>
            </encoder>
        </appender>
    </springProfile>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
Event-spezifisches Logging in Java:
/**
 * Emits structured log entries for order events.
 *
 * <p>Context that belongs to every line of one event (ids, Kafka coordinates)
 * goes into the MDC; values that belong to a single statement are passed as
 * {@code kv(...)} structured arguments.
 *
 * <p>NOTE(review): {@code processOrder} is not part of this snippet and is
 * expected to be provided alongside it.
 */
@Service
public class OrderEventLogger {

    private static final Logger log = LoggerFactory.getLogger(OrderEventLogger.class);

    /**
     * Logs a placed order with its identifying context.
     *
     * <p>The MDC entries are scoped with try-with-resources: they are removed
     * even if logging throws, and entries that the publishing thread already
     * had in its MDC are left untouched (a blanket {@code MDC.clear()} would
     * erase them).
     *
     * @param event the placed order
     */
    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        try (MDC.MDCCloseable eventType = MDC.putCloseable("event.type", "OrderPlaced");
             MDC.MDCCloseable eventId = MDC.putCloseable("event.id", event.getEventId());
             MDC.MDCCloseable orderId = MDC.putCloseable("order.id", event.getOrderId());
             MDC.MDCCloseable customerId = MDC.putCloseable("customer.id", event.getCustomerId());
             MDC.MDCCloseable amount =
                     MDC.putCloseable("order.amount", String.valueOf(event.getTotalAmount()))) {
            log.info("Order placed successfully",
                    kv("business.context", "order-processing"),
                    kv("order.items.count", event.getItems().size()),
                    kv("order.payment.method", event.getPaymentMethod()));
        }
    }

    /**
     * Consumes {@code order.placed.v1}, processes the order and logs the
     * outcome together with the Kafka coordinates of the record.
     *
     * <p>The header map is bound with {@code @Headers}
     * ({@code org.springframework.messaging.handler.annotation.Headers});
     * {@code @Header} addresses one named header and cannot supply the map.
     *
     * @param order   the deserialized payload
     * @param headers all message headers of the consumed record
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(@Payload OrderPlaced order,
                                  @Headers Map<String, Object> headers) {
        // Log Kafka-specific metadata; absent headers are skipped instead of
        // failing with a NullPointerException.
        putIfPresent("kafka.topic", headers, "kafka_receivedTopic");
        // NOTE(review): the partition header is "kafka_receivedPartitionId" in
        // older Spring Kafka versions and, as far as I know,
        // "kafka_receivedPartition" from 3.0 on; confirm against the version in use.
        putIfPresent("kafka.partition", headers,
                "kafka_receivedPartition", "kafka_receivedPartitionId");
        putIfPresent("kafka.offset", headers, "kafka_offset");

        long startNanos = System.nanoTime();
        try {
            processOrder(order);
            log.info("Order processing completed",
                    kv("processing.duration.ms", (System.nanoTime() - startNanos) / 1_000_000L),
                    kv("processing.result", "success"));
        } catch (Exception e) {
            // NOTE(review): the exception is logged but not rethrown, so the
            // listener container treats the record as handled. Kept as in the
            // original; confirm that this is the intended error strategy.
            log.error("Order processing failed",
                    kv("error.type", e.getClass().getSimpleName()),
                    kv("error.message", e.getMessage()),
                    e);
        } finally {
            MDC.clear();
        }
    }

    /** Copies the first header found under one of {@code headerNames} into the MDC. */
    private static void putIfPresent(String mdcKey, Map<String, Object> headers,
                                     String... headerNames) {
        for (String headerName : headerNames) {
            Object value = headers.get(headerName);
            if (value != null) {
                MDC.put(mdcKey, value.toString());
                return;
            }
        }
    }
}
Python Structured Logging:
import json
import logging
import time
from contextvars import ContextVar
from datetime import datetime, timezone
from types import MappingProxyType
from typing import Any, Dict, Mapping
# Context for request/event correlation.
# The default is a read-only empty mapping: a ContextVar default is a single
# object shared by every context that never called set(), so a plain dict
# could be mutated in place and leak values between unrelated events.
request_context: ContextVar[Mapping[str, Any]] = ContextVar(
    'request_context', default=MappingProxyType({})
)
class StructuredFormatter(logging.Formatter):
    """Formats log records as single-line JSON documents."""

    def format(self, record):
        """Serialize ``record`` to JSON.

        The output contains the standard record fields, the current
        ``request_context`` (if non-empty), formatted exception information
        and every extra attribute whose name starts with ``event_`` or
        ``business_``. Values that are not JSON-serializable are converted
        with ``str``.
        """
        log_entry = {
            # ISO-8601 in UTC. The default formatTime() output
            # ("2024-01-01 12:00:00,123", local time) is not accepted by an
            # Elasticsearch "date" field with its default format.
            'timestamp': datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(timespec='milliseconds'),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }

        # Add context information
        context = request_context.get({})
        if context:
            # Copy into a plain dict so any Mapping implementation serializes
            # as a JSON object.
            log_entry['context'] = dict(context)

        # Exception information
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # Custom attributes passed via ``extra=`` end up in the record's __dict__
        for key, value in record.__dict__.items():
            if key.startswith(('event_', 'business_')):
                log_entry[key] = value

        return json.dumps(log_entry, default=str)
class OrderProcessor:
def __init__(self):
self.logger = logging.getLogger(__name__)
async def process_order_event(self, order_data: dict, kafka_metadata: dict):
# Event-Context setzen
context = {
'event_type': 'OrderPlaced',
'event_id': order_data.get('eventId'),
'order_id': order_data.get('orderId'),
'customer_id': order_data.get('customerId'),
'kafka_topic': kafka_metadata.get('topic'),
'kafka_partition': kafka_metadata.get('partition'),
'kafka_offset': kafka_metadata.get('offset')
}
request_context.set(context)
start_time = time.time()
try:
self.logger.info(
"Starting order processing",
extra={
'business_event': 'order_processing_started',
'event_order_amount': order_data.get('totalAmount'),
'event_items_count': len(order_data.get('items', []))
}
)
await self.validate_order(order_data)
await self.reserve_inventory(order_data)
await self.process_payment(order_data)
processing_duration = time.time() - start_time
self.logger.info(
"Order processing completed successfully",
extra={
'business_event': 'order_processing_completed',
'event_processing_duration_ms': processing_duration * 1000,
'event_result': 'success'
}
)
except ValidationError as e:
self.logger.warning(
"Order validation failed",
extra={
'business_event': 'order_validation_failed',
'event_error_type': 'validation_error',
'event_error_details': str(e)
}
)
raise
except Exception as e:
self.logger.error(
"Order processing failed",
extra={
'business_event': 'order_processing_failed',
'event_error_type': type(e).__name__,
'event_error_message': str(e)
},
exc_info=True
)
raise
finally:
request_context.set({})Für Event-Schema-Evolution und Debugging sollten Sie Event-Strukturen systematisch loggen:
/**
 * Logs the structure of every published {@code DomainEvent} at DEBUG level so
 * that schema changes can be traced in the logs.
 *
 * <p>NOTE(review): {@code calculateSchemaHash} is not part of this snippet and
 * is expected to be provided alongside it.
 */
@Component
public class EventSchemaLogger {

    private static final Logger log = LoggerFactory.getLogger(EventSchemaLogger.class);

    private final ObjectMapper objectMapper;

    /**
     * @param objectMapper mapper used to convert events into a JSON tree
     */
    public EventSchemaLogger(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Captures field names, version and schema hash of domain events.
     * Events that are not a {@code DomainEvent} are ignored. A failure while
     * capturing is logged as a warning and never propagated to the publisher.
     *
     * @param event any application event
     */
    @EventListener
    public void onAnyEvent(ApplicationEvent event) {
        // Converting the event to a tree is only worth doing when the result
        // will actually be logged.
        if (!(event instanceof DomainEvent) || !log.isDebugEnabled()) {
            return;
        }
        DomainEvent domainEvent = (DomainEvent) event;
        try {
            JsonNode eventSchema = objectMapper.valueToTree(domainEvent);
            log.debug("Event schema captured",
                    kv("event.type", event.getClass().getSimpleName()),
                    kv("event.schema.fields", getFieldNames(eventSchema)),
                    kv("event.schema.version", domainEvent.getVersion()),
                    kv("event.schema.hash", calculateSchemaHash(eventSchema)));
        } catch (Exception e) {
            log.warn("Failed to capture event schema", e);
        }
    }

    /** Returns the top-level field names of {@code node} in iteration order. */
    private List<String> getFieldNames(JsonNode node) {
        List<String> fields = new ArrayList<>();
        node.fieldNames().forEachRemaining(fields::add);
        return fields;
    }
}
In Event-Driven Systemen müssen Sie Traces über Service-Grenzen hinweg verfolgen:
Spring Boot mit Micrometer Tracing:
/**
 * Continues a trace received via Kafka headers, processes the payment inside
 * a span and forwards the trace context with the follow-up event.
 *
 * <p>Written against the Micrometer Tracing API
 * ({@code io.micrometer.tracing.Tracer}, {@code Span}, {@code TraceContext}),
 * in which trace and span ids are hex strings.
 *
 * <p>NOTE(review): {@code processPayment} and {@code createPaymentEvent} are
 * not part of this snippet and are expected to be provided alongside it.
 */
@Service
public class TracedEventProcessor {

    private static final String TRACE_ID_HEADER = "X-Trace-Id";
    private static final String SPAN_ID_HEADER = "X-Span-Id";

    private final Tracer tracer;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * @param tracer        tracer used to create and scope spans
     * @param kafkaTemplate template used to publish the follow-up event
     */
    public TracedEventProcessor(Tracer tracer, KafkaTemplate<String, Object> kafkaTemplate) {
        this.tracer = tracer;
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Processes an {@code OrderPlaced} event inside a span named
     * {@code order-processing}. If both trace headers are present the span
     * becomes a child of the sender's span; otherwise a new trace starts.
     *
     * @param order   the deserialized payload
     * @param headers all message headers of the consumed record
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(@Payload OrderPlaced order,
                                  @Headers Map<String, Object> headers) {
        Span.Builder spanBuilder = tracer.spanBuilder()
                .name("order-processing")
                .tag("service.name", "payment-service")
                .tag("event.type", "OrderPlaced")
                .tag("order.id", order.getOrderId());

        // Attach the parent span if the sender propagated one. Ids stay hex
        // strings, so 128-bit trace ids are handled without overflow.
        String traceId = headerAsString(headers.get(TRACE_ID_HEADER));
        String spanId = headerAsString(headers.get(SPAN_ID_HEADER));
        if (traceId != null && spanId != null) {
            spanBuilder.setParent(tracer.traceContextBuilder()
                    .traceId(traceId)
                    .spanId(spanId)
                    .build());
        }

        Span span = spanBuilder.start();
        try (Tracer.SpanInScope ignored = tracer.withSpan(span)) {
            MDC.put("trace.id", span.context().traceId());
            MDC.put("span.id", span.context().spanId());

            processPayment(order);

            // Publish the next event with the trace context
            PaymentProcessed paymentEvent = createPaymentEvent(order);
            publishWithTraceContext(paymentEvent, span.context());
        } catch (Exception e) {
            span.error(e);
            throw e;
        } finally {
            span.end();
            MDC.clear();
        }
    }

    /**
     * Publishes {@code event} to {@code payment.processed.v1}, keyed by order
     * id, with the trace and span id of {@code context} as record headers.
     */
    private void publishWithTraceContext(PaymentProcessed event, TraceContext context) {
        ProducerRecord<String, Object> record = new ProducerRecord<>(
                "payment.processed.v1",
                event.getOrderId(),
                event);

        // Explicit charset: the platform default must not decide the wire format.
        record.headers().add(TRACE_ID_HEADER,
                context.traceId().getBytes(StandardCharsets.UTF_8));
        record.headers().add(SPAN_ID_HEADER,
                context.spanId().getBytes(StandardCharsets.UTF_8));

        kafkaTemplate.send(record);
    }

    /**
     * Converts a header value to text. Custom Kafka headers usually arrive as
     * {@code byte[]}, which a plain cast to {@code String} would reject.
     *
     * @return the header text, or {@code null} if the header is absent
     */
    private static String headerAsString(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value.toString();
    }
}
Python Correlation mit OpenTelemetry:
from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import logging
import json
class CorrelatedEventProcessor:
def __init__(self):
self.tracer = trace.get_tracer(__name__)
self.logger = logging.getLogger(__name__)
self.propagator = TraceContextTextMapPropagator()
async def process_order_event(self, message_value: bytes, message_headers: dict):
# Trace-Context aus Kafka-Headers extrahieren
carrier = {}
for key, value in message_headers.items():
if isinstance(value, bytes):
carrier[key] = value.decode('utf-8')
else:
carrier[key] = str(value)
# Parent Context wiederherstellen
parent_context = self.propagator.extract(carrier)
with self.tracer.start_as_current_span(
"order-event-processing",
context=parent_context,
attributes={
"service.name": "payment-processor",
"messaging.system": "kafka",
"messaging.destination": "order.placed.v1"
}
) as span:
# Correlation IDs in Logging-Context
trace_id = format(span.get_span_context().trace_id, '032x')
span_id = format(span.get_span_context().span_id, '016x')
request_context.set({
'trace_id': trace_id,
'span_id': span_id,
'service': 'payment-processor'
})
try:
order_data = json.loads(message_value)
self.logger.info(
"Processing order event",
extra={
'business_event': 'order_received',
'event_order_id': order_data.get('orderId')
}
)
result = await self.process_payment(order_data)
# Next Event mit Trace-Context publizieren
await self.publish_payment_processed(result, span.get_span_context())
span.set_attribute("processing.result", "success")
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
self.logger.error(
"Order processing failed",
extra={'business_event': 'order_processing_failed'},
exc_info=True
)
raise
async def publish_payment_processed(self, payment_result: dict, span_context):
# Trace-Context für nächsten Service propagieren
carrier = {}
self.propagator.inject(carrier, context.set_value("current-span", span_context))
headers = {}
for key, value in carrier.items():
headers[key] = value.encode('utf-8')
await self.kafka_producer.send(
topic='payment.processed.v1',
value=json.dumps(payment_result),
headers=headers
)Neben technischer Tracing-Korrelation sollten Sie Business-Process-IDs verwenden:
/**
 * Logs the stages of the order-fulfillment business process under a common
 * process id ({@code order-fulfillment-<orderId>}), so that all entries of
 * one process can be correlated independently of technical trace ids.
 *
 * <p>MDC entries are scoped per handler with try-with-resources; without the
 * cleanup they would stay attached to the thread and label unrelated log
 * lines written later on the same thread.
 *
 * <p>NOTE(review): {@code calculateProcessDuration} is not part of this
 * snippet and is expected to be provided alongside it.
 */
@Component
public class BusinessProcessLogger {

    private static final Logger log = LoggerFactory.getLogger(BusinessProcessLogger.class);

    /** Marks the start of the process. */
    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        try (MDC.MDCCloseable id =
                     MDC.putCloseable("business.process.id", processIdFor(event.getOrderId()));
             MDC.MDCCloseable stage =
                     MDC.putCloseable("business.process.stage", "order-placed");
             MDC.MDCCloseable customer =
                     MDC.putCloseable("business.process.customer", event.getCustomerId())) {
            log.info("Business process started: Order fulfillment",
                    kv("process.expected.stages",
                            List.of("order-placed", "payment-processed", "inventory-reserved", "shipped")),
                    kv("process.sla.minutes", 60));
        }
    }

    /** Marks the payment stage of the process. */
    @EventListener
    public void onPaymentProcessed(PaymentProcessed event) {
        try (MDC.MDCCloseable id =
                     MDC.putCloseable("business.process.id", processIdFor(event.getOrderId()));
             MDC.MDCCloseable stage =
                     MDC.putCloseable("business.process.stage", "payment-processed")) {
            log.info("Business process progressed: Payment completed");
        }
    }

    /** Marks the end of the process and logs its total duration. */
    @EventListener
    public void onOrderShipped(OrderShipped event) {
        try (MDC.MDCCloseable id =
                     MDC.putCloseable("business.process.id", processIdFor(event.getOrderId()));
             MDC.MDCCloseable stage =
                     MDC.putCloseable("business.process.stage", "shipped");
             // MDC values are strings; a boolean argument does not compile.
             MDC.MDCCloseable completed =
                     MDC.putCloseable("business.process.completed", "true")) {
            log.info("Business process completed: Order fulfillment",
                    kv("process.duration.minutes", calculateProcessDuration(event.getOrderId())));
        }
    }

    /** Builds the process id shared by all stages of one order. */
    private static String processIdFor(Object orderId) {
        return "order-fulfillment-" + orderId;
    }
}
Logstash-Konfiguration für Event-Logs:
# logstash.conf
# Reads JSON log events from Kafka, enriches them and indexes them into
# daily Elasticsearch indices.
input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    topics => ["application-logs"]
    codec => "json"
  }
}
filter {
  # Extract event-specific fields
  if [message] =~ /Order placed successfully/ {
    mutate {
      add_field => { "log_category" => "business_event" }
      add_field => { "business_event_type" => "order_placed" }
    }
  }
  # Trace id for correlation
  if [context][trace_id] {
    mutate {
      add_field => { "trace_id" => "%{[context][trace_id]}" }
    }
  }
  # The Java services write dotted MDC keys, while the index template and the
  # queries use underscore names; align them here. Missing fields are ignored.
  mutate {
    rename => {
      "business.process.id" => "business_process_id"
      "business.process.stage" => "business_process_stage"
      "business.process.completed" => "business_process_completed"
    }
  }
  # Error categorization.
  # Validation failures are emitted at WARNING level with
  # event_error_type "validation_error", so they are matched by type and
  # independently of the level; every other ERROR counts as technical.
  if [event_error_type] in ["validation_error", "ValidationError"] {
    mutate {
      add_field => { "error_category" => "business_validation" }
      add_field => { "error_severity" => "medium" }
    }
  } else if [level] == "ERROR" {
    mutate {
      add_field => { "error_category" => "technical" }
      add_field => { "error_severity" => "high" }
    }
  }
  # Extract performance metrics
  if [event_processing_duration_ms] {
    ruby {
      # to_f: the value may arrive as a string; comparing a string with an
      # integer would raise and tag the event with _rubyexception.
      code => "
        duration = event.get('event_processing_duration_ms').to_f
        if duration > 5000
          event.set('performance_issue', true)
        end
      "
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "eda-logs-%{+YYYY.MM.dd}"
    template_name => "eda-logs"
    template => "/etc/logstash/templates/eda-logs-template.json"
  }
}
Elasticsearch Index Template:
{
  "index_patterns": ["eda-logs-*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "message": { "type": "text" },
      "service": { "type": "keyword" },
      "trace_id": { "type": "keyword" },
      "business_process_id": { "type": "keyword" },
      "business_process_stage": { "type": "keyword" },
      "business_process_completed": { "type": "boolean" },
      "event_type": { "type": "keyword" },
      "order_id": { "type": "keyword" },
      "customer_id": { "type": "keyword" },
      "event_processing_duration_ms": { "type": "float" },
      "error_category": { "type": "keyword" },
      "performance_issue": { "type": "boolean" }
    }
  }
}
Business Process Tracking:
# Kibana Query: Incomplete Business Processes
# Groups all log documents of the last hour by process id and keeps only the
# processes for which no document carries business_process_completed = true.
# The completed flag lives on a different document than the "order-placed"
# stage, so it has to be evaluated per process (bucket), not per document.
# Note: the terms size limits the candidates before incomplete ones are
# selected; raise it if more than 100 processes run per hour.
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "exists": { "field": "business_process_id" } },
        { "range": { "timestamp": { "gte": "now-1h" } } }
      ]
    }
  },
  "aggs": {
    "incomplete_processes": {
      "terms": {
        "field": "business_process_id",
        "size": 100
      },
      "aggs": {
        "completed": {
          "filter": { "term": { "business_process_completed": true } }
        },
        "only_incomplete": {
          "bucket_selector": {
            "buckets_path": { "completedDocs": "completed._count" },
            "script": "params.completedDocs == 0"
          }
        }
      }
    }
  }
}
Error Analysis Dashboard:
# Python Script für Log-Analytics
import elasticsearch
from datetime import datetime, timedelta
class EDALogAnalyzer:
def __init__(self, es_host="localhost:9200"):
self.es = elasticsearch.Elasticsearch([es_host])
def analyze_error_patterns(self, hours_back=24):
query = {
"query": {
"bool": {
"must": [
{"term": {"level": "ERROR"}},
{"range": {"timestamp": {"gte": f"now-{hours_back}h"}}}
]
}
},
"aggs": {
"error_by_service": {
"terms": {"field": "service"},
"aggs": {
"error_by_type": {
"terms": {"field": "error_category"}
}
}
},
"error_timeline": {
"date_histogram": {
"field": "timestamp",
"interval": "1h"
}
}
}
}
result = self.es.search(index="eda-logs-*", body=query)
return self.format_error_analysis(result)
def find_slow_processes(self, threshold_ms=5000):
query = {
"query": {
"bool": {
"must": [
{"range": {"event_processing_duration_ms": {"gte": threshold_ms}}},
{"range": {"timestamp": {"gte": "now-1h"}}}
]
}
},
"sort": [{"event_processing_duration_ms": {"order": "desc"}}],
"size": 50
}
result = self.es.search(index="eda-logs-*", body=query)
return [hit["_source"] for hit in result["hits"]["hits"]]
def trace_business_process(self, process_id):
query = {
"query": {
"term": {"business_process_id": process_id}
},
"sort": [{"timestamp": {"order": "asc"}}]
}
result = self.es.search(index="eda-logs-*", body=query)
return self.build_process_timeline(result)Diese Logging-Strategien ermöglichen es Ihnen, auch in komplexen Event-Driven Architekturen den Überblick zu behalten und Probleme effizient zu diagnostizieren. Der Schlüssel liegt in der strukturierten Erfassung von Kontext-Informationen und der systematischen Korrelation von Events über Service-Grenzen hinweg.