From 3b32f03e3a8d8761c173f2046cd86b2c0f4b666e Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 23 Feb 2023 15:15:09 +1300 Subject: [PATCH 1/9] feat: Create trait for abstracting away S3 client --- Cargo.toml | 3 ++- src/s3_fetchers.rs | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 00455b6..13a61b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,8 @@ aws-config = "0.53.0" aws-types = "0.53.0" aws-credential-types = "0.53.0" aws-sdk-s3 = "0.23.0" +async-stream = "0.3.3" +async-trait = "0.1.64" derive_builder = "0.11.2" futures = "0.3.23" serde = { version = "1", features = ["derive"] } @@ -30,4 +32,3 @@ tokio-stream = { version = "0.1" } tracing = "0.1.13" near-indexer-primitives = ">=0.16.0,<0.17.0" -async-stream = "0.3.3" diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index 5156f2c..d984030 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -1,6 +1,26 @@ +use async_trait::async_trait; use std::str::FromStr; use aws_sdk_s3::Client; +use aws_sdk_s3::output::{GetObjectOutput, ListObjectsV2Output}; + +#[async_trait] +pub trait LakeS3Client { + async fn get_object( + &self, + bucket: &str, + prefix: &str, + ) -> Result>; + + async fn list_objects( + &self, + bucket: &str, + start_after: &str, + ) -> Result< + ListObjectsV2Output, + aws_sdk_s3::types::SdkError, + >; +} /// Queries the list of the objects in the bucket, grouped by "/" delimiter. 
/// Returns the list of block heights that can be fetched From 6b3325d6d17ddf2c63e8536c714164c5f738dfc6 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 23 Feb 2023 15:21:44 +1300 Subject: [PATCH 2/9] refactor: Replace s3 client usages with new trait impl --- src/lib.rs | 29 +++++++++------ src/s3_fetchers.rs | 90 +++++++++++++++++++++++++++++++++------------- 2 files changed, 83 insertions(+), 36 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 31edb84..23c5a45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -230,6 +230,8 @@ pub use near_indexer_primitives; pub use aws_credential_types::Credentials; pub use types::{LakeConfig, LakeConfigBuilder}; +use s3_fetchers::LakeClient; + mod s3_fetchers; pub(crate) mod types; @@ -265,7 +267,7 @@ pub fn streamer( } fn stream_block_heights<'a: 'b, 'b>( - s3_client: &'a Client, + lake_client: &'a LakeClient, s3_bucket_name: &'a str, mut start_from_block_height: crate::types::BlockHeight, ) -> impl futures::Stream + 'b { @@ -273,7 +275,7 @@ fn stream_block_heights<'a: 'b, 'b>( loop { tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3..."); match s3_fetchers::list_block_heights( - s3_client, + lake_client, s3_bucket_name, start_from_block_height, ) @@ -370,6 +372,7 @@ async fn start( .build(); Client::from_conf(s3_config) }; + let lake_client = s3_fetchers::LakeClient::new(s3_client.clone()); let mut last_processed_block_hash: Option = None; @@ -381,8 +384,11 @@ async fn start( // in some cases, write N+1 block before it finishes writing the N block. // We require to stream blocks consistently, so we need to try to load the block again. 
- let pending_block_heights = - stream_block_heights(&s3_client, &config.s3_bucket_name, start_from_block_height); + let pending_block_heights = stream_block_heights( + &lake_client, + &config.s3_bucket_name, + start_from_block_height, + ); tokio::pin!(pending_block_heights); let mut streamer_messages_futures = futures::stream::FuturesOrdered::new(); @@ -402,7 +408,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &s3_client, + &lake_client, &config.s3_bucket_name, block_height, ) @@ -502,12 +508,13 @@ async fn start( })? .into_iter() .map(|block_height| { - s3_fetchers::fetch_streamer_message( - &s3_client, - &config.s3_bucket_name, - block_height, - ) - })); + s3_fetchers::fetch_streamer_message( + &lake_client, + &config.s3_bucket_name, + block_height, + ) + } + )); } tracing::warn!( diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index d984030..9e9fa0e 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -1,7 +1,6 @@ use async_trait::async_trait; use std::str::FromStr; -use aws_sdk_s3::Client; use aws_sdk_s3::output::{GetObjectOutput, ListObjectsV2Output}; #[async_trait] @@ -22,10 +21,60 @@ pub trait LakeS3Client { >; } +#[derive(Clone, Debug)] +pub struct LakeClient { + s3: aws_sdk_s3::Client, +} + +impl LakeClient { + pub fn new(s3: aws_sdk_s3::Client) -> Self { + Self { s3 } + } +} + +#[async_trait] +impl LakeS3Client for LakeClient { + async fn get_object( + &self, + bucket: &str, + prefix: &str, + ) -> Result> + { + Ok(self + .s3 + .get_object() + .bucket(bucket) + .key(prefix) + .request_payer(aws_sdk_s3::model::RequestPayer::Requester) + .send() + .await?) 
+ } + + async fn list_objects( + &self, + bucket: &str, + start_after: &str, + ) -> Result< + ListObjectsV2Output, + aws_sdk_s3::types::SdkError, + > { + Ok(self + .s3 + .list_objects_v2() + .max_keys(1000) // 1000 is the default and max value for this parameter + .delimiter("/".to_string()) + .start_after(start_after) + .request_payer(aws_sdk_s3::model::RequestPayer::Requester) + .bucket(bucket) + .send() + .await?) + } +} + /// Queries the list of the objects in the bucket, grouped by "/" delimiter. /// Returns the list of block heights that can be fetched pub(crate) async fn list_block_heights( - s3_client: &Client, + lake_client_impl: &impl LakeS3Client, s3_bucket_name: &str, start_from_block_height: crate::types::BlockHeight, ) -> Result< @@ -37,14 +86,8 @@ pub(crate) async fn list_block_heights( "Fetching block heights from S3, after #{}...", start_from_block_height ); - let response = s3_client - .list_objects_v2() - .max_keys(1000) // 1000 is the default and max value for this parameter - .delimiter("/".to_string()) - .start_after(format!("{:0>12}", start_from_block_height)) - .request_payer(aws_sdk_s3::model::RequestPayer::Requester) - .bucket(s3_bucket_name) - .send() + let response = lake_client_impl + .list_objects(s3_bucket_name, &format!("{:0>12}", start_from_block_height)) .await?; Ok(match response.common_prefixes { @@ -71,7 +114,7 @@ pub(crate) async fn list_block_heights( /// Reads the content of the objects and parses as a JSON. 
/// Returns the result in `near_indexer_primitives::StreamerMessage` pub(crate) async fn fetch_streamer_message( - s3_client: &Client, + lake_client_impl: &impl LakeS3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, ) -> Result< @@ -80,12 +123,8 @@ pub(crate) async fn fetch_streamer_message( > { let block_view = { let body_bytes = loop { - match s3_client - .get_object() - .bucket(s3_bucket_name) - .key(format!("{:0>12}/block.json", block_height)) - .request_payer(aws_sdk_s3::model::RequestPayer::Requester) - .send() + match lake_client_impl + .get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height)) .await { Ok(response) => { @@ -118,7 +157,9 @@ pub(crate) async fn fetch_streamer_message( let fetch_shards_futures = (0..block_view.chunks.len() as u64) .collect::>() .into_iter() - .map(|shard_id| fetch_shard_or_retry(s3_client, s3_bucket_name, block_height, shard_id)); + .map(|shard_id| { + fetch_shard_or_retry(lake_client_impl, s3_bucket_name, block_height, shard_id) + }); let shards = futures::future::try_join_all(fetch_shards_futures).await?; @@ -130,7 +171,7 @@ pub(crate) async fn fetch_streamer_message( /// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard` async fn fetch_shard_or_retry( - s3_client: &Client, + lake_client_impl: &impl LakeS3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, shard_id: u64, @@ -139,12 +180,11 @@ async fn fetch_shard_or_retry( crate::types::LakeError, > { let body_bytes = loop { - match s3_client - .get_object() - .bucket(s3_bucket_name) - .key(format!("{:0>12}/shard_{}.json", block_height, shard_id)) - .request_payer(aws_sdk_s3::model::RequestPayer::Requester) - .send() + match lake_client_impl + .get_object( + s3_bucket_name, + &format!("{:0>12}/shard_{}.json", block_height, shard_id), + ) .await { Ok(response) => { From be3aea75890c4f2730495148d2be4db7d9db822a Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 23 Feb 2023 16:08:18 +1300 
Subject: [PATCH 3/9] test: Serialization of meta transactions --- Cargo.toml | 3 + blocks/000000879765/block.json | 63 ++++++++ blocks/000000879765/shard_0.json | 239 +++++++++++++++++++++++++++++++ src/s3_fetchers.rs | 86 +++++++++++ 4 files changed, 391 insertions(+) create mode 100644 blocks/000000879765/block.json create mode 100644 blocks/000000879765/shard_0.json diff --git a/Cargo.toml b/Cargo.toml index 13a61b9..580f154 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,6 @@ tokio-stream = { version = "0.1" } tracing = "0.1.13" near-indexer-primitives = ">=0.16.0,<0.17.0" + +[dev-dependencies] +aws-smithy-http = "0.53.0" diff --git a/blocks/000000879765/block.json b/blocks/000000879765/block.json new file mode 100644 index 0000000..1581ebb --- /dev/null +++ b/blocks/000000879765/block.json @@ -0,0 +1,63 @@ +{ + "author": "test.near", + "header": { + "height": 879765, + "prev_height": 879764, + "epoch_id": "Hp4sw9ZGSceYadnvh7NpYJVVK7rcdir48jfrsxvwKQu9", + "next_epoch_id": "4h5mecoLYVFeZxAMAX3Mq3GQfEnuvSAPPo9kEpr4rGUL", + "hash": "95K8Je1iAVqieVU8ZuGgSdbvYs8T9rL6ER1XnRekMGbj", + "prev_hash": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM", + "prev_state_root": "6zDM1UGLsZ7HnyUofDrTF73gv5vk2N614ViDkXBkq4ej", + "chunk_receipts_root": "9ETNjrt6MkwTgSVMMbpukfxRshSD1avBUUa4R4NuqwHv", + "chunk_headers_root": "4otZ2Zj1wANZweh33kWETr3VbF3HwW9zWET4YRYTo2pL", + "chunk_tx_root": "9rdfzfYzJMZyaj2yMvjget2ZsPNbZhKqY1qUXc1urDfu", + "outcome_root": "7tkzFg8RHBmMw1ncRJZCCZAizgq4rwCftTKYLce8RU8t", + "chunks_included": 1, + "challenges_root": "11111111111111111111111111111111", + "timestamp": 1676913656724153000, + "timestamp_nanosec": "1676913656724153000", + "random_value": "Au7bq9XzGAhDm2wb4PxbXQnTngzVTcWYa76Govx6n7NK", + "validator_proposals": [], + "chunk_mask": [ + true + ], + "gas_price": "100000000", + "block_ordinal": 879714, + "rent_paid": "0", + "validator_reward": "0", + "total_supply": "2085303629225498163419972383984892", + "challenges_result": [], + 
"last_final_block": "BS9QJenf3N9pKy8PZ5xRuowZi9X9T4sSDDu4i3i5UJZe", + "last_ds_final_block": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM", + "next_bp_hash": "EtsYQonaJ7n5nRt32XJC5dBxxBxh7a9UVApykmmt8fCQ", + "block_merkle_root": "CqRoDd8BR4su7Z8vSfvg45HrugZnwbMbnXHRTWYQkWfZ", + "epoch_sync_data_hash": null, + "approvals": [ + "ed25519:3RBQ4PnfBbnDn8WnCScQJH9asjkicuhZZo36aa6FVa2Lbnj531NLiBkTmj8rhg5vfsarmYLgQmcMcXRuJ4jkzKns" + ], + "signature": "ed25519:2dWsY1QadJyNaVkyga5Wcj9DFRizAyFc9STjyN5Mtxc59ZzNYqML6qQTgtLeCYkpCy1h7kG34jcALTpEDQpkBoKQ", + "latest_protocol_version": 59 + }, + "chunks": [ + { + "chunk_hash": "7Ewp1AnL6o29UXLW2up9miQBdSaKxCnfRyhMGt9G4epN", + "prev_block_hash": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM", + "outcome_root": "11111111111111111111111111111111", + "prev_state_root": "2ViDp7rmam77VmhY5C9KW92a6mgUTCKQ3Scz8tFyH13z", + "encoded_merkle_root": "44MrDjQzt1jU5PGUYY69THZ4g3SsfQiNiKKorey3GVtq", + "encoded_length": 364, + "height_created": 879765, + "height_included": 879765, + "shard_id": 0, + "gas_used": 0, + "gas_limit": 1000000000000000, + "rent_paid": "0", + "validator_reward": "0", + "balance_burnt": "0", + "outgoing_receipts_root": "H4Rd6SGeEBTbxkitsCdzfu9xL9HtZ2eHoPCQXUeZ6bW4", + "tx_root": "GKd8Evs3JdahRpS8q14q6RzzkodzFiSQPcH4yJxs4ZjG", + "validator_proposals": [], + "signature": "ed25519:2qev3mWQdYLi9aPwCnFHt22GFxhuGTGfnaz3msGcduUdXeycTQDBkY4EyQzpph4frXCybuYHE6g4GFxD2HVmWbJY" + } + ] +} diff --git a/blocks/000000879765/shard_0.json b/blocks/000000879765/shard_0.json new file mode 100644 index 0000000..2178ac2 --- /dev/null +++ b/blocks/000000879765/shard_0.json @@ -0,0 +1,239 @@ +{ + "shard_id": 0, + "chunk": { + "author": "test.near", + "header": { + "chunk_hash": "7Ewp1AnL6o29UXLW2up9miQBdSaKxCnfRyhMGt9G4epN", + "prev_block_hash": "9Da84RTsubZPcLxzK1K6JkCnDnMn4DxaSRzJPtnYJXUM", + "outcome_root": "11111111111111111111111111111111", + "prev_state_root": "2ViDp7rmam77VmhY5C9KW92a6mgUTCKQ3Scz8tFyH13z", + "encoded_merkle_root": 
"44MrDjQzt1jU5PGUYY69THZ4g3SsfQiNiKKorey3GVtq", + "encoded_length": 364, + "height_created": 879765, + "height_included": 0, + "shard_id": 0, + "gas_used": 0, + "gas_limit": 1000000000000000, + "rent_paid": "0", + "validator_reward": "0", + "balance_burnt": "0", + "outgoing_receipts_root": "H4Rd6SGeEBTbxkitsCdzfu9xL9HtZ2eHoPCQXUeZ6bW4", + "tx_root": "GKd8Evs3JdahRpS8q14q6RzzkodzFiSQPcH4yJxs4ZjG", + "validator_proposals": [], + "signature": "ed25519:2qev3mWQdYLi9aPwCnFHt22GFxhuGTGfnaz3msGcduUdXeycTQDBkY4EyQzpph4frXCybuYHE6g4GFxD2HVmWbJY" + }, + "transactions": [ + { + "transaction": { + "signer_id": "test.near", + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib", + "nonce": 39, + "receiver_id": "test.near", + "actions": [ + { + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + } + ], + "signature": "ed25519:3vKF31u2naSjow1uQEfkoWy834fu9xhk66oBfTAYL3XVtJVAf1FREt7owJzwyRrN5F4mtd1rkvv1iTPTL86Szb2j", + "hash": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ" + }, + "outcome": { + "execution_outcome": { + "proof": [ + { + "hash": "7kPZTTVYJHvUg4g3S7SFErkKs18Ex1kN4rESnZwtJb2U", + "direction": "Right" + } + ], + "block_hash": "95K8Je1iAVqieVU8ZuGgSdbvYs8T9rL6ER1XnRekMGbj", + "id": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ", + "outcome": { + "logs": [], + "receipt_ids": [ + "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ" + ], + "gas_burnt": 409824625000, + "tokens_burnt": "40982462500000000000", + "executor_id": "test.near", + "status": { + "SuccessReceiptId": 
"AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ" + }, + "metadata": { + "version": 1, + "gas_profile": null + } + } + }, + "receipt": null + } + } + ], + "receipts": [ + { + "predecessor_id": "test.near", + "receiver_id": "test.near", + "receipt_id": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ", + "receipt": { + "Action": { + "signer_id": "test.near", + "signer_public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib", + "gas_price": "100000000", + "output_data_receivers": [], + "input_data_ids": [], + "actions": [ + { + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + } + ] + } + } + } + ] + }, + "receipt_execution_outcomes": [ + { + "execution_outcome": { + "proof": [ + { + "hash": "6vBgNYcwx6pcESfrw5YRBRamatBH8red3GEt3s3ntefm", + "direction": "Left" + } + ], + "block_hash": "95K8Je1iAVqieVU8ZuGgSdbvYs8T9rL6ER1XnRekMGbj", + "id": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ", + "outcome": { + "logs": [], + "receipt_ids": [ + "5rc8UEhD4hmNQ3pJJM5Xc3VHeLXpCQqkA3ep8ag4aaDA" + ], + "gas_burnt": 308059500000, + "tokens_burnt": "30805950000000000000", + "executor_id": "test.near", + "status": { + "Failure": { + "ActionError": { + "index": 0, + "kind": "DelegateActionExpired" + } + } + }, + "metadata": { + "version": 3, + "gas_profile": [] + } + } + }, + "receipt": { + "predecessor_id": "test.near", + "receiver_id": "test.near", + "receipt_id": "AQDQ9G4QpK7x2inV3GieVEbqeoCGF9nmvrViQ2UgEXDQ", + "receipt": { + "Action": { + "signer_id": "test.near", + "signer_public_key": 
"ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib", + "gas_price": "100000000", + "output_data_receivers": [], + "input_data_ids": [], + "actions": [ + { + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + } + ] + } + } + } + } + ], + "state_changes": [ + { + "cause": { + "type": "transaction_processing", + "tx_hash": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ" + }, + "type": "account_update", + "change": { + "account_id": "test.near", + "amount": "999999549946933447300000000000000", + "locked": "81773107345435833494396250588347", + "code_hash": "11111111111111111111111111111111", + "storage_usage": 182, + "storage_paid_at": 0 + } + }, + { + "cause": { + "type": "transaction_processing", + "tx_hash": "EZnJpyJDnkwnadB1V8PqjVMx7oe2zLhUMtJ8v6EUh1NQ" + }, + "type": "access_key_update", + "change": { + "account_id": "test.near", + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib", + "access_key": { + "nonce": 39, + "permission": "FullAccess" + } + } + } + ] +} diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index 9e9fa0e..309b135 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -220,3 +220,89 @@ async fn fetch_shard_or_retry( near_indexer_primitives::IndexerShard, >(body_bytes.as_ref())?) 
} + +#[cfg(test)] +mod test { + use super::*; + + use async_trait::async_trait; + + use aws_sdk_s3::output::{get_object_output, list_objects_v2_output}; + use aws_sdk_s3::types::ByteStream; + + use aws_smithy_http::body::SdkBody; + + #[derive(Clone, Debug)] + pub struct LakeClient {} + + #[async_trait] + impl LakeS3Client for LakeClient { + async fn get_object( + &self, + _bucket: &str, + prefix: &str, + ) -> Result> + { + let path = format!("{}/blocks/{}", env!("CARGO_MANIFEST_DIR"), prefix); + let file_bytes = tokio::fs::read(path).await.unwrap(); + let stream = ByteStream::new(SdkBody::from(file_bytes)); + Ok(get_object_output::Builder::default().body(stream).build()) + } + + async fn list_objects( + &self, + _bucket: &str, + _start_after: &str, + ) -> Result< + ListObjectsV2Output, + aws_sdk_s3::types::SdkError, + > { + Ok(list_objects_v2_output::Builder::default().build()) + } + } + + #[tokio::test] + async fn deserializes_meta_transactions() { + let lake_client = LakeClient {}; + + let streamer_message = + fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) + .await + .unwrap(); + + let delegate_action = &streamer_message.shards[0] + .chunk + .as_ref() + .unwrap() + .transactions[0] + .transaction + .actions[0]; + + assert_eq!( + serde_json::to_value(delegate_action).unwrap(), + serde_json::json!({ + "Delegate": { + "delegate_action": { + "sender_id": "test.near", + "receiver_id": "test.near", + "actions": [ + { + "AddKey": { + "public_key": "ed25519:CnQMksXTTtn81WdDujsEMQgKUMkFvDJaAjDeDLTxVrsg", + "access_key": { + "nonce": 0, + "permission": "FullAccess" + } + } + } + ], + "nonce": 879546, + "max_block_height": 100, + "public_key": "ed25519:8Rn4FJeeRYcrLbcrAQNFVgvbZ2FCEQjgydbXwqBwF1ib" + }, + "signature": "ed25519:25uGrsJNU3fVgUpPad3rGJRy2XQum8gJxLRjKFCbd7gymXwUxQ9r3tuyBCD6To7SX5oSJ2ScJZejwqK1ju8WdZfS" + } + }) + ); + } +} From f88e689a048b33b60a30df122117e3a26ddc382d Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 23 Feb 2023 
16:12:20 +1300 Subject: [PATCH 4/9] ci: Add step to run tests --- .github/workflows/ci.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c9aeed3..dadbecc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,6 +53,22 @@ jobs: run: | cargo clippy + test: + runs-on: ubuntu-20.04 + steps: + - name: Checkout repository + uses: actions/checkout@v2 + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + profile: minimal + components: clippy + - name: Run tests + run: | + cargo test + publish: runs-on: ubuntu-20.04 needs: [rustfmt, rustclippy] From fab4caf2d2017191d0aca595297fca05f996fbdd Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 23 Feb 2023 16:18:18 +1300 Subject: [PATCH 5/9] test: Ignore failing doc tests --- Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 580f154..377366a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,6 @@ near-indexer-primitives = ">=0.16.0,<0.17.0" [dev-dependencies] aws-smithy-http = "0.53.0" + +[lib] +doctest = false From f5d5c996474665988ead845eca3d11363f072bb7 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Thu, 23 Feb 2023 16:23:11 +1300 Subject: [PATCH 6/9] chore: Add `CHANGELOG.md` entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e5e9db..53760ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.0...HEAD) +- Refactor `s3_fetchers` to allow testing + ## [0.7.0](https://github.com/near/near-lake-framework/compare/v0.6.1...0.7.0) - Add support for Meta Transactions [NEP-366](https://github.com/near/NEPs/blob/master/neps/nep-0366.md) by upgrading `near-indexer-primitives` to `0.16` From 
973ecb0a16681a1f226be602395db8e44743dcd9 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 24 Feb 2023 07:00:52 +1300 Subject: [PATCH 7/9] refactor: Rename some variables for clarity --- src/lib.rs | 14 +++++++------- src/s3_fetchers.rs | 28 ++++++++++++++-------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 23c5a45..cf38e9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -230,7 +230,7 @@ pub use near_indexer_primitives; pub use aws_credential_types::Credentials; pub use types::{LakeConfig, LakeConfigBuilder}; -use s3_fetchers::LakeClient; +use s3_fetchers::LakeS3Client; mod s3_fetchers; pub(crate) mod types; @@ -267,7 +267,7 @@ pub fn streamer( } fn stream_block_heights<'a: 'b, 'b>( - lake_client: &'a LakeClient, + lake_s3_client: &'a LakeS3Client, s3_bucket_name: &'a str, mut start_from_block_height: crate::types::BlockHeight, ) -> impl futures::Stream + 'b { @@ -275,7 +275,7 @@ fn stream_block_heights<'a: 'b, 'b>( loop { tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3..."); match s3_fetchers::list_block_heights( - lake_client, + lake_s3_client, s3_bucket_name, start_from_block_height, ) @@ -372,7 +372,7 @@ async fn start( .build(); Client::from_conf(s3_config) }; - let lake_client = s3_fetchers::LakeClient::new(s3_client.clone()); + let lake_s3_client = s3_fetchers::LakeS3Client::new(s3_client.clone()); let mut last_processed_block_hash: Option = None; @@ -385,7 +385,7 @@ async fn start( // We require to stream blocks consistently, so we need to try to load the block again. 
let pending_block_heights = stream_block_heights( - &lake_client, + &lake_s3_client, &config.s3_bucket_name, start_from_block_height, ); @@ -408,7 +408,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &lake_client, + &lake_s3_client, &config.s3_bucket_name, block_height, ) @@ -509,7 +509,7 @@ async fn start( .into_iter() .map(|block_height| { s3_fetchers::fetch_streamer_message( - &lake_client, + &lake_s3_client, &config.s3_bucket_name, block_height, ) diff --git a/src/s3_fetchers.rs b/src/s3_fetchers.rs index 309b135..8e428b5 100644 --- a/src/s3_fetchers.rs +++ b/src/s3_fetchers.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use aws_sdk_s3::output::{GetObjectOutput, ListObjectsV2Output}; #[async_trait] -pub trait LakeS3Client { +pub trait S3Client { async fn get_object( &self, bucket: &str, @@ -22,18 +22,18 @@ pub trait LakeS3Client { } #[derive(Clone, Debug)] -pub struct LakeClient { +pub struct LakeS3Client { s3: aws_sdk_s3::Client, } -impl LakeClient { +impl LakeS3Client { pub fn new(s3: aws_sdk_s3::Client) -> Self { Self { s3 } } } #[async_trait] -impl LakeS3Client for LakeClient { +impl S3Client for LakeS3Client { async fn get_object( &self, bucket: &str, @@ -74,7 +74,7 @@ impl LakeS3Client for LakeClient { /// Queries the list of the objects in the bucket, grouped by "/" delimiter. 
/// Returns the list of block heights that can be fetched pub(crate) async fn list_block_heights( - lake_client_impl: &impl LakeS3Client, + lake_s3_client: &impl S3Client, s3_bucket_name: &str, start_from_block_height: crate::types::BlockHeight, ) -> Result< @@ -86,7 +86,7 @@ pub(crate) async fn list_block_heights( "Fetching block heights from S3, after #{}...", start_from_block_height ); - let response = lake_client_impl + let response = lake_s3_client .list_objects(s3_bucket_name, &format!("{:0>12}", start_from_block_height)) .await?; @@ -114,7 +114,7 @@ pub(crate) async fn list_block_heights( /// Reads the content of the objects and parses as a JSON. /// Returns the result in `near_indexer_primitives::StreamerMessage` pub(crate) async fn fetch_streamer_message( - lake_client_impl: &impl LakeS3Client, + lake_s3_client: &impl S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, ) -> Result< @@ -123,7 +123,7 @@ pub(crate) async fn fetch_streamer_message( > { let block_view = { let body_bytes = loop { - match lake_client_impl + match lake_s3_client .get_object(s3_bucket_name, &format!("{:0>12}/block.json", block_height)) .await { @@ -158,7 +158,7 @@ pub(crate) async fn fetch_streamer_message( .collect::>() .into_iter() .map(|shard_id| { - fetch_shard_or_retry(lake_client_impl, s3_bucket_name, block_height, shard_id) + fetch_shard_or_retry(lake_s3_client, s3_bucket_name, block_height, shard_id) }); let shards = futures::future::try_join_all(fetch_shards_futures).await?; @@ -171,7 +171,7 @@ pub(crate) async fn fetch_streamer_message( /// Fetches the shard data JSON from AWS S3 and returns the `IndexerShard` async fn fetch_shard_or_retry( - lake_client_impl: &impl LakeS3Client, + lake_s3_client: &impl S3Client, s3_bucket_name: &str, block_height: crate::types::BlockHeight, shard_id: u64, @@ -180,7 +180,7 @@ async fn fetch_shard_or_retry( crate::types::LakeError, > { let body_bytes = loop { - match lake_client_impl + match lake_s3_client .get_object( 
s3_bucket_name, &format!("{:0>12}/shard_{}.json", block_height, shard_id), @@ -233,10 +233,10 @@ mod test { use aws_smithy_http::body::SdkBody; #[derive(Clone, Debug)] - pub struct LakeClient {} + pub struct LakeS3Client {} #[async_trait] - impl LakeS3Client for LakeClient { + impl S3Client for LakeS3Client { async fn get_object( &self, _bucket: &str, @@ -263,7 +263,7 @@ mod test { #[tokio::test] async fn deserializes_meta_transactions() { - let lake_client = LakeClient {}; + let lake_client = LakeS3Client {}; let streamer_message = fetch_streamer_message(&lake_client, "near-lake-data-mainnet", 879765) From 3067ee82b17abc7669c5ab1b1f89c21d055e2a41 Mon Sep 17 00:00:00 2001 From: Bohdan Khorolets Date: Mon, 6 Mar 2023 18:40:46 +0200 Subject: [PATCH 8/9] fix: Betanet shortcut default region fixed (#62) --- CHANGELOG.md | 1 + src/types.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53760ac..c774ebd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.0...HEAD) - Refactor `s3_fetchers` to allow testing +- Fix `betanet` default region (the corresponding bucket is in different region) ## [0.7.0](https://github.com/near/near-lake-framework/compare/v0.6.1...0.7.0) diff --git a/src/types.rs b/src/types.rs index 2d74622..6d64e00 100644 --- a/src/types.rs +++ b/src/types.rs @@ -107,7 +107,7 @@ impl LakeConfigBuilder { /// ``` pub fn betanet(mut self) -> Self { self.s3_bucket_name = Some("near-lake-data-betanet".to_string()); - self.s3_region_name = Some("eu-central-1".to_string()); + self.s3_region_name = Some("us-east-1".to_string()); self } } From a4d6f59b0c217abdcd0240c228db70a9e1ddbb55 Mon Sep 17 00:00:00 2001 From: Bohdan Khorolets Date: Mon, 6 Mar 2023 18:51:19 +0200 Subject: [PATCH 9/9] chore: Release 0.7.1 (#63) --- CHANGELOG.md | 4 +++- Cargo.toml | 2 
+- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c774ebd..6af1b2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.0...HEAD) +## [Unreleased](https://github.com/near/near-lake-framework/compare/v0.7.1...HEAD) + +## [0.7.1](https://github.com/near/near-lake-framework/compare/v0.7.0...0.7.1) - Refactor `s3_fetchers` to allow testing - Fix `betanet` default region (the corresponding bucket is in different region) diff --git a/Cargo.toml b/Cargo.toml index 377366a..968f849 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ rust-version = "1.58.1" # cargo-workspaces [workspace.metadata.workspaces] -version = "0.7.0" +version = "0.7.1" [dependencies] anyhow = "1.0.51"