ezmsg.sigproc.signalinjector#

Classes

class SignalInjector(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SignalInjectorSettings

INPUT_FREQUENCY = InputStream:unlocated[float | None]()#
INPUT_AMPLITUDE = InputStream:unlocated[float]()#
async on_frequency(msg)[source]#
Parameters:

msg (float | None)

Return type:

None

async on_amplitude(msg)[source]#
Parameters:

msg (float)

Return type:

None

class SignalInjectorSettings(time_dim: str = 'time', frequency: float | None = None, amplitude: float = 1.0, mixing_seed: int | None = None)[source]#

Bases: Settings

Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

time_dim: str = 'time'#
frequency: float | None = None#
amplitude: float = 1.0#
mixing_seed: int | None = None#
__init__(time_dim='time', frequency=None, amplitude=1.0, mixing_seed=None)#
Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

Return type:

None

class SignalInjectorState[source]#

Bases: object

cur_shape: tuple[int, ...] | None = None#
cur_frequency: float | None = None#
cur_amplitude: float | None = None#
mixing: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

class SignalInjectorTransformer(*args, **kwargs)[source]#

Bases: BaseAsyncTransformer[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorState]

class SignalInjectorSettings(time_dim: str = 'time', frequency: float | None = None, amplitude: float = 1.0, mixing_seed: int | None = None)[source]#

Bases: Settings

Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

time_dim: str = 'time'#
frequency: float | None = None#
amplitude: float = 1.0#
mixing_seed: int | None = None#
__init__(time_dim='time', frequency=None, amplitude=1.0, mixing_seed=None)#
Parameters:
  • time_dim (str)

  • frequency (float | None)

  • amplitude (float)

  • mixing_seed (int | None)

Return type:

None

class SignalInjectorState[source]#

Bases: object

cur_shape: tuple[int, ...] | None = None#
cur_frequency: float | None = None#
cur_amplitude: float | None = None#
mixing: ndarray[tuple[Any, ...], dtype[_ScalarT]] | None = None#

class SignalInjectorTransformer(*args, **kwargs)[source]#

Bases: BaseAsyncTransformer[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorState]

class SignalInjector(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SignalInjectorSettings, AxisArray, AxisArray, SignalInjectorTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SignalInjectorSettings

INPUT_FREQUENCY = InputStream:unlocated[float | None]()#
INPUT_AMPLITUDE = InputStream:unlocated[float]()#
async on_frequency(msg)[source]#
Parameters:

msg (float | None)

Return type:

None

async on_amplitude(msg)[source]#
Parameters:

msg (float)

Return type:

None