Source code for pysd.tools.benchmarking

"""
Benchmarking tools for testing and comparing outputs between different files.
Some of these functions are also used for testing.
"""
import warnings
from pathlib import Path

import numpy as np
import pandas as pd

from pysd import read_vensim, read_xmile, load
from ..py_backend.utils import load_outputs, detect_encoding
from pysd.translators.vensim.vensim_utils import supported_extensions as\
    vensim_extensions
from pysd.translators.xmile.xmile_utils import supported_extensions as\
    xmile_extensions


def runner(model_file, canonical_file=None, transpose=False, data_files=None):
    """
    Translate and run a model, returning its output together with the
    canonical output it should be compared against.

    Parameters
    ----------
    model_file: str or pathlib.Path
        Path of the model file. Its extension must be one of the supported
        Vensim extensions, one of the supported Xmile extensions or '.py'.

    canonical_file: str or pathlib.Path or None (optional)
        Canonical output file to read. If it is not given, 'output.csv'
        and then 'output.tab' are looked for in the model directory.
        Default is None.

    transpose: bool (optional)
        Forwarded to the canonical file reader; if True the canonical
        file is read transposed, i.e. one variable per row.
        Default is False.

    data_files: list (optional)
        Data files needed to run the model, forwarded to the model loader.

    Returns
    -------
    output, canon: (pandas.DataFrame, pandas.DataFrame)
        The model output, restricted to the columns of the canonical
        output, and the canonical output itself.

    Raises
    ------
    FileNotFoundError
        If no canonical file is given and none is found next to the model.
    ValueError
        If the model file extension is not supported.

    """
    if isinstance(model_file, str):
        model_path = Path(model_file)
    else:
        model_path = model_file

    folder = model_path.parent

    # locate the canonical output when the caller did not provide one
    if not canonical_file:
        for candidate in ('output.csv', 'output.tab'):
            if folder.joinpath(candidate).is_file():
                canonical_file = folder.joinpath(candidate)
                break
        else:
            raise FileNotFoundError("\nCanonical output file not found.")

    canon = load_outputs(
        canonical_file,
        transpose=transpose,
        encoding=detect_encoding(canonical_file))

    # choose the loader from the case-insensitive file extension
    extension = model_path.suffix.lower()
    if extension in vensim_extensions:
        model = read_vensim(model_path, data_files)
    elif extension in xmile_extensions:
        model = read_xmile(model_path, data_files)
    elif extension == ".py":
        model = load(model_path, data_files)
    else:
        raise ValueError(
            "\nThe model file name must be a Vensim"
            f" ({', '.join(vensim_extensions)}), a Xmile "
            f"({', '.join(xmile_extensions)}) or a PySD (.py) model file...")

    # only the variables present in the canonical output are computed
    output = model.run(return_columns=canon.columns)
    return output, canon
