# -*- coding: utf-8
"""Module for helper functions used by several other modules.
This file is part of project TESPy (github.com/oemof/tespy). It's copyrighted
by the contributors recorded in the version control history of the file,
available from its original location tespy/tools/helpers.py
SPDX-License-Identifier: MIT
"""
import json
import os
from collections.abc import Iterable
from collections.abc import Mapping
from copy import deepcopy
import pandas as pd
from tespy import __datapath__
from tespy.tools import logger
from tespy.tools.data_containers import ComponentProperties as dc_cp
from tespy.tools.data_containers import ScalarVariable as dc_scavar
from tespy.tools.data_containers import VectorVariable as dc_vecvar
from tespy.tools.global_vars import ERR
from tespy.tools.global_vars import FLUID_ALIASES
from tespy.tools.global_vars import fluid_property_data
[docs]
def get_all_subdictionaries(data):
subdictionaries = []
for value in data.values():
if len(value["subbranches"]) == 0:
subdictionaries.append(
{k: v for k, v in value.items() if k != "subbranches"}
)
else:
subdictionaries.append(
{k: v for k, v in value.items() if k != "subbranches"}
)
subdictionaries.extend(get_all_subdictionaries(value["subbranches"]))
return subdictionaries
[docs]
def get_chem_ex_lib(name):
"""Return a new dictionary by merging two dictionaries recursively."""
path = os.path.join(__datapath__, "ChemEx", f"{name}.json")
with open(path, "r") as f:
return json.load(f)
[docs]
def fluidalias_in_list(fluid, fluid_list):
aliases = FLUID_ALIASES.get_fluid(fluid)
return set(fluid_list) & aliases
[docs]
def merge_dicts(dict1, dict2):
"""Return a new dictionary by merging two dictionaries recursively."""
result = deepcopy(dict1)
for key, value in dict2.items():
if isinstance(value, Mapping):
result[key] = merge_dicts(result.get(key, {}), value)
else:
result[key] = deepcopy(dict2[key])
return result
[docs]
class TESPyNetworkError(Exception):
"""Custom message for network related errors."""
pass
[docs]
class TESPyConnectionError(Exception):
"""Custom message for connection related errors."""
pass
[docs]
class TESPyComponentError(Exception):
"""Custom message for component related errors."""
pass
[docs]
def convert_to_SI(property, value, unit):
r"""
Convert a value to its SI value.
Parameters
----------
property : str
Fluid property to convert.
value : float
Value to convert.
unit : str
Unit of the value.
Returns
-------
SI_value : float
Specified fluid property in SI value.
"""
if property == 'T':
converters = fluid_property_data['T']['units'][unit]
return (value + converters[0]) * converters[1]
else:
return value * fluid_property_data[property]['units'][unit]
[docs]
def convert_from_SI(property, SI_value, unit):
r"""
Get a value in the network's unit system from SI value.
Parameters
----------
property : str
Fluid property to convert.
SI_value : float
SI value to convert.
unit : str
Unit of the value.
Returns
-------
value : float
Specified fluid property value in network's unit system.
"""
if property == 'T':
converters = fluid_property_data['T']['units'][unit]
return SI_value / converters[1] - converters[0]
else:
return SI_value / fluid_property_data[property]['units'][unit]
[docs]
class UserDefinedEquation:
def __init__(self, label: str, func: callable, dependents:callable, deriv: callable=None, conns: list=[], comps:list=[], params:dict={}):
r"""
A UserDefinedEquation allows use of generic user specified equations.
Parameters
----------
label : str
Label of the user defined function
func : function
Method returning residual value of equation to evaluate
dependents : function
Function, that returns the variables the equation depends on
deriv : function, optional
Method calculating partial derivatives of the equation, be default
None
conns : list
List of connections used
comps : list
List of components used
params : dict
Dictionary containing keyword arguments required by the function
and/or derivative.
Note
----
The derivative function specification is optional.
- In case you provide the dependents and no derivative, the equation
will numerically derived to all of the specified dependents.
- In case you provide the dependents and the derivative, the derivative
method will be used to calculate the partial derivatives.
Example
-------
Consider a pipeline transporting hot water with measurement data on
temperature reduction in the pipeline as function of volumetric flow.
First, we set up the TESPy model. Additionally, we will import the
:py:class:`tespy.tools.helpers.UserDefinedEquation` class as well as
some fluid property functions. We specify fluid property information
for the inflow and assume that no pressure losses occur in the
pipeline.
>>> from tespy.components import Source, Sink, Pipe
>>> from tespy.networks import Network
>>> from tespy.connections import Connection
>>> from tespy.tools import UserDefinedEquation
>>> from tespy.tools import CharLine
>>> from tespy.tools.fluid_properties import T_mix_ph, v_mix_ph
>>> nw = Network()
>>> nw.set_attr(iterinfo=False)
>>> nw.units.set_defaults(**{"pressure": "bar", "temperature": "degC"})
>>> so = Source('source')
>>> si = Sink('sink')
>>> pipeline = Pipe('pipeline')
>>> inflow = Connection(so, 'out1', pipeline, 'in1')
>>> outflow = Connection(pipeline, 'out1', si, 'in1')
>>> nw.add_conns(inflow, outflow)
>>> inflow.set_attr(T=120, p=10, v=1, fluid={'water': 1})
>>> pipeline.set_attr(pr=1)
Let's assume, the temperature reduction is measured from inflow and
outflow temperature. The mathematical description of the relation
we want the model to follow therefore is:
.. math::
0 = T_{in} - T_{out} + f \left( \dot{m}_{in} \cdot v_{in} \right)
We can define a function, describing exactly that relation using a
:py:class:`tespy.tools.characteristics.CharLine` object with volumetric
flow as input values and temperature drop as output values. The
function should look like this:
>>> def myfunc(ude):
... char = ude.params['char']
... return (
... ude.conns[0].calc_T() - ude.conns[1].calc_T()
... - char.evaluate(
... ude.conns[0].m.val_SI *
... ude.conns[0].calc_vol()
... )
... )
The function does only take one parameter, we name it :code:`ude` in
this case. This parameter will hold all relevant information you pass
to your UserDefinedEquation later, i.e. a list of the connections
(:code:`.conns`) required by the UserDefinedEquation as well as
a dictionary of arbitrary parameters required for your function
(:code:`.params`). The index of the :code:`.conns` indicates the
position of the connection in the list of connections required for the
UserDefinedEquation (see below).
On top of the equation the solver requires its derivatives with respect
to all relevant primary variables of the network, which are mass flow
pressure, enthalpy and fluid composition, potentially also component
variables. In this case, the derivatives to the mass flow, pressure and
enthalpy of the inflow as well as the derivatives to the pressure and
enthalpy of the outflow will be required.
We can now apply two different approaches:
- Either we specify the dependents and let the solver automatically
derive the method, or
- we specify the derivative function directly ourselves.
In the first example, we will only specify the dependents. With that,
we only need to specify the characteristic line we want our temperature
drop to follow as well as create the UserDefinedEquation instance and
add it to the network. We apply extrapolation for the characteristic
line as it helps with convergence.
>>> def mydependents(ude):
... c0, c1 = ude.conns
... return [c0.m, c0.p, c0.h, c1.p, c1.h]
>>> char = CharLine(
... x=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 2.0, 3.0],
... y=[17, 12, 9, 6.5, 4.5, 3, 2, 1.5, 1.25, 1.125, 1.1, 1.05],
... extrapolate=True)
>>> my_ude = UserDefinedEquation(
... 'myudelabel', myfunc,
... conns=[inflow, outflow],
... dependents=mydependents,
... params={'char': char}
... )
>>> nw.add_ude(my_ude)
>>> nw.solve('design')
Clearly the result is obvious here as the volumetric flow is exactly
at one of the supporting points.
>>> round(inflow.T.val - outflow.T.val, 3)
1.125
So now, let's say, we want to calculate the volumetric flow necessary
to at least maintain a specific temperature at the outflow.
>>> inflow.set_attr(v=None)
>>> outflow.set_attr(T=110)
>>> nw.solve('design')
>>> round(inflow.v.val, 3)
0.267
Or calculate volumetric flow and/or temperature drop corresponding to a
specified heat loss.
>>> outflow.set_attr(T=None)
>>> pipeline.set_attr(Q=-5e6)
>>> nw.solve('design')
>>> round(inflow.v.val, 3)
0.067
To make the example complete, we will quickly have a look at, how you
can specify the derivative method. In this case, you have to define a
function placing the derivatives in the correct locations of the
Jacobian matrix. The highlevel method :code:`partial_derivative` of the
class can handle this for your. You have to pass the variable to this
method, for which you want to calculate the partial derivative. In case
the value can be determined analytically, you can additionally pass the
value as second argument to the function. If you do not pass a second
argument to the function as in the example below, a numerical
derivative will be calculated.
>>> def myjacobian(increment_filter, k, dependents=None, ude=None):
... c0 = ude.conns[0]
... c1 = ude.conns[1]
... ude.partial_derivative(c0.m)
... ude.partial_derivative(c0.p)
... ude.partial_derivative(c0.h)
... ude.partial_derivative(c1.p)
... ude.partial_derivative(c1.h)
>>> my_ude.deriv = myjacobian
>>> nw.solve('design')
"""
if isinstance(label, str):
self.label = label
else:
msg = 'Label of UserDefinedEquation object must be of type String.'
logger.exception(msg)
raise TypeError(msg)
self.func = func
self.deriv = deriv
self.dependents = dependents
self.conns = conns
self.comps = comps
self.params = params
def _preprocess(self, row_idx):
self.num_eq = 0
self._structure_matrix = {}
self._rhs = {}
self._equation_set_lookup = {}
self._equation_set_lookup[row_idx] = "equation"
self.num_eq += 1
def _prepare_for_solver(self, system_dependencies, eq_counter):
self.num_eq = 0
self.it = 0
self.equations = {}
self._equation_lookup = {}
self._equation_scalar_dependents_lookup = {}
self._equation_vector_dependents_lookup = {}
for eq_num, value in self._equation_set_lookup.items():
if eq_num in system_dependencies:
continue
if value not in self.equations:
self.equations[value] = dc_cp(
func_params={"ude": self},
dependents=self.dependents,
num_eq_sets=1,
func=self.func,
deriv=self.deriv
)
self._assign_dependents_and_eq_mapping(
value, self.equations[value], self.equations, eq_counter
)
self.num_eq += 1
eq_counter += 1
self.residual = {}
self.jacobian = {}
return eq_counter
def _assign_dependents_and_eq_mapping(self, value, data, eq_dict, eq_counter):
if data.dependents is None:
scalar_dependents = [[] for _ in range(data.num_eq)]
vector_dependents = [{} for _ in range(data.num_eq)]
else:
dependents = data.dependents(**data.func_params)
if type(dependents) == list:
scalar_dependents = _get_dependents(dependents)
vector_dependents = [{} for _ in range(data.num_eq)]
else:
scalar_dependents = _get_dependents(dependents["scalars"])
vector_dependents = _get_vector_dependents(dependents["vectors"])
# this is a temporary fix
if len(vector_dependents) < data.num_eq:
vector_dependents = [{} for _ in range(data.num_eq)]
eq_dict[value]._scalar_dependents = scalar_dependents
eq_dict[value]._vector_dependents = vector_dependents
eq_dict[value]._first_eq_index = eq_counter
for i in range(data.num_eq):
self._equation_lookup[eq_counter + i] = (value, i)
self._equation_scalar_dependents_lookup[eq_counter + i] = scalar_dependents[i]
self._equation_vector_dependents_lookup[eq_counter + i] = vector_dependents[i]
[docs]
def get_structure_matrix(self):
return
[docs]
def partial_derivative(self, var, value=None):
eq_num = self.equations["equation"]._first_eq_index
if value is None:
value = self.func
self._partial_derivative(var, eq_num, value, **{"ude": self})
def _partial_derivative(self, var, eq_num, value, increment_filter=None, **kwargs):
result = _partial_derivative(var, value, increment_filter, **kwargs)
if result is not None:
self.jacobian[eq_num, var.J_col] = result
def _partial_derivative_fluid(self, var, eq_num, value, dx, increment_filter=None, **kwargs):
result = _partial_derivative_vecvar(var, value, dx, increment_filter, **kwargs)
if result is not None:
self.jacobian[eq_num, var.J_col[dx]] = result
[docs]
def solve(obj, increment_filter):
"""
Solve equations and calculate partial derivatives of a component.
Parameters
----------
increment_filter : ndarray
Matrix for filtering non-changing variables.
"""
for label, data in obj.equations.items():
eq_num = data._first_eq_index
_solve_residual(obj, data, eq_num)
_solve_jacobian(obj, data, increment_filter, eq_num)
def _solve_residual(obj, data, eq_num):
result = data.func(**data.func_params)
if isinstance(result, Iterable):
result = {eq_num + k: value for k, value in enumerate(result)}
else:
result = {eq_num: result}
obj.residual.update(result)
def _solve_jacobian(obj, data, increment_filter, eq_num):
if data.constant_deriv:
return
elif data.deriv is not None:
data.deriv(
increment_filter,
eq_num,
dependents={
"scalars": data._scalar_dependents,
"vectors": data._vector_dependents
},
**data.func_params
)
else:
# these can only be parameters with a single equation for now
for dependent in data._scalar_dependents[0]:
f = data.func
obj._partial_derivative(
dependent, eq_num, f, increment_filter, **data.func_params
)
for dependent, dx in data._vector_dependents[0].items():
f = data.func
obj._partial_derivative_fluid(
dependent, eq_num, f, dx, increment_filter, **data.func_params
)
def _is_variable(var, increment_filter=None):
if var.is_var:
if increment_filter is None or not increment_filter[var.J_col]:
return True
return False
def _is_variable_vecvar(var, dx, increment_filter=None):
if dx in var.is_var:
if increment_filter is None or not increment_filter[var.J_col[dx]]:
return True
return False
def _partial_derivative(var, value, increment_filter, **kwargs):
if _is_variable(var, increment_filter):
if callable(value):
if type(var) != dc_scavar:
var = var._reference_container
return _numeric_deriv(var, value, **kwargs)
else:
return value
else:
return None
def _partial_derivative_vecvar(var, value, dx, increment_filter, **kwargs):
if _is_variable_vecvar(var, dx, increment_filter):
if callable(value):
if type(var) != dc_vecvar:
var = var._reference_container
return _numeric_deriv_vecvar(var, value, dx, **kwargs)
else:
return value
else:
return None
def _numeric_deriv(variable, func, **kwargs):
r"""
Calculate partial derivative of the function func to dx.
Parameters
----------
variable : object
Variable container.
func : function
Function :math:`f` to calculate the partial derivative for.
Returns
-------
deriv : float/list
Partial derivative(s) of the function :math:`f` to variable(s)
:math:`x`.
.. math::
\frac{\partial f}{\partial x} = \frac{f(x + d) + f(x - d)}{2 d}
"""
d = variable.d
tol = max(variable.val_SI * d, d)
variable.val_SI += tol
exp = func(**kwargs)
variable.val_SI -= 2 * tol
exp -= func(**kwargs)
deriv = exp / (2 * tol)
variable.val_SI += tol
return deriv
def _numeric_deriv_vecvar(variable, func, dx, **kwargs):
r"""
Calculate partial derivative of the function func to dx.
Parameters
----------
obj : object
Instance, which provides the equation to calculate the derivative for.
func : function
Function :math:`f` to calculate the partial derivative for.
dx : str
Partial derivative.
conn : tespy.connections.connection.Connection
Connection to calculate the numeric derivative for.
Returns
-------
deriv : float/list
Partial derivative(s) of the function :math:`f` to variable(s)
:math:`x`.
.. math::
\frac{\partial f}{\partial x} = \frac{f(x + d) + f(x - d)}{2 d}
"""
original_vector = variable.val.copy()
# this is specific to fluids right now (upper limit of 1, lower limit of 0)
d1 = min(variable.d, 1 - variable.val[dx])
variable.val[dx] += d1
exp = func(**kwargs)
d2 = min(variable.d * 2, variable.val[dx])
variable.val[dx] -= d2
exp -= func(**kwargs)
variable.val = original_vector
# d2 is the complete delta of the central difference no matter how big
# d1 is actually
deriv = exp / d2
return deriv
[docs]
def bus_char_evaluation(component_value, char_func, reference_value, bus_value, **kwargs):
r"""
Calculate the value of a bus.
Parameters
----------
comp_value : float
Value of the energy transfer at the component.
reference_value : float
Value of the bus in reference state.
char_func : tespy.tools.characteristics.char_line
Characteristic function of the bus.
Returns
-------
residual : float
Residual of the equation.
.. math::
residual = \dot{E}_\mathrm{bus} - \frac{\dot{E}_\mathrm{component}}
{f\left(\frac{\dot{E}_\mathrm{bus}}
{\dot{E}_\mathrm{bus,ref}}\right)}
"""
return bus_value - component_value / char_func.evaluate(
bus_value / reference_value
)
[docs]
def bus_char_derivative(component_value, char_func, reference_value, bus_value, **kwargs):
"""Calculate derivative for bus char evaluation."""
d = 1e-3
return (1 - (
1 / char_func.evaluate((bus_value + d) / reference_value) -
1 / char_func.evaluate((bus_value - d) / reference_value)
) / (2 * d))
[docs]
def newton_with_kwargs(
derivative, target_value, val0=300, valmin=70, valmax=3000, max_iter=10,
tol_rel=ERR, tol_abs=ERR ** 0.5, tol_mode="rel", **function_kwargs
):
iteration = 0
expr = True
x = val0
parameter = function_kwargs["parameter"]
function = function_kwargs["function"]
relax = 1
if tol_mode == "rel" and abs(target_value) <= 2 * tol_rel:
tol_mode = "abs"
while expr:
# calculate function residual and new value
function_kwargs[parameter] = x
residual = target_value - function(**function_kwargs)
x += residual / derivative(**function_kwargs) * relax
# check for value ranges
if x < valmin:
x = valmin
if x > valmax:
x = valmax
iteration += 1
# relaxation to help convergence in case of jumping
if iteration == 5:
relax = 0.75
max_iter = 12
if iteration > max_iter:
msg = (
'The Newton algorithm was not able to find a feasible value '
f'for function {function}. Current value with x={x} is '
f'{function(**function_kwargs)}, target value is '
f'{target_value}, residual is {residual} after {iteration} '
'iterations.'
)
logger.debug(msg)
break
if tol_mode == 'abs':
expr = abs(residual) >= tol_abs
elif tol_mode == 'rel':
expr = abs(residual / target_value) >= tol_rel
return x
[docs]
def central_difference(function=None, parameter=None, delta=None, **kwargs):
delta = abs(max(kwargs[parameter] * delta, delta))
upper = kwargs.copy()
upper[parameter] += delta
lower = kwargs
lower[parameter] -= delta
return (function(**upper) - function(**lower)) / (2 * delta)
[docs]
def get_basic_path():
"""
Return the basic tespy path and creates it if necessary.
The basic path is the '.tespy' folder in the $HOME directory.
"""
basicpath = os.path.join(os.path.expanduser('~'), '.tespy')
if not os.path.isdir(basicpath):
os.mkdir(basicpath)
return basicpath
[docs]
def extend_basic_path(subfolder):
"""
Return a path based on the basic tespy path and creates it if necessary.
The subfolder is the name of the path extension.
"""
extended_path = os.path.join(get_basic_path(), subfolder)
if not os.path.isdir(extended_path):
os.mkdir(extended_path)
return extended_path
def _is_numeric(potentially_a_number):
"""Checks if the value provided is a number by trying to convert it to
float
Parameters
----------
potentially_a_number : any
Value to check
Returns
-------
bool
True if the value is a number
Example
-------
>>> from tespy.tools.helpers import _is_numeric
>>> _is_numeric(5)
True
>>> _is_numeric("var")
False
"""
try:
float(potentially_a_number)
return True
except (TypeError, ValueError):
return False
def _get_vector_dependents(variable_list):
if len(variable_list) == 0:
return []
if isinstance(variable_list, list):
variable_list_new = []
for _eq_set in variable_list:
variable_list_new.append({})
for key, value in _eq_set.items():
if not value:
continue
k = key._reference_container
if k not in variable_list_new[-1]:
variable_list_new[-1][k] = set()
variable_list_new[-1][k] |= value
return variable_list_new
else:
return [
set(var for var in variable_list)
]
def _get_dependents(variable_list):
if isinstance(variable_list[0], list):
return [set(
var._reference_container
for var in sublist if var.is_var
) for sublist in variable_list
]
else:
return [set(
var._reference_container
for var in variable_list if var.is_var
)]
def _nested_dict_of_dataframes_to_dict(dictionary):
"""Transpose a nested dict with dataframes or series in a json style dict
Parameters
----------
dictionary : dict
Dictionary of dataframes
Returns
-------
dict
json style dictionary containing all data from the dataframes
"""
for key, value in dictionary.items():
if isinstance(value, dict):
dictionary[key] = _nested_dict_of_dataframes_to_dict(value)
else:
# Otherwise this is assumed a series, then orient is not available
kwargs = {}
if isinstance(value, pd.DataFrame):
kwargs = {"orient": "index"}
dictionary[key] = value.to_dict(**kwargs)
return dictionary
def _nested_dict_of_dataframes_to_filetree(dictionary, basepath):
"""Dump a nested dict with dataframes into a folder structrue
The upper level keys with subdictionaries are folder names, the lower
level keys (where a dataframe is the value) will be the names of the
csv files.
Parameters
----------
dictionary : dict
Nested dictionary to write to filesystem.
basepath : str
path to dump data to
"""
os.makedirs(basepath, exist_ok=True)
for key, value in dictionary.items():
if isinstance(value, dict):
basepath = os.path.join(basepath, key)
_nested_dict_of_dataframes_to_filetree(value, basepath)
else:
value.to_csv(os.path.join(basepath, f"{key}.csv"))