47 CQRS als konsequente Entkopplung von Lesen und Schreiben

47.1 Command and Query Separation

Betrachten Sie Ihr letztes System-Design: Haben Sie schon einmal erlebt, dass eine komplexe Abfrage die Performance Ihrer Schreiboperationen beeinflusst hat? Oder dass Optimierungen für Reports die Struktur Ihrer Geschäftslogik verkompliziert haben?

CQRS (Command Query Responsibility Segregation) stellt eine einfache, aber mächtige Frage: Warum sollten Lesen und Schreiben dieselbe Datenstruktur verwenden?

47.1.1 Das Problem mit vereinheitlichten Modellen

Als Beispiel dient eine traditionelle Order-Entity:

// Traditionell: Ein Modell für alles
@Entity
public class Order {
    private UUID id;
    private UUID customerId;
    private String status;
    private BigDecimal totalAmount;
    private LocalDateTime createdAt;
    private List<OrderItem> items;
    
    // Geschäftslogik für Commands
    public void markAsPaid() { /* ... */ }
    public void addItem(OrderItem item) { /* ... */ }
    public void cancel() { /* ... */ }
    
    // Optimierungen für Queries
    @Formula("(SELECT COUNT(*) FROM order_items oi WHERE oi.order_id = id)")
    private int itemCount;
    
    @Formula("(SELECT c.name FROM customers c WHERE c.id = customer_id)")
    private String customerName;
}

Welche Probleme sehen Sie in diesem Ansatz? Das Modell versucht gleichzeitig mehreren Herren zu dienen:

47.1.2 CQRS: Getrennte Verantwortlichkeiten

Was wäre, wenn wir diese Verantwortlichkeiten trennen würden? CQRS schlägt vor: Unterschiedliche Modelle für unterschiedliche Zwecke.

// Command Model: Optimiert für Geschäftslogik
public class OrderAggregate {
    private UUID orderId;
    private UUID customerId;
    private OrderStatus status;
    private List<OrderItem> items;
    
    // Fokus: Geschäftsregeln und Invarianten
    public List<OrderEvent> placeOrder(List<OrderItem> items) {
        validateItems(items);
        ensureCustomerExists();
        
        this.status = OrderStatus.PLACED;
        this.items = items;
        
        return List.of(new OrderPlacedEvent(orderId, customerId, items));
    }
    
    public List<OrderEvent> markAsPaid(BigDecimal amount) {
        if (status != OrderStatus.PLACED) {
            throw new InvalidOrderStateException("Order must be placed to receive payment");
        }
        
        this.status = OrderStatus.PAID;
        return List.of(new PaymentReceivedEvent(orderId, amount));
    }
}
// Query Model: Optimiert für Abfragen
public class OrderView {
    private UUID orderId;
    private String customerName;        // Denormalisiert für Performance
    private String customerEmail;       // Direkt verfügbar
    private String status;
    private BigDecimal totalAmount;
    private LocalDateTime placedAt;
    private int itemCount;              // Vorberechnet
    private List<OrderItemView> items;  // Flache Struktur
    
    // Keine Geschäftslogik - nur Daten
    // Optimiert für verschiedene Abfragen
}

Überlegen Sie: Wie unterscheiden sich die Anforderungen an diese beiden Modelle? Welche Optimierungen können Sie in jedem Modell vornehmen, die im anderen Modell problematisch wären?

47.1.3 Commands vs. Queries in der Praxis

// Spring Boot: Command Side
@RestController
public class OrderCommandController {
    
    private final OrderCommandService commandService;
    
    @PostMapping("/orders")
    public ResponseEntity<CommandResult> placeOrder(@RequestBody PlaceOrderCommand command) {
        // Command wird verarbeitet, Events erzeugt
        CommandResult result = commandService.handle(command);
        
        // Nur Bestätigung zurückgeben, keine Query-Daten
        return ResponseEntity.ok(result);
    }
    
