Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
**/__pycache__
.cursor/
8 changes: 6 additions & 2 deletions src/trustshell/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,11 @@ def make_request_with_retry(
return {"items": all_items, "total": total_available}


def render_tree_to_string(root: Node) -> str:
"""Return tree as string (for testing and composition)."""
return "\n".join(f"{pre}{node.name}" for pre, _, node in RenderTree(root))


def render_tree(root: Node) -> None:
"""Pretty print a tree using name only"""
for pre, _, node in RenderTree(root):
console.print("%s%s" % (pre, node.name))
console.print(render_tree_to_string(root))
64 changes: 64 additions & 0 deletions src/trustshell/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Data models for trust-products search results."""

from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Affect:
"""Single affect entry for OSIDB flaw affects."""

ps_update_stream: str
purl: str # shipped_component PURL


@dataclass
class ProductResultRow:
"""Single result: CPE + product info + matched and shipped components."""

cpe: str
ps_update_stream: str
ps_module: Optional[str]
matched_component: str # PURL that matched (important for wildcard search)
shipped_component: (
str # PURL for affects: image-index/arch-specific OCI, or SRPM/binary RPM
)
sbom_ids: list[str] = field(
default_factory=list
) # SBOM IDs from path (root to leaf)


@dataclass
class ProductSearchResult:
"""Flat result model. No tree structure—just product info and components.

results and affects are sorted by (ps_update_stream, purl/shipped_component)
at creation time; consumers can iterate without re-sorting.
"""

results: list[ProductResultRow]
affects: list[Affect]
searched_purl: str

def render(
self,
output: str,
include_modules: bool = True,
cpes: bool = False,
show_sbom_ids: bool = False,
) -> None:
"""Render result to stdout. output is 'text' or 'json'."""
from trustshell import console
from trustshell.renderers import render_json_format, render_tree_format

if output == "text":
console.print(
render_tree_format(
self,
show_module=include_modules,
cpes=cpes,
show_sbom_ids=show_sbom_ids,
)
)
else:
console.print_json(render_json_format(self, include_module=include_modules))
67 changes: 41 additions & 26 deletions src/trustshell/osidb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import subprocess
import sys
import tempfile
from typing import Any
from typing import Any, Union

import click

from trustshell.models import Affect
from requests import HTTPError
from trustshell import console
import osidb_bindings
Expand Down Expand Up @@ -50,11 +52,11 @@ def parse_stream_purl_tuples(tuples_list: list[str]) -> set[tuple[str, str]]:

@staticmethod
def edit_tuples_in_editor(
current_tuples: set[tuple[str, str]],
) -> set[tuple[str, str]]:
current_tuples: Union[list[tuple[str, str]], set[tuple[str, str]]],
) -> list[tuple[str, str]]:
"""
Opens the default text editor for the user to modify the ps_update_stream/purl tuples.
Returns the modified set of tuples.
Returns the modified list of tuples, sorted by (ps_update_stream, purl).
"""
editor = os.environ.get("EDITOR", "vi")
original_content = "\n".join([f"{m},{p}" for m, p in current_tuples])
Expand Down Expand Up @@ -93,14 +95,17 @@ def edit_tuples_in_editor(
modified_lines = [
line.strip() for line in modified_content.splitlines() if line.strip()
]
return OSIDB.parse_stream_purl_tuples(modified_lines)
parsed = OSIDB.parse_stream_purl_tuples(modified_lines)
return sorted(parsed, key=lambda x: (x[0], x[1]))

def _affects_to_tuples(self, affects: list[Affect]) -> list[tuple[str, str]]:
"""Convert list[Affect] to list of (ps_update_stream, purl) tuples for display."""
return [(a.ps_update_stream, a.purl) for a in affects]

def add_affects(self, flaw: Flaw, affects_to_add: set[tuple[str, str]]) -> None:
def add_affects(self, flaw: Flaw, affects_to_add: list[Affect]) -> None:
console.print("Adding affects...")
affects_data: list[dict[str, Any]] = []
for affect in affects_to_add:
ps_update_stream, purl = affect

for ps_update_stream, purl in self._affects_to_tuples(affects_to_add):
osidb_affect = {
"flaw": flaw.uuid,
"embargoed": flaw.embargoed,
Expand All @@ -122,10 +127,11 @@ def add_affects(self, flaw: Flaw, affects_to_add: set[tuple[str, str]]) -> None:
def edit_flaw_affects(
self,
flaw_id: str,
ps_stream_purls: set[tuple[str, str]],
ps_stream_purls: list[Affect],
replace_mode: bool = False,
) -> None:
if not ps_stream_purls:
ps_stream_purls_list = self._affects_to_tuples(ps_stream_purls)
if not ps_stream_purls_list:
console.print("No new affects to add", style="warning")
return

Expand All @@ -147,31 +153,32 @@ def edit_flaw_affects(
if affects_by_state:
for state, affects_list in affects_by_state.items():
console.print(f"State: {state}")
for affect_str in affects_list:
for affect_str in sorted(affects_list, key=lambda x: (x[0], x[1])):
console.print(affect_str)
else:
console.print(" No affects found for this flaw.")
console.print("-----------------------------\n")

console.print("New affects:")
for ps_stream_purl in ps_stream_purls:
for ps_stream_purl in ps_stream_purls_list:
console.print(ps_stream_purl)

# Optionally edit tuples in editor
if click.confirm("Do you want to edit these affects?"):
console.print("Entering editor mode to modify input tuples...")
ps_stream_purls = self.edit_tuples_in_editor(ps_stream_purls)
ps_stream_purls_list = self.edit_tuples_in_editor(ps_stream_purls_list)
console.print("\n--- Modified Tuples from Editor ---")
if ps_stream_purls:
for m, p in ps_stream_purls:
if ps_stream_purls_list:
for m, p in ps_stream_purls_list:
console.print(f" - {m},{p}")
else:
console.print(" (No tuples provided after editing)")
console.print("-----------------------------------\n")

ps_stream_purls_set = set(ps_stream_purls_list)
if not replace_mode:
affects_to_add = (
ps_stream_purls - affects_by_state["NEW"]
ps_stream_purls_set - affects_by_state["NEW"]
) # Only truly new ones
if not affects_to_add:
console.print(
Expand All @@ -180,31 +187,36 @@ def edit_flaw_affects(
return

console.print("\n--- Affects to be ADDED ---")
for affect in affects_to_add:
console.print(f" - {affect[0]},{affect[1]}")
affects_to_add_list = [
Affect(ps_update_stream=ps, purl=p) for ps, p in affects_to_add
]
for affect in affects_to_add_list:
console.print(f" - {affect.ps_update_stream},{affect.purl}")
console.print("---------------------------\n")

click.confirm("Confirm adding the above affects?", abort=True)
self.add_affects(flaw, affects_to_add)
self.add_affects(flaw, affects_to_add_list)

else:
if not affects_by_state["NEW"] and not ps_stream_purls:
if not affects_by_state["NEW"] and not ps_stream_purls_list:
console.print(
"No existing 'NEW' affects to replace and no new affects provided. Nothing to do."
)
return

console.print("\n--- Existing 'NEW' Affects to be REPLACED ---")
if affects_by_state["NEW"]:
for affect in affects_by_state["NEW"]:
for affect in sorted(
affects_by_state["NEW"], key=lambda x: (x[0], x[1])
):
console.print(affect)
else:
console.print(" (No existing affects with state 'NEW')")
console.print("--------------------------------------------\n")

console.print("\n--- New Affects that will REPLACE the above ---")
if ps_stream_purls:
for affect in ps_stream_purls:
if ps_stream_purls_list:
for affect in ps_stream_purls_list:
console.print(affect)
else:
console.print(" (No new affects provided)")
Expand All @@ -228,7 +240,7 @@ def edit_flaw_affects(
existing_uuid, existing_affectedness = existing_value
# Don't delete and re-add existing new affects
if (
existing_key not in ps_stream_purls
existing_key not in ps_stream_purls_set
and existing_affectedness == "NEW"
):
try:
Expand All @@ -242,4 +254,7 @@ def edit_flaw_affects(
exit(1)

# Add any new affects not already on the flaw in NEW state
self.add_affects(flaw, ps_stream_purls)
ps_stream_purls_as_affects = [
Affect(ps_update_stream=ps, purl=p) for ps, p in ps_stream_purls_list
]
self.add_affects(flaw, ps_stream_purls_as_affects)
Loading