Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2100 #2117

Merged
merged 2 commits into from
Nov 27, 2024
Merged

#2100 #2117

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 871
__build__ = 874

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion opteryx/functions/other_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def if_null(values, replacement):


def null_if(col1, col2):
"""
"""
Parameters:
col1: Union[numpy.ndarray, list]
The first input array.
Expand Down
16 changes: 11 additions & 5 deletions opteryx/models/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
from typing import Optional
from typing import Tuple

import pyarrow

from opteryx import EOS
from opteryx.constants import ResultType
from opteryx.exceptions import InvalidInternalStateError
from opteryx.third_party.travers import Graph

import pyarrow

morsel_lock = Lock()
active_task_lock = Lock()
active_tasks: int = 0


def active_tasks_increment(value: int):
global active_tasks
with active_task_lock:
Expand Down Expand Up @@ -115,7 +116,6 @@ def _inner_explain(node, depth):
print(table)
return table


def execute(self, head_node=None) -> Generator[Tuple[Any, ResultType], Any, Any]:
from opteryx.operators import ExplainNode
from opteryx.operators import JoinNode
Expand All @@ -135,7 +135,7 @@ def mark_node_exhausted(node_id):
return # Node is already marked as exhausted

node_exhaustion[node_id] = True
print("+", node_id, self[node_id].name)
# print("+", node_id, self[node_id].name)

# Notify downstream nodes
for _, downstream_node, _ in self.outgoing_edges(node_id):
Expand Down Expand Up @@ -258,7 +258,13 @@ def should_stop():
all_nodes_exhausted = all(node_exhaustion.values())
queues_empty = work_queue.empty() and response_queue.empty()
all_nodes_inactive = active_tasks <= 0
print(node_exhaustion.values(), all(node_exhaustion.values()), work_queue.empty(), response_queue.empty(), active_tasks)
# print(
# node_exhaustion.values(),
# all(node_exhaustion.values()),
# work_queue.empty(),
# response_queue.empty(),
# active_tasks,
# )
return all_nodes_exhausted and queues_empty and all_nodes_inactive

while not should_stop():
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/aggregate_and_group_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ def execute(self, morsel: pyarrow.Table):
groups = groups.rename_columns(list(self.column_map.keys()) + self.group_by_columns)

yield groups
yield EOS
return

morsel = project(morsel, self.all_identifiers)
Expand All @@ -153,3 +152,4 @@ def execute(self, morsel: pyarrow.Table):
morsel = evaluate_and_append(self.groups, morsel)

self.buffer.append(morsel)
yield None
6 changes: 4 additions & 2 deletions opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,13 @@ def config(self): # pragma: no cover

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
if not self.continue_executing:
return None
yield None
return

if self._unnest_column is not None:
if morsel == EOS:
self.continue_executing = False
yield None
return
if isinstance(self._unnest_column.value, tuple):
yield from _cross_join_unnest_literal(
Expand Down Expand Up @@ -366,4 +368,4 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
yield from _cross_join(self.left_relation, right_table)
else:
self.right_buffer.append(morsel)
yield None
yield None
2 changes: 1 addition & 1 deletion opteryx/operators/distinct_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def execute(self, morsel: Table) -> Table:
# limit processing

if morsel == EOS:
yield EOS
yield None
return

unique_indexes, self.hash_set = distinct(
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/filter_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def name(self): # pragma: no cover

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
if morsel == EOS:
yield EOS
yield None
return

if morsel.num_rows == 0:
Expand Down
2 changes: 2 additions & 0 deletions opteryx/operators/heap_sort_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,5 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
)
# Slice the sorted table
self.table = self.table.take(sort_indices[: self.limit])

yield None
1 change: 1 addition & 0 deletions opteryx/operators/inner_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def execute(self, morsel: Table) -> Table:
return

if morsel == EOS:
yield None
return

# do the join
Expand Down
1 change: 1 addition & 0 deletions opteryx/operators/inner_join_node_single.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
return

if morsel == EOS:
yield None
return

# do the join
Expand Down
4 changes: 2 additions & 2 deletions opteryx/operators/limit_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def config(self): # pragma: no cover

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
if morsel == EOS:
yield None
return

if self.rows_left_to_skip > 0:
Expand All @@ -64,9 +65,8 @@ def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:

if self.remaining_rows <= 0 or morsel.num_rows == 0:
yield morsel.slice(offset=0, length=0)
yield

if morsel.num_rows < self.remaining_rows:
elif morsel.num_rows < self.remaining_rows:
self.remaining_rows -= morsel.num_rows
yield morsel
else:
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/projection_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def name(self): # pragma: no cover

def execute(self, morsel: pyarrow.Table) -> pyarrow.Table:
if morsel == EOS:
yield EOS
yield None
return

# If any of the columns need evaluating, we need to do that here
Expand Down
2 changes: 0 additions & 2 deletions opteryx/operators/sort_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def execute(self, morsel: Table) -> Table:
new_order = numpy.argsort(numpy.random.uniform(size=table.num_rows))
table = table.take(new_order)
yield table
yield EOS
return

raise UnsupportedSyntaxError(
Expand Down Expand Up @@ -101,4 +100,3 @@ def execute(self, morsel: Table) -> Table:
)

yield table.sort_by(mapped_order)
yield EOS
1 change: 1 addition & 0 deletions opteryx/operators/union_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def execute(self, morsel: Table) -> Table:
coercible types are coerced.
"""
if morsel == EOS and self.seen_first_eos:
yield None
return
elif morsel == EOS:
self.seen_first_eos = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def fold_constants(root: Node, statistics: QueryStatistics) -> Node:
statistics.optimization_constant_fold_reduce += 1
return node


if root.node_type in {NodeType.AND, NodeType.OR, NodeType.XOR}:
# try to fold each side of logical operators
root.left = fold_constants(root.left, statistics)
Expand Down
Loading