21 Ereignisse als Nebenprodukt vs. explizites Publishing

Die Art und Weise, wie Events in einem System erzeugt werden, hat fundamentale Auswirkungen auf Architektur, Konsistenz und Wartbarkeit. Es gibt zwei grundlegend verschiedene Ansätze: Events können als automatisches Nebenprodukt von Geschäftsoperationen entstehen oder explizit durch Anwendungscode publiziert werden. Jeder Ansatz bringt spezifische Vor- und Nachteile mit sich.

21.1 Konzeptionelle Unterschiede

Events als Nebenprodukt entstehen automatisch, wenn sich der Zustand eines Systems ändert. Das System observiert Datenänderungen und leitet daraus Events ab, ohne dass der Geschäftscode explizit Events publiziert. Dieser Ansatz entkoppelt die Fachlogik vollständig von der Event-Erzeugung.

Explizites Publishing macht die Event-Erzeugung zu einem bewussten Teil der Geschäftslogik. Entwickler entscheiden aktiv, wann und welche Events publiziert werden. Dies gibt maximale Kontrolle, führt aber zu engerer Kopplung zwischen Fachlogik und Event-System.

21.1.1 Charakteristische Unterschiede

Aspekt Nebenprodukt Explizites Publishing
Kopplung Sehr lose Enger gekoppelt
Kontrolle Systemgesteuert Entwicklergesteuert
Konsistenz Automatisch Manuell sicherstellen
Flexibilität Begrenzt Hochflexibel
Komplexität Infrastruktur-lastig Code-lastig

21.2 Events als automatisches Nebenprodukt

Bei diesem Ansatz werden Events durch Infrastructure-Komponenten erzeugt, die Änderungen am Systemzustand observieren. Change Data Capture (CDC), Database Triggers oder Event Sourcing Frameworks können als automatische Event-Generatoren fungieren.

21.2.1 Database-getriebene Event-Erzeugung

// Spring Boot: JPA Event Listeners
@Entity
public class Order {
    @Id private String orderId;
    private OrderStatus status;
    private BigDecimal totalAmount;
    
    @PostPersist
    public void onOrderCreated() {
        // Event wird automatisch durch JPA Lifecycle ausgelöst
        ApplicationEventPublisher publisher = 
            BeanUtils.getBean(ApplicationEventPublisher.class);
        publisher.publishEvent(new OrderCreatedEvent(this));
    }
    
    @PostUpdate  
    public void onOrderUpdated() {
        ApplicationEventPublisher publisher = 
            BeanUtils.getBean(ApplicationEventPublisher.class);
        publisher.publishEvent(new OrderUpdatedEvent(this));
    }
}

// Event Listener für automatische Weiterleitung
@EventListener
public class OrderEventHandler {
    private final EventProducer eventProducer;
    
    @Async
    public void handleOrderCreated(OrderCreatedEvent event) {
        OrderPlacedEvent kafkaEvent = OrderPlacedEvent.from(event.getOrder());
        eventProducer.send("order.placed.v1", kafkaEvent);
    }
}
# Python: SQLAlchemy Event System
from sqlalchemy import event
from sqlalchemy.orm import Session

class Order(Base):
    __tablename__ = 'orders'
    
    order_id = Column(String, primary_key=True)
    status = Column(Enum(OrderStatus))
    total_amount = Column(Numeric)

# Automatische Event-Erzeugung über SQLAlchemy Events
@event.listens_for(Order, 'after_insert')
def order_created_listener(mapper, connection, target):
    # Event wird automatisch nach INSERT ausgelöst
    event_data = {
        'orderId': target.order_id,
        'totalAmount': float(target.total_amount),
        'timestamp': datetime.utcnow().isoformat()
    }
    
    # Async Event Publishing
    asyncio.create_task(
        publish_order_event('order.placed.v1', event_data)
    )

@event.listens_for(Order, 'after_update')  
def order_updated_listener(mapper, connection, target):
    if target.status == OrderStatus.CANCELLED:
        event_data = {
            'orderId': target.order_id,
            'reason': 'customer_cancellation'
        }
        asyncio.create_task(
            publish_order_event('order.cancelled.v1', event_data)
        )

21.2.2 Vorteile der automatischen Event-Erzeugung

