Event-getriebene Systeme müssen über Service- und Systemgrenzen hinweg kommunizieren, ohne eng gekoppelt zu sein. Event-Kontrakte definieren die Schnittstellen zwischen autonomen Services und ermöglichen Evolution ohne Breaking Changes. Die Herausforderung liegt darin, Flexibilität und Stabilität zu balancieren.
Event-Kontrakte spezifizieren die Struktur, Semantik und Versioning-Regeln für Event-Nachrichten. Sie fungieren als APIs zwischen Event-Producern und -Consumern und ermöglichen lose Kopplung bei gleichzeitiger Vertragsklarheit.
JSON Schema bietet eine pragmatische Basis für Event-Kontrakt-Definition:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://api.ecommerce.example/events/order-placed/v2",
"title": "OrderPlaced Event",
"description": "Emitted when a customer successfully places an order",
"type": "object",
"required": ["eventId", "eventType", "timestamp", "version", "data"],
"properties": {
"eventId": {
"type": "string",
"format": "uuid",
"description": "Unique identifier for this event instance"
},
"eventType": {
"type": "string",
"const": "OrderPlaced",
"description": "Fixed event type identifier"
},
"timestamp": {
"type": "string",
"format": "date-time",
"description": "When the event occurred (ISO 8601)"
},
"version": {
"type": "string",
"const": "v2",
"description": "Schema version for compatibility handling"
},
"data": {
"type": "object",
"required": ["orderId", "customerId", "items", "totalAmount"],
"properties": {
"orderId": {
"type": "string",
"format": "uuid"
},
"customerId": {
"type": "string",
"format": "uuid"
},
"items": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "#/definitions/OrderItem"
}
},
"totalAmount": {
"type": "number",
"minimum": 0,
"multipleOf": 0.01
},
"currency": {
"type": "string",
"default": "EUR",
"enum": ["EUR", "USD", "GBP"]
},
"shippingAddress": {
"$ref": "#/definitions/Address"
}
}
}
},
"definitions": {
"OrderItem": {
"type": "object",
"required": ["productId", "quantity", "unitPrice"],
"properties": {
"productId": {"type": "string"},
"productName": {"type": "string"},
"quantity": {"type": "integer", "minimum": 1},
"unitPrice": {"type": "number", "minimum": 0}
}
},
"Address": {
"type": "object",
"required": ["street", "city", "postalCode", "country"],
"properties": {
"street": {"type": "string"},
"city": {"type": "string"},
"postalCode": {"type": "string"},
"country": {"type": "string", "minLength": 2, "maxLength": 2}
}
}
}
}
Event-Kontrakte werden vor der Implementierung definiert und als Single Source of Truth verwendet:
// Contract-generierte Event-Klassen
@JsonSchemaInject(
value = "https://api.ecommerce.example/events/order-placed/v2",
merge = false
)
@JsonPropertyOrder({"eventId", "eventType", "timestamp", "version", "data"})
public class OrderPlacedEvent {
@JsonProperty(required = true)
@Pattern(regexp = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
private String eventId;
@JsonProperty(required = true)
@Pattern(regexp = "^OrderPlaced$")
private String eventType = "OrderPlaced";
/**
 * Time at which the event occurred, serialized as an ISO 8601 string.
 * An explicit time zone is set because an {@link Instant} has no zone of its own and the
 * custom pattern uses calendar fields that cannot be formatted without one.
 */
@JsonProperty(required = true)
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSX", timezone = "UTC")
private Instant timestamp;
@JsonProperty(required = true)
@Pattern(regexp = "^v2$")
private String version = "v2";
@JsonProperty(required = true)
@Valid
private OrderPlacedData data;
// Contract-validated builder
/**
 * Creates a new {@link Builder} for this event type.
 *
 * @return a fresh builder instance
 */
public static OrderPlacedEvent.Builder builder() {
    return new Builder();
}
/**
 * Fluent builder that checks the contract before handing out an {@code OrderPlacedEvent}.
 *
 * <p>The event id and timestamp are generated when the builder is created, so every event
 * built from the same builder instance shares them.
 */
public static class Builder {
    private String eventId = UUID.randomUUID().toString();
    private Instant timestamp = Instant.now();
    private OrderPlacedData data;

    /**
     * Sets the event payload.
     *
     * @param data the order data to carry in the event
     * @return this builder
     */
    public Builder data(OrderPlacedData data) {
        this.data = data;
        return this;
    }

    /**
     * Validates the collected values and creates the event.
     *
     * @return the populated event; {@code eventType} and {@code version} keep their field defaults
     * @throws ContractViolationException if no data was supplied
     */
    public OrderPlacedEvent build() {
        validateContract();
        // The enclosing class declares no constructor taking these values, so the event is
        // created through the implicit no-arg constructor and populated field by field
        // (a nested class may access the private fields of its enclosing class).
        OrderPlacedEvent event = new OrderPlacedEvent();
        event.eventId = eventId;
        event.timestamp = timestamp;
        event.data = data;
        return event;
    }

    /** Rejects builder states that would violate the event contract. */
    private void validateContract() {
        if (data == null) {
            throw new ContractViolationException("OrderPlaced event requires data");
        }
        // Further contract validations...
    }
}
}@Component
public class EventContractValidator {
private final Map<String, JsonSchema> schemaRegistry = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
/**
 * Registers the schemas known to this validator; invoked after construction via
 * {@code @PostConstruct}.
 */
@PostConstruct
public void loadSchemas() {
    // Load schemas from a registry or from local files.
    // Each row: event type, version, schema location. Rows are loaded in declaration order.
    final String[][] knownSchemas = {
        {"OrderPlaced", "v2", "/schemas/order-placed-v2.json"},
        {"PaymentProcessed", "v1", "/schemas/payment-processed-v1.json"},
    };
    for (String[] entry : knownSchemas) {
        loadSchema(entry[0], entry[1], entry[2]);
    }
}
/**
 * Validates an event object against the schema registered for its type and version.
 *
 * @param event the event to check; its type and version are read via
 *              {@code extractEventType} and {@code extractVersion}
 * @throws EventContractException if no schema is registered for the type/version pair,
 *         if the validation itself cannot be carried out, or if the event violates its schema
 *         (in that case the message lists the individual violations)
 */
public void validateEvent(Object event) throws EventContractException {
    String eventType = extractEventType(event);
    String version = extractVersion(event);
    JsonSchema schema = getSchema(eventType, version);
    if (schema == null) {
        throw new EventContractException(
            String.format("No schema found for event %s version %s", eventType, version)
        );
    }

    Set<ValidationMessage> violations;
    try {
        JsonNode eventJson = objectMapper.valueToTree(event);
        violations = schema.validate(eventJson);
    } catch (Exception e) {
        // Only technical failures (conversion, validator errors) are wrapped here.
        throw new EventContractException("Event validation failed", e);
    }

    // Reported outside the try block so the detailed violation message is not caught and
    // replaced by the generic "Event validation failed" wrapper above.
    if (!violations.isEmpty()) {
        String violationMessages = violations.stream()
            .map(ValidationMessage::getMessage)
            .collect(Collectors.joining(", "));
        throw new EventContractException(
            String.format("Event contract violation: %s", violationMessages)
        );
    }
}
/**
 * Looks up the schema stored under the key {@code "<eventType>:<version>"}.
 *
 * @return the registered schema, or {@code null} if none is registered under that key
 */
private JsonSchema getSchema(String eventType, String version) {
    final String registryKey = eventType + ":" + version;
    return schemaRegistry.get(registryKey);
}
}
// Integration in Event-Publisher
@Service
public class ContractValidatedEventPublisher {
private final EventContractValidator validator;
private final KafkaTemplate<String, Object> kafkaTemplate;
/**
 * Validates an event against its contract and then sends it to Kafka.
 * If validation throws, nothing is sent. The result returned by {@code send} is not inspected.
 *
 * @param topic target topic
 * @param key   record key
 * @param event event payload to validate and publish
 */
public void publishEvent(String topic, String key, Object event) {
    // 1. Contract validation before publication
    validator.validateEvent(event);
    // 2. Publish the event
    kafkaTemplate.send(topic, key, event);
}
}
import jsonschema
import json
from typing import Dict, Any
from datetime import datetime
class EventContractException(Exception):
    """Raised when an event has no registered schema or violates its schema."""


class EventContractValidator:
    """Validates event dictionaries against JSON Schemas keyed by ``"<eventType>:<version>"``."""

