Source code for ezmsg.xdf.source

import asyncio
import os
import time
import typing
from dataclasses import field

import ezmsg.core as ez
from ezmsg.util.generator import GenState
from ezmsg.util.messages.axisarray import AxisArray

from .iter import XDFAxisArrayIterator, XDFMultiAxArrIterator


[docs] class PlaybackClock:
[docs] def __init__( self, rate: float = 1.0, step_dur: float = 0.005, ): """ Create an object that provides a timer that can run at a specified rate, and with a specified step duration. Args: rate: Speed of playback. 1.0 is real time. step_dur: The duration of each step in seconds. Provide the duration using the unmodified rate. """ self._step_dur = step_dur * rate self._wall_start: float = time.time() - self._step_dur / 2 self._step_count: int = 0
[docs] def reset(self) -> None: self._wall_start = time.time() - self._step_dur / 2 self._step_count = 0
def _get_duration(self) -> float: wall_elapsed = time.time() - self._wall_start next_elapsed = self._step_count * self._step_dur step_dur = max(next_elapsed - wall_elapsed, 0) self._step_count += 1 return step_dur
[docs] async def astep(self) -> None: await asyncio.sleep(self._get_duration())
[docs] def step(self) -> None: time.sleep(self._get_duration())
[docs] class XDFIteratorSettings(ez.Settings): filepath: typing.Union[os.PathLike, str] select: str chunk_dur: float = 1.0 start_time: float | None = None stop_time: float | None = None rezero: bool = True playback_rate: float | None = None self_terminating: bool = False """ If True, the unit will raise a :obj:`ez.NormalTermination` exception when the file is exhausted. Note, however, that this will terminate the pipeline even if the data published by this unit are still in transit, which will lead to the pipeline output being truncated before it has finished processing the stream. `self_terminating` should only be used when it is not important that the pipeline finish processing data, such as during prototyping and testing. """
[docs] class XDFIteratorUnit(ez.Unit): STATE = GenState SETTINGS = XDFIteratorSettings OUTPUT_SIGNAL = ez.OutputStream(AxisArray) OUTPUT_TERM = ez.OutputStream(typing.Any)
[docs] def initialize(self) -> None: self.construct_generator()
[docs] def construct_generator(self): self.STATE.gen = XDFAxisArrayIterator( filepath=self.SETTINGS.filepath, select=self.SETTINGS.select, chunk_dur=self.SETTINGS.chunk_dur, start_time=self.SETTINGS.start_time, stop_time=self.SETTINGS.stop_time, rezero=self.SETTINGS.rezero, ) if self.SETTINGS.playback_rate is not None: self._clock = PlaybackClock(rate=self.SETTINGS.playback_rate, step_dur=self.SETTINGS.chunk_dur) else: self._clock = None
[docs] @ez.publisher(OUTPUT_SIGNAL) async def pub_chunk(self) -> typing.AsyncGenerator: try: while True: if self._clock is not None: await self._clock.astep() msg = next(self.STATE.gen) if msg.data.size > 0: yield self.OUTPUT_SIGNAL, msg else: await asyncio.sleep(0) except StopIteration: ez.logger.debug( f"File ({self.SETTINGS.filepath} :: {self.SETTINGS.select}) exhausted." ) if self.SETTINGS.self_terminating: raise ez.NormalTermination yield self.OUTPUT_TERM, True
[docs] class XDFMultiIteratorUnitSettings(XDFIteratorSettings): select: set[str] | None = None # Override with a default force_single_sample: set = field(default_factory=set)
[docs] class XDFMultiIteratorUnit(ez.Unit): STATE = GenState SETTINGS = XDFMultiIteratorUnitSettings OUTPUT_SIGNAL = ez.OutputStream(AxisArray) OUTPUT_TERM = ez.OutputStream(typing.Any)
[docs] def initialize(self) -> None: self.construct_generator()
[docs] def construct_generator(self): self.STATE.gen = XDFMultiAxArrIterator( filepath=self.SETTINGS.filepath, select=self.SETTINGS.select, chunk_dur=self.SETTINGS.chunk_dur, start_time=self.SETTINGS.start_time, stop_time=self.SETTINGS.stop_time, rezero=self.SETTINGS.rezero, force_single_sample=self.SETTINGS.force_single_sample, ) if self.SETTINGS.playback_rate is not None: self._clock = PlaybackClock(rate=self.SETTINGS.playback_rate, step_dur=self.SETTINGS.chunk_dur) else: self._clock = None
[docs] @ez.publisher(OUTPUT_SIGNAL) async def pub_multi(self) -> typing.AsyncGenerator: try: while True: if self._clock is not None: await self._clock.astep() msg = next(self.STATE.gen) if msg is not None: yield self.OUTPUT_SIGNAL, msg else: await asyncio.sleep(0) except StopIteration: ez.logger.debug( f"File ({self.SETTINGS.filepath} :: {self.SETTINGS.select}) exhausted." ) if self.SETTINGS.self_terminating: raise ez.NormalTermination yield self.OUTPUT_TERM, True