Consumer Groups bilden das Herzstück der skalierbaren Event-Verarbeitung in Kafka. Sie ermöglichen es, die Last auf mehrere Consumer-Instanzen zu verteilen und dabei verschiedene Garantien bezüglich der Verarbeitungsreihenfolge und -sicherheit zu bieten.
Eine Consumer Group ist eine logische Gruppierung von Consumer-Instanzen, die sich die Partitionen eines oder mehrerer Topics teilen. Jede Partition wird dabei immer nur von einem Consumer innerhalb der Group verarbeitet, aber ein Consumer kann mehrere Partitionen bearbeiten.
Das Rebalancing erfolgt automatisch, wenn Consumer der Group beitreten oder sie verlassen. Kafka koordiniert diesen Prozess über einen Group Coordinator und stellt sicher, dass alle Partitionen weiterhin verarbeitet werden. Während des Rebalancing pausiert die Verarbeitung kurzzeitig.
// Spring Boot Consumer mit automatischem Rebalancing
@KafkaListener(topics = "order.placed.v1",
groupId = "inventory-service",
concurrency = "3") // 3 Consumer-Threads
public void handleOrderPlaced(OrderPlacedEvent event) {
reserveInventory(event.getOrderId(), event.getItems());
}# Python Consumer Group Setup
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'inventory-service',
'auto.offset.reset': 'earliest',
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 10000
})
consumer.subscribe(['order.placed.v1'])
while True:
msg = consumer.poll(1.0)
if msg and not msg.error():
process_order_placed(json.loads(msg.value()))Skalierungsstrategien hängen von der Partition-Anzahl ab. Bei einem Topic mit 6 Partitionen können maximal 6 Consumer parallel arbeiten. Zusätzliche Consumer bleiben idle und übernehmen erst Arbeit, wenn andere Consumer ausfallen oder die Group verlassen.
Session Management erfolgt über Heartbeats. Consumer senden regelmäßig Heartbeats an den Group Coordinator. Bleibt ein Heartbeat aus, wird der Consumer als ausgefallen betrachtet und seine Partitionen werden neu verteilt.
Die Verarbeitungsreihenfolge ist nur innerhalb einer Partition garantiert. Verschiedene Consumer verarbeiten verschiedene Partitionen parallel, wodurch die globale Reihenfolge verloren geht. Für Anwendungsfälle, die strikte Reihenfolge benötigen, muss die Partitionierung entsprechend gewählt werden.
Die Verarbeitungssemantik definiert, wie oft ein Event verarbeitet wird, wenn Fehler auftreten. Kafka unterstützt verschiedene Semantiken durch unterschiedliche Offset-Commit-Strategien.
At-most-once bedeutet, dass Events maximal einmal verarbeitet werden. Dazu wird der Offset committet, bevor die Verarbeitung beginnt. Schlägt die Verarbeitung fehl, geht das Event verloren.
// At-most-once durch Auto-Commit vor Verarbeitung
@KafkaListener(topics = "order.placed.v1",
containerFactory = "atMostOnceFactory")
public void handleOrderPlaced(OrderPlacedEvent event) {
// Offset bereits committet - Event kann verloren gehen
try {
processOrder(event);
} catch (Exception e) {
log.error("Event lost: " + event.getOrderId(), e);
// Event ist verloren, Offset bereits vorgerückt
}
}At-least-once garantiert, dass jedes Event mindestens einmal verarbeitet wird. Der Offset wird erst nach erfolgreicher Verarbeitung committet. Bei Fehlern wird das Event erneut verarbeitet, wodurch Duplikate entstehen können.
// At-least-once durch manuelles Commit nach Verarbeitung
@KafkaListener(topics = "order.placed.v1",
containerFactory = "manualCommitFactory")
public void handleOrderPlaced(OrderPlacedEvent event,
Acknowledgment ack) {
try {
processOrder(event);
ack.acknowledge(); // Offset erst nach Erfolg committen
} catch (Exception e) {
// Event wird erneut verarbeitet
log.error("Processing failed, will retry: " + event.getOrderId(), e);
}
}# Python At-least-once mit manuellem Commit
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-service',
'enable.auto.commit': False # Manuelles Offset-Management
})
while True:
msg = consumer.poll(1.0)
if msg and not msg.error():
try:
process_payment(json.loads(msg.value()))
consumer.commit(msg) # Erst nach Erfolg committen
except Exception as e:
logging.error(f"Processing failed: {e}")
# Event wird beim nächsten Poll erneut geholtExactly-once ist theoretisch möglich, aber praktisch sehr aufwendig. Kafka bietet “Exactly-Once Semantics” (EOS) für Producer-Consumer-Ketten, aber nicht für einzelne Consumer mit externen Seiteneffekten.
Idempotenz ist oft der pragmatischere Ansatz als Exactly-once. Statt zu verhindern, dass Events mehrfach verarbeitet werden, wird sichergestellt, dass mehrfache Verarbeitung keine schädlichen Auswirkungen hat.
// Idempotente Event-Verarbeitung
@Service
public class PaymentService {
public void processPayment(OrderPlacedEvent event) {
// Prüfung auf bereits verarbeitete Events
if (paymentRepository.existsByOrderId(event.getOrderId())) {
log.info("Payment already processed for order: " + event.getOrderId());
return; // Idempotent - keine weitere Aktion
}
// Tatsächliche Verarbeitung
Payment payment = createPayment(event);
paymentRepository.save(payment);
}
}Offset-Commit-Strategien müssen an die Anwendungsanforderungen angepasst werden. Auto-Commit eignet sich für unkritische Verarbeitung, während manuelles Commit bei geschäftskritischen Events bevorzugt wird.
// Konfiguration verschiedener Commit-Strategien
@Configuration
public class KafkaConfig {
// Auto-Commit für unkritische Events
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
autoCommitFactory() {
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}
// Manuelles Commit für kritische Events
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
manualCommitFactory() {
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}Fehlerbehandlung sollte zwischen temporären und permanenten Fehlern unterscheiden. Temporäre Fehler (Netzwerkprobleme, Datenbankverbindung) rechtfertigen Wiederholung, permanente Fehler (ungültige Daten) sollten in Dead Letter Topics geleitet werden.
@Component
public class OrderEventHandler {
@Retryable(value = {TransientException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2))
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event) throws Exception {
try {
processOrder(event);
} catch (ValidationException e) {
// Permanenter Fehler - an Dead Letter Topic
sendToDeadLetterTopic(event, e);
} catch (DatabaseException e) {
// Temporärer Fehler - Retry durch @Retryable
throw new TransientException("Database temporarily unavailable", e);
}
}
@Recover
public void handleRetryExhausted(TransientException ex, OrderPlacedEvent event) {
sendToDeadLetterTopic(event, ex);
}
}Monitoring und Observability sind entscheidend für produktive Consumer-Groups. Wichtige Metriken sind Consumer Lag (Rückstand bei der Verarbeitung), Processing Rate und Error Rate.
# Python Consumer mit Metrics
import time
from prometheus_client import Counter, Histogram, Gauge
events_processed = Counter('events_processed_total', 'Total processed events')
processing_time = Histogram('event_processing_seconds', 'Event processing time')
consumer_lag = Gauge('consumer_lag', 'Consumer lag per partition', ['partition'])
def process_with_metrics(msg):
start_time = time.time()
try:
process_event(json.loads(msg.value()))
events_processed.inc()
finally:
processing_time.observe(time.time() - start_time)
# Lag berechnen (vereinfacht)
lag = get_high_water_mark(msg.partition()) - msg.offset()
consumer_lag.labels(partition=msg.partition()).set(lag)Backpressure-Handling wird wichtig, wenn Consumer die Producer-Rate nicht halten können. Kafka bietet keine eingebaute Backpressure, aber Consumer können ihre Poll-Rate und Batch-Größe anpassen.
// Adaptive Batch-Größe basierend auf Processing Time
@Component
public class AdaptiveConsumer {
private int currentBatchSize = 10;
private final int MIN_BATCH = 1;
private final int MAX_BATCH = 100;
@KafkaListener(topics = "high.volume.events")
public void handleEvents(List<Event> events) {
long startTime = System.currentTimeMillis();
events.forEach(this::processEvent);
long processingTime = System.currentTimeMillis() - startTime;
adjustBatchSize(processingTime);
}
private void adjustBatchSize(long processingTime) {
if (processingTime > 5000 && currentBatchSize > MIN_BATCH) {
currentBatchSize = Math.max(MIN_BATCH, currentBatchSize - 5);
} else if (processingTime < 1000 && currentBatchSize < MAX_BATCH) {
currentBatchSize = Math.min(MAX_BATCH, currentBatchSize + 5);
}
// Batch-Größe an Container weitergeben
}
}Die Wahl der richtigen Consumer Group-Konfiguration hängt von den spezifischen Anforderungen ab. Für die meisten EDA-Anwendungen ist At-least-once mit idempotenter Verarbeitung die beste Balance zwischen Zuverlässigkeit und Performance.