Chapter 5: UseCase02 - Transformations
UC 02 — Flow Transformations: map, filter, take, onEach
In the previous chapter, we learned that a Flow is lazy and only executes when collected.
Now we take the next step.
We will transform the stream.
This chapter introduces four essential operators:
map
filter
take
onEach
These operators form the backbone of Flow pipelines.
What We Are Learning Here
A Flow is not just about emitting values.
It is about transforming values as they move downstream.
Each operator:
receives emissions
applies logic
forwards results
preserves sequential execution
Operators do not execute immediately. They remain lazy until collection.
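To make the laziness visible, here is a minimal sketch that is not part of the project code. The function name lazinessDemo and the events list are made up for illustration; the only assumption is that kotlinx-coroutines-core is on the classpath. It records when the map lambda actually runs relative to building the pipeline.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what happens, in order.
fun lazinessDemo(): List<String> {
    val events = mutableListOf<String>()

    // Building the pipeline runs none of the operator lambdas.
    val doubled: Flow<Int> = listOf(1, 2, 3)
        .asFlow()
        .map { value ->
            events.add("map($value)")
            value * 2
        }
    events.add("pipeline built")

    // Collecting (here through toList) is what triggers execution.
    val result = runBlocking { doubled.toList() }
    events.add("collected $result")
    return events
}

fun main() {
    lazinessDemo().forEach(::println)
}
```

The "pipeline built" entry comes first, and the map entries appear only once toList starts collecting.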
Creating the Use Case
Create a new package: usecases.uc02
Create the class file UC02Transformations.kt
package org.kotlinflowlearner.stockflow.usecases.uc02

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import org.kotlinflowlearner.stockflow.model.Stock

class UC02Transformations(private val stocks: List<Stock>) {

    /**
     * Returns a Flow<String> representing stock names
     * that match specific criteria.
     *
     * The pipeline demonstrates transformation and filtering.
     */
    fun execute(): Flow<String> {
        return stocks
            .asFlow()
            .onEach { println("Upstream emission: ${it.name}") } // log every upstream item
            .filter { it.priceUsd > 100 }                        // keep stocks priced above 100
            .map { stock -> "${stock.name} (${stock.country})" } // format as "Name (Country)"
            .take(5)                                             // stop after 5 matching items
    }
}
What This Pipeline Does Step by Step
Convert the List into a Flow.
Log every upstream emission.
Filter stocks with price greater than 100.
Transform each Stock into a formatted String.
Stop after 5 matching emissions.
This forms a linear pipeline.
Each operator runs sequentially.
As before, a Main.kt for this use case lets us run the pipeline and check the outcome.
package org.kotlinflowlearner.stockflow.usecases.uc02

import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main() {
    runBlocking {
        // Locate stocks.csv on the classpath
        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )
        val path = Path.of(resource.toURI())
        val stocks = CsvStockLoader.load(path)
        val useCase = UC02Transformations(stocks)
        // Nothing runs until collect is called
        useCase.execute().collect { result ->
            println("Collected: $result")
        }
    }
}
After running the program, the logs appear like this:
Upstream emission: Aurora Systems
Collected: Aurora Systems (United States)
Upstream emission: BluePeak Energy
Upstream emission: Helios Dynamics
Collected: Helios Dynamics (Germany)
Upstream emission: Nimbus Health Group
Collected: Nimbus Health Group (Switzerland)
Upstream emission: Vertex Financial
Collected: Vertex Financial (United Kingdom)
Upstream emission: Solstice Technologies
Upstream emission: Ironclad Semiconductors
Collected: Ironclad Semiconductors (South Korea)
What You Should Observe
You will notice:
Logging from onEach
Only stocks above 100 USD pass the filter
Only 5 matching stocks are collected
Execution stops early because of take(5)
The moment 5 items are emitted, the upstream flow is cancelled.
This shows structured cancellation in action.
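If you want to see the early stop for yourself, the following sketch may help. It uses plain integers instead of stocks, and the name earlyStopDemo is invented for this example. The finally block runs when take(3) ends the upstream flow, and the line after the loop is never reached.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows that take() ends the upstream flow early.
fun earlyStopDemo(): List<String> {
    val events = mutableListOf<String>()

    val numbers = flow {
        try {
            for (i in 1..10) {
                events.add("emit $i")
                emit(i)
            }
            // Not reached when the collector only takes 3 items.
            events.add("emitted everything")
        } finally {
            // Runs when the flow body ends, including an early stop by take().
            events.add("upstream stopped")
        }
    }

    val collected = runBlocking { numbers.take(3).toList() }
    events.add("collected $collected")
    return events
}

fun main() {
    earlyStopDemo().forEach(::println)
}
```

Only three "emit" entries are recorded even though the loop is written to run ten times.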
Sequential Execution Learning
We have chained multiple operators in the example:
asFlow → onEach → filter → map → take
Each emission flows through them in order.
For each Stock:
onEach runs
filter checks
map transforms
take counts
Then the next Stock begins.
Flow is sequential by default.
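The per-item ordering can be traced with a small sketch like the one below. It is an illustration only: sequentialTrace is a made-up name, and integers stand in for Stock objects so the example needs no CSV file.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records which operator touches which item, in order.
fun sequentialTrace(): List<String> {
    val trace = mutableListOf<String>()
    runBlocking {
        listOf(1, 2, 3)
            .asFlow()
            .onEach { trace.add("onEach $it") }
            .filter {
                trace.add("filter $it")
                it % 2 == 1 // keep odd numbers only
            }
            .map {
                trace.add("map $it")
                it * 10
            }
            .collect { trace.add("collect $it") }
    }
    return trace
}

fun main() {
    sequentialTrace().forEach(::println)
}
```

Item 1 passes through onEach, filter, map and collect before item 2 is even looked at. Item 2 stops at filter, so map and collect never see it.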
Important Concept: Operator Order Matters
If you swap operators, behaviour changes.
Example:
Placing take(5) before filter would keep only the first 5 stocks and then filter those,
instead of returning the first 5 stocks that match the condition. You could end up with fewer than 5 results.
Flow pipelines are declarative but order-sensitive.
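A quick way to convince yourself is to run both orderings on the same input. The sketch below uses the numbers 1 to 10 rather than stocks, and the two function names are invented for the comparison.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// filter first: take counts only the even numbers.
fun filterThenTake(): List<Int> = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }
        .take(3)
        .toList()
}

// take first: only 1, 2, 3 ever reach the filter.
fun takeThenFilter(): List<Int> = runBlocking {
    (1..10).asFlow()
        .take(3)
        .filter { it % 2 == 0 }
        .toList()
}

fun main() {
    println(filterThenTake()) // [2, 4, 6]
    println(takeThenFilter()) // [2]
}
```

Same operators, same input, different result, purely because of the order.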
Important Observation
You may wonder why the log output shows more than 5 "Upstream emission" lines (7 in the run above) but only 5 "Collected:" lines.
When the pipeline above is executed, this happens:
onEach runs for every upstream emission
filter removes some items
take(5) counts only items that pass the filter
So the flow continues scanning upstream items until it finds 5 that satisfy the filter.
That means:
Some items are logged by onEach
But they don’t pass the filter
So they are not counted by take(5)
Key concept: take(5) counts the emissions that reach it, not every upstream emission.
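The difference between the two counts can be measured directly. This is a rough sketch with integers, where countUpstreamAndDownstream and upstreamSeen are illustrative names and "multiple of 3" plays the role of the price filter.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (items seen by onEach, items collected).
fun countUpstreamAndDownstream(): Pair<Int, Int> {
    var upstreamSeen = 0
    val collected = runBlocking {
        (1..100).asFlow()
            .onEach { upstreamSeen++ }  // counts every item the source emits
            .filter { it % 3 == 0 }     // only multiples of 3 pass
            .take(5)                    // counts only what passed the filter
            .toList()
    }
    return upstreamSeen to collected.size
}

fun main() {
    val (seen, kept) = countUpstreamAndDownstream()
    println("Upstream items seen: $seen, collected: $kept")
}
```

The fifth multiple of 3 is 15, so onEach sees 15 items while only 5 are collected, and the remaining 85 numbers are never emitted.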
Summary
In this chapter we learned:
How to transform data inside a Flow
How filtering works
How to limit emissions
How operator order affects results
That laziness still applies
Key points to remember:
Operators run sequentially
Each operator sees only what the operators before it emit
take counts only the items that reach it, here the ones that passed the filter
Side-effect operators like onEach observe whatever is upstream of them
This is a very important mental model for Flow.
ADDENDUM
We used runBlocking to run the code. This is not something I forgot in chapter 4.
runBlocking is a bridge between regular (blocking) code and coroutines.
It starts a coroutine and blocks the current thread until that coroutine completes.
In simple terms:
launch → non-blocking
runBlocking → blocking
We use it in main() because main() is not a suspend function.
So runBlocking lets us call suspend functions like collect() safely.
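For the curious, the blocking versus non-blocking difference can be sketched in a few lines. This is only an illustration: blockingVsLaunch is a made-up name and the 50 ms delay is an arbitrary value.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which things happen.
fun blockingVsLaunch(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        // launch starts a child coroutine and returns a Job right away.
        val job = launch {
            delay(50) // arbitrary pause for the demo
            events.add("launched coroutine finished")
        }
        events.add("launch returned immediately")
        job.join() // suspend until the child is done
    }
    // We only get here after everything inside runBlocking has completed.
    events.add("runBlocking returned")
    return events
}

fun main() {
    blockingVsLaunch().forEach(::println)
}
```

The thread that called runBlocking waits until the block has finished, while launch hands back control before its coroutine has done any work.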
runBlocking should generally be avoided in production server or Android code, because it blocks the calling thread.
It is mainly for:
entry points (main)
small demos
tests
Since this is a blog compilation on Flow, we don’t cover coroutines here.