53 Integration mit Request-Response Systemen

In der Praxis treffen event-getriebene Systeme regelmäßig auf traditionelle synchrone Architekturen. Clients erwarten oft sofortige Antworten, während Backend-Services asynchron über Events kommunizieren. Diese scheinbare Inkompatibilität lässt sich durch bewährte Integrationsmuster elegant auflösen.

53.1 Synchrone Schnittstellen in asynchronen Systemen

53.1.1 Das Spannungsfeld zwischen synchron und asynchron

Event-getriebene Systeme bieten Entkopplung und Skalierbarkeit, aber externe Clients benötigen oft unmittelbare Antworten. Ein typisches Szenario: Ein Kunde möchte seinen Bestellstatus abfragen, während das Backend über Events wie OrderPlaced, PaymentProcessed und OrderShipped kommuniziert.

Synchroner Client Asynchrones Backend Herausforderung
GET /orders/123 Events in Kafka Topics Sofortige Antwort aus Event-Stream
POST /orders Workflow über mehrere Services Status-Feedback bei async. Verarbeitung
PUT /orders/123 Eventual Consistency Konsistente Lese-/Schreiboperationen

53.1.2 API Gateway Patterns

Ein API Gateway fungiert als Übersetzer zwischen synchronen und asynchronen Welten. Es nimmt REST-Requests entgegen und orchestriert die Kommunikation mit event-getriebenen Backend-Services.

Spring Boot API Gateway Implementierung:

@RestController
@RequestMapping("/api/orders")
public class OrderApiController {
    
    private final OrderEventService orderEventService;
    private final OrderQueryService orderQueryService;
    
    @PostMapping
    public ResponseEntity<OrderResponse> createOrder(@RequestBody OrderRequest request) {
        // Event für asynchrone Verarbeitung publizieren
        String orderId = UUID.randomUUID().toString();
        OrderPlacedEvent event = new OrderPlacedEvent(
            orderId, 
            request.getCustomerId(), 
            request.getItems(),
            request.getTotalAmount()
        );
        
        orderEventService.publishOrderPlaced(event);
        
        // Sofortige Antwort mit Tracking-Informationen
        return ResponseEntity.accepted()
            .body(new OrderResponse(orderId, "SUBMITTED", 
                "/api/orders/" + orderId + "/status"));
    }
    
    @GetMapping("/{orderId}")
    public ResponseEntity<OrderDetails> getOrder(@PathVariable String orderId) {
        // Materialized View aus Event-Stream lesen
        OrderDetails details = orderQueryService.getOrderDetails(orderId);
        return ResponseEntity.ok(details);
    }
}

@Service
public class OrderEventService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public void publishOrderPlaced(OrderPlacedEvent event) {
        kafkaTemplate.send("order.placed.v1", event.getOrderId(), event);
    }
}

Python FastAPI Äquivalent:

from fastapi import FastAPI, HTTPException
from confluent_kafka import Producer
import json
import uuid
from datetime import datetime

app = FastAPI()

class OrderApiService:
    def __init__(self):
        self.producer = Producer({
            'bootstrap.servers': 'localhost:9092',
            'client.id': 'order-api'
        })
        self.order_query = OrderQueryService()
    
    async def create_order(self, order_request: dict):
        order_id = str(uuid.uuid4())
        
        # Event für asynchrone Verarbeitung
        event = {
            "eventId": str(uuid.uuid4()),
            "eventType": "OrderPlaced",
            "timestamp": datetime.now().isoformat(),
            "data": {
                "orderId": order_id,
                "customerId": order_request["customerId"],
                "items": order_request["items"],
                "totalAmount": order_request["totalAmount"]
            }
        }
        
        self.producer.produce(
            topic='order.placed.v1',
            key=order_id,
            value=json.dumps(event)
        )
        self.producer.flush()
        
        return {
            "orderId": order_id,
            "status": "SUBMITTED",
            "statusUrl": f"/api/orders/{order_id}/status"
        }

