51 Distributed Transactions und Saga Pattern

51.1 Das Problem verteilter Transaktionen verstehen

Bevor wir in die Implementierung von Sagas einsteigen, sollten wir uns eine grundlegende Frage stellen: Warum funktionieren klassische ACID-Transaktionen nicht in verteilten Event-Driven Systemen?

Nun soll der E-Commerce-System eine Bestellung atomisch verarbeiten - entweder komplett erfolgreich oder gar nicht. In einem monolithischen System würden wir eine Datenbank-Transaktion verwenden:

BEGIN TRANSACTION;
  INSERT INTO orders (order_id, customer_id, total) VALUES (...);
  UPDATE inventory SET quantity = quantity - 1 WHERE product_id = ...;
  INSERT INTO payments (order_id, amount, status) VALUES (...);
COMMIT;

Reflexionsfrage: Was passiert, wenn diese Operationen auf drei verschiedene Services verteilt sind (OrderService, InventoryService, PaymentService), die jeweils eigene Datenbanken haben?

Die Antwort führt uns zu einem fundamentalen Problem: Wir brauchen einen Mechanismus für “Transaktionen” über Service-Grenzen hinweg, ohne die Vorteile der Service-Autonomie zu verlieren.

51.2 Saga Pattern Implementation

51.2.1 Grundprinzip der Saga

Eine Saga ist eine Sequenz von lokalen Transaktionen, wobei jede lokale Transaktion ihre eigene Datenbank aktualisiert. Wenn eine Transaktion fehlschlägt, führt die Saga eine Reihe von kompensierenden Transaktionen aus, um die Effekte der vorherigen Transaktionen rückgängig zu machen.

Denken Sie an eine Saga wie an eine Reisebuchung: Sie buchen Flug, Hotel und Mietwagen separat. Wenn der Mietwagen nicht verfügbar ist, müssen Sie Flug und Hotel stornieren.

51.2.2 Choreographed Saga Implementation

// Schritt 1: Order Service startet die Saga
@Service
public class OrderService {
    
    @Transactional
    public void placeOrder(PlaceOrderCommand command) {
        // Lokale Transaktion: Order erstellen
        Order order = new Order(command);
        order.setStatus(OrderStatus.PENDING);
        orderRepository.save(order);
        
        // Saga starten durch Event
        eventPublisher.publish(new OrderPlacedEvent(
            order.getId(),
            command.getCustomerId(),
            command.getItems(),
            command.getTotalAmount()
        ));
    }
    
    // Kompensation bei Saga-Fehlschlag
    @EventListener
    public void handleOrderCancellation(OrderCancellationRequestedEvent event) {
        Order order = orderRepository.findById(event.getOrderId());
        order.setStatus(OrderStatus.CANCELLED);
        orderRepository.save(order);
        
        eventPublisher.publish(new OrderCancelledEvent(event.getOrderId()));
    }
}
// Schritt 2: Payment Service als Teil der Saga
@Service
public class PaymentService {
    
    @EventListener
    @Transactional
    public void handleOrderPlaced(OrderPlacedEvent event) {
        try {
            // Lokale Transaktion: Payment verarbeiten
            Payment payment = processPayment(event.getCustomerId(), event.getTotalAmount());
            
            if (payment.isSuccessful()) {
                eventPublisher.publish(new PaymentProcessedEvent(
                    event.getOrderId(),
                    payment.getTransactionId(),
                    event.getTotalAmount()
                ));
            } else {
                // Saga-Fehlschlag initiieren
                eventPublisher.publish(new PaymentFailedEvent(
                    event.getOrderId(),
                    payment.getErrorCode()
                ));
            }
        } catch (Exception e) {
            eventPublisher.publish(new PaymentFailedEvent(
                event.getOrderId(),
                "TECHNICAL_ERROR"
            ));
        }
    }
    
    // Kompensation: Refund
    @EventListener
    @Transactional
    public void handleOrderCancellation(OrderCancellationRequestedEvent event) {
        Payment payment = paymentRepository.findByOrderId(event.getOrderId());
        if (payment != null && payment.isSuccessful()) {
            Refund refund = processRefund(payment);
            
            eventPublisher.publish(new PaymentRefundedEvent(
                event.getOrderId(),
                refund.getRefundId(),
                payment.getAmount()
            ));
        }
    }
}

