Scenario: your test fails, but you don't know why. An event has vanished, a consumer doesn't react, a flow breaks off midway. Without the right observability, debugging is like poking around in the fog.
Testable event-driven architecture means more than tests that merely pass - it means systems that are traceable, debuggable, and monitorable. How can we design our services so that tests not only run green but also deliver meaningful information when something goes wrong?
Observability in event-driven architecture rests on three fundamental pillars: logs, metrics, and traces. But how does observability in tests differ from production monitoring?
Test-specific observability requirements:

| Aspect | Production | Tests | Why the difference? |
|---|---|---|---|
| Granularity | Aggregated | Detailed | Tests need exact event tracking |
| Retention | Long-term | Session-based | Tests are short-lived and isolated |
| Performance | Optimized for speed | Exhaustive | Tests prioritize traceability over speed |
| Confidentiality | Anonymized | Unrestricted | Test data is usually synthetic |
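The "session-based" retention from the table can be implemented directly in the test harness. The following is a minimal sketch assuming Logback and JUnit 5; the extension name and the dump-on-failure policy are illustrative choices, not an established API:

```java
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.LoggerFactory;

import java.util.List;

// Hypothetical JUnit 5 extension: captures logs per test (session-based
// retention) and keeps them only as long as the test runs.
public class PerTestLogCaptureExtension implements BeforeEachCallback, AfterEachCallback {

    private final ListAppender<ILoggingEvent> appender = new ListAppender<>();

    @Override
    public void beforeEach(ExtensionContext context) {
        appender.list.clear();
        appender.start();
        // Attach to the root logger; a narrower logger name would reduce noise.
        ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).addAppender(appender);
    }

    @Override
    public void afterEach(ExtensionContext context) {
        ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).detachAppender(appender);
        appender.stop();
        // Dump captured logs only when the test failed: full detail on demand.
        if (context.getExecutionException().isPresent()) {
            appender.list.forEach(event -> System.err.println(event.getFormattedMessage()));
        }
    }

    public List<ILoggingEvent> capturedEvents() {
        return appender.list;
    }
}
```

Registered via @ExtendWith, this gives each test an isolated log session: full detail while the test runs, a dump only on failure, nothing retained afterwards.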
Every event passes through a lifecycle. How can we trace each of its steps?
Comprehensive Event Logging:

```java
@Component
public class EventLifecycleLogger {

    private static final Logger log = LoggerFactory.getLogger(EventLifecycleLogger.class);

    public void logEventProduction(String topic, Object event) {
        MDC.put("phase", "PRODUCTION");
        MDC.put("topic", topic);
        MDC.put("eventType", event.getClass().getSimpleName());
        if (event instanceof IdentifiableEvent) {
            MDC.put("eventId", ((IdentifiableEvent) event).getEventId());
            MDC.put("correlationId", ((IdentifiableEvent) event).getCorrelationId());
        }
        log.info("Event produced: {}", event);
        MDC.clear();
    }

    public void logEventConsumption(String topic, Object event, String consumerGroup) {
        MDC.put("phase", "CONSUMPTION");
        MDC.put("topic", topic);
        MDC.put("consumerGroup", consumerGroup);
        MDC.put("eventType", event.getClass().getSimpleName());
        log.info("Event consumed: {}", event);
        MDC.clear();
    }

    public void logEventProcessingStart(Object event) {
        // Note: no MDC.clear() here - the phase stays in the MDC until
        // logEventProcessingComplete() clears it, so logs emitted during
        // processing inherit this context.
        MDC.put("phase", "PROCESSING_START");
        log.info("Processing started for event: {}", event);
    }

    public void logEventProcessingComplete(Object event, Duration processingTime) {
        MDC.put("phase", "PROCESSING_COMPLETE");
        MDC.put("processingTimeMs", String.valueOf(processingTime.toMillis()));
        log.info("Processing completed for event: {}", event);
        MDC.clear();
    }
}
```

Consider: which information is most important for your debugging? Which logs would help you most in a failed test?
Python Event Lifecycle Logging:

```python
import contextvars
import logging
from typing import Any

# Context variables for trace information
correlation_id_var = contextvars.ContextVar('correlation_id')
event_phase_var = contextvars.ContextVar('event_phase')

class EventLifecycleLogger:
    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def log_event_production(self, topic: str, event: Any):
        context = self._set_context("PRODUCTION", topic=topic, event_type=type(event).__name__)
        if hasattr(event, 'correlation_id'):
            correlation_id_var.set(event.correlation_id)
        self.logger.info(f"Event produced: {event}", extra=context)

    def log_event_consumption(self, topic: str, event: Any, consumer_group: str):
        context = self._set_context("CONSUMPTION", topic=topic, consumer_group=consumer_group)
        self.logger.info(f"Event consumed: {event}", extra=context)

    def _set_context(self, phase: str, **kwargs):
        event_phase_var.set(phase)
        # Additional context for structured logging, passed to the logger via `extra`
        return {"phase": phase, **kwargs}
```

How can we verify in tests that the right log entries are actually created?
Log Assertion Framework:

```java
@TestConfiguration
public class TestLoggingConfiguration {

    @Bean
    @Primary
    public TestLogAppender testLogAppender() {
        TestLogAppender appender = new TestLogAppender();
        appender.start();
        // Creating the bean alone is not enough: the appender must be
        // attached to a logger before Logback routes events to it.
        ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME))
            .addAppender(appender);
        return appender;
    }
}

public class TestLogAppender extends AppenderBase<ILoggingEvent> {

    private final List<ILoggingEvent> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void append(ILoggingEvent event) {
        events.add(event);
    }

    public LogAssertions assertThat() {
        // Snapshot to avoid concurrent modification while asserting
        return new LogAssertions(new ArrayList<>(events));
    }

    public void clear() {
        events.clear();
    }
}

public class LogAssertions {

    private final List<ILoggingEvent> logEvents;

    public LogAssertions(List<ILoggingEvent> logEvents) {
        this.logEvents = logEvents;
    }

    public LogAssertions hasEventProduction(String eventType, String topic) {
        boolean found = logEvents.stream()
            .anyMatch(event ->
                event.getFormattedMessage().contains("Event produced") &&
                // Constant-first equals avoids NPEs when an MDC key is missing
                eventType.equals(event.getMDCPropertyMap().get("eventType")) &&
                topic.equals(event.getMDCPropertyMap().get("topic"))
            );
        assertThat(found).as("Expected event production log for %s on topic %s", eventType, topic).isTrue();
        return this;
    }

    public LogAssertions hasNoErrors() {
        List<ILoggingEvent> errors = logEvents.stream()
            .filter(event -> event.getLevel().equals(Level.ERROR))
            .collect(Collectors.toList());
        assertThat(errors).as("Expected no error logs but found: %s", errors).isEmpty();
        return this;
    }
}
```

How do we structure logs so that they are machine-readable and usable in tests?
Structured Event Logging:

```java
@Component
public class StructuredEventLogger {

    private static final Logger log = LoggerFactory.getLogger(StructuredEventLogger.class);

    private final ObjectMapper objectMapper;

    public StructuredEventLogger(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public void logEventFlow(EventFlowStep step) {
        Map<String, Object> logEntry = Map.of(
            "@timestamp", Instant.now().toString(), // ISO-8601, needs no extra Jackson module
            "flow_id", step.getFlowId(),
            "step_name", step.getStepName(),
            "event_type", step.getEventType(),
            "processing_time_ms", step.getProcessingTime().toMillis(),
            "success", step.isSuccess(),
            "metadata", step.getMetadata()
        );
        try {
            String jsonLog = objectMapper.writeValueAsString(logEntry);
            log.info(jsonLog);
        } catch (Exception e) {
            log.warn("Failed to serialize log entry", e);
        }
    }
}
```

Ask yourself: how would you evaluate these logs in a failed test? Which tools would help you do it?
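One low-tech but effective answer: treat the output as JSON lines and query it with Jackson. The following sketch is an assumption about tooling, not part of the original design - FlowLogReader, the file-based layout, and the field names simply mirror the logEntry map above:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Reads a JSON-lines log file and extracts the steps of a single flow,
// so a failing test can print exactly where the flow broke.
public class FlowLogReader {

    private final ObjectMapper mapper = new ObjectMapper();

    public List<JsonNode> stepsForFlow(Path logFile, String flowId) throws IOException {
        return Files.readAllLines(logFile).stream()
            .map(this::parseOrNull)
            .filter(node -> node != null && flowId.equals(node.path("flow_id").asText()))
            .toList();
    }

    public JsonNode firstFailedStep(Path logFile, String flowId) throws IOException {
        return stepsForFlow(logFile, flowId).stream()
            .filter(step -> !step.path("success").asBoolean(true))
            .findFirst()
            .orElse(null);
    }

    private JsonNode parseOrNull(String line) {
        try {
            return mapper.readTree(line);
        } catch (IOException e) {
            return null; // non-JSON lines (e.g., plain-text logs) are skipped
        }
    }
}
```

In a failing test, firstFailedStep(logFile, flowId) pinpoints the step where the flow broke; heavier tooling such as Elasticsearch/Kibana pays off once many services are involved.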
In event-driven architecture, events can flow through several services. How do we follow a single business process through the entire system? Correlation IDs solve this problem.
Event with Correlation Context:

```java
public class OrderPlacedEvent implements CorrelatedEvent {

    private String eventId;
    private String correlationId;
    private String causationId; // ID of the event that triggered this event
    private Instant timestamp;
    private OrderData data;

    // Getters, setters...
}

public interface CorrelatedEvent {

    // Note: do not generate the event ID in a default method - a fresh UUID
    // per call would change the ID on every read. Implementations store it.
    String getEventId();
    String getCorrelationId();
    String getCausationId();

    // Setters are needed so publishers can stamp the context onto the event
    // (see CorrelatingEventPublisher below).
    void setCorrelationId(String correlationId);
    void setCausationId(String causationId);
}
```

How do we propagate correlation IDs through our services?
Spring Boot Correlation Context:

```java
@Component
public class CorrelationContextManager {

    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";

    private static final ThreadLocal<String> correlationContext = new ThreadLocal<>();
    private static final ThreadLocal<String> currentEventId = new ThreadLocal<>();

    public void setCorrelationId(String correlationId) {
        correlationContext.set(correlationId);
        MDC.put("correlationId", correlationId);
    }

    public String getCorrelationId() {
        return correlationContext.get();
    }

    public String getCurrentEventId() {
        return currentEventId.get();
    }

    public void clear() {
        correlationContext.remove();
        currentEventId.remove();
        MDC.remove("correlationId");
    }

    @EventListener
    public void handleEvent(Object event) {
        if (event instanceof CorrelatedEvent) {
            CorrelatedEvent correlated = (CorrelatedEvent) event;
            setCorrelationId(correlated.getCorrelationId());
            // Remember the triggering event's ID as causation for follow-up events
            currentEventId.set(correlated.getEventId());
        }
    }
}

@Component
public class CorrelatingEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final CorrelationContextManager contextManager;

    public CorrelatingEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
                                     CorrelationContextManager contextManager) {
        this.kafkaTemplate = kafkaTemplate;
        this.contextManager = contextManager;
    }

    public void publish(String topic, CorrelatedEvent event) {
        // Take the correlation ID from the current context or create a new one
        String correlationId = Optional.ofNullable(contextManager.getCorrelationId())
            .orElse(UUID.randomUUID().toString());
        event.setCorrelationId(correlationId);
        // The causation ID is the ID of the triggering event
        String causationId = contextManager.getCurrentEventId();
        event.setCausationId(causationId);
        kafkaTemplate.send(topic, event);
    }
}
```

Python Correlation Implementation:
```python
import contextvars
import uuid
from typing import Optional

correlation_id_context = contextvars.ContextVar('correlation_id')
causation_id_context = contextvars.ContextVar('causation_id')

class CorrelationContextManager:
    @staticmethod
    def set_correlation_id(correlation_id: str):
        correlation_id_context.set(correlation_id)

    @staticmethod
    def get_correlation_id() -> Optional[str]:
        return correlation_id_context.get(None)

    @staticmethod
    def generate_correlation_id() -> str:
        correlation_id = str(uuid.uuid4())
        correlation_id_context.set(correlation_id)
        return correlation_id

class CorrelatedEvent:
    def __init__(self, **kwargs):
        self.event_id = str(uuid.uuid4())
        self.correlation_id = kwargs.get('correlation_id') or CorrelationContextManager.get_correlation_id()
        self.causation_id = kwargs.get('causation_id') or causation_id_context.get(None)
        # Fallback: no inherited context, so start a new correlation.
        # (__post_init__ only runs for dataclasses; in a plain class this
        # belongs in __init__ itself.)
        if not self.correlation_id:
            self.correlation_id = CorrelationContextManager.generate_correlation_id()
```

How can we use correlation IDs for better test diagnostics?
Correlation-based Test Assertions:

```java
@Test
void shouldMaintainCorrelationThroughoutOrderFlow() throws Exception {
    // Given
    String testCorrelationId = "test-correlation-" + UUID.randomUUID();
    correlationContextManager.setCorrelationId(testCorrelationId); // injected bean, not a static call
    TestLogAppender logAppender = applicationContext.getBean(TestLogAppender.class);
    logAppender.clear();

    // When
    CreateOrderRequest request = aValidCreateOrderRequest().build();
    orderService.placeOrder(request);

    // Wait for asynchronous event processing
    // (a fixed sleep keeps the example short; polling with a timeout is more robust)
    Thread.sleep(5000);

    // Then - all events should carry the same correlation ID
    logAppender.assertThat()
        .hasEventWithCorrelationId(testCorrelationId, "OrderPlaced")
        .hasEventWithCorrelationId(testCorrelationId, "PaymentProcessed")
        .hasEventWithCorrelationId(testCorrelationId, "InventoryReserved")
        .hasEventWithCorrelationId(testCorrelationId, "OrderShipped");
}
```

Consider: how would you debug a test in which an event carries the wrong correlation ID? Which information would you need?
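The assertion hasEventWithCorrelationId is used above but was not part of the LogAssertions class shown earlier. A minimal sketch of that missing method, assuming the correlation ID lands in the MDC under "correlationId" as in CorrelationContextManager:

```java
// Hypothetical addition to LogAssertions: matches captured log events
// by their MDC correlation ID and event type.
public LogAssertions hasEventWithCorrelationId(String correlationId, String eventType) {
    boolean found = logEvents.stream()
        .anyMatch(event ->
            correlationId.equals(event.getMDCPropertyMap().get("correlationId")) &&
            // startsWith tolerates the class-name suffix:
            // "OrderPlaced" also matches eventType "OrderPlacedEvent"
            event.getMDCPropertyMap().getOrDefault("eventType", "").startsWith(eventType));
    assertThat(found)
        .as("Expected event %s with correlation ID %s", eventType, correlationId)
        .isTrue();
    return this;
}
```

When this assertion fails, dumping all captured events together with their correlationId MDC entries usually shows immediately where the context was lost - typically at a thread boundary.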
Flow-specific Correlation Queries:

```java
@Component
public class CorrelationTestHelper {

    // Hypothetical store for captured log entries (e.g., backed by a
    // database or the in-memory TestLogAppender)
    private final LogRepository logRepository;

    public CorrelationTestHelper(LogRepository logRepository) {
        this.logRepository = logRepository;
    }

    public List<LogEntry> getFlowEvents(String correlationId) {
        return logRepository.findByCorrelationId(correlationId)
            .stream()
            .sorted(Comparator.comparing(LogEntry::getTimestamp))
            .collect(Collectors.toList());
    }

    public FlowVisualization visualizeFlow(String correlationId) {
        List<LogEntry> events = getFlowEvents(correlationId);
        return FlowVisualization.builder()
            .correlationId(correlationId)
            .steps(events.stream()
                .map(this::mapToFlowStep)
                .collect(Collectors.toList()))
            .duration(calculateFlowDuration(events))
            .success(isFlowSuccessful(events))
            .build();
    }
}
```

How can we monitor test execution itself? Which metrics help us improve test performance and reliability?
Test-specific Metrics:

```java
@Component
public class TestMetricsCollector {

    private final MeterRegistry meterRegistry;

    public TestMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void recordTestExecution(String testName, Duration duration, boolean success) {
        // Record the externally measured duration directly; a Timer.Sample
        // started here would only time this method, not the test itself.
        Timer.builder("test.execution.time")
            .tag("test.name", testName)
            .tag("test.result", success ? "success" : "failure")
            .register(meterRegistry)
            .record(duration);
        meterRegistry.counter("test.executions.total",
                "test.name", testName,
                "result", success ? "success" : "failure")
            .increment();
    }

    public void recordEventFlowMetrics(String flowName, int eventsProcessed, Duration totalTime) {
        meterRegistry.counter("test.events.processed", "flow", flowName)
            .increment(eventsProcessed);
        meterRegistry.timer("test.flow.duration", "flow", flowName)
            .record(totalTime);
    }
}
```

How can we follow test execution in real time?
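Nothing above actually calls recordTestExecution. One way to wire it up - and to feed the dashboard below in real time - is a JUnit 5 extension that times each test and reports the result. The extension name and the static TestMetricsHolder lookup are assumptions for this sketch; in a Spring test the collector could instead come from the application context:

```java
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ExtensionContext.Namespace;

import java.time.Duration;
import java.time.Instant;

// JUnit 5 extension that times every test and reports it to TestMetricsCollector.
public class MetricsReportingExtension
        implements BeforeTestExecutionCallback, AfterTestExecutionCallback {

    private static final Namespace NAMESPACE = Namespace.create(MetricsReportingExtension.class);

    // Hypothetical holder, populated once at suite startup
    public static class TestMetricsHolder {
        static TestMetricsCollector collector;
    }

    @Override
    public void beforeTestExecution(ExtensionContext context) {
        context.getStore(NAMESPACE).put("start", Instant.now());
    }

    @Override
    public void afterTestExecution(ExtensionContext context) {
        Instant start = context.getStore(NAMESPACE).get("start", Instant.class);
        Duration duration = Duration.between(start, Instant.now());
        // The execution exception is available in afterTestExecution
        boolean success = context.getExecutionException().isEmpty();
        TestMetricsHolder.collector.recordTestExecution(
                context.getDisplayName(), duration, success);
    }
}
```

Registered via @ExtendWith(MetricsReportingExtension.class), every test then feeds the monitoring endpoint shown next without per-test boilerplate.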
Test Dashboard Integration:

```java
@RestController
@RequestMapping("/test-monitoring")
public class TestMonitoringController {

    private final TestMetricsCollector metricsCollector;
    private final TestLogAppender logAppender;
    private final FlowTracker flowTracker; // hypothetical component tracking in-flight flows

    public TestMonitoringController(TestMetricsCollector metricsCollector,
                                    TestLogAppender logAppender,
                                    FlowTracker flowTracker) {
        this.metricsCollector = metricsCollector;
        this.logAppender = logAppender;
        this.flowTracker = flowTracker;
    }

    @GetMapping("/metrics")
    public TestMetrics getCurrentMetrics() {
        // getRunningTestCount() etc. are assumed helpers backed by the MeterRegistry
        return TestMetrics.builder()
            .runningTests(getRunningTestCount())
            .completedTests(getCompletedTestCount())
            .failureRate(getFailureRate())
            .averageExecutionTime(getAverageExecutionTime())
            .build();
    }

    @GetMapping("/logs/{correlationId}")
    public List<LogEntry> getCorrelatedLogs(@PathVariable String correlationId) {
        return logAppender.getLogsByCorrelationId(correlationId);
    }

    @GetMapping("/flows/active")
    public List<ActiveFlow> getActiveFlows() {
        return flowTracker.getActiveFlows();
    }
}
```

Ask yourself: which information would you want to see in a test dashboard? How would you tell whether your tests run reliably?
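Whatever you include, the endpoint needs a response type. TestMetrics is not defined in the text; the builder() call above suggests a Lombok-style DTO, so here is one plausible shape under that assumption:

```java
import java.time.Duration;

import lombok.Builder;
import lombok.Value;

// Hypothetical response DTO for /test-monitoring/metrics; the Lombok
// @Value/@Builder pair matches the TestMetrics.builder() call above.
@Value
@Builder
public class TestMetrics {
    int runningTests;
    int completedTests;
    double failureRate;            // fraction of failed tests, 0.0 to 1.0
    Duration averageExecutionTime; // expose as ISO-8601 or as millis, as preferred
}
```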
How can we automatically detect when tests become unstable?
Test Health Metrics:

```java
@Component
public class TestHealthMonitor {

    private final MeterRegistry meterRegistry;
    private final AlertingService alertingService;

    public TestHealthMonitor(MeterRegistry meterRegistry, AlertingService alertingService) {
        this.meterRegistry = meterRegistry;
        this.alertingService = alertingService;
    }

    @EventListener
    public void monitorTestResult(TestExecutionResult result) {
        updateTestHealthMetrics(result);
        checkForTestDegradation(result);
    }

    private void updateTestHealthMetrics(TestExecutionResult result) {
        String testName = result.getTestName();
        // Flakiness detection
        double successRate = calculateRecentSuccessRate(testName);
        meterRegistry.gauge("test.success.rate", Tags.of("test", testName), successRate);
        // Performance degradation
        Duration avgExecutionTime = calculateAverageExecutionTime(testName);
        meterRegistry.gauge("test.avg.execution.time", Tags.of("test", testName),
                avgExecutionTime.toMillis());
    }

    private void checkForTestDegradation(TestExecutionResult result) {
        String testName = result.getTestName();
        // Alert on frequent failures
        double recentFailureRate = calculateRecentFailureRate(testName, Duration.ofHours(24));
        if (recentFailureRate > 0.3) { // 30% failure rate
            alertingService.alert(
                AlertLevel.WARNING,
                String.format("Test %s has high failure rate: %.1f%%", testName, recentFailureRate * 100)
            );
        }
        // Alert on performance degradation
        Duration currentExecutionTime = result.getExecutionTime();
        Duration historicalAverage = getHistoricalAverageExecutionTime(testName);
        if (currentExecutionTime.toMillis() > historicalAverage.toMillis() * 2) {
            alertingService.alert(
                AlertLevel.WARNING,
                String.format("Test %s execution time degraded: %dms vs %dms average",
                        testName, currentExecutionTime.toMillis(), historicalAverage.toMillis())
            );
        }
    }
}
```

How do we ensure that our test infrastructure itself stays healthy?
Infrastructure Health Monitoring:

```java
@Component
public class TestInfrastructureHealthCheck {

    private static final Logger log = LoggerFactory.getLogger(TestInfrastructureHealthCheck.class);

    @Autowired
    private KafkaContainer kafkaContainer;

    @Autowired
    private MeterRegistry meterRegistry;

    @Scheduled(fixedRate = 30000) // every 30 seconds
    public void checkInfrastructureHealth() {
        // Kafka container health
        if (!kafkaContainer.isRunning()) {
            log.error("Kafka container is not running - tests may fail");
            meterRegistry.counter("test.infrastructure.failures", "component", "kafka").increment();
        }
        // Memory usage
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long maxMemory = runtime.maxMemory();
        double memoryUsage = (double) usedMemory / maxMemory;
        meterRegistry.gauge("test.infrastructure.memory.usage", memoryUsage);
        if (memoryUsage > 0.9) {
            log.warn("High memory usage detected: {}%", memoryUsage * 100);
        }
        // Container resource usage
        checkContainerResources();
    }

    private void checkContainerResources() {
        // Docker resource monitoring:
        // CPU, memory, and disk usage of the Testcontainers instances
    }
}
```
Remember: a well-observable system is a testable system. Observability should not be bolted on afterwards; it should be designed in from the start. Correlation IDs, structured logs, and comprehensive metrics make the difference between frustrating debugging and efficient problem-solving.
Which observability patterns would deliver the greatest benefit in your specific use case? Where would you start if you had to make an existing system more testable?