Source code for pytwinnet.optimization.placement_parallel

from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Tuple, List, Any, Iterable
from concurrent.futures import ProcessPoolExecutor, as_completed
import itertools
import math

from ..core.digital_twin import DigitalTwin
from .objective import Objective

Pos = Tuple[float, float, float]

def _eval_combo(snapshot: DigitalTwin, node_ids: List[str], coords: List[Pos], objective: Objective) -> Tuple[float, Dict[str, Pos]]:
    sim = snapshot  # already deep-copied by caller
    for nid, pos in zip(node_ids, coords):
        n = sim.network.get_node_by_id(nid)
        if n: n.move_to(pos)
    score = objective.evaluate(sim)
    return score, {nid: pos for nid, pos in zip(node_ids, coords)}

[docs] @dataclass class ParallelPlacementGrid: grid_x: Iterable[float] grid_y: Iterable[float] fixed_z: float = 10.0 max_workers: int = 0 # 0 -> use os.cpu_count()
[docs] def optimize(self, twin: DigitalTwin, objective: Objective, node_ids: List[str]) -> Dict[str, Any]: points = [(x, y, self.fixed_z) for x, y in itertools.product(self.grid_x, self.grid_y)] combos = itertools.product(points, repeat=len(node_ids)) best_score = -math.inf best_pos: Dict[str, Pos] = {} futures = [] with ProcessPoolExecutor(max_workers=self.max_workers or None) as ex: for combo in combos: # Snapshot per job (so workers don't share state) snap = twin.snapshot() futures.append(ex.submit(_eval_combo, snap, node_ids, list(combo), objective)) for fut in as_completed(futures): score, pos = fut.result() if score > best_score: best_score, best_pos = score, pos return {"best_score": best_score, "best_positions": best_pos, "evaluations": len(futures)}