60 Simulation von Event-Flows

Angenommen ein Kunde bestellt online, das System verarbeitet die Zahlung, reserviert Lagerbestände und löst den Versand aus. Wie testen Sie diesen komplexen Event-Flow end-to-end? Wie stellen Sie sicher, dass alle Services korrekt zusammenarbeiten, wenn Events durch das System fließen?

Event-Flow-Simulation beantwortet diese Fragen durch systematisches Testen kompletter Geschäftsprozesse in Event-Driven Architecture. Dabei geht es um mehr als einzelne Service-Tests - wir testen das emergente Verhalten des gesamten Systems.

60.1 Event Flow Testing

60.1.1 Was ist ein Event Flow?

Ein Event Flow beschreibt die Kette von Events, die durch einen Geschäftsprozess ausgelöst wird. Denken Sie an unseren E-Commerce-Flow:

CustomerAction → OrderPlaced → PaymentProcessed → InventoryReserved → OrderShipped → CustomerNotified

Aber welche Fragen stellt sich ein Entwickler beim Testen dieses Flows?

60.1.2 Flow-Definition für Tests

Bevor wir testen können, müssen wir Flows explizit definieren:

Deklarative Flow-Definition:

@Component
public class OrderProcessingFlow {
    
    public FlowDefinition defineCompleteOrderFlow() {
        return FlowDefinition.builder()
            .name("Complete Order Processing")
            .startEvent(OrderPlacedEvent.class)
            .expectedEvents(List.of(
                PaymentProcessedEvent.class,
                InventoryReservedEvent.class, 
                OrderShippedEvent.class,
                CustomerNotifiedEvent.class
            ))
            .maxDuration(Duration.ofMinutes(5))
            .build();
    }
}

Python Flow Definition:

@dataclass
class EventFlowDefinition:
    name: str
    start_event: Type
    expected_events: List[Type]
    max_duration: timedelta
    
class OrderProcessingFlow:
    def define_complete_order_flow(self) -> EventFlowDefinition:
        return EventFlowDefinition(
            name="Complete Order Processing",
            start_event=OrderPlacedEvent,
            expected_events=[
                PaymentProcessedEvent,
                InventoryReservedEvent,
                OrderShippedEvent,
                CustomerNotifiedEvent
            ],
            max_duration=timedelta(minutes=5)
        )

60.1.3 Flow Execution Testing

Wie können wir nun verifizieren, dass ein Flow korrekt ausgeführt wird?

Flow Test Implementation:

@Test
void shouldExecuteCompleteOrderFlowSuccessfully() throws Exception {
    // Given - Was ist unser Startzustand?
    FlowDefinition flow = orderProcessingFlow.defineCompleteOrderFlow();
    FlowAssertion flowAssertion = new FlowAssertion(flow);
    
    String customerId = "customer-123";
    String orderId = "order-456";
    
    // When - Wie lösen wir den Flow aus?
    OrderPlacedEvent startEvent = OrderPlacedEvent.builder()
        .orderId(orderId)
        .customerId(customerId)
        .totalAmount(BigDecimal.valueOf(99.99))
        .build();
        
    kafkaTemplate.send("order.placed.v1", startEvent);
    
    // Then - Welche Erwartungen haben wir?
    flowAssertion.waitForCompletion(Duration.ofMinutes(5))
        .assertAllEventsReceived()
        .assertEventOrder()
        .assertNoUnexpectedEvents();
}

Welche Herausforderungen sehen Sie in diesem Ansatz? Wie würden Sie mit asynchroner Verarbeitung und unvorhersagbaren Timing-Aspekten umgehen?

60.1.4 Flow Assertion Framework

Ein robustes Framework für Flow-Testing benötigt intelligente Assertions:

Event Flow Collector:

@Component
public class EventFlowCollector {
    
    private final Map<String, List<Object>> flowEvents = new ConcurrentHashMap<>();
    
