diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index f46faeda30c..67ecd97e248 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -78,6 +78,8 @@ struct flb_config { /* main configuration */ struct flb_cf *cf_main; + /* command line configuration (handled by fluent-bit bin) */ + struct flb_cf *cf_opts; struct mk_list cf_parsers_list; flb_sds_t program_name; /* argv[0] */ @@ -88,6 +90,12 @@ struct flb_config { */ char *conf_path; + /* if the configuration come from the file system, store the given path */ + flb_sds_t conf_path_file; + + /* if the external plugins come from the file system, store the given paths from command line */ + struct mk_list external_plugins; + /* Event */ struct mk_event event_flush; struct mk_event event_shutdown; @@ -247,6 +255,8 @@ struct flb_config { int enable_chunk_trace; #endif /* FLB_HAVE_CHUNK_TRACE */ + int enable_hot_reload; + /* Co-routines */ unsigned int coro_stack_size; @@ -279,6 +289,7 @@ const char *flb_config_prop_get(const char *key, struct mk_list *list); int flb_config_set_property(struct flb_config *config, const char *k, const char *v); int flb_config_set_program_name(struct flb_config *config, char *name); +int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf); int set_log_level_from_env(struct flb_config *config); #ifdef FLB_HAVE_STATIC_CONF diff --git a/include/fluent-bit/flb_custom.h b/include/fluent-bit/flb_custom.h index 54684ea48d4..f75e8de156d 100644 --- a/include/fluent-bit/flb_custom.h +++ b/include/fluent-bit/flb_custom.h @@ -85,6 +85,8 @@ struct flb_custom_instance *flb_custom_new(struct flb_config *config, const char *custom, void *data); void flb_custom_exit(struct flb_config *config); const char *flb_custom_name(struct flb_custom_instance *ins); +int flb_custom_plugin_property_check(struct flb_custom_instance *ins, + struct flb_config *config); int flb_custom_init_all(struct flb_config *config); void flb_custom_set_context(struct flb_custom_instance *ins, void *context); void flb_custom_instance_destroy(struct flb_custom_instance *ins); diff --git a/include/fluent-bit/flb_filter.h b/include/fluent-bit/flb_filter.h index 0285862e361..a9b686bbba7 100644 --- a/include/fluent-bit/flb_filter.h +++ b/include/fluent-bit/flb_filter.h @@ -135,6 +135,9 @@ void flb_filter_do(struct flb_input_chunk *ic, struct flb_config *config); const char *flb_filter_name(struct flb_filter_instance *ins); +int flb_filter_match_property_existence(struct flb_filter_instance *ins); +int flb_filter_plugin_property_check(struct flb_filter_instance *ins, + struct flb_config *config); int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins); int flb_filter_init_all(struct flb_config *config); void flb_filter_set_context(struct flb_filter_instance *ins, void *context); diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h index 5c9e28f7a77..da362a00b56 100644 --- a/include/fluent-bit/flb_input.h +++ b/include/fluent-bit/flb_input.h @@ -702,6 +702,11 @@ void flb_input_instance_exit(struct flb_input_instance *ins, struct flb_config *config); void flb_input_instance_destroy(struct flb_input_instance *ins); +int flb_input_net_property_check(struct flb_input_instance *ins, + struct flb_config *config); +int flb_input_plugin_property_check(struct flb_input_instance *ins, + struct flb_config *config); + int flb_input_init_all(struct flb_config *config); void flb_input_pre_run_all(struct flb_config *config); void flb_input_exit_all(struct flb_config *config); diff --git a/include/fluent-bit/flb_lib.h b/include/fluent-bit/flb_lib.h index 7e927529799..eb32ad6258e 100644 --- a/include/fluent-bit/flb_lib.h +++ b/include/fluent-bit/flb_lib.h @@ -79,4 +79,11 @@ FLB_EXPORT int flb_loop(flb_ctx_t *ctx); FLB_EXPORT int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len); FLB_EXPORT int flb_lib_config_file(flb_ctx_t *ctx, const char *path); +/* library context handling */ +FLB_EXPORT void flb_context_set(flb_ctx_t *ctx); +FLB_EXPORT flb_ctx_t *flb_context_get(); + +FLB_EXPORT void flb_cf_context_set(struct flb_cf *cf); +FLB_EXPORT struct flb_cf *flb_cf_context_get(); + #endif diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index acc67b4e743..e528401c014 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -829,6 +829,10 @@ void flb_output_pre_run(struct flb_config *config); void flb_output_exit(struct flb_config *config); void flb_output_set_context(struct flb_output_instance *ins, void *context); int flb_output_instance_destroy(struct flb_output_instance *ins); +int flb_output_net_property_check(struct flb_output_instance *ins, + struct flb_config *config); +int flb_output_plugin_property_check(struct flb_output_instance *ins, + struct flb_config *config); int flb_output_init_all(struct flb_config *config); int flb_output_check(struct flb_config *config); int flb_output_log_check(struct flb_output_instance *ins, int l); diff --git a/include/fluent-bit/flb_parser.h b/include/fluent-bit/flb_parser.h index 71ab443c68c..7d5cfcbc74f 100644 --- a/include/fluent-bit/flb_parser.h +++ b/include/fluent-bit/flb_parser.h @@ -97,6 +97,7 @@ struct flb_parser *flb_parser_create(const char *name, const char *format, int types_len, struct mk_list *decoders, struct flb_config *config); +int flb_parser_conf_file_stat(const char *file, struct flb_config *config); int flb_parser_conf_file(const char *file, struct flb_config *config); void flb_parser_destroy(struct flb_parser *parser); struct flb_parser *flb_parser_get(const char *name, struct flb_config *config); diff --git a/include/fluent-bit/flb_reload.h b/include/fluent-bit/flb_reload.h new file mode 100644 index 00000000000..801e52a0de1 --- /dev/null +++ b/include/fluent-bit/flb_reload.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2022 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_RELOAD_H +#define FLB_RELOAD_H + +#include +#include +#include +#include + +int flb_reload_property_check_all(struct flb_config *config); +int flb_reload_reconstruct_cf(struct flb_cf *src_cf, struct flb_cf *dest_cf); +int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts); + +#endif diff --git a/plugins/in_exec_wasi/in_exec_wasi.c b/plugins/in_exec_wasi/in_exec_wasi.c index e1f90d241d6..2515dd8584f 100644 --- a/plugins/in_exec_wasi/in_exec_wasi.c +++ b/plugins/in_exec_wasi/in_exec_wasi.c @@ -289,7 +289,11 @@ static int in_exec_wasi_init(struct flb_input_instance *in, if (!ctx) { return -1; } + ctx->parser = NULL; ctx->parser_name = NULL; + ctx->wasm = NULL; + ctx->wasi_path = NULL; + ctx->oneshot = FLB_FALSE; /* Initialize exec config */ ret = in_exec_wasi_config_read(ctx, in, config); @@ -330,6 +334,7 @@ static int in_exec_wasi_init(struct flb_input_instance *in, flb_plg_error(in, "could not set collector for exec input plugin"); goto init_error; } + ctx->coll_fd = ret; return 0; @@ -339,6 +344,20 @@ static int in_exec_wasi_init(struct flb_input_instance *in, return -1; } +static void in_exec_wasi_pause(void *data, struct flb_config *config) +{ + struct flb_exec_wasi *ctx = data; + + flb_input_collector_pause(ctx->coll_fd, ctx->ins); +} + +static void in_exec_wasi_resume(void *data, struct flb_config *config) +{ + struct flb_exec_wasi *ctx = data; + + flb_input_collector_resume(ctx->coll_fd, ctx->ins); +} + static int in_exec_wasi_prerun(struct flb_input_instance *ins, struct flb_config *config, void *in_context) { @@ -416,6 +435,8 @@ struct flb_input_plugin in_exec_wasi_plugin = { .description = "Exec WASI Input", .cb_init = in_exec_wasi_init, .cb_pre_run = in_exec_wasi_prerun, + .cb_pause = in_exec_wasi_pause, + .cb_resume = in_exec_wasi_resume, .cb_collect = in_exec_wasi_collect, .cb_flush_buf = NULL, .cb_exit = in_exec_wasi_exit, diff --git a/plugins/in_exec_wasi/in_exec_wasi.h b/plugins/in_exec_wasi/in_exec_wasi.h index 30be07d5ef5..132407bb94e 100644 --- a/plugins/in_exec_wasi/in_exec_wasi.h +++ b/plugins/in_exec_wasi/in_exec_wasi.h @@ -49,6 +49,7 @@ struct flb_exec_wasi { int interval_sec; int interval_nsec; struct flb_log_event_encoder log_encoder; + int coll_fd; }; #endif /* FLB_IN_EXEC_WASI_H */ diff --git a/plugins/in_lib/in_lib.h b/plugins/in_lib/in_lib.h index 000a5dcb998..a5fc8a9ea0e 100644 --- a/plugins/in_lib/in_lib.h +++ b/plugins/in_lib/in_lib.h @@ -24,9 +24,12 @@ #include #include #include +#include #define LIB_BUF_CHUNK 65536 +pthread_key_t flb_active_lib_context; + /* Library input configuration & context */ struct flb_in_lib_config { int fd; /* instance input channel */ diff --git a/plugins/in_random/random.c b/plugins/in_random/random.c index 9a19b454975..ab6e59ffd6b 100644 --- a/plugins/in_random/random.c +++ b/plugins/in_random/random.c @@ -42,6 +42,7 @@ struct flb_in_random_config { /* Internal */ int samples_count; + int coll_fd; struct flb_input_instance *ins; struct flb_log_event_encoder *log_encoder; @@ -169,10 +170,25 @@ static int in_random_init(struct flb_input_instance *in, flb_free(ctx); return -1; } - + ctx->coll_fd = ret; return 0; } +static void in_random_pause(void *data, struct flb_config *config) +{ + struct flb_in_random_config *ctx = data; + + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + +} + +static void in_random_resume(void *data, struct flb_config *config) +{ + struct flb_in_random_config *ctx = data; + + flb_input_collector_resume(ctx->coll_fd, ctx->ins); +} + static int in_random_exit(void *data, struct flb_config *config) { struct flb_in_random_config *ctx = data; @@ -222,6 +238,8 @@ struct flb_input_plugin in_random_plugin = { .cb_pre_run = NULL, .cb_collect = in_random_collect, .cb_flush_buf = NULL, + .cb_pause = in_random_pause, + .cb_resume = in_random_resume, .cb_exit = in_random_exit, .config_map = config_map }; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9a4df8bdff7..f6fd8109cfd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -76,6 +76,7 @@ set(src flb_log_event_encoder_primitives.c flb_log_event_encoder_dynamic_field.c flb_processor.c + flb_reload.c ) # Config format diff --git a/src/config_format/flb_config_format.c b/src/config_format/flb_config_format.c index 1649873c08a..a62bda96ab7 100644 --- a/src/config_format/flb_config_format.c +++ b/src/config_format/flb_config_format.c @@ -706,6 +706,40 @@ static void dump_section(struct flb_cf_section *s) } } +static void dump_env(struct mk_list *list) +{ + struct mk_list *head; + struct flb_kv *kv; + + if (mk_list_size(list) == 0) { + return; + } + + printf("> env:\n"); + + mk_list_foreach(head, list) { + kv = mk_list_entry(head, struct flb_kv, _head); + printf(" - %-15s: %s\n", kv->key, kv->val); + } +} + +static void dump_metas(struct mk_list *list) +{ + struct mk_list *head; + struct flb_kv *kv; + + if (mk_list_size(list) == 0) { + return; + } + + printf("> metas:\n"); + + mk_list_foreach(head, list) { + kv = mk_list_entry(head, struct flb_kv, _head); + printf(" - %-15s: %s\n", kv->key, kv->val); + } +} + static void dump_section_list(struct mk_list *list) { struct mk_list *head; @@ -719,6 +753,8 @@ static void dump_section_list(struct mk_list *list) void flb_cf_dump(struct flb_cf *cf) { + dump_metas(&cf->metas); + dump_env(&cf->env); dump_section_list(&cf->sections); } diff --git a/src/flb_config.c b/src/flb_config.c index 378856aa16a..13f30670688 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -275,6 +276,8 @@ struct flb_config *flb_config_init() flb_slist_create(&config->stream_processor_tasks); #endif + flb_slist_create(&config->external_plugins); + /* Set default coroutines stack size */ config->coro_stack_size = FLB_CORO_STACK_SIZE_BYTE; if (config->coro_stack_size < getpagesize()) { @@ -424,6 +427,11 @@ void flb_config_exit(struct flb_config *config) flb_free(config->conf_path); } + /* conf path file (file system config path) */ + if (config->conf_path_file) { + flb_sds_destroy(config->conf_path_file); + } + /* Working directory */ if (config->workdir) { flb_free(config->workdir); @@ -489,6 +497,8 @@ void flb_config_exit(struct flb_config *config) flb_slist_destroy(&config->stream_processor_tasks); #endif + flb_slist_destroy(&config->external_plugins); + if (config->evl) { mk_event_loop_destroy(config->evl); } @@ -502,6 +512,11 @@ void flb_config_exit(struct flb_config *config) flb_cf_destroy(config->cf_main); } + /* cf_opts' lifetime should differ from config's lifetime. + * This member should be storing just for the cf_opts reference. + * Don't destroy it here. + */ + /* remove parsers */ mk_list_foreach_safe(head, tmp, &config->cf_parsers_list) { cf = mk_list_entry(head, struct flb_cf, _head); @@ -671,3 +686,234 @@ int flb_config_set_program_name(struct flb_config *config, char *name) return 0; } + +static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, enum section_type type) +{ + int ret; + char *tmp; + char *name; + char *s_type; + struct mk_list *list; + struct mk_list *head; + struct cfl_list *h_prop; + struct cfl_kvpair *kv; + struct cfl_variant *val; + struct flb_cf_section *s; + struct flb_cf_group *processors = NULL; + int i; + void *ins; + + if (type == FLB_CF_CUSTOM) { + s_type = "custom"; + list = &cf->customs; + } + else if (type == FLB_CF_INPUT) { + s_type = "input"; + list = &cf->inputs; + } + else if (type == FLB_CF_FILTER) { + s_type = "filter"; + list = &cf->filters; + } + else if (type == FLB_CF_OUTPUT) { + s_type = "output"; + list = &cf->outputs; + } + else { + return -1; + } + + mk_list_foreach(head, list) { + s = mk_list_entry(head, struct flb_cf_section, _head_section); + name = flb_cf_section_property_get_string(cf, s, "name"); + if (!name) { + flb_error("[config] section '%s' is missing the 'name' property", + s_type); + return -1; + } + + /* translate the variable */ + tmp = flb_env_var_translate(config->env, name); + + /* create an instance of the plugin */ + ins = NULL; + if (type == FLB_CF_CUSTOM) { + ins = flb_custom_new(config, tmp, NULL); + } + else if (type == FLB_CF_INPUT) { + ins = flb_input_new(config, tmp, NULL, FLB_TRUE); + } + else if (type == FLB_CF_FILTER) { + ins = flb_filter_new(config, tmp, NULL); + } + else if (type == FLB_CF_OUTPUT) { + ins = flb_output_new(config, tmp, NULL, FLB_TRUE); + } + flb_sds_destroy(tmp); + + /* validate the instance creation */ + if (!ins) { + flb_error("[config] section '%s' tried to instance a plugin name " + "that don't exists", name); + flb_sds_destroy(name); + return -1; + } + flb_sds_destroy(name); + + /* + * iterate section properties and populate instance by using specific + * api function. + */ + cfl_list_foreach(h_prop, &s->properties->list) { + kv = cfl_list_entry(h_prop, struct cfl_kvpair, _head); + if (strcasecmp(kv->key, "name") == 0) { + continue; + } + + if (type == FLB_CF_CUSTOM) { + if (kv->val->type == CFL_VARIANT_STRING) { + ret = flb_custom_set_property(ins, kv->key, kv->val->data.as_string); + } else if (kv->val->type == CFL_VARIANT_ARRAY) { + for (i = 0; i < kv->val->data.as_array->entry_count; i++) { + val = kv->val->data.as_array->entries[i]; + ret = flb_custom_set_property(ins, kv->key, val->data.as_string); + } + } + } + else if (type == FLB_CF_INPUT) { + if (kv->val->type == CFL_VARIANT_STRING) { + ret = flb_input_set_property(ins, kv->key, kv->val->data.as_string); + } else if (kv->val->type == CFL_VARIANT_ARRAY) { + for (i = 0; i < kv->val->data.as_array->entry_count; i++) { + val = kv->val->data.as_array->entries[i]; + ret = flb_input_set_property(ins, kv->key, val->data.as_string); + } + } + } + else if (type == FLB_CF_FILTER) { + if (kv->val->type == CFL_VARIANT_STRING) { + ret = flb_filter_set_property(ins, kv->key, kv->val->data.as_string); + } else if (kv->val->type == CFL_VARIANT_ARRAY) { + for (i = 0; i < kv->val->data.as_array->entry_count; i++) { + val = kv->val->data.as_array->entries[i]; + ret = flb_filter_set_property(ins, kv->key, val->data.as_string); + } + } + } + else if (type == FLB_CF_OUTPUT) { + if (kv->val->type == CFL_VARIANT_STRING) { + ret = flb_output_set_property(ins, kv->key, kv->val->data.as_string); + } else if (kv->val->type == CFL_VARIANT_ARRAY) { + for (i = 0; i < kv->val->data.as_array->entry_count; i++) { + val = kv->val->data.as_array->entries[i]; + ret = flb_output_set_property(ins, kv->key, val->data.as_string); + } + } + } + + if (ret == -1) { + flb_error("[config] could not configure property '%s' on " + "%s plugin with section name '%s'", + kv->key, s_type, name); + } + } + + /* Processors */ + processors = flb_cf_group_get(cf, s, "processors"); + if (processors) { + if (type == FLB_CF_INPUT) { + flb_processors_load_from_config_format_group(((struct flb_input_instance *) ins)->processor, processors); + } + else if (type == FLB_CF_OUTPUT) { + flb_processors_load_from_config_format_group(((struct flb_output_instance *) ins)->processor, processors); + } + else { + flb_error("[config] section '%s' does not support processors", s_type); + } + } + } + + return 0; +} +/* Load a struct flb_config_format context into a flb_config instance */ +int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf) +{ + int ret; + struct flb_kv *kv; + struct mk_list *head; + struct cfl_kvpair *ckv; + struct cfl_list *chead; + struct flb_cf_section *s; + + /* Process config environment vars */ + mk_list_foreach(head, &cf->env) { + kv = mk_list_entry(head, struct flb_kv, _head); + ret = flb_env_set(config->env, kv->key, kv->val); + if (ret == -1) { + flb_error("could not set config environment variable '%s'", kv->key); + return -1; + } + } + + /* Process all meta commands */ + mk_list_foreach(head, &cf->metas) { + kv = mk_list_entry(head, struct flb_kv, _head); + flb_meta_run(config, kv->key, kv->val); + } + + /* Validate sections */ + mk_list_foreach(head, &cf->sections) { + s = mk_list_entry(head, struct flb_cf_section, _head); + + if (strcasecmp(s->name, "env") == 0 || + strcasecmp(s->name, "service") == 0 || + strcasecmp(s->name, "custom") == 0 || + strcasecmp(s->name, "input") == 0 || + strcasecmp(s->name, "filter") == 0 || + strcasecmp(s->name, "output") == 0) { + + /* continue on valid sections */ + continue; + } + + /* Extra sanity checks */ + if (strcasecmp(s->name, "parser") == 0 || + strcasecmp(s->name, "multiline_parser") == 0) { + fprintf(stderr, + "Sections 'multiline_parser' and 'parser' are not valid in " + "the main configuration file. It belongs to \n" + "the 'parsers_file' configuration files.\n"); + return -1; + } + } + + /* Read main 'service' section */ + s = cf->service; + if (s) { + /* Iterate properties */ + cfl_list_foreach(chead, &s->properties->list) { + ckv = cfl_list_entry(chead, struct cfl_kvpair, _head); + flb_config_set_property(config, ckv->key, ckv->val->data.as_string); + } + } + + ret = configure_plugins_type(config, cf, FLB_CF_CUSTOM); + if (ret == -1) { + return -1; + } + + ret = configure_plugins_type(config, cf, FLB_CF_INPUT); + if (ret == -1) { + return -1; + } + ret = configure_plugins_type(config, cf, FLB_CF_FILTER); + if (ret == -1) { + return -1; + } + ret = configure_plugins_type(config, cf, FLB_CF_OUTPUT); + if (ret == -1) { + return -1; + } + + return 0; +} diff --git a/src/flb_custom.c b/src/flb_custom.c index 6029bff4892..d5c18823ed8 100644 --- a/src/flb_custom.c +++ b/src/flb_custom.c @@ -192,13 +192,47 @@ const char *flb_custom_name(struct flb_custom_instance *ins) return ins->name; } +int flb_custom_plugin_property_check(struct flb_custom_instance *ins, + struct flb_config *config) +{ + int ret = 0; + struct mk_list *config_map; + struct flb_custom_plugin *p = ins->p; + + if (p->config_map) { + /* + * Create a dynamic version of the configmap that will be used by the specific + * instance in question. + */ + config_map = flb_config_map_create(config, p->config_map); + if (!config_map) { + flb_error("[custom] error loading config map for '%s' plugin", + p->name); + return -1; + } + ins->config_map = config_map; + + /* Validate incoming properties against config map */ + ret = flb_config_map_properties_check(ins->p->name, + &ins->properties, ins->config_map); + if (ret == -1) { + if (config->program_name) { + flb_helper("try the command: %s -F %s -h\n", + config->program_name, ins->p->name); + } + return -1; + } + } + + return 0; +} + /* Initialize all custom plugins */ int flb_custom_init_all(struct flb_config *config) { int ret; struct mk_list *tmp; struct mk_list *head; - struct mk_list *config_map; struct flb_custom_plugin *p; struct flb_custom_instance *ins; @@ -226,30 +260,9 @@ int flb_custom_init_all(struct flb_config *config) * Before to call the initialization callback, make sure that the received * configuration parameters are valid if the plugin is registering a config map. */ - if (p->config_map) { - /* - * Create a dynamic version of the configmap that will be used by the specific - * instance in question. - */ - config_map = flb_config_map_create(config, p->config_map); - if (!config_map) { - flb_error("[custom] error loading config map for '%s' plugin", - p->name); - return -1; - } - ins->config_map = config_map; - - /* Validate incoming properties against config map */ - ret = flb_config_map_properties_check(ins->p->name, - &ins->properties, ins->config_map); - if (ret == -1) { - if (config->program_name) { - flb_helper("try the command: %s -F %s -h\n", - config->program_name, ins->p->name); - } - flb_custom_instance_destroy(ins); - return -1; - } + if (flb_custom_plugin_property_check(ins, config) == -1) { + flb_custom_instance_destroy(ins); + return -1; } /* Initialize the input */ diff --git a/src/flb_filter.c b/src/flb_filter.c index 5621506f339..f75176b2e45 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -417,19 +417,62 @@ const char *flb_filter_name(struct flb_filter_instance *ins) return ins->name; } -int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins) +int flb_filter_plugin_property_check(struct flb_filter_instance *ins, + struct flb_config *config) { - int ret; - uint64_t ts; - char *name; + int ret = 0; struct mk_list *config_map; - struct flb_filter_plugin *p; + struct flb_filter_plugin *p = ins->p; + + if (p->config_map) { + /* + * Create a dynamic version of the configmap that will be used by the specific + * instance in question. + */ + config_map = flb_config_map_create(config, p->config_map); + if (!config_map) { + flb_error("[filter] error loading config map for '%s' plugin", + p->name); + return -1; + } + ins->config_map = config_map; + /* Validate incoming properties against config map */ + ret = flb_config_map_properties_check(ins->p->name, + &ins->properties, ins->config_map); + if (ret == -1) { + if (config->program_name) { + flb_helper("try the command: %s -F %s -h\n", + config->program_name, ins->p->name); + } + return -1; + } + } + + return 0; +} + +int flb_filter_match_property_existence(struct flb_filter_instance *ins) +{ if (!ins->match #ifdef FLB_HAVE_REGEX - && !ins->match_regex + && !ins->match_regex #endif ) { + return FLB_FALSE; + } + + return FLB_TRUE; +} + +int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins) +{ + int ret; + uint64_t ts; + char *name; + struct flb_filter_plugin *p; + + if (flb_filter_match_property_existence(ins) == FLB_FALSE) { flb_warn("[filter] NO match rule for %s filter instance, unloading.", ins->name); return -1; @@ -507,29 +550,8 @@ int flb_filter_init(struct flb_config *config, struct flb_filter_instance *ins) * Before to call the initialization callback, make sure that the received * configuration parameters are valid if the plugin is registering a config map. */ - if (p->config_map) { - /* - * Create a dynamic version of the configmap that will be used by the specific - * instance in question. - */ - config_map = flb_config_map_create(config, p->config_map); - if (!config_map) { - flb_error("[filter] error loading config map for '%s' plugin", - p->name); - return -1; - } - ins->config_map = config_map; - - /* Validate incoming properties against config map */ - ret = flb_config_map_properties_check(ins->p->name, - &ins->properties, ins->config_map); - if (ret == -1) { - if (config->program_name) { - flb_helper("try the command: %s -F %s -h\n", - config->program_name, ins->p->name); - } - return -1; - } + if (flb_filter_plugin_property_check(ins, config) == -1) { + return -1; } /* Initialize the input */ diff --git a/src/flb_input.c b/src/flb_input.c index 790b058cba0..a8a79e4d426 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -847,12 +847,79 @@ static int input_instance_channel_events_init(struct flb_input_instance *ins) return 0; } +int flb_input_net_property_check(struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret = 0; + + /* Get Downstream net_setup configmap */ + ins->net_config_map = flb_downstream_get_config_map(config); + if (!ins->net_config_map) { + flb_input_instance_destroy(ins); + return -1; + } + + /* + * Validate 'net.*' properties: if the plugin use the Downstream interface, + * it might receive some networking settings. + */ + if (mk_list_size(&ins->net_properties) > 0) { + ret = flb_config_map_properties_check(ins->p->name, + &ins->net_properties, + ins->net_config_map); + if (ret == -1) { + if (config->program_name) { + flb_helper("try the command: %s -i %s -h\n", + config->program_name, ins->p->name); + } + return -1; + } + } + + return 0; +} + +int flb_input_plugin_property_check(struct flb_input_instance *ins, + struct flb_config *config) +{ + int ret = 0; + struct mk_list *config_map; + struct flb_input_plugin *p = ins->p; + + if (p->config_map) { + /* + * Create a dynamic version of the configmap that will be used by the specific + * instance in question. + */ + config_map = flb_config_map_create(config, p->config_map); + if (!config_map) { + flb_error("[input] error loading config map for '%s' plugin", + p->name); + flb_input_instance_destroy(ins); + return -1; + } + ins->config_map = config_map; + + /* Validate incoming properties against config map */ + ret = flb_config_map_properties_check(ins->p->name, + &ins->properties, ins->config_map); + if (ret == -1) { + if (config->program_name) { + flb_helper("try the command: %s -i %s -h\n", + config->program_name, ins->p->name); + } + return -1; + } + } + + return 0; +} + int flb_input_instance_init(struct flb_input_instance *ins, struct flb_config *config) { int ret; struct flb_config *ctx = ins->config; - struct mk_list *config_map; struct flb_input_plugin *p = ins->p; int tls_session_mode; @@ -1000,31 +1067,8 @@ int flb_input_instance_init(struct flb_input_instance *ins, * Before to call the initialization callback, make sure that the received * configuration parameters are valid if the plugin is registering a config map. */ - if (p->config_map) { - /* - * Create a dynamic version of the configmap that will be used by the specific - * instance in question. - */ - config_map = flb_config_map_create(config, p->config_map); - if (!config_map) { - flb_error("[input] error loading config map for '%s' plugin", - p->name); - flb_input_instance_destroy(ins); - return -1; - } - ins->config_map = config_map; - - /* Validate incoming properties against config map */ - ret = flb_config_map_properties_check(ins->p->name, - &ins->properties, ins->config_map); - if (ret == -1) { - if (config->program_name) { - flb_helper("try the command: %s -i %s -h\n", - config->program_name, ins->p->name); - } - flb_input_instance_destroy(ins); - return -1; - } + if (flb_input_plugin_property_check(ins, config) == -1) { + return -1; } #ifdef FLB_HAVE_TLS @@ -1035,8 +1079,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, "(certificate file missing)", ins->name); - flb_input_instance_destroy(ins); - return -1; } else if (ins->tls_key_file == NULL) { @@ -1044,8 +1086,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, "(private key file missing)", ins->name); - flb_input_instance_destroy(ins); - return -1; } @@ -1069,8 +1109,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, flb_error("[input %s] error initializing TLS context", ins->name); - flb_input_instance_destroy(ins); - return -1; } } @@ -1081,8 +1119,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, ins->tls_config_map = flb_tls_get_config_map(config); if (ins->tls_config_map == NULL) { - flb_input_instance_destroy(ins); - return -1; } @@ -1099,31 +1135,10 @@ int flb_input_instance_init(struct flb_input_instance *ins, /* Init network defaults */ flb_net_setup_init(&ins->net_setup); - /* Get Downstream net_setup configmap */ - ins->net_config_map = flb_downstream_get_config_map(config); - if (!ins->net_config_map) { - flb_input_instance_destroy(ins); + if (flb_input_net_property_check(ins, config) == -1) { return -1; } - /* - * Validate 'net.*' properties: if the plugin use the Downstream interface, - * it might receive some networking settings. - */ - if (mk_list_size(&ins->net_properties) > 0) { - ret = flb_config_map_properties_check(ins->p->name, - &ins->net_properties, - ins->net_config_map); - if (ret == -1) { - if (config->program_name) { - flb_helper("try the command: %s -i %s -h\n", - config->program_name, ins->p->name); - } - flb_input_instance_destroy(ins); - return -1; - } - } - /* Initialize the input */ if (p->cb_init) { flb_plg_info(ins, "initializing"); @@ -1143,7 +1158,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, if (ret != 0) { flb_error("failed initialize input %s", ins->name); - flb_input_instance_destroy(ins); return -1; } @@ -1152,7 +1166,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, if (ret != 0) { flb_error("failed initialize channel events on input %s", ins->name); - flb_input_instance_destroy(ins); return -1; } @@ -1161,7 +1174,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, if (ret) { flb_error("failed while registering ring buffer events on input %s", ins->name); - flb_input_instance_destroy(ins); return -1; } } @@ -1176,7 +1188,6 @@ int flb_input_instance_init(struct flb_input_instance *ins, if (ret != 0) { flb_error("failed initialize input %s", ins->name); - flb_input_instance_destroy(ins); return -1; } } @@ -1228,7 +1239,7 @@ int flb_input_init_all(struct flb_config *config) /* Initialize instance */ ret = flb_input_instance_init(ins, config); if (ret == -1) { - /* do nothing, it's ok if it fails */ + flb_input_instance_destroy(ins); return -1; } } diff --git a/src/flb_lib.c b/src/flb_lib.c index 996816e2393..10e83850df7 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -51,6 +51,12 @@ struct flb_aws_error_reporter *error_reporter; /* thread initializator */ static pthread_once_t flb_lib_once = PTHREAD_ONCE_INIT; +/* reference to the last 'flb_lib_ctx' context started through flb_start() */ +FLB_TLS_DEFINE(flb_ctx_t, flb_lib_active_context); + +/* reference to the last 'flb_cf' context started through flb_start() */ +FLB_TLS_DEFINE(struct flb_cf, flb_lib_active_cf_context); + #ifdef FLB_SYSTEM_WINDOWS static inline int flb_socket_init_win32(void) { @@ -122,6 +128,9 @@ void flb_init_env() flb_downstream_init(); flb_output_prepare(); + FLB_TLS_INIT(flb_lib_active_context); + FLB_TLS_INIT(flb_lib_active_cf_context); + /* libraries */ cmt_initialize(); } @@ -196,7 +205,7 @@ flb_ctx_t *flb_create() ctx->event_channel); if (ret != 0) { flb_error("[lib] could not create notification channels"); - flb_config_exit(ctx->config); + flb_stop(ctx); flb_destroy(ctx); return NULL; } @@ -226,7 +235,7 @@ void flb_destroy(flb_ctx_t *ctx) mk_event_loop_destroy(ctx->event_loop); /* cfg->is_running is set to false when flb_engine_shutdown has been invoked (event loop) */ - if(ctx->config) { + if (ctx->config) { if (ctx->config->is_running == FLB_TRUE) { flb_engine_shutdown(ctx->config); } @@ -599,7 +608,6 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len) int ret; struct flb_input_instance *i_ins; - if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { flb_error("[lib] cannot push data, engine is not running"); return -1; @@ -657,6 +665,12 @@ int flb_start(flb_ctx_t *ctx) pthread_once(&flb_lib_once, flb_init_env); + flb_debug("[lib] context set: %p", ctx); + + /* set context as the last active one */ + flb_context_set(ctx); + + /* spawn worker thread */ config = ctx->config; ret = mk_utils_worker_spawn(flb_lib_worker, ctx, &tid); if (ret == -1) { @@ -692,6 +706,9 @@ int flb_start(flb_ctx_t *ctx) ctx->status = FLB_LIB_ERROR; return -1; } + else { + flb_error("[lib] other error"); + } } return 0; @@ -711,6 +728,8 @@ int flb_stop(flb_ctx_t *ctx) int ret; pthread_t tid; + flb_debug("[lib] ctx stop address: %p, config context=%p\n", ctx, ctx->config); + tid = ctx->config->worker; if (ctx->status == FLB_LIB_NONE || ctx->status == FLB_LIB_ERROR) { @@ -748,3 +767,30 @@ int flb_stop(flb_ctx_t *ctx) return ret; } + + +void flb_context_set(flb_ctx_t *ctx) +{ + FLB_TLS_SET(flb_lib_active_context, ctx); +} + +flb_ctx_t *flb_context_get() +{ + flb_ctx_t *ctx; + + ctx = FLB_TLS_GET(flb_lib_active_context); + return ctx; +} + +void flb_cf_context_set(struct flb_cf *cf) +{ + FLB_TLS_SET(flb_lib_active_cf_context, cf); +} + +struct flb_cf *flb_cf_context_get() +{ + struct flb_cf *cf; + + cf = FLB_TLS_GET(flb_lib_active_cf_context); + return cf; +} diff --git a/src/flb_output.c b/src/flb_output.c index 62aa60d7029..d892dfa7402 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -985,6 +985,74 @@ void *flb_output_get_cmt_instance(struct flb_output_instance *ins) } #endif +int flb_output_net_property_check(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret = 0; + + /* Get Upstream net_setup configmap */ + ins->net_config_map = flb_upstream_get_config_map(config); + if (!ins->net_config_map) { + flb_output_instance_destroy(ins); + return -1; + } + + /* + * Validate 'net.*' properties: if the plugin use the Upstream interface, + * it might receive some networking settings. + */ + if (mk_list_size(&ins->net_properties) > 0) { + ret = flb_config_map_properties_check(ins->p->name, + &ins->net_properties, + ins->net_config_map); + if (ret == -1) { + if (config->program_name) { + flb_helper("try the command: %s -o %s -h\n", + config->program_name, ins->p->name); + } + return -1; + } + } + + return 0; +} + +int flb_output_plugin_property_check(struct flb_output_instance *ins, + struct flb_config *config) +{ + int ret = 0; + struct mk_list *config_map; + struct flb_output_plugin *p = ins->p; + + if (p->config_map) { + /* + * Create a dynamic version of the configmap that will be used by the specific + * instance in question. + */ + config_map = flb_config_map_create(config, p->config_map); + if (!config_map) { + flb_error("[output] error loading config map for '%s' plugin", + p->name); + flb_output_instance_destroy(ins); + return -1; + } + ins->config_map = config_map; + + /* Validate incoming properties against config map */ + ret = flb_config_map_properties_check(ins->p->name, + &ins->properties, ins->config_map); + if (ret == -1) { + if (config->program_name) { + flb_helper("try the command: %s -o %s -h\n", + config->program_name, ins->p->name); + } + return -1; + } + } + + return 0; +} + /* Trigger the output plugins setup callbacks to prepare them. */ int flb_output_init_all(struct flb_config *config) { @@ -994,7 +1062,6 @@ int flb_output_init_all(struct flb_config *config) #endif struct mk_list *tmp; struct mk_list *head; - struct mk_list *config_map; struct flb_output_instance *ins; struct flb_output_plugin *p; uint64_t ts; @@ -1167,39 +1234,7 @@ int flb_output_init_all(struct flb_config *config) * Before to call the initialization callback, make sure that the received * configuration parameters are valid if the plugin is registering a config map. */ - if (p->config_map) { - /* - * Create a dynamic version of the configmap that will be used by the specific - * instance in question. - */ - config_map = flb_config_map_create(config, p->config_map); - if (!config_map) { - flb_error("[output] error loading config map for '%s' plugin", - p->name); - flb_output_instance_destroy(ins); - return -1; - } - ins->config_map = config_map; - - /* Validate incoming properties against config map */ - ret = flb_config_map_properties_check(ins->p->name, - &ins->properties, ins->config_map); - if (ret == -1) { - if (config->program_name) { - flb_helper("try the command: %s -o %s -h\n", - config->program_name, ins->p->name); - } - flb_output_instance_destroy(ins); - return -1; - } - } - - /* Init network defaults */ - flb_net_setup_init(&ins->net_setup); - - /* Get Upstream net_setup configmap */ - ins->net_config_map = flb_upstream_get_config_map(config); - if (!ins->net_config_map) { + if (flb_output_plugin_property_check(ins, config) == -1) { flb_output_instance_destroy(ins); return -1; } @@ -1223,22 +1258,13 @@ int flb_output_init_all(struct flb_config *config) m->value.val.boolean = FLB_FALSE; } #endif - /* - * Validate 'net.*' properties: if the plugin use the Upstream interface, - * it might receive some networking settings. - */ - if (mk_list_size(&ins->net_properties) > 0) { - ret = flb_config_map_properties_check(ins->p->name, - &ins->net_properties, - ins->net_config_map); - if (ret == -1) { - if (config->program_name) { - flb_helper("try the command: %s -o %s -h\n", - config->program_name, ins->p->name); - } - flb_output_instance_destroy(ins); - return -1; - } + + /* Init network defaults */ + flb_net_setup_init(&ins->net_setup); + + if (flb_output_net_property_check(ins, config) == -1) { + flb_output_instance_destroy(ins); + return -1; } /* Initialize plugin through it 'init callback' */ diff --git a/src/flb_parser.c b/src/flb_parser.c index a080b81fe7d..4ccecc91b1d 100644 --- a/src/flb_parser.c +++ b/src/flb_parser.c @@ -873,16 +873,11 @@ static int multiline_parser_conf_file(const char *cfg, struct flb_cf *cf, return -1; } -/* Load parsers from a configuration file */ -int flb_parser_conf_file(const char *file, struct flb_config *config) +int flb_parser_conf_file_stat(const char *file, struct flb_config *config) { int ret; - char tmp[PATH_MAX + 1]; - char *cfg = NULL; struct stat st; - struct flb_cf *cf = NULL; -#ifndef FLB_HAVE_STATIC_CONF ret = stat(file, &st); if (ret == -1 && errno == ENOENT) { /* Try to resolve the real path (if exists) */ @@ -892,9 +887,32 @@ int flb_parser_conf_file(const char *file, struct flb_config *config) } if (config->conf_path) { - snprintf(tmp, PATH_MAX, "%s%s", config->conf_path, file); - cfg = tmp; + /* Handle as special case here. */ + return -2; } + + return -1; + } + + return 0; +} + +/* Load parsers from a configuration file */ +int flb_parser_conf_file(const char *file, struct flb_config *config) +{ + int ret; + char tmp[PATH_MAX + 1]; + char *cfg = NULL; + struct flb_cf *cf = NULL; + +#ifndef FLB_HAVE_STATIC_CONF + ret = flb_parser_conf_file_stat(file, config); + if (ret == -1) { + return -1; + } + else if (ret == -2) { + snprintf(tmp, PATH_MAX, "%s%s", config->conf_path, file); + cfg = tmp; } else { cfg = (char *) file; diff --git a/src/flb_reload.c b/src/flb_reload.c new file mode 100644 index 00000000000..3ed5b3d9f8a --- /dev/null +++ b/src/flb_reload.c @@ -0,0 +1,507 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +static int flb_input_propery_check_all(struct flb_config *config) +{ + int ret; + struct mk_list *tmp; + struct mk_list *head; + struct flb_input_instance *ins; + struct flb_input_plugin *p; + + /* Iterate all active input instance plugins */ + mk_list_foreach_safe(head, tmp, &config->inputs) { + ins = mk_list_entry(head, struct flb_input_instance, _head); + p = ins->p; + + /* Skip pseudo input plugins */ + if (!p) { + continue; + } + + /* Check net property */ + ret = flb_input_net_property_check(ins, config); + if (ret == -1) { + return -1; + } + + /* Check plugin property */ + ret = flb_input_plugin_property_check(ins, config); + if (ret == -1) { + return -1; + } + + /* destroy net config map (will be recreated at flb_start) */ + if (ins->net_config_map) { + flb_config_map_destroy(ins->net_config_map); + ins->net_config_map = NULL; + } + + /* destroy config map (will be recreated at flb_start) */ + if (ins->config_map) { + flb_config_map_destroy(ins->config_map); + ins->config_map = NULL; + } + } + + return 0; +} + +static int flb_output_propery_check_all(struct flb_config *config) +{ + int ret; + struct mk_list *tmp; + struct mk_list *head; + struct flb_output_instance *ins; + + /* Iterate all active input instance plugins */ + mk_list_foreach_safe(head, tmp, &config->outputs) { + ins = mk_list_entry(head, struct flb_output_instance, _head); + + /* Check net property */ + ret = flb_output_net_property_check(ins, config); + if (ret == -1) { + return -1; + } + + /* Check plugin property */ + ret = flb_output_plugin_property_check(ins, config); + if (ret == -1) { + return -1; + } + + /* destroy net config map (will be recreated at flb_start) */ + if (ins->net_config_map) { + flb_config_map_destroy(ins->net_config_map); + ins->net_config_map = NULL; + } + + /* destroy config map (will be recreated at flb_start) */ + if (ins->config_map) { + flb_config_map_destroy(ins->config_map); + ins->config_map = NULL; + } + } + + return 0; +} + +static int flb_filter_propery_check_all(struct flb_config *config) +{ + int ret; + struct mk_list *tmp; + struct mk_list *head; + struct flb_filter_instance *ins; + + /* Iterate all active input instance plugins */ + mk_list_foreach_safe(head, tmp, &config->filters) { + ins = mk_list_entry(head, struct flb_filter_instance, _head); + + if (flb_filter_match_property_existence(ins) == FLB_FALSE) { + flb_error("[filter] NO match rule for %s filter instance, halting to reload.", + ins->name); + return -1; + } + + /* Check plugin property */ + ret = flb_filter_plugin_property_check(ins, config); + if (ret == -1) { + return -1; + } + + /* destroy config map (will be recreated at flb_start) */ + if (ins->config_map) { + flb_config_map_destroy(ins->config_map); + ins->config_map = NULL; + } + } + + return 0; +} + +static int flb_custom_propery_check_all(struct flb_config *config) +{ + int ret; + struct mk_list *tmp; + struct mk_list *head; + struct flb_custom_instance *ins; + + /* Iterate all active input instance plugins */ + mk_list_foreach_safe(head, tmp, &config->customs) { + ins = mk_list_entry(head, struct flb_custom_instance, _head); + + /* Check plugin property */ + ret = flb_custom_plugin_property_check(ins, config); + if (ret == -1) { + return -1; + } + + /* destroy config map (will be recreated at flb_start) */ + if (ins->config_map) { + flb_config_map_destroy(ins->config_map); + ins->config_map = NULL; + } + } + + return 0; +} + +int flb_reload_property_check_all(struct flb_config *config) +{ + int ret = 0; + + /* Check properties of custom plugins */ + ret = flb_custom_propery_check_all(config); + if (ret == -1) { + flb_error("[reload] check properties for custom plugins is failed"); + + return -1; + } + + /* Check properties of input plugins */ + ret = flb_input_propery_check_all(config); + if (ret == -1) { + flb_error("[reload] check properties for input plugins is failed"); + + return -1; + } + + /* Check properties of filter plugins */ + ret = flb_filter_propery_check_all(config); + if (ret == -1) { + flb_error("[reload] check properties for filter plugins is failed"); + + return -1; + } + + /* Check properties of output plugins */ + ret = flb_output_propery_check_all(config); + if (ret == -1) { + flb_error("[reload] check properties for output plugins is failed"); + + return -1; + } + + return 0; +} + +/* + * Hot reload + * ---------- + * Reload a Fluent Bit instance by using a new 'config_format' context. + * + * 1. As a first step, the config format is validated against the 'config maps', + * this will check that all configuration properties are valid. + */ + +static int recreate_cf_section(struct flb_cf_section *s, struct flb_cf *cf) +{ + struct mk_list *head; + struct cfl_list *p_head; + struct cfl_kvpair *kv; + struct flb_cf_group *g; + struct flb_cf_section *new_s; + struct flb_cf_group *new_g; + struct cfl_variant *var = NULL; + + new_s = flb_cf_section_create(cf, s->name, flb_sds_len(s->name)); + if (cfl_list_size(&s->properties->list) > 0) { + cfl_list_foreach(p_head, &s->properties->list) { + var = NULL; + kv = cfl_list_entry(p_head, struct cfl_kvpair, _head); + var = flb_cf_section_property_add(cf, new_s->properties, + kv->key, cfl_sds_len(kv->key), + kv->val->data.as_string, cfl_sds_len(kv->val->data.as_string)); + + if (var == NULL) { + flb_error("[reload] recreating section '%s' property '%s' is failed", s->name, kv->key); + return -1; + } + } + } + + if (mk_list_size(&s->groups) <= 0) { + return 0; + } + + mk_list_foreach(head, &s->groups) { + g = mk_list_entry(head, struct flb_cf_group, _head); + new_g = flb_cf_group_create(cf, new_s, g->name, flb_sds_len(g->name)); + + if (cfl_list_size(&g->properties->list) > 0) { + cfl_list_foreach(p_head, &g->properties->list) { + var = NULL; + kv = cfl_list_entry(p_head, struct cfl_kvpair, _head); + var = flb_cf_section_property_add(cf, new_g->properties, + kv->key, cfl_sds_len(kv->key), + kv->val->data.as_string, cfl_sds_len(kv->val->data.as_string)); + if (var == NULL) { + flb_error("[reload] recreating group '%s' property '%s' is failed", g->name, kv->key); + return -1; + } + } + } + } + + return 0; +} + +int flb_reload_reconstruct_cf(struct flb_cf *src_cf, struct flb_cf *dest_cf) +{ + struct mk_list *head; + struct flb_cf_section *s; + struct flb_kv *kv; + + mk_list_foreach(head, &src_cf->sections) { + s = mk_list_entry(head, struct flb_cf_section, _head); + if (recreate_cf_section(s, dest_cf) != 0) { + return -1; + } + } + + /* Copy and store env. (For yaml cf.) */ + mk_list_foreach(head, &src_cf->env) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (!flb_cf_env_property_add(dest_cf, + kv->key, cfl_sds_len(kv->key), + kv->val, cfl_sds_len(kv->val))) { + return -1; + } + + } + + /* Copy and store metas. (For old fluent-bit cf.) */ + mk_list_foreach(head, &src_cf->metas) { + kv = mk_list_entry(head, struct flb_kv, _head); + if (!flb_kv_item_create_len(&dest_cf->metas, + kv->key, cfl_sds_len(kv->key), + kv->val, cfl_sds_len(kv->val))) { + return -1; + } + + } + + return 0; +} + +#ifdef FLB_HAVE_STREAM_PROCESSOR +static int flb_reload_reconstruct_sp(struct flb_config *src, struct flb_config *dest) +{ + struct mk_list *head; + struct flb_slist_entry *e; + + /* Check for pre-configured Tasks (command line) */ + mk_list_foreach(head, &src->stream_processor_tasks) { + e = mk_list_entry(head, struct flb_slist_entry, _head); + flb_slist_add(&dest->stream_processor_tasks, e->str); + } + + return 0; +} +#endif + +static int flb_reload_reinstantiate_external_plugins(struct flb_config *src, struct flb_config *dest) +{ + int ret; + struct mk_list *head; + struct flb_slist_entry *e; + + /* Check for pre-configured Tasks (command line) */ + mk_list_foreach(head, &src->external_plugins) { + e = mk_list_entry(head, struct flb_slist_entry, _head); + flb_info("[reload] slist externals %s", e->str); + /* Load the new config format context to config context. */ + ret = flb_plugin_load_router(e->str, dest); + if (ret != 0) { + return -1; + } + flb_slist_add(&dest->external_plugins, e->str); + } + + return 0; +} + +int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) +{ + int ret; + flb_sds_t file = NULL; + struct flb_config *old_config; + struct flb_config *new_config; + flb_ctx_t *new_ctx = NULL; + struct flb_cf *new_cf; + struct flb_cf *original_cf; + int verbose; + int enable_reloading; + + if (ctx == NULL) { + flb_error("[reload] given flb context is NULL"); + return -2; + } + + old_config = ctx->config; + if (old_config->enable_hot_reload != FLB_TRUE) { + flb_warn("[reload] hot reloading is not enabled"); + return -3; + } + + /* Normally, we should create a service section before using this cf + * context. However, this context of config format will be used + * for copying contents from other one. So, we just need to create + * a new cf instance here. + */ + new_cf = flb_cf_create(); + if (!new_cf) { + return -1; + } + + flb_info("reloading instance pid=%lu tid=%u", getpid(), pthread_self()); + + if (old_config->conf_path_file) { + file = flb_sds_create(old_config->conf_path_file); + } + if (cf_opts != NULL) { + if (flb_reload_reconstruct_cf(cf_opts, new_cf) != 0) { + if (file != NULL) { + flb_sds_destroy(file); + } + flb_error("[reload] reconstruct cf failed"); + return -1; + } + } + + /* Create another instance */ + new_ctx = flb_create(); + if (new_ctx == NULL) { + if (file != NULL) { + flb_sds_destroy(file); + } + flb_cf_destroy(new_cf); + flb_error("[reload] creating flb context is failed. Reloading is halted"); + + return -1; + } + + new_config = new_ctx->config; + + /* Inherit verbose from the old ctx instance */ + verbose = ctx->config->verbose; + new_config->verbose = verbose; + enable_reloading = ctx->config->enable_hot_reload; + new_config->enable_hot_reload = enable_reloading; + +#ifdef FLB_HAVE_STREAM_PROCESSOR + /* Inherit stream processor definitions from command line */ + flb_reload_reconstruct_sp(old_config, new_config); +#endif + + /* Create another config format context */ + if (file != NULL) { + new_cf = flb_cf_create_from_file(new_cf, file); + + if (!new_cf) { + flb_sds_destroy(file); + + return -1; + } + } + + /* Load external plugins via command line */ + if (mk_list_size(&old_config->external_plugins) > 0) { + ret = flb_reload_reinstantiate_external_plugins(old_config, new_config); + if (ret == -1) { + if (file != NULL) { + flb_sds_destroy(file); + } + flb_cf_destroy(new_cf); + flb_stop(new_ctx); + flb_destroy(new_ctx); + flb_error("[reload] reloaded config is invalid. Reloading is halted"); + + return -1; + } + } + + /* Load the new config format context to config context. */ + ret = flb_config_load_config_format(new_config, new_cf); + if (ret != 0) { + flb_sds_destroy(file); + flb_cf_destroy(new_cf); + flb_stop(new_ctx); + flb_destroy(new_ctx); + + flb_error("[reload] reloaded config format is invalid. Reloading is halted"); + + return -1; + } + + /* Validate plugin properites before fluent-bit stops the old context. */ + ret = flb_reload_property_check_all(new_config); + if (ret != 0) { + flb_sds_destroy(file); + flb_cf_destroy(new_cf); + flb_stop(new_ctx); + flb_destroy(new_ctx); + + flb_error("[reload] reloaded config is invalid. Reloading is halted"); + + return -1; + } + + /* Delete the original context of config format before replacing + * with the new one. */ + original_cf = new_config->cf_main; + flb_cf_destroy(original_cf); + + new_config->cf_main = new_cf; + new_config->cf_opts = cf_opts; + + if (file != NULL) { + new_config->conf_path_file = file; + } + + flb_info("[reload] stop everything of the old context"); + flb_stop(ctx); + flb_destroy(ctx); + + flb_info("[reload] start everything"); + + ret = flb_start(new_ctx); + + return 0; +} diff --git a/src/fluent-bit.c b/src/fluent-bit.c index c56a750ca21..e1faea8e4b2 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -56,6 +56,7 @@ #include #include #include +#include #include #ifdef FLB_HAVE_MTRACE @@ -70,6 +71,7 @@ extern void win32_started(void); flb_ctx_t *ctx; struct flb_config *config; volatile sig_atomic_t exit_signal = 0; +volatile sig_atomic_t flb_bin_restarting = 0; #ifdef FLB_HAVE_LIBBACKTRACE struct flb_stacktrace flb_st; @@ -94,6 +96,8 @@ struct flb_stacktrace flb_st; static char *prog_name; +static void flb_signal_init(); + static void flb_help(int rc, struct flb_config *config) { struct mk_list *head; @@ -143,6 +147,7 @@ static void flb_help(int rc, struct flb_config *config) config->coro_stack_size); print_opt("-q, --quiet", "quiet mode"); print_opt("-S, --sosreport", "support report for Enterprise customers"); + print_opt("-Y, --enable-hot-reload", "enable for hot reloading"); print_opt("-V, --version", "show version number"); print_opt("-h, --help", "print this help"); @@ -495,6 +500,8 @@ static void flb_signal_exit(int signal) char s[] = "[engine] caught signal ("; time_t now; struct tm *cur; + flb_ctx_t *ctx = flb_context_get(); + struct flb_cf *cf_opts = flb_cf_context_get(); now = time(NULL); cur = localtime(&now); @@ -529,6 +536,9 @@ static void flb_signal_exit(int signal) case SIGQUIT: case SIGHUP: #endif + if (cf_opts != NULL) { + flb_cf_destroy(cf_opts); + } flb_stop(ctx); flb_destroy(ctx); _exit(EXIT_SUCCESS); @@ -544,6 +554,8 @@ static void flb_signal_handler(int signal) char s[] = "[engine] caught signal ("; time_t now; struct tm *cur; + flb_ctx_t *ctx = flb_context_get(); + struct flb_cf *cf_opts = flb_cf_context_get(); now = time(NULL); cur = localtime(&now); @@ -570,6 +582,8 @@ static void flb_signal_handler(int signal) flb_print_signal(SIGFPE); }; + flb_signal_init(); + switch(signal) { case SIGSEGV: case SIGFPE: @@ -581,6 +595,13 @@ static void flb_signal_handler(int signal) #ifndef FLB_SYSTEM_WINDOWS case SIGCONT: flb_dump(ctx->config); + break; + case SIGHUP: +#ifndef FLB_HAVE_STATIC_CONF + /* reload by using same config files/path */ + flb_reload(ctx, cf_opts); + break; +#endif #endif } } @@ -590,7 +611,7 @@ static void flb_signal_init() signal(SIGINT, &flb_signal_handler_break_loop); #ifndef FLB_SYSTEM_WINDOWS signal(SIGQUIT, &flb_signal_handler_break_loop); - signal(SIGHUP, &flb_signal_handler_break_loop); + signal(SIGHUP, &flb_signal_handler); signal(SIGCONT, &flb_signal_handler); #endif signal(SIGTERM, &flb_signal_handler_break_loop); @@ -650,169 +671,17 @@ static int flb_service_conf_path_set(struct flb_config *config, char *file) config->conf_path = flb_strdup(path); free(path); - return 0; -} - -static int service_configure_plugin(struct flb_config *config, - struct flb_cf *cf, enum section_type type) -{ - int ret; - char *tmp; - char *name; - char *s_type; - struct mk_list *list; - struct mk_list *head; - struct cfl_list *h_prop; - struct cfl_kvpair *kv; - struct cfl_variant *val; - struct flb_cf_section *s; - struct flb_cf_group *processors = NULL; - int i; - void *ins; - - if (type == FLB_CF_CUSTOM) { - s_type = "custom"; - list = &cf->customs; - } - else if (type == FLB_CF_INPUT) { - s_type = "input"; - list = &cf->inputs; - } - else if (type == FLB_CF_FILTER) { - s_type = "filter"; - list = &cf->filters; - } - else if (type == FLB_CF_OUTPUT) { - s_type = "output"; - list = &cf->outputs; - } - else { - return -1; - } - - mk_list_foreach(head, list) { - s = mk_list_entry(head, struct flb_cf_section, _head_section); - name = flb_cf_section_property_get_string(cf, s, "name"); - if (!name) { - flb_error("[config] section '%s' is missing the 'name' property", - s_type); - return -1; - } + /* Store the relative file path */ + config->conf_path_file = flb_sds_create(file); - /* translate the variable */ - tmp = flb_env_var_translate(config->env, name); - - /* create an instance of the plugin */ - ins = NULL; - if (type == FLB_CF_CUSTOM) { - ins = flb_custom_new(config, tmp, NULL); - } - else if (type == FLB_CF_INPUT) { - ins = flb_input_new(config, tmp, NULL, FLB_TRUE); - } - else if (type == FLB_CF_FILTER) { - ins = flb_filter_new(config, tmp, NULL); - } - else if (type == FLB_CF_OUTPUT) { - ins = flb_output_new(config, tmp, NULL, FLB_TRUE); - } - flb_sds_destroy(tmp); - - /* validate the instance creation */ - if (!ins) { - flb_error("[config] section '%s' tried to instance a plugin name " - "that don't exists", name); - flb_sds_destroy(name); - return -1; - } - flb_sds_destroy(name); - - /* - * iterate section properties and populate instance by using specific - * api function. - */ - cfl_list_foreach(h_prop, &s->properties->list) { - kv = cfl_list_entry(h_prop, struct cfl_kvpair, _head); - if (strcasecmp(kv->key, "name") == 0) { - continue; - } - - if (type == FLB_CF_CUSTOM) { - if (kv->val->type == CFL_VARIANT_STRING) { - ret = flb_custom_set_property(ins, kv->key, kv->val->data.as_string); - } - else if (kv->val->type == CFL_VARIANT_ARRAY) { - for (i = 0; i < kv->val->data.as_array->entry_count; i++) { - val = kv->val->data.as_array->entries[i]; - ret = flb_custom_set_property(ins, kv->key, val->data.as_string); - } - } - } - else if (type == FLB_CF_INPUT) { - if (kv->val->type == CFL_VARIANT_STRING) { - ret = flb_input_set_property(ins, kv->key, kv->val->data.as_string); - } - else if (kv->val->type == CFL_VARIANT_ARRAY) { - for (i = 0; i < kv->val->data.as_array->entry_count; i++) { - val = kv->val->data.as_array->entries[i]; - ret = flb_input_set_property(ins, kv->key, val->data.as_string); - } - } - } - else if (type == FLB_CF_FILTER) { - if (kv->val->type == CFL_VARIANT_STRING) { - ret = flb_filter_set_property(ins, kv->key, kv->val->data.as_string); - } else if (kv->val->type == CFL_VARIANT_ARRAY) { - for (i = 0; i < kv->val->data.as_array->entry_count; i++) { - val = kv->val->data.as_array->entries[i]; - ret = flb_filter_set_property(ins, kv->key, val->data.as_string); - } - } - } - else if (type == FLB_CF_OUTPUT) { - if (kv->val->type == CFL_VARIANT_STRING) { - ret = flb_output_set_property(ins, kv->key, kv->val->data.as_string); - } else if (kv->val->type == CFL_VARIANT_ARRAY) { - for (i = 0; i < kv->val->data.as_array->entry_count; i++) { - val = kv->val->data.as_array->entries[i]; - ret = flb_output_set_property(ins, kv->key, val->data.as_string); - } - } - } - - if (ret == -1) { - flb_error("[config] could not configure property '%s' on " - "%s plugin with section name '%s'", - kv->key, s_type, name); - } - } - - /* Processors */ - processors = flb_cf_group_get(cf, s, "processors"); - if (processors) { - if (type == FLB_CF_INPUT) { - flb_processors_load_from_config_format_group(((struct flb_input_instance *) ins)->processor, processors); - } - else if (type == FLB_CF_OUTPUT) { - flb_processors_load_from_config_format_group(((struct flb_output_instance *) ins)->processor, processors); - } - else { - flb_error("[config] section '%s' does not support processors", s_type); - } - } - } return 0; } + static struct flb_cf *service_configure(struct flb_cf *cf, struct flb_config *config, char *file) { int ret = -1; - struct flb_cf_section *s; - struct flb_kv *kv; - struct mk_list *head; - struct cfl_kvpair *ckv; - struct cfl_list *chead; #ifdef FLB_HAVE_STATIC_CONF cf = flb_config_static_open(file); @@ -826,88 +695,19 @@ static struct flb_cf *service_configure(struct flb_cf *cf, return NULL; } - config->cf_main = cf; /* Set configuration root path */ if (file) { flb_service_conf_path_set(config, file); } - /* Process config environment vars */ - mk_list_foreach(head, &cf->env) { - kv = mk_list_entry(head, struct flb_kv, _head); - ret = flb_env_set(config->env, kv->key, kv->val); - if (ret == -1) { - fprintf(stderr, "could not set config environment variable '%s'\n", - kv->key); - exit(EXIT_FAILURE); - } - } - - /* Process all meta commands */ - mk_list_foreach(head, &cf->metas) { - kv = mk_list_entry(head, struct flb_kv, _head); - flb_meta_run(config, kv->key, kv->val); - } - - /* Validate sections */ - mk_list_foreach(head, &cf->sections) { - s = mk_list_entry(head, struct flb_cf_section, _head); - - if (strcasecmp(s->name, "env") == 0 || - strcasecmp(s->name, "service") == 0 || - strcasecmp(s->name, "custom") == 0 || - strcasecmp(s->name, "input") == 0 || - strcasecmp(s->name, "filter") == 0 || - strcasecmp(s->name, "output") == 0) { - - /* continue on valid sections */ - continue; - } - - /* Extra sanity checks */ - if (strcasecmp(s->name, "parser") == 0 || - strcasecmp(s->name, "multiline_parser") == 0) { - fprintf(stderr, - "Sections 'multiline_parser' and 'parser' are not valid in " - "the main configuration file. It belongs to \n" - "the 'parsers_file' configuration files.\n"); - exit(EXIT_FAILURE); - } - } - - /* Read main 'service' section */ - s = cf->service; - if (s) { - /* Iterate properties */ - cfl_list_foreach(chead, &s->properties->list) { - ckv = cfl_list_entry(chead, struct cfl_kvpair, _head); - flb_config_set_property(config, ckv->key, ckv->val->data.as_string); - } - } - - ret = service_configure_plugin(config, cf, FLB_CF_CUSTOM); - if (ret == -1) { - goto error; - } - - ret = service_configure_plugin(config, cf, FLB_CF_INPUT); - if (ret == -1) { - goto error; - } - ret = service_configure_plugin(config, cf, FLB_CF_FILTER); - if (ret == -1) { - goto error; - } - ret = service_configure_plugin(config, cf, FLB_CF_OUTPUT); - if (ret == -1) { - goto error; + ret = flb_config_load_config_format(config, cf); + if (ret != 0) { + return NULL; } + config->cf_main = cf; return cf; - -error: - return NULL; } int flb_main(int argc, char **argv) @@ -927,9 +727,21 @@ int flb_main(int argc, char **argv) struct flb_cf *tmp; struct flb_cf_section *service; struct flb_cf_section *s; + struct flb_cf_section *section; + struct flb_cf *cf_opts; prog_name = argv[0]; + cf_opts = flb_cf_create(); + if (!cf_opts) { + exit(EXIT_FAILURE); + } + section = flb_cf_section_create(cf_opts, "service", 0); + if (!section) { + flb_cf_destroy(cf_opts); + exit(EXIT_FAILURE); + } + #ifdef FLB_HAVE_LIBBACKTRACE flb_stacktrace_init(argv[0], &flb_st); #endif @@ -973,6 +785,7 @@ int flb_main(int argc, char **argv) { "http_listen", required_argument, NULL, 'L' }, { "http_port", required_argument, NULL, 'P' }, #endif + { "enable-hot-reload", no_argument, NULL, 'Y' }, #ifdef FLB_HAVE_CHUNK_TRACE { "enable-chunk-trace", no_argument, NULL, 'Z' }, #endif @@ -992,19 +805,22 @@ int flb_main(int argc, char **argv) } config = ctx->config; cf = config->cf_main; - service = cf->service; + service = cf_opts->service; + + /* Add reference for cf_opts */ + config->cf_opts = cf_opts; #ifndef FLB_HAVE_STATIC_CONF /* Parse the command line options */ while ((opt = getopt_long(argc, argv, "b:c:dDf:C:i:m:o:R:F:p:e:" - "t:T:l:vw:qVhJL:HP:s:SZ", + "t:T:l:vw:qVhJL:HP:s:SYZ", long_opts, NULL)) != -1) { switch (opt) { case 'b': - flb_cf_section_property_add(cf, service->properties, + flb_cf_section_property_add(cf_opts, service->properties, "storage.path", 0, optarg, 0); break; case 'c': @@ -1012,7 +828,7 @@ int flb_main(int argc, char **argv) break; #ifdef FLB_HAVE_FORK case 'd': - flb_cf_section_property_add(cf, service->properties, + flb_cf_section_property_add(cf_opts, service->properties, "daemon", 0, "on", 0); config->daemon = FLB_TRUE; break; @@ -1025,68 +841,73 @@ int flb_main(int argc, char **argv) if (ret == -1) { exit(EXIT_FAILURE); } + /* Store the relative file path for external plugin */ + flb_slist_add(&config->external_plugins, optarg); break; case 'f': - flb_cf_section_property_add(cf, service->properties, + flb_cf_section_property_add(cf_opts, service->properties, "flush", 0, optarg, 0); break; case 'C': - s = flb_cf_section_create(cf, "custom", 0); + s = flb_cf_section_create(cf_opts, "custom", 0); if (!s) { flb_utils_error(FLB_ERR_CUSTOM_INVALID); } - flb_cf_section_property_add(cf, s->properties, "name", 0, optarg, 0); + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); last_plugin = PLUGIN_CUSTOM; break; case 'i': - s = flb_cf_section_create(cf, "input", 0); + s = flb_cf_section_create(cf_opts, "input", 0); if (!s) { flb_utils_error(FLB_ERR_INPUT_INVALID); } - flb_cf_section_property_add(cf, s->properties, "name", 0, optarg, 0); + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); last_plugin = PLUGIN_INPUT; break; case 'm': if (last_plugin == PLUGIN_FILTER || last_plugin == PLUGIN_OUTPUT) { - flb_cf_section_property_add(cf, s->properties, "match", 0, optarg, 0); + flb_cf_section_property_add(cf_opts, s->properties, "match", 0, optarg, 0); } break; case 'o': - s = flb_cf_section_create(cf, "output", 0); + s = flb_cf_section_create(cf_opts, "output", 0); if (!s) { flb_utils_error(FLB_ERR_OUTPUT_INVALID); } - flb_cf_section_property_add(cf, s->properties, "name", 0, optarg, 0); + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); last_plugin = PLUGIN_OUTPUT; break; #ifdef FLB_HAVE_PARSER case 'R': - ret = flb_parser_conf_file(optarg, config); - if (ret != 0) { + ret = flb_parser_conf_file_stat(optarg, config); + if (ret == -1) { + flb_cf_destroy(cf_opts); + flb_destroy(ctx); exit(EXIT_FAILURE); } + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_PARSERS_FILE, 0, optarg, 0); break; #endif case 'F': - s = flb_cf_section_create(cf, "filter", 0); + s = flb_cf_section_create(cf_opts, "filter", 0); if (!s) { flb_utils_error(FLB_ERR_FILTER_INVALID); } - flb_cf_section_property_add(cf, s->properties, "name", 0, optarg, 0); + flb_cf_section_property_add(cf_opts, s->properties, "name", 0, optarg, 0); last_plugin = PLUGIN_FILTER; break; case 'l': - flb_cf_section_property_add(cf, service->properties, + flb_cf_section_property_add(cf_opts, service->properties, "log_file", 0, optarg, 0); break; case 'p': if (s) { - set_property(cf, s, optarg); + set_property(cf_opts, s, optarg); } break; case 't': if (s) { - flb_cf_section_property_add(cf, s->properties, "tag", 0, optarg, 0); + flb_cf_section_property_add(cf_opts, s->properties, "tag", 0, optarg, 0); } break; #ifdef FLB_HAVE_STREAM_PROCESSOR @@ -1101,7 +922,7 @@ int flb_main(int argc, char **argv) else { flb_help_plugin(EXIT_SUCCESS, FLB_HELP_TEXT, config, - last_plugin, cf, s); + last_plugin, cf_opts, s); } break; case 'J': @@ -1117,24 +938,18 @@ int flb_main(int argc, char **argv) } else { flb_help_plugin(EXIT_SUCCESS, FLB_HELP_JSON, config, - last_plugin, cf, s); + last_plugin, cf_opts, s); } break; #ifdef FLB_HAVE_HTTP_SERVER case 'H': - flb_cf_section_property_add(cf, service->properties, "http_server", 0, "on", 0); + flb_cf_section_property_add(cf_opts, service->properties, "http_server", 0, "on", 0); break; case 'L': - if (config->http_listen) { - flb_free(config->http_listen); - } - config->http_listen = flb_strdup(optarg); + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_HTTP_LISTEN, 0, optarg, 0); break; case 'P': - if (config->http_port) { - flb_free(config->http_port); - } - config->http_port = flb_strdup(optarg); + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_HTTP_PORT, 0, optarg, 0); break; #endif case 'V': @@ -1150,14 +965,17 @@ int flb_main(int argc, char **argv) config->verbose = FLB_LOG_OFF; break; case 's': - config->coro_stack_size = (unsigned int) atoi(optarg); + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_CORO_STACK_SIZE, 0, optarg, 0); break; case 'S': config->support_mode = FLB_TRUE; break; + case 'Y': + config->enable_hot_reload = FLB_TRUE; + break; #ifdef FLB_HAVE_CHUNK_TRACE case 'Z': - config->enable_chunk_trace = FLB_TRUE; + flb_cf_section_property_add(cf_opts, service->properties, FLB_CONF_STR_ENABLE_CHUNK_TRACE, 0, "on", 0); break; #endif /* FLB_HAVE_CHUNK_TRACE */ default: @@ -1179,6 +997,7 @@ int flb_main(int argc, char **argv) if (config->workdir) { ret = chdir(config->workdir); if (ret == -1) { + flb_cf_destroy(cf_opts); flb_errno(); return -1; } @@ -1189,19 +1008,29 @@ int flb_main(int argc, char **argv) if (cfg_file) { if (access(cfg_file, R_OK) != 0) { flb_free(cfg_file); + flb_cf_destroy(cf_opts); flb_utils_error(FLB_ERR_CFG_FILE); } } + if (flb_reload_reconstruct_cf(cf_opts, cf) != 0) { + flb_free(cfg_file); + flb_cf_destroy(cf_opts); + fprintf(stderr, "reconstruct format context is failed\n"); + exit(EXIT_FAILURE); + } + /* Load the service configuration file */ tmp = service_configure(cf, config, cfg_file); flb_free(cfg_file); if (!tmp) { + flb_cf_destroy(cf_opts); flb_utils_error(FLB_ERR_CFG_FILE_STOP); } #else tmp = service_configure(cf, config, "fluent-bit.conf"); if (!tmp) { + flb_cf_destroy(cf_opts); flb_utils_error(FLB_ERR_CFG_FILE_STOP); } @@ -1211,14 +1040,15 @@ int flb_main(int argc, char **argv) cf = tmp; #endif - /* Check co-routine stack size */ if (config->coro_stack_size < getpagesize()) { + flb_cf_destroy(cf_opts); flb_utils_error(FLB_ERR_CORO_STACK_SIZE); } /* Validate flush time (seconds) */ if (config->flush <= (double) 0.0) { + flb_cf_destroy(cf_opts); flb_utils_error(FLB_ERR_CFG_FLUSH); } @@ -1240,17 +1070,33 @@ int flb_main(int argc, char **argv) if (config->dry_run == FLB_TRUE) { fprintf(stderr, "configuration test is successful\n"); + flb_cf_destroy(cf_opts); + flb_destroy(ctx); exit(EXIT_SUCCESS); } + /* start Fluent Bit library */ ret = flb_start(ctx); if (ret != 0) { + flb_cf_destroy(cf_opts); flb_destroy(ctx); return ret; } + /* Store the current config format context from command line */ + flb_cf_context_set(cf_opts); + + /* + * Always re-set the original context that was started, note that during a flb_start() a 'reload' could happen so the context + * will be different. Use flb_context_get() to get the current context. + */ + ctx = flb_context_get(); + while (ctx->status == FLB_LIB_OK && exit_signal == 0) { sleep(1); + + /* set the context again before checking the status again */ + ctx = flb_context_get(); } if (exit_signal) { @@ -1258,6 +1104,11 @@ int flb_main(int argc, char **argv) } ret = config->exit_status_code; + cf_opts = flb_cf_context_get(); + + if (cf_opts != NULL) { + flb_cf_destroy(cf_opts); + } flb_stop(ctx); flb_destroy(ctx); diff --git a/src/http_server/api/v2/CMakeLists.txt b/src/http_server/api/v2/CMakeLists.txt index 39ad9a3ddb0..a9d590fbd7f 100644 --- a/src/http_server/api/v2/CMakeLists.txt +++ b/src/http_server/api/v2/CMakeLists.txt @@ -1,6 +1,7 @@ # api/v2 set(src metrics.c + reload.c register.c ) diff --git a/src/http_server/api/v2/register.c b/src/http_server/api/v2/register.c index aaee9ac9dec..7a0956fbf57 100644 --- a/src/http_server/api/v2/register.c +++ b/src/http_server/api/v2/register.c @@ -21,9 +21,11 @@ #include #include "metrics.h" +#include "reload.h" int api_v2_registration(struct flb_hs *hs) { + api_v2_reload(hs); api_v2_metrics(hs); return 0; } diff --git a/src/http_server/api/v2/reload.c b/src/http_server/api/v2/reload.c new file mode 100644 index 00000000000..83626f11ae2 --- /dev/null +++ b/src/http_server/api/v2/reload.c @@ -0,0 +1,118 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "reload.h" + +#include + +#include + +static void cb_reload(mk_request_t *request, void *data) +{ + int ret; + flb_sds_t out_buf; + size_t out_size; + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + struct flb_hs *hs = data; + struct flb_config *config = hs->config; + + if (request->method != MK_METHOD_POST && + request->method != MK_METHOD_PUT) { + mk_http_status(request, 400); + mk_http_done(request); + return; + } + + /* initialize buffers */ + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + msgpack_pack_map(&mp_pck, 2); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "reload", 6); + +#ifdef FLB_SYSTEM_WINDOWS + ret = -1; + + msgpack_pack_str(&mp_pck, 11); + msgpack_pack_str_body(&mp_pck, "unsupported", 11); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "status", 6); + msgpack_pack_int64(&mp_pck, ret); +#else + if (config->enable_hot_reload != FLB_TRUE) { + msgpack_pack_str(&mp_pck, 11); + msgpack_pack_str_body(&mp_pck, "not enabled", 11); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "status", 6); + msgpack_pack_int64(&mp_pck, -1); + } + else { + ret = kill(getpid(), SIGHUP); + if (ret != 0) { + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + msgpack_pack_str(&mp_pck, 4); + msgpack_pack_str_body(&mp_pck, "done", 4); + msgpack_pack_str(&mp_pck, 6); + msgpack_pack_str_body(&mp_pck, "status", 6); + msgpack_pack_int64(&mp_pck, ret); + } + +#endif + + /* Export to JSON */ + out_buf = flb_msgpack_raw_to_json_sds(mp_sbuf.data, mp_sbuf.size); + msgpack_sbuffer_destroy(&mp_sbuf); + if (!out_buf) { + mk_http_status(request, 400); + mk_http_done(request); + return; + } + out_size = flb_sds_len(out_buf); + + mk_http_status(request, 200); + flb_hs_add_content_type_to_req(request, FLB_HS_CONTENT_TYPE_JSON); + mk_http_send(request, out_buf, out_size, NULL); + mk_http_done(request); + + flb_sds_destroy(out_buf); +} + +/* Perform registration */ +int api_v2_reload(struct flb_hs *hs) +{ + mk_vhost_handler(hs->ctx, hs->vid, "/api/v2/reload", cb_reload, hs); + + return 0; +} diff --git a/src/http_server/api/v2/reload.h b/src/http_server/api/v2/reload.h new file mode 100644 index 00000000000..e64e867d690 --- /dev/null +++ b/src/http_server/api/v2/reload.h @@ -0,0 +1,28 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_HS_API_V2_RELOAD_H +#define FLB_HS_API_V2_RELOAD_H + +#include +#include + +int api_v2_reload(struct flb_hs *hs); + +#endif diff --git a/src/wasm/flb_wasm.c b/src/wasm/flb_wasm.c index cd49b3a1f23..bfebb160733 100644 --- a/src/wasm/flb_wasm.c +++ b/src/wasm/flb_wasm.c @@ -104,6 +104,8 @@ struct flb_wasm *flb_wasm_instantiate(struct flb_config *config, const char *was flb_errno(); return NULL; } + fw->tag_buffer = 0; + fw->record_buffer = 0; #if WASM_ENABLE_LIBC_WASI != 0 wasi_dir_list = flb_malloc(sizeof(char *) * accessible_dir_list_size); @@ -264,10 +266,10 @@ int flb_wasm_call_wasi_main(struct flb_wasm *fw) void flb_wasm_buffer_free(struct flb_wasm *fw) { - if (fw->tag_buffer) { + if (fw->tag_buffer != 0) { wasm_runtime_module_free(fw->module_inst, fw->tag_buffer); } - if (fw->record_buffer) { + if (fw->record_buffer != 0) { wasm_runtime_module_free(fw->module_inst, fw->record_buffer); } } diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index b055fab4ab5..b92b999cec3 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -60,6 +60,7 @@ if (NOT WIN32) ${UNIT_TESTS_FILES} gelf.c fstore.c + reload.c ) endif() diff --git a/tests/internal/data/reload/fluent-bit.conf b/tests/internal/data/reload/fluent-bit.conf new file mode 100644 index 00000000000..4db44f32b64 --- /dev/null +++ b/tests/internal/data/reload/fluent-bit.conf @@ -0,0 +1,15 @@ +[SERVICE] + Flush 1 + Daemon Off + Log_Level error + HTTP_Server On + HTTP_Listen 0.0.0.0 + HTTP_Port 2020 + +[INPUT] + Name dummy + Tag dummy.locals + +[OUTPUT] + Name stdout + Match * diff --git a/tests/internal/data/reload/yaml/processor.yaml b/tests/internal/data/reload/yaml/processor.yaml new file mode 100644 index 00000000000..3021ee7e36b --- /dev/null +++ b/tests/internal/data/reload/yaml/processor.yaml @@ -0,0 +1,41 @@ +service: + log_level: info + http_server: on + http_listen: 0.0.0.0 + http_port: 2021 + + +pipeline: + inputs: + - name: random + tag: test-tag + interval_sec: 1 + + processors: + logs: + - name: modify + add: hostname monox + + - name: lua + call: append_tag + code: | + function append_tag(tag, timestamp, record) + new_record = record + new_record["tag"] = tag + return 1, timestamp, new_record + end + + outputs: + - name: stdout + match: '*' + + processors: + logs: + - name: lua + call: add_field + code: | + function add_field(tag, timestamp, record) + new_record = record + new_record["output"] = "new data" + return 1, timestamp, new_record + end diff --git a/tests/internal/reload.c b/tests/internal/reload.c new file mode 100644 index 00000000000..e6a695d5102 --- /dev/null +++ b/tests/internal/reload.c @@ -0,0 +1,249 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "flb_tests_internal.h" + +#define FLB_YAML FLB_TESTS_DATA_PATH "/data/reload/yaml/processor.yaml" +#define FLB_CLASSIC FLB_TESTS_DATA_PATH "/data/reload/fluent-bit.conf" + +void test_reconstruct_cf() +{ + struct flb_cf *cf; + struct flb_cf_section *s_tmp; + struct flb_cf_section *service; + struct flb_cf_group *g_tmp; + struct cfl_variant *ret; + struct flb_kv *meta; + struct flb_cf *new_cf; + int status; + + + /* create context */ + cf = flb_cf_create(); + TEST_CHECK(cf != NULL); + + /* create service section */ + service = flb_cf_section_create(cf, "SERVICE", 7); + TEST_CHECK(service != NULL); + + /* add a property */ + ret = flb_cf_section_property_add(cf, service->properties, "key", 3, "val", 3); + TEST_CHECK(ret != NULL); + + /* add a property with empty spaces on left/right */ + ret = flb_cf_section_property_add(cf, service->properties, " key ", 5, " val ", 7); + TEST_CHECK(ret != NULL); + + /* add an invalid property */ + ret = flb_cf_section_property_add(cf, service->properties, " ", 3, "", 0); + TEST_CHECK(ret == NULL); + + /* try to add another 'SERVICE' section, it should return the same one */ + s_tmp = flb_cf_section_create(cf, "SERVICE", 7); + TEST_CHECK(s_tmp == service); + + /* add a valid section */ + s_tmp = flb_cf_section_create(cf, "INPUT", 5); + TEST_CHECK(s_tmp != NULL); + + TEST_CHECK(mk_list_size(&cf->inputs) == 1); + + /* add property to the section recently created */ + ret = flb_cf_section_property_add(cf, s_tmp->properties, "key", 3, "val", 3); + TEST_CHECK(ret != NULL); + + /* groups: add groups to the last section created */ + g_tmp = flb_cf_group_create(cf, s_tmp, "FLUENT GROUP", 12); + TEST_CHECK(g_tmp != NULL); + + /* add properties to the group */ + ret = flb_cf_section_property_add(cf, g_tmp->properties, "key", 3, "val", 3); + TEST_CHECK(ret != NULL); + + /* groups: invalid group */ + g_tmp = flb_cf_group_create(cf, s_tmp, "", 0); + TEST_CHECK(g_tmp == NULL); + + /* Meta commands */ + meta = flb_cf_meta_property_add(cf, "@SET a=1 ", 20); + + TEST_CHECK(meta != NULL); + TEST_CHECK(flb_sds_len(meta->key) == 3 && strcmp(meta->key, "SET") == 0); + TEST_CHECK(flb_sds_len(meta->val) == 3 && strcmp(meta->val, "a=1") == 0); + + /* create new context */ + new_cf = flb_cf_create(); + TEST_CHECK(cf != NULL); + + status = flb_reload_reconstruct_cf(cf, new_cf); + TEST_CHECK(status == 0); + TEST_CHECK(new_cf != NULL); + + TEST_CHECK(mk_list_size(&new_cf->inputs) == 1); + TEST_CHECK(mk_list_size(&new_cf->sections) == 2); + TEST_CHECK(mk_list_size(&new_cf->metas) == 1); + + printf("\n"); + flb_cf_dump(new_cf); + + /* destroy context */ + flb_cf_destroy(cf); + flb_cf_destroy(new_cf); +} + +/* data/reload/fluent-bit.conf */ +void test_reload() +{ + struct flb_cf *cf = NULL; + struct flb_cf *cf_opts; + struct flb_cf_section *section; + struct cfl_variant *ret; + flb_ctx_t *ctx; + int status; + + /* create context */ + cf_opts = flb_cf_create(); + TEST_CHECK(cf_opts != NULL); + + /* add a valid section (input) */ + section = flb_cf_section_create(cf_opts, "INPUT", 5); + TEST_CHECK(section != NULL); + + /* add property to the section recently created */ + ret = flb_cf_section_property_add(cf_opts, section->properties, "name", 0, "dummy", 0); + TEST_CHECK(ret != NULL); + + TEST_CHECK(mk_list_size(&cf_opts->inputs) == 1); + + ctx = flb_create(); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("flb_create failed"); + exit(EXIT_FAILURE); + } + + cf = ctx->config->cf_main; + + status = flb_reload_reconstruct_cf(cf_opts, cf); + TEST_CHECK(status == 0); + + /* Mimic operation like as service_configure() */ + cf = flb_cf_create_from_file(cf, FLB_CLASSIC); + TEST_CHECK(cf != NULL); + + ctx->config->conf_path_file = flb_sds_create(FLB_CLASSIC); + ctx->config->enable_hot_reload = FLB_TRUE; + + status = flb_config_load_config_format(ctx->config, cf); + TEST_CHECK(status == 0); + + /* Start the engine */ + status = flb_start(ctx); + TEST_CHECK(status == 0); + TEST_CHECK(mk_list_size(&ctx->config->inputs) == 2); + + sleep(2); + + status = flb_reload(ctx, cf_opts); + TEST_CHECK(status == 0); + + sleep(2); + + /* flb context should be replaced with flb_reload() */ + ctx = flb_context_get(); + + TEST_CHECK(mk_list_size(&ctx->config->cf_opts->inputs) == 1); + TEST_CHECK(mk_list_size(&ctx->config->inputs) == 2); + + flb_cf_destroy(cf_opts); + + flb_stop(ctx); + flb_destroy(ctx); +} + +/* data/reload/yaml/processor.yaml */ +void test_reload_yaml() +{ + struct flb_cf *cf = NULL; + struct flb_cf *cf_opts; + struct flb_cf_section *section; + struct cfl_variant *ret; + flb_ctx_t *ctx; + int status; + + /* create context */ + cf_opts = flb_cf_create(); + TEST_CHECK(cf_opts != NULL); + + /* add a valid section (input) */ + section = flb_cf_section_create(cf_opts, "INPUT", 5); + TEST_CHECK(section != NULL); + + /* add property to the section recently created */ + ret = flb_cf_section_property_add(cf_opts, section->properties, "name", 0, "dummy", 0); + TEST_CHECK(ret != NULL); + + TEST_CHECK(mk_list_size(&cf_opts->inputs) == 1); + + ctx = flb_create(); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("flb_create failed"); + exit(EXIT_FAILURE); + } + + cf = ctx->config->cf_main; + + status = flb_reload_reconstruct_cf(cf_opts, cf); + TEST_CHECK(status == 0); + + /* Mimic operation like as service_configure() */ + cf = flb_cf_create_from_file(cf, FLB_YAML); + TEST_CHECK(cf != NULL); + + ctx->config->conf_path_file = flb_sds_create(FLB_YAML); + ctx->config->enable_hot_reload = FLB_TRUE; + + status = flb_config_load_config_format(ctx->config, cf); + TEST_CHECK(status == 0); + + /* Start the engine */ + status = flb_start(ctx); + TEST_CHECK(status == 0); + TEST_CHECK(mk_list_size(&ctx->config->inputs) == 2); + + sleep(2); + + status = flb_reload(ctx, cf_opts); + TEST_CHECK(status == 0); + + sleep(2); + + /* flb context should be replaced with flb_reload() */ + ctx = flb_context_get(); + + TEST_CHECK(mk_list_size(&ctx->config->cf_opts->inputs) == 1); + TEST_CHECK(mk_list_size(&ctx->config->inputs) == 2); + TEST_CHECK(mk_list_size(&ctx->config->filters) == 0); + TEST_CHECK(mk_list_size(&ctx->config->outputs) == 1); + + flb_cf_destroy(cf_opts); + + flb_stop(ctx); + flb_destroy(ctx); +} + +TEST_LIST = { + { "reconstruct_cf" , test_reconstruct_cf}, + { "reload" , test_reload}, + { "reload_yaml" , test_reload_yaml}, + { 0 } +};