Spring Boot bietet umfassende Unterstützung für Apache Kafka durch das Spring Kafka Projekt. Die Integration ermöglicht es, Events mit minimalem Konfigurationsaufwand zu produzieren und dabei von Springs Dependency Injection, Transaktionsmanagement und Auto-Configuration zu profitieren.
Das KafkaTemplate ist die zentrale Abstraktion in Spring Kafka für die Event-Produktion. Es kapselt die Kafka Producer API und integriert sich nahtlos in Springs Application Context.
Spring Boot bietet umfangreiche Auto-Configuration für Kafka Producer. Für die meisten Anwendungsfälle reichen Properties-basierte Konfigurationen aus, während komplexere Szenarien Custom Configuration erfordern.
Properties-basierte Konfiguration:
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
retries: 3
batch-size: 16384
linger-ms: 5
buffer-memory: 33554432
enable-idempotence: true
transaction-id-prefix: order-service-
app:
events:
topics:
order-placed: order.placed.v1
payment-processed: payment.processed.v1
inventory-reserved: inventory.reserved.v1Custom Producer Configuration:
@Configuration
@EnableKafka
@EnableConfigurationProperties(EventProperties.class)
public class KafkaProducerConfig {
private final EventProperties eventProperties;
public KafkaProducerConfig(EventProperties eventProperties) {
this.eventProperties = eventProperties;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// Basic Configuration
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
eventProperties.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
// Performance Tuning
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// Idempotence and Transactions
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
eventProperties.getTransactionIdPrefix() +
UUID.randomUUID().toString());
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
// Default Topic für convenience
template.setDefaultTopic(eventProperties.getTopics().getOrderPlaced());
// Observability
template.setProducerInterceptors(
Arrays.asList(new MetricsProducerInterceptor())
);
return template;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
manager.setTransactionSynchronization(
AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION
);
return manager;
}
}Configuration Properties:
@ConfigurationProperties(prefix = "app.events")
@Data
public class EventProperties {
private String bootstrapServers = "localhost:9092";
private String transactionIdPrefix = "order-service-";
private Topics topics = new Topics();
@Data
public static class Topics {
private String orderPlaced = "order.placed.v1";
private String paymentProcessed = "payment.processed.v1";
private String inventoryReserved = "inventory.reserved.v1";
}
}Für erweiterte Anwendungsfälle können multiple Producer Factories konfiguriert werden, beispielsweise für verschiedene Serialisierungsformate oder Performance-Profile.
@Configuration
public class MultiProducerConfig {
@Bean("stringProducerFactory")
public ProducerFactory<String, String> stringProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "1"); // Faster for non-critical events
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean("avroProducerFactory")
public ProducerFactory<String, Object> avroProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
configProps.put("schema.registry.url", "http://localhost:8081");
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // Highest consistency for critical events
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean("stringKafkaTemplate")
public KafkaTemplate<String, String> stringKafkaTemplate(
@Qualifier("stringProducerFactory") ProducerFactory<String, String> factory) {
return new KafkaTemplate<>(factory);
}
@Bean("avroKafkaTemplate")
public KafkaTemplate<String, Object> avroKafkaTemplate(
@Qualifier("avroProducerFactory") ProducerFactory<String, Object> factory) {
return new KafkaTemplate<>(factory);
}
}Spring Kafka unterstützt sowohl synchrone als auch asynchrone Event-Produktion. Die Wahl zwischen beiden Ansätzen hat erhebliche Auswirkungen auf Performance, Fehlerbehandlung und Anwendungsarchitektur.
Bei synchroner Produktion blockiert der aufrufende Thread, bis die Event-Übertragung bestätigt wird. Dies garantiert sofortige Fehlererkennung, kann aber die Anwendungsperformance beeinträchtigen.
@Service
@Transactional
public class SynchronousOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final EventProperties eventProperties;
public Order createOrder(CreateOrderRequest request) {
// 1. Geschäftslogik
Order order = new Order(request);
Order savedOrder = orderRepository.save(order);
// 2. Synchrone Event-Publikation
OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
try {
SendResult<String, Object> result = kafkaTemplate
.send(eventProperties.getTopics().getOrderPlaced(),
savedOrder.getOrderId(),
event)
.get(5, TimeUnit.SECONDS); // Timeout nach 5 Sekunden
log.info("Order event published successfully: {}",
result.getRecordMetadata());
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.error("Failed to publish order event for order: {}",
savedOrder.getOrderId(), e);
// Geschäftsentscheidung: Rollback oder Compensation
throw new EventPublishingException("Event publishing failed", e);
}
return savedOrder;
}
public void cancelOrder(String orderId) {
Order order = findOrderById(orderId);
order.cancel();
orderRepository.save(order);
// Synchrone Publikation mit Fehlerbehandlung
OrderCancelledEvent event = OrderCancelledEvent.from(order);
publishEventSynchronously(eventProperties.getTopics().getOrderCancelled(),
orderId, event);
}
private void publishEventSynchronously(String topic, String key, Object event) {
try {
kafkaTemplate.send(topic, key, event).get(3, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Synchronous event publishing failed for topic: {}", topic, e);
throw new EventPublishingException("Critical event publishing failed", e);
}
}
}Asynchrone Produktion gibt die Kontrolle sofort an den aufrufenden Code zurück und behandelt Erfolg oder Fehler in Callbacks. Dies verbessert die Performance, erfordert aber sorgfältige Fehlerbehandlung.
@Service
@Transactional
public class AsynchronousOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
private final EventMetrics eventMetrics;
private final EventProperties eventProperties;
public Order createOrder(CreateOrderRequest request) {
// 1. Geschäftslogik
Order order = new Order(request);
Order savedOrder = orderRepository.save(order);
// 2. Asynchrone Event-Publikation
OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
publishEventAsync(eventProperties.getTopics().getOrderPlaced(),
savedOrder.getOrderId(), event);
return savedOrder;
}
private void publishEventAsync(String topic, String key, Object event) {
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(topic, key, event);
// Success Callback
future.addCallback(
result -> {
eventMetrics.incrementSuccessCounter(topic);
log.debug("Event published successfully to topic: {} with offset: {}",
topic, result.getRecordMetadata().offset());
},
failure -> {
eventMetrics.incrementErrorCounter(topic);
handleAsyncPublishingFailure(topic, key, event, failure);
}
);
}
private void handleAsyncPublishingFailure(String topic, String key, Object event, Throwable failure) {
log.error("Async event publishing failed for topic: {} with key: {}",
topic, key, failure);
// Fehlerklassifizierung
if (isRetryableError(failure)) {
scheduleRetry(topic, key, event);
} else {
sendToDeadLetter(topic, key, event, failure);
}
// Monitoring und Alerting
eventMetrics.recordPublishingError(topic, failure.getClass().getSimpleName());
}
private void scheduleRetry(String topic, String key, Object event) {
// Implementierung mit @Retryable oder manueller Retry-Logic
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
.execute(() -> publishEventAsync(topic, key, event));
}
}Moderne Spring Kafka Versionen unterstützen CompletableFuture für elegantere asynchrone Verarbeitung.
@Service
public class ModernAsyncOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
public CompletableFuture<Order> createOrderAsync(CreateOrderRequest request) {
return CompletableFuture
.supplyAsync(() -> {
// Geschäftslogik
Order order = new Order(request);
return orderRepository.save(order);
})
.thenCompose(savedOrder -> {
// Event-Publikation
OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
return publishEventCompletable(
"order.placed.v1",
savedOrder.getOrderId(),
event
).thenApply(result -> savedOrder);
});
}
private CompletableFuture<SendResult<String, Object>> publishEventCompletable(
String topic, String key, Object event) {
CompletableFuture<SendResult<String, Object>> kafkaFuture =
kafkaTemplate.send(topic, key, event).completable();
return kafkaFuture
.whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("Event publishing failed", throwable);
} else {
log.debug("Event published to offset: {}",
result.getRecordMetadata().offset());
}
});
}
}Spring Kafka integriert sich nahtlos in Springs Transaktionsmanagement. Dies ermöglicht atomische Operationen zwischen Datenbank-Updates und Event-Publikation.
@Service
@Transactional
public class TransactionalOrderService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
@KafkaTransactional
public Order createOrderWithTransactionalEvents(CreateOrderRequest request) {
// 1. Datenbank-Operation (in JPA Transaction)
Order order = new Order(request);
Order savedOrder = orderRepository.save(order);
// 2. Kafka Transaction (koordiniert mit JPA Transaction)
OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
kafkaTemplate.send("order.placed.v1", savedOrder.getOrderId(), event);
// 3. Additional Events in same transaction
publishInventoryReservationRequest(savedOrder);
publishPaymentRequest(savedOrder);
return savedOrder;
}
private void publishInventoryReservationRequest(Order order) {
InventoryReservationRequestedEvent event =
InventoryReservationRequestedEvent.from(order);
kafkaTemplate.send("inventory.reservation.requested.v1",
order.getOrderId(), event);
}
private void publishPaymentRequest(Order order) {
PaymentRequestedEvent event = PaymentRequestedEvent.from(order);
kafkaTemplate.send("payment.requested.v1", order.getOrderId(), event);
}
@KafkaTransactional
public void processOrderWorkflow(String orderId) {
try {
// Multiple events in single transaction
kafkaTemplate.executeInTransaction(template -> {
Order order = findOrderById(orderId);
// Event chain publishing
template.send("order.processing.started.v1", orderId,
OrderProcessingStartedEvent.from(order));
if (order.requiresInventoryCheck()) {
template.send("inventory.check.requested.v1", orderId,
InventoryCheckRequestedEvent.from(order));
}
if (order.requiresPaymentProcessing()) {
template.send("payment.processing.requested.v1", orderId,
PaymentProcessingRequestedEvent.from(order));
}
return null; // executeInTransaction requires return value
});
} catch (Exception e) {
log.error("Transactional event publishing failed for order: {}", orderId, e);
throw new EventPublishingException("Transaction failed", e);
}
}
}Für komplexere Transaktionsszenarien können mehrere Transaction Manager koordiniert werden.
@Configuration
public class TransactionManagerConfig {
@Bean
@Primary
public PlatformTransactionManager chainedTransactionManager(
DataSourceTransactionManager dbTransactionManager,
KafkaTransactionManager kafkaTransactionManager) {
return new ChainedTransactionManager(
kafkaTransactionManager, // Kafka zuerst für bessere Fehlerbehandlung
dbTransactionManager // JPA/JDBC danach
);
}
@Bean
public DataSourceTransactionManager dbTransactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
@Service
public class ChainedTransactionOrderService {
@Transactional("chainedTransactionManager")
public Order createOrderWithChainedTransactions(CreateOrderRequest request) {
// Beide Transaktionen werden koordiniert
Order order = new Order(request);
Order savedOrder = orderRepository.save(order); // JPA Transaction
OrderPlacedEvent event = OrderPlacedEvent.from(savedOrder);
kafkaTemplate.send("order.placed.v1", savedOrder.getOrderId(), event); // Kafka Transaction
return savedOrder;
}
}Die folgenden Beispiele zeigen realistische Implementierungen für typische E-Commerce-Szenarien.
@Component
public class OrderEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final EventProperties eventProperties;
private final EventMetrics eventMetrics;
private final RetryTemplate retryTemplate;
public OrderEventPublisher(KafkaTemplate<String, Object> kafkaTemplate,
EventProperties eventProperties,
EventMetrics eventMetrics) {
this.kafkaTemplate = kafkaTemplate;
this.eventProperties = eventProperties;
this.eventMetrics = eventMetrics;
this.retryTemplate = createRetryTemplate();
}
public void publishOrderPlaced(Order order) {
OrderPlacedEvent event = OrderPlacedEvent.builder()
.eventId(UUID.randomUUID().toString())
.eventType("OrderPlaced")
.timestamp(Instant.now())
.version("v1")
.orderId(order.getOrderId())
.customerId(order.getCustomerId())
.items(order.getItems().stream()
.map(OrderItemDto::from)
.collect(Collectors.toList()))
.totalAmount(order.getTotalAmount())
.currency(order.getCurrency())
.shippingAddress(AddressDto.from(order.getShippingAddress()))
.build();
publishEventWithRetry(eventProperties.getTopics().getOrderPlaced(),
order.getOrderId(), event);
}
public void publishOrderCancelled(Order order, String reason) {
OrderCancelledEvent event = OrderCancelledEvent.builder()
.eventId(UUID.randomUUID().toString())
.eventType("OrderCancelled")
.timestamp(Instant.now())
.version("v1")
.orderId(order.getOrderId())
.customerId(order.getCustomerId())
.reason(reason)
.originalAmount(order.getTotalAmount())
.build();
publishEventWithRetry(eventProperties.getTopics().getOrderCancelled(),
order.getOrderId(), event);
}
private void publishEventWithRetry(String topic, String key, Object event) {
retryTemplate.execute(context -> {
try {
SendResult<String, Object> result = kafkaTemplate
.send(topic, key, event)
.get(5, TimeUnit.SECONDS);
eventMetrics.incrementSuccessCounter(topic);
eventMetrics.recordPublishLatency(topic,
System.currentTimeMillis() - context.getAttribute("startTime"));
log.debug("Event published successfully: topic={}, key={}, offset={}",
topic, key, result.getRecordMetadata().offset());
return result;
} catch (Exception e) {
eventMetrics.incrementErrorCounter(topic);
log.warn("Event publishing attempt failed: topic={}, key={}, attempt={}",
topic, key, context.getRetryCount(), e);
throw new EventPublishingException("Event publishing failed", e);
}
});
}
private RetryTemplate createRetryTemplate() {
RetryTemplate template = new RetryTemplate();
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000L); // 1 second
template.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
template.setRetryPolicy(retryPolicy);
return template;
}
}@Component
public class CorrelatedEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final Tracer tracer;
public void publishWithCorrelation(String topic, String key, Object event,
String correlationId) {
Span span = tracer.nextSpan()
.name("event-publishing")
.tag("topic", topic)
.tag("correlation.id", correlationId)
.start();
try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
// Correlation-ID zu Event hinzufügen
if (event instanceof CorrelatedEvent) {
((CorrelatedEvent) event).setCorrelationId(correlationId);
}
// Tracing Headers hinzufügen
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, event);
TraceContext.Injector<ProducerRecord<String, Object>> injector =
tracing.propagation().injector(ProducerRecord::headers);
injector.inject(span.context(), record);
kafkaTemplate.send(record).addCallback(
result -> {
span.tag("status", "success");
span.tag("offset", String.valueOf(result.getRecordMetadata().offset()));
span.end();
},
failure -> {
span.tag("status", "error");
span.tag("error", failure.getMessage());
span.end();
}
);
} catch (Exception e) {
span.tag("status", "error");
span.tag("error", e.getMessage());
span.end();
throw e;
}
}
}@Service
@Slf4j
public class ProductionEventService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final MeterRegistry meterRegistry;
private final EventProperties eventProperties;
private final Counter successfulEvents;
private final Counter failedEvents;
private final Timer publishingLatency;
public ProductionEventService(KafkaTemplate<String, Object> kafkaTemplate,
MeterRegistry meterRegistry,
EventProperties eventProperties) {
this.kafkaTemplate = kafkaTemplate;
this.meterRegistry = meterRegistry;
this.eventProperties = eventProperties;
// Metrics initialization
this.successfulEvents = Counter.builder("events.published.success")
.description("Number of successfully published events")
.register(meterRegistry);
this.failedEvents = Counter.builder("events.published.failed")
.description("Number of failed event publications")
.register(meterRegistry);
this.publishingLatency = Timer.builder("events.publishing.latency")
.description("Event publishing latency")
.register(meterRegistry);
}
public CompletableFuture<Void> publishOrderEvent(OrderEvent event) {
return Timer.Sample.start(meterRegistry)
.stop(publishingLatency)
.thenCompose(sample -> {
String topic = determineTopicFromEvent(event);
String key = event.getOrderId();
return kafkaTemplate.send(topic, key, event)
.completable()
.thenRun(() -> {
successfulEvents.increment();
log.info("Order event published: type={}, orderId={}",
event.getEventType(), event.getOrderId());
})
.exceptionally(throwable -> {
failedEvents.increment();
log.error("Failed to publish order event: type={}, orderId={}",
event.getEventType(), event.getOrderId(), throwable);
return null;
});
});
}
private String determineTopicFromEvent(OrderEvent event) {
switch (event.getEventType()) {
case "OrderPlaced":
return eventProperties.getTopics().getOrderPlaced();
case "OrderCancelled":
return eventProperties.getTopics().getOrderCancelled();
case "PaymentProcessed":
return eventProperties.getTopics().getPaymentProcessed();
default:
throw new IllegalArgumentException("Unknown event type: " + event.getEventType());
}
}
@EventListener
public void handleApplicationReady(ApplicationReadyEvent event) {
log.info("Production Event Service ready. Kafka template configured with {} properties",
kafkaTemplate.getProducerFactory().getConfigurationProperties().size());
}
}Diese Implementierungen zeigen realistische Ansätze für robuste Event-Produktion in Spring Boot Anwendungen. Sie berücksichtigen Performance, Fehlerbehandlung, Monitoring und Production-Requirements, während sie die Flexibilität der Spring Kafka Integration vollständig ausnutzen.
Status-Tracking: ✅ Kapitel “Event-Erzeugung mit Spring Boot” erstellt - Verwendet Standards v1.0 - [Datum] - Fokus auf KafkaTemplate, Transaktionen und Production-Ready Implementierungen