49 Orchestrierung: Zentrale Workflow-Steuerung

49.1 Das Orchestrator Pattern

Im Gegensatz zur Choreographie übernimmt bei der Orchestrierung eine zentrale Komponente - der Orchestrator - die Steuerung und Koordination des gesamten Workflows. Dieser kennt den vollständigen Geschäftsprozess und entscheidet, welche Services wann und in welcher Reihenfolge aufgerufen werden.

Der Orchestrator agiert wie ein Dirigent, der jeden Musiker im Orchester dirigiert und den Gesamtklang koordiniert. Er hat die Partitur vor sich und weiß genau, wann welches Instrument einsetzen muss.

49.1.1 Grundarchitektur eines Orchestrators

// OrderWorkflowOrchestrator - Zentrale Steuerung des Bestellprozesses
@Component
public class OrderWorkflowOrchestrator {
    
    private final PaymentService paymentService;
    private final InventoryService inventoryService;
    private final ShippingService shippingService;
    private final WorkflowStateRepository stateRepository;
    
    public OrderWorkflowOrchestrator(PaymentService paymentService,
                                   InventoryService inventoryService,
                                   ShippingService shippingService,
                                   WorkflowStateRepository stateRepository) {
        this.paymentService = paymentService;
        this.inventoryService = inventoryService;
        this.shippingService = shippingService;
        this.stateRepository = stateRepository;
    }
    
    @EventListener
    public void handleOrderPlaced(OrderPlacedEvent event) {
        WorkflowState state = new WorkflowState(event.getOrderId());
        state.setStep(WorkflowStep.ORDER_RECEIVED);
        stateRepository.save(state);
        
        // Orchestrator entscheidet über die Reihenfolge
        processPayment(state, event);
    }
    
    private void processPayment(WorkflowState state, OrderPlacedEvent orderEvent) {
        try {
            PaymentResult result = paymentService.processPayment(
                orderEvent.getCustomerId(),
                orderEvent.getTotalAmount()
            );
            
            if (result.isSuccessful()) {
                state.setStep(WorkflowStep.PAYMENT_COMPLETED);
                state.setPaymentId(result.getTransactionId());
                stateRepository.save(state);
                
                // Nächster Schritt wird vom Orchestrator bestimmt
                reserveInventory(state, orderEvent);
            } else {
                handlePaymentFailure(state, result);
            }
        } catch (PaymentException e) {
            handlePaymentFailure(state, e);
        }
    }
    
    private void reserveInventory(WorkflowState state, OrderPlacedEvent orderEvent) {
        try {
            InventoryResult result = inventoryService.reserveItems(orderEvent.getItems());
            
            if (result.isSuccessful()) {
                state.setStep(WorkflowStep.INVENTORY_RESERVED);
                state.setReservationId(result.getReservationId());
                stateRepository.save(state);
                
                // Workflow geht zum nächsten Schritt
                initiateShipping(state, orderEvent);
            } else {
                handleInventoryFailure(state, orderEvent, result);
            }
        } catch (InventoryException e) {
            handleInventoryFailure(state, orderEvent, e);
        }
    }
    
    private void initiateShipping(WorkflowState state, OrderPlacedEvent orderEvent) {
        ShippingResult result = shippingService.createShipment(
            orderEvent.getOrderId(),
            orderEvent.getShippingAddress()
        );
        
        state.setStep(WorkflowStep.SHIPPING_INITIATED);
        state.setShipmentId(result.getShipmentId());
        stateRepository.save(state);
        
        eventPublisher.publish(new OrderCompletedEvent(orderEvent.getOrderId()));
    }
}

49.1.2 Python-Implementierung des Orchestrator Patterns

