30 Manuelles vs. automatisches Offset-Handling

Das Offset-Management ist entscheidend für die Zuverlässigkeit von Event-Consumer-Systemen. Offsets markieren die Position eines Consumers in einem Topic und bestimmen, welche Events als “verarbeitet” gelten. Die Wahl zwischen automatischem und manuellem Offset-Handling beeinflusst maßgeblich die Verarbeitungsgarantien und das Verhalten bei Fehlern.

30.1 Grundlagen des Offset-Managements

Ein Offset ist ein numerischer Wert, der die Position einer Nachricht innerhalb einer Topic-Partition eindeutig identifiziert. Wenn ein Consumer ein Event verarbeitet hat, “committed” er den Offset – das bedeutet, er teilt Kafka mit, dass alle Events bis zu dieser Position erfolgreich verarbeitet wurden.

// Konzeptionelle Darstellung
// Topic: order.placed.v1, Partition: 0
// Events: [0] [1] [2] [3] [4] [5] [6] ...
//              ^
//         Last Committed Offset: 2
//         (Events 0, 1, 2 sind als verarbeitet markiert)

Bei einem Neustart beginnt der Consumer mit dem Event nach dem letzten committed Offset – in diesem Fall bei Event 3.

30.2 Offset Commit Strategies

30.2.1 Automatisches Offset-Handling

Bei automatischem Offset-Handling committed Kafka die Offsets in regelmäßigen Abständen, unabhängig davon, ob die Events tatsächlich erfolgreich verarbeitet wurden.

Spring Boot Konfiguration:

@Component
public class AutoCommitPaymentService {
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        log.info("Processing order: {}", event.getOrderId());
        
        try {
            processPayment(event);
            // Offset wird automatisch committed, auch wenn Verarbeitung fehlschlägt
        } catch (Exception e) {
            log.error("Payment processing failed for order {}", event.getOrderId(), e);
            // Event ist trotz Fehler als "verarbeitet" markiert!
        }
    }
}
# application.yml für Auto-Commit
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 5000  # Commit alle 5 Sekunden

Python Implementierung:

from confluent_kafka import Consumer
import json

class AutoCommitPaymentService:
    def __init__(self):
        config = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'payment-service',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': True,  # Automatisches Commit
            'auto.commit.interval.ms': 5000
        }
        self.consumer = Consumer(config)
        self.consumer.subscribe(['order.placed.v1'])
    
    def process_events(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
                
            try:
                event = json.loads(msg.value().decode('utf-8'))
                self.process_payment(event)
                # Offset wird automatisch committed
                
            except Exception as e:
                print(f"Error processing event: {e}")
                # Event gilt trotzdem als verarbeitet!

30.2.2 Manuelles Offset-Handling

Beim manuellen Offset-Handling bestimmt der Consumer explizit, wann ein Event als erfolgreich verarbeitet gilt.

Spring Boot Implementierung:

@Component
public class ManualCommitPaymentService {
    
    @KafkaListener(
        topics = "order.placed.v1",
        containerFactory = "manualAckListenerContainerFactory"
    )
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        log.info("Processing order: {}", event.getOrderId());
        
        try {
            PaymentResult result = processPayment(event);
            
            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
                // Erst nach erfolgreicher Verarbeitung committen
                acknowledgment.acknowledge();
                log.info("Successfully processed and committed order: {}", event.getOrderId());
            } else {
                log.error("Payment failed for order: {}", event.getOrderId());
                // Kein Commit - Event wird beim nächsten Poll erneut verarbeitet
            }
            
        } catch (Exception e) {
            log.error("Error processing order: {}", event.getOrderId(), e);
            // Bei Exception kein Commit - Retry beim nächsten Start
        }
    }
}

@Configuration
public class ManualAckConfig {
    
    @Bean("manualAckListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> 
           manualAckListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        
        return factory;
    }
}

Python Implementierung:

