Skip to content

Commit

Permalink
ClientRefs, Poller, and Streams (#179)
Browse files Browse the repository at this point in the history
* wip: weak and ref clients

* nit: use type alias

* feat: poller and subscription streams

* lint: clippy

* doc: fix links

* feat: polling limit

* fix: name

* feat: block heartbeat

* wip: chaintask

* feat: chain stream poller take 1

* fix: pubsub

* chore: misc cleanups, nfc

* feat: move implementations to trait, implement send transaction

* chore: rename errors in logs to 'err'

* fix: correct ser/de, add more tracing

* test: add a primitive test

* chore: unused import

* feat: add retrying and ser caching to poller

* feat: reduce polling interval if provider is local

* fix: import serde

* fix: export PendingTransaction

* fix: PTX outputs hash, >1 waiting txs, fix initial block yield

* nit

* fix: pin the sleep

* clippies

* wasmm

* fix: feature

---------

Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com>
  • Loading branch information
prestwich and DaniPopes authored Mar 4, 2024
1 parent 785c667 commit 86027c9
Show file tree
Hide file tree
Showing 31 changed files with 1,404 additions and 301 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ jobs:
- name: cargo hack
run: |
cargo hack check --workspace --target wasm32-unknown-unknown \
--exclude alloy-contract \
--exclude alloy-node-bindings \
--exclude alloy-providers \
--exclude alloy-signer \
--exclude alloy-signer-aws \
--exclude alloy-signer-gcp \
--exclude alloy-signer-ledger \
--exclude alloy-signer-trezor \
--exclude alloy-node-bindings \
--exclude alloy-transport-ipc
feature-checks:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ futures-executor = "0.3.29"
hyper = "0.14.27"
tokio = "1.33"
tokio-util = "0.7"
tokio-stream = "0.1.14"
tower = { version = "0.4.13", features = ["util"] }

tracing = "0.1.40"
Expand Down
4 changes: 2 additions & 2 deletions crates/contract/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{Error, Result};
use alloy_dyn_abi::{DynSolValue, FunctionExt, JsonAbiExt};
use alloy_json_abi::Function;
use alloy_primitives::{Address, Bytes, U256, U64};
use alloy_providers::provider::TempProvider;
use alloy_providers::tmp::TempProvider;
use alloy_rpc_types::{
request::{TransactionInput, TransactionRequest},
state::StateOverride,
Expand Down Expand Up @@ -456,7 +456,7 @@ mod tests {
use super::*;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_primitives::{address, b256, bytes, hex};
use alloy_providers::provider::{HttpProvider, Provider};
use alloy_providers::tmp::{HttpProvider, Provider};
use alloy_sol_types::sol;

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/contract/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{CallBuilder, Interface, Result};
use alloy_dyn_abi::DynSolValue;
use alloy_json_abi::{Function, JsonAbi};
use alloy_primitives::{Address, Selector};
use alloy_providers::provider::TempProvider;
use alloy_providers::tmp::TempProvider;

/// A handle to an Ethereum contract at a specific address.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ pub use call::*;
// NOTE: please avoid changing the API of this module due to its use in the `sol!` macro.
#[doc(hidden)]
pub mod private {
pub use alloy_providers::provider::TempProvider as Provider;
pub use alloy_providers::tmp::TempProvider as Provider;
}
2 changes: 1 addition & 1 deletion crates/json-rpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{ErrorPayload, RpcReturn};
use serde_json::value::RawValue;

/// An RPC error.
#[derive(thiserror::Error, Debug)]
#[derive(Debug, thiserror::Error)]
pub enum RpcError<E, ErrResp = Box<RawValue>> {
/// Server returned an error response.
#[error("Server returned an error response: {0}")]
Expand Down
1 change: 0 additions & 1 deletion crates/json-rpc/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ where

// Params may be omitted if it is 0-sized
if sized_params {
// TODO: remove unwrap
map.serialize_entry("params", &self.params)?;
}

Expand Down
20 changes: 15 additions & 5 deletions crates/providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,31 @@ exclude.workspace = true

[dependencies]
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-rpc-client = { workspace = true, features = ["reqwest"] }
alloy-rpc-types.workspace = true
alloy-rpc-trace-types.workspace = true
alloy-rpc-types.workspace = true
alloy-transport-http = { workspace = true, features = ["reqwest"] }
alloy-transport.workspace = true

alloy-primitives.workspace = true

async-stream = "0.3.5"
async-trait.workspace = true
auto_impl = "1.1.0"
futures.workspace = true
lru = "0.12.2"
reqwest.workspace = true
serde.workspace = true
thiserror.workspace = true
reqwest.workspace = true
auto_impl = "1.1.0"
tokio = { workspace = true, features = ["sync", "macros"] }
tracing.workspace = true

[dev-dependencies]
alloy-consensus.workspace = true
alloy-node-bindings.workspace = true
tokio = { version = "1.33.0", features = ["macros"] }
alloy-rlp.workspace = true
tokio = { workspace = true, features = ["macros"] }
tracing-subscriber = { workspace = true, features = ["fmt"] }

[features]
anvil = []
58 changes: 26 additions & 32 deletions crates/providers/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{NetworkRpcClient, Provider};
use crate::new::{Provider, RootProvider};
use alloy_network::Network;
use alloy_rpc_client::RpcClient;
use alloy_transport::Transport;
Expand All @@ -7,28 +7,27 @@ use std::marker::PhantomData;
/// A layering abstraction in the vein of [`tower::Layer`]
///
/// [`tower::Layer`]: https://docs.rs/tower/latest/tower/trait.Layer.html
pub trait ProviderLayer<P: Provider<N, T>, N: Network, T: Transport> {
pub trait ProviderLayer<P: Provider<N, T>, N: Network, T: Transport + Clone> {
type Provider: Provider<N, T>;

fn layer(&self, inner: P) -> Self::Provider;
}

pub struct Stack<T, Inner, Outer> {
pub struct Stack<Inner, Outer> {
inner: Inner,
outer: Outer,
_pd: std::marker::PhantomData<fn() -> T>,
}

impl<T, Inner, Outer> Stack<T, Inner, Outer> {
impl<Inner, Outer> Stack<Inner, Outer> {
/// Create a new `Stack`.
pub fn new(inner: Inner, outer: Outer) -> Self {
Stack { inner, outer, _pd: std::marker::PhantomData }
Stack { inner, outer }
}
}

impl<P, N, T, Inner, Outer> ProviderLayer<P, N, T> for Stack<T, Inner, Outer>
impl<P, N, T, Inner, Outer> ProviderLayer<P, N, T> for Stack<Inner, Outer>
where
T: Transport,
T: Transport + Clone,
N: Network,
P: Provider<N, T>,
Inner: ProviderLayer<P, N, T>,
Expand All @@ -49,14 +48,13 @@ where
/// around maintaining the network and transport types.
///
/// [`tower::ServiceBuilder`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html
pub struct ProviderBuilder<L, N = (), T = ()> {
pub struct ProviderBuilder<L, N = ()> {
layer: L,

transport: PhantomData<T>,
network: PhantomData<N>,
}

impl<L, N, T> ProviderBuilder<L, N, T> {
impl<L, N> ProviderBuilder<L, N> {
/// Add a layer to the stack being built. This is similar to
/// [`tower::ServiceBuilder::layer`].
///
Expand All @@ -70,12 +68,8 @@ impl<L, N, T> ProviderBuilder<L, N, T> {
/// [`tower::ServiceBuilder::layer`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html#method.layer
/// [`tower::ServiceBuilder`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html
pub fn layer<Inner>(self, layer: Inner) -> ProviderBuilder<Stack<T, Inner, L>> {
ProviderBuilder {
layer: Stack::new(layer, self.layer),
transport: PhantomData,
network: PhantomData,
}
pub fn layer<Inner>(self, layer: Inner) -> ProviderBuilder<Stack<Inner, L>> {
ProviderBuilder { layer: Stack::new(layer, self.layer), network: PhantomData }
}

/// Change the network.
Expand All @@ -87,34 +81,34 @@ impl<L, N, T> ProviderBuilder<L, N, T> {
/// ```rust,ignore
/// builder.network::<Arbitrum>()
/// ```
pub fn network<Net: Network>(self) -> ProviderBuilder<L, Net, T> {
ProviderBuilder { layer: self.layer, transport: self.transport, network: PhantomData }
pub fn network<Net: Network>(self) -> ProviderBuilder<L, Net> {
ProviderBuilder { layer: self.layer, network: PhantomData }
}

/// Finish the layer stack by providing a root [`RpcClient`], outputting
/// Finish the layer stack by providing a root [`Provider`], outputting
/// the final [`Provider`] type with all stack components.
///
/// This is a convenience function for
/// `ProviderBuilder::provider<NetworkRpcClient>`.
pub fn client(self, client: RpcClient<T>) -> L::Provider
pub fn provider<P, T>(self, provider: P) -> L::Provider
where
L: ProviderLayer<NetworkRpcClient<N, T>, N, T>,
L: ProviderLayer<P, N, T>,
P: Provider<N, T>,
T: Transport + Clone,
N: Network,
{
self.provider(NetworkRpcClient::from(client))
self.layer.layer(provider)
}

/// Finish the layer stack by providing a root [`Provider`], outputting
/// Finish the layer stack by providing a root [`RpcClient`], outputting
/// the final [`Provider`] type with all stack components.
pub fn provider<P>(self, provider: P) -> L::Provider
///
/// This is a convenience function for
/// `ProviderBuilder::provider<RpcClient>`.
pub fn on_client<T>(self, client: RpcClient<T>) -> L::Provider
where
L: ProviderLayer<P, N, T>,
P: Provider<N, T>,
T: Transport,
L: ProviderLayer<RootProvider<N, T>, N, T>,
T: Transport + Clone,
N: Network,
{
self.layer.layer(provider)
self.provider(RootProvider::new(client))
}
}

Expand Down
112 changes: 112 additions & 0 deletions crates/providers/src/chain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use crate::{new::RootProviderInner, Provider, RootProvider, WeakProvider};
use alloy_network::Network;
use alloy_primitives::{BlockNumber, U64};
use alloy_rpc_client::{PollTask, WeakClient};
use alloy_rpc_types::Block;
use alloy_transport::{RpcError, Transport};
use async_stream::stream;
use futures::{Stream, StreamExt};
use lru::LruCache;
use std::{num::NonZeroUsize, sync::Arc, time::Duration};

/// The size of the block cache.
const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) };

/// Maximum number of retries for fetching a block.
const MAX_RETRIES: usize = 3;

/// Default block number for when we don't have a block yet.
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;

pub(crate) struct ChainStreamPoller<P, T: Transport + Clone> {
provider: WeakProvider<P>,
poll_task: PollTask<T, (), U64>,
next_yield: BlockNumber,
known_blocks: LruCache<BlockNumber, Block>,
}

impl<N: Network, T: Transport + Clone> ChainStreamPoller<RootProviderInner<N, T>, T> {
pub(crate) fn from_root(p: &RootProvider<N, T>) -> Self {
let mut this = Self::new(Arc::downgrade(&p.inner), p.inner.weak_client());
if p.client().is_local() {
this.poll_task.set_poll_interval(Duration::from_secs(1));
}
this
}
}

impl<P, T: Transport + Clone> ChainStreamPoller<P, T> {
pub(crate) fn new(provider: WeakProvider<P>, client: WeakClient<T>) -> Self {
Self {
provider,
poll_task: PollTask::new(client, "eth_blockNumber", ()),
next_yield: NO_BLOCK_NUMBER,
known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
}
}

pub(crate) fn into_stream<N: Network>(mut self) -> impl Stream<Item = Block>
where
P: Provider<N, T>,
{
stream! {
let mut poll_task = self.poll_task.spawn().into_stream();
'task: loop {
// Clear any buffered blocks.
while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
debug!(number=self.next_yield, "yielding block");
self.next_yield += 1;
yield known_block;
}

// Get the tip.
let block_number = match poll_task.next().await {
Some(Ok(block_number)) => block_number,
Some(Err(err)) => {
// This is fine.
debug!(%err, "polling stream lagged");
continue 'task;
}
None => {
debug!("polling stream ended");
break 'task;
}
};
let block_number = block_number.to::<u64>();
if self.next_yield == NO_BLOCK_NUMBER {
assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
self.next_yield = block_number;
} else if block_number < self.next_yield {
debug!(block_number, self.next_yield, "not advanced yet");
continue 'task;
}

// Upgrade the provider.
let Some(provider) = self.provider.upgrade() else {
debug!("provider dropped");
break 'task;
};

// Then try to fill as many blocks as possible.
// TODO: Maybe use `join_all`
let mut retries = MAX_RETRIES;
for number in self.next_yield..=block_number {
debug!(number, "fetching block");
let block = match provider.get_block_by_number(number, false).await {
Ok(block) => block,
Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
debug!(number, %err, "failed to fetch block, retrying");
retries -= 1;
continue;
}
Err(err) => {
error!(number, %err, "failed to fetch block");
break 'task;
}
};
self.known_blocks.put(number, block);
}
}
}
}
}
Loading

0 comments on commit 86027c9

Please sign in to comment.