diff --git a/README.md b/README.md index c0a87ba..a2e627a 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ -Dyana is a sandbox environment using Docker and [Tracee](https://github.com/aquasecurity/tracee) for loading, running and profiling a wide range of files, including machine learning models, ELF executables, Pickle serialized files, Javascripts [and more](https://docs.dreadnode.io/open-source/dyana/topics/loaders). It provides detailed insights into GPU memory usage, filesystem interactions, network requests, and security related events. +Dyana is a sandbox environment using Docker and [Tracee](https://github.com/aquasecurity/tracee) for loading, running and profiling a wide range of files, including machine learning models, SafeTensors model files, ELF executables, Pickle serialized files, Javascripts [and more](https://docs.dreadnode.io/open-source/dyana/topics/loaders). It provides detailed insights into GPU memory usage, filesystem interactions, network requests, and security related events. ## Installation diff --git a/dyana/cli.py b/dyana/cli.py index 5eb92ff..47b6549 100644 --- a/dyana/cli.py +++ b/dyana/cli.py @@ -20,6 +20,7 @@ from dyana.view import ( view_disk_events, view_disk_usage, + view_extra, view_gpus, view_header, view_imports, @@ -139,7 +140,9 @@ def trace( except Exception as e: serr = str(e) if "could not select device driver" in serr and "capabilities: [[gpu]]" in serr: - rich_print(":cross_mark: [bold][red]error:[/] [red]GPUs are not available on this system, run with --no-gpu.[/]") + rich_print( + ":cross_mark: [bold][red]error:[/] [red]GPUs are not available on this system, run with --no-gpu.[/]" + ) else: rich_print(f":cross_mark: [bold][red]error:[/] [red]{e}[/]") @@ -187,3 +190,4 @@ def summary(trace_path: pathlib.Path = typer.Option(help="Path to the trace file view_legacy_extra(trace["run"]) else: view_imports(trace["run"]["stages"]) + view_extra(trace["run"]) diff --git a/dyana/cli_test.py b/dyana/cli_test.py index 16fef17..8bdea9e 100644 --- a/dyana/cli_test.py +++ b/dyana/cli_test.py @@ -177,9 +177,7 @@ def test_trace_runs_and_saves(self, _mock_run: t.Any, tmp_path: t.Any) -> None: @patch("dyana.cli.Tracer.__init__", _noop_tracer_init) @patch( "dyana.cli.Tracer.run_trace", - side_effect=RuntimeError( - "could not select device driver '' with capabilities: [[gpu]]" - ), + side_effect=RuntimeError("could not select device driver '' with capabilities: [[gpu]]"), ) @patch("dyana.cli.Loader.__init__", _noop_loader_init) def test_trace_gpu_error(self, _mock_run: t.Any) -> None: diff --git a/dyana/loaders/base/dyana_test.py b/dyana/loaders/base/dyana_test.py index 0f22347..5f13cc1 100644 --- a/dyana/loaders/base/dyana_test.py +++ b/dyana/loaders/base/dyana_test.py @@ -18,62 +18,48 @@ def setup_method(self) -> None: def test_singleton(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() assert Profiler.instance is p def test_on_stage(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="test", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="test", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() p.on_stage("after_load") assert len(p._stages) == 2 def test_track_error(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() p.track_error("loader", "something broke") assert p._errors == {"loader": "something broke"} def test_track_warning(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() p.track_warning("pip", "could not import") assert p._warnings == {"pip": "could not import"} def test_track_extra(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() p.track_extra("imports", {"os": "/usr/lib"}) assert p._extra == {"imports": {"os": "/usr/lib"}} def test_track(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() p.track("custom_key", "custom_value") assert p._additionals == {"custom_key": "custom_value"} def test_as_dict(self) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) p = Profiler() p.track_error("err", "msg") result = p.as_dict() @@ -85,9 +71,7 @@ def test_as_dict(self) -> None: def test_flush(self, capsys: t.Any) -> None: with patch("dyana.loaders.base.dyana.Stage.create") as mock_create: - mock_create.return_value = Stage( - name="start", timestamp=0, ram=0, disk=0, network={}, imports={} - ) + mock_create.return_value = Stage(name="start", timestamp=0, ram=0, disk=0, network={}, imports={}) Profiler() Profiler.flush() captured = capsys.readouterr() @@ -151,7 +135,9 @@ def test_with_prev_imports(self) -> None: patch("dyana.loaders.base.dyana.get_peak_rss", return_value=1024), patch("dyana.loaders.base.dyana.get_disk_usage", return_value=2048), patch("dyana.loaders.base.dyana.get_network_stats", return_value={}), - patch("dyana.loaders.base.dyana.get_current_imports", return_value={"os": "/a", "sys": "/b", "new_mod": "/c"}), + patch( + "dyana.loaders.base.dyana.get_current_imports", return_value={"os": "/a", "sys": "/b", "new_mod": "/c"} + ), ): stage = Stage.create("test", prev_imports={"os": "/a", "sys": "/b"}) assert "new_mod" in stage.imports diff --git a/dyana/loaders/loader.py b/dyana/loaders/loader.py index e430ffd..222b7f4 100644 --- a/dyana/loaders/loader.py +++ b/dyana/loaders/loader.py @@ -184,7 +184,9 @@ def run(self, allow_network: bool = False, allow_gpus: bool = True, allow_volume rich_print(":popcorn: [bold]loader[/]: [yellow]required bridged network access[/]") elif allow_network: - rich_print(":popcorn: [bold]loader[/]: [yellow]warning: allowing bridged network access to the container[/]") + rich_print( + ":popcorn: [bold]loader[/]: [yellow]warning: allowing bridged network access to the container[/]" + ) if allow_volume_write: rich_print(":popcorn: [bold]loader[/]: [yellow]warning: allowing volume write to the container[/]") diff --git a/dyana/loaders/safetensors/.gitignore b/dyana/loaders/safetensors/.gitignore new file mode 100644 index 0000000..3d1264e --- /dev/null +++ b/dyana/loaders/safetensors/.gitignore @@ -0,0 +1,3 @@ +dyana.py +dyana-requirements.txt +dyana-requirements-gpu.txt diff --git a/dyana/loaders/safetensors/Dockerfile b/dyana/loaders/safetensors/Dockerfile new file mode 100644 index 0000000..02c691c --- /dev/null +++ b/dyana/loaders/safetensors/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y build-essential +COPY dyana.py . +COPY dyana-requirements.txt . +RUN pip install --no-cache-dir --root-user-action=ignore -r dyana-requirements.txt + +COPY requirements.txt . +RUN pip install --no-cache-dir --root-user-action=ignore -r requirements.txt + +COPY main.py . + +ENTRYPOINT ["python3", "-W", "ignore", "main.py"] diff --git a/dyana/loaders/safetensors/main.py b/dyana/loaders/safetensors/main.py new file mode 100644 index 0000000..95a1dea --- /dev/null +++ b/dyana/loaders/safetensors/main.py @@ -0,0 +1,309 @@ +from __future__ import annotations + +import argparse +import json +import math +import os +import re +import struct +import typing as t + +DTYPE_SIZES: dict[str, int] = { + "F16": 2, + "F32": 4, + "BF16": 2, + "F64": 8, + "I8": 1, + "I16": 2, + "I32": 4, + "I64": 8, + "U8": 1, + "BOOL": 1, +} + +MAX_HEADER_SIZE = 100 * 1024 * 1024 # 100MB + + +def validate_header(path: str) -> dict[str, t.Any]: + errors: list[str] = [] + header_length = 0 + file_size = 0 + header_json: dict[str, t.Any] = {} + + try: + file_size = os.path.getsize(path) + except OSError as e: + errors.append(f"cannot read file: {e}") + return { + "header_length": 0, + "file_size": 0, + "header_json": {}, + "errors": errors, + } + + if file_size < 8: + errors.append(f"file too small ({file_size} bytes), cannot read header length") + return { + "header_length": 0, + "file_size": file_size, + "header_json": {}, + "errors": errors, + } + + with open(path, "rb") as f: + header_length_bytes = f.read(8) + header_length = struct.unpack(" MAX_HEADER_SIZE: + errors.append(f"header length ({header_length:,} bytes) exceeds 100MB limit — possible header bomb") + return { + "header_length": header_length, + "file_size": file_size, + "header_json": {}, + "errors": errors, + } + + if header_length + 8 > file_size: + errors.append(f"header extends past end of file (header_length={header_length}, file_size={file_size})") + return { + "header_length": header_length, + "file_size": file_size, + "header_json": {}, + "errors": errors, + } + + header_bytes = f.read(header_length) + try: + header_json = json.loads(header_bytes) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + errors.append(f"invalid JSON in header: {e}") + return { + "header_length": header_length, + "file_size": file_size, + "header_json": {}, + "errors": errors, + } + + return { + "header_length": header_length, + "file_size": file_size, + "header_json": header_json, + "errors": errors, + } + + +def analyze_tensors(header: dict[str, t.Any], data_start: int, file_size: int) -> dict[str, t.Any]: + errors: list[str] = [] + warnings: list[str] = [] + info: list[str] = [] + + tensor_entries: list[dict[str, t.Any]] = [] + dtype_distribution: dict[str, int] = {} + total_data_bytes = 0 + + data_size = file_size - data_start + + # collect all tensor entries (skip __metadata__) + for name, entry in header.items(): + if name == "__metadata__": + continue + + if not isinstance(entry, dict): + errors.append(f"tensor '{name}': entry is not a dict") + continue + + dtype = entry.get("dtype") + shape = entry.get("shape") + offsets = entry.get("data_offsets") + + # validate dtype + if dtype not in DTYPE_SIZES: + errors.append(f"tensor '{name}': unknown dtype '{dtype}'") + continue + + # validate shape + if not isinstance(shape, list) or not all(isinstance(s, int) and s >= 0 for s in shape): + errors.append(f"tensor '{name}': invalid shape {shape}") + continue + + # validate offsets + if not isinstance(offsets, list) or len(offsets) != 2 or not all(isinstance(o, int) for o in offsets): + errors.append(f"tensor '{name}': invalid data_offsets {offsets}") + continue + + begin, end = offsets + if begin > end: + errors.append(f"tensor '{name}': begin offset ({begin}) > end offset ({end})") + continue + + if end > data_size: + errors.append(f"tensor '{name}': data_offsets [{begin}, {end}] exceed data section size ({data_size})") + + # check shape/size consistency + expected_size = math.prod(shape) * DTYPE_SIZES[dtype] if shape else 0 + actual_size = end - begin + if expected_size != actual_size: + warnings.append( + f"tensor '{name}': shape {shape} with dtype {dtype} expects {expected_size} bytes, " + f"but data_offsets span {actual_size} bytes" + ) + + dtype_distribution[dtype] = dtype_distribution.get(dtype, 0) + 1 + total_data_bytes += actual_size + tensor_entries.append( + { + "name": name, + "dtype": dtype, + "shape": shape, + "begin": begin, + "end": end, + } + ) + + # detect overlapping byte ranges + sorted_tensors = sorted(tensor_entries, key=lambda t: t["begin"]) + for i in range(len(sorted_tensors) - 1): + curr = sorted_tensors[i] + nxt = sorted_tensors[i + 1] + if curr["end"] > nxt["begin"]: + errors.append( + f"overlapping tensors: '{curr['name']}' [{curr['begin']}:{curr['end']}] " + f"overlaps with '{nxt['name']}' [{nxt['begin']}:{nxt['end']}]" + ) + + # detect gaps + if sorted_tensors: + if sorted_tensors[0]["begin"] > 0: + info.append(f"gap of {sorted_tensors[0]['begin']} bytes at start of data section") + for i in range(len(sorted_tensors) - 1): + gap = sorted_tensors[i + 1]["begin"] - sorted_tensors[i]["end"] + if gap > 0: + info.append( + f"gap of {gap} bytes between '{sorted_tensors[i]['name']}' and '{sorted_tensors[i + 1]['name']}'" + ) + last_end = sorted_tensors[-1]["end"] + if last_end < data_size: + info.append(f"gap of {data_size - last_end} bytes at end of data section") + + # sample tensors (first 10) + sample_tensors = [{"name": t["name"], "dtype": t["dtype"], "shape": t["shape"]} for t in tensor_entries[:10]] + + return { + "count": len(tensor_entries), + "total_data_bytes": total_data_bytes, + "dtype_distribution": dtype_distribution, + "sample_tensors": sample_tensors, + "errors": errors, + "warnings": warnings, + "info": info, + } + + +def analyze_metadata(header: dict[str, t.Any]) -> dict[str, t.Any]: + warnings: list[str] = [] + info: list[str] = [] + + metadata = header.get("__metadata__") + if metadata is None: + return {"metadata": None, "warnings": warnings, "info": info} + + if not isinstance(metadata, dict): + warnings.append(f"__metadata__ is not a dict (got {type(metadata).__name__})") + return {"metadata": None, "warnings": warnings, "info": info} + + for key, value in metadata.items(): + if not isinstance(value, str): + warnings.append(f"metadata key '{key}': value is {type(value).__name__}, expected string") + continue + + if len(value) > 10240: + warnings.append(f"metadata key '{key}': very long value ({len(value)} chars)") + + if re.search(r"https?://", value): + info.append(f"metadata key '{key}': contains URL") + + if re.search(r"^[A-Za-z0-9+/]{100,}={0,2}$", value): + info.append(f"metadata key '{key}': value looks like base64-encoded data") + + return {"metadata": dict(metadata), "warnings": warnings, "info": info} + + +if __name__ == "__main__": + from dyana import Profiler # type: ignore[attr-defined] + + parser = argparse.ArgumentParser(description="Analyze SafeTensors files for structural integrity") + parser.add_argument("--safetensors", help="Path to SafeTensors file", required=True) + args = parser.parse_args() + profiler: Profiler = Profiler(gpu=False) + + if not os.path.exists(args.safetensors): + profiler.track_error("safetensors", "SafeTensors file not found") + else: + # Stage 1: validate structure + profiler.on_stage("validating_structure") + result = validate_header(args.safetensors) + + for error in result["errors"]: + profiler.track_error("structure", error) + + # Stage 2: parse header + profiler.on_stage("parsing_header") + header_json = result["header_json"] + data_start = 8 + result["header_length"] + + profiler.track_extra( + "file_structure", + { + "header_length": result["header_length"], + "file_size": result["file_size"], + "data_section_size": result["file_size"] - data_start if result["header_length"] > 0 else 0, + "header_valid": len(result["errors"]) == 0, + }, + ) + + if header_json: + # Stage 3: analyze tensors + profiler.on_stage("analyzing_tensors") + tensor_result = analyze_tensors(header_json, data_start, result["file_size"]) + + for error in tensor_result["errors"]: + profiler.track_error("tensor", error) + for warning in tensor_result["warnings"]: + profiler.track_warning("tensor", warning) + + profiler.track_extra( + "tensor_summary", + { + "count": tensor_result["count"], + "total_data_bytes": tensor_result["total_data_bytes"], + "dtype_distribution": tensor_result["dtype_distribution"], + "sample_tensors": tensor_result["sample_tensors"], + }, + ) + + # Stage 4: analyze metadata + profiler.on_stage("analyzing_metadata") + meta_result = analyze_metadata(header_json) + + for warning in meta_result["warnings"]: + profiler.track_warning("metadata", warning) + + if meta_result["metadata"]: + profiler.track_extra("metadata", meta_result["metadata"]) + + # Collect all findings + findings: dict[str, list[str]] = { + "errors": result["errors"] + tensor_result["errors"], + "warnings": tensor_result["warnings"] + meta_result["warnings"], + "info": tensor_result["info"] + meta_result["info"], + } + profiler.track_extra("findings", findings) diff --git a/dyana/loaders/safetensors/requirements.txt b/dyana/loaders/safetensors/requirements.txt new file mode 100644 index 0000000..5756220 --- /dev/null +++ b/dyana/loaders/safetensors/requirements.txt @@ -0,0 +1 @@ +safetensors==0.5.3 diff --git a/dyana/loaders/safetensors/safetensors_test.py b/dyana/loaders/safetensors/safetensors_test.py new file mode 100644 index 0000000..3e44c4a --- /dev/null +++ b/dyana/loaders/safetensors/safetensors_test.py @@ -0,0 +1,233 @@ +from __future__ import annotations + +import json +import struct +import typing as t +from pathlib import Path + +from dyana.loaders.loader import Loader +from dyana.loaders.safetensors.main import ( + analyze_metadata, + analyze_tensors, + validate_header, +) + + +def _make_safetensors(path: Path, header: dict[str, t.Any], data: bytes = b"") -> None: + header_bytes = json.dumps(header).encode("utf-8") + header_length = len(header_bytes) + with open(path, "wb") as f: + f.write(struct.pack(" None: + loader = Loader(name="safetensors", build=False) + assert loader.settings is not None + assert loader.settings.gpu is False + + def test_correct_arg_structure(self) -> None: + loader = Loader(name="safetensors", build=False) + assert loader.settings is not None + assert loader.settings.args is not None + assert len(loader.settings.args) == 1 + assert loader.settings.args[0].name == "safetensors" + assert loader.settings.args[0].required is True + assert loader.settings.args[0].volume is True + + +class TestValidateHeader: + def test_valid_minimal(self, tmp_path: Path) -> None: + path = tmp_path / "test.safetensors" + header: dict[str, t.Any] = { + "weight": { + "dtype": "F32", + "shape": [2, 3], + "data_offsets": [0, 24], + } + } + _make_safetensors(path, header, data=b"\x00" * 24) + result = validate_header(str(path)) + assert result["errors"] == [] + assert result["header_json"] == header + assert result["file_size"] > 0 + assert result["header_length"] > 0 + + def test_header_bomb(self, tmp_path: Path) -> None: + path = tmp_path / "bomb.safetensors" + with open(path, "wb") as f: + # header_length = 200MB + f.write(struct.pack(" 0 + assert "100MB" in result["errors"][0] + + def test_header_past_eof(self, tmp_path: Path) -> None: + path = tmp_path / "truncated.safetensors" + with open(path, "wb") as f: + f.write(struct.pack(" 0 + assert "past end of file" in result["errors"][0] + + def test_invalid_json(self, tmp_path: Path) -> None: + path = tmp_path / "badjson.safetensors" + bad_json = b"not json at all!" + with open(path, "wb") as f: + f.write(struct.pack(" 0 + assert "invalid JSON" in result["errors"][0] + + def test_empty_file(self, tmp_path: Path) -> None: + path = tmp_path / "empty.safetensors" + path.write_bytes(b"") + result = validate_header(str(path)) + assert len(result["errors"]) > 0 + assert "too small" in result["errors"][0] + + def test_zero_length_header(self, tmp_path: Path) -> None: + path = tmp_path / "zero.safetensors" + with open(path, "wb") as f: + f.write(struct.pack(" 0 + assert "zero" in result["errors"][0] + + +class TestAnalyzeTensors: + def test_valid_tensors(self) -> None: + header: dict[str, t.Any] = { + "weight": { + "dtype": "F32", + "shape": [2, 3], + "data_offsets": [0, 24], + }, + "bias": { + "dtype": "F32", + "shape": [3], + "data_offsets": [24, 36], + }, + } + result = analyze_tensors(header, data_start=100, file_size=136) + assert result["count"] == 2 + assert result["total_data_bytes"] == 36 + assert result["errors"] == [] + assert result["warnings"] == [] + + def test_unknown_dtype(self) -> None: + header: dict[str, t.Any] = { + "weight": { + "dtype": "BFLOAT256", + "shape": [2, 3], + "data_offsets": [0, 24], + } + } + result = analyze_tensors(header, data_start=100, file_size=124) + assert len(result["errors"]) > 0 + assert "unknown dtype" in result["errors"][0] + + def test_overlapping_offsets(self) -> None: + header: dict[str, t.Any] = { + "a": { + "dtype": "F32", + "shape": [10], + "data_offsets": [0, 40], + }, + "b": { + "dtype": "F32", + "shape": [10], + "data_offsets": [20, 60], + }, + } + result = analyze_tensors(header, data_start=100, file_size=160) + assert any("overlapping" in e for e in result["errors"]) + + def test_out_of_bounds_offsets(self) -> None: + header: dict[str, t.Any] = { + "weight": { + "dtype": "F32", + "shape": [2, 3], + "data_offsets": [0, 24], + } + } + # file_size too small to contain data + result = analyze_tensors(header, data_start=100, file_size=110) + assert any("exceed" in e for e in result["errors"]) + + def test_shape_size_mismatch(self) -> None: + header: dict[str, t.Any] = { + "weight": { + "dtype": "F32", + "shape": [2, 3], + "data_offsets": [0, 100], # should be 24 + } + } + result = analyze_tensors(header, data_start=100, file_size=200) + assert len(result["warnings"]) > 0 + assert "expects" in result["warnings"][0] + + def test_zero_dimension_tensor(self) -> None: + header: dict[str, t.Any] = { + "empty": { + "dtype": "F32", + "shape": [0], + "data_offsets": [0, 0], + } + } + result = analyze_tensors(header, data_start=100, file_size=100) + assert result["count"] == 1 + assert result["errors"] == [] + + def test_empty_header(self) -> None: + result = analyze_tensors({}, data_start=100, file_size=100) + assert result["count"] == 0 + assert result["errors"] == [] + + def test_metadata_skipped(self) -> None: + header: dict[str, t.Any] = { + "__metadata__": {"format": "pt"}, + "weight": { + "dtype": "F32", + "shape": [4], + "data_offsets": [0, 16], + }, + } + result = analyze_tensors(header, data_start=100, file_size=116) + assert result["count"] == 1 + + +class TestAnalyzeMetadata: + def test_no_metadata(self) -> None: + result = analyze_metadata({"weight": {"dtype": "F32"}}) + assert result["metadata"] is None + assert result["warnings"] == [] + + def test_clean_metadata(self) -> None: + header: dict[str, t.Any] = {"__metadata__": {"format": "pt", "framework": "pytorch"}} + result = analyze_metadata(header) + assert result["metadata"] == {"format": "pt", "framework": "pytorch"} + assert result["warnings"] == [] + assert result["info"] == [] + + def test_non_string_values(self) -> None: + header: dict[str, t.Any] = {"__metadata__": {"count": 42}} + result = analyze_metadata(header) + assert len(result["warnings"]) > 0 + assert "expected string" in result["warnings"][0] + + def test_very_long_values(self) -> None: + header: dict[str, t.Any] = {"__metadata__": {"blob": "x" * 20000}} + result = analyze_metadata(header) + assert len(result["warnings"]) > 0 + assert "very long" in result["warnings"][0] + + def test_suspicious_url(self) -> None: + header: dict[str, t.Any] = {"__metadata__": {"source": "https://evil.example.com/payload"}} + result = analyze_metadata(header) + assert any("URL" in i for i in result["info"]) diff --git a/dyana/loaders/safetensors/settings.yml b/dyana/loaders/safetensors/settings.yml new file mode 100644 index 0000000..9a785c8 --- /dev/null +++ b/dyana/loaders/safetensors/settings.yml @@ -0,0 +1,16 @@ +description: Analyzes SafeTensors model files for structural integrity and security issues. + +gpu: false + +args: + - name: safetensors + description: Path to the SafeTensors file to analyze. + required: true + volume: true + +examples: + - description: "Analyze a SafeTensors file:" + command: dyana trace --loader safetensors --safetensors /path/to/model.safetensors + + - description: "Analyze with verbose output:" + command: dyana trace --loader safetensors --safetensors /path/to/model.safetensors --verbose diff --git a/dyana/view.py b/dyana/view.py index aed6a9b..de2a944 100644 --- a/dyana/view.py +++ b/dyana/view.py @@ -20,7 +20,9 @@ def _view_loader_help_markdown(loader: Loader) -> None: rich_print() rich_print("* **Requires Network:**", "yes" if loader.settings.network else "no") if loader.settings.build_args: - rich_print("* **Optional Build Arguments:**", ", ".join({f"`--{k}`" for k in loader.settings.build_args.keys()})) + rich_print( + "* **Optional Build Arguments:**", ", ".join({f"`--{k}`" for k in loader.settings.build_args.keys()}) + ) if loader.settings.args: rich_print() @@ -34,7 +36,9 @@ def _view_loader_help_markdown(loader: Loader) -> None: "|--------------|---------------------------------------------------------------------|------------------------------|----------|" ) for arg in loader.settings.args: - rich_print(f"| `--{arg.name}` | {arg.description} | `{arg.default}` | {'yes' if arg.required else 'no'} |") + rich_print( + f"| `--{arg.name}` | {arg.description} | `{arg.default}` | {'yes' if arg.required else 'no'} |" + ) if loader.settings.examples: rich_print() @@ -333,7 +337,7 @@ def view_network_events(trace: dict[str, t.Any]) -> None: else: data = [arg["value"] for arg in event["args"] if arg["name"] == "proto_dns"][0] question_names = [q["name"] for q in data["questions"]] - answers = [f'{a["name"]}={a["IP"]}' for a in data["answers"]] + answers = [f"{a['name']}={a['IP']}" for a in data["answers"]] if not answers: line = f" * [[dim]{event['processId']}[/]] {event['processName']} | [bold red]dns[/] | question={', '.join(question_names)}" @@ -422,3 +426,65 @@ def view_security_events(trace: dict[str, t.Any]) -> None: rich_print(f" * {signature} ([dim]{category}[/], {severity_fmt(severity_level)})") rich_print() + + +def _view_safetensors_extra(extra: dict[str, t.Any]) -> None: + file_structure = extra.get("file_structure") + if file_structure: + rich_print("[bold yellow]File Structure:[/]") + rich_print(f" Header length : {sizeof_fmt(file_structure['header_length'])}") + rich_print(f" File size : {sizeof_fmt(file_structure['file_size'])}") + rich_print(f" Data section : {sizeof_fmt(file_structure['data_section_size'])}") + valid_str = "[green]yes[/]" if file_structure["header_valid"] else "[red]no[/]" + rich_print(f" Header valid : {valid_str}") + rich_print() + + tensor_summary = extra.get("tensor_summary") + if tensor_summary: + rich_print("[bold yellow]Tensor Summary:[/]") + rich_print(f" Count : {tensor_summary['count']}") + rich_print(f" Total data : {sizeof_fmt(tensor_summary['total_data_bytes'])}") + if tensor_summary["dtype_distribution"]: + dist = ", ".join( + f"{dtype}: {count}" for dtype, count in sorted(tensor_summary["dtype_distribution"].items()) + ) + rich_print(f" Dtypes : {dist}") + rich_print() + + if tensor_summary.get("sample_tensors"): + rich_print("[bold yellow]Sample Tensors:[/]") + for tensor in tensor_summary["sample_tensors"]: + shape_str = "x".join(str(s) for s in tensor["shape"]) + rich_print(f" * [green]{tensor['name']}[/] : {tensor['dtype']} [{shape_str}]") + rich_print() + + metadata = extra.get("metadata") + if metadata: + rich_print("[bold yellow]Metadata:[/]") + for key, value in metadata.items(): + display_value = value if len(str(value)) <= 80 else str(value)[:77] + "..." + rich_print(f" * [green]{key}[/] : {display_value}") + rich_print() + + findings = extra.get("findings") + if findings: + has_any = findings.get("errors") or findings.get("warnings") or findings.get("info") + if has_any: + rich_print("[bold yellow]Findings:[/]") + for error in findings.get("errors", []): + rich_print(f" * [bold red]ERROR[/] : {error}") + for warning in findings.get("warnings", []): + rich_print(f" * [yellow]WARNING[/] : {warning}") + for info_msg in findings.get("info", []): + rich_print(f" * [dim]INFO[/] : {info_msg}") + rich_print() + + +def view_extra(run: dict[str, t.Any]) -> None: + extra = run.get("extra") + if not extra: + return + + loader_name = run.get("loader_name", "") + if loader_name == "safetensors": + _view_safetensors_extra(extra) diff --git a/dyana/view_test.py b/dyana/view_test.py index 02f95c1..eaa8049 100644 --- a/dyana/view_test.py +++ b/dyana/view_test.py @@ -5,6 +5,7 @@ severity_fmt, view_disk_events, view_disk_usage, + view_extra, view_header, view_network_events, view_process_executions, @@ -284,9 +285,7 @@ def test_dedup(self) -> None: "processId": 1, "processName": "curl", "syscall": "connect", - "args": [ - {"name": "remote_addr", "value": {"sa_family": "AF_INET", "sin_addr": "1.2.3.4", "sin_port": 80}} - ], + "args": [{"name": "remote_addr", "value": {"sa_family": "AF_INET", "sin_addr": "1.2.3.4", "sin_port": 80}}], } trace: dict[str, t.Any] = {"events": [event, {**event, "timestamp": 2000}]} with patch("dyana.view.rich_print") as mock_print: @@ -434,3 +433,101 @@ def test_basic(self) -> None: assert "Disk Usage" in output assert "start" in output assert "end" in output + + +class TestViewExtra: + def test_no_extra(self) -> None: + run: dict[str, t.Any] = {"extra": None, "loader_name": "safetensors"} + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + mock_print.assert_not_called() + + def test_empty_extra(self) -> None: + run: dict[str, t.Any] = {"extra": {}, "loader_name": "safetensors"} + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + mock_print.assert_not_called() + + def test_safetensors_file_structure(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "safetensors", + "extra": { + "file_structure": { + "header_length": 256, + "file_size": 1048576, + "data_section_size": 1048312, + "header_valid": True, + } + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "File Structure" in output + assert "Header length" in output + assert "yes" in output + + def test_safetensors_tensor_summary(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "safetensors", + "extra": { + "tensor_summary": { + "count": 42, + "total_data_bytes": 1048576, + "dtype_distribution": {"F32": 30, "F16": 12}, + "sample_tensors": [ + {"name": "layer.0.weight", "dtype": "F32", "shape": [768, 768]}, + ], + } + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "Tensor Summary" in output + assert "42" in output + assert "F32" in output + assert "layer.0.weight" in output + assert "768x768" in output + + def test_safetensors_findings(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "safetensors", + "extra": { + "findings": { + "errors": ["overlapping tensors"], + "warnings": ["size mismatch"], + "info": ["gap detected"], + } + }, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "ERROR" in output + assert "overlapping tensors" in output + assert "WARNING" in output + assert "size mismatch" in output + assert "INFO" in output + assert "gap detected" in output + + def test_safetensors_metadata(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "safetensors", + "extra": {"metadata": {"format": "pt", "framework": "pytorch"}}, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + output = " ".join(str(c) for c in mock_print.call_args_list) + assert "Metadata" in output + assert "format" in output + assert "pytorch" in output + + def test_unknown_loader_no_output(self) -> None: + run: dict[str, t.Any] = { + "loader_name": "unknown_loader", + "extra": {"some_key": "some_value"}, + } + with patch("dyana.view.rich_print") as mock_print: + view_extra(run) + mock_print.assert_not_called()