Source code for pysd.py_backend.functions

"""
The provided functions have no direct analog in the standard Python data
analytics stack, or require information about the internal state of the
system beyond what is present in the function call. They are provided
in a structure that makes it easy for the model elements to call. The
functions may be similar to the original functions given by Vensim or
Stella, but sometimes the number or order of arguments may change.
"""
import warnings

import numpy as np
import xarray as xr

small_vensim = 1e-6  # What is considered zero according to Vensim Help


[docs]def ramp(time, slope, start, finish=None): """ Implements vensim's and xmile's RAMP function. Parameters ---------- time: callable Function that returns the current time. slope: float The slope of the ramp starting at zero at time start. start: float Time at which the ramp begins. finish: float or None (oprional) Time at which the ramp ends. If None the ramp will never end. Default is None. Returns ------- float or xarray.DataArray: If prior to ramp start, returns zero. If after ramp ends, returns top of ramp. """ t = time() if t < start: return 0 else: if finish is None: return slope * (t - start) elif t > finish: return slope * (finish - start) else: return slope * (t - start)
[docs]def step(time, value, tstep): """" Implements vensim's STEP function. Parameters ---------- time: callable Function that returns the current time. value: float The height of the step. tstep: float The time at and after which `result` equals `value`. Returns ------- float or xarray.DataArray: - In range [-inf, tstep): returns 0 - In range [tstep, +inf]: returns `value` """ return value if time() >= tstep else 0
[docs]def pulse(time, start, repeat_time=0, width=None, magnitude=None, end=None): """ Implements Vensim's PULSE and PULSE TRAIN functions and Xmile's PULSE function. Parameters ---------- time: callable Function that returns the current time. start: float Starting time of the pulse. repeat_time: float (optional) Time interval of the pulse repetition. If 0 it will return a single pulse. Default is 0. width: float or None (optional) Duration of the pulse. If None only one-time_step pulse will be generated. Default is None. magnitude: float or None (optional) The magnitude of the pulse. If None it will return 1 when the pulse happens, similar to magnitude=time_step(). Default is None. end: float or None (optional) Final time of the pulse. If None there is no final time. Default is None. Returns ------- float or xarray.DataArray: - In range [-inf, start): returns 0 - In range [start + n*repeat_time, start + n*repeat_time + width): returns magnitude/time_step or 1 - In range [start + n*repeat_time + width, start + (n+1)*repeat_time): returns 0 """ t = time() width = .5*time.time_step() if width is None else width out = magnitude/time.time_step() if magnitude is not None else 1 if repeat_time == 0: return out if start - small_vensim <= t < start + width else 0 elif start <= t and (end is None or t < end): return out if (t - start + small_vensim) % repeat_time < width else 0 else: return 0
[docs]def if_then_else(condition, val_if_true, val_if_false): """ Implements Vensim's IF THEN ELSE function. https://www.vensim.com/documentation/20475.htm Parameters ---------- condition: bool or xarray.DataArray of bools val_if_true: callable Value to evaluate and return when condition is true. val_if_false: callable Value to evaluate and return when condition is false. Returns ------- float or xarray.DataArray: The value depending on the condition. """ # NUMPY: replace xr by np if isinstance(condition, xr.DataArray): # NUMPY: neccessarry for keep the same shape always # if condition.all(): # value = val_if_true() # elif not condition.any(): # value = val_if_false() # else: # return np.where(condition, val_if_true(), val_if_false()) # # if isinstance(value, np.ndarray): # return value # return np.full_like(condition, value) if condition.all(): return val_if_true() elif not condition.any(): return val_if_false() return xr.where(condition, val_if_true(), val_if_false()) return val_if_true() if condition else val_if_false()
[docs]def xidz(numerator, denominator, x): """ Implements Vensim's XIDZ function. https://www.vensim.com/documentation/fn_xidz.htm This function executes a division, robust to denominator being zero. In the case of zero denominator, the final argument is returned. Parameters ---------- numerator: float or xarray.DataArray Numerator of the operation. denominator: float or xarray.DataArray Denominator of the operation. x: float or xarray.DataArray The value to return if the denominator is zero. Returns ------- float or xarray.DataArray: - numerator/denominator if denominator > small_vensim - value_if_denom_is_zero otherwise """ # NUMPY: replace DataArray by np.ndarray, xr.where -> np.where if isinstance(denominator, xr.DataArray): return xr.where(np.abs(denominator) < small_vensim, x, numerator/denominator) if abs(denominator) < small_vensim: # NUMPY: neccessarry for keep the same shape always # if isinstance(numerator, np.ndarray): # return np.full_like(numerator, x) return x else: # NUMPY: neccessarry for keep the same shape always # if isinstance(x, np.ndarray): # return np.full_like(x, numerator/denominator) return numerator/denominator
[docs]def zidz(numerator, denominator): """ This function bypasses divide-by-zero errors, implementing Vensim's ZIDZ function https://www.vensim.com/documentation/fn_zidz.htm Parameters ---------- numerator: float or xarray.DataArray value to be divided denominator: float or xarray.DataArray value to devide by Returns ------- float or xarray.DataArray: - numerator/denominator if denominator > small_vensim - 0 or 0s array otherwise """ # NUMPY: replace DataArray by np.ndarray, xr.where -> np.where if isinstance(denominator, xr.DataArray): return xr.where(np.abs(denominator) < small_vensim, 0, numerator/denominator) if abs(denominator) < small_vensim: # NUMPY: neccessarry for keep the same shape always # if isinstance(denominator, np.ndarray): # return np.zeros_like(denominator) if isinstance(numerator, xr.DataArray): return xr.DataArray(0, numerator.coords, numerator.dims) return 0 else: return numerator/denominator
[docs]def active_initial(stage, expr, init_val): """ Implements vensim's ACTIVE INITIAL function Parameters ---------- stage: str The stage of the model. expr: callable Running stage value init_val: float or xarray.DataArray Initialization stage value. Returns ------- float or xarray.DataArray: - inti_val if stage='Initialization' - expr() otherwise """ # NUMPY: both must have same dimensions in inputs, remove time.stage if stage == 'Initialization': return init_val else: return expr()
def incomplete(*args): warnings.warn( 'Call to undefined function, calling dependencies and returning NaN', RuntimeWarning, stacklevel=2) return np.nan def not_implemented_function(*args): raise NotImplementedError(f"Not implemented function '{args[0]}'")
[docs]def integer(x): """ Implements Vensim's INTEGER function. Parameters ---------- x: float or xarray.DataArray Input value. Returns ------- integer: float or xarray.DataArray Returns integer part of x. """ # NUMPY: replace xr by np if isinstance(x, xr.DataArray): return x.astype(int) else: return int(x)
[docs]def quantum(a, b): """ Implements Vensim's QUANTUM function. Parameters ---------- a: float or xarray.DataArray Input value. b: float or xarray.DataArray Input value. Returns ------- quantum: float or xarray.DataArray If b > 0 returns b * integer(a/b). Otherwise, returns a. """ # NUMPY: replace xr by np if isinstance(b, xr.DataArray): return xr.where(b < small_vensim, a, b*integer(a/b)) if b < small_vensim: return a else: return b*integer(a/b)
[docs]def modulo(x, m): """ Implements Vensim's MODULO function. Parameters ---------- x: float or xarray.DataArray Input value. m: float or xarray.DataArray Modulo to compute. Returns ------- modulo: float or xarray.DataArray Returns x modulo m, if x is smaller than 0 the result is given in the range (-m, 0] as Vensim does. x - quantum(x, m) """ return x - quantum(x, m)
[docs]def sum(x, dim=None): """ Implements Vensim's SUM function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- sum: xarray.DataArray or float The result of the sum operation in the given dimensions. """ # NUMPY: replace by np.sum(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.sum()) return x.sum(dim=dim)
[docs]def prod(x, dim=None): """ Implements Vensim's PROD function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- prod: xarray.DataArray or float The result of the product operation in the given dimensions. """ # NUMPY: replace by np.prod(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.prod()) return x.prod(dim=dim)
[docs]def vmin(x, dim=None): """ Implements Vensim's Vmin function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- vmin: xarray.DataArray or float The result of the minimum value over the given dimensions. """ # NUMPY: replace by np.min(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.min()) return x.min(dim=dim)
[docs]def vmax(x, dim=None): """ Implements Vensim's VMAX function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- vmax: xarray.DataArray or float The result of the maximum value over the dimensions. """ # NUMPY: replace by np.max(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.max()) return x.max(dim=dim)
[docs]def invert_matrix(mat): """ Implements Vensim's INVERT MATRIX function. Invert the matrix defined by the last two dimensions of xarray.DataArray. Parameters ----------- mat: xarray.DataArray The matrix to invert. Returns ------- mat1: xarray.DataArray Inverted matrix. """ # NUMPY: avoid converting to xarray, put directly the expression # in the model return xr.DataArray(np.linalg.inv(mat.values), mat.coords, mat.dims)