17 Serialisierung und Formatwahl

Die Serialisierung von Events bestimmt, wie Informationen zwischen Systemen übertragen werden. Die Wahl des Serialisierungsformats beeinflusst Performance, Verständlichkeit, Versionierbarkeit und Interoperabilität der gesamten Event-Architektur. Während technische Perfektion verlockend erscheint, entscheiden oft praktische Erwägungen über den langfristigen Erfolg.

17.1 Trade-offs verschiedener Serialisierungsformate

JSON (JavaScript Object Notation) ist das am weitesten verbreitete Format für Event-Serialisierung. Seine textbasierte Natur macht es menschenlesbar und debuggingfreundlich, führt aber zu größeren Nachrichten und langsamerer Verarbeitung.

// Spring Boot - JSON Event Serialisierung
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "eventType")
@JsonSubTypes({
        @JsonSubTypes.Type(value = OrderPlaced.class, name = "OrderPlaced"),
        @JsonSubTypes.Type(value = PaymentProcessed.class, name = "PaymentProcessed")
})
public abstract class BaseEvent {
    @JsonProperty("eventId")
    private String eventId;

    @JsonProperty("timestamp")
    private Instant timestamp;

    @JsonProperty("version")
    private String version;
}

public class OrderPlaced extends BaseEvent {
    @JsonProperty("orderId")
    private String orderId;

    @JsonProperty("customerId")
    private String customerId;

    @JsonProperty("items")
    private List<OrderItem> items;

    @JsonProperty("totalAmount")
    private BigDecimal totalAmount;

    // Konstruktor, Getter...
}
# Python - JSON Event mit dataclasses
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Dict, Any
from decimal import Decimal

@dataclass
class OrderPlaced:
    event_id: str
    order_id: str
    customer_id: str
    items: List[Dict[str, Any]]
    total_amount: Decimal
    timestamp: datetime
    version: str = "v1"
    
    def to_json(self) -> str:
        """Serialisierung nach JSON"""
        data = asdict(self)
        data['total_amount'] = str(self.total_amount)
        data['timestamp'] = self.timestamp.isoformat()
        return json.dumps(data)
    
    @classmethod
    def from_json(cls, json_str: str) -> 'OrderPlaced':
        """Deserialisierung von JSON"""
        data = json.loads(json_str)
        data['total_amount'] = Decimal(data['total_amount'])
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        return cls(**data)

Avro bietet Schema-Evolution und kompakte binäre Serialisierung. Events werden gegen ein Schema validiert, was Typsicherheit garantiert, aber Tooling-Komplexität erhöht.

// Spring Boot - Avro Event Definition (Schema-basiert)
// order-placed.avsc
{
        "namespace": "de.eda.training.events",
        "type": "record",
        "name": "OrderPlaced",
        "fields": [
        {"name": "eventId", "type": "string"},
        {"name": "orderId", "type": "string"},
        {"name": "customerId", "type": "string"},
        {"name": "totalAmount", "type": {"type": "bytes", "logicalType": "decimal"}},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
        {"name": "items", "type": {"type": "array", "items": "OrderItem"}}
        ]
        }

// Generierte Java-Klasse verwenden
@Component
public class AvroEventProducer {

    public void publishOrderPlaced(Order order) {
        OrderPlaced event = OrderPlaced.newBuilder()
                .setEventId(UUID.randomUUID().toString())
                .setOrderId(order.getId())
                .setCustomerId(order.getCustomerId())
                .setTotalAmount(order.getTotalAmount().toPlainString())
                .setTimestamp(Instant.now().toEpochMilli())
                .setItems(convertItems(order.getItems()))
                .build();

        // Avro-Serialisierung erfolgt automatisch
        kafkaTemplate.send("order.events", event);
    }
}
# Python - Avro mit Schema Registry Integration
import avro.schema
import avro.io
import io
from confluent_kafka import avro

# Schema-Definition
schema_str = """
{
  "namespace": "eda_training.events",
  "type": "record", 
  "name": "OrderPlaced",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "total_amount", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "items", "type": {"type": "array", "items": "string"}}
  ]
}
"""

