python · advanced

PySpark Structured Streaming from Kafka

Consume a Kafka topic in real time with PySpark Structured Streaming, parse the JSON message values against a fixed schema, and append the result to Parquet in 30-second micro-batches.

python
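from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType

# The Kafka source requires the spark-sql-kafka-0-10 connector on the classpath.
spark = SparkSession.builder.appName('streaming').getOrCreate()

# Expected shape of the JSON payload carried in each Kafka message value.
schema = StructType().add('user_id', IntegerType()).add('action', StringType())

df = (
    spark.readStream
         .format('kafka')
         .option('kafka.bootstrap.servers', 'localhost:9092')
         .option('subscribe', 'events')
         .load()
         # Kafka delivers `value` as binary: cast it to a string, then parse it as JSON.
         .select(F.from_json(F.col('value').cast('string'), schema).alias('data'))
         # Flatten the parsed struct into top-level columns.
         .select('data.*')
)

query = (
    df.writeStream
      .format('parquet')
      .option('path', '/lake/events')
      # The file sink needs a checkpoint location to track progress across restarts.
      .option('checkpointLocation', '/checkpoints/events')
      # Start one micro-batch every 30 seconds.
      .trigger(processingTime='30 seconds')
      .start()
)
# Block the driver until the query stops or fails.
query.awaitTermination()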
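
The snippet above only runs if Spark can load the Kafka source, which ships as a separate connector rather than with the core distribution. One way to pull it in is the `spark.jars.packages` setting. The sketch below is a minimal illustration, not part of the original snippet: the helper names `kafka_connector_coordinate` and `build_session` are hypothetical, and the Scala version default of 2.12 is an assumption that should match your Spark build.

```python
def kafka_connector_coordinate(spark_version: str, scala_version: str = "2.12") -> str:
    """Return the Maven coordinate of the Kafka source for Structured Streaming."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(app_name: str, spark_version: str, scala_version: str = "2.12"):
    """Create a SparkSession that fetches the Kafka connector at startup."""
    # Imported here so the helper above stays usable without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        # spark.jars.packages takes a comma-separated list of Maven coordinates.
        .config(
            "spark.jars.packages",
            kafka_connector_coordinate(spark_version, scala_version),
        )
        .getOrCreate()
    )
```

For example, `spark = build_session('streaming', '3.5.0')` could replace the plain `getOrCreate()` call, assuming a Spark 3.5.0 installation. The same coordinate can instead be passed on the command line with `spark-submit --packages`.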

Use Cases

  • real-time ETL
  • event streaming
  • lakehouse ingestion
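
For long-running ingestion jobs like the ones listed above, it can help to watch throughput instead of blocking on `awaitTermination()`. The sketch below polls a query's `isActive` and `lastProgress` properties and collects the `numInputRows` value of each progress report. The function name `poll_progress` and its parameters are hypothetical, and it assumes `lastProgress` supports dict-style access, as it does in PySpark 3.x.

```python
import time


def poll_progress(query, interval_seconds: float = 30.0, max_polls: int = 10):
    """Collect input row counts reported by a running streaming query.

    `query` is expected to behave like a PySpark StreamingQuery: `isActive`
    is a boolean property and `lastProgress` is dict-like, or None before
    the first micro-batch completes.
    """
    row_counts = []
    for _ in range(max_polls):
        if not query.isActive:
            break
        progress = query.lastProgress
        if progress is not None:
            # The same batch is reported again if no new batch has finished.
            row_counts.append(progress["numInputRows"])
        time.sleep(interval_seconds)
    return row_counts
```

Matching `interval_seconds` to the trigger interval (30 seconds in the snippet) keeps the samples roughly aligned with micro-batches, though the alignment is not exact.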
