Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common): bytes::Buf wrapper that notifies subscribers on EOS #6

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

tomkarw
Copy link
Contributor

@tomkarw tomkarw commented Aug 19, 2022

@tomkarw tomkarw force-pushed the tomkarw/add-buffer-wrapper-with-notify-on-eos branch 2 times, most recently from 70f5ae4 to 0ec027e Compare August 19, 2022 19:50
@tomkarw
Copy link
Contributor Author

tomkarw commented Aug 19, 2022

I wanted to add tests, but I have issues (again) with advancing futures in tests.

When I debug the test, I can see that notify_eos (and thus Notify::notify_waiters) gets called, but Notify::notified() future just returns Poll:;Pending forever o.0

I did not understand the behavior of Notify::notify_waiters. Switched to Notify::notify_one, but had to remove Clone from EosSignaler in the process. This should be revisited once Notify::notify_all stabilizes.

@tomkarw tomkarw force-pushed the tomkarw/add-buffer-wrapper-with-notify-on-eos branch from 0ec027e to c004653 Compare August 19, 2022 20:13
@tomkarw
Copy link
Contributor Author

tomkarw commented Aug 20, 2022

r? @seanmonstar (I wish that worked 😄)

Copy link
Member

@seanmonstar seanmonstar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see that some may want this util on it's own. I think more importantly though, we should keep the end goal in mind, and "work backwards". That is to say, I suspect most often people will want a channel where they can do tx.flush().await. We don't need the full impl to start with, but a full public API design/proposal would help us see what the user would write.

src/common/buf.rs Outdated Show resolved Hide resolved
@tomkarw tomkarw force-pushed the tomkarw/add-buffer-wrapper-with-notify-on-eos branch from fd61c49 to 9150c0f Compare August 23, 2022 18:55
@tomkarw
Copy link
Contributor Author

tomkarw commented Aug 23, 2022

I can see that some may want this util on it's own. I think more importantly though, we should keep the end goal in mind, and "work backwards". That is to say, I suspect most often people will want a channel where they can do tx.flush().await. We don't need the full impl to start with, but a full public API design/proposal would help us see what the user would write.

Are you suggesting we write the proposal before continuing?

I'll be honest, I was thinking about my use case so far.
I'm storing some data on each chunk of a streaming body. Then I need an indication of EOS and that no more chunks will come in, so that I can finalize with some external request that uses all the data aggregated.

I think your example (although I'm fuzzy on the details) would work as well, I can imagine some futures::join on both whatever is driving the sending of data forward and EosSignaler.wait_till_eos. A bit simpler with tokio::spawn:

let tx = SomeChannel::new();
let buf = Bytes::from_static(b"abc");
let (buf, signaler) = NotifyOnEos::new(buf);

{
    let tx = tx.clone();
    tokio::spawn(async move {
        signaler.wait_till_eos().await;
        tx.flush().await;
    }
}

while buf.remaining() > 0 {
    tx.send(buf.chunk());
    buf.advance(buf.chunk().len());
}

@tomkarw tomkarw requested a review from seanmonstar August 24, 2022 10:05
@seanmonstar
Copy link
Member

Are you suggesting we write the proposal before continuing?

I don't think it's a strong blocker, but rather a very helpful tool to make sure we solve the problem from the user's point of view. I just currently am focused on things that must be done for hyper's 1.0 release candidate, and coming up with this proposal is not one of those things yet.

I'll be honest, I was thinking about my use case so far.

That's reasonable! It's certainly an user's point of view, which is better than my hand-wavey idea that I jotted down in the issue to solve an abstract problem I've heard many voice. It does sound like having this util to wrap any impl Buf is helpful on it's own, which is what I wanted to be sure of.

@aliu
Copy link

aliu commented Mar 21, 2023

How would one use this, practically speaking? NotifyOnEos::new gives you a pair of the type implementing Buf and the signal, but if your body has multiple data frames (I imagine you would use this through BodyExt::map_data, or BodyExt::map_frame for the new api) or something like Body::wrap_stream, you'd have to call this for every data frame, then what would you do with all the extra signals? @tomkarw you mentioned in hyperium/hyper#2858 that you were using some similar strategy for your work, and that this implementation was based on that. Do you mind elaborating on how you're applying this NotifyOnEos in the actual body? Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create a body channel implementation that knows when buf is written
3 participants