ezmsg.blackrock#

class CereLinkProducer(*args, settings=None, **kwargs)[source]#

Bases: BaseProducer[CereLinkSettings, AxisArray]

Owns the pycbsdk Session, ring buffers, and spike queue.

Parameters:

settings (CereLinkSettings | None)

__init__(*args, settings=None, **kwargs)[source]#
Parameters:

settings (CereLinkSettings | None)

open()[source]#

Create and start the Session, configure channels, register callbacks.

device_type=None is an explicit no-op — the producer stays idle and _produce returns None on every call, which lets the unit sit dormant until a real device is selected by the host app.

Return type:

None
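The idle-by-default behaviour described for open() can be pictured with a small stand-in class. This is a sketch only: IdleAwareProducer and its string "session" are hypothetical and do not touch pycbsdk, and _produce is shown as a plain synchronous method for brevity. It only illustrates that open() does nothing and _produce() returns None while device_type is None.

```python
from typing import Optional


class IdleAwareProducer:
    """Hypothetical stand-in for a producer that stays dormant without a device."""

    def __init__(self, device_type: Optional[int] = None) -> None:
        self.device_type = device_type
        self._session: Optional[str] = None  # placeholder for a real session object

    def open(self) -> None:
        if self.device_type is None:
            return  # explicit no-op: no session is created
        self._session = f"session-for-device-{self.device_type}"

    def _produce(self) -> Optional[str]:
        if self._session is None:
            return None  # idle: nothing to emit
        return f"data from {self._session}"


idle = IdleAwareProducer()
idle.open()

active = IdleAwareProducer(device_type=1)
active.open()
```

Here idle never creates a session, while active does; a host app could construct the idle form first and switch to a real device later through a settings update.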

close()[source]#

Tear down the Session and unblock any in-flight _produce.

Calling _data_event.set() releases a coroutine that may be awaiting new samples. Without it, an in-flight _produce from the previous producer instance would stay blocked after a settings-driven recreate and would only be cleaned up when garbage-collected as a pending task.

Return type:

None
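The role of the event in close() can be sketched with plain asyncio. The class below is a hypothetical stand-in, not the library's implementation: only the attribute name _data_event comes from the description above, and the _closed flag is an assumption added so the woken coroutine can tell a shutdown from real data.

```python
import asyncio
from typing import Optional


class WaitingProducer:
    """Hypothetical producer whose _produce waits on an event for new samples."""

    def __init__(self) -> None:
        self._data_event = asyncio.Event()
        self._closed = False  # assumed flag, not taken from the library

    async def _produce(self) -> Optional[str]:
        await self._data_event.wait()  # parks here until data arrives or close() runs
        self._data_event.clear()
        if self._closed:
            return None  # woken by close(): nothing to emit
        return "samples"

    def close(self) -> None:
        self._closed = True
        self._data_event.set()  # release any coroutine parked in _produce


async def demo() -> Optional[str]:
    producer = WaitingProducer()
    pending = asyncio.create_task(producer._produce())
    await asyncio.sleep(0)  # give the task a chance to start waiting
    producer.close()
    # Without the set() in close(), this would hit the timeout instead of returning.
    return await asyncio.wait_for(pending, timeout=1.0)


result = asyncio.run(demo())
```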

reload_channel_map(cmp_path)[source]#

Apply a new CMP to the existing Session, refresh cached positions, and rebuild each active group’s template so subsequent AxisArrays carry the new (x, y) positions.

pycbsdk has no API to clear an applied CMP — pass None and the call is a no-op with a warning.

Parameters:

cmp_path (str)

Return type:

None

reload_ccf(ccf_path)[source]#

Apply a new CCF on the existing Session. None is a no-op.

Parameters:

ccf_path (str)

Return type:

None

class CereLinkSettings(device_type: DeviceType | None = None, cbtime: bool = False, microvolts: bool = True, cont_buffer_dur: float = 0.5, ccf_path: str | None = None, cmp_path: str | None = None, n_chans: int | None = None, channel_type: ChannelType = ChannelType.FRONTEND, sample_rate: SampleRate | None = None, ac_input_coupling: bool = False)[source]#

Bases: Settings

Parameters:
  • device_type (DeviceType | None)

  • cbtime (bool)

  • microvolts (bool)

  • cont_buffer_dur (float)

  • ccf_path (str | None)

  • cmp_path (str | None)

  • n_chans (int | None)

  • channel_type (ChannelType)

  • sample_rate (SampleRate | None)

  • ac_input_coupling (bool)

device_type: DeviceType | None = None#

Device type to connect to. None = no Session — the unit produces nothing until a real device is selected. This idle-by-default mode lets a host app defer the cost of opening pycbsdk until the user picks a device, which is useful on macOS where simultaneous Sessions exhaust POSIX shared memory.

cbtime: bool = False#

If True, timestamps are raw device nanoseconds divided by 1e9 (device time in seconds). If False, timestamps are expressed on the time.monotonic() clock via clock sync.
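The two timestamp modes amount to the following arithmetic. This is a minimal sketch: the function names and the offset_s argument are hypothetical, and how the clock-sync offset is actually estimated is not described here, so it is simply passed in.

```python
def device_ns_to_seconds(device_ns: int) -> float:
    """cbtime=True style: raw device nanoseconds expressed in seconds."""
    return device_ns / 1e9


def device_ns_to_monotonic(device_ns: int, offset_s: float) -> float:
    """cbtime=False style: shift device time onto the host monotonic clock.

    offset_s is an assumed clock-sync estimate of
    (host time.monotonic() - device time), in seconds.
    """
    return device_ns / 1e9 + offset_s


raw_s = device_ns_to_seconds(1_500_000_000)
host_s = device_ns_to_monotonic(1_500_000_000, offset_s=10.0)
```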

microvolts: bool = True#

Convert int16 → µV using channel scale factors (when available).
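As an illustration of the conversion, the sketch below applies one linear scale factor per channel to a frame of raw counts. The function name and the example scale values are hypothetical; the form in which the device reports its scale factors is not specified here.

```python
from typing import List, Sequence


def counts_to_microvolts(
    frame: Sequence[int], uv_per_count: Sequence[float]
) -> List[float]:
    """Scale one frame of int16 counts (one value per channel) to microvolts."""
    if len(frame) != len(uv_per_count):
        raise ValueError("need exactly one scale factor per channel")
    return [count * scale for count, scale in zip(frame, uv_per_count)]


# Two channels with made-up scale factors of 0.25 and 0.5 µV per count.
frame_uv = counts_to_microvolts([4, -8], [0.25, 0.5])
```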

cont_buffer_dur: float = 0.5#

Ring buffer duration in seconds per sample rate group.
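Because the duration applies per sample rate group, the capacity in samples scales with the group's rate. The sketch below uses collections.deque as a stand-in ring buffer; the actual buffer implementation is not described here, and ring_capacity is a hypothetical helper.

```python
from collections import deque


def ring_capacity(cont_buffer_dur: float, sample_rate_hz: float) -> int:
    """Number of samples per channel that fit in the buffer duration."""
    return max(1, round(cont_buffer_dur * sample_rate_hz))


capacity = ring_capacity(0.5, 30_000)  # default duration at an assumed 30 kHz group

ring = deque(maxlen=capacity)
ring.extend(range(20_000))  # pushing more than the capacity drops the oldest samples
```

With the default 0.5 s, a 30 kHz group holds 15,000 samples per channel, so a consumer that falls more than half a second behind loses the oldest data.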

ccf_path: str | None = None#

CCF file to load after connection (or None to skip). Mutually exclusive with n_chans/sample_rate.

cmp_path: str | None = None#

CMP file for electrode positions (or None to skip). Bank offset is always 0.

n_chans: int | None = None#

Number of channels to enable (used with channel_type and sample_rate). Mutually exclusive with ccf_path.

channel_type: ChannelType = ChannelType.FRONTEND#

Channel type filter for programmatic setup.

sample_rate: SampleRate | None = None#

Sample rate for programmatic setup (e.g. SampleRate.SR_30kHz). Mutually exclusive with ccf_path.

ac_input_coupling: bool = False#

AC input coupling (highpass filter). Ignored when ccf_path is provided.
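The interplay between ccf_path and the programmatic-setup fields can be checked before a settings object is built. The sketch below is hypothetical: SetupChoice mirrors only the relevant fields, and whether the library itself raises, warns, or silently prefers one path is not stated here.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SetupChoice:
    """Hypothetical subset of the settings fields that govern channel setup."""

    ccf_path: Optional[str] = None
    n_chans: Optional[int] = None
    sample_rate: Optional[int] = None  # stand-in for a SampleRate member
    ac_input_coupling: bool = False


def setup_conflicts(choice: SetupChoice) -> List[str]:
    """List the documented conflicts between CCF-based and programmatic setup."""
    problems: List[str] = []
    if choice.ccf_path is not None:
        if choice.n_chans is not None:
            problems.append("n_chans is mutually exclusive with ccf_path")
        if choice.sample_rate is not None:
            problems.append("sample_rate is mutually exclusive with ccf_path")
        if choice.ac_input_coupling:
            problems.append("ac_input_coupling is ignored when ccf_path is provided")
    return problems


mixed = setup_conflicts(SetupChoice(ccf_path="config.ccf", n_chans=32))
programmatic = setup_conflicts(SetupChoice(n_chans=32, sample_rate=30_000))
```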

__init__(device_type=None, cbtime=False, microvolts=True, cont_buffer_dur=0.5, ccf_path=None, cmp_path=None, n_chans=None, channel_type=ChannelType.FRONTEND, sample_rate=None, ac_input_coupling=False)#
Parameters:
  • device_type (DeviceType | None)

  • cbtime (bool)

  • microvolts (bool)

  • cont_buffer_dur (float)

  • ccf_path (str | None)

  • cmp_path (str | None)

  • n_chans (int | None)

  • channel_type (ChannelType)

  • sample_rate (SampleRate | None)

  • ac_input_coupling (bool)

Return type:

None

class CereLinkSource(*args, **kwargs)[source]#

Bases: BaseProducerUnit[CereLinkSettings, AxisArray, CereLinkProducer]

ezmsg Unit that streams continuous and spike data from a Blackrock device.

SETTINGS#

alias of CereLinkSettings

OUTPUT_SPIKE = OutputStream:unlocated[EventMessage](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#

OUTPUT_DEVICE_STATUS = OutputStream:unlocated[DeviceStatus](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#

__init__(*args, **kwargs)[source]#

Initialize an Addressable object.

The name and location are initially None and must be set before the object can be properly addressed. This is achieved through the _set_name() and _set_location() methods.

Return type:

None

async initialize()[source]#

Runs when the Unit is instantiated.

This hook is called from within the process the unit will live in. It can be overridden, and the override can be made asynchronous by adding the async keyword.

This method is where you should initialize your unit’s state and prepare for message processing.

Return type:

None

async on_settings(msg)[source]#

Receive a settings message, override self.SETTINGS, and re-create the producer. Child classes that wish to have fine-grained control over whether the core producer resets on settings changes should override this method.

Parameters:

msg (CereLinkSettings) – a settings message.

Return type:

None

shutdown()[source]#

Runs when the Unit terminates.

This hook is called from within the process the unit will live in. It can be overridden, and the override can be made asynchronous by adding the async keyword.

This method is where you should clean up resources and perform any necessary shutdown procedures.

Return type:

None

async spikes()[source]#
Return type:

AsyncGenerator

async device_status()[source]#
Return type:

AsyncGenerator

class CerePlexImpedance(*args, settings=None, **kwargs)[source]#

Bases: BaseTransformerUnit[CerePlexImpedanceSettings, AxisArray, AxisArray, CerePlexImpedanceProcessor]

Parameters:

settings (Settings | None)

SETTINGS#

alias of CerePlexImpedanceSettings

async on_settings(msg)[source]#

Apply new settings.

If only headstage_channel_offsets changed, rebuild the trackers in place — the accumulated impedance values for previously-measured channels remain valid. Any other change recreates the processor as usual (state is reset on the next message).

Parameters:

msg (CerePlexImpedanceSettings)

Return type:

None
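The decision described above comes down to comparing the old and new settings field by field. The sketch below is a hypothetical illustration: ImpedanceSettingsSketch copies the documented field names and defaults, but the helper functions are not part of the library.

```python
from dataclasses import dataclass, fields
from typing import Set, Tuple


@dataclass(frozen=True)
class ImpedanceSettingsSketch:
    """Hypothetical mirror of the documented settings fields and defaults."""

    headstage_channel_offsets: Tuple[int, ...] = (0,)
    collect_duration_s: float = 0.1
    fft_duration_s: float = 0.09227
    freq_lo: float = 960.0
    freq_hi: float = 1050.0
    test_current_nA: float = 1.0


def changed_fields(old: ImpedanceSettingsSketch, new: ImpedanceSettingsSketch) -> Set[str]:
    """Names of the fields whose values differ between the two settings."""
    return {f.name for f in fields(old) if getattr(old, f.name) != getattr(new, f.name)}


def needs_full_reset(old: ImpedanceSettingsSketch, new: ImpedanceSettingsSketch) -> bool:
    """True when anything other than headstage_channel_offsets changed."""
    return bool(changed_fields(old, new) - {"headstage_channel_offsets"})


base = ImpedanceSettingsSketch()
offsets_only = ImpedanceSettingsSketch(headstage_channel_offsets=(0, 128))
band_changed = ImpedanceSettingsSketch(freq_lo=900.0)
```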

class CerePlexImpedanceProcessor(*args, **kwargs)[source]#

Bases: BaseStatefulTransformer[CerePlexImpedanceSettings, AxisArray, AxisArray | None, CerePlexImpedanceState]

Stateful transformer that extracts per-channel impedance from a CerePlex sweep.

Expects a stream of AxisArray messages with dims ["time", "ch"] where the data is in microvolts. When using CereLinkSource, set microvolts=True; raw ADC counts will produce incorrect results.

The processor tracks one or more headstages independently (configured via CerePlexImpedanceSettings.headstage_channel_offsets). Each headstage’s impedance sweep cycles sequentially through its channels: exactly one channel is non-zero at a time while the others read zero.

On each impedance update the processor emits an AxisArray whose data is a (1, n_ch) array of impedance values in kOhm (NaN for channels not yet measured).
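The requirement for microvolt input follows from the units: a voltage in µV divided by a current in nA is numerically an impedance in kOhm. The sketch below shows one plausible way to get from a burst to a value, under loudly stated assumptions: the peak is taken as the largest DFT magnitude in the band, the peak-to-peak voltage is taken as twice that amplitude, and the test signal is a synthetic 1 kHz tone sampled at 30 kHz. The processor's actual estimator (windowing, averaging, and so on) may differ.

```python
import cmath
import math
from typing import Sequence


def band_peak_impedance_kohm(
    samples_uv: Sequence[float],
    fs: float,
    freq_lo: float = 960.0,
    freq_hi: float = 1050.0,
    test_current_na: float = 1.0,
) -> float:
    """Estimate impedance from the strongest spectral line inside [freq_lo, freq_hi]."""
    n = len(samples_uv)
    k_lo = math.ceil(freq_lo * n / fs)   # first DFT bin inside the band
    k_hi = math.floor(freq_hi * n / fs)  # last DFT bin inside the band
    peak_amplitude_uv = 0.0
    for k in range(k_lo, k_hi + 1):
        # Direct DFT of a single bin; fine for a handful of bins in a sketch.
        xk = sum(v * cmath.exp(-2j * math.pi * k * i / n) for i, v in enumerate(samples_uv))
        peak_amplitude_uv = max(peak_amplitude_uv, 2.0 * abs(xk) / n)
    v_pp_uv = 2.0 * peak_amplitude_uv  # sine amplitude to peak-to-peak
    return v_pp_uv / test_current_na   # µV / nA is numerically kOhm


fs = 30_000.0
n = 2_760  # chosen so that 1 kHz falls exactly on a DFT bin
tone = [50.0 * math.sin(2 * math.pi * 1_000.0 * i / fs) for i in range(n)]
z_kohm = band_peak_impedance_kohm(tone, fs)
```

A 50 µV amplitude tone is 100 µV peak-to-peak, so with the default 1 nA peak-to-peak test current the estimate comes out at about 100 kOhm. Feeding raw ADC counts instead would scale the result by the inverse of the per-count scale factor.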

class CerePlexImpedanceSettings(headstage_channel_offsets: tuple[int, ...] = (0,), collect_duration_s: float = 0.1, fft_duration_s: float = 0.09227, freq_lo: float = 960.0, freq_hi: float = 1050.0, test_current_nA: float = 1.0)[source]#

Bases: Settings

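Parameters:
  • headstage_channel_offsets (tuple[int, ...])

  • collect_duration_s (float)

  • fft_duration_s (float)

  • freq_lo (float)

  • freq_hi (float)

  • test_current_nA (float)

headstage_channel_offsets: tuple[int, ...] = (0,)#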

Starting channel index of each CerePlex headstage. Each headstage’s range extends from its offset to the next offset (or n_ch for the last). Example: two 128-ch headstages → (0, 128).
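The offset-to-range rule can be written out directly. This is a small sketch with a hypothetical helper name; it assumes the offsets are given in ascending order.

```python
from typing import List, Sequence, Tuple


def headstage_ranges(offsets: Sequence[int], n_ch: int) -> List[Tuple[int, int]]:
    """Half-open (start, stop) channel ranges, one per headstage."""
    stops = list(offsets[1:]) + [n_ch]  # each range ends where the next begins
    return list(zip(offsets, stops))


two_headstages = headstage_ranges((0, 128), n_ch=256)
single_headstage = headstage_ranges((0,), n_ch=96)
```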

collect_duration_s: float = 0.1#

Maximum burst duration to buffer per channel (100 ms for CerePlex).

fft_duration_s: float = 0.09227#

Duration of data used for FFT, taken from the end of the burst. The preceding samples serve as settle time.
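The relationship between the collected burst and the FFT window can be sketched as a tail slice. The helper below is hypothetical, assumes a 30 kHz sample rate for the example, and rounds durations to the nearest sample; the processor may round differently.

```python
from typing import List, Sequence, Tuple


def split_burst(
    burst: Sequence[float],
    fs: float,
    collect_duration_s: float = 0.1,
    fft_duration_s: float = 0.09227,
) -> Tuple[List[float], List[float]]:
    """Split a burst into (settle samples, FFT window taken from the end)."""
    n_collect = round(collect_duration_s * fs)
    n_fft = round(fft_duration_s * fs)
    if n_collect <= 0 or n_fft <= 0:
        raise ValueError("durations must cover at least one sample")
    kept = list(burst)[-n_collect:]  # keep at most the collect window
    return kept[:-n_fft], kept[-n_fft:]


settle, window = split_burst(list(range(3_000)), fs=30_000.0)
```

At 30 kHz the defaults give a 3,000-sample burst, of which the last 2,768 samples feed the FFT and the first 232 are discarded as settle time.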

freq_lo: float = 960.0#

Lower bound of frequency range for peak extraction (Hz).

freq_hi: float = 1050.0#

Upper bound of frequency range for peak extraction (Hz).

test_current_nA: float = 1.0#

Injected test-current peak-to-peak amplitude (nA).

__init__(headstage_channel_offsets=(0,), collect_duration_s=0.1, fft_duration_s=0.09227, freq_lo=960.0, freq_hi=1050.0, test_current_nA=1.0)#
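Parameters:
  • headstage_channel_offsets (tuple[int, ...])

  • collect_duration_s (float)

  • fft_duration_s (float)

  • freq_lo (float)

  • freq_hi (float)

  • test_current_nA (float)

Return type: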

None

class DeviceType(*values)[source]#

Bases: IntEnum

Device type selection.

Values match cbproto_device_type_t.

LEGACY_NSP = 0#
NSP = 1#
HUB1 = 2#
HUB2 = 3#
HUB3 = 4#
NPLAY = 5#
CUSTOM = 6#

Modules

cerelink

CereLink-based source for ezmsg — streams continuous and spike data from Blackrock devices.

cereplex_impedance

CerePlex impedance measurement pipeline.