51.2.3 Python Saga Implementation

# order_service.py
class OrderService:
    def __init__(self, order_repository, event_publisher):
        self.order_repository = order_repository
        self.event_publisher = event_publisher
    
    async def place_order(self, command: PlaceOrderCommand):
        # Lokale Transaktion
        async with self.order_repository.transaction():
            order = Order(
                id=command.order_id,
                customer_id=command.customer_id,
                items=command.items,
                total_amount=command.total_amount,
                status=OrderStatus.PENDING
            )
            await self.order_repository.save(order)
            
            # Saga starten
            await self.event_publisher.publish(OrderPlacedEvent(
                order_id=order.id,
                customer_id=order.customer_id,
                items=order.items,
                total_amount=order.total_amount
            ))
    
    async def handle_order_cancellation(self, event: OrderCancellationRequestedEvent):
        async with self.order_repository.transaction():
            order = await self.order_repository.find_by_id(event.order_id)
            order.status = OrderStatus.CANCELLED
            await self.order_repository.save(order)
            
            await self.event_publisher.publish(OrderCancelledEvent(
                order_id=event.order_id
            ))

51.2.4 Orchestrated Saga Implementation

Für komplexere Saga-Logik kann ein Saga-Orchestrator die Koordination übernehmen:

@Component
public class OrderSagaOrchestrator {
    
    private final Map<String, SagaInstance> activeInstances = new ConcurrentHashMap<>();
    
    @EventListener
    public void handleOrderPlaced(OrderPlacedEvent event) {
        SagaInstance saga = new SagaInstance(event.getOrderId());
        saga.addStep(new ProcessPaymentStep());
        saga.addStep(new ReserveInventoryStep());
        saga.addStep(new InitiateShippingStep());
        
        activeInstances.put(event.getOrderId(), saga);
        
        // Ersten Schritt ausführen
        executeNextStep(saga, event);
    }
    
    @EventListener
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        SagaInstance saga = activeInstances.get(event.getOrderId());
        if (saga != null) {
            saga.markStepCompleted(ProcessPaymentStep.class);
            executeNextStep(saga, event);
        }
    }
    
    @EventListener
    public void handlePaymentFailed(PaymentFailedEvent event) {
        SagaInstance saga = activeInstances.get(event.getOrderId());
        if (saga != null) {
            // Kompensation starten
            compensateSaga(saga, event);
        }
    }
    
    private void compensateSaga(SagaInstance saga, DomainEvent triggerEvent) {
        List<SagaStep> completedSteps = saga.getCompletedSteps();
        
        // Rückwärts durch die Schritte gehen
        for (int i = completedSteps.size() - 1; i >= 0; i--) {
            SagaStep step = completedSteps.get(i);
            step.compensate(triggerEvent);
        }
        
        activeInstances.remove(saga.getId());
    }
}

51.3 Compensation Logic

51.3.1 Prinzipien der Kompensation

Frage zum Nachdenken: Wenn Sie eine Hotelreservierung stornieren müssen, reicht es aus, den Datensatz zu löschen? Was ist mit Stornierungsgebühren, Benachrichtigungen oder anderen Seiteneffekten?

Kompensation in Sagas geht über einfaches “Rückgängigmachen” hinaus. Kompensations-Transaktionen müssen:

  1. Semantisch korrekt sein (nicht nur technisch)
  2. Idempotent sein (mehrfache Ausführung schadet nicht)
  3. Immer erfolgreich sein (oder wenigstens eventual consistent)
Operation Naive Kompensation Semantische Kompensation
CreateOrder DELETE order SET status = ‘CANCELLED’, Notify customer
ChargePayment DELETE payment REFUND payment, Update billing
ReserveInventory inventory += reserved RELEASE reservation, Update availability
SendEmail ❌ Unmöglich Send cancellation email

51.3.2 Implementierung robuster Kompensation

// Kompensations-Interface
public interface CompensationLogic {
    CompensationResult compensate(CompensationContext context);
    boolean isIdempotent();
    int getMaxRetries();
}

// Konkrete Kompensations-Implementierung
@Component
public class PaymentCompensation implements CompensationLogic {
    
