Skip to content

Commit

Permalink
Merge pull request #2164 from mabel-dev/#2163
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer authored Dec 29, 2024
2 parents fe710c4 + cb0629f commit 7d93b0c
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 64 deletions.
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 925
__build__ = 926

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
30 changes: 15 additions & 15 deletions opteryx/compiled/cross_join/cython_cross_join.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
import numpy as np
cimport numpy as cnp
cimport cython
from libc.stdint cimport int32_t
from libc.stdint cimport int64_t
from libc.math cimport INFINITY

cpdef tuple build_rows_indices_and_column(cnp.ndarray column_data):
cdef Py_ssize_t row_count = column_data.shape[0]
cdef cnp.int32_t[::1] lengths = np.empty(row_count, dtype=np.int32)
cdef cnp.int32_t[::1] offsets = np.empty(row_count + 1, dtype=np.int32)
cdef Py_ssize_t i
cdef Py_ssize_t total_size = 0
cdef int64_t row_count = column_data.shape[0]
cdef cnp.int64_t[::1] lengths = np.empty(row_count, dtype=np.int64)
cdef cnp.int64_t[::1] offsets = np.empty(row_count + 1, dtype=np.int64)
cdef int64_t i
cdef int64_t total_size = 0
cdef cnp.dtype element_dtype = column_data[0].dtype

if not isinstance(column_data[0], np.ndarray):
Expand All @@ -30,13 +30,13 @@ cpdef tuple build_rows_indices_and_column(cnp.ndarray column_data):

# Early exit if total_size is zero
if total_size == 0:
return (np.array([], dtype=np.int32), np.array([], dtype=object))
return (np.array([], dtype=np.int64), np.array([], dtype=object))

# Compute offsets for efficient slicing
offsets[0] = 0
for i in range(row_count):
offsets[i + 1] = offsets[i] + lengths[i]
cdef cnp.int32_t[::1] indices = np.empty(total_size, dtype=np.int32)
cdef cnp.int64_t[::1] indices = np.empty(total_size, dtype=np.int64)
cdef cnp.ndarray flat_data = np.empty(total_size, dtype=element_dtype)

