scalaadvanced

Spark Structured Streaming

Process real-time data with Spark Structured Streaming: sources, transformations, and sinks.

scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.*
import org.apache.spark.sql.types.*
import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp

object StructuredStreamingApp:
  def main(args: Array[String]): Unit =
    val spark = SparkSession.builder()
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits.*

    // Define schema for JSON events
    val eventSchema = StructType(Seq(
      StructField("userId", StringType),
      StructField("action", StringType),
      StructField("timestamp", TimestampType),
      StructField("amount", DoubleType)
    ))

    // Read from socket (for demo)
    val socketStream = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Read from Kafka
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) as json")
      .select(from_json(col("json"), eventSchema).as("event"))
      .select("event.*")

    // Read from files
    val fileStream = spark.readStream
      .schema(eventSchema)
      .json("data/events/")

    // Transformations
    val processed = kafkaStream
      .filter(col("action") === "purchase")
      .withColumn("hour", hour(col("timestamp")))
      .withColumn("date", to_date(col("timestamp")))

    // Windowed aggregation
    val windowedCounts = kafkaStream
      .withWatermark("timestamp", "10 minutes")
      .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("action")
      )
      .agg(
        count("*").as("count"),
        sum("amount").as("total_amount"),
        avg("amount").as("avg_amount")
      )

    // Top users by spending
    val topUsers = kafkaStream
      .filter(col("action") === "purchase")
      .withWatermark("timestamp", "1 hour")
      .groupBy(
        window(col("timestamp"), "1 hour"),
        col("userId")
      )
      .agg(sum("amount").as("total_spent"))
      .orderBy(col("total_spent").desc)

    // Write to console
    val consoleQuery = windowedCounts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Write to Kafka
    val kafkaSink = processed.selectExpr(
      "userId as key",
      "to_json(struct(*)) as value"
    ).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "processed-events")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    // Write to Parquet files
    val fileSink = processed.writeStream
      .outputMode("append")
      .format("parquet")
      .partitionBy("date")
      .option("path", "output/events")
      .option("checkpointLocation", "/tmp/checkpoint-parquet")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    consoleQuery.awaitTermination()

Sponsored

Databricks

Use Cases

  • Real-time event processing
  • Streaming aggregation dashboards
  • ETL from Kafka to data lake

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.