63 Was man über Kafka im Betrieb wissen muss – nur das Nötigste

Wenn Sie Event-Driven Architecture mit Kafka umsetzen, müssen Sie nicht zum Kafka-Administrator werden. Dennoch gibt es operationale Grundlagen, die jeder Entwickler verstehen sollte, um robuste Anwendungen zu bauen und Produktionsprobleme zu verstehen.

63.1 Operational Basics

63.1.1 Kafka-Cluster und Broker-Konzept

Kafka läuft als verteiltes System aus mehreren Brokern. Ein Broker ist eine Kafka-Instanz, mehrere Broker bilden ein Cluster. Für Entwickler relevant:

Redundanz durch Replikation:

# docker-compose.yml - Minimales 3-Broker Setup
services:
  kafka-1:
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      
  kafka-2:
    image: confluentinc/cp-kafka:7.4.0
    environment:
      KAFKA_BROKER_ID: 2
      # ... ähnliche Konfiguration

Partition und Replikation: Jeder Topic wird in Partitionen aufgeteilt. Jede Partition kann auf mehreren Brokern repliziert werden. Als Entwickler definieren Sie die Replikation beim Topic-Setup:

Aspekt Entwickler-Relevanz Operational Impact
Partitionen Parallelität der Consumer Skalierung und Durchsatz
Replikation Datensicherheit Ausfallsicherheit
Leader/Follower Keine direkte Auswirkung Automatic Failover

63.1.2 Topic-Management

Topics sollten explizit erstellt werden, nicht automatisch zur Laufzeit:

Spring Boot Topic-Konfiguration:

@Configuration
public class TopicConfiguration {
    
    @Bean
    public NewTopic orderPlacedTopic() {
        return TopicBuilder.name("order.placed.v1")
                .partitions(3)
                .replicas(3)
                .build();
    }
}

Python Topic-Erstellung:

from kafka.admin import KafkaAdminClient, NewTopic

def create_topics():
    admin = KafkaAdminClient(
        bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092']
    )
    
    topics = [
        NewTopic(
            name='order.placed.v1',
            num_partitions=3,
            replication_factor=3
        )
    ]
    
    admin.create_topics(topics)

63.1.3 Offset-Management im Betrieb

Offsets zeigen, welche Messages ein Consumer bereits verarbeitet hat. Drei kritische Konzepte:

Auto-Commit vs. Manual-Commit:

// Spring Boot - Automatisches Commit
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(@Payload OrderPlaced order) {
    paymentService.processPayment(order);
    // Offset wird automatisch committed
}

// Manuelles Commit für kritische Verarbeitung
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlacedManual(
    @Payload OrderPlaced order,
    Acknowledgment acknowledgment) {
    
    try {
        paymentService.processPayment(order);
        acknowledgment.acknowledge(); // Explizites Commit
    } catch (Exception e) {
        // Kein Commit - Message wird wiederholt
        log.error("Payment processing failed", e);
    }
}

63.2 Configuration Essentials

63.2.1 Producer-Konfiguration für den Betrieb

Die wichtigsten Parameter, die Entwickler verstehen müssen:

Parameter Zweck Empfehlung
acks Bestätigung der Replikation all für Datensicherheit
retries Wiederholung bei Fehlern Hoch (Int.MAX_VALUE)
batch.size Batching für Performance 16384 (Standard)
linger.ms Wartezeit für Batching 5-100ms je nach Latenz-Anforderung

Spring Boot Producer-Konfiguration:

# application.yml
spring:
  kafka:
    producer:
      acks: all
      retries: 2147483647
      properties:
        linger.ms: 50
        batch.size: 16384
        enable.idempotence: true

Python Producer-Konfiguration:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
    acks='all',
    retries=2147483647,
    linger_ms=50,
    batch_size=16384,
    enable_idempotence=True
)

63.2.2 Consumer-Konfiguration für den Betrieb

Entscheidende Parameter für stabile Consumer:

Parameter Zweck Operational Impact
enable.auto.commit Automatisches Offset-Commit false für kritische Anwendungen
auto.offset.reset Verhalten bei fehlendem Offset earliest für Vollständigkeit
max.poll.records Messages pro Poll-Zyklus Speicher vs. Durchsatz
session.timeout.ms Consumer-Group Timeout Rebalancing-Verhalten

Spring Boot Consumer-Konfiguration:

spring:
  kafka:
    consumer:
      group-id: payment-service
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        max.poll.records: 100
        session.timeout.ms: 30000

