Event-Driven Architecture entwickelt sich kontinuierlich weiter, insbesondere im Cloud-Native-Umfeld. Moderne Patterns wie Serverless Computing, Event Meshes und Stream Processing Platforms erweitern die Möglichkeiten von EDA erheblich. Diese Ansätze bieten neue Wege für Skalierung, Abstraktion und Integration, bringen aber auch neue Komplexitäten mit sich.
Serverless Computing eignet sich besonders gut für Event-Processing, da Events naturgemäß diskrete, zustandslose Verarbeitungsschritte auslösen:
AWS Lambda Event Processing:
import base64
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Process a batch of Kafka records delivered to AWS Lambda.

    Args:
        event: Invocation payload. ``event['records']`` may be a flat list of
            records or the Kafka event-source shape, i.e. a dict mapping
            ``"topic-partition"`` to a list of records.
        context: Lambda context object (unused).

    Returns:
        ``{'batchItemFailures': [...]}`` with one entry per record that could
        not be processed (record key plus error text).
    """
    raw_records = event.get('records') or []
    if isinstance(raw_records, dict):
        # Kafka event sources group records per topic-partition; flatten them.
        records = [rec for batch in raw_records.values() for rec in batch]
    else:
        records = list(raw_records)

    records_processed = 0
    failed_records = []

    for record in records:
        try:
            # Decode Kafka message, then the business event nested inside it.
            kafka_data = _decode_kafka_value(record['value'])
            order_data = json.loads(kafka_data['data'])

            result = process_order_placed(order_data)

            # Publish downstream event if needed
            if result.get('publish_event'):
                publish_payment_event(result['payment_data'])

            records_processed += 1
        except Exception as e:
            # Collect failed records for partial batch failure. The record may
            # not even be a dict, so never let the bookkeeping itself raise.
            identifier = record.get('key', 'unknown') if isinstance(record, dict) else 'unknown'
            failed_records.append({
                'itemIdentifier': identifier,
                'error': str(e)
            })

    response = {
        'batchItemFailures': failed_records
    }
    print(f"Processed {records_processed} records, {len(failed_records)} failures")
    return response


def _decode_kafka_value(value):
    """Parse a record value that is either plain JSON or base64-encoded JSON."""
    try:
        return json.loads(value)
    except (TypeError, ValueError):
        # Lambda's Kafka integrations deliver the record value base64-encoded.
        return json.loads(base64.b64decode(value))
def process_order_placed(order_data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate, price and reserve stock for an order, then describe the payment.

    Raises:
        ValueError: if ``validate_order`` rejects the order.
        RuntimeError: if the inventory reservation does not report success.

    Returns:
        A dict with ``publish_event`` set to True and the ``payment_data``
        that should be sent downstream.
    """
    is_valid = validate_order(order_data)
    if not is_valid:
        raise ValueError("Invalid order data")

    price_info = calculate_pricing(order_data)

    reservation = reserve_inventory(order_data['items'])
    if not reservation['success']:
        raise RuntimeError("Inventory reservation failed")

    # Everything the payment step needs to charge the customer.
    payment_request = {}
    payment_request['order_id'] = order_data['orderId']
    payment_request['amount'] = price_info['total']
    payment_request['currency'] = 'EUR'
    payment_request['customer_id'] = order_data['customerId']

    return {'publish_event': True, 'payment_data': payment_request}
def publish_payment_event(payment_data: Dict[str, Any]):
"""Publish payment event to EventBridge"""
eventbridge = boto3.client('events')
event_detail = {
'eventType': 'PaymentInitiated',
'timestamp': datetime.utcnow().isoformat(),
'data': payment_data
}
eventbridge.put_events(
Entries=[
{
'Source': 'order-processor',
'DetailType': 'Payment Event',
'Detail': json.dumps(event_detail),
'EventBusName': 'payment-events'
}
]
)Azure Functions mit Event Hubs:
/**
 * Azure Function triggered by batches from the "order-placed-events" hub.
 * Each payload is deserialized and processed; the resulting payment events
 * are written to the "payment-initiated-events" output binding. Payloads that
 * fail are logged and skipped so the rest of the batch still goes through.
 */
@FunctionName("OrderEventProcessor")
public void processOrderEvents(
        @EventHubTrigger(
                name = "orderEvents",
                eventHubName = "order-placed-events",
                connection = "EventHubConnectionString"
        ) List<String> events,
        @EventHubOutput(
                name = "paymentEvents",
                eventHubName = "payment-initiated-events",
                connection = "EventHubConnectionString"
        ) OutputBinding<List<PaymentEvent>> outputEvents,
        ExecutionContext context) {

    final List<PaymentEvent> produced = new ArrayList<>();

    for (final String rawEvent : events) {
        try {
            final OrderPlacedEvent order = objectMapper.readValue(rawEvent, OrderPlacedEvent.class);
            produced.add(processOrder(order));
            context.getLogger().info("Processed order: " + order.getOrderId());
        } catch (Exception failure) {
            // In production: send to dead letter queue
            context.getLogger().severe("Failed to process event: " + failure.getMessage());
        }
    }

    outputEvents.setValue(produced);
}
/**
 * Validates the order, totals its items and builds the payment event
 * (currency fixed to EUR, timestamp taken at build time).
 */
