python · intermediate
Kafka Producer & Consumer in Python
Produce and consume JSON messages from Apache Kafka using the confluent-kafka Python client.
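import json
from confluent_kafka import Consumer, Producer

BROKERS = 'localhost:9092'
TOPIC = 'user-events'

def produce_events(events: list[dict]) -> None:
    """Serialize each event as JSON and send it to TOPIC."""
    p = Producer({'bootstrap.servers': BROKERS})
    for event in events:
        p.produce(TOPIC, json.dumps(event).encode())
    p.flush()  # block until all queued messages are delivered or have failed

def consume_one() -> dict:
    """Return the next JSON message from TOPIC, or {} on timeout or error."""
    c = Consumer({
        'bootstrap.servers': BROKERS,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
    })
    c.subscribe([TOPIC])
    try:
        msg = c.poll(5.0)  # wait up to 5 seconds; returns None on timeout
    finally:
        c.close()  # always leave the group cleanly, even if poll() raises
    if msg is None or msg.error():
        return {}
    return json.loads(msg.value())

produce_events([{'user': 1, 'action': 'click'}])
print(consume_one())

Use Cases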
- event streaming
- real-time ingestion
- data pipelines
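For the streaming use cases listed above, a single poll is usually replaced by a loop. The following is a minimal sketch, not part of the original snippet: the helper name consume_events and its max_empty_polls parameter are hypothetical, and the consumer argument is only assumed to behave like confluent_kafka.Consumer (poll(timeout) returns None on timeout, otherwise a message exposing error() and value()).

```python
import json
from typing import Any, Iterator


def consume_events(consumer: Any, max_messages: int, timeout: float = 1.0,
                   max_empty_polls: int = 5) -> Iterator[dict]:
    """Yield up to max_messages decoded JSON events from a subscribed consumer.

    Hypothetical helper: `consumer` is duck-typed and assumed to follow the
    confluent_kafka.Consumer polling interface.
    """
    yielded = 0
    empty_polls = 0
    # Stop after enough events, or after several consecutive timeouts.
    while yielded < max_messages and empty_polls < max_empty_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            empty_polls += 1  # nothing arrived within the timeout
            continue
        empty_polls = 0
        if msg.error():
            continue  # skip error events instead of decoding them
        try:
            event = json.loads(msg.value())
        except (TypeError, ValueError):
            continue  # skip empty or malformed payloads
        yielded += 1
        yield event
```

With a real client, this would be called after c.subscribe([TOPIC]), for example for event in consume_events(c, max_messages=100), followed by c.close() once the loop ends. Skipping bad payloads silently is a simplification; a production pipeline would more likely log them or route them to a dead-letter topic.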
Related Snippets
Similar patterns you can reuse in the same workflow.
python · advanced
Kafka Consumer in Python — Stream Processing
Build a Kafka consumer in Python with offset management, error handling, and batch processing.
Best for: Real-time event processing from Kafka topics
#kafka #streaming
bash · beginner
Kafka Topic — Create and Manage with CLI
Create, describe, alter, and manage Kafka topics using the kafka-topics CLI with partitioning config.
Best for: Setting up Kafka topics for new data streams
#kafka #bash
python · advanced
PySpark Structured Streaming from Kafka
Consume a Kafka topic in real time with PySpark Structured Streaming and write to Parquet.
Best for: Real-time ETL
#pyspark #kafka
python · intermediate
Python Streaming Data Processing
Process streaming data with generators, windowed aggregation, and memory-efficient line-by-line reading.
Best for: Processing large event log files efficiently
#streaming #python