Skip to content
22 changes: 21 additions & 1 deletion modelaudit/scanners/pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6882,14 +6882,34 @@ def extract_metadata(self, file_path: str) -> dict[str, Any]:
metadata = super().extract_metadata(file_path)

allow_deserialization = bool(self.config.get("allow_metadata_deserialization"))
metadata_read_cap = 10 * 1024 * 1024

try:
import pickle
import pickletools
from io import BytesIO

max_metadata_read_size = int(self.config.get("max_metadata_pickle_read_size", metadata_read_cap))

if max_metadata_read_size <= 0:
raise ValueError(
f"Invalid pickle metadata read limit: {max_metadata_read_size} (must be greater than 0)"
)
max_metadata_read_size = min(max_metadata_read_size, metadata_read_cap)

file_size = self.get_file_size(file_path)
if file_size > max_metadata_read_size:
raise ValueError(
f"Pickle metadata read limit exceeded: {file_size} bytes (max: {max_metadata_read_size})"
)

with open(file_path, "rb") as f:
pickle_data = f.read()
pickle_data = f.read(max_metadata_read_size + 1)

if len(pickle_data) > max_metadata_read_size:
raise ValueError(
f"Pickle metadata read limit exceeded: {len(pickle_data)} bytes (max: {max_metadata_read_size})"
)

# Analyze pickle structure
metadata.update(
Expand Down
46 changes: 46 additions & 0 deletions tests/test_metadata_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,52 @@ def __reduce__(self):
assert "REDUCE" in metadata.get("dangerous_opcodes", [])
assert metadata.get("has_dangerous_opcodes") is True

@pytest.mark.parametrize(
    ("limit", "expected_error"),
    [
        (256, None),
        (64, "read limit exceeded"),
        (0, "must be greater than 0"),
        (-1, "must be greater than 0"),
    ],
)
def test_pickle_metadata_enforces_read_limit(
    self,
    tmp_path: Path,
    limit: int,
    expected_error: str | None,
) -> None:
    """Ensure pickle metadata extraction rejects oversized and invalid read limits."""
    from modelaudit.scanners.pickle_scanner import PickleScanner

    # 128-byte payload: below the generous limit, above the tight one.
    sample = tmp_path / "oversized.pkl"
    sample.write_bytes(b"x" * 128)

    result = PickleScanner(
        {"max_metadata_pickle_read_size": limit}
    ).extract_metadata(str(sample))

    if expected_error is not None:
        # Invalid or exceeded limits surface as an extraction error message.
        assert "extraction_error" in result
        assert expected_error in result["extraction_error"]
    else:
        # Within the limit: extraction succeeds and reports the true size.
        assert "extraction_error" not in result
        assert result["pickle_size"] == 128

def test_pickle_metadata_caps_configured_read_limit_at_10_mib(self, tmp_path: Path) -> None:
    """Oversized caller-supplied limits should still be clamped to 10 MiB."""
    from modelaudit.scanners.pickle_scanner import PickleScanner

    cap = 10 * 1024 * 1024

    # Create a file one byte larger than the 10 MiB cap (sparse: seek then write).
    target = tmp_path / "oversized-cap.pkl"
    with target.open("wb") as fh:
        fh.seek(cap)
        fh.write(b"x")

    # Ask for a 20 MiB limit; the scanner must clamp it back to 10 MiB.
    result = PickleScanner(
        {"max_metadata_pickle_read_size": 20 * 1024 * 1024}
    ).extract_metadata(str(target))

    assert "extraction_error" in result
    assert "max: 10485760" in result["extraction_error"]

Comment thread
coderabbitai[bot] marked this conversation as resolved.
def test_pickle_safe_data_no_dangerous_opcodes(self, tmp_path: Path) -> None:
"""Ensure simple data structures don't trigger dangerous opcode detection."""
from modelaudit.scanners.pickle_scanner import PickleScanner
Expand Down
Loading