52 Workflow-Modellierung mit Events

52.1 Die Herausforderung der Workflow-Modellierung verstehen

Denken Sie einmal zurück: Wann haben Sie das letzte Mal einen komplexen Geschäftsprozess in Code abgebildet? War es ein linearer Ablauf oder gab es Verzweigungen, Schleifen, Wartezeiten und Ausnahmebehandlungen?

Die meisten realen Geschäftsprozesse sind alles andere als linear. Nehmen Sie unser E-Commerce-Beispiel: Was passiert, wenn ein Kunde seine Bestellung ändern möchte, nachdem die Zahlung bereits verarbeitet wurde? Oder wenn ein Artikel plötzlich nicht mehr verfügbar ist, obwohl er bereits reserviert wurde?

Zentrale Frage: Wie können wir solche komplexen, sich verändernden Workflows so modellieren, dass sie sowohl für Menschen verständlich als auch für Maschinen ausführbar sind?

52.2 Process Modeling

52.2.1 Ereignisgetriebene Prozessmodellierung

Traditionelle Prozessmodellierung denkt in Aktivitäten und Übergängen. Event-Driven Architecture dreht diese Perspektive um: Wir modellieren, was passiert ist (Events), nicht was passieren soll (Aktivitäten).

Übung zum Verständnis: Betrachten Sie diese beiden Beschreibungen desselben Prozesses:

Traditionell (aktivitätsorientiert): 1. Prüfe Verfügbarkeit 2. Wenn verfügbar: Reserviere Artikel 3. Verarbeite Zahlung 4. Wenn Zahlung erfolgreich: Versende Artikel

Event-orientiert: - OrderPlaced → InventoryChecked → (InventoryAvailable | InventoryUnavailable) - InventoryAvailable → PaymentProcessed → (PaymentSucceeded | PaymentFailed) - PaymentSucceeded → ShippingInitiated

Frage: Welcher Ansatz ist flexibler, wenn neue Anforderungen hinzukommen? Warum?

52.2.2 Implementierung event-getriebener Prozessmodelle

// Process Model als Event-Handler-Sammlung
@Component
public class OrderProcessModel {
    
    // Prozessschritt 1: Order → Inventory Check
    @EventListener
    public void onOrderPlaced(OrderPlacedEvent event) {
        // Was bedeutet dieses Event für unseren Prozess?
        logProcessStep("Order received", event.getOrderId());
        
        // Welcher nächste Schritt ist logisch?
        inventoryService.checkAvailability(event.getItems())
            .thenAccept(availability -> {
                if (availability.isAllAvailable()) {
                    eventPublisher.publish(new InventoryAvailableEvent(
                        event.getOrderId(), 
                        event.getItems()
                    ));
                } else {
                    eventPublisher.publish(new InventoryUnavailableEvent(
                        event.getOrderId(), 
                        availability.getUnavailableItems()
                    ));
                }
            });
    }
    
    // Prozessschritt 2a: Inventory Available → Payment
    @EventListener
    public void onInventoryAvailable(InventoryAvailableEvent event) {
        logProcessStep("Inventory confirmed", event.getOrderId());
        
        // Parallel oder sequenziell? Was ist sinnvoller?
        CompletableFuture.allOf(
            reserveInventory(event),
            processPayment(event)
        ).thenRun(() -> {
            eventPublisher.publish(new OrderReadyForShippingEvent(
                event.getOrderId()
            ));
        });
    }
    
    // Prozessschritt 2b: Inventory Unavailable → Compensation
    @EventListener
    public void onInventoryUnavailable(InventoryUnavailableEvent event) {
        logProcessStep("Inventory insufficient", event.getOrderId());
        
        // Wie sollten wir auf diesen Fehlerzustand reagieren?
        alternativeService.findAlternatives(event.getUnavailableItems())
            .thenAccept(alternatives -> {
                if (alternatives.isEmpty()) {
                    eventPublisher.publish(new OrderCancelledEvent(
                        event.getOrderId(), 
                        "Items not available"
                    ));
                } else {
                    eventPublisher.publish(new AlternativesFoundEvent(
                        event.getOrderId(), 
                        alternatives
                    ));
                }
            });
    }
}

52.2.3 Python Process Modeling mit Async/Await