# order_workflow_orchestrator.py
class OrderWorkflowOrchestrator:
    def __init__(self, payment_service, inventory_service, shipping_service, state_repository):
        self.payment_service = payment_service
        self.inventory_service = inventory_service
        self.shipping_service = shipping_service
        self.state_repository = state_repository
    
    async def handle_order_placed(self, event: OrderPlacedEvent):
        # Workflow-State initialisieren
        state = WorkflowState(order_id=event.order_id)
        state.step = WorkflowStep.ORDER_RECEIVED
        await self.state_repository.save(state)
        
        # Orchestrator steuert den Ablauf
        await self._process_payment(state, event)
    
    async def _process_payment(self, state: WorkflowState, order_event: OrderPlacedEvent):
        try:
            result = await self.payment_service.process_payment(
                customer_id=order_event.customer_id,
                amount=order_event.total_amount
            )
            
            if result.successful:
                state.step = WorkflowStep.PAYMENT_COMPLETED
                state.payment_id = result.transaction_id
                await self.state_repository.save(state)
                
                # Orchestrator bestimmt nächsten Schritt
                await self._reserve_inventory(state, order_event)
            else:
                await self._handle_payment_failure(state, result)
                
        except PaymentException as e:
            await self._handle_payment_failure(state, e)
    
    async def _reserve_inventory(self, state: WorkflowState, order_event: OrderPlacedEvent):
        try:
            result = await self.inventory_service.reserve_items(order_event.items)
            
            if result.successful:
                state.step = WorkflowStep.INVENTORY_RESERVED
                state.reservation_id = result.reservation_id
                await self.state_repository.save(state)
                
                await self._initiate_shipping(state, order_event)
            else:
                await self._handle_inventory_failure(state, order_event, result)
                
        except InventoryException as e:
            await self._handle_inventory_failure(state, order_event, e)

49.2 Command and Control

49.2.1 Explizite Steuerung vs. emergentes Verhalten

Der Orchestrator verwendet ein Command-and-Control-Modell, bei dem er explizite Befehle an nachgelagerte Services sendet und deren Antworten verarbeitet. Dies unterscheidet sich fundamental von der ereignisgetriebenen Choreographie.

Steuerungsaspekt Orchestrierung Choreographie
Entscheidung Zentral im Orchestrator Verteilt in jedem Service
Workflow-Wissen Vollständig im Orchestrator Fragmentiert über Services
Fehlerbehandlung Zentrale Behandlung Verteilte Behandlung
Änderungen Orchestrator modifizieren Services einzeln anpassen

49.2.2 Implementierung von Command-Patterns

// Command-Pattern für Orchestrator-Steuerung
public interface WorkflowCommand {
    CompletableFuture<WorkflowResult> execute(WorkflowContext context);
    CompletableFuture<Void> compensate(WorkflowContext context);
}

@Component
public class ProcessPaymentCommand implements WorkflowCommand {
    private final PaymentService paymentService;
    
    @Override
    public CompletableFuture<WorkflowResult> execute(WorkflowContext context) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                PaymentResult result = paymentService.processPayment(
                    context.getCustomerId(),
                    context.getTotalAmount()
                );
                return WorkflowResult.success(result);
            } catch (Exception e) {
                return WorkflowResult.failure(e);
            }
        });
    }
    
    @Override
    public CompletableFuture<Void> compensate(WorkflowContext context) {
        return CompletableFuture.runAsync(() -> {
            if (context.hasPaymentId()) {
                paymentService.refundPayment(context.getPaymentId());
            }
        });
    }
}

// Orchestrator nutzt Commands
@Component
public class CommandBasedOrchestrator {
    private final List<WorkflowCommand> commands;
    
    public CompletableFuture<Void> executeWorkflow(WorkflowContext context) {
        List<WorkflowCommand> executedCommands = new ArrayList<>();
        
        try {
            for (WorkflowCommand command : commands) {
                WorkflowResult result = command.execute(context).get();
                
                if (result.isFailure()) {
                    // Kompensation bereits ausgeführter Commands
                    compensateExecutedCommands(executedCommands, context);
                    throw new WorkflowException(result.getError());
                }
                
                executedCommands.add(command);
                context.addResult(result);
            }
        } catch (Exception e) {
            compensateExecutedCommands(executedCommands, context);
            throw e;
        }
        
        return CompletableFuture.completedFuture(null);
    }
}

