Die Transformation von synchronen Request-Response-Systemen zu asynchronen Event-getriebenen Architekturen erfordert Übergangsstrategien, die beide Paradigmen elegant verbinden. Diese Übergänge ermöglichen es, bestehende synchrone APIs schrittweise durch asynchrone Event-Streams zu ersetzen, ohne Breaking Changes für Clients zu verursachen.
Die Überführung synchroner REST-APIs in Event-getriebene Workflows erfordert die Auflösung des direkten Request-Response-Zyklus in separate Publish- und Subscribe-Operationen.
Ein API-Gateway kann synchrone Requests in asynchrone Event-Workflows überführen und dennoch synchrone Responses an Clients liefern:
@RestController
@RequestMapping("/api/v1/orders")
public class OrderTransformationController {
private final OrderEventPublisher eventPublisher;
private final OrderResponseAggregator responseAggregator;
private final CompletableFutureRegistry futureRegistry;
@PostMapping
public CompletableFuture<ResponseEntity<OrderResponse>> createOrder(
@RequestBody CreateOrderRequest request) {
String correlationId = UUID.randomUUID().toString();
// 1. CompletableFuture für Response registrieren
CompletableFuture<OrderResponse> responseFuture = new CompletableFuture<>();
futureRegistry.register(correlationId, responseFuture);
// 2. Request als Event publizieren
OrderCreationRequestedEvent event = OrderCreationRequestedEvent.builder()
.correlationId(correlationId)
.customerId(request.getCustomerId())
.items(request.getItems())
.timestamp(Instant.now())
.replyTo("order.creation.response.v1")
.build();
eventPublisher.publishOrderCreationRequested(event);
// 3. Asynchrone Response mit Timeout
return responseFuture
.orTimeout(30, TimeUnit.SECONDS)
.thenApply(orderResponse -> ResponseEntity.ok(orderResponse))
.exceptionally(throwable -> {
futureRegistry.unregister(correlationId);
if (throwable instanceof TimeoutException) {
return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT).build();
}
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
});
}
// Event-Handler für asynchrone Responses
@KafkaListener(topics = "order.creation.response.v1")
public void handleOrderCreationResponse(OrderCreationResponseEvent event) {
CompletableFuture<OrderResponse> future = futureRegistry
.getFuture(event.getCorrelationId());
if (future != null) {
if (event.isSuccess()) {
OrderResponse response = OrderResponse.builder()
.orderId(event.getOrderId())
.status(event.getStatus())
.message(event.getMessage())
.build();
future.complete(response);
} else {
future.completeExceptionally(
new OrderProcessingException(event.getErrorMessage())
);
}
futureRegistry.unregister(event.getCorrelationId());
}
}
}Komplexe Workflows erfordern die Aggregation mehrerer Events zu einer synchronen Response:
@Component
public class OrderResponseAggregator {
private final Map<String, OrderWorkflowState> activeWorkflows = new ConcurrentHashMap<>();
private final CompletableFutureRegistry futureRegistry;
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event) {
updateWorkflowState(event.getCorrelationId(), workflow -> {
workflow.setOrderPlaced(true);
workflow.setOrderId(event.getOrderId());
});
}
@KafkaListener(topics = "inventory.reserved.v1")
public void handleInventoryReserved(InventoryReservedEvent event) {
updateWorkflowState(event.getCorrelationId(), workflow -> {
workflow.setInventoryReserved(true);
workflow.setReservationId(event.getReservationId());
});
}
@KafkaListener(topics = "payment.authorized.v1")
public void handlePaymentAuthorized(PaymentAuthorizedEvent event) {
updateWorkflowState(event.getCorrelationId(), workflow -> {
workflow.setPaymentAuthorized(true);
workflow.setPaymentId(event.getPaymentId());
});
}
private void updateWorkflowState(String correlationId,
Consumer<OrderWorkflowState> updater) {
OrderWorkflowState workflow = activeWorkflows.computeIfAbsent(
correlationId,
k -> new OrderWorkflowState()
);
updater.accept(workflow);
// Prüfen ob Workflow komplett
if (workflow.isComplete()) {
completeWorkflow(correlationId, workflow);
}
}
private void completeWorkflow(String correlationId, OrderWorkflowState workflow) {
CompletableFuture<OrderResponse> future = futureRegistry.getFuture(correlationId);
if (future != null) {
OrderResponse response = OrderResponse.builder()
.orderId(workflow.getOrderId())
.reservationId(workflow.getReservationId())
.paymentId(workflow.getPaymentId())
.status("CONFIRMED")
.build();
future.complete(response);
futureRegistry.unregister(correlationId);
}
activeWorkflows.remove(correlationId);
}
}
// Workflow-State Container
@Data
public class OrderWorkflowState {
private String orderId;
private String reservationId;
private String paymentId;
private boolean orderPlaced = false;
private boolean inventoryReserved = false;
private boolean paymentAuthorized = false;
public boolean isComplete() {
return orderPlaced && inventoryReserved && paymentAuthorized;
}
}import asyncio
from fastapi import FastAPI, HTTPException
from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta
app = FastAPI()
class OrderEventBridge:
def __init__(self, event_publisher, timeout_seconds=30):
self.event_publisher = event_publisher
self.timeout_seconds = timeout_seconds
self.pending_requests: Dict[str, asyncio.Future] = {}
self.workflow_states: Dict[str, dict] = {}
async def create_order_sync(self, order_request):
correlation_id = str(uuid.uuid4())
# Future für Response erstellen
response_future = asyncio.get_event_loop().create_future()
self.pending_requests[correlation_id] = response_future
# Request als Event publizieren
event = {
'correlationId': correlation_id,
'customerId': order_request['customer_id'],
'items': order_request['items'],
'timestamp': datetime.utcnow().isoformat(),
'replyTo': 'order.creation.response.v1'
}
await self.event_publisher.publish_order_creation_requested(event)
try:
# Auf Response warten mit Timeout
response = await asyncio.wait_for(
response_future,
timeout=self.timeout_seconds
)
return response
except asyncio.TimeoutError:
self.pending_requests.pop(correlation_id, None)
raise HTTPException(status_code=408, detail="Request timeout")
finally:
# Cleanup
self.workflow_states.pop(correlation_id, None)
async def handle_order_placed(self, event):
correlation_id = event.get('correlationId')
if correlation_id:
self._update_workflow_state(correlation_id, {
'order_placed': True,
'order_id': event.get('orderId')
})
async def handle_inventory_reserved(self, event):
correlation_id = event.get('correlationId')
if correlation_id:
self._update_workflow_state(correlation_id, {
'inventory_reserved': True,
'reservation_id': event.get('reservationId')
})
async def handle_payment_authorized(self, event):
correlation_id = event.get('correlationId')
if correlation_id:
self._update_workflow_state(correlation_id, {
'payment_authorized': True,
'payment_id': event.get('paymentId')
})
def _update_workflow_state(self, correlation_id, updates):
state = self.workflow_states.setdefault(correlation_id, {})
state.update(updates)
# Prüfen ob Workflow komplett
if self._is_workflow_complete(state):
self._complete_workflow(correlation_id, state)
def _is_workflow_complete(self, state):
return all([
state.get('order_placed', False),
state.get('inventory_reserved', False),
state.get('payment_authorized', False)
])
def _complete_workflow(self, correlation_id, state):
future = self.pending_requests.pop(correlation_id, None)
if future and not future.done():
response = {
'order_id': state.get('order_id'),
'reservation_id': state.get('reservation_id'),
'payment_id': state.get('payment_id'),
'status': 'CONFIRMED'
}
future.set_result(response)
@app.post("/api/v1/orders")
async def create_order(order_request: dict):
try:
response = await order_bridge.create_order_sync(order_request)
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))Callback-Patterns ermöglichen die Integration asynchroner Event-Verarbeitung in synchrone Programmiermodelle durch Registrierung von Callback-Funktionen für spezifische Event-Typen.
@Component
public class EventCallbackRegistry {
private final Map<String, List<EventCallback>> callbackRegistry = new ConcurrentHashMap<>();
public void registerCallback(String eventType, EventCallback callback) {
callbackRegistry.computeIfAbsent(eventType, k -> new ArrayList<>())
.add(callback);
}
public void registerCallback(String eventType, String correlationId, EventCallback callback) {
CorrelationIdCallback correlationCallback = new CorrelationIdCallback(correlationId, callback);
registerCallback(eventType, correlationCallback);
}
@KafkaListener(topics = "#{T(java.util.Arrays).asList('order.placed.v1', 'payment.processed.v1', 'order.shipped.v1')}")
public void handleEvent(Object event, @Header("kafka_receivedTopic") String topic) {
String eventType = extractEventType(topic);
List<EventCallback> callbacks = callbackRegistry.get(eventType);
if (callbacks != null) {
callbacks.forEach(callback -> {
try {
callback.onEvent(event);
} catch (Exception e) {
log.error("Callback execution failed for event type {}", eventType, e);
}
});
// Einmalige Callbacks entfernen
callbacks.removeIf(callback -> callback instanceof OneTimeCallback);
}
}
}
// Callback-Interfaces
@FunctionalInterface
public interface EventCallback {
void onEvent(Object event);
}
// Korrelations-spezifischer Callback
public class CorrelationIdCallback implements EventCallback {
private final String expectedCorrelationId;
private final EventCallback delegate;
public CorrelationIdCallback(String correlationId, EventCallback delegate) {
this.expectedCorrelationId = correlationId;
this.delegate = delegate;
}
@Override
public void onEvent(Object event) {
String eventCorrelationId = extractCorrelationId(event);
if (expectedCorrelationId.equals(eventCorrelationId)) {
delegate.onEvent(event);
}
}
}
// Usage-Beispiel
@Service
public class OrderProcessingService {
private final EventCallbackRegistry callbackRegistry;
public CompletableFuture<String> processOrderWithCallback(CreateOrderRequest request) {
String correlationId = UUID.randomUUID().toString();
CompletableFuture<String> future = new CompletableFuture<>();
// Callback für OrderPlaced Event registrieren
callbackRegistry.registerCallback("OrderPlaced", correlationId, event -> {
OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
future.complete(orderEvent.getOrderId());
});
// Order-Erstellung triggern
triggerOrderCreation(request, correlationId);
return future;
}
}@Component
public class FluentEventProcessor {
private final EventCallbackRegistry callbackRegistry;
public EventChainBuilder on(String eventType) {
return new EventChainBuilder(eventType, callbackRegistry);
}
// Fluent Builder für Event-Callback-Ketten
public static class EventChainBuilder {
private final String eventType;
private final EventCallbackRegistry registry;
private String correlationId;
private Duration timeout = Duration.ofSeconds(30);
public EventChainBuilder correlationId(String correlationId) {
this.correlationId = correlationId;
return this;
}
public EventChainBuilder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}
public <T> CompletableFuture<T> then(Function<Object, T> transformer) {
CompletableFuture<T> future = new CompletableFuture<>();
EventCallback callback = event -> {
try {
T result = transformer.apply(event);
future.complete(result);
} catch (Exception e) {
future.completeExceptionally(e);
}
};
if (correlationId != null) {
registry.registerCallback(eventType, correlationId, callback);
} else {
registry.registerCallback(eventType, callback);
}
// Timeout handling
CompletableFuture.delayedExecutor(timeout.toMillis(), TimeUnit.MILLISECONDS)
.execute(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException());
}
});
return future;
}
}
}
// Usage-Beispiel mit Fluent API
@Service
public class FluentOrderService {
private final FluentEventProcessor eventProcessor;
public CompletableFuture<OrderResponse> createOrderFluent(CreateOrderRequest request) {
String correlationId = UUID.randomUUID().toString();
return eventProcessor
.on("OrderPlaced")
.correlationId(correlationId)
.timeout(Duration.ofSeconds(30))
.then(event -> {
OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
return OrderResponse.builder()
.orderId(orderEvent.getOrderId())
.status("PLACED")
.build();
});
}
}Die Integration von Futures und Promises ermöglicht die nahtlose Verknüpfung asynchroner Event-Verarbeitung mit modernen reaktiven Programmiermodellen.
@Service
public class ReactiveOrderService {
private final OrderEventPublisher eventPublisher;
private final Map<String, CompletableFuture<OrderEvent>> pendingOrders = new ConcurrentHashMap<>();
public CompletableFuture<OrderPlacedEvent> createOrderAsync(CreateOrderRequest request) {
String correlationId = UUID.randomUUID().toString();
CompletableFuture<OrderEvent> orderFuture = new CompletableFuture<>();
pendingOrders.put(correlationId, orderFuture);
// Event publizieren
OrderCreationRequestedEvent event = OrderCreationRequestedEvent.builder()
.correlationId(correlationId)
.customerId(request.getCustomerId())
.items(request.getItems())
.timestamp(Instant.now())
.build();
eventPublisher.publishOrderCreationRequested(event);
// Type-safe Transformation
return orderFuture
.thenApply(orderEvent -> (OrderPlacedEvent) orderEvent)
.orTimeout(30, TimeUnit.SECONDS)
.whenComplete((result, throwable) -> {
pendingOrders.remove(correlationId);
if (throwable != null) {
log.error("Order creation failed for correlation {}", correlationId, throwable);
}
});
}
// Chaining von Event-abhängigen Operationen
public CompletableFuture<ShippingInfo> createOrderAndShip(CreateOrderRequest request) {
return createOrderAsync(request)
.thenCompose(this::reserveInventory)
.thenCompose(this::processPayment)
.thenCompose(this::initiateShipping);
}
private CompletableFuture<InventoryReservedEvent> reserveInventory(OrderPlacedEvent orderEvent) {
String correlationId = UUID.randomUUID().toString();
CompletableFuture<OrderEvent> future = new CompletableFuture<>();
pendingOrders.put(correlationId, future);
InventoryReservationRequestedEvent event = InventoryReservationRequestedEvent.builder()
.correlationId(correlationId)
.orderId(orderEvent.getOrderId())
.items(orderEvent.getItems())
.build();
eventPublisher.publishInventoryReservationRequested(event);
return future.thenApply(e -> (InventoryReservedEvent) e);
}
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(OrderPlacedEvent event) {
completeFuture(event.getCorrelationId(), event);
}
@KafkaListener(topics = "inventory.reserved.v1")
public void handleInventoryReserved(InventoryReservedEvent event) {
completeFuture(event.getCorrelationId(), event);
}
private void completeFuture(String correlationId, OrderEvent event) {
CompletableFuture<OrderEvent> future = pendingOrders.remove(correlationId);
if (future != null && !future.isDone()) {
future.complete(event);
}
}
}import asyncio
from typing import Dict, Callable, Optional, Any
import uuid
class EventPromiseManager:
def __init__(self, event_publisher):
self.event_publisher = event_publisher
self.pending_promises: Dict[str, asyncio.Future] = {}
self.event_handlers: Dict[str, Callable] = {}
async def create_order_async(self, order_request: dict) -> dict:
correlation_id = str(uuid.uuid4())
# Future für Response erstellen
order_future = asyncio.get_event_loop().create_future()
self.pending_promises[correlation_id] = order_future
# Event publizieren
event = {
'correlationId': correlation_id,
'customerId': order_request['customer_id'],
'items': order_request['items'],
'timestamp': datetime.utcnow().isoformat()
}
await self.event_publisher.publish_order_creation_requested(event)
try:
# Auf Completion warten
result = await asyncio.wait_for(order_future, timeout=30.0)
return result
finally:
self.pending_promises.pop(correlation_id, None)
async def create_order_and_ship(self, order_request: dict) -> dict:
# Event-Chain mit asyncio
order_event = await self.create_order_async(order_request)
inventory_event = await self._reserve_inventory_async(order_event)
payment_event = await self._process_payment_async(inventory_event)
shipping_event = await self._initiate_shipping_async(payment_event)
return shipping_event
async def _reserve_inventory_async(self, order_event: dict) -> dict:
correlation_id = str(uuid.uuid4())
inventory_future = asyncio.get_event_loop().create_future()
self.pending_promises[correlation_id] = inventory_future
event = {
'correlationId': correlation_id,
'orderId': order_event['orderId'],
'items': order_event['items']
}
await self.event_publisher.publish_inventory_reservation_requested(event)
return await asyncio.wait_for(inventory_future, timeout=30.0)
def handle_order_placed(self, event: dict):
correlation_id = event.get('correlationId')
if correlation_id and correlation_id in self.pending_promises:
future = self.pending_promises.pop(correlation_id)
if not future.done():
future.set_result(event)
def handle_inventory_reserved(self, event: dict):
correlation_id = event.get('correlationId')
if correlation_id and correlation_id in self.pending_promises:
future = self.pending_promises.pop(correlation_id)
if not future.done():
future.set_result(event)
# Reactive Stream-ähnliche API
class EventStream:
def __init__(self, promise_manager):
self.promise_manager = promise_manager
self.pipeline = []
def map(self, transform_func):
self.pipeline.append(('map', transform_func))
return self
def filter(self, predicate_func):
self.pipeline.append(('filter', predicate_func))
return self
def flat_map(self, async_transform_func):
self.pipeline.append(('flat_map', async_transform_func))
return self
async def execute(self, initial_data):
current_data = initial_data
for operation, func in self.pipeline:
if operation == 'map':
current_data = func(current_data)
elif operation == 'filter':
if not func(current_data):
raise ValueError("Data filtered out")
elif operation == 'flat_map':
current_data = await func(current_data)
return current_data
# Usage
async def process_order_stream(order_request):
stream = EventStream(promise_manager)
result = await (stream
.map(lambda req: {**req, 'processed_at': datetime.utcnow().isoformat()})
.filter(lambda req: req['customer_id'] is not None)
.flat_map(lambda req: promise_manager.create_order_async(req))
.flat_map(lambda order: promise_manager._reserve_inventory_async(order))
.execute(order_request)
)
return result| Szenario | Ansatz | Vorteile | Nachteile |
|---|---|---|---|
| Legacy API Migration | API-Gateway Transformation | Non-breaking Changes | Zusätzliche Latenz |
| Workflow Coordination | Response Aggregation | Komplexe Business-Logik | State Management |
| Event-driven Integration | Callback Patterns | Flexible Event-Handling | Callback-Hell-Risiko |
| Reactive Programming | Future/Promise Integration | Compositional, Type-safe | Lernkurve |
Die Wahl der Übergangsstrategie hängt von der Komplexität der bestehenden APIs, Client-Anforderungen und der gewünschten Migration-Geschwindigkeit ab. Oft werden mehrere Ansätze kombiniert, um verschiedene Integration-Szenarien abzudecken.