Die Transformation bestehender synchroner Systeme zu Event-getriebenen Architekturen erfordert die systematische Umwandlung von direkten Datenbankoperationen und REST-Calls in Event-Streams. Dieser Prozess macht implizite Geschäftsereignisse explizit sichtbar und ermöglicht lose gekoppelte Systemarchitekturen.
Datenbank-Trigger bieten den direktesten Weg, um Datenbankänderungen automatisch in Events zu überführen. Sie reagieren unmittelbar auf DML-Operationen (INSERT, UPDATE, DELETE) und können Business-Events generieren, ohne Anwendungscode zu modifizieren.
Ein typisches E-Commerce-Szenario nutzt Trigger, um Order-Status-Änderungen in Events zu verwandeln:
-- Event-Tabelle für ausgehende Events
CREATE TABLE order_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
order_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
published BOOLEAN DEFAULT FALSE
);
-- Trigger-Funktion für Order-Änderungen
CREATE OR REPLACE FUNCTION generate_order_events()
RETURNS TRIGGER AS $$
DECLARE
event_data JSONB;
BEGIN
-- Bei INSERT: OrderPlaced Event
IF TG_OP = 'INSERT' THEN
event_data := jsonb_build_object(
'orderId', NEW.id,
'customerId', NEW.customer_id,
'items', NEW.items,
'totalAmount', NEW.total_amount,
'status', NEW.status,
'timestamp', EXTRACT(EPOCH FROM NEW.created_at)
);
INSERT INTO order_events (order_id, event_type, event_data)
VALUES (NEW.id, 'OrderPlaced', event_data);
RETURN NEW;
END IF;
-- Bei UPDATE: Status-spezifische Events
IF TG_OP = 'UPDATE' THEN
-- Nur bei Statusänderung Event generieren
IF OLD.status != NEW.status THEN
event_data := jsonb_build_object(
'orderId', NEW.id,
'previousStatus', OLD.status,
'newStatus', NEW.status,
'timestamp', EXTRACT(EPOCH FROM NEW.updated_at)
);
-- Status-spezifische Event-Typen
CASE NEW.status
WHEN 'PAYMENT_CONFIRMED' THEN
INSERT INTO order_events (order_id, event_type, event_data)
VALUES (NEW.id, 'PaymentConfirmed', event_data);
WHEN 'SHIPPED' THEN
INSERT INTO order_events (order_id, event_type, event_data)
VALUES (NEW.id, 'OrderShipped', event_data);
WHEN 'DELIVERED' THEN
INSERT INTO order_events (order_id, event_type, event_data)
VALUES (NEW.id, 'OrderDelivered', event_data);
WHEN 'CANCELLED' THEN
INSERT INTO order_events (order_id, event_type, event_data)
VALUES (NEW.id, 'OrderCancelled', event_data);
END CASE;
END IF;
RETURN NEW;
END IF;
-- Bei DELETE: OrderCancelled Event
IF TG_OP = 'DELETE' THEN
event_data := jsonb_build_object(
'orderId', OLD.id,
'reason', 'HARD_DELETE',
'timestamp', EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
);
INSERT INTO order_events (order_id, event_type, event_data)
VALUES (OLD.id, 'OrderCancelled', event_data);
RETURN OLD;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Trigger an Order-Tabelle binden
CREATE TRIGGER order_event_trigger
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION generate_order_events();Ein Background-Service verarbeitet die durch Trigger generierten Events:
@Service
public class DatabaseEventPublisher {
private final JdbcTemplate jdbcTemplate;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Scheduled(fixedDelay = 1000)
@Transactional
public void publishPendingEvents() {
List<DatabaseEvent> pendingEvents = fetchUnpublishedEvents();
for (DatabaseEvent event : pendingEvents) {
try {
publishEvent(event);
markAsPublished(event.getId());
} catch (Exception e) {
log.error("Failed to publish event {}: {}", event.getId(), e.getMessage());
// Event bleibt unpubliziert für Retry
}
}
}
private List<DatabaseEvent> fetchUnpublishedEvents() {
String sql = """
SELECT id, order_id, event_type, event_data, created_at
FROM order_events
WHERE published = false
ORDER BY created_at
LIMIT 100
""";
return jdbcTemplate.query(sql, (rs, rowNum) ->
DatabaseEvent.builder()
.id(UUID.fromString(rs.getString("id")))
.orderId(rs.getString("order_id"))
.eventType(rs.getString("event_type"))
.eventData(rs.getString("event_data"))
.createdAt(rs.getTimestamp("created_at").toInstant())
.build()
);
}
private void publishEvent(DatabaseEvent dbEvent) {
String topic = determineTopicFromEventType(dbEvent.getEventType());
// JSON zu Event-Objekt transformieren
Object eventPayload = transformToEventPayload(dbEvent);
kafkaTemplate.send(topic, dbEvent.getOrderId(), eventPayload);
log.info("Published {} event for order {}",
dbEvent.getEventType(), dbEvent.getOrderId());
}
private String determineTopicFromEventType(String eventType) {
return switch (eventType) {
case "OrderPlaced" -> "order.placed.v1";
case "PaymentConfirmed" -> "payment.confirmed.v1";
case "OrderShipped" -> "order.shipped.v1";
case "OrderDelivered" -> "order.delivered.v1";
case "OrderCancelled" -> "order.cancelled.v1";
default -> "order.unknown.v1";
};
}
}import psycopg2
import json
from kafka import KafkaProducer
import asyncio
from typing import List, Dict
class DatabaseEventProcessor:
def __init__(self, db_config, kafka_config):
self.db_config = db_config
self.producer = KafkaProducer(
bootstrap_servers=kafka_config['servers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8')
)
self.topic_mapping = {
'OrderPlaced': 'order.placed.v1',
'PaymentConfirmed': 'payment.confirmed.v1',
'OrderShipped': 'order.shipped.v1',
'OrderDelivered': 'order.delivered.v1',
'OrderCancelled': 'order.cancelled.v1'
}
async def process_pending_events(self):
while True:
try:
events = self._fetch_unpublished_events()
for event in events:
await self._publish_event(event)
self._mark_as_published(event['id'])
await asyncio.sleep(1) # 1 Sekunde Pause
except Exception as e:
print(f"Error processing events: {e}")
await asyncio.sleep(5)
def _fetch_unpublished_events(self) -> List[Dict]:
conn = psycopg2.connect(**self.db_config)
try:
with conn.cursor() as cursor:
cursor.execute("""
SELECT id, order_id, event_type, event_data, created_at
FROM order_events
WHERE published = false
ORDER BY created_at
LIMIT 100
""")
events = []
for row in cursor.fetchall():
events.append({
'id': row[0],
'order_id': row[1],
'event_type': row[2],
'event_data': json.loads(row[3]),
'created_at': row[4]
})
return events
finally:
conn.close()
async def _publish_event(self, event):
topic = self.topic_mapping.get(event['event_type'], 'order.unknown.v1')
# Event-Payload zusammenstellen
payload = {
'eventId': event['id'],
'eventType': event['event_type'],
'timestamp': event['created_at'].isoformat(),
'data': event['event_data']
}
future = self.producer.send(
topic=topic,
key=event['order_id'],
value=payload
)
# Kafka-Bestätigung abwarten
record_metadata = future.get(timeout=10)
print(f"Published {event['event_type']} to {topic}:{record_metadata.partition}")Die Integration von Event-Generierung direkt in die Anwendungslogik bietet mehr Kontrolle über Business-Semantik und Event-Timing, erfordert aber Änderungen am bestehenden Code.
Bestehende REST-Endpoints werden um Event-Publikation erweitert:
@RestController
@RequestMapping("/api/v1/orders")
public class OrderController {
private final OrderService orderService;
private final OrderEventPublisher eventPublisher;
@PostMapping
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest request) {
// 1. Geschäftslogik ausführen
Order order = orderService.createOrder(request);
// 2. Event publizieren
OrderPlacedEvent event = OrderPlacedEvent.builder()
.orderId(order.getId())
.customerId(order.getCustomerId())
.items(order.getItems())
.totalAmount(order.getTotalAmount())
.timestamp(order.getCreatedAt())
.build();
eventPublisher.publishOrderPlaced(event);
// 3. HTTP-Response zurückgeben
return ResponseEntity.ok(OrderResponse.from(order));
}
@PutMapping("/{orderId}/status")
public ResponseEntity<OrderResponse> updateOrderStatus(
@PathVariable String orderId,
@RequestBody UpdateStatusRequest request) {
Order order = orderService.updateStatus(orderId, request.getStatus());
// Status-spezifische Events publizieren
publishStatusChangeEvent(order, request.getStatus());
return ResponseEntity.ok(OrderResponse.from(order));
}
private void publishStatusChangeEvent(Order order, OrderStatus newStatus) {
switch (newStatus) {
case PAYMENT_CONFIRMED:
eventPublisher.publishPaymentConfirmed(
PaymentConfirmedEvent.from(order)
);
break;
case SHIPPED:
eventPublisher.publishOrderShipped(
OrderShippedEvent.from(order)
);
break;
case DELIVERED:
eventPublisher.publishOrderDelivered(
OrderDeliveredEvent.from(order)
);
break;
case CANCELLED:
eventPublisher.publishOrderCancelled(
OrderCancelledEvent.from(order)
);
break;
}
}
}Events werden in der Service-Schicht generiert, um Business-Logik und Event-Publikation zu koppeln:
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final ApplicationEventPublisher applicationEventPublisher;
public Order createOrder(CreateOrderRequest request) {
Order order = Order.builder()
.id(UUID.randomUUID().toString())
.customerId(request.getCustomerId())
.items(request.getItems())
.totalAmount(calculateTotal(request.getItems()))
.status(OrderStatus.PLACED)
.createdAt(Instant.now())
.build();
Order savedOrder = orderRepository.save(order);
// Domain Event publizieren (Spring ApplicationEvent)
applicationEventPublisher.publishEvent(
new OrderPlacedDomainEvent(savedOrder)
);
return savedOrder;
}
public Order processPayment(String orderId, PaymentInfo paymentInfo) {
Order order = findOrderById(orderId);
// Geschäftslogik: Zahlung verarbeiten
order.confirmPayment(paymentInfo);
Order savedOrder = orderRepository.save(order);
// Domain Event publizieren
applicationEventPublisher.publishEvent(
new PaymentConfirmedDomainEvent(savedOrder, paymentInfo)
);
return savedOrder;
}
}
// Domain Event zu Kafka Event Adapter
@Component
public class DomainEventToKafkaAdapter {
private final OrderEventPublisher kafkaEventPublisher;
@EventListener
@Async
public void handleOrderPlaced(OrderPlacedDomainEvent domainEvent) {
OrderPlacedEvent kafkaEvent = transformToKafkaEvent(domainEvent);
kafkaEventPublisher.publishOrderPlaced(kafkaEvent);
}
@EventListener
@Async
public void handlePaymentConfirmed(PaymentConfirmedDomainEvent domainEvent) {
PaymentConfirmedEvent kafkaEvent = transformToKafkaEvent(domainEvent);
kafkaEventPublisher.publishPaymentConfirmed(kafkaEvent);
}
}from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
app = FastAPI()
class CreateOrderRequest(BaseModel):
customer_id: str
items: list
total_amount: float
class OrderService:
def __init__(self, event_publisher):
self.event_publisher = event_publisher
self.orders = {} # Simplified in-memory storage
async def create_order(self, request: CreateOrderRequest):
order = {
'id': str(uuid.uuid4()),
'customer_id': request.customer_id,
'items': request.items,
'total_amount': request.total_amount,
'status': 'PLACED',
'created_at': datetime.utcnow()
}
# Geschäftsdaten speichern
self.orders[order['id']] = order
# Event publizieren
await self.event_publisher.publish_order_placed(order)
return order
@app.post("/api/v1/orders")
async def create_order(request: CreateOrderRequest):
try:
order = await order_service.create_order(request)
return {"order": order}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put("/api/v1/orders/{order_id}/status")
async def update_order_status(order_id: str, status: str):
try:
order = await order_service.update_status(order_id, status)
# Status-spezifische Events
await publish_status_event(order, status)
return {"order": order}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def publish_status_event(order, status):
event_mapping = {
'PAYMENT_CONFIRMED': event_publisher.publish_payment_confirmed,
'SHIPPED': event_publisher.publish_order_shipped,
'DELIVERED': event_publisher.publish_order_delivered,
'CANCELLED': event_publisher.publish_order_cancelled
}
publisher_func = event_mapping.get(status)
if publisher_func:
await publisher_func(order)Transaction Log Mining extrahiert Events direkt aus den Datenbank-WAL-Files (Write-Ahead Logs), ohne Trigger oder Anwendungsänderungen. Diese Technik bietet die geringste Invasivität bei hoher Performance.
@Component
public class PostgreSQLWALMiner {
private final PGReplicationStream replicationStream;
private final EventTransformer eventTransformer;
private final KafkaEventPublisher eventPublisher;
@PostConstruct
public void startWALMining() {
CompletableFuture.runAsync(this::processWALChanges);
}
private void processWALChanges() {
try {
while (true) {
ByteBuffer walMessage = replicationStream.read();
if (walMessage != null) {
processWALMessage(walMessage);
}
Thread.sleep(100); // Kurze Pause zwischen Reads
}
} catch (Exception e) {
log.error("WAL mining failed", e);
// Restart-Logik
}
}
private void processWALMessage(ByteBuffer walMessage) {
try {
// WAL-Nachricht zu LogicalReplicationMessage dekodieren
LogicalReplicationMessage message = LogicalReplicationMessage
.decode(walMessage);
if (message instanceof PgOutputMessage) {
PgOutputMessage pgOutput = (PgOutputMessage) message;
processTableChange(pgOutput);
}
} catch (Exception e) {
log.error("Failed to process WAL message", e);
}
}
private void processTableChange(PgOutputMessage message) {
String tableName = message.getTableName();
// Nur relevante Tabellen verarbeiten
if (!isBusinessRelevantTable(tableName)) {
return;
}
switch (message.getOperation()) {
case INSERT:
handleInsert(tableName, message.getNewValues());
break;
case UPDATE:
handleUpdate(tableName, message.getOldValues(), message.getNewValues());
break;
case DELETE:
handleDelete(tableName, message.getOldValues());
break;
}
}
private void handleUpdate(String tableName, Map<String, Object> oldValues,
Map<String, Object> newValues) {
if ("orders".equals(tableName)) {
String oldStatus = (String) oldValues.get("status");
String newStatus = (String) newValues.get("status");
// Nur bei Statusänderung Event generieren
if (!Objects.equals(oldStatus, newStatus)) {
OrderStatusChangedEvent event = eventTransformer
.createStatusChangeEvent(oldValues, newValues);
eventPublisher.publishStatusChange(event);
}
}
}
}import psycopg2
from psycopg2.extras import LogicalReplicationConnection
import json
class PostgreSQLWALMiner:
def __init__(self, db_config, event_publisher):
self.db_config = db_config
self.event_publisher = event_publisher
self.slot_name = 'order_events_slot'
async def start_wal_mining(self):
conn = psycopg2.connect(
connection_factory=LogicalReplicationConnection,
**self.db_config
)
try:
# Replication Slot erstellen (falls nicht vorhanden)
await self._ensure_replication_slot(conn)
# WAL-Stream konsumieren
await self._consume_wal_stream(conn)
finally:
conn.close()
async def _consume_wal_stream(self, conn):
cursor = conn.cursor()
cursor.start_replication(
slot_name=self.slot_name,
decode=True,
options={'pretty-print': 1}
)
async for message in cursor:
await self._process_wal_message(message)
message.cursor.send_feedback(flush_lsn=message.data_start)
async def _process_wal_message(self, message):
try:
# WAL-Message parsen (Format abhängig von Plugin)
change_data = self._parse_wal_message(message.payload)
if self._is_business_relevant(change_data):
events = self._transform_to_events(change_data)
for event in events:
await self.event_publisher.publish(event)
except Exception as e:
print(f"Failed to process WAL message: {e}")
def _parse_wal_message(self, payload):
# Beispiel für pgoutput-Plugin Format
lines = payload.strip().split('\n')
if lines[0].startswith('BEGIN'):
return None # Transaction start, ignorieren
elif lines[0].startswith('COMMIT'):
return None # Transaction end, ignorieren
elif lines[0].startswith('table'):
return self._parse_table_change(lines)
return None
def _transform_to_events(self, change_data):
events = []
if change_data['table'] == 'orders':
if change_data['operation'] == 'INSERT':
events.append(self._create_order_placed_event(change_data))
elif change_data['operation'] == 'UPDATE':
events.extend(self._create_order_update_events(change_data))
return events| Ansatz | Invasivität | Performance | Flexibilität | Maintenance |
|---|---|---|---|---|
| Database Triggers | Niedrig | Mittel | Mittel | Hoch (DB-abhängig) |
| Application-level | Hoch | Hoch | Sehr hoch | Niedrig |
| Transaction Log Mining | Sehr niedrig | Sehr hoch | Niedrig | Mittel |
Database Triggers eignen sich für: - Legacy-Systeme ohne Änderungsmöglichkeit - Einfache Event-Szenarien - Prototyping und Migration
Application-level Event Generation eignet sich für: - Neue Entwicklungen - Komplexe Business-Event-Logik - Starke Kontrolle über Event-Timing
Transaction Log Mining eignet sich für: - High-Performance-Anforderungen - Minimal-invasive Integration - Reine Datenreplikation
Die Wahl des Ansatzes hängt von Legacy-Constraints, Performance-Anforderungen und der gewünschten Kontrolle über Event-Semantik ab. Oft werden mehrere Ansätze kombiniert: Transaction Log Mining für Datenreplikation und Application-level Events für komplexe Geschäftslogik.