private PaymentEvent processOrder(OrderPlacedEvent orderEvent) {
    validateOrder(orderEvent);

    final BigDecimal amountDue = calculateTotal(orderEvent.getItems());

    return PaymentEvent.builder()
            .orderId(orderEvent.getOrderId())
            .customerId(orderEvent.getCustomerId())
            .amount(amountDue)
            .currency("EUR")
            .timestamp(Instant.now())
            .build();
}

Komplexere Workflows lassen sich mit Serverless-Orchestration-Services realisieren:
AWS Step Functions für Event-Driven Workflows:
{
"Comment": "Order Processing Workflow",
"StartAt": "ProcessOrder",
"States": {
"ProcessOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ProcessOrder",
"Next": "CheckInventory",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
},
"CheckInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:CheckInventory",
"Next": "InventoryAvailable?",
"Catch": [
{
"ErrorEquals": ["InventoryUnavailable"],
"Next": "HandleInventoryFailure"
}
]
},
"InventoryAvailable?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.inventory.available",
"BooleanEquals": true,
"Next": "ProcessPayment"
}
],
"Default": "HandleInventoryFailure"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ProcessPayment",
"Next": "PublishOrderConfirmed",
"Catch": [
{
"ErrorEquals": ["PaymentFailed"],
"Next": "HandlePaymentFailure"
}
]
},
"PublishOrderConfirmed": {
"Type": "Task",
"Resource": "arn:aws:states:::events:putEvents",
"Parameters": {
"Entries": [
{
"Source": "order-processor",
"DetailType": "Order Confirmed",
"Detail.$": "$.orderConfirmation"
}
]
},
"End": true
},
"HandleInventoryFailure": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:HandleInventoryFailure",
"End": true
},
"HandlePaymentFailure": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:HandlePaymentFailure",
"End": true
}
}
}
import asyncio
import aiohttp
from functools import lru_cache
from typing import Optional
class OptimizedEventProcessor:
    """Event processor that keeps expensive resources alive between calls.

    The HTTP session and looked-up configuration values are cached on the
    instance so a warm serverless container can reuse them.

    NOTE(review): ``check_inventory`` and ``calculate_pricing`` are called by
    ``process_single_event`` but are not defined in this class as shown —
    presumably supplied by a subclass or omitted from the example; confirm.
    """

    def __init__(self):
        self.http_session: Optional["aiohttp.ClientSession"] = None
        self.connection_pool = None
        # Per-instance cache; an lru_cache on the method would key on ``self``
        # and keep every instance alive for the lifetime of the cache.
        self._config_cache: Dict[str, Optional[str]] = {}

    def get_cached_config(self, key: str) -> Optional[str]:
        """Return environment variable ``key``, looked up once per instance.

        Returns None (and caches that) when the variable is not set.
        """
        try:
            return self._config_cache[key]
        except KeyError:
            value = self._config_cache[key] = os.environ.get(key)
            return value

    async def get_http_session(self) -> "aiohttp.ClientSession":
        """Return the shared HTTP session, creating it if missing or closed."""
        if self.http_session is None or self.http_session.closed:
            connector = aiohttp.TCPConnector(
                limit=100,  # Connection pool size
                ttl_dns_cache=300,  # DNS cache
                use_dns_cache=True
            )
            self.http_session = aiohttp.ClientSession(
                connector=connector,
                timeout=aiohttp.ClientTimeout(total=30)
            )
        return self.http_session

    async def process_events_batch(self, events: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process all events concurrently.

        Returns:
            ``{'successful_results': [...], 'errors': [...]}`` where each error
            carries the ``event_index`` of the failing input and the error text.
        """
        tasks = [self.process_single_event(event) for event in events]
        # return_exceptions=True keeps one failing event from cancelling the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        successful_results = []
        errors = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                errors.append({
                    'event_index': i,
                    'error': str(result)
                })
            else:
                successful_results.append(result)

        return {
            'successful_results': successful_results,
            'errors': errors
        }

    async def process_single_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Run the inventory and pricing lookups for one event in parallel."""
        session = await self.get_http_session()
        order_data = event['data']

        # Parallel external calls
        inventory_task = self.check_inventory(session, order_data['items'])
        pricing_task = self.calculate_pricing(session, order_data)
        inventory_result, pricing_result = await asyncio.gather(
            inventory_task, pricing_task
        )

        return {
            'order_id': order_data['orderId'],
            'inventory': inventory_result,
            'pricing': pricing_result,
            'processed_at': datetime.utcnow().isoformat()
        }
# Global processor instance to reuse across invocations
processor = OptimizedEventProcessor()
async def lambda_handler(event, context):
"""Lambda handler with optimizations"""
try:
result = await processor.process_events_batch(event['records'])
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}Cloud Events standardisiert Event-Formate für Cloud-native Umgebungen:
/**
 * Builds CloudEvents for placed orders and dispatches incoming CloudEvents
 * from the "order-events" topic by their type.
 */
@Component
public class CloudEventProcessor {

    // CloudEvents attribute names may only contain lower-case letters and
    // digits, so the extension names must not contain '-'.
    private static final String EXT_CUSTOMER_TIER = "customertier";
    private static final String EXT_ORDER_PRIORITY = "orderpriority";

