"""
The ModelBuilder class converts an AbstractModel into a PySD model,
writing the Python code to files that can later be loaded with the PySD
Model class. Each level of the abstract model has its own builder.
However, the user only needs to create a ModelBuilder object from the
AbstractModel and call its `build_model` method.
"""
from warnings import warn
import textwrap
import black
import json
from pathlib import Path
from typing import Union
from pysd.translators.structures.abstract_model import\
AbstractComponent, AbstractElement, AbstractControlElement,\
AbstractModel, AbstractSection
from . import python_expressions_builder as vs
from .namespace import NamespaceManager
from .subscripts import SubscriptManager
from .imports import ImportsManager
from pysd._version import __version__
class ModelBuilder:
    """
    Build a PySD Python model from an Abstract Model.

    One SectionBuilder is created per section of the abstract model.
    The first section is the main one (its path is what `build_model`
    returns); every other section is registered by name in the
    macrospace.

    Parameters
    ----------
    abstract_model: AbstractModel
        The abstract model to build.

    """

    def __init__(self, abstract_model: "AbstractModel"):
        # Take over a shallow copy of the abstract model's attributes
        self.__dict__ = abstract_model.__dict__.copy()

        # Wrap every abstract section in its own builder
        section_builders = []
        for abstract_section in abstract_model.sections:
            section_builders.append(SectionBuilder(abstract_section))
        self.sections = section_builders

        # Macrospace (namespace of macros): all the sections but the first
        macrospace = {}
        for macro_builder in section_builders[1:]:
            macrospace[macro_builder.name] = macro_builder
        self.macrospace = macrospace

    def build_model(self) -> Path:
        """
        Build every section of the model, writing the Python files.

        Each section receives the macrospace before being built.

        Returns
        -------
        path: pathlib.Path
            The path of the first (main) section, i.e. the path to
            the new PySD model.

        """
        for builder in self.sections:
            # The section needs the macrospace information to be built
            builder.macrospace = self.macrospace
            builder.build_section()

        main_section = self.sections[0]
        return main_section.path
