29 Event Processing with Python

With confluent-kafka-python and asyncio, Python offers powerful tools for building event consumers. Explicit control over the polling loop makes it possible to tailor consumer behavior precisely to application requirements.
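
The examples in this chapter pass a plain configuration dictionary to the Consumer constructor. A minimal sketch of such a configuration (broker address and group id are hypothetical placeholders) might look like this; note that the basic loop in 29.1.1 relies on auto-commit, while the later examples commit offsets explicitly:

consumer_config = {
    'bootstrap.servers': 'localhost:9092',  # hypothetical broker address
    'group.id': 'payment-service',          # hypothetical consumer group
    'auto.offset.reset': 'earliest',        # start at the beginning if no offset is stored
    # later examples commit explicitly and would add: 'enable.auto.commit': False
}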

29.1 Consumer Loop Patterns

29.1.1 Basic Polling Loop

The basic polling loop is the foundation of every Python Kafka consumer.

import json
import logging
import time
from confluent_kafka import Consumer, KafkaError, KafkaException
from typing import Dict, Optional

class PaymentService:
    def __init__(self, kafka_config: Dict[str, str]):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True
        
    def run(self):
        """Grundlegender synchroner Consumer-Loop"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logging.info(f"End of partition {msg.partition()}")
                        continue
                    else:
                        raise KafkaException(msg.error())
                
                # Process the event
                self.handle_order_placed(msg.value())
                
        except KeyboardInterrupt:
            logging.info("Consumer interrupted by user")
        finally:
            self.consumer.close()
    
    def handle_order_placed(self, message_value: bytes):
        """Verarbeitung eines OrderPlaced Events"""
        try:
            event_data = json.loads(message_value.decode('utf-8'))
            order_id = event_data['data']['orderId']
            total_amount = event_data['data']['totalAmount']
            currency = event_data['data']['currency']
            
            logging.info(f"Processing payment for order {order_id}")
            
            # Business logic
            payment_result = self.process_payment(order_id, total_amount, currency)
            
            if payment_result['success']:
                self.publish_payment_processed(order_id, payment_result)
            else:
                self.handle_payment_failure(order_id, payment_result)
                
        except Exception as e:
            logging.error(f"Error processing message: {e}")
            raise

    def process_payment(self, order_id: str, amount: float, currency: str) -> Dict:
        """Simulierte Zahlungsverarbeitung"""
        # Vereinfachte Geschäftslogik
        if amount > 1000:
            return {'success': False, 'error': 'Amount too high'}
        
        return {
            'success': True,
            'transaction_id': f"txn_{order_id}_{int(time.time())}",
            'amount': amount,
            'currency': currency
        }
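
A minimal entry point for running this service might look like the following sketch, using the hypothetical consumer_config from above:

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    service = PaymentService(consumer_config)
    service.run()  # blocks until interrupted, then closes the consumer cleanly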

29.1.2 Batch Processing Loop

For higher throughput, events can be processed in batches.

import time
from typing import List
from confluent_kafka import Message, TopicPartition

class BatchPaymentService:
    def __init__(self, kafka_config: Dict[str, str], batch_size: int = 10):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.batch_size = batch_size
        self.running = True
        
    def run_batch_processing(self):
        """Batch-orientierter Consumer-Loop"""
        try:
            while self.running:
                messages = self.consume_batch()
                
                if messages:
                    self.process_batch(messages)
                    self.commit_batch(messages)
                    
        except KeyboardInterrupt:
            logging.info("Batch consumer interrupted")
        finally:
            self.consumer.close()
    
    def consume_batch(self) -> List[Message]:
        """Sammelt Messages für Batch-Verarbeitung"""
        messages = []
        start_time = time.time()
        timeout = 5.0  # maximum wait time for a batch
        
        while len(messages) < self.batch_size:
            remaining_time = timeout - (time.time() - start_time)
            if remaining_time <= 0:
                break
                
            msg = self.consumer.poll(timeout=remaining_time)
            
            if msg is None:
                continue
                
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    raise KafkaException(msg.error())
            
            messages.append(msg)
            
        return messages
    
    def process_batch(self, messages: List[Message]):
        """Verarbeitet einen Batch von Messages"""
        logging.info(f"Processing batch of {len(messages)} messages")
        
        payment_requests = []
        for msg in messages:
            try:
                event_data = json.loads(msg.value().decode('utf-8'))
                payment_requests.append({
                    'order_id': event_data['data']['orderId'],
                    'amount': event_data['data']['totalAmount'],
                    'currency': event_data['data']['currency'],
                    'message': msg
                })
            except Exception as e:
                logging.error(f"Error parsing message: {e}")
        
        # Batch payment processing
        results = self.process_payment_batch(payment_requests)
        
        # Process the results
        for request, result in zip(payment_requests, results):
            if result['success']:
                self.publish_payment_processed(request['order_id'], result)
            else:
                self.handle_payment_failure(request['order_id'], result)
    
    def process_payment_batch(self, requests: List[Dict]) -> List[Dict]:
        """Simulierte Batch-Zahlungsverarbeitung"""
        results = []
        for request in requests:
            # Simulated external API calls (would normally run in parallel)
            time.sleep(0.1)  # simulated latency
            
            if request['amount'] > 1000:
                results.append({'success': False, 'error': 'Amount too high'})
            else:
                results.append({
                    'success': True,
                    'transaction_id': f"txn_{request['order_id']}_{int(time.time())}",
                    'amount': request['amount']
                })
                
        return results
    
    def commit_batch(self, messages: List[Message]):
        """Commits the offsets of all messages in the batch."""
        if not messages:
            return
        # Commit the highest offset per partition; committing only the last
        # message would silently skip other partitions in a mixed batch.
        highest = {}
        for msg in messages:
            key = (msg.topic(), msg.partition())
            highest[key] = max(highest.get(key, -1), msg.offset())
        offsets = [
            # the committed offset points to the next message to consume
            TopicPartition(topic, partition, offset + 1)
            for (topic, partition), offset in highest.items()
        ]
        self.consumer.commit(offsets=offsets, asynchronous=False)
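
Because offsets are committed only after the entire batch has been processed, this loop provides at-least-once semantics: if the process crashes mid-batch, the uncommitted messages are redelivered after a restart, so the payment handlers must be idempotent.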

29.1.3 Retry-enabled Loop

A consumer loop with a built-in retry mechanism.

import time
from dataclasses import dataclass

class RetryableException(Exception):
    """Marks errors worth retrying, e.g. transient network or gateway failures."""

@dataclass
class RetryConfig:
    max_retries: int = 3
    initial_delay: float = 1.0
    backoff_multiplier: float = 2.0
    max_delay: float = 60.0

class RetryablePaymentService:
    def __init__(self, kafka_config: Dict[str, str], retry_config: RetryConfig):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.retry_config = retry_config
        self.running = True
        
    def run_with_retry(self):
        """Consumer-Loop mit automatischem Retry"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                
                # Process the message with retries
                success = self.process_with_retry(msg)
                
                if success:
                    self.consumer.commit(message=msg)
                else:
                    # After all retry attempts: dead-letter handling
                    self.send_to_dead_letter(msg)
                    self.consumer.commit(message=msg)
                    
        except KeyboardInterrupt:
            logging.info("Retry consumer interrupted")
        finally:
            self.consumer.close()
    
    def process_with_retry(self, msg: Message) -> bool:
        """Verarbeitet Message mit Retry-Logik"""
        last_exception = None
        
        for attempt in range(self.retry_config.max_retries + 1):
            try:
                self.handle_order_placed(msg.value())
                return True
                
            except RetryableException as e:
                last_exception = e
                
                if attempt < self.retry_config.max_retries:
                    delay = min(
                        self.retry_config.initial_delay * (
                            self.retry_config.backoff_multiplier ** attempt
                        ),
                        self.retry_config.max_delay
                    )
                    
                    logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                    time.sleep(delay)
                else:
                    logging.error(f"All {self.retry_config.max_retries} retry attempts failed: {e}")
                    
            except Exception as e:
                logging.error(f"Non-retryable error: {e}")
                return False
        
        return False
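
Wiring the service up is straightforward. With the values below, the retry delays grow exponentially (1 s, 2 s, 4 s) and would be capped at 60 seconds:

retry_config = RetryConfig(max_retries=3, initial_delay=1.0, backoff_multiplier=2.0)
service = RetryablePaymentService(consumer_config, retry_config)
service.run_with_retry()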

29.2 Async Processing

29.2.1 Basic Async Consumer

Asynchronous processing enables higher concurrency and better resource utilization.

import asyncio
import aiohttp
from confluent_kafka import Consumer

class AsyncPaymentService:
    def __init__(self, kafka_config: Dict[str, str]):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True
        self.session: Optional[aiohttp.ClientSession] = None
        
    async def run_async(self):
        """Asynchroner Consumer-Loop"""
        self.session = aiohttp.ClientSession()
        
        try:
            while self.running:
                # poll() blocks the event loop, so the timeout is kept short
                msg = self.consumer.poll(timeout=0.1)
                
                if msg is None:
                    await asyncio.sleep(0.01)  # brief async pause so other tasks can run
                    continue
                    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                
                # Asynchronous processing
                await self.handle_order_placed_async(msg.value())
                self.consumer.commit(message=msg)
                
        except KeyboardInterrupt:
            logging.info("Async consumer interrupted")
        finally:
            if self.session:
                await self.session.close()
            self.consumer.close()
    
    async def handle_order_placed_async(self, message_value: bytes):
        """Asynchrone Event-Verarbeitung"""
        try:
            event_data = json.loads(message_value.decode('utf-8'))
            order_id = event_data['data']['orderId']
            total_amount = event_data['data']['totalAmount']
            
            logging.info(f"Processing payment async for order {order_id}")
            
            # Run the async operations in parallel
            customer_task = self.fetch_customer_data(event_data['data']['customerId'])
            fraud_task = self.check_fraud_async(order_id, total_amount)
            
            customer_data, fraud_result = await asyncio.gather(
                customer_task, fraud_task
            )
            
            if fraud_result['is_fraud']:
                await self.handle_fraud_detected(order_id)
                return
                
            # Payment processing
            payment_result = await self.process_payment_async(
                order_id, total_amount, customer_data
            )
            
            if payment_result['success']:
                await self.publish_payment_processed_async(order_id, payment_result)
                
        except Exception as e:
            logging.error(f"Error in async processing: {e}")
            raise
    
    async def fetch_customer_data(self, customer_id: str) -> Dict:
        """Asynchroner Customer-Service Call"""
        async with self.session.get(f"http://customer-service/api/customers/{customer_id}") as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"Customer service error: {response.status}")
    
    async def check_fraud_async(self, order_id: str, amount: float) -> Dict:
        """Asynchrone Fraud-Detection"""
        await asyncio.sleep(0.1)  # Simulierte async Verarbeitung
        
        # Vereinfachte Fraud-Logik
        is_fraud = amount > 10000 or order_id.startswith('suspicious')
        
        return {
            'is_fraud': is_fraud,
            'confidence': 0.95 if is_fraud else 0.1,
            'order_id': order_id
        }
    
    async def process_payment_async(self, order_id: str, amount: float, 
                                  customer_data: Dict) -> Dict:
        """Asynchrone Zahlungsverarbeitung"""
        payment_data = {
            'order_id': order_id,
            'amount': amount,
            'customer_id': customer_data['id'],
            'payment_method': customer_data.get('preferred_payment_method', 'credit_card')
        }
        
        async with self.session.post('http://payment-gateway/api/payments', 
                                   json=payment_data) as response:
            if response.status == 200:
                result = await response.json()
                return {
                    'success': True,
                    'transaction_id': result['transaction_id'],
                    'amount': amount
                }
            else:
                return {
                    'success': False,
                    'error': f"Payment gateway error: {response.status}"
                }
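
One caveat of this pattern: Consumer.poll() is a blocking call and stalls the event loop for up to the given timeout. A hedged alternative, sketched below as an additional method on AsyncPaymentService, offloads the poll to a worker thread via run_in_executor; a natively asynchronous client such as aiokafka would be another option.

    async def poll_async(self) -> Optional[Message]:
        """Runs the blocking poll() in a thread so the event loop stays responsive."""
        loop = asyncio.get_running_loop()
        # None selects the default ThreadPoolExecutor
        return await loop.run_in_executor(None, self.consumer.poll, 1.0)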

29.2.2 Concurrent Message Processing

Parallel processing of multiple messages, bounded by a semaphore.

class ConcurrentPaymentService:
    def __init__(self, kafka_config: Dict[str, str], max_concurrent: int = 10):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.running = True
        self.pending_tasks = set()
        
    async def run_concurrent(self):
        """Concurrent Consumer mit Task-Management"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=0.1)
                
                if msg is None:
                    await asyncio.sleep(0.01)
                    continue
                    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                
                # Create a task for asynchronous processing
                task = asyncio.create_task(
                    self.process_message_concurrent(msg)
                )
                self.pending_tasks.add(task)
                task.add_done_callback(self.pending_tasks.discard)
                
                # Clean up completed tasks
                await self.cleanup_completed_tasks()
                
        except KeyboardInterrupt:
            logging.info("Concurrent consumer interrupted")
        finally:
            await self.shutdown_tasks()
            self.consumer.close()
    
    async def process_message_concurrent(self, msg: Message):
        """Verarbeitet Message unter Semaphore-Kontrolle"""
        async with self.semaphore:
            try:
                await self.handle_order_placed_async(msg.value())  # handler from 29.2.1
                # Caution: with concurrent tasks, commits can land out of
                # order (see the note below this listing)
                self.consumer.commit(message=msg)
            except Exception as e:
                logging.error(f"Error processing message concurrently: {e}")
    
    async def cleanup_completed_tasks(self):
        """Entfernt abgeschlossene Tasks"""
        if len(self.pending_tasks) > 100:  # Cleanup bei vielen Tasks
            completed = [task for task in self.pending_tasks if task.done()]
            for task in completed:
                self.pending_tasks.discard(task)
    
    async def shutdown_tasks(self):
        """Wartet auf alle pending Tasks"""
        if self.pending_tasks:
            logging.info(f"Waiting for {len(self.pending_tasks)} pending tasks")
            await asyncio.gather(*self.pending_tasks, return_exceptions=True)
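
Note that concurrent processing weakens the commit guarantees: commits can land out of order, and committing the offset of a later message implicitly acknowledges all earlier offsets in that partition. If a crash occurs while an earlier message is still in flight, that message is silently skipped on restart. Where this matters, track completed offsets per partition and commit only the highest contiguous offset.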

29.3 Resource Management

29.3.1 Connection Pool Management
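
Long-running consumers should pool their connections to external systems instead of opening them per message. The following example combines an aiohttp session with connection pooling and an asyncpg database pool.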

import aiohttp
import asyncpg
from contextlib import asynccontextmanager

class ResourceManagedPaymentService:
    def __init__(self, kafka_config: Dict[str, str]):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.session: Optional[aiohttp.ClientSession] = None
        self.db_pool = None
        self.running = True
        
    async def initialize_resources(self):
        """Initialisiert alle externen Resources"""
        # HTTP Session mit Connection Pooling
        connector = aiohttp.TCPConnector(
            limit=100,  # maximum number of connections
            limit_per_host=30,
            ttl_dns_cache=300,
            use_dns_cache=True,
        )
        
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        
        # Database connection pool (using asyncpg as an example)
        self.db_pool = await asyncpg.create_pool(
            "postgresql://user:pass@localhost/payments",
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        
        logging.info("Resources initialized")
    
    async def run_with_resource_management(self):
        """Consumer-Loop mit Resource-Management"""
        await self.initialize_resources()
        
        try:
            while self.running:
                msg = self.consumer.poll(timeout=0.1)
                
                if msg is None:
                    await asyncio.sleep(0.01)
                    continue
                    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                
                await self.handle_order_placed_with_resources(msg.value())
                self.consumer.commit(message=msg)
                
        except KeyboardInterrupt:
            logging.info("Resource managed consumer interrupted")
        finally:
            await self.cleanup_resources()
            self.consumer.close()
    
    async def handle_order_placed_with_resources(self, message_value: bytes):
        """Event-Verarbeitung mit Resource-Management"""
        try:
            event_data = json.loads(message_value.decode('utf-8'))
            order_id = event_data['data']['orderId']
            
            # Database operation via the connection pool
            async with self.db_pool.acquire() as connection:
                payment_record = await connection.fetchrow(
                    "SELECT * FROM payments WHERE order_id = $1", order_id
                )
                
                if payment_record:
                    logging.info(f"Payment already exists for order {order_id}")
                    return
            
            # HTTP call via the pooled session
            payment_result = await self.process_payment_with_session(event_data)
            
            # Persist the result in the database
            async with self.db_pool.acquire() as connection:
                await connection.execute(
                    """INSERT INTO payments (order_id, amount, status, transaction_id) 
                       VALUES ($1, $2, $3, $4)""",
                    order_id,
                    event_data['data']['totalAmount'],
                    'COMPLETED' if payment_result['success'] else 'FAILED',
                    payment_result.get('transaction_id')
                )
                
        except Exception as e:
            logging.error(f"Error in resource managed processing: {e}")
            raise
    
    async def cleanup_resources(self):
        """Cleanup aller Resources"""
        if self.session:
            await self.session.close()
            
        if self.db_pool:
            await self.db_pool.close()
            
        logging.info("Resources cleaned up")

29.4 Practical Python Examples

29.4.1 Production-Ready Consumer
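
The final example combines the building blocks from the previous sections with signal handling for graceful shutdown, simple in-process metrics, and correlation-ID-based structured logging.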

import signal
import sys
from pathlib import Path
from typing import Callable

class ProductionPaymentConsumer:
    def __init__(self, config_file: Path):
        self.config = self.load_config(config_file)
        self.consumer = Consumer(self.config['kafka'])
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True
        self.metrics = {
            'messages_processed': 0,
            'messages_failed': 0,
            'processing_time_total': 0.0
        }
        
        # Signal handling for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
    
    def signal_handler(self, signum, frame):
        """Graceful Shutdown Handler"""
        logging.info(f"Received signal {signum}, shutting down gracefully")
        self.running = False
    
    async def run_production(self):
        """Production-ready Consumer-Loop"""
        await self.initialize_resources()
        
        try:
            logging.info("Starting production consumer")
            
            while self.running:
                start_time = time.time()
                
                msg = self.consumer.poll(timeout=1.0)
                
                if msg is None:
                    continue
                    
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                
                try:
                    await self.process_message_with_metrics(msg)
                    self.consumer.commit(message=msg)
                    self.metrics['messages_processed'] += 1
                    
                except Exception as e:
                    logging.error(f"Message processing failed: {e}")
                    self.metrics['messages_failed'] += 1
                    await self.handle_processing_error(msg, e)
                
                processing_time = time.time() - start_time
                self.metrics['processing_time_total'] += processing_time
                
                # Periodic metrics output
                if self.metrics['messages_processed'] % 100 == 0:
                    self.log_metrics()
                    
        except Exception as e:
            logging.error(f"Fatal consumer error: {e}")
        finally:
            await self.shutdown()
    
    async def process_message_with_metrics(self, msg: Message):
        """Message-Verarbeitung mit Monitoring"""
        correlation_id = self.extract_correlation_id(msg)
        
        # Structured logging with a correlation ID; logging_context is an
        # async context manager (defined below), hence "async with"
        async with logging_context(correlation_id=correlation_id):
            logging.info(f"Processing message from partition {msg.partition()}, offset {msg.offset()}")
            
            # Delegate to the async handler from Section 29.2 (it expects raw bytes)
            await self.handle_order_placed_async(msg.value())
            
            logging.info("Message processed successfully")
    
    def extract_correlation_id(self, msg: Message) -> str:
        """Extrahiert Correlation ID aus Message Headers"""
        if msg.headers():
            for key, value in msg.headers():
                if key == 'correlation-id':
                    return value.decode('utf-8')
        
        # Fallback: generate a new ID
        return f"auto-{int(time.time())}-{msg.offset()}"
    
    def log_metrics(self):
        """Logged aktuelle Metriken"""
        avg_processing_time = (
            self.metrics['processing_time_total'] / 
            max(1, self.metrics['messages_processed'])
        )
        
        logging.info(
            f"Metrics - Processed: {self.metrics['messages_processed']}, "
            f"Failed: {self.metrics['messages_failed']}, "
            f"Avg Processing Time: {avg_processing_time:.3f}s"
        )
    
    async def shutdown(self):
        """Graceful Shutdown"""
        logging.info("Shutting down consumer")
        await self.cleanup_resources()
        self.consumer.close()
        logging.info("Consumer shutdown complete")

# Async context manager for structured logging
@asynccontextmanager
async def logging_context(**kwargs):
    """Adds context to logging for the duration of the block"""
    old_factory = logging.getLogRecordFactory()
    
    def record_factory(*args, **factory_kwargs):
        record = old_factory(*args, **factory_kwargs)
        for key, value in kwargs.items():
            setattr(record, key, value)
        return record
    
    logging.setLogRecordFactory(record_factory)
    try:
        yield
    finally:
        logging.setLogRecordFactory(old_factory)
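
To actually surface the injected correlation_id in the log output, a formatter has to reference it. The following sketch is a hypothetical setup, not part of the service itself; the filter supplies a default so that records created outside logging_context do not break the formatter.

class CorrelationIdFilter(logging.Filter):
    """Provides a default correlation_id for records created outside the context."""
    def filter(self, record):
        if not hasattr(record, 'correlation_id'):
            record.correlation_id = '-'
        return True

handler = logging.StreamHandler()
handler.addFilter(CorrelationIdFilter())
handler.setFormatter(logging.Formatter('%(asctime)s [%(correlation_id)s] %(levelname)s %(message)s'))
logging.getLogger().addHandler(handler)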

Python's explicit polling loops offer maximum control over consumer behavior. Combined with asyncio and connection pooling, they enable high-throughput, resource-efficient event processing that remains maintainable and testable.

The chapter on “Manual vs. Automatic Offset Handling” covers the details of message acknowledgment, while the chapter on “Fault Tolerance, Dead Letter Topics, and Retries” takes a deeper look at robust error-handling strategies.