Source code for pytwinnet.accelerate.association

from __future__ import annotations
import numpy as np
from typing import Dict, List, Tuple
from ..core.digital_twin import DigitalTwin
from ..core.node import WirelessNode
from ..physics.propagation import PropagationModel
from ..physics.environment import Environment
from .vectorized import fspl_matrix_db, rsrp_matrix_dbm

def _stack_positions(nodes: List[WirelessNode]) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    xyz = np.array([n.position for n in nodes], dtype=float)   # (N,3)
    pw = np.array([n.transceiver_properties.transmit_power_dbm for n in nodes], dtype=float)
    g  = np.array([n.transceiver_properties.antenna_gain_dbi for n in nodes], dtype=float)
    return xyz, pw, g

[docs] def max_rsrp_association_vectorized( twin: DigitalTwin, tx_ids: List[str], ue_ids: List[str] ) -> Dict[str, str]: """ Vectorized max-RSRP association. Much faster than per-pair loops for large sets. """ pm: PropagationModel = twin.propagation_model env: Environment = twin.environment assert pm is not None and env is not None txs = [twin.network.get_node_by_id(t) for t in tx_ids] ues = [twin.network.get_node_by_id(u) for u in ue_ids] txs = [t for t in txs if t is not None] ues = [u for u in ues if u is not None] if not txs or not ues: return {} tx_xyz, tx_dbm, gt_dbi = _stack_positions(txs) ue_xyz, _, gr_dbi = _stack_positions(ues) f_hz = txs[0].transceiver_properties.carrier_frequency_hz # Build PL matrix via underlying scalar model by matching FSPL formula for speed. from ..physics.propagation import FreeSpacePathLoss if isinstance(pm, FreeSpacePathLoss): pl_db = fspl_matrix_db(tx_xyz, ue_xyz, f_hz) # (T,R) else: # Slow fallback (still vectorizable with numba in future) import numpy as np pl_db = np.zeros((len(txs), len(ues)), dtype=float) for i, tx in enumerate(txs): for j, rx in enumerate(ues): pl_db[i, j] = pm.calculate_path_loss(tx, rx, env) rsrp_dbm = rsrp_matrix_dbm(tx_dbm, gt_dbi, gr_dbi, pl_db) # (T,R) best_tx_idx = rsrp_dbm.argmax(axis=0) # (R,) mapping = {ues[j].node_id: txs[i].node_id for j, i in enumerate(best_tx_idx)} return mapping