def assert_frames_close(actual, expected, assertion="raise",
                        verbose=False, precision=2, **kwargs):
    """
    Compare two DataFrames column by column and report the columns whose
    values are not close.

    Ordering of columns is unimportant, items are compared only by label.
    Column labels of any type are supported; they are converted with
    ``str`` only to build the messages.

    Both input frames are passed to ``_remove_constant_nan`` before the
    comparison, which may modify them in place.

    Parameters
    ----------
    actual: pandas.DataFrame
        Actual value from the model output.

    expected: pandas.DataFrame
        Expected model output.

    assertion: str (optional)
        "raise" if an error should be raised when not able to assert
        that two frames are close. If "return" the information about the
        differences is returned. Any other value (e.g. "warning") shows
        a warning message instead. Default is "raise".

    verbose: bool (optional)
        If True, the expected values, the actual values and their
        difference are added to the error/warning message for every
        column that is not close. Default is False.

    precision: int (optional)
        Precision used to print the numerical values in the verbose
        message. Default is 2.

    kwargs:
        Optional rtol and atol values for assert_allclose.

    Returns
    -------
    (cols, first_false_time, first_false_cols) or None: (set, float, set) or None
        If assertion is 'return', the set of all the columns that are
        different, the time when the first difference was found and the
        set of variables that were different at that time. When nothing
        differs it is (set(), nan, set()). If assertion is not 'return'
        it returns None.

    Raises
    ------
    TypeError
        If any of the inputs is not a pandas.DataFrame.
    ValueError
        If the column sets differ and assertion is "raise".
    AssertionError
        If the indexes of both frames are not identical, or if some
        column is not close and assertion is "raise".

    Examples
    --------
    >>> assert_frames_close(
    ...     pd.DataFrame(100, index=range(5), columns=range(3)),
    ...     pd.DataFrame(100, index=range(5), columns=range(3)))

    >>> assert_frames_close(
    ...     pd.DataFrame(100, index=range(5), columns=range(3)),
    ...     pd.DataFrame(110, index=range(5), columns=range(3)),
    ...     rtol=.2)

    >>> assert_frames_close(
    ...     pd.DataFrame(100, index=range(5), columns=range(3)),
    ...     pd.DataFrame(150, index=range(5), columns=range(3)),
    ...     rtol=.2)  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    ...
    AssertionError: Following columns are not close:

    References
    ----------
    Derived from:
        http://nbviewer.jupyter.org/gist/jiffyclub/ac2e7506428d5e1d587b

    """
    if not isinstance(actual, pd.DataFrame)\
       or not isinstance(expected, pd.DataFrame):
        raise TypeError('\nInputs must both be pandas DataFrames.')

    expected_cols, actual_cols = set(expected.columns), set(actual.columns)

    if expected_cols != actual_cols:
        # columns are not equal
        message = ""
        only_actual = actual_cols.difference(expected_cols)
        only_expected = expected_cols.difference(actual_cols)

        # labels are formatted with f-strings so that non-string labels
        # (e.g. integers) do not break the message construction
        if only_actual:
            names = ", ".join(sorted(f"'{col}'" for col in only_actual))
            message += '\nColumns ' + names\
                + ' from actual values not found in expected values.'

        if only_expected:
            names = ", ".join(sorted(f"'{col}'" for col in only_expected))
            message += '\nColumns ' + names\
                + ' from expected values not found in actual values.'

        if assertion == "raise":
            raise ValueError(
                '\nColumns from actual and expected values must be equal.'
                + message)
        else:
            warnings.warn(message)

    columns = list(actual_cols.intersection(expected_cols))

    # TODO let compare dataframes with different timestamps if "warn"
    # An explicit raise is used instead of an assert statement so that the
    # check is kept under "python -O"; array_equal also compares the shapes,
    # hence an index of length one is not broadcast against a longer one.
    if not np.array_equal(expected.index.values, actual.index.values):
        raise AssertionError(
            "test set and actual set must share a common index, "
            "instead found %s vs %s" % (expected.index.values,
                                        actual.index.values))

    # if for Vensim outputs where constant values are only in the first row
    _remove_constant_nan(expected)
    _remove_constant_nan(actual)

    c = assert_allclose(expected[columns], actual[columns], **kwargs)

    if c.all().all():
        return (set(), np.nan, set()) if assertion == "return" else None

    # Get the columns that have the first different value, useful for
    # debugging
    false_index = c.apply(
        lambda x: np.where(~x)[0][0] if not x.all() else np.nan)
    index_first_false = int(np.nanmin(false_index))
    time_first_false = c.index[index_first_false]
    variable_first_false = sorted(
        false_index.index[false_index == index_first_false], key=str)

    # keep the original labels (not their string form) so that they can
    # still be used to select the columns below
    columns = sorted(c.columns[~c.all().values], key=str)

    assertion_details = "\nFollowing columns are not close:\n\t"\
        + ", ".join(map(str, columns)) + "\n\n"\
        + f"First false values ({time_first_false}):\n\t"\
        + ", ".join(map(str, variable_first_false))

    if verbose:
        for col in columns:
            assertion_details += '\n\n'\
                + f"Column '{col}' is not close."\
                + '\n\nExpected values:\n\t'\
                + np.array2string(expected[col].values,
                                  precision=precision,
                                  separator=', ')\
                + '\n\nActual values:\n\t'\
                + np.array2string(actual[col].values,
                                  precision=precision,
                                  separator=', ')\
                + '\n\nDifference:\n\t'\
                + np.array2string(expected[col].values-actual[col].values,
                                  precision=precision,
                                  separator=', ')

    if assertion == "raise":
        raise AssertionError(assertion_details)
    elif assertion == "return":
        return (set(columns), time_first_false, set(variable_first_false))
    else:
        warnings.warn(assertion_details)
def assert_allclose(x, y, rtol=1.e-5, atol=1.e-5):
    """
    Element-wise test of whether the values of two pandas objects are
    close.

    Two values are considered close if any of the following holds:

    - their difference is finite and ``abs(x - y) <= atol + rtol * abs(y)``
    - they are exactly equal (this covers infinities of the same sign)
    - both are missing values (NaN)

    Parameters
    ----------
    x: pandas.DataFrame or pandas.Series
        Expected value.

    y: pandas.DataFrame or pandas.Series
        Actual value.

    rtol: float (optional)
        Relative tolerance on the error. Default is 1.e-5.

    atol: float (optional)
        Absolute tolerance on the error. Default is 1.e-5.

    Returns
    -------
    close: pandas.DataFrame or pandas.Series
        Boolean mask, True where the values are close.

    """
    difference = abs(x - y)

    # The tolerance test is only meaningful for a finite difference:
    # when y is infinite the right-hand side is infinite too, and any
    # infinite difference (e.g. -inf vs inf) would satisfy "inf <= inf".
    within_tolerance = (difference <= atol + rtol * abs(y))\
        & np.isfinite(difference)

    # inf - inf is NaN, so equal infinities never pass the tolerance test;
    # they are accepted through exact equality instead
    identical = x.eq(y)

    both_nan = x.isna() & y.isna()

    return within_tolerance | identical | both_nan
def _remove_constant_nan(df): """ Removes nana values in constant value columns produced by Vensim """ nan_cols = np.isnan(df.iloc[1:, :]).all() cols = nan_cols[nan_cols].index df[cols] = df[cols].apply(lambda x: x.iloc[0])