Source code for pysd.py_backend.allocation

"""
The provided allocation functions have no direct analog in the standard
Python data analytics stack. They are provided in a structure that makes
it easy for the model elements to call. The functions may be similar to
the original functions given by Vensim, but sometimes the number or
order of arguments may change. The allocation functions may call a
protected function or class method thatintegrates the algorithm to
compute the allocation. The algorithms are briefly explained in these
functions docstring.

Note
----
The Allocation functions basis is explained in the Vensim documentation.
https://www.vensim.com/documentation/allocation_overview.html

Warning
-------
Some allocation function's results may differ from the result given by
Vensim as optimization functions are used to solve the allocation
problems. Those algorithms may not work in the same way or may
have differences in the numerical error propagation.

"""
import itertools
from math import erfc

import numpy as np
import xarray as xr
from scipy.optimize import least_squares
import portion as p


class Priorities:
    @classmethod
    def get_functions(cls, q0, pp, kind):
        """
        Get priority functions based on the demand/supply and priority profile.

        Parameters
        ----------
        q0: numpy.array
            values of maximum demand or supply of each component.
            Its shape should be (n,)
        pp: numpy.array
            pp values array. Its shape should be (n, m).
        kind: str ("demand" or "supply")
            The kind of priority "demand" or "supply".

        Returns
        -------
        functions: list of functions
            List of allocation supply or demand function for each element.

        full_allocation: function
            Full allocation function. It is the result function of
            addying all the functions.

        def_intervals: list of tuples
            List of (supply interval, priority interval, mean priority)
            where the full_allocation function is extrictly monotonous
            (injective). Givin a supply value, this is used to compute
            the limits and starting point of the optimization problem.

        """
        if np.any(pp[:, 2] <= 0):
            # pwidth values smaller than 0
            raise ValueError("pwidth values must be positive.")

        if kind == "demand":
            # Get the list of priority functions and the intervals where
            # they are strictly monotonous (injective function)
            func_int = [
                cls.get_function_demand(q0[i], pp[i])
                for i in range(pp.shape[0])
            ]
            # In order to get the range of the full_allocation function,
            # we need to flip the lower and the upper value, as it is a
            # decreasing function for demand
            int_attr = {"lower": "upper", "upper": "lower"}
        elif kind == "supply":  # pragma: no cover
            # Get the list of priority functions and the intervals where
            # they are strictly monotonous (injective function)
            func_int = [
                cls.get_function_supply(q0[i], pp[i])
                for i in range(pp.shape[0])
            ]
            # In order to get the range of the full_allocation function,
            # we need to keep the lower and the upper value, as it is a
            # increasing function for supply
            int_attr = {"lower": "lower", "upper": "upper"}
        else:
            raise ValueError(
                f"kind='{kind}' is not allowed. kind should be "
                "'demand' or 'supply'.")

        functions = [fi[0] for fi in func_int]
        intervals = [fi[1] for fi in func_int]

        # Join the intervals of all functions to get the intervals where
        # the sum of the functions is strictly monotonous (injective
        # function), therefore we can solve the minimization problem in
        # strictly monotonous areas of the function, avoiding the crash
        # of the algorithm
        interval = intervals[0]
        for i in intervals[1:]:
            interval = interval.union(i)

        # Full allocation function -> function to solve
        def full_allocation(x): return np.sum([func(x) for func in functions])

        def_intervals = []
        for subinterval in interval:
            # Iterate over disjoint interval sections
            # Each interval section will be converted in supply interval
            # and compute the starting point for the supply interval
            # as the midpoint in the priority interval
            def_intervals.append((
                p.closed(
                    full_allocation(getattr(subinterval, int_attr["lower"])),
                    full_allocation(getattr(subinterval, int_attr["upper"]))
                ),
                subinterval,
                .5*(subinterval.upper+subinterval.lower)
            ))

        return functions, full_allocation, def_intervals

    @classmethod
    def get_function_demand(cls, q0, pp):
        """
        Get priority functions for demand based on the priority profile.

        Parameters
        ----------
        q0: float [0, +np.inf)
            The demand of the target.
        pp: numpy.array
            pp values array.

        Returns
        -------
        priority_func: function
            Priority function.
        interval: portion.interval
            The interval where the priority function is strictly monotonous.

        """
        if q0 == 0:
            # No demand is requested return a 0 function with an empty interval
            return lambda x: 0, p.empty()
        if pp[0] == 0:
            # Fixed quantity demand
            return cls.fixed_quantity(q0, *pp[1:])
        elif pp[0] == 1:
            # Rectangular demand
            return cls.rectangular(q0, *pp[1:])
        elif pp[0] == 2:
            # Triangular demand
            return cls.triangular(q0, *pp[1:])
        elif pp[0] == 3:
            # Normal distribution demand
            return cls.normal(q0, *pp[1:])
        elif pp[0] == 4:
            # Exponential distribution demand
            return cls.exponential(q0, *pp[1:])
        elif pp[0] == 5:
            # Constant elasticity demand
            return cls.constant_elasticity_demand(q0, *pp[1:])
        else:
            raise ValueError(
                f"The priority function for pprofile={pp[0]} is not valid.")

    @classmethod
    def get_function_supply(cls, q0, pp):
        """
        Get priority functions for supply based on the priority profile.

        Parameters
        ----------
        q0: float [0, +np.inf)
            The supply of the producer.
        pp: numpy.array
            pp values array.

        Returns
        -------
        priority_func: function
            Priority function.
        interval: portion.interval
            The interval where the priority function is strictly monotonous.

        """
        # TODO: This function should be similar to the demand function
        # it is neccessary for the many-to-many allocation given by
        # the set FIND MARKET PLACE, DEMAND AT PRICE, SUPPLY AT PRICE
        raise NotImplementedError("get_function_supply is not implemented.")

    @staticmethod
    def fixed_quantity(q0, ppriority, pwidth, pextra):
        raise NotImplementedError(
            "fixed_quantity priority profile is not implemented.")

    @staticmethod
    def rectangular(q0, ppriority, pwidth, pextra):
        """
        Demand curve for rectangular shape.
        The supply curve will be shaped as the integral of a rectangle.

        Parameters
        ----------
        q0: float
            The total demand/supply of the element.
        ppriority: float
            Specifies the midpoint of the curve.
        pwidth: float
            Determines the speed with which the curve goes from 0 to
            the specified quantity.
        pextra: float
            Ignore.

        Returns
        -------
        priority_func: function
            The priority function.

        """
        def priority_func(x):
            if x <= ppriority - pwidth*.5:
                return q0
            elif x < ppriority + pwidth*.5:
                return q0*(1-(x-ppriority+pwidth*.5)/pwidth)
            else:
                return 0

        return (
            priority_func,
            p.open(ppriority - pwidth*.5, ppriority + pwidth*.5)
        )

    @staticmethod
    def triangular(q0, ppriority, pwidth, pextra):
        """
        Demand curve for triangular shape.
        The supply curve will be shaped as the integral of a triangle.

        Parameters
        ----------
        q0: float
            The total demand/supply of the element.
        ppriority: float
            Specifies the midpoint of the curve.
        pwidth: float
            Determines the speed with which the curve goes from 0 to the
            specified quantity.
        pextra: float
            Ignore.

        Returns
        -------
        priority_func: function
            The priority function.

        """
        def priority_func(x):
            if x <= ppriority - pwidth*.5:
                return q0
            elif x < ppriority:
                return q0*(1-2*(x-ppriority+pwidth*.5)**2/pwidth**2)
            elif x < ppriority + pwidth*.5:
                return 2*q0*(ppriority+pwidth*.5-x)**2/pwidth**2
            else:
                return 0

        return (
            priority_func,
            p.open(ppriority - pwidth*.5, ppriority + pwidth*.5)
        )

    @staticmethod
    def normal(q0, ppriority, pwidth, pextra):
        """
        Demand curve for normal shape.
        The supply curve will be shaped as the integral of a normal
        distribution.

        Parameters
        ----------
        q0: float
            The total demand/supply of the element.
        ppriority: float
            Specifies the midpoint of the curve (the mean of the
            underlying distribution).
        pwidth: float
            Standard deviation of the underlying distribution.
        pextra: float
            Ignore.

        Returns
        -------
        priority_func: function
            The priority function.

        """
        def priority_func(x):
            return q0*.5*(2-erfc((ppriority-x)/(np.sqrt(2)*pwidth)))

        # Normal distribution CDF is stricty monotonous in (-inf, inf).
        # However, numerically it is only in a the range ~ (-8.29*sd, 8.29*sd)
        return (
            priority_func,
            p.open(
                ppriority-8.2923611*pwidth,
                ppriority+8.2923611*pwidth
            )
        )

    @staticmethod
    def exponential(q0, ppriority, pwidth, pextra):
        """
        Demand curve for exponential shape
        The supply curve will be shaped as the integral of an
        exponential distribution that is symmetric around its mean
        (0.5*exp(-ABS(x-ppriority)/pwidth) on -∞ to ∞).

        Parameters
        ----------
        q0: float
            The total demand/supply of the element.
        ppriority: float
            Specifies the midpoint of the curve (the mean of the
            underlying distribution).
        pwidth: float
            Multiplier on x in the underlying distribution.
        pextra: float
            Ignore.

        Returns
        -------
        priority_func: function
            The priority function.

        """
        def priority_func(x):
            if x < ppriority:
                return q0*(1-.5*np.exp((x-ppriority)/pwidth))
            else:
                return q0*.5*np.exp((ppriority-x)/pwidth)

        # Exponential distribution CDF is stricty monotonous in (-inf, inf).
        # However, numerically it is only in a the range ~ (-36.7*sd, 36.7*sd)
        return (
            priority_func,
            p.open(
                ppriority-36.7368005696*pwidth,
                ppriority+36.7368005696*pwidth
            )
        )

    @staticmethod
    def constant_elasticity_demand(q0, ppriority, pwidth, pextra):
        """
        Demand constant elasticity curve.
        The curve will be a constant elasticity curve.

        Parameters
        ----------
        q0: float
            The total demand/supply of the element.
        ppriority: float
            Specifies the midpoint of the curve (the mean of the
            underlying distribution).
        pwidth: float
            Standard deviation of the underlying distribution.
        pextra: positive float
            Elasticity exponent.

        Returns
        -------
        priority_func: function
            The priority function.

        """
        raise NotImplementedError(
            "Some results for Vensim showed some bugs when using this "
            "priority curve. Therefore, the curve is not implemented in "
            "PySD as it cannot be properly tested."
        )

    @staticmethod
    def constant_elasticity_supply(ppriority, pwidth,
                                   pextra):   # pragma: no cover
        """
        Supply constant elasticity curve.
        The curve will be a constant elasticity curve.

        Parameters
        ----------
        q0: float
            The total demand/supply of the element.
        ppriority: float
            Specifies the midpoint of the curve (the mean of the
            underlying distribution).
        pwidth: float
            Standard deviation of the underlying distribution.
        pextra: positive float
            Elasticity exponent.

        Returns
        -------
        priority_func: function
            The priority function.

        """
        raise NotImplementedError(
            "Some results for Vensim showed some bugs when using this "
            "priority curve. Therefore, the curve is not implemented in "
            "PySD as it cannot be properly tested."
        )


