Skip to content

API Reference

This section provides the API documentation for mdotoolbox.

Core

BestSolution dataclass

Tracks the best iterate found during optimization under the lexicographic progress criterion.

Source code in src/mdotoolbox/core/base.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
@dataclass
class BestSolution:
    """Tracks the best iterate found during optimization under the lexicographic
    progress criterion.

    The acceptance decision is delegated to ``assess_progress`` (defined
    elsewhere), which compares a candidate against the incumbent using the
    stored constraint violation ``h``, discrepancy ``J_i`` and objective ``f``
    together with the supplied tolerances.
    """
    # Best shared design variables found so far (single-NaN sentinel until
    # the first successful update).
    z_bar: np.ndarray = field(default_factory=lambda: np.array([np.nan]))
    # Best local design variables found so far.
    x_bar: np.ndarray = field(default_factory=lambda: np.array([np.nan]))
    # Best coupling variables found so far.
    y_bar: np.ndarray = field(default_factory=lambda: np.array([np.nan]))
    f: float = np.inf  # best objective value (inf until first update)
    h: float = np.inf  # lowest total constraint violation seen
    J_i: float = np.inf  # best total discrepancy

    def update(self, z_bar_new: np.ndarray, x_bar_new: np.ndarray, y_bar_new: np.ndarray, f_new: float, h_new: float, J_i_new: float, eta: float=1e-08, epsilon_h: float=1e-06, epsilon_J: float=1e-06) -> bool:
        """Update the stored best iterate if the new point represents progress.

        Args:
            z_bar_new, x_bar_new, y_bar_new: Candidate variable vectors.
            f_new, h_new, J_i_new: Candidate objective, violation, discrepancy.
            eta, epsilon_h, epsilon_J: Tolerances forwarded to ``assess_progress``.

        Returns:
            bool: True when the stored best was replaced.
        """
        improved = assess_progress(h=h_new, h_best=self.h, J=J_i_new, J_best=self.J_i, f=f_new, f_best=self.f, eta=eta, epsilon_h=epsilon_h, epsilon_J=epsilon_J)
        if improved:
            # Copy arrays so the caller's later in-place mutation cannot
            # corrupt the stored best.
            self.z_bar = z_bar_new.copy()
            self.x_bar = x_bar_new.copy()
            self.y_bar = y_bar_new.copy()
            self.f = f_new
            # NOTE(review): h keeps the running minimum rather than h_new, so
            # the stored h may not correspond to the stored variables — confirm
            # this is intended by the lexicographic criterion.
            self.h = min(self.h, h_new)
            self.J_i = J_i_new
        return improved

    def to_dict(self) -> dict:
        """Export best results as a dictionary."""
        return {'z_bar': self.z_bar, 'x_bar': self.x_bar, 'y_bar': self.y_bar, 'f': self.f, 'h': self.h, 'J_i': self.J_i}

    def __str__(self):
        # Multi-line human-readable summary; values formatted via format_vars.
        results = ['Best Attained Results:', f'\t- (z_bar) Best Shared Variables           : {format_vars(self.z_bar, 6)}', f'\t- (x_bar) Best Local Variables            : {format_vars(self.x_bar, 6)}', f'\t- (y_bar) Best Coupling Variables         : {format_vars(self.y_bar, 6)}', f'\t- (f) Best Objective Value            : {format_vars(self.f, 6)}', f'\t- (J_i) Best Total Discrepancy          : {format_vars(self.J_i, 6)}', f'\t- (h) Best Total Constraint Violation : {format_vars(self.h, 6)}']
        return '\n'.join(results)

    def __repr__(self) -> str:
        # Compact scientific-notation summary for debugging.
        return f'BestSolution(f={self.f:.6e}, h={self.h:.6e}, J_i={self.J_i:.6e})'

to_dict()

Export best results as a dictionary.

Source code in src/mdotoolbox/core/base.py
511
512
513
def to_dict(self) -> dict:
    """Export best results as a dictionary.

    Returns:
        dict: Keys 'z_bar', 'x_bar', 'y_bar', 'f', 'h', 'J_i' mapping to the
        stored best-iterate components (arrays returned by reference).
    """
    return {'z_bar': self.z_bar, 'x_bar': self.x_bar, 'y_bar': self.y_bar, 'f': self.f, 'h': self.h, 'J_i': self.J_i}

update(z_bar_new, x_bar_new, y_bar_new, f_new, h_new, J_i_new, eta=1e-08, epsilon_h=1e-06, epsilon_J=1e-06)

Update the stored best iterate if the new point represents progress.

Source code in src/mdotoolbox/core/base.py
499
500
501
502
503
504
505
506
507
508
509
def update(self, z_bar_new: np.ndarray, x_bar_new: np.ndarray, y_bar_new: np.ndarray, f_new: float, h_new: float, J_i_new: float, eta: float=1e-08, epsilon_h: float=1e-06, epsilon_J: float=1e-06) -> bool:
    """Update the stored best iterate if the new point represents progress.

    Acceptance is decided by ``assess_progress`` with the given tolerances.
    Returns True when the stored best was replaced.
    """
    improved = assess_progress(h=h_new, h_best=self.h, J=J_i_new, J_best=self.J_i, f=f_new, f_best=self.f, eta=eta, epsilon_h=epsilon_h, epsilon_J=epsilon_J)
    if improved:
        # Copy arrays to decouple the stored best from caller-owned buffers.
        self.z_bar = z_bar_new.copy()
        self.x_bar = x_bar_new.copy()
        self.y_bar = y_bar_new.copy()
        self.f = f_new
        # NOTE(review): keeps the running minimum of h, which may not match
        # the stored variables — confirm intended.
        self.h = min(self.h, h_new)
        self.J_i = J_i_new
    return improved

BudgetManager dataclass

Manages evaluation budget allocation for Collaborative Optimization frameworks.

Source code in src/mdotoolbox/core/base.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
@dataclass
class BudgetManager:
    """Manages evaluation budget allocation for Collaborative Optimization frameworks.

    Three allocation modes:
      - 'shared':   one common pool (``total_budget``) drawn from by the system
                    and all subsystems alike;
      - 'weighted': ``total_budget`` is split between the system optimizer
                    (``system_ratio``) and the subsystems (``subsystem_weights``);
      - 'fixed':    explicit per-subsystem and system budgets; ``total_budget``
                    is derived as their sum in ``initialize``.

    ``initialize`` must be called before the per-iteration query methods.
    """
    # Allocation strategy; validated in __post_init__.
    mode: Literal['shared', 'weighted', 'fixed'] = 'weighted'
    # Total evaluation budget (required for 'shared'/'weighted'; derived for 'fixed').
    total_budget: int = None
    # Fraction of total_budget reserved for the system optimizer ('weighted' mode).
    system_ratio: float = 0.5
    # Per-subsystem fractions of the subsystem pool; must sum to 1.0 ('weighted').
    subsystem_weights: List[float] = None
    # Per-call cap as a fraction of a component's allocation; None disables the cap.
    iteration_ratio: float = 0.05
    # Explicit per-subsystem budgets ('fixed' mode only).
    subsystem_budgets: List[int] = None
    # Explicit system budget ('fixed' mode only).
    system_budget: int = None

    def __post_init__(self):
        # Usage counters; allocations stay None until initialize() is called.
        self._subsystem_used = []
        self._system_used = 0
        self._total_used = 0
        self._subsystem_allocated = None
        self._system_allocated = None
        self._validate()

    def _validate(self):
        """Validate budget configuration"""
        if self.mode == 'shared':
            if self.total_budget is None:
                raise ValueError("total_budget required for 'shared' mode")
        elif self.mode == 'weighted':
            if self.total_budget is None:
                raise ValueError("total_budget required for 'weighted' mode")
            if self.subsystem_weights is None:
                raise ValueError("subsystem_weights required for 'weighted' mode")
            if not np.isclose(sum(self.subsystem_weights), 1.0):
                raise ValueError('subsystem_weights must sum to 1.0')
            if not 0 < self.system_ratio < 1:
                raise ValueError('system_ratio must be between 0 and 1')
        elif self.mode == 'fixed':
            if self.subsystem_budgets is None:
                raise ValueError("subsystem_budgets required for 'fixed' mode")
            if self.system_budget is None:
                raise ValueError("system_budget required for 'fixed' mode")
        else:
            raise ValueError("mode must be one of 'shared', 'weighted', or 'fixed'")

    def initialize(self, n_subsystems: int):
        """Initialize budget tracking for given number of subsystems"""
        # Reset all usage counters; safe to call again to restart tracking.
        self._subsystem_used = [0] * n_subsystems
        self._system_used = 0
        self._total_used = 0
        if self.mode == 'weighted':
            if len(self.subsystem_weights) != n_subsystems:
                raise ValueError(f'subsystem_weights length ({len(self.subsystem_weights)}) must match n_subsystems ({n_subsystems})')
            # int() truncates, so the allocations may sum to slightly less
            # than total_budget; the remainder is simply never handed out.
            subsystem_pool = self.total_budget * (1 - self.system_ratio)
            self._subsystem_allocated = [int(subsystem_pool * w) for w in self.subsystem_weights]
            self._system_allocated = int(self.total_budget * self.system_ratio)
        elif self.mode == 'fixed':
            if len(self.subsystem_budgets) != n_subsystems:
                raise ValueError(f'subsystem_budgets length ({len(self.subsystem_budgets)}) must match n_subsystems ({n_subsystems})')
            self._subsystem_allocated = list(self.subsystem_budgets)
            self._system_allocated = self.system_budget
            # Derived so that total-exhaustion checks work in 'fixed' mode too.
            self.total_budget = sum(self.subsystem_budgets) + self.system_budget
        elif self.mode == 'shared':
            # Shared mode needs no per-component allocations.
            pass

    def get_subsystem_maxiter(self, subsystem_idx: int) -> int:
        """Get maxiter for a subsystem."""
        if self.mode == 'shared':
            remaining = self.total_budget - self._total_used
            # NOTE(review): returns at least 1 even when the shared pool is
            # exhausted — callers must also consult is_subsystem_exhausted().
            return max(1, remaining)
        allocated = self._subsystem_allocated[subsystem_idx]
        used = self._subsystem_used[subsystem_idx]
        remaining = allocated - used
        if remaining <= 0:
            return 0
        if self.iteration_ratio is not None:
            # Cap the per-call iteration count to a fraction of the allocation.
            per_iter = max(1, int(np.floor(allocated * self.iteration_ratio)))
            return min(per_iter, remaining)
        return remaining

    def get_system_maxiter(self) -> int:
        """Get maxiter for system optimization."""
        if self.mode == 'shared':
            remaining = self.total_budget - self._total_used
            # NOTE(review): same at-least-1 behavior as the subsystem variant.
            return max(1, remaining)
        remaining = self._system_allocated - self._system_used
        if remaining <= 0:
            return 0
        if self.iteration_ratio is not None:
            per_iter = max(1, int(np.floor(self._system_allocated * self.iteration_ratio)))
            return min(per_iter, remaining)
        return remaining

    def record_subsystem_evals(self, subsystem_idx: int, n_evals: int):
        """Record evaluations used by a subsystem"""
        self._subsystem_used[subsystem_idx] += n_evals
        self._total_used += n_evals

    def record_system_evals(self, n_evals: int):
        """Record evaluations used by system"""
        self._system_used += n_evals
        self._total_used += n_evals

    def is_subsystem_exhausted(self, subsystem_idx: int) -> bool:
        """Check if subsystem budget is exhausted"""
        if self.mode == 'shared':
            return self._total_used >= self.total_budget
        return self._subsystem_used[subsystem_idx] >= self._subsystem_allocated[subsystem_idx]

    def is_system_exhausted(self) -> bool:
        """Check if system budget is exhausted"""
        if self.mode == 'shared':
            return self._total_used >= self.total_budget
        return self._system_used >= self._system_allocated

    def is_total_exhausted(self) -> bool:
        """Check if total budget is exhausted"""
        return self._total_used >= self.total_budget

    def get_remaining_total(self) -> int:
        """Get remaining total budget"""
        # Clamped at 0 in case recorded usage overshoots the budget.
        return max(0, self.total_budget - self._total_used)

    def get_status(self) -> dict:
        """Get current budget status"""
        status = {'mode': self.mode, 'total_budget': self.total_budget, 'total_used': self._total_used, 'total_remaining': self.get_remaining_total(), 'subsystem_used': self._subsystem_used.copy(), 'system_used': self._system_used}
        if self.mode in ['weighted', 'fixed']:
            # Per-component breakdown only exists once allocations are set.
            status['subsystem_allocated'] = self._subsystem_allocated.copy()
            status['system_allocated'] = self._system_allocated
            status['subsystem_remaining'] = [a - u for a, u in zip(self._subsystem_allocated, self._subsystem_used)]
            status['system_remaining'] = self._system_allocated - self._system_used
        return status

    def __str__(self) -> str:
        """Print budget allocation summary"""
        lines = []
        lines.append(f'Mode: {self.mode}')
        lines.append(f'Total Budget: {self.total_budget}')
        if self._subsystem_allocated is None:
            # Covers both uninitialized state and 'shared' mode pre-init.
            lines.append('(Not initialized - call initialize() first)')
            return '\n'.join(lines)
        if self.mode == 'shared':
            lines.append(f'Max per iteration: {self.total_budget} (shared pool)')
        elif self.mode in ['weighted', 'fixed']:
            lines.append('\nSubsystem Allocations:')
            for i, alloc in enumerate(self._subsystem_allocated):
                max_per_iter = max(1, int(np.floor(alloc * self.iteration_ratio))) if self.iteration_ratio is not None else alloc
                lines.append(f'  Subsystem {i + 1}: {alloc} total, {max_per_iter} max/iter')
            max_sys_per_iter = max(1, int(np.floor(self._system_allocated * self.iteration_ratio))) if self.iteration_ratio is not None else self._system_allocated
            lines.append(f'\nSystem Allocation: {self._system_allocated} total, {max_sys_per_iter} max/iter')
        return '\n'.join(lines)

    def __repr__(self) -> str:
        """Detailed representation of BudgetManager"""
        return f"BudgetManager(mode='{self.mode}', total_budget={self.total_budget}, used={self._total_used})"

__repr__()

Detailed representation of BudgetManager

Source code in src/mdotoolbox/core/base.py
711
712
713
def __repr__(self) -> str:
    """Detailed representation of BudgetManager: mode, budget and usage."""
    return f"BudgetManager(mode='{self.mode}', total_budget={self.total_budget}, used={self._total_used})"

__str__()

Print budget allocation summary

Source code in src/mdotoolbox/core/base.py
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
def __str__(self) -> str:
    """Multi-line budget allocation summary for display."""
    lines = []
    lines.append(f'Mode: {self.mode}')
    lines.append(f'Total Budget: {self.total_budget}')
    if self._subsystem_allocated is None:
        # Allocations exist only after initialize(); also true for 'shared' mode.
        lines.append('(Not initialized - call initialize() first)')
        return '\n'.join(lines)
    if self.mode == 'shared':
        lines.append(f'Max per iteration: {self.total_budget} (shared pool)')
    elif self.mode in ['weighted', 'fixed']:
        lines.append('\nSubsystem Allocations:')
        for i, alloc in enumerate(self._subsystem_allocated):
            # Same per-iteration cap formula used by get_subsystem_maxiter.
            max_per_iter = max(1, int(np.floor(alloc * self.iteration_ratio))) if self.iteration_ratio is not None else alloc
            lines.append(f'  Subsystem {i + 1}: {alloc} total, {max_per_iter} max/iter')
        max_sys_per_iter = max(1, int(np.floor(self._system_allocated * self.iteration_ratio))) if self.iteration_ratio is not None else self._system_allocated
        lines.append(f'\nSystem Allocation: {self._system_allocated} total, {max_sys_per_iter} max/iter')
    return '\n'.join(lines)

get_remaining_total()

Get remaining total budget

Source code in src/mdotoolbox/core/base.py
678
679
680
def get_remaining_total(self) -> int:
    """Get remaining total budget (clamped at 0 if usage overshoots)."""
    return max(0, self.total_budget - self._total_used)

get_status()

Get current budget status

Source code in src/mdotoolbox/core/base.py
682
683
684
685
686
687
688
689
690
def get_status(self) -> dict:
    """Get current budget status as a dictionary of totals and, for
    'weighted'/'fixed' modes, per-component allocations and remainders."""
    status = {'mode': self.mode, 'total_budget': self.total_budget, 'total_used': self._total_used, 'total_remaining': self.get_remaining_total(), 'subsystem_used': self._subsystem_used.copy(), 'system_used': self._system_used}
    if self.mode in ['weighted', 'fixed']:
        # Per-component breakdown only exists once initialize() has run.
        status['subsystem_allocated'] = self._subsystem_allocated.copy()
        status['system_allocated'] = self._system_allocated
        status['subsystem_remaining'] = [a - u for a, u in zip(self._subsystem_allocated, self._subsystem_used)]
        status['system_remaining'] = self._system_allocated - self._system_used
    return status

get_subsystem_maxiter(subsystem_idx)

Get maxiter for a subsystem.

Source code in src/mdotoolbox/core/base.py
624
625
626
627
628
629
630
631
632
633
634
635
636
637
def get_subsystem_maxiter(self, subsystem_idx: int) -> int:
    """Get maxiter for a subsystem.

    Returns the remaining allocation, capped per call by ``iteration_ratio``
    when set; 0 once the subsystem's budget is spent (non-shared modes).
    """
    if self.mode == 'shared':
        remaining = self.total_budget - self._total_used
        # NOTE(review): shared mode never returns 0 — pair with
        # is_subsystem_exhausted() when enforcing the pool.
        return max(1, remaining)
    allocated = self._subsystem_allocated[subsystem_idx]
    used = self._subsystem_used[subsystem_idx]
    remaining = allocated - used
    if remaining <= 0:
        return 0
    if self.iteration_ratio is not None:
        # Per-call cap: a fixed fraction of the full allocation, at least 1.
        per_iter = max(1, int(np.floor(allocated * self.iteration_ratio)))
        return min(per_iter, remaining)
    return remaining

get_system_maxiter()

Get maxiter for system optimization.

Source code in src/mdotoolbox/core/base.py
639
640
641
642
643
644
645
646
647
648
649
650
def get_system_maxiter(self) -> int:
    """Get maxiter for system optimization.

    Mirrors get_subsystem_maxiter but against the system allocation.
    """
    if self.mode == 'shared':
        remaining = self.total_budget - self._total_used
        # NOTE(review): shared mode never returns 0 — pair with
        # is_system_exhausted() when enforcing the pool.
        return max(1, remaining)
    remaining = self._system_allocated - self._system_used
    if remaining <= 0:
        return 0
    if self.iteration_ratio is not None:
        # Per-call cap: a fixed fraction of the system allocation, at least 1.
        per_iter = max(1, int(np.floor(self._system_allocated * self.iteration_ratio)))
        return min(per_iter, remaining)
    return remaining

initialize(n_subsystems)

Initialize budget tracking for given number of subsystems

Source code in src/mdotoolbox/core/base.py
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
def initialize(self, n_subsystems: int):
    """Initialize budget tracking for given number of subsystems.

    Resets usage counters and computes per-component allocations for
    'weighted' and 'fixed' modes; 'shared' mode needs no allocations.
    """
    self._subsystem_used = [0] * n_subsystems
    self._system_used = 0
    self._total_used = 0
    if self.mode == 'weighted':
        if len(self.subsystem_weights) != n_subsystems:
            raise ValueError(f'subsystem_weights length ({len(self.subsystem_weights)}) must match n_subsystems ({n_subsystems})')
        # int() truncation may leave a small unallocated remainder.
        subsystem_pool = self.total_budget * (1 - self.system_ratio)
        self._subsystem_allocated = [int(subsystem_pool * w) for w in self.subsystem_weights]
        self._system_allocated = int(self.total_budget * self.system_ratio)
    elif self.mode == 'fixed':
        if len(self.subsystem_budgets) != n_subsystems:
            raise ValueError(f'subsystem_budgets length ({len(self.subsystem_budgets)}) must match n_subsystems ({n_subsystems})')
        self._subsystem_allocated = list(self.subsystem_budgets)
        self._system_allocated = self.system_budget
        # Derived total makes the global exhaustion checks valid in 'fixed' mode.
        self.total_budget = sum(self.subsystem_budgets) + self.system_budget
    elif self.mode == 'shared':
        pass

is_subsystem_exhausted(subsystem_idx)

Check if subsystem budget is exhausted

Source code in src/mdotoolbox/core/base.py
662
663
664
665
666
def is_subsystem_exhausted(self, subsystem_idx: int) -> bool:
    """Check if subsystem budget is exhausted (whole pool in 'shared' mode)."""
    if self.mode == 'shared':
        return self._total_used >= self.total_budget
    return self._subsystem_used[subsystem_idx] >= self._subsystem_allocated[subsystem_idx]

is_system_exhausted()

Check if system budget is exhausted

Source code in src/mdotoolbox/core/base.py
668
669
670
671
672
def is_system_exhausted(self) -> bool:
    """Check if system budget is exhausted (whole pool in 'shared' mode)."""
    if self.mode == 'shared':
        return self._total_used >= self.total_budget
    return self._system_used >= self._system_allocated

is_total_exhausted()

Check if total budget is exhausted

Source code in src/mdotoolbox/core/base.py
674
675
676
def is_total_exhausted(self) -> bool:
    """Check if total budget is exhausted across all components."""
    return self._total_used >= self.total_budget

record_subsystem_evals(subsystem_idx, n_evals)

Record evaluations used by a subsystem

Source code in src/mdotoolbox/core/base.py
652
653
654
655
def record_subsystem_evals(self, subsystem_idx: int, n_evals: int):
    """Record evaluations used by a subsystem (also counted in the total)."""
    self._subsystem_used[subsystem_idx] += n_evals
    self._total_used += n_evals

record_system_evals(n_evals)

Record evaluations used by system

Source code in src/mdotoolbox/core/base.py
657
658
659
660
def record_system_evals(self, n_evals: int):
    """Record evaluations used by system (also counted in the total)."""
    self._system_used += n_evals
    self._total_used += n_evals

Constraint dataclass

Constraint definition for optimization problems.

Source code in src/mdotoolbox/core/base.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
@dataclass
class Constraint:
    """Constraint definition for optimization problems.

    Attributes:
        func: Function object evaluating the constraint's left-hand side.
        ctype: Constraint sense — 'ge' (>=), 'le' (<=) or 'eq' (=).
        value: Right-hand-side bound the function is compared against.
    """
    func: Function
    ctype: str = 'ge'
    value: Union[int, float] = 0.0

    def __post_init__(self):
        """Validate attribute types and the constraint sense.

        Raises:
            TypeError: If func, ctype or value has an unexpected type.
            ValueError: If ctype is not one of 'ge', 'le' or 'eq'.
        """
        if not type_check(self.func, Function):
            raise TypeError('func must be a Function object.')
        if not type_check(self.ctype, str):
            raise TypeError('ctype must be a string.')
        if not type_check(self.value, Union[int, float]):
            raise TypeError('value must be int or float.')
        if self.ctype not in ['ge', 'le', 'eq']:
            # Message fixed: 'ge'/'le' denote the inclusive bounds >= / <=
            # (see __str__), not the strict "greater/smaller than" the old
            # message claimed.
            raise ValueError('Only:\n\t- ge: greater than or equal to;\n\t- le: less than or equal to;\n\t- eq: equal to;\nare acceptable.')

    def __str__(self) -> str:
        """Human-readable form, e.g. 'g(x1, x2) >= 0.0'."""
        func_name = self.func.name if self.func.name else 'f'
        vars_str = ', '.join(self.func.x)
        # Map the (already validated) ctype onto its mathematical operator.
        if self.ctype == 'ge':
            op = '>='
        elif self.ctype == 'le':
            op = '<='
        else:
            op = '='
        return f'{func_name}({vars_str}) {op} {self.value}'

    def __repr__(self) -> str:
        """Detailed representation of Constraint"""
        return f"Constraint(type='{self.ctype}', value={self.value}, func={self.func.name})"

__repr__()

Detailed representation of Constraint

Source code in src/mdotoolbox/core/base.py
226
227
228
def __repr__(self) -> str:
    """Detailed representation of Constraint: sense, bound and function name."""
    return f"Constraint(type='{self.ctype}', value={self.value}, func={self.func.name})"