    @PutMapping("/orders/{orderId}/payment")
    public ResponseEntity<CommandResult> recordPayment(
            @PathVariable UUID orderId, 
            @RequestBody RecordPaymentCommand command) {
        
        CommandResult result = commandService.handle(command);
        return ResponseEntity.ok(result);
    }
}
// Spring Boot: Query Side
@RestController
public class OrderQueryController {
    
    private final OrderQueryService queryService;
    
    @GetMapping("/orders/{orderId}")
    public ResponseEntity<OrderView> getOrder(@PathVariable UUID orderId) {
        // Optimiert für Lesezugriff
        OrderView order = queryService.getOrderById(orderId);
        return ResponseEntity.ok(order);
    }
    
    @GetMapping("/customers/{customerId}/orders")
    public ResponseEntity<List<OrderSummaryView>> getCustomerOrders(
            @PathVariable UUID customerId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        
        // Spezialisierte Abfrage mit Pagination
        List<OrderSummaryView> orders = queryService.getOrdersForCustomer(
            customerId, page, size
        );
        return ResponseEntity.ok(orders);
    }
}

Frage zur Reflexion: Wie würde sich diese Trennung auf Ihre API-Dokumentation auswirken? Welche Vorteile bringt es für API-Konsumenten?

47.2 Independent Scaling

Ein E-Commerce-System hat z.B. ein interessantes Problem: 90% der Requests sind Lesezugriffe (Produktkatalog, Bestellhistorie, Dashboards), aber nur 10% sind Schreibvorgänge (neue Bestellungen, Updates). Wie würden Sie Ihre Infrastruktur optimieren?

47.2.1 Unterschiedliche Last-Charakteristiken

Aspekt Command Side Query Side
Häufigkeit Selten, aber kritisch Häufig, performance-sensitiv
Latenz-Anforderungen Konsistenz wichtiger Sub-Sekunden Response
Ressourcen-Bedarf CPU für Geschäftslogik Memory/Disk für Caching
Skalierungs-Pattern Vertikal (stärkere Hardware) Horizontal (mehr Instanzen)
# Python: Unterschiedliche Deployment-Strategien
# Command Service - Wenige, starke Instanzen
class OrderCommandService:
    def __init__(self):
        self.event_store = EventStore()
        self.aggregate_repository = AggregateRepository()
    
    async def handle_place_order(self, command):
        # Ressourcen-intensive Geschäftslogik
        aggregate = await self.aggregate_repository.get_or_create(command.order_id)
        events = aggregate.place_order(command.items)
        
        # Transactional write
        await self.event_store.save_events(command.order_id, events)
        await self.publish_events(events)

# Query Service - Viele, leichtgewichtige Instanzen  
class OrderQueryService:
    def __init__(self):
        self.read_model_cache = RedisCache()
        self.read_model_store = ReadModelStore()
    
    async def get_order(self, order_id):
        # Cache-optimiert für hohe Durchsätze
        cached_order = await self.read_model_cache.get(f"order:{order_id}")
        if cached_order:
            return cached_order
        
        order = await self.read_model_store.get_order(order_id)
        await self.read_model_cache.set(f"order:{order_id}", order, ttl=300)
        return order

47.2.2 Unabhängige Skalierungs-Strategien

Welche Skalierungs-Herausforderungen haben Sie in Ihren bisherigen Systemen erlebt? CQRS ermöglicht es, jede Seite nach ihren spezifischen Anforderungen zu skalieren:

# Docker Compose: Unterschiedliche Skalierung
version: '3.8'
services:
  order-command-service:
    image: order-command:latest
    deploy:
      replicas: 2  # Wenige Instanzen
      resources:
        limits:
          cpus: '2.0'     # Mehr CPU für Geschäftslogik
          memory: 1G
    environment:
      - PROFILE=command
      - DB_POOL_SIZE=10
  
  order-query-service:
    image: order-query:latest
    deploy:
      replicas: 8  # Viele Instanzen für Load
      resources:
        limits:
          cpus: '0.5'     # Weniger CPU pro Instanz
          memory: 512M
    environment:
      - PROFILE=query
      - CACHE_SIZE=256M
      - READ_REPLICA_COUNT=3