# process_model.py
class OrderProcessModel:
    def __init__(self, inventory_service, payment_service, event_publisher):
        self.inventory_service = inventory_service
        self.payment_service = payment_service
        self.event_publisher = event_publisher
    
    async def handle_order_placed(self, event: OrderPlacedEvent):
        """
        Frage: Warum beginnen wir den Prozess mit einer Verfügbarkeits-Prüfung?
        Antwort: Weil wir frühzeitig Probleme erkennen und dem Kunden 
        ehrlich mitteilen wollen, was möglich ist.
        """
        self._log_process_step("Order received", event.order_id)
        
        availability = await self.inventory_service.check_availability(event.items)
        
        if availability.all_available:
            await self.event_publisher.publish(InventoryAvailableEvent(
                order_id=event.order_id,
                items=event.items
            ))
        else:
            await self.event_publisher.publish(InventoryUnavailableEvent(
                order_id=event.order_id,
                unavailable_items=availability.unavailable_items
            ))
    
    async def handle_inventory_available(self, event: InventoryAvailableEvent):
        """
        Designentscheidung: Parallel oder sequenziell?
        - Parallel: Schneller, aber komplexere Fehlerbehandlung
        - Sequenziell: Einfacher, aber langsamer
        
        Für unser E-Commerce-Beispiel: Parallel, da Geschwindigkeit kritisch ist
        """
        self._log_process_step("Inventory confirmed", event.order_id)
        
        # Parallele Ausführung mit asyncio
        reservation_task = self._reserve_inventory(event)
        payment_task = self._process_payment(event)
        
        try:
            reservation_result, payment_result = await asyncio.gather(
                reservation_task, 
                payment_task
            )
            
            if reservation_result.successful and payment_result.successful:
                await self.event_publisher.publish(OrderReadyForShippingEvent(
                    order_id=event.order_id
                ))
            else:
                # Was passiert bei partiellen Fehlern?
                await self._handle_partial_failure(event, reservation_result, payment_result)
                
        except Exception as e:
            await self._handle_process_exception(event, e)

52.3 State Machines

52.3.1 Von impliziten zu expliziten Zuständen

Reflexionsfrage: In unserem bisherigen Code - wo ist eigentlich der “Zustand” einer Bestellung gespeichert? Ist er explizit oder müssen wir ihn aus den Events ableiten?

Die meisten event-getriebenen Systeme haben implizite Zustände. State Machines machen diese explizit und bieten dadurch bessere Kontrolle und Verständlichkeit.

52.3.2 Implementierung einer Order State Machine

// Explizite Zustandsdefinition
public enum OrderState {
    PLACED,
    INVENTORY_CHECKING,
    INVENTORY_CONFIRMED,
    INVENTORY_INSUFFICIENT,
    PAYMENT_PROCESSING,
    PAYMENT_CONFIRMED,
    PAYMENT_FAILED,
    READY_FOR_SHIPPING,
    SHIPPED,
    DELIVERED,
    CANCELLED
}

// State Machine Implementation
@Component
public class OrderStateMachine {
    
    private final Map<String, OrderState> orderStates = new ConcurrentHashMap<>();
    
    @EventListener
    public void onOrderPlaced(OrderPlacedEvent event) {
        // State explizit setzen
        OrderState newState = transition(
            null, // Startzustand
            OrderState.PLACED,
            event
        );
        
        orderStates.put(event.getOrderId(), newState);
        
        // State-abhängige Logik
        triggerInventoryCheck(event);
    }
    
    @EventListener
    public void onInventoryAvailable(InventoryAvailableEvent event) {
        OrderState currentState = orderStates.get(event.getOrderId());
        
        // State Transition validieren
        if (currentState == OrderState.PLACED) {
            OrderState newState = transition(
                currentState,
                OrderState.INVENTORY_CONFIRMED,
                event
            );
            
            orderStates.put(event.getOrderId(), newState);
            triggerPaymentProcessing(event);
        } else {
            // Ungültiger State-Übergang
            log.warn("Invalid state transition: {} -> INVENTORY_CONFIRMED for order {}", 
                currentState, event.getOrderId());
        }
    }
    
    private OrderState transition(OrderState from, OrderState to, DomainEvent event) {
        // Frage: Welche Validierungen sollten wir hier durchführen?
        
        // 1. Ist der Übergang erlaubt?
        if (!isValidTransition(from, to)) {
            throw new InvalidStateTransitionException(from, to);
        }
        
        // 2. State-Logging für Debugging
        log.info("Order {} transitioned from {} to {} due to {}", 
            extractOrderId(event), from, to, event.getClass().getSimpleName());
        
        // 3. State-Change Events publizieren (für Monitoring)
        eventPublisher.publish(new OrderStateChangedEvent(
            extractOrderId(event), from, to, event
        ));
        
        return to;
    }
    
