Commit 622dca5

Reducing data copy in DataFrame/Series constructors, pd.concat, and pd.merge

Summary: Adding `copy=False` to invocations of `pd.DataFrame()`, `pd.Series()`, `pd.concat()`, and `pd.merge()` throughout Kats to avoid unnecessary data copies.

Reviewed By: jeffhandl

Differential Revision: D36754374

fbshipit-source-id: 32b745060e2732435c8248584bd94911b37d8713
uthakore authored and facebook-github-bot committed Jun 1, 2022
1 parent 7547e25 commit 622dca5
Showing 46 changed files with 435 additions and 314 deletions.
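
As context for the diffs below, here is a minimal sketch (not part of the commit) of what the `copy` keyword does in pandas 1.x, the series current when this landed; the variable names are illustrative only. With `copy=False`, constructors wrap existing data instead of eagerly duplicating it, which is what makes these call sites cheaper. Under later pandas versions with copy-on-write, these keywords largely lose their effect.

import numpy as np
import pandas as pd

# pd.Series(..., copy=False) wraps the existing ndarray rather than
# duplicating it, so the Series and the array share one buffer.
values = np.arange(5, dtype=float)
series = pd.Series(values, copy=False)
assert np.shares_memory(values, series.to_numpy())

# The same keyword exists on pd.DataFrame, pd.concat, and pd.merge.
# It is a best-effort hint: pandas still copies when it has to
# (e.g. for dtype conversions), so behavior is unchanged either way.
frame = pd.DataFrame(values.reshape(-1, 1), columns=["y"], copy=False)
wide = pd.concat([frame, frame.rename(columns={"y": "z"})], axis=1, copy=False)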
51 changes: 31 additions & 20 deletions kats/consts.py
@@ -262,7 +262,7 @@ def __init__( # noqa C901
             )
             raise _log_error(msg)
         if isinstance(time, pd.DatetimeIndex):
-            self._time = pd.Series(time)
+            self._time = pd.Series(time, copy=False)
         else:
             self._time = cast(pd.Series, time.reset_index(drop=True))
         self._value = value.reset_index(drop=True)
@@ -586,20 +586,20 @@ def extend(self, other: object, validate: bool = True) -> None:
         if not isinstance(other, TimeSeriesData):
             raise TypeError("extend must take another TimeSeriesData object")
         # Concatenate times
-        self.time = pd.concat([self.time, other.time], ignore_index=True).reset_index(
-            drop=True
-        )
+        self.time = pd.concat(
+            [self.time, other.time], ignore_index=True, copy=False
+        ).reset_index(drop=True)
         # Convert values to DataFrame if needed
         cur_value = self.value
         other_value = other.value
         if isinstance(self.value, pd.Series):
-            cur_value = pd.DataFrame(cur_value)
+            cur_value = pd.DataFrame(cur_value, copy=False)
         if isinstance(other.value, pd.Series):
-            other_value = pd.DataFrame(other_value)
+            other_value = pd.DataFrame(other_value, copy=False)
         # Concatenate values
-        self.value = pd.concat([cur_value, other_value], ignore_index=True).reset_index(
-            drop=True
-        )
+        self.value = pd.concat(
+            [cur_value, other_value], ignore_index=True, copy=False
+        ).reset_index(drop=True)
         # Merge value back to Series if required
         self._set_univariate_values_to_series()
         # Validate that frequency is constant if required
@@ -728,16 +728,16 @@ def to_dataframe(self, standard_time_col_name: bool = False) -> pd.DataFrame:
         time_col_name = (
             DEFAULT_TIME_NAME if standard_time_col_name else self.time_col_name
         )
-        output_df = pd.DataFrame(dict(zip((time_col_name,), (self.time,))))
+        output_df = pd.DataFrame(dict(zip((time_col_name,), (self.time,))), copy=False)
         if isinstance(self.value, pd.Series):
             if self.value.name is not None:
                 output_df[self.value.name] = self.value
             else:
                 output_df[DEFAULT_VALUE_NAME] = self.value
         elif isinstance(self.value, pd.DataFrame):
-            output_df = pd.concat([output_df, self.value], axis=1).reset_index(
-                drop=True
-            )
+            output_df = pd.concat(
+                [output_df, self.value], axis=1, copy=False
+            ).reset_index(drop=True)
         else:
             raise ValueError(f"Wrong value type: {type(self.value)}")
         return output_df
@@ -759,9 +759,13 @@ def _get_binary_op_other_arg(self, other: object) -> TimeSeriesData:
                     dict(
                         zip(
                             (DEFAULT_TIME_NAME, self.value.name),
-                            (self.time, pd.Series(other, index=self.time.index)),
+                            (
+                                self.time,
+                                pd.Series(other, index=self.time.index, copy=False),
+                            ),
                         )
-                    )
+                    ),
+                    copy=False,
                 )
             )
         else:
Expand All @@ -787,6 +791,7 @@ def _perform_op(self, other: object, op_type: "OperationsEnum") -> TimeSeriesDat
on=DEFAULT_TIME_NAME,
how="outer",
suffixes=(PREFIX_OP_1, PREFIX_OP_2),
copy=False,
)
# Map the final column name to the sub column names
col_map = {}
@@ -998,12 +1003,15 @@ class TimeSeriesIterator:
 
     def __init__(self, ts: TimeSeriesData) -> None:
        self.ts: TimeSeriesData = copy.deepcopy(ts)
-        self.ts.value = pd.DataFrame(ts.value)
+        self.ts.value = pd.DataFrame(ts.value, copy=False)
        self.start = 0

    def __iter__(self) -> TimeSeriesIterator:
        self.a = pd.DataFrame(
-            list(self.ts.value.iloc[:, 0]), index=list(self.ts.time), columns=["y"]
+            list(self.ts.value.iloc[:, 0]),
+            index=list(self.ts.time),
+            columns=["y"],
+            copy=False,
        )
        return self

@@ -1013,6 +1021,7 @@ def __next__(self) -> pd.DataFrame:
             list(self.ts.value.iloc[:, self.start]),
             index=list(self.ts.time),
             columns=["y"],
+            copy=False,
         )
         self.start += 1
         return x
@@ -1042,12 +1051,14 @@ def __next__(self) -> TimeSeriesData:
         if self.curr < len(self.ts.time):
             if self.ts.is_univariate():
                 ret = TimeSeriesData(
-                    time=pd.Series(self.ts.time[self.curr]),
-                    value=pd.Series(self.ts.value.iloc[self.curr], name=self.curr),
+                    time=pd.Series(self.ts.time[self.curr], copy=False),
+                    value=pd.Series(
+                        self.ts.value.iloc[self.curr], name=self.curr, copy=False
+                    ),
                 )
             else:
                 ret = TimeSeriesData(
-                    time=pd.Series(self.ts.time[self.curr]),
+                    time=pd.Series(self.ts.time[self.curr], copy=False),
                     value=self.ts.value.loc[[self.curr]],
                 )
             self.curr += 1
25 changes: 13 additions & 12 deletions kats/detectors/bocpd_model.py
@@ -14,15 +14,9 @@
 import numpy as np
 import pandas as pd
 from kats.consts import TimeSeriesData
-from kats.detectors.bocpd import (
-    BOCPDetector,
-    BOCPDModelType,
-)
+from kats.detectors.bocpd import BOCPDetector, BOCPDModelType
 from kats.detectors.detector import DetectorModel
-from kats.detectors.detector_consts import (
-    AnomalyResponse,
-    ConfidenceBand,
-)
+from kats.detectors.detector_consts import AnomalyResponse, ConfidenceBand
 from statsmodels.tsa.holtwinters import ExponentialSmoothing
 
 
@@ -111,7 +105,8 @@ def _handle_missing_data_extend(
                         for i in range(len(historical_data))
                         if historical_data.time.iloc[i] in original_time_list
                     ],
-                }
+                },
+                copy=False,
             ),
             use_unix_time=True,
             unix_time_units="s",
@@ -189,8 +184,12 @@ def fit_predict(
 
         # construct the object
         N = len(data)
-        default_ts = TimeSeriesData(time=data.time, value=pd.Series(N * [0.0]))
-        score_ts = TimeSeriesData(time=data.time, value=pd.Series(change_prob))
+        default_ts = TimeSeriesData(
+            time=data.time, value=pd.Series(N * [0.0], copy=False)
+        )
+        score_ts = TimeSeriesData(
+            time=data.time, value=pd.Series(change_prob, copy=False)
+        )
 
         self.response = AnomalyResponse(
             scores=score_ts,
@@ -262,7 +261,9 @@ def _holt_winter_fit(
         fit_arr = [x + y for x, y in zip(level_arr, trend_arr)]
         fit_diff = np.diff(fit_arr)
         fit_diff = np.concatenate(([fit_diff[0]], fit_diff))
-        trend_ts = TimeSeriesData(time=data_ts.time, value=pd.Series(fit_diff))
+        trend_ts = TimeSeriesData(
+            time=data_ts.time, value=pd.Series(fit_diff, copy=False)
+        )
         return trend_ts
 
     def fit_predict(
15 changes: 6 additions & 9 deletions kats/detectors/cusum_model.py
@@ -30,18 +30,12 @@
 import logging
 from datetime import datetime
 from enum import Enum
-from typing import Dict, cast, Any, List, Optional, Union, NamedTuple
+from typing import Any, cast, Dict, List, NamedTuple, Optional, Union
 
 import numpy as np
 import pandas as pd
-from kats.consts import (
-    DEFAULT_VALUE_NAME,
-    TimeSeriesData,
-)
-from kats.detectors.cusum_detection import (
-    CUSUMDetector,
-    CUSUM_DEFAULT_ARGS,
-)
+from kats.consts import DEFAULT_VALUE_NAME, TimeSeriesData
+from kats.detectors.cusum_detection import CUSUM_DEFAULT_ARGS, CUSUMDetector
 from kats.detectors.detector import DetectorModel
 from kats.detectors.detector_consts import AnomalyResponse
 from kats.utils.decomposition import TimeSeriesDecomposition
@@ -409,6 +403,7 @@ def _zeros_ts(self, data: TimeSeriesData) -> TimeSeriesData:
             value=pd.Series(
                 np.zeros(len(data)),
                 name=data.value.name if data.value.name else DEFAULT_VALUE_NAME,
+                copy=False,
             ),
         )
 
@@ -499,6 +494,7 @@ def fit_predict(
                 decomp["rem"][historical_data_time_idx].value
                 + decomp["trend"][historical_data_time_idx].value,
                 name=historical_data.value.name,
+                copy=False,
             )
 
         smooth_window = int(scan_window.total_seconds() / frequency.total_seconds())
@@ -509,6 +505,7 @@
             )[: 1 - smooth_window]
             / smooth_window,
             name=historical_data.value.name,
+            copy=False,
         )
         smooth_historical_data = TimeSeriesData(
             time=historical_data.time, value=smooth_historical_value
2 changes: 1 addition & 1 deletion kats/detectors/detector.py
@@ -64,7 +64,7 @@ def remover(self, interpolate: bool = False) -> TimeSeriesData:
             df.append(ts)
 
         # Need to make this a ts object
-        df_final = pd.concat(df, axis=1)
+        df_final = pd.concat(df, axis=1, copy=False)
 
         if interpolate:
             df_final.interpolate(method="linear", limit_direction="both", inplace=True)
26 changes: 16 additions & 10 deletions kats/detectors/detector_consts.py
@@ -7,7 +7,7 @@
 
 from dataclasses import dataclass
 from datetime import datetime
-from typing import List, Optional, Tuple, Union, cast
+from typing import cast, List, Optional, Tuple, Union
 
 import attr
 import numpy as np
@@ -119,7 +119,7 @@ def extend_data(self, data: TimeSeriesData) -> None:
         new_data_df.columns = ["time"] + self._ts_cols
         df = self.data_df
         if df is not None:
-            new_data_df = pd.concat([df, new_data_df])
+            new_data_df = pd.concat([df, new_data_df], copy=False)
         self.data_df = new_data_df.loc[
             (new_data_df.time >= self.start_time) & (new_data_df.time < self.end_time)
         ]
@@ -455,7 +455,12 @@ def _calc_cov(self) -> float:
 
         # for multivariate TS data
         if self.num_series > 1:
-            return np.asarray([np.cov(current[:, c], previous[:, c])[0, 1] / n_min for c in range(self.num_series)])
+            return np.asarray(
+                [
+                    np.cov(current[:, c], previous[:, c])[0, 1] / n_min
+                    for c in range(self.num_series)
+                ]
+            )
 
         return np.cov(current, previous)[0, 1] / n_min
 
@@ -471,9 +476,9 @@ def _delta_method(self) -> None:
         cov_xy = self._calc_cov()
 
         sigma_sq_ratio = (
-            test_var / (n_test * (control_mean ** 2))
-            - 2 * (test_mean * cov_xy) / (control_mean ** 3)
-            + (control_var * (test_mean ** 2)) / (n_control * (control_mean ** 4))
+            test_var / (n_test * (control_mean**2))
+            - 2 * (test_mean * cov_xy) / (control_mean**3)
+            + (control_var * (test_mean**2)) / (n_control * (control_mean**4))
         )
         # the signs appear flipped because norm.ppf(0.025) ~ -1.96
         self.lower = self.ratio_estimate + norm.ppf(self.alpha / 2) * np.sqrt(
@@ -550,10 +555,10 @@ def update(
     def _update_ts_slice(
         self, ts: TimeSeriesData, time: datetime, value: Union[float, ArrayLike]
     ) -> TimeSeriesData:
-        time = ts.time.iloc[1:].append(pd.Series(time))
+        time = ts.time.iloc[1:].append(pd.Series(time, copy=False))
         time.reset_index(drop=True, inplace=True)
         if self.num_series == 1:
-            value = ts.value.iloc[1:].append(pd.Series(value))
+            value = ts.value.iloc[1:].append(pd.Series(value, copy=False))
             value.reset_index(drop=True, inplace=True)
             return TimeSeriesData(time=time, value=value)
         else:
@@ -564,7 +569,7 @@ def _update_ts_slice(
             value_dict = {}
             for i, value_col in enumerate(self.key_mapping):
                 value_dict[value_col] = (
-                    ts.value[value_col].iloc[1:].append(pd.Series(value[i]))
+                    ts.value[value_col].iloc[1:].append(pd.Series(value[i], copy=False))
                 )
                 value_dict[value_col].reset_index(drop=True, inplace=True)
             return TimeSeriesData(
@@ -575,7 +580,8 @@ def _update_ts_slice(
                             value_col: value_dict[value_col]
                             for value_col in self.key_mapping
                         },
-                    }
+                    },
+                    copy=False,
                 )
             )
 
4 changes: 3 additions & 1 deletion kats/detectors/meta_learning/metalearning_detection_model.py
@@ -87,7 +87,9 @@ def report_metrics(self) -> pd.DataFrame:
         results = self.results
         if results is None:
             results = self.train()
-        summary = pd.DataFrame([results["fit_error"], results["pred_error"]])
+        summary = pd.DataFrame(
+            [results["fit_error"], results["pred_error"]], copy=False
+        )
         summary["type"] = ["fit_error", "pred_error"]
         summary["error_metric"] = "Inverted F-score"
         return summary
13 changes: 10 additions & 3 deletions kats/detectors/multivariate_detector.py
@@ -138,19 +138,24 @@ def predict(
             shape=[len(data) - output_scores_df.shape[0], output_scores_df.shape[1]]
         )
         padding[:] = np.NaN
-        padding = pd.DataFrame(padding, columns=output_scores_df.columns)
+        padding = pd.DataFrame(padding, columns=output_scores_df.columns, copy=False)
         # all fields other than scores are left as TimeSeriesData with all zero values
         response = AnomalyResponse(
             scores=TimeSeriesData(
                 time=data.time,
                 value=pd.concat(
                     [padding.iloc[:, :-2], output_scores_df.iloc[:, :-2]],
                     ignore_index=True,
+                    copy=False,
                 ),
             ),
             confidence_band=ConfidenceBand(
-                upper=TimeSeriesData(time=data.time, value=pd.DataFrame(zeros)),
-                lower=TimeSeriesData(time=data.time, value=pd.DataFrame(zeros)),
+                upper=TimeSeriesData(
+                    time=data.time, value=pd.DataFrame(zeros, copy=False)
+                ),
+                lower=TimeSeriesData(
+                    time=data.time, value=pd.DataFrame(zeros, copy=False)
+                ),
             ),
             predicted_ts=TimeSeriesData(
                 time=data.time, value=pd.DataFrame(zeros).iloc[:, :-2]
@@ -160,13 +165,15 @@
                 value=pd.concat(
                     [padding.iloc[:, -2], output_scores_df.iloc[:, -2]],
                     ignore_index=True,
+                    copy=False,
                 ),
             ),
             stat_sig_ts=TimeSeriesData(
                 time=data.time,
                 value=pd.concat(
                     [padding.iloc[:, -1], output_scores_df.iloc[:, -1]],
                     ignore_index=True,
+                    copy=False,
                 ),
             ),
         )