Source code for pysd.builders.python.python_expressions_builder

"""
The translation from the Abstract Syntax Tree to Python works in two
directions. The outer expression is visited with its builder, which splits
its arguments and visits them with their respective builders. Once the
lowest level is reached, it is translated into Python, returning a BuildAST
object; this object includes the Python expression, its subscripts, its
calls to other variables and its arithmetic order (see BuildAST for more
info). A BuildAST is returned for each visited argument, from the lowest
level up to the top level, giving the final expression.
"""
import warnings
from dataclasses import dataclass
from typing import Union

import numpy as np
from pysd.py_backend.utils import compute_shape

from pysd.translators.structures.abstract_expressions import\
    AbstractSyntax, AllocateAvailableStructure, AllocateByPriorityStructure,\
    ArithmeticStructure, CallStructure, DataStructure, DelayFixedStructure,\
    DelayStructure, DelayNStructure, ForecastStructure, GameStructure,\
    GetConstantsStructure, GetDataStructure, GetLookupsStructure,\
    InitialStructure, InlineLookupsStructure, IntegStructure,\
    LogicStructure, LookupsStructure, ReferenceStructure,\
    SampleIfTrueStructure, SmoothNStructure, SmoothStructure,\
    SubscriptsReferenceStructure, TrendStructure

from .python_functions import functionspace
from .subscripts import SubscriptManager


@dataclass
class BuildAST:
    """
    Python expression holder.

    Parameters
    ----------
    expression: str
        The Python expression.
    calls: dict
        The calls to other variables for the dependencies dictionary.
    subscripts: dict
        The subscripts dict of the expression.
    order: int
        Arithmetic order of the expression. The arithmetic order depends
        on the last arithmetic operation. If the expression is a number,
        a call to a function, or is between parentheses, its order will
        be 0. If the expression is an exponential of two terms its order
        will be 1. If the expression is a product or division its order
        will be 2. If the expression is a sum or subtraction its order
        will be 3. If the expression is a logical comparison its order
        will be 4.

    """
    expression: str
    calls: dict
    subscripts: dict
    order: int

    def __str__(self) -> str:
        # Lets the object be used directly in string formatting while
        # building bigger expressions.
        return self.expression

    def reshape(self, subscripts: "SubscriptManager", final_subscripts: dict,
                final_element: bool = False) -> None:
        """
        Reshape the object to the desired subscripts. It will modify the
        expression and lower the order if it is not 0.

        Parameters
        ----------
        subscripts: SubscriptManager
            The subscripts of the section.
        final_subscripts: dict
            The desired final subscripts. The dictionary is never stored
            by reference: a copy is kept, so the caller may keep
            modifying it afterwards.
        final_element: bool (optional)
            If True the array will be reshaped with the final subscripts
            to have the same shape. Otherwise, only the existing
            dimensions are reordered. Default is False.

        """
        if not final_subscripts or (
          self.subscripts == final_subscripts
          and list(self.subscripts) == list(final_subscripts)):
            # Same dictionary in the same order (or nothing requested),
            # do nothing
            return

        if not self.subscripts:
            # Original expression is not an array
            # NUMPY: object.expression = np.full(%s, %(shape)s)
            subscripts_out = subscripts.simplify_subscript_input(
                final_subscripts)[1]
            self.expression = "xr.DataArray(%s, %s, %s)" % (
                self.expression, subscripts_out, list(final_subscripts)
            )
            self.order = 0
            # Copy: callers (e.g. the allocate builders) mutate the dict
            # they passed in after calling reshape.
            self.subscripts = dict(final_subscripts)
            return

        # Original expression is an array
        self.lower_order(-1)

        # Reorder subscripts following the order of final_subscripts
        final_order = {
            sub: self.subscripts[sub]
            for sub in final_subscripts
            if sub in self.subscripts
        }
        if list(final_order) != list(self.subscripts):
            # NUMPY: reorder dims if necessary with np.moveaxis or similar
            self.expression +=\
                f".transpose({', '.join(map(repr, final_order))})"
            self.subscripts = final_order

        # Add new dimensions
        if final_element and final_subscripts != self.subscripts:
            # NUMPY: remove final_element condition from top
            # NUMPY: add new axis with [:, None, :]
            # NUMPY: move final_element condition here and use np.tile
            for i, dim in enumerate(final_subscripts):
                if dim not in self.subscripts:
                    subscripts_out = subscripts.simplify_subscript_input(
                        {dim: final_subscripts[dim]})[1]
                    self.expression +=\
                        f".expand_dims({subscripts_out}, {i})"

            # Copy for the same reason as above.
            self.subscripts = dict(final_subscripts)

    def lower_order(self, new_order: int) -> None:
        """
        Lower the order to maintain the correct order in arithmetic
        operations. If the requested order is smaller than (or equal to)
        the current order, parentheses will be added to the expression to
        lower its order to 0.

        Parameters
        ----------
        new_order: int
            The required new order of the expression. If 0 it will be
            assumed that the expression will be passed as an argument
            of a function and therefore no operations will be done. If
            order 0 is required, a negative value can be used for
            new_order.

        """
        if self.order >= new_order and self.order != 0 and new_order != 0:
            # if current operator order is 0 do not need to do anything
            # if the order of operations conflicts add parentheses
            # if new order is 0 do not need to do anything, as it may be
            # an argument to a function. To force the 0 order a negative
            # value can be used, which will force the parentheses
            # (necessary to reshape some arrays)
            self.expression = "(%s)" % self.expression
            self.order = 0
class StructureBuilder:
    """
    Main builder for Abstract Syntax Tree structures. All the builders
    are children of this class, which allows them to inherit its methods.
    """
    def __init__(self, value: object, component: object):
        # component typing should be ComponentBuilder, but importing it
        # for typing would create a circular dependency :S
        self.value = value
        self.arguments = {}
        self.component = component
        self.element = component.element
        self.section = component.section
        self.def_subs = component.subscripts_dict

    @staticmethod
    def join_calls(arguments: dict) -> dict:
        """
        Merge the calls of the arguments.

        Parameters
        ----------
        arguments: dict
            The dictionary of arguments. Every value must expose a
            'calls' dictionary.

        Returns
        -------
        calls: dict
            The merged dictionary of calls. With a single argument its
            own 'calls' dictionary is returned (not a copy).

        """
        if len(arguments) == 0:
            # No arguments
            return {}
        elif len(arguments) == 1:
            # Only one argument
            return list(arguments.values())[0].calls
        else:
            # Several arguments
            return merge_dependencies(
                *[val.calls for val in arguments.values()])

    def reorder(self, arguments: dict,
                force: Union[str, None] = None) -> dict:
        """
        Reorder the subscripts of the arguments to make them match.

        Parameters
        ----------
        arguments: dict
            The dictionary of arguments. Every value must expose
            'subscripts' and a 'reshape' method.
        force: 'component', 'equal', or None (optional)
            If force is 'component' it will force the arguments to have
            the subscripts of the component definition. If force is
            'equal' it will force all the arguments to have the same
            subscripts, including the floats. If force is None, only
            the arguments that already have subscripts are reshaped,
            without adding the missing dimensions. Default is None.

        Returns
        -------
        final_subscripts: dict
            The final_subscripts after reordering all the elements.

        """
        if force == "component":
            final_subscripts = self.def_subs or {}
        else:
            final_subscripts = self.get_final_subscripts(arguments)

        for argument in arguments.values():
            # Floats (no subscripts) are only reshaped when all the
            # arguments are forced to be equal
            if argument.subscripts or force == "equal":
                argument.reshape(
                    self.section.subscripts, final_subscripts, bool(force))

        return final_subscripts

    def get_final_subscripts(self, arguments: dict) -> dict:
        """
        Get the final subscripts of a combination of arguments.

        Parameters
        ----------
        arguments: dict
            The dictionary of arguments. Every value must expose a
            'subscripts' dictionary.

        Returns
        -------
        final_subscripts: dict
            The final_subscripts of combining all the elements. With a
            single argument its own 'subscripts' dictionary is returned
            (not a copy), whatever its key is.

        """
        if len(arguments) == 0:
            return {}
        elif len(arguments) == 1:
            # Do not rely on the key being "0": some builders name their
            # arguments (same approach as join_calls)
            return next(iter(arguments.values())).subscripts
        else:
            return self._compute_final_subscripts(
                [arg.subscripts for arg in arguments.values()])

    def _compute_final_subscripts(self, subscripts_list: list) -> dict:
        """
        Compute final subscripts from a list of subscript dictionaries.

        Parameters
        ----------
        subscripts_list: list of dicts
            List of subscript dictionaries.

        Returns
        -------
        final_subscripts: dict
            New dictionary with the union of all the inputs. Keys keep
            the order of first appearance; for repeated keys the last
            value wins.

        """
        expression = {}
        for subscript in subscripts_list:
            if subscript:
                expression.update(subscript)
        # TODO reorder final_subscripts taking into account def_subs
        # this way try to minimize the reordering operations
        return expression

    def update_object_subscripts(self, name: str,
                                 component_final_subs: dict) -> None:
        """
        Update the object subscripts. Needed for those objects that
        use the 'add' method to load several components at once when
        mixed definitions are used.

        Parameters
        ----------
        name: str
            The name of the object in the objects dictionary from the
            element.

        component_final_subs: dict
            The subscripts of the component but with the element
            subscript ranges as keys. This can differ from the component
            subscripts when the component is defined with subranges
            of the final subscript ranges.

        """
        # Get the component used to define the object first time
        origin_comp = self.element.objects[name]["component"]
        # For these objects the component subscripts_dict is a list with
        # one dictionary per added component
        origin_comp.subscripts_dict.append(component_final_subs)