Vollständige Entkopplung: Der Geschäftscode muss nichts über Events wissen. Fachlogik bleibt rein fachlich.

Automatische Konsistenz: Jede Zustandsänderung führt automatisch zu einem Event. Vergessene Events sind nicht möglich.

Einfache Migration: Bestehende Systeme können nachträglich mit Event-Erzeugung ausgestattet werden, ohne Code-Änderungen.

21.2.3 Nachteile und Einschränkungen

Begrenzte Semantik: Events reflektieren nur technische Datenänderungen, nicht die fachliche Bedeutung.

Schwer kontrollierbar: Event-Zeitpunkt und -Inhalt sind durch die Infrastructure bestimmt.

Performance-Impact: Jede Datenänderung löst Event-Verarbeitung aus, auch bei unwichtigen Updates.

21.3 Explizite Event-Publikation

Explizites Publishing macht Events zu einem bewussten Teil der Geschäftslogik. Entwickler entscheiden, wann Events publiziert werden und welche Informationen sie enthalten.

21.3.1 Direkte Integration in Geschäftslogik

// Spring Boot: Explizite Event-Publikation
@Service
@Transactional
public class OrderService {
    private final OrderRepository orderRepository;
    private final OrderEventPublisher eventPublisher;
    
    public Order processOrder(CreateOrderRequest request) {
        // 1. Geschäftslogik ausführen
        Order order = new Order(request);
        validateOrderBusinessRules(order);
        
        Order savedOrder = orderRepository.save(order);
        
        // 2. Explizit Event publizieren
        eventPublisher.publishOrderPlaced(savedOrder);
        
        return savedOrder;
    }
    
    public void cancelOrder(String orderId, String reason) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
            
        // Geschäftsregeln prüfen
        if (!order.isCancellable()) {
            throw new OrderNotCancellableException(orderId);
        }
        
        // Zustand ändern
        order.cancel(reason);
        orderRepository.save(order);
        
        // Fachlich relevantes Event publizieren
        eventPublisher.publishOrderCancelled(order, reason);
    }
}
# Python: Explizite Event-Publikation
class OrderService:
    def __init__(self, repository: OrderRepository, 
                 event_publisher: OrderEventPublisher):
        self.repository = repository
        self.event_publisher = event_publisher
    
    async def process_order(self, request: CreateOrderRequest) -> Order:
        # 1. Geschäftslogik ausführen
        order = Order.from_request(request)
        await self.validate_order_business_rules(order)
        
        saved_order = await self.repository.save(order)
        
        # 2. Explizit Event publizieren
        await self.event_publisher.publish_order_placed(saved_order)
        
        return saved_order
    
    async def cancel_order(self, order_id: str, reason: str):
        order = await self.repository.find_by_id(order_id)
        if not order:
            raise OrderNotFoundException(order_id)
            
        # Geschäftsregeln prüfen
        if not order.is_cancellable():
            raise OrderNotCancellableException(order_id)
        
        # Zustand ändern und fachlich relevantes Event publizieren
        order.cancel(reason)
        await self.repository.save(order)
        await self.event_publisher.publish_order_cancelled(order, reason)

21.3.2 Vorteile expliziter Publikation

Fachliche Semantik: Events können exakt die Geschäftsereignisse repräsentieren, nicht nur Datenänderungen.

Vollständige Kontrolle: Entwickler bestimmen Zeitpunkt, Inhalt und Kondition der Event-Publikation.

Business-orientiert: Event-Struktur folgt fachlichen Anforderungen, nicht technischen Constraints.

21.3.3 Herausforderungen

Dual Write Problem: Datenbank-Updates und Event-Publikation sind separate Operationen, die inkonsistent werden können.

Entwickler-Disziplin: Events können vergessen werden oder inkonsistent implementiert sein.

Transaktionale Komplexität: Event-Publikation muss mit Geschäftstransaktionen koordiniert werden.

21.4 Outbox Pattern

Das Outbox Pattern löst das Dual Write Problem, indem es Event-Publikation in dieselbe Transaktion wie die Geschäftsoperation einbettet. Events werden zunächst in einer lokalen “Outbox”-Tabelle gespeichert und später asynchron publiziert.

