Skip to content

Commit

Permalink
Merge branch 'main' into topic/tql2-only-mode
Browse files Browse the repository at this point in the history
  • Loading branch information
dominiklohmann committed Dec 12, 2024
2 parents e1866cb + c3aadf2 commit 92c2a9a
Show file tree
Hide file tree
Showing 766 changed files with 3,870 additions and 28,464 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/tenzir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ jobs:
with:
ssh-private-key: ${{ secrets.TENZIR_PLUGINS_DEPLOY_KEY }}
- name: Require contrib/tenzir-plugins to be downstream of current `main`
if: github.event_name == 'pull_request' && github.base_ref == 'refs/heads/main'
run: |
git submodule update --init contrib/tenzir-plugins
git -C contrib/tenzir-plugins fetch origin main
Expand All @@ -357,13 +358,15 @@ jobs:
$(git -C contrib/tenzir-plugins rev-parse HEAD) \
$(git -C contrib/tenzir-plugins rev-parse origin/main)
- name: Generate a token
if: github.event_name == 'pull_request' && github.base_ref == 'refs/heads/main'
id: generate_token
uses: actions/create-github-app-token@v1
with:
app-id: ${{ vars.TENZIR_AUTOBUMPER_APP_ID }}
private-key: ${{ secrets.TENZIR_AUTOBUMPER_APP_PRIVATE_KEY }}
owner: ${{ github.repository_owner }}
- name: Require an open PR for a submodule bump
if: github.event_name == 'pull_request' && github.base_ref == 'refs/heads/main'
env:
GH_TOKEN: ${{ steps.generate_token.outputs.token }}
run: |
Expand Down Expand Up @@ -813,6 +816,7 @@ jobs:
- name: Compaction
target: compaction
path: contrib/tenzir-plugins/compaction
parallel: 1
- name: Context
target: context
path: contrib/tenzir-plugins/context
Expand Down Expand Up @@ -977,7 +981,7 @@ jobs:
cmake -S '${{ matrix.plugin.path }}' -B "$BUILD_DIR"
- name: Build
run: |
cmake --build "$BUILD_DIR" --target all --parallel
cmake --build "$BUILD_DIR" --target all --parallel ${{ matrix.plugin.parallel }}
- name: Run Unit Tests
env:
CTEST_OUTPUT_ON_FAILURE: 1
Expand Down
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
shallow = true
[submodule "libtenzir/aux/caf"]
path = libtenzir/aux/caf
url = https://github.com/tenzir/actor-framework
url = https://github.com/actor-framework/actor-framework
shallow = true
[submodule "libtenzir/aux/pfs"]
path = libtenzir/aux/pfs
Expand Down
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ elseif (CMAKE_CXX_COMPILER_ID MATCHES "GNU")
target_compile_options(libtenzir_internal INTERFACE -Wno-unknown-warning)
endif ()

# FIXME: remove and fix warnings.
target_compile_options(libtenzir_internal INTERFACE -Wno-deprecated-declarations)

