diff --git a/opteryx/__version__.py b/opteryx/__version__.py index 98bf044f3..6f71a88ce 100644 --- a/opteryx/__version__.py +++ b/opteryx/__version__.py @@ -1,4 +1,4 @@ -__build__ = 871 +__build__ = 874 # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/opteryx/functions/other_functions.py b/opteryx/functions/other_functions.py index e3a0b097f..dc39e2ada 100644 --- a/opteryx/functions/other_functions.py +++ b/opteryx/functions/other_functions.py @@ -138,7 +138,7 @@ def if_null(values, replacement): def null_if(col1, col2): - """ + """ Parameters: col1: Union[numpy.ndarray, list] The first input array. diff --git a/opteryx/models/physical_plan.py b/opteryx/models/physical_plan.py index 2638a540a..aa664f885 100644 --- a/opteryx/models/physical_plan.py +++ b/opteryx/models/physical_plan.py @@ -19,17 +19,18 @@ from typing import Optional from typing import Tuple +import pyarrow + from opteryx import EOS from opteryx.constants import ResultType from opteryx.exceptions import InvalidInternalStateError from opteryx.third_party.travers import Graph -import pyarrow - morsel_lock = Lock() active_task_lock = Lock() active_tasks: int = 0 + def active_tasks_increment(value: int): global active_tasks with active_task_lock: @@ -115,7 +116,6 @@ def _inner_explain(node, depth): print(table) return table - def execute(self, head_node=None) -> Generator[Tuple[Any, ResultType], Any, Any]: from opteryx.operators import ExplainNode from opteryx.operators import JoinNode @@ -135,7 +135,7 @@ def mark_node_exhausted(node_id): return # Node is already marked as exhausted node_exhaustion[node_id] = True - print("+", node_id, self[node_id].name) +# print("+", node_id, self[node_id].name) # Notify downstream nodes for _, downstream_node, _ in self.outgoing_edges(node_id): @@ -258,7 +258,13 @@ def should_stop(): all_nodes_exhausted = all(node_exhaustion.values()) queues_empty = work_queue.empty() and response_queue.empty() all_nodes_inactive = active_tasks <= 0 - print(node_exhaustion.values(), all(node_exhaustion.values()), work_queue.empty(), response_queue.empty(), active_tasks) +# print( +# node_exhaustion.values(), +# all(node_exhaustion.values()), +# work_queue.empty(), +# response_queue.empty(), +# active_tasks, +# ) return all_nodes_exhausted and queues_empty and all_nodes_inactive while not should_stop(): diff --git a/opteryx/operators/aggregate_and_group_node.py b/opteryx/operators/aggregate_and_group_node.py index c5463a529..28ad741f3 100644 --- a/opteryx/operators/aggregate_and_group_node.py +++ b/opteryx/operators/aggregate_and_group_node.py @@ -139,7 +139,6 @@ def execute(self, morsel: pyarrow.Table): groups = groups.rename_columns(list(self.column_map.keys()) + self.group_by_columns) yield groups - yield EOS return morsel = project(morsel, self.all_identifiers) @@ -153,3 +152,4 @@ def execute(self, morsel: pyarrow.Table): morsel = evaluate_and_append(self.groups, morsel) self.buffer.append(morsel) + yield None diff --git a/opteryx/operators/cross_join_node.py b/opteryx/operators/cross_join_node.py index 5a0ed5268..f2a4b08f8 100644 --- a/opteryx/operators/cross_join_node.py +++ b/opteryx/operators/cross_join_node.py @@ -325,11 +325,13 @@ def config(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if not self.continue_executing: - return None + yield None + return if self._unnest_column is not None: if morsel == EOS: self.continue_executing = False + yield None return if isinstance(self._unnest_column.value, tuple): yield from _cross_join_unnest_literal( @@ -366,4 +368,4 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: yield from _cross_join(self.left_relation, right_table) else: self.right_buffer.append(morsel) - yield None + yield None diff --git a/opteryx/operators/distinct_node.py b/opteryx/operators/distinct_node.py index 2a1478f9f..384bef653 100644 --- a/opteryx/operators/distinct_node.py +++ b/opteryx/operators/distinct_node.py @@ -59,7 +59,7 @@ def execute(self, morsel: Table) -> Table: # limit processing if morsel == EOS: - yield EOS + yield None return unique_indexes, self.hash_set = distinct( diff --git a/opteryx/operators/filter_node.py b/opteryx/operators/filter_node.py index 1346d4a60..ff8940aad 100644 --- a/opteryx/operators/filter_node.py +++ b/opteryx/operators/filter_node.py @@ -57,7 +57,7 @@ def name(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: - yield EOS + yield None return if morsel.num_rows == 0: diff --git a/opteryx/operators/heap_sort_node.py b/opteryx/operators/heap_sort_node.py index 3a5a1d1fd..24c419a71 100644 --- a/opteryx/operators/heap_sort_node.py +++ b/opteryx/operators/heap_sort_node.py @@ -138,3 +138,5 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: ) # Slice the sorted table self.table = self.table.take(sort_indices[: self.limit]) + + yield None diff --git a/opteryx/operators/inner_join_node.py b/opteryx/operators/inner_join_node.py index 5d5ef74a4..d655ef494 100644 --- a/opteryx/operators/inner_join_node.py +++ b/opteryx/operators/inner_join_node.py @@ -122,6 +122,7 @@ def execute(self, morsel: Table) -> Table: return if morsel == EOS: + yield None return # do the join diff --git a/opteryx/operators/inner_join_node_single.py b/opteryx/operators/inner_join_node_single.py index 2187e7701..b21e97aa7 100644 --- a/opteryx/operators/inner_join_node_single.py +++ b/opteryx/operators/inner_join_node_single.py @@ -208,6 +208,7 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: return if morsel == EOS: + yield None return # do the join diff --git a/opteryx/operators/limit_node.py b/opteryx/operators/limit_node.py index b4e4a40ae..5db185dea 100644 --- a/opteryx/operators/limit_node.py +++ b/opteryx/operators/limit_node.py @@ -49,6 +49,7 @@ def config(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: + yield None return if self.rows_left_to_skip > 0: @@ -64,9 +65,8 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if self.remaining_rows <= 0 or morsel.num_rows == 0: yield morsel.slice(offset=0, length=0) - yield - if morsel.num_rows < self.remaining_rows: + elif morsel.num_rows < self.remaining_rows: self.remaining_rows -= morsel.num_rows yield morsel else: diff --git a/opteryx/operators/projection_node.py b/opteryx/operators/projection_node.py index 213c56b10..4f150ea72 100644 --- a/opteryx/operators/projection_node.py +++ b/opteryx/operators/projection_node.py @@ -64,7 +64,7 @@ def name(self): # pragma: no cover def execute(self, morsel: pyarrow.Table) -> pyarrow.Table: if morsel == EOS: - yield EOS + yield None return # If any of the columns need evaluating, we need to do that here diff --git a/opteryx/operators/sort_node.py b/opteryx/operators/sort_node.py index c6ede6fe5..87e471ceb 100644 --- a/opteryx/operators/sort_node.py +++ b/opteryx/operators/sort_node.py @@ -69,7 +69,6 @@ def execute(self, morsel: Table) -> Table: new_order = numpy.argsort(numpy.random.uniform(size=table.num_rows)) table = table.take(new_order) yield table - yield EOS return raise UnsupportedSyntaxError( @@ -101,4 +100,3 @@ def execute(self, morsel: Table) -> Table: ) yield table.sort_by(mapped_order) - yield EOS diff --git a/opteryx/operators/union_node.py b/opteryx/operators/union_node.py index c8623f69d..430c5f785 100644 --- a/opteryx/operators/union_node.py +++ b/opteryx/operators/union_node.py @@ -50,6 +50,7 @@ def execute(self, morsel: Table) -> Table: coercible types are coerced. """ if morsel == EOS and self.seen_first_eos: + yield None return elif morsel == EOS: self.seen_first_eos = True diff --git a/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py b/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py index 21ac51b77..473d76837 100644 --- a/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py +++ b/opteryx/planner/cost_based_optimizer/strategies/constant_folding.py @@ -154,7 +154,6 @@ def fold_constants(root: Node, statistics: QueryStatistics) -> Node: statistics.optimization_constant_fold_reduce += 1 return node - if root.node_type in {NodeType.AND, NodeType.OR, NodeType.XOR}: # try to fold each side of logical operators root.left = fold_constants(root.left, statistics)