Event-Schemas sind lebende Artefakte, die sich mit den Geschäftsanforderungen weiterentwickeln. Die Kunst liegt darin, Events so zu evolvieren, dass bestehende Consumer weiterhin funktionieren, während neue Funktionalitäten ermöglicht werden. Schema Evolution ist weniger ein technisches Problem als vielmehr eine architektonische Disziplin.
Backward-Compatibility stellt sicher, dass neue Event-Versionen von bestehenden Consumern verarbeitet werden können. Forward-Compatibility ermöglicht älteren Producern, Events zu senden, die von neueren Consumern verstanden werden.
Backward-Compatible Evolution - Neue Felder sind optional:
// Spring Boot - backward-compatible schema evolution
// Version 1: the original OrderPlaced event
public class OrderPlacedV1 {

    private String eventId;
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private Instant placedAt;

    // Base event: no extended fields yet.
}
// Version 2: extends V1 with optional fields only (backward compatible)
public class OrderPlacedV2 {

    // All original V1 fields remain unchanged.
    private String eventId;
    private String orderId;
    private String customerId;
    private List<OrderItem> items;
    private BigDecimal totalAmount;
    private Instant placedAt;

    // New, optional fields for extended functionality; omitted from the
    // JSON payload when null so existing consumers never see them.
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private String promotionCode;

    @JsonInclude(JsonInclude.Include.NON_NULL)
    private CustomerTier customerTier;

    @JsonInclude(JsonInclude.Include.NON_NULL)
    private Map<String, String> metadata;

    /**
     * Backward-compatibility constructor: accepts exactly the V1 field set.
     * The new fields remain null - that is a valid state by design.
     */
    public OrderPlacedV2(String eventId, String orderId, String customerId,
                         List<OrderItem> items, BigDecimal totalAmount, Instant placedAt) {
        this.eventId = eventId;
        this.orderId = orderId;
        this.customerId = customerId;
        this.items = items;
        this.totalAmount = totalAmount;
        this.placedAt = placedAt;
    }
}
# Python - Schema Evolution mit Optional Fields
from dataclasses import dataclass, field
from typing import Optional, Dict, List, Any
from datetime import datetime
from decimal import Decimal
from enum import Enum
class CustomerTier(Enum):
    """Customer loyalty tiers carried on V2 events."""

    BASIC = "basic"
    PREMIUM = "premium"
    VIP = "vip"
@dataclass
class OrderPlacedV1:
    """Original event version: core fields only."""

    event_id: str
    order_id: str
    customer_id: str
    items: List[dict]
    total_amount: Decimal
    placed_at: datetime
@dataclass
class OrderPlacedV2:
"""Backward-compatible Evolution"""
# Originalfelder unverändert
event_id: str
order_id: str
customer_id: str
items: List[dict]
total_amount: Decimal
placed_at: datetime
# Neue optionale Felder
promotion_code: Optional[str] = None
customer_tier: Optional[CustomerTier] = None
metadata: Optional[Dict[str, Any]] = field(default_factory=dict)
@classmethod
def from_v1(cls, v1_event: OrderPlacedV1) -> 'OrderPlacedV2':
"""Migration von V1 zu V2 ohne Datenverlust"""
return cls(
event_id=v1_event.event_id,
order_id=v1_event.order_id,
customer_id=v1_event.customer_id,
items=v1_event.items,
total_amount=v1_event.total_amount,
placed_at=v1_event.placed_at
# Neue Felder verwenden Defaults
)Forward-Compatible Consumer - Behandelt unbekannte Felder graceful:
// Spring Boot - forward-compatible event consumer: tolerates unknown fields
@Component
public class ForwardCompatibleOrderConsumer {

    private final ObjectMapper objectMapper;

