33 Zustandsaufbau aus Events (Materialisierte Views)

Materialisierte Views sind das Herzstück moderner Event-getriebener Systeme. Sie transformieren kontinuierliche Event-Streams in abfragbare Datenstrukturen und ermöglichen es, komplexe Geschäftsinformationen aus einfachen Events abzuleiten. Dabei entstehen spezialisierte Read-Models, die optimal für spezifische Abfrageanforderungen gestaltet sind.

33.1 Event-to-State Projection

Event-to-State Projection beschreibt den Prozess, bei dem aus einer Sequenz von Events ein aktueller Zustand rekonstruiert wird. Jedes Event repräsentiert eine Zustandsänderung, und die Projektion wendet diese Änderungen sequenziell auf ein Datenmodell an.

Ein praktisches Beispiel ist die Erstellung einer Kundenübersicht aus verschiedenen Customer-Events:

@Component
public class CustomerProfileProjection {
    private final Map<String, CustomerProfile> customerProfiles = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "customer.registered.v1")
    public void handleCustomerRegistered(CustomerRegisteredEvent event) {
        CustomerProfile profile = CustomerProfile.builder()
            .customerId(event.getCustomerId())
            .email(event.getEmail())
            .firstName(event.getFirstName())
            .lastName(event.getLastName())
            .registrationDate(event.getTimestamp())
            .totalOrders(0)
            .totalSpent(BigDecimal.ZERO)
            .loyaltyPoints(0)
            .build();
            
        customerProfiles.put(event.getCustomerId(), profile);
    }
    
    @KafkaListener(topics = "order.completed.v1")
    public void handleOrderCompleted(OrderCompletedEvent event) {
        CustomerProfile profile = customerProfiles.get(event.getCustomerId());
        if (profile != null) {
            profile.setTotalOrders(profile.getTotalOrders() + 1);
            profile.setTotalSpent(profile.getTotalSpent().add(event.getTotalAmount()));
            profile.setLastOrderDate(event.getTimestamp());
            
            // Loyalitätspunkte basierend auf Bestellwert
            int newPoints = event.getTotalAmount().intValue() / 10;
            profile.setLoyaltyPoints(profile.getLoyaltyPoints() + newPoints);
        }
    }
    
    @KafkaListener(topics = "customer.address.updated.v1")
    public void handleAddressUpdated(CustomerAddressUpdatedEvent event) {
        CustomerProfile profile = customerProfiles.get(event.getCustomerId());
        if (profile != null) {
            profile.setShippingAddress(event.getNewAddress());
            profile.setLastUpdated(event.getTimestamp());
        }
    }
    
    public CustomerProfile getCustomerProfile(String customerId) {
        return customerProfiles.get(customerId);
    }
}

Die Projektion funktioniert als Zustandsmaschine, die auf Events reagiert und dabei einen materialized view aufbaut. Entscheidend ist die Reihenfolge der Events - sie bestimmt den finalen Zustand.

from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class CustomerProfile:
    customer_id: str
    email: str
    first_name: str
    last_name: str
    registration_date: datetime
    total_orders: int = 0
    total_spent: Decimal = field(default_factory=lambda: Decimal('0'))
    loyalty_points: int = 0
    shipping_address: Optional[str] = None
    last_order_date: Optional[datetime] = None
    last_updated: Optional[datetime] = None

class CustomerProfileProjection:
    def __init__(self):
        self.customer_profiles: Dict[str, CustomerProfile] = {}
    
    async def handle_customer_registered(self, event):
        """Initiale Projektion aus CustomerRegistered Event"""
        profile = CustomerProfile(
            customer_id=event['customerId'],
            email=event['email'],
            first_name=event['firstName'],
            last_name=event['lastName'],
            registration_date=datetime.fromisoformat(event['timestamp'])
        )
        
        self.customer_profiles[event['customerId']] = profile
    
    async def handle_order_completed(self, event):
        """Projektion wird durch OrderCompleted Event aktualisiert"""
        customer_id = event['customerId']
        if customer_id in self.customer_profiles:
            profile = self.customer_profiles[customer_id]
            profile.total_orders += 1
            profile.total_spent += Decimal(str(event['totalAmount']))
            profile.last_order_date = datetime.fromisoformat(event['timestamp'])
            
            # Loyalitätspunkte Berechnung
            new_points = int(event['totalAmount']) // 10
            profile.loyalty_points += new_points
    
    async def handle_address_updated(self, event):
        """Adress-Updates aktualisieren bestehende Projektion"""
        customer_id = event['customerId']
        if customer_id in self.customer_profiles:
            profile = self.customer_profiles[customer_id]
            profile.shipping_address = event['newAddress']
            profile.last_updated = datetime.fromisoformat(event['timestamp'])
    
    def get_customer_profile(self, customer_id: str) -> Optional[CustomerProfile]:
        return self.customer_profiles.get(customer_id)

