Coroutine Channels Producer Consumer
Use Kotlin channels for coroutine communication: buffered, conflated, fan-out, and fan-in patterns.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// Basic channel
val channel = Channel<Int>()
launch {
for (i in 1..5) {
println("Sending $i")
channel.send(i)
}
channel.close()
}
for (value in channel) println("Received $value")
// Buffered channel
val buffered = Channel<String>(capacity = 10)
launch {
repeat(10) { buffered.send("Message-$it") }
buffered.close()
}
delay(100)
for (msg in buffered) print("$msg ")
println()
// Fan-out: multiple consumers
val tasks = Channel<Int>(Channel.BUFFERED)
launch { repeat(20) { tasks.send(it); delay(10) }; tasks.close() }
val workers = (1..3).map { workerId ->
launch {
for (task in tasks) {
println("Worker-$workerId processing task $task")
delay(50)
}
}
}
workers.forEach { it.join() }
// Fan-in: multiple producers
suspend fun produce(id: String, ch: SendChannel<String>) {
repeat(3) {
delay((10..50).random().toLong())
ch.send("$id-item-$it")
}
}
val merged = Channel<String>(Channel.BUFFERED)
val producers = listOf("A", "B", "C").map { launch { produce(it, merged) } }
launch { producers.forEach { it.join() }; merged.close() }
for (item in merged) println("Merged: $item")
// Conflated channel: keeps only latest
val sensor = Channel<Int>(Channel.CONFLATED)
launch {
repeat(100) { sensor.send(it); delay(1) }
sensor.close()
}
delay(50)
for (reading in sensor) println("Sensor: $reading")
println("Done")
}Use Cases
- Coroutine-based producer-consumer patterns
- Work distribution across multiple consumers
- Sensor data processing with backpressure
Tags
Related Snippets
Similar patterns you can reuse in the same workflow.
Channels — Producer-Consumer with Coroutines
Communicate between coroutines with Channels: produce, actor pattern, fan-out/fan-in, and select.
Best for: Producer-consumer patterns in coroutines
Coroutines — launch, async, and Structured Concurrency
Write concurrent code with Kotlin coroutines: launch, async/await, structured concurrency, and dispatchers.
Best for: Parallel API calls in backend services
Structured Concurrency Patterns
Master coroutine structured concurrency: coroutineScope, async/await, fan-out/fan-in, and parallel map.
Best for: Parallel I/O operations with concurrency limits
Coroutine Mutex and Synchronization
Synchronize shared mutable state in coroutines with Mutex, atomic operations, and thread confinement.
Best for: Thread-safe shared state in coroutines