Converting callback-based APIs into Kotlin Flows in Android

Sarthak Garg
3 min readJul 24, 2021

--

Photo by Rod Long on Unsplash

Kotlin flows allow us to seamlessly stream data, asynchronously. While, we can only emit a single value with basic co-routines, flows allow us to emit multiple values. Moreover, they are easy to cleanup thereby preventing memory leaks.

A common use case in Android application development is to stream data from a callback-based API. With callbackFlow we can convert any callback-based API into flows.

Say for our use case, we want to stream the list of users currently logged onto our app. For this example, we’ll be using Firestore with its snapshot change listeners. However, the approach is extensible for any callback-based API in general.

General Approach

The general approach to convert any callback-based API into a Kotlin Flow is:

  1. Define the function
  2. Perform API initialisations
  3. Handle the callback
  4. Ensure cleanup

We’ll review each step in the purview of our example by working with Firestore.

Defining the Function

The first step is to define our function signature. We can only stream a single type of data type with a flow. In our case this would be the list of User objects. So we can define our function as:

fun getUsers(): Flow<List<Users>> { }

To integrate a callbackFlow with the above definition, we just need a make a tiny tweak:

fun getUsers(): Flow<List<User>> = callbackFlow {
}

Performing API initialisations

Inside the function, you’d need to either initialise / fetch / connect to the API instance and optionally initialise your query.

For Firestore, we simply need a reference to our users collection.

fun getUsers(): Flow<List<User>> = callbackFlow {    // Reference to users collection in Firestore
val collectionReference = Firebase.firestore.collection("users")
}

Handling the callback

Now comes the tricky part. We need to handle the callback provided by our API. We can do it by attaching a snapshot change listener to our users collection.

fun getUsers(): Flow<List<User>> = callbackFlow {    // Reference to users collection in Firestore
val collectionReference = Firebase.firestore.collection("users")

// Add snapshot change listener to our collection reference
collectionReference.addSnapshotListener { snapshot, exception ->

}
}

Each time, the callback is triggered, if there’s no exception, we need to offer the updated set of list of users via our flow interface.

fun getUsers(): Flow<List<User>> = callbackFlow {

// Reference to users collection in Firestore
val collectionReference = Firebase.firestore.collection("users")

// Add snapshot change listener to our collection reference
collectionReference.addSnapshotListener { snapshot, exception ->
if (exception != null) {
// Handle exception
return@addSnapshotListener
}

snapshot?.let { _snapshot ->
// Offer updated list of users via flow
offer(_snapshot.toObjects(User::class.java))
}
}
}

We can run our application now and any changes to the users collection will be emitted by our new interface.

Ensuring cleanup

While our application is functionally ready, it has a glaring mistake. Once the flow is stopped, our listener to the Firestore collection has not been cleaned up causing memory leaks.

This would be a common requirement for any callback-based API - to tell the API to close the connection and perform any cleanup necessary. With callbackFlow , we have a function called awaitClose which takes a lambda parameter. Any time the flow is closed, the code inside awaitClose is triggered.

We can get a reference to our snapshot listener and perform our clean up there.

fun getUsers(): Flow<List<User>> = callbackFlow {

// Reference to users collection in Firestore
val collectionReference = Firebase.firestore.collection("users")

// Add snapshot change listener to our collection reference
val snapshotListener =
collectionReference.addSnapshotListener { snapshot, exception ->
if (exception != null) {
// Handle exception
return@addSnapshotListener
}

snapshot?.let { _snapshot ->
// Offer updated list of users via flow
offer(_snapshot.toObjects(User::class.java))
}
}

awaitClose {
// Perform cleanup here
snapshotListener.remove()
}
}

And that’s it!

There’s a lot more that we can do with flows. Each callback-based API comes with its own nuisances but in our experience, we’ve been able to handle them all (and quite seamlessly!) by leveraging the power of Kotlin flows.

Connect on LinkedIn.

--

--