Skip to content

Feat/streaming#29

Draft
izzet wants to merge 67 commits intollnl:developfrom
izzet:feat/streaming
Draft

Feat/streaming#29
izzet wants to merge 67 commits intollnl:developfrom
izzet:feat/streaming

Conversation

@izzet
Copy link
Copy Markdown
Collaborator

@izzet izzet commented Sep 2, 2025

No description provided.

izzet and others added 12 commits July 10, 2025 13:54
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Added InputConfig and FileInputConfig classes for handling file inputs.
- Introduced ZMQInputConfig for ZMQ address configuration.
- Refactored output configurations to inherit from FileOutput.
- Added ZMQOutput class for handling streaming outputs to ZMQ addresses.
- Updated the DarshanAnalyzer to use new input handling methods.
- Enhanced DFTracerAnalyzer to support reading from ZMQ streams.
- Added utility functions for handling pandas DataFrames and streaming.
- Introduced new constants for file and host hashes.
- Updated meson.build to include new utility files for pandas and streaming.
@izzet izzet self-assigned this Sep 2, 2025
@izzet izzet added the enhancement New feature or request label Sep 2, 2025
- Updated CI configuration to include 'streaming' in pip install dependencies.
- Refactored DFAnalyzer to remove unused imports and improve code clarity.
- Enhanced the epoch_window_via_dict class in streaming.py to better handle epoch events and added logging for error handling.
- Added new tests for epoch window functionality, ensuring proper buffering and emission of events based on epoch.start and epoch.end signals.
- Created new end-to-end tests for ZMQ analysis pipeline, validating the integration of streaming with the analyzer.
- Removed outdated test_streaming.py and consolidated tests into more relevant files.
- Added a tar.gz file containing real trace data for testing purposes.
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Sep 3, 2025

Codecov Report

❌ Patch coverage is 81.33971% with 117 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.87%. Comparing base (d76b03d) to head (6ea2807).

Files with missing lines Patch % Lines
dfanalyzer/dftracer.py 64.10% 42 Missing ⚠️
dfanalyzer/__main__.py 0.00% 24 Missing ⚠️
dfanalyzer/analyzer.py 86.95% 24 Missing ⚠️
dfanalyzer/utils/streaming.py 80.82% 14 Missing ⚠️
dfanalyzer/output.py 57.14% 6 Missing ⚠️
dfanalyzer/analysis_utils.py 63.63% 4 Missing ⚠️
dfanalyzer/darshan.py 75.00% 2 Missing ⚠️
dfanalyzer/metrics.py 99.22% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##             main      #29       +/-   ##
===========================================
+ Coverage   57.48%   70.87%   +13.39%     
===========================================
  Files          26       27        +1     
  Lines        2164     2414      +250     
===========================================
+ Hits         1244     1711      +467     
+ Misses        920      703      -217     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

izzet added 11 commits September 3, 2025 19:56
- Updated `test_analyzer_dftracer_read_zmq` to use `dftracer_ai_logging_posix_events` instead of `epoch_posix_events`.
- Modified `_test_e2e` in `test_e2e.py` to conditionally add `analyzer.assign_epochs=True` based on the analyzer and preset.
- Changed `test_e2e_zmq` to utilize `dftracer_ai_logging_posix_events` for event data.
- Introduced a new test file `test_metrics.py` with comprehensive tests for `set_main_metrics`, `set_view_metrics`, and `set_cross_layer_metrics` functions, ensuring proper handling of edge cases and metrics calculations.
izzet added 30 commits February 19, 2026 09:53
- Add _detect_fabric_protocol() to detect CXI fabric on Cray EX systems
- Check for /dev/cxi* and /dev/hfi* devices for Slingshot interconnect
- Fall back to TCP for local development
- Support environment variable overrides for testing
Replace streamz-based streaming with a simpler callback architecture using
direct ZMQ sockets. This removes the streamz dependency and provides more
control over the streaming lifecycle.