class ManualCommitPaymentService:
    def __init__(self):
        config = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'payment-service',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False  # Manuelles Commit
        }
        self.consumer = Consumer(config)
        self.consumer.subscribe(['order.placed.v1'])
    
    def process_events(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
                
            try:
                event = json.loads(msg.value().decode('utf-8'))
                success = self.process_payment(event)
                
                if success:
                    # Nur bei Erfolg committen
                    self.consumer.commit(msg)
                    print(f"Successfully processed and committed order: {event['orderId']}")
                else:
                    print(f"Payment failed for order: {event['orderId']} - no commit")
                    
            except Exception as e:
                print(f"Error processing event: {e} - no commit")
                # Event wird beim nächsten Poll erneut verarbeitet

30.2.3 Batch-Commit-Strategien

Für bessere Performance können mehrere Events als Batch committed werden.

Spring Boot Batch-Commit:

@Component
public class BatchCommitPaymentService {
    private static final int BATCH_SIZE = 10;
    private final List<OrderPlacedEvent> eventBatch = new ArrayList<>();
    private Acknowledgment pendingAcknowledment;
    
    @KafkaListener(
        topics = "order.placed.v1",
        containerFactory = "batchManualAckListenerContainerFactory"
    )
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        synchronized (this) {
            eventBatch.add(event);
            pendingAcknowledment = acknowledgment;
            
            if (eventBatch.size() >= BATCH_SIZE) {
                processBatch();
            }
        }
    }
    
    @Scheduled(fixedDelay = 5000)
    public void processIncompleteBatch() {
        synchronized (this) {
            if (!eventBatch.isEmpty()) {
                processBatch();
            }
        }
    }
    
    private void processBatch() {
        try {
            List<PaymentResult> results = paymentService.processBatchPayments(eventBatch);
            
            boolean allSuccessful = results.stream().allMatch(PaymentResult::isSuccessful);
            
            if (allSuccessful) {
                // Alle Events erfolgreich - Batch committen
                if (pendingAcknowledment != null) {
                    pendingAcknowledment.acknowledge();
                }
                log.info("Successfully processed and committed batch of {} orders", 
                        eventBatch.size());
            } else {
                log.error("Some payments failed in batch - no commit");
                // Bei teilweisem Fehler kein Commit - ganze Batch wird wiederholt
            }
            
        } catch (Exception e) {
            log.error("Batch processing failed", e);
        } finally {
            eventBatch.clear();
            pendingAcknowledment = null;
        }
    }
}

30.3 Reprocessing Scenarios

30.3.1 Consumer-Restart mit uncommitted Events

@Component
public class RestartSafePaymentService {
    private final PaymentRepository paymentRepository;
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        String orderId = event.getOrderId();
        
        // Idempotenz-Check: Wurde diese Order bereits verarbeitet?
        if (paymentRepository.existsByOrderId(orderId)) {
            log.info("Order {} already processed, skipping", orderId);
            acknowledgment.acknowledge();
            return;
        }
        
        try {
            PaymentResult result = processPayment(event);
            
            // Payment-Record in DB speichern für Idempotenz
            Payment payment = Payment.builder()
                .orderId(orderId)
                .amount(event.getTotalAmount())
                .status(result.isSuccessful() ? PaymentStatus.COMPLETED : PaymentStatus.FAILED)
                .build();
            
            paymentRepository.save(payment);
            
            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(orderId, result);
            }
            
            // Erst nach kompletter Verarbeitung committen
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            log.error("Payment processing failed for order {}", orderId, e);
            // Kein Commit - Event wird beim Restart erneut verarbeitet
        }
    }
}

30.3.2 Selective Reprocessing

