Im Gegensatz zur Choreographie übernimmt bei der Orchestrierung eine zentrale Komponente - der Orchestrator - die Steuerung und Koordination des gesamten Workflows. Dieser kennt den vollständigen Geschäftsprozess und entscheidet, welche Services wann und in welcher Reihenfolge aufgerufen werden.
Der Orchestrator agiert wie ein Dirigent, der jeden Musiker im Orchester dirigiert und den Gesamtklang koordiniert. Er hat die Partitur vor sich und weiß genau, wann welches Instrument einsetzen muss.
// OrderWorkflowOrchestrator - Zentrale Steuerung des Bestellprozesses
@Component
public class OrderWorkflowOrchestrator {
private final PaymentService paymentService;
private final InventoryService inventoryService;
private final ShippingService shippingService;
private final WorkflowStateRepository stateRepository;
/**
 * Creates the orchestrator and stores the collaborators it drives.
 *
 * @param paymentService   service used for the payment step
 * @param inventoryService service used for the inventory reservation step
 * @param shippingService  service used for the shipping step
 * @param stateRepository  repository the workflow state is saved to after each step
 */
public OrderWorkflowOrchestrator(
        PaymentService paymentService,
        InventoryService inventoryService,
        ShippingService shippingService,
        WorkflowStateRepository stateRepository) {
    // Persistence first, then the services in the order the workflow uses them.
    this.stateRepository = stateRepository;
    this.paymentService = paymentService;
    this.inventoryService = inventoryService;
    this.shippingService = shippingService;
}
/**
 * Entry point of the workflow: reacts to a placed order.
 *
 * <p>Creates a workflow state for the order id, marks it {@code ORDER_RECEIVED},
 * saves it and then hands over to the payment step.
 *
 * @param event the placed-order event that starts the workflow
 */
@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
    final WorkflowState workflow = new WorkflowState(event.getOrderId());
    workflow.setStep(WorkflowStep.ORDER_RECEIVED);
    stateRepository.save(workflow);

    // The orchestrator decides the sequence: payment is the first step.
    processPayment(workflow, event);
}
/**
 * Payment step of the workflow.
 *
 * <p>Charges the order total to the customer. On success the state is moved to
 * {@code PAYMENT_COMPLETED}, the transaction id is recorded, the state is saved and the
 * inventory step is started. An unsuccessful result or a {@code PaymentException} is
 * passed to {@code handlePaymentFailure}.
 *
 * @param state      workflow state of the order being processed
 * @param orderEvent the event that started the workflow
 */
private void processPayment(WorkflowState state, OrderPlacedEvent orderEvent) {
    try {
        final PaymentResult payment = paymentService.processPayment(
                orderEvent.getCustomerId(),
                orderEvent.getTotalAmount());

        if (!payment.isSuccessful()) {
            handlePaymentFailure(state, payment);
            return;
        }

        state.setStep(WorkflowStep.PAYMENT_COMPLETED);
        state.setPaymentId(payment.getTransactionId());
        stateRepository.save(state);

        // The next step is chosen by the orchestrator, still inside the try block.
        reserveInventory(state, orderEvent);
    } catch (PaymentException failure) {
        handlePaymentFailure(state, failure);
    }
}
/**
 * Inventory step of the workflow.
 *
 * <p>Reserves the ordered items. On success the state is moved to
 * {@code INVENTORY_RESERVED}, the reservation id is recorded, the state is saved and
 * shipping is initiated. An unsuccessful result or an {@code InventoryException} is
 * passed to {@code handleInventoryFailure}.
 *
 * @param state      workflow state of the order being processed
 * @param orderEvent the event that started the workflow
 */
private void reserveInventory(WorkflowState state, OrderPlacedEvent orderEvent) {
    try {
        final InventoryResult reservation = inventoryService.reserveItems(orderEvent.getItems());

        if (!reservation.isSuccessful()) {
            handleInventoryFailure(state, orderEvent, reservation);
            return;
        }

        state.setStep(WorkflowStep.INVENTORY_RESERVED);
        state.setReservationId(reservation.getReservationId());
        stateRepository.save(state);

        // The workflow moves on to the next step.
        initiateShipping(state, orderEvent);
    } catch (InventoryException failure) {
        handleInventoryFailure(state, orderEvent, failure);
    }
}
/**
 * Shipping step of the workflow and its final step.
 *
 * <p>Creates the shipment, moves the state to {@code SHIPPING_INITIATED}, records the
 * shipment id, saves the state and publishes an {@code OrderCompletedEvent}.
 * Unlike the payment and inventory steps there is no try/catch here, so any exception
 * thrown by the shipping service propagates to the caller.
 *
 * @param state      workflow state of the order being processed
 * @param orderEvent the event that started the workflow
 */
