Cold flows vs Hot flows

Un cold flow no emite valores hasta que alguien lo colecta — y cada colector recibe su propia secuencia desde el principio. Es como un video on-demand: cada espectador comienza desde el inicio.

Un hot flow emite valores independientemente de si hay colectores, y los nuevos colectores se unen "en vivo". Es como una transmisión en directo: llegás cuando llegás y recibís lo que se emite desde ese momento.

  • Flow<T> (builder flow { }) → cold
  • StateFlow → hot, siempre tiene un valor actual, replay = 1
  • SharedFlow → hot, configurable replay, para eventos
  • Los flows de Room → cold (pero cada cambio en la DB emite un nuevo valor)

Flow builders

// flow { } — el builder básico, para cold flows
fun contarHasta(n: Int): Flow<Int> = flow {
    for (i in 1..n) {
        delay(1000)    // suspende 1 segundo (cooperativo, cancelable)
        emit(i)        // emite el valor
    }
}

// flowOf — para valores conocidos
val flow1 = flowOf(1, 2, 3)

// asFlow — convierte colecciones e iterables
val flow2 = listOf("a", "b", "c").asFlow()

// channelFlow — para código que no es suspend (callbacks, listeners)
fun locationFlow(): Flow<Location> = callbackFlow {
    val listener = LocationListener { location -> trySend(location) }
    locationManager.addListener(listener)
    awaitClose { locationManager.removeListener(listener) }  // limpieza al cancelar
}

map, filter, transform

// map — transforma cada valor emitido
repository.getProductos()            // Flow<List<ProductoEntity>>
    .map { entities ->
        entities.map { it.toDomainModel() }
    }                                // Flow<List<Producto>>

// filter — descarta valores que no cumplen el predicado
repository.getEventos()
    .filter { evento -> evento.activo }

// transform — el más flexible: puede emitir 0, 1 o N valores por input
repository.getProductos()
    .transform { lista ->
        if (lista.isEmpty()) emit(UiState.Empty)
        else emit(UiState.Success(lista))
    }

// onEach — efecto secundario sin transformar el valor (útil para logs)
repository.getProductos()
    .onEach { Log.d("Flow", "Nuevos productos: ${it.size}") }
    .collect { ... }

combine y zip

// combine — emite cada vez que CUALQUIERA de los flows emite un nuevo valor
// Siempre usa el último valor de cada flow
val nombreFlow: Flow<String> = ...
val edadFlow: Flow<Int> = ...

combine(nombreFlow, edadFlow) { nombre, edad ->
    "Me llamo $nombre y tengo $edad años"
}.collect { texto ->
    binding.tvInfo.text = texto
}

// Caso real: combinar filtro de búsqueda + lista completa
val busquedaFlow = MutableStateFlow("")
val productosFlow = repository.getProductos()

combine(productosFlow, busquedaFlow) { productos, busqueda ->
    if (busqueda.isBlank()) productos
    else productos.filter { it.nombre.contains(busqueda, ignoreCase = true) }
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

// zip — emite solo cuando AMBOS flows emiten un valor, en pares
flow1.zip(flow2) { a, b -> "$a-$b" }.collect { ... }
// Si flow1 emite más rápido que flow2, espera a flow2 antes de emparejar

flatMapLatest — el operador más útil

flatMapLatest convierte cada valor emitido en un nuevo flow, y cuando llega un nuevo valor, cancela el flow anterior y empieza el nuevo. Ideal para búsquedas en tiempo real:

// Búsqueda reactiva: cada vez que cambia el texto, cancela la búsqueda anterior
val busquedaFlow = MutableStateFlow("")

val resultadosFlow = busquedaFlow
    .debounce(300)           // espera 300ms de inactividad antes de emitir
    .distinctUntilChanged()  // descarta si el valor no cambió
    .flatMapLatest { query ->
        if (query.isBlank()) flowOf(emptyList())
        else repository.buscarProductos(query)  // Flow<List<Producto>>
    }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

debounce es clave para búsquedasSin debounce, cada keystroke lanzaría una llamada a la API. Con debounce(300) esperás 300ms de pausa antes de buscar, reduciendo las llamadas drásticamente.

buffer y conflate

// buffer — el productor y el consumidor corren en paralelo
// útil cuando el collector es más lento que el emisor
flow { /* emite rápido */ }
    .buffer(capacity = 10)  // el emisor puede adelantarse hasta 10 items
    .collect { /* procesa más lento */ }

// conflate — si el collector es lento, descarta valores intermedios
// solo procesa el más reciente
flow { /* emite 1, 2, 3, 4, 5 rápidamente */ }
    .conflate()
    .collect { value ->
        delay(1000)       // procesa lento
        println(value)    // puede imprimir 1, 3, 5 (descarta 2 y 4)
    }

stateIn y shareIn — convertir cold en hot

Para exponer un cold flow del repositorio como un StateFlow en el ViewModel, usás stateIn:

class ProductosViewModel(repo: ProductoRepository) : ViewModel() {

    // Convierte el Flow<List<Producto>> del Room en un StateFlow
    val productos: StateFlow<List<Producto>> = repo.getProductos()
        .stateIn(
            scope = viewModelScope,
            // WhileSubscribed(5000): deja de colectar si no hay suscriptores
            // por más de 5 segundos (ej: rotación del dispositivo)
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

// shareIn — similar pero para SharedFlow, cuando múltiples colectores
// necesitan el mismo flow sin que cada uno lo colecte por separado
val eventos = eventoRepository.getEventos()
    .shareIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(),
        replay = 0
    )

WhileSubscribed(5000) es el sweet spotUsar WhileSubscribed(5000) es la recomendación oficial de Google. Los 5000ms de gracia evitan que el flow se cancele y reinicie durante rotaciones de pantalla, que normalmente tardan menos de ese tiempo.