Source code for pytwinnet.ingestion.realtime


from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Optional, Callable
from ..core.digital_twin import DigitalTwin
from .base import DataSource
from .mock import MockNodeUpdate

[docs] @dataclass class RealTimeMonitor: twin: DigitalTwin source: DataSource on_applied: Optional[Callable[[object], None]] = None
[docs] def poll_once(self, updates: Iterable[object] | None = None) -> int: count = 0 if updates is None: updates = self.source.read_data() for upd in updates: applied = self._apply_update(upd) if applied: count += 1 if self.on_applied: self.on_applied(upd) return count
def _apply_update(self, update: object) -> bool: if isinstance(update, MockNodeUpdate): node = self.twin.network.get_node_by_id(update.node_id) if node: x, y, z = node.position dx, dy, dz = update.new_position node.move_to((x + dx, y + dy, z + dz)) return True return False return False