With confluent-kafka-python and asyncio, Python provides powerful tools for building event consumers. Explicit control over the polling loop makes it possible to tailor consumer behavior precisely to the requirements of the application.
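Before the loop comes the configuration. A minimal sketch of the consumer settings (broker address and group id are placeholder assumptions):

kafka_config = {
    'bootstrap.servers': 'localhost:9092',  # assumed broker address
    'group.id': 'payment-service',          # consumer group of the payment service
    'auto.offset.reset': 'earliest',        # start at the beginning when no offset is stored
    'enable.auto.commit': 'false',          # the examples below commit manually
}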
The basic polling loop is the foundation of every Python Kafka consumer.
import json
import logging
import time
from confluent_kafka import Consumer, KafkaError, KafkaException
from typing import Dict, Optional

class PaymentService:
    def __init__(self, kafka_config: Dict[str, str]):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True

    def run(self):
        """Basic synchronous consumer loop"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logging.info(f"End of partition {msg.partition()}")
                        continue
                    else:
                        raise KafkaException(msg.error())
                # Process the event
                self.handle_order_placed(msg.value())
        except KeyboardInterrupt:
            logging.info("Consumer interrupted by user")
        finally:
            self.consumer.close()

    def handle_order_placed(self, message_value: bytes):
        """Handles an OrderPlaced event"""
        try:
            event_data = json.loads(message_value.decode('utf-8'))
            order_id = event_data['data']['orderId']
            total_amount = event_data['data']['totalAmount']
            currency = event_data['data']['currency']
            logging.info(f"Processing payment for order {order_id}")
            # Business logic
            payment_result = self.process_payment(order_id, total_amount, currency)
            if payment_result['success']:
                # publish_payment_processed / handle_payment_failure are
                # domain helpers assumed to exist elsewhere in the service
                self.publish_payment_processed(order_id, payment_result)
            else:
                self.handle_payment_failure(order_id, payment_result)
        except Exception as e:
            logging.error(f"Error processing message: {e}")
            raise

    def process_payment(self, order_id: str, amount: float, currency: str) -> Dict:
        """Simulated payment processing"""
        # Simplified business logic
        if amount > 1000:
            return {'success': False, 'error': 'Amount too high'}
        return {
            'success': True,
            'transaction_id': f"txn_{order_id}_{int(time.time())}",
            'amount': amount,
            'currency': currency
        }
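Wiring the service up might look like this (a usage sketch; kafka_config is the placeholder configuration shown above):

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    service = PaymentService(kafka_config)
    service.run()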
For higher throughput, events can be processed in batches.

import time
from typing import List
from confluent_kafka import Message
class BatchPaymentService:
    def __init__(self, kafka_config: Dict[str, str], batch_size: int = 10):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.batch_size = batch_size
        self.running = True

    def run_batch_processing(self):
        """Batch-oriented consumer loop"""
        try:
            while self.running:
                messages = self.consume_batch()
                if messages:
                    self.process_batch(messages)
                    self.commit_batch(messages)
        except KeyboardInterrupt:
            logging.info("Batch consumer interrupted")
        finally:
            self.consumer.close()

    def consume_batch(self) -> List[Message]:
        """Collects messages for batch processing"""
        messages = []
        start_time = time.time()
        timeout = 5.0  # Maximum wait time for a batch
        while len(messages) < self.batch_size:
            remaining_time = timeout - (time.time() - start_time)
            if remaining_time <= 0:
                break
            msg = self.consumer.poll(timeout=remaining_time)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    raise KafkaException(msg.error())
            messages.append(msg)
        return messages

    def process_batch(self, messages: List[Message]):
        """Processes a batch of messages"""
        logging.info(f"Processing batch of {len(messages)} messages")
        payment_requests = []
        for msg in messages:
            try:
                event_data = json.loads(msg.value().decode('utf-8'))
                payment_requests.append({
                    'order_id': event_data['data']['orderId'],
                    'amount': event_data['data']['totalAmount'],
                    'currency': event_data['data']['currency'],
                    'message': msg
                })
            except Exception as e:
                logging.error(f"Error parsing message: {e}")
        # Batch payment processing
        results = self.process_payment_batch(payment_requests)
        # Handle the results
        for request, result in zip(payment_requests, results):
            if result['success']:
                self.publish_payment_processed(request['order_id'], result)
            else:
                self.handle_payment_failure(request['order_id'], result)

    def process_payment_batch(self, requests: List[Dict]) -> List[Dict]:
        """Simulated batch payment processing"""
        results = []
        for request in requests:
            # Simulated external API calls (these would normally run in parallel)
            time.sleep(0.1)  # Simulated latency
            if request['amount'] > 1000:
                results.append({'success': False, 'error': 'Amount too high'})
            else:
                results.append({
                    'success': True,
                    'transaction_id': f"txn_{request['order_id']}_{int(time.time())}",
                    'amount': request['amount']
                })
        return results

    def commit_batch(self, messages: List[Message]):
        """Commits all messages in the batch"""
        if messages:
            self.consumer.commit(message=messages[-1], asynchronous=False)
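Note that commit(message=...) only advances the offset of that message's partition. If a batch can span several partitions, a sketch that commits the highest offset per partition looks like this (commit_batch_per_partition is a hypothetical helper, not part of the class above):

from confluent_kafka import TopicPartition

def commit_batch_per_partition(self, messages: List[Message]):
    """Commits the highest offset seen for each (topic, partition) pair"""
    highest = {}
    for m in messages:
        key = (m.topic(), m.partition())
        if key not in highest or m.offset() > highest[key]:
            highest[key] = m.offset()
    # Kafka stores the offset of the *next* message to read, hence the +1
    offsets = [TopicPartition(t, p, o + 1) for (t, p), o in highest.items()]
    self.consumer.commit(offsets=offsets, asynchronous=False)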
A consumer loop with a built-in retry mechanism handles transient failures without dropping messages.

import time
from dataclasses import dataclass

class RetryableException(Exception):
    """Marks errors that are worth retrying, such as transient network failures"""
    pass
@dataclass
class RetryConfig:
    max_retries: int = 3
    initial_delay: float = 1.0
    backoff_multiplier: float = 2.0
    max_delay: float = 60.0
class RetryablePaymentService:
    def __init__(self, kafka_config: Dict[str, str], retry_config: RetryConfig):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.retry_config = retry_config
        self.running = True

    def run_with_retry(self):
        """Consumer loop with automatic retries"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                # Process the message with retries
                success = self.process_with_retry(msg)
                if success:
                    self.consumer.commit(message=msg)
                else:
                    # After all retry attempts: dead letter handling
                    # (send_to_dead_letter is assumed to exist elsewhere)
                    self.send_to_dead_letter(msg)
                    self.consumer.commit(message=msg)
        except KeyboardInterrupt:
            logging.info("Retry consumer interrupted")
        finally:
            self.consumer.close()

    def process_with_retry(self, msg: Message) -> bool:
        """Processes a message with retry logic"""
        last_exception = None
        for attempt in range(self.retry_config.max_retries + 1):
            try:
                # handle_order_placed is the parsing/processing logic
                # from PaymentService above
                self.handle_order_placed(msg.value())
                return True
            except RetryableException as e:
                last_exception = e
                if attempt < self.retry_config.max_retries:
                    delay = min(
                        self.retry_config.initial_delay * (
                            self.retry_config.backoff_multiplier ** attempt
                        ),
                        self.retry_config.max_delay
                    )
                    logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                    time.sleep(delay)
                else:
                    logging.error(f"All {self.retry_config.max_retries} retry attempts failed: {e}")
            except Exception as e:
                logging.error(f"Non-retryable error: {e}")
                return False
        return False
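With the default RetryConfig above, a persistently failing message is retried after roughly 1 s, 2 s, and 4 s (each delay capped at max_delay) before it is handed to send_to_dead_letter.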
Asynchronous processing enables higher parallelism and better resource utilization.

import asyncio
import aiohttp
class AsyncPaymentService:
    def __init__(self, kafka_config: Dict[str, str]):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True
        self.session: Optional[aiohttp.ClientSession] = None

    async def run_async(self):
        """Asynchronous consumer loop"""
        self.session = aiohttp.ClientSession()
        try:
            while self.running:
                # poll() is synchronous, but the timeout is kept short
                msg = self.consumer.poll(timeout=0.1)
                if msg is None:
                    await asyncio.sleep(0.01)  # Brief async pause
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                # Asynchronous processing
                await self.handle_order_placed_async(msg.value())
                self.consumer.commit(message=msg)
        except KeyboardInterrupt:
            logging.info("Async consumer interrupted")
        finally:
            if self.session:
                await self.session.close()
            self.consumer.close()

    async def handle_order_placed_async(self, message_value: bytes):
        """Asynchronous event processing"""
        try:
            event_data = json.loads(message_value.decode('utf-8'))
            order_id = event_data['data']['orderId']
            total_amount = event_data['data']['totalAmount']
            logging.info(f"Processing payment async for order {order_id}")
            # Parallel asynchronous operations
            customer_task = self.fetch_customer_data(event_data['data']['customerId'])
            fraud_task = self.check_fraud_async(order_id, total_amount)
            customer_data, fraud_result = await asyncio.gather(
                customer_task, fraud_task
            )
            if fraud_result['is_fraud']:
                await self.handle_fraud_detected(order_id)
                return
            # Payment processing
            payment_result = await self.process_payment_async(
                order_id, total_amount, customer_data
            )
            if payment_result['success']:
                await self.publish_payment_processed_async(order_id, payment_result)
        except Exception as e:
            logging.error(f"Error in async processing: {e}")
            raise

    async def fetch_customer_data(self, customer_id: str) -> Dict:
        """Asynchronous customer service call"""
        url = f"http://customer-service/api/customers/{customer_id}"
        async with self.session.get(url) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"Customer service error: {response.status}")

    async def check_fraud_async(self, order_id: str, amount: float) -> Dict:
        """Asynchronous fraud detection"""
        await asyncio.sleep(0.1)  # Simulated async processing
        # Simplified fraud logic
        is_fraud = amount > 10000 or order_id.startswith('suspicious')
        return {
            'is_fraud': is_fraud,
            'confidence': 0.95 if is_fraud else 0.1,
            'order_id': order_id
        }

    async def process_payment_async(self, order_id: str, amount: float,
                                    customer_data: Dict) -> Dict:
        """Asynchronous payment processing"""
        payment_data = {
            'order_id': order_id,
            'amount': amount,
            'customer_id': customer_data['id'],
            'payment_method': customer_data.get('preferred_payment_method', 'credit_card')
        }
        async with self.session.post('http://payment-gateway/api/payments',
                                     json=payment_data) as response:
            if response.status == 200:
                result = await response.json()
                return {
                    'success': True,
                    'transaction_id': result['transaction_id'],
                    'amount': amount
                }
            else:
                return {
                    'success': False,
                    'error': f"Payment gateway error: {response.status}"
                }
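One caveat of this pattern: Consumer.poll() blocks the event loop for up to its timeout. A common mitigation, sketched here as a hypothetical helper method, is to delegate the blocking call to a thread pool:

async def poll_non_blocking(self, timeout: float = 1.0):
    """Runs the blocking poll() in the default thread pool so the
    event loop stays responsive to other coroutines."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.consumer.poll, timeout)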
Parallel processing of multiple messages with semaphore control takes this a step further.

class ConcurrentPaymentService:
    def __init__(self, kafka_config: Dict[str, str], max_concurrent: int = 10):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.running = True
        self.pending_tasks = set()

    async def run_concurrent(self):
        """Concurrent consumer with task management"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=0.1)
                if msg is None:
                    await asyncio.sleep(0.01)
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                # Create a task for asynchronous processing
                task = asyncio.create_task(
                    self.process_message_concurrent(msg)
                )
                self.pending_tasks.add(task)
                task.add_done_callback(self.pending_tasks.discard)
                # Clean up finished tasks
                await self.cleanup_completed_tasks()
        except KeyboardInterrupt:
            logging.info("Concurrent consumer interrupted")
        finally:
            await self.shutdown_tasks()
            self.consumer.close()

    async def process_message_concurrent(self, msg: Message):
        """Processes a message under semaphore control"""
        async with self.semaphore:
            try:
                # handle_order_placed_async as defined in AsyncPaymentService;
                # note that committing per task can commit offsets out of order
                await self.handle_order_placed_async(msg.value())
                self.consumer.commit(message=msg)
            except Exception as e:
                logging.error(f"Error processing message concurrently: {e}")

    async def cleanup_completed_tasks(self):
        """Removes completed tasks"""
        if len(self.pending_tasks) > 100:  # Clean up once many tasks accumulate
            completed = [task for task in self.pending_tasks if task.done()]
            for task in completed:
                self.pending_tasks.discard(task)

    async def shutdown_tasks(self):
        """Waits for all pending tasks"""
        if self.pending_tasks:
            logging.info(f"Waiting for {len(self.pending_tasks)} pending tasks")
            await asyncio.gather(*self.pending_tasks, return_exceptions=True)

Long-running consumers also have to manage external resources deliberately: HTTP sessions and database connections should be pooled rather than opened per message.

import aiohttp
import asyncpg
from contextlib import asynccontextmanager
class ResourceManagedPaymentService:
    def __init__(self, kafka_config: Dict[str, str]):
        self.consumer = Consumer(kafka_config)
        self.consumer.subscribe(['order.placed.v1'])
        self.session: Optional[aiohttp.ClientSession] = None
        self.db_pool = None
        self.running = True

    async def initialize_resources(self):
        """Initializes all external resources"""
        # HTTP session with connection pooling
        connector = aiohttp.TCPConnector(
            limit=100,  # Maximum number of connections
            limit_per_host=30,
            ttl_dns_cache=300,
            use_dns_cache=True,
        )
        timeout = aiohttp.ClientTimeout(total=30, connect=5)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )
        # Database connection pool (using asyncpg as an example)
        self.db_pool = await asyncpg.create_pool(
            "postgresql://user:pass@localhost/payments",
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        logging.info("Resources initialized")

    async def run_with_resource_management(self):
        """Consumer loop with resource management"""
        await self.initialize_resources()
        try:
            while self.running:
                msg = self.consumer.poll(timeout=0.1)
                if msg is None:
                    await asyncio.sleep(0.01)
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                await self.handle_order_placed_with_resources(msg.value())
                self.consumer.commit(message=msg)
        except KeyboardInterrupt:
            logging.info("Resource managed consumer interrupted")
        finally:
            await self.cleanup_resources()
            self.consumer.close()

    async def handle_order_placed_with_resources(self, message_value: bytes):
        """Event processing with resource management"""
        try:
            event_data = json.loads(message_value.decode('utf-8'))
            order_id = event_data['data']['orderId']
            # Database operation via the connection pool
            async with self.db_pool.acquire() as connection:
                payment_record = await connection.fetchrow(
                    "SELECT * FROM payments WHERE order_id = $1", order_id
                )
                if payment_record:
                    logging.info(f"Payment already exists for order {order_id}")
                    return
            # HTTP call via the pooled session
            # (process_payment_with_session is assumed to exist elsewhere)
            payment_result = await self.process_payment_with_session(event_data)
            # Persist the result
            async with self.db_pool.acquire() as connection:
                await connection.execute(
                    """INSERT INTO payments (order_id, amount, status, transaction_id)
                       VALUES ($1, $2, $3, $4)""",
                    order_id,
                    event_data['data']['totalAmount'],
                    'COMPLETED' if payment_result['success'] else 'FAILED',
                    payment_result.get('transaction_id')
                )
        except Exception as e:
            logging.error(f"Error in resource managed processing: {e}")
            raise

    async def cleanup_resources(self):
        """Cleans up all resources"""
        if self.session:
            await self.session.close()
        if self.db_pool:
            await self.db_pool.close()
        logging.info("Resources cleaned up")

A production deployment additionally needs graceful shutdown, basic metrics, and structured logging.

import signal
from pathlib import Path
class ProductionPaymentConsumer:
    def __init__(self, config_file: Path):
        # load_config, initialize_resources, cleanup_resources,
        # handle_order_placed_async and handle_processing_error are
        # assumed to exist elsewhere in the service
        self.config = self.load_config(config_file)
        self.consumer = Consumer(self.config['kafka'])
        self.consumer.subscribe(['order.placed.v1'])
        self.running = True
        self.metrics = {
            'messages_processed': 0,
            'messages_failed': 0,
            'processing_time_total': 0.0
        }
        # Signal handling for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Graceful shutdown handler"""
        logging.info(f"Received signal {signum}, shutting down gracefully")
        self.running = False

    async def run_production(self):
        """Production-ready consumer loop"""
        await self.initialize_resources()
        try:
            logging.info("Starting production consumer")
            while self.running:
                start_time = time.time()
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        raise KafkaException(msg.error())
                try:
                    await self.process_message_with_metrics(msg)
                    self.consumer.commit(message=msg)
                    self.metrics['messages_processed'] += 1
                except Exception as e:
                    logging.error(f"Message processing failed: {e}")
                    self.metrics['messages_failed'] += 1
                    await self.handle_processing_error(msg, e)
                processing_time = time.time() - start_time
                self.metrics['processing_time_total'] += processing_time
                # Periodic metrics output
                if self.metrics['messages_processed'] % 100 == 0:
                    self.log_metrics()
        except Exception as e:
            logging.error(f"Fatal consumer error: {e}")
        finally:
            await self.shutdown()

    async def process_message_with_metrics(self, msg: Message):
        """Message processing with monitoring"""
        correlation_id = self.extract_correlation_id(msg)
        # Structured logging with a correlation ID
        async with logging_context(correlation_id=correlation_id):
            logging.info(f"Processing message from partition {msg.partition()}, offset {msg.offset()}")
            event_data = json.loads(msg.value().decode('utf-8'))
            await self.handle_order_placed_async(event_data)
            logging.info("Message processed successfully")

    def extract_correlation_id(self, msg: Message) -> str:
        """Extracts the correlation ID from the message headers"""
        if msg.headers():
            for key, value in msg.headers():
                if key == 'correlation-id':
                    return value.decode('utf-8')
        # Fallback: generate a new ID
        return f"auto-{int(time.time())}-{msg.offset()}"

    def log_metrics(self):
        """Logs the current metrics"""
        avg_processing_time = (
            self.metrics['processing_time_total'] /
            max(1, self.metrics['messages_processed'])
        )
        logging.info(
            f"Metrics - Processed: {self.metrics['messages_processed']}, "
            f"Failed: {self.metrics['messages_failed']}, "
            f"Avg Processing Time: {avg_processing_time:.3f}s"
        )

    async def shutdown(self):
        """Graceful shutdown"""
        logging.info("Shutting down consumer")
        await self.cleanup_resources()
        self.consumer.close()
        logging.info("Consumer shutdown complete")

# Context manager for structured logging
@asynccontextmanager
async def logging_context(**kwargs):
    """Adds context to logging for the duration of the block"""
    old_factory = logging.getLogRecordFactory()
    def record_factory(*args, **factory_kwargs):
        record = old_factory(*args, **factory_kwargs)
        for key, value in kwargs.items():
            setattr(record, key, value)
        return record
    logging.setLogRecordFactory(record_factory)
    try:
        yield
    finally:
        logging.setLogRecordFactory(old_factory)
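A sketch of the entry point (the config path is an assumption):

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    consumer = ProductionPaymentConsumer(Path("config/payment-consumer.json"))
    asyncio.run(consumer.run_production())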
Python's explicit polling loops offer maximum control over consumer behavior. The combination of asyncio and connection pooling enables high-performance, resource-efficient event processing that remains maintainable and testable.

The chapter on manual vs. automatic offset handling will cover the details of message acknowledgment, while the chapter on fault tolerance, dead letter topics, and retries explores robust error-handling strategies in more depth.