Source code for pytwinnet.ingestion.csv_adapter

from __future__ import annotations
import csv
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator
from .base import DataSource
from ..core.digital_twin import DigitalTwin

[docs] @dataclass class CSVPositionDataSource(DataSource): """CSV source for node positions. CSV (header required): timestamp_s,node_id,x,y,z """ path: str
[docs] def connect(self) -> None: return
[docs] def read_data(self) -> Iterator[Dict]: with open(self.path, newline="", encoding="utf-8") as f: r = csv.DictReader(f) for row in r: yield { "timestamp_s": float(row["timestamp_s"]), "node_id": row["node_id"], "position": (float(row["x"]), float(row["y"]), float(row["z"])), }
[docs] class ReplayMonitor: """Replay updates from a DataSource into a DigitalTwin (offline).""" def __init__(self, twin: DigitalTwin, source: DataSource) -> None: self.twin = twin self.source = source
[docs] def run(self, speed: float = 1.0) -> int: updates = 0 events = sorted(self.source.read_data(), key=lambda d: d.get("timestamp_s", 0.0)) for ev in events: nid = ev.get("node_id"); pos = ev.get("position") n = self.twin.network.get_node_by_id(nid) if n and pos: n.move_to(tuple(pos)) updates += 1 return updates
[docs] def rmse(a: Iterable[float], b: Iterable[float]) -> float: import math xs = list(a); ys = list(b) assert len(xs) == len(ys) return math.sqrt(sum((x - y) ** 2 for x, y in zip(xs, ys)) / len(xs))