Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unexpected behavior WhileSubscribed #2863

Closed
0x7e57 opened this issue Aug 9, 2021 · 1 comment
Closed

Unexpected behavior WhileSubscribed #2863

0x7e57 opened this issue Aug 9, 2021 · 1 comment

Comments

@0x7e57
Copy link

0x7e57 commented Aug 9, 2021

From the documentation:

stopTimeout - configures a delay between the disappearance of the last subscriber and the stopping of the sharing coroutine.

However, stopTimeout is not refreshed on new subscriptions. I also asked this question here.
Another ephemeral example on Android:

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        val viewBinding = FragmentHomeBinding.bind(view)

        val instantFlow = flow {
            while (true) {
                emit(Instant.now())
                delay(Duration.ofMinutes(1).toMillis())
            }
        }
            .onStart { Log.d(TAG, "start") }
            .onCompletion { Log.d(TAG, "completion") }
            .shareIn(
                lifecycleScope,
                SharingStarted.WhileSubscribed(
                    Duration.ofSeconds(10).toMillis(),
                    Duration.ZERO.toMillis(),
                ),
                1,
            )
        val eventFlow = MutableSharedFlow<Unit>(
            0,
            Int.MAX_VALUE,
            BufferOverflow.SUSPEND,
        )
        viewBinding.button.setOnClickListener {
            check(eventFlow.tryEmit(Unit))
        }
        eventFlow
            .onEach { Log.d(TAG, "$it ${instantFlow.first()}") }
            .launchIn(lifecycleScope)
    }

Each click on the button should delay the completion of instantFlow, but this does not happen, the flow will end after a time equal to stopTimeout, ignoring the subscriptions that occur in eventFlow.

2021-08-09 17:32:06.835 : start
2021-08-09 17:32:06.839 : kotlin.Unit 2021-08-09T14:32:06.835Z
2021-08-09 17:32:08.411 : kotlin.Unit 2021-08-09T14:32:06.835Z
2021-08-09 17:32:09.256 : kotlin.Unit 2021-08-09T14:32:06.835Z
2021-08-09 17:32:10.978 : kotlin.Unit 2021-08-09T14:32:06.835Z
2021-08-09 17:32:12.119 : kotlin.Unit 2021-08-09T14:32:06.835Z
2021-08-09 17:32:13.964 : kotlin.Unit 2021-08-09T14:32:06.835Z
2021-08-09 17:32:16.862 : completion

I have little experience with kotlin coroutines and I may be doing something wrong. Is this the expected behavior?

@qwwdfsad
Copy link
Contributor

Thanks for the report! The issue seems to have the same root cause as #2488
We'll see how we can address it.

Self-contained reproducer:

// runBlocking ctx
val instantFlow = flow {
    while (true) {
        emit(Instant.now())
        delay(Duration.ofMinutes(1).toMillis())
    }
}.onStart { println("Started") }
    .onCompletion { error("Should not be the case") }
    .shareIn(
        this,
        SharingStarted.WhileSubscribed(
            Duration.ofSeconds(5).toMillis(),
            Duration.ZERO.toMillis(),
        ),
        1,
    )
val eventFlow = flow {
    while (true) {
        emit(1)
        delay(1000)
    }
}
eventFlow
    .onEach { println("Hehe: " + instantFlow.first()) }
    .launchIn(this)
``

qwwdfsad added a commit that referenced this issue Aug 11, 2021
Sharing strategies are too sensitive to conflation around extrema and may miss the necessity to start or not to stop the sharing. For more particular examples see #2863 and #2488

Fixes #2488
Fixes #2863
Fixes #2871
@qwwdfsad qwwdfsad added the bug label Aug 12, 2021
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
…#2872)

* Non-conflating subscription count in SharedFlow and StateFlow

Sharing strategies are too sensitive to conflation around extrema and may miss the necessity to start or not to stop the sharing. For more particular examples see Kotlin#2863 and Kotlin#2488

Fixes Kotlin#2488
Fixes Kotlin#2863
Fixes Kotlin#2871
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants