pytwinnet.ingestion

class pytwinnet.ingestion.DataSource[source]

Bases: ABC

abstractmethod connect()[source]
Return type:

None

abstractmethod read_data()[source]
Return type:

Iterable[object]

class pytwinnet.ingestion.MockDataSource(node_ids, step_size_m=1.0, seed=0)[source]

Bases: DataSource

Parameters:

node_ids
step_size_m (default: 1.0)
seed (default: 0)

connect()[source]
Return type:

None

read_data()[source]
Return type:

Iterable[MockNodeUpdate]

class pytwinnet.ingestion.MockNodeUpdate(node_id, new_position)[source]

Bases: object

Parameters:

node_id (str)
new_position (Tuple[float, float, float])

new_position: Tuple[float, float, float]
node_id: str
class pytwinnet.ingestion.RealTimeMonitor(twin, source, on_applied=None)[source]

Bases: object

Parameters:

twin (DigitalTwin)
source (DataSource)
on_applied (Optional[Callable[[object], None]])

on_applied: Optional[Callable[[object], None]] = None
poll_once(updates=None)[source]
Return type:

int

Parameters:

updates (Iterable[object] | None)

source: DataSource
twin: DigitalTwin

Modules