26 Events konsumieren

26.1 Architektur einer Consumer-Komponente

Ein Event-Consumer ist mehr als nur ein einfacher Message-Handler. Er muss robust, skalierbar und wartbar sein, während er kontinuierlich eingehende Events verarbeitet. Die Architektur einer Consumer-Komponente folgt dabei etablierten Patterns, die sowohl die Verarbeitung einzelner Events als auch das Gesamtverhalten des Systems berücksichtigen.

26.1.1 Consumer Lifecycle und State Management

Der Lebenszyklus eines Consumers durchläuft mehrere definierte Phasen: Initialisierung, Registrierung am Event-Broker, aktive Verarbeitung und kontrolliertes Herunterfahren. Jede Phase erfordert spezifische Behandlung, um sowohl Datenintegrität als auch System-Stabilität zu gewährleisten.

Initialisierungsphase

Während der Initialisierung lädt der Consumer seine Konfiguration, stellt Verbindungen her und registriert sich bei den gewünschten Event-Topics. Diese Phase ist kritisch, da hier entschieden wird, ab welchem Punkt Events verarbeitet werden sollen.

@Component
public class PaymentConsumer {
    
    @Value("${payment.consumer.group}")
    private String consumerGroup;
    
    @Value("${payment.consumer.auto-offset-reset}")
    private String offsetReset;
    
    @PostConstruct
    public void initialize() {
        log.info("Initializing PaymentConsumer for group: {}", consumerGroup);
        setupHealthCheck();
        initializeStateStore();
    }
    
    private void initializeStateStore() {
        // State-Management für Verarbeitungsstatus
        paymentStateStore = new ConcurrentHashMap<>();
    }
}
import asyncio
from confluent_kafka import Consumer
from typing import Dict, Any
import logging

class PaymentConsumer:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.consumer = None
        self.running = False
        self.payment_state_store = {}
        
    async def initialize(self):
        """Initialisiert den Consumer und State-Management"""
        logging.info(f"Initializing PaymentConsumer for group: {self.config['group.id']}")
        
        self.consumer = Consumer(self.config)
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True
        
        await self._setup_health_check()
        self._initialize_state_store()
    
    def _initialize_state_store(self):
        """Setup für lokales State-Management"""
        self.payment_state_store = {}

Aktive Verarbeitungsphase

Während der aktiven Phase verarbeitet der Consumer kontinuierlich eingehende Events. Dabei muss er seinen internen Zustand konsistent halten und gleichzeitig auf Systemsignale reagieren können.

@KafkaListener(topics = "order.placed.v1", groupId = "${payment.consumer.group}")
public void handleOrderPlaced(OrderPlacedEvent event, 
                             @Header("kafka_receivedPartition") int partition,
                             @Header("kafka_offset") long offset) {
    
    String orderId = event.getOrderId();
    
    // State-Management: Verarbeitungsstatus verfolgen
    ProcessingState state = new ProcessingState(orderId, partition, offset);
    paymentStateStore.put(orderId, state);
    
    try {
        processPayment(event);
        state.markCompleted();
        
    } catch (PaymentException e) {
        state.markFailed(e.getMessage());
        handlePaymentError(event, e);
    }
}
async def process_events(self):
    """Hauptverarbeitungsschleife mit State-Management"""
    while self.running:
        try:
            msg = self.consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
                
            if msg.error():
                logging.error(f"Consumer error: {msg.error()}")
                continue
            
            # State-Management für eingehende Events
            order_id = msg.value()['orderId']
            processing_state = {
                'order_id': order_id,
                'partition': msg.partition(),
                'offset': msg.offset(),
                'status': 'processing'
            }
            
            self.payment_state_store[order_id] = processing_state
            
            await self._process_payment(msg.value())
            processing_state['status'] = 'completed'
            
        except Exception as e:
            processing_state['status'] = 'failed'
            processing_state['error'] = str(e)
            await self._handle_payment_error(msg.value(), e)

Graceful Shutdown

Das kontrollierte Herunterfahren stellt sicher, dass laufende Verarbeitungen abgeschlossen werden, bevor der Consumer terminiert.

@PreDestroy
public void shutdown() {
    log.info("Gracefully shutting down PaymentConsumer");
    
    // Stoppe Annahme neuer Events
    this.running = false;
    
    // Warte auf Abschluss laufender Verarbeitungen
    awaitInFlightCompletion();
    
    // Persistiere finalen State
    persistStateStore();
}