49.3 State Management in Orchestrators

49.3.1 Persistente Workflow-States

Orchestrators müssen den aktuellen Zustand von Workflows persistent speichern, um bei Fehlern oder Systemneustarts den Workflow an der richtigen Stelle fortsetzen zu können.

// WorkflowState Entity für Persistierung
@Entity
@Table(name = "workflow_states")
public class WorkflowState {
    @Id
    private String orderId;
    
    @Enumerated(EnumType.STRING)
    private WorkflowStep currentStep;
    
    @Column(columnDefinition = "json")
    private String workflowData; // JSON-serialisierte Workflow-Daten
    
    private LocalDateTime lastUpdate;
    private String errorMessage;
    private int retryCount;
    
    // State-spezifische Felder
    private String paymentId;
    private String reservationId;
    private String shipmentId;
    
    // Constructor, Getters, Setters...
}

// State-Management im Orchestrator
@Service
public class StatefulOrderOrchestrator {
    
    @Scheduled(fixedDelay = 60000) // Alle 60 Sekunden
    public void resumeStuckWorkflows() {
        List<WorkflowState> stuckWorkflows = stateRepository.findStuckWorkflows(
            LocalDateTime.now().minusMinutes(5)
        );
        
        for (WorkflowState state : stuckWorkflows) {
            try {
                resumeWorkflowFromState(state);
            } catch (Exception e) {
                handleWorkflowResumptionError(state, e);
            }
        }
    }
    
    private void resumeWorkflowFromState(WorkflowState state) {
        switch (state.getCurrentStep()) {
            case PAYMENT_PENDING:
                retryPaymentStep(state);
                break;
            case INVENTORY_PENDING:
                retryInventoryStep(state);
                break;
            case SHIPPING_PENDING:
                retryShippingStep(state);
                break;
            default:
                log.warn("Unknown workflow step: {}", state.getCurrentStep());
        }
    }
}

49.3.2 Python State-Management mit SQLAlchemy

# workflow_state.py
from sqlalchemy import Column, String, DateTime, Integer, Enum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import JSON

Base = declarative_base()

class WorkflowState(Base):
    __tablename__ = 'workflow_states'
    
    order_id = Column(String, primary_key=True)
    current_step = Column(Enum(WorkflowStep))
    workflow_data = Column(JSON)
    last_update = Column(DateTime)
    error_message = Column(String)
    retry_count = Column(Integer, default=0)
    
    payment_id = Column(String)
    reservation_id = Column(String)
    shipment_id = Column(String)

# stateful_orchestrator.py
class StatefulOrderOrchestrator:
    def __init__(self, session, payment_service, inventory_service, shipping_service):
        self.session = session
        self.payment_service = payment_service
        self.inventory_service = inventory_service
        self.shipping_service = shipping_service
    
    async def resume_stuck_workflows(self):
        cutoff_time = datetime.utcnow() - timedelta(minutes=5)
        stuck_workflows = self.session.query(WorkflowState).filter(
            WorkflowState.last_update < cutoff_time,
            WorkflowState.current_step.in_([
                WorkflowStep.PAYMENT_PENDING,
                WorkflowStep.INVENTORY_PENDING,
                WorkflowStep.SHIPPING_PENDING
            ])
        ).all()
        
        for state in stuck_workflows:
            try:
                await self._resume_workflow_from_state(state)
            except Exception as e:
                await self._handle_workflow_resumption_error(state, e)
    
    async def _resume_workflow_from_state(self, state: WorkflowState):
        if state.current_step == WorkflowStep.PAYMENT_PENDING:
            await self._retry_payment_step(state)
        elif state.current_step == WorkflowStep.INVENTORY_PENDING:
            await self._retry_inventory_step(state)
        elif state.current_step == WorkflowStep.SHIPPING_PENDING:
            await self._retry_shipping_step(state)

