Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClientRefs, Poller, and Streams #179

Merged
merged 33 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fa95d24
wip: weak and ref clients
prestwich Feb 2, 2024
52be2b8
nit: use type alias
prestwich Feb 3, 2024
70ecd4b
feat: poller and subscription streams
prestwich Feb 3, 2024
b724da0
lint: clippy
prestwich Feb 3, 2024
0c494b6
doc: fix links
prestwich Feb 3, 2024
2a1ad27
feat: polling limit
prestwich Feb 3, 2024
babf0ef
fix: name
prestwich Feb 4, 2024
51824bf
feat: block heartbeat
prestwich Feb 5, 2024
39a148f
wip: chaintask
prestwich Feb 6, 2024
0188a78
feat: chain stream poller take 1
prestwich Feb 12, 2024
698f57a
fix: pubsub
prestwich Feb 12, 2024
1cd9233
Merge branch 'main' into prestwich/block-heart
DaniPopes Feb 21, 2024
9854584
chore: misc cleanups, nfc
DaniPopes Feb 22, 2024
4d127c8
Merge branch 'main' into prestwich/block-heart
DaniPopes Feb 22, 2024
963797f
feat: move implementations to trait, implement send transaction
DaniPopes Feb 26, 2024
f092dd2
chore: rename errors in logs to 'err'
DaniPopes Feb 26, 2024
a150bc7
fix: correct ser/de, add more tracing
DaniPopes Feb 26, 2024
78cba27
test: add a primitive test
DaniPopes Feb 26, 2024
1fc706e
chore: unused import
DaniPopes Feb 26, 2024
5b18bda
feat: add retrying and ser caching to poller
DaniPopes Feb 26, 2024
3f124c4
feat: reduce polling interval if provider is local
DaniPopes Feb 26, 2024
5372780
fix: import serde
DaniPopes Feb 26, 2024
ece484e
Merge branch 'main' into prestwich/block-heart
DaniPopes Feb 26, 2024
3e482e5
fix: export PendingTransaction
DaniPopes Feb 26, 2024
60362ec
Merge branch 'main' into prestwich/block-heart
DaniPopes Feb 28, 2024
9e72f67
Merge branch 'main' into prestwich/block-heart
DaniPopes Feb 28, 2024
e07b5f2
fix: PTX outputs hash, >1 waiting txs, fix initial block yield
DaniPopes Feb 28, 2024
e9396b2
nit
DaniPopes Mar 4, 2024
ad4b7d5
fix: pin the sleep
DaniPopes Mar 4, 2024
781df81
clippies
DaniPopes Mar 4, 2024
1cee23c
wasmm
DaniPopes Mar 4, 2024
f55312a
Merge branch 'main' into prestwich/block-heart
DaniPopes Mar 4, 2024
4a34ef6
fix: feature
DaniPopes Mar 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ jobs:
- name: cargo hack
run: |
cargo hack check --workspace --target wasm32-unknown-unknown \
--exclude alloy-contract \
--exclude alloy-node-bindings \
--exclude alloy-providers \
--exclude alloy-signer \
--exclude alloy-signer-aws \
--exclude alloy-signer-gcp \
--exclude alloy-signer-ledger \
--exclude alloy-signer-trezor \
--exclude alloy-node-bindings \
--exclude alloy-transport-ipc

feature-checks:
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ futures-executor = "0.3.29"
hyper = "0.14.27"
tokio = "1.33"
tokio-util = "0.7"
tokio-stream = "0.1.14"
tower = { version = "0.4.13", features = ["util"] }

