Skip to main content

Command Palette

Search for a command to run...

Chapter 5 : UseCase02 - Transformations

Updated
4 min read

UC 02 — Flow Transformations: map, filter, take, onEach

In the previous chapter, we learned that a Flow is lazy and only executes when collected.

Now we take the next step.

We will transform the stream.

This chapter introduces four essential operators:

  • map

  • filter

  • take

  • onEach

These operators form the backbone of Flow pipelines.


What We Are Learning Here

A Flow is not just about emitting values.

It is about transforming values as they move downstream.

Each operator:

  • receives emissions

  • applies logic

  • forwards results

  • preserves sequential execution

Operators do not execute immediately. They remain lazy until collection.


Creating the Use Case

Create a new package: usecases.uc02

Create the class file UC02Transformations.kt

package org.kotlinflowlearner.stockflow.usecases.uc02

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import org.kotlinflowlearner.stockflow.model.Stock

class UC02Transformations(private val stocks:List<Stock>) {

    /**
     * Returns a Flow<String> representing stock names
     * that match specific criteria.
     *
     * The pipeline demonstrates transformation and filtering.
     */
    fun execute(): Flow<String> {
        return stocks
            .asFlow()
            .onEach { println("Upstream emission: ${it.name}") }
            .filter { it.priceUsd > 100 }
            .map { stock -> "\({stock.name} (\){stock.country})" }
            .take(5)
    }
}

What This Pipeline Does Step by Step

  1. Convert the List into a Flow.

  2. Log every upstream emission.

  3. Filter stocks with price greater than 100.

  4. Transform each Stock into a formatted String.

  5. Stop after 5 matching emissions.

This forms a linear pipeline.

Each operator runs sequentially.

Again we have a Main.kt for this use case to test the outcome.

package org.kotlinflowlearner.stockflow.usecases.uc02

import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main(){

    runBlocking{

        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )

        val path = Path.of(resource.toURI())
        val stocks = CsvStockLoader.load(path)

        val useCase = UC02Transformations(stocks)

        useCase.execute().collect{
            result -> println("Collected: $result")
        }
    }
}

After running the program, the logs appear like this:

Upstream emission: Aurora Systems
Collected: Aurora Systems (United States)
Upstream emission: BluePeak Energy
Upstream emission: Helios Dynamics
Collected: Helios Dynamics (Germany)
Upstream emission: Nimbus Health Group
Collected: Nimbus Health Group (Switzerland)
Upstream emission: Vertex Financial
Collected: Vertex Financial (United Kingdom)
Upstream emission: Solstice Technologies
Upstream emission: Ironclad Semiconductors
Collected: Ironclad Semiconductors (South Korea)

What You Should Observe

You will notice:

  • Logging from onEach

  • Only stocks above 100 USD pass the filter

  • Only 5 matching stocks are collected

  • Execution stops early because of take(5)

The moment 5 items are emitted, the upstream flow is cancelled.

This shows structured cancellation in action.

Sequential Execution Learning

We have chained multiple operators in the example:

asFlow → onEach → filter → map → take

Each emission flows through them in order.

For each Stock:

  1. onEach runs

  2. filter checks

  3. map transforms

  4. take counts

Then the next Stock begins.

Flow is sequential by default.

Important Concept: Operator Order Matters

If you swap operators, behaviour changes.

Example:

Placing take(5) before filter would limit the first 5 stocks,
not the first 5 that match the condition.

Flow pipelines are declarative but order-sensitive.

Important Observation

Why you see more than 5 "Upstream emission" logs, but only 5 Collected: lines in the log output.

When the above pipeline is executed this happens:

  • onEach runs for every upstream emission

  • filter removes some items

  • take(5) counts only items that pass the filter

So the flow continues scanning upstream items until it finds 5 that satisfy the filter.

That means:

  • Some items are logged by onEach

  • But they don’t pass the filter

So they are not counted by take(5)

Key concept - take(5) counts downstream emissions, not upstream attempts.

Summary

  • How to transform data inside a Flow

  • How filtering works

  • How to limit emissions

  • How operator order affects results

  • That laziness still applies

  • Operators are sequential

  • Each operator only sees what comes before it

  • take works after filtering

  • Side-effect operators like onEach observe upstream

This is a very important mental model for Flow.

ADDENDUM

We used runblocking to run the code. This is not something I forgot in chapter 4.

runBlocking is a bridge between regular (blocking) code and coroutines.

It starts a coroutine and blocks the current thread until that coroutine completes.

In simple terms:

  • launch → non-blocking

  • runBlocking → blocking

We use it in main() because main() is not a suspend function.
So runBlocking lets us call suspend functions like collect() safely.

It should not be used in production server or Android code.
It is mainly for:

  • entry points (main)

  • small demos

  • tests

Since this is a blog compilation on Flow, we don’t cover coroutines here.