Skip to content

Commit

Permalink
Merge #2363
Browse files Browse the repository at this point in the history
2363: Add inmemory witness generator cache r=Deniallugo a=Deniallugo



Co-authored-by: Danil <deniallugo@gmail.com>
  • Loading branch information
bors-matterlabs-dev[bot] and Deniallugo authored Mar 25, 2023
2 parents 051d1af + cea211e commit aac9319
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 25 deletions.
5 changes: 0 additions & 5 deletions core/bin/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,6 @@ impl ZkSyncStateKeeper {
.collect(),
);

vlog::debug!(
"Requesting new miniblock from mempool. {} txs are excluded from search. {} chunks left",
executed_txs.len(),
self.pending_block.chunks_left
);
let mempool_req = MempoolBlocksRequest::GetBlock(GetBlockRequest {
last_priority_op_number: self.pending_block.unprocessed_priority_op_current,
block_timestamp,
Expand Down
1 change: 1 addition & 0 deletions core/bin/zksync_witness_generator/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ impl DatabaseInterface for Database {
.remove_old_account_tree_cache(block - NUMBER_OF_STORED_ACCOUNT_TREE_CACHE)
.await?;
}

Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions core/bin/zksync_witness_generator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Built-in
use std::collections::BTreeMap;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -421,6 +422,8 @@ pub fn run_prover_server<DB: DatabaseInterface>(
};

// Start pool maintainer threads.
let cache = Arc::new(RwLock::new(BTreeMap::default()));

for offset in 0..witness_generator_opts.witness_generators {
let start_block = (last_verified_block + offset + 1) as u32;
let block_step = witness_generator_opts.witness_generators as u32;
Expand All @@ -429,11 +432,14 @@ pub fn run_prover_server<DB: DatabaseInterface>(
start_block,
block_step
);
let start_wait = witness_generator_opts.prepare_data_interval() * offset as u32;
let pool_maintainer = witness_generator::WitnessGenerator::new(
database.clone(),
witness_generator_opts.prepare_data_interval(),
start_wait,
BlockNumber(start_block),
BlockNumber(block_step),
cache.clone(),
);
pool_maintainer.start(panic_sender.clone());
}
Expand Down
92 changes: 72 additions & 20 deletions core/bin/zksync_witness_generator/src/witness_generator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::time::Instant;
// Built-in
use std::collections::BTreeMap;
use std::ops::Bound::{Included, Unbounded};
use std::sync::Arc;
use std::{thread, time};
// External
use futures::channel::mpsc;
use tokio::sync::RwLock;
use tokio::time::sleep;
use zksync_crypto::merkle_tree::parallel_smt::SparseMerkleTreeSerializableCacheBN256;
// Workspace deps
Expand All @@ -28,6 +32,8 @@ pub struct WitnessGenerator<DB: DatabaseInterface> {

start_block: BlockNumber,
block_step: BlockNumber,
start_wait: time::Duration,
cached_account_tree: Arc<RwLock<BTreeMap<BlockNumber, SparseMerkleTreeSerializableCacheBN256>>>,
}

#[derive(Debug)]
Expand All @@ -42,14 +48,20 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
pub fn new(
database: DB,
rounds_interval: time::Duration,
start_wait: time::Duration,
start_block: BlockNumber,
block_step: BlockNumber,
cached_account_tree: Arc<
RwLock<BTreeMap<BlockNumber, SparseMerkleTreeSerializableCacheBN256>>,
>,
) -> Self {
Self {
database,
rounds_interval,
start_block,
block_step,
start_wait,
cached_account_tree,
}
}

Expand Down Expand Up @@ -101,19 +113,38 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
Ok(block_info)
}

/// Looks up an account tree cache usable for `block`.
///
/// The shared in-memory map is consulted first: the entry with the greatest block
/// number not exceeding `block` is returned (as a clone) and counted as a
/// `hit_in_memory` cache access.
///
/// If the map has no such entry, the cache is loaded from the database, decoded
/// from bincode, stored in the in-memory map under the block number reported by
/// the database and returned.
///
/// Returns `Ok(None)` when neither source has a cache, and an error if acquiring
/// the connection or the database query fails.
async fn load_account_tree_cache(
    &mut self,
    block: BlockNumber,
) -> anyhow::Result<Option<(BlockNumber, SparseMerkleTreeSerializableCacheBN256)>> {
    // The read guard lives only inside this scope, so it is released before
    // the database is touched and before the write lock is requested below.
    let in_memory = {
        let guard = self.cached_account_tree.read().await;
        guard
            .range((Unbounded, Included(block)))
            .next_back()
            .map(|(cached_block, tree_cache)| (*cached_block, tree_cache.clone()))
    };
    if let Some(hit) = in_memory {
        metrics::increment_counter!("witness_generator.cache_access", "type" => "hit_in_memory");
        return Ok(Some(hit));
    }

    let mut storage = self.database.acquire_connection().await?;
    match self.database.load_account_tree_cache(&mut storage).await? {
        Some((stored_block, encoded)) => {
            let decoded = SparseMerkleTreeSerializableCacheBN256::decode_bincode(&encoded);
            // Remember the decoded cache so later lookups can be served from memory.
            self.cached_account_tree
                .write()
                .await
                .insert(stored_block, decoded.clone());
            Ok(Some((stored_block, decoded)))
        }
        None => Ok(None),
    }
}
async fn load_account_tree(
&self,
&mut self,
block: BlockNumber,
) -> Result<CircuitAccountTree, anyhow::Error> {
let fn_start = Instant::now();

let mut storage = self.database.acquire_connection().await?;

let start = Instant::now();
let cache = self.load_account_tree_cache(block).await?;
let mut circuit_account_tree = CircuitAccountTree::new(account_tree_depth());
let cache = self.database.load_account_tree_cache(&mut storage).await?;
metrics::histogram!("witness_generator", start.elapsed(), "stage" => "load_cache");

let mut storage = self.database.acquire_connection().await?;
let start = Instant::now();
if let Some((cached_block, account_tree_cache)) = cache {
let (_, accounts) = self
Expand All @@ -123,9 +154,7 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
for (id, account) in accounts {
circuit_account_tree.insert(*id, account.into());
}
circuit_account_tree.set_internals(
SparseMerkleTreeSerializableCacheBN256::decode_bincode(&account_tree_cache),
);
circuit_account_tree.set_internals(account_tree_cache);
if block != cached_block {
// There is no relevant cache, so we have to use some outdated cache and update the tree.
if *block == *cached_block + 1 {
Expand Down Expand Up @@ -161,12 +190,17 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
metrics::histogram!("witness_generator", start.elapsed(), "stage" => "recreate_tree_from_cache");

let start = Instant::now();
let tree_cache = circuit_account_tree.get_internals().encode_bincode();
let internal_cache = circuit_account_tree.get_internals();
self.cached_account_tree
.write()
.await
.insert(block, internal_cache.clone());
let tree_cache = internal_cache.encode_bincode();
metrics::histogram!("tree_cache_size", tree_cache.len() as f64);

self.database
.store_account_tree_cache(&mut storage, block, tree_cache)
.await?;

metrics::histogram!("witness_generator", start.elapsed(), "stage" => "store_cache");
} else {
// There exists a cache for the block we are interested in.
Expand All @@ -183,13 +217,14 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
}
circuit_account_tree.root_hash();

let internal_cache = circuit_account_tree.get_internals();
self.cached_account_tree
.write()
.await
.insert(block, internal_cache.clone());
let tree_cache = internal_cache.encode_bincode();
metrics::histogram!("witness_generator", start.elapsed(), "stage" => "recreate_tree_from_scratch");

let start = Instant::now();
let tree_cache = circuit_account_tree.get_internals().encode_bincode();
metrics::histogram!("tree_cache_size", tree_cache.len() as f64);
metrics::histogram!("witness_generator", start.elapsed(), "stage" => "serialize_cache");

let start = Instant::now();
self.database
.store_account_tree_cache(&mut storage, block, tree_cache)
Expand All @@ -210,15 +245,26 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
"account tree root hash restored incorrectly"
);
}
self.remove_cache(block).await;
metrics::histogram!("witness_generator", start.elapsed(), "stage" => "ensure_root_hash");

metrics::histogram!("witness_generator", fn_start.elapsed(), "stage" => "load_account_tree");
Ok(circuit_account_tree)
}

async fn prepare_witness_and_save_it(&self, block: Block) -> anyhow::Result<()> {
/// Remove old account tree cache we want to keep more than step just to make sure that we won't go to the database
async fn remove_cache(&self, block: BlockNumber) {
let mut cache = self.cached_account_tree.write().await;
let keys: Vec<_> = cache
.range((Unbounded, Included(block - 2 * self.block_step.0)))
.map(|(key, _)| *key)
.collect();
for key in keys {
cache.remove(&key);
}
}

async fn prepare_witness_and_save_it(&mut self, block: Block) -> anyhow::Result<()> {
let fn_start = Instant::now();
let mut storage = self.database.acquire_connection().await?;

let start = Instant::now();
let mut circuit_account_tree = self.load_account_tree(block.block_number - 1).await?;
Expand All @@ -229,6 +275,7 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
metrics::histogram!("witness_generator", start.elapsed(), "stage" => "build_witness");

let start = Instant::now();
let mut storage = self.database.acquire_connection().await?;
self.database
.store_witness(
&mut storage,
Expand Down Expand Up @@ -264,7 +311,7 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {

/// Updates witness data in database in an infinite loop,
/// awaiting `rounds_interval` time between updates.
async fn maintain(self) {
async fn maintain(mut self) {
vlog::info!(
"preparing prover data routine started with start_block({}), block_step({})",
*self.start_block,
Expand All @@ -277,8 +324,12 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {
metrics::register_counter!("witness_generator.cache_access", "type" => "miss");

let mut current_block = self.start_block;
// The first step of each job is downloading the account tree cache, which takes a lot of time.
// There are many witness generators on one machine and they all start at the same time,
// so each of them waits for some time before starting its first job. This gives better
// utilization of CPU resources and avoids wasting time on downloading the cache.
sleep(self.start_wait).await;
loop {
sleep(self.rounds_interval).await;
let should_work = match self.should_work_on_block(current_block).await {
Ok(should_work) => should_work,
Err(err) => {
Expand All @@ -299,6 +350,7 @@ impl<DB: DatabaseInterface> WitnessGenerator<DB> {

// Update current block.
current_block = next_block;
sleep(self.rounds_interval).await;
}
}
}
Expand Down

0 comments on commit aac9319

Please sign in to comment.