python · intermediate

Kafka Producer & Consumer in Python

Produce and consume JSON messages from Apache Kafka using the confluent-kafka Python client.

python
from confluent_kafka import Producer, Consumer, KafkaError
import json

# Kafka bootstrap server address handed to both the producer and the consumer.
BROKERS = 'localhost:9092'
# Topic that events are produced to and consumed from.
TOPIC   = 'user-events'

def produce_events(events: list[dict]) -> None:
    """Publish each event to ``TOPIC`` as a UTF-8 encoded JSON message.

    One Kafka message is produced per item in ``events``; the call returns
    after ``flush()`` has been invoked on the producer.

    Args:
        events: JSON-serialisable dicts to publish.

    Raises:
        TypeError: If an event cannot be serialised by ``json.dumps``.
        BufferError: If the local producer queue is still full after
            waiting once for in-flight messages to drain.

    Note:
        No delivery callback is registered, so per-message delivery
        failures are not reported to the caller.
    """
    p = Producer({'bootstrap.servers': BROKERS})
    for event in events:
        payload = json.dumps(event).encode()
        try:
            p.produce(TOPIC, payload)
        except BufferError:
            # Local queue is full: give in-flight messages a moment to
            # drain, then retry once (a second failure propagates).
            p.poll(1.0)
            p.produce(TOPIC, payload)
        # Serve delivery reports as we go so the internal queue keeps
        # draining instead of filling up on large batches.
        p.poll(0)
    p.flush()

def consume_one() -> dict:
    """Poll ``TOPIC`` once and return the decoded JSON payload.

    A consumer in group ``'my-group'`` is created, subscribed to ``TOPIC``,
    polled a single time with a 5.0 timeout, and then closed.

    Returns:
        The result of ``json.loads`` on the message value, or ``{}`` when
        the poll yields no message, the message carries an error, or the
        message value is empty.

    Raises:
        json.JSONDecodeError: If the message value is not valid JSON.
    """
    c = Consumer({
        'bootstrap.servers': BROKERS,
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
    })
    try:
        c.subscribe([TOPIC])
        msg = c.poll(5.0)
    finally:
        # Always close the consumer, even if subscribe()/poll() raises,
        # so the client is not left open.
        c.close()

    if msg is None or msg.error():
        return {}

    raw = msg.value()
    # An empty or missing value has nothing to decode; report "no event".
    if not raw:
        return {}
    return json.loads(raw)

# Demo: publish one sample event, then poll the topic for a single message
# and print whatever came back.
sample_events = [{'user': 1, 'action': 'click'}]
produce_events(sample_events)
received = consume_one()
print(received)

Use Cases

  • event streaming
  • real-time ingestion
  • data pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.