Skip to main content

Command Palette

Search for a command to run...

Chapter 11 : UseCase08 - Conflate

Updated
4 min read

In this use case we look into a condition where we do not care about every value emitted from the producer. We only want the latest update. This is what conflate() does.

Real-World Analogy

Imagine stock prices updating rapidly:

100.23 → 100.54 → 100.01 → 99.99 → 100.12

If your UI is slow, do you really need to display 100.21, 100.15, 100.12?

Or is showing the latest price (100.12) enough?

If the answer is “latest is enough,”
you use conflate().

What conflate() does

conflate() drops intermediate values if the collector() is slow. It keeps only the latest unprocessed value. It replaces old pending values with the latest one.

Let us again have the same old example as in the previous case. Producer emits a value with a delay of 100ms but the collector collects it with a bigger delay of 300ms. Thus the entire producer-consumer pipeline is on the slower side due to a slow consumer.

package org.kotlinflowlearner.stockflow.usecases.uc08

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

class UC08Conflate {

    fun withoutConflate(): Flow<Int> = flow {

        repeat(10){
            println("Emitting $it")
            emit(it)
            delay(100)
        }
    }
}
package org.kotlinflowlearner.stockflow.usecases.uc08

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main(){

    runBlocking {

        val useCase = UC08Conflate()

        useCase.withoutConflate().collect{
                value ->
            delay(300)
            println("Collected $value")
        }
    }
}

What happens without conflate() in the above example

The collector processes every value.

Producer emits 0 → collector waits 300ms
Producer emits 1 → collector waits 300ms
Everything is processed one by one. This cycle repeats 10 times.

Total time is long.

Now let us introduce the conflate() function.

package org.kotlinflowlearner.stockflow.usecases.uc08

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow

class UC08Conflate {

    fun withoutConflate(): Flow<Int> = flow {

        repeat(10){
            println("Emitting $it")
            emit(it)
            delay(100)
        }
    }

    fun withConflate():Flow<Int> = flow {

        repeat(10) {
            println("Emitting with conflate $it")
            emit(it)
            delay(100)
        }
    }.conflate()
}
package org.kotlinflowlearner.stockflow.usecases.uc08

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main(){

    runBlocking {

        val useCase = UC08Conflate()

        useCase.withoutConflate().collect{
                value ->
            delay(300)
            println("Collected $value")
        }

        useCase.withConflate().collect{
            value -> delay(300)
            println("Collected $value")
        }
    }
}
Emitting with conflate 0
Emitting with conflate 1
Emitting with conflate 2
Collected 0
Emitting with conflate 3
Emitting with conflate 4
Emitting with conflate 5
Collected 2
Emitting with conflate 6
Emitting with conflate 7
Emitting with conflate 8
Collected 5
Emitting with conflate 9
Collected 8
Collected 9

What Happens With conflate()

Producer emits rapidly:

0, 1, 2, 3, 4, 5, 6, 7, 8, 9

Collector is slow.

Instead of buffering all values, Flow keeps replacing the pending value with the newest one.

So the collector may see something like:

0
2
5
8

The above console output is shown that confirms the working of conflate()

Intermediate values are dropped. Only the latest available value is delivered when the collector is ready.

REAL STOCK DATA EXAMPLE Scenario

We simulate a live feed of stock updates.

The producer emits stocks quickly.
The consumer computes a slow operation like formatting or aggregation.

We apply conflate() to keep only the latest stock update.

package org.kotlinflowlearner.stockflow.usecases.uc08

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import org.kotlinflowlearner.stockflow.model.Stock
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate

class UC08StockConflate {

    fun stockStream(stocks:List<Stock>) : Flow<Stock> =
        flow {

            for(x in stocks){
                println("Emitting ${x.name}")
                emit(x)
                delay(10) //slow processing
            }

    }

    fun stockStreamWithConflate(stocks:List<Stock>) : Flow<Stock> 
        = stockStream(stocks).conflate()
}
package org.kotlinflowlearner.stockflow.usecases.uc08

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path

fun main(){

    runBlocking {
        val useCase = UC08StockConflate()

        val resource = requireNotNull(
            object {}.javaClass.classLoader.getResource("stocks.csv")
        )

        val path = Path.of(resource.toURI())

        val stocks = CsvStockLoader.load(path)

        println("---- Without Conflate ----")
        useCase.stockStream(stocks).collect{
            x -> delay(100) //slow processing
            println("Collected ${x.name}")
        }

        println("---- With Conflate ----")

        useCase.stockStreamWithConflate(stocks).collect{
            x -> delay(100)
            println("Collected ${x.name}")
        }
    }
}

Run the above program with the sotck data from the csv.

Observations

Conflate drops intermediate buffered values when the collector is slow and always delivers only the most recent value.

Producer emits every 10ms
Collector processes every 100ms

Emissions still occur upstream. But intermediate values are dropped before reaching the collector. The collector never sees them.

What Happens Internally

flow { ... }.conflate()

Flow inserts a special channel between upstream and downstream.

That channel keeps only the most recent value.

If a new value arrives while the collector is still busy then the previous buffered value is replaced.

So:

  • The producer still emits all values.

  • But only the latest unprocessed value survives.

SUMMARY

Without conflate():

Every stock is processed.
Total time is very long.
Backpressure is strict.

With conflate():

Many stock emissions are skipped.
Only the latest available stock at processing time is handled.
Processing finishes much faster.