Skip to content

Commit

Permalink
[indexer] set up ingestion test (#15242)
Browse files Browse the repository at this point in the history
## Description 

Adds tests for indexer v2 ingestion and for populating the transactions
table.

## Test Plan 

The PR is the test itself.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
emmazzz authored Dec 7, 2023
1 parent 01a0315 commit 0f0d7cb
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 138 deletions.
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ jobs:
cargo nextest run --test-threads 1 --package sui-graphql-rpc --test e2e_tests --test examples_validation_tests --features pg_integration
cargo nextest run --test-threads 1 --package sui-graphql-e2e-tests --features pg_integration
cargo nextest run --test-threads 1 --package sui-cluster-test --test local_cluster_test --features pg_integration
cargo nextest run --test-threads 1 --package sui-indexer --test ingestion_tests --features pg_integration
env:
POSTGRES_HOST: localhost
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 47 additions & 35 deletions crates/simulacrum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ use self::store::in_mem_store::KeyStore;
pub use self::store::SimulatorStore;
use sui_types::mock_checkpoint_builder::{MockCheckpointBuilder, ValidatorKeypairProvider};

use shared_crypto::intent::Intent;
use sui_types::{
gas_coin::GasCoin,
programmable_transaction_builder::ProgrammableTransactionBuilder,
transaction::{GasData, TransactionData, TransactionKind},
};
mod epoch_state;
pub mod store;

Expand Down Expand Up @@ -404,18 +410,50 @@ impl<T, V: store::SimulatorStore> ObjectStore for Simulacrum<T, V> {
}
}

