Denken Sie einmal zurück: Wann haben Sie das letzte Mal einen komplexen Geschäftsprozess in Code abgebildet? War es ein linearer Ablauf oder gab es Verzweigungen, Schleifen, Wartezeiten und Ausnahmebehandlungen?
Die meisten realen Geschäftsprozesse sind alles andere als linear. Nehmen Sie unser E-Commerce-Beispiel: Was passiert, wenn ein Kunde seine Bestellung ändern möchte, nachdem die Zahlung bereits verarbeitet wurde? Oder wenn ein Artikel plötzlich nicht mehr verfügbar ist, obwohl er bereits reserviert wurde?
Zentrale Frage: Wie können wir solche komplexen, sich verändernden Workflows so modellieren, dass sie sowohl für Menschen verständlich als auch für Maschinen ausführbar sind?
Traditionelle Prozessmodellierung denkt in Aktivitäten und Übergängen. Event-Driven Architecture dreht diese Perspektive um: Wir modellieren, was passiert ist (Events), nicht was passieren soll (Aktivitäten).
Übung zum Verständnis: Betrachten Sie diese beiden Beschreibungen desselben Prozesses:
Traditionell (aktivitätsorientiert): 1. Prüfe Verfügbarkeit 2. Wenn verfügbar: Reserviere Artikel 3. Verarbeite Zahlung 4. Wenn Zahlung erfolgreich: Versende Artikel
Event-orientiert: - OrderPlaced → InventoryChecked → (InventoryAvailable | InventoryUnavailable) - InventoryAvailable → PaymentProcessed → (PaymentSucceeded | PaymentFailed) - PaymentSucceeded → ShippingInitiated
Frage: Welcher Ansatz ist flexibler, wenn neue Anforderungen hinzukommen? Warum?
// Process Model als Event-Handler-Sammlung
@Component
public class OrderProcessModel {
// Prozessschritt 1: Order → Inventory Check
@EventListener
public void onOrderPlaced(OrderPlacedEvent event) {
// Was bedeutet dieses Event für unseren Prozess?
logProcessStep("Order received", event.getOrderId());
// Welcher nächste Schritt ist logisch?
inventoryService.checkAvailability(event.getItems())
.thenAccept(availability -> {
if (availability.isAllAvailable()) {
eventPublisher.publish(new InventoryAvailableEvent(
event.getOrderId(),
event.getItems()
));
} else {
eventPublisher.publish(new InventoryUnavailableEvent(
event.getOrderId(),
availability.getUnavailableItems()
));
}
});
}
// Prozessschritt 2a: Inventory Available → Payment
@EventListener
public void onInventoryAvailable(InventoryAvailableEvent event) {
logProcessStep("Inventory confirmed", event.getOrderId());
// Parallel oder sequenziell? Was ist sinnvoller?
CompletableFuture.allOf(
reserveInventory(event),
processPayment(event)
).thenRun(() -> {
eventPublisher.publish(new OrderReadyForShippingEvent(
event.getOrderId()
));
});
}
// Prozessschritt 2b: Inventory Unavailable → Compensation
@EventListener
public void onInventoryUnavailable(InventoryUnavailableEvent event) {
logProcessStep("Inventory insufficient", event.getOrderId());
// Wie sollten wir auf diesen Fehlerzustand reagieren?
alternativeService.findAlternatives(event.getUnavailableItems())
.thenAccept(alternatives -> {
if (alternatives.isEmpty()) {
eventPublisher.publish(new OrderCancelledEvent(
event.getOrderId(),
"Items not available"
));
} else {
eventPublisher.publish(new AlternativesFoundEvent(
event.getOrderId(),
alternatives
));
}
});
}
}# process_model.py
class OrderProcessModel:
def __init__(self, inventory_service, payment_service, event_publisher):
self.inventory_service = inventory_service
self.payment_service = payment_service
self.event_publisher = event_publisher
async def handle_order_placed(self, event: OrderPlacedEvent):
"""
Frage: Warum beginnen wir den Prozess mit einer Verfügbarkeits-Prüfung?
Antwort: Weil wir frühzeitig Probleme erkennen und dem Kunden
ehrlich mitteilen wollen, was möglich ist.
"""
self._log_process_step("Order received", event.order_id)
availability = await self.inventory_service.check_availability(event.items)
if availability.all_available:
await self.event_publisher.publish(InventoryAvailableEvent(
order_id=event.order_id,
items=event.items
))
else:
await self.event_publisher.publish(InventoryUnavailableEvent(
order_id=event.order_id,
unavailable_items=availability.unavailable_items
))
async def handle_inventory_available(self, event: InventoryAvailableEvent):
"""
Designentscheidung: Parallel oder sequenziell?
- Parallel: Schneller, aber komplexere Fehlerbehandlung
- Sequenziell: Einfacher, aber langsamer
Für unser E-Commerce-Beispiel: Parallel, da Geschwindigkeit kritisch ist
"""
self._log_process_step("Inventory confirmed", event.order_id)
# Parallele Ausführung mit asyncio
reservation_task = self._reserve_inventory(event)
payment_task = self._process_payment(event)
try:
reservation_result, payment_result = await asyncio.gather(
reservation_task,
payment_task
)
if reservation_result.successful and payment_result.successful:
await self.event_publisher.publish(OrderReadyForShippingEvent(
order_id=event.order_id
))
else:
# Was passiert bei partiellen Fehlern?
await self._handle_partial_failure(event, reservation_result, payment_result)
except Exception as e:
await self._handle_process_exception(event, e)Reflexionsfrage: In unserem bisherigen Code - wo ist eigentlich der “Zustand” einer Bestellung gespeichert? Ist er explizit oder müssen wir ihn aus den Events ableiten?
Die meisten event-getriebenen Systeme haben implizite Zustände. State Machines machen diese explizit und bieten dadurch bessere Kontrolle und Verständlichkeit.
// Explizite Zustandsdefinition
public enum OrderState {
PLACED,
INVENTORY_CHECKING,
INVENTORY_CONFIRMED,
INVENTORY_INSUFFICIENT,
PAYMENT_PROCESSING,
PAYMENT_CONFIRMED,
PAYMENT_FAILED,
READY_FOR_SHIPPING,
SHIPPED,
DELIVERED,
CANCELLED
}
// State Machine Implementation
@Component
public class OrderStateMachine {
private final Map<String, OrderState> orderStates = new ConcurrentHashMap<>();
@EventListener
public void onOrderPlaced(OrderPlacedEvent event) {
// State explizit setzen
OrderState newState = transition(
null, // Startzustand
OrderState.PLACED,
event
);
orderStates.put(event.getOrderId(), newState);
// State-abhängige Logik
triggerInventoryCheck(event);
}
@EventListener
public void onInventoryAvailable(InventoryAvailableEvent event) {
OrderState currentState = orderStates.get(event.getOrderId());
// State Transition validieren
if (currentState == OrderState.PLACED) {
OrderState newState = transition(
currentState,
OrderState.INVENTORY_CONFIRMED,
event
);
orderStates.put(event.getOrderId(), newState);
triggerPaymentProcessing(event);
} else {
// Ungültiger State-Übergang
log.warn("Invalid state transition: {} -> INVENTORY_CONFIRMED for order {}",
currentState, event.getOrderId());
}
}
private OrderState transition(OrderState from, OrderState to, DomainEvent event) {
// Frage: Welche Validierungen sollten wir hier durchführen?
// 1. Ist der Übergang erlaubt?
if (!isValidTransition(from, to)) {
throw new InvalidStateTransitionException(from, to);
}
// 2. State-Logging für Debugging
log.info("Order {} transitioned from {} to {} due to {}",
extractOrderId(event), from, to, event.getClass().getSimpleName());
// 3. State-Change Events publizieren (für Monitoring)
eventPublisher.publish(new OrderStateChangedEvent(
extractOrderId(event), from, to, event
));
return to;
}
private boolean isValidTransition(OrderState from, OrderState to) {
// Welche Übergänge sind in unserem Business-Kontext erlaubt?
Map<OrderState, Set<OrderState>> allowedTransitions = Map.of(
null, Set.of(OrderState.PLACED),
OrderState.PLACED, Set.of(OrderState.INVENTORY_CONFIRMED, OrderState.INVENTORY_INSUFFICIENT),
OrderState.INVENTORY_CONFIRMED, Set.of(OrderState.PAYMENT_CONFIRMED, OrderState.PAYMENT_FAILED),
OrderState.PAYMENT_CONFIRMED, Set.of(OrderState.READY_FOR_SHIPPING),
OrderState.READY_FOR_SHIPPING, Set.of(OrderState.SHIPPED),
OrderState.SHIPPED, Set.of(OrderState.DELIVERED)
// Cancelled kann von fast jedem State erreicht werden
);
return allowedTransitions.getOrDefault(from, Set.of()).contains(to) ||
to == OrderState.CANCELLED; // Cancellation ist fast immer möglich
}
}# state_machine.py
from enum import Enum, auto
from typing import Dict, Set, Optional
import logging
class OrderState(Enum):
PLACED = auto()
INVENTORY_CHECKING = auto()
INVENTORY_CONFIRMED = auto()
INVENTORY_INSUFFICIENT = auto()
PAYMENT_PROCESSING = auto()
PAYMENT_CONFIRMED = auto()
PAYMENT_FAILED = auto()
READY_FOR_SHIPPING = auto()
SHIPPED = auto()
DELIVERED = auto()
CANCELLED = auto()
class OrderStateMachine:
def __init__(self, event_publisher):
self.event_publisher = event_publisher
self.order_states: Dict[str, OrderState] = {}
# Explizite Definition erlaubter Übergänge
self.allowed_transitions: Dict[Optional[OrderState], Set[OrderState]] = {
None: {OrderState.PLACED},
OrderState.PLACED: {OrderState.INVENTORY_CONFIRMED, OrderState.INVENTORY_INSUFFICIENT},
OrderState.INVENTORY_CONFIRMED: {OrderState.PAYMENT_CONFIRMED, OrderState.PAYMENT_FAILED},
OrderState.PAYMENT_CONFIRMED: {OrderState.READY_FOR_SHIPPING},
OrderState.READY_FOR_SHIPPING: {OrderState.SHIPPED},
OrderState.SHIPPED: {OrderState.DELIVERED}
}
async def handle_order_placed(self, event: OrderPlacedEvent):
"""
Lernfrage: Warum ist es wichtig, State-Übergänge explizit zu validieren?
Antwort: Weil es uns hilft, inkonsistente Zustände zu vermeiden und
Bugs frühzeitig zu erkennen. Ein System ohne explizite State-Validierung
kann in undefinierte Zustände geraten.
"""
new_state = await self._transition(
order_id=event.order_id,
from_state=None,
to_state=OrderState.PLACED,
event=event
)
self.order_states[event.order_id] = new_state
async def handle_inventory_available(self, event: InventoryAvailableEvent):
current_state = self.order_states.get(event.order_id)
if current_state == OrderState.PLACED:
new_state = await self._transition(
order_id=event.order_id,
from_state=current_state,
to_state=OrderState.INVENTORY_CONFIRMED,
event=event
)
self.order_states[event.order_id] = new_state
else:
logging.warning(
f"Invalid state transition: {current_state} -> INVENTORY_CONFIRMED "
f"for order {event.order_id}"
)
async def _transition(self, order_id: str, from_state: Optional[OrderState],
to_state: OrderState, event) -> OrderState:
# State-Transition validieren
if not self._is_valid_transition(from_state, to_state):
raise InvalidStateTransitionException(from_state, to_state)
# Cancellation ist ein Sonderfall - von fast jedem State möglich
if to_state == OrderState.CANCELLED:
pass # Immer erlaubt
# Logging für Debugging und Monitoring
logging.info(
f"Order {order_id} transitioned from {from_state} to {to_state} "
f"due to {event.__class__.__name__}"
)
# State-Change Event für externe Systeme
await self.event_publisher.publish(OrderStateChangedEvent(
order_id=order_id,
from_state=from_state,
to_state=to_state,
trigger_event=event
))
return to_state
def _is_valid_transition(self, from_state: Optional[OrderState],
to_state: OrderState) -> bool:
allowed = self.allowed_transitions.get(from_state, set())
return to_state in allowed or to_state == OrderState.CANCELLEDHerausforderung: Bisher haben wir technische Workflows modelliert. Aber wie übersetzen wir fachliche Anforderungen in event-getriebene Prozesse?
Szenario: Ihre Fachabteilung kommt zu Ihnen und sagt: “Wir brauchen einen Genehmigungsprozess für Bestellungen über 1000 Euro. Der Manager muss binnen 24 Stunden entscheiden, sonst wird die Bestellung automatisch abgelehnt.”
Frage: Wie würden Sie das als Event-driven Process modellieren?
// Business Process: Manager Approval Workflow
@Component
public class ManagerApprovalProcess {
@EventListener
public void onOrderPlaced(OrderPlacedEvent event) {
// Fachliche Regel: Genehmigung ab 1000 Euro
if (event.getTotalAmount().compareTo(BigDecimal.valueOf(1000)) >= 0) {
initiateApprovalProcess(event);
} else {
// Automatische Genehmigung für kleinere Beträge
eventPublisher.publish(new OrderApprovedEvent(
event.getOrderId(),
"AUTO_APPROVED",
"Amount below approval threshold"
));
}
}
private void initiateApprovalProcess(OrderPlacedEvent event) {
// 1. Approval Request erstellen
ApprovalRequest request = new ApprovalRequest(
event.getOrderId(),
event.getTotalAmount(),
event.getCustomerId(),
LocalDateTime.now().plusHours(24) // 24h Deadline
);
approvalRepository.save(request);
// 2. Manager benachrichtigen
eventPublisher.publish(new ApprovalRequestedEvent(
event.getOrderId(),
request.getId(),
event.getTotalAmount(),
request.getDeadline()
));
// 3. Timeout scheduler
scheduleApprovalTimeout(request);
}
@EventListener
public void onApprovalDecision(ApprovalDecisionEvent event) {
ApprovalRequest request = approvalRepository.findByOrderId(event.getOrderId());
if (request.isExpired()) {
// Zu spät entschieden
eventPublisher.publish(new OrderRejectedEvent(
event.getOrderId(),
"APPROVAL_TIMEOUT",
"Decision received after deadline"
));
return;
}
if (event.isApproved()) {
eventPublisher.publish(new OrderApprovedEvent(
event.getOrderId(),
event.getManagerId(),
event.getReason()
));
} else {
eventPublisher.publish(new OrderRejectedEvent(
event.getOrderId(),
event.getManagerId(),
event.getReason()
));
}
// Request als erledigt markieren
request.setDecisionMade(true);
approvalRepository.save(request);
}
@Scheduled(fixedDelay = 300000) // Alle 5 Minuten
public void processExpiredApprovals() {
List<ApprovalRequest> expiredRequests = approvalRepository.findExpiredRequests();
for (ApprovalRequest request : expiredRequests) {
eventPublisher.publish(new OrderRejectedEvent(
request.getOrderId(),
"SYSTEM",
"Approval timeout - no decision within 24 hours"
));
request.setDecisionMade(true);
request.setAutoRejected(true);
approvalRepository.save(request);
}
}
}Reflexion: Viele Unternehmen haben bereits etablierte BPM-Tools (Camunda, Activiti, etc.). Wie können wir event-getriebene Architektur mit diesen Tools kombinieren?
# bpm_integration.py
class BPMIntegrationService:
"""
Integration mit externen BPM-Engines über Events
Designprinzip: BPM-Engine als ein Service unter vielen,
nicht als zentrale Steuerungsinstanz
"""
def __init__(self, bpm_engine_client, event_publisher):
self.bpm_engine = bpm_engine_client
self.event_publisher = event_publisher
async def handle_complex_approval_request(self, event: ComplexApprovalRequestedEvent):
"""
Frage: Wann sollten wir externe BPM-Tools verwenden?
Antworten:
- Wenn Fachabteilungen Prozesse selbst modellieren wollen
- Bei sehr komplexen Genehmigungsworkflows
- Wenn regulatorische Compliance-Dokumentation erforderlich ist
- Bei häufig ändernden Geschäftsregeln
"""
# BPM Process Instance starten
process_instance = await self.bpm_engine.start_process(
process_key="complex_order_approval",
business_key=event.order_id,
variables={
"orderId": event.order_id,
"amount": float(event.total_amount),
"customerId": event.customer_id,
"riskScore": event.risk_score
}
)
# Process Instance mit unserem Event-System verknüpfen
await self.event_publisher.publish(BPMProcessStartedEvent(
order_id=event.order_id,
process_instance_id=process_instance.id,
process_key="complex_order_approval"
))
async def handle_bpm_task_completed(self, event: BPMTaskCompletedEvent):
"""
BPM-Engine informiert uns über Task-Completion
"""
# Business Logic basierend auf Task-Typ
if event.task_type == "manager_approval":
if event.task_result.get("approved"):
await self.event_publisher.publish(ManagerApprovalCompletedEvent(
order_id=event.business_key,
approved=True,
manager_id=event.task_result.get("managerId"),
comments=event.task_result.get("comments")
))
else:
await self.event_publisher.publish(ManagerApprovalCompletedEvent(
order_id=event.business_key,
approved=False,
manager_id=event.task_result.get("managerId"),
reason=event.task_result.get("rejectionReason")
))
elif event.task_type == "risk_assessment":
await self.event_publisher.publish(RiskAssessmentCompletedEvent(
order_id=event.business_key,
risk_level=event.task_result.get("riskLevel"),
requires_additional_approval=event.task_result.get("requiresAdditionalApproval")
))
async def handle_bpm_process_completed(self, event: BPMProcessCompletedEvent):
"""
Gesamter BPM-Prozess abgeschlossen
"""
process_result = event.process_result
if process_result.get("finalApproval"):
await self.event_publisher.publish(OrderApprovedEvent(
order_id=event.business_key,
approval_source="BPM_WORKFLOW",
final_approver=process_result.get("finalApproverId")
))
else:
await self.event_publisher.publish(OrderRejectedEvent(
order_id=event.business_key,
rejection_source="BPM_WORKFLOW",
rejection_reason=process_result.get("rejectionReason")
))// Process Analytics und Monitoring
@Component
public class ProcessAnalytics {
@EventListener
public void onProcessEvent(ProcessEvent event) {
// Alle Process-Events für Analytics sammeln
ProcessMetric metric = new ProcessMetric(
event.getProcessType(),
event.getProcessStep(),
event.getTimestamp(),
event.getDuration(),
event.getOutcome()
);
metricsRepository.save(metric);
// Real-time Process Monitoring
updateProcessDashboard(event);
}
@Scheduled(cron = "0 0 8 * * MON") // Jeden Montag um 8 Uhr
public void generateWeeklyProcessReport() {
ProcessReport report = ProcessReport.builder()
.timeRange(getLastWeek())
.avgApprovalTime(calculateAverageApprovalTime())
.approvalRate(calculateApprovalRate())
.bottlenecks(identifyBottlenecks())
.recommendations(generateRecommendations())
.build();
// Report an Stakeholder versenden
eventPublisher.publish(new ProcessReportGeneratedEvent(report));
}
private List<ProcessBottleneck> identifyBottlenecks() {
// Welche Process Steps dauern überdurchschnittlich lange?
// Wo häufen sich Fehler?
// Welche Manager sind überlastet?
return processMetricsAnalyzer.findBottlenecks();
}
}Abschließende Reflexion:
Event-getriebene Workflow-Modellierung erfordert ein Umdenken: Anstatt Prozesse als Abfolge von Aktivitäten zu sehen, modellieren wir sie als Reaktionen auf Ereignisse. Dies bietet:
Ihre Aufgabe: Denken Sie an einen komplexen Geschäftsprozess in Ihrem Unternehmen. Wie würden Sie ihn in Events und State Transitions zerlegen? Welche fachlichen Regeln müssten als Event-Handler implementiert werden?