Skip to content

Commit

Permalink
refactor: start options (GreptimeTeam#545)
Browse files Browse the repository at this point in the history
* refactor: config options for frontend/datanode/standalone

* chore: rename MetaClientOpts::metasrv_addr to MetaClientOpts::metasrv_addrs

* fix: clippy

* fix: change default meta-srv addr to 127.0.0.1:3002
  • Loading branch information
v0y4g3r authored Nov 17, 2022
1 parent 55f18b5 commit 8faa6b0
Showing 14 changed files with 70 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ type = 'File'
data_dir = '/tmp/greptimedb/data/'

[meta_client_opts]
metasrv_addr = '1.1.1.1:3002'
metasrv_addrs = ['127.0.0.1:3002']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false
2 changes: 1 addition & 1 deletion config/frontend.example.toml
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ datanode_rpc_addr = '127.0.0.1:3001'
http_addr = '127.0.0.1:4000'

[meta_client_opts]
metasrv_addr = '1.1.1.1:3002'
metasrv_addrs = ['127.0.0.1:3002']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false
1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ common-error = { path = "../common/error" }
common-telemetry = { path = "../common/telemetry", features = [
"deadlock_detection",
] }
meta-client = { path = "../meta-client" }
datanode = { path = "../datanode" }
frontend = { path = "../frontend" }
futures = "0.3"
38 changes: 29 additions & 9 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions};
use frontend::frontend::Mode;
use meta_client::MetaClientOpts;
use snafu::ResultExt;

use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
@@ -98,7 +99,13 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}

if let Some(meta_addr) = cmd.metasrv_addr {
opts.meta_client_opts.metasrv_addr = meta_addr;
opts.meta_client_opts
.get_or_insert_with(MetaClientOpts::default)
.metasrv_addrs = meta_addr
.split(',')
.map(&str::trim)
.map(&str::to_string)
.collect::<_>();
opts.mode = Mode::Distributed;
}

@@ -138,13 +145,17 @@ mod tests {
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
assert_eq!("127.0.0.1:3306".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
assert_eq!(
"1.1.1.1:3002".to_string(),
options.meta_client_opts.metasrv_addr
);
assert_eq!(5000, options.meta_client_opts.connect_timeout_millis);
assert_eq!(3000, options.meta_client_opts.timeout_millis);
assert!(!options.meta_client_opts.tcp_nodelay);
let MetaClientOpts {
metasrv_addrs: metasrv_addr,
timeout_millis,
connect_timeout_millis,
tcp_nodelay,
} = options.meta_client_opts.unwrap();

assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
assert_eq!(5000, connect_timeout_millis);
assert_eq!(3000, timeout_millis);
assert!(!tcp_nodelay);

match options.storage {
ObjectStoreConfig::File { data_dir } => {
@@ -213,6 +224,15 @@ mod tests {
})
.unwrap();
assert_eq!(Some(42), dn_opts.node_id);
assert_eq!("1.1.1.1:3002", dn_opts.meta_client_opts.metasrv_addr);
let MetaClientOpts {
metasrv_addrs: metasrv_addr,
timeout_millis,
connect_timeout_millis,
tcp_nodelay,
} = dn_opts.meta_client_opts.unwrap();
assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
assert_eq!(3000, timeout_millis);
assert_eq!(5000, connect_timeout_millis);
assert!(!tcp_nodelay);
}
}
15 changes: 8 additions & 7 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ use frontend::instance::Instance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use meta_client::MetaClientOpts;
use snafu::ResultExt;

use crate::error::{self, Result};
@@ -124,13 +125,13 @@ impl TryFrom<StartCommand> for FrontendOptions {
opts.influxdb_options = Some(InfluxdbOptions { enable });
}
if let Some(metasrv_addr) = cmd.metasrv_addr {
opts.metasrv_addr = Some(
metasrv_addr
.split(',')
.into_iter()
.map(|x| x.trim().to_string())
.collect::<Vec<String>>(),
);
opts.meta_client_opts
.get_or_insert_with(MetaClientOpts::default)
.metasrv_addrs = metasrv_addr
.split(',')
.map(&str::trim)
.map(&str::to_string)
.collect::<Vec<_>>();
opts.mode = Mode::Distributed;
}
Ok(opts)
2 changes: 1 addition & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
@@ -105,7 +105,7 @@ impl StandaloneOptions {
prometheus_options: self.prometheus_options,
mode: self.mode,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
metasrv_addr: None,
meta_client_opts: None,
}
}

