The Use and Principle Analysis of Coroutines

1st. Why Use Coroutines

Whether it’s the birth of a technology or a product, it often comes with demand, and coroutines are no exception. When asked why to use coroutines, the first thing to understand is, what is a coroutine? In summary, it is a lightweight, suspendable asynchronous code block that supports structured concurrency, compared to threads. From here, it is not difficult to see that coroutines are very similar to threads, but with two new features: suspendability and structured concurrency. Are there any other differences? If so, why not use threads? To address these questions, we need to start from basic usage and gradually delve deeper into the topic.

2nd. How to Use

When it comes to using coroutines, various example codes may seem fancy, but fundamentally they can be summarized into two aspects:

  • Launching
  • Thread switching
    Let’s start with launching.

2.1 Three Launching Modes

There are three basic launching methods for coroutines:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Kotlin
fun main() {
// launching mode 1
GlobalScope.launch {

}

// launching mode 2
GlobalScope.async {

}

// launching mehtod 3
runBlocking {

}
}

These three launching modes have the following characteristics:

  1. launch : Does not block the current thread, returns a Job object.
  2. async : Does not block the current thread, returns a Deferred object.
  3. runBlocking: Blocks the current thread, and the return object type is determined by the last line of code inside the closure. It is generally used only for testing purposes.

Unlike threads, coroutines do not need to explicitly call the start() method (except for the lazy launch mode).
(Note: Because these three implementation modes are essentially not much different, we will use the launch launch mode as an example in the following.)
On this basis, if I want to switch from the current thread to another thread for printing, I can write it like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
Kotlin
fun main() {
println("ThreadName : [${Thread.currentThread().name}]")
GlobalScope.launch {
println("launch ThreadName : [${Thread.currentThread().name}]")
}

Thread.sleep(1_000L)
}

// execute result
ThreadName : [main]
launch ThreadName : [DefaultDispatcher-worker-1]

So for network requests on Android, you can code it directly like this:

1
2
3
4
5
6
7
8
Kotlin
GlobalScope.launch {
api.getResut(object:ApiCallback {
override fun onNext(bean:Bean) {

}
})
}

While this resolves the issue of making network requests on a non-main thread, how do we switch back to the main thread when we need to update the UI after the onNext method is called?

2.2 Thread switching

When there is a need to switch back to the main thread, you can do it like this:

1
2
3
4
5
6
7
8
9
10
11
Kotlin
GlobalScope.launch {
api.getResut(object : ApiCallback {
override fun onNext(bean: Bean) {
// Switch Back to Main Thread
withContext(Dispatchers.Main) {
updateUI()
}
}
})
}

In coroutines, thread switching operations are accomplished using the withContext method, which takes a coroutine dispatcher parameter. The code inside the closure will then be executed on the thread specified by the dispatcher. There are a total of four dispatchers:

  • Default - Default Dispatcher
  • Main - Main Dispatcher

For different platforms, the main dispatcher has different implementations. For the Android platform, it represents the UI main thread.

  • Unconfined
  • IO

Overall, it is somewhat similar to RxJava’s subscribeOn and observeOn, but instead of RxJava’s chain-based asynchronous calls, it becomes a closure-based

3rd. Startup Principle

I personally feel that understanding coroutines can often feel confusing, and a big reason for this is being fixated on understanding coroutine suspension and resumption. Therefore, here we will focus on outlining the startup and execution process. Suspension and resumption will be addressed later.

3.1 Pre-requisite Concepts

