20 Architektur einer Producer-Komponente

Event-Producer sind die Eingangstore in eine Event-Driven Architecture. Sie wandeln Geschäftsereignisse in Events um und stellen diese anderen Systemkomponenten zur Verfügung. Ein gut gestalteter Producer folgt klaren architektonischen Prinzipien, die Wartbarkeit, Testbarkeit und Fehlertoleranz gewährleisten.

20.1 Separation of Concerns im Producer

Ein Producer sollte klar getrennte Verantwortlichkeiten haben, um Komplexität zu reduzieren und Änderbarkeit zu fördern. Die Kernverantwortlichkeiten lassen sich in drei Schichten aufteilen:

Geschäftslogik-Schicht: Hier findet die eigentliche Fachlogik statt. Im E-Commerce-Beispiel verarbeitet der OrderService Bestellungen, validiert Daten und führt Geschäftsregeln aus. Diese Schicht weiß nichts von Events oder deren Transport.

Event-Abstraktion-Schicht: Diese Zwischenschicht übersetzt Geschäftsereignisse in Events. Sie definiert, welche Informationen ein Event enthalten soll, wie es strukturiert wird und wann es ausgelöst werden soll.

Transport-Schicht: Die unterste Schicht kümmert sich um die technische Übertragung der Events an den Broker. Sie handhabt Serialisierung, Retry-Mechanismen und Fehlerbehandlung.

20.1.1 Praktische Umsetzung der Schichtentrennung

Spring Boot Implementierung:

// Geschäftslogik-Schicht
@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final OrderEventPublisher eventPublisher;
    
    public Order createOrder(CreateOrderRequest request) {
        Order order = new Order(request);
        validateOrder(order);
        
        Order savedOrder = orderRepository.save(order);
        eventPublisher.publishOrderPlaced(savedOrder);
        
        return savedOrder;
    }
}

// Event-Abstraktion-Schicht  
@Component
public class OrderEventPublisher {
    private final EventProducer eventProducer;
    
    public void publishOrderPlaced(Order order) {
        OrderPlacedEvent event = OrderPlacedEvent.from(order);
        eventProducer.send("order.placed.v1", event);
    }
}

// Transport-Schicht
@Component
public class EventProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public void send(String topic, Object event) {
        kafkaTemplate.send(topic, event);
    }
}

Python Implementierung:

# Geschäftslogik-Schicht
class OrderService:
    def __init__(self, repository: OrderRepository, 
                 event_publisher: OrderEventPublisher):
        self.repository = repository
        self.event_publisher = event_publisher
    
    def create_order(self, request: CreateOrderRequest) -> Order:
        order = Order.from_request(request)
        self.validate_order(order)
        
        saved_order = self.repository.save(order)
        self.event_publisher.publish_order_placed(saved_order)
        
        return saved_order

# Event-Abstraktion-Schicht
class OrderEventPublisher:
    def __init__(self, event_producer: EventProducer):
        self.event_producer = event_producer
    
    def publish_order_placed(self, order: Order):
        event = OrderPlacedEvent.from_order(order)
        self.event_producer.send("order.placed.v1", event)

# Transport-Schicht  
class EventProducer:
    def __init__(self, kafka_producer):
        self.kafka_producer = kafka_producer
    
    def send(self, topic: str, event: Any):
        self.kafka_producer.produce(topic, event)

Diese Trennung ermöglicht es, jede Schicht unabhängig zu testen und zu ändern. Die Geschäftslogik bleibt frei von Infrastructure-Code, während die Transport-Schicht ausgetauscht werden kann, ohne die Fachlogik zu beeinflussen.

20.2 Configuration und Lifecycle Management

Producer-Komponenten benötigen saubere Konfiguration und Lifecycle-Management, um in verschiedenen Umgebungen zuverlässig zu funktionieren.

20.2.1 Konfigurationsstrategie

Die Konfiguration sollte umgebungsspezifisch und zur Laufzeit änderbar sein. Wichtige Konfigurationsaspekte sind Broker-Verbindungen, Topic-Namen, Serialisierung und Retry-Verhalten.

