kotlin · advanced

Spring WebFlux with Kotlin Coroutines

Build reactive APIs with Spring WebFlux and Kotlin coroutines: suspend handlers and Flow responses.

kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.stereotype.Repository
import org.springframework.stereotype.Service
import org.springframework.web.bind.annotation.*
import org.springframework.web.server.ResponseStatusException
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap
import java.util.UUID

// Domain
/**
 * An article held by the service.
 *
 * @property id unique identifier; defaults to a freshly generated random UUID string.
 * @property title article headline.
 * @property content article body text.
 * @property author name of the article's author.
 * @property createdAt creation timestamp; defaults to the local date-time at construction.
 */
data class Article(
    val id: String = UUID.randomUUID().toString(),
    val title: String,
    val content: String,
    val author: String,
    val createdAt: LocalDateTime = LocalDateTime.now(),
)

/**
 * Request payload for creating an article.
 *
 * @property title requested headline.
 * @property content requested body text.
 * @property author requested author name.
 */
data class CreateArticleRequest(
    val title: String,
    val content: String,
    val author: String,
)

// Repository
/**
 * In-memory article store backed by a [ConcurrentHashMap] keyed by article id.
 *
 * The `delay` calls in each operation stand in for I/O latency.
 */
@Repository
class ArticleRepository {
    private val store = ConcurrentHashMap<String, Article>()

    /** Returns a list copy of every stored article after a 50 ms simulated delay. */
    suspend fun findAll(): List<Article> {
        delay(50) // simulate I/O
        return store.values.toList()
    }

    /** Returns a [Flow] that emits each stored article, pausing 10 ms before each emission. */
    fun findAllAsFlow(): Flow<Article> = flow {
        for (article in store.values) {
            delay(10) // simulate streaming
            emit(article)
        }
    }

    /** Returns the article stored under [id], or `null` when there is none. */
    suspend fun findById(id: String): Article? {
        delay(20)
        return store[id]
    }

    /** Stores [article] under its own id, replacing any existing entry, and returns it. */
    suspend fun save(article: Article): Article {
        delay(30)
        return article.also { store[it.id] = it }
    }

    /** Removes the article stored under [id]; returns `true` only if an entry was removed. */
    suspend fun deleteById(id: String): Boolean {
        delay(20)
        val removed = store.remove(id)
        return removed != null
    }

    /** Returns the number of articles currently stored. */
    suspend fun count(): Int {
        return store.size
    }
}

// Service
/**
 * Article operations layered on top of [ArticleRepository]: lookup, validated
 * creation, deletion, and streaming search.
 */
@Service
class ArticleService(private val repo: ArticleRepository) {
    /** Returns every article from the repository as a list. */
    suspend fun getAll(): List<Article> = repo.findAll()

    /** Returns the repository's articles as a [Flow]. */
    fun streamAll(): Flow<Article> = repo.findAllAsFlow()

    /**
     * Returns the article with the given [id].
     *
     * @throws ResponseStatusException with [HttpStatus.NOT_FOUND] when no such article exists.
     */
    suspend fun getById(id: String): Article =
        repo.findById(id) ?: throw ResponseStatusException(HttpStatus.NOT_FOUND, "Article not found")

    /**
     * Validates [request], builds an [Article] from its trimmed fields, and saves it.
     *
     * @return the saved article.
     * @throws IllegalArgumentException when the title, content, or author is blank.
     */
    suspend fun create(request: CreateArticleRequest): Article {
        require(request.title.isNotBlank()) { "Title required" }
        require(request.content.isNotBlank()) { "Content required" }
        // Author is trimmed and stored just like the other fields, so a blank
        // value must be rejected too; otherwise an empty author is persisted.
        require(request.author.isNotBlank()) { "Author required" }
        val article = Article(
            title = request.title.trim(),
            content = request.content.trim(),
            author = request.author.trim(),
        )
        return repo.save(article)
    }

    /**
     * Deletes the article with the given [id].
     *
     * @throws ResponseStatusException with [HttpStatus.NOT_FOUND] when no such article exists.
     */
    suspend fun delete(id: String) {
        if (!repo.deleteById(id)) {
            // Same reason text as getById so both not-found paths report consistently.
            throw ResponseStatusException(HttpStatus.NOT_FOUND, "Article not found")
        }
    }

    /**
     * Streams the articles whose title or content contains [query], ignoring case.
     */
    fun searchStream(query: String): Flow<Article> =
        repo.findAllAsFlow().filter { article ->
            article.title.contains(query, ignoreCase = true) ||
                article.content.contains(query, ignoreCase = true)
        }
}

// Controller — suspend functions for coroutine support
/**
 * HTTP endpoints under `/api/articles`, each delegating to [ArticleService].
 *
 * Single-result handlers are `suspend` functions; the streaming handlers return a
 * [Flow] and declare `text/event-stream` as their produced media type.
 */
@RestController
@RequestMapping("/api/articles")
class ArticleController(private val service: ArticleService) {

    /** `GET /api/articles`: returns all articles as a list. */
    @GetMapping
    suspend fun getAll(): List<Article> {
        return service.getAll()
    }

    /** `GET /api/articles/stream`: streams all articles as Server-Sent Events. */
    @GetMapping("/stream", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun streamAll(): Flow<Article> {
        return service.streamAll()
    }

    /** `GET /api/articles/{id}`: returns the article with the given [id]. */
    @GetMapping("/{id}")
    suspend fun getById(@PathVariable id: String): Article {
        return service.getById(id)
    }

    /** `POST /api/articles`: creates an article from [request] and responds with 201 Created. */
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    suspend fun create(@RequestBody request: CreateArticleRequest): Article {
        return service.create(request)
    }

    /** `DELETE /api/articles/{id}`: deletes the article and responds with 204 No Content. */
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    suspend fun delete(@PathVariable id: String) {
        service.delete(id)
    }

    /** `GET /api/articles/search?q=...`: streams the articles matching [q] as Server-Sent Events. */
    @GetMapping("/search", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun search(@RequestParam q: String): Flow<Article> {
        return service.searchStream(q)
    }
}

// Exception handler
/**
 * Controller advice that turns [IllegalArgumentException] into a 400 Bad Request
 * response carrying a single `error` entry.
 */
@RestControllerAdvice
class ErrorHandler {
    /**
     * Builds the error body for a failed validation.
     *
     * @param e the exception raised by the failed check.
     * @return a map whose `error` key holds the exception message, or a generic
     *   fallback when the exception has no message.
     */
    @ExceptionHandler(IllegalArgumentException::class)
    @ResponseStatus(HttpStatus.BAD_REQUEST)
    fun handleValidation(e: IllegalArgumentException): Map<String, String> {
        val message = e.message ?: "Validation error"
        return mapOf("error" to message)
    }
}

/** Spring Boot application class; it is the type passed to `runApplication` in [main]. */
@SpringBootApplication
class Application

/**
 * Program entry point: calls `runApplication` for [Application], forwarding the
 * command-line arguments.
 *
 * @param args command-line arguments, passed through unchanged.
 */
fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

Sponsored

Deploy on Railway

Use Cases

  • Non-blocking REST APIs with coroutines
  • Server-Sent Events streaming
  • Reactive data pipelines

Tags

Related Snippets

Similar patterns you can reuse in the same workflow.