Before understanding how coroutines are started, it is essential to grasp the following concepts:

  1. Functions are first-class citizens in Kotlin.
  2. CPS(continuation passing style
  3. Operator Overloading
  • object keyword
    a. Object declaration
    b. Object expression

3.1.1 first-class citizens

First-class citizens is a concept in functional programming, which means that in Kotlin, functions can be passed around like any other data type, assigned to variables, passed as parameters to other functions, and returned from functions as results.

3.1.2 CPS

CPS (continuation passing style) is a functional programming style where the core idea is that functions no longer return results to the caller, but instead accept an additional continuation function as a parameter. This continuation function defines the code to be executed after the function completes its execution (this function is called a Continuation).
For example, if we have code that adds two numbers and prints the sum, the implementation is as follows:

1
2
3
4
5
6
7
8
Kotlin
fun plus(a: Int, b: Int): Int {
return a + b
}

fun main() {
println(plus(1, 2))
}

If we refactor it into CPS style, it would be:

1
2
3
4
5
6
7
8
9
10
Kotlin
fun plus(a: Int, b: Int, block: (Int) -> Unit) {
block(a + b)
}

fun main() {
plus(1, 2) {
println(it)
}
}

In Kotlin coroutines, methods marked with the suspend keyword are compiled with an added parameter of type Continuation. This parameter is used to callback the result to the caller after execution is complete.

1
2
3
4
5
6
7
8
9
10
Kotlin
// kotlin code
suspend fun plus() {

}

// After decompiling into Java code, it would be:
public static final Object plus(@NotNull Continuation $completion) {
return Unit.INSTANCE;
}

Continuation is an interface in the Kotlin coroutine package, which represents the top-level abstraction of coroutines. The specific code is as follows. In fact, when we call a coroutine, the final result is ultimately returned through the resumeWith method.

1
2
3
4
5
6
Kotlin
public interface Continuation<in T> {
public val context: CoroutineContext

public fun resumeWith(result: Result<T>)
}

3.2 Analysis of Coroutine Launch Process

Analysis of Coroutine Launch Process Using the Launch Method as an Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Kotlin
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// Here, returning `Dispatchers.Default`, non-main threads don't need to pay much attention.
val newContext = newCoroutineContext(context)
// Because `start` defaults to `CoroutineStart.DEFAULT`, `StandaloneCoroutine` is returned here.
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
// laucnch Coroutine
coroutine.start(start, coroutine, block)
return coroutine
}

The launch method is an extension function of CoroutineScope and accepts three parameters in total.

  • context: The coroutine context defaults to EmptyCoroutineContext. There is no need to pay much attention to this at the moment, as we will analyze it further later.
  • start : The coroutine launch mode defaults to Default.
  • block : The anonymous extension method of CoroutineScope decorated with suspend is also the coroutine body that executes the business logic.
    The coroutine is ultimately started by calling coroutine.start(start, coroutine, block). The lambda expression block in the parameters is processed by the compiler during compilation to implement an anonymous inner class that implements the Function2 interface and inherits the SuspendLambda. It can be seen after decompilation.
    1
    The inheritance hierarchy of this anonymous inner class is as follows:
    2
    Then, returning to the analysis of the startup process, the process of starting the coroutine with coroutine.start can be summarized into three steps:
  1. Wrap the block parameter of the launch method into a new Continuation object, with the specific type being the same as the anonymous inner class generated during compilation as mentioned above.
    a. The purpose of this step is to combine it with the context object and assign the StandaloneCoroutine coroutine object constructed within the launch method to the final generated object.
  2. Construct the coroutine execution task and specify the dispatcher for executing the coroutine task.
  3. Execute the coroutine task.

3.2.1 Wrap the block to construct a new Continuation object.

The specific implementation of coroutine.start is within the parent class AbstractCoroutine of StandaloneCoroutine.

1
2
3
4
5
Kotlin
// AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}

The first parameter of the start method is an instance of the CoroutineStart type. The CoroutineStart class is an enum class that identifies the startup mode of the coroutine, which is the second parameter when starting a coroutine and defaults to Default. The CoroutineStart class overloads the invoke operator. In Kotlin, if a class overloads the invoke operator, instances of this class can be called like methods. Let’s take a look at the invoke method of CoroutineStart:

1
2
3
4
5
6
7
8
9
10
Kotlin
// The invoke method of CoroutineStart
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
// By default, it starts with the DEFAULT mode, and this check is triggered.
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}

