Chapter 9 : UseCase06 - flowOn and Threading Boundaries
In the five use cases discussed so far, all our flows executed on a single thread.
But real applications perform:
File reading
Network calls
Database operations
These must not block the main thread.
In this chapter, we learn how Flow controls its execution context.
What We Are Learning
In this use case, we will understand:
Where upstream code runs
Where downstream code runs
How flowOn creates a boundary
Why Flow preserves context
Important Rule
Flow has two sides:
Upstream → emission side
Downstream → collection side
flowOn changes the context of upstream only. Downstream remains in the collector’s context.
This separation is important.
Let us work through a code example to make this clear.
package org.kotlinflowlearner.stockflow.usecases.uc06

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn

/**
 * UC06FlowOn demonstrates how flowOn
 * changes the execution context of upstream code.
 */
class UC06FlowOn {
    /**
     * Emits values while printing thread names.
     */
    fun execute(): Flow<String> = flow {
        repeat(3) {
            println("Upstream running on : ${Thread.currentThread().name}")
            delay(500)
            emit("Value $it")
        }
    }.flowOn(Dispatchers.Default)
}
// flowOn(Dispatchers.Default) means:
// run this upstream work on background worker threads instead of the main thread.
package org.kotlinflowlearner.stockflow.usecases.uc06

import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        val usecase = UC06FlowOn()
        usecase.execute().collect { value ->
            println("Downstream running on : ${Thread.currentThread().name}")
            println("Collected : $value")
        }
    }
}
The output resembles the following:
Upstream running on : DefaultDispatcher-worker-1
Upstream running on : DefaultDispatcher-worker-1
Downstream running on : main
Collected : Value 0
Upstream running on : DefaultDispatcher-worker-1
Downstream running on : main
Collected : Value 1
Downstream running on : main
Collected : Value 2
DefaultDispatcher-worker-1 is the name of the background worker thread in the above output; the worker number may differ from run to run.
This proves:
The emission side runs on the Default dispatcher.
The collector runs on the main thread.
flowOn creates a context boundary.
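For contrast, here is a minimal sketch of what happens when the upstream tries to switch threads on its own with withContext instead of using flowOn. The names brokenFlow and collectBroken are hypothetical and exist only for this illustration. Flow checks that emit is called in the same context the flow is collected in, so this attempt is rejected with an IllegalStateException. This check is what "context preservation" refers to.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical example: emits from inside withContext, which Flow does not allow.
fun brokenFlow(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1) // emission happens in a different context than collection
    }
}

// Returns true if collecting brokenFlow() fails with IllegalStateException.
fun collectBroken(): Boolean = runBlocking {
    try {
        brokenFlow().collect { println("Collected : $it") }
        false
    } catch (e: IllegalStateException) {
        println("Rejected : ${e.message}")
        true
    }
}

fun main() {
    println("Context violation detected : ${collectBroken()}")
}
```

Replacing the withContext block with a plain emit(1) and appending .flowOn(Dispatchers.Default) to the flow is the supported way to get the same effect.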
Why This Matters
In real systems:
File parsing runs on IO or Default
UI collection runs on Main
Heavy computation runs on Default
Flow lets you separate these concerns: you control where each part of the pipeline executes without breaking structured concurrency.
Important Learning
flowOn does not move the entire flow.
It moves only the upstream part before it.
Example:
flow { } → map { } → flowOn(Default) → filter { } → collect
Everything before flowOn runs on Default.
Everything after runs in the collector’s context.
This is one of the most vital concepts of Flow.
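The chain above can be tried out with a small sketch. The helper name runChain and the sample values are assumptions made for this illustration, and toList is used as the terminal operator in place of collect so that the result can be returned. The thread names printed by flow and map should be Default worker threads, while filter should print the collector's thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the chain and returns what the collector received.
fun runChain(): List<Int> = runBlocking {
    flow {
        for (i in 1..4) {
            println("flow   on ${Thread.currentThread().name}")
            emit(i)
        }
    }
        .map { value ->
            // Before flowOn: runs on Dispatchers.Default
            println("map    on ${Thread.currentThread().name}")
            value * 10
        }
        .flowOn(Dispatchers.Default)
        .filter { value ->
            // After flowOn: runs in the collector's context
            println("filter on ${Thread.currentThread().name}")
            value > 20
        }
        .toList()
}

fun main() {
    println("Collected : ${runChain()}")
}
```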
Summary
We have now learnt:
Execution context separation in Flow
Upstream vs downstream
flowOn boundaries
Context preservation
You now control where work happens.
A Note about Dispatchers.Default
DefaultDispatcher is the thread pool behind Dispatchers.Default.
It is a shared background thread pool, optimised for CPU-intensive work and managed automatically by Kotlin coroutines.
Dispatchers.Default is the coroutine dispatcher for CPU-bound work.
Think of it as a shared pool of background workers. When you give it a task, it assigns that task to one of the available worker threads. By default, the number of workers equals the number of CPU cores (with a minimum of two), so your program can do multiple calculations in parallel without creating too many threads.
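A minimal sketch of this worker-pool idea, outside of Flow: several CPU-bound tasks are started with async on Dispatchers.Default and the results are gathered with awaitAll. The functions sumOfSquares and computeInParallel are hypothetical, chosen only to give the workers something to calculate. Which worker thread picks up which task can vary between runs.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical CPU-bound task: sum of squares from 1 to n.
fun sumOfSquares(n: Int): Long = (1..n).sumOf { it.toLong() * it }

// Starts one coroutine per input on Dispatchers.Default and waits for all results.
fun computeInParallel(inputs: List<Int>): List<Long> = runBlocking {
    inputs.map { n ->
        async(Dispatchers.Default) {
            println("sumOfSquares($n) on ${Thread.currentThread().name}")
            sumOfSquares(n)
        }
    }.awaitAll()
}

fun main() {
    println("CPU cores : ${Runtime.getRuntime().availableProcessors()}")
    println("Results   : ${computeInParallel(listOf(10, 100, 1000))}")
}
```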
If your code is doing heavy thinking — like sorting, filtering, computing rankings, or parsing data — it belongs on Dispatchers.Default. This keeps the main thread free and prevents your application from becoming slow or unresponsive.
In simple terms:
Main thread is for responding.
Default dispatcher is for thinking.
When to use Dispatchers.Default
Use it for:
Heavy calculations
Parsing large CSV files
Data transformations
Sorting / ranking
CPU-bound tasks
Note:
Don’t use Dispatchers.Default for blocking IO (files, network, database). For these operations, use Dispatchers.IO.
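As a sketch of the blocking IO case, the flow below reads a text file line by line and moves that upstream work to Dispatchers.IO with flowOn. The helper readLines and the sample file contents are assumptions made for this illustration; they are not part of the stockflow project.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.nio.file.Files
import java.nio.file.Path

// Hypothetical helper: emits the lines of a text file one by one.
// The blocking read happens upstream, so flowOn moves it to Dispatchers.IO.
fun readLines(path: Path): Flow<String> = flow {
    Files.newBufferedReader(path).use { reader ->
        while (true) {
            val line = reader.readLine() ?: break
            emit(line)
        }
    }
}.flowOn(Dispatchers.IO)

fun main() {
    runBlocking {
        // Sample data written to a temporary file for the demo
        val path = Files.createTempFile("sample", ".csv")
        Files.write(path, listOf("symbol,price", "AAA,10.5", "BBB,20.0"))
        try {
            val lines = readLines(path).toList()
            println("Collected : $lines")
        } finally {
            Files.deleteIfExists(path)
        }
    }
}
```

The collector still runs in the caller's context; only the file reading is moved to the IO dispatcher.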
What actually is DefaultDispatcher?
Internally, it is:
A shared ForkJoinPool-like thread pool
Size is approximately equal to the number of CPU cores
Designed for parallel computation.
We use it without creating it.
Mental Model
Main → UI thread
Default → CPU work
IO → Blocking IO work
Heavy Computation Example for Better Understanding
To understand the use case better, let us run a heavy computational task on the stock data.
We will build a financial analytics (aggregation) example with these steps:
Group stocks by country
Compute total market cap per country
Compute average stock price per country
Rank countries by total market cap
Extract top N countries
We add a CountryAnalytics data class and modify UC06FlowOn as follows:
package org.kotlinflowlearner.stockflow.usecases.uc06

data class CountryAnalytics(
    val country: String,
    val totalMarketCap: Long,
    val averagePrice: Double,
    val companyCount: Int
)
package org.kotlinflowlearner.stockflow.usecases.uc06

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import org.kotlinflowlearner.stockflow.model.Stock

/**
 * UC06FlowOn demonstrates how flowOn
 * changes the execution context of upstream code.
 */
class UC06FlowOn(
    private val stocks: List<Stock>
) {
    /**
     * Emits values while printing thread names.
     */
    fun execute(): Flow<String> = flow {
        repeat(3) {
            println("Upstream running on : ${Thread.currentThread().name}")
            delay(500)
            emit("Value $it")
        }
    }.flowOn(Dispatchers.Default)

    // No flowOn: the upstream runs in the collector's context (the main thread here),
    // so in a real application the heavy work would "freeze" that thread.
    fun executeAggregation(): Flow<List<CountryAnalytics>> = flow {
        println("executeAggregation - Upstream started on: ${Thread.currentThread().name}")
        // Heavy CPU work
        val result = heavyComputation(stocks)
        emit(result)
    }

    // With flowOn: the same upstream work runs on Dispatchers.Default.
    fun executeAggregationDispatchers(): Flow<List<CountryAnalytics>> = flow {
        // Note: this println uses the same label as executeAggregation()
        println("executeAggregation - Upstream started on: ${Thread.currentThread().name}")
        // Heavy CPU work
        val result = heavyComputation(stocks)
        emit(result)
    }.flowOn(Dispatchers.Default)

    /**
     * A financial analytics example:
     * Group stocks by country
     * Compute total market cap per country
     * Compute average stock price per country
     * Rank countries by total market cap
     * Extract top N countries
     */
    private fun heavyComputation(data: List<Stock>): List<CountryAnalytics> {
        return data
            .groupBy { it.country } // Group stocks by country: Map<String, List<Stock>>
            .map { entry -> // entry is a Map.Entry<String, List<Stock>>
                val country = entry.key
                val stocksInCountry = entry.value
                // Compute total market cap per country
                val totalMarketCap = stocksInCountry.sumOf { it.marketCap }
                // Compute average stock price per country
                val averagePrice = stocksInCountry.map { it.priceUsd }.average()
                // Total companies per country
                val companyCount = stocksInCountry.size
                CountryAnalytics(country, totalMarketCap, averagePrice, companyCount)
            }
            .sortedByDescending { it.totalMarketCap } // Rank countries by total market cap
            .take(10) // Extract top N countries (N = 10 here)
    }
}
UC06FlowOn now has three functions: execute() from the first example, plus two variants that call heavyComputation(): executeAggregation(), which has no flowOn, and executeAggregationDispatchers(), which applies flowOn(Dispatchers.Default).
We will run all three from the main() function in Main.kt:
package org.kotlinflowlearner.stockflow.usecases.uc06

import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main() {
    runBlocking {
        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )
        val path = Path.of(resource.toURI())
        val stocks = CsvStockLoader.load(path)
        val usecase = UC06FlowOn(stocks)
        usecase.execute().collect { value ->
            println("Downstream running on : ${Thread.currentThread().name}")
            println("Collected : $value")
        }
        usecase.executeAggregation()
            .collect { top ->
                println("executeAggregation - Downstream on: ${Thread.currentThread().name}")
                println("executeAggregation - Top stock: ${top.first()}")
            }
        usecase.executeAggregationDispatchers()
            .collect { top ->
                println("executeAggregationDispatchers - Downstream on: ${Thread.currentThread().name}")
                println("executeAggregationDispatchers - Top stock: ${top.first()}")
            }
    }
}
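Run the code above and watch the console output. executeAggregationDispatchers() applies flowOn(), which is the focal point of this chapter, so the excerpt below shows the lines it produces. Its upstream println reuses the "executeAggregation" label, which is why the first line carries that label even though it is printed from a DefaultDispatcher worker thread: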
executeAggregation - Upstream started on: DefaultDispatcher-worker-1
executeAggregationDispatchers - Downstream on: main
executeAggregationDispatchers - Top stock: CountryAnalytics(country=United States, totalMarketCap=3149600000000, averagePrice=208.26749999999998, companyCount=4)
Over to Chapter 10 now!