28 Eventverarbeitung mit Spring Boot

Spring Boot bietet mit Spring Kafka eine elegante Integration für Event-Consumer. Die deklarative Programmierung mit Annotations macht die Implementierung von Event-Handlern intuitiv und reduziert Boilerplate-Code erheblich.

28.1 KafkaListener Annotation

Die @KafkaListener Annotation ist das Herzstück der Spring Boot Event-Verarbeitung. Sie ermöglicht es, Methoden direkt als Event-Handler zu markieren.

28.1.1 Grundlegende Verwendung

@Component
@Slf4j
public class PaymentService {

    // The gateway was referenced below but never declared in the original
    // listing; constructor injection makes the example compile.
    private final PaymentGateway paymentGateway;

    public PaymentService(PaymentGateway paymentGateway) {
        this.paymentGateway = paymentGateway;
    }

    /**
     * Entry point for {@code order.placed.v1} events: logs the order id
     * and delegates to the payment workflow.
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        log.info("Received order placed event for order: {}", event.getOrderId());
        processPayment(event);
    }

    /** Business logic for payment processing. */
    private void processPayment(OrderPlacedEvent event) {
        PaymentRequest request = PaymentRequest.builder()
            .orderId(event.getOrderId())
            .customerId(event.getCustomerId())
            .amount(event.getTotalAmount())
            .currency(event.getCurrency())
            .build();

        // Simulated payment processing against the external gateway.
        PaymentResult result = paymentGateway.processPayment(request);

        if (result.isSuccessful()) {
            publishPaymentProcessedEvent(event.getOrderId(), result);
        } else {
            handlePaymentFailure(event, result);
        }
    }
}

28.1.2 Topic-Patterns und dynamische Topics

@Component
@Slf4j  // was missing although log is used below (consistent with PaymentService)
public class OrderEventService {

    /** Watches several topics with a single handler and dispatches by event type. */
    @KafkaListener(topics = {"order.placed.v1", "order.updated.v1"})
    public void handleOrderEvents(OrderEvent event) {
        switch (event.getEventType()) {
            case "OrderPlaced":
                handleOrderPlaced((OrderPlacedEvent) event);
                break;
            case "OrderUpdated":
                handleOrderUpdated((OrderUpdatedEvent) event);
                break;
            default:
                // Previously unknown types were dropped silently; at least log them.
                log.warn("Ignoring event with unknown type: {}", event.getEventType());
                break;
        }
    }

    /**
     * Pattern-based topic selection: the regex matches every versioned
     * order topic (e.g. order.placed.v1, order.updated.v1).
     */
    @KafkaListener(topicPattern = "order\\..*\\.v1")
    public void handleAllOrderEvents(OrderEvent event, 
                                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing event from topic: {}", topic);
        routeEventByTopic(event, topic);
    }
}

28.1.3 Consumer Groups und Partitionierung

@Component
@Slf4j  // was missing although log is used below
public class ScalablePaymentService {

    /**
     * Consumes order-placed events with three concurrent consumer threads
     * in the "payment-service-group" consumer group.
     */
    @KafkaListener(
        topics = "order.placed.v1",
        groupId = "payment-service-group",
        concurrency = "3"  // three parallel consumer instances
    )
    public void handleOrderPlaced(OrderPlacedEvent event,
                                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("Processing order {} on partition {}", event.getOrderId(), partition);
        processPayment(event);
    }

    /**
     * Partition-specific processing: this listener is pinned to
     * partitions 0 and 1 of the topic, in its own consumer group.
     */
    @KafkaListener(
        topicPartitions = @TopicPartition(
            topic = "order.placed.v1", 
            partitions = {"0", "1"}
        ),
        groupId = "priority-payment-group"
    )
    public void handlePriorityOrders(OrderPlacedEvent event) {
        // Special handling for the pinned partitions.
        processPriorityPayment(event);
    }
}

28.1.4 Message-Header und Metadaten

@Component
@Slf4j  // was missing although log is used below
public class EnhancedPaymentService {

