"""
Opyrability - Process Operability Analysis in Python
=====================================================
A Python package for process operability analysis: forward and inverse
mapping between input and output spaces, operability index evaluation,
nonlinear- and mixed-integer-linear-programming based operability
calculations, and dynamic operability, for steady-state and dynamic
process models.
Copyright (c) 2022-2026 Victor Alves -- Carnegie Mellon University.
Released under the MIT License.
See the acknowledgements on source code and documentation for the project's origins
and current development.
"""
# ---------------------------------------------------------------------------- #
# Acknowledgements
# Opyrability began its development in late 2022 in the Control, Optimization and
# Design for Energy and Sustainability (CODES) Group at West Virginia
# University (WVU), by Victor Alves and San Dinh.
# It is currently under active development and maintained by
# Victor Alves @ Carnegie Mellon University.
# ---------------------------------------------------------------------------- #
# Basic python tools
import sys
import warnings
import string
from itertools import permutations as perms
from typing import Callable, Union
from tqdm.auto import tqdm
# Linear Algebra
import numpy as np
from numpy.linalg import norm
# Optimization algorithms
import scipy as sp
from scipy.optimize import root
from scipy.optimize import differential_evolution as DE
# Pounce (pure-Rust IPOPT, bundled FERAL linear solver) is the default NLP
# solver and a required dependency. cyipopt is optional and imported lazily
# (see _import_cyipopt_minimize) because it ships compiled binaries that
# users typically install via conda.
from pounce import minimize as pounce_minimize
# Polytopic calculations
import polytope as pc
# Plots
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
# Setting default plot options
plt.rcParams['figure.dpi'] = 150
cmap = 'rainbow'
lineweight = 1
edgecolors = 'k'
markersize = 128
DS_COLOR = '#7f7f7f'
INTERSECT_COLOR = '#0000cc'
AS_COLOR = '#1cff1c'
EDGES_COLORS = '#000000'
EDGES_WIDTH = 0.25
def _get_patch(poly, **kwargs):
'''
Build a filled matplotlib patch for a 2D polytope.
Self-contained replacement for ``polytope.polytope._get_patch``. Older
``polytope`` releases (for example 0.2.4, still current on some
conda-forge channels) construct the patch with a positional ``closed``
argument, ``matplotlib.patches.Polygon(V, True, ...)``. Recent matplotlib
makes ``closed`` keyword-only, so that positional call raises a
``TypeError``. Vendoring the helper keeps ``plot=True`` working on any
combination of polytope and matplotlib versions.
Parameters
----------
poly : polytope.Polytope
The 2D polytope to render.
**kwargs
Keyword arguments forwarded to ``matplotlib.patches.Polygon``
(for example ``color``, ``linestyle``, ``alpha``).
Returns
-------
matplotlib.patches.Polygon
The filled polygon patch, drawn with ``zorder=0`` so it sits behind
other artists.
Notes
-----
Vertices are ordered by polar angle about their centroid, which yields a
non-self-intersecting boundary for a convex polytope.
'''
V = pc.extreme(poly)
center = V.mean(axis=0)
angles = np.arctan2(V[:, 1] - center[1], V[:, 0] - center[0])
order = np.argsort(angles)
patch = mpatches.Polygon(V[order, :], closed=True, **kwargs)
patch.set_zorder(0)
return patch
[docs]
def multimodel_rep(model: Callable[...,Union[float,np.ndarray]],
bounds: np.ndarray,
resolution: np.ndarray,
polytopic_trace: str = 'simplices',
perspective: str = 'outputs',
plot: bool = True,
EDS_bound = None,
EDS_resolution = None,
labels: str = None):
"""
Obtain a multimodel representation based on polytopes of Process Operability
sets. This procedure is essential for evaluating the Operability Index (OI).
Author: Victor Alves
Parameters
----------
model : Callable[...,Union[float,np.ndarray]]
Process model that calculates the relationship between inputs (AIS-DIS)
and Outputs (AOS-DOS).
bounds : np.ndarray
Bounds on the Available Input Set (AIS), or Desired Output Set (DOS) if
an inverse mapping is chosen. Each row corresponds to the
lower and upper bound of each AIS or DOS variable.
resolution : np.ndarray
Array containing the resolution of the discretization grid for the AIS or
DOS. Each element corresponds to the resolution of each variable. For a
resolution defined as k, it will generate d^k points (in which d is the
dimensionality of the AIS or DOS).
polytopic_trace: str, Optional.
Determines if the polytopes will be constructed using simplices or
polyhedrons. Default is 'simplices'. Additional option is 'polyhedra'.
perspective: str, Optional.
Defines if the calculation is to be done from the inputs/outputs
perspective. Also affects labels in plots. Default is 'outputs'.
plot: str, Optional.
Defines if the plot of operability sets is desired (If the dimension
is <= 3). Default is True.
EDS_bound : np.ndarray
Lower and upper bounds for the Expected Disturbance Set (EDS). Default
is 'None'.
EDS_resolution : np.ndarray
Resolution for the Expected Disturbance Set (EDS). This will be used to
discretize the EDS, similar to the AIS_resolution
labels: str, Optional.
labels for axes. Accepts TeX math input as it uses matplotlib math
rendering. Should be in order y1, y2, y3, and so on:
labels= ['first label','second label']. Default is None.
Returns
-------
mapped_region : list
List with first argument being a convex polytope or collection of
convex polytopes (named as Region)
that describes the AOS. Can be used to calculate the Operability Index
using ``OI_eval``. Second argument is the coordinates of the AOS as a
numpy array.
References
----------
[1] D. R., Vinson and C. Georgakis. New Measure of Process Output
Controllability. J. Process Control, 2000.
https://doi.org/10.1016/S0959-1524(99)00045-1
[2] C. Georgakis, D. Uztürk, S. Subramanian, and D. R. Vinson. On the
Operability of Continuous Processes. Control Eng. Pract., 2003.
https://doi.org/10.1016/S0967-0661(02)00217-4
[3] V.Gazzaneo, J. C. Carrasco, D. R. Vinson, and F. V. Lima. Process
Operability Algorithms: Past, Present, and Future Developments.
Ind. & Eng. Chem. Res. 59 (6), 2457-2470, 2020.
https://doi.org/10.1021/acs.iecr.9b05181
"""
# TODO: Add implicit mapping option to perform any multimodel rep
#(future release).
# Map AOS from AIS setup.
# AIS, AOS = AIS2AOS_map(model,
# AIS_bound,
# resolution,
# EDS_bound=EDS_bound,
# EDS_resolution=EDS_resolution,
# plot = False)
# If it is a forward map, using the forward mapping function (AIS2AOS_map).
# If not, using the NLP-based approach for inverse mapping. In this case,
# an initial estimate is needed and the user is prompted.
if perspective == 'outputs':
AIS, AOS = AIS2AOS_map(model,
bounds,
resolution,
EDS_bound= EDS_bound,
EDS_resolution=EDS_resolution,
plot= False)
else:
u0_input = input('Enter an initial estimate for your inverse model '
'separated only by commas (,) : ')
input_list = [float(u0_input) for u0_input in u0_input.split(',')]
u0 = np.array(input_list)
AIS, AOS, _ = nlp_based_approach(model, bounds, resolution,
u0,
-np.inf*np.ones(u0.shape),
+np.inf*np.ones(u0.shape),
method='ipopt',
plot=False,
ad=False,
warmstart=True)
# Reshape (n^k, k) vectors into (list[k*n, k]) multidimensional arrays.
# This makes polytopic tracing calculations more "clear".
AIS = AIS.reshape((resolution + [AIS.shape[-1]]))
AOS = AOS.reshape((resolution + [AOS.shape[-1]]))
# Switch in between for simplicial of polyhedra calculations.
if polytopic_trace =='simplices':
AIS_poly, AOS_poly = points2simplices(AIS,AOS)
elif polytopic_trace =='polyhedra':
AIS_poly, AOS_poly = points2polyhedra(AIS,AOS)
else:
raise ValueError('Invalid option for polytopic tracing. Choose '
'"simplices" or "polyhedra".')
# Define empty polyopes list.
Polytopes = list()
Vertices_list = list()
# Create convex hull using the vertices of the AOS.
for i in range(len(AOS_poly)):
Vertices = AOS_poly[i]
Vertices_list.append(Vertices)
Polytopes.append(pc.qhull(Vertices))
# Define the AOS as an (possibly) overlapped region. Here we don't need to
# worry about this here for now, only when evaluating the OI.
mapped_region = pc.Region(Polytopes[0:])
# Perspective switch: This will only affect plot and legends.
if perspective == 'outputs':
AS_label = 'Achievable Output Set (AOS)'
else:
AS_label = 'Available Input Set (AIS)'
# Plots (2D/ 3D), unfortunately humans can't see higher dimensions. :(
if plot is True:
if mapped_region.dim == 2:
polyplot = []
fig = plt.figure()
ax = fig.add_subplot(111)
AS_coords = np.concatenate(Vertices_list, axis=0)
for i in range(len(mapped_region)):
polyplot = _get_patch(mapped_region[i], linestyle="solid",
edgecolor=EDGES_COLORS, linewidth=EDGES_WIDTH,
facecolor=AS_COLOR)
ax.add_patch(polyplot)
lower_xaxis = mapped_region.bounding_box[0][0]
upper_xaxis = mapped_region.bounding_box[1][0]
lower_yaxis = mapped_region.bounding_box[0][1]
upper_yaxis = mapped_region.bounding_box[1][1]
# Use range-based margins to avoid inverted padding on negative coordinates.
x_range = upper_xaxis - lower_xaxis
y_range = upper_yaxis - lower_yaxis
ax.set_xlim(lower_xaxis - 0.05 * x_range,
upper_xaxis + 0.05 * x_range)
ax.set_ylim(lower_yaxis - 0.05 * y_range,
upper_yaxis + 0.05 * y_range)
AS_patch = mpatches.Patch(color=AS_COLOR, label=AS_label)
extra = mpatches.Rectangle((0, 0), 1, 1, fc="w",
fill=False,
edgecolor='none',
linewidth=0)
if perspective == 'outputs':
str_title = 'Achievable Output Set (AOS)'
ax.set_title(str_title)
else:
str_title ='Available Input set (AIS)'
ax.set_title(str_title)
ax.legend(handles=[AS_patch, extra])
if labels is not None:
if len(labels) != 2:
raise ValueError('You need two entries for your custom '+
'labels for your 2D system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
plt.show()
elif mapped_region.dim == 3:
polyplot = []
fig = plt.figure()
ax = fig.add_subplot(111, projection="3d")
AS_coords = np.concatenate(Vertices_list, axis=0)
for cube, color in zip([AS_coords], [AS_COLOR]):
hull = sp.spatial.ConvexHull(cube)
# Draw the polygons of the convex hull
for s in hull.simplices:
tri = Poly3DCollection([cube[s]])
tri.set_color(color)
tri.set_alpha(0.5)
tri.set_edgecolor(EDGES_COLORS)
ax.add_collection3d(tri)
# Draw the vertices
ax.scatter(cube[:, 0], cube[:, 1], cube[:, 2], marker='o',
color='None')
plt.show()
AS_patch = mpatches.Patch(color=AS_COLOR, label=AS_label)
extra = mpatches.Rectangle((0, 0), 1, 1, fc="w",
fill=False,
edgecolor='none',
linewidth=0)
if perspective == 'outputs':
str_title = 'Achievable Output Set (AOS)'
ax.set_title(str_title)
else:
str_title = 'Available Input Set (AIS)'
ax.set_title(str_title)
ax.legend(handles=[AS_patch, extra])
if labels is not None:
if len(labels) != 3:
raise ValueError('You need three entries for your custom '+
'labels for your 3D system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
ax.set_zlabel(labels[2])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
ax.set_zlabel('$y_{3}$')
plt.show()
else:
print(f'Plotting is only supported for 2D and 3D. '
f'Your data has dimension {mapped_region.dim}. '
f'The operability set is still returned.')
AS_coords = np.concatenate(Vertices_list, axis=0)
else:
print('plot=False selected. The operability set is still returned '
'as a polytopic region of general dimension.')
AS_coords = np.concatenate(Vertices_list, axis=0)
# Small hack: Inject AS coordinates into return to be able to
# plot 3D region effortlessly.
mapped_region = [mapped_region, AS_coords]
return mapped_region
[docs]
def OI_eval(AS: pc.Region,
DS: np.ndarray,
perspective='outputs',
hypervol_calc: str = 'robust',
plot: bool = True,
labels: str = None):
'''
Operability Index (OI) calculation. From a Desired Output
Set (DOS) defined by the user, this function calculates the intersection
between achievable (AOS) and desired output operation (DOS). Similarly, the
OI can be also calculated from the inputs' perspective, as an intersection
between desired input (DIS) and available input (AIS). This function is able
to evaluate the OI in any dimension, ranging from 1-d (length) up to
higher dimensions (Hypervolumes, > 3-d), aided by the Polytope package.
Author: Victor Alves
Parameters
----------
AS : pc.Region
Available input set (AIS) or Achievable output set (AOS) represented as
a series of paired (and convex) polytopes. This region is easily
obtained using the multimodel_rep function.
DS: np.ndarray
Array containing the desired operation, either in the inputs (DIS) or
outputs perspective (DOS).
perspective: str, Optional.
String that determines in which perspective the OI will be evaluated:
inputs or outputs. default is 'outputs'.
hypervol_calc: str, Optional.
Determines how the approach when evaluating hypervolumes. Default is
'robust', an implementation that switches from qhull's hypervolume
evaluation or polytope's evaluation depending on the dimensionality of
the problem. Additional option is 'polytope', Polytope's package own
implementation of hypervolumes evaluation being used in problems of
any dimension.
plot: bool, Optional.
Defines if the plot of operability sets is desired. Default is True.
labels: str, Optional.
labels for axes. Accepts TeX math input as it uses matplotlib math
rendering. Should be in order y1, y2, y3, and so on:
labels= ['first label','second label']. Default is None.
Returns
-------
OI: float
Operability Index value. Ranges from 0 to 100%. A fully operable
process has OI = 100% and if not fully operable, OI < 100%.
References
----------
[1] D. R., Vinson and C. Georgakis. New Measure of Process Output
Controllability. J. Process Control, 2000.
https://doi.org/10.1016/S0959-1524(99)00045-1
[2] C. Georgakis, D. Uztürk, S. Subramanian, and D. R. Vinson. On the
Operability of Continuous Processes. Control Eng. Pract., 2003.
https://doi.org/10.1016/S0967-0661(02)00217-4
[3] V.Gazzaneo, J. C. Carrasco, D. R. Vinson, and F. V. Lima. Process
Operability Algorithms: Past, Present, and Future Developments.
Ind. & Eng. Chem. Res. 59 (6), 2457-2470, 2020.
https://doi.org/10.1021/acs.iecr.9b05181
'''
# Defining Polytopes for manipulation. Obtaining polytopes in min-rep if
# applicable.
DS_region = pc.box2poly(DS)
AS_region = pc.reduce(AS[0])
DS_region = pc.reduce(DS_region)
inter_list = list()
# Obtain (possibly overlapped) intersection region of polytopes and
# its respective bounding box.
for i in range(len(AS_region)):
intersection = pc.intersect(AS_region[i], DS_region)
if intersection.fulldim is False or intersection.dim != DS_region.dim:
pass
else:
inter_list.append(intersection)
if len(inter_list) == 0:
print('No intersection found between AS and DS. OI = 0.')
return 0.0
overlapped_intersection = pc.Region(inter_list)
min_coord = overlapped_intersection.bounding_box[0]
max_coord = overlapped_intersection.bounding_box[1]
box_coord = np.hstack([min_coord, max_coord])
bound_box = pc.box2poly(box_coord)
# Remove overlapping (Vinson&Gazzaneo trick) - Remove one polytope at a time,
# create void polytope using the bounding box and subtract from the original
# bounding box itself. This avoids wrong calculations of the OI.
intersection= process_overlapping_polytopes(bound_box,
overlapped_intersection)
v_intersect_list = list()
final_intersection = list()
# Methods for OI evaluation.
if hypervol_calc == 'polytope':
OI = (intersection.volume/DS_region.volume)*100
elif hypervol_calc == 'robust':
if DS_region.dim < 7:
intersect_i = []
volumes_i = []
for i in range(len(intersection)):
intersect_i = intersection[i]
v_intersect = pc.extreme(intersect_i)
if v_intersect is None:
continue
else:
processed_intersection = pc.qhull(v_intersect)
final_intersection.append(processed_intersection)
v_intersect_list.append(v_intersect)
# ConvexHull requires >= 2D; for 1D use segment length.
if DS_region.dim == 1:
volumes_i.append(v_intersect.max() - v_intersect.min())
else:
volumes_i.append(sp.spatial.ConvexHull(v_intersect).volume)
each_polytope_volume = np.array(volumes_i)
intersection_volume = each_polytope_volume[0:].sum()
final_intersection = pc.Region(final_intersection)
v_DS = pc.extreme(DS_region)
# Evaluate OI
# ConvexHull requires >= 2D; for 1D use segment length.
if DS_region.dim == 1:
DS_volume = v_DS.max() - v_DS.min()
else:
DS_volume = sp.spatial.ConvexHull(v_DS).volume
OI = (intersection_volume / DS_volume) * 100
else:
print("For higher dimensions (>7) polytope's hypervolume estimation \
is faster. Switching to polytope's calculation.")
OI = (intersection.volume/DS_region.volume)*100
else:
raise ValueError('Invalid hypervolume calculation option. Choose '
'"robust" or "polytope".')
# Perspective switch: This will only affect plot and legends.
if perspective == 'outputs':
DS_label = 'Desired Output Set (DOS)'
AS_label = 'Achievable Output Set (AOS)'
int_label = r'$ AOS \cap DOS$'
else:
DS_label = 'Desired Input Set (DIS)'
AS_label = 'Available Input Set (AIS)'
int_label = r'$ AIS \cap DIS$'
# Plot if 2D / 3D
if plot is True:
if DS_region.dim == 2:
polyplot = []
fig = plt.figure()
ax = fig.add_subplot(111)
for i in range(len(AS_region)):
polyplot = _get_patch(AS_region[i], linestyle="solid",
edgecolor=EDGES_COLORS,
linewidth=EDGES_WIDTH,
facecolor=AS_COLOR)
ax.add_patch(polyplot)
for item in final_intersection:
if item is None:
continue # Skip None values
else:
interplot = _get_patch(item, linestyle="solid",
linewidth=EDGES_WIDTH,
facecolor=INTERSECT_COLOR,
edgecolor=INTERSECT_COLOR)
ax.add_patch(interplot)
DSplot = _get_patch(DS_region, linestyle="dashed",
edgecolor=DS_COLOR, alpha=0.5,
linewidth=EDGES_WIDTH,
facecolor=DS_COLOR)
ax.add_patch(DSplot)
ax.legend(['DOS']) # Pass label as a list to avoid matplotlib treating the string as individual characters.
lower_xaxis = min(AS_region.bounding_box[0][0],
DS_region.bounding_box[0][0])
upper_xaxis = max(AS_region.bounding_box[1][0],
DS_region.bounding_box[1][0])
lower_yaxis = min(AS_region.bounding_box[0][1],
DS_region.bounding_box[0][1])
upper_yaxis = max(AS_region.bounding_box[1][1],
DS_region.bounding_box[1][1])
# Use range-based margins to avoid inverted padding on negative coordinates.
x_range = upper_xaxis - lower_xaxis
y_range = upper_yaxis - lower_yaxis
ax.set_xlim(lower_xaxis - 0.05 * x_range,
upper_xaxis + 0.05 * x_range)
ax.set_ylim(lower_yaxis - 0.05 * y_range,
upper_yaxis + 0.05 * y_range)
DS_patch = mpatches.Patch(color=DS_COLOR, label=DS_label)
AS_patch = mpatches.Patch(color=AS_COLOR, label=AS_label)
INTERSECT_patch = mpatches.Patch(color=INTERSECT_COLOR,
label=int_label)
OI_str = 'Operability Index = ' + str(round(OI, 2)) + str('%')
extra = mpatches.Rectangle((0, 0), 1, 1, fc="w",
fill=False,
edgecolor='none',
linewidth=0, label=OI_str)
if perspective == 'outputs':
str_title = string.capwords(
"Operability Index Evaluation - Outputs' perspective")
ax.set_title(str_title)
else:
str_title = string.capwords(
"Operability Index Evaluation - Inputs' perspective")
ax.set_title(str_title)
ax.legend(handles=[DS_patch, AS_patch, INTERSECT_patch, extra])
if labels is not None:
if len(labels) != 2:
raise ValueError('You need two entries for your custom '+
'labels for your 2D system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
plt.show()
elif DS_region.dim == 3:
AS_coords = AS[1]
DS_coords = get_extreme_vertices(DS)
polyplot = []
fig = plt.figure()
ax = fig.add_subplot(111, projection="3d")
intersect_coords = np.concatenate(v_intersect_list, axis=0)
for cube, color in zip([AS_coords, DS_coords, intersect_coords],
[AS_COLOR, DS_COLOR, INTERSECT_COLOR]):
hull = sp.spatial.ConvexHull(cube)
# Draw the polygons of the convex hull
for s in hull.simplices:
tri = Poly3DCollection([cube[s]])
tri.set_color(color)
tri.set_edgecolor(EDGES_COLORS)
tri.set_alpha(0.5)
ax.add_collection3d(tri)
# Draw the vertices
ax.scatter(cube[:, 0], cube[:, 1], cube[:, 2],
marker='o', color='None')
plt.show()
AS_patch = mpatches.Patch(color=AS_COLOR, label=AS_label)
extra = mpatches.Rectangle((0, 0), 1, 1, fc="w",
fill=False,
edgecolor='none',
linewidth=0)
if perspective == 'outputs':
str_title = 'Achievable Output Set (AOS)'
ax.set_title(str_title)
else:
str_title = 'Available Input set (AIS)'
ax.set_title(str_title)
ax.legend(handles=[AS_patch, extra])
OI_str = 'Operability Index = ' + str(round(OI, 2)) + str('%')
extra = mpatches.Rectangle((0, 0), 1, 1, fc="w",
fill=False,
edgecolor='none',
linewidth=0, label=OI_str)
if perspective == 'outputs':
str_title = string.capwords(
"Operability Index Evaluation - Outputs' perspective")
ax.set_title(str_title)
else:
str_title = string.capwords(
"Operability Index Evaluation - Inputs' perspective")
ax.set_title(str_title)
DS_patch = mpatches.Patch(color=DS_COLOR, label=DS_label)
AS_patch = mpatches.Patch(color=AS_COLOR, label=AS_label)
INTERSECT_patch = mpatches.Patch(color=INTERSECT_COLOR,
label=int_label)
ax.legend(handles=[DS_patch, AS_patch, INTERSECT_patch, extra])
if labels is not None:
if len(labels) != 3:
raise ValueError('You need three entries for your custom '+
'labels for your 3D system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
ax.set_zlabel(labels[2])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
ax.set_zlabel('$y_{3}$')
plt.show()
elif DS_region.dim > 3:
print(f'Plotting is only supported for 2D and 3D. '
f'Your data has dimension {DS_region.dim}. '
f'The OI value is still available for interpretation.')
return OI
[docs]
def rank_designs(models,
AIS_bound,
DOS_bound,
resolution,
perspective='outputs',
polytopic_trace='simplices',
u0=None,
lb=None,
ub=None,
constr=None,
method='pounce',
ad=False,
hypervol_calc='robust',
plot=True):
'''
Rank two or more steady-state process designs by their Operability Index.
Each design (a process model) is scored by its Operability Index (OI)
against the same Desired Output Set (DOS), and the designs are returned
sorted from most to least operable. The comparison can be made from two
perspectives:
- ``'outputs'`` (default): each model is forward-mapped from its Available
Input Set (AIS) to its Achievable Output Set (AOS) with
``multimodel_rep``, and the OI is the output-space index
``mu(AOS ∩ DOS) / mu(DOS)``.
- ``'inputs'``: the DOS is inverse-mapped to the feasible desired input set
(DIS*) with ``nlp_based_approach``, and the OI is the input-space index
``mu(DIS* ∩ AIS) / mu(AIS)``, that is, the share of the available input
envelope that achieves the desired outputs.
Both indices are computed with the existing ``OI_eval`` routine, so the
ranking inherits its geometry. Steady-state models only.
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
models : dict or list of Callable
The designs to rank. A dict ``{label: model}`` names each design; a
plain list is accepted and auto-labeled ``'Design 1'``,
``'Design 2'``, and so on. Every model maps an input vector ``u`` to an
output vector ``y`` whose dimension matches ``DOS_bound``.
AIS_bound : np.ndarray or dict
Available Input Set bounds, shape ``(n_u, 2)``. A single array is
shared by all designs; a dict ``{label: bounds}`` gives each design its
own AIS, so models with different input spaces can be compared against
the same ``DOS_bound``.
DOS_bound : np.ndarray
Desired Output Set bounds, shape ``(n_y, 2)``, shared by all designs.
resolution : int, array-like or dict
Discretization resolution passed to the mapping routine. A single value
is shared by all designs; a dict ``{label: resolution}`` sets it per
design.
perspective : {'outputs', 'inputs'}, Optional.
Space in which the OI is evaluated. Default is ``'outputs'``.
polytopic_trace : {'simplices', 'polyhedra'}, Optional.
Polytopic tracing passed to ``multimodel_rep`` for the output
perspective. Default is ``'simplices'``.
u0 : np.ndarray, Optional.
Initial estimate for the inverse mapping (input perspective). Default
is the midpoint of each design's AIS.
lb : np.ndarray, Optional.
Lower bounds for the inverse mapping (input perspective). Default is the
lower column of each design's AIS, so DIS* lies inside the AIS.
ub : np.ndarray, Optional.
Upper bounds for the inverse mapping (input perspective). Default is the
upper column of each design's AIS.
constr : dict, Optional.
Nonlinear constraint forwarded to ``nlp_based_approach`` (input
perspective). Default is None.
method : str, Optional.
NLP solver for the inverse mapping (input perspective). Default is
``'pounce'``.
ad : bool, Optional.
Whether to use automatic differentiation in the inverse mapping (input
perspective). Default is False.
hypervol_calc : {'robust', 'polytope'}, Optional.
Hypervolume method forwarded to ``OI_eval``. Default is ``'robust'``.
plot : bool, Optional.
Whether to draw a ranked bar chart of the OI of each design. Default is
True.
Returns
-------
ranking : list of dict
One entry per design, sorted by OI from highest to lowest (the list
order is the ranking). Each entry has keys ``'label'`` (the design
name), ``'OI'`` (the Operability Index in percent) and ``'region'``
(the ``pc.Region`` scored: the AOS for the output perspective, the
DIS* for the input perspective).
Notes
-----
For the input perspective, the feasible desired input set DIS* is taken as
the convex hull of the inverse-mapping solution points (``pc.qhull`` over
the ``nlp_based_approach`` result), so a strongly non-convex DIS* is
over-approximated by its hull. Because ``nlp_based_approach`` bounds the
inverse map by the AIS, DIS* lies inside the AIS and the input-space index
reduces to ``mu(DIS*) / mu(AIS)``.
References
----------
[1] D. R. Vinson and C. Georgakis. New Measure of Process Output
Controllability. J. Process Control, 2000.
https://doi.org/10.1016/S0959-1524(99)00045-1
'''
# Normalize the models container into an ordered {label: model} dict.
if isinstance(models, dict):
model_map = dict(models)
else:
model_map = {f'Design {i + 1}': m for i, m in enumerate(models)}
if len(model_map) < 2:
warnings.warn('rank_designs compares two or more designs; only '
f'{len(model_map)} was given.')
# Per-design AIS and resolution: a dict selects per design; anything else
# is shared by all designs.
def _per_design(value, key):
return value[key] if isinstance(value, dict) else value
ranking = []
for key, model in model_map.items():
AIS_i = np.asarray(_per_design(AIS_bound, key), dtype=float)
res_i = _per_design(resolution, key)
if perspective == 'outputs':
# Forward map AIS -> AOS, then score the AOS against the DOS.
region = multimodel_rep(model,
AIS_i,
res_i,
polytopic_trace=polytopic_trace,
perspective='outputs',
plot=False)
OI = OI_eval(region,
DOS_bound,
perspective='outputs',
hypervol_calc=hypervol_calc,
plot=False)
scored_region = region[0]
elif perspective == 'inputs':
# Inverse map the DOS to DIS*, then score DIS* against the AIS.
lb_i = AIS_i[:, 0] if lb is None else np.asarray(lb, dtype=float)
ub_i = AIS_i[:, 1] if ub is None else np.asarray(ub, dtype=float)
u0_i = (AIS_i.mean(axis=1) if u0 is None
else np.asarray(u0, dtype=float))
fDIS, _, _ = nlp_based_approach(model,
DOS_bound,
res_i,
u0_i,
lb_i,
ub_i,
constr=constr,
method=method,
ad=ad,
warmstart=True,
plot=False)
fDIS = np.asarray(fDIS, dtype=float)
dis_region = [pc.Region([pc.qhull(fDIS)]), fDIS]
OI = OI_eval(dis_region,
AIS_i,
perspective='inputs',
hypervol_calc=hypervol_calc,
plot=False)
scored_region = dis_region[0]
else:
raise ValueError("perspective must be 'outputs' or 'inputs'.")
ranking.append({'label': key, 'OI': float(OI),
'region': scored_region})
# Sort from most to least operable.
ranking.sort(key=lambda entry: entry['OI'], reverse=True)
# Ranked table.
space = 'output' if perspective == 'outputs' else 'input'
print(f'Design ranking by {space}-space Operability Index:')
for position, entry in enumerate(ranking, start=1):
print(f' {position}. {entry["label"]:<22s} '
f'OI = {entry["OI"]:6.2f} %')
# Ranked bar chart, colored with opyrability's default colormap.
if plot and ranking:
names = [entry['label'] for entry in ranking]
ois = [entry['OI'] for entry in ranking]
colors = plt.get_cmap(cmap)(np.linspace(0, 1, len(ois)))
fig, ax = plt.subplots()
ax.bar(names, ois, color=colors, edgecolor='black')
ax.set_ylabel(f'{space.capitalize()}-space OI [%]')
ax.set_title('Design ranking by Operability Index')
ax.set_ylim(0, max(100.0, max(ois) * 1.1))
for position, value in enumerate(ois):
ax.text(position, value, f'{value:.1f}',
ha='center', va='bottom', fontsize=9)
plt.tight_layout()
plt.show()
return ranking
[docs]
def nlp_based_approach(model: Callable[..., Union[float, np.ndarray]],
DOS_bounds: np.ndarray,
DOS_resolution: np.ndarray,
u0: np.ndarray,
lb: np.ndarray,
ub: np.ndarray,
constr = None,
method: str = 'pounce',
plot: bool = True,
ad: bool = False,
warmstart: bool = True,
labels: str = None,
pyomo_solver: str = 'ipopt',
print_level: int = 5,
problem: str = 'P1',
PI_target: Callable[..., Union[float, np.ndarray]] = None,
PI_bounds: np.ndarray = None) -> Union[np.ndarray, np.ndarray, list]:
'''
Inverse mapping for Process Operability calculations. From a Desired Output
Set (DOS) defined by the user, this function calculates the closest
Feasible Desired Ouput set (DOS*) from the AOS and its respective Feasible
Desired Input Set (DIS*), which gives insight about potential changes in
design and/or operations of a given process model.
Author: Victor Alves
Parameters
----------
model : Callable[...,Union[float,np.ndarray]]
Process model that calculates the relationship from inputs (AIS-DIS)
to outputs (AOS-DOS).
DOS_bounds : np.ndarray
Array containing the bounds of the Desired Output set (DOS). Each row
corresponds to the lower and upper bound of each variable.
DOS_resolution: np.ndarray
Array containing the resolution of the discretization grid for the DOS.
Each element corresponds to the resolution of each variable. For a
resolution defined as k, it will generate d^k points (in which d is the
dimensionality of the problem).
u0 : np.ndarray
Initial estimate for the inverse mapping at each point. Should have
same dimension as model inputs.
lb : np.ndarray
Lower bound on inputs.
ub : np.ndarray
Upper bound on inputs.
constr : list, optional
List of functions containing constraints to the inverse mapping.
The default is None. follows scipy.optimize synthax.
method: str
Optimization method used. The default is 'pounce' (Pounce, a
pure-Rust reimplementation of the Ipopt interior-point solver,
installed as a core dependency). Set 'ipopt' to use cyipopt's
IPOPT instead (an optional dependency, typically installed via
conda). Options are:
For unconstrained problems:
-'trust-constr'
-'Nelder-Mead'
-'ipopt'
-'pounce'
-'DE'
For constrained problems:
-'ipopt'
-'pounce'
-'DE'
-'SLSQP'
plot: bool
Turn on/off plot. If dimension is d<=3, plot is available and
both the Feasible Desired Output Set (DOS*) and Feasible Desired Input
Set (DIS*) are plotted. Default is True.
ad: bool
Turn on/off use of Automatic Differentiation using JAX. If Jax is
installed, high-order data (jacobians, hessians) are obtained using AD.
Default is False.
warmstart: bool
Turn on/off warm-start of NLP. If 'on', the sucessful solution of the
current iteration is used as an estimate to the next one. Default is
True.
labels: str, Optional.
labels for axes. Accepts TeX math input as it uses matplotlib math
rendering. Should be in order u1,u2,u3, y1, y2, y3, and so on:
labels= ['first u label','second u label', 'first y label', 'second y label'].
Default is False.
pyomo_solver: str, Optional.
NLP solver used when the model is a Pyomo/OMLT builder (see below).
Either 'ipopt' or 'pounce'. Ignored for plain callable models.
Default is 'ipopt'.
print_level: int, Optional.
Solver output verbosity for the Pyomo/OMLT path (Ipopt convention,
0 = silent to 12 = most verbose). Ignored for plain callable
models. Default is 5 (Ipopt's default).
problem: str, Optional.
Which operability optimization problem to solve. Options are:
-'P1' (default): the classic inverse mapping. For each DOS
grid point, minimize the relative error
sum(((y - y*)/y)**2) to obtain the DIS*/DOS* pair. This is
the original behavior of this function, unchanged.
-'P2': process intensification. Solves
P1 over the DOS grid, filters the DOS* by the level of
performance given in PI_bounds (the DOS_PI subset), and
selects the design among the DIS_PI points that minimizes
PI_target: Omega = min PI_target subject to u in DIS*,
y in DOS*. Since P2 is a postprocessing step on the P1
output, the returned (fDIS, fDOS) can be ranked by any other
PI_target directly, without re-solving P1.
-'P3': the bilevel operability framework. The nested program
Psi = min_{u in U} [PI_target]
s.t. u*_k in argmin {sum_j ((y_jk - y*_jk)/y_jk)**2 :
u*_k in U_k, y_k in DOS}, c_1(u*_k) <= 0
with the inner argmin being problem P1 over the k = 1...p
DOS grid points and c_1 the process constraints (constr).
It is solved through its validated sequential equivalence:
the outcome is the same as solving P1 and P2 in series over
the full DOS grid (Carrasco dissertation, pp. 56 and 60).
Default is 'P1'.
PI_target: Callable, Optional.
Process intensification target Omega(u) for problems P2/P3, a
function of the input vector returning the metric to minimize
(e.g. reactor volume, membrane area, cost). Required for
P2/P3. Default is None.
PI_bounds: np.ndarray, Optional.
Level-of-performance box (the DOS_PI subset of the DOS), shape
(n_y, 2). Only DOS* points inside this box are eligible for the
intensified design selection. Default is None (the full
DOS_bounds are used).
Notes
-----
Pyomo/OMLT models (equation-oriented path): instead of a plain Python
callable, the model may be a Pyomo builder function with the signature
model(m, u_vars, y_vars) that adds the process constraints linking the
input variables u_vars to the output variables y_vars on the Pyomo
ConcreteModel m, flagged by setting an attribute on the function:
model.build_pyomo_constraints = True (or is_omlt for OMLT objects).
The inverse mapping is then solved algebraically with exact first and
second derivatives from the Pyomo expression graph. The 'ad' and
'constr' arguments are ignored in this path (define constraints in
the builder). Pyomo/OMLT support contributed by Heitor F.
(github @hfsf), PR #33, adapted.
Returns
-------
fDIS: np.ndarray
Feasible Desired Input Set (DIS*). Array containing the solution for
each point of the inverse-mapping.
fDOS: np.ndarray
Feasible Desired Output Set (DOS*). Array containing the feasible
output for each feasible input calculated via inverse-mapping.
message_list: list
List containing the termination criteria for each optimization run
performed for each DOS grid point.
pi_report: dict
Returned ONLY for problem='P2' or 'P3' (the classic P1 return is
unchanged). Contains the intensified design 'u_PI', its outputs
'y_PI', the metric value 'PI_value', the level-of-performance
subsets 'DIS_PI'/'DOS_PI' (eqs. 5.1-5.2 of Carrasco's
dissertation), the metric evaluated over the subset 'PI_grid',
the 'problem' string and a 'success' flag.
References
----------
[1] J. C. Carrasco and F. V. Lima, “An optimization-based operability
framework for process design and intensification of modular natural
gas utilization systems,” Comput. & Chem. Eng, 2017.
https://doi.org/10.1016/j.compchemeng.2016.12.010
'''
from scipy.optimize import NonlinearConstraint
# Pyomo/OMLT equation-oriented path detection (PR #33, adapted).
is_pyomo = _is_pyomo_model(model)
if is_pyomo:
warnings.warn("Pyomo/OMLT model detected. The inverse mapping "
"will be solved algebraically through Pyomo.",
UserWarning)
if ad is True:
warnings.warn("The 'ad=True' argument is ignored for "
"Pyomo/OMLT models, since exact derivatives "
"are obtained natively from the algebraic "
"model.", UserWarning)
ad = False
if constr is not None:
warnings.warn("The 'constr' argument is ignored for "
"Pyomo/OMLT models. Define the constraints "
"inside the model builder function instead.",
UserWarning)
constr = None
# P1/P2/P3 problem selection validation (Carrasco and Lima).
if problem not in ('P1', 'P2', 'P3'):
raise ValueError("problem must be 'P1', 'P2' or 'P3', got "
f"'{problem}'.")
if problem in ('P2', 'P3'):
if PI_target is None:
raise ValueError(
f"problem='{problem}' requires the PI_target argument: "
"a callable Omega(u) returning the process "
"intensification metric to minimize (e.g. reactor "
"volume, membrane area, cost).")
if is_pyomo:
raise NotImplementedError(
"problem='P2'/'P3' with Pyomo/OMLT models is not "
"supported yet. Use a plain callable model.")
# cyipopt is optional; import its IPOPT interface only when selected.
# Pounce (the default) is imported at module level.
if method == 'ipopt':
minimize_ipopt = _import_cyipopt_minimize()
# Use JAX.numpy if differentiable programming is available.
if ad is False:
import numpy as np
def p1(u: np.ndarray,
model: Callable[..., Union[float, np.ndarray]],
DOSpt: np.ndarray):
y_found = model(u)
vector_of_f = np.array([y_found.T, DOSpt.T])
f = np.sum(error(*vector_of_f))
return f
# Error minimization function
def error(y_achieved, y_desired):
return ((y_achieved-y_desired)/y_desired)**2
else:
print(" You have selected automatic differentiation as a method for"
" obtaining higher-order data (Jacobians/Hessian),")
print(" Make sure your process model is JAX-compatible implementation-wise.")
from jax import config
config.update("jax_enable_x64", True)
config.update('jax_platform_name', 'cpu')
warnings.filterwarnings('ignore', module='jax._src.lib.xla_bridge')
import jax.numpy as np
from jax import jacrev, grad
# Make sure that the arrays contain floats (Reviewer #2 bug @ JOSS)
u0 = u0.astype(np.float64)
lb = lb.astype(np.float64)
ub = ub.astype(np.float64)
DOS_bounds = DOS_bounds.astype(np.float64)
def p1(u: np.ndarray,
model: Callable[..., Union[float, np.ndarray]],
DOSpt: np.ndarray):
y_found = model(u)
vector_of_f = np.array([y_found.T, DOSpt.T])
f = np.sum(error(*vector_of_f))
return f
# Error minimization function
def error(y_achieved, y_desired):
return ((y_achieved-y_desired)/y_desired)**2
# Take the gradient of the objective via AD. We deliberately do NOT
# pass an exact AD Hessian: for the small problems typical here,
# Ipopt's limited-memory (L-BFGS) approximation, used by both
# Pounce and cyipopt (>=1.7.0) when no Hessian is supplied, is
# faster than evaluating a dense AD Hessian every iteration. Both
# solvers accept an exact Hessian if one is ever needed.
grad_ad = grad(p1)
if constr is not None:
constr['jac'] = (jacrev(constr['fun']))
else:
pass
if not isinstance(DOS_bounds, np.ndarray):
DOS_bounds = np.array(DOS_bounds)
else:
pass
dimDOS = DOS_bounds.shape[0]
DOSPts = create_grid(DOS_bounds, DOS_resolution)
DOSPts = DOSPts.reshape(-1, dimDOS)
u00 = u0
# Initialization of variables
m = len(u0)
r, c = np.shape(DOSPts)
fDOS = np.zeros((r, c))
fDIS = np.zeros((r, m))
message_list = []
bounds = np.column_stack((lb, ub))
# Input sanitation
if u0.size < c:
warnings.warn("Your problem is non-square and you have "
"less degrees of freedom in the AIS/DIS "
"Than variables in the AIS.")
if bounds.shape[0] != u0.size:
raise ValueError("Initial estimate and given bounds have"
" inconsistent sizes."
" Check the dimensions"
" of your problem.")
# If unbounded, set as +-inf.
if lb.size == 0:
lb = -np.inf
if ub.size == 0:
ub = np.inf
if is_pyomo:
# Equation-oriented inverse mapping: per DOS grid point, build a
# Pyomo ConcreteModel with bounded inputs and free outputs, let the
# user's builder add the process constraints, and minimize the
# squared distance to the desired output point with exact
# derivatives from the algebraic model (PR #33, adapted).
pyo, pyomo_nlp_solver = _pyomo_solver_factory(pyomo_solver,
print_level)
current_u0 = np.asarray(u0, dtype=float)
for i in tqdm(range(r)):
m_pyo = pyo.ConcreteModel()
u_start = current_u0
def _u_bounds(mm, j):
return (float(lb[j]), float(ub[j]))
def _u_init(mm, j):
return float(u_start[j])
m_pyo.u = pyo.Var(range(m), bounds=_u_bounds,
initialize=_u_init)
m_pyo.y = pyo.Var(range(dimDOS))
# The user's builder adds the process model constraints.
model(m_pyo, m_pyo.u, m_pyo.y)
DOS_pt = DOSPts[i, :]
m_pyo.obj = pyo.Objective(
expr=sum((m_pyo.y[j] - float(DOS_pt[j])) ** 2
for j in range(dimDOS)))
results_pyo = pyomo_nlp_solver.solve(m_pyo, tee=False)
term = results_pyo.solver.termination_condition
message_list.append(str(term))
fDIS[i, :] = [pyo.value(m_pyo.u[j]) for j in m_pyo.u]
fDOS[i, :] = [pyo.value(m_pyo.y[j]) for j in m_pyo.y]
# Warm-start: reuse the previous solution as the next initial
# estimate, rebooting to the first estimate on failure.
if warmstart and (term == pyo.TerminationCondition.optimal):
current_u0 = np.asarray(fDIS[i, :], dtype=float)
else:
current_u0 = np.asarray(u00, dtype=float)
else:
# Inverse-mapping: Run for each DOS grid point
for i in tqdm(range(r)):
if constr is None:
if ad is True:
if method == 'trust-constr':
sol = sp.optimize.minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
method=method,
options={'xtol': 1e-10},
jac=grad_ad)
elif method == 'Nelder-Mead':
sol = sp.optimize.minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
method=method,
options={'fatol': 1e-10,
'xatol': 1e-10},
jac=grad_ad)
elif method == 'ipopt':
sol = minimize_ipopt(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
jac=grad_ad)
elif method == 'pounce':
# Pounce (>=0.5.0) mirrors the scipy/cyipopt
# interface, including args=. The AD gradient is
# supplied; the Hessian is left to Ipopt's
# limited-memory approximation (faster here).
sol = pounce_minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
jac=grad_ad)
elif method == 'DE':
sol = DE(p1, bounds=bounds, x0=u0, strategy='best1bin',
maxiter=2000, workers=-1, updating='deferred',
init='sobol', args=(model, DOSPts[i, :]))
else:
if method == 'trust-constr':
sol = sp.optimize.minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
method=method,
options={'xtol': 1e-10})
elif method == 'Nelder-Mead':
sol = sp.optimize.minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
method=method,
options={'fatol': 1e-10,
'xatol': 1e-10})
elif method == 'ipopt':
sol = minimize_ipopt(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]))
elif method == 'pounce':
# Without AD, Pounce falls back to central finite
# differences internally (numerical derivatives).
sol = pounce_minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]))
elif method == 'DE':
sol = DE(p1, bounds=bounds, x0=u0, strategy='best1bin',
maxiter=2000, workers=-1, updating='deferred',
init='sobol', args=(model, DOSPts[i, :]))
else:
if method == 'ipopt':
if ad==True:
sol = minimize_ipopt(p1, x0=u0, bounds=bounds,
constraints=(constr),
jac=grad_ad,
args=(model, DOSPts[i, :]))
else:
sol = minimize_ipopt(p1, x0=u0, bounds=bounds,
constraints=(constr),
args=(model, DOSPts[i, :]))
elif method == 'pounce':
# Constrained problems: Pounce consumes the same
# constraint dict format as cyipopt; constraint
# Jacobians from AD are honored when ad=True, and the
# Hessian is approximated internally (limited-memory)
# since nonlinear constraints are present.
if ad is True:
sol = pounce_minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
constraints=constr,
jac=grad_ad)
else:
sol = pounce_minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
constraints=constr)
elif method == 'DE':
sol = DE(p1, bounds=bounds, x0=u0, strategy='best1bin',
maxiter=2000, workers=-1, updating='deferred',
init='sobol', constraints=(constr),
args=(model, DOSPts[i, :]))
elif method == 'trust-constr':
if ad is True:
con_fun = constr['fun']
nlc = NonlinearConstraint((con_fun), -np.inf, 0,
jac= (jacrev(con_fun)),
hess=(jacrev(jacrev(con_fun))))
sol = sp.optimize.minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
method=method,
constraints=(nlc),
jac=grad_ad)
else:
# Build NonlinearConstraint in both branches; without AD,
# scipy falls back to finite-difference Jacobians/Hessians.
con_fun = constr['fun']
nlc = NonlinearConstraint((con_fun), -np.inf, 0)
sol = sp.optimize.minimize(p1, x0=u0, bounds=bounds,
args=(model, DOSPts[i, :]),
method=method,
constraints=(nlc))
# Append results into fDOS, fDIS and message list for each iteration
if warmstart is True:
if sol.success is True:
u0 = sol.x
else:
u0 = u00 # Reboot to first initial estimate
else:
u0 = u00 # Reboot to first initial estimate
if ad is True:
fDOS = fDOS.at[i, :].set(model(sol.x))
fDIS = fDIS.at[i, :].set(sol.x)
elif ad is False:
fDOS[i, :] = model(sol.x)
fDIS[i, :] = sol.x
message_list.append(sol.message)
if fDIS.shape[1] > 3 and fDOS.shape[1] > 3:
plot = False # Assignment, not comparison: disable plot for high-dimensional cases.
print(f'Plotting is only supported for 2D and 3D. '
f'DIS* has dimension {fDIS.shape[1]}, DOS* has dimension {fDOS.shape[1]}.')
else:
if plot is False:
pass
elif plot is True:
if fDIS.shape[1] == 2 and fDOS.shape[1] == 2:
_, (ax1, ax2) = plt.subplots(nrows=1,ncols=2,
constrained_layout=True)
ax1.scatter(fDIS[:, 0], fDIS[:, 1], s=16,
c=np.sqrt(fDOS[:, 0]**1 + fDOS[:, 1]**1),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors, label='DIS*')
ax1.set_title('Feasible Desired Input Set (DIS*)', fontsize=10)
vertices_DOS = [(DOS_bounds[0, 0], DOS_bounds[1, 0]),
(DOS_bounds[0, 0], DOS_bounds[1, 1]),
(DOS_bounds[0, 1], DOS_bounds[1, 1]),
(DOS_bounds[0, 1], DOS_bounds[1, 0])]
vertices_DOS = np.array(vertices_DOS)
ax2.fill(vertices_DOS[:, 0], vertices_DOS[:, 1],
facecolor='gray', edgecolor='gray',
alpha=0.5, label= 'DOS')
ax2.scatter(fDOS[:, 0], fDOS[:, 1], s=16,
c=np.sqrt(fDOS[:, 0]**1 + fDOS[:, 1]**1),
cmap=cmap, antialiased=True,
lw=lineweight, marker='o',
edgecolors=edgecolors, label='DOS*')
if labels is not None:
if len(labels) != 4:
raise ValueError('You need four entries for your custom '+
'labels for your 2x2 system, but entered ' +
'an incorrect number of labels.')
else:
ax1.set_xlabel(labels[0])
ax1.set_ylabel(labels[1])
ax2.set_xlabel(labels[2])
ax2.set_ylabel(labels[3])
else:
ax1.set_ylabel('$u_{2}$')
ax1.set_xlabel('$u_{1}$')
ax2.set_ylabel('$y_{2}$')
ax2.set_xlabel('$y_{1}$')
ax2.set_title('Feasible Desired Output Set (DOS*)', fontsize=10)
plt.legend()
elif fDIS.shape[1] == 3 and fDOS.shape[1] == 3:
fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1,2,1, projection='3d')
plt.rcParams['figure.facecolor'] = 'white'
ax.scatter(fDIS[:, 0], fDIS[:, 1], fDIS[:,2],
s=16, c=np.sqrt(fDOS[:, 0]**2 +
fDOS[:, 1]**2 +
fDOS[:, 2]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
if labels is not None:
if len(labels) != 6:
raise ValueError('You need six entries for your custom '+
'labels for your 3x3 system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
ax.set_zlabel(labels[2])
else:
ax.set_xlabel('$u_{1}$')
ax.set_ylabel('$u_{2}$')
ax.set_zlabel('$u_{3}$')
ax.set_title('DIS*')
ax = fig.add_subplot(1,2,2, projection='3d')
ax.scatter(fDOS[:, 0],
fDOS[:, 1],
fDOS[:, 2],
s=16,
c=np.sqrt(fDOS[:, 0]**2 +
fDOS[:, 1]**2 +
fDOS[:, 2]**2),
cmap=cmap,
antialiased=True,
lw=lineweight,
marker='o',
edgecolors=edgecolors)
if labels is not None:
ax.set_xlabel(labels[3])
ax.set_ylabel(labels[4])
ax.set_zlabel(labels[5])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
ax.set_zlabel('$y_{3}$')
ax.set_title('$DOS*$')
elif fDIS.shape[1] == 2 and fDOS.shape[1] == 3:
fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1,2,1)
plt.rcParams['figure.facecolor'] = 'white'
ax.scatter(fDIS[:, 0], fDIS[:, 1],
s=16, c=np.sqrt(fDOS[:, 0]**2 +
fDOS[:, 1]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
if labels is not None:
if len(labels) != 5:
raise ValueError('You need five entries for your custom '+
'labels for your 2x3 system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
else:
ax.set_xlabel('$u_{1}$')
ax.set_ylabel('$u_{2}$')
ax.set_title('DIS*')
ax = fig.add_subplot(1,2,2, projection='3d')
ax.scatter(fDOS[:, 0],
fDOS[:, 1],
fDOS[:, 2],
s=16,
c=np.sqrt(fDOS[:, 0]**2 +
fDOS[:, 1]**2 +
fDOS[:, 2]**2),
cmap=cmap,
antialiased=True,
lw=lineweight,
marker='o',
edgecolors=edgecolors)
if labels is not None:
ax.set_xlabel(labels[2])
ax.set_ylabel(labels[3])
ax.set_zlabel(labels[4])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
ax.set_zlabel('$y_{3}$')
ax.set_title('$DOS*$')
elif fDIS.shape[1] == 3 and fDOS.shape[1] == 2:
fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1,2,1, projection='3d')
plt.rcParams['figure.facecolor'] = 'white'
ax.scatter(fDIS[:, 0], fDIS[:, 1], fDIS[:, 2],
s=16, c=np.sqrt(fDOS[:, 0]**2 +
fDOS[:, 1]**2 +
fDOS[:, 2]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
if labels is not None:
if len(labels) != 5:
raise ValueError('You need five entries for your custom '+
'labels for your 3x2 system, but entered ' +
'an incorrect number of labels.')
else:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
ax.set_zlabel(labels[2])
else:
ax.set_xlabel('$u_{1}$')
ax.set_ylabel('$u_{2}$')
ax.set_zlabel('$u_{3}$')
ax.set_title('DIS*')
ax = fig.add_subplot(1,2,2)
ax.scatter(fDOS[:, 0],
fDOS[:, 1],
s=16,
c=np.sqrt(fDOS[:, 0]**2 +
fDOS[:, 1]**2),
cmap=cmap,
antialiased=True,
lw=lineweight,
marker='o',
edgecolors=edgecolors)
if labels is not None:
ax.set_xlabel(labels[3])
ax.set_ylabel(labels[4])
else:
ax.set_xlabel('$y_{1}$')
ax.set_ylabel('$y_{2}$')
ax.set_title('$DOS*$')
else:
print(f'Plotting is only supported for 2D and 3D. '
f'DIS* has dimension {fDIS.shape[1]}, DOS* has dimension {fDOS.shape[1]}.')
plot = False # Assignment, not comparison: disable plot for high-dimensional cases.
if problem == 'P1':
return fDIS, fDOS, message_list
# P2/P3 intensified design selection (Carrasco and Lima): filter the
# DOS* by the level of performance (DOS_PI, eq. 5.1) and select the
# design among the DIS_PI points that minimizes the PI target.
fDIS_arr = np.asarray(fDIS, dtype=float)
fDOS_arr = np.asarray(fDOS, dtype=float)
perf = (np.asarray(PI_bounds, dtype=float) if PI_bounds is not None
else np.asarray(DOS_bounds, dtype=float))
inside = np.all((fDOS_arr >= perf[:, 0] - 1e-6)
& (fDOS_arr <= perf[:, 1] + 1e-6), axis=1)
if not np.any(inside):
raise ValueError(
"No DOS* point satisfies the level of performance "
"(PI_bounds). Loosen PI_bounds, refine DOS_resolution, or "
"check that the performance subset overlaps the achievable "
"outputs.")
DIS_PI = fDIS_arr[inside]
DOS_PI = fDOS_arr[inside]
PI_grid = np.asarray([float(PI_target(u)) for u in DIS_PI])
i_best = int(np.argmin(PI_grid))
pi_report = {'u_PI': DIS_PI[i_best],
'y_PI': DOS_PI[i_best],
'PI_value': float(PI_grid[i_best]),
'problem': problem,
'DIS_PI': DIS_PI,
'DOS_PI': DOS_PI,
'PI_grid': PI_grid,
'success': True}
if plot is True and fDIS_arr.shape[1] == 2 and fDOS_arr.shape[1] == 2:
_plot_pi_sets(fDIS_arr, fDOS_arr, pi_report, DOS_bounds, perf,
labels=labels)
return fDIS, fDOS, message_list, pi_report
def _plot_pi_sets(fDIS, fDOS, pi_report, DOS_bounds, perf, labels=None):
'''
Plot the process intensification sets for problems P2/P3 in the style
of Carrasco's dissertation (Fig. 5.2): DIS* and DOS* as solid dots,
the DIS_PI/DOS_PI level-of-performance subsets as hollow red circles,
and the intensified design as a star. 2D only.
'''
DIS_PI = pi_report['DIS_PI']
DOS_PI = pi_report['DOS_PI']
u_PI = pi_report['u_PI']
y_PI = pi_report['y_PI']
_, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, constrained_layout=True)
ax1.scatter(fDIS[:, 0], fDIS[:, 1], s=12, c='k', marker='.',
label='DIS*')
ax1.scatter(DIS_PI[:, 0], DIS_PI[:, 1], s=60, facecolors='none',
edgecolors='r', label='$DIS_{PI}$')
ax1.scatter(u_PI[0], u_PI[1], s=120, c='b', marker='*',
label='$u_{PI}^*$')
ax1.set_title('Intensified design selection (inputs)', fontsize=9)
ax1.legend(fontsize=7)
# DOS box and performance subset box for spatial reference.
dos = np.asarray(DOS_bounds, dtype=float)
ax2.add_patch(mpatches.Rectangle(
(dos[0, 0], dos[1, 0]), dos[0, 1] - dos[0, 0],
dos[1, 1] - dos[1, 0], fill=False, edgecolor='gray',
linestyle='--', linewidth=0.8, label='DOS'))
ax2.add_patch(mpatches.Rectangle(
(perf[0, 0], perf[1, 0]), perf[0, 1] - perf[0, 0],
perf[1, 1] - perf[1, 0], fill=False, edgecolor='g',
linestyle=':', linewidth=1.0, label='$DOS_{PI}$ box'))
ax2.scatter(fDOS[:, 0], fDOS[:, 1], s=12, c='k', marker='.',
label='DOS*')
ax2.scatter(DOS_PI[:, 0], DOS_PI[:, 1], s=60, facecolors='none',
edgecolors='r', label='$DOS_{PI}$')
ax2.scatter(y_PI[0], y_PI[1], s=120, c='b', marker='*',
label='$y_{PI}^*$')
ax2.set_title('Intensified design selection (outputs)', fontsize=9)
ax2.legend(fontsize=7)
if labels is not None and len(labels) >= 4:
ax1.set_xlabel(labels[0])
ax1.set_ylabel(labels[1])
ax2.set_xlabel(labels[2])
ax2.set_ylabel(labels[3])
else:
ax1.set_xlabel('$u_{1}$')
ax1.set_ylabel('$u_{2}$')
ax2.set_xlabel('$y_{1}$')
ax2.set_ylabel('$y_{2}$')
[docs]
def create_grid(region_bounds: np.ndarray, region_resolution: np.ndarray):
'''
Create a multidimensional, discretized grid, given the bounds and the
resolution.
Author: San Dinh
Parameters
----------
region_bounds : np.ndarray
Numpy array that contains the bounds of a (possibily) multidimensional
region. Each row corresponds to the lower and upper bound of each
variable that constitutes the hypercube.
region_resolution: np.ndarray
Array containing the resolution of the discretization grid for the
region. Each element corresponds to the resolution of each variable.
For a resolution defined as k, it will generate d^k points
(in which d is the dimensionality of the problem).
Returns
-------
region_grid: np.ndarray
Multidimensional array that represents the grid descritization of the
region in Euclidean coordinates.
'''
# Indexing
numInput = np.prod(region_resolution)
nInput = region_bounds.shape[0]
Input_u = []
# Create discretized region based on bounds and resolution information.
for i in range(nInput):
Input_u.append(list(np.linspace(region_bounds[i, 0],
region_bounds[i, 1],
region_resolution[i])))
# Create slack variables for preallocation purposes.
region_grid = np.zeros(region_resolution + [nInput])
for i in range(numInput):
inputID = [0]*nInput
inputID[0] = int(np.mod(i, region_resolution[0]))
region_val = [Input_u[0][inputID[0]]]
for j in range(1, nInput):
inputID[j] = int(np.mod(np.floor(i/np.prod(region_resolution[0:j])),
region_resolution[j]))
region_val.append(Input_u[j][inputID[j]])
region_grid[tuple(inputID)] = region_val
return region_grid
[docs]
def AIS2AOS_map(model: Callable[...,Union[float,np.ndarray]],
AIS_bound: np.ndarray,
AIS_resolution: np.ndarray,
EDS_bound: np.ndarray = None,
EDS_resolution: np.ndarray = None,
plot: bool = True,
labels: list = None,
output_dim: int = None,
pyomo_solver: str = 'ipopt')-> Union[np.ndarray,np.ndarray]:
'''
Forward mapping for Process Operability calculations (From AIS to AOS).
From an Available Input Set (AIS) bounds and discretization resolution both
defined by the user,
this function calculates the corresponding discretized
Available Output Set (AOS).
Authors: San Dinh & Victor Alves
Parameters
----------
model : Callable[...,Union[float,np.ndarray]]
Process model that calculates the relationship between inputs (AIS)
and Outputs (AOS).
AIS_bound : np.ndarray
Lower and upper bounds for the Available Input Set (AIS).
AIS_resolution : np.ndarray
Resolution for the Available Input Set (AIS). This will be used to
discretize the AIS.
EDS_bound : np.ndarray
Lower and upper bounds for the Expected Disturbance Set (EDS). Default
is 'None'.
EDS_resolution : np.ndarray
Resolution for the Expected Disturbance Set (EDS). This will be used to
discretize the EDS.
plot: bool
Turn on/off plot. If dimension is d<=3, plot is available and
both the Achievable Output Set (AOS) and Available Input
Set (AIS) are plotted. Default is True.
labels: list, Optional.
Custom axis labels for the plots, input-side labels first and
output-side labels afterwards (when an EDS is present, the
disturbance labels follow the manipulated input labels). Accepts
TeX math input as it uses matplotlib math rendering. Example for
a 2x2 system: labels=['Flowrate [m3/h]', 'Temperature [K]',
'Level [m]', 'Concentration [mol/L]']. Default is None, which
reproduces the standard $u_{i}$/$d_{i}$/$y_{i}$ labels.
output_dim: int, Optional.
Number of output variables y. Required when the model is a
Pyomo/OMLT builder (the output dimension cannot be inferred from
an algebraic model). Ignored for plain callable models. Default
is None.
pyomo_solver: str, Optional.
NLP solver used to simulate Pyomo/OMLT models in the forward
mapping ('ipopt' or 'pounce'). Ignored for plain callable models.
Default is 'ipopt'.
Notes
-----
Pyomo/OMLT models: when the model is a Pyomo builder (a function
model(m, u_vars, y_vars) flagged with a build_pyomo_constraints
attribute, or an OMLT object), the forward mapping wraps it into a
simulation proxy: inputs are fixed at each grid point and the square
algebraic system is solved for the outputs. The proxy prefers the
persistent appsi_ipopt interface when available, falling back to the
standard executable interface. Pyomo/OMLT support contributed by
Heitor F. (github @hfsf), PR #33, adapted.
Returns
-------
AIS : np.ndarray
Discretized Available Input Set (AIS).
AOS : np.ndarray
Discretized Available Output Set (AOS).
Discretized Available Output Set (AOS).
'''
# Pyomo/OMLT simulation proxy (PR #33, adapted): wrap the algebraic
# model into a plain callable so the grid loop below runs unchanged.
if _is_pyomo_model(model):
if output_dim is None:
raise ValueError(
"For Pyomo/OMLT models in the forward mapping you must "
"provide the 'output_dim' argument (number of output "
"variables y), since it cannot be inferred from the "
"algebraic model.")
warnings.warn("Pyomo/OMLT model detected in the forward mapping. "
"Creating a simulation proxy.", UserWarning)
pyo, _ = _pyomo_solver_factory(pyomo_solver, print_level=0)
m_sim = pyo.ConcreteModel()
total_inputs = AIS_bound.shape[0]
if EDS_bound is not None:
total_inputs += EDS_bound.shape[0]
m_sim.u = pyo.Var(range(total_inputs))
m_sim.y = pyo.Var(range(output_dim))
# The user's builder adds the process model constraints.
model(m_sim, m_sim.u, m_sim.y)
# Constant objective: each grid point is a square simulation.
m_sim.dummy_obj = pyo.Objective(expr=0, sense=pyo.minimize)
# Prefer the persistent APPSI interface for speed when available
# (keeps the solver loaded in memory across grid points).
sim_solver_name = pyomo_solver
if pyomo_solver == 'ipopt':
try:
if pyo.SolverFactory('appsi_ipopt').available():
sim_solver_name = 'appsi_ipopt'
except Exception:
pass
sim_solver = pyo.SolverFactory(sim_solver_name)
try:
sim_solver.options['print_level'] = 0
if sim_solver_name == 'ipopt':
sim_solver.options['sb'] = 'yes'
sim_solver.options['warm_start_init_point'] = 'yes'
sim_solver.options['mu_strategy'] = 'adaptive'
except Exception:
pass
def _pyomo_simulation_proxy(u_values):
for k_u in range(len(u_values)):
m_sim.u[k_u].fix(float(u_values[k_u]))
res_sim = sim_solver.solve(m_sim, tee=False)
term_sim = res_sim.solver.termination_condition
if term_sim != pyo.TerminationCondition.optimal:
for k_y in m_sim.y:
m_sim.y[k_y].set_value(0)
return np.full(output_dim, np.nan)
return np.array([pyo.value(m_sim.y[k_y]) for k_y in m_sim.y])
model = _pyomo_simulation_proxy
# Indexing
# Check if both EDS parameters are None using identity checks.
# Avoid `type(x) and type(y) is type(None)` as type() is always truthy.
if EDS_bound is None and EDS_resolution is None:
numInput_map = np.prod(AIS_resolution)
nInput_map = AIS_bound.shape[0]
map_bounds = AIS_bound
map_resolution = AIS_resolution
else:
numInput_map = np.prod(AIS_resolution + EDS_resolution)
numInput_d = np.prod(EDS_resolution)
nInput_map = AIS_bound.shape[0] + EDS_bound.shape[0]
nInput_d = EDS_bound.shape[0]
map_bounds = np.concatenate([AIS_bound, EDS_bound])
map_resolution = AIS_resolution + EDS_resolution
EDS = np.zeros(EDS_resolution + [nInput_d])
Input_map = []
Input_d = []
# Create discretized AIS based on bounds and resolution information.
for i in range(nInput_map):
Input_map.append(list(np.linspace(map_bounds[i, 0],
map_bounds[i, 1],
map_resolution[i])))
# Only populate EDS arrays when both EDS parameters are provided.
if EDS_bound is not None and EDS_resolution is not None:
for i in range(nInput_d):
Input_d.append(list(np.linspace(EDS_bound[i, 0],
EDS_bound[i, 1],
EDS_resolution[i])))
else:
pass
# Create slack variables for preallocation purposes.
u_d_slack = map_bounds[:, 0]
y_slack = model(u_d_slack)
nOutput = y_slack.shape[0]
input_map = np.zeros(map_resolution + [nInput_map])
AOS = np.zeros(map_resolution + [nOutput])
# General map (AIS+EDS) multidimensional array
for i in range(numInput_map):
inputID = [0]*nInput_map
inputID[0] = int(np.mod(i, map_resolution[0]))
map_val = [Input_map[0][inputID[0]]]
for j in range(1, nInput_map):
inputID[j] = int(np.mod(np.floor(i/np.prod(map_resolution[0:j])),
map_resolution[j]))
map_val.append(Input_map[j][inputID[j]])
input_map[tuple(inputID)] = map_val
AOS[tuple(inputID)] = model(map_val)
# EDS multidimensional array.
# Only populate EDS arrays when both EDS parameters are provided.
if EDS_bound is not None and EDS_resolution is not None:
for i in range(numInput_d):
inputID = [0]*nInput_d
inputID[0] = int(np.mod(i, EDS_resolution[0]))
map_val = [Input_d[0][inputID[0]]]
for j in range(1, nInput_d):
inputID[j] = int(np.mod(np.floor(i/np.prod(EDS_resolution[0:j])),
map_resolution[j]))
map_val.append(Input_d[j][inputID[j]])
EDS[tuple(inputID)] = map_val
else:
pass
# 2D / 3D Plots
if plot is False:
pass
elif plot is True:
# Effective axis labels: the defaults reproduce the standard
# $u_{i}$/$d_{i}$/$y_{i}$ labels exactly; user-provided labels
# replace them positionally (inputs first, then disturbances,
# then outputs).
n_in_plot = input_map.shape[-1]
n_out_plot = AOS.shape[-1]
n_d_plot = 0 if EDS_bound is None else EDS_bound.shape[0]
in_labels = (['$u_{' + str(i + 1) + '}$'
for i in range(n_in_plot - n_d_plot)]
+ ['$d_{' + str(i + 1) + '}$'
for i in range(n_d_plot)])
out_labels = ['$y_{' + str(i + 1) + '}$'
for i in range(n_out_plot)]
if labels is not None:
if len(labels) != n_in_plot + n_out_plot:
raise ValueError('You need ' +
str(n_in_plot + n_out_plot) +
' entries for your custom labels '
'(inputs/disturbances first, then '
'outputs), but entered ' +
str(len(labels)) + ' labels.')
in_labels = list(labels[:n_in_plot])
out_labels = list(labels[n_in_plot:])
if input_map.shape[-1] == 2 and AOS.shape[-1] == 2:
input_plot = input_map.reshape(np.prod(input_map.shape[0:-1]),
input_map.shape[-1])
AOS_plot = AOS.reshape(np.prod(AOS.shape[0:-1]), AOS.shape[-1])
_, (ax1, ax2) = plt.subplots(nrows=1,ncols=2,
constrained_layout=True)
plt.rcParams['figure.facecolor'] = 'white'
ax1.scatter(input_plot[:, 0], input_plot[:, 1], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
ax1.set_xlabel(in_labels[0])
if (EDS_bound and EDS_resolution) is None:
ax1.set_title('$AIS_{u}$')
else:
ax1.set_title(r'$AIS_{u} \, and \, EDS_{d}$')
ax1.set_ylabel(in_labels[1])
ax2.scatter(AOS_plot[:, 0], AOS_plot[:, 1], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='o',
edgecolors=edgecolors)
ax2.set_ylabel(out_labels[1])
plt.xlabel(out_labels[0])
ax2.set_title('$AOS$')
elif input_map.shape[-1]== 3 and AOS.shape[-1] == 3:
input_plot = input_map.reshape(np.prod(input_map.shape[0:-1]),
input_map.shape[-1])
AOS_plot = AOS.reshape(np.prod(AOS.shape[0:-1]), AOS.shape[-1])
fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1,2,1, projection='3d')
plt.rcParams['figure.facecolor'] = 'white'
ax.scatter(input_plot[:, 0], input_plot[:, 1], input_plot[:,2], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2 +
AOS_plot[:, 2]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
ax.set_xlabel(in_labels[0])
# Check if both EDS parameters are None using identity checks.
# Avoid `type(x) and type(y) is type(None)` as type() is always truthy.
if EDS_bound is None and EDS_resolution is None:
ax.set_title('$AIS_{u}$')
elif EDS_bound.shape[0] == 2:
ax.set_title(r'$AIS_{u} \, and \, EDS_{d}$')
elif EDS_bound.shape[0] == 1:
ax.set_title(r'$AIS_{u} \, and \, EDS_{d}$')
ax.set_ylabel(in_labels[1])
ax.set_zlabel(in_labels[2])
ax = fig.add_subplot(1,2,2, projection='3d')
ax.scatter(AOS_plot[:, 0], AOS_plot[:, 1], AOS_plot[:, 2], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2 +
AOS_plot[:, 2]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='o',
edgecolors=edgecolors)
ax.set_ylabel(out_labels[1])
ax.set_xlabel(out_labels[0])
ax.set_zlabel(out_labels[2])
ax.set_title('$AOS$')
elif input_map.shape[-1] == 2 and AOS.shape[-1] == 3:
input_plot = input_map.reshape(np.prod(input_map.shape[0:-1]),
input_map.shape[-1])
AOS_plot = AOS.reshape(np.prod(AOS.shape[0:-1]), AOS.shape[-1])
fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1,2,1)
plt.rcParams['figure.facecolor'] = 'white'
ax.scatter(input_plot[:, 0], input_plot[:, 1], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
ax.set_xlabel(in_labels[0])
# Check if both EDS parameters are None using identity checks.
# Avoid `type(x) and type(y) is type(None)` as type() is always truthy.
if EDS_bound is None and EDS_resolution is None:
plt.title('$AIS_{u}$')
else:
plt.title(r'$AIS_{u} \, and \, EDS_{d}$')
plt.ylabel(in_labels[1])
ax = fig.add_subplot(1,2,2, projection='3d')
ax.scatter(AOS_plot[:, 0], AOS_plot[:, 1], AOS_plot[:, 2], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2 +
AOS_plot[:, 2]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='o',
edgecolors=edgecolors)
ax.set_ylabel(out_labels[1])
ax.set_xlabel(out_labels[0])
ax.set_zlabel(out_labels[2])
ax.set_title('$AOS$')
elif input_map.shape[-1] == 3 and AOS.shape[-1] == 2:
input_plot = input_map.reshape(np.prod(input_map.shape[0:-1]),
input_map.shape[-1])
AOS_plot = AOS.reshape(np.prod(AOS.shape[0:-1]), AOS.shape[-1])
fig = plt.figure(figsize=plt.figaspect(0.5))
ax = fig.add_subplot(1,2,1, projection='3d')
plt.rcParams['figure.facecolor'] = 'white'
ax.scatter(input_plot[:, 0], input_plot[:, 1], input_plot[:,2], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='s',
edgecolors=edgecolors)
ax.set_xlabel(in_labels[0])
# Check if both EDS parameters are None using identity checks.
# Avoid `type(x) and type(y) is type(None)` as type() is always truthy.
if EDS_bound is None and EDS_resolution is None:
ax.set_title('$AIS_{u}$')
elif EDS_bound.shape[0] == 2:
# Closing $ required for valid LaTeX rendering.
ax.set_title(r'$AIS_{u} \, and \, EDS_{d}$')
elif EDS_bound.shape[0] == 1:
ax.set_title(r'$AIS_{u} \, and \, EDS_{d}$')
ax.set_ylabel(in_labels[1])
ax.set_zlabel(in_labels[2])
ax = fig.add_subplot(1,2,2)
ax.scatter(AOS_plot[:, 0], AOS_plot[:, 1], s=16,
c=np.sqrt(AOS_plot[:, 0]**2 + AOS_plot[:, 1]**2),
cmap=cmap, antialiased=True,
lw=lineweight, marker='o',
edgecolors=edgecolors)
ax.set_ylabel(out_labels[1])
ax.set_xlabel(out_labels[0])
ax.set_title('$AOS$')
else:
print(f'Plotting is only supported for 2D and 3D. '
f'Your data has dimension {input_map.shape[-1]} inputs '
f'and {AOS.shape[-1]} outputs.')
else:
pass
return input_map, AOS
def _paired_simplices(AIS: np.ndarray, AOS: np.ndarray) -> Union[list, list]:
'''
Build the raw paired input/output simplices from gridded AIS/AOS data,
PRESERVING the vertex correspondence: column m of an AIS simplex maps to
column m of its paired AOS simplex (same grid point). This pairing is
required by barycentric interpolation (e.g. the multilayer MILP of the
Gazzaneo and Lima framework) and is destroyed by the qhull vertex
reordering that points2simplices applies for its public output.
Returns (AIS_simplices, AOS_simplices) as lists of (n_var, n + 1) arrays.
'''
# Additional parameters: Resolution, no. of inputs/outputs.
resolution = AIS.shape[0:-1]
nInputVar = AIS.shape[-1]
nOutputVar = AOS.shape[-1]
numInput = np.prod(resolution)
nInput = nInputVar
inputID = [1]*nInput
permutations = list(perms(range(nOutputVar)))
n_simplex = len(permutations)
simplex_vertices = list()
for n in range(n_simplex):
ineq_choice = permutations[n]
A_full = np.zeros((nOutputVar+1, nOutputVar))
A_full[0, ineq_choice[0]] = -1
for i in range(0, nOutputVar-1):
A_full[i+1, ineq_choice[i]] = 1
A_full[i+1, ineq_choice[i+1]] = -1
A_full[-1, ineq_choice[-1]] = 1
b_full = np.zeros((nOutputVar+1, 1))
b_full[-1] = 1
V_simplex = np.zeros((nOutputVar, nOutputVar+1))
for i in range(nOutputVar+1):
A = np.delete(A_full, i, axis=0)
b = np.delete(b_full, i, axis=0)
V_sol = np.linalg.solve(A, b)
V_simplex[:, [i]] = V_sol.astype(int)
simplex_vertices.append(list(V_simplex.T))
AOS_simplices = list()
AIS_simplices = list()
for i in range(numInput):
inputID[0] = int(np.mod(i, resolution[0]))
for j in range(1, nInput):
inputID[j] = int(np.mod(np.floor(i/np.prod(resolution[0:j])),
resolution[j]))
if np.prod(inputID) != 0:
inputID = [x-1 for x in inputID]
for k in range(n_simplex):
ID = [a + b for a, b in zip([inputID]*(nInput + 1),
simplex_vertices[k])]
V_AOS_id = np.zeros((nOutputVar, nInput + 1))
V_AIS_id = np.zeros((nInputVar, nInput + 1))
for m in range(nInput + 1):
ID_cell = tuple([int(x) for x in ID[m]])
V_AOS_id[:, m] = AOS[ID_cell]
V_AIS_id[:, m] = AIS[ID_cell]
AOS_simplices.append(V_AOS_id)
AIS_simplices.append(V_AIS_id)
return AIS_simplices, AOS_simplices
[docs]
def points2simplices(AIS: np.ndarray, AOS: np.ndarray) -> Union[np.ndarray,
np.ndarray]:
'''
Generation of connected simplices (k+1 convex hull of k+1 vertices)
based on the AIS/AOS points.
Author: San Dinh
Parameters
----------
AIS : np.ndarray
Array containing the points that constitutes the AIS.
AOS : np.ndarray
Array containing the points that constitutes the AOS.
Returns
-------
AIS_simplices : np.ndarray
List of input (AIS) vertices for each simplex generated, in the form of
an array.
AOS_simplices : np.ndarray
List of output (AOS) vertices for each simplex generated, in the form
of an array.
'''
AIS_simplices, AOS_simplices = _paired_simplices(AIS, AOS)
# Putting polytopes together.
for i, simplex in enumerate(AOS_simplices):
poly = pc.qhull(simplex.T)
AOS_simplices[i] = pc.extreme(poly)
for i, simplex in enumerate(AIS_simplices):
poly = pc.qhull(simplex.T)
AIS_simplices[i] = pc.extreme(poly)
return AIS_simplices, AOS_simplices
[docs]
def points2polyhedra(AIS: np.ndarray, AOS: np.ndarray) -> Union[np.ndarray,
np.ndarray]:
'''
Generation of connected polyhedra based on the AIS/AOS points.
Author: San Dinh
Parameters
----------
AIS : np.ndarray
Array containing the points that constitutes the AIS.
AOS : np.ndarray
Array containing the points that constitutes the AOS.
Returns
-------
AIS_polytope : np.ndarray
List of input (AIS) vertices for each polytope generated, in the form
of an array.
AOS_polytope : np.ndarray
List of output (AOS) vertices for each polyhedra generated, in the form
of an array.
'''
# Additional parameters: Resolution, no. of inputs/outputs.
resolution = AIS.shape[0:-1]
nInputVar = AIS.shape[-1]
nOutputVar = AOS.shape[-1]
numInput = np.prod(resolution)
nInput = nInputVar
inputID = [1]*nInput
cube_vertices = list()
for n in range(2**(nInput)):
cube_vertices.append([int(x) for x in
np.binary_repr(n, width=nInput)])
# Preallocation
AOS_polytope = list()
AIS_polytope = list()
for i in range(numInput):
inputID[0] = int(np.mod(i, resolution[0]))
for j in range(1, nInput):
inputID[j] = int(np.mod(np.floor(i/np.prod(resolution[0:j])),
resolution[j]))
if np.prod(inputID) != 0:
inputID = [x-1 for x in inputID]
ID = np.array([inputID]*(2**(nInput))) + np.array(cube_vertices)
V_AOS_id = np.zeros((nOutputVar, 2**(nInput)))
V_AIS_id = np.zeros((nInputVar, 2**(nInput)))
for k in range(2**(nInput)):
ID_cell = tuple([int(x) for x in ID[k]])
V_AOS_id[:, k] = AOS[ID_cell]
V_AIS_id[:, k] = AIS[ID_cell]
AOS_polytope.append(V_AOS_id)
AIS_polytope.append(V_AIS_id)
# Putting polytopes together.
for i, simplex in enumerate(AOS_polytope):
poly = pc.qhull(simplex.T)
AOS_polytope[i] = pc.extreme(poly)
for i, simplex in enumerate(AIS_polytope):
poly = pc.qhull(simplex.T)
AIS_polytope[i] = pc.extreme(poly)
return AIS_polytope, AOS_polytope
def _filter_polytope_pairs(AIS_raw, AOS_raw, DOS_bounds, input_constr,
AIS_bound):
'''
Layer 1, step 3 of the multilayer operability framework (Gazzaneo and
Lima): from the paired input/output simplices, keep the subset S' of
pairs whose output simplex intersects the DOS and (when given) whose
input simplex intersects the linear input-constraint polytope
A_u u <= b_u. Returns the list of kept pair indices; degenerate
simplices are skipped. The input-constraint polytope is bounded by
the AIS box (the polytope library mishandles intersections with
unbounded polyhedra).
'''
DOS_poly = pc.box2poly(np.asarray(DOS_bounds, dtype=float))
U_poly = None
if input_constr is not None:
A_u, b_u = input_constr
A_u = np.asarray(A_u, dtype=float)
b_u = np.asarray(b_u, dtype=float).reshape(-1)
box = np.asarray(AIS_bound, dtype=float)
n_u = box.shape[0]
A_box = np.vstack([np.eye(n_u), -np.eye(n_u)])
b_box = np.hstack([box[:, 1], -box[:, 0]])
U_poly = pc.Polytope(np.vstack([A_u, A_box]),
np.hstack([b_u, b_box]))
kept = []
n_degenerate = 0
for k in range(len(AIS_raw)):
try:
aos_poly = pc.qhull(AOS_raw[k].T)
if pc.is_empty(aos_poly):
n_degenerate += 1
continue
if not pc.is_fulldim(pc.intersect(aos_poly, DOS_poly)):
continue
if U_poly is not None:
ais_poly = pc.qhull(AIS_raw[k].T)
if pc.is_empty(ais_poly):
n_degenerate += 1
continue
if not pc.is_fulldim(pc.intersect(ais_poly, U_poly)):
continue
except Exception:
n_degenerate += 1
continue
kept.append(k)
if n_degenerate > 0:
warnings.warn(f'{n_degenerate} degenerate polytope pair(s) were '
'skipped during the multimodel filtering.',
UserWarning)
return kept
def _assemble_pi_milp(AIS_S, AOS_S, PI_target, DOS_bounds, input_constr):
'''
Assemble the MILP of Layer 1, step 4 of the multilayer operability
framework (Gazzaneo and Lima): continuous barycentric weights w_{j,k}
per vertex j of each kept polytope pair k, binaries b_k selecting
exactly one pair, the inputs/outputs expressed as the weighted vertex
combinations, and the linearized PI target as the objective. Returns
(c, integrality, variable_bounds, constraint_list, U_mat, Y_mat).
'''
from scipy.optimize import LinearConstraint, Bounds
K = len(AIS_S)
n_vert = AIS_S[0].shape[1]
n_u = AIS_S[0].shape[0]
n_y = AOS_S[0].shape[0]
Nw = n_vert * K
n_var = Nw + K
# Stacked vertex matrices: u = U_mat @ w, y = Y_mat @ w.
U_mat = np.hstack([V for V in AIS_S])
Y_mat = np.hstack([V for V in AOS_S])
# Objective: barycentric linearization of the PI target (evaluated at
# the input vertices); binaries carry zero cost.
c = np.concatenate([
np.array([float(PI_target(U_mat[:, j])) for j in range(Nw)]),
np.zeros(K)])
integrality = np.concatenate([np.zeros(Nw), np.ones(K)])
variable_bounds = Bounds(np.zeros(n_var), np.ones(n_var))
constraint_list = []
# Linking: sum_j w_{j,k} - b_k = 0 for each pair k.
A_link = np.zeros((K, n_var))
for k in range(K):
A_link[k, k * n_vert:(k + 1) * n_vert] = 1.0
A_link[k, Nw + k] = -1.0
constraint_list.append(LinearConstraint(A_link, 0.0, 0.0))
# Exactly one pair is selected: sum_k b_k = 1.
A_one = np.zeros((1, n_var))
A_one[0, Nw:] = 1.0
constraint_list.append(LinearConstraint(A_one, 1.0, 1.0))
# Output box (the DOS/performance targets): y_lb <= Y w <= y_ub.
dos = np.asarray(DOS_bounds, dtype=float)
A_y = np.zeros((n_y, n_var))
A_y[:, :Nw] = Y_mat
constraint_list.append(LinearConstraint(A_y, dos[:, 0], dos[:, 1]))
# Linear input constraints: A_u (U w) <= b_u.
if input_constr is not None:
A_u, b_u = input_constr
A_u = np.asarray(A_u, dtype=float)
b_u = np.asarray(b_u, dtype=float).reshape(-1)
A_in = np.zeros((A_u.shape[0], n_var))
A_in[:, :Nw] = A_u @ U_mat
constraint_list.append(
LinearConstraint(A_in, -np.inf, b_u))
return c, integrality, variable_bounds, constraint_list, U_mat, Y_mat
[docs]
def milp_based_approach(model: Callable[..., Union[float, np.ndarray]],
AIS_bound: np.ndarray,
PI_target: Callable[..., Union[float, np.ndarray]],
DOS_bounds: np.ndarray,
AIS_resolution=3,
input_constr: tuple = None,
tol: float = 1e-3,
max_iter: int = 20,
solver_options: dict = None,
plot: bool = True,
labels: list = None) -> Union[np.ndarray, np.ndarray,
float, list]:
'''
MILP-based iterative algorithm for optimal modular design: Layer 1 of
the multilayer operability framework of Gazzaneo and Lima. At each
iteration, the nonlinear process model is simulated on a coarse AIS
grid, the input-output data are triangulated into paired multimodel
polytopes (the linearized subsystems), the pairs satisfying the DOS
and the input constraints are kept, and a mixed-integer linear
program selects the design point minimizing the linearized process
intensification target via barycentric interpolation: binaries b_k
pick exactly one polytope pair and continuous weights w express the
design as a convex combination of its vertices. The AIS bounds are
then refined to the winning polytope and the procedure repeats until
the relative change in the objective falls below tol.
The MILP is solved with scipy.optimize.milp (the HiGHS solver bundled
inside scipy, no additional installation needed; for advanced HiGHS
controls the highspy package is the escalation path).
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
model : Callable
Process model (forward mapping) y = M(u). Either a plain callable
or a Pyomo/OMLT builder (auto-detected, simulated through the
forward-mapping proxy).
AIS_bound : np.ndarray
Bounds of the Available Input Set for design, shape (n_u, 2).
PI_target : Callable
Process intensification target Omega(u) to minimize (e.g. reactor
volume, membrane area, cost). Evaluated at the polytope vertices;
the MILP minimizes its barycentric linearization.
DOS_bounds : np.ndarray
Desired Output Set bounds (including PI and sustainable
manufacturing targets), shape (n_y, 2). The framework requires a
square system (n_y equal to n_u) for the simplicial multimodel
representation.
AIS_resolution : int or array-like, Optional.
Grid resolution per input dimension for the multimodel
discretization. The paper uses 3 (a 3^n grid). Default is 3.
input_constr : tuple, Optional.
Linear input constraints as (A_u, b_u) with A_u u <= b_u, e.g.
the plug-flow aspect ratio L/D >= 30 written as
(np.array([[-1.0, 30.0]]), np.array([0.0])). Default is None.
tol : float, Optional.
Relative-improvement stopping tolerance between consecutive
iterations. Default is 1e-3 (the paper's 0.001).
max_iter : int, Optional.
Maximum number of refinement iterations. Default is 20.
solver_options : dict, Optional.
Options forwarded to scipy.optimize.milp (e.g.
{'mip_rel_gap': 1e-3, 'time_limit': 60}). Default is None.
plot : bool, Optional.
Plot the iteration history (2D systems only): the refined
triangulations, the winning polytopes and the optimal design.
Default is True.
labels : list, Optional.
Axis labels, input labels first then output labels, e.g.
['L [cm]', 'D [cm]', 'Benzene [mg/h]', 'Conversion [%]'].
Default is None.
Returns
-------
u_opt : np.ndarray
The optimal (intensified/modular) design point.
y_opt : np.ndarray
The outputs of the optimal design from the barycentric
interpolation (re-evaluate model(u_opt) for the nonlinear value).
PI_linearized : float
The linearized PI target at the optimum (phi), i.e. the quantity
the MILP actually minimizes (the barycentric interpolation of
PI_target over the winning simplex vertices).
PI_true : float
The true intensification metric, PI_target evaluated directly at
the optimal design u_opt. Equals PI_linearized when PI_target is
linear, and converges to it as the simplices shrink; their gap is
the linearization error.
history : list
One dict per iteration with keys 'phi', 'u', 'y', 'PI_true',
'AIS_simplex'/'AOS_simplex' (the winning input/output simplex),
'AIS_simplices'/'AOS_simplices' (the full multimodel triangulation
of the current box), 'AIS_simplices_kept'/'AOS_simplices_kept' (the
subset reaching the DOS), 'AIS_bound', 'n_pairs', 'E_rel' and
'milp_status'.
References
----------
[1] V. Gazzaneo and F. V. Lima. Multilayer Operability Framework for
Process Design, Intensification, and Modularization of Nonlinear
Energy Systems. Ind. Eng. Chem. Res., 58, 6069-6079, 2019.
https://doi.org/10.1021/acs.iecr.8b05482
[2] V. Gazzaneo, J. C. Carrasco, D. R. Vinson and F. V. Lima.
Process Operability Algorithms: Past, Present, and Future
Developments. Ind. Eng. Chem. Res., 59, 2457-2470, 2020.
'''
from scipy.optimize import milp
AIS_bound = np.asarray(AIS_bound, dtype=float)
DOS_bounds = np.asarray(DOS_bounds, dtype=float)
n_u = AIS_bound.shape[0]
n_y = DOS_bounds.shape[0]
if n_u != n_y:
raise ValueError(
"milp_based_approach currently requires a square system "
f"(n_u == n_y); got {n_u} inputs and {n_y} outputs. The "
"simplicial multimodel representation of the framework is "
"defined for square subsystems. Support for non-square "
"systems is an upcoming feature of opyrability.")
if np.isscalar(AIS_resolution):
resolution = [int(AIS_resolution)] * n_u
else:
resolution = [int(rr) for rr in AIS_resolution]
# Pyomo/OMLT models: the forward-mapping proxy needs the output
# dimension, which here is known from the DOS.
forward_kwargs = {}
if _is_pyomo_model(model):
forward_kwargs['output_dim'] = n_y
original_range = AIS_bound[:, 1] - AIS_bound[:, 0]
bounds_current = AIS_bound.copy()
history = []
phi_prev = None
for iteration in range(max_iter):
# (1) Simulate the model on the current coarse grid and (2) build
# the paired multimodel simplices (vertex pairing preserved).
AIS_grid, AOS_grid = AIS2AOS_map(model, bounds_current, resolution,
plot=False, **forward_kwargs)
AIS_raw, AOS_raw = _paired_simplices(AIS_grid, AOS_grid)
# (3) Keep the subset S' satisfying output and input constraints.
kept = _filter_polytope_pairs(AIS_raw, AOS_raw, DOS_bounds,
input_constr, bounds_current)
if len(kept) == 0:
if iteration == 0:
raise ValueError(
"No polytope pair satisfies the DOS and input "
"constraints (S' is empty). Refine AIS_resolution, "
"widen the DOS_bounds, or check the input_constr "
"definition.")
warnings.warn(
"S' became empty after bound refinement; returning the "
"best design found so far.", UserWarning)
break
AIS_S = [AIS_raw[k] for k in kept]
AOS_S = [AOS_raw[k] for k in kept]
# (4) Assemble and solve the design MILP.
c, integrality, var_bounds, constr_list, U_mat, Y_mat = \
_assemble_pi_milp(AIS_S, AOS_S, PI_target, DOS_bounds,
input_constr)
res = milp(c=c, constraints=constr_list, integrality=integrality,
bounds=var_bounds, options=solver_options)
if not res.success:
if iteration == 0:
raise ValueError(
"The design MILP is infeasible at the first "
f"iteration (status: {res.message}). Refine "
"AIS_resolution or widen the DOS_bounds.")
warnings.warn(
f"The design MILP became infeasible ({res.message}); "
"returning the best design found so far.", UserWarning)
break
Nw = U_mat.shape[1]
w_sol = res.x[:Nw]
b_sol = res.x[Nw:]
k_win = int(np.argmax(b_sol))
u_it = U_mat @ w_sol
y_it = Y_mat @ w_sol
phi_it = float(c @ res.x)
# (5) Relative improvement check.
if phi_prev is None:
E_rel = np.inf
else:
E_rel = abs(phi_it - phi_prev) / max(abs(phi_prev), 1e-12)
history.append({'phi': phi_it,
'u': u_it,
'y': y_it,
'PI_true': float(PI_target(u_it)),
'AIS_simplex': AIS_S[k_win],
'AOS_simplex': AOS_S[k_win],
# Full multimodel triangulation of the current box and
# the subset that reaches the DOS (for visualization).
'AIS_simplices': AIS_raw,
'AOS_simplices': AOS_raw,
'AIS_simplices_kept': AIS_S,
'AOS_simplices_kept': AOS_S,
'AIS_bound': bounds_current.copy(),
'n_pairs': len(kept),
'E_rel': E_rel,
'milp_status': res.message})
if E_rel < tol:
break
phi_prev = phi_it
# (6) New bounds: the bounding box of the winning input simplex,
# guarded against collapsing to zero width.
V_win = AIS_S[k_win]
lo = V_win.min(axis=1)
hi = V_win.max(axis=1)
width_guard = 1e-9 * original_range
hi = np.maximum(hi, lo + width_guard)
bounds_current = np.column_stack((lo, hi))
# Best iteration by the linearized objective (the quantity the MILP
# actually minimizes). Both the linearized objective and the true
# PI_target evaluated at the design are returned: they coincide as the
# simplices shrink, and their gap measures the linearization error.
i_best = int(np.argmin([h['phi'] for h in history]))
u_opt = history[i_best]['u']
y_opt = history[i_best]['y']
PI_linearized = history[i_best]['phi']
PI_true = history[i_best]['PI_true']
if plot is True and n_u == 2 and n_y == 2:
_plot_milp_iterations(history, DOS_bounds, u_opt, y_opt,
labels=labels)
elif plot is True:
print(f'Plotting is only supported for 2D systems. '
f'Your design problem has {n_u} inputs and {n_y} outputs.')
return u_opt, y_opt, PI_linearized, PI_true, history
def _plot_milp_iterations(history, DOS_bounds, u_opt, y_opt, labels=None):
'''
Plot the MILP-based iterative refinement (2D), reproducing the
multimodel-triangulation view of the Gazzaneo and Lima framework
(Figs. 5-6). For each iteration the full paired triangulation of the
current AIS box is drawn faded (left = input space, right = output
space), the simplices whose output reaches the DOS are shaded a little
stronger, and the winning simplex is highlighted in the iteration
color. The DOS box and the optimal design (star) are overlaid.
'''
cmap_func = plt.get_cmap(cmap, max(len(history), 2))
_, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, constrained_layout=True)
def _fill(ax, V, **kw):
# V is a (2, 3) simplex; fill its triangle.
ax.fill(V[0, :], V[1, :], **kw)
for it, h in enumerate(history):
color = cmap_func(it / max(len(history) - 1, 1))
# Full multimodel triangulation of this iteration's box (faded),
# so the whole partition of the AIS and the image fan over the AOS
# are visible, not just the winner.
for VA, VO in zip(h['AIS_simplices'], h['AOS_simplices']):
_fill(ax1, VA, facecolor=color, alpha=0.05,
edgecolor=color, linewidth=0.3)
_fill(ax2, VO, facecolor=color, alpha=0.05,
edgecolor=color, linewidth=0.3)
# Candidate simplices that reach the DOS (the MILP search set).
for VA, VO in zip(h['AIS_simplices_kept'], h['AOS_simplices_kept']):
_fill(ax1, VA, facecolor=color, alpha=0.18,
edgecolor=color, linewidth=0.4)
_fill(ax2, VO, facecolor=color, alpha=0.18,
edgecolor=color, linewidth=0.4)
# Winning simplex of the iteration (highlighted, in the legend).
_fill(ax1, h['AIS_simplex'], facecolor=color, alpha=0.55,
edgecolor='k', linewidth=1.1, label=f'Iteration {it + 1}')
_fill(ax2, h['AOS_simplex'], facecolor=color, alpha=0.55,
edgecolor='k', linewidth=1.1)
ax1.scatter(u_opt[0], u_opt[1], s=120, c='k', marker='*', zorder=5,
label='$u^*$ (optimal design)')
ax2.scatter(y_opt[0], y_opt[1], s=120, c='k', marker='*', zorder=5)
dos = np.asarray(DOS_bounds, dtype=float)
ax2.add_patch(mpatches.Rectangle(
(dos[0, 0], dos[1, 0]), dos[0, 1] - dos[0, 0],
dos[1, 1] - dos[1, 0], fill=False, edgecolor='gray',
linestyle='--', linewidth=0.9, label='DOS'))
ax1.set_title('AIS triangulation and refinement', fontsize=9)
ax2.set_title('AOS triangulation and DOS', fontsize=9)
ax1.legend(fontsize=6)
ax2.legend(fontsize=7)
if labels is not None and len(labels) >= 4:
ax1.set_xlabel(labels[0])
ax1.set_ylabel(labels[1])
ax2.set_xlabel(labels[2])
ax2.set_ylabel(labels[3])
else:
ax1.set_xlabel('$u_{1}$')
ax1.set_ylabel('$u_{2}$')
ax2.set_xlabel('$y_{1}$')
ax2.set_ylabel('$y_{2}$')
[docs]
def implicit_map(model: Callable[...,Union[float,np.ndarray]],
image_init: np.ndarray,
domain_bound: np.ndarray = None,
domain_resolution: np.ndarray = None,
domain_points: np.ndarray = None,
direction: str = 'forward',
validation: str = 'predictor-corrector',
tol_cor: float = 1e-4,
continuation: str = 'Explicit RK4',
derivative: str = 'jax',
jit: bool = True,
step_cutting: bool = False):
'''
Performs implicit mapping of an implicitly defined process F(u,y) = 0.
F can be a vector-valued, multivariable function, which is typically the
case for chemical processes studied in Process Operability.
This method relies on the implicit function theorem and automatic
differentiation in order to obtain the mapping of the required
input/output space. The
mapping "direction" can be set by changing the 'direction' parameter.
Authors: San Dinh & Victor Alves
Parameters
----------
implicit_model : Callable[...,Union[float,np.ndarray]]
Process model that describes the relationship between the input and
output spaces. Has to be written as a function in the following form:
F(Input Vector, Output Vector) = 0
domain_bound : np.ndarray
Domains of the domain variables. Each row corresponds to the lower and
upper bound of each variable. The domain terminology corresponds to the
mathematical definition of the domain of a function: If the mapping is
in the foward direction (direction = 'forward'), then the domain
corresponds to the process inputs. Otherwise, the mapping is in the
inverse direction and the domain corresponds to the process outputs.
domain_resolution : np.ndarray
Array containing the resolution of the discretization grid for the domain.
Each element corresponds to the resolution of each variable. For a
resolution defined as k, it will generate d^k points (in which d is the
dimensionality of the domain).
direction : str, optional
Mapping direction. this will define if a forward or inverse mapping is
performed. The default is 'forward'.
validation : str, optional
How the implicit mapping is obatined. The default is 'predictor-corrector'.
The available options are:
'predictor' => Numerical integration only.
'corrector' => Solved using a nonlinear equation solver.
'predictor-corrector' => Numerical integration + correction using
nonlinear equation solver if F(u, y) >= Tolerance.
tol_cor : float, optional
Tolerance for solution of the implicit function F(u,y) <= tol_cor.
The algorithm will keep solving the implicit mapping while
F(u,y) >= tol_cor. The default is 1e-4.
continuation : str, optional
Numerical continuation method used in the implicit mapping.
The default is 'Explicit RK4' .
'Explicit Euler' - Euler's method. Good for simple (nonstiff) problems.
'Explicit RK4' - Fixed-step 4th order Runge-Kutta. Good for moderate,
mildly stiff problems. Good balance between accuracy and complexity.
'Odeint' - Variable step Runge-Kutta. Suitable for challenging,
high-dimensional and stiff problems. This is a fully-featured IVP solver.
derivative: str, optional
Derivative calculation method. If JAX is available, automatic
differentiation can be performed. The default is 'jax'.
jit: bool True, optional
JAX's Just-in-time compilation (JIT) of implicit function and its
respective multidimensional derivatives (Jacobians). JIT allows faster
computation of the implicit map. The default is True.
step_cutting: bool, False, optional
Cutting step strategy to subdivide the domain/image in case of stiffness.
The default is 'False'.
Returns
-------
domain_set: np.ndarray
Set of values that corresponds to the domain of the implicit function.
image_set: np.ndarray
Set of values that corresponds to the calculated image of the implicit
function.,
domain_polyhedra: list
Set of coordinates for polytopic construction of the domain. Can be used
to create polytopes for multimodel representation and/or OI evaluation
(see multimodel_rep and oi_calc modules)
image_polyhedra: list
Set of coordinates for polytopic construction of the calculated image
of the implicit function. Similarly to 'domain_polyhedra', this list can
be used to construct polytopes for multimodel representation and OI
evaluation.
References
----------
V. Alves, J. R. Kitchin, and F. V. Lima. "An inverse mapping approach for process
systems engineering using automatic differentiation and the implicit
function theorem". AIChE Journal, 2023.
https://doi.org/10.1002/aic.18119
'''
# Pyomo/OMLT models are incompatible with the JAX JIT compilation
# this function relies on (PR #33, adapted).
if _is_pyomo_model(model):
raise NotImplementedError(
"Implicit mapping is not supported for Pyomo/OMLT models due "
"to its dependency on JAX JIT compilation. Use AIS2AOS_map "
"(forward) or nlp_based_approach (inverse) instead.")
# Implicit function theorem and pre-configuration steps.
if direction == 'forward':
print('Forward Mapping Selected.')
print('The given domain is recognized as an Available Input Set (AIS).')
print('The result of this mapping is an Achievable Output Set(AOS)')
def F(i, o) : return model(i, o)
def F_io(o, i): return model(i, o)
elif direction == 'inverse':
print('Inverse Mapping Selected.')
print('The given domain is recognized as Desired Output Set (DOS).')
print('The result of this mapping is an Desired Input Set(DIS)')
def F(i, o) : return model(o, i)
def F_io(o, i): return model(o, i)
else:
print('Invalid Mapping Selected. Please select the direction \
to be either "forward" or "inverse"')
# Use JAX.numpy if differentiable programming is available.
if derivative == 'jax':
from jax import config
config.update("jax_enable_x64", True)
config.update('jax_platform_name', 'cpu')
warnings.filterwarnings('ignore', module='jax._src.lib.xla_bridge')
import jax.numpy as jnp
from jax import jit, jacrev
from jax.experimental.ode import odeint as odeint
dFdi = jacrev(F, 0)
dFdo = jacrev(F, 1)
else:
raise ValueError('Currently JAX is the only supported option for '
'calculating derivatives.')
if jit is True:
@jit
def dodi(ii,oo):
return -jnp.linalg.pinv(dFdo(ii,oo)) @ dFdi(ii,oo)
@jit
def dods(oo, s, s_length, i0, iplus):
return dodi(i0 + (s/s_length)*(iplus - i0), oo) \
@((iplus - i0)/s_length)
else:
def dodi(ii, oo): return -jnp.linalg.pinv(dFdo(ii, oo)) @ dFdi(ii, oo)
def dods(oo, s, s_length, i0, iplus): return dodi(
i0 + (s/s_length)*(iplus - i0), oo)@((iplus - i0)/s_length)
# Initialization step: obtaining first solution
# sol = root(F_io, image_init,args=domain_bound[:,0])
# Predictor scheme selection
if continuation == 'Explicit RK4':
print('Selected RK4')
def predict_RK4(dodi, i0, iplus, o0):
h = iplus -i0
k1 = dodi( i0 , o0 )
k2 = dodi( i0 + (1/2)*h, o0 + (h/2) @ k1)
## RK4 classical scheme: each stage uses the previous stage's estimate.
# k1 → k2 → k3 → k4 (do not reuse k1 in k3).
k3 = dodi( i0 + (1/2)*h, o0 + (h/2) @ k2)
k4 = dodi(np.array(i0+h), o0 + h@ k3)
return o0 + (1/6)*(k1 + 2*k2 + 2*k3 + k4) @ h
predict = predict_RK4
do_predict = dodi
elif continuation == 'Explicit Euler':
print('Selected Euler')
def predict_eEuler(dodi, i0, iplus, o0):
return o0 + dodi(i0,o0)@(iplus -i0)
predict = predict_eEuler
do_predict = dodi
elif continuation == 'odeint':
print('Selected odeint')
def predict_odeint(dods, i0, iplus, o0):
s_length = norm(iplus - i0)
s_span = np.linspace(0.0, s_length, 10)
sol = odeint(dods, o0, s_span, s_length, i0, iplus)
return sol[-1,:]
predict = predict_odeint
do_predict = dods
else:
raise ValueError('Invalid continuation method. Choose "Explicit RK4", '
'"Explicit Euler", or "odeint".')
# This code below is a partial implementation of implicit mapping with a
# closed path. It works for applications in which the meshgrid can be
# inferred from a discrete path.
# TODO: Work in a definitive solution that can be generalized.
solv_method = 'hybr'
if domain_points is not None:
# Determine dimension
d = domain_points.shape[1]
# Calculate side length of grid (assumes cubic/square grid)
side_length = int(domain_points.shape[0] ** (1/d))
# Reshape into the desired shape
domain_set = domain_points.reshape(*([side_length]*d), d)
# Update other dependent parameters
nInput = domain_set.shape[-1]
numInput = np.prod([side_length]*d)
domain_resolution = [side_length]*d # inferred resolution
# Pre-allocate image set with nOutput dimensions (not nInput),
# as input and output spaces may differ in non-square problems.
nOutput = image_init.shape[0]
image_set = np.zeros(domain_resolution + [nOutput]) * np.nan
# Initialization step: obtaining first solution
sol = root(F_io, image_init,args=domain_points[0,:], method=solv_method)
image_set[0, 0] = sol.x
for i in range(numInput):
inputID = [0]*nInput
inputID[0] = int(np.mod(i, domain_resolution[0]))
else:
# Pre-alocating the domain set
numInput = np.prod(domain_resolution)
nInput = domain_bound.shape[0]
nOutput = image_init.shape[0]
Input_u = []
# Create discretized AIS based on bounds and resolution information.
for i in range(nInput):
Input_u.append(list(np.linspace(domain_bound[i, 0],
domain_bound[i, 1],
domain_resolution[i])))
domain_set = np.zeros(domain_resolution + [nInput])
image_set = np.zeros(domain_resolution + [nOutput])*np.nan
# Initialization step: obtaining the first solution. The first grid
# cell (0, 0, ...) sits at the lower-bound corner of every input,
# i.e. domain_bound[:, 0]; image_init is the output guess there.
sol = root(F_io, image_init, args=domain_bound[:, 0],
method=solv_method)
image_set[0, 0] = sol.x
for i in range(numInput):
inputID = [0]*nInput
inputID[0] = int(np.mod(i, domain_resolution[0]))
domain_val = [Input_u[0][inputID[0]]]
for j in range(1, nInput):
inputID[j] = int(np.mod(np.floor(i/np.prod(domain_resolution[0:j])),
domain_resolution[j]))
domain_val.append(Input_u[j][inputID[j]])
domain_set[tuple(inputID)] = domain_val
# End of partial implementation. Below we have the OG code that I will leave
# here for my own sake of sanity.
# --------------------------- Previous strategy ------------------------- #
# # Pre-allocating the domain set
# numInput = np.prod(domain_resolution)
# nInput = domain_bound.shape[0]
# nOutput = image_init.shape[0]
# Input_u = []
# # Create discretized AIS based on bounds and resolution information.
# for i in range(nInput):
# Input_u.append(list(np.linspace(domain_bound[i, 0],
# domain_bound[i, 1],
# domain_resolution[i])))
# domain_set = np.zeros(domain_resolution + [nInput])
# image_set = np.zeros(domain_resolution + [nInput])*np.nan
# image_set[0, 0] = sol.x
# for i in range(numInput):
# inputID = [0]*nInput
# inputID[0] = int(np.mod(i, domain_resolution[0]))
# domain_val = [Input_u[0][inputID[0]]]
# for j in range(1, nInput):
# inputID[j] = int(np.mod(np.floor(i/np.prod(domain_resolution[0:j])),
# domain_resolution[j]))
# domain_val.append(Input_u[j][inputID[j]])
# domain_set[tuple(inputID)] = domain_val
# --------------------------- End of Previous strategy ------------------ #
cube_vertices = list()
for n in range(2**(nInput)):
cube_vertices.append([int(x) for x in
np.binary_repr(n, width=nInput)])
# Preallocate domain and image polyhedras
domain_polyhedra = list()
image_polyhedra = list()
for i in tqdm(range(numInput)):
inputID[0] = int(np.mod(i, domain_resolution[0]))
for j in range(1, nInput):
inputID[j] = int(np.mod(np.floor(i/np.prod(domain_resolution[0:j])),
domain_resolution[j]))
if np.prod(inputID) != 0:
inputID = [x-1 for x in inputID]
ID = np.array([inputID]*(2**(nInput))) + np.array(cube_vertices)
V_domain_id = np.zeros((nInput, 2**(nInput)))
V_image_id = np.zeros((nOutput, 2**(nInput)))
for k in range(2**(nInput)):
ID_cell = tuple([int(x) for x in ID[k]])
if k == 0:
domain_0 = domain_set[ID_cell]
image_0 = image_set[ID_cell]
V_domain_id[:, k] = domain_0
V_image_id[:, k] = image_0
else:
if validation == 'predictor-corrector':
if (np.isnan(np.prod(image_set[ID_cell]))
and np.isnan(np.prod(image_0)) == False):
domain_k = domain_set[ID_cell]
V_domain_id[:, k] = domain_k
if step_cutting == True:
test_img_k = predict_eEuler(do_predict,
domain_0,
domain_k,
image_0)
domain_k_test = domain_k
count = 1
condition = max(
abs(F(domain_k_test, test_img_k)))
while ((condition > tol_cor or
np.isnan(condition))
and count < 10):
count = count + 1
test_img_k = predict_eEuler(do_predict,
domain_0,
domain_k_test,
image_0)
domain_k_test = (domain_k_test*(count-1) \
+ domain_0) / count
condition = max(
abs(F(domain_k_test, test_img_k)))
domain_k_step_size = domain_k_test
domain_kk_minus = domain_0
image_kk_minus = image_0
if count < 10:
for kk in range(count):
domain_kk = domain_0 + \
domain_k_step_size*count
image_kk = predict(do_predict,
domain_kk_minus,
domain_kk,
image_kk_minus)
image_kk_minus = image_kk
image_k = image_kk_minus
else:
image_k = predict(do_predict,
domain_0,
domain_k,
image_0)
max_residual = max(abs(F(domain_k, image_k)))
if (max_residual**2 > tol_cor or
np.isnan(max_residual)):
# Call the corrector:
sol = root(F_io, image_0, args=domain_k, method=solv_method)
found_sol = sol.success
# Treat case in which solution is not found:
if found_sol:
image_k = sol.x
else:
image_k = np.nan
image_set[ID_cell] = image_k
V_image_id[:, k] = image_k
else:
domain_k = domain_set[ID_cell]
V_domain_id[:, k] = domain_k
image_k = image_set[ID_cell]
V_image_id[:, k] = image_k
elif validation == 'predictor':
if np.isnan(np.prod(image_set[ID_cell])):
domain_k = domain_set[ID_cell]
V_domain_id[:,k] = domain_k
image_k = predict(do_predict,
domain_0,
domain_k,
image_0)
image_set[ID_cell] = image_k
V_image_id[:,k] = image_k
elif validation == 'corrector':
domain_k = domain_set[ID_cell]
V_domain_id[:,k] = domain_k
sol = root(F_io, image_0, args=domain_k, method=solv_method)
image_k = sol.x
image_set[ID_cell] = image_k
V_image_id[:,k] = image_k
domain_polyhedra.append(V_domain_id)
image_polyhedra.append(V_image_id)
return domain_set, image_set, domain_polyhedra, image_polyhedra
# The functions below are fundamental for operability calculations, though
# typical users won't need to directly interact with them. They play a crucial
# role within 'opyrability' without requiring user intervention, but are
# documented here nevertheless.
[docs]
def get_extreme_vertices(bounds: np.ndarray) -> np.ndarray:
"""
Gets the extreme vertices of any D-dimensional hypercube. This is used to
plot the DOS in 3D.
Author: Victor Alves
Parameters
----------
bounds : np.ndarray
Lower and upper bounds for the Desired Output Set (DOS).
Returns
-------
extreme_vertices : np.ndarray
Extreme vertices for the DOS.
"""
num_dimensions = bounds.shape[0]
num_points = 2 ** num_dimensions
extreme_vertices = np.zeros((num_points, num_dimensions))
for i in range(num_dimensions):
indices = np.arange(num_points) // (2 ** i) % 2
extreme_vertices[:, i] = bounds[i, indices]
return extreme_vertices
[docs]
def process_overlapping_polytopes(bound_box: pc.Polytope,
overlapped_intersection: pc.Region) -> pc.Region:
"""
Eliminate overlaps between polytopes given a bounding box and a region of
potentially overlapping polytopes.
The function aims to process a set of polytopes such that the resultant
polytopes within the specified bounding
box do not overlap with each other.
This was initially suggested by David Vinson in his Ph.D. dissertation
"A new measure of process operability for the improved steady-state design
of chemical processes. Ph.D. Thesis, Lehigh University, USA, 2000."
This has been refined by me to perform the subraction of polytopes if and only
if the polytopes are indeed overlapping. This is checked by
function 'are_overlapping'. Hence, computational time is largely reduced.
In addition, this avoids numerical instability in the LP that could
potentially lead to unbounded solutions (infesaible).
Author: Victor Alves
Parameters
----------
bound_box : pc.Polytope
The bounding polytope within which all other polytopes are contained.
overlapped_intersection : pc.Region
A collection of polytopes (encapsulated in a pc.Region) that may
potentially overlap with each other.
Returns
-------
pc.Region
A region consisting of polytopes that are within the bounding box
and do not overlap with each other.
"""
refined_polytopes = list(overlapped_intersection)
flattened_polytopes = []
for poly in refined_polytopes:
if isinstance(poly, pc.Region):
flattened_polytopes.extend(poly)
else:
flattened_polytopes.append(poly)
refined_polytopes = flattened_polytopes
non_overlapping_polytopes = []
while refined_polytopes:
current_poly = refined_polytopes.pop()
overlapping = False
for poly in non_overlapping_polytopes:
try:
if are_overlapping(current_poly, poly):
# Split the current polytope and add the non-overlapping
# parts back for consideration
diffs = current_poly.diff(poly)
if isinstance(diffs, pc.Region):
refined_polytopes.extend(diffs)
else:
refined_polytopes.append(diffs)
overlapping = True
break
except RuntimeError as e:
print('Some overlapping polytopes could not be resolved.')
continue
# If there was no overlap with existing non-overlapping polytopes,
# add it to the list
if not overlapping:
non_overlapping_polytopes.append(current_poly)
# Intersect each polytope in the non_overlapping_polytopes list with the
# bounding box
final_polytopes = []
for poly in non_overlapping_polytopes:
try:
intersection = pc.intersect(bound_box, poly)
if not pc.is_empty(intersection):
if isinstance(intersection, pc.Region):
final_polytopes.extend(intersection)
else:
final_polytopes.append(intersection)
except RuntimeError as e:
print('Some overlapping polytopes could not be resolved.')
continue
return pc.Region(final_polytopes)
[docs]
def are_overlapping(poly1, poly2):
"""
Check if two polytopes overlap.
Author: Victor Alves
Parameters
----------
poly1 : Polytope
First polytope object.
poly2 : Polytope
Second polytope object.
Returns
-------
bool
True if the polytopes overlap, False otherwise.
"""
return not pc.is_empty(pc.intersect(poly1, poly2))
# --------------------------------------------------------------------------- #
# Dynamic Operability Analysis
#
# State-space projection framework for time-varying systems. The public
# surface mirrors the steady-state pair (multimodel_rep, OI_eval):
# dynamic_operability_mapping -- builds the AOS polytope at each k
# dOI_eval -- evaluates the DOI at each k
# Supporting utilities (simulate_mc_trajectories, plot_dynamic_funnel,
# make_pyomo_step_model) follow. Private helpers are at the end.
#
# Based on: Dinh & Lima, Ind. Eng. Chem. Res., 2023
# Dinh, PhD Dissertation, West Virginia University, 2023
# Dinh & Lima, Comput. Chem. Eng., 2026
#
# Author: Victor Alves
# --------------------------------------------------------------------------- #
from inspect import signature as _inspect_signature
from scipy.spatial import ConvexHull
[docs]
def dynamic_operability_mapping(step_model=None,
x0=None,
AIS_bound=None,
AIS_resolution=5,
k_max=10,
EDS_bound=None,
EDS_resolution=None,
A=None, B=None, C=None, B_d=None,
u_ref=None, y_ref=None, d_ref=None,
convergence_tol=1e-4,
plot=True,
labels=None):
'''
Obtain the multimodel representation of the output-space Achievable
Output Set (AOS) for a dynamic process as it evolves over k time steps.
Propagates the AOS forward using state-space projection from an initial
state x0, mirroring multimodel_rep in the steady-state case. When the
system has disturbances, a worst-case propagation is used: the AOS at
each step is the intersection of the reachable sets under every vertex
of the Expected Disturbance Set (EDS).
Two propagation modes are supported:
-- Nonlinear mode (default): evaluates the user's step_model over
every vertex of the current state polytope and every AIS grid
point, taking the convex hull to obtain the next AOS. Works for
arbitrary nonlinear dynamics.
-- Linear mode (opt-in): if A, B, C are supplied, uses polytope
affine transforms and Minkowski sums (Dinh & Lima, Comp. Chem.
Eng., 2026).
Author: Victor Alves
Parameters
----------
step_model : Callable, Optional.
State transition of the form step_model(x, u) or step_model(x, u, d),
returning the tuple (x_next, y). Arity is auto-detected via
inspect.signature. Can be None ONLY when A, B, C matrices are
supplied (linear fast path); in that case an internal callable
is constructed for downstream simulation and MC sampling.
x0 : np.ndarray
Initial state vector, shape (n_x,). The state-space AOS at k=0
collapses to this single point.
AIS_bound : np.ndarray
Bounds on the Available Input Set (AIS), shape (n_u, 2). Each row
corresponds to the lower and upper bound of each input dimension.
AIS_resolution : int or array-like, Optional.
Discretization grid resolution on the AIS. A scalar applies the
same resolution in every dimension; an array specifies the
resolution per dimension. Resolution of 2 samples only the AIS
corners; higher values capture nonlinear AOS-boundary curvature.
Default is 5.
k_max : int
Maximum number of discrete time steps to propagate the AOS forward.
EDS_bound : np.ndarray, Optional.
Bounds on the Expected Disturbance Set (EDS), shape (n_d, 2).
Required when step_model has arity 3 (or B_d is supplied in the
linear fast path). Default is None.
EDS_resolution : int or array-like, Optional.
Resolution of the disturbance grid. Only the EDS corners are
required for the worst-case intersection semantics, but denser
grids are accepted. Default is None (corners only).
A : np.ndarray, Optional.
Linear state transition matrix, shape (n_x, n_x). Enables the
linear fast path when A, B, and C are all given. Default is None.
B : np.ndarray, Optional.
Linear input matrix, shape (n_x, n_u). Default is None.
C : np.ndarray, Optional.
Linear output matrix, shape (n_y, n_x). Default is None.
B_d : np.ndarray, Optional.
Linear disturbance-input matrix, shape (n_x, n_d). Required for
the linear fast path when disturbances are present. Default is
None.
u_ref : np.ndarray, Optional.
Nominal (reference) input values, shape (n_u,). Linear path only.
Identified LTI models are usually expressed in deviation
variables; supplying u_ref lets the AIS_bound be given in
absolute units, with the deviation u - u_ref fed to the model.
Default is None (AIS_bound already in deviation form).
y_ref : np.ndarray, Optional.
Nominal (reference) output values, shape (n_y,). Linear path
only. When supplied, the output equation becomes
y = C x + y_ref so the AOS (and any DOS used downstream) are in
absolute units. Default is None (outputs in deviation form).
d_ref : np.ndarray, Optional.
Nominal (reference) disturbance values, shape (n_d,). Linear
path only; lets EDS_bound be given in absolute units. Default is
None.
convergence_tol : float, Optional.
Relative change threshold on the AOS volume for early stopping:
the loop exits when |vol(k) - vol(k-1)| / vol(k-1) falls below
this tolerance, i.e., the dynamic funnel has reached steady
shape. Default is 1e-4.
plot : bool, Optional.
Whether to generate the standard AOS evolution and volume plots
(only for 2D or 3D output spaces), mirroring multimodel_rep's
plot-by-default behavior. Default is True.
labels : list, Optional.
Axis labels for the output-space AOS plot. Accepts TeX math input
as it uses matplotlib math rendering. Should be in order y1, y2,
and so on: labels=['first label','second label']. Default is None.
Returns
-------
results : dict
Dictionary containing the following entries:
'AOS_regions' : list of pc.Region
Output-space AOS at each time step k = 1..K. Each Region
contains one polytope and is directly compatible with
OI_eval (used internally by dOI_eval).
'AOS_x' : list of pc.Polytope
State-space AOS at each time step, starting from k = 0.
'volumes' : np.ndarray
AOS_y Lebesgue volume per step, used for the convergence
check and the standard diagnostic plot.
'k_converged' : int or None
Step at which the volume-change tolerance was first met;
None if the loop ran to k_max.
Additional private keys ('_step_model', '_arity', '_x0',
'_AIS_bound', '_EDS_bound', '_matrices') are included so that
downstream routines (dOI_eval, simulate_mc_trajectories,
plot_dynamic_funnel) can reuse the system definition without
requiring the user to re-specify it.
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
[2] S. Dinh. Nonlinear Dynamic Analysis and Control of Chemical
Processes Using Dynamic Operability Framework. PhD Dissertation,
West Virginia University, 2023.
[3] S. Dinh and F. V. Lima. Linear dynamic operability analysis with
state-space projection for the online construction of achievable
output funnels. Comput. Chem. Eng., 205:109428, 2026.
https://doi.org/10.1016/j.compchemeng.2025.109428
'''
# Validate the two always-required arguments. Everything else is mode-
# dependent (linear vs nonlinear, EDS present or not).
if x0 is None:
raise ValueError("x0 is required.")
if AIS_bound is None:
raise ValueError("AIS_bound is required.")
x0 = np.asarray(x0, dtype=float)
AIS_bound = np.asarray(AIS_bound, dtype=float)
if EDS_bound is not None:
EDS_bound = np.asarray(EDS_bound, dtype=float)
# Decide between the linear (fast) and nonlinear (general) propagation
# paths. Linear mode activates when the user supplies A, B, and C; the
# step_model callable becomes optional since the same dynamics can be
# constructed from the matrices for downstream simulation use.
linear_mode = (A is not None and B is not None and C is not None)
if not linear_mode and step_model is None:
raise ValueError(
"step_model must be provided unless A, B, C matrices are given."
)
# In linear mode we wrap the matrices into an equivalent callable so
# that simulate_mc_trajectories and other downstream consumers never
# have to branch on linear-vs-nonlinear. Arity follows the presence
# of a disturbance-input matrix.
A_arr = B_arr = C_arr = Bd_arr = None
uref_arr = yref_arr = dref_arr = None
if linear_mode:
A_arr = np.asarray(A, dtype=float)
B_arr = np.asarray(B, dtype=float)
C_arr = np.asarray(C, dtype=float)
Bd_arr = np.asarray(B_d, dtype=float) if B_d is not None else None
# Reference (nominal) values let deviation-form identified models
# work with AIS/EDS bounds and outputs expressed in absolute
# units: deviations are formed internally and y_ref is added back
# to the outputs.
uref_arr = (np.zeros(B_arr.shape[1]) if u_ref is None
else np.asarray(u_ref, dtype=float))
yref_arr = (np.zeros(C_arr.shape[0]) if y_ref is None
else np.asarray(y_ref, dtype=float))
if Bd_arr is not None:
dref_arr = (np.zeros(Bd_arr.shape[1]) if d_ref is None
else np.asarray(d_ref, dtype=float))
# In linear mode the arity is dictated by whether the user asked
# for a disturbance analysis on this particular run (EDS_bound
# present), not by whether B_d is defined in the system. This lets
# the same matrices be reused for both disturbance-free offline
# and disturbance-aware robust runs.
if EDS_bound is None or Bd_arr is None:
def _linear_step(x, u):
x_next = A_arr @ x + B_arr @ (u - uref_arr)
y = C_arr @ x_next + yref_arr
return x_next, y
arity = 2
else:
def _linear_step(x, u, d):
x_next = (A_arr @ x + B_arr @ (u - uref_arr)
+ Bd_arr @ (d - dref_arr))
y = C_arr @ x_next + yref_arr
return x_next, y
arity = 3
step_callable = _linear_step
elif u_ref is not None or y_ref is not None or d_ref is not None:
raise ValueError(
"u_ref/y_ref/d_ref are only supported in the linear path "
"(A, B, C matrices); fold reference values into the "
"step_model callable instead.")
else:
# Nonlinear mode: detect the arity of the user's step function.
# 2 args => step(x, u); 3 args => step(x, u, d). Anything else
# is ambiguous and raises.
sig = _inspect_signature(step_model)
n_params = len(sig.parameters)
if n_params not in (2, 3):
raise ValueError(
"step_model must accept 2 or 3 arguments (x, u) or "
"(x, u, d); got {} parameters.".format(n_params)
)
arity = n_params
step_callable = step_model
# Cross-check arity against the presence of EDS_bound so the user
# gets a clear message instead of a silent mismatch at runtime.
if arity == 2 and EDS_bound is not None:
raise ValueError(
"step_model has 2 arguments (no disturbance) but EDS_bound "
"was provided. Use a 3-argument step(x, u, d) (or supply "
"B_d in the linear path) if disturbances are relevant."
)
if arity == 3 and EDS_bound is None:
raise ValueError(
"step_model has 3 arguments (with disturbance) but "
"EDS_bound was not provided."
)
# ----- Build the AIS sample grid ----- #
# Resolution of 2 samples only the AIS corners (polygonal AOS
# boundary). Higher values capture nonlinear curvature of the
# reachable-set boundary, matching the discretization convention
# used in multimodel_rep on the steady-state side.
n_u = AIS_bound.shape[0]
if np.isscalar(AIS_resolution):
AIS_res_arr = np.full(n_u, int(AIS_resolution), dtype=int)
else:
AIS_res_arr = np.asarray(AIS_resolution, dtype=int)
AIS_res_arr = np.maximum(AIS_res_arr, 2)
input_linspaces = [np.linspace(AIS_bound[i, 0], AIS_bound[i, 1],
AIS_res_arr[i])
for i in range(n_u)]
input_grids = np.meshgrid(*input_linspaces, indexing='ij')
input_vertices = np.column_stack([g.ravel() for g in input_grids])
# ----- Build the EDS sample grid (only if disturbances present) ----- #
# Worst-case semantics only require the EDS corners, but the user can
# request a denser grid via EDS_resolution. The resulting vertex set
# is looped over and the reachable polytopes are intersected below.
disturbance_vertices = None
if EDS_bound is not None:
n_d = EDS_bound.shape[0]
if EDS_resolution is None:
EDS_res_arr = np.full(n_d, 2, dtype=int)
elif np.isscalar(EDS_resolution):
EDS_res_arr = np.full(n_d, int(EDS_resolution), dtype=int)
else:
EDS_res_arr = np.asarray(EDS_resolution, dtype=int)
EDS_res_arr = np.maximum(EDS_res_arr, 2)
dist_linspaces = [np.linspace(EDS_bound[i, 0], EDS_bound[i, 1],
EDS_res_arr[i])
for i in range(n_d)]
dist_grids = np.meshgrid(*dist_linspaces, indexing='ij')
disturbance_vertices = np.column_stack([g.ravel()
for g in dist_grids])
# AIS polytope is only needed by the linear-mode Minkowski sum path.
# The deviation-form dynamics consume u - u_ref, so the box is shifted
# by the reference input before entering the Minkowski sums.
AIS_poly = (pc.box2poly(AIS_bound - uref_arr[:, None])
if linear_mode else None)
# ----- Initialize AOS at k=0 ----- #
# The state is fully known at k=0, so the initial state-space AOS is
# a degenerate point at x0. The helper pads it with a tiny box
# (eps=1e-6) so the polytope library treats it as full-dimensional.
AOS_x_list = [_point_or_degenerate_polytope(x0.reshape(1, -1))]
AOS_regions = []
volumes = []
k_converged = None
# ----- Main propagation loop over k ----- #
for k in range(k_max):
# ----- State-space propagation: AOS_x(k) -> AOS_x(k+1) ----- #
if linear_mode:
if disturbance_vertices is None:
# Single Minkowski sum: A X(k) (+) B AIS.
AOS_x_next = _propagate_state_linear(
A_arr, B_arr, None, AOS_x_list[k], AIS_poly
)
else:
# Achievable-set semantics: A X(k) (+) B AIS (+) B_d EDS.
# The Minkowski sum with the full EDS polytope gives the
# union of reachable states over every disturbance in EDS,
# matching the convention used on the steady-state side.
# The box is shifted by d_ref for deviation-form models.
EDS_poly_full = pc.box2poly(EDS_bound - dref_arr[:, None])
AOS_x_next = _propagate_state_linear(
A_arr, B_arr, Bd_arr, AOS_x_list[k],
AIS_poly, EDS_polytope=EDS_poly_full
)
AOS_y_points = None # deferred until after reduction
else:
# Extract vertices of the current state polytope for
# propagation. On the first iteration this is just x0 (the
# degenerate-point polytope pads a tiny box around it).
state_verts = pc.extreme(AOS_x_list[k])
if state_verts is None or len(state_verts) == 0:
state_verts = x0.reshape(1, -1)
if arity == 2:
# No disturbance: propagate all (state-vertex, input-
# vertex) pairs through the step function and take the
# convex hull of the resulting next states. Output points
# are collected at the same time so we do not have to
# call the step function a second time.
next_pts, next_outputs = _propagate_state_nonlinear(
step_callable, state_verts, input_vertices
)
AOS_x_next = _hull_or_degenerate(next_pts)
AOS_y_points = next_outputs
else:
# Achievable-set semantics under disturbances: propagate
# every (state vertex, input vertex, disturbance vertex)
# triplet, then take the convex hull. The AOS therefore
# reflects the union of reachable states/outputs over all
# admissible realizations in EDS, matching the steady-
# state convention.
all_next_pts = []
all_outputs = []
for dv in disturbance_vertices:
next_pts, next_outputs = _propagate_state_nonlinear(
step_callable, state_verts, input_vertices,
d_vec=dv
)
all_next_pts.append(next_pts)
all_outputs.append(next_outputs)
all_next_pts = np.vstack(all_next_pts)
AOS_x_next = _hull_or_degenerate(all_next_pts)
AOS_y_points = np.vstack(all_outputs)
# Reduce the state polytope to its minimal half-space
# representation to keep the polytope representation compact
# across iterations.
AOS_x_next = pc.reduce(AOS_x_next)
AOS_x_list.append(AOS_x_next)
# ----- Output-space AOS ----- #
# In linear mode, or whenever the nonlinear path did not already
# build the output points, map state polytope vertices through
# the output equation to obtain AOS_y.
if AOS_y_points is None:
out_verts = pc.extreme(AOS_x_next)
if out_verts is None or len(out_verts) == 0:
AOS_y_points = (C_arr @ x0 + yref_arr).reshape(1, -1)
else:
AOS_y_points = out_verts @ C_arr.T + yref_arr
AOS_y_poly = _hull_or_degenerate(AOS_y_points)
AOS_regions.append(pc.Region([AOS_y_poly]))
# Record the Lebesgue volume of AOS_y; this drives the early-
# stopping convergence heuristic and the volume-evolution plot.
volumes.append(AOS_y_poly.volume)
# ----- Early-stopping check ----- #
# Stop once the relative change in AOS volume falls below the
# user-specified tolerance, indicating the funnel has reached
# a steady shape.
if k > 0:
prev_vol = volumes[-2]
if prev_vol > 0:
rel_change = abs(volumes[-1] - prev_vol) / prev_vol
else:
rel_change = abs(volumes[-1] - prev_vol)
if rel_change < convergence_tol:
k_converged = k
break
volumes = np.asarray(volumes)
# Pack the results. Private keys carry the system definition forward
# so that dOI_eval, simulate_mc_trajectories, and plot_dynamic_funnel
# never require the user to redundantly respecify the system.
results = {
'AOS_regions': AOS_regions,
'AOS_x': AOS_x_list,
'volumes': volumes,
'k_converged': k_converged,
'_step_model': step_callable,
'_arity': arity,
'_x0': x0,
'_AIS_bound': AIS_bound,
'_EDS_bound': EDS_bound,
'_matrices': ((A_arr, B_arr, C_arr, Bd_arr)
if linear_mode else None),
'_refs': ((uref_arr, yref_arr, dref_arr)
if linear_mode else None),
}
# Auto-plot the AOS evolution and the volume curve, matching the
# plot-by-default behavior of multimodel_rep on the steady-state side.
if plot:
_plot_AOS_evolution(AOS_regions, labels=labels)
_plot_volume_evolution(volumes)
return results
[docs]
def dynamic_operability_nstep(step_model,
x0,
AIS_bound,
k_max,
AIS_resolution=3,
y0=None,
max_vertices=80,
convergence_tol=1e-3,
plot=False,
labels=None):
'''
Build the dynamic Achievable Output Set (AOS) funnel by direct n-step
simulation of a nonlinear step model, reproducing the construction used
in Dinh & Lima (IECR, 2023, Figures 8-9).
Unlike dynamic_operability_mapping, which propagates a state-space
polytope and is therefore limited to low-dimensional states, this
routine works entirely in the output space and is suited to
high-dimensional-state models (e.g. spatially discretized PDE/reactor
models with hundreds of states). From a fixed initial state x0 it
forward-simulates input sequences and takes the convex hull of the
reachable outputs at each step k. The reachable set is propagated by
vertex tracking: every retained boundary state is branched over the
AIS grid, the resulting outputs are hulled to form AOS(k), and only the
states whose outputs land on that hull are retained for the next step.
This keeps the vertex count bounded while tracing the AOS boundary,
avoiding the scenario-tree blow-up of full input-sequence enumeration.
Disturbances are handled by the caller: wrap the disturbance into a
fixed-d, two-argument closure step(x, u) and call this routine once per
disturbance value, then intersect the resulting AOS regions step-by-step
(pc.intersect) to obtain the disturbance-robust funnel (Figure 9).
The returned dictionary follows the same contract as
dynamic_operability_mapping, so dOI_eval, simulate_mc_trajectories, and
plot_dynamic_funnel can be chained directly afterward.
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
step_model : Callable
Two-argument state transition step_model(x, u) returning the tuple
(x_next, y). x is the full (possibly high-dimensional) state and y
is the output vector. The step integrates the dynamics over one
discrete time step internally.
x0 : np.ndarray
Initial state vector. The funnel starts from this single point.
AIS_bound : np.ndarray
Bounds on the Available Input Set, shape (n_u, 2).
k_max : int
Maximum number of discrete time steps to propagate forward.
AIS_resolution : int or array-like, Optional.
Grid resolution on the AIS used to branch each retained state. A
scalar applies the same resolution to every input dimension.
Default is 3 (matching the MATLAB reference implementation).
y0 : np.ndarray, Optional.
Output at the initial state x0. When provided, a degenerate AOS
point at y0 is prepended as the k=0 funnel slice so the funnel
starts from the steady state. Default is None.
max_vertices : int, Optional.
Cap on the number of boundary states retained between steps. When
the hull has more vertices, they are evenly subsampled. Default 80.
convergence_tol : float, Optional.
Relative change in AOS area below which the funnel is considered
time-invariant and propagation stops early. Default is 1e-3.
plot : bool, Optional.
Whether to draw the AOS evolution and area curves. Default False.
labels : list, Optional.
Output-space axis labels. Default None.
Returns
-------
results : dict
Same contract as dynamic_operability_mapping: 'AOS_regions' (list of
pc.Region per step), 'volumes', 'k_converged', plus the private keys
'_step_model', '_arity', '_x0', '_AIS_bound', '_EDS_bound',
'_matrices' used by the downstream dynamic-operability functions.
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
'''
from scipy.spatial import ConvexHull
x0 = np.asarray(x0, dtype=float)
AIS_bound = np.asarray(AIS_bound, dtype=float)
n_u = AIS_bound.shape[0]
# Build the AIS grid that every retained state is branched over.
if np.isscalar(AIS_resolution):
res = [int(AIS_resolution)] * n_u
else:
res = [int(r) for r in AIS_resolution]
res = [max(r, 2) for r in res]
axes = [np.linspace(AIS_bound[i, 0], AIS_bound[i, 1], res[i])
for i in range(n_u)]
grids = np.meshgrid(*axes, indexing='ij')
input_grid = np.column_stack([g.ravel() for g in grids])
def _hull_vertex_indices(points):
# Indices of the convex-hull vertices of a 2D (or nD) point cloud.
# Falls back to all points when the cloud is degenerate (collinear,
# coincident, or too few points for qhull).
pts = np.asarray(points, dtype=float)
if pts.shape[0] < pts.shape[1] + 1:
return list(range(pts.shape[0]))
try:
return list(ConvexHull(pts).vertices)
except Exception:
return list(range(pts.shape[0]))
def _dedup(states, ys, decimals=6):
# Collapse boundary states that map to the same output (rounded),
# keeping the vertex set small without losing distinct boundary pts.
seen = {}
for s, y in zip(states, ys):
key = tuple(np.round(np.atleast_1d(y), decimals))
if key not in seen:
seen[key] = s
return list(seen.values())
AOS_regions = []
volumes = []
k_converged = None
# Optional k=0 slice: a degenerate point at the steady-state output.
if y0 is not None:
y0 = np.atleast_1d(np.asarray(y0, dtype=float))
AOS_regions.append(
pc.Region([_point_or_degenerate_polytope(y0.reshape(1, -1))]))
volumes.append(0.0)
kept_states = [x0]
for k in range(k_max):
cand_states = []
cand_y = []
for s in kept_states:
for u in input_grid:
x_next, y_next = step_model(s, u)
cand_states.append(np.asarray(x_next, dtype=float))
cand_y.append(np.atleast_1d(np.asarray(y_next, dtype=float)))
cand_y = np.asarray(cand_y, dtype=float)
# AOS(k) is the convex hull of all reachable outputs this step.
AOS_poly = _hull_or_degenerate(cand_y)
AOS_regions.append(pc.Region([AOS_poly]))
volumes.append(AOS_poly.volume)
# Retain only the boundary states for the next step's branching.
idx = _hull_vertex_indices(cand_y)
kept_states = _dedup([cand_states[i] for i in idx],
[cand_y[i] for i in idx])
if len(kept_states) > max_vertices:
sel = np.linspace(0, len(kept_states) - 1,
max_vertices).astype(int)
kept_states = [kept_states[i] for i in sel]
# Early stop once the funnel area stops changing.
if len(volumes) > 1 and volumes[-2] > 0:
rel = abs(volumes[-1] - volumes[-2]) / volumes[-2]
if rel < convergence_tol:
k_converged = k
break
volumes = np.asarray(volumes, dtype=float)
results = {
'AOS_regions': AOS_regions,
'volumes': volumes,
'k_converged': k_converged,
'_step_model': step_model,
'_arity': 2,
'_x0': x0,
'_AIS_bound': AIS_bound,
'_EDS_bound': None,
'_matrices': None,
'_refs': None,
}
if plot:
_plot_AOS_evolution(AOS_regions, labels=labels)
_plot_volume_evolution(volumes)
return results
[docs]
def dOI_eval(mapping_results,
DOS,
plot=True,
labels=None):
'''
Evaluate the Dynamic Operability Index (dOI) at each time step of a
dynamic operability mapping. For each step k, the output-space AOS
region is intersected with the Desired Output Set (DOS) using the
existing steady-state OI_eval routine, and the resulting scalar OI
value is collected into a time-series. This function is the dynamic
counterpart to OI_eval and is meant to be chained directly after
dynamic_operability_mapping.
Author: Victor Alves
Parameters
----------
mapping_results : dict
Dictionary returned by dynamic_operability_mapping. The list of
output-space AOS regions is read from the 'AOS_regions' key.
DOS : np.ndarray
Bounds on the Desired Output Set (DOS), shape (n_y, 2). Each row
corresponds to the lower and upper bound of each output dimension.
Same convention as the DS argument of OI_eval on the steady-state
side.
plot : bool, Optional.
Whether to plot the dOI convergence curve (dOI vs time step k).
Default is True.
labels : list, Optional.
Currently unused; reserved for symmetry with OI_eval's signature.
Default is None.
Returns
-------
dOI : np.ndarray
Dynamic Operability Index at each time step, shape (K,), values
in the range [0, 100].
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
[2] S. Dinh. Nonlinear Dynamic Analysis and Control of Chemical
Processes Using Dynamic Operability Framework. PhD Dissertation,
West Virginia University, 2023.
[3] S. Dinh and F. V. Lima. Linear dynamic operability analysis with
state-space projection for the online construction of achievable
output funnels. Comput. Chem. Eng., 205:109428, 2026.
https://doi.org/10.1016/j.compchemeng.2025.109428
'''
# Pull the time-indexed AOS regions; each is a pc.Region wrapping the
# output-space polytope at that step, as expected by OI_eval.
AOS_regions = mapping_results['AOS_regions']
DOS = np.asarray(DOS, dtype=float)
# Compute the DOI per step by delegating to the steady-state OI_eval.
# Plotting is suppressed there; the dynamic convergence curve is
# drawn separately from the collected time-series below.
dOI_values = []
for region in AOS_regions:
dOI_k = _dOI_at_step(region, DOS)
dOI_values.append(dOI_k)
dOI_array = np.asarray(dOI_values, dtype=float)
# Plot the dOI convergence trajectory if requested.
if plot and len(dOI_array) > 0:
_plot_dOI_convergence(dOI_array)
return dOI_array
[docs]
def simulate_mc_trajectories(mapping_results,
n_steps=None,
n_trajectories=50,
seed=None):
'''
Simulate Monte Carlo output trajectories using randomly sampled AIS
(and, if applicable, EDS) inputs at each time step. The step model
and system definition are read directly from the dictionary returned
by dynamic_operability_mapping, so no system parameters need to be
repeated by the user. Trajectories are useful for verifying that
arbitrary admissible input sequences yield outputs contained in the
AOS funnel.
Author: Victor Alves
Parameters
----------
mapping_results : dict
Dictionary returned by dynamic_operability_mapping. The step
callable, its arity, the initial state, the AIS bounds, and
(optionally) the EDS bounds are read from the private keys
'_step_model', '_arity', '_x0', '_AIS_bound', and '_EDS_bound'.
n_steps : int, Optional.
Number of time steps to simulate. If None (default), the number
of steps is taken from the length of mapping_results['AOS_regions'].
n_trajectories : int, Optional.
Number of independent random trajectories to simulate. Default
is 50.
seed : int, Optional.
Random seed for reproducibility. Default is None.
Returns
-------
trajectories : np.ndarray
Output trajectories, shape (n_trajectories, n_steps + 1, n_y).
Index 0 along axis 1 corresponds to k=0 (the initial output y0
obtained by evaluating the step model from x0 with the first
sampled input).
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
'''
# Recover the system definition stored by dynamic_operability_mapping.
step_model = mapping_results['_step_model']
arity = mapping_results['_arity']
x0 = np.asarray(mapping_results['_x0'])
AIS_bound = np.asarray(mapping_results['_AIS_bound'])
EDS_bound = mapping_results.get('_EDS_bound')
# If the user did not provide n_steps, match the mapping horizon so
# MC trajectories line up one-to-one with the funnel polytopes.
if n_steps is None:
n_steps = len(mapping_results['AOS_regions'])
rng = np.random.default_rng(seed)
n_u = AIS_bound.shape[0]
n_d = EDS_bound.shape[0] if (arity == 3 and EDS_bound is not None) else 0
# Probe the output dimension by running a single step from x0 with
# input/disturbance at the AIS/EDS center. Using means avoids
# triggering edge-case corner behavior during the probe.
u_center = AIS_bound.mean(axis=1)
if arity == 2:
_, y_probe = step_model(x0, u_center)
else:
d_center = EDS_bound.mean(axis=1)
_, y_probe = step_model(x0, u_center, d_center)
y_probe = np.atleast_1d(np.asarray(y_probe, dtype=float))
n_y = y_probe.shape[0]
trajectories = np.zeros((n_trajectories, n_steps + 1, n_y))
# Simulate each trajectory by drawing uniform samples from the AIS
# (and EDS, when disturbances are present) at every step.
for t in range(n_trajectories):
x = x0.copy()
# Initial output at k=0: a single sample to fill index 0.
if arity == 2:
u0 = (AIS_bound[:, 0]
+ rng.random(n_u) * (AIS_bound[:, 1] - AIS_bound[:, 0]))
_, y0_sample = step_model(x, u0)
else:
u0 = (AIS_bound[:, 0]
+ rng.random(n_u) * (AIS_bound[:, 1] - AIS_bound[:, 0]))
d0 = (EDS_bound[:, 0]
+ rng.random(n_d) * (EDS_bound[:, 1] - EDS_bound[:, 0]))
_, y0_sample = step_model(x, u0, d0)
trajectories[t, 0, :] = np.asarray(y0_sample, dtype=float)
# Propagate forward n_steps times and store each output.
for k in range(n_steps):
u = (AIS_bound[:, 0]
+ rng.random(n_u) * (AIS_bound[:, 1] - AIS_bound[:, 0]))
if arity == 2:
x, y = step_model(x, u)
else:
d = (EDS_bound[:, 0]
+ rng.random(n_d)
* (EDS_bound[:, 1] - EDS_bound[:, 0]))
x, y = step_model(x, u, d)
trajectories[t, k + 1, :] = np.asarray(y, dtype=float)
return trajectories
def _plotly_camera(view_elev, view_azim, r=2.0):
'''Convert matplotlib-style elevation/azimuth angles (degrees) to a
plotly scene camera eye position.'''
e = np.radians(view_elev)
az = np.radians(view_azim)
return dict(eye=dict(x=r * np.cos(e) * np.cos(az),
y=r * np.cos(e) * np.sin(az),
z=r * np.sin(e)))
def _plotly_colorscale(colormap, n=33):
'''Build a plotly colorscale from a matplotlib colormap name.'''
from matplotlib import colors as mcolors
cmap = plt.get_cmap(colormap)
return [[float(f), mcolors.to_hex(cmap(float(f)))]
for f in np.linspace(0.0, 1.0, n)]
def _import_plotly():
try:
import plotly.graph_objects as go
except ImportError as exc:
raise ImportError(
"engine='plotly' requires the plotly package. Install it "
"with: pip install plotly") from exc
return go
def _plotly_funnel(AOS_regions, DOS=None, dOI=None, labels=None,
alpha=0.25, colormap='rainbow', view_elev=20,
view_azim=-60, orientation='landscape',
mc_trajectories=None, mc_color='green', mc_alpha=0.4,
mc_linewidth=0.6):
'''Interactive plotly version of the dynamic operability funnel.
Returns a plotly Figure that supports pan/rotate/zoom both in Jupyter
and in static HTML exports (e.g. the Jupyter Book documentation).'''
go = _import_plotly()
from matplotlib import colors as mcolors
landscape = orientation == 'landscape'
n_steps = len(AOS_regions)
use_dOI = dOI is not None and len(dOI) == n_steps
dOI_arr = np.asarray(dOI, dtype=float) if use_dOI else None
cmap = plt.get_cmap(colormap)
def coords(V, k_val):
kk = np.full(len(V), float(k_val))
if landscape:
return V[:, 0], kk, V[:, 1]
return V[:, 0], V[:, 1], kk
fig = go.Figure()
# DOS reference: a rectangular column swept along the time axis, drawn
# with darker end faces plus thick near-black outlines and connector edges
# so it stands out clearly behind the funnel.
if DOS is not None:
dos = np.asarray(DOS, dtype=float)
dos_edge = '#222222'
rect = np.array([[dos[0, 0], dos[1, 0]], [dos[0, 1], dos[1, 0]],
[dos[0, 1], dos[1, 1]], [dos[0, 0], dos[1, 1]]])
first_dos = True
for k_val in (1, n_steps):
# End face (darker fill).
x, y, z = coords(rect, k_val)
fig.add_trace(go.Mesh3d(
x=x, y=y, z=z, i=[0, 0], j=[1, 2], k=[2, 3],
color=DS_COLOR, opacity=0.5, hoverinfo='skip',
name='DOS', legendgroup='DOS', showlegend=False))
# Thick dark outline of the end face (also the legend entry).
loop = np.vstack([rect, rect[0:1]])
xo, yo, zo = coords(loop, k_val)
fig.add_trace(go.Scatter3d(
x=xo, y=yo, z=zo, mode='lines',
line=dict(color=dos_edge, width=6),
name='DOS', legendgroup='DOS', showlegend=first_dos,
hoverinfo='skip'))
first_dos = False
# Thick dark connector edges joining the two faces into a column.
for v in rect:
seg = np.array([v, v])
x, y, z = coords(seg, 1)
x2, y2, z2 = coords(seg, n_steps)
fig.add_trace(go.Scatter3d(
x=[x[0], x2[0]], y=[y[0], y2[0]], z=[z[0], z2[0]],
mode='lines',
line=dict(color=dos_edge, width=6),
legendgroup='DOS', hoverinfo='skip', showlegend=False))
# One filled slice (Mesh3d fan triangulation) + outline per step.
for i, region in enumerate(AOS_regions):
verts = pc.extreme(region.list_poly[0])
k_val = i + 1
if use_dOI:
frac = float(np.clip(dOI_arr[i] / 100.0, 0.0, 1.0))
else:
frac = i / max(n_steps - 1, 1)
color = mcolors.to_hex(cmap(frac))
hover = (f'k = {k_val}' + (f'<br>dOI = {dOI_arr[i]:.1f} %'
if use_dOI else ''))
if verts is None or len(verts) < 3:
# Degenerate slice (e.g. the k = 0 steady-state point).
if verts is not None and len(verts) > 0:
x, y, z = coords(np.atleast_2d(verts.mean(axis=0)), k_val)
fig.add_trace(go.Scatter3d(
x=x, y=y, z=z, mode='markers',
marker=dict(size=4, color=color),
hovertext=hover, hoverinfo='text', showlegend=False))
continue
center = verts.mean(axis=0)
order = np.argsort(np.arctan2(verts[:, 1] - center[1],
verts[:, 0] - center[0]))
ordered = verts[order]
m = len(ordered)
x, y, z = coords(ordered, k_val)
fig.add_trace(go.Mesh3d(
x=x, y=y, z=z,
i=[0] * (m - 2), j=list(range(1, m - 1)),
k=list(range(2, m)),
color=color, opacity=alpha + 0.1,
hovertext=hover, hoverinfo='text', showlegend=False))
loop = np.vstack([ordered, ordered[0:1]])
x, y, z = coords(loop, k_val)
fig.add_trace(go.Scatter3d(
x=x, y=y, z=z, mode='lines',
line=dict(color=color, width=4),
hovertext=hover, hoverinfo='text', showlegend=False))
# Monte Carlo trajectories: one trace with None-separated segments.
if mc_trajectories is not None:
xs, ys_, zs = [], [], []
n_traj, n_pts, _ = mc_trajectories.shape
for t in range(n_traj):
traj = mc_trajectories[t]
ks = np.arange(n_pts, dtype=float)
if landscape:
xs += list(traj[:, 0]) + [None]
ys_ += list(ks) + [None]
zs += list(traj[:, 1]) + [None]
else:
xs += list(traj[:, 0]) + [None]
ys_ += list(traj[:, 1]) + [None]
zs += list(ks) + [None]
fig.add_trace(go.Scatter3d(
x=xs, y=ys_, z=zs, mode='lines',
line=dict(color=mc_color, width=max(1, int(mc_linewidth * 3))),
opacity=mc_alpha, hoverinfo='skip', showlegend=False))
# Invisible marker carrying the colorbar (dOI % or time step).
cbar_title = 'dOI (%)' if use_dOI else 'Time step k'
cmax = 100 if use_dOI else n_steps
fig.add_trace(go.Scatter3d(
x=[None, None], y=[None, None], z=[None, None], mode='markers',
marker=dict(size=0.1, color=[0, cmax], cmin=0, cmax=cmax,
colorscale=_plotly_colorscale(colormap),
showscale=True,
colorbar=dict(title=cbar_title, len=0.6)),
hoverinfo='skip', showlegend=False))
y1_label = labels[0] if (labels is not None and len(labels) >= 2) \
else 'y1'
y2_label = labels[1] if (labels is not None and len(labels) >= 2) \
else 'y2'
if landscape:
scene = dict(xaxis_title=y1_label, yaxis_title='Time step k',
zaxis_title=y2_label)
else:
scene = dict(xaxis_title=y1_label, yaxis_title=y2_label,
zaxis_title='Time step k')
scene['aspectmode'] = 'cube'
scene['camera'] = _plotly_camera(view_elev, view_azim)
fig.update_layout(scene=scene, width=850, height=650,
title='Dynamic Operability Funnel',
margin=dict(l=0, r=0, t=40, b=0))
return fig
def _plotly_scenarios(per, inter_regions, n_k, labels_list, labels=None,
colors=None, title=None, orientation='landscape',
view_elev=18, view_azim=-50):
'''Interactive plotly version of the scenario funnels + robust
intersection plot. Pan/rotate/zoom works in Jupyter and static HTML.'''
go = _import_plotly()
landscape = orientation == 'landscape'
def _ordered(poly):
V = pc.extreme(poly)
if V is None or len(V) < 3:
return V
c = V.mean(axis=0)
return V[np.argsort(np.arctan2(V[:, 1] - c[1], V[:, 0] - c[0]))]
def coords(V, k_val):
kk = np.full(len(V), float(k_val))
if landscape:
return V[:, 0], kk, V[:, 1]
return V[:, 0], V[:, 1], kk
if colors is None:
cycle = plt.rcParams['axes.prop_cycle'].by_key().get(
'color', ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'])
colors = [cycle[i % len(cycle)] for i in range(len(labels_list))]
fig = go.Figure()
# Scenario outlines, one None-separated trace per scenario so the
# legend has exactly one entry per scenario.
for j, lab in enumerate(labels_list):
regions = per[lab]['AOS_regions']
xs, ys_, zs = [], [], []
for k in range(min(n_k, len(regions))):
V = _ordered(regions[k].list_poly[0])
if V is None:
continue
loop = np.vstack([V, V[0:1]])
x, y, z = coords(loop, k)
xs += list(x) + [None]
ys_ += list(y) + [None]
zs += list(z) + [None]
fig.add_trace(go.Scatter3d(
x=xs, y=ys_, z=zs, mode='lines',
line=dict(color=colors[j], width=3),
name=lab, hoverinfo='name'))
# Robust intersection, filled in blue.
showleg = True
for k in range(n_k):
reg = inter_regions[k]
if len(reg) == 0:
continue
V = _ordered(reg.list_poly[0])
if V is None or len(V) < 3:
continue
m = len(V)
x, y, z = coords(V, k)
fig.add_trace(go.Mesh3d(
x=x, y=y, z=z,
i=[0] * (m - 2), j=list(range(1, m - 1)),
k=list(range(2, m)),
color='blue', opacity=0.55,
name='Robust intersection', showlegend=showleg,
hovertext=f'k = {k} (robust)', hoverinfo='text'))
showleg = False
y1_label = labels[0] if (labels is not None and len(labels) >= 2) \
else 'y1'
y2_label = labels[1] if (labels is not None and len(labels) >= 2) \
else 'y2'
if landscape:
scene = dict(xaxis_title=y1_label, yaxis_title='Time step k',
zaxis_title=y2_label)
else:
scene = dict(xaxis_title=y1_label, yaxis_title=y2_label,
zaxis_title='Time step k')
scene['aspectmode'] = 'cube'
scene['camera'] = _plotly_camera(view_elev, view_azim)
fig.update_layout(scene=scene, width=850, height=650,
title=(title or 'Dynamic operability: scenarios '
'and robust intersection'),
margin=dict(l=0, r=0, t=40, b=0),
legend=dict(x=0.01, y=0.99))
return fig
def _show_if_notebook(fig):
'''Display a plotly figure when running inside an IPython kernel,
mirroring matplotlib's display-at-cell-end behavior; no-op in plain
scripts and test runs.'''
try:
from IPython import get_ipython
if get_ipython() is not None:
fig.show()
except Exception:
pass
[docs]
def plot_dynamic_funnel(mapping_results,
DOS=None,
dOI=None,
labels=None,
alpha=0.25,
colormap='rainbow',
view_elev=20,
view_azim=-60,
orientation='landscape',
engine='matplotlib',
mc_trajectories=None,
mc_color='green',
mc_alpha=0.4,
mc_linewidth=0.6,
show=True):
'''
Plot the 3D dynamic operability funnel by stacking the output-space
AOS polytopes along the time axis, as in Dinh & Lima (IECR 2023 and
Comput. Chem. Eng. 2026). In the default landscape orientation the
time axis is horizontal and the funnel opens sideways, matching the
figures of the publications; each time step k is rendered as a filled
polygon at its time coordinate, producing the characteristic expanding
or contracting funnel shape. Monte Carlo trajectories returned by
simulate_mc_trajectories may be overlaid for visual containment
verification.
Author: Victor Alves
Parameters
----------
mapping_results : dict
Dictionary returned by dynamic_operability_mapping.
DOS : np.ndarray, Optional.
Bounds on the Desired Output Set (DOS), shape (n_y, 2). When
provided, the DOS box is drawn as a translucent reference at
the first and last time steps. Default is None.
dOI : array-like, Optional.
Per-time-step Dynamic Operability Index values (as returned by
dOI_eval), one per AOS region. When provided, each funnel slice is
colored by its dOI value and the colorbar is rescaled to the
operability index in [0, 100] % instead of the time step. This is
how dynamic_operability shows the dOI directly on the funnel.
Default is None (color slices by time step).
labels : list, Optional.
Axis labels for the two output variables, e.g. ['$y_1$','$y_2$'].
Default is None.
alpha : float, Optional.
Transparency for the AOS polygon faces. Default is 0.25.
colormap : str, Optional.
Matplotlib colormap name for coloring time steps. Default is
'rainbow' to match opyrability's default cmap.
view_elev : float, Optional.
Elevation angle for the 3D view in degrees. Default is 20.
view_azim : float, Optional.
Azimuth angle for the 3D view in degrees. Default is -60.
orientation : {'landscape', 'vertical'}, Optional.
Funnel orientation. 'landscape' (default) places time on a
horizontal axis with the second output on the vertical axis, as
in the figures of Dinh & Lima; 'vertical' stacks the time steps
upward along the vertical axis.
engine : {'matplotlib', 'plotly'}, Optional.
Plotting engine. 'matplotlib' (default) produces a static 3D
figure. 'plotly' produces an interactive WebGL figure that
supports pan, rotate, and zoom both in Jupyter and in static
HTML exports such as the Jupyter Book documentation (requires
the plotly package); the second return value is then None.
mc_trajectories : np.ndarray, Optional.
Monte Carlo trajectories produced by simulate_mc_trajectories,
shape (n_traj, n_steps + 1, n_y). When provided, trajectories
are drawn as semi-transparent lines through the funnel. Default
is None.
mc_color : str, Optional.
Color for Monte Carlo trajectory lines. Default is 'green'.
mc_alpha : float, Optional.
Transparency for Monte Carlo lines. Default is 0.4.
mc_linewidth : float, Optional.
Line width for Monte Carlo lines. Default is 0.6.
show : bool, Optional.
For the plotly engine, display the figure in a notebook before
returning it. Default is True. High-level callers that set a title
and display the figure themselves pass show=False to avoid a double
display. Has no effect on the matplotlib engine.
Returns
-------
fig : matplotlib.figure.Figure
The figure object.
ax : mpl_toolkits.mplot3d.axes3d.Axes3D
The 3D axes object.
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
[2] S. Dinh. Nonlinear Dynamic Analysis and Control of Chemical
Processes Using Dynamic Operability Framework. PhD Dissertation,
West Virginia University, 2023.
'''
# Extract the output-space AOS regions from the mapping results.
AOS_regions = mapping_results['AOS_regions']
n_steps = len(AOS_regions)
if n_steps == 0:
warnings.warn("No AOS regions to plot.")
return None, None
# The funnel requires 2D output polytopes (x=y1, y=y2, z=k). Probe
# the first region to confirm dimensionality before building axes.
first_verts = pc.extreme(AOS_regions[0].list_poly[0])
if first_verts is None or first_verts.shape[1] != 2:
warnings.warn("Funnel plot requires 2D output polytopes.")
return None, None
if engine == 'plotly':
fig = _plotly_funnel(
AOS_regions, DOS=DOS, dOI=dOI, labels=labels, alpha=alpha,
colormap=colormap, view_elev=view_elev, view_azim=view_azim,
orientation=orientation, mc_trajectories=mc_trajectories,
mc_color=mc_color, mc_alpha=mc_alpha,
mc_linewidth=mc_linewidth)
# Display the plotly figure in a notebook, unless a caller (e.g.
# dynamic_operability) will set a title and show it itself.
if show:
_show_if_notebook(fig)
return fig, None
if engine != 'matplotlib':
raise ValueError("engine must be 'matplotlib' or 'plotly'.")
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
cmap_func = plt.get_cmap(colormap, max(n_steps, 2))
cmap_cont = plt.get_cmap(colormap)
# When per-step dOI is supplied, color the slices by operability index
# (continuous, 0-100 %) instead of by time step.
use_dOI = dOI is not None and len(dOI) == n_steps
dOI_arr = np.asarray(dOI, dtype=float) if use_dOI else None
# Draw the DOS box at the first and last time steps as a translucent
# spatial reference, and connect its vertical edges with dashed lines.
if orientation not in ('landscape', 'vertical'):
raise ValueError("orientation must be 'landscape' or 'vertical'.")
landscape = orientation == 'landscape'
def _p3d(v, k_val):
# Map a 2D output vertex and its time coordinate to the 3D axes:
# landscape puts time on the (horizontal) y-axis with the second
# output vertical, matching the Dinh & Lima figures.
return (v[0], k_val, v[1]) if landscape else (v[0], v[1], k_val)
if DOS is not None:
dos = np.asarray(DOS, dtype=float)
dos_verts_2d = np.array([
[dos[0, 0], dos[1, 0]],
[dos[0, 1], dos[1, 0]],
[dos[0, 1], dos[1, 1]],
[dos[0, 0], dos[1, 1]],
])
for k_val in [1, n_steps]:
dos_verts_3d = [_p3d(v, k_val) for v in dos_verts_2d]
poly_dos = Poly3DCollection(
[dos_verts_3d], alpha=0.08, facecolor=DS_COLOR,
edgecolor='gray', linewidth=0.5, linestyle='--'
)
ax.add_collection3d(poly_dos)
# Dashed edges along the time axis connecting the two DOS faces
# for visual continuity.
for v in dos_verts_2d:
if landscape:
ax.plot([v[0], v[0]], [1, n_steps], [v[1], v[1]],
color='gray', linewidth=0.4, linestyle='--',
alpha=0.3)
else:
ax.plot([v[0], v[0]], [v[1], v[1]], [1, n_steps],
color='gray', linewidth=0.4, linestyle='--',
alpha=0.3)
# Plot each AOS_y polytope as a filled 3D polygon at its time-step
# height, with rainbow coloring mapped to k.
for i, region in enumerate(AOS_regions):
poly = region.list_poly[0]
verts = pc.extreme(poly)
if verts is None or len(verts) < 3:
continue
# Sort vertices by polar angle around the centroid so the polygon
# is drawn in order (no self-intersections).
center = verts.mean(axis=0)
angles = np.arctan2(verts[:, 1] - center[1],
verts[:, 0] - center[0])
order = np.argsort(angles)
ordered = verts[order]
k_val = i + 1
if use_dOI:
color = cmap_cont(np.clip(dOI_arr[i] / 100.0, 0.0, 1.0))
else:
color = cmap_func(i / max(n_steps - 1, 1))
# Filled polygon face.
verts_3d = [_p3d(v, k_val) for v in ordered]
poly_3d = Poly3DCollection(
[verts_3d], alpha=alpha, facecolor=color,
edgecolor=color, linewidth=1.2
)
ax.add_collection3d(poly_3d)
# Stronger boundary line around the polygon for visual clarity.
boundary = np.vstack([ordered, ordered[0:1]])
if landscape:
ax.plot(boundary[:, 0], [k_val] * len(boundary),
boundary[:, 1],
color=color, linewidth=1.0, alpha=0.8)
else:
ax.plot(boundary[:, 0], boundary[:, 1],
[k_val] * len(boundary),
color=color, linewidth=1.0, alpha=0.8)
# Overlay Monte Carlo trajectories as lines through the funnel.
if mc_trajectories is not None:
n_traj, n_pts, _ = mc_trajectories.shape
for t in range(n_traj):
ys = mc_trajectories[t]
ks = np.arange(n_pts)
if landscape:
ax.plot(ys[:, 0], ks, ys[:, 1],
color=mc_color, alpha=mc_alpha,
linewidth=mc_linewidth)
else:
ax.plot(ys[:, 0], ys[:, 1], ks,
color=mc_color, alpha=mc_alpha,
linewidth=mc_linewidth)
# Axis labels and 3D view angle. In landscape orientation the time
# axis is horizontal (y-axis) and the second output is vertical.
y1_label = labels[0] if (labels is not None and len(labels) >= 2) \
else '$y_1$'
y2_label = labels[1] if (labels is not None and len(labels) >= 2) \
else '$y_2$'
if landscape:
ax.set_xlabel(y1_label, fontsize=11, labelpad=8)
ax.set_ylabel('Time step $k$', fontsize=11, labelpad=8)
ax.set_zlabel(y2_label, fontsize=11, labelpad=8)
else:
ax.set_xlabel(y1_label, fontsize=11, labelpad=8)
ax.set_ylabel(y2_label, fontsize=11, labelpad=8)
ax.set_zlabel('Time step $k$', fontsize=11, labelpad=8)
ax.set_title('Dynamic Operability Funnel', fontsize=13)
# Colorbar: dOI (%) when slices are colored by operability index,
# otherwise the time-step index.
if use_dOI:
sm = plt.cm.ScalarMappable(
cmap=colormap, norm=plt.Normalize(vmin=0, vmax=100))
sm.set_array([])
cbar = fig.colorbar(sm, ax=ax, shrink=0.6, pad=0.1)
cbar.set_label('dOI (%)')
else:
sm = plt.cm.ScalarMappable(
cmap=colormap, norm=plt.Normalize(vmin=1, vmax=n_steps))
sm.set_array([])
cbar = fig.colorbar(sm, ax=ax, shrink=0.6, pad=0.1)
cbar.set_label('Time step $k$')
ax.view_init(elev=view_elev, azim=view_azim)
fig.tight_layout()
return fig, ax
[docs]
def dynamic_operability(model,
x0,
AIS_bound,
DOS=None,
k_max=10,
AIS_resolution=3,
method='auto',
y0=None,
EDS_bound=None,
monte_carlo=0,
plot=True,
labels=None,
title=None,
orientation='landscape',
engine='matplotlib',
seed=None):
'''
One-call dynamic operability analysis -- the recommended high-level entry
point.
Builds the achievable-output funnel over k_max time steps, evaluates the
Dynamic Operability Index (dOI) against the Desired Output Set, and plots
the funnel with each time slice colored by its dOI -- replacing the
manual dynamic_operability_mapping/nstep -> dOI_eval -> plot_dynamic_funnel
sequence with a single call. The propagation method is selected
automatically so the caller never has to choose:
-- linear state-space projection when ``model`` is a set of matrices,
-- nonlinear state-space projection for low-dimensional states,
-- n-step simulation for high-dimensional states (e.g. spatially
discretized PDE/reactor models) where the state polytope cannot be
enumerated.
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
model : Callable or dict
Either a step function step(x, u) -> (x_next, y) (or step(x, u, d)
for the projection path with disturbances), or a dict of linear
system matrices {'A':.., 'B':.., 'C':.., 'B_d':.. (optional)}. The
dict also accepts 'u_ref', 'y_ref', and 'd_ref' nominal values so
that deviation-form identified models can be analyzed with AIS,
DOS, and EDS bounds given in absolute units. For
high-dimensional-state models bake any disturbance into a
two-argument closure step(x, u).
x0 : np.ndarray
Initial state vector.
AIS_bound : np.ndarray
Available Input Set bounds, shape (n_u, 2).
DOS : np.ndarray, Optional.
Desired Output Set bounds, shape (n_y, 2). When given, the dOI is
computed and the funnel is colored by it. Default None.
k_max : int, Optional.
Number of discrete time steps. Default 10.
AIS_resolution : int or array-like, Optional.
AIS grid resolution. Default 3.
method : {'auto', 'projection', 'nstep'}, Optional.
Propagation method. 'auto' (default) picks projection for matrices or
low-dimensional states (<= 8) and n-step otherwise.
y0 : np.ndarray, Optional.
Output at x0; prepended as the k=0 funnel slice for the n-step
method. Default None.
EDS_bound : np.ndarray, Optional.
Expected Disturbance Set bounds for the projection path with an
arity-3 step or a B_d matrix. Default None.
monte_carlo : int, Optional.
If > 0, overlay this many Monte Carlo input-sequence trajectories on
the funnel. Default 0.
plot : bool, Optional.
Whether to draw the dOI-colored funnel. Default True.
labels : list, Optional.
Output-space axis labels. Default None.
title : str, Optional.
Plot title. Default None.
orientation : {'landscape', 'vertical'}, Optional.
Funnel orientation, as in plot_dynamic_funnel. 'landscape'
(default) places time on a horizontal axis with the second output
vertical, matching the Dinh & Lima figures.
engine : {'matplotlib', 'plotly'}, Optional.
Plotting engine. 'plotly' produces an interactive figure
(pan/rotate/zoom) that stays interactive in Jupyter and in static
HTML exports; 'matplotlib' (default) produces a static figure.
seed : int, Optional.
Seed for the Monte Carlo trajectories. Default None.
Returns
-------
results : dict
The funnel results dict (see dynamic_operability_mapping /
dynamic_operability_nstep) augmented with 'dOI' (when DOS is given),
'method' (the method used), and 'fig'/'ax' (when plotted).
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
'''
x0 = np.asarray(x0, dtype=float)
if callable(model):
if method == 'auto':
# State-space projection enumerates the state polytope, which is
# only tractable for low-dimensional states; everything larger
# falls back to n-step simulation.
method = 'projection' if x0.size <= 8 else 'nstep'
if method == 'projection':
results = dynamic_operability_mapping(
step_model=model, x0=x0, AIS_bound=AIS_bound,
AIS_resolution=AIS_resolution, k_max=k_max,
EDS_bound=EDS_bound, plot=False)
results['method'] = 'state-space projection (nonlinear)'
elif method == 'nstep':
results = dynamic_operability_nstep(
model, x0, AIS_bound, k_max,
AIS_resolution=AIS_resolution, y0=y0, plot=False)
results['method'] = 'n-step simulation'
else:
raise ValueError(
"method must be 'auto', 'projection', or 'nstep'.")
elif isinstance(model, dict):
A, B = model.get('A'), model.get('B')
C, B_d = model.get('C'), model.get('B_d')
if A is None or B is None or C is None:
raise ValueError(
"matrix model must provide at least 'A', 'B' and 'C'.")
results = dynamic_operability_mapping(
x0=x0, AIS_bound=AIS_bound, AIS_resolution=AIS_resolution,
k_max=k_max, A=A, B=B, C=C, B_d=B_d,
u_ref=model.get('u_ref'), y_ref=model.get('y_ref'),
d_ref=model.get('d_ref'),
EDS_bound=EDS_bound, plot=False)
results['method'] = 'state-space projection (linear)'
else:
raise TypeError(
"model must be a step callable or a dict of {A, B, C, B_d}.")
# Dynamic Operability Index against the DOS.
dOI = None
if DOS is not None:
dOI = dOI_eval(results, DOS, plot=False)
results['dOI'] = dOI
# Optional Monte Carlo trajectory overlay.
mc = None
if monte_carlo and monte_carlo > 0:
mc = simulate_mc_trajectories(
results, n_trajectories=int(monte_carlo), seed=seed)
if plot:
fig, ax = plot_dynamic_funnel(
results, DOS=DOS, dOI=dOI, labels=labels, mc_trajectories=mc,
orientation=orientation, engine=engine, show=False)
if fig is not None and title is not None:
if engine == 'plotly':
fig.update_layout(title_text=title)
else:
ax.set_title(title, fontsize=13)
if engine == 'plotly' and fig is not None:
_show_if_notebook(fig)
results['fig'], results['ax'] = fig, ax
return results
[docs]
def dynamic_operability_scenarios(step_factory,
x0_factory,
AIS_bound,
scenarios,
DOS=None,
k_max=10,
AIS_resolution=3,
method='auto',
y0_factory=None,
plot=True,
labels=None,
colors=None,
title=None,
orientation='landscape',
engine='matplotlib'):
'''
Dynamic operability across several disturbance scenarios (or input
sequences), together with their disturbance-robust intersection.
For each scenario the achievable-output funnel is built with
dynamic_operability; the scenarios are then intersected step by step to
give the robust funnel -- the outputs achievable regardless of which
scenario is realized (Dinh & Lima 2023, Figure 9). When the output space
is two-dimensional the result is plotted: each scenario's funnel outline
in its own color with the robust intersection filled and stacked along the
time axis. For higher output dimensions the plot is skipped and the
numerical results are returned.
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
step_factory : Callable
Maps a scenario value d to a two-argument step closure
step(x, u) -> (x_next, y) with that disturbance baked in.
x0_factory : Callable
Maps a scenario value d to the initial state x0 for that scenario
(e.g. the steady state at that disturbance).
AIS_bound : np.ndarray
Available Input Set bounds, shape (n_u, 2).
scenarios : dict or list
Either {label: d} or a list of scenario values d (labels are then
generated automatically). Each d is passed to step_factory and
x0_factory.
DOS : np.ndarray, Optional.
Desired Output Set bounds, shape (n_y, 2). When given, the dOI of the
robust intersection is computed per step. Default None.
k_max : int, Optional.
Number of discrete time steps. Default 10.
AIS_resolution : int or array-like, Optional.
AIS grid resolution. Default 3.
method : {'auto', 'projection', 'nstep'}, Optional.
Propagation method passed to dynamic_operability. Default 'auto'.
y0_factory : Callable, Optional.
Maps a scenario value d to the output at x0 (prepended as the k=0
slice). Default None.
plot : bool, Optional.
Whether to draw the scenario funnels + intersection (2D output only).
Default True.
labels : list, Optional.
Output-space axis labels. Default None.
colors : list, Optional.
One color per scenario for the funnel outlines. Default None (uses
the matplotlib color cycle).
title : str, Optional.
Plot title. Default None.
orientation : {'landscape', 'vertical'}, Optional.
Funnel orientation, as in plot_dynamic_funnel. 'landscape'
(default) places time on a horizontal axis with the second output
vertical, matching the Dinh & Lima figures.
engine : {'matplotlib', 'plotly'}, Optional.
Plotting engine. 'plotly' produces an interactive figure
(pan/rotate/zoom) that stays interactive in Jupyter and in static
HTML exports; 'matplotlib' (default) produces a static figure.
Returns
-------
results : dict
{'scenarios': {label: per-scenario results dict},
'intersection': {'AOS_regions', 'volumes', 'dOI' (if DOS given)},
'fig', 'ax' (when plotted)}.
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
'''
# Normalize the scenario specification to an ordered list of (label, d).
if isinstance(scenarios, dict):
items = list(scenarios.items())
else:
items = [(f'scenario {i + 1}', d) for i, d in enumerate(scenarios)]
labels_list = [lab for lab, _ in items]
# Build each scenario's funnel (no per-scenario plotting).
per = {}
for label, d in items:
y0 = y0_factory(d) if y0_factory is not None else None
per[label] = dynamic_operability(
step_factory(d), x0_factory(d), AIS_bound, DOS=DOS, k_max=k_max,
AIS_resolution=AIS_resolution, method=method, y0=y0, plot=False)
n_k = min(len(per[lab]['AOS_regions']) for lab in labels_list)
# Step-by-step intersection of all scenario AOS polytopes.
def _as_region(obj):
if obj is None or pc.is_empty(obj):
return pc.Region([])
return obj if isinstance(obj, pc.Region) else pc.Region([obj])
inter_regions, inter_vol = [], []
for k in range(n_k):
cur = per[labels_list[0]]['AOS_regions'][k].list_poly[0]
for lab in labels_list[1:]:
nxt = per[lab]['AOS_regions'][k].list_poly[0]
inter = pc.intersect(cur, nxt)
if pc.is_empty(inter):
cur = inter
break
cur = (inter.list_poly[0] if isinstance(inter, pc.Region)
else inter)
reg = _as_region(cur)
inter_regions.append(reg)
inter_vol.append(float(reg.volume) if len(reg) > 0 else 0.0)
intersection = {
'AOS_regions': inter_regions,
'volumes': np.asarray(inter_vol, dtype=float),
'k_converged': None,
}
if DOS is not None:
DOS_arr = np.asarray(DOS, dtype=float)
intersection['dOI'] = np.asarray(
[_dOI_at_step(r, DOS_arr) if len(r) > 0 else 0.0
for r in inter_regions], dtype=float)
results = {'scenarios': per, 'intersection': intersection}
# Plot only for 2D output spaces.
probe = pc.extreme(per[labels_list[0]]['AOS_regions'][0].list_poly[0])
plottable = probe is not None and probe.shape[1] == 2
if plot and not plottable:
warnings.warn(
"Scenario plot requires a 2D output space; returning data only.")
if plot and plottable:
if engine == 'plotly':
fig = _plotly_scenarios(per, inter_regions, n_k, labels_list,
labels=labels, colors=colors,
title=title, orientation=orientation)
_show_if_notebook(fig)
results['fig'], results['ax'] = fig, None
elif engine == 'matplotlib':
fig, ax = _plot_scenarios(per, inter_regions, n_k, labels_list,
labels=labels, colors=colors,
title=title, orientation=orientation)
results['fig'], results['ax'] = fig, ax
else:
raise ValueError("engine must be 'matplotlib' or 'plotly'.")
return results
def _plot_scenarios(per, inter_regions, n_k, labels_list,
labels=None, colors=None, title=None,
orientation='landscape'):
'''3D plot of per-scenario funnel outlines (one color each) with the
robust intersection filled in blue, stacked along the time axis. The
default landscape orientation places time on a horizontal axis with
the second output vertical, as in the Dinh & Lima figures.'''
landscape = orientation == 'landscape'
def _ordered(poly):
V = pc.extreme(poly)
if V is None or len(V) < 3:
return V
c = V.mean(axis=0)
ang = np.arctan2(V[:, 1] - c[1], V[:, 0] - c[0])
return V[np.argsort(ang)]
if colors is None:
cycle = plt.rcParams['axes.prop_cycle'].by_key().get(
'color', ['C0', 'C1', 'C2', 'C3'])
colors = [cycle[i % len(cycle)] for i in range(len(labels_list))]
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
# Scenario funnel outlines.
for j, lab in enumerate(labels_list):
regions = per[lab]['AOS_regions']
for k in range(min(n_k, len(regions))):
V = _ordered(regions[k].list_poly[0])
if V is None:
continue
B = np.vstack([V, V[0:1]])
if landscape:
ax.plot(B[:, 0], [k] * len(B), B[:, 1], color=colors[j],
linewidth=1.2, label=lab if k == 0 else None)
else:
ax.plot(B[:, 0], B[:, 1], [k] * len(B), color=colors[j],
linewidth=1.2, label=lab if k == 0 else None)
# Robust intersection, filled.
for k in range(n_k):
reg = inter_regions[k]
if len(reg) == 0:
continue
V = _ordered(reg.list_poly[0])
if V is None or len(V) < 3:
continue
if landscape:
face_verts = [[(x, k, y) for x, y in V]]
else:
face_verts = [[(x, y, k) for x, y in V]]
face = Poly3DCollection(face_verts,
facecolor='blue', alpha=0.55,
edgecolor='b', linewidth=0.8)
ax.add_collection3d(face)
y1_label = labels[0] if (labels is not None and len(labels) >= 2) \
else '$y_1$'
y2_label = labels[1] if (labels is not None and len(labels) >= 2) \
else '$y_2$'
if landscape:
ax.set_xlabel(y1_label, fontsize=11, labelpad=8)
ax.set_ylabel('Time step $k$', fontsize=11, labelpad=8)
ax.set_zlabel(y2_label, fontsize=11, labelpad=8)
else:
ax.set_xlabel(y1_label, fontsize=11, labelpad=8)
ax.set_ylabel(y2_label, fontsize=11, labelpad=8)
ax.set_zlabel('Time step $k$', fontsize=11, labelpad=8)
ax.set_title(title or 'Dynamic operability: scenarios and robust '
'intersection', fontsize=13)
ax.legend(loc='upper left', fontsize=9)
ax.view_init(elev=18, azim=-50)
fig.tight_layout()
return fig, ax
def _project_state_regions(mapping_results, dims):
'''Project the stored state-space funnel polytopes (AOS_x) onto two
selected state dimensions, returning a list of 2D pc.Region slices.'''
if 'AOS_x' not in mapping_results:
raise ValueError(
"plot_state_funnel requires results from the state-space "
"projection path (dynamic_operability_mapping, or "
"dynamic_operability with matrices or a low-dimensional "
"state); the n-step simulation method does not store state "
"polytopes.")
d0, d1 = int(dims[0]), int(dims[1])
regions = []
for P in mapping_results['AOS_x']:
V = pc.extreme(P)
if V is None or len(V) == 0:
continue
pts = V[:, [d0, d1]]
regions.append(pc.Region([_hull_or_degenerate(pts)]))
return regions
[docs]
def plot_state_funnel(mapping_results,
dims=(0, 1),
labels=None,
colors=None,
title=None,
alpha=0.25,
colormap='rainbow',
view_elev=20,
view_azim=-60,
orientation='landscape',
engine='matplotlib'):
'''
Plot the dynamic funnel in the STATE space, as in Figures 4 and 5 of
Dinh & Lima (Comput. Chem. Eng., 2026): the achievable state-space
polytopes AOS_x(k) are projected onto two selected state dimensions
and stacked along the time axis. Passing a dictionary of several
mapping results overlays their state funnels as outlines (one color
each), which reproduces the nominal-versus-updated funnel comparison
used for the online update of the operability funnel at a perturbed
initial state.
Only available for results produced by the state-space projection
path (dynamic_operability_mapping, or dynamic_operability with
matrices or a low-dimensional state), which stores the state
polytopes under the 'AOS_x' key; the n-step simulation method works
in the output space and does not retain them.
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
mapping_results : dict
Either a single results dictionary (from the projection path) or
a dictionary mapping legend labels to results dictionaries, e.g.
{'Nominal initial state': res_a, 'Perturbed initial state':
res_b}, in which case the state funnels are overlaid as outlines.
dims : tuple of int, Optional.
The two (zero-based) state dimensions to project onto. Default
is (0, 1).
labels : list, Optional.
Axis labels for the two state variables. Default is None
('State variable <dims[0]+1>', 'State variable <dims[1]+1>').
colors : list, Optional.
One color per overlaid funnel (multi-results input only).
Default is None (matplotlib color cycle).
title : str, Optional.
Plot title. Default is None.
alpha : float, Optional.
Transparency for the funnel slices (single-results input only).
Default is 0.25.
colormap : str, Optional.
Colormap for the time-step coloring (single-results input only).
Default is 'rainbow'.
view_elev : float, Optional.
Elevation angle for the 3D view in degrees. Default is 20.
view_azim : float, Optional.
Azimuth angle for the 3D view in degrees. Default is -60.
orientation : {'landscape', 'vertical'}, Optional.
Funnel orientation, as in plot_dynamic_funnel. Default is
'landscape'.
engine : {'matplotlib', 'plotly'}, Optional.
Plotting engine, as in plot_dynamic_funnel. Default is
'matplotlib'.
Returns
-------
fig : matplotlib.figure.Figure or plotly Figure
The figure object.
ax : mpl_toolkits.mplot3d.axes3d.Axes3D or None
The 3D axes object (None for the plotly engine).
References
----------
[1] S. Dinh and F. V. Lima. Linear dynamic operability analysis with
state-space projection for the online construction of achievable
output funnels. Comput. Chem. Eng., 205:109428, 2026.
https://doi.org/10.1016/j.compchemeng.2025.109428
'''
if labels is None:
labels = [f'State variable {int(dims[0]) + 1}',
f'State variable {int(dims[1]) + 1}']
# A single results dict carries 'AOS_regions'/'AOS_x' keys; a
# label -> results mapping does not.
multi = not ('AOS_regions' in mapping_results
or 'AOS_x' in mapping_results)
if not multi:
# Single funnel: reuse the output-funnel renderer on the
# projected state regions (time-step coloring, no DOS/dOI).
regions = _project_state_regions(mapping_results, dims)
fig, ax = plot_dynamic_funnel(
{'AOS_regions': regions}, labels=labels, alpha=alpha,
colormap=colormap, view_elev=view_elev, view_azim=view_azim,
orientation=orientation, engine=engine, show=False)
final_title = title or 'Dynamic state funnel'
if fig is not None:
if engine == 'plotly':
fig.update_layout(title_text=final_title)
_show_if_notebook(fig)
else:
ax.set_title(final_title, fontsize=13)
return fig, ax
# Multiple funnels: overlay outlines per label, reusing the scenario
# renderers with an empty intersection.
per = {lab: {'AOS_regions': _project_state_regions(res, dims)}
for lab, res in mapping_results.items()}
labels_list = list(per.keys())
n_k = min(len(per[lab]['AOS_regions']) for lab in labels_list)
empty_inter = [pc.Region([]) for _ in range(n_k)]
final_title = title or 'Dynamic state funnels'
if engine == 'plotly':
fig = _plotly_scenarios(per, empty_inter, n_k, labels_list,
labels=labels, colors=colors,
title=final_title,
orientation=orientation,
view_elev=view_elev, view_azim=view_azim)
_show_if_notebook(fig)
return fig, None
if engine == 'matplotlib':
return _plot_scenarios(per, empty_inter, n_k, labels_list,
labels=labels, colors=colors,
title=final_title, orientation=orientation)
raise ValueError("engine must be 'matplotlib' or 'plotly'.")
[docs]
def plot_funnel_comparison(results_dict,
labels=None,
colors=None,
title=None,
orientation='landscape',
view_elev=20,
view_azim=-60,
engine='matplotlib'):
'''
Overlay the output-space funnels of several dynamic operability results
as outlines, one color per result. Useful for comparing mapping methods
(linear projection, nonlinear projection, n-step simulation) or model
fidelities on the same process and operability sets.
The funnels are aligned index-wise: slice i of every result is drawn at
time step i+1. For a fair comparison all results should therefore start
at the same time step (e.g. build the n-step funnel without the y0
initial slice when comparing against projection results).
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
results_dict : dict
Mapping of legend labels to results dictionaries (as returned by
dynamic_operability and the lower-level mapping functions), e.g.
{'Linear': res_lin, 'Nonlinear projection': res_nl,
'N-step (full model)': res_ns}.
labels : list, Optional.
Axis labels for the two output variables. Default is None.
colors : list, Optional.
One color per result. Default is None (matplotlib color cycle).
title : str, Optional.
Plot title. Default is None.
orientation : {'landscape', 'vertical'}, Optional.
Funnel orientation, as in plot_dynamic_funnel. Default is
'landscape'.
view_elev : float, Optional.
Elevation angle for the 3D view in degrees. Default is 20.
view_azim : float, Optional.
Azimuth angle for the 3D view in degrees. Default is -60.
engine : {'matplotlib', 'plotly'}, Optional.
Plotting engine, as in plot_dynamic_funnel. Default is
'matplotlib'.
Returns
-------
fig : matplotlib.figure.Figure or plotly Figure
The figure object.
ax : mpl_toolkits.mplot3d.axes3d.Axes3D or None
The 3D axes object (None for the plotly engine).
'''
per = {lab: {'AOS_regions': res['AOS_regions']}
for lab, res in results_dict.items()}
labels_list = list(per.keys())
n_k = min(len(per[lab]['AOS_regions']) for lab in labels_list)
empty_inter = [pc.Region([]) for _ in range(n_k)]
final_title = title or 'Dynamic funnel comparison'
if engine == 'plotly':
fig = _plotly_scenarios(per, empty_inter, n_k, labels_list,
labels=labels, colors=colors,
title=final_title, orientation=orientation,
view_elev=view_elev, view_azim=view_azim)
_show_if_notebook(fig)
return fig, None
if engine == 'matplotlib':
return _plot_scenarios(per, empty_inter, n_k, labels_list,
labels=labels, colors=colors,
title=final_title, orientation=orientation)
raise ValueError("engine must be 'matplotlib' or 'plotly'.")
[docs]
def update_dynamic_funnel(mapping_results, x0_new, DOS=None):
'''
Online update of a linear dynamic operability funnel for a new initial
state, via the hyperplane right-hand-side shift of Dinh & Lima
(Comput. Chem. Eng., 2026, Eq. 18). For an LTI system the funnel from
any initial state is the offline funnel translated by A^k (x0_new -
x0_offline), so both the state polytopes and the output regions are
updated by pure linear algebra, with no re-simulation: the offline
construction is performed once, and this update runs in microseconds
at every sampling instant during operation.
Only available for results produced by the linear matrices path of
dynamic_operability / dynamic_operability_mapping.
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
mapping_results : dict
Results dictionary from the linear path (must carry '_matrices'
and 'AOS_x').
x0_new : np.ndarray
The new initial (estimated) state vector, in the same deviation
coordinates as the offline x0.
DOS : np.ndarray, Optional.
Desired Output Set bounds, shape (n_y, 2). When given, the dOI of
the updated funnel is computed and attached. Default is None.
Returns
-------
results : dict
A new results dictionary with the translated state polytopes and
output regions (volumes are translation-invariant and reused),
compatible with all downstream plotting and evaluation functions.
References
----------
[1] S. Dinh and F. V. Lima. Linear dynamic operability analysis with
state-space projection for the online construction of achievable
output funnels. Comput. Chem. Eng., 205:109428, 2026.
https://doi.org/10.1016/j.compchemeng.2025.109428
'''
mats = mapping_results.get('_matrices')
if mats is None or 'AOS_x' not in mapping_results:
raise ValueError(
"update_dynamic_funnel requires results from the linear "
"matrices path of dynamic_operability_mapping.")
A_arr, B_arr, C_arr, _ = mats
x0_old = np.asarray(mapping_results['_x0'], dtype=float)
x0_new = np.asarray(x0_new, dtype=float)
dx0 = x0_new - x0_old
# Translate the state polytopes by A^k dx0 and the output regions by
# C A^k dx0: {H x <= b} + t = {H x <= b + H t}. Volumes are unchanged.
AOS_x_new = []
AOS_regions_new = []
Ak = np.eye(A_arr.shape[0])
for k, P in enumerate(mapping_results['AOS_x']):
t_x = Ak @ dx0
AOS_x_new.append(pc.Polytope(P.A, P.b + P.A @ t_x))
if k >= 1:
t_y = C_arr @ t_x
reg = mapping_results['AOS_regions'][k - 1]
polys = [pc.Polytope(Q.A, Q.b + Q.A @ t_y)
for Q in reg.list_poly]
AOS_regions_new.append(pc.Region(polys))
Ak = A_arr @ Ak
results = dict(mapping_results)
results['AOS_x'] = AOS_x_new
results['AOS_regions'] = AOS_regions_new
results['_x0'] = x0_new
results['method'] = 'state-space projection (linear, online update)'
results.pop('fig', None)
results.pop('ax', None)
if DOS is not None:
results['dOI'] = dOI_eval(results, DOS, plot=False)
return results
[docs]
def propagate_output_covariance(A, G, C, Sigma_w, k_max, Sigma_v=None):
'''
Propagate Gaussian disturbance covariance through linear dynamics and
return the output covariance at each time step:
Sigma_x(k+1) = A Sigma_x(k) A^T + G Sigma_w G^T, Sigma_x(0) = 0
Sigma_y(k) = C Sigma_x(k) C^T (+ Sigma_v)
Companion to gaussian_robust_funnel, following the uncertainty
propagation of Dinh & Lima (Comput. Chem. Eng., 2026, Eqs. 3-6).
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
A : np.ndarray
State transition matrix of the disturbance-carrying dynamics,
shape (n, n).
G : np.ndarray
Disturbance input matrix, shape (n, n_d).
C : np.ndarray
Output matrix, shape (n_y, n).
Sigma_w : np.ndarray
Disturbance covariance, shape (n_d, n_d).
k_max : int
Number of time steps.
Sigma_v : np.ndarray, Optional.
Measurement noise covariance, shape (n_y, n_y), added to every
Sigma_y(k). Default is None.
Returns
-------
list of np.ndarray
Output covariance matrices Sigma_y(k) for k = 1, ..., k_max.
'''
A = np.asarray(A, dtype=float)
G = np.asarray(G, dtype=float)
C = np.asarray(C, dtype=float)
Sigma_w = np.atleast_2d(np.asarray(Sigma_w, dtype=float))
Sx = np.zeros((A.shape[0], A.shape[0]))
out = []
for _ in range(k_max):
Sx = A @ Sx @ A.T + G @ Sigma_w @ G.T
Sy = C @ Sx @ C.T
if Sigma_v is not None:
Sy = Sy + np.atleast_2d(np.asarray(Sigma_v, dtype=float))
out.append(Sy)
return out
[docs]
def gaussian_robust_funnel(mapping_results, Sigma_y, confidence=0.95,
DOS=None):
'''
Disturbance-robust achievable output funnel for Gaussian uncertainty,
by hyperplane shrinkage: each slice of the (mean) funnel is shrunk so
that the remaining outputs stay achievable for every uncertainty
realization inside the chosen highest-density region. Following the
set-intersection theorem of Dinh & Lima (Comput. Chem. Eng., 2026,
Theorem 1), the intersection of all translations of a polytope
{H y <= b} over an ellipsoidal uncertainty with covariance Sigma is
{H y <= b_hat}, b_hat_i = b_i - l * sqrt(H_i Sigma H_i^T),
l^2 = Inv_chi2(confidence; n_y).
This applies the shrinkage in the output space using the
disturbance-induced output covariance (see
propagate_output_covariance); the original publication composes a
state-space shrinkage followed by a measurement-noise output
shrinkage, of which this is the single-stage output-space variant.
An empty slice means no output can be guaranteed at that time step
regardless of the controller (transient inoperability).
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
mapping_results : dict
Results dictionary whose 'AOS_regions' hold the mean (zero-
disturbance) funnel, e.g. from the linear matrices path.
Sigma_y : np.ndarray or list of np.ndarray
Output covariance, either a single (n_y, n_y) matrix applied to
every step or one matrix per funnel slice (as returned by
propagate_output_covariance).
confidence : float, Optional.
Probability content of the uncertainty highest-density region.
Default is 0.95.
DOS : np.ndarray, Optional.
Desired Output Set bounds. When given, the dOI of the robust
funnel is computed and attached. Default is None.
Returns
-------
results : dict
A results dictionary with the shrunken 'AOS_regions' (empty
regions where nothing can be guaranteed), updated 'volumes', and
'dOI' when DOS is given; compatible with the plotting functions.
References
----------
[1] S. Dinh and F. V. Lima. Linear dynamic operability analysis with
state-space projection for the online construction of achievable
output funnels. Comput. Chem. Eng., 205:109428, 2026.
https://doi.org/10.1016/j.compchemeng.2025.109428
'''
from scipy.stats import chi2
regions = mapping_results['AOS_regions']
n_k = len(regions)
if isinstance(Sigma_y, (list, tuple)):
if len(Sigma_y) < n_k:
raise ValueError(
f"Sigma_y has {len(Sigma_y)} entries but the funnel has "
f"{n_k} slices.")
Sy_list = [np.atleast_2d(np.asarray(S, dtype=float))
for S in Sigma_y]
else:
Sy_list = [np.atleast_2d(np.asarray(Sigma_y, dtype=float))] * n_k
probe = pc.extreme(regions[0].list_poly[0])
n_y = probe.shape[1] if probe is not None else Sy_list[0].shape[0]
l_val = np.sqrt(chi2.ppf(confidence, df=n_y))
regions_new = []
volumes = []
for k, reg in enumerate(regions):
Sy = Sy_list[k]
polys = []
for Q in reg.list_poly:
shrink = l_val * np.sqrt(
np.einsum('ij,jk,ik->i', Q.A, Sy, Q.A))
Pn = pc.reduce(pc.Polytope(Q.A, Q.b - shrink))
if not pc.is_empty(Pn) and Pn.volume > 0:
polys.append(Pn)
reg_new = pc.Region(polys)
regions_new.append(reg_new)
volumes.append(float(sum(p.volume for p in polys)) if polys
else 0.0)
results = dict(mapping_results)
results['AOS_regions'] = regions_new
results['volumes'] = np.asarray(volumes, dtype=float)
results['method'] = (mapping_results.get('method', '')
+ ' + Gaussian robust shrinkage').strip()
results.pop('fig', None)
results.pop('ax', None)
if DOS is not None:
DOS_arr = np.asarray(DOS, dtype=float)
results['dOI'] = np.asarray(
[_dOI_at_step(r, DOS_arr) if len(r) > 0 else 0.0
for r in regions_new], dtype=float)
return results
[docs]
def identify_lti_step_tests(step_model, x0, u_ref, du, n_steps=25):
'''
Identify a discrete-time LTI model from step tests on a nonlinear
step model, in the form consumed by dynamic_operability's matrices
interface. Each input is stepped by du[j] from the reference point;
every output response is fit with a first-order model
y_i(k) = K_ij (1 - exp(-k / tau_ij)), and each fitted channel
contributes one state (zero-order-hold discretization), mirroring the
construction of identified models such as the HYPER process model of
Dinh & Lima (Comput. Chem. Eng., 2026).
Author: Victor Alves -- Carnegie Mellon University
Parameters
----------
step_model : Callable
Two-argument step model step(x, u) -> (x_next, y) of the rigorous
process (one call advances one sampling period).
x0 : np.ndarray
Steady-state state vector at the reference point (the step tests
start here).
u_ref : np.ndarray
Reference (nominal) input vector, shape (n_u,).
du : np.ndarray or float
Step size per input (scalar applies to all inputs).
n_steps : int, Optional.
Number of sampling periods recorded per step test. Should cover
the settling time of the slowest channel. Default is 25.
Returns
-------
model : dict
{'A', 'B', 'C', 'u_ref', 'y_ref'} ready for
dynamic_operability(model, ...). The fitted gains and time
constants are attached under 'K' and 'tau' (shape (n_y, n_u)) for
inspection.
'''
x0 = np.asarray(x0, dtype=float)
u_ref = np.asarray(u_ref, dtype=float)
n_u = u_ref.shape[0]
du_arr = (np.full(n_u, float(du)) if np.isscalar(du)
else np.asarray(du, dtype=float))
# Reference output at the steady state.
_, y_ref = step_model(x0.copy(), u_ref)
y_ref = np.atleast_1d(np.asarray(y_ref, dtype=float))
n_y = y_ref.shape[0]
# One step test per input.
responses = np.zeros((n_u, n_steps, n_y))
for j in range(n_u):
u_test = u_ref.copy()
u_test[j] += du_arr[j]
x = x0.copy()
for k in range(n_steps):
x, y = step_model(x, u_test)
responses[j, k, :] = np.asarray(y, dtype=float) - y_ref
# First-order fit per channel via grid search on tau.
t_grid = np.arange(1, n_steps + 1, dtype=float)
taus = np.linspace(0.3, max(3.0, n_steps / 1.5), 300)
K = np.zeros((n_y, n_u))
tau = np.zeros((n_y, n_u))
for j in range(n_u):
for i in range(n_y):
yk = responses[j, :, i]
K_end = yk[-1]
errors = [np.sum((K_end * (1 - np.exp(-t_grid / t_c))
- yk) ** 2) for t_c in taus]
tau[i, j] = taus[int(np.argmin(errors))]
K[i, j] = K_end / du_arr[j]
# One state per channel, ZOH at the sampling period of step_model.
n_states = n_y * n_u
A = np.zeros((n_states, n_states))
B = np.zeros((n_states, n_u))
C = np.zeros((n_y, n_states))
s = 0
for i in range(n_y):
for j in range(n_u):
A[s, s] = np.exp(-1.0 / tau[i, j])
B[s, j] = 1 - np.exp(-1.0 / tau[i, j])
C[i, s] = K[i, j]
s += 1
return {'A': A, 'B': B, 'C': C, 'u_ref': u_ref, 'y_ref': y_ref,
'K': K, 'tau': tau}
[docs]
def make_pyomo_step_model(build_func, n_x, n_u):
'''
Wrap a Pyomo model builder function into a step_model callable that
is compatible with dynamic_operability_mapping. At each call the
returned callable builds and solves the Pyomo NLP to advance the
state x(k) to x(k+1) given input u(k).
The builder function must return a ConcreteModel with:
-- m.x_current[i] : Param, indexed 0..n_x-1, for current state values.
-- m.u[j] : fixed Var, indexed 0..n_u-1, for input values.
-- m.x_next[i] : Var, indexed 0..n_x-1, for the next state values.
-- Constraints linking x_current and u to x_next (e.g. via orthogonal
collocation on finite elements over a single discretized time
step).
Author: Victor Alves
Parameters
----------
build_func : Callable
Function that takes no arguments and returns a
pyomo.environ.ConcreteModel with the structure described above.
n_x : int
Number of state variables.
n_u : int
Number of input variables.
Returns
-------
step_model : Callable
step_model(x, u) -> (x_next, y), compatible with
dynamic_operability_mapping. The output y equals x_next; supply
your own C matrix in the linear path or post-process results if
a reduced output mapping is needed.
References
----------
[1] S. Dinh and F. V. Lima. Dynamic operability analysis for process
design and control of modular natural gas utilization systems.
Ind. Eng. Chem. Res., 2023.
https://doi.org/10.1021/acs.iecr.2c03543
[2] S. Dinh. Nonlinear Dynamic Analysis and Control of Chemical
Processes Using Dynamic Operability Framework. PhD Dissertation,
West Virginia University, 2023.
'''
import pyomo.environ as pyo
def step_model(x, u):
# Build a fresh Pyomo model instance for this (x, u) evaluation.
m = build_func()
# Fix the current state and input values as model parameters.
for i in range(n_x):
m.x_current[i].set_value(float(x[i]))
for j in range(n_u):
m.u[j].fix(float(u[j]))
# Solve the NLP to obtain the next state vector.
solver = pyo.SolverFactory('ipopt')
solver.solve(m, tee=False)
# Extract the next-state values and return with identity output.
x_next = np.array([pyo.value(m.x_next[i]) for i in range(n_x)])
return x_next, x_next
return step_model
# --------------------------------------------------------------------------- #
# Private helpers - not part of the public API.
# --------------------------------------------------------------------------- #
def _propagate_state_nonlinear(step_model, state_vertices, input_vertices,
d_vec=None):
'''
Propagate a set of state vertices one time step forward by evaluating
the nonlinear step model over all (state vertex, input vertex) pairs.
Returns both the array of candidate next-state points (for state-space
AOS reconstruction) and the array of corresponding output points.
Parameters
----------
step_model : Callable
(x, u) -> (x_next, y) or (x, u, d) -> (x_next, y).
state_vertices : np.ndarray
Vertices of the current state polytope, shape (n_sv, n_x).
input_vertices : np.ndarray
Vertices or grid points of the AIS, shape (n_iv, n_u).
d_vec : np.ndarray, Optional.
A single fixed disturbance vector, shape (n_d,). The caller
loops over EDS vertices to cover worst-case scenarios. Default
is None (no disturbance argument passed to step_model).
Returns
-------
next_pts : np.ndarray
All candidate next-state points, shape (n_sv * n_iv, n_x).
out_pts : np.ndarray
Corresponding output points, shape (n_sv * n_iv, n_y).
'''
next_pts = []
out_pts = []
# Evaluate the step model for every (state vertex, input vertex)
# pair, with the disturbance held fixed when supplied.
if d_vec is None:
for sv in state_vertices:
for iv in input_vertices:
x_next, y = step_model(np.asarray(sv), np.asarray(iv))
next_pts.append(np.asarray(x_next, dtype=float))
out_pts.append(np.atleast_1d(np.asarray(y, dtype=float)))
else:
d_arr = np.asarray(d_vec, dtype=float)
for sv in state_vertices:
for iv in input_vertices:
x_next, y = step_model(np.asarray(sv), np.asarray(iv),
d_arr)
next_pts.append(np.asarray(x_next, dtype=float))
out_pts.append(np.atleast_1d(np.asarray(y, dtype=float)))
return np.array(next_pts), np.array(out_pts)
def _propagate_state_linear(A, B, B_d, state_polytope, AIS_polytope,
EDS_polytope=None):
'''
Propagate a state polytope one time step forward for a discrete-time
linear system using polytope affine transforms and Minkowski sums:
AOS_x(k+1) = A @ AOS_x(k) (+) B @ AIS
If disturbances are present, an additional term is Minkowski-summed:
AOS_x(k+1) = A @ AOS_x(k) (+) B @ AIS (+) B_d @ EDS
Parameters
----------
A : np.ndarray
State transition matrix, shape (n_x, n_x).
B : np.ndarray
Input matrix, shape (n_x, n_u).
B_d : np.ndarray or None
Disturbance input matrix, shape (n_x, n_d). Required when
EDS_polytope is supplied.
state_polytope : pc.Polytope
Current state polytope AOS_x(k).
AIS_polytope : pc.Polytope
Polytope representation of the Available Input Set.
EDS_polytope : pc.Polytope or None, Optional.
Polytope for the disturbance (typically a single-point degenerate
polytope for one EDS vertex when the caller is handling worst-
case intersection). Default is None.
Returns
-------
pc.Polytope
The propagated state polytope AOS_x(k+1).
'''
# Affine transform: A @ AOS_x(k).
A_state = _affine_transform_polytope(A, state_polytope)
# Affine transform: B @ AIS.
B_input = _affine_transform_polytope(B, AIS_polytope)
# Minkowski sum: A @ AOS_x(k) (+) B @ AIS.
next_poly = _minkowski_sum_vrep(A_state, B_input)
# Add the disturbance contribution B_d @ EDS via a second Minkowski
# sum when EDS_polytope is supplied.
if EDS_polytope is not None and B_d is not None:
Bd_dist = _affine_transform_polytope(B_d, EDS_polytope)
next_poly = _minkowski_sum_vrep(next_poly, Bd_dist)
return next_poly
def _affine_transform_polytope(M, polytope_in):
'''
Apply the affine transformation y = M @ x to a polytope via vertex
enumeration. The transformed vertex cloud is converted back to a
polytope using convex hull (qhull), or to a degenerate bounding box
when fewer vertices are present than required for a full-dimensional
polytope.
Parameters
----------
M : np.ndarray
Transformation matrix, shape (n_out, n_in).
polytope_in : pc.Polytope
Input polytope in n_in dimensions.
Returns
-------
pc.Polytope
Transformed polytope in n_out dimensions.
'''
# Extract the vertex representation of the input polytope.
verts = pc.extreme(polytope_in)
if verts is None or len(verts) == 0:
raise ValueError(
"Cannot extract vertices from the input polytope."
)
# Apply the linear transformation to each vertex row.
transformed = (M @ verts.T).T
# Fall back to a degenerate bounding box when qhull would need more
# points than we have for a full-dimensional polytope.
return _hull_or_degenerate(transformed)
def _minkowski_sum_vrep(poly_a, poly_b):
'''
Compute the Minkowski sum of two polytopes via vertex enumeration:
P_a (+) P_b = ConvexHull{ a + b | a in V(P_a), b in V(P_b) }
Parameters
----------
poly_a : pc.Polytope
First input polytope.
poly_b : pc.Polytope
Second input polytope.
Returns
-------
pc.Polytope
The Minkowski sum polytope.
'''
# Enumerate vertices of both operand polytopes.
va = pc.extreme(poly_a)
vb = pc.extreme(poly_b)
if va is None or vb is None:
raise ValueError("Cannot extract vertices for Minkowski sum.")
# Form all pairwise sums and take the convex hull.
pts = []
for a in va:
for b in vb:
pts.append(a + b)
pts = np.array(pts)
return _hull_or_degenerate(pts)
def _hull_or_degenerate(points):
'''
Return pc.qhull(points) when the point cloud spans a full-dimensional
polytope, or a small padded bounding box via _point_or_degenerate_polytope
when it does not (e.g. single point or collinear set).
Parameters
----------
points : np.ndarray
Array of points, shape (n_pts, n_dim).
Returns
-------
pc.Polytope
'''
points = np.atleast_2d(points)
# qhull needs at least n_dim + 1 points in general position.
if points.shape[0] < points.shape[1] + 1:
return _point_or_degenerate_polytope(points)
return pc.qhull(points)
def _point_or_degenerate_polytope(points, eps=1e-6):
'''
Create a small full-dimensional polytope enclosing a potentially
degenerate set of points (e.g. a single point or collinear set). A
tiny axis-aligned bounding box of half-width eps is added around
the point cloud so the resulting polytope is non-degenerate and
admits a proper H-representation.
Parameters
----------
points : np.ndarray
Array of points, shape (n_pts, n_dim).
eps : float, Optional.
Half-width of the padding added around each dimension. Default
is 1e-6.
Returns
-------
pc.Polytope
A (possibly tiny) full-dimensional polytope enclosing all points.
'''
# Pad the bounding box of the point cloud by eps on all sides.
points = np.atleast_2d(points)
lo = points.min(axis=0) - eps
hi = points.max(axis=0) + eps
bounds = np.column_stack([lo, hi])
return pc.box2poly(bounds)
def _dOI_at_step(AOS_region, DOS):
'''
Compute the Dynamic Operability Index (dOI) at a single time step
by delegating to the steady-state OI_eval routine.
Parameters
----------
AOS_region : pc.Region
The achievable output set region at this time step.
DOS : np.ndarray
Bounds on the Desired Output Set, shape (n_y, 2).
Returns
-------
float
Operability index value in the range [0, 100].
'''
# OI_eval expects AS as [pc.Region, None]; reuse the steady-state code.
return OI_eval([AOS_region, None], DOS, plot=False)
def _plot_AOS_evolution(AOS_regions, labels=None):
'''
Plot the 2D output-space AOS polygon at each time step overlaid on
a single axes, with a rainbow color map indicating time progression.
Parameters
----------
AOS_regions : list of pc.Region
Output-space AOS regions at each time step.
labels : list, Optional.
Axis labels, e.g. ['$y_1$', '$y_2$']. Default is None.
Returns
-------
None
'''
n_steps = len(AOS_regions)
if n_steps == 0:
return
# Determine output dimensionality from the first region.
first_verts = pc.extreme(AOS_regions[0].list_poly[0])
if first_verts is None:
return
n_y = first_verts.shape[1] if first_verts.ndim == 2 else 1
# AOS evolution plot is only generated for 2D output spaces.
if n_y != 2:
return
fig, ax = plt.subplots(1, 1)
cmap_func = plt.get_cmap(cmap, max(n_steps, 2))
# Plot each AOS polygon with a time-indexed color and increasing
# opacity from early to late steps.
for i, region in enumerate(AOS_regions):
color = cmap_func(i / max(n_steps - 1, 1))
alpha = 0.15 + 0.6 * (i / max(n_steps - 1, 1))
lbl = f'k={i+1}' if i == 0 or i == n_steps - 1 else None
_plot_polytope_2d(ax, region.list_poly[0], facecolor=color,
edgecolor='k', alpha=alpha, label=lbl,
linewidth=0.5)
ax.set_title('Dynamic AOS Evolution')
if labels is not None and len(labels) >= 2:
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
else:
ax.set_xlabel('$y_1$')
ax.set_ylabel('$y_2$')
ax.legend(loc='best', fontsize='small')
sm = plt.cm.ScalarMappable(
cmap=cmap, norm=plt.Normalize(vmin=1, vmax=n_steps)
)
sm.set_array([])
cbar = fig.colorbar(sm, ax=ax)
cbar.set_label('Time step k')
fig.tight_layout()
def _plot_volume_evolution(volumes):
'''
Plot the evolution of the AOS_y Lebesgue volume (area in 2D) as a
function of the time step k.
Parameters
----------
volumes : np.ndarray
AOS_y volume per step.
Returns
-------
None
'''
if len(volumes) == 0:
return
fig, ax = plt.subplots(1, 1)
steps = np.arange(1, len(volumes) + 1)
ax.plot(steps, volumes, 's-', color=AS_COLOR, linewidth=1.5,
markersize=5, markeredgecolor='k')
ax.set_xlabel('Time step k')
ax.set_ylabel('AOS Volume (Lebesgue measure)')
ax.set_title('AOS Volume Evolution')
fig.tight_layout()
def _plot_dOI_convergence(dOI_values):
'''
Plot the Dynamic Operability Index convergence trajectory: dOI vs
time step k, with a reference horizontal line at dOI = 100%.
Parameters
----------
dOI_values : np.ndarray
Dynamic Operability Index at each time step, values in [0, 100].
Returns
-------
None
'''
fig, ax = plt.subplots(1, 1)
steps = np.arange(1, len(dOI_values) + 1)
ax.plot(steps, dOI_values, 'o-', color=INTERSECT_COLOR,
linewidth=1.5, markersize=5)
ax.axhline(y=100, color='gray', linestyle='--', linewidth=0.8,
label='dOI = 100%')
ax.set_xlabel('Time step k')
ax.set_ylabel('Dynamic Operability Index (%)')
ax.set_title('dOI Convergence')
ax.legend(loc='best')
ax.set_ylim(bottom=0)
fig.tight_layout()
def _plot_polytope_2d(ax, polytope_obj, facecolor='b', edgecolor='k',
alpha=0.3, label=None, linewidth=1.0):
'''
Plot a 2D convex polytope on an existing matplotlib axes by
extracting its vertices and drawing a closed filled polygon.
Parameters
----------
ax : matplotlib.axes.Axes
The axes on which to draw the polygon.
polytope_obj : pc.Polytope
A 2D polytope whose vertices define the polygon.
facecolor : str, Optional.
Fill color of the polygon. Default is 'b'.
edgecolor : str, Optional.
Edge (boundary) color. Default is 'k'.
alpha : float, Optional.
Transparency of the polygon fill (0 transparent, 1 opaque).
Default is 0.3.
label : str, Optional.
Legend label for the polygon patch. Default is None.
linewidth : float, Optional.
Width of the polygon boundary lines. Default is 1.0.
Returns
-------
None
'''
# Extract vertices; skip degenerate or under-defined polytopes.
verts = pc.extreme(polytope_obj)
if verts is None or len(verts) < 3:
return
# Sort vertices by polar angle around the centroid for a proper
# (non-self-intersecting) polygon.
center = verts.mean(axis=0)
angles = np.arctan2(verts[:, 1] - center[1],
verts[:, 0] - center[0])
order = np.argsort(angles)
ordered_verts = verts[order]
# Add the closed polygon patch and update axis limits.
polygon = plt.Polygon(ordered_verts, closed=True, facecolor=facecolor,
edgecolor=edgecolor, alpha=alpha, label=label,
linewidth=linewidth)
ax.add_patch(polygon)
ax.autoscale_view()
# --------------------------------------------------------------------------- #
# Private helpers: optional-solver and Pyomo/OMLT support.
#
# Pounce (the default NLP solver) is a core dependency imported at module
# level. cyipopt and pyomo are optional and imported lazily here so that
# importing opyrability never requires them.
# --------------------------------------------------------------------------- #
def _is_pyomo_model(model) -> bool:
'''
Detect whether the supplied process model is a Pyomo/OMLT builder
instead of a plain Python callable.
The convention (contributed by Heitor F., github @hfsf, PR #33,
adapted) is a builder function model(m, u_vars, y_vars) that adds the
model constraints to a Pyomo ConcreteModel, flagged with a
'build_pyomo_constraints' attribute (or 'is_omlt' for OMLT objects).
Parameters
----------
model : Callable or object
The process model passed by the user: either a plain Python/JAX
callable y = model(u), or a Pyomo/OMLT builder carrying the
'build_pyomo_constraints' or 'is_omlt' attribute.
Returns
-------
bool
True if model is a Pyomo/OMLT builder, False if it is a plain
callable.
Author: Victor Alves -- Carnegie Mellon University
'''
return (hasattr(model, 'build_pyomo_constraints')
or hasattr(model, 'is_omlt'))
def _pyomo_solver_factory(pyomo_solver: str, print_level: int = 5):
'''
Lazily import pyomo and build the requested NLP solver for the
Pyomo/OMLT mapping paths.
Supports 'ipopt' and 'pounce' (the pure-Rust Ipopt reimplementation
with the bundled FERAL linear solver), the latter registered through
the pyomo_pounce package.
Parameters
----------
pyomo_solver : str
Name of the Pyomo solver to build (e.g. 'ipopt' or 'pounce').
print_level : int, optional
IPOPT print_level forwarded to the solver options when supported.
The default is 5.
Returns
-------
pyo : module
The imported pyomo.environ module.
solver : pyomo solver object
The constructed Pyomo solver returned by SolverFactory.
Raises
------
ImportError
If pyomo (or, when pyomo_solver='pounce', pyomo_pounce) is not
installed.
Author: Victor Alves -- Carnegie Mellon University
'''
try:
import pyomo.environ as pyo
except ImportError as exc:
raise ImportError(
"Pyomo/OMLT models require the pyomo package. Install it "
"with: pip install pyomo") from exc
if pyomo_solver == 'pounce':
try:
import pyomo_pounce # noqa: F401 -- registers 'pounce'
except ImportError as exc:
raise ImportError(
"pyomo_solver='pounce' requires the Pounce solver and "
"its Pyomo plugin. Install them with: pip install "
"pounce-solver pyomo-pounce") from exc
solver = pyo.SolverFactory(pyomo_solver)
try:
solver.options['print_level'] = print_level
except Exception:
pass
return pyo, solver
def _import_cyipopt_minimize():
'''
Lazily import cyipopt's scipy-style minimize interface.
cyipopt is an optional dependency (it ships compiled IPOPT binaries
that are usually installed through conda), so it is only imported when
the user selects method='ipopt'. Pounce, the default solver, is a core
dependency imported at module level instead.
Returns
-------
minimize_ipopt : Callable
cyipopt's scipy-style minimize_ipopt entry point.
Raises
------
ImportError
If cyipopt (>=1.7.0) is not installed.
Author: Victor Alves -- Carnegie Mellon University
'''
try:
from cyipopt import minimize_ipopt
except ImportError as exc:
raise ImportError(
"method='ipopt' requires cyipopt (>=1.7.0). Install it with: "
"conda install -c conda-forge cyipopt") from exc
return minimize_ipopt