Skip to content

Commit

Permalink
feat!: improve region manifest service (GreptimeTeam#1268)
Browse files Browse the repository at this point in the history
* feat: try to use batch delete in ManifestLogStorage

* feat: clean temp dir when startup with file backend

* refactor: export region manifest checkpoint actions magin and refactor storage options

* feat: purge unused manifest and checkpoint files by repeat gc task

* chore: debug deleted logs

* feat: adds RepeatedTask and refactor all gc tasks

* chore: clean code

* feat: export gc_duration to manifest config

* test: assert gc works

* fix: typo

* Update src/common/runtime/src/error.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* Update src/common/runtime/src/repeated_task.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* Update src/common/runtime/src/repeated_task.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* fix: format

* Update src/common/runtime/src/repeated_task.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: by CR comments

* chore: by CR comments

* fix: serde default for StorageConfig

* chore: remove compaction config in StandaloneOptions

---------

Co-authored-by: LFC <bayinamine@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
  • Loading branch information
3 people authored Mar 31, 2023
1 parent b71bb4e commit 563adba
Show file tree
Hide file tree
Showing 31 changed files with 686 additions and 190 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ type = "File"
data_dir = "/tmp/greptimedb/data/"

# Compaction options, see `standalone.example.toml`.
[compaction]
[storage.compaction]
max_inflight_tasks = 4
max_files_in_level0 = 8
max_purge_tasks = 32

# Storage manifest options
[storage.manifest]
# Region checkpoint actions margin.
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'

# Procedure storage options, see `standalone.example.toml`.
# [procedure.store]
# type = "File"
Expand Down
11 changes: 10 additions & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,23 @@ type = "File"
data_dir = "/tmp/greptimedb/data/"

# Compaction options.
[compaction]
[storage.compaction]
# Max task number that can concurrently run.
max_inflight_tasks = 4
# Max files in level 0 to trigger compaction.
max_files_in_level0 = 8
# Max task number for SST purge task after compaction.
max_purge_tasks = 32

# Storage manifest options
[storage.manifest]
# Region checkpoint actions margin.
# Create a checkpoint every <checkpoint_margin> actions.
checkpoint_margin = 10
# Region manifest logs and checkpoints gc execution duration
gc_duration = '30s'


# Procedure storage options.
# Uncomment to enable.
# [procedure.store]
Expand Down
33 changes: 22 additions & 11 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}

if let Some(data_dir) = cmd.data_dir {
opts.storage = ObjectStoreConfig::File(FileConfig { data_dir });
opts.storage.store = ObjectStoreConfig::File(FileConfig { data_dir });
}

if let Some(wal_dir) = cmd.wal_dir {
Expand All @@ -167,7 +167,7 @@ mod tests {
use std::time::Duration;

use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig};
use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::Mode;

use super::*;
Expand Down Expand Up @@ -203,10 +203,14 @@ mod tests {
type = "File"
data_dir = "/tmp/greptimedb/data/"
[compaction]
max_inflight_tasks = 4
max_files_in_level0 = 8
[storage.compaction]
max_inflight_tasks = 3
max_files_in_level0 = 7
max_purge_tasks = 32
[storage.manifest]
checkpoint_margin = 9
gc_duration = '7s'
"#;
write!(file, "{}", toml_str).unwrap();

Expand Down Expand Up @@ -237,21 +241,28 @@ mod tests {
assert_eq!(3000, timeout_millis);
assert!(tcp_nodelay);

match options.storage {
ObjectStoreConfig::File(FileConfig { data_dir }) => {
assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir)
match &options.storage.store {
ObjectStoreConfig::File(FileConfig { data_dir, .. }) => {
assert_eq!("/tmp/greptimedb/data/", data_dir)
}
ObjectStoreConfig::S3 { .. } => unreachable!(),
ObjectStoreConfig::Oss { .. } => unreachable!(),
};

assert_eq!(
CompactionConfig {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_inflight_tasks: 3,
max_files_in_level0: 7,
max_purge_tasks: 32,
},
options.compaction
options.storage.compaction,
);
assert_eq!(
RegionManifestConfig {
checkpoint_margin: Some(9),
gc_duration: Some(Duration::from_secs(7)),
},
options.storage.manifest,
);
}

Expand Down
11 changes: 3 additions & 8 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use std::sync::Arc;
use clap::Parser;
use common_base::Plugins;
use common_telemetry::info;
use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
};
use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
Expand Down Expand Up @@ -82,8 +80,7 @@ pub struct StandaloneOptions {
pub prometheus_options: Option<PrometheusOptions>,
pub prom_options: Option<PromOptions>,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub storage: StorageConfig,
pub procedure: Option<ProcedureConfig>,
}

Expand All @@ -101,8 +98,7 @@ impl Default for StandaloneOptions {
prometheus_options: Some(PrometheusOptions::default()),
prom_options: Some(PromOptions::default()),
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
compaction: CompactionConfig::default(),
storage: StorageConfig::default(),
procedure: None,
}
}
Expand All @@ -129,7 +125,6 @@ impl StandaloneOptions {
enable_memory_catalog: self.enable_memory_catalog,
wal: self.wal,
storage: self.storage,
compaction: self.compaction,
procedure: self.procedure,
..Default::default()
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ edition.workspace = true
license.workspace = true

[dependencies]
async-trait.workspace = true
common-error = { path = "../error" }
common-telemetry = { path = "../telemetry" }
metrics = "0.20"
once_cell = "1.12"
paste.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-util.workspace = true

[dev-dependencies]
tokio-test = "0.4"
14 changes: 14 additions & 0 deletions src/common/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::any::Any;

use common_error::prelude::*;
use tokio::task::JoinError;

pub type Result<T> = std::result::Result<T, Error>;

Expand All @@ -26,6 +27,19 @@ pub enum Error {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Repeated task {} not started yet", name))]
IllegalState { name: String, backtrace: Backtrace },

#[snafu(display(
"Failed to wait for repeated task {} to stop, source: {}",
name,
source
))]
WaitGcTaskStop {
name: String,
source: JoinError,
backtrace: Backtrace,
},
}

impl ErrorExt for Error {
Expand Down
2 changes: 2 additions & 0 deletions src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod error;
mod global;
pub mod metric;
mod repeated_task;
pub mod runtime;

pub use global::{
Expand All @@ -23,4 +24,5 @@ pub use global::{
spawn_read, spawn_write, write_runtime,
};

pub use crate::repeated_task::{RepeatedTask, TaskFunction, TaskFunctionRef};
pub use crate::runtime::{Builder, JoinError, JoinHandle, Runtime};
Loading

0 comments on commit 563adba

Please sign in to comment.