class AvroEventSerializer:
    def __init__(self, schema_str: str):
        self.schema = avro.schema.parse(schema_str)
        
    def serialize(self, event_data: dict) -> bytes:
        """Binäre Avro-Serialisierung"""
        writer = avro.io.DatumWriter(self.schema)
        bytes_writer = io.BytesIO()
        encoder = avro.io.BinaryEncoder(bytes_writer)
        writer.write(event_data, encoder)
        return bytes_writer.getvalue()

Protobuf (Protocol Buffers) kombiniert kompakte Serialisierung mit starker Typisierung. Die Interface Definition Language (IDL) ermöglicht sprachübergreifende Kompatibilität, erfordert aber Code-Generierung.

// Spring Boot - Protobuf Event Definition
// order_events.proto
syntax = "proto3";
        package de.eda.training.events;

message OrderPlaced {
string event_id = 1;
string order_id = 2;
string customer_id = 3;
string total_amount = 4;
int64 timestamp = 5;
repeated OrderItem items = 6;
        }

message OrderItem {
string product_id = 1;
int32 quantity = 2;
string unit_price = 3;
}

// Java-Implementierung mit generiertem Code
@Service
public class ProtobufEventService {

    public void publishOrderPlaced(Order order) {
        OrderEvents.OrderPlaced event = OrderEvents.OrderPlaced.newBuilder()
                .setEventId(UUID.randomUUID().toString())
                .setOrderId(order.getId())
                .setCustomerId(order.getCustomerId())
                .setTotalAmount(order.getTotalAmount().toPlainString())
                .setTimestamp(Instant.now().toEpochMilli())
                .addAllItems(convertToProtobufItems(order.getItems()))
                .build();

        // Protobuf-Serialisierung
        byte[] serialized = event.toByteArray();
        kafkaTemplate.send("order.events", serialized);
    }
}
# Python - Protobuf Event Handling
import order_events_pb2  # Generierte Python-Klasse
from google.protobuf.message import Message
import time

class ProtobufEventHandler:
    
    def create_order_placed_event(self, order_data: dict) -> bytes:
        """Erstellt und serialisiert Protobuf Event"""
        event = order_events_pb2.OrderPlaced()
        event.event_id = order_data['event_id']
        event.order_id = order_data['order_id'] 
        event.customer_id = order_data['customer_id']
        event.total_amount = str(order_data['total_amount'])
        event.timestamp = int(time.time() * 1000)
        
        for item in order_data['items']:
            proto_item = event.items.add()
            proto_item.product_id = item['product_id']
            proto_item.quantity = item['quantity']
            proto_item.unit_price = str(item['unit_price'])
            
        return event.SerializeToString()
    
    def parse_order_placed_event(self, data: bytes) -> dict:
        """Deserialisiert Protobuf Event"""
        event = order_events_pb2.OrderPlaced()
        event.ParseFromString(data)
        
        return {
            'event_id': event.event_id,
            'order_id': event.order_id,
            'customer_id': event.customer_id,
            'total_amount': event.total_amount,
            'timestamp': event.timestamp,
            'items': [
                {
                    'product_id': item.product_id,
                    'quantity': item.quantity,
                    'unit_price': item.unit_price
                }
                for item in event.items
            ]
        }
Format Kompaktheit Lesbarkeit Schema-Evolution Tooling-Aufwand Performance
JSON Niedrig Hoch Manuell Minimal Moderat
Avro Hoch Niedrig Automatisch Moderat Hoch
Protobuf Hoch Niedrig Eingeschränkt Hoch Sehr hoch

17.2 Performance vs. Verständlichkeit

Der Zielkonflikt zwischen Performance und Verständlichkeit manifestiert sich in verschiedenen Dimensionen der Systemarchitektur.

Verständlichkeit umfasst sowohl menschliche Lesbarkeit als auch Debugging-Freundlichkeit. JSON-Events können direkt in Logs analysiert, mit Standard-Tools inspiziert und von Entwicklern ohne spezielle Decoder verstanden werden.

// Spring Boot - Debugging-freundliche JSON-Events
@Component
public class DebuggingFriendlyEvents {

    private static final Logger logger = LoggerFactory.getLogger(DebuggingFriendlyEvents.class);

