Alerting in event-driven systems differs fundamentally from alerting in traditional request-response systems. Instead of watching individual failed requests, you have to monitor a continuous flow of events, understand asymmetric latency patterns, and assess business impact across multiple services. Effective alerting prevents both alert fatigue and overlooked critical problems.
In EDA, you define SLAs not only for individual services but also for end-to-end business processes:
Business Process SLA Framework:
@Configuration
public class BusinessProcessSLAs {
@Bean
public Map<String, ProcessSLA> processDefinitions() {
Map<String, ProcessSLA> slas = new HashMap<>();
// Order-Fulfillment SLA (OrderPlaced -> OrderShipped)
slas.put("order-fulfillment", ProcessSLA.builder()
.processName("order-fulfillment")
.startEvent("OrderPlaced")
.endEvent("OrderShipped")
.targetDuration(Duration.ofMinutes(45))
.warningThreshold(Duration.ofMinutes(30))
.criticalThreshold(Duration.ofMinutes(60))
.businessImpact(BusinessImpact.HIGH)
.build());
// Payment Processing SLA
slas.put("payment-processing", ProcessSLA.builder()
.processName("payment-processing")
.startEvent("OrderPlaced")
.endEvent("PaymentProcessed")
.targetDuration(Duration.ofMinutes(5))
.warningThreshold(Duration.ofMinutes(3))
.criticalThreshold(Duration.ofMinutes(10))
.businessImpact(BusinessImpact.CRITICAL)
.build());
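        // Evaluating these process SLAs requires correlating concrete startEvent and
        // endEvent occurrences (for example by order id) and comparing the elapsed time
        // against targetDuration and criticalThreshold; that correlation logic is not
        // shown in this listing.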
return slas;
}
}
@Data
@Builder
public class ProcessSLA {
private String processName;
private String startEvent;
private String endEvent;
private Duration targetDuration;
private Duration warningThreshold;
private Duration criticalThreshold;
private BusinessImpact businessImpact;
private double availabilityTarget; // 99.9%
private double errorRateThreshold; // 1%
}

In addition to business-process SLAs, you also need service-specific SLAs:
| Service-Typ | Latenz-SLA | Durchsatz-SLA | Verfügbarkeit | Fehlerrate |
|---|---|---|---|---|
| Order Service | P95 < 200ms | > 1000 orders/min | 99.95% | < 0.1% |
| Payment Service | P95 < 500ms | > 500 payments/min | 99.99% | < 0.01% |
| Inventory Service | P95 < 100ms | > 2000 checks/min | 99.9% | < 0.5% |
| Notification Service | P95 < 1000ms | > 5000 notifications/min | 99.5% | < 2% |
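The latency targets in this table refer to the 95th percentile. As a quick illustration of how such a target can be checked against a window of observed latencies, here is a small sketch (nearest-rank percentile; helper names are hypothetical):
def p95(samples: list[float]) -> float:
    # nearest-rank 95th percentile of a non-empty window of latency samples (in ms)
    ordered = sorted(samples)
    idx = max(0, int(round(0.95 * len(ordered))) - 1)
    return ordered[idx]
def violates_latency_sla(samples: list[float], sla_ms: float) -> bool:
    return p95(samples) > sla_ms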
Python SLA Monitoring:
from dataclasses import dataclass
from typing import Dict, List
from datetime import datetime, timedelta
import asyncio
@dataclass
class SLATarget:
service_name: str
    metric_type: str  # latency_p95, throughput, availability, error_rate
target_value: float
measurement_window: timedelta
alert_threshold: float
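# The SLAViolation type returned below is not part of the original listing; a minimal
# sketch with the fields that check_sla_violations populates:
@dataclass
class SLAViolation:
    service: str
    metric: str
    target_value: float
    actual_value: float
    severity: str
    timestamp: datetime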
class SLAMonitor:
def __init__(self):
self.sla_targets = {
'order-service': [
SLATarget('order-service', 'latency_p95', 200.0, timedelta(minutes=5), 250.0),
SLATarget('order-service', 'throughput', 1000.0, timedelta(minutes=1), 800.0),
SLATarget('order-service', 'error_rate', 0.001, timedelta(minutes=5), 0.005),
SLATarget('order-service', 'availability', 0.9995, timedelta(hours=1), 0.999)
],
'payment-service': [
SLATarget('payment-service', 'latency_p95', 500.0, timedelta(minutes=5), 750.0),
SLATarget('payment-service', 'error_rate', 0.0001, timedelta(minutes=5), 0.001)
]
}
async def check_sla_violations(self) -> List[SLAViolation]:
violations = []
for service_name, targets in self.sla_targets.items():
for target in targets:
current_value = await self.get_current_metric_value(
service_name,
target.metric_type,
target.measurement_window
)
if self.is_violation(current_value, target):
violation = SLAViolation(
service=service_name,
metric=target.metric_type,
target_value=target.target_value,
actual_value=current_value,
severity=self.calculate_severity(current_value, target),
timestamp=datetime.now()
)
violations.append(violation)
return violations
def is_violation(self, current_value: float, target: SLATarget) -> bool:
if target.metric_type in ['latency_p95', 'error_rate']:
return current_value > target.alert_threshold
elif target.metric_type in ['throughput', 'availability']:
return current_value < target.alert_threshold
return False
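    # Example: for latency_p95 (target 200 ms, alert threshold 250 ms) a measured 260 ms
    # is a violation; for throughput (alert threshold 800/min) a measured 750 orders/min
    # is a violation, because for throughput and availability lower values are worse.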
    def calculate_severity(self, actual_value: float, target: SLATarget) -> str:
        deviation = abs(actual_value - target.target_value) / target.target_value
        if deviation > 0.5:  # 50% deviation
            return 'critical'
        elif deviation > 0.2:  # 20% deviation
            return 'warning'
        else:
            return 'info'

Consumer lag is a metric specific to event-driven systems:
@Component
public class EventLagSLAMonitor {
private final Map<String, LagSLA> lagSLAs = Map.of(
"payment-service", new LagSLA(Duration.ofSeconds(30), Duration.ofMinutes(2), 1000),
"inventory-service", new LagSLA(Duration.ofSeconds(10), Duration.ofMinutes(1), 500),
"notification-service", new LagSLA(Duration.ofMinutes(5), Duration.ofMinutes(10), 10000)
);
@Scheduled(fixedDelay = 30000) // every 30 seconds
public void checkLagSLAs() {
        kafkaAdminClient.listConsumerGroups().all()
            .toCompletionStage().toCompletableFuture() // KafkaFuture -> CompletableFuture bridge (Kafka 3.0+)
            .thenCompose(groups -> checkGroupsLag(groups))
            .thenAccept(violations -> violations.forEach(this::sendAlert));
}
private CompletableFuture<List<LagViolation>> checkGroupsLag(
Collection<ConsumerGroupListing> groups) {
return groups.stream()
.filter(group -> lagSLAs.containsKey(group.groupId()))
.map(this::checkSingleGroupLag)
.collect(Collectors.toList())
.stream()
.reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(acc, future) -> acc.thenCombine(future, (list1, list2) -> {
list1.addAll(list2);
return list1;
}));
}
private CompletableFuture<List<LagViolation>> checkSingleGroupLag(
ConsumerGroupListing group) {
LagSLA sla = lagSLAs.get(group.groupId());
        return kafkaAdminClient.listConsumerGroupOffsets(group.groupId())
            .partitionsToOffsetAndMetadata()
            .toCompletionStage().toCompletableFuture() // KafkaFuture -> CompletableFuture bridge (Kafka 3.0+)
            .thenCompose(offsets -> calculateLag(offsets))
            .thenApply(totalLag -> {
                List<LagViolation> violations = new ArrayList<>();
                // totalLag is a message count, so it is compared against the count-based
                // threshold; the Duration thresholds would additionally require resolving
                // the timestamp of the oldest unconsumed record, which is omitted here.
                if (totalLag > sla.getMessageCountThreshold()) {
                    violations.add(new LagViolation(
                        group.groupId(),
                        totalLag,
                        Severity.CRITICAL,
                        "Consumer lag exceeds critical threshold"));
                } else if (totalLag > sla.getMessageCountThreshold() / 2) { // warning at half the critical count (assumption)
                    violations.add(new LagViolation(
                        group.groupId(),
                        totalLag,
                        Severity.WARNING,
                        "Consumer lag exceeds warning threshold"));
                }
                return violations;
            });
}
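    // calculateLag is not shown in this listing; per partition, consumer lag is the
    // difference between the partition's log-end offset (e.g. via AdminClient#listOffsets
    // with OffsetSpec.latest()) and the group's committed offset, summed over all
    // partitions assigned to the group.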
}
@Data
@AllArgsConstructor
public class LagSLA {
private Duration warningThreshold;
private Duration criticalThreshold;
private long messageCountThreshold;
}

Alerts in EDA systems should be organized hierarchically so that they reach the right target audiences:
Alert Severity Levels:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
EMERGENCY = "emergency"
class AlertCategory(Enum):
BUSINESS_PROCESS = "business_process"
TECHNICAL_INFRASTRUCTURE = "technical_infrastructure"
DATA_QUALITY = "data_quality"
SECURITY = "security"
@dataclass
class Alert:
id: str
severity: AlertSeverity
category: AlertCategory
title: str
description: str
service: str
metric_name: str
current_value: float
threshold_value: float
timestamp: datetime
tags: Dict[str, str]
runbook_url: Optional[str] = None
escalation_policy: Optional[str] = None
class AlertManager:
def __init__(self):
self.alert_policies = {
AlertSeverity.INFO: {
'notification_channels': ['slack-monitoring'],
'escalation_delay': timedelta(hours=4),
'auto_resolve': True
},
AlertSeverity.WARNING: {
'notification_channels': ['slack-alerts', 'email-dev-team'],
'escalation_delay': timedelta(minutes=30),
'auto_resolve': False
},
AlertSeverity.CRITICAL: {
'notification_channels': ['slack-alerts', 'email-dev-team', 'pagerduty'],
'escalation_delay': timedelta(minutes=15),
'auto_resolve': False
},
AlertSeverity.EMERGENCY: {
'notification_channels': ['pagerduty-high', 'sms-oncall'],
'escalation_delay': timedelta(minutes=5),
'auto_resolve': False
}
}
async def send_alert(self, alert: Alert):
policy = self.alert_policies[alert.severity]
# Immediate Notifications
for channel in policy['notification_channels']:
await self.send_notification(channel, alert)
# Schedule Escalation if needed
if not policy['auto_resolve']:
await self.schedule_escalation(alert, policy['escalation_delay'])
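    # The channel-specific senders used by send_notification below (send_slack_alert,
    # send_email_alert, send_pagerduty_alert) are not part of the original listing.
    # A minimal sketch for Slack, assuming an incoming-webhook URL (hypothetical):
    async def send_slack_alert(self, channel: str, alert: Alert):
        import aiohttp  # assumed HTTP client dependency
        payload = {
            "channel": f"#{channel}",
            "text": (f"[{alert.severity.value.upper()}] {alert.title}\n"
                     f"{alert.description}\n"
                     f"{alert.service} {alert.metric_name}="
                     f"{alert.current_value} (threshold {alert.threshold_value})")
        }
        async with aiohttp.ClientSession() as session:
            # hypothetical webhook URL
            async with session.post("https://hooks.slack.com/services/<webhook-id>", json=payload) as response:
                response.raise_for_status()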
async def send_notification(self, channel: str, alert: Alert):
if channel.startswith('slack-'):
await self.send_slack_alert(channel, alert)
elif channel.startswith('email-'):
await self.send_email_alert(channel, alert)
elif channel.startswith('pagerduty'):
            await self.send_pagerduty_alert(channel, alert)

Alerts should carry enough context to enable fast diagnosis:
@Service
public class ContextualAlertService {
public void sendConsumerLagAlert(String consumerGroup, long lagMessages, Duration lagTime) {
AlertContext context = buildLagAlertContext(consumerGroup, lagMessages, lagTime);
Alert alert = Alert.builder()
.severity(determineSeverity(lagMessages))
.title(String.format("Consumer Lag Alert: %s", consumerGroup))
.description(buildLagDescription(lagMessages, lagTime))
.context(context)
.runbookUrl("https://wiki.company.com/runbooks/consumer-lag")
.tags(Map.of(
"consumer_group", consumerGroup,
"alert_type", "consumer_lag",
"service", extractServiceFromGroup(consumerGroup)
))
.build();
alertManager.send(alert);
}
private AlertContext buildLagAlertContext(String group, long lagMessages, Duration lagTime) {
return AlertContext.builder()
.consumerGroup(group)
.lagMessages(lagMessages)
.lagDuration(lagTime)
.relatedTopics(getTopicsForGroup(group))
.recentErrorRate(getRecentErrorRate(group))
.averageProcessingTime(getAverageProcessingTime(group))
.relatedServices(getDownstreamServices(group))
.suggestedActions(generateSuggestedActions(lagMessages))
.impactedBusinessProcesses(getImpactedProcesses(group))
.build();
}
private List<String> generateSuggestedActions(long lagMessages) {
List<String> actions = new ArrayList<>();
if (lagMessages > 100000) {
actions.add("Consider scaling up consumer instances");
actions.add("Check for downstream service issues");
} else if (lagMessages > 10000) {
actions.add("Review recent deployments");
actions.add("Check consumer application logs");
} else {
actions.add("Monitor for trend continuation");
}
return actions;
}
}

Avoid alert storms through intelligent aggregation:
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
class AlertAggregator:
    def __init__(self):
        self.alert_manager = AlertManager()  # AlertManager from the earlier listing
        self.pending_alerts: Dict[str, List[Alert]] = defaultdict(list)
self.suppression_rules = {
'consumer_lag': {
'window': timedelta(minutes=5),
'max_alerts': 3,
'correlation_keys': ['consumer_group', 'service']
},
'error_rate': {
'window': timedelta(minutes=2),
'max_alerts': 5,
'correlation_keys': ['service', 'error_type']
}
}
async def process_alert(self, alert: Alert) -> bool:
"""Returns True if alert should be sent, False if suppressed"""
alert_type = alert.tags.get('alert_type', 'unknown')
if alert_type not in self.suppression_rules:
return True # Send immediately if no suppression rule
rule = self.suppression_rules[alert_type]
correlation_key = self.build_correlation_key(alert, rule['correlation_keys'])
# Add to pending alerts
self.pending_alerts[correlation_key].append(alert)
# Clean old alerts outside window
cutoff_time = datetime.now() - rule['window']
self.pending_alerts[correlation_key] = [
a for a in self.pending_alerts[correlation_key]
if a.timestamp > cutoff_time
]
# Check if we should send aggregated alert
if len(self.pending_alerts[correlation_key]) >= rule['max_alerts']:
await self.send_aggregated_alert(correlation_key, self.pending_alerts[correlation_key])
self.pending_alerts[correlation_key] = [] # Clear after sending
return False # Don't send individual alert
# Send first alert immediately, suppress subsequent ones
return len(self.pending_alerts[correlation_key]) == 1
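    # Example with the 'consumer_lag' rule (5-minute window, max_alerts=3): the first alert
    # for a correlation key is sent immediately, the second is suppressed, and the third
    # triggers a single aggregated alert and clears the pending list for that key.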
async def send_aggregated_alert(self, correlation_key: str, alerts: List[Alert]):
aggregated = Alert(
id=f"aggregated-{correlation_key}-{datetime.now().isoformat()}",
            # AlertSeverity is not an ordered enum, so rank by declaration order
            severity=max((a.severity for a in alerts), key=lambda s: list(AlertSeverity).index(s)),
category=alerts[0].category,
title=f"Multiple {alerts[0].tags.get('alert_type', 'alerts')} for {correlation_key}",
description=f"Received {len(alerts)} similar alerts in the last few minutes",
service=alerts[0].service,
metric_name="aggregated",
current_value=len(alerts),
threshold_value=1,
timestamp=datetime.now(),
tags={**alerts[0].tags, 'aggregated': 'true', 'alert_count': str(len(alerts))}
)
await self.alert_manager.send_alert(aggregated)
def build_correlation_key(self, alert: Alert, correlation_keys: List[str]) -> str:
key_parts = [alert.tags.get(key, 'unknown') for key in correlation_keys]
        return '-'.join(key_parts)

Escalation should follow predefined multi-level policies:

@Component
public class EscalationManager {
private final Map<String, EscalationPolicy> policies = Map.of(
"payment-critical", EscalationPolicy.builder()
.level1(Duration.ZERO, List.of("slack-payments", "pagerduty-payments"))
.level2(Duration.ofMinutes(5), List.of("phone-oncall-lead"))
.level3(Duration.ofMinutes(15), List.of("escalation-manager"))
.level4(Duration.ofMinutes(30), List.of("director-engineering"))
.build(),
"order-processing", EscalationPolicy.builder()
.level1(Duration.ZERO, List.of("slack-orders"))
.level2(Duration.ofMinutes(15), List.of("email-team-lead"))
.level3(Duration.ofMinutes(45), List.of("pagerduty-oncall"))
.build()
);
@Async
public void startEscalation(Alert alert) {
String policyName = determineEscalationPolicy(alert);
EscalationPolicy policy = policies.get(policyName);
if (policy == null) {
log.warn("No escalation policy found for alert: {}", alert.getId());
return;
}
scheduleEscalationLevels(alert, policy);
}
private void scheduleEscalationLevels(Alert alert, EscalationPolicy policy) {
// Level 1 - Immediate
sendNotifications(alert, policy.getLevel1Channels());
// Schedule subsequent levels
taskScheduler.schedule(
() -> {
if (!isAlertResolved(alert.getId())) {
sendNotifications(alert, policy.getLevel2Channels());
}
},
Instant.now().plus(policy.getLevel2Delay())
);
        taskScheduler.schedule(
            () -> {
                if (!isAlertResolved(alert.getId())) {
                    // Escalate alert severity for Level 3 and notify the Level-3 channels once
                    Alert escalatedAlert = alert.toBuilder()
                        .severity(AlertSeverity.EMERGENCY)
                        .title("[ESCALATED] " + alert.getTitle())
                        .build();
                    sendNotifications(escalatedAlert, policy.getLevel3Channels());
                }
            },
            Instant.now().plus(policy.getLevel3Delay())
        );
        // A Level 4, where defined by the policy, would be scheduled the same way
}
}

Escalation urgency can additionally be weighted by business impact:

class BusinessImpactEscalator:
def __init__(self):
self.business_process_priorities = {
'payment-processing': {
'revenue_impact_per_minute': 10000, # $10k/min
'customer_impact_threshold': 100, # customers affected
'escalation_multiplier': 3.0
},
'order-fulfillment': {
'revenue_impact_per_minute': 5000, # $5k/min
'customer_impact_threshold': 50,
'escalation_multiplier': 2.0
},
'inventory-management': {
'revenue_impact_per_minute': 2000, # $2k/min
'customer_impact_threshold': 200,
'escalation_multiplier': 1.5
}
}
def calculate_escalation_urgency(self, alert: Alert) -> float:
"""Returns escalation urgency score (0-10)"""
process = alert.tags.get('business_process')
if process not in self.business_process_priorities:
return 5.0 # Default urgency
priority = self.business_process_priorities[process]
# Base urgency from alert severity
base_urgency = {
AlertSeverity.INFO: 1.0,
AlertSeverity.WARNING: 3.0,
AlertSeverity.CRITICAL: 7.0,
AlertSeverity.EMERGENCY: 9.0
}.get(alert.severity, 5.0)
# Business impact multipliers
impact_multiplier = 1.0
# Revenue impact
if hasattr(alert, 'estimated_revenue_impact'):
if alert.estimated_revenue_impact > priority['revenue_impact_per_minute']:
impact_multiplier *= priority['escalation_multiplier']
# Customer impact
if hasattr(alert, 'affected_customers'):
if alert.affected_customers > priority['customer_impact_threshold']:
impact_multiplier *= 1.5
# Time factor - urgency increases over time
alert_age_minutes = (datetime.now() - alert.timestamp).total_seconds() / 60
time_multiplier = 1.0 + (alert_age_minutes * 0.1) # 10% increase per minute
urgency_score = min(10.0, base_urgency * impact_multiplier * time_multiplier)
return urgency_score
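    # Worked example: a CRITICAL 'payment-processing' alert whose estimated revenue impact
    # exceeds $10k/min and that has been open for 10 minutes scores
    # 7.0 (base) * 3.0 (impact multiplier) * 2.0 (time multiplier) = 42.0, capped at 10.0.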
async def escalate_based_on_impact(self, alert: Alert):
urgency = self.calculate_escalation_urgency(alert)
if urgency >= 8.0:
# Emergency escalation
await self.send_emergency_escalation(alert)
elif urgency >= 6.0:
# Fast escalation
await self.schedule_escalation(alert, delay=timedelta(minutes=5))
elif urgency >= 4.0:
# Standard escalation
await self.schedule_escalation(alert, delay=timedelta(minutes=15))
else:
# Slow escalation
            await self.schedule_escalation(alert, delay=timedelta(hours=1))

Alerts whose underlying metrics have recovered should be resolved automatically:

@Component
public class AutoResolutionService {
@EventListener
public void onMetricImprovement(MetricImprovementEvent event) {
List<Alert> relatedAlerts = alertRepository.findActiveAlertsByMetric(
event.getServiceName(),
event.getMetricName());
for (Alert alert : relatedAlerts) {
if (shouldAutoResolve(alert, event)) {
resolveAlert(alert, "Auto-resolved: metric returned to normal");
}
}
}
private boolean shouldAutoResolve(Alert alert, MetricImprovementEvent event) {
// Only auto-resolve if metric has been stable for sufficient time
Duration stableFor = Duration.between(event.getFirstImprovementTime(), Instant.now());
Duration requiredStability = getRequiredStabilityPeriod(alert.getSeverity());
return stableFor.compareTo(requiredStability) >= 0 &&
event.getCurrentValue() <= alert.getThresholdValue() * 0.8; // 20% buffer
}
private Duration getRequiredStabilityPeriod(AlertSeverity severity) {
return switch (severity) {
case INFO -> Duration.ofMinutes(2);
case WARNING -> Duration.ofMinutes(5);
case CRITICAL -> Duration.ofMinutes(10);
case EMERGENCY -> Duration.ofMinutes(15);
};
}
private void resolveAlert(Alert alert, String resolution) {
alert.setStatus(AlertStatus.RESOLVED);
alert.setResolutionTime(Instant.now());
alert.setResolutionNote(resolution);
alertRepository.save(alert);
// Notify stakeholders of resolution
notificationService.sendResolutionNotification(alert);
// Cancel any pending escalations
escalationManager.cancelEscalation(alert.getId());
}
}

These alerting strategies help you detect both technical and business problems early in event-driven architectures and respond to them appropriately. The key lies in balancing alert sensitivity against specificity and in accounting for the particular characteristics of asynchronous systems.