Integrationstests in Event-Driven Architecture erfordern echte Event-Infrastruktur. Hier reichen Mocks nicht mehr aus - wir müssen das Zusammenspiel zwischen Producer, Broker und Consumer mit realer Kafka-Infrastruktur testen. Testcontainers löst dieses Problem elegant: Docker-Container werden programmatisch gestartet, für Tests verwendet und automatisch wieder entfernt.
Das Ziel: Realitätsnahe Tests ohne Infrastruktur-Setup-Aufwand und ohne Interferenz zwischen Testläufen.
Testcontainers startet Kafka-Container on-demand und stellt Connection-Parameter zur Verfügung:
Spring Boot Testcontainers Setup:
@Testcontainers
@SpringBootTest
class OrderServiceIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withEmbeddedZookeeper();
@DynamicPropertySource
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
registry.add("spring.kafka.producer.key-serializer", () -> "org.apache.kafka.common.serialization.StringSerializer");
registry.add("spring.kafka.producer.value-serializer", () -> "org.springframework.kafka.support.serializer.JsonSerializer");
}
@Autowired
private OrderService orderService;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
}Python Testcontainers Setup:
import pytest
from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json
@pytest.fixture(scope="session")
def kafka_container():
with KafkaContainer() as kafka:
yield kafka
@pytest.fixture
def kafka_config(kafka_container):
return {
'bootstrap_servers': kafka_container.get_bootstrap_server(),
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
'value_deserializer': lambda m: json.loads(m.decode('utf-8'))
}
class TestOrderServiceIntegration:
def test_order_processing_flow(self, kafka_config):
# Test implementation
passTestcontainers verwaltet Container-Lifecycle automatisch:
Container-Scope Strategien:
| Scope | Lebensdauer | Verwendung | Performance |
|---|---|---|---|
| Method | Pro Test | Vollständige Isolation | Langsam |
| Class | Pro Testklasse | Balance Isolation/Speed | Mittel |
| Session | Pro Testsuite | Maximale Performance | Schnell |
Optimierte Container-Wiederverwendung:
@Testcontainers
class OrderProcessingIntegrationTest {
// Session-scoped Container für bessere Performance
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withEmbeddedZookeeper()
.withReuse(true); // Container-Wiederverwendung zwischen Testläufen
@BeforeEach
void cleanupTopics() {
// Topics zwischen Tests zurücksetzen
kafkaAdmin.deleteTopics(List.of("order.placed.v1", "payment.processed.v1"));
kafkaAdmin.createTopics(List.of(
new NewTopic("order.placed.v1", 1, (short) 1),
new NewTopic("payment.processed.v1", 1, (short) 1)
));
}
}Realistische Szenarien testen mehrere Services gleichzeitig:
End-to-End Order Processing Test:
@Test
void shouldProcessCompleteOrderFlow() throws Exception {
// Given
CreateOrderRequest orderRequest = CreateOrderRequest.builder()
.customerId("customer-123")
.items(List.of(new OrderItem("product-1", 2)))
.build();
// Setup Consumer für PaymentProcessed Events
CountDownLatch paymentLatch = new CountDownLatch(1);
@KafkaListener(topics = "payment.processed.v1")
void handlePaymentProcessed(PaymentProcessedEvent event) {
paymentLatch.countDown();
}
// When
orderService.placeOrder(orderRequest);
// Then
boolean paymentProcessed = paymentLatch.await(5, TimeUnit.SECONDS);
assertThat(paymentProcessed).isTrue();
// Verify complete state
Order order = orderRepository.findByCustomerId("customer-123").get(0);
assertThat(order.getStatus()).isEqualTo(OrderStatus.PAYMENT_PENDING);
}Jeder Test benötigt eine saubere Kafka-Umgebung:
Topic-Management Strategien:
@Component
public class TestKafkaManager {
private final AdminClient adminClient;
public void resetTestEnvironment() {
// Alle Test-Topics löschen
Set<String> existingTopics = adminClient.listTopics().names().get();
List<String> testTopics = existingTopics.stream()
.filter(topic -> topic.contains("test") || topic.contains("v1"))
.collect(Collectors.toList());
if (!testTopics.isEmpty()) {
adminClient.deleteTopics(testTopics).all().get();
}
}
public void createTestTopics(List<String> topicNames) {
List<NewTopic> topics = topicNames.stream()
.map(name -> new NewTopic(name, 1, (short) 1))
.collect(Collectors.toList());
adminClient.createTopics(topics).all().get();
}
}Test-spezifische Kafka-Konfiguration optimiert Geschwindigkeit und Zuverlässigkeit:
Optimierte Test-Container-Konfiguration:
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withEmbeddedZookeeper()
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
.withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
.withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "1"); // Sofortiges Flushing für TestsTestcontainers unterstützt parallele Testausführung durch Port-Randomisierung:
Parallel Test Configuration:
// JUnit 5 Parallel Execution
@EnabledIf("java.util.concurrent.ForkJoinPool.getCommonPoolParallelism() > 1")
@Execution(ExecutionMode.CONCURRENT)
class ParallelIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
.withEmbeddedZookeeper()
.withNetworkAliases("kafka-" + UUID.randomUUID().toString()); // Eindeutige Network-Aliases
}Python Parallel Test Setup:
@pytest.fixture(scope="function") # Function-scope für Parallelität
def isolated_kafka():
with KafkaContainer() as kafka:
# Jeder Test bekommt eigenen Container
yield kafka
@pytest.mark.parametrize("test_scenario", [
"high_volume_orders",
"payment_failures",
"inventory_shortages"
])
def test_order_scenarios_parallel(isolated_kafka, test_scenario):
# Tests laufen parallel mit separaten Containern
passKonsistente Testdaten über den gesamten Event-Flow:
Test Data Factory:
@Component
public class EventTestDataFactory {
public OrderPlacedEvent createOrderPlacedEvent(String customerId) {
return OrderPlacedEvent.builder()
.eventId(UUID.randomUUID().toString())
.eventType("OrderPlaced")
.timestamp(Instant.now())
.version("v1")
.data(OrderData.builder()
.orderId(UUID.randomUUID().toString())
.customerId(customerId)
.items(List.of(new OrderItem("product-1", 2)))
.totalAmount(BigDecimal.valueOf(99.99))
.currency("EUR")
.build())
.build();
}
public void publishEvent(String topic, Object event) {
kafkaTemplate.send(topic, event).get(5, TimeUnit.SECONDS);
}
}Consumer-Offsets und State müssen zwischen Tests zurückgesetzt werden:
Consumer-Reset Strategien:
@TestConfiguration
public class TestConsumerConfiguration {
@Bean
@Primary
public ConsumerFactory<String, Object> testConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return new DefaultKafkaConsumerFactory<>(props);
}
}Test-Cleanup zwischen Tests:
@AfterEach
void cleanupAfterTest() {
// Consumer-Gruppen zurücksetzen
consumerGroupManager.resetConsumerGroup("payment-service-test");
consumerGroupManager.resetConsumerGroup("inventory-service-test");
// Pending Messages löschen
kafkaTestUtils.flushAllMessages();
// Application State zurücksetzen
orderRepository.deleteAll();
paymentRepository.deleteAll();
}Komplexe Event-Abfolgen testen:
Multi-Step Event Flow Test:
@Test
void shouldHandleCompleteOrderToShippingFlow() throws Exception {
// Given
String orderId = "order-123";
String customerId = "customer-456";
// Event-Listeners für jeden Schritt
List<Object> receivedEvents = Collections.synchronizedList(new ArrayList<>());
@KafkaListener(topics = {"payment.processed.v1", "inventory.reserved.v1", "order.shipped.v1"})
void captureEvents(Object event) {
receivedEvents.add(event);
}
// When - Trigger initial event
OrderPlacedEvent orderEvent = eventTestDataFactory.createOrderPlacedEvent(customerId);
eventTestDataFactory.publishEvent("order.placed.v1", orderEvent);
// Then - Verify event sequence
await().atMost(10, TimeUnit.SECONDS).until(() -> receivedEvents.size() >= 3);
assertThat(receivedEvents)
.hasSize(3)
.extracting(Object::getClass)
.containsExactly(
PaymentProcessedEvent.class,
InventoryReservedEvent.class,
OrderShippedEvent.class
);
}Load Testing mit Testcontainers:
High-Volume Event Test:
@Test
void shouldHandleHighVolumeOrderProcessing() throws Exception {
// Given
int numberOfOrders = 1000;
CountDownLatch processedLatch = new CountDownLatch(numberOfOrders);
@KafkaListener(topics = "payment.processed.v1")
void countProcessedPayments(PaymentProcessedEvent event) {
processedLatch.countDown();
}
// When - Publish many events
for (int i = 0; i < numberOfOrders; i++) {
OrderPlacedEvent event = eventTestDataFactory.createOrderPlacedEvent("customer-" + i);
kafkaTemplate.send("order.placed.v1", event);
}
// Then - All should be processed within time limit
boolean allProcessed = processedLatch.await(30, TimeUnit.SECONDS);
assertThat(allProcessed).isTrue();
// Verify no message loss
assertThat(paymentRepository.count()).isEqualTo(numberOfOrders);
}Integration Tests benötigen erweiterte Observability:
Test Event Tracing:
@TestConfiguration
public class TestObservabilityConfiguration {
@Bean
public TestEventTracer testEventTracer() {
return new TestEventTracer();
}
@EventListener
public void traceEvent(ApplicationEvent event) {
testEventTracer.recordEvent(event);
}
}
@Component
public class TestEventTracer {
private final List<TracedEvent> eventTrace = Collections.synchronizedList(new ArrayList<>());
public void recordEvent(Object event) {
eventTrace.add(new TracedEvent(
event.getClass().getSimpleName(),
Instant.now(),
Thread.currentThread().getName()
));
}
public List<TracedEvent> getEventTrace() {
return new ArrayList<>(eventTrace);
}
@AfterEach
void printEventTrace() {
eventTrace.forEach(event ->
log.info("Event: {} at {} on thread {}",
event.getType(), event.getTimestamp(), event.getThread()));
}
}Integrationstests mit Testcontainers bringen Event-Driven Architecture näher an die Realität. Sie decken Timing-Probleme, Serialisierung-Issues und Infrastruktur-spezifische Verhaltensweisen auf, die Unit Tests nicht erkennen können. Der Schlüssel liegt in effizientem Container-Management und sauberer Test-Isolation.