Ein Event-Consumer ist mehr als nur ein einfacher Message-Handler. Er muss robust, skalierbar und wartbar sein, während er kontinuierlich eingehende Events verarbeitet. Die Architektur einer Consumer-Komponente folgt dabei etablierten Patterns, die sowohl die Verarbeitung einzelner Events als auch das Gesamtverhalten des Systems berücksichtigen.
Der Lebenszyklus eines Consumers durchläuft mehrere definierte Phasen: Initialisierung, Registrierung am Event-Broker, aktive Verarbeitung und kontrolliertes Herunterfahren. Jede Phase erfordert spezifische Behandlung, um sowohl Datenintegrität als auch System-Stabilität zu gewährleisten.
Initialisierungsphase
Während der Initialisierung lädt der Consumer seine Konfiguration, stellt Verbindungen her und registriert sich bei den gewünschten Event-Topics. Diese Phase ist kritisch, da hier entschieden wird, ab welchem Punkt Events verarbeitet werden sollen.
@Component
public class PaymentConsumer {
@Value("${payment.consumer.group}")
private String consumerGroup;
@Value("${payment.consumer.auto-offset-reset}")
private String offsetReset;
@PostConstruct
public void initialize() {
log.info("Initializing PaymentConsumer for group: {}", consumerGroup);
setupHealthCheck();
initializeStateStore();
}
private void initializeStateStore() {
// State-Management für Verarbeitungsstatus
paymentStateStore = new ConcurrentHashMap<>();
}
}import asyncio
from confluent_kafka import Consumer
from typing import Dict, Any
import logging
class PaymentConsumer:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.consumer = None
self.running = False
self.payment_state_store = {}
async def initialize(self):
"""Initialisiert den Consumer und State-Management"""
logging.info(f"Initializing PaymentConsumer for group: {self.config['group.id']}")
self.consumer = Consumer(self.config)
self.consumer.subscribe(['order.placed.v1'])
self.running = True
await self._setup_health_check()
self._initialize_state_store()
def _initialize_state_store(self):
"""Setup für lokales State-Management"""
self.payment_state_store = {}Aktive Verarbeitungsphase
Während der aktiven Phase verarbeitet der Consumer kontinuierlich eingehende Events. Dabei muss er seinen internen Zustand konsistent halten und gleichzeitig auf Systemsignale reagieren können.
@KafkaListener(topics = "order.placed.v1", groupId = "${payment.consumer.group}")
public void handleOrderPlaced(OrderPlacedEvent event,
@Header("kafka_receivedPartition") int partition,
@Header("kafka_offset") long offset) {
String orderId = event.getOrderId();
// State-Management: Verarbeitungsstatus verfolgen
ProcessingState state = new ProcessingState(orderId, partition, offset);
paymentStateStore.put(orderId, state);
try {
processPayment(event);
state.markCompleted();
} catch (PaymentException e) {
state.markFailed(e.getMessage());
handlePaymentError(event, e);
}
}async def process_events(self):
"""Hauptverarbeitungsschleife mit State-Management"""
while self.running:
try:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
logging.error(f"Consumer error: {msg.error()}")
continue
# State-Management für eingehende Events
order_id = msg.value()['orderId']
processing_state = {
'order_id': order_id,
'partition': msg.partition(),
'offset': msg.offset(),
'status': 'processing'
}
self.payment_state_store[order_id] = processing_state
await self._process_payment(msg.value())
processing_state['status'] = 'completed'
except Exception as e:
processing_state['status'] = 'failed'
processing_state['error'] = str(e)
await self._handle_payment_error(msg.value(), e)Graceful Shutdown
Das kontrollierte Herunterfahren stellt sicher, dass laufende Verarbeitungen abgeschlossen werden, bevor der Consumer terminiert.
@PreDestroy
public void shutdown() {
log.info("Gracefully shutting down PaymentConsumer");
// Stoppe Annahme neuer Events
this.running = false;
// Warte auf Abschluss laufender Verarbeitungen
awaitInFlightCompletion();
// Persistiere finalen State
persistStateStore();
}
private void awaitInFlightCompletion() {
int maxWaitSeconds = 30;
int waited = 0;
while (hasInFlightProcessing() && waited < maxWaitSeconds) {
try {
Thread.sleep(1000);
waited++;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}Event-Consumer müssen horizontal skalieren können, um mit steigender Event-Last umzugehen. Die Skalierung erfolgt dabei über mehrere Consumer-Instanzen, die sich die Verarbeitung der Events teilen.
Horizontale Skalierung durch Consumer Groups
Consumer Groups ermöglichen es, die Verarbeitungslast auf mehrere Instanzen zu verteilen. Jede Partition eines Topics wird dabei genau einer Consumer-Instanz zugeordnet.
| Partitionen | Consumer-Instanzen | Verteilung |
|---|---|---|
| 4 | 2 | Jeder Consumer bearbeitet 2 Partitionen |
| 4 | 4 | Jeder Consumer bearbeitet 1 Partition |
| 4 | 6 | 2 Consumer bleiben inaktiv |
@Component
public class PaymentConsumerConfig {
@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-service-group");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
return new DefaultKafkaConsumerFactory<>(props);
}
}Load Balancing und Partition Assignment
Die Kafka-Broker verteilen Partitionen automatisch auf verfügbare Consumer. Bei Änderungen in der Consumer-Group (neue Instanzen oder Ausfälle) erfolgt ein Rebalancing.
# Consumer-Konfiguration für optimales Load Balancing
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-service-group',
'auto.offset.reset': 'earliest',
'max.poll.records': 100,
'fetch.min.bytes': 1024,
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000
}
class ScalablePaymentConsumer:
def __init__(self, instance_id: str):
self.instance_id = instance_id
self.consumer = Consumer(consumer_config)
async def start_processing(self):
"""Startet Verarbeitung mit Load Balancing"""
self.consumer.subscribe(['order.placed.v1'],
on_assign=self._on_partition_assign,
on_revoke=self._on_partition_revoke)
while self.running:
await self._process_batch()
def _on_partition_assign(self, consumer, partitions):
"""Callback bei Partition-Zuweisung"""
logging.info(f"Instance {self.instance_id} assigned partitions: {partitions}")
def _on_partition_revoke(self, consumer, partitions):
"""Callback bei Partition-Entzug"""
logging.info(f"Instance {self.instance_id} revoked partitions: {partitions}")
# Cleanup für betroffene PartitionenDie Konfiguration von Consumer-Komponenten erfordert Balance zwischen Performance, Zuverlässigkeit und Ressourcenverbrauch. Verschiedene Parameter beeinflussen das Verhalten erheblich.
Performance-relevante Konfiguration
# application.yml
spring:
kafka:
consumer:
group-id: payment-service-group
max-poll-records: 100 # Batch-Größe pro Poll
fetch-min-bytes: 1024 # Minimale Fetch-Größe
fetch-max-wait: 500ms # Maximale Wartezeit
heartbeat-interval: 10s # Heartbeat-Intervall
session-timeout: 30s # Session-Timeout
payment:
consumer:
processing:
batch-size: 50 # Interne Batch-Verarbeitung
timeout: 5s # Processing-Timeout pro Event
retry-attempts: 3 # Anzahl Wiederholungen# Consumer-Konfiguration mit Performance-Tuning
CONSUMER_CONFIG = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-service-group',
'max.poll.records': 100,
'fetch.min.bytes': 1024,
'fetch.max.wait.ms': 500,
'heartbeat.interval.ms': 10000,
'session.timeout.ms': 30000,
'enable.auto.commit': False, # Manuelles Offset-Management
'auto.offset.reset': 'earliest'
}
class ConfigurablePaymentConsumer:
def __init__(self):
self.config = CONSUMER_CONFIG.copy()
self.processing_timeout = 5
self.batch_size = 50
self.retry_attempts = 3
def update_config(self, **kwargs):
"""Dynamische Konfigurationsanpassung"""
for key, value in kwargs.items():
if key in self.config:
self.config[key] = value
logging.info(f"Updated config: {key} = {value}")Umgebungsspezifische Konfiguration
Verschiedene Deployment-Umgebungen erfordern angepasste Konfigurationen, um optimal zu funktionieren.
@Configuration
@Profile("production")
public class ProductionConsumerConfig {
@Bean
public ConsumerFactory<String, OrderPlacedEvent> productionConsumerFactory() {
Map<String, Object> props = new HashMap<>();
// Produktions-optimierte Einstellungen
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 4096);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
return new DefaultKafkaConsumerFactory<>(props);
}
}
@Configuration
@Profile("development")
public class DevelopmentConsumerConfig {
@Bean
public ConsumerFactory<String, OrderPlacedEvent> developmentConsumerFactory() {
Map<String, Object> props = new HashMap<>();
// Entwicklungs-freundliche Einstellungen
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
}Die Architektur einer Consumer-Komponente muss diese verschiedenen Aspekte harmonisch kombinieren. Ein gut gestalteter Consumer startet zuverlässig, verarbeitet Events effizient im Team mit anderen Instanzen und lässt sich flexibel konfigurieren. Der Schlüssel liegt darin, die Balance zwischen Einfachheit und den notwendigen Funktionen zu finden, die für robuste Event-Verarbeitung erforderlich sind.