    @EventListener
    public void collectEvent(Object event) {
        String flowId = extractFlowId(event);
        flowEvents.computeIfAbsent(flowId, k -> new ArrayList<>()).add(event);
    }
    
    public FlowExecution getFlowExecution(String flowId) {
        List<Object> events = flowEvents.getOrDefault(flowId, List.of());
        return new FlowExecution(flowId, events);
    }
    
    private String extractFlowId(Object event) {
        // Wie identifizieren wir zusammengehörige Events?
        // Correlation ID, Order ID, Customer ID?
        if (event instanceof OrderPlacedEvent) {
            return ((OrderPlacedEvent) event).getOrderId();
        }
        // ... weitere Event-Typen
        return "unknown";
    }
}

Überlegen Sie: Welche Informationen benötigen wir, um Events einem Flow zuzuordnen? Wie können wir sicherstellen, dass wir keine Events verpassen?

60.2 End-to-end Scenarios

60.2.1 Realistische Geschäftsszenarien

End-to-End Tests simulieren echte Benutzerinteraktionen. Aber welche Szenarien sind wirklich repräsentativ?

Scenario-Based Test Design:

Szenario Besonderheit Erwarteter Flow Fehlerfall
Standard-Bestellung Alles verfügbar Vollständiger Flow -
Lagerengpass Produkt nicht verfügbar Stopp bei Inventory InventoryUnavailableEvent
Zahlungsfehler Kreditkarte abgelehnt Stopp bei Payment PaymentFailedEvent
Teilversand Gemischte Verfügbarkeit Mehrfacher Versand Multiple ShippedEvents

Complex Scenario Test:

@Test
void shouldHandlePartialInventoryScenario() throws Exception {
    // Given - Wie modellieren wir einen komplexen Geschäftsfall?
    String orderId = "order-mixed-inventory";
    
    // Setup: Nur eines von zwei Produkten verfügbar
    inventoryService.setAvailability("product-1", 5);  // Verfügbar
    inventoryService.setAvailability("product-2", 0);  // Nicht verfügbar
    
    OrderPlacedEvent orderEvent = OrderPlacedEvent.builder()
        .orderId(orderId)
        .items(List.of(
            new OrderItem("product-1", 2),
            new OrderItem("product-2", 1)  // Wird fehlschlagen
        ))
        .build();
    
    FlowAssertion flowAssertion = new FlowAssertion(
        "Partial Inventory Flow", 
        Duration.ofMinutes(3)
    );
    
    // When
    kafkaTemplate.send("order.placed.v1", orderEvent);
    
    // Then - Was erwarten wir bei Teilausführung?
    flowAssertion.waitForCompletion()
        .assertEventReceived(PaymentProcessedEvent.class)
        .assertEventReceived(InventoryPartiallyReservedEvent.class)
        .assertEventReceived(BackorderCreatedEvent.class)
        .assertEventNotReceived(OrderShippedEvent.class);  // Kein Versand bei Teilbestand
}

60.2.2 Chaos Engineering für Event Flows

Wie verhält sich unser System, wenn Dinge schiefgehen? Chaos Engineering simuliert systematisch Ausfälle:

Failure Injection Testing:

@Test
void shouldRecoverFromTemporaryPaymentServiceOutage() throws Exception {
    // Given
    String orderId = "order-chaos-test";
    OrderPlacedEvent orderEvent = createOrderEvent(orderId);
    
    FlowAssertion flowAssertion = new FlowAssertion(
        "Recovery Flow",
        Duration.ofMinutes(10)  // Längere Timeout für Recovery
    );
    
    // When - Simuliere Service-Ausfall
    paymentServiceContainer.stop();  // Payment Service offline
    
    kafkaTemplate.send("order.placed.v1", orderEvent);
    Thread.sleep(30000);  // 30 Sekunden warten
    
    paymentServiceContainer.start();  // Service wieder online
    
    // Then - System sollte sich erholen
    flowAssertion.waitForCompletion()
        .assertEventuallyReceived(PaymentProcessedEvent.class)
        .assertRecoveryWithin(Duration.ofMinutes(2));
}

