Source code for pysd.py_backend.functions

"""
The provided functions have no direct analog in the standard Python data
analytics stack, or require information about the internal state of the
system beyond what is present in the function call. They are provided
in a structure that makes it easy for the model elements to call. The
functions may be similar to the original functions given by Vensim or
Stella, but sometimes the number or order of arguments may change.
"""
import warnings
from datetime import datetime

import numpy as np
import xarray as xr

from . import utils

SMALL_VENSIM = 1e-6  # What is considered zero according to Vensim Help


[docs] def ramp(time, slope, start, finish=None): """ Implements vensim's and xmile's RAMP function. Parameters ---------- time: pysd.py_backend.components.Time Model time object. slope: float or xarray.DataArray The slope of the ramp starting at zero at time start. start: float or xarray.DataArray Time at which the ramp begins. finish: float or xarray.DataArray or None (oprional) Time at which the ramp ends. If None the ramp will never end. Default is None. Returns ------- float or xarray.DataArray: If prior to ramp start, returns zero. If after ramp ends, returns top of ramp. """ t = time() # compute out (MIN(time, finish)-start)*slope if finish is None: final = t else: final = np.minimum(finish, t) return (t + SMALL_VENSIM > start) * slope * (final-start)
[docs] def step(time, value, tstep): """" Implements vensim's STEP function. Parameters ---------- time: pysd.py_backend.components.Time Model time object. value: float or xarray.DataArray The height of the step. tstep: float or xarray.DataArray The time at and after which `result` equals `value`. Returns ------- float or xarray.DataArray: - In range [-inf, tstep): returns 0 - In range [tstep, +inf]: returns `value` """ return value * (time() + time.time_step()/2 > tstep)
[docs] def pulse(time, start, repeat_time=0, width=None, magnitude=None, end=None): """ Implements Vensim's PULSE and PULSE TRAIN functions and Xmile's PULSE function. Parameters ---------- time: pysd.py_backend.components.Time Model time object. start: float Starting time of the pulse. repeat_time: float (optional) Time interval of the pulse repetition. If 0 it will return a single pulse. Default is 0. width: float or None (optional) Duration of the pulse. If None only one-time_step pulse will be generated. Default is None. magnitude: float or None (optional) The magnitude of the pulse. If None it will return 1 when the pulse happens, similar to magnitude=time_step(). Default is None. end: float or None (optional) Final time of the pulse. If None there is no final time. Default is None. Returns ------- float or xarray.DataArray: - In range [-inf, start): returns 0 - In range [start + n*repeat_time, start + n*repeat_time + width): returns magnitude/time_step or 1 - In range [start + n*repeat_time + width, start + (n+1)*repeat_time): returns 0 """ t = time() width = .5*time.time_step() if width is None else width out = magnitude/time.time_step() if magnitude is not None else 1 if repeat_time == 0: return out if start - SMALL_VENSIM <= t < start + width else 0 elif start <= t and (end is None or t < end): return out if (t - start + SMALL_VENSIM) % repeat_time < width else 0 else: return 0
[docs] def if_then_else(condition, val_if_true, val_if_false): """ Implements Vensim's IF THEN ELSE function. https://www.vensim.com/documentation/20475.htm Parameters ---------- condition: bool or xarray.DataArray of bools val_if_true: callable Value to evaluate and return when condition is true. val_if_false: callable Value to evaluate and return when condition is false. Returns ------- float or xarray.DataArray: The value depending on the condition. """ # NUMPY: replace xr by np if isinstance(condition, xr.DataArray): # NUMPY: neccessarry for keep the same shape always # if condition.all(): # value = val_if_true() # elif not condition.any(): # value = val_if_false() # else: # return np.where(condition, val_if_true(), val_if_false()) # # if isinstance(value, np.ndarray): # return value # return np.full_like(condition, value) if condition.all(): return val_if_true() elif not condition.any(): return val_if_false() return xr.where(condition, val_if_true(), val_if_false()) return val_if_true() if condition else val_if_false()
[docs] def xidz(numerator, denominator, x): """ Implements Vensim's XIDZ function. https://www.vensim.com/documentation/fn_xidz.htm This function executes a division, robust to denominator being zero. In the case of zero denominator, the final argument is returned. Parameters ---------- numerator: float or xarray.DataArray Numerator of the operation. denominator: float or xarray.DataArray Denominator of the operation. x: float or xarray.DataArray The value to return if the denominator is zero. Returns ------- float or xarray.DataArray: - numerator/denominator if denominator > small_vensim - value_if_denom_is_zero otherwise """ # NUMPY: replace DataArray by np.ndarray, xr.where -> np.where if isinstance(denominator, xr.DataArray): return xr.where(np.abs(denominator) < SMALL_VENSIM, x, numerator/denominator) if abs(denominator) < SMALL_VENSIM: # NUMPY: neccessarry for keep the same shape always # if isinstance(numerator, np.ndarray): # return np.full_like(numerator, x) return x else: # NUMPY: neccessarry for keep the same shape always # if isinstance(x, np.ndarray): # return np.full_like(x, numerator/denominator) return numerator/denominator
[docs] def zidz(numerator, denominator): """ This function bypasses divide-by-zero errors, implementing Vensim's ZIDZ function. https://www.vensim.com/documentation/fn_zidz.htm Parameters ---------- numerator: float or xarray.DataArray value to be divided denominator: float or xarray.DataArray value to devide by Returns ------- float or xarray.DataArray: - numerator/denominator if denominator > small_vensim - 0 or 0s array otherwise """ # NUMPY: replace DataArray by np.ndarray, xr.where -> np.where if isinstance(denominator, xr.DataArray): return xr.where(np.abs(denominator) < SMALL_VENSIM, 0, numerator/denominator) if abs(denominator) < SMALL_VENSIM: # NUMPY: neccessarry for keep the same shape always # if isinstance(denominator, np.ndarray): # return np.zeros_like(denominator) if isinstance(numerator, xr.DataArray): return xr.DataArray(0, numerator.coords, numerator.dims) return 0 else: return numerator/denominator
[docs] def active_initial(stage, expr, init_val): """ Implements vensim's ACTIVE INITIAL function Parameters ---------- stage: str The stage of the model. expr: callable Running stage value init_val: float or xarray.DataArray Initialization stage value. Returns ------- float or xarray.DataArray: - inti_val if stage='Initialization' - expr() otherwise """ # NUMPY: both must have same dimensions in inputs, remove time.stage if stage == 'Initialization': return init_val else: return expr()
[docs] def incomplete(*args): """ Implements an incomplete functions. Prompts a RuntimeWarning. Parameters ---------- *args: arguments Returns ------- numpy.nan """ warnings.warn( 'Call to undefined function, calling dependencies and returning NaN', RuntimeWarning, stacklevel=2) return np.nan
[docs] def not_implemented_function(*args): """ Implements a not implemented functions. Raises a NotImplementedError if it is called. Parameters ---------- *args: arguments The first argument must be the name of the function as str to properly print the error message. """ raise NotImplementedError(f"Not implemented function '{args[0]}'")
[docs] def integer(x): """ Implements Vensim's INTEGER function. Parameters ---------- x: float or xarray.DataArray Input value. Returns ------- integer: float or xarray.DataArray Returns integer part of x. """ # NUMPY: replace xr by np if isinstance(x, xr.DataArray): return x.astype(int) else: return int(x)
[docs] def quantum(a, b): """ Implements Vensim's QUANTUM function. Parameters ---------- a: float or xarray.DataArray Input value. b: float or xarray.DataArray Input value. Returns ------- quantum: float or xarray.DataArray If b > 0 returns b * integer(a/b). Otherwise, returns a. """ # NUMPY: replace xr by np if isinstance(b, xr.DataArray): return xr.where(b < SMALL_VENSIM, a, b*integer(a/b)) if b < SMALL_VENSIM: return a else: return b*integer(a/b)
[docs] def modulo(x, m): """ Implements Vensim's MODULO function. Parameters ---------- x: float or xarray.DataArray Input value. m: float or xarray.DataArray Modulo to compute. Returns ------- modulo: float or xarray.DataArray Returns x modulo m, if x is smaller than 0 the result is given in the range (-m, 0] as Vensim does. x - quantum(x, m) """ return x - quantum(x, m)
[docs] def sum(x, dim=None): """ Implements Vensim's SUM function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- sum: xarray.DataArray or float The result of the sum operation in the given dimensions. """ # NUMPY: replace by np.sum(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.sum()) return x.sum(dim=dim)
[docs] def prod(x, dim=None): """ Implements Vensim's PROD function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- prod: xarray.DataArray or float The result of the product operation in the given dimensions. """ # NUMPY: replace by np.prod(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.prod()) return x.prod(dim=dim)
[docs] def vmin(x, dim=None): """ Implements Vensim's Vmin function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- vmin: xarray.DataArray or float The result of the minimum value over the given dimensions. """ # NUMPY: replace by np.min(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.min()) return x.min(dim=dim)
[docs] def vmax(x, dim=None): """ Implements Vensim's VMAX function. Parameters ---------- x: xarray.DataArray Input value. dim: list of strs (optional) Dimensions to apply the function over. If not given the function will be applied over all dimensions. Returns ------- vmax: xarray.DataArray or float The result of the maximum value over the dimensions. """ # NUMPY: replace by np.max(x, axis=axis) put directly in the file # float returned if the function is applied over all the dimensions if dim is None or set(x.dims) == set(dim): return float(x.max()) return x.max(dim=dim)
[docs] def invert_matrix(mat): """ Implements Vensim's INVERT MATRIX function. Invert the matrix defined by the last two dimensions of xarray.DataArray. Parameters ----------- mat: xarray.DataArray The matrix to invert. Returns ------- mat1: xarray.DataArray Inverted matrix. """ # NUMPY: avoid converting to xarray, put directly the expression # in the model return xr.DataArray(np.linalg.inv(mat.values), mat.coords, mat.dims)
[docs] def vector_select(selection_array, expression_array, dim, missing_vals, numerical_action, error_action): """ Implements Vensim's VECTOR SELECT function. http://vensim.com/documentation/fn_vector_select.html Parameters ---------- selection_array: xr.DataArray This specifies a selection array with a mixture of zeroes and non-zero values. expression_array: xarray.DataArray This is the expression that elements are being selected from based on the selection array. dim: list of strs Dimensions to apply the function over. missing_vals: float The value to use in the case where there are only zeroes in the selection array. numerical_action: int The action to take: - 0 It will calculate the weighted sum. - 1 When values in the selection array are non-zero, this will calculate the product of the selection_array * expression_array. - 2 The weighted minimum, for non zero values of the selection array, this is minimum of selection_array * expression_array. - 3 The weighted maximum, for non zero values of the selection array, this is maximum of selection_array * expression_array. - 4 For non zero values of the selection array, this is the average of selection_array * expression_array. - 5 When values in the selection array are non-zero, this will calculate the product of the expression_array ^ selection_array. - 6 When values in the selection array are non-zero, this will calculate the sum of the expression_array. The same as the SUM function for non-zero values in the selection array. - 7 When values in the selection array are non-zero, this will calculate the product of the expression_array. The same as the PROD function for non-zero values in the selection array. - 8 The unweighted minimum, for non zero values of the selection array, this is minimum of the expression_array. The same as the VMIN function for non-zero values in the selection array. - 9 The unweighted maximum, for non zero values of the selection array, this is maximum of expression_array. The same as the VMAX function for non-zero values in the selection array. - 10 For non zero values of the selection array, this is the average of expression_array. error_action: int Indicates how to treat too many or too few entries in the selection: - 0 No error is raised. - 1 Raise a floating point error is selection array only contains zeros. - 2 Raise an error if the selection array contains more than one non-zero value. - 3 Raise an error if all elements in selection array are zero, or more than one element is non-zero (this is a combination of error_action = 1 and error_action = 2). Returns ------- result: xarray.DataArray or float The output of the numerical action. """ zeros = (selection_array == 0).all(dim=dim) non_zeros = (selection_array != 0).sum(dim=dim) # Manage error actions if np.any(zeros) and error_action in (1, 3): raise FloatingPointError( "All the values of selection_array are 0...") if np.any(non_zeros > 1) and error_action in (2, 3): raise FloatingPointError( "More than one non-zero values in selection_array...") # Manage numeric actions (array to operate) # NUMPY: replace by np.where if numerical_action in range(5): array = xr.where( selection_array == 0, np.nan, selection_array * expression_array ) elif numerical_action == 5: warnings.warn( "Vensim's help says that numerical_action=5 computes the " "product of selection_array ^ expression_array. But, in fact," " Vensim is computing the product of expression_array ^ " " selection array. The output of this function behaves as " "Vensim, expression_array ^ selection_array." ) array = xr.where( selection_array == 0, np.nan, expression_array ** selection_array ) elif numerical_action in range(6, 11): array = xr.where( selection_array == 0, np.nan, expression_array ) else: raise ValueError( f"Invalid argument value 'numerical_action={numerical_action}'. " "'numerical_action' must be 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 or 10.") # Manage numeric actions (operation) # NUMPY: use the axis if numerical_action in (0, 6): out = array.sum(dim=dim, skipna=True) elif numerical_action in (1, 5, 7): out = array.prod(dim=dim, skipna=True) elif numerical_action in (2, 8): out = array.min(dim=dim, skipna=True) elif numerical_action in (3, 9): out = array.max(dim=dim, skipna=True) elif numerical_action in (4, 10): out = array.mean(dim=dim, skipna=True) # Replace missin vals if len(out.shape) == 0 and np.all(zeros): return missing_vals elif len(out.shape) == 0: return float(out) elif np.any(zeros): out.values[zeros.values] = missing_vals return out
[docs] def vector_sort_order(vector, direction): """ Implements Vensim's VECTOR SORT ORDER function. Sorting is done on the complete vector relative to the last subscript. https://www.vensim.com/documentation/fn_vector_sort_order.html Parameters ----------- vector: xarray.DataArray The vector to sort. direction: float The direction to sort the vector. If direction > 0 it will sort the vector entries from smallest to biggest, otherwise from biggest to smallest. Returns ------- vector_sorted: xarray.DataArray The sorted vector. """ # TODO: can direction be an array? In this case this will fail if direction <= 0: # NUMPY: return flip directly flip = np.flip(vector.argsort(), axis=-1) return xr.DataArray(flip.values, vector.coords, vector.dims) return vector.argsort()
[docs] def vector_reorder(vector, svector): """ Implements Vensim's VECTOR REORDER function. Reordering is done on the complete vector relative to the last subscript. https://www.vensim.com/documentation/fn_vector_reorder.html Parameters ----------- vector: xarray.DataArray The vector to sort. svector: xarray.DataArray The vector to specify the order. Returns ------- vector_sorted: xarray.DataArray The sorted vector. """ # NUMPY: Use directly numpy sort functions, no need to assign coords later if len(svector.dims) > 1: # TODO this may be simplified new_vector = vector.copy() dims = svector.dims # create an empty array to hold the orderings (only last dim) arr = xr.DataArray( np.nan, {dims[-1]: vector.coords[dims[-1]].values}, dims[-1:] ) # split the ordering array in 0-dim arrays svectors = utils.xrsplit(svector) orders = {} for sv in svectors: # regrup the ordering arrays using last dimensions pos = {dim: str(sv.coords[dim].values) for dim in dims[:-1]} key = ";".join(pos.values()) if key not in orders.keys(): orders[key] = (pos, arr.copy()) orders[key][1].loc[sv.coords[dims[-1]]] = sv.values for pos, array in orders.values(): # get the reordered array values = [vector.loc[pos].values[int(i)] for i in array.values] new_vector.loc[pos] = values return new_vector return vector[svector.values].assign_coords(vector.coords)
[docs] def vector_rank(vector, direction): """ Implements Vensim's VECTOR RANK function. Ranking is done on the complete vector relative to the last subscript. https://www.vensim.com/documentation/fn_vector_rank.html Parameters ----------- vector: xarray.DataArray The vector to sort. direction: float The direction to sort the vector. If direction > 1 it will rank the vector entries from smallest to biggest, otherwise from biggest to smallest. Returns ------- vector_rank: xarray.DataArray The rank of the vector. """ return vector_sort_order(vector, direction).argsort() + 1
[docs] def get_time_value(time, relativeto, offset, measure): """ Implements Vensim's GET TIME VALUE function. Warning, not all the cases are implemented. https://www.vensim.com/documentation/fn_get_time_value.html Parameters ---------- time: pysd.py_backend.components.Time Model time object. relativeto: int The time to take as a reference: - 0 for the current simulation time. - 1 for the initial simulation time. - 2 for the current computer clock time. offset: float or xarray.DataArray The difference in time, as measured in the units of Time for the model, to move before computing the value. offset is ignored when relativeto is 2. measure: int The units or measure of time: - 0 units of Time in the model (only for relativeto 0 and 1) - 1 years since 1 BC (an integer, same as the normal calendar year) - 2 quarter of year (1-4) - 3 month of year (1-12) - 4 day of month (1-31) - 5 day of week (0-6 where 0 is Sunday) - 6 days since Jan 1., 1 BC (year 1 BC is treated as year 0) - 7 hour of day (0-23) - 8 minute of hour (0-59) - 9 second of minute (0-59.999999 – not an integer) - 10 elapsed seconds modulo 500,000 (0-499,999) Returns ------- time_value: float or int The resulting time value. """ if relativeto == 0: # Current time ctime = time() elif relativeto == 1: # Initial time # Not implemented as it doesn't work as Vensim docs say # TODO check other versions or implement it as it should be? raise NotImplementedError("'relativeto=1' not implemented...") # ctime = time.initial_time() elif relativeto == 2: # Machine time ctime = utils.get_current_computer_time() else: # Invalid value raise ValueError( f"Invalid argument value 'relativeto={relativeto}'. " "'relativeto' must be 0, 1 or 2.") if measure == 0: # units of Time in the model (only for relativeto 0 and 1) if relativeto == 2: # measure=0 only supported with relativeto=0 or 1 raise ValueError( "Invalid argument 'measure=0' with 'relativeto=2'.") else: return ctime + offset elif measure == 1: # years since 1 BC (an integer, same as the normal calendar year) if relativeto == 2: return ctime.year elif measure == 2: # quarter of year (1-4) if relativeto == 2: return int(1 + (ctime.month-0.5) // 3) elif measure == 3: # month of year (1-12) if relativeto == 2: return ctime.month elif measure == 4: # day of month (1-31) if relativeto == 2: return ctime.day elif measure == 5: # day of week (0-6 where 0 is Sunday) if relativeto == 2: return ctime.weekday() elif measure == 6: # days since Jan 1., 1 BC (year 1 BC is treated as year 0) if relativeto == 2: return (ctime - datetime(1, 1, 1)).days elif measure == 7: # hour of day (0-23) if relativeto == 2: return ctime.hour elif measure == 8: # minute of hour (0-59) if relativeto == 2: return ctime.minute elif measure == 9: # second of minute (0-59.99 – not an integer) if relativeto == 2: return ctime.second + 1e-6*ctime.microsecond elif measure == 10: # elapsed seconds modulo 500,000 (0-499,999) if relativeto == 2: return (ctime - datetime(1, 1, 1)).seconds % 500000 else: raise ValueError( f"Invalid argument value 'measure={measure}'. " "'measure' must be 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 or 10.") # TODO include other measures for relativeto=0 raise NotImplementedError( f"The case 'relativeto={relativeto}' and 'measure={measure}' " "is not implemented..." )