Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsubpubsubpubsub #21

Merged
merged 30 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5967d6c
feature: new pubsub
prestwich Nov 6, 2023
cf385c7
fix: reconnect in pubsubservice
prestwich Nov 6, 2023
3f926f9
feat: Ws
prestwich Nov 7, 2023
98c3bb8
refactor: delete pubsub trait
prestwich Nov 7, 2023
ed4de99
refactor: disable batching for pubsub
prestwich Nov 7, 2023
5f77c63
nit: match tuple order
prestwich Nov 7, 2023
06e2606
fix: manually impl deser of pubsubitem
prestwich Nov 8, 2023
e762f8d
fix: clippy
prestwich Nov 8, 2023
acbffe3
docs: comments for deser impl
prestwich Nov 8, 2023
751d68c
chore: remove dbg from test
prestwich Nov 8, 2023
b628e0a
chore: remove dbg from test
prestwich Nov 8, 2023
37ecfb6
refactor: rename env vars
prestwich Nov 8, 2023
6ce20b5
refactor: break transports into several crates
prestwich Nov 8, 2023
d07764a
fix: turn ws off by default
prestwich Nov 8, 2023
61f311e
nit: temporarily comment out tests
prestwich Nov 8, 2023
2e47e93
nits: clippy
prestwich Nov 8, 2023
2d2d85e
fix: clippy all-features
prestwich Nov 8, 2023
acf091b
fix: tests for provider
prestwich Nov 8, 2023
2196597
chore: fix wasm
prestwich Nov 8, 2023
48ee7e1
doc: resolve broken links
prestwich Nov 8, 2023
f8f422c
fix: tokio rt on non-wasm
prestwich Nov 8, 2023
1721881
fix: cargo hack
prestwich Nov 8, 2023
0772226
chore: add warns and denies to some lib files
prestwich Nov 8, 2023
f1e8a68
chore: add warns and denies to more lib files
prestwich Nov 8, 2023
61b3a2c
chore: more lints and warns and errors
prestwich Nov 9, 2023
6ad5ddc
fix: impl PubSubConnect for WsConnect in wasm
prestwich Nov 9, 2023
f59daa6
docs: fix some backticks
prestwich Nov 9, 2023
093d5ec
fix: url in deps
prestwich Nov 9, 2023
8dfb675
fix: 1 url type
prestwich Nov 9, 2023
245ff68
fix: dep tokio
prestwich Nov 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: manually impl deser of pubsubitem
  • Loading branch information
prestwich committed Nov 8, 2023
commit 06e2606d0c088fa83fad25d777372b28120ae0fc
105 changes: 102 additions & 3 deletions crates/json-rpc/src/notification.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use alloy_primitives::U256;
use serde::{Deserialize, Serialize};
use serde::{
de::{MapAccess, Visitor},
Deserialize, Serialize,
};

use crate::Response;

Expand All @@ -14,9 +17,105 @@ pub struct EthNotification<T = Box<serde_json::value::RawValue>> {
/// An item received over an Ethereum pubsub transport. Ethereum pubsub uses a
/// non-standard JSON-RPC notification format. An item received over a pubsub
/// transport may be a JSON-RPC response or an Ethereum-style notification.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
#[derive(Debug, Clone)]
pub enum PubSubItem {
Response(Response),
Notification(EthNotification),
}

impl<'de> Deserialize<'de> for PubSubItem {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PubSubItemVisitor;

impl<'de> Visitor<'de> for PubSubItemVisitor {
type Value = PubSubItem;

fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a JSON-RPC response or an Ethereum-style notification")
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut id = None;
let mut subscription = None;
let mut result = None;
let mut error = None;

while let Ok(Some(key)) = map.next_key() {
match key {
"id" => {
if id.is_some() {
return Err(serde::de::Error::duplicate_field("id"));
}
id = Some(map.next_value()?);
}
"subscription" => {
if subscription.is_some() {
return Err(serde::de::Error::duplicate_field("subscription"));
}
subscription = Some(map.next_value()?);
}
"result" => {
if result.is_some() {
return Err(serde::de::Error::duplicate_field("result"));
}
result = Some(map.next_value()?);
}
"error" => {
if error.is_some() {
return Err(serde::de::Error::duplicate_field("error"));
}
error = Some(map.next_value()?);
}
_ => {
let _ = map.next_value::<serde_json::Value>()?;
}
}
}

if let Some(id) = id {
if subscription.is_some() {
return Err(serde::de::Error::custom(
"unexpected subscription in pubsub item",
));
}

let payload = if error.is_some() {
crate::ResponsePayload::Failure(error.unwrap())
} else {
if result.is_none() {
return Err(serde::de::Error::missing_field("result"));
}
crate::ResponsePayload::Success(result.unwrap())
};
Ok(PubSubItem::Response(Response { id, payload }))
} else {
if error.is_some() {
return Err(serde::de::Error::custom(
"unexpected `error` field in subscription notification",
));
}
if subscription.is_none() {
return Err(serde::de::Error::missing_field("subscription"));
}

if result.is_none() {
return Err(serde::de::Error::missing_field("result"));
}

Ok(PubSubItem::Notification(EthNotification {
subscription: subscription.unwrap(),
result: result.unwrap(),
}))
}
}
}

deserializer.deserialize_any(PubSubItemVisitor)
}
}
5 changes: 5 additions & 0 deletions crates/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ tokio-tungstenite = { version = "0.20.1", features = ["rustls-tls-webpki-roots"]
default = ["reqwest", "hyper"]
reqwest = ["dep:reqwest"]
hyper = ["dep:hyper", "hyper/client"]

[dev-dependencies]
test-log = { version = "0.2.13", default-features = false, features = ["trace"] }
tracing-subscriber = "0.3.17"
tracing-test = "0.2.4"
22 changes: 21 additions & 1 deletion crates/transports/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ use alloy_json_rpc::{Request, RequestPacket, ResponsePacket, RpcParam, RpcResult
use core::panic;
use serde_json::value::RawValue;
use std::{
fmt::Debug,
future::Future,
marker::PhantomData,
pin::Pin,
task::{self, Poll::Ready},
};
use tower::Service;
use tracing::{instrument, trace};

/// The states of the [`RpcCall`] future.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project(project = CallStateProj)]
enum CallState<Params, Conn>
where
Conn: Transport + Clone,
Params: RpcParam,
Conn: Transport + Clone,
{
Prepared {
request: Option<Request<Params>>,
Expand All @@ -30,6 +32,20 @@ where
Complete,
}

impl<Params, Conn> Debug for CallState<Params, Conn>
where
Params: RpcParam,
Conn: Transport + Clone,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Prepared { .. } => f.debug_struct("Prepared").finish(),
Self::AwaitingResponse { .. } => f.debug_struct("AwaitingResponse").finish(),
Self::Complete => write!(f, "Complete"),
}
}
}

