61 Design für Testbarkeit: Logging, Tracing, Idempotenzprüfungen

Szenario: Ihr Test schlägt fehl, aber Sie wissen nicht warum. Ein Event ist verschwunden, ein Consumer reagiert nicht, ein Flow bricht mittendrin ab. Ohne die richtige Observability gleicht das Debugging einem Stochern im Nebel.

Testbare Event-Driven Architecture bedeutet mehr als nur funktionierende Tests - es bedeutet nachvollziehbare, debugbare und monitorierbare Systeme. Wie können wir unsere Services so gestalten, dass Tests nicht nur erfolgreich laufen, sondern auch aussagekräftige Informationen liefern, wenn etwas schiefgeht?

61.1 Observability in Tests

61.1.1 Was macht ein System observierbar?

Observability in Event-Driven Architecture erfordert drei fundamentale Säulen: Logs, Metriken und Traces. Aber wie unterscheidet sich Observability in Tests von Production-Monitoring?

Test-spezifische Observability-Anforderungen:

Aspekt Production Tests Warum der Unterschied?
Granularität Aggregiert Detailliert Tests brauchen exakte Event-Verfolgung
Retention Langfristig Session-basiert Tests sind kurzlebig und isoliert
Performance Optimiert Vollständig Tests priorisieren Nachvollziehbarkeit
Vertraulichkeit Anonymisiert Vollständig Test-Daten sind meist synthetisch

61.1.2 Event-Lifecycle-Logging

Jedes Event durchläuft einen Lifecycle. Wie können wir jeden Schritt nachvollziehen?

Comprehensive Event Logging:

@Component
public class EventLifecycleLogger {

    private static final Logger log = LoggerFactory.getLogger(EventLifecycleLogger.class);

    public void logEventProduction(String topic, Object event) {
        MDC.put("phase", "PRODUCTION");
        MDC.put("topic", topic);
        MDC.put("eventType", event.getClass().getSimpleName());

        if (event instanceof IdentifiableEvent) {
            MDC.put("eventId", ((IdentifiableEvent) event).getEventId());
            MDC.put("correlationId", ((IdentifiableEvent) event).getCorrelationId());
        }

        log.info("Event produced: {}", event);
        MDC.clear();
    }

    public void logEventConsumption(String topic, Object event, String consumerGroup) {
        MDC.put("phase", "CONSUMPTION");
        MDC.put("topic", topic);
        MDC.put("consumerGroup", consumerGroup);
        MDC.put("eventType", event.getClass().getSimpleName());

        log.info("Event consumed: {}", event);
        MDC.clear();
    }

    public void logEventProcessingStart(Object event) {
        MDC.put("phase", "PROCESSING_START");
        log.info("Processing started for event: {}", event);
    }

    public void logEventProcessingComplete(Object event, Duration processingTime) {
        MDC.put("phase", "PROCESSING_COMPLETE");
        MDC.put("processingTimeMs", String.valueOf(processingTime.toMillis()));
        log.info("Processing completed for event: {}", event);
        MDC.clear();
    }
}

Überlegen Sie: Welche Informationen sind für Ihr Debugging am wichtigsten? Welche Logs würden Ihnen bei einem fehlgeschlagenen Test am meisten helfen?

Python Event Lifecycle Logging:

import logging
import contextvars
from datetime import datetime
from typing import Any, Dict

# Context variables für Trace-Informationen
correlation_id_var = contextvars.ContextVar('correlation_id')
event_phase_var = contextvars.ContextVar('event_phase')

class EventLifecycleLogger:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        
    def log_event_production(self, topic: str, event: Any):
        self._set_context("PRODUCTION", topic=topic, event_type=type(event).__name__)
        
        if hasattr(event, 'event_id'):
            correlation_id_var.set(event.correlation_id)
        
        self.logger.info(f"Event produced: {event}")
        
    def log_event_consumption(self, topic: str, event: Any, consumer_group: str):
        self._set_context("CONSUMPTION", topic=topic, consumer_group=consumer_group)
        self.logger.info(f"Event consumed: {event}")
        
    def _set_context(self, phase: str, **kwargs):
        event_phase_var.set(phase)
        # Zusätzliche Context-Informationen für structured logging
        extra_context = {"phase": phase, **kwargs}
        return extra_context

61.1.3 Test-Assertions für Logs

Wie können wir in Tests verifizieren, dass die richtigen Log-Einträge erstellt werden?

Log Assertion Framework:

@TestConfiguration
public class TestLoggingConfiguration {
    
    @Bean
    @Primary
    public TestLogAppender testLogAppender() {
        return new TestLogAppender();
    }
}

