46 Projections, Snapshots und Aggregation

46.1 Read Model Generation

Wenn Sie für eine E-Commerce-Plattform folgende Frage beantworten müssen: “Zeige alle Bestellungen eines Kunden der letzten 30 Tage, sortiert nach Bestellwert.” Wie würden Sie das in einem Event Sourcing System lösen, wo alle Daten als Events vorliegen?

46.1.1 Das Problem: Events vs. Abfragbare Daten

Events erzählen uns was passiert ist, aber nicht was gerade ist. Betrachten Sie diese Event-Sequenz:

[
  {"eventType": "OrderPlaced", "orderId": "123", "customerId": "456", "amount": 99.99},
  {"eventType": "PaymentReceived", "orderId": "123", "amount": 99.99},
  {"eventType": "OrderShipped", "orderId": "123", "trackingNumber": "DHL123"},
  {"eventType": "OrderPlaced", "orderId": "124", "customerId": "456", "amount": 149.99}
]

Welche Herausforderungen sehen Sie bei einer direkten Abfrage dieser Event-Struktur? Die Events sind chronologisch organisiert, aber Geschäftsabfragen benötigen andere Sichtweisen.

46.1.2 Projections: Maßgeschneiderte Sichten

Eine Projection transformiert Events in eine abfragbare Struktur. Denken Sie daran wie an eine spezielle Brille, die Events in der für Ihre Frage optimalen Form zeigt.

// Spring Boot: Customer Order Projection
@Component
public class CustomerOrderProjection {
    
    private final CustomerOrderRepository repository;
    
    @EventListener
    public void handle(OrderPlacedEvent event) {
        CustomerOrder customerOrder = CustomerOrder.builder()
            .orderId(event.getOrderId())
            .customerId(event.getCustomerId())
            .amount(event.getAmount())
            .status("placed")
            .placedAt(event.getTimestamp())
            .build();
            
        repository.save(customerOrder);
    }
    
    @EventListener  
    public void handle(PaymentReceivedEvent event) {
        repository.updateStatus(event.getOrderId(), "paid");
    }
    
    @EventListener
    public void handle(OrderShippedEvent event) {
        repository.updateStatusAndShipping(
            event.getOrderId(), 
            "shipped", 
            event.getTrackingNumber()
        );
    }
}
# Python: Asynchrone Projection
class CustomerOrderProjection:
    def __init__(self, repository):
        self.repository = repository
    
    async def handle_order_placed(self, event):
        customer_order = {
            'order_id': event.order_id,
            'customer_id': event.customer_id,
            'amount': event.amount,
            'status': 'placed',
            'placed_at': event.timestamp
        }
        await self.repository.save(customer_order)
    
    async def handle_payment_received(self, event):
        await self.repository.update_status(event.order_id, 'paid')
    
    async def handle_order_shipped(self, event):
        await self.repository.update_status_and_shipping(
            event.order_id, 'shipped', event.tracking_number
        )

Überlegen Sie: Welche verschiedenen Projections könnten für das gleiche Set von Events sinnvoll sein? Unterschiedliche Geschäftsfragen erfordern unterschiedliche Sichtweisen.

46.1.3 Multiple Read Models für verschiedene Use Cases

Use Case Projection-Fokus Optimiert für
Kundenportal Bestellhistorie pro Kunde Abfragen nach customer_id
Lagerverwaltung Produktmengen und Reservierungen Abfragen nach product_id
Buchhaltung Umsätze und Zahlungen Zeitraum-basierte Aggregationen
Analytics Verkaufstrends Zeitreihen und Gruppierungen
// Verschiedene Projections für verschiedene Zwecke
@Component
public class InventoryProjection {
    
    @EventListener
    public void handle(OrderPlacedEvent event) {
        // Produktmengen reservieren
        for (OrderItem item : event.getItems()) {
            inventoryService.reserveStock(item.getProductId(), item.getQuantity());
        }
    }
    
    @EventListener
    public void handle(OrderShippedEvent event) {
        // Reservierung in tatsächlichen Abgang umwandeln
        for (OrderItem item : event.getItems()) {
            inventoryService.confirmStockReduction(item.getProductId(), item.getQuantity());
        }
    }
}

46.2 Snapshot Strategies

