Bevor wir in die Implementierung von Sagas einsteigen, sollten wir uns eine grundlegende Frage stellen: Warum funktionieren klassische ACID-Transaktionen nicht in verteilten Event-Driven Systemen?
Nun soll der E-Commerce-System eine Bestellung atomisch verarbeiten - entweder komplett erfolgreich oder gar nicht. In einem monolithischen System würden wir eine Datenbank-Transaktion verwenden:
BEGIN TRANSACTION;
INSERT INTO orders (order_id, customer_id, total) VALUES (...);
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = ...;
INSERT INTO payments (order_id, amount, status) VALUES (...);
COMMIT;Reflexionsfrage: Was passiert, wenn diese Operationen auf drei verschiedene Services verteilt sind (OrderService, InventoryService, PaymentService), die jeweils eigene Datenbanken haben?
Die Antwort führt uns zu einem fundamentalen Problem: Wir brauchen einen Mechanismus für “Transaktionen” über Service-Grenzen hinweg, ohne die Vorteile der Service-Autonomie zu verlieren.
Eine Saga ist eine Sequenz von lokalen Transaktionen, wobei jede lokale Transaktion ihre eigene Datenbank aktualisiert. Wenn eine Transaktion fehlschlägt, führt die Saga eine Reihe von kompensierenden Transaktionen aus, um die Effekte der vorherigen Transaktionen rückgängig zu machen.
Denken Sie an eine Saga wie an eine Reisebuchung: Sie buchen Flug, Hotel und Mietwagen separat. Wenn der Mietwagen nicht verfügbar ist, müssen Sie Flug und Hotel stornieren.
// Schritt 1: Order Service startet die Saga
@Service
public class OrderService {
@Transactional
public void placeOrder(PlaceOrderCommand command) {
// Lokale Transaktion: Order erstellen
Order order = new Order(command);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// Saga starten durch Event
eventPublisher.publish(new OrderPlacedEvent(
order.getId(),
command.getCustomerId(),
command.getItems(),
command.getTotalAmount()
));
}
// Kompensation bei Saga-Fehlschlag
@EventListener
public void handleOrderCancellation(OrderCancellationRequestedEvent event) {
Order order = orderRepository.findById(event.getOrderId());
order.setStatus(OrderStatus.CANCELLED);
orderRepository.save(order);
eventPublisher.publish(new OrderCancelledEvent(event.getOrderId()));
}
}// Schritt 2: Payment Service als Teil der Saga
@Service
public class PaymentService {
@EventListener
@Transactional
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
// Lokale Transaktion: Payment verarbeiten
Payment payment = processPayment(event.getCustomerId(), event.getTotalAmount());
if (payment.isSuccessful()) {
eventPublisher.publish(new PaymentProcessedEvent(
event.getOrderId(),
payment.getTransactionId(),
event.getTotalAmount()
));
} else {
// Saga-Fehlschlag initiieren
eventPublisher.publish(new PaymentFailedEvent(
event.getOrderId(),
payment.getErrorCode()
));
}
} catch (Exception e) {
eventPublisher.publish(new PaymentFailedEvent(
event.getOrderId(),
"TECHNICAL_ERROR"
));
}
}
// Kompensation: Refund
@EventListener
@Transactional
public void handleOrderCancellation(OrderCancellationRequestedEvent event) {
Payment payment = paymentRepository.findByOrderId(event.getOrderId());
if (payment != null && payment.isSuccessful()) {
Refund refund = processRefund(payment);
eventPublisher.publish(new PaymentRefundedEvent(
event.getOrderId(),
refund.getRefundId(),
payment.getAmount()
));
}
}
}# order_service.py
class OrderService:
def __init__(self, order_repository, event_publisher):
self.order_repository = order_repository
self.event_publisher = event_publisher
async def place_order(self, command: PlaceOrderCommand):
# Lokale Transaktion
async with self.order_repository.transaction():
order = Order(
id=command.order_id,
customer_id=command.customer_id,
items=command.items,
total_amount=command.total_amount,
status=OrderStatus.PENDING
)
await self.order_repository.save(order)
# Saga starten
await self.event_publisher.publish(OrderPlacedEvent(
order_id=order.id,
customer_id=order.customer_id,
items=order.items,
total_amount=order.total_amount
))
async def handle_order_cancellation(self, event: OrderCancellationRequestedEvent):
async with self.order_repository.transaction():
order = await self.order_repository.find_by_id(event.order_id)
order.status = OrderStatus.CANCELLED
await self.order_repository.save(order)
await self.event_publisher.publish(OrderCancelledEvent(
order_id=event.order_id
))Für komplexere Saga-Logik kann ein Saga-Orchestrator die Koordination übernehmen:
@Component
public class OrderSagaOrchestrator {
private final Map<String, SagaInstance> activeInstances = new ConcurrentHashMap<>();
@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
SagaInstance saga = new SagaInstance(event.getOrderId());
saga.addStep(new ProcessPaymentStep());
saga.addStep(new ReserveInventoryStep());
saga.addStep(new InitiateShippingStep());
activeInstances.put(event.getOrderId(), saga);
// Ersten Schritt ausführen
executeNextStep(saga, event);
}
@EventListener
public void handlePaymentProcessed(PaymentProcessedEvent event) {
SagaInstance saga = activeInstances.get(event.getOrderId());
if (saga != null) {
saga.markStepCompleted(ProcessPaymentStep.class);
executeNextStep(saga, event);
}
}
@EventListener
public void handlePaymentFailed(PaymentFailedEvent event) {
SagaInstance saga = activeInstances.get(event.getOrderId());
if (saga != null) {
// Kompensation starten
compensateSaga(saga, event);
}
}
private void compensateSaga(SagaInstance saga, DomainEvent triggerEvent) {
List<SagaStep> completedSteps = saga.getCompletedSteps();
// Rückwärts durch die Schritte gehen
for (int i = completedSteps.size() - 1; i >= 0; i--) {
SagaStep step = completedSteps.get(i);
step.compensate(triggerEvent);
}
activeInstances.remove(saga.getId());
}
}Frage zum Nachdenken: Wenn Sie eine Hotelreservierung stornieren müssen, reicht es aus, den Datensatz zu löschen? Was ist mit Stornierungsgebühren, Benachrichtigungen oder anderen Seiteneffekten?
Kompensation in Sagas geht über einfaches “Rückgängigmachen” hinaus. Kompensations-Transaktionen müssen:
| Operation | Naive Kompensation | Semantische Kompensation |
|---|---|---|
| CreateOrder | DELETE order | SET status = ‘CANCELLED’, Notify customer |
| ChargePayment | DELETE payment | REFUND payment, Update billing |
| ReserveInventory | inventory += reserved | RELEASE reservation, Update availability |
| SendEmail | ❌ Unmöglich | Send cancellation email |
// Kompensations-Interface
public interface CompensationLogic {
CompensationResult compensate(CompensationContext context);
boolean isIdempotent();
int getMaxRetries();
}
// Konkrete Kompensations-Implementierung
@Component
public class PaymentCompensation implements CompensationLogic {
private final RefundService refundService;
private final PaymentRepository paymentRepository;
@Override
public CompensationResult compensate(CompensationContext context) {
String orderId = context.getOrderId();
try {
// Idempotenz prüfen
if (isAlreadyCompensated(orderId)) {
return CompensationResult.alreadyCompensated();
}
Payment payment = paymentRepository.findByOrderId(orderId);
if (payment == null) {
return CompensationResult.nothingToCompensate();
}
// Semantische Kompensation
RefundResult refund = refundService.processRefund(
payment.getTransactionId(),
payment.getAmount(),
"Order cancellation"
);
if (refund.isSuccessful()) {
markAsCompensated(orderId, refund.getRefundId());
// Benachrichtigung als Teil der Kompensation
notificationService.notifyCustomer(
payment.getCustomerId(),
"Refund processed for order " + orderId
);
return CompensationResult.success(refund.getRefundId());
} else {
return CompensationResult.retry(refund.getErrorMessage());
}
} catch (Exception e) {
log.error("Compensation failed for order {}", orderId, e);
return CompensationResult.retry(e.getMessage());
}
}
@Override
public boolean isIdempotent() {
return true; // Mehrfache Refund-Versuche sind sicher
}
private boolean isAlreadyCompensated(String orderId) {
return compensationLogRepository.existsByOrderId(orderId);
}
}# compensation_logic.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
class CompensationStatus(Enum):
SUCCESS = "success"
RETRY = "retry"
ALREADY_COMPENSATED = "already_compensated"
NOTHING_TO_COMPENSATE = "nothing_to_compensate"
@dataclass
class CompensationResult:
status: CompensationStatus
message: str = ""
reference_id: str = None
class CompensationLogic(ABC):
@abstractmethod
async def compensate(self, context: CompensationContext) -> CompensationResult:
pass
@abstractmethod
def is_idempotent(self) -> bool:
pass
@abstractmethod
def get_max_retries(self) -> int:
pass
class PaymentCompensation(CompensationLogic):
def __init__(self, refund_service, payment_repository, compensation_log_repository):
self.refund_service = refund_service
self.payment_repository = payment_repository
self.compensation_log_repository = compensation_log_repository
async def compensate(self, context: CompensationContext) -> CompensationResult:
order_id = context.order_id
# Idempotenz prüfen
if await self._is_already_compensated(order_id):
return CompensationResult(
status=CompensationStatus.ALREADY_COMPENSATED,
message="Payment already refunded"
)
payment = await self.payment_repository.find_by_order_id(order_id)
if not payment:
return CompensationResult(
status=CompensationStatus.NOTHING_TO_COMPENSATE,
message="No payment found for order"
)
try:
# Semantische Kompensation
refund_result = await self.refund_service.process_refund(
transaction_id=payment.transaction_id,
amount=payment.amount,
reason="Order cancellation"
)
if refund_result.successful:
await self._mark_as_compensated(order_id, refund_result.refund_id)
return CompensationResult(
status=CompensationStatus.SUCCESS,
reference_id=refund_result.refund_id
)
else:
return CompensationResult(
status=CompensationStatus.RETRY,
message=refund_result.error_message
)
except Exception as e:
return CompensationResult(
status=CompensationStatus.RETRY,
message=str(e)
)
def is_idempotent(self) -> bool:
return True
def get_max_retries(self) -> int:
return 3Szenario zum Verständnis: Stellen Sie sich einen Reisebuchungsprozess vor, der mehrere Tage dauert: 1. Kunde reserviert Flug (sofort) 2. Kunde wählt Hotel (24h später) 3. Manueller Approval-Prozess (kann 2-3 Tage dauern) 4. Finale Buchung und Zahlung
Herausforderung: Wie verwalten wir eine Saga, die Tage oder Wochen läuft?
// Saga-State für langläufige Prozesse
@Entity
@Table(name = "saga_instances")
public class SagaInstance {
@Id
private String sagaId;
@Enumerated(EnumType.STRING)
private SagaStatus status;
@Column(columnDefinition = "json")
private String sagaData; // Serialisierte Saga-Daten
private LocalDateTime createdAt;
private LocalDateTime lastUpdated;
private LocalDateTime expiresAt; // Für Timeouts
@OneToMany(mappedBy = "sagaInstance", cascade = CascadeType.ALL)
private List<SagaStep> steps;
// Methoden für State-Management
public void addCompletedStep(String stepType, String resultData) {
SagaStep step = new SagaStep(this, stepType, StepStatus.COMPLETED, resultData);
steps.add(step);
this.lastUpdated = LocalDateTime.now();
}
public boolean isExpired() {
return expiresAt != null && LocalDateTime.now().isAfter(expiresAt);
}
}
// Langläufige Saga-Verwaltung
@Service
public class TravelBookingSaga {
@Scheduled(fixedDelay = 3600000) // Jede Stunde
public void processExpiredSagas() {
List<SagaInstance> expiredSagas = sagaRepository.findExpiredSagas();
for (SagaInstance saga : expiredSagas) {
try {
handleSagaTimeout(saga);
} catch (Exception e) {
log.error("Error handling expired saga {}", saga.getSagaId(), e);
}
}
}
private void handleSagaTimeout(SagaInstance saga) {
// Timeout-spezifische Kompensation
if (saga.getStatus() == SagaStatus.WAITING_FOR_APPROVAL) {
// Nach 7 Tagen ohne Approval automatisch stornieren
compensateSaga(saga, "TIMEOUT_APPROVAL");
} else if (saga.getStatus() == SagaStatus.WAITING_FOR_PAYMENT) {
// Nach 24 Stunden ohne Zahlung stornieren
compensateSaga(saga, "TIMEOUT_PAYMENT");
}
}
@EventListener
public void handleManualApproval(ManualApprovalEvent event) {
SagaInstance saga = sagaRepository.findById(event.getSagaId());
if (saga.isExpired()) {
// Zu spät - Saga bereits abgelaufen
eventPublisher.publish(new ApprovalRejectedEvent(
event.getSagaId(),
"Approval timeout exceeded"
));
return;
}
if (event.isApproved()) {
saga.addCompletedStep("MANUAL_APPROVAL", event.getApprovalDetails());
saga.setStatus(SagaStatus.APPROVED);
continueBookingProcess(saga);
} else {
compensateSaga(saga, "MANUAL_REJECTION");
}
}
}# long_running_saga.py
import asyncio
from datetime import datetime, timedelta
from enum import Enum
class SagaStatus(Enum):
STARTED = "started"
WAITING_FOR_APPROVAL = "waiting_for_approval"
WAITING_FOR_PAYMENT = "waiting_for_payment"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
class TravelBookingSaga:
def __init__(self, saga_repository, compensation_service, event_publisher):
self.saga_repository = saga_repository
self.compensation_service = compensation_service
self.event_publisher = event_publisher
async def start_booking_saga(self, booking_request: BookingRequest):
saga_id = f"travel-{booking_request.customer_id}-{datetime.now().timestamp()}"
saga_instance = SagaInstance(
saga_id=saga_id,
status=SagaStatus.STARTED,
created_at=datetime.utcnow(),
expires_at=datetime.utcnow() + timedelta(days=7), # 7 Tage Timeout
saga_data={
'customer_id': booking_request.customer_id,
'flight_preference': booking_request.flight_preference,
'hotel_preference': booking_request.hotel_preference,
'total_budget': booking_request.total_budget
}
)
await self.saga_repository.save(saga_instance)
# Ersten Schritt ausführen
await self._reserve_flight(saga_instance, booking_request)
async def handle_flight_reserved(self, event: FlightReservedEvent):
saga = await self.saga_repository.find_by_id(event.saga_id)
if saga.is_expired():
await self._handle_saga_timeout(saga)
return
saga.add_completed_step("FLIGHT_RESERVED", {
'reservation_id': event.reservation_id,
'flight_number': event.flight_number,
'price': event.price
})
# Warten auf manuelle Hotel-Auswahl (kann Tage dauern)
saga.status = SagaStatus.WAITING_FOR_APPROVAL
await self.saga_repository.save(saga)
# Kunde benachrichtigen
await self.event_publisher.publish(CustomerNotificationEvent(
customer_id=saga.saga_data['customer_id'],
message="Flight reserved. Please select hotel within 7 days.",
action_required=True,
deadline=saga.expires_at
))
async def handle_hotel_selected(self, event: HotelSelectedEvent):
saga = await self.saga_repository.find_by_id(event.saga_id)
if saga.is_expired():
await self._handle_saga_timeout(saga)
return
# Manuelle Genehmigung erforderlich (für teure Buchungen)
if self._requires_manual_approval(saga, event):
await self._request_manual_approval(saga, event)
else:
await self._proceed_to_payment(saga, event)
# Timeout-Handling für langläufige Sagas
async def process_expired_sagas(self):
"""Scheduled task - läuft regelmäßig"""
expired_sagas = await self.saga_repository.find_expired_sagas()
for saga in expired_sagas:
await self._handle_saga_timeout(saga)
async def _handle_saga_timeout(self, saga: SagaInstance):
"""Kompensation für abgelaufene Sagas"""
if saga.status == SagaStatus.WAITING_FOR_APPROVAL:
# Flight-Reservierung stornieren
await self.compensation_service.compensate_flight_reservation(saga)
elif saga.status == SagaStatus.WAITING_FOR_PAYMENT:
# Flight und Hotel stornieren
await self.compensation_service.compensate_flight_reservation(saga)
await self.compensation_service.compensate_hotel_reservation(saga)
saga.status = SagaStatus.FAILED
saga.failure_reason = "TIMEOUT"
await self.saga_repository.save(saga)// Saga-Monitoring Dashboard
@RestController
@RequestMapping("/admin/sagas")
public class SagaManagementController {
@GetMapping("/active")
public List<SagaInstanceDto> getActiveSagas() {
return sagaRepository.findActiveSagas()
.stream()
.map(this::toDto)
.collect(Collectors.toList());
}
@GetMapping("/stuck")
public List<SagaInstanceDto> getStuckSagas() {
// Sagas die länger als erwartet in einem Zustand sind
return sagaRepository.findStuckSagas(Duration.ofHours(24))
.stream()
.map(this::toDto)
.collect(Collectors.toList());
}
@PostMapping("/{sagaId}/compensate")
public ResponseEntity<Void> forceSagaCompensation(@PathVariable String sagaId) {
SagaInstance saga = sagaRepository.findById(sagaId);
if (saga == null) {
return ResponseEntity.notFound().build();
}
// Manuelle Kompensation auslösen
sagaCompensationService.forceSagaCompensation(saga, "MANUAL_INTERVENTION");
return ResponseEntity.ok().build();
}
}Das Saga Pattern ermöglicht es uns, die Vorteile verteilter Services zu nutzen, ohne auf transaktionale Konsistenz zu verzichten. Die Kunst liegt darin, die richtige Balance zwischen Autonomie und Koordination zu finden und dabei robuste Kompensations-Mechanismen zu implementieren, die auch bei langläufigen Prozessen zuverlässig funktionieren.
Reflexion: Wie würden Sie entscheiden, ob ein Geschäftsprozess als choreographierte oder orchestrierte Saga implementiert werden sollte? Welche Faktoren spielen dabei eine Rolle?