python · advanced
PySpark Structured Streaming from Kafka
Consume a Kafka topic in real time with PySpark Structured Streaming, parse each JSON message against a schema, and write the results to Parquet.
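The streaming job below assumes each Kafka record's `value` is UTF-8 JSON matching its two-field schema, such as `{"user_id": 42, "action": "click"}` (an illustrative payload, not one from the source). The plain-Python sketch here approximates what the `cast('string')`, `from_json(...)` and `select('data.*')` chain does to a single record, which can help when checking producer payloads without a Spark cluster. `parse_event` is a hypothetical helper, not part of PySpark and not Spark's implementation: this sketch nulls only a mistyped field, while whether Spark nulls just that field or the whole record depends on the Spark version.

```python
import json

# IntegerType in Spark is a 32-bit signed integer
INT32_MIN, INT32_MAX = -(2**31), 2**31 - 1


def parse_event(value: bytes) -> dict:
    """Hypothetical helper: approximate one Kafka value -> {user_id, action} row.

    Mirrors cast('string') + from_json(..., schema) + select('data.*') for a
    schema of user_id (IntegerType) and action (StringType). Not Spark's code.
    """
    row = {"user_id": None, "action": None}
    try:
        # cast('string'): decode the raw bytes, then parse them as JSON
        data = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return row  # malformed input: every column comes out null
    if not isinstance(data, dict):
        return row  # this sketch treats a JSON array or scalar as malformed

    user_id = data.get("user_id")
    # bool is a subclass of int in Python, so rule it out explicitly
    is_int = isinstance(user_id, int) and not isinstance(user_id, bool)
    if is_int and INT32_MIN <= user_id <= INT32_MAX:
        row["user_id"] = user_id

    action = data.get("action")
    if isinstance(action, str):
        row["action"] = action

    # Keys outside the schema (for example "ts") are simply dropped
    return row


print(parse_event(b'{"user_id": 42, "action": "click", "ts": 1}'))
# {'user_id': 42, 'action': 'click'}
print(parse_event(b"not json"))
# {'user_id': None, 'action': None}
```

In this sketch, `parse_event(b'{"user_id": "42", "action": "view"}')` keeps `action` but nulls `user_id`, because a JSON string is not an integer.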
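from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

# The Kafka source needs the spark-sql-kafka-0-10 connector on the classpath
# (for example via spark-submit --packages), matching your Spark and Scala versions.
spark = SparkSession.builder.appName('streaming').getOrCreate()

# Expected shape of each JSON message value, e.g. {"user_id": 42, "action": "click"}
schema = StructType().add('user_id', IntegerType()).add('action', StringType())

df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    # Without a startingOffsets option, a new query starts from the latest offsets
    .option('subscribe', 'events')
    .load()
    # Kafka delivers `value` as binary: cast it to a string, then parse the JSON
    .select(F.from_json(F.col('value').cast('string'), schema).alias('data'))
    # Flatten the parsed struct into top-level user_id and action columns
    .select('data.*')
)

query = (
    df.writeStream
    .format('parquet')
    .option('path', '/lake/events')
    # Processed offsets are recorded here so a restarted query resumes where it stopped
    .option('checkpointLocation', '/checkpoints/events')
    # Start a new micro-batch every 30 seconds
    .trigger(processingTime='30 seconds')
    .start()
)

query.awaitTermination()
Use Cases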
- real-time ETL
- event streaming
- lakehouse ingestion
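The `.trigger(processingTime='30 seconds')` call in the job above uses a fixed-interval micro-batch trigger. Spark's Structured Streaming guide describes its rule as: if a micro-batch finishes within the interval, the next one waits for the interval boundary; if it overruns, the next one starts as soon as it finishes. The sketch below models that rule to show when batches would start. `batch_start_times` is a hypothetical helper, and it assumes new data is always available (Spark skips a trigger when there is none), that boundaries fall on multiples of the interval from t = 0, and that batch durations are known in advance.

```python
def batch_start_times(durations, interval=30):
    """Hypothetical helper: start time (seconds) of each micro-batch under a
    fixed-interval trigger, given how long each batch runs.

    Assumes new data is always waiting and that trigger boundaries fall on
    multiples of `interval`, starting from t = 0.
    """
    starts = []
    t = 0
    for duration in durations:
        starts.append(t)
        finish = t + duration
        # First interval boundary strictly after this batch started
        next_boundary = (t // interval + 1) * interval
        # Finished early: wait for the boundary. Overran: start right away.
        t = max(finish, next_boundary)
    return starts


print(batch_start_times([10, 45, 5, 20]))  # [0, 30, 75, 90]
```

With batches lasting 10, 45, 5 and 20 seconds, the modeled start times are 0, 30, 75 and 90: the 45-second batch overruns its slot, so the next one starts at 75, and the one after that waits for the 90-second boundary.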
Related Snippets
Similar patterns you can reuse in the same workflow.
python · advanced
Kafka Consumer in Python — Stream Processing
Build a Kafka consumer in Python with offset management, error handling, and batch processing.
Best for: Real-time event processing from Kafka topics
#kafka #streaming
bash · beginner
Kafka Topic — Create and Manage with CLI
Create, describe, alter, and manage Kafka topics using the kafka-topics CLI with partitioning config.
Best for: Setting up Kafka topics for new data streams
#kafka #bash
python · intermediate
Kafka Producer & Consumer in Python
Produce and consume JSON messages from Apache Kafka using the confluent-kafka Python client.
Best for: Event streaming
#kafka #streaming
python · advanced
Spark SQL Query Example
PySpark DataFrame operations with SQL queries, window functions, and aggregations for big data.
Best for: Processing large-scale datasets with Spark
#spark #pyspark