In the invoke method of CoroutineStart, the coroutine is ultimately started by calling the extension function startCoroutineCancellable on the block parameter. The specific implementation is as follows:

1
2
3
4
5
6
7
8
9
10
Kotlin
// Canceable.kt
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
// Null Check
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

Inside the startCoroutineCancellable method, the runSafely is a safety check, which can be skipped. The key code is the chained call of three methods, corresponding to the three steps of coroutine startup:

  • createCoroutineUnintercepted: Corresponds to the first step, combining the block parameter of launch with the StandaloneCoroutine constructed within the launch method.
  • intercepted: Corresponds to the second step, constructing the coroutine execution task and binding the dispatcher.
  • resumeCancellableWith: Corresponds to the third step, executing the coroutine task.

Next, let’s take a look at the createCoroutineUnintercepted function, which is annotated with the expect keyword, indicating that it has different implementations on different platforms. On the Android platform, it is implemented in IntrinsicsJvm.kt. Then, let’s see the specific implementation of createCoroutineUnintercepted:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Kotlin
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}

Here we come to the first key point: what is the return result of this in BaseContinuationImpl? Because this refers to the block parameter in the launch method, according to the inheritance relationship mentioned earlier, the condition holds true.
How is the inheritance relationship mentioned earlier obtained? By examining the Kotlin code. When not decompiled into Java code, you can see:
3

So, actually, here an anonymous inner class named TestKt$main$1 is generated, which inherits from SuspendLambda and implements the Function2 interface. Then, the inheritance relationship mentioned earlier can be obtained.
Returning to the condition this is BaseContinuationImpl, when we follow the create method in the if branch, we find that it is defined in BaseContinuationImpl, but no default implementation is provided, and an exception is thrown.

1
2
3
4
Kotlin
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}

Therefore, the implementation must be in a subclass, which is actually within the anonymous inner class generated earlier. It can be observed that another new object of type Function2 is created and returned within it.
By examining the method return signature and deducing from the bytecode, it is evident that the actual type of this object is exactly the same as the anonymous inner class mentioned earlier. We won’t delve into it further here.
With that, the wrapping of the block parameter is completed.

3.2.2 intercepted - Constructing the coroutine execution task and binding the dispatcher

Then, returning to the intercepted method called within startCoroutineCancellable in Cancellable.kt, this method mainly further wraps the object obtained from createCoroutineUnintercepted(receiver, completion) into a runnable and binds the dispatcher.

1
2
3
4
5
6
7
8
9
Kotlin
// Canceable.kt
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

Combining the inheritance relationship mentioned earlier, it is not difficult to ascertain that the intercepted method is defined in ContinuationImpl.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Kotlin
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...Omitted irrelevant code

}

Here, intercepted is initially empty, so the interceptor is obtained through the coroutine context. This interceptor corresponds to the default dispatcher specified during startup, which is Dispatchers.Default.
(Note: How context[ContinuationInterceptor] retrieves Dispatchers.Default will be skipped for now and addressed later when analyzing coroutine contexts. Currently, the main focus is on the startup process.)
To understand what interceptContinuation does, we first need to look at the creation of Dispatchers.Default.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Kotlin
public actual object Dispatchers {

@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
// ...Omitted irrelevant code
}

// CoroutineContext.kt
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
// ...Omitted irrelevant code
}

DefaultScheduler is a singleton annotated with the object keyword. There is no implementation related to interceptContinuation within it. Therefore, the implementation must be in its superclass. By tracing upwards, the inheritance hierarchy obtained is as follows.

4

Although interceptContinuation is declared in ContinuationInterceptor, its specific implementation is actually in CoroutineDispatcher.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Kotlin
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}

// The wrapped object obtained from createCoroutineUnintercepted(receiver, completion) is wrapped into an instance of type DispatchedContinuation here.


internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
// ...
}