Überlegen Sie: Wie würden sich Ihre Monitoring-Metriken zwischen Command- und Query-Seite unterscheiden?

47.2.3 Geo-Distribution und Read Replicas

// Spring Boot: Geo-verteilte Query-Services
@Configuration
public class QueryServiceConfiguration {
    
    @Bean
    @Primary
    public DataSource primaryReadOnlyDataSource() {
        // Lokale Read-Replica für niedrige Latenz
        return DataSourceBuilder.create()
            .url("jdbc:postgresql://local-replica:5432/orders_readonly")
            .build();
    }
    
    @Bean("globalQueryDataSource")
    public DataSource globalQueryDataSource() {
        // Globale Verteilung für internationale Queries
        return DataSourceBuilder.create()
            .url("jdbc:postgresql://global-replica:5432/orders_readonly")
            .build();
    }
    
    @Bean
    public OrderQueryService geoOptimizedQueryService() {
        return new OrderQueryService(
            primaryReadOnlyDataSource(),
            redisCluster(),
            geoLocationService()
        );
    }
}

Denken Sie über Ihre Benutzer nach: Welche geografischen Verteilungen würden für Ihre Anwendung Sinn machen?

47.3 Different Data Models

Hier eine provokante Frage: Warum sollte eine Tabelle, die für schnelle Writes optimiert ist, dieselbe Struktur haben wie eine Tabelle, die für komplexe Analytical Queries optimiert ist?

47.3.1 Write-optimierte vs. Read-optimierte Strukturen

-- Command Side: Normalisiert für Datenintegrität
CREATE TABLE orders (
    id UUID PRIMARY KEY,
    customer_id UUID NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    FOREIGN KEY (customer_id) REFERENCES customers(id)
);

CREATE TABLE order_items (
    id UUID PRIMARY KEY,
    order_id UUID NOT NULL,
    product_id UUID NOT NULL,
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);
-- Query Side: Denormalisiert für Performance
CREATE TABLE order_views (
    order_id UUID PRIMARY KEY,
    customer_id UUID,
    customer_name VARCHAR(255),        -- Denormalisiert
    customer_email VARCHAR(255),       -- Denormalisiert
    status VARCHAR(20),
    total_amount DECIMAL(12,2),        -- Vorberechnet
    item_count INTEGER,                -- Vorberechnet
    created_at TIMESTAMP,
    last_updated TIMESTAMP,
    
    -- Für häufige Abfragen optimierte Indizes
    INDEX idx_customer_created (customer_id, created_at),
    INDEX idx_status_amount (status, total_amount),
    INDEX idx_created_status (created_at, status)
);

-- Separate Tabelle für Item-Details wenn benötigt
CREATE TABLE order_item_views (
    order_id UUID,
    product_id UUID,
    product_name VARCHAR(255),         -- Denormalisiert
    product_category VARCHAR(100),     -- Denormalisiert
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    total_price DECIMAL(10,2),         -- Vorberechnet
    
    INDEX idx_order_product (order_id, product_id),
    INDEX idx_category_price (product_category, total_price)
);

Welche Trade-offs sehen Sie in diesem Ansatz? Die Query-Seite opfert Speicherplatz und Normalisierung für Performance.

47.3.2 Spezialisierte Read Models für verschiedene Use Cases

// Spring Boot: Verschiedene Read Models
@Entity
@Table(name = "customer_order_summary")
public class CustomerOrderSummaryView {
    // Optimiert für Kundenportal
    private UUID customerId;
    private String customerName;
    private Integer totalOrders;
    private BigDecimal totalSpent;
    private LocalDateTime lastOrderDate;
    private String favoriteCategory;
}

