55 Backend-for-Frontend (BFF) Patterns

Different clients have different requirements for the same business data. A mobile app needs compact JSON responses, an admin dashboard needs detailed metrics, and an external partner system needs structured XML feeds. How do you aggregate events intelligently for each client type without building redundant services?

Backend-for-Frontend patterns address this challenge with client-specific abstraction layers that tailor event streams to different consumers.

55.1 Client-specific Event Aggregation

Instead of one monolithic API, you build specialized services that aggregate events optimally for each client type. Each BFF understands the specific requirements of its client and aggregates only the data that is relevant to it.

Consider different client perspectives on order data:

| Client type   | Data needed            | Update frequency | Preferred format |
|---------------|------------------------|------------------|------------------|
| Mobile app    | Status, delivery time  | On status change | Compact JSON     |
| Web dashboard | Full history           | Real time        | Detailed JSON    |
| Partner API   | Logistics data         | Batch (hourly)   | XML/EDI          |
| Analytics     | Aggregated metrics     | Daily            | CSV/Parquet      |
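The contrast in the table can be made concrete with a small sketch: the same (hypothetical) OrderShipped event rendered once as compact mobile JSON and once as a partner XML fragment. The event shape and field names are illustrative assumptions, not a fixed contract.

```python
import json
import xml.etree.ElementTree as ET

# One OrderShipped event; field names are illustrative assumptions
event = {
    "eventType": "OrderShipped",
    "timestamp": "2024-05-01T10:15:00Z",
    "data": {
        "orderId": "ORD-1001",
        "customerId": "CUST-42",
        "carrier": "DHL",
        "trackingNumber": "JD014600003",
        "estimatedDelivery": "2024-05-03",
        "warehouseId": "WH-7",            # internal detail, not for clients
        "pickingBatch": "BATCH-2024-118"  # internal detail, not for clients
    }
}

def to_mobile_json(event: dict) -> str:
    """Mobile app: compact JSON, internal details stripped."""
    d = event["data"]
    return json.dumps({
        "orderId": d["orderId"],
        "status": "SHIPPED",
        "eta": d["estimatedDelivery"],
        "tracking": d["trackingNumber"],
    })

def to_partner_xml(event: dict) -> str:
    """Partner API: logistics data only, as XML."""
    d = event["data"]
    root = ET.Element("Shipment", orderId=d["orderId"])
    ET.SubElement(root, "Carrier").text = d["carrier"]
    ET.SubElement(root, "TrackingNumber").text = d["trackingNumber"]
    ET.SubElement(root, "EstimatedDelivery").text = d["estimatedDelivery"]
    return ET.tostring(root, encoding="unicode")

print(to_mobile_json(event))
print(to_partner_xml(event))
```

The point is not the serialization itself but the asymmetry: each renderer drops everything its client does not need, so internal fields such as "warehouseId" never leak to any consumer.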

A Spring Boot mobile BFF implementation:

@RestController
@RequestMapping("/mobile/api")
public class MobileBffController {
    
    private final MobileOrderAggregator orderAggregator;
    
    public MobileBffController(MobileOrderAggregator orderAggregator) {
        this.orderAggregator = orderAggregator;
    }
    
    @GetMapping("/orders/{customerId}")
    public ResponseEntity<MobileOrderSummary> getCustomerOrders(
            @PathVariable String customerId) {
        
        MobileOrderSummary summary = orderAggregator.aggregateForMobile(customerId);
        return ResponseEntity.ok(summary);
    }
}

@Service
public class MobileOrderAggregator {
    
    private final OrderEventRepository eventRepo;
    private final ObjectMapper mapper = new ObjectMapper();
    
    public MobileOrderAggregator(OrderEventRepository eventRepo) {
        this.eventRepo = eventRepo;
    }
    
    @KafkaListener(topics = {
        "order.placed.v1", 
        "payment.processed.v1", 
        "order.shipped.v1"
    })
    public void updateMobileProjection(String eventJson) throws JsonProcessingException {
        // Mobile-specific projection: only critical status updates
        JsonNode event = mapper.readTree(eventJson);
        
        String eventType = event.path("eventType").asText();
        String orderId = event.path("data").path("orderId").asText();
        
        switch (eventType) {
            case "OrderPlaced":
                createMobileOrderEntry(event);
                break;
            case "PaymentProcessed":
                updateMobileOrderStatus(orderId, "PAID");
                break;
            case "OrderShipped":
                updateMobileOrderWithTracking(orderId, event);
                break;
            // Ignore detail events such as "InventoryReserved" for mobile
        }
    }
    
    public MobileOrderSummary aggregateForMobile(String customerId) {
        // Optimized for mobile: minimal data, maximum relevance
        List<MobileOrderItem> orders = eventRepo.findActiveOrdersByCustomer(customerId)
            .stream()
            .map(this::toMobileFormat)
            .collect(Collectors.toList());
            
        return MobileOrderSummary.builder()
            .customerId(customerId)
            .activeOrders(orders)
            .nextDelivery(calculateNextDelivery(orders))
            .build();
    }
    
    private MobileOrderItem toMobileFormat(OrderEvent event) {
        return MobileOrderItem.builder()
            .orderId(event.getOrderId())
            .status(simplifyStatusForMobile(event.getStatus()))
            .estimatedDelivery(event.getEstimatedDelivery())
            .trackingCode(event.getTrackingNumber())
            .build();
    }
    
    private String simplifyStatusForMobile(String detailedStatus) {
        // Mobile needs only 4 states instead of the 15 internal ones
        return switch (detailedStatus) {
            case "SUBMITTED", "PAYMENT_PENDING" -> "PROCESSING";
            case "PAID", "INVENTORY_RESERVED", "PICKING" -> "PREPARING";
            case "SHIPPED", "IN_TRANSIT" -> "SHIPPED";
            case "DELIVERED" -> "DELIVERED";
            default -> "UNKNOWN";
        };
    }
}

A dashboard BFF for detailed views:

@Service
public class DashboardOrderAggregator {
    
    private final DashboardEventRepository dashboardEventRepo;
    private final ObjectMapper mapper = new ObjectMapper();
    
    public DashboardOrderAggregator(DashboardEventRepository dashboardEventRepo) {
        this.dashboardEventRepo = dashboardEventRepo;
    }
    
    @KafkaListener(topics = "order.events.all.v1") // all events
    public void updateDashboardProjection(String eventJson) throws JsonProcessingException {
        // The dashboard needs the complete event history
        JsonNode event = mapper.readTree(eventJson);
        
        DashboardOrderEvent dashboardEvent = DashboardOrderEvent.builder()
            .orderId(event.path("data").path("orderId").asText())
            .eventType(event.path("eventType").asText())
            .timestamp(event.path("timestamp").asText())
            .details(event.path("data").toString()) // full details
            .build();
            
        dashboardEventRepo.save(dashboardEvent);
    }
    
    public DashboardOrderDetails aggregateForDashboard(String orderId) {
        List<DashboardOrderEvent> events = dashboardEventRepo.findByOrderIdOrderByTimestamp(orderId);
        
        return DashboardOrderDetails.builder()
            .orderId(orderId)
            .timeline(events)
            .currentStatus(deriveCurrentStatus(events))
            .metrics(calculateDetailedMetrics(events))
            .issues(detectIssues(events))
            .build();
    }
}

Client-specific aggregation in Python:

from dataclasses import dataclass
from typing import List, Dict, Optional
import asyncio
import json

@dataclass
class MobileOrderItem:
    order_id: str
    status: str  # simplified for mobile
    estimated_delivery: Optional[str]
    tracking_code: Optional[str]

@dataclass
class DashboardOrderItem:
    order_id: str
    detailed_status: str
    event_timeline: List[Dict]
    metrics: Dict
    current_issues: List[str]

class ClientSpecificAggregator:
    def __init__(self):
        self.mobile_projections = {}
        self.dashboard_projections = {}
        self.partner_batch = []  # shipping events collected for the hourly partner export
    
    async def handle_event(self, event_data: dict):
        # Fan the event out to all client-specific projections in parallel
        await asyncio.gather(
            self._update_mobile_projection(event_data),
            self._update_dashboard_projection(event_data),
            self._update_partner_projection(event_data)
        )
    
    async def _update_partner_projection(self, event_data: dict):
        """Partner API: only logistics-relevant events, exported in hourly batches."""
        if event_data['eventType'] in ('OrderShipped', 'OrderDelivered'):
            self.partner_batch.append(event_data)
    
    async def _update_mobile_projection(self, event_data: dict):
        """Mobile: only critical status updates."""
        order_id = event_data['data']['orderId']
        event_type = event_data['eventType']
        
        # Filter: mobile clients only care about status changes
        if event_type in ['OrderPlaced', 'PaymentProcessed', 'OrderShipped', 'OrderDelivered']:
            mobile_item = self.mobile_projections.get(order_id, 
                MobileOrderItem(order_id=order_id, status='UNKNOWN', 
                               estimated_delivery=None, tracking_code=None))
            
            # Simplified status mapping for mobile
            if event_type == 'OrderPlaced':
                mobile_item.status = 'PROCESSING'
            elif event_type == 'PaymentProcessed':
                mobile_item.status = 'PREPARING'
            elif event_type == 'OrderShipped':
                mobile_item.status = 'SHIPPED'
                mobile_item.tracking_code = event_data['data'].get('trackingNumber')
            elif event_type == 'OrderDelivered':
                mobile_item.status = 'DELIVERED'
            
            self.mobile_projections[order_id] = mobile_item
    
    async def _update_dashboard_projection(self, event_data: dict):
        """Dashboard: Vollständige Event-Historie"""
        order_id = event_data['data']['orderId']
        
        if order_id not in self.dashboard_projections:
            self.dashboard_projections[order_id] = DashboardOrderItem(
                order_id=order_id,
                detailed_status='UNKNOWN',
                event_timeline=[],
                metrics={},
                current_issues=[]
            )
        
        dashboard_item = self.dashboard_projections[order_id]
        
        # All events go into the dashboard timeline
        dashboard_item.event_timeline.append({
            'type': event_data['eventType'],
            'timestamp': event_data['timestamp'],
            'data': event_data['data']
        })
        
        # Detailed status tracking
        dashboard_item.detailed_status = event_data['eventType']
        
        # Issue detection
        if 'Failed' in event_data['eventType']:
            dashboard_item.current_issues.append(
                f"{event_data['eventType']}: {event_data['data'].get('reason', 'Unknown')}"
            )
    
    def get_mobile_summary(self, customer_id: str) -> Dict:
        """Mobile-optimized response."""
        # Note: this sketch keys projections by order only; a real system would
        # also key them by customer so this filter can actually use customer_id
        customer_orders = [
            order for order in self.mobile_projections.values()
            if order.status != 'DELIVERED'  # active orders only
        ]
        
        return {
            'customerId': customer_id,
            'activeOrders': [
                {
                    'orderId': order.order_id,
                    'status': order.status,
                    'trackingCode': order.tracking_code
                }
                for order in customer_orders[:5]  # at most 5 orders for mobile
            ],
            'nextDelivery': self._calculate_next_delivery(customer_orders)
        }
    
    def get_dashboard_details(self, order_id: str) -> Dict:
        """Detailed dashboard response."""
        dashboard_item = self.dashboard_projections.get(order_id)
        if not dashboard_item:
            return {'error': 'Order not found'}
        
        return {
            'orderId': order_id,
            'currentStatus': dashboard_item.detailed_status,
            'timeline': dashboard_item.event_timeline,
            'metrics': self._calculate_metrics(dashboard_item.event_timeline),
            'issues': dashboard_item.current_issues
        }
    
    def _calculate_metrics(self, timeline: List[Dict]) -> Dict:
        # Placeholder metric; real implementations would compute durations etc.
        return {'eventCount': len(timeline)}
    
    def _calculate_next_delivery(self, orders: List[MobileOrderItem]) -> Optional[str]:
        deliveries = [o.estimated_delivery for o in orders if o.estimated_delivery]
        return min(deliveries) if deliveries else None

55.2 GraphQL Integration

GraphQL lets clients request exactly the data they need. Combined with event streams, this yields flexible, real-time-capable APIs.

Think of GraphQL as a "query language for your event aggregations": clients declare their data requirements, and the GraphQL resolvers collect the corresponding events.
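For illustration, two example queries against an order schema like the one in this section (the TrackingInfo fields are assumptions, since that type is not spelled out here): the mobile client asks for a minimal status view, the dashboard for the full timeline.

```graphql
# Mobile client: only the fields needed for a status screen
query MobileOrder {
  order(orderId: "ORD-1001") {
    orderId
    status
    estimatedDelivery
  }
}

# Dashboard client: full timeline and tracking details
query DashboardOrder {
  order(orderId: "ORD-1001") {
    orderId
    status
    trackingInfo { carrier trackingNumber }
    timeline { eventType timestamp data }
  }
}
```

Both queries hit the same endpoint and the same resolvers; the per-client shaping that dedicated BFF services do in code is expressed here by the query itself.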

A Spring Boot GraphQL schema for event-based data:

# GraphQL schema definition
type Order {
  orderId: String!
  customerId: String!
  status: OrderStatus!
  items: [OrderItem!]!
  timeline: [OrderEvent!]!
  estimatedDelivery: String
  trackingInfo: TrackingInfo
}

type OrderEvent {
  eventType: String!
  timestamp: String!
  data: JSON!
}

type Query {
  order(orderId: String!): Order
  customerOrders(customerId: String!, includeHistory: Boolean = false): [Order!]!
}

type Subscription {
  orderUpdates(orderId: String!): Order!
}

@Component
public class OrderGraphQLResolver implements GraphQLQueryResolver, GraphQLSubscriptionResolver {
    
    private final OrderEventAggregator aggregator;
    private final Flux<OrderUpdateEvent> orderUpdateStream;
    
    public OrderGraphQLResolver(OrderEventAggregator aggregator,
                                Flux<OrderUpdateEvent> orderUpdateStream) {
        this.aggregator = aggregator;
        this.orderUpdateStream = orderUpdateStream;
    }
    
    // Query Resolver
    public Order order(String orderId) {
        return aggregator.buildOrderFromEvents(orderId);
    }
    
    public List<Order> customerOrders(String customerId, Boolean includeHistory) {
        if (includeHistory) {
            return aggregator.buildCustomerOrderHistoryFromEvents(customerId);
        } else {
            return aggregator.buildActiveCustomerOrdersFromEvents(customerId);
        }
    }
    
    // Subscription resolver for real-time updates
    public Publisher<Order> orderUpdates(String orderId) {
        return orderUpdateStream
            .filter(event -> orderId.equals(event.getOrderId()))
            .map(event -> aggregator.buildOrderFromEvents(event.getOrderId()));
    }
}

@Service
public class OrderEventAggregator {
    
    private final OrderEventRepository eventRepo;
    
    public OrderEventAggregator(OrderEventRepository eventRepo) {
        this.eventRepo = eventRepo;
    }
    
    public Order buildOrderFromEvents(String orderId) {
        List<OrderEventEntity> events = eventRepo.findByOrderIdOrderByTimestamp(orderId);
        
        // Fold the ordered event history into the current order state
        Order order = new Order();
        events.forEach(event -> applyEventToOrder(order, event));
        return order;
    }
    
    private void applyEventToOrder(Order order, OrderEventEntity event) {
        switch (event.getEventType()) {
            case "OrderPlaced":
                order.setOrderId(event.getOrderId());
                order.setCustomerId(event.getCustomerId());
                order.setStatus(OrderStatus.SUBMITTED);
                break;
            case "PaymentProcessed":
                order.setStatus(OrderStatus.PAID);
                break;
            case "OrderShipped":
                order.setStatus(OrderStatus.SHIPPED);
                order.setTrackingInfo(extractTrackingInfo(event));
                break;
        }
        
        // Timeline for GraphQL clients
        order.getTimeline().add(OrderEvent.builder()
            .eventType(event.getEventType())
            .timestamp(event.getTimestamp())
            .data(event.getData())
            .build());
    }
}
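The same "fold the event history into current state" idea can be sketched independently of Spring; here is a minimal Python version using the event names from this chapter (the exact shape of the event dictionaries is an assumption):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Order:
    order_id: Optional[str] = None
    customer_id: Optional[str] = None
    status: str = "UNKNOWN"
    timeline: List[dict] = field(default_factory=list)
    tracking_number: Optional[str] = None

def apply_event(order: Order, event: dict) -> Order:
    """Apply a single event to the order state (one fold step)."""
    data = event["data"]
    if event["eventType"] == "OrderPlaced":
        order.order_id = data["orderId"]
        order.customer_id = data["customerId"]
        order.status = "SUBMITTED"
    elif event["eventType"] == "PaymentProcessed":
        order.status = "PAID"
    elif event["eventType"] == "OrderShipped":
        order.status = "SHIPPED"
        order.tracking_number = data.get("trackingNumber")
    order.timeline.append(event)  # keep the full history for GraphQL clients
    return order

def build_order_from_events(events: List[dict]) -> Order:
    """Reconstruct current state by folding over the ordered event history."""
    order = Order()
    for event in events:
        apply_event(order, event)
    return order

events = [
    {"eventType": "OrderPlaced", "data": {"orderId": "ORD-1", "customerId": "C-9"}},
    {"eventType": "PaymentProcessed", "data": {"orderId": "ORD-1"}},
    {"eventType": "OrderShipped", "data": {"orderId": "ORD-1", "trackingNumber": "T-77"}},
]
order = build_order_from_events(events)
print(order.status, order.tracking_number)  # SHIPPED T-77
```

Because the fold is a pure function of the event list, replaying the same history always yields the same state, which is what makes event-sourced projections easy to rebuild.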

GraphQL subscriptions for real-time updates (Python, Strawberry):

import strawberry
from typing import List, Optional, AsyncGenerator
from aiokafka import AIOKafkaConsumer
import json

@strawberry.type
class Order:
    order_id: str
    customer_id: str
    status: str
    timeline: List['OrderEvent']
    tracking_info: Optional[str] = None

@strawberry.type
class OrderEvent:
    event_type: str
    timestamp: str
    data: str

@strawberry.type
class Query:
    @strawberry.field
    async def order(self, order_id: str) -> Optional[Order]:
        aggregator = OrderEventAggregator()
        return await aggregator.build_order_from_events(order_id)
    
    @strawberry.field
    async def customer_orders(self, customer_id: str, include_history: bool = False) -> List[Order]:
        aggregator = OrderEventAggregator()
        if include_history:
            return await aggregator.build_customer_order_history(customer_id)
        else:
            return await aggregator.build_active_customer_orders(customer_id)

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def order_updates(self, order_id: str) -> AsyncGenerator[Order, None]:
        """Real-time Order Updates via GraphQL Subscription"""
        event_stream = EventStreamListener()
        
        async for event in event_stream.listen_to_order_events(order_id):
            aggregator = OrderEventAggregator()
            updated_order = await aggregator.build_order_from_events(order_id)
            yield updated_order

class EventStreamListener:
    async def listen_to_order_events(self, order_id: str):
        # Kafka consumer for real-time events
        consumer = AIOKafkaConsumer(
            'order.events.all.v1',
            bootstrap_servers='localhost:9092',
            group_id=f'graphql_subscription_{order_id}'
        )
        
        await consumer.start()
        try:
            async for message in consumer:
                event_data = json.loads(message.value)
                if event_data['data'].get('orderId') == order_id:
                    yield event_data
        finally:
            await consumer.stop()

# Build the GraphQL schema
schema = strawberry.Schema(
    query=Query,
    subscription=Subscription
)

55.3 Real-time Updates

Event-driven systems enable real-time capabilities naturally. Clients can subscribe to the events they care about and receive immediate updates.

WebSocket-based real-time updates:

@Controller
public class OrderWebSocketController {
    
    private final SimpMessagingTemplate messagingTemplate;
    private final ObjectMapper mapper = new ObjectMapper();
    
    public OrderWebSocketController(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }
    
    @KafkaListener(topics = {"order.placed.v1", "order.shipped.v1", "payment.processed.v1"})
    public void handleOrderEvent(String eventJson) throws JsonProcessingException {
        JsonNode event = mapper.readTree(eventJson);
        
        String orderId = event.path("data").path("orderId").asText();
        String customerId = event.path("data").path("customerId").asText();
        
        // Client-specific notifications
        OrderUpdateNotification notification = OrderUpdateNotification.builder()
            .orderId(orderId)
            .eventType(event.path("eventType").asText())
            .timestamp(event.path("timestamp").asText())
            .customerMessage(generateCustomerMessage(event))
            .build();
        
        // Send to the specific customer
        messagingTemplate.convertAndSendToUser(
            customerId, 
            "/queue/order-updates", 
            notification
        );
        
        // Send to the admin dashboard
        messagingTemplate.convertAndSend(
            "/topic/admin/order-updates",
            notification
        );
    }
    
    private String generateCustomerMessage(JsonNode event) {
        String eventType = event.path("eventType").asText();
        return switch (eventType) {
            case "OrderPlaced" -> "Ihre Bestellung wurde erfolgreich aufgegeben.";
            case "PaymentProcessed" -> "Ihre Zahlung wurde bestätigt.";
            case "OrderShipped" -> "Ihre Bestellung wurde versendet.";
            default -> "Status Ihrer Bestellung wurde aktualisiert.";
        };
    }
}

@RestController
public class OrderSubscriptionController {
    
    private final WebhookSubscriptionService subscriptionService;
    
    public OrderSubscriptionController(WebhookSubscriptionService subscriptionService) {
        this.subscriptionService = subscriptionService;
    }
    
    @PostMapping("/api/orders/{orderId}/subscribe")
    public ResponseEntity<String> subscribeToOrderUpdates(
            @PathVariable String orderId,
            @RequestParam String webhookUrl) {
        
        // Webhook subscription for external clients
        WebhookSubscription subscription = WebhookSubscription.builder()
            .orderId(orderId)
            .webhookUrl(webhookUrl)
            .subscribedAt(Instant.now())
            .build();
            
        subscriptionService.save(subscription);
        
        return ResponseEntity.ok("Subscribed to order updates");
    }
}
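On the delivery side, a dispatcher would look up the subscriptions for an event's order and POST the notification to each registered URL. The following is a minimal, transport-agnostic Python sketch; the class name and the injected send callable are assumptions, and a production version would add retries, signatures, and timeouts:

```python
import json
from typing import Callable, Dict, List

class WebhookDispatcher:
    """Delivers order events to registered webhook URLs.

    The HTTP call is injected so the dispatcher stays testable; in
    production, `send` might wrap an HTTP client's POST with retries.
    """

    def __init__(self, send: Callable[[str, str], None]):
        self.send = send
        self.subscriptions: Dict[str, List[str]] = {}  # orderId -> webhook URLs

    def subscribe(self, order_id: str, webhook_url: str) -> None:
        self.subscriptions.setdefault(order_id, []).append(webhook_url)

    def handle_event(self, event: dict) -> int:
        """Fan the event out to all subscribers; returns the delivery count."""
        order_id = event["data"]["orderId"]
        payload = json.dumps({
            "orderId": order_id,
            "eventType": event["eventType"],
        })
        urls = self.subscriptions.get(order_id, [])
        for url in urls:
            self.send(url, payload)  # real code adds retries and signing
        return len(urls)

# Usage with a fake sender that just records deliveries
delivered = []
dispatcher = WebhookDispatcher(send=lambda url, body: delivered.append((url, body)))
dispatcher.subscribe("ORD-1", "https://partner.example/hooks/orders")
dispatcher.handle_event({"eventType": "OrderShipped", "data": {"orderId": "ORD-1"}})
print(delivered)
```

Injecting the sender keeps the fan-out logic independent of any HTTP library, which is also what makes the high scalability of webhook callbacks easy to reason about: delivery is just a loop over stored subscriptions.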

Server-Sent Events (SSE) for real-time streams:

from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
from aiokafka import AIOKafkaConsumer
import asyncio
import json

app = FastAPI()

class OrderUpdateStreamer:
    def __init__(self):
        self.active_connections = {}
    
    async def stream_order_updates(self, customer_id: str):
        """SSE stream of customer-specific updates."""
        
        # Kafka consumer for real-time events
        consumer = AIOKafkaConsumer(
            'order.events.customer.v1',
            bootstrap_servers='localhost:9092',
            group_id=f'sse_stream_{customer_id}'
        )
        
        await consumer.start()
        
        try:
            async for message in consumer:
                event_data = json.loads(message.value)
                
                # Filter for the specific customer
                if event_data['data'].get('customerId') == customer_id:
                    
                    # Format a customer-friendly message
                    sse_message = self._format_sse_message(event_data)
                    yield f"data: {json.dumps(sse_message)}\n\n"
                    
                    # Brief pause so we don't overwhelm the client
                    await asyncio.sleep(0.1)
                    
        finally:
            await consumer.stop()
    
    def _format_sse_message(self, event_data: dict) -> dict:
        event_type = event_data['eventType']
        order_id = event_data['data']['orderId']
        
        return {
            'type': 'order_update',
            'orderId': order_id,
            'message': self._get_customer_message(event_type),
            'timestamp': event_data['timestamp'],
            'data': {
                'status': self._simplify_status(event_type),
                'orderId': order_id
            }
        }
    
    def _get_customer_message(self, event_type: str) -> str:
        messages = {
            'OrderPlaced': 'Your order has been placed',
            'PaymentProcessed': 'Payment confirmed',
            'OrderShipped': 'Order shipped',
            'OrderDelivered': 'Order delivered'
        }
        return messages.get(event_type, 'Order status updated')
    
    def _simplify_status(self, event_type: str) -> str:
        # Same four-state simplification the mobile BFF uses
        status_map = {
            'OrderPlaced': 'PROCESSING',
            'PaymentProcessed': 'PREPARING',
            'OrderShipped': 'SHIPPED',
            'OrderDelivered': 'DELIVERED'
        }
        return status_map.get(event_type, 'UNKNOWN')

@app.get("/api/customers/{customer_id}/order-stream")
async def stream_customer_orders(customer_id: str):
    """Server-Sent Events Endpoint für Real-time Updates"""
    
    streamer = OrderUpdateStreamer()
    
    return StreamingResponse(
        streamer.stream_order_updates(customer_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*"
        }
    )

# WebSocket alternative for bidirectional communication
@app.websocket("/ws/orders/{customer_id}")
async def websocket_order_updates(websocket: WebSocket, customer_id: str):
    await websocket.accept()
    
    consumer = AIOKafkaConsumer(
        'order.events.customer.v1',
        bootstrap_servers='localhost:9092',
        group_id=f'websocket_{customer_id}'
    )
    
    await consumer.start()
    
    try:
        async for message in consumer:
            event_data = json.loads(message.value)
            
            if event_data['data'].get('customerId') == customer_id:
                await websocket.send_json({
                    'type': 'order_update',
                    'orderId': event_data['data']['orderId'],
                    'status': event_data['eventType'],
                    'timestamp': event_data['timestamp']
                })
                
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        await consumer.stop()
        await websocket.close()
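On the consuming side, the "data: ...\n\n" frames produced above are straightforward to parse. A minimal client-side parser sketch for the SSE wire format (a complete SSE parser also handles "event:", "id:", and multi-line "data:" fields, which this sketch omits):

```python
import json
from typing import List

def parse_sse_stream(raw: str) -> List[dict]:
    """Parse `data:` frames from an SSE text stream into JSON payloads."""
    events = []
    for frame in raw.split("\n\n"):      # frames are separated by a blank line
        for line in frame.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

# Two frames as the streamer above would emit them
stream = (
    'data: {"type": "order_update", "orderId": "ORD-1", "message": "Order shipped"}\n\n'
    'data: {"type": "order_update", "orderId": "ORD-1", "message": "Order delivered"}\n\n'
)
updates = parse_sse_stream(stream)
print([u["message"] for u in updates])  # ['Order shipped', 'Order delivered']
```

This simplicity on the client is exactly why SSE scores low on client complexity in the comparison that follows: a browser gets it for free via EventSource, and even hand-rolled parsing is a few lines.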

Practical considerations for real-time BFFs:

| Update mechanism      | Latency | Client complexity | Scalability | Use case               |
|-----------------------|---------|-------------------|-------------|------------------------|
| WebSocket             | <100 ms | Medium            | Good        | Interactive dashboards |
| Server-Sent Events    | <500 ms | Low               | Very good   | Status updates         |
| GraphQL subscriptions | <200 ms | Low               | Good        | Flexible queries       |
| Webhook callbacks     | >1 s    | Very low          | Excellent   | System integration     |

Backend-for-Frontend patterns combined with event streams let you strike the right balance between client specificity and system efficiency. Each client receives exactly the data it needs, in the format and at the freshness that best fits its use case.

Which client types do you serve, and what different requirements do they place on your event data?