kotlinadvanced

Channels — Producer-Consumer with Coroutines

Communicate between coroutines with Channels: produce, actor pattern, fan-out/fan-in, and select.

kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    // Basic channel
    println("--- Basic Channel ---")
    val channel = Channel<Int>()
    launch {
        for (i in 1..5) {
            channel.send(i)
            println("Sent: $i")
        }
        channel.close()
    }
    for (value in channel) {
        println("Received: $value")
    }

    // Buffered channel
    println("\n--- Buffered Channel ---")
    val buffered = Channel<Int>(capacity = 3)
    launch {
        repeat(5) {
            buffered.send(it)
            println("Buffered sent: $it")
        }
        buffered.close()
    }
    delay(500) // let buffer fill
    for (v in buffered) println("Buffered received: $v")

    // produce — convenience builder
    println("\n--- Produce ---")
    val numbers = produce {
        var n = 1
        while (true) {
            send(n++)
            delay(100)
        }
    }
    repeat(5) { println("Produced: ${numbers.receive()}") }
    numbers.cancel()

    // Fan-out — multiple consumers
    println("\n--- Fan-Out ---")
    val tasks = produce {
        repeat(10) { send("task-$it") }
    }
    repeat(3) { workerId ->
        launch {
            for (task in tasks) {
                println("Worker-$workerId processes $task")
                delay(50)
            }
        }
    }
    delay(500)

    // Fan-in — multiple producers, single consumer
    println("\n--- Fan-In ---")
    val merged = Channel<String>()
    fun CoroutineScope.producer(name: String, items: List<String>) = launch {
        items.forEach {
            merged.send("$name: $it")
            delay(50)
        }
    }
    producer("A", listOf("x", "y", "z"))
    producer("B", listOf("1", "2", "3"))
    launch {
        delay(400)
        merged.close()
    }
    for (msg in merged) println("Merged: $msg")

    // Pipeline
    println("\n--- Pipeline ---")
    val source = produce { repeat(10) { send(it) } }
    val doubled = produce { for (x in source) send(x * 2) }
    val filtered = produce { for (x in doubled) if (x > 10) send(x) }

    for (v in filtered) println("Pipeline: $v")

    println("\nDone")
}

Use Cases

  • Producer-consumer patterns in coroutines
  • Work distribution across multiple workers
  • Data processing pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.