class SectionBuilder:
    """
    SectionBuilder allows building a section of the PySD model. Each
    section will be a file unless the model has been set to be split
    in modules.

    Parameters
    ----------
    abstract_section: AbstractSection
        The abstract section to build.

    """

    def __init__(self, abstract_section: "AbstractSection"):
        # Take over a shallow copy of the abstract section's attributes
        # (path, params, split, views_dict... are read from it later on)
        self.__dict__ = abstract_section.__dict__.copy()
        self.root = self.path.parent  # the folder where the model is
        self.model_name = self.path.with_suffix("").name  # name of the model
        # Create subscript manager object with subscripts_dict
        self.subscripts = SubscriptManager(
            abstract_section.subscripts, self.root)
        # Load the elements in the section (they read self.subscripts)
        self.elements = [
            ElementBuilder(element, self)
            for element in abstract_section.elements
        ]
        # Create the namespace of the section
        self.namespace = NamespaceManager(self.params)
        # Create an imports manager
        self.imports = ImportsManager()
        # Create macrospace (namespace of macros)
        self.macrospace = {}
        # Create parameters dict necessary in macros
        self.params = {
            key: self.namespace.namespace[key]
            for key in self.params
        }
        # Import xarray if there are any subscripts defined in the section
        if self.subscripts.subscripts:
            self.imports.add("xarray")

    def build_section(self) -> None:
        """
        Build the Python section in a file called as the original model
        if the section is main or in a file called as the macro name
        if the section is a macro.
        """
        # First iteration over elements to recover their information
        for element in self.elements:
            # Add element to namespace
            self.namespace.add_to_namespace(element.name)
            identifier = self.namespace.namespace[element.name]
            element.identifier = identifier
            # Add element subscripts information to the subscript manager
            self.subscripts.elements[identifier] = element.subscripts

        # Build elements (only once all the identifiers are registered)
        for element in self.elements:
            element.build_element()

        if self.split:
            # Build modular section
            self._build_modular(self.views_dict)
        else:
            # Build one-file section
            self._build()

    @staticmethod
    def _write_formatted(text: str, outfile: Path) -> None:
        """Format `text` with black and write it to `outfile` as UTF-8."""
        text = black.format_file_contents(
            text, fast=True, mode=black.FileMode())

        with outfile.open("w", encoding="UTF-8") as out:
            out.write(text)

    def _process_views_tree(self, view_name: Path,
                            view_content: Union[dict, set],
                            wdir: Path) -> Union[dict, list]:
        """
        Creates a directory tree based on the elements_per_view dictionary.
        If it's the final view, it creates a file, if not, it creates a
        folder.

        Parameters
        ----------
        view_name: pathlib.Path
            Relative path of the view. Its first part is the modules
            folder, which is left out of the dotted view path used in
            the warnings and in `elements_added`.
        view_content: dict or set
            A set of original variable names for a final view, or a
            dictionary {subview name: subview content} otherwise.
        wdir: pathlib.Path
            The folder the view paths are relative to.

        Returns
        -------
        list or dict
            For a final view, the identifiers of the elements written in
            its module. Otherwise the dictionary of non-empty subviews.

        """
        if not isinstance(view_content, set):
            # The current view has subviews
            (wdir / view_name).mkdir(exist_ok=True)
            subviews = {
                subview_name: self._process_views_tree(
                    view_name / subview_name, subview_content, wdir)
                for subview_name, subview_content in view_content.items()
            }
            # Avoid including empty views in the dictionary
            return {
                subview_name: subview_content
                for subview_name, subview_content in subviews.items()
                if subview_content
            }

        # Final view, it will become a module
        # Convert subview elements names to Python names
        view_content = {
            self.namespace.cleanspace[var] for var in view_content
        }
        # Get subview elements (ordered)
        subview_elems = sorted(
            (
                element for element in self.elements_remaining
                if element.identifier in view_content
                and not element.control_var
            ),
            key=lambda x: x.identifier)
        # Get the names of the elements and include their
        # information in the elements_added dictionary
        subview_elems_names = [
            element.identifier for element in subview_elems
        ]
        view_path = ".".join(view_name.parts[1:])
        self.elements_added.update({
            var: view_path for var in subview_elems_names
        })

        if len(view_content) != len(subview_elems_names):
            # Some elements from the view were not added. They are
            # sorted so the warnings come in a reproducible order.
            for var in sorted(view_content.difference(subview_elems_names)):
                original_name = self.namespace.get_original_name(var)
                if var in self.elements_added:
                    # Element already added in another view
                    warn(
                        f"Variable '{original_name}' is declared as "
                        f"a workbench variable in '{view_path}' but "
                        "it has been already added in "
                        f"'{self.elements_added[var]}'."
                    )
                else:
                    # Element is a control variable
                    warn(
                        f"Control variable '{original_name}' is "
                        "declared as a workbench variable in "
                        f"'{view_path}'. As it is a control "
                        "variable, this declaration will be ignored "
                        "and added to the main module only."
                    )

        # Remove elements from remaining ones
        for element in subview_elems:
            self.elements_remaining.remove(element)

        if subview_elems:
            # Build the module (only when there are variables)
            self._build_separate_module(subview_elems, view_name, wdir)

        return subview_elems_names

    def _build_modular(self, elements_per_view: dict) -> None:
        """
        Build modular section: one module per final view, the main file
        with the remaining elements, and the .json files with the
        modules tree and the subscripts.

        Parameters
        ----------
        elements_per_view: dict
            Views tree, passed as it is to `_process_views_tree`.

        """
        self.elements_remaining = self.elements.copy()
        self.elements_added = {}
        modules_dir = "modules_" + self.model_name
        elements_per_view = self._process_views_tree(
            Path(modules_dir), elements_per_view, self.root)

        for element in self.elements_remaining:
            if not element.control_var:
                warn(
                    f"Variable '{element.name}' is not declared as a "
                    "workbench variable in any view. It will be added to "
                    "the main module."
                )

        # Building main file using the build function
        self._build_main_module(self.elements_remaining)

        # Build subscripts dir and modules .json files. ".json" is
        # appended to the full model name: with_suffix would drop
        # anything after a dot in the model name, while the modules
        # folder and the generated load_model_data call keep it whole.
        json_files = (
            (self.root / modules_dir / "_modules.json", elements_per_view),
            (self.root / f"_subscripts_{self.model_name}.json",
             self.subscripts.subscripts),
        )
        for json_path, values in json_files:
            with json_path.open("w", encoding="UTF-8") as outfile:
                json.dump(values, outfile, indent=4, sort_keys=True)

    def _build_separate_module(self, elements: list, module_name: Path,
                               module_dir: Path) -> None:
        """
        Constructs and writes the Python representation of a specific model
        module, when the split_views=True in the read_vensim function.

        Parameters
        ----------
        elements: list
            Elements belonging to the module module_name.
        module_name: pathlib.Path
            Relative path of the module without suffix. The first part
            (the modules folder) is left out of the module's header.
        module_dir: pathlib.Path
            Path of the directory module_name is relative to.

        Returns
        -------
        None

        """
        text = textwrap.dedent('''
        """
        Module %(module_name)s
        Translated using PySD version %(version)s
        """
        ''' % {
            "module_name": ".".join(module_name.parts[1:]),
            "version": __version__,
        })
        text += self._generate_functions(elements)

        self._write_formatted(
            text, module_dir / module_name.with_suffix(".py"))

    def _build_main_module(self, elements: list) -> None:
        """
        Constructs and writes the Python representation of the main model
        module, when the split_views=True in the read_vensim function.

        Parameters
        ----------
        elements: list
            Elements belonging to the main module. Ideally, there should
            only be the initial_time, final_time, saveper and time_step
            functions, though there might be others in some situations.

        Returns
        -------
        None

        """
        # separating between control variables and rest of variables
        # (done before asking for the header, as building the functions
        # may add new imports)
        control_vars, funcs = self._build_variables(elements)

        self.imports.add("utils", "load_model_data")
        self.imports.add("utils", "load_modules")

        # import of needed functions and packages
        text = self.imports.get_header(self.path.name)

        # import subscript dict from json file
        # NOTE: the params line is inserted before dedenting, so it must
        # carry the same indentation as the template
        text += textwrap.dedent("""
        __pysd_version__ = '%(version)s'

        __data = {
            'scope': None,
            'time': lambda: 0
        }

        _root = Path(__file__).parent
        %(params)s
        _subscript_dict, _modules = load_model_data(
            _root, "%(model_name)s")

        component = Component()
        """ % {
            "params": f"\n        _params = {self.params}\n"
                      if self.params else "",
            "model_name": self.model_name,
            "version": __version__
        })

        text += self._get_control_vars(control_vars)

        text += textwrap.dedent("""
            # load modules from modules_%(model_name)s directory
            exec(load_modules("modules_%(model_name)s", _modules, _root, []))

            """ % {
                "model_name": self.model_name,
            })

        text += funcs

        self._write_formatted(text, self.path)

    def _build(self) -> None:
        """
        Constructs and writes the Python representation of a section.

        Returns
        -------
        None

        """
        # Done before asking for the header, as building the functions
        # may add new imports
        control_vars, funcs = self._build_variables(self.elements)

        text = self.imports.get_header(self.path.name)

        # Same indentation as the template below, the values are
        # inserted before dedenting
        indent = "\n        "
        # Generate params dict for macro parameters
        params = f"{indent}_params = {self.params}\n"\
            if self.params else ""
        # Generate subscripts dir
        subs = f"{indent}_subscript_dict = {self.subscripts.subscripts}"\
            if self.subscripts.subscripts else ""

        text += textwrap.dedent("""
        __pysd_version__ = '%(version)s'

        __data = {
            'scope': None,
            'time': lambda: 0
        }

        _root = Path(__file__).parent
        %(params)s
        %(subscript_dict)s

        component = Component()
        """ % {
            "subscript_dict": subs,
            "params": params,
            "version": __version__,
        })

        text += self._get_control_vars(control_vars) + funcs

        self._write_formatted(text, self.path)

    def _build_variables(self, elements: list) -> tuple:
        """
        Build model variables (functions) and separate them in control
        variables and regular variables.

        The expression of each control variable is replaced by a call
        to the corresponding method of the time object, and its original
        expression is moved to the `_control_vars` dictionary.

        Parameters
        ----------
        elements: list
            The elements to build.

        Returns
        -------
        control_vars, regular_vars: tuple, str
            control_vars is a tuple of length 2. First element is the
            string defining the dictionary of original control vars
            (empty if there are no control variables). Second is the
            string to add the control variables' functions. regular_vars
            is the string to add the regular variables' functions.

        """
        # returns of the control variables
        control_vars_dict = {
            "initial_time": "__data['time'].initial_time()",
            "final_time": "__data['time'].final_time()",
            "time_step": "__data['time'].time_step()",
            "saveper": "__data['time'].saveper()"
        }
        regular_vars = []
        control_vars = []

        for element in elements:
            if element.identifier not in control_vars_dict:
                regular_vars.append(element)
                continue
            # change the return expression in the element and update
            # the dict with the original expression
            original_expression = element.expression
            element.expression = control_vars_dict[element.identifier]
            control_vars_dict[element.identifier] = original_expression
            control_vars.append(element)

        if not control_vars:
            # macro objects, no control variables
            control_vars_dict = ""
        else:
            # NOTE: indented as the template of _get_control_vars, where
            # this text is inserted before dedenting
            control_vars_dict = """
        _control_vars = {
            "initial_time": lambda: %(initial_time)s,
            "final_time": lambda: %(final_time)s,
            "time_step": lambda: %(time_step)s,
            "saveper": lambda: %(saveper)s
        }
        """ % control_vars_dict

        return (control_vars_dict,
                self._generate_functions(control_vars)),\
            self._generate_functions(regular_vars)

    def _generate_functions(self, elements: list) -> str:
        """
        Builds all model elements as functions in string format.
        NOTE: this function calls the _build_element_out method of the
        elements, which may add new imports to the section. Therefore,
        it needs to be executed before getting the header of the file.

        Parameters
        ----------
        elements: list
            The already built elements to write.

        Returns
        -------
        funcs: str
            String containing all formatted model functions

        """
        return "\n".join(
            [element._build_element_out() for element in elements]
        )

    def _get_control_vars(self, control_vars: tuple) -> str:
        """
        Create the section of control variables

        Parameters
        ----------
        control_vars: tuple
            First element of the tuple returned by _build_variables:
            the string defining the `_control_vars` dictionary and the
            string with the functions of the control variables.

        Returns
        -------
        text: str
            Control variables section and header of model variables section.

        """
        # control_vars[0] is inserted before dedenting, it must have
        # the same indentation as this template
        text = textwrap.dedent("""
        #######################################################################
        #                          CONTROL VARIABLES                          #
        #######################################################################
        %(control_vars_dict)s
        def _init_outer_references(data):
            for key in data:
                __data[key] = data[key]


        @component.add(name="Time")
        def time():
            '''
            Current time of the model.
            '''
            return __data['time']()

        """ % {"control_vars_dict": control_vars[0]})

        text += control_vars[1]

        text += textwrap.dedent("""
        #######################################################################
        #                           MODEL VARIABLES                           #
        #######################################################################
        """)

        return text
