Source code for pytwinnet.physics.batch
from __future__ import annotations
from typing import List, Iterable
import numpy as np
from ..core.node import WirelessNode
from ..core.digital_twin import DigitalTwin
from .propagation import PropagationModel, FreeSpacePathLoss
from ..accelerate.vectorized import fspl_matrix_db
[docs]
def path_loss_matrix(
twin: DigitalTwin,
tx_nodes: List[WirelessNode],
rx_positions: np.ndarray, # (R,3) float
) -> np.ndarray:
"""
Return PL (dB) matrix of shape (T,R) for arbitrary PropagationModel.
Fast paths:
- If model is FreeSpacePathLoss -> fully vectorized (no Python loops).
- If model implements an optional `calculate_path_loss_batch(tx_nodes, rx_positions, env)`
it will be used directly (hook for future custom vectorized models).
Fallback:
- Reuse a single RX WirelessNode and update its .position per point to avoid
per-point allocations. This is still scalar but much faster than constructing
nodes in a tight loop.
"""
pm: PropagationModel = twin.propagation_model
env = twin.environment
assert pm is not None and env is not None
T = len(tx_nodes)
R = int(rx_positions.shape[0])
if T == 0 or R == 0:
return np.zeros((T, R), dtype=float)
# Fast path: pure FSPL
if isinstance(pm, FreeSpacePathLoss):
tx_xyz = np.array([tx.position for tx in tx_nodes], dtype=float) # (T,3)
f_hz = tx_nodes[0].transceiver_properties.carrier_frequency_hz
return fspl_matrix_db(tx_xyz, rx_positions, f_hz)
# Optional vectorized API hook
if hasattr(pm, "calculate_path_loss_batch"):
return pm.calculate_path_loss_batch(tx_nodes, rx_positions, env)
# Scalar fallback (minimize allocations)
pl = np.empty((T, R), dtype=float)
rx = WirelessNode("__rx__", position=(0.0, 0.0, 0.0))
for j in range(R):
# update one shared RX node position
rx.position = (float(rx_positions[j, 0]), float(rx_positions[j, 1]), float(rx_positions[j, 2]))
for i, tx in enumerate(tx_nodes):
pl[i, j] = pm.calculate_path_loss(tx, rx, env)
return pl