    def __init__(self):
        self.schemas: Dict[str, dict] = {}
        self._load_schemas()

    def _load_schemas(self):
        """Load the known schemas (from a schema registry or local files)."""
        # Explicit encoding so the read does not depend on the platform default.
        with open('schemas/order-placed-v2.json', 'r', encoding='utf-8') as f:
            self.schemas['OrderPlaced:v2'] = json.load(f)

    def validate_event(self, event: Dict[str, Any]):
        """Validate ``event`` against the schema registered for its type and version.

        Args:
            event: Event dictionary; ``eventType`` and ``version`` select the schema.

        Raises:
            EventContractException: If no schema is registered for the type/version pair,
                or if the event does not conform to the schema.
        """
        event_type = event.get('eventType')
        version = event.get('version')
        schema_key = f"{event_type}:{version}"
        schema = self.schemas.get(schema_key)
        # Compare against None: an empty schema ({}) is falsy but is still a registered schema.
        if schema is None:
            raise EventContractException(
                f"No schema found for {event_type} version {version}"
            )
        try:
            jsonschema.validate(event, schema)
        except jsonschema.ValidationError as e:
            # Chain the original error so the failing path/validator stays inspectable.
            raise EventContractException(f"Contract violation: {e.message}") from e
# Contract-first event builder
class OrderPlacedEventBuilder:
    """Fluent builder for ``OrderPlaced`` v2 events that validates the result on ``build()``."""

