Angenommen ein Kunde bestellt online, das System verarbeitet die Zahlung, reserviert Lagerbestände und löst den Versand aus. Wie testen Sie diesen komplexen Event-Flow end-to-end? Wie stellen Sie sicher, dass alle Services korrekt zusammenarbeiten, wenn Events durch das System fließen?
Event-Flow-Simulation beantwortet diese Fragen durch systematisches Testen kompletter Geschäftsprozesse in Event-Driven Architecture. Dabei geht es um mehr als einzelne Service-Tests - wir testen das emergente Verhalten des gesamten Systems.
Ein Event Flow beschreibt die Kette von Events, die durch einen Geschäftsprozess ausgelöst wird. Denken Sie an unseren E-Commerce-Flow:
CustomerAction → OrderPlaced → PaymentProcessed → InventoryReserved → OrderShipped → CustomerNotified
Aber welche Fragen stellt sich ein Entwickler beim Testen dieses Flows?
Bevor wir testen können, müssen wir Flows explizit definieren:
Deklarative Flow-Definition:
@Component
public class OrderProcessingFlow {
public FlowDefinition defineCompleteOrderFlow() {
return FlowDefinition.builder()
.name("Complete Order Processing")
.startEvent(OrderPlacedEvent.class)
.expectedEvents(List.of(
PaymentProcessedEvent.class,
InventoryReservedEvent.class,
OrderShippedEvent.class,
CustomerNotifiedEvent.class
))
.maxDuration(Duration.ofMinutes(5))
.build();
}
}Python Flow Definition:
@dataclass
class EventFlowDefinition:
name: str
start_event: Type
expected_events: List[Type]
max_duration: timedelta
class OrderProcessingFlow:
def define_complete_order_flow(self) -> EventFlowDefinition:
return EventFlowDefinition(
name="Complete Order Processing",
start_event=OrderPlacedEvent,
expected_events=[
PaymentProcessedEvent,
InventoryReservedEvent,
OrderShippedEvent,
CustomerNotifiedEvent
],
max_duration=timedelta(minutes=5)
)Wie können wir nun verifizieren, dass ein Flow korrekt ausgeführt wird?
Flow Test Implementation:
@Test
void shouldExecuteCompleteOrderFlowSuccessfully() throws Exception {
// Given - Was ist unser Startzustand?
FlowDefinition flow = orderProcessingFlow.defineCompleteOrderFlow();
FlowAssertion flowAssertion = new FlowAssertion(flow);
String customerId = "customer-123";
String orderId = "order-456";
// When - Wie lösen wir den Flow aus?
OrderPlacedEvent startEvent = OrderPlacedEvent.builder()
.orderId(orderId)
.customerId(customerId)
.totalAmount(BigDecimal.valueOf(99.99))
.build();
kafkaTemplate.send("order.placed.v1", startEvent);
// Then - Welche Erwartungen haben wir?
flowAssertion.waitForCompletion(Duration.ofMinutes(5))
.assertAllEventsReceived()
.assertEventOrder()
.assertNoUnexpectedEvents();
}Welche Herausforderungen sehen Sie in diesem Ansatz? Wie würden Sie mit asynchroner Verarbeitung und unvorhersagbaren Timing-Aspekten umgehen?
Ein robustes Framework für Flow-Testing benötigt intelligente Assertions:
Event Flow Collector:
@Component
public class EventFlowCollector {
private final Map<String, List<Object>> flowEvents = new ConcurrentHashMap<>();
@EventListener
public void collectEvent(Object event) {
String flowId = extractFlowId(event);
flowEvents.computeIfAbsent(flowId, k -> new ArrayList<>()).add(event);
}
public FlowExecution getFlowExecution(String flowId) {
List<Object> events = flowEvents.getOrDefault(flowId, List.of());
return new FlowExecution(flowId, events);
}
private String extractFlowId(Object event) {
// Wie identifizieren wir zusammengehörige Events?
// Correlation ID, Order ID, Customer ID?
if (event instanceof OrderPlacedEvent) {
return ((OrderPlacedEvent) event).getOrderId();
}
// ... weitere Event-Typen
return "unknown";
}
}Überlegen Sie: Welche Informationen benötigen wir, um Events einem Flow zuzuordnen? Wie können wir sicherstellen, dass wir keine Events verpassen?
End-to-End Tests simulieren echte Benutzerinteraktionen. Aber welche Szenarien sind wirklich repräsentativ?
Scenario-Based Test Design:
| Szenario | Besonderheit | Erwarteter Flow | Fehlerfall |
|---|---|---|---|
| Standard-Bestellung | Alles verfügbar | Vollständiger Flow | - |
| Lagerengpass | Produkt nicht verfügbar | Stopp bei Inventory | InventoryUnavailableEvent |
| Zahlungsfehler | Kreditkarte abgelehnt | Stopp bei Payment | PaymentFailedEvent |
| Teilversand | Gemischte Verfügbarkeit | Mehrfacher Versand | Multiple ShippedEvents |
Complex Scenario Test:
@Test
void shouldHandlePartialInventoryScenario() throws Exception {
// Given - Wie modellieren wir einen komplexen Geschäftsfall?
String orderId = "order-mixed-inventory";
// Setup: Nur eines von zwei Produkten verfügbar
inventoryService.setAvailability("product-1", 5); // Verfügbar
inventoryService.setAvailability("product-2", 0); // Nicht verfügbar
OrderPlacedEvent orderEvent = OrderPlacedEvent.builder()
.orderId(orderId)
.items(List.of(
new OrderItem("product-1", 2),
new OrderItem("product-2", 1) // Wird fehlschlagen
))
.build();
FlowAssertion flowAssertion = new FlowAssertion(
"Partial Inventory Flow",
Duration.ofMinutes(3)
);
// When
kafkaTemplate.send("order.placed.v1", orderEvent);
// Then - Was erwarten wir bei Teilausführung?
flowAssertion.waitForCompletion()
.assertEventReceived(PaymentProcessedEvent.class)
.assertEventReceived(InventoryPartiallyReservedEvent.class)
.assertEventReceived(BackorderCreatedEvent.class)
.assertEventNotReceived(OrderShippedEvent.class); // Kein Versand bei Teilbestand
}Wie verhält sich unser System, wenn Dinge schiefgehen? Chaos Engineering simuliert systematisch Ausfälle:
Failure Injection Testing:
@Test
void shouldRecoverFromTemporaryPaymentServiceOutage() throws Exception {
// Given
String orderId = "order-chaos-test";
OrderPlacedEvent orderEvent = createOrderEvent(orderId);
FlowAssertion flowAssertion = new FlowAssertion(
"Recovery Flow",
Duration.ofMinutes(10) // Längere Timeout für Recovery
);
// When - Simuliere Service-Ausfall
paymentServiceContainer.stop(); // Payment Service offline
kafkaTemplate.send("order.placed.v1", orderEvent);
Thread.sleep(30000); // 30 Sekunden warten
paymentServiceContainer.start(); // Service wieder online
// Then - System sollte sich erholen
flowAssertion.waitForCompletion()
.assertEventuallyReceived(PaymentProcessedEvent.class)
.assertRecoveryWithin(Duration.ofMinutes(2));
}Welche Ausfallszenarien sind in Ihrem System besonders kritisch? Wie können Sie diese systematisch testen?
In Multi-Tenant-Systemen müssen Flows zwischen Mandanten isoliert werden:
Tenant Isolation Test:
@Test
void shouldIsolateFlowsBetweenTenants() throws Exception {
// Given - Zwei unterschiedliche Mandanten
String tenant1OrderId = "tenant1-order-123";
String tenant2OrderId = "tenant2-order-456";
// When - Gleichzeitige Bestellungen
OrderPlacedEvent tenant1Order = createOrderEvent(tenant1OrderId, "tenant-1");
OrderPlacedEvent tenant2Order = createOrderEvent(tenant2OrderId, "tenant-2");
kafkaTemplate.send("order.placed.v1", tenant1Order);
kafkaTemplate.send("order.placed.v1", tenant2Order);
// Then - Flows sollten unabhängig verlaufen
FlowAssertion tenant1Flow = new FlowAssertion("tenant-1", tenant1OrderId);
FlowAssertion tenant2Flow = new FlowAssertion("tenant-2", tenant2OrderId);
tenant1Flow.waitForCompletion().assertNoEventLeakage(tenant2OrderId);
tenant2Flow.waitForCompletion().assertNoEventLeakage(tenant1OrderId);
}Wie viele parallele Event-Flows kann unser System verarbeiten? Performance-Tests geben Antworten:
Flow Throughput Test:
@Test
void shouldHandleHighThroughputEventFlows() throws Exception {
// Given - Wie viele Flows können wir parallel ausführen?
int numberOfFlows = 1000;
int expectedThroughputPerSecond = 100;
CountDownLatch completedFlows = new CountDownLatch(numberOfFlows);
ConcurrentHashMap<String, Long> flowCompletionTimes = new ConcurrentHashMap<>();
@EventListener
void trackFlowCompletion(CustomerNotifiedEvent event) {
flowCompletionTimes.put(event.getOrderId(), System.currentTimeMillis());
completedFlows.countDown();
}
// When - Massive parallele Flow-Auslösung
long startTime = System.currentTimeMillis();
for (int i = 0; i < numberOfFlows; i++) {
OrderPlacedEvent orderEvent = createOrderEvent("order-" + i);
kafkaTemplate.send("order.placed.v1", orderEvent);
}
// Then - Performance-Assertions
boolean allCompleted = completedFlows.await(60, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long totalDuration = endTime - startTime;
assertThat(allCompleted).isTrue();
assertThat(flowCompletionTimes.size()).isEqualTo(numberOfFlows);
double throughput = (numberOfFlows * 1000.0) / totalDuration;
assertThat(throughput).isGreaterThan(expectedThroughputPerSecond);
}Wie schnell durchlaufen Events unsere Flows? Latency-Messungen zeigen Bottlenecks auf:
End-to-End Latency Measurement:
@Component
public class FlowLatencyTracker {
private final Map<String, Long> flowStartTimes = new ConcurrentHashMap<>();
private final List<LatencyMeasurement> measurements = Collections.synchronizedList(new ArrayList<>());
@EventListener
public void trackFlowStart(OrderPlacedEvent event) {
flowStartTimes.put(event.getOrderId(), System.nanoTime());
}
@EventListener
public void trackFlowEnd(CustomerNotifiedEvent event) {
Long startTime = flowStartTimes.remove(event.getOrderId());
if (startTime != null) {
long latency = System.nanoTime() - startTime;
measurements.add(new LatencyMeasurement(event.getOrderId(), latency));
}
}
public LatencyStatistics getStatistics() {
// Welche Metriken sind für uns wichtig?
// P50, P95, P99 Latency? Average? Maximum?
return LatencyStatistics.from(measurements);
}
}Latency Assertion:
@Test
void shouldMeetLatencyRequirements() throws Exception {
// Given
LatencyRequirement requirement = LatencyRequirement.builder()
.p50(Duration.ofSeconds(2))
.p95(Duration.ofSeconds(5))
.p99(Duration.ofSeconds(10))
.build();
// When - Standard Load
for (int i = 0; i < 100; i++) {
OrderPlacedEvent event = createOrderEvent("latency-test-" + i);
kafkaTemplate.send("order.placed.v1", event);
}
// Wait for completion
Thread.sleep(30000);
// Then
LatencyStatistics stats = latencyTracker.getStatistics();
assertThat(stats.getP50()).isLessThan(requirement.getP50());
assertThat(stats.getP95()).isLessThan(requirement.getP95());
assertThat(stats.getP99()).isLessThan(requirement.getP99());
}Event-Flows können Memory-Leaks oder Resource-Erschöpfung verursachen:
Resource Consumption Test:
@Test
void shouldNotLeakResourcesDuringLongRunningFlows() throws Exception {
// Given - Baseline Memory
Runtime runtime = Runtime.getRuntime();
runtime.gc(); // Force GC for baseline
long baselineMemory = runtime.totalMemory() - runtime.freeMemory();
// When - Extended Flow Processing
for (int batch = 0; batch < 10; batch++) {
for (int i = 0; i < 100; i++) {
OrderPlacedEvent event = createOrderEvent("batch-" + batch + "-order-" + i);
kafkaTemplate.send("order.placed.v1", event);
}
// Wait for batch completion
Thread.sleep(10000);
// Check memory growth
runtime.gc();
long currentMemory = runtime.totalMemory() - runtime.freeMemory();
long memoryGrowth = currentMemory - baselineMemory;
// Memory shouldn't grow unbounded
assertThat(memoryGrowth).isLessThan(100_000_000); // 100MB limit
}
}Welche Kennzahlen sind für Ihr System besonders wichtig? Wie würden Sie feststellen, ob Ihr System produktionsreif ist?
Performance-Tests sollten auch Monitoring-Metriken validieren:
Metrics Validation:
@Test
void shouldProduceCorrectMetricsDuringFlowExecution() throws Exception {
// Given
MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
// When
for (int i = 0; i < 50; i++) {
OrderPlacedEvent event = createOrderEvent("metrics-test-" + i);
kafkaTemplate.send("order.placed.v1", event);
}
// Wait for processing
Thread.sleep(30000);
// Then - Validate business metrics
Counter orderCounter = meterRegistry.counter("orders.placed");
Counter paymentCounter = meterRegistry.counter("payments.processed");
Timer flowTimer = meterRegistry.timer("flow.order.completion");
assertThat(orderCounter.count()).isEqualTo(50);
assertThat(paymentCounter.count()).isEqualTo(50);
assertThat(flowTimer.count()).isEqualTo(50);
assertThat(flowTimer.mean(TimeUnit.SECONDS)).isLessThan(5.0);
}Event-Flow-Simulation verbindet technische Tests mit fachlichen Anforderungen. Sie zeigt nicht nur, ob einzelne Services funktionieren, sondern ob das Gesamtsystem die Geschäftsziele erfüllt. Der Schlüssel liegt darin, realistische Szenarien zu definieren, systematisch zu testen und dabei sowohl Happy Path als auch Fehlerfälle abzudecken.