Angenommen eine Bestellung hat über die Jahre 10.000 Events angesammelt. Wie lange würde es dauern, den aktuellen Zustand durch Replay aller Events zu berechnen? Hier kommen Snapshots ins Spiel.

46.2.1 Was ist ein Snapshot?

Ein Snapshot ist ein eingefrorener Zustand zu einem bestimmten Zeitpunkt. Statt alle Events zu replaying, starten wir vom Snapshot und replaying nur die neueren Events.

// Spring Boot: Snapshot-Mechanismus
@Entity
public class OrderSnapshot {
    private UUID orderId;
    private String snapshotData; // JSON des Order-Zustands
    private Long eventVersion;   // Letzte Event-Nummer im Snapshot
    private LocalDateTime createdAt;
}

@Service
public class OrderSnapshotService {
    
    // Snapshot-Erstellung
    public void createSnapshot(UUID orderId) {
        List<OrderEvent> allEvents = eventStore.getEventsForOrder(orderId);
        OrderView currentState = replayEvents(allEvents);
        
        OrderSnapshot snapshot = new OrderSnapshot(
            orderId,
            objectMapper.writeValueAsString(currentState),
            allEvents.size(),
            LocalDateTime.now()
        );
        
        snapshotRepository.save(snapshot);
    }
    
    // Schnelle Zustandsrekonstruktion
    public OrderView buildStateWithSnapshot(UUID orderId) {
        OrderSnapshot snapshot = snapshotRepository.findLatestByOrderId(orderId);
        
        if (snapshot == null) {
            // Kein Snapshot vorhanden, komplettes Replay
            return buildStateFromEvents(orderId);
        }
        
        // Vom Snapshot starten
        OrderView state = objectMapper.readValue(snapshot.getSnapshotData(), OrderView.class);
        
        // Nur Events nach dem Snapshot replaying
        List<OrderEvent> eventsAfterSnapshot = eventStore.getEventsForOrderAfterVersion(
            orderId, snapshot.getEventVersion()
        );
        
        return replayEventsFromState(state, eventsAfterSnapshot);
    }
}

46.2.2 Snapshot-Strategien: Wann und wie oft?

Wie würden Sie entscheiden, wann ein neuer Snapshot erstellt werden soll? Verschiedene Strategien haben unterschiedliche Trade-offs:

Strategie Vorteile Nachteile Anwendungsfall
Feste Intervalle (z.B. alle 100 Events) Vorhersagbar, einfach Möglicherweise unnötige Snapshots Gleichmäßige Event-Verteilung
Zeit-basiert (z.B. täglich) Regelmäßige Performance Events können sich ungleich verteilen Batch-orientierte Systeme
Performance-getriggert Optimale Performance Komplexere Logik High-Performance Anforderungen
On-Demand Ressourcen-schonend Unvorhersagbare Latenz Selten abgefragte Aggregate
# Python: Flexible Snapshot-Strategie
class SnapshotStrategy:
    def __init__(self, max_events=100, max_age_hours=24):
        self.max_events = max_events
        self.max_age_hours = max_age_hours
    
    def should_create_snapshot(self, order_id):
        # Strategie 1: Zu viele Events seit letztem Snapshot
        events_since_snapshot = self.count_events_since_last_snapshot(order_id)
        if events_since_snapshot >= self.max_events:
            return True
        
        # Strategie 2: Letzter Snapshot zu alt
        last_snapshot = self.get_last_snapshot(order_id)
        if last_snapshot and self.is_older_than_hours(last_snapshot, self.max_age_hours):
            return True
        
        return False
    
    async def create_snapshot_if_needed(self, order_id):
        if self.should_create_snapshot(order_id):
            await self.snapshot_service.create_snapshot(order_id)

Denken Sie über Ihr eigenes System nach: Welche Snapshot-Strategie würde am besten zu Ihren Zugriffsmustern passen?

46.3 Eventually Consistent Views

46.3.1 Das Timing-Problem

Hier eine wichtige Frage: Was passiert, wenn ein Event verarbeitet wird, aber die Projection noch nicht aktualisiert ist? Ein Kunde bestellt etwas, aber das Read Model zeigt die Bestellung noch nicht an.

// Das klassische Race Condition Problem
@RestController
public class OrderController {
    
    @PostMapping("/orders")
    public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
        // Event wird publiziert
        OrderPlacedEvent event = orderService.placeOrder(request);
        
        // PROBLEM: Read Model möglicherweise noch nicht aktualisiert!
        OrderView orderView = orderQueryService.getOrder(event.getOrderId());
        // orderView könnte null sein!
        
        return ResponseEntity.ok(new OrderResponse(orderView));
    }
}

46.3.2 Eventual Consistency verstehen

Eventually Consistent bedeutet: Das System wird konsistent, aber nicht sofort. Die Projection wird irgendwann aktualisiert.

// Lösungsansatz 1: Optimistic Read
@Service
public class OrderQueryService {
    
    public OrderView getOrderWithFallback(UUID orderId) {
        // Zuerst Read Model versuchen (schnell)
        OrderView fromReadModel = readModelRepository.findByOrderId(orderId);
        
        if (fromReadModel != null) {
            return fromReadModel;
        }
        
        // Fallback: Aus Events rekonstruieren (langsamer, aber immer aktuell)
        return eventSourcingService.buildCurrentState(orderId);
    }
}
# Python: Retry-Strategie für Eventually Consistent Reads
class OrderQueryService:
    async def get_order_with_retry(self, order_id, max_retries=3, delay_ms=100):
        for attempt in range(max_retries):
            # Read Model abfragen
            order = await self.read_model.get_order(order_id)
            if order:
                return order
            
            # Kurz warten, dann erneut versuchen
            await asyncio.sleep(delay_ms / 1000)
            delay_ms *= 2  # Exponential backoff
        
        # Letzter Ausweg: Event Sourcing
        return await self.event_sourcing.build_current_state(order_id)

46.3.3 Strategien für Eventually Consistent Systems

Wie würden Sie Ihren Kunden erklären, dass ihre gerade aufgegebene Bestellung “noch nicht sichtbar” ist? Verschiedene UX-Patterns helfen dabei:

Pattern Beschreibung User Experience
Optimistic UI Zeige erwarteten Zustand sofort “Bestellung wird verarbeitet…”
Polling Regelmäßig nach Updates suchen “Aktualisieren…” Button
Push Updates WebSocket/SSE für Echtzeit-Updates Live-Updates in der UI
Eventual Message Transparente Kommunikation “Bestellung erscheint in wenigen Sekunden”
// Optimistic UI Pattern
@RestController
public class OrderController {
    
    @PostMapping("/orders")
    public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
        OrderPlacedEvent event = orderService.placeOrder(request);
        
        // Optimistic Response: Erwarteten Zustand zurückgeben
        OrderView optimisticView = OrderView.builder()
            .orderId(event.getOrderId())
            .customerId(event.getCustomerId())
            .status("processing") // Erwarteter Status
            .amount(event.getAmount())
            .build();
        
        return ResponseEntity.ok(new OrderResponse(optimisticView));
    }
}

46.3.4 Monitoring und Debugging

Wie würden Sie herausfinden, ob Ihre Projections “hinterherhinken”? Monitoring ist entscheidend für Eventually Consistent Systems:

// Projection Lag Monitoring
@Component
public class ProjectionMonitoring {
    
    @Scheduled(fixedRate = 30000) // Alle 30 Sekunden
    public void checkProjectionLag() {
        long latestEventTimestamp = eventStore.getLatestEventTimestamp();
        long latestProjectionTimestamp = readModelStore.getLatestUpdateTimestamp();
        
        long lagInMillis = latestEventTimestamp - latestProjectionTimestamp;
        
        if (lagInMillis > 5000) { // Mehr als 5 Sekunden Verzögerung
            alertService.alert("Projection lag detected: " + lagInMillis + "ms");
        }
        
        metricsService.recordProjectionLag(lagInMillis);
    }
}

Reflektieren Sie über Ihr eigenes System: Welche Bereiche können Eventually Consistent sein, und welche benötigen Strong Consistency? Die Antwort hängt von den Geschäftsanforderungen ab.

Im nächsten Kapitel schauen wir uns an, wie CQRS diese Konzepte formalisiert und eine klare Trennung zwischen Schreib- und Lesemodellen schafft.