Scala · Advanced
Spark Structured Streaming
Process real-time data with Spark Structured Streaming: sources, transformations, and sinks.
Scala (press ⌘/Ctrl + Shift + C to copy)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.*
import org.apache.spark.sql.types.*
import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp
/** Demo application for Spark Structured Streaming.
  *
  * Shows three kinds of streaming sources (socket, Kafka, JSON files), a few
  * transformations and windowed aggregations, and three sinks (console, Kafka,
  * Parquet). Only the Kafka-derived streams are wired to sinks; the socket and
  * file sources and the `topUsers` aggregation are defined for illustration.
  */
object StructuredStreamingApp:

  /** Builds the streaming pipelines, starts the three sink queries and blocks
    * until any one of them terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit =
    val spark = SparkSession.builder()
      .appName("StructuredStreaming")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits.*

    // Define schema for JSON events
    val eventSchema = StructType(Seq(
      StructField("userId", StringType),
      StructField("action", StringType),
      StructField("timestamp", TimestampType),
      StructField("amount", DoubleType)
    ))

    // Read from socket (for demo); defined as an example, not connected to a sink
    val socketStream = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Read from Kafka: cast the binary value to a string, parse it as JSON
    // using eventSchema and flatten the parsed struct into top-level columns
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) as json")
      .select(from_json(col("json"), eventSchema).as("event"))
      .select("event.*")

    // Read from files; defined as an example, not connected to a sink
    val fileStream = spark.readStream
      .schema(eventSchema)
      .json("data/events/")

    // Transformations: keep purchases and derive hour/date columns
    val processed = kafkaStream
      .filter(col("action") === "purchase")
      .withColumn("hour", hour(col("timestamp")))
      .withColumn("date", to_date(col("timestamp")))

    // Windowed aggregation: 5-minute windows sliding every minute, per action
    val windowedCounts = kafkaStream
      .withWatermark("timestamp", "10 minutes")
      .groupBy(
        window(col("timestamp"), "5 minutes", "1 minute"),
        col("action")
      )
      .agg(
        count("*").as("count"),
        sum("amount").as("total_amount"),
        avg("amount").as("avg_amount")
      )

    // Top users by spending.
    // NOTE: this query is never started below. Sorting a streaming aggregation
    // is only supported in "complete" output mode, so a sink for it would have
    // to use outputMode("complete").
    val topUsers = kafkaStream
      .filter(col("action") === "purchase")
      .withWatermark("timestamp", "1 hour")
      .groupBy(
        window(col("timestamp"), "1 hour"),
        col("userId")
      )
      .agg(sum("amount").as("total_spent"))
      .orderBy(col("total_spent").desc)

    // Write to console
    val consoleQuery = windowedCounts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    // Write to Kafka: userId becomes the record key, the whole row the JSON value
    val kafkaSink = processed.selectExpr(
      "userId as key",
      "to_json(struct(*)) as value"
    ).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "processed-events")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()

    // Write to Parquet files, partitioned by event date
    val fileSink = processed.writeStream
      .outputMode("append")
      .format("parquet")
      .partitionBy("date")
      .option("path", "output/events")
      .option("checkpointLocation", "/tmp/checkpoint-parquet")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    // Three queries are running. Waiting on only one of them would hide a
    // failure in the others, so block until ANY query terminates (a failed
    // query's exception is rethrown here), then shut the session down.
    try spark.streams.awaitAnyTermination()
    finally spark.stop()
Databricks
Use Cases
- Real-time event processing
- Streaming aggregation dashboards
- ETL from Kafka to data lake
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Scala · Intermediate
Apache Spark DataFrame Operations
Process big data with Spark DataFrames: read, filter, aggregate, join, and write operations.
Best for: Big data processing with Spark
#scala #spark
Scala · Advanced
Spark RDD Transformations and Actions
Process distributed data with Spark RDDs: map, reduce, aggregate, and pair RDD operations.
Best for: Distributed data processing
#scala #spark
Scala · Advanced
Streaming with fs2
Build composable streaming pipelines with fs2: chunks, transformations, concurrency, and resource safety.
Best for: Streaming data processing
#scala #fs2
TypeScript · Intermediate
Server-Sent Events Handler
Express route that implements SSE for real-time server-to-client push notifications.
Best for: Live notifications
#sse #real-time