"""
The Stateful objects are used and updated each time step with an update
method. This include Integs, Delays, Forecasts, Smooths, and Trends,
between others. The Macro class and Model class are also Stateful type.
However, they are defined appart as they are more complex.
"""
import warnings
import numpy as np
import xarray as xr
from .functions import zidz, if_then_else
SMALL_VENSIM = 1e-6 # What is considered zero according to Vensim Help
class Stateful(object):
# the integrator needs to be able to 'get' the current state of the object,
# and get the derivative. It calculates the new state, and updates it.
# The state can be any object which is subject to basic (element-wise)
# algebraic operations
def __init__(self):
self._state = None
self.shape_info = None
self.py_name = ""
def __call__(self, *args, **kwargs):
return self.state
@property
def state(self):
if self._state is None:
raise AttributeError(
self.py_name
+ "\nAttempt to call stateful element"
+ " before it is initialized.")
return self._state
@state.setter
def state(self, new_value):
if self.shape_info:
self._state = xr.DataArray(data=new_value, **self.shape_info)
else:
self._state = new_value
class DynamicStateful(Stateful):
def __init__(self):
super().__init__()
def update(self, state):
try:
self.state = state
except Exception as err:
raise ValueError(err.args[0] + "\n\n"
+ "Could not update the value of "
+ self.py_name)
[docs]
class Integ(DynamicStateful):
"""
Implements INTEG function.
Parameters
----------
ddt: callable
Derivate to integrate.
initial_value: callable
Initial value.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object. Value of the stock.
"""
def __init__(self, ddt, initial_value, py_name):
super().__init__()
self.init_func = initial_value
self.ddt = ddt
self.shape_info = None
self.py_name = py_name
def initialize(self, init_val=None):
if init_val is None:
self.state = self.init_func()
else:
self.state = init_val
if isinstance(self.state, xr.DataArray):
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
def export(self):
return {'state': self.state, 'shape_info': self.shape_info}
[docs]
class NonNegativeInteg(Integ):
"""
Implements non negative INTEG function.
Parameters
----------
ddt: callable
Derivate to integrate.
initial_value: callable
Initial value.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object. Value of the stock.
"""
def __init__(self, ddt, initial_value, py_name):
super().__init__(ddt, initial_value, py_name)
def update(self, state):
self.state = np.maximum(state, 0)
[docs]
class Delay(DynamicStateful):
"""
Implements DELAY function.
Parameters
----------
delay_input: callable
Input of the delay.
delay_time: callable
Delay time.
initial_value: callable
Initial value.
order: callable
Delay order.
tsetp: callable
The time step of the model.
py_name: str
Python name to identify the object.
Attributes
----------
state: numpy.array or xarray.DataArray
Current state of the object. Array of the delays values multiplied
by their corresponding average time.
"""
# note that we could have put the `delay_input` argument as a parameter to
# the `__call__` function, and more closely mirrored the vensim syntax.
# However, people may get confused this way in thinking that they need
# only one delay object and can call it with various arguments to delay
# whatever is convenient. This method forces them to acknowledge that
# additional structure is being created in the delay object.
def __init__(self, delay_input, delay_time, initial_value, order, tstep,
py_name):
super().__init__()
self.init_func = initial_value
self.delay_time_func = delay_time
self.input_func = delay_input
self.order_func = order
self.order = None
self.tstep = tstep
self.shape_info = None
self.py_name = py_name
def initialize(self, init_val=None):
order = self.order_func()
if order != int(order):
warnings.warn(self.py_name + '\n' +
'Casting delay order '
+ f'from {order} to {int(order)}')
self.order = int(order) # The order can only be set once
if self.order*self.tstep() > np.min(self.delay_time_func()):
while self.order*self.tstep() > np.min(self.delay_time_func()):
self.order -= 1
warnings.warn(self.py_name + '\n' +
'Delay time very small, casting delay order '
+ f'from {int(order)} to {self.order}')
if init_val is None:
init_state_value = self.init_func() * self.delay_time_func()
else:
init_state_value = init_val * self.delay_time_func()
if isinstance(init_state_value, xr.DataArray):
# broadcast self.state
self.state = init_state_value.expand_dims({
'_delay': np.arange(self.order)}, axis=0)
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
else:
self.state = np.array([init_state_value] * self.order)
def __call__(self):
if self.shape_info:
return self.state[-1].reset_coords('_delay', drop=True)\
/ self.delay_time_func()
else:
return self.state[-1] / self.delay_time_func()
def ddt(self):
outflows = self.state / self.delay_time_func()
inflows = np.roll(outflows, 1, axis=0)
if self.shape_info:
inflows[0] = self.input_func().values
else:
inflows[0] = self.input_func()
return (inflows - outflows) * self.order
def export(self):
return {'state': self.state, 'shape_info': self.shape_info}
[docs]
class DelayN(DynamicStateful):
"""
Implements DELAY N function.
Parameters
----------
delay_input: callable
Input of the delay.
delay_time: callable
Delay time.
initial_value: callable
Initial value.
order: callable
Delay order.
tsetp: callable
The time step of the model.
py_name: str
Python name to identify the object.
Attributes
----------
state: numpy.array or xarray.DataArray
Current state of the object. Array of the delays values multiplied
by their corresponding average time.
times: numpy.array or xarray.DataArray
Array of delay times used for computing the delay output.
If delay_time is constant, this array will be constant and
DelayN will behave ad Delay.
"""
# note that we could have put the `delay_input` argument as a parameter to
# the `__call__` function, and more closely mirrored the vensim syntax.
# However, people may get confused this way in thinking that they need
# only one delay object and can call it with various arguments to delay
# whatever is convenient. This method forces them to acknowledge that
# additional structure is being created in the delay object.
def __init__(self, delay_input, delay_time, initial_value, order, tstep,
py_name):
super().__init__()
self.init_func = initial_value
self.delay_time_func = delay_time
self.input_func = delay_input
self.order_func = order
self.order = None
self.times = None
self.tstep = tstep
self.shape_info = None
self.py_name = py_name
def initialize(self, init_val=None):
order = self.order_func()
if order != int(order):
warnings.warn(self.py_name + '\n' +
'Casting delay order '
+ f'from {order} to {int(order)}')
self.order = int(order) # The order can only be set once
if self.order*self.tstep() > np.min(self.delay_time_func()):
while self.order*self.tstep() > np.min(self.delay_time_func()):
self.order -= 1
warnings.warn(self.py_name + '\n' +
'Delay time very small, casting delay order '
+ f'from {int(order)} to {self.order}')
if init_val is None:
init_state_value = self.init_func() * self.delay_time_func()
else:
init_state_value = init_val * self.delay_time_func()
if isinstance(init_state_value, xr.DataArray):
# broadcast self.state
self.state = init_state_value.expand_dims({
'_delay': np.arange(self.order)}, axis=0)
self.times = self.delay_time_func().expand_dims({
'_delay': np.arange(self.order)}, axis=0)
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
else:
self.state = np.array([init_state_value] * self.order)
self.times = np.array([self.delay_time_func()] * self.order)
def __call__(self):
if self.shape_info:
return self.state[-1].reset_coords('_delay', drop=True)\
/ self.times[0].reset_coords('_delay', drop=True)
else:
return self.state[-1] / self.times[0]
def ddt(self):
if self.shape_info:
# if is xarray need to preserve coords
self.times = self.times.roll({'_delay': 1}, False)
self.times[0] = self.delay_time_func()
outflows = self.state / self.times
inflows = outflows.roll({'_delay': 1}, False)
else:
# if is float use numpy.roll
self.times = np.roll(self.times, 1, axis=0)
self.times[0] = self.delay_time_func()
outflows = self.state / self.times
inflows = np.roll(outflows, 1, axis=0)
inflows[0] = self.input_func()
return (inflows - outflows)*self.order
def export(self):
return {'state': self.state, 'times': self.times,
'shape_info': self.shape_info}
[docs]
class DelayFixed(DynamicStateful):
"""
Implements DELAY FIXED function.
Parameters
----------
delay_input: callable
Input of the delay.
delay_time: callable
Delay time.
initial_value: callable
Initial value.
tsetp: callable
The time step of the model.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object, equal to pipe[pointer].
pipe: list
List of the delays values.
pointer: int
Pointer to the last value in the pipe
"""
def __init__(self, delay_input, delay_time, initial_value, tstep,
py_name):
super().__init__()
self.init_func = initial_value
self.delay_time_func = delay_time
self.input_func = delay_input
self.tstep = tstep
self.order = None
self.pointer = 0
self.py_name = py_name
def initialize(self, init_val=None):
order = max(self.delay_time_func()/self.tstep(), 1)
if order != int(order):
warnings.warn(
self.py_name + '\n'
+ 'Casting delay order from %f to %i' % (
order, round(order + SMALL_VENSIM)))
# need to add a small decimal to ensure that 0.5 is rounded to 1
# The order can only be set once
self.order = round(order + SMALL_VENSIM)
# set the pointer to 0
self.pointer = 0
if init_val is None:
init_state_value = self.init_func()
else:
init_state_value = init_val
self.state = init_state_value
self.pipe = [init_state_value] * self.order
def __call__(self):
return self.state
def ddt(self):
return np.nan
def update(self, state):
self.pipe[self.pointer] = self.input_func()
self.pointer = (self.pointer + 1) % self.order
self.state = self.pipe[self.pointer]
def export(self):
return {'state': self.state, 'pointer': self.pointer,
'pipe': self.pipe}
[docs]
class Forecast(DynamicStateful):
"""
Implements FORECAST function.
Parameters
----------
forecast_input: callable
Input of the forecast.
average_time: callable
Average time.
horizon: callable
Forecast horizon.
initial_trend: callable
Initial trend of the forecast.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object. AV value by Vensim docs.
"""
def __init__(self, forecast_input, average_time, horizon, initial_trend,
py_name):
super().__init__()
self.horizon = horizon
self.average_time = average_time
self.input = forecast_input
self.initial_trend = initial_trend
self.py_name = py_name
def initialize(self, init_trend=None):
# self.state = AV in the vensim docs
if init_trend is None:
self.state = self.input() / (1 + self.initial_trend())
else:
self.state = self.input() / (1 + init_trend)
if isinstance(self.state, xr.DataArray):
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
def __call__(self):
return self.input() * (
1 + zidz(self.input() - self.state,
self.average_time() * self.state
)*self.horizon()
)
def ddt(self):
return (self.input() - self.state) / self.average_time()
def export(self):
return {'state': self.state, 'shape_info': self.shape_info}
[docs]
class Smooth(DynamicStateful):
"""
Implements SMOOTH function.
Parameters
----------
smooth_input: callable
Input of the smooth.
smooth_time: callable
Smooth time.
initial_value: callable
Initial value.
order: callable
Delay order.
py_name: str
Python name to identify the object.
Attributes
----------
state: numpy.array or xarray.DataArray
Current state of the object. Array of the inputs having the
value to return in the last position.
"""
def __init__(self, smooth_input, smooth_time, initial_value, order,
py_name):
super().__init__()
self.init_func = initial_value
self.smooth_time_func = smooth_time
self.input_func = smooth_input
self.order_func = order
self.order = None
self.shape_info = None
self.py_name = py_name
def initialize(self, init_val=None):
self.order = self.order_func() # The order can only be set once
if init_val is None:
init_state_value = self.init_func()
else:
init_state_value = init_val
if isinstance(init_state_value, xr.DataArray):
# broadcast self.state
self.state = init_state_value.expand_dims({
'_smooth': np.arange(self.order)}, axis=0)
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
else:
self.state = np.array([init_state_value] * self.order)
def __call__(self):
if self.shape_info:
return self.state[-1].reset_coords('_smooth', drop=True)
else:
return self.state[-1]
def ddt(self):
targets = np.roll(self.state, 1, axis=0)
if self.shape_info:
targets[0] = self.input_func().values
else:
targets[0] = self.input_func()
return (targets - self.state) * self.order / self.smooth_time_func()
def export(self):
return {'state': self.state, 'shape_info': self.shape_info}
[docs]
class Trend(DynamicStateful):
"""
Implements TREND function.
Parameters
----------
trend_input: callable
Input of the trend.
average_time: callable
Average time.
initial_trend: callable
Initial trend.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object. AV value by Vensim docs.
"""
def __init__(self, trend_input, average_time, initial_trend, py_name):
super().__init__()
self.init_func = initial_trend
self.average_time_function = average_time
self.input_func = trend_input
self.py_name = py_name
def initialize(self, init_trend=None):
if init_trend is None:
self.state = self.input_func()\
/ (1 + self.init_func()*self.average_time_function())
else:
self.state = self.input_func()\
/ (1 + init_trend*self.average_time_function())
if isinstance(self.state, xr.DataArray):
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
def __call__(self):
return zidz(self.input_func() - self.state,
self.average_time_function() * np.abs(self.state))
def ddt(self):
return (self.input_func() - self.state) / self.average_time_function()
def export(self):
return {'state': self.state, 'shape_info': self.shape_info}
[docs]
class SampleIfTrue(DynamicStateful):
"""
Implements SAMPLE IF TRUE function.
Parameters
----------
condition: callable
Condition for sample.
actual_value: callable
Value to update if condition is true.
initial_value: callable
Initial value.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object. Last actual_value when condition
was true or the initial_value if condition has never been true.
"""
def __init__(self, condition, actual_value, initial_value, py_name):
super().__init__()
self.condition = condition
self.actual_value = actual_value
self.init_func = initial_value
self.py_name = py_name
def initialize(self, init_val=None):
if init_val is None:
self.state = self.init_func()
else:
self.state = init_val
if isinstance(self.state, xr.DataArray):
self.shape_info = {'dims': self.state.dims,
'coords': self.state.coords}
def __call__(self):
return if_then_else(self.condition(),
self.actual_value,
lambda: self.state)
def ddt(self):
return np.nan
def update(self, state):
self.state = self.state*0 + if_then_else(self.condition(),
self.actual_value,
lambda: self.state)
def export(self):
return {'state': self.state, 'shape_info': self.shape_info}
[docs]
class Initial(Stateful):
"""
Implements INITIAL function.
Parameters
----------
initial_value: callable
Initial value.
py_name: str
Python name to identify the object.
Attributes
----------
state: float or xarray.DataArray
Current state of the object, which will always be the initial_value.
"""
def __init__(self, initial_value, py_name):
super().__init__()
self.init_func = initial_value
self.py_name = py_name
def initialize(self, init_val=None):
if init_val is None:
self.state = self.init_func()
else:
self.state = init_val
def export(self):
return {'state': self.state}