Skip to content

Commit

Permalink
Merge branch 'main' into topic/context-erase
Browse files Browse the repository at this point in the history
  • Loading branch information
dominiklohmann committed Dec 16, 2024
2 parents 53b93c6 + 4a6dd99 commit 350fcbb
Show file tree
Hide file tree
Showing 125 changed files with 3,667 additions and 3,112 deletions.
4 changes: 4 additions & 0 deletions changelog/next/bug-fixes/4839--varying-metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Metadata such as `@name` can now be set to a dynamically computed value that
does not have to be a constant. For example, if the field `event_name` should be
used as the event name, `@name = event_name` now correctly assigns the events
their name instead of using the first value.
2 changes: 2 additions & 0 deletions changelog/next/changes/4839-varying-expression-types.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Functions can now return values of different types for the same input types. For
example, `x.otherwise(y)` no longer requires that `x` has the same type as `y`.
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
35 changes: 18 additions & 17 deletions libtenzir/builtins/aggregation-functions/all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,25 @@ class all_instance final : public aggregation_instance {
if (state_ == state::failed) {
return;
}
auto arg = eval(expr_, input, ctx);
auto f = detail::overload{
[&](const arrow::NullArray&) {
state_ = state::nulled;
},
[&](const arrow::BooleanArray& array) {
all_ = all_ and array.false_count() == 0;
if (array.null_count() > 0) {
for (auto& arg : eval(expr_, input, ctx)) {
auto f = detail::overload{
[&](const arrow::NullArray&) {
state_ = state::nulled;
}
},
[&](auto&&) {
diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
}};
match(*arg.array, f);
},
[&](const arrow::BooleanArray& array) {
all_ = all_ and array.false_count() == 0;
if (array.null_count() > 0) {
state_ = state::nulled;
}
},
[&](auto&&) {
diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
}};
match(*arg.array, f);
}
}