def _allocate_available_1d(request, pp, avail):
    """
    This function implements the algorithm for allocate_available
    to be passed for 1d numpy.arrays. The algorithm works as follows:

    0. If supply > sum(request): return request. In the same way,
       if supply = 0: return request*0
    1. Based on the priority profiles and demands, the priority profiles
       are computed. This profiles are returned with the interval where
       each of them is strictly monotonous (or injective).
    2. Using the intervals of injectivity the initial guess is
       selected depending on the available supply.
    3. The initial guess and injectivity interval are used to compute
       the value where the sum of all priority functions is equal to
       the avilable supply. This porcess is done using a least_squares
       optimization function.
    4. The output from the previous step is used to compute the supply
       to each target.

    Parameters
    ----------
    request: numpy.ndarray (1D)
        The request by target. Values must be non-negative.
    pp: numpy.ndarray (2D)
        The priority profiles of each target.
    avail: float
        The available supply. Must be non-negative.

    Returns
    -------
    out: numpy.ndarray (1D)
        The distribution of the supply.

    """
    if avail >= np.sum(request):
        return request
    if avail == 0:
        return np.zeros_like(request)

    priorities, full_allocation, intervals =\
        Priorities.get_functions(request, pp, "demand")

    for interval, x_interval, x0 in intervals:
        if avail in interval:
            break
    priority = least_squares(
        lambda x: full_allocation(x) - avail,
        x0,
        bounds=(x_interval.lower, x_interval.upper),
        method='dogbox',
        tr_solver='exact',
        ).x[0]

    return [allocate(priority) for allocate in priorities]