impl<Params, Conn> CallState<Params, Conn>
where
Conn: Transport + Clone,
Expand All @@ -39,6 +55,7 @@ where
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
trace!("Polling prepared");
let fut = {
let CallStateProj::Prepared {
connection,
Expand Down Expand Up @@ -76,6 +93,7 @@ where
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
trace!("Polling awaiting");
let CallStateProj::AwaitingResponse { fut } = self.as_mut().project() else {
unreachable!("Called poll_awaiting in incorrect state")
};
Expand All @@ -95,6 +113,7 @@ where
{
type Output = RpcResult<Box<RawValue>, Box<RawValue>, TransportError>;

#[instrument(skip(self, cx))]
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
if matches!(*self.as_mut(), CallState::Prepared { .. }) {
return self.poll_prepared(cx);
Expand Down Expand Up @@ -191,6 +210,7 @@ where
type Output = RpcResult<Resp, Box<RawValue>, TransportError>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
tracing::trace!(?self.state, "Polling RpcCall");
let this = self.project();

let resp = task::ready!(this.state.poll(cx));
Expand Down
5 changes: 3 additions & 2 deletions crates/transports/src/transports/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use wasm::WsConnect;

use crate::pubsub::ConnectionInterface;

use tracing::{error, trace};
use tracing::{debug, error, trace};

/// An ongoing connection to a backend.
///
Expand All @@ -25,8 +25,9 @@ pub struct WsBackend<T> {
}

impl<T> WsBackend<T> {
#[tracing::instrument(skip(self))]
pub async fn handle_text(&mut self, t: String) -> Result<(), ()> {
trace!(text = t, "Received message");
debug!(text = t, "Received message from websocket");

match serde_json::from_str(&t) {
Ok(item) => {
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions crates/transports/tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod http;
mod ws;
17 changes: 12 additions & 5 deletions crates/transports/tests/ws.rs → crates/transports/tests/it/ws.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::borrow::Cow;
use alloy_transports::{ClientBuilder, RpcCall, WsConnect};

use alloy_primitives::U64;
use alloy_transports::{ClientBuilder, RpcCall, WsConnect};
use std::borrow::Cow;

#[tokio::test]
#[test_log::test(tokio::test)]
async fn it_makes_a_request() {
let infura = std::env::var("INFURA_WS").unwrap();

Expand All @@ -12,13 +12,20 @@ async fn it_makes_a_request() {
auth: None,
};

dbg!("have connector");

let client = ClientBuilder::default().connect(connector).await.unwrap();

dbg!("have client");

let params: Cow<'static, _> = Cow::Owned(());

let req: RpcCall<_, Cow<'static, ()>, U64> = client.prepare("eth_blockNumber", params);
let res = req.await;

let timeout = tokio::time::timeout(std::time::Duration::from_secs(2), req);

let res = timeout.await;

dbg!(&res);
res.unwrap();
res.unwrap().unwrap();
}
Loading