Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a TQL2-only mode #4840

Merged
merged 9 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
3 changes: 1 addition & 2 deletions libtenzir/builtins/commands/exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ auto exec_command(const invocation& inv, caf::actor_system& sys) -> bool {
cfg.implicit_events_source
= caf::get_or(inv.options, "tenzir.exec.implicit-events-source",
cfg.implicit_events_source);
cfg.tql2 = caf::get_or(inv.options, "tenzir.exec.tql2", cfg.tql2);
cfg.tql2 = caf::get_or(inv.options, "tenzir.tql2", cfg.tql2);
cfg.strict = caf::get_or(inv.options, "tenzir.exec.strict", cfg.strict);
auto filename = std::string{};
auto content = std::string{};
Expand Down Expand Up @@ -157,7 +157,6 @@ class plugin final : public virtual command_plugin {
.add<std::string>("implicit-events-source",
"implicit source for pipelines starting with events "
"(default: 'from stdin read json'")
.add<bool>("tql2", "use TQL version 2 (experimental)")
.add<bool>("strict",
"return a non-zero exit code if any warnings occured"));
auto factory = command::factory{
Expand Down
6 changes: 4 additions & 2 deletions libtenzir/builtins/operators/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class version_operator final : public crtp_operator<version_operator> {
public:
version_operator() = default;

auto operator()(operator_control_plane&) const -> generator<table_slice> {
auto operator()(operator_control_plane& ctrl) const
-> generator<table_slice> {
auto builder = series_builder{type{
"tenzir.version",
record_type{
Expand Down Expand Up @@ -75,7 +76,8 @@ class version_operator final : public crtp_operator<version_operator> {
event.field("minor", tenzir::version::minor);
event.field("patch", tenzir::version::patch);
auto features = event.field("features").list();
for (const auto& feature : tenzir_features()) {
for (const auto& feature :
tenzir_features(check(to<record>(content(ctrl.self().config()))))) {
features.data(feature);
}
auto build = event.field("build").record();
Expand Down
13 changes: 9 additions & 4 deletions libtenzir/include/tenzir/connect_to_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ auto get_retry_delay(const caf::settings& settings)
auto get_deadline(caf::timespan timeout)
-> std::optional<std::chrono::steady_clock::time_point>;

[[nodiscard]] auto check_version(const record& remote_version) -> bool;
[[nodiscard]] auto
check_version(const record& remote_version, const record& cfg) -> bool;

} // namespace details

Expand All @@ -43,8 +44,9 @@ void connect_to_node(caf::typed_event_based_actor<Sigs...>* self,
// Fetch values from config.
const auto& opts = content(self->system().config());
auto node_endpoint = details::get_node_endpoint(opts);
if (!node_endpoint)
if (!node_endpoint) {
return callback(std::move(node_endpoint.error()));
}
auto timeout = node_connection_timeout(opts);
auto connector
= self->spawn(tenzir::connector, details::get_retry_delay(opts),
Expand All @@ -59,9 +61,12 @@ void connect_to_node(caf::typed_event_based_actor<Sigs...>* self,
(void)connector;
self->request(node, timeout, atom::get_v, atom::version_v)
.then(
[callback, node = std::move(node)](record& remote_version) mutable {
[self, callback,
node = std::move(node)](record& remote_version) mutable {
// TODO: Refactor this (also in .cpp).
(void)details::check_version(remote_version);
(void)details::check_version(
remote_version,
check(to<record>(content(self->system().config()))));
callback(std::move(node));
},
[=](caf::error& error) {
Expand Down
4 changes: 2 additions & 2 deletions libtenzir/include/tenzir/version.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
namespace tenzir {

/// Puts the version information into a record.
record retrieve_versions();
auto retrieve_versions(const record& cfg) -> record;

/// Returns a list of features supported by this build of the node.
std::vector<std::string> tenzir_features();
auto tenzir_features(const record& cfg) -> std::vector<std::string>;

} // namespace tenzir
10 changes: 7 additions & 3 deletions libtenzir/src/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void add_root_opts(command& cmd) {
"forbid unsafe location overrides for pipelines with the "
"'local' and 'remote' keywords, e.g., remotely reading from "
"a file");
cmd.options.add<bool>("?tenzir", "tql2", "use TQL2 by default");
cmd.options.add<std::string>("?tenzir", "console-verbosity",
"output verbosity level on the "
"console");
Expand Down Expand Up @@ -197,8 +198,9 @@ make_application(std::string_view path) {
// Add additional commands from plugins.
for (const auto* plugin : plugins::get<command_plugin>()) {
auto [cmd, cmd_factory] = plugin->make_command();
if (!cmd || cmd_factory.empty())
if (!cmd || cmd_factory.empty()) {
continue;
}
root->add_subcommand(std::move(cmd));
root_factory.insert(std::make_move_iterator(cmd_factory.begin()),
std::make_move_iterator(cmd_factory.end()));
Expand All @@ -211,9 +213,10 @@ make_application(std::string_view path) {

void render_error(const command& root, const caf::error& err,
std::ostream& os) {
if (!err || err == ec::silent)
if (!err || err == ec::silent) {
// The user most likely killed the process via CTRL+C, print nothing.
return;
}
const auto pretty_diagnostics = true;
os << render(err, pretty_diagnostics) << '\n';
if (err.category() == caf::type_id_v<tenzir::ec>) {
Expand All @@ -227,8 +230,9 @@ void render_error(const command& root, const caf::error& err,
auto ctx = err.context();
if (ctx.match_element<std::string>(1)) {
auto name = ctx.get_as<std::string>(1);
if (auto cmd = resolve(root, name))
if (auto cmd = resolve(root, name)) {
helptext(*cmd, os);
}
} else {
TENZIR_ASSERT(
!"User visible error contexts must consist of strings!");
Expand Down
65 changes: 43 additions & 22 deletions libtenzir/src/connect_to_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ namespace {

void assert_data_completness(const record& remote_version,
const record& local_version) {
if (!local_version.contains("Tenzir"))
if (!local_version.contains("Tenzir")) {
die("no Tenzir key found in a local version");
if (!remote_version.contains("Tenzir"))
}
if (!remote_version.contains("Tenzir")) {
die("no Tenzir key found in a remote version");
if (!local_version.contains("plugins"))
}
if (!local_version.contains("plugins")) {
die("no plugins key found in a local version");
if (!remote_version.contains("plugins"))
}
if (!remote_version.contains("plugins")) {
die("no plugins key found in a remote version");
}
}

} // namespace
Expand All @@ -49,19 +53,24 @@ auto get_node_endpoint(const caf::settings& opts) -> caf::expected<endpoint> {
endpoint node_endpoint;
auto endpoint_str
= get_or(opts, "tenzir.endpoint", defaults::endpoint.data());
if (!parsers::endpoint(endpoint_str, node_endpoint))
if (!parsers::endpoint(endpoint_str, node_endpoint)) {
return caf::make_error(ec::parse_error, "invalid endpoint",
endpoint_str.data());
}
// Default to port 5158/tcp if none is set.
if (!node_endpoint.port)
if (!node_endpoint.port) {
node_endpoint.port = port{defaults::endpoint_port, port_type::tcp};
if (node_endpoint.port->type() == port_type::unknown)
}
if (node_endpoint.port->type() == port_type::unknown) {
node_endpoint.port->type(port_type::tcp);
if (node_endpoint.port->type() != port_type::tcp)
}
if (node_endpoint.port->type() != port_type::tcp) {
return caf::make_error(ec::invalid_configuration, "invalid protocol",
*node_endpoint.port);
if (node_endpoint.host.empty())
}
if (node_endpoint.host.empty()) {
node_endpoint.host = defaults::endpoint_host;
}
return node_endpoint;
}

Expand All @@ -72,20 +81,23 @@ auto get_retry_delay(const caf::settings& settings)
auto retry_delay
= caf::get_or<caf::timespan>(settings, "tenzir.connection-retry-delay",
defaults::node_connection_retry_delay);
if (retry_delay == caf::timespan::zero())
if (retry_delay == caf::timespan::zero()) {
return {};
}
return retry_delay;
}

auto get_deadline(caf::timespan timeout)
-> std::optional<std::chrono::steady_clock::time_point> {
if (caf::is_infinite(timeout))
if (caf::is_infinite(timeout)) {
return {};
}
return {std::chrono::steady_clock::now() + timeout};
}

[[nodiscard]] auto check_version(const record& remote_version) -> bool {
const auto local_version = retrieve_versions();
[[nodiscard]] auto
check_version(const record& remote_version, const record& cfg) -> bool {
const auto local_version = retrieve_versions(cfg);
assert_data_completness(remote_version, local_version);
if (local_version.at("Tenzir") != remote_version.at("Tenzir")) {
TENZIR_WARN("client version {} does not match node version {}; "
Expand All @@ -111,37 +123,44 @@ auto get_deadline(caf::timespan timeout)

std::optional<std::chrono::steady_clock::time_point>
get_deadline(caf::timespan timeout) {
if (caf::is_infinite(timeout))
if (caf::is_infinite(timeout)) {
return {};
}
return {std::chrono::steady_clock::now() + timeout};
}

std::optional<caf::timespan> get_retry_delay(const caf::settings& settings) {
auto retry_delay
= caf::get_or<caf::timespan>(settings, "tenzir.connection-retry-delay",
defaults::node_connection_retry_delay);
if (retry_delay == caf::timespan::zero())
if (retry_delay == caf::timespan::zero()) {
return {};
}
return retry_delay;
}

caf::expected<endpoint> get_node_endpoint(const caf::settings& opts) {
endpoint node_endpoint;
auto endpoint_str
= get_or(opts, "tenzir.endpoint", defaults::endpoint.data());
if (!parsers::endpoint(endpoint_str, node_endpoint))
if (!parsers::endpoint(endpoint_str, node_endpoint)) {
return caf::make_error(ec::parse_error, "invalid endpoint",
endpoint_str.data());
}
// Default to port 5158/tcp if none is set.
if (!node_endpoint.port)
if (!node_endpoint.port) {
node_endpoint.port = port{defaults::endpoint_port, port_type::tcp};
if (node_endpoint.port->type() == port_type::unknown)
}
if (node_endpoint.port->type() == port_type::unknown) {
node_endpoint.port->type(port_type::tcp);
if (node_endpoint.port->type() != port_type::tcp)
}
if (node_endpoint.port->type() != port_type::tcp) {
return caf::make_error(ec::invalid_configuration, "invalid protocol",
*node_endpoint.port);
if (node_endpoint.host.empty())
}
if (node_endpoint.host.empty()) {
node_endpoint.host = defaults::endpoint_host;
}
return node_endpoint;
}

Expand All @@ -153,8 +172,9 @@ caf::expected<node_actor> connect_to_node(caf::scoped_actor& self) {
// Fetch values from config.
const auto& opts = content(self->system().config());
auto node_endpoint = details::get_node_endpoint(opts);
if (!node_endpoint)
if (!node_endpoint) {
return std::move(node_endpoint.error());
}
auto timeout = node_connection_timeout(opts);
auto connector_actor = self->spawn(connector, details::get_retry_delay(opts),
details::get_deadline(timeout));
Expand Down Expand Up @@ -183,7 +203,8 @@ caf::expected<node_actor> connect_to_node(caf::scoped_actor& self) {
.receive(
[&](record& remote_version) {
// TODO
(void)details::check_version(remote_version);
(void)details::check_version(
remote_version, check(to<record>(content(self->system().config()))));
},
[&](caf::error& error) {
result = caf::make_error(ec::version_error,
Expand Down
4 changes: 2 additions & 2 deletions libtenzir/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,8 @@ auto node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
result);
return result;
},
[](atom::get, atom::version) { //
return retrieve_versions();
[self](atom::get, atom::version) {
return retrieve_versions(check(to<record>(content(self->config()))));
},
[self](atom::spawn, operator_box& box, operator_type input_type,
const receiver_actor<diagnostic>& diagnostic_handler,
Expand Down
22 changes: 15 additions & 7 deletions libtenzir/src/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace tenzir {

record retrieve_versions() {
record retrieve_versions(const record& cfg) {
record result;
result["Tenzir"] = version::version;
result["Build Configuration"] = record{
Expand All @@ -52,34 +52,42 @@ record retrieve_versions() {
#endif
list plugin_names;
for (const auto& plugin : plugins::get()) {
if (plugin.type() == plugin_ptr::type::builtin)
if (plugin.type() == plugin_ptr::type::builtin) {
continue;
if (auto v = plugin.version())
}
if (auto v = plugin.version()) {
plugin_names.push_back(fmt::format("{}-{}", plugin->name(), v));
else
} else {
plugin_names.push_back(fmt::format("{}", plugin->name()));
}
}
result["plugins"] = std::move(plugin_names);
list features;
for (auto feature : tenzir_features())
for (auto feature : tenzir_features(cfg)) {
features.push_back(feature);
}
result["features"] = features;
return result;
}

auto tenzir_features() -> std::vector<std::string> {
auto tenzir_features(const record& cfg) -> std::vector<std::string> {
// A list of features that are supported by this version of the node. This is
// intended to support the rollout of potentially breaking new features, so
// that downstream API consumers can adjust their behavior depending on the
// capabilities of the node. We remove entries once they're stabilized in the
// Tenzir Platform.
return {
auto result = std::vector<std::string>{
// The node supports passing the `--limit` flag to the TQL1 `chart` operator
"chart_limit",
// The node supports modules in TQL2. Alongside this a few operators were
// renamed, e.g., `package_add` was renamed to `package::add`.
"modules",
};
if (auto fallback = false; get_or(cfg, "tenzir.tql2", fallback)) {
// The experimental TQL2-only mode is enabled.
result.emplace_back("tql2_only");
}
return result;
}

} // namespace tenzir
5 changes: 3 additions & 2 deletions nix/tenzir/plugins/source.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "tenzir-plugins",
"url": "git@github.com:tenzir/tenzir-plugins",
"ref": "main",
"rev": "18694cbf7b9e725ca5907c907df855338b20d84a",
"rev": "2d62dcd42c25522612ccc02549941c61ebd975ab",
"submodules": true,
"shallow": true
"shallow": true,
"allRefs": true
}
3 changes: 3 additions & 0 deletions tenzir.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ tenzir:
# without retries.
connection-retry-delay: 3s

# Always use TQL2 for pipelines.
#tql2: false

# The file system path used for persistent state.
# Defaults to one of the following paths, selecting the first that is
# available:
Expand Down
2 changes: 1 addition & 1 deletion tenzir/integration/tests/every.bats
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ teardown() {
}

@test "every with multiple operators" {
export TENZIR_EXEC__TQL2=true
export TENZIR_TQL2=true
check tenzir -f /dev/stdin <<EOF
every 10ms {
from {x: 42}
Expand Down
2 changes: 1 addition & 1 deletion tenzir/integration/tests/expression.bats
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ setup() {
bats_load_library bats-assert
bats_load_library bats-tenzir

export TENZIR_EXEC__TQL2=true
export TENZIR_TQL2=true
}

@test "add" {
Expand Down
Loading
Loading