Skip to content

Commit

Permalink
feat: meta procedure options (GreptimeTeam#1937)
Browse files Browse the repository at this point in the history
* feat: meta procedure options

* chore: tune meta procedure options in tests

* Update src/common/procedure/Cargo.toml

Co-authored-by: dennis zhuang <killme2008@gmail.com>

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
  • Loading branch information
WenyXu and killme2008 authored Jul 12, 2023
1 parent fa12392 commit 264c5ea
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,10 @@ use_memory_store = false
# [logging]
# dir = "/tmp/greptimedb/logs"
# level = "info"

# Procedure storage options.
[procedure]
# Procedure max retry time.
max_retry_times = 3
# Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
3 changes: 2 additions & 1 deletion src/common/procedure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
async-stream.workspace = true
backon = "0.4"
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
futures.workspace = true
humantime-serde = "1.1"
object-store = { path = "../../object-store" }
serde.workspace = true
serde_json = "1.0"
smallvec = "1"
backon = "0.4.0"
snafu.workspace = true
tokio.workspace = true
uuid.workspace = true
Expand Down
1 change: 1 addition & 0 deletions src/common/procedure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
pub mod error;
pub mod local;
pub mod options;
mod procedure;
pub mod store;
pub mod watcher;
Expand Down
38 changes: 38 additions & 0 deletions src/common/procedure/src/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Common traits and structures for the procedure framework.
use std::time::Duration;

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.
pub max_retry_times: usize,
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
}

impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
}
}
20 changes: 1 addition & 19 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::prelude::BoxedError;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use meta_client::MetaClientOptions;
Expand Down Expand Up @@ -344,25 +345,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.
pub max_retry_times: usize,
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
}

impl Default for ProcedureConfig {
fn default() -> ProcedureConfig {
ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct DatanodeOptions {
Expand Down
4 changes: 3 additions & 1 deletion src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::{error, info, warn};
Expand All @@ -38,7 +39,6 @@ use crate::selector::{Selector, SelectorType};
use crate::sequence::SequenceRef;
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};

pub const TABLE_ID_SEQ: &str = "table_id";

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -53,6 +53,7 @@ pub struct MetaSrvOptions {
pub disable_region_failover: bool,
pub http_opts: HttpOptions,
pub logging: LoggingOptions,
pub procedure: ProcedureConfig,
}

impl Default for MetaSrvOptions {
Expand All @@ -67,6 +68,7 @@ impl Default for MetaSrvOptions {
disable_region_failover: false,
http_opts: HttpOptions::default(),
logging: LoggingOptions::default(),
procedure: ProcedureConfig::default(),
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ impl MetaSrvBuilder {
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence);
let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));

let manager_config = ManagerConfig {
max_retry_times: options.procedure.max_retry_times,
retry_delay: options.procedure.retry_delay,
..Default::default()
};

let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
Expand Down
20 changes: 12 additions & 8 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_meta::peer::Peer;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig};
use datanode::heartbeat::HeartbeatTask;
use datanode::instance::Instance as DatanodeInstance;
use frontend::frontend::FrontendOptions;
Expand Down Expand Up @@ -118,13 +118,17 @@ impl GreptimeDbClusterBuilder {
}

async fn build_metasrv(&self, datanode_clients: Arc<DatanodeClients>) -> MockInfo {
meta_srv::mocks::mock(
MetaSrvOptions::default(),
self.kv_store.clone(),
None,
Some(datanode_clients),
)
.await
let opt = MetaSrvOptions {
procedure: ProcedureConfig {
// Due to large network delay during cross data-center.
// We only make max_retry_times and retry_delay large than the default in tests.
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
},
..Default::default()
};

meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await
}

async fn build_datanodes(
Expand Down

0 comments on commit 264c5ea

Please sign in to comment.