Skip to content

Commit bd5c1f2

Browse files
committed
Add PyPa live importer #1953
* Add PyPa live pipeline importer to fetch advisories affecting a single PURL
* Add tests for PyPa live importer

Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>
1 parent b560955 commit bd5c1f2

File tree

3 files changed

+291
-0
lines changed

3 files changed

+291
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
5656
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
5757
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
58+
from vulnerabilities.pipelines.v2_importers import pypa_live_importer as pypa_live_importer_v2
5859
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
5960
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
6061
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
@@ -113,3 +114,9 @@
113114
oss_fuzz.OSSFuzzImporter,
114115
]
115116
)
117+
118+
# Registry of live importer pipelines, built with create_registry() from the
# pipeline classes listed below (currently only the PyPA live importer).
LIVE_IMPORTERS_REGISTRY = create_registry(
119+
[
120+
pypa_live_importer_v2.PyPaLiveImporterPipeline,
121+
]
122+
)
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
11+
from typing import Iterable
12+
13+
import requests
14+
import saneyaml
15+
from packageurl import PackageURL
16+
from univers.versions import PypiVersion
17+
18+
from vulnerabilities.importer import AdvisoryData
19+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
20+
21+
22+
class PyPaLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Pypa Live Importer Pipeline

    Collect advisories from PyPA GitHub repository for a single PURL.

    The pipeline expects a ``purl`` entry in ``self.inputs`` (a PackageURL or
    its string form) of type ``pypi`` with a version. It lists the advisory
    files published for that package, downloads each one and keeps only the
    advisories whose OSV ``affected`` data covers the requested version.
    """

    pipeline_id = "pypa_live_importer_v2"
    supported_types = ["pypi"]
    spdx_license_expression = "CC-BY-4.0"
    license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"

    # Seconds to wait for GitHub before giving up on a request. Without a
    # timeout ``requests`` can block forever on a stalled connection.
    request_timeout = 30

    @classmethod
    def steps(cls):
        """Return the ordered pipeline steps."""
        # NOTE(review): collect_and_store_advisories is not defined in this
        # class; presumably it is inherited from the base pipeline.
        return (
            cls.get_purl_inputs,
            cls.fetch_package_advisories,
            cls.collect_and_store_advisories,
        )

    def get_purl_inputs(self):
        """
        Validate the ``purl`` input and store it as ``self.purl``.

        Raise ValueError when the PURL is empty, is not (convertible to) a
        PackageURL, is not of a supported type, or has no version.
        """
        purl = self.inputs["purl"]
        if not purl:
            raise ValueError("PURL is required for PyPaLiveImporterPipeline")

        if isinstance(purl, str):
            purl = PackageURL.from_string(purl)

        if not isinstance(purl, PackageURL):
            raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

        if purl.type not in self.supported_types:
            raise ValueError(
                f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
            )

        if not purl.version:
            raise ValueError(f"PURL: {purl!s} is expected to have a version")

        self.purl = purl

    @staticmethod
    def _version_in_listed_versions(candidate, raw_version, listed_versions):
        """
        Return True if the version is one of the explicitly listed versions.

        ``candidate`` is the parsed PypiVersion, ``raw_version`` its original
        string. Listed versions that cannot be parsed are compared as plain
        strings only.
        """
        for listed in listed_versions:
            if listed == raw_version:
                return True
            try:
                if PypiVersion(str(listed)) == candidate:
                    return True
            except Exception:
                # Unparsable listed version: the string comparison above is
                # the only meaningful check, so move on.
                continue
        return False

    @staticmethod
    def _version_in_range(candidate, events):
        """
        Return True if ``candidate`` falls in an affected interval of ``events``.

        Events are replayed in version order: an ``introduced`` event at or
        below the candidate marks it affected, a ``fixed`` event at or below
        it (or a ``last_affected`` event strictly below it) clears the mark.
        A range containing an unparsable version (for instance a commit hash
        in a GIT range) is treated as not matching.
        """
        timeline = []
        for event in events:
            for kind in ("introduced", "fixed", "last_affected"):
                if kind not in event:
                    continue
                try:
                    timeline.append((PypiVersion(str(event[kind])), kind))
                except Exception:
                    return False

        affected = False
        # sorted() is stable, so events sharing a version keep their order.
        for event_version, kind in sorted(timeline, key=lambda pair: pair[0]):
            if kind == "introduced":
                if candidate >= event_version:
                    affected = True
            elif kind == "fixed":
                if candidate >= event_version:
                    affected = False
            elif candidate > event_version:
                # last_affected: only versions after it are unaffected.
                affected = False
        return affected

    def _is_version_affected(self, advisory_dict, version):
        """
        Return True if ``version`` is affected according to ``advisory_dict``.

        ``advisory_dict`` is the parsed advisory mapping; its ``affected``
        entries are checked through both their explicit ``versions`` list and
        their ``ranges`` events. Return False when the advisory is not a
        mapping or when ``version`` cannot be parsed as a PyPI version.
        """
        if not isinstance(advisory_dict, dict):
            return False

        try:
            candidate = PypiVersion(version)
        except Exception:
            # An unparsable requested version cannot be compared to anything.
            return False

        for entry in advisory_dict.get("affected") or []:
            if self._version_in_listed_versions(
                candidate, version, entry.get("versions") or []
            ):
                return True

            for version_range in entry.get("ranges") or []:
                if self._version_in_range(candidate, version_range.get("events") or []):
                    return True

        return False

    def fetch_package_advisories(self):
        """
        Download the advisories of ``self.purl`` into ``self.package_advisories``.

        Each stored item is a mapping with the raw advisory ``text``, the
        parsed ``dict`` and the advisory ``url``. Advisories that do not
        affect ``self.purl.version`` are dropped. HTTP error statuses are
        logged and leave the list empty or partial; network errors and
        timeouts raised by ``requests`` propagate to the caller.
        """
        # Define the attribute up front so that every exit path leaves the
        # pipeline in a consistent state.
        self.package_advisories = []

        if self.purl.type not in self.supported_types:
            return

        search_path = f"vulns/{self.purl.name}"
        api_url = f"https://api.github.com/repos/pypa/advisory-database/contents/{search_path}"
        response = requests.get(api_url, timeout=self.request_timeout)

        if response.status_code == 404:
            self.log(f"No advisories found for package {self.purl.name}")
            return

        if response.status_code != 200:
            self.log(f"Failed to fetch advisories: {response.status_code} {response.text}")
            return

        listing = response.json()
        if not isinstance(listing, list):
            # The contents API returns a list only for directories.
            self.log(f"Unexpected advisories listing for package {self.purl.name}")
            return

        for item in listing:
            if item.get("type") != "file" or not item.get("name", "").endswith(".yaml"):
                continue

            self.log("Fetching advisory file: " + item["name"])
            file_response = requests.get(item["download_url"], timeout=self.request_timeout)

            if file_response.status_code != 200:
                self.log(
                    f"Failed to fetch advisory file {item['name']}: {file_response.status_code}"
                )
                continue

            advisory_text = file_response.text
            advisory_dict = saneyaml.load(advisory_text)

            if not isinstance(advisory_dict, dict):
                # Empty or non-mapping YAML documents carry no advisory data.
                self.log(f"Skipping advisory file without a mapping: {item['name']}")
                continue

            if self.purl.version and not self._is_version_affected(
                advisory_dict, self.purl.version
            ):
                continue

            self.package_advisories.append(
                {"text": advisory_text, "dict": advisory_dict, "url": item["html_url"]}
            )

    def advisories_count(self):
        """Return the number of fetched advisories, 0 if none were fetched yet."""
        return len(getattr(self, "package_advisories", []))

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """Yield one parsed advisory per fetched advisory file."""
        # Imported at call time, as in the original code.
        # NOTE(review): the tests patch this name on its module — keep it local.
        from vulnerabilities.importers.osv import parse_advisory_data_v2

        for advisory in getattr(self, "package_advisories", []):
            yield parse_advisory_data_v2(
                raw_data=advisory["dict"],
                supported_ecosystems=self.supported_types,
                advisory_url=advisory["url"],
                advisory_text=advisory["text"],
            )
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from unittest.mock import MagicMock
11+
from unittest.mock import patch
12+
13+
import pytest
14+
import saneyaml
15+
from packageurl import PackageURL
16+
17+
from vulnerabilities.importer import AdvisoryData
18+
19+
20+
@pytest.fixture
def mock_github_api_response():
    """Return a fake GitHub contents listing with two advisory files for package1."""
    raw_base = "https://raw.githubusercontent.com/pypa/advisory-database/main/vulns/package1"
    html_base = "https://github.com/pypa/advisory-database/blob/main/vulns/package1"
    file_names = ("CVE-2022-1234.yaml", "CVE-2022-5678.yaml")

    listing = [
        {
            "type": "file",
            "name": file_name,
            "download_url": f"{raw_base}/{file_name}",
            "html_url": f"{html_base}/{file_name}",
        }
        for file_name in file_names
    ]
    return {"status_code": 200, "json": listing}
39+
40+
41+
@pytest.fixture
def mock_advisory_files():
    """Return fake advisory documents for package1, keyed by their download URL."""

    def build_advisory(advisory_id, summary, introduced, fixed):
        # Minimal advisory with a single ECOSYSTEM range for package1.
        return {
            "id": advisory_id,
            "summary": summary,
            "affected": [
                {
                    "package": {"name": "package1", "ecosystem": "PyPI"},
                    "ranges": [
                        {
                            "type": "ECOSYSTEM",
                            "events": [{"introduced": introduced}, {"fixed": fixed}],
                        }
                    ],
                }
            ],
        }

    raw_base = "https://raw.githubusercontent.com/pypa/advisory-database/main/vulns/package1"
    return {
        f"{raw_base}/CVE-2022-1234.yaml": build_advisory(
            "CVE-2022-1234", "A vulnerability in package1", "1.0.0", "1.2.0"
        ),
        f"{raw_base}/CVE-2022-5678.yaml": build_advisory(
            "CVE-2022-5678", "Another vulnerability in package1", "1.5.0", "1.7.0"
        ),
    }
73+
74+
75+
def test_package_with_version_affected(mock_github_api_response, mock_advisory_files):
    """Only the advisory whose range covers version 1.1.0 is collected."""
    from vulnerabilities.pipelines.v2_importers.pypa_live_importer import PyPaLiveImporterPipeline

    listing_response = MagicMock()
    listing_response.status_code = mock_github_api_response["status_code"]
    listing_response.json.return_value = mock_github_api_response["json"]

    def fake_get(url, *args, **kwargs):
        # The directory listing comes from the API host; anything else is an
        # advisory file download served from the fixture.
        if "api.github.com" in url:
            return listing_response

        file_response = MagicMock()
        file_response.status_code = 200
        file_response.text = saneyaml.dump(mock_advisory_files[url])
        return file_response

    def fake_parse(raw_data, supported_ecosystems, advisory_url, advisory_text):
        return AdvisoryData(
            advisory_id=raw_data["id"],
            summary=raw_data["summary"],
            references_v2=[{"url": advisory_url}],
            affected_packages=[],
            weaknesses=[],
            url=advisory_url,
        )

    target_purl = PackageURL(type="pypi", name="package1", version="1.1.0")

    with patch("requests.get", side_effect=fake_get):
        with patch(
            "vulnerabilities.importers.osv.parse_advisory_data_v2", side_effect=fake_parse
        ):
            pipeline = PyPaLiveImporterPipeline(
                selected_groups=["package_first"], purl=target_purl
            )
            pipeline.get_purl_inputs()
            pipeline.fetch_package_advisories()
            collected = list(pipeline.collect_advisories())

            assert len(collected) == 1
            assert collected[0].advisory_id == "CVE-2022-1234"
117+
118+
119+
def test_nonexistent_package():
    """A 404 from the GitHub listing yields no advisories."""
    from vulnerabilities.pipelines.v2_importers.pypa_live_importer import PyPaLiveImporterPipeline

    not_found_response = MagicMock()
    not_found_response.status_code = 404

    missing_purl = PackageURL(type="pypi", name="nonexistent_package", version="1.0.0")

    with patch("requests.get", return_value=not_found_response):
        pipeline = PyPaLiveImporterPipeline(selected_groups=["package_first"], purl=missing_purl)
        pipeline.get_purl_inputs()
        pipeline.fetch_package_advisories()
        collected = list(pipeline.collect_advisories())

        assert len(collected) == 0

0 commit comments

Comments
 (0)