private void initiateShipping(WorkflowState state, OrderPlacedEvent orderEvent) {
    final ShippingResult shipment = shippingService.createShipment(
            orderEvent.getOrderId(),
            orderEvent.getShippingAddress());

    state.setStep(WorkflowStep.SHIPPING_INITIATED);
    state.setShipmentId(shipment.getShipmentId());
    stateRepository.save(state);

    // NOTE(review): eventPublisher is not among the fields declared by this class or
    // assigned in its constructor — confirm where it is meant to be injected.
    eventPublisher.publish(new OrderCompletedEvent(orderEvent.getOrderId()));
}
}

# order_workflow_orchestrator.py
class OrderWorkflowOrchestrator:
def __init__(self, payment_service, inventory_service, shipping_service, state_repository):
self.payment_service = payment_service
self.inventory_service = inventory_service
self.shipping_service = shipping_service
self.state_repository = state_repository
async def handle_order_placed(self, event: OrderPlacedEvent):
# Workflow-State initialisieren
state = WorkflowState(order_id=event.order_id)
state.step = WorkflowStep.ORDER_RECEIVED
await self.state_repository.save(state)
# Orchestrator steuert den Ablauf
await self._process_payment(state, event)
async def _process_payment(self, state: WorkflowState, order_event: OrderPlacedEvent):
try:
result = await self.payment_service.process_payment(
customer_id=order_event.customer_id,
amount=order_event.total_amount
)
if result.successful:
state.step = WorkflowStep.PAYMENT_COMPLETED
state.payment_id = result.transaction_id
await self.state_repository.save(state)
# Orchestrator bestimmt nächsten Schritt
await self._reserve_inventory(state, order_event)
else:
await self._handle_payment_failure(state, result)
except PaymentException as e:
await self._handle_payment_failure(state, e)
async def _reserve_inventory(self, state: WorkflowState, order_event: OrderPlacedEvent):
try:
result = await self.inventory_service.reserve_items(order_event.items)
if result.successful:
state.step = WorkflowStep.INVENTORY_RESERVED
state.reservation_id = result.reservation_id
await self.state_repository.save(state)
await self._initiate_shipping(state, order_event)
else:
await self._handle_inventory_failure(state, order_event, result)
except InventoryException as e:
await self._handle_inventory_failure(state, order_event, e)Der Orchestrator verwendet ein Command-and-Control-Modell, bei dem er explizite Befehle an nachgelagerte Services sendet und deren Antworten verarbeitet. Dies unterscheidet sich fundamental von der ereignisgetriebenen Choreographie.
| Steuerungsaspekt | Orchestrierung | Choreographie |
|---|---|---|
| Entscheidung | Zentral im Orchestrator | Verteilt in jedem Service |
| Workflow-Wissen | Vollständig im Orchestrator | Fragmentiert über Services |
| Fehlerbehandlung | Zentrale Behandlung | Verteilte Behandlung |
| Änderungen | Orchestrator modifizieren | Services einzeln anpassen |
// Command pattern for orchestrator control
/**
 * A single workflow step expressed as a command with an execute and a compensate operation.
 */
public interface WorkflowCommand {
    /**
     * Executes this step for the given workflow context.
     *
     * @param context the workflow context the step operates on
     * @return a future that yields the step's {@code WorkflowResult}
     */
    CompletableFuture<WorkflowResult> execute(WorkflowContext context);

    /**
     * Compensates this step for the given workflow context.
     *
     * <p>NOTE(review): presumably reverses the effects of {@link #execute}; the visible
     * payment command refunds the payment here — confirm the intended contract.
     *
     * @param context the workflow context the step operated on
     * @return a future that completes when compensation has finished
     */
    CompletableFuture<Void> compensate(WorkflowContext context);
}
/**
 * Workflow command for the payment step.
 *
 * <p>{@link #execute} charges the customer through the payment service and wraps the
 * outcome in a {@code WorkflowResult}; {@link #compensate} refunds the payment if the
 * context carries a payment id.
 */
