In der Praxis treffen event-getriebene Systeme regelmäßig auf traditionelle synchrone Architekturen. Clients erwarten oft sofortige Antworten, während Backend-Services asynchron über Events kommunizieren. Diese scheinbare Inkompatibilität lässt sich durch bewährte Integrationsmuster elegant auflösen.
Event-getriebene Systeme bieten Entkopplung und Skalierbarkeit, aber
externe Clients benötigen oft unmittelbare Antworten. Ein typisches
Szenario: Ein Kunde möchte seinen Bestellstatus abfragen, während das
Backend über Events wie OrderPlaced,
PaymentProcessed und OrderShipped
kommuniziert.
| Synchroner Client | Asynchrones Backend | Herausforderung |
|---|---|---|
| GET /orders/123 | Events in Kafka Topics | Sofortige Antwort aus Event-Stream |
| POST /orders | Workflow über mehrere Services | Status-Feedback bei async. Verarbeitung |
| PUT /orders/123 | Eventual Consistency | Konsistente Lese-/Schreiboperationen |
Ein API Gateway fungiert als Übersetzer zwischen synchronen und asynchronen Welten. Es nimmt REST-Requests entgegen und orchestriert die Kommunikation mit event-getriebenen Backend-Services.
Spring Boot API Gateway Implementierung:
@RestController
@RequestMapping("/api/orders")
public class OrderApiController {
private final OrderEventService orderEventService;
private final OrderQueryService orderQueryService;
@PostMapping
public ResponseEntity<OrderResponse> createOrder(@RequestBody OrderRequest request) {
// Event für asynchrone Verarbeitung publizieren
String orderId = UUID.randomUUID().toString();
OrderPlacedEvent event = new OrderPlacedEvent(
orderId,
request.getCustomerId(),
request.getItems(),
request.getTotalAmount()
);
orderEventService.publishOrderPlaced(event);
// Sofortige Antwort mit Tracking-Informationen
return ResponseEntity.accepted()
.body(new OrderResponse(orderId, "SUBMITTED",
"/api/orders/" + orderId + "/status"));
}
@GetMapping("/{orderId}")
public ResponseEntity<OrderDetails> getOrder(@PathVariable String orderId) {
// Materialized View aus Event-Stream lesen
OrderDetails details = orderQueryService.getOrderDetails(orderId);
return ResponseEntity.ok(details);
}
}
@Service
public class OrderEventService {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void publishOrderPlaced(OrderPlacedEvent event) {
kafkaTemplate.send("order.placed.v1", event.getOrderId(), event);
}
}Python FastAPI Äquivalent:
from fastapi import FastAPI, HTTPException
from confluent_kafka import Producer
import json
import uuid
from datetime import datetime
app = FastAPI()
class OrderApiService:
def __init__(self):
self.producer = Producer({
'bootstrap.servers': 'localhost:9092',
'client.id': 'order-api'
})
self.order_query = OrderQueryService()
async def create_order(self, order_request: dict):
order_id = str(uuid.uuid4())
# Event für asynchrone Verarbeitung
event = {
"eventId": str(uuid.uuid4()),
"eventType": "OrderPlaced",
"timestamp": datetime.now().isoformat(),
"data": {
"orderId": order_id,
"customerId": order_request["customerId"],
"items": order_request["items"],
"totalAmount": order_request["totalAmount"]
}
}
self.producer.produce(
topic='order.placed.v1',
key=order_id,
value=json.dumps(event)
)
self.producer.flush()
return {
"orderId": order_id,
"status": "SUBMITTED",
"statusUrl": f"/api/orders/{order_id}/status"
}
@app.post("/api/orders")
async def create_order(request: dict):
service = OrderApiService()
response = await service.create_order(request)
return responseManchmal müssen Events in synchrone Aufrufe umgewandelt werden, etwa bei der Integration von Legacy-Systemen. Das Gateway übersetzt Events in REST-Calls.
Pattern: Event-triggered API Call
@Component
public class EventToRequestBridge {
private final RestTemplate restTemplate;
@KafkaListener(topics = "payment.processed.v1")
public void handlePaymentProcessed(PaymentProcessedEvent event) {
// Event-Daten für Legacy-System aufbereiten
LegacyPaymentRequest request = LegacyPaymentRequest.builder()
.orderId(event.getOrderId())
.amount(event.getAmount())
.paymentMethod(event.getPaymentMethod())
.build();
try {
// Synchroner Aufruf an Legacy-System
ResponseEntity<String> response = restTemplate.postForEntity(
"http://legacy-billing/api/payments",
request,
String.class
);
if (response.getStatusCode().is2xxSuccessful()) {
// Erfolg als Event publizieren
publishBillingCompleted(event.getOrderId());
}
} catch (Exception e) {
// Fehler als Event publizieren
publishBillingFailed(event.getOrderId(), e.getMessage());
}
}
}Umgekehrt werden Antworten aus synchronen Systemen in Events transformiert, um sie in den event-getriebenen Fluss zu integrieren.
Pattern: Response-triggered Event
class ResponseToEventMapper:
def __init__(self, event_publisher):
self.event_publisher = event_publisher
self.http_client = httpx.AsyncClient()
async def call_external_service_and_emit_event(self, order_id: str):
try:
# Synchroner Call an externes System
response = await self.http_client.get(
f"http://external-shipping/api/rates/{order_id}"
)
if response.status_code == 200:
shipping_data = response.json()
# Response in Event transformieren
event = {
"eventType": "ShippingRateCalculated",
"orderId": order_id,
"shippingCost": shipping_data["cost"],
"estimatedDelivery": shipping_data["delivery_date"]
}
await self.event_publisher.publish(
topic="shipping.rate.calculated.v1",
event=event
)
except Exception as e:
# Fehler ebenfalls als Event behandeln
error_event = {
"eventType": "ShippingRateCalculationFailed",
"orderId": order_id,
"error": str(e)
}
await self.event_publisher.publish(
topic="shipping.rate.failed.v1",
event=error_event
)Correlation IDs für Request-Response-Event-Zyklen:
public class CorrelationService {
@PostMapping("/orders/{orderId}/expedite")
public ResponseEntity<String> expediteOrder(
@PathVariable String orderId,
@RequestHeader("X-Correlation-ID") String correlationId) {
// Event mit Correlation ID publizieren
ExpeditedShippingRequestedEvent event = ExpeditedShippingRequestedEvent.builder()
.orderId(orderId)
.correlationId(correlationId)
.requestedBy("API")
.build();
eventPublisher.publish("shipping.expedite.requested.v1", event);
return ResponseEntity.accepted()
.header("X-Correlation-ID", correlationId)
.body("Expedited shipping requested - track with correlation ID");
}
}Timeout-Handling bei async-to-sync Bridges:
| Szenario | Timeout | Fallback-Strategie |
|---|---|---|
| Payment-Verifikation | 5 Sekunden | Cached result oder “pending” |
| Inventory-Check | 2 Sekunden | Assume available mit async. Korrektur |
| Shipping-Rate | 10 Sekunden | Standard rate mit async. Update |
Diese Patterns ermöglichen es, die Vorteile beider Welten zu nutzen: die Responsiveness synchroner APIs und die Robustheit asynchroner Event-Systeme. Im nächsten Abschnitt betrachten wir, wie real-time Requests direkt aus Event-Streams bedient werden können.