Welche Ausfallszenarien sind in Ihrem System besonders kritisch? Wie können Sie diese systematisch testen?

60.2.3 Multi-Tenant Flow Testing

In Multi-Tenant-Systemen müssen Flows zwischen Mandanten isoliert werden:

Tenant Isolation Test:

@Test
void shouldIsolateFlowsBetweenTenants() throws Exception {
    // Given - Zwei unterschiedliche Mandanten
    String tenant1OrderId = "tenant1-order-123";
    String tenant2OrderId = "tenant2-order-456";
    
    // When - Gleichzeitige Bestellungen
    OrderPlacedEvent tenant1Order = createOrderEvent(tenant1OrderId, "tenant-1");
    OrderPlacedEvent tenant2Order = createOrderEvent(tenant2OrderId, "tenant-2");
    
    kafkaTemplate.send("order.placed.v1", tenant1Order);
    kafkaTemplate.send("order.placed.v1", tenant2Order);
    
    // Then - Flows sollten unabhängig verlaufen
    FlowAssertion tenant1Flow = new FlowAssertion("tenant-1", tenant1OrderId);
    FlowAssertion tenant2Flow = new FlowAssertion("tenant-2", tenant2OrderId);
    
    tenant1Flow.waitForCompletion().assertNoEventLeakage(tenant2OrderId);
    tenant2Flow.waitForCompletion().assertNoEventLeakage(tenant1OrderId);
}

60.3 Performance Testing

60.3.1 Load Testing für Event Flows

Wie viele parallele Event-Flows kann unser System verarbeiten? Performance-Tests geben Antworten:

Flow Throughput Test:

@Test
void shouldHandleHighThroughputEventFlows() throws Exception {
    // Given - Wie viele Flows können wir parallel ausführen?
    int numberOfFlows = 1000;
    int expectedThroughputPerSecond = 100;
    
    CountDownLatch completedFlows = new CountDownLatch(numberOfFlows);
    ConcurrentHashMap<String, Long> flowCompletionTimes = new ConcurrentHashMap<>();
    
    @EventListener
    void trackFlowCompletion(CustomerNotifiedEvent event) {
        flowCompletionTimes.put(event.getOrderId(), System.currentTimeMillis());
        completedFlows.countDown();
    }
    
    // When - Massive parallele Flow-Auslösung
    long startTime = System.currentTimeMillis();
    
    for (int i = 0; i < numberOfFlows; i++) {
        OrderPlacedEvent orderEvent = createOrderEvent("order-" + i);
        kafkaTemplate.send("order.placed.v1", orderEvent);
    }
    
    // Then - Performance-Assertions
    boolean allCompleted = completedFlows.await(60, TimeUnit.SECONDS);
    long endTime = System.currentTimeMillis();
    long totalDuration = endTime - startTime;
    
    assertThat(allCompleted).isTrue();
    assertThat(flowCompletionTimes.size()).isEqualTo(numberOfFlows);
    
    double throughput = (numberOfFlows * 1000.0) / totalDuration;
    assertThat(throughput).isGreaterThan(expectedThroughputPerSecond);
}

60.3.2 Latency Testing

Wie schnell durchlaufen Events unsere Flows? Latency-Messungen zeigen Bottlenecks auf:

End-to-End Latency Measurement:

@Component
public class FlowLatencyTracker {
    
    private final Map<String, Long> flowStartTimes = new ConcurrentHashMap<>();
    private final List<LatencyMeasurement> measurements = Collections.synchronizedList(new ArrayList<>());
    
    @EventListener
    public void trackFlowStart(OrderPlacedEvent event) {
        flowStartTimes.put(event.getOrderId(), System.nanoTime());
    }
    
    @EventListener  
    public void trackFlowEnd(CustomerNotifiedEvent event) {
        Long startTime = flowStartTimes.remove(event.getOrderId());
        if (startTime != null) {
            long latency = System.nanoTime() - startTime;
            measurements.add(new LatencyMeasurement(event.getOrderId(), latency));
        }
    }
    
