Skip to content

Commit

Permalink
feat(en): Take into account nonce from tx proxy (#995)
Browse files Browse the repository at this point in the history
## What ❔

Take into account nonce from TxProxy, now if tx was submitted but
miniblock was not synced yet. en will return correct nonce

## Why ❔

[EN doesn't consider
](https://github.com/matter-labs/zksync-era/blob/main/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs#L303)transactions
currently residing in the TxProxy when calculating pending nonce, which
results in returned nonces being incorrect t times
## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `zk spellcheck`.
- [ ] Linkcheck has been run via `zk linkcheck`.

---------

Signed-off-by: Danil <deniallugo@gmail.com>
Co-authored-by: Alex Ostrovski <aov@matterlabs.dev>
  • Loading branch information
Deniallugo and slowli authored Feb 20, 2024
1 parent 2a766a7 commit 22099cb
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 51 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion core/lib/dal/src/storage_web3_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, ops};
use zksync_types::{
get_code_key, get_nonce_key,
utils::{decompose_full_nonce, storage_key_for_standard_token_balance},
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey,
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, Nonce, StorageKey,
FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H256, U256,
};
use zksync_utils::h256_to_u256;
Expand Down Expand Up @@ -32,6 +32,30 @@ impl StorageWeb3Dal<'_, '_> {
Ok(decompose_full_nonce(full_nonce).0)
}

/// Returns the current *stored* nonces (i.e., w/o accounting for pending transactions) for the specified accounts.
pub async fn get_nonces_for_addresses(
&mut self,
addresses: &[Address],
) -> sqlx::Result<HashMap<Address, Nonce>> {
let nonce_keys: HashMap<_, _> = addresses
.iter()
.map(|address| (get_nonce_key(address).hashed_key(), *address))
.collect();

let res = self
.get_values(&nonce_keys.keys().copied().collect::<Vec<_>>())
.await?
.into_iter()
.filter_map(|(hashed_key, value)| {
let address = nonce_keys.get(&hashed_key)?;
let full_nonce = h256_to_u256(value);
let (nonce, _) = decompose_full_nonce(full_nonce);
Some((*address, Nonce(nonce.as_u32())))
})
.collect();
Ok(res)
}

pub async fn standard_token_historical_balance(
&mut self,
token_id: AccountTreeId,
Expand Down
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl TxSender {
// We're running an external node: we have to proxy the transaction to the main node.
// But before we do that, save the tx to cache in case someone will request it
// Before it reaches the main node.
proxy.save_tx(tx.hash(), tx.clone()).await;
proxy.save_tx(tx.clone()).await;
proxy.submit_tx(&tx).await?;
// Now, after we are sure that the tx is on the main node, remove it from cache
// since we don't want to store txs that might have been replaced or otherwise removed
Expand Down
138 changes: 129 additions & 9 deletions core/lib/zksync_core/src/api_server/tx_sender/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,110 @@
use std::collections::HashMap;
use std::{
collections::{BTreeSet, HashMap},
future::Future,
sync::Arc,
time::Duration,
};

use tokio::sync::RwLock;
use tokio::sync::{watch, RwLock};
use zksync_dal::ConnectionPool;
use zksync_types::{
api::{BlockId, Transaction, TransactionDetails, TransactionId},
l2::L2Tx,
H256,
Address, Nonce, H256,
};
use zksync_web3_decl::{
error::{ClientRpcContext, EnrichedClientResult},
jsonrpsee::http_client::{HttpClient, HttpClientBuilder},
namespaces::{EthNamespaceClient, ZksNamespaceClient},
};

#[derive(Debug, Clone, Default)]
pub(crate) struct TxCache {
inner: Arc<RwLock<TxCacheInner>>,
}

#[derive(Debug, Default)]
struct TxCacheInner {
tx_cache: HashMap<H256, L2Tx>,
nonces_by_account: HashMap<Address, BTreeSet<Nonce>>,
}

impl TxCache {
async fn push(&self, tx: L2Tx) {
let mut inner = self.inner.write().await;
inner
.nonces_by_account
.entry(tx.initiator_account())
.or_default()
.insert(tx.nonce());
inner.tx_cache.insert(tx.hash(), tx);
}

async fn get_tx(&self, tx_hash: H256) -> Option<L2Tx> {
self.inner.read().await.tx_cache.get(&tx_hash).cloned()
}

async fn get_nonces_for_account(&self, account_address: Address) -> BTreeSet<Nonce> {
let inner = self.inner.read().await;
if let Some(nonces) = inner.nonces_by_account.get(&account_address) {
nonces.clone()
} else {
BTreeSet::new()
}
}

async fn remove_tx(&self, tx_hash: H256) {
self.inner.write().await.tx_cache.remove(&tx_hash);
// We intentionally don't change `nonces_by_account`; they should only be changed in response to new miniblocks
}

async fn run_updates(
self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);

loop {
if *stop_receiver.borrow() {
return Ok(());
}

let addresses: Vec<_> = {
// Split into 2 statements for readability.
let inner = self.inner.read().await;
inner.nonces_by_account.keys().copied().collect()
};
let mut storage = pool.access_storage_tagged("api").await?;
let nonces_for_accounts = storage
.storage_web3_dal()
.get_nonces_for_addresses(&addresses)
.await?;
drop(storage); // Don't hold both `storage` and lock on `inner` at the same time.

let mut inner = self.inner.write().await;
inner.nonces_by_account.retain(|address, account_nonces| {
let stored_nonce = nonces_for_accounts
.get(address)
.copied()
.unwrap_or(Nonce(0));
// Retain only nonces starting from the stored one.
*account_nonces = account_nonces.split_off(&stored_nonce);
// If we've removed all nonces, drop the account entry so we don't request stored nonces for it later.
!account_nonces.is_empty()
});
drop(inner);

tokio::time::sleep(UPDATE_INTERVAL).await;
}
}
}

/// Used by external node to proxy transaction to the main node
/// and store them while they're not synced back yet
#[derive(Debug)]
pub struct TxProxy {
tx_cache: RwLock<HashMap<H256, L2Tx>>,
tx_cache: TxCache,
client: HttpClient,
}

Expand All @@ -25,20 +113,43 @@ impl TxProxy {
let client = HttpClientBuilder::default().build(main_node_url).unwrap();
Self {
client,
tx_cache: RwLock::new(HashMap::new()),
tx_cache: TxCache::default(),
}
}

pub async fn find_tx(&self, tx_hash: H256) -> Option<L2Tx> {
self.tx_cache.read().await.get(&tx_hash).cloned()
self.tx_cache.get_tx(tx_hash).await
}

pub async fn forget_tx(&self, tx_hash: H256) {
self.tx_cache.write().await.remove(&tx_hash);
self.tx_cache.remove_tx(tx_hash).await;
}

pub async fn save_tx(&self, tx: L2Tx) {
self.tx_cache.push(tx).await;
}

pub async fn save_tx(&self, tx_hash: H256, tx: L2Tx) {
self.tx_cache.write().await.insert(tx_hash, tx);
pub async fn get_nonces_by_account(&self, account_address: Address) -> BTreeSet<Nonce> {
self.tx_cache.get_nonces_for_account(account_address).await
}

pub async fn next_nonce_by_initiator_account(
&self,
account_address: Address,
current_nonce: u32,
) -> Nonce {
let mut pending_nonce = Nonce(current_nonce);
let nonces = self.get_nonces_by_account(account_address).await;
for nonce in nonces.range(pending_nonce + 1..) {
// If nonce is not sequential, then we should not increment nonce.
if nonce == &pending_nonce {
pending_nonce += 1;
} else {
break;
}
}

pending_nonce
}

pub async fn submit_tx(&self, tx: &L2Tx) -> EnrichedClientResult<H256> {
Expand Down Expand Up @@ -91,4 +202,13 @@ impl TxProxy {
.with_arg("hash", &hash)
.await
}

pub fn run_account_nonce_sweeper(
&self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> impl Future<Output = anyhow::Result<()>> {
let tx_cache = self.tx_cache.clone();
tx_cache.run_updates(pool, stop_receiver)
}
}
29 changes: 16 additions & 13 deletions core/lib/zksync_core/src/api_server/web3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ struct OptionalApiParams {
#[derive(Debug)]
struct FullApiParams {
pool: ConnectionPool,
last_miniblock_pool: ConnectionPool,
updaters_pool: ConnectionPool,
config: InternalApiConfig,
transport: ApiTransport,
tx_sender: TxSender,
Expand All @@ -135,7 +135,7 @@ struct FullApiParams {
#[derive(Debug)]
pub struct ApiBuilder {
pool: ConnectionPool,
last_miniblock_pool: ConnectionPool,
updaters_pool: ConnectionPool,
config: InternalApiConfig,
polling_interval: Duration,
// Mandatory params that must be set using builder methods.
Expand All @@ -153,7 +153,7 @@ impl ApiBuilder {

pub fn jsonrpsee_backend(config: InternalApiConfig, pool: ConnectionPool) -> Self {
Self {
last_miniblock_pool: pool.clone(),
updaters_pool: pool.clone(),
pool,
config,
polling_interval: Self::DEFAULT_POLLING_INTERVAL,
Expand All @@ -175,11 +175,12 @@ impl ApiBuilder {
self
}

/// Configures a dedicated DB pool to be used for updating the latest miniblock information
/// Configures a dedicated DB pool to be used for updating different information,
/// such as last mined block number or account nonces. This pool is used to execute
/// in a background task. If not called, the main pool will be used. If the API server is under high load,
/// it may make sense to supply a single-connection pool to reduce pool contention with the API methods.
pub fn with_last_miniblock_pool(mut self, pool: ConnectionPool) -> Self {
self.last_miniblock_pool = pool;
pub fn with_updaters_pool(mut self, pool: ConnectionPool) -> Self {
self.updaters_pool = pool;
self
}

Expand Down Expand Up @@ -247,7 +248,7 @@ impl ApiBuilder {
fn into_full_params(self) -> anyhow::Result<FullApiParams> {
Ok(FullApiParams {
pool: self.pool,
last_miniblock_pool: self.last_miniblock_pool,
updaters_pool: self.updaters_pool,
config: self.config,
transport: self.transport.context("API transport not set")?,
tx_sender: self.tx_sender.context("Transaction sender not set")?,
Expand Down Expand Up @@ -278,10 +279,7 @@ impl FullApiParams {
self,
last_sealed_miniblock: SealedMiniblockNumber,
) -> anyhow::Result<RpcState> {
let mut storage = self
.last_miniblock_pool
.access_storage_tagged("api")
.await?;
let mut storage = self.updaters_pool.access_storage_tagged("api").await?;
let start_info = BlockStartInfo::new(&mut storage).await?;
drop(storage);

Expand Down Expand Up @@ -409,12 +407,17 @@ impl FullApiParams {
let (health_check, health_updater) = ReactiveHealthCheck::new(health_check_name);

let (last_sealed_miniblock, update_task) = SealedMiniblockNumber::new(
self.last_miniblock_pool.clone(),
self.updaters_pool.clone(),
SEALED_MINIBLOCK_UPDATE_INTERVAL,
stop_receiver.clone(),
);
let mut tasks = vec![tokio::spawn(update_task)];

let mut tasks = vec![tokio::spawn(update_task)];
if let Some(tx_proxy) = &self.tx_sender.0.proxy {
let task = tx_proxy
.run_account_nonce_sweeper(self.updaters_pool.clone(), stop_receiver.clone());
tasks.push(tokio::spawn(task));
}
let pub_sub = if matches!(transport, ApiTransport::WebSocket(_))
&& self.namespaces.contains(&Namespace::Pubsub)
{
Expand Down
28 changes: 21 additions & 7 deletions core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use zksync_types::{
l2::{L2Tx, TransactionType},
transaction_request::CallRequest,
utils::decompose_full_nonce,
web3,
web3::types::{FeeHistory, SyncInfo, SyncState},
web3::{
self,
types::{FeeHistory, SyncInfo, SyncState},
},
AccountTreeId, Bytes, MiniblockNumber, StorageKey, H256, L2_ETH_TOKEN_ADDRESS, U256,
};
use zksync_utils::u256_to_h256;
Expand Down Expand Up @@ -302,6 +304,7 @@ impl EthNamespace {
const METHOD_NAME: &str = "get_block_transaction_count";

let method_latency = API_METRICS.start_block_call(METHOD_NAME, block_id);

self.state.start_info.ensure_not_pruned(block_id)?;
let tx_count = self
.state
Expand Down Expand Up @@ -482,11 +485,22 @@ impl EthNamespace {
if matches!(block_id, BlockId::Number(BlockNumber::Pending)) {
let account_nonce_u64 = u64::try_from(account_nonce)
.map_err(|err| internal_error(method_name, anyhow::anyhow!(err)))?;
account_nonce = connection
.transactions_web3_dal()
.next_nonce_by_initiator_account(address, account_nonce_u64)
.await
.map_err(|err| internal_error(method_name, err))?;
account_nonce = if let Some(proxy) = &self.state.tx_sender.0.proxy {
// EN: get pending nonces from the transaction cache
// We don't have mempool in EN, it's safe to use the proxy cache as a mempool
proxy
.next_nonce_by_initiator_account(address, account_nonce_u64 as u32)
.await
.0
.into()
} else {
// Main node: get pending nonces from the mempool
connection
.transactions_web3_dal()
.next_nonce_by_initiator_account(address, account_nonce_u64)
.await
.map_err(|err| internal_error(method_name, err))?
};
}

let block_diff = self.state.last_sealed_miniblock.diff(block_number);
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/consensus/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ impl StateKeeperRunner {
let (stop_sender, stop_receiver) = sync::watch::channel(false);
let (miniblock_sealer, miniblock_sealer_handle) =
MiniblockSealer::new(self.pool.clone(), 5);

let io = ExternalIO::new(
miniblock_sealer_handle,
self.pool.clone(),
Expand Down
Loading

0 comments on commit 22099cb

Please sign in to comment.