Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion imap_processing/mag/l1c/interpolation_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def linear_filtered(
input_filtered, vectors_filtered = cic_filter(
input_vectors, input_timestamps, output_timestamps, input_rate, output_rate
)
return linear(vectors_filtered, input_filtered, output_timestamps)
return linear(vectors_filtered, input_filtered, output_timestamps, extrapolate=True)


def quadratic_filtered(
Expand Down
161 changes: 129 additions & 32 deletions imap_processing/mag/l1c/mag_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

logger = logging.getLogger(__name__)

GAP_TOLERANCE = 0.075


def mag_l1c(
first_input_dataset: xr.Dataset,
Expand Down Expand Up @@ -461,6 +463,8 @@ def interpolate_gaps(
6-7 - compression flags.
"""
burst_epochs = burst_dataset["epoch"].data
norm_epochs = filled_norm_timeline[:, 0]
has_norm_context = np.any(filled_norm_timeline[:, 5] == ModeFlags.NORM.value)
# Exclude range values
burst_vectors = burst_dataset["vectors"].data
# Default to two vectors per second
Expand Down Expand Up @@ -497,14 +501,18 @@ def interpolate_gaps(
burst_buffer = int(required_seconds * burst_rate.value)

burst_start = max(0, burst_gap_start - burst_buffer)
burst_end = min(len(burst_epochs) - 1, burst_gap_end + burst_buffer)
burst_end = min(len(burst_epochs), burst_gap_end + burst_buffer + 1)

gap_timeline = filled_norm_timeline[
(filled_norm_timeline > gap[0]) & (filled_norm_timeline < gap[1])
]
gap_timeline = norm_epochs[(norm_epochs > gap[0]) & (norm_epochs < gap[1])]

short_end = burst_epochs[burst_end - 1]
if not has_norm_context:
# In the burst-only fallback, CIC delay compensation shortens the usable
# filtered range at the trailing edge by roughly one output cadence.
short_end -= int(1e9 / norm_rate.value)

short = (gap_timeline >= burst_epochs[burst_start]) & (
gap_timeline <= burst_epochs[burst_end]
gap_timeline <= short_end
)
num_short = int(short.sum())

Expand All @@ -524,7 +532,7 @@ def interpolate_gaps(

# gaps should not have data in timeline, still check it
for index, timestamp in enumerate(adjusted_gap_timeline):
timeline_index = np.searchsorted(filled_norm_timeline[:, 0], timestamp)
timeline_index = np.searchsorted(norm_epochs, timestamp)
if sum(
filled_norm_timeline[timeline_index, 1:4]
) == 0 and burst_gap_start + index < len(burst_vectors):
Expand All @@ -542,13 +550,12 @@ def interpolate_gaps(
missing_timeline = np.setdiff1d(gap_timeline, adjusted_gap_timeline)

for timestamp in missing_timeline:
timeline_index = np.searchsorted(filled_norm_timeline[:, 0], timestamp)
timeline_index = np.searchsorted(norm_epochs, timestamp)
if filled_norm_timeline[timeline_index, 5] != ModeFlags.MISSING.value:
raise RuntimeError(
"Self-inconsistent data. "
"Gaps not included in final timeline should be missing."
)
np.delete(filled_norm_timeline, timeline_index)

return filled_norm_timeline

Expand All @@ -557,8 +564,8 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
"""
Generate a new timeline from existing, gap-filled timeline and gaps.

The gaps are generated at a .5 second cadence, regardless of the cadence of the
existing data.
The gaps are generated at the cadence implied by the gap rate. If no rate is
provided, a default cadence of 0.5 seconds is used.

Parameters
----------
Expand All @@ -573,7 +580,8 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
numpy.ndarray
The new timeline, filled with the existing data and the generated gaps.
"""
full_timeline: np.ndarray = np.array([])
epoch_data = np.asarray(epoch_data)
full_timeline: np.ndarray = np.array([], dtype=epoch_data.dtype)
last_index = 0
for gap in gaps:
epoch_start_index = np.searchsorted(epoch_data, gap[0], side="left")
Expand All @@ -582,6 +590,7 @@ def generate_timeline(epoch_data: np.ndarray, gaps: np.ndarray) -> np.ndarray:
)
generated_timestamps = generate_missing_timestamps(gap)
if generated_timestamps.size == 0:
last_index = int(np.searchsorted(epoch_data, gap[1], side="left"))
continue

# Remove any generated timestamps that are already in the timeline
Expand Down Expand Up @@ -639,37 +648,48 @@ def find_all_gaps(
specified as (start, end, vector_rate) where start and end both exist in the
timeline.
"""
gaps: np.ndarray = np.zeros((0, 3))
gaps: np.ndarray = np.empty((0, 3), dtype=np.int64)

# TODO: when we go back to the previous file, also retrieve expected
# vectors per second

vecsec_dict = {0: VecSec.TWO_VECS_PER_S.value} | (vecsec_dict or {})

end_index = epoch_data.shape[0]
rate_segments = _find_rate_segments(epoch_data, vecsec_dict)
if rate_segments:
first_rate = rate_segments[0][1]
last_rate = rate_segments[-1][1]
else:
default_rate = next(iter(vecsec_dict.values()))
first_rate = default_rate
last_rate = default_rate

if start_of_day_ns is not None and epoch_data[0] > start_of_day_ns:
# Add a gap from the start of the day to the first timestamp
gaps = np.concatenate(
(gaps, np.array([[start_of_day_ns, epoch_data[0], vecsec_dict[0]]]))
)

for start_time in reversed(sorted(vecsec_dict.keys())):
# Find the start index that is equal to or immediately after start_time
start_index = np.searchsorted(epoch_data, start_time, side="left")
gaps = np.concatenate(
(
find_gaps(
epoch_data[start_index : end_index + 1], vecsec_dict[start_time]
),
gaps,
np.array(
[[start_of_day_ns, epoch_data[0], first_rate]], dtype=np.int64
),
)
)
end_index = start_index

for index, (start_index, vectors_per_second) in enumerate(rate_segments):
next_start_index = (
rate_segments[index + 1][0]
if index + 1 < len(rate_segments)
else epoch_data.shape[0] - 1
)
epoch_slice = epoch_data[start_index : next_start_index + 1]
gaps = np.concatenate((gaps, find_gaps(epoch_slice, vectors_per_second)))

if end_of_day_ns is not None and epoch_data[-1] < end_of_day_ns:
gaps = np.concatenate(
(gaps, np.array([[epoch_data[-1], end_of_day_ns, vecsec_dict[start_time]]]))
(
gaps,
np.array([[epoch_data[-1], end_of_day_ns, last_rate]], dtype=np.int64),
)
)

return gaps
Expand All @@ -696,14 +716,19 @@ def find_gaps(timeline_data: np.ndarray, vectors_per_second: int) -> np.ndarray:
end_gap, as well as vectors_per_second. Start_gap and end_gap both correspond
to points in timeline_data.
"""
if timeline_data.shape[0] < 2:
return np.empty((0, 3), dtype=np.int64)

# Expected difference between timestamps in nanoseconds.
expected_gap = 1 / vectors_per_second * 1e9

diffs = abs(np.diff(timeline_data))

# Gap can be up to 7.5% larger than expected vectors per second due to clock drift
gap_index = np.asarray(diffs - expected_gap > expected_gap * 0.075).nonzero()[0]
output: np.ndarray = np.zeros((len(gap_index), 3))
gap_index = np.asarray(
diffs - expected_gap > expected_gap * GAP_TOLERANCE
).nonzero()[0]
output: np.ndarray = np.zeros((len(gap_index), 3), dtype=np.int64)

for index, gap in enumerate(gap_index):
output[index, :] = [
Expand All @@ -719,8 +744,8 @@ def generate_missing_timestamps(gap: np.ndarray) -> np.ndarray:
"""
Generate a new timeline from input gaps.

Any gaps specified in gaps will be filled with timestamps that are 0.5 seconds
apart.
Any gaps specified in gaps will be filled with timestamps at the gap rate. If the
gap rate is not included, the default cadence is 0.5 seconds.

Parameters
----------
Expand All @@ -734,12 +759,84 @@ def generate_missing_timestamps(gap: np.ndarray) -> np.ndarray:
full_timeline: numpy.ndarray
Completed timeline.
"""
# Generated timestamps should always be 0.5 seconds apart
difference_ns = 0.5 * 1e9
output: np.ndarray = np.arange(gap[0], gap[1], difference_ns)
difference_ns = int(0.5 * 1e9)
if len(gap) > 2:
difference_ns = int(1e9 / int(gap[2]))
output: np.ndarray = np.arange(
int(np.rint(gap[0])),
int(np.rint(gap[1])),
difference_ns,
dtype=np.int64,
)
return output


def _is_expected_rate(timestamp_difference: float, vectors_per_second: int) -> bool:
"""
Determine whether a timestamp spacing matches an expected cadence.

Parameters
----------
timestamp_difference : float
The observed spacing between adjacent timestamps, in nanoseconds.
vectors_per_second : int
The expected number of vectors per second for the cadence being checked.

Returns
-------
bool
True when the observed spacing is within `GAP_TOLERANCE` of the expected
cadence.
"""
expected_gap = 1 / vectors_per_second * 1e9
return abs(timestamp_difference - expected_gap) <= expected_gap * GAP_TOLERANCE


def _find_rate_segments(
    epoch_data: np.ndarray, vecsec_dict: dict[int, int]
) -> list[tuple[int, int]]:
    """
    Build contiguous rate segments using observed cadence near each transition.

    Each configured transition is walked backward while the observed spacing
    already matches the new rate, so gaps stay attached to the correct segment
    instead of producing spurious single-sample micro-gaps at delayed Config
    boundaries.

    Parameters
    ----------
    epoch_data : numpy.ndarray
        The sorted epoch timestamps for the current timeline, in nanoseconds.
    vecsec_dict : dict[int, int]
        Mapping of transition start time to expected vectors-per-second rate.

    Returns
    -------
    list[tuple[int, int]]
        Pairs of `(start_index, vectors_per_second)` describing contiguous rate
        segments in `epoch_data`.
    """
    n_epochs = epoch_data.shape[0]
    if n_epochs == 0:
        return []

    segments: list[tuple[int, int]] = []
    for transition_time, rate in sorted(vecsec_dict.items()):
        # Candidate boundary: first timestamp at or after the transition,
        # clamped into the valid index range.
        boundary = min(
            int(np.searchsorted(epoch_data, transition_time, side="left")),
            n_epochs - 1,
        )
        floor = segments[-1][0] if segments else 0

        # Slide the boundary earlier while the observed spacing already looks
        # like the new cadence, without crossing the previous segment's start.
        while boundary > floor and _is_expected_rate(
            epoch_data[boundary] - epoch_data[boundary - 1], rate
        ):
            boundary -= 1

        # A transition that lands on the previous segment's start supersedes
        # that segment's rate rather than adding a zero-length segment.
        if segments and boundary == segments[-1][0]:
            segments[-1] = (boundary, rate)
        else:
            segments.append((boundary, rate))

    return segments


def vectors_per_second_from_string(vecsec_string: str) -> dict:
"""
Extract the vectors per second from a string into a dictionary.
Expand Down
Loading
Loading