The Use and Principle Analysis of Coroutines
1st. Why Use Coroutines
Whether it is a technology or a product, it is usually born from a need, and coroutines are no exception. Before asking why we should use coroutines, we first need to understand what a coroutine is. In short, compared with a thread, a coroutine is a lightweight, suspendable block of asynchronous code that supports structured concurrency. Coroutines therefore resemble threads, but add two features: suspendability and structured concurrency. Are there other differences? And if the two are so similar, why not simply use threads? To answer these questions, we start from basic usage and gradually go deeper.
2nd. How to Use
Coroutine sample code can look elaborate, but fundamentally it comes down to two things:
- Launching
- Thread switching
Let’s start with launching.
2.1 Three Launching Modes
There are three basic launching methods for coroutines:
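The original listing is not preserved here. A minimal sketch of the three builders, assuming kotlinx-coroutines-core is on the classpath, might look like the following; the helper name demoBuilders is purely illustrative.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: uses all three builders and returns the value computed by async.
fun demoBuilders(): Int = runBlocking {
    // launch: starts a coroutine for its side effects and returns a Job.
    val job: Job = launch { println("launch runs on ${Thread.currentThread().name}") }

    // async: starts a coroutine that produces a value and returns a Deferred.
    val deferred: Deferred<Int> = async { 1 + 2 }

    job.join()        // suspend until the launched coroutine has finished
    deferred.await()  // the last expression becomes the result of runBlocking
}

fun main() {
    println(demoBuilders())
}
```

Here runBlocking is the only call that blocks the calling thread; launch and async return immediately inside it.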
These three launching modes have the following characteristics:
- launch: Does not block the current thread; returns a Job object.
- async: Does not block the current thread; returns a Deferred object.
- runBlocking: Blocks the current thread until the coroutine completes; its return type is determined by the last expression inside the closure. It is generally used only for testing or for bridging blocking code (such as a main function) into coroutines.
Unlike threads, coroutines do not need an explicit call to start() (except in the lazy launch mode).
(Note: Because these three builders do not differ much in how they are implemented, the rest of this article uses launch as the example.)
On this basis, if I want to switch from the current thread to another thread for printing, I can write it like this:
So for network requests on Android, you can code it directly like this:
While this resolves the issue of making network requests on a non-main thread, how do we switch back to the main thread when we need to update the UI after the onNext method is called?
2.2 Thread switching
When there is a need to switch back to the main thread, you can do it like this:
In coroutines, thread switching is done with the withContext function, which takes a coroutine dispatcher as a parameter. The code inside the closure is then executed on a thread chosen by that dispatcher. There are four dispatchers in total:
- Default: the default dispatcher.
- Main: the main dispatcher. It has different implementations on different platforms; on Android it represents the UI main thread.
- Unconfined
- IO
Overall, this is somewhat similar to RxJava’s subscribeOn and observeOn, except that RxJava’s chain-based asynchronous calls are replaced by closure-based ones.
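As a rough illustration of withContext, the sketch below moves a blocking call onto Dispatchers.IO and then continues on the caller’s dispatcher. It assumes kotlinx-coroutines-core is available. fetchGreeting and loadGreeting are hypothetical names, and Dispatchers.Main is deliberately avoided so that the example also runs on a plain JVM without Android.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a blocking network call.
fun fetchGreeting(): String = "hello"

// Runs the blocking call on Dispatchers.IO, then continues in the caller's context.
suspend fun loadGreeting(): String {
    val raw = withContext(Dispatchers.IO) { fetchGreeting() }
    // Execution is back in the caller's context here. If the caller were running
    // on Dispatchers.Main on Android, this is where the UI would be updated.
    return raw.uppercase()
}

fun main() = runBlocking {
    println(loadGreeting())
}
```

The code reads top to bottom like synchronous code, which is the closure-based contrast to an RxJava chain.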
3rd. Startup Principle
In my experience, coroutines often feel confusing, largely because one gets fixated on suspension and resumption. Therefore, this section focuses on the startup and execution process; suspension and resumption will be addressed later.
3.1 Pre-requisite Concepts
Before understanding how coroutines are started, it is essential to grasp the following concepts:
- Functions are first-class citizens in Kotlin.
- CPS (continuation-passing style)
- Operator overloading
- The object keyword
a. Object declaration
b. Object expression
3.1.1 First-Class Citizens
“First-class citizen” is a concept from functional programming. In Kotlin it means that functions can be treated like any other value: assigned to variables, passed as arguments to other functions, and returned from functions as results.
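A small sketch of these three uses; all names are illustrative:

```kotlin
// A function stored in a variable.
val double: (Int) -> Int = { it * 2 }

// A function that takes another function as a parameter.
fun applyTwice(f: (Int) -> Int, x: Int): Int = f(f(x))

// A function that returns a function.
fun adder(n: Int): (Int) -> Int = { it + n }

fun main() {
    println(applyTwice(double, 3)) // prints 12
    println(adder(5)(1))           // prints 6
}
```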
3.1.2 CPS
CPS (continuation-passing style) is a functional programming style whose core idea is that a function no longer returns its result to the caller. Instead, it accepts an additional function as a parameter, called the continuation, which defines the code to run after the function completes.
For example, if we have code that adds two numbers and prints the sum, the implementation is as follows:
If we refactor it into CPS style, it would be:
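The original listings are not preserved here. One plausible way to write the two versions is sketched below, with add and addCps as illustrative names:

```kotlin
// Direct style: the result is returned to the caller.
fun add(a: Int, b: Int): Int = a + b

// CPS: the result is handed to a continuation instead of being returned.
fun addCps(a: Int, b: Int, continuation: (Int) -> Unit) {
    continuation(a + b)
}

fun main() {
    println(add(1, 2))                   // direct style: the caller prints the result
    addCps(1, 2) { sum -> println(sum) } // CPS: "what happens next" is passed in
}
```

In the CPS version the caller never receives a return value; everything that should happen with the sum lives inside the continuation.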
In Kotlin coroutines, functions marked with the suspend keyword are compiled with an additional parameter of type Continuation. This parameter is used to pass the result back to the caller once execution completes.
Continuation is an interface in the Kotlin coroutine package and is the top-level abstraction of a coroutine. When we call a coroutine, its final result is ultimately delivered through the resumeWith method.
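To see resumeWith receive a result, the sketch below starts a suspend block by hand using only the standard library (kotlin.coroutines). runAndCapture and answer are hypothetical helpers. The completion uses EmptyCoroutineContext, so the block runs synchronously on the calling thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// An ordinary suspend function; the compiler adds a hidden Continuation parameter to it.
suspend fun answer(): Int = 42

// Hypothetical helper: starts a suspend block and captures what arrives in resumeWith.
fun runAndCapture(block: suspend () -> Int): Int? {
    var captured: Int? = null
    val completion = object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            captured = result.getOrNull() // the coroutine's final result lands here
        }
    }
    block.startCoroutine(completion)
    return captured
}

fun main() {
    println(runAndCapture { answer() })
}
```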
3.2 Analysis of Coroutine Launch Process
This section analyzes the coroutine launch process, using the launch method as an example.
The launch method is an extension function of CoroutineScope and accepts three parameters:
- context: The coroutine context, which defaults to EmptyCoroutineContext. There is no need to pay much attention to it for now; we will analyze it later.
- start: The coroutine launch mode, which defaults to CoroutineStart.DEFAULT.
- block: A suspend-modified anonymous extension function of CoroutineScope; this is the coroutine body that executes the business logic.
The coroutine is ultimately started by calling coroutine.start(start, coroutine, block). During compilation, the compiler turns the lambda expression block into an anonymous inner class that extends SuspendLambda and implements the Function2 interface, as can be seen after decompilation.
The inheritance hierarchy of this anonymous inner class, from subclass to supertype, is: the anonymous class, SuspendLambda, ContinuationImpl, BaseContinuationImpl, Continuation.
Returning to the startup process, starting the coroutine with coroutine.start can be summarized in three steps:
- Wrap the block parameter of the launch method into a new Continuation object, whose concrete type is the anonymous inner class generated during compilation, as mentioned above.
a. The purpose of this step is to combine the block with the context and to hand the StandaloneCoroutine object constructed inside launch to the newly created object.
- Construct the coroutine execution task and specify the dispatcher that will execute it.
- Execute the coroutine task.
3.2.1 Wrap the block to construct a new Continuation object.
The specific implementation of coroutine.start is within the parent class AbstractCoroutine of StandaloneCoroutine.
The first parameter of the start method is an instance of CoroutineStart, an enum class that identifies the startup mode of the coroutine. It is the second parameter passed when launching a coroutine and defaults to DEFAULT. CoroutineStart overloads the invoke operator; in Kotlin, instances of a class that overloads invoke can be called like functions. The call inside AbstractCoroutine.start therefore goes through the invoke method of CoroutineStart.
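The invoke convention itself can be illustrated with a toy enum. StartMode below is hypothetical and only loosely modelled on the idea behind CoroutineStart; it is not the library’s code.

```kotlin
// Hypothetical enum; not the real CoroutineStart.
enum class StartMode {
    EAGER, LAZY;

    // Overloading invoke lets an instance be called like a function.
    operator fun invoke(block: () -> String): String = when (this) {
        EAGER -> block()       // run the block right away
        LAZY -> "not started"  // do nothing yet
    }
}

fun main() {
    val eagerMode = StartMode.EAGER
    val lazyMode = StartMode.LAZY
    println(eagerMode { "started" })         // same as eagerMode.invoke { "started" }
    println(lazyMode.invoke { "started" })   // explicit form of the same call
}
```

The branch taken depends on which enum constant is invoked, which is the same shape of dispatch the article describes for the launch modes.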
In the invoke method of CoroutineStart, the coroutine is ultimately started by calling the extension function startCoroutineCancellable on the block parameter.
Inside startCoroutineCancellable, runSafely is only a safety wrapper and can be skipped. The key code is a chained call of three methods, corresponding to the three steps of coroutine startup:
- createCoroutineUnintercepted: Corresponds to the first step, combining the block parameter of launch with the StandaloneCoroutine constructed within the launch method.
- intercepted: Corresponds to the second step, constructing the coroutine execution task and binding the dispatcher.
- resumeCancellableWith: Corresponds to the third step, executing the coroutine task.
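The same three-step chain can be reproduced with public standard-library intrinsics. This is only a sketch: resumeCancellableWith is internal to kotlinx.coroutines, so the plain resume extension stands in for it here, and startByHand is a hypothetical helper.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.resume

// Hypothetical helper: starts `block` by hand and returns the value it completes with.
fun startByHand(block: suspend () -> String): String? {
    var outcome: String? = null
    val completion = object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            outcome = result.getOrNull()
        }
    }
    block
        .createCoroutineUnintercepted(completion) // step 1: wrap the block in a Continuation<Unit>
        .intercepted()                            // step 2: let the context's interceptor wrap it (none here)
        .resume(Unit)                             // step 3: start executing the body
    return outcome
}

fun main() {
    println(startByHand { "body ran" })
}
```

Because the context is empty, step 2 returns the continuation unchanged and the body runs synchronously inside step 3.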
Next, let’s look at createCoroutineUnintercepted. It is declared with the expect modifier, which means it has a different actual implementation on each platform. On Android (JVM), the implementation is in IntrinsicsJvm.kt.
Here we come to the first key point: what does the check this is BaseContinuationImpl evaluate to? Because this refers to the block parameter of the launch method, the inheritance relationship described earlier means the condition is true.
How was that inheritance relationship obtained? By examining the compiled Kotlin bytecode directly, without decompiling it into Java code.
The compiler generates an anonymous inner class named TestKt$main$1, which extends SuspendLambda and implements the Function2 interface. From this, the inheritance relationship described earlier follows.
Returning to the condition this is BaseContinuationImpl: following the create method in the if branch, we find that it is defined in BaseContinuationImpl, but the default implementation only throws an exception.
Therefore, the implementation must be in a subclass, which is actually within the anonymous inner class generated earlier. It can be observed that another new object of type Function2 is created and returned within it.
By examining the method return signature and deducing from the bytecode, it is evident that the actual type of this object is exactly the same as the anonymous inner class mentioned earlier. We won’t delve into it further here.
With that, the wrapping of the block parameter is completed.
3.2.2 intercepted - Constructing the coroutine execution task and binding the dispatcher
Then, returning to the intercepted method called within startCoroutineCancellable in Cancellable.kt, this method mainly further wraps the object obtained from createCoroutineUnintercepted(receiver, completion) into a runnable and binds the dispatcher.
Combining the inheritance relationship mentioned earlier, it is not difficult to ascertain that the intercepted method is defined in ContinuationImpl.
Here, intercepted is initially empty, so the interceptor is obtained through the coroutine context. This interceptor corresponds to the default dispatcher specified during startup, which is Dispatchers.Default.
(Note: How context[ContinuationInterceptor] retrieves Dispatchers.Default will be skipped for now and addressed later when analyzing coroutine contexts. Currently, the main focus is on the startup process.)
To understand what interceptContinuation does, we first need to look at the creation of Dispatchers.Default.
DefaultScheduler is a singleton declared with the object keyword. It contains no implementation of interceptContinuation, so the implementation must be in a superclass. Tracing upwards, DefaultScheduler extends ExperimentalCoroutineDispatcher, which in turn ultimately extends CoroutineDispatcher.
Although interceptContinuation is declared in ContinuationInterceptor, its specific implementation is actually in CoroutineDispatcher.
In effect, the original anonymous inner class instance is wrapped in a DispatchedContinuation. Since DispatchedContinuation extends DispatchedTask, the instance is also wrapped in a DispatchedTask. In large libraries, classes named XxxTask are typically Runnable implementations, so that they can be scheduled later, handed to a thread pool, or executed directly by calling run. That is the case here: tracing DispatchedTask upwards, it ultimately implements the Runnable interface. The object that eventually gets run is therefore the DispatchedContinuation.
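The idea of an interceptor that wraps a continuation into a queueable Runnable can be sketched with the standard library alone. QueuedContinuation and QueueInterceptor are hypothetical, greatly simplified stand-ins for DispatchedContinuation and CoroutineDispatcher, and the "thread pool" is just a list drained by hand.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical wrapper: a continuation that is also a Runnable, so it can be queued.
class QueuedContinuation<T>(
    private val delegate: Continuation<T>,
    private val queue: MutableList<Runnable>
) : Continuation<T>, Runnable {
    private var action: (() -> Unit)? = null
    override val context: CoroutineContext get() = delegate.context

    // Instead of resuming directly, remember what to do and enqueue this task.
    override fun resumeWith(result: Result<T>) {
        action = { delegate.resumeWith(result) }
        queue.add(this)
    }

    // Running the task resumes the wrapped continuation.
    override fun run() {
        action?.invoke()
    }
}

// Hypothetical interceptor that wraps every continuation it is given.
class QueueInterceptor(private val queue: MutableList<Runnable>) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        QueuedContinuation(continuation, queue)
}

fun demoInterception(): List<String> {
    val log = mutableListOf<String>()
    val queue = mutableListOf<Runnable>()
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = QueueInterceptor(queue)
        override fun resumeWith(result: Result<Unit>) { log += "completed" }
    }
    val block: suspend () -> Unit = { log += "body" }
    block.startCoroutine(completion) // only enqueues a task; the body has not run yet
    log += "queued=${queue.size}"
    queue.removeAt(0).run()          // a worker thread would normally do this
    return log
}

fun main() {
    println(demoInterception())
}
```

Starting the coroutine does not run its body; it only produces a task, and the body executes when something calls run on that task.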
3.2.3 resumeCancellableWith executes the coroutine task.
Then, back once more to the startCoroutineCancellable method in Cancellable.kt, let’s look at the final call, resumeCancellableWith.
Here, the is DispatchedContinuation branch matches directly. The resumeCancellableWith member it calls is implemented in DispatchedContinuation, the class that the dispatcher has just constructed.
The dispatcher passed to the DispatchedContinuation constructor in CoroutineDispatcher is the current coroutine dispatcher, which in this analysis is Dispatchers.Default.
By default, dispatcher.isDispatchNeeded(context) in CoroutineDispatcher returns true; Dispatchers.Unconfined overrides it to return false. Therefore, task scheduling here goes through dispatcher.dispatch(context, this).
The dispatch method is declared in the abstract class CoroutineDispatcher and has various implementations.
Given the dispatcher inheritance chain analyzed earlier, the implementation used in this analysis is the one in ExperimentalCoroutineDispatcher.
You can see that the previously constructed DispatchedContinuation is passed as a Runnable to the member coroutineScheduler using the dispatch method.
CoroutineScheduler is a custom thread pool. The dispatch function is as follows:
```kotlin
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    // debug
    trackTask() // this is needed for virtual time support
    // Step 1: Wrap the block to be executed again, timestamp it to facilitate awakening calls after operations such as sleep.
    val task = createTask(block, taskContext)
    // Step 2: Obtain the current worker, which returns null in this case.
    val currentWorker = currentWorker() // The return type is Worker.
    // Step 3: Submit the task wrapped in step 1 to the pending execution queue.
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        if (!addToGlobalQueue(notAdded)) {
            // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    // tailDispatch is the third parameter of the method, with a default value of false.
    val skipUnpark = tailDispatch && currentWorker != null
    // Checking 'task' instead of 'notAdded' is completely okay
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        // Step 4: Schedule the worker to execute the task.
        signalCpuWork()
    } else {
        // Increment blocking tasks anyway
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}
```
The function mainly does four things:
- Step 1: Wrap the block to be executed once more and timestamp it, to facilitate wake-up calls after operations such as sleep.
- Step 2: Obtain the current worker via currentWorker(), whose return type is the nullable Worker?. It returns null here, because the calling thread is not a Worker.
- Step 3: Submit the task wrapped in step 1 to the pending execution queue. submitToLocalQueue is an extension function on the nullable type Worker?, which is why it can be called even though the worker is null. In that case the task is not placed in a local queue; it is returned as notAdded and added to the global queue.
- Step 4: Call signalCpuWork(), which schedules a worker to execute the task.
The Worker that step 2 tries to obtain is an inner class of CoroutineScheduler that extends Thread, so it can be used as a thread.
So, when the Worker is used as a thread and started with the start method, the run method will naturally be invoked. This will then call the internal runWorker method, which loops to retrieve tasks from the queue for execution, and then executes the executeTask method.
The executeTask method further calls runSafely of CoroutineScheduler, ultimately invoking task.run() to start and invoke the coroutine.
The task is the DispatchedContinuation previously wrapped, whose superclass DispatchedTask overrides the run method.
The call to continuation.resume ends up in resumeWith, which is implemented in BaseContinuationImpl.
So, the question now becomes how the Worker thread is started.
Returning to CoroutineScheduler#dispatch(), the call to signalCpuWork() in the fourth step has not been explained yet.
The main logic of signalCpuWork is:
- If the worker has already been created, it will be awakened using LockSupport.unpark(), and the task execution will begin.
- Otherwise, a new thread, the Worker thread, will be created to execute the task.
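The "unpark an existing worker, otherwise create one" logic can be illustrated with a toy single-worker scheduler. TinyScheduler is hypothetical and far simpler than the real CoroutineScheduler: it has one worker, one global queue, and no work stealing.

```kotlin
import java.util.Collections
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.LockSupport

// Hypothetical single-worker scheduler, for illustration only.
class TinyScheduler {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    @Volatile private var worker: Thread? = null

    fun dispatch(task: Runnable) {
        queue.add(task) // enqueue first, then make sure someone will run it
        signalWork()
    }

    @Synchronized
    private fun signalWork() {
        val existing = worker
        if (existing != null) {
            LockSupport.unpark(existing) // wake the worker if it is parked
        } else {
            // No worker yet: create and start one, which invokes its run loop.
            worker = Thread { runWorker() }.apply { isDaemon = true; start() }
        }
    }

    // The worker loop: take tasks from the queue, park when there is nothing to do.
    private fun runWorker() {
        while (true) {
            val task = queue.poll()
            if (task != null) task.run() else LockSupport.park()
        }
    }
}

fun runTwoTasks(): List<String> {
    val scheduler = TinyScheduler()
    val done = CountDownLatch(2)
    val seen = Collections.synchronizedList(mutableListOf<String>())
    scheduler.dispatch(Runnable { seen.add("first"); done.countDown() })
    scheduler.dispatch(Runnable { seen.add("second"); done.countDown() })
    done.await() // wait until the worker has run both tasks
    return seen.toList()
}

fun main() {
    println(runTwoTasks())
}
```

The first dispatch creates the worker thread; the second one finds it already running and only needs to unpark it.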
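So, the coroutine startup process can be summarized in three steps:
- Wrap the block into a new Continuation object (createCoroutineUnintercepted).
- Construct the coroutine execution task and bind the dispatcher (intercepted).
- Execute the coroutine task (resumeCancellableWith).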
4th. The Difference Between launch and async
If we compare coroutines to threads, launch is similar to using a Runnable, while async is akin to using a FutureTask. Both are asynchronous mechanisms, but a Runnable does not return a value, whereas a FutureTask can return the result of the computation through its get method, which blocks the calling thread until the result is available.
In coroutines, launch returns a Job, from which the execution result cannot be retrieved directly, while async returns a Deferred. Calling await on the Deferred yields the coroutine’s result; unlike FutureTask.get, await suspends the calling coroutine rather than blocking the current thread.
Moreover, Deferred inherits from Job, indicating that Deferred merely extends the functionality of Job. In the source code, this is reflected by launch constructing a StandaloneCoroutine object, while async constructs a DeferredCoroutine object.
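From the caller’s side, the difference can be sketched as follows, assuming kotlinx-coroutines-core is available; compareBuilders is an illustrative name.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper contrasting what launch and async give back to the caller.
fun compareBuilders(): Pair<Boolean, Int> = runBlocking {
    val job: Job = launch { println("launch: side effect only") }
    val deferred: Deferred<Int> = async { 6 * 7 }

    // This assignment compiles because Deferred<T> extends Job.
    val asJob: Job = deferred

    job.join()                   // a Job can be waited for, but carries no value
    val value = deferred.await() // a Deferred additionally delivers a result
    Pair(asJob.isCompleted, value)
}

fun main() {
    println(compareBuilders())
}
```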
5th. Launch Mode
The coroutine’s launch mode is primarily reflected in the second parameter of the launch method, which is the CoroutineStart enum. CoroutineStart enum represents several different launch modes, including DEFAULT, LAZY, ATOMIC, and UNDISPATCHED.
These different launch modes determine the branching behavior after invoking CoroutineStart.invoke.
The specific differences are as follows:
- DEFAULT: The default launch mode. The coroutine is scheduled for execution immediately according to its context. If it is cancelled before it gets a chance to start executing, its body never runs.
- ATOMIC: Similar to DEFAULT in that the coroutine is scheduled immediately, but it cannot be cancelled before it starts executing; cancellation only takes effect once the body has begun running.
- UNDISPATCHED: The coroutine starts executing immediately in the current thread, without going through the dispatcher, and runs until its first suspension point. After resuming from that suspension, it continues according to the dispatcher in its context. This avoids the cost of an initial dispatch.
- LAZY: The lazy launch mode. The coroutine starts executing only when it is needed, for example when Job.start() or Job.join() is called, or when Deferred.await() is awaited. This mode is mainly used to defer starting the coroutine until it is required.
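The ordering effects of three of these modes can be observed inside runBlocking, whose single-threaded event loop makes the order deterministic. This sketch assumes kotlinx-coroutines-core is available; startModeOrder is an illustrative name.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which differently started coroutines run.
fun startModeOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // DEFAULT: scheduled immediately, runs once the current coroutine suspends.
    launch { log += "default" }

    // LAZY: created but not scheduled until it is started or joined.
    val lazyJob = launch(start = CoroutineStart.LAZY) { log += "lazy" }

    // UNDISPATCHED: the body runs right here, in the current thread, before launch returns.
    launch(start = CoroutineStart.UNDISPATCHED) { log += "undispatched" }

    log += "after launch calls"

    lazyJob.join() // starts the lazy coroutine and suspends until it finishes
    log
}

fun main() {
    println(startModeOrder())
}
```

The UNDISPATCHED body is logged first, the DEFAULT body only runs when the outer coroutine suspends at join, and the LAZY body runs last because join is what starts it.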