[docs]def allocate_available(request, pp, avail): """ Implements Vensim's ALLOCATE AVAILABLE function. https://www.vensim.com/documentation/fn_allocate_available.html Parameters ----------- request: xarray.DataArray Request of each target. Its shape should be the one of the expected output of the function, having the allocation dimension in the last position. pp: xarray.DataArray Priority of each target. Its shape should be the same as request with an extra dimension for the priority profiles in the last position. See Vensim's documentation for more information https://www.vensim.com/documentation/24335.html avail: float or xarray.DataArray The total supply available to fulfill all requests. If the supply exceeds total requests, all requests are filled, but none are overfilled. If you wish to conserve material you must compute supply minus total allocations explicitly. Its shape, should be the same of request without the last dimension. Returns ------- out: xarray.DataArray The distribution of the supply. Warning ------- This function uses an optimization method for resolution and the given solution could differ from the one from Vensim. Particularly, when close to the boundaries of the defined priority profiles. """ if np.any(request < 0): raise ValueError( "There are some negative request values. Ensure that " "your request is always non-negative. Allocation requires " f"all quantities to be positive or 0.\n{request}") if np.any(avail < 0): raise ValueError( f"avail={avail} is not allowed. avail should be non-negative." ) if len(request.shape) == 1: # NUMPY: avoid '.values' and return directly the result of the # function call return xr.DataArray( _allocate_available_1d( request.values, pp.values, avail), request.coords ) # NUMPY: use np.empty_like and remove '.values' out = xr.zeros_like(request, dtype=float) for comb in itertools.product(*[range(i) for i in avail.shape]): out.values[comb] = _allocate_available_1d( request.values[comb], pp.values[comb], avail.values[comb]) return out
def _allocate_by_priority_1d(request, priority, width, supply): """ This function implements the algorithm for allocate_by_priority to be passed for 1d numpy.arrays. The algorithm works as follows: 0. If supply > sum(request): return request. 1. Order the request and priorities from bigger to lower priorities. 2. Compute the 'distances' between the target, the 'distance' is defined as the difference between priorities divided by width multiplied by the target request. If the difference in priorities over width is bigger than 1, set it to 1, having the distance equal to the request. For example, priorities = [10, 9, 7, 2], request = [3, 6, 2.5, 2] and width = 3 will have the following 'distances' vector, distance = [(10-9)/3*3, (9-7)/3*6, 1*2.5] = [1, 4, 2.5] 3. The supply is assigned with linear functions. The fraction (or slope) of supply that a target receives is its total request divided by the request of all the targets that are receiving supply at that point. 4. The supply is assigned from bigger to lower priority. Starts assigning the supply to the first target, when it reaches the quantity of the 'distance,' to the second target, the second target will start receiving its supply. When the second target receives its 'distance' to the third target, the third target will start receiving supply and so on. 5. Each time a target reaches its request or a new target starts receiving supply the slope of each target is computed again. 6. It finishes when all the supply is distributed between targets Parameters ---------- request: numpy.ndarray (1D) The request by target. Values must be non-negative. priority: numpy.ndarray (1D) The priority of each target. width: float The width between priorities. Must be positive. supply: float The available supply. Must be non-negative. Returns ------- out: numpy.ndarray (1D) The distribution of the supply. """ if supply >= np.sum(request): # All targets receive their request return request elif supply == 0: # No supply, all targets receive 0 return np.zeros_like(request) # Remove request 0 targets and order by priority is_0 = request == 0 sort = (-priority[~is_0]).argsort() request = request[~is_0].astype(float)[sort] priority = priority[~is_0][sort] # Create the outputs array out_return = np.zeros_like(is_0, dtype=float) out = np.zeros_like(request, dtype=float) # Compute the distances between target supply and next target start distances = np.full_like(request, np.nan, dtype=float) # last target will have an numpy.nan as distances as there are no # more targets after distances[:-1] = np.minimum(-np.diff(priority)/width, 1)*request[:-1] # Create a vector of the current active targets active = np.zeros_like(request, dtype=bool) active[0] = True # Create a vector of the last activated target c_i = 0 while supply > 0: # Compute the slopes of the active targets of supply slopes = request*active slopes /= np.sum(slopes) # Compute how much supply much be given to any target reach its request dx_next_top = np.nanmin((request-out)[active]/slopes[active]) # Compute how much supply is needed to start next target # (last target will return a numpy.nan) dx_next_start = (distances[c_i]-out[c_i])/slopes[c_i] # Compute where the next change in allocation function will change # this will happen when a target reaches is request, or when # the next target starts or when the supply is totally distributed dx = np.nanmin((dx_next_top, dx_next_start, supply)) # Assing the supply to the targets out += slopes*dx if np.isclose(dx, dx_next_start, rtol=1e-10, atol=1e-16): # A new target will start in the next loop c_i += 1 # Active the next targetif its request is different than 0 active[c_i] = True if dx == dx_next_top: # One or more target have reached their request active[out == request] = False supply -= dx # Return the distributed supply in the original order # adding to it again the request 0 if the where removed out_return[~is_0] = out[sort.argsort()] return out_return
[docs]def allocate_by_priority(request, priority, width, supply): """ Implements Vensim's ALLOCATE BY PRIORITY function. https://www.vensim.com/documentation/fn_allocate_by_priority.html Parameters ----------- request: xarray.DataArray Request of each target. Its shape should be the same as priority. width and supply must have the same shape except the last dimension. priority: xarray.DataArray Priority of each target. Its shape should be the same as request. width and supply must have the same shape except the last dimension. width: float or xarray.DataArray Specifies how big a gap in priority is required to have the allocation go first to higher priority with only leftovers going to lower priority. When the distance between any two priorities exceeds width and the higher priority does not receive its full request the lower priority will receive nothing. Its shape should be the same as supply. supply: float or xarray.DataArray The total supply available to fulfill all requests. If the supply exceeds total requests, all requests are filled, but none are overfilled. If you wish to conserve material you must compute supply minus total allocations explicitly. Its shape should be the same as width. Returns ------- out: xarray.DataArray The distribution of the supply. """ if np.any(request < 0): raise ValueError( "There are some negative request values. Ensure that " "your request is always non-negative. Allocation requires " f"all quantities to be positive or 0.\n{request}") if np.any(width <= 0): raise ValueError( f"width={width} \n is not allowed. width must be greater than 0.") if np.any(supply < 0): raise ValueError( f"supply={supply} \n is not allowed. supply must not be negative.") if len(request.shape) == 1: # NUMPY: avoid '.values' and return directly the result of the # function call return xr.DataArray( _allocate_by_priority_1d( request.values, priority.values, width, supply), request.coords ) # NUMPY: use np.empty_like and remove '.values' out = xr.zeros_like(request, dtype=float) for comb in itertools.product(*[range(i) for i in supply.shape]): out.values[comb] = _allocate_by_priority_1d( request.values[comb], priority.values[comb], width.values[comb], supply.values[comb]) return out