Chapter 23 : UseCase20 - Buffering for Performance
There are real-world cases where the producer is faster than the consumer.
By default, Flow is sequential.
That means:
Upstream emits
Downstream processes
Only then is the next value emitted
This is safe but slow, because the upstream sits idle while the downstream works.
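The sequential behavior described above can be observed by recording the order of events. The following is a minimal sketch, assuming kotlinx.coroutines is on the classpath; the helper name sequentialEventLog and the integer values are made up for illustration and are not part of the stock project.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs a three-element flow without buffer()
// and returns the order in which emit/process events happened.
fun sequentialEventLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { log.add("emit $it") }   // upstream side
        .map {
            log.add("process $it")        // downstream side
            it
        }
        .toList()                          // terminal operator that drives the flow
    log
}

fun main() {
    // Prints emit 1, process 1, emit 2, process 2, emit 3, process 3 (one per line).
    sequentialEventLog().forEach(::println)
}
```

Each value is fully processed before the next one is emitted, which matches the three steps listed above.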
But when the consumer is slower than the producer, the producer has to wait for each value to be processed before it can emit the next one, and the whole pipeline slows down. To change that, we use buffer().
What buffer Does
It introduces a channel between upstream and downstream and runs the upstream in its own coroutine. By default the channel can hold 64 values.
So instead of: Emit → Process → Emit → Process
We get:
Emit → Emit → Emit (queued)
↓
Process → Process → Process
The producer and the consumer can now run concurrently.
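To see the producer run ahead of the consumer, an event log can be recorded with buffer() in place. This is a minimal sketch, assuming kotlinx.coroutines is available; bufferedEventLog is a hypothetical helper, and the 50 ms delay is an arbitrary stand-in for slow processing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records events in a list instead of printing them.
// runBlocking keeps both coroutines on one thread, so a plain list is safe here.
fun bufferedEventLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { log.add("emit $it") }   // producer: no delay, so it is fast
        .buffer()                          // channel between producer and consumer
        .map {
            log.add("process $it")
            delay(50)                      // consumer: arbitrary slow step
            it
        }
        .toList()
    log
}

fun main() {
    val log = bufferedEventLog()
    log.forEach(::println)
    // Checks that the producer emitted its last value before the
    // consumer started working on its second one.
    println(log.indexOf("emit 3") < log.indexOf("process 2"))
}
```

Because the producer never has to wait here, its emit lines are expected to appear ahead of most of the process lines, unlike the strictly alternating order of a flow without buffer().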
CODE IMPLEMENTATION
package org.kotlinflowlearner.stockflow.usecases.uc20

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import org.kotlinflowlearner.stockflow.model.Stock

class UC20Buffering {
    fun execute(stocks: List<Stock>): Flow<String> {
        return stocks.asFlow()
            .onEach {
                println("Emitting ${it.name}")
                delay(100) // fast producer
            }
            // Everything above buffer() runs in its own coroutine.
            .buffer()
            .map { stock ->
                println("Processing ${stock.name}")
                delay(500) // slow consumer
                "Processed ${stock.name}"
            }
    }
}
package org.kotlinflowlearner.stockflow.usecases.uc20

import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main() {
    runBlocking {
        val useCase = UC20Buffering()
        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )
        val path = Path.of(resource.toURI())
        val stocks = CsvStockLoader.load(path)
        useCase.execute(stocks).collect { result ->
            println("Collected : $result")
        }
    }
}
What Happens Without buffer
Timeline:
Emit(100ms) → Process(500ms)
Emit(100ms) → Process(500ms)
Each item takes approximately 600 ms in total (100 ms to emit plus 500 ms to process).
Everything is sequential.
What Happens With buffer
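Now the producer runs independently of the consumer.
It emits every 100 ms, while the consumer processes one value every 500 ms.
The producer does not wait for the consumer unless the buffer is full; the channel queues the values in between. Throughput increases: once the pipeline is running, each item costs about 500 ms (the slower stage) instead of 600 ms.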
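The two timelines can be compared with a small timing model. This is only a sketch of the arithmetic, not a measurement: it assumes fixed per-item costs and a buffer that never fills up, and the function names sequentialTotalMs and bufferedTotalMs are invented for this illustration.

```kotlin
// Simplified timing model, not a measurement. Assumes fixed per-item
// costs and a buffer that never fills up.

// Without buffer(): every item pays the emit cost and then the process cost.
fun sequentialTotalMs(items: Int, emitMs: Long, processMs: Long): Long =
    items * (emitMs + processMs)

// With buffer(): the producer has item i ready at i * emitMs, and the
// consumer starts on it as soon as the item is ready and the consumer is free.
fun bufferedTotalMs(items: Int, emitMs: Long, processMs: Long): Long {
    var consumerFreeAt = 0L
    for (i in 1..items) {
        val readyAt = i * emitMs
        consumerFreeAt = maxOf(readyAt, consumerFreeAt) + processMs
    }
    return consumerFreeAt
}

fun main() {
    println(sequentialTotalMs(5, 100, 500)) // 3000
    println(bufferedTotalMs(5, 100, 500))   // 2600
}
```

For five stocks with the delays used in this chapter, the model gives 5 × 600 = 3000 ms without buffer() and 100 + 5 × 500 = 2600 ms with it. The saving grows with the number of items, since only the first emit delay is not overlapped with processing.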