Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
[project]
name = "trustshell"
version = "0.2.4"
version = "0.2.5"
description = "Command Line tool for Trustify"
readme = "README.md"
authors = [
{ email = "jason@jasonshepherd.net" }
]
requires-python = ">=3.12"
dependencies = [
"aiohttp>=3.13.3",
"anytree>=2.12.1",
"click>=8.2.1",
"httpx>=0.28.1",
Expand All @@ -20,6 +21,7 @@ dependencies = [
"rich>=14.0.0",
"types-pyyaml>=6.0.12.20250809",
"univers>=30.12.1",
"urllib3>=2.6.3",
]

[project.scripts]
Expand Down
26 changes: 25 additions & 1 deletion src/trustshell/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
import sys
from urllib.parse import urlparse, urlunparse, quote, parse_qs
from urllib.parse import urlparse, urlunparse, quote, parse_qs, urlencode
from typing import Optional, Any

import httpx
Expand Down Expand Up @@ -333,6 +333,12 @@ def make_request_with_retry(
client: httpx.Client, query_params: dict[str, Any], headers: dict[str, str]
) -> httpx.Response:
"""Make HTTP request with 401 retry logic"""
if logger.isEnabledFor(logging.DEBUG):
full_url = (
f"{endpoint}?{urlencode(query_params)}" if query_params else endpoint
)
logger.debug(f"Request URL: {full_url}")

try:
response = client.get(
endpoint, params=query_params, headers=headers, timeout=2400
Expand Down Expand Up @@ -366,17 +372,35 @@ def make_request_with_retry(
return {"items": [], "total": 0}

all_items = first_result.get("items", [])
total_pages = (total_available + limit - 1) // limit

if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Paginated request: {total_available} total items, "
f"{total_pages} page(s), page 1/{total_pages} complete"
)

# Fetch remaining pages sequentially
offset = limit
page_num = 2
while offset < total_available:
page_params = {**base_params, "limit": limit, "offset": offset}
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Fetching page {page_num}/{total_pages} (offset {offset})..."
)
try:
response = make_request_with_retry(client, page_params, auth_header)
result = response.json()
page_items = result.get("items", [])
all_items.extend(page_items)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Page {page_num}/{total_pages} complete "
f"({len(all_items)}/{total_available} items)"
)
offset += limit
page_num += 1
except Exception as e:
logger.error(f"Error fetching page at offset {offset}: {e}")
break
Expand Down
14 changes: 13 additions & 1 deletion src/trustshell/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ def prime_cache(check: bool, debug: bool) -> None:
default=False,
help="Show sbom_ids in text output (tree format).",
)
@click.option(
"--strict",
"-s",
is_flag=True,
default=False,
help="Use strict purl matching (append '@' to the searched purl).",
)
@click.option("--debug", "-d", is_flag=True, help="Debug log level.")
@click.argument(
"purl",
Expand All @@ -160,6 +167,7 @@ def search(
output: str,
show_module: bool,
show_sbom_ids: bool,
strict: bool,
debug: bool,
latest: bool,
cpes: bool,
Expand All @@ -186,6 +194,7 @@ def search(
latest,
show_versions=versions,
include_rpm_containers=include_rpm_containers,
strict=strict,
)
if not ancestor_trees or len(ancestor_trees) == 0:
console.print("No results")
Expand Down Expand Up @@ -378,11 +387,13 @@ def _get_roots(
latest: bool = True,
show_versions: bool = False,
include_rpm_containers: bool = False,
strict: bool = False,
) -> list[ComponentNode]:
"""Look up base_purl ancestors in Trustify

Uses purl~ query which Trustify automatically translates into optimized
field-specific queries (purl:ty, purl:name, purl:namespace, etc.)
When strict=True, appends '@' to the searched purl for stricter matching.
"""

auth_header = {}
Expand All @@ -395,9 +406,10 @@ def _get_roots(
else:
endpoint = ANALYSIS_ENDPOINT

search_purl = f"{base_purl}@" if strict else base_purl
# purl~ is automatically translated by Trustify to field-specific queries
# e.g., purl~pkg:npm/foo becomes purl:ty=npm&purl:name=foo
base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{base_purl}"}
base_params = {"ancestors": ANCESTOR_COUNT, "q": f"purl~{search_purl}"}
ancestors = paginated_trustify_query(
endpoint, base_params, auth_header, component_name=base_purl
)
Expand Down
Loading