    private final RefundService refundService;
    private final PaymentRepository paymentRepository;
    
    @Override
    public CompensationResult compensate(CompensationContext context) {
        String orderId = context.getOrderId();
        
        try {
            // Idempotenz prüfen
            if (isAlreadyCompensated(orderId)) {
                return CompensationResult.alreadyCompensated();
            }
            
            Payment payment = paymentRepository.findByOrderId(orderId);
            if (payment == null) {
                return CompensationResult.nothingToCompensate();
            }
            
            // Semantische Kompensation
            RefundResult refund = refundService.processRefund(
                payment.getTransactionId(),
                payment.getAmount(),
                "Order cancellation"
            );
            
            if (refund.isSuccessful()) {
                markAsCompensated(orderId, refund.getRefundId());
                
                // Benachrichtigung als Teil der Kompensation
                notificationService.notifyCustomer(
                    payment.getCustomerId(),
                    "Refund processed for order " + orderId
                );
                
                return CompensationResult.success(refund.getRefundId());
            } else {
                return CompensationResult.retry(refund.getErrorMessage());
            }
            
        } catch (Exception e) {
            log.error("Compensation failed for order {}", orderId, e);
            return CompensationResult.retry(e.getMessage());
        }
    }
    
    @Override
    public boolean isIdempotent() {
        return true; // Mehrfache Refund-Versuche sind sicher
    }
    
    private boolean isAlreadyCompensated(String orderId) {
        return compensationLogRepository.existsByOrderId(orderId);
    }
}

51.3.3 Python Compensation Implementation

# compensation_logic.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum

class CompensationStatus(Enum):
    SUCCESS = "success"
    RETRY = "retry"
    ALREADY_COMPENSATED = "already_compensated"
    NOTHING_TO_COMPENSATE = "nothing_to_compensate"

@dataclass
class CompensationResult:
    status: CompensationStatus
    message: str = ""
    reference_id: str = None

class CompensationLogic(ABC):
    @abstractmethod
    async def compensate(self, context: CompensationContext) -> CompensationResult:
        pass
    
    @abstractmethod
    def is_idempotent(self) -> bool:
        pass
    
    @abstractmethod
    def get_max_retries(self) -> int:
        pass

class PaymentCompensation(CompensationLogic):
    def __init__(self, refund_service, payment_repository, compensation_log_repository):
        self.refund_service = refund_service
        self.payment_repository = payment_repository
        self.compensation_log_repository = compensation_log_repository
    
    async def compensate(self, context: CompensationContext) -> CompensationResult:
        order_id = context.order_id
        
        # Idempotenz prüfen
        if await self._is_already_compensated(order_id):
            return CompensationResult(
                status=CompensationStatus.ALREADY_COMPENSATED,
                message="Payment already refunded"
            )
        
        payment = await self.payment_repository.find_by_order_id(order_id)
        if not payment:
            return CompensationResult(
                status=CompensationStatus.NOTHING_TO_COMPENSATE,
                message="No payment found for order"
            )
        
        try:
            # Semantische Kompensation
            refund_result = await self.refund_service.process_refund(
                transaction_id=payment.transaction_id,
                amount=payment.amount,
                reason="Order cancellation"
            )
            
            if refund_result.successful:
                await self._mark_as_compensated(order_id, refund_result.refund_id)
                
                return CompensationResult(
                    status=CompensationStatus.SUCCESS,
                    reference_id=refund_result.refund_id
                )
            else:
                return CompensationResult(
                    status=CompensationStatus.RETRY,
                    message=refund_result.error_message
                )
                
        except Exception as e:
            return CompensationResult(
                status=CompensationStatus.RETRY,
                message=str(e)
            )
    
    def is_idempotent(self) -> bool:
        return True
    
    def get_max_retries(self) -> int:
        return 3

51.4 Long-running Transactions

51.4.1 Charakteristika langläufiger Saga-Transaktionen

Szenario zum Verständnis: Stellen Sie sich einen Reisebuchungsprozess vor, der mehrere Tage dauert: 1. Kunde reserviert Flug (sofort) 2. Kunde wählt Hotel (24h später) 3. Manueller Approval-Prozess (kann 2-3 Tage dauern) 4. Finale Buchung und Zahlung

