diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f4f2feaa6..1a7387dc00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -35,6 +35,8 @@ but cannot always guarantee backwards compatibility. Changes that may **break co
     - renamed params `actual_anomalies` to `anomalies`, and `binary_pred_anomalies` to `pred_anomalies`
   - `darts.ad.utils.show_anomalies_from_scores`:
     - renamed params `series` to `actual_series`, `actual_anomalies` to `anomalies`, `model_output` to `pred_series`, and `anomaly_scores` to `pred_scores`
+- Improvements to `RegressionModel`: [#2404](https://github.com/unit8co/darts/pull/2404) by [Anton Ragot](https://github.com/AntonRagot) and [Dennis Bader](https://github.com/dennisbader).
+  - Added parameters `sample_weight` and `val_sample_weight` to `fit()` to apply weights to each observation, label (each step in `output_chunk_length`), and target component in the training and evaluation sets.
 - Improvements to `TimeSeries`: [#1477](https://github.com/unit8co/darts/pull/1477) by [Dennis Bader](https://github.com/dennisbader).
   - New method `with_times_and_values()`, which returns a new series with a new time index and new values but with identical columns and metadata as the series called from (static covariates, hierarchy).
   - New method `slice_intersect_times()`, which returns the sliced time index of a series, where the index has been intersected with another series.
diff --git a/darts/models/forecasting/catboost_model.py b/darts/models/forecasting/catboost_model.py
index 6fc8947bf2..1ed9580a97 100644
--- a/darts/models/forecasting/catboost_model.py
+++ b/darts/models/forecasting/catboost_model.py
@@ -7,10 +7,10 @@
 This implementation comes with the ability to produce probabilistic forecasts.
 """
 
-from typing import List, Optional, Sequence, Tuple, Union
+from typing import Dict, List, Optional, Sequence, Tuple, Union
 
 import numpy as np
-from catboost import CatBoostRegressor
+from catboost import CatBoostRegressor, Pool
 
 from darts.logging import get_logger
 from darts.models.forecasting.regression_model import RegressionModel, _LikelihoodMixin
@@ -215,6 +215,11 @@ def fit(
         val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         max_samples_per_ts: Optional[int] = None,
+        n_jobs_multioutput_wrapper: Optional[int] = None,
+        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
+        val_sample_weight: Optional[
+            Union[TimeSeries, Sequence[TimeSeries], str]
+        ] = None,
         verbose: Optional[Union[int, bool]] = 0,
         **kwargs,
     ):
@@ -242,6 +247,21 @@
             creation) to know their sizes, which might be expensive on big datasets.
             If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the
             most recent `max_samples_per_ts` samples will be considered.
+        n_jobs_multioutput_wrapper
+            Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't
+            support multi-output regression natively.
+        sample_weight
+            Optionally, some sample weights to apply to the target `series` labels. They are applied per observation,
+            per label (each step in `output_chunk_length`), and per component.
+            If a series or sequence of series, then those weights are used. If the weight series has only a single
+            component / column, then the weights are applied globally to all components in `series`. Otherwise, for
+            component-specific weights, the number of components must match those of `series`.
+            If a string, then the weights are generated using built-in weighting functions. The available options are
+            `"linear_decay"` or `"exponential_decay"` - the further in the past, the lower the weight. The weights are
+            computed globally based on the length of the longest series in `series`. Then for each series, the weights
+            are extracted from the end of the global weights. This gives a common time weighting across all series.
+        val_sample_weight
+            Same as for `sample_weight` but for the evaluation dataset.
         verbose
             An integer or a boolean that can be set to 1 to display catboost's default verbose output
         **kwargs
@@ -263,6 +283,9 @@ def fit(
             val_past_covariates=val_past_covariates,
             val_future_covariates=val_future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper,
+            sample_weight=sample_weight,
+            val_sample_weight=val_sample_weight,
             verbose=verbose,
             **kwargs,
         )
@@ -277,6 +300,9 @@ def fit(
             val_past_covariates=val_past_covariates,
             val_future_covariates=val_future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper,
+            sample_weight=sample_weight,
+            val_sample_weight=val_sample_weight,
             verbose=verbose,
             **kwargs,
         )
@@ -318,14 +344,52 @@ def _likelihood_components_names(
         else:
             return None
 
+    def _add_val_set_to_kwargs(
+        self,
+        kwargs: Dict,
+        val_series: Sequence[TimeSeries],
+        val_past_covariates: Optional[Sequence[TimeSeries]],
+        val_future_covariates: Optional[Sequence[TimeSeries]],
+        val_sample_weight: Optional[Union[Sequence[TimeSeries], str]],
+        max_samples_per_ts: int,
+    ) -> dict:
+        # CatBoostRegressor requires sample weights to be passed with a validation set `Pool`
+        kwargs = super()._add_val_set_to_kwargs(
+            kwargs=kwargs,
+            val_series=val_series,
+            val_past_covariates=val_past_covariates,
+            val_future_covariates=val_future_covariates,
+            val_sample_weight=val_sample_weight,
+            max_samples_per_ts=max_samples_per_ts,
+        )
+        val_set_name, val_weight_name = self.val_set_params
+        val_sets = kwargs[val_set_name]
+        # CatBoost requires eval set Pool with sample weights -> remove from kwargs
+        val_weights = kwargs.pop(val_weight_name)
+        val_pools = []
+        for i, val_set in enumerate(val_sets):
+            val_pools.append(
+                Pool(
+                    data=val_set[0],
+                    label=val_set[1],
+                    weight=val_weights[i] if val_weights is not None else None,
+                )
+            )
+        kwargs[val_set_name] = val_pools
+        return kwargs
+
     @property
     def supports_probabilistic_prediction(self) -> bool:
         return self.likelihood is not None
 
     @property
-    def supports_val_set(self):
+    def supports_val_set(self) -> bool:
         return True
 
+    @property
+    def val_set_params(self) -> Tuple[Optional[str], Optional[str]]:
+        return "eval_set", "eval_sample_weight"
+
     @property
     def min_train_series_length(self) -> int:
         # Catboost requires a minimum of 2 train samples, therefore the min_train_series_length should be one more than
diff --git a/darts/models/forecasting/lgbm.py b/darts/models/forecasting/lgbm.py
index 32ab22a924..512cf4b264 100644
--- a/darts/models/forecasting/lgbm.py
+++ b/darts/models/forecasting/lgbm.py
@@ -10,7 +10,7 @@
 https://github.com/unit8co/darts/blob/master/INSTALL.md
 """
 
-from typing import List, Optional, Sequence, Union
+from typing import List, Optional, Sequence, Tuple, Union
 
 import lightgbm as lgb
 import numpy as np
@@ -226,6 +226,11 @@ def fit(
         val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         max_samples_per_ts: Optional[int] = None,
+        n_jobs_multioutput_wrapper: Optional[int] = None,
+        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
+        val_sample_weight: Optional[
+            Union[TimeSeries, Sequence[TimeSeries], str]
+        ] = None,
         **kwargs,
     ):
         """
@@ -252,6 +257,21 @@
             creation) to know their sizes, which might be expensive on big datasets.
             If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the
             most recent `max_samples_per_ts` samples will be considered.
+        n_jobs_multioutput_wrapper
+            Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't
+            support multi-output regression natively.
+        sample_weight
+            Optionally, some sample weights to apply to the target `series` labels. They are applied per observation,
+            per label (each step in `output_chunk_length`), and per component.
+            If a series or sequence of series, then those weights are used. If the weight series has only a single
+            component / column, then the weights are applied globally to all components in `series`. Otherwise, for
+            component-specific weights, the number of components must match those of `series`.
+            If a string, then the weights are generated using built-in weighting functions. The available options are
+            `"linear_decay"` or `"exponential_decay"` - the further in the past, the lower the weight. The weights are
+            computed globally based on the length of the longest series in `series`. Then for each series, the weights
+            are extracted from the end of the global weights. This gives a common time weighting across all series.
+        val_sample_weight
+            Same as for `sample_weight` but for the evaluation dataset.
         **kwargs
             Additional kwargs passed to `lightgbm.LGBMRegressor.fit()`
         """
@@ -269,6 +289,9 @@ def fit(
             val_past_covariates=val_past_covariates,
             val_future_covariates=val_future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper,
+            sample_weight=sample_weight,
+            val_sample_weight=val_sample_weight,
             **kwargs,
         )
@@ -283,6 +306,9 @@ def fit(
             val_past_covariates=val_past_covariates,
             val_future_covariates=val_future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper,
+            sample_weight=sample_weight,
+            val_sample_weight=val_sample_weight,
             **kwargs,
         )
         return self
@@ -309,9 +335,13 @@ def supports_probabilistic_prediction(self) -> bool:
         return self.likelihood is not None
 
     @property
-    def supports_val_set(self):
+    def supports_val_set(self) -> bool:
         return True
 
+    @property
+    def val_set_params(self) -> Tuple[Optional[str], Optional[str]]:
+        return "eval_set", "eval_sample_weight"
+
     @property
     def min_train_series_length(self) -> int:
         # LightGBM requires a minimum of 2 train samples, therefore the min_train_series_length should be one more than
diff --git a/darts/models/forecasting/linear_regression_model.py b/darts/models/forecasting/linear_regression_model.py
index 7bdffff8d6..36dabe0d5e 100644
--- a/darts/models/forecasting/linear_regression_model.py
+++ b/darts/models/forecasting/linear_regression_model.py
@@ -211,33 +211,9 @@ def fit(
         future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         max_samples_per_ts: Optional[int] = None,
         n_jobs_multioutput_wrapper: Optional[int] = None,
+        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
         **kwargs,
     ):
-        """
-        Fit/train the model on one or multiple series.
- - Parameters - ---------- - series - TimeSeries or Sequence[TimeSeries] object containing the target values. - past_covariates - Optionally, a series or sequence of series specifying past-observed covariates - future_covariates - Optionally, a series or sequence of series specifying future-known covariates - max_samples_per_ts - This is an integer upper bound on the number of tuples that can be produced - per time series. It can be used in order to have an upper bound on the total size of the dataset and - ensure proper sampling. If `None`, it will read all of the individual time series in advance (at dataset - creation) to know their sizes, which might be expensive on big datasets. - If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the - most recent `max_samples_per_ts` samples will be considered. - n_jobs_multioutput_wrapper - Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't - support multi-output regression natively. - **kwargs - Additional keyword arguments passed to the `fit` method of the model. - """ - if self.likelihood == "quantile": # set solver for linear program if "solver" not in self.kwargs: @@ -267,6 +243,8 @@ def fit( past_covariates=past_covariates, future_covariates=future_covariates, max_samples_per_ts=max_samples_per_ts, + n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, + sample_weight=sample_weight, **kwargs, ) @@ -283,6 +261,8 @@ def fit( past_covariates=past_covariates, future_covariates=future_covariates, max_samples_per_ts=max_samples_per_ts, + n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, + sample_weight=sample_weight, **kwargs, ) diff --git a/darts/models/forecasting/regression_model.py b/darts/models/forecasting/regression_model.py index 804b452a1e..91bd9ba5a8 100644 --- a/darts/models/forecasting/regression_model.py +++ b/darts/models/forecasting/regression_model.py @@ -563,17 +563,33 @@ def _add_val_set_to_kwargs( val_series: Sequence[TimeSeries], val_past_covariates: Optional[Sequence[TimeSeries]], val_future_covariates: Optional[Sequence[TimeSeries]], + val_sample_weight: Optional[Union[Sequence[TimeSeries], str]], max_samples_per_ts: int, - ): + ) -> dict: """Creates a validation set and returns a new set of kwargs passed to `self.model.fit()` including the validation set. 
This method can be overridden if the model requires different logic to add the eval set."""
-        val_samples, val_labels = self._create_lagged_data(
+        val_samples, val_labels, val_weight = self._create_lagged_data(
             series=val_series,
             past_covariates=val_past_covariates,
             future_covariates=val_future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            sample_weight=val_sample_weight,
+            last_static_covariates_shape=self._static_covariates_shape,
         )
-        return dict(kwargs, **{"eval_set": (val_samples, val_labels)})
+        # create validation sets for MultiOutputRegressor
+        if val_labels.ndim == 2 and isinstance(self.model, MultiOutputRegressor):
+            val_sets, val_weights = [], []
+            for i in range(val_labels.shape[1]):
+                val_sets.append((val_samples, val_labels[:, i]))
+                if val_weight is not None:
+                    val_weights.append(val_weight[:, i])
+            val_weights = val_weights or None
+        else:
+            val_sets = [(val_samples, val_labels)]
+            val_weights = val_weight
+
+        val_set_name, val_weight_name = self.val_set_params
+        return dict(kwargs, **{val_set_name: val_sets, val_weight_name: val_weights})
 
     def _create_lagged_data(
         self,
@@ -581,12 +597,15 @@
         past_covariates: Sequence[TimeSeries],
         future_covariates: Sequence[TimeSeries],
         max_samples_per_ts: int,
+        sample_weight: Optional[Union[TimeSeries, str]] = None,
+        last_static_covariates_shape: Optional[Tuple[int, int]] = None,
     ):
         (
             features,
             labels,
             _,
             self._static_covariates_shape,
+            sample_weights,
         ) = create_lagged_training_data(
             target_series=series,
             output_chunk_length=self.output_chunk_length,
@@ -597,11 +616,12 @@
             lags_past_covariates=self._get_lags("past"),
             lags_future_covariates=self._get_lags("future"),
             uses_static_covariates=self.uses_static_covariates,
-            last_static_covariates_shape=None,
+            last_static_covariates_shape=last_static_covariates_shape,
             max_samples_per_ts=max_samples_per_ts,
             multi_models=self.multi_models,
             check_inputs=False,
             concatenate=False,
+            sample_weight=sample_weight,
         )
 
         expected_nb_feat = (
@@ -626,15 +646,25 @@
                 raise_log(ValueError("\n".join(shape_error_msg)), logger)
             features[i] = X_i[:, :, 0]
             labels[i] = y_i[:, :, 0]
+            if sample_weights is not None:
+                sample_weights[i] = sample_weights[i][:, :, 0]
 
-        training_samples = np.concatenate(features, axis=0)
-        training_labels = np.concatenate(labels, axis=0)
+        features = np.concatenate(features, axis=0)
+        labels = np.concatenate(labels, axis=0)
+        if sample_weights is not None:
+            sample_weights = np.concatenate(sample_weights, axis=0)
 
         # if labels are of shape (n_samples, 1), flatten them to shape (n_samples,)
-        if len(training_labels.shape) == 2 and training_labels.shape[1] == 1:
-            training_labels = training_labels.ravel()
+        if labels.ndim == 2 and labels.shape[1] == 1:
+            labels = labels.ravel()
+        if (
+            sample_weights is not None
+            and sample_weights.ndim == 2
+            and sample_weights.shape[1] == 1
+        ):
+            sample_weights = sample_weights.ravel()
 
-        return training_samples, training_labels
+        return features, labels, sample_weights
 
     def _fit_model(
         self,
@@ -642,21 +672,24 @@
         past_covariates: Sequence[TimeSeries],
         future_covariates: Sequence[TimeSeries],
         max_samples_per_ts: int,
+        sample_weight: Optional[Union[Sequence[TimeSeries], str]],
         val_series: Optional[Sequence[TimeSeries]] = None,
         val_past_covariates: Optional[Sequence[TimeSeries]] = None,
         val_future_covariates: Optional[Sequence[TimeSeries]] = None,
+        val_sample_weight: Optional[Union[Sequence[TimeSeries], str]] = None,
         **kwargs,
     ):
         """
        Function that fits the
model. Deriving classes can override this method to add additional parameters
        (e.g., validation data), while keeping the sanity checks on series performed by fit().
        """
-
-        training_set = self._create_lagged_data(
+        training_samples, training_labels, sample_weights = self._create_lagged_data(
             series=series,
             past_covariates=past_covariates,
             future_covariates=future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            sample_weight=sample_weight,
+            last_static_covariates_shape=None,
         )
 
         if self.supports_val_set and val_series is not None:
@@ -665,9 +698,12 @@
                 val_series=val_series,
                 val_past_covariates=val_past_covariates,
                 val_future_covariates=val_future_covariates,
+                val_sample_weight=val_sample_weight,
                 max_samples_per_ts=max_samples_per_ts,
             )
-        self.model.fit(*training_set, **kwargs)
+        self.model.fit(
+            training_samples, training_labels, sample_weight=sample_weights, **kwargs
+        )
 
         # generate and store the lagged components names (for feature importance analysis)
         self._lagged_feature_names, self._lagged_label_names = (
@@ -691,6 +727,7 @@ def fit(
         future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         max_samples_per_ts: Optional[int] = None,
         n_jobs_multioutput_wrapper: Optional[int] = None,
+        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
         **kwargs,
     ):
         """
@@ -714,6 +751,16 @@
         n_jobs_multioutput_wrapper
             Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't
             support multi-output regression natively.
+        sample_weight
+            Optionally, some sample weights to apply to the target `series` labels. They are applied per observation,
+            per label (each step in `output_chunk_length`), and per component.
+            If a series or sequence of series, then those weights are used. If the weight series has only a single
+            component / column, then the weights are applied globally to all components in `series`. Otherwise, for
+            component-specific weights, the number of components must match those of `series`.
+            If a string, then the weights are generated using built-in weighting functions. The available options are
+            `"linear_decay"` or `"exponential_decay"` - the further in the past, the lower the weight. The weights are
+            computed globally based on the length of the longest series in `series`. Then for each series, the weights
+            are extracted from the end of the global weights. This gives a common time weighting across all series.
         **kwargs
             Additional keyword arguments passed to the `fit` method of the model.
""" @@ -725,6 +772,12 @@ def fit( val_past_covariates = series2seq(kwargs.pop("val_past_covariates", None)) val_future_covariates = series2seq(kwargs.pop("val_future_covariates", None)) + if not isinstance(sample_weight, str): + sample_weight = series2seq(sample_weight) + val_sample_weight = kwargs.pop("val_sample_weight", None) + if not isinstance(val_sample_weight, str): + val_sample_weight = series2seq(val_sample_weight) + self.encoders = self.initialize_encoders() if self.encoders.encoding_available: past_covariates, future_covariates = self.generate_fit_encodings( @@ -779,28 +832,29 @@ def fit( # if multi-output regression if not series[0].is_univariate or ( - self.output_chunk_length > 1 and self.multi_models + self.output_chunk_length > 1 + and self.multi_models + and not isinstance(self.model, MultiOutputRegressor) ): - # and model isn't wrapped already - if not isinstance(self.model, MultiOutputRegressor): - # check whether model supports multi-output regression natively - if not ( - callable(getattr(self.model, "_get_tags", None)) - and isinstance(self.model._get_tags(), dict) - and self.model._get_tags().get("multioutput") - ): - # if not, wrap model with MultiOutputRegressor - self.model = MultiOutputRegressor( - self.model, n_jobs=n_jobs_multioutput_wrapper - ) - elif self.model.__class__.__name__ == "CatBoostRegressor": - if ( - self.model.get_params()["loss_function"] - == "RMSEWithUncertainty" - ): - self.model = MultiOutputRegressor( - self.model, n_jobs=n_jobs_multioutput_wrapper - ) + val_set_name, val_weight_name = self.val_set_params + mor_kwargs = { + "eval_set_name": val_set_name, + "eval_weight_name": val_weight_name, + "n_jobs": n_jobs_multioutput_wrapper, + } + if sample_weight is not None: + # we have 2D sample (and time) weights, only supported in Darts + self.model = MultiOutputRegressor(self.model, **mor_kwargs) + elif not ( + callable(getattr(self.model, "_get_tags", None)) + and isinstance(self.model._get_tags(), dict) + and self.model._get_tags().get("multioutput") + ): + # model does not support multi-output regression natively + self.model = MultiOutputRegressor(self.model, **mor_kwargs) + elif self.model.__class__.__name__ == "CatBoostRegressor": + if self.model.get_params()["loss_function"] == "RMSEWithUncertainty": + self.model = MultiOutputRegressor(self.model, **mor_kwargs) # warn if n_jobs_multioutput_wrapper was provided but not used if ( @@ -870,6 +924,8 @@ def fit( val_series=val_series, val_past_covariates=val_past_covariates, val_future_covariates=val_future_covariates, + sample_weight=sample_weight, + val_sample_weight=val_sample_weight, max_samples_per_ts=max_samples_per_ts, **kwargs, ) @@ -1183,10 +1239,16 @@ def supports_static_covariates(self) -> bool: return True @property - def supports_val_set(self): + def supports_val_set(self) -> bool: """Whether the model supports a validation set during training.""" return False + @property + def val_set_params(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the parameter names for the validation set, and validation sample weights if it supports + a validation set.""" + return None, None + def _check_optimizable_historical_forecasts( self, forecast_horizon: int, @@ -1734,6 +1796,7 @@ def fit( future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None, max_samples_per_ts: Optional[int] = None, n_jobs_multioutput_wrapper: Optional[int] = None, + sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None, **kwargs, ): self._validate_categorical_covariates( 
@@ -1747,6 +1810,7 @@ def fit(
             future_covariates=future_covariates,
             max_samples_per_ts=max_samples_per_ts,
             n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper,
+            sample_weight=sample_weight,
             **kwargs,
         )
 
@@ -1903,6 +1967,7 @@ def _fit_model(
         past_covariates,
         future_covariates,
         max_samples_per_ts,
+        sample_weight,
         **kwargs,
     ):
         """
@@ -1924,5 +1989,6 @@ def _fit_model(
             past_covariates=past_covariates,
             future_covariates=future_covariates,
             max_samples_per_ts=max_samples_per_ts,
+            sample_weight=sample_weight,
             **kwargs,
         )
diff --git a/darts/models/forecasting/xgboost.py b/darts/models/forecasting/xgboost.py
index f1a9a6a18a..79bfdcd27d 100644
--- a/darts/models/forecasting/xgboost.py
+++ b/darts/models/forecasting/xgboost.py
@@ -8,7 +8,7 @@
 """
 
 from functools import partial
-from typing import Dict, List, Optional, Sequence, Union
+from typing import List, Optional, Sequence, Tuple, Union
 
 import numpy as np
 import xgboost as xgb
@@ -232,6 +232,11 @@ def fit(
         val_past_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         val_future_covariates: Optional[Union[TimeSeries, Sequence[TimeSeries]]] = None,
         max_samples_per_ts: Optional[int] = None,
+        n_jobs_multioutput_wrapper: Optional[int] = None,
+        sample_weight: Optional[Union[TimeSeries, Sequence[TimeSeries], str]] = None,
+        val_sample_weight: Optional[
+            Union[TimeSeries, Sequence[TimeSeries], str]
+        ] = None,
         **kwargs,
     ):
         """
@@ -258,6 +263,21 @@
             creation) to know their sizes, which might be expensive on big datasets.
             If some series turn out to have a length that would allow more than `max_samples_per_ts`, only the
             most recent `max_samples_per_ts` samples will be considered.
+        n_jobs_multioutput_wrapper
+            Number of jobs of the MultiOutputRegressor wrapper to run in parallel. Only used if the model doesn't
+            support multi-output regression natively.
+        sample_weight
+            Optionally, some sample weights to apply to the target `series` labels. They are applied per observation,
+            per label (each step in `output_chunk_length`), and per component.
+            If a series or sequence of series, then those weights are used. If the weight series has only a single
+            component / column, then the weights are applied globally to all components in `series`. Otherwise, for
+            component-specific weights, the number of components must match those of `series`.
+            If a string, then the weights are generated using built-in weighting functions. The available options are
+            `"linear_decay"` or `"exponential_decay"` - the further in the past, the lower the weight. The weights are
+            computed globally based on the length of the longest series in `series`. Then for each series, the weights
+            are extracted from the end of the global weights. This gives a common time weighting across all series.
+        val_sample_weight
+            Same as for `sample_weight` but for the evaluation dataset.
**kwargs Additional kwargs passed to `xgb.XGBRegressor.fit()` """ @@ -281,6 +301,9 @@ def fit( val_past_covariates=val_past_covariates, val_future_covariates=val_future_covariates, max_samples_per_ts=max_samples_per_ts, + n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, + sample_weight=sample_weight, + val_sample_weight=val_sample_weight, **kwargs, ) self._model_container[quantile] = self.model @@ -294,6 +317,9 @@ def fit( val_past_covariates=val_past_covariates, val_future_covariates=val_future_covariates, max_samples_per_ts=max_samples_per_ts, + n_jobs_multioutput_wrapper=n_jobs_multioutput_wrapper, + sample_weight=sample_weight, + val_sample_weight=val_sample_weight, **kwargs, ) return self @@ -315,33 +341,18 @@ def _predict_and_sample( x, num_samples, predict_likelihood_parameters, **kwargs ) - def _add_val_set_to_kwargs( - self, - kwargs: Dict, - val_series: Sequence[TimeSeries], - val_past_covariates: Optional[Sequence[TimeSeries]], - val_future_covariates: Optional[Sequence[TimeSeries]], - max_samples_per_ts: int, - ): - # XGBRegressor.fit() requires a list of eval sets - kwargs = super()._add_val_set_to_kwargs( - kwargs=kwargs, - val_series=val_series, - val_past_covariates=val_past_covariates, - val_future_covariates=val_future_covariates, - max_samples_per_ts=max_samples_per_ts, - ) - kwargs["eval_set"] = [kwargs["eval_set"]] - return kwargs - @property def supports_probabilistic_prediction(self) -> bool: return self.likelihood is not None @property - def supports_val_set(self): + def supports_val_set(self) -> bool: return True + @property + def val_set_params(self) -> Tuple[Optional[str], Optional[str]]: + return "eval_set", "sample_weight_eval_set" + @property def min_train_series_length(self) -> int: # XGBModel requires a minimum of 2 training samples, diff --git a/darts/tests/models/forecasting/test_regression_models.py b/darts/tests/models/forecasting/test_regression_models.py index a09abc7aca..feedf73eef 100644 --- a/darts/tests/models/forecasting/test_regression_models.py +++ b/darts/tests/models/forecasting/test_regression_models.py @@ -1,5 +1,7 @@ import copy import functools +import importlib +import inspect import itertools import math from unittest.mock import patch @@ -524,7 +526,7 @@ def test_training_data_creation(self, mode): max_samples_per_ts = 17 - training_samples, training_labels = model_instance._create_lagged_data( + training_samples, training_labels, _ = model_instance._create_lagged_data( series=self.target_series, past_covariates=self.past_covariates, future_covariates=self.future_covariates, @@ -575,7 +577,7 @@ def test_training_data_creation(self, mode): max_samples_per_ts = 3 # using only one series of each - training_samples, training_labels = model_instance._create_lagged_data( + training_samples, training_labels, _ = model_instance._create_lagged_data( series=self.target_series[0], past_covariates=self.past_covariates[0], future_covariates=self.future_covariates[0], @@ -1566,6 +1568,132 @@ def test_multiple_ts(self, mode): assert error_both > error_both_multi_ts + @pytest.mark.parametrize( + "config", + itertools.product( + [ + (LinearRegressionModel, {}), + (RandomForest, {"bootstrap": False}), + (XGBModel, xgb_test_params), + ] + + ( + [(CatBoostModel, dict({"allow_const_label": True}, **cb_test_params))] + if cb_available + else [] + ) + + ([(LightGBMModel, lgbm_test_params)] if lgbm_available else []), + [True, False], + ), + ) + def test_weights_built_in(self, config): + (model_cls, model_kwargs), single_series = config + + ts = 
TimeSeries.from_values(values=np.array([0, 0, 0, 0, 1, 0, 0])) + + model = model_cls(lags=3, output_chunk_length=1, **model_kwargs) + model.fit( + ts if single_series else [ts] * 2, + sample_weight="linear_decay", + ) + preds = model.predict(n=3, series=ts if single_series else [ts] * 2) + + model_no_weight = model_cls(lags=3, output_chunk_length=1, **model_kwargs) + model_no_weight.fit( + ts if single_series else [ts] * 2, + sample_weight=None, + ) + preds_no_weight = model_no_weight.predict( + n=3, series=ts if single_series else [ts] * 2 + ) + + if single_series: + preds = [preds] + preds_no_weight = [preds_no_weight] + + for pred, pred_no_weight in zip(preds, preds_no_weight): + with pytest.raises(AssertionError): + np.testing.assert_array_almost_equal( + pred.all_values(), pred_no_weight.all_values() + ) + + @pytest.mark.parametrize( + "config", + itertools.product( + [ + (LinearRegressionModel, {}), + (RandomForest, {"bootstrap": False}), + (XGBModel, xgb_test_params), + ] + + ( + [(CatBoostModel, dict({"allow_const_label": True}, **cb_test_params))] + if cb_available + else [] + ) + + ([(LightGBMModel, lgbm_test_params)] if lgbm_available else []), + [True, False], + ), + ) + def test_weights_single_step_horizon(self, config): + (model_cls, model_kwargs), single_series = config + model = model_cls(lags=3, output_chunk_length=1, **model_kwargs) + + weights = TimeSeries.from_values(np.array([0, 0, 0, 0, 1, 0, 0])) + + ts = TimeSeries.from_values(values=np.array([0, 0, 0, 0, 1, 0, 0])) + + model.fit( + ts if single_series else [ts] * 2, + sample_weight=weights if single_series else [weights] * 2, + ) + + preds = model.predict(n=3, series=ts if single_series else [ts] * 2) + + preds = [preds] if single_series else preds + for pred in preds: + np.testing.assert_array_almost_equal(pred.values()[:, 0], [1, 1, 1]) + + @pytest.mark.parametrize( + "config", + [ + (LinearRegressionModel, {}), + (RandomForest, {"bootstrap": False}), + (XGBModel, xgb_test_params), + ] + + ( + [(CatBoostModel, dict({"allow_const_label": True}, **cb_test_params))] + if cb_available + else [] + ) + + ([(LightGBMModel, lgbm_test_params)] if lgbm_available else []), + ) + def test_weights_multi_horizon(self, config): + (model_cls, model_kwargs) = config + model = model_cls(lags=3, output_chunk_length=3, **model_kwargs) + + weights = TimeSeries.from_values(np.array([0, 0, 0, 1, 1, 1, 0, 0, 0])) + + # model should only fit on ones in the middle + ts = TimeSeries.from_values(values=np.array([0, 0, 0, 1, 1, 1, 2, 2, 2])) + + model.fit(ts, sample_weight=weights) + + pred = model.predict(n=3) + + np.testing.assert_array_almost_equal(pred.values()[:, 0], [1, 1, 1]) + + def test_weights_multimodel_false_multi_horizon(self): + model = LinearRegressionModel(lags=3, output_chunk_length=3, multi_models=False) + + weights = TimeSeries.from_values(np.array([0, 0, 0, 0, 0, 1, 0, 0])) + + ts = TimeSeries.from_values(values=np.array([0, 0, 0, 0, 0, 1, 0, 0])) + + model.fit(ts, sample_weight=weights) + + pred = model.predict(n=3) + + np.testing.assert_array_almost_equal(pred.values()[:, 0], [1, 1, 1]) + @pytest.mark.parametrize("mode", [True, False]) def test_only_future_covariates(self, mode): model = RegressionModel(lags_future_covariates=[-2], multi_models=mode) @@ -1671,25 +1799,58 @@ def test_not_enough_covariates(self, config): @pytest.mark.parametrize( "config", - [(XGBModel, xgb_test_params, "xgboost.xgb.XGBRegressor")] - + ( - [(LightGBMModel, lgbm_test_params, "lgbm.lgb.LGBMRegressor")] - if lgbm_available - else [] - ) - + 
( - [(CatBoostModel, cb_test_params, "catboost_model.CatBoostRegressor")] - if cb_available - else [] + itertools.product( + [ + ( + XGBModel, + xgb_test_params, + "xgboost.xgb.XGBRegressor", + "xgboost.XGBRegressor", + ) + ] + + ( + [ + ( + LightGBMModel, + lgbm_test_params, + "lgbm.lgb.LGBMRegressor", + "lightgbm.LGBMRegressor", + ) + ] + if lgbm_available + else [] + ) + + ( + [ + ( + CatBoostModel, + cb_test_params, + "catboost_model.CatBoostRegressor", + "catboost.CatBoostRegressor", + ) + ] + if cb_available + else [] + ), + [False, True], ), ) def test_val_set(self, config): """Test whether the evaluation set parameters are passed to the wrapped regression model.""" - model_cls, model_kwargs, model_loc = config + (model_cls, model_kwargs, model_loc, model_import), use_weights = config + module_name, model_name = model_import.split(".") + # mocking `fit` loses function signature. MultiOutputRegressor checks the function signature + # internally, so we have to overwrite the mocked function signature with the original one. + fit_sig = inspect.signature( + getattr(importlib.import_module(module_name), model_name).fit + ) with patch(f"darts.models.forecasting.{model_loc}.fit") as fit_patch: - self.helper_check_val_set(model_cls, model_kwargs, fit_patch) + fit_patch.__signature__ = fit_sig + self.helper_check_val_set( + model_cls, model_kwargs, fit_patch, use_weights=use_weights + ) - def helper_check_val_set(self, model_cls, model_kwargs, fit_patch): + def helper_check_val_set(self, model_cls, model_kwargs, fit_patch, use_weights): series1 = tg.sine_timeseries(length=10, column_name="tg_1") series2 = tg.sine_timeseries(length=10, column_name="tg_2") / 2 + 10 series = series1.stack(series2) @@ -1700,6 +1861,16 @@ def helper_check_val_set(self, model_cls, model_kwargs, fit_patch): fc = TimeSeries.from_times_and_values( times=series.time_index, values=series.values() * -1, columns=["fc1", "fc2"] ) + + weights_kwargs = ( + { + "sample_weight": tg.linear_timeseries(length=10), + "val_sample_weight": tg.linear_timeseries(length=10), + } + if use_weights + else {} + ) + model = model_cls( lags={"default_lags": [-4, -3, -2, -1]}, lags_past_covariates=3, @@ -1723,6 +1894,7 @@ def helper_check_val_set(self, model_cls, model_kwargs, fit_patch): val_past_covariates=pc, val_future_covariates=fc["fc1"], early_stopping_rounds=2, + **weights_kwargs, ) msg_expected = ( "The dimensions of the (`series`, `future_covariates`, `static_covariates`) between " @@ -1740,6 +1912,7 @@ def helper_check_val_set(self, model_cls, model_kwargs, fit_patch): val_past_covariates=[pc, pc], val_future_covariates=[fc, fc["fc1"]], early_stopping_rounds=2, + **weights_kwargs, ) msg_expected = ( "The dimensions of the (`series`, `future_covariates`, `static_covariates`) between " @@ -1755,23 +1928,53 @@ def helper_check_val_set(self, model_cls, model_kwargs, fit_patch): val_past_covariates=pc, val_future_covariates=fc, early_stopping_rounds=2, + **weights_kwargs, ) # fit called 6 times (3 quantiles * 2 target features) assert fit_patch.call_count == 6 - train_set = fit_patch.call_args[0] + X_train, y_train = fit_patch.call_args[0] + + # check weights in training set + weight_train = None + if use_weights: + assert "sample_weight" in fit_patch.call_args[1] + weight_train = fit_patch.call_args[1]["sample_weight"] + + # check eval set + eval_set_name, eval_weight_name = model.val_set_params + assert eval_set_name in fit_patch.call_args[1] eval_set = fit_patch.call_args[1]["eval_set"] assert eval_set is not None - # xbg requires a 
list of eval sets - if issubclass(model_cls, XGBModel): - assert isinstance(eval_set, list) - eval_set = eval_set[0] - assert isinstance(eval_set, tuple) and len(eval_set) == 2 + assert isinstance(eval_set, list) + eval_set = eval_set[0] + + weight = None + if cb_available and isinstance(model, CatBoostModel): + # CatBoost requires eval set as `Pool` + from catboost import Pool + + assert isinstance(eval_set, Pool) + X, y = eval_set.get_features(), eval_set.get_label() + if use_weights: + weight = np.array(eval_set.get_weight()) + + else: + assert isinstance(eval_set, tuple) and len(eval_set) == 2 + X, y = eval_set + if use_weights: + assert eval_weight_name in fit_patch.call_args[1] + weight = fit_patch.call_args[1][eval_weight_name] + assert isinstance(weight, list) + weight = weight[0] # check same number of features for each dataset - assert eval_set[0].shape[1:] == train_set[0].shape[1:] - assert eval_set[1].shape[1:] == train_set[1].shape[1:] + assert X.shape[1:] == X_train.shape[1:] + assert y.shape[1:] == y_train.shape[1:] assert fit_patch.call_args[1]["early_stopping_rounds"] == 2 + if use_weights: + assert weight_train.shape == y_train.shape + assert weight.shape == y.shape @pytest.mark.parametrize("mode", [True, False]) def test_integer_indexed_series(self, mode): diff --git a/darts/tests/utils/tabularization/test_create_lagged_training_data.py b/darts/tests/utils/tabularization/test_create_lagged_training_data.py index c900de8803..ae0b65524a 100644 --- a/darts/tests/utils/tabularization/test_create_lagged_training_data.py +++ b/darts/tests/utils/tabularization/test_create_lagged_training_data.py @@ -522,7 +522,7 @@ def helper_check_lagged_data( else [expected_times_y] ) - X, y, times, _ = create_lagged_training_data( + X, y, times, _, _ = create_lagged_training_data( target_series=target, output_chunk_length=output_chunk_length, past_covariates=past_cov if lags_past_ else None, @@ -977,7 +977,7 @@ def test_lagged_training_data_method_consistency(self, series_type): if all(lags_is_none): continue # Using moving window method: - X_mw, y_mw, times_mw, _ = create_lagged_training_data( + X_mw, y_mw, times_mw, _, _ = create_lagged_training_data( target_series=target, output_chunk_length=output_chunk_length, past_covariates=past if lags_past else None, @@ -992,7 +992,7 @@ def test_lagged_training_data_method_consistency(self, series_type): output_chunk_shift=output_chunk_shift, ) # Using time intersection method: - X_ti, y_ti, times_ti, _ = create_lagged_training_data( + X_ti, y_ti, times_ti, _, _ = create_lagged_training_data( target_series=target, output_chunk_length=output_chunk_length, past_covariates=past if lags_past else None, @@ -1065,7 +1065,8 @@ def test_lagged_training_data_single_lag_single_component_same_series(self, conf ] expected_X = np.concatenate( [expected_X_target, expected_X_past, expected_X_future], axis=1 - )[:, :, np.newaxis] + ) + expected_X = np.expand_dims(expected_X, axis=-1) kwargs = { "expected_X": expected_X, @@ -2611,3 +2612,215 @@ def test_create_lagged_component_names_different_lags(self, config): use_static_covariates=use_static_cov, ) assert expected_lagged_features == created_lagged_features + + @pytest.mark.parametrize( + "config", + itertools.product( + [10, 50], + [True, False], + ["linear_decay", "exponential_decay"], + ["D", "2D", 2], + [True, False], + ), + ) + def test_correct_generated_weights_exponential(self, config): + """Tests built in weights generation for: + - varying target series sizes + - with and without moving window 
tabularization
+        - different weight functions
+        - datetime and integer index
+        - single and multiple series
+        """
+        training_size, use_moving_windows, sample_weight, freq, single_series = config
+
+        if not isinstance(freq, int):
+            freq = pd.tseries.frequencies.to_offset(freq)
+            start = pd.Timestamp("2000-01-01")
+        else:
+            start = 1
+
+        train_y = linear_timeseries(start=start, length=training_size, freq=freq)
+
+        _, y, _, _, weights = create_lagged_training_data(
+            lags=[-4, -1],
+            target_series=train_y if single_series else [train_y] * 2,
+            output_chunk_length=1,
+            uses_static_covariates=False,
+            sample_weight=sample_weight,
+            output_chunk_shift=0,
+            use_moving_windows=use_moving_windows,
+        )
+
+        len_y = len(y) if single_series else int(len(y) / 2)
+        if sample_weight == "equal":
+            expected_weights = np.ones((len_y, 1, 1))
+        elif sample_weight == "linear_decay":
+            expected_weights = np.linspace(0, 1, len(train_y))[-len_y:, None, None]
+        else:  # exponential decay
+            time_steps = np.linspace(0, 1, len(train_y))
+            expected_weights = np.exp(-10 * (1 - time_steps))[-len_y:, None, None]
+
+        if not single_series:
+            expected_weights = np.concatenate([expected_weights] * 2, axis=0)
+
+        assert weights.shape == y.shape
+        np.testing.assert_array_almost_equal(weights, expected_weights)
+
+    @pytest.mark.parametrize(
+        "config",
+        itertools.product(
+            [10, 20],
+            [True, False],
+            [True, False],
+            [1, 2],
+            [0, 1],
+            ["D", "2D", 2],
+            [True, False],
+            [True, False],
+        ),
+    )
+    def test_correct_user_weights(self, config):
+        """Checks correct weights extraction for:
+        - varying target series sizes
+        - with and without moving window tabularization
+        - weights with exact matching index and longer weights
+        - single and multi horizon
+        - with and without output chunk shift
+        - datetime and integer index
+        - single and multiple series
+        - uni- and multivariate series
+        """
+        (
+            training_size,
+            use_moving_windows,
+            weights_longer,
+            ocl,
+            ocs,
+            freq,
+            single_series,
+            univar_series,
+        ) = config
+        if not isinstance(freq, int):
+            freq = pd.tseries.frequencies.to_offset(freq)
+            start = pd.Timestamp("2000-01-01")
+        else:
+            start = 1
+
+        train_y = linear_timeseries(start=start, length=training_size, freq=freq)
+        if not univar_series:
+            train_y = train_y.stack(train_y)
+
+        # weights are either longer than, or have the exact time index of, the target series
+        n_weights = len(train_y) + 2 * int(weights_longer)
+        ts_weights = TimeSeries.from_times_and_values(
+            times=generate_index(
+                start=train_y.start_time() - int(weights_longer) * freq,
+                length=n_weights,
+                freq=freq,
+            ),
+            values=np.linspace(0, 1, n_weights),
+        )
+        if not univar_series:
+            ts_weights = ts_weights.stack(ts_weights + 1.0)
+
+        _, y, _, _, weights = create_lagged_training_data(
+            lags=[-4, -1],
+            target_series=train_y if single_series else [train_y] * 2,
+            output_chunk_length=ocl,
+            uses_static_covariates=False,
+            sample_weight=ts_weights if single_series else [ts_weights] * 2,
+            output_chunk_shift=ocs,
+            use_moving_windows=use_moving_windows,
+        )
+
+        # weights shape must match label shape, since we have one
+        # weight per sample and predict step
+        assert weights.shape == y.shape
+
+        # get the weights matching the index of the target series
+        weights_exact = ts_weights.values()
+        if weights_longer:
+            weights_exact = weights_exact[1:-1]
+
+        # the weights correspond to the same sample and time index as the `y` labels
+        expected_weights = []
+        len_y_single = len(y) if single_series else int(len(y) / 2)
+        for i in range(ocl):
+            mask = slice(-(i + len_y_single), -i if i else None)
+            expected_weights.append(weights_exact[mask])
+        # step-major label layout: reverse the per-step slices before concatenating,
+        # so component order within each output step is preserved
+        expected_weights = np.concatenate(expected_weights[::-1], axis=1)
+        if not single_series:
+            expected_weights = np.concatenate([expected_weights] * 2, axis=0)
+        np.testing.assert_array_almost_equal(weights[:, :, 0], expected_weights)
+
+    @pytest.mark.parametrize(
+        "use_moving_windows",
+        [True, False],
+    )
+    def test_invalid_sample_weights(self, use_moving_windows):
+        """Checks that invalid weights raise an error with and without moving window tabularization:
+        - too short series
+        - not enough series
+        - invalid string
+        - weights shape does not match number of `series` components
+        """
+        training_size = 10
+
+        train_y = linear_timeseries(length=training_size)
+        weights_too_short = train_y[:-2]
+        with pytest.raises(ValueError) as err:
+            _ = create_lagged_training_data(
+                lags=[-4, -1],
+                target_series=train_y,
+                output_chunk_length=1,
+                uses_static_covariates=False,
+                sample_weight=weights_too_short,
+                output_chunk_shift=0,
+                use_moving_windows=use_moving_windows,
+            )
+        assert (
+            str(err.value)
+            == "The `sample_weight` series must have at least the same times as the target `series`."
+        )
+
+        with pytest.raises(ValueError) as err:
+            _ = create_lagged_training_data(
+                lags=[-4, -1],
+                target_series=[train_y] * 2,
+                output_chunk_length=1,
+                uses_static_covariates=False,
+                sample_weight=[train_y],
+                output_chunk_shift=0,
+                use_moving_windows=use_moving_windows,
+            )
+        assert (
+            str(err.value)
+            == "Must specify the same number of `TimeSeries` for each series input."
+        )
+
+        with pytest.raises(ValueError) as err:
+            _ = create_lagged_training_data(
+                lags=[-4, -1],
+                target_series=[train_y] * 2,
+                output_chunk_length=1,
+                uses_static_covariates=False,
+                sample_weight="invalid",
+                output_chunk_shift=0,
+                use_moving_windows=use_moving_windows,
+            )
+        assert str(err.value).startswith("Invalid `sample_weight` value: invalid. 
") + + with pytest.raises(ValueError) as err: + _ = create_lagged_training_data( + lags=[-4, -1], + target_series=train_y, + output_chunk_length=1, + uses_static_covariates=False, + sample_weight=train_y.stack(train_y), + output_chunk_shift=0, + use_moving_windows=use_moving_windows, + ) + assert str(err.value) == ( + "The number of components in `sample_weight` must either be `1` or " + "match the number of target series components `1`" + ) diff --git a/darts/utils/data/tabularization.py b/darts/utils/data/tabularization.py index 559ad0fa74..bfb719a5e8 100644 --- a/darts/utils/data/tabularization.py +++ b/darts/utils/data/tabularization.py @@ -14,7 +14,7 @@ import pandas as pd from numpy.lib.stride_tricks import as_strided -from darts.logging import get_logger, raise_if, raise_if_not, raise_log +from darts.logging import get_logger, raise_log from darts.timeseries import TimeSeries from darts.utils.ts_utils import get_single_series, series2seq from darts.utils.utils import n_steps_between @@ -22,6 +22,7 @@ logger = get_logger(__name__) ArrayOrArraySequence = Union[np.ndarray, Sequence[np.ndarray]] +SUPPORTED_SAMPLE_WEIGHT = {"linear_decay", "exponential_decay"} def create_lagged_data( @@ -41,11 +42,13 @@ def create_lagged_data( use_moving_windows: bool = True, is_training: bool = True, concatenate: bool = True, + sample_weight: Optional[Union[str, TimeSeries, Sequence[TimeSeries]]] = None, ) -> Tuple[ ArrayOrArraySequence, Union[None, ArrayOrArraySequence], Sequence[pd.Index], Optional[Tuple[int, int]], + Optional[ArrayOrArraySequence], ]: """ Creates the features array `X` and labels array `y` to train a lagged-variables regression model (e.g. an @@ -211,6 +214,13 @@ def create_lagged_data( when `Sequence[TimeSeries]` are provided, then `X` and `y` will be arrays created by concatenating all feature/label arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. + sample_weight + Optionally, some sample weights to apply to the target `series` labels. + If a `TimeSeries` or `Sequence[TimeSeries]`, then those weights are used. The number of weights series must + match the number of target `series` and each weight series must contain at least all time steps from the + corresponding target `series`. + If a string, then the weights are generated using built-in weighting functions. The available options are + `"linear_decay"` or `"exponential_decay"`. Returns ------- @@ -233,7 +243,8 @@ def create_lagged_data( last_static_covariates_shape The last observed shape of the static covariates. This is ``None`` when `uses_static_covariates` is ``False``. - + sample_weight + The weights to apply to each observation in `X` and output step `y`, returned as a `Sequence` of `np.ndarray`. Raises ------ @@ -261,18 +272,55 @@ def create_lagged_data( tabularization.create_lagged_component_names : return the lagged features names as a list of strings. 
""" - raise_if( - is_training and (target_series is None), - "Must specify `target_series` if `is_training = True`.", - ) + if is_training and (target_series is None): + raise_log( + ValueError("Must specify `target_series` if `is_training = True`."), + logger=logger, + ) # ensure list of TimeSeries format target_series = series2seq(target_series) past_covariates = series2seq(past_covariates) future_covariates = series2seq(future_covariates) + + # get sample weights + if isinstance(sample_weight, str): + if sample_weight not in SUPPORTED_SAMPLE_WEIGHT: + raise_log( + ValueError( + f"Invalid `sample_weight` value: {sample_weight}. " + f"If a string, must be one of: {SUPPORTED_SAMPLE_WEIGHT}." + ), + logger=logger, + ) + if target_series is None: + raise_log( + ValueError("Must supply `target_series` when using `sample_weight`."), + logger=logger, + ) + # create global time weights based on the longest target series + max_len = max(len(target_i) for target_i in target_series) + if sample_weight == "linear_decay": + weights = np.linspace(0, 1, max_len) + else: # "exponential_decay" + time_steps = np.linspace(0, 1, max_len) + weights = np.exp(-10 * (1 - time_steps)) + weights = np.expand_dims(weights, -1) + + # create sequence of series for tabularization + sample_weight = [ + TimeSeries.from_times_and_values( + times=target_i.time_index, + values=weights[-len(target_i) :], + ) + for target_i in target_series + ] + if sample_weight is not None: + sample_weight = series2seq(sample_weight) + seq_ts_lens = [ len(seq_ts) - for seq_ts in (target_series, past_covariates, future_covariates) + for seq_ts in (target_series, past_covariates, future_covariates, sample_weight) if seq_ts is not None ] seq_ts_lens = set(seq_ts_lens) @@ -305,11 +353,12 @@ def create_lagged_data( lags_past_covariates, lags_future_covariates, ) - X, y, times = [], [], [] + X, y, times, sample_weights = [], [], [], [] for i in range(max(seq_ts_lens)): target_i = target_series[i] if target_series else None past_i = past_covariates[i] if past_covariates else None future_i = future_covariates[i] if future_covariates else None + sample_weight_i = sample_weight[i] if sample_weight else None series_equal_freq = _all_equal_freq(target_i, past_i, future_i) # component-wise lags extraction is not support with times intersection at the moment if use_moving_windows and lags_passed_as_dict and (not series_equal_freq): @@ -322,36 +371,38 @@ def create_lagged_data( logger, ) if use_moving_windows and series_equal_freq: - X_i, y_i, times_i = _create_lagged_data_by_moving_window( - target_i, - output_chunk_length, - output_chunk_shift, - past_i, - future_i, - lags, - lags_past_covariates, - lags_future_covariates, - lags_extract, - lags_order, - max_samples_per_ts, - multi_models, - check_inputs, - is_training, + X_i, y_i, times_i, weights_i = _create_lagged_data_by_moving_window( + target_series=target_i, + output_chunk_length=output_chunk_length, + output_chunk_shift=output_chunk_shift, + past_covariates=past_i, + future_covariates=future_i, + sample_weight=sample_weight_i, + lags=lags, + lags_past_covariates=lags_past_covariates, + lags_future_covariates=lags_future_covariates, + lags_extract=lags_extract, + lags_order=lags_order, + max_samples_per_ts=max_samples_per_ts, + multi_models=multi_models, + check_inputs=check_inputs, + is_training=is_training, ) else: - X_i, y_i, times_i = _create_lagged_data_by_intersecting_times( - target_i, - output_chunk_length, - output_chunk_shift, - past_i, - future_i, - lags, - lags_past_covariates, - 
lags_future_covariates, - max_samples_per_ts, - multi_models, - check_inputs, - is_training, + X_i, y_i, times_i, weights_i = _create_lagged_data_by_intersecting_times( + target_series=target_i, + output_chunk_length=output_chunk_length, + output_chunk_shift=output_chunk_shift, + past_covariates=past_i, + future_covariates=future_i, + sample_weight=sample_weight_i, + lags=lags, + lags_past_covariates=lags_past_covariates, + lags_future_covariates=lags_future_covariates, + max_samples_per_ts=max_samples_per_ts, + multi_models=multi_models, + check_inputs=check_inputs, + is_training=is_training, ) X_i, last_static_covariates_shape = add_static_covariates_to_lagged_data( features=X_i, @@ -362,6 +413,8 @@ def create_lagged_data( X.append(X_i) y.append(y_i) times.append(times_i) + if weights_i is not None: + sample_weights.append(weights_i) if concatenate: X = np.concatenate(X, axis=0) @@ -369,7 +422,12 @@ def create_lagged_data( y = None elif concatenate: y = np.concatenate(y, axis=0) - return X, y, times, last_static_covariates_shape + + if sample_weights and concatenate: + sample_weights = np.concatenate(sample_weights, axis=0) + elif not sample_weights: + sample_weights = None + return X, y, times, last_static_covariates_shape, sample_weights def create_lagged_training_data( @@ -388,11 +446,13 @@ def create_lagged_training_data( check_inputs: bool = True, use_moving_windows: bool = True, concatenate: bool = True, + sample_weight: Optional[Union[TimeSeries, str]] = None, ) -> Tuple[ ArrayOrArraySequence, Union[None, ArrayOrArraySequence], Sequence[pd.Index], Optional[Tuple[int, int]], + Optional[ArrayOrArraySequence], ]: """ Creates the features array `X` and labels array `y` to train a lagged-variables regression model (e.g. an @@ -465,6 +525,13 @@ def create_lagged_training_data( when `Sequence[TimeSeries]` are provided, then `X` and `y` will be arrays created by concatenating all feature/label arrays formed by each `TimeSeries` along the `0`th axis. Note that `times` is still returned as `Sequence[pd.Index]`, even when `concatenate = True`. + sample_weight + Optionally, some sample weights to apply to the target `series` labels. + If a `TimeSeries` or `Sequence[TimeSeries]`, then those weights are used. The number of weights series must + match the number of target `series` and each weight series must contain at least all time steps from the + corresponding target `series`. + If a string, then the weights are generated using built-in weighting functions. The available options are + `"linear_decay"` or `"exponential_decay"`. Returns ------- @@ -484,6 +551,8 @@ def create_lagged_training_data( gives the times of those observations formed using the `i`th `TimeSeries` object in each `Sequence`. Otherwise, if the series inputs were specified as `TimeSeries`, the only element is the times of those observations formed from the lone `TimeSeries` inputs. + sample_weight + The weights to apply to each observation in `X` and output step `y`, returned as a `Sequence` of `np.ndarray`. Raises ------ @@ -515,6 +584,7 @@ def create_lagged_training_data( use_moving_windows=use_moving_windows, is_training=True, concatenate=concatenate, + sample_weight=sample_weight, ) @@ -621,7 +691,7 @@ def create_lagged_prediction_data( If the provided series do not share the same type of `time_index` (e.g. `target_series` uses a pd.RangeIndex, but `future_covariates` uses a `pd.DatetimeIndex`). 
""" - X, _, times, _ = create_lagged_data( + X, _, times, _, _ = create_lagged_data( target_series=target_series, past_covariates=past_covariates, future_covariates=future_covariates, @@ -908,6 +978,7 @@ def _create_lagged_data_by_moving_window( output_chunk_shift: int, past_covariates: Optional[TimeSeries], future_covariates: Optional[TimeSeries], + sample_weight: Optional[TimeSeries], lags: Optional[Union[Sequence[int], Dict[str, List[int]]]], lags_past_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]], lags_future_covariates: Optional[Union[Sequence[int], Dict[str, List[int]]]], @@ -917,7 +988,7 @@ def _create_lagged_data_by_moving_window( multi_models: bool, check_inputs: bool, is_training: bool, -) -> Tuple[np.ndarray, np.ndarray, pd.Index]: +) -> Tuple[np.ndarray, Optional[np.ndarray], pd.Index, Optional[np.ndarray]]: """ Helper function called by `create_lagged_data` that computes `X`, `y`, and `times` by extracting 'moving windows' from each series using the `strided_moving_window` @@ -934,29 +1005,34 @@ def _create_lagged_data_by_moving_window( Assumes that all the lags are sorted in ascending order. """ feature_times, min_lags, max_lags = _get_feature_times( - target_series, - past_covariates, - future_covariates, - lags, - lags_past_covariates, - lags_future_covariates, - output_chunk_length, - output_chunk_shift, + target_series=target_series, + past_covariates=past_covariates, + future_covariates=future_covariates, + lags=lags, + lags_past_covariates=lags_past_covariates, + lags_future_covariates=lags_future_covariates, + output_chunk_length=output_chunk_length, + output_chunk_shift=output_chunk_shift, is_training=is_training, return_min_and_max_lags=True, check_inputs=check_inputs, ) if check_inputs: series_and_lags_not_specified = [max_lag is None for max_lag in max_lags] - raise_if( - all(series_and_lags_not_specified), - "Must specify at least one series-lags pair.", - ) + if all(series_and_lags_not_specified): + raise_log( + ValueError("Must specify at least one series-lags pair."), logger=logger + ) + sample_weight_vals = _process_sample_weight(sample_weight, target_series) + time_bounds = get_shared_times_bounds(*feature_times) - raise_if( - time_bounds is None, - "Specified series do not share any common times for which features can be created.", - ) + if time_bounds is None: + raise_log( + ValueError( + "Specified series do not share any common times for which features can be created." 
+ ), + logger=logger, + ) freq = _get_freqs(target_series, past_covariates, future_covariates)[0] if isinstance(time_bounds[0], int): # `stop` is exclusive, so need `+ freq` to include end-point: @@ -1048,33 +1124,44 @@ def _create_lagged_data_by_moving_window( X = np.concatenate(X, axis=1) # Construct labels array `y`: if is_training: - # All values between times `t` and `t + output_chunk_length` used as labels: + # All values between times `t` and `t + output_chunk_length` used as labels / weights: # Window taken between times `t` and `t + output_chunk_length - 1`: first_window_start_idx = target_start_time_idx + output_chunk_shift # Add `+ 1` since end index is exclusive in Python: first_window_end_idx = ( target_start_time_idx + output_chunk_length + output_chunk_shift ) - # To create `(num_samples - 1)` other windows in addition to first window, - # must take `(num_samples - 1)` values ahead of `first_window_end_idx` - vals = target_series.all_values(copy=False)[ - first_window_start_idx : first_window_end_idx + num_samples - 1, - :, - :, - ] - windows = strided_moving_window( - x=vals, - window_len=output_chunk_length, - stride=1, - axis=0, - check_inputs=False, - ) lags_to_extract = None if multi_models else -np.ones((1,), dtype=int) - y = _extract_lagged_vals_from_windows(windows, lags_to_extract) - # Only values at times `t + output_chunk_length - 1` used as labels: + + # extract target labels and sample weights + y_and_weights = [] + for vals in [target_series.all_values(copy=False), sample_weight_vals]: + if vals is None: + y_and_weights.append(None) + continue + + # To create `(num_samples - 1)` other windows in addition to first window, + # must take `(num_samples - 1)` values ahead of `first_window_end_idx` + vals = vals[ + first_window_start_idx : first_window_end_idx + num_samples - 1, + :, + :, + ] + windows = strided_moving_window( + x=vals, + window_len=output_chunk_length, + stride=1, + axis=0, + check_inputs=False, + ) + # Only values at times `t + output_chunk_length - 1` used as labels: + vals = _extract_lagged_vals_from_windows(windows, lags_to_extract) + y_and_weights.append(vals) + + y, weights = y_and_weights else: - y = None - return X, y, times + y, weights = None, None + return X, y, times, weights def _extract_lagged_vals_from_windows( @@ -1127,6 +1214,7 @@ def _create_lagged_data_by_intersecting_times( output_chunk_shift: int, past_covariates: Optional[TimeSeries], future_covariates: Optional[TimeSeries], + sample_weight: Optional[TimeSeries], lags: Optional[Sequence[int]], lags_past_covariates: Optional[Sequence[int]], lags_future_covariates: Optional[Sequence[int]], @@ -1134,7 +1222,12 @@ def _create_lagged_data_by_intersecting_times( multi_models: bool, check_inputs: bool, is_training: bool, -) -> Tuple[np.ndarray, np.ndarray, Union[pd.RangeIndex, pd.DatetimeIndex]]: +) -> Tuple[ + np.ndarray, + Optional[np.ndarray], + Union[pd.RangeIndex, pd.DatetimeIndex], + Optional[np.ndarray], +]: """ Helper function called by `_create_lagged_data` that computes `X`, `y`, and `times` by first finding the time points in each series that *could* be used to create features/labels, @@ -1145,29 +1238,33 @@ def _create_lagged_data_by_intersecting_times( specified series are of the same frequency. 
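The moving-window branch above windows the target values and the weight values with identical parameters, so label row `i` keeps weight row `i`. A small numpy sketch of that invariant, using numpy's `sliding_window_view` as a stand-in for the module's `strided_moving_window` (stride 1; shapes are toy assumptions):

```python
import numpy as np

# darts stores values as (time, components, samples).
vals = np.arange(8, dtype=float).reshape(8, 1, 1)      # target values
weights = np.linspace(0.3, 1.0, 8).reshape(8, 1, 1)    # per-time-step weights

output_chunk_length = 2
windows_y = np.lib.stride_tricks.sliding_window_view(
    vals, output_chunk_length, axis=0
)
windows_w = np.lib.stride_tricks.sliding_window_view(
    weights, output_chunk_length, axis=0
)
# Identical windowing parameters -> window i of the labels is aligned with
# window i of the weights.
assert windows_y.shape == windows_w.shape == (7, 1, 1, 2)
```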
""" feature_times, min_lags, _ = _get_feature_times( - target_series, - past_covariates, - future_covariates, - lags, - lags_past_covariates, - lags_future_covariates, - output_chunk_length, - output_chunk_shift, + target_series=target_series, + past_covariates=past_covariates, + future_covariates=future_covariates, + lags=lags, + lags_past_covariates=lags_past_covariates, + lags_future_covariates=lags_future_covariates, + output_chunk_length=output_chunk_length, + output_chunk_shift=output_chunk_shift, is_training=is_training, return_min_and_max_lags=True, check_inputs=check_inputs, ) if check_inputs: series_and_lags_not_specified = [min_lag is None for min_lag in min_lags] - raise_if( - all(series_and_lags_not_specified), - "Must specify at least one series-lags pair.", - ) + if all(series_and_lags_not_specified): + raise_log( + ValueError("Must specify at least one series-lags pair."), logger=logger + ) + sample_weight_vals = _process_sample_weight(sample_weight, target_series) shared_times = get_shared_times(*feature_times, sort=True) - raise_if( - shared_times is None, - "Specified series do not share any common times for which features can be created.", - ) + if shared_times is None: + raise_log( + ValueError( + "Specified series do not share any common times for which features can be created." + ), + logger=logger, + ) if len(shared_times) > max_samples_per_ts: shared_times = shared_times[-max_samples_per_ts:] X = [] @@ -1233,13 +1330,23 @@ def _create_lagged_data_by_intersecting_times( idx_to_get = ( label_shared_time_idx + output_chunk_length + output_chunk_shift - 1 ) - # Before reshaping: lagged_vals.shape = (n_observations, num_lags, n_components, n_samples) - lagged_vals = target_series.all_values(copy=False)[idx_to_get, :, :] - # After reshaping: lagged_vals.shape = (n_observations, num_lags*n_components, n_samples) - y = lagged_vals.reshape(lagged_vals.shape[0], -1, lagged_vals.shape[-1]) + + # extract target labels and sample weights + y_and_weights = [] + for vals in [target_series.all_values(copy=False), sample_weight_vals]: + if vals is None: + y_and_weights.append(None) + continue + + # Before reshaping: lagged_vals.shape = (n_observations, num_lags, n_components, n_samples) + vals = vals[idx_to_get, :, :] + # After reshaping: lagged_vals.shape = (n_observations, num_lags*n_components, n_samples) + vals = vals.reshape(vals.shape[0], -1, vals.shape[-1]) + y_and_weights.append(vals) + y, weights = y_and_weights else: - y = None - return X, y, shared_times + y, weights = None, None + return X, y, shared_times, weights def _create_lagged_data_autoregression( @@ -1522,15 +1629,17 @@ def _get_feature_times( a regression model without using autoregressive features. 
""" - raise_if( - is_training and (target_series is None), - "Must specify `target_series` when `is_training = True`.", - ) - if check_inputs: - raise_if( - not isinstance(output_chunk_length, int) or output_chunk_length < 1, - "`output_chunk_length` must be a positive `int`.", + if is_training and (target_series is None): + raise_log( + ValueError("Must specify `target_series` when `is_training = True`."), + logger=logger, ) + if check_inputs: + if not isinstance(output_chunk_length, int) or output_chunk_length < 1: + raise_log( + ValueError("`output_chunk_length` must be a positive `int`."), + logger=logger, + ) _check_lags(lags, lags_past_covariates, lags_future_covariates) feature_times, min_lags, max_lags = [], [], [] for name_i, series_i, lags_i in zip( @@ -1677,14 +1786,14 @@ def intersection_func(series_or_times_1, series_or_times_2): type(ts.time_index if isinstance(ts, TimeSeries) else ts) for ts in specified_inputs ] - raise_if_not( - len(set(times_types)) == 1, - ( - "Specified series and/or times must all " - "have the same type of `time_index` (i.e. all " - "`pd.RangeIndex` or all `pd.DatetimeIndex`)." - ), - ) + if not len(set(times_types)) == 1: + raise_log( + ValueError( + "Specified series and/or times must all have the same type of " + "`time_index` (i.e. all `pd.RangeIndex` or all `pd.DatetimeIndex`)." + ), + logger=logger, + ) return shared_times @@ -1747,14 +1856,14 @@ def get_shared_times_bounds( bounds = None else: times_types = [type(time) for time in start_times] - raise_if_not( - len(set(times_types)) == 1, - ( - "Specified series and/or times must all " - "have the same type of `time_index` " - "(i.e. all `pd.RangeIndex` or all `pd.DatetimeIndex`)." - ), - ) + if not len(set(times_types)) == 1: + raise_log( + ValueError( + "Specified series and/or times must all have the same type of " + "`time_index` (i.e. all `pd.RangeIndex` or all `pd.DatetimeIndex`)." + ), + logger=logger, + ) # If `start_times` empty, no series were specified -> `bounds = (1, -1)` will # be 'converted' to `None` in next line: bounds = (max(start_times), min(end_times)) if start_times else (1, -1) @@ -1824,22 +1933,22 @@ def strided_moving_window( .. 
[1] https://numpy.org/doc/stable/reference/generated/numpy.lib.stride_tricks.as_strided.html """ if check_inputs: - raise_if( - not isinstance(stride, int) or stride < 1, - "`stride` must be a positive `int`.", - ) - raise_if( - not isinstance(window_len, int) or window_len < 1, - "`window_len` must be a positive `int`.", - ) - raise_if( - not isinstance(axis, int) or axis > x.ndim - 1 or axis < -x.ndim, - "`axis` must be an `int` that is less than `x.ndim`.", - ) - raise_if( - window_len > x.shape[axis], - "`window_len` must be less than or equal to x.shape[axis].", - ) + if not isinstance(stride, int) or stride < 1: + raise_log(ValueError("`stride` must be a positive `int`."), logger=logger) + if not isinstance(window_len, int) or window_len < 1: + raise_log( + ValueError("`window_len` must be a positive `int`."), logger=logger + ) + if not isinstance(axis, int) or axis > x.ndim - 1 or axis < -x.ndim: + raise_log( + ValueError("`axis` must be an `int` that is less than `x.ndim`."), + logger=logger, + ) + if window_len > x.shape[axis]: + raise_log( + ValueError("`window_len` must be less than or equal to x.shape[axis]."), + logger=logger, + ) num_windows = (x.shape[axis] - window_len) // stride + 1 new_shape = list(x.shape) new_shape[axis] = num_windows @@ -1916,14 +2025,22 @@ def _check_lags( if isinstance(lags_i, dict): lags_i = list(set(chain(*lags_i.values()))) - raise_if( - any((lag > max_lag or not isinstance(lag, int)) for lag in lags_i), - f"`lags{suffix}` must be a `Sequence` or `Dict` containing only `int` values less than {max_lag + 1}.", - ) - raise_if( - all(lags_is_none), - "Must specify at least one of: `lags`, `lags_past_covariates`, `lags_future_covariates`.", - ) + if any((lag > max_lag or not isinstance(lag, int)) for lag in lags_i): + raise_log( + ValueError( + f"`lags{suffix}` must be a `Sequence` or `Dict` containing only `int` " + f"values less than {max_lag + 1}." + ), + logger=logger, + ) + + if all(lags_is_none): + raise_log( + ValueError( + "Must specify at least one of: `lags`, `lags_past_covariates`, `lags_future_covariates`." + ), + logger=logger, + ) return None @@ -1958,12 +2075,45 @@ def _check_series_length( minimum_len_str = f"-min({lags_name}) + max({lags_name}) + 1" minimum_len = -min(lags) + max(lags) + 1 if lags_specified: - raise_if( - series.n_timesteps < minimum_len, - ( - f"`{name}` must have at least " - f"`{minimum_len_str}` = {minimum_len} time steps; " - f"instead, it only has {series.n_timesteps}." + if series.n_timesteps < minimum_len: + raise_log( + ValueError( + f"`{name}` must have at least `{minimum_len_str}` = {minimum_len} time " + f"steps; instead, it only has {series.n_timesteps}." + ), + logger=logger, + ) + return None + + +def _process_sample_weight(sample_weight, target_series): + """Checks that sample weights are valid, and returns the values of the weights.""" + if sample_weight is None: + return None + + if target_series is None: + raise_log( + ValueError("Must supply `target_series` when using `sample_weight`."), + logger=logger, + ) + sample_weight_vals = sample_weight.slice_intersect_values(target_series, copy=False) + if len(sample_weight_vals) != len(target_series): + raise_log( + ValueError( + "The `sample_weight` series must have at least the same times as the target `series`." 
), + logger=logger, ) - return None + weight_n_comp = sample_weight_vals.shape[1] + series_n_comp = target_series.n_components + if weight_n_comp > 1 and weight_n_comp != series_n_comp: + raise_log( + ValueError( + "The number of components in `sample_weight` must either be `1` or match " + f"the number of target series components `{series_n_comp}`" + ), + logger=logger, + ) + elif weight_n_comp != series_n_comp: + sample_weight_vals = sample_weight_vals.repeat(series_n_comp, axis=1) + return sample_weight_vals diff --git a/darts/utils/multioutput.py b/darts/utils/multioutput.py index 9ad54edddd..e45b5c7b9b 100644 --- a/darts/utils/multioutput.py +++ b/darts/utils/multioutput.py @@ -1,3 +1,5 @@ +from typing import Optional + from sklearn import __version__ as sklearn_version from sklearn.base import is_classifier from sklearn.multioutput import MultiOutputRegressor as sk_MultiOutputRegressor @@ -5,6 +7,8 @@ from sklearn.utils.multiclass import check_classification_targets from sklearn.utils.validation import has_fit_parameter +from darts.logging import get_logger, raise_log + if sklearn_version >= "1.4": # sklearn renamed `_check_fit_params` to `_check_method_params` in v1.4 from sklearn.utils.validation import _check_method_params @@ -18,6 +22,8 @@ from joblib import Parallel from sklearn.utils.fixes import delayed +logger = get_logger(__name__) + class MultiOutputRegressor(sk_MultiOutputRegressor): """ @@ -25,6 +31,20 @@ class MultiOutputRegressor(sk_MultiOutputRegressor): validation data correctly. The validation data has to be passed as parameter ``eval_set`` in ``**fit_params``. """ + def __init__( + self, + *args, + eval_set_name: Optional[str] = None, + eval_weight_name: Optional[str] = None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.eval_set_name_ = eval_set_name + self.eval_weight_name_ = eval_weight_name + self.estimators_ = None + self.n_features_in_ = None + self.feature_names_in_ = None + def fit(self, X, y, sample_weight=None, **fit_params): """Fit the model to data, separately for each output variable. @@ -37,7 +57,7 @@ def fit(self, X, y, sample_weight=None, **fit_params): Multi-output targets. An indicator matrix turns on multilabel estimation. - sample_weight : array-like of shape (n_samples,), default=None + sample_weight : array-like of shape (n_samples, n_outputs), default=None Sample weights. If `None`, then samples are equally weighted. Only supported if the underlying regressor supports sample weights. @@ -54,7 +74,10 @@ def fit(self, X, y, sample_weight=None, **fit_params): """ if not hasattr(self.estimator, "fit"): - raise ValueError("The base estimator should implement a fit method") + raise_log( + ValueError("The base estimator should implement a fit method"), + logger=logger, + ) y = self._validate_data(X="no_validation", y=y, multi_output=True) @@ -62,45 +85,52 @@ def fit(self, X, y, sample_weight=None, **fit_params): check_classification_targets(y) if y.ndim == 1: - raise ValueError( - "y must have at least two dimensions for " - "multi-output regression but has only one." + raise_log( + ValueError( + "`y` must have at least two dimensions for multi-output regression but has only one." 
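Back in `tabularization.py`, the component handling of `_process_sample_weight` above boils down to a `repeat` broadcast: a single-component weight series is reused for every target component. A numpy sketch of that step (shapes assumed):

```python
import numpy as np

weights = np.full((10, 1, 1), 0.5)  # one weight column, (time, components, samples)
n_components = 3                    # the target series has three components

broadcast = weights.repeat(n_components, axis=1)
# The same weight now applies to each of the three target components.
assert broadcast.shape == (10, 3, 1)
```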
+ ), + logger=logger, + ) + if sample_weight is not None and ( + sample_weight.ndim == 1 or sample_weight.shape[1] != y.shape[1] + ): + raise_log( + ValueError("`sample_weight` must have the same dimensions as `y`."), + logger=logger, ) if sample_weight is not None and not has_fit_parameter( self.estimator, "sample_weight" ): - raise ValueError("Underlying estimator does not support sample weights.") + raise_log( + ValueError("Underlying estimator does not support sample weights."), + logger=logger, + ) fit_params_validated = _check_method_params(X, fit_params) - - if "eval_set" in fit_params_validated.keys(): - # with validation set - eval_set = fit_params_validated.pop("eval_set") - self.estimators_ = Parallel(n_jobs=self.n_jobs)( - delayed(_fit_estimator)( - self.estimator, - X, - y[:, i], - sample_weight, - # eval set may be a list (for XGBRegressor), in which case we have to keep it as a list - eval_set=( - [(eval_set[0][0], eval_set[0][1][:, i])] - if isinstance(eval_set, list) - else (eval_set[0], eval_set[1][:, i]) - ), - **fit_params_validated, - ) - for i in range(y.shape[1]) - ) - else: - # without validation set - self.estimators_ = Parallel(n_jobs=self.n_jobs)( - delayed(_fit_estimator)( - self.estimator, X, y[:, i], sample_weight, **fit_params_validated - ) - for i in range(y.shape[1]) + eval_set = fit_params_validated.pop(self.eval_set_name_, None) + eval_weight = fit_params_validated.pop(self.eval_weight_name_, None) + + self.estimators_ = Parallel(n_jobs=self.n_jobs)( + delayed(_fit_estimator)( + self.estimator, + X, + y[:, i], + sample_weight=sample_weight[:, i] + if sample_weight is not None + else None, + **( + {self.eval_set_name_: [eval_set[i]]} if eval_set is not None else {} + ), + **( + {self.eval_weight_name_: [eval_weight[i]]} + if eval_weight is not None + else {} + ), + **fit_params_validated, ) + for i in range(y.shape[1]) + ) if hasattr(self.estimators_[0], "n_features_in_"): self.n_features_in_ = self.estimators_[0].n_features_in_
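Finally, a minimal sketch of the per-output weighting that the rewritten `fit()` implements: estimator `i` is fit on label column `y[:, i]` with weight column `sample_weight[:, i]`. The data here is synthetic, and the wrapper is assumed to behave as shown in this diff:

```python
import numpy as np
from sklearn.linear_model import LinearRegression

from darts.utils.multioutput import MultiOutputRegressor

rng = np.random.default_rng(0)
X = rng.random((20, 4))
y = rng.random((20, 2))  # two outputs -> two wrapped estimators

# One weight column per output, shape (n_samples, n_outputs) as required above.
w = np.column_stack([np.linspace(0.1, 1.0, 20)] * 2)

model = MultiOutputRegressor(LinearRegression())
model.fit(X, y, sample_weight=w)
print(len(model.estimators_))  # 2
```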