public class TestLogAppender extends AppenderBase<ILoggingEvent> {
    
    private final List<ILoggingEvent> events = Collections.synchronizedList(new ArrayList<>());
    
    @Override
    protected void append(ILoggingEvent event) {
        events.add(event);
    }
    
    public LogAssertions assertThat() {
        return new LogAssertions(new ArrayList<>(events));
    }
    
    public void clear() {
        events.clear();
    }
}

public class LogAssertions {
    private final List<ILoggingEvent> logEvents;
    
    public LogAssertions(List<ILoggingEvent> logEvents) {
        this.logEvents = logEvents;
    }
    
    public LogAssertions hasEventProduction(String eventType, String topic) {
        boolean found = logEvents.stream()
            .anyMatch(event -> 
                event.getMessage().contains("Event produced") &&
                event.getMDCPropertyMap().get("eventType").equals(eventType) &&
                event.getMDCPropertyMap().get("topic").equals(topic)
            );
        
        assertThat(found).as("Expected event production log for %s on topic %s", eventType, topic).isTrue();
        return this;
    }
    
    public LogAssertions hasNoErrors() {
        List<ILoggingEvent> errors = logEvents.stream()
            .filter(event -> event.getLevel().equals(Level.ERROR))
            .collect(Collectors.toList());
            
        assertThat(errors).as("Expected no error logs but found: %s", errors).isEmpty();
        return this;
    }
}

61.1.4 Structured Logging für Event-Flows

Wie strukturieren wir Logs so, dass sie maschinenlesbar und für Tests verwertbar sind?

Structured Event Logging:

@Component
public class StructuredEventLogger {
    
    private final ObjectMapper objectMapper;
    
    public void logEventFlow(EventFlowStep step) {
        Map<String, Object> logEntry = Map.of(
            "@timestamp", Instant.now(),
            "flow_id", step.getFlowId(),
            "step_name", step.getStepName(),
            "event_type", step.getEventType(),
            "processing_time_ms", step.getProcessingTime().toMillis(),
            "success", step.isSuccess(),
            "metadata", step.getMetadata()
        );
        
        try {
            String jsonLog = objectMapper.writeValueAsString(logEntry);
            log.info(jsonLog);
        } catch (Exception e) {
            log.warn("Failed to serialize log entry", e);
        }
    }
}

Fragen Sie sich: Wie würden Sie diese Logs in einem fehlgeschlagenen Test auswerten? Welche Tools würden Ihnen dabei helfen?

61.2 Correlation IDs

61.2.1 Warum brauchen wir Correlation IDs?

In Event-Driven Architecture können Events durch mehrere Services fließen. Wie verfolgen wir einen einzelnen Geschäftsprozess durch das gesamte System? Correlation IDs lösen dieses Problem.

Event mit Correlation Context:

public class OrderPlacedEvent implements CorrelatedEvent {
    
    private String eventId;
    private String correlationId;
    private String causationId;  // ID des Events, das dieses Event ausgelöst hat
    private Instant timestamp;
    private OrderData data;
    
    // Getters, setters...
}

public interface CorrelatedEvent {
    String getCorrelationId();
    String getCausationId();
    
    default String getEventId() {
        return UUID.randomUUID().toString();
    }
}

61.2.2 Correlation Context Management

Wie propagieren wir Correlation IDs durch unsere Services?

Spring Boot Correlation Context:

@Component
public class CorrelationContextManager {
    
    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    private static final ThreadLocal<String> correlationContext = new ThreadLocal<>();
    
    public void setCorrelationId(String correlationId) {
        correlationContext.set(correlationId);
        MDC.put("correlationId", correlationId);
    }
    
    public String getCorrelationId() {
        return correlationContext.get();
    }
    
    public void clear() {
        correlationContext.remove();
        MDC.remove("correlationId");
    }
    
    @EventListener
    public void handleEvent(Object event) {
        if (event instanceof CorrelatedEvent) {
            setCorrelationId(((CorrelatedEvent) event).getCorrelationId());
        }
    }
}

@Component
public class CorrelatingEventPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final CorrelationContextManager contextManager;
    
    public void publish(String topic, CorrelatedEvent event) {
        // Correlation ID von aktuellem Context übernehmen oder neuen erstellen
        String correlationId = Optional.ofNullable(contextManager.getCorrelationId())
            .orElse(UUID.randomUUID().toString());
            
        event.setCorrelationId(correlationId);
        
        // Causation ID ist die ID des auslösenden Events
        String causationId = contextManager.getCurrentEventId();
        event.setCausationId(causationId);
        
        kafkaTemplate.send(topic, event);
    }
}

Python Correlation Implementation:

import contextvars
from typing import Optional
import uuid

