Python · Advanced

Kafka Consumer in Python — Stream Processing

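Build a Kafka consumer in Python with manual offset management, batch processing, graceful shutdown on SIGINT/SIGTERM, and basic error handling (offsets are committed only after a batch has been processed successfully).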

python
from kafka import KafkaConsumer, TopicPartition
import json
import signal
import sys


class GracefulConsumer:
    """Kafka consumer that processes records in batches and commits manually.

    Offsets are committed only after ``process_batch`` returns. If processing
    raises, that batch is not committed and the consumer is closed, so the
    group will consume those records again (at-least-once delivery).
    SIGINT/SIGTERM request a clean stop: the loop exits after the current
    iteration finishes, and the consumer is closed.
    """

    def __init__(
        self,
        topic: str,
        group_id: str,
        brokers: str,
        *,
        max_poll_records: int = 100,
        session_timeout_ms: int = 30000,
    ):
        """Create the underlying KafkaConsumer and install signal handlers.

        Args:
            topic: Topic to subscribe to.
            group_id: Consumer group id under which offsets are committed.
            brokers: ``bootstrap_servers`` value passed to KafkaConsumer.
            max_poll_records: Maximum number of records returned per poll.
            session_timeout_ms: Consumer group session timeout in ms.

        Note:
            ``signal.signal`` may only be called from the main thread, so
            constructing this object from another thread raises ValueError.
        """
        self.running = True
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=brokers,
            group_id=group_id,
            auto_offset_reset="earliest",
            # Offsets are committed explicitly in run() after each batch.
            enable_auto_commit=False,
            value_deserializer=self._deserialize_value,
            max_poll_records=max_poll_records,
            session_timeout_ms=session_timeout_ms,
        )
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    @staticmethod
    def _deserialize_value(raw: "bytes | None"):
        """Decode a UTF-8 JSON message value, passing null values through.

        kafka-python also calls the value deserializer for records whose
        value is null (e.g. tombstones on compacted topics). Calling
        ``.decode`` on ``None`` would raise AttributeError out of ``poll()``.
        Malformed JSON still raises ``json.JSONDecodeError``.
        """
        if raw is None:
            return None
        return json.loads(raw.decode("utf-8"))

    def _shutdown(self, *_):
        """Signal handler: ask the run loop to stop after its current iteration."""
        print("Shutting down gracefully...")
        self.running = False

    def process_batch(self, messages: list[dict]) -> None:
        """Handle one polled batch; override with your processing logic.

        Each message is a dict with the keys ``key``, ``value``,
        ``partition`` and ``offset``. Raising from here prevents the batch
        from being committed. The default implementation only prints each
        key/value pair.
        """
        for msg in messages:
            print(f"  key={msg['key']} value={msg['value']}")

    def run(self, poll_timeout_ms: int = 1000) -> None:
        """Poll, process and commit batches until shutdown is requested.

        Args:
            poll_timeout_ms: Maximum time a single ``poll`` blocks. This also
                bounds how long an idle consumer takes to notice a shutdown
                request.

        The consumer is always closed on exit, including when
        ``process_batch`` raises. In that case the batch is not committed.
        """
        print("Consumer started, listening...")
        try:
            while self.running:
                records = self.consumer.poll(timeout_ms=poll_timeout_ms)
                if not records:
                    continue

                # poll() groups records per partition; flatten into one batch.
                batch = []
                for messages in records.values():
                    for msg in messages:
                        batch.append({
                            "key": msg.key,
                            "value": msg.value,
                            "partition": msg.partition,
                            "offset": msg.offset,
                        })

                if batch:
                    self.process_batch(batch)
                    # With no arguments, commit() stores the positions reached
                    # by the poll above, i.e. just past every record in batch.
                    self.consumer.commit()
                    print(f"Committed {len(batch)} messages")
        finally:
            self.consumer.close()
            print("Consumer closed")


if __name__ == "__main__":
    # Demo entry point: consume the "events" topic from a local broker
    # under the "etl-pipeline" group until SIGINT/SIGTERM.
    settings = {
        "topic": "events",
        "group_id": "etl-pipeline",
        "brokers": "localhost:9092",
    }
    GracefulConsumer(**settings).run()

Sponsored

Confluent

Use Cases

  • Real-time event processing from Kafka topics
  • Building streaming ETL consumers
  • Graceful shutdown handling for long-running consumers

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.