63.2.3 Idempotenz und Exactly-Once-Semantik

Für kritische Anwendungsfälle wie Zahlungsverarbeitung:

@Component
public class IdempotentPaymentProcessor {
    
    private final Set<String> processedOrders = ConcurrentHashMap.newKeySet();
    
    @KafkaListener(topics = "order.placed.v1")
    @Transactional
    public void handleOrderPlaced(@Payload OrderPlaced order) {
        String orderId = order.getOrderId();
        
        // Idempotenz-Check
        if (processedOrders.contains(orderId)) {
            log.info("Order {} already processed, skipping", orderId);
            return;
        }
        
        paymentService.processPayment(order);
        processedOrders.add(orderId);
    }
}

63.3 Common Operational Issues

63.3.1 Consumer Lag - Das häufigste Problem

Consumer Lag entsteht, wenn Consumer nicht schnell genug verarbeiten. Typische Szenarien:

Monitoring von Consumer Lag:

@Component
public class ConsumerLagMonitor {
    
    private final MeterRegistry meterRegistry;
    
    @EventListener
    public void onConsumerLag(ConsumerLagEvent event) {
        Gauge.builder("kafka.consumer.lag")
                .tags("topic", event.getTopic(), 
                      "partition", String.valueOf(event.getPartition()))
                .register(meterRegistry)
                .set(event.getLag());
    }
}

Lag-Behandlung in Python:

import time
from kafka import KafkaConsumer

def monitor_and_process():
    consumer = KafkaConsumer(
        'order.placed.v1',
        group_id='payment-service',
        auto_offset_reset='earliest'
    )
    
    for message in consumer:
        start_time = time.time()
        
        try:
            process_order(message.value)
            processing_time = time.time() - start_time
            
            # Lag-Warnung bei langsamer Verarbeitung
            if processing_time > 5.0:
                log.warning(f"Slow processing: {processing_time}s")
                
        except Exception as e:
            log.error(f"Processing failed: {e}")
            # Message wird nicht committed - automatische Wiederholung

63.3.2 Rebalancing und Consumer Groups

Rebalancing tritt auf, wenn Consumer einer Gruppe hinzugefügt oder entfernt werden:

Rebalancing-Trigger Auswirkung Mitigation
Consumer-Neustart Kurze Verarbeitungspause Graceful Shutdown
Consumer-Absturz Längere Pause Health Checks
Netzwerkprobleme Session Timeout Robuste Netzwerkkonfiguration

Graceful Shutdown in Spring Boot:

@PreDestroy
public void shutdown() {
    log.info("Shutting down payment processor gracefully");
    
    // Aktuelle Verarbeitung abschließen
    if (currentProcessing != null) {
        currentProcessing.complete();
    }
    
    // Consumer explizit schließen
    if (kafkaConsumer != null) {
        kafkaConsumer.close(Duration.ofSeconds(30));
    }
}

63.3.3 Schema Evolution Probleme

Wenn Sie Event-Strukturen ändern, können Kompatibilitätsprobleme auftreten:

Backward-Compatible Schema Evolution:

// Version 1
public class OrderPlaced {
    private String orderId;
    private BigDecimal totalAmount;
    // ...
}

// Version 2 - Backward Compatible
public class OrderPlaced {
    private String orderId;
    private BigDecimal totalAmount;
    private String currency = "EUR"; // Default für alte Events
    private List<String> tags = new ArrayList<>(); // Optional neue Felder
    // ...
}

63.3.4 Dead Letter Topics und Error Handling

Für Messages, die nicht verarbeitet werden können:

@Component
public class OrderProcessor {
    
    @Retryable(value = {TransientException.class}, maxAttempts = 3)
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrder(@Payload OrderPlaced order) {
        try {
            processOrder(order);
        } catch (TransientException e) {
            // Wird automatisch wiederholt
            throw e;
        } catch (Exception e) {
            // Permanent error - an Dead Letter Topic
            sendToDeadLetterTopic(order, e);
        }
    }
    
    @Recover
    public void handleFailedOrder(TransientException ex, OrderPlaced order) {
        // Nach allen Wiederholungen
        sendToDeadLetterTopic(order, ex);
    }
}

Diese operationalen Grundlagen helfen Ihnen, Event-Driven Architecture mit Kafka erfolgreich umzusetzen, ohne sich in administrativen Details zu verlieren. Der Fokus liegt darauf, was für die tägliche Entwicklungsarbeit relevant ist und häufige Produktionsprobleme zu vermeiden.