In fact, the original anonymous inner class is wrapped into a DispatchedContinuation. Since DispatchedContinuation inherits from DispatchedTask, it also means that the instance of this anonymous inner class is wrapped into a DispatchedTask. Typically, in large libraries, classes named like XxxTask are often implementations of Runnable, which can be used for later task scheduling and more convenient execution by thread pools, or directly calling the run method for execution. This is also the case here. DispatchedTask, when traced upwards, ultimately implements the Runnable interface. Ultimately, the instance called is the DispatchedContinuation.

3.2.3 resumeCancellableWith executes the coroutine task.

Then, once again back to the startCoroutineCancellable method in Canceable.kt, and look at the final resumeCancellableWith.

1
2
3
4
5
6
7
8
Kotlin
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}

Here, it directly matches is DispatchedContinuation. The resumeCancellableWith method is implemented in CoroutineDispatcher, which is in the inheritance chain where DispatchedContinuation was just constructed.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Kotlin
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
// The default implementation of isDispatchNeeded is true unless the dispatcher is specified as Dispatchers.Unconfined.
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
}

The dispatcher parameter passed into the construction of DispatchedContinuation in CoroutineDispatcher corresponds to the current coroutine dispatcher, which, for this analysis process, corresponds to Dispatcher.Default.
Then, the default return of dispatcher.isDispatchNeeded(context) in CoroutineDispatcher is true, unless overridden by Dispatchers.Unconfined, which sets it to false. Therefore, task scheduling is accomplished through dispatcher.dispatch(context, this).
The dispatch method is defined in the CoroutineDispatcher interface and has various implementations.
5

Considering the previous analysis of the dispatcher inheritance chain, it is evident that for this analysis process, the specific implementation of the dispatch method refers to the implementation in ExperimentalCoroutineDispatcher.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Kotlin
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler

// Creating a task scheduler.
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}
// The final type of the scheduler is CoroutineScheduler
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}

You can see that the previously constructed DispatchedContinuation is passed as a Runnable to the member coroutineScheduler using the dispatch method.
CoroutineScheduler is a custom thread pool. The dispatch function is as follows:

Kotlin
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
// debug
trackTask() // this is needed for virtual time support

// Step 1: Wrap the block to be executed again, timestamp it to facilitate awakening calls after operations such as sleep.
val task = createTask(block, taskContext)
// Step 2: Obtain the current worker, which returns null in this case.
val currentWorker = currentWorker() // The return type is Worker.
// Step 3: Submit the task wrapped in step 1 to the pending execution queue.
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
    if (!addToGlobalQueue(notAdded)) {
        // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
        throw RejectedExecutionException("$schedulerName was terminated")
    }
}
// tailDispatch is the third parameter of the method, with a default value of false.
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
    if (skipUnpark) return
    // Step 4: Schedule the worker to execute the task.
    signalCpuWork()
} else {
    // Increment blocking tasks anyway
    signalBlockingWork(skipUnpark = skipUnpark)
}

}
Mainly four things are done: Step 1: Wrap the block to be executed again, timestamp it to facilitate awakening calls after operations such as sleep. Step 2: Obtain the current worker, which returns null. The return type of currentWorker() is Worker. Step 3: Submit the task wrapped in step 1 to the pending execution queue. Here, Kotlin’s syntactic sugar is used because the worker is null, so the task will not actually be executed. Step 4: Execute signalCpuWork(), scheduling the worker to execute the task.
In the second step above, the Worker attempted to be obtained is an inner class of CoroutineScheduler, which inherits from Thread. Therefore, it can be used as a thread.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

Kotlin
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
internal inner class Worker private constructor() : Thread() {
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
// Loop to retrieve pending tasks for execution.
val task = findTask(mayHaveLocalTasks)
// Task found. Execute and repeat
if (task != null) {
// execute task
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}

// ... (Code omitted)
}
}
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
// The actual logic for executing the task.
runSafely(task)
afterTask(taskMode)
}
}
fun runSafely(task: Task) {
try {
// Here, the task is the one originally wrapped. DispatchedContinuation
task.run()
} catch (e: Throwable) {
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
unTrackTask()
}
}
}

