ezmsg.blackrock#

class CcfConfig(path)[source]#

Bases: object

Load a CCF file. This is device-wide configuration, so at most one source per graph should carry it. Other sources targeting the same device should set configure=None and act as pure subscribers.

Parameters:

path (str)

path: str#
__init__(path)#
Parameters:

path (str)

Return type:

None

class CereLinkSignalProducer(*args, **kwargs)[source]#

Bases: _CereLinkBaseProducer[CereLinkSignalSettings, CereLinkSignalProducerState]

Streams one continuous sample-group as AxisArray.

class CereLinkSignalSettings(device_type=None, subscribe_rate=SampleRate.SR_RAW, configure=None, cbtime=False, microvolts=True, cont_buffer_dur=0.5, cmp_configs=())[source]#

Bases: Settings

Settings for CereLinkSignalSource — emits one continuous sample-group as AxisArray.

Parameters:
device_type: DeviceType | None = None#

Device to connect to. None = idle (no Session opened).

subscribe_rate: SampleRate = 6#

The sample-group rate this source streams. Defaults to SR_RAW. Explicit SampleRate.NONE is rejected — a Source must subscribe to something.

configure: CcfConfig | SliceConfig | None = None#

Device configuration this source applies on open.

cbtime: bool = False#

If True, timestamps are raw device nanoseconds divided by 1e9 (seconds on the device clock). If False, timestamps are mapped to time.monotonic() via clock sync.

microvolts: bool = True#

Convert int16 → µV using channel scale factors.

cont_buffer_dur: float = 0.5#

Ring buffer duration in seconds.

cmp_configs: tuple[ChannelMapSettings, ...] = ()#

One ChannelMapSettings per headstage applied after connection.

__init__(device_type=None, subscribe_rate=SampleRate.SR_RAW, configure=None, cbtime=False, microvolts=True, cont_buffer_dur=0.5, cmp_configs=())#
Return type:

None

class CereLinkSignalSource(*args, **kwargs)[source]#

Bases: BaseProducerUnit[CereLinkSignalSettings, AxisArray, CereLinkSignalProducer]

ezmsg Unit that streams one continuous sample-group from a Blackrock device.

SETTINGS#

alias of CereLinkSignalSettings

OUTPUT_DEVICE_STATUS = OutputStream:unlocated[DeviceStatus](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
__init__(*args, **kwargs)[source]#

Initialize an Addressable object.

The name and location are initially None and must be set before the object can be properly addressed. This is achieved through the _set_name() and _set_location() methods.

Return type:

None

create_producer()[source]#

Create the producer instance from settings.

Closes the previous producer first (if any), so resources held by it — sockets, file handles, hardware sessions — are released deterministically rather than being left to garbage collection.

Return type:

None
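
The close-before-replace behaviour described for create_producer() can be sketched as follows. This is a minimal illustration only: DemoProducer and DemoUnit are hypothetical stand-ins, not the ezmsg base classes, and the real method takes no arguments and builds the producer from the unit's settings.

```python
class DemoProducer:
    """Hypothetical stand-in for a producer that holds a hardware session."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        # A real producer would release sockets or device sessions here.
        self.closed = True


class DemoUnit:
    """Hypothetical stand-in for the unit that owns the producer."""

    def __init__(self) -> None:
        self.producer = None

    def create_producer(self, name: str) -> None:
        # Close the previous producer explicitly instead of waiting for
        # garbage collection to release its resources.
        if self.producer is not None:
            self.producer.close()
        self.producer = DemoProducer(name)


unit = DemoUnit()
unit.create_producer("first")
first = unit.producer
unit.create_producer("second")
```

After the second call, the first producer has been closed and the unit holds only the new one.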

shutdown()[source]#

Runs when the Unit terminates.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.

This method is where you should clean up resources and perform any necessary shutdown procedures.

Return type:

None

async device_status()[source]#
Return type:

AsyncGenerator

class CereLinkSpikeProducer(*args, **kwargs)[source]#

Bases: _CereLinkBaseProducer[CereLinkSpikeSettings, CereLinkSpikeProducerState]

Streams spike events as AxisArray of shape [time, ch, unit=7].

Time axis is the device’s 30 kHz spike clock; unit axis indexes the device convention (0=unsorted, 1..5=sorted, 6=noise — values >5 collapse into the noise bucket because the unit axis has fixed length 7).

Emission cadence is regular: every spike_buffer_dur seconds, one window of shape [N_t, n_ch, 7] is emitted. Empty windows are emitted too, so that downstream consumers see a steady time axis. The window before the first spike is suppressed because there is no device-time anchor until then.

Spikes whose timestamp lands beyond the current window’s range are dropped (a downstream-backpressure failure mode); in normal operation _produce runs at the buffer cadence and the buffer is large enough to hold one window of spikes.
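
The unit-axis convention and the windowing rule above can be sketched in plain Python. This is an illustration under assumptions, not the producer's actual code: bin_spikes and unit_index are hypothetical names, timestamps are taken to be integer ticks of the 30 kHz spike clock, and the window is kept as a sparse dict rather than a dense [N_t, n_ch, 7] array.

```python
SPIKE_CLOCK_HZ = 30_000  # spike clock rate stated in the text
N_UNITS = 7              # fixed length of the unit axis


def unit_index(unit_id: int) -> int:
    """Map a device unit id to its slot; ids above 5 collapse into noise (6)."""
    return min(unit_id, N_UNITS - 1)


def bin_spikes(spikes, window_start: int, window_dur: float):
    """Bin (timestamp_ticks, ch, unit_id) events into one window.

    Returns (counts, dropped). counts maps (t, ch, unit) to a spike count,
    where t is the tick offset from window_start. dropped counts events
    whose timestamp falls outside the window.
    """
    n_t = int(round(window_dur * SPIKE_CLOCK_HZ))
    counts = {}
    dropped = 0
    for ts, ch, unit_id in spikes:
        t = ts - window_start
        if not 0 <= t < n_t:
            dropped += 1  # outside the current window: dropped, not queued
            continue
        key = (t, ch, unit_index(unit_id))
        counts[key] = counts.get(key, 0) + 1
    return counts, dropped


events = [(10, 0, 0), (10, 0, 9), (14_999, 1, 3), (15_000, 1, 1)]
counts, dropped = bin_spikes(events, window_start=0, window_dur=0.5)
```

With a 0.5 s window the time axis has 15000 ticks, so the event at tick 15000 is the one that is dropped, and the unit id 9 lands in the noise slot.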

__init__(*args, **kwargs)[source]#
Return type:

None

class CereLinkSpikeSettings(device_type=None, configure=None, cbtime=False, microvolts=True, spike_buffer_dur=0.5, cmp_configs=())[source]#

Bases: Settings

Settings for CereLinkSpikeSource — emits sparse spike events as AxisArray of shape [time, ch, unit=7] at the 30 kHz spike clock. Unit indices follow the device convention: 0=unsorted, 1..5=sorted, 6=noise (header.type > 5).

Parameters:
device_type: DeviceType | None = None#

Device to connect to. None = idle.

configure: CcfConfig | SliceConfig | None = None#

Device configuration this source applies on open.

cbtime: bool = False#

If True, timestamps are raw device nanoseconds divided by 1e9 (seconds on the device clock). If False, timestamps are mapped to time.monotonic() via clock sync.

microvolts: bool = True#

Reserved for future spike-waveform emission — int16 → µV scaling.

spike_buffer_dur: float = 0.5#

Ring buffer duration in seconds (at the 30 kHz spike clock).

cmp_configs: tuple[ChannelMapSettings, ...] = ()#

One ChannelMapSettings per headstage applied after connection.

__init__(device_type=None, configure=None, cbtime=False, microvolts=True, spike_buffer_dur=0.5, cmp_configs=())#
Return type:

None

class CereLinkSpikeSource(*args, **kwargs)[source]#

Bases: BaseProducerUnit[CereLinkSpikeSettings, AxisArray, CereLinkSpikeProducer]

ezmsg Unit that streams spike events as AxisArray[time, ch, unit=7].

SETTINGS#

alias of CereLinkSpikeSettings

OUTPUT_DEVICE_STATUS = OutputStream:unlocated[DeviceStatus](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
__init__(*args, **kwargs)[source]#

Initialize an Addressable object.

The name and location are initially None and must be set before the object can be properly addressed. This is achieved through the _set_name() and _set_location() methods.

Return type:

None

create_producer()[source]#

Create the producer instance from settings.

Closes the previous producer first (if any), so resources held by it — sockets, file handles, hardware sessions — are released deterministically rather than being left to garbage collection.

Return type:

None

shutdown()[source]#

Runs when the Unit terminates.

This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as an async function by adding the async keyword when overriding.

This method is where you should clean up resources and perform any necessary shutdown procedures.

Return type:

None

async device_status()[source]#
Return type:

AsyncGenerator

class CerePlexImpedance(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[CerePlexImpedanceSettings, AxisArray, AxisArray, CerePlexImpedanceProcessor]

Parameters:

settings (Settings | None)

SETTINGS#

alias of CerePlexImpedanceSettings

class CerePlexImpedanceProcessor(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[CerePlexImpedanceSettings, AxisArray, AxisArray | None, CerePlexImpedanceState]

Stateful transformer that extracts per-channel impedance from a CerePlex sweep.

Expects a stream of AxisArray messages with dims ["time", "ch"] where the data is in microvolts. When using CereLinkSignalSource, set microvolts=True; raw ADC counts will produce incorrect results.

The processor tracks one or more headstages independently (configured via CerePlexImpedanceSettings.headstage_channel_offsets). Each headstage’s impedance sweep cycles sequentially through its channels: exactly one channel is non-zero at a time while the others read zero. This relies on the device’s internal filtering being disabled — a filter produces small non-zero residuals on idle channels, which defeats the exact-zero exclusivity checks. Misconfiguration there is a sign that the recording chain needs fixing, not that this algorithm should accommodate it.

On each impedance update the processor emits an AxisArray whose data is a (1, n_ch) array of impedance values in kOhm (NaN for channels not yet measured).
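
The units work out conveniently: a voltage in microvolts divided by a current in nanoamps is numerically a value in kOhm. The sketch below shows only that arithmetic and the NaN convention for unmeasured channels. It assumes a peak-to-peak voltage per channel is already available; how the processor derives that amplitude from the FFT peak is not stated here, and impedance_kohm and impedance_row are hypothetical helper names.

```python
def impedance_kohm(v_pp_uv: float, test_current_na: float = 1.0) -> float:
    """Ohm's law in device units: microvolts / nanoamps = kilo-ohms."""
    if test_current_na <= 0:
        raise ValueError("test current must be positive")
    return v_pp_uv / test_current_na


def impedance_row(v_pp_uv_per_ch, test_current_na: float = 1.0):
    """Build one row of impedances; None marks a channel not yet measured."""
    nan = float("nan")
    return [
        nan if v is None else impedance_kohm(v, test_current_na)
        for v in v_pp_uv_per_ch
    ]


row = impedance_row([50.0, None, 120.0])
```

Here the first and third channels read 50 and 120 kOhm with the default 1 nA test current, and the unmeasured middle channel is NaN.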

NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({'freq_hi', 'freq_lo', 'headstage_channel_offsets', 'test_current_nA'})#
update_settings(new_settings)[source]#

Apply new settings to this processor, requesting a state reset if needed.

Diffs new_settings against the current self.settings. If every changed field is listed in NONRESET_SETTINGS_FIELDS, self.settings is simply rebound and the processor keeps running. Otherwise, _request_reset() is called so the next message triggers a fresh _reset_state (for stateful subclasses). Override for finer-grained control than the class-level allow-list provides.

Parameters:

new_settings (CerePlexImpedanceSettings)

Return type:

None
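
The allow-list rule described for update_settings() can be illustrated with a small stand-in. This is a sketch of the described behaviour only: DemoSettings and DemoProcessor are hypothetical, and a boolean flag stands in for the _request_reset() call.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical settings with a subset of the documented fields."""
    freq_lo: float = 960.0
    freq_hi: float = 1050.0
    collect_duration_s: float = 0.1


class DemoProcessor:
    # Fields that may change without discarding accumulated state.
    NONRESET_SETTINGS_FIELDS = frozenset({"freq_lo", "freq_hi"})

    def __init__(self, settings: DemoSettings) -> None:
        self.settings = settings
        self.reset_requested = False

    def update_settings(self, new_settings: DemoSettings) -> None:
        # Diff the new settings against the current ones, field by field.
        changed = {
            f.name
            for f in fields(new_settings)
            if getattr(new_settings, f.name) != getattr(self.settings, f.name)
        }
        self.settings = new_settings
        # Any changed field outside the allow-list forces a state reset.
        if not changed <= self.NONRESET_SETTINGS_FIELDS:
            self.reset_requested = True


proc = DemoProcessor(DemoSettings())
proc.update_settings(replace(proc.settings, freq_hi=1100.0))
after_soft_change = proc.reset_requested
proc.update_settings(replace(proc.settings, collect_duration_s=0.2))
after_hard_change = proc.reset_requested
```

Changing freq_hi leaves the processor running, while changing collect_duration_s (not on the allow-list in this sketch) requests a reset.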

class CerePlexImpedanceSettings(headstage_channel_offsets: tuple[int, ...] = (0,), collect_duration_s: float = 0.1, fft_duration_s: float = 0.09227, freq_lo: float = 960.0, freq_hi: float = 1050.0, test_current_nA: float = 1.0)[source]#

Bases: Settings

Parameters:
headstage_channel_offsets: tuple[int, ...] = (0,)#

Starting channel index of each CerePlex headstage. Each headstage’s range extends from its offset to the next offset (or n_ch for the last). Example: two 128-ch headstages → (0, 128).

collect_duration_s: float = 0.1#

Maximum burst duration to buffer per channel (100 ms for CerePlex).

fft_duration_s: float = 0.09227#

Duration of data used for FFT, taken from the end of the burst. The preceding samples serve as settle time.

freq_lo: float = 960.0#

Lower bound of frequency range for peak extraction (Hz).

freq_hi: float = 1050.0#

Upper bound of frequency range for peak extraction (Hz).

test_current_nA: float = 1.0#

Injected test-current peak-to-peak amplitude (nA).

__init__(headstage_channel_offsets=(0,), collect_duration_s=0.1, fft_duration_s=0.09227, freq_lo=960.0, freq_hi=1050.0, test_current_nA=1.0)#
Return type:

None
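
As a quick check of how headstage_channel_offsets partitions the channel axis, the following sketch turns offsets into half-open channel ranges. headstage_ranges is a hypothetical helper written for illustration, not part of the package.

```python
def headstage_ranges(offsets, n_ch: int):
    """Return one (start, stop) pair per headstage.

    Each range runs from its offset to the next offset, and the last
    range runs to n_ch. stop is exclusive.
    """
    bounds = list(offsets) + [n_ch]
    return [(bounds[i], bounds[i + 1]) for i in range(len(offsets))]


two_headstages = headstage_ranges((0, 128), n_ch=256)
single = headstage_ranges((0,), n_ch=96)
```

For two 128-channel headstages this gives (0, 128) and (128, 256). The default (0,) covers every channel with a single headstage.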

class ChannelSelection(*values)[source]#

Bases: Enum

Sentinel selections for SliceConfig.channels (the cases an explicit channel-ID list can’t express).

ALL = 'all'#

Every channel matching channel_type; the others are disabled.

ENABLED = 'enabled'#

Only the channels the device already has enabled for this stream; the enabled set is left unchanged. What counts as “enabled” is stream-specific: continuous sample-group membership for the signal source (channels are retuned to its rate/coupling but never enabled/disabled), and spike-extraction state for the spike source (extraction is left exactly as-is, and the source subscribes to whatever already has it on).

class ChannelMapProcessor(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[ChannelMapUnitSettings, AxisArray, AxisArray, ChannelMapState]

Stateful transformer that attaches CMP-derived channel metadata.

Each reset rebuilds the axis in full from settings.cmp_configs: a base layer from incoming labels, every CMP overlay applied in order, then the auto-grid fills indices no CMP claimed. Reset fires on a channel-count change or any cmp_configs change (the latter via BaseProcessor.update_settings_request_reset), so there is no cross-push state to coalesce. An empty cmp_configs yields a pure auto-grid.
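
The layering order described above (base layer, overlays in order, then auto-grid) might look roughly like this. Everything concrete here is an assumption made for illustration: the dict layout, the build_channel_axis name, the grid_width parameter, and the row-major grid rule are not taken from the package.

```python
def build_channel_axis(labels, overlays, grid_width: int = 8):
    """Rebuild per-channel metadata from scratch on every call.

    labels: incoming channel labels (the base layer).
    overlays: list of {channel_index: {"label", "x", "y"}} dicts, one per CMP.
    """
    # Base layer: one entry per incoming label, with no coordinates yet.
    axis = [{"label": lab, "x": None, "y": None} for lab in labels]

    # Overlays are applied in order, so a later overlay wins on a shared index.
    for overlay in overlays:
        for idx, entry in overlay.items():
            if 0 <= idx < len(axis):
                axis[idx].update(entry)

    # Auto-grid fills only the indices that no overlay claimed.
    for idx, entry in enumerate(axis):
        if entry["x"] is None:
            entry["x"], entry["y"] = idx % grid_width, idx // grid_width
    return axis


cmp_overlay = {1: {"label": "elec1", "x": 5, "y": 5}}
axis = build_channel_axis(["a", "b", "c"], [cmp_overlay], grid_width=2)
pure_grid = build_channel_axis(["a", "b", "c"], [], grid_width=2)
```

Because the axis is rebuilt in full each time, there is no state carried over between calls, and an empty overlay list produces the pure auto-grid.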

class ChannelMapSettings(filepath: str | None = None, start_chan: int = 1, hs_id: int = 0)[source]#

Bases: Settings

Parameters:
  • filepath (str | None)

  • start_chan (int)

  • hs_id (int)

filepath: str | None = None#

Path to the .cmp file. None (or an empty path) means no CMP — the auto-grid fallback generates coordinates for every channel.

start_chan: int = 1#

1-based channel ID assigned to the first sorted CMP row. Mirrors pycbsdk.Session.load_channel_map().

hs_id: int = 0#

Headstage identifier, passed through to pycbsdk.cmp.parse_cmp(), where it sets each entry’s headstage field. Labels are taken verbatim (no "hs{hs_id}-" prefix); bank/elec disambiguate channels that reuse a label across headstages. Pass 0 for single-headstage rigs.

__init__(filepath=None, start_chan=1, hs_id=0)#
Parameters:
  • filepath (str | None)

  • start_chan (int)

  • hs_id (int)

Return type:

None

class ChannelMapUnit(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[ChannelMapUnitSettings, AxisArray, AxisArray, ChannelMapProcessor]

Parameters:

settings (Settings | None)

SETTINGS#

alias of ChannelMapUnitSettings

class ChannelMapUnitSettings(cmp_configs: tuple[ChannelMapSettings, ...] = ())[source]#

Bases: Settings

Parameters:

cmp_configs (tuple[ChannelMapSettings, ...])

cmp_configs: tuple[ChannelMapSettings, ...] = ()#

Per-headstage overlays, applied in order on each reset. Empty (the default) means no CMP — the auto-grid lays out every channel.

__init__(cmp_configs=())#
Parameters:

cmp_configs (tuple[ChannelMapSettings, ...])

Return type:

None

class DeviceStatus(device_type, success, error='')[source]#

Bases: object

Result of a settings-driven device switch.

Emitted on CereLinkSignalSource.OUTPUT_DEVICE_STATUS (and the corresponding spike-source output) after each _areset_state attempt. The GUI consumes this to confirm or revert its device selection — the snapshot’s settings_changed event fires before the open completes and so cannot distinguish success from failure.

Parameters:
device_type: DeviceType | None#
success: bool#
error: str = ''#
__init__(device_type, success, error='')#
Return type:

None
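
A consumer of this message could confirm or revert its device selection along these lines. This is a hedged sketch: DemoStatus mirrors the three documented fields but is a stand-in, and resolve_selection is a hypothetical GUI-side helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DemoStatus:
    """Stand-in with the same fields as DeviceStatus."""
    device_type: Optional[int]
    success: bool
    error: str = ""


def resolve_selection(confirmed: Optional[int], status: DemoStatus):
    """Return (device to display, message for the user).

    confirmed is the last device known to be open. On failure the
    selection reverts to it instead of showing the requested device.
    """
    if status.success:
        return status.device_type, ""
    return confirmed, status.error or "device open failed"


ok = resolve_selection(1, DemoStatus(device_type=2, success=True))
failed = resolve_selection(1, DemoStatus(device_type=2, success=False, error="timeout"))
```

Keeping the last confirmed device separate from the pending request is what lets the consumer revert cleanly when the open fails.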

class DeviceType(*values)[source]#

Bases: IntEnum

Device type selection.

Values match cbproto_device_type_t.

LEGACY_NSP = 0#
NSP = 1#
HUB1 = 2#
HUB2 = 3#
HUB3 = 4#
NPLAY = 5#
CUSTOM = 6#
class SamplingDelayAlignment(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[SamplingDelayAlignmentSettings, AxisArray, AxisArray, SamplingDelayAlignmentTransformer]

Parameters:

settings (Settings | None)

SETTINGS#

alias of SamplingDelayAlignmentSettings

class SamplingDelayAlignmentSettings(bank_size=32, channel_sample_interval=9.696969696969698e-07, filter_len=33, rail_threshold=None)[source]#

Bases: Settings

Settings for SamplingDelayAlignmentTransformer.

Parameters:
  • bank_size (int)

  • channel_sample_interval (float)

  • filter_len (int)

  • rail_threshold (float | None)

bank_size: int = 32#

Channels per simultaneously-started A/D bank. Used to derive each channel’s sweep slot (c % bank_size) only as a fallback, when the channel axis carries no bank/elec metadata.

channel_sample_interval: float = 9.696969696969698e-07#

Seconds between successive channels within a bank.

filter_len: int = 33#

Sinc FIR length (odd). Bulk delay is (filter_len-1)//2 samples; a longer filter gives a flatter passband and better accuracy near Nyquist, at the cost of more latency and compute. Set to 0 to disable alignment entirely: the transformer then becomes a pass-through that returns its input unchanged.

rail_threshold: float | None = None#

If set, samples with abs(value) >= rail_threshold are treated as clipped and held at the last valid value before filtering. None skips rail handling. (For Blackrock int16 data at 0.25 µV/count, the rail is about 8191 µV.)
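
The rail figure and the hold-last-valid rule can be checked with a few lines of Python. This is a sketch for one channel, not the transformer's code. hold_clipped is a hypothetical name, and the fallback value used when a channel starts out clipped is an assumption, since the text does not say what happens in that case.

```python
INT16_MAX = 32767
UV_PER_COUNT = 0.25  # scale factor quoted in the text

# Largest representable magnitude in microvolts.
rail_uv = INT16_MAX * UV_PER_COUNT


def hold_clipped(samples, rail_threshold):
    """Replace clipped samples with the last valid value (one channel)."""
    if rail_threshold is None:
        return list(samples)  # rail handling disabled
    out = []
    last_valid = 0.0  # assumption: fallback if the first samples are clipped
    for value in samples:
        if abs(value) >= rail_threshold:
            out.append(last_valid)
        else:
            last_valid = value
            out.append(value)
    return out


held = hold_clipped([1.0, 8191.75, -9000.0, 2.0], rail_threshold=8191.0)
```

Holding the last valid value keeps a step-like rail excursion from ringing through the FIR filter that follows.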

__init__(bank_size=32, channel_sample_interval=9.696969696969698e-07, filter_len=33, rail_threshold=None)#
Parameters:
  • bank_size (int)

  • channel_sample_interval (float)

  • filter_len (int)

  • rail_threshold (float | None)

Return type:

None
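
To make the defaults concrete, the sketch below computes the fallback sweep slot, the resulting per-channel sampling delay, and the filter's bulk delay. The helper names are hypothetical, and the 30 kHz sample rate used to express the delay in samples is an assumption chosen for illustration.

```python
BANK_SIZE = 32
CHANNEL_SAMPLE_INTERVAL = 9.696969696969698e-07  # seconds between channels
FILTER_LEN = 33


def sweep_slot(ch: int, bank_size: int = BANK_SIZE) -> int:
    """Fallback slot when the channel axis has no bank/elec metadata."""
    return ch % bank_size


def sampling_delay_s(ch: int) -> float:
    """Delay of a channel relative to the first channel in its bank."""
    return sweep_slot(ch) * CHANNEL_SAMPLE_INTERVAL


def bulk_delay_samples(filter_len: int = FILTER_LEN) -> int:
    """Whole-sample latency added by the sinc FIR (0 when disabled)."""
    return (filter_len - 1) // 2 if filter_len > 0 else 0


def fractional_delay_samples(ch: int, fs: float = 30_000.0) -> float:
    """Sampling delay expressed in samples at an assumed rate fs."""
    return sampling_delay_s(ch) * fs


last_slot_delay = fractional_delay_samples(31)
```

With these defaults the last channel in a bank lags the first by a little under one sample at 30 kHz, which is why a fractional-delay filter is needed rather than a whole-sample shift.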

class SamplingDelayAlignmentTransformer(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[SamplingDelayAlignmentSettings, AxisArray, AxisArray, SamplingDelayAlignmentState]

Per-channel fractional-delay alignment (see module docstring).

NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({'rail_threshold'})#
class SliceConfig(channels=ChannelSelection.ALL, channel_type=ChannelType.FRONTEND, ac_input_coupling=False, enable_spiking=False)[source]#

Bases: object

Programmatic per-slice device configuration owned by this source.

The source applies the device state needed to make its subscribed stream produce data on channels: set_sample_group for signal sources, set_spike_extraction for spike sources, plus AC coupling.

Multiple sources can carry disjoint slices for the same device; pycbsdk handles the merge. Overlap with incompatible settings is the user’s responsibility.

Parameters:
channels: list[int] | ChannelSelection = 'all'#

Which channels this slice targets — one field, three intents:

  • list[int] — enable exactly these 1-based channel IDs; the other channels of channel_type are disabled. A provided list is always respected.

  • ChannelSelection.ALL (default) — enable every channel matching channel_type; others disabled.

  • ChannelSelection.ENABLED — leave the device’s enabled set as-is and only consume it. Signal source: retune the already-streaming channels (disable_others=False, so an unused front-end bank stays off). Spike source: leave spike extraction untouched and subscribe to whatever already has it on (enable_spiking is ignored in this mode). An empty enabled set yields nothing and warns.
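
The three intents listed above can be summarised as a small resolution function. This is a sketch of the described semantics only: DemoSelection stands in for ChannelSelection, and resolve_channels, typed_ids, and enabled_ids are hypothetical names for the channels of the chosen channel_type and the set the device currently has enabled.

```python
from enum import Enum


class DemoSelection(Enum):
    """Stand-in for ChannelSelection."""
    ALL = "all"
    ENABLED = "enabled"


def resolve_channels(channels, typed_ids, enabled_ids):
    """Return (target_ids, disable_others) for one slice.

    target_ids: 1-based channel IDs the source will configure or consume.
    disable_others: whether other channels of the same type get disabled.
    """
    if channels is DemoSelection.ALL:
        return sorted(typed_ids), True
    if channels is DemoSelection.ENABLED:
        # Consume the device's enabled set as-is; never change membership.
        return sorted(enabled_ids), False
    # An explicit list is always respected exactly as given.
    return list(channels), True


explicit = resolve_channels([3, 1], {1, 2, 3, 4}, {2})
everything = resolve_channels(DemoSelection.ALL, {1, 2, 3, 4}, {2})
as_is = resolve_channels(DemoSelection.ENABLED, {1, 2, 3, 4}, {2})
```

Only the ENABLED case returns disable_others=False, matching the note that an unused front-end bank stays off in that mode.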

channel_type: ChannelType = 0#
ac_input_coupling: bool = False#

Enable (True) or disable (False) AC coupling (highpass filter) on this slice. Note: This is applied unconditionally to CereLinkSignalProducer.

enable_spiking: bool = False#

Enable spike extraction on the selected channels (FRONTEND only). Honored by CereLinkSpikeSource; ignored by signal sources, and ignored when channels is ChannelSelection.ENABLED (which leaves extraction exactly as the device has it).

__init__(channels=ChannelSelection.ALL, channel_type=ChannelType.FRONTEND, ac_input_coupling=False, enable_spiking=False)#
Return type:

None

Modules

cerelink

CereLink-based source for ezmsg — streams continuous and spike data from Blackrock devices.

cereplex_impedance

CerePlex impedance measurement pipeline.

channel_map

Attach Blackrock .cmp channel-map metadata to an AxisArray's ch axis.

sampling_delay_alignment

Align channels sampled at different instants by a sequential A/D.