    private boolean isValidTransition(OrderState from, OrderState to) {
        // Welche Übergänge sind in unserem Business-Kontext erlaubt?
        Map<OrderState, Set<OrderState>> allowedTransitions = Map.of(
            null, Set.of(OrderState.PLACED),
            OrderState.PLACED, Set.of(OrderState.INVENTORY_CONFIRMED, OrderState.INVENTORY_INSUFFICIENT),
            OrderState.INVENTORY_CONFIRMED, Set.of(OrderState.PAYMENT_CONFIRMED, OrderState.PAYMENT_FAILED),
            OrderState.PAYMENT_CONFIRMED, Set.of(OrderState.READY_FOR_SHIPPING),
            OrderState.READY_FOR_SHIPPING, Set.of(OrderState.SHIPPED),
            OrderState.SHIPPED, Set.of(OrderState.DELIVERED)
            // Cancelled kann von fast jedem State erreicht werden
        );
        
        return allowedTransitions.getOrDefault(from, Set.of()).contains(to) ||
               to == OrderState.CANCELLED; // Cancellation ist fast immer möglich
    }
}

52.3.3 Python State Machine mit Enum und Type Safety

# state_machine.py
from enum import Enum, auto
from typing import Dict, Set, Optional
import logging

class OrderState(Enum):
    PLACED = auto()
    INVENTORY_CHECKING = auto()
    INVENTORY_CONFIRMED = auto()
    INVENTORY_INSUFFICIENT = auto()
    PAYMENT_PROCESSING = auto()
    PAYMENT_CONFIRMED = auto()
    PAYMENT_FAILED = auto()
    READY_FOR_SHIPPING = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()

class OrderStateMachine:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher
        self.order_states: Dict[str, OrderState] = {}
        
        # Explizite Definition erlaubter Übergänge
        self.allowed_transitions: Dict[Optional[OrderState], Set[OrderState]] = {
            None: {OrderState.PLACED},
            OrderState.PLACED: {OrderState.INVENTORY_CONFIRMED, OrderState.INVENTORY_INSUFFICIENT},
            OrderState.INVENTORY_CONFIRMED: {OrderState.PAYMENT_CONFIRMED, OrderState.PAYMENT_FAILED},
            OrderState.PAYMENT_CONFIRMED: {OrderState.READY_FOR_SHIPPING},
            OrderState.READY_FOR_SHIPPING: {OrderState.SHIPPED},
            OrderState.SHIPPED: {OrderState.DELIVERED}
        }
    
    async def handle_order_placed(self, event: OrderPlacedEvent):
        """
        Lernfrage: Warum ist es wichtig, State-Übergänge explizit zu validieren?
        
        Antwort: Weil es uns hilft, inkonsistente Zustände zu vermeiden und 
        Bugs frühzeitig zu erkennen. Ein System ohne explizite State-Validierung
        kann in undefinierte Zustände geraten.
        """
        new_state = await self._transition(
            order_id=event.order_id,
            from_state=None,
            to_state=OrderState.PLACED,
            event=event
        )
        
        self.order_states[event.order_id] = new_state
    
    async def handle_inventory_available(self, event: InventoryAvailableEvent):
        current_state = self.order_states.get(event.order_id)
        
        if current_state == OrderState.PLACED:
            new_state = await self._transition(
                order_id=event.order_id,
                from_state=current_state,
                to_state=OrderState.INVENTORY_CONFIRMED,
                event=event
            )
            self.order_states[event.order_id] = new_state
        else:
            logging.warning(
                f"Invalid state transition: {current_state} -> INVENTORY_CONFIRMED "
                f"for order {event.order_id}"
            )
    
    async def _transition(self, order_id: str, from_state: Optional[OrderState], 
                         to_state: OrderState, event) -> OrderState:
        # State-Transition validieren
        if not self._is_valid_transition(from_state, to_state):
            raise InvalidStateTransitionException(from_state, to_state)
        
        # Cancellation ist ein Sonderfall - von fast jedem State möglich
        if to_state == OrderState.CANCELLED:
            pass  # Immer erlaubt
        
        # Logging für Debugging und Monitoring
        logging.info(
            f"Order {order_id} transitioned from {from_state} to {to_state} "
            f"due to {event.__class__.__name__}"
        )
        
        # State-Change Event für externe Systeme
        await self.event_publisher.publish(OrderStateChangedEvent(
            order_id=order_id,
            from_state=from_state,
            to_state=to_state,
            trigger_event=event
        ))
        
        return to_state
    
    def _is_valid_transition(self, from_state: Optional[OrderState], 
                           to_state: OrderState) -> bool:
        allowed = self.allowed_transitions.get(from_state, set())
        return to_state in allowed or to_state == OrderState.CANCELLED