So, when the Worker is used as a thread and started with the start method, the run method will naturally be invoked. This will then call the internal runWorker method, which loops to retrieve tasks from the queue for execution, and then executes the executeTask method.
The executeTask method further calls runSafely of CoroutineScheduler, ultimately invoking task.run() to start and invoke the coroutine.
The task is the DispatchedContinuation previously wrapped, whose superclass DispatchedTask overrides the run method.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Kotlin
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {

val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)

val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}

The implementation of continuation.resume is in BaseContinuationImpl.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Kotlin
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {

with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// Calling the invokeSuspend function on the generated Function2.
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}

releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
completion.resumeWith(outcome)
return
}
}
}
}

// The implementation is delegated to the generated Function2.
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

So, the question now becomes how the Worker thread is started.
Returning to the analysis of the CoroutineScheduler#dispatch() method, the call to the signalCpuWork() method in the fourth step has not been elaborated on.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Kotlin
// CoroutineScheduler.kt
fun signalCpuWork() {
if (tryUnpark()) return
// A worker is created.
if (tryCreateWorker()) return
tryUnpark()
}

private fun tryCreateWorker(state: Long = controlState.value): Boolean {
val created = createdWorkers(state)
val blocking = blockingTasks(state)
val cpuWorkers = (created - blocking).coerceAtLeast(0)

if (cpuWorkers < corePoolSize) {
val newCpuWorkers = createNewWorker()

// A Worker is created.
if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
if (newCpuWorkers > 0) return true
}
return false
}

private fun createNewWorker(): Int {
synchronized(workers) {
// ... (Code omitted)
val worker = Worker(newIndex)
workers[newIndex] = worker
require(newIndex == incrementCreatedWorkers())
// Worker starts!
worker.start()
return cpuWorkers + 1
}
}

The main logic is actually in the signalCpuWork method:

  • If the worker has already been created, it will be awakened using LockSupport.unpark(), and the task execution will begin.
  • Otherwise, a new thread, the Worker thread, will be created to execute the task.
    So, the coroutine startup process can be summarized in three steps:
    6

4st. The difference between launch and async:

If we compare coroutines to threads, launch is similar to using Runnable, while async is akin to using FutureTask. Both are asynchronous mechanisms, but Runnable doesn’t return a value, whereas FutureTask can return the result of thread execution by calling the call method, which blocks the current thread.
In coroutines, launch returns a Job object which cannot directly retrieve the execution result, while async returns a Deferred object. By calling the await method on the Deferred object, you can obtain the coroutine execution result, but it also blocks the current thread.
Moreover, Deferred inherits from Job, indicating that Deferred merely extends the functionality of Job. In the source code, this is reflected by launch constructing a StandaloneCoroutine object, while async constructs a DeferredCoroutine object.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Kotlin
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

5st. Launch Mode

The coroutine’s launch mode is primarily reflected in the second parameter of the launch method, which is the CoroutineStart enum. CoroutineStart enum represents several different launch modes, including DEFAULT, LAZY, ATOMIC, and UNDISPATCHED.
These different launch modes determine the branching behavior after invoking CoroutineStart.invoke.
7

The specific differences are as follows:

  1. DEFAULT :This is the default launch mode. The coroutine starts immediately and executes the code in its body right away. This is the most common way of starting a coroutine.
  2. ATOMIC: In atomic launch mode, the coroutine starts immediately and executes the code in its body right away, similar to DEFAULT mode. However, it automatically marks the coroutine as atomic, allowing other coroutines to cancel or wait for it using cancelAndJoin() or join().
  3. UNDISPATCHED: This mode does not involve scheduling. The coroutine starts immediately but is not scheduled on any dispatcher. Instead, it executes in the current thread. This mode is useful for performing lightweight operations within the coroutine body, avoiding the overhead of thread switching.
  4. LAZY: This is the lazy launch mode. The coroutine is started lazily, meaning it starts execution only when the Job.start() function is called or when waiting for the Job.await() result. This mode is primarily used to start the coroutine when needed to avoid unnecessary overhead.