diff --git a/.github/workflows/oasislmf-unittest.yml b/.github/workflows/oasislmf-unittest.yml
index 276e1bcaf2..cdbdf98047 100644
--- a/.github/workflows/oasislmf-unittest.yml
+++ b/.github/workflows/oasislmf-unittest.yml
@@ -35,8 +35,7 @@ jobs:
     - name: Install pip-tools
       run: |
         python -m pip install --upgrade pip
-        pip install pip-tools
-
+        pip install --upgrade pip-tools
     - name: Pip Compile
       run: |
         rm -f requirements.txt
diff --git a/README.md b/README.md
index 6369f55384..bced70a95d 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,6 @@
 [![Binder](https://static.mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/OasisLMF/OasisLMF/develop?filepath=FmTesting.ipynb)
 
 # OasisLMF
-
 The `oasislmf` Python package, loosely called the *model development kit (MDK)* or the *MDK package*, provides a command line toolkit for developing, testing and running Oasis models end-to-end locally, or remotely via the Oasis API. It can generate ground-up losses (GUL), direct/insured losses (IL) and reinsurance losses (RIL). It can also generate deterministic losses at all these levels.
 
diff --git a/oasislmf/computation/generate/files.py b/oasislmf/computation/generate/files.py
index f886522f30..de08218af3 100644
--- a/oasislmf/computation/generate/files.py
+++ b/oasislmf/computation/generate/files.py
@@ -8,6 +8,7 @@
 import json
 import os
 from pathlib import Path
+from typing import List
 
 from ods_tools import get_ods_fields, read_csv, read_parquet
 
@@ -74,9 +75,11 @@
     GULSummaryXrefFile,
     FMSummaryXrefFile
 )
-
+from oasislmf.preparation.correlations import map_data
+from oasislmf.preparation.gul_inputs import process_group_id_cols
+from oasislmf.utils.data import establish_correlations
+from oasislmf.pytools.data_layer.oasis_files.correlations import CorrelationsData
 from ...utils.forex import create_currency_rates, manage_multiple_currency
-from oasislmf.preparation.correlations import get_correlation_input_items
 
 
 class GenerateFiles(ComputationStep):
@@ -110,7 +113,7 @@ class GenerateFiles(ComputationStep):
         {'name': 'disable_summarise_exposure', 'flag':'-S', 'default': False, 'type': str2bool, 'const':True, 'nargs':'?', 'help': 'Disables creation of an exposure summary report'},
         {'name': 'group_id_cols', 'flag':'-G', 'nargs':'+', 'help': 'Columns from loc file to set group_id', 'default': GROUP_ID_COLS},
         {'name': 'lookup_multiprocessing', 'type': str2bool, 'const': False, 'nargs':'?', 'default': False, 'help': 'Flag to enable/disable lookup multiprocessing'},
-        {"name": "hashed_group_id", "type": str2bool, "const": False, 'nargs':'?', "default": False, "help": "Hashes the group_id in the items.bin"},
+        {"name": "hashed_group_id", 'type': str2bool, 'const': False, 'nargs': '?', 'default': True, 'help': "Hashes the group_id in the items.bin"},
 
         # Manager only options (pass data directy instead of filepaths)
         {'name': 'lookup_config'},
@@ -245,14 +248,18 @@ def run(self):
 
         # Columns from loc file to assign group_id
         model_group_fields = None
-        if self.model_settings_json:
+        correlations: bool = False
+        model_settings = None
+
+        if self.model_settings_json is not None:
+            model_settings = get_model_settings(self.model_settings_json)
+            correlations = establish_correlations(model_settings=model_settings)
             try:
-                model_group_fields = get_model_settings(
-                    self.model_settings_json, key='data_settings'
-                ).get('group_fields')
+                model_group_fields = model_settings["data_settings"].get("group_fields")
             except (KeyError, AttributeError, OasisException) as e:
                 self.logger.warn('WARNING: Failed to load {} - {}'.format(self.model_settings_json, e))
 
+        # load group columns from model_settings.json if not set in kwargs (CLI)
         if model_group_fields and not self.kwargs.get('group_id_cols'):
             group_id_cols = model_group_fields
@@ -261,22 +268,19 @@ def run(self):
             group_id_cols = self.group_id_cols
 
         group_id_cols = list(map(lambda col: col.lower(), group_id_cols))
+        group_id_cols: List[str] = process_group_id_cols(group_id_cols=group_id_cols,
+                                                         exposure_df_columns=list(location_df),
+                                                         has_correlation_groups=correlations)
 
         gul_inputs_df = get_gul_input_items(
             location_df,
             keys_df,
+            peril_correlation_group_df=map_data(data=model_settings),
+            correlations=correlations,
             exposure_profile=location_profile,
             group_id_cols=group_id_cols,
-            hash_group_ids=self.hashed_group_id,
+            hashed_group_id=self.hashed_group_id
         )
 
-        if self.model_settings_json is not None:
-            correlation_input_items = get_correlation_input_items(
-                model_settings_path=self.model_settings_json,
-                gul_inputs_df=gul_inputs_df
-            )
-        else:
-            correlation_input_items = None
-
         # If not in det. loss gen. scenario, write exposure summary file
         if summarise_exposure:
             write_exposure_summary(
@@ -294,16 +298,15 @@ def run(self):
 
         # Write the GUL input files
         files_prefixes = self.oasis_files_prefixes
+
         gul_input_files = write_gul_input_files(
             gul_inputs_df,
             target_dir,
-            correlations_df=correlation_input_items,
+            correlations_df=gul_inputs_df[CorrelationsData.COLUMNS] if correlations is True else None,
             output_dir=self._get_output_dir(),
             oasis_files_prefixes=files_prefixes['gul'],
             chunksize=self.write_chunksize,
-            hashed_item_id=self.hashed_group_id
         )
-
         gul_summary_mapping = get_summary_mapping(gul_inputs_df, oed_hierarchy)
         write_mapping_file(gul_summary_mapping, target_dir)
diff --git a/oasislmf/computation/run/exposure.py b/oasislmf/computation/run/exposure.py
index a332851b84..82f322095e 100644
--- a/oasislmf/computation/run/exposure.py
+++ b/oasislmf/computation/run/exposure.py
@@ -54,6 +54,7 @@ class RunExposure(ComputationStep):
         {'name': 'fmpy_low_memory', 'default': False, 'type': str2bool, 'const':True, 'nargs':'?', 'help': 'use memory map instead of RAM to store loss array (may decrease performance but reduce RAM usage drastically)'},
         {'name': 'fmpy_sort_output', 'default': True, 'type': str2bool, 'const': True, 'nargs': '?', 'help': 'order fmpy output by item_id'},
         {'name': 'stream_type', 'flag':'-t', 'default': 2, 'type':int, 'help': 'Set the IL input stream type, 2 = default loss stream, 1 = deprecated cov/item stream'},
+        {"name": "hashed_group_id", "default": True, "type": str2bool, "const": False, 'nargs': '?', "help": "Hashes the group_id in the items.bin"},
         {'name': 'net_ri', 'default': True},
         {'name': 'include_loss_factor', 'default': True},
         {'name': 'print_summary', 'default': True},
@@ -120,6 +121,7 @@ def run(self):
                 oed_info_csv=ri_info_fp,
                 oed_scope_csv=ri_scope_fp,
                 keys_data_csv=keys_fp,
+                hashed_group_id=self.hashed_group_id,
             ).run()
 
             # 3. Run Deterministic Losses
@@ -312,6 +314,7 @@ class RunFmTest(ComputationStep):
         {'name': 'fmpy_low_memory', 'default': False, 'type': str2bool, 'const': True, 'nargs': '?', 'help': 'use memory map instead of RAM to store loss array (may decrease performance but reduce RAM usage drastically)'},
         {'name': 'fmpy_sort_output', 'default': True, 'type': str2bool, 'const': True, 'nargs': '?', 'help': 'order fmpy output by item_id'},
         {'name': 'update_expected', 'default': False},
+        {'name': 'hashed_group_id', 'default': False},
         {'name': 'expected_output_dir', 'default': "expected"},
     ]
 
