Chapter 13 : UseCase10 -distinctUntilChanged
In real streams, values often repeat.
But sometimes, we only care when the value actually changes.
distinctUntilChanged() ensures that the same consecutive value is not emitted twice. It filters out duplicates - but only if they come one after another.
Imagine, we are dealing with stock price updates. If the value hasn't changed, why trigger downstream work again?
This saves:
CPU
Logging noise
UI re-renders
Expensive recalculation
WORKING EXAMPLE
With our stock csv data, let us detect country changes as we stream stocks.
We want to only emit when country changes compared to previous stock.
package org.kotlinflowlearner.stockflow.usecases.uc10
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import org.kotlinflowlearner.stockflow.model.Stock
class UC10DistinctCountryChanges {
fun execute(stocks: List<Stock>): Flow<String> {
return stocks
.asFlow()
.map { stock ->
//println("Upstream: \({stock.name} from \){stock.country}")
stock.country
}
.distinctUntilChanged()
}
}
package org.kotlinflowlearner.stockflow.usecases.uc10
import kotlinx.coroutines.runBlocking
import org.kotlinflowlearner.stockflow.csv.CsvStockLoader
import java.nio.file.Path
fun main(){
runBlocking {
val useCase = UC10DistinctCountryChanges()
val resource = requireNotNull(
object {}.javaClass.classLoader.getResource("stocks.csv")
)
val path = Path.of(resource.toURI())
val stocks = CsvStockLoader.load(path)
useCase.execute(stocks).collect{
country-> println("Collected country change : $country")
}
}
}
The output resembles like this:
Collected country change : United States
Collected country change : Canada
Collected country change : Germany
Collected country change : Switzerland
Collected country change : United Kingdom
Collected country change : Japan
Collected country change : South Korea
Collected country change : Netherlands
Collected country change : Australia
Collected country change : France
Collected country change : United States
Collected country change : Ireland
Collected country change : Chile
Collected country change : Singapore
Collected country change : United States
Collected country change : Denmark
Collected country change : Belgium
Collected country change : Estonia
Collected country change : United States
Collected country change : New Zealand
Consecutive duplicates are removed.
distinctUntilChanged() only removes adjacent duplicates.
If the sequence is:
United States
Germany
United States
The second United States will be emitted. Because it is different from Germany. It does not remember the full history.
SUMMARY
Filtering based on change detection using distinctUntilChanged()
Preventing unnecessary downstream work
How Flow can reduce redundant signals in practical applications