42 Synchronous-Asynchronous Transitions

Moving from synchronous request-response systems to asynchronous, event-driven architectures requires transition strategies that bridge both paradigms. These transitions make it possible to replace existing synchronous APIs with asynchronous event streams step by step, without introducing breaking changes for clients.

42.0.1 Request-Response to Event Transformation

Converting synchronous REST APIs into event-driven workflows means breaking the direct request-response cycle into separate publish and subscribe operations.
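The split into a publish side and a subscribe side can be sketched in a few lines. This is a minimal illustration, not a production design: `InMemoryBus`, `request_reply`, and the topic names `order.request` and `order.reply` are hypothetical stand-ins for a real broker and its topics, and the only point is how a correlation ID ties the published request to the reply received later.

```python
import asyncio
import uuid
from collections import defaultdict


class InMemoryBus:
    """Hypothetical stand-in for a message broker (not a real client API)."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    async def publish(self, topic, event):
        # Deliver the event to every subscriber of the topic, one after another.
        for handler in list(self._subscribers[topic]):
            await handler(event)


async def request_reply(bus, pending, request_topic, payload, timeout=1.0):
    """Publish a request event and wait for the reply with the same correlation ID."""
    correlation_id = str(uuid.uuid4())
    future = asyncio.get_running_loop().create_future()
    pending[correlation_id] = future  # register before publishing to avoid a race
    try:
        await bus.publish(request_topic, {"correlationId": correlation_id, **payload})
        return await asyncio.wait_for(future, timeout)
    finally:
        pending.pop(correlation_id, None)  # clean up on success and on timeout


async def main():
    bus = InMemoryBus()
    pending = {}

    async def order_service(event):
        # Simulated downstream service: answers on the reply topic.
        await bus.publish("order.reply", {
            "correlationId": event["correlationId"],
            "orderId": "o-1",
        })

    async def on_reply(event):
        # Subscribe side: resolve the waiting caller if it is still waiting.
        future = pending.get(event["correlationId"])
        if future is not None and not future.done():
            future.set_result(event)

    bus.subscribe("order.request", order_service)
    bus.subscribe("order.reply", on_reply)
    return await request_reply(bus, pending, "order.request", {"customerId": "c-1"})
```

Calling `asyncio.run(main())` returns the reply event. The caller never talks to `order_service` directly; it only sees the future that the reply handler completes.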

42.0.1.1 API Gateway as Transformation Layer

An API gateway can translate synchronous requests into asynchronous event workflows and still return synchronous responses to clients:

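@RestController
@RequestMapping("/api/v1/orders")
@RequiredArgsConstructor // Lombok: generates the constructor Spring uses to inject the final fields
public class OrderTransformationController {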
    
    private final OrderEventPublisher eventPublisher;
    private final OrderResponseAggregator responseAggregator;
    private final CompletableFutureRegistry futureRegistry;
    
    @PostMapping
    public CompletableFuture<ResponseEntity<OrderResponse>> createOrder(
            @RequestBody CreateOrderRequest request) {
        
        String correlationId = UUID.randomUUID().toString();
        
        // 1. Register a CompletableFuture for the response
        CompletableFuture<OrderResponse> responseFuture = new CompletableFuture<>();
        futureRegistry.register(correlationId, responseFuture);
        
        // 2. Publish the request as an event
        OrderCreationRequestedEvent event = OrderCreationRequestedEvent.builder()
            .correlationId(correlationId)
            .customerId(request.getCustomerId())
            .items(request.getItems())
            .timestamp(Instant.now())
            .replyTo("order.creation.response.v1")
            .build();
        
        eventPublisher.publishOrderCreationRequested(event);
        
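        // 3. Asynchronous response with timeout
        return responseFuture
            .orTimeout(30, TimeUnit.SECONDS)
            .thenApply(orderResponse -> ResponseEntity.ok(orderResponse))
            .exceptionally(throwable -> {
                futureRegistry.unregister(correlationId);
                // Dependent stages wrap the original failure in a CompletionException
                Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;
                if (cause instanceof TimeoutException) {
                    // 504: the gateway gave up waiting for the downstream workflow
                    return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT).build();
                }
                return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
            });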
    }
    
    // Event handler for asynchronous responses
    @KafkaListener(topics = "order.creation.response.v1")
    public void handleOrderCreationResponse(OrderCreationResponseEvent event) {
        CompletableFuture<OrderResponse> future = futureRegistry
            .getFuture(event.getCorrelationId());
        
        if (future != null) {
            if (event.isSuccess()) {
                OrderResponse response = OrderResponse.builder()
                    .orderId(event.getOrderId())
                    .status(event.getStatus())
                    .message(event.getMessage())
                    .build();
                
                future.complete(response);
            } else {
                future.completeExceptionally(
                    new OrderProcessingException(event.getErrorMessage())
                );
            }
            
            futureRegistry.unregister(event.getCorrelationId());
        }
    }
}

42.0.1.2 Event-to-Response Aggregation

Complex workflows require aggregating several events into a single synchronous response:

@Component
@RequiredArgsConstructor // Lombok: constructor injection for futureRegistry
public class OrderResponseAggregator {
    
    private final Map<String, OrderWorkflowState> activeWorkflows = new ConcurrentHashMap<>();
    private final CompletableFutureRegistry futureRegistry;
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        updateWorkflowState(event.getCorrelationId(), workflow -> {
            workflow.setOrderPlaced(true);
            workflow.setOrderId(event.getOrderId());
        });
    }
    
    @KafkaListener(topics = "inventory.reserved.v1")
    public void handleInventoryReserved(InventoryReservedEvent event) {
        updateWorkflowState(event.getCorrelationId(), workflow -> {
            workflow.setInventoryReserved(true);
            workflow.setReservationId(event.getReservationId());
        });
    }
    
    @KafkaListener(topics = "payment.authorized.v1")
    public void handlePaymentAuthorized(PaymentAuthorizedEvent event) {
        updateWorkflowState(event.getCorrelationId(), workflow -> {
            workflow.setPaymentAuthorized(true);
            workflow.setPaymentId(event.getPaymentId());
        });
    }
    
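    private void updateWorkflowState(String correlationId, 
                                   Consumer<OrderWorkflowState> updater) {
        OrderWorkflowState workflow = activeWorkflows.computeIfAbsent(
            correlationId, 
            k -> new OrderWorkflowState()
        );
        
        // Listeners for different topics may run on different threads,
        // so the update and the completeness check are guarded together
        boolean complete;
        synchronized (workflow) {
            updater.accept(workflow);
            complete = workflow.isComplete();
        }
        
        // Check whether the workflow is complete
        if (complete) {
            completeWorkflow(correlationId, workflow);
        }
    }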
    
    private void completeWorkflow(String correlationId, OrderWorkflowState workflow) {
        CompletableFuture<OrderResponse> future = futureRegistry.getFuture(correlationId);
        
        if (future != null) {
            OrderResponse response = OrderResponse.builder()
                .orderId(workflow.getOrderId())
                .reservationId(workflow.getReservationId())
                .paymentId(workflow.getPaymentId())
                .status("CONFIRMED")
                .build();
            
            future.complete(response);
            futureRegistry.unregister(correlationId);
        }
        
        activeWorkflows.remove(correlationId);
    }
}

// Workflow state container
@Data
public class OrderWorkflowState {
    private String orderId;
    private String reservationId;
    private String paymentId;
    private boolean orderPlaced = false;
    private boolean inventoryReserved = false;
    private boolean paymentAuthorized = false;
    
    public boolean isComplete() {
        return orderPlaced && inventoryReserved && paymentAuthorized;
    }
}
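The aggregation logic itself is independent of Kafka and Spring. The following Python sketch shows it in isolation, under stated assumptions: `WorkflowAggregator`, its `ttl_seconds` parameter, and `evict_stale` are hypothetical names, and the event type strings are illustrative. It tracks the required event types as a set, which makes duplicate deliveries harmless, and it can drop state for workflows whose waiting request has already timed out.

```python
import time


class WorkflowAggregator:
    """Collects events per correlation ID until all required types have arrived."""

    def __init__(self, required_types, ttl_seconds=60.0, clock=time.monotonic):
        self.required = frozenset(required_types)
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        # correlation_id -> {"seen": set, "data": dict, "started": float}
        self._states = {}

    def on_event(self, correlation_id, event_type, payload):
        """Record one event; return the merged payload once the workflow is complete."""
        state = self._states.setdefault(
            correlation_id, {"seen": set(), "data": {}, "started": self.clock()}
        )
        state["seen"].add(event_type)  # a set makes duplicate deliveries harmless
        state["data"].update(payload)
        if self.required <= state["seen"]:
            del self._states[correlation_id]
            return {**state["data"], "status": "CONFIRMED"}
        return None

    def evict_stale(self):
        """Drop workflows older than the TTL and return their correlation IDs."""
        now = self.clock()
        stale = [cid for cid, state in self._states.items()
                 if now - state["started"] > self.ttl_seconds]
        for cid in stale:
            del self._states[cid]
        return stale
```

`on_event` returns `None` until the last required event arrives and then returns the merged response once. Calling `evict_stale` periodically keeps the state map from growing when a workflow never completes.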

42.0.1.3 Python FastAPI Event Bridge

import asyncio
import uuid
from datetime import datetime, timezone
from typing import Dict

from fastapi import FastAPI, HTTPException

app = FastAPI()

class OrderEventBridge:
    def __init__(self, event_publisher, timeout_seconds=30):
        self.event_publisher = event_publisher
        self.timeout_seconds = timeout_seconds
        self.pending_requests: Dict[str, asyncio.Future] = {}
        self.workflow_states: Dict[str, dict] = {}
    
    async def create_order_sync(self, order_request):
        correlation_id = str(uuid.uuid4())
        
        # Create the future for the response before publishing
        response_future = asyncio.get_running_loop().create_future()
        self.pending_requests[correlation_id] = response_future
        
        # Publish the request as an event
        event = {
            'correlationId': correlation_id,
            'customerId': order_request['customer_id'],
            'items': order_request['items'],
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'replyTo': 'order.creation.response.v1'
        }
        
        try:
            await self.event_publisher.publish_order_creation_requested(event)
            
            # Wait for the response with a timeout
            return await asyncio.wait_for(
                response_future, 
                timeout=self.timeout_seconds
            )
            
        except asyncio.TimeoutError:
            # 504: the bridge gave up waiting for the downstream workflow
            raise HTTPException(status_code=504, detail="Order workflow timed out")
        
        finally:
            # Clean up on success, timeout, and publish failure alike
            self.pending_requests.pop(correlation_id, None)
            self.workflow_states.pop(correlation_id, None)
    
    async def handle_order_placed(self, event):
        correlation_id = event.get('correlationId')
        if correlation_id:
            self._update_workflow_state(correlation_id, {
                'order_placed': True,
                'order_id': event.get('orderId')
            })
    
    async def handle_inventory_reserved(self, event):
        correlation_id = event.get('correlationId')
        if correlation_id:
            self._update_workflow_state(correlation_id, {
                'inventory_reserved': True,
                'reservation_id': event.get('reservationId')
            })
    
    async def handle_payment_authorized(self, event):
        correlation_id = event.get('correlationId')
        if correlation_id:
            self._update_workflow_state(correlation_id, {
                'payment_authorized': True,
                'payment_id': event.get('paymentId')
            })
    
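    def _update_workflow_state(self, correlation_id, updates):
        # Ignore events for unknown or already timed-out requests;
        # their state would otherwise never be cleaned up
        if correlation_id not in self.pending_requests:
            return
        
        state = self.workflow_states.setdefault(correlation_id, {})
        state.update(updates)
        
        # Check whether the workflow is complete
        if self._is_workflow_complete(state):
            self._complete_workflow(correlation_id, state)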
    
    def _is_workflow_complete(self, state):
        return all([
            state.get('order_placed', False),
            state.get('inventory_reserved', False),
            state.get('payment_authorized', False)
        ])
    
    def _complete_workflow(self, correlation_id, state):
        future = self.pending_requests.pop(correlation_id, None)
        if future and not future.done():
            response = {
                'order_id': state.get('order_id'),
                'reservation_id': state.get('reservation_id'),
                'payment_id': state.get('payment_id'),
                'status': 'CONFIRMED'
            }
            future.set_result(response)

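# Assumption: `order_bridge` is an OrderEventBridge instance created at application startup
@app.post("/api/v1/orders")
async def create_order(order_request: dict):
    try:
        return await order_bridge.create_order_sync(order_request)
    except HTTPException:
        # Keep deliberate HTTP errors such as the timeout response
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))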

42.0.2 Callback Patterns

Callback patterns integrate asynchronous event processing into synchronous programming models by registering callback functions for specific event types.
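The core of the pattern fits into a small registry. The sketch below is a simplified illustration with hypothetical names (`CallbackRegistry`, `on`, `once`, `dispatch`). It distinguishes callbacks that run for every event of a type from correlation-scoped callbacks that fire exactly once and are removed as soon as they match, so they cannot accumulate.

```python
from collections import defaultdict


class CallbackRegistry:
    """Maps event types to callbacks; correlation-scoped callbacks fire once."""

    def __init__(self):
        self._by_type = defaultdict(list)  # event_type -> [callback]
        self._by_correlation = {}          # (event_type, correlation_id) -> callback

    def on(self, event_type, callback):
        """Register a callback that runs for every event of this type."""
        self._by_type[event_type].append(callback)

    def once(self, event_type, correlation_id, callback):
        """Register a callback for exactly one correlated event."""
        self._by_correlation[(event_type, correlation_id)] = callback

    def dispatch(self, event_type, event):
        """Invoke all matching callbacks and return how many were called."""
        callbacks = list(self._by_type.get(event_type, []))
        # pop() removes the one-shot callback so it can neither leak nor fire twice
        one_shot = self._by_correlation.pop(
            (event_type, event.get("correlationId")), None
        )
        if one_shot is not None:
            callbacks.append(one_shot)
        for callback in callbacks:
            callback(event)
        return len(callbacks)
```

A caller that waits for one specific order would use `once("OrderPlaced", correlation_id, ...)`, while monitoring or logging code would use `on("OrderPlaced", ...)`. A timeout path that removes unanswered one-shot callbacks is omitted here for brevity.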

42.0.2.1 Event Callback Registry

@Slf4j
@Component
public class EventCallbackRegistry {
    
    private final Map<String, List<EventCallback>> callbackRegistry = new ConcurrentHashMap<>();
    
    public void registerCallback(String eventType, EventCallback callback) {
        // CopyOnWriteArrayList: callbacks may be registered while events are being dispatched
        callbackRegistry.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                       .add(callback);
    }
    
    public void registerCallback(String eventType, String correlationId, EventCallback callback) {
        CorrelationIdCallback correlationCallback = new CorrelationIdCallback(correlationId, callback);
        registerCallback(eventType, correlationCallback);
    }
    
    @KafkaListener(topics = {"order.placed.v1", "payment.processed.v1", "order.shipped.v1"})
    public void handleEvent(Object event, @Header("kafka_receivedTopic") String topic) {
        String eventType = extractEventType(topic);
        
        List<EventCallback> callbacks = callbackRegistry.get(eventType);
        if (callbacks != null) {
            callbacks.forEach(callback -> {
                try {
                    callback.onEvent(event);
                } catch (Exception e) {
                    log.error("Callback execution failed for event type {}", eventType, e);
                }
            });
            
            // Remove one-time callbacks
            callbacks.removeIf(callback -> callback instanceof OneTimeCallback);
        }
    }
}

// Callback interfaces
@FunctionalInterface
public interface EventCallback {
    void onEvent(Object event);
}

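// Correlation-specific callback.
// Note: the registry never removes these wrappers, so long-running code
// needs to deregister them after the match or on timeout.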
public class CorrelationIdCallback implements EventCallback {
    private final String expectedCorrelationId;
    private final EventCallback delegate;
    
    public CorrelationIdCallback(String correlationId, EventCallback delegate) {
        this.expectedCorrelationId = correlationId;
        this.delegate = delegate;
    }
    
    @Override
    public void onEvent(Object event) {
        String eventCorrelationId = extractCorrelationId(event);
        if (expectedCorrelationId.equals(eventCorrelationId)) {
            delegate.onEvent(event);
        }
    }
}

// Usage example
@Service
@RequiredArgsConstructor
public class OrderProcessingService {
    
    private final EventCallbackRegistry callbackRegistry;
    
    public CompletableFuture<String> processOrderWithCallback(CreateOrderRequest request) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        
        // Register the callback for the OrderPlaced event
        callbackRegistry.registerCallback("OrderPlaced", correlationId, event -> {
            OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
            future.complete(orderEvent.getOrderId());
        });
        
        // Trigger order creation
        triggerOrderCreation(request, correlationId);
        
        return future;
    }
}

42.0.2.2 Fluent Callback API

@Component
@RequiredArgsConstructor
public class FluentEventProcessor {
    
    private final EventCallbackRegistry callbackRegistry;
    
    public EventChainBuilder on(String eventType) {
        return new EventChainBuilder(eventType, callbackRegistry);
    }
    
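    // Fluent builder for event callback chains
    public static class EventChainBuilder {
        private final String eventType;
        private final EventCallbackRegistry registry;
        private String correlationId;
        private Duration timeout = Duration.ofSeconds(30);
        
        EventChainBuilder(String eventType, EventCallbackRegistry registry) {
            this.eventType = eventType;
            this.registry = registry;
        }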
        
        public EventChainBuilder correlationId(String correlationId) {
            this.correlationId = correlationId;
            return this;
        }
        
        public EventChainBuilder timeout(Duration timeout) {
            this.timeout = timeout;
            return this;
        }
        
        public <T> CompletableFuture<T> then(Function<Object, T> transformer) {
            CompletableFuture<T> future = new CompletableFuture<>();
            
            EventCallback callback = event -> {
                try {
                    T result = transformer.apply(event);
                    future.complete(result);
                } catch (Exception e) {
                    future.completeExceptionally(e);
                }
            };
            
            if (correlationId != null) {
                registry.registerCallback(eventType, correlationId, callback);
            } else {
                registry.registerCallback(eventType, callback);
            }
            
            // Timeout handling: fail the future if no matching event arrives in time
            return future.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }
}

// Usage example with the fluent API
@Service
@RequiredArgsConstructor
public class FluentOrderService {
    
    private final FluentEventProcessor eventProcessor;
    
    public CompletableFuture<OrderResponse> createOrderFluent(CreateOrderRequest request) {
        String correlationId = UUID.randomUUID().toString();
        
        return eventProcessor
            .on("OrderPlaced")
            .correlationId(correlationId)
            .timeout(Duration.ofSeconds(30))
            .then(event -> {
                OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;
                return OrderResponse.builder()
                    .orderId(orderEvent.getOrderId())
                    .status("PLACED")
                    .build();
            });
    }
}

42.0.3 Future/Promise Integration

Integrating futures and promises links asynchronous event processing with modern reactive programming models.
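What makes futures attractive here is composition: each event-backed step becomes an awaitable, and steps chain like ordinary calls. The sketch below illustrates this under stated assumptions. `EventFutures`, `step`, and `create_order_and_reserve` are hypothetical helpers, and `publish` is any callable that hands the request event to a broker.

```python
import asyncio
import uuid


class EventFutures:
    """Hands out futures that incoming events complete (hypothetical helper)."""

    def __init__(self):
        self._pending = {}

    def expect(self, correlation_id):
        # Register before publishing so a fast reply cannot be missed.
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        return future

    def discard(self, correlation_id):
        self._pending.pop(correlation_id, None)

    def resolve(self, event):
        """Called by the event consumer; returns False for late or unknown events."""
        future = self._pending.get(event["correlationId"])
        if future is None or future.done():
            return False
        future.set_result(event)
        return True


async def step(futures, publish, payload, timeout=1.0):
    """One event-backed step: publish a request, then await the correlated reply."""
    correlation_id = str(uuid.uuid4())
    future = futures.expect(correlation_id)
    try:
        publish({"correlationId": correlation_id, **payload})
        return await asyncio.wait_for(future, timeout)
    finally:
        futures.discard(correlation_id)  # also runs on timeout


async def create_order_and_reserve(futures, order_service, inventory_service):
    # Each await resumes only after the matching event has arrived, so the
    # second request can use data from the first reply.
    order = await step(futures, order_service, {"customerId": "cust-1"})
    return await step(futures, inventory_service, {"orderId": order["orderId"]})
```

If the first step times out, the exception propagates and the second request is never published, which is the same short-circuit behavior a `thenCompose` chain provides in Java.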

42.0.3.1 CompletableFuture Event Integration

@Slf4j
@Service
@RequiredArgsConstructor
public class ReactiveOrderService {
    
    private final OrderEventPublisher eventPublisher;
    private final Map<String, CompletableFuture<OrderEvent>> pendingOrders = new ConcurrentHashMap<>();
    
    public CompletableFuture<OrderPlacedEvent> createOrderAsync(CreateOrderRequest request) {
        String correlationId = UUID.randomUUID().toString();
        
        CompletableFuture<OrderEvent> orderFuture = new CompletableFuture<>();
        pendingOrders.put(correlationId, orderFuture);
        
        // Publish the event
        OrderCreationRequestedEvent event = OrderCreationRequestedEvent.builder()
            .correlationId(correlationId)
            .customerId(request.getCustomerId())
            .items(request.getItems())
            .timestamp(Instant.now())
            .build();
        
        eventPublisher.publishOrderCreationRequested(event);
        
        // Narrow the generic event to the expected type
        return orderFuture
            .thenApply(orderEvent -> (OrderPlacedEvent) orderEvent)
            .orTimeout(30, TimeUnit.SECONDS)
            .whenComplete((result, throwable) -> {
                pendingOrders.remove(correlationId);
                if (throwable != null) {
                    log.error("Order creation failed for correlation {}", correlationId, throwable);
                }
            });
    }
    
    // Chaining of event-dependent operations
    public CompletableFuture<ShippingInfo> createOrderAndShip(CreateOrderRequest request) {
        return createOrderAsync(request)
            .thenCompose(this::reserveInventory)
            .thenCompose(this::processPayment)
            .thenCompose(this::initiateShipping);
    }
    
    private CompletableFuture<InventoryReservedEvent> reserveInventory(OrderPlacedEvent orderEvent) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<OrderEvent> future = new CompletableFuture<>();
        pendingOrders.put(correlationId, future);
        
        InventoryReservationRequestedEvent event = InventoryReservationRequestedEvent.builder()
            .correlationId(correlationId)
            .orderId(orderEvent.getOrderId())
            .items(orderEvent.getItems())
            .build();
        
        eventPublisher.publishInventoryReservationRequested(event);
        
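        return future
            .thenApply(e -> (InventoryReservedEvent) e)
            .orTimeout(30, TimeUnit.SECONDS)
            // Remove the pending entry on timeout or failure as well
            .whenComplete((result, throwable) -> pendingOrders.remove(correlationId));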
    }
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        completeFuture(event.getCorrelationId(), event);
    }
    
    @KafkaListener(topics = "inventory.reserved.v1")
    public void handleInventoryReserved(InventoryReservedEvent event) {
        completeFuture(event.getCorrelationId(), event);
    }
    
    private void completeFuture(String correlationId, OrderEvent event) {
        CompletableFuture<OrderEvent> future = pendingOrders.remove(correlationId);
        if (future != null && !future.isDone()) {
            future.complete(event);
        }
    }
}

42.0.3.2 Python AsyncIO Promise Integration

import asyncio
import uuid
from datetime import datetime, timezone
from typing import Callable, Dict

class EventPromiseManager:
    def __init__(self, event_publisher):
        self.event_publisher = event_publisher
        self.pending_promises: Dict[str, asyncio.Future] = {}
        self.event_handlers: Dict[str, Callable] = {}
    
    async def create_order_async(self, order_request: dict) -> dict:
        correlation_id = str(uuid.uuid4())
        
        # Create the future for the response
        order_future = asyncio.get_running_loop().create_future()
        self.pending_promises[correlation_id] = order_future
        
        # Build the event
        event = {
            'correlationId': correlation_id,
            'customerId': order_request['customer_id'],
            'items': order_request['items'],
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
        
        try:
            # Publish the event and wait for completion
            await self.event_publisher.publish_order_creation_requested(event)
            result = await asyncio.wait_for(order_future, timeout=30.0)
            return result
        finally:
            self.pending_promises.pop(correlation_id, None)
    
    async def create_order_and_ship(self, order_request: dict) -> dict:
        # Event chain with asyncio
        order_event = await self.create_order_async(order_request)
        inventory_event = await self._reserve_inventory_async(order_event)
        payment_event = await self._process_payment_async(inventory_event)
        shipping_event = await self._initiate_shipping_async(payment_event)
        
        return shipping_event
    
    async def _reserve_inventory_async(self, order_event: dict) -> dict:
        correlation_id = str(uuid.uuid4())
        
        inventory_future = asyncio.get_running_loop().create_future()
        self.pending_promises[correlation_id] = inventory_future
        
        event = {
            'correlationId': correlation_id,
            'orderId': order_event['orderId'],
            'items': order_event['items']
        }
        
        try:
            await self.event_publisher.publish_inventory_reservation_requested(event)
            return await asyncio.wait_for(inventory_future, timeout=30.0)
        finally:
            # Drop the pending entry on timeout or failure as well
            self.pending_promises.pop(correlation_id, None)
    
    def handle_order_placed(self, event: dict):
        correlation_id = event.get('correlationId')
        if correlation_id and correlation_id in self.pending_promises:
            future = self.pending_promises.pop(correlation_id)
            if not future.done():
                future.set_result(event)
    
    def handle_inventory_reserved(self, event: dict):
        correlation_id = event.get('correlationId')
        if correlation_id and correlation_id in self.pending_promises:
            future = self.pending_promises.pop(correlation_id)
            if not future.done():
                future.set_result(event)

# Reactive-stream-like API
class EventStream:
    def __init__(self, promise_manager):
        self.promise_manager = promise_manager
        self.pipeline = []
    
    def map(self, transform_func):
        self.pipeline.append(('map', transform_func))
        return self
    
    def filter(self, predicate_func):
        self.pipeline.append(('filter', predicate_func))
        return self
    
    def flat_map(self, async_transform_func):
        self.pipeline.append(('flat_map', async_transform_func))
        return self
    
    async def execute(self, initial_data):
        current_data = initial_data
        
        for operation, func in self.pipeline:
            if operation == 'map':
                current_data = func(current_data)
            elif operation == 'filter':
                if not func(current_data):
                    raise ValueError("Data filtered out")
            elif operation == 'flat_map':
                current_data = await func(current_data)
        
        return current_data

# Usage (assumes a module-level `promise_manager` instance of EventPromiseManager)
from datetime import datetime, timezone

async def process_order_stream(order_request):
    stream = EventStream(promise_manager)
    
    result = await (stream
        .map(lambda req: {**req, 'processed_at': datetime.now(timezone.utc).isoformat()})
        .filter(lambda req: req.get('customer_id') is not None)
        .flat_map(lambda req: promise_manager.create_order_async(req))
        .flat_map(lambda order: promise_manager._reserve_inventory_async(order))
        .execute(order_request)
    )
    
    return result

42.0.4 Transition Strategy Matrix

| Scenario | Approach | Advantages | Disadvantages |
|---|---|---|---|
| Legacy API migration | API gateway transformation | Non-breaking changes | Additional latency |
| Workflow coordination | Response aggregation | Handles complex business logic | Requires state management |
| Event-driven integration | Callback patterns | Flexible event handling | Risk of callback hell |
| Reactive programming | Future/promise integration | Composable, type-safe | Learning curve |

The choice of transition strategy depends on the complexity of the existing APIs, on client requirements, and on the desired migration speed. Several approaches are often combined to cover different integration scenarios.