    public void logEventProcessing(String jsonEvent) {
        // JSON kann direkt geloggt und analysiert werden
        logger.info("Processing event: {}", jsonEvent);

        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode eventNode = mapper.readTree(jsonEvent);

            // Einfache Feldextraktion ohne Schema
            String eventType = eventNode.get("eventType").asText();
            String orderId = eventNode.path("orderId").asText();

            logger.info("Event type: {}, Order ID: {}", eventType, orderId);

        } catch (Exception e) {
            logger.error("Event parsing failed - but JSON is still readable: {}", jsonEvent);
        }
    }
}

Performance betrifft Serialisierung/Deserialisierung, Netzwerkübertragung und Speicherverbrauch. Binäre Formate reduzieren sowohl Übertragungszeiten als auch Speicherbedarf erheblich.

# Python - Performance-Vergleich verschiedener Formate
import json
import time
from decimal import Decimal
import pickle

class PerformanceComparison:
    
    def __init__(self):
        self.test_event = {
            'event_id': 'evt_123456789',
            'order_id': 'ord_987654321', 
            'customer_id': 'cust_456789123',
            'total_amount': Decimal('299.99'),
            'timestamp': time.time(),
            'items': [
                {'product_id': f'prod_{i}', 'quantity': i, 'price': Decimal(f'{i}.99')}
                for i in range(1, 50)  # 49 Items für realistische Größe
            ]
        }
    
    def benchmark_json_serialization(self, iterations: int = 1000):
        """JSON Serialisierung Benchmark"""
        start_time = time.time()
        total_size = 0
        
        for _ in range(iterations):
            # JSON-Serialisierung (mit Decimal-Handling)
            json_data = json.dumps(self.test_event, default=str)
            total_size += len(json_data.encode('utf-8'))
            
        duration = time.time() - start_time
        avg_size = total_size / iterations
        
        return {
            'format': 'JSON',
            'duration_ms': duration * 1000,
            'avg_size_bytes': avg_size,
            'readable': True
        }
    
    def benchmark_binary_serialization(self, iterations: int = 1000):
        """Binäre Serialisierung Benchmark (pickle als Beispiel)"""
        start_time = time.time()
        total_size = 0
        
        for _ in range(iterations):
            # Binäre Serialisierung
            binary_data = pickle.dumps(self.test_event)
            total_size += len(binary_data)
            
        duration = time.time() - start_time
        avg_size = total_size / iterations
        
        return {
            'format': 'Binary',
            'duration_ms': duration * 1000,
            'avg_size_bytes': avg_size,
            'readable': False
        }

Die Entscheidung zwischen Performance und Verständlichkeit hängt von Systemanforderungen ab:

17.3 Tooling und Ecosystem-Considerations

Die Wahl des Serialisierungsformats beeinflusst das gesamte Entwicklungs- und Betriebsökosystem.

JSON-Ökosystem profitiert von universeller Tool-Unterstützung:

// Spring Boot - JSON-Tooling Integration
@Configuration
public class JsonEventConfiguration {

    @Bean
    public ObjectMapper eventObjectMapper() {
        return new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @Bean
    public JsonSchemaGenerator schemaGenerator() {
        // Schema-Generierung für Dokumentation
        return new JsonSchemaGenerator(eventObjectMapper());
    }
}

@Component
public class JsonEventValidator {

    public boolean validateEvent(String jsonEvent, JsonSchema schema) {
        // JSON Schema Validation ohne Code-Generierung
        try {
            JsonNode eventNode = objectMapper.readTree(jsonEvent);
            Set<ValidationMessage> errors = schema.validate(eventNode);
            return errors.isEmpty();
        } catch (Exception e) {
            return false;
        }
    }
}

Schema-Registry-Integration für Avro und Protobuf:

# Python - Schema Registry Integration
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, AvroConsumer

class SchemaRegistryEventHandler:
    
    def __init__(self, schema_registry_url: str):
        self.schema_registry_conf = {
            'url': schema_registry_url
        }
        
        # Automatische Schema-Evolution
        self.producer = AvroProducer({
            'bootstrap.servers': 'localhost:9092',
            'schema.registry.url': schema_registry_url
        })
    
