Spring Boot bietet mit Spring Kafka eine elegante Integration für Event-Consumer. Die deklarative Programmierung mit Annotations macht die Implementierung von Event-Handlern intuitiv und reduziert Boilerplate-Code erheblich.
Die @KafkaListener-Annotation ist das Herzstück der
Event-Verarbeitung mit Spring Boot. Sie ermöglicht es, Methoden direkt als
Event-Handler zu markieren.
@Component
@Slf4j
public class PaymentService {

    /**
     * Listener for the {@code order.placed.v1} topic: logs the order id and
     * forwards the event to payment processing.
     *
     * @param event the deserialized order-placed event
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        log.info("Received order placed event for order: {}", event.getOrderId());
        processPayment(event);
    }

    /**
     * Submits a payment request derived from the event to the gateway and
     * dispatches on the outcome: failures go to {@code handlePaymentFailure},
     * successes are published as a payment-processed event.
     */
    private void processPayment(OrderPlacedEvent event) {
        // Simulated payment processing via the gateway.
        PaymentResult outcome = paymentGateway.processPayment(toPaymentRequest(event));

        if (!outcome.isSuccessful()) {
            handlePaymentFailure(event, outcome);
            return;
        }
        publishPaymentProcessedEvent(event.getOrderId(), outcome);
    }

    /** Copies order id, customer id, total amount and currency from the event into a request. */
    private PaymentRequest toPaymentRequest(OrderPlacedEvent event) {
        return PaymentRequest.builder()
                .orderId(event.getOrderId())
                .customerId(event.getCustomerId())
                .amount(event.getTotalAmount())
                .currency(event.getCurrency())
                .build();
    }
}
@Component
public class OrderEventService {

    /**
     * Listens on several explicitly listed topics at once and dispatches each
     * event to the handler matching its event type.
     *
     * <p>Events with a missing or unsupported event type are logged and skipped
     * instead of failing with a {@code NullPointerException} or being dropped silently.
     *
     * <p>NOTE(review): the topic pattern on {@link #handleAllOrderEvents} also matches
     * both topics listed here, so with both listeners active each event appears to be
     * handled twice — confirm this is only meant to show two alternatives.
     *
     * @param event the incoming order event
     */
    @KafkaListener(topics = {"order.placed.v1", "order.updated.v1"})
    public void handleOrderEvents(OrderEvent event) {
        String eventType = event.getEventType();
        if (eventType == null) {
            // switch on a null String would throw; skip the malformed event explicitly.
            log.warn("Ignoring order event without event type: {}", event);
            return;
        }
        switch (eventType) {
            case "OrderPlaced":
                handleOrderPlaced((OrderPlacedEvent) event);
                break;
            case "OrderUpdated":
                handleOrderUpdated((OrderUpdatedEvent) event);
                break;
            default:
                log.warn("Ignoring order event with unsupported type: {}", eventType);
                break;
        }
    }

    /**
     * Pattern-based topic selection: receives events from every topic matching
     * {@code order\..*\.v1} and routes them by the topic they arrived on.
     *
     * @param event the incoming order event
     * @param topic the name of the topic the record was received from
     */
    @KafkaListener(topicPattern = "order\\..*\\.v1")
    public void handleAllOrderEvents(OrderEvent event,
                                     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing event from topic: {}", topic);
        routeEventByTopic(event, topic);
    }
}
@Component
public class ScalablePaymentService {

    /**
     * Consumes {@code order.placed.v1} in group {@code payment-service-group}
     * with a listener concurrency of 3 and logs the partition each record came from.
     *
     * <p>NOTE(review): newer Spring Kafka versions appear to have renamed
     * {@code RECEIVED_PARTITION_ID} to {@code RECEIVED_PARTITION} — confirm against
     * the version used by the project.
     *
     * @param event     the order-placed event
     * @param partition the partition the record was read from
     */
    @KafkaListener(
            groupId = "payment-service-group",
            topics = "order.placed.v1",
            concurrency = "3" // three parallel consumer instances
    )
    public void handleOrderPlaced(
            OrderPlacedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("Processing order {} on partition {}", event.getOrderId(), partition);
        processPayment(event);
    }

    /**
     * Partition-specific processing: this listener is assigned partitions 0 and 1
     * of {@code order.placed.v1} explicitly, in group {@code priority-payment-group}.
     *
     * @param event the order-placed event read from one of the assigned partitions
     */
    @KafkaListener(
            groupId = "priority-payment-group",
            topicPartitions = @TopicPartition(
                    topic = "order.placed.v1",
                    partitions = {"0", "1"}
            )
    )
    public void handlePriorityOrders(OrderPlacedEvent event) {
        // Special treatment for records from the selected partitions.
        processPriorityPayment(event);
    }
}
@Component
public class EnhancedPaymentService {

    /**
     * Consumes {@code order.placed.v1} with access to record metadata: the receive
     * timestamp, the topic, a custom {@code correlation-id} header and the raw
     * consumer record.
     *
     * @param event         the deserialized payload
     * @param timestamp     value of the received-timestamp header
     * @param topic         topic the record was received from
     * @param correlationId value of the {@code correlation-id} header
     * @param record        the underlying consumer record (used for its offset)
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(
            OrderPlacedEvent event,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header("correlation-id") String correlationId,
            ConsumerRecord<String, OrderPlacedEvent> record) {

        long offset = record.offset();
        log.info("Processing event: correlationId={}, timestamp={}, offset={}",
                correlationId, timestamp, offset);

        // Expose the correlation id to the logging context for tracing; the
        // try-with-resources removes it again once processing has finished.
        try (MDCCloseable ignored = MDC.putCloseable("correlationId", correlationId)) {
            processPayment(event);
        }
    }
}
# application.yml
spring:
  kafka:
    # Consumer client settings (spring.kafka.consumer.*)
    consumer:
      bootstrap-servers: localhost:9092
      group-id: payment-service
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Properties passed through to the Kafka client / deserializer as-is
      properties:
        spring.json.trusted.packages: "de.eda.training.events"
        max.poll.records: 10
        session.timeout.ms: 30000
        heartbeat.interval.ms: 3000
    # Listener container settings (spring.kafka.listener.*)
    listener:
      concurrency: 3
      poll-timeout: 3000
      # NOTE(review): with a manual ack mode the listener method is expected to take an
      # Acknowledgment parameter and call acknowledge(); confirm the listeners do so,
      # otherwise offsets are not committed.
      ack-mode: manual_immediate
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Programmatic consumer factory for {@code OrderPlacedEvent} payloads:
     * String keys, JSON values, group {@code payment-service}, broker at
     * {@code localhost:9092}.
     *
     * @return a consumer factory built from the properties assembled here
     */
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> orderConsumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();