class ElementBuilder:
    """
    ElementBuilder allows building an element of the PySD model.

    Parameters
    ----------
    abstract_element: AbstractElement
        The abstract element to build.
    section: SectionBuilder
        The section where the element is defined. Necessary to give the
        access to the subscripts and namespace.

    """

    def __init__(self, abstract_element: "AbstractElement",
                 section: "SectionBuilder"):
        # Take over a shallow copy of the abstract element's attributes
        self.__dict__ = abstract_element.__dict__.copy()
        self.control_var = isinstance(abstract_element, AbstractControlElement)
        # Set element type and subtype to None
        self.type = None
        self.subtype = None
        # Get the arguments of the element (from the first abstract
        # component, before the components are replaced by builders)
        self.arguments = getattr(self.components[0], "arguments", "")
        # Load the components of the element
        self.components = [
            ComponentBuilder(component, self, section)
            for component in abstract_element.components
        ]
        self.section = section
        # Get the subscripts of the element after merging all the components
        self.subscripts = section.subscripts.make_merge_list(
            [component.subscripts[0] for component in self.components])
        # Get the subscript dictionary of the element
        self.subs_dict = section.subscripts.make_coord_dict(self.subscripts)
        # Dictionaries to save dependencies and objects related to the element
        self.dependencies = {}
        self.other_dependencies = {}
        self.objects = {}

    def build_element(self) -> None:
        """
        Build the element. Nothing is returned: the results are saved
        in the `pre_expression`, `expression`, `type` and `subtype`
        attributes, which `_build_element_out` uses to write the
        decorated function definition.
        """
        # TODO think better how to build the components at once to build
        # in one declaration the external objects
        # TODO include some kind of magic vectorization to identify patterns
        # that can be easily vecorized (GET, expressions, Stocks...)

        # Build the components of the element
        for component in self.components:
            component.build_component()

        expressions = []
        for component in self.components:
            expr, subs, except_subscripts = component.get()
            if expr is None:
                # The expr is None when the component has been "added"
                # to an existing object using the add method
                continue
            if isinstance(subs, list):
                # Get the list of locs for the component
                # Subscripts dict will be a list when the component is
                # translated to an object that groups many components
                # via 'add' method.
                loc = [vs.visit_loc(subsi, self.subs_dict, True)
                       for subsi in subs]
            else:
                # Get the loc of the component
                loc = vs.visit_loc(subs, self.subs_dict, True)

            # Get the locs of the :EXCLUDE: parameters if any
            exc_loc = [
                vs.visit_loc(subs_e, self.subs_dict, True)
                for subs_e in except_subscripts
            ]
            expressions.append({
                "expr": expr,
                "subs": subs,
                "loc": loc,
                "loc_except": exc_loc
            })

        if len(expressions) > 1:
            # NUMPY: xrmerge would be sustitute by a multiple line definition
            # e.g.:
            #     value = np.empty((len(dim1), len(dim2)))
            #     value[:, 0] = expression1
            #     value[:, 1] = expression2
            #     return value
            # This allows reference to the same variable
            #    from: VAR[A] = 5; VAR[B] = 2*VAR[A]
            #    to: value[0] = 5; value[1] = 2*value[0]
            self.section.imports.add("numpy")
            coords = ", ".join(
                "'%(dim)s': _subscript_dict['%(dim)s']" % {"dim": dim}
                for dim in self.subscripts)
            # Operations to compute in the body of the function
            body = ["value = xr.DataArray(np.nan, {%s}, %s)\n" % (
                coords, self.subscripts)]
            for expression in expressions:
                if expression["expr"].subscripts:
                    # Get the values
                    # NUMPY not necessary
                    expression["expr"].lower_order(-1)
                    expression["expr"].expression += ".values"
                if expression["loc_except"]:
                    # There is an except in the definition of the component
                    body.append(self._manage_except(expression))
                elif isinstance(expression["subs"], list):
                    # There are mixed definitions which include
                    # multicomponent object
                    body.append(self._manage_multi_def(expression))
                else:
                    # Regular loc for a component
                    body.append(
                        "value.loc[%(loc)s] = %(expr)s\n" % expression)

            self.pre_expression = "".join(body)
            # Return value
            self.expression = "value"
        else:
            self.pre_expression = ""
            # NUMPY: reshape to the final shape if needed)
            # expressions[0]["expr"].reshape(self.section.subscripts, {})
            if not expressions[0]["expr"].subscripts and self.subscripts:
                # Updimension the return value to an array
                self.expression = "xr.DataArray(%s, %s, %s)\n" % (
                    expressions[0]["expr"],
                    self.section.subscripts.simplify_subscript_input(
                        self.subs_dict)[1],
                    list(self.subs_dict)
                )
            else:
                # Return the expression
                self.expression = expressions[0]["expr"]

        # Merge the types of the components (well defined element should
        # have only one type and subtype). Sorted so the result does not
        # depend on the iteration order of the set.
        self.type = ", ".join(
            sorted({component.type for component in self.components})
        )
        self.subtype = ", ".join(
            sorted({component.subtype for component in self.components})
        )

        # Warn once at most: mixed types take priority over mixed subtypes
        for kind, merged in (("types", self.type),
                             ("subtypes", self.subtype)):
            if ", " in merged:
                warn(
                    f"Variable '{self.name}' is defined with different "
                    f"{kind}: '{merged}'. This may cause bugs when trying "
                    "to change its value or applying other methods from the "
                    "pysd.py_backend.model.Model class. Running the model "
                    "without modifying this variable should not cause any "
                    "bug."
                )
                break

    def _manage_multi_def(self, expression: dict) -> str:
        """
        Manage multiline definitions when some of them (not all) are
        merged to one object.

        Parameters
        ----------
        expression: dict
            Its "loc" value is the list of locs of the merged object
            and "expr" the expression returning the object values.

        Returns
        -------
        str
            The lines assigning to `value` the coordinates in the locs.

        """
        lines = ["def_subs = xr.zeros_like(value, dtype=bool)\n"]
        # coordinates of the object
        lines.extend(
            f"def_subs.loc[{loc}] = True\n" for loc in expression["loc"])
        # replace the values matching the coordinates
        lines.append(
            "value.values[def_subs.values] = "
            "%(expr)s[def_subs.values]\n" % expression)
        return "".join(lines)

    def _manage_except(self, expression: dict) -> str:
        """
        Manage except declarations by not assigning its values.

        Parameters
        ----------
        expression: dict
            Dictionary with the "expr", "subs", "loc" and "loc_except"
            of the component, as created in `build_element`.

        Returns
        -------
        str
            The lines assigning to `value` the coordinates of the
            component that are not in any of the except locs.

        """
        same_subs = expression["subs"] == self.subs_dict
        if same_subs:
            # Final subscripts are the same as the main subscripts
            # of the component. Generate a True array like value
            final_expr = "except_subs = xr.ones_like(value, dtype=bool)\n"
        else:
            # Final subscripts are greater than the main subscripts
            # of the component. Generate a False array like value and
            # set to True the subarray of the component coordinates
            final_expr = "except_subs = xr.zeros_like(value, dtype=bool)\n"\
                         "except_subs.loc[%(loc)s] = True\n" % expression

        for except_subs in expression["loc_except"]:
            # We set to False the dimensions in the EXCEPT
            final_expr += "except_subs.loc[%s] = False\n" % except_subs

        if not expression["expr"].subscripts:
            # Assign the values of a float
            assignment = "%(expr)s\n"
        elif same_subs:
            # Assign the values of an array with the final shape
            assignment = "%(expr)s[except_subs.values]\n"
        else:
            # Assign the values of an array covering only the component
            assignment = "%(expr)s[except_subs.loc[%(loc)s].values]\n"

        return final_expr + "value.values[except_subs.values] = "\
            + assignment % expression

    def _build_element_out(self) -> str:
        """
        Returns the string with the decorated function definition of
        the element followed by the declarations of its objects.

        NOTE: several attributes are overwritten with their formatted
        version (name, units, limits, arguments, objects), so this
        method is meant to be called only once per element.

        Returns
        -------
        func: str
            The function to write in the model file.

        """
        # Contents of the function (body + return)
        contents = self.pre_expression + "return %s" % self.expression

        # Get the objects to create as string
        objects = "\n\n".join([
            value["expression"] for value in self.objects.values()
            if value["expression"] is not None
        ])

        # Format the limits to get them as a string
        self.limits = self._format_limits(self.limits)

        # Update arguments with final subs to allow passing arguments
        # with subscripts to the lookups
        if self.arguments == 'x':
            self.arguments = 'x, final_subs=None'

        # Define variable metadata for the @component decorator
        self.name = repr(self.name)
        meta_data = ["name=%(name)s"]

        # Include basic metadata (units, limits, dimensions)
        if self.units:
            meta_data.append("units=%(units)s")
            self.units = repr(self.units)
        if self.limits:
            meta_data.append("limits=%(limits)s")
        if self.subscripts:
            self.section.imports.add("subs")
            meta_data.append("subscripts=%(subscripts)s")

        # Include component type and subtype
        meta_data.append("comp_type='%(type)s'")
        meta_data.append("comp_subtype='%(subtype)s'")

        # Include dependencies
        if self.dependencies:
            meta_data.append("depends_on=%(dependencies)s")
        if self.other_dependencies:
            meta_data.append("other_deps=%(other_dependencies)s")

        # Get metadata decorator
        self.meta_data = f"@component.add({', '.join(meta_data)})"\
            % self.__dict__

        # Clean the documentation and add it to the beginning of contents
        if self.documentation:
            doc = self.documentation.replace("\\", "\n")
            contents = f'"""\n{doc}\n"""\n'\
                + contents

        # Indentation of the template below. The values are inserted
        # before dedenting, so their continuation lines need it too.
        indent = 12

        # Convert newline indicator and add expected level of indentation
        self.contents = contents.replace("\n", "\n" + " " * (indent+4))
        self.objects = objects.replace("\n", "\n" + " " * indent)

        # Return the decorated function definition with the object declarations
        return textwrap.dedent('''
            %(meta_data)s
            def %(identifier)s(%(arguments)s):
                %(contents)s


            %(objects)s
            ''' % self.__dict__)

    def _format_limits(self, limits: tuple) -> Union[str, None]:
        """
        Format the limits of an element to print them properly

        Parameters
        ----------
        limits: tuple
            The (lower, upper) limits of the element.

        Returns
        -------
        str or None
            The limits as a string, using np.nan and np.inf when needed
            (the numpy import is added to the section). None if no
            limit is defined.

        """
        if limits == (None, None):
            return None

        new_limits = []
        for value in limits:
            value = repr(value)
            if value in ("nan", "None"):
                # add numpy.nan to the values
                self.section.imports.add("numpy")
                new_limits.append("np.nan")
            elif value.endswith("inf"):
                # add numpy.inf to the values, keeping what precedes
                # 'inf' (the sign)
                self.section.imports.add("numpy")
                new_limits.append(value[:-len("inf")] + "np.inf")
            else:
                # add numeric value
                new_limits.append(value)

        if new_limits[0] == "np.nan" and new_limits[1] == "np.nan":
            # if both are numpy.nan do not include limits
            return None

        return "(" + ", ".join(new_limits) + ")"