- Add zmq_io module with open_consumer/open_producer functions
- Refactor analyze_zmq to use output_handler callback instead of Stream
- Extract common streaming logic into _analyze_stream method
- Update ZMQOutput to send results as multipart messages with parquet
- Deprecate read_zmq and postread_zmq methods
- Update tests to use new callback-based approach
- Remove streamz-based streaming tests
…erformance insights

Add a new analysis facts framework that evaluates trace data against configurable
rules to produce structured performance findings. The implementation includes:

- FactEngine and FactRule classes for rule-based fact evaluation
- FactsConfig for enabling/disabling fact generation with configurable options
- New dataclasses: AnalysisFact, FactWindow, FactScope, FactSeverity, FactProvenance,
  FactEnvelope for structured fact representation
- DLIO-specific fact rules (dlio.yaml) for detecting common performance patterns
  (fetch_pressure, compute_dominance, epoch_straggler, rank_imbalance, etc.)
- JSON schema for fact envelope validation
- Integration with Analyzer to evaluate facts from flat views
- Fact envelope output support in ZMQ and Mofka streaming outputs
- Conditional debug output for metric_boundaries and flat_views

The facts system allows users to define custom rules that evaluate trace metrics and
emit structured findings with severity scores, confidence levels, and opportunity tags.
…mprovements

- Add --num-ranks parameter to wait for all ranks' epoch.block events
  before triggering analysis (ensures 1 analysis per epoch, not per rank)
- Add SIGTERM-based graceful shutdown with timestamp-bounded trace drain
- Add drain summary logging (cat_counts, top event names per epoch)
- Fix fact engine NA handling: use numpy fmax/fmin for nullable pd.NA
  values in derived metric max()/min() expressions
- Add facts.evaluate.done info log and analysis_facts count to
  analysis_complete log for pipeline observability
- Publish analysis_facts as JSON envelope to Mofka output topic
- Update streaming tests for multi-rank and fact engine scenarios
…ming

This change replaces the EpochBuffer implementation with a WindowBuffer
that provides more flexible window boundary tracking across multiple
ranks. The new implementation:

- Uses WindowBoundaryTracker to handle overlapping windows and out-of-order
  boundary events from different ranks
- Maintains backward compatibility by preserving the "epoch" field in events
- Adds "window" field for new window-based analysis
- Improves test coverage for window boundary tracking logic
- Updates fact engine to support layer-scoped facts with fillna0 support

The change enables more sophisticated streaming analysis patterns while
maintaining compatibility with existing epoch-based workflows.
- Add step_trigger_name/step_trigger_every/step_trigger_warmup params for
  step-level analysis windows (e.g. analyze every N steps after warmup)
- Add control_window_start_name/control_window_end_name for explicit
  control boundary events from Mofka control topic
- Add trace_drain_grace_ms: after control boundary, wait for lagging trace
  events before analyzing (handles trace/control race condition)
- Track per-rank step counts via control event metadata (step, epoch fields)
- Add fetch_pressure step-level fact rules to dlio.yaml
- WindowBuffer: support step-based windows, per-rank event counting
- Normalize step column in extra_columns alongside epoch
- Config: add step_trigger and drain_grace fields
…n when clause

- Add reader_posix_pressure rules (epoch + step) for TF workloads where
  reader_open_time is not available. Guard with reader_open_guard derived
  metric to prevent conflict with reader_open_pressure on PyTorch.
- Fix Int16 overflow in trange: use fillna(0).astype("Int32") instead of
  astype("Int16") to handle NaN values from metadata events.
- Move fillna0() from when clause to derived_metrics (reader_open_guard)
  because pd.eval() doesn't recognize custom functions in when expressions.
…guards

- Remove epoch/step trigger parameters from analyze_mofka and
  _analyze_mofka_with_control — hardcode window.start/window.stop