@Component
public class ProcessPaymentCommand implements WorkflowCommand {

    private final PaymentService paymentService;

    /**
     * Creates the command with the payment service it delegates to.
     *
     * <p>The field is {@code final}, so it must be assigned here; without this
     * constructor the class cannot be compiled or instantiated.
     *
     * @param paymentService service used to charge and refund payments
     */
    public ProcessPaymentCommand(PaymentService paymentService) {
        this.paymentService = paymentService;
    }

    /**
     * Processes the payment asynchronously.
     *
     * @param context workflow context supplying the customer id and total amount
     * @return a future yielding a success result wrapping the payment result, or a
     *         failure result wrapping any exception thrown by the payment call
     */
    @Override
    public CompletableFuture<WorkflowResult> execute(WorkflowContext context) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                PaymentResult result = paymentService.processPayment(
                        context.getCustomerId(),
                        context.getTotalAmount());
                return WorkflowResult.success(result);
            } catch (Exception e) {
                // Boundary of the command: every failure is reported as a result value
                // so the orchestrator can decide about compensation.
                return WorkflowResult.failure(e);
            }
        });
    }

    /**
     * Refunds the payment asynchronously, if one was recorded in the context.
     *
     * @param context workflow context that may carry the payment id
     * @return a future that completes once the refund call has returned (or immediately
     *         after the check when no payment id is present)
     */
    @Override
    public CompletableFuture<Void> compensate(WorkflowContext context) {
        return CompletableFuture.runAsync(() -> {
            if (context.hasPaymentId()) {
                paymentService.refundPayment(context.getPaymentId());
            }
        });
    }
}
// Orchestrator nutzt Commands
@Component
public class CommandBasedOrchestrator {
private final List<WorkflowCommand> commands;
/**
 * Runs the configured commands one after another for the given context.
 *
 * <p>Each command is awaited before the next one starts. When a command reports a
 * failure result or throws, the commands that already completed are compensated
 * exactly once and the error is reported through the returned future.
 *
 * @param context workflow context shared by all commands; successful results are
 *                added to it
 * @return a completed future when every command succeeded, otherwise a future
 *         completed exceptionally with the error that stopped the workflow
 */
public CompletableFuture<Void> executeWorkflow(WorkflowContext context) {
    List<WorkflowCommand> executedCommands = new ArrayList<>();
    try {
        for (WorkflowCommand command : commands) {
            WorkflowResult result = command.execute(context).get();
            if (result.isFailure()) {
                // Handled by the single catch below so compensation runs only once.
                throw new WorkflowException(result.getError());
            }
            executedCommands.add(command);
            context.addResult(result);
        }
        return CompletableFuture.completedFuture(null);
    } catch (Exception failure) {
        if (failure instanceof InterruptedException) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        try {
            // Compensate the commands that had already been executed.
            compensateExecutedCommands(executedCommands, context);
        } catch (RuntimeException compensationFailure) {
            // Keep the original error primary; attach the compensation problem to it.
            failure.addSuppressed(compensationFailure);
        }
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(failure);
        return failed;
    }
}
}

Orchestratoren müssen den aktuellen Zustand von Workflows persistent speichern, um bei Fehlern oder Systemneustarts den Workflow an der richtigen Stelle fortsetzen zu können.
// WorkflowState entity used for persistence
/**
 * Persistent record of one order workflow, mapped to the {@code workflow_states} table
 * and identified by the order id.
 */
