AxisArray Utility Methods and Units#

Utility classes which implement functionality related to messages. These include a method for instantiating new instances of a dataclass given an existing instance:

  • replace - Function for creating a new dataclass instance by replacing attributes of an existing instance

and several ezmsg Unit classes which operate on AxisArray messages:

  • ArrayChunker - ezmsg Unit to chunk AxisArray messages along specified axes

  • SetKey - ezmsg Unit for setting the key of incoming AxisArray messages

  • FilterOnKey - ezmsg Unit for filtering an AxisArray based on its key.

  • ModifyAxis - ezmsg Unit for modifying axis names and dimensions of AxisArray messages.

replace#

ezmsg.util.messages.axisarray.replace(arr, **kwargs)[source]#

Fast replacement of dataclass fields with reduced safety.

Unlike dataclasses.replace, this function does not check for type compatibility, nor does it check that the passed in fields are valid fields for the dataclass and not flagged as init=False.

User code may choose to use this replace or the legacy replace according to their needs. To force ezmsg to use the legacy replace, set the environment variable: EZMSG_DISABLE_FAST_REPLACE Unset the variable to use this replace function.

Parameters:
  • arr (Generic[T]) – The dataclass instance to create a modified copy of.

  • kwargs – Field values to update in the new instance.

Returns:

A new instance of the same type with updated field values.

Return type:

T

Array Chunking#

class ezmsg.util.messages.chunker.ArrayChunker(*args, settings=None, **kwargs)[source]#

Bases: Unit

Unit for chunking array data along a specified axis.

Converts array data into sequential chunks along a specified axis, with proper timing axis information for streaming applications.

SETTINGS[source]#

alias of ArrayChunkerSettings

STATE[source]#

alias of GenState

construct_generator()[source]#

Construct the chunking generator with current settings.

Creates a new array_chunker generator instance using the unit’s settings.

async initialize()[source]#

Initialize the ArrayChunker unit.

Sets up the generator for chunking operations based on current settings.

Return type:

None

async on_settings(msg)[source]#

Handle incoming settings updates.

Parameters:

msg (ez.Settings) – New settings to apply.

Return type:

None

async send_chunk()[source]#

Publisher method that yields data chunks.

Continuously yields chunks from the generator until exhausted, with proper exception handling for completion cases.

Returns:

Async generator yielding AxisArray chunks.

Return type:

AsyncGenerator

class ezmsg.util.messages.chunker.ArrayChunkerSettings(data, chunk_len, axis=0, fs=1000.0, tzero=0.0)[source]#

Bases: Settings

Settings for ArrayChunker unit.

Configuration for chunking array data along a specified axis with timing information.

Parameters:
  • data (npt.ArrayLike) – An array_like object to iterate over, chunk-by-chunk.

  • chunk_len (int) – The length of the chunk returned in each iteration (except the last).

  • axis (int) – The axis along which to chunk the array.

  • fs (float) – The sampling frequency of the data. Will only be used to make the time axis.

  • tzero (float) – The time offset of the first chunk. Will only be used to make the time axis.

ezmsg.util.messages.chunker.array_chunker(data, chunk_len, axis=0, fs=1000.0, tzero=0.0)[source]#

Create a generator that yields AxisArrays containing chunks of an array along a specified axis.

The generator should be useful for quick offline analyses, tests, or examples. This generator probably is not useful for online streaming applications.

Parameters:
  • data (npt.ArrayLike) – An array_like object to iterate over, chunk-by-chunk.

  • chunk_len (int) – The length of the chunk returned in each iteration (except the last).

  • axis (int) – The axis along which to chunk the array.

  • fs (float) – The sampling frequency of the data. Will only be used to make the time axis.

  • tzero (float) – The time offset of the first chunk. Will only be used to make the time axis.

Returns:

A generator that yields AxisArrays containing chunks of the input array.

Return type:

Generator[AxisArray, None, None]

Modifying AxisArray “key”#

class ezmsg.util.messages.key.FilterOnKey(*args, settings=None, **kwargs)[source]#

Bases: Unit

Filter an AxisArray based on its key.

Only passes through AxisArray messages whose key matches the configured key setting. Uses zero-copy operations for efficient filtering.

