Kafka Topic — Create and Manage with CLI
Create, describe, and alter Kafka topics with kafka-topics.sh, change topic configs with kafka-configs.sh, and inspect or reset consumer groups with kafka-consumer-groups.sh.
#!/usr/bin/env bash
# Kafka topic management commands
BOOTSTRAP="localhost:9092"
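# Create a topic: 12 partitions, 3 replicas (needs at least 3 brokers),
# 7-day retention (604800000 ms), delete cleanup policy, zstd compression
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
  --create \
  --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config compression.type=zstd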
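# A minimal sketch: retention.ms is expressed in milliseconds, so 604800000
# above is 7 days (7 * 24 * 60 * 60 * 1000). days_to_ms is a hypothetical
# convenience helper, not part of the Kafka CLI; it only does the arithmetic.
days_to_ms() {
  local days="$1"
  # Accept whole numbers of days only.
  if ! [[ "$days" =~ ^[0-9]+$ ]]; then
    echo "days_to_ms: expected a whole number of days, got '$days'" >&2
    return 1
  fi
  # 10# forces base 10 so inputs like "08" are not parsed as octal.
  echo $(( 10#$days * 24 * 60 * 60 * 1000 ))
}
# Usage: --config retention.ms="$(days_to_ms 7)"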
# List all topics
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --list
# Describe a topic (partitions, replicas, ISR)
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
  --describe --topic events
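# Increase partitions (the count can only grow, never shrink). Keyed messages
# may hash to a different partition afterwards, so per-key ordering is not
# guaranteed across the change.
kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
  --alter --topic events --partitions 24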
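# A minimal sketch, assuming the "PartitionCount: N" field that
# kafka-topics.sh --describe prints on the topic summary line (older releases
# print it without the space). partition_count is a hypothetical helper that
# reads describe output on stdin and prints the current partition count, so a
# script can skip an --alter that would not increase it.
partition_count() {
  grep -oE 'PartitionCount:[[:space:]]*[0-9]+' | head -n 1 | grep -oE '[0-9]+$'
}
# Usage (guarded alter to 24 partitions):
# current=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --describe --topic events | partition_count)
# if [ "${current:-0}" -lt 24 ]; then
#   kafka-topics.sh --bootstrap-server "$BOOTSTRAP" --alter --topic events --partitions 24
# fi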
# Change retention to 30 days (30 * 24 * 60 * 60 * 1000 = 2592000000 ms)
kafka-configs.sh --bootstrap-server "$BOOTSTRAP" \
  --alter --entity-type topics --entity-name events \
  --add-config retention.ms=2592000000
# Check consumer group lag
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" \
  --describe --group etl-pipeline
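# A minimal sketch, assuming the tabular output of
# kafka-consumer-groups.sh --describe, whose header row contains a LAG
# column. total_lag is a hypothetical helper that sums lag across all
# partitions. Partitions with no committed offset show "-" and are skipped.
total_lag() {
  awk '
    # Until the header is found, look for the column named LAG.
    !col { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Data rows: add numeric LAG values only.
    $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}
# Usage:
# kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" --describe --group etl-pipeline | total_lag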
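# Reset consumer offsets to earliest. The group must have no active members;
# without --execute the command only prints the planned offsets (a dry run).
kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP" \
  --group etl-pipeline --topic events \
  --reset-offsets --to-earliest --execute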
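# A minimal sketch of a safer wrapper: preview the reset with --dry-run by
# default and pass --execute only when the caller asks for it explicitly.
# reset_offsets and the KAFKA_CONSUMER_GROUPS override (the CLI path, handy
# for testing) are hypothetical names, not part of Kafka.
reset_offsets() {
  local group="$1" topic="$2" mode="${3:---dry-run}"
  case "$mode" in
    --dry-run|--execute) ;;
    *) echo "reset_offsets: mode must be --dry-run or --execute" >&2; return 1 ;;
  esac
  "${KAFKA_CONSUMER_GROUPS:-kafka-consumer-groups.sh}" --bootstrap-server "$BOOTSTRAP" \
    --group "$group" --topic "$topic" \
    --reset-offsets --to-earliest "$mode"
}
# Usage: reset_offsets etl-pipeline events            # preview only
#        reset_offsets etl-pipeline events --execute  # apply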
# Produce a test message
echo '{"user":"test","event":"click"}' | \
  kafka-console-producer.sh --bootstrap-server "$BOOTSTRAP" \
    --topic events
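# A minimal sketch for keyed test data: kafka-console-producer.sh can split
# each input line into key and value with --property parse.key=true and
# --property key.separator=<sep>. gen_events is a hypothetical helper that
# emits N "key|json" lines; "|" is used as the separator because it does not
# appear in the JSON values.
gen_events() {
  local n="$1" i
  for (( i = 1; i <= n; i++ )); do
    printf 'user%d|{"user":"user%d","event":"click","seq":%d}\n' "$i" "$i" "$i"
  done
}
# Usage:
# gen_events 100 | kafka-console-producer.sh --bootstrap-server "$BOOTSTRAP" \
#   --topic events --property parse.key=true --property key.separator='|'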
# Consume from beginning
kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" \
  --topic events --from-beginning --max-messages 10
Use Cases
- Setting up Kafka topics for new data streams
- Debugging consumer lag issues
- Resetting consumer offsets after pipeline failures
Related Snippets
Similar patterns you can reuse in the same workflow.
Kafka Consumer in Python — Stream Processing
Build a Kafka consumer in Python with offset management, error handling, and batch processing.
Best for: Real-time event processing from Kafka topics
Bash ETL Pipeline Script
Build a complete ETL script in Bash with logging, error handling, notifications, and idempotent runs.
Best for: Automating daily data extract and load jobs
Cron Data Sync — Database to S3
Automated script to export database tables to compressed CSV and sync to S3 on a schedule.
Best for: Nightly database exports to cloud storage
Spark Submit — Job Launcher Script
Launch PySpark jobs with spark-submit including cluster configuration, dependencies, and monitoring.
Best for: Launching PySpark batch jobs on YARN clusters