64 Monitoring and Metrics (Consumer Lag, Error Rate, Processing Time)

Event-driven architecture poses particular challenges for monitoring. Unlike synchronous REST APIs, there are no direct request-response cycles to observe. Instead, you have to follow the flow of events through distributed services while keeping asynchronous processing times and queues in view.

64.1 Key Performance Indicators

64.1.1 The Three Critical Metrics for EDA

Consumer lag is the most important metric in event-driven systems. It shows how far a consumer is trailing behind current event production:

@Component
public class ConsumerLagMetrics {
    
    private final MeterRegistry meterRegistry;
    // One AtomicLong per group/topic/partition; the registered gauges read their values from here
    private final Map<String, AtomicLong> lagValues = new ConcurrentHashMap<>();
    
    public ConsumerLagMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    @EventListener
    public void onKafkaConsumerMetrics(KafkaConsumerMetrics metrics) {
        // Application event carrying the current lag per consumer group and partition
        metrics.getConsumerGroups().forEach((groupId, partitions) ->
            partitions.forEach((partition, lag) -> {
                String key = groupId + "/" + partition.getTopic() + "/" + partition.getPartition();
                lagValues.computeIfAbsent(key, k ->
                        meterRegistry.gauge("kafka.consumer.lag",
                            Tags.of("group", groupId,
                                    "topic", partition.getTopic(),
                                    "partition", String.valueOf(partition.getPartition())),
                            new AtomicLong(0)))
                    .set(lag);
            }));
    }
}
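
Where do such lag values come from? Per partition, lag is the latest offset on the broker minus the consumer group's committed offset. A minimal sketch of that calculation, assuming the Kafka AdminClient API; the ConsumerLagCalculator class and its wiring are illustrative:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.stream.Collectors;

public class ConsumerLagCalculator {

    private final Admin admin;

    public ConsumerLagCalculator(Admin admin) {
        this.admin = admin;
    }

    // Lag per partition = latest broker offset minus the group's committed offset
    public Map<TopicPartition, Long> lagFor(String groupId) throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get();

        Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
                admin.listOffsets(latestSpec).all().get();

        return committed.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> latestOffsets.get(entry.getKey()).offset()
                                 - entry.getValue().offset()));
    }
}

In practice, the broker-side JMX metrics or the kafka-consumer-groups.sh tool expose the same numbers, so computing lag by hand is mainly useful when you need it inside your own services.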

Error rate measures how many events fail or end up in dead letter topics:

import logging
from prometheus_client import Counter, Histogram
import time

# Define metrics
events_processed = Counter('events_processed_total', 
                          'Total processed events', 
                          ['service', 'topic', 'status'])

processing_duration = Histogram('event_processing_duration_seconds',
                              'Time spent processing events',
                              ['service', 'topic'])

class ValidationError(Exception):
    """Raised when an incoming order event fails validation."""
    pass


class OrderProcessor:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def process_order(self, order_event):
        start_time = time.time()
        
        try:
            # Business Logic
            self.validate_order(order_event)
            self.reserve_inventory(order_event)
            self.create_payment(order_event)
            
            # Success Metrics
            events_processed.labels(
                service='order-processor',
                topic='order.placed.v1', 
                status='success'
            ).inc()
            
        except ValidationError as e:
            self.logger.error(f"Validation failed: {e}")
            events_processed.labels(
                service='order-processor',
                topic='order.placed.v1',
                status='validation_error'
            ).inc()
            raise
            
        except Exception as e:
            self.logger.error(f"Processing failed: {e}")
            events_processed.labels(
                service='order-processor',
                topic='order.placed.v1',
                status='error'
            ).inc()
            raise
            
        finally:
            processing_duration.labels(
                service='order-processor',
                topic='order.placed.v1'
            ).observe(time.time() - start_time)
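
The consumer above only counts failures it handles itself. For the dead-letter part of the error rate, the most reliable place to count is wherever events are actually routed to the dead letter topic. A minimal sketch, assuming Spring Kafka's DeadLetterPublishingRecoverer and Micrometer; the metric name events.dead.lettered and the retry settings are illustrative assumptions:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DeadLetterMetricsConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template,
                                            MeterRegistry registry) {
        DeadLetterPublishingRecoverer dltRecoverer =
                new DeadLetterPublishingRecoverer(template);

        // Count every record that is handed over to a dead letter topic
        ConsumerRecordRecoverer countingRecoverer = (record, exception) -> {
            Counter.builder("events.dead.lettered")
                    .tag("topic", record.topic())
                    .register(registry)
                    .increment();
            dltRecoverer.accept(record, exception);
        };

        // Retry three times with a one-second pause before dead-lettering
        return new DefaultErrorHandler(countingRecoverer, new FixedBackOff(1000L, 3));
    }
}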

Processing time shows how long individual events or batches take. Per-event duration is already captured by the processing_duration histogram in the Python example above; for batch consumers the same measurement is needed at batch granularity, as in the sketch below.
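
A minimal sketch of batch-level timing, assuming Spring Kafka configured for batch listening and Micrometer; the metric names, the topic, and processRecord are illustrative assumptions:

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class BatchProcessingTimeMetrics {

    private final Timer batchTimer;
    private final DistributionSummary batchSize;

    public BatchProcessingTimeMetrics(MeterRegistry registry) {
        this.batchTimer = Timer.builder("event.batch.processing.time")
                .publishPercentiles(0.95, 0.99)   // exposes p95/p99 for SLA checks
                .register(registry);
        this.batchSize = DistributionSummary.builder("event.batch.size")
                .register(registry);
    }

    // Requires a batch listener (spring.kafka.listener.type=batch)
    @KafkaListener(topics = "order.placed.v1")
    public void handleBatch(List<ConsumerRecord<String, String>> records) {
        batchSize.record(records.size());
        batchTimer.record(() -> records.forEach(this::processRecord));
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic placeholder
    }
}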

64.1.2 Business-Specific KPIs

In addition to technical metrics, you should derive business KPIs from events:

Business Metric | Event-Based Calculation | Monitoring Relevance
Order-to-Payment Time | OrderPlaced → PaymentProcessed | Customer Experience
Inventory Turnover | InventoryReserved → InventoryShipped | Business Efficiency
Failed Payment Rate | PaymentFailed / PaymentAttempted | Revenue Impact
Service Recovery Time | ServiceDown → ServiceUp | System Reliability

Business Metrics in Spring Boot:

@Service
public class BusinessMetrics {
    
    private final Timer orderToPaymentTimer;
    private final Counter failedPayments;
    // Open timer samples per order id, used to correlate OrderPlaced with PaymentProcessed
    private final Map<String, Timer.Sample> orderTimers = new ConcurrentHashMap<>();
    
    public BusinessMetrics(MeterRegistry registry) {
        this.orderToPaymentTimer = Timer.builder("business.order.payment.duration")
                .description("Time from order to payment")
                .register(registry);
                
        this.failedPayments = Counter.builder("business.payments.failed")
                .description("Failed payment attempts")
                .register(registry);
    }
    
    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        // Start the timer for this order
        Timer.Sample sample = Timer.start();
        orderTimers.put(event.getOrderId(), sample);
    }
    
    @EventListener  
    public void onPaymentProcessed(PaymentProcessed event) {
        Timer.Sample sample = orderTimers.remove(event.getOrderId());
        if (sample != null) {
            sample.stop(orderToPaymentTimer);
        }
    }
    
    @EventListener
    public void onPaymentFailed(PaymentFailed event) {
        failedPayments.increment();
    }
}

64.1.3 Throughput and Latency Monitoring

Throughput monitoring:

@Component
public class ThroughputMonitor {
    
    private final MeterRegistry registry;
    private final Counter eventsReceived;
    private final Timer eventProcessingTime;
    
    public ThroughputMonitor(MeterRegistry registry) {
        this.registry = registry;
        this.eventsReceived = Counter.builder("kafka.events.received")
                .register(registry);
        this.eventProcessingTime = Timer.builder("kafka.events.processing.time")
                .register(registry);
    }
    
    @KafkaListener(topics = "#{@topicConfig.orderPlacedTopic}")
    public void handleEvent(@Payload String eventData) {
        eventsReceived.increment();
        
        Timer.Sample sample = Timer.start(registry);
        try {
            processEvent(eventData);
        } finally {
            sample.stop(eventProcessingTime);
        }
    }
}

64.2 Alerting Strategies

64.2.1 Lag-Based Alerts

Consumer lag is often the first indicator of problems. Define graded alert levels:

Alert Level | Lag Threshold | Response Time | Action
Warning | > 1,000 messages | 15 minutes | Monitoring team
Critical | > 10,000 messages | 5 minutes | On-call engineer
Emergency | > 100,000 messages | Immediately | Escalation

Prometheus Alert Rules:

# prometheus-rules.yml
groups:
- name: kafka.consumer.lag
  rules:
  - alert: ConsumerLagHigh
    expr: kafka_consumer_lag > 1000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Consumer {{ $labels.group }} lagging behind"
      description: "Consumer group {{ $labels.group }} for topic {{ $labels.topic }} has lag of {{ $value }} messages"
      
  - alert: ConsumerLagCritical  
    expr: kafka_consumer_lag > 10000
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "Critical consumer lag detected"
      description: "Consumer group {{ $labels.group }} has critical lag of {{ $value }} messages"

64.2.2 Error Rate Alerting

Adaptive Error Rate Alerts:

@Component
public class AdaptiveAlertingService {
    
    private final Map<String, ErrorRateCalculator> serviceErrorRates = new ConcurrentHashMap<>();
    private final AlertingService alertingService;
    
    public AdaptiveAlertingService(AlertingService alertingService) {
        this.alertingService = alertingService;
    }
    
    @EventListener
    public void onEventProcessingResult(EventProcessingResult result) {
        String serviceName = result.getServiceName();
        
        ErrorRateCalculator calculator = serviceErrorRates.computeIfAbsent(
            serviceName, 
            k -> new ErrorRateCalculator(Duration.ofMinutes(5))
        );
        
        calculator.recordResult(result.isSuccess());
        
        double currentErrorRate = calculator.getCurrentErrorRate();
        double baselineErrorRate = calculator.getBaselineErrorRate();
        
        // Alert on a significant increase above the baseline error rate
        if (currentErrorRate > baselineErrorRate * 2.0 && currentErrorRate > 0.05) {
            alertingService.sendAlert(AlertLevel.WARNING, 
                "Error rate spike detected for " + serviceName + 
                ": " + String.format("%.2f%%", currentErrorRate * 100));
        }
    }
}

class ErrorRateCalculator {
    private final Duration window;
    private final Queue<ProcessingResult> results = new LinkedList<>();
    private double baselineErrorRate = 0.0;
    
    ErrorRateCalculator(Duration window) {
        this.window = window;
    }
    
    public void recordResult(boolean success) {
        results.offer(new ProcessingResult(success, Instant.now()));
        // Drop results that have fallen out of the sliding window
        while (!results.isEmpty() && 
               results.peek().timestamp().isBefore(Instant.now().minus(window))) {
            results.poll();
        }
    }
    
    public double getCurrentErrorRate() {
        if (results.isEmpty()) return 0.0;
        
        long failures = results.stream()
                .mapToLong(r -> r.success() ? 0 : 1)
                .sum();
        return (double) failures / results.size();
    }
    
    public double getBaselineErrorRate() {
        // Slow-moving average of recent error rates; one simple choice of baseline
        baselineErrorRate = 0.9 * baselineErrorRate + 0.1 * getCurrentErrorRate();
        return baselineErrorRate;
    }
    
    record ProcessingResult(boolean success, Instant timestamp) {}
}

64.2.3 Processing Time Alerts

SLA-based processing time alerts:

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class ProcessingSLA:
    service_name: str
    p95_threshold_ms: float
    p99_threshold_ms: float
    error_rate_threshold: float

class ProcessingTimeMonitor:
    def __init__(self):
        self.processing_times: Dict[str, List[float]] = {}
        self.slas = {
            'order-processor': ProcessingSLA('order-processor', 500.0, 1000.0, 0.05),
            'payment-processor': ProcessingSLA('payment-processor', 2000.0, 5000.0, 0.01),
            'inventory-service': ProcessingSLA('inventory-service', 200.0, 500.0, 0.02)
        }
    
    def record_processing_time(self, service: str, duration_ms: float):
        if service not in self.processing_times:
            self.processing_times[service] = []
            
        self.processing_times[service].append(duration_ms)
        
        # Sliding window: keep only the last 1000 measurements
        if len(self.processing_times[service]) > 1000:
            self.processing_times[service] = self.processing_times[service][-1000:]
            
        self.check_sla_violations(service)
    
    def check_sla_violations(self, service: str):
        if service not in self.slas or len(self.processing_times[service]) < 20:
            return
            
        sla = self.slas[service]
        times = sorted(self.processing_times[service])
        
        p95_time = times[int(len(times) * 0.95)]
        p99_time = times[int(len(times) * 0.99)]
        
        violations = []
        if p95_time > sla.p95_threshold_ms:
            violations.append(f"P95 processing time ({p95_time:.1f}ms) exceeds SLA ({sla.p95_threshold_ms}ms)")
            
        if p99_time > sla.p99_threshold_ms:
            violations.append(f"P99 processing time ({p99_time:.1f}ms) exceeds SLA ({sla.p99_threshold_ms}ms)")
            
        if violations:
            self.send_alert(service, violations)
    
    def send_alert(self, service: str, violations: List[str]):
        alert_message = f"SLA violation for {service}: {'; '.join(violations)}"
        print(f"ALERT: {alert_message}")  # In Praxis: Slack, PagerDuty, etc.

64.3 Dashboard Design

64.3.1 Hierarchical Dashboard Structure

Effective EDA dashboards follow a hierarchy from overview to detail:

Level 1: System Overview Dashboard
- Total throughput across all services
- System-wide error rate
- Maximum consumer lag
- Active alerts

Level 2: Service-Specific Dashboards
- Service throughput and latency
- Topic-specific metrics
- Error rates per event type
- Resource utilization

Level 3: Troubleshooting Dashboards
- Detailed error traces
- Event flow visualization
- Consumer group details
- Partition-level metrics

64.3.2 Grafana Dashboard Configuration

System Overview Panel:

{
  "dashboard": {
    "title": "EDA System Overview",
    "panels": [
      {
        "title": "Event Throughput",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(kafka_events_received_total[5m]))",
            "legendFormat": "Events/sec"
          }
        ]
      },
      {
        "title": "Consumer Lag Heatmap", 
        "type": "heatmap",
        "targets": [
          {
            "expr": "kafka_consumer_lag",
            "legendFormat": "{{group}}-{{topic}}"
          }
        ]
      },
      {
        "title": "Error Rate by Service",
        "type": "graph", 
        "targets": [
          {
            "expr": "rate(events_processed_total{status=\"error\"}[5m]) / rate(events_processed_total[5m])",
            "legendFormat": "{{service}}"
          }
        ]
      }
    ]
  }
}

64.3.3 Event Flow Visualization

End-to-End Trace Dashboard:

@Component
public class EventFlowTracer {
    
    private final Tracer tracer;
    
    public EventFlowTracer(Tracer tracer) {
        this.tracer = tracer;
    }
    
    @EventListener
    public void onOrderPlaced(OrderPlaced event) {
        // Open a span that marks the start of the order-processing flow
        Span span = tracer.nextSpan()
                .name("order-processing-flow")
                .tag("order.id", event.getOrderId())
                .tag("customer.id", event.getCustomerId())
                .start();
        
        // Propagate trace metadata with the event so downstream consumers
        // can attach their spans to the same flow
        event.setTraceId(span.context().traceId());
        event.setFlowStartTime(Instant.now());
        span.end();
    }
    
    @EventListener 
    public void onPaymentProcessed(PaymentProcessed event) {
        if (event.getTraceId() != null) {
            Span span = tracer.nextSpan()
                    .name("payment-completed")
                    .tag("trace.id", event.getTraceId())
                    .tag("payment.amount", event.getAmount().toString())
                    .start();
            span.end();
        }
    }
}

64.3.4 Operational Dashboard Patterns

Service Health Overview:

from typing import Dict

class ServiceHealthDashboard:
    def __init__(self):
        # Each indicator maps to a check function that returns 'HEALTHY', 'WARNING'
        # or 'CRITICAL' for a service (implementations query the metric backend)
        self.health_indicators = {
            'consumer_lag': self.check_consumer_lag,
            'error_rate': self.check_error_rate, 
            'processing_time': self.check_processing_time,
            'resource_usage': self.check_resource_usage
        }
    
    def get_service_health(self, service_name: str) -> Dict[str, str]:
        health_status = {}
        
        for indicator_name, check_function in self.health_indicators.items():
            try:
                status = check_function(service_name)
                health_status[indicator_name] = status
            except Exception as e:
                health_status[indicator_name] = 'UNKNOWN'
                
        overall_health = self.calculate_overall_health(health_status)
        health_status['overall'] = overall_health
        
        return health_status
    
    def calculate_overall_health(self, indicators: Dict[str, str]) -> str:
        if any(status == 'CRITICAL' for status in indicators.values()):
            return 'CRITICAL'
        elif any(status == 'WARNING' for status in indicators.values()):
            return 'WARNING'  
        elif all(status == 'HEALTHY' for status in indicators.values()):
            return 'HEALTHY'
        else:
            return 'UNKNOWN'

Real-time Event Monitoring:

@RestController
public class EventMonitoringController {
    
    private static final Logger log = LoggerFactory.getLogger(EventMonitoringController.class);
    
    @GetMapping("/api/monitoring/event-flow")
    public Flux<EventFlowStatus> getEventFlow() {
        return Flux.interval(Duration.ofSeconds(5))
                .map(tick -> {
                    EventFlowStatus status = new EventFlowStatus();
                    status.setTimestamp(Instant.now());
                    status.setActiveConsumers(getActiveConsumerCount());
                    status.setTotalLag(getTotalConsumerLag());
                    status.setThroughput(getCurrentThroughput());
                    status.setErrorRate(getCurrentErrorRate());
                    return status;
                })
                .doOnNext(status -> log.debug("Event flow status: {}", status));
    }
}

These monitoring strategies give you the visibility you need to operate an event-driven architecture successfully. The key is to track both technical and business metrics while accounting for the particular characteristics of asynchronous systems.