@app.post("/api/orders")
async def create_order(request: dict):
    service = OrderApiService()
    response = await service.create_order(request)
    return response

53.1.3 Event-to-Request Transformation

Manchmal müssen Events in synchrone Aufrufe umgewandelt werden, etwa bei der Integration von Legacy-Systemen. Das Gateway übersetzt Events in REST-Calls.

Pattern: Event-triggered API Call

@Component
public class EventToRequestBridge {
    
    private final RestTemplate restTemplate;
    
    @KafkaListener(topics = "payment.processed.v1")
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        // Event-Daten für Legacy-System aufbereiten
        LegacyPaymentRequest request = LegacyPaymentRequest.builder()
            .orderId(event.getOrderId())
            .amount(event.getAmount())
            .paymentMethod(event.getPaymentMethod())
            .build();
        
        try {
            // Synchroner Aufruf an Legacy-System
            ResponseEntity<String> response = restTemplate.postForEntity(
                "http://legacy-billing/api/payments", 
                request, 
                String.class
            );
            
            if (response.getStatusCode().is2xxSuccessful()) {
                // Erfolg als Event publizieren
                publishBillingCompleted(event.getOrderId());
            }
        } catch (Exception e) {
            // Fehler als Event publizieren
            publishBillingFailed(event.getOrderId(), e.getMessage());
        }
    }
}

53.1.4 Response-to-Event Mapping

Umgekehrt werden Antworten aus synchronen Systemen in Events transformiert, um sie in den event-getriebenen Fluss zu integrieren.

Pattern: Response-triggered Event

class ResponseToEventMapper:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher
        self.http_client = httpx.AsyncClient()
    
    async def call_external_service_and_emit_event(self, order_id: str):
        try:
            # Synchroner Call an externes System
            response = await self.http_client.get(
                f"http://external-shipping/api/rates/{order_id}"
            )
            
            if response.status_code == 200:
                shipping_data = response.json()
                
                # Response in Event transformieren
                event = {
                    "eventType": "ShippingRateCalculated",
                    "orderId": order_id,
                    "shippingCost": shipping_data["cost"],
                    "estimatedDelivery": shipping_data["delivery_date"]
                }
                
                await self.event_publisher.publish(
                    topic="shipping.rate.calculated.v1",
                    event=event
                )
                
        except Exception as e:
            # Fehler ebenfalls als Event behandeln
            error_event = {
                "eventType": "ShippingRateCalculationFailed",
                "orderId": order_id,
                "error": str(e)
            }
            await self.event_publisher.publish(
                topic="shipping.rate.failed.v1", 
                event=error_event
            )

53.1.5 Praktische Implementierungsmuster

Correlation IDs für Request-Response-Event-Zyklen:

public class CorrelationService {
    
    @PostMapping("/orders/{orderId}/expedite")
    public ResponseEntity<String> expediteOrder(
            @PathVariable String orderId,
            @RequestHeader("X-Correlation-ID") String correlationId) {
        
        // Event mit Correlation ID publizieren
        ExpeditedShippingRequestedEvent event = ExpeditedShippingRequestedEvent.builder()
            .orderId(orderId)
            .correlationId(correlationId)
            .requestedBy("API")
            .build();
            
        eventPublisher.publish("shipping.expedite.requested.v1", event);
        
        return ResponseEntity.accepted()
            .header("X-Correlation-ID", correlationId)
            .body("Expedited shipping requested - track with correlation ID");
    }
}

Timeout-Handling bei async-to-sync Bridges:

Szenario Timeout Fallback-Strategie
Payment-Verifikation 5 Sekunden Cached result oder “pending”
Inventory-Check 2 Sekunden Assume available mit async. Korrektur
Shipping-Rate 10 Sekunden Standard rate mit async. Update

Diese Patterns ermöglichen es, die Vorteile beider Welten zu nutzen: die Responsiveness synchroner APIs und die Robustheit asynchroner Event-Systeme. Im nächsten Abschnitt betrachten wir, wie real-time Requests direkt aus Event-Streams bedient werden können.