9 Asynchrone Kommunikation: Producer, Broker, Consumer

Die Grundlage jeder Event-Driven Architecture bildet das asynchrone Kommunikationsmodell zwischen drei zentralen Komponenten: Producer, Broker und Consumer. Dieses Modell ermöglicht die zeitliche und räumliche Entkopplung von Systemkomponenten und bildet das Fundament für skalierbare, resiliente Architekturen.

9.1 Grundlegende Architekturkomponenten

Die drei Komponenten bilden ein entkoppeltes System, bei dem jede Komponente ihre spezifische Rolle erfüllt, ohne direkte Abhängigkeiten zu den anderen zu haben.

9.2 Rollen und Verantwortlichkeiten

Komponente Verantwortlichkeit Beispiel im E-Commerce
Producer Erzeugt Events bei Geschäftsereignissen OrderService erstellt “OrderPlaced” Event
Broker Persistiert, ordnet und verteilt Events Kafka speichert Events in Topics
Consumer Reagiert auf Events mit Geschäftslogik InventoryService reserviert Artikel

9.2.1 Producer: Event-Erzeugung

Ein Producer reagiert auf Geschäftsereignisse und wandelt diese in Events um. Dabei kennt er weder die Anzahl noch die Art der Consumer, die diese Events verarbeiten werden.

// Spring Boot Producer Beispiel
@Service
public class OrderService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void placeOrder(Order order) {
        // Geschäftslogik: Bestellung validieren und speichern
        validateAndSaveOrder(order);
        
        // Event erzeugen
        OrderPlacedEvent event = new OrderPlacedEvent(
            order.getId(), 
            order.getCustomerId(), 
            order.getTotalAmount()
        );
        
        // Event publizieren
        kafkaTemplate.send("order.placed.v1", 
                          order.getId().toString(), 
                          toJson(event));
    }
}
# Python Producer Beispiel
from confluent_kafka import Producer
import json

class OrderService:
    def __init__(self):
        self.producer = Producer({'bootstrap.servers': 'localhost:9092'})
    
    def place_order(self, order):
        # Geschäftslogik: Bestellung validieren und speichern
        self.validate_and_save_order(order)
        
        # Event erzeugen
        event = {
            'orderId': order['id'],
            'customerId': order['customerId'], 
            'totalAmount': order['totalAmount']
        }
        
        # Event publizieren
        self.producer.produce(
            topic='order.placed.v1',
            key=str(order['id']),
            value=json.dumps(event)
        )
        self.producer.flush()

9.2.2 Broker: Event-Vermittlung

Der Broker fungiert als zentraler Vermittler zwischen Producern und Consumern. Er übernimmt die Persistierung der Events und stellt sicher, dass jeder Consumer die für ihn relevanten Events in der korrekten Reihenfolge erhält.

9.2.3 Consumer: Event-Verarbeitung

Consumer abonnieren bestimmte Event-Typen und führen ihre spezifische Geschäftslogik aus, wenn entsprechende Events eintreffen.

// Spring Boot Consumer Beispiel
@Component
public class InventoryService {
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(String message) {
        OrderPlacedEvent event = fromJson(message, OrderPlacedEvent.class);
        
        // Geschäftslogik: Artikel reservieren
        reserveItems(event.getOrderId());
        
        // Weiteres Event erzeugen (falls nötig)
        publishInventoryReservedEvent(event.getOrderId());
    }
}
# Python Consumer Beispiel
from confluent_kafka import Consumer
import json

class InventoryService:
    def __init__(self):
        self.consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'inventory-service',
            'auto.offset.reset': 'earliest'
        })
        self.consumer.subscribe(['order.placed.v1'])
    
    def run(self):
        while True:
            msg = self.consumer.poll(1.0)
            if msg and not msg.error():
                event = json.loads(msg.value().decode('utf-8'))
                
                # Geschäftslogik: Artikel reservieren
                self.reserve_items(event['orderId'])
                
                # Weiteres Event erzeugen (falls nötig)
                self.publish_inventory_reserved_event(event['orderId'])

9.3 Entkopplung durch asynchrone Patterns

Die asynchrone Kommunikation ermöglicht verschiedene Formen der Entkopplung, die entscheidend für die Skalierbarkeit und Wartbarkeit von Systemen sind.

9.3.1 Zeitliche Entkopplung

Producer und Consumer müssen nicht gleichzeitig aktiv sein. Events werden im Broker zwischengespeichert und können auch verarbeitet werden, wenn der ursprüngliche Producer bereits beendet ist.

Vorteil: Services können unabhängig voneinander neu gestartet oder aktualisiert werden, ohne dass Daten verloren gehen.

9.3.2 Räumliche Entkopplung

Producer kennen ihre Consumer nicht und umgekehrt. Neue Consumer können hinzugefügt werden, ohne dass bestehende Producer modifiziert werden müssen.

Vorteil: Das System kann horizontal skaliert werden, indem neue Consumer-Instanzen hinzugefügt werden.

9.3.3 Synchronisationsentkopplung

Jeder Consumer verarbeitet Events in seinem eigenen Tempo. Ein langsamer Consumer blockiert nicht die anderen Consumer oder den Producer.

Vorteil: Unterschiedliche Services können verschiedene Performance-Charakteristika haben, ohne sich gegenseitig zu beeinflussen.

Die Kombination dieser Entkopplungsformen ermöglicht es, robuste Systeme zu bauen, die sich organisch entwickeln und skalieren können. Ein OrderService kann beispielsweise gleichzeitig von einem schnellen InventoryService und einem langsameren AnalyticsService verarbeitet werden, ohne dass einer den anderen beeinflusst.

Diese Architektur bildet die Grundlage für komplexere Patterns wie Event Sourcing und CQRS, die in späteren Kapiteln behandelt werden.