    // NOTE(review): final field with no visible constructor or initializer —
    // presumably injected via generated code; confirm.
    private final CloudEventBuilder cloudEventBuilder;

    /** Wraps an order in a v1 CloudEvent carrying tier and priority extensions. */
    public CloudEvent createOrderPlacedEvent(OrderPlaced order) {
        return cloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create("https://orders.company.com/"))
                .withType("com.company.orders.placed.v1")
                .withDataContentType("application/json")
                .withSubject("order/" + order.getOrderId())
                .withTime(OffsetDateTime.now())
                .withData(serializeOrder(order))
                .withExtension(EXT_CUSTOMER_TIER, order.getCustomerTier())
                .withExtension(EXT_ORDER_PRIORITY, order.getPriority())
                .build();
    }

    /** Routes an incoming CloudEvent to the handler for its type; unknown types are logged. */
    @KafkaListener(topics = "order-events")
    public void handleCloudEvent(@Payload CloudEvent cloudEvent) {
        String eventType = cloudEvent.getType();

        // Route based on event type
        switch (eventType) {
            case "com.company.orders.placed.v1" -> processOrderPlaced(cloudEvent);
            case "com.company.payments.processed.v1" -> processPaymentProcessed(cloudEvent);
            case "com.company.inventory.reserved.v1" -> processInventoryReserved(cloudEvent);
            default -> log.warn("Unknown event type: {}", eventType);
        }
    }

    private void processOrderPlaced(CloudEvent cloudEvent) {
        // Extensions are optional; a missing one must not abort processing.
        String customerTier = extensionAsString(cloudEvent, EXT_CUSTOMER_TIER);
        String priority = extensionAsString(cloudEvent, EXT_ORDER_PRIORITY);

        // Deserialize event data
        OrderPlaced order = deserializeEventData(cloudEvent.getData(), OrderPlaced.class);

        // Premium customers and high-priority orders take the priority path.
        if ("premium".equals(customerTier) || "high".equals(priority)) {
            processPriorityOrder(order);
        } else {
            processStandardOrder(order);
        }
    }

    /** Returns the extension's string form, or null when the event does not carry it. */
    private static String extensionAsString(CloudEvent cloudEvent, String name) {
        Object value = cloudEvent.getExtension(name);
        return value == null ? null : value.toString();
    }
}
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import asyncio
class CloudEventBridge(ABC):
    """Abstract base for cloud event bridges.

    Only publishing is mandatory. Subscribing is optional, so publish-only
    bridges can be instantiated without implementing it.
    """

    @abstractmethod
    async def publish_event(self, event: Dict[str, Any]) -> bool:
        """Publish ``event``; return True when the provider accepted it."""

    async def subscribe_to_events(self, event_types: List[str], callback) -> bool:
        """Subscribe ``callback`` to ``event_types``.

        Raises:
            NotImplementedError: unless a subclass overrides this method.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not support subscriptions"
        )
class AWSEventBridge(CloudEventBridge):
    """Bridge that publishes events to AWS EventBridge via boto3."""

    def __init__(self, region: str):
        self.eventbridge = boto3.client('events', region_name=region)

    async def publish_event(self, event: Dict[str, Any]) -> bool:
        """Publish ``event`` to EventBridge.

        Missing ``source``/``type``/``data``/``eventbus`` keys fall back to
        defaults. Returns True only when EventBridge reports no failed
        entries; any exception is logged and reported as False.
        """
        entry = {
            'Source': event.get('source', 'default'),
            'DetailType': event.get('type', 'Custom Event'),
            'Detail': json.dumps(event.get('data', {})),
            'EventBusName': event.get('eventbus', 'default')
        }
        try:
            # boto3 is blocking; run it in the default executor so other
            # bridges awaited alongside this one are not stalled.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None, lambda: self.eventbridge.put_events(Entries=[entry])
            )
            return response['FailedEntryCount'] == 0
        except Exception as e:
            print(f"Failed to publish to AWS EventBridge: {e}")
            return False
class AzureEventGridBridge(CloudEventBridge):
    """Bridge that posts events to an Azure Event Grid topic endpoint."""

    def __init__(self, endpoint: str, access_key: str):
        self.endpoint = endpoint
        self.access_key = access_key

    async def publish_event(self, event: Dict[str, Any]) -> bool:
        """POST ``event`` to the topic endpoint.

        ``id`` and ``time`` are generated when the event does not carry them
        (random UUID, current UTC time with an explicit offset). Returns True
        on HTTP 200; any exception is logged and reported as False.
        """
        try:
            headers = {
                'aeg-sas-key': self.access_key,
                'Content-Type': 'application/json'
            }
            event_data = {
                'id': event.get('id') or str(uuid.uuid4()),
                'subject': event.get('subject', 'default'),
                'dataVersion': '1.0',
                'eventType': event.get('type', 'Custom'),
                'data': event.get('data', {}),
                # Timezone-aware so the receiver does not have to guess the offset.
                'eventTime': event.get('time') or datetime.now(timezone.utc).isoformat()
            }
            async with aiohttp.ClientSession() as session:
                # The endpoint is sent a JSON array of events.
                async with session.post(self.endpoint, json=[event_data], headers=headers) as response:
                    return response.status == 200
        except Exception as e:
            print(f"Failed to publish to Azure Event Grid: {e}")
            return False
class MultiCloudEventRouter:
    """Route events across multiple cloud providers."""

    def __init__(self):
        self.bridges: Dict[str, "CloudEventBridge"] = {}
        self.routing_rules: List[Dict[str, Any]] = []

    def add_bridge(self, name: str, bridge: "CloudEventBridge"):
        """Register ``bridge`` under ``name`` so rules can target it."""
        self.bridges[name] = bridge

    def add_routing_rule(self, rule: Dict[str, Any]):
        """Add routing rule: {'condition': lambda event: ..., 'targets': ['aws', 'azure']}"""
        self.routing_rules.append(rule)

    async def route_event(self, event: Dict[str, Any]) -> Dict[str, bool]:
        """Publish ``event`` to the targets of every matching rule.

        Returns:
            Mapping of target name to delivery outcome. A target is True only
            when its bridge returned a truthy result; a bridge that raised,
            returned False, or is not registered is reported as False.
        """
        results: Dict[str, bool] = {}

        for rule in self.routing_rules:
            if not rule['condition'](event):
                continue

            dispatched = []
            for target in rule['targets']:
                if target in self.bridges:
                    dispatched.append(target)
                else:
                    # Unknown target: report it, but keep an earlier success.
                    results.setdefault(target, False)

            if not dispatched:
                continue

            outcomes = await asyncio.gather(
                *(self.bridges[target].publish_event(event) for target in dispatched),
                return_exceptions=True,
            )
            # Pair outcomes with the targets actually dispatched, not with the
            # rule's full target list, so results cannot shift or overrun.
            for target, outcome in zip(dispatched, outcomes):
                results[target] = (
                    not isinstance(outcome, BaseException) and bool(outcome)
                )

        return results
# Usage example: one router with an AWS and an Azure bridge and two rules.
router = MultiCloudEventRouter()

# Configure bridges
aws_bridge = AWSEventBridge('us-east-1')
# NOTE(review): endpoint and 'access-key' look like placeholders — confirm
# real values are injected from configuration.
azure_bridge = AzureEventGridBridge('https://topic.eastus-1.eventgrid.azure.net/api/events', 'access-key')
router.add_bridge('aws', aws_bridge)
router.add_bridge('azure', azure_bridge)

# Configure routing rules; an event is sent to the targets of every rule
# whose condition returns True for it.
router.add_routing_rule({
    'condition': lambda event: event.get('type', '').startswith('com.company.orders'),
    'targets': ['aws', 'azure']  # Order events go to both
})
router.add_routing_rule({
    'condition': lambda event: event.get('source', '').endswith('payment-service'),
    'targets': ['aws']  # Payment events only to AWS
})

Event Meshes entkoppeln Event-Produktion und -Konsumption vollständig durch intelligente Routing-Layer:
/**
 * A single node of the event mesh: keeps the local subscription table,
 * delivers published events to matching subscribers and passes events and
 * subscriptions on to the other nodes.
 */
@Component
public class EventMeshNode {

    private final Map<String, EventRouter> routers = new ConcurrentHashMap<>();
    private final Map<String, SubscriptionFilter> subscriptions = new ConcurrentHashMap<>();

    /** Start-up sequence: mesh registration, health checks, then routing. */
    @PostConstruct
    public void initializeMeshNode() {
        registerWithMesh();
        startHealthChecking();
        initializeRouting();
    }

    /** Enriches the event, delivers it locally and then propagates it to other nodes. */
    public void publishToMesh(Event event) {
        final MeshEvent enriched = enrichEventWithMeshMetadata(event);
        routeEventToSubscribers(enriched);
        propagateToOtherNodes(enriched);
    }

    private void routeEventToSubscribers(MeshEvent meshEvent) {
        subscriptions.values()
                .parallelStream()
                .filter(candidate -> candidate.matches(meshEvent))
                .forEach(matched -> deliverOrReportFailure(matched, meshEvent));
    }

    /** Delivers to one subscriber; a failure is handed to the failure handler instead of propagating. */
    private void deliverOrReportFailure(SubscriptionFilter subscription, MeshEvent meshEvent) {
        try {
            subscription.deliver(meshEvent);
        } catch (Exception e) {
            handleDeliveryFailure(subscription, meshEvent, e);
        }
    }

    private MeshEvent enrichEventWithMeshMetadata(Event event) {
        return MeshEvent.builder()
                .originalEvent(event)
                .meshNodeId(getNodeId())
                .routingPath(new ArrayList<>())
                .ttl(DEFAULT_TTL)
                .qosLevel(QualityOfService.AT_LEAST_ONCE)
                .build();
    }

    /** Stores the subscription under its subscriber id and announces it to the other nodes. */
    @EventListener
    public void handleSubscriptionRequest(SubscriptionRequest request) {
        final SubscriptionFilter newFilter = toFilter(request);
        subscriptions.put(request.getSubscriberId(), newFilter);
        propagateSubscription(newFilter);
    }

    /** Copies the request's fields into a subscription filter. */
    private SubscriptionFilter toFilter(SubscriptionRequest request) {
        return SubscriptionFilter.builder()
                .subscriberId(request.getSubscriberId())
                .eventTypePattern(request.getEventTypePattern())
                .sourcePattern(request.getSourcePattern())
                .deliveryEndpoint(request.getDeliveryEndpoint())
                .qosRequirements(request.getQosRequirements())
                .build();
    }
}
/**
 * Keeps the graph of mesh nodes and their connections and computes paths from
 * a source node to the nodes that have subscribers for an event.
 */
@Component
public class EventMeshTopology {
    // NOTE(review): Graph is used here with two type parameters and, below,
    // with a three-argument putEdge — confirm this matches the graph library
    // actually in use.
    private final Graph<MeshNode, MeshConnection> topology = GraphBuilder.directed().build();

    /** Adds the node to the graph and connects it to existing nodes within the latency threshold. */
    public void addMeshNode(MeshNode node) {
        topology.addNode(node);
        // Auto-discover and connect to nearby nodes
        findAndConnectToNearbyNodes(node);
    }

    /**
     * Collects, for every node with subscribers for the event's type, the
     * result of findShortestPath from {@code source}; null results are dropped.
     *
     * NOTE(review): the declared return type is List&lt;MeshNode&gt; while one
     * element is collected per target — confirm what findShortestPath returns.
     */
    public List<MeshNode> findOptimalPath(MeshNode source, Event event) {
        // Find subscribers for this event type
        Set<MeshNode> interestedNodes = findNodesWithSubscribers(event.getType());
        // Calculate shortest paths to all interested nodes
        return interestedNodes.stream()
            .map(target -> findShortestPath(source, target))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    // Connects newNode to every other node that passes the latency check;
    // edges are added from newNode to the existing node only.
    private void findAndConnectToNearbyNodes(MeshNode newNode) {
        topology.nodes().stream()
            .filter(existingNode -> !existingNode.equals(newNode))
            .filter(existingNode -> isWithinLatencyThreshold(newNode, existingNode))
            .forEach(existingNode -> {
                MeshConnection connection = establishConnection(newNode, existingNode);
                topology.putEdge(newNode, existingNode, connection);
            });
    }
}
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass
import asyncio
from enum import Enum
class RoutingStrategy(Enum):
    """Delivery strategies a routing rule can select."""

    BROADCAST = "broadcast"  # Send to all matching subscribers
    LOAD_BALANCE = "load_balance"  # Send to one of matching subscribers
    FAILOVER = "failover"  # Send to primary, fallback to secondary
    GEOGRAPHIC = "geographic"  # Route based on geographic proximity
@dataclass
class RoutingRule:
    """One routing rule: a predicate, a delivery strategy and its targets."""

    # Predicate that receives the event dict and decides whether the rule applies.
    condition: Callable[[Dict], bool]
    # How the event is delivered to ``targets``.
    strategy: RoutingStrategy
    # Subscriber identifiers the strategy may deliver to.
    targets: List[str]
    # Rules with a higher priority are evaluated first by SmartEventRouter.
    priority: int = 0
    # Optional free-form annotations; None when not provided.
    metadata: Optional[Dict[str, str]] = None
class SmartEventRouter:
    """Rule-based router that delivers each event via the first matching rule.

    NOTE(review): ``broadcast_to_all``, ``is_subscriber_healthy``,
    ``deliver_to_subscriber``, ``get_subscriber_region`` and
    ``find_closest_region`` are used below but not defined in this class as
    shown — presumably provided elsewhere; confirm.
    """

    def __init__(self):
        self.routing_rules: List["RoutingRule"] = []
        self.subscribers: Dict[str, 'EventSubscriber'] = {}
        self.metrics: Dict[str, 'SubscriberMetrics'] = {}

    def add_routing_rule(self, rule: "RoutingRule"):
        """Register ``rule`` and keep the rule list ordered, highest priority first."""
        rules = self.routing_rules
        rules.append(rule)
        rules.sort(key=lambda candidate: candidate.priority, reverse=True)

    async def route_event(self, event: Dict[str, Any]) -> List[str]:
        """Apply the first rule whose condition accepts ``event``.

        Returns the targets that were delivered to, or an empty list when no
        rule matches.
        """
        matching_rule = next(
            (rule for rule in self.routing_rules if rule.condition(event)),
            None,
        )
        if matching_rule is None:
            return []
        return list(await self.execute_routing_strategy(event, matching_rule))

    async def execute_routing_strategy(self, event: Dict[str, Any],
                                       rule: "RoutingRule") -> List[str]:
        """Dispatch to the delivery method that implements ``rule.strategy``."""
        # Method names are resolved lazily so only the chosen one is looked up.
        dispatch = (
            (RoutingStrategy.BROADCAST, 'broadcast_to_all'),
            (RoutingStrategy.LOAD_BALANCE, 'load_balance_delivery'),
            (RoutingStrategy.FAILOVER, 'failover_delivery'),
            (RoutingStrategy.GEOGRAPHIC, 'geographic_routing'),
        )
        for strategy, method_name in dispatch:
            if rule.strategy == strategy:
                return await getattr(self, method_name)(event, rule.targets)
        return []

    async def load_balance_delivery(self, event: Dict[str, Any],
                                    targets: List[str]) -> List[str]:
        """Deliver to the healthy target that currently reports the lowest load."""
        healthy = list(filter(self.is_subscriber_healthy, targets))
        if not healthy:
            return []

        chosen = min(healthy, key=self.get_subscriber_load)
        delivered = await self.deliver_to_subscriber(event, chosen)
        if delivered:
            return [chosen]
        return []

    async def failover_delivery(self, event: Dict[str, Any],
                                targets: List[str]) -> List[str]:
        """Walk ``targets`` in order and stop at the first successful delivery."""
        for candidate in targets:
            if not self.is_subscriber_healthy(candidate):
                continue
            if await self.deliver_to_subscriber(event, candidate):
                return [candidate]
        return []

    async def geographic_routing(self, event: Dict[str, Any],
                                 targets: List[str]) -> List[str]:
        """Load-balance within the event's region, else within the closest region."""
        event_region = event.get('metadata', {}).get('region', 'unknown')

        # Bucket the targets by the region they are reported to live in.
        by_region = {}
        for target in targets:
            by_region.setdefault(self.get_subscriber_region(target), []).append(target)

        region = event_region
        if region not in by_region:
            region = self.find_closest_region(event_region, by_region.keys())
            if not region:
                return []
        return await self.load_balance_delivery(event, by_region[region])

    def get_subscriber_load(self, subscriber_id: str) -> float:
        """Return the subscriber's load in [0.0, 1.0]; 0.0 when no metrics exist.

        The load is the mean of two saturating factors: queue depth relative
        to 1000 queued items and average processing time relative to 5000 ms.
        """
        stats = self.metrics.get(subscriber_id)
        if not stats:
            return 0.0

        queue_factor = min(stats.queue_depth / 1000, 1.0)
        processing_factor = min(stats.avg_processing_time_ms / 5000, 1.0)
        return (queue_factor + processing_factor) / 2
