Chapter 17 : UseCase14 - flatMapMerge
In UC13 you used flatMapConcat, which processed each stock one after another.
In UC14 we use flatMapMerge(). This allows multiple inner flows to run at the same time.Now we upgrade from sequential expansion to concurrent expansion.
What We Are Learning
We are learning how to process multiple sub-flows concurrently and merge their results into one stream.
This is useful when:
Each stock requires heavy analytics.
Each stock requires a network call.
Each stock requires database access.
Order does not matter.
Speed matters.
| Operator | Execution | Order Preserved | Concurrency |
|---|---|---|---|
| flatMapConcat | Sequential | Yes | No |
| flatMapMerge | Concurrent | No | Yes |
package org.kotlinflowlearner.stockflow.usecases.uc14
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import org.kotlinflowlearner.stockflow.model.Stock
/**
* UC14 demonstrates how flatMapMerge works.
*
* This use case shows how multiple inner flows can run concurrently.
* Each country generates its own analytics flow.
* All country analytics flows are merged into a single output stream.
*
* The order of emissions is not guaranteed because execution happens concurrently.
*/
class UC14FlatMapMerge{
/**
* Executes the use case.
*
* The function receives the full list of stocks.
* It converts the list into a flow.
* The stocks are grouped by country.
* For each country, an analytics flow is created.
* All analytics flows are merged concurrently.
*
* @param stocks The in-memory list of stocks loaded from CSV.
* @return A Flow of CountryAnalytics emitted concurrently.
*/
fun execute(stocks:List<Stock>) : Flow<CountryAnalytics> {
// Step 1: Group stocks by country
val groupedByCountry:Map<String,List<Stock>> = stocks.groupBy { it.country }
// Step 2: Convert grouped map entries into a Flow
//We have Flow<Pair<Country, List<Stock>>> here
val countryFlow : Flow<Map.Entry<String,List<Stock>>> = groupedByCountry.entries.asFlow()
//step 3: For each country, generate analytics concurrently
//Each pair produced: Flow<CountryAnalytics>
val analyticsFlow: Flow<CountryAnalytics> = countryFlow.flatMapMerge(concurrency = 3){
entry -> val country = entry.key
val stocksInCountry = entry.value
generateCountryAnalytics(country, stocksInCountry)
}
// Step 4: Return the final merged flow
return analyticsFlow
}
/**
* Generates analytics for a single country.
*
* This simulates heavy computation or a network call using delay.
* It calculates the total market capitalization of the country's stocks.
* The result is emitted as a CountryAnalytics object.
*
* @param country The country name.
* @param stocks The list of stocks belonging to that country.
* @return A Flow emitting one CountryAnalytics result.
*/
private fun generateCountryAnalytics(country:String,stocks: List<Stock>) : Flow<CountryAnalytics> = flow{
println("Starting analytics for $country")
delay(300)
val totalMarketCap = stocks.sumOf { it.marketCap }
emit(CountryAnalytics(country,totalMarketCap))
println("Finished analytics for $country")
}
}
/**
Step 1 — Grouping--> List<Stock> turned to Map<Country, List<Stock>> ---> groupedByCountry variable
Step 2 — Convert to Flow --> countryFlow --> Flow<Map.Entry<String, List<Stock>>>
Step 3 — flatMapMerge --> countryFlow.flatMapMerge { entry -> generateCountryAnalytics(...) }
--> Each emission from countryFlow becomes Flow<Flow<CountryAnalytics>>
Step 4 — Flattening + Merging flatMapMerge now:
Runs up to 3 country flows at the same time
Collects their emissions
Merges them into one stream
Step 5 — What generateCountryAnalytics Does
For each country:
delay(300)
val totalMarketCap = stocks.sumOf { it.marketCap }
emit(CountryAnalytics(country, totalMarketCap))
List<Stock>
↓ groupBy
Map<Country, List<Stock>>
↓ entries.asFlow()
Flow<Map.Entry<Country, List<Stock>>>
↓ flatMapMerge
Flow<Flow<CountryAnalytics>>
↓ flatten + merge
Flow<CountryAnalytics>
Now flatMapMerge does two things:
It subscribes to each inner flow.
It merges their emissions into a single outer flow.
So the nested structure disappears.
*/
package org.kotlinflowlearner.stockflow.usecases.uc14
import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path
fun main(){
runBlocking {
val useCase = UC14FlatMapMerge()
val resource = requireNotNull(
object {}.javaClass.classLoader.getResource("stocks.csv")
)
val path = Path.of(resource.toURI())
val stocks = CsvStockLoader.load(path)
useCase.execute(stocks).collect{
println("Collected : $it")
}
}
}
What is happening in the code
Step 0 — What You Start With - You receive: List<Stock>.This is just a plain in-memory collection.
Step 1 — Grouping
val groupedByCountry: Map<String, List<Stock>> = stocks.groupBy { it.country }
Now the structure becomes: Map<String, List<Stock>>
So after step 1 we move from List<Stock> to Map<Country, List<Stock>>
Step 2 — Convert to Flow
val countryFlow: Flow<Map.Entry<String, List<Stock>>> = groupedByCountry.entries.asFlow()
Each emission is: Map.Entry<String, List<Stock>>
So now we have: Flow<Map.Entry<String, List<Stock>>>
Step 3 — flatMapMerge
countryFlow.flatMapMerge { entry ->
generateCountryAnalytics(...)
}
Each emission from countryFlow becomes: Flow<CountryAnalytics>
So internally we temporarily create: Flow<Flow<CountryAnalytics>>
Step 4 — Flattening + Merging
flatMapMerge now:
Runs up to 3 country flows at the same time
Collects their emissions
Merges them into one stream
So, Flow<Flow<CountryAnalytics>> becomes Flow<CountryAnalytics>
Step 5 — What generateCountryAnalytics Does
For each country:
delay(300)
val totalMarketCap = stocks.sumOf { it.marketCap }
emit(CountryAnalytics(country, totalMarketCap))
So each country produces exactly one - CountryAnalytics(country, totalMarketCap)
Final Transformation Summary
Here is the entire pipeline visually:
List<Stock>
↓ groupBy
Map<Country, List<Stock>>
↓ entries.asFlow()
Flow<Map.Entry<Country, List<Stock>>>
↓ flatMapMerge
Flow<Flow<CountryAnalytics>>
↓ flatten + merge
Flow<CountryAnalytics>
SUMMARY
Now flatMapMerge does two things:
It subscribes to each inner flow.
It merges their emissions into a single outer flow.
flatMapMerge allows multiple asynchronous operations triggered by upstream emissions to run concurrently and merge their results into a single downstream stream.