21.4.1 Grundprinzip des Outbox Patterns

  1. Transaktionale Speicherung: Events werden in derselben Datenbank-Transaktion wie die Geschäftsdaten in eine Outbox-Tabelle geschrieben
  2. Asynchrone Publikation: Ein separater Prozess liest Events aus der Outbox und publiziert sie an den Event-Broker
  3. At-least-once Garantie: Events werden garantiert publiziert, können aber mehrfach auftreten

21.4.2 Implementierung der Outbox-Tabelle

// Spring Boot: Outbox Entity
@Entity
@Table(name = "event_outbox")
public class OutboxEvent {
    @Id
    private String eventId;
    
    @Column(name = "aggregate_id")
    private String aggregateId;
    
    @Column(name = "event_type") 
    private String eventType;
    
    @Column(name = "event_data", columnDefinition = "TEXT")
    private String eventData;
    
    @Column(name = "created_at")
    private Instant createdAt;
    
    @Column(name = "published_at")
    private Instant publishedAt;
    
    @Column(name = "status")
    @Enumerated(EnumType.STRING)
    private OutboxEventStatus status;
    
    // Konstruktoren, Getters, Setters
}

// Outbox Repository
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
    List<OutboxEvent> findByStatusOrderByCreatedAt(OutboxEventStatus status);
    
    @Modifying
    @Query("UPDATE OutboxEvent o SET o.status = :status, o.publishedAt = :publishedAt WHERE o.eventId = :eventId")
    void markAsPublished(@Param("eventId") String eventId, 
                        @Param("status") OutboxEventStatus status,
                        @Param("publishedAt") Instant publishedAt);
}
# Python: Outbox Model mit SQLAlchemy
from sqlalchemy import Column, String, Text, DateTime, Enum
from enum import Enum as PyEnum

class OutboxEventStatus(PyEnum):
    PENDING = "PENDING"
    PUBLISHED = "PUBLISHED"
    FAILED = "FAILED"

class OutboxEvent(Base):
    __tablename__ = 'event_outbox'
    
    event_id = Column(String(36), primary_key=True)
    aggregate_id = Column(String(36), nullable=False)
    event_type = Column(String(100), nullable=False)
    event_data = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    published_at = Column(DateTime)
    status = Column(Enum(OutboxEventStatus), default=OutboxEventStatus.PENDING)

# Outbox Repository
class OutboxEventRepository:
    def __init__(self, session: Session):
        self.session = session
    
    async def save(self, outbox_event: OutboxEvent):
        self.session.add(outbox_event)
        await self.session.commit()
    
    async def find_pending_events(self) -> List[OutboxEvent]:
        return await self.session.query(OutboxEvent).filter(
            OutboxEvent.status == OutboxEventStatus.PENDING
        ).order_by(OutboxEvent.created_at).all()
    
    async def mark_as_published(self, event_id: str):
        await self.session.query(OutboxEvent).filter(
            OutboxEvent.event_id == event_id
        ).update({
            'status': OutboxEventStatus.PUBLISHED,
            'published_at': datetime.utcnow()
        })
        await self.session.commit()

21.5 Transactional Outbox

Der Transactional Outbox Ansatz erweitert das grundlegende Outbox Pattern um robuste Transaktionsbehandlung und zuverlässige Event-Publikation.

21.5.1 Service-Integration mit Outbox

// Spring Boot: Service mit Transactional Outbox
@Service
@Transactional
public class TransactionalOrderService {
    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;
    
    public Order createOrder(CreateOrderRequest request) {
        // 1. Geschäftsoperation
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order);
        
        // 2. Event in Outbox speichern (gleiche Transaktion!)
        OutboxEvent outboxEvent = createOrderPlacedOutboxEvent(savedOrder);
        outboxRepository.save(outboxEvent);
        
        return savedOrder;
    }
    
    private OutboxEvent createOrderPlacedOutboxEvent(Order order) {
        OrderPlacedEvent event = OrderPlacedEvent.from(order);
        
        return OutboxEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .aggregateId(order.getOrderId())
            .eventType("OrderPlaced")
            .eventData(JsonUtils.toJson(event))
            .status(OutboxEventStatus.PENDING)
            .createdAt(Instant.now())
            .build();
    }
}

// Outbox Event Publisher (separater Prozess)
@Component
public class OutboxEventPublisher {
    private final OutboxEventRepository outboxRepository;
    private final EventProducer eventProducer;
    