private void awaitInFlightCompletion() {
    int maxWaitSeconds = 30;
    int waited = 0;
    
    while (hasInFlightProcessing() && waited < maxWaitSeconds) {
        try {
            Thread.sleep(1000);
            waited++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}

26.1.2 Scaling und Load Balancing

Event-Consumer müssen horizontal skalieren können, um mit steigender Event-Last umzugehen. Die Skalierung erfolgt dabei über mehrere Consumer-Instanzen, die sich die Verarbeitung der Events teilen.

Horizontale Skalierung durch Consumer Groups

Consumer Groups ermöglichen es, die Verarbeitungslast auf mehrere Instanzen zu verteilen. Jede Partition eines Topics wird dabei genau einer Consumer-Instanz zugeordnet.

Partitionen Consumer-Instanzen Verteilung
4 2 Jeder Consumer bearbeitet 2 Partitionen
4 4 Jeder Consumer bearbeitet 1 Partition
4 6 2 Consumer bleiben inaktiv
@Component
public class PaymentConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-service-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Load Balancing und Partition Assignment

Die Kafka-Broker verteilen Partitionen automatisch auf verfügbare Consumer. Bei Änderungen in der Consumer-Group (neue Instanzen oder Ausfälle) erfolgt ein Rebalancing.

# Consumer-Konfiguration für optimales Load Balancing
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'payment-service-group',
    'auto.offset.reset': 'earliest',
    'max.poll.records': 100,
    'fetch.min.bytes': 1024,
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 10000
}

class ScalablePaymentConsumer:
    def __init__(self, instance_id: str):
        self.instance_id = instance_id
        self.consumer = Consumer(consumer_config)
        
    async def start_processing(self):
        """Startet Verarbeitung mit Load Balancing"""
        self.consumer.subscribe(['order.placed.v1'], 
                               on_assign=self._on_partition_assign,
                               on_revoke=self._on_partition_revoke)
        
        while self.running:
            await self._process_batch()
    
    def _on_partition_assign(self, consumer, partitions):
        """Callback bei Partition-Zuweisung"""
        logging.info(f"Instance {self.instance_id} assigned partitions: {partitions}")
        
    def _on_partition_revoke(self, consumer, partitions):
        """Callback bei Partition-Entzug"""
        logging.info(f"Instance {self.instance_id} revoked partitions: {partitions}")
        # Cleanup für betroffene Partitionen

26.1.3 Configuration Management

Die Konfiguration von Consumer-Komponenten erfordert Balance zwischen Performance, Zuverlässigkeit und Ressourcenverbrauch. Verschiedene Parameter beeinflussen das Verhalten erheblich.

Performance-relevante Konfiguration

# application.yml
spring:
  kafka:
    consumer:
      group-id: payment-service-group
      max-poll-records: 100          # Batch-Größe pro Poll
      fetch-min-bytes: 1024          # Minimale Fetch-Größe
      fetch-max-wait: 500ms          # Maximale Wartezeit
      heartbeat-interval: 10s        # Heartbeat-Intervall
      session-timeout: 30s           # Session-Timeout
      
payment:
  consumer:
    processing:
      batch-size: 50                 # Interne Batch-Verarbeitung
      timeout: 5s                    # Processing-Timeout pro Event
      retry-attempts: 3              # Anzahl Wiederholungen
# Consumer-Konfiguration mit Performance-Tuning
CONSUMER_CONFIG = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'payment-service-group',
    'max.poll.records': 100,
    'fetch.min.bytes': 1024,
    'fetch.max.wait.ms': 500,
    'heartbeat.interval.ms': 10000,
    'session.timeout.ms': 30000,
    'enable.auto.commit': False,      # Manuelles Offset-Management
    'auto.offset.reset': 'earliest'
}

class ConfigurablePaymentConsumer:
    def __init__(self):
        self.config = CONSUMER_CONFIG.copy()
        self.processing_timeout = 5
        self.batch_size = 50
        self.retry_attempts = 3
        
    def update_config(self, **kwargs):
        """Dynamische Konfigurationsanpassung"""
        for key, value in kwargs.items():
            if key in self.config:
                self.config[key] = value
                logging.info(f"Updated config: {key} = {value}")

Umgebungsspezifische Konfiguration

Verschiedene Deployment-Umgebungen erfordern angepasste Konfigurationen, um optimal zu funktionieren.

@Configuration
@Profile("production")
public class ProductionConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> productionConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Produktions-optimierte Einstellungen
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

@Configuration
@Profile("development")
public class DevelopmentConsumerConfig {
    
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> developmentConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Entwicklungs-freundliche Einstellungen
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Die Architektur einer Consumer-Komponente muss diese verschiedenen Aspekte harmonisch kombinieren. Ein gut gestalteter Consumer startet zuverlässig, verarbeitet Events effizient im Team mit anderen Instanzen und lässt sich flexibel konfigurieren. Der Schlüssel liegt darin, die Balance zwischen Einfachheit und den notwendigen Funktionen zu finden, die für robuste Event-Verarbeitung erforderlich sind.