Source code for pytwinnet.cli.main

from __future__ import annotations
import argparse, json
from typing import Any, Dict
try:
    import yaml  # type: ignore
except Exception as e:  # pragma: no cover
    raise SystemExit("PyYAML is required. Install with: pip install pyyaml") from e

from .. import DigitalTwin, Network, WirelessNode, TransceiverProperties
from ..physics import Environment, FreeSpacePathLoss
from ..physics.fading import ShadowedModel, FadedModel
from ..physics.ris_beam import SmartRISPanel, RISBeamModel
from ..visualization.heatmap_fast import sinr_heatmap_2d_fast
from ..optimization.objective import SumThroughputObjective
from ..optimization.placement import PlacementRandomSearchOptimizer, PlacementGridOptimizer

# Propagation-model classes keyed by their lowercase configuration name.
# The keys mirror the names build_twin() accepts for ``propagation.model``;
# nothing in the visible part of this module reads the table itself.
MODEL_MAP = {
    "fspl": FreeSpacePathLoss,
    "shadowed": ShadowedModel,
    "faded": FadedModel,
    "ris_beam": RISBeamModel,
}

# Objective classes keyed by the ``optimize.objective.type`` configuration
# value; run_optimizer() looks the requested type up here.
OBJ_MAP = {
    "sum_throughput": SumThroughputObjective,
}

def build_twin(cfg: Dict[str, Any]) -> DigitalTwin:
    """Assemble a :class:`DigitalTwin` from a parsed configuration mapping.

    Sections read from ``cfg`` (every one is optional):

    * ``environment.dimensions_m`` -- passed to :class:`Environment` as a
      tuple; defaults to ``[100.0, 100.0, 30.0]``.
    * ``propagation`` -- ``model`` selects ``fspl`` (default), ``shadowed``,
      ``faded`` or ``ris_beam`` (compared case-insensitively). The non-FSPL
      models wrap a fresh :class:`FreeSpacePathLoss` and read their own keys
      (``sigma_db``; ``kind`` / ``K_dB``; ``ris`` / ``extra_loss_db``).
    * ``nodes`` -- list of mappings with ``id`` (required) and optional
      ``position``, ``tx_power_dbm``, ``ant_gain_dbi``, ``f_hz``, ``role``.

    Returns:
        The twin with environment, propagation model and network set.

    Raises:
        ValueError: ``propagation.model`` is not one of the known names.
        KeyError: a node entry has no ``id``.
    """
    result = DigitalTwin()

    # Environment ---------------------------------------------------------
    env_section = cfg.get("environment", {})
    extent = env_section.get("dimensions_m", [100.0, 100.0, 30.0])
    result.set_environment(Environment(dimensions_m=tuple(extent)))

    # Propagation model ---------------------------------------------------
    prop = cfg.get("propagation", {"model": "fspl"})
    model = prop.get("model", "fspl").lower()
    if model == "fspl":
        channel = FreeSpacePathLoss()
    elif model == "shadowed":
        channel = ShadowedModel(
            FreeSpacePathLoss(),
            sigma_db=float(prop.get("sigma_db", 6.0)),
        )
    elif model == "faded":
        channel = FadedModel(
            FreeSpacePathLoss(),
            kind=str(prop.get("kind", "rayleigh")),
            K_dB=float(prop.get("K_dB", 6.0)),
        )
    elif model == "ris_beam":
        # The base model is created before the panel, as in the original.
        base = FreeSpacePathLoss()
        panel_spec = prop.get("ris", {})
        panel = SmartRISPanel(
            position=tuple(panel_spec.get("position", [50.0, 50.0, 10.0])),
            element_count=int(panel_spec.get("element_count", 64)),
        )
        channel = RISBeamModel(
            base,
            panel,
            extra_loss_db=float(prop.get("extra_loss_db", 3.0)),
        )
    else:
        raise ValueError(f"Unknown propagation model: {model}")
    result.set_propagation_model(channel)

    # Network -------------------------------------------------------------
    network = Network()
    for spec in cfg.get("nodes", []):
        role = spec.get("role", "")
        # Field order matches the original so a bad entry fails the same way.
        node_id = str(spec["id"])
        position = tuple(spec.get("position", [0.0, 0.0, 1.5]))
        radio = TransceiverProperties(
            transmit_power_dbm=float(spec.get("tx_power_dbm", 20.0)),
            antenna_gain_dbi=float(spec.get("ant_gain_dbi", 0.0)),
            carrier_frequency_hz=float(spec.get("f_hz", 3.5e9)),
        )
        # Metadata carries the role only when one was given.
        tags = {"role": role} if role else {}
        network.add_node(
            WirelessNode(
                node_id=node_id,
                position=position,
                transceiver_properties=radio,
                metadata=tags,
            )
        )
    result.network = network

    return result