    def __init__(self, validator=None):
        """Start a new event with a generated id and the current UTC timestamp.

        Args:
            validator: Object with a ``validate_event(event)`` method. Defaults to a new
                ``EventContractValidator``; passing one avoids re-creating it per builder.
        """
        # Function-scope imports: these names are not imported at module level.
        import uuid
        from datetime import timezone

        self.event = {
            'eventId': str(uuid.uuid4()),
            'eventType': 'OrderPlaced',
            # Timezone-aware, so the ISO string carries an explicit "+00:00" offset.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'version': 'v2',
            'data': {}
        }
        self.validator = validator if validator is not None else EventContractValidator()

    def order_id(self, order_id: str):
        """Set ``data.orderId`` and return the builder."""
        self.event['data']['orderId'] = order_id
        return self

    def customer_id(self, customer_id: str):
        """Set ``data.customerId`` and return the builder."""
        self.event['data']['customerId'] = customer_id
        return self

    def items(self, items: list):
        """Set ``data.items`` and return the builder."""
        self.event['data']['items'] = items
        return self

    def total_amount(self, amount: float):
        """Set ``data.totalAmount`` and return the builder."""
        self.event['data']['totalAmount'] = amount
        return self

    def build(self) -> Dict[str, Any]:
        """Validate the event and return an independent copy of it.

        Raises:
            Whatever the validator raises for an event that violates the contract.
        """
        import copy

