Skip to content

Commit

Permalink
feat: distributed execute gRPC and Prometheus query in Frontend (Grep…
Browse files Browse the repository at this point in the history
…timeTeam#520)

* feat: distributed execute GRPC and Prometheus query in Frontend

* feat: distributed execute GRPC and Prometheus query in Frontend

* Apply suggestions from code review

Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>

* feat: distributed execute GRPC and Prometheus query in Frontend

* fix: do not convert timestamp to string when converting logical plan to SQL

* fix: tests

* refactor: no mock

* refactor: 0.0.0.0 -> 127.0.0.1

* refactor: 0.0.0.0 -> 127.0.0.1

* refactor: 0.0.0.0 -> 127.0.0.1

Co-authored-by: luofucong <luofucong@greptime.com>
Co-authored-by: Lei, Huang <6406592+v0y4g3r@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 16, 2022
1 parent ce11a64 commit 872ac80
Showing 37 changed files with 275 additions and 149 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
node_id = 42
mode = 'distributed'
rpc_addr = '0.0.0.0:3001'
rpc_addr = '127.0.0.1:3001'
wal_dir = '/tmp/greptimedb/wal'
rpc_runtime_size = 8
mysql_addr = '0.0.0.0:3306'
mysql_addr = '127.0.0.1:3306'
mysql_runtime_size = 4

[storage]
2 changes: 1 addition & 1 deletion config/frontend.example.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mode = 'distributed'
datanode_rpc_addr = '127.0.0.1:3001'
http_addr = '0.0.0.0:4000'
http_addr = '127.0.0.1:4000'

[meta_client_opts]
metasrv_addr = '1.1.1.1:3002'
6 changes: 3 additions & 3 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
bind_addr = '127.0.0.1:3002'
server_addr = '0.0.0.0:3002'
store_addr = '127.0.0.1:2380'
datanode_lease_secs = 30
server_addr = '127.0.0.1:3002'
store_addr = '127.0.0.1:2379'
datanode_lease_secs = 15
12 changes: 6 additions & 6 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
node_id = 0
mode = 'standalone'
http_addr = '0.0.0.0:4000'
datanode_mysql_addr = '0.0.0.0:3306'
http_addr = '127.0.0.1:4000'
datanode_mysql_addr = '127.0.0.1:3306'
datanode_mysql_runtime_size = 4
wal_dir = '/tmp/greptimedb/wal/'

@@ -10,25 +10,25 @@ type = 'File'
data_dir = '/tmp/greptimedb/data/'

[grpc_options]
addr = '0.0.0.0:4001'
addr = '127.0.0.1:4001'
runtime_size = 8

[mysql_options]
addr = '0.0.0.0:4002'
addr = '127.0.0.1:4002'
runtime_size = 2

[influxdb_options]
enable = true

[opentsdb_options]
addr = '0.0.0.0:4242'
addr = '127.0.0.1:4242'
enable = true
runtime_size = 2

[prometheus_options]
enable = true

[postgres_options]
addr = '0.0.0.0:4003'
addr = '127.0.0.1:4003'
runtime_size = 2
check_pwd = false
10 changes: 8 additions & 2 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
use std::sync::Arc;

use api::v1::codec::SelectResult as GrpcSelectResult;
use api::v1::column::SemanticType;
use api::v1::{
object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, PhysicalPlan,
@@ -219,7 +220,12 @@ impl TryFrom<ObjectResult> for Output {
.map(|(column, vector)| {
let datatype = vector.data_type();
// nullable or not, does not affect the output
ColumnSchema::new(&column.column_name, datatype, true)
let mut column_schema =
ColumnSchema::new(&column.column_name, datatype, true);
if column.semantic_type == SemanticType::Timestamp as i32 {
column_schema = column_schema.with_time_index(true);
}
column_schema
})
.collect::<Vec<ColumnSchema>>();

@@ -251,7 +257,7 @@ impl TryFrom<ObjectResult> for Output {
mod tests {
use api::helper::ColumnDataTypeWrapper;
use api::v1::Column;
use datanode::server::grpc::select::{null_mask, values};
use common_grpc::select::{null_mask, values};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector,
4 changes: 2 additions & 2 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
@@ -145,9 +145,9 @@ mod tests {
)),
};
let options: DatanodeOptions = cmd.try_into().unwrap();
assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr);
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr);
assert_eq!("127.0.0.1:3306".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
assert_eq!(
"1.1.1.1:3002".to_string(),
10 changes: 5 additions & 5 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
@@ -104,13 +104,13 @@ mod tests {
fn test_read_from_cmd() {
let cmd = StartCommand {
bind_addr: Some("127.0.0.1:3002".to_string()),
server_addr: Some("0.0.0.0:3002".to_string()),
server_addr: Some("127.0.0.1:3002".to_string()),
store_addr: Some("127.0.0.1:2380".to_string()),
config_file: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
}

@@ -127,8 +127,8 @@ mod tests {
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("0.0.0.0:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2380".to_string(), options.store_addr);
assert_eq!(30, options.datanode_lease_secs);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!("127.0.0.1:2379".to_string(), options.store_addr);
assert_eq!(15, options.datanode_lease_secs);
}
}
13 changes: 8 additions & 5 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ pub struct StandaloneOptions {
impl Default for StandaloneOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
http_addr: Some("127.0.0.1:4000".to_string()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
@@ -87,7 +87,7 @@ impl Default for StandaloneOptions {
mode: Mode::Standalone,
wal_dir: "/tmp/greptimedb/wal".to_string(),
storage: ObjectStoreConfig::default(),
datanode_mysql_addr: "0.0.0.0:3306".to_string(),
datanode_mysql_addr: "127.0.0.1:3306".to_string(),
datanode_mysql_runtime_size: 4,
}
}
@@ -274,12 +274,15 @@ mod tests {
let fe_opts = FrontendOptions::try_from(cmd).unwrap();
assert_eq!(Mode::Standalone, fe_opts.mode);
assert_eq!("127.0.0.1:3001".to_string(), fe_opts.datanode_rpc_addr);
assert_eq!(Some("0.0.0.0:4000".to_string()), fe_opts.http_addr);
assert_eq!(Some("127.0.0.1:4000".to_string()), fe_opts.http_addr);
assert_eq!(
"0.0.0.0:4001".to_string(),
"127.0.0.1:4001".to_string(),
fe_opts.grpc_options.unwrap().addr
);
assert_eq!("0.0.0.0:4002", fe_opts.mysql_options.as_ref().unwrap().addr);
assert_eq!(
"127.0.0.1:4002",
fe_opts.mysql_options.as_ref().unwrap().addr
);
assert_eq!(2, fe_opts.mysql_options.as_ref().unwrap().runtime_size);
assert!(fe_opts.influxdb_options.as_ref().unwrap().enable);
}
3 changes: 3 additions & 0 deletions src/common/grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -9,7 +9,10 @@ api = { path = "../../api" }
async-trait = "0.1"
common-base = { path = "../base" }
common-error = { path = "../error" }
common-query = { path = "../query" }
common-recordbatch = { path = "../recordbatch" }
common-runtime = { path = "../runtime" }
datatypes = { path = "../../datatypes" }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = [
"simd",
20 changes: 19 additions & 1 deletion src/common/grpc/src/error.rs
Original file line number Diff line number Diff line change
@@ -69,6 +69,21 @@ pub enum Error {
source: tonic::transport::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to collect RecordBatches, source: {}", source))]
CollectRecordBatches {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},

#[snafu(display("Failed to convert Arrow type: {}", from))]
Conversion { from: String, backtrace: Backtrace },

#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
source: api::error::Error,
},
}

impl ErrorExt for Error {
@@ -83,7 +98,10 @@ impl ErrorExt for Error {
}
Error::NewProjection { .. }
| Error::DecodePhysicalPlanNode { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
| Error::CreateChannel { .. }
| Error::Conversion { .. } => StatusCode::Internal,
Error::CollectRecordBatches { source } => source.status_code(),
Error::ColumnDataType { source } => source.status_code(),
}
}

1 change: 1 addition & 0 deletions src/common/grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
pub mod channel_manager;
pub mod error;
pub mod physical;
pub mod select;
pub mod writer;

pub use error::Error;
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ use api::v1::column::{SemanticType, Values};
use api::v1::{Column, ObjectResult};
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use common_base::BitVec;
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::{util, RecordBatches, SendableRecordBatchStream};
@@ -30,7 +31,7 @@ use snafu::{OptionExt, ResultExt};

use crate::error::{self, ConversionSnafu, Result};

pub async fn to_object_result(output: Result<Output>) -> ObjectResult {
pub async fn to_object_result(output: std::result::Result<Output, impl ErrorExt>) -> ObjectResult {
let result = match output {
Ok(Output::AffectedRows(rows)) => Ok(ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
@@ -208,7 +209,7 @@ mod tests {
use datatypes::schema::Schema;
use datatypes::vectors::{UInt32Vector, VectorRef};

use crate::server::grpc::select::{null_mask, try_convert, values};
use crate::select::{null_mask, try_convert, values};

#[test]
fn test_convert_record_batches_to_select_result() {
13 changes: 13 additions & 0 deletions src/common/query/src/error.rs
Original file line number Diff line number Diff line change
@@ -114,6 +114,12 @@ pub enum InnerError {
#[snafu(backtrace)]
source: DataTypeError,
},

#[snafu(display("Failed to execute physical plan, source: {}", source))]
ExecutePhysicalPlan {
#[snafu(backtrace)]
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -141,6 +147,7 @@ impl ErrorExt for InnerError {
InnerError::UnsupportedInputDataType { .. } => StatusCode::InvalidArguments,

InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
InnerError::ExecutePhysicalPlan { source } => source.status_code(),
}
}

@@ -165,6 +172,12 @@ impl From<Error> for DataFusionError {
}
}

impl From<BoxedError> for Error {
fn from(source: BoxedError) -> Self {
InnerError::ExecutePhysicalPlan { source }.into()
}
}

#[cfg(test)]
mod tests {
use arrow::error::ArrowError;
4 changes: 2 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
@@ -55,9 +55,9 @@ impl Default for DatanodeOptions {
fn default() -> Self {
Self {
node_id: 0,
rpc_addr: "0.0.0.0:3001".to_string(),
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_runtime_size: 8,
mysql_addr: "0.0.0.0:3306".to_string(),
mysql_addr: "127.0.0.1:3306".to_string(),
mysql_runtime_size: 2,
meta_client_opts: MetaClientOpts::default(),
wal_dir: "/tmp/greptimedb/wal".to_string(),
12 changes: 0 additions & 12 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
@@ -145,9 +145,6 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String },

#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String },

@@ -229,12 +226,6 @@ pub enum Error {
source: script::error::Error,
},

#[snafu(display("Failed to collect RecordBatches, source: {}", source))]
CollectRecordBatches {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},

#[snafu(display(
"Failed to parse string to timestamp, string: {}, source: {}",
raw,
@@ -338,7 +329,6 @@ impl ErrorExt for Error {
| Error::CreateDir { .. }
| Error::InsertSystemCatalog { .. }
| Error::RegisterSchema { .. }
| Error::Conversion { .. }
| Error::IntoPhysicalPlan { .. }
| Error::UnsupportedExpr { .. }
| Error::ColumnDataType { .. }
@@ -349,8 +339,6 @@ impl ErrorExt for Error {
Error::StartScriptManager { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),
Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted,
Error::CollectRecordBatches { source } => source.status_code(),

Error::MetaClientInit { source, .. } => source.status_code(),
Error::InsertData { source, .. } => source.status_code(),
Error::EmptyInsertBatch => StatusCode::InvalidArguments,
2 changes: 1 addition & 1 deletion src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::select::to_object_result;
use common_insert::insertion_expr_to_request;
use common_query::Output;
use query::plan::LogicalPlan;
@@ -36,7 +37,6 @@ use crate::error::{
};
use crate::instance::Instance;
use crate::server::grpc::plan::PhysicalPlanner;
use crate::server::grpc::select::to_object_result;

impl Instance {
pub async fn execute_grpc_insert(
1 change: 0 additions & 1 deletion src/datanode/src/server/grpc.rs
Original file line number Diff line number Diff line change
@@ -14,4 +14,3 @@

mod ddl;
pub(crate) mod plan;
pub mod select;
2 changes: 1 addition & 1 deletion src/datatypes/src/data_type.rs
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ impl ConcreteDataType {
matches!(self, ConcreteDataType::Boolean(_))
}

pub fn is_string(&self) -> bool {
pub fn stringifiable(&self) -> bool {
matches!(
self,
ConcreteDataType::String(_)
Loading

0 comments on commit 872ac80

Please sign in to comment.