__str__()

String representation of Constraint

Source code in src/mdotoolbox/core/base.py
214
215
216
217
218
219
220
221
222
223
224
def __str__(self) -> str:
    """Human-readable form of the constraint, e.g. 'g(x1, x2) >= 0.0'."""
    func_name = self.func.name if self.func.name else 'f'
    vars_str = ', '.join(self.func.x)
    # Map ctype onto its mathematical operator ('eq' falls through to '=').
    if self.ctype == 'ge':
        op = '>='
    elif self.ctype == 'le':
        op = '<='
    else:
        op = '='
    return f'{func_name}({vars_str}) {op} {self.value}'

DoE dataclass

Design of Experiments (DoE) container for storing sampling data.

Source code in src/mdotoolbox/core/base.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@dataclass
class DoE:
    """Design of Experiments (DoE) container for storing sampling data.

    Holds sample inputs ``x``, one or more named outputs ``y`` and an optional
    per-sample ``constraint_violation`` vector, kept length-consistent as new
    points are appended via ``update_DoE``.
    """
    # Sample inputs: vector (one scalar per sample) or matrix (one row per sample).
    x: Union[list, np.ndarray]
    # Outputs: dict of name -> array; a bare array is wrapped as {'y': ...}.
    y: Union[dict, list, np.ndarray]
    # Optional per-sample aggregated constraint violation (<= tol is feasible).
    constraint_violation: Union[np.ndarray, None] = None

    def __post_init__(self):
        """Validate and normalize data after initialization."""
        # Normalize y to a dict of numpy arrays and establish the sample count.
        if type_check(self.y, dict):
            first_key = list(self.y.keys())[0]
            n_outputs = len(self.y[first_key])
            for key, val in self.y.items():
                if not type_check(val, Union[ArrayLike, MatrixLike]):
                    raise TypeError(f"y['{key}'] must be an ArrayLike or a MatrixLike object.")
                if not len(val) == n_outputs:
                    raise ValueError(f"All outputs must have same length. y['{key}'] has length {len(val)}, expected {n_outputs}")
            self.y = {key: np.array(val) for key, val in self.y.items()}
        elif type_check(self.y, ArrayLike):
            n_outputs = len(self.y)
            self.y = {'y': np.array(self.y)}
        # NOTE(review): if y is neither dict nor ArrayLike, n_outputs is never
        # bound and the checks below raise NameError — an explicit TypeError
        # here would be clearer.
        if type_check(self.x, MatrixLike):
            if not is_valid_matrix(self.x):
                raise ValueError('x must be a valid MatrixLike object. Verify the number of items in each entry.')
            n_rows = self._get_n_rows(self.x)
            if n_rows != n_outputs:
                raise ValueError(f'Number of rows in x ({n_rows}) must match length of outputs ({n_outputs})')
        elif type_check(self.x, ArrayLike):
            if len(self.x) != n_outputs:
                raise ValueError(f'Length of x ({len(self.x)}) must match length of outputs ({n_outputs})')
        else:
            raise ValueError('x must either be a vector or a matrix.')
        self.x = np.array(self.x)
        if self.constraint_violation is not None:
            # Flatten so a column vector input is accepted too.
            self.constraint_violation = np.array(self.constraint_violation).ravel()
            if len(self.constraint_violation) != n_outputs:
                raise ValueError(f'constraint_violation length ({len(self.constraint_violation)}) must match number of samples ({n_outputs})')

    @staticmethod
    def _get_n_rows(matrix: np.array) -> int:
        """Get number of rows in a matrix (ndarray shape or sequence length)."""
        if type_check(matrix, np.ndarray):
            return matrix.shape[0]
        return len(matrix)

    def update_DoE(self, x_n: Union[ArrayLike, ScalarLike], y_n: Union[ArrayLike, ScalarLike], constraint_violation_n: Union[float, None]=None, tol: float=1e-10, drop_duplicates: bool=True):
        """Add a new sample point to the Design of Experiments.

        Args:
            x_n: New input sample (scalar or vector matching existing width).
            y_n: New output(s); dict keys must match existing outputs, or a
                 bare value when the DoE has a single output.
            constraint_violation_n: Optional violation value for the new point.
            tol: Tolerance used for duplicate detection (both rtol and atol).
            drop_duplicates: When True, silently skip points already present.
        """
        if not type_check(tol, float):
            raise TypeError('tol must be a float.')
        if not type_check(drop_duplicates, bool):
            raise TypeError('drop_duplicates must be a bool.')
        if isinstance(y_n, dict):
            if set(y_n.keys()) != set(self.y.keys()):
                raise ValueError(f'y_n keys {set(y_n.keys())} must match existing keys {set(self.y.keys())}')
            for key, val in y_n.items():
                if not type_check(val, Union[ArrayLike, ScalarLike]):
                    raise TypeError(f"y_n['{key}'] must be a ScalarLike or an ArrayLike object.")
        elif len(self.y) == 1:
            # Single-output convenience: wrap the bare value under the key.
            key = list(self.y.keys())[0]
            y_n = {key: y_n}
        else:
            raise ValueError(f'DoE has multiple outputs {list(self.y.keys())}, y_n must be a dictionary')
        if not type_check(x_n, Union[ArrayLike, ScalarLike]):
            raise TypeError('x_n must be a ScalarLike or an ArrayLike object.')
        # Flatten both the new point and a reference row to compare widths.
        x_n = np.array([x_n]).ravel()
        x_0 = np.array([self.x[0]]).ravel()
        if len(x_n) != len(x_0):
            raise ValueError(f'New DoE x input must contain {len(x_0)} values.')
        n_samples = self.x.shape[0]
        duplicates = []
        for i in range(n_samples):
            if np.allclose(self.x[i], x_n, rtol=tol, atol=tol):
                duplicates.append(i)
        if duplicates and drop_duplicates:
            # Point already sampled: skip silently, leaving the DoE unchanged.
            return
        if self.x.ndim == 1:
            self.x = np.append(self.x, x_n)
        else:
            self.x = np.vstack([self.x, x_n.reshape(1, -1)])
        for key in self.y.keys():
            val = np.array(y_n[key]).ravel()
            if self.y[key].ndim == 1:
                self.y[key] = np.append(self.y[key], val)
            else:
                self.y[key] = np.vstack([self.y[key], val.reshape(1, -1)])
        # Keep constraint_violation aligned: backfill zeros for earlier
        # samples if it appears for the first time now.
        if self.constraint_violation is not None or constraint_violation_n is not None:
            if self.constraint_violation is None:
                self.constraint_violation = np.zeros(n_samples)
            if constraint_violation_n is None:
                constraint_violation_n = 0.0
            self.constraint_violation = np.append(self.constraint_violation, constraint_violation_n)

    def to_dict(self) -> dict:
        """Export DoE data as a dictionary ('x', one key per output, and
        'constraint_violation' when present)."""
        result = {'x': self.x}
        result.update(self.y)
        if self.constraint_violation is not None:
            result['constraint_violation'] = self.constraint_violation
        return result

    def get_feasible_indices(self, tol: float=1e-06) -> np.ndarray:
        """Get indices of feasible points in the DoE.

        All points are feasible when no violations are tracked.
        """
        if self.constraint_violation is None:
            return np.arange(len(self.x))
        return np.where(self.constraint_violation <= tol)[0]

    def get_best_feasible(self, objective_key: str='obj', tol: float=1e-06):
        """Find the best (minimum objective) feasible point in the DoE.

        Returns:
            tuple: (x, objective value, constraint violation). Falls back to
            the least-violating point when nothing is feasible.
        """
        feasible_idx = self.get_feasible_indices(tol)
        if len(feasible_idx) == 0:
            # No feasible point: return the one with minimum violation.
            min_violation_idx = np.argmin(self.constraint_violation)
            return (self.x[min_violation_idx], self.y[objective_key][min_violation_idx], self.constraint_violation[min_violation_idx])
        feasible_objectives = self.y[objective_key][feasible_idx]
        best_feasible_idx = feasible_idx[np.argmin(feasible_objectives)]
        return (self.x[best_feasible_idx], self.y[objective_key][best_feasible_idx], self.constraint_violation[best_feasible_idx] if self.constraint_violation is not None else 0.0)

    def get_f_min(self, objective_key: str='obj', tol: float=1e-06) -> float:
        """Get the minimum objective value from feasible points."""
        _, f_min, _ = self.get_best_feasible(objective_key, tol)
        return f_min

    def __str__(self) -> str:
        """Compact one-line summary of the DoE contents."""
        n_samples = len(self.x)
        n_vars = len(self.x[0]) if len(self.x.shape) > 1 else 1
        output_keys = list(self.y.keys())
        has_cv = self.constraint_violation is not None
        lines = []
        lines.append(f'DoE: {n_samples} samples, {n_vars} variables')
        lines.append(f"Outputs: {', '.join(output_keys)}")
        if has_cv:
            n_feasible = len(self.get_feasible_indices())
            lines.append(f'Feasible points: {n_feasible}/{n_samples}')
        return ' | '.join(lines)

    def __repr__(self) -> str:
        """Detailed representation of DoE"""
        return f'DoE(x={self.x.shape}, outputs={list(self.y.keys())}, n_samples={len(self.x)})'

__post_init__()

Validate and normalize data after initialization.

Source code in src/mdotoolbox/core/base.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __post_init__(self):
    """Validate and normalize data after initialization.

    Normalizes y to a dict of numpy arrays, checks that x and all outputs
    agree on the number of samples, and flattens constraint_violation.
    """
    if type_check(self.y, dict):
        first_key = list(self.y.keys())[0]
        # Sample count is taken from the first output and enforced everywhere.
        n_outputs = len(self.y[first_key])
        for key, val in self.y.items():
            if not type_check(val, Union[ArrayLike, MatrixLike]):
                raise TypeError(f"y['{key}'] must be an ArrayLike or a MatrixLike object.")
            if not len(val) == n_outputs:
                raise ValueError(f"All outputs must have same length. y['{key}'] has length {len(val)}, expected {n_outputs}")
        self.y = {key: np.array(val) for key, val in self.y.items()}
    elif type_check(self.y, ArrayLike):
        n_outputs = len(self.y)
        self.y = {'y': np.array(self.y)}
    # NOTE(review): any other y type leaves n_outputs unbound, so the checks
    # below raise NameError — an explicit TypeError here would be clearer.
    if type_check(self.x, MatrixLike):
        if not is_valid_matrix(self.x):
            raise ValueError('x must be a valid MatrixLike object. Verify the number of items in each entry.')
        n_rows = self._get_n_rows(self.x)
        if n_rows != n_outputs:
            raise ValueError(f'Number of rows in x ({n_rows}) must match length of outputs ({n_outputs})')
    elif type_check(self.x, ArrayLike):
        if len(self.x) != n_outputs:
            raise ValueError(f'Length of x ({len(self.x)}) must match length of outputs ({n_outputs})')
    else:
        raise ValueError('x must either be a vector or a matrix.')
    self.x = np.array(self.x)
    if self.constraint_violation is not None:
        # Flatten so column-vector input is accepted too.
        self.constraint_violation = np.array(self.constraint_violation).ravel()
        if len(self.constraint_violation) != n_outputs:
            raise ValueError(f'constraint_violation length ({len(self.constraint_violation)}) must match number of samples ({n_outputs})')

__repr__()

Detailed representation of DoE

Source code in src/mdotoolbox/core/base.py
167
168
169
def __repr__(self) -> str:
    """Detailed representation of DoE: input shape, output keys, sample count."""
    return f'DoE(x={self.x.shape}, outputs={list(self.y.keys())}, n_samples={len(self.x)})'

__str__()

String representation of DoE

Source code in src/mdotoolbox/core/base.py
153
154
155
156
157
158
159
160
161
162
163
164
165
def __str__(self) -> str:
    """Compact one-line summary of the DoE contents."""
    n_samples = len(self.x)
    # 1-D x means one scalar variable per sample.
    n_vars = len(self.x[0]) if len(self.x.shape) > 1 else 1
    output_keys = list(self.y.keys())
    has_cv = self.constraint_violation is not None
    lines = []
    lines.append(f'DoE: {n_samples} samples, {n_vars} variables')
    lines.append(f"Outputs: {', '.join(output_keys)}")
    if has_cv:
        n_feasible = len(self.get_feasible_indices())
        lines.append(f'Feasible points: {n_feasible}/{n_samples}')
    return ' | '.join(lines)

get_best_feasible(objective_key='obj', tol=1e-06)

Find the best (minimum objective) feasible point in the DoE.

Source code in src/mdotoolbox/core/base.py
138
139
140
141
142
143
144
145
146
def get_best_feasible(self, objective_key: str='obj', tol: float=1e-06):
    """Return ``(x, objective, violation)`` for the best feasible sample.

    "Best" means minimum objective among feasible points; when nothing is
    feasible, falls back to the least-infeasible sample.
    """
    feasible = self.get_feasible_indices(tol)
    objectives = self.y[objective_key]
    if len(feasible) == 0:
        # Nothing feasible: report the point of minimum total violation.
        idx = np.argmin(self.constraint_violation)
        return (self.x[idx], objectives[idx], self.constraint_violation[idx])
    idx = feasible[np.argmin(objectives[feasible])]
    violation = self.constraint_violation[idx] if self.constraint_violation is not None else 0.0
    return (self.x[idx], objectives[idx], violation)

get_f_min(objective_key='obj', tol=1e-06)

Get the minimum objective value from feasible points.

Source code in src/mdotoolbox/core/base.py
148
149
150
151
def get_f_min(self, objective_key: str='obj', tol: float=1e-06) -> float:
    """Return only the objective value of the best feasible sample."""
    best = self.get_best_feasible(objective_key, tol)
    # get_best_feasible returns (x, objective, violation).
    return best[1]

get_feasible_indices(tol=1e-06)

Get indices of feasible points in the DoE.

Source code in src/mdotoolbox/core/base.py
132
133
134
135
136
def get_feasible_indices(self, tol: float=1e-06) -> np.ndarray:
    """Indices of samples whose total constraint violation is within *tol*.

    When no violation data is stored, every sample counts as feasible.
    """
    if self.constraint_violation is None:
        return np.arange(len(self.x))
    # flatnonzero on a boolean mask is equivalent to np.where(mask)[0].
    return np.flatnonzero(self.constraint_violation <= tol)

to_dict()

Export DoE data as a dictionary.

Source code in src/mdotoolbox/core/base.py
124
125
126
127
128
129
130
def to_dict(self) -> dict:
    """Bundle samples, outputs, and (if present) violations into one dict."""
    # 'x' first, then one key per tracked output.
    data = {'x': self.x, **self.y}
    if self.constraint_violation is not None:
        data['constraint_violation'] = self.constraint_violation
    return data

update_DoE(x_n, y_n, constraint_violation_n=None, tol=1e-10, drop_duplicates=True)

Add a new sample point to the Design of Experiments.

Source code in src/mdotoolbox/core/base.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def update_DoE(self, x_n: Union[ArrayLike, ScalarLike], y_n: Union[ArrayLike, ScalarLike], constraint_violation_n: Union[float, None]=None, tol: float=1e-10, drop_duplicates: bool=True):
    """Add a new sample point to the Design of Experiments.

    Parameters
    ----------
    x_n : input coordinates of the new sample; must contain as many
        values as the stored ``x`` rows.
    y_n : output value(s); a dict keyed like ``self.y``, or a bare value
        when the DoE tracks exactly one output.
    constraint_violation_n : optional total violation of the new point.
    tol : tolerance (used as both rtol and atol) for duplicate detection.
    drop_duplicates : when True, silently skip a point already present.

    Raises
    ------
    TypeError / ValueError on malformed inputs or key mismatches.
    """
    if not type_check(tol, float):
        raise TypeError('tol must be a float.')
    if not type_check(drop_duplicates, bool):
        raise TypeError('drop_duplicates must be a bool.')
    # Normalise y_n into a dict keyed identically to self.y.
    if isinstance(y_n, dict):
        if set(y_n.keys()) != set(self.y.keys()):
            raise ValueError(f'y_n keys {set(y_n.keys())} must match existing keys {set(self.y.keys())}')
        for key, val in y_n.items():
            if not type_check(val, Union[ArrayLike, ScalarLike]):
                raise TypeError(f"y_n['{key}'] must be a ScalarLike or an ArrayLike object.")
    elif len(self.y) == 1:
        # Single-output DoE: accept a bare value and wrap it.
        key = list(self.y.keys())[0]
        y_n = {key: y_n}
    else:
        raise ValueError(f'DoE has multiple outputs {list(self.y.keys())}, y_n must be a dictionary')
    if not type_check(x_n, Union[ArrayLike, ScalarLike]):
        raise TypeError('x_n must be a ScalarLike or an ArrayLike object.')
    x_n = np.array([x_n]).ravel()
    # Width check against the first stored sample.
    x_0 = np.array([self.x[0]]).ravel()
    if len(x_n) != len(x_0):
        raise ValueError(f'New DoE x input must contain {len(x_0)} values.')
    # Sample count BEFORE appending; reused below to backfill violations.
    n_samples = self.x.shape[0]
    duplicates = []
    for i in range(n_samples):
        if np.allclose(self.x[i], x_n, rtol=tol, atol=tol):
            duplicates.append(i)
    if duplicates and drop_duplicates:
        return
    # Append the new input row, preserving the 1-D/2-D layout of self.x.
    if self.x.ndim == 1:
        self.x = np.append(self.x, x_n)
    else:
        self.x = np.vstack([self.x, x_n.reshape(1, -1)])
    # Append each output, again preserving dimensionality per key.
    for key in self.y.keys():
        val = np.array(y_n[key]).ravel()
        if self.y[key].ndim == 1:
            self.y[key] = np.append(self.y[key], val)
        else:
            self.y[key] = np.vstack([self.y[key], val.reshape(1, -1)])
    # Keep the violation vector aligned: backfill zeros for pre-existing
    # samples if a violation is introduced for the first time.
    if self.constraint_violation is not None or constraint_violation_n is not None:
        if self.constraint_violation is None:
            self.constraint_violation = np.zeros(n_samples)
        if constraint_violation_n is None:
            constraint_violation_n = 0.0
        self.constraint_violation = np.append(self.constraint_violation, constraint_violation_n)

Function dataclass

Callable function wrapper for optimization problems.

Source code in src/mdotoolbox/core/base.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@dataclass
class Function:
    """Callable function wrapper for optimization problems.

    Pairs a callable with the names of the variables it consumes; ``x``
    is normalised to a flat numpy string array on construction.
    """
    func: Callable
    x: Union[str, List[str], np.typing.NDArray[str]]
    name: str = ''

    def __post_init__(self):
        # Validate every field, then flatten x into a string vector.
        if not type_check(self.func, Callable):
            raise TypeError('func must be a callable function.')
        if not type_check(self.x, Union[str, List[str], np.typing.NDArray[str]]):
            raise TypeError('x is a vector therefore x_str must be a list-like of strings')
        self.x = np.array([self.x], dtype=str).ravel()
        if not type_check(self.name, str):
            raise TypeError('name must be a string.')

    def __str__(self) -> str:
        """Readable ``name: f(vars)`` form."""
        prefix = f'{self.name}: ' if self.name else ''
        return f"{prefix}f({', '.join(self.x)})"

    def __repr__(self) -> str:
        """Unambiguous representation for debugging."""
        variables = list(self.x)
        return f"Function(name='{self.name}', variables={variables})"

__repr__()

Detailed representation of Function

Source code in src/mdotoolbox/core/base.py
193
194
195
def __repr__(self) -> str:
    """Unambiguous representation for debugging."""
    variables = list(self.x)
    return f"Function(name='{self.name}', variables={variables})"

__str__()

String representation of Function

Source code in src/mdotoolbox/core/base.py
187
188
189
190
191
def __str__(self) -> str:
    """Readable ``name: f(vars)`` form."""
    prefix = f'{self.name}: ' if self.name else ''
    return f"{prefix}f({', '.join(self.x)})"

ParetoEntry dataclass

A single entry in the Pareto set.

Source code in src/mdotoolbox/core/base.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
@dataclass
class ParetoEntry:
    """A single entry in the Pareto set.

    ``code`` records in which bi-objective spaces the entry is currently
    non-dominated.
    """
    f: float
    h: float
    J_i: float
    z_bar: np.ndarray
    x_bar: np.ndarray
    y_bar: np.ndarray
    code: int

    def metric(self, key: str) -> float:
        """Return the value selected by *key*: 'f', 'h', or 'J'."""
        lookup = {'f': self.f, 'h': self.h, 'J': self.J_i}
        return lookup[key]

    @property
    def dominated_in_all(self) -> bool:
        """True when the entry has lost membership in every space."""
        return self.code == 0

    @property
    def nondominated_all_spaces(self) -> bool:
        """True if non-dominated in all three bi-objective spaces simultaneously."""
        return self.code == 7

nondominated_all_spaces property

True if non-dominated in all three bi-objective spaces simultaneously.

Problem dataclass

Complete optimization problem definition.

