Skip to content

Commit

Permalink
Improve logging
Browse files Browse the repository at this point in the history
Signed-off-by: gcarq <egger.m@protonmail.com>
  • Loading branch information
gcarq committed Apr 17, 2023
1 parent 3c7a8c1 commit 213de2d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 35 deletions.
9 changes: 1 addition & 8 deletions src/blockchain/parser/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use crate::blockchain::parser::types::CoinType;
use crate::blockchain::proto::block::Block;
use crate::common::utils;
use crate::errors::OpResult;
use crate::{BlockHeightRange, ParserOptions};
use crate::ParserOptions;

/// Manages the index and data of longest valid chain
pub struct ChainStorage {
chain_index: ChainIndex,
blk_files: HashMap<u64, BlkFile>, // maps blk_index to BlkFile
coin_type: CoinType,
verify: bool,
range: BlockHeightRange,
pub cur_height: u64,
}

Expand All @@ -24,7 +23,6 @@ impl ChainStorage {
chain_index: ChainIndex::new(options)?,
blk_files: BlkFile::from_path(options.blockchain_dir.as_path())?,
cur_height: options.range.start,
range: options.range,
coin_type: options.coin_type.clone(),
verify: options.verify,
})
Expand All @@ -34,11 +32,6 @@ impl ChainStorage {
pub fn advance(&mut self) -> Option<(Block, u64)> {
// Check range configured params
let height = self.cur_height;
if let Some(end) = self.range.end {
if height == end {
return None;
}
}

// Read block
let block_meta = self.chain_index.get(height)?;
Expand Down
25 changes: 21 additions & 4 deletions src/blockchain/parser/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ const BLOCK_HAVE_DATA: u64 = 8;

/// Holds the index of longest valid chain
pub struct ChainIndex {
max_height: u64,
block_index: HashMap<u64, BlockIndexRecord>,
max_height_blk_index: HashMap<u64, u64>, // Maps blk_index to max_height found in the file
max_height: u64,
}

impl ChainIndex {
pub fn new(options: &ParserOptions) -> OpResult<Self> {
let path = options.blockchain_dir.join("index");
let block_index = get_block_index(&path)?;
let mut block_index = get_block_index(&path)?;
let mut max_height_blk_index = HashMap::new();

for (height, index_record) in &block_index {
Expand All @@ -38,10 +38,27 @@ impl ChainIndex {
}
}

let max_height = *block_index.keys().max().unwrap();
let min_height = options.range.start;
let max_known_height = *block_index.keys().max().unwrap();
let max_height = match options.range.end {
Some(height) if height < max_known_height => height,
Some(_) | None => max_known_height,
};

// Filter to only keep relevant block index
if !options.range.is_default() {
info!(target: "index", "Trimming block index from height {} to {} ...", min_height, max_height);
block_index = block_index
.into_iter()
.filter(|(height, _)| {
*height >= min_height.saturating_sub(1) && *height <= max_height
})
.collect();
}

Ok(Self {
block_index,
max_height,
block_index,
max_height_blk_index,
})
}
Expand Down
45 changes: 23 additions & 22 deletions src/blockchain/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ pub mod types;

/// Small struct to hold statistics together
struct WorkerStats {
pub t_started: Instant,
pub t_last_log: Instant,
pub t_measure_frame: Duration,
pub started_at: Instant,
pub last_log: Instant,
pub last_height: u64,
pub measure_frame: Duration,
}

impl Default for WorkerStats {
fn default() -> Self {
impl WorkerStats {
fn new(start_range: u64) -> Self {
Self {
t_started: Instant::now(),
t_last_log: Instant::now(),
t_measure_frame: Duration::from_secs(10),
started_at: Instant::now(),
last_log: Instant::now(),
last_height: start_range,
measure_frame: Duration::from_secs(10),
}
}
}
Expand All @@ -40,10 +42,10 @@ pub struct BlockchainParser {
impl BlockchainParser {
/// Instantiates a new Parser but does not start the workers.
pub fn new(options: ParserOptions, chain_storage: ChainStorage) -> Self {
info!(target: "parser", "Parsing {} blockchain (range={}) ...", options.coin_type.name, options.range);
info!(target: "parser", "Parsing {} blockchain ...", options.coin_type.name);
Self {
chain_storage,
stats: WorkerStats::default(),
stats: WorkerStats::new(options.range.start),
callback: options.callback,
coin_type: options.coin_type,
}
Expand All @@ -56,15 +58,15 @@ impl BlockchainParser {
while let Some((block, height)) = self.chain_storage.advance() {
self.on_block(&block, height)?;
}
self.on_complete(self.chain_storage.cur_height)
self.on_complete(self.chain_storage.cur_height.saturating_sub(1))
}

/// Triggers the on_start() callback and initializes state.
fn on_start(&mut self, height: u64) -> OpResult<()> {
let now = Instant::now();
self.stats.t_started = now;
self.stats.t_last_log = now;
info!(target: "parser", "Starting to process blocks starting from height {} ...", height);
self.stats.started_at = now;
self.stats.last_log = now;
info!(target: "parser", "Processing blocks starting from height {} ...", height);
self.callback.on_start(&self.coin_type, height)?;
trace!(target: "parser", "on_start() called");
Ok(())
Expand All @@ -83,7 +85,7 @@ impl BlockchainParser {
/// Triggers the on_complete() callback and updates statistics.
fn on_complete(&mut self, height: u64) -> OpResult<()> {
info!(target: "parser", "Done. Processed blocks up to height {} in {:.2} minutes.",
height, (Instant::now() - self.stats.t_started).as_secs_f32() / 60.0);
height, (Instant::now() - self.stats.started_at).as_secs_f32() / 60.0);

self.callback.on_complete(height)?;
trace!(target: "parser", "on_complete() called");
Expand All @@ -92,14 +94,13 @@ impl BlockchainParser {

fn print_progress(&mut self, height: u64) {
let now = Instant::now();
let blocks_sec = height
.checked_div((now - self.stats.t_started).as_secs())
.unwrap_or(height);
let blocks_speed = (height - self.stats.last_height) / self.stats.measure_frame.as_secs();

if now - self.stats.t_last_log > self.stats.t_measure_frame {
info!(target: "parser", "Status: {:6} Blocks processed. (left: {:6}, avg: {:5.2} blocks/sec)",
height, self.chain_storage.remaining(), blocks_sec);
self.stats.t_last_log = now;
if now - self.stats.last_log > self.stats.measure_frame {
info!(target: "parser", "Status: {:7} Blocks processed. (remaining: {:7}, speed: {:5.2} blocks/s)",
height, self.chain_storage.remaining(), blocks_speed);
self.stats.last_log = now;
self.stats.last_height = height;
}
}
}
9 changes: 8 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ impl BlockHeightRange {
}
Ok(Self { start, end })
}

pub fn is_default(&self) -> bool {
self.start == 0 && self.end.is_none()
}
}

impl fmt::Display for BlockHeightRange {
Expand Down Expand Up @@ -94,6 +98,9 @@ fn main() {
SimpleLogger::init(log_level).expect("Unable to initialize logger!");
info!(target: "main", "Starting rusty-blockparser v{} ...", env!("CARGO_PKG_VERSION"));
debug!(target: "main", "Using log level {}", log_level);
if options.verify {
info!(target: "main", "Configured to verify merkle roots and block hashes");
}

let chain_storage = match ChainStorage::new(&options) {
Ok(storage) => storage,
Expand Down Expand Up @@ -136,7 +143,7 @@ fn parse_args() -> OpResult<ParserOptions> {
// Add flags
.arg(Arg::with_name("verify")
.long("verify")
.help("Verifies the leveldb index integrity and verifies merkle roots"))
.help("Verifies merkle roots and block hashes"))
.arg(Arg::with_name("verbosity")
.short("v")
.multiple(true)
Expand Down

0 comments on commit 213de2d

Please sign in to comment.