# Example usage
router = SmartEventRouter()
# Order events: load balance across payment processors
router.add_routing_rule(RoutingRule(
condition=lambda event: event.get('type') == 'OrderPlaced',
strategy=RoutingStrategy.LOAD_BALANCE,
targets=['payment-processor-1', 'payment-processor-2', 'payment-processor-3'],
priority=10
))
# Critical events: broadcast to all monitoring systems
router.add_routing_rule(RoutingRule(
condition=lambda event: event.get('priority') == 'critical',
strategy=RoutingStrategy.BROADCAST,
targets=['monitoring-system', 'alerting-system', 'audit-system'],
priority=20
))
# Regional events: geographic routing
router.add_routing_rule(RoutingRule(
condition=lambda event: 'region' in event.get('metadata', {}),
strategy=RoutingStrategy.GEOGRAPHIC,
targets=['processor-us-east', 'processor-eu-west', 'processor-asia-pacific'],
priority=5
))@Component
/*
 * Kafka Streams topology that correlates order, payment and inventory events
 * per order id, emits "fulfillment ready" events once both payment and
 * inventory are confirmed, and periodically scans for orders that stay
 * incomplete.
 */
public class OrderFulfillmentStreamProcessor {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /*
     * Builds the topology and returns the configured KafkaStreams instance.
     * NOTE(review): this method does not call start() on the returned
     * instance — confirm it is started elsewhere.
     */
    @Bean
    public KafkaStreams orderFulfillmentStream() {
        StreamsBuilder builder = new StreamsBuilder();
        // Input streams
        KStream<String, OrderPlaced> orders = builder.stream("order.placed.v1");
        KStream<String, PaymentProcessed> payments = builder.stream("payment.processed.v1");
        KStream<String, InventoryReserved> inventory = builder.stream("inventory.reserved.v1");
        // Create stateful stores for tracking
        StoreBuilder<KeyValueStore<String, OrderState>> orderStateStore =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("order-state-store"),
                Serdes.String(),
                JsonSerdes.orderState()
            );
        builder.addStateStore(orderStateStore);
        // Process order events and maintain state
        // (re-keyed by order id so the joins below match on the same key)
        KStream<String, OrderState> orderStates = orders
            .selectKey((key, order) -> order.getOrderId())
            .transformValues(
                () -> new OrderStateInitializer(),
                "order-state-store"
            );
        // Join with payment events
        // Left join: the order state is emitted even when no payment arrives
        // within the 10-minute window (payment is null in that case).
        KStream<String, OrderState> withPayments = orderStates
            .leftJoin(
                payments.selectKey((key, payment) -> payment.getOrderId()),
                (orderState, payment) -> {
                    if (payment != null) {
                        orderState.setPaymentProcessed(true);
                        orderState.setPaymentTimestamp(payment.getTimestamp());
                    }
                    return orderState;
                },
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10))
            );
        // Join with inventory events
        // Same pattern as the payment join, applied to its output.
        KStream<String, OrderState> withInventory = withPayments
            .leftJoin(
                inventory.selectKey((key, inv) -> inv.getOrderId()),
                (orderState, inventoryEvent) -> {
                    if (inventoryEvent != null) {
                        orderState.setInventoryReserved(true);
                        orderState.setInventoryTimestamp(inventoryEvent.getTimestamp());
                    }
                    return orderState;
                },
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10))
            );
        // Detect complete orders and publish fulfillment events
        withInventory
            .filter((orderId, orderState) ->
                orderState.isPaymentProcessed() && orderState.isInventoryReserved())
            .mapValues(this::createFulfillmentReadyEvent)
            .to("order.fulfillment.ready.v1");
        // Detect stuck orders (payment or inventory missing after timeout)
        // StuckOrderDetector.transform() always returns null, so the filter
        // below drops every per-record result; only events forwarded from the
        // scheduled check could reach the topic.
        withInventory
            .transformValues(
                () -> new StuckOrderDetector(),
                "order-state-store"
            )
            .filter((orderId, stuckOrder) -> stuckOrder != null)
            .to("order.stuck.v1");
        // Build and configure streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-fulfillment-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return new KafkaStreams(builder.build(), props);
    }

    /*
     * Writes every passing order state into "order-state-store" and, every
     * five minutes of wall-clock time, scans the store for orders older than
     * 30 minutes that still lack payment or inventory confirmation.
     */
    private class StuckOrderDetector implements ValueTransformerWithKey<String, OrderState, StuckOrderEvent> {
        private ProcessorContext context;
        private KeyValueStore<String, OrderState> stateStore;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.stateStore = context.getStateStore("order-state-store");
            // Schedule periodic checks for stuck orders
            context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::checkForStuckOrders);
        }

        @Override
        public StuckOrderEvent transform(String orderId, OrderState orderState) {
            // Update state store
            stateStore.put(orderId, orderState);
            return null; // Return stuck orders only during scheduled checks
        }

        // Punctuator callback; `timestamp` is compared against the order
        // timestamp in epoch milliseconds.
        private void checkForStuckOrders(long timestamp) {
            // try-with-resources: the store iterator is closed after the scan.
            try (KeyValueIterator<String, OrderState> iterator = stateStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, OrderState> entry = iterator.next();
                    OrderState state = entry.value;
                    long orderAge = timestamp - state.getOrderTimestamp().toEpochMilli();
                    // Check if order is stuck
                    if (orderAge > Duration.ofMinutes(30).toMillis()) {
                        if (!state.isPaymentProcessed() || !state.isInventoryReserved()) {
                            StuckOrderEvent stuckEvent = StuckOrderEvent.builder()
                                .orderId(entry.key)
                                .orderState(state)
                                .stuckReason(determineStuckReason(state))
                                .detectedAt(Instant.ofEpochMilli(timestamp))
                                .build();
                            // Forward stuck order event
                            // NOTE(review): Kafka Streams documents that value
                            // transformers must not call forward() — confirm;
                            // this may need a Processor/Transformer instead.
                            // Entries are also never removed from the store
                            // here, so a stuck order would be reported on
                            // every scan — confirm intended.
                            context.forward(entry.key, stuckEvent);
                        }
                    }
                }
            }
        }
    }
}
import asyncio
from collections import defaultdict, deque
from typing import Dict, List, Deque, Optional
from datetime import datetime, timedelta
import statistics
class RealTimeAnalyticsProcessor:
    """Real-time analytics over sliding windows of recent events.

    Events are bucketed by type family (orders, payments, inventory, errors,
    other). All timestamps are handled as timezone-aware UTC; a naive
    timestamp is assumed to already be in UTC.
    """

    # A metrics snapshot is published after this many processed events.
    METRICS_INTERVAL = 10

    def __init__(self, window_size: timedelta = timedelta(minutes=5)):
        self.window_size = window_size
        self.event_windows: Dict[str, Deque[Dict]] = defaultdict(deque)
        self.metrics_cache: Dict[str, Dict] = {}

    @staticmethod
    def _now() -> datetime:
        """Current time as an aware UTC datetime."""
        return datetime.now(timezone.utc)

    @classmethod
    def _parse_timestamp(cls, raw: Optional[str]) -> datetime:
        """Parse an ISO-8601 timestamp into an aware UTC-comparable datetime.

        A missing timestamp falls back to the arrival time.
        """
        if raw is None:
            return cls._now()
        if raw.endswith('Z'):
            # datetime.fromisoformat() only accepts 'Z' on newer Pythons.
            raw = raw[:-1] + '+00:00'
        parsed = datetime.fromisoformat(raw)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    async def process_event_stream(self, event_stream):
        """Consume ``event_stream`` and publish metrics every METRICS_INTERVAL events.

        NOTE(review): ``publish_metrics`` is not defined in this class as
        shown — presumably provided by a subclass; confirm.
        """
        processed = 0
        async for event in event_stream:
            await self.process_single_event(event)
            processed += 1
            # Count processed events rather than testing the order window
            # size, which is also a multiple of ten while it is empty.
            if processed % self.METRICS_INTERVAL == 0:
                metrics = await self.calculate_real_time_metrics()
                await self.publish_metrics(metrics)

    async def process_single_event(self, event: Dict):
        """Append ``event`` to its window and evict entries older than the window."""
        event_type = event.get('type', '')
        timestamp = self._parse_timestamp(event.get('timestamp'))

        window = self.event_windows[self.get_window_key(event_type)]
        window.append({
            'event': event,
            'timestamp': timestamp
        })

        # Eviction only inspects the oldest end of the deque.
        cutoff_time = self._now() - self.window_size
        while window and window[0]['timestamp'] < cutoff_time:
            window.popleft()

    async def calculate_real_time_metrics(self) -> Dict[str, float]:
        """Compute throughput, revenue, error-rate and pattern metrics."""
        metrics = {}
        per_minute = 60 / self.window_size.total_seconds()

        # Order metrics
        order_events = [item['event'] for item in self.event_windows['orders']]
        metrics['orders_per_minute'] = len(order_events) * per_minute
        if order_events:
            order_values = [float(event['data'].get('totalAmount', 0)) for event in order_events]
            metrics['avg_order_value'] = statistics.mean(order_values)
            metrics['total_revenue'] = sum(order_values)

        # Payment metrics
        payment_events = [item['event'] for item in self.event_windows['payments']]
        metrics['payments_per_minute'] = len(payment_events) * per_minute

        # Error rate: share of error events among all windowed events.
        error_events = [item['event'] for item in self.event_windows['errors']]
        total_events = sum(len(window) for window in self.event_windows.values())
        metrics['error_rate'] = len(error_events) / max(total_events, 1)

        # Complex event patterns
        metrics.update(await self.detect_complex_patterns())
        return metrics

    async def detect_complex_patterns(self) -> Dict[str, Any]:
        """Detect unpaid high-value orders, rapid ordering and stock shortages."""
        patterns = {}

        # Pattern 1: High-value orders without payment (potential issues).
        # Matching by order id keeps duplicate payments from skewing the count.
        paid_order_ids = {
            item['event']['data'].get('orderId')
            for item in self.event_windows['payments']
        }
        high_value_orders = [
            item['event'] for item in self.event_windows['orders']
            if float(item['event']['data'].get('totalAmount', 0)) > 1000
        ]
        patterns['high_value_orders_without_payment'] = sum(
            1 for order in high_value_orders
            if order['data'].get('orderId') not in paid_order_ids
        )

        # Pattern 2: Rapid order placement (potential fraud):
        # customers with more than five orders in the last minute.
        one_minute_ago = self._now() - timedelta(minutes=1)
        customer_order_counts = defaultdict(int)
        for item in self.event_windows['orders']:
            if item['timestamp'] > one_minute_ago:
                customer_order_counts[item['event']['data'].get('customerId')] += 1
        patterns['rapid_order_customers'] = sum(
            1 for count in customer_order_counts.values() if count > 5
        )

        # Pattern 3: Inventory shortage pattern
        inventory_events = [item['event'] for item in self.event_windows['inventory']]
        out_of_stock_events = [
            event for event in inventory_events
            # Equality (not identity) is kept so that 0 also counts as unavailable.
            if event['data'].get('available', True) == False  # noqa: E712
        ]
        patterns['out_of_stock_rate'] = len(out_of_stock_events) / max(len(inventory_events), 1)
        return patterns

    def get_window_key(self, event_type: str) -> str:
        """Map an event type to its window key (first matching family wins)."""
        lowered = event_type.lower()
        if 'order' in lowered:
            return 'orders'
        if 'payment' in lowered:
            return 'payments'
        if 'inventory' in lowered:
            return 'inventory'
        if 'error' in lowered:
            return 'errors'
        return 'other'
