diff --git a/imap_processing/hi/hi_goodtimes.py b/imap_processing/hi/hi_goodtimes.py index caadd618b..297da9af3 100644 --- a/imap_processing/hi/hi_goodtimes.py +++ b/imap_processing/hi/hi_goodtimes.py @@ -552,141 +552,213 @@ def mark_bad_times( def get_good_intervals(self) -> np.ndarray: """ - Extract time intervals grouped by contiguous cull flag patterns. + Extract good time intervals grouped by ESA sweep cull patterns. - Merges consecutive MET timestamps that have identical cull_flags patterns - into single intervals. Each interval spans a contiguous time range where - cull flags don't change. - - If cull flags have multiple contiguous regions with different values - (e.g., bins 0-44 good, 45-89 bad), multiple intervals are created for - the same time range, one per contiguous bin region. + Groups consecutive ESA sweeps with identical cull patterns. For each group: + 1. Writes one interval for fully-good ESA steps (all 90 bins good) spanning + bins 0-89, with cull_value indicating the cull code from any bad ESAs. + 2. Writes additional intervals for each good bin region of partially-good + ESA steps, with cull_value indicating the cull code that removed bad bins. Returns ------- numpy.ndarray Structured array with dtype INTERVAL_DTYPE containing: - met_start: First MET timestamp of interval - - met_end: Last MET timestamp of interval + - met_end: Start of next interval (or last MET for final interval) - spin_bin_low: Lowest spin bin in this contiguous region - spin_bin_high: Highest spin bin in this contiguous region - n_bins: Number of bins in this region - - esa_step_mask: Bitmask of ESA steps (1-10) included in interval - - cull_value: Cull flag value for this region (0=good, >0=bad) + - esa_step_mask: Bitmask of good ESA steps (1-10) for this interval + - cull_value: Cull code for ESA steps/bins not included (0 if all good) Notes ----- This is used for generating the Good Times output files per algorithm document Section 2.3.2.5. 
""" - logger.debug("Extracting time intervals") - met_values = self._obj["met"].values - cull_flags = self._obj["cull_flags"].values - esa_steps = self._obj["esa_step"].values + logger.debug("Extracting good time intervals") + + # Determine which dimension is present (epoch for CDF, met for in-memory) + time_dim = "epoch" if "epoch" in self._obj.dims else "met" + # Get met values + met_values = self._obj["met"].values if len(met_values) == 0: logger.warning("No MET values found, returning empty intervals array") return np.array([], dtype=INTERVAL_DTYPE) - # Group consecutive METs with identical cull patterns - # Each group becomes one or more intervals (one per contiguous bin region) - intervals: list[tuple] = [] + # Add sweep indices as a coordinate + ds = _add_sweep_indices(self._obj) + + # Compare consecutive sweeps using xarray groupby + grouped = list(ds["cull_flags"].groupby("esa_sweep")) - # Start first group - group_start_idx = 0 - current_pattern = cull_flags[0] - # Cast to int to avoid uint8 overflow when esa_step > 8 - esa_step_mask = 1 << int(esa_steps[0] - 1) # Bit i represents ESA step i+1 + # Determine pattern changes by comparing each sweep to the next + # Start with False for first sweep (no previous sweep) + pattern_changes = [False] + for i in range(len(grouped) - 1): + # The grouped list contains tuples (sweep_idx, cull_flags_ds). + # Grab just the cull_flags_ds values for comparison. 
+ cull_curr = grouped[i][1] + cull_next = grouped[i + 1][1] - for met_idx in range(1, len(met_values)): - if np.array_equal(cull_flags[met_idx], current_pattern): - # Same pattern - extend current group - esa_step_mask |= 1 << int(esa_steps[met_idx] - 1) + # Compare shapes first (different lengths = different pattern) + if cull_curr.shape != cull_next.shape: + pattern_changes.append(True) else: - # Different pattern - close current group and start new one - self._add_intervals_for_pattern( - intervals, - met_values[group_start_idx], - met_values[met_idx - 1], - current_pattern, - esa_step_mask, + # Compare cull_flag values only (not coordinates) + pattern_changes.append( + not np.array_equal(cull_curr.values, cull_next.values) ) - # Start new group - group_start_idx = met_idx - current_pattern = cull_flags[met_idx] - esa_step_mask = 1 << int(esa_steps[met_idx] - 1) - - # Close final group - self._add_intervals_for_pattern( - intervals, - met_values[group_start_idx], - met_values[-1], - current_pattern, - esa_step_mask, - ) + # Convert to numpy array and create group IDs + pattern_changes = np.array(pattern_changes, dtype=bool) + + # Use cumsum to create group IDs + group_ids = pattern_changes.cumsum().astype(int) + + # Map group IDs to all time points using the correct dimension + group_coord = np.array([group_ids[int(s)] for s in ds["esa_sweep"].values]) + ds = ds.assign_coords(pattern_group=(time_dim, group_coord)) + + # Group by pattern_group (consecutive identical sweeps only) + intervals: list[tuple] = [] + pattern_groups = list(ds.groupby("pattern_group")) - logger.info(f"Extracted {len(intervals)} time intervals") + for i, (_, pattern_ds) in enumerate(pattern_groups): + # Get met values from the pattern dataset + pattern_met = pattern_ds["met"].values + met_start = float(pattern_met.min()) + + # met_end is start of next group, or max MET of this group if last + if i + 1 < len(pattern_groups): + next_met = pattern_groups[i + 1][1]["met"].values + met_end = 
float(next_met.min()) + else: + met_end = float(pattern_met.max()) + + # Get first sweep as representative (all sweeps in pattern are identical) + first_sweep_idx = pattern_ds["esa_sweep"].values[0] + first_sweep = pattern_ds.sel( + {time_dim: (pattern_ds["esa_sweep"] == first_sweep_idx)} + ) + + # Generate interval elements for this pattern + intervals.extend( + self._generate_intervals_for_pattern(first_sweep, met_start, met_end) + ) + + logger.info(f"Extracted {len(intervals)} good time intervals") return np.array(intervals, dtype=INTERVAL_DTYPE) - @staticmethod - def _add_intervals_for_pattern( - intervals: list, - met_start: float, - met_end: float, - pattern: np.ndarray, - esa_step_mask: int, - ) -> None: + def _generate_intervals_for_pattern( + self, sweep_ds: xr.Dataset, met_start: float, met_end: float + ) -> list[tuple]: """ - Add interval(s) for a cull_flags pattern, one per contiguous bin region. - - Creates an interval for each contiguous region of bins that share the - same cull value. This includes both good (cull=0) and bad (cull>0) regions. + Generate interval elements for a sweep pattern. Parameters ---------- - intervals : list - List to append interval tuples to. + sweep_ds : xarray.Dataset + Representative sweep. met_start : float - Start MET timestamp. + Start MET for this interval group. met_end : float - End MET timestamp. - pattern : numpy.ndarray - Cull flags pattern for spin bins (90 values). - esa_step_mask : int - Bitmask of ESA steps included in this time range. + End MET for this interval group. + + Returns + ------- + list[tuple] + List of interval tuples matching INTERVAL_DTYPE. 
""" - # Find contiguous regions of bins with the same cull value - # diff != 0 indicates a change in cull value - changes = np.nonzero(np.diff(pattern) != 0)[0] - - # Build list of (start_bin, end_bin) for each contiguous region - # If no changes, entire range is one region - if len(changes) == 0: - regions = [(0, 89)] - else: - regions = [] - start_bin = 0 - for change_idx in changes: - regions.append((start_bin, change_idx)) - start_bin = change_idx + 1 - # Add final region - regions.append((start_bin, 89)) + all_good_mask = 0 + partial_regions = [] + bad_cull_value = 0 + + # Process each unique ESA step + for esa_step in np.unique(sweep_ds["esa_step"].values): + esa_mask = sweep_ds["esa_step"] == esa_step + cull_pattern = sweep_ds["cull_flags"].values[esa_mask.values][0] + esa_bit = 1 << (int(esa_step) - 1) + + if np.all(cull_pattern == 0): + all_good_mask |= esa_bit + else: + bad_vals = cull_pattern[cull_pattern > 0] + if len(bad_vals) > 0: + # Aggregate all non-zero cull codes for this ESA step so that + # the region cull value reflects every flag that contributed. 
+ region_cull = int(np.bitwise_or.reduce(bad_vals)) + bad_cull_value |= region_cull + else: + region_cull = 0 + + for bin_low, bin_high in self._find_good_bin_regions(cull_pattern): + partial_regions.append( + { + "esa_bit": esa_bit, + "bin_low": bin_low, + "bin_high": bin_high, + "cull_value": region_cull, + } + ) + + # Generate interval elements + elements = [] + + if all_good_mask > 0: + elements.append( + (met_start, met_end, 0, 89, 90, all_good_mask, bad_cull_value) + ) - # Create an interval for each region - for start_bin, end_bin in regions: - cull_value = pattern[start_bin] - n_bins = end_bin - start_bin + 1 - interval = ( - met_start, - met_end, - start_bin, - end_bin, - n_bins, - esa_step_mask, - cull_value, + for region in partial_regions: + n_bins = region["bin_high"] - region["bin_low"] + 1 + elements.append( + ( + met_start, + met_end, + region["bin_low"], + region["bin_high"], + n_bins, + region["esa_bit"], + region["cull_value"], + ) ) - intervals.append(interval) + + return elements + + @staticmethod + def _find_good_bin_regions(cull_pattern: np.ndarray) -> list[tuple[int, int]]: + """ + Find contiguous regions where cull_pattern == 0. + + Parameters + ---------- + cull_pattern : np.ndarray + Array of cull values for 90 spin bins. + + Returns + ------- + list[tuple[int, int]] + List of (start_bin, end_bin) tuples for good regions. + """ + regions: list[tuple[int, int]] = [] + in_good_region = False + start_bin = 0 + + for i, val in enumerate(cull_pattern): + if val == 0 and not in_good_region: + start_bin = i + in_good_region = True + elif val != 0 and in_good_region: + regions.append((start_bin, i - 1)) + in_good_region = False + + if in_good_region: + regions.append((start_bin, 89)) + + return regions def get_cull_statistics(self) -> dict: """ @@ -728,7 +800,7 @@ def write_txt(self, output_path: Path) -> Path: Write time intervals to text file in the format specified by algorithm document. 
Format per Section 2.3.2.5: - pointing MET_start MET_end`tab`spin_bin_low spin_bin_high sensor`tab` + pointing MET_start MET_end spin_bin_low spin_bin_high sensor esa_steps[10] cull_value The esa_steps field consists of 10 binary values (0 or 1) indicating whether @@ -774,13 +846,13 @@ def write_txt(self, output_path: Path) -> Path: # esa_steps[10] cull_value line = ( f"{pointing:05d} " - f"{int(interval['met_start'])} " - f"{int(interval['met_end'])}\t" - f"{interval['spin_bin_low']} " - f"{interval['spin_bin_high']} " - f"{sensor}\t" - f"{esa_step_flags}\t" - f"{interval['cull_value']}" + f"{interval['met_start']:0.1f} " + f"{interval['met_end']:0.1f} " + f"{interval['spin_bin_low']:2d} " + f"{interval['spin_bin_high']:2d} " + f"{sensor} " + f"{esa_step_flags} " + f"{interval['cull_value']:3d}" ) # TODO: Add rate/sigma values for each ESA step @@ -1186,6 +1258,7 @@ def mark_bad_tdc_cal( goodtimes_ds: xr.Dataset, diagfee: xr.Dataset, cull_code: int = CullCode.BAD_TDC_CAL, + check_tdc_3: bool = False, ) -> None: """ Remove times with failed TDC calibration (DIAG_FEE method). @@ -1210,6 +1283,9 @@ def mark_bad_tdc_cal( - tdc3_cal_ctrl_stat: TDC3 calibration status (bit 1 = success) cull_code : int, optional Cull code to use for marking bad times. Default is CullCode.LOOSE. + check_tdc_3 : bool, optional + Whether to check TDC3 calibration status in addition to TDC1 and TDC2. + Default is False to match original C code behavior. Notes ----- @@ -1238,11 +1314,11 @@ def mark_bad_tdc_cal( # Identify any packets where any of the three TDC calibrations failed. 
# TDC failure check (bit 1: 1=good, 0=bad) - tdc_failed = ( - ((diagfee["tdc1_cal_ctrl_stat"].values & 2) == 0) - | ((diagfee["tdc2_cal_ctrl_stat"].values & 2) == 0) - | ((diagfee["tdc3_cal_ctrl_stat"].values & 2) == 0) + tdc_failed = ((diagfee["tdc1_cal_ctrl_stat"].values & 2) == 0) | ( + (diagfee["tdc2_cal_ctrl_stat"].values & 2) == 0 ) + if check_tdc_3: + tdc_failed |= (diagfee["tdc3_cal_ctrl_stat"].values & 2) == 0 # Only loop over non-duplicate packets with TDC failures tdc_failed_indices = np.nonzero(~is_duplicate & tdc_failed)[0] @@ -1310,15 +1386,18 @@ def _add_sweep_indices(l1b_de: xr.Dataset) -> xr.Dataset: Parameters ---------- l1b_de : xarray.Dataset - L1B Direct Event dataset. + L1B Direct Event dataset or goodtimes dataset. Returns ------- xarray.Dataset - Dataset with esa_sweep coordinate added on epoch dimension. + Dataset with esa_sweep coordinate added on the time dimension + (either 'epoch' or 'met'). """ sweep_indices = _get_sweep_indices(l1b_de["esa_step"].values) - return l1b_de.assign_coords(esa_sweep=("epoch", sweep_indices)) + # Determine which dimension to use (epoch for CDF data, met for in-memory) + time_dim = "epoch" if "epoch" in l1b_de.dims else "met" + return l1b_de.assign_coords(esa_sweep=(time_dim, sweep_indices)) def _compute_normalized_counts_per_sweep( @@ -2004,6 +2083,11 @@ def _find_event_clusters( # Find transitions: +1 = start of group, -1 = end of group diff = np.diff(padded.astype(int)) starts = np.flatnonzero(diff == 1) + # We need to adjust ends for the shortening from diffs performed. + # The window_spans array has length = n_events - min_events + 1 + # The contiguous diff adds two padding elements and np.diff shortens by 1. + # The result is that we need to add min_events and subtract 2 to get the + # correct end index. 
ends = np.flatnonzero(diff == -1) + min_events - 2 # Adjust for window size return list(zip(starts.tolist(), ends.tolist(), strict=False)) diff --git a/imap_processing/tests/hi/test_hi_goodtimes.py b/imap_processing/tests/hi/test_hi_goodtimes.py index 19927c84e..545c8c3af 100644 --- a/imap_processing/tests/hi/test_hi_goodtimes.py +++ b/imap_processing/tests/hi/test_hi_goodtimes.py @@ -122,7 +122,7 @@ def test_from_l1b_de_dimensions(self, goodtimes_instance): """Test that dimensions are correct.""" assert "met" in goodtimes_instance.dims assert "spin_bin" in goodtimes_instance.dims - assert goodtimes_instance.dims["spin_bin"] == 90 + assert goodtimes_instance.sizes["spin_bin"] == 90 def test_from_l1b_de_coordinates(self, goodtimes_instance): """Test that coordinates are set correctly.""" @@ -296,40 +296,59 @@ def test_get_good_intervals_all_good(self, goodtimes_instance): """Test getting intervals when all times are good.""" intervals = goodtimes_instance.goodtimes.get_good_intervals() - # When all cull flags are identical (all zeros), should merge into 1 interval - assert len(intervals) == 1 - - # Check interval structure + # With sweep-based grouping, consecutive sweeps with identical patterns + # are merged. The number of intervals depends on sweep structure. 
+ assert len(intervals) >= 1 assert intervals.dtype == INTERVAL_DTYPE + # All intervals should be good (cull_value == 0) + for interval in intervals: + assert interval["cull_value"] == 0 + # All-good intervals have all ESAs marked in bitmask + assert interval["esa_step_mask"] > 0 + def test_get_good_intervals_structure(self, goodtimes_instance): - """Test interval structure and field names.""" + """Test interval structure and attributes.""" intervals = goodtimes_instance.goodtimes.get_good_intervals() - # Check that all fields exist - assert "met_start" in intervals.dtype.names - assert "met_end" in intervals.dtype.names - assert "spin_bin_low" in intervals.dtype.names - assert "spin_bin_high" in intervals.dtype.names - assert "n_bins" in intervals.dtype.names - assert "esa_step_mask" in intervals.dtype.names - assert "cull_value" in intervals.dtype.names + # Check that intervals have the correct dtype + assert intervals.dtype == INTERVAL_DTYPE + + # Check that all required fields exist + required_fields = [ + "met_start", + "met_end", + "spin_bin_low", + "spin_bin_high", + "n_bins", + "esa_step_mask", + "cull_value", + ] + for field in required_fields: + assert field in intervals.dtype.names def test_get_good_intervals_all_good_values(self, goodtimes_instance): """Test interval values when all bins are good.""" intervals = goodtimes_instance.goodtimes.get_good_intervals() - # Single interval spanning all METs with all bins good - assert len(intervals) == 1 - interval = intervals[0] - assert interval["spin_bin_low"] == 0 - assert interval["spin_bin_high"] == 89 - assert interval["n_bins"] == 90 - assert interval["cull_value"] == 0 - # met_start should be first MET, met_end should be last MET + # With sweep-based grouping, we may have multiple intervals + assert len(intervals) >= 1 + + # All intervals should be all-good (cull_value == 0) + for interval in intervals: + assert interval["esa_step_mask"] > 0 + assert interval["cull_value"] == 0 + + # First interval 
should start at first MET met_values = goodtimes_instance.coords["met"].values - assert interval["met_start"] == met_values[0] - assert interval["met_end"] == met_values[-1] + assert intervals[0]["met_start"] == met_values[0] + + # Last interval's met_end should be the last MET + assert intervals[-1]["met_end"] == met_values[-1] + + # met_end of each interval (except last) should be met_start of next + for i in range(len(intervals) - 1): + assert intervals[i]["met_end"] == intervals[i + 1]["met_start"] def test_get_good_intervals_with_culled_bins(self, goodtimes_instance): """Test intervals when some bins are culled.""" @@ -341,22 +360,18 @@ def test_get_good_intervals_with_culled_bins(self, goodtimes_instance): intervals = goodtimes_instance.goodtimes.get_good_intervals() - # First MET has different pattern, creates separate intervals - # First MET: 2 intervals (bins 0-20 culled, bins 21-89 good) - # Remaining METs: 1 interval (all bins good) - assert len(intervals) == 3 - - # Check first interval (culled bins 0-20) - assert intervals[0]["spin_bin_low"] == 0 - assert intervals[0]["spin_bin_high"] == 20 - assert intervals[0]["n_bins"] == 21 - assert intervals[0]["cull_value"] == CullCode.INCOMPLETE_SPIN + # Only good intervals are output: + # - First sweep has one ESA step with partial cull (bins 21-89 good) + # - Remaining sweeps are fully good (all bins) + # The number of intervals depends on sweep grouping + assert len(intervals) >= 2 - # Check second interval (good bins 21-89) - assert intervals[1]["spin_bin_low"] == 21 - assert intervals[1]["spin_bin_high"] == 89 - assert intervals[1]["n_bins"] == 69 - assert intervals[1]["cull_value"] == 0 + # Check for the partial interval (bins 21-89 good for the culled ESA step) + has_partial = any( + interval["spin_bin_low"] == 21 and interval["spin_bin_high"] == 89 + for interval in intervals + ) + assert has_partial, "Should have at least one partial region with bins 21-89" def test_get_good_intervals_with_gaps(self, 
goodtimes_instance): """Test intervals when bins have gaps in cull values.""" @@ -368,25 +383,24 @@ def test_get_good_intervals_with_gaps(self, goodtimes_instance): intervals = goodtimes_instance.goodtimes.get_good_intervals() - # First MET has 3 regions (0-19 good, 20-70 culled, 71-89 good) - # Remaining METs merged into 1 interval (all bins good) - assert len(intervals) == 4 - - # First MET intervals should have same met_start == met_end - assert intervals[0]["met_start"] == intervals[0]["met_end"] - assert intervals[1]["met_start"] == intervals[1]["met_end"] - assert intervals[2]["met_start"] == intervals[2]["met_end"] - - # Check the three segments for first MET - assert intervals[0]["spin_bin_low"] == 0 - assert intervals[0]["spin_bin_high"] == 19 - assert intervals[0]["cull_value"] == 0 - assert intervals[1]["spin_bin_low"] == 20 - assert intervals[1]["spin_bin_high"] == 70 - assert intervals[1]["cull_value"] == CullCode.INCOMPLETE_SPIN - assert intervals[2]["spin_bin_low"] == 71 - assert intervals[2]["spin_bin_high"] == 89 - assert intervals[2]["cull_value"] == 0 + # Only good intervals are output: + # - First sweep has one ESA step with partial cull (bins 0-19 and 71-89 good) + # - Remaining sweeps are fully good + # Bad intervals (bins 20-70) are not output + assert len(intervals) >= 3 + + # Check that we have good bin regions for the partial ESA step + # bins 0-19 good + low_good = [ + i for i in intervals if i["spin_bin_low"] == 0 and i["spin_bin_high"] == 19 + ] + assert len(low_good) >= 1 + + # bins 71-89 good + high_good = [ + i for i in intervals if i["spin_bin_low"] == 71 and i["spin_bin_high"] == 89 + ] + assert len(high_good) >= 1 def test_get_good_intervals_all_bins_culled(self, goodtimes_instance): """Test intervals when all bins are culled for a MET.""" @@ -398,17 +412,20 @@ def test_get_good_intervals_all_bins_culled(self, goodtimes_instance): intervals = goodtimes_instance.goodtimes.get_good_intervals() - # Should have 2 intervals: one for 
culled first MET, one for remaining METs - assert len(intervals) == 2 + # Only good intervals are output - the fully-culled ESA step is not output + # The remaining good ESA steps should be output + assert len(intervals) >= 1 - # First interval is the culled MET - assert intervals[0]["cull_value"] == CullCode.INCOMPLETE_SPIN - assert intervals[0]["spin_bin_low"] == 0 - assert intervals[0]["spin_bin_high"] == 89 + # All output intervals should have good bins (cull_value indicates what + # was culled). Check that we have a fully-good interval (bins 0-89) for + # the good ESA steps + full_good = [ + i for i in intervals if i["spin_bin_low"] == 0 and i["spin_bin_high"] == 89 + ] + assert len(full_good) >= 1 - # Second interval is remaining good METs - assert intervals[1]["cull_value"] == 0 - assert intervals[1]["met_start"] == goodtimes_instance.coords["met"].values[1] + # The cull_value should indicate the cull code for the culled ESA step + assert full_good[0]["cull_value"] == CullCode.INCOMPLETE_SPIN def test_get_good_intervals_empty(self): """Test intervals with empty goodtimes dataset.""" @@ -428,18 +445,23 @@ def test_get_good_intervals_empty(self): assert len(intervals) == 0 def test_get_good_intervals_esa_step_mask(self, goodtimes_instance): - """Test that ESA step mask includes all ESA steps in the interval.""" + """Test that ESA step mask includes ESA steps in each interval.""" intervals = goodtimes_instance.goodtimes.get_good_intervals() - # Single interval should include all ESA steps from all METs - assert len(intervals) == 1 - esa_step_mask = intervals[0]["esa_step_mask"] + # With sweep-based grouping, each interval has its own ESA step mask + assert len(intervals) >= 1 - # Check that the mask has bits set for all unique ESA steps + # Collect all ESA steps across all intervals + all_esa_steps_in_intervals = set() + for interval in intervals: + esa_step_mask = interval["esa_step_mask"] + for bit_position in range(10): # ESA steps 1-10 + if (esa_step_mask 
>> bit_position) & 1: + all_esa_steps_in_intervals.add(bit_position + 1) + + # All unique ESA steps should be represented across all intervals unique_esa_steps = set(goodtimes_instance["esa_step"].values) - for esa_step in unique_esa_steps: - bit_position = esa_step - 1 # ESA step 1 -> bit 0, etc. - assert (esa_step_mask >> bit_position) & 1 == 1 + assert all_esa_steps_in_intervals == unique_esa_steps class TestGetCullStatistics: @@ -511,8 +533,8 @@ def test_to_txt_format(self, goodtimes_instance, tmp_path): with open(output_path) as f: lines = f.readlines() - # Should have 1 line (all METs merged into single interval) - assert len(lines) == 1 + # With sweep-based grouping, may have multiple intervals + assert len(lines) >= 1 # Check format of first line # Format: pointing met_start met_end bin_low bin_high sensor @@ -521,7 +543,7 @@ def test_to_txt_format(self, goodtimes_instance, tmp_path): assert len(parts) == 17 # 6 base fields + 10 ESA step flags + cull_value assert parts[0] == "00042" # pointing assert parts[5] == "45" # sensor - assert parts[16] == "0" # cull_value (all good) + assert parts[16] == "0" # cull_value (all good, no culled ESA steps) def test_to_txt_values(self, goodtimes_instance, tmp_path): """Test the values in the output file.""" @@ -529,34 +551,42 @@ def test_to_txt_values(self, goodtimes_instance, tmp_path): goodtimes_instance.goodtimes.write_txt(output_path) with open(output_path) as f: - line = f.readline() + lines = f.readlines() - parts = line.strip().split() + # With sweep-based grouping, may have multiple intervals + assert len(lines) >= 1 + + # Check first line format + parts = lines[0].strip().split() # Format: pointing met_start met_end bin_low bin_high sensor # esa_steps[10] cull_value pointing = parts[0] met_start = parts[1] - met_end = parts[2] bin_low = parts[3] bin_high = parts[4] sensor = parts[5] - esa_step_flags = parts[6:16] cull_value = parts[16] assert pointing == "00042" - assert int(met_start) == 
int(goodtimes_instance.coords["met"].values[0]) - assert int(met_end) == int(goodtimes_instance.coords["met"].values[-1]) + # First interval should start at first MET + assert float(met_start) == goodtimes_instance.coords["met"].values[0] assert int(bin_low) == 0 assert int(bin_high) == 89 assert sensor == "45" - assert cull_value == "0" - - # Check ESA step flags - should have 1s for all unique ESA steps + assert cull_value == "0" # All good, no culled ESA steps + + # Collect all ESA steps across all intervals + all_esa_steps = set() + for line in lines: + parts = line.strip().split() + esa_step_flags = parts[6:16] + for i, flag in enumerate(esa_step_flags): + if flag == "1": + all_esa_steps.add(i + 1) + + # All unique ESA steps should be represented unique_esa_steps = set(goodtimes_instance["esa_step"].values) - for i, flag in enumerate(esa_step_flags): - esa_step = i + 1 # ESA steps are 1-indexed - expected = "1" if esa_step in unique_esa_steps else "0" - assert flag == expected + assert all_esa_steps == unique_esa_steps def test_to_txt_with_culled_bins(self, goodtimes_instance, tmp_path): """Test output when some bins are culled.""" @@ -572,20 +602,29 @@ def test_to_txt_with_culled_bins(self, goodtimes_instance, tmp_path): with open(output_path) as f: lines = f.readlines() - # Should have 3 intervals: culled bins (0-20), good bins (21-89), remaining METs - assert len(lines) == 3 - - # First interval: culled bins 0-20 - parts = lines[0].strip().split() - assert int(parts[3]) == 0 # bin_low - assert int(parts[4]) == 20 # bin_high - assert parts[16] == "1" # cull_value (INCOMPLETE_SPIN) - - # Second interval: good bins 21-89 - parts = lines[1].strip().split() - assert int(parts[3]) == 21 # bin_low - assert int(parts[4]) == 89 # bin_high - assert parts[16] == "0" # cull_value (good) + # Only good intervals are output + # Should have intervals for: + # - Fully good ESA steps (all bins) + # - Partially good ESA step (bins 21-89) + assert len(lines) >= 2 + + # Check 
for interval with good bins 21-89 (partial) + partial_lines = [ + line + for line in lines + if line.strip().split()[3] == "21" and line.strip().split()[4] == "89" + ] + assert len(partial_lines) >= 1 + # cull_value should indicate what was culled + assert partial_lines[0].strip().split()[16] == "1" + + # Check for fully good intervals (bins 0-89) + full_lines = [ + line + for line in lines + if line.strip().split()[3] == "0" and line.strip().split()[4] == "89" + ] + assert len(full_lines) >= 1 def test_to_txt_with_gaps(self, goodtimes_instance, tmp_path): """Test output when bins have gaps.""" @@ -601,22 +640,38 @@ def test_to_txt_with_gaps(self, goodtimes_instance, tmp_path): with open(output_path) as f: lines = f.readlines() - # Should have 4 lines (3 for first MET with gap pattern, 1 for remaining METs) - assert len(lines) == 4 - - # First three lines should be for same MET (first MET) - parts1 = lines[0].strip().split() - parts2 = lines[1].strip().split() - parts3 = lines[2].strip().split() - assert parts1[1] == parts2[1] == parts3[1] # Same met_start - - # Check the regions: bins 0-19 (good), 20-70 (culled), 71-89 (good) - np.testing.assert_array_equal(parts1[3:5], ["0", "19"]) - assert parts1[16] == "0" - np.testing.assert_array_equal(parts2[3:5], ["20", "70"]) - assert parts2[16] == "1" - np.testing.assert_array_equal(parts3[3:5], ["71", "89"]) - assert parts3[16] == "0" + # Only good intervals are output (no culled intervals) + # Should have intervals for: + # - Fully good ESA steps (all bins) + # - Partially good ESA step (bins 0-19 and 71-89) + assert len(lines) >= 3 + + # Check for good region bins 0-19 + low_good = [ + line + for line in lines + if line.strip().split()[3] == "0" and line.strip().split()[4] == "19" + ] + assert len(low_good) >= 1 + # cull_value should indicate what was culled + assert low_good[0].strip().split()[16] == "1" + + # Check for good region bins 71-89 + high_good = [ + line + for line in lines + if line.strip().split()[3] == 
"71" and line.strip().split()[4] == "89" + ] + assert len(high_good) >= 1 + assert high_good[0].strip().split()[16] == "1" + + # Check for fully good intervals (bins 0-89) for other ESA steps + full_good = [ + line + for line in lines + if line.strip().split()[3] == "0" and line.strip().split()[4] == "89" + ] + assert len(full_good) >= 1 class TestFinalizeDataset: @@ -785,7 +840,7 @@ def test_finalize_formats_logical_source(self, goodtimes_instance): def test_finalize_preserves_original_dataset(self, goodtimes_instance): """Test that finalize doesn't modify the original dataset.""" - original_dims = set(goodtimes_instance.dims.keys()) + original_dims = set(goodtimes_instance.sizes.keys()) original_coords = set(goodtimes_instance.coords.keys()) with patch("imap_processing.hi.hi_goodtimes.met_to_ttj2000ns") as mock_convert: @@ -797,7 +852,7 @@ def test_finalize_preserves_original_dataset(self, goodtimes_instance): goodtimes_instance.goodtimes.finalize_dataset() # Original should be unchanged - assert set(goodtimes_instance.dims.keys()) == original_dims + assert set(goodtimes_instance.sizes.keys()) == original_dims assert set(goodtimes_instance.coords.keys()) == original_coords assert "epoch" not in goodtimes_instance.coords @@ -1671,7 +1726,12 @@ def test_mark_bad_tdc_cal_tdc3_fails(self, goodtimes_for_tdc): } ) - mark_bad_tdc_cal(goodtimes_for_tdc, diagfee_tdc3_fails) + # Check that setting check_tdc_3=False results in all good values + mark_bad_tdc_cal(goodtimes_for_tdc, diagfee_tdc3_fails, check_tdc_3=False) + assert np.all(goodtimes_for_tdc["cull_flags"].values[0, :] == CullCode.GOOD) + + # Now run with check_tdc_3=True, should mark times from 1000 to 1050 + mark_bad_tdc_cal(goodtimes_for_tdc, diagfee_tdc3_fails, check_tdc_3=True) # TDC3 fails at packet 0 (MET 1000), should mark times from 1000 to 1050 # MET 1000 (index 0) should be culled @@ -2237,7 +2297,7 @@ def test_multiple_sweeps(self): result = _compute_normalized_counts_per_sweep(ds, tof_ab_limit_ns=15) 
assert len(result["normalized_count"]) == 5 - assert result.dims["esa_sweep"] == 5 + assert result.sizes["esa_sweep"] == 5 class TestStatisticalFilter0: