Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a TQL2-only mode #4840

Merged
merged 9 commits into from
Dec 12, 2024
Next Next commit
Allow feature flags to depend on configuration
  • Loading branch information
dominiklohmann committed Dec 5, 2024
commit efdcf1cf547b1144219142a97a5cf655a9f8a19a
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
6 changes: 4 additions & 2 deletions libtenzir/builtins/operators/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class version_operator final : public crtp_operator<version_operator> {
public:
version_operator() = default;

auto operator()(operator_control_plane&) const -> generator<table_slice> {
auto operator()(operator_control_plane& ctrl) const
-> generator<table_slice> {
auto builder = series_builder{type{
"tenzir.version",
record_type{
Expand Down Expand Up @@ -75,7 +76,8 @@ class version_operator final : public crtp_operator<version_operator> {
event.field("minor", tenzir::version::minor);
event.field("patch", tenzir::version::patch);
auto features = event.field("features").list();
for (const auto& feature : tenzir_features()) {
for (const auto& feature :
tenzir_features(check(to<record>(content(ctrl.self().config()))))) {
features.data(feature);
}
auto build = event.field("build").record();
Expand Down
13 changes: 9 additions & 4 deletions libtenzir/include/tenzir/connect_to_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ auto get_retry_delay(const caf::settings& settings)
auto get_deadline(caf::timespan timeout)
-> std::optional<std::chrono::steady_clock::time_point>;

[[nodiscard]] auto check_version(const record& remote_version) -> bool;
[[nodiscard]] auto
check_version(const record& remote_version, const record& cfg) -> bool;

} // namespace details

Expand All @@ -43,8 +44,9 @@ void connect_to_node(caf::typed_event_based_actor<Sigs...>* self,
// Fetch values from config.
const auto& opts = content(self->system().config());
auto node_endpoint = details::get_node_endpoint(opts);
if (!node_endpoint)
if (!node_endpoint) {
return callback(std::move(node_endpoint.error()));
}
auto timeout = node_connection_timeout(opts);
auto connector
= self->spawn(tenzir::connector, details::get_retry_delay(opts),
Expand All @@ -59,9 +61,12 @@ void connect_to_node(caf::typed_event_based_actor<Sigs...>* self,
(void)connector;
self->request(node, timeout, atom::get_v, atom::version_v)
.then(
[callback, node = std::move(node)](record& remote_version) mutable {
[self, callback,
node = std::move(node)](record& remote_version) mutable {
// TODO: Refactor this (also in .cpp).
(void)details::check_version(remote_version);
(void)details::check_version(
remote_version,
check(to<record>(content(self->system().config()))));
callback(std::move(node));
},
[=](caf::error& error) {
Expand Down
4 changes: 2 additions & 2 deletions libtenzir/include/tenzir/version.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
namespace tenzir {

/// Puts the version information into a record.
record retrieve_versions();
auto retrieve_versions(const record& cfg) -> record;

/// Returns a list of features supported by this build of the node.
std::vector<std::string> tenzir_features();
auto tenzir_features(const record& cfg) -> std::vector<std::string>;

} // namespace tenzir
65 changes: 43 additions & 22 deletions libtenzir/src/connect_to_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ namespace {

void assert_data_completness(const record& remote_version,
const record& local_version) {
if (!local_version.contains("Tenzir"))
if (!local_version.contains("Tenzir")) {
die("no Tenzir key found in a local version");
if (!remote_version.contains("Tenzir"))
}
if (!remote_version.contains("Tenzir")) {
die("no Tenzir key found in a remote version");
if (!local_version.contains("plugins"))
}
if (!local_version.contains("plugins")) {
die("no plugins key found in a local version");
if (!remote_version.contains("plugins"))
}
if (!remote_version.contains("plugins")) {
die("no plugins key found in a remote version");
}
}

} // namespace
Expand All @@ -49,19 +53,24 @@ auto get_node_endpoint(const caf::settings& opts) -> caf::expected<endpoint> {
endpoint node_endpoint;
auto endpoint_str
= get_or(opts, "tenzir.endpoint", defaults::endpoint.data());
if (!parsers::endpoint(endpoint_str, node_endpoint))
if (!parsers::endpoint(endpoint_str, node_endpoint)) {
return caf::make_error(ec::parse_error, "invalid endpoint",
endpoint_str.data());
}
// Default to port 5158/tcp if none is set.
if (!node_endpoint.port)
if (!node_endpoint.port) {
node_endpoint.port = port{defaults::endpoint_port, port_type::tcp};
if (node_endpoint.port->type() == port_type::unknown)
}
if (node_endpoint.port->type() == port_type::unknown) {
node_endpoint.port->type(port_type::tcp);
if (node_endpoint.port->type() != port_type::tcp)
}
if (node_endpoint.port->type() != port_type::tcp) {
return caf::make_error(ec::invalid_configuration, "invalid protocol",
*node_endpoint.port);
if (node_endpoint.host.empty())
}
if (node_endpoint.host.empty()) {
node_endpoint.host = defaults::endpoint_host;
}
return node_endpoint;
}

Expand All @@ -72,20 +81,23 @@ auto get_retry_delay(const caf::settings& settings)
auto retry_delay
= caf::get_or<caf::timespan>(settings, "tenzir.connection-retry-delay",
defaults::node_connection_retry_delay);
if (retry_delay == caf::timespan::zero())
if (retry_delay == caf::timespan::zero()) {
return {};
}
return retry_delay;
}

auto get_deadline(caf::timespan timeout)
-> std::optional<std::chrono::steady_clock::time_point> {
if (caf::is_infinite(timeout))
if (caf::is_infinite(timeout)) {
return {};
}
return {std::chrono::steady_clock::now() + timeout};
}

[[nodiscard]] auto check_version(const record& remote_version) -> bool {
const auto local_version = retrieve_versions();
[[nodiscard]] auto
check_version(const record& remote_version, const record& cfg) -> bool {
const auto local_version = retrieve_versions(cfg);
assert_data_completness(remote_version, local_version);
if (local_version.at("Tenzir") != remote_version.at("Tenzir")) {
TENZIR_WARN("client version {} does not match node version {}; "
Expand All @@ -111,37 +123,44 @@ auto get_deadline(caf::timespan timeout)

std::optional<std::chrono::steady_clock::time_point>
get_deadline(caf::timespan timeout) {
if (caf::is_infinite(timeout))
if (caf::is_infinite(timeout)) {
return {};
}
return {std::chrono::steady_clock::now() + timeout};
}

std::optional<caf::timespan> get_retry_delay(const caf::settings& settings) {
auto retry_delay
= caf::get_or<caf::timespan>(settings, "tenzir.connection-retry-delay",
defaults::node_connection_retry_delay);
if (retry_delay == caf::timespan::zero())
if (retry_delay == caf::timespan::zero()) {
return {};
}
return retry_delay;
}

caf::expected<endpoint> get_node_endpoint(const caf::settings& opts) {
endpoint node_endpoint;
auto endpoint_str
= get_or(opts, "tenzir.endpoint", defaults::endpoint.data());
if (!parsers::endpoint(endpoint_str, node_endpoint))
if (!parsers::endpoint(endpoint_str, node_endpoint)) {
return caf::make_error(ec::parse_error, "invalid endpoint",
endpoint_str.data());
}
// Default to port 5158/tcp if none is set.
if (!node_endpoint.port)
if (!node_endpoint.port) {
node_endpoint.port = port{defaults::endpoint_port, port_type::tcp};
if (node_endpoint.port->type() == port_type::unknown)
}
if (node_endpoint.port->type() == port_type::unknown) {
node_endpoint.port->type(port_type::tcp);
if (node_endpoint.port->type() != port_type::tcp)
}
if (node_endpoint.port->type() != port_type::tcp) {
return caf::make_error(ec::invalid_configuration, "invalid protocol",
*node_endpoint.port);
if (node_endpoint.host.empty())
}
if (node_endpoint.host.empty()) {
node_endpoint.host = defaults::endpoint_host;
}
return node_endpoint;
}

Expand All @@ -153,8 +172,9 @@ caf::expected<node_actor> connect_to_node(caf::scoped_actor& self) {
// Fetch values from config.
const auto& opts = content(self->system().config());
auto node_endpoint = details::get_node_endpoint(opts);
if (!node_endpoint)
if (!node_endpoint) {
return std::move(node_endpoint.error());
}
auto timeout = node_connection_timeout(opts);
auto connector_actor = self->spawn(connector, details::get_retry_delay(opts),
details::get_deadline(timeout));
Expand Down Expand Up @@ -183,7 +203,8 @@ caf::expected<node_actor> connect_to_node(caf::scoped_actor& self) {
.receive(
[&](record& remote_version) {
// TODO
(void)details::check_version(remote_version);
(void)details::check_version(
remote_version, check(to<record>(content(self->system().config()))));
},
[&](caf::error& error) {
result = caf::make_error(ec::version_error,
Expand Down
4 changes: 2 additions & 2 deletions libtenzir/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,8 @@ auto node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
result);
return result;
},
[](atom::get, atom::version) { //
return retrieve_versions();
[self](atom::get, atom::version) {
return retrieve_versions(check(to<record>(content(self->config()))));
},
[self](atom::spawn, operator_box& box, operator_type input_type,
const receiver_actor<diagnostic>& diagnostic_handler,
Expand Down
23 changes: 16 additions & 7 deletions libtenzir/src/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace tenzir {

record retrieve_versions() {
record retrieve_versions(const record& cfg) {
record result;
result["Tenzir"] = version::version;
result["Build Configuration"] = record{
Expand All @@ -52,34 +52,43 @@ record retrieve_versions() {
#endif
list plugin_names;
for (const auto& plugin : plugins::get()) {
if (plugin.type() == plugin_ptr::type::builtin)
if (plugin.type() == plugin_ptr::type::builtin) {
continue;
if (auto v = plugin.version())
}
if (auto v = plugin.version()) {
plugin_names.push_back(fmt::format("{}-{}", plugin->name(), v));
else
} else {
plugin_names.push_back(fmt::format("{}", plugin->name()));
}
}
result["plugins"] = std::move(plugin_names);
list features;
for (auto feature : tenzir_features())
for (auto feature : tenzir_features(cfg)) {
features.push_back(feature);
}
result["features"] = features;
return result;
}

auto tenzir_features() -> std::vector<std::string> {
auto tenzir_features(const record& cfg) -> std::vector<std::string> {
// A list of features that are supported by this version of the node. This is
// intended to support the rollout of potentially breaking new features, so
// that downstream API consumers can adjust their behavior depending on the
// capabilities of the node. We remove entries once they're stabilized in the
// Tenzir Platform.
return {
auto result = std::vector<std::string>{
// The node supports passing the `--limit` flag to the TQL1 `chart` operator
"chart_limit",
// The node supports modules in TQL2. Alongside this a few operators were
// renamed, e.g., `package_add` was renamed to `package::add`.
"modules",
};
// The experimental TQL2-only mode is enabled.
const auto disabled = false;
if (get_or(cfg, "tenzir.tql2", disabled)) {
result.emplace_back("tql2_only");
}
return result;
}

} // namespace tenzir
5 changes: 3 additions & 2 deletions nix/tenzir/plugins/source.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"name": "tenzir-plugins",
"url": "git@github.com:tenzir/tenzir-plugins",
"ref": "main",
"rev": "18694cbf7b9e725ca5907c907df855338b20d84a",
"rev": "ad39ae43a2f22a990ca5ec8650cc2ef3bcae8280",
"submodules": true,
"shallow": true
"shallow": true,
"allRefs": true
}
Loading