# Fill indices and flat_data
Expand Down Expand Up @@ -65,13 +65,13 @@ cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set
tuple of (ndarray, ndarray)
Returns a tuple containing an array of indices and an array of flattened data for rows that match the filter.
"""
cdef Py_ssize_t row_count = column_data.shape[0]
cdef Py_ssize_t allocated_size = row_count * 4 # Initial allocation size
cdef Py_ssize_t index = 0
cdef Py_ssize_t i, j, len_i
cdef int64_t row_count = column_data.shape[0]
cdef int64_t allocated_size = row_count * 4 # Initial allocation size
cdef int64_t index = 0
cdef int64_t i, j, len_i
cdef object array_i
cdef cnp.ndarray flat_data
cdef cnp.int32_t[::1] indices
cdef cnp.int64_t[::1] indices
cdef cnp.dtype element_dtype = None
cdef object value

Expand All @@ -92,7 +92,7 @@ cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set
element_dtype = np.object_

# Initialize indices and flat_data arrays
indices = np.empty(allocated_size, dtype=np.int32)
indices = np.empty(allocated_size, dtype=np.int64)
flat_data = np.empty(allocated_size, dtype=element_dtype)

# Handle set initialization based on element dtype
Expand Down Expand Up @@ -127,7 +127,7 @@ cpdef tuple build_filtered_rows_indices_and_column(cnp.ndarray column_data, set
index += 1

if index == 0:
return (np.array([], dtype=np.int32), np.array([], dtype=element_dtype))
return (np.array([], dtype=np.int64), np.array([], dtype=element_dtype))

# Slice arrays to the actual used size
indices = indices[:index]
Expand Down
55 changes: 34 additions & 21 deletions opteryx/compiled/levenshtein/clevenshtein.pyx
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
# cython: language_level=3
# cython: nonecheck=False
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=False
# cython: boundscheck=False

import numpy as np # Required for array allocation
from libc.stdint cimport int64_t, int32_t
cimport cython

cdef int min3(int x, int y, int z):
cdef inline int64_t min3(int64_t x, int64_t y, int64_t z) nogil:
"""Utility function to find the minimum of three integers."""
cdef int m = x
if y < m:
m = y
if z < m:
m = z
return m

def levenshtein(str string1, str string2):
if x <= y:
if x <= z:
return x
return z
if y <= z:
return y
return z

cpdef int64_t levenshtein(str string1, str string2):
"""
Calculate the Levenshtein distance between two strings.
Expand All @@ -22,26 +31,30 @@ def levenshtein(str string1, str string2):
Returns:
int: The Levenshtein distance between string1 and string2.
"""
if len(string1) < len(string2):
string1, string2 = string2, string1

cdef int len1 = len(string1)
cdef int len2 = len(string2)
cdef int i, j
cdef int len2 = len(string2) + 1

cdef int64_t i, j

# Allocate a numpy array and create a memory view from it
cdef int[:] dp = np.zeros((len1 + 1) * (len2 + 1), dtype=np.int32)
cdef int64_t[:] dp = np.zeros((len1 + 1) * len2, dtype=np.int64)

for i in range(len1 + 1):
for j in range(len2 + 1):
for j in range(len2):
if i == 0:
dp[i * (len2 + 1) + j] = j # First string is empty
dp[j] = j
elif j == 0:
dp[i * (len2 + 1) + j] = i # Second string is empty
dp[i * len2] = i
elif string1[i - 1] == string2[j - 1]:
dp[i * (len2 + 1) + j] = dp[(i - 1) * (len2 + 1) + (j - 1)]
dp[i * len2 + j] = dp[(i - 1) * len2 + (j - 1)]
else:
dp[i * (len2 + 1) + j] = 1 + min3(
dp[(i - 1) * (len2 + 1) + j], # Remove
dp[i * (len2 + 1) + (j - 1)], # Insert
dp[(i - 1) * (len2 + 1) + (j - 1)] # Replace
dp[i * len2 + j] = 1 + min3(
dp[(i - 1) * len2 + j], # Remove
dp[i * len2 + (j - 1)], # Insert
dp[(i - 1) * len2 + (j - 1)] # Replace
)

return dp[len1 * (len2 + 1) + len2]
return dp[len1 * len2 + (len2 - 1)]
27 changes: 10 additions & 17 deletions opteryx/compiled/structures/hash_table.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# cython: cdivision=True
# cython: initializedcheck=False
# cython: infer_types=True
# cython: wraparound=True
# cython: wraparound=False
# cython: boundscheck=False

from libcpp.unordered_map cimport unordered_map
Expand Down Expand Up @@ -59,7 +59,7 @@ cdef class HashSet:
cdef inline bint contains(self, int64_t value):
return self.c_set.find(value) != self.c_set.end()

@cython.wraparound(False)

cdef inline object recast_column(column):
cdef column_type = column.type

Expand All @@ -70,9 +70,6 @@ cdef inline object recast_column(column):
return column




@cython.wraparound(False)
cpdef tuple distinct(table, HashSet seen_hashes=None, list columns=None):
"""
Perform a distinct operation on the given table using an external HashSet.
Expand Down Expand Up @@ -140,7 +137,6 @@ cpdef tuple distinct(table, HashSet seen_hashes=None, list columns=None):

return keep, seen_hashes