        // Connection and group membership
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-service");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Deserialization
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "de.eda.training.events");

        applyPerformanceTuning(consumerProps);
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /** Adds the fetch/poll tuning values to the given consumer properties. */
    private void applyPerformanceTuning(Map<String, Object> consumerProps) {
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
    }

    /**
     * Listener container factory using {@link #orderConsumerFactory()}, a
     * concurrency of 3 and a {@code DefaultErrorHandler} with a fixed back-off.
     *
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent>
            kafkaListenerContainerFactory() {
        // Error handling: fixed 1 s interval, back-off limit of 3 (i.e. up to 3 retries
        // after the initial delivery).
        FixedBackOff retryBackOff = new FixedBackOff(1000L, 3L);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(retryBackOff);

        ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(orderConsumerFactory());
        containerFactory.setConcurrency(3);
        containerFactory.setCommonErrorHandler(errorHandler);
        return containerFactory;
    }
}
@Component
@ConditionalOnProperty(
        value = "payment.service.enabled",
        havingValue = "true",
        matchIfMissing = true
)
public class ConditionalPaymentService {

    /**
     * Asynchronous payment handling; the listener container is only started when
     * the property {@code payment.processing.mode} equals {@code async}.
     *
     * <p>{@code @KafkaListener} has no {@code condition} attribute, so the switch
     * is expressed through {@code autoStartup}, which accepts a SpEL expression.
     * The expression is evaluated once while the listener is being registered,
     * not per message.
     *
     * @param event the order-placed event
     */
    @KafkaListener(
            topics = "order.placed.v1",
            autoStartup = "#{environment.getProperty('payment.processing.mode') == 'async'}"
    )
    public void handleOrderPlacedAsync(OrderPlacedEvent event) {
        processPaymentAsync(event);
    }

    /**
     * Real-time payment handling; the listener container is only started when the
     * {@code paymentModeService} bean reports real-time mode at registration time.
     *
     * @param event the order-placed event
     */
    @KafkaListener(
            topics = "order.placed.v1",
            autoStartup = "#{@paymentModeService.isRealTimeMode()}"
    )
    public void handleOrderPlacedRealTime(OrderPlacedEvent event) {
        processPaymentRealTime(event);
    }
}
@Component
public class ResilientPaymentService {
    private final PaymentGateway paymentGateway;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Processes the payment for an order-placed event. The method is declared
     * retryable for {@code PaymentGatewayException}: 3 attempts, starting with a
     * 1000 ms delay and a multiplier of 2.
     *
     * @param event the order-placed event
     * @throws PaymentProcessingException if the gateway reports an unsuccessful result
     * @throws PaymentGatewayException    rethrown after logging so the retry advice sees it
     */
    @KafkaListener(topics = "order.placed.v1")
    @Retryable(
            value = {PaymentGatewayException.class},
            maxAttempts = 3,
            backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void handleOrderPlaced(OrderPlacedEvent event) {
        log.info("Attempting payment processing for order: {}", event.getOrderId());
        try {
            PaymentRequest request = createPaymentRequest(event);
            PaymentResult outcome = paymentGateway.processPayment(request);

            if (!outcome.isSuccessful()) {
                throw new PaymentProcessingException(
                        "Payment failed: " + outcome.getErrorMessage()
                );
            }
            publishPaymentProcessedEvent(event.getOrderId(), outcome);
        } catch (PaymentGatewayException gatewayFailure) {
            log.warn("Payment gateway error for order {}: {}",
                    event.getOrderId(), gatewayFailure.getMessage());
            // Rethrow so the @Retryable advice can handle the failure.
            throw gatewayFailure;
        }
    }

    /**
     * Recovery callback for {@code PaymentGatewayException}: logs the permanent
     * failure, forwards the event to the dead letter topic and publishes a
     * payment-failed event.
     *
     * @param e     the gateway exception passed to the recovery method
     * @param event the event that could not be processed
     */
    @Recover
    public void recoverFromPaymentFailure(PaymentGatewayException e,
                                          OrderPlacedEvent event) {
        log.error("Payment processing failed permanently for order {}",
                event.getOrderId(), e);

        // Hand the event over to the dead letter topic ...
        sendToDeadLetterTopic(event, e);
        // ... and start compensation by announcing the failed payment.
        publishPaymentFailedEvent(event.getOrderId(), e.getMessage());
    }
}
@Component
public class PaymentServiceErrorHandler implements ConsumerAwareListenerErrorHandler {

    /**
     * Classifies a listener failure: retryable errors are rethrown so the retry
     * mechanism can take over, all others are sent to the dead letter topic and
     * the message is treated as handled.
     *
     * @param message   the message whose processing failed
     * @param exception the wrapper exception raised by the listener invocation
     * @param consumer  the Kafka consumer that delivered the record (unused here)
     * @return {@code null} when the message was forwarded to the dead letter topic
     * @throws ListenerExecutionFailedException when the failure is classified as retryable
     */
    @Override
    public Object handleError(Message<?> message,
                              ListenerExecutionFailedException exception,
                              Consumer<?, ?> consumer) {
        log.error("Error processing message: {}", message.getPayload(), exception);

        // Read the record coordinates from the message headers.
        String topic = (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
        Integer partition = (Integer) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
        Long offset = (Long) message.getHeaders().get(KafkaHeaders.OFFSET);

        // Error classification
        if (isRetryableError(exception)) {
            log.warn("Retryable error for message at {}:{}:{}", topic, partition, offset);
            throw exception; // rethrow for the retry mechanism
        } else {
            log.error("Non-retryable error, sending to DLT");
            sendToDeadLetterTopic(message, exception);
            return null; // mark the message as processed
        }
    }

    /**
     * Returns whether the failure is caused by a retryable exception type.
     *
     * <p>The exception handed to {@link #handleError} is a
     * {@code ListenerExecutionFailedException}, so testing only that instance
     * against the business exception types can never match. The whole cause
     * chain is inspected instead.
     *
     * @param e the exception to classify
     * @return {@code true} if {@code e} or any of its causes is a
     *         {@code PaymentGatewayException} or {@code TemporaryServiceException}
     */
    private boolean isRetryableError(Exception e) {
        for (Throwable current = e; current != null; current = current.getCause()) {
            if (current instanceof PaymentGatewayException
                    || current instanceof TemporaryServiceException) {
                return true;
            }
        }
        return false;
    }
}
@Component
public class DeadLetterTopicHandler {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final PaymentService paymentService;