@@ -408,7 +411,8 @@ def execute_test_case(self, test_case):
             num_subperils=self.num_subperils,
             fmpy=self.fmpy,
             fmpy_low_memory=self.fmpy_low_memory,
-            fmpy_sort_output=self.fmpy_sort_output
+            fmpy_sort_output=self.fmpy_sort_output,
+            hashed_group_id=self.hashed_group_id,
         ).run()
 
         expected_data_dir = os.path.join(test_dir, self.expected_output_dir)
diff --git a/oasislmf/preparation/correlations.py b/oasislmf/preparation/correlations.py
index 6a5fed0bc6..bfea8c2713 100644
--- a/oasislmf/preparation/correlations.py
+++ b/oasislmf/preparation/correlations.py
@@ -8,7 +8,7 @@
 from oasislmf.utils.data import get_model_settings
 
 
-def map_data(data: dict) -> Optional[pd.DataFrame]:
+def map_data(data: Optional[dict]) -> Optional[pd.DataFrame]:
     """
     Maps data from the model settings to to have Peril ID, peril_correlation_group, and correlation_value.
 
@@ -17,40 +17,37 @@
     Returns: (pd.DataFrame) the mapped data
     """
-    supported_perils = data.get("lookup_settings", {}).get("supported_perils", [])
-    correlation_settings = data.get("correlation_settings", [])
+    if data is not None:
+        supported_perils = data.get("lookup_settings", {}).get("supported_perils", [])
+        correlation_settings = data.get("correlation_settings", [])
 
-    for supported_peril in supported_perils:
-        supported_peril["peril_correlation_group"] = supported_peril.get("peril_correlation_group", 0)
+        for supported_peril in supported_perils:
+            supported_peril["peril_correlation_group"] = supported_peril.get("peril_correlation_group", 0)
 
-    supported_perils_df = pd.DataFrame(supported_perils)
-    correlation_settings_df = pd.DataFrame(correlation_settings)
+        supported_perils_df = pd.DataFrame(supported_perils)
+        correlation_settings_df = pd.DataFrame(correlation_settings)
 
-    # merge allows duplicates of the "peril_correlation_group" in the supported perils
-    # merge does not allow duplicates of the "peril_correlation_group" in the correlation settings
-    if len(supported_perils_df) > 0 and len(correlation_settings_df) > 0:
-        mapped_data = pd.merge(supported_perils_df, correlation_settings_df, on="peril_correlation_group")
-        return mapped_data
+        # merge allows duplicates of the "peril_correlation_group" in the supported perils
+        # merge does not allow duplicates of the "peril_correlation_group" in the correlation settings
+        if len(supported_perils_df) > 0 and len(correlation_settings_df) > 0:
+            mapped_data = pd.merge(supported_perils_df, correlation_settings_df, on="peril_correlation_group")
+            return mapped_data
 
 
-def get_correlation_input_items(model_settings_path: str, gul_inputs_df: pd.DataFrame) -> pd.DataFrame:
+def get_correlation_input_items(gul_inputs_df: pd.DataFrame, correlation_map_df: pd.DataFrame) -> pd.DataFrame:
     """
     Gets the correlation values with the peril ID from the model_settings.
 
     Args:
-        model_settings_path: (str) the path to the model settings JSON file
+        correlation_map_df: (pd.DataFrame) data from the model settings to have Peril ID, peril_correlation_group,
+        and correlation_value
         gul_inputs_df: (pd.DataFrame) the data of the gul inputs to be mapped
 
     Returns: (pd.DataFrame) the mapped data of correlations
     """
-    model_settings_raw_data: dict = get_model_settings(model_settings_fp=model_settings_path)
-    correlation_map_df = map_data(data=model_settings_raw_data)
+    gul_inputs_df = gul_inputs_df.merge(correlation_map_df, left_on='peril_id', right_on='id').reset_index()
+    gul_inputs_df["correlation_value"] = gul_inputs_df["correlation_value"].astype(float)
+    gul_inputs_df = gul_inputs_df.reindex(columns=list(gul_inputs_df))
 