auto get() const -> data override {
Expand Down
35 changes: 18 additions & 17 deletions libtenzir/builtins/aggregation-functions/any.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,25 @@ class any_instance final : public aggregation_instance {
if (state_ == state::failed) {
return;
}
auto arg = eval(expr_, input, ctx);
auto f = detail::overload{
[&](const arrow::NullArray&) {
state_ = state::nulled;
},
[&](const arrow::BooleanArray& array) {
any_ = any_ or array.true_count() > 0;
if (array.null_count() > 0) {
for (auto& arg : eval(expr_, input, ctx)) {
auto f = detail::overload{
[&](const arrow::NullArray&) {
state_ = state::nulled;
}
},
[&](auto&&) {
diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
}};
match(*arg.array, f);
},
[&](const arrow::BooleanArray& array) {
any_ = any_ or array.true_count() > 0;
if (array.null_count() > 0) {
state_ = state::nulled;
}
},
[&](auto&&) {
diagnostic::warning("expected type `bool`, got `{}`", arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
}};
match(*arg.array, f);
}
}

auto get() const -> data override {
Expand Down
17 changes: 9 additions & 8 deletions libtenzir/builtins/aggregation-functions/collect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ class collect_instance final : public aggregation_instance {
}

auto update(const table_slice& input, session ctx) -> void override {
auto arg = eval(expr_, input, ctx);
if (is<null_type>(arg.type)) {
return;
}
// NOTE: Currently, different types end up coerced to strings.
for (auto i = int64_t{}; i < arg.array->length(); ++i) {
if (arg.array->IsNull(i)) {
for (auto& arg : eval(expr_, input, ctx)) {
if (is<null_type>(arg.type)) {
continue;
}
result_.push_back(materialize(value_at(arg.type, *arg.array, i)));
// NOTE: Currently, different types end up coerced to strings.
for (auto i = int64_t{}; i < arg.array->length(); ++i) {
if (arg.array->IsNull(i)) {
continue;
}
result_.push_back(materialize(value_at(arg.type, *arg.array, i)));
}
}
}

Expand Down
23 changes: 12 additions & 11 deletions libtenzir/builtins/aggregation-functions/count_distinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,19 @@ class count_distinct_instance final : public aggregation_instance {
}

auto update(const table_slice& input, session ctx) -> void override {
auto arg = eval(expr_, input, ctx);
if (is<null_type>(arg.type)) {
return;
}
for (auto i = int64_t{}; i < arg.array->length(); ++i) {
if (arg.array->IsValid(i)) {
const auto& view = value_at(arg.type, *arg.array, i);
const auto it = distinct_.find(view);
if (it != distinct_.end()) {
continue;
for (auto& arg : eval(expr_, input, ctx)) {
if (is<null_type>(arg.type)) {
continue;
}
for (auto i = int64_t{}; i < arg.array->length(); ++i) {
if (arg.array->IsValid(i)) {
const auto& view = value_at(arg.type, *arg.array, i);
const auto it = distinct_.find(view);
if (it != distinct_.end()) {
continue;
}
distinct_.emplace_hint(it, materialize(view));
}
distinct_.emplace_hint(it, materialize(view));
}
}
}
Expand Down
23 changes: 12 additions & 11 deletions libtenzir/builtins/aggregation-functions/distinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,21 @@ class distinct_instance final : public aggregation_instance {
}

auto update(const table_slice& input, session ctx) -> void override {
auto arg = eval(expr_, input, ctx);
if (is<null_type>(arg.type)) {
return;
}
for (auto i = int64_t{}; i < arg.array->length(); ++i) {
if (arg.array->IsNull(i)) {
for (auto& arg : eval(expr_, input, ctx)) {
if (is<null_type>(arg.type)) {
continue;
}
const auto& view = value_at(arg.type, *arg.array, i);
const auto it = distinct_.find(view);
if (it != distinct_.end()) {
continue;
for (auto i = int64_t{}; i < arg.array->length(); ++i) {
if (arg.array->IsNull(i)) {
continue;
}
const auto& view = value_at(arg.type, *arg.array, i);
const auto it = distinct_.find(view);
if (it != distinct_.end()) {
continue;
}
distinct_.emplace_hint(it, materialize(view));
}
distinct_.emplace_hint(it, materialize(view));
}
}

Expand Down
31 changes: 16 additions & 15 deletions libtenzir/builtins/aggregation-functions/first_last.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ class first_last_instance final : public aggregation_instance {
if (Mode == mode::first and not is<caf::none_t>(result_)) {
return;
}
auto arg = eval(expr_, input, ctx);
if (is<null_type>(arg.type)) {
return;
}
if constexpr (Mode == mode::first) {
for (int64_t i = 0; i < arg.array->length(); ++i) {
if (arg.array->IsValid(i)) {
result_ = materialize(value_at(arg.type, *arg.array, i));
return;
}
for (auto& arg : eval(expr_, input, ctx)) {
if (is<null_type>(arg.type)) {
continue;
}
} else {
for (int64_t i = arg.array->length() - 1; i >= 0; --i) {
if (arg.array->IsValid(i)) {
result_ = materialize(value_at(arg.type, *arg.array, i));
return;
if constexpr (Mode == mode::first) {
for (int64_t i = 0; i < arg.array->length(); ++i) {
if (arg.array->IsValid(i)) {
result_ = materialize(value_at(arg.type, *arg.array, i));
break;
}
}
} else {
for (int64_t i = arg.array->length() - 1; i >= 0; --i) {
if (arg.array->IsValid(i)) {
result_ = materialize(value_at(arg.type, *arg.array, i));
break;
}
}
}
}
Expand Down
91 changes: 46 additions & 45 deletions libtenzir/builtins/aggregation-functions/mean.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,55 +81,56 @@ class mean_instance final : public aggregation_instance {
if (state_ == state::failed) {
return;
}
auto arg = eval(expr_, input, ctx);
auto f = detail::overload{
[](const arrow::NullArray&) {},
[&]<class T>(const T& array)
requires numeric_type<type_from_arrow_t<T>>
or std::same_as<T, arrow::DurationArray>
{
if constexpr (std::same_as<T, arrow::DurationArray>) {
if (state_ != state::dur and state_ != state::none) {
diagnostic::warning("expected `int`, `uint` or `double`, got `{}`",
arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
return;
}
state_ = state::dur;
} else {
if (state_ != state::numeric and state_ != state::none) {
diagnostic::warning("got incompatible types `duration` and `{}`",
arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
return;
for (auto& arg : eval(expr_, input, ctx)) {
auto f = detail::overload{
[](const arrow::NullArray&) {},
[&]<class T>(const T& array)
requires numeric_type<type_from_arrow_t<T>>
or std::same_as<T, arrow::DurationArray>
{
if constexpr (std::same_as<T, arrow::DurationArray>) {
if (state_ != state::dur and state_ != state::none) {
diagnostic::warning(
"expected `int`, `uint` or `double`, got `{}`", arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
return;
}
state_ = state::dur;
} else {
if (state_ != state::numeric and state_ != state::none) {
diagnostic::warning("got incompatible types `duration` and `{}`",
arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
return;
}
state_ = state::numeric;
}
state_ = state::numeric;
}
for (auto i = int64_t{}; i < array.length(); ++i) {
if (array.IsValid(i)) {
if constexpr (std::same_as<T, arrow::DoubleArray>) {
if (std::isnan(array.Value(i))) {
continue;
for (auto i = int64_t{}; i < array.length(); ++i) {
if (array.IsValid(i)) {
if constexpr (std::same_as<T, arrow::DoubleArray>) {
if (std::isnan(array.Value(i))) {
continue;
}
}
count_ += 1;
mean_ += (static_cast<double>(array.Value(i)) - mean_) / count_;
}
count_ += 1;
mean_ += (static_cast<double>(array.Value(i)) - mean_) / count_;
}
}
},
[&](const auto&) {
diagnostic::warning("expected types `int`, `uint`, "
"`double` or `duration`, got `{}`",
arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
}};
match(*arg.array, f);
},
[&](const auto&) {
diagnostic::warning("expected types `int`, `uint`, "
"`double` or `duration`, got `{}`",
arg.type.kind())
.primary(expr_)
.emit(ctx);
state_ = state::failed;
}};
match(*arg.array, f);
}
}

auto get() const -> data override {
Expand Down
Loading

0 comments on commit 350fcbb

Please sign in to comment.