An introduction to Kotlin Coroutines

Motivation

Asynchronous and concurrent programming play an important role in the current world of Web APIs and microservices, where a significant part of our code is about orchestrating network interactions. Traditional synchronous models, where threads are blocked while waiting for external responses, are not suitable for platforms where threads are costly, such as .NET or the JVM, or where there are special threads that can’t be blocked, such as JavaScript or Android applications.

There are various programming models to handle asynchronicity, ranging from simple callbacks to reactive streams as a way to handle asynchronous sequences. Among these, the concept of a future has seen broad adoption on multiple platforms (e.g. Promises and thenables in JavaScript, CompletableFuture in Java 8, Task in .NET), including language support via the async-await constructs, which are now available in languages such as C#, JavaScript, and Python. However, instead of also adding explicit async-await support to the Kotlin language, its designers decided to go a different route and address these problems using the different and more generic, although rather old, concept of coroutines.

This guide provides a slow-paced introduction to Kotlin coroutines, as implemented via suspending functions, and their use to write asynchronous and concurrent programs. Starting from the ground up, we show how suspending functions allow us to turn callbacks into suspension points that don’t break the apparent control flow. From there we move to creating and starting coroutines as instances of these suspending functions, taking a look at the underlying state machine and continuation interfaces. With this knowledge, we show how the async-await construct can be implemented as library functions without needing explicit language support. We also show how to achieve interoperability with other JVM asynchronous constructs, converting between them and coroutines.

In this guide we made the choice of starting with the primitive coroutine constructs and then proceeding to the more high-level and shiny things that can be accomplished with coroutines, namely async-await. Our goal with this bottom-up approach is to provide a more solid framework upon which to understand all the features provided by the Kotlin coroutine libraries.

Before we start, a word of caution: Kotlin coroutines currently have experimental status. However, their use is highly encouraged by the language designers, and they constitute an important mechanism for anyone programming connected systems.

Coroutines and suspending functions

Let’s start our journey into coroutines with a simple plain function that takes some parameters, performs two steps and returns a value.

fun simpleFunction(a: Int, b: Int): Int {
    log.info("step 1")
    log.info("step 2")
    return a + b
}

Calling this function from our main entry point function

fun main(args: Array<String>) {
    log.info("main started")
    log.info("result is {}", simpleFunction(40, 2))
    log.info("main ended")
}

produces the following log messages

8 [main] INFO intro - main started
9 [main] INFO intro - step 1
9 [main] INFO intro - step 2
10 [main] INFO intro - result is 42
10 [main] INFO intro - main ended

The value between brackets is the name of the thread on which each log.info call was performed: in this example all were performed on the main thread (the one that calls the main method). The following diagram illustrates the execution of simpleFunction, highlighting the fact that all statements are executed on the same thread.

[Diagram: simpleFunction]

Now, suppose that between step 1 and step 2 we need to wait for something to happen, such as receiving a message from an external system or waiting for a time period to elapse. To keep things straightforward, let’s illustrate that situation using a simple Thread.sleep

fun simpleFunctionWithDelay(a: Int, b: Int): Int {
    log.info("step 1")
    Thread.sleep(1000)
    log.info("step 2")
    return a + b
}

Running this function from our main method now produces the following output

7 [main] INFO intro - main started
8 [main] INFO intro - step 1
1011 [main] INFO intro - step 2
1012 [main] INFO intro - result is 42
1012 [main] INFO intro - main ended

Again, all the statements are run on the main thread, which blocks for approximately 1000 ms between step 1 and step 2.

[Diagram: simpleFunctionWithDelay]

However, blocking threads may not be a good thing:

  • On client applications (e.g. Android applications), if the blocked thread is the GUI (Graphical User Interface) thread then the application will become unresponsive during the blocking period.

  • On server applications, blocking threads will reduce the number of threads available to process new incoming requests and therefore reduce the system’s throughput.

Kotlin suspending functions provide us with a way of handling these pauses in a sequential flow of statements without blocking the hosting thread. Namely, they allow a function

  • to suspend its execution, returning immediately to its caller and freeing up the hosting thread;
  • to resume its execution at a future point in time, potentially on a different thread.

So, let’s convert the previous simpleFunctionWithDelay example into a suspending function that does not block the hosting thread during the 1000 ms period:

suspend fun suspendFunctionWithDelay(a: Int, b: Int): Int {
    log.info("step 1")
    suspendCoroutine<Unit> { cont ->
        executor.schedule(
            { cont.resume(Unit) },
            1000, TimeUnit.MILLISECONDS)
    }
    log.info("step 2")
    return a + b
}

The first thing to notice is that a suspending function declaration is prefixed with the suspend keyword. However, the remaining function signature is unchanged: it still receives two integers and returns an integer. As a comparison, in C#, an async function that asynchronously returns an int will have Task<int> as the return type and not int.

Looking into the function body we notice that it remains mostly unchanged, except for the Thread.sleep call that was replaced by a call to suspendCoroutine.

This suspendCoroutine function, available in the kotlin.coroutines.experimental package, is used to suspend the function when it is invoked. It is one of the building blocks used by the coroutines libraries and has the following signature

public inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

The main thing to understand is the block parameter, which is a function receiving a continuation representing the resume point after the suspension. A continuation is an object implementing the following interface

public interface Continuation<in T> {
    /**
     * Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
     */
    public fun resume(value: T)

    /**
     * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
     * last suspension point.
     */
    public fun resumeWithException(exception: Throwable)

    /**
     * Context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
}

Ignoring the context property for the time being, a Continuation<T> has two members:

  • the resume function, to be called if the suspending function should resume normally with a value;
  • and the resumeWithException function to be called if the suspending function should resume with an exception.
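To make the two members concrete, here is a minimal sketch that adapts a callback-style API to a suspending function, using resume for the success path and resumeWithException for the failure path. The Callback interface and the parseIntAsync and parseIntSuspending functions are hypothetical names invented for this illustration. The sketch also assumes the stable kotlin.coroutines package (Kotlin 1.3 and later), where resume and resumeWithException are extension functions on Continuation, rather than the experimental package used in the rest of this guide; suspend fun main is assumed to be available for the same reason.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API, invented for this illustration.
interface Callback<T> {
    fun onSuccess(value: T)
    fun onError(error: Throwable)
}

// Hypothetical operation that reports its outcome through a callback.
fun parseIntAsync(text: String, callback: Callback<Int>) {
    val parsed = text.toIntOrNull()
    if (parsed != null) {
        callback.onSuccess(parsed)
    } else {
        callback.onError(NumberFormatException("not a number: $text"))
    }
}

// Adapter: each callback method is mapped to one of the continuation members.
suspend fun parseIntSuspending(text: String): Int =
    suspendCoroutine<Int> { cont ->
        parseIntAsync(text, object : Callback<Int> {
            // Normal completion: the suspension point returns this value.
            override fun onSuccess(value: Int) = cont.resume(value)

            // Failure: the exception is re-thrown at the suspension point.
            override fun onError(error: Throwable) = cont.resumeWithException(error)
        })
    }

suspend fun main() {
    println(parseIntSuspending("42"))
    try {
        parseIntSuspending("forty-two")
    } catch (e: NumberFormatException) {
        println("failed: ${e.message}")
    }
}
```

Run as written, this should print 42 followed by failed: not a number: forty-two, because the exception passed to resumeWithException is re-thrown at the suspension point and caught by an ordinary try/catch.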

In our case, the continuation in effect references the statement log.info("step 2"), i.e., the statement that follows the call to suspendCoroutine. It is the responsibility of the function invoking suspendCoroutine to pass a block that does something with that continuation.

Here, we just use a plain Java ScheduledExecutorService

val executor = Executors.newScheduledThreadPool(1)

to schedule the execution of the continuation after 1000 ms.

suspendCoroutine<Unit> { cont ->
    executor.schedule(
        { cont.resume(Unit) },
        1000, TimeUnit.MILLISECONDS)
}

The { cont.resume(Unit) } is just the Runnable passed into the executor.

When the main thread enters the suspendCoroutine function, the input block will be called with the continuation and a callback will be scheduled on the executor. After this, the main thread returns from the suspendCoroutine function and also from the suspendFunctionWithDelay. Yes, that is right, the main thread does not continue to the log.info("step 2") statement.

The following diagram illustrates the suspension and later resumption of the suspendFunctionWithDelay function.

[Diagram: suspendFunctionWithDelay]

The main thread enters suspendFunctionWithDelay, executes the first step and then enters suspendCoroutine, where the passed-in block is immediately called, scheduling cont.resume(Unit) to run after 1000 ms. After this, the main thread leaves suspendCoroutine and, since the calling function is marked as suspend, it also leaves suspendFunctionWithDelay. The main thread does not continue to step 2. Instead, the execution of suspendFunctionWithDelay is suspended without blocking the main thread.

After the 1000 ms have elapsed, a thread from the scheduled pool (shown in orange in the diagram) calls cont.resume(Unit), resuming the execution of suspendFunctionWithDelay. This suspension and resumption, including the switch between threads, is visible in the program output

8 [main] INFO intro - main started
20 [main] INFO intro - step 1
24 [main] INFO intro - main ended
1027 [pool-1-thread-1] INFO intro - step 2
1029 [pool-1-thread-1] INFO intro - result is 42

Notice that the main function ends immediately after step 1, without waiting for the 1000 ms to elapse, because suspendFunctionWithDelay suspends its execution and returns to the main function. After the 1000 ms have elapsed, suspendFunctionWithDelay resumes its execution on pool-1-thread-1 (a thread from the scheduled pool) and step 2 is executed.

Using suspendCoroutine directly in our suspendFunctionWithDelay makes the code slightly harder to read, mainly because of the nested lambda passed as a parameter. However, that can be easily handled by wrapping that behavior in a helper suspending function

suspend fun delay(ms: Long) {
    suspendCoroutine<Unit> { continuation ->
        executor.schedule({ continuation.resume(Unit) }, ms, TimeUnit.MILLISECONDS)
    }
}

The suspendFunctionWithDelay now becomes

suspend fun suspendFunctionWithDelay2(a: Int, b: Int): Int {
    log.info("step 1")
    delay(1000)
    log.info("step 2")
    return a + b
}

which is as readable as our initial simpleFunctionWithDelay that used Thread.sleep, but has non-blocking behavior. In fact, apart from calling delay instead of Thread.sleep, the only thing different in suspendFunctionWithDelay2 is the suspend modifier. Everything else is the same as in the plain old blocking simpleFunctionWithDelay, namely:

  • No change in the function signature; it still returns a plain Int.
  • No special keywords used in the function body.

However, suspendFunctionWithDelay2 exhibits the remarkable behavior of

  • being suspended at the delay call, freeing up the calling thread;
  • resuming execution after 1000 ms on a different thread.

Note in particular that both log.info statements are in the same block, yet they are executed on different threads.

We are still missing an important part of the picture. Suspending functions can call regular functions or other suspending functions. For instance, in the previous example suspendFunctionWithDelay2 is a suspending function, so it can directly call delay, which is also a suspending function. However, suspending functions cannot be called directly from regular functions. In particular, our regular main function cannot call suspendFunctionWithDelay2 directly.

For that, we need to use another one of the building blocks provided by the Kotlin library: the startCoroutine function, which is a regular (i.e. non-suspending) extension function over a suspending lambda.

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
)

The startCoroutine receives:

  • the suspending lambda to start (as the extension target).
  • the continuation to use when the suspending function completes its execution.

Using it we can create a simple startAndForget function

fun startAndForget(suspendingFunction: suspend () -> Unit) {
    suspendingFunction.startCoroutine(object : Continuation<Unit> {
        override fun resume(value: Unit) {
            // forget it
        }

        override fun resumeWithException(exception: Throwable) {
            // forget it
        }

        override val context: CoroutineContext
            get() = EmptyCoroutineContext
    })
}

that starts a suspending function and ignores its result.
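Note that ignoring the result also means ignoring failures: an exception thrown inside the suspending lambda is delivered to the completion continuation and silently dropped. The following is a minimal sketch of a variant that reports failures to a handler instead. The startAndReport name and its onError parameter are hypothetical, introduced only for this illustration. The sketch assumes the stable kotlin.coroutines package (Kotlin 1.3 and later), where Continuation has a single resumeWith(Result<T>) member, rather than the experimental package used in this guide.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: starts the suspending lambda, ignores a successful
// result, and hands any failure to the supplied handler.
fun startAndReport(onError: (Throwable) -> Unit, block: suspend () -> Unit) {
    block.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        // Stable API: success and failure both arrive wrapped in a Result.
        override fun resumeWith(result: Result<Unit>) {
            result.exceptionOrNull()?.let { onError(it) }
        }
    })
}

fun main() {
    startAndReport(onError = { e -> println("coroutine failed: ${e.message}") }) {
        throw IllegalStateException("boom")
    }
}
```

Here the lambda fails before reaching any suspension point, so the handler runs synchronously on the calling thread and the program should print coroutine failed: boom.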

We are now able to show the main function using the suspending version

fun main(args: Array<String>) {
    log.info("main started")
    startAndForget {
        log.info("result is {}", suspendFunctionWithDelay2(40, 2))
    }
    log.info("main ended")
}

which produces the output already shown before

8 [main] INFO intro - main started
20 [main] INFO intro - step 1
24 [main] INFO intro - main ended
1027 [pool-1-thread-1] INFO intro - step 2
1029 [pool-1-thread-1] INFO intro - result is 42

Notice how the "result is 42" log message appears on pool-1-thread-1, after the main function has ended.

The following diagram depicts the complete picture, including the log.info with the suspendFunctionWithDelay2 returned value, as well as the final continuation.

[Diagram: suspendFunctionWithDelay2 with the final continuation]

The continuation passed into startCoroutine allows us to do more interesting things than just ignoring the result. For instance, the following example uses a CompletableFuture to allow the main function to synchronize with the termination of the suspending function.

fun startAndGetFuture(suspendingFunction: suspend () -> Unit): CompletableFuture<Unit> {
    val future = CompletableFuture<Unit>()
    suspendingFunction.startCoroutine(object : Continuation<Unit> {
        override fun resume(value: Unit) {
            future.complete(value)
        }

        override fun resumeWithException(exception: Throwable) {
            future.completeExceptionally(exception)
        }

        override val context: CoroutineContext
            get() = EmptyCoroutineContext
    })
    return future
}

With this, we can rewrite main to synchronize with the completion of the future. We use a blocking future.get(), but in a real scenario we could also use a non-blocking future.thenApply.

fun main(args: Array<String>) {
    log.info("main started")
    val future = startAndGetFuture {
        log.info("result is {}", suspendFunctionWithDelay2(40, 2))
    }
    future.get()
    executor.shutdown()
    log.info("main ended")
}

Running this main function produces

8 [main] INFO intro - main started
16 [main] INFO intro - step 1
1022 [pool-1-thread-1] INFO intro - step 2
1024 [pool-1-thread-1] INFO intro - result is 42
1025 [main] INFO intro - main ended

Notice how the main function only ends after the suspending function has completely terminated (i.e. after it prints "result is 42").
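For comparison, here is a minimal sketch of the non-blocking alternative mentioned earlier, where main registers callbacks on the future instead of calling future.get(). It is a sketch under assumptions, not the code used for the output above: startAndGetFutureOf is a hypothetical generic variant that carries the coroutine result in the future, pause and scheduler are stand-ins for the delay helper and executor of this guide, and the code targets the stable kotlin.coroutines package (Kotlin 1.3 and later) rather than the experimental one.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

val scheduler = Executors.newScheduledThreadPool(1)

// Stand-in for the delay helper of this guide, written against the stable API.
suspend fun pause(ms: Long) {
    suspendCoroutine<Unit> { cont ->
        scheduler.schedule(Runnable { cont.resume(Unit) }, ms, TimeUnit.MILLISECONDS)
    }
}

// Hypothetical generic variant: the future completes with the coroutine's result.
fun <T> startAndGetFutureOf(block: suspend () -> T): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    block.startCoroutine(object : Continuation<T> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<T>) {
            result.fold(
                onSuccess = { future.complete(it) },
                onFailure = { future.completeExceptionally(it) }
            )
        }
    })
    return future
}

fun main() {
    val future = startAndGetFutureOf {
        pause(100)
        40 + 2
    }
    // Callbacks are registered and main continues without blocking.
    future
        .thenApply { sum -> "result is $sum" }
        .thenAccept { message -> println(message) }
        .thenRun { scheduler.shutdown() }
    println("main ended")
}
```

With this version main typically returns before the result is available, and the registered callbacks run once the future is completed by the scheduler thread. Shutting down the scheduler from the last callback lets the JVM exit.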

Until now, and based solely on this simple example, all these suspension mechanics may seem a rather complex way to achieve something that could be done using a simple callback. However, the advantage of the coroutine mechanism starts to become apparent when the suspending functions are more than just an unconditional sequence of steps, as in the following example.

suspend fun suspendFunctionWithDelayAndALoopWithConditionalLogic(a: Int, b: Int): Int {
    for(i in 0..3) {
        log.info("step 1 of iteration $i")
        if(i % 2 == 0) {
            delay(1000)
        }
        log.info("step 2 of iteration $i")
    }
    return a + b
}

Accomplishing the same behavior using traditional callbacks would probably result in a much more complex recursive control logic, because the simple for loop construct could not be used anymore.
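To give an idea of what that looks like, here is one possible callback-based sketch of the same loop, using only a ScheduledExecutorService. The callbackLoop function and its log and onDone parameters are hypothetical names introduced for this illustration; other callback encodings are possible.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Callback-based equivalent of the suspending loop: the result is delivered
// through onDone instead of being returned.
fun callbackLoop(
    scheduler: ScheduledExecutorService,
    a: Int,
    b: Int,
    delayMs: Long,
    log: (String) -> Unit,
    onDone: (Int) -> Unit
) {
    // The for loop becomes a recursive function; the loop variable is now
    // an explicit parameter.
    fun iteration(i: Int) {
        if (i > 3) {
            onDone(a + b)
            return
        }
        log("step 1 of iteration $i")
        // Everything that follows the optional delay has to be packaged
        // as a callback, including the jump to the next iteration.
        val rest: () -> Unit = {
            log("step 2 of iteration $i")
            iteration(i + 1)
        }
        if (i % 2 == 0) {
            scheduler.schedule(Runnable { rest() }, delayMs, TimeUnit.MILLISECONDS)
        } else {
            rest()
        }
    }
    iteration(0)
}

fun main() {
    val scheduler = Executors.newScheduledThreadPool(1)
    callbackLoop(scheduler, 40, 2, 1000L, { message -> println(message) }) { result ->
        println("result is $result")
        scheduler.shutdown()
    }
}
```

The control flow that the suspending version expresses with a plain for loop and an if statement is here spread over a recursive function, a closure and two different call paths.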

In this first part we introduced the fundamental concept of a suspending function, as well as two core functions:

  • The suspendCoroutine function that is used to suspend a coroutine execution and schedule its future continuation.
  • The startCoroutine extension function that is used to start a coroutine by allowing a suspending function to be called from a regular function.

In the next part, we will show how coroutines are instances of state machines defined by suspending functions: Suspending functions, coroutines and state machines.