Event-Producer sind die Eingangstore in eine Event-Driven Architecture. Sie wandeln Geschäftsereignisse in Events um und stellen diese anderen Systemkomponenten zur Verfügung. Ein gut gestalteter Producer folgt klaren architektonischen Prinzipien, die Wartbarkeit, Testbarkeit und Fehlertoleranz gewährleisten.
Ein Producer sollte klar getrennte Verantwortlichkeiten haben, um Komplexität zu reduzieren und Änderbarkeit zu fördern. Die Kernverantwortlichkeiten lassen sich in drei Schichten aufteilen:
Geschäftslogik-Schicht: Hier findet die eigentliche Fachlogik statt. Im E-Commerce-Beispiel verarbeitet der OrderService Bestellungen, validiert Daten und führt Geschäftsregeln aus. Diese Schicht weiß nichts von Events oder deren Transport.
Event-Abstraktion-Schicht: Diese Zwischenschicht übersetzt Geschäftsereignisse in Events. Sie definiert, welche Informationen ein Event enthalten soll, wie es strukturiert wird und wann es ausgelöst werden soll.
Transport-Schicht: Die unterste Schicht kümmert sich um die technische Übertragung der Events an den Broker. Sie handhabt Serialisierung, Retry-Mechanismen und Fehlerbehandlung.
Spring Boot Implementierung:
// Geschäftslogik-Schicht
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final OrderEventPublisher eventPublisher;
public Order createOrder(CreateOrderRequest request) {
Order order = new Order(request);
validateOrder(order);
Order savedOrder = orderRepository.save(order);
eventPublisher.publishOrderPlaced(savedOrder);
return savedOrder;
}
}
// Event-Abstraktion-Schicht
@Component
public class OrderEventPublisher {
private final EventProducer eventProducer;
public void publishOrderPlaced(Order order) {
OrderPlacedEvent event = OrderPlacedEvent.from(order);
eventProducer.send("order.placed.v1", event);
}
}
// Transport-Schicht
@Component
public class EventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic, Object event) {
kafkaTemplate.send(topic, event);
}
}Python Implementierung:
# Geschäftslogik-Schicht
class OrderService:
def __init__(self, repository: OrderRepository,
event_publisher: OrderEventPublisher):
self.repository = repository
self.event_publisher = event_publisher
def create_order(self, request: CreateOrderRequest) -> Order:
order = Order.from_request(request)
self.validate_order(order)
saved_order = self.repository.save(order)
self.event_publisher.publish_order_placed(saved_order)
return saved_order
# Event-Abstraktion-Schicht
class OrderEventPublisher:
def __init__(self, event_producer: EventProducer):
self.event_producer = event_producer
def publish_order_placed(self, order: Order):
event = OrderPlacedEvent.from_order(order)
self.event_producer.send("order.placed.v1", event)
# Transport-Schicht
class EventProducer:
def __init__(self, kafka_producer):
self.kafka_producer = kafka_producer
def send(self, topic: str, event: Any):
self.kafka_producer.produce(topic, event)Diese Trennung ermöglicht es, jede Schicht unabhängig zu testen und zu ändern. Die Geschäftslogik bleibt frei von Infrastructure-Code, während die Transport-Schicht ausgetauscht werden kann, ohne die Fachlogik zu beeinflussen.
Producer-Komponenten benötigen saubere Konfiguration und Lifecycle-Management, um in verschiedenen Umgebungen zuverlässig zu funktionieren.
Die Konfiguration sollte umgebungsspezifisch und zur Laufzeit änderbar sein. Wichtige Konfigurationsaspekte sind Broker-Verbindungen, Topic-Namen, Serialisierung und Retry-Verhalten.
| Konfigurationsbereich | Spring Boot | Python |
|---|---|---|
| Broker-Verbindung | spring.kafka.bootstrap-servers |
bootstrap.servers |
| Producer-Properties | spring.kafka.producer.* |
Producer-Config Dict |
| Topic-Mapping | @Value oder Config-Klassen |
Environment Variables |
| Retry-Verhalten | spring.kafka.producer.retries |
Producer-Config |
Spring Boot Konfiguration:
@ConfigurationProperties(prefix = "app.events")
@Component
public class EventConfig {
private String orderTopic = "order.placed.v1";
private int retryAttempts = 3;
private Duration retryDelay = Duration.ofSeconds(1);
// Getters und Setters
}
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
}Python Konfiguration:
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class EventConfig:
order_topic: str = "order.placed.v1"
retry_attempts: int = 3
retry_delay_seconds: int = 1
@classmethod
def from_env(cls):
return cls(
order_topic=os.getenv("ORDER_TOPIC", cls.order_topic),
retry_attempts=int(os.getenv("RETRY_ATTEMPTS", cls.retry_attempts))
)
class ProducerFactory:
def __init__(self, config: EventConfig):
self.config = config
def create_producer(self) -> Producer:
producer_config = {
'bootstrap.servers': os.getenv('KAFKA_BROKERS', 'localhost:9092'),
'key.serializer': 'StringSerializer',
'value.serializer': 'JSONSerializer'
}
return Producer(producer_config)Producer-Komponenten müssen sauber gestartet und beendet werden. Spring Boot bietet hier elegante Lifecycle-Hooks, während Python explizitere Verwaltung erfordert.
Spring Boot Lifecycle:
@Component
public class EventProducerLifecycle implements ApplicationListener<ContextRefreshedEvent> {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// Producer ist bereit für Event-Versendung
log.info("Event Producer initialized successfully");
}
@PreDestroy
public void cleanup() {
// Ausstehende Events abarbeiten
kafkaTemplate.flush();
log.info("Event Producer shutdown completed");
}
}Python Lifecycle:
class EventProducerManager:
def __init__(self):
self.producer = None
self.is_started = False
async def start(self):
if self.is_started:
return
self.producer = await self.create_producer()
self.is_started = True
logger.info("Event Producer started successfully")
async def stop(self):
if not self.is_started:
return
if self.producer:
self.producer.flush()
self.producer.close()
self.is_started = False
logger.info("Event Producer stopped successfully")Robuste Producer-Komponenten benötigen umfassendes Error Handling und Monitoring. Fehler können auf verschiedenen Ebenen auftreten: bei der Event-Erstellung, der Serialisierung oder der Übertragung.
Events können aus verschiedenen Gründen nicht erfolgreich versendet werden. Eine systematische Fehlerbehandlung kategorisiert Fehler und definiert angemessene Reaktionen.
Transiente Fehler: Netzwerkprobleme, Broker-Überlastung oder temporäre Connectivity-Issues. Diese Fehler sollten mit Retry-Mechanismen behandelt werden.
Permanente Fehler: Serialisierungsfehler, ungültige Topic-Namen oder Autorisierungsprobleme. Diese Fehler sollten sofort erkannt und geloggt werden, ohne Retry-Versuche.
Business-Fehler: Validierungsfehler oder fachliche Probleme. Diese sollten bereits vor dem Event-Versand erkannt werden.
Spring Boot Error Handling:
@Component
public class RobustEventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final EventMetrics metrics;
public void sendEvent(String topic, Object event) {
try {
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(topic, event);
future.addCallback(
result -> {
metrics.incrementSuccessCounter(topic);
log.debug("Event sent successfully to topic: {}", topic);
},
failure -> {
metrics.incrementErrorCounter(topic);
handleSendFailure(topic, event, failure);
}
);
} catch (Exception e) {
metrics.incrementErrorCounter(topic);
log.error("Failed to send event to topic: {}", topic, e);
throw new EventPublishingException("Failed to publish event", e);
}
}
private void handleSendFailure(String topic, Object event, Throwable failure) {
if (isRetryableError(failure)) {
scheduleRetry(topic, event);
} else {
log.error("Permanent failure sending event to topic: {}", topic, failure);
sendToDeadLetter(topic, event, failure);
}
}
}Python Error Handling:
import asyncio
from typing import Optional
class RobustEventProducer:
def __init__(self, producer: Producer, metrics: EventMetrics):
self.producer = producer
self.metrics = metrics
async def send_event(self, topic: str, event: Any) -> bool:
try:
future = self.producer.produce(
topic,
value=json.dumps(event),
callback=self._delivery_callback
)
self.producer.poll(0) # Trigger delivery
self.metrics.increment_attempt_counter(topic)
return True
except Exception as e:
self.metrics.increment_error_counter(topic)
await self._handle_send_failure(topic, event, e)
return False
def _delivery_callback(self, err: Optional[Exception], msg):
if err:
self.metrics.increment_error_counter(msg.topic())
logger.error(f"Failed to deliver event: {err}")
else:
self.metrics.increment_success_counter(msg.topic())
logger.debug(f"Event delivered to {msg.topic()}")Effective Monitoring gibt Einblick in die Producer-Performance und hilft bei der frühzeitigen Fehlererkennung.
Wichtige Metriken für Producer:
| Metrik | Bedeutung | Kritische Schwellwerte |
|---|---|---|
| Event Send Rate | Events pro Sekunde | Baseline ± 20% |
| Error Rate | Fehlgeschlagene Sends | > 1% |
| Send Latency | Zeit bis Bestätigung | > 100ms |
| Retry Count | Anzahl Wiederholungen | > 3 pro Event |
Spring Boot Monitoring:
@Component
public class EventMetrics {
private final MeterRegistry meterRegistry;
private final Counter successCounter;
private final Counter errorCounter;
private final Timer sendLatency;
public EventMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.successCounter = Counter.builder("events.sent.success")
.register(meterRegistry);
this.errorCounter = Counter.builder("events.sent.error")
.register(meterRegistry);
this.sendLatency = Timer.builder("events.send.latency")
.register(meterRegistry);
}
public void recordSendLatency(Duration duration) {
sendLatency.record(duration);
}
}Python Monitoring:
from prometheus_client import Counter, Histogram, start_http_server
class EventMetrics:
def __init__(self):
self.success_counter = Counter(
'events_sent_success_total',
'Total successful events sent',
['topic']
)
self.error_counter = Counter(
'events_sent_error_total',
'Total failed events',
['topic', 'error_type']
)
self.send_latency = Histogram(
'events_send_latency_seconds',
'Event send latency',
['topic']
)
def increment_success_counter(self, topic: str):
self.success_counter.labels(topic=topic).inc()
def record_send_latency(self, topic: str, latency: float):
self.send_latency.labels(topic=topic).observe(latency)Diese Monitoring-Ansätze ermöglichen es, Producer-Verhalten zu verstehen und proaktiv auf Probleme zu reagieren. Die Kombination aus strukturiertem Error Handling und detailliertem Monitoring bildet das Fundament für zuverlässige Event-Produktion in produktiven Umgebungen.
Status-Tracking: ✅ Kapitel “Architektur einer Producer-Komponente” erstellt - Verwendet Standards v1.0 - [Datum] - Fokus auf Schichtentrennung und Robustheit