Source code for pytwinnet.mobility.random_waypoint


from __future__ import annotations
import random, math
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Optional
from ..core.node import WirelessNode
from ..physics.environment import Environment

[docs] @dataclass class RandomWaypoint: env: Environment speed_range_mps: Tuple[float, float] = (0.5, 1.5) pause_time_s: float = 0.0 seed: int = 0 _rng: random.Random = None _target: Optional[Tuple[float, float, float]] = None _speed: float = 0.0 _paused_until: float = 0.0 _last_t: Optional[float] = None def __post_init__(self): self._rng = random.Random(self.seed) def _sample_waypoint(self) -> Tuple[float, float, float]: w, d, h = self.env.dimensions_m return (self._rng.uniform(0, w), self._rng.uniform(0, d), self._rng.uniform(0, h))
[docs] def update(self, node: WirelessNode, timestamp: float) -> None: if self._last_t is None: self._last_t = timestamp self._target = self._sample_waypoint() self._speed = self._rng.uniform(*self.speed_range_mps) return dt = max(0.0, timestamp - self._last_t) self._last_t = timestamp if timestamp < self._paused_until: return x, y, z = node.position tx, ty, tz = self._target dx, dy, dz = tx - x, ty - y, tz - z dist = math.sqrt(dx*dx + dy*dy + dz*dz) if dist < 1e-6: self._paused_until = timestamp + self.pause_time_s self._target = self._sample_waypoint() self._speed = self._rng.uniform(*self.speed_range_mps) return step = self._speed * dt if step >= dist: node.move_to((tx, ty, tz)) else: nx = x + dx / dist * step ny = y + dy / dist * step nz = z + dz / dist * step node.move_to((nx, ny, nz))
# -------------------------------------------------------------------------- # Backwards-compatibility adapter for legacy examples # --------------------------------------------------------------------------
[docs] class RandomWaypointMobility: """ Simplified random waypoint model compatible with old examples. It does not require a WirelessNode or Environment object. """ def __init__(self, dimensions=(1000,1000), velocity_mps=(1,10), wait_time_s=(0,5), seed=None, **_): self.W, self.H = dimensions self.vmin, self.vmax = velocity_mps self.wmin, self.wmax = wait_time_s self.rng = np.random.default_rng(seed) self.path_history = [] self._target = None self._wait = 0.0 self._speed = None def _choose_target(self, x, y): self._target = (self.rng.uniform(0,self.W), self.rng.uniform(0,self.H)) self._speed = self.rng.uniform(self.vmin, self.vmax) self._wait = self.rng.uniform(self.wmin, self.wmax)
[docs] def update(self, pos, dt): x,y,*_ = pos if self._target is None: self._choose_target(x,y) if self._wait > 0: self._wait = max(0.0, self._wait - dt) self.path_history.append((x,y)) return (x,y, pos[2] if len(pos)>2 else 0.0) tx,ty = self._target dx, dy = tx - x, ty - y d = np.hypot(dx, dy) if d < 1e-6: self._choose_target(x,y) self.path_history.append((x,y)) return (x,y, pos[2] if len(pos)>2 else 0.0) step = self._speed * dt if step >= d: x,y = tx,ty self._choose_target(x,y) else: x += dx * (step/d) y += dy * (step/d) self.path_history.append((x,y)) return (max(0,min(self.W,x)), max(0,min(self.H,y)), pos[2] if len(pos)>2 else 0.0)