    /**
     * Demonstrates access to message metadata: receive timestamp, topic,
     * a custom correlation-id header, and the full ConsumerRecord (for
     * the offset).
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(
            OrderPlacedEvent event,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header("correlation-id") String correlationId,
            ConsumerRecord<String, OrderPlacedEvent> record) {

        log.info("Processing event: correlationId={}, timestamp={}, offset={}", 
                correlationId, timestamp, record.offset());

        // Put the correlation id on the MDC for tracing; try-with-resources
        // removes it again once processing is done.
        try (MDCCloseable mdcCloseable = MDC.putCloseable("correlationId", correlationId)) {
            processPayment(event);
        }
    }
}

28.2 Consumer Configuration

28.2.1 Application Properties

# application.yml
# Kafka consumer settings for the payment service.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: payment-service
      # Start from the earliest offset when no committed offset exists.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Restrict JSON deserialization to the trusted event package.
        spring.json.trusted.packages: "de.eda.training.events"
        max.poll.records: 10
        session.timeout.ms: 30000
        heartbeat.interval.ms: 3000
        
    listener:
      concurrency: 3
      poll-timeout: 3000
      # NOTE(review): manual_immediate requires listener methods to take an
      # Acknowledgment parameter and call acknowledge(); the listener
      # examples in this chapter do not — confirm the intended ack mode.
      ack-mode: manual_immediate

28.2.2 Programmatische Konfiguration

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    
    /**
     * Consumer factory for OrderPlacedEvent messages: String keys and
     * JSON-deserialized values restricted to the trusted event package.
     */
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> orderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-service");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.eda.training.events");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Performance tuning: poll batch size and fetch behaviour.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    /**
     * Listener container factory with three concurrent consumers and a
     * retrying error handler.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> 
           kafkaListenerContainerFactory() {
        
        ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        factory.setConcurrency(3);
        
        // Configure error handling: retries with a fixed 1 s back-off.
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(1000L, 3L)  // 3 retry attempts, 1 s apart
        ));
        
        return factory;
    }
}

28.2.3 Conditional Consumer

@Component
@ConditionalOnProperty(
    value = "payment.service.enabled", 
    havingValue = "true", 
    matchIfMissing = true
)
public class ConditionalPaymentService {

    /**
     * Async variant: the listener container is only started when the
     * configured processing mode is "async".
     *
     * NOTE: {@code @KafkaListener} has no {@code condition} attribute
     * (that belongs to {@code @EventListener}); conditional activation is
     * expressed via the SpEL-capable {@code autoStartup} attribute.
     */
    @KafkaListener(
        topics = "order.placed.v1",
        autoStartup = "#{environment.getProperty('payment.processing.mode') == 'async'}"
    )
    public void handleOrderPlacedAsync(OrderPlacedEvent event) {
        processPaymentAsync(event);
    }

    /**
     * Real-time variant: started only when the injected
     * paymentModeService bean reports real-time mode.
     */
    @KafkaListener(
        topics = "order.placed.v1",
        autoStartup = "#{@paymentModeService.isRealTimeMode()}"
    )
    public void handleOrderPlacedRealTime(OrderPlacedEvent event) {
        processPaymentRealTime(event);
    }
}

28.3 Error Recovery

28.3.1 Retry-Mechanismen

@Component
@Slf4j                    // log was used but never declared
@RequiredArgsConstructor  // generates the constructor for the final fields
public class ResilientPaymentService {
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Attempts the payment; gateway failures are retried up to three
     * times with exponential back-off (1 s, 2 s, 4 s) via Spring Retry.
     */
    @KafkaListener(topics = "order.placed.v1")
    @Retryable(
        value = {PaymentGatewayException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void handleOrderPlaced(OrderPlacedEvent event) {
        log.info("Attempting payment processing for order: {}", event.getOrderId());

        try {
            PaymentResult result = paymentGateway.processPayment(
                createPaymentRequest(event)
            );

            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
            } else {
                throw new PaymentProcessingException(
                    "Payment failed: " + result.getErrorMessage()
                );
            }
        } catch (PaymentGatewayException e) {
            log.warn("Payment gateway error for order {}: {}", 
                    event.getOrderId(), e.getMessage());
            throw e; // re-thrown so @Retryable triggers the next attempt
        }
    }

    /**
     * Invoked by Spring Retry once all attempts are exhausted; routes the
     * event to the dead letter topic and publishes a failure event.
     */
    @Recover
    public void recoverFromPaymentFailure(PaymentGatewayException e, 
                                        OrderPlacedEvent event) {
        log.error("Payment processing failed permanently for order {}", 
                 event.getOrderId(), e);

        // Forward to the dead letter topic.
        sendToDeadLetterTopic(event, e);

        // Alternatively: start a compensation flow.
        publishPaymentFailedEvent(event.getOrderId(), e.getMessage());
    }
}

28.3.2 Custom Error Handler

@Component
@Slf4j  // was missing although log is used below
public class PaymentServiceErrorHandler implements ConsumerAwareListenerErrorHandler {

