Cold flows vs Hot flows
Un cold flow no emite valores hasta que alguien lo colecta — y cada colector recibe su propia secuencia desde el principio. Es como un video on-demand: cada espectador comienza desde el inicio.
Un hot flow emite valores independientemente de si hay colectores, y los nuevos colectores se unen "en vivo". Es como una transmisión en directo: llegás cuando llegás y recibís lo que se emite desde ese momento.
Flow<T>(builderflow { }) → coldStateFlow→ hot, siempre tiene un valor actual, replay = 1SharedFlow→ hot, configurable replay, para eventos- Los flows de Room → cold (pero cada cambio en la DB emite un nuevo valor)
Flow builders
// flow { } — el builder básico, para cold flows
fun contarHasta(n: Int): Flow<Int> = flow {
for (i in 1..n) {
delay(1000) // suspende 1 segundo (cooperativo, cancelable)
emit(i) // emite el valor
}
}
// flowOf — para valores conocidos
val flow1 = flowOf(1, 2, 3)
// asFlow — convierte colecciones e iterables
val flow2 = listOf("a", "b", "c").asFlow()
// channelFlow — para código que no es suspend (callbacks, listeners)
fun locationFlow(): Flow<Location> = callbackFlow {
val listener = LocationListener { location -> trySend(location) }
locationManager.addListener(listener)
awaitClose { locationManager.removeListener(listener) } // limpieza al cancelar
}
map, filter, transform
// map — transforma cada valor emitido
repository.getProductos() // Flow<List<ProductoEntity>>
.map { entities ->
entities.map { it.toDomainModel() }
} // Flow<List<Producto>>
// filter — descarta valores que no cumplen el predicado
repository.getEventos()
.filter { evento -> evento.activo }
// transform — el más flexible: puede emitir 0, 1 o N valores por input
repository.getProductos()
.transform { lista ->
if (lista.isEmpty()) emit(UiState.Empty)
else emit(UiState.Success(lista))
}
// onEach — efecto secundario sin transformar el valor (útil para logs)
repository.getProductos()
.onEach { Log.d("Flow", "Nuevos productos: ${it.size}") }
.collect { ... }
combine y zip
// combine — emite cada vez que CUALQUIERA de los flows emite un nuevo valor
// Siempre usa el último valor de cada flow
val nombreFlow: Flow<String> = ...
val edadFlow: Flow<Int> = ...
combine(nombreFlow, edadFlow) { nombre, edad ->
"Me llamo $nombre y tengo $edad años"
}.collect { texto ->
binding.tvInfo.text = texto
}
// Caso real: combinar filtro de búsqueda + lista completa
val busquedaFlow = MutableStateFlow("")
val productosFlow = repository.getProductos()
combine(productosFlow, busquedaFlow) { productos, busqueda ->
if (busqueda.isBlank()) productos
else productos.filter { it.nombre.contains(busqueda, ignoreCase = true) }
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
// zip — emite solo cuando AMBOS flows emiten un valor, en pares
flow1.zip(flow2) { a, b -> "$a-$b" }.collect { ... }
// Si flow1 emite más rápido que flow2, espera a flow2 antes de emparejar
flatMapLatest — el operador más útil
flatMapLatest convierte cada valor emitido en un nuevo flow, y cuando llega un nuevo valor, cancela el flow anterior y empieza el nuevo. Ideal para búsquedas en tiempo real:
// Búsqueda reactiva: cada vez que cambia el texto, cancela la búsqueda anterior
val busquedaFlow = MutableStateFlow("")
val resultadosFlow = busquedaFlow
.debounce(300) // espera 300ms de inactividad antes de emitir
.distinctUntilChanged() // descarta si el valor no cambió
.flatMapLatest { query ->
if (query.isBlank()) flowOf(emptyList())
else repository.buscarProductos(query) // Flow<List<Producto>>
}
.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
debounce es clave para búsquedasSin debounce, cada keystroke lanzaría una llamada a la API. Con debounce(300) esperás 300ms de pausa antes de buscar, reduciendo las llamadas drásticamente.
buffer y conflate
// buffer — el productor y el consumidor corren en paralelo
// útil cuando el collector es más lento que el emisor
flow { /* emite rápido */ }
.buffer(capacity = 10) // el emisor puede adelantarse hasta 10 items
.collect { /* procesa más lento */ }
// conflate — si el collector es lento, descarta valores intermedios
// solo procesa el más reciente
flow { /* emite 1, 2, 3, 4, 5 rápidamente */ }
.conflate()
.collect { value ->
delay(1000) // procesa lento
println(value) // puede imprimir 1, 3, 5 (descarta 2 y 4)
}
stateIn y shareIn — convertir cold en hot
Para exponer un cold flow del repositorio como un StateFlow en el ViewModel, usás stateIn:
class ProductosViewModel(repo: ProductoRepository) : ViewModel() {
// Convierte el Flow<List<Producto>> del Room en un StateFlow
val productos: StateFlow<List<Producto>> = repo.getProductos()
.stateIn(
scope = viewModelScope,
// WhileSubscribed(5000): deja de colectar si no hay suscriptores
// por más de 5 segundos (ej: rotación del dispositivo)
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
}
// shareIn — similar pero para SharedFlow, cuando múltiples colectores
// necesitan el mismo flow sin que cada uno lo colecte por separado
val eventos = eventoRepository.getEventos()
.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(),
replay = 0
)
WhileSubscribed(5000) es el sweet spotUsar WhileSubscribed(5000) es la recomendación oficial de Google. Los 5000ms de gracia evitan que el flow se cancele y reinicie durante rotaciones de pantalla, que normalmente tardan menos de ese tiempo.