43 System Boundaries, Contracts, and Compatibility

Event-driven systems must communicate across service and system boundaries without becoming tightly coupled. Event contracts define the interfaces between autonomous services and allow them to evolve without breaking changes. The challenge lies in balancing flexibility and stability.

43.0.1 API Contracts for Events

Event contracts specify the structure, semantics, and versioning rules of event messages. They act as the API between event producers and consumers, keeping coupling loose while making the agreement between both sides explicit.

43.0.1.1 Schema-Based Event Contracts

JSON Schema offers a pragmatic basis for defining event contracts:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://api.ecommerce.example/events/order-placed/v2",
  "title": "OrderPlaced Event",
  "description": "Emitted when a customer successfully places an order",
  "type": "object",
  "required": ["eventId", "eventType", "timestamp", "version", "data"],
  "properties": {
    "eventId": {
      "type": "string",
      "format": "uuid",
      "description": "Unique identifier for this event instance"
    },
    "eventType": {
      "type": "string",
      "const": "OrderPlaced",
      "description": "Fixed event type identifier"
    },
    "timestamp": {
      "type": "string",
      "format": "date-time",
      "description": "When the event occurred (ISO 8601)"
    },
    "version": {
      "type": "string",
      "const": "v2",
      "description": "Schema version for compatibility handling"
    },
    "data": {
      "type": "object",
      "required": ["orderId", "customerId", "items", "totalAmount"],
      "properties": {
        "orderId": {
          "type": "string",
          "format": "uuid"
        },
        "customerId": {
          "type": "string",
          "format": "uuid"
        },
        "items": {
          "type": "array",
          "minItems": 1,
          "items": {
            "$ref": "#/definitions/OrderItem"
          }
        },
        "totalAmount": {
          "type": "number",
          "minimum": 0,
          "multipleOf": 0.01
        },
        "currency": {
          "type": "string",
          "default": "EUR",
          "enum": ["EUR", "USD", "GBP"]
        },
        "shippingAddress": {
          "$ref": "#/definitions/Address"
        }
      }
    }
  },
  "definitions": {
    "OrderItem": {
      "type": "object",
      "required": ["productId", "quantity", "unitPrice"],
      "properties": {
        "productId": {"type": "string"},
        "productName": {"type": "string"},
        "quantity": {"type": "integer", "minimum": 1},
        "unitPrice": {"type": "number", "minimum": 0}
      }
    },
    "Address": {
      "type": "object",
      "required": ["street", "city", "postalCode", "country"],
      "properties": {
        "street": {"type": "string"},
        "city": {"type": "string"},
        "postalCode": {"type": "string"},
        "country": {"type": "string", "minLength": 2, "maxLength": 2}
      }
    }
  }
}
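The `multipleOf: 0.01` constraint on `totalAmount` deserves a closer look. Binary floating-point numbers cannot represent most decimal fractions exactly, so whether a given validator accepts a value such as 59.98 depends on how it implements the check. The following minimal sketch shows the underlying effect and a decimal-based alternative; `is_multiple_of` is a hypothetical helper for illustration, not part of any JSON Schema library.

```python
from decimal import Decimal


def is_multiple_of(value, step: str = "0.01") -> bool:
    """Return True if value is an exact multiple of step, using decimal arithmetic."""
    # str() yields the shortest decimal representation of a float, e.g. 59.98 -> "59.98"
    return Decimal(str(value)) % Decimal(step) == 0


# Neither 0.3 nor 0.1 is exactly representable as a binary float,
# so the float remainder is not zero even though 0.3 is "three times 0.1".
print((0.3 % 0.1) != 0)            # True
print(is_multiple_of(0.3, "0.1"))  # True
print(is_multiple_of(59.98))       # True
print(is_multiple_of(59.985))      # False
```

One common way to sidestep the issue entirely is to transmit monetary amounts as integer minor units (cents) or as decimal strings; whether that fits depends on the consumers of the contract.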

43.0.1.2 Contract-First Development

Event contracts are defined before the implementation and serve as the single source of truth:

// Event classes generated from the contract
@JsonSchemaInject(
    value = "https://api.ecommerce.example/events/order-placed/v2",
    merge = false
)
@JsonPropertyOrder({"eventId", "eventType", "timestamp", "version", "data"})
public class OrderPlacedEvent {
    
    @JsonProperty(required = true)
    @Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
    private String eventId;
    
    @JsonProperty(required = true)
    @Pattern(regexp = "^OrderPlaced$")
    private String eventType = "OrderPlaced";
    
    @JsonProperty(required = true)
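    // An Instant has no zone of its own, so a pattern-based format needs an explicit timezone
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSX", timezone = "UTC")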
    private Instant timestamp;
    
    @JsonProperty(required = true)
    @Pattern(regexp = "^v2$")
    private String version = "v2";
    
    @JsonProperty(required = true)
    @Valid
    private OrderPlacedData data;
    
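    // Private constructor used by the builder; eventType and version keep their fixed defaults
    private OrderPlacedEvent(String eventId, Instant timestamp, OrderPlacedData data) {
        this.eventId = eventId;
        this.timestamp = timestamp;
        this.data = data;
    }

    // Contract-validating builder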
    public static OrderPlacedEvent.Builder builder() {
        return new Builder();
    }
    
    public static class Builder {
        private String eventId = UUID.randomUUID().toString();
        private Instant timestamp = Instant.now();
        private OrderPlacedData data;
        
        public Builder data(OrderPlacedData data) {
            this.data = data;
            return this;
        }
        
        public OrderPlacedEvent build() {
            validateContract();
            return new OrderPlacedEvent(eventId, timestamp, data);
        }
        
        private void validateContract() {
            if (data == null) {
                throw new ContractViolationException("OrderPlaced event requires data");
            }
            // Further contract validations...
        }
    }
}
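Treating the contract as the single source of truth also means that checks can be derived from the schema instead of being hand-written per event class. The following sketch, using only the standard library, walks the `required` lists of a trimmed-down stand-in for the OrderPlaced schema and reports missing fields. The function name `missing_required` and the reduced schema are assumptions made for illustration; a full validator covers far more than required fields.

```python
from typing import Any, Dict, List

# Trimmed-down stand-in for the OrderPlaced v2 schema (illustrative, not the full contract)
ORDER_PLACED_SCHEMA: Dict[str, Any] = {
    "required": ["eventId", "eventType", "timestamp", "version", "data"],
    "properties": {
        "data": {
            "required": ["orderId", "customerId", "items", "totalAmount"],
            "properties": {},
        }
    },
}


def missing_required(schema: Dict[str, Any], instance: Dict[str, Any], path: str = "") -> List[str]:
    """Collect dotted paths of required fields that are absent from the instance."""
    missing = [f"{path}{name}" for name in schema.get("required", []) if name not in instance]
    # Descend into nested objects that are present in the instance
    for name, sub_schema in schema.get("properties", {}).items():
        child = instance.get(name)
        if isinstance(child, dict):
            missing.extend(missing_required(sub_schema, child, f"{path}{name}."))
    return missing


event = {
    "eventId": "e-1",
    "eventType": "OrderPlaced",
    "version": "v2",
    "data": {"orderId": "o-1", "items": []},
}
print(missing_required(ORDER_PLACED_SCHEMA, event))
# ['timestamp', 'data.customerId', 'data.totalAmount']
```

Because the check reads the schema at runtime, a change to the contract changes the check without touching the code.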

43.0.1.3 Contract Testing and Validation

@Component
public class EventContractValidator {
    
    private final Map<String, JsonSchema> schemaRegistry = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper;
    
    @PostConstruct
    public void loadSchemas() {
        // Load schemas from a registry or from local files
        loadSchema("OrderPlaced", "v2", "/schemas/order-placed-v2.json");
        loadSchema("PaymentProcessed", "v1", "/schemas/payment-processed-v1.json");
    }
    
    public void validateEvent(Object event) throws EventContractException {
        String eventType = extractEventType(event);
        String version = extractVersion(event);
        
        JsonSchema schema = getSchema(eventType, version);
        if (schema == null) {
            throw new EventContractException(
                String.format("No schema found for event %s version %s", eventType, version)
            );
        }
        
        JsonNode eventJson;
        try {
            eventJson = objectMapper.valueToTree(event);
        } catch (IllegalArgumentException e) {
            throw new EventContractException("Event validation failed", e);
        }

        // Validate outside the try block so that violation details are not
        // swallowed and re-wrapped by a generic catch clause
        Set<ValidationMessage> violations = schema.validate(eventJson);
        if (!violations.isEmpty()) {
            String violationMessages = violations.stream()
                .map(ValidationMessage::getMessage)
                .collect(Collectors.joining(", "));

            throw new EventContractException(
                String.format("Event contract violation: %s", violationMessages)
            );
        }
    }
    
    private JsonSchema getSchema(String eventType, String version) {
        return schemaRegistry.get(eventType + ":" + version);
    }
}

// Integration into the event publisher
@Service
public class ContractValidatedEventPublisher {
    
    private final EventContractValidator validator;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public void publishEvent(String topic, String key, Object event) {
        // 1. Validate against the contract before publishing
        validator.validateEvent(event);
        
        // 2. Publish the event
        kafkaTemplate.send(topic, key, event);
    }
}
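The validate-then-publish pattern can also be exercised without a broker, which is useful in contract tests. The sketch below is an assumption-laden illustration: `make_publisher`, `validate_envelope`, and `ContractViolation` are hypothetical names, and the sender is a plain callable standing in for a real producer client.

```python
from typing import Any, Callable, Dict, List, Tuple


class ContractViolation(Exception):
    """Raised when an event does not satisfy its contract (hypothetical name)."""


def make_publisher(
    validate: Callable[[Dict[str, Any]], List[str]],
    send: Callable[[str, Dict[str, Any]], None],
) -> Callable[[str, Dict[str, Any]], None]:
    """Wrap a sender so that only events without violations are handed to it."""
    def publish(topic: str, event: Dict[str, Any]) -> None:
        violations = validate(event)
        if violations:
            raise ContractViolation("; ".join(violations))
        send(topic, event)
    return publish


def validate_envelope(event: Dict[str, Any]) -> List[str]:
    """Minimal stand-in validator: checks only a few envelope fields."""
    return [f"missing field: {name}"
            for name in ("eventId", "eventType", "version", "data")
            if name not in event]


# An in-memory list replaces the broker for the purpose of the test
sent: List[Tuple[str, Dict[str, Any]]] = []
publish = make_publisher(validate_envelope, lambda topic, event: sent.append((topic, event)))

publish("order.placed.v2", {"eventId": "e-1", "eventType": "OrderPlaced", "version": "v2", "data": {}})
try:
    publish("order.placed.v2", {"eventType": "OrderPlaced"})
except ContractViolation as exc:
    print(f"rejected: {exc}")

print(len(sent))  # 1
```

Injecting the validator and the sender keeps the publishing rule testable in isolation; a real setup would plug in a schema-based validator and the actual producer.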

43.0.1.4 Python Schema Validation

import copy
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

import jsonschema


class EventContractException(Exception):
    """Raised when an event violates its contract or no contract is registered."""


class EventContractValidator:
    def __init__(self):
        self.schemas: Dict[str, dict] = {}
        self._load_schemas()

    def _load_schemas(self):
        # Load schemas from a schema registry or from local files
        with open('schemas/order-placed-v2.json', 'r') as f:
            self.schemas['OrderPlaced:v2'] = json.load(f)

    def validate_event(self, event: Dict[str, Any]):
        event_type = event.get('eventType')
        version = event.get('version')

        schema_key = f"{event_type}:{version}"
        schema = self.schemas.get(schema_key)

        if not schema:
            raise EventContractException(
                f"No schema found for {event_type} version {version}"
            )

        try:
            jsonschema.validate(event, schema)
        except jsonschema.ValidationError as e:
            # Chain the original error so the failing path stays visible in tracebacks
            raise EventContractException(f"Contract violation: {e.message}") from e

# Contract-first event builder
class OrderPlacedEventBuilder:
    def __init__(self):
        self.event = {
            'eventId': str(uuid.uuid4()),
            'eventType': 'OrderPlaced',
            # Timezone-aware UTC timestamp, so the value carries the offset that "date-time" expects
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'version': 'v2',
            'data': {}
        }
        self.validator = EventContractValidator()

    def order_id(self, order_id: str):
        self.event['data']['orderId'] = order_id
        return self

    def customer_id(self, customer_id: str):
        self.event['data']['customerId'] = customer_id
        return self

    def items(self, items: list):
        self.event['data']['items'] = items
        return self

    def total_amount(self, amount: float):
        self.event['data']['totalAmount'] = amount
        return self

    def build(self) -> Dict[str, Any]:
        # Validate against the contract before handing out the event
        self.validator.validate_event(self.event)
        # Deep copy so that later builder calls cannot alter the returned event
        return copy.deepcopy(self.event)

# Usage
event = (OrderPlacedEventBuilder()
    .order_id("123e4567-e89b-12d3-a456-426614174000")
    .customer_id("987fcdeb-51d2-43e8-b12a-0123456789ab")
    .items([{"productId": "P001", "quantity": 2, "unitPrice": 29.99}])
    .total_amount(59.98)
    .build())

43.0.2 Cross-system Event Design

Designing events for cross-system communication requires particular attention to domain boundaries, data dependencies, and service autonomy.

43.0.2.1 Bounded Context Event Mapping

Events must respect domain boundaries and use the terminology of their own bounded context:

// Order-Service Events (Order Bounded Context)
public class OrderPlacedEvent {
    private String orderId;
    private String customerId;  // Customer ID reference only, no customer details
    private List<OrderLineItem> items;
    private OrderAmount totalAmount;
    private OrderStatus status;
}

// Customer-Service Events (Customer Bounded Context)  
public class CustomerRegisteredEvent {
    private String customerId;
    private CustomerProfile profile;
    private CustomerPreferences preferences;
    // No order-specific data
}

// Payment-Service Events (Payment Bounded Context)
public class PaymentProcessedEvent {
    private String paymentId;
    private String orderId;     // Reference to the Order context
    private PaymentAmount amount;
    private PaymentMethod method;
    private PaymentStatus status;
    // No customer or inventory details
}
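The same rule, references instead of foreign details, can be made visible at the point where an internal model is projected onto an outbound event. The sketch below assumes simplified internal classes (`Order`, `Customer`, `OrderLine`) and a hypothetical mapping function `to_order_placed_payload`; none of these names come from the services above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Customer:
    """Customer data as the Order service might hold it internally (illustrative)."""
    customer_id: str
    name: str
    email: str


@dataclass(frozen=True)
class OrderLine:
    product_id: str
    quantity: int


@dataclass(frozen=True)
class Order:
    """Internal aggregate of the Order context (illustrative)."""
    order_id: str
    customer: Customer
    lines: List[OrderLine]


def to_order_placed_payload(order: Order) -> Dict[str, Any]:
    """Project the aggregate onto the event payload, exposing only IDs of other contexts."""
    return {
        "orderId": order.order_id,
        "customerId": order.customer.customer_id,  # reference only, no name or email
        "items": [
            {"productId": line.product_id, "quantity": line.quantity}
            for line in order.lines
        ],
    }


order = Order("o-1", Customer("c-1", "Ada", "ada@example.com"), [OrderLine("P001", 2)])
payload = to_order_placed_payload(order)
print(payload)
# {'orderId': 'o-1', 'customerId': 'c-1', 'items': [{'productId': 'P001', 'quantity': 2}]}
```

Keeping the projection in one explicit function makes it easy to review what actually leaves the bounded context.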

43.0.2.2 Anti-Corruption Layer for Event Transformation

@Component
public class CrossSystemEventAdapter {
    
    // Order-Context → Inventory-Context Transformation
    public InventoryReservationRequestedEvent adaptOrderToInventory(OrderPlacedEvent orderEvent) {
        List<InventoryItem> inventoryItems = orderEvent.getItems().stream()
            .map(orderItem -> InventoryItem.builder()
                .productId(orderItem.getProductId())
                .quantity(orderItem.getQuantity())
                .reservationContext("ORDER")
                .build())
            .collect(Collectors.toList());
        
        return InventoryReservationRequestedEvent.builder()
            .reservationId(UUID.randomUUID().toString())
            .requestingContext("ORDER_SERVICE")
            .externalReferenceId(orderEvent.getOrderId())
            .items(inventoryItems)
            .reservationExpiry(Instant.now().plus(Duration.ofMinutes(15)))
            .build();
    }
    
    // Payment-Context → Order-Context Transformation
    public OrderPaymentConfirmedEvent adaptPaymentToOrder(PaymentProcessedEvent paymentEvent) {
        return OrderPaymentConfirmedEvent.builder()
            .orderId(paymentEvent.getOrderId())  // Cross-context reference
            .paymentReference(paymentEvent.getPaymentId())
            .confirmedAmount(paymentEvent.getAmount().getValue())
            .currency(paymentEvent.getAmount().getCurrency())
            .paymentMethod(translatePaymentMethod(paymentEvent.getMethod()))
            .confirmationTimestamp(paymentEvent.getProcessedAt())
            .build();
    }
    
    private OrderPaymentMethod translatePaymentMethod(PaymentMethod paymentMethod) {
        // Context-specific enum translation
        return switch (paymentMethod) {
            case CREDIT_CARD -> OrderPaymentMethod.CARD;
            case BANK_TRANSFER -> OrderPaymentMethod.TRANSFER;
            case DIGITAL_WALLET -> OrderPaymentMethod.DIGITAL;
            default -> OrderPaymentMethod.OTHER;
        };
    }
}
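At the wire level, an anti-corruption layer usually has to cope with values the receiving context has never seen. The following Python sketch mirrors the payment-to-order translation on plain dictionaries and maps any unknown payment method to `OTHER`. The field names follow the events above; the function name and the example value `VOUCHER` are made up for illustration.

```python
from typing import Any, Dict

# Translation table from Payment-context terms to Order-context terms
_PAYMENT_TO_ORDER_METHOD = {
    "CREDIT_CARD": "CARD",
    "BANK_TRANSFER": "TRANSFER",
    "DIGITAL_WALLET": "DIGITAL",
}


def adapt_payment_to_order(payment_event: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a payment event into the vocabulary of the Order context."""
    method = _PAYMENT_TO_ORDER_METHOD.get(payment_event.get("method"), "OTHER")
    return {
        "orderId": payment_event["orderId"],             # cross-context reference
        "paymentReference": payment_event["paymentId"],
        "paymentMethod": method,
    }


known = adapt_payment_to_order({"paymentId": "p-1", "orderId": "o-1", "method": "CREDIT_CARD"})
unknown = adapt_payment_to_order({"paymentId": "p-2", "orderId": "o-2", "method": "VOUCHER"})
print(known["paymentMethod"])    # CARD
print(unknown["paymentMethod"])  # OTHER
```

Mapping on raw strings rather than on a parsed enum means a new payment method on the producer side degrades to `OTHER` instead of failing during deserialization.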

43.0.2.3 Event-Enrichment Patterns

@Service
public class EventEnrichmentService {
    
    private final CustomerRepository customerRepository;
    private final ProductRepository productRepository;
    
    @KafkaListener(topics = "order.placed.v1")
    public void enrichOrderPlacedEvent(OrderPlacedEvent event) {
        try {
            EnrichedOrderPlacedEvent enrichedEvent = enrichOrderEvent(event);
            publishEnrichedEvent(enrichedEvent);
            
        } catch (Exception e) {
            log.warn("Failed to enrich order event {}: {}", event.getOrderId(), e.getMessage());
            // Fallback: forward the original event
            republishOriginalEvent(event);
        }
    }
    
    private EnrichedOrderPlacedEvent enrichOrderEvent(OrderPlacedEvent original) {
        // Enrich with customer information (optional)
        Optional<CustomerSummary> customer = customerRepository
            .findSummaryById(original.getCustomerId());
        
        // Enrich with product information
        List<EnrichedOrderItem> enrichedItems = original.getItems().stream()
            .map(this::enrichOrderItem)
            .collect(Collectors.toList());
        
        return EnrichedOrderPlacedEvent.builder()
            .originalEvent(original)
            .customerSummary(customer.orElse(null))
            .enrichedItems(enrichedItems)
            .enrichmentTimestamp(Instant.now())
            .build();
    }
    
    private EnrichedOrderItem enrichOrderItem(OrderLineItem item) {
        Optional<ProductInfo> productInfo = productRepository
            .findProductInfoById(item.getProductId());
        
        return EnrichedOrderItem.builder()
            .originalItem(item)
            .productName(productInfo.map(ProductInfo::getName).orElse("Unknown"))
            .category(productInfo.map(ProductInfo::getCategory).orElse("Unknown"))
            .weight(productInfo.map(ProductInfo::getWeight).orElse(null))
            .build();
    }
}
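Enrichment can also degrade per item rather than per event. The sketch below, with a hypothetical `enrich_order_event` function and an in-memory catalog standing in for a product repository, keeps every item in the event and falls back to "Unknown" when a lookup returns nothing or fails.

```python
from typing import Any, Callable, Dict, Optional

ProductLookup = Callable[[str], Optional[Dict[str, Any]]]


def enrich_order_event(event: Dict[str, Any], lookup_product: ProductLookup) -> Dict[str, Any]:
    """Return a copy of the event whose items carry product name and category."""
    enriched_items = []
    for item in event["items"]:
        try:
            info = lookup_product(item["productId"]) or {}
        except Exception:
            # A failed lookup degrades this item only; the event is still forwarded
            info = {}
        enriched_items.append({
            **item,
            "productName": info.get("name", "Unknown"),
            "category": info.get("category", "Unknown"),
        })
    return {**event, "items": enriched_items}


# In-memory stand-in for a product repository (illustrative data)
CATALOG = {"P001": {"name": "Desk Lamp", "category": "Lighting"}}


def lookup(product_id: str) -> Optional[Dict[str, Any]]:
    if product_id == "P500":
        raise TimeoutError("catalog unavailable")
    return CATALOG.get(product_id)


order_event = {"orderId": "o-1", "items": [
    {"productId": "P001", "quantity": 2},
    {"productId": "P404", "quantity": 1},
    {"productId": "P500", "quantity": 1},
]}
enriched = enrich_order_event(order_event, lookup)
print([item["productName"] for item in enriched["items"]])
# ['Desk Lamp', 'Unknown', 'Unknown']
```

The original event is left untouched, so it can still be forwarded as-is if a downstream step requires the unenriched form.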

43.0.3 Backwards Compatibility

Event schema evolution must preserve backward compatibility so that consumers on different schema versions can coexist.

43.0.3.1 Schema Evolution Strategies

Additive Changes (Safe):

// v1 → v2: add a new optional field
{
  "data": {
    "orderId": "123",
    "customerId": "456",
    "items": [...],
    "totalAmount": 99.99,
    "currency": "EUR"  // NEW in v2, with a default value
  }
}
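Additive changes are only safe if consumers read tolerantly: they apply a default for fields that may be absent and ignore fields they do not know. A minimal sketch of such a reader follows; `read_order` and the extra field `giftWrap` are hypothetical and serve only to show the behaviour.

```python
from typing import Any, Dict


def read_order(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract only the fields this consumer needs, defaulting what may be missing."""
    return {
        "order_id": data["orderId"],
        "total": data["totalAmount"],
        "currency": data.get("currency", "EUR"),  # absent in v1 payloads
    }


v1_data = {"orderId": "123", "customerId": "456", "totalAmount": 99.99}
v2_data = {"orderId": "123", "customerId": "456", "totalAmount": 99.99,
           "currency": "USD", "giftWrap": True}  # giftWrap: field unknown to this consumer

print(read_order(v1_data))  # {'order_id': '123', 'total': 99.99, 'currency': 'EUR'}
print(read_order(v2_data))  # {'order_id': '123', 'total': 99.99, 'currency': 'USD'}
```

Note that a schema that forbids additional properties, or a deserializer configured to fail on unknown fields, would turn the same additive change into a breaking one.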

Expanding Enums (Safe only if consumers tolerate unknown values):

// v1 Enum
public enum OrderStatus {
    PLACED, CONFIRMED, SHIPPED, DELIVERED, CANCELLED
}

// v2 Enum (additive)
public enum OrderStatus {
    PLACED, CONFIRMED, SHIPPED, DELIVERED, CANCELLED,
    PREPARING,     // NEW
    OUT_FOR_DELIVERY  // NEW
}
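A consumer still compiled against the v1 enum will receive values it does not know once producers start emitting the new ones. One way to handle this, sketched here in Python, is an explicit fallback member. The `UNKNOWN` member is an assumption added for illustration and is not part of the enum shown above.

```python
from enum import Enum


class OrderStatus(Enum):
    """v1 view of the status enum, extended with a fallback member."""
    PLACED = "PLACED"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"
    CANCELLED = "CANCELLED"
    UNKNOWN = "UNKNOWN"  # assumption: fallback for values added in later versions

    @classmethod
    def _missing_(cls, value):
        # Called by Enum when no member matches the given value
        return cls.UNKNOWN


print(OrderStatus("SHIPPED"))    # OrderStatus.SHIPPED
print(OrderStatus("PREPARING"))  # OrderStatus.UNKNOWN
```

Whether falling back silently is acceptable depends on the consumer: a reporting service can usually ignore an unknown status, while a workflow engine may need to park the event instead.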

Breaking Changes (Dangerous):

// v1 → v2: BREAKING - field renamed and restructured
{
  "data": {
    "orderId": "123",
    // "customerId": "456",  // REMOVED
    "customer": {            // NEW - breaking change
      "id": "456",
      "name": "John Doe"
    }
  }
}
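While such a change is being rolled out, consumers often have to accept both shapes for a transition period. The sketch below reads the customer ID from either the new nested object or the old flat field; `customer_id_of` is a hypothetical helper name.

```python
from typing import Any, Dict


def customer_id_of(data: Dict[str, Any]) -> str:
    """Read the customer ID from the v2 shape if present, else from the v1 shape."""
    customer = data.get("customer")
    if isinstance(customer, dict) and "id" in customer:
        return customer["id"]
    if "customerId" in data:
        return data["customerId"]
    raise KeyError("neither 'customer.id' nor 'customerId' present")


print(customer_id_of({"orderId": "123", "customerId": "456"}))                           # 456
print(customer_id_of({"orderId": "123", "customer": {"id": "456", "name": "John Doe"}}))  # 456
```

This kind of dual reading is a temporary bridge. It does not make the change compatible for consumers that have not been updated, which is why the change still counts as breaking.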

43.0.3.2 Versioned Event Handler

@Component
public class VersionedOrderEventHandler {
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlacedV1(OrderPlacedEventV1 event) {
        // Legacy v1 Handler
        processOrderV1(event);
    }
    
    @KafkaListener(topics = "order.placed.v2")  
    public void handleOrderPlacedV2(OrderPlacedEventV2 event) {
        // Current v2 Handler
        processOrderV2(event);
    }
    
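    // Parallel support during migration.
    // Listeners in the same consumer group share a topic's partitions, so this listener needs
    // its own group to see every v1 event (the groupId value is an illustrative assumption).
    @KafkaListener(topics = "order.placed.v1", groupId = "order-v1-to-v2-migration")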
    public void migrateV1ToV2(OrderPlacedEventV1 eventV1) {
        if (shouldMigrateToV2(eventV1)) {
            OrderPlacedEventV2 eventV2 = transformV1ToV2(eventV1);
            republishAsV2(eventV2);
        }
    }
    
    private OrderPlacedEventV2 transformV1ToV2(OrderPlacedEventV1 v1Event) {
        return OrderPlacedEventV2.builder()
            .orderId(v1Event.getOrderId())
            .customerId(v1Event.getCustomerId())
            .items(v1Event.getItems())
            .totalAmount(v1Event.getTotalAmount())
            .currency("EUR")  // Default value for the new field
            .version("v2")
            .build();
    }
}

43.0.3.3 Schema Registry Integration

@Configuration
public class SchemaRegistryConfiguration {
    
    @Bean
    public SchemaRegistryClient schemaRegistryClient() {
        return new CachedSchemaRegistryClient(
            "http://schema-registry:8081", 
            100
        );
    }
    
    @Bean
    public KafkaAvroSerializer avroSerializer() {
        return new KafkaAvroSerializer(schemaRegistryClient());
    }
    
    @Bean
    public KafkaAvroDeserializer avroDeserializer() {
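        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient());
        deserializer.configure(
            Map.of(
                // configure() validates the settings and expects the registry URL to be present
                AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
                KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
            ),
            false  // isKey = false: this deserializer handles record values
        );
        return deserializer;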
    }
}

// Avro Schema Evolution Support
@Service
public class AvroEventService {
    
    private final SchemaRegistryClient schemaRegistry;
    private final KafkaTemplate<String, SpecificRecord> avroTemplate;
    
    public void publishEventWithSchema(String topic, SpecificRecord event) {
        try {
            // The schema is derived automatically from the Avro record
            avroTemplate.send(topic, event);
            
        } catch (SerializationException e) {
            log.error("Schema compatibility issue: {}", e.getMessage());
            throw new EventPublishingException("Schema incompatible", e);
        }
    }
    
    @KafkaListener(topics = "order.placed")
    public void handleOrderPlaced(
            @Payload OrderPlacedAvro event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        
        // Avro resolves differences between writer and reader schema automatically
        String orderId = event.getOrderId().toString();
        
        // New fields with default handling
        String currency = event.getCurrency() != null ? 
            event.getCurrency().toString() : "EUR";
        
        processOrder(orderId, currency);
    }
}

43.0.3.4 Python Schema Evolution

import copy
from typing import Any, Dict, List, Tuple

class VersionCompatibilityHandler:
    # Known versions in upgrade order (assumption: versions evolve linearly)
    VERSION_ORDER = ['v1', 'v2', 'v3']

    def __init__(self):
        self.version_transformers = {
            ('v1', 'v2'): self._transform_v1_to_v2,
            ('v2', 'v3'): self._transform_v2_to_v3
        }

    def handle_event(self, event: Dict[str, Any], target_version: str = 'v2'):
        current_version = event.get('version', 'v1')

        if current_version == target_version:
            return event

        # Multi-step transformation if necessary
        transformed_event = event
        transformation_path = self._find_transformation_path(current_version, target_version)

        for from_version, to_version in transformation_path:
            transformer = self.version_transformers.get((from_version, to_version))
            if transformer:
                transformed_event = transformer(transformed_event)
            else:
                raise ValueError(f"No transformer for {from_version} → {to_version}")

        return transformed_event

    def _find_transformation_path(self, current: str, target: str) -> List[Tuple[str, str]]:
        # Upgrade-only path through consecutive versions, e.g. v1 → v3 = [(v1, v2), (v2, v3)]
        order = self.VERSION_ORDER
        if current not in order or target not in order:
            raise ValueError(f"Unknown version: {current} or {target}")
        start, end = order.index(current), order.index(target)
        if start > end:
            raise ValueError(f"Downgrade from {current} to {target} is not supported")
        return list(zip(order[start:end], order[start + 1:end + 1]))

    def _transform_v1_to_v2(self, v1_event: Dict[str, Any]) -> Dict[str, Any]:
        # Deep copy: a shallow copy would share the nested 'data' dict with the input
        v2_event = copy.deepcopy(v1_event)
        v2_event['version'] = 'v2'

        # New optional field with a default
        if 'currency' not in v2_event['data']:
            v2_event['data']['currency'] = 'EUR'

        return v2_event

    def _transform_v2_to_v3(self, v2_event: Dict[str, Any]) -> Dict[str, Any]:
        v3_event = copy.deepcopy(v2_event)
        v3_event['version'] = 'v3'

        # Structural change: items → lineItems
        if 'items' in v3_event['data']:
            v3_event['data']['lineItems'] = v3_event['data'].pop('items')

        return v3_event

# Consumer with automatic version handling
class VersionAwareEventConsumer:
    def __init__(self, compatibility_handler):
        self.compatibility_handler = compatibility_handler
    
    async def consume_order_placed(self, event: Dict[str, Any]):
        # Normalize the event to the current version
        normalized_event = self.compatibility_handler.handle_event(event, 'v2')
        
        # Business logic on the normalized event
        await self._process_order_v2(normalized_event)
    
    async def _process_order_v2(self, event: Dict[str, Any]):
        data = event['data']
        currency = data.get('currency', 'EUR')  # Safe access with default
        
        print(f"Processing order {data['orderId']} in {currency}")

43.0.4 Compatibility Matrix

Schema Change         | Backwards Compatible | Consumer Impact      | Migration Strategy
Add Optional Field    | ✅ Yes               | None                 | Direct deployment
Add Enum Value        | ✅ Yes               | Graceful degradation | Default handling
Remove Optional Field | ✅ Yes               | None (if unused)     | Deprecation period
Rename Field          | ❌ No                | Breaking             | Parallel versioning
Change Field Type     | ❌ No                | Breaking             | New event version
Remove Required Field | ❌ No                | Breaking             | Migration pipeline
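
Parts of this matrix can be checked mechanically by comparing two schema versions. The following sketch diffs the top-level `properties` and `required` entries of two JSON-Schema-like dictionaries and labels each difference with the matching matrix row. It is a deliberate simplification: `classify_changes` is a hypothetical helper, nested objects and enums are not inspected, a rename shows up as one removal plus one addition, and the treatment of an added required field is an assumption, since the matrix has no row for it.

```python
from typing import Any, Dict, List, Tuple

Change = Tuple[str, str, bool]  # (field name, kind of change, backwards compatible?)


def classify_changes(old: Dict[str, Any], new: Dict[str, Any]) -> List[Change]:
    """Compare top-level properties of two schema versions."""
    old_props, new_props = old.get("properties", {}), new.get("properties", {})
    old_req, new_req = set(old.get("required", [])), set(new.get("required", []))
    changes: List[Change] = []

    for name in new_props.keys() - old_props.keys():
        if name in new_req:
            # Assumption: events written under the old schema lack the field, so flag it
            changes.append((name, "add required field", False))
        else:
            changes.append((name, "add optional field", True))

    for name in old_props.keys() - new_props.keys():
        required = name in old_req
        kind = "remove required field" if required else "remove optional field"
        changes.append((name, kind, not required))

    for name in old_props.keys() & new_props.keys():
        if old_props[name].get("type") != new_props[name].get("type"):
            changes.append((name, "change field type", False))

    return sorted(changes)


v1 = {"required": ["orderId", "customerId"],
      "properties": {"orderId": {"type": "string"},
                     "customerId": {"type": "string"},
                     "totalAmount": {"type": "number"}}}
v2 = {"required": ["orderId"],
      "properties": {"orderId": {"type": "string"},
                     "totalAmount": {"type": "string"},
                     "currency": {"type": "string"}}}

for change in classify_changes(v1, v2):
    print(change)
# ('currency', 'add optional field', True)
# ('customerId', 'remove required field', False)
# ('totalAmount', 'change field type', False)
```

A check like this can run in a build pipeline to flag risky schema changes early; it complements, but does not replace, the compatibility checks of a schema registry.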

Combining schema definitions, contract testing, and evolutionary transformation strategies makes it possible to develop event-driven systems across system boundaries without sacrificing flexibility or stability.