52.4 Business Process Management

52.4.1 Von technischen zu fachlichen Prozessen

Herausforderung: Bisher haben wir technische Workflows modelliert. Aber wie übersetzen wir fachliche Anforderungen in event-getriebene Prozesse?

Szenario: Ihre Fachabteilung kommt zu Ihnen und sagt: “Wir brauchen einen Genehmigungsprozess für Bestellungen über 1000 Euro. Der Manager muss binnen 24 Stunden entscheiden, sonst wird die Bestellung automatisch abgelehnt.”

Frage: Wie würden Sie das als Event-driven Process modellieren?

52.4.2 Implementierung fachlicher Prozesse

// Business Process: Manager Approval Workflow
@Component
public class ManagerApprovalProcess {
    
    @EventListener
    public void onOrderPlaced(OrderPlacedEvent event) {
        // Fachliche Regel: Genehmigung ab 1000 Euro
        if (event.getTotalAmount().compareTo(BigDecimal.valueOf(1000)) >= 0) {
            initiateApprovalProcess(event);
        } else {
            // Automatische Genehmigung für kleinere Beträge
            eventPublisher.publish(new OrderApprovedEvent(
                event.getOrderId(),
                "AUTO_APPROVED",
                "Amount below approval threshold"
            ));
        }
    }
    
    private void initiateApprovalProcess(OrderPlacedEvent event) {
        // 1. Approval Request erstellen
        ApprovalRequest request = new ApprovalRequest(
            event.getOrderId(),
            event.getTotalAmount(),
            event.getCustomerId(),
            LocalDateTime.now().plusHours(24) // 24h Deadline
        );
        
        approvalRepository.save(request);
        
        // 2. Manager benachrichtigen
        eventPublisher.publish(new ApprovalRequestedEvent(
            event.getOrderId(),
            request.getId(),
            event.getTotalAmount(),
            request.getDeadline()
        ));
        
        // 3. Timeout scheduler
        scheduleApprovalTimeout(request);
    }
    
    @EventListener
    public void onApprovalDecision(ApprovalDecisionEvent event) {
        ApprovalRequest request = approvalRepository.findByOrderId(event.getOrderId());
        
        if (request.isExpired()) {
            // Zu spät entschieden
            eventPublisher.publish(new OrderRejectedEvent(
                event.getOrderId(),
                "APPROVAL_TIMEOUT",
                "Decision received after deadline"
            ));
            return;
        }
        
        if (event.isApproved()) {
            eventPublisher.publish(new OrderApprovedEvent(
                event.getOrderId(),
                event.getManagerId(),
                event.getReason()
            ));
        } else {
            eventPublisher.publish(new OrderRejectedEvent(
                event.getOrderId(),
                event.getManagerId(),
                event.getReason()
            ));
        }
        
        // Request als erledigt markieren
        request.setDecisionMade(true);
        approvalRepository.save(request);
    }
    
    @Scheduled(fixedDelay = 300000) // Alle 5 Minuten
    public void processExpiredApprovals() {
        List<ApprovalRequest> expiredRequests = approvalRepository.findExpiredRequests();
        
        for (ApprovalRequest request : expiredRequests) {
            eventPublisher.publish(new OrderRejectedEvent(
                request.getOrderId(),
                "SYSTEM",
                "Approval timeout - no decision within 24 hours"
            ));
            
            request.setDecisionMade(true);
            request.setAutoRejected(true);
            approvalRepository.save(request);
        }
    }
}

52.4.3 Integration mit externen BPM-Systemen

Reflexion: Viele Unternehmen haben bereits etablierte BPM-Tools (Camunda, Activiti, etc.). Wie können wir event-getriebene Architektur mit diesen Tools kombinieren?