correlation_id_context = contextvars.ContextVar('correlation_id')
causation_id_context = contextvars.ContextVar('causation_id')

class CorrelationContextManager:
    
    @staticmethod
    def set_correlation_id(correlation_id: str):
        correlation_id_context.set(correlation_id)
    
    @staticmethod 
    def get_correlation_id() -> Optional[str]:
        return correlation_id_context.get(None)
    
    @staticmethod
    def generate_correlation_id() -> str:
        correlation_id = str(uuid.uuid4())
        correlation_id_context.set(correlation_id)
        return correlation_id

class CorrelatedEvent:
    def __init__(self, **kwargs):
        self.event_id = str(uuid.uuid4())
        self.correlation_id = kwargs.get('correlation_id') or CorrelationContextManager.get_correlation_id()
        self.causation_id = kwargs.get('causation_id') or causation_id_context.get(None)
        
    def __post_init__(self):
        if not self.correlation_id:
            self.correlation_id = CorrelationContextManager.generate_correlation_id()

61.2.3 Correlation in Tests nutzen

Wie können wir Correlation IDs für bessere Test-Diagnostik verwenden?

Correlation-basierte Test-Assertions:

@Test
void shouldMaintainCorrelationThroughoutOrderFlow() throws Exception {
    // Given
    String testCorrelationId = "test-correlation-" + UUID.randomUUID();
    CorrelationContextManager.setCorrelationId(testCorrelationId);
    
    TestLogAppender logAppender = applicationContext.getBean(TestLogAppender.class);
    logAppender.clear();
    
    // When
    CreateOrderRequest request = aValidCreateOrderRequest().build();
    orderService.placeOrder(request);
    
    // Wait for event processing
    Thread.sleep(5000);
    
    // Then - Alle Events sollten dieselbe Correlation ID haben
    logAppender.assertThat()
        .hasEventWithCorrelationId(testCorrelationId, "OrderPlaced")
        .hasEventWithCorrelationId(testCorrelationId, "PaymentProcessed")
        .hasEventWithCorrelationId(testCorrelationId, "InventoryReserved")
        .hasEventWithCorrelationId(testCorrelationId, "OrderShipped");
}

Überlegen Sie: Wie würden Sie eine Test-Situation debuggen, in der ein Event die falsche Correlation ID hat? Welche Informationen würden Sie benötigen?

Flow-spezifische Correlation Queries:

@Component
public class CorrelationTestHelper {
    
    public List<LogEntry> getFlowEvents(String correlationId) {
        return logRepository.findByCorrelationId(correlationId)
            .stream()
            .sorted(Comparator.comparing(LogEntry::getTimestamp))
            .collect(Collectors.toList());
    }
    
    public FlowVisualization visualizeFlow(String correlationId) {
        List<LogEntry> events = getFlowEvents(correlationId);
        
        return FlowVisualization.builder()
            .correlationId(correlationId)
            .steps(events.stream()
                .map(this::mapToFlowStep)
                .collect(Collectors.toList()))
            .duration(calculateFlowDuration(events))
            .success(isFlowSuccessful(events))
            .build();
    }
}

61.3 Test Monitoring

61.3.1 Monitoring-Metriken für Tests

Wie können wir Test-Ausführung selbst monitoren? Welche Metriken helfen uns dabei, Test-Performance und -Zuverlässigkeit zu verbessern?

Test-spezifische Metriken:

@Component
public class TestMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    public TestMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    public void recordTestExecution(String testName, Duration duration, boolean success) {
        Timer.Sample sample = Timer.start(meterRegistry);
        sample.stop(Timer.builder("test.execution.time")
            .tag("test.name", testName)
            .tag("test.result", success ? "success" : "failure")
            .register(meterRegistry));
            
        meterRegistry.counter("test.executions.total",
            "test.name", testName,
            "result", success ? "success" : "failure")
            .increment();
    }
    
    public void recordEventFlowMetrics(String flowName, int eventsProcessed, Duration totalTime) {
        meterRegistry.counter("test.events.processed", "flow", flowName)
            .increment(eventsProcessed);
            
        meterRegistry.timer("test.flow.duration", "flow", flowName)
            .record(totalTime);
    }
}

61.3.2 Real-time Test Monitoring Dashboard

Wie können wir Test-Ausführung in Echtzeit verfolgen?

Test Dashboard Integration:

@RestController
@RequestMapping("/test-monitoring")
public class TestMonitoringController {
    
    private final TestMetricsCollector metricsCollector;
    private final TestLogAppender logAppender;
    
