Das Offset-Management ist entscheidend für die Zuverlässigkeit von Event-Consumer-Systemen. Offsets markieren die Position eines Consumers in einem Topic und bestimmen, welche Events als “verarbeitet” gelten. Die Wahl zwischen automatischem und manuellem Offset-Handling beeinflusst maßgeblich die Verarbeitungsgarantien und das Verhalten bei Fehlern.
Ein Offset ist ein numerischer Wert, der die Position einer Nachricht innerhalb einer Topic-Partition eindeutig identifiziert. Wenn ein Consumer ein Event verarbeitet hat, “committed” er den Offset – das bedeutet, er teilt Kafka mit, dass alle Events bis zu dieser Position erfolgreich verarbeitet wurden.
// Konzeptionelle Darstellung
// Topic: order.placed.v1, Partition: 0
// Events: [0] [1] [2] [3] [4] [5] [6] ...
// ^
// Last Committed Offset: 2
// (Events 0, 1, 2 sind als verarbeitet markiert)Bei einem Neustart beginnt der Consumer mit dem Event nach dem letzten committed Offset – in diesem Fall bei Event 3.
Bei automatischem Offset-Handling committed Kafka die Offsets in regelmäßigen Abständen, unabhängig davon, ob die Events tatsächlich erfolgreich verarbeitet wurden.
Spring Boot Konfiguration:
@Component
public class AutoCommitPaymentService {
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event) {
log.info("Processing order: {}", event.getOrderId());
try {
processPayment(event);
// Offset wird automatisch committed, auch wenn Verarbeitung fehlschlägt
} catch (Exception e) {
log.error("Payment processing failed for order {}", event.getOrderId(), e);
// Event ist trotz Fehler als "verarbeitet" markiert!
}
}
}# application.yml für Auto-Commit
spring:
kafka:
consumer:
enable-auto-commit: true
auto-commit-interval: 5000 # Commit alle 5 SekundenPython Implementierung:
from confluent_kafka import Consumer
import json
class AutoCommitPaymentService:
def __init__(self):
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True, # Automatisches Commit
'auto.commit.interval.ms': 5000
}
self.consumer = Consumer(config)
self.consumer.subscribe(['order.placed.v1'])
def process_events(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
try:
event = json.loads(msg.value().decode('utf-8'))
self.process_payment(event)
# Offset wird automatisch committed
except Exception as e:
print(f"Error processing event: {e}")
# Event gilt trotzdem als verarbeitet!Beim manuellen Offset-Handling bestimmt der Consumer explizit, wann ein Event als erfolgreich verarbeitet gilt.
Spring Boot Implementierung:
@Component
public class ManualCommitPaymentService {
@KafkaListener(
topics = "order.placed.v1",
containerFactory = "manualAckListenerContainerFactory"
)
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
log.info("Processing order: {}", event.getOrderId());
try {
PaymentResult result = processPayment(event);
if (result.isSuccessful()) {
publishPaymentProcessedEvent(event.getOrderId(), result);
// Erst nach erfolgreicher Verarbeitung committen
acknowledgment.acknowledge();
log.info("Successfully processed and committed order: {}", event.getOrderId());
} else {
log.error("Payment failed for order: {}", event.getOrderId());
// Kein Commit - Event wird beim nächsten Poll erneut verarbeitet
}
} catch (Exception e) {
log.error("Error processing order: {}", event.getOrderId(), e);
// Bei Exception kein Commit - Retry beim nächsten Start
}
}
}
@Configuration
public class ManualAckConfig {
@Bean("manualAckListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
manualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
}Python Implementierung:
class ManualCommitPaymentService:
def __init__(self):
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Manuelles Commit
}
self.consumer = Consumer(config)
self.consumer.subscribe(['order.placed.v1'])
def process_events(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
try:
event = json.loads(msg.value().decode('utf-8'))
success = self.process_payment(event)
if success:
# Nur bei Erfolg committen
self.consumer.commit(msg)
print(f"Successfully processed and committed order: {event['orderId']}")
else:
print(f"Payment failed for order: {event['orderId']} - no commit")
except Exception as e:
print(f"Error processing event: {e} - no commit")
# Event wird beim nächsten Poll erneut verarbeitetFür bessere Performance können mehrere Events als Batch committed werden.
Spring Boot Batch-Commit:
@Component
public class BatchCommitPaymentService {
private static final int BATCH_SIZE = 10;
private final List<OrderPlacedEvent> eventBatch = new ArrayList<>();
private Acknowledgment pendingAcknowledment;
@KafkaListener(
topics = "order.placed.v1",
containerFactory = "batchManualAckListenerContainerFactory"
)
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
synchronized (this) {
eventBatch.add(event);
pendingAcknowledment = acknowledgment;
if (eventBatch.size() >= BATCH_SIZE) {
processBatch();
}
}
}
@Scheduled(fixedDelay = 5000)
public void processIncompleteBatch() {
synchronized (this) {
if (!eventBatch.isEmpty()) {
processBatch();
}
}
}
private void processBatch() {
try {
List<PaymentResult> results = paymentService.processBatchPayments(eventBatch);
boolean allSuccessful = results.stream().allMatch(PaymentResult::isSuccessful);
if (allSuccessful) {
// Alle Events erfolgreich - Batch committen
if (pendingAcknowledment != null) {
pendingAcknowledment.acknowledge();
}
log.info("Successfully processed and committed batch of {} orders",
eventBatch.size());
} else {
log.error("Some payments failed in batch - no commit");
// Bei teilweisem Fehler kein Commit - ganze Batch wird wiederholt
}
} catch (Exception e) {
log.error("Batch processing failed", e);
} finally {
eventBatch.clear();
pendingAcknowledment = null;
}
}
}@Component
public class RestartSafePaymentService {
private final PaymentRepository paymentRepository;
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
String orderId = event.getOrderId();
// Idempotenz-Check: Wurde diese Order bereits verarbeitet?
if (paymentRepository.existsByOrderId(orderId)) {
log.info("Order {} already processed, skipping", orderId);
acknowledgment.acknowledge();
return;
}
try {
PaymentResult result = processPayment(event);
// Payment-Record in DB speichern für Idempotenz
Payment payment = Payment.builder()
.orderId(orderId)
.amount(event.getTotalAmount())
.status(result.isSuccessful() ? PaymentStatus.COMPLETED : PaymentStatus.FAILED)
.build();
paymentRepository.save(payment);
if (result.isSuccessful()) {
publishPaymentProcessedEvent(orderId, result);
}
// Erst nach kompletter Verarbeitung committen
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Payment processing failed for order {}", orderId, e);
// Kein Commit - Event wird beim Restart erneut verarbeitet
}
}
}class SelectiveReprocessingService:
def __init__(self):
config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'payment-service',
'enable.auto.commit': False
}
self.consumer = Consumer(config)
self.consumer.subscribe(['order.placed.v1'])
self.processed_orders = set() # In-Memory Cache für Demo
def process_events(self):
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
try:
event = json.loads(msg.value().decode('utf-8'))
order_id = event['orderId']
# Skip bereits verarbeitete Orders
if order_id in self.processed_orders:
print(f"Order {order_id} already processed, skipping")
self.consumer.commit(msg)
continue
success = self.process_payment(event)
if success:
self.processed_orders.add(order_id)
self.consumer.commit(msg)
print(f"Successfully processed order: {order_id}")
else:
print(f"Payment failed for order: {order_id} - will retry")
# Kein Commit - Retry beim nächsten Poll
except Exception as e:
print(f"Error processing event: {e}")@Component
public class OffsetManagementService {
private final KafkaAdmin kafkaAdmin;
// Programmatisches Zurücksetzen auf früheren Zeitpunkt
public void reprocessFromTimestamp(String groupId, long timestampMillis) {
AdminClient adminClient = AdminClient.create(kafkaAdminConfig());
try {
// Consumer Group stoppen
stopConsumerGroup(groupId);
// Offsets auf Timestamp zurücksetzen
Map<TopicPartition, OffsetAndTimestamp> timestampOffsets =
getOffsetsForTimestamp(timestampMillis);
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
timestampOffsets.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new OffsetAndMetadata(entry.getValue().offset())
));
// Neue Offsets committen
adminClient.alterConsumerGroupOffsets(groupId, offsetsToCommit);
log.info("Reset offsets for group {} to timestamp {}", groupId, timestampMillis);
} finally {
adminClient.close();
// Consumer Group wieder starten
startConsumerGroup(groupId);
}
}
// Reprocessing einer bestimmten Order
@EventListener
public void handleReprocessRequest(ReprocessOrderEvent event) {
String orderId = event.getOrderId();
// Suche nach dem spezifischen Event
TopicPartition partition = findPartitionForOrder(orderId);
long offset = findOffsetForOrder(orderId, partition);
if (offset >= 0) {
// Temporären Consumer für Reprocessing erstellen
reprocessSingleEvent(partition, offset);
}
}
}Die Standard-Semantik bei manuellem Offset-Handling garantiert, dass jedes Event mindestens einmal verarbeitet wird.
@Component
public class AtLeastOncePaymentService {
@KafkaListener(topics = "order.placed.v1")
@Transactional
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
try {
// 1. Geschäftslogik ausführen
PaymentResult result = processPayment(event);
// 2. Lokalen Zustand in DB persistieren
savePaymentResult(event.getOrderId(), result);
// 3. Follow-up Events publizieren
if (result.isSuccessful()) {
publishPaymentProcessedEvent(event.getOrderId(), result);
}
// 4. Erst nach kompletter Verarbeitung committen
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Processing failed for order {}", event.getOrderId(), e);
// Bei Fehler kein Commit - Event wird erneut verarbeitet
// Dadurch ist At-least-once garantiert
}
}
}Für Exactly-once-Semantik muss Idempotenz sichergestellt werden.
@Component
public class ExactlyOncePaymentService {
private final PaymentRepository paymentRepository;
private final IdempotencyService idempotencyService;
@KafkaListener(topics = "order.placed.v1")
@Transactional
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
String orderId = event.getOrderId();
String idempotencyKey = generateIdempotencyKey(event);
// Idempotenz-Check
if (idempotencyService.isAlreadyProcessed(idempotencyKey)) {
log.info("Event already processed with key {}, skipping", idempotencyKey);
acknowledgment.acknowledge();
return;
}
try {
// Idempotency-Record erstellen (verhindert Doppelverarbeitung)
idempotencyService.markAsProcessing(idempotencyKey);
PaymentResult result = processPayment(event);
// Atomare Operation: Payment + Idempotency Record
Payment payment = createPaymentRecord(event, result);
paymentRepository.save(payment);
idempotencyService.markAsCompleted(idempotencyKey, payment.getId());
if (result.isSuccessful()) {
publishPaymentProcessedEvent(orderId, result);
}
acknowledgment.acknowledge();
} catch (Exception e) {
idempotencyService.markAsFailed(idempotencyKey, e.getMessage());
throw e;
}
}
private String generateIdempotencyKey(OrderPlacedEvent event) {
// Eindeutiger Key basierend auf Event-Inhalt
return "payment:" + event.getOrderId() + ":" +
event.getEventId() + ":" + event.getTimestamp();
}
}@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
@Id
private String id;
private String aggregateId;
private String eventType;
private String eventPayload;
private LocalDateTime createdAt;
private boolean processed;
}
@Component
@Transactional
public class TransactionalOutboxPaymentService {
private final PaymentRepository paymentRepository;
private final OutboxEventRepository outboxRepository;
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
try {
String orderId = event.getOrderId();
// Alles in einer Datenbank-Transaktion
PaymentResult result = processPayment(event);
// 1. Payment-Record speichern
Payment payment = createPaymentRecord(event, result);
paymentRepository.save(payment);
// 2. Outbound-Event in Outbox speichern (nicht direkt senden!)
if (result.isSuccessful()) {
OutboxEvent outboxEvent = OutboxEvent.builder()
.id(UUID.randomUUID().toString())
.aggregateId(orderId)
.eventType("PaymentProcessed")
.eventPayload(createPaymentProcessedEventJson(orderId, result))
.createdAt(LocalDateTime.now())
.processed(false)
.build();
outboxRepository.save(outboxEvent);
}
// 3. Kafka-Offset committen
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Transaction failed for order {}", event.getOrderId(), e);
throw e; // Rollback der gesamten Transaktion
}
}
}
// Separater Service für Outbox-Verarbeitung
@Component
public class OutboxProcessor {
@Scheduled(fixedDelay = 1000)
@Transactional
public void processOutboxEvents() {
List<OutboxEvent> unprocessedEvents =
outboxRepository.findByProcessedFalseOrderByCreatedAt(PageRequest.of(0, 10));
for (OutboxEvent event : unprocessedEvents) {
try {
kafkaTemplate.send("payment.processed.v1", event.getEventPayload());
event.setProcessed(true);
outboxRepository.save(event);
} catch (Exception e) {
log.error("Failed to send outbox event {}", event.getId(), e);
}
}
}
}Die Wahl der Offset-Strategie hängt von den spezifischen Anforderungen ab: Automatisches Offset-Handling bietet bessere Performance bei relaxteren Konsistenzanforderungen, während manuelles Handling präzise Kontrolle über Verarbeitungsgarantien ermöglicht. Für kritische Geschäftsprozesse wie Zahlungsverarbeitung ist manuelles Offset-Handling meist die bessere Wahl.
Im siehe Kapitel zu “Fehlertoleranz, Dead Letter Topics, Wiederholungen” werden wir robuste Strategien für den Umgang mit nicht verarbeitbaren Events behandeln, während das siehe Kapitel zu “Verarbeitung und Zustandsmanagement” die Auswirkungen verschiedener Offset-Strategien auf die Zustandskonsistenz vertieft.