Source code for ezmsg.sigproc.util.message

import time
import typing
from dataclasses import dataclass, field

from ezmsg.util.messages.axisarray import AxisArray


[docs] @dataclass(unsafe_hash=True) class SampleTriggerMessage: timestamp: float = field(default_factory=time.time) """Time of the trigger, in seconds. The Clock depends on the input but defaults to time.time""" period: tuple[float, float] | None = None """The period around the timestamp, in seconds""" value: typing.Any = None """A value or 'label' associated with the trigger."""
@dataclass
class SampleMessage:
    """Pair a trigger with the data sampled around it.

    Both fields are required and are accepted positionally in the order
    ``trigger``, ``sample``.
    """

    trigger: SampleTriggerMessage
    """The time, window, and value (if any) associated with the trigger."""

    sample: AxisArray
    """The data sampled around the trigger."""
def is_sample_message(message: typing.Any) -> "typing.TypeGuard[SampleMessage]":
    """Check if the message is a SampleMessage.

    The check is structural (attribute based) rather than an ``isinstance``
    test: ``message`` qualifies when it exposes both fields that
    ``SampleMessage`` declares, ``trigger`` and ``sample``.

    Args:
        message: Any object.

    Returns:
        ``True`` if ``message`` has both a ``trigger`` and a ``sample``
        attribute, ``False`` otherwise. Type checkers narrow ``message`` to
        ``SampleMessage`` on the ``True`` branch.
    """
    # Require every declared field: the TypeGuard promises callers that
    # ``.sample`` is available too, so checking ``trigger`` alone would let
    # unrelated objects that merely carry a ``trigger`` attribute through.
    return hasattr(message, "trigger") and hasattr(message, "sample")