        # Contract validation before build
        self.validator.validate_event(self.event)
        # Deep copy: a shallow copy would share the nested 'data' dict with the builder,
        # so later builder calls would silently change an already built event.
        return copy.deepcopy(self.event)
# Usage
event = (OrderPlacedEventBuilder()
.order_id("123e4567-e89b-12d3-a456-426614174000")
.customer_id("987fcdeb-51d2-43e8-b12a-0123456789ab")
.items([{"productId": "P001", "quantity": 2, "unitPrice": 29.99}])
.total_amount(59.98)
.build())
Event-Design für systemübergreifende Kommunikation erfordert besondere Aufmerksamkeit für Domänengrenzen, Datenabhängigkeiten und Service-Autonomie.
Events müssen Domänengrenzen respektieren und Bounded Context-spezifische Begriffe verwenden:
// Order service events (Order bounded context)
/**
 * Event of the Order bounded context describing a placed order.
 * The customer is referenced by id only; no customer details are embedded.
 */
public class OrderPlacedEvent {
    private String orderId;
    private String customerId; // Customer id reference, no customer details
    private List<OrderLineItem> items;
    private OrderAmount totalAmount;
    private OrderStatus status;
}
// Customer service events (Customer bounded context)
/**
 * Event of the Customer bounded context carrying the customer id, profile and preferences.
 */
public class CustomerRegisteredEvent {
    private String customerId;
    private CustomerProfile profile;
    private CustomerPreferences preferences;
    // No order-specific data
}
// Payment service events (Payment bounded context)
/**
 * Event of the Payment bounded context describing a processed payment.
 * The order is referenced by id only.
 */
public class PaymentProcessedEvent {
    private String paymentId;
    private String orderId; // Reference into the Order context
    private PaymentAmount amount;
    private PaymentMethod method;
    private PaymentStatus status;
    // No customer or inventory details
}@Component
public class CrossSystemEventAdapter {
// Order context → Inventory context transformation
/**
 * Translates a placed order into a reservation request for the inventory context.
 * Each order item becomes an inventory item with reservation context {@code "ORDER"};
 * the request gets a random reservation id and expires 15 minutes after creation.
 *
 * @param orderEvent the order event to translate
 * @return the reservation request referencing the order id as external reference
 */
public InventoryReservationRequestedEvent adaptOrderToInventory(OrderPlacedEvent orderEvent) {
    final List<InventoryItem> reservedItems = orderEvent.getItems().stream()
        .map(line -> {
            return InventoryItem.builder()
                .productId(line.getProductId())
                .quantity(line.getQuantity())
                .reservationContext("ORDER")
                .build();
        })
        .collect(Collectors.toList());

    final String reservationId = UUID.randomUUID().toString();
    final String orderReference = orderEvent.getOrderId();
    final Instant expiresAt = Instant.now().plus(Duration.ofMinutes(15));

    return InventoryReservationRequestedEvent.builder()
        .reservationId(reservationId)
        .requestingContext("ORDER_SERVICE")
        .externalReferenceId(orderReference)
        .items(reservedItems)
        .reservationExpiry(expiresAt)
        .build();
}
// Payment context → Order context transformation
/**
 * Translates a processed payment into a payment confirmation for the order context.
 * Amount value and currency are read from the payment amount; the payment method is
 * mapped through {@code translatePaymentMethod}.
 *
 * @param paymentEvent the payment event to translate
 * @return the confirmation event built from the payment data
 */
public OrderPaymentConfirmedEvent adaptPaymentToOrder(PaymentProcessedEvent paymentEvent) {
    return OrderPaymentConfirmedEvent.builder()
        .orderId(paymentEvent.getOrderId()) // Cross-context reference
        .paymentReference(paymentEvent.getPaymentId())
        .confirmedAmount(paymentEvent.getAmount().getValue())
        .currency(paymentEvent.getAmount().getCurrency())
        .paymentMethod(translatePaymentMethod(paymentEvent.getMethod()))
        .confirmationTimestamp(paymentEvent.getProcessedAt())
        .build();
}
/**
 * Maps a payment-context method onto the order context's enum.
 * Values without an explicit mapping become {@code OrderPaymentMethod.OTHER}.
 */
private OrderPaymentMethod translatePaymentMethod(PaymentMethod paymentMethod) {
    // Context-specific enum transformation
    switch (paymentMethod) {
        case CREDIT_CARD:
            return OrderPaymentMethod.CARD;
        case BANK_TRANSFER:
            return OrderPaymentMethod.TRANSFER;
        case DIGITAL_WALLET:
            return OrderPaymentMethod.DIGITAL;
        default:
            return OrderPaymentMethod.OTHER;
    }
}
}@Service
public class EventEnrichmentService {
private final CustomerRepository customerRepository;
private final ProductRepository productRepository;
/**
 * Consumes order events from {@code order.placed.v1}, enriches them and publishes the result.
 * If enrichment or publishing fails, the original event is republished unchanged.
 *
 * @param event the incoming order event
 */
@KafkaListener(topics = "order.placed.v1")
public void enrichOrderPlacedEvent(OrderPlacedEvent event) {
    try {
        EnrichedOrderPlacedEvent enrichedEvent = enrichOrderEvent(event);
        publishEnrichedEvent(enrichedEvent);
    } catch (Exception e) {
        // The exception is passed as the final argument so the stack trace is logged too;
        // the message alone is rarely enough to diagnose a failed lookup.
        log.warn("Failed to enrich order event {}: {}", event.getOrderId(), e.getMessage(), e);
        // Fallback: forward the original event
        republishOriginalEvent(event);
    }
}
/**
 * Builds the enriched event: looks up the customer summary (may be absent, then {@code null}
 * is stored), enriches every item and stamps the enrichment time.
 */
private EnrichedOrderPlacedEvent enrichOrderEvent(OrderPlacedEvent original) {
    // Customer information (optional)
    final CustomerSummary summaryOrNull = customerRepository
        .findSummaryById(original.getCustomerId())
        .orElse(null);

    // Product information
    final List<EnrichedOrderItem> itemsWithProductData = original.getItems().stream()
        .map(item -> enrichOrderItem(item))
        .collect(Collectors.toList());

    return EnrichedOrderPlacedEvent.builder()
        .originalEvent(original)
        .customerSummary(summaryOrNull)
        .enrichedItems(itemsWithProductData)
        .enrichmentTimestamp(Instant.now())
        .build();
}
/**
 * Enriches a single order item with product data from the repository.
 * Missing name or category falls back to {@code "Unknown"}, missing weight to {@code null}.
 */
private EnrichedOrderItem enrichOrderItem(OrderItem item) {
    Optional<ProductInfo> lookup = productRepository.findProductInfoById(item.getProductId());

    var name = lookup.map(ProductInfo::getName).orElse("Unknown");
    var category = lookup.map(ProductInfo::getCategory).orElse("Unknown");
    var weight = lookup.map(ProductInfo::getWeight).orElse(null);

    return EnrichedOrderItem.builder()
        .originalItem(item)
        .productName(name)
        .category(category)
        .weight(weight)
        .build();
}
}
Event-Schema-Evolution muss Backwards Compatibility gewährleisten, damit Consumer mit unterschiedlichen Schema-Versionen koexistieren können.
Additive Changes (Safe):
// v1 → v2: Neues optionales Feld hinzufügen
{
"data": {
"orderId": "123",
"customerId": "456",
"items": [...],
"totalAmount": 99.99,
"currency": "EUR" // NEU in v2, mit Default-Wert
}
}
Expanding Enums (Safe):
// v1 enum
/** Order status values as defined in schema version 1. */
public enum OrderStatus {
    PLACED, CONFIRMED, SHIPPED, DELIVERED, CANCELLED
}
// v2 enum (additive)
/**
 * Order status values in schema version 2: the v1 constants in unchanged order,
 * followed by two new constants appended at the end.
 */
public enum OrderStatus {
    PLACED, CONFIRMED, SHIPPED, DELIVERED, CANCELLED,
    PREPARING, // NEW
    OUT_FOR_DELIVERY // NEW
}
Breaking Changes (Dangerous):
// v1 → v2: BREAKING - Feld umbenannt
{
"data": {
"orderId": "123",
// "customerId": "456", // ENTFERNT
"customer": { // NEU - Breaking Change
"id": "456",
"name": "John Doe"
}
}
}
@Component
public class VersionedOrderEventHandler {
/**
 * Receives version 1 order events from {@code order.placed.v1} and passes them to
 * {@code processOrderV1}.
 */
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlacedV1(OrderPlacedEventV1 event) {
    // Legacy v1 handler
    processOrderV1(event);
}
/**
 * Receives version 2 order events from {@code order.placed.v2} and passes them to
 * {@code processOrderV2}.
 */
@KafkaListener(topics = "order.placed.v2")
public void handleOrderPlacedV2(OrderPlacedEventV2 event) {
    // Current v2 handler
    processOrderV2(event);
}
// Parallel support during migration
/**
 * Republishes v1 order events as v2 events when {@code shouldMigrateToV2} approves.
 *
 * <p>This listener reads the same topic as {@code handleOrderPlacedV1}. It uses its own
 * consumer group: two listeners in one group would share the topic's partitions, so each of
 * them would only receive part of the events instead of both seeing every event.
 *
 * @param eventV1 the v1 event to migrate
 */
@KafkaListener(topics = "order.placed.v1", groupId = "order-placed-v1-migration")
public void migrateV1ToV2(OrderPlacedEventV1 eventV1) {
    if (shouldMigrateToV2(eventV1)) {
        OrderPlacedEventV2 eventV2 = transformV1ToV2(eventV1);
        republishAsV2(eventV2);
    }
}
/**
 * Copies order id, customer id, items and total amount from a v1 event into a v2 event,
 * sets the currency to {@code "EUR"} and marks the result as version {@code "v2"}.
 */
private OrderPlacedEventV2 transformV1ToV2(OrderPlacedEventV1 v1Event) {
    return OrderPlacedEventV2.builder()
        .orderId(v1Event.getOrderId())
        .customerId(v1Event.getCustomerId())
        .items(v1Event.getItems())
        .totalAmount(v1Event.getTotalAmount())
        .currency("EUR") // Default value for the new field
        .version("v2")
        .build();
}
}@Configuration
public class SchemaRegistryConfiguration {
/**
 * Provides the schema registry client used by the Avro (de)serializers.
 *
 * @return a caching client pointed at {@code http://schema-registry:8081}
 */
@Bean
public SchemaRegistryClient schemaRegistryClient() {
    final String registryUrl = "http://schema-registry:8081";
    // NOTE(review): second constructor argument, presumably the client's cache capacity — confirm.
    final int capacity = 100;
    return new CachedSchemaRegistryClient(registryUrl, capacity);
}
/**
 * Provides an Avro serializer backed by the client from {@code schemaRegistryClient()}.
 */
@Bean
public KafkaAvroSerializer avroSerializer() {
    return new KafkaAvroSerializer(schemaRegistryClient());
}
/**
 * Provides an Avro deserializer for record values that yields generated specific record
 * classes.
 *
 * <p>{@code schema.registry.url} is passed to {@code configure} as well: the deserializer's
 * configuration class treats it as a required setting and rejects a config map without it,
 * even though a registry client was already handed to the constructor.
 *
 * @return the configured value deserializer
 */
@Bean
public KafkaAvroDeserializer avroDeserializer() {
    KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient());
    deserializer.configure(
        Map.of(
            KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schema-registry:8081",
            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
        ),
        false // isKey = false: this instance deserializes record values
    );
    return deserializer;
}
}
// Avro Schema Evolution Support
@Service
public class AvroEventService {
private final SchemaRegistryClient schemaRegistry;
private final KafkaTemplate<String, SpecificRecord> avroTemplate;
/**
 * Sends an Avro record to the given topic.
 *
 * @param topic target topic
 * @param event the Avro record to send
 * @throws EventPublishingException if sending fails with a {@code SerializationException};
 *         the original exception is kept as the cause
 */
public void publishEventWithSchema(String topic, SpecificRecord event) {
    try {
        // Schema is extracted automatically from the Avro record
        avroTemplate.send(topic, event);
    } catch (SerializationException e) {
        log.error("Schema compatibility issue: {}", e.getMessage());
        throw new EventPublishingException("Schema incompatible", e);
    }
}
/**
 * Handles Avro order events from {@code order.placed}.
 * A missing currency falls back to {@code "EUR"} before the order is processed.
 *
 * @param event the deserialized Avro event
 * @param topic the topic the record was received from (not used by this handler)
 */
@KafkaListener(topics = "order.placed")
public void handleOrderPlaced(
        @Payload OrderPlacedAvro event,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // Avro supports automatic schema evolution
    final String orderId = event.getOrderId().toString();

    // New fields with default handling
    final String currency;
    if (event.getCurrency() == null) {
        currency = "EUR";
    } else {
        currency = event.getCurrency().toString();
    }

    processOrder(orderId, currency);
}
}
from typing import Dict, Any, Optional
import json
class VersionCompatibilityHandler:
    """Upgrades event dictionaries between schema versions using registered transformers."""

    def __init__(self):
        # Maps (from_version, to_version) to the function performing that single step.
        self.version_transformers = {
            ('v1', 'v2'): self._transform_v1_to_v2,
            ('v2', 'v3'): self._transform_v2_to_v3
        }

    def handle_event(self, event: Dict[str, Any], target_version: str = 'v2'):
        """Return ``event`` converted to ``target_version``.

        An event without a ``version`` key is treated as ``v1``. If the event already has
        the target version it is returned as is; otherwise a converted copy is returned
        and the input is left unmodified.

        Raises:
            ValueError: If no chain of transformers leads to ``target_version``.
        """
        current_version = event.get('version', 'v1')
        if current_version == target_version:
            return event
        # Multi-step transformation if necessary
        transformed_event = event
        transformation_path = self._find_transformation_path(current_version, target_version)
        for from_version, to_version in transformation_path:
            transformer = self.version_transformers.get((from_version, to_version))
            if transformer:
                transformed_event = transformer(transformed_event)
            else:
                raise ValueError(f"No transformer for {from_version} → {to_version}")
        return transformed_event

    def _find_transformation_path(self, from_version: str, to_version: str):
        """Return the list of ``(from, to)`` steps leading from one version to the other.

        Performs a breadth-first search over the registered transformer pairs, so the
        path with the fewest steps is chosen.

        Raises:
            ValueError: If the target version cannot be reached.
        """
        from collections import deque

        queue = deque([(from_version, [])])
        visited = {from_version}
        while queue:
            version, path = queue.popleft()
            if version == to_version:
                return path
            for source, destination in self.version_transformers:
                if source == version and destination not in visited:
                    visited.add(destination)
                    queue.append((destination, path + [(source, destination)]))
        raise ValueError(f"No transformer for {from_version} → {to_version}")

    @staticmethod
    def _clone(event: Dict[str, Any]) -> Dict[str, Any]:
        """Deep-copy an event so nested structures are not shared with the input."""
        import copy

        return copy.deepcopy(event)

    def _transform_v1_to_v2(self, v1_event: Dict[str, Any]) -> Dict[str, Any]:
        """Upgrade v1 → v2: set the version and default the new ``currency`` field to EUR."""
        # Deep copy: a shallow copy would share 'data' and write the default into the input.
        v2_event = self._clone(v1_event)
        v2_event['version'] = 'v2'
        # New optional field with default
        if 'currency' not in v2_event['data']:
            v2_event['data']['currency'] = 'EUR'
        return v2_event

    def _transform_v2_to_v3(self, v2_event: Dict[str, Any]) -> Dict[str, Any]:
        """Upgrade v2 → v3: set the version and rename ``data.items`` to ``data.lineItems``."""
        # Deep copy: a shallow copy would share 'data' and delete 'items' from the input.
        v3_event = self._clone(v2_event)
        v3_event['version'] = 'v3'
        # Structural change: items → lineItems
        if 'items' in v3_event['data']:
            v3_event['data']['lineItems'] = v3_event['data'].pop('items')
        return v3_event