Source code in src/mdotoolbox/core/base.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
@dataclass
class Problem:
    """Complete optimization problem definition.

    Attributes:
        objective: ``Function`` wrapper around the objective callable.
        constraints: iterable of ``Constraint`` objects; each is renamed
            deterministically (``c{idx}-{ctype}-{value}``) on construction.
        ubounds / lbounds: per-variable upper / lower bounds.
        maximize: when True the objective is negated so solvers always
            minimize.
        tol: feasibility tolerance.
        name: optional problem label.
    """
    objective: Function
    constraints: Iterable
    ubounds: ArrayLike
    lbounds: ArrayLike
    maximize: bool = False
    tol: float = 1e-06
    name: str = ''

    def __post_init__(self):
        """Verify input lengths and normalize bounds to numpy arrays."""
        if not type_check(self.objective, Function):
            raise TypeError('objective must be a Function object.')
        if not type_check(self.constraints, (list, set, tuple, np.ndarray)):
            raise TypeError('constraints must be a list or array of Constraint objects.')
        for idx, const in enumerate(self.constraints):
            if not type_check(const, Constraint):
                raise TypeError(f'Constraint {idx} must be a Constraint object.')
            if type_check(const.func.x, npt.NDArray):
                if const.func.x.shape != self.objective.x.shape:
                    raise ValueError('The constraints must share the same variables as the objective function.')
            # Deterministic name; also used as the DoE output key.
            const.func.name = f'c{idx}-{const.ctype}-{const.value}'
        if not type_check(self.ubounds, ArrayLike):
            raise TypeError('ubounds must be an ArrayLike object.')
        if not type_check(self.lbounds, ArrayLike):
            raise TypeError('lbounds must be an ArrayLike object.')
        if len(self.lbounds) != len(self.objective.x):
            raise ValueError(f'There must be {len(self.objective.x)} lower bounds, only {len(self.lbounds)} were given')
        self.lbounds = np.array(self.lbounds)
        if len(self.ubounds) != len(self.objective.x):
            raise ValueError(f'There must be {len(self.objective.x)} upper bounds, only {len(self.ubounds)} were given')
        self.ubounds = np.array(self.ubounds)
        # One (lb, ub) row per variable.
        self.bounds = np.array([self.lbounds, self.ubounds]).T
        if not type_check(self.maximize, bool):
            raise TypeError('maximize must be a boolean.')
        if self.maximize:
            # BUG FIX: capture the original callable once. The previous
            # lambda re-read self.objective.func at call time, so it
            # invoked itself and recursed until RecursionError.
            base_func = self.objective.func
            self.objective.func = lambda *args: -1.0 * base_func(*args)
        if not type_check(self.tol, float):
            raise TypeError('tol must be a float.')
        if not type_check(self.name, str):
            raise TypeError('name must be a string.')

    def _eval_objective(self, x):
        """Evaluate the objective at ``x``; raise if the result is not scalar."""
        f_arr = np.asarray(self.objective.func(*x))
        if f_arr.size != 1:
            raise ValueError(f"Objective function '{self.objective.name}' returned shape {f_arr.shape}, expected scalar. Callable must return a single numeric value.")
        return f_arr.flat[0]

    def _eval_constraints(self, x):
        """Evaluate every constraint at ``x`` into a 1-D float array."""
        c_list = []
        for const in self.constraints:
            c_arr = np.asarray(const.func.func(*x))
            if c_arr.size != 1:
                raise ValueError(f"Constraint function '{const.func.name}' returned shape {c_arr.shape}, expected scalar. Callable must return a single numeric value.")
            c_list.append(float(c_arr.flat[0]))
        return np.array(c_list)

    def evaluate(self, x, f: bool, c: bool):
        """Evaluate objective and/or constraints at a given point.

        Returns ``(objective_or_None, constraint_array_or_None)``; at
        least one of *f*, *c* must be True.
        """
        if not f and (not c):
            raise ValueError('You must evaluate either f, or c, or both, but not neither.')
        return (self._eval_objective(x) if f else None, self._eval_constraints(x) if c else None)

    def initial_DoE(self, n) -> DoE:
        """Generate initial Design of Experiments using Latin Hypercube Sampling.

        *n* is the number of samples, or a callable of the number of
        variables returning that count.
        """
        if type_check(n, Callable):
            n = n(len(self.bounds))
        elif type_check(n, (int, float)):
            if n < 0:
                warnings.warn('n should be a positive integer, converting to absolute value.', stacklevel=2)
                n = abs(n)
            # BUG FIX: truncate floats after the sign fix; previously a
            # negative float skipped the elif float->int branch entirely.
            if type_check(n, float):
                n = int(n)
        else:
            raise ValueError('n must be an integer or a callable function of the number of variables.')
        # Replace infinite bounds with finite sampling windows.
        lims = []
        for bound in self.bounds:
            if np.isfinite(bound[0]) and np.isfinite(bound[1]):
                lims.append(bound)
            elif not np.isfinite(bound[0]) and np.isfinite(bound[1]):
                lims.append([min(-bound[1], bound[1] - 50), bound[1]])
            elif np.isfinite(bound[0]) and (not np.isfinite(bound[1])):
                lims.append([bound[0], max(-bound[0], bound[0] + 50)])
            else:
                lims.append([-100.0, +100.0])
        lims = np.array(lims)
        from smt.sampling_methods import LHS
        x = LHS(xlimits=lims)(n)
        y = {'obj': np.array([self.objective.func(*row) for row in x])}
        yc = {}
        for const in self.constraints:
            yc[const.func.name] = np.array([const.func.func(*row) for row in x])
        y.update(yc)
        if self.constraints:
            constraint_violation = self.compute_constraint_violation(yc)
        else:
            constraint_violation = np.zeros(n)
        return DoE(x, y, constraint_violation)

    def compute_constraint_violation(self, constraint_values: dict) -> np.ndarray:
        """Compute total (sum-of-squares) constraint violation per sample."""
        violation = 0
        if self.constraints:
            for const in self.constraints:
                c_vals = constraint_values[const.func.name]
                if const.ctype == 'ge':
                    violation += np.maximum(0, const.value - c_vals) ** 2
                elif const.ctype == 'le':
                    violation += np.maximum(0, c_vals - const.value) ** 2
                elif const.ctype == 'eq':
                    violation += np.abs(c_vals - const.value) ** 2
        return violation

    def __str__(self) -> str:
        """Multi-line mathematical-programming style description."""
        lines = []
        obj_type = 'max' if self.maximize else 'min'
        obj_name = self.objective.name if self.objective.name else 'f'
        vars_str = ', '.join(self.objective.x)
        lines.append(f'{obj_type} {obj_name}({vars_str})')
        if len(self.constraints) > 0:
            lines.append('subject to:')
            for i, const in enumerate(self.constraints):
                func_name = const.func.name if const.func.name else f'c{i}'
                const_vars = ', '.join(const.func.x)
                if const.ctype == 'ge':
                    op = '>='
                elif const.ctype == 'le':
                    op = '<='
                else:
                    op = '='
                lines.append(f'    {func_name}({const_vars}) {op} {const.value}')
        if len(self.objective.x) > 0:
            lines.append('bounds:')
            for i, var in enumerate(self.objective.x):
                lb = self.lbounds[i]
                ub = self.ubounds[i]
                lb_str = f'{lb:.2g}' if np.isfinite(lb) else '-inf'
                ub_str = f'{ub:.2g}' if np.isfinite(ub) else '+inf'
                lines.append(f'    {lb_str} <= {var} <= {ub_str}')
        return '\n'.join(lines)

    def __repr__(self) -> str:
        """Compact representation for debugging."""
        n_vars = len(self.objective.x)
        n_constraints = len(self.constraints)
        return f"Problem(name='{self.name}', n_vars={n_vars}, n_constraints={n_constraints}, maximize={self.maximize})"

__post_init__()

Verify input lengths

Source code in src/mdotoolbox/core/base.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def __post_init__(self):
    """Verify input lengths and normalize bounds to numpy arrays."""
    if not type_check(self.objective, Function):
        raise TypeError('objective must be a Function object.')
    if not type_check(self.constraints, (list, set, tuple, np.ndarray)):
        raise TypeError('constraints must be a list or array of Constraint objects.')
    for idx, const in enumerate(self.constraints):
        if not type_check(const, Constraint):
            raise TypeError(f'Constraint {idx} must be a Constraint object.')
        if type_check(const.func.x, npt.NDArray):
            if const.func.x.shape != self.objective.x.shape:
                raise ValueError('The constraints must share the same variables as the objective function.')
        # Deterministic name; also used as the DoE output key.
        const.func.name = f'c{idx}-{const.ctype}-{const.value}'
    if not type_check(self.ubounds, ArrayLike):
        raise TypeError('ubounds must be an ArrayLike object.')
    if not type_check(self.lbounds, ArrayLike):
        raise TypeError('lbounds must be an ArrayLike object.')
    if len(self.lbounds) != len(self.objective.x):
        raise ValueError(f'There must be {len(self.objective.x)} lower bounds, only {len(self.lbounds)} were given')
    self.lbounds = np.array(self.lbounds)
    if len(self.ubounds) != len(self.objective.x):
        raise ValueError(f'There must be {len(self.objective.x)} upper bounds, only {len(self.ubounds)} were given')
    self.ubounds = np.array(self.ubounds)
    # One (lb, ub) row per variable.
    self.bounds = np.array([self.lbounds, self.ubounds]).T
    if not type_check(self.maximize, bool):
        raise TypeError('maximize must be a boolean.')
    if self.maximize:
        # BUG FIX: capture the original callable once. The previous lambda
        # re-read self.objective.func at call time, so it invoked itself
        # and recursed until RecursionError.
        base_func = self.objective.func
        self.objective.func = lambda *args: -1.0 * base_func(*args)
    if not type_check(self.tol, float):
        raise TypeError('tol must be a float.')
    if not type_check(self.name, str):
        raise TypeError('name must be a string.')

__repr__()

Detailed representation of Problem

Source code in src/mdotoolbox/core/base.py
386
387
388
389
390
def __repr__(self) -> str:
    """Compact representation for debugging."""
    return f"Problem(name='{self.name}', n_vars={len(self.objective.x)}, n_constraints={len(self.constraints)}, maximize={self.maximize})"

__str__()

String representation of Problem

Source code in src/mdotoolbox/core/base.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def __str__(self) -> str:
    """Multi-line mathematical-programming style description."""
    obj_name = self.objective.name if self.objective.name else 'f'
    header = f"{'max' if self.maximize else 'min'} {obj_name}({', '.join(self.objective.x)})"
    lines = [header]
    if len(self.constraints) > 0:
        lines.append('subject to:')
        for i, const in enumerate(self.constraints):
            label = const.func.name if const.func.name else f'c{i}'
            # 'eq' (or anything unrecognized) renders as '='.
            op = {'ge': '>=', 'le': '<='}.get(const.ctype, '=')
            lines.append(f"    {label}({', '.join(const.func.x)}) {op} {const.value}")
    if len(self.objective.x) > 0:
        lines.append('bounds:')
        for i, var in enumerate(self.objective.x):
            lb = self.lbounds[i]
            ub = self.ubounds[i]
            lo = f'{lb:.2g}' if np.isfinite(lb) else '-inf'
            hi = f'{ub:.2g}' if np.isfinite(ub) else '+inf'
            lines.append(f'    {lo} <= {var} <= {hi}')
    return '\n'.join(lines)

compute_constraint_violation(constraint_values)

Compute total constraint violation for each sample point.

Source code in src/mdotoolbox/core/base.py
343
344
345
346
347
348
349
350
351
352
353
354
355
def compute_constraint_violation(self, constraint_values: dict) -> np.ndarray:
    """Compute total (sum-of-squares) constraint violation per sample."""
    total = 0
    # `or []` makes an absent/empty constraint list a clean no-op.
    for const in self.constraints or []:
        vals = constraint_values[const.func.name]
        if const.ctype == 'ge':
            total = total + np.maximum(0, const.value - vals) ** 2
        elif const.ctype == 'le':
            total = total + np.maximum(0, vals - const.value) ** 2
        elif const.ctype == 'eq':
            total = total + np.abs(vals - const.value) ** 2
    return total

evaluate(x, f, c)

Evaluate objective and/or constraints at a given point.

Source code in src/mdotoolbox/core/base.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def evaluate(self, x, f: bool, c: bool):
    """Evaluate objective and/or constraints at a given point.

    Returns ``(objective_or_None, constraint_array_or_None)``; at least
    one of *f*, *c* must be True. The scalar-shape checks that were
    previously copy-pasted across three branches are factored into
    local helpers.
    """

    def eval_objective():
        # Objective must reduce to a single numeric value.
        f_arr = np.asarray(self.objective.func(*x))
        if f_arr.size != 1:
            raise ValueError(f"Objective function '{self.objective.name}' returned shape {f_arr.shape}, expected scalar. Callable must return a single numeric value.")
        return f_arr.flat[0]

    def eval_constraints():
        # Each constraint must also produce a single numeric value.
        c_list = []
        for const in self.constraints:
            c_arr = np.asarray(const.func.func(*x))
            if c_arr.size != 1:
                raise ValueError(f"Constraint function '{const.func.name}' returned shape {c_arr.shape}, expected scalar. Callable must return a single numeric value.")
            c_list.append(float(c_arr.flat[0]))
        return np.array(c_list)

    if not f and (not c):
        raise ValueError('You must evaluate either f, or c, or both, but not neither.')
    return (eval_objective() if f else None, eval_constraints() if c else None)

initial_DoE(n)

Generate initial Design of Experiments using Latin Hypercube Sampling.

Source code in src/mdotoolbox/core/base.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
def initial_DoE(self, n) -> DoE:
    """Generate initial Design of Experiments using Latin Hypercube Sampling.

    *n* is the number of samples, or a callable of the number of
    variables returning that count.
    """
    if type_check(n, Callable):
        n = n(len(self.bounds))
    elif type_check(n, (int, float)):
        if n < 0:
            warnings.warn('n should be a positive integer, converting to absolute value.', stacklevel=2)
            n = abs(n)
        # BUG FIX: truncate floats after the sign fix; previously a
        # negative float skipped the elif float->int branch entirely and
        # reached the sampler as a float.
        if type_check(n, float):
            n = int(n)
    else:
        raise ValueError('n must be an integer or a callable function of the number of variables.')
    # Replace infinite bounds with finite sampling windows.
    lims = []
    for bound in self.bounds:
        if np.isfinite(bound[0]) and np.isfinite(bound[1]):
            lims.append(bound)
        elif not np.isfinite(bound[0]) and np.isfinite(bound[1]):
            lims.append([min(-bound[1], bound[1] - 50), bound[1]])
        elif np.isfinite(bound[0]) and (not np.isfinite(bound[1])):
            lims.append([bound[0], max(-bound[0], bound[0] + 50)])
        else:
            lims.append([-100.0, +100.0])
    lims = np.array(lims)
    from smt.sampling_methods import LHS
    x = LHS(xlimits=lims)(n)
    y = {'obj': np.array([self.objective.func(*row) for row in x])}
    yc = {}
    for const in self.constraints:
        yc[const.func.name] = np.array([const.func.func(*row) for row in x])
    y.update(yc)
    if self.constraints:
        constraint_violation = self.compute_constraint_violation(yc)
    else:
        constraint_violation = np.zeros(n)
    return DoE(x, y, constraint_violation)

Results dataclass

Container for the complete output of an optimization run.

