Python offers several mature libraries for Kafka integration, with confluent-kafka-python and kafka-python being the most popular. Modern Python applications use AsyncIO for high-performance event production, while the flexibility of the language enables elegant abstractions and robust error handling.
Python Kafka clients require explicit configuration through dictionary-based settings. The choice between confluent-kafka-python (based on librdkafka) and kafka-python (pure Python) depends on performance requirements and deployment constraints.
confluent-kafka-python delivers the best performance thanks to the underlying librdkafka C library and supports all modern Kafka features.
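To make the dictionary-based style concrete, here is a minimal sketch (broker address and topic name are illustrative assumptions) that passes a plain dict straight to the confluent-kafka Producer:
# Minimal sketch: dictionary-based configuration with confluent-kafka
# (broker address and topic name are illustrative assumptions)
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "client.id": "quickstart-producer",
    "acks": "all",
    "enable.idempotence": True,
})

def delivery_report(err, msg):
    # Invoked by poll()/flush() once the broker has acknowledged or rejected the record
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}")

producer.produce("demo.topic", key="order-1", value='{"status": "placed"}',
                 callback=delivery_report)
producer.flush()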
# requirements.txt
confluent-kafka==2.3.0
aiokafka==0.8.11
pydantic==2.5.0
structlog==23.2.0
# config/kafka_config.py
from dataclasses import dataclass
from typing import Dict, Any, Optional
import os
@dataclass
class KafkaProducerConfig:
bootstrap_servers: str = "localhost:9092"
client_id: str = "order-service-python"
acks: str = "all"
retries: int = 3
batch_size: int = 16384
linger_ms: int = 5
buffer_memory: int = 33554432
enable_idempotence: bool = True
compression_type: str = "snappy"
# Security
security_protocol: str = "PLAINTEXT"
sasl_mechanism: Optional[str] = None
sasl_username: Optional[str] = None
sasl_password: Optional[str] = None
# Transactions
transactional_id: Optional[str] = None
transaction_timeout_ms: int = 60000
@classmethod
def from_env(cls) -> 'KafkaProducerConfig':
return cls(
bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS', cls.bootstrap_servers),
client_id=os.getenv('KAFKA_CLIENT_ID', cls.client_id),
acks=os.getenv('KAFKA_ACKS', cls.acks),
enable_idempotence=os.getenv('KAFKA_ENABLE_IDEMPOTENCE', 'true').lower() == 'true',
compression_type=os.getenv('KAFKA_COMPRESSION_TYPE', cls.compression_type),
transactional_id=os.getenv('KAFKA_TRANSACTIONAL_ID'),
)
def to_producer_config(self) -> Dict[str, Any]:
config = {
'bootstrap.servers': self.bootstrap_servers,
'client.id': self.client_id,
'acks': self.acks,
'retries': self.retries,
'batch.size': self.batch_size,
'linger.ms': self.linger_ms,
            # librdkafka has no 'buffer.memory'; queue.buffering.max.kbytes is the closest equivalent
            'queue.buffering.max.kbytes': self.buffer_memory // 1024,
'enable.idempotence': self.enable_idempotence,
'compression.type': self.compression_type,
'security.protocol': self.security_protocol,
}
# Optional configurations
if self.sasl_mechanism:
config['sasl.mechanism'] = self.sasl_mechanism
config['sasl.username'] = self.sasl_username
config['sasl.password'] = self.sasl_password
if self.transactional_id:
config['transactional.id'] = self.transactional_id
config['transaction.timeout.ms'] = self.transaction_timeout_ms
return {k: v for k, v in config.items() if v is not None}
# config/topic_config.py
@dataclass
class TopicConfig:
order_placed: str = "order.placed.v1"
order_cancelled: str = "order.cancelled.v1"
payment_processed: str = "payment.processed.v1"
inventory_reserved: str = "inventory.reserved.v1"
shipping_requested: str = "shipping.requested.v1"
@classmethod
def from_env(cls) -> 'TopicConfig':
return cls(
order_placed=os.getenv('TOPIC_ORDER_PLACED', cls.order_placed),
order_cancelled=os.getenv('TOPIC_ORDER_CANCELLED', cls.order_cancelled),
payment_processed=os.getenv('TOPIC_PAYMENT_PROCESSED', cls.payment_processed),
inventory_reserved=os.getenv('TOPIC_INVENTORY_RESERVED', cls.inventory_reserved),
shipping_requested=os.getenv('TOPIC_SHIPPING_REQUESTED', cls.shipping_requested),
        )
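A short usage sketch of these configuration classes (assuming the modules are importable under the paths given in the file headers above):
# Usage sketch: building a confluent-kafka producer from the config dataclasses
from confluent_kafka import Producer
from config.kafka_config import KafkaProducerConfig
from config.topic_config import TopicConfig

producer_config = KafkaProducerConfig.from_env()
topics = TopicConfig.from_env()

# to_producer_config() maps the dataclass fields to librdkafka-style dotted keys
producer = Producer(producer_config.to_producer_config())
producer.produce(topics.order_placed, key="order-123", value='{"status": "placed"}')
producer.flush()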
kafka-python is a pure-Python implementation that is easier to deploy but offers lower performance.
# Kafka-Python Configuration Alternative
from kafka import KafkaProducer
import json
from typing import Dict, Any
class KafkaPythonConfig:
def __init__(self):
self.bootstrap_servers = ['localhost:9092']
self.value_serializer = lambda v: json.dumps(v).encode('utf-8')
self.key_serializer = lambda k: k.encode('utf-8') if k else None
self.acks = 'all'
self.retries = 3
self.batch_size = 16384
self.linger_ms = 5
self.buffer_memory = 33554432
self.compression_type = 'snappy'
        self.enable_idempotence = True  # note: requires a kafka-python release that supports the idempotent producer
def create_producer(self) -> KafkaProducer:
return KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=self.value_serializer,
key_serializer=self.key_serializer,
acks=self.acks,
retries=self.retries,
batch_size=self.batch_size,
linger_ms=self.linger_ms,
buffer_memory=self.buffer_memory,
compression_type=self.compression_type,
enable_idempotence=self.enable_idempotence
        )
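For comparison, a usage sketch of the kafka-python variant (broker and topic name are illustrative):
# Usage sketch: sending a record with the kafka-python configuration above
config = KafkaPythonConfig()
producer = config.create_producer()

# send() returns a future; get() blocks until the broker acknowledges the record
future = producer.send("order.placed.v1", key="order-123",
                       value={"order_id": "order-123", "status": "placed"})
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)
producer.flush()
producer.close()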
# producer/factory.py
from abc import ABC, abstractmethod
from confluent_kafka import Producer
from kafka import KafkaProducer
import structlog
# Assumed import paths for the config classes defined earlier in this chapter
from config.kafka_config import KafkaProducerConfig, KafkaPythonConfig
logger = structlog.get_logger()
class ProducerFactory(ABC):
@abstractmethod
def create_producer(self):
pass
@abstractmethod
def get_producer_type(self) -> str:
pass
class ConfluentProducerFactory(ProducerFactory):
def __init__(self, config: KafkaProducerConfig):
self.config = config
def create_producer(self) -> Producer:
producer_config = self.config.to_producer_config()
logger.info("Creating Confluent Kafka producer", config=producer_config)
return Producer(producer_config)
def get_producer_type(self) -> str:
return "confluent-kafka"
class KafkaPythonProducerFactory(ProducerFactory):
def __init__(self, config: KafkaPythonConfig):
self.config = config
def create_producer(self) -> KafkaProducer:
logger.info("Creating kafka-python producer")
return self.config.create_producer()
def get_producer_type(self) -> str:
return "kafka-python"
# Dependency Injection Container
class ProducerContainer:
def __init__(self, factory: ProducerFactory):
self._factory = factory
self._producer = None
@property
def producer(self):
if self._producer is None:
self._producer = self._factory.create_producer()
return self._producer
def close(self):
if self._producer:
if hasattr(self._producer, 'flush'):
self._producer.flush()
if hasattr(self._producer, 'close'):
self._producer.close()
        self._producer = None
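Wiring the pieces together might look like this (a sketch that reuses the configuration classes shown above):
# Sketch: creating a producer through the factory and container
config = KafkaProducerConfig.from_env()
container = ProducerContainer(ConfluentProducerFactory(config))

producer = container.producer  # created lazily on first access
producer.produce("order.placed.v1", key="order-123", value='{"status": "placed"}')
container.close()              # flushes and releases the underlying producer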
Modern Python applications use AsyncIO for non-blocking event production. This requires dedicated patterns and libraries such as aiokafka, which provides native AsyncIO support.
# producer/async_producer.py
import asyncio
import json
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Callable
from dataclasses import asdict
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
import structlog
logger = structlog.get_logger()
class AsyncEventProducer:
def __init__(self, bootstrap_servers: str = "localhost:9092"):
self.bootstrap_servers = bootstrap_servers
self.producer: Optional[AIOKafkaProducer] = None
self._started = False
async def start(self):
"""Initialize and start the async producer"""
if self._started:
return
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=self._serialize_value,
key_serializer=self._serialize_key,
acks='all',
enable_idempotence=True,
max_batch_size=16384,
linger_ms=5,
compression_type='snappy'
)
await self.producer.start()
self._started = True
logger.info("Async Kafka producer started")
async def stop(self):
"""Stop and cleanup the producer"""
if not self._started or not self.producer:
return
await self.producer.stop()
self._started = False
logger.info("Async Kafka producer stopped")
async def send_event(self, topic: str, event: Any, key: Optional[str] = None) -> dict:
"""Send event asynchronously with proper error handling"""
if not self._started:
await self.start()
try:
# Send and await the result
record_metadata = await self.producer.send_and_wait(
topic=topic,
value=event,
key=key
)
result = {
'topic': record_metadata.topic,
'partition': record_metadata.partition,
'offset': record_metadata.offset,
'timestamp': record_metadata.timestamp,
'success': True
}
logger.debug("Event sent successfully", **result)
return result
except KafkaError as e:
logger.error("Failed to send event", topic=topic, key=key, error=str(e))
raise EventPublishingException(f"Failed to send event to {topic}") from e
async def send_event_fire_and_forget(self, topic: str, event: Any,
key: Optional[str] = None):
"""Send event without waiting for confirmation (highest performance)"""
if not self._started:
await self.start()
try:
            # send() awaits only until the record is enqueued in the batch; the
            # delivery future it returns is deliberately not awaited (fire and forget)
            await self.producer.send(topic=topic, value=event, key=key)
logger.debug("Event sent (fire and forget)", topic=topic, key=key)
except Exception as e:
logger.error("Failed to send event (fire and forget)",
topic=topic, key=key, error=str(e))
async def send_batch(self, events: list[tuple[str, Any, Optional[str]]]) -> list[dict]:
"""Send multiple events in batch for better performance"""
if not self._started:
await self.start()
tasks = []
for topic, event, key in events:
task = self.send_event(topic, event, key)
tasks.append(task)
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r if not isinstance(r, Exception) else {'success': False, 'error': str(r)}
for r in results]
except Exception as e:
logger.error("Batch send failed", error=str(e))
raise
def _serialize_value(self, value: Any) -> bytes:
"""Serialize event value to JSON bytes"""
if hasattr(value, '__dict__'):
value = asdict(value) if hasattr(value, '__dataclass_fields__') else value.__dict__
return json.dumps(value, default=str).encode('utf-8')
def _serialize_key(self, key: str) -> bytes:
"""Serialize key to bytes"""
return key.encode('utf-8') if key else None
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
class EventPublishingException(Exception):
    pass
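Thanks to the async context manager protocol, the producer can be used like this (topic and payload are illustrative):
# Usage sketch: AsyncEventProducer as an async context manager
import asyncio

async def main():
    async with AsyncEventProducer("localhost:9092") as producer:
        result = await producer.send_event(
            topic="order.placed.v1",
            event={"order_id": "order-123", "status": "placed"},
            key="order-123",
        )
        print(result["partition"], result["offset"])

asyncio.run(main())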
# producer/confluent_async_producer.py
import asyncio
import concurrent.futures
import json
from typing import Any, Dict, Optional, Callable
from confluent_kafka import Producer, Message, KafkaException
import structlog
# EventPublishingException is defined in producer/async_producer.py above (assumed import path)
from producer.async_producer import EventPublishingException
logger = structlog.get_logger()
class ConfluentAsyncProducer:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.producer = Producer(config)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
self._delivery_callbacks: Dict[str, Callable] = {}
async def send_event_async(self, topic: str, value: Any, key: Optional[str] = None) -> dict:
"""Send event asynchronously using thread pool"""
# Create a future for the delivery callback
loop = asyncio.get_event_loop()
future = loop.create_future()
        def delivery_callback(err, msg: Message):
            # Runs on the producer's poll thread: hand results back to the event loop
            # thread-safely; err is a KafkaError and must be wrapped to be raised
            if err:
                if not future.done():
                    loop.call_soon_threadsafe(future.set_exception, KafkaException(err))
            else:
                if not future.done():
                    result = {
                        'topic': msg.topic(),
                        'partition': msg.partition(),
                        'offset': msg.offset(),
                        'timestamp': msg.timestamp(),
                        'success': True
                    }
                    loop.call_soon_threadsafe(future.set_result, result)
        # Produce from a worker thread and keep polling so the delivery callback fires
        def produce():
            try:
                self.producer.produce(
                    topic=topic,
                    value=json.dumps(value) if not isinstance(value, (str, bytes)) else value,
                    key=key,
                    callback=delivery_callback
                )
                # poll() serves delivery callbacks; loop until this message's future resolves
                while not future.done():
                    self.producer.poll(0.1)
            except Exception as e:
                if not future.done():
                    loop.call_soon_threadsafe(future.set_exception, e)
await loop.run_in_executor(self.executor, produce)
# Wait for the delivery callback
try:
result = await asyncio.wait_for(future, timeout=10.0)
logger.debug("Event sent successfully", **result)
return result
except asyncio.TimeoutError:
logger.error("Event delivery timeout", topic=topic, key=key)
raise EventPublishingException(f"Timeout sending event to {topic}")
async def send_batch_async(self, events: list[tuple[str, Any, Optional[str]]]) -> list[dict]:
"""Send multiple events concurrently"""
tasks = [
self.send_event_async(topic, value, key)
for topic, value, key in events
]
return await asyncio.gather(*tasks, return_exceptions=True)
async def flush_async(self, timeout: float = 10.0):
"""Flush producer in thread pool"""
loop = asyncio.get_event_loop()
def flush():
return self.producer.flush(timeout)
await loop.run_in_executor(self.executor, flush)
async def close(self):
"""Close producer and cleanup resources"""
await self.flush_async()
self.executor.shutdown(wait=True)
logger.info("Confluent async producer closed")# producer/connection_pool.py
# producer/connection_pool.py
import asyncio
from typing import Dict, Any
from contextlib import asynccontextmanager
import structlog
# AsyncEventProducer is defined in producer/async_producer.py above (assumed import path)
from producer.async_producer import AsyncEventProducer
logger = structlog.get_logger()
class AsyncProducerPool:
def __init__(self, config: Dict[str, Any], pool_size: int = 5):
self.config = config
self.pool_size = pool_size
self._pool: asyncio.Queue = asyncio.Queue(maxsize=pool_size)
self._initialized = False
async def initialize(self):
"""Initialize the producer pool"""
if self._initialized:
return
for _ in range(self.pool_size):
producer = AsyncEventProducer(self.config['bootstrap_servers'])
await producer.start()
await self._pool.put(producer)
self._initialized = True
logger.info("Producer pool initialized", pool_size=self.pool_size)
@asynccontextmanager
async def get_producer(self):
"""Get a producer from the pool"""
if not self._initialized:
await self.initialize()
producer = await self._pool.get()
try:
yield producer
finally:
await self._pool.put(producer)
async def close_all(self):
"""Close all producers in the pool"""
producers = []
while not self._pool.empty():
producers.append(await self._pool.get())
for producer in producers:
await producer.stop()
logger.info("All producers in pool closed")Python Event-Produktion erfordert robuste Fehlerbehandlung für verschiedene Fehlertypen: Netzwerk-Issues, Serialisierung, Kafka-Broker-Probleme und Application-Level Errors.
Python event production requires robust error handling for the different failure classes involved: network issues, serialization errors, Kafka broker problems, and application-level errors.
# producer/error_handling.py
import asyncio
import time
from typing import Any, Optional, Callable, Dict
from enum import Enum
import structlog
# AsyncEventProducer is defined in producer/async_producer.py above (assumed import path)
from producer.async_producer import AsyncEventProducer
logger = structlog.get_logger()
class ErrorType(Enum):
NETWORK_ERROR = "network_error"
SERIALIZATION_ERROR = "serialization_error"
KAFKA_ERROR = "kafka_error"
TIMEOUT_ERROR = "timeout_error"
CONFIGURATION_ERROR = "configuration_error"
class EventPublishingError(Exception):
def __init__(self, message: str, error_type: ErrorType, original_error: Optional[Exception] = None):
super().__init__(message)
self.error_type = error_type
self.original_error = original_error
class RetryPolicy:
def __init__(self, max_attempts: int = 3, base_delay: float = 1.0,
max_delay: float = 60.0, backoff_factor: float = 2.0):
self.max_attempts = max_attempts
self.base_delay = base_delay
self.max_delay = max_delay
self.backoff_factor = backoff_factor
def calculate_delay(self, attempt: int) -> float:
"""Calculate exponential backoff delay"""
delay = self.base_delay * (self.backoff_factor ** (attempt - 1))
return min(delay, self.max_delay)
class RobustEventProducer:
    def __init__(self, producer: AsyncEventProducer, retry_policy: Optional[RetryPolicy] = None):
self.producer = producer
self.retry_policy = retry_policy or RetryPolicy()
self.metrics = EventMetrics()
async def send_with_retry(self, topic: str, event: Any, key: Optional[str] = None) -> dict:
"""Send event with comprehensive retry logic"""
last_error = None
for attempt in range(1, self.retry_policy.max_attempts + 1):
try:
start_time = time.time()
result = await self.producer.send_event(topic, event, key)
# Record success metrics
self.metrics.record_success(topic, time.time() - start_time)
return result
except Exception as e:
last_error = e
error_type = self._classify_error(e)
self.metrics.record_error(topic, error_type)
logger.warning(
"Event publishing attempt failed",
topic=topic, key=key, attempt=attempt,
max_attempts=self.retry_policy.max_attempts,
error_type=error_type.value, error=str(e)
)
# Don't retry non-retryable errors
if not self._is_retryable_error(error_type):
await self._handle_non_retryable_error(topic, event, key, e, error_type)
raise EventPublishingError(
f"Non-retryable error: {e}", error_type, e
)
# Wait before retry (except on last attempt)
if attempt < self.retry_policy.max_attempts:
delay = self.retry_policy.calculate_delay(attempt)
await asyncio.sleep(delay)
# All retries exhausted
await self._handle_retry_exhausted(topic, event, key, last_error)
raise EventPublishingError(
f"All {self.retry_policy.max_attempts} retry attempts failed",
self._classify_error(last_error), last_error
)
def _classify_error(self, error: Exception) -> ErrorType:
"""Classify error type for appropriate handling"""
error_str = str(error).lower()
if 'timeout' in error_str or isinstance(error, asyncio.TimeoutError):
return ErrorType.TIMEOUT_ERROR
elif 'network' in error_str or 'connection' in error_str:
return ErrorType.NETWORK_ERROR
elif 'serializ' in error_str or 'json' in error_str:
return ErrorType.SERIALIZATION_ERROR
elif 'kafka' in error_str:
return ErrorType.KAFKA_ERROR
else:
return ErrorType.CONFIGURATION_ERROR
def _is_retryable_error(self, error_type: ErrorType) -> bool:
"""Determine if error type is retryable"""
retryable_errors = {
ErrorType.NETWORK_ERROR,
ErrorType.KAFKA_ERROR,
ErrorType.TIMEOUT_ERROR
}
return error_type in retryable_errors
async def _handle_non_retryable_error(self, topic: str, event: Any, key: Optional[str],
error: Exception, error_type: ErrorType):
"""Handle non-retryable errors (e.g., send to dead letter)"""
logger.error(
"Non-retryable error occurred",
topic=topic, key=key, error_type=error_type.value, error=str(error)
)
# Send to dead letter topic
await self._send_to_dead_letter(topic, event, key, error)
async def _handle_retry_exhausted(self, topic: str, event: Any, key: Optional[str],
last_error: Exception):
"""Handle case when all retries are exhausted"""
logger.error(
"All retry attempts exhausted",
topic=topic, key=key, error=str(last_error)
)
# Send to dead letter topic
await self._send_to_dead_letter(topic, event, key, last_error)
async def _send_to_dead_letter(self, original_topic: str, event: Any,
key: Optional[str], error: Exception):
"""Send failed event to dead letter topic"""
dead_letter_topic = f"{original_topic}.dead-letter"
dead_letter_event = {
'original_topic': original_topic,
'original_key': key,
'original_event': event,
'error_message': str(error),
'error_type': self._classify_error(error).value,
'timestamp': time.time()
}
try:
await self.producer.send_event_fire_and_forget(
dead_letter_topic, dead_letter_event, key
)
logger.info("Event sent to dead letter topic",
original_topic=original_topic, dead_letter_topic=dead_letter_topic)
except Exception as dle:
logger.error("Failed to send to dead letter topic",
original_topic=original_topic, error=str(dle))
class EventMetrics:
def __init__(self):
self.success_count: Dict[str, int] = {}
self.error_count: Dict[str, Dict[str, int]] = {}
self.latency_sum: Dict[str, float] = {}
self.latency_count: Dict[str, int] = {}
def record_success(self, topic: str, latency: float):
self.success_count[topic] = self.success_count.get(topic, 0) + 1
self.latency_sum[topic] = self.latency_sum.get(topic, 0) + latency
self.latency_count[topic] = self.latency_count.get(topic, 0) + 1
def record_error(self, topic: str, error_type: ErrorType):
if topic not in self.error_count:
self.error_count[topic] = {}
error_type_str = error_type.value
self.error_count[topic][error_type_str] = \
self.error_count[topic].get(error_type_str, 0) + 1
def get_average_latency(self, topic: str) -> float:
if topic not in self.latency_count or self.latency_count[topic] == 0:
return 0.0
return self.latency_sum[topic] / self.latency_count[topic]
def get_error_rate(self, topic: str) -> float:
total_errors = sum(self.error_count.get(topic, {}).values())
total_success = self.success_count.get(topic, 0)
total_attempts = total_errors + total_success
        return total_errors / total_attempts if total_attempts > 0 else 0.0
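A usage sketch of the retry wrapper, combining it with the AsyncEventProducer shown earlier:
# Usage sketch: retries plus metrics around the async producer
import asyncio

async def main():
    base_producer = AsyncEventProducer("localhost:9092")
    robust = RobustEventProducer(base_producer,
                                 RetryPolicy(max_attempts=5, base_delay=0.5))
    try:
        await robust.send_with_retry("order.placed.v1",
                                     {"order_id": "order-123", "status": "placed"},
                                     key="order-123")
    finally:
        await base_producer.stop()
    print("error rate:", robust.metrics.get_error_rate("order.placed.v1"))

asyncio.run(main())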
The following examples show production-ready implementations of the e-commerce scenario using modern Python patterns.
# services/order_event_publisher.py
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional
import uuid
import structlog
# Assumed import paths, following the file headers earlier in this chapter
from config.topic_config import TopicConfig
from producer.error_handling import RobustEventProducer, EventPublishingError
logger = structlog.get_logger()
@dataclass
class OrderItem:
product_id: str
quantity: int
price: float
@dataclass
class Address:
street: str
city: str
postal_code: str
country: str
@dataclass
class OrderPlacedEvent:
event_id: str
event_type: str
timestamp: str
version: str
order_id: str
customer_id: str
items: List[OrderItem]
total_amount: float
currency: str
shipping_address: Address
@classmethod
def from_order(cls, order: 'Order') -> 'OrderPlacedEvent':
return cls(
event_id=str(uuid.uuid4()),
event_type="OrderPlaced",
timestamp=datetime.utcnow().isoformat(),
version="v1",
order_id=order.order_id,
customer_id=order.customer_id,
items=[OrderItem(
product_id=item.product_id,
quantity=item.quantity,
price=item.price
) for item in order.items],
total_amount=order.total_amount,
currency=order.currency,
shipping_address=Address(
street=order.shipping_address.street,
city=order.shipping_address.city,
postal_code=order.shipping_address.postal_code,
country=order.shipping_address.country
)
)
@dataclass
class OrderCancelledEvent:
event_id: str
event_type: str
timestamp: str
version: str
order_id: str
customer_id: str
reason: str
original_amount: float
@classmethod
def from_order(cls, order: 'Order', reason: str) -> 'OrderCancelledEvent':
return cls(
event_id=str(uuid.uuid4()),
event_type="OrderCancelled",
timestamp=datetime.utcnow().isoformat(),
version="v1",
order_id=order.order_id,
customer_id=order.customer_id,
reason=reason,
original_amount=order.total_amount
)
class OrderEventPublisher:
def __init__(self, producer: RobustEventProducer, topic_config: TopicConfig):
self.producer = producer
self.topic_config = topic_config
self.logger = logger.bind(component="OrderEventPublisher")
async def publish_order_placed(self, order: 'Order') -> dict:
"""Publish OrderPlaced event with full error handling"""
event = OrderPlacedEvent.from_order(order)
self.logger.info(
"Publishing OrderPlaced event",
order_id=order.order_id,
customer_id=order.customer_id,
event_id=event.event_id
)
try:
result = await self.producer.send_with_retry(
topic=self.topic_config.order_placed,
event=asdict(event),
key=order.order_id
)
self.logger.info(
"OrderPlaced event published successfully",
order_id=order.order_id,
event_id=event.event_id,
offset=result.get('offset')
)
return result
except EventPublishingError as e:
self.logger.error(
"Failed to publish OrderPlaced event",
order_id=order.order_id,
event_id=event.event_id,
error=str(e)
)
raise
async def publish_order_cancelled(self, order: 'Order', reason: str) -> dict:
"""Publish OrderCancelled event"""
event = OrderCancelledEvent.from_order(order, reason)
self.logger.info(
"Publishing OrderCancelled event",
order_id=order.order_id,
reason=reason,
event_id=event.event_id
)
try:
result = await self.producer.send_with_retry(
topic=self.topic_config.order_cancelled,
event=asdict(event),
key=order.order_id
)
self.logger.info(
"OrderCancelled event published successfully",
order_id=order.order_id,
event_id=event.event_id
)
return result
except EventPublishingError as e:
self.logger.error(
"Failed to publish OrderCancelled event",
order_id=order.order_id,
error=str(e)
)
raise
async def publish_batch_events(self, events: List[tuple['Order', str]]) -> List[dict]:
"""Publish multiple order events in batch"""
batch_events = []
for order, event_type in events:
if event_type == "OrderPlaced":
event = OrderPlacedEvent.from_order(order)
topic = self.topic_config.order_placed
elif event_type == "OrderCancelled":
event = OrderCancelledEvent.from_order(order, "batch_cancellation")
topic = self.topic_config.order_cancelled
else:
continue
batch_events.append((topic, asdict(event), order.order_id))
self.logger.info("Publishing batch events", batch_size=len(batch_events))
        # send_batch lives on the wrapped AsyncEventProducer, hence producer.producer
        return await self.producer.producer.send_batch(batch_events)
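Building a publisher from the pieces above could look like this (the Order domain object itself is not defined in this chapter):
# Wiring sketch for the OrderEventPublisher, reusing the classes defined above
async def build_publisher() -> OrderEventPublisher:
    base_producer = AsyncEventProducer("localhost:9092")
    await base_producer.start()
    robust = RobustEventProducer(base_producer, RetryPolicy())
    return OrderEventPublisher(robust, TopicConfig.from_env())

# later, with a domain `order` object:
#     result = await publisher.publish_order_placed(order)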
# services/order_service.py
import asyncio
from contextlib import asynccontextmanager
from typing import Optional
import structlog
# Assumed import paths, following the file headers above
from services.order_event_publisher import OrderEventPublisher
from producer.error_handling import EventPublishingError
logger = structlog.get_logger()
class TransactionalOrderService:
def __init__(self, order_repository: 'OrderRepository',
event_publisher: OrderEventPublisher,
db_session_factory):
self.order_repository = order_repository
self.event_publisher = event_publisher
self.db_session_factory = db_session_factory
self.logger = logger.bind(component="TransactionalOrderService")
@asynccontextmanager
async def database_transaction(self):
"""Database transaction context manager"""
session = await self.db_session_factory.create_session()
try:
await session.begin()
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def create_order_with_events(self, request: 'CreateOrderRequest') -> 'Order':
"""Create order with transactional event publishing"""
self.logger.info("Creating order with events", customer_id=request.customer_id)
# Use database transaction
async with self.database_transaction() as session:
try:
# 1. Create and save order
order = Order.from_request(request)
await self.order_repository.save(order, session)
                # 2. Publish events inside the transaction so a publishing failure rolls back the order
await self.event_publisher.publish_order_placed(order)
# 3. Publish additional events if needed
if order.requires_inventory_check():
await self._publish_inventory_reservation_request(order)
if order.requires_payment_processing():
await self._publish_payment_request(order)
self.logger.info(
"Order created successfully with events",
order_id=order.order_id,
total_amount=order.total_amount
)
return order
except EventPublishingError as e:
self.logger.error(
"Event publishing failed, order creation aborted",
error=str(e)
)
# Re-raise to trigger transaction rollback
raise
except Exception as e:
self.logger.error(
"Order creation failed",
error=str(e)
)
raise
async def cancel_order_with_compensation(self, order_id: str, reason: str) -> bool:
"""Cancel order with compensation events"""
self.logger.info("Cancelling order", order_id=order_id, reason=reason)
async with self.database_transaction() as session:
try:
# 1. Load and validate order
order = await self.order_repository.find_by_id(order_id, session)
if not order:
raise ValueError(f"Order not found: {order_id}")
if not order.is_cancellable():
raise ValueError(f"Order cannot be cancelled: {order_id}")
# 2. Update order status
order.cancel(reason)
await self.order_repository.save(order, session)
# 3. Publish cancellation event
await self.event_publisher.publish_order_cancelled(order, reason)
# 4. Publish compensation events
await self._publish_compensation_events(order)
self.logger.info("Order cancelled successfully", order_id=order_id)
return True
except Exception as e:
self.logger.error("Order cancellation failed",
order_id=order_id, error=str(e))
raise
async def _publish_inventory_reservation_request(self, order: 'Order'):
"""Publish inventory reservation request event"""
# Implementation for inventory event publishing
pass
async def _publish_payment_request(self, order: 'Order'):
"""Publish payment request event"""
# Implementation for payment event publishing
pass
async def _publish_compensation_events(self, order: 'Order'):
"""Publish compensation events for cancelled order"""
# Implementation for compensation events
        pass
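A highly simplified wiring sketch; the repository, session factory, and request types belong to the application layer and are only referenced by name here:
# Wiring sketch (OrderRepository, the session factory and CreateOrderRequest are
# application-specific and not defined in this chapter)
async def handle_create_order(request: "CreateOrderRequest",
                              repository: "OrderRepository",
                              session_factory,
                              publisher: OrderEventPublisher) -> "Order":
    service = TransactionalOrderService(repository, publisher, session_factory)
    return await service.create_order_with_events(request)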
# services/production_event_service.py
import asyncio
import time
from dataclasses import asdict
from typing import Dict, Any, Optional
from prometheus_client import Counter, Histogram, Gauge
import structlog
# Assumed import paths, following the file headers above
from config.topic_config import TopicConfig
from producer.connection_pool import AsyncProducerPool
from producer.error_handling import EventPublishingError, ErrorType
from services.order_event_publisher import OrderPlacedEvent, OrderCancelledEvent
# Prometheus metrics
EVENT_COUNTER = Counter('events_published_total',
'Total events published', ['topic', 'status'])
EVENT_LATENCY = Histogram('event_publishing_duration_seconds',
'Event publishing latency', ['topic'])
ACTIVE_CONNECTIONS = Gauge('kafka_active_connections',
'Active Kafka connections')
logger = structlog.get_logger()
class ProductionEventService:
def __init__(self, producer_pool: AsyncProducerPool,
topic_config: TopicConfig,
circuit_breaker: Optional['CircuitBreaker'] = None):
self.producer_pool = producer_pool
self.topic_config = topic_config
self.circuit_breaker = circuit_breaker
self.logger = logger.bind(component="ProductionEventService")
async def publish_order_event(self, event_type: str, order: 'Order',
**kwargs) -> dict:
"""Publish order event with full production monitoring"""
start_time = time.time()
topic = self._get_topic_for_event_type(event_type)
# Circuit breaker check
if self.circuit_breaker and not self.circuit_breaker.can_execute():
EVENT_COUNTER.labels(topic=topic, status='circuit_open').inc()
raise EventPublishingError("Circuit breaker is open",
ErrorType.KAFKA_ERROR)
try:
async with self.producer_pool.get_producer() as producer:
ACTIVE_CONNECTIONS.inc()
try:
# Create event based on type
event_data = self._create_event_data(event_type, order, **kwargs)
# Publish with monitoring
with EVENT_LATENCY.labels(topic=topic).time():
result = await producer.send_event(
topic=topic,
event=event_data,
key=order.order_id
)
# Record success
EVENT_COUNTER.labels(topic=topic, status='success').inc()
if self.circuit_breaker:
self.circuit_breaker.record_success()
duration = time.time() - start_time
self.logger.info(
"Event published successfully",
event_type=event_type,
order_id=order.order_id,
topic=topic,
duration=duration,
offset=result.get('offset')
)
return result
finally:
ACTIVE_CONNECTIONS.dec()
except Exception as e:
EVENT_COUNTER.labels(topic=topic, status='error').inc()
if self.circuit_breaker:
self.circuit_breaker.record_failure()
self.logger.error(
"Event publishing failed",
event_type=event_type,
order_id=order.order_id,
topic=topic,
error=str(e)
)
raise
def _get_topic_for_event_type(self, event_type: str) -> str:
"""Map event type to topic"""
topic_mapping = {
'OrderPlaced': self.topic_config.order_placed,
'OrderCancelled': self.topic_config.order_cancelled,
'PaymentProcessed': self.topic_config.payment_processed,
'InventoryReserved': self.topic_config.inventory_reserved,
'ShippingRequested': self.topic_config.shipping_requested,
}
if event_type not in topic_mapping:
raise ValueError(f"Unknown event type: {event_type}")
return topic_mapping[event_type]
def _create_event_data(self, event_type: str, order: 'Order', **kwargs) -> dict:
"""Create event data based on event type"""
if event_type == 'OrderPlaced':
return asdict(OrderPlacedEvent.from_order(order))
elif event_type == 'OrderCancelled':
reason = kwargs.get('reason', 'Unknown')
return asdict(OrderCancelledEvent.from_order(order, reason))
else:
raise ValueError(f"Unsupported event type for data creation: {event_type}")
async def health_check(self) -> Dict[str, Any]:
"""Health check for the event service"""
try:
async with self.producer_pool.get_producer() as producer:
# Try to send a test event
test_result = await asyncio.wait_for(
producer.send_event_fire_and_forget(
"health-check", {"timestamp": time.time()}, "health"
),
timeout=5.0
)
return {
"status": "healthy",
"producer_pool_size": self.producer_pool.pool_size,
"circuit_breaker_state": self.circuit_breaker.state.name if self.circuit_breaker else "disabled"
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e)
            }
These Python implementations demonstrate modern, production-ready patterns for event-driven architecture. They use AsyncIO for high performance, comprehensive error handling for robustness, and monitoring for observability. The consistent use of type hints and structured logging improves maintainability and debugging.