Converting Rx Combine Operators to Coroutines

One of my favourite selling points of RxJava is the ability to chain asynchronous calls and merge results with other asynchronous calls. For example, with the zip() function, we can easily:

  • Make a network call
  • Do a second network call at the same time
  • Wait until both network calls are completed
  • Combine the results of both network calls
  • Emit the combined results to any interested observers
The implementation might look something like this:
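A minimal sketch of the steps listed above, assuming RxJava 3 on the classpath. The functions `fetchGreeting()` and `fetchName()` are hypothetical stand-ins for the two network calls, and the combined string is just an illustrative result.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical stand-ins for the two blocking network calls.
fun fetchGreeting(): String { Thread.sleep(500); return "Hello" }
fun fetchName(): String { Thread.sleep(500); return "World" }

fun loadMessage(): Single<String> {
    // Each call is wrapped in its own Single and moved to the I/O scheduler,
    // so both start as soon as zip() subscribes to them.
    val greeting: Single<String> = Single.fromCallable { fetchGreeting() }
        .subscribeOn(Schedulers.io())
    val name: Single<String> = Single.fromCallable { fetchName() }
        .subscribeOn(Schedulers.io())

    // zip() waits for both results, then combines them into one value.
    val combiner = BiFunction<String, String, String> { g, n -> "$g, $n!" }
    return Single.zip(greeting, name, combiner)
}

fun main() {
    // blockingGet() is used only to keep this console sketch alive; on Android
    // one would typically observe on the main thread and subscribe instead.
    println(loadMessage().blockingGet()) // prints "Hello, World!"
}
```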

    We can reduce the line count to almost half if we chain our commands:
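A chained variant could be sketched as follows, again assuming RxJava 3 and the same hypothetical `fetchGreeting()` and `fetchName()` stand-ins. It uses `zipWith()` so that no intermediate variables are needed.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical stand-ins for the two blocking network calls.
fun fetchGreeting(): String { Thread.sleep(500); return "Hello" }
fun fetchName(): String { Thread.sleep(500); return "World" }

// Same behaviour as a two-argument zip(), written as a single chain.
fun loadMessage(): Single<String> =
    Single.fromCallable { fetchGreeting() }
        .subscribeOn(Schedulers.io())
        .zipWith(
            Single.fromCallable { fetchName() }.subscribeOn(Schedulers.io()),
            BiFunction<String, String, String> { g, n -> "$g, $n!" }
        )

fun main() {
    println(loadMessage().blockingGet()) // prints "Hello, World!"
}
```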

    With Coroutines, we can easily accomplish the same result right out of the box. Our code simply shifts from a chained, functional paradigm to an async-await pattern.

    Let’s start by wrapping the entire function in a CoroutineScope with a Dispatchers.Main context. We use Dispatchers.Main so that our code can update the UI without switching threads or contexts. Each of the two network calls can now be written as an ordinary blocking call. However, since each of these calls does need to run on a background thread, we wrap it in an async{…} block and call await() on the result. Finally, we update the view with our new message. The result will look like this:
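A sketch of this first attempt, assuming the kotlinx.coroutines library. `fetchGreeting()`, `fetchName()` and the `showMessage` callback (standing in for the view update) are hypothetical. The `uiContext` parameter is also an addition of this sketch: it defaults to `Dispatchers.Main` as described, but can be swapped so the code runs outside Android, where no main dispatcher is installed.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the two blocking network calls.
fun fetchGreeting(): String { Thread.sleep(500); return "Hello" }
fun fetchName(): String { Thread.sleep(500); return "World" }

fun loadMessage(
    uiContext: CoroutineContext = Dispatchers.Main,
    showMessage: (String) -> Unit // hypothetical stand-in for updating the view
): Job = CoroutineScope(uiContext).launch {
    // Each call runs on a background thread and is awaited immediately.
    val greeting = async(Dispatchers.IO) { fetchGreeting() }.await()
    val name = async(Dispatchers.IO) { fetchName() }.await()
    showMessage("$greeting, $name!")
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    // Dispatchers.Default replaces Dispatchers.Main for this console run.
    loadMessage(Dispatchers.Default) { println(it) }.join()
    println("took about ${System.currentTimeMillis() - start} ms")
}
```

Running `main` prints the combined message followed by the elapsed time, which is worth noting for comparison.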

    But wait! If we run this code, we’ll notice that the second network call doesn’t start until the first one has completed! What did we do wrong? The culprit is the first await() call. It suspends the coroutine until the first async{…} block is complete, so the second async{…} block is not even launched until then. However, since our intent is to emulate the Rx Zip, we don’t want the second network call to wait on the first call’s completion. Instead, we want both network calls to be kicked off together, and to suspend the coroutine only while either network call is still in progress.

    Our hero today will be the Deferred object. The async{…} function returns a Deferred object that we can store in a local variable. We can invoke await() on that object later on, when we actually need the result of that long-running task. This lets both network calls start right away, and the coroutine suspends only when it really needs to. This change will look like this:
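A sketch of the Deferred-based version, assuming the kotlinx.coroutines library. As before, `fetchGreeting()`, `fetchName()`, `showMessage` and the swappable `uiContext` parameter are hypothetical names introduced for illustration.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the two blocking network calls.
fun fetchGreeting(): String { Thread.sleep(500); return "Hello" }
fun fetchName(): String { Thread.sleep(500); return "World" }

fun loadMessage(
    uiContext: CoroutineContext = Dispatchers.Main,
    showMessage: (String) -> Unit // hypothetical stand-in for updating the view
): Job = CoroutineScope(uiContext).launch {
    // Start both calls first; async returns immediately with a Deferred.
    val greetingDeferred: Deferred<String> = async(Dispatchers.IO) { fetchGreeting() }
    val nameDeferred: Deferred<String> = async(Dispatchers.IO) { fetchName() }

    // Only now suspend, once per result, while the calls run side by side.
    val greeting = greetingDeferred.await()
    val name = nameDeferred.await()
    showMessage("$greeting, $name!")
}

fun main() = runBlocking {
    val start = System.currentTimeMillis()
    // Dispatchers.Default replaces Dispatchers.Main for this console run.
    loadMessage(Dispatchers.Default) { println(it) }.join()
    println("took about ${System.currentTimeMillis() - start} ms")
}
```

With the two 500 ms stand-ins, the elapsed time should be close to that of the slower call instead of the sum of both.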

    We can shorten this code further, but do so with care, as it is easy to lose readability:
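One possible shortening, sketched under the same assumptions (kotlinx.coroutines, with hypothetical `fetchGreeting()`, `fetchName()`, `showMessage` and `uiContext`), is to call await() directly inside the string template.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the two blocking network calls.
fun fetchGreeting(): String { Thread.sleep(500); return "Hello" }
fun fetchName(): String { Thread.sleep(500); return "World" }

fun loadMessage(
    uiContext: CoroutineContext = Dispatchers.Main,
    showMessage: (String) -> Unit // hypothetical stand-in for updating the view
): Job = CoroutineScope(uiContext).launch {
    val greeting = async(Dispatchers.IO) { fetchGreeting() }
    val name = async(Dispatchers.IO) { fetchName() }
    // Both calls are already running; the awaits are inlined where needed.
    showMessage("${greeting.await()}, ${name.await()}!")
}

fun main() = runBlocking {
    // Dispatchers.Default replaces Dispatchers.Main for this console run.
    loadMessage(Dispatchers.Default) { println(it) }.join()
}
```

The two async{…} calls stay on their own lines on purpose: inlining them next to their await() calls would bring back the sequential behaviour.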

    There you have it! The same functionality as the Rx Zip, but with Coroutines. The code is cleaner and arguably more readable since we don’t need to have an in-depth understanding of RxJava.


    About the author

    Keane Quibilan is an r2d2 developer.