@Entity
@Table(name = "workflow_states")
public class WorkflowState {
    // Primary key: one workflow state per order.
    @Id
    private String orderId;
    // Stored by enum name rather than ordinal.
    @Enumerated(EnumType.STRING)
    private WorkflowStep currentStep;
    @Column(columnDefinition = "json")
    private String workflowData; // JSON-serialized workflow data
    // NOTE(review): presumably the timestamp used to detect stuck workflows — confirm who updates it.
    private LocalDateTime lastUpdate;
    private String errorMessage;
    private int retryCount;
    // State-specific fields
    private String paymentId;
    private String reservationId;
    private String shipmentId;
    // Constructor, Getters, Setters...
}
// State-Management im Orchestrator
@Service
public class StatefulOrderOrchestrator {
/**
 * Periodically looks for stuck workflows and tries to resume each of them.
 *
 * <p>Workflows are looked up with a cutoff of five minutes before now. A failure while
 * resuming one workflow is handed to {@code handleWorkflowResumptionError} and does not
 * stop the remaining workflows from being processed.
 */
@Scheduled(fixedDelay = 60000) // every 60 seconds
public void resumeStuckWorkflows() {
    final LocalDateTime staleBefore = LocalDateTime.now().minusMinutes(5);
    final List<WorkflowState> candidates = stateRepository.findStuckWorkflows(staleBefore);

    for (WorkflowState workflow : candidates) {
        try {
            resumeWorkflowFromState(workflow);
        } catch (Exception failure) {
            handleWorkflowResumptionError(workflow, failure);
        }
    }
}
/**
 * Retries the step a workflow is currently waiting on.
 *
 * <p>Only the three pending steps are retried; any other step is logged as a warning
 * and otherwise ignored.
 *
 * @param state the persisted workflow state to resume from
 */
private void resumeWorkflowFromState(WorkflowState state) {
    switch (state.getCurrentStep()) {
        case PAYMENT_PENDING:
            retryPaymentStep(state);
            return;
        case INVENTORY_PENDING:
            retryInventoryStep(state);
            return;
        case SHIPPING_PENDING:
            retryShippingStep(state);
            return;
        default:
            log.warn("Unknown workflow step: {}", state.getCurrentStep());
    }
}
}

# workflow_state.py
from sqlalchemy import Column, String, DateTime, Integer, Enum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import JSON
Base = declarative_base()
class WorkflowState(Base):
    """Persistent record of one order workflow, stored in ``workflow_states``."""

    __tablename__ = 'workflow_states'

    # Primary key: one row per order.
    order_id = Column(String, primary_key=True)

    # Workflow progress and payload.
    current_step = Column(Enum(WorkflowStep))
    workflow_data = Column(JSON)
    last_update = Column(DateTime)

    # Error bookkeeping; retry_count starts at 0.
    error_message = Column(String)
    retry_count = Column(Integer, default=0)

    # Identifiers recorded by the individual steps.
    payment_id = Column(String)
    reservation_id = Column(String)
    shipment_id = Column(String)