    @Scheduled(fixedDelay = 5000) // Alle 5 Sekunden
    public void publishPendingEvents() {
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAt(OutboxEventStatus.PENDING);
            
        for (OutboxEvent outboxEvent : pendingEvents) {
            try {
                publishEvent(outboxEvent);
                outboxRepository.markAsPublished(
                    outboxEvent.getEventId(),
                    OutboxEventStatus.PUBLISHED,
                    Instant.now()
                );
            } catch (Exception e) {
                log.error("Failed to publish outbox event: {}", 
                         outboxEvent.getEventId(), e);
                // Retry-Logic oder Dead Letter handling
            }
        }
    }
    
    private void publishEvent(OutboxEvent outboxEvent) {
        String topic = determineTopicFromEventType(outboxEvent.getEventType());
        Object eventData = JsonUtils.fromJson(outboxEvent.getEventData());
        eventProducer.send(topic, eventData);
    }
}
# Python: Service mit Transactional Outbox
class TransactionalOrderService:
    def __init__(self, order_repository: OrderRepository,
                 outbox_repository: OutboxEventRepository):
        self.order_repository = order_repository
        self.outbox_repository = outbox_repository
    
    @transactional
    async def create_order(self, request: CreateOrderRequest) -> Order:
        # 1. Geschäftsoperation
        order = Order.from_request(request)
        saved_order = await self.order_repository.save(order)
        
        # 2. Event in Outbox speichern (gleiche Transaktion!)
        outbox_event = self.create_order_placed_outbox_event(saved_order)
        await self.outbox_repository.save(outbox_event)
        
        return saved_order
    
    def create_order_placed_outbox_event(self, order: Order) -> OutboxEvent:
        event = OrderPlacedEvent.from_order(order)
        
        return OutboxEvent(
            event_id=str(uuid4()),
            aggregate_id=order.order_id,
            event_type="OrderPlaced",
            event_data=json.dumps(event.to_dict()),
            status=OutboxEventStatus.PENDING
        )

# Outbox Event Publisher (Background Task)
class OutboxEventPublisher:
    def __init__(self, outbox_repository: OutboxEventRepository,
                 event_producer: EventProducer):
        self.outbox_repository = outbox_repository
        self.event_producer = event_producer
        
    async def publish_pending_events(self):
        """Wird regelmäßig von einem Scheduler aufgerufen"""
        pending_events = await self.outbox_repository.find_pending_events()
        
        for outbox_event in pending_events:
            try:
                await self.publish_event(outbox_event)
                await self.outbox_repository.mark_as_published(outbox_event.event_id)
            except Exception as e:
                logger.error(f"Failed to publish outbox event: {outbox_event.event_id}", exc_info=e)
                # Retry-Logic implementieren
    
    async def publish_event(self, outbox_event: OutboxEvent):
        topic = self.determine_topic_from_event_type(outbox_event.event_type)
        event_data = json.loads(outbox_event.event_data)
        await self.event_producer.send_event(topic, event_data)

21.5.2 Vorteile des Transactional Outbox Patterns

Transaktionale Sicherheit: Events und Geschäftsdaten werden atomisch gespeichert.

At-least-once Garantie: Events gehen nie verloren, auch bei System-Ausfällen.

Skalierbare Publikation: Event-Publikation kann unabhängig von Geschäftsoperationen skaliert werden.

Retry-fähig: Fehlgeschlagene Event-Publikationen können wiederholt werden.

21.6 Event-driven Sideeffects

Event-driven Sideeffects entstehen, wenn Events selbst weitere Events auslösen. Dies führt zu Event-Ketten oder Event-Choreographie, wo ein initiales Event eine Kaskade von Folge-Events triggert.

21.6.1 Choreographie-basierte Event-Ketten

// Spring Boot: Event-driven Sideeffects
@EventListener
public class PaymentEventHandler {
    private final InventoryService inventoryService;
    private final ShippingEventPublisher shippingEventPublisher;
    