    /**
     * Consumes {@code order.placed.v1.DLT}: stores each dead-lettered event as a
     * record in status {@code PENDING_REVIEW} and notifies the operations team.
     *
     * <p>NOTE(review): confirm that whatever publishes to this topic writes the
     * {@code EXCEPTION_MESSAGE} / {@code EXCEPTION_STACKTRACE} headers; both are
     * bound as required headers here.
     *
     * @param event        the original event that could not be processed
     * @param errorMessage exception message taken from the record headers
     * @param stackTrace   exception stack trace taken from the record headers (not used)
     */
    @KafkaListener(topics = "order.placed.v1.DLT")
    public void handleDeadLetterMessage(
            OrderPlacedEvent event,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stackTrace) {

        log.error("Processing dead letter message for order {}: {}",
                event.getOrderId(), errorMessage);

        // Persist for manual review or alternative processing.
        DeadLetterRecord pendingReview = DeadLetterRecord.builder()
                .orderId(event.getOrderId())
                .originalEvent(event)
                .errorMessage(errorMessage)
                .timestamp(Instant.now())
                .status("PENDING_REVIEW")
                .build();
        deadLetterRepository.save(pendingReview);

        // Let the operations team know.
        notificationService.notifyDeadLetterMessage(pendingReview);
    }

    /**
     * Manual retry hook: re-runs the original event through the payment service
     * and marks the dead letter record as resolved when that succeeds. Failures
     * are logged and not propagated.
     *
     * @param retryEvent application event carrying the original event and record id
     */
    @EventListener
    public void handleManualRetry(DeadLetterRetryEvent retryEvent) {
        try {
            paymentService.handleOrderPlaced(retryEvent.getOriginalEvent());
            deadLetterRepository.markAsResolved(retryEvent.getRecordId());
        } catch (Exception retryFailure) {
            log.error("Manual retry failed for record {}", retryEvent.getRecordId(), retryFailure);
        }
    }
}
@Component
public class BatchPaymentService {
    // Buffer of events waiting to be sent as one batch; guarded by batchLock.
    private final List<OrderPlacedEvent> eventBatch = new ArrayList<>();
    private final Object batchLock = new Object();

    // Number of buffered events that triggers a flush (defaults to 10).
    @Value("${payment.batch.size:10}")
    private int batchSize;

    /**
     * Buffers the event and flushes the buffer once it holds at least
     * {@code batchSize} events.
     *
     * <p>NOTE(review): events are held in memory between receipt and flush, and
     * the gateway call runs while {@code batchLock} is held — confirm both are
     * acceptable for the offset-commit and throughput requirements.
     *
     * @param event the order-placed event to enqueue
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        synchronized (batchLock) {
            eventBatch.add(event);
            if (eventBatch.size() >= batchSize) {
                flushBatch();
            }
        }
    }

    /** Fallback flush, scheduled with a fixed delay of 5 seconds, for batches that never fill up. */
    @Scheduled(fixedDelay = 5000)
    public void processIncompleteBatch() {
        synchronized (batchLock) {
            if (!eventBatch.isEmpty()) {
                flushBatch();
            }
        }
    }

    /**
     * Processes a snapshot of the buffer and clears it afterwards. Must be called
     * while holding {@code batchLock}. If processing throws, the buffer is left
     * untouched so the events are not lost.
     */
    private void flushBatch() {
        processBatch(new ArrayList<>(eventBatch));
        eventBatch.clear();
    }

    /**
     * Sends all events of the batch to the gateway in one call and handles each
     * result together with the event at the same position.
     *
     * @param events the events to process
     * @throws IllegalStateException if the gateway does not return exactly one
     *                               result per request; checked before any result
     *                               is handled so no partial batch is published
     */
    private void processBatch(List<OrderPlacedEvent> events) {
        log.info("Processing payment batch with {} orders", events.size());
        List<PaymentRequest> requests = events.stream()
                .map(this::createPaymentRequest)
                .collect(Collectors.toList());
        List<PaymentResult> results = paymentGateway.processBatchPayments(requests);

        // Results are matched to events by index, so the sizes must agree.
        if (results == null || results.size() != events.size()) {
            throw new IllegalStateException(
                    "Payment gateway returned "
                            + (results == null ? "no" : String.valueOf(results.size()))
                            + " results for " + events.size() + " requests");
        }

        for (int i = 0; i < events.size(); i++) {
            OrderPlacedEvent event = events.get(i);
            PaymentResult result = results.get(i);
            if (result.isSuccessful()) {
                publishPaymentProcessedEvent(event.getOrderId(), result);
            } else {
                handlePaymentFailure(event, result);
            }
        }
    }
}
@Component
public class ConditionalOrderProcessor {