# Consumer mit automatischer Version-Behandlung
class VersionAwareEventConsumer:
def __init__(self, compatibility_handler):
self.compatibility_handler = compatibility_handler
async def consume_order_placed(self, event: Dict[str, Any]):
# Event auf aktuelle Version normalisieren
normalized_event = self.compatibility_handler.handle_event(event, 'v2')
# Business-Logic mit normalisiertem Event
await self._process_order_v2(normalized_event)
async def _process_order_v2(self, event: Dict[str, Any]):
data = event['data']
currency = data.get('currency', 'EUR') # Safe access mit Default
print(f"Processing order {data['orderId']} in {currency}")
| Schema Change | Backwards Compatible | Consumer Impact | Migration Strategy |
|---|---|---|---|
| Add Optional Field | ✅ Yes | None | Direct deployment |
| Add Enum Value | ✅ Yes | Graceful degradation | Default handling |
| Remove Optional Field | ✅ Yes | None (if unused) | Deprecation period |
| Rename Field | ❌ No | Breaking | Parallel versioning |
| Change Field Type | ❌ No | Breaking | New event version |
| Remove Required Field | ❌ No | Breaking | Migration pipeline |
Die Kombination aus Schema-Definition, Contract Testing und evolutionären Transformations-Strategien ermöglicht es, Event-getriebene Systeme über Systemgrenzen hinweg zu entwickeln, ohne dabei Flexibilität oder Stabilität zu opfern.