23 Event Production with Python

Python offers several mature libraries for Kafka integration, the most popular being confluent-kafka-python and kafka-python. Modern Python applications use AsyncIO for high-performance event production, and the language's flexibility enables elegant abstractions and robust error handling.

[Figure: Python producer architecture (python_producer_arch.svg)]

23.1 Producer Configuration

Python Kafka clients are configured explicitly through dictionary-based settings. The choice between confluent-kafka-python (built on librdkafka) and kafka-python (pure Python) depends on performance requirements and deployment constraints.
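
The difference is visible already in a minimal producer. A short sketch, assuming a broker on localhost and an illustrative topic name: confluent-kafka takes a plain dict with librdkafka-style dotted keys, while kafka-python takes snake_case keyword arguments.

# Minimal sketch - broker address and topic name are illustrative
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',  # librdkafka dotted keys
    'client.id': 'quickstart-producer',
})
producer.produce('demo-topic', key='k1', value=b'hello')
producer.flush(10)  # block until outstanding messages are delivered

# kafka-python uses snake_case keyword arguments instead
from kafka import KafkaProducer

kp = KafkaProducer(bootstrap_servers=['localhost:9092'],
                   client_id='quickstart-producer')
kp.send('demo-topic', key=b'k1', value=b'hello').get(timeout=10)
kp.close()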

23.1.1 Confluent Kafka Python Configuration

confluent-kafka-python delivers the best performance thanks to the underlying librdkafka C library and supports all modern Kafka features.

# requirements.txt
confluent-kafka==2.3.0
aiokafka==0.8.11
pydantic==2.5.0
structlog==23.2.0

# config/kafka_config.py
from dataclasses import dataclass
from typing import Dict, Any, Optional
import os

@dataclass
class KafkaProducerConfig:
    bootstrap_servers: str = "localhost:9092"
    client_id: str = "order-service-python"
    acks: str = "all"
    retries: int = 3
    batch_size: int = 16384
    linger_ms: int = 5
    buffer_memory: int = 33554432
    enable_idempotence: bool = True
    compression_type: str = "snappy"
    
    # Security
    security_protocol: str = "PLAINTEXT"
    sasl_mechanism: Optional[str] = None
    sasl_username: Optional[str] = None
    sasl_password: Optional[str] = None
    
    # Transactions
    transactional_id: Optional[str] = None
    transaction_timeout_ms: int = 60000
    
    @classmethod
    def from_env(cls) -> 'KafkaProducerConfig':
        return cls(
            bootstrap_servers=os.getenv('KAFKA_BOOTSTRAP_SERVERS', cls.bootstrap_servers),
            client_id=os.getenv('KAFKA_CLIENT_ID', cls.client_id),
            acks=os.getenv('KAFKA_ACKS', cls.acks),
            enable_idempotence=os.getenv('KAFKA_ENABLE_IDEMPOTENCE', 'true').lower() == 'true',
            compression_type=os.getenv('KAFKA_COMPRESSION_TYPE', cls.compression_type),
            transactional_id=os.getenv('KAFKA_TRANSACTIONAL_ID'),
        )
    
    def to_producer_config(self) -> Dict[str, Any]:
        config = {
            'bootstrap.servers': self.bootstrap_servers,
            'client.id': self.client_id,
            'acks': self.acks,
            'retries': self.retries,
            'batch.size': self.batch_size,
            'linger.ms': self.linger_ms,
            # librdkafka has no 'buffer.memory'; the closest equivalent is
            # 'queue.buffering.max.kbytes' (the Java-client name is rejected)
            'queue.buffering.max.kbytes': self.buffer_memory // 1024,
            'enable.idempotence': self.enable_idempotence,
            'compression.type': self.compression_type,
            'security.protocol': self.security_protocol,
        }
        
        # Optional configurations
        if self.sasl_mechanism:
            config['sasl.mechanism'] = self.sasl_mechanism
            config['sasl.username'] = self.sasl_username
            config['sasl.password'] = self.sasl_password
        
        if self.transactional_id:
            config['transactional.id'] = self.transactional_id
            config['transaction.timeout.ms'] = self.transaction_timeout_ms
        
        return {k: v for k, v in config.items() if v is not None}
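
A short usage sketch for this configuration class; the wiring below is an assumption, not part of the module itself:

# Hypothetical wiring: environment-driven config feeding a Producer
from confluent_kafka import Producer
from config.kafka_config import KafkaProducerConfig

config = KafkaProducerConfig.from_env()
producer = Producer(config.to_producer_config())
producer.produce('order.placed.v1', key='order-42',
                 value=b'{"order_id": "order-42"}')
producer.flush(10)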

# config/topic_config.py
from dataclasses import dataclass
import os

@dataclass
class TopicConfig:
    order_placed: str = "order.placed.v1"
    order_cancelled: str = "order.cancelled.v1"
    payment_processed: str = "payment.processed.v1"
    inventory_reserved: str = "inventory.reserved.v1"
    shipping_requested: str = "shipping.requested.v1"
    
    @classmethod
    def from_env(cls) -> 'TopicConfig':
        return cls(
            order_placed=os.getenv('TOPIC_ORDER_PLACED', cls.order_placed),
            order_cancelled=os.getenv('TOPIC_ORDER_CANCELLED', cls.order_cancelled),
            payment_processed=os.getenv('TOPIC_PAYMENT_PROCESSED', cls.payment_processed),
            inventory_reserved=os.getenv('TOPIC_INVENTORY_RESERVED', cls.inventory_reserved),
            shipping_requested=os.getenv('TOPIC_SHIPPING_REQUESTED', cls.shipping_requested),
        )

23.1.2 Kafka-Python Configuration

kafka-python is a pure-Python implementation that is easier to deploy but delivers lower throughput.

# Kafka-Python Configuration Alternative
from kafka import KafkaProducer
import json
from typing import Dict, Any

class KafkaPythonConfig:
    def __init__(self):
        self.bootstrap_servers = ['localhost:9092']
        self.value_serializer = lambda v: json.dumps(v).encode('utf-8')
        self.key_serializer = lambda k: k.encode('utf-8') if k else None
        self.acks = 'all'
        self.retries = 3
        self.batch_size = 16384
        self.linger_ms = 5
        self.buffer_memory = 33554432
        self.compression_type = 'snappy'
        # Note: enable_idempotence is only accepted by recent kafka-python
        # releases; older versions (<= 2.0.x) reject unknown options
        self.enable_idempotence = True
    
    def create_producer(self) -> KafkaProducer:
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=self.value_serializer,
            key_serializer=self.key_serializer,
            acks=self.acks,
            retries=self.retries,
            batch_size=self.batch_size,
            linger_ms=self.linger_ms,
            buffer_memory=self.buffer_memory,
            compression_type=self.compression_type,
            enable_idempotence=self.enable_idempotence
        )
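
A hedged usage sketch: kafka-python's send() returns a future, so even this synchronous client can confirm delivery per message by blocking on it. The order payload below is illustrative.

# Hypothetical usage of KafkaPythonConfig
config = KafkaPythonConfig()
producer = config.create_producer()

future = producer.send('order.placed.v1', key='order-42',
                       value={'order_id': 'order-42', 'total': 99.90})
metadata = future.get(timeout=10)  # blocks until the broker acknowledges
print(metadata.topic, metadata.partition, metadata.offset)

producer.flush()
producer.close()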

23.1.3 Factory Pattern for Producer Management

# producer/factory.py
from abc import ABC, abstractmethod
from confluent_kafka import Producer
from kafka import KafkaProducer
import structlog

# config classes from the sections above (module paths assumed)
from config.kafka_config import KafkaProducerConfig
from config.kafka_python_config import KafkaPythonConfig

logger = structlog.get_logger()

class ProducerFactory(ABC):
    @abstractmethod
    def create_producer(self):
        pass
    
    @abstractmethod
    def get_producer_type(self) -> str:
        pass

class ConfluentProducerFactory(ProducerFactory):
    def __init__(self, config: KafkaProducerConfig):
        self.config = config
    
    def create_producer(self) -> Producer:
        producer_config = self.config.to_producer_config()
        logger.info("Creating Confluent Kafka producer", config=producer_config)
        return Producer(producer_config)
    
    def get_producer_type(self) -> str:
        return "confluent-kafka"

class KafkaPythonProducerFactory(ProducerFactory):
    def __init__(self, config: KafkaPythonConfig):
        self.config = config
    
    def create_producer(self) -> KafkaProducer:
        logger.info("Creating kafka-python producer")
        return self.config.create_producer()
    
    def get_producer_type(self) -> str:
        return "kafka-python"

# Dependency Injection Container
class ProducerContainer:
    def __init__(self, factory: ProducerFactory):
        self._factory = factory
        self._producer = None
    
    @property
    def producer(self):
        if self._producer is None:
            self._producer = self._factory.create_producer()
        return self._producer
    
    def close(self):
        if self._producer:
            if hasattr(self._producer, 'flush'):
                self._producer.flush()
            if hasattr(self._producer, 'close'):
                self._producer.close()
            self._producer = None
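
A brief composition sketch (topic and payload are illustrative): the container creates the producer lazily on first access and flushes it on close.

# Hypothetical composition root
config = KafkaProducerConfig.from_env()
container = ProducerContainer(ConfluentProducerFactory(config))

producer = container.producer  # created on first access
producer.produce('order.placed.v1', key='order-42', value=b'{}')
container.close()              # flushes and releases the client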

23.2 Async/Await Patterns

Modern Python applications use AsyncIO for non-blocking event production. This calls for dedicated patterns and libraries such as aiokafka, which provides native AsyncIO support.

23.2.1 AsyncIO with aiokafka

# producer/async_producer.py
import asyncio
import json
from typing import Any, Optional
from dataclasses import asdict

from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
import structlog

logger = structlog.get_logger()

class AsyncEventProducer:
    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self.bootstrap_servers = bootstrap_servers
        self.producer: Optional[AIOKafkaProducer] = None
        self._started = False
    
    async def start(self):
        """Initialize and start the async producer"""
        if self._started:
            return
        
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=self._serialize_value,
            key_serializer=self._serialize_key,
            acks='all',
            enable_idempotence=True,
            max_batch_size=16384,
            linger_ms=5,
            compression_type='snappy'
        )
        
        await self.producer.start()
        self._started = True
        logger.info("Async Kafka producer started")
    
    async def stop(self):
        """Stop and cleanup the producer"""
        if not self._started or not self.producer:
            return
        
        await self.producer.stop()
        self._started = False
        logger.info("Async Kafka producer stopped")
    
    async def send_event(self, topic: str, event: Any, key: Optional[str] = None) -> dict:
        """Send event asynchronously with proper error handling"""
        if not self._started:
            await self.start()
        
        try:
            # Send and await the result
            record_metadata = await self.producer.send_and_wait(
                topic=topic,
                value=event,
                key=key
            )
            
            result = {
                'topic': record_metadata.topic,
                'partition': record_metadata.partition,
                'offset': record_metadata.offset,
                'timestamp': record_metadata.timestamp,
                'success': True
            }
            
            logger.debug("Event sent successfully", **result)
            return result
            
        except KafkaError as e:
            logger.error("Failed to send event", topic=topic, key=key, error=str(e))
            raise EventPublishingException(f"Failed to send event to {topic}") from e
    
    async def send_event_fire_and_forget(self, topic: str, event: Any, 
                                       key: Optional[str] = None):
        """Send event without waiting for confirmation (highest performance)"""
        if not self._started:
            await self.start()
        
        try:
            # send() only awaits enqueueing into the producer's batch;
            # broker acknowledgement happens in the background
            await self.producer.send(topic=topic, value=event, key=key)
            logger.debug("Event sent (fire and forget)", topic=topic, key=key)
        except Exception as e:
            logger.error("Failed to send event (fire and forget)", 
                        topic=topic, key=key, error=str(e))
    
    async def send_batch(self, events: list[tuple[str, Any, Optional[str]]]) -> list[dict]:
        """Send multiple events in batch for better performance"""
        if not self._started:
            await self.start()
        
        tasks = []
        for topic, event, key in events:
            task = self.send_event(topic, event, key)
            tasks.append(task)
        
        try:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return [r if not isinstance(r, Exception) else {'success': False, 'error': str(r)} 
                   for r in results]
        except Exception as e:
            logger.error("Batch send failed", error=str(e))
            raise
    
    def _serialize_value(self, value: Any) -> bytes:
        """Serialize event value to JSON bytes"""
        if hasattr(value, '__dict__'):
            value = asdict(value) if hasattr(value, '__dataclass_fields__') else value.__dict__
        return json.dumps(value, default=str).encode('utf-8')
    
    def _serialize_key(self, key: Optional[str]) -> Optional[bytes]:
        """Serialize key to bytes (None keys stay None)"""
        return key.encode('utf-8') if key else None
    
    async def __aenter__(self):
        await self.start()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()

class EventPublishingException(Exception):
    pass
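
A usage sketch for the class above, assuming a local broker; the async context manager starts the producer on entry and stops it (flushing pending messages) on exit.

# Hypothetical usage of AsyncEventProducer
import asyncio

async def main():
    async with AsyncEventProducer("localhost:9092") as producer:
        result = await producer.send_event(
            topic="order.placed.v1",
            event={"order_id": "order-42", "total_amount": 99.90},
            key="order-42",
        )
        print(result["partition"], result["offset"])

asyncio.run(main())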

23.2.2 Confluent Kafka with AsyncIO

# producer/confluent_async_producer.py
import asyncio
import concurrent.futures
import json
from typing import Any, Dict, Optional
from confluent_kafka import Producer, Message, KafkaError, KafkaException
import structlog

from producer.async_producer import EventPublishingException

logger = structlog.get_logger()

class ConfluentAsyncProducer:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.producer = Producer(config)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    
    async def send_event_async(self, topic: str, value: Any, key: Optional[str] = None) -> dict:
        """Send event asynchronously using a thread pool"""
        
        # Create a future for the delivery callback; it is resolved from the
        # executor thread, so results must be marshalled back to the loop
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        
        def _resolve(result: dict):
            if not future.done():
                future.set_result(result)
        
        def _fail(exc: Exception):
            if not future.done():
                future.set_exception(exc)
        
        def delivery_callback(err: Optional[KafkaError], msg: Message):
            # Runs on the executor thread via poll(); hand over thread-safely
            if err is not None:
                loop.call_soon_threadsafe(_fail, KafkaException(err))
            else:
                loop.call_soon_threadsafe(_resolve, {
                    'topic': msg.topic(),
                    'partition': msg.partition(),
                    'offset': msg.offset(),
                    'timestamp': msg.timestamp(),
                    'success': True
                })
        
        # Send in thread pool to avoid blocking; keep polling so the
        # delivery report can actually fire
        def produce():
            try:
                self.producer.produce(
                    topic=topic,
                    value=json.dumps(value) if not isinstance(value, (str, bytes)) else value,
                    key=key,
                    callback=delivery_callback
                )
                while not future.done():
                    self.producer.poll(0.1)  # serve delivery callbacks
            except Exception as e:
                loop.call_soon_threadsafe(_fail, e)
        
        producer_task = loop.run_in_executor(self.executor, produce)
        
        # Wait for the delivery callback
        try:
            result = await asyncio.wait_for(future, timeout=10.0)
            logger.debug("Event sent successfully", **result)
            return result
        except asyncio.TimeoutError:
            logger.error("Event delivery timeout", topic=topic, key=key)
            raise EventPublishingException(f"Timeout sending event to {topic}")
        finally:
            await producer_task  # a cancelled future stops the poll loop
    
    async def send_batch_async(self, events: list[tuple[str, Any, Optional[str]]]) -> list[dict]:
        """Send multiple events concurrently"""
        tasks = [
            self.send_event_async(topic, value, key) 
            for topic, value, key in events
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def flush_async(self, timeout: float = 10.0):
        """Flush producer in thread pool"""
        loop = asyncio.get_running_loop()
        
        def flush():
            return self.producer.flush(timeout)
        
        await loop.run_in_executor(self.executor, flush)
    
    async def close(self):
        """Close producer and cleanup resources"""
        await self.flush_async()
        self.executor.shutdown(wait=True)
        logger.info("Confluent async producer closed")

23.2.3 Context Manager and Connection Pooling

# producer/connection_pool.py
import asyncio
from typing import Dict, Any
from contextlib import asynccontextmanager
import structlog

from producer.async_producer import AsyncEventProducer

logger = structlog.get_logger()

class AsyncProducerPool:
    def __init__(self, config: Dict[str, Any], pool_size: int = 5):
        self.config = config
        self.pool_size = pool_size
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=pool_size)
        self._initialized = False
    
    async def initialize(self):
        """Initialize the producer pool"""
        if self._initialized:
            return
        
        for _ in range(self.pool_size):
            producer = AsyncEventProducer(self.config['bootstrap_servers'])
            await producer.start()
            await self._pool.put(producer)
        
        self._initialized = True
        logger.info("Producer pool initialized", pool_size=self.pool_size)
    
    @asynccontextmanager
    async def get_producer(self):
        """Get a producer from the pool"""
        if not self._initialized:
            await self.initialize()
        
        producer = await self._pool.get()
        try:
            yield producer
        finally:
            await self._pool.put(producer)
    
    async def close_all(self):
        """Close all producers in the pool"""
        producers = []
        while not self._pool.empty():
            producers.append(await self._pool.get())
        
        for producer in producers:
            await producer.stop()
        
        logger.info("All producers in pool closed")

23.3 Error Handling

Event production in Python requires robust handling of several classes of failure: network issues, serialization errors, Kafka broker problems, and application-level errors.

23.3.1 Comprehensive Error Handling

# producer/error_handling.py
import asyncio
import time
from typing import Any, Optional, Dict
from enum import Enum
import structlog

from producer.async_producer import AsyncEventProducer

logger = structlog.get_logger()

class ErrorType(Enum):
    NETWORK_ERROR = "network_error"
    SERIALIZATION_ERROR = "serialization_error"
    KAFKA_ERROR = "kafka_error"
    TIMEOUT_ERROR = "timeout_error"
    CONFIGURATION_ERROR = "configuration_error"

class EventPublishingError(Exception):
    def __init__(self, message: str, error_type: ErrorType, original_error: Optional[Exception] = None):
        super().__init__(message)
        self.error_type = error_type
        self.original_error = original_error

class RetryPolicy:
    def __init__(self, max_attempts: int = 3, base_delay: float = 1.0, 
                 max_delay: float = 60.0, backoff_factor: float = 2.0):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor
    
    def calculate_delay(self, attempt: int) -> float:
        """Calculate exponential backoff delay"""
        delay = self.base_delay * (self.backoff_factor ** (attempt - 1))
        return min(delay, self.max_delay)

class RobustEventProducer:
    def __init__(self, producer: AsyncEventProducer, retry_policy: Optional[RetryPolicy] = None):
        self.producer = producer
        self.retry_policy = retry_policy or RetryPolicy()
        self.metrics = EventMetrics()
    
    async def send_with_retry(self, topic: str, event: Any, key: Optional[str] = None) -> dict:
        """Send event with comprehensive retry logic"""
        last_error = None
        
        for attempt in range(1, self.retry_policy.max_attempts + 1):
            try:
                start_time = time.time()
                result = await self.producer.send_event(topic, event, key)
                
                # Record success metrics
                self.metrics.record_success(topic, time.time() - start_time)
                return result
                
            except Exception as e:
                last_error = e
                error_type = self._classify_error(e)
                
                self.metrics.record_error(topic, error_type)
                logger.warning(
                    "Event publishing attempt failed",
                    topic=topic, key=key, attempt=attempt, 
                    max_attempts=self.retry_policy.max_attempts,
                    error_type=error_type.value, error=str(e)
                )
                
                # Don't retry non-retryable errors
                if not self._is_retryable_error(error_type):
                    await self._handle_non_retryable_error(topic, event, key, e, error_type)
                    raise EventPublishingError(
                        f"Non-retryable error: {e}", error_type, e
                    )
                
                # Wait before retry (except on last attempt)
                if attempt < self.retry_policy.max_attempts:
                    delay = self.retry_policy.calculate_delay(attempt)
                    await asyncio.sleep(delay)
        
        # All retries exhausted
        await self._handle_retry_exhausted(topic, event, key, last_error)
        raise EventPublishingError(
            f"All {self.retry_policy.max_attempts} retry attempts failed", 
            self._classify_error(last_error), last_error
        )
    
    def _classify_error(self, error: Exception) -> ErrorType:
        """Classify error type for appropriate handling"""
        error_str = str(error).lower()
        
        if 'timeout' in error_str or isinstance(error, asyncio.TimeoutError):
            return ErrorType.TIMEOUT_ERROR
        elif 'network' in error_str or 'connection' in error_str:
            return ErrorType.NETWORK_ERROR
        elif 'serializ' in error_str or 'json' in error_str:
            return ErrorType.SERIALIZATION_ERROR
        elif 'kafka' in error_str:
            return ErrorType.KAFKA_ERROR
        else:
            return ErrorType.CONFIGURATION_ERROR
    
    def _is_retryable_error(self, error_type: ErrorType) -> bool:
        """Determine if error type is retryable"""
        retryable_errors = {
            ErrorType.NETWORK_ERROR,
            ErrorType.KAFKA_ERROR,
            ErrorType.TIMEOUT_ERROR
        }
        return error_type in retryable_errors
    
    async def _handle_non_retryable_error(self, topic: str, event: Any, key: Optional[str], 
                                        error: Exception, error_type: ErrorType):
        """Handle non-retryable errors (e.g., send to dead letter)"""
        logger.error(
            "Non-retryable error occurred",
            topic=topic, key=key, error_type=error_type.value, error=str(error)
        )
        # Send to dead letter topic
        await self._send_to_dead_letter(topic, event, key, error)
    
    async def _handle_retry_exhausted(self, topic: str, event: Any, key: Optional[str], 
                                    last_error: Exception):
        """Handle case when all retries are exhausted"""
        logger.error(
            "All retry attempts exhausted",
            topic=topic, key=key, error=str(last_error)
        )
        # Send to dead letter topic
        await self._send_to_dead_letter(topic, event, key, last_error)
    
    async def _send_to_dead_letter(self, original_topic: str, event: Any, 
                                 key: Optional[str], error: Exception):
        """Send failed event to dead letter topic"""
        dead_letter_topic = f"{original_topic}.dead-letter"
        dead_letter_event = {
            'original_topic': original_topic,
            'original_key': key,
            'original_event': event,
            'error_message': str(error),
            'error_type': self._classify_error(error).value,
            'timestamp': time.time()
        }
        
        try:
            await self.producer.send_event_fire_and_forget(
                dead_letter_topic, dead_letter_event, key
            )
            logger.info("Event sent to dead letter topic", 
                       original_topic=original_topic, dead_letter_topic=dead_letter_topic)
        except Exception as dle:
            logger.error("Failed to send to dead letter topic", 
                        original_topic=original_topic, error=str(dle))

class EventMetrics:
    def __init__(self):
        self.success_count: Dict[str, int] = {}
        self.error_count: Dict[str, Dict[str, int]] = {}
        self.latency_sum: Dict[str, float] = {}
        self.latency_count: Dict[str, int] = {}
    
    def record_success(self, topic: str, latency: float):
        self.success_count[topic] = self.success_count.get(topic, 0) + 1
        self.latency_sum[topic] = self.latency_sum.get(topic, 0) + latency
        self.latency_count[topic] = self.latency_count.get(topic, 0) + 1
    
    def record_error(self, topic: str, error_type: ErrorType):
        if topic not in self.error_count:
            self.error_count[topic] = {}
        error_type_str = error_type.value
        self.error_count[topic][error_type_str] = \
            self.error_count[topic].get(error_type_str, 0) + 1
    
    def get_average_latency(self, topic: str) -> float:
        if topic not in self.latency_count or self.latency_count[topic] == 0:
            return 0.0
        return self.latency_sum[topic] / self.latency_count[topic]
    
    def get_error_rate(self, topic: str) -> float:
        total_errors = sum(self.error_count.get(topic, {}).values())
        total_success = self.success_count.get(topic, 0)
        total_attempts = total_errors + total_success
        return total_errors / total_attempts if total_attempts > 0 else 0.0
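
A usage sketch tying the pieces together. With the default policy (base delay 1s, backoff factor 2.0), a failing send waits 1s after attempt 1 and 2s after attempt 2 before giving up; the payload below is illustrative.

# Hypothetical usage of RobustEventProducer
import asyncio

async def main():
    async with AsyncEventProducer("localhost:9092") as inner:
        robust = RobustEventProducer(inner, RetryPolicy(max_attempts=3))
        try:
            await robust.send_with_retry("order.placed.v1",
                                         {"order_id": "order-42"}, key="order-42")
        except EventPublishingError as e:
            print("giving up:", e.error_type.value)

asyncio.run(main())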

23.4 Practical Python Examples

The following examples show production-ready implementations of the e-commerce scenario using modern Python patterns.

23.4.1 Order Event Publisher Service

# services/order_event_publisher.py
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import List
import uuid
import structlog

from config.topic_config import TopicConfig
from producer.error_handling import RobustEventProducer, EventPublishingError

logger = structlog.get_logger()

@dataclass
class OrderItem:
    product_id: str
    quantity: int
    price: float

@dataclass
class Address:
    street: str
    city: str
    postal_code: str
    country: str

@dataclass
class OrderPlacedEvent:
    event_id: str
    event_type: str
    timestamp: str
    version: str
    order_id: str
    customer_id: str
    items: List[OrderItem]
    total_amount: float
    currency: str
    shipping_address: Address
    
    @classmethod
    def from_order(cls, order: 'Order') -> 'OrderPlacedEvent':
        return cls(
            event_id=str(uuid.uuid4()),
            event_type="OrderPlaced",
            timestamp=datetime.now(timezone.utc).isoformat(),
            version="v1",
            order_id=order.order_id,
            customer_id=order.customer_id,
            items=[OrderItem(
                product_id=item.product_id,
                quantity=item.quantity,
                price=item.price
            ) for item in order.items],
            total_amount=order.total_amount,
            currency=order.currency,
            shipping_address=Address(
                street=order.shipping_address.street,
                city=order.shipping_address.city,
                postal_code=order.shipping_address.postal_code,
                country=order.shipping_address.country
            )
        )

@dataclass
class OrderCancelledEvent:
    event_id: str
    event_type: str
    timestamp: str
    version: str
    order_id: str
    customer_id: str
    reason: str
    original_amount: float
    
    @classmethod
    def from_order(cls, order: 'Order', reason: str) -> 'OrderCancelledEvent':
        return cls(
            event_id=str(uuid.uuid4()),
            event_type="OrderCancelled",
            timestamp=datetime.now(timezone.utc).isoformat(),
            version="v1",
            order_id=order.order_id,
            customer_id=order.customer_id,
            reason=reason,
            original_amount=order.total_amount
        )

class OrderEventPublisher:
    def __init__(self, producer: RobustEventProducer, topic_config: TopicConfig):
        self.producer = producer
        self.topic_config = topic_config
        self.logger = logger.bind(component="OrderEventPublisher")
    
    async def publish_order_placed(self, order: 'Order') -> dict:
        """Publish OrderPlaced event with full error handling"""
        event = OrderPlacedEvent.from_order(order)
        
        self.logger.info(
            "Publishing OrderPlaced event",
            order_id=order.order_id,
            customer_id=order.customer_id,
            event_id=event.event_id
        )
        
        try:
            result = await self.producer.send_with_retry(
                topic=self.topic_config.order_placed,
                event=asdict(event),
                key=order.order_id
            )
            
            self.logger.info(
                "OrderPlaced event published successfully",
                order_id=order.order_id,
                event_id=event.event_id,
                offset=result.get('offset')
            )
            
            return result
            
        except EventPublishingError as e:
            self.logger.error(
                "Failed to publish OrderPlaced event",
                order_id=order.order_id,
                event_id=event.event_id,
                error=str(e)
            )
            raise
    
    async def publish_order_cancelled(self, order: 'Order', reason: str) -> dict:
        """Publish OrderCancelled event"""
        event = OrderCancelledEvent.from_order(order, reason)
        
        self.logger.info(
            "Publishing OrderCancelled event",
            order_id=order.order_id,
            reason=reason,
            event_id=event.event_id
        )
        
        try:
            result = await self.producer.send_with_retry(
                topic=self.topic_config.order_cancelled,
                event=asdict(event),
                key=order.order_id
            )
            
            self.logger.info(
                "OrderCancelled event published successfully",
                order_id=order.order_id,
                event_id=event.event_id
            )
            
            return result
            
        except EventPublishingError as e:
            self.logger.error(
                "Failed to publish OrderCancelled event",
                order_id=order.order_id,
                error=str(e)
            )
            raise
    
    async def publish_batch_events(self, events: List[tuple['Order', str]]) -> List[dict]:
        """Publish multiple order events in batch"""
        batch_events = []
        
        for order, event_type in events:
            if event_type == "OrderPlaced":
                event = OrderPlacedEvent.from_order(order)
                topic = self.topic_config.order_placed
            elif event_type == "OrderCancelled":
                event = OrderCancelledEvent.from_order(order, "batch_cancellation")
                topic = self.topic_config.order_cancelled
            else:
                continue
            
            batch_events.append((topic, asdict(event), order.order_id))
        
        self.logger.info("Publishing batch events", batch_size=len(batch_events))
        
        # send_batch lives on the wrapped AsyncEventProducer and bypasses
        # the per-event retry logic of RobustEventProducer
        return await self.producer.producer.send_batch(batch_events)

23.4.2 Transactional Order Service

# services/order_service.py
from contextlib import asynccontextmanager
import structlog

from services.order_event_publisher import OrderEventPublisher
from producer.error_handling import EventPublishingError

logger = structlog.get_logger()

class TransactionalOrderService:
    def __init__(self, order_repository: 'OrderRepository', 
                 event_publisher: OrderEventPublisher,
                 db_session_factory):
        self.order_repository = order_repository
        self.event_publisher = event_publisher
        self.db_session_factory = db_session_factory
        self.logger = logger.bind(component="TransactionalOrderService")
    
    @asynccontextmanager
    async def database_transaction(self):
        """Database transaction context manager"""
        session = await self.db_session_factory.create_session()
        try:
            await session.begin()
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
    
    async def create_order_with_events(self, request: 'CreateOrderRequest') -> 'Order':
        """Create order with transactional event publishing"""
        self.logger.info("Creating order with events", customer_id=request.customer_id)
        
        # Use database transaction
        async with self.database_transaction() as session:
            try:
                # 1. Create and save order
                order = Order.from_request(request)
                await self.order_repository.save(order, session)
                
                # 2. Publish events while still inside the DB transaction,
                #    so a publishing failure rolls the order back. This is
                #    not fully atomic: an already published event cannot be
                #    recalled if the commit fails later (see the Kafka
                #    transaction sketch after this section)
                await self.event_publisher.publish_order_placed(order)
                
                # 3. Publish additional events if needed
                if order.requires_inventory_check():
                    await self._publish_inventory_reservation_request(order)
                
                if order.requires_payment_processing():
                    await self._publish_payment_request(order)
                
                self.logger.info(
                    "Order created successfully with events",
                    order_id=order.order_id,
                    total_amount=order.total_amount
                )
                
                return order
                
            except EventPublishingError as e:
                self.logger.error(
                    "Event publishing failed, order creation aborted",
                    error=str(e)
                )
                # Re-raise to trigger transaction rollback
                raise
            except Exception as e:
                self.logger.error(
                    "Order creation failed",
                    error=str(e)
                )
                raise
    
    async def cancel_order_with_compensation(self, order_id: str, reason: str) -> bool:
        """Cancel order with compensation events"""
        self.logger.info("Cancelling order", order_id=order_id, reason=reason)
        
        async with self.database_transaction() as session:
            try:
                # 1. Load and validate order
                order = await self.order_repository.find_by_id(order_id, session)
                if not order:
                    raise ValueError(f"Order not found: {order_id}")
                
                if not order.is_cancellable():
                    raise ValueError(f"Order cannot be cancelled: {order_id}")
                
                # 2. Update order status
                order.cancel(reason)
                await self.order_repository.save(order, session)
                
                # 3. Publish cancellation event
                await self.event_publisher.publish_order_cancelled(order, reason)
                
                # 4. Publish compensation events
                await self._publish_compensation_events(order)
                
                self.logger.info("Order cancelled successfully", order_id=order_id)
                return True
                
            except Exception as e:
                self.logger.error("Order cancellation failed", 
                                order_id=order_id, error=str(e))
                raise
    
    async def _publish_inventory_reservation_request(self, order: 'Order'):
        """Publish inventory reservation request event"""
        # Implementation for inventory event publishing
        pass
    
    async def _publish_payment_request(self, order: 'Order'):
        """Publish payment request event"""
        # Implementation for payment event publishing
        pass
    
    async def _publish_compensation_events(self, order: 'Order'):
        """Publish compensation events for cancelled order"""
        # Implementation for compensation events
        pass
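
The service above still performs a dual write: the database commit and the event publication are two independent operations. When atomicity across several Kafka writes is required, the transactional settings from section 23.1 can be combined with confluent-kafka's transaction API. A minimal sketch, assuming a configured transactional.id (this gives Kafka-side atomicity only; spanning the database as well would require an outbox table):

# Minimal sketch of Kafka transactions with confluent-kafka;
# the transactional.id and payloads are illustrative
from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'transactional.id': 'order-service-tx-1',
    'enable.idempotence': True,
})
producer.init_transactions()

producer.begin_transaction()
try:
    producer.produce('order.placed.v1', key='order-42',
                     value=b'{"order_id": "order-42"}')
    producer.produce('inventory.reserved.v1', key='order-42',
                     value=b'{"order_id": "order-42"}')
    producer.commit_transaction()  # both messages become visible atomically
except KafkaException:
    producer.abort_transaction()   # neither message becomes visible
    raise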

23.4.3 Production-Ready Event Service with Monitoring

# services/production_event_service.py
import asyncio
import time
from dataclasses import asdict
from typing import Dict, Any, Optional
from prometheus_client import Counter, Histogram, Gauge
import structlog

from config.topic_config import TopicConfig
from producer.connection_pool import AsyncProducerPool
from producer.error_handling import EventPublishingError, ErrorType
from services.order_event_publisher import OrderPlacedEvent, OrderCancelledEvent

# Prometheus metrics
EVENT_COUNTER = Counter('events_published_total', 
                       'Total events published', ['topic', 'status'])
EVENT_LATENCY = Histogram('event_publishing_duration_seconds',
                         'Event publishing latency', ['topic'])
ACTIVE_CONNECTIONS = Gauge('kafka_active_connections',
                          'Active Kafka connections')

logger = structlog.get_logger()

class ProductionEventService:
    def __init__(self, producer_pool: AsyncProducerPool, 
                 topic_config: TopicConfig,
                 circuit_breaker: Optional['CircuitBreaker'] = None):
        self.producer_pool = producer_pool
        self.topic_config = topic_config
        self.circuit_breaker = circuit_breaker
        self.logger = logger.bind(component="ProductionEventService")
    
    async def publish_order_event(self, event_type: str, order: 'Order', 
                                 **kwargs) -> dict:
        """Publish order event with full production monitoring"""
        start_time = time.time()
        topic = self._get_topic_for_event_type(event_type)
        
        # Circuit breaker check
        if self.circuit_breaker and not self.circuit_breaker.can_execute():
            EVENT_COUNTER.labels(topic=topic, status='circuit_open').inc()
            raise EventPublishingError("Circuit breaker is open", 
                                     ErrorType.KAFKA_ERROR)
        
        try:
            async with self.producer_pool.get_producer() as producer:
                ACTIVE_CONNECTIONS.inc()
                
                try:
                    # Create event based on type
                    event_data = self._create_event_data(event_type, order, **kwargs)
                    
                    # Publish with monitoring
                    with EVENT_LATENCY.labels(topic=topic).time():
                        result = await producer.send_event(
                            topic=topic,
                            event=event_data,
                            key=order.order_id
                        )
                    
                    # Record success
                    EVENT_COUNTER.labels(topic=topic, status='success').inc()
                    
                    if self.circuit_breaker:
                        self.circuit_breaker.record_success()
                    
                    duration = time.time() - start_time
                    self.logger.info(
                        "Event published successfully",
                        event_type=event_type,
                        order_id=order.order_id,
                        topic=topic,
                        duration=duration,
                        offset=result.get('offset')
                    )
                    
                    return result
                    
                finally:
                    ACTIVE_CONNECTIONS.dec()
                    
        except Exception as e:
            EVENT_COUNTER.labels(topic=topic, status='error').inc()
            
            if self.circuit_breaker:
                self.circuit_breaker.record_failure()
            
            self.logger.error(
                "Event publishing failed",
                event_type=event_type,
                order_id=order.order_id,
                topic=topic,
                error=str(e)
            )
            raise
    
    def _get_topic_for_event_type(self, event_type: str) -> str:
        """Map event type to topic"""
        topic_mapping = {
            'OrderPlaced': self.topic_config.order_placed,
            'OrderCancelled': self.topic_config.order_cancelled,
            'PaymentProcessed': self.topic_config.payment_processed,
            'InventoryReserved': self.topic_config.inventory_reserved,
            'ShippingRequested': self.topic_config.shipping_requested,
        }
        
        if event_type not in topic_mapping:
            raise ValueError(f"Unknown event type: {event_type}")
        
        return topic_mapping[event_type]
    
    def _create_event_data(self, event_type: str, order: 'Order', **kwargs) -> dict:
        """Create event data based on event type"""
        if event_type == 'OrderPlaced':
            return asdict(OrderPlacedEvent.from_order(order))
        elif event_type == 'OrderCancelled':
            reason = kwargs.get('reason', 'Unknown')
            return asdict(OrderCancelledEvent.from_order(order, reason))
        else:
            raise ValueError(f"Unsupported event type for data creation: {event_type}")
    
    async def health_check(self) -> Dict[str, Any]:
        """Health check for the event service"""
        try:
            async with self.producer_pool.get_producer() as producer:
                # Try to send a test event; fire-and-forget returns no
                # metadata, so only completion within the timeout is checked
                await asyncio.wait_for(
                    producer.send_event_fire_and_forget(
                        "health-check", {"timestamp": time.time()}, "health"
                    ),
                    timeout=5.0
                )
                
                return {
                    "status": "healthy",
                    "producer_pool_size": self.producer_pool.pool_size,
                    "circuit_breaker_state": self.circuit_breaker.state.name if self.circuit_breaker else "disabled"
                }
                
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }
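
The service only assumes a CircuitBreaker interface (can_execute, record_success, record_failure, state). A minimal in-memory sketch matching that interface; the thresholds and the half-open probing policy are illustrative assumptions:

# circuit_breaker.py - minimal sketch matching the interface used above
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = 1      # normal operation
    OPEN = 2        # failing fast, calls rejected
    HALF_OPEN = 3   # probing: allow a trial call

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = CircuitState.CLOSED
        self._failures = 0
        self._opened_at = 0.0

    def can_execute(self) -> bool:
        if self.state == CircuitState.OPEN:
            if time.monotonic() - self._opened_at >= self.reset_timeout:
                self.state = CircuitState.HALF_OPEN  # allow one probe
                return True
            return False
        return True

    def record_success(self):
        self._failures = 0
        self.state = CircuitState.CLOSED

    def record_failure(self):
        self._failures += 1
        if (self.state == CircuitState.HALF_OPEN
                or self._failures >= self.failure_threshold):
            self.state = CircuitState.OPEN
            self._opened_at = time.monotonic()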

These Python implementations demonstrate modern, production-ready patterns for event-driven architecture: AsyncIO for high throughput, comprehensive error handling for robustness, and metrics for observability. The consistent use of type hints and structured logging improves maintainability and debugging.