    public LatencyStatistics getStatistics() {
        // Welche Metriken sind für uns wichtig?
        // P50, P95, P99 Latency? Average? Maximum?
        return LatencyStatistics.from(measurements);
    }
}

Latency Assertion:

@Test
void shouldMeetLatencyRequirements() throws Exception {
    // Given
    LatencyRequirement requirement = LatencyRequirement.builder()
        .p50(Duration.ofSeconds(2))
        .p95(Duration.ofSeconds(5)) 
        .p99(Duration.ofSeconds(10))
        .build();
    
    // When - Standard Load
    for (int i = 0; i < 100; i++) {
        OrderPlacedEvent event = createOrderEvent("latency-test-" + i);
        kafkaTemplate.send("order.placed.v1", event);
    }
    
    // Wait for completion
    Thread.sleep(30000);
    
    // Then
    LatencyStatistics stats = latencyTracker.getStatistics();
    assertThat(stats.getP50()).isLessThan(requirement.getP50());
    assertThat(stats.getP95()).isLessThan(requirement.getP95());
    assertThat(stats.getP99()).isLessThan(requirement.getP99());
}

60.3.3 Memory und Resource Testing

Event-Flows können Memory-Leaks oder Resource-Erschöpfung verursachen:

Resource Consumption Test:

@Test
void shouldNotLeakResourcesDuringLongRunningFlows() throws Exception {
    // Given - Baseline Memory
    Runtime runtime = Runtime.getRuntime();
    runtime.gc(); // Force GC for baseline
    long baselineMemory = runtime.totalMemory() - runtime.freeMemory();
    
    // When - Extended Flow Processing
    for (int batch = 0; batch < 10; batch++) {
        for (int i = 0; i < 100; i++) {
            OrderPlacedEvent event = createOrderEvent("batch-" + batch + "-order-" + i);
            kafkaTemplate.send("order.placed.v1", event);
        }
        
        // Wait for batch completion
        Thread.sleep(10000);
        
        // Check memory growth
        runtime.gc();
        long currentMemory = runtime.totalMemory() - runtime.freeMemory();
        long memoryGrowth = currentMemory - baselineMemory;
        
        // Memory shouldn't grow unbounded
        assertThat(memoryGrowth).isLessThan(100_000_000); // 100MB limit
    }
}

Welche Kennzahlen sind für Ihr System besonders wichtig? Wie würden Sie feststellen, ob Ihr System produktionsreif ist?

60.3.4 Monitoring Integration in Tests

Performance-Tests sollten auch Monitoring-Metriken validieren:

Metrics Validation:

@Test
void shouldProduceCorrectMetricsDuringFlowExecution() throws Exception {
    // Given
    MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
    
    // When
    for (int i = 0; i < 50; i++) {
        OrderPlacedEvent event = createOrderEvent("metrics-test-" + i);
        kafkaTemplate.send("order.placed.v1", event);
    }
    
    // Wait for processing
    Thread.sleep(30000);
    
    // Then - Validate business metrics
    Counter orderCounter = meterRegistry.counter("orders.placed");
    Counter paymentCounter = meterRegistry.counter("payments.processed");
    Timer flowTimer = meterRegistry.timer("flow.order.completion");
    
    assertThat(orderCounter.count()).isEqualTo(50);
    assertThat(paymentCounter.count()).isEqualTo(50);
    assertThat(flowTimer.count()).isEqualTo(50);
    assertThat(flowTimer.mean(TimeUnit.SECONDS)).isLessThan(5.0);
}

Event-Flow-Simulation verbindet technische Tests mit fachlichen Anforderungen. Sie zeigt nicht nur, ob einzelne Services funktionieren, sondern ob das Gesamtsystem die Geschäftsziele erfüllt. Der Schlüssel liegt darin, realistische Szenarien zu definieren, systematisch zu testen und dabei sowohl Happy Path als auch Fehlerfälle abzudecken.