59 Integration Tests with Testcontainers (Kafka)

Integration tests in an event-driven architecture require real event infrastructure. Mocks are no longer sufficient here: the interplay between producer, broker, and consumer has to be tested against real Kafka infrastructure. Testcontainers solves this problem elegantly: Docker containers are started programmatically, used for the tests, and removed again automatically.

The goal: realistic tests without infrastructure setup overhead and without interference between test runs.

59.1 Embedded Test Infrastructure

59.1.1 Testcontainers Setup for Kafka

Testcontainers starts Kafka containers on demand and provides the connection parameters:

Spring Boot Testcontainers Setup:

@Testcontainers
@SpringBootTest
class OrderServiceIntegrationTest {
    
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
            .withEmbeddedZookeeper();
    
    @DynamicPropertySource
    static void configureProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.kafka.producer.key-serializer", () -> "org.apache.kafka.common.serialization.StringSerializer");
        registry.add("spring.kafka.producer.value-serializer", () -> "org.springframework.kafka.support.serializer.JsonSerializer");
    }
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
}

Python Testcontainers Setup:

import pytest
from testcontainers.kafka import KafkaContainer
from kafka import KafkaProducer, KafkaConsumer
import json

@pytest.fixture(scope="session")
def kafka_container():
    with KafkaContainer() as kafka:
        yield kafka

@pytest.fixture
def kafka_config(kafka_container):
    return {
        'bootstrap_servers': kafka_container.get_bootstrap_server(),
        'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
        'value_deserializer': lambda m: json.loads(m.decode('utf-8'))
    }

class TestOrderServiceIntegration:
    def test_order_processing_flow(self, kafka_config):
        # Test implementation
        pass
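
The shared `kafka_config` fixture above mixes producer and consumer keys, and kafka-python's clients reject options they do not recognize, so the dict cannot be passed to both `KafkaProducer` and `KafkaConsumer` unchanged. A minimal sketch of a splitting helper (the name `split_kafka_config` is our own, not part of any library):

```python
def split_kafka_config(config):
    """Split a combined test config into producer- and consumer-specific kwargs.

    kafka-python's KafkaProducer and KafkaConsumer each reject options they
    do not know, so the shared fixture dict must be filtered before use.
    """
    producer_only = {"value_serializer", "key_serializer"}
    consumer_only = {"value_deserializer", "key_deserializer"}
    producer_cfg = {k: v for k, v in config.items() if k not in consumer_only}
    consumer_cfg = {k: v for k, v in config.items() if k not in producer_only}
    return producer_cfg, consumer_cfg
```

Producer creation then becomes `KafkaProducer(**producer_cfg)` and consumer creation `KafkaConsumer(topic, **consumer_cfg)`.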

59.1.2 Container Lifecycle Management

Testcontainers manages the container lifecycle automatically:

Container scope strategies:

Scope     Lifetime          Use case                     Performance
Method    Per test          Full isolation               Slow
Class     Per test class    Isolation/speed trade-off    Medium
Session   Per test suite    Maximum performance          Fast
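
The performance column can be made concrete by counting how many broker startups a suite pays under each scope. A toy sketch (the function name is illustrative, not from any framework):

```python
def container_starts(scope, num_classes, tests_per_class):
    """Number of Kafka container startups a suite pays for a given fixture scope."""
    if scope == "method":   # fresh container for every single test
        return num_classes * tests_per_class
    if scope == "class":    # one container per test class
        return num_classes
    if scope == "session":  # one container for the whole suite
        return 1
    raise ValueError(f"unknown scope: {scope}")
```

For 10 classes with 8 tests each, method scope pays 80 broker startups, class scope 10, and session scope just 1, which is why session scope dominates once tests are properly isolated at the topic level.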

Optimized container reuse:

@Testcontainers
class OrderProcessingIntegrationTest {
    
    // Session-scoped container for better performance
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
            .withEmbeddedZookeeper()
            .withReuse(true); // Reuse across test runs; also requires
                              // testcontainers.reuse.enable=true in ~/.testcontainers.properties
    
    static AdminClient adminClient;
    
    @BeforeAll
    static void createAdminClient() {
        adminClient = AdminClient.create(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()));
    }
    
    @BeforeEach
    void cleanupTopics() throws Exception {
        // Reset topics between tests
        adminClient.deleteTopics(List.of("order.placed.v1", "payment.processed.v1")).all().get();
        adminClient.createTopics(List.of(
            new NewTopic("order.placed.v1", 1, (short) 1),
            new NewTopic("payment.processed.v1", 1, (short) 1)
        )).all().get();
    }
}

59.1.3 Multi-Service Integration Tests

Realistic scenarios test several services at once:

End-to-End Order Processing Test:

@Test
void shouldProcessCompleteOrderFlow() throws Exception {
    // Given
    CreateOrderRequest orderRequest = CreateOrderRequest.builder()
        .customerId("customer-123")
        .items(List.of(new OrderItem("product-1", 2)))
        .build();
    
    // Consumer for PaymentProcessed events: the latch is counted down by a
    // listener bean (a @KafkaListener method cannot live inside a test method)
    CountDownLatch paymentLatch = paymentListener.getLatch();
    
    // When
    orderService.placeOrder(orderRequest);
    
    // Then
    boolean paymentProcessed = paymentLatch.await(5, TimeUnit.SECONDS);
    assertThat(paymentProcessed).isTrue();
    
    // Verify complete state
    Order order = orderRepository.findByCustomerId("customer-123").get(0);
    assertThat(order.getStatus()).isEqualTo(OrderStatus.PAYMENT_PENDING);
}

// Listener bean registered in the test context:
@TestComponent
class PaymentProcessedListener {
    private final CountDownLatch latch = new CountDownLatch(1);
    
    @KafkaListener(topics = "payment.processed.v1")
    void handlePaymentProcessed(PaymentProcessedEvent event) {
        latch.countDown();
    }
    
    CountDownLatch getLatch() { return latch; }
}
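
The Java tests wait on latches (and, later in this chapter, on Awaitility's `await()`). On the Python side the same pattern can be written as a small polling helper; this is a sketch of our own, not a library API:

```python
import time

def await_until(condition, timeout=5.0, interval=0.1):
    """Poll `condition` (a zero-argument callable) until it returns True
    or `timeout` seconds elapse; returns the final result of the check."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return condition()  # one last check at the deadline
```

A test would then assert `await_until(lambda: len(received) >= 3, timeout=10)` instead of sleeping a fixed amount, which keeps fast runs fast and slow runs from flaking.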

59.2 Test Environment Management

59.2.1 Environment Isolation Between Tests

Each test needs a clean Kafka environment:

Topic management strategies:

@Component
public class TestKafkaManager {
    
    private final AdminClient adminClient;
    
    public void resetTestEnvironment() throws Exception {
        // Delete all test topics
        Set<String> existingTopics = adminClient.listTopics().names().get();
        List<String> testTopics = existingTopics.stream()
            .filter(topic -> topic.contains("test") || topic.contains("v1"))
            .collect(Collectors.toList());
            
        if (!testTopics.isEmpty()) {
            adminClient.deleteTopics(testTopics).all().get();
        }
    }
    
    public void createTestTopics(List<String> topicNames) throws Exception {
        List<NewTopic> topics = topicNames.stream()
            .map(name -> new NewTopic(name, 1, (short) 1))
            .collect(Collectors.toList());
            
        adminClient.createTopics(topics).all().get();
    }
}
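
The selection logic of `resetTestEnvironment` can be isolated into a pure function that is trivially unit-testable on its own. A sketch in Python; the guard against Kafka-internal topics (names starting with `__`) is an added safety assumption, not in the Java code above:

```python
def find_test_topics(existing_topics, markers=("test", "v1")):
    """Select topics eligible for deletion between tests.

    Kafka-internal topics (e.g. __consumer_offsets) are always excluded.
    """
    return sorted(t for t in existing_topics
                  if not t.startswith("__") and any(m in t for m in markers))
```

Keeping the filter pure means the dangerous part (the actual delete call against the admin client) stays a one-liner that is hard to get wrong.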

59.2.2 Container Configuration for Tests

Test-specific Kafka configuration optimizes speed and reliability:

Optimized test container configuration:

@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
    .withEmbeddedZookeeper()
    .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
    .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
    .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
    .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "1"); // Immediate flushing for tests
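
The same single-node overrides for the Python side. `with_env` is testcontainers-python's generic method for setting container environment variables; the `TEST_KAFKA_ENV` dict and `apply_test_env` helper are our own names for this sketch:

```python
# Single-node broker overrides mirroring the Java configuration above.
TEST_KAFKA_ENV = {
    "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "true",
    "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
    "KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
    "KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
    "KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0",
    "KAFKA_LOG_FLUSH_INTERVAL_MESSAGES": "1",  # immediate flushing for tests
}

def apply_test_env(container):
    """Apply the overrides to a testcontainers container via with_env."""
    for key, value in TEST_KAFKA_ENV.items():
        container = container.with_env(key, value)
    return container
```

Usage: `kafka = apply_test_env(KafkaContainer())` before entering the `with` block.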

59.2.3 Parallel Test Execution

Testcontainers supports parallel test execution through port randomization:

Parallel Test Configuration:

// JUnit 5 parallel execution (requires junit.jupiter.execution.parallel.enabled=true)
@Execution(ExecutionMode.CONCURRENT)
class ParallelIntegrationTest {
    
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withEmbeddedZookeeper()
        .withNetworkAliases("kafka-" + UUID.randomUUID()); // Unique network alias
}

Python Parallel Test Setup:

@pytest.fixture(scope="function")  # Function scope for parallel isolation
def isolated_kafka():
    with KafkaContainer() as kafka:
        # Each test gets its own container
        yield kafka

@pytest.mark.parametrize("test_scenario", [
    "high_volume_orders",
    "payment_failures", 
    "inventory_shortages"
])
def test_order_scenarios_parallel(isolated_kafka, test_scenario):
    # Tests run in parallel with separate containers
    pass

59.3 Test Data Lifecycle

59.3.1 Generating Event Data for Tests

Consistent test data across the entire event flow:

Test Data Factory:

@Component
public class EventTestDataFactory {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public OrderPlacedEvent createOrderPlacedEvent(String customerId) {
        return OrderPlacedEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .eventType("OrderPlaced")
            .timestamp(Instant.now())
            .version("v1")
            .data(OrderData.builder()
                .orderId(UUID.randomUUID().toString())
                .customerId(customerId)
                .items(List.of(new OrderItem("product-1", 2)))
                .totalAmount(BigDecimal.valueOf(99.99))
                .currency("EUR")
                .build())
            .build();
    }
    
    public void publishEvent(String topic, Object event) throws Exception {
        kafkaTemplate.send(topic, event).get(5, TimeUnit.SECONDS);
    }
}
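
A Python counterpart to `EventTestDataFactory` as a plain dict builder. Field names mirror the Java event; serializing the amount as a string to keep JSON round-trips exact is our own assumption, not from the text:

```python
import uuid
from datetime import datetime, timezone

def make_order_placed_event(customer_id, items=None):
    """Build an OrderPlaced test event (sketch mirroring EventTestDataFactory)."""
    return {
        "eventId": str(uuid.uuid4()),
        "eventType": "OrderPlaced",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": "v1",
        "data": {
            "orderId": str(uuid.uuid4()),
            "customerId": customer_id,
            "items": items or [{"productId": "product-1", "quantity": 2}],
            "totalAmount": "99.99",  # string, so json round-trips stay exact
            "currency": "EUR",
        },
    }
```

Publishing is then `producer.send("order.placed.v1", make_order_placed_event("customer-123"))` with the JSON serializer from the fixture above.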

59.3.2 Managing Consumer State Between Tests

Consumer offsets and state must be reset between tests:

Consumer reset strategies:

@TestConfiguration
public class TestConsumerConfiguration {
    
    @Bean
    @Primary
    public ConsumerFactory<String, Object> testConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // shared test container
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID()); // fresh group per context
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
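
The unique-group-id trick translates directly to kafka-python: every call yields settings for a consumer that replays all topics from the earliest offset under a never-before-seen group. A sketch (`fresh_consumer_config` is our own helper name):

```python
import uuid

def fresh_consumer_config(bootstrap_servers):
    """Consumer settings that replay every topic from the start for each test."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "group_id": f"test-group-{uuid.uuid4()}",  # never reused, no stale offsets
        "auto_offset_reset": "earliest",           # read topics from the beginning
        "enable_auto_commit": False,               # offsets are never persisted
    }
```

Because the group id is unique per call, no offset-reset bookkeeping between tests is needed at all; the broker simply sees a brand-new consumer group.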

Cleanup between tests:

@AfterEach
void cleanupAfterTest() {
    // Reset consumer groups
    consumerGroupManager.resetConsumerGroup("payment-service-test");
    consumerGroupManager.resetConsumerGroup("inventory-service-test");
    
    // Discard pending messages
    kafkaTestUtils.flushAllMessages();
    
    // Reset application state
    orderRepository.deleteAll();
    paymentRepository.deleteAll();
}

59.3.3 Event Sequence Testing

Testing complex event sequences:

Multi-Step Event Flow Test:

@Test
void shouldHandleCompleteOrderToShippingFlow() throws Exception {
    // Given
    String customerId = "customer-456";
    
    // Events are collected by a listener bean in the test context; a
    // @KafkaListener method cannot be declared inside the test itself
    List<Object> receivedEvents = eventCapture.getReceivedEvents();
    
    // When - trigger the initial event
    OrderPlacedEvent orderEvent = eventTestDataFactory.createOrderPlacedEvent(customerId);
    eventTestDataFactory.publishEvent("order.placed.v1", orderEvent);
    
    // Then - verify the event sequence
    await().atMost(10, TimeUnit.SECONDS).until(() -> receivedEvents.size() >= 3);
    
    assertThat(receivedEvents)
        .hasSize(3)
        .extracting(Object::getClass)
        .containsExactly(
            PaymentProcessedEvent.class,
            InventoryReservedEvent.class, 
            OrderShippedEvent.class
        );
}

// Listener bean collecting events from all relevant topics:
@TestComponent
class EventCapture {
    private final List<Object> receivedEvents = Collections.synchronizedList(new ArrayList<>());
    
    @KafkaListener(topics = {"payment.processed.v1", "inventory.reserved.v1", "order.shipped.v1"})
    void captureEvents(Object event) {
        receivedEvents.add(event);
    }
    
    List<Object> getReceivedEvents() { return receivedEvents; }
}
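
The final ordering check can be factored into a small helper that compares event type names and reports what actually arrived, which gives readable failure messages. A sketch of our own; dict-shaped events with an `eventType` key (as built by the factory sketch above) are an assumption:

```python
def verify_event_sequence(received, expected_types):
    """Return (ok, actual): ok is True iff the received events' eventType
    fields match expected_types exactly and in order."""
    actual = [e["eventType"] for e in received]
    return actual == list(expected_types), actual
```

In a test: `ok, actual = verify_event_sequence(received, ["PaymentProcessed", "InventoryReserved", "OrderShipped"]); assert ok, actual`.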

59.3.4 Performance Test Data

Load testing with Testcontainers:

High-Volume Event Test:

@Test
void shouldHandleHighVolumeOrderProcessing() throws Exception {
    // Given
    int numberOfOrders = 1000;
    CountDownLatch processedLatch = new CountDownLatch(numberOfOrders);
    // A listener bean in the test context counts this latch down once per
    // PaymentProcessedEvent; a @KafkaListener cannot be declared inline here
    paymentListener.useLatch(processedLatch);
    
    // When - Publish many events
    for (int i = 0; i < numberOfOrders; i++) {
        OrderPlacedEvent event = eventTestDataFactory.createOrderPlacedEvent("customer-" + i);
        kafkaTemplate.send("order.placed.v1", event);
    }
    
    // Then - All should be processed within time limit
    boolean allProcessed = processedLatch.await(30, TimeUnit.SECONDS);
    assertThat(allProcessed).isTrue();
    
    // Verify no message loss
    assertThat(paymentRepository.count()).isEqualTo(numberOfOrders);
}

59.3.5 Test Monitoring and Debugging

Integration tests need enhanced observability:

Test Event Tracing:

@TestConfiguration
public class TestObservabilityConfiguration {
    
    @Bean
    public TestEventTracer testEventTracer() {
        return new TestEventTracer();
    }
    
    @EventListener
    public void traceEvent(ApplicationEvent event) {
        testEventTracer().recordEvent(event); // proxied @Bean call returns the singleton
    }
}

@Component
public class TestEventTracer {
    private static final Logger log = LoggerFactory.getLogger(TestEventTracer.class);
    private final List<TracedEvent> eventTrace = Collections.synchronizedList(new ArrayList<>());
    
    public void recordEvent(Object event) {
        eventTrace.add(new TracedEvent(
            event.getClass().getSimpleName(),
            Instant.now(),
            Thread.currentThread().getName()
        ));
    }
    
    public List<TracedEvent> getEventTrace() {
        return new ArrayList<>(eventTrace);
    }
    
    // Call from the test class's @AfterEach; JUnit lifecycle annotations
    // have no effect on Spring components themselves
    public void printEventTrace() {
        eventTrace.forEach(event -> 
            log.info("Event: {} at {} on thread {}", 
                event.getType(), event.getTimestamp(), event.getThread()));
    }
}
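
For the Python tests, the same tracer idea fits in a few lines. The class below is a thread-safe sketch mirroring `TestEventTracer`:

```python
import threading
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class TracedEvent:
    event_type: str
    timestamp: datetime
    thread: str

class EventTracer:
    """Thread-safe trace of events observed during a test (sketch)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._trace = []

    def record(self, event_type):
        with self._lock:
            self._trace.append(TracedEvent(
                event_type,
                datetime.now(timezone.utc),
                threading.current_thread().name,
            ))

    def trace(self):
        with self._lock:
            return list(self._trace)  # defensive copy
```

Consumers under test call `tracer.record(event["eventType"])`; a failing test then prints `tracer.trace()` to show exactly which events arrived, when, and on which thread.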

Integration tests with Testcontainers bring event-driven architecture closer to reality. They uncover timing problems, serialization issues, and infrastructure-specific behavior that unit tests cannot detect. The key is efficient container management and clean test isolation.