def apply_scenario(twin: DigitalTwin, cfg: Dict[str, Any]) -> None:
    """Apply the events listed under ``scenario.events`` to ``twin``, in order.

    Minimal scenario support. Recognised event ``type`` values:

    * ``move_node`` -- needs ``id`` and ``position``; calls ``move_to`` on the
      node with the position converted to a tuple.
    * ``set_power`` -- needs ``id`` and ``tx_power_dbm``; assigns the value
      (as ``float``) to the node's ``transceiver_properties.transmit_power_dbm``.
    * ``ris_beam`` -- optional ``target``; forwarded to the propagation
      model's ``set_beam`` when the model has such a method.

    Events of any other type, and events naming a node the network does not
    return, are skipped silently. A ``scenario`` or ``events`` key that is
    present but empty (YAML ``null``) is treated the same as a missing one.

    Raises:
        KeyError: a ``move_node`` / ``set_power`` event lacks a required key.
    """
    # `scenario:` or `events:` with no value loads as None from YAML, so
    # `.get(key, default)` alone is not enough: fall back on None as well.
    scenario = cfg.get("scenario") or {}
    events = scenario.get("events") or []

    for ev in events:
        et = ev.get("type")
        if et == "move_node":
            # Read both fields before the lookup so a malformed event fails
            # even when the node id is unknown.
            nid = ev["id"]
            pos = tuple(ev["position"])
            node = twin.network.get_node_by_id(nid)
            if node:
                node.move_to(pos)
        elif et == "set_power":
            nid = ev["id"]
            power_dbm = float(ev["tx_power_dbm"])
            node = twin.network.get_node_by_id(nid)
            if node:
                node.transceiver_properties.transmit_power_dbm = power_dbm
        elif et == "ris_beam":
            target = ev.get("target", None)
            pm = twin.propagation_model
            # Only models exposing set_beam can be steered; others ignore it.
            if hasattr(pm, "set_beam"):
                pm.set_beam(target)  # type: ignore