class SelectiveReprocessingService:
    def __init__(self):
        config = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'payment-service',
            'enable.auto.commit': False
        }
        self.consumer = Consumer(config)
        self.consumer.subscribe(['order.placed.v1'])
        self.processed_orders = set()  # In-Memory Cache für Demo
    
    def process_events(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
                
            try:
                event = json.loads(msg.value().decode('utf-8'))
                order_id = event['orderId']
                
                # Skip bereits verarbeitete Orders
                if order_id in self.processed_orders:
                    print(f"Order {order_id} already processed, skipping")
                    self.consumer.commit(msg)
                    continue
                
                success = self.process_payment(event)
                
                if success:
                    self.processed_orders.add(order_id)
                    self.consumer.commit(msg)
                    print(f"Successfully processed order: {order_id}")
                else:
                    print(f"Payment failed for order: {order_id} - will retry")
                    # Kein Commit - Retry beim nächsten Poll
                    
            except Exception as e:
                print(f"Error processing event: {e}")

30.3.3 Offset Reset und Rewind

@Component
public class OffsetManagementService {
    private final KafkaAdmin kafkaAdmin;
    
    // Programmatisches Zurücksetzen auf früheren Zeitpunkt
    public void reprocessFromTimestamp(String groupId, long timestampMillis) {
        AdminClient adminClient = AdminClient.create(kafkaAdminConfig());
        
        try {
            // Consumer Group stoppen
            stopConsumerGroup(groupId);
            
            // Offsets auf Timestamp zurücksetzen
            Map<TopicPartition, OffsetAndTimestamp> timestampOffsets = 
                getOffsetsForTimestamp(timestampMillis);
            
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
                timestampOffsets.entrySet().stream()
                    .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        entry -> new OffsetAndMetadata(entry.getValue().offset())
                    ));
            
            // Neue Offsets committen
            adminClient.alterConsumerGroupOffsets(groupId, offsetsToCommit);
            
            log.info("Reset offsets for group {} to timestamp {}", groupId, timestampMillis);
            
        } finally {
            adminClient.close();
            // Consumer Group wieder starten
            startConsumerGroup(groupId);
        }
    }
    
    // Reprocessing einer bestimmten Order
    @EventListener
    public void handleReprocessRequest(ReprocessOrderEvent event) {
        String orderId = event.getOrderId();
        
        // Suche nach dem spezifischen Event
        TopicPartition partition = findPartitionForOrder(orderId);
        long offset = findOffsetForOrder(orderId, partition);
        
        if (offset >= 0) {
            // Temporären Consumer für Reprocessing erstellen
            reprocessSingleEvent(partition, offset);
        }
    }
}

30.4 Consistency Guarantees

30.4.1 At-least-once Delivery

Die Standard-Semantik bei manuellem Offset-Handling garantiert, dass jedes Event mindestens einmal verarbeitet wird.

@Component
public class AtLeastOncePaymentService {
    
    @KafkaListener(topics = "order.placed.v1")
    @Transactional
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        try {
            // 1. Geschäftslogik ausführen
            PaymentResult result = processPayment(event);
            
            // 2. Lokalen Zustand in DB persistieren
            savePaymentResult(event.getOrderId(), result);
            
            // 3. Follow-up Events publizieren
            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
            }
            
            // 4. Erst nach kompletter Verarbeitung committen
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            log.error("Processing failed for order {}", event.getOrderId(), e);
            // Bei Fehler kein Commit - Event wird erneut verarbeitet
            // Dadurch ist At-least-once garantiert
        }
    }
}

30.4.2 Exactly-once Semantik

Für Exactly-once-Semantik muss Idempotenz sichergestellt werden.

@Component
public class ExactlyOncePaymentService {
    private final PaymentRepository paymentRepository;
    private final IdempotencyService idempotencyService;
    
    @KafkaListener(topics = "order.placed.v1")
    @Transactional
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        String orderId = event.getOrderId();
        String idempotencyKey = generateIdempotencyKey(event);
        