    /**
     * Classifies listener failures: retryable errors are re-thrown so the
     * container's retry mechanism kicks in; everything else is sent to
     * the dead letter topic and the message is treated as consumed.
     */
    @Override
    public Object handleError(Message<?> message, 
                            ListenerExecutionFailedException exception,
                            Consumer<?, ?> consumer) {

        log.error("Error processing message: {}", message.getPayload(), exception);

        // Extract source metadata from the message headers.
        String topic = (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        Integer partition = (Integer) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
        Long offset = (Long) message.getHeaders().get(KafkaHeaders.OFFSET);

        // Error classification.
        if (isRetryableError(exception)) {
            log.warn("Retryable error for message at {}:{}:{}", topic, partition, offset);
            throw exception; // re-throw so the retry mechanism handles it
        } else {
            log.error("Non-retryable error, sending to DLT");
            sendToDeadLetterTopic(message, exception);
            return null; // mark the message as processed
        }
    }

    /** Gateway and temporary-service failures are considered transient. */
    private boolean isRetryableError(Exception e) {
        return e instanceof PaymentGatewayException ||
               e instanceof TemporaryServiceException;
    }
}

28.3.3 Dead Letter Topic Handler

@Component
@Slf4j                    // log was used but never declared
@RequiredArgsConstructor  // generates the constructor for the final fields
public class DeadLetterTopicHandler {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final PaymentService paymentService;
    // The two collaborators below were used but never declared in the
    // original listing.
    private final DeadLetterRepository deadLetterRepository;
    private final NotificationService notificationService;

    /**
     * Consumes messages from the dead letter topic, persists them for
     * manual review and notifies the operations team.
     */
    @KafkaListener(topics = "order.placed.v1.DLT")
    public void handleDeadLetterMessage(
            OrderPlacedEvent event,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stackTrace) {

        log.error("Processing dead letter message for order {}: {}", 
                 event.getOrderId(), errorMessage);

        // Persist for manual review or alternative processing.
        DeadLetterRecord record = DeadLetterRecord.builder()
            .orderId(event.getOrderId())
            .originalEvent(event)
            .errorMessage(errorMessage)
            .timestamp(Instant.now())
            .status("PENDING_REVIEW")
            .build();

        deadLetterRepository.save(record);

        // Notify the operations team.
        notificationService.notifyDeadLetterMessage(record);
    }

    /** Manual retry entry point, triggered via an application event. */
    @EventListener
    public void handleManualRetry(DeadLetterRetryEvent retryEvent) {
        try {
            paymentService.handleOrderPlaced(retryEvent.getOriginalEvent());
            deadLetterRepository.markAsResolved(retryEvent.getRecordId());
        } catch (Exception e) {
            log.error("Manual retry failed for record {}", retryEvent.getRecordId(), e);
        }
    }
}

28.4 Practical Spring Boot Examples

28.4.1 Batch Processing mit Spring Boot

@Component
@Slf4j  // log was used but never declared
public class BatchPaymentService {

    // The gateway was referenced below but never declared in the
    // original listing.
    private final PaymentGateway paymentGateway;

    // Events buffered until batchSize is reached; guarded by batchLock.
    private final List<OrderPlacedEvent> eventBatch = new ArrayList<>();
    private final Object batchLock = new Object();

    @Value("${payment.batch.size:10}")
    private int batchSize;

    public BatchPaymentService(PaymentGateway paymentGateway) {
        this.paymentGateway = paymentGateway;
    }

    /**
     * Buffers incoming events and flushes the batch once it reaches the
     * configured size.
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        synchronized (batchLock) {
            eventBatch.add(event);

            if (eventBatch.size() >= batchSize) {
                processBatch(new ArrayList<>(eventBatch));
                eventBatch.clear();
            }
        }
    }

    /** Fallback flush: process a partial batch every 5 seconds. */
    @Scheduled(fixedDelay = 5000)
    public void processIncompleteBatch() {
        synchronized (batchLock) {
            if (!eventBatch.isEmpty()) {
                processBatch(new ArrayList<>(eventBatch));
                eventBatch.clear();
            }
        }
    }

    /** Sends the whole batch to the gateway and fans out the results. */
    private void processBatch(List<OrderPlacedEvent> events) {
        log.info("Processing payment batch with {} orders", events.size());

        List<PaymentRequest> requests = events.stream()
            .map(this::createPaymentRequest)
            .collect(Collectors.toList());

        // NOTE(review): assumes the gateway returns exactly one result per
        // request, in the same order — confirm the gateway contract.
        List<PaymentResult> results = paymentGateway.processBatchPayments(requests);

        // Handle each order's result individually.
        for (int i = 0; i < events.size(); i++) {
            OrderPlacedEvent event = events.get(i);
            PaymentResult result = results.get(i);

            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
            } else {
                handlePaymentFailure(event, result);
            }
        }
    }
}

28.4.2 Conditional Message Processing

@Component
@Slf4j                    // log was used but never declared
@RequiredArgsConstructor  // generates the constructor for the final fields
public class ConditionalOrderProcessor {