def run_optimizer(twin: DigitalTwin, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Run the placement optimiser described by ``cfg["optimize"]``.

    When the ``optimize`` section is missing or empty, nothing runs and an
    empty dict is returned. Otherwise:

    * ``objective`` -- mapping whose ``type`` (default ``sum_throughput``)
      is looked up in ``OBJ_MAP``; every other key is passed to the objective
      class as a keyword argument.
    * ``method`` -- ``random_search`` (default; reads ``bounds``, ``samples``,
      ``fixed_z``) or ``grid`` (reads ``grid_x``, ``grid_y``, ``fixed_z``).
    * ``node_ids`` -- forwarded to the optimiser's ``optimize`` call.

    Returns:
        Whatever the selected optimiser's ``optimize`` returns, or ``{}``.

    Raises:
        ValueError: unknown objective type or unknown method.
    """
    settings = cfg.get("optimize")
    if not settings:
        return {}

    # The objective is resolved before the method is looked at, so an unknown
    # objective is reported first.
    objective_spec = settings.get("objective", {"type": "sum_throughput", "tx_id": None})
    obj_type = objective_spec.get("type", "sum_throughput")
    if obj_type not in OBJ_MAP:
        raise ValueError(f"Unknown objective: {obj_type}")
    objective_kwargs = {
        key: value for key, value in objective_spec.items() if key != "type"
    }
    objective = OBJ_MAP[obj_type](**objective_kwargs)

    method = settings.get("method", "random_search")
    node_ids = settings.get("node_ids", [])

    if method == "random_search":
        bounds = settings.get("bounds", [[0, 100], [0, 100]])
        samples = int(settings.get("samples", 100))
        search = PlacementRandomSearchOptimizer(
            bounds=((bounds[0][0], bounds[0][1]), (bounds[1][0], bounds[1][1])),
            samples=samples,
            fixed_z=float(settings.get("fixed_z", 10.0)),
        )
    elif method == "grid":
        xs = settings.get("grid_x", [10, 20, 30])
        ys = settings.get("grid_y", [10, 20, 30])
        search = PlacementGridOptimizer(
            grid_x=xs,
            grid_y=ys,
            fixed_z=float(settings.get("fixed_z", 10.0)),
        )
    else:
        raise ValueError(f"Unknown optimize.method: {method}")

    return search.optimize(twin, objective, node_ids=node_ids)
def maybe_heatmap(twin: DigitalTwin, cfg: Dict[str, Any]) -> str | None:
    """Render and save a SINR heatmap when the configuration asks for one.

    Reads ``cfg["visualization"]["heatmap"]``. Nothing is drawn unless its
    ``enabled`` flag is truthy. A ``visualization`` or ``heatmap`` key that
    is missing or present but empty (YAML ``null``) counts as disabled.

    Heatmap keys: ``tx_id``, ``interferer_ids`` (default ``[]``), ``xlim`` and
    ``ylim`` (default ``[0, 100]``), ``resolution`` (default ``200``) and
    ``output`` (default ``"heatmap.png"``).

    Returns:
        The path the image was saved to, or ``None`` when disabled.
    """
    # A key with no value loads as None from YAML; treat that like "absent"
    # instead of failing with AttributeError on the chained .get().
    vis = cfg.get("visualization") or {}
    h = vis.get("heatmap") or {}
    if not h.get("enabled", False):
        return None

    tx = h.get("tx_id")
    interfs = h.get("interferer_ids", [])
    xlim = tuple(h.get("xlim", [0, 100]))
    ylim = tuple(h.get("ylim", [0, 100]))
    res = int(h.get("resolution", 200))

    # The two return values are not needed here; only the drawn figure is.
    _, _ = sinr_heatmap_2d_fast(
        twin,
        tx_id=tx,
        interferer_ids=interfs,
        xlim=xlim,
        ylim=ylim,
        resolution=res,
        show=False,
    )

    out = h.get("output", "heatmap.png")
    # pyplot is imported here rather than at module level, as in the original.
    import matplotlib.pyplot as plt

    # savefig writes pyplot's current figure -- presumably the one
    # sinr_heatmap_2d_fast just drew (NOTE(review): confirm).
    plt.savefig(out, bbox_inches="tight", dpi=150)
    return out
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point for ``pytwinnet.cli``.

    The only sub-command is ``run <config>``. It loads the YAML file, builds
    the twin, applies the scenario, runs the optimiser and the optional
    heatmap, then writes a JSON report with keys ``optimize`` and ``heatmap``
    to the path given by the config's ``output`` key (default
    ``report.json``).

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Raises:
        SystemExit: on argparse usage errors, or when the config file's
            top level is not a mapping.
    """
    parser = argparse.ArgumentParser(prog="pytwinnet.cli")
    sub = parser.add_subparsers(dest="cmd", required=True)
    p_run = sub.add_parser("run", help="Run config-driven experiment")
    p_run.add_argument("config", help="Path to YAML config")
    args = parser.parse_args(argv)

    if args.cmd == "run":
        with open(args.config, "r", encoding="utf-8") as f:
            cfg = yaml.safe_load(f)

        # An empty or comment-only file loads as None; run it with all
        # defaults instead of crashing on `None.get` further down.
        if cfg is None:
            cfg = {}
        # A list or scalar at the top level cannot be a valid config; report
        # it as a usage error rather than an AttributeError traceback.
        if not isinstance(cfg, dict):
            parser.error(
                f"config {args.config!r} must contain a mapping at the top level, "
                f"got {type(cfg).__name__}"
            )

        twin = build_twin(cfg)
        apply_scenario(twin, cfg)
        opt_res = run_optimizer(twin, cfg)
        heat = maybe_heatmap(twin, cfg)

        report = {"optimize": opt_res, "heatmap": heat}
        out_json = cfg.get("output", "report.json")
        with open(out_json, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        print(f"Saved report -> {out_json}")
        if heat:
            print(f"Saved heatmap -> {heat}")