ezmsg.blackrock#
- class CcfConfig(path)[source]#
Bases:
object
Load a CCF file. Device-wide configuration: at most one source per graph should carry this. Other sources targeting the same device set configure=None (pure subscriber).
- Parameters:
path (str)
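The "at most one CcfConfig" rule can be checked before a graph is started. The sketch below is only an illustration: CcfConfig and SourceSettings are local stand-ins rather than the package's classes, the device field is a plain string, and check_single_ccf is a hypothetical helper that applies the rule per device.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class CcfConfig:
    """Local stand-in mirroring CcfConfig(path); not the real class."""
    path: str


@dataclass(frozen=True)
class SourceSettings:
    """Hypothetical minimal settings: a device name and a configure slot."""
    device: str
    configure: Optional[CcfConfig] = None


def check_single_ccf(sources: Sequence[SourceSettings]) -> None:
    """Raise ValueError if more than one source per device carries a CcfConfig."""
    seen = set()
    for src in sources:
        if isinstance(src.configure, CcfConfig):
            if src.device in seen:
                raise ValueError(f"more than one CcfConfig targets {src.device!r}")
            seen.add(src.device)
```

Sources that only subscribe keep configure=None and pass the check untouched.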
- class CereLinkSignalProducer(*args, **kwargs)[source]#
Bases:
_CereLinkBaseProducer[CereLinkSignalSettings, CereLinkSignalProducerState]
Streams one continuous sample-group as AxisArray.
- class CereLinkSignalSettings(device_type=None, subscribe_rate=SampleRate.SR_RAW, configure=None, cbtime=False, microvolts=True, cont_buffer_dur=0.5, cmp_configs=())[source]#
Bases:
Settings
Settings for CereLinkSignalSource: emits one continuous sample-group as AxisArray.
- Parameters:
device_type (DeviceType | None)
subscribe_rate (SampleRate)
configure (CcfConfig | SliceConfig | None)
cbtime (bool)
microvolts (bool)
cont_buffer_dur (float)
cmp_configs (tuple[ChannelMapSettings, ...])
- device_type: DeviceType | None = None#
Device to connect to. None = idle (no Session opened).
- subscribe_rate: SampleRate = 6#
The sample-group rate this source streams. Defaults to SR_RAW. Explicit SampleRate.NONE is rejected: a Source must subscribe to something.
- configure: CcfConfig | SliceConfig | None = None#
Device configuration this source applies on open.
- cmp_configs: tuple[ChannelMapSettings, ...] = ()#
One ChannelMapSettings per headstage, applied after connection.
- __init__(device_type=None, subscribe_rate=SampleRate.SR_RAW, configure=None, cbtime=False, microvolts=True, cont_buffer_dur=0.5, cmp_configs=())#
- Parameters:
device_type (DeviceType | None)
subscribe_rate (SampleRate)
configure (CcfConfig | SliceConfig | None)
cbtime (bool)
microvolts (bool)
cont_buffer_dur (float)
cmp_configs (tuple[ChannelMapSettings, ...])
- Return type:
None
- class CereLinkSignalSource(*args, **kwargs)[source]#
Bases:
BaseProducerUnit[CereLinkSignalSettings, AxisArray, CereLinkSignalProducer]
ezmsg Unit that streams one continuous sample-group from a Blackrock device.
- SETTINGS#
alias of CereLinkSignalSettings
- OUTPUT_DEVICE_STATUS = OutputStream:unlocated[DeviceStatus](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- __init__(*args, **kwargs)[source]#
Initialize an Addressable object.
The name and location are initially None and must be set before the object can be properly addressed. This is achieved through the _set_name() and _set_location() methods.
- Return type:
None
- create_producer()[source]#
Create the producer instance from settings.
Closes the previous producer first (if any), so resources held by it — sockets, file handles, hardware sessions — are released deterministically rather than being left to garbage collection.
- Return type:
None
- shutdown()[source]#
Runs when the Unit terminates.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should clean up resources and perform any necessary shutdown procedures.
- Return type:
None
- class CereLinkSpikeProducer(*args, **kwargs)[source]#
Bases:
_CereLinkBaseProducer[CereLinkSpikeSettings, CereLinkSpikeProducerState]
Streams spike events as AxisArray of shape [time, ch, unit=7].
The time axis is the device’s 30 kHz spike clock; the unit axis indexes the device convention (0=unsorted, 1..5=sorted, 6=noise; values >5 collapse into the noise bucket because the unit axis has fixed length 7).
Emission cadence is regular: every spike_buffer_dur seconds, one window of shape [N_t, n_ch, 7] is emitted. Empty windows are emitted too (downstream wants a steady time axis). The window before the first spike is suppressed because there is no device-time anchor until then.
Spikes whose timestamp lands beyond the current window’s range are dropped (a downstream-backpressure failure mode); in normal operation _produce runs at the buffer cadence and the buffer is large enough to hold one window of spikes.
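The window layout and the unit-axis convention can be sketched in plain Python. This is an illustration rather than the producer's implementation: bin_spikes and unit_index are hypothetical names, spikes are assumed to arrive as (timestamp, channel, unit) tuples with timestamps in spike-clock ticks and 0-based channel columns, and storing a count per cell is an assumption. The sketch also drops spikes that fall before the window, which the text does not specify.

```python
SPIKE_CLOCK_HZ = 30_000   # spike clock rate stated in the text
N_UNITS = 7               # fixed unit-axis length
NOISE_UNIT = 6            # last slot on the unit axis


def unit_index(unit: int) -> int:
    """Map a device unit code to the unit axis; codes above 5 collapse to noise."""
    return min(unit, NOISE_UNIT)


def bin_spikes(spikes, window_start, n_t, n_ch):
    """Place (timestamp, channel, unit) spikes into a dense [n_t][n_ch][7] window.

    Returns (window, dropped). Spikes outside the window's tick range are
    counted as dropped rather than stored.
    """
    window = [[[0] * N_UNITS for _ in range(n_ch)] for _ in range(n_t)]
    dropped = 0
    for ts, ch, unit in spikes:
        t = ts - window_start
        if 0 <= t < n_t:
            window[t][ch][unit_index(unit)] += 1
        else:
            dropped += 1
    return window, dropped
```

With the default spike_buffer_dur of 0.5 s, one window would span round(0.5 * SPIKE_CLOCK_HZ) = 15000 ticks; a real implementation would use an array type rather than nested lists.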
- class CereLinkSpikeSettings(device_type=None, configure=None, cbtime=False, microvolts=True, spike_buffer_dur=0.5, cmp_configs=())[source]#
Bases:
Settings
Settings for CereLinkSpikeSource: emits sparse spike events as AxisArray of shape [time, ch, unit=7] at the 30 kHz spike clock. Unit indices follow the device convention: 0=unsorted, 1..5=sorted, 6=noise (header.type > 5).
- Parameters:
device_type (DeviceType | None)
configure (CcfConfig | SliceConfig | None)
cbtime (bool)
microvolts (bool)
spike_buffer_dur (float)
cmp_configs (tuple[ChannelMapSettings, ...])
- device_type: DeviceType | None = None#
Device to connect to. None = idle.
- configure: CcfConfig | SliceConfig | None = None#
Device configuration this source applies on open.
- cmp_configs: tuple[ChannelMapSettings, ...] = ()#
One ChannelMapSettings per headstage, applied after connection.
- __init__(device_type=None, configure=None, cbtime=False, microvolts=True, spike_buffer_dur=0.5, cmp_configs=())#
- Parameters:
device_type (DeviceType | None)
configure (CcfConfig | SliceConfig | None)
cbtime (bool)
microvolts (bool)
spike_buffer_dur (float)
cmp_configs (tuple[ChannelMapSettings, ...])
- Return type:
None
- class CereLinkSpikeSource(*args, **kwargs)[source]#
Bases:
BaseProducerUnit[CereLinkSpikeSettings, AxisArray, CereLinkSpikeProducer]
ezmsg Unit that streams spike events as AxisArray[time, ch, unit=7].
- SETTINGS#
alias of CereLinkSpikeSettings
- OUTPUT_DEVICE_STATUS = OutputStream:unlocated[DeviceStatus](self.num_buffers=32, self.force_tcp=None, self.allow_local=None)#
- __init__(*args, **kwargs)[source]#
Initialize an Addressable object.
The name and location are initially None and must be set before the object can be properly addressed. This is achieved through the _set_name() and _set_location() methods.
- Return type:
None
- create_producer()[source]#
Create the producer instance from settings.
Closes the previous producer first (if any), so resources held by it — sockets, file handles, hardware sessions — are released deterministically rather than being left to garbage collection.
- Return type:
None
- shutdown()[source]#
Runs when the Unit terminates.
This is called from within the same process this unit will live in. This lifecycle hook can be overridden. It can be run as async functions by simply adding the async keyword when overriding.
This method is where you should clean up resources and perform any necessary shutdown procedures.
- Return type:
None
- class CerePlexImpedance(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[CerePlexImpedanceSettings, AxisArray, AxisArray, CerePlexImpedanceProcessor]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of CerePlexImpedanceSettings
- class CerePlexImpedanceProcessor(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[CerePlexImpedanceSettings, AxisArray, AxisArray | None, CerePlexImpedanceState]
Stateful transformer that extracts per-channel impedance from a CerePlex sweep.
Expects a stream of AxisArray messages with dims ["time", "ch"] where the data is in microvolts. When using CereLinkSignalSource, set microvolts=True; raw ADC counts will produce incorrect results.
The processor tracks one or more headstages independently (configured via CerePlexImpedanceSettings.headstage_channel_offsets). Each headstage’s impedance sweep cycles sequentially through its channels: exactly one channel is non-zero at a time while the others read zero. This relies on the device’s internal filtering being disabled: a filter produces small non-zero residuals on idle channels, which defeats the exact-zero exclusivity checks. Misconfiguration there is a sign that the recording chain needs fixing, not that this algorithm should accommodate it.
On each impedance update the processor emits an AxisArray whose data is a (1, n_ch) array of impedance values in kOhm (NaN for channels not yet measured).
- NONRESET_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset({'freq_hi', 'freq_lo', 'headstage_channel_offsets', 'test_current_nA'})#
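The exact-zero exclusivity condition described for the impedance sweep can be illustrated on a single sample row. This is a minimal sketch, not the processor's code: active_channel is a hypothetical helper, and start/stop are assumed to be one headstage's channel-index range.

```python
from typing import Optional, Sequence


def active_channel(row: Sequence[float], start: int, stop: int) -> Optional[int]:
    """Return the index of the single non-zero channel in row[start:stop].

    Returns None when no channel, or more than one, is non-zero, i.e. when
    the exact-zero exclusivity condition does not hold for this sample.
    """
    nonzero = [i for i in range(start, stop) if row[i] != 0]
    return nonzero[0] if len(nonzero) == 1 else None
```

A filtered recording chain would leave small residuals on idle channels, so the helper would return None for nearly every sample, which is the failure the text warns about.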
- update_settings(new_settings)[source]#
Apply new settings to this processor, requesting a state reset if needed.
Diffs new_settings against the current self.settings. If every changed field is listed in NONRESET_SETTINGS_FIELDS, self.settings is simply rebound and the processor keeps running. Otherwise, _request_reset() is called so the next message triggers a fresh _reset_state (for stateful subclasses). Override for finer-grained control than the class-level allow-list provides.
- Parameters:
new_settings (CerePlexImpedanceSettings)
- Return type:
None
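The allow-list decision in update_settings amounts to a set comparison. The sketch below shows that logic under stated assumptions: ImpedanceSettings is a local dataclass stand-in carrying the fields and defaults documented for CerePlexImpedanceSettings, and changed_fields and needs_reset are hypothetical helper names, not the library's methods.

```python
from dataclasses import dataclass, fields

# Copied from the documented class attribute.
NONRESET_SETTINGS_FIELDS = frozenset(
    {"freq_hi", "freq_lo", "headstage_channel_offsets", "test_current_nA"}
)


@dataclass(frozen=True)
class ImpedanceSettings:
    """Local stand-in with the documented fields and defaults."""
    headstage_channel_offsets: tuple = (0,)
    collect_duration_s: float = 0.1
    fft_duration_s: float = 0.09227
    freq_lo: float = 960.0
    freq_hi: float = 1050.0
    test_current_nA: float = 1.0


def changed_fields(old, new) -> set:
    """Names of the fields whose values differ between two settings objects."""
    return {f.name for f in fields(old) if getattr(old, f.name) != getattr(new, f.name)}


def needs_reset(old, new, nonreset=NONRESET_SETTINGS_FIELDS) -> bool:
    """True unless every changed field is on the no-reset allow-list."""
    return not changed_fields(old, new) <= nonreset
```

Under this reading, changing freq_lo keeps the processor running, while changing fft_duration_s requests a reset.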
- class CerePlexImpedanceSettings(headstage_channel_offsets: tuple[int, ...] = (0,), collect_duration_s: float = 0.1, fft_duration_s: float = 0.09227, freq_lo: float = 960.0, freq_hi: float = 1050.0, test_current_nA: float = 1.0)[source]#
Bases:
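Settings
- Parameters:
headstage_channel_offsets (tuple[int, ...])
collect_duration_s (float)
fft_duration_s (float)
freq_lo (float)
freq_hi (float)
test_current_nA (float)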
- headstage_channel_offsets: tuple[int, ...] = (0,)#
Starting channel index of each CerePlex headstage. Each headstage’s range extends from its offset to the next offset (or n_ch for the last). Example: two 128-ch headstages → (0, 128).
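The offset-to-range rule can be written out directly. In this sketch headstage_ranges is a hypothetical helper name and n_ch is the total channel count of the incoming message.

```python
def headstage_ranges(offsets, n_ch):
    """Return (start, stop) channel-index pairs, one per headstage.

    Each range runs from its offset to the next offset, or to n_ch for the
    last headstage.
    """
    stops = list(offsets[1:]) + [n_ch]
    return list(zip(offsets, stops))
```

For two 128-channel headstages, headstage_ranges((0, 128), 256) gives [(0, 128), (128, 256)].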
- collect_duration_s: float = 0.1#
Maximum burst duration to buffer per channel (100 ms for CerePlex).
- fft_duration_s: float = 0.09227#
Duration of data used for FFT, taken from the end of the burst. The preceding samples serve as settle time.
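One way to picture how the settle time, the FFT window, the frequency band, and the test current fit together is the sketch below. It is not the processor's algorithm: the function names are hypothetical, the sampling rate fs is supplied by the caller, taking the largest single-sided DFT amplitude inside [freq_lo, freq_hi] is an assumption, and so is treating test_current_nA as a peak amplitude. Only the unit relation is certain: microvolts divided by nanoamps gives kilo-ohms.

```python
import cmath
import math


def fft_tail(burst, fs, fft_duration_s=0.09227):
    """Keep the last fft_duration_s of a burst; earlier samples are settle time."""
    n_fft = int(round(fft_duration_s * fs))
    return burst[-n_fft:]


def band_amplitude_uV(x, fs, freq_lo=960.0, freq_hi=1050.0):
    """Largest single-sided DFT amplitude (units of x) inside [freq_lo, freq_hi]."""
    n = len(x)
    k_lo = math.ceil(freq_lo * n / fs)
    k_hi = math.floor(freq_hi * n / fs)
    best = 0.0
    for k in range(k_lo, k_hi + 1):
        w = -2j * math.pi * k / n
        # Direct DFT of one bin; fine for a handful of bins.
        acc = sum(v * cmath.exp(w * i) for i, v in enumerate(x))
        best = max(best, 2.0 * abs(acc) / n)
    return best


def impedance_kohm(amplitude_uV, test_current_nA=1.0):
    """Ohm's law in these units: microvolts / nanoamps = kilo-ohms."""
    return amplitude_uV / test_current_nA
```

At an assumed 30 kHz rate, a 100 ms burst holds 3000 samples, of which the last 2768 feed the transform and the first 232 are discarded as settle time.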
- __init__(headstage_channel_offsets=(0,), collect_duration_s=0.1, fft_duration_s=0.09227, freq_lo=960.0, freq_hi=1050.0, test_current_nA=1.0)#
- class ChannelSelection(*values)[source]#
Bases:
Enum
Sentinel selections for SliceConfig.channels (the cases an explicit channel-ID list can’t express).
- ALL = 'all'#
Every channel matching channel_type; the others are disabled.
- ENABLED = 'enabled'#
Only the channels the device already has enabled for this stream; the enabled set is left unchanged. What counts as “enabled” is stream-specific: continuous sample-group membership for the signal source (channels are retuned to its rate/coupling but never enabled/disabled), and spike-extraction state for the spike source (extraction is left exactly as-is, and the source subscribes to whatever already has it on).
- class ChannelMapProcessor(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[ChannelMapUnitSettings, AxisArray, AxisArray, ChannelMapState]
Stateful transformer that attaches CMP-derived channel metadata.
Each reset rebuilds the axis in full from settings.cmp_configs: a base layer from incoming labels, every CMP overlay applied in order, then the auto-grid fills indices no CMP claimed. Reset fires on a channel-count change or any cmp_configs change (the latter via BaseProcessor.update_settings → _request_reset), so there is no cross-push state to coalesce. An empty cmp_configs yields a pure auto-grid.
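The layering order (overlays applied in sequence, then an auto-grid for unclaimed indices) can be sketched with plain dictionaries. Everything here is illustrative: build_layout and auto_grid are hypothetical names, overlays are assumed to be {channel_index: (x, y)} dicts already parsed from CMP files, the near-square row-by-row grid is an invented fallback layout, letting a later overlay win on conflicts is an assumption, and the base layer from incoming labels is omitted.

```python
import math


def auto_grid(n_ch):
    """Hypothetical fallback layout: a near-square grid filled row by row."""
    width = max(1, math.ceil(math.sqrt(n_ch)))
    return {i: (i % width, i // width) for i in range(n_ch)}


def build_layout(n_ch, overlays):
    """Apply overlays in order, then fill unclaimed indices from the auto-grid."""
    layout = {}
    for overlay in overlays:
        # Ignore indices outside the current channel count.
        layout.update({i: xy for i, xy in overlay.items() if 0 <= i < n_ch})
    for i, xy in auto_grid(n_ch).items():
        layout.setdefault(i, xy)
    return layout
```

With an empty overlay list the result is the auto-grid alone, matching the "empty cmp_configs" case in the text.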
- class ChannelMapSettings(filepath: str | None = None, start_chan: int = 1, hs_id: int = 0)[source]#
Bases:
Settings
- filepath: str | None = None#
Path to the .cmp file. None (or an empty path) means no CMP: the auto-grid fallback generates coordinates for every channel.
- start_chan: int = 1#
1-based channel ID assigned to the first sorted CMP row. Mirrors pycbsdk.Session.load_channel_map().
- class ChannelMapUnit(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[ChannelMapUnitSettings, AxisArray, AxisArray, ChannelMapProcessor]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of ChannelMapUnitSettings
- class ChannelMapUnitSettings(cmp_configs: tuple[ChannelMapSettings, ...] = ())[source]#
Bases:
Settings
- Parameters:
cmp_configs (tuple[ChannelMapSettings, ...])
- cmp_configs: tuple[ChannelMapSettings, ...] = ()#
Per-headstage overlays, applied in order on each reset. Empty (the default) means no CMP — the auto-grid lays out every channel.
- __init__(cmp_configs=())#
- Parameters:
cmp_configs (tuple[ChannelMapSettings, ...])
- Return type:
None
- class DeviceStatus(device_type, success, error='')[source]#
Bases:
object
Result of a settings-driven device switch.
Emitted on CereLinkSignalSource.OUTPUT_DEVICE_STATUS (and the corresponding spike-source output) after each _areset_state attempt. The GUI consumes this to confirm or revert its device selection: the snapshot’s settings_changed event fires before the open completes and so cannot distinguish success from failure.
- Parameters:
device_type (DeviceType | None)
success (bool)
error (str)
- device_type: DeviceType | None#
- __init__(device_type, success, error='')#
- Parameters:
device_type (DeviceType | None)
success (bool)
error (str)
- Return type:
None
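A consumer of DeviceStatus might confirm or revert a selection along these lines. The sketch uses a local dataclass stand-in with the documented fields (device_type is a plain int here rather than DeviceType), and resolve_selection is a hypothetical helper, not part of the package.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DeviceStatus:
    """Local stand-in with the documented fields."""
    device_type: Optional[int]
    success: bool
    error: str = ""


def resolve_selection(confirmed: Optional[int], status: DeviceStatus) -> Optional[int]:
    """Return the device the UI should show after a switch attempt.

    On success the attempted device becomes the confirmed one; on failure the
    previously confirmed device is kept, which reverts the selection.
    """
    return status.device_type if status.success else confirmed
```

On failure, the error string is available for display alongside the reverted selection.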
- class DeviceType(*values)[source]#
Bases:
IntEnum
Device type selection.
Values match cbproto_device_type_t.
- LEGACY_NSP = 0#
- NSP = 1#
- HUB1 = 2#
- HUB2 = 3#
- HUB3 = 4#
- NPLAY = 5#
- CUSTOM = 6#
- class SamplingDelayAlignment(*args, settings=None, **kwargs)[source]#
Bases:
BaseTransformerUnit[SamplingDelayAlignmentSettings, AxisArray, AxisArray, SamplingDelayAlignmentTransformer]
- Parameters:
settings (Settings | None)
- SETTINGS#
alias of SamplingDelayAlignmentSettings
- class SamplingDelayAlignmentSettings(bank_size=32, channel_sample_interval=9.696969696969698e-07, filter_len=33, rail_threshold=None)[source]#
Bases:
Settings
Settings for SamplingDelayAlignmentTransformer.
- bank_size: int = 32#
Channels per simultaneously-started A/D bank. Used to derive each channel’s sweep slot (c % bank_size) only as a fallback, when the channel axis carries no bank/elec metadata.
- channel_sample_interval: float = 9.696969696969698e-07#
Seconds between successive channels within a bank.
- filter_len: int = 33#
Sinc FIR length (odd). Bulk delay is (filter_len-1)//2 samples; a longer filter gives a flatter passband and better accuracy near Nyquist, at the cost of more latency and compute. Set to 0 to disable alignment entirely: the transformer becomes a pass-through that returns its input unchanged.
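The three settings combine into a per-channel delay and an FIR that realises it. The sketch below is an assumption-laden illustration, not the transformer's code: the function names are hypothetical, the sampling rate fs is supplied by the caller, the Hamming window is a swapped-in choice (the text only says "sinc FIR"), and whether the filter should delay or advance a channel is left open. It assumes an odd filter_len of at least 3.

```python
import math


def channel_delay_samples(c, fs, bank_size=32,
                          channel_sample_interval=9.696969696969698e-07):
    """Fallback sweep slot (c % bank_size) converted to a delay in samples."""
    return (c % bank_size) * channel_sample_interval * fs


def bulk_delay(filter_len=33):
    """Bulk (group) delay of the odd-length FIR, in samples."""
    return (filter_len - 1) // 2


def sinc_taps(frac_delay, filter_len=33):
    """Hamming-windowed sinc taps delaying by bulk_delay + frac_delay samples."""
    center = bulk_delay(filter_len)
    taps = []
    for n in range(filter_len):
        x = n - center - frac_delay
        sinc = 1.0 if x == 0 else math.sin(math.pi * x) / (math.pi * x)
        window = 0.54 - 0.46 * math.cos(2.0 * math.pi * n / (filter_len - 1))
        taps.append(sinc * window)
    return taps
```

With frac_delay = 0 the taps reduce to a unit impulse at the centre, so the filter only contributes the 16-sample bulk delay; the first channel of each bank sits in slot 0 and needs no fractional correction.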
- class SamplingDelayAlignmentTransformer(*args, **kwargs)[source]#
Bases:
BaseStatefulTransformer[SamplingDelayAlignmentSettings, AxisArray, AxisArray, SamplingDelayAlignmentState]
Per-channel fractional-delay alignment (see module docstring).
- class SliceConfig(channels=ChannelSelection.ALL, channel_type=ChannelType.FRONTEND, ac_input_coupling=False, enable_spiking=False)[source]#
Bases:
object
Programmatic per-slice device configuration owned by this source.
The source applies the device state needed to make its subscribed stream produce data on channels: set_sample_group for signal sources, set_spike_extraction for spike sources, plus AC coupling.
Multiple sources can carry disjoint slices for the same device; pycbsdk handles the merge. Overlap with incompatible settings is the user’s responsibility.
- Parameters:
channels (list[int] | ChannelSelection)
channel_type (ChannelType)
ac_input_coupling (bool)
enable_spiking (bool)
- channels: list[int] | ChannelSelection = 'all'#
Which channels this slice targets. One field, three intents:
- list[int]: enable exactly these 1-based channel IDs; the other channels of channel_type are disabled. A provided list is always respected.
- ChannelSelection.ALL (default): enable every channel matching channel_type; others disabled.
- ChannelSelection.ENABLED: leave the device’s enabled set as-is and only consume it. Signal source: retune the already-streaming channels (disable_others=False, so an unused front-end bank stays off). Spike source: leave spike extraction untouched and subscribe to whatever already has it on (enable_spiking is ignored in this mode). An empty enabled set yields nothing and warns.
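The three intents can be summarised as a small resolver. This is a sketch under stated assumptions: ChannelSelection is a local stand-in for the documented enum, resolve_channels is a hypothetical helper, and the two ID collections it takes (every ID of the slice's channel_type, and the IDs the device currently has enabled for the stream) are assumed to have been queried from the device beforehand.

```python
import warnings
from enum import Enum


class ChannelSelection(Enum):
    """Local stand-in with the two documented sentinel values."""
    ALL = "all"
    ENABLED = "enabled"


def resolve_channels(channels, ids_of_type, enabled_ids):
    """Return (target_ids, disable_others) for a channels value.

    ids_of_type: every 1-based channel ID matching channel_type.
    enabled_ids: IDs the device currently has enabled for this stream.
    """
    if channels is ChannelSelection.ALL:
        return sorted(ids_of_type), True
    if channels is ChannelSelection.ENABLED:
        if not enabled_ids:
            warnings.warn("no channels are currently enabled; nothing to stream")
        # Consume the existing set without touching the other channels.
        return sorted(enabled_ids), False
    # An explicit list is always respected as given.
    return sorted(channels), True
```

Only the ENABLED case returns disable_others=False, which is what keeps an unused front-end bank off.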
- channel_type: ChannelType = 0#
- ac_input_coupling: bool = False#
Enable (True) or disable (False) AC coupling (highpass filter) on this slice. Note: This is applied unconditionally to CereLinkSignalProducer.
- enable_spiking: bool = False#
Enable spike extraction on the selected channels (FRONTEND only). Honored by CereLinkSpikeSource; ignored by signal sources, and ignored when channels is ChannelSelection.ENABLED (which leaves extraction exactly as the device has it).
Modules
CereLink-based source for ezmsg: streams continuous and spike data from Blackrock devices.
CerePlex impedance measurement pipeline.
Attach Blackrock
Align channels sampled at different instants by a sequential A/D.