Event-driven systems process data asynchronously, yet clients often expect immediate answers. How do you serve a query such as GET /orders/12345 within milliseconds when the order data emerges from a continuous stream of events such as OrderPlaced, PaymentProcessed, and OrderShipped?
The answer lies in intelligent preparation: instead of aggregating events at query time, you build specialized data structures that are designed for efficient reads.
Command Query Responsibility Segregation (CQRS) completely separates write operations (commands) from read operations (queries). While the events remain the authoritative record of business facts, dedicated query models serve efficient data retrieval.
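To make the separation concrete, here is a minimal, self-contained sketch (in-memory, with hypothetical names such as PlaceOrder and order_status_view): the command side validates a command and appends an event, while queries only ever read from a projection that the events feed.

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class PlaceOrder:          # command: the write intent
    order_id: str
    customer_id: str

@dataclass
class OrderPlaced:         # event: the authoritative business fact
    order_id: str
    customer_id: str

event_log: List[OrderPlaced] = []        # write side: append-only event log
order_status_view: Dict[str, str] = {}   # read side: query-optimized model

def handle_place_order(cmd: PlaceOrder) -> OrderPlaced:
    """Command handler: record the business fact as an event."""
    event = OrderPlaced(cmd.order_id, cmd.customer_id)
    event_log.append(event)
    return event

def project_order_placed(event: OrderPlaced) -> None:
    """Projection: keep the read model current; queries never touch the command side."""
    order_status_view[event.order_id] = "PLACED"

# Usage: the write path emits the event, the projection updates the view,
# and a read such as order_status_view["o-1"] answers in constant time.
project_order_placed(handle_place_order(PlaceOrder("o-1", "c-42")))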
Consider a few different use cases for order data:
| Use case | Required data | Access pattern | Optimization |
|---|---|---|---|
| Customer status | Status, delivery time, tracking | Single lookup by ID | Index on orderId |
| Support dashboard | Complete event history | Timeline view | Sorted by time |
| Analytics | Aggregated metrics | Batch processing | Materialized sums |
Spring Boot query-side implementation:
// Customer-facing view
@Entity
@Table(name = "order_customer_view")
public class OrderCustomerView {
    @Id
    private String orderId;
    private String customerId;
    private OrderStatus status;
    private LocalDateTime lastUpdated;
    private String trackingInfo;
    // Optimized for: WHERE orderId = ? AND customerId = ?
    // Getters and setters omitted for brevity
}

// Support-facing view
@Entity
@Table(name = "order_support_view")
public class OrderSupportView {
    @Id
    private String orderId;
    private List<String> eventHistory; // stored as a JSON array (e.g. via an AttributeConverter)
    private String currentIssue;
    private Integer retryCount;
    // Optimized for: WHERE currentIssue IS NOT NULL
    // Getters and setters omitted for brevity
}

@Service
public class OrderQueryService {

    private final OrderCustomerViewRepository customerRepo;
    private final OrderSupportViewRepository supportRepo;
    // Constructor injection omitted for brevity

    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // Update the customer view
        OrderCustomerView customerView = new OrderCustomerView();
        customerView.setOrderId(event.getOrderId());
        customerView.setCustomerId(event.getCustomerId());
        customerView.setStatus(OrderStatus.SUBMITTED);
        customerView.setLastUpdated(LocalDateTime.now());
        customerRepo.save(customerView);

        // Initialize the support view
        OrderSupportView supportView = new OrderSupportView();
        supportView.setOrderId(event.getOrderId());
        supportView.setEventHistory(List.of("OrderPlaced"));
        supportRepo.save(supportView);
    }

    @KafkaListener(topics = "payment.failed.v1")
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // Only the support view cares about failures
        supportRepo.findById(event.getOrderId())
            .ifPresent(view -> {
                view.setCurrentIssue("Payment failed: " + event.getReason());
                view.setRetryCount(event.getRetryCount());
                supportRepo.save(view);
            });
    }
}

What is the reasoning behind this? Different users need different views of the same data. A customer wants to know "Where is my order?", while support asks "Why is this order stuck?"
Python equivalent with specialized event processing:
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class CustomerOrderView:
    order_id: str
    customer_id: str
    status: str
    last_updated: str
    tracking_info: Optional[str] = None

@dataclass
class SupportOrderView:
    order_id: str
    event_history: List[str]
    current_issue: Optional[str] = None
    retry_count: int = 0

class QuerySideProjector:
    def __init__(self):
        self.customer_views = {}  # in-memory storage for the demo
        self.support_views = {}

    async def handle_event(self, event_data: dict):
        event_type = event_data.get('eventType')
        order_id = event_data.get('data', {}).get('orderId')
        # Event-specific projections
        if event_type == 'OrderPlaced':
            await self._project_to_customer_view(event_data)
            await self._project_to_support_view(event_data)
        elif event_type == 'PaymentFailed':
            await self._project_payment_failure(event_data)

    async def _project_to_customer_view(self, event_data: dict):
        data = event_data['data']
        view = CustomerOrderView(
            order_id=data['orderId'],
            customer_id=data['customerId'],
            status='SUBMITTED',
            last_updated=event_data['timestamp']
        )
        self.customer_views[data['orderId']] = view

    async def _project_to_support_view(self, event_data: dict):
        data = event_data['data']
        self.support_views[data['orderId']] = SupportOrderView(
            order_id=data['orderId'],
            event_history=['OrderPlaced']
        )

    async def _project_payment_failure(self, event_data: dict):
        order_id = event_data['data']['orderId']
        if order_id in self.support_views:
            view = self.support_views[order_id]
            view.current_issue = f"Payment failed: {event_data['data']['reason']}"
            view.retry_count = event_data['data'].get('retryCount', 0)

Materialized views are precomputed, persistently stored projections of the event history. They deliver constant response times regardless of the size of the event stream.
Think of materialized views as snapshots of the event stream: they capture the current state without having to replay the entire event history.
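A minimal sketch of what that buys you (hypothetical in-memory stand-ins for the event store and the view): answering a status query by replaying the history costs time proportional to the number of events, while the materialized view is a single constant-time lookup.

from collections import defaultdict
from typing import Dict, List, Optional

# Hypothetical stand-ins: an append-only event store and a materialized view
event_store: Dict[str, List[dict]] = defaultdict(list)   # order_id -> ordered events
order_status_snapshot: Dict[str, str] = {}                # order_id -> current status

def status_by_replay(order_id: str) -> Optional[str]:
    """Without materialization: replay every event of the order (grows with history)."""
    status = None
    for event in event_store[order_id]:
        if event['eventType'] == 'OrderPlaced':
            status = 'PLACED'
        elif event['eventType'] == 'PaymentProcessed':
            status = 'PAID'
        elif event['eventType'] == 'OrderShipped':
            status = 'SHIPPED'
    return status

def status_from_snapshot(order_id: str) -> Optional[str]:
    """With a materialized view: one lookup, independent of stream length."""
    return order_status_snapshot.get(order_id)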
Architectural decision: when should you materialize?
@Component
public class MaterializedViewManager {

    private final JdbcTemplate jdbcTemplate;

    @EventListener
    @Async
    public void rebuildOrderSummaryView(ViewRebuildRequestedEvent event) {
        // Full rebuild from the event store. The "current status" of an order is the
        // status of its most recent event, resolved here via a correlated subquery
        // (LAST_VALUE is a window function and cannot be combined directly with GROUP BY).
        String sql = """
            INSERT INTO order_summary_view
            SELECT
                e.order_id,
                e.customer_id,
                (SELECT s.status
                   FROM event_store s
                  WHERE s.order_id = e.order_id
                  ORDER BY s.event_timestamp DESC
                  LIMIT 1) AS current_status,
                SUM(CASE WHEN e.event_type = 'OrderPlaced' THEN e.amount ELSE 0 END) AS total_amount,
                MAX(e.event_timestamp) AS last_updated
            FROM event_store e
            WHERE e.event_type IN ('OrderPlaced', 'PaymentProcessed', 'OrderShipped')
            GROUP BY e.order_id, e.customer_id
            """;
        jdbcTemplate.execute("TRUNCATE TABLE order_summary_view");
        jdbcTemplate.update(sql);
    }

    @KafkaListener(topics = {"order.placed.v1", "payment.processed.v1", "order.shipped.v1"})
    public void incrementalUpdate(String eventJson) throws JsonProcessingException {
        // Incremental update for better performance
        ObjectMapper mapper = new ObjectMapper();
        JsonNode event = mapper.readTree(eventJson);
        String orderId = event.path("data").path("orderId").asText();
        String eventType = event.path("eventType").asText();
        switch (eventType) {
            case "OrderPlaced":
                upsertOrderSummary(orderId, "PLACED", event);
                break;
            case "PaymentProcessed":
                updateOrderStatus(orderId, "PAID");
                break;
            case "OrderShipped":
                updateOrderStatus(orderId, "SHIPPED");
                break;
        }
    }

    // upsertOrderSummary(...) and updateOrderStatus(...) are private helpers that issue
    // an UPSERT or UPDATE against order_summary_view; omitted for brevity.
}

Python-based materialized view with SQLAlchemy:
from sqlalchemy import create_engine, Column, String, DateTime, Numeric, Integer
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
from typing import List

Base = declarative_base()

class OrderSummaryView(Base):
    __tablename__ = 'order_summary_view'
    order_id = Column(String, primary_key=True)
    customer_id = Column(String)
    current_status = Column(String)
    total_amount = Column(Numeric)
    item_count = Column(Integer)
    last_updated = Column(DateTime)

class MaterializedViewBuilder:
    def __init__(self, db_url: str):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)

    async def process_event_batch(self, events: List[dict]):
        """Batch processing for better performance"""
        session = self.Session()
        try:
            for event in events:
                await self._update_materialized_view(session, event)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    async def _update_materialized_view(self, session, event_data: dict):
        order_id = event_data['data']['orderId']
        # Upsert pattern for materialized views
        view = session.query(OrderSummaryView).filter_by(
            order_id=order_id
        ).first()
        if not view:
            view = OrderSummaryView(order_id=order_id)
            session.add(view)
        # Event-specific update
        event_type = event_data['eventType']
        if event_type == 'OrderPlaced':
            view.customer_id = event_data['data']['customerId']
            view.current_status = 'PLACED'
            view.total_amount = event_data['data']['totalAmount']
            view.item_count = len(event_data['data']['items'])
        view.last_updated = datetime.now()

For use cases with strict latency requirements, database-backed views alone are not enough. In-memory read models offer sub-millisecond response times.
Ask yourself: which data do you really need in real time? Often it is the critical status information and key business metrics.
A high-performance in-memory cache:
@Component
public class OrderStatusCache {

    private final ConcurrentHashMap<String, CachedOrderStatus> cache = new ConcurrentHashMap<>();
    private final Counter cacheHits = Metrics.counter("order.cache.hits");
    private final Counter cacheMisses = Metrics.counter("order.cache.misses");

    @Value("${order.cache.ttl.minutes:30}")
    private int cacheTtlMinutes;

    @KafkaListener(topics = {
        "order.placed.v1",
        "payment.processed.v1",
        "order.shipped.v1"
    })
    public void updateCache(String eventJson, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
            throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode event = mapper.readTree(eventJson);
        String orderId = event.path("data").path("orderId").asText();
        CachedOrderStatus status = cache.computeIfAbsent(orderId,
            k -> new CachedOrderStatus(orderId));
        switch (topic) {
            case "order.placed.v1":
                status.updateStatus("PLACED", Instant.now());
                status.setCustomerId(event.path("data").path("customerId").asText());
                break;
            case "payment.processed.v1":
                status.updateStatus("PAID", Instant.now());
                status.setPaymentConfirmed(true);
                break;
            case "order.shipped.v1":
                status.updateStatus("SHIPPED", Instant.now());
                status.setTrackingNumber(event.path("data").path("trackingNumber").asText());
                break;
        }
    }

    public Optional<CachedOrderStatus> getOrderStatus(String orderId) {
        CachedOrderStatus status = cache.get(orderId);
        if (status != null && !status.isExpired(cacheTtlMinutes)) {
            cacheHits.increment();
            return Optional.of(status);
        } else {
            cacheMisses.increment();
            if (status != null) {
                cache.remove(orderId); // clean up expired entries
            }
            return Optional.empty();
        }
    }
}
@RestController
public class FastOrderController {

    private final OrderStatusCache cache;
    private final OrderQueryService fallbackService;
    // Constructor injection omitted for brevity

    @GetMapping("/api/orders/{orderId}/status")
    public ResponseEntity<OrderStatusResponse> getOrderStatus(@PathVariable String orderId) {
        // First priority: cache lookup
        Optional<CachedOrderStatus> cachedStatus = cache.getOrderStatus(orderId);
        if (cachedStatus.isPresent()) {
            return ResponseEntity.ok()
                .header("X-Data-Source", "cache")
                .body(new OrderStatusResponse(cachedStatus.get()));
        }
        // Fallback: materialized view
        OrderSummaryView view = fallbackService.getOrderSummary(orderId);
        return ResponseEntity.ok()
            .header("X-Data-Source", "database")
            .body(new OrderStatusResponse(view));
    }
}

Cache strategies for event streams (a cache-aside sketch follows the table):
| Strategy | Consistency | Latency | Memory footprint | Use case |
|---|---|---|---|---|
| Write-through | Strong | Medium | High | Critical data |
| Cache-aside | Eventual | Low | Medium | Status queries |
| Write-behind | Eventual | Very low | Variable | Analytics |
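The Redis example below is populated from the event stream, a write-through-style flow. For comparison, a cache-aside read path could look like this sketch, where load_from_view is a hypothetical function that queries the materialized view on a cache miss:

import json
from typing import Callable, Optional

import redis

def get_order_cache_aside(
    client: redis.Redis,
    order_id: str,
    load_from_view: Callable[[str], Optional[dict]],  # hypothetical read-model query
    ttl_seconds: int = 300,
) -> Optional[dict]:
    """Cache-aside: check the cache first; on a miss, load from the
    materialized view and populate the cache for subsequent reads."""
    key = f"order:summary:{order_id}"
    cached = client.get(key)
    if cached:
        return json.loads(cached)             # cache hit
    summary = load_from_view(order_id)        # cache miss: query the read model
    if summary is not None:
        client.setex(key, ttl_seconds, json.dumps(summary))
    return summary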
A Python Redis-based cache:
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional

import redis

@dataclass
class FastOrderStatus:
    order_id: str
    status: str
    last_updated: float
    customer_id: Optional[str] = None
    tracking_number: Optional[str] = None

class RedisOrderCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.ttl_seconds = 1800  # 30 minutes

    async def update_order_status(self, event_data: dict):
        order_id = event_data['data']['orderId']
        event_type = event_data['eventType']
        # Load the existing status or create a new one
        existing = await self.get_order_status(order_id)
        if existing:
            status = existing
        else:
            status = FastOrderStatus(
                order_id=order_id,
                status='UNKNOWN',
                last_updated=time.time()
            )
        # Event-specific updates
        if event_type == 'OrderPlaced':
            status.status = 'PLACED'
            status.customer_id = event_data['data']['customerId']
        elif event_type == 'PaymentProcessed':
            status.status = 'PAID'
        elif event_type == 'OrderShipped':
            status.status = 'SHIPPED'
            status.tracking_number = event_data['data'].get('trackingNumber')
        status.last_updated = time.time()
        # Cache update with TTL
        cache_key = f"order:status:{order_id}"
        self.redis_client.setex(
            cache_key,
            self.ttl_seconds,
            json.dumps(asdict(status))
        )

    async def get_order_status(self, order_id: str) -> Optional[FastOrderStatus]:
        cache_key = f"order:status:{order_id}"
        cached_data = self.redis_client.get(cache_key)
        if cached_data:
            data = json.loads(cached_data)
            return FastOrderStatus(**data)
        return None

# FastAPI integration
from fastapi import FastAPI, HTTPException

app = FastAPI()
cache = RedisOrderCache()

@app.get("/api/orders/{order_id}/fast-status")
async def get_fast_order_status(order_id: str):
    status = await cache.get_order_status(order_id)
    if status:
        return {
            "orderId": status.order_id,
            "status": status.status,
            "lastUpdated": status.last_updated,
            "source": "cache"
        }
    # No cache hit - a full implementation would fall back to the slower query path
    raise HTTPException(status_code=404, detail="Order not found in cache")

Combining CQRS query models, materialized views, and low-latency caches lets you reap the benefits of event-driven architectures without compromising the performance expectations of modern applications.
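As a sketch of that combination on the Python side (reusing the RedisOrderCache and OrderSummaryView classes defined above; the session_factory parameter is assumed to be a SQLAlchemy sessionmaker): check the low-latency cache first and fall back to the materialized view on a miss, mirroring the FastOrderController shown earlier.

from typing import Optional

async def get_order_status_layered(order_id: str,
                                    cache: RedisOrderCache,
                                    session_factory) -> Optional[dict]:
    """Layer 1: Redis cache for sub-millisecond answers; layer 2: materialized view."""
    cached = await cache.get_order_status(order_id)
    if cached:
        return {"orderId": cached.order_id, "status": cached.status, "source": "cache"}

    session = session_factory()
    try:
        view = session.query(OrderSummaryView).filter_by(order_id=order_id).first()
        if view:
            return {"orderId": view.order_id, "status": view.current_status, "source": "database"}
        return None
    finally:
        session.close()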
What latency requirements do your critical queries have? That decision largely determines your cache architecture.