tracing = "0.1.40"
Expand Down
4 changes: 2 additions & 2 deletions crates/contract/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{Error, Result};
use alloy_dyn_abi::{DynSolValue, FunctionExt, JsonAbiExt};
use alloy_json_abi::Function;
use alloy_primitives::{Address, Bytes, U256, U64};
use alloy_providers::provider::TempProvider;
use alloy_providers::tmp::TempProvider;
use alloy_rpc_types::{
request::{TransactionInput, TransactionRequest},
state::StateOverride,
Expand Down Expand Up @@ -456,7 +456,7 @@ mod tests {
use super::*;
use alloy_node_bindings::{Anvil, AnvilInstance};
use alloy_primitives::{address, b256, bytes, hex};
use alloy_providers::provider::{HttpProvider, Provider};
use alloy_providers::tmp::{HttpProvider, Provider};
use alloy_sol_types::sol;

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/contract/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{CallBuilder, Interface, Result};
use alloy_dyn_abi::DynSolValue;
use alloy_json_abi::{Function, JsonAbi};
use alloy_primitives::{Address, Selector};
use alloy_providers::provider::TempProvider;
use alloy_providers::tmp::TempProvider;

/// A handle to an Ethereum contract at a specific address.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ pub use call::*;
// NOTE: please avoid changing the API of this module due to its use in the `sol!` macro.
#[doc(hidden)]
pub mod private {
pub use alloy_providers::provider::TempProvider as Provider;
pub use alloy_providers::tmp::TempProvider as Provider;
}
2 changes: 1 addition & 1 deletion crates/json-rpc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{ErrorPayload, RpcReturn};
use serde_json::value::RawValue;

/// An RPC error.
#[derive(thiserror::Error, Debug)]
#[derive(Debug, thiserror::Error)]
pub enum RpcError<E, ErrResp = Box<RawValue>> {
/// Server returned an error response.
#[error("Server returned an error response: {0}")]
Expand Down
1 change: 0 additions & 1 deletion crates/json-rpc/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ where

// Params may be omitted if it is 0-sized
if sized_params {
// TODO: remove unwrap
map.serialize_entry("params", &self.params)?;
}

Expand Down
20 changes: 15 additions & 5 deletions crates/providers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,31 @@ exclude.workspace = true

[dependencies]
alloy-network.workspace = true
alloy-primitives.workspace = true
alloy-rpc-client = { workspace = true, features = ["reqwest"] }
alloy-rpc-types.workspace = true
alloy-rpc-trace-types.workspace = true
alloy-rpc-types.workspace = true
alloy-transport-http = { workspace = true, features = ["reqwest"] }
alloy-transport.workspace = true

alloy-primitives.workspace = true

async-stream = "0.3.5"
async-trait.workspace = true
auto_impl = "1.1.0"
futures.workspace = true
lru = "0.12.2"
reqwest.workspace = true
serde.workspace = true
thiserror.workspace = true
reqwest.workspace = true
auto_impl = "1.1.0"
tokio = { workspace = true, features = ["sync", "macros"] }
tracing.workspace = true

[dev-dependencies]
alloy-consensus.workspace = true
alloy-node-bindings.workspace = true
tokio = { version = "1.33.0", features = ["macros"] }
alloy-rlp.workspace = true
tokio = { workspace = true, features = ["macros"] }
tracing-subscriber = { workspace = true, features = ["fmt"] }

[features]
anvil = []
58 changes: 26 additions & 32 deletions crates/providers/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{NetworkRpcClient, Provider};
use crate::new::{Provider, RootProvider};
use alloy_network::Network;
use alloy_rpc_client::RpcClient;
use alloy_transport::Transport;
Expand All @@ -7,28 +7,27 @@ use std::marker::PhantomData;
/// A layering abstraction in the vein of [`tower::Layer`]
///
/// [`tower::Layer`]: https://docs.rs/tower/latest/tower/trait.Layer.html
pub trait ProviderLayer<P: Provider<N, T>, N: Network, T: Transport> {
pub trait ProviderLayer<P: Provider<N, T>, N: Network, T: Transport + Clone> {
type Provider: Provider<N, T>;

fn layer(&self, inner: P) -> Self::Provider;
}

pub struct Stack<T, Inner, Outer> {
pub struct Stack<Inner, Outer> {
inner: Inner,
outer: Outer,
_pd: std::marker::PhantomData<fn() -> T>,
}

impl<T, Inner, Outer> Stack<T, Inner, Outer> {
impl<Inner, Outer> Stack<Inner, Outer> {
/// Create a new `Stack`.
pub fn new(inner: Inner, outer: Outer) -> Self {
Stack { inner, outer, _pd: std::marker::PhantomData }
Stack { inner, outer }
}
}

impl<P, N, T, Inner, Outer> ProviderLayer<P, N, T> for Stack<T, Inner, Outer>
impl<P, N, T, Inner, Outer> ProviderLayer<P, N, T> for Stack<Inner, Outer>
where
T: Transport,
T: Transport + Clone,
N: Network,
P: Provider<N, T>,
Inner: ProviderLayer<P, N, T>,
Expand All @@ -49,14 +48,13 @@ where
/// around maintaining the network and transport types.
///
/// [`tower::ServiceBuilder`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html
pub struct ProviderBuilder<L, N = (), T = ()> {
pub struct ProviderBuilder<L, N = ()> {
layer: L,

transport: PhantomData<T>,
network: PhantomData<N>,
}

impl<L, N, T> ProviderBuilder<L, N, T> {
impl<L, N> ProviderBuilder<L, N> {
/// Add a layer to the stack being built. This is similar to
/// [`tower::ServiceBuilder::layer`].
///
Expand All @@ -70,12 +68,8 @@ impl<L, N, T> ProviderBuilder<L, N, T> {
/// [`tower::ServiceBuilder::layer`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html#method.layer
/// [`tower::ServiceBuilder`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html

pub fn layer<Inner>(self, layer: Inner) -> ProviderBuilder<Stack<T, Inner, L>> {
ProviderBuilder {
layer: Stack::new(layer, self.layer),
transport: PhantomData,
network: PhantomData,
}
pub fn layer<Inner>(self, layer: Inner) -> ProviderBuilder<Stack<Inner, L>> {
ProviderBuilder { layer: Stack::new(layer, self.layer), network: PhantomData }
}

/// Change the network.
Expand All @@ -87,34 +81,34 @@ impl<L, N, T> ProviderBuilder<L, N, T> {
/// ```rust,ignore
/// builder.network::<Arbitrum>()
/// ```
pub fn network<Net: Network>(self) -> ProviderBuilder<L, Net, T> {
ProviderBuilder { layer: self.layer, transport: self.transport, network: PhantomData }
pub fn network<Net: Network>(self) -> ProviderBuilder<L, Net> {
ProviderBuilder { layer: self.layer, network: PhantomData }
}

/// Finish the layer stack by providing a root [`RpcClient`], outputting
/// Finish the layer stack by providing a root [`Provider`], outputting
/// the final [`Provider`] type with all stack components.
///
/// This is a convenience function for
/// `ProviderBuilder::provider<NetworkRpcClient>`.
pub fn client(self, client: RpcClient<T>) -> L::Provider
pub fn provider<P, T>(self, provider: P) -> L::Provider
where
L: ProviderLayer<NetworkRpcClient<N, T>, N, T>,
L: ProviderLayer<P, N, T>,
P: Provider<N, T>,
T: Transport + Clone,
N: Network,
{
self.provider(NetworkRpcClient::from(client))
self.layer.layer(provider)
}

/// Finish the layer stack by providing a root [`Provider`], outputting
/// Finish the layer stack by providing a root [`RpcClient`], outputting
/// the final [`Provider`] type with all stack components.
pub fn provider<P>(self, provider: P) -> L::Provider
///
/// This is a convenience function for
/// `ProviderBuilder::provider<RpcClient>`.
pub fn on_client<T>(self, client: RpcClient<T>) -> L::Provider
where
L: ProviderLayer<P, N, T>,
P: Provider<N, T>,
T: Transport,
L: ProviderLayer<RootProvider<N, T>, N, T>,
T: Transport + Clone,
N: Network,
{
self.layer.layer(provider)
self.provider(RootProvider::new(client))
}
}

Expand Down
112 changes: 112 additions & 0 deletions crates/providers/src/chain.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use crate::{new::RootProviderInner, Provider, RootProvider, WeakProvider};
use alloy_network::Network;
use alloy_primitives::{BlockNumber, U64};
use alloy_rpc_client::{PollTask, WeakClient};
use alloy_rpc_types::Block;
use alloy_transport::{RpcError, Transport};
use async_stream::stream;
use futures::{Stream, StreamExt};
use lru::LruCache;
use std::{num::NonZeroUsize, sync::Arc, time::Duration};

/// The size of the block cache.
const BLOCK_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(10) };

/// Maximum number of retries for fetching a block.
const MAX_RETRIES: usize = 3;

/// Default block number for when we don't have a block yet.
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;

pub(crate) struct ChainStreamPoller<P, T: Transport + Clone> {
provider: WeakProvider<P>,
poll_task: PollTask<T, (), U64>,
next_yield: BlockNumber,
known_blocks: LruCache<BlockNumber, Block>,
}

impl<N: Network, T: Transport + Clone> ChainStreamPoller<RootProviderInner<N, T>, T> {
pub(crate) fn from_root(p: &RootProvider<N, T>) -> Self {
let mut this = Self::new(Arc::downgrade(&p.inner), p.inner.weak_client());
if p.client().is_local() {
this.poll_task.set_poll_interval(Duration::from_secs(1));
}
this
}
}

impl<P, T: Transport + Clone> ChainStreamPoller<P, T> {
pub(crate) fn new(provider: WeakProvider<P>, client: WeakClient<T>) -> Self {
Self {
provider,
poll_task: PollTask::new(client, "eth_blockNumber", ()),
next_yield: NO_BLOCK_NUMBER,
known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
}
}

pub(crate) fn into_stream<N: Network>(mut self) -> impl Stream<Item = Block>
where
P: Provider<N, T>,
{
stream! {
let mut poll_task = self.poll_task.spawn().into_stream();
'task: loop {
// Clear any buffered blocks.
while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
debug!(number=self.next_yield, "yielding block");
self.next_yield += 1;
yield known_block;
}

// Get the tip.
let block_number = match poll_task.next().await {
Some(Ok(block_number)) => block_number,
Some(Err(err)) => {
// This is fine.
debug!(%err, "polling stream lagged");
continue 'task;
}
None => {
debug!("polling stream ended");
break 'task;
}
};
let block_number = block_number.to::<u64>();
if self.next_yield == NO_BLOCK_NUMBER {
assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
self.next_yield = block_number;
} else if block_number < self.next_yield {
debug!(block_number, self.next_yield, "not advanced yet");
continue 'task;
}

// Upgrade the provider.
let Some(provider) = self.provider.upgrade() else {
debug!("provider dropped");
break 'task;
};

// Then try to fill as many blocks as possible.
// TODO: Maybe use `join_all`
let mut retries = MAX_RETRIES;
for number in self.next_yield..=block_number {
debug!(number, "fetching block");
let block = match provider.get_block_by_number(number, false).await {
Ok(block) => block,
Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
debug!(number, %err, "failed to fetch block, retrying");
retries -= 1;
continue;
}
Err(err) => {
error!(number, %err, "failed to fetch block");
break 'task;
}
};
self.known_blocks.put(number, block);
}
}
}
}
}
Loading
Loading