class ComponentBuilder:
    """
    ComponentBuilder allows building a component of the PySD model.

    Parameters
    ----------
    abstract_component: AbstractComponent
        The abstract component to build.
    element: ElementBuilder
        The element where the component is defined. Necessary to give the
        access to the merging subscripts and other components.
    section: SectionBuilder
        The section where the element is defined. Necessary to give the
        access to the subscripts and namespace.

    """

    def __init__(self, abstract_component: "AbstractComponent",
                 element: "ElementBuilder", section: "SectionBuilder"):
        # Take over a shallow copy of the abstract component's attributes
        self.__dict__ = abstract_component.__dict__.copy()
        self.element, self.section = element, section
        # Make sure the keyword attribute always exists
        if not hasattr(self, "keyword"):
            self.keyword = None

    def build_component(self) -> None:
        """
        Build model component parsing the Abstract Syntax Tree.

        The coordinates dictionary of the component's subscripts
        (`self.subscripts[0]`), those of its except subscripts
        (`self.subscripts[1]`) and the result of visiting the component
        are saved to be returned later by `get`.
        """
        self.subscripts_dict = self.section.subscripts.make_coord_dict(
            self.subscripts[0])

        except_dicts = []
        for except_list in self.subscripts[1]:
            except_dicts.append(
                self.section.subscripts.make_coord_dict(except_list))
        self.except_subscripts = except_dicts

        visitor = vs.ASTVisitor(self)
        self.ast_build = visitor.visit()

    def get(self) -> tuple:
        """
        Get build component to build the element.

        Returns
        -------
        ast_build: BuildAST
            Parsed AbstractSyntaxTree.
        subscript_dict: dict or list of dicts
            The subscripts of the component.
        except_subscripts: list of dicts
            The subscripts to avoid.

        """
        return (self.ast_build, self.subscripts_dict,
                self.except_subscripts)