22 Event Production with Spring Boot

Spring Boot offers comprehensive support for Apache Kafka through the Spring for Apache Kafka project. The integration makes it possible to produce events with minimal configuration effort while benefiting from Spring's dependency injection, transaction management, and auto-configuration.

22.1 KafkaTemplate Configuration

The KafkaTemplate is the central abstraction for event production in Spring Kafka. It wraps the Kafka producer API and integrates seamlessly into Spring's application context.
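With auto-configuration in place, the template can simply be injected and used. A minimal sketch of the usage pattern (class and topic names here are illustrative, not from a real project):

@Service
public class GreetingPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public GreetingPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void publishGreeting(String userId) {
        // Fire-and-forget send: topic, key, payload
        kafkaTemplate.send("demo.greeting.v1", userId, "Hello, " + userId);
    }
}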

22.1.1 Auto-Configuration vs. Custom Configuration

Spring Boot provides extensive auto-configuration for Kafka producers. For most use cases, properties-based configuration is sufficient, while more complex scenarios call for custom configuration.

Properties-based configuration:

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      transaction-id-prefix: order-service-
      properties:
        # linger.ms and enable.idempotence are not first-class Spring Boot
        # properties; they must go into the generic properties map
        linger.ms: 5
        enable.idempotence: true

app:
  events:
    topics:
      order-placed: order.placed.v1
      order-cancelled: order.cancelled.v1
      payment-processed: payment.processed.v1
      inventory-reserved: inventory.reserved.v1

Custom Producer Configuration:

@Configuration
@EnableKafka
@EnableConfigurationProperties(EventProperties.class)
public class KafkaProducerConfig {
    
    private final EventProperties eventProperties;
    
    public KafkaProducerConfig(EventProperties eventProperties) {
        this.eventProperties = eventProperties;
    }
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        
        // Basic Configuration
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                       eventProperties.getBootstrapServers());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        
        // Performance Tuning
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        
        // Idempotence and Transactions
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        DefaultKafkaProducerFactory<String, Object> factory = 
            new DefaultKafkaProducerFactory<>(configProps);
        // The factory appends a unique suffix per producer to this prefix
        factory.setTransactionIdPrefix(eventProperties.getTransactionIdPrefix());
        return factory;
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        
        // Default topic for the sendDefault(...) convenience methods
        template.setDefaultTopic(eventProperties.getTopics().getOrderPlaced());
        
        // Observability: a ProducerListener receives a callback for every
        // send result (LoggingProducerListener is Spring Kafka's default)
        template.setProducerListener(new LoggingProducerListener<>());
        
        return template;
    }
    
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        KafkaTransactionManager<String, Object> manager = 
            new KafkaTransactionManager<>(producerFactory());
        manager.setTransactionSynchronization(
            AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
        );
        return manager;
    }
}
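One detail worth knowing about JsonSerializer: by default it writes Java type information into record headers so that consumers can deserialize into the original class. If consumers should not depend on the producer's class names, this can be switched off in the config map shown above (a one-line sketch):

        // Optional: suppress Java type information in record headers
        configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);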

Configuration Properties:

@ConfigurationProperties(prefix = "app.events")
@Data
public class EventProperties {
    private String bootstrapServers = "localhost:9092";
    private String transactionIdPrefix = "order-service-";
    private Topics topics = new Topics();
    
    @Data
    public static class Topics {
        private String orderPlaced = "order.placed.v1";
        private String orderCancelled = "order.cancelled.v1";
        private String paymentProcessed = "payment.processed.v1";
        private String inventoryReserved = "inventory.reserved.v1";
    }
}

22.1.2 Producer Factory Customization

For advanced use cases, multiple producer factories can be configured, for example for different serialization formats or performance profiles.

@Configuration
public class MultiProducerConfig {
    
    @Bean("stringProducerFactory")
    public ProducerFactory<String, String> stringProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "1"); // Faster for non-critical events
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean("avroProducerFactory")  
    public ProducerFactory<String, Object> avroProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        configProps.put("schema.registry.url", "http://localhost:8081");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // Highest consistency for critical events
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean("stringKafkaTemplate")
    public KafkaTemplate<String, String> stringKafkaTemplate(
            @Qualifier("stringProducerFactory") ProducerFactory<String, String> factory) {
        return new KafkaTemplate<>(factory);
    }
    
    @Bean("avroKafkaTemplate")
    public KafkaTemplate<String, Object> avroKafkaTemplate(
            @Qualifier("avroProducerFactory") ProducerFactory<String, Object> factory) {
        return new KafkaTemplate<>(factory);
    }
}
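Consumers of these beans select the desired template via @Qualifier. A brief usage sketch (the NotificationService and its topic are hypothetical):

@Service
public class NotificationService {
    
    private final KafkaTemplate<String, String> stringTemplate;
    
    public NotificationService(
            @Qualifier("stringKafkaTemplate") KafkaTemplate<String, String> stringTemplate) {
        this.stringTemplate = stringTemplate;
    }
    
    public void notifyUser(String userId, String message) {
        // Non-critical notification: the acks=1 profile is sufficient here
        stringTemplate.send("user.notifications.v1", userId, message);
    }
}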

22.2 Synchronous vs. Asynchronous Production

Spring Kafka supports both synchronous and asynchronous event production. The choice between the two approaches has significant consequences for performance, error handling, and application architecture.

(Figure: producer flow in Spring Boot — sb_prod_flow.svg)

22.2.1 Synchronous Event Production

With synchronous production, the calling thread blocks until the broker acknowledges the event. This guarantees immediate error detection but can hurt application throughput.

@Service
@Transactional
@Slf4j
@RequiredArgsConstructor
public class SynchronousOrderService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final EventProperties eventProperties;
    
    public Order createOrder(CreateOrderRequest request) {
        // 1. Business logic
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order);
        
        // 2. Synchronous event publication
        OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
        
        try {
            SendResult<String, Object> result = kafkaTemplate
                .send(eventProperties.getTopics().getOrderPlaced(), 
                      savedOrder.getOrderId(), 
                      event)
                .get(5, TimeUnit.SECONDS); // time out after 5 seconds
            
            log.info("Order event published successfully: {}", 
                    result.getRecordMetadata());
            
        } catch (ExecutionException | TimeoutException e) {
            log.error("Failed to publish order event for order: {}", 
                     savedOrder.getOrderId(), e);
            
            // Business decision: roll back or compensate
            throw new EventPublishingException("Event publishing failed", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new EventPublishingException("Event publishing interrupted", e);
        }
        
        return savedOrder;
    }
    
    public void cancelOrder(String orderId) {
        Order order = findOrderById(orderId);
        order.cancel();
        orderRepository.save(order);
        
        // Synchronous publication with error handling
        OrderCancelledEvent event = OrderCancelledEvent.from(order);
        publishEventSynchronously(eventProperties.getTopics().getOrderCancelled(), 
                                 orderId, event);
    }
    
    private void publishEventSynchronously(String topic, String key, Object event) {
        try {
            kafkaTemplate.send(topic, key, event).get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("Synchronous event publishing failed for topic: {}", topic, e);
            throw new EventPublishingException("Critical event publishing failed", e);
        }
    }
}
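The EventPublishingException used here is an application-specific exception, not a Spring Kafka class. A minimal definition might look like this:

// Unchecked, so it triggers rollback of the surrounding @Transactional method
public class EventPublishingException extends RuntimeException {
    
    public EventPublishingException(String message, Throwable cause) {
        super(message, cause);
    }
}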

22.2.2 Asynchronous Event Production

Asynchronous production returns control to the caller immediately and handles success or failure in callbacks. This improves throughput but requires careful error handling. Note the API difference: in Spring Kafka 2.x, send() returns a ListenableFuture, as used below; since Spring Kafka 3.0 it returns a CompletableFuture directly (see section 22.2.3).

@Service
@Transactional
@Slf4j
@RequiredArgsConstructor
public class AsynchronousOrderService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    private final EventMetrics eventMetrics;
    private final EventProperties eventProperties;
    
    public Order createOrder(CreateOrderRequest request) {
        // 1. Business logic
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order);
        
        // 2. Asynchronous event publication
        OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
        publishEventAsync(eventProperties.getTopics().getOrderPlaced(), 
                         savedOrder.getOrderId(), event);
        
        return savedOrder;
    }
    
    private void publishEventAsync(String topic, String key, Object event) {
        ListenableFuture<SendResult<String, Object>> future = 
            kafkaTemplate.send(topic, key, event);
        
        // Success and failure callbacks
        future.addCallback(
            result -> {
                eventMetrics.incrementSuccessCounter(topic);
                log.debug("Event published successfully to topic: {} with offset: {}", 
                         topic, result.getRecordMetadata().offset());
            },
            failure -> {
                eventMetrics.incrementErrorCounter(topic);
                handleAsyncPublishingFailure(topic, key, event, failure);
            }
        );
    }
    
    private void handleAsyncPublishingFailure(String topic, String key, Object event, Throwable failure) {
        log.error("Async event publishing failed for topic: {} with key: {}", 
                 topic, key, failure);
        
        // Error classification: retryable vs. fatal
        if (isRetryableError(failure)) {
            scheduleRetry(topic, key, event);
        } else {
            sendToDeadLetter(topic, key, event, failure);
        }
        
        // Monitoring and alerting
        eventMetrics.recordPublishingError(topic, failure.getClass().getSimpleName());
    }
    
    private void scheduleRetry(String topic, String key, Object event) {
        // Sketch: a production implementation should cap the number of
        // attempts, e.g. with @Retryable or an explicit attempt counter
        CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
            .execute(() -> publishEventAsync(topic, key, event));
    }
}
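The EventMetrics helper is likewise application code, not part of Spring Kafka. A minimal Micrometer-based sketch that matches the calls used above:

@Component
public class EventMetrics {
    
    private final MeterRegistry registry;
    
    public EventMetrics(MeterRegistry registry) {
        this.registry = registry;
    }
    
    public void incrementSuccessCounter(String topic) {
        registry.counter("events.published.success", "topic", topic).increment();
    }
    
    public void incrementErrorCounter(String topic) {
        registry.counter("events.published.failed", "topic", topic).increment();
    }
    
    public void recordPublishingError(String topic, String errorType) {
        registry.counter("events.publishing.errors",
                "topic", topic, "error", errorType).increment();
    }
    
    public void recordPublishLatency(String topic, long millis) {
        registry.timer("events.publishing.latency", "topic", topic)
                .record(Duration.ofMillis(millis));
    }
}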

22.2.3 CompletableFuture-Based Approaches

More recent Spring Kafka versions support CompletableFuture for more elegant asynchronous processing: version 2.9 added a completable() adapter on the returned ListenableFuture, and since 3.0 send() returns a CompletableFuture directly.

@Service
@Slf4j
@RequiredArgsConstructor
public class ModernAsyncOrderService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    
    public CompletableFuture<Order> createOrderAsync(CreateOrderRequest request) {
        return CompletableFuture
            .supplyAsync(() -> {
                // Business logic (runs on the common ForkJoinPool)
                Order order = new Order(request);
                return orderRepository.save(order);
            })
            .thenCompose(savedOrder -> {
                // Event publication
                OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
                return publishEventCompletable(
                    "order.placed.v1", 
                    savedOrder.getOrderId(), 
                    event
                ).thenApply(result -> savedOrder);
            });
    }
    
    private CompletableFuture<SendResult<String, Object>> publishEventCompletable(
            String topic, String key, Object event) {
        
        // completable() is the Spring Kafka 2.9 adapter; from 3.0, send()
        // returns a CompletableFuture directly and the adapter is unnecessary
        CompletableFuture<SendResult<String, Object>> kafkaFuture = 
            kafkaTemplate.send(topic, key, event).completable();
        
        return kafkaFuture
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    log.error("Event publishing failed", throwable);
                } else {
                    log.debug("Event published to offset: {}", 
                             result.getRecordMetadata().offset());
                }
            });
    }
}
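Callers can then compose further work on the returned future without blocking (a hypothetical caller):

orderService.createOrderAsync(request)
    .thenAccept(order -> log.info("Order {} accepted", order.getOrderId()))
    .exceptionally(ex -> {
        log.error("Order creation failed", ex);
        return null;
    });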

22.3 Transaction Support

Spring Kafka integrates seamlessly with Spring's transaction management. This enables atomic operations spanning database updates and event publication.

22.3.1 Transactional Event Publishing

Spring Kafka does not ship a dedicated transaction annotation; a KafkaTemplate backed by a transactional producer factory participates in a regular @Transactional method via transaction synchronization:

@Service
@Transactional
@Slf4j
@RequiredArgsConstructor
public class TransactionalOrderService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    
    public Order createOrderWithTransactionalEvents(CreateOrderRequest request) {
        // 1. Database operation (inside the JPA transaction)
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order);
        
        // 2. Kafka sends join a Kafka transaction that Spring synchronizes
        //    with the surrounding JPA transaction (requires a transactional
        //    producer factory, i.e. a configured transaction-id-prefix)
        OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
        kafkaTemplate.send("order.placed.v1", savedOrder.getOrderId(), event);
        
        // 3. Additional events in the same transaction
        publishInventoryReservationRequest(savedOrder);
        publishPaymentRequest(savedOrder);
        
        return savedOrder;
    }
    
    private void publishInventoryReservationRequest(Order order) {
        InventoryReservationRequestedEvent event = 
            InventoryReservationRequestedEvent.from(order);
        kafkaTemplate.send("inventory.reservation.requested.v1", 
                          order.getOrderId(), event);
    }
    
    private void publishPaymentRequest(Order order) {
        PaymentRequestedEvent event = PaymentRequestedEvent.from(order);
        kafkaTemplate.send("payment.requested.v1", order.getOrderId(), event);
    }
    
    public void processOrderWorkflow(String orderId) {
        try {
            // Multiple events in single transaction
            kafkaTemplate.executeInTransaction(template -> {
                Order order = findOrderById(orderId);
                
                // Event chain publishing
                template.send("order.processing.started.v1", orderId, 
                             OrderProcessingStartedEvent.from(order));
                
                if (order.requiresInventoryCheck()) {
                    template.send("inventory.check.requested.v1", orderId,
                                 InventoryCheckRequestedEvent.from(order));
                }
                
                if (order.requiresPaymentProcessing()) {
                    template.send("payment.processing.requested.v1", orderId,
                                 PaymentProcessingRequestedEvent.from(order));
                }
                
                return null; // executeInTransaction requires return value
            });
            
        } catch (Exception e) {
            log.error("Transactional event publishing failed for order: {}", orderId, e);
            throw new EventPublishingException("Transaction failed", e);
        }
    }
}
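Transactional producers only pay off if downstream consumers filter out records from aborted transactions. With Spring Boot this is a one-line consumer setting (sketch; the default is read_uncommitted):

spring:
  kafka:
    consumer:
      isolation-level: read_committed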

22.3.2 ChainedTransactionManager for Multi-Resource Transactions

For more complex transaction scenarios, several transaction managers can be coordinated. Note that ChainedTransactionManager has been deprecated in recent Spring Data releases because it provides only best-effort ordering, not true two-phase commit; the transactional outbox pattern or the transaction synchronization shown above is generally preferred.

@Configuration
public class TransactionManagerConfig {
    
    @Bean
    @Primary
    public PlatformTransactionManager chainedTransactionManager(
            DataSourceTransactionManager dbTransactionManager,
            KafkaTransactionManager kafkaTransactionManager) {
        
        // Transactions start in the order given and commit in reverse order
        return new ChainedTransactionManager(
            kafkaTransactionManager,  // started first, committed last
            dbTransactionManager      // JPA/JDBC commits first
        );
    }
    
    @Bean
    public DataSourceTransactionManager dbTransactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
}

@Service
@RequiredArgsConstructor
public class ChainedTransactionOrderService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    
    @Transactional("chainedTransactionManager")
    public Order createOrderWithChainedTransactions(CreateOrderRequest request) {
        // Both transactions are coordinated by the chained manager
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order); // JPA transaction
        
        OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
        kafkaTemplate.send("order.placed.v1", savedOrder.getOrderId(), event); // Kafka Transaction
        
        return savedOrder;
    }
}

22.4 Practical Implementation Examples

The following examples show realistic implementations for typical e-commerce scenarios.

22.4.1 A Robust Order Event Publisher

@Component
@Slf4j
public class OrderEventPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final EventProperties eventProperties;
    private final EventMetrics eventMetrics;
    private final RetryTemplate retryTemplate;
    
    public OrderEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
                              EventProperties eventProperties,
                              EventMetrics eventMetrics) {
        this.kafkaTemplate = kafkaTemplate;
        this.eventProperties = eventProperties;
        this.eventMetrics = eventMetrics;
        this.retryTemplate = createRetryTemplate();
    }
    
    public void publishOrderPlaced(Order order) {
        OrderPlacedEvent event = OrderPlacedEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .eventType("OrderPlaced")
            .timestamp(Instant.now())
            .version("v1")
            .orderId(order.getOrderId())
            .customerId(order.getCustomerId())
            .items(order.getItems().stream()
                   .map(OrderItemDto::from)
                   .collect(Collectors.toList()))
            .totalAmount(order.getTotalAmount())
            .currency(order.getCurrency())
            .shippingAddress(AddressDto.from(order.getShippingAddress()))
            .build();
        
        publishEventWithRetry(eventProperties.getTopics().getOrderPlaced(), 
                             order.getOrderId(), event);
    }
    
    public void publishOrderCancelled(Order order, String reason) {
        OrderCancelledEvent event = OrderCancelledEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .eventType("OrderCancelled")
            .timestamp(Instant.now())
            .version("v1")
            .orderId(order.getOrderId())
            .customerId(order.getCustomerId())
            .reason(reason)
            .originalAmount(order.getTotalAmount())
            .build();
        
        publishEventWithRetry(eventProperties.getTopics().getOrderCancelled(), 
                             order.getOrderId(), event);
    }
    
    private void publishEventWithRetry(String topic, String key, Object event) {
        retryTemplate.execute(context -> {
            long startTime = System.currentTimeMillis();
            try {
                SendResult<String, Object> result = kafkaTemplate
                    .send(topic, key, event)
                    .get(5, TimeUnit.SECONDS);
                
                eventMetrics.incrementSuccessCounter(topic);
                eventMetrics.recordPublishLatency(topic, 
                    System.currentTimeMillis() - startTime);
                
                log.debug("Event published successfully: topic={}, key={}, offset={}", 
                         topic, key, result.getRecordMetadata().offset());
                
                return result;
                
            } catch (Exception e) {
                eventMetrics.incrementErrorCounter(topic);
                log.warn("Event publishing attempt failed: topic={}, key={}, attempt={}", 
                        topic, key, context.getRetryCount(), e);
                throw new EventPublishingException("Event publishing failed", e);
            }
        });
    }
    
    private RetryTemplate createRetryTemplate() {
        RetryTemplate template = new RetryTemplate();
        
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000L); // 1 second
        template.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        template.setRetryPolicy(retryPolicy);
        
        return template;
    }
}
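Wired into a service, the publisher keeps retry and metrics concerns out of the business code. A usage sketch (the surrounding service is hypothetical):

@Service
@RequiredArgsConstructor
public class OrderApplicationService {
    
    private final OrderRepository orderRepository;
    private final OrderEventPublisher orderEventPublisher;
    
    @Transactional
    public Order placeOrder(CreateOrderRequest request) {
        Order savedOrder = orderRepository.save(new Order(request));
        // Retried up to three times with a fixed 1-second back-off
        orderEventPublisher.publishOrderPlaced(savedOrder);
        return savedOrder;
    }
}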

22.4.2 Event Correlation and Tracing

The following publisher, based on the Brave tracing API, attaches a correlation ID to the event and propagates the trace context via record headers:

@Component
public class CorrelatedEventPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final Tracing tracing;
    private final Tracer tracer;
    
    public CorrelatedEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
                                    Tracing tracing) {
        this.kafkaTemplate = kafkaTemplate;
        this.tracing = tracing;
        this.tracer = tracing.tracer();
    }
    
    public void publishWithCorrelation(String topic, String key, Object event, 
                                      String correlationId) {
        Span span = tracer.nextSpan()
            .name("event-publishing")
            .tag("topic", topic)
            .tag("correlation.id", correlationId)
            .start();
        
        try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
            // Add the correlation ID to the event payload
            if (event instanceof CorrelatedEvent) {
                ((CorrelatedEvent) event).setCorrelationId(correlationId);
            }
            
            // Inject the trace context into the record headers (Brave API)
            ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, event);
            TraceContext.Injector<ProducerRecord<String, Object>> injector = 
                tracing.propagation().injector((carrier, headerKey, headerValue) ->
                    carrier.headers().add(headerKey, 
                        headerValue.getBytes(StandardCharsets.UTF_8)));
            injector.inject(span.context(), record);
            
            kafkaTemplate.send(record).addCallback(
                result -> {
                    span.tag("status", "success");
                    span.tag("offset", String.valueOf(result.getRecordMetadata().offset()));
                    span.finish();
                },
                failure -> {
                    span.tag("status", "error");
                    span.tag("error", failure.getMessage());
                    span.finish();
                }
            );
            
        } catch (Exception e) {
            span.tag("status", "error");
            span.tag("error", e.getMessage());
            span.finish();
            throw e;
        }
    }
}
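The CorrelatedEvent type referenced above is an application-level contract, not a library type. A minimal version:

// Events implementing this interface carry a correlation ID across services
public interface CorrelatedEvent {
    
    String getCorrelationId();
    
    void setCorrelationId(String correlationId);
}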

22.4.3 Production-Ready Event Service

@Service
@Slf4j
public class ProductionEventService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final MeterRegistry meterRegistry;
    private final EventProperties eventProperties;
    
    private final Counter successfulEvents;
    private final Counter failedEvents;
    private final Timer publishingLatency;
    
    public ProductionEventService(KafkaTemplate<String, Object> kafkaTemplate,
                                 MeterRegistry meterRegistry,
                                 EventProperties eventProperties) {
        this.kafkaTemplate = kafkaTemplate;
        this.meterRegistry = meterRegistry;
        this.eventProperties = eventProperties;
        
        // Metrics initialization
        this.successfulEvents = Counter.builder("events.published.success")
            .description("Number of successfully published events")
            .register(meterRegistry);
        
        this.failedEvents = Counter.builder("events.published.failed")
            .description("Number of failed event publications")
            .register(meterRegistry);
        
        this.publishingLatency = Timer.builder("events.publishing.latency")
            .description("Event publishing latency")
            .register(meterRegistry);
    }
    
    public CompletableFuture<Void> publishOrderEvent(OrderEvent event) {
        // Timer.Sample measures the elapsed time until the send completes
        Timer.Sample sample = Timer.start(meterRegistry);
        
        String topic = determineTopicFromEvent(event);
        String key = event.getOrderId();
        
        return kafkaTemplate.send(topic, key, event)
            .completable() // Spring Kafka 2.9; from 3.0, send() returns a CompletableFuture directly
            .thenRun(() -> {
                successfulEvents.increment();
                log.info("Order event published: type={}, orderId={}", 
                        event.getEventType(), event.getOrderId());
            })
            .exceptionally(throwable -> {
                failedEvents.increment();
                log.error("Failed to publish order event: type={}, orderId={}", 
                         event.getEventType(), event.getOrderId(), throwable);
                return null;
            })
            .whenComplete((ignored, throwable) -> sample.stop(publishingLatency));
    }
    
    private String determineTopicFromEvent(OrderEvent event) {
        switch (event.getEventType()) {
            case "OrderPlaced":
                return eventProperties.getTopics().getOrderPlaced();
            case "OrderCancelled":
                return eventProperties.getTopics().getOrderCancelled();
            case "PaymentProcessed":
                return eventProperties.getTopics().getPaymentProcessed();
            default:
                throw new IllegalArgumentException("Unknown event type: " + event.getEventType());
        }
    }
    
    @EventListener
    public void handleApplicationReady(ApplicationReadyEvent event) {
        log.info("Production Event Service ready. Kafka template configured with {} properties", 
                kafkaTemplate.getProducerFactory().getConfigurationProperties().size());
    }
}

These implementations demonstrate realistic approaches to robust event production in Spring Boot applications. They account for performance, error handling, monitoring, and production requirements while taking full advantage of the flexibility that the Spring Kafka integration offers.