    public ForwardCompatibleOrderConsumer() {
        // FAIL_ON_UNKNOWN_PROPERTIES=false is the only switch needed here.
        // The original code also referenced DeserializationFeature
        // .IGNORE_UNKNOWN_PROPERTIES, which does not exist in Jackson and
        // would not compile - removed.
        this.objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    /**
     * Handles an order.placed event of any schema version.
     * Only the stable core fields are read; unknown fields are ignored,
     * which is what makes this consumer forward-compatible.
     */
    @KafkaListener(topics = "order.placed")
    public void handleOrderPlaced(String eventJson) {
        try {
            JsonNode eventNode = objectMapper.readTree(eventJson);
            // path() returns a missing node instead of null for absent
            // fields, so we can report missing core fields explicitly
            // instead of failing with a NullPointerException.
            JsonNode orderIdNode = eventNode.path("orderId");
            JsonNode customerIdNode = eventNode.path("customerId");
            JsonNode totalAmountNode = eventNode.path("totalAmount");
            if (orderIdNode.isMissingNode() || customerIdNode.isMissingNode()
                    || totalAmountNode.isMissingNode()) {
                throw new IllegalArgumentException("Missing required core fields in event");
            }
            String orderId = orderIdNode.asText();
            String customerId = customerIdNode.asText();
            BigDecimal totalAmount = new BigDecimal(totalAmountNode.asText());
            processOrder(orderId, customerId, totalAmount);
            // Unknown fields are simply ignored.
        } catch (Exception e) {
            handleParsingError(eventJson, e);
        }
    }

    private void processOrder(String orderId, String customerId, BigDecimal amount) {
        // Business logic relies only on stable core fields.
        logger.info("Processing order {} for customer {} with amount {}",
            orderId, customerId, amount);
    }
}
# Python - Forward-Compatible Event Processing
import json
from typing import Dict, Any
import logging
class ForwardCompatibleEventProcessor:
def __init__(self):
self.logger = logging.getLogger(__name__)
def process_order_placed_event(self, event_json: str) -> None:
"""Verarbeitet Events unabhängig von der Version"""
try:
event_data = json.loads(event_json)
# Extraktion nur der Kern-Felder
core_fields = self._extract_core_fields(event_data)
if self._validate_core_fields(core_fields):
self._process_order(core_fields)
else:
self.logger.warning("Invalid core fields in event")
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse event JSON: {e}")
except Exception as e:
self.logger.error(f"Unexpected error processing event: {e}")
def _extract_core_fields(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
"""Extrahiert nur die stabilen Kern-Felder"""
return {
'event_id': event_data.get('event_id'),
'order_id': event_data.get('order_id'),
'customer_id': event_data.get('customer_id'),
'total_amount': event_data.get('total_amount'),
'placed_at': event_data.get('placed_at')
}
def _validate_core_fields(self, fields: Dict[str, Any]) -> bool:
"""Validiert, dass alle erforderlichen Kern-Felder vorhanden sind"""
required = ['event_id', 'order_id', 'customer_id', 'total_amount']
return all(fields.get(field) is not None for field in required)
def _process_order(self, core_fields: Dict[str, Any]) -> None:
"""Geschäftslogik basiert nur auf stabilen Feldern"""
self.logger.info(f"Processing order {core_fields['order_id']} "
f"for customer {core_fields['customer_id']}")| Compatibility-Typ | Änderungsrichtung | Anforderung | Beispiel |
|---|---|---|---|
| Backward | Neue Producer → Alte Consumer | Felder optional/default | V2 Event → V1 Consumer |
| Forward | Alte Producer → Neue Consumer | Unbekannte Felder ignorieren | V1 Event → V2 Consumer |
| Full | Bidirektional | Beide Richtungen erfüllt | V1 ↔ V2 kompatibel |
Breaking Changes entstehen durch inkompatible Modifikationen existierender Event-Strukturen. Die systematische Vermeidung erfordert Designdisziplin und klare Evolutionsregeln.
Sichere Schema-Änderungen:
// Spring Boot - safe vs. dangerous schema evolution
public class SafeSchemaEvolution {

    // SAFE: adding optional fields only.
    public static class OrderPlacedSafe {
        // Existing fields stay untouched.
        private String orderId;
        private BigDecimal totalAmount;

        // New optional fields; null (or an empty list) is an acceptable state.
        @JsonInclude(JsonInclude.Include.NON_NULL)
        private String loyaltyPointsEarned;

        @JsonInclude(JsonInclude.Include.NON_NULL)
        private List<String> appliedPromotions;
    }

    // SAFE: widening a field (plain String -> nested structure) while the
    // old JSON layout is preserved via @JsonUnwrapped.
    public static class CustomerInfo {
        private String customerId;

        // Was: private String customerEmail;
        // Now: a structure flattened into the parent for backward compatibility.
        @JsonUnwrapped
        private CustomerContact contact;
    }

    public static class CustomerContact {
        @JsonProperty("customerEmail") // keeps the legacy JSON name
        private String email;

        @JsonInclude(JsonInclude.Include.NON_NULL)
        private String phone; // new and optional
    }
}
public class DangerousSchemaEvolution {

    // BREAKING: renaming a field.
    public static class OrderPlacedBreaking {
        // private String orderId; // old name
        private String orderIdentifier; // new name - breaks every old consumer!
    }

    // BREAKING: changing a field's type.
    public static class CustomerInfoBreaking {
        private String customerId;
        // private String totalAmount; // was String
        private BigDecimal totalAmount; // now BigDecimal - breaking!
    }

    // BREAKING: adding a required field.
    public static class OrderPlacedRequiredField {
        private String orderId;
        private String shippingMethod; // new and mandatory - breaking!
    }
}
# Python - Breaking Change Detection und Vermeidung
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from enum import Enum
class SchemaChangeType(Enum):
    """Classification of schema changes by compatibility impact."""

    SAFE_ADD_OPTIONAL = "safe_add_optional"
    SAFE_EXTEND_ENUM = "safe_extend_enum"
    BREAKING_RENAME = "breaking_rename"
    BREAKING_TYPE_CHANGE = "breaking_type_change"
    BREAKING_REMOVE = "breaking_remove"
@dataclass
class SchemaChangeAnalysis:
    """One detected schema change plus its compatibility verdict."""

    change_type: SchemaChangeType     # category of the change
    field_name: str                   # affected field
    is_breaking: bool                 # True if existing consumers would break
    description: str                  # human-readable summary
    mitigation: Optional[str] = None  # suggested safe alternative, if any
class SchemaEvolutionValidator:
    """Detects breaking changes between two schema dictionaries."""

    def __init__(self):
        # Change patterns that can roll out without consumer coordination.
        self.safe_patterns = {
            'add_optional_field',
            'add_enum_value',
            'extend_string_to_object',
            'add_metadata_field'
        }
        # Change patterns that break existing consumers.
        self.breaking_patterns = {
            'rename_field',
            'change_field_type',
            'remove_field',
            'add_required_field',
            'change_enum_semantics'
        }

    def validate_schema_change(self, old_schema: Dict, new_schema: Dict) -> List[SchemaChangeAnalysis]:
        """Compare two schemas and report field removals and type changes."""
        findings: List[SchemaChangeAnalysis] = []
        # Removed fields are always breaking.
        for name in old_schema:
            if name not in new_schema:
                findings.append(SchemaChangeAnalysis(
                    change_type=SchemaChangeType.BREAKING_REMOVE,
                    field_name=name,
                    is_breaking=True,
                    description=f"Field '{name}' was removed",
                    mitigation="Deprecate field first, then remove in major version"
                ))
        # Type changes break consumers that parse the old representation.
        for name in old_schema:
            if name in new_schema:
                old_type = old_schema[name].get('type')
                new_type = new_schema[name].get('type')
                if old_type != new_type:
                    findings.append(SchemaChangeAnalysis(
                        change_type=SchemaChangeType.BREAKING_TYPE_CHANGE,
                        field_name=name,
                        is_breaking=True,
                        description=f"Field '{name}' type changed from {old_type} to {new_type}",
                        mitigation="Use union types or add new field instead"
                    ))
        return findings
# Sichere Feld-Evolution mit Wrapper-Pattern
@dataclass
class EvolvableOrderEvent:
"""Event mit eingebauter Evolution-Fähigkeit"""
# Kern-Daten (stabil)
core: Dict[str, Any]
# Erweiterungen (evolution-freundlich)
extensions: Dict[str, Any]
# Version für Schema-Tracking
schema_version: str = "1.0"
def get_field(self, field_name: str, default: Any = None) -> Any:
"""Sichere Feld-Extraktion mit Fallback"""
return self.core.get(field_name, self.extensions.get(field_name, default))
def add_extension(self, key: str, value: Any) -> None:
"""Fügt neue Daten hinzu ohne Breaking Changes"""
self.extensions[key] = valueEvolution-Strategien für verschiedene Änderungstypen:
// Spring Boot - concrete evolution patterns
@Component
public class SchemaEvolutionPatterns {

    /**
     * Pattern 1: extend a field instead of changing it - the legacy and the
     * new representation are supported in parallel.
     */
    public void demonstrateFieldExtension() {
        // Instead of changing customerEmail -> customer.email outright,
        // support both variants side by side.
        class CustomerInfoEvolved {
            @JsonProperty("customerEmail") // legacy support
            @Deprecated
            private String customerEmail;

            @JsonProperty("customer") // new structure
            private CustomerDetails customer;

            // Migration logic for backward compatibility: prefer the new
            // structure, fall back to the legacy field.
            @JsonIgnore
            public String getEffectiveEmail() {
                return customer != null ? customer.getEmail() : customerEmail;
            }
        }
    }

    /**
     * Pattern 2: versioned event wrapper - consumers decide per version
     * whether they can handle the payload.
     */
    public void demonstrateVersionedWrapper() {
        class VersionedEvent<T> {
            private String eventType;
            private String version;
            private T payload;
            private Map<String, Object> metadata;

            public Optional<T> getPayloadIfCompatible(String minVersion) {
                return isVersionCompatible(version, minVersion)
                    ? Optional.of(payload)
                    : Optional.empty();
            }
        }
    }

    /**
     * Pattern 3: union types let a field's type evolve without breaking
     * consumers that still send the old representation.
     */
    public void demonstrateUnionTypes() {
        class FlexibleAmount {
            @JsonProperty("value")
            private Object value; // String or BigDecimal

            @JsonProperty("type")
            private String valueType; // "string" or "decimal"

            public BigDecimal getAsDecimal() {
                if ("decimal".equals(valueType) && value instanceof BigDecimal) {
                    return (BigDecimal) value;
                } else if ("string".equals(valueType) && value instanceof String) {
                    return new BigDecimal((String) value);
                }
                throw new IllegalStateException("Cannot convert to decimal");
            }
        }
    }
}
Schema-Migration erfordert koordinierte Strategien für Producer und Consumer, um zero-downtime Evolution zu ermöglichen.
Blue-Green Schema Migration:
# Python - Blue-Green Schema Migration Pattern
import json
import logging
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Dict, Any, List
class EventSchemaVersion(ABC):
    """Contract every schema-version implementation must fulfil."""

    @abstractmethod
    def get_version(self) -> str:
        """Return the schema version identifier (e.g. "1.0")."""
        pass

    @abstractmethod
    def serialize_event(self, event_data: Dict[str, Any]) -> bytes:
        """Encode an event dict into this version's wire format."""
        pass

    @abstractmethod
    def deserialize_event(self, event_bytes: bytes) -> Dict[str, Any]:
        """Decode wire bytes back into an event dict."""
        pass
class OrderPlacedV1Schema(EventSchemaVersion):
    """V1 wire format: core fields only, amount transported as a string."""

    def get_version(self) -> str:
        return "1.0"

    def serialize_event(self, event_data: Dict[str, Any]) -> bytes:
        # V1 carries only the core fields; everything else is dropped.
        payload = {
            'event_id': event_data['event_id'],
            'order_id': event_data['order_id'],
            'customer_id': event_data['customer_id'],
            'total_amount': str(event_data['total_amount'])
        }
        return json.dumps(payload).encode('utf-8')

    def deserialize_event(self, event_bytes: bytes) -> Dict[str, Any]:
        return json.loads(event_bytes.decode('utf-8'))
class OrderPlacedV2Schema(EventSchemaVersion):
    """V2 wire format: serializes the full event dict."""

    def get_version(self) -> str:
        return "2.0"

    def serialize_event(self, event_data: Dict[str, Any]) -> bytes:
        # default=str stringifies values json can't encode natively
        # (e.g. Decimal, datetime).
        return json.dumps(event_data, default=str).encode('utf-8')

    def deserialize_event(self, event_bytes: bytes) -> Dict[str, Any]:
        return json.loads(event_bytes.decode('utf-8'))
class SchemaAwareEventProducer:
    """Producer with multi-version schema support.

    Serializes each event with the lowest schema version that every
    targeted consumer supports, so not-yet-migrated consumers keep working.
    """

    def __init__(self):
        self.schemas = {
            "1.0": OrderPlacedV1Schema(),
            "2.0": OrderPlacedV2Schema()
        }
        self.default_version = "2.0"
        # Tracks the schema version each known consumer can read.
        self.consumer_versions = {}

    def publish_event(self, topic: str, event_data: Dict[str, Any],
                      target_consumers: List[str] = None) -> None:
        """Publish the event in a version all target consumers understand.

        Falls back to the default version when no targets are given.
        """
        if target_consumers:
            # Lowest schema version across the targeted consumers.
            min_version = self._get_minimum_supported_version(target_consumers)
        else:
            min_version = self.default_version
        schema = self.schemas[min_version]
        serialized_event = schema.serialize_event(event_data)
        # The schema version travels in the headers so consumers can pick
        # the matching deserializer.
        headers = {
            'schema-version': min_version,
            'content-type': 'application/json'
        }
        self._send_to_kafka(topic, serialized_event, headers)

    def _get_minimum_supported_version(self, consumers: List[str]) -> str:
        """Return the lowest schema version supported by all consumers.

        Unknown consumers are assumed to support only "1.0". Versions are
        compared numerically component by component; the previous plain
        string min() was lexicographic and would mis-order "10.0" < "2.0".
        """
        supported_versions = [
            self.consumer_versions.get(consumer, "1.0")
            for consumer in consumers
        ]
        return min(supported_versions, key=self._version_key)

    @staticmethod
    def _version_key(version: str) -> tuple:
        """Numeric sort key for dotted version strings ("2.10" -> (2, 10))."""
        return tuple(int(part) for part in version.split('.'))
class SchemaAwareEventConsumer:
"""Consumer mit Multi-Version Support"""
def __init__(self, supported_versions: List[str]):
self.schemas = {
"1.0": OrderPlacedV1Schema(),
"2.0": OrderPlacedV2Schema()
}
self.supported_versions = supported_versions
self.logger = logging.getLogger(__name__)
def process_event(self, event_bytes: bytes, headers: Dict[str, str]) -> None:
"""Verarbeitet Events verschiedener Schema-Versionen"""
schema_version = headers.get('schema-version', '1.0')
if schema_version not in self.supported_versions:
self.logger.warning(f"Unsupported schema version: {schema_version}")
return
schema = self.schemas[schema_version]
event_data = schema.deserialize_event(event_bytes)
# Normalisierung auf interne Struktur
normalized_event = self._normalize_event(event_data, schema_version)
self._process_normalized_event(normalized_event)
def _normalize_event(self, event_data: Dict[str, Any], version: str) -> Dict[str, Any]:
"""Normalisiert Events verschiedener Versionen"""
if version == "1.0":
# V1 Event zu interner Struktur
return {
'order_id': event_data['order_id'],
'customer_id': event_data['customer_id'],
'total_amount': Decimal(event_data['total_amount']),
'metadata': {} # V1 hat keine Metadata
}
elif version == "2.0":
# V2 Event bereits kompatibel
return event_data
raise ValueError(f"Unknown schema version: {version}")Graduelle Migration-Strategie:
// Spring Boot - gradual schema migration
@Component
public class GradualSchemaMigration {

    private final EventVersionRegistry versionRegistry;
    private final MetricsService metricsService;

    /**
     * Phase 1: dual-write - the producer publishes both versions in
     * parallel so old and new consumers are served at the same time.
     */
    public void dualWritePhase(OrderEvent event) {
        // V1 for backward compatibility with not-yet-migrated consumers.
        String v1Topic = "order.placed.v1";
        OrderPlacedV1 v1Event = convertToV1(event);
        kafkaTemplate.send(v1Topic, v1Event);

        // V2 for consumers that already understand the new schema.
        String v2Topic = "order.placed.v2";
        OrderPlacedV2 v2Event = convertToV2(event);
        kafkaTemplate.send(v2Topic, v2Event);

        metricsService.incrementCounter("schema.migration.dual_write");
    }

    /**
     * Phase 2: consumers are moved to the V2 topic one by one while the
     * V1 topic keeps running in parallel.
     */
    public void consumerMigrationPhase() {
        List<String> migratedConsumers = versionRegistry.getMigratedConsumers();
        double migrationProgress = calculateMigrationProgress(migratedConsumers);
        if (migrationProgress > 0.9) { // 90% migrated
            logger.info("Ready for V1 topic deprecation");
        }
    }

    /**
     * Phase 3: cleanup - only the V2 topic remains and the V1 producer
     * code can be removed.
     */
    public void cleanupPhase() {
        metricsService.incrementCounter("schema.migration.completed");
    }

    // Reduces a full event to the V1 field set.
    private OrderPlacedV1 convertToV1(OrderEvent event) {
        return new OrderPlacedV1(
            event.getEventId(),
            event.getOrderId(),
            event.getCustomerId(),
            event.getItems(),
            event.getTotalAmount(),
            event.getPlacedAt()
        );
    }
}
Migration-Timeline und Koordination:
| Phase | Dauer | Producer | Consumer | Topics | Risiko |
|---|---|---|---|---|---|
| Prep | 1 Woche | V1 | V1 | v1 | Niedrig |
| Dual-Write | 2 Wochen | V1+V2 | V1 | v1, v2 | Mittel |
| Consumer-Migration | 4 Wochen | V1+V2 | V1→V2 | v1, v2 | Mittel |
| Cleanup | 1 Woche | V2 | V2 | v2 | Niedrig |
Schema Evolution erfordert Planung, Disziplin und schrittweise Umsetzung. Forward- und Backward-Compatibility ermöglichen sichere Evolution, während systematische Migration-Strategien zero-downtime Upgrades gewährleisten. Die wichtigste Regel bleibt: Breaking Changes sind fast immer vermeidbar, wenn das Schema von Anfang an evolution-freundlich designed wird.