Event-driven architecture brings particular challenges for monitoring. Unlike synchronous REST APIs, there are no direct request-response cycles to observe. Instead, you have to follow the flow of events through distributed services while keeping an eye on asynchronous processing times and queues.
Consumer lag is the most important metric in event-driven systems. It shows how far a consumer has fallen behind current event production:
```java
@Component
public class ConsumerLagMetrics {

    private final MeterRegistry meterRegistry;
    // Micrometer gauges sample a value source, so we keep one AtomicLong per partition
    private final Map<String, AtomicLong> lagValues = new ConcurrentHashMap<>();

    public ConsumerLagMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @EventListener
    public void onKafkaConsumerMetrics(KafkaConsumerMetrics metrics) {
        metrics.getConsumerGroups().forEach((groupId, partitions) ->
            partitions.forEach((partition, lag) -> {
                String key = groupId + "-" + partition.getTopic() + "-" + partition.getPartition();
                AtomicLong gaugeValue = lagValues.computeIfAbsent(key, k -> {
                    AtomicLong value = new AtomicLong();
                    Gauge.builder("kafka.consumer.lag", value, AtomicLong::get)
                        .tags("group", groupId,
                              "topic", partition.getTopic(),
                              "partition", String.valueOf(partition.getPartition()))
                        .register(meterRegistry);
                    return value;
                });
                gaugeValue.set(lag);
            })
        );
    }
}
```

Error rate measures how many events fail or end up in dead letter topics:
```python
import logging
import time

from prometheus_client import Counter, Histogram

# Define metrics
events_processed = Counter('events_processed_total',
                           'Total processed events',
                           ['service', 'topic', 'status'])
processing_duration = Histogram('event_processing_duration_seconds',
                                'Time spent processing events',
                                ['service', 'topic'])

class ValidationError(Exception):
    """Raised when an incoming event fails validation."""

class OrderProcessor:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def process_order(self, order_event):
        start_time = time.time()
        try:
            # Business logic
            self.validate_order(order_event)
            self.reserve_inventory(order_event)
            self.create_payment(order_event)

            # Success metrics
            events_processed.labels(
                service='order-processor',
                topic='order.placed.v1',
                status='success'
            ).inc()
        except ValidationError as e:
            self.logger.error(f"Validation failed: {e}")
            events_processed.labels(
                service='order-processor',
                topic='order.placed.v1',
                status='validation_error'
            ).inc()
            raise
        except Exception as e:
            self.logger.error(f"Processing failed: {e}")
            events_processed.labels(
                service='order-processor',
                topic='order.placed.v1',
                status='error'
            ).inc()
            raise
        finally:
            processing_duration.labels(
                service='order-processor',
                topic='order.placed.v1'
            ).observe(time.time() - start_time)
```

Processing time shows how long individual events or batches take:
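The Histogram above already records per-event durations; on the Prometheus side you can turn them into percentiles. Here is a minimal sketch of recording rules, assuming the histogram from the Python example is scraped under its default name `event_processing_duration_seconds` (the rule names are illustrative):

```yaml
# recording-rules.yml (sketch)
groups:
  - name: event.processing.time
    rules:
      # 5-minute p95 per service and topic, derived from the histogram buckets
      - record: service:event_processing_duration_seconds:p95_5m
        expr: histogram_quantile(0.95, sum(rate(event_processing_duration_seconds_bucket[5m])) by (le, service, topic))
      # 5-minute p99 per service and topic
      - record: service:event_processing_duration_seconds:p99_5m
        expr: histogram_quantile(0.99, sum(rate(event_processing_duration_seconds_bucket[5m])) by (le, service, topic))
```

Pre-computing these quantiles keeps dashboard queries cheap and gives alerting rules a stable series to reference.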
In addition to technical metrics, you should derive business KPIs from events:
| Business Metric | Event-Based Calculation | Monitoring Relevance |
|---|---|---|
| Order-to-Payment Time | OrderPlaced → PaymentProcessed | Customer Experience |
| Inventory Turnover | InventoryReserved → InventoryShipped | Business Efficiency |
| Failed Payment Rate | PaymentFailed / PaymentAttempted | Revenue Impact |
| Service Recovery Time | ServiceDown → ServiceUp | System Reliability |
Business metrics in Spring Boot:
```java
@Service
public class BusinessMetrics {

    private final Timer orderToPaymentTimer;
    private final Counter failedPayments;
    // Open timer samples per order, keyed by order ID
    private final Map<String, Timer.Sample> orderTimers = new ConcurrentHashMap<>();

    public BusinessMetrics(MeterRegistry registry) {
        this.orderToPaymentTimer = Timer.builder("business.order.payment.duration")
            .description("Time from order to payment")
            .register(registry);
        this.failedPayments = Counter.builder("business.payments.failed")
            .description("Failed payment attempts")
            .register(registry);
    }

    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        // Start the timer for this order
        Timer.Sample sample = Timer.start();
        orderTimers.put(event.getOrderId(), sample);
    }

    @EventListener
    public void onPaymentProcessed(PaymentProcessed event) {
        Timer.Sample sample = orderTimers.remove(event.getOrderId());
        if (sample != null) {
            sample.stop(orderToPaymentTimer);
        }
    }

    @EventListener
    public void onPaymentFailed(PaymentFailed event) {
        failedPayments.increment();
    }
}
```

Throughput monitoring:
```java
@Component
public class ThroughputMonitor {

    private final MeterRegistry registry;
    private final Counter eventsReceived;
    private final Timer eventProcessingTime;

    public ThroughputMonitor(MeterRegistry registry) {
        this.registry = registry;
        this.eventsReceived = Counter.builder("kafka.events.received")
            .register(registry);
        this.eventProcessingTime = Timer.builder("kafka.events.processing.time")
            .register(registry);
    }

    @KafkaListener(topics = "#{@topicConfig.orderPlacedTopic}")
    public void handleEvent(@Payload String eventData) {
        eventsReceived.increment();
        Timer.Sample sample = Timer.start(registry);
        try {
            processEvent(eventData);
        } finally {
            sample.stop(eventProcessingTime);
        }
    }
}
```

Consumer lag is often the first indicator of trouble. Define tiered alerts:
| Alert Level | Lag Threshold | Response Time | Action |
|---|---|---|---|
| Warning | > 1,000 messages | 15 minutes | Monitoring team |
| Critical | > 10,000 messages | 5 minutes | On-call engineer |
| Emergency | > 100,000 messages | Immediately | Escalation |
Prometheus Alert Rules:
```yaml
# prometheus-rules.yml
groups:
  - name: kafka.consumer.lag
    rules:
      - alert: ConsumerLagHigh
        expr: kafka_consumer_lag > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer {{ $labels.group }} lagging behind"
          description: "Consumer group {{ $labels.group }} for topic {{ $labels.topic }} has lag of {{ $value }} messages"
      - alert: ConsumerLagCritical
        expr: kafka_consumer_lag > 10000
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Critical consumer lag detected"
          description: "Consumer group {{ $labels.group }} has critical lag of {{ $value }} messages"
```

Adaptive error rate alerts:
```java
@Component
public class AdaptiveAlertingService {

    private final Map<String, ErrorRateCalculator> serviceErrorRates = new HashMap<>();
    // Injected notification client (type name is illustrative)
    private final AlertingService alertingService;

    public AdaptiveAlertingService(AlertingService alertingService) {
        this.alertingService = alertingService;
    }

    @EventListener
    public void onEventProcessingResult(EventProcessingResult result) {
        String serviceName = result.getServiceName();
        ErrorRateCalculator calculator = serviceErrorRates.computeIfAbsent(
            serviceName,
            k -> new ErrorRateCalculator(Duration.ofMinutes(5))
        );
        calculator.recordResult(result.isSuccess());

        double currentErrorRate = calculator.getCurrentErrorRate();
        double baselineErrorRate = calculator.getBaselineErrorRate();

        // Alert on a significant increase over the baseline
        if (currentErrorRate > baselineErrorRate * 2.0 && currentErrorRate > 0.05) {
            alertingService.sendAlert(AlertLevel.WARNING,
                "Error rate spike detected for " + serviceName +
                ": " + String.format("%.2f%%", currentErrorRate * 100));
        }
    }
}

class ErrorRateCalculator {

    private final Duration window;
    private final Queue<ProcessingResult> results = new LinkedList<>();

    ErrorRateCalculator(Duration window) {
        this.window = window;
    }

    public void recordResult(boolean success) {
        results.offer(new ProcessingResult(success, Instant.now()));
        // Drop results that have fallen out of the time window
        while (!results.isEmpty() &&
               results.peek().getTimestamp().isBefore(Instant.now().minus(window))) {
            results.poll();
        }
    }

    public double getCurrentErrorRate() {
        if (results.isEmpty()) return 0.0;
        long failures = results.stream()
            .mapToLong(r -> r.isSuccess() ? 0 : 1)
            .sum();
        return (double) failures / results.size();
    }

    public double getBaselineErrorRate() {
        // Simplified placeholder: a production implementation would track a
        // longer-term rolling error rate (e.g. over the previous hours)
        return 0.01;
    }
}
```

SLA-based processing time alerts:
```python
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ProcessingSLA:
    service_name: str
    p95_threshold_ms: float
    p99_threshold_ms: float
    error_rate_threshold: float

class ProcessingTimeMonitor:
    def __init__(self):
        self.processing_times: Dict[str, List[float]] = {}
        self.slas = {
            'order-processor': ProcessingSLA('order-processor', 500.0, 1000.0, 0.05),
            'payment-processor': ProcessingSLA('payment-processor', 2000.0, 5000.0, 0.01),
            'inventory-service': ProcessingSLA('inventory-service', 200.0, 500.0, 0.02)
        }

    def record_processing_time(self, service: str, duration_ms: float):
        if service not in self.processing_times:
            self.processing_times[service] = []
        self.processing_times[service].append(duration_ms)

        # Sliding window: keep only the last 1000 measurements
        if len(self.processing_times[service]) > 1000:
            self.processing_times[service] = self.processing_times[service][-1000:]

        self.check_sla_violations(service)

    def check_sla_violations(self, service: str):
        if service not in self.slas or len(self.processing_times[service]) < 20:
            return

        sla = self.slas[service]
        times = sorted(self.processing_times[service])
        p95_time = times[int(len(times) * 0.95)]
        p99_time = times[int(len(times) * 0.99)]

        violations = []
        if p95_time > sla.p95_threshold_ms:
            violations.append(f"P95 processing time ({p95_time:.1f}ms) exceeds SLA ({sla.p95_threshold_ms}ms)")
        if p99_time > sla.p99_threshold_ms:
            violations.append(f"P99 processing time ({p99_time:.1f}ms) exceeds SLA ({sla.p99_threshold_ms}ms)")

        if violations:
            self.send_alert(service, violations)

    def send_alert(self, service: str, violations: List[str]):
        alert_message = f"SLA violation for {service}: {'; '.join(violations)}"
        print(f"ALERT: {alert_message}")  # In practice: Slack, PagerDuty, etc.
```

Effective EDA dashboards follow a hierarchy from overview to detail:
Level 1: System Overview Dashboard
- Total throughput across all services
- System-wide error rate
- Maximum consumer lag
- Active alerts

Level 2: Service-Specific Dashboards
- Service throughput and latency
- Topic-specific metrics
- Error rates by event type
- Resource utilization

Level 3: Troubleshooting Dashboards
- Detailed error traces
- Event flow visualization
- Consumer group details
- Partition-level metrics
System Overview Panel:
```json
{
  "dashboard": {
    "title": "EDA System Overview",
    "panels": [
      {
        "title": "Event Throughput",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(kafka_events_received_total[5m]))",
            "legendFormat": "Events/sec"
          }
        ]
      },
      {
        "title": "Consumer Lag Heatmap",
        "type": "heatmap",
        "targets": [
          {
            "expr": "kafka_consumer_lag",
            "legendFormat": "{{group}}-{{topic}}"
          }
        ]
      },
      {
        "title": "Error Rate by Service",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(events_processed_total{status=\"error\"}[5m]) / rate(events_processed_total[5m])",
            "legendFormat": "{{service}}"
          }
        ]
      }
    ]
  }
}
```

End-to-End Trace Dashboard:
```java
@Component
public class EventFlowTracer {

    // Tracing facade (e.g. Brave / Spring Cloud Sleuth), injected by the framework
    private final Tracer tracer;

    public EventFlowTracer(Tracer tracer) {
        this.tracer = tracer;
    }

    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        TraceContext trace = TraceContext.newBuilder()
            .traceId(generateTraceId())
            .spanId(generateSpanId())
            .sampled(true)
            .build();

        Span span = tracer.nextSpan(trace)
            .name("order-processing-flow")
            .tag("order.id", event.getOrderId())
            .tag("customer.id", event.getCustomerId())
            .start();

        // Event metadata for flow tracking
        event.setTraceId(trace.traceId());
        event.setFlowStartTime(Instant.now());
    }

    @EventListener
    public void onPaymentProcessed(PaymentProcessed event) {
        if (event.getTraceId() != null) {
            Span span = tracer.nextSpan()
                .name("payment-completed")
                .tag("trace.id", event.getTraceId())
                .tag("payment.amount", event.getAmount().toString())
                .start();
            span.end();
        }
    }
}
```

Service Health Overview:
```python
from typing import Dict

class ServiceHealthDashboard:
    def __init__(self):
        # Each indicator maps to a check function returning 'HEALTHY', 'WARNING',
        # 'CRITICAL' or 'UNKNOWN'; the check_* methods query the metrics backend
        # and are omitted here
        self.health_indicators = {
            'consumer_lag': self.check_consumer_lag,
            'error_rate': self.check_error_rate,
            'processing_time': self.check_processing_time,
            'resource_usage': self.check_resource_usage
        }

    def get_service_health(self, service_name: str) -> Dict[str, str]:
        health_status = {}
        for indicator_name, check_function in self.health_indicators.items():
            try:
                status = check_function(service_name)
                health_status[indicator_name] = status
            except Exception:
                health_status[indicator_name] = 'UNKNOWN'

        overall_health = self.calculate_overall_health(health_status)
        health_status['overall'] = overall_health
        return health_status

    def calculate_overall_health(self, indicators: Dict[str, str]) -> str:
        if any(status == 'CRITICAL' for status in indicators.values()):
            return 'CRITICAL'
        elif any(status == 'WARNING' for status in indicators.values()):
            return 'WARNING'
        elif all(status == 'HEALTHY' for status in indicators.values()):
            return 'HEALTHY'
        else:
            return 'UNKNOWN'
```

Real-time event monitoring:
```java
@RestController
public class EventMonitoringController {

    private static final Logger log = LoggerFactory.getLogger(EventMonitoringController.class);

    @GetMapping("/api/monitoring/event-flow")
    public Flux<EventFlowStatus> getEventFlow() {
        return Flux.interval(Duration.ofSeconds(5))
            .map(tick -> {
                EventFlowStatus status = new EventFlowStatus();
                status.setTimestamp(Instant.now());
                status.setActiveConsumers(getActiveConsumerCount());
                status.setTotalLag(getTotalConsumerLag());
                status.setThroughput(getCurrentThroughput());
                status.setErrorRate(getCurrentErrorRate());
                return status;
            })
            .doOnNext(status -> log.debug("Event flow status: {}", status));
    }
}
```

These monitoring strategies give you the transparency you need to operate an event-driven architecture successfully. The key is to track both technical and business metrics while accounting for the particularities of asynchronous systems.