- Remove _should_close_on_step_trigger() and step_trigger_counts_by_pid
- Guard json_dict access for ph, cat, ts, name with .get() or key checks
  to handle Mofka events without standard DFTracer fields
…ts, JSON output

  Merge 43 upstream commits bringing hybrid profile support, two-track
  per-process/per-call statistics, JSON output, system metrics, size-layer
  configuration, and set-like derived metric batching into the streaming
  analysis pipeline.

  Key integration points:
  - analyze_file now supports ReadTraceResult with profiles and system_metrics
  - _compute_high_level_metrics adds per-call alias columns (time_sq, size_sq,
    time_call_min/max, size_call_min/max) with dual Dask/pandas paths
  - build_view_rename_map renames _min/_max to _proc_min/_proc_max and
    collapses _call_min_min to _call_min after view-level aggregation
  - derive_call_stats computes _call_mean/_call_std from sum-of-squares
  - size_layers/size_derived_metrics control which layers retain size columns
  - _attach_metadata extracted as static method with dtype coercion guards
  - compute_time_boundaries uses time_proc_max for non-process views
  - Fixed ZeroDivisionError in set_main_metrics with safe division
  - Renamed AnalyzerResultType to AnalysisResult throughout
  - dlio-prev preset restores epoch-level deps (step only in dlio/AI-logging)
… logs

Cast time_granularity * time_resolution to int before floor-division to
avoid float precision issues.  Change trange dtype from Int32 to Int64
to handle large trace timestamps.  Comment out per-event debug logs in
load_objects_dict and normalize_input that flood structured logs at
high event rates.
Add suppresses_tags to AnalysisFact and FactRule so that rules like
compute_dominance can suppress reader_parallelism opportunities.  Add
scope.node field for cross-node scoping.  Add window_index and
_pre_layer_hlm to AnalysisResult for hierarchical HLM publishing.
Add debug logging for derived metric evaluation and rule firing.
Migrate DLIO rules from fetch_data_time to fetch_iter_time metrics.
Add compute_frac < 0.85 guard to all input-side rules so they do not
fire when the workload is compute-dominant.  Add compute_dominance
rule with suppresses_tags for reader_parallelism/dataloader_prefetch.
Add io_contention and pread_contention rules for storage-side signals.
Remove redundant step-level duplicates of epoch rules.
Add stormer.yaml with communication-aware layer definitions.
Add variant configs: dlio-all, dlio-app, dlio-global, dlio-posix-corona,
dlio-posix-lustre3 for cross-layer evaluation experiments.
Add analyze_mofka_global() for hierarchical cross-node analysis:
buffers per-node HLM parquet payloads by window_index, concatenates
when all nodes report, and runs _analyze_hlm with host_hash view.
Add wall-clock timing instrumentation for drain, analysis, and publish
phases.  Add debug logging for flat view data and analysis facts.

Extend MofkaOutput with global HLM publishing via a second ofi+cxi
Mofka producer.  Add source_node tracking to fact envelopes.

Add AnalyzerPresetConfigStormer with communication-aware layer defs
and fetch_block layer for prefetch worker visibility.
Align test fixtures and assertions with the fetch_data_time →
fetch_iter_time metric rename in dlio.yaml.
Add stormer.yaml to meson install_data so the preset is available at
runtime.
Add cross-node outlier detection rules to dlio-global.yaml for the
hierarchical analyzer. R10 detects nodes with elevated fetch fraction
relative to the cluster; R11 detects reader-open tail outliers.
Both fire per-node on the host_hash view. Also add dlio-global.yaml
to meson.build install list.
Align fact_type name with the paper's rule catalog (Table I).
Add bw_saturation rule that fires when per-rank read bandwidth reaches
80% of the site-specific storage ceiling, suppressing further reader
parallelism and prefetch exploration.
Align fact_type with the paper's Table I identifier.
reader_posix_pressure → read_pressure (Table I)
excessive_metadata_access → metadata_dominance (Table I)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants