Skip to main content

Command Palette

Search for a command to run...

Chapter 23 : UseCase20 - Buffering for Performance

Published
2 min read

There are real-world cases where producer is faster than consumer.

By default, Flow is sequential.

That means:

  • Upstream emits

  • Downstream processes

  • Only then next value emits

This is slow but safe.

But differing speeds of producers and consumers where the consumers are slower, will slow down the system. To change that we use buffer()

What buffer Does

It introduces a channel between upstream and downstream.

So instead of: Emit → Process → Emit → Process

We get:

Emit → Emit → Emit (queued)

Process → Process → Process

Producer and Consumer can run concurrently.

CODE IMPLEMENTATION

package org.kotlinflowlearner.stockflow.usecases.uc20

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import org.kotlinflowlearner.stockflow.model.Stock

class UC20Buffering {

    fun execute(stocks: List<Stock>): Flow<String> {

        return stocks.asFlow().onEach {
            println("Emitting ${it.name}")
            delay(100) // fast producer
        }.buffer().map {
                stock ->
            println("Processing ${stock.name}")
            delay(500) // slow consumer
            "Processed ${stock.name}"
        }
    }
    }
package org.kotlinflowlearner.stockflow.usecases.uc20

import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main(){
    runBlocking {
        val useCase = UC20Buffering()

        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )

        val path = Path.of(resource.toURI())

        val stocks = CsvStockLoader.load(path)

        useCase.execute(stocks).collect{
                result -> println("Collected : $result")
        }
    }
}

What Happens Without buffer

Timeline:

Emit(100ms) → Process(500ms)
Emit(100ms) → Process(500ms)

Total per item approximately equal to 600ms.

Everything is sequential.

What Happens With buffer

Now, Producer runs independently.

It emits every 100ms. Consumer processes every 500ms.

Producer does not wait. The channel queues values.Throughput increases.