The outbox pattern solves one of the most fundamental problems in event-driven architecture: how can we guarantee that business data and event publication are written atomically? The challenge arises because database operations and message-broker calls run in different transactional systems.
In traditional approaches, business operations often result in two separate writes:
```java
@Transactional
public void processOrder(OrderRequest request) {
    // 1. Write business data to the database
    Order order = new Order(request);
    orderRepository.save(order);

    // 2. Publish the event
    OrderPlacedEvent event = new OrderPlacedEvent(order);
    kafkaTemplate.send("order.placed.v1", event);
    // Problem: what happens if Kafka is unreachable?
}
```

This implementation is not transactionally consistent. Different failure scenarios lead to inconsistent states:
| Scenario | Database | Event publication | Result |
|---|---|---|---|
| Success | ✅ Saved | ✅ Published | Consistent |
| DB error | ❌ Rolled back | ❌ Not published | Consistent |
| Kafka error | ✅ Committed | ❌ Not published | Inconsistent |
| Network timeout | ✅ Committed | ❓ Unknown | Inconsistent |
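The last row is the trickiest: after a client-side timeout the producer cannot tell whether the broker accepted the record. A minimal sketch of this ambiguity (the two-second timeout is an illustrative assumption):

```java
try {
    // Wait for the broker acknowledgement with a client-side timeout
    kafkaTemplate.send("order.placed.v1", event).get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // Broker state unknown: the record may or may not have been written.
    // Retrying risks a duplicate, giving up risks a lost event.
} catch (ExecutionException e) {
    // Definitive failure reported by the producer
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
```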
The dual-writes problem is especially critical for business processes that depend on events: if a payment is successfully booked but the PaymentProcessedEvent is never published, the order remains in an incomplete state.
The outbox pattern sidesteps this problem by storing events as part of the same database transaction. A separate process then reads these events and publishes them asynchronously. A typical PostgreSQL schema for the outbox table looks like this:
```sql
CREATE TABLE outbox_events (
    id UUID PRIMARY KEY,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed_at TIMESTAMP NULL,
    version INTEGER DEFAULT 1
);

-- Partial index: only unprocessed events are indexed
CREATE INDEX idx_outbox_unprocessed ON outbox_events(processed_at)
    WHERE processed_at IS NULL;
```

On the application side, the table maps to a JPA entity:

```java
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {

    @Id
    private UUID id;

    @Column(name = "aggregate_id")
    private String aggregateId;

    @Column(name = "event_type")
    private String eventType;

    @Column(name = "event_data", columnDefinition = "jsonb")
    private String eventData;

    @Column(name = "created_at")
    private LocalDateTime createdAt;

    @Column(name = "processed_at")
    private LocalDateTime processedAt;

    // Constructors, getters, setters; the service code below
    // additionally assumes a builder (e.g. via Lombok's @Builder)
}
```
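Note that mapping a String field to a jsonb column usually needs an explicit type hint on the persistence side. With Hibernate 6 on PostgreSQL this could look roughly as follows (an assumption; the original listing leaves the mapping implicit):

```java
// Sketch, assuming Hibernate 6.x: map the String payload to PostgreSQL jsonb
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "event_data", columnDefinition = "jsonb")
private String eventData;
```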
The service then writes the business data and the outbox event in a single transaction:

```java
@Service
@Transactional
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public void processOrder(OrderRequest request) {
        // 1. Save business data
        Order order = new Order(request);
        order = orderRepository.save(order);

        // 2. Save the event to the outbox (same transaction)
        OutboxEvent outboxEvent = createOutboxEvent(order);
        outboxRepository.save(outboxEvent);
        // Both writes commit or roll back together
    }

    private OutboxEvent createOutboxEvent(Order order) {
        try {
            OrderPlacedEvent event = OrderPlacedEvent.builder()
                .orderId(order.getId())
                .customerId(order.getCustomerId())
                .items(order.getItems())
                .totalAmount(order.getTotalAmount())
                .timestamp(order.getCreatedAt())
                .build();

            return OutboxEvent.builder()
                .id(UUID.randomUUID())
                .aggregateId(order.getId())
                .eventType("OrderPlaced")
                .eventData(objectMapper.writeValueAsString(event))
                .createdAt(LocalDateTime.now())
                .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }
}
```

A scheduled relay polls the outbox for unprocessed events and publishes them asynchronously:

```java
@Component
public class OutboxEventPublisher {

    private final OutboxEventRepository outboxRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Scheduled(fixedDelay = 1000)  // every second
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByProcessedAtIsNullOrderByCreatedAt(PageRequest.of(0, 100));

        for (OutboxEvent event : pendingEvents) {
            try {
                publishEvent(event);
                markAsProcessed(event);
            } catch (Exception e) {
                log.error("Failed to publish event {}: {}",
                    event.getId(), e.getMessage());
                // Event stays unprocessed and is retried on the next run
            }
        }
    }

    private void publishEvent(OutboxEvent outboxEvent) throws Exception {
        String topic = determineTopicFromEventType(outboxEvent.getEventType());
        String key = outboxEvent.getAggregateId();

        // Deserialize and publish; get() blocks until the broker acknowledges,
        // so a failed send is never marked as processed
        Object eventPayload = deserializeEvent(outboxEvent);
        kafkaTemplate.send(topic, key, eventPayload).get();

        log.info("Published event {} to topic {}",
            outboxEvent.getId(), topic);
    }

    private void markAsProcessed(OutboxEvent event) {
        event.setProcessedAt(LocalDateTime.now());
        outboxRepository.save(event);
    }
}
```

A complete Spring Boot integration uses Spring Data JPA and Spring Kafka:
```java
// Configuration
@Configuration
@EnableJpaRepositories
@EnableKafka
@EnableScheduling
public class OutboxConfiguration {

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

// Repository
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {

    @Query("SELECT e FROM OutboxEvent e WHERE e.processedAt IS NULL ORDER BY e.createdAt")
    List<OutboxEvent> findByProcessedAtIsNullOrderByCreatedAt(Pageable pageable);

    @Modifying
    @Query("DELETE FROM OutboxEvent e WHERE e.processedAt < :cutoff")
    void deleteProcessedEventsBefore(@Param("cutoff") LocalDateTime cutoff);
}

// Event Publisher Service
@Service
public class OutboxEventPublisher {

    private static final Map<String, String> EVENT_TYPE_TO_TOPIC = Map.of(
        "OrderPlaced", "order.placed.v1",
        "OrderCancelled", "order.cancelled.v1",
        "PaymentProcessed", "payment.processed.v1"
    );

    // Implementation as above...

    @Scheduled(cron = "0 0 2 * * ?")  // daily at 02:00
    @Transactional
    public void cleanupProcessedEvents() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
        outboxRepository.deleteProcessedEventsBefore(cutoff);
        log.info("Cleaned up processed events older than {}", cutoff);
    }
}
```

A Python implementation with SQLAlchemy and asyncio:
```python
# Models
from sqlalchemy import Column, String, DateTime, Integer
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.declarative import declarative_base
import uuid
from datetime import datetime

Base = declarative_base()

class OutboxEvent(Base):
    __tablename__ = 'outbox_events'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    aggregate_id = Column(String(255), nullable=False)
    event_type = Column(String(255), nullable=False)
    event_data = Column(JSONB, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)
    version = Column(Integer, default=1)

# Service
import json
from sqlalchemy.orm import Session

class OrderService:
    def __init__(self, session: Session):
        self.session = session

    def process_order(self, order_request: dict):
        # The session begins its transaction implicitly on first use
        try:
            # 1. Save business data
            order = self._create_order(order_request)
            self.session.add(order)
            self.session.flush()  # populate defaults such as order.id
            # 2. Save the event to the outbox (same transaction)
            outbox_event = self._create_outbox_event(order)
            self.session.add(outbox_event)
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise

    def _create_outbox_event(self, order):
        event_data = {
            'orderId': str(order.id),
            'customerId': str(order.customer_id),
            'items': order.items,
            'totalAmount': float(order.total_amount),
            'timestamp': order.created_at.isoformat()
        }
        return OutboxEvent(
            aggregate_id=str(order.id),
            event_type='OrderPlaced',
            event_data=event_data
        )
# Async Event Publisher
import asyncio
import concurrent.futures
from kafka import KafkaProducer

class OutboxEventPublisher:
    def __init__(self, session_factory, kafka_config):
        self.session_factory = session_factory
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_config['bootstrap_servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3,
            enable_idempotence=True  # requires a client version that supports it
        )
        self.topic_mapping = {
            'OrderPlaced': 'order.placed.v1',
            'OrderCancelled': 'order.cancelled.v1',
            'PaymentProcessed': 'payment.processed.v1'
        }

    async def start_publishing(self):
        while True:
            try:
                await self._publish_pending_events()
                await asyncio.sleep(1)  # pause for one second
            except Exception as e:
                print(f"Error in event publisher: {e}")
                await asyncio.sleep(5)  # back off longer after errors

    async def _publish_pending_events(self):
        # Note: the SQLAlchemy calls are blocking; acceptable here because
        # the publisher runs as a dedicated background task
        session = self.session_factory()
        try:
            pending_events = session.query(OutboxEvent)\
                .filter(OutboxEvent.processed_at.is_(None))\
                .order_by(OutboxEvent.created_at)\
                .limit(100)\
                .all()
            for event in pending_events:
                await self._publish_event(event, session)
        finally:
            session.close()

    async def _publish_event(self, event: OutboxEvent, session: Session):
        try:
            topic = self.topic_mapping.get(event.event_type)
            if not topic:
                raise ValueError(f"Unknown event type: {event.event_type}")
            # Publish the event
            future = self.producer.send(
                topic=topic,
                key=event.aggregate_id,
                value=event.event_data
            )
            # Wait for the broker acknowledgement
            await self._wait_for_kafka_result(future)
            # Mark as processed
            event.processed_at = datetime.utcnow()
            session.commit()
            print(f"Published event {event.id} to topic {topic}")
        except Exception as e:
            session.rollback()
            print(f"Failed to publish event {event.id}: {e}")
            raise

    async def _wait_for_kafka_result(self, future):
        # Bridge the blocking kafka-python future into asyncio
        loop = asyncio.get_event_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            return await loop.run_in_executor(executor, future.get, 10)
```

The polling relay delivers events at least once: if the process crashes between a successful send and the processed_at update, the event is published again on the next run. An idempotency key on the outbox entry makes such duplicates detectable:

```java
@Entity
public class OutboxEvent {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // ... other fields

    @Column(name = "idempotency_key", unique = true)
    private String idempotencyKey;

    public static OutboxEvent createIdempotent(String aggregateId,
                                               String eventType,
                                               Object eventData) {
        try {
            // Idempotency key from aggregate ID, event type, and a content hash
            String contentHash = DigestUtils.sha256Hex(
                MAPPER.writeValueAsString(eventData));
            String idempotencyKey = String.format("%s:%s:%s",
                aggregateId, eventType, contentHash);

            return OutboxEvent.builder()
                .idempotencyKey(idempotencyKey)
                // ... other fields
                .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }
}
```
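With the unique constraint in place, writing the same logical event twice fails at the database level, and the caller can treat the violation as a no-op. A minimal sketch (the surrounding service, variable names, and flush timing are assumptions):

```java
// Sketch: a duplicate outbox entry violates the unique constraint on
// idempotency_key and can safely be ignored instead of being stored twice
try {
    outboxRepository.saveAndFlush(
        OutboxEvent.createIdempotent(order.getId(), "OrderPlaced", payload));
} catch (DataIntegrityViolationException e) {
    // Same aggregate, event type, and payload already enqueued: a no-op
}
```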
At high throughput the outbox pattern can create performance bottlenecks. One optimization strategy is batch processing:

```java
// Batch processing
@Scheduled(fixedDelay = 500)
@Transactional
public void publishEventsBatch() {
    List<OutboxEvent> events = outboxRepository
        .findByProcessedAtIsNullOrderByCreatedAt(PageRequest.of(0, 500));
    if (events.isEmpty()) return;

    // Batch publication with CompletableFuture
    List<CompletableFuture<SendResult<String, Object>>> futures = events.stream()
        .map(this::publishEventAsync)
        .collect(Collectors.toList());

    // Wait for all publications; if any send fails, none are marked and the
    // successful ones are re-published later, so consumers must tolerate duplicates
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenRun(() -> markAllAsProcessed(events))
        .exceptionally(throwable -> {
            log.error("Batch publication failed", throwable);
            return null;
        });
}

// One possible shape of the helpers used above (sketch):
private CompletableFuture<SendResult<String, Object>> publishEventAsync(OutboxEvent event) {
    String topic = determineTopicFromEventType(event.getEventType());
    return kafkaTemplate.send(topic, event.getAggregateId(), deserializeEvent(event));
}

private void markAllAsProcessed(List<OutboxEvent> events) {
    // Runs in the async callback, i.e. outside the scheduler's transaction,
    // so the repository call must open its own transactional context
    LocalDateTime now = LocalDateTime.now();
    events.forEach(e -> e.setProcessedAt(now));
    outboxRepository.saveAll(events);
}
```

The outbox pattern guarantees transactional consistency between business data and event publication, but it introduces additional complexity and latency. Whether to adopt the pattern therefore depends on the consistency requirements of the use case at hand.