# bpm_integration.py
class BPMIntegrationService:
    """
    Integration mit externen BPM-Engines über Events
    
    Designprinzip: BPM-Engine als ein Service unter vielen,
    nicht als zentrale Steuerungsinstanz
    """
    
    def __init__(self, bpm_engine_client, event_publisher):
        self.bpm_engine = bpm_engine_client
        self.event_publisher = event_publisher
    
    async def handle_complex_approval_request(self, event: ComplexApprovalRequestedEvent):
        """
        Frage: Wann sollten wir externe BPM-Tools verwenden?
        
        Antworten:
        - Wenn Fachabteilungen Prozesse selbst modellieren wollen
        - Bei sehr komplexen Genehmigungsworkflows
        - Wenn regulatorische Compliance-Dokumentation erforderlich ist
        - Bei häufig ändernden Geschäftsregeln
        """
        
        # BPM Process Instance starten
        process_instance = await self.bpm_engine.start_process(
            process_key="complex_order_approval",
            business_key=event.order_id,
            variables={
                "orderId": event.order_id,
                "amount": float(event.total_amount),
                "customerId": event.customer_id,
                "riskScore": event.risk_score
            }
        )
        
        # Process Instance mit unserem Event-System verknüpfen
        await self.event_publisher.publish(BPMProcessStartedEvent(
            order_id=event.order_id,
            process_instance_id=process_instance.id,
            process_key="complex_order_approval"
        ))
    
    async def handle_bpm_task_completed(self, event: BPMTaskCompletedEvent):
        """
        BPM-Engine informiert uns über Task-Completion
        """
        
        # Business Logic basierend auf Task-Typ
        if event.task_type == "manager_approval":
            if event.task_result.get("approved"):
                await self.event_publisher.publish(ManagerApprovalCompletedEvent(
                    order_id=event.business_key,
                    approved=True,
                    manager_id=event.task_result.get("managerId"),
                    comments=event.task_result.get("comments")
                ))
            else:
                await self.event_publisher.publish(ManagerApprovalCompletedEvent(
                    order_id=event.business_key,
                    approved=False,
                    manager_id=event.task_result.get("managerId"),
                    reason=event.task_result.get("rejectionReason")
                ))
        
        elif event.task_type == "risk_assessment":
            await self.event_publisher.publish(RiskAssessmentCompletedEvent(
                order_id=event.business_key,
                risk_level=event.task_result.get("riskLevel"),
                requires_additional_approval=event.task_result.get("requiresAdditionalApproval")
            ))
    
    async def handle_bpm_process_completed(self, event: BPMProcessCompletedEvent):
        """
        Gesamter BPM-Prozess abgeschlossen
        """
        process_result = event.process_result
        
        if process_result.get("finalApproval"):
            await self.event_publisher.publish(OrderApprovedEvent(
                order_id=event.business_key,
                approval_source="BPM_WORKFLOW",
                final_approver=process_result.get("finalApproverId")
            ))
        else:
            await self.event_publisher.publish(OrderRejectedEvent(
                order_id=event.business_key,
                rejection_source="BPM_WORKFLOW",
                rejection_reason=process_result.get("rejectionReason")
            ))

52.4.4 Monitoring und Optimierung von Business Processes

// Process Analytics und Monitoring
@Component
public class ProcessAnalytics {
    
    @EventListener
    public void onProcessEvent(ProcessEvent event) {
        // Alle Process-Events für Analytics sammeln
        ProcessMetric metric = new ProcessMetric(
            event.getProcessType(),
            event.getProcessStep(),
            event.getTimestamp(),
            event.getDuration(),
            event.getOutcome()
        );
        
        metricsRepository.save(metric);
        
        // Real-time Process Monitoring
        updateProcessDashboard(event);
    }
    
    @Scheduled(cron = "0 0 8 * * MON") // Jeden Montag um 8 Uhr
    public void generateWeeklyProcessReport() {
        ProcessReport report = ProcessReport.builder()
            .timeRange(getLastWeek())
            .avgApprovalTime(calculateAverageApprovalTime())
            .approvalRate(calculateApprovalRate())
            .bottlenecks(identifyBottlenecks())
            .recommendations(generateRecommendations())
            .build();
        
        // Report an Stakeholder versenden
        eventPublisher.publish(new ProcessReportGeneratedEvent(report));
    }
    
    private List<ProcessBottleneck> identifyBottlenecks() {
        // Welche Process Steps dauern überdurchschnittlich lange?
        // Wo häufen sich Fehler?
        // Welche Manager sind überlastet?
        
        return processMetricsAnalyzer.findBottlenecks();
    }
}

Abschließende Reflexion:

Event-getriebene Workflow-Modellierung erfordert ein Umdenken: Anstatt Prozesse als Abfolge von Aktivitäten zu sehen, modellieren wir sie als Reaktionen auf Ereignisse. Dies bietet:

  1. Flexibilität: Neue Anforderungen können als neue Event-Handler hinzugefügt werden
  2. Nachvollziehbarkeit: Jeder Process-Schritt ist durch Events dokumentiert
  3. Testbarkeit: Events können isoliert getestet werden
  4. Monitoring: Process-Performance ist durch Event-Timing messbar

Ihre Aufgabe: Denken Sie an einen komplexen Geschäftsprozess in Ihrem Unternehmen. Wie würden Sie ihn in Events und State Transitions zerlegen? Welche fachlichen Regeln müssten als Event-Handler implementiert werden?