27 Push vs. Pull-Verarbeitung

In Event-Driven Architectures gibt es zwei fundamentale Verarbeitungsmodelle für Consumer: Push-basierte und Pull-basierte Verarbeitung. Diese Unterscheidung beeinflusst maßgeblich die Architektur der Consumer-Komponenten und deren Verhalten unter Last.

27.1 Konzeptionelle Unterschiede

Pull-basierte Verarbeitung bedeutet, dass der Consumer aktiv Nachrichten vom Broker anfordert. Der Consumer bestimmt dabei Zeitpunkt und Geschwindigkeit der Verarbeitung. Bei Kafka ist dies das Standard-Verarbeitungsmodell.

Push-basierte Verarbeitung hingegen bedeutet, dass der Broker oder ein anderer Mechanismus Nachrichten an den Consumer “drückt”. Der Consumer reagiert passiv auf eingehende Events.

In unserem E-Commerce-Szenario verdeutlicht sich der Unterschied am PaymentService:

Pull-Modell: Der PaymentService fragt regelmäßig: “Gibt es neue OrderPlaced-Events zu verarbeiten?”

Push-Modell: Das System benachrichtigt den PaymentService: “Hier ist ein neues OrderPlaced-Event!”

27.2 Polling Strategies

27.2.1 Kontinuierliches Polling

Das kontinuierliche Polling ist die einfachste Form der Pull-Verarbeitung. Der Consumer fragt in regelmäßigen Abständen nach neuen Events.

Spring Boot Implementierung:

@Component
public class PaymentService {
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        // Spring Boot verwaltet das Polling automatisch
        processPayment(event.getOrderId(), event.getTotalAmount());
    }
    
    private void processPayment(String orderId, BigDecimal amount) {
        // Geschäftslogik zur Zahlungsverarbeitung
        log.info("Processing payment for order {} with amount {}", 
                 orderId, amount);
    }
}

Python Implementierung:

class PaymentService:
    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        
    async def poll_and_process(self):
        while True:
            messages = self.consumer.poll(timeout=1.0)
            
            for topic_partition, msgs in messages.items():
                for message in msgs:
                    await self.handle_order_placed(message.value)
                    
            await asyncio.sleep(0.1)  # Kurze Pause zwischen Polls
    
    async def handle_order_placed(self, event_data):
        order_data = json.loads(event_data)
        await self.process_payment(
            order_data['orderId'], 
            order_data['totalAmount']
        )

27.2.2 Adaptives Polling

Beim adaptiven Polling passt sich die Polling-Frequenz an die Nachrichtenlast an. Bei hohem Durchsatz wird häufiger gepollt, bei niedriger Last weniger oft.

class AdaptivePaymentService:
    def __init__(self, kafka_config):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.poll_interval = 1.0  # Startwert in Sekunden
        
    async def adaptive_poll_and_process(self):
        while True:
            start_time = time.time()
            messages = self.consumer.poll(timeout=self.poll_interval)
            message_count = sum(len(msgs) for msgs in messages.values())
            
            # Verarbeitung der Nachrichten
            for topic_partition, msgs in messages.items():
                for message in msgs:
                    await self.handle_order_placed(message.value)
            
            # Anpassung der Polling-Frequenz
            self.adjust_poll_interval(message_count)
            
            processing_time = time.time() - start_time
            sleep_time = max(0.01, self.poll_interval - processing_time)
            await asyncio.sleep(sleep_time)
    
    def adjust_poll_interval(self, message_count):
        if message_count > 10:
            self.poll_interval = max(0.1, self.poll_interval * 0.8)
        elif message_count == 0:
            self.poll_interval = min(5.0, self.poll_interval * 1.2)

27.2.3 Batch-Polling

Beim Batch-Polling werden mehrere Events gleichzeitig abgeholt und als Gruppe verarbeitet, was die Effizienz bei hohem Durchsatz steigert.

@Component
public class BatchPaymentService {
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlacedBatch(List<OrderPlacedEvent> events) {
        List<PaymentTask> paymentTasks = events.stream()
            .map(this::createPaymentTask)
            .collect(Collectors.toList());
            
        processBatchPayments(paymentTasks);
    }
    
    private void processBatchPayments(List<PaymentTask> tasks) {
        // Batch-Verarbeitung für bessere Performance
        log.info("Processing {} payments in batch", tasks.size());
        tasks.forEach(this::executePayment);
    }
}

27.3 Backpressure Handling

Backpressure tritt auf, wenn Consumer nicht schnell genug verarbeiten können und sich Events anstauen. Verschiedene Strategien helfen dabei, das System stabil zu halten.

27.3.1 Throttling

Beim Throttling wird die Verarbeitungsgeschwindigkeit bewusst gedrosselt, um Überlastung zu vermeiden.

class ThrottledPaymentService:
    def __init__(self, kafka_config, max_messages_per_second=10):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.rate_limiter = max_messages_per_second
        self.last_process_time = time.time()
        
    async def throttled_process(self):
        while True:
            messages = self.consumer.poll(timeout=1.0)
            
            for topic_partition, msgs in messages.items():
                for message in msgs:
                    # Rate Limiting anwenden
                    await self.apply_rate_limit()
                    await self.handle_order_placed(message.value)
    
    async def apply_rate_limit(self):
        current_time = time.time()
        time_since_last = current_time - self.last_process_time
        min_interval = 1.0 / self.rate_limiter
        
        if time_since_last < min_interval:
            await asyncio.sleep(min_interval - time_since_last)
            
        self.last_process_time = time.time()

