From 35cf97b6c47fa7448babdceefae6616a1153fc24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannis=20Christopher=20K=C3=B6hl?= Date: Thu, 12 Dec 2024 16:44:49 +0100 Subject: [PATCH 01/13] Make the expression evaluator support heterogeneous results --- .../changes/4839-varying-expression-types.md | 2 + contrib/tenzir-plugins | 2 +- .../builtins/aggregation-functions/all.cpp | 35 +- .../builtins/aggregation-functions/any.cpp | 35 +- .../aggregation-functions/collect.cpp | 17 +- .../aggregation-functions/count_distinct.cpp | 23 +- .../aggregation-functions/distinct.cpp | 23 +- .../aggregation-functions/first_last.cpp | 31 +- .../builtins/aggregation-functions/mean.cpp | 91 +-- .../aggregation-functions/min_max.cpp | 153 ++--- .../mode_value_counts.cpp | 24 +- .../aggregation-functions/stddev_variance.cpp | 114 ++-- .../builtins/aggregation-functions/sum.cpp | 176 +++--- libtenzir/builtins/formats/cef.cpp | 61 +- libtenzir/builtins/formats/grok.cpp | 9 +- libtenzir/builtins/formats/json.cpp | 107 ++-- libtenzir/builtins/formats/leef.cpp | 67 +-- libtenzir/builtins/functions/base64.cpp | 72 +-- .../builtins/functions/ceil_round_floor.cpp | 238 ++++---- libtenzir/builtins/functions/community_id.cpp | 293 +++++----- libtenzir/builtins/functions/hash.cpp | 2 +- libtenzir/builtins/functions/hex.cpp | 95 +-- libtenzir/builtins/functions/int.cpp | 217 +++---- libtenzir/builtins/functions/ip.cpp | 58 +- libtenzir/builtins/functions/list.cpp | 155 ++--- libtenzir/builtins/functions/misc.cpp | 379 ++++++------ libtenzir/builtins/functions/numeric.cpp | 165 +++--- libtenzir/builtins/functions/ocsf.cpp | 91 +-- libtenzir/builtins/functions/otherwise.cpp | 81 ++- libtenzir/builtins/functions/path.cpp | 144 ++--- libtenzir/builtins/functions/string.cpp | 408 ++++++------- libtenzir/builtins/functions/time.cpp | 469 +++++++-------- libtenzir/builtins/operators/decapsulate.cpp | 13 +- libtenzir/builtins/operators/delay.cpp | 97 ++-- libtenzir/builtins/operators/flatten.cpp | 39 +- libtenzir/builtins/operators/if.cpp | 77 +-- libtenzir/builtins/operators/pseudonymize.cpp | 11 +- libtenzir/builtins/operators/sort.cpp | 19 +- libtenzir/builtins/operators/summarize.cpp | 4 +- libtenzir/builtins/operators/timeshift.cpp | 54 +- libtenzir/builtins/operators/to_hive.cpp | 11 +- libtenzir/builtins/operators/unflatten.cpp | 15 +- libtenzir/builtins/operators/where_map.cpp | 111 ++-- libtenzir/include/tenzir/multi_series.hpp | 140 +++++ libtenzir/include/tenzir/tql2/eval.hpp | 5 + libtenzir/include/tenzir/tql2/eval_impl.hpp | 34 +- libtenzir/include/tenzir/tql2/plugin.hpp | 15 +- libtenzir/include/tenzir/tql2/set.hpp | 20 +- libtenzir/src/multi_series.cpp | 105 ++++ libtenzir/src/tql2/eval.cpp | 17 +- libtenzir/src/tql2/eval_binary.cpp | 54 +- libtenzir/src/tql2/eval_impl.cpp | 546 ++++++++++-------- libtenzir/src/tql2/eval_unary.cpp | 38 +- libtenzir/src/tql2/plugin.cpp | 93 +-- libtenzir/src/tql2/set.cpp | 217 +++++-- nix/tenzir/plugins/source.json | 5 +- .../test_heterogeneous_otherwise/step_00.ref | 6 + .../test_list_length}/step_00.ref | 0 .../functions/test_otherwise/step_00.ref | 12 +- .../test_string_length/step_00.ref | 0 tenzir/integration/tests/expression.bats | 27 - tenzir/integration/tests/functions.bats | 42 ++ 62 files changed, 3106 insertions(+), 2558 deletions(-) create mode 100644 changelog/next/changes/4839-varying-expression-types.md create mode 100644 libtenzir/include/tenzir/multi_series.hpp create mode 100644 libtenzir/src/multi_series.cpp create mode 100644 tenzir/integration/data/reference/functions/test_heterogeneous_otherwise/step_00.ref rename tenzir/integration/data/reference/{expression/test_length_method => functions/test_list_length}/step_00.ref (100%) rename tenzir/integration/data/reference/{expression => functions}/test_string_length/step_00.ref (100%) diff --git a/changelog/next/changes/4839-varying-expression-types.md b/changelog/next/changes/4839-varying-expression-types.md new file mode 100644 index 00000000000..0fecc876ba8 --- /dev/null +++ b/changelog/next/changes/4839-varying-expression-types.md @@ -0,0 +1,2 @@ +Functions can now return values of different types for the same input types. For +example, `x.otherwise(y)` no longer requires that `x` has the same type as `y`. diff --git a/contrib/tenzir-plugins b/contrib/tenzir-plugins index 0ccf93bd76e..37ea5d2477d 160000 --- a/contrib/tenzir-plugins +++ b/contrib/tenzir-plugins @@ -1 +1 @@ -Subproject commit 0ccf93bd76e34905e57acac79e134f2881d81e18 +Subproject commit 37ea5d2477d7e681178f6461f2656a9d2d72025a diff --git a/libtenzir/builtins/aggregation-functions/all.cpp b/libtenzir/builtins/aggregation-functions/all.cpp index 45f2d1782eb..e53055a7818 100644 --- a/libtenzir/builtins/aggregation-functions/all.cpp +++ b/libtenzir/builtins/aggregation-functions/all.cpp @@ -67,24 +67,25 @@ class all_instance final : public aggregation_instance { if (state_ == state::failed) { return; } - auto arg = eval(expr_, input, ctx); - auto f = detail::overload{ - [&](const arrow::NullArray&) { - state_ = state::nulled; - }, - [&](const arrow::BooleanArray& array) { - all_ = all_ and array.false_count() == 0; - if (array.null_count() > 0) { + for (auto& arg : eval(expr_, input, ctx)) { + auto f = detail::overload{ + [&](const arrow::NullArray&) { state_ = state::nulled; - } - }, - [&](auto&&) { - diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind()) - .primary(expr_) - .emit(ctx); - state_ = state::failed; - }}; - match(*arg.array, f); + }, + [&](const arrow::BooleanArray& array) { + all_ = all_ and array.false_count() == 0; + if (array.null_count() > 0) { + state_ = state::nulled; + } + }, + [&](auto&&) { + diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + }}; + match(*arg.array, f); + } } auto get() const -> data override { diff --git a/libtenzir/builtins/aggregation-functions/any.cpp b/libtenzir/builtins/aggregation-functions/any.cpp index d2eab8b76e5..4ab9b95b496 100644 --- a/libtenzir/builtins/aggregation-functions/any.cpp +++ b/libtenzir/builtins/aggregation-functions/any.cpp @@ -67,24 +67,25 @@ class any_instance final : public aggregation_instance { if (state_ == state::failed) { return; } - auto arg = eval(expr_, input, ctx); - auto f = detail::overload{ - [&](const arrow::NullArray&) { - state_ = state::nulled; - }, - [&](const arrow::BooleanArray& array) { - any_ = any_ or array.true_count() > 0; - if (array.null_count() > 0) { + for (auto& arg : eval(expr_, input, ctx)) { + auto f = detail::overload{ + [&](const arrow::NullArray&) { state_ = state::nulled; - } - }, - [&](auto&&) { - diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind()) - .primary(expr_) - .emit(ctx); - state_ = state::failed; - }}; - match(*arg.array, f); + }, + [&](const arrow::BooleanArray& array) { + any_ = any_ or array.true_count() > 0; + if (array.null_count() > 0) { + state_ = state::nulled; + } + }, + [&](auto&&) { + diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + }}; + match(*arg.array, f); + } } auto get() const -> data override { diff --git a/libtenzir/builtins/aggregation-functions/collect.cpp b/libtenzir/builtins/aggregation-functions/collect.cpp index 39cb59a3927..190f2453408 100644 --- a/libtenzir/builtins/aggregation-functions/collect.cpp +++ b/libtenzir/builtins/aggregation-functions/collect.cpp @@ -63,16 +63,17 @@ class collect_instance final : public aggregation_instance { } auto update(const table_slice& input, session ctx) -> void override { - auto arg = eval(expr_, input, ctx); - if (is(arg.type)) { - return; - } - // NOTE: Currently, different types end up coerced to strings. - for (auto i = int64_t{}; i < arg.array->length(); ++i) { - if (arg.array->IsNull(i)) { + for (auto& arg : eval(expr_, input, ctx)) { + if (is(arg.type)) { continue; } - result_.push_back(materialize(value_at(arg.type, *arg.array, i))); + // NOTE: Currently, different types end up coerced to strings. + for (auto i = int64_t{}; i < arg.array->length(); ++i) { + if (arg.array->IsNull(i)) { + continue; + } + result_.push_back(materialize(value_at(arg.type, *arg.array, i))); + } } } diff --git a/libtenzir/builtins/aggregation-functions/count_distinct.cpp b/libtenzir/builtins/aggregation-functions/count_distinct.cpp index 1315388542d..3b8a9438808 100644 --- a/libtenzir/builtins/aggregation-functions/count_distinct.cpp +++ b/libtenzir/builtins/aggregation-functions/count_distinct.cpp @@ -96,18 +96,19 @@ class count_distinct_instance final : public aggregation_instance { } auto update(const table_slice& input, session ctx) -> void override { - auto arg = eval(expr_, input, ctx); - if (is(arg.type)) { - return; - } - for (auto i = int64_t{}; i < arg.array->length(); ++i) { - if (arg.array->IsValid(i)) { - const auto& view = value_at(arg.type, *arg.array, i); - const auto it = distinct_.find(view); - if (it != distinct_.end()) { - continue; + for (auto& arg : eval(expr_, input, ctx)) { + if (is(arg.type)) { + continue; + } + for (auto i = int64_t{}; i < arg.array->length(); ++i) { + if (arg.array->IsValid(i)) { + const auto& view = value_at(arg.type, *arg.array, i); + const auto it = distinct_.find(view); + if (it != distinct_.end()) { + continue; + } + distinct_.emplace_hint(it, materialize(view)); } - distinct_.emplace_hint(it, materialize(view)); } } } diff --git a/libtenzir/builtins/aggregation-functions/distinct.cpp b/libtenzir/builtins/aggregation-functions/distinct.cpp index 80797bad6cb..851db141566 100644 --- a/libtenzir/builtins/aggregation-functions/distinct.cpp +++ b/libtenzir/builtins/aggregation-functions/distinct.cpp @@ -102,20 +102,21 @@ class distinct_instance final : public aggregation_instance { } auto update(const table_slice& input, session ctx) -> void override { - auto arg = eval(expr_, input, ctx); - if (is(arg.type)) { - return; - } - for (auto i = int64_t{}; i < arg.array->length(); ++i) { - if (arg.array->IsNull(i)) { + for (auto& arg : eval(expr_, input, ctx)) { + if (is(arg.type)) { continue; } - const auto& view = value_at(arg.type, *arg.array, i); - const auto it = distinct_.find(view); - if (it != distinct_.end()) { - continue; + for (auto i = int64_t{}; i < arg.array->length(); ++i) { + if (arg.array->IsNull(i)) { + continue; + } + const auto& view = value_at(arg.type, *arg.array, i); + const auto it = distinct_.find(view); + if (it != distinct_.end()) { + continue; + } + distinct_.emplace_hint(it, materialize(view)); } - distinct_.emplace_hint(it, materialize(view)); } } diff --git a/libtenzir/builtins/aggregation-functions/first_last.cpp b/libtenzir/builtins/aggregation-functions/first_last.cpp index 6de7146349b..36476e0b75d 100644 --- a/libtenzir/builtins/aggregation-functions/first_last.cpp +++ b/libtenzir/builtins/aggregation-functions/first_last.cpp @@ -31,22 +31,23 @@ class first_last_instance final : public aggregation_instance { if (Mode == mode::first and not is(result_)) { return; } - auto arg = eval(expr_, input, ctx); - if (is(arg.type)) { - return; - } - if constexpr (Mode == mode::first) { - for (int64_t i = 0; i < arg.array->length(); ++i) { - if (arg.array->IsValid(i)) { - result_ = materialize(value_at(arg.type, *arg.array, i)); - return; - } + for (auto& arg : eval(expr_, input, ctx)) { + if (is(arg.type)) { + continue; } - } else { - for (int64_t i = arg.array->length() - 1; i >= 0; --i) { - if (arg.array->IsValid(i)) { - result_ = materialize(value_at(arg.type, *arg.array, i)); - return; + if constexpr (Mode == mode::first) { + for (int64_t i = 0; i < arg.array->length(); ++i) { + if (arg.array->IsValid(i)) { + result_ = materialize(value_at(arg.type, *arg.array, i)); + break; + } + } + } else { + for (int64_t i = arg.array->length() - 1; i >= 0; --i) { + if (arg.array->IsValid(i)) { + result_ = materialize(value_at(arg.type, *arg.array, i)); + break; + } } } } diff --git a/libtenzir/builtins/aggregation-functions/mean.cpp b/libtenzir/builtins/aggregation-functions/mean.cpp index 07781e143b1..f68412cde4d 100644 --- a/libtenzir/builtins/aggregation-functions/mean.cpp +++ b/libtenzir/builtins/aggregation-functions/mean.cpp @@ -81,55 +81,56 @@ class mean_instance final : public aggregation_instance { if (state_ == state::failed) { return; } - auto arg = eval(expr_, input, ctx); - auto f = detail::overload{ - [](const arrow::NullArray&) {}, - [&](const T& array) - requires numeric_type> - or std::same_as - { - if constexpr (std::same_as) { - if (state_ != state::dur and state_ != state::none) { - diagnostic::warning("expected `int`, `uint` or `double`, got `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - state_ = state::failed; - return; - } - state_ = state::dur; - } else { - if (state_ != state::numeric and state_ != state::none) { - diagnostic::warning("got incompatible types `duration` and `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - state_ = state::failed; - return; + for (auto& arg : eval(expr_, input, ctx)) { + auto f = detail::overload{ + [](const arrow::NullArray&) {}, + [&](const T& array) + requires numeric_type> + or std::same_as + { + if constexpr (std::same_as) { + if (state_ != state::dur and state_ != state::none) { + diagnostic::warning( + "expected `int`, `uint` or `double`, got `{}`", arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + return; + } + state_ = state::dur; + } else { + if (state_ != state::numeric and state_ != state::none) { + diagnostic::warning("got incompatible types `duration` and `{}`", + arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + return; + } + state_ = state::numeric; } - state_ = state::numeric; - } - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsValid(i)) { - if constexpr (std::same_as) { - if (std::isnan(array.Value(i))) { - continue; + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsValid(i)) { + if constexpr (std::same_as) { + if (std::isnan(array.Value(i))) { + continue; + } } + count_ += 1; + mean_ += (static_cast(array.Value(i)) - mean_) / count_; } - count_ += 1; - mean_ += (static_cast(array.Value(i)) - mean_) / count_; } - } - }, - [&](const auto&) { - diagnostic::warning("expected types `int`, `uint`, " - "`double` or `duration`, got `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - state_ = state::failed; - }}; - match(*arg.array, f); + }, + [&](const auto&) { + diagnostic::warning("expected types `int`, `uint`, " + "`double` or `duration`, got `{}`", + arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + }}; + match(*arg.array, f); + } } auto get() const -> data override { diff --git a/libtenzir/builtins/aggregation-functions/min_max.cpp b/libtenzir/builtins/aggregation-functions/min_max.cpp index d043f4584f7..14af08050ad 100644 --- a/libtenzir/builtins/aggregation-functions/min_max.cpp +++ b/libtenzir/builtins/aggregation-functions/min_max.cpp @@ -73,89 +73,92 @@ class min_max_instance final : public aggregation_instance { if (result_ and std::holds_alternative(result_.value())) { return; } - auto arg = eval(expr_, input, ctx); - if (not type_) { - type_ = arg.type; - } - const auto warn = [&](const auto&) -> result_t { - diagnostic::warning("got incompatible types `{}` and `{}`", type_.kind(), - arg.type.kind()) - .primary(expr_) - .emit(ctx); - return caf::none; - }; - // TODO: Matching on type of max_ might be better to reduce function calls - auto f = detail::overload{ - [](const arrow::NullArray&) {}, - [&](const T& array) - requires numeric_type> - { - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsValid(i)) { - const auto val = array.Value(i); - if (not result_) { - result_ = val; - continue; - } - result_ = result_->match( - warn, - [&](std::integral auto& self) -> result_t { - if constexpr (std::same_as) { - return Mode == mode::min - ? std::min(static_cast(self), val) - : std::max(static_cast(self), - val); - } else { - if (Mode == mode::min - ? std::cmp_less(val, self) - : std::cmp_greater(val, self)) { - return val; + for (auto& arg : eval(expr_, input, ctx)) { + if (not type_) { + type_ = arg.type; + } + const auto warn = [&](const auto&) -> result_t { + diagnostic::warning("got incompatible types `{}` and `{}`", + type_.kind(), arg.type.kind()) + .primary(expr_) + .emit(ctx); + return caf::none; + }; + // TODO: Matching on type of max_ might be better to reduce function calls + auto f = detail::overload{ + [](const arrow::NullArray&) {}, + [&](const T& array) + requires numeric_type> + { + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsValid(i)) { + const auto val = array.Value(i); + if (not result_) { + result_ = val; + continue; + } + result_ = result_->match( + warn, + [&](std::integral auto& self) -> result_t { + if constexpr (std::same_as) { + return Mode == mode::min + ? std::min(static_cast(self), + val) + : std::max(static_cast(self), + val); + } else { + if (Mode == mode::min + ? std::cmp_less(val, self) + : std::cmp_greater(val, self)) { + return val; + } + return self; } - return self; - } - }, - [&](double self) -> result_t { - return Mode == mode::min - ? std::min(self, static_cast(val)) - : std::max(self, static_cast(val)); - }); - if (std::holds_alternative( - result_.value())) { - return; + }, + [&](double self) -> result_t { + return Mode == mode::min + ? std::min(self, static_cast(val)) + : std::max(self, + static_cast(val)); + }); + if (std::holds_alternative( + result_.value())) { + return; + } } } - } - }, - [&](const T& array) - requires concepts::one_of, - duration_type, time_type> - { - using Ty = type_from_arrow_t; - for (const auto& val : values(Ty{}, array)) { - if (val) { - if (not result_) { - result_ = val; - } - result_ - = result_->match(warn, [&](type_to_data_t self) -> result_t { + }, + [&](const T& array) + requires concepts::one_of, + duration_type, time_type> + { + using Ty = type_from_arrow_t; + for (const auto& val : values(Ty{}, array)) { + if (val) { + if (not result_) { + result_ = val; + } + result_ = result_->match( + warn, [&](type_to_data_t self) -> result_t { return Mode == mode::min ? std::min(self, val.value()) : std::max(self, val.value()); }); - if (std::holds_alternative(result_.value())) { - return; + if (std::holds_alternative(result_.value())) { + return; + } } } - } - }, - [&](const auto&) { - diagnostic::warning("expected types `int`, `uint`, `double`, " - "`duration`, or `time`, but got `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - result_ = caf::none; - }}; - match(*arg.array, f); + }, + [&](const auto&) { + diagnostic::warning("expected types `int`, `uint`, `double`, " + "`duration`, or `time`, but got `{}`", + arg.type.kind()) + .primary(expr_) + .emit(ctx); + result_ = caf::none; + }}; + match(*arg.array, f); + } } auto get() const -> data override { diff --git a/libtenzir/builtins/aggregation-functions/mode_value_counts.cpp b/libtenzir/builtins/aggregation-functions/mode_value_counts.cpp index fd6ad7169e8..556cc698a61 100644 --- a/libtenzir/builtins/aggregation-functions/mode_value_counts.cpp +++ b/libtenzir/builtins/aggregation-functions/mode_value_counts.cpp @@ -29,18 +29,20 @@ class instance final : public aggregation_instance { auto update(const table_slice& input, session ctx) -> void override { auto arg = eval(expr_, input, ctx); - if (is(arg.type)) { - return; - } - for (int64_t i = 0; i < arg.array->length(); ++i) { - if (arg.array->IsValid(i)) { - const auto& view = value_at(arg.type, *arg.array, i); - auto it = counts_.find(view); - if (it == counts_.end()) { - counts_.emplace_hint(it, materialize(view), 1); - continue; + for (auto& arg : eval(expr_, input, ctx)) { + if (is(arg.type)) { + continue; + } + for (int64_t i = 0; i < arg.array->length(); ++i) { + if (arg.array->IsValid(i)) { + const auto& view = value_at(arg.type, *arg.array, i); + auto it = counts_.find(view); + if (it == counts_.end()) { + counts_.emplace_hint(it, materialize(view), 1); + continue; + } + ++it.value(); } - ++it.value(); } } } diff --git a/libtenzir/builtins/aggregation-functions/stddev_variance.cpp b/libtenzir/builtins/aggregation-functions/stddev_variance.cpp index 5b34b97cd94..c84492aa2f0 100644 --- a/libtenzir/builtins/aggregation-functions/stddev_variance.cpp +++ b/libtenzir/builtins/aggregation-functions/stddev_variance.cpp @@ -94,72 +94,74 @@ class stddev_variance_instance final : public aggregation_instance { if (state_ == state::failed) { return; } - auto arg = eval(expr_, input, ctx); - auto f = detail::overload{ - [](const arrow::NullArray&) {}, - [&](const T& array) - requires numeric_type> - or std::same_as - { - if constexpr (std::same_as) { - if (state_ != state::dur and state_ != state::none) { - diagnostic::warning("got incompatible types `number` and `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - state_ = state::failed; - return; + for (auto& arg : eval(expr_, input, ctx)) { + auto f = detail::overload{ + [](const arrow::NullArray&) {}, + [&](const T& array) + requires numeric_type> + or std::same_as + { + if constexpr (std::same_as) { + if (state_ != state::dur and state_ != state::none) { + diagnostic::warning("got incompatible types `number` and `{}`", + arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + return; + } + if (mode_ == mode::variance) { + diagnostic::warning("expected `int`, `uint` or `double` got `{}`", + arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + return; + } + state_ = state::dur; + } else { + if (state_ != state::numeric and state_ != state::none) { + diagnostic::warning("got incompatible types `duration` and `{}`", + arg.type.kind()) + .primary(expr_) + .emit(ctx); + state_ = state::failed; + return; + } + state_ = state::numeric; + } + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsValid(i)) { + const auto x = static_cast(array.Value(i)); + if constexpr (std::is_same_v, double_type>) { + if (std::isnan(x)) { + continue; + } + } + count_ += 1; + mean_ += (x - mean_) / count_; + mean_squared_ += ((x * x) - mean_squared_) / count_; + } } + }, + [&](const auto&) { if (mode_ == mode::variance) { diagnostic::warning("expected `int`, `uint` or `double` got `{}`", arg.type.kind()) .primary(expr_) .emit(ctx); - state_ = state::failed; - return; - } - state_ = state::dur; - } else { - if (state_ != state::numeric and state_ != state::none) { - diagnostic::warning("got incompatible types `duration` and `{}`", + } else { + diagnostic::warning("expected `int`, `uint`, `double` or " + "`duration`, " + "got `{}`", arg.type.kind()) .primary(expr_) .emit(ctx); - state_ = state::failed; - return; - } - state_ = state::numeric; - } - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsValid(i)) { - const auto x = static_cast(array.Value(i)); - if constexpr (std::is_same_v, double_type>) { - if (std::isnan(x)) { - continue; - } - } - count_ += 1; - mean_ += (x - mean_) / count_; - mean_squared_ += ((x * x) - mean_squared_) / count_; } - } - }, - [&](const auto&) { - if (mode_ == mode::variance) { - diagnostic::warning("expected `int`, `uint` or `double` got `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - } else { - diagnostic::warning("expected `int`, `uint`, `double` or `duration`, " - "got `{}`", - arg.type.kind()) - .primary(expr_) - .emit(ctx); - } - state_ = state::failed; - }}; - match(*arg.array, f); + state_ = state::failed; + }}; + match(*arg.array, f); + } } auto get() const -> data override { diff --git a/libtenzir/builtins/aggregation-functions/sum.cpp b/libtenzir/builtins/aggregation-functions/sum.cpp index a779cc7dbed..dd67fad658a 100644 --- a/libtenzir/builtins/aggregation-functions/sum.cpp +++ b/libtenzir/builtins/aggregation-functions/sum.cpp @@ -61,104 +61,106 @@ class sum_instance : public aggregation_instance { if (sum_ and std::holds_alternative(sum_.value())) { return; } - auto s = eval(expr_, input, ctx); - if (not type_) { - type_ = s.type; - } - const auto warn = [&](const auto&) -> sum_t { - diagnostic::warning("got incompatible types `{}` and `{}`", type_.kind(), - s.type.kind()) - .primary(expr_) - .emit(ctx); - return caf::none; - }; - auto f = detail::overload{ - [](const arrow::NullArray&) {}, - [&](const T& array) - requires integral_type> - { - using Type = T::value_type; - // Int64 + UInt64 => UInt64 - // * + Double => Double - if (not sum_) { - sum_ = Type{}; - } - sum_ = sum_->match( - warn, - [&](std::integral auto& self) -> sum_t { - auto array_sum = Type{}; + for (auto& s : eval(expr_, input, ctx)) { + if (not type_) { + type_ = s.type; + } + const auto warn = [&](const auto&) -> sum_t { + diagnostic::warning("got incompatible types `{}` and `{}`", + type_.kind(), s.type.kind()) + .primary(expr_) + .emit(ctx); + return caf::none; + }; + auto f = detail::overload{ + [](const arrow::NullArray&) {}, + [&](const T& array) + requires integral_type> + { + using Type = T::value_type; + // Int64 + UInt64 => UInt64 + // * + Double => Double + if (not sum_) { + sum_ = Type{}; + } + sum_ = sum_->match( + warn, + [&](std::integral auto& self) -> sum_t { + auto array_sum = Type{}; + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsValid(i)) { + auto checked = checked_add(array_sum, array.Value(i)); + if (not checked) { + diagnostic::warning("integer overflow") + .primary(expr_) + .emit(ctx); + return caf::none; + } + array_sum = checked.value(); + } + } + auto checked = checked_add(self, array_sum); + if (not checked) { + diagnostic::warning("integer overflow").primary(expr_).emit(ctx); + return caf::none; + } + return checked.value(); + }, + [&](double self) -> sum_t { + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsValid(i)) { + self += static_cast(array.Value(i)); + } + } + return self; + }); + }, + [&](const arrow::DoubleArray& array) { + // * => Double + if (not sum_) { + sum_ = double{}; + } + sum_ + = sum_->match(warn, [&](concepts::arithmetic auto& self) -> sum_t { + auto result = static_cast(self); + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsValid(i)) { + result += array.Value(i); + } + } + return result; + }); + }, + [&](const arrow::DurationArray& array) { + if (not sum_) { + sum_ = duration{}; + } + sum_ = sum_->match(warn, [&](duration self) -> sum_t { for (auto i = int64_t{}; i < array.length(); ++i) { if (array.IsValid(i)) { - auto checked = checked_add(array_sum, array.Value(i)); + auto checked = checked_add(self.count(), array.Value(i)); if (not checked) { - diagnostic::warning("integer overflow") + diagnostic::warning("duration overflow") .primary(expr_) .emit(ctx); return caf::none; } - array_sum = checked.value(); - } - } - auto checked = checked_add(self, array_sum); - if (not checked) { - diagnostic::warning("integer overflow").primary(expr_).emit(ctx); - return caf::none; - } - return checked.value(); - }, - [&](double self) -> sum_t { - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsValid(i)) { - self += static_cast(array.Value(i)); + self += duration{array.Value(i)}; } } return self; }); - }, - [&](const arrow::DoubleArray& array) { - // * => Double - if (not sum_) { - sum_ = double{}; - } - sum_ = sum_->match(warn, [&](concepts::arithmetic auto& self) -> sum_t { - auto result = static_cast(self); - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsValid(i)) { - result += array.Value(i); - } - } - return result; - }); - }, - [&](const arrow::DurationArray& array) { - if (not sum_) { - sum_ = duration{}; - } - sum_ = sum_->match(warn, [&](duration self) -> sum_t { - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsValid(i)) { - auto checked = checked_add(self.count(), array.Value(i)); - if (not checked) { - diagnostic::warning("duration overflow") - .primary(expr_) - .emit(ctx); - return caf::none; - } - self += duration{array.Value(i)}; - } - } - return self; - }); - }, - [&](const auto&) { - diagnostic::warning("expected `int`, `uint`, `double` or `duration`, " - "got `{}`", - s.type.kind()) - .primary(expr_) - .emit(ctx); - sum_ = caf::none; - }}; - match(*s.array, f); + }, + [&](const auto&) { + diagnostic::warning("expected `int`, `uint`, `double` or `duration`, " + "got `{}`", + s.type.kind()) + .primary(expr_) + .emit(ctx); + sum_ = caf::none; + }}; + match(*s.array, f); + } } auto get() const -> data override { diff --git a/libtenzir/builtins/formats/cef.cpp b/libtenzir/builtins/formats/cef.cpp index 2eaf20830f8..880aba251cc 100644 --- a/libtenzir/builtins/formats/cef.cpp +++ b/libtenzir/builtins/formats/cef.cpp @@ -283,44 +283,37 @@ class parse_cef final : public virtual function_plugin { .parse(inv, ctx)); return function_use::make( [call = inv.call, expr = std::move(expr)](auto eval, session ctx) { - auto arg = eval(expr); - auto f = detail::overload{ - [&](const arrow::NullArray&) { - return arg; - }, - [&](const arrow::StringArray& arg) { - auto b = series_builder{}; - for (auto string : arg) { - if (not string) { - b.null(); - continue; + return map_series(eval(expr), [&](series arg) { + auto f = detail::overload{ + [&](const arrow::NullArray&) { + return multi_series{arg}; + }, + [&](const arrow::StringArray& arg) { + // TODO: Use multi-series builder here. + auto b = series_builder{}; + for (auto string : arg) { + if (not string) { + b.null(); + continue; + } + auto diag = parse_line(*string, b); + if (diag) { + ctx.dh().emit(std::move(*diag)); + b.null(); + } } - auto diag = parse_line(*string, b); - if (diag) { - ctx.dh().emit(std::move(*diag)); - b.null(); - } - } - auto result = b.finish(); - // TODO: Consider whether we need heterogeneous for this. If so, - // then we must extend the evaluator accordingly. - if (result.size() != 1) { - diagnostic::warning("got incompatible CEF messages") + return multi_series{b.finish()}; + }, + [&](const auto&) { + diagnostic::warning("`parse_cef` expected `string`, got `{}`", + arg.type.kind()) .primary(call) .emit(ctx); return series::null(null_type{}, arg.length()); - } - return std::move(result[0]); - }, - [&](const auto&) { - diagnostic::warning("`parse_cef` expected `string`, got `{}`", - arg.type.kind()) - .primary(call) - .emit(ctx); - return series::null(null_type{}, arg.length()); - }, - }; - return match(*arg.array, f); + }, + }; + return match(*arg.array, f); + }); }); } }; diff --git a/libtenzir/builtins/formats/grok.cpp b/libtenzir/builtins/formats/grok.cpp index 9f988637a0f..252d557cda0 100644 --- a/libtenzir/builtins/formats/grok.cpp +++ b/libtenzir/builtins/formats/grok.cpp @@ -710,10 +710,10 @@ class parse_grok_plugin final : public virtual function_plugin { include_unnamed, multi_series_builder::options{}, }; - return function_use::make( - [input = std::move(input), - parser = std::move(parser)](evaluator eval, session ctx) -> series { - auto values = eval(input); + return function_use::make([input = std::move(input), + parser = std::move(parser)](evaluator eval, + session ctx) { + return map_series(eval(input), [&](series values) { if (values.type.kind().is()) { return values; } @@ -734,6 +734,7 @@ class parse_grok_plugin final : public virtual function_plugin { } return std::move(output[0]); }); + }); } }; diff --git a/libtenzir/builtins/formats/json.cpp b/libtenzir/builtins/formats/json.cpp index b40a4963066..f5bfd64a1fc 100644 --- a/libtenzir/builtins/formats/json.cpp +++ b/libtenzir/builtins/formats/json.cpp @@ -1258,66 +1258,67 @@ class parse_json_plugin final : public virtual function_plugin { .positional("x", expr, "string") .parse(inv, ctx)); return function_use::make( - [call = inv.call.get_location(), - expr = std::move(expr)](evaluator eval, session ctx) -> series { - auto arg = eval(expr); - auto f = detail::overload{ - [&](const arrow::NullArray&) { - return arg; - }, - [&](const arrow::StringArray& arg) { - auto parser = simdjson::ondemand::parser{}; - auto b = series_builder{}; - for (auto i = int64_t{0}; i < arg.length(); ++i) { - if (arg.IsNull(i)) { - b.null(); - continue; - } - auto str = std::string{arg.Value(i)}; - doc_parser doc_p = doc_parser(str, ctx); - auto doc = parser.iterate(str); - if (doc.error()) { - diagnostic::warning("{}", error_message(doc.error())) - .primary(call) - .emit(ctx); - b.null(); - continue; - } - const auto result - = doc_p.parse_value(doc.get_value(), builder_ref{b}, 0); - switch (result) { - case doc_parser::result::failure_with_write: - b.remove_last(); - [[fallthrough]]; - case doc_parser::result::failure_no_change: - diagnostic::warning("could not parse json") + [call = inv.call.get_location(), expr = std::move(expr)](evaluator eval, + session ctx) { + return map_series(eval(expr), [&](series arg) { + auto f = detail::overload{ + [&](const arrow::NullArray&) { + return arg; + }, + [&](const arrow::StringArray& arg) { + auto parser = simdjson::ondemand::parser{}; + auto b = series_builder{}; + for (auto i = int64_t{0}; i < arg.length(); ++i) { + if (arg.IsNull(i)) { + b.null(); + continue; + } + auto str = std::string{arg.Value(i)}; + doc_parser doc_p = doc_parser(str, ctx); + auto doc = parser.iterate(str); + if (doc.error()) { + diagnostic::warning("{}", error_message(doc.error())) .primary(call) .emit(ctx); b.null(); - break; - case doc_parser::result::success: /*no op*/; + continue; + } + const auto result + = doc_p.parse_value(doc.get_value(), builder_ref{b}, 0); + switch (result) { + case doc_parser::result::failure_with_write: + b.remove_last(); + [[fallthrough]]; + case doc_parser::result::failure_no_change: + diagnostic::warning("could not parse json") + .primary(call) + .emit(ctx); + b.null(); + break; + case doc_parser::result::success: /*no op*/; + } } - } - auto result = b.finish(); - // TODO: Consider whether we need heterogeneous for this. If so, - // then we must extend the evaluator accordingly. - if (result.size() != 1) { - diagnostic::warning("got incompatible JSON values") + auto result = b.finish(); + // TODO: Consider whether we need heterogeneous for this. If so, + // then we must extend the evaluator accordingly. + if (result.size() != 1) { + diagnostic::warning("got incompatible JSON values") + .primary(call) + .emit(ctx); + return series::null(null_type{}, arg.length()); + } + return std::move(result[0]); + }, + [&](const auto&) { + diagnostic::warning("`parse_json` expected `string`, got `{}`", + arg.type.kind()) .primary(call) .emit(ctx); return series::null(null_type{}, arg.length()); - } - return std::move(result[0]); - }, - [&](const auto&) { - diagnostic::warning("`parse_json` expected `string`, got `{}`", - arg.type.kind()) - .primary(call) - .emit(ctx); - return series::null(null_type{}, arg.length()); - }, - }; - return match(*arg.array, f); + }, + }; + return match(*arg.array, f); + }); }); } }; diff --git a/libtenzir/builtins/formats/leef.cpp b/libtenzir/builtins/formats/leef.cpp index 16594384607..6f165d620a9 100644 --- a/libtenzir/builtins/formats/leef.cpp +++ b/libtenzir/builtins/formats/leef.cpp @@ -361,44 +361,45 @@ class parse_leef final : public virtual function_plugin { .parse(inv, ctx)); return function_use::make( [call = inv.call, expr = std::move(expr)](auto eval, session ctx) { - auto arg = eval(expr); - auto f = detail::overload{ - [&](const arrow::NullArray&) { - return arg; - }, - [&](const arrow::StringArray& arg) { - auto b = series_builder{}; - for (auto string : arg) { - if (not string) { - b.null(); - continue; + return map_series(eval(expr), [&](series arg) { + auto f = detail::overload{ + [&](const arrow::NullArray&) { + return arg; + }, + [&](const arrow::StringArray& arg) { + auto b = series_builder{}; + for (auto string : arg) { + if (not string) { + b.null(); + continue; + } + auto diag = parse_line(*string, b); + if (diag) { + ctx.dh().emit(std::move(*diag)); + b.null(); + } } - auto diag = parse_line(*string, b); - if (diag) { - ctx.dh().emit(std::move(*diag)); - b.null(); + auto result = b.finish(); + // TODO: Consider whether we need heterogeneous for this. If so, + // then we must extend the evaluator accordingly. + if (result.size() != 1) { + diagnostic::warning("got incompatible CEF messages") + .primary(call) + .emit(ctx); + return series::null(null_type{}, arg.length()); } - } - auto result = b.finish(); - // TODO: Consider whether we need heterogeneous for this. If so, - // then we must extend the evaluator accordingly. - if (result.size() != 1) { - diagnostic::warning("got incompatible CEF messages") + return std::move(result[0]); + }, + [&](const auto&) { + diagnostic::warning("`parse_leef` expected `string`, got `{}`", + arg.type.kind()) .primary(call) .emit(ctx); return series::null(null_type{}, arg.length()); - } - return std::move(result[0]); - }, - [&](const auto&) { - diagnostic::warning("`parse_cef` expected `string`, got `{}`", - arg.type.kind()) - .primary(call) - .emit(ctx); - return series::null(null_type{}, arg.length()); - }, - }; - return match(*arg.array, f); + }, + }; + return match(*arg.array, f); + }); }); } }; diff --git a/libtenzir/builtins/functions/base64.cpp b/libtenzir/builtins/functions/base64.cpp index f411fd583fe..b74d3f3192f 100644 --- a/libtenzir/builtins/functions/base64.cpp +++ b/libtenzir/builtins/functions/base64.cpp @@ -32,46 +32,48 @@ class plugin final : public function_plugin { TRY(argument_parser2::function(name()) .positional("value", expr, "blob|string") .parse(inv, ctx)); - return function_use::make([expr = std::move(expr)](evaluator eval, - session ctx) -> series { - const auto value = eval(expr); - const auto f = detail::overload{ - [&](const arrow::NullArray& array) -> series { - return series::null(Type{}, array.length()); - }, - [&](const concepts::one_of auto& + return function_use::make([expr + = std::move(expr)](evaluator eval, session ctx) { + return map_series(eval(expr), [&](series value) { + const auto f = detail::overload{ + [&](const arrow::NullArray& array) -> series { + return series::null(Type{}, array.length()); + }, + [&]( + const concepts::one_of auto& array) -> series { - auto b = Type::make_arrow_builder(arrow::default_memory_pool()); - check(b->Reserve(array.length())); - for (auto i = int64_t{}; i < array.length(); ++i) { - if (array.IsNull(i)) { - check(b->AppendNull()); - continue; - } - if constexpr (Mode == mode::encode) { - check(b->Append(detail::base64::encode(array.Value(i)))); - } else { - const auto decoded = detail::base64::try_decode(array.Value(i)); - if (not decoded) { - diagnostic::warning("invalid base64 encoding") - .primary(expr) - .emit(ctx); + auto b = Type::make_arrow_builder(arrow::default_memory_pool()); + check(b->Reserve(array.length())); + for (auto i = int64_t{}; i < array.length(); ++i) { + if (array.IsNull(i)) { check(b->AppendNull()); continue; } - check(b->Append(decoded.value())); + if constexpr (Mode == mode::encode) { + check(b->Append(detail::base64::encode(array.Value(i)))); + } else { + const auto decoded = detail::base64::try_decode(array.Value(i)); + if (not decoded) { + diagnostic::warning("invalid base64 encoding") + .primary(expr) + .emit(ctx); + check(b->AppendNull()); + continue; + } + check(b->Append(decoded.value())); + } } - } - return series{Type{}, finish(*b)}; - }, - [&](const auto&) -> series { - diagnostic::warning("expected `blob` or `string`, got `{}`", - value.type.kind()) - .primary(expr) - .emit(ctx); - return series::null(Type{}, value.length()); - }}; - return match(*value.array, f); + return series{Type{}, finish(*b)}; + }, + [&](const auto&) -> series { + diagnostic::warning("expected `blob` or `string`, got `{}`", + value.type.kind()) + .primary(expr) + .emit(ctx); + return series::null(Type{}, value.length()); + }}; + return match(*value.array, f); + }); }); } }; diff --git a/libtenzir/builtins/functions/ceil_round_floor.cpp b/libtenzir/builtins/functions/ceil_round_floor.cpp index 229d646bf38..1f3284817de 100644 --- a/libtenzir/builtins/functions/ceil_round_floor.cpp +++ b/libtenzir/builtins/functions/ceil_round_floor.cpp @@ -44,135 +44,137 @@ class plugin final : public function_plugin { } return function_use::make([expr = std::move(expr), spec = std::move(spec), inv_loc = inv.call.get_location(), - this](evaluator eval, session ctx) -> series { - const auto value = located{eval(expr), expr.get_location()}; - const auto& ty = value.inner.type; - const auto length = value.inner.length(); - if (not spec) { - // fn() + this](evaluator eval, session ctx) { + return map_series(eval(expr), [&](series value) { + const auto& ty = value.type; + const auto length = value.length(); + if (not spec) { + // fn() + const auto f = detail::overload{ + [&](const arrow::NullArray&) { + return series::null(ty, length); + }, + [&] T>( + const T&) { + return value; + }, + [&](const arrow::DoubleArray& arg) { + // overflow logic from int.cpp + auto b = arrow::Int64Builder{}; + check(b.Reserve(length)); + constexpr auto min + = static_cast(std::numeric_limits::lowest()) + - 1.0; + constexpr auto max + = static_cast(std::numeric_limits::max()) + + 1.0; + auto overflow = false; + for (auto row = int64_t{0}; row < length; ++row) { + if (arg.IsNull(row) or not std::isfinite(arg.Value(row))) { + check(b.AppendNull()); + continue; + } + auto val = [&]() { + if constexpr (Mode == mode::ceil) { + return std::ceil(arg.Value(row)); + } else if constexpr (Mode == mode::floor) { + return std::floor(arg.Value(row)); + } else { + TENZIR_ASSERT(Mode == mode::round); + return std::round(arg.Value(row)); + } + }(); + if (not(val > min) || not(val < max)) { + check(b.AppendNull()); + overflow = true; + continue; + } + check(b.Append(static_cast(val))); + } + if (overflow) { + diagnostic::warning("integer overflow in `{}`", name()) + .primary(expr) + .emit(ctx); + } + return series{int64_type{}, finish(b)}; + }, + [&] T>( + const T&) { + diagnostic::warning("`{}` with `{}` requires a resolution", + name(), ty.kind()) + .primary(expr) + .hint("for example `{}(x, 1h)`", name()) + .emit(ctx); + return series::null(ty, length); + }, + [&](const auto&) { + diagnostic::warning("`{}` expected `number`, got `{}`", name(), + ty.kind()) + .primary(expr) + .emit(ctx); + return series::null(ty, length); + }, + }; + return match(*value.array, f); + } + // fn(, ) + // fn(x, 1h) -> to multiples of 1h + // fn(