33.2 View Materialization Strategies

Je nach Anwendungsfall eignen sich verschiedene Strategien zur Materialisierung von Views. Die Wahl beeinflusst Performance, Speicherverbrauch und Konsistenzgarantien erheblich.

Strategie Beschreibung Vorteile Nachteile Anwendungsfall
Eager Materialization Views werden sofort bei Event-Eingang aktualisiert Niedrige Read-Latenz Höhere Write-Latenz Häufig abgefragte Daten
Lazy Materialization Views werden erst bei Abfrage berechnet Geringer Speicherverbrauch Höhere Read-Latenz Selten abgefragte Daten
Snapshot Materialization Periodische Vollberechnung Konsistente Snapshots Veraltete Zwischenstände Reporting und Analytics
Incremental Materialization Nur Änderungen werden verarbeitet Effiziente Updates Komplexere Logik Real-time Dashboards

Eager Materialization eignet sich für häufig abgefragte Views wie Produktkataloge oder Kundenprofile:

@Component
public class ProductCatalogView {
    private final Map<String, ProductInfo> catalog = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "product.created.v1")
    public void handleProductCreated(ProductCreatedEvent event) {
        // Sofortige Materialisierung
        ProductInfo product = ProductInfo.builder()
            .productId(event.getProductId())
            .name(event.getName())
            .description(event.getDescription())
            .category(event.getCategory())
            .basePrice(event.getPrice())
            .available(true)
            .build();
            
        catalog.put(event.getProductId(), product);
    }
    
    @KafkaListener(topics = "inventory.updated.v1")
    public void handleInventoryUpdated(InventoryUpdatedEvent event) {
        // Sofortige Aktualisierung der Verfügbarkeit
        ProductInfo product = catalog.get(event.getProductId());
        if (product != null) {
            product.setStockLevel(event.getNewStockLevel());
            product.setAvailable(event.getNewStockLevel() > 0);
        }
    }
    
    // Read-Zugriff ohne Latenz
    public ProductInfo getProduct(String productId) {
        return catalog.get(productId);
    }
    
    public List<ProductInfo> getProductsByCategory(String category) {
        return catalog.values().stream()
            .filter(product -> category.equals(product.getCategory()))
            .collect(Collectors.toList());
    }
}

Lazy Materialization berechnet Views on-demand und eignet sich für komplexe Aggregationen:

from functools import lru_cache
from typing import List, Dict
import asyncio

class OrderAnalyticsView:
    def __init__(self):
        self.order_events: List[Dict] = []
        self.payment_events: List[Dict] = []
        
    async def handle_order_event(self, event):
        """Events werden gesammelt, aber nicht sofort materialisiert"""
        self.order_events.append(event)
        # Cache invalidation bei neuen Events
        self.get_monthly_revenue.cache_clear()
        self.get_top_customers.cache_clear()
    
    async def handle_payment_event(self, event):
        self.payment_events.append(event)
        self.get_monthly_revenue.cache_clear()
    
    @lru_cache(maxsize=100)
    def get_monthly_revenue(self, year: int, month: int) -> Dict:
        """Lazy materialization - berechnet bei Bedarf"""
        monthly_orders = [
            event for event in self.order_events
            if self._is_in_month(event['timestamp'], year, month)
        ]
        
        total_revenue = sum(
            order['totalAmount'] for order in monthly_orders
        )
        
        return {
            'year': year,
            'month': month,
            'total_revenue': total_revenue,
            'order_count': len(monthly_orders),
            'average_order_value': total_revenue / len(monthly_orders) if monthly_orders else 0
        }
    
    @lru_cache(maxsize=50)
    def get_top_customers(self, limit: int = 10) -> List[Dict]:
        """On-demand Berechnung der Top-Kunden"""
        customer_totals = {}
        
        for order in self.order_events:
            customer_id = order['customerId']
            if customer_id not in customer_totals:
                customer_totals[customer_id] = 0
            customer_totals[customer_id] += order['totalAmount']
        
        return sorted(
            [{'customer_id': k, 'total_spent': v} for k, v in customer_totals.items()],
            key=lambda x: x['total_spent'],
            reverse=True
        )[:limit]

