Source code for pysd.py_backend.model

"""
The Macro and Model classes are the main classes for loading and
interacting with a PySD model. The Model class allows loading and
running a PySD model. Several methods and properties are inherited
from the Macro class, which allows integrating a model or a macro
expression (a set of functions in a separate file).
"""
import time
import warnings
import inspect
import pickle
from pathlib import Path
from copy import deepcopy
from typing import Union

import numpy as np
import xarray as xr
import pandas as pd

from pysd._version import __version__

from . import utils
from .statefuls import DynamicStateful, Stateful
from .external import External, Excels, ExtLookup, ExtData

from .cache import Cache, constant_cache
from .data import TabData
from .lookups import HardcodedLookups
from .components import Components, Time
from .output import ModelOutput


class Macro(DynamicStateful):
    """
    The Macro class implements a stateful representation of the system,
    and contains the majority of methods for accessing and modifying
    components.

    When the instance in question also serves as the root model object
    (as opposed to a macro or submodel within another model) it will
    have added methods to facilitate execution.

    The Macro object will be created with components drawn from a
    translated Python model file.

    Parameters
    ----------
    py_model_file: str or pathlib.Path
        Filename of a model or macro which has already been converted
        into a Python format.
    params: dict or None (optional)
        Dictionary of the macro parameters. Default is None.
    return_func: str or None (optional)
        The name of the function to return from the macro. Default is None.
    time: components.Time or None (optional)
        Time object for integration. If None a new time object will
        be generated (for models), if passed the time object will be
        used (for macros). Default is None.
    time_initialization: callable or None
        Time to set at the beginning of the Macro. Default is None.
    data_files: dict or list or str or None
        The dictionary with keys the name of file and variables to
        load the data from. Or the list of names or name of the
        file to search the data in. Only works for TabData type object
        and it is necessary to provide it. Default is None.
    py_name: str or None
        The name of the Macro object. Default is None.

    See also
    --------
    :class:`pysd.py_backend.model.Model`

    """
    def __init__(self, py_model_file, params=None, return_func=None,
                 time=None, time_initialization=None, data_files=None,
                 py_name=None):
        super().__init__()
        self.time = time
        self.time_initialization = time_initialization
        # Initialize the cache object
        self.cache = Cache()
        # Python name of the object (for Macros)
        self.py_name = py_name
        # Booleans to avoid loading again external data or lookups
        self.external_loaded = False
        self.lookups_loaded = False
        # Functions with constant cache
        self._constant_funcs = set()
        # Attributes that are set later
        self.stateful_initial_dependencies = None
        self.initialize_order = None
        self.cache_type = None
        self._components_setter_tracker = {}
        # Load model/macro from file and save in components
        self.components = Components(str(py_model_file), self.set_components)

        if __version__.split(".")[0]\
           != self.get_pysd_compiler_version().split(".")[0]:
            raise ImportError(
                "\n\nNot able to import the model. "
                + "The model was translated with an "
                + "incompatible version of PySD:"
                + "\n\tPySD " + self.get_pysd_compiler_version()
                + "\n\nThe current version of PySD is:"
                + "\n\tPySD " + __version__ + "\n\n"
                + "Please translate the model again with the function"
                + " read_vensim or read_xmile.")

        # Assign some protected attributes for easier access
        self._namespace = self.components._components.component.namespace
        self._dependencies =\
            self.components._components.component.dependencies.copy()
        self._subscript_dict = getattr(
            self.components._components, "_subscript_dict", {})
        self._modules = getattr(
            self.components._components, "_modules", {})

        self._doc = self._build_doc()

        if params is not None:
            # add params to namespace
            self._namespace.update(self.components._components._params)
            # create new components with the params
            self._set_components(params, new=True)
            # update dependencies
            for param in params:
                self._dependencies[
                    self._namespace[param]] = {"time"}

        # Get the collections of stateful elements and external elements
        self._stateful_elements = {
            name: getattr(self.components, name)
            for name in dir(self.components)
            if isinstance(getattr(self.components, name), Stateful)
        }
        self._dynamicstateful_elements = [
            getattr(self.components, name) for name in dir(self.components)
            if isinstance(getattr(self.components, name), DynamicStateful)
        ]
        self._external_elements = [
            getattr(self.components, name) for name in dir(self.components)
            if isinstance(getattr(self.components, name), External)
        ]
        self._macro_elements = [
            getattr(self.components, name) for name in dir(self.components)
            if isinstance(getattr(self.components, name), Macro)
        ]
        self._data_elements = [
            getattr(self.components, name) for name in dir(self.components)
            if isinstance(getattr(self.components, name), TabData)
        ]
        self._lookup_elements = [
            getattr(self.components, name) for name in dir(self.components)
            if isinstance(getattr(self.components, name), HardcodedLookups)
        ]

        # Load data files
        if data_files:
            self._get_data(data_files)

        # Assign the cache type to each variable
        self._assign_cache_type()
        # Get the initialization order of Stateful elements
        self._get_initialize_order()

        if return_func is not None:
            # Assign the return value of Macros
            self.return_func = getattr(self.components, return_func)
        else:
            self.return_func = lambda: 0

        self.py_model_file = str(py_model_file)

    def __call__(self):
        return self.return_func()

    @property
    def doc(self) -> pd.DataFrame:
        """
        The documentation of the model.
        """
        return self._doc.copy()

    @property
    def namespace(self) -> dict:
        """
        The namespace dictionary of the model.
        """
        return self._namespace.copy()

    @property
    def dependencies(self) -> dict:
        """
        The dependencies dictionary of the model.
        """
        return self._dependencies.copy()

    @property
    def subscripts(self) -> dict:
        """
        The subscripts dictionary of the model.
        """
        return self._subscript_dict.copy()

    @property
    def modules(self) -> Union[dict, None]:
        """
        The dictionary of modules of the model. If the model is not
        split by modules it returns None.
        """
        return self._modules.copy() or None
    def clean_caches(self):
        """
        Clean the cache of the object and the macro objects that it
        contains.
        """
        self.cache.clean()
        # if nested macros
        [macro.clean_caches() for macro in self._macro_elements]
    def _get_data(self, data_files):
        if isinstance(data_files, dict):
            for data_file, vars in data_files.items():
                for var in vars:
                    found = False
                    for element in self._data_elements:
                        if var in [element.py_name, element.real_name]:
                            element.load_data(data_file)
                            found = True
                            break
                    if not found:
                        raise ValueError(
                            f"'{var}' not found as model data variable")
        else:
            for element in self._data_elements:
                element.load_data(data_files)

    def _get_initialize_order(self):
        """
        Get the initialization order of the stateful elements
        and their full dependencies.
        """
        # get the full set of dependencies to initialize a stateful
        # object, including all levels
        self.stateful_initial_dependencies = {
            ext: set()
            for ext in self._dependencies
            if (ext.startswith("_")
                and not ext.startswith("_active_initial_"))
        }
        for element in self.stateful_initial_dependencies:
            self._get_full_dependencies(
                element,
                self.stateful_initial_dependencies[element],
                "initial")

        # get the full dependencies of stateful objects taking into
        # account only other objects
        current_deps = {
            element: [
                dep for dep in deps
                if dep in self.stateful_initial_dependencies
            ] for element, deps in self.stateful_initial_dependencies.items()
        }

        # get initialization order of the stateful elements
        self.initialize_order = []
        delete = True
        while delete:
            delete = []
            for element in current_deps:
                if not current_deps[element]:
                    # if stateful element has no deps on others
                    # add to the queue to initialize
                    self.initialize_order.append(element)
                    delete.append(element)
                    for element2 in current_deps:
                        # remove dependency on the initialized element
                        if element in current_deps[element2]:
                            current_deps[element2].remove(element)
            # delete visited elements
            for element in delete:
                del current_deps[element]

        if current_deps:
            # if current_deps is not an empty set there is a circular
            # reference between stateful objects
            raise ValueError(
                'Circular initialization...\n'
                + 'Not able to initialize the following objects:\n\t'
                + '\n\t'.join(current_deps))

    def _get_full_dependencies(self, element, dep_set, stateful_deps):
        """
        Get all dependencies of an element, i.e., also get the
        dependencies of the dependencies. When finding a stateful
        element, only dependencies for initialization are considered.

        Parameters
        ----------
        element: str
            Element to get the full dependencies of.
        dep_set: set
            Set to include the dependencies of the element.
        stateful_deps: "initial" or "step"
            The type of dependencies to take in the case of stateful
            objects.

        Returns
        -------
        None

        """
        deps = self._dependencies[element]
        if element.startswith("_"):
            deps = deps[stateful_deps]
        for dep in deps:
            if dep not in dep_set and not dep.startswith("__")\
               and dep != "time":
                dep_set.add(dep)
                self._get_full_dependencies(dep, dep_set, stateful_deps)

    def _add_constant_cache(self):
        for element, cache_type in self.cache_type.items():
            if cache_type == "run":
                self.components._set_component(
                    element,
                    constant_cache(getattr(self.components, element))
                )
                self._constant_funcs.add(element)

    def _remove_constant_cache(self):
        for element in self._constant_funcs:
            self.components._set_component(
                element, getattr(self.components, element).function)
            # remove attributes added with constant cache
            delattr(getattr(self.components, element), 'function')
            delattr(getattr(self.components, element), 'value')
        self._constant_funcs.clear()

    def _assign_cache_type(self):
        """
        Assigns the cache type to all the elements from the namespace.
        """
        self.cache_type = {"time": None}

        for element in self._namespace.values():
            if element not in self.cache_type\
               and element in self._dependencies:
                self._assign_cache(element)

        for element, cache_type in self.cache_type.items():
            if cache_type is not None:
                if element not in self.cache.cached_funcs\
                   and self._count_calls(element) > 1:
                    self.components._set_component(
                        element,
                        self.cache(getattr(self.components, element)))
                    self.cache.cached_funcs.add(element)

    def _count_calls(self, element):
        n_calls = 0
        for subelement in self._dependencies:
            if subelement.startswith("_") and\
               element in self._dependencies[subelement]["step"]:
                if element in\
                   self._dependencies[subelement]["initial"]:
                    n_calls +=\
                        2*self._dependencies[subelement]["step"][element]
                else:
                    n_calls +=\
                        self._dependencies[subelement]["step"][element]
            elif (not subelement.startswith("_")
                  and element in self._dependencies[subelement]):
                n_calls +=\
                    self._dependencies[subelement][element]

        return n_calls

    def _assign_cache(self, element):
        """
        Assigns the cache type to the given element and its dependencies
        if needed.

        Parameters
        ----------
        element: str
            Element name.

        Returns
        -------
        None

        """
        if not self._dependencies[element]:
            self.cache_type[element] = "run"
        elif "__lookup__" in self._dependencies[element]:
            self.cache_type[element] = None
        elif self._isdynamic(self._dependencies[element]):
            self.cache_type[element] = "step"
        else:
            self.cache_type[element] = "run"
            for subelement in self._dependencies[element]:
                if subelement.startswith("_initial_")\
                   or subelement.startswith("__"):
                    continue
                if subelement not in self.cache_type:
                    self._assign_cache(subelement)
                if self.cache_type[subelement] == "step":
                    self.cache_type[element] = "step"
                    break

    def _isdynamic(self, dependencies):
        """
        Parameters
        ----------
        dependencies: iterable
            List of dependencies.

        Returns
        -------
        isdynamic: bool
            True if 'time' or a dynamic stateful object is in
            dependencies.

        """
        if "time" in dependencies:
            return True
        for dep in dependencies:
            if dep.startswith("_") and not dep.startswith("_initial_")\
               and not dep.startswith("__"):
                return True
        return False
    def get_pysd_compiler_version(self):
        """
        Returns the version of the PySD compiler that was used to
        generate this model.
        """
        return self.components.__pysd_version__
    def initialize(self):
        """
        This function initializes the external objects and stateful
        objects in the given order.
        """
        # Initialize time
        if self.time is None:
            self.time = self.time_initialization()

        # Reset time to the initial one
        self.time.reset()
        self.cache.clean()

        self.components._init_outer_references({
            'scope': self,
            'time': self.time
        })

        if not self.lookups_loaded:
            # Initialize HardcodedLookups elements
            for element in self._lookup_elements:
                element.initialize()

            self.lookups_loaded = True

        if not self.external_loaded:
            # Initialize external elements
            self.initialize_external_data()

        # Initialize stateful objects
        for element_name in self.initialize_order:
            self._stateful_elements[element_name].initialize()
    def ddt(self):
        return np.array([component.ddt() for component
                         in self._dynamicstateful_elements], dtype=object)

    @property
    def state(self):
        return np.array([component.state for component
                         in self._dynamicstateful_elements], dtype=object)

    @state.setter
    def state(self, new_value):
        [component.update(val) for component, val
         in zip(self._dynamicstateful_elements, new_value)]
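
    # Editorial note (not part of the original source): the integrator
    # advances the system with an Euler step built from the two hooks
    # above; conceptually, with ``dt`` the current time step:
    #
    #     >>> # self.state = self.state + self.ddt() * dt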
    def initialize_external_data(self, externals=None):
        """
        Initializes external data.

        If a path to a netCDF file containing serialized values of some
        or all of the model external data is passed in the externals
        argument, those will be loaded from the file.

        To get the full performance gain of loading the externals from
        a netCDF file, the model should be loaded with initialize=False
        first.

        Examples of usage are available at `Advanced Usage
        <https://pysd.readthedocs.io/en/master/advanced_usage.html#initializing-external-data-from-netcdf-file>`__.

        Parameters
        ----------
        externals: str or pathlib.Path (optional)
            Path to the netCDF file that contains the model external
            objects.

        Returns
        -------
        None

        See also
        --------
        :func:`pysd.py_backend.model.Macro.serialize_externals`

        Note
        ----
        To load externals from a netCDF file you need to have installed
        the optional dependency `netCDF4`.

        """
        if not externals:
            for ext in self._external_elements:
                ext.initialize()

            # Remove Excel data from memory
            Excels.clean()

            self.external_loaded = True

            return

        externals = Path(externals)

        if not externals.is_file():
            raise FileNotFoundError(f"Invalid file path ({str(externals)})")

        try:
            ds = xr.open_dataset(externals)
        except ValueError:  # pragma: no cover
            raise ModuleNotFoundError("No module named 'netCDF4'")

        for ext in self._external_elements:
            if ext.py_name in ds.data_vars.keys():
                # Initialize external from nc file
                da = ds.data_vars[ext.py_name]
                if isinstance(ext, ExtData):
                    # Rename the time dimension back to "time"
                    time_dim = [dim for dim in da.dims
                                if dim.startswith("time_#")][0]
                    da = da.rename({time_dim: "time"})
                elif isinstance(ext, ExtLookup):
                    # Rename the lookup_dim dimension back to "lookup_dim"
                    lookup_dim = [dim for dim in da.dims
                                  if dim.startswith("lookup_dim_#")][0]
                    da = da.rename({lookup_dim: "lookup_dim"})
                # Assign the value
                if da.dims:
                    ext.data = da
                else:
                    ext.data = float(da.data)
            else:
                # Initialize external from original file
                ext.initialize()

        Excels.clean()
        self.external_loaded = True
    def serialize_externals(self, export_path="externals.nc",
                            include_externals="all",
                            exclude_externals=None):
        """
        Stores a netCDF file with the data and metadata for all model
        external objects.

        This method is useful for models with lots of external inputs,
        which are slow to load into memory. Once exported, the resulting
        netCDF file can be passed as argument to the
        initialize_external_data method.

        Names of variables should be those in the model (Python safe,
        without the _ext_type_ string in front).

        Examples of usage are available at `Advanced Usage
        <https://pysd.readthedocs.io/en/master/advanced_usage.html#initializing-external-data-from-netcdf-file>`__.

        Parameters
        ----------
        export_path: str or pathlib.Path (optional)
            Path of the resulting *.nc* file.
        include_externals: list or str (optional)
            External objects to export to netCDF. If 'all', then all
            externals are exported to the *.nc* file. The argument also
            accepts a list containing spreadsheet file names, external
            variable names or a combination of both. If a spreadsheet
            file path is passed, all external objects defined in it will
            be included in the *.nc* file. Spreadsheet tab names are not
            currently supported, because the same tab name may be used
            in different files. Better customisation can be achieved by
            combining the include_externals and exclude_externals (see
            description below) arguments.
        exclude_externals: list or None (optional)
            Exclude external objects from being included in the exported
            nc file. It accepts either variable names, spreadsheet files
            or a combination of both.

        Returns
        -------
        None

        See also
        --------
        :func:`pysd.py_backend.model.Macro.initialize_external_data`

        Note
        ----
        To run this function you need to have installed the optional
        dependency `netCDF4`.

        """
        data = {}
        metadata = {}

        lookup_dims = utils.UniqueDims("lookup_dim")
        data_dims = utils.UniqueDims("time")

        if isinstance(export_path, str):
            export_path = Path(export_path)

        if not include_externals:
            raise ValueError("include_externals argument must not be None.")

        # Generate a DataFrame to simplify the search of external
        # objects to include
        # TODO include also checking the original name
        py_names = []
        externals_dict = {"py_var_name": [], "file": [], "ext": []}
        for ext in self._external_elements:
            py_names.append(ext.py_name)
            externals_dict["py_var_name"].append(
                self.__get_varname_from_ext_name(ext.py_name))
            externals_dict["file"].append(set(ext.files))
            externals_dict["ext"].append(ext)

        exts_df = pd.DataFrame(index=py_names, data=externals_dict)

        if include_externals != "all":
            if not isinstance(include_externals, (list, set)):
                raise TypeError(
                    "include_externals must be 'all', or a list, or a set.")
            # subset only to the externals to include
            exts_df = exts_df[[
                name in include_externals
                or var_name in include_externals
                or bool(file.intersection(include_externals))
                for name, (var_name, file)
                in exts_df[["py_var_name", "file"]].iterrows()
            ]]

        if exclude_externals:
            if not isinstance(exclude_externals, (list, set)):
                raise TypeError("exclude_externals must be a list or a set.")
            # drop the externals to exclude
            exts_df = exts_df[[
                name not in exclude_externals
                and var_name not in exclude_externals
                and not bool(file.intersection(exclude_externals))
                for name, (var_name, file)
                in exts_df[["py_var_name", "file"]].iterrows()
            ]]

        for _, (ext, var_name) in exts_df[["ext", "py_var_name"]].iterrows():
            self.__include_for_serialization(
                ext, var_name, data, metadata, lookup_dims, data_dims
            )

        # create description to be used as global attribute of the dataset
        description = {
            "description": f"External objects for {self.py_model_file} "
                           f"exported on {time.ctime(time.time())} "
                           f"using PySD version {__version__}"
        }

        # create Dataset
        ds = xr.Dataset(data_vars=data, attrs=description)

        # add data_vars attributes
        for key, values in metadata.items():
            ds[key].attrs = values

        try:
            ds.to_netcdf(export_path)
        except KeyError:  # pragma: no cover
            raise ModuleNotFoundError("No module named 'netCDF4'")
    def __include_for_serialization(self, ext, py_name_clean, data, metadata,
                                    lookup_dims, data_dims):
        """
        Initialize the external object and get the data and metadata for
        inclusion in the netCDF.

        This function updates the metadata dict with the metadata
        corresponding to each external object, which is collected from
        model.doc. It also updates the data dict, with the data from the
        external object (ext).

        It renames the "time" dimension of ExtData types by appending _#
        followed by a unique number at the end of it. For large models,
        this prevents having an unnecessarily large time dimension, which
        would otherwise cause all ExtData objects to have many nans when
        stored in a xarray Dataset. It does the same for the "lookup_dim"
        of all ExtLookup objects.

        Note
        ----
        Though subscripts can be read from Excel, they are hardcoded
        during the model building process. Therefore they will not be
        serialized.

        Parameters
        ----------
        ext: pysd.py_backend.externals.External
            External object. It can be any of the External subclasses
            (ExtConstant, ExtData, ExtLookup).
        py_name_clean: str
            Name of the variable without the _ext_[constant|data|lookup]
            prefix.
        data: dict
            Collects all the data for each external, which is later used
            to build the xarray Dataset.
        metadata: dict
            Collects the metadata for each external, which is later
            included as data_vars attributes in the xarray Dataset.
        lookup_dims: utils.UniqueDims
            UniqueDims object for the "lookup_dim" dimension.
        data_dims: utils.UniqueDims
            UniqueDims object for the "time" dimension.

        Returns
        -------
        None

        """
        ext.initialize()

        # collecting variable metadata from model._doc
        var_meta = {
            col: self._doc.loc[
                self._doc["Py Name"] == py_name_clean, col].values[0]
            or "Missing"
            for col in self._doc.columns
        }
        var_meta["files"] = ";".join(ext.files)
        var_meta["tabs"] = ";".join(ext.tabs)
        var_meta["cells"] = ";".join(ext.cells)
        # TODO: add also time_row_or_cols

        da = ext.data

        # Renaming the dims shared by all ExtData ("time") and ExtLookup
        # ("lookup_dim") external objects
        if isinstance(ext, ExtData):
            new_name = data_dims.name_new_dim(
                "time", da.coords["time"].values)
            da = da.rename({"time": new_name})
        if isinstance(ext, ExtLookup):
            new_name = lookup_dims.name_new_dim(
                "lookup_dim", da.coords["lookup_dim"].values)
            da = da.rename({"lookup_dim": new_name})

        metadata.update({ext.py_name: var_meta})
        data.update({ext.py_name: da})

        # TODO use a logger
        print(f"Finished processing variable {py_name_clean}.")

    def __get_varname_from_ext_name(self, varname):
        """
        Returns the name of the variable that depends on the external
        object named varname. If that is not possible (see warning in
        the code to understand when that may happen), it gets the name
        by removing the _ext_[constant|data|lookup] prefix from the
        varname.

        Parameters
        ----------
        varname: str
            Variable name to which the External object is assigned.

        Returns
        -------
        var: str
            Name of the variable that calls the variable with name
            varname.

        """
        for var, deps in self._dependencies.items():
            for _, ext_name in deps.items():
                if varname == ext_name:
                    return var

        warnings.warn(
            f"No variable depends upon '{varname}'. This is likely due "
            f"to the fact that '{varname}' is defined using a mix of "
            "DATA and CONSTANT. Though Vensim allows it, it is "
            "not recommended."
        )
        return "_".join(varname.split("_")[3:])
    def get_args(self, param):
        """
        Returns the arguments of a model element.

        Parameters
        ----------
        param: str or func
            The model element name or function.

        Returns
        -------
        args: list
            List of arguments of the function.

        Examples
        --------
        >>> model.get_args('birth_rate')
        >>> model.get_args('Birth Rate')

        See also
        --------
        :func:`pysd.py_backend.model.Macro.get_coords`

        """
        if isinstance(param, str):
            func_name = utils.get_key_and_value_by_insensitive_key_or_value(
                param, self._namespace)[1] or param
            func = getattr(self.components, func_name)
        else:
            func = param

        if hasattr(func, 'args'):
            # cached functions
            return func.args
        else:
            # regular functions
            args = inspect.getfullargspec(func)[0]
            if 'self' in args:
                args.remove('self')
            return args
    def get_coords(self, param):
        """
        Returns the coordinates and dims of a model element.

        Parameters
        ----------
        param: str or func
            The model element name or function.

        Returns
        -------
        (coords, dims) or None: (dict, list) or None
            The coords and the dimensions of the element, if it has any.
            Otherwise, returns None.

        Examples
        --------
        >>> model.get_coords('birth_rate')
        >>> model.get_coords('Birth Rate')

        See also
        --------
        :func:`pysd.py_backend.model.Macro.get_args`

        """
        if isinstance(param, str):
            func_name = utils.get_key_and_value_by_insensitive_key_or_value(
                param, self._namespace)[1] or param
            func = getattr(self.components, func_name)
        else:
            func = param

        if hasattr(func, "subscripts"):
            dims = func.subscripts
            if not dims:
                return None
            coords = {dim: self.components._subscript_dict[dim]
                      for dim in dims}
            return coords, dims
        elif hasattr(func, "state") and isinstance(func.state, xr.DataArray):
            value = func()
        else:
            return None

        dims = list(value.dims)
        coords = {coord: list(value.coords[coord].values)
                  for coord in value.coords}
        return coords, dims
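
    # Editorial usage sketch (not part of the original source); the
    # variable and subscript names are hypothetical, and the printed
    # output is indicative only:
    #
    #     >>> model.get_args('Heat Loss to Room')
    #     []
    #     >>> model.get_coords('Heat Loss to Room')
    #     ({'Room': ['kitchen', 'bedroom']}, ['Room'])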
    def __getitem__(self, param):
        """
        Returns the current value of a model component.

        Parameters
        ----------
        param: str or func
            The model element name.

        Returns
        -------
        value: float or xarray.DataArray
            The value of the model component.

        Examples
        --------
        >>> model['birth_rate']
        >>> model['Birth Rate']

        Note
        ----
        It will crash if the model component takes arguments.

        See also
        --------
        :func:`pysd.py_backend.model.Macro.get_series_data`

        """
        func_name = utils.get_key_and_value_by_insensitive_key_or_value(
            param, self._namespace)[1] or param

        if self.get_args(getattr(self.components, func_name)):
            raise ValueError(
                "Trying to get the current value of a lookup. "
                "To get all the values with the series data use "
                "model.get_series_data(param)\n\n")

        return getattr(self.components, func_name)()
    def get_series_data(self, param):
        """
        Returns the original values of a model lookup/data component.

        Parameters
        ----------
        param: str
            The model lookup/data element name.

        Returns
        -------
        value: xarray.DataArray
            Array with the value of the interpolating series
            in the first dimension.

        Examples
        --------
        >>> model.get_series_data('room_temperature')
        >>> model.get_series_data('Room temperature')

        """
        func_name = utils.get_key_and_value_by_insensitive_key_or_value(
            param, self._namespace)[1] or param

        if func_name.startswith("_ext_"):
            return getattr(self.components, func_name).data
        elif "__data__" in self._dependencies[func_name]:
            return getattr(
                self.components,
                self._dependencies[func_name]["__data__"]
            ).data
        elif "__lookup__" in self._dependencies[func_name]:
            return getattr(
                self.components,
                self._dependencies[func_name]["__lookup__"]
            ).data
        else:
            raise ValueError(
                "Trying to get the values of a constant variable. "
                "'model.get_series_data' only works with lookup/data "
                "objects.\n\n")
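
    # Editorial sketch (not part of the original source) contrasting
    # __getitem__ and get_series_data for a data/lookup variable (the
    # name is hypothetical):
    #
    #     >>> model['Room Temperature']                  # value at current time
    #     >>> model.get_series_data('Room Temperature')  # full series as DataArray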
    def set_components(self, params):
        """
        Set the value of exogenous model elements. Element values should
        be passed with a dictionary in the function call. Values can be
        numeric type or pandas Series. Series will be interpolated by
        the integrator.

        Parameters
        ----------
        params: dict
            Dictionary with the name of the elements to modify and the
            value they will take.

            If the passed value is a :class:`float` or a
            :class:`xarray.DataArray` (it must be compatible with the
            dimensions of the variable), the variable will have a
            constant value, returning the passed value and broadcasting
            to all dimensions, if necessary.

            If a :class:`pandas.Series` is passed, the variable will be
            of type data and will use the time of the model to
            interpolate the result, having as reference the indexes of
            the series. In this case, the series 'data' can also be a
            :class:`float` or a :class:`xarray.DataArray`, as with
            constant values.

            In the case of the target using a
            :class:`pysd.py_backend.lookups.Lookup` object, it will
            modify the object values to use the original arguments when
            being called. More detailed information and examples of
            usage are available at `Getting Started
            <https://pysd.readthedocs.io/en/master/getting_started.html#setting-parameter-values>`__.

            To write more complex relationships, which may or may not
            include other model variables, a callable, e.g. a function,
            can be passed that takes the same arguments as the original
            function and returns a :class:`float` or a
            :class:`xarray.DataArray` with exactly the same dimensions
            as the original function. More detailed information and
            examples of usage are available at `Advanced Usage
            <https://pysd.readthedocs.io/en/master/advanced_usage.html#replacing-model-components-with-more-complex-objects>`__.

        Note
        ----
        This function is meant to modify the value or equations of the
        variables; it won't work properly with Stateful objects, e.g.
        Integ, DelayFixed... In order to modify them it will be
        necessary to do it manually; if you have other inputs it is
        recommended to modify those instead. To change their initial
        value the
        :func:`pysd.py_backend.model.Model.set_initial_condition` method
        can be used.

        Examples
        --------
        >>> model.set_components({'birth_rate': 10})
        >>> model.set_components({'Birth Rate': 10})

        >>> br = pandas.Series(index=range(30), data=np.sin(range(30)))
        >>> model.set_components({'birth_rate': br})

        See also
        --------
        :func:`pysd.py_backend.model.Model.set_initial_condition`
        :func:`pysd.py_backend.model.Macro.get_coords`
        :func:`pysd.py_backend.model.Macro.get_args`

        """
        self._components_setter_tracker.update(params)
        self._set_components(params, new=False)
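
    # Editorial sketch (not part of the original source) of the three
    # accepted kinds of values (constant, pandas Series, callable); the
    # variable name is hypothetical:
    #
    #     >>> model.set_components({'birth_rate': 10})
    #     >>> ts = pd.Series(index=[0, 10, 20], data=[1., 5., 10.])
    #     >>> model.set_components({'birth_rate': ts})
    #     >>> model.set_components({'birth_rate': lambda: model.time() / 10})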
    def _set_components(self, params, new):
        """
        Set the value of exogenous model elements, giving the option to
        set new components (used in Macros).
        """
        for key, value in params.items():
            func_name = utils.get_key_and_value_by_insensitive_key_or_value(
                key, self._namespace)[1]

            if isinstance(value, np.ndarray) or isinstance(value, list):
                raise TypeError(
                    'When setting ' + key + '\n'
                    'Setting subscripted variables must be done using a '
                    'xarray.DataArray with the correct dimensions or a '
                    'constant value '
                    '(https://pysd.readthedocs.io/en/master/'
                    'getting_started.html)')

            if func_name is None:
                raise NameError(
                    "\n'%s' is not recognized as a model component."
                    % key)

            if new:
                func = None
                dims = None
            else:
                func = getattr(self.components, func_name)
                _, dims = self.get_coords(func) or (None, None)

            # if the variable is a lookup or a data we perform the
            # change in the object they call
            func_type = getattr(func, "type", None)
            if func_type in ["Lookup", "Data"]:
                # getting the object from original dependencies
                obj = self._dependencies[func_name][
                    f"__{func_type.lower()}__"]
                getattr(
                    self.components,
                    obj
                ).set_values(value)

                if not isinstance(value, pd.Series):
                    warnings.warn(
                        "Replacing interpolation data with constant "
                        "values.")

                # Update dependencies
                if func_type == "Data":
                    if isinstance(value, pd.Series):
                        self._dependencies[func_name] = {
                            "time": 1, "__data__": obj
                        }
                    else:
                        self._dependencies[func_name] = {"__data__": obj}

                continue

            if func_type == "Stateful":
                warnings.warn(
                    "Replacing the value of Stateful variable with "
                    "an expression. To set initial conditions use "
                    "`set_initial_condition` instead...")

            if isinstance(value, pd.Series):
                if func_type == "Constant":
                    warnings.warn(
                        "Replacing a constant value with a "
                        "time-dependent value. The value will be "
                        "interpolated over time.")
                new_function, deps = self._timeseries_component(
                    value, dims)
                self._dependencies[func_name] = deps
            elif callable(value):
                if func_type == "Constant":
                    warnings.warn(
                        "Replacing a constant value with a callable. "
                        "The value may not be constant anymore.")
                new_function = value
                # Using step cache adding time as dependency
                # TODO it would be better if we can parse the content
                # of the function to get all the dependencies
                self._dependencies[func_name] = {"time": 1}
            else:
                if func_type != "Constant":
                    warnings.warn(
                        "Replacing a variable by a constant value.")
                new_function = self._constant_component(value, dims)
                self._dependencies[func_name] = {}

            # copy attributes from the original object for the proper
            # working of internal functions
            new_function.__name__ = func_name
            new_function.__dict__.update(getattr(func, "__dict__", {}))

            # set the new function
            self.components._set_component(func_name, new_function)
            if func_name in self.cache.cached_funcs:
                self.cache.cached_funcs.remove(func_name)

    def _timeseries_component(self, series, dims):
        """ Internal function for creating a timeseries model element """
        # this is only called if the set_components function recognizes
        # a pandas series
        # TODO: raise a warning if extrapolating from the end of the series
        # TODO: data type variables should be created using a Data object
        # lookup type variables should be created using a Lookup object
        if isinstance(series.values[0], xr.DataArray):
            # the interpolation will be time dependent
            return lambda: utils.rearrange(xr.concat(
                series.values,
                series.index).interp(concat_dim=self.time()).reset_coords(
                    'concat_dim', drop=True),
                dims, self._subscript_dict), {'time': 1}
        elif dims:
            # the interpolation will be time dependent
            return lambda: utils.rearrange(
                np.interp(self.time(), series.index, series.values),
                dims, self._subscript_dict), {'time': 1}
        else:
            # the interpolation will be time dependent
            return lambda: np.interp(
                self.time(), series.index, series.values), {'time': 1}

    def _constant_component(self, value, dims):
        """ Internal function for creating a constant model element """
        if dims:
            return lambda: utils.rearrange(
                value, dims, self._subscript_dict)
        else:
            return lambda: value
    def set_initial_value(self, time, initial_value):
        """
        Set the system initial value.

        Parameters
        ----------
        time : float or int
            The system initial time to be set.

        initial_value : dict
            A (possibly partial) dictionary of the system initial
            values. The keys to this dictionary may be either pysafe
            names or original model file names.

        See also
        --------
        :func:`pysd.py_backend.model.Model.set_initial_condition`

        """
        self.time.set_control_vars(initial_time=time)
        stateful_name = "_NONE"
        modified_statefuls = set()

        for key, value in initial_value.items():
            component_name =\
                utils.get_key_and_value_by_insensitive_key_or_value(
                    key, self._namespace)[1]
            if component_name is not None:
                if self._dependencies[component_name]:
                    deps = list(self._dependencies[component_name])
                    if len(deps) == 1 and deps[0] in self.initialize_order:
                        stateful_name = deps[0]
            else:
                component_name = key
                stateful_name = key

            try:
                _, dims = self.get_coords(component_name)
            except TypeError:
                dims = None

            if isinstance(value, xr.DataArray)\
               and not set(value.dims).issubset(set(dims)):
                raise ValueError(
                    f"\nInvalid dimensions for {component_name}. "
                    f"It should be a subset of {dims}, "
                    f"but passed value has {list(value.dims)}")

            if isinstance(value, np.ndarray) or isinstance(value, list):
                raise TypeError(
                    'When setting ' + key + '\n'
                    'Setting subscripted variables must be done using a '
                    'xarray.DataArray with the correct dimensions or a '
                    'constant value '
                    '(https://pysd.readthedocs.io/en/master/'
                    'getting_started.html)')

            # Try to update stateful component
            try:
                element = getattr(self.components, stateful_name)
                if dims:
                    value = utils.rearrange(
                        value, dims, self._subscript_dict)
                element.initialize(value)
                modified_statefuls.add(stateful_name)
            except NameError:
                # Try to override component
                raise ValueError(
                    f"\nUnrecognized stateful '{component_name}'. If you "
                    "want to set the value of a regular component use "
                    "params={"
                    f"'{component_name}': {value}" + "} instead.")

        self.clean_caches()

        # get the elements to initialize
        elements_to_initialize =\
            self._get_elements_to_initialize(modified_statefuls)

        # Initialize remaining stateful objects
        for element_name in self.initialize_order:
            if element_name in elements_to_initialize:
                self._stateful_elements[element_name].initialize()
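
    # Editorial usage sketch (not part of the original source): restart
    # the system at t=5 with a partial set of stock values (the stock
    # name is hypothetical):
    #
    #     >>> model.set_initial_value(5, {'Teacup Temperature': 50})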
    def _get_elements_to_initialize(self, modified_statefuls):
        elements_to_initialize = set()
        for stateful, deps in self.stateful_initial_dependencies.items():
            if stateful in modified_statefuls:
                # if the element's initial conditions have been modified
                # we should not modify it
                continue
            for modified_stateful in modified_statefuls:
                if modified_stateful in deps:
                    # if element has dependencies on a modified element
                    # we should re-initialize it
                    elements_to_initialize.add(stateful)
                    continue

        return elements_to_initialize

    def export(self):
        """Exports stateful values to a dictionary."""
        return {
            name: element.export()
            for name, element in self._stateful_elements.items()
        }

    def _set_stateful(self, stateful_dict):
        """
        Set stateful values.

        Parameters
        ----------
        stateful_dict: dict
            Dictionary of the stateful elements and the attributes to
            change.

        """
        for element, attrs in stateful_dict.items():
            component = getattr(self.components, element)
            if hasattr(component, '_set_stateful'):
                component._set_stateful(attrs)
            else:
                [
                    setattr(component, attr, value)
                    for attr, value in attrs.items()
                ]

    def _build_doc(self):
        """
        Formats a table of documentation strings to help users remember
        variable names, and understand how they are translated into
        Python safe names.

        Returns
        -------
        docs_df: pandas dataframe
            Dataframe with columns for the model components:
                - Real names
                - Python safe identifiers (as used in model.components)
                - Units string
                - Documentation strings from the original model file
        """
        collector = []
        for name, pyname in self._namespace.items():
            element = getattr(self.components, pyname)
            collector.append({
                'Real Name': name,
                'Py Name': pyname,
                'Subscripts': element.subscripts,
                'Units': element.units,
                'Limits': element.limits,
                'Type': element.type,
                'Subtype': element.subtype,
                'Comment': element.__doc__.strip().strip("\n").strip()
                if element.__doc__ else None
            })

        return pd.DataFrame(
            collector
        ).sort_values(by="Real Name").reset_index(drop=True)

    def __str__(self):
        """ Return model source files """

        # JT: Might be helpful to return not only the source file, but
        # also how the instance differs from that source file. This
        # would give a more accurate view of the current model.
        string = 'Translated Model File: ' + self.py_model_file
        if hasattr(self, 'mdl_file'):
            string += '\n Original Model File: ' + self.mdl_file

        return string
class Model(Macro):
    """
    The Model class implements a stateful representation of the system.
    It inherits methods from the Macro class to integrate the model and
    access and modify model components. It also contains the main
    methods for running the model.

    The Model object will be created with components drawn from a
    translated Python model file.

    Parameters
    ----------
    py_model_file: str or pathlib.Path
        Filename of a model which has already been converted into a
        Python format.
    data_files: dict or list or str or None
        The dictionary with keys the name of file and variables to
        load the data from there. Or the list of names or name of the
        file to search the data in. Only works for TabData type object
        and it is necessary to provide it. Default is None.
    initialize: bool
        If False, the model will not be initialized when it is loaded.
        Default is True.
    missing_values : str ("warning", "error", "ignore", "keep") (optional)
        What to do with missing values. If "warning" (default) shows a
        warning message and interpolates the values. If "error" raises
        an error. If "ignore" interpolates the values without showing
        anything. If "keep" it will keep the missing values; this
        option may cause the integration to fail, but it may be used
        to check the quality of the data.

    See also
    --------
    :class:`pysd.py_backend.model.Macro`

    """
    def __init__(self, py_model_file, data_files, initialize,
                 missing_values):
        """ Sets up the Python objects """
        super().__init__(py_model_file, None, None, Time(),
                         data_files=data_files)
        self.data_files = data_files
        self.missing_values = missing_values
        # set time component
        self.time.stage = 'Load'
        # set control vars privately so they do not change when copying
        self.time._set_control_vars(**self.components._control_vars)
        # Attributes that are set later
        self.progress = None
        self.output = None
        self.capture_elements = None
        self.return_addresses = None
        self._stepper_mode = None
        self._submodel_tracker = {}

        if initialize:
            self.initialize()
    def initialize(self):
        """
        Initializes the simulation model.

        See also
        --------
        :func:`pysd.py_backend.model.Macro.initialize`
        :func:`pysd.py_backend.model.Model.reload`

        """
        self.time.stage = 'Initialization'
        External.missing = self.missing_values
        super().initialize()
    def run(self, params=None, return_columns=None, return_timestamps=None,
            initial_condition='original', final_time=None, time_step=None,
            saveper=None, reload=False, progress=False, flatten_output=True,
            cache_output=True, output_file=None):
        """
        Simulate the model's behavior over time. Return a pandas
        dataframe with timestamps as rows and model elements as columns.

        More detailed information and examples of usage are available at
        `Getting Started
        <https://pysd.readthedocs.io/en/master/getting_started.html#running-the-model>`__.

        Parameters
        ----------
        params: dict (optional)
            Keys are strings of model component names. Values are
            numeric or pandas Series. Numeric values represent constants
            over the model integration. Timeseries will be interpolated
            to give time-varying input. For more information, check the
            documentation of
            :func:`pysd.py_backend.model.Macro.set_components`.
        return_timestamps: list, numeric, ndarray (1D) (optional)
            Timestamps in model execution at which to return state
            information. Defaults to model-file specified timesteps.
        return_columns: list, 'step' or None (optional)
            List of string model component names; the returned dataframe
            will have corresponding columns. If 'step' only variables
            with cache step will be returned. If None, variables with
            cache step and run will be returned. Default is None.
        initial_condition: str or (float, dict) (optional)
            The starting time, and the state of the system (the values
            of all the stocks) at that starting time. 'original' or 'o'
            uses the model-file specified initial condition. 'current'
            or 'c' uses the state of the model after the previous
            execution. Other str objects load initial conditions from
            the pickle file with the given name. A (float, dict) tuple
            lets the user specify a starting time (float) and a
            (possibly partial) dictionary of initial values for stock
            (stateful) objects. Default is 'original'. For more
            information, check the documentation of
            :func:`pysd.py_backend.model.Model.set_initial_condition`.
        final_time: float or None
            Final time of the simulation. If float, the given value will
            be used to compute the return_timestamps (if not given) and
            as a final time. If None the last value of return_timestamps
            will be used as a final time. Default is None.
        time_step: float or None
            Time step of the simulation. If float, the given value will
            be used to compute the return_timestamps (if not given) and
            the Euler time series. If None the default value from
            components will be used. Default is None.
        saveper: float or None
            Saving step of the simulation. If float, the given value
            will be used to compute the return_timestamps (if not
            given). If None the default value from components will be
            used. Default is None.
        reload : bool (optional)
            If True, reloads the model from the translated model file
            before making changes. Default is False.
        progress : bool (optional)
            If True, a progressbar will be shown during integration.
            Default is False.
        flatten_output: bool (optional)
            If True, once the output dataframe has been formatted it
            will split the xarrays in new columns following Vensim's
            naming to make a totally flat output. Default is True. This
            argument will be ignored when passing a netCDF4 file path in
            the output_file argument.
        cache_output: bool (optional)
            If True, the number of calls of output variables will be
            increased by one. This helps caching output variables if
            they are called only once. For performance reasons, if
            time step = saveper it is recommended to activate this
            feature; if time step << saveper it is recommended to
            deactivate it. Default is True.
        output_file: str, pathlib.Path or None (optional)
            Path of the file in which to save simulation results.
            Currently, csv, tab and nc (netCDF4) files are supported.

        Examples
        --------
        >>> model.run(params={'exogenous_constant': 42})
        >>> model.run(params={'exogenous_variable': timeseries_input})
        >>> model.run(return_timestamps=[1, 2, 3, 4, 10])
        >>> model.run(return_timestamps=10)
        >>> model.run(return_timestamps=np.linspace(1, 10, 20))
        >>> model.run(output_file="results.nc")

        See also
        --------
        :func:`pysd.py_backend.model.Macro.set_components`
        :func:`pysd.py_backend.model.Model.set_initial_condition`
        :func:`pysd.py_backend.model.Model.reload`

        """
        self._stepper_mode = False

        if reload:
            self.reload()

        self._config_simulation(params, return_columns, return_timestamps,
                                initial_condition, final_time, time_step,
                                saveper, cache_output, progress=progress)

        # instantiate output object
        self.output = ModelOutput(output_file)
        self.output.set_capture_elements(self.capture_elements)
        self.output.initialize(self)

        self._integrate()

        return self.output.collect(self, flatten_output)
    def set_stepper(self, output_obj, params=None, step_vars=[],
                    return_columns=None, return_timestamps=None,
                    initial_condition='original', final_time=None,
                    time_step=None, saveper=None, cache_output=True):
        """
        Configure the model stepping behavior.

        Examples of usage are available at `Advanced Usage
        <https://pysd.readthedocs.io/en/master/advanced_usage.html#running-models-one-or-more-step-s-at-a-time>`__.

        Parameters
        ----------
        output_obj: ModelOutput
            Instance of ModelOutput where the simulation results will be
            stored.
        params: dict (optional)
            Keys are strings of model component names. Values are
            numeric or pandas Series. Numeric values represent constants
            over the model integration. Timeseries will be interpolated
            to give time-varying input.
        step_vars: list
            List of variable or parameter names whose values might be
            updated after one or more simulation steps.
        return_columns: list, 'step' or None (optional)
            List of string model component names; the returned dataframe
            will have corresponding columns. If 'step' only variables
            with cache step will be returned. If None, variables with
            cache step and run will be returned. Default is None.
        return_timestamps: list, numeric, ndarray (1D) (optional)
            Timestamps in model execution at which to return state
            information. Defaults to model-file specified timesteps.
        initial_condition: str or (float, dict) (optional)
            The starting time, and the state of the system (the values
            of all the stocks) at that starting time. 'original' or 'o'
            uses the model-file specified initial condition. 'current'
            or 'c' uses the state of the model after the previous
            execution. Other str objects load initial conditions from
            the pickle file with the given name. A (float, dict) tuple
            lets the user specify a starting time (float) and a
            (possibly partial) dictionary of initial values for stock
            (stateful) objects. Default is 'original'.
        final_time: float or None
            Final time of the simulation. If float, the given value will
            be used to compute the return_timestamps (if not given) and
            as a final time. If None the last value of return_timestamps
            will be used as a final time. Default is None.
        time_step: float or None
            Time step of the simulation. If float, the given value will
            be used to compute the return_timestamps (if not given) and
            the Euler time series. If None the default value from
            components will be used. Default is None.
        saveper: float or None
            Saving step of the simulation. If float, the given value
            will be used to compute the return_timestamps (if not
            given). If None the default value from components will be
            used. Default is None.
        cache_output: bool (optional)
            If True, the number of calls of output variables will be
            increased by one. This helps caching output variables if
            they are called only once. For performance reasons, if
            time step = saveper it is recommended to activate this
            feature; if time step << saveper it is recommended to
            deactivate it. Default is True.

        See also
        --------
        :func:`pysd.py_backend.model.Model.step`

        """
        self.output = output_obj

        self._stepper_mode = True

        self._config_simulation(params, return_columns, return_timestamps,
                                initial_condition, final_time, time_step,
                                saveper, cache_output, step_vars=step_vars)

        self.output.set_capture_elements(self.capture_elements)
        self.output.initialize(self)
        self.output.update(self)
    def step(self, num_steps=1, step_vars={}):
        """
        Run a model step. Updates model variables first (optional), and
        then runs any number of model steps. To collect the outputs
        after one or more steps, use the collect method of the
        ModelOutput class.

        Examples of usage are available at `Advanced Usage
        <https://pysd.readthedocs.io/en/master/advanced_usage.html#running-models-one-or-more-step-s-at-a-time>`__.

        Parameters
        ----------
        num_steps: int
            Number of steps that the iterator should run with the
            values of variables defined in the step_vars argument.
        step_vars: dict
            Variable names that should be updated before running the
            step as keys, and the actual values of the variables as
            values.

        Returns
        -------
        None

        See also
        --------
        :func:`pysd.py_backend.model.Model.set_stepper`

        """
        # TODO warn the user if we exceeded the final_time??
        self._set_components(step_vars, new=False)

        for _ in range(num_steps):
            self._integrate_step()
            if self.time.in_return():
                self.output.update(self)
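
    # Editorial workflow sketch (not part of the original source) for
    # the stepper API; the variable name is hypothetical and ModelOutput
    # is assumed to accept no arguments for an in-memory output:
    #
    #     >>> from pysd.py_backend.output import ModelOutput
    #     >>> output = ModelOutput()
    #     >>> model.set_stepper(output, step_vars=["room_temperature"],
    #     ...                   final_time=10)
    #     >>> for temp in [20, 25, 30]:
    #     ...     model.step(1, {"room_temperature": temp})
    #     >>> results = output.collect(model)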
    def _config_simulation(self, params, return_columns, return_timestamps,
                           initial_condition, final_time, time_step,
                           saveper, cache_output, **kwargs):
        """
        Internal method to set all simulation config parameters.
        Arguments to this function are those of the run and set_stepper
        methods.
        """
        # set control vars at the beginning in case they are needed to
        # initialize any object
        self._set_control_vars(return_timestamps, final_time, time_step,
                               saveper)

        if params:
            self.set_components(params)

        if self._stepper_mode:
            for step_var in kwargs["step_vars"]:
                self._dependencies[step_var]["time"] = 1

        # update cache types after setting params
        self._assign_cache_type()

        # set initial conditions
        self.set_initial_condition(initial_condition)
        # set control vars again in case a pickle has been used
        self._set_control_vars(return_timestamps, final_time, time_step,
                               saveper)

        # progressbar only makes sense when not running step by step
        if not self._stepper_mode:
            self.progress = self._set_progressbar(kwargs["progress"])

        self.capture_elements = self._set_capture_elements(return_columns)

        # include outputs in cache if needed
        self._dependencies["OUTPUTS"] = {
            element: 1 for element in self.capture_elements["step"]
        }

        if cache_output:
            # update the cache type taking into account the outputs
            self._assign_cache_type()

        # add constant cache to those variables that are constant
        self._add_constant_cache()

        # set Run mode
        self.time.stage = 'Run'

        # need to clean cache to remove the values from active_initial
        self.clean_caches()

    def _set_capture_elements(self, return_columns):
        """
        Define which variables will be stored in the output object,
        according to the return_columns passed by the user. The list is
        stored in the capture_elements attribute, which is a dictionary
        with keys "run" and "step".

        Parameters
        ----------
        return_columns: list, 'step' or None (optional)
            List of string model component names; the returned dataframe
            will have corresponding columns. If 'step' only variables
            with cache step will be returned. If None, variables with
            cache step and run will be returned. Default is None.

        Returns
        -------
        capture_elements: dict
            Dictionary of lists with keywords step and run.

        """
        if return_columns is None or isinstance(return_columns, str):
            return_columns = self._default_return_columns(return_columns)

        capture_elements, self.return_addresses = utils.get_return_elements(
            return_columns, self._namespace)

        # create a dictionary splitting run cached and others
        capture_elements = self._split_capture_elements(capture_elements)

        return capture_elements

    def _set_progressbar(self, progress):
        """
        Configures the progressbar, according to the user provided
        argument. If final_time or time_step are functions, then the
        progressbar is automatically disabled, regardless of the value
        of the progress argument.

        Parameters
        ----------
        progress: bool

        """
        if progress and (self.cache_type["final_time"] == "step" or
                         self.cache_type["time_step"] == "step"):
            warnings.warn(
                "The progressbar is not compatible with dynamic "
                "final time or time step. Both variables must be "
                "constants to prompt progress."
            )
            progress = False

        return progress

    def _set_control_vars(self, return_timestamps, final_time, time_step,
                          saveper):
        self.time.add_return_timestamps(return_timestamps)
        if self.time.return_timestamps is not None and not final_time:
            # if no final time is given the model will end at the last
            # return timestamp (the list is reversed for popping)
            if self.time.return_timestamps:
                final_time = self.time.return_timestamps[0]
            else:
                final_time = self.time._next_return

        self.time.set_control_vars(
            final_time=final_time, time_step=time_step, saveper=saveper)
    def select_submodel(self, vars=[], modules=[], exogenous_components={},
                        inplace=True):
        """
        Select a submodel from the original model. After selecting a
        submodel only the stateful objects necessary for integrating
        this submodel will be computed.

        Examples of usage are available at `Advanced Usage
        <https://pysd.readthedocs.io/en/master/advanced_usage.html#selecting-and-running-a-submodel>`__.

        Parameters
        ----------
        vars: set or list of strings (optional)
            Variables to include in the new submodel. It can be an
            empty list if the submodel is only selected by module
            names. Default is an empty list.
        modules: set or list of strings (optional)
            Modules to include in the new submodel. It can be an empty
            list if the submodel is only selected by variable names.
            Default is an empty list. Can select a full module or a
            submodule by passing the path without the .py, e.g.:
            "view_1/submodule1".
        exogenous_components: dictionary of parameters (optional)
            Exogenous values to fix for the model variables that are
            needed to run the selected submodel. The
            exogenous_components should be passed as a dictionary in
            the same way it is done for the set_components method. By
            default it is an empty dict and the needed exogenous
            components will be set to a numpy.nan value.
        inplace: bool (optional)
            If True it will modify the current object and will return
            None. If False it will create a copy of the model and
            return it, keeping the original model unchanged. Default
            is True.

        Returns
        -------
        None or pysd.py_backend.model.Model
            If inplace=False it will return a modified copy of the
            original model.

        Note
        ----
        modules can only be passed when the model has been split in
        different files during translation.

        Examples
        --------
        >>> model.select_submodel(
        ...     vars=["Room Temperature", "Teacup temperature"])
        UserWarning: Selecting submodel, to run the full model again use model.reload()

        >>> model.select_submodel(
        ...     modules=["view_1", "view_2/subview_1"])
        UserWarning: Selecting submodel, to run the full model again use model.reload()
        UserWarning: Exogenous components for the following variables are necessary but not given: initial_value_stock1, stock3

        >>> model.select_submodel(
        ...     vars=["stock3"],
        ...     modules=["view_1", "view_2/subview_1"])
        UserWarning: Selecting submodel, to run the full model again use model.reload()
        UserWarning: Exogenous components for the following variables are necessary but not given: initial_value_stock1, initial_value_stock3
        Please, set them before running the model using set_components method...

        >>> model.select_submodel(
        ...     vars=["stock3"],
        ...     modules=["view_1", "view_2/subview_1"],
        ...     exogenous_components={
        ...         "initial_value_stock1": 3,
        ...         "initial_value_stock3": 5})
        UserWarning: Selecting submodel, to run the full model again use model.reload()

        See also
        --------
        :func:`pysd.py_backend.model.Model.get_vars_in_module`
        :func:`pysd.py_backend.model.Model.get_dependencies`

        """
        if inplace:
            self._select_submodel(vars, modules, exogenous_components)
        else:
            return self.copy()._select_submodel(
                vars, modules, exogenous_components)
    def _select_submodel(self, vars, modules, exogenous_components={}):
        self._submodel_tracker = {
            "vars": vars,
            "modules": modules
        }
        deps = self.get_dependencies(vars, modules)
        warnings.warn(
            "Selecting submodel, "
            "to run the full model again use model.reload()")

        # get set of all dependencies and all variables to select
        all_deps = deps.d_deps["initial"].copy()
        all_deps.update(deps.d_deps["step"])
        all_deps.update(deps.d_deps["lookup"])

        all_vars = all_deps.copy()
        all_vars.update(deps.c_vars)

        # clean dependencies and namespace dictionaries, and remove
        # the rows from the documentation
        for real_name, py_name in self._namespace.copy().items():
            if py_name not in all_vars:
                del self._namespace[real_name]
                del self._dependencies[py_name]
                self._doc.drop(
                    self._doc.index[self._doc["Real Name"] == real_name],
                    inplace=True
                )

        for py_name in self._dependencies.copy().keys():
            if py_name.startswith("_") and py_name not in deps.s_deps:
                del self._dependencies[py_name]

        # remove active initial from s_deps as they are "fake" objects
        # in dependencies
        deps.s_deps = {
            dep for dep in deps.s_deps
            if not dep.startswith("_active_initial")
        }

        # reassign the dictionary and lists of needed stateful objects
        self._stateful_elements = {
            name: getattr(self.components, name)
            for name in deps.s_deps
            if isinstance(getattr(self.components, name), Stateful)
        }
        self._dynamicstateful_elements = [
            getattr(self.components, name) for name in deps.s_deps
            if isinstance(getattr(self.components, name), DynamicStateful)
        ]
        self._macro_elements = [
            getattr(self.components, name) for name in deps.s_deps
            if isinstance(getattr(self.components, name), Macro)
        ]

        # keeping only needed external objects
        ext_deps = set()
        for values in self._dependencies.values():
            if "__external__" in values:
                ext_deps.add(values["__external__"])
        self._external_elements = [
            getattr(self.components, name) for name in ext_deps
            if isinstance(getattr(self.components, name), External)
        ]

        # set all exogenous values to np.nan by default
        new_components = {element: np.nan for element in all_deps}
        # update exogenous values with the user input
        [new_components.update(
            {
                utils.get_key_and_value_by_insensitive_key_or_value(
                    key,
                    self._namespace)[1]: value
            }) for key, value in exogenous_components.items()]

        self.set_components(new_components)

        # show a warning message if exogenous values are needed for a
        # dependency
        new_components = [
            key for key, value in new_components.items()
            if value is np.nan]
        if new_components:
            warnings.warn(
                "Exogenous components for the following variables are "
                f"necessary but not given:\n\t{', '.join(new_components)}"
                "\n\n Please, set them before running the model using "
                "set_components method...")

        # re-assign the cache_type and initialization order
        self._assign_cache_type()
        self._get_initialize_order()

        return self
[docs]    def get_dependencies(self, vars=[], modules=[]):
        """
        Get the dependencies of a set of variables or modules.

        Parameters
        ----------
        vars: set or list of strings (optional)
            Variables to get the dependencies from.
            It can be an empty list if the dependencies are computed only
            using modules. Default is an empty list.
        modules: set or list of strings (optional)
            Modules to get the dependencies from.
            It can be an empty list if the dependencies are computed only
            using variables. Default is an empty list. Can select a full
            module or a submodule by passing the path without the .py,
            e.g.: "view_1/submodule1".

        Returns
        -------
        dependencies: pysd.py_backend.utils.Dependencies
            Dependencies data object.

        Note
        ----
        modules can only be passed when the model has been split into
        different files during translation.

        Examples
        --------
        >>> print(model.get_dependencies(
        ...     vars=["Room Temperature", "Teacup temperature"]))
        Selected variables (total 2):
            room_temperature, teacup_temperature
        Stateful objects integrated with the selected variables (total 1):
            _integ_teacup_temperature

        >>> print(model.get_dependencies(
        ...     modules=["view_1", "view_2/subview_1"]))
        Selected variables (total 4):
            var1, var2, stock1, delay1
        Dependencies for initialization only (total 1):
            initial_value_stock1
        Dependencies that may change over time (total 1):
            stock3
        Stateful objects integrated with the selected variables (total 2):
            _integ_stock1, _delay_fixed_delay1

        >>> print(model.get_dependencies(
        ...     vars=["stock3"],
        ...     modules=["view_1", "view_2/subview_1"]))
        Selected variables (total 5):
            var1, var2, stock1, stock3, delay1
        Dependencies for initialization only (total 2):
            initial_value_stock1, initial_value_stock3
        Stateful objects integrated with the selected variables (total 3):
            _integ_stock1, _integ_stock3, _delay_fixed_delay1

        See also
        --------
        :func:`pysd.py_backend.model.Model.get_vars_in_module`

        """
        def check_dep(deps_obj, deps, initial=False):
            for dep in deps:
                if dep in deps_obj.c_vars or dep.startswith("__"):
                    pass
                elif dep.startswith("_"):
                    deps_obj.s_deps.add(dep)
                    dep = self._dependencies[dep]
                    check_dep(deps_obj, dep["initial"], True)
                    check_dep(deps_obj, dep["step"])
                else:
                    if initial and dep not in deps_obj.d_deps["step"]\
                       and dep not in deps_obj.d_deps["lookup"]:
                        deps_obj.d_deps["initial"].add(dep)
                    else:
                        if dep in deps_obj.d_deps["initial"]:
                            deps_obj.d_deps["initial"].remove(dep)
                        if self.get_args(dep):
                            deps_obj.d_deps["lookup"].add(dep)
                        else:
                            deps_obj.d_deps["step"].add(dep)

        dependencies = utils.Dependencies(
            {"time", "time_step", "initial_time", "final_time", "saveper"},
            {"initial": set(), "step": set(), "lookup": set()},
            set()
        )
        for var in vars:
            py_name = utils.get_key_and_value_by_insensitive_key_or_value(
                var, self._namespace)[1]
            dependencies.c_vars.add(py_name)
        for module in modules:
            dependencies.c_vars.update(self.get_vars_in_module(module))

        for var in dependencies.c_vars:
            if var == "time":
                continue
            check_dep(dependencies, self._dependencies[var])

        return dependencies
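    # Sketch of inspecting the returned Dependencies object directly
    # (attribute names as used in the code above; the variable name
    # `deps` is illustrative):
    #
    # >>> deps = model.get_dependencies(vars=["Teacup Temperature"])
    # >>> deps.c_vars           # selected variables
    # >>> deps.s_deps           # stateful objects to integrate
    # >>> deps.d_deps["step"]   # exogenous dependencies needed each step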
[docs]    def get_vars_in_module(self, module):
        """
        Return the names of the Python variables in a module.

        Parameters
        ----------
        module: str
            Name of the module to search in.

        Returns
        -------
        vars: set
            Set of variable names in the given module.

        See also
        --------
        :func:`pysd.py_backend.model.Model.get_dependencies`

        """
        if self._modules:
            module_content = self._modules.copy()
        else:
            raise ValueError(
                "Trying to get a module from a non-modularized model")

        try:
            # get the module or the submodule content
            for submodule in module.split("/"):
                module_content = module_content[submodule]
            module_content = [module_content]
        except KeyError:
            raise NameError(
                f"Module or submodule '{submodule}' not found...\n")

        vars, new_content = set(), []

        while module_content:
            # find the vars in the module or the submodule
            for content in module_content:
                if isinstance(content, list):
                    vars.update(content)
                else:
                    new_content.extend(content.values())
            module_content, new_content = new_content, []

        return vars
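    # Sketch of the traversal above on a hypothetical _modules dict
    # (the module and variable names are illustrative):
    #
    #     _modules = {"view_1": ["var1", "var2"],
    #                 "view_2": {"subview_1": ["stock1"]}}
    #
    #     get_vars_in_module("view_2/subview_1")  # -> {"stock1"}
    #     get_vars_in_module("view_2")            # -> {"stock1"}
    #     get_vars_in_module("view_1")            # -> {"var1", "var2"}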
[docs]    def copy(self, reload=False):
        """
        Create a copy of the current model.

        Parameters
        ----------
        reload: bool (optional)
            If True the model will be copied without applying any changes
            to it; the copy will simply load the model again from the
            translated file. This would be equivalent to calling
            :py:func:`pysd.load` with the same arguments. Otherwise, it
            will apply the same changes that have been applied to the
            original model and update the states (faithful copy).
            Default is False.

        Warning
        -------
        The copy function will load a new model from the file and apply
        the same changes to it. If any of these changes have replaced a
        variable with a function that references other variables in the
        model, the copy will not work properly, since the function will
        still reference the variables in the original model; in that case
        the function should be redefined.

        See also
        --------
        :func:`pysd.py_backend.model.Model.reload`

        """
        # initialize the new model?
        initialize = self.time.stage != 'Load'
        # create a new model
        new_model = type(self)(
            py_model_file=deepcopy(self.py_model_file),
            data_files=deepcopy(self.data_files),
            initialize=initialize,
            missing_values=deepcopy(self.missing_values)
        )
        if reload:
            # return the reloaded copy
            return new_model
        # copy the values of the stateful objects
        if initialize:
            new_model._set_stateful(deepcopy(super().export()))
        # copy time object values
        new_model.time._set_time(deepcopy(self.time.export()))
        # set other components
        with warnings.catch_warnings():
            # filter warnings that have already been shown in the
            # original model
            warnings.simplefilter("ignore")
            # select the same submodel
            if self._submodel_tracker:
                new_model._select_submodel(**self._submodel_tracker)
            # copy modified parameters
            new_model.set_components(self._components_setter_tracker)

        return new_model
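    # Usage sketch for copy (the variable names are illustrative):
    #
    # >>> snapshot = model.copy()            # faithful copy: same changes
    # ...                                    # and current state
    # >>> fresh = model.copy(reload=True)    # pristine copy, equivalent
    # ...                                    # to pysd.load(model.py_model_file)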
[docs]    def reload(self):
        """
        Reload the model from the translated model file, so that all the
        parameters are back to their original values.

        See also
        --------
        :func:`pysd.py_backend.model.Model.copy`
        :func:`pysd.py_backend.model.Model.initialize`

        """
        self.__init__(self.py_model_file, data_files=self.data_files,
                      initialize=True,
                      missing_values=self.missing_values)
    def _default_return_columns(self, which):
        """
        Return a list of the model elements that change over time,
        excluding lookups and other functions that take arguments.
        If which is 'step', run-cached elements are also excluded.

        Parameters
        ----------
        which: str or None
            If it is 'step' only cache step elements will be returned.
            Else cache 'step' and 'run' elements will be returned.
            Default is None.

        Returns
        -------
        return_columns: list
            List of columns to return

        """
        if which == 'step':
            types = ['step']
        else:
            types = ['step', 'run']

        return_columns = []

        for key, pykey in self._namespace.items():
            if pykey in self.cache_type and self.cache_type[pykey] in types\
               and not self.get_args(pykey):
                return_columns.append(key)

        return return_columns

    def _split_capture_elements(self, capture_elements):
        """
        Split the capture elements list between those with run cache
        and the others.

        Parameters
        ----------
        capture_elements: list
            Captured elements list

        Returns
        -------
        capture_dict: dict
            Dictionary of lists with the keys 'step', 'run' and None.

        """
        capture_dict = {'step': [], 'run': [], None: []}
        for element in capture_elements:
            capture_dict[self.cache_type[element]].append(element)
        return capture_dict
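    # Sketch of the split performed by _split_capture_elements, assuming
    # a cache_type mapping of {"teacup_temperature": "step",
    # "characteristic_time": "run"} (the variable names are illustrative):
    #
    #     _split_capture_elements(["teacup_temperature",
    #                              "characteristic_time"])
    #     # -> {"step": ["teacup_temperature"],
    #     #     "run": ["characteristic_time"],
    #     #     None: []}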
[docs]    def set_initial_condition(self, initial_condition):
        """
        Set the initial conditions of the integration.

        Parameters
        ----------
        initial_condition : str or (float, dict) or pathlib.Path
            The starting time, and the state of the system (the values of
            all the stocks) at that starting time. 'original' or 'o' uses
            the model-file-specified initial condition. 'current' or 'c'
            uses the state of the model after the previous execution.
            Other str objects load the initial conditions from the pickle
            file with the given name. A (float, dict) tuple lets the user
            specify a starting time (float) and a (possibly partial)
            dictionary of initial values for stock (stateful) objects.

        Examples
        --------
        >>> model.set_initial_condition('original')
        >>> model.set_initial_condition('current')
        >>> model.set_initial_condition('exported_pickle.pic')
        >>> model.set_initial_condition((10, {'teacup_temperature': 50}))

        See also
        --------
        :func:`pysd.py_backend.model.Macro.set_initial_value`

        """
        if isinstance(initial_condition, str)\
           and initial_condition.lower() not in ["original", "o",
                                                 "current", "c"]:
            initial_condition = Path(initial_condition)

        if isinstance(initial_condition, tuple):
            self.initialize()
            self.set_initial_value(*initial_condition)
        elif isinstance(initial_condition, Path):
            self.import_pickle(initial_condition)
            self.time.set_control_vars(initial_time=self.time())
        elif isinstance(initial_condition, str):
            if initial_condition.lower() in ["original", "o"]:
                self.time.set_control_vars(
                    initial_time=self.components._control_vars[
                        "initial_time"])
                self.initialize()
        else:
            raise TypeError(
                "Invalid initial conditions. "
                "Check documentation for valid entries or use "
                "'help(model.set_initial_condition)'.")
    def _euler_step(self, dt):
        """
        Perform a single step of the Euler integration, updating the
        stateful components.

        Parameters
        ----------
        dt : float
            The amount by which to increase time in this step.

        """
        self.state = self.state + self.ddt() * dt

    def _integrate(self):
        """
        Perform the Euler integration and write the results to the
        output object.

        Returns
        -------
        None

        """
        if self.progress:
            # initialize progress bar
            progressbar = utils.ProgressBar(
                int((self.time.final_time()-self.time())
                    / self.time.time_step())
            )
        else:
            # when None is used the update will do nothing
            progressbar = utils.ProgressBar(None)

        # perform the time stepping
        while self.time.in_bounds():
            if self.time.in_return():
                self.output.update(self)

            self._integrate_step()
            progressbar.update()

        # need to add one more time step, because the loop above runs
        # only the state updates and thus may be one output short
        if self.time.in_return():
            self.output.update(self)

        progressbar.finish()

    def _integrate_step(self):
        self._euler_step(self.time.time_step())
        self.time.update(self.time()+self.time.time_step())
        self.clean_caches()
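    # A minimal sketch of the same Euler update outside PySD, on the toy
    # equation dx/dt = -k*x (the names x, k and dt are illustrative, not
    # part of the API). Each iteration mirrors _euler_step:
    # state = state + ddt() * dt.
    #
    #     x, k, dt = 1.0, 0.5, 0.125
    #     for _ in range(int(1 / dt)):     # integrate from t=0 to t=1
    #         x = x + (-k * x) * dt        # x approximates exp(-k*t)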
[docs]    def export(self, file_name):
        """
        Export the stateful values to a pickle file.

        Parameters
        ----------
        file_name: str or pathlib.Path
            Name of the file to export the values to.

        See also
        --------
        :func:`pysd.py_backend.model.Model.import_pickle`

        """
        warnings.warn(
            "\nCompatibility of exported states could be broken between"
            " different versions of PySD or xarray, current versions:\n"
            f"\tPySD {__version__}\n\txarray {xr.__version__}\n"
        )
        with open(file_name, 'wb') as file:
            pickle.dump(
                (self.time.export(),
                 super().export(),
                 {'pysd': __version__, 'xarray': xr.__version__}
                 ), file)
[docs]    def import_pickle(self, file_name):
        """
        Import the stateful values from a pickle file.

        Parameters
        ----------
        file_name: str or pathlib.Path
            Name of the file to import the values from.

        See also
        --------
        :func:`pysd.py_backend.model.Model.export`

        """
        with open(file_name, 'rb') as file:
            time_dict, stateful_dict, metadata = pickle.load(file)

        if __version__ != metadata['pysd']\
           or xr.__version__ != metadata['xarray']:  # pragma: no cover
            warnings.warn(
                "\nCompatibility of exported states could be broken between"
                " different versions of PySD or xarray. Current versions:\n"
                f"\tPySD {__version__}\n\txarray {xr.__version__}\n"
                "Loaded versions:\n"
                f"\tPySD {metadata['pysd']}\n\txarray {metadata['xarray']}\n"
            )

        self.time._set_time(time_dict)
        self._set_stateful(stateful_dict)
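    # Round-trip sketch for export/import_pickle (the file name is
    # illustrative):
    #
    # >>> model.run(final_time=10)
    # >>> model.export("halfway.pic")          # save states at t=10
    # >>> model.reload()
    # >>> model.import_pickle("halfway.pic")   # restore the saved state
    # >>> model.set_initial_condition("halfway.pic")  # equivalent, and
    # ...                                              # also resets initial_time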