Source code in src/mdotoolbox/core/base.py
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
@dataclass
class Results:
    """Container for the complete output of an optimization run."""
    # Best iterate found so far; created lazily in __post_init__.
    best: BestSolution = None
    # Run metadata; all default to None until the solver fills them in.
    converged: bool = None
    iterations: int = None
    evaluations: int = None
    elapsed_time: float = None
    # Optional per-iteration log. NOTE(review): annotation quoted in full;
    # the previous form ('pd.DataFrame' | None) is a str|None union that
    # raises TypeError unless postponed-annotations is active.
    history: 'pd.DataFrame | None' = None
    # Pareto archive as a DataFrame; __import__ defers the pandas dependency
    # to dataclass-construction time.
    pareto: 'pd.DataFrame' = field(default_factory=lambda: __import__('pandas').DataFrame(columns=PARETO_COLUMNS))

    def __post_init__(self):
        # Local import: presumably ensures pandas is importable before the
        # pareto DataFrame is touched - pd itself is unused here (TODO confirm).
        import pandas as pd
        if self.best is None:
            self.best = BestSolution()
        # Rebuild the internal archive from a pre-populated pareto frame so
        # update_pareto can resume from saved results.
        self._pareto_archive: list[ParetoEntry] = []
        if not self.pareto.empty:
            self._pareto_archive = [ParetoEntry(f=row['f_sys'], h=row['h_total'], J_i=row['J_i'], z_bar=row['z_bar'], x_bar=row['x_bar'], y_bar=row['y_bar'], code=int(row['code'])) for _, row in self.pareto.iterrows()]

    def update_pareto(self, f: float, h: float, J_i: float, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> None:
        """Pass the current iterate to the Pareto set update procedure."""
        # Inside this method, the bare name resolves to the module-level
        # update_pareto function, not this method.
        self._pareto_archive = update_pareto(self._pareto_archive, f=f, h=h, J_i=J_i, z_bar=z_bar, x_bar=x_bar, y_bar=y_bar)
        self.pareto = pareto_archive_to_df(self._pareto_archive)

    @property
    def pareto_all(self) -> pd.DataFrame:
        """Archive entries non-dominated in all three bi-objective spaces."""
        if self.pareto.empty:
            return self.pareto
        # code == 7 marks membership in every space.
        return self.pareto[self.pareto['code'] == 7].reset_index(drop=True)

    def __str__(self):
        # Coloured convergence banner plus timing and best-solution summary.
        conv_txt = TextColor.grn + 'Converged' + TextColor.clr if self.converged else TextColor.red + 'Did not converge' + TextColor.clr
        # Guard against division by zero when counters were never set.
        ts = format_time_multi(self.elapsed_time, self.elapsed_time / self.evaluations if self.evaluations else 0, self.elapsed_time / self.iterations if self.iterations else 0)
        results = [80 * '=', f'{conv_txt}', 80 * '-', 'Results:', f'\t- Iterations                          : {self.iterations:>{max(len(str(self.iterations)), len(str(self.evaluations)))}}', f'\t- Evaluations                         : {self.evaluations:>{max(len(str(self.iterations)), len(str(self.evaluations)))}}', f'\t- Elapsed Time                        : {ts[0]}', f'\t- Average Time per Evaluation         : {ts[1]}', f'\t- Average Time per Iteration          : {ts[2]}', 80 * '-', str(self.best), 80 * '=']
        return '\n'.join(results)

    def __repr__(self) -> str:
        status = 'converged' if self.converged else 'not_converged'
        return f'Results({status}, iters={self.iterations}, evals={self.evaluations}, {self.best!r})'

pareto_all property

Archive entries non-dominated in all three bi-objective spaces.

update_pareto(f, h, J_i, z_bar, x_bar, y_bar)

Pass the current iterate to the Pareto set update procedure.

Source code in src/mdotoolbox/core/base.py
541
542
543
544
def update_pareto(self, f: float, h: float, J_i: float, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> None:
    """Pass the current iterate to the Pareto set update procedure."""
    # The bare name resolves to the module-level update_pareto function.
    archive = update_pareto(self._pareto_archive, f=f, h=h, J_i=J_i, z_bar=z_bar, x_bar=x_bar, y_bar=y_bar)
    self._pareto_archive = archive
    self.pareto = pareto_archive_to_df(archive)

TextColor

ANSI escape sequences for text formatting in terminal

Source code in src/mdotoolbox/core/base.py
715
716
717
718
719
720
721
722
723
724
725
726
727
class TextColor:
    """ANSI escape sequences for text formatting in terminal"""
    # Standard 8-colour foreground codes (SGR 30-37).
    blk = '\x1b[30m'
    red = '\x1b[31m'
    grn = '\x1b[32m'
    ylw = '\x1b[33m'
    blu = '\x1b[34m'
    mgy = '\x1b[35m'
    cyn = '\x1b[36m'
    wht = '\x1b[37m'
    # 256-colour palette entries (SGR 38;5;n).
    brw = '\x1b[38;5;94m'
    org = '\x1b[38;5;208m'
    # Reset: restores the terminal's default attributes.
    clr = '\x1b[0m'

pareto_archive_to_df(pareto_set)

Convert an internal Pareto set list to a DataFrame.

Source code in src/mdotoolbox/core/base.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def pareto_archive_to_df(pareto_set: list[ParetoEntry]) -> pd.DataFrame:
    """Convert an internal Pareto set list to a DataFrame."""
    import pandas as pd
    if not pareto_set:
        return pd.DataFrame(columns=PARETO_COLUMNS)
    df = pd.DataFrame([{'f_sys': e.f, 'J_i': e.J_i, 'h_total': e.h, 'z_bar': e.z_bar, 'x_bar': e.x_bar, 'y_bar': e.y_bar, 'code': e.code} for e in pareto_set])
    fh = []
    fj = []
    hj = []
    for entry in df.code:
        if entry == 1:
            fh.append(1)
            fj.append(0)
            hj.append(0)
        elif entry == 2:
            fh.append(0)
            fj.append(1)
            hj.append(0)
        elif entry == 3:
            fh.append(1)
            fj.append(1)
            hj.append(0)
        elif entry == 4:
            fh.append(0)
            fj.append(0)
            hj.append(1)
        elif entry == 5:
            fh.append(1)
            fj.append(0)
            hj.append(1)
        elif entry == 6:
            fh.append(0)
            fj.append(1)
            hj.append(1)
        elif entry == 7:
            fh.append(1)
            fj.append(1)
            hj.append(1)
    df['fh'] = fh
    df['fJ'] = fj
    df['hJ'] = hj
    return df

update_pareto(pareto_set, f, h, J_i, z_bar, x_bar, y_bar)

Insert a new iterate into the Pareto set if it is non-dominated in at least one bi-objective space, and update membership codes of existing entries.

Source code in src/mdotoolbox/core/base.py
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def update_pareto(pareto_set: list[ParetoEntry], f: float, h: float, J_i: float, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> list[ParetoEntry]:
    """Insert a new iterate into the Pareto set if it is non-dominated
    in at least one bi-objective space, and update membership codes of existing
    entries.

    ``code`` acts as a bit-field of space memberships: for each ``s`` drawn
    from ``_SPACES``, ``e.code // s % 2 == 1`` tests membership.
    NOTE(review): this arithmetic assumes the ``s`` values are distinct
    powers of two - confirm against the _SPACES definition.
    """
    candidate = {'f': f, 'h': h, 'J': J_i}
    code = 0
    # Membership pass: the candidate joins every space in which no current
    # member of that space dominates it.
    for s, key_a, key_b in _SPACES:
        a = (candidate[key_a], candidate[key_b])
        dominated = any((_dominates((e.metric(key_a), e.metric(key_b)), a) for e in pareto_set if e.code // s % 2 == 1))
        if not dominated:
            code += s
    if code == 0:
        # Dominated in every space: archive is unchanged.
        return pareto_set
    # Demotion pass: strip existing entries of any membership in a space
    # where the candidate dominates them.
    for e in pareto_set:
        for s, key_a, key_b in _SPACES:
            if e.code // s % 2 == 1:
                b = (e.metric(key_a), e.metric(key_b))
                a = (candidate[key_a], candidate[key_b])
                if _dominates(a, b):
                    e.code -= s
    # Purge entries now dominated everywhere, then admit the candidate with
    # defensive copies of its variable vectors.
    pareto_set = [e for e in pareto_set if not e.dominated_in_all]
    pareto_set.append(ParetoEntry(f=f, h=h, J_i=J_i, z_bar=z_bar.copy(), x_bar=x_bar.copy(), y_bar=y_bar.copy(), code=code))
    return pareto_set

Multi-Disciplinary Optimization (MDO) problem definitions.

Discipline dataclass

Single discipline (subsystem) in a Multi-Disciplinary Optimization problem.

Source code in src/mdotoolbox/core/mdo.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@dataclass
class Discipline:
    """Single discipline (subsystem) in a Multi-Disciplinary Optimization problem.

    Wraps one output ``Function`` plus optional discipline-level constraints
    and the index bookkeeping that maps this discipline's inputs into the
    global shared (z), local (x) and coupling (y) vectors. All optional
    fields accept ``None`` and are normalized to empty containers in
    ``__post_init__``.
    """
    name: str  # human-readable discipline label
    outputs: Function  # computes this discipline's output(s)
    constraints: List[Constraint] = None  # normalized to [] in __post_init__
    shared_var_indices: np.ndarray = None  # indices into the global shared vector z
    local_var_indices: np.ndarray = None  # indices into this discipline's local slice x
    coupling_output_indices: np.ndarray = None  # global y indices this discipline produces
    coupling_input_indices: List[np.ndarray] = None  # per-source global y indices consumed

    def __post_init__(self):
        # Validate field types and replace None sentinels with empty
        # containers so downstream code can index/iterate without None checks.
        if not type_check(self.name, str):
            raise TypeError('name must be a string')
        if not type_check(self.outputs, Function):
            raise TypeError('outputs must be a Function')
        if self.constraints is None:
            self.constraints = []
        else:
            if not type_check(self.constraints, (list, tuple, np.ndarray)):
                raise TypeError('constraints must be a list of Constraint objects')
            for idx, const in enumerate(self.constraints):
                if not type_check(const, Constraint):
                    raise TypeError(f'Constraint {idx} must be a Constraint object')
        # Index arrays are coerced to ndarrays; missing ones become empty.
        if self.shared_var_indices is not None:
            self.shared_var_indices = np.array(self.shared_var_indices)
        else:
            self.shared_var_indices = np.array([])
        if self.local_var_indices is not None:
            self.local_var_indices = np.array(self.local_var_indices)
        else:
            self.local_var_indices = np.array([])
        if self.coupling_output_indices is not None:
            self.coupling_output_indices = np.array(self.coupling_output_indices)
        else:
            self.coupling_output_indices = np.array([])
        if self.coupling_input_indices is None:
            self.coupling_input_indices = []
        else:
            self.coupling_input_indices = [np.array(idx_arr) for idx_arr in self.coupling_input_indices]

    def evaluate(self, z_under: np.ndarray, x_under: np.ndarray, y_bar_coupled: np.ndarray) -> np.ndarray:
        """Evaluate discipline outputs given current variable values.

        Selects this discipline's shared/local variables by index, appends
        the incoming coupling values, and calls the wrapped output function.
        Always returns at least a 1-D array.
        """
        z_under_local = z_under[self.shared_var_indices] if len(self.shared_var_indices) > 0 else np.array([])
        x_under_local = x_under[self.local_var_indices] if len(self.local_var_indices) > 0 else np.array([])
        inputs = np.concatenate([z_under_local, x_under_local, y_bar_coupled])
        return np.atleast_1d(self.outputs.func(*inputs))

    def evaluate_constraints(self, z_under: np.ndarray, x_under: np.ndarray, y_bar_coupled: np.ndarray) -> np.ndarray:
        """Evaluate discipline-level constraints.

        Returns an empty array when the discipline has no constraints;
        otherwise one value per constraint, in declaration order.
        """
        if not self.constraints:
            return np.array([])
        z_under_local = z_under[self.shared_var_indices] if len(self.shared_var_indices) > 0 else np.array([])
        x_under_local = x_under[self.local_var_indices] if len(self.local_var_indices) > 0 else np.array([])
        inputs = np.concatenate([z_under_local, x_under_local, y_bar_coupled])
        c_vals = []
        for const in self.constraints:
            c_vals.append(const.func.func(*inputs))
        return np.array(c_vals)

evaluate(z_under, x_under, y_bar_coupled)

Evaluate discipline outputs given current variable values.

Source code in src/mdotoolbox/core/mdo.py
62
63
64
65
66
67
def evaluate(self, z_under: np.ndarray, x_under: np.ndarray, y_bar_coupled: np.ndarray) -> np.ndarray:
    """Evaluate this discipline's output function at the given point.

    Gathers the shared and local variables the discipline owns (by index),
    appends the incoming coupling values, and calls the wrapped function.
    Always returns at least a 1-D array.
    """
    z_part = z_under[self.shared_var_indices] if len(self.shared_var_indices) > 0 else np.array([])
    x_part = x_under[self.local_var_indices] if len(self.local_var_indices) > 0 else np.array([])
    args = np.concatenate((z_part, x_part, y_bar_coupled))
    return np.atleast_1d(self.outputs.func(*args))

evaluate_constraints(z_under, x_under, y_bar_coupled)

Evaluate discipline-level constraints.

Source code in src/mdotoolbox/core/mdo.py
69
70
71
72
73
74
75
76
77
78
79
def evaluate_constraints(self, z_under: np.ndarray, x_under: np.ndarray, y_bar_coupled: np.ndarray) -> np.ndarray:
    """Evaluate every discipline-level constraint at the given point.

    Returns an empty array when the discipline has no constraints; otherwise
    one value per constraint, in declaration order.
    """
    if not self.constraints:
        return np.array([])
    z_part = z_under[self.shared_var_indices] if len(self.shared_var_indices) > 0 else np.array([])
    x_part = x_under[self.local_var_indices] if len(self.local_var_indices) > 0 else np.array([])
    args = np.concatenate((z_part, x_part, y_bar_coupled))
    return np.array([constraint.func.func(*args) for constraint in self.constraints])

MDOProblem dataclass

Multi-Disciplinary Optimization (MDO) problem definition.

Source code in src/mdotoolbox/core/mdo.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@dataclass
class MDOProblem:
    """Multi-Disciplinary Optimization (MDO) problem definition.

    Aggregates the system-level objective, the per-discipline subproblems and
    the variable/bound bookkeeping (shared, local and coupling variables).

    Attributes:
        system_objective: System-level objective Function, evaluated on the
            concatenated (z_bar, x_bar, y_bar) vector.
        disciplines: One Discipline per subsystem.
        n_shared: Number of shared design variables.
        n_local: Number of local design variables per discipline.
        n_coupling: Number of coupling variables per discipline.
        shared_bounds: (n_shared, 2) array of lower/upper bounds.
        local_bounds: Per-discipline (n_local[i], 2) bound arrays.
        system_constraints: Optional list of system-level Constraints.
        maximize: When True the objective is negated so a minimizer maximizes.
        tol: Numerical tolerance.
        name: Optional problem label.
    """
    system_objective: Function
    disciplines: List[Discipline]
    n_shared: int
    n_local: List[int]
    n_coupling: List[int]
    shared_bounds: np.ndarray
    local_bounds: List[np.ndarray]
    system_constraints: List[Constraint] = None
    maximize: bool = False
    tol: float = 1e-06
    name: str = ''

    def __post_init__(self):
        """Validate MDO problem structure.

        Raises:
            TypeError: when a field has the wrong type.
            ValueError: when per-discipline sizes or bound shapes disagree.
        """
        if not type_check(self.system_objective, Function):
            raise TypeError('system_objective must be a Function')
        if not type_check(self.disciplines, (list, tuple)):
            raise TypeError('disciplines must be a list of Discipline objects')
        for idx, disc in enumerate(self.disciplines):
            if not type_check(disc, Discipline):
                raise TypeError(f'Discipline {idx} must be a Discipline object')
        if not type_check(self.n_shared, int):
            raise TypeError('n_shared must be an integer')
        if self.n_shared < 0:
            raise ValueError('n_shared must be non-negative')
        if not type_check(self.n_local, (list, tuple, np.ndarray)):
            raise TypeError('n_local must be a list of integers')
        if len(self.n_local) != len(self.disciplines):
            raise ValueError('n_local must have one entry per discipline')
        if not type_check(self.n_coupling, (list, tuple, np.ndarray)):
            raise TypeError('n_coupling must be a list of integers')
        if len(self.n_coupling) != len(self.disciplines):
            raise ValueError('n_coupling must have one entry per discipline')
        if not type_check(self.shared_bounds, (list, tuple, np.ndarray)):
            raise TypeError('shared_bounds must be array-like')
        self.shared_bounds = np.array(self.shared_bounds)
        if self.shared_bounds.shape != (self.n_shared, 2):
            raise ValueError(f'shared_bounds must be ({self.n_shared}, 2)')
        if not type_check(self.local_bounds, (list, tuple)):
            raise TypeError('local_bounds must be a list of array-like bounds')
        if len(self.local_bounds) != len(self.disciplines):
            raise ValueError('local_bounds must have one entry per discipline')
        self.local_bounds = [np.array(bounds) for bounds in self.local_bounds]
        for idx, bounds in enumerate(self.local_bounds):
            if bounds.shape != (self.n_local[idx], 2):
                raise ValueError(f'local_bounds[{idx}] must be ({self.n_local[idx]}, 2)')
        if self.system_constraints is None:
            self.system_constraints = []
        else:
            for idx, const in enumerate(self.system_constraints):
                if not type_check(const, Constraint):
                    raise TypeError(f'System constraint {idx} must be a Constraint object')
        if not type_check(self.maximize, bool):
            raise TypeError('maximize must be a boolean')
        if self.maximize:
            # BUGFIX: bind the original callable BEFORE rebinding the
            # attribute. A lambda that read self.system_objective.func at
            # call time would resolve to itself (the attribute has already
            # been replaced) and recurse infinitely on the first evaluation.
            _original_objective = self.system_objective.func
            self.system_objective.func = lambda *args: -1.0 * _original_objective(*args)
        if not type_check(self.tol, float):
            raise TypeError('tol must be a float')
        if not type_check(self.name, str):
            raise TypeError('name must be a string')
        self._validate_coupling_structure()

    def _validate_coupling_structure(self):
        """Validate that coupling indices are consistent across disciplines."""
        total_coupling = sum(self.n_coupling)
        for disc in self.disciplines:
            n_outputs = len(disc.coupling_output_indices)
            if not n_outputs <= total_coupling:
                raise ValueError(f'Discipline {disc.name} has more outputs than total coupling variables')
            for coupled_idx_arr in disc.coupling_input_indices:
                # Every consumed coupling index must address an existing slot.
                if not all((idx < total_coupling for idx in coupled_idx_arr)):
                    raise ValueError(f'Discipline {disc.name} references invalid coupling indices')

    def evaluate_system(self, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> float:
        """Evaluate the system objective on the concatenated variable vector."""
        inputs = np.concatenate([z_bar, x_bar, y_bar])
        return self.system_objective.func(*inputs)

    def evaluate_disciplines(self, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> Dict[str, np.ndarray]:
        """Evaluate all discipline outputs.

        x_bar is assumed to concatenate each discipline's local variables in
        discipline order; a running offset slices out each discipline's part.
        """
        outputs = {}
        x_offset = 0
        for disc in self.disciplines:
            n_local_disc = len(disc.local_var_indices)
            # Flatten the coupling inputs this discipline consumes, in order.
            y_bar_coupled_disc = []
            for coupled_idx_arr in disc.coupling_input_indices:
                for idx in coupled_idx_arr:
                    y_bar_coupled_disc.append(y_bar[idx])
            y_bar_coupled_disc = np.array(y_bar_coupled_disc) if y_bar_coupled_disc else np.array([])
            x_bar_disc = x_bar[x_offset:x_offset + n_local_disc] if n_local_disc > 0 else np.array([])
            outputs[disc.name] = disc.evaluate(z_bar, x_bar_disc, y_bar_coupled_disc)
            x_offset += n_local_disc
        return outputs

    def compute_coupling_residuals(self, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> np.ndarray:
        """Compute coupling residuals: ||y_computed - y_bar||^2 per discipline.

        Disciplines without coupling outputs contribute a residual of 0.0.
        """
        outputs = self.evaluate_disciplines(z_bar, x_bar, y_bar)
        residuals = []
        for disc in self.disciplines:
            y_computed = outputs[disc.name]
            y_bar_target = y_bar[disc.coupling_output_indices]
            if len(y_bar_target) > 0:
                residual = np.linalg.norm(y_computed - y_bar_target) ** 2
            else:
                residual = 0.0
            residuals.append(residual)
        return np.array(residuals)

__post_init__()

Validate MDO problem structure

Source code in src/mdotoolbox/core/mdo.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def __post_init__(self):
    """Validate MDO problem structure.

    Raises:
        TypeError: when a field has the wrong type.
        ValueError: when per-discipline sizes or bound shapes disagree.
    """
    if not type_check(self.system_objective, Function):
        raise TypeError('system_objective must be a Function')
    if not type_check(self.disciplines, (list, tuple)):
        raise TypeError('disciplines must be a list of Discipline objects')
    for idx, disc in enumerate(self.disciplines):
        if not type_check(disc, Discipline):
            raise TypeError(f'Discipline {idx} must be a Discipline object')
    if not type_check(self.n_shared, int):
        raise TypeError('n_shared must be an integer')
    if self.n_shared < 0:
        raise ValueError('n_shared must be non-negative')
    if not type_check(self.n_local, (list, tuple, np.ndarray)):
        raise TypeError('n_local must be a list of integers')
    if len(self.n_local) != len(self.disciplines):
        raise ValueError('n_local must have one entry per discipline')
    if not type_check(self.n_coupling, (list, tuple, np.ndarray)):
        raise TypeError('n_coupling must be a list of integers')
    if len(self.n_coupling) != len(self.disciplines):
        raise ValueError('n_coupling must have one entry per discipline')
    if not type_check(self.shared_bounds, (list, tuple, np.ndarray)):
        raise TypeError('shared_bounds must be array-like')
    self.shared_bounds = np.array(self.shared_bounds)
    if self.shared_bounds.shape != (self.n_shared, 2):
        raise ValueError(f'shared_bounds must be ({self.n_shared}, 2)')
    if not type_check(self.local_bounds, (list, tuple)):
        raise TypeError('local_bounds must be a list of array-like bounds')
    if len(self.local_bounds) != len(self.disciplines):
        raise ValueError('local_bounds must have one entry per discipline')
    self.local_bounds = [np.array(bounds) for bounds in self.local_bounds]
    for idx, bounds in enumerate(self.local_bounds):
        if bounds.shape != (self.n_local[idx], 2):
            raise ValueError(f'local_bounds[{idx}] must be ({self.n_local[idx]}, 2)')
    if self.system_constraints is None:
        self.system_constraints = []
    else:
        for idx, const in enumerate(self.system_constraints):
            if not type_check(const, Constraint):
                raise TypeError(f'System constraint {idx} must be a Constraint object')
    if not type_check(self.maximize, bool):
        raise TypeError('maximize must be a boolean')
    if self.maximize:
        # BUGFIX: bind the original callable BEFORE rebinding the attribute.
        # A lambda that read self.system_objective.func at call time would
        # resolve to itself (the attribute has already been replaced) and
        # recurse infinitely on the first evaluation.
        _original_objective = self.system_objective.func
        self.system_objective.func = lambda *args: -1.0 * _original_objective(*args)
    if not type_check(self.tol, float):
        raise TypeError('tol must be a float')
    if not type_check(self.name, str):
        raise TypeError('name must be a string')
    self._validate_coupling_structure()

compute_coupling_residuals(z_bar, x_bar, y_bar)

Compute coupling residuals: ||y_computed - y_bar||^2

Source code in src/mdotoolbox/core/mdo.py
178
179
180
181
182
183
184
185
186
187
188
189
190
def compute_coupling_residuals(self, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> np.ndarray:
    """Return the squared coupling residual ||y_computed - y_target||^2 per discipline.

    Disciplines without coupling outputs contribute a residual of 0.0.
    """
    computed = self.evaluate_disciplines(z_bar, x_bar, y_bar)
    residuals = np.zeros(len(self.disciplines))
    for k, disc in enumerate(self.disciplines):
        targets = y_bar[disc.coupling_output_indices]
        if len(targets) > 0:
            residuals[k] = np.linalg.norm(computed[disc.name] - targets) ** 2
    return residuals

evaluate_disciplines(z_bar, x_bar, y_bar)

Evaluate all discipline outputs.

Source code in src/mdotoolbox/core/mdo.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def evaluate_disciplines(self, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> Dict[str, np.ndarray]:
    """Evaluate every discipline's outputs.

    x_bar is assumed to concatenate each discipline's local variables in
    discipline order; a running offset slices out each discipline's part.
    """
    results = {}
    offset = 0
    for disc in self.disciplines:
        n_loc = len(disc.local_var_indices)
        # Flatten the coupling inputs this discipline consumes, in order.
        gathered = [y_bar[j] for idx_arr in disc.coupling_input_indices for j in idx_arr]
        y_inputs = np.array(gathered) if gathered else np.array([])
        x_slice = x_bar[offset:offset + n_loc] if n_loc > 0 else np.array([])
        results[disc.name] = disc.evaluate(z_bar, x_slice, y_inputs)
        offset += n_loc
    return results

evaluate_system(z_bar, x_bar, y_bar)

Evaluate system objective.

Source code in src/mdotoolbox/core/mdo.py
157
158
159
160
def evaluate_system(self, z_bar: np.ndarray, x_bar: np.ndarray, y_bar: np.ndarray) -> float:
    """Evaluate the system-level objective on the concatenated (z, x, y) vector."""
    args = np.concatenate((z_bar, x_bar, y_bar))
    return self.system_objective.func(*args)

Frameworks

src/mdotoolbox/frameworks/baco.py

BACOSubsystem dataclass

Subsystem for Bayesian Collaborative Optimization framework.

Source code in src/mdotoolbox/frameworks/baco.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
@dataclass
class BACOSubsystem:
    """Subsystem for Bayesian Collaborative Optimization framework.

    Owns the subsystem problem, the index maps into the system-level shared
    (z), local (x) and coupling (y) vectors, and the DoE/GP-surrogate state
    used to drive Bayesian acquisition at the subsystem level.
    """
    problem: Problem  # subsystem optimization problem (objective, constraints, bounds)
    z_idxs: np.ndarray  # indices into the global shared vector z
    x_idxs: np.ndarray  # indices into the global local vector x
    y_idxs: np.ndarray  # indices of coupling outputs this subsystem produces
    y_coupled_idxs: List[np.ndarray]  # per-source indices of coupling inputs consumed
    surrogate_config: Union[SMTGPConfig, TorchGPConfig, None] = None  # defaults to SMTGPConfig()

    def __post_init__(self):
        # Normalize index containers to ndarrays and initialize the mutable
        # optimization state (DoEs, surrogates, incumbents).
        self.z_idxs = np.array(self.z_idxs)
        self.x_idxs = np.array(self.x_idxs)
        self.y_idxs = np.array(self.y_idxs)
        if self.surrogate_config is None:
            self.surrogate_config = SMTGPConfig()
        self.doe_J_i = None  # DoE for the discrepancy J_i
        self.doe_g_i = None  # DoE for subsystem constraints
        self.gp_J_i = None  # GP surrogate of J_i
        self.gp_g_i = {}  # constraint-name -> GP surrogate
        self.z_under_i = None  # latest subsystem-optimal shared variables
        self.x_under_i = None  # latest subsystem-optimal local variables
        self.y_i = None  # latest computed coupling outputs
        self.best_J_i = np.inf  # incumbent discrepancy value
        self.best_h = np.inf  # incumbent constraint violation

    def initialize_doe(self, n_samples, z_bar, x_bar, y_bar):
        """Initialize Design of Experiments using Latin Hypercube Sampling.

        Evaluates the true discrepancy J_i (and constraints, if any) on an
        LHS sample of this subsystem's (z, x) space, and stores the results
        in ``self.doe_J_i`` / ``self.doe_g_i``.
        """
        nz = len(self.z_idxs)
        nx = len(self.x_idxs)
        # n_samples may be an int, a float (truncated), or a callable of the
        # subsystem dimension nz + nx.
        if type_check(n_samples, Callable):
            n_samples = n_samples(nz + nx)
        elif type_check(n_samples, (int, float)) and n_samples < 0:
            warnings.warn('n should be a positive integer, converting to absolute value.', stacklevel=2)
            n_samples = abs(n_samples)
        elif type_check(n_samples, float):
            n_samples = int(n_samples)
        elif not type_check(n_samples, int):
            raise ValueError('n must be an integer or a callable function of the number of variables.')
        if n_samples < 2:
            raise ValueError(f'n_samples must be >= 2 for KPLSK training (got {n_samples}). KPLSK requires at least 2 training points.')
        from smt.sampling_methods import LHS
        # Bounds for this subsystem's shared + local variables only.
        bounds_local = self.problem.bounds[:nz + nx]
        if self.z_under_i is not None and self.x_under_i is not None:
            # Seed the DoE with the previous subsystem solution when available.
            initial_guess_local = np.concatenate([self.z_under_i, self.x_under_i])
            lhs_samples = LHS(xlimits=bounds_local, seed=42)(n_samples)
            samples = np.vstack([initial_guess_local, lhs_samples])
        else:
            samples = LHS(xlimits=bounds_local, seed=42)(n_samples)
        x_bar_i = x_bar[self.x_idxs] if len(self.x_idxs) > 0 else np.array([])
        y_bar_i = y_bar[self.y_idxs]
        # Flatten the coupling inputs this subsystem consumes, in index order.
        y_bar_coupled = []
        for coupled_idx_array in self.y_coupled_idxs:
            for idx in coupled_idx_array:
                y_bar_coupled.append(y_bar[idx])
        y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
        J_i_vals = []
        yi_vals = []
        h_i_vals = []
        x_J_i = []
        x_g_i = []
        g_i_vals = {const.func.name: [] for const in self.problem.constraints}
        for sample in samples:
            z_under_i = sample[:nz]
            x_under_i = sample[nz:] if nx > 0 else np.array([])
            J_i_val, y_i = compute_Ji(self.problem.objective.func, z_bar[self.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, z_under_i, x_under_i)
            if not np.isfinite(J_i_val):
                # Non-finite evaluations would poison GP training; drop them.
                warnings.warn('True function evaluation returned NaN/Inf. Skipping DoE update for this sample.', UserWarning, stacklevel=2)
                continue
            J_i_vals.append(J_i_val)
            yi_vals.append(y_i.item() if y_i.size == 1 else y_i)
            x_entry = np.concatenate([z_bar, x_bar_i, y_bar_i, y_bar_coupled, z_under_i, x_under_i])
            x_J_i.append(x_entry)
            input_vars = np.concatenate([z_under_i, x_under_i, y_bar_coupled])
            _, c_vals = self.problem.evaluate(input_vars, f=False, c=True)
            c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
            h_i_val = self.problem.compute_constraint_violation(c_vals_dict)
            h_i_vals.append(h_i_val)
            for i, const in enumerate(self.problem.constraints):
                g_i_vals[const.func.name].append(c_vals[i])
            if self.problem.constraints:
                x_g_i.append(np.concatenate([z_under_i, x_under_i, y_bar_coupled]))
        self.doe_J_i = DoE(x=np.array(x_J_i), y={'J_i': np.array(J_i_vals), 'y_i': np.array(yi_vals)}, constraint_violation=np.array(h_i_vals))
        if self.problem.constraints:
            g_i_y = {name: np.array(vals) for name, vals in g_i_vals.items()}
            self.doe_g_i = DoE(x=np.array(x_g_i), y=g_i_y)

    def build_surrogate_J_i(self):
        """Train Gaussian Process surrogate for discrepancy J_i."""
        self.gp_J_i = build_surrogate(self.doe_J_i, y_key='J_i', config=self.surrogate_config)

    def build_surrogate_g_i(self):
        """Train Gaussian Process surrogates for all subsystem constraints."""
        if self.problem.constraints:
            for const in self.problem.constraints:
                self.gp_g_i[const.func.name] = build_surrogate(self.doe_g_i, y_key=const.func.name, config=self.surrogate_config)

    def solve_acquisition(self, z_bar, x_bar, y_bar, optimizer: Union[str, Callable], acq_func: Callable, n_multistart: int=10):
        """Optimize acquisition function to find next evaluation point.

        Runs a multi-start acquisition optimization (parallelized when there
        are enough starts), evaluates the true J_i and constraints at the
        winning point, updates the incumbents and both DoEs, and returns the
        new point.

        Returns:
            Tuple (z_under_i, x_under_i, y_i, J_i_val, h_i, elapsed_seconds).
        """
        _start_time = time.time()
        x_bar_i = x_bar[self.x_idxs] if len(self.x_idxs) > 0 else np.array([])
        y_bar_i = y_bar[self.y_idxs]
        y_bar_coupled = []
        for coupled_idx_array in self.y_coupled_idxs:
            for idx in coupled_idx_array:
                y_bar_coupled.append(y_bar[idx])
        y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
        nz = len(self.z_idxs)
        nx = len(self.x_idxs)
        # Precompute offsets and fixed segments of the surrogate input vectors
        # so each acquisition call only fills in the (z_under, x_under) part.
        _j_i_off_z_bar = len(z_bar)
        _j_i_off_x_bar_i = _j_i_off_z_bar + len(x_bar_i)
        _j_i_off_y_bar_i = _j_i_off_x_bar_i + len(y_bar_i)
        _j_i_off_y_bar_coupled = _j_i_off_y_bar_i + len(y_bar_coupled)
        _j_i_off_z_under_i = _j_i_off_y_bar_coupled + nz
        _j_i_size = _j_i_off_z_under_i + nx
        _template_J_i = np.empty(_j_i_size)
        _template_J_i[:_j_i_off_z_bar] = z_bar
        if len(x_bar_i) > 0:
            _template_J_i[_j_i_off_z_bar:_j_i_off_x_bar_i] = x_bar_i
        _template_J_i[_j_i_off_x_bar_i:_j_i_off_y_bar_i] = y_bar_i
        if len(y_bar_coupled) > 0:
            _template_J_i[_j_i_off_y_bar_i:_j_i_off_y_bar_coupled] = y_bar_coupled
        _g_i_size = nz + nx + len(y_bar_coupled)
        _template_g_i = np.empty(_g_i_size)
        if len(y_bar_coupled) > 0:
            _template_g_i[nz + nx:] = y_bar_coupled
        if self.best_J_i == np.inf:
            # Lazily seed the incumbent from the DoE on the first call.
            self.best_J_i = self.doe_J_i.get_f_min(objective_key='J_i', tol=self.problem.tol)
        bounds = self.problem.bounds[:nz + nx]
        from smt.sampling_methods import LHS
        # NOTE(review): n_multistart == 1 yields LHS(...)(0) here, and
        # x0_default concatenates the full z_bar against bounds of size
        # nz + nx — confirm z_idxs always spans all of z_bar.
        lhs_starts = LHS(xlimits=bounds, seed=42)(n_multistart - 1)
        x0_default = np.concatenate([z_bar, x_bar_i])
        start_points = [x0_default] + [lhs_starts[i] for i in range(n_multistart - 1)]
        from ..optimizers import get_optimizer
        if isinstance(optimizer, str):
            opt = get_optimizer(optimizer)
        else:
            opt = optimizer
        from joblib import Parallel, delayed
        # Parallelize the independent multi-starts once there are enough of them.
        if n_multistart >= 4:
            start_results = Parallel(n_jobs=-1, backend='loky')((delayed(_run_one_start_subsystem)(x0, opt, acq_func, bounds, self.gp_J_i, self.gp_g_i, self.problem.constraints, self.best_J_i, self.problem.tol, _template_J_i.copy(), _template_g_i.copy(), (_j_i_off_z_bar, _j_i_off_x_bar_i, _j_i_off_y_bar_i, _j_i_off_y_bar_coupled, _j_i_off_z_under_i), nz, nx) for x0 in start_points))
        else:
            start_results = [_run_one_start_subsystem(x0, opt, acq_func, bounds, self.gp_J_i, self.gp_g_i, self.problem.constraints, self.best_J_i, self.problem.tol, _template_J_i.copy(), _template_g_i.copy(), (_j_i_off_z_bar, _j_i_off_x_bar_i, _j_i_off_y_bar_i, _j_i_off_y_bar_coupled, _j_i_off_z_under_i), nz, nx) for x0 in start_points]
        # Keep the start that achieved the lowest acquisition value.
        best_result = None
        best_acq_val = np.inf
        for result_x, acq_val in start_results:
            if result_x is not None and acq_val < best_acq_val:
                best_acq_val = acq_val
                best_result = result_x
        if best_result is None:
            warnings.warn(f'All {len(start_points)} multi-start acquisition attempts failed. Falling back to x0_default. Check optimizer and constraint configuration.', UserWarning, stacklevel=2)
            best_result = x0_default
        best_result = clip_to_bounds(best_result, bounds)
        self.z_under_i = best_result[:nz]
        self.x_under_i = best_result[nz:] if nx > 0 else np.array([])
        # Evaluate the true discrepancy and constraints at the chosen point.
        J_i_val, self.y_i = compute_Ji(self.problem.objective.func, z_bar[self.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, self.z_under_i, self.x_under_i)
        input_vars = np.concatenate([self.z_under_i, self.x_under_i, y_bar_coupled])
        _, c_vals = self.problem.evaluate(input_vars, f=False, c=True)
        c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
        h_i = self.problem.compute_constraint_violation(c_vals_dict)
        if assess_progress(h=h_i, h_best=self.best_h, J=0.0, J_best=0.0, f=J_i_val, f_best=self.best_J_i):
            self.best_h = h_i
            self.best_J_i = J_i_val
        # Append the new true evaluation to both DoEs for future surrogates.
        x_J_i_new = np.concatenate([z_bar, x_bar_i, y_bar_i, y_bar_coupled, self.z_under_i, self.x_under_i])
        yi_val = self.y_i.item() if self.y_i.size == 1 else self.y_i
        self.doe_J_i.update_DoE(x_J_i_new, {'J_i': J_i_val, 'y_i': yi_val})
        if self.problem.constraints:
            x_g_i_new = np.concatenate([self.z_under_i, self.x_under_i, y_bar_coupled])
            self.doe_g_i.update_DoE(x_g_i_new, c_vals_dict)
        elapsed = time.time() - _start_time
        return (self.z_under_i, self.x_under_i, self.y_i, J_i_val, h_i, elapsed)

    def _describe_setup(self):
        # Debug helper: one-line-per-field description of the configuration.
        return '\n'.join([f'problem: {self.problem}', f'z_idxs: {self.z_idxs}', f'x_idxs: {self.x_idxs}', f'y_idxs: {self.y_idxs}', f'y_coupled_idxs: {self.y_coupled_idxs}', f'surrogate_config: {self.surrogate_config}'])

build_surrogate_J_i()

Train Gaussian Process surrogate for discrepancy J_i.

Source code in src/mdotoolbox/frameworks/baco.py
237
238
239
def build_surrogate_J_i(self):
    """Fit the Gaussian Process surrogate of the discrepancy J_i from the current DoE."""
    surrogate = build_surrogate(self.doe_J_i, y_key='J_i', config=self.surrogate_config)
    self.gp_J_i = surrogate

build_surrogate_g_i()

Train Gaussian Process surrogates for all subsystem constraints.

Source code in src/mdotoolbox/frameworks/baco.py
241
242
243
244
245
def build_surrogate_g_i(self):
    """Fit one Gaussian Process surrogate per subsystem constraint, keyed by name."""
    if not self.problem.constraints:
        return
    for constraint in self.problem.constraints:
        key = constraint.func.name
        self.gp_g_i[key] = build_surrogate(self.doe_g_i, y_key=key, config=self.surrogate_config)

initialize_doe(n_samples, z_bar, x_bar, y_bar)

Initialize Design of Experiments using Latin Hypercube Sampling.

Source code in src/mdotoolbox/frameworks/baco.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def initialize_doe(self, n_samples, z_bar, x_bar, y_bar):
    """Initialize Design of Experiments using Latin Hypercube Sampling.

    Evaluates the true discrepancy J_i (and constraints, if any) on an LHS
    sample of this subsystem's (z, x) space and stores the results in
    ``self.doe_J_i`` / ``self.doe_g_i``.
    """
    nz = len(self.z_idxs)
    nx = len(self.x_idxs)
    # n_samples may be an int, a float (truncated), or a callable of the
    # subsystem dimension nz + nx.
    if type_check(n_samples, Callable):
        n_samples = n_samples(nz + nx)
    elif type_check(n_samples, (int, float)) and n_samples < 0:
        warnings.warn('n should be a positive integer, converting to absolute value.', stacklevel=2)
        n_samples = abs(n_samples)
    elif type_check(n_samples, float):
        n_samples = int(n_samples)
    elif not type_check(n_samples, int):
        raise ValueError('n must be an integer or a callable function of the number of variables.')
    if n_samples < 2:
        raise ValueError(f'n_samples must be >= 2 for KPLSK training (got {n_samples}). KPLSK requires at least 2 training points.')
    from smt.sampling_methods import LHS
    # Bounds for this subsystem's shared + local variables only.
    bounds_local = self.problem.bounds[:nz + nx]
    if self.z_under_i is not None and self.x_under_i is not None:
        # Seed the DoE with the previous subsystem solution when available.
        initial_guess_local = np.concatenate([self.z_under_i, self.x_under_i])
        lhs_samples = LHS(xlimits=bounds_local, seed=42)(n_samples)
        samples = np.vstack([initial_guess_local, lhs_samples])
    else:
        samples = LHS(xlimits=bounds_local, seed=42)(n_samples)
    x_bar_i = x_bar[self.x_idxs] if len(self.x_idxs) > 0 else np.array([])
    y_bar_i = y_bar[self.y_idxs]
    # Flatten the coupling inputs this subsystem consumes, in index order.
    y_bar_coupled = []
    for coupled_idx_array in self.y_coupled_idxs:
        for idx in coupled_idx_array:
            y_bar_coupled.append(y_bar[idx])
    y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
    J_i_vals = []
    yi_vals = []
    h_i_vals = []
    x_J_i = []
    x_g_i = []
    g_i_vals = {const.func.name: [] for const in self.problem.constraints}
    for sample in samples:
        z_under_i = sample[:nz]
        x_under_i = sample[nz:] if nx > 0 else np.array([])
        J_i_val, y_i = compute_Ji(self.problem.objective.func, z_bar[self.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, z_under_i, x_under_i)
        if not np.isfinite(J_i_val):
            # Non-finite evaluations would poison GP training; drop them.
            warnings.warn('True function evaluation returned NaN/Inf. Skipping DoE update for this sample.', UserWarning, stacklevel=2)
            continue
        J_i_vals.append(J_i_val)
        yi_vals.append(y_i.item() if y_i.size == 1 else y_i)
        x_entry = np.concatenate([z_bar, x_bar_i, y_bar_i, y_bar_coupled, z_under_i, x_under_i])
        x_J_i.append(x_entry)
        input_vars = np.concatenate([z_under_i, x_under_i, y_bar_coupled])
        _, c_vals = self.problem.evaluate(input_vars, f=False, c=True)
        c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
        h_i_val = self.problem.compute_constraint_violation(c_vals_dict)
        h_i_vals.append(h_i_val)
        for i, const in enumerate(self.problem.constraints):
            g_i_vals[const.func.name].append(c_vals[i])
        if self.problem.constraints:
            x_g_i.append(np.concatenate([z_under_i, x_under_i, y_bar_coupled]))
    self.doe_J_i = DoE(x=np.array(x_J_i), y={'J_i': np.array(J_i_vals), 'y_i': np.array(yi_vals)}, constraint_violation=np.array(h_i_vals))
    if self.problem.constraints:
        g_i_y = {name: np.array(vals) for name, vals in g_i_vals.items()}
        self.doe_g_i = DoE(x=np.array(x_g_i), y=g_i_y)

solve_acquisition(z_bar, x_bar, y_bar, optimizer, acq_func, n_multistart=10)

Optimize acquisition function to find next evaluation point.

Source code in src/mdotoolbox/frameworks/baco.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def solve_acquisition(self, z_bar, x_bar, y_bar, optimizer: Union[str, Callable], acq_func: Callable, n_multistart: int=10):
    """Optimize the acquisition function to pick this subsystem's next evaluation point.

    Runs a multi-start acquisition optimization over the subsystem's local
    design variables ``(z_under_i, x_under_i)``, then evaluates the true
    discrepancy ``J_i`` and the subsystem constraints at the winning point
    and appends the results to the subsystem DoEs.

    Args:
        z_bar: System-level shared variable targets.
        x_bar: System-level local variable targets (full vector; this
            subsystem's slice is taken via ``self.x_idxs``).
        y_bar: System-level coupling variable targets (full vector; sliced
            via ``self.y_idxs`` / ``self.y_coupled_idxs``).
        optimizer: Optimizer name (resolved via ``get_optimizer``) or a
            callable optimizer.
        acq_func: Acquisition function to minimize.
        n_multistart: Number of start points (current iterate plus
            ``n_multistart - 1`` LHS samples).

    Returns:
        Tuple ``(z_under_i, x_under_i, y_i, J_i_val, h_i, elapsed_seconds)``.
    """
    _start_time = time.time()
    # Slice out this subsystem's portions of the system-level targets.
    x_bar_i = x_bar[self.x_idxs] if len(self.x_idxs) > 0 else np.array([])
    y_bar_i = y_bar[self.y_idxs]
    y_bar_coupled = []
    for coupled_idx_array in self.y_coupled_idxs:
        for idx in coupled_idx_array:
            y_bar_coupled.append(y_bar[idx])
    y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
    nz = len(self.z_idxs)
    nx = len(self.x_idxs)
    # Precompute offsets into the GP input vector for J_i. The leading
    # segments (z_bar, x_bar_i, y_bar_i, y_bar_coupled) are fixed for this
    # call; only the trailing (z_under_i, x_under_i) segment varies per
    # candidate, so templates are filled once and copied per start point.
    _j_i_off_z_bar = len(z_bar)
    _j_i_off_x_bar_i = _j_i_off_z_bar + len(x_bar_i)
    _j_i_off_y_bar_i = _j_i_off_x_bar_i + len(y_bar_i)
    _j_i_off_y_bar_coupled = _j_i_off_y_bar_i + len(y_bar_coupled)
    _j_i_off_z_under_i = _j_i_off_y_bar_coupled + nz
    _j_i_size = _j_i_off_z_under_i + nx
    _template_J_i = np.empty(_j_i_size)
    _template_J_i[:_j_i_off_z_bar] = z_bar
    if len(x_bar_i) > 0:
        _template_J_i[_j_i_off_z_bar:_j_i_off_x_bar_i] = x_bar_i
    _template_J_i[_j_i_off_x_bar_i:_j_i_off_y_bar_i] = y_bar_i
    if len(y_bar_coupled) > 0:
        _template_J_i[_j_i_off_y_bar_i:_j_i_off_y_bar_coupled] = y_bar_coupled
    # Constraint-GP inputs are (z_under_i, x_under_i, y_bar_coupled); only
    # the trailing coupled part is fixed here.
    _g_i_size = nz + nx + len(y_bar_coupled)
    _template_g_i = np.empty(_g_i_size)
    if len(y_bar_coupled) > 0:
        _template_g_i[nz + nx:] = y_bar_coupled
    # Lazily seed the incumbent from the DoE on the first call.
    # NOTE: use np.isinf (not `== np.inf`) for consistency with the
    # system-level solve_acquisition and to also cover -inf.
    if np.isinf(self.best_J_i):
        self.best_J_i = self.doe_J_i.get_f_min(objective_key='J_i', tol=self.problem.tol)
    # Acquisition is optimized only over the subsystem's own (z, x) bounds.
    bounds = self.problem.bounds[:nz + nx]
    from smt.sampling_methods import LHS
    lhs_starts = LHS(xlimits=bounds, seed=42)(n_multistart - 1)
    x0_default = np.concatenate([z_bar, x_bar_i])
    start_points = [x0_default] + [lhs_starts[i] for i in range(n_multistart - 1)]
    from ..optimizers import get_optimizer
    if isinstance(optimizer, str):
        opt = get_optimizer(optimizer)
    else:
        opt = optimizer
    from joblib import Parallel, delayed
    # Parallelize starts only when there are enough of them to amortize the
    # loky worker startup cost.
    if n_multistart >= 4:
        start_results = Parallel(n_jobs=-1, backend='loky')((delayed(_run_one_start_subsystem)(x0, opt, acq_func, bounds, self.gp_J_i, self.gp_g_i, self.problem.constraints, self.best_J_i, self.problem.tol, _template_J_i.copy(), _template_g_i.copy(), (_j_i_off_z_bar, _j_i_off_x_bar_i, _j_i_off_y_bar_i, _j_i_off_y_bar_coupled, _j_i_off_z_under_i), nz, nx) for x0 in start_points))
    else:
        start_results = [_run_one_start_subsystem(x0, opt, acq_func, bounds, self.gp_J_i, self.gp_g_i, self.problem.constraints, self.best_J_i, self.problem.tol, _template_J_i.copy(), _template_g_i.copy(), (_j_i_off_z_bar, _j_i_off_x_bar_i, _j_i_off_y_bar_i, _j_i_off_y_bar_coupled, _j_i_off_z_under_i), nz, nx) for x0 in start_points]
    # Keep the start with the lowest acquisition value; failed starts
    # return result_x=None and are skipped.
    best_result = None
    best_acq_val = np.inf
    for result_x, acq_val in start_results:
        if result_x is not None and acq_val < best_acq_val:
            best_acq_val = acq_val
            best_result = result_x
    if best_result is None:
        warnings.warn(f'All {len(start_points)} multi-start acquisition attempts failed. Falling back to x0_default. Check optimizer and constraint configuration.', UserWarning, stacklevel=2)
        best_result = x0_default
    best_result = clip_to_bounds(best_result, bounds)
    self.z_under_i = best_result[:nz]
    self.x_under_i = best_result[nz:] if nx > 0 else np.array([])
    # Evaluate the true discrepancy and constraints at the chosen point.
    J_i_val, self.y_i = compute_Ji(self.problem.objective.func, z_bar[self.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, self.z_under_i, self.x_under_i)
    input_vars = np.concatenate([self.z_under_i, self.x_under_i, y_bar_coupled])
    _, c_vals = self.problem.evaluate(input_vars, f=False, c=True)
    c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
    h_i = self.problem.compute_constraint_violation(c_vals_dict)
    # Update the incumbent under the lexicographic progress criterion.
    if assess_progress(h=h_i, h_best=self.best_h, J=0.0, J_best=0.0, f=J_i_val, f_best=self.best_J_i):
        self.best_h = h_i
        self.best_J_i = J_i_val
    # Append the new sample to the J_i DoE (and constraint DoE if present).
    x_J_i_new = np.concatenate([z_bar, x_bar_i, y_bar_i, y_bar_coupled, self.z_under_i, self.x_under_i])
    yi_val = self.y_i.item() if self.y_i.size == 1 else self.y_i
    self.doe_J_i.update_DoE(x_J_i_new, {'J_i': J_i_val, 'y_i': yi_val})
    if self.problem.constraints:
        x_g_i_new = np.concatenate([self.z_under_i, self.x_under_i, y_bar_coupled])
        self.doe_g_i.update_DoE(x_g_i_new, c_vals_dict)
    elapsed = time.time() - _start_time
    return (self.z_under_i, self.x_under_i, self.y_i, J_i_val, h_i, elapsed)

BACOSystem dataclass

System-level problem for BACO

Source code in src/mdotoolbox/frameworks/baco.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
@dataclass
class BACOSystem:
    """System-level problem for BACO (Bayesian Collaborative Optimization).

    Owns the system DoE and the GP surrogates of the system objective and
    constraints, and coordinates the subsystem discrepancy surrogates when
    solving the system-level acquisition problem.
    """
    # The system-level optimization problem (objective, constraints, bounds).
    problem: Problem
    # Subsystem handles; each contributes a discrepancy surrogate gp_J_i.
    subsystems: List[BACOSubsystem]
    # Surrogate configuration; defaults to SMTGPConfig in __post_init__.
    surrogate_config: Union[SMTGPConfig, TorchGPConfig, None] = None

    def __post_init__(self):
        # Default to the SMT-based GP configuration when none is supplied.
        if self.surrogate_config is None:
            from ..surrogates import SMTGPConfig
            self.surrogate_config = SMTGPConfig()
        # Current iterate: shared (z), local (x_bar) and coupling (y_bar)
        # variables; populated by initialize_doe / solve_acquisition.
        self.z = None
        self.x_bar = None
        self.y_bar = None
        # System DoE and surrogates (objective GP and per-constraint GPs).
        self.doe_sys = None
        self.gp_f = None
        self.gp_c = {}
        # Best values seen so far, under the lexicographic progress criterion.
        self.best_J = np.inf
        self.best_h = np.inf
        self.best_f = np.inf

    def initialize_doe(self, n_samples):
        """Initialize system DoE.

        If a current iterate (z, x_bar, y_bar) already exists, evaluate it
        and prepend it to a fresh DoE; otherwise just sample an initial DoE.
        Finally, split the first DoE sample back into z / x_bar / y_bar.
        """
        if self.z is not None and self.x_bar is not None and (self.y_bar is not None):
            # Evaluate the existing iterate so it becomes the first DoE row.
            x0 = np.concatenate([self.z, self.x_bar, self.y_bar])
            f0, c0 = self.problem.evaluate(x0, f=True, c=True)
            c0_dict = {const.func.name: c0[i] for i, const in enumerate(self.problem.constraints)}
            h0 = self.problem.compute_constraint_violation(c0_dict)
            doe_rest = self.problem.initial_DoE(n_samples)
            # Stack the evaluated iterate on top of the sampled DoE.
            x_all = np.vstack([x0, doe_rest.x])
            y_all = {'obj': np.concatenate([[f0], doe_rest.y['obj']])}
            for key in c0_dict:
                y_all[key] = np.concatenate([[c0_dict[key]], doe_rest.y[key]])
            h_all = np.concatenate([[h0], doe_rest.constraint_violation])
            self.doe_sys = DoE(x_all, y_all, h_all)
        else:
            self.doe_sys = self.problem.initial_DoE(n_samples)
        # Split the first sample into shared / local / coupling slices.
        sample = self.doe_sys.x[0]
        nz = len(np.unique(np.concatenate([sub.z_idxs for sub in self.subsystems])))
        nx = sum((len(sub.x_idxs) for sub in self.subsystems))
        self.z = sample[:nz]
        self.x_bar = sample[nz:nz + nx]
        self.y_bar = sample[nz + nx:]

    def build_surrogate_system(self):
        """Build GPs for system objective and constraints"""
        self.gp_f = build_surrogate(self.doe_sys, y_key='obj', config=self.surrogate_config)
        for const in self.problem.constraints:
            self.gp_c[const.func.name] = build_surrogate(self.doe_sys, y_key=const.func.name, config=self.surrogate_config)

    def solve_acquisition(self, optimizer: Union[str, Callable], acq_func: Callable, f_min: float, n_multistart: int=10, epsilon_J: float=1.0):
        """Solve system: maximize alpha_f subject to mu_c >= 0, mu_J_i <= epsilon_J

        Refits all surrogates (objective, constraints, and each subsystem's
        J_i GP), runs a multi-start acquisition optimization over the full
        system variable vector, evaluates the winner on the true problem,
        updates the DoEs, and returns the new iterate.

        Returns:
            Tuple (z, x_bar, y_bar, elapsed_seconds, J_total).
        """
        _start_time = time.time()
        # Collect every surrogate fit as a task so they can run in parallel.
        fit_tasks = []
        fit_tasks.append(('gp_f', self.doe_sys, 'obj', self.surrogate_config, None))
        for const in self.problem.constraints:
            fit_tasks.append(('gp_c', self.doe_sys, const.func.name, self.surrogate_config, const.func.name))
        for i, subsystem in enumerate(self.subsystems):
            fit_tasks.append(('gp_J_i', subsystem.doe_J_i, 'J_i', subsystem.surrogate_config, i))
        # Only parallelize when there are enough tasks and data to amortize
        # the loky worker startup cost.
        min_doe = min((t[1].x.shape[0] for t in fit_tasks))
        use_loky = len(fit_tasks) >= 3 and min_doe >= 10

        def _fit_one(task_idx, kind, doe, y_key, config, tag):
            # Offset the seed per task so parallel fits are decorrelated.
            task_config = copy.copy(config)
            task_config.seed = config.seed + task_idx
            return (kind, tag, build_surrogate(doe, y_key=y_key, config=task_config))
        from joblib import Parallel, delayed
        if use_loky:
            fit_results = Parallel(n_jobs=-1, backend='loky')((delayed(_fit_one)(i, *task) for i, task in enumerate(fit_tasks)))
        else:
            fit_results = [_fit_one(i, *task) for i, task in enumerate(fit_tasks)]
        # Route each fitted GP back to its owner.
        for kind, tag, gp in fit_results:
            if kind == 'gp_f':
                self.gp_f = gp
            elif kind == 'gp_c':
                self.gp_c[tag] = gp
            elif kind == 'gp_J_i':
                self.subsystems[tag].gp_J_i = gp
        nz = len(self.z)
        nx = len(self.x_bar)
        _nsvars = nz + nx + len(self.y_bar)
        # Precompute, per subsystem, the indices of the system vector that
        # feed its J_i GP, plus a reusable input buffer whose static tail
        # holds the subsystem's current (z_under_i, x_under_i).
        _subsystem_masks = []
        for subsystem in self.subsystems:
            coupled_flat = []
            for coupled_idx_array in subsystem.y_coupled_idxs:
                for idx in coupled_idx_array:
                    coupled_flat.append(idx)
            coupled_flat = np.array(coupled_flat, dtype=int) if coupled_flat else np.array([], dtype=int)
            parts = [np.arange(nz)]
            if len(subsystem.x_idxs) > 0:
                parts.append(nz + subsystem.x_idxs)
            parts.append(nz + nx + subsystem.y_idxs)
            if len(coupled_flat) > 0:
                parts.append(nz + nx + coupled_flat)
            src_indices = np.concatenate(parts)
            n_dynamic = len(src_indices)
            static_part = np.concatenate([subsystem.z_under_i, subsystem.x_under_i])
            buffer_template = np.empty(n_dynamic + len(static_part))
            buffer_template[n_dynamic:] = static_part
            _subsystem_masks.append({'src_indices': src_indices, 'n_dynamic': n_dynamic, 'buffer_template': buffer_template})
        # Seed the incumbent objective from the DoE when not yet known.
        if np.isinf(f_min):
            f_min = self.doe_sys.get_f_min('obj', self.problem.tol)
        bounds = self.problem.bounds
        # Multi-start: current iterate plus (n_multistart - 1) LHS samples.
        lhs_starts = LHS(xlimits=bounds, seed=42)(n_multistart - 1)
        x0_default = np.concatenate([self.z, self.x_bar, self.y_bar])
        start_points = [x0_default] + [lhs_starts[i] for i in range(n_multistart - 1)]
        if isinstance(optimizer, str):
            opt = get_optimizer(optimizer)
        else:
            opt = optimizer
        gp_J_i_subsystems = [sub.gp_J_i for sub in self.subsystems]
        if n_multistart >= 4:
            start_results = Parallel(n_jobs=-1, backend='loky')((delayed(_run_one_start_system)(x0, opt, acq_func, bounds, self.gp_f, self.gp_c, gp_J_i_subsystems, _subsystem_masks, self.problem.constraints, epsilon_J, f_min, _nsvars) for x0 in start_points))
        else:
            start_results = [_run_one_start_system(x0, opt, acq_func, bounds, self.gp_f, self.gp_c, gp_J_i_subsystems, _subsystem_masks, self.problem.constraints, epsilon_J, f_min, _nsvars) for x0 in start_points]
        # Keep the start with the lowest acquisition value; failed starts
        # return result_x=None and are skipped.
        best_result = None
        best_acq_val = np.inf
        for result_x, acq_val in start_results:
            if result_x is not None and acq_val < best_acq_val:
                best_acq_val = acq_val
                best_result = result_x
        if best_result is None:
            warnings.warn(f'All {len(start_points)} multi-start acquisition attempts failed. Falling back to x0_default. Check optimizer and constraint configuration.', UserWarning, stacklevel=2)
            best_result = x0_default
        best_result = clip_to_bounds(best_result, bounds)
        nz = len(self.z)
        nx = len(self.x_bar)
        # Adopt the winner as the new iterate and evaluate it on the truth.
        self.z = best_result[:nz]
        self.x_bar = best_result[nz:nz + nx]
        self.y_bar = best_result[nz + nx:]
        f_val, c_vals = self.problem.evaluate(best_result, f=True, c=True)
        c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
        h_total = self.problem.compute_constraint_violation(c_vals_dict)
        y_dict = {'obj': f_val}
        for i, const in enumerate(self.problem.constraints):
            y_dict[const.func.name] = c_vals[i]
        self.doe_sys.update_DoE(best_result, y_dict, h_total)
        # Snapshot the iterate so the per-subsystem closures use consistent values.
        z_snap = self.z
        x_bar_snap = self.x_bar
        y_bar_snap = self.y_bar

        def _compute_one_J_i(subsystem):
            # Evaluate one subsystem's discrepancy at its current local point
            # against the new system-level targets.
            x_bar_i = x_bar_snap[subsystem.x_idxs] if len(subsystem.x_idxs) > 0 else np.array([])
            y_bar_i = y_bar_snap[subsystem.y_idxs]
            y_bar_coupled = []
            for coupled_idx_array in subsystem.y_coupled_idxs:
                for idx in coupled_idx_array:
                    y_bar_coupled.append(y_bar_snap[idx])
            y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
            J_i_val, y_i = compute_Ji(subsystem.problem.objective.func, z_snap[subsystem.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, subsystem.z_under_i, subsystem.x_under_i)
            x_J_i_new = np.concatenate([z_snap, x_bar_i, y_bar_i, y_bar_coupled, subsystem.z_under_i, subsystem.x_under_i])
            return (J_i_val, x_J_i_new, y_i)
        # Accumulate total discrepancy and grow each subsystem's J_i DoE.
        J_total = 0.0
        for subsystem in self.subsystems:
            J_i_val, x_J_i_new, y_i = _compute_one_J_i(subsystem)
            J_total += J_i_val
            yi_val = y_i.item() if y_i.size == 1 else y_i
            subsystem.doe_J_i.update_DoE(x_J_i_new, {'J_i': J_i_val, 'y_i': yi_val})
        elapsed = time.time() - _start_time
        return (self.z, self.x_bar, self.y_bar, elapsed, J_total)

    def _describe_setup(self):
        # One-line-per-field summary used by the solver's setup banner.
        return '\n'.join([f'problem: {self.problem}', f'subsystems: {len(self.subsystems)}', f'surrogate_config: {self.surrogate_config}'])

build_surrogate_system()

Build GPs for system objective and constraints

Source code in src/mdotoolbox/frameworks/baco.py
369
370
371
372
373
def build_surrogate_system(self):
    """Fit GP surrogates for the system objective and every constraint.

    Stores the objective surrogate in ``self.gp_f`` and one surrogate per
    constraint in ``self.gp_c``, keyed by the constraint function's name.
    """
    # Objective surrogate over the system DoE.
    self.gp_f = build_surrogate(self.doe_sys, y_key='obj', config=self.surrogate_config)
    # One surrogate per constraint, keyed by the constraint's function name.
    for constraint in self.problem.constraints:
        name = constraint.func.name
        self.gp_c[name] = build_surrogate(self.doe_sys, y_key=name, config=self.surrogate_config)

initialize_doe(n_samples)

Initialize system DoE

Source code in src/mdotoolbox/frameworks/baco.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def initialize_doe(self, n_samples):
    """Build the system-level design of experiments.

    When a current iterate (z, x_bar, y_bar) already exists, it is evaluated
    and prepended as the first row of a freshly sampled DoE; otherwise a
    plain initial DoE is generated. The first DoE sample is then split back
    into the shared (z), local (x_bar) and coupling (y_bar) variable views.
    """
    have_iterate = (
        self.z is not None
        and self.x_bar is not None
        and self.y_bar is not None
    )
    if have_iterate:
        # Evaluate the current iterate so it becomes the first DoE row.
        x0 = np.concatenate([self.z, self.x_bar, self.y_bar])
        f0, c0 = self.problem.evaluate(x0, f=True, c=True)
        c0_dict = {c.func.name: c0[i] for i, c in enumerate(self.problem.constraints)}
        h0 = self.problem.compute_constraint_violation(c0_dict)
        rest = self.problem.initial_DoE(n_samples)
        # Stack the evaluated iterate on top of the sampled DoE.
        stacked_x = np.vstack([x0, rest.x])
        stacked_y = {'obj': np.concatenate([[f0], rest.y['obj']])}
        for name, value in c0_dict.items():
            stacked_y[name] = np.concatenate([[value], rest.y[name]])
        stacked_h = np.concatenate([[h0], rest.constraint_violation])
        self.doe_sys = DoE(stacked_x, stacked_y, stacked_h)
    else:
        self.doe_sys = self.problem.initial_DoE(n_samples)
    # Slice the first sample into shared / local / coupling segments.
    first = self.doe_sys.x[0]
    nz = len(np.unique(np.concatenate([sub.z_idxs for sub in self.subsystems])))
    nx = sum(len(sub.x_idxs) for sub in self.subsystems)
    self.z = first[:nz]
    self.x_bar = first[nz:nz + nx]
    self.y_bar = first[nz + nx:]

solve_acquisition(optimizer, acq_func, f_min, n_multistart=10, epsilon_J=1.0)

Solve system: maximize alpha_f subject to mu_c >= 0, mu_J_i <= epsilon_J

Source code in src/mdotoolbox/frameworks/baco.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def solve_acquisition(self, optimizer: Union[str, Callable], acq_func: Callable, f_min: float, n_multistart: int=10, epsilon_J: float=1.0):
    """Solve system: maximize alpha_f subject to mu_c >= 0, mu_J_i <= epsilon_J

    Refits all surrogates (objective, constraints, and each subsystem's J_i
    GP), runs a multi-start acquisition optimization over the full system
    variable vector, evaluates the winner on the true problem, updates the
    DoEs, and returns the new iterate.

    Returns:
        Tuple (z, x_bar, y_bar, elapsed_seconds, J_total).
    """
    _start_time = time.time()
    # Collect every surrogate fit as a task so they can run in parallel.
    fit_tasks = []
    fit_tasks.append(('gp_f', self.doe_sys, 'obj', self.surrogate_config, None))
    for const in self.problem.constraints:
        fit_tasks.append(('gp_c', self.doe_sys, const.func.name, self.surrogate_config, const.func.name))
    for i, subsystem in enumerate(self.subsystems):
        fit_tasks.append(('gp_J_i', subsystem.doe_J_i, 'J_i', subsystem.surrogate_config, i))
    # Only parallelize when there are enough tasks and data to amortize the
    # loky worker startup cost.
    min_doe = min((t[1].x.shape[0] for t in fit_tasks))
    use_loky = len(fit_tasks) >= 3 and min_doe >= 10

    def _fit_one(task_idx, kind, doe, y_key, config, tag):
        # Offset the seed per task so parallel fits are decorrelated.
        task_config = copy.copy(config)
        task_config.seed = config.seed + task_idx
        return (kind, tag, build_surrogate(doe, y_key=y_key, config=task_config))
    from joblib import Parallel, delayed
    if use_loky:
        fit_results = Parallel(n_jobs=-1, backend='loky')((delayed(_fit_one)(i, *task) for i, task in enumerate(fit_tasks)))
    else:
        fit_results = [_fit_one(i, *task) for i, task in enumerate(fit_tasks)]
    # Route each fitted GP back to its owner.
    for kind, tag, gp in fit_results:
        if kind == 'gp_f':
            self.gp_f = gp
        elif kind == 'gp_c':
            self.gp_c[tag] = gp
        elif kind == 'gp_J_i':
            self.subsystems[tag].gp_J_i = gp
    nz = len(self.z)
    nx = len(self.x_bar)
    _nsvars = nz + nx + len(self.y_bar)
    # Precompute, per subsystem, the indices of the system vector that feed
    # its J_i GP, plus a reusable input buffer whose static tail holds the
    # subsystem's current (z_under_i, x_under_i).
    _subsystem_masks = []
    for subsystem in self.subsystems:
        coupled_flat = []
        for coupled_idx_array in subsystem.y_coupled_idxs:
            for idx in coupled_idx_array:
                coupled_flat.append(idx)
        coupled_flat = np.array(coupled_flat, dtype=int) if coupled_flat else np.array([], dtype=int)
        parts = [np.arange(nz)]
        if len(subsystem.x_idxs) > 0:
            parts.append(nz + subsystem.x_idxs)
        parts.append(nz + nx + subsystem.y_idxs)
        if len(coupled_flat) > 0:
            parts.append(nz + nx + coupled_flat)
        src_indices = np.concatenate(parts)
        n_dynamic = len(src_indices)
        static_part = np.concatenate([subsystem.z_under_i, subsystem.x_under_i])
        buffer_template = np.empty(n_dynamic + len(static_part))
        buffer_template[n_dynamic:] = static_part
        _subsystem_masks.append({'src_indices': src_indices, 'n_dynamic': n_dynamic, 'buffer_template': buffer_template})
    # Seed the incumbent objective from the DoE when not yet known.
    if np.isinf(f_min):
        f_min = self.doe_sys.get_f_min('obj', self.problem.tol)
    bounds = self.problem.bounds
    # Multi-start: current iterate plus (n_multistart - 1) LHS samples.
    lhs_starts = LHS(xlimits=bounds, seed=42)(n_multistart - 1)
    x0_default = np.concatenate([self.z, self.x_bar, self.y_bar])
    start_points = [x0_default] + [lhs_starts[i] for i in range(n_multistart - 1)]
    if isinstance(optimizer, str):
        opt = get_optimizer(optimizer)
    else:
        opt = optimizer
    gp_J_i_subsystems = [sub.gp_J_i for sub in self.subsystems]
    if n_multistart >= 4:
        start_results = Parallel(n_jobs=-1, backend='loky')((delayed(_run_one_start_system)(x0, opt, acq_func, bounds, self.gp_f, self.gp_c, gp_J_i_subsystems, _subsystem_masks, self.problem.constraints, epsilon_J, f_min, _nsvars) for x0 in start_points))
    else:
        start_results = [_run_one_start_system(x0, opt, acq_func, bounds, self.gp_f, self.gp_c, gp_J_i_subsystems, _subsystem_masks, self.problem.constraints, epsilon_J, f_min, _nsvars) for x0 in start_points]
    # Keep the start with the lowest acquisition value; failed starts
    # return result_x=None and are skipped.
    best_result = None
    best_acq_val = np.inf
    for result_x, acq_val in start_results:
        if result_x is not None and acq_val < best_acq_val:
            best_acq_val = acq_val
            best_result = result_x
    if best_result is None:
        warnings.warn(f'All {len(start_points)} multi-start acquisition attempts failed. Falling back to x0_default. Check optimizer and constraint configuration.', UserWarning, stacklevel=2)
        best_result = x0_default
    best_result = clip_to_bounds(best_result, bounds)
    nz = len(self.z)
    nx = len(self.x_bar)
    # Adopt the winner as the new iterate and evaluate it on the truth.
    self.z = best_result[:nz]
    self.x_bar = best_result[nz:nz + nx]
    self.y_bar = best_result[nz + nx:]
    f_val, c_vals = self.problem.evaluate(best_result, f=True, c=True)
    c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
    h_total = self.problem.compute_constraint_violation(c_vals_dict)
    y_dict = {'obj': f_val}
    for i, const in enumerate(self.problem.constraints):
        y_dict[const.func.name] = c_vals[i]
    self.doe_sys.update_DoE(best_result, y_dict, h_total)
    # Snapshot the iterate so the per-subsystem closure uses consistent values.
    z_snap = self.z
    x_bar_snap = self.x_bar
    y_bar_snap = self.y_bar

    def _compute_one_J_i(subsystem):
        # Evaluate one subsystem's discrepancy at its current local point
        # against the new system-level targets.
        x_bar_i = x_bar_snap[subsystem.x_idxs] if len(subsystem.x_idxs) > 0 else np.array([])
        y_bar_i = y_bar_snap[subsystem.y_idxs]
        y_bar_coupled = []
        for coupled_idx_array in subsystem.y_coupled_idxs:
            for idx in coupled_idx_array:
                y_bar_coupled.append(y_bar_snap[idx])
        y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
        J_i_val, y_i = compute_Ji(subsystem.problem.objective.func, z_snap[subsystem.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, subsystem.z_under_i, subsystem.x_under_i)
        x_J_i_new = np.concatenate([z_snap, x_bar_i, y_bar_i, y_bar_coupled, subsystem.z_under_i, subsystem.x_under_i])
        return (J_i_val, x_J_i_new, y_i)
    # Accumulate total discrepancy and grow each subsystem's J_i DoE.
    J_total = 0.0
    for subsystem in self.subsystems:
        J_i_val, x_J_i_new, y_i = _compute_one_J_i(subsystem)
        J_total += J_i_val
        yi_val = y_i.item() if y_i.size == 1 else y_i
        subsystem.doe_J_i.update_DoE(x_J_i_new, {'J_i': J_i_val, 'y_i': yi_val})
    elapsed = time.time() - _start_time
    return (self.z, self.x_bar, self.y_bar, elapsed, J_total)

BayesianCollaborativeOptimization dataclass

Bases: BaseSolver

BACO solver

Source code in src/mdotoolbox/frameworks/baco.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
@dataclass
class BayesianCollaborativeOptimization(BaseSolver):
    """Bayesian Collaborative Optimization (BACO) solver.

    Each iteration refits GP surrogates of the subsystem discrepancies
    ``J_i`` (and subsystem constraints), solves one acquisition problem per
    subsystem, then a system-level acquisition problem, and appends every
    evaluation as a row of ``self._history``.

    Attributes:
        system: BACO system coordinator holding subsystems and DoEs.
        subsystem_optimizer: optimizer (name or callable) for subsystem
            acquisition solves.
        system_optimizer: optimizer (name or callable) for the system
            acquisition solve.
        n_initial: initial DoE size; int, callable of the total problem
            dimension, or None (resolved in ``__post_init__``).
        acq_func: acquisition function; defaults to ``log_ei``.
        n_multistart: multistart count for acquisition optimization.
        solver: identifier written into history/pareto frames.
    """
    system: BACOSystem
    subsystem_optimizer: Union[str, Callable]
    system_optimizer: Union[str, Callable]
    n_initial: Union[int, Callable, None] = None
    acq_func: Union[Callable, None] = None
    n_multistart: int = 10
    solver: str = 'BACO'

    def __post_init__(self):
        """Resolve defaults and build the human-readable setup description."""
        super().__post_init__()
        if self.n_initial is None:
            # Default DoE size: twice the dimension of the system objective's x.
            self.n_initial = 2 * len(np.array([self.system.problem.objective.x]).ravel())
        if self.acq_func is None:
            from ..surrogates import log_ei
            self.acq_func = log_ei
        # If n_initial is a callable, log its source instead of its repr.
        self.setup_description = [80 * '=', 'BACO Optimization Setup:', 80 * '=', 'n_initial:' + (f'{self.n_initial}' if isinstance(self.n_initial, int) else inspect.getsource(self.n_initial).strip()), f'epsilon_J: {self.epsilon_J}', f'epsilon_h: {self.epsilon_h}', f'max_eval: {self.budget.total_budget}', f'acq_func: {self.acq_func}', f'n_multistart: {self.n_multistart}', f'name: {self.name}', f'solver: {self.solver}', '', 80 * '=', 'SYSTEM SETUP', 80 * '=', f'system_optimizer: {self.system_optimizer}', self.system._describe_setup()]
        for i, subsystem in enumerate(self.system.subsystems, 1):
            self.setup_description += ['', 80 * '=', f'SUBSYSTEM {i:02d} SETUP', 80 * '=', f'subsystem_optimizer: {self.subsystem_optimizer}', subsystem._describe_setup()]

    def _custom_initialize(self):
        """Perform BACO-specific initialization (DoE generation)."""
        if isinstance(self.n_initial, Callable):
            # Resolve a callable n_initial against the full problem dimension.
            n_init = self.n_initial(len(self.system.z) + len(self.system.x_bar) + len(self.system.y_bar))
        else:
            n_init = self.n_initial
        self.system.initialize_doe(n_init)
        for subsystem in self.system.subsystems:
            # BUGFIX: pass the resolved sample count n_init, not self.n_initial,
            # which may still be a callable (the system-level call above already
            # uses the resolved value).
            subsystem.initialize_doe(n_init, self.system.z, self.system.x_bar, self.system.y_bar)

    def iterate(self):
        """Execute one BACO iteration.

        Refits all GP surrogates (optionally in parallel), solves one
        acquisition problem per subsystem, records each as a history row,
        then performs the system-level acquisition solve and records it.
        Sets ``self._converged`` when J_total and h_total fall below their
        tolerances. The 'improved' flag for the system row is appended by
        ``solve()`` after this method returns.
        """
        J_stars = []
        h_total = 0.0
        z_bar = self.system.z
        x_bar = self.system.x_bar
        y_bar = self.system.y_bar
        # Collect GP fitting tasks: one J_i surrogate per subsystem plus one
        # surrogate per subsystem constraint.
        gp_fit_tasks = []
        task_map = []
        for idx, subsystem in enumerate(self.system.subsystems):
            gp_fit_tasks.append((subsystem.doe_J_i, 'J_i', subsystem.surrogate_config))
            task_map.append((idx, 'J_i', None))
            for const in subsystem.problem.constraints:
                gp_fit_tasks.append((subsystem.doe_g_i, const.func.name, subsystem.surrogate_config))
                task_map.append((idx, 'g_i', const.func.name))
        # Parallel fitting only pays off with enough tasks and enough data.
        min_doe = min((t[0].x.shape[0] for t in gp_fit_tasks))
        use_loky = len(gp_fit_tasks) >= 3 and min_doe >= 10
        from joblib import Parallel, delayed
        if use_loky:
            gp_results = Parallel(n_jobs=-1, backend='loky')((delayed(build_surrogate)(doe, key, copy.copy(cfg)) for doe, key, cfg in gp_fit_tasks))
        else:
            gp_results = [build_surrogate(doe, key, cfg) for doe, key, cfg in gp_fit_tasks]
        # Attach the fitted surrogates back onto their subsystems.
        for (sub_idx, kind, cname), gp in zip(task_map, gp_results):
            subsystem = self.system.subsystems[sub_idx]
            if kind == 'J_i':
                subsystem.gp_J_i = gp
            else:
                subsystem.gp_g_i[cname] = gp
        # Solve the acquisition problem of every subsystem.
        results = []
        for idx, subsystem in enumerate(self.system.subsystems):
            z_ss, x_ss, y_ss, J_i, h_i, elap = subsystem.solve_acquisition(z_bar, x_bar, y_bar, self.subsystem_optimizer, self.acq_func, n_multistart=self.n_multistart)
            results.append((idx, z_ss, x_ss, y_ss, J_i, h_i, elap))
        # Record one history row per subsystem solve; all columns that do not
        # apply to this row are padded with NaN.
        for idx, z_ss, x_ss, y_ss, J_i, h_i, elap in results:
            J_stars.append(J_i)
            h_total += h_i
            self._eval += 1
            self._history['iter'].append(self._iter)
            self._history['eval'].append(self._eval)
            self._history['tss'].append(elap + self._history['tss'][-1] if self._history['tss'] else elap)
            self._history['improved'].append(False)
            self._history['level'].append(f'Subsystem {idx + 1:02d}')
            self._history['z'].append(z_ss.copy())
            self._history['x'].append(x_ss.copy())
            self._history['y'].append(y_ss.copy())
            for key in self._history:
                if key in ['iter', 'eval', 'tss', 'improved', 'level', 'z', 'x', 'y']:
                    continue
                elif key == f'J{idx + 1:02d}':
                    self._history[key].append(J_i)
                elif key == f'h{idx + 1:02d}':
                    self._history[key].append(h_i)
                else:
                    self._history[key].append(np.nan)
        # System-level acquisition solve; the resulting point is appended to
        # the system DoE, from which f and h are read back.
        _, _, _, elap, J_total = self.system.solve_acquisition(self.system_optimizer, self.acq_func, self._best_results.f, n_multistart=self.n_multistart, epsilon_J=self.epsilon_J)
        h_sys = self.system.doe_sys.constraint_violation[-1]
        h_total += h_sys
        f_sys = self.system.doe_sys.y['obj'][-1]
        converge = J_total <= self.epsilon_J and h_total <= self.epsilon_h
        self._eval += len(self.system.subsystems)
        # System-level history row ('improved' is appended later by solve()).
        self._history['iter'].append(self._iter)
        self._history['eval'].append(self._eval)
        self._history['tss'].append(self._history['tss'][-1] + elap if self._history['tss'] else elap)
        self._history['level'].append('System')
        self._history['f_sys'].append(f_sys)
        self._history['h_total'].append(h_total)
        self._history['J_total'].append(J_total)
        self._history['z'].append(self.system.z.copy())
        self._history['x'].append(self.system.x_bar.copy())
        self._history['y'].append(self.system.y_bar.copy())
        for idx, _ in enumerate(self.system.subsystems):
            self._history[f'h{idx + 1:02d}'].append(np.nan)
            self._history[f'J{idx + 1:02d}'].append(np.nan)
        if converge:
            self._converged = True

    def solve(self, z0, x_bar0, y_bar0, z_hat0, x0):
        """Execute BACO.

        Writes the setup log, replays the initial DoE into the history and
        Pareto archive, then runs ``iterate()`` until convergence or the
        evaluation budget is exhausted.

        Args:
            z0: initial shared variables.
            x_bar0: initial local-variable targets.
            y_bar0: initial coupling-variable targets.
            z_hat0: initial subsystem copies of shared variables.
            x0: initial local variables.

        Returns:
            Results with the best iterate, convergence flag, counters,
            history DataFrame and Pareto archive DataFrame.
        """
        setup_log = list(self.setup_description) if isinstance(self.setup_description, list) else [self.setup_description]
        setup_log += ['', 80 * '=', 'INITIAL SETUP', 80 * '=', 'z0: ' + np.array2string(z0), 'x_bar0: ' + np.array2string(x_bar0), 'y_bar0: ' + np.array2string(y_bar0), 'z_hat0: ' + np.array2string(z_hat0), 'x0: ' + np.array2string(x0)]
        self.initialize(z0, x_bar0, y_bar0, z_hat0, x0)
        import pandas as pd
        setup_text = '\n'.join(setup_log)
        with open(self.cache_dir / 'setup.txt', 'w') as f:
            f.write(setup_text)
        self._converged = False
        self._iter = 0
        # NOTE: this value is recomputed below as eval_counter; kept for parity
        # with the original bookkeeping.
        self._eval = len(self.system.doe_sys.x) * len(self.system.subsystems) + sum((len(sub.doe_J_i.x) for sub in self.system.subsystems))
        nz = len(self.system.z)
        nx = len(self.system.x_bar)
        doe = self.system.doe_sys
        cv = doe.constraint_violation
        eval_counter = 0
        # Replay every system-level DoE sample: recompute the total discrepancy
        # J_total per sample and record it as a 'System-DoE' history row.
        for i in range(len(doe.x)):
            sample = doe.x[i]
            f_i = float(doe.y['obj'][i])
            h_i = float(cv[i]) if cv is not None else 0.0
            # Sample layout: [z | x_bar | y_bar] concatenated.
            z_i = sample[:nz].copy()
            x_under_i_all = sample[nz:nz + nx].copy()
            y_i_all = sample[nz + nx:].copy()
            J_total = 0.0
            for idx, subsystem in enumerate(self.system.subsystems):
                x_bar_i_sample = x_under_i_all[subsystem.x_idxs] if len(subsystem.x_idxs) > 0 else np.array([])
                y_bar_i_sample = y_i_all[subsystem.y_idxs]
                y_bar_coupled_sample = []
                for coupled_idx_array in subsystem.y_coupled_idxs:
                    for idx_c in coupled_idx_array:
                        y_bar_coupled_sample.append(y_i_all[idx_c])
                y_bar_coupled_sample = np.array(y_bar_coupled_sample) if y_bar_coupled_sample else np.array([])
                J_i_val, _ = compute_Ji(subsystem.problem.objective.func, z_i[subsystem.z_idxs], x_bar_i_sample, y_bar_i_sample, y_bar_coupled_sample, subsystem.z_under_i, subsystem.x_under_i)
                J_total += J_i_val
                eval_counter += 1
            self._pareto_archive = update_pareto(self._pareto_archive, f=f_i, h=h_i, J=J_total, z=z_i, x=x_under_i_all, y=y_i_all)
            self._history['iter'].append(0)
            self._history['eval'].append(eval_counter)
            self._history['tss'].append(0.0)
            self._history['improved'].append(False)
            self._history['level'].append('System-DoE')
            self._history['f_sys'].append(f_i)
            self._history['h_total'].append(h_i)
            self._history['J_total'].append(J_total)
            self._history['z'].append(z_i)
            self._history['x'].append(x_under_i_all)
            self._history['y'].append(y_i_all)
            for jdx, _ in enumerate(self.system.subsystems):
                self._history[f'h{jdx + 1:02d}'].append(np.nan)
                self._history[f'J{jdx + 1:02d}'].append(np.nan)
        # Replay every subsystem DoE sample as a 'SubsystemNN-DoE' history row.
        for idx, subsystem in enumerate(self.system.subsystems):
            J_i_vals = subsystem.doe_J_i.y['J_i']
            yi_vals = subsystem.doe_J_i.y['y_i']
            hi_vals = subsystem.doe_J_i.constraint_violation
            # Subsystem DoE sample layout:
            # [z_bar | x_bar_i | y_bar_i | y_coupled | z_under_i | x_under_i].
            nz_bar = len(self.system.z)
            nx_bar_i = len(subsystem.x_idxs)
            ny_bar_i = len(subsystem.y_idxs)
            ny_coupled = sum((len(c) for c in subsystem.y_coupled_idxs))
            offset = nz_bar + nx_bar_i + ny_bar_i + ny_coupled
            nz_ss = len(subsystem.z_idxs)
            nx_ss = len(subsystem.x_idxs)
            for j in range(len(subsystem.doe_J_i.x)):
                sample_ss = subsystem.doe_J_i.x[j]
                z_under_i_sample = sample_ss[offset:offset + nz_ss]
                x_under_i_sample = sample_ss[offset + nz_ss:offset + nz_ss + nx_ss]
                eval_counter += 1
                J_i_j = float(J_i_vals[j])
                yi_j = np.array(yi_vals[j])
                hi_j = float(hi_vals[j]) if hi_vals is not None else np.nan
                self._history['iter'].append(0)
                self._history['eval'].append(eval_counter)
                self._history['tss'].append(0.0)
                self._history['improved'].append(False)
                self._history['level'].append(f'Subsystem{idx + 1:02d}-DoE')
                self._history['f_sys'].append(np.nan)
                self._history['h_total'].append(np.nan)
                self._history['J_total'].append(np.nan)
                self._history['z'].append(z_under_i_sample)
                self._history['x'].append(x_under_i_sample)
                self._history['y'].append(yi_j)
                for jdx, _ in enumerate(self.system.subsystems):
                    if jdx == idx:
                        self._history[f'h{jdx + 1:02d}'].append(hi_j)
                        self._history[f'J{jdx + 1:02d}'].append(J_i_j)
                    else:
                        self._history[f'h{jdx + 1:02d}'].append(np.nan)
                        self._history[f'J{jdx + 1:02d}'].append(np.nan)
        self._eval = eval_counter
        # If the DoE alone exhausts the budget, return the best DoE solution.
        if self._eval >= self.budget.total_budget:
            iterend = time.time() - self._start_time
            print(f'DoE evaluations ({self._eval}) >= budget ({self.budget.total_budget}). Returning best DoE solution.')
            return Results(best=self._best_results, converged=False, iterations=0, evaluations=self._eval, elapsed_time=iterend, history=pd.DataFrame(self._history), pareto=pareto_archive_to_df(self._pareto_archive))
        J_total = np.inf
        h_total = np.inf
        iterend = 0.0
        pd.DataFrame(self._history).to_csv(self.cache_dir / 'DoE.csv', index=False)
        # Main optimization loop: one BACO iteration per pass.
        while self._eval < self.budget.total_budget:
            self._iter += 1
            iterstart = time.time() - self._start_time
            self.iterate()
            iterend = time.time() - self._start_time
            iterdur = iterend - iterstart
            # Read back the system-level row that iterate() just appended.
            f_sys = self._history['f_sys'][-1]
            J_total = self._history['J_total'][-1]
            h_total = self._history['h_total'][-1]
            improved = self._best_results.update(z_new=self.system.z, x_new=self.system.x_bar, y_new=self.system.y_bar, f_new=f_sys, h_new=h_total, J_new=J_total, epsilon_J=self.epsilon_J, epsilon_h=self.epsilon_h)
            # Completes the system row started in iterate().
            self._history['improved'].append(improved)
            pd.DataFrame(self._history).to_pickle(self.cache_dir / 'state.pkl')
            self._pareto_archive = update_pareto(self._pareto_archive, f=f_sys, h=h_total, J=J_total, z=self.system.z.copy(), x=self.system.x_bar.copy(), y=self.system.y_bar.copy())
            # Colorized progress line: green below 1, red otherwise.
            if improved:
                improved_str = f'{TextColor.grn}Y{TextColor.clr}'
            else:
                improved_str = f'{TextColor.red}N{TextColor.clr}'
            if h_total < 1:
                h_str = f'{TextColor.grn}{format_vars(h_total, 3)}{TextColor.clr}'
            else:
                h_str = f'{TextColor.red}{format_vars(h_total, 3)}{TextColor.clr}'
            if J_total < 1:
                J_str = f'{TextColor.grn}{format_vars(J_total, 3)}{TextColor.clr}'
            else:
                J_str = f'{TextColor.red}{format_vars(J_total, 3)}{TextColor.clr}'
            print(f'iter={self._iter:>{len(str(self.budget.total_budget))}} |', f'eval={self._eval:>{len(str(self.budget.total_budget))}} /', f'{self.budget.total_budget:>{len(str(self.budget.total_budget))}} |', f'Improved={improved_str} |', f'h_total={h_str} |', f'J_total={J_str} |', f'f={format_vars(f_sys, 3)} |', f'TSince={iterend:>8.2f}s |', f'IterDur={iterdur:>7.3f}s')
            if self._converged:
                print('\nConvergence criteria met.')
                self._history['epsilon_J'] = [self.epsilon_J] * len(self._history['iter'])
                self._history['epsilon_h'] = [self.epsilon_h] * len(self._history['iter'])
                break
        # Final export: tag history and Pareto frames with run metadata.
        history_df = pd.DataFrame(self._history)
        history_df['identifier'] = self.name
        history_df['solver'] = self.solver
        history_df['epsilon_J'] = self.epsilon_J
        history_df['epsilon_h'] = self.epsilon_h
        pareto_df = pareto_archive_to_df(self._pareto_archive)
        pareto_df['identifier'] = self.name
        pareto_df['solver'] = self.solver
        pareto_df['epsilon_J'] = self.epsilon_J
        pareto_df['epsilon_h'] = self.epsilon_h
        return Results(best=self._best_results, converged=self._converged, iterations=self._iter, evaluations=self._eval, elapsed_time=iterend, history=history_df, pareto=pareto_df)

iterate()

Execute one BACO iteration

Source code in src/mdotoolbox/frameworks/baco.py
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
def iterate(self):
    """Execute one BACO iteration.

    Refits the GP surrogates of every subsystem (optionally in parallel),
    solves one acquisition problem per subsystem, records each solve as a
    history row, then runs the system-level acquisition solve and records
    it. Sets ``self._converged`` when J_total and h_total fall below their
    tolerances. The 'improved' flag for the system row is appended later
    by ``solve()``.
    """
    J_stars = []
    h_total = 0.0
    z_bar = self.system.z
    x_bar = self.system.x_bar
    y_bar = self.system.y_bar
    # Collect GP fitting tasks: one J_i surrogate per subsystem plus one
    # surrogate per subsystem constraint.
    gp_fit_tasks = []
    task_map = []
    for idx, subsystem in enumerate(self.system.subsystems):
        gp_fit_tasks.append((subsystem.doe_J_i, 'J_i', subsystem.surrogate_config))
        task_map.append((idx, 'J_i', None))
        for const in subsystem.problem.constraints:
            gp_fit_tasks.append((subsystem.doe_g_i, const.func.name, subsystem.surrogate_config))
            task_map.append((idx, 'g_i', const.func.name))
    # Process-based parallel fitting only pays off with enough tasks and data.
    min_doe = min((t[0].x.shape[0] for t in gp_fit_tasks))
    use_loky = len(gp_fit_tasks) >= 3 and min_doe >= 10
    from joblib import Parallel, delayed
    if use_loky:
        gp_results = Parallel(n_jobs=-1, backend='loky')((delayed(build_surrogate)(doe, key, copy.copy(cfg)) for doe, key, cfg in gp_fit_tasks))
    else:
        gp_results = [build_surrogate(doe, key, cfg) for doe, key, cfg in gp_fit_tasks]
    # Attach the fitted surrogates back onto their owning subsystems.
    for (sub_idx, kind, cname), gp in zip(task_map, gp_results):
        subsystem = self.system.subsystems[sub_idx]
        if kind == 'J_i':
            subsystem.gp_J_i = gp
        else:
            subsystem.gp_g_i[cname] = gp
    # Solve each subsystem's acquisition problem around the current targets.
    results = []
    for idx, subsystem in enumerate(self.system.subsystems):
        z_ss, x_ss, y_ss, J_i, h_i, elap = subsystem.solve_acquisition(z_bar, x_bar, y_bar, self.subsystem_optimizer, self.acq_func, n_multistart=self.n_multistart)
        results.append((idx, z_ss, x_ss, y_ss, J_i, h_i, elap))
    # Record one history row per subsystem solve; columns that do not apply
    # to this row are padded with NaN by the fallback branch below.
    for idx, z_ss, x_ss, y_ss, J_i, h_i, elap in results:
        J_stars.append(J_i)
        h_total += h_i
        self._eval += 1
        self._history['iter'].append(self._iter)
        self._history['eval'].append(self._eval)
        # tss is cumulative wall time across rows.
        self._history['tss'].append(elap + self._history['tss'][-1] if self._history['tss'] else elap)
        self._history['improved'].append(False)
        self._history['level'].append(f'Subsystem {idx + 1:02d}')
        self._history['z'].append(z_ss.copy())
        self._history['x'].append(x_ss.copy())
        self._history['y'].append(y_ss.copy())
        for key in self._history:
            if key in ['iter', 'eval', 'tss', 'improved', 'level', 'z', 'x', 'y']:
                continue
            elif key == f'J{idx + 1:02d}':
                self._history[key].append(J_i)
            elif key == f'h{idx + 1:02d}':
                self._history[key].append(h_i)
            else:
                self._history[key].append(np.nan)
    # System-level acquisition solve; the new point lands in the system DoE,
    # from which the objective and constraint violation are read back.
    _, _, _, elap, J_total = self.system.solve_acquisition(self.system_optimizer, self.acq_func, self._best_results.f, n_multistart=self.n_multistart, epsilon_J=self.epsilon_J)
    h_sys = self.system.doe_sys.constraint_violation[-1]
    h_total += h_sys
    f_sys = self.system.doe_sys.y['obj'][-1]
    converge = J_total <= self.epsilon_J and h_total <= self.epsilon_h
    self._eval += len(self.system.subsystems)
    # System-level history row ('improved' is appended by the caller).
    self._history['iter'].append(self._iter)
    self._history['eval'].append(self._eval)
    self._history['tss'].append(self._history['tss'][-1] + elap if self._history['tss'] else elap)
    self._history['level'].append('System')
    self._history['f_sys'].append(f_sys)
    self._history['h_total'].append(h_total)
    self._history['J_total'].append(J_total)
    self._history['z'].append(self.system.z.copy())
    self._history['x'].append(self.system.x_bar.copy())
    self._history['y'].append(self.system.y_bar.copy())
    for idx, _ in enumerate(self.system.subsystems):
        self._history[f'h{idx + 1:02d}'].append(np.nan)
        self._history[f'J{idx + 1:02d}'].append(np.nan)
    if converge:
        self._converged = True

solve(z0, x_bar0, y_bar0, z_hat0, x0)

Execute BACO

Source code in src/mdotoolbox/frameworks/baco.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
def solve(self, z0, x_bar0, y_bar0, z_hat0, x0):
    """Execute BACO.

    Writes the setup log to disk, replays the initial DoE samples into the
    history and Pareto archive, then runs ``iterate()`` until convergence
    or the evaluation budget is exhausted.

    Args:
        z0: initial shared variables.
        x_bar0: initial local-variable targets.
        y_bar0: initial coupling-variable targets.
        z_hat0: initial subsystem copies of shared variables.
        x0: initial local variables.

    Returns:
        Results with the best iterate, convergence flag, iteration and
        evaluation counts, elapsed time, history DataFrame and Pareto
        archive DataFrame.
    """
    setup_log = list(self.setup_description) if isinstance(self.setup_description, list) else [self.setup_description]
    setup_log += ['', 80 * '=', 'INITIAL SETUP', 80 * '=', 'z0: ' + np.array2string(z0), 'x_bar0: ' + np.array2string(x_bar0), 'y_bar0: ' + np.array2string(y_bar0), 'z_hat0: ' + np.array2string(z_hat0), 'x0: ' + np.array2string(x0)]
    self.initialize(z0, x_bar0, y_bar0, z_hat0, x0)
    import pandas as pd
    setup_text = '\n'.join(setup_log)
    with open(self.cache_dir / 'setup.txt', 'w') as f:
        f.write(setup_text)
    self._converged = False
    self._iter = 0
    # NOTE(review): this value is recomputed below as eval_counter and
    # overwritten; kept as-is.
    self._eval = len(self.system.doe_sys.x) * len(self.system.subsystems) + sum((len(sub.doe_J_i.x) for sub in self.system.subsystems))
    nz = len(self.system.z)
    nx = len(self.system.x_bar)
    doe = self.system.doe_sys
    cv = doe.constraint_violation
    eval_counter = 0
    # Replay each system-level DoE sample: recompute the total discrepancy
    # and record it as a 'System-DoE' history row.
    for i in range(len(doe.x)):
        sample = doe.x[i]
        f_i = float(doe.y['obj'][i])
        h_i = float(cv[i]) if cv is not None else 0.0
        # Sample layout: [z | x_bar | y_bar] concatenated.
        z_i = sample[:nz].copy()
        x_under_i_all = sample[nz:nz + nx].copy()
        y_i_all = sample[nz + nx:].copy()
        J_total = 0.0
        for idx, subsystem in enumerate(self.system.subsystems):
            x_bar_i_sample = x_under_i_all[subsystem.x_idxs] if len(subsystem.x_idxs) > 0 else np.array([])
            y_bar_i_sample = y_i_all[subsystem.y_idxs]
            y_bar_coupled_sample = []
            for coupled_idx_array in subsystem.y_coupled_idxs:
                for idx_c in coupled_idx_array:
                    y_bar_coupled_sample.append(y_i_all[idx_c])
            y_bar_coupled_sample = np.array(y_bar_coupled_sample) if y_bar_coupled_sample else np.array([])
            J_i_val, _ = compute_Ji(subsystem.problem.objective.func, z_i[subsystem.z_idxs], x_bar_i_sample, y_bar_i_sample, y_bar_coupled_sample, subsystem.z_under_i, subsystem.x_under_i)
            J_total += J_i_val
            eval_counter += 1
        self._pareto_archive = update_pareto(self._pareto_archive, f=f_i, h=h_i, J=J_total, z=z_i, x=x_under_i_all, y=y_i_all)
        self._history['iter'].append(0)
        self._history['eval'].append(eval_counter)
        self._history['tss'].append(0.0)
        self._history['improved'].append(False)
        self._history['level'].append('System-DoE')
        self._history['f_sys'].append(f_i)
        self._history['h_total'].append(h_i)
        self._history['J_total'].append(J_total)
        self._history['z'].append(z_i)
        self._history['x'].append(x_under_i_all)
        self._history['y'].append(y_i_all)
        for jdx, _ in enumerate(self.system.subsystems):
            self._history[f'h{jdx + 1:02d}'].append(np.nan)
            self._history[f'J{jdx + 1:02d}'].append(np.nan)
    # Replay each subsystem DoE sample as a 'SubsystemNN-DoE' history row.
    for idx, subsystem in enumerate(self.system.subsystems):
        J_i_vals = subsystem.doe_J_i.y['J_i']
        yi_vals = subsystem.doe_J_i.y['y_i']
        hi_vals = subsystem.doe_J_i.constraint_violation
        # Subsystem DoE sample layout:
        # [z_bar | x_bar_i | y_bar_i | y_coupled | z_under_i | x_under_i].
        nz_bar = len(self.system.z)
        nx_bar_i = len(subsystem.x_idxs)
        ny_bar_i = len(subsystem.y_idxs)
        ny_coupled = sum((len(c) for c in subsystem.y_coupled_idxs))
        offset = nz_bar + nx_bar_i + ny_bar_i + ny_coupled
        nz_ss = len(subsystem.z_idxs)
        nx_ss = len(subsystem.x_idxs)
        for j in range(len(subsystem.doe_J_i.x)):
            sample_ss = subsystem.doe_J_i.x[j]
            z_under_i_sample = sample_ss[offset:offset + nz_ss]
            x_under_i_sample = sample_ss[offset + nz_ss:offset + nz_ss + nx_ss]
            eval_counter += 1
            J_i_j = float(J_i_vals[j])
            yi_j = np.array(yi_vals[j])
            hi_j = float(hi_vals[j]) if hi_vals is not None else np.nan
            self._history['iter'].append(0)
            self._history['eval'].append(eval_counter)
            self._history['tss'].append(0.0)
            self._history['improved'].append(False)
            self._history['level'].append(f'Subsystem{idx + 1:02d}-DoE')
            self._history['f_sys'].append(np.nan)
            self._history['h_total'].append(np.nan)
            self._history['J_total'].append(np.nan)
            self._history['z'].append(z_under_i_sample)
            self._history['x'].append(x_under_i_sample)
            self._history['y'].append(yi_j)
            for jdx, _ in enumerate(self.system.subsystems):
                if jdx == idx:
                    self._history[f'h{jdx + 1:02d}'].append(hi_j)
                    self._history[f'J{jdx + 1:02d}'].append(J_i_j)
                else:
                    self._history[f'h{jdx + 1:02d}'].append(np.nan)
                    self._history[f'J{jdx + 1:02d}'].append(np.nan)
    self._eval = eval_counter
    # If the DoE alone exhausts the budget, return the best DoE solution.
    if self._eval >= self.budget.total_budget:
        iterend = time.time() - self._start_time
        print(f'DoE evaluations ({self._eval}) >= budget ({self.budget.total_budget}). Returning best DoE solution.')
        return Results(best=self._best_results, converged=False, iterations=0, evaluations=self._eval, elapsed_time=iterend, history=pd.DataFrame(self._history), pareto=pareto_archive_to_df(self._pareto_archive))
    J_total = np.inf
    h_total = np.inf
    iterend = 0.0
    pd.DataFrame(self._history).to_csv(self.cache_dir / 'DoE.csv', index=False)
    # Main optimization loop: one BACO iteration per pass.
    while self._eval < self.budget.total_budget:
        self._iter += 1
        iterstart = time.time() - self._start_time
        self.iterate()
        iterend = time.time() - self._start_time
        iterdur = iterend - iterstart
        # Read back the system-level row that iterate() just appended.
        f_sys = self._history['f_sys'][-1]
        J_total = self._history['J_total'][-1]
        h_total = self._history['h_total'][-1]
        improved = self._best_results.update(z_new=self.system.z, x_new=self.system.x_bar, y_new=self.system.y_bar, f_new=f_sys, h_new=h_total, J_new=J_total, epsilon_J=self.epsilon_J, epsilon_h=self.epsilon_h)
        # Completes the system row started in iterate().
        self._history['improved'].append(improved)
        pd.DataFrame(self._history).to_pickle(self.cache_dir / 'state.pkl')
        self._pareto_archive = update_pareto(self._pareto_archive, f=f_sys, h=h_total, J=J_total, z=self.system.z.copy(), x=self.system.x_bar.copy(), y=self.system.y_bar.copy())
        # Colorized progress line: green below 1, red otherwise.
        if improved:
            improved_str = f'{TextColor.grn}Y{TextColor.clr}'
        else:
            improved_str = f'{TextColor.red}N{TextColor.clr}'
        if h_total < 1:
            h_str = f'{TextColor.grn}{format_vars(h_total, 3)}{TextColor.clr}'
        else:
            h_str = f'{TextColor.red}{format_vars(h_total, 3)}{TextColor.clr}'
        if J_total < 1:
            J_str = f'{TextColor.grn}{format_vars(J_total, 3)}{TextColor.clr}'
        else:
            J_str = f'{TextColor.red}{format_vars(J_total, 3)}{TextColor.clr}'
        print(f'iter={self._iter:>{len(str(self.budget.total_budget))}} |', f'eval={self._eval:>{len(str(self.budget.total_budget))}} /', f'{self.budget.total_budget:>{len(str(self.budget.total_budget))}} |', f'Improved={improved_str} |', f'h_total={h_str} |', f'J_total={J_str} |', f'f={format_vars(f_sys, 3)} |', f'TSince={iterend:>8.2f}s |', f'IterDur={iterdur:>7.3f}s')
        if self._converged:
            print('\nConvergence criteria met.')
            self._history['epsilon_J'] = [self.epsilon_J] * len(self._history['iter'])
            self._history['epsilon_h'] = [self.epsilon_h] * len(self._history['iter'])
            break
    # Final export: tag history and Pareto frames with run metadata.
    history_df = pd.DataFrame(self._history)
    history_df['identifier'] = self.name
    history_df['solver'] = self.solver
    history_df['epsilon_J'] = self.epsilon_J
    history_df['epsilon_h'] = self.epsilon_h
    pareto_df = pareto_archive_to_df(self._pareto_archive)
    pareto_df['identifier'] = self.name
    pareto_df['solver'] = self.solver
    pareto_df['epsilon_J'] = self.epsilon_J
    pareto_df['epsilon_h'] = self.epsilon_h
    return Results(best=self._best_results, converged=self._converged, iterations=self._iter, evaluations=self._eval, elapsed_time=iterend, history=history_df, pareto=pareto_df)

src/mdotoolbox/frameworks/co.py

COSubsystem dataclass

Bases: BaseSubsystem

Subsystem for Collaborative Optimization framework.

Source code in src/mdotoolbox/frameworks/co.py
19
20
21
22
@dataclass
class COSubsystem(BaseSubsystem):
    """Subsystem participant in the Collaborative Optimization framework.

    All behaviour is inherited unchanged from ``BaseSubsystem``; the class
    exists only to give CO components a distinct type.
    """

COSystem dataclass

Bases: BaseSystem

System-level coordinator for Collaborative Optimization.

Source code in src/mdotoolbox/frameworks/co.py
24
25
26
27
@dataclass
class COSystem(BaseSystem):
    """System-level coordinator for Collaborative Optimization.

    All behaviour is inherited unchanged from ``BaseSystem``; the class
    exists only to give CO components a distinct type.
    """

CollaborativeOptimization dataclass

Bases: BaseSolver

Main solver for Collaborative Optimization framework.

Source code in src/mdotoolbox/frameworks/co.py
29
30
31
32
@dataclass
class CollaborativeOptimization(BaseSolver):
    """Main solver for the Collaborative Optimization framework.

    All behaviour is inherited unchanged from ``BaseSolver``; the class
    exists only to give the CO solver a distinct type.
    """

src/mdotoolbox/frameworks/eco.py

ECOSubsystem dataclass

Bases: BaseSubsystem

Subsystem for Enhanced Collaborative Optimization framework.

Source code in src/mdotoolbox/frameworks/eco.py
22
23
24
25
@dataclass
class ECOSubsystem(BaseSubsystem):
    """Subsystem participant in the Enhanced Collaborative Optimization framework.

    All behaviour is inherited unchanged from ``BaseSubsystem``; the class
    exists only to give ECO components a distinct type.
    """

ECOSystem dataclass

Bases: BaseSystem

System-level coordinator for Enhanced Collaborative Optimization.

Source code in src/mdotoolbox/frameworks/eco.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@dataclass
class ECOSystem(BaseSystem):
    """System-level coordinator for Enhanced Collaborative Optimization."""

    def _build_constraints(self, alpha=0.0, **kwargs):
        """Build constraints with relaxed consistency: J_i <= alpha.

        Args:
            alpha: relaxation parameter; each subsystem discrepancy J_i must
                satisfy J_i <= alpha, expressed below as alpha - J_i >= 0.
            **kwargs: accepted for interface compatibility; unused here.

        Returns:
            dict with 'ineq' and 'eq' callables that map the flat system
            design vector [z_bar, x_bar, y_bar] to arrays of constraint
            values (ineq >= 0 and eq == 0 mean feasible).
        """
        nz = len(self.z_bar)
        nx = len(self.x_bar)

        def constraints_func(sys_vars):
            """System constraints including relaxed J_i <= alpha"""
            c_vals_ineq = []
            c_vals_eq = []
            # Split the flat design vector into shared / local / coupling parts.
            z_bar_new = sys_vars[:nz]
            x_bar_new = sys_vars[nz:nz + nx]
            y_bar_new = sys_vars[nz + nx:]
            input_vars = np.concatenate([z_bar_new, x_bar_new, y_bar_new])
            # Native system constraints, normalized so that >= 0 (ineq) or
            # == 0 (eq) indicates feasibility.
            for const in self.problem.constraints:
                if const.ctype == 'ge':
                    c_vals_ineq.append(const.func.func(*input_vars) - const.value)
                elif const.ctype == 'le':
                    c_vals_ineq.append(-const.func.func(*input_vars) + const.value)
                elif const.ctype == 'eq':
                    c_vals_eq.append(const.func.func(*input_vars) - const.value)
            # Relaxed consistency constraint per subsystem: alpha - J_i >= 0.
            for subsystem in self.subsystems:
                x_bar_i = x_bar_new[subsystem.x_idxs] if len(subsystem.x_idxs) > 0 else np.array([])
                y_bar_i = y_bar_new[subsystem.y_idxs]
                y_bar_coupled = []
                for coupled_idx_array in subsystem.y_bar_coupled_idxs:
                    for idx in coupled_idx_array:
                        y_bar_coupled.append(y_bar_new[idx])
                y_bar_coupled = np.array(y_bar_coupled) if y_bar_coupled else np.array([])
                J_i_val, _ = compute_Ji(subsystem.problem.objective.func, z_bar_new[subsystem.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, subsystem.z_under_i, subsystem.x_under_i)
                c_vals_ineq.append(alpha - J_i_val)
            return (np.array(c_vals_ineq), np.array(c_vals_eq))
        # NOTE(review): each returned lambda re-runs constraints_func, so the
        # 'ineq' and 'eq' callables each recompute all constraints; fine as
        # long as constraint functions are pure.
        return {'ineq': lambda x: constraints_func(x)[0], 'eq': lambda x: constraints_func(x)[1]}

EnhancedCollaborativeOptimization dataclass

Bases: BaseSolver

Enhanced Collaborative Optimization solver with relaxation parameter alpha.

Source code in src/mdotoolbox/frameworks/eco.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@dataclass
class EnhancedCollaborativeOptimization(BaseSolver):
    """Enhanced Collaborative Optimization solver with relaxation parameter alpha."""
    system: ECOSystem
    subsystem_optimizer: Union[str, Callable]
    system_optimizer: Union[str, Callable]
    epsilon_J: float = 1e-06
    epsilon_h: float = 1e-06
    budget: Union['BudgetManager', int] = 100
    max_iter: Union[int, None] = None
    max_eval: Union[int, None] = None
    alpha_initial: float = 10.0
    alpha_final: float = 1e-06
    alpha_schedule: str = 'geometric'
    solver: str = 'ECO'

    def _add_custom_history_columns(self):
        """Register the alpha column in the evaluation history."""
        self._history['alpha'] = []

    def _custom_initialize(self):
        """Start the relaxation parameter at its configured initial value."""
        self._alpha = self.alpha_initial

    def _pre_system_solve_update(self):
        """Advance alpha along its schedule before every system solve."""
        self._update_alpha()

    def _get_system_solve_kwargs(self):
        """Forward the current alpha to the system solve call."""
        return {'alpha': self._alpha}

    def _check_convergence(self, J_total: float, h_total: float=0.0) -> bool:
        """Converged when discrepancy and constraint violation are within
        tolerance AND the discrepancy has also dropped below current alpha."""
        within_tolerances = J_total <= self.epsilon_J and h_total <= self.epssilon_h if False else (J_total <= self.epsilon_J and h_total <= self.epsilon_h)
        return within_tolerances and J_total <= self._alpha

    def _update_alpha(self):
        """Move the relaxation parameter one step along the configured schedule.

        Schedules: 'linear' and 'exponential' interpolate on the fraction of
        the iteration budget consumed; 'geometric' applies a multiplicative
        decay each iteration, floored at ``alpha_final``.
        """
        # Fraction of the iteration budget consumed (0 when no budget is set).
        if self.max_iter is None:
            progress = 0.0
        else:
            progress = min(self._iter / self.max_iter, 1.0)
        if self.alpha_schedule == 'linear':
            span = self.alpha_final - self.alpha_initial
            self._alpha = self.alpha_initial + span * progress
        elif self.alpha_schedule == 'exponential':
            log_span = np.log(self.alpha_final / self.alpha_initial)
            self._alpha = self.alpha_initial * np.exp(progress * log_span)
        elif self.alpha_schedule == 'geometric':
            if self._iter == 0:
                self._alpha = self.alpha_initial
            else:
                # Per-iteration decay factor; fall back to a slow 0.1 exponent
                # when max_iter is unset or too small to define a full schedule.
                if self.max_iter and self.max_iter > 1:
                    exponent = 1.0 / (self.max_iter - 1)
                else:
                    exponent = 0.1
                decay = (self.alpha_final / self.alpha_initial) ** exponent
                self._alpha = max(self._alpha * decay, self.alpha_final)

    def _print_custom_budget_info(self):
        """Print the alpha schedule configuration."""
        for line in (f'Alpha schedule: {self.alpha_schedule}', f'Alpha initial: {self.alpha_initial}', f'Alpha final: {self.alpha_final}'):
            print(line)

    def _record_subsystem_custom_history(self, eval_data):
        """Record the current alpha alongside a subsystem evaluation."""
        self._history['alpha'].append(self._alpha)

    def _record_system_custom_history(self, eval_data):
        """Record the current alpha alongside a system evaluation."""
        self._history['alpha'].append(self._alpha)

src/mdotoolbox/frameworks/ico.py

ICOSubsystem dataclass

Bases: BaseSubsystem

Subsystem for Improved Collaborative Optimization.

Source code in src/mdotoolbox/frameworks/ico.py
23
24
25
26
@dataclass
class ICOSubsystem(BaseSubsystem):
    """Subsystem for Improved Collaborative Optimization.

    Adds no behavior over ``BaseSubsystem`` (body is only ``pass``); exists so
    ICO components carry a framework-specific type.
    """
    pass

ICOSystem dataclass

Bases: BaseSystem

System-level coordinator for Improved Collaborative Optimization.

Source code in src/mdotoolbox/frameworks/ico.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@dataclass
class ICOSystem(BaseSystem):
    """System-level coordinator for Improved Collaborative Optimization."""

    def _build_objective(self, _, _eval_history, _best_result, system_iter, global_start_time, gamma=1.0, relaxation_radius=0.0, **kwargs):
        """Build the system objective: f plus the ICO discrepancy penalty."""
        nz = len(self.z_bar)
        nx = len(self.x_bar)

        def objective(sys_vars_inner):
            """System objective with ICO penalty"""
            nonlocal _eval_history, _best_result
            f_bar_val, c_vals = self.problem.evaluate(sys_vars_inner, f=True, c=True)
            constraint_values = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
            h_total = self.problem.compute_constraint_violation(constraint_values)
            # Split the flat design vector into shared / local / coupling parts.
            z_part = sys_vars_inner[:nz]
            x_part = sys_vars_inner[nz:nz + nx]
            y_part = sys_vars_inner[nz + nx:]
            penalty = 0.0
            for sub in self.subsystems:
                x_i = x_part[sub.x_idxs] if len(sub.x_idxs) > 0 else np.array([])
                y_i = y_part[sub.y_idxs]
                coupled_vals = [y_part[idx] for idx_arr in sub.y_bar_coupled_idxs for idx in idx_arr]
                coupled = np.array(coupled_vals) if coupled_vals else np.array([])
                J_i_val, _ = compute_Ji(sub.problem.objective.func, z_part[sub.z_idxs], x_i, y_i, coupled, sub.z_under_i, sub.x_under_i)
                # Penalize deviation of the discrepancy from the relaxed target radius^2.
                penalty += gamma * abs(J_i_val - relaxation_radius ** 2)
            obj_val = f_bar_val + penalty
            # Track the best (f, then h) iterate seen by this closure.
            if f_bar_val < _best_result['f_bar_val'] or (f_bar_val <= _best_result['f_bar_val'] and h_total < _best_result['h_total']):
                _best_result = {'f_bar_val': f_bar_val, 'h_total': h_total, 'z_bar': z_part.copy(), 'x_bar': x_part.copy(), 'y_bar': y_part.copy()}
            _eval_history.append({'system_iter': system_iter, 'time_since_start': time.time() - global_start_time, 'eval_type': 'objective', 'f_bar_val': f_bar_val, 'h_total': h_total, 'z_bar': z_part.copy(), 'x_bar': x_part.copy(), 'y_bar': y_part.copy()})
            return obj_val
        return objective

ImprovedCollaborativeOptimization dataclass

Bases: BaseSolver

Improved Collaborative Optimization solver with dynamic penalty.

Source code in src/mdotoolbox/frameworks/ico.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@dataclass
class ImprovedCollaborativeOptimization(BaseSolver):
    """Improved Collaborative Optimization solver with dynamic penalty."""
    system: ICOSystem
    subsystem_optimizer: Union[str, Callable]
    system_optimizer: Union[str, Callable]
    epsilon_J: float = 1e-06
    epsilon_h: float = 1e-06
    budget: Union['BudgetManager', int] = 100
    max_iter: Union[int, None] = None
    max_eval: Union[int, None] = None
    gamma: float = 1.0
    relaxation_radius: float = 0.0
    delta: float = 1.1
    solver: str = 'ICO'

    def _add_custom_history_columns(self):
        """Register the gamma and radius columns in the evaluation history."""
        for column in ('gamma', 'radius'):
            self._history[column] = []

    def _custom_initialize(self):
        """Seed the working penalty parameters from their configured values."""
        self._gamma = self.gamma
        self._radius = self.relaxation_radius

    def _pre_system_solve_update(self):
        """Hook invoked before each system solve.

        NOTE(review): currently a no-op — ``delta`` is configured and printed
        but never applied anywhere in this class; confirm whether the penalty
        parameters were meant to be updated here each iteration.
        """
        pass

    def _get_system_solve_kwargs(self):
        """Forward the current penalty parameters to the system solve call."""
        return {'gamma': self._gamma, 'relaxation_radius': self._radius}

    def _print_custom_budget_info(self):
        """Print the penalty configuration."""
        for label, value in (('Gamma', self.gamma), ('Relaxation radius', self.relaxation_radius), ('Delta', self.delta)):
            print(f'{label}: {value}')

    def _record_subsystem_custom_history(self, eval_data):
        """Record gamma and radius alongside a subsystem evaluation."""
        self._history['gamma'].append(self._gamma)
        self._history['radius'].append(self._radius)

    def _record_system_custom_history(self, eval_data):
        """Record gamma and radius alongside a system evaluation."""
        self._history['gamma'].append(self._gamma)
        self._history['radius'].append(self._radius)

src/mdotoolbox/frameworks/mco.py

MCOSubsystem dataclass

Bases: BaseSubsystem

Subsystem for Modified Collaborative Optimization.

Source code in src/mdotoolbox/frameworks/mco.py
24
25
26
27
@dataclass
class MCOSubsystem(BaseSubsystem):
    """Subsystem for Modified Collaborative Optimization.

    Adds no behavior over ``BaseSubsystem`` (body is only ``pass``); exists so
    MCO components carry a framework-specific type.
    """
    pass

MCOSystem dataclass

Bases: BaseSystem

System-level coordinator for Modified Collaborative Optimization.

Source code in src/mdotoolbox/frameworks/mco.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@dataclass
class MCOSystem(BaseSystem):
    """System-level coordinator for Modified Collaborative Optimization.

    The system problem optimizes only (x_bar, y_bar); the shared variables
    z_bar are not decision variables here — they are reconstructed as the
    average of the subsystem copies ``z_under_i`` (see :meth:`_averaged_z`).
    """

    def _averaged_z(self) -> np.ndarray:
        """Return z_bar averaged from the subsystem copies ``z_under_i``.

        This logic was previously duplicated verbatim in ``_build_objective``,
        ``_build_constraints``, and ``solve``.

        NOTE(review): every component is divided by the total subsystem count,
        which assumes each shared index receives a contribution from every
        subsystem — confirm for problems with partially shared z.
        """
        z_avg = np.zeros_like(self.z_bar)
        for subsystem in self.subsystems:
            z_avg[subsystem.z_idxs] += subsystem.z_under_i
        z_avg /= len(self.subsystems)
        return z_avg

    @staticmethod
    def _coupled_targets(subsystem, y_bar: np.ndarray) -> np.ndarray:
        """Gather the coupling-variable targets consumed by ``subsystem``
        from the flat coupling vector ``y_bar``; empty array when none."""
        vals = [y_bar[idx] for idx_array in subsystem.y_bar_coupled_idxs for idx in idx_array]
        return np.array(vals) if vals else np.array([])

    def _build_objective(self, _, _eval_history, _best_result, system_iter, global_start_time, **kwargs):
        """Build system objective using averaged z_bar.

        Records every evaluation into ``_eval_history`` and tracks the best
        (f, then h) iterate in the closed-over ``_best_result``.
        """
        nx = len(self.x_bar)

        def objective(target_vars):
            """System objective with averaged z_bar"""
            nonlocal _eval_history, _best_result
            x_bar_new = target_vars[:nx]
            y_bar_new = target_vars[nx:]
            z_avg = self._averaged_z()
            sys_vars = np.concatenate([z_avg, x_bar_new, y_bar_new])
            f_bar_val, c_vals = self.problem.evaluate(sys_vars, f=True, c=True)
            c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
            h_total = self.problem.compute_constraint_violation(c_vals_dict)
            # Track the best (f, then h) iterate seen by this closure.
            if f_bar_val < _best_result['f_bar_val'] or (f_bar_val <= _best_result['f_bar_val'] and h_total < _best_result['h_total']):
                _best_result = {'f_bar_val': f_bar_val, 'h_total': h_total, 'z_bar': z_avg.copy(), 'x_bar': x_bar_new.copy(), 'y_bar': y_bar_new.copy()}
            _eval_history.append({'system_iter': system_iter, 'time_since_start': time.time() - global_start_time, 'eval_type': 'objective', 'f_bar_val': f_bar_val, 'h_total': h_total, 'z_bar': z_avg.copy(), 'x_bar': x_bar_new.copy(), 'y_bar': y_bar_new.copy()})
            return f_bar_val
        return objective

    def _build_constraints(self, **kwargs):
        """Build constraints using averaged z_bar.

        Returns a dict with ``'ineq'`` (feasible when >= 0) and ``'eq'``
        (feasible when == 0) callables over the reduced (x_bar, y_bar) vector.
        """
        nx = len(self.x_bar)

        def constraints_func(target_vars):
            """System constraints with averaged z_bar"""
            c_vals_ineq = []
            c_vals_eq = []
            x_bar_new = target_vars[:nx]
            y_bar_new = target_vars[nx:]
            z_avg = self._averaged_z()
            sys_vars = np.concatenate([z_avg, x_bar_new, y_bar_new])
            # System-level constraints, normalized to the >=0 / ==0 convention.
            for const in self.problem.constraints:
                if const.ctype == 'ge':
                    c_vals_ineq.append(const.func.func(*sys_vars) - const.value)
                elif const.ctype == 'le':
                    c_vals_ineq.append(-const.func.func(*sys_vars) + const.value)
                elif const.ctype == 'eq':
                    c_vals_eq.append(const.func.func(*sys_vars) - const.value)
            # Consistency: each subsystem discrepancy J_i must vanish (equality).
            for subsystem in self.subsystems:
                x_bar_i = x_bar_new[subsystem.x_idxs] if len(subsystem.x_idxs) > 0 else np.array([])
                y_bar_i = y_bar_new[subsystem.y_idxs]
                y_bar_coupled = self._coupled_targets(subsystem, y_bar_new)
                J_i_val, _ = compute_Ji(subsystem.problem.objective.func, z_avg[subsystem.z_idxs], x_bar_i, y_bar_i, y_bar_coupled, subsystem.z_under_i, subsystem.x_under_i)
                c_vals_eq.append(J_i_val)
            return (np.array(c_vals_ineq), np.array(c_vals_eq))
        return {'ineq': lambda x: constraints_func(x)[0], 'eq': lambda x: constraints_func(x)[1]}

    def solve(self, optimizer: Union[str, Callable], system_iter: int, global_start_time: float, maxiter: int=None, **kwargs):
        """Solve MCO system problem - optimizes only (x_bar, y_bar).

        Returns the tuple (z_bar, x_bar, y_bar, f_bar_val, h_total,
        number of recorded evaluations).
        """
        _eval_history = []
        nx = len(self.x_bar)
        _best_result = {'f_bar_val': np.inf, 'h_total': np.inf, 'z_bar': self.z_bar.copy(), 'x_bar': self.x_bar.copy(), 'y_bar': self.y_bar.copy()}
        objective = self._build_objective(None, _eval_history, _best_result, system_iter, global_start_time, **kwargs)
        constraints = self._build_constraints(**kwargs)
        x0 = np.concatenate([self.x_bar, self.y_bar])
        # Bounds for the reduced problem: drop the leading z_bar entries.
        bounds_mco = self.problem.bounds[len(self.z_bar):]
        if isinstance(optimizer, str):
            # cobyqa takes a function-evaluation budget rather than an iteration budget.
            opt_kwargs = {'maxiter': maxiter} if optimizer != 'cobyqa' else {'maxfev': maxiter}
            opt = get_optimizer(optimizer, **opt_kwargs)
        else:
            opt = optimizer
        result_x, f_bar_val = opt(objective, x0, bounds_mco, constraints)
        from ..core import clip_to_bounds
        result_x = clip_to_bounds(result_x, bounds_mco)
        self.x_bar = result_x[:nx]
        self.y_bar = result_x[nx:]
        # z_bar is reconstructed from the subsystem copies, not optimized here.
        self.z_bar = self._averaged_z()
        final_sys_vars = np.concatenate([self.z_bar, self.x_bar, self.y_bar])
        _, c_vals = self.problem.evaluate(final_sys_vars, f=False, c=True)
        c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
        h_total = self.problem.compute_constraint_violation(c_vals_dict)
        _eval_history.append({'system_iter': system_iter, 'time_since_start': time.time() - global_start_time, 'eval_type': 'final', 'f_bar_val': f_bar_val, 'h_total': h_total, 'z_bar': self.z_bar.copy(), 'x_bar': self.x_bar.copy(), 'y_bar': self.y_bar.copy()})
        self.iteration_history.extend(_eval_history)
        return (self.z_bar, self.x_bar, self.y_bar, f_bar_val, h_total, len(_eval_history))

solve(optimizer, system_iter, global_start_time, maxiter=None, **kwargs)

Solve MCO system problem - optimizes only (x_bar, y_bar).

Source code in src/mdotoolbox/frameworks/mco.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def solve(self, optimizer: Union[str, Callable], system_iter: int, global_start_time: float, maxiter: int=None, **kwargs):
    """Solve MCO system problem - optimizes only (x_bar, y_bar).

    z_bar is not a decision variable: after the optimizer returns, it is set
    to the average of the subsystem copies ``z_under_i``.

    Returns:
        tuple: (z_bar, x_bar, y_bar, f_bar_val, h_total, n_evals recorded).
    """
    _eval_history = []
    nx = len(self.x_bar)
    # Seed the best-iterate tracker with infinite objective/violation.
    _best_result = {'f_bar_val': np.inf, 'h_total': np.inf, 'z_bar': self.z_bar.copy(), 'x_bar': self.x_bar.copy(), 'y_bar': self.y_bar.copy()}
    objective = self._build_objective(None, _eval_history, _best_result, system_iter, global_start_time, **kwargs)
    constraints = self._build_constraints(**kwargs)
    x0 = np.concatenate([self.x_bar, self.y_bar])
    # Bounds for the reduced problem: drop the leading z_bar entries.
    bounds_mco = self.problem.bounds[len(self.z_bar):]
    if isinstance(optimizer, str):
        # cobyqa takes a function-evaluation budget rather than an iteration budget.
        opt_kwargs = {'maxiter': maxiter} if optimizer != 'cobyqa' else {'maxfev': maxiter}
        opt = get_optimizer(optimizer, **opt_kwargs)
    else:
        opt = optimizer
    result_x, f_bar_val = opt(objective, x0, bounds_mco, constraints)
    from ..core import clip_to_bounds
    result_x = clip_to_bounds(result_x, bounds_mco)
    self.x_bar = result_x[:nx]
    self.y_bar = result_x[nx:]
    # Reconstruct z_bar as the average of the subsystem copies.
    z_avg = np.zeros_like(self.z_bar)
    for subsystem in self.subsystems:
        z_avg[subsystem.z_idxs] += subsystem.z_under_i
    z_avg /= len(self.subsystems)
    self.z_bar = z_avg
    # Re-evaluate constraints at the final point to report its violation.
    final_sys_vars = np.concatenate([self.z_bar, self.x_bar, self.y_bar])
    _, c_vals = self.problem.evaluate(final_sys_vars, f=False, c=True)
    c_vals_dict = {const.func.name: c_vals[i] for i, const in enumerate(self.problem.constraints)}
    h_total = self.problem.compute_constraint_violation(c_vals_dict)
    _eval_history.append({'system_iter': system_iter, 'time_since_start': time.time() - global_start_time, 'eval_type': 'final', 'f_bar_val': f_bar_val, 'h_total': h_total, 'z_bar': self.z_bar.copy(), 'x_bar': self.x_bar.copy(), 'y_bar': self.y_bar.copy()})
    self.iteration_history.extend(_eval_history)
    return (self.z_bar, self.x_bar, self.y_bar, f_bar_val, h_total, len(_eval_history))

ModifiedCollaborativeOptimization dataclass

Bases: BaseSolver

Modified Collaborative Optimization solver with averaged z.

Source code in src/mdotoolbox/frameworks/mco.py
123
124
125
126
127
128
129
130
131
132
133
134
@dataclass
class ModifiedCollaborativeOptimization(BaseSolver):
    """Modified Collaborative Optimization solver with averaged z."""
    # MCO system coordinator that averages z_bar from subsystem copies.
    system: MCOSystem
    # Optimizer name or callable used for subsystem problems.
    subsystem_optimizer: Union[str, Callable]
    # Optimizer name or callable used for the system problem.
    system_optimizer: Union[str, Callable]
    # Convergence tolerance on the total discrepancy J.
    epsilon_J: float = 1e-06
    # Convergence tolerance on the total constraint violation h.
    epsilon_h: float = 1e-06
    # Evaluation budget (manager instance or a plain integer cap).
    budget: Union['BudgetManager', int] = 100
    # Optional cap on outer iterations (None = unlimited).
    max_iter: Union[int, None] = None
    # Optional cap on total evaluations (None = unlimited).
    max_eval: Union[int, None] = None
    # Framework identifier used in reporting.
    solver: str = 'MCO'