54 Serving Real-Time Requests from Event Streams

Event-driven systems process data asynchronously, but clients often expect immediate answers. How do you serve a query such as GET /orders/12345 within milliseconds when the order data emerges from a continuous stream of events such as OrderPlaced, PaymentProcessed, and OrderShipped?

The solution lies in intelligent preparation: instead of aggregating events at query time, you build specialized data structures designed specifically for reads.

54.1 CQRS Query Side

Command Query Responsibility Segregation (CQRS) separates write operations (commands) completely from read operations (queries). While the events remain the authoritative record of business facts, query models exist purely for efficient reads.
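
To make the separation tangible, here is a minimal sketch of what the command side might look like. It assumes a Spring KafkaTemplate plus hypothetical PlaceOrderCommand and OrderPlacedEvent classes; the point is that the write path only validates and appends facts to the stream, while every read is answered from query models:

@Service
public class OrderCommandService {
    
    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
    
    public OrderCommandService(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void placeOrder(PlaceOrderCommand command) {
        // command side: validate, then publish the fact -- no read model is touched here
        OrderPlacedEvent event = new OrderPlacedEvent(
            command.getOrderId(), command.getCustomerId(), command.getTotalAmount());
        kafkaTemplate.send("order.placed.v1", event.getOrderId(), event);
    }
}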

Consider the different use cases for order data:

Use case          | Data needed                     | Access pattern       | Optimization
Customer status   | Status, delivery time, tracking | Single lookup by ID  | Index on orderId
Support dashboard | Full event history              | Timeline view        | Sorted by time
Analytics         | Aggregated metrics              | Batch processing     | Materialized sums

Spring Boot Query-Side Implementation:

// Customer-facing view
@Entity
@Table(name = "order_customer_view")
public class OrderCustomerView {
    @Id
    private String orderId;
    private String customerId;
    private OrderStatus status;
    private LocalDateTime lastUpdated;
    private String trackingInfo;
    
    // Optimized for: WHERE orderId = ? AND customerId = ?
}

// Support-facing view
@Entity
@Table(name = "order_support_view")
public class OrderSupportView {
    @Id
    private String orderId;
    private List<String> eventHistory; // stored as a JSON array (requires an AttributeConverter, not shown)
    private String currentIssue;
    private Integer retryCount;
    
    // Optimized for: WHERE currentIssue IS NOT NULL
}

@Service
public class OrderQueryService {
    
    private final OrderCustomerViewRepository customerRepo;
    private final OrderSupportViewRepository supportRepo;
    
    public OrderQueryService(OrderCustomerViewRepository customerRepo,
                             OrderSupportViewRepository supportRepo) {
        this.customerRepo = customerRepo;
        this.supportRepo = supportRepo;
    }
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // update the customer view
        OrderCustomerView customerView = new OrderCustomerView();
        customerView.setOrderId(event.getOrderId());
        customerView.setCustomerId(event.getCustomerId());
        customerView.setStatus(OrderStatus.SUBMITTED);
        customerView.setLastUpdated(LocalDateTime.now());
        customerRepo.save(customerView);
        
        // initialize the support view
        OrderSupportView supportView = new OrderSupportView();
        supportView.setOrderId(event.getOrderId());
        supportView.setEventHistory(List.of("OrderPlaced"));
        supportRepo.save(supportView);
    }
    
    @KafkaListener(topics = "payment.failed.v1")
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // only the support view cares about failures
        supportRepo.findById(event.getOrderId())
            .ifPresent(view -> {
                view.setCurrentIssue("Payment failed: " + event.getReason());
                view.setRetryCount(event.getRetryCount());
                supportRepo.save(view);
            });
    }
}
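
The two repositories injected above are not spelled out in this chapter. As a minimal sketch, they could be plain Spring Data interfaces whose derived queries mirror the access patterns from the table earlier (single lookup by orderId and customerId for customers, open issues for support):

public interface OrderCustomerViewRepository extends JpaRepository<OrderCustomerView, String> {
    // customer status: single lookup by ID, scoped to the owning customer
    Optional<OrderCustomerView> findByOrderIdAndCustomerId(String orderId, String customerId);
}

public interface OrderSupportViewRepository extends JpaRepository<OrderSupportView, String> {
    // support dashboard: every order that is currently stuck
    List<OrderSupportView> findByCurrentIssueIsNotNull();
}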

What is the reasoning behind this? Different users need different views of the same data. A customer wants to know “Where is my order?”, while support asks “Why is this order stuck?”
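
One possible way to expose those two perspectives as separate read endpoints is sketched below; the controller is illustrative and simply reuses the repository methods shown above:

@RestController
public class OrderViewController {
    
    private final OrderCustomerViewRepository customerRepo;
    private final OrderSupportViewRepository supportRepo;
    
    public OrderViewController(OrderCustomerViewRepository customerRepo,
                               OrderSupportViewRepository supportRepo) {
        this.customerRepo = customerRepo;
        this.supportRepo = supportRepo;
    }
    
    // customer perspective: "Where is my order?"
    @GetMapping("/api/customers/{customerId}/orders/{orderId}")
    public ResponseEntity<OrderCustomerView> getCustomerOrder(@PathVariable String customerId,
                                                              @PathVariable String orderId) {
        return customerRepo.findByOrderIdAndCustomerId(orderId, customerId)
            .map(ResponseEntity::ok)
            .orElse(ResponseEntity.notFound().build());
    }
    
    // support perspective: "Why is this order stuck?"
    @GetMapping("/api/support/orders/stuck")
    public List<OrderSupportView> getStuckOrders() {
        return supportRepo.findByCurrentIssueIsNotNull();
    }
}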

Python equivalent with specialized event processing:

from dataclasses import dataclass
from typing import List, Optional
import asyncio
import json

@dataclass
class CustomerOrderView:
    order_id: str
    customer_id: str
    status: str
    last_updated: str
    tracking_info: Optional[str] = None

@dataclass  
class SupportOrderView:
    order_id: str
    event_history: List[str]
    current_issue: Optional[str] = None
    retry_count: int = 0

class QuerySideProjector:
    def __init__(self):
        self.customer_views = {}  # in-memory store for demo purposes
        self.support_views = {}
    
    async def handle_event(self, event_data: dict):
        event_type = event_data.get('eventType')
        order_id = event_data.get('data', {}).get('orderId')
        
        # event-type-specific projections
        if event_type == 'OrderPlaced':
            await self._project_to_customer_view(event_data)
            await self._project_to_support_view(event_data)
            
        elif event_type == 'PaymentFailed':
            await self._project_payment_failure(event_data)
    
    async def _project_to_customer_view(self, event_data: dict):
        data = event_data['data']
        view = CustomerOrderView(
            order_id=data['orderId'],
            customer_id=data['customerId'],
            status='SUBMITTED',
            last_updated=event_data['timestamp']
        )
        self.customer_views[data['orderId']] = view
    
    async def _project_to_support_view(self, event_data: dict):
        data = event_data['data']
        # initialize the support projection with the first history entry
        self.support_views[data['orderId']] = SupportOrderView(
            order_id=data['orderId'],
            event_history=[event_data.get('eventType', 'OrderPlaced')]
        )
    
    async def _project_payment_failure(self, event_data: dict):
        order_id = event_data['data']['orderId']
        if order_id in self.support_views:
            view = self.support_views[order_id]
            view.current_issue = f"Payment failed: {event_data['data']['reason']}"
            view.retry_count = event_data['data'].get('retryCount', 0)

54.2 Materialized View Serving

Materialized views are precomputed projections of the event history that are stored persistently. They deliver constant response times regardless of how large the event stream grows.

Think of materialized views as “event-stream snapshots”: they capture the current state without having to replay the entire event history.
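
Serving from such a view then becomes a single indexed lookup instead of an event replay. A minimal sketch, assuming the order_summary_view table maintained by the manager below and a hypothetical OrderSummary DTO:

public record OrderSummary(String orderId, String customerId, String currentStatus,
                           BigDecimal totalAmount, LocalDateTime lastUpdated) {}

@Service
public class OrderSummaryQueryService {
    
    private final JdbcTemplate jdbcTemplate;
    
    public OrderSummaryQueryService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    
    public Optional<OrderSummary> findSummary(String orderId) {
        // constant-time read: one primary-key lookup, no event replay
        List<OrderSummary> result = jdbcTemplate.query(
            "SELECT order_id, customer_id, current_status, total_amount, last_updated " +
            "FROM order_summary_view WHERE order_id = ?",
            (rs, rowNum) -> new OrderSummary(
                rs.getString("order_id"),
                rs.getString("customer_id"),
                rs.getString("current_status"),
                rs.getBigDecimal("total_amount"),
                rs.getTimestamp("last_updated").toLocalDateTime()),
            orderId);
        return result.stream().findFirst();
    }
}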

Architectural decision: when should you materialize?

@Component
public class MaterializedViewManager {
    
    private final JdbcTemplate jdbcTemplate;
    
    public MaterializedViewManager(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }
    
    @EventListener
    @Async
    public void rebuildOrderSummaryView(ViewRebuildRequestedEvent event) {
        // full rebuild from the event store
        String sql = """
            INSERT INTO order_summary_view 
            SELECT 
                order_id,
                customer_id,
                LAST_VALUE(status) as current_status, -- note: not a standard GROUP BY aggregate; adjust for your database
                SUM(CASE WHEN event_type = 'OrderPlaced' THEN amount ELSE 0 END) as total_amount,
                MAX(event_timestamp) as last_updated
            FROM event_store 
            WHERE event_type IN ('OrderPlaced', 'PaymentProcessed', 'OrderShipped')
            GROUP BY order_id, customer_id
        """;
        
        jdbcTemplate.execute("TRUNCATE TABLE order_summary_view");
        jdbcTemplate.update(sql);
    }
    
    @KafkaListener(topics = {"order.placed.v1", "payment.processed.v1", "order.shipped.v1"})
    public void incrementalUpdate(String eventJson) throws JsonProcessingException {
        // incremental updates for better performance
        ObjectMapper mapper = new ObjectMapper();
        JsonNode event = mapper.readTree(eventJson);
        
        String orderId = event.path("data").path("orderId").asText();
        String eventType = event.path("eventType").asText();
        
        switch (eventType) {
            case "OrderPlaced":
                upsertOrderSummary(orderId, "PLACED", event);
                break;
            case "PaymentProcessed":
                updateOrderStatus(orderId, "PAID");
                break;
            case "OrderShipped":
                updateOrderStatus(orderId, "SHIPPED");
                break;
        }
    }
}

Python-based materialized view with SQLAlchemy:

from sqlalchemy import create_engine, Column, String, DateTime, Numeric, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from typing import List

Base = declarative_base()

class OrderSummaryView(Base):
    __tablename__ = 'order_summary_view'
    
    order_id = Column(String, primary_key=True)
    customer_id = Column(String)
    current_status = Column(String)
    total_amount = Column(Numeric)
    item_count = Column(Integer)
    last_updated = Column(DateTime)

class MaterializedViewBuilder:
    def __init__(self, db_url: str):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        Base.metadata.create_all(self.engine)
    
    async def process_event_batch(self, events: List[dict]):
        """Batch-Verarbeitung für bessere Performance"""
        session = self.Session()
        
        try:
            for event in events:
                await self._update_materialized_view(session, event)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
    
    async def _update_materialized_view(self, session, event_data: dict):
        order_id = event_data['data']['orderId']
        
        # upsert pattern for materialized views
        view = session.query(OrderSummaryView).filter_by(
            order_id=order_id
        ).first()
        
        if not view:
            view = OrderSummaryView(order_id=order_id)
            session.add(view)
        
        # event-specific update
        event_type = event_data['eventType']
        if event_type == 'OrderPlaced':
            view.customer_id = event_data['data']['customerId']
            view.current_status = 'PLACED'
            view.total_amount = event_data['data']['totalAmount']
            view.item_count = len(event_data['data']['items'])
        
        view.last_updated = datetime.now()

54.3 Low-latency Read Models

For use cases with strict latency requirements, database-backed views are not enough. In-memory read models deliver sub-millisecond response times.

Ask yourself: which data do you really need in real time? It is usually critical status information and key business metrics.

High-Performance In-Memory Cache:

@Component
public class OrderStatusCache {
    
    private final ConcurrentHashMap<String, CachedOrderStatus> cache = new ConcurrentHashMap<>();
    private final Counter cacheHits = Metrics.counter("order.cache.hits");
    private final Counter cacheMisses = Metrics.counter("order.cache.misses");
    
    @Value("${order.cache.ttl.minutes:30}")
    private int cacheTtlMinutes;
    
    @KafkaListener(topics = {
        "order.placed.v1", 
        "payment.processed.v1", 
        "order.shipped.v1"
    })
    public void updateCache(String eventJson, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws JsonProcessingException {
        
        ObjectMapper mapper = new ObjectMapper();
        JsonNode event = mapper.readTree(eventJson);
        String orderId = event.path("data").path("orderId").asText();
        
        CachedOrderStatus status = cache.computeIfAbsent(orderId, 
            k -> new CachedOrderStatus(orderId));
        
        switch (topic) {
            case "order.placed.v1":
                status.updateStatus("PLACED", Instant.now());
                status.setCustomerId(event.path("data").path("customerId").asText());
                break;
                
            case "payment.processed.v1":
                status.updateStatus("PAID", Instant.now());
                status.setPaymentConfirmed(true);
                break;
                
            case "order.shipped.v1":
                status.updateStatus("SHIPPED", Instant.now());
                status.setTrackingNumber(event.path("data").path("trackingNumber").asText());
                break;
        }
    }
    
    public Optional<CachedOrderStatus> getOrderStatus(String orderId) {
        CachedOrderStatus status = cache.get(orderId);
        
        if (status != null && !status.isExpired(cacheTtlMinutes)) {
            cacheHits.increment();
            return Optional.of(status);
        } else {
            cacheMisses.increment();
            if (status != null) {
                cache.remove(orderId); // Cleanup expired entries
            }
            return Optional.empty();
        }
    }
}

@RestController
public class FastOrderController {
    
    private final OrderStatusCache cache;
    private final OrderQueryService fallbackService;
    
    public FastOrderController(OrderStatusCache cache, OrderQueryService fallbackService) {
        this.cache = cache;
        this.fallbackService = fallbackService;
    }
    
    @GetMapping("/api/orders/{orderId}/status")
    public ResponseEntity<OrderStatusResponse> getOrderStatus(@PathVariable String orderId) {
        
        // first priority: cache lookup
        Optional<CachedOrderStatus> cachedStatus = cache.getOrderStatus(orderId);
        if (cachedStatus.isPresent()) {
            return ResponseEntity.ok()
                .header("X-Data-Source", "cache")
                .body(new OrderStatusResponse(cachedStatus.get()));
        }
        
        // fallback: materialized view (getOrderSummary on the query service is assumed, not shown above)
        OrderSummaryView view = fallbackService.getOrderSummary(orderId);
        return ResponseEntity.ok()
            .header("X-Data-Source", "database")
            .body(new OrderStatusResponse(view));
    }
}

Cache strategies for event streams (a cache-aside sketch follows the table):

Strategy      | Consistency | Latency  | Memory footprint | Use case
Write-through | Strong      | Medium   | High             | Critical data
Cache-aside   | Eventual    | Low      | Medium           | Status queries
Write-behind  | Eventual    | Very low | Variable         | Analytics
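
To illustrate the cache-aside row: on a read you check the cache first, fall back to the materialized view on a miss, and repopulate the cache so the next request is served from memory. The sketch below assumes the hypothetical OrderSummaryQueryService from section 54.2:

@Service
public class CacheAsideOrderReader {
    
    private final ConcurrentHashMap<String, OrderSummary> cache = new ConcurrentHashMap<>();
    private final OrderSummaryQueryService viewService;
    
    public CacheAsideOrderReader(OrderSummaryQueryService viewService) {
        this.viewService = viewService;
    }
    
    public Optional<OrderSummary> read(String orderId) {
        // 1. try the cache first
        OrderSummary cached = cache.get(orderId);
        if (cached != null) {
            return Optional.of(cached);
        }
        // 2. on a miss, load from the materialized view ...
        Optional<OrderSummary> loaded = viewService.findSummary(orderId);
        // 3. ... and populate the cache so the next read stays in memory
        loaded.ifPresent(summary -> cache.put(orderId, summary));
        return loaded;
    }
}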

Python Redis-based cache:

import redis
import json
import time
from typing import Optional
from dataclasses import dataclass, asdict

@dataclass
class FastOrderStatus:
    order_id: str
    status: str
    last_updated: float
    customer_id: Optional[str] = None
    tracking_number: Optional[str] = None

class RedisOrderCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url, decode_responses=True)
        self.ttl_seconds = 1800  # 30 minutes
    
    async def update_order_status(self, event_data: dict):
        order_id = event_data['data']['orderId']
        event_type = event_data['eventType']
        
        # load the existing status or create a new one
        existing = await self.get_order_status(order_id)
        if existing:
            status = existing
        else:
            status = FastOrderStatus(
                order_id=order_id,
                status='UNKNOWN',
                last_updated=time.time()
            )
        
        # event-specific updates
        if event_type == 'OrderPlaced':
            status.status = 'PLACED'
            status.customer_id = event_data['data']['customerId']
        elif event_type == 'PaymentProcessed':
            status.status = 'PAID'
        elif event_type == 'OrderShipped':
            status.status = 'SHIPPED'
            status.tracking_number = event_data['data'].get('trackingNumber')
        
        status.last_updated = time.time()
        
        # cache update with TTL
        cache_key = f"order:status:{order_id}"
        self.redis_client.setex(
            cache_key, 
            self.ttl_seconds, 
            json.dumps(asdict(status))
        )
    
    async def get_order_status(self, order_id: str) -> Optional[FastOrderStatus]:
        cache_key = f"order:status:{order_id}"
        cached_data = self.redis_client.get(cache_key)
        
        if cached_data:
            data = json.loads(cached_data)
            return FastOrderStatus(**data)
        
        return None

# FastAPI integration
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/api/orders/{order_id}/fast-status")
async def get_fast_order_status(order_id: str):
    cache = RedisOrderCache()
    status = await cache.get_order_status(order_id)
    
    if status:
        return {
            "orderId": status.order_id,
            "status": status.status,
            "lastUpdated": status.last_updated,
            "source": "cache"
        }
    
    # fall back to a slower service
    raise HTTPException(status_code=404, detail="Order not found in cache")

Combining CQRS query models, materialized views, and low-latency caches lets you reap the benefits of event-driven architectures without compromising the performance expectations of modern applications.

What latency requirements do your critical queries have? That decision largely determines your cache architecture.