-    if correlation_map_df is not None:
-        gul_inputs_df = gul_inputs_df.merge(correlation_map_df, left_on='peril_id', right_on='id').reset_index()
-        gul_inputs_df["correlation_value"] = gul_inputs_df["correlation_value"].astype(float)
-        gul_inputs_df = gul_inputs_df.reindex(columns=list(gul_inputs_df))
-
-        correlation_df = gul_inputs_df[["item_id", "peril_correlation_group", "correlation_value"]]
-        return correlation_df.sort_values('item_id')
-    return pd.DataFrame(columns=["item_id", "peril_correlation_group", "correlation_value"])
+    correlation_df = gul_inputs_df[["item_id", "peril_correlation_group", "correlation_value"]]
+    return correlation_df.sort_values('item_id')
diff --git a/oasislmf/preparation/gul_inputs.py b/oasislmf/preparation/gul_inputs.py
index a90421a9cc..f3e5ad7aa6 100644
--- a/oasislmf/preparation/gul_inputs.py
+++ b/oasislmf/preparation/gul_inputs.py
@@ -10,6 +10,7 @@
 import sys
 import warnings
 from collections import OrderedDict
+from typing import List
 
 import pandas as pd
 
@@ -44,14 +45,66 @@
 pd.options.mode.chained_assignment = None
 warnings.simplefilter(action='ignore', category=FutureWarning)
 
+VALID_OASIS_GROUP_COLS = [
+    'item_id',
+    'peril_id',
+    'coverage_id',
+    'coverage_type_id',
+    'peril_correlation_group'
+    ]
+
+
+def process_group_id_cols(group_id_cols, exposure_df_columns, has_correlation_groups):
+    """
+    Cleans out columns that are not valid Oasis group columns.
+
+    Valid group id columns must either
+    1. exist in the location file, or
+    2. be listed as a valid internal column
+
+    Args:
+        group_id_cols: (List[str]) the ID columns that are going to be filtered
+        exposure_df_columns: (List[str]) the columns in the exposure dataframe
+        has_correlation_groups: (bool) if True, hashing takes correlations into account, so the
+        "peril_correlation_group" column is added
+
+    Returns: (List[str]) the filtered columns
+    """
+    for col in group_id_cols:
+        if col not in list(exposure_df_columns) + VALID_OASIS_GROUP_COLS:
+            warnings.warn('Column {} not found in loc file, or a valid internal oasis column'.format(col))
+            group_id_cols.remove(col)
+
+    peril_correlation_group = 'peril_correlation_group'
+    if peril_correlation_group not in group_id_cols and has_correlation_groups is True:
+        group_id_cols.append(peril_correlation_group)
+    return group_id_cols
+
+
+def hash_group_id(gul_inputs_df: pd.DataFrame, hashing_columns: List[str]) -> pd.DataFrame:
+    """
+    Creates a hash for the group ID field for the input data frame.
+
+    Args:
+        gul_inputs_df: (pd.DataFrame) the gul inputs whose group_id field will be rewritten with a hash
+        hashing_columns: (List[str]) the list of columns used in the hashing algorithm
+
+    Returns: (pd.DataFrame) the gul_inputs_df with the new hashed group_id
+    """
+    gul_inputs_df["group_id"] = (pd.util.hash_pandas_object(gul_inputs_df[hashing_columns],
+                                                            index=False).to_numpy() >> 33)
+    return gul_inputs_df
+
 
 @oasis_log
 def get_gul_input_items(
     exposure_df,
     keys_df,
+    correlations=False,
+    peril_correlation_group_df=None,
     exposure_profile=get_default_exposure_profile(),
-    group_id_cols=['loc_id'],
-    hash_group_ids=False
+    group_id_cols=["PortNumber", "AccNumber", "LocNumber"],
+    hashed_group_id=True
 ):
     """
     Generates and returns a Pandas dataframe of GUL input items.
@@ -62,6 +115,9 @@
     :param keys_df: Keys dataframe
     :type keys_df: pandas.DataFrame
 
+    :param output_dir: the output directory where input files are stored
+    :type output_dir: str
+
     :param exposure_profile: Exposure profile
     :type exposure_profile: dict
 
@@ -144,20 +200,12 @@
     # Remove any duplicate column names used to assign group_id
     group_id_cols = list(set(group_id_cols))
 
-    # Ignore any column names used to assign group_id that are missing or not supported
-    # Valid group id columns can be either
-    # 1. exist in the location file
-    # 2. be listed as a useful internal col
-    valid_oasis_group_cols = [
-        'item_id',
-        'peril_id',
-        'coverage_id',
-        'coverage_type_id',
-    ]
-    for col in group_id_cols:
-        if col not in list(exposure_df.columns) + valid_oasis_group_cols:
-            warnings.warn('Column {} not found in loc file, or a valid internal oasis column'.format(col))
-            group_id_cols.remove(col)
+    # correlations are assumed to be False for now; correlations for group ID hashing are assessed later on in
+    # the process, when the group ID is re-hashed with the correlation "peril_correlation_group" column. This is
+    # because the correlation data only becomes available later in the process, leading to a chicken-and-egg problem
+    # group_id_cols = process_group_id_cols(group_id_cols=group_id_cols,
+    #                                       exposure_df_columns=list(exposure_df.columns),
+    #                                       has_correlation_groups=False)
 
     # Should list of column names used to group_id be empty, revert to
     # default
@@ -167,7 +215,7 @@
     # Only add group col if not internal oasis col
     missing_group_id_cols = []
     for col in group_id_cols:
-        if col in valid_oasis_group_cols:
+        if col in VALID_OASIS_GROUP_COLS:
             pass
         elif col not in exposure_df_gul_inputs_cols:
             missing_group_id_cols.append(col)
@@ -279,7 +327,7 @@
 
     # Concatenate chunks. Sort by index to preserve item_id order in generated outputs compared
     # to original code.
-    gul_inputs_df = pd.concat(gul_inputs_reformatted_chunks).sort_index().reset_index()
+    gul_inputs_df = pd.concat(gul_inputs_reformatted_chunks).sort_index().reset_index(drop=True)
 
     # Set default values and data types for BI coverage boolean, TIV, deductibles and limit
     dtypes = {
         **{t: 'uint8' for t in term_cols_ints + terms_ints},
@@ -312,12 +360,10 @@
     # directly, otherwise create an index of the group id fields
     group_id_cols.sort()
 
-    col_key = group_id_cols[0]
-
     if correlation_check is True:
         gul_inputs_df['group_id'] = gul_inputs_df[correlation_group_id]
 
-    elif hash_group_ids is False:
+    elif hashed_group_id is False:
 
         if len(group_id_cols) > 1:
             gul_inputs_df['group_id'] = factorize_ndarray(
@@ -331,9 +377,15 @@
                 gul_inputs_df[group_id_cols[0]].values
             )[0]
 
-    # this block gets fired if the hash_group_ids is True
+    # this block gets fired if the hashed_group_id is True
+    elif correlations is False:
+        gul_inputs_df["group_id"] = (pd.util.hash_pandas_object(gul_inputs_df[group_id_cols],
+                                                                index=False).to_numpy() >> 33)
     else:
-        gul_inputs_df["group_id"] = (pd.util.hash_pandas_object(gul_inputs_df[group_id_cols]).to_numpy() >> 33)
+        # do merge with peril correlation df
+        gul_inputs_df = gul_inputs_df.merge(peril_correlation_group_df, left_on='peril_id', right_on='id').reset_index()
+        gul_inputs_df["group_id"] = (pd.util.hash_pandas_object(gul_inputs_df[group_id_cols],
+                                                                index=False).to_numpy() >> 33)
 
     gul_inputs_df['group_id'] = gul_inputs_df['group_id'].astype('uint32')
@@ -347,6 +399,9 @@
         (['model_data'] if 'model_data' in gul_inputs_df else []) +
         ['is_bi_coverage', 'group_id', 'coverage_id', 'item_id', 'status']
     )
+    if correlations is True:
+        usecols += ["peril_correlation_group", "correlation_value"]
+
     usecols = [col for col in usecols if col in gul_inputs_df]
     gul_inputs_df = gul_inputs_df[usecols]
 
@@ -446,7 +501,6 @@ def write_gul_input_files(
     output_dir,
     oasis_files_prefixes=copy.deepcopy(OASIS_FILES_PREFIXES['gul']),
     chunksize=(2 * 10 ** 5),
-    hashed_item_id=False
 ):
     """
     Writes the standard Oasis GUL input files to a target directory, using a
