Materialisierte Views sind das Herzstück moderner Event-getriebener Systeme. Sie transformieren kontinuierliche Event-Streams in abfragbare Datenstrukturen und ermöglichen es, komplexe Geschäftsinformationen aus einfachen Events abzuleiten. Dabei entstehen spezialisierte Read-Models, die optimal für spezifische Abfrageanforderungen gestaltet sind.
Event-to-State Projection beschreibt den Prozess, bei dem aus einer Sequenz von Events ein aktueller Zustand rekonstruiert wird. Jedes Event repräsentiert eine Zustandsänderung, und die Projektion wendet diese Änderungen sequenziell auf ein Datenmodell an.
Ein praktisches Beispiel ist die Erstellung einer Kundenübersicht aus verschiedenen Customer-Events:
@Component
public class CustomerProfileProjection {
private final Map<String, CustomerProfile> customerProfiles = new ConcurrentHashMap<>();
@KafkaListener(topics = "customer.registered.v1")
public void handleCustomerRegistered(CustomerRegisteredEvent event) {
CustomerProfile profile = CustomerProfile.builder()
.customerId(event.getCustomerId())
.email(event.getEmail())
.firstName(event.getFirstName())
.lastName(event.getLastName())
.registrationDate(event.getTimestamp())
.totalOrders(0)
.totalSpent(BigDecimal.ZERO)
.loyaltyPoints(0)
.build();
customerProfiles.put(event.getCustomerId(), profile);
}
@KafkaListener(topics = "order.completed.v1")
public void handleOrderCompleted(OrderCompletedEvent event) {
CustomerProfile profile = customerProfiles.get(event.getCustomerId());
if (profile != null) {
profile.setTotalOrders(profile.getTotalOrders() + 1);
profile.setTotalSpent(profile.getTotalSpent().add(event.getTotalAmount()));
profile.setLastOrderDate(event.getTimestamp());
// Loyalitätspunkte basierend auf Bestellwert
int newPoints = event.getTotalAmount().intValue() / 10;
profile.setLoyaltyPoints(profile.getLoyaltyPoints() + newPoints);
}
}
@KafkaListener(topics = "customer.address.updated.v1")
public void handleAddressUpdated(CustomerAddressUpdatedEvent event) {
CustomerProfile profile = customerProfiles.get(event.getCustomerId());
if (profile != null) {
profile.setShippingAddress(event.getNewAddress());
profile.setLastUpdated(event.getTimestamp());
}
}
public CustomerProfile getCustomerProfile(String customerId) {
return customerProfiles.get(customerId);
}
}Die Projektion funktioniert als Zustandsmaschine, die auf Events reagiert und dabei einen materialized view aufbaut. Entscheidend ist die Reihenfolge der Events - sie bestimmt den finalen Zustand.
from datetime import datetime
from decimal import Decimal
from dataclasses import dataclass, field
from typing import Dict, Optional
@dataclass
class CustomerProfile:
customer_id: str
email: str
first_name: str
last_name: str
registration_date: datetime
total_orders: int = 0
total_spent: Decimal = field(default_factory=lambda: Decimal('0'))
loyalty_points: int = 0
shipping_address: Optional[str] = None
last_order_date: Optional[datetime] = None
last_updated: Optional[datetime] = None
class CustomerProfileProjection:
def __init__(self):
self.customer_profiles: Dict[str, CustomerProfile] = {}
async def handle_customer_registered(self, event):
"""Initiale Projektion aus CustomerRegistered Event"""
profile = CustomerProfile(
customer_id=event['customerId'],
email=event['email'],
first_name=event['firstName'],
last_name=event['lastName'],
registration_date=datetime.fromisoformat(event['timestamp'])
)
self.customer_profiles[event['customerId']] = profile
async def handle_order_completed(self, event):
"""Projektion wird durch OrderCompleted Event aktualisiert"""
customer_id = event['customerId']
if customer_id in self.customer_profiles:
profile = self.customer_profiles[customer_id]
profile.total_orders += 1
profile.total_spent += Decimal(str(event['totalAmount']))
profile.last_order_date = datetime.fromisoformat(event['timestamp'])
# Loyalitätspunkte Berechnung
new_points = int(event['totalAmount']) // 10
profile.loyalty_points += new_points
async def handle_address_updated(self, event):
"""Adress-Updates aktualisieren bestehende Projektion"""
customer_id = event['customerId']
if customer_id in self.customer_profiles:
profile = self.customer_profiles[customer_id]
profile.shipping_address = event['newAddress']
profile.last_updated = datetime.fromisoformat(event['timestamp'])
def get_customer_profile(self, customer_id: str) -> Optional[CustomerProfile]:
return self.customer_profiles.get(customer_id)Je nach Anwendungsfall eignen sich verschiedene Strategien zur Materialisierung von Views. Die Wahl beeinflusst Performance, Speicherverbrauch und Konsistenzgarantien erheblich.
| Strategie | Beschreibung | Vorteile | Nachteile | Anwendungsfall |
|---|---|---|---|---|
| Eager Materialization | Views werden sofort bei Event-Eingang aktualisiert | Niedrige Read-Latenz | Höhere Write-Latenz | Häufig abgefragte Daten |
| Lazy Materialization | Views werden erst bei Abfrage berechnet | Geringer Speicherverbrauch | Höhere Read-Latenz | Selten abgefragte Daten |
| Snapshot Materialization | Periodische Vollberechnung | Konsistente Snapshots | Veraltete Zwischenstände | Reporting und Analytics |
| Incremental Materialization | Nur Änderungen werden verarbeitet | Effiziente Updates | Komplexere Logik | Real-time Dashboards |
Eager Materialization eignet sich für häufig abgefragte Views wie Produktkataloge oder Kundenprofile:
@Component
public class ProductCatalogView {
private final Map<String, ProductInfo> catalog = new ConcurrentHashMap<>();
@KafkaListener(topics = "product.created.v1")
public void handleProductCreated(ProductCreatedEvent event) {
// Sofortige Materialisierung
ProductInfo product = ProductInfo.builder()
.productId(event.getProductId())
.name(event.getName())
.description(event.getDescription())
.category(event.getCategory())
.basePrice(event.getPrice())
.available(true)
.build();
catalog.put(event.getProductId(), product);
}
@KafkaListener(topics = "inventory.updated.v1")
public void handleInventoryUpdated(InventoryUpdatedEvent event) {
// Sofortige Aktualisierung der Verfügbarkeit
ProductInfo product = catalog.get(event.getProductId());
if (product != null) {
product.setStockLevel(event.getNewStockLevel());
product.setAvailable(event.getNewStockLevel() > 0);
}
}
// Read-Zugriff ohne Latenz
public ProductInfo getProduct(String productId) {
return catalog.get(productId);
}
public List<ProductInfo> getProductsByCategory(String category) {
return catalog.values().stream()
.filter(product -> category.equals(product.getCategory()))
.collect(Collectors.toList());
}
}Lazy Materialization berechnet Views on-demand und eignet sich für komplexe Aggregationen:
from functools import lru_cache
from typing import List, Dict
import asyncio
class OrderAnalyticsView:
def __init__(self):
self.order_events: List[Dict] = []
self.payment_events: List[Dict] = []
async def handle_order_event(self, event):
"""Events werden gesammelt, aber nicht sofort materialisiert"""
self.order_events.append(event)
# Cache invalidation bei neuen Events
self.get_monthly_revenue.cache_clear()
self.get_top_customers.cache_clear()
async def handle_payment_event(self, event):
self.payment_events.append(event)
self.get_monthly_revenue.cache_clear()
@lru_cache(maxsize=100)
def get_monthly_revenue(self, year: int, month: int) -> Dict:
"""Lazy materialization - berechnet bei Bedarf"""
monthly_orders = [
event for event in self.order_events
if self._is_in_month(event['timestamp'], year, month)
]
total_revenue = sum(
order['totalAmount'] for order in monthly_orders
)
return {
'year': year,
'month': month,
'total_revenue': total_revenue,
'order_count': len(monthly_orders),
'average_order_value': total_revenue / len(monthly_orders) if monthly_orders else 0
}
@lru_cache(maxsize=50)
def get_top_customers(self, limit: int = 10) -> List[Dict]:
"""On-demand Berechnung der Top-Kunden"""
customer_totals = {}
for order in self.order_events:
customer_id = order['customerId']
if customer_id not in customer_totals:
customer_totals[customer_id] = 0
customer_totals[customer_id] += order['totalAmount']
return sorted(
[{'customer_id': k, 'total_spent': v} for k, v in customer_totals.items()],
key=lambda x: x['total_spent'],
reverse=True
)[:limit]Snapshot Materialization erstellt regelmäßige, konsistente Zustandsabbilder:
@Component
public class DailyReportGenerator {
@Scheduled(cron = "0 0 2 * * *") // Täglich um 2 Uhr
public void generateDailySnapshot() {
LocalDate reportDate = LocalDate.now().minusDays(1);
DailyReport report = DailyReport.builder()
.reportDate(reportDate)
.totalOrders(calculateDailyOrders(reportDate))
.totalRevenue(calculateDailyRevenue(reportDate))
.newCustomers(calculateNewCustomers(reportDate))
.returnCustomers(calculateReturnCustomers(reportDate))
.topProducts(calculateTopProducts(reportDate))
.build();
reportRepository.save(report);
// Snapshot ist konsistent zum Zeitpunkt der Erstellung
publishEvent(new DailyReportGeneratedEvent(reportDate, report));
}
}Materialisierte Views in Event-getriebenen Systemen folgen verschiedenen Konsistenzmodellen. Die Wahl beeinflusst die Garantien, die das System gegenüber Lesezugriffen geben kann.
Eventual Consistency ist das häufigste Modell in verteilten Event-Systemen. Views werden asynchron aktualisiert und erreichen nach einer unbestimmten Zeit einen konsistenten Zustand:
@Component
public class EventuallyConsistentInventoryView {
private final Map<String, InventoryStatus> inventory = new ConcurrentHashMap<>();
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event) {
// Reservation wird asynchron verarbeitet
event.getItems().forEach(item -> {
InventoryStatus status = inventory.get(item.getProductId());
if (status != null) {
// Temporäre Inkonsistenz möglich
status.setReservedQuantity(
status.getReservedQuantity() + item.getQuantity()
);
}
});
}
@KafkaListener(topics = "inventory.reserved.v1")
public void handleInventoryReserved(InventoryReservedEvent event) {
// Eventual consistency - finaler konsistenter Zustand
InventoryStatus status = inventory.get(event.getProductId());
if (status != null) {
status.setAvailableQuantity(
status.getAvailableQuantity() - event.getReservedQuantity()
);
status.setLastUpdated(event.getTimestamp());
}
}
}Read-your-writes Consistency stellt sicher, dass ein Client seine eigenen Schreibvorgänge sofort lesen kann:
import asyncio
from collections import defaultdict
class ReadYourWritesOrderView:
def __init__(self):
self.order_states = {}
self.client_versions = defaultdict(int) # Track client versions
async def handle_order_placed(self, event, client_id=None):
"""Schreibvorgang wird für Client markiert"""
order_id = event['orderId']
self.order_states[order_id] = {
'status': 'PLACED',
'items': event['items'],
'timestamp': event['timestamp']
}
if client_id:
# Version für Client erhöhen
self.client_versions[client_id] += 1
self.order_states[order_id]['client_version'] = self.client_versions[client_id]
async def get_order_status(self, order_id: str, client_id: str = None):
"""Read garantiert eigene Writes zu sehen"""
order = self.order_states.get(order_id)
if client_id and order:
# Client sieht mindestens seine eigene Version
client_version = self.client_versions.get(client_id, 0)
order_version = order.get('client_version', 0)
if order_version >= client_version:
return order
else:
# Warten auf Konsistenz
await self._wait_for_consistency(order_id, client_version)
return self.order_states.get(order_id)
return orderSession Consistency erweitert Read-your-writes auf Session-Ebene:
| Konsistenzlevel | Garantie | Performance | Komplexität | Anwendungsfall |
|---|---|---|---|---|
| Eventual | Alle Clients sehen am Ende denselben Zustand | Hoch | Niedrig | Analytics, Reporting |
| Read-your-writes | Client sieht eigene Änderungen sofort | Mittel | Mittel | User Interfaces |
| Session | Konsistenz innerhalb einer Session | Mittel | Mittel | Shopping Carts |
| Strong | Alle Clients sehen sofort denselben Zustand | Niedrig | Hoch | Financial Transactions |
Die Materialierung von Views aus Events ermöglicht es, komplexe Read-Models zu erstellen, die optimal für spezifische Abfrageanforderungen gestaltet sind. Dabei ist die Wahl der richtigen Materialisierungsstrategie und des Konsistenzmodells entscheidend für die Performance und Korrektheit des Systems. In der Praxis werden oft mehrere Views mit unterschiedlichen Strategien kombiniert, um verschiedene Anwendungsfälle optimal zu unterstützen.