Allow converting CoroutineDispatcher to RxJava scheduler #968
Comments
Here is a sample implementation:

class DispatcherBackedScheduler(private val dispatcher: CoroutineDispatcher) : Scheduler() {
    override fun createWorker(): Worker = DispatcherBackedWorker(dispatcher)

    private class DispatcherBackedWorker(private val dispatcher: CoroutineDispatcher) : Worker(), CoroutineScope {
        private val job = SupervisorJob()
        override val coroutineContext: CoroutineContext = EmptyCoroutineContext + job

        override fun isDisposed(): Boolean = !job.isActive
        override fun dispose() = job.cancel()

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable =
            if (delay <= 0) {
                dispatcher.dispatch(coroutineContext, run)
                Disposables.disposed()
            } else {
                launch(dispatcher) {
                    kotlinx.coroutines.delay(unit.toMillis(delay))
                    run.run()
                }.asDisposable()
            }
    }
}

private fun Job.asDisposable(): Disposable = object : Disposable {
    override fun isDisposed(): Boolean = !isActive
    override fun dispose() = cancel()
}
Should

private fun Job.asDisposable(): Disposable = object : Disposable {
    override fun isDisposed(): Boolean = !isActive
    override fun dispose() = cancel()
}

fun CoroutineScope.asScheduler(): Scheduler {
    return ScopeBackedScheduler(this)
}

class ScopeBackedScheduler(private val scope: CoroutineScope) : Scheduler() {
    override fun createWorker(): Worker = ScopeBackedWorker(scope)

    private class ScopeBackedWorker(private val scope: CoroutineScope) : Worker() {
        override fun isDisposed(): Boolean = !scope.isActive
        override fun dispose() = scope.cancel()

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (isDisposed) {
                return Disposables.disposed()
            }
            val job = scope.launch {
                if (delay > 0) {
                    delay(unit.toMillis(delay))
                }
                run.run()
            }
            return job.asDisposable()
        }
    }
}

I just want the lifecycle of

class MainActivity : AppCompatActivity(), CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    private lateinit var job: Job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        job = Job()
        RxView.onClick(btnABC)
            .observeOn(this.asScheduler()) // The stream completes when `job.cancel()` is called in `onDestroy()`
            .subscribe(...)
    }

    override fun onDestroy() {
        job.cancel()
        super.onDestroy()
    }
}
The kotlinx-coroutines-rx2 module is interesting. It also adds some excellent features (a new backpressure strategy), so we can use it directly without writing a custom CoroutineDispatcherScheduler! But be careful with it in a production environment.
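As a rough illustration of using kotlinx-coroutines-rx2 directly, the sketch below exposes coroutine code as an RxJava `Single` without any scheduler conversion. It assumes RxJava 2 and a recent kotlinx-coroutines-rx2 in which `rxSingle` is a top-level builder; `loadAnswer` is a hypothetical name used only for this example.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical function: the suspending body runs on Dispatchers.Default,
// and the caller only ever sees a plain RxJava Single.
fun loadAnswer(): Single<Int> = rxSingle(Dispatchers.Default) {
    delay(10) // suspends instead of blocking a thread
    42
}

fun main() {
    // blockingGet() is used here only to keep the demo self-contained.
    println(loadAnswer().blockingGet())
}
```

This covers the case where the work itself can be written as a coroutine. It does not help when an existing Rx chain needs a `Scheduler` argument, which is what the issue asks for.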
@Guang1234567 Your implementation of Also, you never dispose your click observer.
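The point about the undisposed click observer could be addressed along these lines. This is only a sketch, assuming RxJava 2: `Screen` stands in for the activity, and a `PublishSubject` stands in for the view click stream; both are hypothetical.

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.subjects.PublishSubject

// Hypothetical stand-in for an Activity with onCreate/onDestroy callbacks.
class Screen {
    private val disposables = CompositeDisposable()

    // Stand-in for the click stream of a real view.
    val clicks: PublishSubject<Unit> = PublishSubject.create()

    var handled = 0
        private set

    fun onCreate() {
        // Keep the Disposable returned by subscribe() so it can be released later.
        disposables.add(clicks.subscribe { handled += 1 })
    }

    fun onDestroy() {
        // Disposes every subscription that was added; later clicks are ignored.
        disposables.clear()
    }
}

fun main() {
    val screen = Screen()
    screen.onCreate()
    screen.clicks.onNext(Unit)
    screen.onDestroy()
    screen.clicks.onNext(Unit) // no longer delivered to the subscriber
    println("handled clicks: ${screen.handled}")
}
```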
Thanks for your reply. I just fixed the problem following your suggestion, like this:

fun CoroutineScope.asScheduler(): Scheduler {
    return ScopeBackedScheduler(this)
}

class ScopeBackedScheduler(private val scope: CoroutineScope) : Scheduler() {
    override fun createWorker(): Worker = ScopeBackedWorker(scope)

    private class ScopeBackedWorker(private val scope: CoroutineScope) : Worker() {
        private val job = SupervisorJob(scope.coroutineContext[Job])

        override fun isDisposed(): Boolean = !job.isActive
        override fun dispose() = job.cancel()

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (isDisposed) {
                return Disposables.disposed()
            }
            return scope.launch(job) {
                if (delay > 0) {
                    delay(unit.toMillis(delay))
                }
                run.run()
            }.asDisposable()
        }
    }
}

It solves several problems:
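One visible difference in the revised worker is that each worker owns a `SupervisorJob` whose parent is the scope's job, so `dispose()` cancels only that worker. The sketch below checks this parent/child behavior in isolation, assuming only kotlinx-coroutines-core; `newWorkerJob` is a hypothetical helper mirroring the `job` field above.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive

// Hypothetical helper: creates a job the same way ScopeBackedWorker does.
fun newWorkerJob(scope: CoroutineScope): Job =
    SupervisorJob(scope.coroutineContext[Job])

fun main() {
    val scope = CoroutineScope(Dispatchers.Default + Job())

    // Cancelling one worker's job (what dispose() does) leaves the scope alive.
    val first = newWorkerJob(scope)
    first.cancel()
    println("first worker active: ${first.isActive}") // false
    println("scope active: ${scope.isActive}")        // true

    // Cancelling the scope propagates down to workers created from it.
    val second = newWorkerJob(scope)
    scope.cancel()
    println("second worker active: ${second.isActive}") // false
}
```

In the earlier version, `dispose()` called `scope.cancel()`, so disposing a single worker would have cancelled the whole scope.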
Are there any intentions to get this finished?
I'm happy to provide a pull request as long as someone from the Coroutines team approves the idea
There is a PR #1923 in review, close to being ready to merge.
Currently there is a way to create a CoroutineDispatcher from a Scheduler (RxJava), which is great, but it would also be convenient to be able to do the reverse. RxJava allows you to set your own schedulers in the Schedulers singleton, meaning it could be overridden for the whole system.
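For reference, the existing direction (Scheduler to CoroutineDispatcher) looks roughly like this. It is a minimal sketch assuming RxJava 2 and the `asCoroutineDispatcher` extension from kotlinx-coroutines-rx2; `threadNameOn` is a hypothetical helper used only for the demonstration.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.asCoroutineDispatcher
import kotlinx.coroutines.withContext

// Hypothetical helper: runs a block on the given scheduler via a coroutine
// and reports which thread executed it.
fun threadNameOn(scheduler: Scheduler): String = runBlocking {
    val dispatcher: CoroutineDispatcher = scheduler.asCoroutineDispatcher()
    withContext(dispatcher) { Thread.currentThread().name }
}

fun main() {
    println("caller thread:    ${Thread.currentThread().name}")
    println("scheduler thread: ${threadNameOn(Schedulers.single())}")
}
```

The request here is for the mirror image of this call, so that a dispatcher can be handed to APIs that expect an RxJava `Scheduler`.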