18 Schema Evolution und Versionierung

Event-Schemas sind lebende Artefakte, die sich mit den Geschäftsanforderungen weiterentwickeln. Die Kunst liegt darin, Events so weiterzuentwickeln, dass bestehende Consumer weiterhin funktionieren, während neue Funktionalitäten ermöglicht werden. Schema Evolution ist weniger ein technisches Problem als vielmehr eine architektonische Disziplin.

18.1 Forward- und Backward-Compatibility

Backward-Compatibility stellt sicher, dass neue Event-Versionen von bestehenden Consumern verarbeitet werden können. Forward-Compatibility ermöglicht älteren Producern, Events zu senden, die von neueren Consumern verstanden werden.

Backward-Compatible Evolution - Neue Felder sind optional:

// Spring Boot - Backward-Compatible Schema Evolution
// Version 1: Ursprüngliches OrderPlaced Event
/**
 * Version 1 of the OrderPlaced event: the original, minimal schema.
 * These fields form the stable "core" that later versions must keep.
 */
public class OrderPlacedV1 {
    private String eventId;
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private Instant placedAt;
    
    // Base event without extended fields
}

// Version 2: Erweitert um optionale Felder
/**
 * Version 2 of the OrderPlaced event: a backward-compatible evolution of V1.
 * All V1 fields are untouched; every new field is optional and omitted from
 * the serialized JSON when null, so existing V1 consumers keep working.
 */
public class OrderPlacedV2 {
    // All original fields remain unchanged
    private String eventId;
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private Instant placedAt;
    
    // New optional fields for extended functionality
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private String promotionCode;
    
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private CustomerTier customerTier;
    
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private Map<String, String> metadata;
    
    // Constructor for backward compatibility: accepts exactly the V1 field set
    public OrderPlacedV2(String eventId, String orderId, String customerId, 
                        List<OrderItem> items, BigDecimal totalAmount, Instant placedAt) {
        this.eventId = eventId;
        this.orderId = orderId;
        this.customerId = customerId;
        this.items = items;
        this.totalAmount = totalAmount;
        this.placedAt = placedAt;
        // New fields stay null - that is OK!
    }
}
# Python - Schema Evolution mit Optional Fields
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Any
from datetime import datetime
from decimal import Decimal
from enum import Enum

class CustomerTier(Enum):
    """Loyalty tier of the ordering customer."""
    BASIC = "basic"
    PREMIUM = "premium"
    VIP = "vip"

@dataclass
class OrderPlacedV1:
    """Original event version (stable core fields only)."""
    event_id: str
    order_id: str
    customer_id: str
    items: List[dict]
    total_amount: Decimal
    placed_at: datetime

@dataclass  
class OrderPlacedV2:
    """Backward-compatible evolution of OrderPlacedV1.

    Every V1 field is kept unchanged, and every field added in V2 has a
    default, so a V2 instance can be constructed from V1 data alone.
    """
    # Original fields, unchanged
    event_id: str
    order_id: str
    customer_id: str
    items: List[dict]
    total_amount: Decimal
    placed_at: datetime
    
    # New optional fields
    promotion_code: Optional[str] = None
    customer_tier: Optional[CustomerTier] = None
    # Annotated as a plain dict: default_factory always supplies {}, so the
    # value is never None (the former Optional[...] annotation contradicted
    # the default).
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    @classmethod
    def from_v1(cls, v1_event: OrderPlacedV1) -> 'OrderPlacedV2':
        """Lossless migration from V1 to V2; new fields use their defaults."""
        return cls(
            event_id=v1_event.event_id,
            order_id=v1_event.order_id,
            customer_id=v1_event.customer_id,
            items=v1_event.items,
            total_amount=v1_event.total_amount,
            placed_at=v1_event.placed_at
        )

Forward-Compatible Consumer - Behandelt unbekannte Felder graceful:

// Spring Boot - Forward-Compatible Event Consumer
/**
 * Forward-compatible consumer: processes order events regardless of the
 * producer's schema version by reading only the known core fields and
 * ignoring everything else.
 */
@Component
public class ForwardCompatibleOrderConsumer {
    
    private final ObjectMapper objectMapper;
    
    public ForwardCompatibleOrderConsumer() {
        // FAIL_ON_UNKNOWN_PROPERTIES=false is Jackson's switch for tolerating
        // unknown fields during bean deserialization. The previously configured
        // DeserializationFeature.IGNORE_UNKNOWN_PROPERTIES does not exist in
        // Jackson and would not compile.
        this.objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }
    