        // Idempotenz-Check
        if (idempotencyService.isAlreadyProcessed(idempotencyKey)) {
            log.info("Event already processed with key {}, skipping", idempotencyKey);
            acknowledgment.acknowledge();
            return;
        }
        
        try {
            // Idempotency-Record erstellen (verhindert Doppelverarbeitung)
            idempotencyService.markAsProcessing(idempotencyKey);
            
            PaymentResult result = processPayment(event);
            
            // Atomare Operation: Payment + Idempotency Record
            Payment payment = createPaymentRecord(event, result);
            paymentRepository.save(payment);
            idempotencyService.markAsCompleted(idempotencyKey, payment.getId());
            
            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(orderId, result);
            }
            
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            idempotencyService.markAsFailed(idempotencyKey, e.getMessage());
            throw e;
        }
    }
    
    private String generateIdempotencyKey(OrderPlacedEvent event) {
        // Eindeutiger Key basierend auf Event-Inhalt
        return "payment:" + event.getOrderId() + ":" + 
               event.getEventId() + ":" + event.getTimestamp();
    }
}

30.4.3 Transaktionale Outbox für Exactly-once

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    private String id;
    private String aggregateId;
    private String eventType;
    private String eventPayload;
    private LocalDateTime createdAt;
    private boolean processed;
}

@Component
@Transactional
public class TransactionalOutboxPaymentService {
    private final PaymentRepository paymentRepository;
    private final OutboxEventRepository outboxRepository;
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment acknowledgment) {
        try {
            String orderId = event.getOrderId();
            
            // Alles in einer Datenbank-Transaktion
            PaymentResult result = processPayment(event);
            
            // 1. Payment-Record speichern
            Payment payment = createPaymentRecord(event, result);
            paymentRepository.save(payment);
            
            // 2. Outbound-Event in Outbox speichern (nicht direkt senden!)
            if (result.isSuccessful()) {
                OutboxEvent outboxEvent = OutboxEvent.builder()
                    .id(UUID.randomUUID().toString())
                    .aggregateId(orderId)
                    .eventType("PaymentProcessed")
                    .eventPayload(createPaymentProcessedEventJson(orderId, result))
                    .createdAt(LocalDateTime.now())
                    .processed(false)
                    .build();
                
                outboxRepository.save(outboxEvent);
            }
            
            // 3. Kafka-Offset committen
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            log.error("Transaction failed for order {}", event.getOrderId(), e);
            throw e; // Rollback der gesamten Transaktion
        }
    }
}

// Separater Service für Outbox-Verarbeitung
@Component
public class OutboxProcessor {
    
    @Scheduled(fixedDelay = 1000)
    @Transactional
    public void processOutboxEvents() {
        List<OutboxEvent> unprocessedEvents = 
            outboxRepository.findByProcessedFalseOrderByCreatedAt(PageRequest.of(0, 10));
        
        for (OutboxEvent event : unprocessedEvents) {
            try {
                kafkaTemplate.send("payment.processed.v1", event.getEventPayload());
                
                event.setProcessed(true);
                outboxRepository.save(event);
                
            } catch (Exception e) {
                log.error("Failed to send outbox event {}", event.getId(), e);
            }
        }
    }
}

Die Wahl der Offset-Strategie hängt von den spezifischen Anforderungen ab: Automatisches Offset-Handling bietet bessere Performance bei relaxteren Konsistenzanforderungen, während manuelles Handling präzise Kontrolle über Verarbeitungsgarantien ermöglicht. Für kritische Geschäftsprozesse wie Zahlungsverarbeitung ist manuelles Offset-Handling meist die bessere Wahl.

Im siehe Kapitel zu “Fehlertoleranz, Dead Letter Topics, Wiederholungen” werden wir robuste Strategien für den Umgang mit nicht verarbeitbaren Events behandeln, während das siehe Kapitel zu “Verarbeitung und Zustandsmanagement” die Auswirkungen verschiedener Offset-Strategien auf die Zustandskonsistenz vertieft.