Source code for pytwinnet.accelerate.vectorized

from __future__ import annotations
import numpy as np
from typing import Tuple, Optional


try:
    from .numba_kernels import HAVE_NUMBA, fspl_matrix_db_numba, sinr_db_from_rsrp_matrix_numba
except Exception:  # pragma: no cover
    HAVE_NUMBA = False
    fspl_matrix_db_numba = None
    sinr_db_from_rsrp_matrix_numba = None
# ---------- Geometry / FSPL ----------

def _pairwise_dist(tx_xyz: np.ndarray, rx_xyz: np.ndarray) -> np.ndarray:
    """
    tx_xyz: (T,3), rx_xyz: (R,3) -> (T,R) 3D distances (meters)
    """
    d = tx_xyz[:, None, :] - rx_xyz[None, :, :]
    return np.linalg.norm(d, axis=2)

[docs] def fspl_matrix_db( tx_xyz: np.ndarray, rx_xyz: np.ndarray, f_hz: np.ndarray | float ) -> np.ndarray: """ Vectorized Free-Space Path Loss in dB: (T,R) If f_hz is scalar and Numba is available -> JIT kernel. """ if np.isscalar(f_hz): f_mhz = float(f_hz) / 1e6 if HAVE_NUMBA and fspl_matrix_db_numba is not None: # Numba prefers float64 return fspl_matrix_db_numba( np.asarray(tx_xyz, dtype=np.float64), np.asarray(rx_xyz, dtype=np.float64), float(f_mhz), ) # NumPy fallback d = tx_xyz[:, None, :] - rx_xyz[None, :, :] d_m = np.linalg.norm(d, axis=2) d_km = np.maximum(d_m, 1e-3) / 1e3 return 20.0 * np.log10(d_km) + 20.0 * np.log10(f_mhz) + 32.44 # Vector f_hz: use NumPy path d = tx_xyz[:, None, :] - rx_xyz[None, :, :] d_m = np.linalg.norm(d, axis=2) d_km = np.maximum(d_m, 1e-3) / 1e3 f_mhz = np.asarray(f_hz, dtype=float) / 1e6 # (T,) return 20.0 * np.log10(d_km) + 20.0 * np.log10(f_mhz[:, None]) + 32.44
# ---------- Link budget (vectorized) ----------
[docs] def rsrp_matrix_dbm( tx_dbm: np.ndarray, gt_dbi: np.ndarray, gr_dbi: np.ndarray, pl_db: np.ndarray ) -> np.ndarray: """ RSRP (received power) in dBm for all TX-RX pairs. tx_dbm, gt_dbi: (T,), gr_dbi: (R,), pl_db: (T,R) -> (T,R) """ return tx_dbm[:, None] + gt_dbi[:, None] + gr_dbi[None, :] - pl_db
[docs] def noise_dbm_vector( bandwidth_hz: np.ndarray | float, temperature_k: float = 290.0, noise_figure_db: np.ndarray | float = 0.0, ) -> np.ndarray: """ Vectorized thermal noise power (dBm). bandwidth_hz: scalar or (R,), noise_figure_db: scalar or (R,) -> (R,) """ # -174 dBm/Hz at 290K baseline temp_adj_db = 10.0 * np.log10(np.asarray(temperature_k, dtype=float) / 290.0) bw = np.asarray(bandwidth_hz, dtype=float) nf = np.asarray(noise_figure_db, dtype=float) return -174.0 + temp_adj_db + 10.0 * np.log10(np.maximum(bw, 1.0)) + nf
[docs] def sinr_db_from_rsrp_matrix( rsrp_dbm: np.ndarray, # (T,R) serving_tx_idx: np.ndarray, # (R,) noise_dbm: np.ndarray, # (R,) ) -> np.ndarray: if HAVE_NUMBA and sinr_db_from_rsrp_matrix_numba is not None: return sinr_db_from_rsrp_matrix_numba( np.asarray(rsrp_dbm, dtype=np.float64), np.asarray(serving_tx_idx, dtype=np.int64), np.asarray(noise_dbm, dtype=np.float64), ) # NumPy fallback p_mw = 10 ** (rsrp_dbm / 10.0) total_mw = p_mw.sum(axis=0) sig_mw = p_mw[serving_tx_idx, np.arange(p_mw.shape[1])] noi_mw = 10 ** (noise_dbm / 10.0) sinr_lin = sig_mw / np.maximum(total_mw - sig_mw + noi_mw, 1e-30) return 10.0 * np.log10(sinr_lin)
[docs] def shannon_throughput_bps_vector( bandwidth_hz: np.ndarray | float, sinr_db: np.ndarray, # (R,) efficiency: float = 1.0, ) -> np.ndarray: """ Vectorized Shannon throughput per RX (bps). """ bw = np.asarray(bandwidth_hz, dtype=float) sinr_lin = 10 ** (sinr_db / 10.0) return efficiency * bw * np.log2(1.0 + sinr_lin)