Skip to content

Commit

Permalink
Rename poll-list to poll, poll-one to pollable.block, and introduce p…
Browse files Browse the repository at this point in the history
…ollable.ready (bytecodealliance#7427)

* update wasi:io/poll wit to WebAssembly/wasi-io#54

* put version in wit package name

* implement changes to the wits

* move contents of Host::poll_one to HostPollable::block
* rename Host::poll_list to Host::poll,
* implement HostPollable::ready, using futures::future::poll_immediate

* wit: fix reference to poll-list

* wasi-http wit: fix reference to poll-list

* clocks implementation: ready returns immediately if deadline has past

this is an optimization, but what it really allows us to do is assert
pollable.ready() for a subscribe_duration(0) is ready immediately.

* component adapter: rename poll-list to poll

* test-programs: renames to poll functions

test-programs/src/bin/preview2_sleep.rs in particular now asserts
ready() on a subscribe_duration(0) and a subscribe_instant(now() - 1),
so we have test coverage for ready as well now

* code review

Co-authored-by: Alex Crichton <alex@alexcrichton.com>

---------

Co-authored-by: Alex Crichton <alex@alexcrichton.com>
  • Loading branch information
Pat Hickey and alexcrichton authored Oct 31, 2023
1 parent 83a33b0 commit ddffc7e
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 78 deletions.
2 changes: 1 addition & 1 deletion crates/test-programs/src/bin/api_proxy_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ mod executor {

let mut ready = vec![false; wakers.len()];

for index in io::poll::poll_list(&pollables) {
for index in io::poll::poll(&pollables) {
ready[usize::try_from(index).unwrap()] = true;
}

Expand Down
6 changes: 2 additions & 4 deletions crates/test-programs/src/bin/api_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ wit_bindgen::generate!({
});

struct T;
use crate::wasi::io::poll;

static mut STATE: Vec<String> = Vec::new();

Expand All @@ -34,7 +33,7 @@ impl Guest for T {
for s in STATE.iter() {
let mut out = s.as_bytes();
while !out.is_empty() {
poll::poll_list(&[&pollable]);
pollable.block();
let n = match o.check_write() {
Ok(n) => n,
Err(_) => return Err(()),
Expand All @@ -52,8 +51,7 @@ impl Guest for T {
Ok(_) => {}
Err(_) => return Err(()),
}

poll::poll_list(&[&pollable]);
pollable.block();
match o.check_write() {
Ok(_) => {}
Err(_) => return Err(()),
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/src/bin/preview2_ip_name_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {
let addresses =
ip_name_lookup::resolve_addresses(&network, "example.com", None, false).unwrap();
let pollable = addresses.subscribe();
poll::poll_one(&pollable);
pollable.block();
assert!(addresses.resolve_next_address().is_ok());

let result = ip_name_lookup::resolve_addresses(&network, "a.b<&>", None, false);
Expand All @@ -20,7 +20,7 @@ fn main() {
let addresses = ip_name_lookup::resolve_addresses(&network, "github.com", None, false).unwrap();
let lookup = addresses.subscribe();
let timeout = monotonic_clock::subscribe_duration(1_000_000_000);
let ready = poll::poll_list(&[&lookup, &timeout]);
let ready = poll::poll(&[&lookup, &timeout]);
assert!(ready.len() > 0);
match ready[0] {
0 => loop {
Expand Down
18 changes: 12 additions & 6 deletions crates/test-programs/src/bin/preview2_sleep.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use test_programs::wasi::{clocks::monotonic_clock, io::poll};
use test_programs::wasi::clocks::monotonic_clock;

fn main() {
sleep_10ms();
Expand All @@ -9,19 +9,25 @@ fn main() {
fn sleep_10ms() {
let dur = 10_000_000;
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() + dur);
poll::poll_one(&p);
p.block();
let p = monotonic_clock::subscribe_duration(dur);
poll::poll_one(&p);
p.block();
}

fn sleep_0ms() {
let p = monotonic_clock::subscribe_instant(monotonic_clock::now());
poll::poll_one(&p);
p.block();
let p = monotonic_clock::subscribe_duration(0);
poll::poll_one(&p);
assert!(
p.ready(),
"timer subscription with duration 0 is ready immediately"
);
}

fn sleep_backwards_in_time() {
let p = monotonic_clock::subscribe_instant(monotonic_clock::now() - 1);
poll::poll_one(&p);
assert!(
p.ready(),
"timer subscription for instant which has passed is ready immediately"
);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use test_programs::wasi::cli::stdin;
use test_programs::wasi::io::poll;
use test_programs::wasi::io::streams;

fn main() {
let stdin: streams::InputStream = stdin::get_stdin();
let stdin_pollable = stdin.subscribe();
let ready = poll::poll_list(&[&stdin_pollable]);
assert_eq!(ready, &[0]);
stdin_pollable.block();
assert!(stdin_pollable.ready(), "after blocking, pollable is ready");
drop(stdin_pollable);
drop(stdin);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use test_programs::wasi::cli::stdin;
use test_programs::wasi::io::poll;
use test_programs::wasi::io::streams;

fn main() {
let stdin: streams::InputStream = stdin::get_stdin();
let stdin_pollable = stdin.subscribe();
let ready = poll::poll_list(&[&stdin_pollable]);
assert_eq!(ready, &[0]);
stdin_pollable.block();
drop(stdin);
unreachable!("execution should have trapped in line above when stream dropped before pollable");
}
9 changes: 4 additions & 5 deletions crates/test-programs/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::wasi::http::{outgoing_handler, types as http_types};
use crate::wasi::io::poll;
use crate::wasi::io::streams;
use anyhow::{anyhow, Result};
use std::fmt;
Expand Down Expand Up @@ -72,7 +71,7 @@ pub fn request(

let pollable = request_body.subscribe();
while !buf.is_empty() {
poll::poll_list(&[&pollable]);
pollable.block();

let permit = match request_body.check_write() {
Ok(n) => n,
Expand All @@ -94,7 +93,7 @@ pub fn request(
_ => {}
}

poll::poll_list(&[&pollable]);
pollable.block();

match request_body.check_write() {
Ok(_) => {}
Expand All @@ -110,7 +109,7 @@ pub fn request(
Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?,
None => {
let pollable = future_response.subscribe();
let _ = poll::poll_list(&[&pollable]);
pollable.block();
future_response
.get()
.expect("incoming response available")
Expand Down Expand Up @@ -140,7 +139,7 @@ pub fn request(

let mut body = Vec::new();
loop {
poll::poll_list(&[&input_stream_pollable]);
input_stream_pollable.block();

let mut body_chunk = match input_stream.read(1024 * 1024) {
Ok(c) => c,
Expand Down
18 changes: 7 additions & 11 deletions crates/test-programs/src/sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ use std::ops::Range;
const TIMEOUT_NS: u64 = 1_000_000_000;

impl Pollable {
pub fn wait(&self) {
poll::poll_one(self);
}

pub fn wait_until(&self, timeout: &Pollable) -> Result<(), ErrorCode> {
let ready = poll::poll_list(&[self, timeout]);
let ready = poll::poll(&[self, timeout]);
assert!(ready.len() > 0);
match ready[0] {
0 => Ok(()),
Expand All @@ -36,7 +32,7 @@ impl OutputStream {
let pollable = self.subscribe();

while !bytes.is_empty() {
pollable.wait();
pollable.block();

let permit = self.check_write()?;

Expand Down Expand Up @@ -75,7 +71,7 @@ impl TcpSocket {

loop {
match self.finish_bind() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -88,7 +84,7 @@ impl TcpSocket {

loop {
match self.finish_listen() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -105,7 +101,7 @@ impl TcpSocket {

loop {
match self.finish_connect() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -116,7 +112,7 @@ impl TcpSocket {

loop {
match self.accept() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand All @@ -139,7 +135,7 @@ impl UdpSocket {

loop {
match self.finish_bind() {
Err(ErrorCode::WouldBlock) => sub.wait(),
Err(ErrorCode::WouldBlock) => sub.block(),
result => return result,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/wasi-http/wit/deps/http/types.wit
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ interface types {
/// transport layer of the HTTP protocol.
///
/// These timeouts are separate from any the user may use to bound a
/// blocking call to `wasi:io/poll.poll-list`.
/// blocking call to `wasi:io/poll.poll`.
resource request-options {
/// Construct a default `request-options` value.
constructor();
Expand Down
25 changes: 16 additions & 9 deletions crates/wasi-http/wit/deps/io/poll.wit
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@ package wasi:io@0.2.0-rc-2023-11-05;
/// A poll API intended to let users wait for I/O events on multiple handles
/// at once.
interface poll {
/// A "pollable" handle.
resource pollable;
/// `pollable` epresents a single I/O event which may be ready, or not.
resource pollable {

/// Return the readiness of a pollable. This function never blocks.
///
/// Returns `true` when the pollable is ready, and `false` otherwise.
ready: func() -> bool;

/// `block` returns immediately if the pollable is ready, and otherwise
/// blocks until ready.
///
/// This function is equivalent to calling `poll.poll` on a list
/// containing only this pollable.
block: func();
}

/// Poll for completion on a set of pollables.
///
Expand All @@ -24,11 +37,5 @@ interface poll {
/// do any I/O so it doesn't fail. If any of the I/O sources identified by
/// the pollables has an error, it is indicated by marking the source as
/// being reaedy for I/O.
poll-list: func(in: list<borrow<pollable>>) -> list<u32>;

/// Poll for completion on a single pollable.
///
/// This function is similar to `poll-list`, but operates on only a single
/// pollable. When it returns, the handle is ready for I/O.
poll-one: func(in: borrow<pollable>);
poll: func(in: list<borrow<pollable>>) -> list<u32>;
}
10 changes: 5 additions & 5 deletions crates/wasi-preview1-component-adapter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub mod bindings {
// can't support in these special core-wasm adapters.
// Instead, we manually define the bindings for these functions in
// terms of raw pointers.
skip: ["run", "get-environment", "poll-list"],
skip: ["run", "get-environment", "poll"],
});

#[cfg(feature = "reactor")]
Expand All @@ -54,7 +54,7 @@ pub mod bindings {
// can't support in these special core-wasm adapters.
// Instead, we manually define the bindings for these functions in
// terms of raw pointers.
skip: ["get-environment", "poll-list"],
skip: ["get-environment", "poll"],
});
}

Expand Down Expand Up @@ -1780,8 +1780,8 @@ pub unsafe extern "C" fn poll_oneoff(
#[link(wasm_import_module = "wasi:io/poll@0.2.0-rc-2023-11-05")]
#[allow(improper_ctypes)] // FIXME(bytecodealliance/wit-bindgen#684)
extern "C" {
#[link_name = "poll-list"]
fn poll_list_import(pollables: *const Pollable, len: usize, rval: *mut ReadyList);
#[link_name = "poll"]
fn poll_import(pollables: *const Pollable, len: usize, rval: *mut ReadyList);
}
let mut ready_list = ReadyList {
base: std::ptr::null(),
Expand All @@ -1794,7 +1794,7 @@ pub unsafe extern "C" fn poll_oneoff(
.checked_mul(size_of::<u32>())
.trapping_unwrap(),
|| {
poll_list_import(
poll_import(
pollables.pointer,
pollables.length,
&mut ready_list as *mut _,
Expand Down
6 changes: 5 additions & 1 deletion crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ fn subscribe_to_duration(
table: &mut crate::preview2::Table,
duration: tokio::time::Duration,
) -> anyhow::Result<Resource<Pollable>> {
let sleep = if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) {
let sleep = if duration.is_zero() {
table.push(Deadline::Past)?
} else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) {
// NB: this resource created here is not actually exposed to wasm, it's
// only an internal implementation detail used to match the signature
// expected by `subscribe`.
Expand Down Expand Up @@ -85,6 +87,7 @@ impl<T: WasiView> monotonic_clock::Host for T {
}

enum Deadline {
Past,
Instant(tokio::time::Instant),
Never,
}
Expand All @@ -93,6 +96,7 @@ enum Deadline {
impl Subscribe for Deadline {
async fn ready(&mut self) {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
Expand Down
5 changes: 3 additions & 2 deletions crates/wasi/src/preview2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ pub mod bindings {
"[method]output-stream.blocking-write-and-flush",
"[method]output-stream.blocking-write-zeroes-and-flush",
"[method]directory-entry-stream.read-directory-entry",
"poll-list",
"poll-one",
"poll",
"[method]pollable.block",
"[method]pollable.ready",
],
},
trappable_error_type: {
Expand Down
Loading

0 comments on commit ddffc7e

Please sign in to comment.