Wenn Sie Event-Driven Architecture mit Kafka umsetzen, müssen Sie nicht zum Kafka-Administrator werden. Dennoch gibt es operationale Grundlagen, die jeder Entwickler verstehen sollte, um robuste Anwendungen zu bauen und Produktionsprobleme zu verstehen.
Kafka läuft als verteiltes System aus mehreren Brokern. Ein Broker ist eine Kafka-Instanz, mehrere Broker bilden ein Cluster. Für Entwickler relevant:
Redundanz durch Replikation:
# docker-compose.yml - Minimales 3-Broker Setup
services:
kafka-1:
image: confluentinc/cp-kafka:7.4.0
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
kafka-2:
image: confluentinc/cp-kafka:7.4.0
environment:
KAFKA_BROKER_ID: 2
# ... ähnliche KonfigurationPartition und Replikation: Jeder Topic wird in Partitionen aufgeteilt. Jede Partition kann auf mehreren Brokern repliziert werden. Als Entwickler definieren Sie die Replikation beim Topic-Setup:
| Aspekt | Entwickler-Relevanz | Operational Impact |
|---|---|---|
| Partitionen | Parallelität der Consumer | Skalierung und Durchsatz |
| Replikation | Datensicherheit | Ausfallsicherheit |
| Leader/Follower | Keine direkte Auswirkung | Automatic Failover |
Topics sollten explizit erstellt werden, nicht automatisch zur Laufzeit:
Spring Boot Topic-Konfiguration:
@Configuration
public class TopicConfiguration {
@Bean
public NewTopic orderPlacedTopic() {
return TopicBuilder.name("order.placed.v1")
.partitions(3)
.replicas(3)
.build();
}
}Python Topic-Erstellung:
from kafka.admin import KafkaAdminClient, NewTopic
def create_topics():
admin = KafkaAdminClient(
bootstrap_servers=['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092']
)
topics = [
NewTopic(
name='order.placed.v1',
num_partitions=3,
replication_factor=3
)
]
admin.create_topics(topics)Offsets zeigen, welche Messages ein Consumer bereits verarbeitet hat. Drei kritische Konzepte:
Auto-Commit vs. Manual-Commit:
// Spring Boot - Automatisches Commit
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlaced(@Payload OrderPlaced order) {
paymentService.processPayment(order);
// Offset wird automatisch committed
}
// Manuelles Commit für kritische Verarbeitung
@KafkaListener(topics = "order.placed.v1")
public void handleOrderPlacedManual(
@Payload OrderPlaced order,
Acknowledgment acknowledgment) {
try {
paymentService.processPayment(order);
acknowledgment.acknowledge(); // Explizites Commit
} catch (Exception e) {
// Kein Commit - Message wird wiederholt
log.error("Payment processing failed", e);
}
}Die wichtigsten Parameter, die Entwickler verstehen müssen:
| Parameter | Zweck | Empfehlung |
|---|---|---|
acks |
Bestätigung der Replikation | all für Datensicherheit |
retries |
Wiederholung bei Fehlern | Hoch (Int.MAX_VALUE) |
batch.size |
Batching für Performance | 16384 (Standard) |
linger.ms |
Wartezeit für Batching | 5-100ms je nach Latenz-Anforderung |
Spring Boot Producer-Konfiguration:
# application.yml
spring:
kafka:
producer:
acks: all
retries: 2147483647
properties:
linger.ms: 50
batch.size: 16384
enable.idempotence: truePython Producer-Konfiguration:
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['kafka-1:9092', 'kafka-2:9092'],
acks='all',
retries=2147483647,
linger_ms=50,
batch_size=16384,
enable_idempotence=True
)Entscheidende Parameter für stabile Consumer:
| Parameter | Zweck | Operational Impact |
|---|---|---|
enable.auto.commit |
Automatisches Offset-Commit | false für kritische Anwendungen |
auto.offset.reset |
Verhalten bei fehlendem Offset | earliest für Vollständigkeit |
max.poll.records |
Messages pro Poll-Zyklus | Speicher vs. Durchsatz |
session.timeout.ms |
Consumer-Group Timeout | Rebalancing-Verhalten |
Spring Boot Consumer-Konfiguration:
spring:
kafka:
consumer:
group-id: payment-service
auto-offset-reset: earliest
enable-auto-commit: false
properties:
max.poll.records: 100
session.timeout.ms: 30000Für kritische Anwendungsfälle wie Zahlungsverarbeitung:
@Component
public class IdempotentPaymentProcessor {
private final Set<String> processedOrders = ConcurrentHashMap.newKeySet();
@KafkaListener(topics = "order.placed.v1")
@Transactional
public void handleOrderPlaced(@Payload OrderPlaced order) {
String orderId = order.getOrderId();
// Idempotenz-Check
if (processedOrders.contains(orderId)) {
log.info("Order {} already processed, skipping", orderId);
return;
}
paymentService.processPayment(order);
processedOrders.add(orderId);
}
}Consumer Lag entsteht, wenn Consumer nicht schnell genug verarbeiten. Typische Szenarien:
Monitoring von Consumer Lag:
@Component
public class ConsumerLagMonitor {
private final MeterRegistry meterRegistry;
@EventListener
public void onConsumerLag(ConsumerLagEvent event) {
Gauge.builder("kafka.consumer.lag")
.tags("topic", event.getTopic(),
"partition", String.valueOf(event.getPartition()))
.register(meterRegistry)
.set(event.getLag());
}
}Lag-Behandlung in Python:
import time
from kafka import KafkaConsumer
def monitor_and_process():
consumer = KafkaConsumer(
'order.placed.v1',
group_id='payment-service',
auto_offset_reset='earliest'
)
for message in consumer:
start_time = time.time()
try:
process_order(message.value)
processing_time = time.time() - start_time
# Lag-Warnung bei langsamer Verarbeitung
if processing_time > 5.0:
log.warning(f"Slow processing: {processing_time}s")
except Exception as e:
log.error(f"Processing failed: {e}")
# Message wird nicht committed - automatische WiederholungRebalancing tritt auf, wenn Consumer einer Gruppe hinzugefügt oder entfernt werden:
| Rebalancing-Trigger | Auswirkung | Mitigation |
|---|---|---|
| Consumer-Neustart | Kurze Verarbeitungspause | Graceful Shutdown |
| Consumer-Absturz | Längere Pause | Health Checks |
| Netzwerkprobleme | Session Timeout | Robuste Netzwerkkonfiguration |
Graceful Shutdown in Spring Boot:
@PreDestroy
public void shutdown() {
log.info("Shutting down payment processor gracefully");
// Aktuelle Verarbeitung abschließen
if (currentProcessing != null) {
currentProcessing.complete();
}
// Consumer explizit schließen
if (kafkaConsumer != null) {
kafkaConsumer.close(Duration.ofSeconds(30));
}
}Wenn Sie Event-Strukturen ändern, können Kompatibilitätsprobleme auftreten:
Backward-Compatible Schema Evolution:
// Version 1
public class OrderPlaced {
private String orderId;
private BigDecimal totalAmount;
// ...
}
// Version 2 - Backward Compatible
public class OrderPlaced {
private String orderId;
private BigDecimal totalAmount;
private String currency = "EUR"; // Default für alte Events
private List<String> tags = new ArrayList<>(); // Optional neue Felder
// ...
}Für Messages, die nicht verarbeitet werden können:
@Component
public class OrderProcessor {
@Retryable(value = {TransientException.class}, maxAttempts = 3)
@KafkaListener(topics = "order.placed.v1")
public void handleOrder(@Payload OrderPlaced order) {
try {
processOrder(order);
} catch (TransientException e) {
// Wird automatisch wiederholt
throw e;
} catch (Exception e) {
// Permanent error - an Dead Letter Topic
sendToDeadLetterTopic(order, e);
}
}
@Recover
public void handleFailedOrder(TransientException ex, OrderPlaced order) {
// Nach allen Wiederholungen
sendToDeadLetterTopic(order, ex);
}
}Diese operationalen Grundlagen helfen Ihnen, Event-Driven Architecture mit Kafka erfolgreich umzusetzen, ohne sich in administrativen Details zu verlieren. Der Fokus liegt darauf, was für die tägliche Entwicklungsarbeit relevant ist und häufige Produktionsprobleme zu vermeiden.