Skip to main content

Command Palette

Search for a command to run...

Chapter 17 : UseCase14 - flatMapMerge

Updated
5 min read

In UC13 you used flatMapConcat, which processed each stock one after another.

In UC14 we use flatMapMerge(). This allows multiple inner flows to run at the same time.Now we upgrade from sequential expansion to concurrent expansion.

What We Are Learning

We are learning how to process multiple sub-flows concurrently and merge their results into one stream.

This is useful when:

  • Each stock requires heavy analytics.

  • Each stock requires a network call.

  • Each stock requires database access.

  • Order does not matter.

  • Speed matters.

Operator Execution Order Preserved Concurrency
flatMapConcat Sequential Yes No
flatMapMerge Concurrent No Yes
package org.kotlinflowlearner.stockflow.usecases.uc14

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import org.kotlinflowlearner.stockflow.model.Stock

/**
 * UC14 demonstrates how flatMapMerge works.
 *
 * This use case shows how multiple inner flows can run concurrently.
 * Each country generates its own analytics flow.
 * All country analytics flows are merged into a single output stream.
 *
 * The order of emissions is not guaranteed because execution happens concurrently.
 */
class UC14FlatMapMerge{

    /**
     * Executes the use case.
     *
     * The function receives the full list of stocks.
     * It converts the list into a flow.
     * The stocks are grouped by country.
     * For each country, an analytics flow is created.
     * All analytics flows are merged concurrently.
     *
     * @param stocks The in-memory list of stocks loaded from CSV.
     * @return A Flow of CountryAnalytics emitted concurrently.
     */
    fun execute(stocks:List<Stock>) : Flow<CountryAnalytics> {

        // Step 1: Group stocks by country
         val groupedByCountry:Map<String,List<Stock>> = stocks.groupBy { it.country }

        // Step 2: Convert grouped map entries into a Flow
        //We have Flow<Pair<Country, List<Stock>>> here
        val countryFlow : Flow<Map.Entry<String,List<Stock>>> = groupedByCountry.entries.asFlow()

        //step 3: For each country, generate analytics concurrently
        //Each pair produced: Flow<CountryAnalytics>
        val analyticsFlow: Flow<CountryAnalytics> = countryFlow.flatMapMerge(concurrency = 3){
            entry -> val country = entry.key
            val stocksInCountry = entry.value
            generateCountryAnalytics(country, stocksInCountry)
        }

        // Step 4: Return the final merged flow

        return analyticsFlow
    }

    /**
     * Generates analytics for a single country.
     *
     * This simulates heavy computation or a network call using delay.
     * It calculates the total market capitalization of the country's stocks.
     * The result is emitted as a CountryAnalytics object.
     *
     * @param country The country name.
     * @param stocks The list of stocks belonging to that country.
     * @return A Flow emitting one CountryAnalytics result.
     */
    private fun generateCountryAnalytics(country:String,stocks: List<Stock>) : Flow<CountryAnalytics> = flow{

        println("Starting analytics for $country")
        delay(300)

        val totalMarketCap = stocks.sumOf { it.marketCap }

        emit(CountryAnalytics(country,totalMarketCap))

        println("Finished analytics for $country")
    }
}

/**
Step 1 — Grouping--> List<Stock> turned to Map<Country, List<Stock>> ---> groupedByCountry variable
Step 2 — Convert to Flow --> countryFlow --> Flow<Map.Entry<String, List<Stock>>>
Step 3 — flatMapMerge --> countryFlow.flatMapMerge { entry -> generateCountryAnalytics(...) }
                       --> Each emission from countryFlow becomes Flow<Flow<CountryAnalytics>>

Step 4 — Flattening + Merging flatMapMerge now:
Runs up to 3 country flows at the same time
Collects their emissions
Merges them into one stream

Step 5 — What generateCountryAnalytics Does
For each country:
delay(300)
val totalMarketCap = stocks.sumOf { it.marketCap }
emit(CountryAnalytics(country, totalMarketCap))

List<Stock>
↓ groupBy
Map<Country, List<Stock>>
↓ entries.asFlow()
Flow<Map.Entry<Country, List<Stock>>>
↓ flatMapMerge
Flow<Flow<CountryAnalytics>>
↓ flatten + merge
Flow<CountryAnalytics>


Now flatMapMerge does two things:

It subscribes to each inner flow.
It merges their emissions into a single outer flow.
So the nested structure disappears.
 */
package org.kotlinflowlearner.stockflow.usecases.uc14

import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main(){

    runBlocking {

        val useCase = UC14FlatMapMerge()

        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )

        val path = Path.of(resource.toURI())

        val stocks = CsvStockLoader.load(path)

        useCase.execute(stocks).collect{
            println("Collected : $it")
        }
    }
}

What is happening in the code

Step 0 — What You Start With - You receive: List<Stock>.This is just a plain in-memory collection.

Step 1 — Grouping

val groupedByCountry: Map<String, List<Stock>> =  stocks.groupBy { it.country }

Now the structure becomes: Map<String, List<Stock>>

So after step 1 we move from List<Stock> to Map<Country, List<Stock>>

Step 2 — Convert to Flow

val countryFlow: Flow<Map.Entry<String, List<Stock>>> = groupedByCountry.entries.asFlow()

Each emission is: Map.Entry<String, List<Stock>>

So now we have: Flow<Map.Entry<String, List<Stock>>>

Step 3 — flatMapMerge

countryFlow.flatMapMerge { entry ->
    generateCountryAnalytics(...)
}

Each emission from countryFlow becomes: Flow<CountryAnalytics>

So internally we temporarily create: Flow<Flow<CountryAnalytics>>

Step 4 — Flattening + Merging

flatMapMerge now:

  • Runs up to 3 country flows at the same time

  • Collects their emissions

  • Merges them into one stream

So, Flow<Flow<CountryAnalytics>> becomes Flow<CountryAnalytics>

Step 5 — What generateCountryAnalytics Does

For each country:

delay(300)
val totalMarketCap = stocks.sumOf { it.marketCap }
emit(CountryAnalytics(country, totalMarketCap))

So each country produces exactly one - CountryAnalytics(country, totalMarketCap)

Final Transformation Summary

Here is the entire pipeline visually:

List<Stock>

    ↓ groupBy

Map<Country, List<Stock>>

    ↓ entries.asFlow()

Flow<Map.Entry<Country, List<Stock>>>

    ↓ flatMapMerge

Flow<Flow<CountryAnalytics>>

    ↓ flatten + merge

Flow<CountryAnalytics>

SUMMARY

Now flatMapMerge does two things:

  1. It subscribes to each inner flow.

  2. It merges their emissions into a single outer flow.

flatMapMerge allows multiple asynchronous operations triggered by upstream emissions to run concurrently and merge their results into a single downstream stream.