68 Advanced Topics: FaaS, Event Meshes, Streaming, Cloud

Event-driven architecture continues to evolve, especially in cloud-native environments. Modern patterns such as serverless computing, event meshes, and stream processing platforms considerably extend what EDA can do. These approaches open up new options for scaling, abstraction, and integration, but they also introduce new complexity.

68.1 Serverless Event Processing

68.1.1 Function-as-a-Service (FaaS) für Event-Driven Workloads

Serverless computing is a particularly good fit for event processing, since events naturally trigger discrete, stateless processing steps:

AWS Lambda Event Processing:

import base64
import json
from datetime import datetime
from typing import Any, Dict

import boto3

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Process Kafka events via AWS Lambda"""
    
    records_processed = 0
    failed_records = []
    
    # Kafka event sources deliver batches keyed by topic-partition;
    # flatten them into a single list of records
    all_records = [
        record
        for records in event.get('records', {}).values()
        for record in records
    ]

    for record in all_records:
        try:
            # Kafka message values arrive base64-encoded
            kafka_data = json.loads(base64.b64decode(record['value']))
            
            # Extract business event
            order_data = json.loads(kafka_data['data'])
            
            # Process the order event
            result = process_order_placed(order_data)
            
            # Publish downstream event if needed
            if result.get('publish_event'):
                publish_payment_event(result['payment_data'])
            
            records_processed += 1
            
        except Exception as e:
            # Report failed records for partial batch handling; log the
            # cause, since the response carries only item identifiers
            print(f"Record processing failed: {e}")
            failed_records.append({
                'itemIdentifier': record.get('key', 'unknown')
            })
    
    # Return batch processing results
    response = {
        'batchItemFailures': failed_records
    }
    
    print(f"Processed {records_processed} records, {len(failed_records)} failures")
    return response

def process_order_placed(order_data: Dict[str, Any]) -> Dict[str, Any]:
    """Business logic for order processing"""
    
    # Validate order
    if not validate_order(order_data):
        raise ValueError("Invalid order data")
    
    # Calculate pricing
    pricing = calculate_pricing(order_data)
    
    # Reserve inventory
    inventory_result = reserve_inventory(order_data['items'])
    
    if not inventory_result['success']:
        raise RuntimeError("Inventory reservation failed")
    
    # Prepare payment processing
    payment_data = {
        'order_id': order_data['orderId'],
        'amount': pricing['total'],
        'currency': 'EUR',
        'customer_id': order_data['customerId']
    }
    
    return {
        'publish_event': True,
        'payment_data': payment_data
    }

def publish_payment_event(payment_data: Dict[str, Any]):
    """Publish payment event to EventBridge"""
    
    eventbridge = boto3.client('events')
    
    event_detail = {
        'eventType': 'PaymentInitiated',
        'timestamp': datetime.utcnow().isoformat(),
        'data': payment_data
    }
    
    eventbridge.put_events(
        Entries=[
            {
                'Source': 'order-processor',
                'DetailType': 'Payment Event',
                'Detail': json.dumps(event_detail),
                'EventBusName': 'payment-events'
            }
        ]
    )
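
For the batchItemFailures response to take effect, partial batch responses must be enabled on the event source mapping. A minimal sketch with boto3 (cluster ARN, function name, and topic are placeholders):

import boto3

lambda_client = boto3.client('lambda')

# Placeholder ARN and names; FunctionResponseTypes enables the
# partial batch failure reporting used by the handler above
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kafka:eu-central-1:123456789012:cluster/orders/1a2b3c4d',
    FunctionName='order-event-processor',
    Topics=['order.placed.v1'],
    BatchSize=100,
    FunctionResponseTypes=['ReportBatchItemFailures']
)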

Azure Functions with Event Hubs:

@FunctionName("OrderEventProcessor")
public void processOrderEvents(
    @EventHubTrigger(
        name = "orderEvents",
        eventHubName = "order-placed-events",
        connection = "EventHubConnectionString"
    ) List<String> events,
    @EventHubOutput(
        name = "paymentEvents",
        eventHubName = "payment-initiated-events", 
        connection = "EventHubConnectionString"
    ) OutputBinding<List<PaymentEvent>> outputEvents,
    ExecutionContext context) {
    
    List<PaymentEvent> paymentEvents = new ArrayList<>();
    
    for (String eventData : events) {
        try {
            OrderPlacedEvent orderEvent = objectMapper.readValue(eventData, OrderPlacedEvent.class);
            
            // Process order
            PaymentEvent paymentEvent = processOrder(orderEvent);
            paymentEvents.add(paymentEvent);
            
            context.getLogger().info("Processed order: " + orderEvent.getOrderId());
            
        } catch (Exception e) {
            context.getLogger().severe("Failed to process event: " + e.getMessage());
            // In production: send to dead letter queue
        }
    }
    
    outputEvents.setValue(paymentEvents);
}

private PaymentEvent processOrder(OrderPlacedEvent orderEvent) {
    // Validation
    validateOrder(orderEvent);
    
    // Business logic
    BigDecimal totalAmount = calculateTotal(orderEvent.getItems());
    
    // Create payment event
    return PaymentEvent.builder()
            .orderId(orderEvent.getOrderId())
            .customerId(orderEvent.getCustomerId())
            .amount(totalAmount)
            .currency("EUR")
            .timestamp(Instant.now())
            .build();
}

68.1.2 Serverless Event Orchestration

More complex workflows can be built with serverless orchestration services:

AWS Step Functions for Event-Driven Workflows:

{
  "Comment": "Order Processing Workflow",
  "StartAt": "ProcessOrder",
  "States": {
    "ProcessOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:ProcessOrder",
      "Next": "CheckInventory",
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "CheckInventory": {
      "Type": "Task", 
      "Resource": "arn:aws:lambda:region:account:function:CheckInventory",
      "Next": "InventoryAvailable?",
      "Catch": [
        {
          "ErrorEquals": ["InventoryUnavailable"],
          "Next": "HandleInventoryFailure"
        }
      ]
    },
    "InventoryAvailable?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.inventory.available",
          "BooleanEquals": true,
          "Next": "ProcessPayment"
        }
      ],
      "Default": "HandleInventoryFailure"
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:ProcessPayment",
      "Next": "PublishOrderConfirmed",
      "Catch": [
        {
          "ErrorEquals": ["PaymentFailed"],
          "Next": "HandlePaymentFailure"
        }
      ]
    },
    "PublishOrderConfirmed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::events:putEvents",
      "Parameters": {
        "Entries": [
          {
            "Source": "order-processor",
            "DetailType": "Order Confirmed",
            "Detail.$": "$.orderConfirmation"
          }
        ]
      },
      "End": true
    },
    "HandleInventoryFailure": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:region:account:function:HandleInventoryFailure",
      "End": true
    },
    "HandlePaymentFailure": {
      "Type": "Task", 
      "Resource": "arn:aws:lambda:region:account:function:HandlePaymentFailure",
      "End": true
    }
  }
}
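
Starting an execution of this state machine in response to an incoming event is a single API call, e.g. from a thin trigger function. A minimal sketch with boto3 (the state machine ARN is a placeholder):

import json
import boto3

sfn = boto3.client('stepfunctions')

def start_order_workflow(order_event: dict) -> str:
    """Start the order processing workflow for one incoming order event."""
    response = sfn.start_execution(
        # Placeholder ARN for illustration
        stateMachineArn='arn:aws:states:eu-central-1:123456789012:stateMachine:OrderProcessing',
        # Execution names act as an idempotency key for standard workflows
        name=f"order-{order_event['orderId']}",
        input=json.dumps(order_event)
    )
    return response['executionArn']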

68.1.3 Cold Start Optimization for Event Processing

import asyncio
import json
import os
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Optional

import aiohttp

class OptimizedEventProcessor:
    """Optimized event processor for serverless environments"""
    
    def __init__(self):
        self.http_session: Optional[aiohttp.ClientSession] = None
        self.connection_pool = None
        
    @lru_cache(maxsize=128)
    def get_cached_config(self, key: str) -> str:
        """Cache configuration to avoid repeated lookups"""
        return os.environ.get(key)
    
    async def get_http_session(self) -> aiohttp.ClientSession:
        """Reuse HTTP session across invocations"""
        if self.http_session is None or self.http_session.closed:
            connector = aiohttp.TCPConnector(
                limit=100,  # Connection pool size
                ttl_dns_cache=300,  # DNS cache
                use_dns_cache=True
            )
            self.http_session = aiohttp.ClientSession(
                connector=connector,
                timeout=aiohttp.ClientTimeout(total=30)
            )
        return self.http_session
    
    async def process_events_batch(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process events in parallel for better performance"""
        
        # Process events concurrently
        tasks = [self.process_single_event(event) for event in events]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Separate successful results from errors
        successful_results = []
        errors = []
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                errors.append({
                    'event_index': i,
                    'error': str(result)
                })
            else:
                successful_results.append(result)
        
        return {
            'successful_results': successful_results,
            'errors': errors
        }
    
    async def process_single_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Process individual event with optimizations"""
        
        session = await self.get_http_session()
        
        # Process business logic
        order_data = event['data']
        
        # Parallel external calls
        inventory_task = self.check_inventory(session, order_data['items'])
        pricing_task = self.calculate_pricing(session, order_data)
        
        inventory_result, pricing_result = await asyncio.gather(
            inventory_task, pricing_task
        )
        
        return {
            'order_id': order_data['orderId'],
            'inventory': inventory_result,
            'pricing': pricing_result,
            'processed_at': datetime.utcnow().isoformat()
        }

# Global processor instance to reuse across invocations
processor = OptimizedEventProcessor()

# The Lambda Python runtime expects a synchronous handler; reuse one
# event loop across warm invocations so pooled connections survive
loop = asyncio.new_event_loop()

def lambda_handler(event, context):
    """Lambda handler with optimizations"""

    try:
        result = loop.run_until_complete(
            processor.process_events_batch(event['records'])
        )
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
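
Code-level tuning only helps warm invocations; the cold start itself can additionally be mitigated at the platform level, for example with provisioned concurrency. A minimal sketch with boto3 (function name and alias are placeholders):

import boto3

lambda_client = boto3.client('lambda')

# Keep pre-initialized execution environments warm for the event
# processor; provisioned concurrency targets an alias or version
lambda_client.put_provisioned_concurrency_config(
    FunctionName='optimized-event-processor',
    Qualifier='live',
    ProvisionedConcurrentExecutions=5
)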

68.2 Cloud-native Patterns

68.2.1 The CloudEvents Standard

CloudEvents standardizes event formats for cloud-native environments:

@Component
public class CloudEventProcessor {
    
    public CloudEvent createOrderPlacedEvent(OrderPlaced order) {
        return CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create("https://orders.company.com/"))
                .withType("com.company.orders.placed.v1")
                .withDataContentType("application/json")
                .withSubject("order/" + order.getOrderId())
                .withTime(OffsetDateTime.now())
                .withData(serializeOrder(order))
                .withExtension("customer-tier", order.getCustomerTier())
                .withExtension("order-priority", order.getPriority())
                .build();
    }
    
    @KafkaListener(topics = "order-events")
    public void handleCloudEvent(@Payload CloudEvent cloudEvent) {
        
        // Extract standard CloudEvent metadata
        String eventType = cloudEvent.getType();
        String source = cloudEvent.getSource().toString();
        OffsetDateTime eventTime = cloudEvent.getTime();
        
        // Route based on event type
        switch (eventType) {
            case "com.company.orders.placed.v1" -> processOrderPlaced(cloudEvent);
            case "com.company.payments.processed.v1" -> processPaymentProcessed(cloudEvent);
            case "com.company.inventory.reserved.v1" -> processInventoryReserved(cloudEvent);
            default -> log.warn("Unknown event type: {}", eventType);
        }
    }
    
    private void processOrderPlaced(CloudEvent cloudEvent) {
        // Extract custom extensions (lowercase alphanumeric names)
        String customerTier = String.valueOf(cloudEvent.getExtension("customertier"));
        String priority = String.valueOf(cloudEvent.getExtension("orderpriority"));
        
        // Deserialize event data
        OrderPlaced order = deserializeEventData(cloudEvent.getData(), OrderPlaced.class);
        
        // Process based on customer tier and priority
        if ("premium".equals(customerTier) || "high".equals(priority)) {
            processPriorityOrder(order);
        } else {
            processStandardOrder(order);
        }
    }
}
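
On the wire, such an event in CloudEvents structured mode is a plain JSON envelope with the standard attributes and extensions at the top level; the values below are illustrative:

{
  "specversion": "1.0",
  "id": "a3c1e9f0-5b2d-4c7e-9f1a-2b3c4d5e6f70",
  "source": "https://orders.company.com/",
  "type": "com.company.orders.placed.v1",
  "subject": "order/ORD-12345",
  "time": "2024-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "customertier": "premium",
  "orderpriority": "high",
  "data": {
    "orderId": "ORD-12345",
    "customerId": "CUST-789",
    "totalAmount": 149.90
  }
}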

68.2.2 Multi-Cloud Event Integration

import asyncio
import json
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List

import aiohttp
import boto3

class CloudEventBridge(ABC):
    """Abstract base for cloud event bridges"""
    
    @abstractmethod
    async def publish_event(self, event: Dict[str, Any]) -> bool:
        pass
    
    @abstractmethod
    async def subscribe_to_events(self, event_types: List[str], callback) -> bool:
        pass

class AWSEventBridge(CloudEventBridge):
    def __init__(self, region: str):
        self.eventbridge = boto3.client('events', region_name=region)
    
    async def publish_event(self, event: Dict[str, Any]) -> bool:
        try:
            response = self.eventbridge.put_events(
                Entries=[
                    {
                        'Source': event.get('source', 'default'),
                        'DetailType': event.get('type', 'Custom Event'),
                        'Detail': json.dumps(event.get('data', {})),
                        'EventBusName': event.get('eventbus', 'default')
                    }
                ]
            )
            return response['FailedEntryCount'] == 0
        except Exception as e:
            print(f"Failed to publish to AWS EventBridge: {e}")
            return False

class AzureEventGridBridge(CloudEventBridge):
    def __init__(self, endpoint: str, access_key: str):
        self.endpoint = endpoint
        self.access_key = access_key
    
    async def publish_event(self, event: Dict[str, Any]) -> bool:
        try:
            headers = {
                'aeg-sas-key': self.access_key,
                'Content-Type': 'application/json'
            }
            
            event_data = {
                'id': event.get('id', str(uuid.uuid4())),
                'subject': event.get('subject', 'default'),
                'dataVersion': '1.0',
                'eventType': event.get('type', 'Custom'),
                'data': event.get('data', {}),
                'eventTime': event.get('time', datetime.utcnow().isoformat())
            }
            
            async with aiohttp.ClientSession() as session:
                async with session.post(self.endpoint, json=[event_data], headers=headers) as response:
                    return response.status == 200
                    
        except Exception as e:
            print(f"Failed to publish to Azure Event Grid: {e}")
            return False

class MultiCloudEventRouter:
    """Route events across multiple cloud providers"""
    
    def __init__(self):
        self.bridges: Dict[str, CloudEventBridge] = {}
        self.routing_rules: List[Dict[str, Any]] = []
    
    def add_bridge(self, name: str, bridge: CloudEventBridge):
        self.bridges[name] = bridge
    
    def add_routing_rule(self, rule: Dict[str, Any]):
        """Add routing rule: {'condition': lambda event: ..., 'targets': ['aws', 'azure']}"""
        self.routing_rules.append(rule)
    
    async def route_event(self, event: Dict[str, Any]) -> Dict[str, bool]:
        """Route event to appropriate cloud providers"""
        
        results = {}
        
        for rule in self.routing_rules:
            if rule['condition'](event):
                # Route to targets that have a configured bridge; keep the
                # filtered list so results align with the gathered tasks
                valid_targets = [t for t in rule['targets'] if t in self.bridges]
                tasks = [self.bridges[t].publish_event(event) for t in valid_targets]

                if tasks:
                    target_results = await asyncio.gather(*tasks, return_exceptions=True)
                    for target, result in zip(valid_targets, target_results):
                        # publish_event returns a bool; exceptions count as failures
                        results[target] = result is True
        
        return results

# Usage example
router = MultiCloudEventRouter()

# Configure bridges
aws_bridge = AWSEventBridge('us-east-1')
azure_bridge = AzureEventGridBridge('https://topic.eastus-1.eventgrid.azure.net/api/events', 'access-key')

router.add_bridge('aws', aws_bridge)
router.add_bridge('azure', azure_bridge)

# Configure routing rules
router.add_routing_rule({
    'condition': lambda event: event.get('type', '').startswith('com.company.orders'),
    'targets': ['aws', 'azure']  # Order events go to both
})

router.add_routing_rule({
    'condition': lambda event: event.get('source', '').endswith('payment-service'),
    'targets': ['aws']  # Payment events only to AWS
})
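
Routing an event is then a single awaited call; a usage sketch with an illustrative event:

import asyncio

async def main():
    event = {
        'type': 'com.company.orders.placed.v1',
        'source': 'https://orders.company.com/orders-service',
        'data': {'orderId': 'ORD-12345', 'totalAmount': 149.90}
    }
    # Returns per-target delivery status, e.g. {'aws': True, 'azure': False}
    results = await router.route_event(event)
    print(results)

asyncio.run(main())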

68.3 Event Mesh Architectures

68.3.1 Distributed Event Mesh

Event meshes fully decouple event production and consumption through an intelligent routing layer:

@Component
public class EventMeshNode {
    
    private final Map<String, EventRouter> routers = new ConcurrentHashMap<>();
    private final Map<String, SubscriptionFilter> subscriptions = new ConcurrentHashMap<>();
    
    @PostConstruct
    public void initializeMeshNode() {
        // Register with mesh discovery service
        registerWithMesh();
        
        // Start health checking
        startHealthChecking();
        
        // Initialize event routing
        initializeRouting();
    }
    
    public void publishToMesh(Event event) {
        // Enrich event with mesh metadata
        MeshEvent meshEvent = enrichEventWithMeshMetadata(event);
        
        // Route to interested subscribers
        routeEventToSubscribers(meshEvent);
        
        // Propagate to other mesh nodes if needed
        propagateToOtherNodes(meshEvent);
    }
    
    private void routeEventToSubscribers(MeshEvent meshEvent) {
        subscriptions.values().parallelStream()
                .filter(subscription -> subscription.matches(meshEvent))
                .forEach(subscription -> {
                    try {
                        subscription.deliver(meshEvent);
                    } catch (Exception e) {
                        handleDeliveryFailure(subscription, meshEvent, e);
                    }
                });
    }
    
    private MeshEvent enrichEventWithMeshMetadata(Event event) {
        return MeshEvent.builder()
                .originalEvent(event)
                .meshNodeId(getNodeId())
                .routingPath(new ArrayList<>())
                .ttl(DEFAULT_TTL)
                .qosLevel(QualityOfService.AT_LEAST_ONCE)
                .build();
    }
    
    @EventListener
    public void handleSubscriptionRequest(SubscriptionRequest request) {
        SubscriptionFilter filter = SubscriptionFilter.builder()
                .subscriberId(request.getSubscriberId())
                .eventTypePattern(request.getEventTypePattern())
                .sourcePattern(request.getSourcePattern())
                .deliveryEndpoint(request.getDeliveryEndpoint())
                .qosRequirements(request.getQosRequirements())
                .build();
                
        subscriptions.put(request.getSubscriberId(), filter);
        
        // Notify other mesh nodes about new subscription
        propagateSubscription(filter);
    }
}

@Component
public class EventMeshTopology {
    
    private final MutableValueGraph<MeshNode, MeshConnection> topology =
            ValueGraphBuilder.directed().build();
    
    public void addMeshNode(MeshNode node) {
        topology.addNode(node);
        
        // Auto-discover and connect to nearby nodes
        findAndConnectToNearbyNodes(node);
    }
    
    public List<List<MeshNode>> findOptimalPaths(MeshNode source, Event event) {
        // Find subscribers for this event type
        Set<MeshNode> interestedNodes = findNodesWithSubscribers(event.getType());
        
        // Calculate shortest paths to all interested nodes
        return interestedNodes.stream()
                .map(target -> findShortestPath(source, target))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
    
    private void findAndConnectToNearbyNodes(MeshNode newNode) {
        topology.nodes().stream()
                .filter(existingNode -> !existingNode.equals(newNode))
                .filter(existingNode -> isWithinLatencyThreshold(newNode, existingNode))
                .forEach(existingNode -> {
                    MeshConnection connection = establishConnection(newNode, existingNode);
                    topology.putEdgeValue(newNode, existingNode, connection);
                });
    }
}
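
The SubscriptionFilter above is deliberately abstract. How such a filter might match events against type and source patterns is sketched below in Python; the glob-style pattern semantics are an assumption of this sketch, not part of the Java code:

from dataclasses import dataclass
from fnmatch import fnmatch
from typing import Any, Dict

@dataclass
class SubscriptionFilterSketch:
    subscriber_id: str
    event_type_pattern: str  # e.g. "com.company.orders.*"
    source_pattern: str      # e.g. "*orders-service"

    def matches(self, event: Dict[str, Any]) -> bool:
        """Glob-style matching on event type and source."""
        return (
            fnmatch(event.get('type', ''), self.event_type_pattern)
            and fnmatch(event.get('source', ''), self.source_pattern)
        )

# Usage
f = SubscriptionFilterSketch('billing', 'com.company.orders.*', '*')
print(f.matches({'type': 'com.company.orders.placed.v1', 'source': 'orders'}))  # True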

68.3.2 Smart Event Routing

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional

class RoutingStrategy(Enum):
    BROADCAST = "broadcast"  # Send to all matching subscribers
    LOAD_BALANCE = "load_balance"  # Send to one of matching subscribers
    FAILOVER = "failover"  # Send to primary, fallback to secondary
    GEOGRAPHIC = "geographic"  # Route based on geographic proximity

@dataclass
class RoutingRule:
    condition: Callable[[Dict[str, Any]], bool]
    strategy: RoutingStrategy
    targets: List[str]
    priority: int = 0
    metadata: Optional[Dict[str, str]] = None

class SmartEventRouter:
    def __init__(self):
        self.routing_rules: List[RoutingRule] = []
        self.subscribers: Dict[str, 'EventSubscriber'] = {}
        self.metrics: Dict[str, 'SubscriberMetrics'] = {}
        
    def add_routing_rule(self, rule: RoutingRule):
        """Add routing rule, sorted by priority"""
        self.routing_rules.append(rule)
        self.routing_rules.sort(key=lambda r: r.priority, reverse=True)
    
    async def route_event(self, event: Dict[str, Any]) -> List[str]:
        """Route event based on rules and return list of successful deliveries"""
        
        successful_deliveries = []
        
        for rule in self.routing_rules:
            if rule.condition(event):
                delivery_result = await self.execute_routing_strategy(event, rule)
                successful_deliveries.extend(delivery_result)
                break  # Use first matching rule
        
        return successful_deliveries
    
    async def execute_routing_strategy(self, event: Dict[str, Any], 
                                     rule: RoutingRule) -> List[str]:
        """Execute specific routing strategy"""
        
        if rule.strategy == RoutingStrategy.BROADCAST:
            return await self.broadcast_to_all(event, rule.targets)
        
        elif rule.strategy == RoutingStrategy.LOAD_BALANCE:
            return await self.load_balance_delivery(event, rule.targets)
        
        elif rule.strategy == RoutingStrategy.FAILOVER:
            return await self.failover_delivery(event, rule.targets)
        
        elif rule.strategy == RoutingStrategy.GEOGRAPHIC:
            return await self.geographic_routing(event, rule.targets)
        
        return []
    
    async def load_balance_delivery(self, event: Dict[str, Any], 
                                  targets: List[str]) -> List[str]:
        """Deliver to subscriber with lowest load"""
        
        # Filter available targets
        available_targets = [t for t in targets if self.is_subscriber_healthy(t)]
        
        if not available_targets:
            return []
        
        # Select target with lowest load
        selected_target = min(available_targets, 
                            key=lambda t: self.get_subscriber_load(t))
        
        success = await self.deliver_to_subscriber(event, selected_target)
        return [selected_target] if success else []
    
    async def failover_delivery(self, event: Dict[str, Any], 
                              targets: List[str]) -> List[str]:
        """Try targets in order until one succeeds"""
        
        for target in targets:
            if self.is_subscriber_healthy(target):
                success = await self.deliver_to_subscriber(event, target)
                if success:
                    return [target]
        
        return []
    
    async def geographic_routing(self, event: Dict[str, Any], 
                               targets: List[str]) -> List[str]:
        """Route to geographically closest subscriber"""
        
        event_region = event.get('metadata', {}).get('region', 'unknown')
        
        # Group targets by region
        regional_targets = {}
        for target in targets:
            target_region = self.get_subscriber_region(target)
            if target_region not in regional_targets:
                regional_targets[target_region] = []
            regional_targets[target_region].append(target)
        
        # Prefer same region, then closest regions
        if event_region in regional_targets:
            return await self.load_balance_delivery(event, regional_targets[event_region])
        else:
            # Find closest region
            closest_region = self.find_closest_region(event_region, regional_targets.keys())
            if closest_region:
                return await self.load_balance_delivery(event, regional_targets[closest_region])
        
        return []
    
    def get_subscriber_load(self, subscriber_id: str) -> float:
        """Get current load of subscriber (0.0 - 1.0)"""
        metrics = self.metrics.get(subscriber_id)
        if not metrics:
            return 0.0
        
        # Simple load calculation based on queue depth and processing time
        queue_factor = min(metrics.queue_depth / 1000, 1.0)  # Max 1000 queued
        processing_factor = min(metrics.avg_processing_time_ms / 5000, 1.0)  # Max 5s
        
        return (queue_factor + processing_factor) / 2

# Example usage
router = SmartEventRouter()

# Order events: load balance across payment processors
router.add_routing_rule(RoutingRule(
    condition=lambda event: event.get('type') == 'OrderPlaced',
    strategy=RoutingStrategy.LOAD_BALANCE,
    targets=['payment-processor-1', 'payment-processor-2', 'payment-processor-3'],
    priority=10
))

# Critical events: broadcast to all monitoring systems
router.add_routing_rule(RoutingRule(
    condition=lambda event: event.get('priority') == 'critical',
    strategy=RoutingStrategy.BROADCAST,
    targets=['monitoring-system', 'alerting-system', 'audit-system'],
    priority=20
))

# Regional events: geographic routing
router.add_routing_rule(RoutingRule(
    condition=lambda event: 'region' in event.get('metadata', {}),
    strategy=RoutingStrategy.GEOGRAPHIC,
    targets=['processor-us-east', 'processor-eu-west', 'processor-asia-pacific'],
    priority=5
))
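
SmartEventRouter references several helpers (broadcast_to_all, deliver_to_subscriber, is_subscriber_healthy) that are omitted above. One possible minimal sketch, assuming each registered subscriber exposes an async deliver() method:

class EventSubscriber:
    """Minimal subscriber contract assumed by this sketch."""
    async def deliver(self, event: Dict[str, Any]) -> bool:
        raise NotImplementedError

class DefaultDeliveryRouter(SmartEventRouter):
    async def deliver_to_subscriber(self, event: Dict[str, Any], target: str) -> bool:
        subscriber = self.subscribers.get(target)
        if subscriber is None:
            return False
        try:
            return await subscriber.deliver(event)
        except Exception:
            return False

    async def broadcast_to_all(self, event: Dict[str, Any], targets: List[str]) -> List[str]:
        # Deliver concurrently and return the targets that succeeded
        results = await asyncio.gather(
            *(self.deliver_to_subscriber(event, t) for t in targets)
        )
        return [t for t, ok in zip(targets, results) if ok]

    def is_subscriber_healthy(self, target: str) -> bool:
        # Without real health data, treat every registered subscriber as healthy
        return target in self.subscribers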

68.4 Stream Processing Platforms

68.4.1 Advanced Stream Processing with Kafka Streams

@Component
public class OrderFulfillmentStreamProcessor {
    
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public KafkaStreams orderFulfillmentStream() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Input streams
        KStream<String, OrderPlaced> orders = builder.stream("order.placed.v1");
        KStream<String, PaymentProcessed> payments = builder.stream("payment.processed.v1");
        KStream<String, InventoryReserved> inventory = builder.stream("inventory.reserved.v1");
        
        // Create stateful stores for tracking
        StoreBuilder<KeyValueStore<String, OrderState>> orderStateStore = 
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("order-state-store"),
                        Serdes.String(),
                        JsonSerdes.orderState()
                );
        builder.addStateStore(orderStateStore);
        
        // Process order events and maintain state
        KStream<String, OrderState> orderStates = orders
                .selectKey((key, order) -> order.getOrderId())
                .transformValues(
                        () -> new OrderStateInitializer(),
                        "order-state-store"
                );
        
        // Join with payment events
        KStream<String, OrderState> withPayments = orderStates
                .leftJoin(
                        payments.selectKey((key, payment) -> payment.getOrderId()),
                        (orderState, payment) -> {
                            if (payment != null) {
                                orderState.setPaymentProcessed(true);
                                orderState.setPaymentTimestamp(payment.getTimestamp());
                            }
                            return orderState;
                        },
                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10))
                );
        
        // Join with inventory events
        KStream<String, OrderState> withInventory = withPayments
                .leftJoin(
                        inventory.selectKey((key, inv) -> inv.getOrderId()),
                        (orderState, inventoryEvent) -> {
                            if (inventoryEvent != null) {
                                orderState.setInventoryReserved(true);
                                orderState.setInventoryTimestamp(inventoryEvent.getTimestamp());
                            }
                            return orderState;
                        },
                        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10))
                );
        
        // Detect complete orders and publish fulfillment events
        withInventory
                .filter((orderId, orderState) -> 
                        orderState.isPaymentProcessed() && orderState.isInventoryReserved())
                .mapValues(this::createFulfillmentReadyEvent)
                .to("order.fulfillment.ready.v1");
        
        // Detect stuck orders (payment or inventory missing after timeout)
        withInventory
                .transformValues(
                        () -> new StuckOrderDetector(),
                        "order-state-store"
                )
                .filter((orderId, stuckOrder) -> stuckOrder != null)
                .to("order.stuck.v1");
        
        // Build and configure streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-fulfillment-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
        return new KafkaStreams(builder.build(), props);
    }
    
    private class StuckOrderDetector implements ValueTransformerWithKey<String, OrderState, StuckOrderEvent> {
        
        private ProcessorContext context;
        private KeyValueStore<String, OrderState> stateStore;
        
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.stateStore = context.getStateStore("order-state-store");
            
            // Schedule periodic checks for stuck orders
            context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::checkForStuckOrders);
        }
        
        @Override
        public StuckOrderEvent transform(String orderId, OrderState orderState) {
            // Update state store
            stateStore.put(orderId, orderState);
            return null; // Stuck orders are forwarded only during scheduled checks
        }

        @Override
        public void close() {
            // Nothing to release; the state store is managed by Kafka Streams
        }
        
        private void checkForStuckOrders(long timestamp) {
            try (KeyValueIterator<String, OrderState> iterator = stateStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, OrderState> entry = iterator.next();
                    OrderState state = entry.value;
                    
                    long orderAge = timestamp - state.getOrderTimestamp().toEpochMilli();
                    
                    // Check if order is stuck
                    if (orderAge > Duration.ofMinutes(30).toMillis()) {
                        if (!state.isPaymentProcessed() || !state.isInventoryReserved()) {
                            StuckOrderEvent stuckEvent = StuckOrderEvent.builder()
                                    .orderId(entry.key)
                                    .orderState(state)
                                    .stuckReason(determineStuckReason(state))
                                    .detectedAt(Instant.ofEpochMilli(timestamp))
                                    .build();
                                    
                            // Forward stuck order event
                            context.forward(entry.key, stuckEvent);
                        }
                    }
                }
            }
        }
    }
}

68.4.2 Real-time Analytics and Complex Event Processing

import asyncio
import statistics
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Any, Deque, Dict, List, Optional

class RealTimeAnalyticsProcessor:
    """Real-time analytics for event streams"""
    
    def __init__(self, window_size: timedelta = timedelta(minutes=5)):
        self.window_size = window_size
        self.event_windows: Dict[str, Deque[Dict]] = defaultdict(lambda: deque())
        self.metrics_cache: Dict[str, Dict] = {}
        
    async def process_event_stream(self, event_stream):
        """Process continuous event stream and generate real-time metrics"""
        
        async for event in event_stream:
            await self.process_single_event(event)
            
            # Generate metrics every few events
            if len(self.event_windows['orders']) % 10 == 0:
                metrics = await self.calculate_real_time_metrics()
                await self.publish_metrics(metrics)
    
    async def process_single_event(self, event: Dict):
        """Process individual event and update windows"""
        
        event_type = event.get('type', '')
        timestamp = datetime.fromisoformat(event.get('timestamp'))
        
        # Add to appropriate window
        window_key = self.get_window_key(event_type)
        self.event_windows[window_key].append({
            'event': event,
            'timestamp': timestamp
        })
        
        # Remove old events outside window
        cutoff_time = datetime.now() - self.window_size
        while (self.event_windows[window_key] and 
               self.event_windows[window_key][0]['timestamp'] < cutoff_time):
            self.event_windows[window_key].popleft()
    
    async def calculate_real_time_metrics(self) -> Dict[str, Any]:
        """Calculate real-time metrics from current windows"""
        
        metrics = {}
        
        # Order metrics
        order_events = [item['event'] for item in self.event_windows['orders']]
        metrics['orders_per_minute'] = len(order_events) * (60 / self.window_size.total_seconds())
        
        if order_events:
            order_values = [float(event['data'].get('totalAmount', 0)) for event in order_events]
            metrics['avg_order_value'] = statistics.mean(order_values)
            metrics['total_revenue'] = sum(order_values)
        
        # Payment metrics
        payment_events = [item['event'] for item in self.event_windows['payments']]
        metrics['payments_per_minute'] = len(payment_events) * (60 / self.window_size.total_seconds())
        
        # Error rate calculation
        error_events = [item['event'] for item in self.event_windows['errors']]
        total_events = sum(len(window) for window in self.event_windows.values())
        metrics['error_rate'] = len(error_events) / max(total_events, 1)
        
        # Complex event patterns
        metrics.update(await self.detect_complex_patterns())
        
        return metrics
    
    async def detect_complex_patterns(self) -> Dict[str, Any]:
        """Detect complex event patterns"""
        
        patterns = {}
        
        # Pattern 1: High-value orders without payment (potential issues)
        high_value_orders = [
            item['event'] for item in self.event_windows['orders']
            if float(item['event']['data'].get('totalAmount', 0)) > 1000
        ]
        high_value_order_ids = {
            order['data'].get('orderId') for order in high_value_orders
        }

        corresponding_payments = [
            item['event'] for item in self.event_windows['payments']
            if item['event']['data'].get('orderId') in high_value_order_ids
        ]

        patterns['high_value_orders_without_payment'] = (
            len(high_value_orders) - len(corresponding_payments)
        )
        
        # Pattern 2: Rapid order placement (potential fraud)
        recent_orders = [
            item for item in self.event_windows['orders']
            if item['timestamp'] > datetime.now() - timedelta(minutes=1)
        ]
        
        customer_order_counts = defaultdict(int)
        for item in recent_orders:
            customer_id = item['event']['data'].get('customerId')
            customer_order_counts[customer_id] += 1
        
        patterns['rapid_order_customers'] = sum(
            1 for count in customer_order_counts.values() if count > 5
        )
        
        # Pattern 3: Inventory shortage pattern
        inventory_events = [item['event'] for item in self.event_windows['inventory']]
        out_of_stock_events = [
            event for event in inventory_events
            if not event['data'].get('available', True)
        ]
        
        patterns['out_of_stock_rate'] = len(out_of_stock_events) / max(len(inventory_events), 1)
        
        return patterns
    
    def get_window_key(self, event_type: str) -> str:
        """Map event types to window keys"""
        
        if 'order' in event_type.lower():
            return 'orders'
        elif 'payment' in event_type.lower():
            return 'payments'
        elif 'inventory' in event_type.lower():
            return 'inventory'
        elif 'error' in event_type.lower():
            return 'errors'
        else:
            return 'other'

class ComplexEventProcessor:
    """Process complex event patterns and correlations"""
    
    def __init__(self):
        self.pattern_definitions = {}
        self.active_patterns = {}
        
    def define_pattern(self, pattern_name: str, pattern_definition: Dict):
        """Define a complex event pattern"""
        self.pattern_definitions[pattern_name] = pattern_definition
    
    async def process_event_for_patterns(self, event: Dict):
        """Check if event completes any defined patterns"""
        
        completed_patterns = []
        
        for pattern_name, pattern_def in self.pattern_definitions.items():
            if await self.check_pattern_completion(event, pattern_def):
                completed_patterns.append(pattern_name)
                await self.handle_pattern_completion(pattern_name, event)
        
        return completed_patterns
    
    async def check_pattern_completion(self, event: Dict, pattern_def: Dict) -> bool:
        """Check if current event completes a pattern"""
        
        # Simplified pattern matching - in practice, use CEP engines
        required_events = pattern_def.get('required_events', [])
        temporal_constraints = pattern_def.get('temporal_constraints', {})
        
        # Check if this event type is part of the pattern
        if event.get('type') not in required_events:
            return False
        
        # Check temporal constraints (simplified)
        max_time_span = temporal_constraints.get('max_time_span', timedelta(hours=1))
        
        # In a real implementation, you'd maintain state of partial pattern matches
        # and check temporal relationships between events
        
        return True  # Simplified for example

# Example pattern definition
processor = ComplexEventProcessor()

# Define fraud detection pattern
processor.define_pattern('potential_fraud', {
    'required_events': ['OrderPlaced', 'PaymentFailed', 'OrderPlaced'],
    'temporal_constraints': {
        'max_time_span': timedelta(minutes=10),
        'min_frequency': 3  # 3 events within time span
    },
    'conditions': {
        'same_customer': True,
        'high_value_orders': True
    }
})
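
Feeding events through the pattern check is then one call per incoming event; an illustrative usage:

import asyncio

async def main():
    event = {
        'type': 'PaymentFailed',
        'timestamp': '2024-01-15T10:31:00',
        'data': {'orderId': 'ORD-12345', 'customerId': 'CUST-789'}
    }
    completed = await processor.process_event_for_patterns(event)
    if 'potential_fraud' in completed:
        print('Fraud pattern matched - escalate for review')

asyncio.run(main())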

These advanced topics show how event-driven architecture keeps evolving in cloud-native environments and opens up new possibilities for scaling, abstraction, and intelligent event processing.