set(gcc12plus
"$<AND:$<CXX_COMPILER_ID:GNU>,$<VERSION_GREATER_EQUAL:$<CXX_COMPILER_VERSION>,12>>"
)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ FROM plugins-source AS compaction-plugin
COPY contrib/tenzir-plugins/compaction ./contrib/tenzir-plugins/compaction
RUN cmake -S contrib/tenzir-plugins/compaction -B build-compaction -G Ninja \
-D CMAKE_INSTALL_PREFIX:STRING="$PREFIX" && \
cmake --build build-compaction --parallel && \
cmake --build build-compaction --parallel 1 && \
cmake --build build-compaction --target integration && \
DESTDIR=/plugin/compaction cmake --install build-compaction --strip --component Runtime && \
rm -rf build-compaction
Expand Down
3 changes: 3 additions & 0 deletions changelog/next/bug-fixes/4848--save_email.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `endpoint` argument of the `save_email` operator was documented as optional
but was not parsed as so. This has been fixed and the argument is now
correctly optional.
3 changes: 3 additions & 0 deletions changelog/next/features/4841--ip-in-subnet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Whether an IP address is contained in a subnet can now be checked using
expressions such as `1.2.3.4 in 1.2.0.0/16`. Similarly, to check whether a
subnet is included in another subnet, use `1.2.0.0/16 in 1.0.0.0/8`.
2 changes: 2 additions & 0 deletions changelog/next/features/4844--not-in.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
TQL2 now allows writing `x not in y` as an equivalent to `not (x in y)` for
better readability.
2 changes: 2 additions & 0 deletions changelog/next/features/4848--email-tls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The `save_email` now accepts a `tls` option to specify TLS usage when establishing
the SMTP connection.
3 changes: 3 additions & 0 deletions changelog/v4.24.1/bug-fixes/4846.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
We fixed a rare crash on startup that would occur when starting the
`tenzir-node` process was so slow that it would try to emit metrics before the
component handling metrics was ready.
2 changes: 2 additions & 0 deletions changelog/v4.24.1/bug-fixes/4847.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The TQL2 `nics` operator had a bug causing the operator name to be `nic`.
This has now been fixed and works as documented.
1 change: 1 addition & 0 deletions changelog/v4.24.1/bug-fixes/4855.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
We fixed the `last` aggregation function to return the last element.
2 changes: 2 additions & 0 deletions changelog/v4.24.1/bug-fixes/4856.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
We fixed a bug introduced with v4.24.0 causing crashes on startup when some
of the files in the node's state directory were smaller than 12 bytes.
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
2 changes: 1 addition & 1 deletion libtenzir/aux/caf
Submodule caf updated 1872 files
2 changes: 1 addition & 1 deletion libtenzir/builtins/aggregation-functions/first_last.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class first_last_instance final : public aggregation_instance {
}