@@ -477,11 +531,13 @@ def write_gul_input_files(
     # Clean the target directory path
     target_dir = as_path(target_dir, 'Target IL input files directory', is_dir=True, preexists=False)
 
+    if correlations_df is None:
+        correlations_df = pd.DataFrame(columns=['item_id', 'peril_correlation_group', 'correlation_value'])
+
     # write the correlations to a binary file
-    if correlations_df is not None:
-        correlation_data_handle = CorrelationsData(data=correlations_df)
-        correlation_data_handle.to_bin(file_path=f"{output_dir}/correlations.bin")
-        correlation_data_handle.to_csv(file_path=f"{output_dir}/correlations.csv")
+    correlation_data_handle = CorrelationsData(data=correlations_df)
+    correlation_data_handle.to_bin(file_path=f"{output_dir}/correlations.bin")
+    correlation_data_handle.to_csv(file_path=f"{output_dir}/correlations.csv")
 
     # Set chunk size for writing the CSV files - default is the minimum of 100K
     # or the GUL inputs frame size
@@ -504,9 +560,4 @@ def write_gul_input_files(
     for fn in gul_input_files:
         getattr(this_module, 'write_{}_file'.format(fn))(gul_inputs_df.copy(deep=True), gul_input_files[fn], chunksize)
 
-    # if hashed_item_id is True:
-    #     input_file = gul_input_files["items"]
-    #     input_directory = "/".join(input_file.split("/")[:-1]) + "/"
-    #     convert_item_csv_to_hash(input_directory=input_directory)
-
     return gul_input_files
diff --git a/oasislmf/pytools/data_layer/oasis_files/correlations.py b/oasislmf/pytools/data_layer/oasis_files/correlations.py
index 75de73ad79..afdd9c4547 100644
--- a/oasislmf/pytools/data_layer/oasis_files/correlations.py
+++ b/oasislmf/pytools/data_layer/oasis_files/correlations.py
@@ -16,6 +16,8 @@ class CorrelationsData:
     Attributes:
         data (Optional[pd.DataFrame): correlation data that is either loaded or saved
     """
+    COLUMNS = ["item_id", "peril_correlation_group", "correlation_value"]
+
     def __init__(self, data: Optional[pd.DataFrame] = None) -> None:
         """
         The constructor for the CorrelationsData class.
diff --git a/oasislmf/pytools/gul/io.py b/oasislmf/pytools/gul/io.py
index cdc065256f..e3d4c31c92 100644
--- a/oasislmf/pytools/gul/io.py
+++ b/oasislmf/pytools/gul/io.py
@@ -268,6 +268,7 @@ def stream_to_data(int32_mv, valid_buf, size_cdf_entry, last_event_id, item_map,
                 seeds[rng_index] = generate_hash(group_id, last_event_id)
                 this_rng_index = rng_index
                 rng_index += 1
+
             else:
                 this_rng_index = group_id_rng_index[group_id]
 
diff --git a/oasislmf/pytools/gul/manager.py b/oasislmf/pytools/gul/manager.py
index c6fd4b3d13..445fdd720d 100644
--- a/oasislmf/pytools/gul/manager.py
+++ b/oasislmf/pytools/gul/manager.py
@@ -12,8 +12,11 @@
 from numba import njit
 from numba.typed import Dict, List
 
+from oasislmf.pytools.data_layer.conversions.correlations import CorrelationsData
+
 from oasislmf.pytools.getmodel.manager import get_damage_bins, Item
-from oasislmf.pytools.getmodel.common import oasis_float, Keys
+
+from oasislmf.pytools.getmodel.common import oasis_float, Keys, Correlation
 
 from oasislmf.pytools.gul.common import (
     MEAN_IDX, STD_DEV_IDX, TIV_IDX, CHANCE_OF_LOSS_IDX, MAX_LOSS_IDX, NUM_IDX,
@@ -22,8 +25,14 @@
 )
 from oasislmf.pytools.gul.io import (
     write_negative_sidx, write_sample_header,
-    write_sample_rec, read_getmodel_stream
+    write_sample_rec, read_getmodel_stream,
+)
+
+from oasislmf.pytools.gul.random import (
+    get_random_generator, compute_norm_cdf_lookup,
+    compute_norm_inv_cdf_lookup, get_corr_rval, generate_correlated_hash_vector
 )
+
 from oasislmf.pytools.gul.random import get_random_generator
 from oasislmf.pytools.gul.core import split_tiv_classic, split_tiv_multiplicative, get_gul, setmaxloss, compute_mean_loss
 from oasislmf.pytools.gul.utils import append_to_dict_value, binary_search
@@ -117,7 +126,7 @@ def generate_item_map(items, coverages):
 
 
 def run(run_dir, ignore_file_type, sample_size, loss_threshold, alloc_rule, debug,
-        random_generator, peril_filter=[], file_in=None, file_out=None, **kwargs):
+        random_generator, peril_filter=[], file_in=None, file_out=None, ignore_correlation=False, **kwargs):
     """Execute the main gulpy worklow.
 
     Args:
@@ -131,6 +140,7 @@ def run(run_dir, ignore_file_type, sample_size, loss_threshold, alloc_rule, debu
         random_generator (int): random generator function id.
         file_in (str, optional): filename of input stream. Defaults to None.
         file_out (str, optional): filename of output stream. Defaults to None.
+        ignore_correlation (bool): if True, do not compute correlated random samples.
 
     Raises:
         ValueError: if alloc_rule is not 0, 1, or 2.
@@ -216,6 +226,56 @@ def run(run_dir, ignore_file_type, sample_size, loss_threshold, alloc_rule, debu
     # create the array to store the seeds
     seeds = np.zeros(len(np.unique(items['group_id'])), dtype=Item.dtype['group_id'])
 
+    do_correlation = False
+    if ignore_correlation:
+        logger.info(f"Correlated random number generation: switched OFF because --ignore-correlation is True.")
+
+    else:
+        file_path = os.path.join(input_path, 'correlations.bin')
+        data = CorrelationsData.from_bin(file_path=file_path).data
+        Nperil_correlation_groups = len(data)
+        logger.info(f"Detected {Nperil_correlation_groups} peril correlation groups.")
+
+        if Nperil_correlation_groups > 0 and any(data['correlation_value'] > 0):
+            do_correlation = True
+        else:
+            logger.info(f"Correlated random number generation: switched OFF because 0 peril correlation groups were detected or "
+                        "the correlation value is zero for all peril correlation groups.")
+
+    if do_correlation:
+        logger.info(f"Correlated random number generation: switched ON.")
+
+        corr_data_by_item_id = np.ndarray(Nperil_correlation_groups + 1, dtype=Correlation)
+        corr_data_by_item_id[0] = (0, 0.)
+        corr_data_by_item_id[1:]['peril_correlation_group'] = np.array(data['peril_correlation_group'])
+        corr_data_by_item_id[1:]['correlation_value'] = np.array(data['correlation_value'])
+
+        logger.info(
+            f"Correlation values for {Nperil_correlation_groups} peril correlation groups have been imported."
+        )
+
+        unique_peril_correlation_groups = np.unique(corr_data_by_item_id[1:]['peril_correlation_group'])
+
+        # pre-compute lookup tables for the Gaussian cdf and inverse cdf
+        # Notes:
+        #  - the size `arr_N` and `arr_N_cdf` can be increased to achieve better resolution in the Gaussian cdf and inv cdf.
+        #  - the function `get_corr_rval` to compute the correlated numbers is not affected by arr_N and arr_N_cdf
+        arr_min, arr_max, arr_N = 1e-16, 1 - 1e-16, 1000000
+        arr_min_cdf, arr_max_cdf, arr_N_cdf = -20., 20., 1000000
+        norm_inv_cdf = compute_norm_inv_cdf_lookup(arr_min, arr_max, arr_N)
+        norm_cdf = compute_norm_cdf_lookup(arr_min_cdf, arr_max_cdf, arr_N_cdf)
+
+        # buffer to be re-used to store all the correlated random values
+        z_unif = np.zeros(sample_size, dtype='float64')
+
+    else:
+        # create dummy data structures with proper dtypes to allow correct numba compilation
+        corr_data_by_item_id = np.ndarray(1, dtype=Correlation)
+        arr_min, arr_max, arr_N = 0, 0, 0
+        arr_min_cdf, arr_max_cdf, arr_N_cdf = 0, 0, 0
+        norm_inv_cdf, norm_cdf = np.zeros(1, dtype='float64'), np.zeros(1, dtype='float64')
+        z_unif = np.zeros(1, dtype='float64')
+
     # create buffer to be reused to store all losses for one coverage
     losses_buffer = np.zeros((sample_size + NUM_IDX + 1, np.max(coverages[1:]['max_items'])), dtype=oasis_float)
 
@@ -223,14 +283,27 @@ def run(run_dir, ignore_file_type, sample_size, loss_threshold, alloc_rule, debu
 
         event_id, compute_i, items_data, recs, rec_idx_ptr, rng_index = event_data
 
-        rndms = generate_rndm(seeds[:rng_index], sample_size)
+        # generation of "base" random values is done as before
+        rndms_base = generate_rndm(seeds[:rng_index], sample_size)
+
+        # to generate the correlated part, we do the hashing here for now (instead of in stream_to_data)
+        # generate the correlated samples for the whole event, for all peril correlation groups
+        if do_correlation:
+            corr_seeds = generate_correlated_hash_vector(unique_peril_correlation_groups, event_id)
+            eps_ij = generate_rndm(corr_seeds, sample_size, skip_seeds=1)
+
+        else:
+            # create dummy data structures with proper dtypes to allow correct numba compilation
+            corr_seeds = np.zeros(1, dtype='int64')
+            eps_ij = np.zeros((1, 1), dtype='float64')
 
         last_processed_coverage_ids_idx = 0
         while last_processed_coverage_ids_idx < compute_i:
             cursor, cursor_bytes, last_processed_coverage_ids_idx = compute_event_losses(
                 event_id, coverages, compute[:compute_i], items_data,
                 last_processed_coverage_ids_idx, sample_size, recs, rec_idx_ptr,
-                damage_bins, loss_threshold, losses_buffer, alloc_rule, rndms, debug,
+                damage_bins, loss_threshold, losses_buffer, alloc_rule, do_correlation, rndms_base, eps_ij, corr_data_by_item_id,
+                arr_min, arr_max, arr_N, norm_inv_cdf, arr_min_cdf, arr_max_cdf, arr_N_cdf, norm_cdf, z_unif, debug,
                 GULPY_STREAM_BUFF_SIZE_WRITE, int32_mv_write, cursor
             )
 
@@ -246,8 +319,9 @@
 @njit(cache=True, fastmath=True)
 def compute_event_losses(event_id, coverages, coverage_ids, items_data,
                          last_processed_coverage_ids_idx, sample_size, recs, rec_idx_ptr, damage_bins,
-                         loss_threshold, losses, alloc_rule, rndms, debug, buff_size,
-                         int32_mv, cursor):
+                         loss_threshold, losses, alloc_rule, do_correlation, rndms_base, eps_ij, corr_data_by_item_id,
+                         arr_min, arr_max, arr_N, norm_inv_cdf, arr_min_cdf, arr_max_cdf, arr_N_cdf, norm_cdf,
+                         z_unif, debug, buff_size, int32_mv, cursor):
     """Compute losses for an event.
 
     Args:
@@ -264,6 +338,7 @@
         loss_threshold (float): threshold above which losses are printed to the output stream.
         losses (numpy.array[oasis_float]): array (to be re-used) to store losses for all item_ids.
         alloc_rule (int): back-allocation rule.
+        do_correlation (bool): if True, compute correlated random samples.
         rndms (numpy.array[float64]): 2d array of shape (number of seeds, sample_size) storing
           the random values drawn for each seed.
         debug (bool): if True, for each random sample, print to the output stream the random value
@@ -276,6 +351,7 @@
         int, int, int: updated value of cursor, updated value of cursor_bytes, last last_processed_coverage_ids_idx
     """
     max_size_per_item = (sample_size + NUM_IDX + 1) * gulSampleslevelRec_size + 2 * gulSampleslevelHeader_size
+
     for coverage_i in range(last_processed_coverage_ids_idx, coverage_ids.shape[0]):
         coverage = coverages[coverage_ids[coverage_i]]
         tiv = coverage['tiv']  # coverages are indexed from 1
@@ -297,7 +373,6 @@
             item = items[item_i]
             damagecdf_i = item['damagecdf_i']
             rng_index = item['rng_index']
-
             rec = recs[rec_idx_ptr[damagecdf_i]:rec_idx_ptr[damagecdf_i + 1]]
             prob_to = rec['prob_to']
             bin_mean = rec['bin_mean']
@@ -315,14 +390,34 @@
             losses[MEAN_IDX, item_i] = gul_mean
 
             if sample_size > 0:
+                if do_correlation:
+                    item_corr_data = corr_data_by_item_id[item['item_id']]
+                    rho = item_corr_data['correlation_value']
+
+                    if rho > 0:
+                        peril_correlation_group = item_corr_data['peril_correlation_group']
+
+                        get_corr_rval(
+                            eps_ij[peril_correlation_group], rndms_base[rng_index],
+                            rho, arr_min, arr_max, arr_N, norm_inv_cdf,
+                            arr_min_cdf, arr_max_cdf, arr_N_cdf, norm_cdf, sample_size, z_unif
+                        )
+                        rndms = z_unif
+
+                    else:
+                        rndms = rndms_base[rng_index]
+
+                else:
+                    rndms = rndms_base[rng_index]
+
                 if debug:
                     for sample_idx in range(1, sample_size + 1):
-                        rval = rndms[rng_index][sample_idx - 1]
+                        rval = rndms[sample_idx - 1]
                         losses[sample_idx, item_i] = rval
                 else:
                     for sample_idx in range(1, sample_size + 1):
                         # cap `rval` to the maximum `prob_to` value (which should be 1.)
-                        rval = rndms[rng_index][sample_idx - 1]
+                        rval = rndms[sample_idx - 1]
                         if rval >= prob_to[Nbins - 1]:
                             rval = prob_to[Nbins - 1] - 0.00000003
diff --git a/oasislmf/pytools/gul/random.py b/oasislmf/pytools/gul/random.py
index f7b237a0e3..2310bbdd14 100644
--- a/oasislmf/pytools/gul/random.py
+++ b/oasislmf/pytools/gul/random.py
@@ -3,8 +3,10 @@
 
 """
+from math import sqrt
 import logging
 import numpy as np
+from scipy.stats import norm
 from numba import njit
 
 logger = logging.getLogger(__name__)
 
@@ -33,22 +35,6 @@ def generate_hash(group_id, event_id, base_seed=0):
     return hash
 
 
-@njit(cache=True, fastmath=True)
-def generate_correlated_hash(event_id, base_seed=0):
-    """Generate hash for an `event_id`.
-
-    Args:
-        event_id (int): event id.
-        base_seed (int, optional): base random seed. Defaults to 0.
-
-    Returns:
-        int64: hash
-    """
-    hash = (base_seed + (event_id * EVENT_ID_HASH_CODE) % HASH_MOD_CODE) % HASH_MOD_CODE
-
-    return hash
-
-
 def get_random_generator(random_generator):
     """Get the random generator function.
 
@@ -72,13 +58,76 @@ def get_random_generator(random_generator):
         raise ValueError(f"No random generator exists for random_generator={random_generator}.")
 
 
+EVENT_ID_HASH_CODE = np.int64(1943_272_559)
+PERIL_CORRELATION_GROUP_HASH = np.int64(1836311903)
+HASH_MOD_CODE = np.int64(2147483648)
+
+
+@njit(cache=True, fastmath=True)
+def generate_correlated_hash_vector(unique_peril_correlation_groups, event_id, base_seed=0):
+    """Generate hashes for all peril correlation groups for a given `event_id`.
+
+    Args:
+        unique_peril_correlation_groups (List[int]): list of the unique peril correlation groups.
+        event_id (int): event id.
+        base_seed (int, optional): base random seed. Defaults to 0.
+
+    Returns:
+        List[int64]: hashes
+    """
+    Nperil_correlation_groups = unique_peril_correlation_groups.shape[0]
+    correlated_hashes = np.empty(Nperil_correlation_groups + 1, dtype='int64')
+    correlated_hashes[0] = 0
+
+    for i in range(1, Nperil_correlation_groups + 1):
+        correlated_hashes[i] = (
+            base_seed +
+            (unique_peril_correlation_groups[i - 1] * PERIL_CORRELATION_GROUP_HASH) % HASH_MOD_CODE +
+            (event_id * EVENT_ID_HASH_CODE) % HASH_MOD_CODE
+        ) % HASH_MOD_CODE
+
+    return correlated_hashes
+
+
+def compute_norm_inv_cdf_lookup(arr_min, arr_max, arr_N):
+    return norm.ppf(np.linspace(arr_min, arr_max, arr_N))
+
+
+def compute_norm_cdf_lookup(arr_min, arr_max, arr_N):
+    return norm.cdf(np.linspace(arr_min, arr_max, arr_N))
+
+
+@njit(cache=True, fastmath=True)
+def get_norm_cdf_cell_nb(x, arr_min, arr_max, arr_N):
+    return int((x - arr_min) * (arr_N - 1) // (arr_max - arr_min))
+
+
+@njit(cache=True, fastmath=True)
+def get_corr_rval(x_unif, y_unif, rho, arr_min, arr_max, arr_N, norm_inv_cdf, arr_min_cdf,
+                  arr_max_cdf, arr_N_cdf, norm_cdf, Nsamples, z_unif):
+
+    sqrt_rho = sqrt(rho)
+    sqrt_1_minus_rho = sqrt(1. - rho)
+
+    for i in range(Nsamples):
+        x_norm = norm_inv_cdf[get_norm_cdf_cell_nb(x_unif[i], arr_min, arr_max, arr_N)]
+        y_norm = norm_inv_cdf[get_norm_cdf_cell_nb(y_unif[i], arr_min, arr_max, arr_N)]
+        z_norm = sqrt_rho * x_norm + sqrt_1_minus_rho * y_norm
+
+        z_unif[i] = norm_cdf[get_norm_cdf_cell_nb(z_norm, arr_min_cdf, arr_max_cdf, arr_N_cdf)]
+
+
 @njit(cache=True, fastmath=True)
-def random_MersenneTwister(seeds, n):
+def random_MersenneTwister(seeds, n, skip_seeds=0):
     """Generate random numbers using the default Mersenne Twister algorithm.
 
     Args:
         seeds (List[int64]): List of seeds.
         n (int): number of random samples to generate for each seed.
+        skip_seeds (int): number of seeds to skip starting from the beginning
+          of the `seeds` array. For skipped seeds no random numbers are generated
+          and the output rndms will contain zeros at their corresponding row.
+          Default is 0, i.e. no seeds are skipped.
 
     Returns:
         rndms (array[float]): 2-d array of shape (number of seeds, n)
@@ -89,9 +138,9 @@
     Nseeds = len(seeds)
     rndms = np.zeros((Nseeds, n), dtype='float64')
 
-    for seed_i, seed in enumerate(seeds):
+    for seed_i in range(skip_seeds, Nseeds, 1):
         # set the seed
-        np.random.seed(seed)
+        np.random.seed(seeds[seed_i])
 
         # draw the random numbers
         for j in range(n):
@@ -102,7 +151,7 @@
 
 
 @njit(cache=True, fastmath=True)
-def random_LatinHypercube(seeds, n):
+def random_LatinHypercube(seeds, n, skip_seeds=0):
     """Generate random numbers using the Latin Hypercube algorithm.
 
     Args:
@@ -114,6 +163,10 @@
           containing the random values generated for each seed.
         rndms_idx (Dict[int64, int]): mapping between `seed` and the
           row in rndms that stores the corresponding random values.
+        skip_seeds (int): number of seeds to skip starting from the beginning
+          of the `seeds` array. For skipped seeds no random numbers are generated
+          and the output rndms will contain zeros at their corresponding row.
+          Default is 0, i.e. no seeds are skipped.
 
     Notes:
         Implementation follows scipy.stats.qmc.LatinHypercube v1.8.0.
@@ -127,9 +180,9 @@
     samples = np.zeros(n, dtype='float64')
     perms = np.zeros(n, dtype='float64')
 
-    for seed_i, seed in enumerate(seeds):
+    for seed_i in range(skip_seeds, Nseeds, 1):
         # set the seed
-        np.random.seed(seed)
+        np.random.seed(seeds[seed_i])
 
         # draw the random numbers and re-generate permutations array
         for i in range(n):
diff --git a/oasislmf/pytools/gulpy.py b/oasislmf/pytools/gulpy.py
index a4c01cbe0f..f9c961ffda 100644
--- a/oasislmf/pytools/gulpy.py
+++ b/oasislmf/pytools/gulpy.py
@@ -12,6 +12,9 @@
 )
 
 parser.add_argument('-a', help='back-allocation rule', default=0, type=int, dest='alloc_rule')
+parser.add_argument('--ignore-correlation',
+                    help='if passed, peril correlation groups (if defined) are ignored for the generation of correlated samples',
+                    action='store_true', dest='ignore_correlation', default=False)
 parser.add_argument('-d', help='output random numbers instead of gul (default: False).',
                     default=False, action='store_true', dest='debug')
 parser.add_argument('-i', '--file-in', help='filename of input stream.', action='store', type=str, dest='file_in')
diff --git a/oasislmf/utils/data.py b/oasislmf/utils/data.py
index 3419c92a08..1f27a590ea 100644
--- a/oasislmf/utils/data.py
+++ b/oasislmf/utils/data.py
@@ -44,6 +44,7 @@
 from chardet.universaldetector import UniversalDetector
 
 from tabulate import tabulate
+from typing import List, Optional
 
 import numpy as np
 import pandas as pd
@@ -409,6 +410,26 @@ def get_model_settings(model_settings_fp, key=None, validate=True):
     return model_settings if not key else model_settings.get(key)
 
 
+def establish_correlations(model_settings: dict) -> bool:
+    """
+    Checks the model settings to see if correlations are present.
+
+    Args:
+        model_settings: (dict) the model settings that are going to be checked
+
+    Returns: (bool) True if correlation settings are present, False if not
+    """
+    correlations: Optional[List[dict]] = model_settings.get("correlation_settings")
+
+    if correlations is None:
+        return False
+    if not isinstance(correlations, list):
+        return False
+    if len(correlations) == 0:
+        return False
+    return True
+
+
 def detect_encoding(filepath):
     """
     Given a path to a CSV of unknown encoding
diff --git a/tests/fm/test_fm.py b/tests/fm/test_fm.py
index 82133c94e3..b9bb672fa0 100644
--- a/tests/fm/test_fm.py
+++ b/tests/fm/test_fm.py
@@ -15,6 +15,7 @@ def setUp(self):
         self.test_cases_fp = os.path.join(sys.path[0], 'validation')
         self.update_expected = False
         self.keep_output = True
+        self.hashed_group_id = False
 
     def run_test(self, test_case, fmpy=False, subperils=1, expected_dir="expected"):
         with tempfile.TemporaryDirectory() as tmp_run_dir:
@@ -35,6 +36,7 @@ def run_test(self, test_case, fmpy=False, subperils=1, expected_dir="expected"):
                 num_subperils=subperils,
                 test_tolerance=0.001,
                 expected_output_dir=expected_dir,
+                hashed_group_id=self.hashed_group_id,
             )
             self.assertTrue(result)
 
diff --git a/tests/fm/test_fmpy.py b/tests/fm/test_fmpy.py
index 077b342a28..c10ff17f8d 100644
--- a/tests/fm/test_fmpy.py
+++ b/tests/fm/test_fmpy.py
@@ -15,6 +15,7 @@ def setUp(self):
         self.test_cases_fp = os.path.join(sys.path[0], 'validation')
         self.update_expected = False
         self.keep_output = True
+        self.hashed_group_id = False
 
     def run_test(self, test_case, fmpy=False, subperils=1, expected_dir="expected"):
         with tempfile.TemporaryDirectory() as tmp_run_dir:
@@ -37,6 +38,7 @@ def run_test(self, test_case, fmpy=False, subperils=1, expected_dir="expected"):
                 num_subperils=subperils,
                 test_tolerance=0.001,
                 expected_output_dir=expected_dir,
+                hashed_group_id=self.hashed_group_id,
             )
             self.assertTrue(result)
 
@@ -91,4 +93,4 @@ def test_issues_2_subperils(self):
         self.run_test('issues', fmpy=True, subperils=2, expected_dir="expected_subperils")
 
     def test_insurance_policy_coverage_2_subperils(self):
-        self.run_test('insurance_policy_coverage',fmpy=True, subperils=2, expected_dir="expected_subperils")
\ No newline at end of file
+        self.run_test('insurance_policy_coverage',fmpy=True, subperils=2, expected_dir="expected_subperils")
diff --git a/tests/model_preparation/test_summaries.py b/tests/model_preparation/test_summaries.py
index c19c97a2a4..e128b7510c 100644
--- a/tests/model_preparation/test_summaries.py
+++ b/tests/model_preparation/test_summaries.py
@@ -133,7 +133,7 @@ def test_single_peril__totals_correct(self, data):
         )
 
         # Run Gul Proccessing