impl Simulacrum {
/// Generate a random transfer transaction.
/// TODO: This is here today to make it easier to write tests. But we should utilize all the
/// existing code for generating transactions in sui-test-transaction-builder by defining a trait
/// that both WalletContext and Simulacrum implement. Then we can remove this function.
pub fn transfer_txn(&mut self, recipient: SuiAddress) -> (Transaction, u64) {
let (sender, key) = self.keystore().accounts().next().unwrap();
let sender = *sender;

let object = self
.store()
.owned_objects(sender)
.find(|object| object.is_gas_coin())
.unwrap();
let gas_coin = GasCoin::try_from(&object).unwrap();
let transfer_amount = gas_coin.value() / 2;

let pt = {
let mut builder = ProgrammableTransactionBuilder::new();
builder.transfer_sui(recipient, Some(transfer_amount));
builder.finish()
};

let kind = TransactionKind::ProgrammableTransaction(pt);
let gas_data = GasData {
payment: vec![object.compute_object_reference()],
owner: sender,
price: self.reference_gas_price(),
budget: 1_000_000_000,
};
let tx_data = TransactionData::new_with_gas_data(kind, sender, gas_data);
let tx = Transaction::from_data_and_signer(tx_data, Intent::sui_transaction(), vec![key]);
(tx, transfer_amount)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use rand::{rngs::StdRng, SeedableRng};
use shared_crypto::intent::Intent;
use sui_types::{
base_types::SuiAddress,
effects::TransactionEffectsAPI,
gas_coin::GasCoin,
programmable_transaction_builder::ProgrammableTransactionBuilder,
transaction::{GasData, TransactionData, TransactionKind},
base_types::SuiAddress, effects::TransactionEffectsAPI, gas_coin::GasCoin,
transaction::TransactionDataAPI,
};

use super::*;
Expand Down Expand Up @@ -489,36 +527,10 @@ mod tests {
#[test]
fn transfer() {
let mut sim = Simulacrum::new();
let recipient = SuiAddress::generate(sim.rng());
let (sender, key) = sim.keystore().accounts().next().unwrap();
let sender = *sender;

let object = sim
.store()
.owned_objects(sender)
.find(|object| object.is_gas_coin())
.unwrap();
let gas_coin = GasCoin::try_from(&object).unwrap();
let gas_id = object.id();
let transfer_amount = gas_coin.value() / 2;

gas_coin.value();
let pt = {
let mut builder = ProgrammableTransactionBuilder::new();
builder.transfer_sui(recipient, Some(transfer_amount));
builder.finish()
};

let kind = TransactionKind::ProgrammableTransaction(pt);
let gas_data = GasData {
payment: vec![object.compute_object_reference()],
owner: sender,
price: sim.reference_gas_price(),
budget: 1_000_000_000,
};
let tx_data = TransactionData::new_with_gas_data(kind, sender, gas_data);
let tx = Transaction::from_data_and_signer(tx_data, Intent::sui_transaction(), vec![key]);
let recipient = SuiAddress::random_for_testing_only();
let (tx, transfer_amount) = sim.transfer_txn(recipient);

let gas_id = tx.data().transaction_data().gas_data().payment[0].0;
let effects = sim.execute_transaction(tx).unwrap().0;
let gas_summary = effects.gas_cost_summary();
let gas_paid = gas_summary.net_gas_usage();
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-cluster-test/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use std::path::Path;
use sui_config::Config;
use sui_config::{PersistedConfig, SUI_KEYSTORE_FILENAME, SUI_NETWORK_CONFIG};
use sui_graphql_rpc::config::ConnectionConfig;
use sui_graphql_rpc::test_infra::cluster::{start_graphql_server, start_test_indexer_v2};
use sui_indexer::test_utils::start_test_indexer;
use sui_graphql_rpc::test_infra::cluster::start_graphql_server;
use sui_indexer::test_utils::{start_test_indexer, start_test_indexer_v2};
use sui_indexer::IndexerConfig;
use sui_keys::keystore::{AccountKeystore, FileBasedKeystore, Keystore};
use sui_sdk::sui_client_config::{SuiClientConfig, SuiEnv};
Expand Down
80 changes: 1 addition & 79 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,18 @@ use crate::client::simple_client::SimpleClient;
use crate::config::ConnectionConfig;
use crate::config::ServerConfig;
use crate::server::graphiql_server::start_graphiql_server;
use mysten_metrics::init_metrics;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use sui_indexer::errors::IndexerError;
use sui_indexer::indexer_v2::IndexerV2;
use sui_indexer::metrics::IndexerMetrics;
use sui_indexer::new_pg_connection_pool_impl;
use sui_indexer::store::indexer_store_v2::IndexerStoreV2;
use sui_indexer::store::PgIndexerStoreV2;
use sui_indexer::utils::reset_database;
use sui_indexer::IndexerConfig;
use sui_indexer::test_utils::start_test_indexer_v2;
use sui_rest_api::node_state_getter::NodeStateGetter;
use sui_swarm_config::genesis_config::{AccountConfig, DEFAULT_GAS_AMOUNT};
use test_cluster::TestCluster;
use test_cluster::TestClusterBuilder;
use tokio::task::JoinHandle;
use tracing::info;

const VALIDATOR_COUNT: usize = 7;
const EPOCH_DURATION_MS: u64 = 15000;
Expand Down Expand Up @@ -170,77 +163,6 @@ async fn start_validator_with_fullnode(internal_data_source_rpc_port: Option<u16
test_cluster_builder.build().await
}

pub async fn start_test_indexer_v2(
db_url: Option<String>,
rpc_url: String,
reader_mode_rpc_url: Option<String>,
use_indexer_experimental_methods: bool,
) -> (PgIndexerStoreV2, JoinHandle<Result<(), IndexerError>>) {
// Reduce the connection pool size to 20 for testing
// to prevent maxing out
info!("Setting DB_POOL_SIZE to 20");
std::env::set_var("DB_POOL_SIZE", "20");

let db_url = db_url.unwrap_or_else(|| {
let pg_host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".into());
let pg_port = env::var("POSTGRES_PORT").unwrap_or_else(|_| "32770".into());
let pw = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgrespw".into());
format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
});

let migrated_methods = if use_indexer_experimental_methods {
IndexerConfig::all_implemented_methods()
} else {
vec![]
};

// Default weiter mode
let mut config = IndexerConfig {
db_url: Some(db_url.clone()),
rpc_client_url: rpc_url,
migrated_methods,
reset_db: true,
fullnode_sync_worker: true,
rpc_server_worker: false,
use_v2: true,
..Default::default()
};

if let Some(reader_mode_rpc_url) = &reader_mode_rpc_url {
let reader_mode_rpc_url = reader_mode_rpc_url
.parse::<SocketAddr>()
.expect("Unable to parse fullnode address");
config.fullnode_sync_worker = false;
config.rpc_server_worker = true;
config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
config.rpc_server_port = reader_mode_rpc_url.port();
}

let parsed_url = config.get_db_url().unwrap();
let blocking_pool = new_pg_connection_pool_impl(&parsed_url, Some(5)).unwrap();
if config.reset_db && reader_mode_rpc_url.is_none() {
reset_database(&mut blocking_pool.get().unwrap(), true, config.use_v2).unwrap();
}

let registry = prometheus::Registry::default();

init_metrics(&registry);

let indexer_metrics = IndexerMetrics::new(&registry);

let store = PgIndexerStoreV2::new(blocking_pool, indexer_metrics.clone());
let store_clone = store.clone();
let handle = if reader_mode_rpc_url.is_some() {
tokio::spawn(async move { IndexerV2::start_reader(&config, &registry, db_url).await })
} else {
tokio::spawn(
async move { IndexerV2::start_writer(&config, store_clone, indexer_metrics).await },
)
};

(store, handle)
}

impl ExecutorCluster {
pub async fn wait_for_checkpoint_catchup(&self, checkpoint: u64, base_timeout: Duration) {
let current_checkpoint = self
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
ntest.workspace = true
criterion.workspace = true
simulacrum.workspace = true

[[bin]]
name = "sui-indexer"
Expand Down
20 changes: 0 additions & 20 deletions crates/sui-indexer/src/schema_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,22 +299,6 @@ diesel::table! {
}
}

diesel::table! {
tx_indices (tx_sequence_number) {
tx_sequence_number -> Int8,
checkpoint_sequence_number -> Int8,
transaction_digest -> Bytea,
input_objects -> Array<Nullable<Bytea>>,
changed_objects -> Array<Nullable<Bytea>>,
senders -> Array<Nullable<Bytea>>,
payers -> Array<Nullable<Bytea>>,
recipients -> Array<Nullable<Bytea>>,
packages -> Array<Nullable<Bytea>>,
package_modules -> Array<Nullable<Text>>,
package_module_functions -> Array<Nullable<Text>>,
}
}

diesel::allow_tables_to_appear_in_same_query!(
active_addresses,
address_metrics,
Expand All @@ -339,8 +323,4 @@ diesel::allow_tables_to_appear_in_same_query!(
tx_input_objects,
tx_recipients,
tx_senders,
tx_indices,
);

use diesel::sql_types::Text;
diesel::sql_function! {fn query_cost(x : Text) ->Float8;}
4 changes: 4 additions & 0 deletions crates/sui-indexer/src/store/pg_indexer_store_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ impl PgIndexerStoreV2 {
}
}

pub fn blocking_cp(&self) -> PgConnectionPool {
self.blocking_cp.clone()
}

fn get_latest_tx_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
read_only_blocking!(&self.blocking_cp, |conn| {
checkpoints::dsl::checkpoints
Expand Down
80 changes: 78 additions & 2 deletions crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,92 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use mysten_metrics::init_metrics;
use prometheus::Registry;
use tokio::task::JoinHandle;

use std::env;
use std::net::SocketAddr;
use sui_json_rpc_types::SuiTransactionBlockResponse;
use tracing::info;

use crate::errors::IndexerError;
use crate::store::PgIndexerStore;
use crate::indexer_v2::IndexerV2;
use crate::store::{PgIndexerStore, PgIndexerStoreV2};
use crate::utils::reset_database;
use crate::IndexerMetrics;
use crate::{new_pg_connection_pool, Indexer, IndexerConfig};
use crate::{new_pg_connection_pool_impl, IndexerMetrics};

pub async fn start_test_indexer_v2(
db_url: Option<String>,
rpc_url: String,
reader_mode_rpc_url: Option<String>,
use_indexer_experimental_methods: bool,
) -> (PgIndexerStoreV2, JoinHandle<Result<(), IndexerError>>) {
// Reduce the connection pool size to 10 for testing
// to prevent maxing out
info!("Setting DB_POOL_SIZE to 10");
std::env::set_var("DB_POOL_SIZE", "10");

let db_url = db_url.unwrap_or_else(|| {
let pg_host = env::var("POSTGRES_HOST").unwrap_or_else(|_| "localhost".into());
let pg_port = env::var("POSTGRES_PORT").unwrap_or_else(|_| "32770".into());
let pw = env::var("POSTGRES_PASSWORD").unwrap_or_else(|_| "postgrespw".into());
format!("postgres://postgres:{pw}@{pg_host}:{pg_port}")
});

let migrated_methods = if use_indexer_experimental_methods {
IndexerConfig::all_implemented_methods()
} else {
vec![]
};

// Default writer mode
let mut config = IndexerConfig {
db_url: Some(db_url.clone()),
rpc_client_url: rpc_url,
migrated_methods,
reset_db: true,
fullnode_sync_worker: true,
rpc_server_worker: false,
use_v2: true,
..Default::default()
};

if let Some(reader_mode_rpc_url) = &reader_mode_rpc_url {
let reader_mode_rpc_url = reader_mode_rpc_url
.parse::<SocketAddr>()
.expect("Unable to parse fullnode address");
config.fullnode_sync_worker = false;
config.rpc_server_worker = true;
config.rpc_server_url = reader_mode_rpc_url.ip().to_string();
config.rpc_server_port = reader_mode_rpc_url.port();
}

let parsed_url = config.get_db_url().unwrap();
let blocking_pool = new_pg_connection_pool_impl(&parsed_url, Some(5)).unwrap();
if config.reset_db && reader_mode_rpc_url.is_none() {
reset_database(&mut blocking_pool.get().unwrap(), true, config.use_v2).unwrap();
}

let registry = prometheus::Registry::default();

init_metrics(&registry);

let indexer_metrics = IndexerMetrics::new(&registry);

let store = PgIndexerStoreV2::new(blocking_pool, indexer_metrics.clone());
let store_clone = store.clone();
let handle = if reader_mode_rpc_url.is_some() {
tokio::spawn(async move { IndexerV2::start_reader(&config, &registry, db_url).await })
} else {
tokio::spawn(
async move { IndexerV2::start_writer(&config, store_clone, indexer_metrics).await },
)
};

(store, handle)
}

/// Spawns an indexer thread with provided Postgres DB url
pub async fn start_test_indexer(
Expand Down
Loading

0 comments on commit 0f0d7cb

Please sign in to comment.