24 Serialization, Transactions, and Idempotent Production

Robust event-driven systems require reliable serialization, transactional guarantees, and idempotent event production. These technical concerns are decisive for data integrity, system resilience, and correct business logic in distributed environments.

24.1 Event Serialization

Event serialization converts structured data into byte streams that can be transmitted over the wire. The choice of serialization format affects performance, schema evolution, and interoperability between services.

24.1.1 JSON Serialization

JSON is the most widely used format for event serialization. It offers good readability and broad tooling support, but falls short on performance and schema evolution.

Spring Boot JSON serialization:

// Custom JSON serializer for events
@Component
public class EventJsonSerializer {
    
    private final ObjectMapper objectMapper;
    
    public EventJsonSerializer() {
        this.objectMapper = createOptimizedObjectMapper();
    }
    
    private ObjectMapper createOptimizedObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        
        // Tolerant deserialization and readable ISO-8601 timestamps
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        mapper.registerModule(new JavaTimeModule());
        
        // Event-specific mixins for clean serialization
        mapper.addMixIn(OrderPlacedEvent.class, OrderEventMixin.class);
        
        return mapper;
    }
    
    public byte[] serialize(String topic, Object event) {
        try {
            // Event envelope with metadata
            EventEnvelope envelope = EventEnvelope.builder()
                .eventId(UUID.randomUUID().toString())
                .eventType(event.getClass().getSimpleName())
                .timestamp(Instant.now())
                .version("v1")
                .correlationId(getCorrelationId())
                .data(event)
                .build();
            
            return objectMapper.writeValueAsBytes(envelope);
            
        } catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize event", e);
        }
    }
    
    @JsonIgnoreProperties(ignoreUnknown = true)
    public abstract static class OrderEventMixin {
        @JsonProperty("orderId")
        abstract String getOrderId();
        
        @JsonProperty("customerId") 
        abstract String getCustomerId();
        
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
        abstract Instant getTimestamp();
    }
    
    private String getCorrelationId() {
        return MDC.get("correlationId");
    }
    
    // Expose the mapper so the Kafka producer configuration below can reuse it
    public ObjectMapper getObjectMapper() {
        return objectMapper;
    }
}

// Event envelope for a consistent structure
@Data
@Builder
public class EventEnvelope {
    private String eventId;
    private String eventType;
    private Instant timestamp;
    private String version;
    private String correlationId;
    private Object data;
    
    // Schema evolution support (Builder.Default keeps the defaults when building)
    @Builder.Default
    @JsonProperty("schemaVersion")
    private String schemaVersion = "1.0";
    
    @Builder.Default
    @JsonProperty("source")
    private String source = "order-service";
}

// KafkaTemplate configuration with the custom serializer
@Configuration
public class JsonKafkaConfig {
    
    @Bean
    public ProducerFactory<String, Object> jsonProducerFactory(EventJsonSerializer serializer) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        // Custom serializer instance built on the envelope-aware ObjectMapper
        JsonSerializer<Object> valueSerializer = new JsonSerializer<>(serializer.getObjectMapper());
        valueSerializer.setAddTypeInfo(false); // type information lives in the event envelope
        
        DefaultKafkaProducerFactory<String, Object> factory = 
            new DefaultKafkaProducerFactory<>(configProps);
        factory.setValueSerializer(valueSerializer);
        
        return factory;
    }
}

Python JSON serialization:

# Python JSON serializer with envelope validation
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import structlog

logger = structlog.get_logger()

@dataclass
class EventEnvelope:
    event_id: str
    event_type: str
    timestamp: str
    version: str
    correlation_id: Optional[str]
    data: Dict[str, Any]
    schema_version: str = "1.0"
    source: str = "order-service"
    
    @classmethod
    def wrap(cls, event: Any, correlation_id: Optional[str] = None) -> 'EventEnvelope':
        return cls(
            event_id=str(uuid.uuid4()),
            event_type=event.__class__.__name__,
            timestamp=datetime.now(timezone.utc).isoformat(),
            version="v1",
            correlation_id=correlation_id,
            data=asdict(event) if hasattr(event, '__dataclass_fields__') else event.__dict__
        )

class EventJsonSerializer:
    def __init__(self):
        self.logger = logger.bind(component="EventJsonSerializer")
    
    def serialize(self, topic: str, event: Any, correlation_id: Optional[str] = None) -> bytes:
        """Serialize event with envelope and validation"""
        try:
            # Wrap event in envelope
            envelope = EventEnvelope.wrap(event, correlation_id)
            
            # Custom JSON encoder for datetime and other types
            json_str = json.dumps(
                asdict(envelope),
                default=self._json_serializer,
                separators=(',', ':'),  # Compact representation
                ensure_ascii=False
            )
            
            self.logger.debug(
                "Event serialized",
                topic=topic,
                event_type=envelope.event_type,
                event_id=envelope.event_id,
                size_bytes=len(json_str.encode('utf-8'))
            )
            
            return json_str.encode('utf-8')
            
        except Exception as e:
            self.logger.error(
                "Event serialization failed",
                topic=topic,
                event_type=type(event).__name__,
                error=str(e)
            )
            raise EventSerializationException(f"Failed to serialize event: {e}") from e
    
    def _json_serializer(self, obj: Any) -> Any:
        """Custom JSON serializer for special types"""
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, Enum):
            return obj.value
        elif hasattr(obj, '__dict__'):
            return obj.__dict__
        else:
            return str(obj)
    
    def deserialize(self, topic: str, data: bytes) -> EventEnvelope:
        """Deserialize event with validation"""
        try:
            json_data = json.loads(data.decode('utf-8'))
            
            # Validate envelope structure
            required_fields = ['event_id', 'event_type', 'timestamp', 'version', 'data']
            missing_fields = [field for field in required_fields if field not in json_data]
            
            if missing_fields:
                raise ValueError(f"Missing required fields: {missing_fields}")
            
            return EventEnvelope(**json_data)
            
        except Exception as e:
            self.logger.error(
                "Event deserialization failed",
                topic=topic,
                error=str(e)
            )
            raise EventSerializationException(f"Failed to deserialize event: {e}") from e

class EventSerializationException(Exception):
    pass
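
A short round-trip usage sketch of the serializer above; the OrderPlacedEvent dataclass and the topic name are illustrative assumptions:

# Usage sketch: wrap a dataclass event, serialize it, and read it back as an envelope
from dataclasses import dataclass

@dataclass
class OrderPlacedEvent:
    order_id: str
    customer_id: str
    total_amount: float

serializer = EventJsonSerializer()

event = OrderPlacedEvent(order_id="o-123", customer_id="c-42", total_amount=99.90)
payload = serializer.serialize("order.placed.v1", event, correlation_id="req-7")

envelope = serializer.deserialize("order.placed.v1", payload)
assert envelope.event_type == "OrderPlacedEvent"
assert envelope.data["order_id"] == "o-123"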

24.1.2 Avro Serialization for Schema Evolution

Avro offers better performance and robust schema evolution, but requires a Schema Registry as additional infrastructure.

Spring Boot Avro Integration:

// Avro Schema Definition (order-placed-event.avsc)
/*
{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "de.eda.training.events",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "eventType", "type": "string", "default": "OrderPlaced"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "version", "type": "string", "default": "v1"},
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"},
    {"name": "items", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "OrderItem",
        "fields": [
          {"name": "productId", "type": "string"},
          {"name": "quantity", "type": "int"},
          {"name": "price", "type": "double"}
        ]
      }
    }}
  ]
}
*/

// Avro Producer Configuration
@Configuration
public class AvroKafkaConfig {
    
    @Value("${spring.kafka.schema-registry-url}")
    private String schemaRegistryUrl;
    
    @Bean
    public ProducerFactory<String, GenericRecord> avroProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        configProps.put("schema.registry.url", schemaRegistryUrl);
        
        // Avro-specific settings
        configProps.put("auto.register.schemas", true);
        configProps.put("use.latest.version", true);
        configProps.put("value.subject.name.strategy", TopicRecordNameStrategy.class);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, GenericRecord> avroKafkaTemplate() {
        return new KafkaTemplate<>(avroProducerFactory());
    }
}

// Avro Event Publisher
@Slf4j
@Component
public class AvroEventPublisher {
    
    private final KafkaTemplate<String, GenericRecord> avroKafkaTemplate;
    private final Schema orderPlacedSchema;
    
    public AvroEventPublisher(KafkaTemplate<String, GenericRecord> avroKafkaTemplate) {
        this.avroKafkaTemplate = avroKafkaTemplate;
        this.orderPlacedSchema = loadSchema("order-placed-event.avsc");
    }
    
    public void publishOrderPlaced(Order order) {
        GenericRecord avroEvent = createOrderPlacedAvroRecord(order);
        
        avroKafkaTemplate.send("order.placed.avro.v1", order.getOrderId(), avroEvent)
            .addCallback(
                result -> log.info("Avro event published: {}", result.getRecordMetadata()),
                failure -> log.error("Failed to publish Avro event", failure)
            );
    }
    
    private GenericRecord createOrderPlacedAvroRecord(Order order) {
        GenericRecord record = new GenericData.Record(orderPlacedSchema);
        
        record.put("eventId", UUID.randomUUID().toString());
        record.put("eventType", "OrderPlaced");
        record.put("timestamp", Instant.now().toEpochMilli());
        record.put("version", "v1");
        record.put("orderId", order.getOrderId());
        record.put("customerId", order.getCustomerId());
        record.put("totalAmount", order.getTotalAmount().doubleValue());
        record.put("currency", order.getCurrency());
        
        // Build the items array
        List<GenericRecord> items = order.getItems().stream()
            .map(this::createOrderItemRecord)
            .collect(Collectors.toList());
        record.put("items", items);
        
        return record;
    }
    
    private GenericRecord createOrderItemRecord(OrderItem item) {
        Schema itemSchema = orderPlacedSchema.getField("items")
            .schema().getElementType();
        
        GenericRecord itemRecord = new GenericData.Record(itemSchema);
        itemRecord.put("productId", item.getProductId());
        itemRecord.put("quantity", item.getQuantity());
        itemRecord.put("price", item.getPrice().doubleValue());
        
        return itemRecord;
    }
    
    private Schema loadSchema(String schemaFile) {
        try (InputStream schemaStream = getClass().getClassLoader()
                .getResourceAsStream("avro/" + schemaFile)) {
            return new Schema.Parser().parse(schemaStream);
        } catch (IOException e) {
            throw new RuntimeException("Failed to load Avro schema: " + schemaFile, e);
        }
    }
}
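
The chapter shows the Avro path only for Spring Boot. A minimal Python counterpart could use confluent-kafka together with its Schema Registry client; the package choice, the registry URL, and the reuse of the order-placed-event.avsc schema above are assumptions of this sketch:

# Sketch of a Python Avro producer (assumes the confluent-kafka[avro] package
# and a Schema Registry at http://localhost:8081)
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

with open("avro/order-placed-event.avsc") as f:
    schema_str = f.read()

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_serializer = AvroSerializer(schema_registry, schema_str)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": avro_serializer,
})

def publish_order_placed(order_event: dict) -> None:
    """order_event must be a dict matching the OrderPlacedEvent schema above."""
    producer.produce(
        topic="order.placed.avro.v1",
        key=order_event["orderId"],
        value=order_event,
        on_delivery=lambda err, msg: print(err or f"delivered to {msg.topic()}[{msg.partition()}]"),
    )
    producer.flush()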

24.2 Transactional Guarantees

Transactional guarantees ensure that event publication happens atomically together with other operations. Kafka supports producer-side transactions; coordination with external systems such as a relational database is possible only as best-effort synchronization, since Kafka does not participate in distributed two-phase commits.

24.2.1 Kafka Producer Transactions

// Spring Boot Transactional Event Publishing
@Slf4j
@Service
@Transactional
public class TransactionalEventService {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final OrderRepository orderRepository;
    
    @Transactional
    public void processOrderWithEvents(CreateOrderRequest request) {
        // 1. Database Operation (JPA Transaction)
        Order order = new Order(request);
        Order savedOrder = orderRepository.save(order);
        
        // 2. Kafka transaction, coordinated with the JPA transaction
        publishOrderEvents(savedOrder);
        
        // Both transactions are committed together (best effort via ChainedTransactionManager)
    }
    
    private void publishOrderEvents(Order order) {
        // Several events within a single Kafka transaction
        kafkaTemplate.executeInTransaction(template -> {
            // Primary Event
            template.send("order.placed.v1", order.getOrderId(), 
                         OrderPlacedEvent.from(order));
            
            // Related Events
            template.send("inventory.check.requested.v1", order.getOrderId(),
                         InventoryCheckRequestedEvent.from(order));
            
            template.send("payment.authorization.requested.v1", order.getOrderId(),
                         PaymentAuthorizationRequestedEvent.from(order));
            
            return null; // executeInTransaction expects a return value
        });
    }
    
    @Transactional
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        try {
            // Transactional Event Chain
            kafkaTemplate.executeInTransaction(template -> {
                // Order Status Update Event
                template.send("order.payment.confirmed.v1", event.getOrderId(),
                             OrderPaymentConfirmedEvent.from(event));
                
                // Trigger Fulfillment
                template.send("fulfillment.requested.v1", event.getOrderId(),
                             FulfillmentRequestedEvent.from(event));
                
                return null;
            });
            
        } catch (Exception e) {
            log.error("Failed to process payment event chain", e);
            // Publish compensation event
            publishPaymentProcessingFailed(event, e);
            throw e;
        }
    }
    
    // Compensation event for error handling
    private void publishPaymentProcessingFailed(PaymentProcessedEvent event, Exception error) {
        PaymentProcessingFailedEvent compensationEvent = PaymentProcessingFailedEvent.builder()
            .orderId(event.getOrderId())
            .paymentId(event.getPaymentId())
            .reason(error.getMessage())
            .timestamp(Instant.now())
            .build();
        
        kafkaTemplate.send("payment.processing.failed.v1", event.getOrderId(), compensationEvent);
    }
}

// Transactional Configuration
@Configuration
@EnableTransactionManagement
public class TransactionalConfig {
    
    @Bean
    @Primary
    public PlatformTransactionManager chainedTransactionManager(
            KafkaTransactionManager kafkaTransactionManager,
            JpaTransactionManager jpaTransactionManager) {
        
        // Transactions are started in the given order and committed in reverse:
        // the JPA commit happens first, the Kafka commit last (best-effort 1PC)
        ChainedTransactionManager chainedManager = new ChainedTransactionManager(
            kafkaTransactionManager,
            jpaTransactionManager
        );
        
        return chainedManager;
    }
    
    @Bean
    public KafkaTransactionManager kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }
}

Python Transactional Implementation:

# Python Transactional Event Service
import asyncio
from contextlib import asynccontextmanager
from typing import List, Optional, Any, Dict
import structlog
from dataclasses import dataclass

logger = structlog.get_logger()

@dataclass
class TransactionEvent:
    topic: str
    key: str
    value: Any
    headers: Optional[Dict[str, str]] = None

class TransactionalEventService:
    def __init__(self, producer, db_session_factory):
        self.producer = producer
        self.db_session_factory = db_session_factory
        self.logger = logger.bind(component="TransactionalEventService")
    
    @asynccontextmanager
    async def transaction(self):
        """Coordinate the database and Kafka transactions (best effort)"""
        db_session = None
        transaction_id = None
        
        try:
            # 1. Start the database transaction
            db_session = await self.db_session_factory.create_session()
            await db_session.begin()
            
            # 2. Start the Kafka transaction (if the producer supports it)
            if hasattr(self.producer, 'begin_transaction'):
                transaction_id = await self.producer.begin_transaction()
            
            # Expose both transactions through a shared context
            context = TransactionContext(db_session, self.producer, transaction_id)
            yield context
            
            # 3. Commit both transactions (Kafka first, then the database)
            if transaction_id:
                await self.producer.commit_transaction()
            await db_session.commit()
            
            self.logger.info("Transaction committed successfully")
            
        except Exception as e:
            # 4. Roll back on failure
            if transaction_id:
                try:
                    await self.producer.abort_transaction()
                except Exception as rollback_error:
                    self.logger.error("Failed to rollback Kafka transaction", 
                                    error=str(rollback_error))
            
            if db_session:
                try:
                    await db_session.rollback()
                except Exception as rollback_error:
                    self.logger.error("Failed to rollback DB transaction", 
                                    error=str(rollback_error))
            
            self.logger.error("Transaction failed, rolled back", error=str(e))
            raise
        
        finally:
            if db_session:
                await db_session.close()
    
    async def process_order_transactionally(self, request: 'CreateOrderRequest') -> 'Order':
        """Process order with full transactional guarantees"""
        async with self.transaction() as tx:
            # 1. Create and save order
            order = Order.from_request(request)
            await tx.db_session.merge(order)
            
            # 2. Publish events within transaction
            events = [
                TransactionEvent(
                    topic="order.placed.v1",
                    key=order.order_id,
                    value=OrderPlacedEvent.from_order(order)
                ),
                TransactionEvent(
                    topic="inventory.check.requested.v1", 
                    key=order.order_id,
                    value=InventoryCheckRequestedEvent.from_order(order)
                ),
                TransactionEvent(
                    topic="payment.authorization.requested.v1",
                    key=order.order_id, 
                    value=PaymentAuthorizationRequestedEvent.from_order(order)
                )
            ]
            
            await tx.publish_events(events)
            
            self.logger.info(
                "Order processed transactionally",
                order_id=order.order_id,
                events_count=len(events)
            )
            
            return order

class TransactionContext:
    def __init__(self, db_session, producer, transaction_id: Optional[str]):
        self.db_session = db_session
        self.producer = producer
        self.transaction_id = transaction_id
        self.pending_events: List[TransactionEvent] = []
    
    async def publish_events(self, events: List[TransactionEvent]):
        """Publish events within transaction context"""
        for event in events:
            if self.transaction_id:
                # Transactional send
                await self.producer.send_transactional(
                    topic=event.topic,
                    key=event.key,
                    value=event.value,
                    headers=event.headers
                )
            else:
                # Fallback: Store events for later publication
                self.pending_events.append(event)
        
        # Publish pending events if no transaction support
        if not self.transaction_id and self.pending_events:
            await self._publish_pending_events()
    
    async def _publish_pending_events(self):
        """Publish events that were queued during transaction"""
        for event in self.pending_events:
            await self.producer.send_event(
                topic=event.topic,
                event=event.value,
                key=event.key
            )
        self.pending_events.clear()

24.3 Idempotency Keys and Strategies

Idempotent event production prevents duplicate events through unique identification and duplicate detection. This is essential under at-least-once delivery semantics, where retries would otherwise publish the same event more than once.
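
Kafka's own idempotent producer already deduplicates broker-side retries of the same produce request; it does not recognize business-level duplicates (for example, the same order submitted twice), which is what the key strategies below address. A minimal aiokafka sketch of the broker-side setting:

# Broker-side idempotence: duplicate produce retries are filtered by the broker.
# Business-level duplicates still require the application-level keys shown below.
from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(
    bootstrap_servers="localhost:9092",
    enable_idempotence=True,  # implies acks="all" and safe retries
)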

24.3.1 Idempotency Key Strategies

// Spring Boot Idempotent Event Publisher
@Slf4j
@Component
public class IdempotentEventPublisher {
    
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final RedisTemplate<String, String> redisTemplate;
    private final EventMetrics eventMetrics;
    
    private static final String IDEMPOTENCY_KEY_PREFIX = "event:idempotency:";
    private static final Duration IDEMPOTENCY_TTL = Duration.ofHours(24);
    
    public CompletableFuture<SendResult<String, Object>> publishIdempotent(
            String topic, String businessKey, Object event, String idempotencyKey) {
        
        // 1. Check if event was already published
        String redisKey = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
        String existingEventId = redisTemplate.opsForValue().get(redisKey);
        
        if (existingEventId != null) {
            eventMetrics.incrementDuplicateDetected(topic);
            log.debug("Duplicate event detected, skipping publication: key={}, existing eventId={}", 
                     idempotencyKey, existingEventId);
            
            // Return completed future with cached result
            return CompletableFuture.completedFuture(
                createCachedSendResult(topic, businessKey, existingEventId)
            );
        }
        
        // 2. Add idempotency metadata to event
        IdempotentEvent idempotentEvent = wrapWithIdempotency(event, idempotencyKey);
        
        // 3. Publish event
        ListenableFuture<SendResult<String, Object>> future = 
            kafkaTemplate.send(topic, businessKey, idempotentEvent);
        
        // 4. Store idempotency key on successful publish
        return future.completable().thenApply(result -> {
            String eventId = idempotentEvent.getEventId();
            redisTemplate.opsForValue().set(redisKey, eventId, IDEMPOTENCY_TTL);
            
            eventMetrics.incrementSuccessfulPublish(topic);
            log.info("Idempotent event published successfully: key={}, eventId={}, offset={}", 
                    idempotencyKey, eventId, result.getRecordMetadata().offset());
            
            return result;
        }).exceptionally(throwable -> {
            eventMetrics.incrementFailedPublish(topic);
            log.error("Failed to publish idempotent event", throwable);
            throw new CompletionException(throwable);
        });
    }
    
    public CompletableFuture<SendResult<String, Object>> publishOrderEvent(
            Order order, String eventType) {
        
        // Business-based idempotency key
        String idempotencyKey = generateOrderEventIdempotencyKey(order, eventType);
        
        Object event = createEventForType(order, eventType);
        String topic = getTopicForEventType(eventType);
        
        return publishIdempotent(topic, order.getOrderId(), event, idempotencyKey);
    }
    
    private String generateOrderEventIdempotencyKey(Order order, String eventType) {
        // Combine the business key with event type, order status, and order version
        return String.format("%s:%s:%s:%d", 
                           order.getOrderId(), 
                           eventType, 
                           order.getStatus(),
                           order.getVersion());
    }
    
    private IdempotentEvent wrapWithIdempotency(Object event, String idempotencyKey) {
        return IdempotentEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .idempotencyKey(idempotencyKey)
            .timestamp(Instant.now())
            .eventData(event)
            .build();
    }
    
    @Data
    @Builder
    public static class IdempotentEvent {
        private String eventId;
        private String idempotencyKey;
        private Instant timestamp;
        private Object eventData;
        
        // Content checksum for additional verification
        @JsonIgnore
        public String getContentChecksum() {
            try {
                String content = new ObjectMapper().writeValueAsString(eventData);
                return DigestUtils.sha256Hex(content);
            } catch (Exception e) {
                return "unknown";
            }
        }
    }
}

// Advanced idempotency with database-backed storage
@Component
public class DatabaseIdempotencyService {
    
    private final IdempotencyRepository idempotencyRepository;
    
    @Transactional
    public boolean isEventAlreadyPublished(String idempotencyKey) {
        return idempotencyRepository.findByIdempotencyKey(idempotencyKey)
            .map(record -> {
                // Check if record is still valid (not expired)
                return record.getCreatedAt()
                    .isAfter(Instant.now().minus(Duration.ofHours(24)));
            })
            .orElse(false);
    }
    
    @Transactional
    public void recordEventPublication(String idempotencyKey, String eventId, 
                                     String topic, Object eventData) {
        IdempotencyRecord record = IdempotencyRecord.builder()
            .idempotencyKey(idempotencyKey)
            .eventId(eventId)
            .topic(topic)
            .eventDataHash(calculateEventHash(eventData))
            .createdAt(Instant.now())
            .build();
        
        idempotencyRepository.save(record);
    }
    
    private String calculateEventHash(Object eventData) {
        try {
            String json = new ObjectMapper().writeValueAsString(eventData);
            return DigestUtils.sha256Hex(json);
        } catch (Exception e) {
            return "hash-error";
        }
    }
}

Python Idempotent Implementation:

# Python Idempotent Event Publisher
import hashlib
import json
import asyncio
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import structlog

logger = structlog.get_logger()

class IdempotentEventPublisher:
    def __init__(self, producer, redis_client, ttl_hours: int = 24):
        self.producer = producer
        self.redis_client = redis_client
        self.ttl_seconds = ttl_hours * 3600
        self.logger = logger.bind(component="IdempotentEventPublisher")
    
    async def publish_idempotent(self, topic: str, business_key: str, 
                               event: Any, idempotency_key: str) -> Dict[str, Any]:
        """Publish event with idempotency guarantees"""
        
        # 1. Check for existing publication
        redis_key = f"event:idempotency:{idempotency_key}"
        existing_event_id = await self.redis_client.get(redis_key)
        
        if existing_event_id:
            self.logger.debug(
                "Duplicate event detected, skipping publication",
                idempotency_key=idempotency_key,
                existing_event_id=existing_event_id
            )
            return {
                'status': 'duplicate',
                'event_id': existing_event_id,
                'idempotency_key': idempotency_key
            }
        
        # 2. Wrap event with idempotency metadata
        idempotent_event = self._wrap_with_idempotency(event, idempotency_key)
        
        try:
            # 3. Publish event
            result = await self.producer.send_event(
                topic=topic,
                event=idempotent_event,
                key=business_key
            )
            
            # 4. Store idempotency key on success
            await self.redis_client.setex(
                redis_key, 
                self.ttl_seconds, 
                idempotent_event['event_id']
            )
            
            self.logger.info(
                "Idempotent event published successfully",
                idempotency_key=idempotency_key,
                event_id=idempotent_event['event_id'],
                topic=topic,
                offset=result.get('offset')
            )
            
            return {
                'status': 'published',
                'event_id': idempotent_event['event_id'],
                'idempotency_key': idempotency_key,
                'result': result
            }
            
        except Exception as e:
            self.logger.error(
                "Failed to publish idempotent event",
                idempotency_key=idempotency_key,
                topic=topic,
                error=str(e)
            )
            raise
    
    async def publish_order_event(self, order: 'Order', event_type: str) -> Dict[str, Any]:
        """Publish order event with business-based idempotency"""
        
        # Generate business-based idempotency key
        idempotency_key = self._generate_order_idempotency_key(order, event_type)
        
        # Create event data
        event_data = self._create_event_for_type(order, event_type)
        topic = self._get_topic_for_event_type(event_type)
        
        return await self.publish_idempotent(
            topic=topic,
            business_key=order.order_id,
            event=event_data,
            idempotency_key=idempotency_key
        )
    
    def _generate_order_idempotency_key(self, order: 'Order', event_type: str) -> str:
        """Generate idempotency key based on business logic"""
        # Include order state to handle status changes
        key_components = [
            order.order_id,
            event_type,
            order.status.value,
            str(order.version)
        ]
        return ":".join(key_components)
    
    def _wrap_with_idempotency(self, event: Any, idempotency_key: str) -> Dict[str, Any]:
        """Wrap event with idempotency metadata"""
        import uuid
        
        idempotent_event = {
            'event_id': str(uuid.uuid4()),
            'idempotency_key': idempotency_key,
            'timestamp': datetime.utcnow().isoformat(),
            'event_data': event,
            'content_hash': self._calculate_content_hash(event)
        }
        
        return idempotent_event
    
    def _calculate_content_hash(self, event: Any) -> str:
        """Calculate content hash for additional verification"""
        try:
            # Normalize the event data for consistent hashing
            if hasattr(event, '__dict__'):
                content = json.dumps(event.__dict__, sort_keys=True, default=str)
            else:
                content = json.dumps(event, sort_keys=True, default=str)
            
            return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16]
        except Exception:
            return "hash-error"

# Database-backed idempotency for durable deduplication state
class DatabaseIdempotencyService:
    def __init__(self, db_session_factory):
        self.db_session_factory = db_session_factory
        self.logger = logger.bind(component="DatabaseIdempotencyService")
    
    async def is_event_already_published(self, idempotency_key: str) -> bool:
        """Check if event was already published"""
        async with self.db_session_factory.create_session() as session:
            # Query for existing idempotency record
            query = """
                SELECT event_id, created_at 
                FROM event_idempotency 
                WHERE idempotency_key = %s 
                AND created_at > %s
            """
            
            cutoff_time = datetime.utcnow() - timedelta(hours=24)
            result = await session.execute(query, (idempotency_key, cutoff_time))
            
            return result.fetchone() is not None
    
    async def record_event_publication(self, idempotency_key: str, event_id: str,
                                     topic: str, event_data: Any):
        """Record successful event publication"""
        async with self.db_session_factory.create_session() as session:
            try:
                await session.begin()
                
                insert_query = """
                    INSERT INTO event_idempotency 
                    (idempotency_key, event_id, topic, event_data_hash, created_at)
                    VALUES (%s, %s, %s, %s, %s)
                """
                
                content_hash = self._calculate_event_hash(event_data)
                
                await session.execute(insert_query, (
                    idempotency_key,
                    event_id,
                    topic,
                    content_hash,
                    datetime.utcnow()
                ))
                
                await session.commit()
                
                self.logger.debug(
                    "Idempotency record created",
                    idempotency_key=idempotency_key,
                    event_id=event_id
                )
                
            except Exception as e:
                await session.rollback()
                self.logger.error(
                    "Failed to record idempotency",
                    idempotency_key=idempotency_key,
                    error=str(e)
                )
                raise
    
    def _calculate_event_hash(self, event_data: Any) -> str:
        """Calculate hash of event data"""
        try:
            content = json.dumps(event_data, sort_keys=True, default=str)
            return hashlib.sha256(content.encode('utf-8')).hexdigest()
        except Exception:
            return "hash-error"
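
The check-then-record sequence in both idempotency services is not race-free when several publishers run concurrently. A unique constraint on idempotency_key, combined with an insert that tolerates the conflict, closes that window. A sketch of such an additional method (PostgreSQL ON CONFLICT syntax and the driver's rowcount behaviour are assumptions):

    # Race-free variant: let a UNIQUE constraint on idempotency_key decide which
    # publisher records the key first; returns True only for that publisher
    async def try_record_event_publication(self, idempotency_key: str, event_id: str,
                                            topic: str, content_hash: str) -> bool:
        async with self.db_session_factory.create_session() as session:
            await session.begin()
            insert_query = """
                INSERT INTO event_idempotency
                (idempotency_key, event_id, topic, event_data_hash, created_at)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (idempotency_key) DO NOTHING
            """
            result = await session.execute(insert_query, (
                idempotency_key, event_id, topic, content_hash, datetime.utcnow()
            ))
            await session.commit()
            # rowcount == 0 means another publisher already recorded this key
            return result.rowcount == 1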

24.4 Duplicate Detection

Duplicate detection on the consumer side provides an additional safety net against duplicate events, even when producer-side idempotence fails.

24.4.1 Consumer-Side Duplicate Detection

// Spring Boot consumer with duplicate detection
@Slf4j
@Component
public class OrderEventConsumer {
    
    private final DuplicateDetectionService duplicateDetection;
    private final OrderEventHandler orderEventHandler;
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(ConsumerRecord<String, OrderPlacedEvent> record) {
        OrderPlacedEvent event = record.value();
        String eventId = event.getEventId();
        
        // Duplicate Detection
        if (duplicateDetection.isDuplicate(eventId, record.topic())) {
            log.debug("Duplicate event detected and ignored: {}", eventId);
            return; // Idempotent: ignore duplicate
        }
        
        try {
            // Process event
            orderEventHandler.handleOrderPlaced(event);
            
            // Mark as processed
            duplicateDetection.markAsProcessed(eventId, record.topic(), 
                                             record.offset(), record.partition());
            
        } catch (Exception e) {
            log.error("Failed to process order event: {}", eventId, e);
            throw e; // Trigger retry
        }
    }
}

@Service
public class DuplicateDetectionService {
    
    private final RedisTemplate<String, String> redisTemplate;
    private static final Duration DETECTION_WINDOW = Duration.ofDays(7);
    
    public boolean isDuplicate(String eventId, String topic) {
        String key = String.format("processed:%s:%s", topic, eventId);
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }
    
    public void markAsProcessed(String eventId, String topic, long offset, int partition) {
        String key = String.format("processed:%s:%s", topic, eventId);
        String value = String.format("%d:%d:%s", partition, offset, Instant.now());
        
        redisTemplate.opsForValue().set(key, value, DETECTION_WINDOW);
    }
}

Python Consumer Duplicate Detection:

# Python consumer with duplicate detection
class DuplicateDetectionConsumer:
    def __init__(self, redis_client, detection_window_days: int = 7):
        self.redis_client = redis_client
        self.detection_window_seconds = detection_window_days * 24 * 3600
        self.logger = logger.bind(component="DuplicateDetectionConsumer")
    
    async def handle_order_event(self, event_data: Dict[str, Any], 
                                topic: str, partition: int, offset: int):
        """Handle order event with duplicate detection"""
        
        event_id = event_data.get('event_id')
        if not event_id:
            self.logger.warning("Event without event_id received", topic=topic)
            return
        
        # Check for duplicate
        if await self.is_duplicate(event_id, topic):
            self.logger.debug(
                "Duplicate event detected and ignored",
                event_id=event_id,
                topic=topic
            )
            return  # Idempotent: ignore duplicate
        
        try:
            # Process the event
            await self.process_event(event_data)
            
            # Mark as processed
            await self.mark_as_processed(event_id, topic, partition, offset)
            
            self.logger.info(
                "Event processed successfully",
                event_id=event_id,
                topic=topic,
                partition=partition,
                offset=offset
            )
            
        except Exception as e:
            self.logger.error(
                "Failed to process event",
                event_id=event_id,
                topic=topic,
                error=str(e)
            )
            raise  # Trigger retry
    
    async def is_duplicate(self, event_id: str, topic: str) -> bool:
        """Check if event was already processed"""
        key = f"processed:{topic}:{event_id}"
        return await self.redis_client.exists(key) > 0
    
    async def mark_as_processed(self, event_id: str, topic: str, 
                              partition: int, offset: int):
        """Mark event as successfully processed"""
        key = f"processed:{topic}:{event_id}"
        value = f"{partition}:{offset}:{datetime.utcnow().isoformat()}"
        
        await self.redis_client.setex(
            key, 
            self.detection_window_seconds, 
            value
        )
    
    async def process_event(self, event_data: Dict[str, Any]):
        """Process the actual event business logic"""
        event_type = event_data.get('event_type')
        
        if event_type == 'OrderPlaced':
            await self.handle_order_placed(event_data['event_data'])
        elif event_type == 'OrderCancelled':
            await self.handle_order_cancelled(event_data['event_data'])
        else:
            raise ValueError(f"Unknown event type: {event_type}")
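
The is_duplicate / mark_as_processed pair above leaves a small window in which two consumers could process the same event concurrently. Claiming the event atomically with Redis SET NX before processing removes that window; the sketch below assumes redis-py's asyncio client and adds two methods to the consumer class:

    # Atomic claim: SET with nx=True succeeds for exactly one consumer per event
    async def claim_event(self, event_id: str, topic: str, partition: int, offset: int) -> bool:
        key = f"processed:{topic}:{event_id}"
        value = f"{partition}:{offset}:{datetime.utcnow().isoformat()}"
        claimed = await self.redis_client.set(
            key, value, nx=True, ex=self.detection_window_seconds
        )
        return bool(claimed)
    
    async def handle_order_event_atomic(self, event_data: Dict[str, Any],
                                        topic: str, partition: int, offset: int):
        """Variant of handle_order_event that claims the event before processing"""
        event_id = event_data.get('event_id')
        if not event_id:
            return
        if not await self.claim_event(event_id, topic, partition, offset):
            return  # another consumer already claimed this event
        try:
            await self.process_event(event_data)
        except Exception:
            # Release the claim so a retry can process the event again
            await self.redis_client.delete(f"processed:{topic}:{event_id}")
            raise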

These implementations show how serialization, transactions, and idempotence work together to build robust event-driven systems. Combining producer-side idempotence with consumer-side duplicate detection gives the strongest protection against duplicate events, while transactional guarantees keep data consistent.

