#2165 #2166

Merged · 2 commits · Dec 29, 2024
2 changes: 1 addition & 1 deletion opteryx/__version__.py
@@ -1,4 +1,4 @@
-__build__ = 928
+__build__ = 930
 
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
2 changes: 1 addition & 1 deletion opteryx/operators/async_read_node.py
@@ -78,7 +78,7 @@ def execute(self, morsel, **kwargs) -> Generator:
 
 from opteryx import system_statistics
 
-"""Perform this step, time how long is spent doing work"""
+# Perform this step, time how long is spent doing work
 orso_schema = self.parameters["schema"]
 reader = self.parameters["connector"]
 
29 changes: 17 additions & 12 deletions opteryx/operators/base_plan_node.py
@@ -10,7 +10,9 @@
 import pyarrow
 from orso.tools import random_string
 
 from opteryx.config import MORSEL_SIZE
+from opteryx import EOS
 
+END = object()
 
 
 class BasePlanNode:
@@ -77,31 +79,34 @@ def __call__(self, morsel: pyarrow.Table, join_leg: str) -> Optional[pyarrow.Tab
 try:
     # Time the production of the next result
     start_time = time.monotonic_ns()
-    result = next(generator)  # Retrieve the next item from the generator
+    result = next(generator, END)  # Retrieve the next item from the generator
     execution_time = time.monotonic_ns() - start_time
     self.execution_time += execution_time
     self.statistics.increase("time_" + self.name.lower(), execution_time)
 
-    # Update metrics for valid results
+    if result == END:
+        # Break the loop when the generator is exhausted
+        if not at_least_one:
+            yield empty_morsel
+        break
+
     if hasattr(result, "num_rows"):
         self.records_out += result.num_rows
         self.bytes_out += result.nbytes
 
+        if empty_morsel is None:
+            empty_morsel = result.slice(0, 0)
+
         # if we get empty sets, don't yield them unless they're the only one
         if result.num_rows > 0:
             self.statistics.avoided_empty_datasets += 1
             at_least_one = True
             yield result
-        else:
-            empty_morsel = result
-    else:
-        yield result
-
-except StopIteration:
-    # Break the loop when the generator is exhausted
-    if not at_least_one and empty_morsel is not None:
-        yield empty_morsel
-    break
+        continue
+
+    yield result
 
 except Exception as err:
     # print(f"Exception {err} in operator", self.name)
     raise err
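For context on the base_plan_node.py change above: the diff replaces the except StopIteration handler with a unique END sentinel passed as the default to next(), so generator exhaustion is handled as ordinary control flow inside the loop. A minimal standalone sketch of that pattern, assuming nothing about opteryx internals (names are illustrative):

_END = object()  # unique sentinel; no yielded value should compare equal to it

def drain(generator):
    """Yield everything from `generator`, stopping without catching StopIteration."""
    while True:
        item = next(generator, _END)  # returns _END instead of raising when exhausted
        if item is _END:
            break
        yield item

print(list(drain(iter([1, 2, 3]))))  # [1, 2, 3]

The diff compares with == rather than is; for a bare object() that behaves the same, since the default equality falls back to identity.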