27.3.2 Circuit Breaker Pattern

Der Circuit Breaker schützt nachgelagerte Systeme vor Überlastung durch temporäres Stoppen der Verarbeitung.

@Component
public class ResilientPaymentService {
    private final CircuitBreakerService circuitBreaker;
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        try {
            circuitBreaker.executeWithCircuitBreaker(() -> {
                processPayment(event.getOrderId(), event.getTotalAmount());
                return null;
            });
        } catch (CircuitBreakerOpenException e) {
            // Event für spätere Verarbeitung markieren
            scheduleRetry(event);
        }
    }
    
    private void scheduleRetry(OrderPlacedEvent event) {
        // Implementierung für verzögerte Wiederholung
        log.warn("Payment processing circuit breaker open, scheduling retry for order {}", 
                 event.getOrderId());
    }
}

27.3.3 Flow Control

Flow Control regelt die Anzahl der gleichzeitig verarbeiteten Events.

import asyncio
from asyncio import Semaphore

class FlowControlledPaymentService:
    def __init__(self, kafka_config, max_concurrent=5):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.semaphore = Semaphore(max_concurrent)
        
    async def controlled_process(self):
        while True:
            messages = self.consumer.poll(timeout=1.0)
            
            tasks = []
            for topic_partition, msgs in messages.items():
                for message in msgs:
                    task = asyncio.create_task(
                        self.process_with_semaphore(message.value)
                    )
                    tasks.append(task)
            
            if tasks:
                await asyncio.gather(*tasks)
    
    async def process_with_semaphore(self, event_data):
        async with self.semaphore:
            order_data = json.loads(event_data)
            await self.process_payment(
                order_data['orderId'], 
                order_data['totalAmount']
            )

27.4 Resource Management

27.4.1 Connection Pooling

Effiziente Verwaltung von Datenbankverbindungen und externen Service-Connections.

@Configuration
public class PaymentServiceConfig {
    
    @Bean
    public HikariDataSource paymentDataSource() {
        HikariConfig config = new HikariConfig();
        config.setMaximumPoolSize(10);
        config.setMinimumIdle(2);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        return new HikariDataSource(config);
    }
}

@Service
public class DatabasePaymentService {
    private final JdbcTemplate jdbcTemplate;
    
    public void processPayment(String orderId, BigDecimal amount) {
        // Connection wird automatisch aus Pool geholt und zurückgegeben
        jdbcTemplate.update(
            "INSERT INTO payments (order_id, amount, status) VALUES (?, ?, ?)",
            orderId, amount, "PROCESSING"
        );
    }
}

27.4.2 Memory Management

Kontrollierte Speichernutzung bei der Event-Verarbeitung.

import resource
import gc
from typing import List

class MemoryManagedPaymentService:
    def __init__(self, kafka_config, max_memory_mb=512):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.processed_count = 0
        
    async def memory_aware_process(self):
        while True:
            # Memory-Check vor Verarbeitung
            if self.check_memory_usage():
                await self.cleanup_memory()
            
            messages = self.consumer.poll(timeout=1.0)
            
            for topic_partition, msgs in messages.items():
                for message in msgs:
                    await self.handle_order_placed(message.value)
                    self.processed_count += 1
                    
                    # Regelmäßige Garbage Collection
                    if self.processed_count % 100 == 0:
                        gc.collect()
    
    def check_memory_usage(self) -> bool:
        current_memory = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        return current_memory > self.max_memory_bytes
        
    async def cleanup_memory(self):
        gc.collect()
        await asyncio.sleep(0.1)  # Kurze Pause für Cleanup

27.4.3 Thread Pool Management

Effiziente Nutzung von Thread-Pools für parallele Verarbeitung.

@Component
public class ThreadPoolPaymentService {
    private final ExecutorService paymentExecutor;
    
    public ThreadPoolPaymentService() {
        this.paymentExecutor = Executors.newFixedThreadPool(
            5, // Feste Anzahl Threads
            r -> {
                Thread t = new Thread(r, "payment-processor");
                t.setDaemon(true);
                return t;
            }
        );
    }
    
    @KafkaListener(topics = "order.placed.v1")
    public void handleOrderPlaced(OrderPlacedEvent event) {
        paymentExecutor.submit(() -> {
            try {
                processPayment(event.getOrderId(), event.getTotalAmount());
            } catch (Exception e) {
                log.error("Error processing payment for order {}", 
                         event.getOrderId(), e);
            }
        });
    }
    
    @PreDestroy
    public void shutdown() {
        paymentExecutor.shutdown();
        try {
            if (!paymentExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                paymentExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            paymentExecutor.shutdownNow();
        }
    }
}

Die Wahl zwischen verschiedenen Polling-Strategien und Backpressure-Mechanismen hängt von den spezifischen Anforderungen des Systems ab. Niedrige Latenz erfordert häufiges Polling, während hoher Durchsatz von Batch-Verarbeitung profitiert. Resource Management stellt sicher, dass das System auch unter Last stabil läuft.

Im siehe Kapitel zu “Eventverarbeitung mit Spring Boot” werden wir die praktischen Aspekte der KafkaListener-Konfiguration vertiefen, während das siehe Kapitel zu “Eventverarbeitung mit Python” async-basierte Implementierungen im Detail behandelt.