    @KafkaListener(topics = "order.placed")
    public void handleOrderPlaced(String eventJson) {
        try {
            // Tree parsing retains unknown fields; we simply never read them.
            JsonNode eventNode = objectMapper.readTree(eventJson);
            
            // Process only the known core fields. A missing core field makes
            // get(...) return null and the resulting NPE is routed into the
            // generic error handler below.
            String orderId = eventNode.get("orderId").asText();
            String customerId = eventNode.get("customerId").asText();
            BigDecimal totalAmount = new BigDecimal(eventNode.get("totalAmount").asText());
            
            processOrder(orderId, customerId, totalAmount);
            
            // Unknown fields are simply ignored.
            
        } catch (Exception e) {
            handleParsingError(eventJson, e);
        }
    }
    
    private void processOrder(String orderId, String customerId, BigDecimal amount) {
        // Business logic relies on stable fields only.
        // NOTE(review): logger and handleParsingError are not declared in this
        // listing - presumably provided elsewhere; verify.
        logger.info("Processing order {} for customer {} with amount {}", 
                   orderId, customerId, amount);
    }
}
# Python - Forward-Compatible Event Processing
import json
from typing import Dict, Any
import logging

class ForwardCompatibleEventProcessor:
    """Consumes order-placed events in a forward-compatible way.

    Only the stable core fields are read; any fields added by newer
    schema versions are silently ignored.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def process_order_placed_event(self, event_json: str) -> None:
        """Handle one serialized event, independent of its schema version."""
        try:
            payload = json.loads(event_json)
            core = self._extract_core_fields(payload)
            if not self._validate_core_fields(core):
                self.logger.warning("Invalid core fields in event")
                return
            self._process_order(core)
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse event JSON: {e}")
        except Exception as e:
            self.logger.error(f"Unexpected error processing event: {e}")
    
    def _extract_core_fields(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
        """Pick out only the stable core fields, dropping everything else."""
        wanted = ('event_id', 'order_id', 'customer_id', 'total_amount', 'placed_at')
        return {name: event_data.get(name) for name in wanted}
    
    def _validate_core_fields(self, fields: Dict[str, Any]) -> bool:
        """True when every mandatory core field carries a non-None value."""
        for name in ('event_id', 'order_id', 'customer_id', 'total_amount'):
            if fields.get(name) is None:
                return False
        return True
    
    def _process_order(self, core_fields: Dict[str, Any]) -> None:
        """Business logic relies on the stable fields only."""
        self.logger.info(f"Processing order {core_fields['order_id']} "
                        f"for customer {core_fields['customer_id']}")
Compatibility-Typ Änderungsrichtung Anforderung Beispiel
Backward Neue Producer → Alte Consumer Felder optional/default V2 Event → V1 Consumer
Forward Alte Producer → Neue Consumer Unbekannte Felder ignorieren V1 Event → V2 Consumer
Full Bidirektional Beide Richtungen erfüllt V1 ↔ V2 kompatibel

18.2 Breaking Changes vermeiden

Breaking Changes entstehen durch inkompatible Modifikationen existierender Event-Strukturen. Die systematische Vermeidung erfordert Designdisziplin und klare Evolutionsregeln.

Sichere Schema-Änderungen:

// Spring Boot - Sichere vs. Gefährliche Schema-Evolution
/**
 * Examples of schema changes that are safe for existing consumers.
 */
public class SafeSchemaEvolution {
    
    // ✅ SAFE: add optional fields
    public static class OrderPlacedSafe {
        // Existing fields unchanged
        private String orderId;
        private BigDecimal totalAmount;
        
        // New optional fields
        @JsonInclude(JsonInclude.Include.NON_NULL)
        private String loyaltyPointsEarned;  // optional, null is OK
        
        @JsonInclude(JsonInclude.Include.NON_NULL)
        private List<String> appliedPromotions; // optional, empty list is OK
    }
    
    // ✅ SAFE: extend a field (String → richer structure)
    public static class CustomerInfo {
        private String customerId;
        // Was: private String customerEmail;
        // Now: a structure with backward compatibility
        @JsonUnwrapped
        private CustomerContact contact;
    }
    
    public static class CustomerContact {
        @JsonProperty("customerEmail") // keeps the old JSON field name
        private String email;
        
        @JsonInclude(JsonInclude.Include.NON_NULL)
        private String phone; // new and optional
    }
}

/**
 * Counter-examples: modifications that break existing consumers.
 */
public class DangerousSchemaEvolution {
    
    // ❌ BREAKING: renaming a field
    public static class OrderPlacedBreaking {
        // private String orderId; // old
        private String orderIdentifier; // new - BREAKS OLD CONSUMERS!
    }
    
    // ❌ BREAKING: changing a field's type
    public static class CustomerInfoBreaking {
        private String customerId;
        // private String totalAmount; // was String  
        private BigDecimal totalAmount; // now BigDecimal - BREAKING!
    }
    
    // ❌ BREAKING: adding a required field
    public static class OrderPlacedRequiredField {
        private String orderId;
        private String shippingMethod; // new and required - BREAKING!
    }
}
# Python - Breaking Change Detection und Vermeidung
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from enum import Enum

class SchemaChangeType(Enum):
    """Classification of a single schema modification."""
    SAFE_ADD_OPTIONAL = "safe_add_optional"
    SAFE_EXTEND_ENUM = "safe_extend_enum"
    BREAKING_RENAME = "breaking_rename"
    BREAKING_TYPE_CHANGE = "breaking_type_change"
    BREAKING_REMOVE = "breaking_remove"

@dataclass
class SchemaChangeAnalysis:
    """One finding produced by the schema diff."""
    change_type: SchemaChangeType
    field_name: str
    is_breaking: bool
    description: str
    mitigation: Optional[str] = None

class SchemaEvolutionValidator:
    """Diffs two schema dicts and reports incompatible modifications."""
    
    def __init__(self):
        # change categories that never break existing consumers
        self.safe_patterns = {
            'add_optional_field',
            'add_enum_value',
            'extend_string_to_object',
            'add_metadata_field'
        }
        
        # change categories that always break at least one side
        self.breaking_patterns = {
            'rename_field',
            'change_field_type', 
            'remove_field',
            'add_required_field',
            'change_enum_semantics'
        }
    
    def validate_schema_change(self, old_schema: Dict, new_schema: Dict) -> List[SchemaChangeAnalysis]:
        """Compare old and new schema; removals are reported first,
        then type changes of the fields both versions share."""
        findings: List[SchemaChangeAnalysis] = []
        
        # Report every field that disappeared
        for name in (f for f in old_schema if f not in new_schema):
            findings.append(SchemaChangeAnalysis(
                change_type=SchemaChangeType.BREAKING_REMOVE,
                field_name=name,
                is_breaking=True,
                description=f"Field '{name}' was removed",
                mitigation="Deprecate field first, then remove in major version"
            ))
        
        # Report every surviving field whose declared type changed
        for name in (f for f in old_schema if f in new_schema):
            before = old_schema[name].get('type')
            after = new_schema[name].get('type')
            if before != after:
                findings.append(SchemaChangeAnalysis(
                    change_type=SchemaChangeType.BREAKING_TYPE_CHANGE,
                    field_name=name,
                    is_breaking=True,
                    description=f"Field '{name}' type changed from {before} to {after}",
                    mitigation="Use union types or add new field instead"
                ))
        
        return findings

# Sichere Feld-Evolution mit Wrapper-Pattern
@dataclass
class EvolvableOrderEvent:
    """Event that carries a stable core plus an open extension area, so new
    data can be attached without ever breaking existing consumers."""
    # Stable, contract-level data
    core: Dict[str, Any]
    
    # Evolution-friendly extras
    extensions: Dict[str, Any]
    
    # Version tag for schema tracking
    schema_version: str = "1.0"
    
    def get_field(self, field_name: str, default: Any = None) -> Any:
        """Look the field up in core first, then in extensions, else default."""
        if field_name in self.core:
            return self.core[field_name]
        return self.extensions.get(field_name, default)
    
    def add_extension(self, key: str, value: Any) -> None:
        """Attach new data without a breaking schema change."""
        self.extensions[key] = value

Evolution-Strategien für verschiedene Änderungstypen:

// Spring Boot - Konkrete Evolution-Patterns
/**
 * Concrete evolution patterns that avoid breaking changes.
 */
@Component
public class SchemaEvolutionPatterns {
    
    // Pattern 1: extend a field instead of changing it
    public void demonstrateFieldExtension() {
        // Instead of changing customerEmail → customer.email,
        // support both variants in parallel.
        
        class CustomerInfoEvolved {
            @JsonProperty("customerEmail") // legacy support
            @Deprecated
            private String customerEmail;
            
            @JsonProperty("customer") // new structure
            private CustomerDetails customer;
            
            // Migration logic for backward compatibility
            @JsonIgnore
            public String getEffectiveEmail() {
                return customer != null ? customer.getEmail() : customerEmail;
            }
        }
    }
    
    // Pattern 2: versioned event wrapper
    public void demonstrateVersionedWrapper() {
        class VersionedEvent<T> {
            private String eventType;
            private String version;
            private T payload;
            private Map<String, Object> metadata;
            
            // Consumers can decide based on the version.
            // NOTE(review): isVersionCompatible(...) is not defined in this
            // listing - presumably a helper provided elsewhere; verify.
            public Optional<T> getPayloadIfCompatible(String minVersion) {
                return isVersionCompatible(version, minVersion) 
                    ? Optional.of(payload) 
                    : Optional.empty();
            }
        }
    }
    
    // Pattern 3: union types for type evolution
    public void demonstrateUnionTypes() {
        class FlexibleAmount {
            @JsonProperty("value")
            private Object value; // String or BigDecimal
            
            @JsonProperty("type")
            private String valueType; // "string" or "decimal"
            
            public BigDecimal getAsDecimal() {
                if ("decimal".equals(valueType) && value instanceof BigDecimal) {
                    return (BigDecimal) value;
                } else if ("string".equals(valueType) && value instanceof String) {
                    return new BigDecimal((String) value);
                }
                throw new IllegalStateException("Cannot convert to decimal");
            }
        }
    }
}

18.3 Migration Strategies

Schema-Migration erfordert koordinierte Strategien für Producer und Consumer, um zero-downtime Evolution zu ermöglichen.

Blue-Green Schema Migration:

# Python - Blue-Green Schema Migration Pattern
import json
import logging

from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Any, Dict, List

class EventSchemaVersion(ABC):
    """Contract every concrete schema version must implement."""
    
    @abstractmethod
    def get_version(self) -> str:
        """Return the schema version tag, e.g. "1.0"."""
        pass
    
    @abstractmethod
    def serialize_event(self, event_data: Dict[str, Any]) -> bytes:
        """Encode an event dict into its wire representation."""
        pass
    
    @abstractmethod
    def deserialize_event(self, event_bytes: bytes) -> Dict[str, Any]:
        """Decode the wire representation back into an event dict."""
        pass

class OrderPlacedV1Schema(EventSchemaVersion):
    """V1 wire format: core fields only; total_amount travels as a string."""
    
    def get_version(self) -> str:
        return "1.0"
    
    def serialize_event(self, event_data: Dict[str, Any]) -> bytes:
        # V1 keeps the identifying fields verbatim and stringifies the amount;
        # any other fields are dropped.
        payload = {name: event_data[name]
                   for name in ('event_id', 'order_id', 'customer_id')}
        payload['total_amount'] = str(event_data['total_amount'])
        return json.dumps(payload).encode('utf-8')
    
    def deserialize_event(self, event_bytes: bytes) -> Dict[str, Any]:
        return json.loads(event_bytes.decode('utf-8'))

class OrderPlacedV2Schema(EventSchemaVersion):
    """V2 wire format: all fields pass through; non-JSON types fall back to str()."""
    
    def get_version(self) -> str:
        return "2.0"
    
    def serialize_event(self, event_data: Dict[str, Any]) -> bytes:
        return json.dumps(event_data, default=str).encode('utf-8')
    
    def deserialize_event(self, event_bytes: bytes) -> Dict[str, Any]:
        return json.loads(event_bytes.decode('utf-8'))

class SchemaAwareEventProducer:
    """Producer that can publish events in multiple schema versions.

    Holds one serializer per known version and, when target consumers are
    given, downgrades to the lowest version all of them support.
    """
    
    def __init__(self):
        # Registry of all wire formats this producer can emit
        self.schemas = {
            "1.0": OrderPlacedV1Schema(),
            "2.0": OrderPlacedV2Schema()
        }
        self.default_version = "2.0"
        # consumer name -> highest schema version that consumer understands
        self.consumer_versions = {}
    
    def publish_event(self, topic: str, event_data: Dict[str, Any], 
                     target_consumers: List[str] = None) -> None:
        """Publish the event in a version every target consumer can read.

        Without explicit targets the producer's default version is used.
        NOTE(review): _send_to_kafka is expected to be provided elsewhere
        (not shown in this listing); verify.
        """
        if target_consumers:
            # Determine the minimal schema version the targets all support
            min_version = self._get_minimum_supported_version(target_consumers)
        else:
            min_version = self.default_version
        
        schema = self.schemas[min_version]
        serialized_event = schema.serialize_event(event_data)
        
        # Advertise the chosen schema version so consumers pick a decoder
        headers = {
            'schema-version': min_version,
            'content-type': 'application/json'
        }
        
        self._send_to_kafka(topic, serialized_event, headers)
    
    def _get_minimum_supported_version(self, consumers: List[str]) -> str:
        """Return the lowest schema version among the given consumers.

        Versions are compared numerically ("2.0" < "10.0"); a plain string
        min() would order them lexicographically and wrongly pick "10.0".
        Consumers with no registered capability count as "1.0".
        """
        supported_versions = [
            self.consumer_versions.get(consumer, "1.0") 
            for consumer in consumers
        ]
        return min(supported_versions,
                   key=lambda v: tuple(int(part) for part in v.split('.')))

class SchemaAwareEventConsumer:
    """Consumer that decodes several schema versions and normalizes them
    onto one internal event structure."""
    
    def __init__(self, supported_versions: List[str]):
        # One decoder per known wire format
        self.schemas = {
            "1.0": OrderPlacedV1Schema(),
            "2.0": OrderPlacedV2Schema()
        }
        self.supported_versions = supported_versions
        self.logger = logging.getLogger(__name__)
    
    def process_event(self, event_bytes: bytes, headers: Dict[str, str]) -> None:
        """Decode, normalize and process one event of any supported version."""
        version = headers.get('schema-version', '1.0')
        
        if version not in self.supported_versions:
            self.logger.warning(f"Unsupported schema version: {version}")
            return
        
        decoded = self.schemas[version].deserialize_event(event_bytes)
        self._process_normalized_event(self._normalize_event(decoded, version))
    
    def _normalize_event(self, event_data: Dict[str, Any], version: str) -> Dict[str, Any]:
        """Map a version-specific payload onto the internal structure."""
        if version == "2.0":
            # V2 events already match the internal structure
            return event_data
        if version == "1.0":
            # Lift the V1 core fields; V1 carries no metadata
            return {
                'order_id': event_data['order_id'],
                'customer_id': event_data['customer_id'],
                'total_amount': Decimal(event_data['total_amount']),
                'metadata': {}
            }
        raise ValueError(f"Unknown schema version: {version}")

Graduelle Migration-Strategie:

// Spring Boot - Graduelle Schema-Migration
/**
 * Three-phase migration that moves producers and consumers to a new schema
 * version without downtime: dual-write, consumer migration, cleanup.
 */
@Component
public class GradualSchemaMigration {
    
    private final EventVersionRegistry versionRegistry;
    private final MetricsService metricsService;
    // NOTE(review): kafkaTemplate and logger are used below but not declared
    // in this listing - presumably injected/defined elsewhere; verify.
    
    // Phase 1: dual-write (producer writes both versions)
    public void dualWritePhase(OrderEvent event) {
        // Write V1 for backward compatibility
        String v1Topic = "order.placed.v1";
        OrderPlacedV1 v1Event = convertToV1(event);
        kafkaTemplate.send(v1Topic, v1Event);
        
        // Write V2 for new consumers
        String v2Topic = "order.placed.v2";
        OrderPlacedV2 v2Event = convertToV2(event);
        kafkaTemplate.send(v2Topic, v2Event);
        
        metricsService.incrementCounter("schema.migration.dual_write");
    }
    
    // Phase 2: consumer migration (consumers switch to the new version)
    public void consumerMigrationPhase() {
        // Consumers are moved to the V2 topic gradually;
        // the V1 topic keeps running in parallel.
        
        List<String> migratedConsumers = versionRegistry.getMigratedConsumers();
        double migrationProgress = calculateMigrationProgress(migratedConsumers);
        
        if (migrationProgress > 0.9) { // 90% migrated
            logger.info("Ready for V1 topic deprecation");
        }
    }
    
    // Phase 3: cleanup (remove the V1 topic)
    public void cleanupPhase() {
        // From here on only the V2 topic is used;
        // the V1 producer code can be removed.
        
        metricsService.incrementCounter("schema.migration.completed");
    }
    
    private OrderPlacedV1 convertToV1(OrderEvent event) {
        // Reduce the event to the V1 field set
        return new OrderPlacedV1(
            event.getEventId(),
            event.getOrderId(),
            event.getCustomerId(),
            event.getItems(),
            event.getTotalAmount(),
            event.getPlacedAt()
        );
    }
}

Migration-Timeline und Koordination:

Phase Dauer Producer Consumer Topics Risiko
Prep 1 Woche V1 V1 v1 Niedrig
Dual-Write 2 Wochen V1+V2 V1 v1, v2 Mittel
Consumer-Migration 4 Wochen V1+V2 V1→V2 v1, v2 Mittel
Cleanup 1 Woche V2 V2 v2 Niedrig

Schema Evolution erfordert Planung, Disziplin und schrittweise Umsetzung. Forward- und Backward-Compatibility ermöglichen sichere Evolution, während systematische Migration-Strategien zero-downtime Upgrades gewährleisten. Die wichtigste Regel bleibt: Breaking Changes sind fast immer vermeidbar, wenn das Schema von Anfang an evolution-freundlich designed wird.