Note: There is no associated generator method for this Unit because messages that fail the filter would still be yielded (as None), which complicates downstream processing. For contexts where filtering on key is desired but the ezmsg framework is not used, use normal Python functional programming. See ezmsg-org/ezmsg#142

SETTINGS[source]#

alias of KeySettings

async on_message(message)[source]#

Filter incoming AxisArray messages based on their key.

Only yields messages whose key matches the configured filter key. Uses minimal ‘touch’ to prevent unnecessary deep copying by the framework.

Parameters:

message (AxisArray) – Input AxisArray to filter.

Returns:

Async generator yielding filtered AxisArray messages.

Return type:

AsyncGenerator

class ezmsg.util.messages.key.KeySettings(key='')[source]#

Bases: Settings

Settings for key manipulation units.

Configuration for setting or filtering AxisArray keys.

Parameters:

key (str) – The string to set as the key.

class ezmsg.util.messages.key.SetKey(*args, settings=None, **kwargs)[source]#

Bases: Unit

Unit for setting the key of incoming AxisArray messages.

Modifies the key field of AxisArray messages while preserving all other data. Uses zero-copy operations for efficient processing.

SETTINGS[source]#

alias of KeySettings

STATE[source]#

alias of GenState

construct_generator()[source]#

Construct the key-setting generator with current settings.

Creates a new set_key generator instance using the unit’s key setting.

async initialize()[source]#

Initialize the SetKey unit.

Sets up the generator for key modification operations.

Return type:

None

async on_message(message)[source]#

Process incoming AxisArray messages and set their keys.

Uses zero-copy operations to efficiently modify the key field while preserving all other data.

Parameters:

message (AxisArray) – Input AxisArray to modify.

Returns:

Async generator yielding AxisArray with modified key.

Return type:

AsyncGenerator

async on_settings(msg)[source]#

Handle incoming settings updates.

Parameters:

msg (ez.Settings) – New settings to apply.

Return type:

None

ezmsg.util.messages.key.set_key(key='')[source]#

Set the key of an AxisArray.

Parameters:

key (str) – The string to set as the key.

Returns:

A primed generator object ready to yield an AxisArray with the key set for each .send(axis_array).

Return type:

Generator[AxisArray, AxisArray, None]

Modifying AxisArray “axes”#

class ezmsg.util.messages.modify.ModifyAxis(*args, settings=None, **kwargs)[source]#

Bases: Unit

Unit for modifying axis names and dimensions of AxisArray messages.

Renames dimensions and axes according to a name mapping, with support for dropping dimensions. Uses zero-copy operations for efficient processing.

SETTINGS[source]#

alias of ModifyAxisSettings

STATE[source]#

alias of GenState

construct_generator()[source]#

Construct the axis-modifying generator with current settings.

Creates a new modify_axis generator instance using the unit’s name mapping.

async initialize()[source]#

Initialize the ModifyAxis unit.

Sets up the generator for axis modification operations.

Return type:

None

async on_message(message)[source]#

Process incoming AxisArray messages and modify their axes.

Uses zero-copy operations to efficiently modify axis names and dimensions while preserving data integrity.

Parameters:

message (AxisArray) – Input AxisArray to modify.

Returns:

Async generator yielding AxisArray with modified axes.

Return type:

AsyncGenerator

async on_settings(msg)[source]#

Handle incoming settings updates.

Parameters:

msg (ez.Settings) – New settings to apply.

Return type:

None

class ezmsg.util.messages.modify.ModifyAxisSettings(name_map=None)[source]#

Bases: Settings

Settings for ModifyAxis unit.

Configuration for modifying axis names and dimensions of AxisArray messages.

Parameters:

name_map (dict[str, str | None] | None) – A dictionary where the keys are the names of the old dims and the values are the new names. Use None as a value to drop the dimension. If the dropped dimension is not len==1 then an error is raised.

ezmsg.util.messages.modify.modify_axis(name_map=None)[source]#

Modify an AxisArray’s axes and dims according to a name_map.

Parameters:

name_map (dict[str, str | None] | None) – A dictionary where the keys are the names of the old dims and the values are the new names. Use None as a value to drop the dimension. If the dropped dimension is not len==1 then an error is raised.

Returns:

A primed generator object ready to yield an AxisArray with modified axes for each .send(axis_array).

Return type:

Generator[AxisArray, AxisArray, None]