    @GetMapping("/metrics")
    public TestMetrics getCurrentMetrics() {
        return TestMetrics.builder()
            .runningTests(getRunningTestCount())
            .completedTests(getCompletedTestCount())
            .failureRate(getFailureRate())
            .averageExecutionTime(getAverageExecutionTime())
            .build();
    }
    
    @GetMapping("/logs/{correlationId}")
    public List<LogEntry> getCorrelatedLogs(@PathVariable String correlationId) {
        return logAppender.getLogsByCorrelationId(correlationId);
    }
    
    @GetMapping("/flows/active")
    public List<ActiveFlow> getActiveFlows() {
        return flowTracker.getActiveFlows();
    }
}

Fragen Sie sich: Welche Informationen würden Sie in einem Test-Dashboard sehen wollen? Wie würden Sie erkennen, ob Ihre Tests zuverlässig laufen?

61.3.3 Automated Test Health Monitoring

Wie können wir automatisch erkennen, wenn Tests instabil werden?

Test Health Metrics:

@Component
public class TestHealthMonitor {
    
    private final MeterRegistry meterRegistry;
    private final AlertingService alertingService;
    
    @EventListener
    public void monitorTestResult(TestExecutionResult result) {
        updateTestHealthMetrics(result);
        checkForTestDegradation(result);
    }
    
    private void updateTestHealthMetrics(TestExecutionResult result) {
        String testName = result.getTestName();
        
        // Flakiness Detection
        double successRate = calculateRecentSuccessRate(testName);
        meterRegistry.gauge("test.success.rate", Tags.of("test", testName), successRate);
        
        // Performance Degradation
        Duration avgExecutionTime = calculateAverageExecutionTime(testName);
        meterRegistry.gauge("test.avg.execution.time", Tags.of("test", testName), 
            avgExecutionTime.toMillis());
    }
    
    private void checkForTestDegradation(TestExecutionResult result) {
        String testName = result.getTestName();
        
        // Alert bei häufigen Fehlschlägen
        double recentFailureRate = calculateRecentFailureRate(testName, Duration.ofHours(24));
        if (recentFailureRate > 0.3) { // 30% Failure Rate
            alertingService.alert(
                AlertLevel.WARNING,
                String.format("Test %s has high failure rate: %.1f%%", testName, recentFailureRate * 100)
            );
        }
        
        // Alert bei Performance-Degradation
        Duration currentExecutionTime = result.getExecutionTime();
        Duration historicalAverage = getHistoricalAverageExecutionTime(testName);
        
        if (currentExecutionTime.toMillis() > historicalAverage.toMillis() * 2) {
            alertingService.alert(
                AlertLevel.WARNING,
                String.format("Test %s execution time degraded: %dms vs %dms average", 
                    testName, currentExecutionTime.toMillis(), historicalAverage.toMillis())
            );
        }
    }
}

61.3.4 Test Environment Health Checks

Wie stellen wir sicher, dass unsere Test-Infrastruktur selbst gesund ist?

Infrastructure Health Monitoring:

@Component
public class TestInfrastructureHealthCheck {
    
    @Autowired
    private KafkaContainer kafkaContainer;
    
    @Autowired
    private TestcontainersHealthIndicator healthIndicator;
    
    @Scheduled(fixedRate = 30000) // Jede 30 Sekunden
    public void checkInfrastructureHealth() {
        
        // Kafka Container Health
        if (!kafkaContainer.isRunning()) {
            log.error("Kafka container is not running - tests may fail");
            meterRegistry.counter("test.infrastructure.failures", "component", "kafka").increment();
        }
        
        // Memory Usage
        Runtime runtime = Runtime.getRuntime();
        long usedMemory = runtime.totalMemory() - runtime.freeMemory();
        long maxMemory = runtime.maxMemory();
        double memoryUsage = (double) usedMemory / maxMemory;
        
        meterRegistry.gauge("test.infrastructure.memory.usage", memoryUsage);
        
        if (memoryUsage > 0.9) {
            log.warn("High memory usage detected: {}%", memoryUsage * 100);
        }
        
        // Container Resource Usage
        checkContainerResources();
    }
    
    private void checkContainerResources() {
        // Docker resource monitoring
        // CPU, Memory, Disk usage der Testcontainers
    }
}

Denken Sie daran: Ein gut observierbares System ist ein testbares System. Observability sollte nicht nachträglich hinzugefügt werden, sondern von Anfang an mitentworfen werden. Correlation IDs, strukturierte Logs und umfassende Metriken machen den Unterschied zwischen frustrierendem Debugging und effizienter Problemlösung.

Welche Observability-Patterns würden in Ihrem konkreten Use Case den größten Nutzen bringen? Wie würden Sie beginnen, wenn Sie ein bestehendes System testbarer machen müssten?