    @Async
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // Sideeffect: Inventory reservation triggern
        try {
            inventoryService.reserveItems(event.getOrderId(), event.getItems());
            // Erfolgreiche Reservierung löst weiteres Event aus
        } catch (InsufficientInventoryException e) {
            // Fehlerfall löst Compensation-Event aus
            orderEventPublisher.publishOrderRejected(event.getOrderId(), 
                                                   "Insufficient inventory");
        }
    }
    
    @EventListener
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        // Sideeffect: Shipping workflow initialisieren
        shippingEventPublisher.publishShippingRequested(
            event.getOrderId(), 
            event.getShippingAddress()
        );
    }
}

@Service
public class InventoryService {
    private final InventoryEventPublisher eventPublisher;
    
    public void reserveItems(String orderId, List<OrderItem> items) {
        // Inventory-Logik
        for (OrderItem item : items) {
            reserveItem(item.getProductId(), item.getQuantity());
        }
        
        // Sideeffect: InventoryReserved Event publizieren
        eventPublisher.publishInventoryReserved(orderId, items);
    }
}
# Python: Event-driven Sideeffects
class PaymentEventHandler:
    def __init__(self, inventory_service: InventoryService,
                 shipping_event_publisher: ShippingEventPublisher):
        self.inventory_service = inventory_service
        self.shipping_event_publisher = shipping_event_publisher
    
    async def handle_order_placed(self, event: OrderPlacedEvent):
        """Sideeffect: Inventory reservation triggern"""
        try:
            await self.inventory_service.reserve_items(
                event.order_id, event.items
            )
        except InsufficientInventoryException as e:
            # Fehlerfall löst Compensation-Event aus
            await self.order_event_publisher.publish_order_rejected(
                event.order_id, "Insufficient inventory"
            )
    
    async def handle_payment_processed(self, event: PaymentProcessedEvent):
        """Sideeffect: Shipping workflow initialisieren"""
        await self.shipping_event_publisher.publish_shipping_requested(
            event.order_id, event.shipping_address
        )

class InventoryService:
    def __init__(self, event_publisher: InventoryEventPublisher):
        self.event_publisher = event_publisher
    
    async def reserve_items(self, order_id: str, items: List[OrderItem]):
        # Inventory-Logik
        for item in items:
            await self.reserve_item(item.product_id, item.quantity)
        
        # Sideeffect: InventoryReserved Event publizieren
        await self.event_publisher.publish_inventory_reserved(order_id, items)

21.6.2 Event-Ketten-Monitoring

Event-driven Sideeffects erfordern spezielles Monitoring, um Event-Ketten zu verfolgen und Fehler in komplexen Workflows zu identifizieren.

// Spring Boot: Correlation-ID für Event-Ketten
@Component
public class CorrelatedEventProducer {
    private final EventProducer eventProducer;
    
    public void publishWithCorrelation(String topic, Object event, String correlationId) {
        // Correlation-ID zu Event hinzufügen
        if (event instanceof CorrelatedEvent) {
            ((CorrelatedEvent) event).setCorrelationId(correlationId);
        }
        
        // MDC für Logging setzen
        MDC.put("correlationId", correlationId);
        
        try {
            eventProducer.send(topic, event);
            log.info("Published correlated event to topic: {}", topic);
        } finally {
            MDC.remove("correlationId");
        }
    }
}
# Python: Correlation-ID für Event-Ketten
import contextvars

correlation_id_var: contextvars.ContextVar[str] = contextvars.ContextVar('correlation_id')

class CorrelatedEventProducer:
    def __init__(self, event_producer: EventProducer):
        self.event_producer = event_producer
    
    async def publish_with_correlation(self, topic: str, event: Any, correlation_id: str):
        # Correlation-ID zu Event hinzufügen
        if hasattr(event, 'correlation_id'):
            event.correlation_id = correlation_id
        
        # Context Variable für Logging setzen
        correlation_id_var.set(correlation_id)
        
        try:
            await self.event_producer.send_event(topic, event)
            logger.info(f"Published correlated event to topic: {topic}")
        finally:
            # Context wird automatisch bei Task-Ende bereinigt
            pass

Event-driven Sideeffects ermöglichen elegante Choreographie in Microservice-Architekturen, erfordern aber sorgfältige Planung der Event-Ketten und robustes Monitoring zur Fehlerdiagnose.


Status-Tracking: ✅ Kapitel “Ereignisse als Nebenprodukt vs. explizites Publishing” erstellt - Verwendet Standards v1.0 - [Datum] - Schwerpunkt auf Outbox Pattern und Event-Choreographie