Konfigurationsbereich Spring Boot Python
Broker-Verbindung spring.kafka.bootstrap-servers bootstrap.servers
Producer-Properties spring.kafka.producer.* Producer-Config Dict
Topic-Mapping @Value oder Config-Klassen Environment Variables
Retry-Verhalten spring.kafka.producer.retries Producer-Config

Spring Boot Konfiguration:

@ConfigurationProperties(prefix = "app.events")
@Component
public class EventConfig {
    private String orderTopic = "order.placed.v1";
    private int retryAttempts = 3;
    private Duration retryDelay = Duration.ofSeconds(1);
    
    // Getters und Setters
}

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
                       "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}

Python Konfiguration:

from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class EventConfig:
    order_topic: str = "order.placed.v1"
    retry_attempts: int = 3
    retry_delay_seconds: int = 1
    
    @classmethod
    def from_env(cls):
        return cls(
            order_topic=os.getenv("ORDER_TOPIC", cls.order_topic),
            retry_attempts=int(os.getenv("RETRY_ATTEMPTS", cls.retry_attempts))
        )

class ProducerFactory:
    def __init__(self, config: EventConfig):
        self.config = config
        
    def create_producer(self) -> Producer:
        producer_config = {
            'bootstrap.servers': os.getenv('KAFKA_BROKERS', 'localhost:9092'),
            'key.serializer': 'StringSerializer',
            'value.serializer': 'JSONSerializer'
        }
        return Producer(producer_config)

20.2.2 Lifecycle Management

Producer-Komponenten müssen sauber gestartet und beendet werden. Spring Boot bietet hier elegante Lifecycle-Hooks, während Python explizitere Verwaltung erfordert.

Spring Boot Lifecycle:

@Component
public class EventProducerLifecycle implements ApplicationListener<ContextRefreshedEvent> {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Producer ist bereit für Event-Versendung
        log.info("Event Producer initialized successfully");
    }
    
    @PreDestroy
    public void cleanup() {
        // Ausstehende Events abarbeiten
        kafkaTemplate.flush();
        log.info("Event Producer shutdown completed");
    }
}

Python Lifecycle:

class EventProducerManager:
    def __init__(self):
        self.producer = None
        self.is_started = False
        
    async def start(self):
        if self.is_started:
            return
            
        self.producer = await self.create_producer()
        self.is_started = True
        logger.info("Event Producer started successfully")
        
    async def stop(self):
        if not self.is_started:
            return
            
        if self.producer:
            self.producer.flush()
            self.producer.close()
        
        self.is_started = False
        logger.info("Event Producer stopped successfully")

20.3 Error Handling und Monitoring

Robuste Producer-Komponenten benötigen umfassendes Error Handling und Monitoring. Fehler können auf verschiedenen Ebenen auftreten: bei der Event-Erstellung, der Serialisierung oder der Übertragung.

20.3.1 Fehlerklassifizierung und -behandlung

Events können aus verschiedenen Gründen nicht erfolgreich versendet werden. Eine systematische Fehlerbehandlung kategorisiert Fehler und definiert angemessene Reaktionen.

Transiente Fehler: Netzwerkprobleme, Broker-Überlastung oder temporäre Connectivity-Issues. Diese Fehler sollten mit Retry-Mechanismen behandelt werden.

Permanente Fehler: Serialisierungsfehler, ungültige Topic-Namen oder Autorisierungsprobleme. Diese Fehler sollten sofort erkannt und geloggt werden, ohne Retry-Versuche.

Business-Fehler: Validierungsfehler oder fachliche Probleme. Diese sollten bereits vor dem Event-Versand erkannt werden.

20.3.2 Implementierung der Fehlerbehandlung

Spring Boot Error Handling:

@Component
public class RobustEventProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final EventMetrics metrics;
    
    public void sendEvent(String topic, Object event) {
        try {
            ListenableFuture<SendResult<String, Object>> future = 
                kafkaTemplate.send(topic, event);
                
            future.addCallback(
                result -> {
                    metrics.incrementSuccessCounter(topic);
                    log.debug("Event sent successfully to topic: {}", topic);
                },
                failure -> {
                    metrics.incrementErrorCounter(topic);
                    handleSendFailure(topic, event, failure);
                }
            );
        } catch (Exception e) {
            metrics.incrementErrorCounter(topic);
            log.error("Failed to send event to topic: {}", topic, e);
            throw new EventPublishingException("Failed to publish event", e);
        }
    }
    
    private void handleSendFailure(String topic, Object event, Throwable failure) {
        if (isRetryableError(failure)) {
            scheduleRetry(topic, event);
        } else {
            log.error("Permanent failure sending event to topic: {}", topic, failure);
            sendToDeadLetter(topic, event, failure);
        }
    }
}

Python Error Handling:

import asyncio
from typing import Optional

class RobustEventProducer:
    def __init__(self, producer: Producer, metrics: EventMetrics):
        self.producer = producer
        self.metrics = metrics
        
    async def send_event(self, topic: str, event: Any) -> bool:
        try:
            future = self.producer.produce(
                topic, 
                value=json.dumps(event),
                callback=self._delivery_callback
            )
            
            self.producer.poll(0)  # Trigger delivery
            self.metrics.increment_attempt_counter(topic)
            return True
            
        except Exception as e:
            self.metrics.increment_error_counter(topic)
            await self._handle_send_failure(topic, event, e)
            return False
    
    def _delivery_callback(self, err: Optional[Exception], msg):
        if err:
            self.metrics.increment_error_counter(msg.topic())
            logger.error(f"Failed to deliver event: {err}")
        else:
            self.metrics.increment_success_counter(msg.topic())
            logger.debug(f"Event delivered to {msg.topic()}")

20.3.3 Monitoring und Metriken

Effective Monitoring gibt Einblick in die Producer-Performance und hilft bei der frühzeitigen Fehlererkennung.

Wichtige Metriken für Producer:

Metrik Bedeutung Kritische Schwellwerte
Event Send Rate Events pro Sekunde Baseline ± 20%
Error Rate Fehlgeschlagene Sends > 1%
Send Latency Zeit bis Bestätigung > 100ms
Retry Count Anzahl Wiederholungen > 3 pro Event

Spring Boot Monitoring:

@Component
public class EventMetrics {
    private final MeterRegistry meterRegistry;
    private final Counter successCounter;
    private final Counter errorCounter;
    private final Timer sendLatency;
    
    public EventMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.successCounter = Counter.builder("events.sent.success")
            .register(meterRegistry);
        this.errorCounter = Counter.builder("events.sent.error")
            .register(meterRegistry);
        this.sendLatency = Timer.builder("events.send.latency")
            .register(meterRegistry);
    }
    
    public void recordSendLatency(Duration duration) {
        sendLatency.record(duration);
    }
}

Python Monitoring:

from prometheus_client import Counter, Histogram, start_http_server

class EventMetrics:
    def __init__(self):
        self.success_counter = Counter(
            'events_sent_success_total', 
            'Total successful events sent',
            ['topic']
        )
        self.error_counter = Counter(
            'events_sent_error_total',
            'Total failed events',
            ['topic', 'error_type']
        )
        self.send_latency = Histogram(
            'events_send_latency_seconds',
            'Event send latency',
            ['topic']
        )
    
    def increment_success_counter(self, topic: str):
        self.success_counter.labels(topic=topic).inc()
        
    def record_send_latency(self, topic: str, latency: float):
        self.send_latency.labels(topic=topic).observe(latency)

Diese Monitoring-Ansätze ermöglichen es, Producer-Verhalten zu verstehen und proaktiv auf Probleme zu reagieren. Die Kombination aus strukturiertem Error Handling und detailliertem Monitoring bildet das Fundament für zuverlässige Event-Produktion in produktiven Umgebungen.


Status-Tracking: ✅ Kapitel “Architektur einer Producer-Komponente” erstellt - Verwendet Standards v1.0 - [Datum] - Fokus auf Schichtentrennung und Robustheit