    // These collaborators were used but never declared in the original listing.
    private final PaymentService paymentService;
    private final ShippingService shippingService;
    private final NotificationService notificationService;

    /**
     * Routes each order to a processing path based on message header
     * properties (priority and customer type).
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event,
                                @Header Map<String, Object> headers) {

        // Processing decision based on message properties.
        String priority = getHeaderValue(headers, "priority", "normal");
        String customerType = getHeaderValue(headers, "customer-type", "regular");

        if ("high".equals(priority)) {
            processHighPriorityOrder(event);
        } else if ("premium".equals(customerType)) {
            processPremiumCustomerOrder(event);
        } else {
            processRegularOrder(event);
        }
    }

    /** Express path: expedited payment and shipping. */
    private void processHighPriorityOrder(OrderPlacedEvent event) {
        log.info("Processing high priority order: {}", event.getOrderId());
        paymentService.processExpressPayment(event);
        shippingService.scheduleExpressShipping(event);
    }

    /** Premium path: bonus points and a premium confirmation. */
    private void processPremiumCustomerOrder(OrderPlacedEvent event) {
        log.info("Processing premium customer order: {}", event.getOrderId());
        paymentService.processPaymentWithBonusPoints(event);
        notificationService.sendPremiumConfirmation(event);
    }

    /** Default path: standard payment processing. */
    private void processRegularOrder(OrderPlacedEvent event) {
        log.info("Processing regular order: {}", event.getOrderId());
        paymentService.processStandardPayment(event);
    }

    /** Returns the header value as a String, or the default when absent. */
    private String getHeaderValue(Map<String, Object> headers, String key, String defaultValue) {
        Object value = headers.get(key);
        return value != null ? value.toString() : defaultValue;
    }
}

28.4.3 Transaction Management

@Component
@Transactional
public class TransactionalPaymentService {
    private final PaymentRepository paymentRepository;
    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    // The gateway was referenced below but never declared in the
    // original listing.
    private final PaymentGateway paymentGateway;

    /**
     * Processes an order-placed event inside a database transaction:
     * persists order and payment state, calls the external gateway and
     * publishes a follow-up event on success. Any exception rolls the
     * transaction back (rollbackFor covers checked exceptions too).
     */
    @KafkaListener(topics = "order.placed.v1")
    @Transactional(rollbackFor = Exception.class)
    public void handleOrderPlaced(OrderPlacedEvent event) {
        try {
            // 1. Create/update the order in the local database.
            Order order = createOrUpdateOrder(event);

            // 2. Create the payment record in PENDING state.
            Payment payment = Payment.builder()
                .orderId(event.getOrderId())
                .amount(event.getTotalAmount())
                .currency(event.getCurrency())
                .status(PaymentStatus.PENDING)
                .build();

            paymentRepository.save(payment);

            // 3. Call the external payment gateway.
            PaymentResult result = processExternalPayment(event);

            // 4. Update the payment status from the gateway result.
            payment.setStatus(result.isSuccessful() ? 
                PaymentStatus.COMPLETED : PaymentStatus.FAILED);
            payment.setGatewayTransactionId(result.getTransactionId());
            paymentRepository.save(payment);

            // 5. Publish the follow-up event.
            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
            }

        } catch (Exception e) {
            log.error("Error processing order payment for {}", event.getOrderId(), e);
            throw e; // triggers transaction rollback
        }
    }

    // External service calls should not run inside a DB transaction.
    // NOTE(review): because this is a self-invocation from the
    // @Transactional method above, Spring's proxy is bypassed and the
    // gateway call still executes INSIDE the transaction — to truly move
    // it outside, use a TransactionTemplate or a separate bean.
    private PaymentResult processExternalPayment(OrderPlacedEvent event) {
        return paymentGateway.processPayment(createPaymentRequest(event));
    }
}

Die Spring Boot Integration macht Event-Consumer-Implementierung durch deklarative Programmierung sehr effizient. Die Kombination aus Annotations, automatischer Konfiguration und umfassendem Error-Handling reduziert den Code-Aufwand erheblich und ermöglicht es Entwicklern, sich auf die Geschäftslogik zu konzentrieren.

Im Kapitel „Eventverarbeitung mit Python“ werden wir entsprechende Implementierungen mit asyncio-basierten Ansätzen betrachten, während das Kapitel „Manuelles vs. automatisches Offset-Handling“ die Details der Message-Bestätigung vertieft.