Snapshot Materialization erstellt regelmäßige, konsistente Zustandsabbilder:

@Component
public class DailyReportGenerator {
    
    @Scheduled(cron = "0 0 2 * * *") // Täglich um 2 Uhr
    public void generateDailySnapshot() {
        LocalDate reportDate = LocalDate.now().minusDays(1);
        
        DailyReport report = DailyReport.builder()
            .reportDate(reportDate)
            .totalOrders(calculateDailyOrders(reportDate))
            .totalRevenue(calculateDailyRevenue(reportDate))
            .newCustomers(calculateNewCustomers(reportDate))
            .returnCustomers(calculateReturnCustomers(reportDate))
            .topProducts(calculateTopProducts(reportDate))
            .build();
            
        reportRepository.save(report);
        
        // Snapshot ist konsistent zum Zeitpunkt der Erstellung
        publishEvent(new DailyReportGeneratedEvent(reportDate, report));
    }
}

33.3 Consistency Models

Materialisierte Views in Event-getriebenen Systemen folgen verschiedenen Konsistenzmodellen. Die Wahl beeinflusst die Garantien, die das System gegenüber Lesezugriffen geben kann.

Eventual Consistency ist das häufigste Modell in verteilten Event-Systemen. Views werden asynchron aktualisiert und erreichen nach einer unbestimmten Zeit einen konsistenten Zustand:

@Component
public class EventuallyConsistentInventoryView {
    private final Map<String, InventoryStatus> inventory = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // Reservation wird asynchron verarbeitet
        event.getItems().forEach(item -> {
            InventoryStatus status = inventory.get(item.getProductId());
            if (status != null) {
                // Temporäre Inkonsistenz möglich
                status.setReservedQuantity(
                    status.getReservedQuantity() + item.getQuantity()
                );
            }
        });
    }
    
    @KafkaListener(topics = "inventory.reserved.v1")
    public void handleInventoryReserved(InventoryReservedEvent event) {
        // Eventual consistency - finaler konsistenter Zustand
        InventoryStatus status = inventory.get(event.getProductId());
        if (status != null) {
            status.setAvailableQuantity(
                status.getAvailableQuantity() - event.getReservedQuantity()
            );
            status.setLastUpdated(event.getTimestamp());
        }
    }
}

Read-your-writes Consistency stellt sicher, dass ein Client seine eigenen Schreibvorgänge sofort lesen kann:

import asyncio
from collections import defaultdict

class ReadYourWritesOrderView:
    def __init__(self):
        self.order_states = {}
        self.client_versions = defaultdict(int)  # Track client versions
    
    async def handle_order_placed(self, event, client_id=None):
        """Schreibvorgang wird für Client markiert"""
        order_id = event['orderId']
        self.order_states[order_id] = {
            'status': 'PLACED',
            'items': event['items'],
            'timestamp': event['timestamp']
        }
        
        if client_id:
            # Version für Client erhöhen
            self.client_versions[client_id] += 1
            self.order_states[order_id]['client_version'] = self.client_versions[client_id]
    
    async def get_order_status(self, order_id: str, client_id: str = None):
        """Read garantiert eigene Writes zu sehen"""
        order = self.order_states.get(order_id)
        
        if client_id and order:
            # Client sieht mindestens seine eigene Version
            client_version = self.client_versions.get(client_id, 0)
            order_version = order.get('client_version', 0)
            
            if order_version >= client_version:
                return order
            else:
                # Warten auf Konsistenz
                await self._wait_for_consistency(order_id, client_version)
                return self.order_states.get(order_id)
        
        return order

Session Consistency erweitert Read-your-writes auf Session-Ebene:

Konsistenzlevel Garantie Performance Komplexität Anwendungsfall
Eventual Alle Clients sehen am Ende denselben Zustand Hoch Niedrig Analytics, Reporting
Read-your-writes Client sieht eigene Änderungen sofort Mittel Mittel User Interfaces
Session Konsistenz innerhalb einer Session Mittel Mittel Shopping Carts
Strong Alle Clients sehen sofort denselben Zustand Niedrig Hoch Financial Transactions

Die Materialierung von Views aus Events ermöglicht es, komplexe Read-Models zu erstellen, die optimal für spezifische Abfrageanforderungen gestaltet sind. Dabei ist die Wahl der richtigen Materialisierungsstrategie und des Konsistenzmodells entscheidend für die Performance und Korrektheit des Systems. In der Praxis werden oft mehrere Views mit unterschiedlichen Strategien kombiniert, um verschiedene Anwendungsfälle optimal zu unterstützen.