Herausforderung: Wie verwalten wir eine Saga, die Tage oder Wochen läuft?

51.4.2 Persistente Saga-States

// Saga-State für langläufige Prozesse
@Entity
@Table(name = "saga_instances")
public class SagaInstance {
    @Id
    private String sagaId;
    
    @Enumerated(EnumType.STRING)
    private SagaStatus status;
    
    @Column(columnDefinition = "json")
    private String sagaData; // Serialisierte Saga-Daten
    
    private LocalDateTime createdAt;
    private LocalDateTime lastUpdated;
    private LocalDateTime expiresAt; // Für Timeouts
    
    @OneToMany(mappedBy = "sagaInstance", cascade = CascadeType.ALL)
    private List<SagaStep> steps;
    
    // Methoden für State-Management
    public void addCompletedStep(String stepType, String resultData) {
        SagaStep step = new SagaStep(this, stepType, StepStatus.COMPLETED, resultData);
        steps.add(step);
        this.lastUpdated = LocalDateTime.now();
    }
    
    public boolean isExpired() {
        return expiresAt != null && LocalDateTime.now().isAfter(expiresAt);
    }
}

// Langläufige Saga-Verwaltung
@Service
public class TravelBookingSaga {
    
    @Scheduled(fixedDelay = 3600000) // Jede Stunde
    public void processExpiredSagas() {
        List<SagaInstance> expiredSagas = sagaRepository.findExpiredSagas();
        
        for (SagaInstance saga : expiredSagas) {
            try {
                handleSagaTimeout(saga);
            } catch (Exception e) {
                log.error("Error handling expired saga {}", saga.getSagaId(), e);
            }
        }
    }
    
    private void handleSagaTimeout(SagaInstance saga) {
        // Timeout-spezifische Kompensation
        if (saga.getStatus() == SagaStatus.WAITING_FOR_APPROVAL) {
            // Nach 7 Tagen ohne Approval automatisch stornieren
            compensateSaga(saga, "TIMEOUT_APPROVAL");
        } else if (saga.getStatus() == SagaStatus.WAITING_FOR_PAYMENT) {
            // Nach 24 Stunden ohne Zahlung stornieren
            compensateSaga(saga, "TIMEOUT_PAYMENT");
        }
    }
    
    @EventListener
    public void handleManualApproval(ManualApprovalEvent event) {
        SagaInstance saga = sagaRepository.findById(event.getSagaId());
        
        if (saga.isExpired()) {
            // Zu spät - Saga bereits abgelaufen
            eventPublisher.publish(new ApprovalRejectedEvent(
                event.getSagaId(), 
                "Approval timeout exceeded"
            ));
            return;
        }
        
        if (event.isApproved()) {
            saga.addCompletedStep("MANUAL_APPROVAL", event.getApprovalDetails());
            saga.setStatus(SagaStatus.APPROVED);
            continueBookingProcess(saga);
        } else {
            compensateSaga(saga, "MANUAL_REJECTION");
        }
    }
}

51.4.3 Python Long-running Saga Implementation

# long_running_saga.py
import asyncio
from datetime import datetime, timedelta
from enum import Enum

class SagaStatus(Enum):
    STARTED = "started"
    WAITING_FOR_APPROVAL = "waiting_for_approval"
    WAITING_FOR_PAYMENT = "waiting_for_payment"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"