class OperationBuilder(StructureBuilder):
    """Builder for arithmetic and logical operations."""
    # operator -> (expression template, import dependencies, order)
    _operators_build = {
        "^": ("%(left)s**%(right)s", None, 1),
        "*": ("%(left)s*%(right)s", None, 2),
        "/": ("%(left)s/%(right)s", None, 2),
        "+": ("%(left)s + %(right)s", None, 3),
        "-": ("%(left)s - %(right)s", None, 3),
        "=": ("%(left)s == %(right)s", None, 4),
        "<>": ("%(left)s != %(right)s", None, 4),
        ">=": ("%(left)s >= %(right)s", None, 4),
        ">": ("%(left)s > %(right)s", None, 4),
        "<=": ("%(left)s <= %(right)s", None, 4),
        "<": ("%(left)s < %(right)s", None, 4),
        ":NOT:": ("np.logical_not(%s)", ("numpy",), 0),
        ":AND:": ("np.logical_and(%(left)s, %(right)s)", ("numpy",), 0),
        ":OR:": ("np.logical_or(%(left)s, %(right)s)", ("numpy",), 0),
        "negative": ("-%s", None, 3),
    }

    def __init__(self, operation: Union[ArithmeticStructure, LogicStructure],
                 component: object):
        super().__init__(None, component)
        # Work on a copy, build() consumes the operators list
        self.operators = operation.operators.copy()
        self.arguments = {
            str(position): argument
            for position, argument in enumerate(operation.arguments)
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments, with keys "0", "1", ...

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        calls = self.join_calls(arguments)
        final_subscripts = self.reorder(arguments)

        # Positional list of the operands, leftmost first
        pending = [arguments[str(pos)] for pos in range(len(arguments))]

        last_operator = self.operators[-1]
        _, dependencies, order = self._operators_build[last_operator]

        if dependencies:
            # Add necessary dependencies to the imports
            self.section.imports.add(*dependencies)

        # Right side of the exponential can be from higher order
        pending[-1].lower_order(2 if last_operator == "^" else order)

        if len(pending) == 1:
            # not and negative operations (only 1 element)
            if self.operators[0] == "negative":
                order = 1
            template = self._operators_build[self.operators[0]][0]
            return BuildAST(
                expression=template % pending[0],
                calls=calls,
                subscripts=final_subscripts,
                order=order)

        # The expression is assembled from right to left: the last
        # argument is the RHS of the first operation processed
        accumulated = pending.pop()
        while pending or self.operators:
            # Get the operator and the LHS of the operation
            template = self._operators_build[self.operators.pop()][0]
            left_side = pending.pop()
            # Lower the order of the LHS if necessary
            left_side.lower_order(order)
            # The result becomes the RHS of the next iteration
            accumulated = template % {
                "left": left_side, "right": accumulated}

        return BuildAST(
            expression=accumulated,
            calls=calls,
            subscripts=final_subscripts,
            order=order)
class GameBuilder(StructureBuilder):
    """Builder for GAME expressions."""
    def __init__(self, game_str: GameStructure, component: object):
        super().__init__(None, component)
        self.arguments = dict(expr=game_str.expression)

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments; only "expr" is used.

        Returns
        -------
        built_ast: BuildAST
            The built object of the wrapped expression, untouched.

        """
        # Game calls are ignored as we have no support for a similar
        # feature, we simply return the content inside the GAME call
        wrapped = arguments["expr"]
        return wrapped
class CallBuilder(StructureBuilder):
    """Builder for calls to functions, macros and lookups."""
    def __init__(self, call_str: CallStructure, component: object):
        super().__init__(None, component)
        target = call_str.function.reference
        self.arguments = {
            str(position): argument
            for position, argument in enumerate(call_str.arguments)
        }

        # Select the build method depending on what is being called
        if target in self.section.macrospace:
            # Build macro
            self.macro_name = target
            self.build = self.build_macro_call
        elif target in self.section.namespace.cleanspace:
            # Build lookupcall
            self.arguments["function"] = call_str.function
            self.build = self.build_lookups_call
        elif target in functionspace:
            # Build direct function
            self.function = target
            self.build = self.build_function_call
        elif target == "a_function_of":
            # Build incomplete function
            self.build = self.build_incomplete_call
        else:
            # Build missing function
            self.function = target
            self.build = self.build_not_implemented

    def build_not_implemented(self, arguments: dict) -> BuildAST:
        """
        Build method for not implemented function calls.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object, a call to 'not_implemented_function'.

        """
        final_subscripts = self.reorder(arguments)
        readable_name = self.function.upper().replace("_", " ")
        warnings.warn(
            "\n\nTrying to translate '"
            + readable_name
            + "' which it is not implemented on PySD. The translated "
            + "model will crash... "
        )
        self.section.imports.add("functions", "not_implemented_function")

        joined_arguments = ", ".join(
            arg.expression for arg in arguments.values())
        return BuildAST(
            expression="not_implemented_function('%s', %s)" % (
                self.function, joined_arguments),
            calls=self.join_calls(arguments),
            subscripts=final_subscripts,
            order=0)

    def build_incomplete_call(self, arguments: dict) -> BuildAST:
        """
        Build method for incomplete function calls.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object, a call to 'incomplete' with the
            subscripts of the component definition.

        """
        warnings.warn(
            "'%s' has no equation specified" % self.element.name,
            SyntaxWarning, stacklevel=2
        )
        self.section.imports.add("functions", "incomplete")

        joined_arguments = ", ".join(
            arg.expression for arg in arguments.values())
        return BuildAST(
            expression="incomplete(%s)" % joined_arguments,
            calls=self.join_calls(arguments),
            subscripts=self.def_subs,
            order=0)

    def build_macro_call(self, arguments: dict) -> BuildAST:
        """
        Build method for macro calls.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments. It is extended in place
            with the values needed to write the Macro object.

        Returns
        -------
        built_ast: BuildAST
            The built object, a call to the created Macro object.

        """
        self.section.imports.add("model", "Macro")
        # Get macro from macrospace
        macro = self.section.macrospace[self.macro_name]

        calls = self.join_calls(arguments)
        final_subscripts = self.reorder(arguments)

        py_name = self.section.namespace.make_python_identifier(
            self.macro_name + "_" + self.element.identifier,
            prefix="_macro")
        arguments["name"] = py_name
        arguments["file"] = macro.path.name
        arguments["macro_name"] = macro.name
        # zip stops at the end of macro.params, so the extra keys added
        # above (placed after the positional ones) are only reached if
        # the macro declares more parameters than arguments were given
        lambdas = [
            "'%s': lambda: %s" % (param, value)
            for param, value in zip(macro.params, arguments.values())
        ]
        arguments["args"] = "{%s}" % ", ".join(lambdas)

        # Create Macro object
        self.element.objects[py_name] = {
            "name": py_name,
            "expression": "%(name)s = Macro(_root.joinpath('%(file)s'), "
                          "%(args)s, '%(macro_name)s', "
                          "time_initialization=lambda: __data['time'], "
                          "py_name='%(name)s')" % arguments,
        }
        # Add other_dependencies
        self.element.other_dependencies[py_name] = {
            "initial": calls,
            "step": calls
        }

        return BuildAST(
            expression="%s()" % py_name,
            calls={py_name: 1},
            subscripts=final_subscripts,
            order=0)

    def build_lookups_call(self, arguments: dict) -> BuildAST:
        """
        Build method for lookups calls.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments. "function" holds the
            built lookup reference and "0" the value to evaluate it at.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        lookup = arguments["function"]
        if arguments["0"].subscripts:
            # Build lookups with subscripted arguments
            # it is necessary to give the final subscripts information
            # in the call to rearrange it correctly
            final_subscripts = self.get_final_subscripts(arguments)
            expression = lookup.expression.replace(
                "()", f"(%(0)s, {final_subscripts})")
        else:
            # Build lookups with float arguments
            final_subscripts = lookup.subscripts
            expression = lookup.expression.replace("()", "(%(0)s)")

        # NUMPY: we need to manage inside lookup with subscript and later
        #        return the values in a correct ndarray
        return BuildAST(
            expression=expression % arguments,
            calls=self.join_calls(arguments),
            subscripts=final_subscripts,
            order=0)

    def build_function_call(self, arguments: dict) -> BuildAST:
        """
        Build method for function calls.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments, with keys "0", "1", ...
            Extra keys ("axis", "size") may be added in place when the
            function template needs them.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        # Get the function expression from the functionspace
        expression, modules = functionspace[self.function]
        if modules:
            # Update module dependencies in imports
            self.section.imports.add(*modules)

        calls = self.join_calls(arguments)

        if "__data['time']" in expression:
            # If the expression depends on time add it to the dependencies
            merge_dependencies(calls, {"time": 1}, inplace=True)

        section_subs = self.section.subscripts

        if "%(axis)s" in expression:
            # Vectorial expressions, compute the axis using dimensions
            # with ! operator
            if "%(1)s" in expression:
                subs = self.reorder(arguments)
                # NUMPY: following loop may be avoided
                for key in ("0", "1"):
                    arguments[key].reshape(self.section.subscripts, subs, True)
            else:
                subs = arguments["0"].subscripts
            final_subscripts, arguments["axis"] = self._compute_axis(subs)

        elif "%(size)s" in expression:
            # Random expressions, need to give the final size of the
            # component to create one value per final coordinate
            final_subscripts = self.reorder(arguments, force="component")
            arguments["size"] = tuple(compute_shape(final_subscripts))
            if arguments["size"]:
                # Create an xarray from the random function output
                # NUMPY: not necessary
                subs = self.section.subscripts.simplify_subscript_input(
                    self.def_subs)[1]
                expression = f"xr.DataArray({expression}, {subs}, "\
                             f"{list(self.def_subs)})"

        elif self.function == "active_initial":
            # We need to ensure that active initial outputs are always the
            # same and update dependencies as stateful object
            name = self.section.namespace.make_python_identifier(
                self.element.identifier, prefix="_active_initial")
            final_subscripts = self.reorder(arguments, force="equal")
            self.element.other_dependencies[name] = {
                "initial": arguments["1"].calls,
                "step": arguments["0"].calls
            }
            calls = {name: 1}

        else:
            final_subscripts = self.reorder(arguments)
            if self.function == "xidz" and final_subscripts:
                # xidz must always return the same shape object
                if not arguments["1"].subscripts:
                    for key in ("0", "1"):
                        arguments[key].reshape(
                            self.section.subscripts, final_subscripts, True)
                elif arguments["0"].subscripts\
                        or arguments["2"].subscripts:
                    # NUMPY: not need this statement
                    for key in ("0", "1", "2"):
                        if arguments[key].subscripts:
                            arguments[key].reshape(
                                self.section.subscripts,
                                final_subscripts, True)
            elif self.function == "zidz" and final_subscripts:
                # zidz must always return the same shape object
                arguments["0"].reshape(
                    self.section.subscripts, final_subscripts, True)
                if arguments["1"].subscripts:
                    # NUMPY: not need this statement
                    arguments["1"].reshape(
                        self.section.subscripts, final_subscripts, True)
            elif self.function == "if_then_else" and final_subscripts:
                # if_then_else must always return the same shape object;
                # a float condition is left as it is
                if not arguments["0"].subscripts:
                    branch_keys = ("1", "2")
                else:
                    branch_keys = ("0", "1", "2")
                for key in branch_keys:
                    arguments[key].reshape(
                        self.section.subscripts, final_subscripts, True)

        return BuildAST(
            expression=expression % arguments,
            calls=calls,
            subscripts=final_subscripts,
            order=0)

    def _compute_axis(self, subscripts: dict) -> tuple:
        """
        Compute the axis to apply a vectorial function.

        Parameters
        ----------
        subscripts: dict
            The final_subscripts after reordering all the elements.

        Returns
        -------
        coords: dict
            The final coordinates after executing the vectorial function
            (the dimensions whose name does not end with "!").
        axis: list
            The list of dimensions to apply the function. Uses the
            dimensions with "!" at the end.

        """
        # dimensions to apply along
        axis = [dim for dim in subscripts if dim.endswith("!")]
        # dimensions remaining
        coords = {
            dim: subscripts[dim]
            for dim in subscripts if not dim.endswith("!")
        }
        return coords, axis
class AllocateAvailableBuilder(StructureBuilder):
    """Builder for allocate_available function."""
    def __init__(self, allocate_str: AllocateAvailableStructure,
                 component: object):
        super().__init__(None, component)

        # Replace the last subscript of the priority profile reference
        # with the last subscript registered for that element
        pp = allocate_str.pp
        registered_last = self.section.subscripts.elements[pp.reference][-1:]
        pp.subscripts.subscripts =\
            pp.subscripts.subscripts[:-1] + registered_last

        self.arguments = dict(
            request=allocate_str.request,
            pp=pp,
            avail=allocate_str.avail,
        )

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments ("request", "pp" and
            "avail").

        Returns
        -------
        built_ast: BuildAST
            The built object, with the subscripts of the request.

        """
        self.section.imports.add("allocation", "allocate_available")

        calls = self.join_calls(arguments)
        request = arguments["request"]
        profile = arguments["pp"]
        section_subs = self.section.subscripts

        # the last sub of the request must be kept as the last sub of
        # request and priority
        last_sub = list(request.subscripts)[-1]
        pp_sub = list(profile.subscripts)[-1]
        # compute the merged subscripts
        final_subscripts = self.get_final_subscripts(arguments)
        # take the last subs of request and priority out
        last_sub_value = final_subscripts[last_sub]
        pp_sub_value = final_subscripts[pp_sub]
        del final_subscripts[last_sub]
        del final_subscripts[pp_sub]

        # Update the subscripts of avail
        arguments["avail"].reshape(section_subs, final_subscripts, True)

        # Include last sub of request in the last position and update
        # the subscripts of request
        final_subscripts[last_sub] = last_sub_value
        request.reshape(section_subs, final_subscripts, True)

        # Include priority subscripts and update the subscripts of pp
        final_subscripts[pp_sub] = pp_sub_value
        profile.reshape(section_subs, final_subscripts, True)

        template = "allocate_available(%(request)s, %(pp)s, %(avail)s)"
        return BuildAST(
            expression=template % arguments,
            calls=calls,
            subscripts=request.subscripts,
            order=0)
class AllocateByPriorityBuilder(StructureBuilder):
    """Builder for allocate_by_priority function."""
    def __init__(self, allocate_str: AllocateByPriorityStructure,
                 component: object):
        super().__init__(None, component)
        self.arguments = dict(
            request=allocate_str.request,
            priority=allocate_str.priority,
            width=allocate_str.width,
            supply=allocate_str.supply,
        )

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments ("request", "priority",
            "width" and "supply").

        Returns
        -------
        built_ast: BuildAST
            The built object, with the subscripts of the request.

        """
        self.section.imports.add("allocation", "allocate_by_priority")

        calls = self.join_calls(arguments)
        request = arguments["request"]
        section_subs = self.section.subscripts

        # the last sub of the request must be kept as the last sub of
        # request and priority
        last_sub = list(request.subscripts)[-1]
        # compute the merged subscripts
        final_subscripts = self.get_final_subscripts(arguments)
        # take the last sub of the request out
        last_sub_value = final_subscripts.pop(last_sub)

        # Update the subscripts of width and supply
        for key in ("width", "supply"):
            arguments[key].reshape(section_subs, final_subscripts, True)

        # Include last sub of request in the last position and update
        # the subscripts of request and priority
        final_subscripts[last_sub] = last_sub_value
        for key in ("request", "priority"):
            arguments[key].reshape(section_subs, final_subscripts, True)

        template = "allocate_by_priority(%(request)s, %(priority)s, "\
                   "%(width)s, %(supply)s)"
        return BuildAST(
            expression=template % arguments,
            calls=calls,
            subscripts=request.subscripts,
            order=0)
class ExtLookupBuilder(StructureBuilder):
    """Builder for External Lookups."""
    def __init__(self, getlookup_str: GetLookupsStructure, component: object):
        super().__init__(None, component)
        self.arguments = {}
        self.file = getlookup_str.file
        self.tab = getlookup_str.tab
        self.x_row_or_col = getlookup_str.x_row_or_col
        self.cell = getlookup_str.cell

    def build(self, arguments: dict) -> Union[BuildAST, None]:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments. It is used as the
            formatting namespace and extended in place.

        Returns
        -------
        built_ast: BuildAST or None
            The built object, unless the component has been added to an
            existing object using the 'add' method.

        """
        self.component.type = "Lookup"
        self.component.subtype = "External"
        arguments["params"] = "'%s', '%s', '%s', '%s'" % (
            self.file, self.tab, self.x_row_or_col, self.cell
        )
        final_subs, arguments["subscripts"] =\
            self.section.subscripts.simplify_subscript_input(
                self.def_subs, self.element.subscripts)

        objects = self.element.objects
        if "ext_lookups" in objects:
            # Object already exists, use 'add' method
            add_call = ".add(%(params)s, %(subscripts)s)" % arguments
            objects["ext_lookups"]["expression"] +=\
                "\n\n" + objects["ext_lookups"]["name"] + add_call
            self.update_object_subscripts("ext_lookups", final_subs)

            return None

        # Create a new object
        self.section.imports.add("external", "ExtLookup")
        py_name = self.section.namespace.make_python_identifier(
            self.element.identifier, prefix="_ext_lookup")
        arguments["name"] = py_name
        arguments["final_subs"] =\
            self.section.subscripts.simplify_subscript_input(
                self.element.subs_dict)[1]
        # From now on the component subscripts are a list, one
        # dictionary per component added to the object
        self.component.subscripts_dict = [final_subs]
        objects["ext_lookups"] = {
            "name": py_name,
            "expression": "%(name)s = ExtLookup(%(params)s, "
                          "%(subscripts)s, _root, "
                          "%(final_subs)s , '%(name)s')" % arguments,
            "component": self.component,
            "final_subs": final_subs
        }

        return BuildAST(
            expression=py_name + "(x, final_subs)",
            calls={
                "__external__": py_name,
                "__lookup__": py_name
            },
            subscripts=final_subs,
            order=0)
class ExtDataBuilder(StructureBuilder):
    """Builder for External Data."""
    def __init__(self, getdata_str: GetDataStructure, component: object):
        super().__init__(None, component)
        self.arguments = {}
        self.file = getdata_str.file
        self.tab = getdata_str.tab
        self.time_row_or_col = getdata_str.time_row_or_col
        self.cell = getdata_str.cell
        self.keyword = component.keyword

    def build(self, arguments: dict) -> Union[BuildAST, None]:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments. It is used as the
            formatting namespace and extended in place.

        Returns
        -------
        built_ast: BuildAST or None
            The built object, unless the component has been added to an
            existing object using the 'add' method.

        """
        self.component.type = "Data"
        self.component.subtype = "External"
        arguments["params"] = "'%s', '%s', '%s', '%s'" % (
            self.file, self.tab, self.time_row_or_col, self.cell
        )
        final_subs, arguments["subscripts"] =\
            self.section.subscripts.simplify_subscript_input(
                self.def_subs, self.element.subscripts)
        # The keyword is written quoted, or as None when it is empty
        if self.keyword:
            arguments["method"] = "'%s'" % self.keyword
        else:
            arguments["method"] = None

        objects = self.element.objects
        if "ext_data" in objects:
            # Object already exists, use add method
            add_call =\
                ".add(%(params)s, %(method)s, %(subscripts)s)" % arguments
            objects["ext_data"]["expression"] +=\
                "\n\n" + objects["ext_data"]["name"] + add_call
            self.update_object_subscripts("ext_data", final_subs)

            return None

        # Create a new object
        self.section.imports.add("external", "ExtData")
        py_name = self.section.namespace.make_python_identifier(
            self.element.identifier, prefix="_ext_data")
        arguments["name"] = py_name
        arguments["final_subs"] =\
            self.section.subscripts.simplify_subscript_input(
                self.element.subs_dict)[1]
        # From now on the component subscripts are a list, one
        # dictionary per component added to the object
        self.component.subscripts_dict = [final_subs]
        objects["ext_data"] = {
            "name": py_name,
            "expression": "%(name)s = ExtData(%(params)s, "
                          " %(method)s, %(subscripts)s, "
                          "_root, %(final_subs)s ,'%(name)s')" % arguments,
            "component": self.component,
            "final_subs": final_subs
        }

        return BuildAST(
            expression=py_name + "(time())",
            calls={
                "__external__": py_name,
                "__data__": py_name,
                "time": 1},
            subscripts=final_subs,
            order=0)
class ExtConstantBuilder(StructureBuilder):
    """Builder for External Constants."""
    def __init__(self, getconstant_str: GetConstantsStructure,
                 component: object):
        super().__init__(None, component)
        self.arguments = {}
        self.file = getconstant_str.file
        self.tab = getconstant_str.tab
        self.cell = getconstant_str.cell

    def build(self, arguments: dict) -> Union[BuildAST, None]:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments. It is used as the
            formatting namespace and extended in place.

        Returns
        -------
        built_ast: BuildAST or None
            The built object, unless the component has been added to an
            existing object using the 'add' method.

        """
        self.component.type = "Constant"
        self.component.subtype = "External"
        arguments["params"] = "'%s', '%s', '%s'" % (
            self.file, self.tab, self.cell
        )
        final_subs, arguments["subscripts"] =\
            self.section.subscripts.simplify_subscript_input(
                self.def_subs, self.element.subscripts)

        objects = self.element.objects
        if "constants" in objects:
            # Object already exists, use 'add' method
            add_call = ".add(%(params)s, %(subscripts)s)" % arguments
            objects["constants"]["expression"] +=\
                "\n\n" + objects["constants"]["name"] + add_call
            self.update_object_subscripts("constants", final_subs)

            return None

        # Create a new object
        self.section.imports.add("external", "ExtConstant")
        py_name = self.section.namespace.make_python_identifier(
            self.element.identifier, prefix="_ext_constant")
        arguments["name"] = py_name
        arguments["final_subs"] =\
            self.section.subscripts.simplify_subscript_input(
                self.element.subs_dict)[1]
        # From now on the component subscripts are a list, one
        # dictionary per component added to the object
        self.component.subscripts_dict = [final_subs]
        objects["constants"] = {
            "name": py_name,
            "expression": "%(name)s = ExtConstant(%(params)s, "
                          "%(subscripts)s, _root, %(final_subs)s, "
                          "'%(name)s')" % arguments,
            "component": self.component,
            "final_subs": final_subs
        }

        return BuildAST(
            expression=py_name + "()",
            calls={"__external__": py_name},
            subscripts=final_subs,
            order=0)
class TabDataBuilder(StructureBuilder):
    """Builder for empty DATA expressions."""
    def __init__(self, data_str: DataStructure, component: object):
        super().__init__(None, component)
        self.arguments = {}
        self.keyword = component.keyword

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments. It is used as the
            formatting namespace and extended in place.

        Returns
        -------
        built_ast: BuildAST
            The built object, a call to the created TabData object.

        """
        self.section.imports.add("data", "TabData")

        final_subs, arguments["subscripts"] =\
            self.section.subscripts.simplify_subscript_input(
                self.def_subs, self.element.subscripts)

        original_name = self.element.name
        arguments["real_name"] = original_name
        arguments["py_name"] = self.section.namespace.namespace[original_name]
        # The keyword is written quoted, or as None when it is empty
        if self.keyword:
            arguments["method"] = "'%s'" % self.keyword
        else:
            arguments["method"] = None

        object_name = self.section.namespace.make_python_identifier(
            self.element.identifier, prefix="_data")
        arguments["name"] = object_name

        # Create TabData object
        self.element.objects["tab_data"] = {
            "name": object_name,
            "expression": "%(name)s = TabData('%(real_name)s', '%(py_name)s', "
                          "%(subscripts)s, %(method)s)" % arguments
        }

        return BuildAST(
            expression=object_name + "(time())",
            calls={"time": 1, "__data__": object_name},
            subscripts=final_subs,
            order=0)
class InitialBuilder(StructureBuilder):
    """Builder for Initials."""
    def __init__(self, initial_str: InitialStructure, component: object):
        super().__init__(None, component)
        self.arguments = dict(initial=initial_str.initial)

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of built arguments; "initial" is required.

        Returns
        -------
        built_ast: BuildAST
            The built object, a call to the created Initial object.

        """
        self.component.type = "Stateful"
        self.component.subtype = "Initial"
        self.section.imports.add("statefuls", "Initial")

        initial = arguments["initial"]
        # The stateful object works with the component definition shape
        initial.reshape(self.section.subscripts, self.def_subs, True)

        object_name = self.section.namespace.make_python_identifier(
            self.element.identifier, prefix="_initial")
        arguments["name"] = object_name

        # Create the object
        self.element.objects[object_name] = {
            "name": object_name,
            "expression": "%(name)s = Initial(lambda: %(initial)s, "
                          "'%(name)s')" % arguments,
        }
        # Add other-dependencies
        self.element.other_dependencies[object_name] = {
            "initial": initial.calls,
            "step": {}
        }

        return BuildAST(
            expression=object_name + "()",
            calls={object_name: 1},
            subscripts=self.def_subs,
            order=0)
class IntegBuilder(StructureBuilder):
    """Builder for Integs/Stocks."""

    def __init__(self, integ_str: IntegStructure, component: object):
        super().__init__(None, component)
        self.arguments = {
            "flow": integ_str.flow,
            "initial": integ_str.initial
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/Integ, registers the
        ``Integ`` object declaration and its dependencies in the
        element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "Integ"
        self.section.imports.add("statefuls", "Integ")

        # both expressions are reshaped to the definition subscripts
        for key in ("initial", "flow"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_integ")
        arguments["name"] = name

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = Integ(lambda: %(flow)s, "
            "lambda: %(initial)s, '%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        element.other_dependencies[name] = {
            "initial": arguments["initial"].calls,
            "step": arguments["flow"].calls
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class DelayBuilder(StructureBuilder):
    """Builder for regular Delays."""

    def __init__(self, dtype: str,
                 delay_str: Union[DelayStructure, DelayNStructure],
                 component: object):
        super().__init__(None, component)
        self.arguments = {
            "input": delay_str.input,
            "delay_time": delay_str.delay_time,
            "initial": delay_str.initial,
            "order": delay_str.order
        }
        self.dtype = dtype

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/Delay, registers the
        declaration of an object of class ``self.dtype`` and its
        dependencies in the element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "Delay"
        self.section.imports.add("statefuls", self.dtype)

        # the order is the only argument that is not reshaped
        for key in ("input", "delay_time", "initial"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_" + self.dtype.lower())
        arguments["name"] = name
        arguments["dtype"] = self.dtype

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = %(dtype)s(lambda: %(input)s, "
            "lambda: %(delay_time)s, lambda: %(initial)s, "
            "lambda: %(order)s, "
            "time_step, '%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        initial_deps = merge_dependencies(
            arguments["initial"].calls,
            arguments["delay_time"].calls,
            arguments["order"].calls)
        step_deps = merge_dependencies(
            arguments["input"].calls,
            arguments["delay_time"].calls)
        element.other_dependencies[name] = {
            "initial": initial_deps,
            "step": step_deps
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class DelayFixedBuilder(StructureBuilder):
    """Builder for Delay Fixed."""

    def __init__(self, delay_str: DelayFixedStructure, component: object):
        super().__init__(None, component)
        self.arguments = {
            "input": delay_str.input,
            "delay_time": delay_str.delay_time,
            "initial": delay_str.initial,
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/DelayFixed, registers the
        ``DelayFixed`` object declaration and its dependencies in the
        element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "DelayFixed"
        self.section.imports.add("statefuls", "DelayFixed")

        # the delay time is left as it is, only these two are reshaped
        for key in ("input", "initial"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_delayfixed")
        arguments["name"] = name

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = DelayFixed(lambda: %(input)s, "
            "lambda: %(delay_time)s, lambda: %(initial)s, "
            "time_step, '%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        initial_deps = merge_dependencies(
            arguments["initial"].calls,
            arguments["delay_time"].calls)
        element.other_dependencies[name] = {
            "initial": initial_deps,
            "step": arguments["input"].calls
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class SmoothBuilder(StructureBuilder):
    """Builder for Smooths."""

    def __init__(self, smooth_str: Union[SmoothStructure, SmoothNStructure],
                 component: object):
        super().__init__(None, component)
        self.arguments = {
            "input": smooth_str.input,
            "smooth_time": smooth_str.smooth_time,
            "initial": smooth_str.initial,
            "order": smooth_str.order
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/Smooth, registers the
        ``Smooth`` object declaration and its dependencies in the
        element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "Smooth"
        self.section.imports.add("statefuls", "Smooth")

        # the order is the only argument that is not reshaped
        for key in ("input", "smooth_time", "initial"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_smooth")
        arguments["name"] = name

        # TODO in the future we need to add the time step to show
        # warnings about the smooth time as it is done with delays
        # (see Vensim help for smooth)
        # TODO in the future we may want to have 2 py_backend classes
        # for smooth as the behaviour is different for SMOOTH and
        # SMOOTH N when using a Runge-Kutta scheme

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = Smooth(lambda: %(input)s, "
            "lambda: %(smooth_time)s, lambda: %(initial)s, "
            "lambda: %(order)s, '%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        initial_deps = merge_dependencies(
            arguments["initial"].calls,
            arguments["smooth_time"].calls,
            arguments["order"].calls)
        step_deps = merge_dependencies(
            arguments["input"].calls,
            arguments["smooth_time"].calls)
        element.other_dependencies[name] = {
            "initial": initial_deps,
            "step": step_deps
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class TrendBuilder(StructureBuilder):
    """Builder for Trends."""

    def __init__(self, trend_str: TrendStructure, component: object):
        super().__init__(None, component)
        self.arguments = {
            "input": trend_str.input,
            "average_time": trend_str.average_time,
            "initial_trend": trend_str.initial_trend,
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/Trend, registers the
        ``Trend`` object declaration and its dependencies in the
        element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "Trend"
        self.section.imports.add("statefuls", "Trend")

        # every argument is reshaped to the definition subscripts
        for key in ("input", "average_time", "initial_trend"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_trend")
        arguments["name"] = name

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = Trend(lambda: %(input)s, "
            "lambda: %(average_time)s, "
            "lambda: %(initial_trend)s, "
            "'%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        initial_deps = merge_dependencies(
            arguments["initial_trend"].calls,
            arguments["input"].calls,
            arguments["average_time"].calls)
        step_deps = merge_dependencies(
            arguments["input"].calls,
            arguments["average_time"].calls)
        element.other_dependencies[name] = {
            "initial": initial_deps,
            "step": step_deps
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class ForecastBuilder(StructureBuilder):
    """Builder for Forecasts."""

    def __init__(self, forecast_str: ForecastStructure, component: object):
        super().__init__(None, component)
        self.arguments = {
            "input": forecast_str.input,
            "average_time": forecast_str.average_time,
            "horizon": forecast_str.horizon,
            "initial_trend": forecast_str.initial_trend
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/Forecast, registers the
        ``Forecast`` object declaration and its dependencies in the
        element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "Forecast"
        self.section.imports.add("statefuls", "Forecast")

        # every argument is reshaped to the definition subscripts
        for key in ("input", "average_time", "horizon", "initial_trend"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_forecast")
        arguments["name"] = name

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = Forecast(lambda: %(input)s, "
            "lambda: %(average_time)s, lambda: %(horizon)s, "
            "lambda: %(initial_trend)s, '%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        initial_deps = merge_dependencies(
            arguments["input"].calls,
            arguments["initial_trend"].calls)
        step_deps = merge_dependencies(
            arguments["input"].calls,
            arguments["average_time"].calls,
            arguments["horizon"].calls)
        element.other_dependencies[name] = {
            "initial": initial_deps,
            "step": step_deps
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class SampleIfTrueBuilder(StructureBuilder):
    """Builder for Sample If True."""

    def __init__(self, sampleiftrue_str: SampleIfTrueStructure,
                 component: object):
        super().__init__(None, component)
        self.arguments = {
            "condition": sampleiftrue_str.condition,
            "input": sampleiftrue_str.input,
            "initial": sampleiftrue_str.initial,
        }

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Marks the component as a Stateful/SampleIfTrue, registers the
        ``SampleIfTrue`` object declaration and its dependencies in the
        element, and returns a call to that object.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        component, element = self.component, self.element
        component.type = "Stateful"
        component.subtype = "SampleIfTrue"
        self.section.imports.add("statefuls", "SampleIfTrue")

        # every argument is reshaped to the definition subscripts
        for key in ("condition", "input", "initial"):
            arguments[key].reshape(
                self.section.subscripts, self.def_subs, True)

        name = self.section.namespace.make_python_identifier(
            element.identifier, prefix="_sampleiftrue")
        arguments["name"] = name

        # Declaration of the stateful object in the generated model
        declaration = (
            "%(name)s = SampleIfTrue(lambda: %(condition)s, "
            "lambda: %(input)s, lambda: %(initial)s, "
            "'%(name)s')" % arguments
        )
        element.objects[name] = {"name": name, "expression": declaration}

        # Dependencies of the stateful object itself
        step_deps = merge_dependencies(
            arguments["condition"].calls,
            arguments["input"].calls)
        element.other_dependencies[name] = {
            "initial": arguments["initial"].calls,
            "step": step_deps
        }

        return BuildAST(
            expression=name + "()",
            calls={name: 1},
            subscripts=self.def_subs,
            order=0)
class LookupsBuilder(StructureBuilder):
    """Builder for regular Lookups."""

    def __init__(self, lookups_str: LookupsStructure, component: object):
        super().__init__(None, component)
        self.arguments = {}
        self.x = lookups_str.x
        self.y = lookups_str.y
        self.keyword = lookups_str.type

    def build(self, arguments: dict) -> Union[BuildAST, None]:
        """
        Build method.

        The first time it is called for an element it declares a
        ``HardcodedLookups`` object; if the element already holds one,
        the new values are appended to its declaration with an
        ``.add(...)`` call instead.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST or None
            The built object, unless the component has been added to an
            existing object using the 'add' method.

        """
        self.component.type = "Lookup"
        self.component.subtype = "Normal"

        # Write the numeric values as array literals; the threshold is
        # set to the number of values so that numpy never summarizes
        # them with an ellipsis
        for axis in ("x", "y"):
            values = getattr(self, axis)
            arguments[axis] = np.array2string(
                np.array(values),
                separator=",",
                threshold=len(values)
            )
        arguments["subscripts"] = self.def_subs
        arguments["interp"] = self.keyword

        objects = self.element.objects
        if "hardcoded_lookups" in objects:
            # Object already exists, use 'add' method
            existing = objects["hardcoded_lookups"]
            add_call = ".add(%(x)s, %(y)s, %(subscripts)s)" % arguments
            existing["expression"] += "\n\n" + existing["name"] + add_call
            return None

        # Create a new object
        self.section.imports.add("lookups", "HardcodedLookups")

        name = self.section.namespace.make_python_identifier(
            self.element.identifier, prefix="_hardcodedlookup")
        arguments["name"] = name
        arguments["final_subs"] =\
            self.section.subscripts.simplify_subscript_input(
                self.element.subs_dict)[1]

        declaration = (
            "%(name)s = HardcodedLookups(%(x)s, %(y)s, "
            "%(subscripts)s, '%(interp)s', "
            "%(final_subs)s, '%(name)s')" % arguments
        )
        objects["hardcoded_lookups"] = {
            "name": name,
            "expression": declaration,
            "final_subs": self.element.subs_dict
        }

        return BuildAST(
            expression=name + "(x, final_subs)",
            calls={"__lookup__": name},
            subscripts=self.def_subs,
            order=0)
class InlineLookupsBuilder(StructureBuilder):
    """Builder for inline Lookups."""

    def __init__(self, inlinelookups_str: InlineLookupsStructure,
                 component: object):
        super().__init__(None, component)
        self.arguments = {"value": inlinelookups_str.argument}
        self.lookups = inlinelookups_str.lookups

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Writes the lookup as an ``np.interp`` call over the hardcoded
        x and y values. When the interpolated value has subscripts the
        call is wrapped in an ``xr.DataArray`` with those subscripts.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        self.component.type = "Auxiliary"
        self.component.subtype = "with Lookup"
        self.section.imports.add("numpy")

        # Write the numeric values as array literals; the threshold is
        # set to the number of values so that numpy never summarizes
        # them with an ellipsis
        for axis in ("x", "y"):
            values = getattr(self.lookups, axis)
            arguments[axis] = np.array2string(
                np.array(values),
                separator=",",
                threshold=len(values)
            )

        value = arguments["value"]
        interp = "np.interp(%(value)s, %(x)s, %(y)s)" % arguments

        subs = value.subscripts
        if not subs:
            # not subscripted value, the interpolation is the result
            return BuildAST(
                expression=interp,
                calls=value.calls,
                subscripts={},
                order=0)

        return BuildAST(
            expression="xr.DataArray(%s, %s, %s)" % (
                interp, subs, list(subs)),
            calls=value.calls,
            subscripts=subs,
            order=0)
class ReferenceBuilder(StructureBuilder):
    """Builder for references to other variables."""

    def __init__(self, reference_str: ReferenceStructure, component: object):
        super().__init__(None, component)
        # mapping_subscripts and reference must exist before assigning
        # subscripts, as the property setter reads and fills them
        self.mapping_subscripts = {}
        self.reference = reference_str.reference
        self.subscripts = reference_str.subscripts
        self.arguments = {}

    @property
    def subscripts(self):
        return self._subscripts

    @subscripts.setter
    def subscripts(self, subscripts: SubscriptsReferenceStructure):
        """Get subscript dictionary from reference"""
        ref_subs = getattr(subscripts, "subscripts", [])
        self._subscripts = self.section.subscripts.make_coord_dict(ref_subs)

        if len(ref_subs) != len(self._subscripts):
            # Repeated subscript ranges in the reference cannot be kept
            # in Python, as the subscript ranges information is saved
            # in a dictionary (the key would be duplicated)

            # Recover the original name of the reference for the message
            target = self.reference
            origin_ref = next(
                (original
                 for original in self.section.namespace.namespace.keys()
                 if target == original.lower().replace(" ", "_")),
                target)

            warnings.warn(
                "The reference to '%s' in variable '%s' has duplicated "
                "subscript ranges. If mapping is used in one of them, "
                "please, rewrite reference subscripts to avoid "
                "duplicates. Otherwise, the final model may crash..."
                % (origin_ref, self.element.name)
            )

        # get the subscripts after applying the mapping if necessary
        for dim, coordinates in self._subscripts.items():
            if len(coordinates) <= 1:
                # singular subscripts are ignored, that dimension is
                # removed from the final element
                continue

            if dim in self.def_subs or dim.endswith("!"):
                # the subscript is in the variable definition (or it is
                # marked with "!"), do not change it
                self.mapping_subscripts[dim] = coordinates
                continue

            # the reference has a subscript which does not appear in
            # the definition of the variable, look for a mapped one
            # TODO update this and the parser to make it
            # compatible with more complex mappings
            for mapped in self.section.subscripts.mapping[dim]:
                if mapped in self.def_subs\
                   and mapped not in self._subscripts:
                    # the mapped subscript appears in the definition
                    # and it is not already in the variable
                    self.mapping_subscripts[mapped] =\
                        self.section.subscripts.subscripts[mapped]
                    break
            else:
                # manage other not mapped subscripts, this is necessary
                # for Allocate Available where the expression in the
                # right must be forced to have more subscripts than the
                # expression in the left
                self.mapping_subscripts[dim] = coordinates

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        subs_manager = self.section.subscripts

        if self.reference not in self.section.namespace.cleanspace:
            # Manage references to subscripts (subscripts used as variables)
            expression, subscripts = subs_manager.subscript2num[self.reference]
            subscripts_out = subs_manager.simplify_subscript_input(
                subscripts)[1]
            if subscripts:
                self.section.imports.add("numpy")
                # NUMPY: not need this if
                expression = "xr.DataArray(%s, %s, %s)" % (
                    expression, subscripts_out, list(subscripts))
            return BuildAST(
                expression=expression,
                calls={},
                subscripts=subscripts,
                order=0)

        reference = self.section.namespace.cleanspace[self.reference]
        expression = reference + "()"
        final_subs = {}

        if self.subscripts:
            # subset the referenced variable and/or apply the mapping
            original_subs = subs_manager.make_coord_dict(
                subs_manager.elements[reference])
            expression, final_subs = self._visit_subscripts(
                expression, original_subs)

        return BuildAST(
            expression=expression,
            calls={reference: 1},
            subscripts=final_subs,
            order=0)

    def _visit_subscripts(self, expression: str,
                          original_subs: dict) -> tuple:
        """
        Visit the subscripts of a reference to subset a subarray if
        necessary or apply mapping.

        Parameters
        ----------
        expression: str
            The expression of visiting the variable.
        original_subs: dict
            The original subscript dict of the variable.

        Returns
        -------
        expression: str
            The expression with the necessary operations.
        mapping_subscripts: dict
            The final subscripts of the reference after applying mapping.

        """
        loc, rename, final_subs, reset_coords, to_float =\
            visit_loc(self.subscripts, original_subs)

        if loc is not None:
            # NUMPY: expression += "[%s]" % ", ".join(loc)
            expression = expression + f".loc[{loc}]"

        if to_float:
            # NUMPY: Not necessary
            expression = "float(" + expression + ")"
        elif reset_coords:
            # NUMPY: Not necessary
            expression = expression + ".reset_coords(drop=True)"

        if rename:
            # NUMPY: Not necessary
            expression = expression + ".rename(%s)" % rename

        # NUMPY: This will not be necessary, we only need to return
        # self.mapping_subscripts
        mapping = self.mapping_subscripts
        if mapping != final_subs:
            subscripts_out = self.section.subscripts.simplify_subscript_input(
                mapping)[1]
            expression = "xr.DataArray(%s.values, %s, %s)" % (
                expression, subscripts_out, list(mapping)
            )

        return expression, mapping
class NumericBuilder(StructureBuilder):
    """Builder for numeric and nan values."""

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        NaN values are written as ``np.nan``; any other value is
        written with its ``repr``.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        if np.isnan(self.value):
            # the generated code refers to np, numpy must be imported
            self.section.imports.add("numpy")
            expression = "np.nan"
        else:
            expression = repr(self.value)

        return BuildAST(
            expression=expression,
            calls={},
            subscripts={},
            order=0)
class ArrayBuilder(StructureBuilder):
    """Builder for arrays."""

    def build(self, arguments: dict) -> BuildAST:
        """
        Build method.

        Replaces ``self.value`` by its array literal (a str) and
        returns it wrapped in an ``xr.DataArray`` expression.

        Parameters
        ----------
        arguments: dict
            The dictionary of builded arguments.

        Returns
        -------
        built_ast: BuildAST
            The built object.

        """
        array = self.value
        # the threshold is set to the total number of values so that
        # numpy never summarizes the array with an ellipsis
        self.value = np.array2string(
            array.reshape(compute_shape(self.def_subs)),
            separator=",",
            threshold=np.prod(array.shape)
        )

        self.component.type = "Constant"
        self.component.subtype = "Normal"

        final_subs, subscripts_out =\
            self.section.subscripts.simplify_subscript_input(
                self.def_subs, self.element.subscripts)

        expression = "xr.DataArray(%s, %s, %s)" % (
            self.value, subscripts_out, list(final_subs))

        return BuildAST(
            expression=expression,
            calls={},
            subscripts=final_subs,
            order=0)
def merge_dependencies(*dependencies: dict, inplace: bool = False) -> dict:
    """
    Merge any number of dependencies dicts of an element.

    The dictionaries are merged from left to right into the first one
    (or into a shallow copy of it). The number of calls of a dependency
    that appears in several dictionaries is summed, except for the
    special dependencies (those whose name starts with "__", e.g.
    __lookup__, __external__) where the first value found is kept.

    Parameters
    ----------
    dependencies: dict
        The dictionaries of dependencies to merge. At least one
        dictionary must be given.
    inplace: bool (optional)
        If True the final dependencies dict will be updated in the first
        dependencies argument, mutating it. Default is False.

    Returns
    -------
    current: dict
        The final dependencies dict. New dependencies are appended in
        the order they are found in the merged dictionaries.

    """
    first, *others = dependencies
    current = first if inplace else first.copy()

    for new in others:
        if not current:
            # nothing accumulated yet, take everything from new
            current.update(new)
            continue
        # iterating the dictionary (and not a set of its keys) keeps
        # the order of the final dictionary deterministic
        for dep, n_calls in new.items():
            if dep not in current:
                # dependency is only in new, copy it
                current[dep] = n_calls
            elif not dep.startswith("__"):
                # dependency is in both, sum the number of calls;
                # special dependencies are not numeric and are kept
                current[dep] += n_calls

    return current
def visit_loc(current_subs: dict, original_subs: dict,
              keep_shape: bool = False) -> Union[tuple, str, None]:
    """
    Compares the original subscripts and the current subscripts and
    returns subindexing information if needed.

    Both dictionaries are traversed in parallel, the n-th dimension of
    current_subs is compared with the n-th dimension of original_subs.

    Parameters
    ----------
    current_subs: dict
        The dictionary of the subscripts that are used in the variable.
    original_subs: dict
        The dictionary of the original subscripts of the variable.
    keep_shape: bool (optional)
        If True will keep the number of dimensions of the original
        element and return only loc. Default is False.

    Returns
    -------
    loc: str or None
        The subscripting of each dimension joined by ", ". If all the
        dimensions are full (":"), None is returned which means that
        array indexing is not needed. This is the only value returned
        when keep_shape is True.
    rename: dict
        Dictionary of the dimensions to rename.
    final_subs: dict
        Dictionary of the final subscripts of the variable.
    reset_coords: bool
        Boolean indicating if the coords need to be reset.
    to_float: bool
        Boolean indicating if the variable should be converted to
        a float.

    """
    final_subs, rename, loc = {}, {}, []
    reset_coords, to_float = False, True

    subscripts_zipped = zip(current_subs.items(), original_subs.items())
    for (dim, coord), (orig_dim, orig_coord) in subscripts_zipped:
        if len(coord) == 1:
            # subset a 1 dimension value
            # NUMPY: subset value [:, N, :, :]
            label = repr(coord[0])
            # NUMPY: the brackets are not necessary
            loc.append(f"[{label}]" if keep_shape else label)
            reset_coords = True
            continue

        if len(coord) >= len(orig_coord):
            # full dimension, do nothing
            # NUMPY: same, we can remove float = False
            loc.append(":")
        # subset a subrange
        # NUMPY: subset value [:, :, np.array([1, 0]), :]
        # NUMPY: as order may change we need to check if
        #        dim != orig_dim
        # NUMPY: use also ranges [:, :, 2:5, :] when possible
        elif dim.endswith("!"):
            loc.append("_subscript_dict['%s']" % dim[:-1])
        elif dim != orig_dim:
            loc.append("_subscript_dict['%s']" % dim)
        else:
            # workaround for locs from external objects merge
            loc.append(repr(coord))

        # the dimension is kept in the final element
        final_subs[dim] = coord
        to_float = False

        if dim != orig_dim:
            # NUMPY: check order of dimensions, make all subranges work
            #        with the same dimensions?
            # NUMPY: this could be solved in the previous if/then/else
            rename[orig_dim] = dim

    if all(item == ":" for item in loc):
        # if all are ":" then no need to loc
        loc = None
    else:
        loc = ", ".join(loc)

    if keep_shape:
        return loc

    # convert to float if also coords are reset (input is an array)
    to_float = to_float and reset_coords

    # NUMPY: save and return only loc, the other are not needed
    return loc, rename, final_subs, reset_coords, to_float
class ASTVisitor:
    """
    ASTVisitor allows visiting the Abstract Syntax Tree of a component
    returning the Python object and generating the necessary objects.

    Parameters
    ----------
    component: ComponentBuilder
        The component builder to build.

    """
    # Builder to use for each type of object found in the tree
    _builders = {
        InitialStructure: InitialBuilder,
        IntegStructure: IntegBuilder,
        DelayStructure: lambda x, y: DelayBuilder("Delay", x, y),
        DelayNStructure: lambda x, y: DelayBuilder("DelayN", x, y),
        DelayFixedStructure: DelayFixedBuilder,
        SmoothStructure: SmoothBuilder,
        SmoothNStructure: SmoothBuilder,
        TrendStructure: TrendBuilder,
        ForecastStructure: ForecastBuilder,
        SampleIfTrueStructure: SampleIfTrueBuilder,
        GetConstantsStructure: ExtConstantBuilder,
        GetDataStructure: ExtDataBuilder,
        GetLookupsStructure: ExtLookupBuilder,
        LookupsStructure: LookupsBuilder,
        InlineLookupsStructure: InlineLookupsBuilder,
        DataStructure: TabDataBuilder,
        ReferenceStructure: ReferenceBuilder,
        CallStructure: CallBuilder,
        GameStructure: GameBuilder,
        AllocateAvailableStructure: AllocateAvailableBuilder,
        AllocateByPriorityStructure: AllocateByPriorityBuilder,
        LogicStructure: OperationBuilder,
        ArithmeticStructure: OperationBuilder,
        int: NumericBuilder,
        float: NumericBuilder,
        np.ndarray: ArrayBuilder,
    }

    def __init__(self, component: object):
        # component typing should be ComponentBuilder, but importing it
        # for typing would create a circular dependency :S
        self.ast = component.ast
        self.subscripts = component.subscripts_dict
        self.component = component

    def visit(self) -> Union[None, BuildAST]:
        """
        Visit the Abstract Syntax Tree of the component.

        Returns
        -------
        visit_out: BuildAST or None
            The BuildAST object resulting from visiting the AST. If the
            component content has been added to an existing object
            using the 'add' method it will return None.

        """
        visit_out = self._visit(self.ast)

        if not visit_out:
            # external objects that are declared with other expression
            return None

        if not visit_out.calls and self.component.type == "Auxiliary":
            # an auxiliary without calls to other variables is constant
            self.component.type = "Constant"
            self.component.subtype = "Normal"

        # include dependencies of the current component in the element
        merge_dependencies(
            self.component.element.dependencies,
            visit_out.calls,
            inplace=True)

        if not visit_out.subscripts:
            # expression is a float
            return visit_out

        # NUMPY not needed
        # get subscript in elements as name of the ranges may change
        subscripts_in_element = dict(
            zip(self.component.element.subscripts, self.subscripts.values()))

        def differs(target: dict) -> bool:
            # dictionaries compare equal regardless of the order of
            # their keys, so the order of the dimensions is checked too
            return (visit_out.subscripts != target
                    or list(visit_out.subscripts) != list(target))

        if differs(self.subscripts) and differs(subscripts_in_element):
            # NUMPY: in this case we need to tile along dims if necessary
            #        or reorder the dimensions
            visit_out.reshape(
                self.component.section.subscripts, self.subscripts, True)

        return visit_out

    def _visit(self, ast_object: AbstractSyntax) -> Union[None, BuildAST]:
        """
        Visit one Builder and its arguments.

        The arguments declared by the builder are visited first and
        their results are passed to the build method of the builder.
        """
        builder = self._builders[type(ast_object)](ast_object, self.component)
        arguments = {
            name: self._visit(value)
            for name, value in builder.arguments.items()
        }
        return builder.build(arguments)