Python · Advanced
Kafka Consumer in Python — Stream Processing
Build a Kafka consumer in Python with kafka-python, featuring manual offset management, error handling, and batch processing.
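```python
# Requires kafka-python (pip install kafka-python) and Python 3.9+ (for list[dict]).
import json
import signal

from kafka import KafkaConsumer


class GracefulConsumer:
    def __init__(self, topic: str, group_id: str, brokers: str):
        self.running = True
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=brokers,
            group_id=group_id,
            # Start from the oldest retained message if the group has no committed offset.
            auto_offset_reset="earliest",
            # Offsets are committed manually after each batch (at-least-once delivery).
            enable_auto_commit=False,
            # Decode JSON payloads; tombstones (value=None) are passed through as None.
            value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v is not None else None,
            # Upper bound on the number of records returned by a single poll().
            max_poll_records=100,
            session_timeout_ms=30000,
        )
        # Stop the poll loop cleanly on `kill` (SIGTERM) or Ctrl+C (SIGINT).
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, *_):
        print("Shutting down gracefully...")
        self.running = False

    def process_batch(self, messages: list[dict]) -> None:
        """Override this with your processing logic."""
        for msg in messages:
            print(f"  key={msg['key']} value={msg['value']}")

    def run(self):
        print("Consumer started, listening...")
        try:
            while self.running:
                # Returns {TopicPartition: [ConsumerRecord, ...]}; empty dict on timeout.
                records = self.consumer.poll(timeout_ms=1000)
                if not records:
                    continue

                batch = []
                for messages in records.values():
                    for msg in messages:
                        batch.append({
                            "key": msg.key,
                            "value": msg.value,
                            "partition": msg.partition,
                            "offset": msg.offset,
                        })

                if batch:
                    # If process_batch raises, the loop exits before commit(), so the
                    # uncommitted batch is redelivered when the group consumes again.
                    self.process_batch(batch)
                    # Synchronously commit the positions reached by this poll().
                    self.consumer.commit()
                    print(f"Committed {len(batch)} messages")
        finally:
            # Close connections and leave the consumer group so partitions can be reassigned.
            self.consumer.close()
            print("Consumer closed")


if __name__ == "__main__":
    consumer = GracefulConsumer(
        topic="events",
        group_id="etl-pipeline",
        brokers="localhost:9092",
    )
    consumer.run()
```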
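Calling `commit()` with no arguments commits the consumed position of every assigned partition at once. If you instead track offsets yourself (for example, to commit only part of a batch), remember that Kafka's committed offset is the offset of the *next* record to read, i.e. the last processed offset plus one. A minimal, broker-free sketch of that bookkeeping over the batch dicts built in `run()`; the helper `next_offsets` is hypothetical, and passing its result to an explicit `commit()` call is left out because the offset structure kafka-python expects there may vary between versions (check the `commit()` docs for your version).

```python
# Hypothetical helper: compute the offset to commit for each partition in a batch.
# Kafka stores the offset of the next record to consume, so we commit max(offset) + 1.
def next_offsets(batch: list[dict]) -> dict[int, int]:
    commits: dict[int, int] = {}
    for msg in batch:
        candidate = msg["offset"] + 1
        # Keep the highest next-offset seen per partition.
        if candidate > commits.get(msg["partition"], -1):
            commits[msg["partition"]] = candidate
    return commits


batch = [
    {"key": b"a", "value": {"n": 1}, "partition": 0, "offset": 41},
    {"key": b"b", "value": {"n": 2}, "partition": 0, "offset": 42},
    {"key": b"c", "value": {"n": 3}, "partition": 1, "offset": 7},
]
print(next_offsets(batch))  # {0: 43, 1: 8}
```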
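In the consumer above, a single malformed message makes `process_batch` raise, which stops the consumer and forces the whole batch to be replayed on restart. A common alternative (swapped in here; it is not part of the original snippet) is to isolate failures per message and route them to a dead-letter destination so the rest of the batch can still be committed. A minimal sketch, assuming a hypothetical `handle` function as your per-message logic and a plain list standing in for a dead-letter topic:

```python
from typing import Callable


def process_with_dead_letter(
    messages: list[dict],
    handle: Callable[[dict], None],
    dead_letters: list[dict],
) -> int:
    """Run handle() on each message, collecting failures instead of aborting the batch.

    Returns the number of messages handled successfully.
    """
    ok = 0
    for msg in messages:
        try:
            handle(msg)
            ok += 1
        except Exception as exc:  # broad on purpose: any failure goes to the dead-letter list
            dead_letters.append({**msg, "error": repr(exc)})
    return ok


def handle(msg: dict) -> None:
    # Hypothetical processing step: requires a dict payload with an "amount" field.
    if not isinstance(msg["value"], dict) or "amount" not in msg["value"]:
        raise ValueError("missing amount")


dlq: list[dict] = []
batch = [
    {"key": b"1", "value": {"amount": 10}, "partition": 0, "offset": 0},
    {"key": b"2", "value": None, "partition": 0, "offset": 1},
]
print(process_with_dead_letter(batch, handle, dlq))  # 1
print(dlq[0]["error"])  # ValueError('missing amount')
```

Inside `GracefulConsumer`, an overridden `process_batch` could call this and publish the dead-lettered records to a separate topic before `commit()` runs. Whether skipping bad records is acceptable depends on your pipeline's correctness requirements.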
Use Cases
- Real-time event processing from Kafka topics
- Building streaming ETL consumers
- Graceful shutdown handling for long-running consumers
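The graceful-shutdown pattern in `GracefulConsumer` comes down to this: the signal handler only flips a flag, and the loop checks that flag between bounded waits, so in-flight work finishes before cleanup runs. Below is a broker-free sketch of the same idea. It uses `threading.Event` in place of the original plain boolean (an assumption, not the snippet's code) so the wait between iterations can be interrupted, and `signal.raise_signal` (Python 3.8+) to simulate `kill -TERM`. It must run in the main thread, because CPython only allows the main thread to install signal handlers.

```python
import signal
import threading

stop = threading.Event()


def request_stop(signum, frame):
    # Keep the handler tiny: just record that shutdown was requested.
    stop.set()


signal.signal(signal.SIGTERM, request_stop)
signal.signal(signal.SIGINT, request_stop)

iterations = 0
while not stop.is_set():
    iterations += 1  # stand-in for poll() + process_batch() + commit()
    if iterations == 3:
        signal.raise_signal(signal.SIGTERM)  # simulate `kill -TERM <pid>`
    stop.wait(0.01)  # bounded wait, like poll(timeout_ms=...)

print(f"stopped cleanly after {iterations} iterations")  # stopped cleanly after 3 iterations
```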
Related Snippets
Similar patterns you can reuse in the same workflow.
bash · beginner
Kafka Topic — Create and Manage with CLI
Create, describe, alter, and manage Kafka topics using the kafka-topics CLI with partitioning config.
Best for: Setting up Kafka topics for new data streams
#kafka #bash
python · intermediate
Kafka Producer & Consumer in Python
Produce and consume JSON messages from Apache Kafka using the confluent-kafka Python client.
Best for: Event streaming
#kafka #streaming
python · advanced
Python ETL Pipeline Example
Complete extract-transform-load pipeline with error handling, logging, and incremental processing.
Best for: Automating data ingestion from CSV to warehouse
#etl #pipeline
python · intermediate
Python Streaming Data Processing
Process streaming data with generators, windowed aggregation, and memory-efficient line-by-line reading.
Best for: Processing large event log files efficiently
#streaming #python