class TravelBookingSaga:
    def __init__(self, saga_repository, compensation_service, event_publisher):
        self.saga_repository = saga_repository
        self.compensation_service = compensation_service
        self.event_publisher = event_publisher
    
    async def start_booking_saga(self, booking_request: BookingRequest):
        saga_id = f"travel-{booking_request.customer_id}-{datetime.now().timestamp()}"
        
        saga_instance = SagaInstance(
            saga_id=saga_id,
            status=SagaStatus.STARTED,
            created_at=datetime.utcnow(),
            expires_at=datetime.utcnow() + timedelta(days=7),  # 7 Tage Timeout
            saga_data={
                'customer_id': booking_request.customer_id,
                'flight_preference': booking_request.flight_preference,
                'hotel_preference': booking_request.hotel_preference,
                'total_budget': booking_request.total_budget
            }
        )
        
        await self.saga_repository.save(saga_instance)
        
        # Ersten Schritt ausführen
        await self._reserve_flight(saga_instance, booking_request)
    
    async def handle_flight_reserved(self, event: FlightReservedEvent):
        saga = await self.saga_repository.find_by_id(event.saga_id)
        
        if saga.is_expired():
            await self._handle_saga_timeout(saga)
            return
        
        saga.add_completed_step("FLIGHT_RESERVED", {
            'reservation_id': event.reservation_id,
            'flight_number': event.flight_number,
            'price': event.price
        })
        
        # Warten auf manuelle Hotel-Auswahl (kann Tage dauern)
        saga.status = SagaStatus.WAITING_FOR_APPROVAL
        await self.saga_repository.save(saga)
        
        # Kunde benachrichtigen
        await self.event_publisher.publish(CustomerNotificationEvent(
            customer_id=saga.saga_data['customer_id'],
            message="Flight reserved. Please select hotel within 7 days.",
            action_required=True,
            deadline=saga.expires_at
        ))
    
    async def handle_hotel_selected(self, event: HotelSelectedEvent):
        saga = await self.saga_repository.find_by_id(event.saga_id)
        
        if saga.is_expired():
            await self._handle_saga_timeout(saga)
            return
        
        # Manuelle Genehmigung erforderlich (für teure Buchungen)
        if self._requires_manual_approval(saga, event):
            await self._request_manual_approval(saga, event)
        else:
            await self._proceed_to_payment(saga, event)
    
    # Timeout-Handling für langläufige Sagas
    async def process_expired_sagas(self):
        """Scheduled task - läuft regelmäßig"""
        expired_sagas = await self.saga_repository.find_expired_sagas()
        
        for saga in expired_sagas:
            await self._handle_saga_timeout(saga)
    
    async def _handle_saga_timeout(self, saga: SagaInstance):
        """Kompensation für abgelaufene Sagas"""
        if saga.status == SagaStatus.WAITING_FOR_APPROVAL:
            # Flight-Reservierung stornieren
            await self.compensation_service.compensate_flight_reservation(saga)
        elif saga.status == SagaStatus.WAITING_FOR_PAYMENT:
            # Flight und Hotel stornieren
            await self.compensation_service.compensate_flight_reservation(saga)
            await self.compensation_service.compensate_hotel_reservation(saga)
        
        saga.status = SagaStatus.FAILED
        saga.failure_reason = "TIMEOUT"
        await self.saga_repository.save(saga)

51.4.4 Saga Monitoring und Management

// Saga-Monitoring Dashboard
@RestController
@RequestMapping("/admin/sagas")
public class SagaManagementController {
    
    @GetMapping("/active")
    public List<SagaInstanceDto> getActiveSagas() {
        return sagaRepository.findActiveSagas()
            .stream()
            .map(this::toDto)
            .collect(Collectors.toList());
    }
    
    @GetMapping("/stuck")
    public List<SagaInstanceDto> getStuckSagas() {
        // Sagas die länger als erwartet in einem Zustand sind
        return sagaRepository.findStuckSagas(Duration.ofHours(24))
            .stream()
            .map(this::toDto)
            .collect(Collectors.toList());
    }
    
    @PostMapping("/{sagaId}/compensate")
    public ResponseEntity<Void> forceSagaCompensation(@PathVariable String sagaId) {
        SagaInstance saga = sagaRepository.findById(sagaId);
        
        if (saga == null) {
            return ResponseEntity.notFound().build();
        }
        
        // Manuelle Kompensation auslösen
        sagaCompensationService.forceSagaCompensation(saga, "MANUAL_INTERVENTION");
        
        return ResponseEntity.ok().build();
    }
}

Das Saga Pattern ermöglicht es uns, die Vorteile verteilter Services zu nutzen, ohne auf transaktionale Konsistenz zu verzichten. Die Kunst liegt darin, die richtige Balance zwischen Autonomie und Koordination zu finden und dabei robuste Kompensations-Mechanismen zu implementieren, die auch bei langläufigen Prozessen zuverlässig funktionieren.

Reflexion: Wie würden Sie entscheiden, ob ein Geschäftsprozess als choreographierte oder orchestrierte Saga implementiert werden sollte? Welche Faktoren spielen dabei eine Rolle?