auto update(const table_slice& input, session ctx) -> void override {
if (not is<caf::none_t>(result_)) {
if (Mode == mode::first and not is<caf::none_t>(result_)) {
return;
}
auto arg = eval(expr_, input, ctx);
Expand Down
51 changes: 27 additions & 24 deletions libtenzir/builtins/commands/rebuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,33 +481,35 @@ struct rebuilder_state {
rebuilder_actor::behavior_type
rebuilder(rebuilder_actor::stateful_pointer<rebuilder_state> self,
catalog_actor catalog, index_actor index) {
self->state.self = self;
self->state.catalog = std::move(catalog);
self->state.index = std::move(index);
self->state.max_partition_size
self->state().self = self;
self->state().catalog = std::move(catalog);
self->state().index = std::move(index);
self->state().max_partition_size
= caf::get_or(self->system().config(), "tenzir.max-partition-size",
defaults::max_partition_size);
self->state.desired_batch_size
self->state().desired_batch_size
= caf::get_or(self->system().config(), "tenzir.import.batch-size",
defaults::import::table_slice_size);
self->state.automatic_rebuild = caf::get_or(
self->state().automatic_rebuild = caf::get_or(
self->system().config(), "tenzir.automatic-rebuild", size_t{1});
if (self->state.automatic_rebuild > 0) {
self->state.rebuild_interval
if (self->state().automatic_rebuild > 0) {
self->state().rebuild_interval
= caf::get_or(self->system().config(), "tenzir.rebuild-interval",
defaults::rebuild_interval);
self->state.schedule();
self->state().schedule();
}
self->set_exit_handler([self](const caf::exit_msg& msg) {
TENZIR_DEBUG("{} received EXIT from {}: {}", *self, msg.source, msg.reason);
if (!self->state.run) {
if (!self->state().run) {
self->quit(msg.reason);
return;
}
for (auto&& rp : std::exchange(self->state.run->stop_requests, {}))
for (auto&& rp : std::exchange(self->state().run->stop_requests, {})) {
rp.deliver(msg.reason);
for (auto&& rp : std::exchange(self->state.run->delayed_rebuilds, {}))
}
for (auto&& rp : std::exchange(self->state().run->delayed_rebuilds, {})) {
rp.deliver(msg.reason);
}
self->quit(msg.reason);
});
if (auto importer
Expand All @@ -525,13 +527,14 @@ rebuilder(rebuilder_actor::stateful_pointer<rebuilder_state> self,
self, defaults::metrics_interval,
[self, importer = std::move(importer),
builder = std::move(builder)]() mutable {
const auto partitions
= self->state.run ? self->state.run->statistics.num_rebuilding : 0;
const auto partitions = self->state().run
? self->state().run->statistics.num_rebuilding
: 0;
const auto queued_partitions
= self->state.run ? self->state.run->statistics.num_total
- self->state.run->statistics.num_completed
- self->state.run->statistics.num_rebuilding
: 0;
= self->state().run ? self->state().run->statistics.num_total
- self->state().run->statistics.num_completed
- self->state().run->statistics.num_rebuilding
: 0;
auto metric = builder.record();
metric.field("timestamp", time::clock::now());
metric.field("partitions", partitions);
Expand All @@ -541,19 +544,19 @@ rebuilder(rebuilder_actor::stateful_pointer<rebuilder_state> self,
}
return {
[self](atom::status, status_verbosity verbosity, duration) {
return self->state.status(verbosity);
return self->state().status(verbosity);
},
[self](atom::start, start_options& options) {
return self->state.start(std::move(options));
return self->state().start(std::move(options));
},
[self](atom::stop, const stop_options& options) {
return self->state.stop(options);
return self->state().stop(options);
},
[self](atom::internal, atom::rebuild) {
return self->state.rebuild();
return self->state().rebuild();
},
[self](atom::internal, atom::schedule) {
return self->state.schedule();
return self->state().schedule();
},
};
}
Expand Down Expand Up @@ -726,7 +729,7 @@ class plugin final : public virtual command_plugin,
auto make_component(node_actor::stateful_pointer<node_state> node) const
-> component_plugin_actor override {
auto [catalog, index]
= node->state.registry.find<catalog_actor, index_actor>();
= node->state().registry.find<catalog_actor, index_actor>();
return node->spawn(rebuilder, std::move(catalog), std::move(index));
}
};
Expand Down
8 changes: 4 additions & 4 deletions libtenzir/builtins/components/metrics_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ struct metrics_collector_state {
auto metrics_collector(
metrics_collector_actor::stateful_pointer<metrics_collector_state> self,
importer_actor importer) -> metrics_collector_actor::behavior_type {
self->state.self = self;
self->state.importer = std::move(importer);
if (const auto ok = self->state.setup(); not ok) {
self->state().self = self;
self->state().importer = std::move(importer);
if (const auto ok = self->state().setup(); not ok) {
self->quit(diagnostic::error(ok.error())
.note("failed to create {}", *self)
.to_error());
Expand All @@ -143,7 +143,7 @@ class plugin final : public virtual component_plugin {

auto make_component(node_actor::stateful_pointer<node_state> node) const
-> component_plugin_actor override {
auto [importer] = node->state.registry.find<importer_actor>();
auto [importer] = node->state().registry.find<importer_actor>();
return node->spawn<caf::linked>(metrics_collector, std::move(importer));
}
};
Expand Down
1 change: 1 addition & 0 deletions libtenzir/builtins/connectors/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <filesystem>
#include <regex>
#include <string_view>
#include <system_error>

using namespace std::chrono_literals;
Expand Down
57 changes: 34 additions & 23 deletions libtenzir/builtins/connectors/email.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <tenzir/plugin.hpp>
#include <tenzir/transfer.hpp>

#include <string_view>

using namespace std::chrono_literals;

namespace tenzir::plugins::email {
Expand All @@ -28,6 +30,7 @@ struct saver_args {
std::optional<std::string> subject;
transfer_options transfer_opts;
bool mime;
std::optional<located<bool>> tls;

friend auto inspect(auto& f, saver_args& x) -> bool {
return f.object(x)
Expand Down Expand Up @@ -63,22 +66,33 @@ class saver final : public plugin_saver {
auto instantiate(operator_control_plane& ctrl, std::optional<printer_info>)
-> caf::expected<std::function<void(chunk_ptr)>> override {
auto tx = transfer{args_.transfer_opts};
if (auto err = tx.prepare(std::move(args_.endpoint))) {
diagnostic::error("failed to prepare SMTP server request")
.note("{}", err)
.emit(ctrl.diagnostics());
return err;
}
if (auto err = to_error(tx.handle().set(CURLOPT_UPLOAD, 1))) {
return err;
}
auto code = tx.handle().set(CURLOPT_URL, args_.endpoint);
if (code != curl::easy::code::ok) {
auto err = to_error(code);
diagnostic::error("failed to set SMTP server request")
.note("server: {}", args_.endpoint)
const auto set_tls_opts = [&] {
if (args_.tls) {
return to_error(tx.handle().set(CURLOPT_USE_SSL, args_.tls->inner
? CURLUSESSL_ALL
: CURLUSESSL_NONE));
} else {
return to_error(tx.handle().set(CURLOPT_USE_SSL, CURLUSESSL_TRY));
}
};
if (auto err = set_tls_opts()) {
diagnostic::error("failed to set TLS options")
.note("{}", err)
.emit(ctrl.diagnostics());
return err;
}
if (args_.from) {
code = tx.handle().set(CURLOPT_MAIL_FROM, *args_.from);
if (code != curl::easy::code::ok) {
auto err = to_error(code);
if (auto err
= to_error(tx.handle().set(CURLOPT_MAIL_FROM, *args_.from))) {
diagnostic::error("failed to set MAIL FROM")
.note("from: {}", *args_.from)
.note("{}", err)
Expand All @@ -92,17 +106,13 @@ class saver final : public plugin_saver {
#else
auto allowfails = CURLOPT_MAIL_RCPT_ALLOWFAILS;
#endif
code = tx.handle().set(allowfails, 1);
if (code != curl::easy::code::ok) {
auto err = to_error(code);
if (auto err = to_error(tx.handle().set(allowfails, 1))) {
diagnostic::error("failed to adjust recipient failure mode")
.note("{}", err)
.emit(ctrl.diagnostics());
return err;
}
code = tx.handle().add_mail_recipient(args_.to);
if (code != curl::easy::code::ok) {
auto err = to_error(code);
if (auto err = to_error(tx.handle().add_mail_recipient(args_.to))) {
diagnostic::error("failed to set To header")
.note("to: {}", args_.to)
.note("{}", err)
Expand Down Expand Up @@ -247,36 +257,37 @@ class save_plugin final
auto make(invocation inv, session ctx) const
-> failure_or<operator_ptr> override {
auto args = saver_args{};
auto endpoint = std::optional<std::string>{default_smtp_server};
auto to = located<std::string>{};
auto parser = argument_parser2::operator_(name());
parser.positional("email", args.to);
parser.named("endpoint", args.endpoint);
parser.positional("recipient", to);
parser.named("endpoint", endpoint);
parser.named("from", args.from);
parser.named("subject", args.subject);
parser.named("username", args.transfer_opts.username);
parser.named("password", args.transfer_opts.password);
parser.named("authzid", args.transfer_opts.authzid);
parser.named("authorization", args.transfer_opts.authorization);
parser.named("tls", args.tls);
parser.named("skip_peer_verification",
args.transfer_opts.skip_peer_verification);
parser.named("skip_hostname_verification",
args.transfer_opts.skip_hostname_verification);
parser.named("mime", args.mime);
parser.named("verbose", args.transfer_opts.verbose);
TRY(parser.parse(inv, ctx));
if (args.endpoint.empty()) {
args.endpoint = default_smtp_server;
} else if (args.endpoint.find("://") == std::string_view::npos) {
args.endpoint = std::move(endpoint).value();
if (args.endpoint.find("://") == std::string_view::npos) {
args.endpoint.insert(0, "smtps://");
} else if (args.endpoint.starts_with("email://")) {
args.endpoint.erase(0, 5);
args.endpoint.insert(0, "smtp");
}
if (args.to.empty()) {
diagnostic::error("no recipient specified")
.hint("add `to=<recipient>` to your invocation")
.emit(ctx);
if (to.inner.empty()) {
diagnostic::error("empty recipient specified").primary(to).emit(ctx);
return failure::promise();
}
args.to = std::move(to.inner);
return std::make_unique<saver_adapter<saver>>(saver{std::move(args)});
}
};
Expand Down
2 changes: 1 addition & 1 deletion libtenzir/builtins/connectors/file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ class plugin : public virtual loader_plugin<file_loader>,
-> caf::error override {
auto timeout
= try_get<tenzir::duration>(global_config, "tenzir.import.read-timeout");
if (!timeout.engaged()) {
if (!timeout.has_value()) {
return std::move(timeout.error());
}
if (timeout->has_value()) {
Expand Down
Loading

0 comments on commit 92c2a9a

Please sign in to comment.