Internal API to retrieve a single CPU-bound task from Dispatchers.Default #3439

Closed
qwwdfsad opened this issue Sep 5, 2022 · 6 comments

@qwwdfsad
Contributor

qwwdfsad commented Sep 5, 2022

IntelliJ IDEA platform has the following core API:

// In a blocking, non-suspendable world that is executed in ForkJoinPool

decompose() {
    // here happens decomposition of the task and .join()
}

This API is composable: decompose can be called recursively from within subtasks.

It may be tempting to rewrite it with coroutines in the following manner:

fun decompose() = runBlocking {
    for (task in decomposition) {
        launch(Dispatchers.Default) { task() }
    }
}

The problem is that this code is prone to both starvation and deadlocks: each call to decompose from Dispatchers.Default occupies at least one thread in the backing executor for joinBlocking(). Once nested calls have blocked every thread of the dispatcher, no thread is left to run the launched subtasks, and the code deadlocks.
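The failure mode can be reproduced without coroutines. The following is a minimal sketch, not the IDEA code: it assumes a plain fixed-size thread pool standing in for Dispatchers.Default and Future.get() standing in for the blocking join inside runBlocking. The function name nestedBlockingWaitStarves is made up for illustration.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical demo: returns true when a nested blocking wait starves the pool.
fun nestedBlockingWaitStarves(): Boolean {
    // Assumption: a pool with a single thread plays the role of a saturated dispatcher.
    val pool = Executors.newFixedThreadPool(1)
    try {
        val outer = pool.submit(Callable<Int> {
            // The subtask is queued on the same pool...
            val inner = pool.submit(Callable<Int> { 42 })
            // ...but the only worker thread is parked here, so the subtask never starts.
            inner.get()
        })
        return try {
            outer.get(500, TimeUnit.MILLISECONDS)
            false // the outer task completed, no starvation observed
        } catch (e: TimeoutException) {
            true // the outer task is stuck waiting for a subtask that cannot run
        }
    } finally {
        // Interrupts the stuck worker so the JVM can exit.
        pool.shutdownNow()
    }
}

fun main() {
    println("starved: ${nestedBlockingWaitStarves()}")
}
```

With more threads the same thing happens one level deeper: it only takes as many nested blocking waits as there are workers.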

This is not the case for the FJP-based solution, because .join on any FJP task not only waits for the task to complete but also helps to execute subtasks if the joining thread is part of the target's FJP.

It doesn't seem that we can provide a full-blown FJP-like API out of the box, but as a first step we can provide an internal TBD API getSingleDispatchersDefaultTaskOrNull. With such an API, it is possible to emulate the decompose API and, after initial evaluation by the IDEA team, we may want to promote it to a @Delicate API as one more way to bridge blocking calls with coroutine dispatchers.
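For comparison, here is a small sketch of the FJP behaviour described above, using only java.util.concurrent. The task class CountLeaves and the helper countLeavesOnSmallPool are hypothetical names, and the two-thread pool is an arbitrary choice. The decomposition is nested ten levels deep and still completes, because a worker that calls join() runs pending subtasks instead of only parking.

```kotlin
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// Hypothetical task: counts the leaves of a binary decomposition tree of the given depth.
class CountLeaves(private val depth: Int) : RecursiveTask<Int>() {
    override fun compute(): Int {
        if (depth == 0) return 1
        val left = CountLeaves(depth - 1)
        val right = CountLeaves(depth - 1)
        left.fork()                        // schedule one half on the pool
        val rightCount = right.compute()   // run the other half in the current thread
        // join() on a worker thread executes the forked subtask itself if nobody stole it.
        return left.join() + rightCount
    }
}

fun countLeavesOnSmallPool(depth: Int): Int {
    val pool = ForkJoinPool(2) // deliberately far fewer threads than nested joins
    try {
        return pool.invoke(CountLeaves(depth))
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(countLeavesOnSmallPool(10))
}
```

The same recursive shape on a fixed pool with blocking waits would need a thread per pending join; here two threads are enough.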

@dovchinnikov
Contributor

FTR, another solution was considered: let runBlocking release the thread into the CPU pool before joinBlocking, so that the blocked thread is treated as IO-bound and Dispatchers.Default can use another thread to run submitted CPU tasks. But this leads to starvation because the IO pool is limited, or to OutOfMemoryError if the limit is set to effectively unlimited via kotlinx.coroutines.io.parallelism. To make progress, the thread has to execute dispatched tasks while it is blocked, as FJP does.

@dovchinnikov
Contributor

There is still a problem with thread-locals. We can manage our own thread-locals and reset them before running the task from getSingleDispatchersDefaultTaskOrNull in the current thread, but it would be great to have a handle or a function to clear ThreadContextElement state, something like:

suspend fun <X> resetThreadContext(/*non-suspending*/ action: () -> X): X {
  restoreThreadContext()
  try {
    return action()
  }
  finally {
    updateThreadContext() // install thread context back
  }
}

runBlocking {
  val pumpQueueJob = launch {
    while (true) {
      val task = getSingleDispatchersDefaultTaskOrNull()
      if (task != null) {
        resetThreadContext { // clean context
          task.run()
        }
      }
      yield()
    }
  }
  try {
    decompose() 
  }
  finally {
    pumpQueueJob.cancel()
  }
}

@qwwdfsad
Contributor Author

qwwdfsad commented Sep 22, 2022

I would love to know how we can implement non-transparent restoreThreadContext.

AFAIR there is no API in Java to clean up and later recover all thread-locals.
But, given that we know the current ThreadContextElement (e.g. if it works for you to pass it as an argument, or because the function itself is suspending), we can probably provide an API for isolating these specific elements.
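To make the idea of isolating specific, known elements concrete, here is a rough sketch using a plain java.lang.ThreadLocal as a stand-in for a ThreadContextElement. The helper name withThreadLocalCleared is hypothetical, and this is not an existing kotlinx.coroutines API. It only works because the caller passes in the thread-local to isolate, which mirrors the constraint above.

```kotlin
// Hypothetical helper: hides the value of one known thread-local while action runs,
// then puts the previous value back, even if action throws.
fun <T, R> withThreadLocalCleared(local: ThreadLocal<T>, action: () -> R): R {
    val saved = local.get()   // remember what the current thread sees
    local.remove()            // action will observe the initial (unset) value
    try {
        return action()
    } finally {
        local.set(saved)      // install the previous value back
    }
}

fun main() {
    val user = ThreadLocal<String?>()
    user.set("outer")
    val seenInside = withThreadLocalCleared(user) { user.get() }
    println("inside=$seenInside, after=${user.get()}")
}
```

A stolen task run through such a wrapper would not observe the caller's value of that particular thread-local; any thread-local the wrapper does not know about stays visible.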

@dovchinnikov
Contributor

dovchinnikov commented Nov 8, 2022

We recently had a starved-thread situation after an incorrect migration to coroutines in IJ (https://youtrack.jetbrains.com/issue/IDEA-302358), but it involved Dispatchers.IO.limitedParallelism(xx).
Thus I'd like to update the request to:

  1. API to retrieve a task from a dispatcher
  2. API to determine the current dispatcher by the current CoroutineScheduler.Worker thread

We are interested in Dispatchers.Default, Dispatchers.IO and limitedParallelism on any of them.

@dovchinnikov
Contributor

dovchinnikov commented Nov 8, 2022

Another possible approach is to provide a single-function API with semantics like this:

// NB return type is intentionally Nothing
public suspend fun processEventsInCurrentThread(): Nothing {
    assert {
        ThreadLocalEventLoop.currentOrNull() != null // only allowed inside runBlocking
    }
    val thread = Thread.currentThread()
    if (thread !is CoroutineScheduler.Worker) {
        awaitCancellation()
    }
    while (true) {
        // TASK_NON_BLOCKING for CPU-bound thread or TASK_PROBABLY_BLOCKING in IO-bound thread
        val delayTime = thread.executeSingleTaskInTheCurrentMode()
        if (delayTime >= 0L) {
            delay(delayTime)
        } else {
            yield()
        }
    }
}

@dovchinnikov
Contributor

Attaching an unfinished attempt to implement this, as requested in a private conversation:
3439.patch.zip
