from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict
from ..core.digital_twin import DigitalTwin
from ..physics.link_budget import rx_power_dbm
[docs]
@dataclass
class HandoverController:
hysteresis_db: float = 3.0
time_to_trigger_s: float = 0.64
_cand: Dict[str, Dict[str, float]] = field(default_factory=dict) # ue_id -> {candidate_tx, start_time}
[docs]
def step(self, twin: DigitalTwin, ue_id: str, current_tx_id: str, timestamp_s: float) -> str:
pm = twin.propagation_model; env = twin.environment
ue = twin.network.get_node_by_id(ue_id)
cur = twin.network.get_node_by_id(current_tx_id)
# current RSRP
cur_pl = pm.calculate_path_loss(cur, ue, env)
cur_rsrp = rx_power_dbm(
cur.transceiver_properties.transmit_power_dbm,
cur.transceiver_properties.antenna_gain_dbi,
ue.transceiver_properties.antenna_gain_dbi,
cur_pl,
)
# best neighbor RSRP among gNB/BS roles
best_tx_id, best_rsrp = current_tx_id, cur_rsrp
for tx in twin.network:
if tx.node_id == current_tx_id:
continue
role = str(tx.metadata.get("role", "")).lower()
if "gnb" not in role and "bs" not in role:
continue
pl = pm.calculate_path_loss(tx, ue, env)
p = rx_power_dbm(
tx.transceiver_properties.transmit_power_dbm,
tx.transceiver_properties.antenna_gain_dbi,
ue.transceiver_properties.antenna_gain_dbi,
pl,
)
if p > best_rsrp:
best_tx_id, best_rsrp = tx.node_id, p
# hysteresis + TTT
if best_rsrp >= cur_rsrp + self.hysteresis_db and best_tx_id != current_tx_id:
rec = self._cand.get(ue_id)
if rec is None or rec.get("candidate_tx") != best_tx_id:
self._cand[ue_id] = {"candidate_tx": best_tx_id, "start_time": timestamp_s}
else:
if (timestamp_s - rec["start_time"]) >= self.time_to_trigger_s:
self._cand.pop(ue_id, None)
return best_tx_id
else:
self._cand.pop(ue_id, None)
return current_tx_id
# --- KPI logging helpers ---
[docs]
def reset_logs(self):
self._log = {"events": [], "ping_pong": 0}
[docs]
def log_event(self, ue_id: str, from_tx: str, to_tx: str, t: float):
if not hasattr(self, "_log"):
self.reset_logs()
self._log["events"].append({"t": t, "ue": ue_id, "from": from_tx, "to": to_tx})
if len(self._log["events"]) >= 2:
prev = self._log["events"][-2]
if prev["ue"] == ue_id and prev["from"] == to_tx and prev["to"] == from_tx:
self._log["ping_pong"] += 1
[docs]
def step_logged(self, twin, ue_id: str, current_tx_id: str, timestamp_s: float) -> str:
new_tx = self.step(twin, ue_id, current_tx_id, timestamp_s)
if new_tx != current_tx_id:
self.log_event(ue_id, current_tx_id, new_tx, timestamp_s)
return new_tx
[docs]
def kpis(self) -> dict:
if not hasattr(self, "_log"):
self.reset_logs()
return {
"handover_count": len(self._log["events"]),
"ping_pong_count": self._log["ping_pong"],
}