class ComplexEventProcessor:
    """Process complex event patterns and correlations (simplified example)."""

    def __init__(self):
        self.pattern_definitions = {}
        self.active_patterns = {}

    def define_pattern(self, pattern_name: str, pattern_definition: Dict):
        """Register (or replace) the definition stored under ``pattern_name``."""
        self.pattern_definitions[pattern_name] = pattern_definition

    async def process_event_for_patterns(self, event: Dict):
        """Return the names of all defined patterns that ``event`` completes.

        ``handle_pattern_completion`` is awaited once per completed pattern.
        """
        completed_patterns = []
        # Iterate over a snapshot so a completion hook may define new patterns.
        for pattern_name, pattern_def in list(self.pattern_definitions.items()):
            if await self.check_pattern_completion(event, pattern_def):
                completed_patterns.append(pattern_name)
                await self.handle_pattern_completion(pattern_name, event)
        return completed_patterns

    async def check_pattern_completion(self, event: Dict, pattern_def: Dict) -> bool:
        """Check if the current event completes a pattern.

        Simplified pattern matching - in practice, use CEP engines. Only the
        event type is checked against ``required_events``; temporal
        constraints and conditions in the definition are not evaluated. A real
        implementation would maintain state of partial pattern matches and
        check temporal relationships between events.
        """
        required_events = pattern_def.get('required_events', [])
        return event.get('type') in required_events

    async def handle_pattern_completion(self, pattern_name: str, event: Dict) -> None:
        """Hook invoked when ``event`` completes ``pattern_name``.

        The default does nothing; subclasses override it to react (alerting,
        publishing a derived event, ...).
        """
        return None
# Example pattern definition
processor = ComplexEventProcessor()

# Define fraud detection pattern
# NOTE(review): check_pattern_completion as shown only tests the event type
# against 'required_events'; 'temporal_constraints' and 'conditions' are
# stored but not evaluated there — confirm they are enforced elsewhere.
processor.define_pattern('potential_fraud', {
    'required_events': ['OrderPlaced', 'PaymentFailed', 'OrderPlaced'],
    'temporal_constraints': {
        'max_time_span': timedelta(minutes=10),
        'min_frequency': 3  # 3 events within time span
    },
    'conditions': {
        'same_customer': True,
        'high_value_orders': True
    }
})

Diese weiterführenden Themen zeigen, wie sich Event-Driven Architecture in Cloud-Native-Umgebungen weiterentwickelt und neue Möglichkeiten für Skalierung, Abstraktion und intelligente Event-Verarbeitung bietet.