4 changes: 2 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ pub struct DatanodeOptions {
pub rpc_runtime_size: usize,
pub mysql_addr: String,
pub mysql_runtime_size: usize,
pub meta_client_opts: MetaClientOpts,
pub meta_client_opts: Option<MetaClientOpts>,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub mode: Mode,
@@ -58,7 +58,7 @@ impl Default for DatanodeOptions {
rpc_runtime_size: 8,
mysql_addr: "127.0.0.1:3306".to_string(),
mysql_runtime_size: 2,
meta_client_opts: MetaClientOpts::default(),
meta_client_opts: None,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
mode: Mode::Standalone,
4 changes: 4 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
@@ -283,6 +283,9 @@ pub enum Error {

#[snafu(display("Missing node id option in distributed mode"))]
MissingNodeId { backtrace: Backtrace },

#[snafu(display("Missing node id option in distributed mode"))]
MissingMetasrvOpts { backtrace: Backtrace },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -348,6 +351,7 @@ impl ErrorExt for Error {
Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported,
Error::BumpTableId { source, .. } => source.status_code(),
Error::MissingNodeId { .. } => StatusCode::InvalidArguments,
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
}
}

9 changes: 6 additions & 3 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
@@ -38,7 +38,8 @@ use table::table::TableIdProviderRef;

use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingNodeIdSnafu, NewCatalogSnafu, Result,
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, Result,
};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
@@ -76,7 +77,9 @@ impl Instance {
Mode::Distributed => {
let meta_client = new_metasrv_client(
opts.node_id.context(MissingNodeIdSnafu)?,
&opts.meta_client_opts,
opts.meta_client_opts
.as_ref()
.context(MissingMetasrvOptsSnafu)?,
)
.await?;
Some(Arc::new(meta_client))
@@ -204,7 +207,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
.channel_manager(channel_manager)
.build();
meta_client
.start(&[&meta_config.metasrv_addr])
.start(&meta_config.metasrv_addrs)
.await
.context(MetaClientInitSnafu)?;

4 changes: 4 additions & 0 deletions src/frontend/src/error.rs
Original file line number Diff line number Diff line change
@@ -413,6 +413,9 @@ pub enum Error {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},

#[snafu(display("Missing meta_client_opts section in config"))]
MissingMetasrvOpts { backtrace: Backtrace },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -496,6 +499,7 @@ impl ErrorExt for Error {
Error::CollectRecordbatchStream { source } | Error::CreateRecordbatches { source } => {
source.status_code()
}
Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments,
}
}

5 changes: 3 additions & 2 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@

use std::sync::Arc;

use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use snafu::prelude::*;

@@ -38,7 +39,7 @@ pub struct FrontendOptions {
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub datanode_rpc_addr: String,
pub metasrv_addr: Option<Vec<String>>,
pub meta_client_opts: Option<MetaClientOpts>,
}

impl Default for FrontendOptions {
@@ -53,7 +54,7 @@ impl Default for FrontendOptions {
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
datanode_rpc_addr: "127.0.0.1:3001".to_string(),
metasrv_addr: None,
meta_client_opts: None,
}
}
}
12 changes: 7 additions & 5 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
@@ -60,7 +60,8 @@ use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu,
CreateDatabaseSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, SelectSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
SchemaNotFoundSnafu, SelectSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::{FrontendOptions, Mode};
@@ -129,10 +130,11 @@ impl Instance {
instance.dist_instance = match &opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let metasrv_addr = opts
.metasrv_addr
.clone()
.expect("Forgot to set metasrv_addr");
let metasrv_addr = &opts
.meta_client_opts
.as_ref()
.context(MissingMetasrvOptsSnafu)?
.metasrv_addrs;
info!(
"Creating Frontend instance in distributed mode with Meta server addr {:?}",
metasrv_addr
4 changes: 2 additions & 2 deletions src/meta-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ pub mod rpc;
// Options for meta client in datanode instance.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MetaClientOpts {
pub metasrv_addr: String,
pub metasrv_addrs: Vec<String>,
pub timeout_millis: u64,
pub connect_timeout_millis: u64,
pub tcp_nodelay: bool,
@@ -32,7 +32,7 @@ pub struct MetaClientOpts {
impl Default for MetaClientOpts {
fn default() -> Self {
Self {
metasrv_addr: "127.0.0.1:3002".to_string(),
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout_millis: 3_000u64,
connect_timeout_millis: 5_000u64,
tcp_nodelay: true,

0 comments on commit 8faa6b0

Please sign in to comment.