-        gul_inputs = get_gul_input_items(loc_df, keys_df)
+        gul_inputs = get_gul_input_items(loc_df, keys_df, group_id_cols=['loc_id'])
         gul_inputs = gul_inputs[gul_inputs['status'].isin(OASIS_KEYS_STATUS_MODELLED)]
 
         # Fetch expected TIVS
@@ -200,7 +200,7 @@ def test_multi_perils__single_covarage(self, data):
         # Run Summary output check
         self.assertSummaryIsValid(
             loc_df,
-            get_gul_input_items(loc_df, keys_df),
+            get_gul_input_items(loc_df, keys_df, group_id_cols=['loc_id']),
             get_exposure_summary(exposure_df=loc_df, keys_df=keys_df),
             perils_returned
         )
@@ -244,7 +244,7 @@ def test_multi_perils__multi_covarage(self, data):
 
         # Run Summary output check
         exp_summary = get_exposure_summary(exposure_df=loc_df, keys_df=keys_df)
-        gul_inputs = get_gul_input_items(loc_df, keys_df)
+        gul_inputs = get_gul_input_items(loc_df, keys_df, group_id_cols=['loc_id'])
         self.assertSummaryIsValid(
             loc_df,
             gul_inputs,
diff --git a/tests/preparation/test_correlations.py b/tests/preparation/test_correlations.py
index 62c10112c9..3d2bc49456 100644
--- a/tests/preparation/test_correlations.py
+++ b/tests/preparation/test_correlations.py
@@ -27,10 +27,10 @@ def test_map_data(self):
 
     def test_get_correlation_input_items(self):
         gul_path = META_PATH + "gul_inputs_df.csv"
-        settings_path = META_PATH + "model_settings.json"
         gul_inputs_df = pd.read_csv(gul_path)
 
-        correlation_df = get_correlation_input_items(model_settings_path=settings_path, gul_inputs_df=gul_inputs_df)
+        correlation_df = get_correlation_input_items(correlation_map_df=map_data(data=self.model_settings),
+                                                     gul_inputs_df=gul_inputs_df)
 
         correlation_df_check = pd.read_csv(f"{META_PATH}correlation_df.csv")
         correlation_df_check.equals(correlation_df)
diff --git a/tests/utils/test_data.py b/tests/utils/test_data.py
index be99f55edf..73f52b37bf 100644
--- a/tests/utils/test_data.py
+++ b/tests/utils/test_data.py
@@ -39,6 +39,7 @@
     get_timestamp,
     get_utctimestamp,
     get_location_df,
+    PANDAS_DEFAULT_NULL_VALUES,
 )
 
 from oasislmf.utils.defaults import (
@@ -541,10 +542,12 @@ def test_get_dataframe__from_csv_file_with_mixed_case_cols__set_col_defaults_opt
         try:
             df = pd.DataFrame(data)
             df.to_csv(path_or_buf=fp, columns=df.columns, encoding='utf-8', index=False)
+            df['STR_COL'] = df['STR_COL'].map(lambda x: np.nan if x in PANDAS_DEFAULT_NULL_VALUES else x)
             fp.close()
 
             expected = df.copy(deep=True)
             expected.columns = expected.columns.str.lower()
+
             for col, default in defaults.items():
                 expected.loc[:, col.lower()].fillna(defaults[col], inplace=True)