Die Art und Weise, wie Events in einem System erzeugt werden, hat fundamentale Auswirkungen auf Architektur, Konsistenz und Wartbarkeit. Es gibt zwei grundlegend verschiedene Ansätze: Events können als automatisches Nebenprodukt von Geschäftsoperationen entstehen oder explizit durch Anwendungscode publiziert werden. Jeder Ansatz bringt spezifische Vor- und Nachteile mit sich.
Events als Nebenprodukt entstehen automatisch, wenn sich der Zustand eines Systems ändert. Das System observiert Datenänderungen und leitet daraus Events ab, ohne dass der Geschäftscode explizit Events publiziert. Dieser Ansatz entkoppelt die Fachlogik vollständig von der Event-Erzeugung.
Explizites Publishing macht die Event-Erzeugung zu einem bewussten Teil der Geschäftslogik. Entwickler entscheiden aktiv, wann und welche Events publiziert werden. Dies gibt maximale Kontrolle, führt aber zu engerer Kopplung zwischen Fachlogik und Event-System.
| Aspekt | Nebenprodukt | Explizites Publishing |
|---|---|---|
| Kopplung | Sehr lose | Enger gekoppelt |
| Kontrolle | Systemgesteuert | Entwicklergesteuert |
| Konsistenz | Automatisch | Manuell sicherstellen |
| Flexibilität | Begrenzt | Hochflexibel |
| Komplexität | Infrastruktur-lastig | Code-lastig |
Bei diesem Ansatz werden Events durch Infrastructure-Komponenten erzeugt, die Änderungen am Systemzustand observieren. Change Data Capture (CDC), Database Triggers oder Event Sourcing Frameworks können als automatische Event-Generatoren fungieren.
// Spring Boot: JPA Event Listeners
@Entity
public class Order {
@Id private String orderId;
private OrderStatus status;
private BigDecimal totalAmount;
@PostPersist
public void onOrderCreated() {
// Event wird automatisch durch JPA Lifecycle ausgelöst
ApplicationEventPublisher publisher =
BeanUtils.getBean(ApplicationEventPublisher.class);
publisher.publishEvent(new OrderCreatedEvent(this));
}
@PostUpdate
public void onOrderUpdated() {
ApplicationEventPublisher publisher =
BeanUtils.getBean(ApplicationEventPublisher.class);
publisher.publishEvent(new OrderUpdatedEvent(this));
}
}
// Event Listener für automatische Weiterleitung
@EventListener
public class OrderEventHandler {
private final EventProducer eventProducer;
@Async
public void handleOrderCreated(OrderCreatedEvent event) {
OrderPlacedEvent kafkaEvent = OrderPlacedEvent.from(event.getOrder());
eventProducer.send("order.placed.v1", kafkaEvent);
}
}# Python: SQLAlchemy Event System
from sqlalchemy import event
from sqlalchemy.orm import Session
class Order(Base):
__tablename__ = 'orders'
order_id = Column(String, primary_key=True)
status = Column(Enum(OrderStatus))
total_amount = Column(Numeric)
# Automatische Event-Erzeugung über SQLAlchemy Events
@event.listens_for(Order, 'after_insert')
def order_created_listener(mapper, connection, target):
# Event wird automatisch nach INSERT ausgelöst
event_data = {
'orderId': target.order_id,
'totalAmount': float(target.total_amount),
'timestamp': datetime.utcnow().isoformat()
}
# Async Event Publishing
asyncio.create_task(
publish_order_event('order.placed.v1', event_data)
)
@event.listens_for(Order, 'after_update')
def order_updated_listener(mapper, connection, target):
if target.status == OrderStatus.CANCELLED:
event_data = {
'orderId': target.order_id,
'reason': 'customer_cancellation'
}
asyncio.create_task(
publish_order_event('order.cancelled.v1', event_data)
)Vollständige Entkopplung: Der Geschäftscode muss nichts über Events wissen. Fachlogik bleibt rein fachlich.
Automatische Konsistenz: Jede Zustandsänderung führt automatisch zu einem Event. Vergessene Events sind nicht möglich.
Einfache Migration: Bestehende Systeme können nachträglich mit Event-Erzeugung ausgestattet werden, ohne Code-Änderungen.
Begrenzte Semantik: Events reflektieren nur technische Datenänderungen, nicht die fachliche Bedeutung.
Schwer kontrollierbar: Event-Zeitpunkt und -Inhalt sind durch die Infrastructure bestimmt.
Performance-Impact: Jede Datenänderung löst Event-Verarbeitung aus, auch bei unwichtigen Updates.
Explizites Publishing macht Events zu einem bewussten Teil der Geschäftslogik. Entwickler entscheiden, wann Events publiziert werden und welche Informationen sie enthalten.
// Spring Boot: Explizite Event-Publikation
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OrderEventPublisher eventPublisher;
public Order processOrder(CreateOrderRequest request) {
// 1. Geschäftslogik ausführen
Order order = new Order(request);
validateOrderBusinessRules(order);
Order savedOrder = orderRepository.save(order);
// 2. Explizit Event publizieren
eventPublisher.publishOrderPlaced(savedOrder);
return savedOrder;
}
public void cancelOrder(String orderId, String reason) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
// Geschäftsregeln prüfen
if (!order.isCancellable()) {
throw new OrderNotCancellableException(orderId);
}
// Zustand ändern
order.cancel(reason);
orderRepository.save(order);
// Fachlich relevantes Event publizieren
eventPublisher.publishOrderCancelled(order, reason);
}
}# Python: Explizite Event-Publikation
class OrderService:
def __init__(self, repository: OrderRepository,
event_publisher: OrderEventPublisher):
self.repository = repository
self.event_publisher = event_publisher
async def process_order(self, request: CreateOrderRequest) -> Order:
# 1. Geschäftslogik ausführen
order = Order.from_request(request)
await self.validate_order_business_rules(order)
saved_order = await self.repository.save(order)
# 2. Explizit Event publizieren
await self.event_publisher.publish_order_placed(saved_order)
return saved_order
async def cancel_order(self, order_id: str, reason: str):
order = await self.repository.find_by_id(order_id)
if not order:
raise OrderNotFoundException(order_id)
# Geschäftsregeln prüfen
if not order.is_cancellable():
raise OrderNotCancellableException(order_id)
# Zustand ändern und fachlich relevantes Event publizieren
order.cancel(reason)
await self.repository.save(order)
await self.event_publisher.publish_order_cancelled(order, reason)Fachliche Semantik: Events können exakt die Geschäftsereignisse repräsentieren, nicht nur Datenänderungen.
Vollständige Kontrolle: Entwickler bestimmen Zeitpunkt, Inhalt und Kondition der Event-Publikation.
Business-orientiert: Event-Struktur folgt fachlichen Anforderungen, nicht technischen Constraints.
Dual Write Problem: Datenbank-Updates und Event-Publikation sind separate Operationen, die inkonsistent werden können.
Entwickler-Disziplin: Events können vergessen werden oder inkonsistent implementiert sein.
Transaktionale Komplexität: Event-Publikation muss mit Geschäftstransaktionen koordiniert werden.
Das Outbox Pattern löst das Dual Write Problem, indem es Event-Publikation in dieselbe Transaktion wie die Geschäftsoperation einbettet. Events werden zunächst in einer lokalen “Outbox”-Tabelle gespeichert und später asynchron publiziert.
// Spring Boot: Outbox Entity
@Entity
@Table(name = "event_outbox")
public class OutboxEvent {
@Id
private String eventId;
@Column(name = "aggregate_id")
private String aggregateId;
@Column(name = "event_type")
private String eventType;
@Column(name = "event_data", columnDefinition = "TEXT")
private String eventData;
@Column(name = "created_at")
private Instant createdAt;
@Column(name = "published_at")
private Instant publishedAt;
@Column(name = "status")
@Enumerated(EnumType.STRING)
private OutboxEventStatus status;
// Konstruktoren, Getters, Setters
}
// Outbox Repository
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
List<OutboxEvent> findByStatusOrderByCreatedAt(OutboxEventStatus status);
@Modifying
@Query("UPDATE OutboxEvent o SET o.status = :status, o.publishedAt = :publishedAt WHERE o.eventId = :eventId")
void markAsPublished(@Param("eventId") String eventId,
@Param("status") OutboxEventStatus status,
@Param("publishedAt") Instant publishedAt);
}# Python: Outbox Model mit SQLAlchemy
from sqlalchemy import Column, String, Text, DateTime, Enum
from enum import Enum as PyEnum
class OutboxEventStatus(PyEnum):
PENDING = "PENDING"
PUBLISHED = "PUBLISHED"
FAILED = "FAILED"
class OutboxEvent(Base):
__tablename__ = 'event_outbox'
event_id = Column(String(36), primary_key=True)
aggregate_id = Column(String(36), nullable=False)
event_type = Column(String(100), nullable=False)
event_data = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
published_at = Column(DateTime)
status = Column(Enum(OutboxEventStatus), default=OutboxEventStatus.PENDING)
# Outbox Repository
class OutboxEventRepository:
def __init__(self, session: Session):
self.session = session
async def save(self, outbox_event: OutboxEvent):
self.session.add(outbox_event)
await self.session.commit()
async def find_pending_events(self) -> List[OutboxEvent]:
return await self.session.query(OutboxEvent).filter(
OutboxEvent.status == OutboxEventStatus.PENDING
).order_by(OutboxEvent.created_at).all()
async def mark_as_published(self, event_id: str):
await self.session.query(OutboxEvent).filter(
OutboxEvent.event_id == event_id
).update({
'status': OutboxEventStatus.PUBLISHED,
'published_at': datetime.utcnow()
})
await self.session.commit()Der Transactional Outbox Ansatz erweitert das grundlegende Outbox Pattern um robuste Transaktionsbehandlung und zuverlässige Event-Publikation.
// Spring Boot: Service mit Transactional Outbox
@Service
@Transactional
public class TransactionalOrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxRepository;
public Order createOrder(CreateOrderRequest request) {
// 1. Geschäftsoperation
Order order = new Order(request);
Order savedOrder = orderRepository.save(order);
// 2. Event in Outbox speichern (gleiche Transaktion!)
OutboxEvent outboxEvent = createOrderPlacedOutboxEvent(savedOrder);
outboxRepository.save(outboxEvent);
return savedOrder;
}
private OutboxEvent createOrderPlacedOutboxEvent(Order order) {
OrderPlacedEvent event = OrderPlacedEvent.from(order);
return OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateId(order.getOrderId())
.eventType("OrderPlaced")
.eventData(JsonUtils.toJson(event))
.status(OutboxEventStatus.PENDING)
.createdAt(Instant.now())
.build();
}
}
// Outbox Event Publisher (separater Prozess)
@Component
public class OutboxEventPublisher {
private final OutboxEventRepository outboxRepository;
private final EventProducer eventProducer;
@Scheduled(fixedDelay = 5000) // Alle 5 Sekunden
public void publishPendingEvents() {
List<OutboxEvent> pendingEvents = outboxRepository
.findByStatusOrderByCreatedAt(OutboxEventStatus.PENDING);
for (OutboxEvent outboxEvent : pendingEvents) {
try {
publishEvent(outboxEvent);
outboxRepository.markAsPublished(
outboxEvent.getEventId(),
OutboxEventStatus.PUBLISHED,
Instant.now()
);
} catch (Exception e) {
log.error("Failed to publish outbox event: {}",
outboxEvent.getEventId(), e);
// Retry-Logic oder Dead Letter handling
}
}
}
private void publishEvent(OutboxEvent outboxEvent) {
String topic = determineTopicFromEventType(outboxEvent.getEventType());
Object eventData = JsonUtils.fromJson(outboxEvent.getEventData());
eventProducer.send(topic, eventData);
}
}# Python: Service mit Transactional Outbox
class TransactionalOrderService:
def __init__(self, order_repository: OrderRepository,
outbox_repository: OutboxEventRepository):
self.order_repository = order_repository
self.outbox_repository = outbox_repository
@transactional
async def create_order(self, request: CreateOrderRequest) -> Order:
# 1. Geschäftsoperation
order = Order.from_request(request)
saved_order = await self.order_repository.save(order)
# 2. Event in Outbox speichern (gleiche Transaktion!)
outbox_event = self.create_order_placed_outbox_event(saved_order)
await self.outbox_repository.save(outbox_event)
return saved_order
def create_order_placed_outbox_event(self, order: Order) -> OutboxEvent:
event = OrderPlacedEvent.from_order(order)
return OutboxEvent(
event_id=str(uuid4()),
aggregate_id=order.order_id,
event_type="OrderPlaced",
event_data=json.dumps(event.to_dict()),
status=OutboxEventStatus.PENDING
)
# Outbox Event Publisher (Background Task)
class OutboxEventPublisher:
def __init__(self, outbox_repository: OutboxEventRepository,
event_producer: EventProducer):
self.outbox_repository = outbox_repository
self.event_producer = event_producer
async def publish_pending_events(self):
"""Wird regelmäßig von einem Scheduler aufgerufen"""
pending_events = await self.outbox_repository.find_pending_events()
for outbox_event in pending_events:
try:
await self.publish_event(outbox_event)
await self.outbox_repository.mark_as_published(outbox_event.event_id)
except Exception as e:
logger.error(f"Failed to publish outbox event: {outbox_event.event_id}", exc_info=e)
# Retry-Logic implementieren
async def publish_event(self, outbox_event: OutboxEvent):
topic = self.determine_topic_from_event_type(outbox_event.event_type)
event_data = json.loads(outbox_event.event_data)
await self.event_producer.send_event(topic, event_data)Transaktionale Sicherheit: Events und Geschäftsdaten werden atomisch gespeichert.
At-least-once Garantie: Events gehen nie verloren, auch bei System-Ausfällen.
Skalierbare Publikation: Event-Publikation kann unabhängig von Geschäftsoperationen skaliert werden.
Retry-fähig: Fehlgeschlagene Event-Publikationen können wiederholt werden.
Event-driven Sideeffects entstehen, wenn Events selbst weitere Events auslösen. Dies führt zu Event-Ketten oder Event-Choreographie, wo ein initiales Event eine Kaskade von Folge-Events triggert.
// Spring Boot: Event-driven Sideeffects
@EventListener
public class PaymentEventHandler {
private final InventoryService inventoryService;
private final ShippingEventPublisher shippingEventPublisher;
@Async
public void handleOrderPlaced(OrderPlacedEvent event) {
// Sideeffect: Inventory reservation triggern
try {
inventoryService.reserveItems(event.getOrderId(), event.getItems());
// Erfolgreiche Reservierung löst weiteres Event aus
} catch (InsufficientInventoryException e) {
// Fehlerfall löst Compensation-Event aus
orderEventPublisher.publishOrderRejected(event.getOrderId(),
"Insufficient inventory");
}
}
@EventListener
public void handlePaymentProcessed(PaymentProcessedEvent event) {
// Sideeffect: Shipping workflow initialisieren
shippingEventPublisher.publishShippingRequested(
event.getOrderId(),
event.getShippingAddress()
);
}
}
@Service
public class InventoryService {
private final InventoryEventPublisher eventPublisher;
public void reserveItems(String orderId, List<OrderItem> items) {
// Inventory-Logik
for (OrderItem item : items) {
reserveItem(item.getProductId(), item.getQuantity());
}
// Sideeffect: InventoryReserved Event publizieren
eventPublisher.publishInventoryReserved(orderId, items);
}
}# Python: Event-driven Sideeffects
class PaymentEventHandler:
def __init__(self, inventory_service: InventoryService,
shipping_event_publisher: ShippingEventPublisher):
self.inventory_service = inventory_service
self.shipping_event_publisher = shipping_event_publisher
async def handle_order_placed(self, event: OrderPlacedEvent):
"""Sideeffect: Inventory reservation triggern"""
try:
await self.inventory_service.reserve_items(
event.order_id, event.items
)
except InsufficientInventoryException as e:
# Fehlerfall löst Compensation-Event aus
await self.order_event_publisher.publish_order_rejected(
event.order_id, "Insufficient inventory"
)
async def handle_payment_processed(self, event: PaymentProcessedEvent):
"""Sideeffect: Shipping workflow initialisieren"""
await self.shipping_event_publisher.publish_shipping_requested(
event.order_id, event.shipping_address
)
class InventoryService:
def __init__(self, event_publisher: InventoryEventPublisher):
self.event_publisher = event_publisher
async def reserve_items(self, order_id: str, items: List[OrderItem]):
# Inventory-Logik
for item in items:
await self.reserve_item(item.product_id, item.quantity)
# Sideeffect: InventoryReserved Event publizieren
await self.event_publisher.publish_inventory_reserved(order_id, items)Event-driven Sideeffects erfordern spezielles Monitoring, um Event-Ketten zu verfolgen und Fehler in komplexen Workflows zu identifizieren.
// Spring Boot: Correlation-ID für Event-Ketten
@Component
public class CorrelatedEventProducer {
private final EventProducer eventProducer;
public void publishWithCorrelation(String topic, Object event, String correlationId) {
// Correlation-ID zu Event hinzufügen
if (event instanceof CorrelatedEvent) {
((CorrelatedEvent) event).setCorrelationId(correlationId);
}
// MDC für Logging setzen
MDC.put("correlationId", correlationId);
try {
eventProducer.send(topic, event);
log.info("Published correlated event to topic: {}", topic);
} finally {
MDC.remove("correlationId");
}
}
}# Python: Correlation-ID für Event-Ketten
import contextvars
correlation_id_var: contextvars.ContextVar[str] = contextvars.ContextVar('correlation_id')
class CorrelatedEventProducer:
def __init__(self, event_producer: EventProducer):
self.event_producer = event_producer
async def publish_with_correlation(self, topic: str, event: Any, correlation_id: str):
# Correlation-ID zu Event hinzufügen
if hasattr(event, 'correlation_id'):
event.correlation_id = correlation_id
# Context Variable für Logging setzen
correlation_id_var.set(correlation_id)
try:
await self.event_producer.send_event(topic, event)
logger.info(f"Published correlated event to topic: {topic}")
finally:
# Context wird automatisch bei Task-Ende bereinigt
passEvent-driven Sideeffects ermöglichen elegante Choreographie in Microservice-Architekturen, erfordern aber sorgfältige Planung der Event-Ketten und robustes Monitoring zur Fehlerdiagnose.
Status-Tracking: ✅ Kapitel “Ereignisse als Nebenprodukt vs. explizites Publishing” erstellt - Verwendet Standards v1.0 - [Datum] - Schwerpunkt auf Outbox Pattern und Event-Choreographie