66 Alerting for Event Flow and Error Behavior

Alerting in event-driven systems differs fundamentally from alerting in traditional request-response systems. Instead of watching individual failed requests, you have to monitor a continuous flow of events, understand asymmetric latency patterns, and assess business impact across multiple services. Effective alerting prevents both alert fatigue and overlooked critical problems.

66.1 SLA Definition

66.1.1 Event-Flow-Based SLAs

In an EDA you define SLAs not only for individual services but also for end-to-end business processes:

Business Process SLA Framework:

@Configuration
public class BusinessProcessSLAs {
    
    @Bean
    public Map<String, ProcessSLA> processDefinitions() {
        Map<String, ProcessSLA> slas = new HashMap<>();
        
        // Order fulfillment SLA (OrderPlaced -> OrderShipped)
        slas.put("order-fulfillment", ProcessSLA.builder()
                .processName("order-fulfillment")
                .startEvent("OrderPlaced")
                .endEvent("OrderShipped")
                .targetDuration(Duration.ofMinutes(45))
                .warningThreshold(Duration.ofMinutes(30))
                .criticalThreshold(Duration.ofMinutes(60))
                .businessImpact(BusinessImpact.HIGH)
                .build());
                
        // Payment Processing SLA  
        slas.put("payment-processing", ProcessSLA.builder()
                .processName("payment-processing")
                .startEvent("OrderPlaced")
                .endEvent("PaymentProcessed")
                .targetDuration(Duration.ofMinutes(5))
                .warningThreshold(Duration.ofMinutes(3))
                .criticalThreshold(Duration.ofMinutes(10))
                .businessImpact(BusinessImpact.CRITICAL)
                .build());
                
        return slas;
    }
}

@Data
@Builder
public class ProcessSLA {
    private String processName;
    private String startEvent;
    private String endEvent;
    private Duration targetDuration;
    private Duration warningThreshold;
    private Duration criticalThreshold;
    private BusinessImpact businessImpact;
    private double availabilityTarget;  // e.g. 0.999 for 99.9%
    private double errorRateThreshold;  // e.g. 0.01 for 1%
}
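
The configuration above only declares targets; evaluating them requires correlating each process instance's start and end events. The sketch below is a minimal Python counterpart (the names ProcessSLA and ProcessSLATracker here are illustrative, not part of the Java framework above) that measures the end-to-end duration per correlation ID and grades it against the warning and critical thresholds.

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional

@dataclass
class ProcessSLA:
    process_name: str
    start_event: str
    end_event: str
    target_duration: timedelta
    warning_threshold: timedelta
    critical_threshold: timedelta

class ProcessSLATracker:
    """Correlates start/end events per process instance and grades the duration."""

    def __init__(self, sla: ProcessSLA):
        self.sla = sla
        self.open_instances: Dict[str, datetime] = {}  # correlation_id -> start time

    def on_event(self, event_type: str, correlation_id: str,
                 timestamp: datetime) -> Optional[str]:
        if event_type == self.sla.start_event:
            self.open_instances[correlation_id] = timestamp
            return None
        if event_type == self.sla.end_event and correlation_id in self.open_instances:
            duration = timestamp - self.open_instances.pop(correlation_id)
            return self.classify(duration)
        return None

    def classify(self, duration: timedelta) -> str:
        if duration > self.sla.critical_threshold:
            return "critical"
        if duration > self.sla.warning_threshold:
            return "warning"
        return "ok"

# Example: order-fulfillment SLA from the configuration above
sla = ProcessSLA("order-fulfillment", "OrderPlaced", "OrderShipped",
                 timedelta(minutes=45), timedelta(minutes=30), timedelta(minutes=60))
tracker = ProcessSLATracker(sla)
tracker.on_event("OrderPlaced", "order-42", datetime(2024, 1, 1, 12, 0))
print(tracker.on_event("OrderShipped", "order-42", datetime(2024, 1, 1, 12, 50)))  # "warning"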

66.1.2 Service-Level SLA Definitions

In addition to business-process SLAs, you also need service-specific SLAs:

Service Type           Latency SLA     Throughput SLA              Availability   Error Rate
Order Service          P95 < 200ms     > 1000 orders/min           99.95%         < 0.1%
Payment Service        P95 < 500ms     > 500 payments/min          99.99%         < 0.01%
Inventory Service      P95 < 100ms     > 2000 checks/min           99.9%          < 0.5%
Notification Service   P95 < 1000ms    > 5000 notifications/min    99.5%          < 2%

Python SLA Monitoring:

from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime, timedelta
import asyncio

@dataclass
class SLATarget:
    service_name: str
    metric_type: str  # latency, throughput, availability, error_rate
    target_value: float
    measurement_window: timedelta
    alert_threshold: float

@dataclass
class SLAViolation:
    """Result record produced by SLAMonitor.check_sla_violations()."""
    service: str
    metric: str
    target_value: float
    actual_value: float
    severity: str
    timestamp: datetime

class SLAMonitor:
    def __init__(self):
        self.sla_targets = {
            'order-service': [
                SLATarget('order-service', 'latency_p95', 200.0, timedelta(minutes=5), 250.0),
                SLATarget('order-service', 'throughput', 1000.0, timedelta(minutes=1), 800.0),
                SLATarget('order-service', 'error_rate', 0.001, timedelta(minutes=5), 0.005),
                SLATarget('order-service', 'availability', 0.9995, timedelta(hours=1), 0.999)
            ],
            'payment-service': [
                SLATarget('payment-service', 'latency_p95', 500.0, timedelta(minutes=5), 750.0),
                SLATarget('payment-service', 'error_rate', 0.0001, timedelta(minutes=5), 0.001)
            ]
        }
        
    async def check_sla_violations(self) -> List[SLAViolation]:
        violations = []
        
        for service_name, targets in self.sla_targets.items():
            for target in targets:
                current_value = await self.get_current_metric_value(
                    service_name, 
                    target.metric_type, 
                    target.measurement_window
                )
                
                if self.is_violation(current_value, target):
                    violation = SLAViolation(
                        service=service_name,
                        metric=target.metric_type,
                        target_value=target.target_value,
                        actual_value=current_value,
                        severity=self.calculate_severity(current_value, target),
                        timestamp=datetime.now()
                    )
                    violations.append(violation)
                    
        return violations
    
    def is_violation(self, current_value: float, target: SLATarget) -> bool:
        if target.metric_type in ['latency_p95', 'error_rate']:
            return current_value > target.alert_threshold
        elif target.metric_type in ['throughput', 'availability']:
            return current_value < target.alert_threshold
        return False
    
    def calculate_severity(self, actual_value: float, target: SLATarget) -> str:
        deviation = abs(actual_value - target.target_value) / target.target_value
        
        if deviation > 0.5:  # 50% deviation
            return 'critical'
        elif deviation > 0.2:  # 20% deviation
            return 'warning'
        else:
            return 'info'
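
The monitor above leaves get_current_metric_value undefined, since the metrics backend is deployment-specific. A minimal usage sketch, assuming a hypothetical StaticSLAMonitor subclass that serves metric values from a fixed dictionary instead of a real backend:

import asyncio

class StaticSLAMonitor(SLAMonitor):
    """Test double that serves metric values from a dictionary (hypothetical)."""

    def __init__(self, metric_values: dict):
        super().__init__()
        self.metric_values = metric_values

    async def get_current_metric_value(self, service_name, metric_type, window):
        return self.metric_values.get((service_name, metric_type), 0.0)

async def main():
    monitor = StaticSLAMonitor({
        ('order-service', 'latency_p95'): 320.0,   # above the 250 ms alert threshold
        ('order-service', 'throughput'): 1200.0,
        ('order-service', 'error_rate'): 0.0005,
        ('order-service', 'availability'): 0.9997,
        ('payment-service', 'latency_p95'): 210.0,
        ('payment-service', 'error_rate'): 0.0002,
    })
    for violation in await monitor.check_sla_violations():
        print(violation.service, violation.metric, violation.severity)
        # -> order-service latency_p95 critical

asyncio.run(main())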

66.1.3 Event-Lag SLA Definitions

Consumer lag is a metric specific to EDA systems and deserves its own SLA definitions:

@Component
public class EventLagSLAMonitor {
    
    private final Map<String, LagSLA> lagSLAs = Map.of(
        "payment-service", new LagSLA(Duration.ofSeconds(30), Duration.ofMinutes(2), 1000),
        "inventory-service", new LagSLA(Duration.ofSeconds(10), Duration.ofMinutes(1), 500),
        "notification-service", new LagSLA(Duration.ofMinutes(5), Duration.ofMinutes(10), 10000)
    );
    
    @Scheduled(fixedDelay = 30000) // every 30 seconds
    public void checkLagSLAs() {
        // KafkaFuture can be bridged to the CompletionStage API (kafka-clients 3.0+)
        kafkaAdminClient.listConsumerGroups().all()
                .toCompletionStage()
                .thenCompose(this::checkGroupsLag)
                .thenAccept(violations -> violations.forEach(this::sendAlert));
    }
    
    private CompletableFuture<List<LagViolation>> checkGroupsLag(
            Collection<ConsumerGroupListing> groups) {
        
        return groups.stream()
                .filter(group -> lagSLAs.containsKey(group.groupId()))
                .map(this::checkSingleGroupLag)
                .reduce(CompletableFuture.completedFuture(new ArrayList<>()),
                        (acc, future) -> acc.thenCombine(future, (list1, list2) -> {
                            list1.addAll(list2);
                            return list1;
                        }));
    }
    
    private CompletableFuture<List<LagViolation>> checkSingleGroupLag(
            ConsumerGroupListing group) {
        
        LagSLA sla = lagSLAs.get(group.groupId());
        
        return kafkaAdminClient.listConsumerGroupOffsets(group.groupId())
                .partitionsToOffsetAndMetadata()
                .toCompletionStage()
                .toCompletableFuture()
                .thenCompose(this::calculateLag)
                .thenApply(totalLag -> {
                    List<LagViolation> violations = new ArrayList<>();
                    
                    // totalLag is a message count, so it is checked against the SLA's
                    // message-count threshold; mapping it onto the Duration-based thresholds
                    // would additionally require the group's consumption rate.
                    if (totalLag > sla.getMessageCountThreshold()) {
                        violations.add(new LagViolation(
                                group.groupId(), 
                                totalLag, 
                                Severity.CRITICAL,
                                "Consumer lag exceeds critical threshold"));
                    } else if (totalLag > sla.getMessageCountThreshold() / 2) {
                        // pragmatic default: warn at half the critical message count
                        violations.add(new LagViolation(
                                group.groupId(), 
                                totalLag, 
                                Severity.WARNING,
                                "Consumer lag exceeds warning threshold"));
                    }
                    
                    return violations;
                });
    }
}

@Data
@AllArgsConstructor
public class LagSLA {
    private Duration warningThreshold;
    private Duration criticalThreshold;
    private long messageCountThreshold;
}
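
The Java sketch above leaves calculateLag open, and strictly speaking a message-count lag can only be mapped onto the Duration-based thresholds of LagSLA if the group's consumption rate is known. The following Python sketch (function and field names are illustrative) shows both the per-partition lag calculation and that mapping:

from dataclasses import dataclass
from datetime import timedelta
from typing import Dict

@dataclass
class LagSLA:
    warning_threshold: timedelta     # acceptable time-lag before warning
    critical_threshold: timedelta    # acceptable time-lag before critical
    message_count_threshold: int     # acceptable lag in messages

def total_lag(end_offsets: Dict[str, int], committed_offsets: Dict[str, int]) -> int:
    """Lag per partition is log-end offset minus committed offset."""
    return sum(max(0, end_offsets[p] - committed_offsets.get(p, 0)) for p in end_offsets)

def evaluate_lag(lag_messages: int, consumption_rate_per_sec: float, sla: LagSLA) -> str:
    """Translate message lag into an estimated time-lag and grade it against the SLA."""
    estimated_time_lag = timedelta(seconds=lag_messages / max(consumption_rate_per_sec, 1e-9))
    if lag_messages > sla.message_count_threshold or estimated_time_lag > sla.critical_threshold:
        return "critical"
    if estimated_time_lag > sla.warning_threshold:
        return "warning"
    return "ok"

# Example: payment-service SLA from the map above (30 s warning, 2 min critical, 1000 messages)
sla = LagSLA(timedelta(seconds=30), timedelta(minutes=2), 1000)
end = {"payments-0": 10_500, "payments-1": 9_800}
committed = {"payments-0": 10_200, "payments-1": 9_700}
lag = total_lag(end, committed)                                         # 400 messages
print(lag, evaluate_lag(lag, consumption_rate_per_sec=10.0, sla=sla))   # 400 warning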

66.2 Alert Design

66.2.1 Hierarchical Alert Structure

Alerts in EDA systems should be organized hierarchically so that they reach the appropriate audiences:

Alert Severity Levels:

from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional
from datetime import datetime, timedelta

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    EMERGENCY = "emergency"

class AlertCategory(Enum):
    BUSINESS_PROCESS = "business_process"
    TECHNICAL_INFRASTRUCTURE = "technical_infrastructure" 
    DATA_QUALITY = "data_quality"
    SECURITY = "security"

@dataclass
class Alert:
    id: str
    severity: AlertSeverity
    category: AlertCategory
    title: str
    description: str
    service: str
    metric_name: str
    current_value: float
    threshold_value: float
    timestamp: datetime
    tags: Dict[str, str]
    runbook_url: Optional[str] = None
    escalation_policy: Optional[str] = None

class AlertManager:
    def __init__(self):
        self.alert_policies = {
            AlertSeverity.INFO: {
                'notification_channels': ['slack-monitoring'],
                'escalation_delay': timedelta(hours=4),
                'auto_resolve': True
            },
            AlertSeverity.WARNING: {
                'notification_channels': ['slack-alerts', 'email-dev-team'],
                'escalation_delay': timedelta(minutes=30),
                'auto_resolve': False
            },
            AlertSeverity.CRITICAL: {
                'notification_channels': ['slack-alerts', 'email-dev-team', 'pagerduty'],
                'escalation_delay': timedelta(minutes=15),
                'auto_resolve': False
            },
            AlertSeverity.EMERGENCY: {
                'notification_channels': ['pagerduty-high', 'sms-oncall'],
                'escalation_delay': timedelta(minutes=5),
                'auto_resolve': False
            }
        }
    
    async def send_alert(self, alert: Alert):
        policy = self.alert_policies[alert.severity]
        
        # Immediate Notifications
        for channel in policy['notification_channels']:
            await self.send_notification(channel, alert)
        
        # Schedule Escalation if needed
        if not policy['auto_resolve']:
            await self.schedule_escalation(alert, policy['escalation_delay'])
    
    async def send_notification(self, channel: str, alert: Alert):
        if channel.startswith('slack-'):
            await self.send_slack_alert(channel, alert)
        elif channel.startswith('email-'):
            await self.send_email_alert(channel, alert)
        elif channel.startswith('pagerduty'):
            await self.send_pagerduty_alert(channel, alert)
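
To connect section 66.1 with this alert model, an SLA violation has to be turned into an Alert and handed to the AlertManager. A small sketch, assuming the SLAViolation, Alert, AlertSeverity, AlertCategory, and AlertManager classes from the listings above are in scope; the mapping rules are illustrative defaults, not fixed conventions:

# Assumes SLAViolation (66.1.2) and the alert model above are defined in the same module.
def violation_to_alert(violation: SLAViolation) -> Alert:
    severity = {
        'info': AlertSeverity.INFO,
        'warning': AlertSeverity.WARNING,
        'critical': AlertSeverity.CRITICAL,
    }.get(violation.severity, AlertSeverity.WARNING)

    return Alert(
        id=f"sla-{violation.service}-{violation.metric}-{violation.timestamp.isoformat()}",
        severity=severity,
        category=AlertCategory.TECHNICAL_INFRASTRUCTURE,
        title=f"SLA violation: {violation.metric} on {violation.service}",
        description=(f"{violation.metric} is {violation.actual_value} "
                     f"(target {violation.target_value})"),
        service=violation.service,
        metric_name=violation.metric,
        current_value=violation.actual_value,
        threshold_value=violation.target_value,
        timestamp=violation.timestamp,
        tags={'alert_type': 'sla_violation', 'service': violation.service},
    )

# Usage: feed the violations from the SLAMonitor into the AlertManager
# for violation in await monitor.check_sla_violations():
#     await alert_manager.send_alert(violation_to_alert(violation))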

66.2.2 Context-Aware Alerting

Alerts should contain enough context to enable fast diagnosis:

@Service
public class ContextualAlertService {
    
    public void sendConsumerLagAlert(String consumerGroup, long lagMessages, Duration lagTime) {
        AlertContext context = buildLagAlertContext(consumerGroup, lagMessages, lagTime);
        
        Alert alert = Alert.builder()
                .severity(determineSeverity(lagMessages))
                .title(String.format("Consumer Lag Alert: %s", consumerGroup))
                .description(buildLagDescription(lagMessages, lagTime))
                .context(context)
                .runbookUrl("https://wiki.company.com/runbooks/consumer-lag")
                .tags(Map.of(
                    "consumer_group", consumerGroup,
                    "alert_type", "consumer_lag",
                    "service", extractServiceFromGroup(consumerGroup)
                ))
                .build();
                
        alertManager.send(alert);
    }
    
    private AlertContext buildLagAlertContext(String group, long lagMessages, Duration lagTime) {
        return AlertContext.builder()
                .consumerGroup(group)
                .lagMessages(lagMessages)
                .lagDuration(lagTime)
                .relatedTopics(getTopicsForGroup(group))
                .recentErrorRate(getRecentErrorRate(group))
                .averageProcessingTime(getAverageProcessingTime(group))
                .relatedServices(getDownstreamServices(group))
                .suggestedActions(generateSuggestedActions(lagMessages))
                .impactedBusinessProcesses(getImpactedProcesses(group))
                .build();
    }
    
    private List<String> generateSuggestedActions(long lagMessages) {
        List<String> actions = new ArrayList<>();
        
        if (lagMessages > 100000) {
            actions.add("Consider scaling up consumer instances");
            actions.add("Check for downstream service issues");
        } else if (lagMessages > 10000) {
            actions.add("Review recent deployments");
            actions.add("Check consumer application logs");
        } else {
            actions.add("Monitor for trend continuation");
        }
        
        return actions;
    }
}

66.2.3 Smart Alert Aggregation

Avoid alert storms through intelligent aggregation:

from collections import defaultdict
from typing import Dict, List
from datetime import datetime, timedelta

class AlertAggregator:
    def __init__(self, alert_manager: AlertManager):
        self.alert_manager = alert_manager  # used by send_aggregated_alert below
        self.pending_alerts: Dict[str, List[Alert]] = defaultdict(list)
        self.suppression_rules = {
            'consumer_lag': {
                'window': timedelta(minutes=5),
                'max_alerts': 3,
                'correlation_keys': ['consumer_group', 'service']
            },
            'error_rate': {
                'window': timedelta(minutes=2),
                'max_alerts': 5,
                'correlation_keys': ['service', 'error_type']
            }
        }
    
    async def process_alert(self, alert: Alert) -> bool:
        """Returns True if alert should be sent, False if suppressed"""
        
        alert_type = alert.tags.get('alert_type', 'unknown')
        
        if alert_type not in self.suppression_rules:
            return True  # Send immediately if no suppression rule
            
        rule = self.suppression_rules[alert_type]
        correlation_key = self.build_correlation_key(alert, rule['correlation_keys'])
        
        # Add to pending alerts
        self.pending_alerts[correlation_key].append(alert)
        
        # Clean old alerts outside window
        cutoff_time = datetime.now() - rule['window']
        self.pending_alerts[correlation_key] = [
            a for a in self.pending_alerts[correlation_key] 
            if a.timestamp > cutoff_time
        ]
        
        # Check if we should send aggregated alert
        if len(self.pending_alerts[correlation_key]) >= rule['max_alerts']:
            await self.send_aggregated_alert(correlation_key, self.pending_alerts[correlation_key])
            self.pending_alerts[correlation_key] = []  # Clear after sending
            return False  # Don't send individual alert
        
        # Send first alert immediately, suppress subsequent ones
        return len(self.pending_alerts[correlation_key]) == 1
    
    async def send_aggregated_alert(self, correlation_key: str, alerts: List[Alert]):
        # AlertSeverity members are not orderable, so rank them explicitly
        severity_order = [AlertSeverity.INFO, AlertSeverity.WARNING,
                          AlertSeverity.CRITICAL, AlertSeverity.EMERGENCY]
        highest_severity = max((a.severity for a in alerts), key=severity_order.index)
        
        aggregated = Alert(
            id=f"aggregated-{correlation_key}-{datetime.now().isoformat()}",
            severity=highest_severity,
            category=alerts[0].category,
            title=f"Multiple {alerts[0].tags.get('alert_type', 'alerts')} for {correlation_key}",
            description=f"Received {len(alerts)} similar alerts in the last few minutes",
            service=alerts[0].service,
            metric_name="aggregated",
            current_value=len(alerts),
            threshold_value=1,
            timestamp=datetime.now(),
            tags={**alerts[0].tags, 'aggregated': 'true', 'alert_count': str(len(alerts))}
        )
        
        await self.alert_manager.send_alert(aggregated)
    
    def build_correlation_key(self, alert: Alert, correlation_keys: List[str]) -> str:
        key_parts = [alert.tags.get(key, 'unknown') for key in correlation_keys]
        return '-'.join(key_parts)
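
A short usage example makes the suppression behavior concrete: with the consumer_lag rule above (five-minute window, max_alerts = 3), the first alert per correlation key is delivered individually, the second is suppressed, and the third triggers a single aggregated alert. The RecordingAlertManager below is a hypothetical stand-in for the real AlertManager:

import asyncio
from datetime import datetime

class RecordingAlertManager:
    """Hypothetical stand-in that only records what would have been sent."""
    def __init__(self):
        self.sent = []
    async def send_alert(self, alert):
        self.sent.append(alert)

async def demo():
    manager = RecordingAlertManager()
    aggregator = AlertAggregator(alert_manager=manager)

    def lag_alert(n: int) -> Alert:
        return Alert(
            id=f"lag-{n}", severity=AlertSeverity.WARNING,
            category=AlertCategory.TECHNICAL_INFRASTRUCTURE,
            title="Consumer lag above threshold", description="lag keeps growing",
            service="payment-service", metric_name="consumer_lag",
            current_value=5000, threshold_value=1000, timestamp=datetime.now(),
            tags={'alert_type': 'consumer_lag',
                  'consumer_group': 'payment-service', 'service': 'payment-service'},
        )

    for n in range(3):
        print(n, await aggregator.process_alert(lag_alert(n)))  # True, False, False

    print(len(manager.sent))  # 1 aggregated alert was sent via the manager

asyncio.run(demo())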

66.3 Escalation Procedures

66.3.1 Time-based Escalation

@Component
public class EscalationManager {
    
    private final Map<String, EscalationPolicy> policies = Map.of(
        "payment-critical", EscalationPolicy.builder()
                .level1(Duration.ZERO, List.of("slack-payments", "pagerduty-payments"))
                .level2(Duration.ofMinutes(5), List.of("phone-oncall-lead"))
                .level3(Duration.ofMinutes(15), List.of("escalation-manager"))
                .level4(Duration.ofMinutes(30), List.of("director-engineering"))
                .build(),
                
        "order-processing", EscalationPolicy.builder()
                .level1(Duration.ZERO, List.of("slack-orders"))
                .level2(Duration.ofMinutes(15), List.of("email-team-lead"))
                .level3(Duration.ofMinutes(45), List.of("pagerduty-oncall"))
                .build()
    );
    
    @Async
    public void startEscalation(Alert alert) {
        String policyName = determineEscalationPolicy(alert);
        EscalationPolicy policy = policies.get(policyName);
        
        if (policy == null) {
            log.warn("No escalation policy found for alert: {}", alert.getId());
            return;
        }
        
        scheduleEscalationLevels(alert, policy);
    }
    
    private void scheduleEscalationLevels(Alert alert, EscalationPolicy policy) {
        // Level 1 - Immediate
        sendNotifications(alert, policy.getLevel1Channels());
        
        // Schedule subsequent levels
        taskScheduler.schedule(
            () -> {
                if (!isAlertResolved(alert.getId())) {
                    sendNotifications(alert, policy.getLevel2Channels());
                }
            },
            Instant.now().plus(policy.getLevel2Delay())
        );
        
        taskScheduler.schedule(
            () -> {
                if (!isAlertResolved(alert.getId())) {
                    // Escalate alert severity for Level 3 and notify once
                    Alert escalatedAlert = alert.toBuilder()
                            .severity(AlertSeverity.EMERGENCY)
                            .title("[ESCALATED] " + alert.getTitle())
                            .build();
                    sendNotifications(escalatedAlert, policy.getLevel3Channels());
                }
            },
            Instant.now().plus(policy.getLevel3Delay())
        );
        
        // Level 4 (where defined in the policy) would be scheduled analogously
    }
}
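
The same time-based pattern can be sketched without Spring's TaskScheduler; what matters is that escalation steps are scheduled with a delay and can be cancelled once the alert is resolved. A compact asyncio sketch with hypothetical names (SimpleEscalator is not the EscalationManager API above):

import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import Dict, List

@dataclass
class EscalationLevel:
    delay: timedelta
    channels: List[str]

class SimpleEscalator:
    def __init__(self):
        self.tasks: Dict[str, List[asyncio.Task]] = {}

    def start(self, alert_id: str, levels: List[EscalationLevel]):
        self.tasks[alert_id] = [
            asyncio.create_task(self._escalate_later(alert_id, level)) for level in levels
        ]

    async def _escalate_later(self, alert_id: str, level: EscalationLevel):
        await asyncio.sleep(level.delay.total_seconds())
        print(f"escalating {alert_id} to {level.channels}")  # would notify the channels

    def resolve(self, alert_id: str):
        # Cancelling the pending tasks is the de-escalation step
        for task in self.tasks.pop(alert_id, []):
            task.cancel()

async def demo():
    escalator = SimpleEscalator()
    escalator.start("alert-1", [
        EscalationLevel(timedelta(seconds=0), ["slack-orders"]),
        EscalationLevel(timedelta(seconds=2), ["email-team-lead"]),
    ])
    await asyncio.sleep(1)        # level 1 fires immediately, level 2 is still pending
    escalator.resolve("alert-1")  # cancels the pending level-2 escalation
    await asyncio.sleep(2)

asyncio.run(demo())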

66.3.2 Business Impact-based Escalation

from datetime import datetime, timedelta

# Uses the Alert and AlertSeverity types from the alert model in 66.2.1.
class BusinessImpactEscalator:
    def __init__(self):
        self.business_process_priorities = {
            'payment-processing': {
                'revenue_impact_per_minute': 10000,  # $10k/min
                'customer_impact_threshold': 100,    # customers affected
                'escalation_multiplier': 3.0
            },
            'order-fulfillment': {
                'revenue_impact_per_minute': 5000,   # $5k/min
                'customer_impact_threshold': 50,
                'escalation_multiplier': 2.0
            },
            'inventory-management': {
                'revenue_impact_per_minute': 2000,   # $2k/min
                'customer_impact_threshold': 200,
                'escalation_multiplier': 1.5
            }
        }
    
    def calculate_escalation_urgency(self, alert: Alert) -> float:
        """Returns escalation urgency score (0-10)"""
        
        process = alert.tags.get('business_process')
        if process not in self.business_process_priorities:
            return 5.0  # Default urgency
            
        priority = self.business_process_priorities[process]
        
        # Base urgency from alert severity
        base_urgency = {
            AlertSeverity.INFO: 1.0,
            AlertSeverity.WARNING: 3.0,
            AlertSeverity.CRITICAL: 7.0,
            AlertSeverity.EMERGENCY: 9.0
        }.get(alert.severity, 5.0)
        
        # Business impact multipliers
        impact_multiplier = 1.0
        
        # Revenue impact
        if hasattr(alert, 'estimated_revenue_impact'):
            if alert.estimated_revenue_impact > priority['revenue_impact_per_minute']:
                impact_multiplier *= priority['escalation_multiplier']
        
        # Customer impact
        if hasattr(alert, 'affected_customers'):
            if alert.affected_customers > priority['customer_impact_threshold']:
                impact_multiplier *= 1.5
        
        # Time factor - urgency increases over time
        alert_age_minutes = (datetime.now() - alert.timestamp).total_seconds() / 60
        time_multiplier = 1.0 + (alert_age_minutes * 0.1)  # 10% increase per minute
        
        urgency_score = min(10.0, base_urgency * impact_multiplier * time_multiplier)
        return urgency_score
    
    async def escalate_based_on_impact(self, alert: Alert):
        urgency = self.calculate_escalation_urgency(alert)
        
        if urgency >= 8.0:
            # Emergency escalation
            await self.send_emergency_escalation(alert)
        elif urgency >= 6.0:
            # Fast escalation
            await self.schedule_escalation(alert, delay=timedelta(minutes=5))
        elif urgency >= 4.0:
            # Standard escalation
            await self.schedule_escalation(alert, delay=timedelta(minutes=15))
        else:
            # Slow escalation
            await self.schedule_escalation(alert, delay=timedelta(hours=1))
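
A quick worked example of the scoring, using the Alert dataclass from 66.2.1 with illustrative tag values: a critical alert on payment-processing that is ten minutes old and carries no recorded revenue or customer impact gets base urgency 7.0 and a time multiplier of roughly 2.0, so the raw score of about 14 is capped at 10.0 and triggers the emergency escalation path.

from datetime import datetime, timedelta

escalator = BusinessImpactEscalator()
alert = Alert(
    id="pay-1", severity=AlertSeverity.CRITICAL,
    category=AlertCategory.BUSINESS_PROCESS,
    title="Payment error rate above SLA", description="error rate at 2%",
    service="payment-service", metric_name="error_rate",
    current_value=0.02, threshold_value=0.001,
    timestamp=datetime.now() - timedelta(minutes=10),
    tags={'business_process': 'payment-processing'},
)
print(escalator.calculate_escalation_urgency(alert))  # ~10.0 (7.0 * 1.0 * ~2.0, capped)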

66.3.3 Automated Resolution and De-escalation

@Component
public class AutoResolutionService {
    
    @EventListener
    public void onMetricImprovement(MetricImprovementEvent event) {
        List<Alert> relatedAlerts = alertRepository.findActiveAlertsByMetric(
                event.getServiceName(), 
                event.getMetricName());
                
        for (Alert alert : relatedAlerts) {
            if (shouldAutoResolve(alert, event)) {
                resolveAlert(alert, "Auto-resolved: metric returned to normal");
            }
        }
    }
    
    private boolean shouldAutoResolve(Alert alert, MetricImprovementEvent event) {
        // Only auto-resolve if metric has been stable for sufficient time
        Duration stableFor = Duration.between(event.getFirstImprovementTime(), Instant.now());
        Duration requiredStability = getRequiredStabilityPeriod(alert.getSeverity());
        
        return stableFor.compareTo(requiredStability) >= 0 &&
               event.getCurrentValue() <= alert.getThresholdValue() * 0.8; // 20% buffer
    }
    
    private Duration getRequiredStabilityPeriod(AlertSeverity severity) {
        return switch (severity) {
            case INFO -> Duration.ofMinutes(2);
            case WARNING -> Duration.ofMinutes(5);
            case CRITICAL -> Duration.ofMinutes(10);
            case EMERGENCY -> Duration.ofMinutes(15);
        };
    }
    
    private void resolveAlert(Alert alert, String resolution) {
        alert.setStatus(AlertStatus.RESOLVED);
        alert.setResolutionTime(Instant.now());
        alert.setResolutionNote(resolution);
        
        alertRepository.save(alert);
        
        // Notify stakeholders of resolution
        notificationService.sendResolutionNotification(alert);
        
        // Cancel any pending escalations
        escalationManager.cancelEscalation(alert.getId());
    }
}

These alerting strategies help you detect both technical and business problems early in event-driven architectures and respond to them appropriately. The key is to balance the sensitivity and specificity of your alerts while accounting for the particular characteristics of asynchronous systems.