# stateful_orchestrator.py
class StatefulOrderOrchestrator:
def __init__(self, session, payment_service, inventory_service, shipping_service):
self.session = session
self.payment_service = payment_service
self.inventory_service = inventory_service
self.shipping_service = shipping_service
async def resume_stuck_workflows(self):
cutoff_time = datetime.utcnow() - timedelta(minutes=5)
stuck_workflows = self.session.query(WorkflowState).filter(
WorkflowState.last_update < cutoff_time,
WorkflowState.current_step.in_([
WorkflowStep.PAYMENT_PENDING,
WorkflowStep.INVENTORY_PENDING,
WorkflowStep.SHIPPING_PENDING
])
).all()
for state in stuck_workflows:
try:
await self._resume_workflow_from_state(state)
except Exception as e:
await self._handle_workflow_resumption_error(state, e)
async def _resume_workflow_from_state(self, state: WorkflowState):
if state.current_step == WorkflowStep.PAYMENT_PENDING:
await self._retry_payment_step(state)
elif state.current_step == WorkflowStep.INVENTORY_PENDING:
await self._retry_inventory_step(state)
elif state.current_step == WorkflowStep.SHIPPING_PENDING:
await self._retry_shipping_step(state)Orchestrierung eignet sich besonders gut für Szenarien mit komplexer Geschäftslogik, strikten Konsistenzanforderungen und der Notwendigkeit zentraler Kontrolle.
Ideale Szenarien für Orchestrierung:
| Szenario | Beschreibung | Beispiel |
|---|---|---|
| Komplexe Conditional Logic | Workflows mit vielen Verzweigungen und Bedingungen | Kreditgenehmigung mit verschiedenen Prüfschritten |
| Strong Consistency | Sofortige Konsistenz zwischen Services erforderlich | Finanz-Transaktionen |
| Complex Error Handling | Ausgefeilte Rollback- und Kompensationslogik | Reisebuchung mit Hotel, Flug, Mietwagen |
| Compliance Requirements | Auditierbarkeit und Nachverfolgung erforderlich | Medizinische Behandlungsworkflows |
| Long-running Processes | Workflows mit manuellen Eingriffen | Bestellgenehmigung mit Manager-Freigabe |
// Komplexer Workflow mit Bedingungslogik
@Component
public class LoanApprovalOrchestrator {
/**
 * Processes a loan application and produces a decision.
 *
 * <p>Flow: validate the application; if it is invalid, reject with the validation
 * reasons and stop. Otherwise check the credit score and branch on it: below 300 the
 * application is rejected, below 600 it goes through the high-risk path, and anything
 * else takes the fast track.
 *
 * <p>The credit-score stage is nested inside the "valid" branch so that a rejection
 * from validation ends the workflow instead of being passed on to the next stage as if
 * it were a credit score.
 *
 * @param application the loan application to process
 * @return a future yielding the final loan decision
 */
public CompletableFuture<LoanDecision> processLoanApplication(LoanApplication application) {
    WorkflowContext context = new WorkflowContext(application);

    // Step 1: basic validation
    return validateApplication(context)
        .thenCompose(validation -> {
            if (!validation.isValid()) {
                return CompletableFuture.completedFuture(
                    LoanDecision.rejected(validation.getReasons())
                );
            }

            // Step 2: creditworthiness check, only reached for valid applications
            return checkCreditScore(context)
                .thenCompose(score -> {
                    context.setCreditScore(score);

                    // Decision logic based on the score
                    if (score.getValue() < 300) {
                        return CompletableFuture.completedFuture(
                            LoanDecision.rejected("Insufficient credit score")
                        );
                    }
                    if (score.getValue() < 600) {
                        // Additional checks for medium scores
                        return processHighRiskApplication(context);
                    }
                    // Fast track for good scores
                    return processFastTrackApplication(context);
                });
        });
}
/**
 * High-risk path: verifies income and, if it is sufficient, requires a manual review.
 *
 * <p>Insufficient income yields an immediate rejection. Otherwise a manual review is
 * requested and the decision is whatever the manual decision for the application id
 * turns out to be.
 *
 * @param context workflow context of the application
 * @return a future yielding the loan decision for this path
 */
private CompletableFuture<LoanDecision> processHighRiskApplication(WorkflowContext context) {
    // Income verification comes first.
    return verifyIncome(context).thenCompose(income -> {
        if (income.isSufficient()) {
            // Manual review required; its own result is not used, only awaited.
            return requestManualReview(context)
                .thenCompose(ignoredReview -> waitForManualDecision(context.getApplicationId()));
        }
        return CompletableFuture.completedFuture(
            LoanDecision.rejected("Insufficient income")
        );
    });
}
}

Anti-Patterns für Orchestrierung:
// Anti-Pattern: Orchestrator als Event-Forwarder
@Component
public class UnnecessaryOrchestrator {
/**
 * Anti-pattern example: forwards a placed order as a {@code ProcessOrderEvent}
 * without adding any decision or business logic.
 *
 * @param event the placed-order event being forwarded
 */
@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
    // Anti-pattern: the orchestrator does nothing but event forwarding.
    final ProcessOrderEvent forwarded = new ProcessOrderEvent(event.getOrderId());
    eventPublisher.publish(forwarded);
}
/**
 * Anti-pattern example: forwards a processed order as a {@code ShipOrderEvent}
 * without adding any decision or business logic.
 *
 * @param event the order-processed event being forwarded
 */
@EventListener
public void handleOrderProcessed(OrderProcessedEvent event) {
    // Anti-pattern: no business logic, only forwarding.
    final ShipOrderEvent forwarded = new ShipOrderEvent(event.getOrderId());
    eventPublisher.publish(forwarded);
}
}

Die Entscheidung zwischen Orchestrierung und Choreographie hängt stark vom Kontext ab. Orchestrierung bietet Kontrolle und Nachverfolgbarkeit auf Kosten der Flexibilität und kann bei unsachgemäßer Anwendung zu monolithischen Strukturen führen. Das Verständnis der Trade-offs ist entscheidend für die richtige Architekturentscheidung.