39 Outbox Pattern

The Outbox Pattern solves one of the most fundamental problems in event-driven architecture: how can we guarantee that business data and event publication are written atomically? The challenge arises because database operations and message-broker calls use different transaction systems.

39.0.1 The Dual-Write Problem

In traditional approaches, a business operation often triggers two separate writes:

@Transactional
public void processOrder(OrderRequest request) {
    // 1. Write business data to the database
    Order order = new Order(request);
    orderRepository.save(order);
    
    // 2. Publish the event
    OrderPlacedEvent event = new OrderPlacedEvent(order);
    kafkaTemplate.send("order.placed.v1", event);
    
    // Problem: what happens if Kafka is unreachable?
}

This implementation is not transactionally consistent. Several failure scenarios lead to inconsistent state:

Scenario          | Database        | Event publication  | Result
------------------|-----------------|--------------------|-------------
Success           | ✅ Saved        | ✅ Published       | Consistent
DB error          | ❌ Rolled back  | ❌ Not published   | Consistent
Kafka error       | ✅ Committed    | ❌ Not published   | Inconsistent
Network timeout   | ✅ Committed    | ❓ Unknown         | Inconsistent

The dual-write problem is especially critical for business processes that depend on events. If a payment is booked successfully but the PaymentProcessedEvent is never published, the order remains stuck in an incomplete state.
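The Kafka-failure row of the table above can be reproduced with a toy simulation (a minimal sketch in Python; all names are hypothetical stand-ins, the "database" and the "broker" are plain in-memory objects):

```python
# Toy simulation of the dual-write problem: the database write succeeds,
# the broker call fails, and the two stores diverge.
class FlakyBroker:
    def __init__(self, available=True):
        self.available = available
        self.messages = []

    def send(self, topic, event):
        if not self.available:
            raise ConnectionError("broker unreachable")
        self.messages.append((topic, event))

def process_order(db, broker, order):
    db.append(order)                       # 1. commit business data
    broker.send("order.placed.v1", order)  # 2. publish event -- may fail

db, broker = [], FlakyBroker(available=False)
try:
    process_order(db, broker, {"order_id": 42})
except ConnectionError:
    pass

# Inconsistent state: the order is committed, but no event was published.
print(len(db), len(broker.messages))  # 1 0
```

A rollback cannot repair this in general: with the reversed order (publish first, then commit), a failing commit leaves a published event without business data instead.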

39.0.2 Transactional Outbox Implementation

The Outbox Pattern sidesteps the problem by storing events as part of the same database transaction as the business data. A separate process then reads these events and publishes them asynchronously.
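The core mechanics fit into a few lines: one transaction writes both the business row and the outbox row, and the relay polls for unprocessed rows. A minimal sketch using Python's built-in sqlite3 as a stand-in for PostgreSQL (table layout simplified from the DDL in the next section):

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, total REAL)")
conn.execute("""CREATE TABLE outbox_events (
    id TEXT PRIMARY KEY, aggregate_id TEXT, event_type TEXT,
    event_data TEXT, processed_at TEXT)""")

order_id = str(uuid.uuid4())
with conn:  # ONE transaction: business row and outbox row commit together
    conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, 99.90))
    conn.execute(
        "INSERT INTO outbox_events VALUES (?, ?, ?, ?, NULL)",
        (str(uuid.uuid4()), order_id, "OrderPlaced",
         json.dumps({"orderId": order_id, "totalAmount": 99.90})),
    )

# The relay process would poll exactly this query:
pending = conn.execute(
    "SELECT event_type FROM outbox_events WHERE processed_at IS NULL"
).fetchall()
print(pending)  # [('OrderPlaced',)]
```

If the transaction rolls back, neither row exists; if it commits, the event is durably queued. The dual-write problem is reduced to a single write.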

39.0.2.1 Outbox Table Structure

CREATE TABLE outbox_events (
    id UUID PRIMARY KEY,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed_at TIMESTAMP NULL,
    version INTEGER DEFAULT 1
);

-- Partial index (PostgreSQL syntax): only unprocessed events are indexed
CREATE INDEX idx_outbox_unprocessed ON outbox_events(processed_at) 
WHERE processed_at IS NULL;

39.0.2.2 Transactional Event Storage

@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    private UUID id;
    
    @Column(name = "aggregate_id")
    private String aggregateId;
    
    @Column(name = "event_type") 
    private String eventType;
    
    @Column(name = "event_data", columnDefinition = "jsonb")
    private String eventData;
    
    @Column(name = "created_at")
    private LocalDateTime createdAt;
    
    @Column(name = "processed_at")
    private LocalDateTime processedAt;
    
    // Constructors, getters, setters
}

@Service
@Transactional
public class OrderService {
    
    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;
    private final ObjectMapper objectMapper;
    
    public void processOrder(OrderRequest request) {
        // 1. Save business data
        Order order = new Order(request);
        order = orderRepository.save(order);
        
        // 2. Save the event to the outbox (SAME TRANSACTION)
        OutboxEvent outboxEvent = createOutboxEvent(order);
        outboxRepository.save(outboxEvent);
        
        // Both operations are atomic
    }
    
    private OutboxEvent createOutboxEvent(Order order) {
        try {
            OrderPlacedEvent event = OrderPlacedEvent.builder()
                .orderId(order.getId())
                .customerId(order.getCustomerId())
                .items(order.getItems())
                .totalAmount(order.getTotalAmount())
                .timestamp(order.getCreatedAt())
                .build();
            
            return OutboxEvent.builder()
                .id(UUID.randomUUID())
                .aggregateId(order.getId())
                .eventType("OrderPlaced")
                .eventData(objectMapper.writeValueAsString(event))
                .createdAt(LocalDateTime.now())
                .build();
                
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }
}

39.0.2.3 Event Publisher (Separate Process)

@Component
public class OutboxEventPublisher {
    
    private final OutboxEventRepository outboxRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;
    
    @Scheduled(fixedDelay = 1000) // runs every second
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByProcessedAtIsNullOrderByCreatedAt(PageRequest.of(0, 100));
        
        for (OutboxEvent event : pendingEvents) {
            try {
                publishEvent(event);
                markAsProcessed(event);
                
            } catch (Exception e) {
                log.error("Failed to publish event {}: {}", 
                    event.getId(), e.getMessage());
                // Event stays unprocessed and is retried on the next run
            }
        }
    }
    
    private void publishEvent(OutboxEvent outboxEvent) {
        String topic = determineTopicFromEventType(outboxEvent.getEventType());
        String key = outboxEvent.getAggregateId();
        
        // Deserialize and publish
        Object eventPayload = deserializeEvent(outboxEvent);
        kafkaTemplate.send(topic, key, eventPayload);
        
        log.info("Published event {} to topic {}", 
            outboxEvent.getId(), topic);
    }
    
    private void markAsProcessed(OutboxEvent event) {
        event.setProcessedAt(LocalDateTime.now());
        outboxRepository.save(event);
    }
}
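Note that publishEvent (a Kafka call) and markAsProcessed (a database write) can never be atomic: a crash between the two re-publishes the event on the next run. The outbox relay therefore delivers at-least-once, and consumers must be idempotent. A minimal consumer-side deduplication sketch (names hypothetical; a real implementation would persist the seen IDs):

```python
# Consumer-side deduplication: remember processed event IDs so that a
# re-delivered outbox event is skipped instead of applied twice.
class IdempotentConsumer:
    def __init__(self):
        self.seen_ids = set()   # in production: a database table, not a set
        self.applied = []

    def handle(self, event):
        if event["id"] in self.seen_ids:
            return False        # duplicate -> skip
        self.seen_ids.add(event["id"])
        self.applied.append(event)  # apply the business effect once
        return True

consumer = IdempotentConsumer()
event = {"id": "evt-1", "type": "OrderPlaced"}
print(consumer.handle(event))  # True  (first delivery)
print(consumer.handle(event))  # False (redelivery is ignored)
```

The outbox event's primary key is a natural deduplication handle, since the relay republishes the exact same row.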

39.0.3 Spring Boot Implementation

A complete Spring Boot integration builds on Spring Data JPA and Spring Kafka:

// Configuration
@Configuration
@EnableJpaRepositories
@EnableKafka
@EnableScheduling
public class OutboxConfiguration {
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        return new DefaultKafkaProducerFactory<>(props);
    }
}

// Repository
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, UUID> {
    
    @Query("SELECT e FROM OutboxEvent e WHERE e.processedAt IS NULL ORDER BY e.createdAt")
    List<OutboxEvent> findByProcessedAtIsNullOrderByCreatedAt(Pageable pageable);
    
    @Modifying
    @Query("DELETE FROM OutboxEvent e WHERE e.processedAt < :cutoff")
    void deleteProcessedEventsBefore(@Param("cutoff") LocalDateTime cutoff);
}

// Event Publisher Service
@Service
public class OutboxEventPublisher {
    
    private static final Map<String, String> EVENT_TYPE_TO_TOPIC = Map.of(
        "OrderPlaced", "order.placed.v1",
        "OrderCancelled", "order.cancelled.v1",
        "PaymentProcessed", "payment.processed.v1"
    );
    
    // Implementation as above ...
    
    @Scheduled(cron = "0 0 2 * * ?") // daily at 2:00 AM
    @Transactional
    public void cleanupProcessedEvents() {
        LocalDateTime cutoff = LocalDateTime.now().minusDays(7);
        outboxRepository.deleteProcessedEventsBefore(cutoff);
        log.info("Cleaned up processed events older than {}", cutoff);
    }
}

39.0.4 Python Implementation

A Python implementation with SQLAlchemy and asyncio:

# Models
from sqlalchemy import Column, String, DateTime, Text, Integer
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.ext.declarative import declarative_base
import uuid
from datetime import datetime

Base = declarative_base()

class OutboxEvent(Base):
    __tablename__ = 'outbox_events'
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    aggregate_id = Column(String(255), nullable=False)
    event_type = Column(String(255), nullable=False)
    event_data = Column(JSONB, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)
    version = Column(Integer, default=1)

# Service
import json
from sqlalchemy.orm import Session
from kafka import KafkaProducer

class OrderService:
    def __init__(self, session: Session):
        self.session = session
    
    def process_order(self, order_request: dict):
        # The session opens a transaction implicitly on the first statement;
        # an explicit begin() would fail if a transaction is already active.
        try:
            # 1. Save business data
            order = self._create_order(order_request)
            self.session.add(order)
            
            # 2. Save the event to the outbox (same transaction)
            outbox_event = self._create_outbox_event(order)
            self.session.add(outbox_event)
            
            self.session.commit()
            
        except Exception:
            self.session.rollback()
            raise
    
    def _create_outbox_event(self, order):
        event_data = {
            'orderId': str(order.id),
            'customerId': str(order.customer_id),
            'items': order.items,
            'totalAmount': float(order.total_amount),
            'timestamp': order.created_at.isoformat()
        }
        
        return OutboxEvent(
            aggregate_id=str(order.id),
            event_type='OrderPlaced',
            event_data=event_data
        )

# Async Event Publisher
import asyncio
from kafka import KafkaProducer

class OutboxEventPublisher:
    def __init__(self, session_factory, kafka_config):
        self.session_factory = session_factory
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_config['bootstrap_servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3
            # Note: idempotent-producer support varies between kafka-python
            # versions; with older clients, deduplicate on the consumer side.
        )
        
        self.topic_mapping = {
            'OrderPlaced': 'order.placed.v1',
            'OrderCancelled': 'order.cancelled.v1',
            'PaymentProcessed': 'payment.processed.v1'
        }
    
    async def start_publishing(self):
        while True:
            try:
                await self._publish_pending_events()
                await asyncio.sleep(1)  # poll every second
            except Exception as e:
                print(f"Error in event publisher: {e}")
                await asyncio.sleep(5)  # back off longer after errors
    
    async def _publish_pending_events(self):
        session = self.session_factory()
        try:
            pending_events = session.query(OutboxEvent)\
                .filter(OutboxEvent.processed_at.is_(None))\
                .order_by(OutboxEvent.created_at)\
                .limit(100)\
                .all()
            
            for event in pending_events:
                await self._publish_event(event, session)
                
        finally:
            session.close()
    
    async def _publish_event(self, event: OutboxEvent, session: Session):
        try:
            topic = self.topic_mapping.get(event.event_type)
            if not topic:
                raise ValueError(f"Unknown event type: {event.event_type}")
            
            # Publish the event asynchronously
            future = self.producer.send(
                topic=topic,
                key=event.aggregate_id,
                value=event.event_data
            )
            
            # Wait for the broker acknowledgement
            await self._wait_for_kafka_result(future)
            
            # Mark as processed
            event.processed_at = datetime.utcnow()
            session.commit()
            
            print(f"Published event {event.id} to topic {topic}")
            
        except Exception as e:
            session.rollback()
            print(f"Failed to publish event {event.id}: {e}")
            raise
    
    async def _wait_for_kafka_result(self, future):
        # Bridge the blocking kafka-python future into asyncio
        import concurrent.futures
        loop = asyncio.get_running_loop()
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # future.get blocks until the broker acknowledges (10 s timeout)
            result = await loop.run_in_executor(executor, future.get, 10)
            return result

39.0.5 Advanced Implementation Aspects

39.0.5.1 Idempotency and Duplicate Detection

@Entity
public class OutboxEvent {
    // ... other fields
    
    @Column(name = "idempotency_key", unique = true)
    private String idempotencyKey;
    
    // Static factory methods need a static mapper instance
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static OutboxEvent createIdempotent(String aggregateId, 
                                               String eventType, 
                                               Object eventData) {
        try {
            // Idempotency key from aggregate ID, event type, and a content hash
            String contentHash = DigestUtils.sha256Hex(
                objectMapper.writeValueAsString(eventData)
            );
            
            String idempotencyKey = String.format("%s:%s:%s", 
                aggregateId, eventType, contentHash);
            
            return OutboxEvent.builder()
                .idempotencyKey(idempotencyKey)
                // ... other fields
                .build();
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize event", e);
        }
    }
}
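The same key construction can be sketched with the Python standard library (hashlib standing in for DigestUtils). Sorting the keys before hashing makes the result independent of field order, so semantically identical events always yield the same key:

```python
import hashlib
import json

# Idempotency key from aggregate ID, event type, and a SHA-256 content hash,
# mirroring the Java factory above.
def idempotency_key(aggregate_id: str, event_type: str, payload: dict) -> str:
    # sort_keys makes the hash independent of dict insertion order
    content = json.dumps(payload, sort_keys=True)
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
    return f"{aggregate_id}:{event_type}:{content_hash}"

key1 = idempotency_key("order-1", "OrderPlaced", {"total": 10, "customer": "c1"})
key2 = idempotency_key("order-1", "OrderPlaced", {"customer": "c1", "total": 10})
print(key1 == key2)  # True: same content, same key
```

Combined with the unique constraint on idempotency_key, inserting the same logical event twice fails at the database level instead of producing a duplicate.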

39.0.5.2 Performance Optimizations

At high throughput, the Outbox Pattern can become a bottleneck of its own. Optimization strategies include:

// Batch processing
@Scheduled(fixedDelay = 500)
@Transactional
public void publishEventsBatch() {
    List<OutboxEvent> events = outboxRepository
        .findByProcessedAtIsNullOrderByCreatedAt(PageRequest.of(0, 500));
    
    if (events.isEmpty()) return;
    
    // Batch publication with CompletableFuture
    List<CompletableFuture<SendResult<String, Object>>> futures = events.stream()
        .map(this::publishEventAsync)
        .collect(Collectors.toList());
    
    // Wait for all publications to complete
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenRun(() -> markAllAsProcessed(events))
        .exceptionally(throwable -> {
            log.error("Batch publication failed", throwable);
            return null;
        });
}
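The batching idea can be sketched language-independently: fire off all sends, wait for every acknowledgement, and only then mark the whole batch as processed. A minimal Python sketch, with a hypothetical publish function standing in for the Kafka send:

```python
import concurrent.futures

# Stand-in for a broker send that returns an acknowledgement
def publish(event):
    return f"ack:{event['id']}"

events = [{"id": i} for i in range(5)]
processed = []

with concurrent.futures.ThreadPoolExecutor() as pool:
    # Fire off all sends concurrently
    futures = [pool.submit(publish, e) for e in events]
    # Wait for every acknowledgement; raises if any send failed
    acks = [f.result() for f in futures]

# Only after all sends succeeded: mark the batch as processed
processed.extend(e["id"] for e in events)
print(len(acks), processed)  # 5 [0, 1, 2, 3, 4]
```

Marking the batch only after all acknowledgements keeps the at-least-once guarantee: a partial failure leaves every event unmarked, and the next run republishes the batch.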

The Outbox Pattern guarantees transactional consistency between business data and event publication, but it introduces additional complexity and latency. Whether the pattern is worth adopting depends on the consistency requirements of the use case at hand.