ezmsg.neo.source#

Classes

class NeoIterator(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[NeoIteratorSettings, AxisArray, NeoIteratorState]

__init__(*args, **kwargs)[source]#
property exhausted: bool#
class NeoIteratorSettings(filepath, chunk_dur=0.05, self_terminating=True, t_offset=None)[source]#

Bases: Settings

Settings for NeoIterator.

Parameters:

filepath (PathLike)

chunk_dur (float)

self_terminating (bool)

t_offset (float | None)

filepath: PathLike#
chunk_dur: float = 0.05#
self_terminating: bool = True#
t_offset: float | None = None#
__init__(filepath, chunk_dur=0.05, self_terminating=True, t_offset=None)#
Parameters:

filepath (PathLike)

chunk_dur (float)

self_terminating (bool)

t_offset (float | None)

Return type:

None

class NeoIteratorState[source]#

Bases: object

t_offset: float = 0.0#
t_start: float = 0.0#
chunk_ix: int = 0#
n_chunks: int = 0#
reader: BaseRawIO | None = None#
streams: dict | None = None#
deque: deque | None = None#
class NeoIteratorUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[NeoIteratorSettings, AxisArray, NeoIterator]

Parameters:

settings (Settings | None)

SETTINGS#

alias of NeoIteratorSettings

OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)#
OUTPUT_TERM = OutputStream:unlocated[Any](self.num_buffers=32, self.force_tcp=False)#
async produce()[source]#
Return type:

AsyncGenerator

class NeoIteratorSettings(filepath, chunk_dur=0.05, self_terminating=True, t_offset=None)[source]#

Bases: Settings

Settings for NeoIterator.

Parameters:

filepath (PathLike)

chunk_dur (float)

self_terminating (bool)

t_offset (float | None)

filepath: PathLike#
chunk_dur: float = 0.05#
self_terminating: bool = True#
t_offset: float | None = None#
__init__(filepath, chunk_dur=0.05, self_terminating=True, t_offset=None)#
Parameters:

filepath (PathLike)

chunk_dur (float)

self_terminating (bool)

t_offset (float | None)

Return type:

None

class NeoIteratorState[source]#

Bases: object

t_offset: float = 0.0#
t_start: float = 0.0#
chunk_ix: int = 0#
n_chunks: int = 0#
reader: BaseRawIO | None = None#
streams: dict | None = None#
deque: deque | None = None#
class NeoIterator(*args, **kwargs)[source]#

Bases: BaseStatefulProducer[NeoIteratorSettings, AxisArray, NeoIteratorState]

__init__(*args, **kwargs)[source]#
property exhausted: bool#
class NeoIteratorUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseProducerUnit[NeoIteratorSettings, AxisArray, NeoIterator]

Parameters:

settings (Settings | None)

SETTINGS#

alias of NeoIteratorSettings

OUTPUT_SIGNAL = OutputStream:unlocated[AxisArray](self.num_buffers=32, self.force_tcp=False)#
OUTPUT_TERM = OutputStream:unlocated[Any](self.num_buffers=32, self.force_tcp=False)#
async produce()[source]#
Return type:

AsyncGenerator