Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#6556: do window block-wise #6290

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
code cleanup and edits
  • Loading branch information
christinafan committed Sep 11, 2023
commit 4e563b7af39031ec661a45130448bf8630d0e424
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,7 @@ def window(

# applies reduction function over entire virtual partition
def window_function_complete(virtual_partition):
# Copy the pandas DataFrame before use: on Ray the DataFrame is immutable, so we must work on a copy.
virtual_partition_copy = virtual_partition.copy()
window_result = reduce_fn(virtual_partition_copy)
return window_result
Expand Down Expand Up @@ -2092,7 +2093,6 @@ def window_function_partition(virtual_partition):
]
)
parts_to_join.append(masked_new_parts)
break
else:
# window continues into next part, so just add this part to parts_to_join
if axis == Axis.COL_WISE:
Expand Down Expand Up @@ -2138,7 +2138,7 @@ def window_function_partition(virtual_partition):
results = np.array(results)

return self.__constructor__(
results, self.index, self.columns, None, None, result_schema
np.array(results), self.index, self.columns, None, None, result_schema
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down
126 changes: 44 additions & 82 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ def expanding_corr(
)
)

old_window_mean = Fold.register(
_window_mean = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).mean(*args, **kwargs)
)
Expand All @@ -1429,9 +1429,9 @@ def window_mean(self, axis, window_kwargs, *args, **kwargs):
)
)
else:
return self.old_window_mean(axis, window_kwargs, *args, **kwargs)
return self._window_mean(axis, window_kwargs, *args, **kwargs)

old_window_sum = Fold.register(
_window_sum = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).sum(*args, **kwargs)
)
Expand All @@ -1450,9 +1450,9 @@ def window_sum(self, axis, window_kwargs, *args, **kwargs):
)
)
else:
return self.old_window_sum(axis, window_kwargs, *args, **kwargs)
return self._window_sum(axis, window_kwargs, *args, **kwargs)

