Source code for pytwinnet.rl.power_env

from __future__ import annotations
import numpy as np
from typing import List, Dict, Tuple, Optional
from pytwinnet.accelerate.vectorized import fspl_matrix_db
from pytwinnet.core.node import WirelessNode
from pytwinnet.physics import FreeSpacePathLoss
from pytwinnet.accelerate.vectorized import (
            fspl_matrix_db, rsrp_matrix_dbm, noise_dbm_vector,
            sinr_db_from_rsrp_matrix, shannon_throughput_bps_vector
        )

class PowerControlEnv:
    """Lightweight, gymnasium-like environment for downlink power control.

    - Observations: current TX power of every transmitter followed by the
      mean free-space path loss from that transmitter to the UEs.
    - Actions: one power delta per transmitter, nominally in ``{-1, 0, +1}``
      and scaled by ``power_step_db`` (i.e. ``-Δ``, ``0``, ``+Δ`` dB).
    - Reward: sum throughput over UEs (in Gbit/s) minus
      ``penalty_lambda`` times the total transmit power (in watts).

    There is no hard dependency on ``gym``/``gymnasium``; ``step`` returns the
    five-tuple ``(obs, reward, terminated, truncated, info)`` while ``reset``
    returns the observation only.

    Args:
        twin: Object exposing ``twin.network.get_node_by_id(node_id)``; the
            returned nodes must provide ``position`` and
            ``transceiver_properties.transmit_power_dbm``.
        tx_ids: Identifiers of the controlled transmitters.
        ue_ids: Identifiers of the served UEs.
        bandwidth_hz: Bandwidth passed to the noise and throughput helpers.
        efficiency: Efficiency factor passed to the throughput helper.
        power_step_db: Size ``Δ`` of one power step, in dB.
        power_min_dbm: Lower clipping bound for the transmit power.
        power_max_dbm: Upper clipping bound for the transmit power.
        penalty_lambda: Weight of the transmit-power penalty in the reward.
        carrier_freq_hz: Carrier frequency used for the path-loss
            computation (previously hard-coded to 3.5 GHz).
        noise_figure_db: Receiver noise figure used for the noise floor
            (previously hard-coded to 7 dB).
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        twin,
        tx_ids: List[str],
        ue_ids: List[str],
        bandwidth_hz: float = 20e6,
        efficiency: float = 0.75,
        power_step_db: float = 1.0,
        power_min_dbm: float = 10.0,
        power_max_dbm: float = 40.0,
        penalty_lambda: float = 0.0,
        carrier_freq_hz: float = 3.5e9,
        noise_figure_db: float = 7.0,
    ):
        self.twin = twin
        self.tx_ids = tx_ids
        self.ue_ids = ue_ids
        self.B = bandwidth_hz
        self.eta = efficiency
        self.step_db = power_step_db
        self.pmin = power_min_dbm
        self.pmax = power_max_dbm
        self.lmb = penalty_lambda
        self.carrier_freq_hz = carrier_freq_hz
        self.noise_figure_db = noise_figure_db
        # Always available, even if reset() is never called with a seed.
        self.rng = np.random.default_rng()
        self._obs = None

    # --- core loop ---

    def reset(self, seed: Optional[int] = None):
        """Rebuild and return the initial observation.

        Args:
            seed: When given, ``self.rng`` is replaced by a generator seeded
                with this value; otherwise the current generator is kept.

        Returns:
            The observation vector produced by ``_make_obs``.
        """
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        # Initial observation: current TX powers + path-loss snapshot.
        self._obs = self._make_obs()
        return self._obs

    def step(self, action_vector: np.ndarray):
        """Apply one power delta per transmitter and evaluate the reward.

        Args:
            action_vector: ``len(tx_ids)`` values, nominally in
                ``{-1, 0, +1}``, meaning ``-Δ``, ``0``, ``+Δ`` dB. Each value
                is multiplied by ``power_step_db`` and the resulting power is
                clipped to ``[power_min_dbm, power_max_dbm]``.

        Returns:
            ``(obs, reward, terminated, truncated, info)``; ``terminated`` and
            ``truncated`` are always ``False``.

        Raises:
            ValueError: If ``action_vector`` does not hold exactly one entry
                per transmitter.
        """
        deltas = np.asarray(action_vector, dtype=float).reshape(-1)
        if deltas.size != len(self.tx_ids):
            # Checked before any node is touched so a bad action never
            # leaves the network half-updated.
            raise ValueError(
                f"action_vector must have {len(self.tx_ids)} entries "
                f"(one per TX), got {deltas.size}"
            )

        for delta, node in zip(deltas, self._nodes(self.tx_ids)):
            props = node.transceiver_properties
            updated = np.clip(
                props.transmit_power_dbm + self.step_db * float(delta),
                self.pmin,
                self.pmax,
            )
            props.transmit_power_dbm = float(updated)

        # Reward is computed on the updated powers, then the observation
        # is refreshed.
        reward, info = self._throughput_reward()
        self._obs = self._make_obs()
        terminated, truncated = False, False
        return self._obs, reward, terminated, truncated, info

    # --- helpers ---

    def _nodes(self, node_ids):
        """Return the network nodes for ``node_ids``, in the same order."""
        lookup = self.twin.network.get_node_by_id
        return [lookup(node_id) for node_id in node_ids]

    @staticmethod
    def _positions(nodes) -> np.ndarray:
        """Stack the ``position`` of every node into a float array."""
        return np.array([node.position for node in nodes], float)

    @staticmethod
    def _tx_powers_dbm(nodes) -> np.ndarray:
        """Collect the current transmit power (dBm) of every node."""
        return np.array(
            [node.transceiver_properties.transmit_power_dbm for node in nodes],
            dtype=float,
        )

    def _make_obs(self):
        """Build the observation ``[tx_powers_dbm, avg_pathloss_per_tx]``.

        The path-loss part is a simple proxy: the free-space path loss from
        each TX averaged over all UEs.
        """
        tx_nodes = self._nodes(self.tx_ids)
        ue_nodes = self._nodes(self.ue_ids)
        tx_p = self._tx_powers_dbm(tx_nodes)
        pl = fspl_matrix_db(
            self._positions(tx_nodes),
            self._positions(ue_nodes),
            self.carrier_freq_hz,
        )  # (n_tx, n_ue)
        avg_pl = pl.mean(axis=1)  # per TX
        return np.concatenate([tx_p, avg_pl])

    def _throughput_reward(self):
        """Sum throughput over UEs minus power penalty.

        Returns:
            ``(reward, info)`` where ``reward`` is the sum throughput in
            Gbit/s minus ``penalty_lambda`` times the total TX power in watts,
            and ``info`` holds ``"sum_thr_bps"`` and ``"mean_sinr_db"``.
        """
        tx_nodes = self._nodes(self.tx_ids)
        ue_nodes = self._nodes(self.ue_ids)
        tx_xyz = self._positions(tx_nodes)
        ue_xyz = self._positions(ue_nodes)
        tx_p_dbm = self._tx_powers_dbm(tx_nodes)

        # Antenna gains and losses are taken as 0 dB for every transmitter.
        g_db = np.zeros_like(tx_p_dbm)
        ant_losses_db = np.zeros_like(tx_p_dbm)

        pl_db = fspl_matrix_db(tx_xyz, ue_xyz, self.carrier_freq_hz)  # (T,U)
        rsrp_dbm = rsrp_matrix_dbm(tx_p_dbm, g_db, ant_losses_db, pl_db)  # (T,U)
        noise_dbm = noise_dbm_vector(
            self.B, noise_figure_db=self.noise_figure_db
        ) * np.ones(ue_xyz.shape[0])

        # Best server per UE = argmax over T.
        best = np.argmax(rsrp_dbm, axis=0)
        # SINR for each UE served by its best TX.
        sinr_db = sinr_db_from_rsrp_matrix(rsrp_dbm, best, noise_dbm)
        thr = shannon_throughput_bps_vector(self.B, sinr_db, self.eta)

        sum_thr_bps = float(thr.sum())
        # dBm -> mW via 10**(p/10), then /1000 -> W.
        total_tx_power_w = float(np.sum(10 ** (tx_p_dbm / 10) / 1000.0))
        reward = float(thr.sum() / 1e9) - self.lmb * total_tx_power_w
        return reward, {
            "sum_thr_bps": sum_thr_bps,
            "mean_sinr_db": float(np.mean(sinr_db)),
        }