    /**
     * Routes an order-placed event based on message headers: a {@code priority}
     * of {@code high} wins, then a {@code customer-type} of {@code premium},
     * everything else is processed as a regular order.
     *
     * <p>The full header map is bound with {@code @Headers}
     * ({@code org.springframework.messaging.handler.annotation.Headers});
     * {@code @Header} on a {@code Map} parameter would look up a single header
     * instead of injecting all of them.
     *
     * @param event   the order-placed event
     * @param headers all headers of the received message
     */
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event,
                                  @Headers Map<String, Object> headers) {
        // Processing based on message properties
        String priority = getHeaderValue(headers, "priority", "normal");
        String customerType = getHeaderValue(headers, "customer-type", "regular");
        if ("high".equals(priority)) {
            processHighPriorityOrder(event);
        } else if ("premium".equals(customerType)) {
            processPremiumCustomerOrder(event);
        } else {
            processRegularOrder(event);
        }
    }

    /** Express handling: express payment plus express shipping. */
    private void processHighPriorityOrder(OrderPlacedEvent event) {
        log.info("Processing high priority order: {}", event.getOrderId());
        paymentService.processExpressPayment(event);
        shippingService.scheduleExpressShipping(event);
    }

    /** Premium handling: payment with bonus points plus a premium confirmation. */
    private void processPremiumCustomerOrder(OrderPlacedEvent event) {
        log.info("Processing premium customer order: {}", event.getOrderId());
        paymentService.processPaymentWithBonusPoints(event);
        notificationService.sendPremiumConfirmation(event);
    }

    /** Default handling: standard payment only. */
    private void processRegularOrder(OrderPlacedEvent event) {
        log.info("Processing regular order: {}", event.getOrderId());
        paymentService.processStandardPayment(event);
    }

    /**
     * Reads a header as text.
     *
     * @param headers      the header map
     * @param key          the header name
     * @param defaultValue value returned when the header is absent
     * @return the header value as a string, or {@code defaultValue} if missing
     */
    private String getHeaderValue(Map<String, Object> headers, String key, String defaultValue) {
        Object value = headers.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof byte[]) {
            // Raw header bytes: decode them instead of printing the array identity.
            // NOTE(review): UTF-8 is assumed — confirm against the producer.
            return new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8);
        }
        return value.toString();
    }
}
@Component
@Transactional
public class TransactionalPaymentService {
private final PaymentRepository paymentRepository;
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order.placed.v1")
@Transactional(rollbackFor = Exception.class)
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
// 1. Order in lokaler DB erstellen/aktualisieren
Order order = createOrUpdateOrder(event);
// 2. Payment Record erstellen
Payment payment = Payment.builder()
.orderId(event.getOrderId())
.amount(event.getTotalAmount())
.currency(event.getCurrency())
.status(PaymentStatus.PENDING)
.build();
paymentRepository.save(payment);
// 3. External Payment Gateway (außerhalb Transaktion)
PaymentResult result = processExternalPayment(event);
// 4. Payment Status aktualisieren
payment.setStatus(result.isSuccessful() ?
PaymentStatus.COMPLETED : PaymentStatus.FAILED);
payment.setGatewayTransactionId(result.getTransactionId());
paymentRepository.save(payment);
// 5. Follow-up Event senden
if (result.isSuccessful()) {
publishPaymentProcessedEvent(event.getOrderId(), result);
}
} catch (Exception e) {
log.error("Error processing order payment for {}", event.getOrderId(), e);
throw e; // Rollback der Transaktion
}
}
// Separate Methode ohne @Transactional für externe Calls
private PaymentResult processExternalPayment(OrderPlacedEvent event) {
// Externe Service-Calls sollten nicht in DB-Transaktion laufen
return paymentGateway.processPayment(createPaymentRequest(event));
}
}Die Spring Boot Integration macht Event-Consumer-Implementierung durch deklarative Programmierung sehr effizient. Die Kombination aus Annotations, automatischer Konfiguration und umfassendem Error-Handling reduziert den Code-Aufwand erheblich und ermöglicht es Entwicklern, sich auf die Geschäftslogik zu konzentrieren.
Im Kapitel „Eventverarbeitung mit Python“ betrachten wir entsprechende Implementierungen mit asyncio-basierten Ansätzen, während das Kapitel „Manuelles vs. automatisches Offset-Handling“ die Details der Message-Bestätigung vertieft.