Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ doc/*.nwb
*.plx
*.smr
B95.zip
grouped_ephys
grouped_ephys
uv.lock
.venv
62 changes: 62 additions & 0 deletions neo/rawio/blackrockrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,68 @@ def _get_analogsignal_chunk(self, block_index, seg_index, i_start, i_stop, strea
sig_chunk = memmap_data[i_start:i_stop, channel_indexes]
return sig_chunk

def _get_blackrock_timestamps(self, block_index, seg_index, i_start, i_stop, stream_index):
"""
Return timestamps in seconds for analog signal samples in the given range.

The behavior depends on the file format:

- **PTP format (FileSpec 3.0-ptp):** Each packet in the file contains
exactly one sample with its own PTP hardware timestamp at nanosecond
resolution. This method returns those actual timestamps converted to
seconds. Because each NSx file (e.g. ns2 at 1kHz, ns6 at 30kHz) stores
its own independent PTP packets, every sample has a real timestamp.
Note that PTP timestamps exhibit natural clock jitter at the
nanosecond scale, so consecutive sample intervals are not perfectly
uniform.

- **Standard formats (FileSpec 2.2, 2.3, 3.0 non-PTP):** Each data
block has a single scalar timestamp for its first sample. All
subsequent samples within the block are interpolated as
``t_start + sample_index / sampling_rate``, assuming uniform spacing.

- **FileSpec 2.1:** No timestamps are stored in the file. All samples
are interpolated from ``t_start=0`` using the sampling rate.

Parameters
----------
block_index : int
Block index.
seg_index : int
Segment index.
i_start : int | None
First sample index. None means 0.
i_stop : int | None
Stop sample index (exclusive). None means end of segment.
stream_index : int
Stream index.

Returns
-------
timestamps : np.ndarray (float64)
Timestamps in seconds for each sample in [i_start, i_stop).
"""
stream_id = self.header["signal_streams"][stream_index]["id"]
nsx_nb = int(stream_id)

# Resolve None to concrete indices
size = self.nsx_datas[nsx_nb][seg_index].shape[0]
i_start = i_start if i_start is not None else 0
i_stop = i_stop if i_stop is not None else size

# Check if this segment has per-sample timestamps (PTP format)
raw_timestamps = self._nsx_data_header[nsx_nb][seg_index]["timestamp"]

if isinstance(raw_timestamps, np.ndarray) and raw_timestamps.size == size:
# PTP: real hardware timestamps
ts_res = float(self._nsx_basic_header[nsx_nb]["timestamp_resolution"])
return raw_timestamps[i_start:i_stop].astype("float64") / ts_res
else:
# Non-PTP: reconstruct from t_start + index / sampling_rate
t_start = self._sigs_t_starts[nsx_nb][seg_index]
sr = self._nsx_sampling_frequency[nsx_nb]
return t_start + np.arange(i_start, i_stop, dtype="float64") / sr

def _spike_count(self, block_index, seg_index, unit_index):
channel_id, unit_id = self.internal_unit_ids[unit_index]

Expand Down
104 changes: 104 additions & 0 deletions neo/test/rawiotest/test_blackrockrawio.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,110 @@ def test_blackrockrawio_ptp_timestamps(self):
# Spikes enabled on channels 1-129 but channel 129 had 0 events.
self.assertEqual(128, reader.spike_channels_count())

def test_get_blackrock_timestamps_reconstruction(self):
"""Test _get_blackrock_timestamps for non-PTP formats (reconstructed from t_start + rate)."""
reader = BlackrockRawIO(filename=self.get_local_path("blackrock/FileSpec2.3001"))
reader.parse_header()

stream_index = 0
size = reader.get_signal_size(0, 0, stream_index)
sr = reader.get_signal_sampling_rate(stream_index)
t_start = reader.get_signal_t_start(0, 0, stream_index)

# Full segment timestamps
timestamps = reader._get_blackrock_timestamps(0, 0, None, None, stream_index)
self.assertEqual(timestamps.shape[0], size)
self.assertEqual(timestamps.dtype, np.float64)

# Verify reconstruction: t_start + index / sr
expected = t_start + np.arange(size, dtype="float64") / sr
np.testing.assert_allclose(timestamps, expected)

def test_get_blackrock_timestamps_ptp(self):
"""Test _get_blackrock_timestamps for PTP format (real hardware timestamps)."""
dirname = self.get_local_path("blackrock/blackrock_3_0_ptp/20231027-125608-001")
reader = BlackrockRawIO(filename=dirname)
reader.parse_header()

nanoseconds_per_second = 1_000_000_000.0

# ns2 stream (stream_index=0, 1 kHz)
stream_index = 0
timestamps = reader._get_blackrock_timestamps(0, 0, None, None, stream_index)
self.assertEqual(timestamps.shape[0], reader.get_signal_size(0, 0, stream_index))
self.assertEqual(timestamps.dtype, np.float64)

# First 5 PTP clock values (nanoseconds since Unix epoch) read directly from the ns2 file and hardcoded here for testing
ptp_clock_ns_ns2 = np.array(
[1688252801327128558, 1688252801328128518, 1688252801329128718,
1688252801330128558, 1688252801331128518], dtype="uint64",
)
expected_ns2 = ptp_clock_ns_ns2.astype("float64") / nanoseconds_per_second
np.testing.assert_array_equal(timestamps[:5], expected_ns2)

# ns6 stream (stream_index=1, 30 kHz) — different sampling rate, different timestamps
stream_index = 1
timestamps_ns6 = reader._get_blackrock_timestamps(0, 0, None, None, stream_index)
self.assertEqual(timestamps_ns6.shape[0], reader.get_signal_size(0, 0, stream_index))
self.assertEqual(timestamps_ns6.dtype, np.float64)

# First 5 PTP clock values (nanoseconds since Unix epoch) read directly from the ns6 file and hardcoded here for testing
ptp_clock_ns_ns6 = np.array(
[1688252801327328740, 1688252801327361940, 1688252801327395260,
1688252801327428620, 1688252801327461940], dtype="uint64",
)
expected_ns6 = ptp_clock_ns_ns6.astype("float64") / nanoseconds_per_second
np.testing.assert_array_equal(timestamps_ns6[:5], expected_ns6)

def test_get_blackrock_timestamps_ptp_with_gaps(self):
"""Test _get_blackrock_timestamps for PTP format with gaps and multiple segments."""
dirname = self.get_local_path("blackrock/blackrock_ptp_with_missing_samples/Hub1-NWBtestfile_neural_wspikes")
gap_tolerance_ms = 0.5
reader = BlackrockRawIO(filename=dirname, nsx_to_load=6, gap_tolerance_ms=gap_tolerance_ms)
reader.parse_header()

n_segments = reader.segment_count(0)
self.assertEqual(n_segments, 3)

nanoseconds_per_second = 1_000_000_000.0

# This file has a single stream: ns6 (30 kHz, 1 channel)
stream_index = 0
expected_sizes = [632, 58, 310]

# First 5 PTP clock values (nanoseconds since Unix epoch) per segment, read directly from file and hardcoded here for testing
first_5_ptp_clock_ns_per_segment = [
np.array([1752531864717743037, 1752531864717776237, 1752531864717809677,
1752531864717843077, 1752531864717876277], dtype="uint64"),
np.array([1752531864739742985, 1752531864739776305, 1752531864739809665,
1752531864739842945, 1752531864739876385], dtype="uint64"),
np.array([1752531864740742986, 1752531864740776306, 1752531864740809626,
1752531864740842946, 1752531864740876346], dtype="uint64"),
]
# Last PTP clock value (nanoseconds since Unix epoch) of each segment, hardcoded here for testing
last_ptp_clock_ns_per_segment = np.array(
[1752531864738776304, 1752531864739709625, 1752531864751042999], dtype="uint64",
)

for seg_index in range(n_segments):
timestamps = reader._get_blackrock_timestamps(0, seg_index, None, None, stream_index)
self.assertEqual(timestamps.shape[0], expected_sizes[seg_index])

# Verify first 5 timestamps match the clock values from the file
expected_first_5 = first_5_ptp_clock_ns_per_segment[seg_index].astype("float64") / nanoseconds_per_second
np.testing.assert_array_equal(timestamps[:5], expected_first_5)

# Verify last timestamp
expected_last = last_ptp_clock_ns_per_segment[seg_index].astype("float64") / nanoseconds_per_second
np.testing.assert_array_equal(timestamps[-1], expected_last)

# Verify the gaps between segments exceed the tolerance
for seg_index in range(n_segments - 1):
last_ts = last_ptp_clock_ns_per_segment[seg_index].astype("float64") / nanoseconds_per_second
first_ts_next = first_5_ptp_clock_ns_per_segment[seg_index + 1][0].astype("float64") / nanoseconds_per_second
gap_ms = (first_ts_next - last_ts) * 1000
self.assertGreater(gap_ms, gap_tolerance_ms)

def test_gap_tolerance_ms_parameter(self):
"""
Test gap_tolerance_ms parameter for gap handling with files that have actual gaps.
Expand Down
Loading