67 Deployment Strategies for EDA

Event-driven architecture requires dedicated deployment strategies because services are coupled through asynchronous events and schema compatibility must be guaranteed across service boundaries. Traditional blue-green or rolling deployments have to be adapted to account for event schema evolution, producer-consumer compatibility, and continuous event flow.

67.1 Blue-Green Deployment

67.1.1 Event-Aware Blue-Green Strategy

In a classic blue-green deployment, all traffic is switched over at once. In an EDA you also have to take event streams and consumer groups into account:

Dual-Stack Event Processing:

@Configuration
public class BlueGreenEventConfiguration {
    
    @Value("${deployment.environment:blue}")
    private String deploymentEnvironment;
    
    @Bean
    @Primary
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return createKafkaTemplate(getTopicSuffix());
    }
    
    @Bean("blueKafkaTemplate")
    public KafkaTemplate<String, Object> blueKafkaTemplate() {
        return createKafkaTemplate("-blue");
    }
    
    @Bean("greenKafkaTemplate")  
    public KafkaTemplate<String, Object> greenKafkaTemplate() {
        return createKafkaTemplate("-green");
    }
    
    private String getTopicSuffix() {
        return switch (deploymentEnvironment) {
            case "blue" -> "-blue";
            case "green" -> "-green";
            case "canary" -> "-canary";
            default -> "";
        };
    }
    
    private KafkaTemplate<String, Object> createKafkaTemplate(String topicSuffix) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // Topic interceptor that appends the environment suffix automatically
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
                  List.of(TopicSuffixInterceptor.class.getName()));
        props.put("topic.suffix", topicSuffix);
        
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}

public class TopicSuffixInterceptor implements ProducerInterceptor<String, Object> {
    
    private String topicSuffix;
    
    @Override
    public void configure(Map<String, ?> configs) {
        this.topicSuffix = (String) configs.get("topic.suffix");
    }
    
    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        String newTopic = record.topic() + (topicSuffix != null ? topicSuffix : "");
        return new ProducerRecord<>(
                newTopic,
                record.partition(),
                record.timestamp(),
                record.key(),
                record.value(),
                record.headers()
        );
    }
    
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // No-op: topic suffixing needs no acknowledgement handling
    }
    
    @Override
    public void close() {
        // No resources to release
    }
}
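
The interceptor covers only the producer side of the dual stack: consumers in each environment must subscribe to the matching suffixed topics. A minimal Python sketch of the consumer-side counterpart, assuming kafka-python and a DEPLOYMENT_ENVIRONMENT variable (topic names, group id, and the bootstrap address are illustrative):

import os
from kafka import KafkaConsumer

SUFFIXES = {"blue": "-blue", "green": "-green", "canary": "-canary"}

def suffixed(topics, environment):
    """Append the same suffix that TopicSuffixInterceptor adds on the producer side."""
    return [topic + SUFFIXES.get(environment, "") for topic in topics]

env = os.environ.get("DEPLOYMENT_ENVIRONMENT", "blue")
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",   # illustrative address
    group_id=f"order-service-{env}",      # one consumer group per stack
)
consumer.subscribe(suffixed(["orders", "payments"], env))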

Blue-Green Deployment Manager:

import asyncio
from typing import Dict, List
from datetime import datetime, timedelta

class BlueGreenDeploymentManager:
    def __init__(self, kafka_admin, prometheus_client):
        self.kafka_admin = kafka_admin
        self.prometheus = prometheus_client
        self.current_environment = "blue"
        self.switch_in_progress = False
        
    async def execute_blue_green_deployment(self, new_version: str) -> bool:
        """Execute blue-green deployment for EDA services"""
        
        target_env = "green" if self.current_environment == "blue" else "blue"
        
        try:
            self.switch_in_progress = True
            
            # Phase 1: Deploy to target environment
            await self.deploy_to_environment(target_env, new_version)
            
            # Phase 2: Validate target environment
            validation_result = await self.validate_environment(target_env)
            if not validation_result.success:
                raise Exception(f"Environment validation failed: {validation_result.errors}")
            
            # Phase 3: Gradual traffic switch for events
            await self.gradual_event_switch(target_env)
            
            # Phase 4: Monitor switch progress
            monitoring_result = await self.monitor_switch_progress(target_env)
            if not monitoring_result.success:
                await self.rollback_switch()
                raise Exception("Switch monitoring failed, rolled back")
            
            # Phase 5: Complete switch
            await self.complete_environment_switch(target_env)
            
            self.current_environment = target_env
            return True
            
        except Exception as e:
            await self.handle_deployment_failure(e)
            return False
        finally:
            self.switch_in_progress = False
    
    async def gradual_event_switch(self, target_env: str):
        """Gradually switch event processing to target environment"""
        
        # Get all consumer groups
        consumer_groups = await self.get_consumer_groups()
        
        # Switch consumer groups one by one
        for group in consumer_groups:
            await self.switch_consumer_group(group, target_env)
            
            # Wait for lag to stabilize
            await self.wait_for_lag_stabilization(group, target_env)
            
            # Validate processing health
            health_check = await self.check_processing_health(group, target_env)
            if not health_check.success:
                raise Exception(f"Health check failed for group {group}")
    
    async def switch_consumer_group(self, group_id: str, target_env: str):
        """Switch a specific consumer group to target environment"""
        
        # Update consumer group configuration
        await self.update_consumer_group_config(group_id, {
            'environment': target_env,
            'topics': self.get_target_topics(group_id, target_env)
        })
        
        # Restart consumer instances
        await self.restart_consumer_instances(group_id)
        
    async def wait_for_lag_stabilization(self, group_id: str, target_env: str, 
                                        timeout: timedelta = timedelta(minutes=5)):
        """Wait for consumer lag to stabilize after switch"""
        
        start_time = datetime.now()
        
        while datetime.now() - start_time < timeout:
            lag_metrics = await self.get_consumer_lag(group_id)
            
            # Check if lag is within acceptable bounds
            if all(lag < 1000 for lag in lag_metrics.values()):
                return True
                
            await asyncio.sleep(10)  # Check every 10 seconds
        
        raise Exception(f"Consumer lag did not stabilize within {timeout}")
    
    async def monitor_switch_progress(self, target_env: str) -> 'MonitoringResult':
        """Monitor the progress of environment switch"""
        
        metrics_to_check = [
            'event_processing_success_rate',
            'event_processing_latency_p95',
            'consumer_lag_total',
            'error_rate'
        ]
        
        monitoring_window = timedelta(minutes=10)
        start_time = datetime.now()
        
        baseline_metrics = await self.get_baseline_metrics(self.current_environment)
        
        while datetime.now() - start_time < monitoring_window:
            current_metrics = await self.get_current_metrics(target_env)
            
            # Compare against baseline
            comparison = self.compare_metrics(baseline_metrics, current_metrics)
            
            if comparison.degradation_detected:
                return MonitoringResult(
                    success=False, 
                    errors=[f"Performance degradation detected: {comparison.issues}"]
                )
            
            await asyncio.sleep(30)  # Check every 30 seconds
        
        return MonitoringResult(success=True, errors=[])
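
The MonitoringResult and comparison objects referenced above are plain containers, and the manager's compare_metrics helper could look like this module-level sketch. It assumes a "higher is worse" convention for the compared metrics (lag, latency, error rate) and a 10% tolerance, both of which are assumptions:

from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class MonitoringResult:
    success: bool
    errors: List[str] = field(default_factory=list)

@dataclass
class MetricComparison:
    degradation_detected: bool
    issues: List[str] = field(default_factory=list)

def compare_metrics(baseline: Dict[str, float], current: Dict[str, float],
                    tolerance: float = 0.10) -> MetricComparison:
    """Flag metrics that are more than `tolerance` worse than the baseline."""
    issues = [
        f"{name}: baseline={baseline[name]:.2f}, current={value:.2f}"
        for name, value in current.items()
        if name in baseline and value > baseline[name] * (1 + tolerance)
    ]
    return MetricComparison(degradation_detected=bool(issues), issues=issues)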

67.1.2 Event Schema Compatibility Checks

@Component
public class SchemaCompatibilityChecker {
    
    private final SchemaRegistryClient schemaRegistry;
    
    public CompatibilityResult checkDeploymentCompatibility(
            String serviceName, String newVersion) {
        
        List<String> topics = getServiceTopics(serviceName);
        List<CompatibilityIssue> issues = new ArrayList<>();
        
        for (String topic : topics) {
            // Check producer schema compatibility
            CompatibilityIssue producerIssue = checkProducerCompatibility(topic, newVersion);
            if (producerIssue != null) {
                issues.add(producerIssue);
            }
            
            // Check consumer schema compatibility  
            CompatibilityIssue consumerIssue = checkConsumerCompatibility(topic, newVersion);
            if (consumerIssue != null) {
                issues.add(consumerIssue);
            }
        }
        
        return new CompatibilityResult(issues.isEmpty(), issues);
    }
    
    private CompatibilityIssue checkProducerCompatibility(String topic, String newVersion) {
        try {
            Schema latestSchema = schemaRegistry.getLatestSchemaMetadata(topic + "-value").getSchema();
            Schema newSchema = getSchemaForVersion(topic, newVersion);
            
            // Check forward compatibility: can existing consumers still read
            // events written with the new producer schema?
            AvroCompatibilityLevel compatibility = AvroCompatibilityChecker.checkCompatibility(
                    newSchema, Collections.singletonList(latestSchema));
                    
            if (compatibility == AvroCompatibilityLevel.NONE) {
                return CompatibilityIssue.builder()
                        .type(CompatibilityIssueType.PRODUCER_SCHEMA_INCOMPATIBLE)
                        .topic(topic)
                        .description("New producer schema is not backward compatible")
                        .severity(Severity.CRITICAL)
                        .build();
            }
            
        } catch (Exception e) {
            return CompatibilityIssue.builder()
                    .type(CompatibilityIssueType.SCHEMA_CHECK_FAILED)
                    .topic(topic)
                    .description("Failed to check schema compatibility: " + e.getMessage())
                    .severity(Severity.WARNING)
                    .build();
        }
        
        return null; // No issues
    }
    
    private CompatibilityIssue checkConsumerCompatibility(String topic, String newVersion) {
        // Check if new consumer can handle existing events in topic
        try {
            List<Schema> historicalSchemas = getHistoricalSchemas(topic);
            Schema newConsumerSchema = getConsumerSchemaForVersion(topic, newVersion);
            
            for (Schema historicalSchema : historicalSchemas) {
                AvroCompatibilityLevel compatibility = AvroCompatibilityChecker.checkCompatibility(
                        newConsumerSchema, Collections.singletonList(historicalSchema));
                        
                if (compatibility == AvroCompatibilityLevel.NONE) {
                    return CompatibilityIssue.builder()
                            .type(CompatibilityIssueType.CONSUMER_SCHEMA_INCOMPATIBLE)
                            .topic(topic)
                            .description("New consumer cannot handle existing events")
                            .severity(Severity.CRITICAL)
                            .historicalSchemaVersion(historicalSchema.getVersion())
                            .build();
                }
            }
            
        } catch (Exception e) {
            return CompatibilityIssue.builder()
                    .type(CompatibilityIssueType.SCHEMA_CHECK_FAILED)
                    .topic(topic)
                    .description("Failed to check schema compatibility: " + e.getMessage())
                    .severity(Severity.WARNING)
                    .build();
        }
        
        return null;
    }
}
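
The same gate can be implemented against the Confluent Schema Registry REST API, which is convenient in a CI/CD pipeline before any deployment step runs. A Python sketch (the registry URL and subject name are assumptions):

import json
import requests

REGISTRY_URL = "http://schema-registry:8081"  # assumed registry address

def is_compatible(subject: str, new_schema: dict) -> bool:
    """Test a candidate schema against the latest registered version."""
    response = requests.post(
        f"{REGISTRY_URL}/compatibility/subjects/{subject}/versions/latest",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        data=json.dumps({"schema": json.dumps(new_schema)}),
    )
    response.raise_for_status()
    return response.json()["is_compatible"]

# Example gate: abort the pipeline when the new schema would break consumers
# if not is_compatible("orders-value", new_order_schema):
#     raise SystemExit("Schema incompatible - aborting deployment")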

67.2 Rolling Updates

67.2.1 Controlled Rolling Updates for Event Processors

Rolling updates in an EDA require special attention to consumer groups and event-processing continuity:

from typing import List, Dict
from dataclasses import dataclass
import asyncio

@dataclass
class RollingUpdateConfig:
    service_name: str
    consumer_groups: List[str]
    max_parallel_updates: int = 1
    health_check_timeout: int = 300  # 5 minutes
    rollback_threshold: float = 0.05  # 5% error rate
    
class RollingUpdateManager:
    def __init__(self, kubernetes_client, kafka_admin):
        self.k8s = kubernetes_client
        self.kafka_admin = kafka_admin
        
    async def execute_rolling_update(self, config: RollingUpdateConfig, 
                                   new_image: str) -> bool:
        """Execute controlled rolling update for EDA service"""
        
        try:
            # Phase 1: Pre-deployment validation
            await self.pre_deployment_validation(config)
            
            # Phase 2: Get current deployment state
            current_pods = await self.get_service_pods(config.service_name)
            
            # Phase 3: Rolling update with consumer group awareness
            success = await self.rolling_update_with_consumer_awareness(
                config, current_pods, new_image)
                
            if not success:
                await self.rollback_deployment(config)
                return False
                
            # Phase 4: Post-deployment validation
            await self.post_deployment_validation(config)
            
            return True
            
        except Exception as e:
            await self.handle_update_failure(config, e)
            return False
    
    async def rolling_update_with_consumer_awareness(self, 
                                                   config: RollingUpdateConfig,
                                                   current_pods: List[str], 
                                                   new_image: str) -> bool:
        """Rolling update that considers consumer group rebalancing"""
        
        # Update pods in batches to minimize consumer group rebalancing
        # Guard against a zero batch size for small deployments
        batch_size = max(1, min(config.max_parallel_updates, len(current_pods) // 3))
        batches = [current_pods[i:i + batch_size] 
                  for i in range(0, len(current_pods), batch_size)]
        
        for batch_index, pod_batch in enumerate(batches):
            
            # Pre-batch: Ensure consumer groups are stable
            await self.wait_for_consumer_group_stability(config.consumer_groups)
            
            # Update current batch
            await self.update_pod_batch(pod_batch, new_image)
            
            # Wait for new pods to be ready
            await self.wait_for_pods_ready(pod_batch, config.health_check_timeout)
            
            # Validate consumer group health after rebalancing
            health_check = await self.validate_consumer_group_health(
                config.consumer_groups)
                
            if not health_check.success:
                raise Exception(f"Consumer group health check failed: {health_check.errors}")
            
            # Monitor error rates
            error_rate = await self.get_current_error_rate(config.service_name)
            if error_rate > config.rollback_threshold:
                raise Exception(f"Error rate {error_rate} exceeds threshold {config.rollback_threshold}")
        
        return True
    
    async def wait_for_consumer_group_stability(self, consumer_groups: List[str]):
        """Wait for consumer groups to be in stable state (no rebalancing)"""
        
        for group in consumer_groups:
            stable = False
            attempts = 0
            
            while not stable and attempts < 30:  # Max 5 minutes
                group_state = await self.get_consumer_group_state(group)
                
                if group_state.state == 'Stable':
                    stable = True
                else:
                    await asyncio.sleep(10)
                    attempts += 1
            
            if not stable:
                raise Exception(f"Consumer group {group} did not stabilize")
    
    async def validate_consumer_group_health(self, consumer_groups: List[str]) -> 'ValidationResult':
        """Validate health of consumer groups after pod updates"""
        
        errors = []
        
        for group in consumer_groups:
            # Check consumer lag
            lag_metrics = await self.get_consumer_lag(group)
            max_lag = max(lag_metrics.values()) if lag_metrics else 0
            
            if max_lag > 10000:  # 10k message lag threshold
                errors.append(f"High consumer lag for group {group}: {max_lag}")
            
            # Check processing rate
            processing_rate = await self.get_processing_rate(group)
            if processing_rate < 0.8:  # Less than 80% of expected rate
                errors.append(f"Low processing rate for group {group}: {processing_rate}")
            
            # Check error rate
            error_rate = await self.get_group_error_rate(group)
            if error_rate > 0.05:  # More than 5% errors
                errors.append(f"High error rate for group {group}: {error_rate}")
        
        return ValidationResult(success=len(errors) == 0, errors=errors)
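
ValidationResult is again a plain container, and get_consumer_lag can be implemented directly against the broker. A synchronous sketch using kafka-python (the bootstrap address is an assumption); lag is the gap between a partition's end offset and the group's committed offset:

from dataclasses import dataclass, field
from typing import Dict, List

from kafka import KafkaAdminClient, KafkaConsumer

@dataclass
class ValidationResult:
    success: bool
    errors: List[str] = field(default_factory=list)

def get_consumer_lag(group_id: str,
                     bootstrap_servers: str = "localhost:9092") -> Dict:
    """Return per-partition lag for a consumer group."""
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    committed = admin.list_consumer_group_offsets(group_id)

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    end_offsets = consumer.end_offsets(list(committed.keys()))

    lag = {tp: end_offsets[tp] - meta.offset for tp, meta in committed.items()}
    consumer.close()
    admin.close()
    return lag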

67.2.2 Event-Processing Continuity During Updates

@Slf4j
@Component
public class ContinuityManager {
    
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
    @Value("${deployment.enable.graceful.shutdown:true}")
    private boolean enableGracefulShutdown;
    
    @PreDestroy
    public void gracefulShutdown() {
        if (!enableGracefulShutdown) {
            return;
        }
        
        log.info("Starting graceful shutdown of event processors");
        
        // Phase 1: Stop accepting new events
        stopAcceptingNewEvents();
        
        // Phase 2: Complete processing of in-flight events
        completeInFlightProcessing();
        
        // Phase 3: Commit final offsets
        commitFinalOffsets();
        
        log.info("Graceful shutdown completed");
    }
    
    private void stopAcceptingNewEvents() {
        // Pause all Kafka listeners
        kafkaListenerEndpointRegistry.getAllListenerContainers()
                .forEach(container -> {
                    log.info("Pausing container: {}", container.getGroupId());
                    container.pause();
                });
    }
    
    private void completeInFlightProcessing() {
        // Wait for current processing to complete
        int maxWaitSeconds = 60;
        int waitedSeconds = 0;
        
        while (hasInFlightProcessing() && waitedSeconds < maxWaitSeconds) {
            try {
                Thread.sleep(1000);
                waitedSeconds++;
                
                if (waitedSeconds % 10 == 0) {
                    log.info("Waiting for in-flight processing completion: {}s elapsed", waitedSeconds);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        if (hasInFlightProcessing()) {
            log.warn("Forcing shutdown with in-flight processing remaining");
        }
    }
    
    private void commitFinalOffsets() {
        // Stopping the containers closes their consumers cleanly, which commits
        // the offsets of all records that have been fully processed
        kafkaListenerEndpointRegistry.getAllListenerContainers()
                .forEach(container -> {
                    log.info("Stopping container: {}", container.getGroupId());
                    container.stop();
                });
    }
    
    @Bean
    public ConsumerRebalanceListener gracefulRebalanceListener() {
        return new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                log.info("Partitions revoked during rebalance: {}", partitions);
                
                // Complete processing of current batch
                completeCurrentBatch();
                
                // Commit offsets for revoked partitions
                commitOffsetsForPartitions(partitions);
            }
            
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                log.info("Partitions assigned during rebalance: {}", partitions);
                
                // Initialize processing for new partitions
                initializePartitionProcessing(partitions);
            }
        };
    }
}
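
For consumers outside Spring, the same three shutdown phases can be wired to the SIGTERM signal that Kubernetes sends before replacing a pod. A minimal sketch with kafka-python (topic, group id, and the process() handler are assumptions):

import signal
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="order-service",
    enable_auto_commit=False,
)
shutting_down = False

def on_sigterm(signum, frame):
    global shutting_down
    shutting_down = True  # Phase 1: stop accepting new events

signal.signal(signal.SIGTERM, on_sigterm)

while not shutting_down:
    batch = consumer.poll(timeout_ms=1000)
    for partition, records in batch.items():
        for record in records:
            process(record)   # Phase 2: finish in-flight events (assumed handler)
    if batch:
        consumer.commit()     # Phase 3: commit offsets for the completed batch

consumer.close()  # leaves the consumer group cleanly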

67.3 Schema Migration

67.3.1 Event Schema Evolution Strategy

from typing import Dict, Any, List, Optional
from enum import Enum

class SchemaEvolutionType(Enum):
    BACKWARD_COMPATIBLE = "backward_compatible"  # New consumers can read old events
    FORWARD_COMPATIBLE = "forward_compatible"    # Old consumers can read new events
    FULL_COMPATIBLE = "full_compatible"          # Both directions
    BREAKING_CHANGE = "breaking_change"          # Incompatible

class SchemaVersionManager:
    def __init__(self, schema_registry_client):
        self.schema_registry = schema_registry_client
        self.migration_strategies = {
            SchemaEvolutionType.BACKWARD_COMPATIBLE: self.backward_compatible_migration,
            SchemaEvolutionType.FORWARD_COMPATIBLE: self.forward_compatible_migration,
            SchemaEvolutionType.FULL_COMPATIBLE: self.full_compatible_migration,
            SchemaEvolutionType.BREAKING_CHANGE: self.breaking_change_migration
        }
    
    async def execute_schema_migration(self, topic: str, new_schema: Dict[str, Any], 
                                     evolution_type: SchemaEvolutionType) -> bool:
        """Execute schema migration based on compatibility type"""
        
        migration_strategy = self.migration_strategies[evolution_type]
        return await migration_strategy(topic, new_schema)
    
    async def backward_compatible_migration(self, topic: str, new_schema: Dict[str, Any]) -> bool:
        """Migration for backward compatible changes (remove fields, add optional fields)"""
        
        # Phase 1: Validate backward compatibility
        compatibility_check = await self.validate_backward_compatibility(topic, new_schema)
        if not compatibility_check.is_compatible:
            raise Exception(f"Schema is not backward compatible: {compatibility_check.issues}")
        
        # Phase 2: Update consumers first (new consumers can read old events)
        await self.update_consumers(topic, new_schema)
        
        # Phase 3: Wait for consumer rollout completion
        await self.wait_for_consumer_rollout(topic)
        
        # Phase 4: Update producers (they start writing the new format)
        await self.update_producers(topic, new_schema)
        
        # Phase 5: Register new schema version
        await self.register_schema_version(topic, new_schema)
        
        return True
    
    async def forward_compatible_migration(self, topic: str, new_schema: Dict[str, Any]) -> bool:
        """Migration for forward compatible changes (add fields, remove optional fields)"""
        
        # Phase 1: Update producers first (old consumers can still read the new format)
        await self.update_producers(topic, new_schema)
        
        # Phase 2: Wait for producer rollout completion
        await self.wait_for_producer_rollout(topic)
        
        # Phase 3: Update consumers
        await self.update_consumers(topic, new_schema)
        
        # Phase 4: Register new schema version
        await self.register_schema_version(topic, new_schema)
        
        return True
    
    async def full_compatible_migration(self, topic: str, new_schema: Dict[str, Any]) -> bool:
        """Migration for fully compatible changes - rollout order does not matter"""
        
        # Both directions are compatible, so the backward path is safe to reuse
        return await self.backward_compatible_migration(topic, new_schema)
    
    async def breaking_change_migration(self, topic: str, new_schema: Dict[str, Any]) -> bool:
        """Migration for breaking changes - requires topic versioning"""
        
        new_topic = f"{topic}.v{await self.get_next_version_number(topic)}"
        
        # Phase 1: Create new topic with new schema
        await self.create_versioned_topic(new_topic, new_schema)
        
        # Phase 2: Deploy dual-write producers
        await self.deploy_dual_write_producers(topic, new_topic, new_schema)
        
        # Phase 3: Deploy new consumers for new topic
        await self.deploy_new_topic_consumers(new_topic, new_schema)
        
        # Phase 4: Migrate existing data (if needed)
        await self.migrate_existing_events(topic, new_topic, new_schema)
        
        # Phase 5: Switch traffic to new topic
        await self.switch_to_new_topic(topic, new_topic)
        
        # Phase 6: Deprecate old topic (after migration period)
        await self.schedule_topic_deprecation(topic)
        
        return True

class EventTransformer:
    """Transform events between different schema versions"""
    
    def __init__(self):
        self.transformers = {}  # version -> transformer function
    
    def register_transformer(self, from_version: str, to_version: str, 
                           transformer_func):
        """Register a transformation function between schema versions"""
        key = f"{from_version}->{to_version}"
        self.transformers[key] = transformer_func
    
    def transform_event(self, event: Dict[str, Any], from_version: str, 
                       to_version: str) -> Dict[str, Any]:
        """Transform event from one schema version to another"""
        
        transformation_key = f"{from_version}->{to_version}"
        
        if transformation_key not in self.transformers:
            # Try to find a transformation path
            path = self.find_transformation_path(from_version, to_version)
            if not path:
                raise Exception(f"No transformation path from {from_version} to {to_version}")
            
            # Apply chained transformations
            current_event = event
            for step in path:
                current_event = self.transformers[step](current_event)
            return current_event
        
        return self.transformers[transformation_key](event)
    
    def find_transformation_path(self, from_version: str, to_version: str) -> Optional[List[str]]:
        """Find a path of transformations from source to target version"""
        # Simplified path finding - in practice, use graph algorithms
        available_transformations = list(self.transformers.keys())
        
        # Direct transformation
        direct_path = f"{from_version}->{to_version}"
        if direct_path in available_transformations:
            return [direct_path]
        
        # Two-step transformation (simplified)
        for transformation in available_transformations:
            from_v, to_v = transformation.split('->')
            if from_v == from_version:
                next_path = f"{to_v}->{to_version}"
                if next_path in available_transformations:
                    return [transformation, next_path]
        
        return None

# Example transformer registration
transformer = EventTransformer()

# Order v1 -> v2: Add currency field with default
transformer.register_transformer("order.v1", "order.v2", 
    lambda event: {**event, "currency": "EUR"})

# Order v2 -> v3: Rename totalAmount to total_amount  
transformer.register_transformer("order.v2", "order.v3",
    lambda event: {
        **{k: v for k, v in event.items() if k != "totalAmount"},
        "total_amount": event.get("totalAmount")
    })
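
Applying the registered transformers: a v1 event reaching a v3 consumer is migrated via the chained v1 -> v2 -> v3 path that find_transformation_path discovers.

v1_event = {"orderId": "42", "totalAmount": 99.90}

v3_event = transformer.transform_event(v1_event, "order.v1", "order.v3")
# Result: {"orderId": "42", "currency": "EUR", "total_amount": 99.9}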

67.3.2 Automated Schema Migration Pipeline

@Slf4j
@Component
@RequiredArgsConstructor
public class SchemaMigrationPipeline {
    
    private final SchemaRegistryClient schemaRegistry;
    private final DeploymentManager deploymentManager;
    private final CompatibilityChecker compatibilityChecker;
    
    public MigrationResult executeSchemaMigration(SchemaMigrationRequest request) {
        
        MigrationPlan plan = createMigrationPlan(request);
        
        try {
            // Phase 1: Pre-migration validation
            ValidationResult validation = validateMigration(plan);
            if (!validation.isValid()) {
                return MigrationResult.failed(validation.getErrors());
            }
            
            // Phase 2: Execute migration steps
            for (MigrationStep step : plan.getSteps()) {
                StepResult result = executeStep(step);
                if (!result.isSuccessful()) {
                    rollbackMigration(plan, step);
                    return MigrationResult.failed(result.getErrors());
                }
            }
            
            // Phase 3: Post-migration validation
            ValidationResult postValidation = validatePostMigration(plan);
            if (!postValidation.isValid()) {
                rollbackMigration(plan, null);
                return MigrationResult.failed(postValidation.getErrors());
            }
            
            return MigrationResult.success();
            
        } catch (Exception e) {
            log.error("Schema migration failed", e);
            rollbackMigration(plan, null);
            return MigrationResult.failed(List.of(e.getMessage()));
        }
    }
    
    private MigrationPlan createMigrationPlan(SchemaMigrationRequest request) {
        Schema currentSchema = schemaRegistry.getLatestSchemaMetadata(
                request.getTopic() + "-value").getSchema();
        Schema targetSchema = request.getNewSchema();
        
        CompatibilityLevel compatibility = compatibilityChecker.checkCompatibility(
                currentSchema, targetSchema);
                
        List<MigrationStep> steps = switch (compatibility) {
            case BACKWARD -> createBackwardCompatibleSteps(request);
            case FORWARD -> createForwardCompatibleSteps(request);
            case FULL -> createFullCompatibleSteps(request);
            case NONE -> createBreakingChangeSteps(request);
        };
        
        return MigrationPlan.builder()
                .request(request)
                .currentSchema(currentSchema)
                .targetSchema(targetSchema)
                .compatibility(compatibility)
                .steps(steps)
                .build();
    }
    
    private List<MigrationStep> createBackwardCompatibleSteps(SchemaMigrationRequest request) {
        return List.of(
            MigrationStep.builder()
                    .type(StepType.UPDATE_CONSUMERS)
                    .description("Update consumers first - new consumers can read old events")
                    .services(request.getConsumerServices())
                    .build(),
            MigrationStep.builder()
                    .type(StepType.WAIT_FOR_ROLLOUT)
                    .description("Wait for consumer rollout completion")
                    .timeout(Duration.ofMinutes(10))
                    .build(),
            MigrationStep.builder()
                    .type(StepType.UPDATE_PRODUCERS)
                    .description("Update producers to write the new schema format")
                    .services(request.getProducerServices())
                    .build(),
            MigrationStep.builder()
                    .type(StepType.REGISTER_SCHEMA)
                    .description("Register new schema version in registry")
                    .schema(request.getNewSchema())
                    .build()
        );
    }
    
    private StepResult executeStep(MigrationStep step) {
        return switch (step.getType()) {
            case UPDATE_PRODUCERS -> updateProducers(step);
            case UPDATE_CONSUMERS -> updateConsumers(step);
            case WAIT_FOR_ROLLOUT -> waitForRollout(step);
            case REGISTER_SCHEMA -> registerSchema(step);
            case CREATE_TOPIC -> createTopicVersion(step);
            case MIGRATE_DATA -> migrateData(step);
        };
    }
}

These deployment strategies let you update event-driven architectures safely and in a controlled manner while respecting the particularities of asynchronous systems and event schema evolution.