@Entity  
@Table(name = "sales_analytics")
public class SalesAnalyticsView {
    // Optimiert für Business Intelligence
    private LocalDate date;
    private String productCategory;
    private String region;
    private Integer orderCount;
    private BigDecimal revenue;
    private BigDecimal averageOrderValue;
}

@Entity
@Table(name = "inventory_impact")  
public class InventoryImpactView {
    // Optimiert für Lagerverwaltung
    private UUID productId;
    private String productName;
    private Integer reservedQuantity;
    private Integer shippedQuantity;
    private LocalDateTime lastReservation;
    private LocalDateTime nextExpectedShipment;
}

Überlegen Sie: Welche anderen spezialisierten Views könnten für Ihr Geschäftsmodell relevant sein?

47.3.3 Technologie-Mix für optimale Performance

# Python: Verschiedene Datenbanken für verschiedene Zwecke
class MultiStorageQueryService:
    def __init__(self):
        self.postgres_client = PostgresClient()      # Strukturierte Queries
        self.redis_client = RedisClient()            # Caching
        self.elasticsearch_client = ESClient()       # Volltextsuche
        self.clickhouse_client = ClickHouseClient()  # Analytics
    
    async def get_order_details(self, order_id):
        # PostgreSQL für strukturierte Daten
        return await self.postgres_client.get_order(order_id)
    
    async def search_orders(self, search_term):
        # Elasticsearch für Volltextsuche
        return await self.elasticsearch_client.search_orders(search_term)
    
    async def get_sales_trends(self, time_range):
        # ClickHouse für analytische Queries
        return await self.clickhouse_client.get_sales_trends(time_range)
    
    async def get_frequent_customers(self):
        # Redis für häufig abgefragte, einfache Daten
        cached_result = await self.redis_client.get("frequent_customers")
        if cached_result:
            return cached_result
        
        # Fallback zu PostgreSQL
        result = await self.postgres_client.get_frequent_customers()
        await self.redis_client.setex("frequent_customers", 3600, result)
        return result

Braucht Ihre Anwendung z.B. Volltextsuche, Real-time Analytics und transaktionale Konsistenz. Müssen alle diese Anforderungen von derselben Datenbank erfüllt werden?

47.3.4 Synchronisation zwischen den Modellen

// Event-driven Synchronisation
@EventListener
public class ReadModelUpdater {
    
    @Async
    @EventListener
    public void updateCustomerSummary(OrderPlacedEvent event) {
        // Customer Summary View aktualisieren
        customerSummaryRepository.incrementOrderCount(event.getCustomerId());
        customerSummaryRepository.addToTotalSpent(event.getCustomerId(), event.getAmount());
    }
    
    @Async
    @EventListener  
    public void updateSalesAnalytics(OrderPlacedEvent event) {
        // Sales Analytics View aktualisieren
        LocalDate orderDate = event.getTimestamp().toLocalDate();
        salesAnalyticsRepository.incrementDailySales(orderDate, event.getAmount());
    }
    
    @Async
    @EventListener
    public void updateInventoryImpact(OrderPlacedEvent event) {
        // Inventory Impact View aktualisieren
        for (OrderItem item : event.getItems()) {
            inventoryRepository.reserveQuantity(item.getProductId(), item.getQuantity());
        }
    }
}

Abschließende Überlegung: CQRS mag zunächst wie “mehr Komplexität” aussehen. Aber ist ein System, das versucht, alle Anforderungen mit einem einzigen Modell zu erfüllen, wirklich einfacher?

In der Praxis ermöglicht CQRS es, jede Seite für ihren spezifischen Zweck zu optimieren. Das Ergebnis sind oft performantere, wartbarere und skalierbarere Systeme – sobald man die Komplexität des Starts überwunden hat.

Im nächsten Kapitel schauen wir uns die praktische Umsetzung an und implementieren ein vollständiges CQRS-System mit Spring Boot und Python.