old_window_var = Fold.register(
_window_var = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).var(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1471,9 +1471,9 @@ def window_var(self, axis, window_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_window_var(axis, window_kwargs, ddof, *args, **kwargs)
return self._window_var(axis, window_kwargs, ddof, *args, **kwargs)

old_window_std = Fold.register(
_window_std = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).std(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1492,9 +1492,9 @@ def window_std(self, axis, window_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_window_std(axis, window_kwargs, ddof, *args, **kwargs)
return self._window_std(axis, window_kwargs, ddof, *args, **kwargs)

old_rolling_count = Fold.register(
_rolling_count = Fold.register(
lambda df, rolling_kwargs: pandas.DataFrame(df.rolling(**rolling_kwargs).count())
)

Expand All @@ -1509,9 +1509,9 @@ def rolling_count(self, axis, rolling_kwargs):
)
)
else:
return self.old_rolling_count(axis, rolling_kwargs)
return self._rolling_count(axis, rolling_kwargs)

old_rolling_sum = Fold.register(
_rolling_sum = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).sum(*args, **kwargs)
)
Expand All @@ -1530,9 +1530,9 @@ def rolling_sum(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_sum(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_sum(axis, rolling_kwargs, *args, **kwargs)

old_rolling_sem = Fold.register(
_rolling_sem = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).sem(*args, **kwargs)
)
Expand All @@ -1551,9 +1551,9 @@ def rolling_sem(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_sem(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_sem(axis, rolling_kwargs, *args, **kwargs)

old_rolling_mean = Fold.register(
_rolling_mean = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).mean(*args, **kwargs)
)
Expand All @@ -1572,9 +1572,9 @@ def rolling_mean(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_mean(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_mean(axis, rolling_kwargs, *args, **kwargs)

old_rolling_median = Fold.register(
_rolling_median = Fold.register(
lambda df, rolling_kwargs, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).median(**kwargs)
)
Expand All @@ -1591,9 +1591,9 @@ def rolling_median(self, axis, rolling_kwargs, **kwargs):
)
)
else:
return self.old_rolling_median(axis, rolling_kwargs, **kwargs)
return self._rolling_median(axis, rolling_kwargs, **kwargs)

old_rolling_var = Fold.register(
_rolling_var = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).var(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1614,9 +1614,9 @@ def rolling_var(self, axis, rolling_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_rolling_var(axis, rolling_kwargs, ddof, *args, **kwargs)
return self._rolling_var(axis, rolling_kwargs, ddof, *args, **kwargs)

old_rolling_std = Fold.register(
_rolling_std = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).std(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1637,9 +1637,9 @@ def rolling_std(self, axis, rolling_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_rolling_std(axis, rolling_kwargs, ddof, *args, **kwargs)
return self._rolling_std(axis, rolling_kwargs, ddof, *args, **kwargs)

old_rolling_min = Fold.register(
_rolling_min = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).min(*args, **kwargs)
)
Expand All @@ -1658,9 +1658,9 @@ def rolling_min(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_min(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_min(axis, rolling_kwargs, *args, **kwargs)

old_rolling_max = Fold.register(
_rolling_max = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).max(*args, **kwargs)
)
Expand All @@ -1679,9 +1679,9 @@ def rolling_max(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_max(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_max(axis, rolling_kwargs, *args, **kwargs)

old_rolling_skew = Fold.register(
_rolling_skew = Fold.register(
lambda df, rolling_kwargs, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).skew(**kwargs)
)
Expand All @@ -1698,9 +1698,9 @@ def rolling_skew(self, axis, rolling_kwargs, **kwargs):
)
)
else:
return self.old_rolling_skew(axis, rolling_kwargs, **kwargs)
return self._rolling_skew(axis, rolling_kwargs, **kwargs)

old_rolling_kurt = Fold.register(
_rolling_kurt = Fold.register(
lambda df, rolling_kwargs, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).kurt(**kwargs)
)
Expand All @@ -1717,9 +1717,9 @@ def rolling_kurt(self, axis, rolling_kwargs, **kwargs):
)
)
else:
return self.old_rolling_kurt(axis, rolling_kwargs, **kwargs)
return self._rolling_kurt(axis, rolling_kwargs, **kwargs)

old_rolling_apply = Fold.register(
_rolling_apply = Fold.register(
lambda df, rolling_kwargs, func, raw, engine, engine_kwargs, args, kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).apply(
func=func,
Expand Down Expand Up @@ -1754,7 +1754,7 @@ def rolling_apply(
)
)
else:
return self.old_rolling_apply(
return self._rolling_apply(
axis,
rolling_kwargs,
func,
Expand All @@ -1765,7 +1765,7 @@ def rolling_apply(
kwargs,
)

old_rolling_rank = Fold.register(
_rolling_rank = Fold.register(
lambda df, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).rank(
method=method,
Expand Down Expand Up @@ -1798,11 +1798,11 @@ def rolling_rank(
)
)
else:
return self.old_rolling_rank(
return self._rolling_rank(
axis, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs
)

old_rolling_quantile = Fold.register(
_rolling_quantile = Fold.register(
lambda df, rolling_kwargs, quantile, interpolation, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).quantile(
quantile=quantile, interpolation=interpolation, **kwargs
Expand All @@ -1825,11 +1825,11 @@ def rolling_quantile(self, axis, rolling_kwargs, quantile, interpolation, **kwar
)
)
else:
return self.old_rolling_quantile(
return self._rolling_quantile(
axis, rolling_kwargs, quantile, interpolation, **kwargs
)

old_rolling_corr = Fold.register(
_rolling_corr = Fold.register(
lambda df, rolling_kwargs, other, pairwise, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).corr(
other=other, pairwise=pairwise, *args, **kwargs
Expand Down Expand Up @@ -1859,11 +1859,11 @@ def rolling_corr(self, axis, rolling_kwargs, other, pairwise, *args, **kwargs):
)
)
else:
return self.old_rolling_corr(
return self._rolling_corr(
axis, rolling_kwargs, other, pairwise, *args, **kwargs
)

old_rolling_cov = Fold.register(
_rolling_cov = Fold.register(
lambda df, rolling_kwargs, other, pairwise, ddof, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).cov(
other=other, pairwise=pairwise, ddof=ddof, **kwargs
Expand Down Expand Up @@ -1897,7 +1897,7 @@ def rolling_cov(self, axis, rolling_kwargs, other, pairwise, ddof, **kwargs):
)
)
else:
return self.old_rolling_cov(
return self._rolling_cov(
axis, rolling_kwargs, other, pairwise, ddof, **kwargs
)

Expand Down Expand Up @@ -4494,48 +4494,10 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
return self.__constructor__(new_modin_frame)

def sort_rows_by_column_values(self, columns, ascending=True, **kwargs):
# Our algebra sort is only implemented for Engines that support virtual partitioning.
if _current_engine_supports_virtual_partitions():
new_modin_frame = self._modin_frame.sort_by(
0, columns, ascending=ascending, **kwargs
)
return self.__constructor__(new_modin_frame)
ignore_index = kwargs.get("ignore_index", False)
kwargs["ignore_index"] = False
if not is_list_like(columns):
columns = [columns]
# Currently, sort_values will just reindex based on the sorted values.
# TODO create a more efficient way to sort
ErrorMessage.default_to_pandas("sort_values")
broadcast_value_dict = {
col: self.getitem_column_array([col]).to_pandas().squeeze(axis=1)
for col in columns
}
# Clear index level names because they also appear in broadcast_value_dict
orig_index_level_names = self.index.names
tmp_index = self.index.copy()
tmp_index.names = [None] * tmp_index.nlevels
# Index may contain duplicates
broadcast_values1 = pandas.DataFrame(broadcast_value_dict, index=tmp_index)
# Index without duplicates
broadcast_values2 = pandas.DataFrame(broadcast_value_dict)
broadcast_values2 = broadcast_values2.reset_index(drop=True)
# Index may contain duplicates
new_index1 = broadcast_values1.sort_values(
by=columns, axis=0, ascending=ascending, **kwargs
).index
# Index without duplicates
new_index2 = broadcast_values2.sort_values(
by=columns, axis=0, ascending=ascending, **kwargs
).index

result = self.reset_index(drop=True).reindex(axis=0, labels=new_index2)
if ignore_index:
result = result.reset_index(drop=True)
else:
result.index = new_index1
result.index.names = orig_index_level_names
return result
new_modin_frame = self._modin_frame.sort_by(
0, columns, ascending=ascending, **kwargs
)
return self.__constructor__(new_modin_frame)

def sort_columns_by_row_values(self, rows, ascending=True, **kwargs):
if not is_list_like(rows):
Expand Down