Verschiedene Clients haben unterschiedliche Anforderungen an dieselben Geschäftsdaten. Eine mobile App benötigt kompakte JSON-Responses, ein Admin-Dashboard detaillierte Metriken, und ein externes Partner-System strukturierte XML-Feeds. Wie aggregieren Sie Events intelligent für jeden Client-Typ, ohne redundante Services zu entwickeln?
Backend-for-Frontend Patterns lösen diese Herausforderung durch client-spezifische Abstraktionsschichten, die Event-Streams gezielt für verschiedene Konsumenten aufbereiten.
Statt eines monolithischen APIs entwickeln Sie spezialisierte Services, die Events optimal für jeden Client-Typ zusammenfassen. Jeder BFF versteht die spezifischen Anforderungen seines Clients und aggregiert nur relevante Daten.
Betrachten Sie unterschiedliche Client-Perspektiven auf Bestelldaten:
| Client-Typ | Benötigte Daten | Update-Frequenz | Format-Präferenz |
|---|---|---|---|
| Mobile App | Status, Lieferzeit | Bei Statuswechsel | Kompaktes JSON |
| Web-Dashboard | Vollständige Historie | Echtzeit | Detailliertes JSON |
| Partner-API | Logistikdaten | Batch (stündlich) | XML/EDI |
| Analytics | Aggregierte Metriken | Täglich | CSV/Parquet |
Spring Boot Mobile-BFF Implementation:
@RestController
@RequestMapping("/mobile/api")
public class MobileBffController {
private final MobileOrderAggregator orderAggregator;
@GetMapping("/orders/{customerId}")
public ResponseEntity<MobileOrderSummary> getCustomerOrders(
@PathVariable String customerId) {
MobileOrderSummary summary = orderAggregator.aggregateForMobile(customerId);
return ResponseEntity.ok(summary);
}
}
@Service
public class MobileOrderAggregator {
private final OrderEventRepository eventRepo;
@KafkaListener(topics = {
"order.placed.v1",
"payment.processed.v1",
"order.shipped.v1"
})
public void updateMobileProjection(String eventJson) {
// Mobile-spezifische Projektion: nur kritische Status-Updates
ObjectMapper mapper = new ObjectMapper();
JsonNode event = mapper.readTree(eventJson);
String eventType = event.path("eventType").asText();
String orderId = event.path("data").path("orderId").asText();
switch (eventType) {
case "OrderPlaced":
createMobileOrderEntry(event);
break;
case "PaymentProcessed":
updateMobileOrderStatus(orderId, "PAID");
break;
case "OrderShipped":
updateMobileOrderWithTracking(orderId, event);
break;
// Ignoriere Detail-Events wie "InventoryReserved" für Mobile
}
}
public MobileOrderSummary aggregateForMobile(String customerId) {
// Optimiert für Mobile: minimale Datenmenge, maximale Relevanz
List<MobileOrderItem> orders = eventRepo.findActiveOrdersByCustomer(customerId)
.stream()
.map(this::toMobileFormat)
.collect(Collectors.toList());
return MobileOrderSummary.builder()
.customerId(customerId)
.activeOrders(orders)
.nextDelivery(calculateNextDelivery(orders))
.build();
}
private MobileOrderItem toMobileFormat(OrderEvent event) {
return MobileOrderItem.builder()
.orderId(event.getOrderId())
.status(simplifyStatusForMobile(event.getStatus()))
.estimatedDelivery(event.getEstimatedDelivery())
.trackingCode(event.getTrackingNumber())
.build();
}
private String simplifyStatusForMobile(String detailedStatus) {
// Mobile braucht nur 4 Zuständen statt 15 internen Status
return switch (detailedStatus) {
case "SUBMITTED", "PAYMENT_PENDING" -> "PROCESSING";
case "PAID", "INVENTORY_RESERVED", "PICKING" -> "PREPARING";
case "SHIPPED", "IN_TRANSIT" -> "SHIPPED";
case "DELIVERED" -> "DELIVERED";
default -> "UNKNOWN";
};
}
}Dashboard-BFF für detaillierte Ansichten:
@Service
public class DashboardOrderAggregator {
@KafkaListener(topics = "order.events.all.v1") // Alle Events
public void updateDashboardProjection(String eventJson) {
// Dashboard braucht vollständige Event-Historie
ObjectMapper mapper = new ObjectMapper();
JsonNode event = mapper.readTree(eventJson);
DashboardOrderEvent dashboardEvent = DashboardOrderEvent.builder()
.orderId(event.path("data").path("orderId").asText())
.eventType(event.path("eventType").asText())
.timestamp(event.path("timestamp").asText())
.details(event.path("data").toString()) // Vollständige Details
.build();
dashboardEventRepo.save(dashboardEvent);
}
public DashboardOrderDetails aggregateForDashboard(String orderId) {
List<DashboardOrderEvent> events = dashboardEventRepo.findByOrderIdOrderByTimestamp(orderId);
return DashboardOrderDetails.builder()
.orderId(orderId)
.timeline(events)
.currentStatus(deriveCurrentStatus(events))
.metrics(calculateDetailedMetrics(events))
.issues(detectIssues(events))
.build();
}
}Python Client-spezifische Aggregation:
from dataclasses import dataclass
from typing import List, Dict, Optional
import asyncio
import json
@dataclass
class MobileOrderItem:
order_id: str
status: str # Vereinfacht für Mobile
estimated_delivery: Optional[str]
tracking_code: Optional[str]
@dataclass
class DashboardOrderItem:
order_id: str
detailed_status: str
event_timeline: List[Dict]
metrics: Dict
current_issues: List[str]
class ClientSpecificAggregator:
def __init__(self):
self.mobile_projections = {}
self.dashboard_projections = {}
async def handle_event(self, event_data: dict):
event_type = event_data.get('eventType')
# Parallel-Aggregation für verschiedene Clients
await asyncio.gather(
self._update_mobile_projection(event_data),
self._update_dashboard_projection(event_data),
self._update_partner_projection(event_data)
)
async def _update_mobile_projection(self, event_data: dict):
"""Mobile: Nur kritische Status-Updates"""
order_id = event_data['data']['orderId']
event_type = event_data['eventType']
# Filter: Mobile interessiert sich nur für Status-Änderungen
if event_type in ['OrderPlaced', 'PaymentProcessed', 'OrderShipped', 'OrderDelivered']:
mobile_item = self.mobile_projections.get(order_id,
MobileOrderItem(order_id=order_id, status='UNKNOWN',
estimated_delivery=None, tracking_code=None))
# Status-Vereinfachung für Mobile
if event_type == 'OrderPlaced':
mobile_item.status = 'PROCESSING'
elif event_type == 'PaymentProcessed':
mobile_item.status = 'PREPARING'
elif event_type == 'OrderShipped':
mobile_item.status = 'SHIPPED'
mobile_item.tracking_code = event_data['data'].get('trackingNumber')
self.mobile_projections[order_id] = mobile_item
async def _update_dashboard_projection(self, event_data: dict):
"""Dashboard: Vollständige Event-Historie"""
order_id = event_data['data']['orderId']
if order_id not in self.dashboard_projections:
self.dashboard_projections[order_id] = DashboardOrderItem(
order_id=order_id,
detailed_status='UNKNOWN',
event_timeline=[],
metrics={},
current_issues=[]
)
dashboard_item = self.dashboard_projections[order_id]
# Alle Events für Dashboard-Timeline
dashboard_item.event_timeline.append({
'type': event_data['eventType'],
'timestamp': event_data['timestamp'],
'data': event_data['data']
})
# Detaillierte Status-Verfolgung
dashboard_item.detailed_status = event_data['eventType']
# Issue-Detection
if 'Failed' in event_data['eventType']:
dashboard_item.current_issues.append(
f"{event_data['eventType']}: {event_data['data'].get('reason', 'Unknown')}"
)
def get_mobile_summary(self, customer_id: str) -> Dict:
"""Mobile-optimierte Antwort"""
customer_orders = [
order for order in self.mobile_projections.values()
if order.status != 'DELIVERED' # Nur aktive Bestellungen
]
return {
'customerId': customer_id,
'activeOrders': [
{
'orderId': order.order_id,
'status': order.status,
'trackingCode': order.tracking_code
}
for order in customer_orders[:5] # Max 5 für Mobile
],
'nextDelivery': self._calculate_next_delivery(customer_orders)
}
def get_dashboard_details(self, order_id: str) -> Dict:
"""Dashboard-detaillierte Antwort"""
dashboard_item = self.dashboard_projections.get(order_id)
if not dashboard_item:
return {'error': 'Order not found'}
return {
'orderId': order_id,
'currentStatus': dashboard_item.detailed_status,
'timeline': dashboard_item.event_timeline,
'metrics': self._calculate_metrics(dashboard_item.event_timeline),
'issues': dashboard_item.current_issues
}GraphQL ermöglicht es Clients, exakt die Daten anzufordern, die sie benötigen. In Kombination mit Event-Streams entstehen flexible, echtzeitfähige APIs.
Denken Sie an GraphQL als “Query-Sprache für Ihre Event-Aggregationen” - Clients definieren ihre Datenanforderungen, und der GraphQL-Resolver sammelt die entsprechenden Events.
Spring Boot GraphQL Schema für Event-basierte Daten:
// GraphQL Schema Definition
"""
type Order {
orderId: String!
customerId: String!
status: OrderStatus!
items: [OrderItem!]!
timeline: [OrderEvent!]!
estimatedDelivery: String
trackingInfo: TrackingInfo
}
type OrderEvent {
eventType: String!
timestamp: String!
data: JSON!
}
type Query {
order(orderId: String!): Order
customerOrders(customerId: String!, includeHistory: Boolean = false): [Order!]!
}
type Subscription {
orderUpdates(orderId: String!): Order!
}
"""
@Component
public class OrderGraphQLResolver implements GraphQLQueryResolver, GraphQLSubscriptionResolver {
private final OrderEventAggregator aggregator;
private final Flux<OrderUpdateEvent> orderUpdateStream;
// Query Resolver
public Order order(String orderId) {
return aggregator.buildOrderFromEvents(orderId);
}
public List<Order> customerOrders(String customerId, Boolean includeHistory) {
if (includeHistory) {
return aggregator.buildCustomerOrderHistoryFromEvents(customerId);
} else {
return aggregator.buildActiveCustomerOrdersFromEvents(customerId);
}
}
// Subscription Resolver für Real-time Updates
public Publisher<Order> orderUpdates(String orderId) {
return orderUpdateStream
.filter(event -> orderId.equals(event.getOrderId()))
.map(event -> aggregator.buildOrderFromEvents(event.getOrderId()));
}
}
@Service
public class OrderEventAggregator {
private final OrderEventRepository eventRepo;
public Order buildOrderFromEvents(String orderId) {
List<OrderEventEntity> events = eventRepo.findByOrderIdOrderByTimestamp(orderId);
return events.stream()
.collect(Order::new, this::applyEventToOrder, this::combineOrders);
}
private void applyEventToOrder(Order order, OrderEventEntity event) {
switch (event.getEventType()) {
case "OrderPlaced":
order.setOrderId(event.getOrderId());
order.setCustomerId(event.getCustomerId());
order.setStatus(OrderStatus.SUBMITTED);
break;
case "PaymentProcessed":
order.setStatus(OrderStatus.PAID);
break;
case "OrderShipped":
order.setStatus(OrderStatus.SHIPPED);
order.setTrackingInfo(extractTrackingInfo(event));
break;
}
// Timeline für GraphQL-Clients
order.getTimeline().add(OrderEvent.builder()
.eventType(event.getEventType())
.timestamp(event.getTimestamp())
.data(event.getData())
.build());
}
}GraphQL Subscription für Real-time Updates:
import strawberry
from typing import List, Optional, AsyncGenerator
import asyncio
@strawberry.type
class Order:
order_id: str
customer_id: str
status: str
timeline: List['OrderEvent']
tracking_info: Optional[str] = None
@strawberry.type
class OrderEvent:
event_type: str
timestamp: str
data: str
@strawberry.type
class Query:
@strawberry.field
async def order(self, order_id: str) -> Optional[Order]:
aggregator = OrderEventAggregator()
return await aggregator.build_order_from_events(order_id)
@strawberry.field
async def customer_orders(self, customer_id: str, include_history: bool = False) -> List[Order]:
aggregator = OrderEventAggregator()
if include_history:
return await aggregator.build_customer_order_history(customer_id)
else:
return await aggregator.build_active_customer_orders(customer_id)
@strawberry.type
class Subscription:
@strawberry.subscription
async def order_updates(self, order_id: str) -> AsyncGenerator[Order, None]:
"""Real-time Order Updates via GraphQL Subscription"""
event_stream = EventStreamListener()
async for event in event_stream.listen_to_order_events(order_id):
aggregator = OrderEventAggregator()
updated_order = await aggregator.build_order_from_events(order_id)
yield updated_order
class EventStreamListener:
async def listen_to_order_events(self, order_id: str):
# Kafka Consumer für Real-time Events
consumer = AIOKafkaConsumer(
'order.events.all.v1',
bootstrap_servers='localhost:9092',
group_id=f'graphql_subscription_{order_id}'
)
await consumer.start()
try:
async for message in consumer:
event_data = json.loads(message.value)
if event_data['data'].get('orderId') == order_id:
yield event_data
finally:
await consumer.stop()
# GraphQL Schema erstellen
schema = strawberry.Schema(
query=Query,
subscription=Subscription
)Event-getriebene Systeme ermöglichen natürliche Real-time Capabilities. Clients können sich für relevante Events anmelden und sofortige Updates erhalten.
WebSocket-basierte Real-time Updates:
@Controller
public class OrderWebSocketController {
private final SimpMessagingTemplate messagingTemplate;
@KafkaListener(topics = {"order.placed.v1", "order.shipped.v1", "payment.processed.v1"})
public void handleOrderEvent(String eventJson) {
ObjectMapper mapper = new ObjectMapper();
JsonNode event = mapper.readTree(eventJson);
String orderId = event.path("data").path("orderId").asText();
String customerId = event.path("data").path("customerId").asText();
// Client-spezifische Benachrichtigungen
OrderUpdateNotification notification = OrderUpdateNotification.builder()
.orderId(orderId)
.eventType(event.path("eventType").asText())
.timestamp(event.path("timestamp").asText())
.customerMessage(generateCustomerMessage(event))
.build();
// An spezifischen Customer senden
messagingTemplate.convertAndSendToUser(
customerId,
"/queue/order-updates",
notification
);
// An Admin-Dashboard senden
messagingTemplate.convertAndSend(
"/topic/admin/order-updates",
notification
);
}
private String generateCustomerMessage(JsonNode event) {
String eventType = event.path("eventType").asText();
return switch (eventType) {
case "OrderPlaced" -> "Ihre Bestellung wurde erfolgreich aufgegeben.";
case "PaymentProcessed" -> "Ihre Zahlung wurde bestätigt.";
case "OrderShipped" -> "Ihre Bestellung wurde versendet.";
default -> "Status Ihrer Bestellung wurde aktualisiert.";
};
}
}
@RestController
public class OrderSubscriptionController {
@PostMapping("/api/orders/{orderId}/subscribe")
public ResponseEntity<String> subscribeToOrderUpdates(
@PathVariable String orderId,
@RequestParam String webhookUrl) {
// Webhook-Subscription für externe Clients
WebhookSubscription subscription = WebhookSubscription.builder()
.orderId(orderId)
.webhookUrl(webhookUrl)
.subscribedAt(Instant.now())
.build();
subscriptionService.save(subscription);
return ResponseEntity.ok("Subscribed to order updates");
}
}Server-Sent Events (SSE) für Real-time Streams:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
class OrderUpdateStreamer:
def __init__(self):
self.active_connections = {}
async def stream_order_updates(self, customer_id: str):
"""SSE Stream für Customer-spezifische Updates"""
# Kafka Consumer für Real-time Events
consumer = AIOKafkaConsumer(
'order.events.customer.v1',
bootstrap_servers='localhost:9092',
group_id=f'sse_stream_{customer_id}'
)
await consumer.start()
try:
async for message in consumer:
event_data = json.loads(message.value)
# Filter für spezifischen Customer
if event_data['data'].get('customerId') == customer_id:
# Customer-freundliche Nachricht formatieren
sse_message = self._format_sse_message(event_data)
yield f"data: {json.dumps(sse_message)}\n\n"
# Kleine Pause um Client nicht zu überlasten
await asyncio.sleep(0.1)
finally:
await consumer.stop()
def _format_sse_message(self, event_data: dict) -> dict:
event_type = event_data['eventType']
order_id = event_data['data']['orderId']
return {
'type': 'order_update',
'orderId': order_id,
'message': self._get_customer_message(event_type),
'timestamp': event_data['timestamp'],
'data': {
'status': self._simplify_status(event_type),
'orderId': order_id
}
}
def _get_customer_message(self, event_type: str) -> str:
messages = {
'OrderPlaced': 'Ihre Bestellung wurde aufgegeben',
'PaymentProcessed': 'Zahlung bestätigt',
'OrderShipped': 'Bestellung versendet',
'OrderDelivered': 'Bestellung zugestellt'
}
return messages.get(event_type, 'Bestellstatus aktualisiert')
@app.get("/api/customers/{customer_id}/order-stream")
async def stream_customer_orders(customer_id: str):
"""Server-Sent Events Endpoint für Real-time Updates"""
streamer = OrderUpdateStreamer()
return StreamingResponse(
streamer.stream_order_updates(customer_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*"
}
)
# WebSocket Alternative für bidirektionale Kommunikation
@app.websocket("/ws/orders/{customer_id}")
async def websocket_order_updates(websocket: WebSocket, customer_id: str):
await websocket.accept()
consumer = AIOKafkaConsumer(
'order.events.customer.v1',
bootstrap_servers='localhost:9092',
group_id=f'websocket_{customer_id}'
)
await consumer.start()
try:
async for message in consumer:
event_data = json.loads(message.value)
if event_data['data'].get('customerId') == customer_id:
await websocket.send_json({
'type': 'order_update',
'orderId': event_data['data']['orderId'],
'status': event_data['eventType'],
'timestamp': event_data['timestamp']
})
except Exception as e:
print(f"WebSocket error: {e}")
finally:
await consumer.stop()
await websocket.close()Praktische Überlegungen für Real-time BFF:
| Update-Mechanismus | Latenz | Client-Komplexität | Skalierbarkeit | Use Case |
|---|---|---|---|---|
| WebSocket | <100ms | Mittel | Gut | Interactive Dashboards |
| Server-Sent Events | <500ms | Niedrig | Sehr gut | Status Updates |
| GraphQL Subscriptions | <200ms | Niedrig | Gut | Flexible Queries |
| Webhook Callbacks | >1s | Sehr niedrig | Exzellent | System Integration |
Backend-for-Frontend Patterns mit Event-Streams ermöglichen es, die richtige Balance zwischen Client-Spezifität und System-Effizienz zu finden. Jeder Client erhält genau die Daten, die er benötigt, im Format und mit der Aktualität, die optimal für seinen Anwendungsfall ist.
Welche Client-Typen bedienen Sie, und welche unterschiedlichen Anforderungen haben diese an Ihre Event-Daten?