Wenn Sie für eine E-Commerce-Plattform folgende Frage beantworten müssen: “Zeige alle Bestellungen eines Kunden der letzten 30 Tage, sortiert nach Bestellwert.” Wie würden Sie das in einem Event Sourcing System lösen, wo alle Daten als Events vorliegen?
Events erzählen uns was passiert ist, aber nicht was gerade ist. Betrachten Sie diese Event-Sequenz:
[
{"eventType": "OrderPlaced", "orderId": "123", "customerId": "456", "amount": 99.99},
{"eventType": "PaymentReceived", "orderId": "123", "amount": 99.99},
{"eventType": "OrderShipped", "orderId": "123", "trackingNumber": "DHL123"},
{"eventType": "OrderPlaced", "orderId": "124", "customerId": "456", "amount": 149.99}
]Welche Herausforderungen sehen Sie bei einer direkten Abfrage dieser Event-Struktur? Die Events sind chronologisch organisiert, aber Geschäftsabfragen benötigen andere Sichtweisen.
Eine Projection transformiert Events in eine abfragbare Struktur. Denken Sie daran wie an eine spezielle Brille, die Events in der für Ihre Frage optimalen Form zeigt.
// Spring Boot: Customer Order Projection
@Component
public class CustomerOrderProjection {
private final CustomerOrderRepository repository;
@EventListener
public void handle(OrderPlacedEvent event) {
CustomerOrder customerOrder = CustomerOrder.builder()
.orderId(event.getOrderId())
.customerId(event.getCustomerId())
.amount(event.getAmount())
.status("placed")
.placedAt(event.getTimestamp())
.build();
repository.save(customerOrder);
}
@EventListener
public void handle(PaymentReceivedEvent event) {
repository.updateStatus(event.getOrderId(), "paid");
}
@EventListener
public void handle(OrderShippedEvent event) {
repository.updateStatusAndShipping(
event.getOrderId(),
"shipped",
event.getTrackingNumber()
);
}
}# Python: Asynchrone Projection
class CustomerOrderProjection:
def __init__(self, repository):
self.repository = repository
async def handle_order_placed(self, event):
customer_order = {
'order_id': event.order_id,
'customer_id': event.customer_id,
'amount': event.amount,
'status': 'placed',
'placed_at': event.timestamp
}
await self.repository.save(customer_order)
async def handle_payment_received(self, event):
await self.repository.update_status(event.order_id, 'paid')
async def handle_order_shipped(self, event):
await self.repository.update_status_and_shipping(
event.order_id, 'shipped', event.tracking_number
)Überlegen Sie: Welche verschiedenen Projections könnten für das gleiche Set von Events sinnvoll sein? Unterschiedliche Geschäftsfragen erfordern unterschiedliche Sichtweisen.
| Use Case | Projection-Fokus | Optimiert für |
|---|---|---|
| Kundenportal | Bestellhistorie pro Kunde | Abfragen nach customer_id |
| Lagerverwaltung | Produktmengen und Reservierungen | Abfragen nach product_id |
| Buchhaltung | Umsätze und Zahlungen | Zeitraum-basierte Aggregationen |
| Analytics | Verkaufstrends | Zeitreihen und Gruppierungen |
// Verschiedene Projections für verschiedene Zwecke
@Component
public class InventoryProjection {
@EventListener
public void handle(OrderPlacedEvent event) {
// Produktmengen reservieren
for (OrderItem item : event.getItems()) {
inventoryService.reserveStock(item.getProductId(), item.getQuantity());
}
}
@EventListener
public void handle(OrderShippedEvent event) {
// Reservierung in tatsächlichen Abgang umwandeln
for (OrderItem item : event.getItems()) {
inventoryService.confirmStockReduction(item.getProductId(), item.getQuantity());
}
}
}Angenommen eine Bestellung hat über die Jahre 10.000 Events angesammelt. Wie lange würde es dauern, den aktuellen Zustand durch Replay aller Events zu berechnen? Hier kommen Snapshots ins Spiel.
Ein Snapshot ist ein eingefrorener Zustand zu einem bestimmten Zeitpunkt. Statt alle Events zu replaying, starten wir vom Snapshot und replaying nur die neueren Events.
// Spring Boot: Snapshot-Mechanismus
@Entity
public class OrderSnapshot {
private UUID orderId;
private String snapshotData; // JSON des Order-Zustands
private Long eventVersion; // Letzte Event-Nummer im Snapshot
private LocalDateTime createdAt;
}
@Service
public class OrderSnapshotService {
// Snapshot-Erstellung
public void createSnapshot(UUID orderId) {
List<OrderEvent> allEvents = eventStore.getEventsForOrder(orderId);
OrderView currentState = replayEvents(allEvents);
OrderSnapshot snapshot = new OrderSnapshot(
orderId,
objectMapper.writeValueAsString(currentState),
allEvents.size(),
LocalDateTime.now()
);
snapshotRepository.save(snapshot);
}
// Schnelle Zustandsrekonstruktion
public OrderView buildStateWithSnapshot(UUID orderId) {
OrderSnapshot snapshot = snapshotRepository.findLatestByOrderId(orderId);
if (snapshot == null) {
// Kein Snapshot vorhanden, komplettes Replay
return buildStateFromEvents(orderId);
}
// Vom Snapshot starten
OrderView state = objectMapper.readValue(snapshot.getSnapshotData(), OrderView.class);
// Nur Events nach dem Snapshot replaying
List<OrderEvent> eventsAfterSnapshot = eventStore.getEventsForOrderAfterVersion(
orderId, snapshot.getEventVersion()
);
return replayEventsFromState(state, eventsAfterSnapshot);
}
}Wie würden Sie entscheiden, wann ein neuer Snapshot erstellt werden soll? Verschiedene Strategien haben unterschiedliche Trade-offs:
| Strategie | Vorteile | Nachteile | Anwendungsfall |
|---|---|---|---|
| Feste Intervalle (z.B. alle 100 Events) | Vorhersagbar, einfach | Möglicherweise unnötige Snapshots | Gleichmäßige Event-Verteilung |
| Zeit-basiert (z.B. täglich) | Regelmäßige Performance | Events können sich ungleich verteilen | Batch-orientierte Systeme |
| Performance-getriggert | Optimale Performance | Komplexere Logik | High-Performance Anforderungen |
| On-Demand | Ressourcen-schonend | Unvorhersagbare Latenz | Selten abgefragte Aggregate |
# Python: Flexible Snapshot-Strategie
class SnapshotStrategy:
def __init__(self, max_events=100, max_age_hours=24):
self.max_events = max_events
self.max_age_hours = max_age_hours
def should_create_snapshot(self, order_id):
# Strategie 1: Zu viele Events seit letztem Snapshot
events_since_snapshot = self.count_events_since_last_snapshot(order_id)
if events_since_snapshot >= self.max_events:
return True
# Strategie 2: Letzter Snapshot zu alt
last_snapshot = self.get_last_snapshot(order_id)
if last_snapshot and self.is_older_than_hours(last_snapshot, self.max_age_hours):
return True
return False
async def create_snapshot_if_needed(self, order_id):
if self.should_create_snapshot(order_id):
await self.snapshot_service.create_snapshot(order_id)Denken Sie über Ihr eigenes System nach: Welche Snapshot-Strategie würde am besten zu Ihren Zugriffsmustern passen?
Hier eine wichtige Frage: Was passiert, wenn ein Event verarbeitet wird, aber die Projection noch nicht aktualisiert ist? Ein Kunde bestellt etwas, aber das Read Model zeigt die Bestellung noch nicht an.
// Das klassische Race Condition Problem
@RestController
public class OrderController {
@PostMapping("/orders")
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
// Event wird publiziert
OrderPlacedEvent event = orderService.placeOrder(request);
// PROBLEM: Read Model möglicherweise noch nicht aktualisiert!
OrderView orderView = orderQueryService.getOrder(event.getOrderId());
// orderView könnte null sein!
return ResponseEntity.ok(new OrderResponse(orderView));
}
}Eventually Consistent bedeutet: Das System wird konsistent, aber nicht sofort. Die Projection wird irgendwann aktualisiert.
// Lösungsansatz 1: Optimistic Read
@Service
public class OrderQueryService {
public OrderView getOrderWithFallback(UUID orderId) {
// Zuerst Read Model versuchen (schnell)
OrderView fromReadModel = readModelRepository.findByOrderId(orderId);
if (fromReadModel != null) {
return fromReadModel;
}
// Fallback: Aus Events rekonstruieren (langsamer, aber immer aktuell)
return eventSourcingService.buildCurrentState(orderId);
}
}# Python: Retry-Strategie für Eventually Consistent Reads
class OrderQueryService:
async def get_order_with_retry(self, order_id, max_retries=3, delay_ms=100):
for attempt in range(max_retries):
# Read Model abfragen
order = await self.read_model.get_order(order_id)
if order:
return order
# Kurz warten, dann erneut versuchen
await asyncio.sleep(delay_ms / 1000)
delay_ms *= 2 # Exponential backoff
# Letzter Ausweg: Event Sourcing
return await self.event_sourcing.build_current_state(order_id)Wie würden Sie Ihren Kunden erklären, dass ihre gerade aufgegebene Bestellung “noch nicht sichtbar” ist? Verschiedene UX-Patterns helfen dabei:
| Pattern | Beschreibung | User Experience |
|---|---|---|
| Optimistic UI | Zeige erwarteten Zustand sofort | “Bestellung wird verarbeitet…” |
| Polling | Regelmäßig nach Updates suchen | “Aktualisieren…” Button |
| Push Updates | WebSocket/SSE für Echtzeit-Updates | Live-Updates in der UI |
| Eventual Message | Transparente Kommunikation | “Bestellung erscheint in wenigen Sekunden” |
// Optimistic UI Pattern
@RestController
public class OrderController {
@PostMapping("/orders")
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
OrderPlacedEvent event = orderService.placeOrder(request);
// Optimistic Response: Erwarteten Zustand zurückgeben
OrderView optimisticView = OrderView.builder()
.orderId(event.getOrderId())
.customerId(event.getCustomerId())
.status("processing") // Erwarteter Status
.amount(event.getAmount())
.build();
return ResponseEntity.ok(new OrderResponse(optimisticView));
}
}Wie würden Sie herausfinden, ob Ihre Projections “hinterherhinken”? Monitoring ist entscheidend für Eventually Consistent Systems:
// Projection Lag Monitoring
@Component
public class ProjectionMonitoring {
@Scheduled(fixedRate = 30000) // Alle 30 Sekunden
public void checkProjectionLag() {
long latestEventTimestamp = eventStore.getLatestEventTimestamp();
long latestProjectionTimestamp = readModelStore.getLatestUpdateTimestamp();
long lagInMillis = latestEventTimestamp - latestProjectionTimestamp;
if (lagInMillis > 5000) { // Mehr als 5 Sekunden Verzögerung
alertService.alert("Projection lag detected: " + lagInMillis + "ms");
}
metricsService.recordProjectionLag(lagInMillis);
}
}Reflektieren Sie über Ihr eigenes System: Welche Bereiche können Eventually Consistent sein, und welche benötigen Strong Consistency? Die Antwort hängt von den Geschäftsanforderungen ab.
Im nächsten Kapitel schauen wir uns an, wie CQRS diese Konzepte formalisiert und eine klare Trennung zwischen Schreib- und Lesemodellen schafft.