49.4 When to Use Orchestration

49.4.1 Anwendungsszenarien für zentrale Steuerung

Orchestrierung eignet sich besonders gut für Szenarien mit komplexer Geschäftslogik, strikten Konsistenzanforderungen und der Notwendigkeit zentraler Kontrolle.

Ideale Szenarien für Orchestrierung:

Szenario Beschreibung Beispiel
Komplexe Conditional Logic Workflows mit vielen Verzweigungen und Bedingungen Kreditgenehmigung mit verschiedenen Prüfschritten
Strong Consistency Sofortige Konsistenz zwischen Services erforderlich Finanz-Transaktionen
Complex Error Handling Sophisticated Rollback und Kompensationslogik Reisebuchung mit Hotel, Flug, Mietwagen
Compliance Requirements Auditierbarkeit und Nachverfolgung erforderlich Medizinische Behandlungsworkflows
Long-running Processes Workflows mit manuellen Eingriffen Bestellgenehmigung mit Manager-Freigabe

49.4.2 Implementierung komplexer Geschäftslogik

// Komplexer Workflow mit Bedingungslogik
@Component
public class LoanApprovalOrchestrator {
    
    public CompletableFuture<LoanDecision> processLoanApplication(LoanApplication application) {
        WorkflowContext context = new WorkflowContext(application);
        
        // Schritt 1: Grundlegende Validierung
        return validateApplication(context)
            .thenCompose(validation -> {
                if (!validation.isValid()) {
                    return CompletableFuture.completedFuture(
                        LoanDecision.rejected(validation.getReasons())
                    );
                }
                
                // Schritt 2: Kreditwürdigkeitsprüfung
                return checkCreditScore(context);
            })
            .thenCompose(score -> {
                context.setCreditScore(score);
                
                // Komplexe Entscheidungslogik
                if (score.getValue() < 300) {
                    return CompletableFuture.completedFuture(
                        LoanDecision.rejected("Insufficient credit score")
                    );
                } else if (score.getValue() < 600) {
                    // Zusätzliche Prüfungen für mittlere Scores
                    return processHighRiskApplication(context);
                } else {
                    // Fast-Track für gute Scores
                    return processFastTrackApplication(context);
                }
            });
    }
    
    private CompletableFuture<LoanDecision> processHighRiskApplication(WorkflowContext context) {
        // Einkommensprüfung
        return verifyIncome(context)
            .thenCompose(income -> {
                if (!income.isSufficient()) {
                    return CompletableFuture.completedFuture(
                        LoanDecision.rejected("Insufficient income")
                    );
                }
                
                // Manuelle Prüfung erforderlich
                return requestManualReview(context)
                    .thenCompose(review -> waitForManualDecision(context.getApplicationId()));
            });
    }
}

49.4.3 Wann Orchestrierung vermeiden

Anti-Patterns für Orchestrierung:

// Anti-Pattern: Orchestrator als Event-Forwarder
@Component
public class UnnecessaryOrchestrator {
    
    @EventListener
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // Anti-Pattern: Orchestrator macht nur Event-Forwarding
        eventPublisher.publish(new ProcessOrderEvent(event.getOrderId()));
    }
    
    @EventListener  
    public void handleOrderProcessed(OrderProcessedEvent event) {
        // Anti-Pattern: Keine Geschäftslogik, nur Weiterleitung
        eventPublisher.publish(new ShipOrderEvent(event.getOrderId()));
    }
}

Die Entscheidung zwischen Orchestrierung und Choreographie hängt stark vom Kontext ab. Orchestrierung bietet Kontrolle und Nachverfolgbarkeit auf Kosten der Flexibilität und kann bei unsachgemäßer Anwendung zu monolithischen Strukturen führen. Das Verständnis der Trade-offs ist entscheidend für die richtige Architekturentscheidung.