@cython.wraparound(False)
cdef void compute_float_hashes(cnp.ndarray[cnp.float64_t] data, int64_t null_hash, int64_t[:] hashes):
cdef Py_ssize_t i, n = data.shape[0]
cdef cnp.float64_t value
Expand All @@ -151,7 +147,7 @@ cdef void compute_float_hashes(cnp.ndarray[cnp.float64_t] data, int64_t null_has
else:
hashes[i] = hash(value)

@cython.wraparound(False)

cdef void compute_int_hashes(cnp.ndarray[cnp.int64_t] data, int64_t null_hash, int64_t[:] hashes):
cdef Py_ssize_t i, n = data.shape[0]
cdef cnp.int64_t value
Expand All @@ -164,7 +160,6 @@ cdef void compute_int_hashes(cnp.ndarray[cnp.int64_t] data, int64_t null_hash, i
else:
hashes[i] = value # Hash of int is the int itself in Python 3

@cython.wraparound(False)
cdef void compute_object_hashes(cnp.ndarray data, int64_t null_hash, int64_t[:] hashes):
cdef Py_ssize_t i, n = data.shape[0]
cdef object value
Expand All @@ -176,18 +171,13 @@ cdef void compute_object_hashes(cnp.ndarray data, int64_t null_hash, int64_t[:]
hashes[i] = hash(value)


@cython.wraparound(False)
cpdef tuple list_distinct(cnp.ndarray values, cnp.int32_t[::1] indices, HashSet seen_hashes=None):
cpdef tuple list_distinct(cnp.ndarray values, cnp.int64_t[::1] indices, HashSet seen_hashes=None):
cdef:
Py_ssize_t i, j = 0
Py_ssize_t n = values.shape[0]
object v
int64_t hash_value
int32_t[::1] new_indices = numpy.empty(n, dtype=numpy.int32)

# Determine the dtype of the `values` array
int64_t[::1] new_indices = numpy.empty(n, dtype=numpy.int64)
cnp.dtype dtype = values.dtype

cnp.ndarray new_values = numpy.empty(n, dtype=dtype)

if seen_hashes is None:
Expand All @@ -200,11 +190,11 @@ cpdef tuple list_distinct(cnp.ndarray values, cnp.int32_t[::1] indices, HashSet
new_values[j] = v
new_indices[j] = indices[i]
j += 1

return new_values[:j], new_indices[:j], seen_hashes



@cython.wraparound(False)
cpdef HashTable hash_join_map(relation, list join_columns):
"""
Build a hash table for the join operations.
Expand Down Expand Up @@ -272,7 +262,10 @@ cpdef HashTable hash_join_map(relation, list join_columns):
return ht


cpdef filter_join_set(relation, list join_columns, HashSet seen_hashes):
cpdef HashSet filter_join_set(relation, list join_columns, HashSet seen_hashes):
"""
Build the set for the right of a filter join (ANTI/SEMI)
"""

cdef int64_t num_columns = len(join_columns)

Expand Down
2 changes: 2 additions & 0 deletions opteryx/connectors/capabilities/predicate_pushable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datetime
from typing import Dict

from orso.tools import single_item_cache
from orso.types import OrsoTypes

from opteryx.exceptions import NotSupportedError
Expand Down Expand Up @@ -59,6 +60,7 @@ def __init__(self, **kwargs):
pass

@staticmethod
@single_item_cache
def to_dnf(root):
"""
Convert a filter to DNF form, this is the form used by PyArrow.
Expand Down
5 changes: 3 additions & 2 deletions opteryx/managers/expression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Expressions are evaluated against an entire morsel at a time.
"""

from collections import deque
from enum import Enum
from typing import Callable
from typing import Dict
Expand Down Expand Up @@ -289,7 +290,7 @@ def evaluate(expression: Node, table: Table):
return result


def get_all_nodes_of_type(root, select_nodes):
def get_all_nodes_of_type(root, select_nodes: tuple) -> list:
"""
Walk an expression tree collecting all nodes of a specified type.
"""
Expand All @@ -299,7 +300,7 @@ def get_all_nodes_of_type(root, select_nodes):
root = [root]

identifiers = []
stack = list(root)
stack = deque(root)

while stack:
node = stack.pop()
Expand Down
8 changes: 4 additions & 4 deletions opteryx/operators/async_read_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ def __init__(self, properties: QueryProperties, **parameters):

self.predicates = parameters.get("predicates")

@classmethod
def from_dict(cls, dic: dict) -> "AsyncReaderNode": # pragma: no cover
raise NotImplementedError()
@property
def name(self): # pragma: no cover
"""friendly name for this step"""
return "Async Read"

def execute(self, morsel, **kwargs) -> Generator:
if morsel == EOS:
Expand Down Expand Up @@ -99,7 +100,6 @@ def execute(self, morsel, **kwargs) -> Generator:

if len(blob_names) == 0:
# if we don't have any matching blobs, create an empty dataset
# TODO: rewrite
from orso import DataFrame

as_arrow = DataFrame(rows=[], schema=orso_schema).arrow()
Expand Down
16 changes: 15 additions & 1 deletion opteryx/operators/base_plan_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import pyarrow
from orso.tools import random_string

from opteryx.config import MORSEL_SIZE


class BasePlanNode:
is_join: bool = False
Expand Down Expand Up @@ -68,6 +70,8 @@ def __call__(self, morsel: pyarrow.Table, join_leg: str) -> Optional[pyarrow.Tab

# set up the execution of the operator
generator = self.execute(morsel, join_leg=join_leg)
empty_morsel = None
at_least_one = False

while True:
try:
Expand All @@ -83,10 +87,20 @@ def __call__(self, morsel: pyarrow.Table, join_leg: str) -> Optional[pyarrow.Tab
self.records_out += result.num_rows
self.bytes_out += result.nbytes

yield result
# if we get empty sets, don't yield them unless they're the only one
if result.num_rows > 0:
self.statistics.avoided_empty_datasets += 1
at_least_one = True
yield result
else:
empty_morsel = result
else:
yield result

except StopIteration:
# Break the loop when the generator is exhausted
if not at_least_one and empty_morsel is not None:
yield empty_morsel
break
except Exception as err:
# print(f"Exception {err} in operator", self.name)
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/cross_join_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def _cross_join_unnest_column(

if single_column and distinct and indices.size > 0:
# if the unnest target is the only field in the SELECT and we're DISTINCTING
indices = numpy.array(indices, dtype=numpy.int32)
indices = numpy.array(indices, dtype=numpy.int64)
new_column_data, indices, hash_set = list_distinct(new_column_data, indices, hash_set)

if len(indices) > 0:
Expand Down
Loading

0 comments on commit 7d93b0c

Please sign in to comment.