Robust event-driven systems require reliable serialization, transactional guarantees, and idempotent event production. These technical concerns are decisive for data integrity, system resilience, and correct business logic in distributed environments.
Event serialization turns structured data into byte streams that can be transmitted. The choice of serialization format affects performance, schema evolution, and interoperability between services.
JSON is the most widely used format for event serialization. It offers good readability and broad tool support, but falls short on performance and schema evolution.
Spring Boot JSON Serialization:
// Custom JSON serializer for events
@Component
public class EventJsonSerializer {
private final ObjectMapper objectMapper;
public EventJsonSerializer() {
this.objectMapper = createOptimizedObjectMapper();
}
private ObjectMapper createOptimizedObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
// Tolerant deserialization and readable ISO-8601 timestamps
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
mapper.registerModule(new JavaTimeModule());
// Event-specific mixins for clean serialization
mapper.addMixIn(OrderPlacedEvent.class, OrderEventMixin.class);
return mapper;
}
public byte[] serialize(String topic, Object event) {
try {
// Event envelope with metadata
EventEnvelope envelope = EventEnvelope.builder()
.eventId(UUID.randomUUID().toString())
.eventType(event.getClass().getSimpleName())
.timestamp(Instant.now())
.version("v1")
.correlationId(getCorrelationId())
.data(event)
.build();
return objectMapper.writeValueAsBytes(envelope);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize event", e);
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract static class OrderEventMixin {
@JsonProperty("orderId")
abstract String getOrderId();
@JsonProperty("customerId")
abstract String getCustomerId();
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
abstract Instant getTimestamp();
}
private String getCorrelationId() {
return MDC.get("correlationId");
}
// Expose the configured mapper so the Kafka value serializer can reuse it
public ObjectMapper getObjectMapper() {
return objectMapper;
}
}
// Event envelope for a consistent structure
@Data
@Builder
public class EventEnvelope {
private String eventId;
private String eventType;
private Instant timestamp;
private String version;
private String correlationId;
private Object data;
// Schema Evolution Support
@JsonProperty("schemaVersion")
private String schemaVersion = "1.0";
@JsonProperty("source")
private String source = "order-service";
}
// KafkaTemplate configuration with a custom serializer
@Configuration
public class JsonKafkaConfig {
@Bean
public ProducerFactory<String, Object> jsonProducerFactory(EventJsonSerializer serializer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Custom serializer instance for event envelopes; it takes precedence over any class-based value serializer config
JsonSerializer<Object> valueSerializer = new JsonSerializer<>(serializer.getObjectMapper());
valueSerializer.setAddTypeInfo(false); // type information lives in the event envelope
DefaultKafkaProducerFactory<String, Object> factory =
new DefaultKafkaProducerFactory<>(configProps);
factory.setValueSerializer(valueSerializer);
return factory;
}
}

Python JSON Serialization:
# Python JSON serializer with envelope wrapping and basic field validation
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import structlog
logger = structlog.get_logger()
@dataclass
class EventEnvelope:
event_id: str
event_type: str
timestamp: str
version: str
correlation_id: Optional[str]
data: Dict[str, Any]
schema_version: str = "1.0"
source: str = "order-service"
@classmethod
def wrap(cls, event: Any, correlation_id: Optional[str] = None) -> 'EventEnvelope':
return cls(
event_id=str(uuid.uuid4()),
event_type=event.__class__.__name__,
timestamp=datetime.now(timezone.utc).isoformat(),
version="v1",
correlation_id=correlation_id,
data=asdict(event) if hasattr(event, '__dataclass_fields__') else event.__dict__
)
class EventJsonSerializer:
def __init__(self):
self.logger = logger.bind(component="EventJsonSerializer")
def serialize(self, topic: str, event: Any, correlation_id: Optional[str] = None) -> bytes:
"""Serialize event with envelope and validation"""
try:
# Wrap event in envelope
envelope = EventEnvelope.wrap(event, correlation_id)
# Custom JSON encoder for datetime and other types
json_str = json.dumps(
asdict(envelope),
default=self._json_serializer,
separators=(',', ':'), # Compact representation
ensure_ascii=False
)
self.logger.debug(
"Event serialized",
topic=topic,
event_type=envelope.event_type,
event_id=envelope.event_id,
size_bytes=len(json_str.encode('utf-8'))
)
return json_str.encode('utf-8')
except Exception as e:
self.logger.error(
"Event serialization failed",
topic=topic,
event_type=type(event).__name__,
error=str(e)
)
raise EventSerializationException(f"Failed to serialize event: {e}") from e
def _json_serializer(self, obj: Any) -> Any:
"""Custom JSON serializer for special types"""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, Enum):
return obj.value
elif hasattr(obj, '__dict__'):
return obj.__dict__
else:
return str(obj)
def deserialize(self, topic: str, data: bytes) -> EventEnvelope:
"""Deserialize event with validation"""
try:
json_data = json.loads(data.decode('utf-8'))
# Validate envelope structure
required_fields = ['event_id', 'event_type', 'timestamp', 'version', 'data']
missing_fields = [field for field in required_fields if field not in json_data]
if missing_fields:
raise ValueError(f"Missing required fields: {missing_fields}")
return EventEnvelope(**json_data)
except Exception as e:
self.logger.error(
"Event deserialization failed",
topic=topic,
error=str(e)
)
raise EventSerializationException(f"Failed to deserialize event: {e}") from e
class EventSerializationException(Exception):
pass

Avro offers better performance and robust schema evolution, but requires a Schema Registry as additional infrastructure.
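For the Python side (the Spring Boot integration follows below), a serializer backed by Confluent's Schema Registry can be sketched as follows; the registry URL, topic name, and the trimmed-down schema are illustrative assumptions, not part of the reference setup:

# Sketch: Avro serialization against Confluent Schema Registry (confluent-kafka)
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})  # assumed registry location
order_schema = """
{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "de.eda.training.events",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "totalAmount", "type": "double"}
  ]
}
"""
avro_serializer = AvroSerializer(schema_registry, order_schema)
producer = Producer({"bootstrap.servers": "localhost:9092"})

event = {"orderId": "o-1", "customerId": "c-1", "totalAmount": 99.90}
# Registers/resolves the schema and encodes the record as Avro binary
payload = avro_serializer(event, SerializationContext("order.placed.avro.v1", MessageField.VALUE))
producer.produce("order.placed.avro.v1", key=event["orderId"], value=payload)
producer.flush()

The serializer registers the schema on first use and embeds the schema ID in each payload, which is what enables compatibility checks during schema evolution.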
Spring Boot Avro Integration:
// Avro Schema Definition (order-placed-event.avsc)
/*
{
"type": "record",
"name": "OrderPlacedEvent",
"namespace": "de.eda.training.events",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "eventType", "type": "string", "default": "OrderPlaced"},
{"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "version", "type": "string", "default": "v1"},
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"},
{"name": "items", "type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}}
]
}
*/
// Avro Producer Configuration
@Configuration
public class AvroKafkaConfig {
@Value("${spring.kafka.schema-registry-url}")
private String schemaRegistryUrl;
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
configProps.put("schema.registry.url", schemaRegistryUrl);
// Avro-specific configuration
configProps.put("auto.register.schemas", true);
configProps.put("use.latest.version", true);
configProps.put("value.subject.name.strategy", TopicRecordNameStrategy.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, GenericRecord> avroKafkaTemplate() {
return new KafkaTemplate<>(avroProducerFactory());
}
}
// Avro Event Publisher
@Component
@Slf4j
public class AvroEventPublisher {
private final KafkaTemplate<String, GenericRecord> avroKafkaTemplate;
private final Schema orderPlacedSchema;
public AvroEventPublisher(KafkaTemplate<String, GenericRecord> avroKafkaTemplate) {
this.avroKafkaTemplate = avroKafkaTemplate;
this.orderPlacedSchema = loadSchema("order-placed-event.avsc");
}
public void publishOrderPlaced(Order order) {
GenericRecord avroEvent = createOrderPlacedAvroRecord(order);
avroKafkaTemplate.send("order.placed.avro.v1", order.getOrderId(), avroEvent)
.addCallback(
result -> log.info("Avro event published: {}", result.getRecordMetadata()),
failure -> log.error("Failed to publish Avro event", failure)
);
}
private GenericRecord createOrderPlacedAvroRecord(Order order) {
GenericRecord record = new GenericData.Record(orderPlacedSchema);
record.put("eventId", UUID.randomUUID().toString());
record.put("eventType", "OrderPlaced");
record.put("timestamp", Instant.now().toEpochMilli());
record.put("version", "v1");
record.put("orderId", order.getOrderId());
record.put("customerId", order.getCustomerId());
record.put("totalAmount", order.getTotalAmount().doubleValue());
record.put("currency", order.getCurrency());
// Build the items array
List<GenericRecord> items = order.getItems().stream()
.map(this::createOrderItemRecord)
.collect(Collectors.toList());
record.put("items", items);
return record;
}
private GenericRecord createOrderItemRecord(OrderItem item) {
Schema itemSchema = orderPlacedSchema.getField("items")
.schema().getElementType();
GenericRecord itemRecord = new GenericData.Record(itemSchema);
itemRecord.put("productId", item.getProductId());
itemRecord.put("quantity", item.getQuantity());
itemRecord.put("price", item.getPrice().doubleValue());
return itemRecord;
}
private Schema loadSchema(String schemaFile) {
try (InputStream schemaStream = getClass().getClassLoader()
.getResourceAsStream("avro/" + schemaFile)) {
return new Schema.Parser().parse(schemaStream);
} catch (IOException e) {
throw new RuntimeException("Failed to load Avro schema: " + schemaFile, e);
}
}
}

Transactional guarantees ensure that event publication happens atomically alongside other operations. Kafka supports producer-side transactions as well as transactions coordinated with external systems.
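On the producer side, Kafka transactions require a stable transactional.id; the broker uses it to fence zombie producers and to commit all sends of one transaction atomically. A minimal sketch with the confluent-kafka Python client (broker address, transactional.id, and topic names are illustrative assumptions):

# Sketch: transactional producer with confluent-kafka
from confluent_kafka import Producer, KafkaException

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "order-service-tx-1",  # must stay stable for this producer instance
    "enable.idempotence": True,
})
producer.init_transactions()

def publish_order_events_transactionally(order_id: str, payloads: dict) -> None:
    """Send several related events so that either all or none become visible to consumers."""
    producer.begin_transaction()
    try:
        for topic, value in payloads.items():
            producer.produce(topic, key=order_id, value=value)
        producer.commit_transaction()
    except KafkaException:
        producer.abort_transaction()
        raise

publish_order_events_transactionally("order-123", {
    "order.placed.v1": b'{"orderId": "order-123"}',
    "inventory.check.requested.v1": b'{"orderId": "order-123"}',
})

Consumers only benefit from this if they read with isolation.level=read_committed; otherwise records from aborted transactions remain visible.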
// Spring Boot Transactional Event Publishing
@Service
@Slf4j
@RequiredArgsConstructor
@Transactional
public class TransactionalEventService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final OrderRepository orderRepository;
// Runs inside the coordinated (Kafka + JPA) transaction started by the class-level @Transactional
public void processOrderWithEvents(CreateOrderRequest request) {
// 1. Database Operation (JPA Transaction)
Order order = new Order(request);
Order savedOrder = orderRepository.save(order);
// 2. Kafka transaction (coordinated with the JPA transaction)
publishOrderEvents(savedOrder);
// Both transactions are committed together (best effort via the chained transaction manager)
}
private void publishOrderEvents(Order order) {
// Several events within one Kafka transaction
kafkaTemplate.executeInTransaction(template -> {
// Primary Event
template.send("order.placed.v1", order.getOrderId(),
OrderPlacedEvent.from(order));
// Related Events
template.send("inventory.check.requested.v1", order.getOrderId(),
InventoryCheckRequestedEvent.from(order));
template.send("payment.authorization.requested.v1", order.getOrderId(),
PaymentAuthorizationRequestedEvent.from(order));
return null; // executeInTransaction requires a return value
});
}
public void handlePaymentProcessed(PaymentProcessedEvent event) {
try {
// Transactional Event Chain
kafkaTemplate.executeInTransaction(template -> {
// Order Status Update Event
template.send("order.payment.confirmed.v1", event.getOrderId(),
OrderPaymentConfirmedEvent.from(event));
// Trigger Fulfillment
template.send("fulfillment.requested.v1", event.getOrderId(),
FulfillmentRequestedEvent.from(event));
return null;
});
} catch (Exception e) {
log.error("Failed to process payment event chain", e);
// Publish compensation event
publishPaymentProcessingFailed(event, e);
throw e;
}
}
// Compensation event for error handling
private void publishPaymentProcessingFailed(PaymentProcessedEvent event, Exception error) {
PaymentProcessingFailedEvent compensationEvent = PaymentProcessingFailedEvent.builder()
.orderId(event.getOrderId())
.paymentId(event.getPaymentId())
.reason(error.getMessage())
.timestamp(Instant.now())
.build();
kafkaTemplate.send("payment.processing.failed.v1", event.getOrderId(), compensationEvent);
}
}
// Transactional Configuration
@Configuration
@EnableTransactionManagement
public class TransactionalConfig {
@Bean
@Primary
public PlatformTransactionManager chainedTransactionManager(
KafkaTransactionManager kafkaTransactionManager,
JpaTransactionManager jpaTransactionManager) {
// Note: ChainedTransactionManager provides best-effort coordination, not two-phase commit,
// and is deprecated in recent Spring Data releases
ChainedTransactionManager chainedManager = new ChainedTransactionManager(
kafkaTransactionManager, // Kafka first: started first, committed last
jpaTransactionManager // JPA second: committed first
);
return chainedManager;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
}

Python Transactional Implementation:
# Python Transactional Event Service
import asyncio
from contextlib import asynccontextmanager
from typing import List, Optional, Any, Dict
import structlog
from dataclasses import dataclass
logger = structlog.get_logger()
@dataclass
class TransactionEvent:
topic: str
key: str
value: Any
headers: Optional[Dict[str, str]] = None
class TransactionalEventService:
def __init__(self, producer, db_session_factory):
self.producer = producer
self.db_session_factory = db_session_factory
self.logger = logger.bind(component="TransactionalEventService")
@asynccontextmanager
async def transaction(self):
"""Context manager für koordinierte Transaktionen"""
db_session = None
transaction_id = None
try:
# 1. Start the database transaction
db_session = await self.db_session_factory.create_session()
await db_session.begin()
# 2. Start the Kafka transaction (if supported)
if hasattr(self.producer, 'begin_transaction'):
transaction_id = await self.producer.begin_transaction()
# Provide a context covering both transactions
context = TransactionContext(db_session, self.producer, transaction_id)
yield context
# 3. Commit both transactions
if transaction_id:
await self.producer.commit_transaction()
await db_session.commit()
self.logger.info("Transaction committed successfully")
except Exception as e:
# 4. Roll back on error
if transaction_id:
try:
await self.producer.abort_transaction()
except Exception as rollback_error:
self.logger.error("Failed to rollback Kafka transaction",
error=str(rollback_error))
if db_session:
try:
await db_session.rollback()
except Exception as rollback_error:
self.logger.error("Failed to rollback DB transaction",
error=str(rollback_error))
self.logger.error("Transaction failed, rolled back", error=str(e))
raise
finally:
if db_session:
await db_session.close()
async def process_order_transactionally(self, request: 'CreateOrderRequest') -> 'Order':
"""Process order with full transactional guarantees"""
async with self.transaction() as tx:
# 1. Create and save order
order = Order.from_request(request)
await tx.db_session.merge(order)
# 2. Publish events within transaction
events = [
TransactionEvent(
topic="order.placed.v1",
key=order.order_id,
value=OrderPlacedEvent.from_order(order)
),
TransactionEvent(
topic="inventory.check.requested.v1",
key=order.order_id,
value=InventoryCheckRequestedEvent.from_order(order)
),
TransactionEvent(
topic="payment.authorization.requested.v1",
key=order.order_id,
value=PaymentAuthorizationRequestedEvent.from_order(order)
)
]
await tx.publish_events(events)
self.logger.info(
"Order processed transactionally",
order_id=order.order_id,
events_count=len(events)
)
return order
class TransactionContext:
def __init__(self, db_session, producer, transaction_id: Optional[str]):
self.db_session = db_session
self.producer = producer
self.transaction_id = transaction_id
self.pending_events: List[TransactionEvent] = []
async def publish_events(self, events: List[TransactionEvent]):
"""Publish events within transaction context"""
for event in events:
if self.transaction_id:
# Transactional send
await self.producer.send_transactional(
topic=event.topic,
key=event.key,
value=event.value,
headers=event.headers
)
else:
# Fallback: Store events for later publication
self.pending_events.append(event)
# Publish pending events if no transaction support
if not self.transaction_id and self.pending_events:
await self._publish_pending_events()
async def _publish_pending_events(self):
"""Publish events that were queued during transaction"""
for event in self.pending_events:
await self.producer.send_event(
topic=event.topic,
event=event.value,
key=event.key
)
self.pending_events.clear()

Idempotent event production prevents duplicate events through unique identification and duplicate detection. This is essential with at-least-once delivery semantics.
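The application-level idempotency keys shown below complement Kafka's built-in producer idempotence, which deduplicates broker-side retries per partition but not repeated produce calls from the application. A short configuration sketch (the broker address is an assumption):

# Sketch: enabling Kafka's native producer idempotence (confluent-kafka)
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "enable.idempotence": True,  # broker deduplicates retried batches per partition
    "acks": "all",               # required together with idempotence
})

This guards against duplicates caused by network retries; duplicates caused by the application re-sending an event still need the idempotency keys described below.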
// Spring Boot Idempotent Event Publisher
@Component
@Slf4j
@RequiredArgsConstructor
public class IdempotentEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final RedisTemplate<String, String> redisTemplate;
private final EventMetrics eventMetrics;
private static final String IDEMPOTENCY_KEY_PREFIX = "event:idempotency:";
private static final Duration IDEMPOTENCY_TTL = Duration.ofHours(24);
public CompletableFuture<SendResult<String, Object>> publishIdempotent(
String topic, String businessKey, Object event, String idempotencyKey) {
// 1. Check if event was already published
String redisKey = IDEMPOTENCY_KEY_PREFIX + idempotencyKey;
String existingEventId = redisTemplate.opsForValue().get(redisKey);
if (existingEventId != null) {
eventMetrics.incrementDuplicateDetected(topic);
log.debug("Duplicate event detected, skipping publication",
idempotencyKey, existingEventId);
// Return completed future with cached result
return CompletableFuture.completedFuture(
createCachedSendResult(topic, businessKey, existingEventId)
);
}
// 2. Add idempotency metadata to event
IdempotentEvent idempotentEvent = wrapWithIdempotency(event, idempotencyKey);
// 3. Publish event
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(topic, businessKey, idempotentEvent);
// 4. Store idempotency key on successful publish
return future.completable().thenApply(result -> {
String eventId = idempotentEvent.getEventId();
redisTemplate.opsForValue().set(redisKey, eventId, IDEMPOTENCY_TTL);
eventMetrics.incrementSuccessfulPublish(topic);
log.info("Idempotent event published successfully",
idempotencyKey, eventId, result.getRecordMetadata().offset());
return result;
}).exceptionally(throwable -> {
eventMetrics.incrementFailedPublish(topic);
log.error("Failed to publish idempotent event", throwable);
throw new CompletionException(throwable);
});
}
public CompletableFuture<SendResult<String, Object>> publishOrderEvent(
Order order, String eventType) {
// Business-based idempotency key
String idempotencyKey = generateOrderEventIdempotencyKey(order, eventType);
Object event = createEventForType(order, eventType);
String topic = getTopicForEventType(eventType);
return publishIdempotent(topic, order.getOrderId(), event, idempotencyKey);
}
private String generateOrderEventIdempotencyKey(Order order, String eventType) {
// Combine business key with event type and order version
return String.format("%s:%s:%s:%d",
order.getOrderId(),
eventType,
order.getStatus(),
order.getVersion());
}
private IdempotentEvent wrapWithIdempotency(Object event, String idempotencyKey) {
return IdempotentEvent.builder()
.eventId(UUID.randomUUID().toString())
.idempotencyKey(idempotencyKey)
.timestamp(Instant.now())
.eventData(event)
.build();
}
@Data
@Builder
public static class IdempotentEvent {
private String eventId;
private String idempotencyKey;
private Instant timestamp;
private Object eventData;
// Checksum for additional verification
@JsonIgnore
public String getContentChecksum() {
try {
String content = new ObjectMapper().writeValueAsString(eventData);
return DigestUtils.sha256Hex(content);
} catch (Exception e) {
return "unknown";
}
}
}
}
// Advanced idempotency with database-backed storage
@Component
@RequiredArgsConstructor
public class DatabaseIdempotencyService {
private final IdempotencyRepository idempotencyRepository;
@Transactional
public boolean isEventAlreadyPublished(String idempotencyKey) {
return idempotencyRepository.findByIdempotencyKey(idempotencyKey)
.map(record -> {
// Check if record is still valid (not expired)
return record.getCreatedAt()
.isAfter(Instant.now().minus(Duration.ofHours(24)));
})
.orElse(false);
}
@Transactional
public void recordEventPublication(String idempotencyKey, String eventId,
String topic, Object eventData) {
IdempotencyRecord record = IdempotencyRecord.builder()
.idempotencyKey(idempotencyKey)
.eventId(eventId)
.topic(topic)
.eventDataHash(calculateEventHash(eventData))
.createdAt(Instant.now())
.build();
idempotencyRepository.save(record);
}
private String calculateEventHash(Object eventData) {
try {
String json = new ObjectMapper().writeValueAsString(eventData);
return DigestUtils.sha256Hex(json);
} catch (Exception e) {
return "hash-error";
}
}
}

Python Idempotent Implementation:
# Python Idempotent Event Publisher
import hashlib
import json
import asyncio
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import structlog
logger = structlog.get_logger()
class IdempotentEventPublisher:
def __init__(self, producer, redis_client, ttl_hours: int = 24):
self.producer = producer
self.redis_client = redis_client
self.ttl_seconds = ttl_hours * 3600
self.logger = logger.bind(component="IdempotentEventPublisher")
async def publish_idempotent(self, topic: str, business_key: str,
event: Any, idempotency_key: str) -> Dict[str, Any]:
"""Publish event with idempotency guarantees"""
# 1. Check for existing publication
redis_key = f"event:idempotency:{idempotency_key}"
existing_event_id = await self.redis_client.get(redis_key)
if existing_event_id:
self.logger.debug(
"Duplicate event detected, skipping publication",
idempotency_key=idempotency_key,
existing_event_id=existing_event_id
)
return {
'status': 'duplicate',
'event_id': existing_event_id,
'idempotency_key': idempotency_key
}
# 2. Wrap event with idempotency metadata
idempotent_event = self._wrap_with_idempotency(event, idempotency_key)
try:
# 3. Publish event
result = await self.producer.send_event(
topic=topic,
event=idempotent_event,
key=business_key
)
# 4. Store idempotency key on success
await self.redis_client.setex(
redis_key,
self.ttl_seconds,
idempotent_event['event_id']
)
self.logger.info(
"Idempotent event published successfully",
idempotency_key=idempotency_key,
event_id=idempotent_event['event_id'],
topic=topic,
offset=result.get('offset')
)
return {
'status': 'published',
'event_id': idempotent_event['event_id'],
'idempotency_key': idempotency_key,
'result': result
}
except Exception as e:
self.logger.error(
"Failed to publish idempotent event",
idempotency_key=idempotency_key,
topic=topic,
error=str(e)
)
raise
async def publish_order_event(self, order: 'Order', event_type: str) -> Dict[str, Any]:
"""Publish order event with business-based idempotency"""
# Generate business-based idempotency key
idempotency_key = self._generate_order_idempotency_key(order, event_type)
# Create event data
event_data = self._create_event_for_type(order, event_type)
topic = self._get_topic_for_event_type(event_type)
return await self.publish_idempotent(
topic=topic,
business_key=order.order_id,
event=event_data,
idempotency_key=idempotency_key
)
def _generate_order_idempotency_key(self, order: 'Order', event_type: str) -> str:
"""Generate idempotency key based on business logic"""
# Include order state to handle status changes
key_components = [
order.order_id,
event_type,
order.status.value,
str(order.version)
]
return ":".join(key_components)
def _wrap_with_idempotency(self, event: Any, idempotency_key: str) -> Dict[str, Any]:
"""Wrap event with idempotency metadata"""
import uuid
idempotent_event = {
'event_id': str(uuid.uuid4()),
'idempotency_key': idempotency_key,
'timestamp': datetime.utcnow().isoformat(),
'event_data': event,
'content_hash': self._calculate_content_hash(event)
}
return idempotent_event
def _calculate_content_hash(self, event: Any) -> str:
"""Calculate content hash for additional verification"""
try:
# Normalize the event data for consistent hashing
if hasattr(event, '__dict__'):
content = json.dumps(event.__dict__, sort_keys=True, default=str)
else:
content = json.dumps(event, sort_keys=True, default=str)
return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16]
except Exception:
return "hash-error"
# Database-backed idempotency for durable persistence
class DatabaseIdempotencyService:
def __init__(self, db_session_factory):
self.db_session_factory = db_session_factory
self.logger = logger.bind(component="DatabaseIdempotencyService")
async def is_event_already_published(self, idempotency_key: str) -> bool:
"""Check if event was already published"""
async with self.db_session_factory.create_session() as session:
# Query for existing idempotency record
query = """
SELECT event_id, created_at
FROM event_idempotency
WHERE idempotency_key = %s
AND created_at > %s
"""
cutoff_time = datetime.utcnow() - timedelta(hours=24)
result = await session.execute(query, (idempotency_key, cutoff_time))
return result.fetchone() is not None
async def record_event_publication(self, idempotency_key: str, event_id: str,
topic: str, event_data: Any):
"""Record successful event publication"""
async with self.db_session_factory.create_session() as session:
try:
await session.begin()
insert_query = """
INSERT INTO event_idempotency
(idempotency_key, event_id, topic, event_data_hash, created_at)
VALUES (%s, %s, %s, %s, %s)
"""
content_hash = self._calculate_event_hash(event_data)
await session.execute(insert_query, (
idempotency_key,
event_id,
topic,
content_hash,
datetime.utcnow()
))
await session.commit()
self.logger.debug(
"Idempotency record created",
idempotency_key=idempotency_key,
event_id=event_id
)
except Exception as e:
await session.rollback()
self.logger.error(
"Failed to record idempotency",
idempotency_key=idempotency_key,
error=str(e)
)
raise
def _calculate_event_hash(self, event_data: Any) -> str:
"""Calculate hash of event data"""
try:
content = json.dumps(event_data, sort_keys=True, default=str)
return hashlib.sha256(content.encode('utf-8')).hexdigest()
except Exception:
return "hash-error"Duplicate Detection auf Consumer-Seite bietet zusätzliche Sicherheit gegen Duplicate Events, auch wenn Producer-seitige Idempotenz versagt.
// Spring Boot consumer with duplicate detection
@Component
@Slf4j
@RequiredArgsConstructor
public class OrderEventConsumer {
private final DuplicateDetectionService duplicateDetection;
private final OrderEventHandler orderEventHandler;
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(ConsumerRecord<String, OrderPlacedEvent> record) {
OrderPlacedEvent event = record.value();
String eventId = event.getEventId();
// Duplicate Detection
if (duplicateDetection.isDuplicate(eventId, record.topic())) {
log.debug("Duplicate event detected and ignored: {}", eventId);
return; // Idempotent: ignore duplicate
}
try {
// Process event
orderEventHandler.handleOrderPlaced(event);
// Mark as processed
duplicateDetection.markAsProcessed(eventId, record.topic(),
record.offset(), record.partition());
} catch (Exception e) {
log.error("Failed to process order event: {}", eventId, e);
throw e; // Trigger retry
}
}
}
@Service
@RequiredArgsConstructor
public class DuplicateDetectionService {
private final RedisTemplate<String, String> redisTemplate;
private static final Duration DETECTION_WINDOW = Duration.ofDays(7);
public boolean isDuplicate(String eventId, String topic) {
String key = String.format("processed:%s:%s", topic, eventId);
return redisTemplate.hasKey(key);
}
public void markAsProcessed(String eventId, String topic, long offset, int partition) {
String key = String.format("processed:%s:%s", topic, eventId);
String value = String.format("%d:%d:%s", partition, offset, Instant.now());
redisTemplate.opsForValue().set(key, value, DETECTION_WINDOW);
}
}

Python Consumer Duplicate Detection:
# Python consumer with duplicate detection
class DuplicateDetectionConsumer:
def __init__(self, redis_client, detection_window_days: int = 7):
self.redis_client = redis_client
self.detection_window_seconds = detection_window_days * 24 * 3600
self.logger = logger.bind(component="DuplicateDetectionConsumer")
async def handle_order_event(self, event_data: Dict[str, Any],
topic: str, partition: int, offset: int):
"""Handle order event with duplicate detection"""
event_id = event_data.get('event_id')
if not event_id:
self.logger.warning("Event without event_id received", topic=topic)
return
# Check for duplicate
if await self.is_duplicate(event_id, topic):
self.logger.debug(
"Duplicate event detected and ignored",
event_id=event_id,
topic=topic
)
return # Idempotent: ignore duplicate
try:
# Process the event
await self.process_event(event_data)
# Mark as processed
await self.mark_as_processed(event_id, topic, partition, offset)
self.logger.info(
"Event processed successfully",
event_id=event_id,
topic=topic,
partition=partition,
offset=offset
)
except Exception as e:
self.logger.error(
"Failed to process event",
event_id=event_id,
topic=topic,
error=str(e)
)
raise # Trigger retry
async def is_duplicate(self, event_id: str, topic: str) -> bool:
"""Check if event was already processed"""
key = f"processed:{topic}:{event_id}"
return await self.redis_client.exists(key) > 0
async def mark_as_processed(self, event_id: str, topic: str,
partition: int, offset: int):
"""Mark event as successfully processed"""
key = f"processed:{topic}:{event_id}"
value = f"{partition}:{offset}:{datetime.utcnow().isoformat()}"
await self.redis_client.setex(
key,
self.detection_window_seconds,
value
)
async def process_event(self, event_data: Dict[str, Any]):
"""Process the actual event business logic"""
event_type = event_data.get('event_type')
if event_type == 'OrderPlaced':
await self.handle_order_placed(event_data['event_data'])
elif event_type == 'OrderCancelled':
await self.handle_order_cancelled(event_data['event_data'])
else:
raise ValueError(f"Unknown event type: {event_type}")Diese Implementierungen zeigen, wie Serialisierung, Transaktionen und Idempotenz zusammenwirken, um robuste Event-Driven Systeme zu schaffen. Die Kombination aus producer-seitiger Idempotenz und consumer-seitiger Duplicate Detection bietet maximale Sicherheit gegen Event-Duplikate, während transaktionale Garantien Datenkonsistenz sicherstellen.