Skip to content

Commit

Permalink
so with this, we should be able to write to the new cp_mapping table?
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Dec 12, 2024
1 parent 55ed9b9 commit f044e11
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
7 changes: 4 additions & 3 deletions crates/sui-indexer-alt-framework/src/handlers/cp_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ impl Processor for CpMapping {

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let cp = checkpoint.checkpoint_summary.sequence_number as i64;
let tx_lo = checkpoint.checkpoint_summary.network_total_transactions as i64
- checkpoint.transactions.len() as i64;
let tx_hi = checkpoint.checkpoint_summary.network_total_transactions as i64;
let network_total_transactions =
checkpoint.checkpoint_summary.network_total_transactions as i64;
let tx_lo = network_total_transactions - checkpoint.transactions.len() as i64;
let tx_hi = network_total_transactions;
let epoch = checkpoint.checkpoint_summary.epoch as i64;
Ok(vec![StoredCpMapping {
cp,
Expand Down
16 changes: 13 additions & 3 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use diesel::{
pg::Pg,
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use handlers::cp_mapping;
use handlers::cp_mapping::CpMapping;
use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
use pipeline::{
concurrent::{self, ConcurrentConfig},
sequential::{self, SequentialConfig},
Processor,
CommitterConfig, Processor,
};
use sui_pg_db::{Db, DbArgs};
use task::graceful_shutdown;
Expand Down Expand Up @@ -295,6 +295,16 @@ impl Indexer {
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
/// or will continue until it tracks the tip of the network.
pub async fn run(mut self) -> Result<JoinHandle<()>> {
// TODO: allow cp_mapping table committing to be configurable
// TODO: set pruner config to max retention of other pruned tables
// would require pulling those configs in
let layer = ConcurrentConfig {
committer: CommitterConfig::default(),
pruner: None,
checkpoint_lag: None,
};
self.concurrent_pipeline(CpMapping, layer).await?;

if let Some(enabled_pipelines) = self.enabled_pipelines {
ensure!(
enabled_pipelines.is_empty(),
Expand Down Expand Up @@ -377,7 +387,7 @@ impl Indexer {
);

if let Some(enabled_pipelines) = &mut self.enabled_pipelines {
if !enabled_pipelines.remove(P::NAME) {
if P::NAME != CpMapping::NAME && !enabled_pipelines.remove(P::NAME) {
info!(pipeline = P::NAME, "Skipping");
return Ok(None);
}
Expand Down

0 comments on commit f044e11

Please sign in to comment.