Skip to content

Commit

Permalink
feat(agg): support aggregate: prefixed scalar function in streaming…
Browse files Browse the repository at this point in the history
… agg (#18205)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Aug 27, 2024
1 parent 3c31ef9 commit e3a9d37
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 16 deletions.
62 changes: 62 additions & 0 deletions e2e_test/streaming/aggregate/wrap_scalar.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (a varchar, b int, c int);

statement ok
insert into t values ('aaa', 1, 1), ('bbb', 0, 2), ('ccc', 0, 5), ('ddd', 1, 4);

statement ok
create materialized view mv1 as select aggregate:array_sum(c) as res from t;

statement ok
create materialized view mv2 as select b, aggregate:array_max(c) as res from t group by b;

statement ok
create function myjoin(text[]) returns text language sql as $$ select array_join($1, ', '); $$;

statement ok
create materialized view mv3 as select b, aggregate:myjoin(a order by c) as res from t group by b;

query I
select * from mv1;
----
12

query II
select * from mv2 order by b;
----
0 5
1 4

query IT
select * from mv3 order by b;
----
0 bbb, ccc
1 aaa, ddd

statement ok
insert into t values ('x', 1, 2), ('y', 3, 6);

query I
select * from mv1;
----
20

query II
select * from mv2 order by b;
----
0 5
1 4
3 6

query IT
select * from mv3 order by b;
----
0 bbb, ccc
1 aaa, x, ddd
3 y

statement ok
drop table t cascade;
18 changes: 18 additions & 0 deletions src/expr/core/src/aggregate/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,24 @@ pub mod agg_kinds {
}
pub use single_value_state_iff_in_append_only;

/// [`AggKind`](super::AggKind)s that are implemented with a materialized input state.
#[macro_export]
macro_rules! materialized_input_state {
() => {
AggKind::Builtin(
PbAggKind::Min
| PbAggKind::Max
| PbAggKind::FirstValue
| PbAggKind::LastValue
| PbAggKind::StringAgg
| PbAggKind::ArrayAgg
| PbAggKind::JsonbAgg
| PbAggKind::JsonbObjectAgg,
) | AggKind::WrapScalar(_)
};
}
pub use materialized_input_state;

/// Ordered-set aggregate functions.
#[macro_export]
macro_rules! ordered_set {
Expand Down
24 changes: 9 additions & 15 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,17 +418,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
AggCallState::Value
}
agg_kinds::single_value_state!() => AggCallState::Value,
AggKind::Builtin(
PbAggKind::Min
| PbAggKind::Max
| PbAggKind::FirstValue
| PbAggKind::LastValue
| PbAggKind::StringAgg
| PbAggKind::ArrayAgg
| PbAggKind::JsonbAgg
| PbAggKind::JsonbObjectAgg,
)
| AggKind::WrapScalar(_) => {
agg_kinds::materialized_input_state!() => {
// columns with order requirement in state table
let sort_keys = {
match agg_call.agg_kind {
Expand All @@ -444,7 +434,8 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
| PbAggKind::StringAgg
| PbAggKind::ArrayAgg
| PbAggKind::JsonbAgg,
) => {
)
| AggKind::WrapScalar(_) => {
if agg_call.order_by.is_empty() {
me.ctx().warn_to_user(format!(
"{} without ORDER BY may produce non-deterministic result",
Expand All @@ -469,8 +460,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
})
.collect()
}
AggKind::Builtin(PbAggKind::JsonbObjectAgg)
| AggKind::WrapScalar(_) => agg_call
AggKind::Builtin(PbAggKind::JsonbObjectAgg) => agg_call
.order_by
.iter()
.map(|o| (o.order_type, o.column_index))
Expand All @@ -491,14 +481,18 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {

// other columns that should be contained in state table
let include_keys = match agg_call.agg_kind {
// `agg_kinds::materialized_input_state` except for `min`/`max`
AggKind::Builtin(
PbAggKind::FirstValue
| PbAggKind::LastValue
| PbAggKind::StringAgg
| PbAggKind::ArrayAgg
| PbAggKind::JsonbAgg
| PbAggKind::JsonbObjectAgg,
) => agg_call.inputs.iter().map(|i| i.index).collect(),
)
| AggKind::WrapScalar(_) => {
agg_call.inputs.iter().map(|i| i.index).collect()
}
_ => vec![],
};

Expand Down
3 changes: 2 additions & 1 deletion src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ impl MaterializedInputState {
| PbAggKind::ArrayAgg
| PbAggKind::JsonbAgg
| PbAggKind::JsonbObjectAgg,
) => Box::new(GenericAggStateCache::new(
)
| AggKind::WrapScalar(_) => Box::new(GenericAggStateCache::new(
OrderedStateCache::new(),
agg_call.args.arg_types(),
)),
Expand Down

0 comments on commit e3a9d37

Please sign in to comment.