    def publish_with_schema_evolution(self, topic: str, event_data: dict, schema_version: str = "latest"):
        """Publiziert Events mit automatischer Schema-Verwaltung"""
        try:
            self.producer.produce(
                topic=topic,
                value=event_data,
                value_schema_id=schema_version
            )
            self.producer.flush()
        except Exception as e:
            # Schema-Evolution-Fehler behandeln
            self.handle_schema_compatibility_error(e, event_data)

Monitoring und Observability variieren nach Format:

// Spring Boot - Format-spezifisches Monitoring
@Component
public class EventFormatMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter jsonEvents;
    private final Counter avroEvents;
    private final Timer serializationTime;

    public EventFormatMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.jsonEvents = Counter.builder("events.serialized")
                .tag("format", "json")
                .register(meterRegistry);
        this.avroEvents = Counter.builder("events.serialized")
                .tag("format", "avro")
                .register(meterRegistry);
        this.serializationTime = Timer.builder("events.serialization.time")
                .register(meterRegistry);
    }

    public void recordEventSerialization(String format, long duration) {
        Timer.Sample sample = Timer.start(meterRegistry);
        sample.stop(Timer.builder("events.serialization.time")
                .tag("format", format)
                .register(meterRegistry));

        switch (format) {
            case "json" -> jsonEvents.increment();
            case "avro" -> avroEvents.increment();
        }
    }
}

Entwicklungsworkflow-Unterschiede:

Entscheidungsmatrix für Format-Wahl:

# Python - Entscheidungshilfe für Serialisierungsformat
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List

class SerializationRequirement(Enum):
    HUMAN_READABLE = "human_readable"
    HIGH_PERFORMANCE = "high_performance" 
    SCHEMA_EVOLUTION = "schema_evolution"
    MINIMAL_TOOLING = "minimal_tooling"
    CROSS_LANGUAGE = "cross_language"
    DEBUGGING_FRIENDLY = "debugging_friendly"

@dataclass
class FormatRecommendation:
    format_name: str
    score: int
    pros: List[str]
    cons: List[str]

class SerializationFormatSelector:
    
    def __init__(self):
        self.format_scores = {
            'JSON': {
                SerializationRequirement.HUMAN_READABLE: 5,
                SerializationRequirement.HIGH_PERFORMANCE: 2,
                SerializationRequirement.SCHEMA_EVOLUTION: 2,
                SerializationRequirement.MINIMAL_TOOLING: 5,
                SerializationRequirement.CROSS_LANGUAGE: 5,
                SerializationRequirement.DEBUGGING_FRIENDLY: 5
            },
            'Avro': {
                SerializationRequirement.HUMAN_READABLE: 1,
                SerializationRequirement.HIGH_PERFORMANCE: 4,
                SerializationRequirement.SCHEMA_EVOLUTION: 5,
                SerializationRequirement.MINIMAL_TOOLING: 2,
                SerializationRequirement.CROSS_LANGUAGE: 4,
                SerializationRequirement.DEBUGGING_FRIENDLY: 2
            },
            'Protobuf': {
                SerializationRequirement.HUMAN_READABLE: 1,
                SerializationRequirement.HIGH_PERFORMANCE: 5,
                SerializationRequirement.SCHEMA_EVOLUTION: 3,
                SerializationRequirement.MINIMAL_TOOLING: 1,
                SerializationRequirement.CROSS_LANGUAGE: 5,
                SerializationRequirement.DEBUGGING_FRIENDLY: 1
            }
        }
    
    def recommend_format(self, requirements: List[SerializationRequirement]) -> FormatRecommendation:
        """Empfiehlt Serialisierungsformat basierend auf Anforderungen"""
        format_scores = {}
        
        for format_name, scores in self.format_scores.items():
            total_score = sum(scores[req] for req in requirements)
            format_scores[format_name] = total_score
        
        best_format = max(format_scores, key=format_scores.get)
        
        return FormatRecommendation(
            format_name=best_format,
            score=format_scores[best_format],
            pros=self._get_format_pros(best_format),
            cons=self._get_format_cons(best_format)
        )

Die Serialisierungsformat-Entscheidung sollte früh in der Architekturphase getroffen werden, da spätere Änderungen aufwändig sind. JSON bietet Entwicklungsgeschwindigkeit und Verständlichkeit, Avro ermöglicht robuste Schema-Evolution, Protobuf maximiert Performance und sprachübergreifende Konsistenz. Hybride Ansätze mit verschiedenen Formaten für verschiedene Event-Typen sind möglich, erhöhen aber die Systemkomplexität.