Skip to content

Commit bdd4c5b

Browse files
committed
feat: cdx vex
1 parent ca00903 commit bdd4c5b

File tree

7 files changed

+546
-1
lines changed

7 files changed

+546
-1
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Django 5.2.4 on 2025-07-30 12:57
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vex", "0007_alter_csaf_tracking_current_release_date_and_more"),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name="vex_document",
15+
name="type",
16+
field=models.CharField(
17+
choices=[("CSAF", "CSAF"), ("OpenVEX", "OpenVEX"), ("CycloneDX", "CycloneDX")], max_length=16
18+
),
19+
),
20+
]
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from rest_framework.exceptions import ValidationError
5+
6+
from application.core.api.serializers_helpers import validate_purl
7+
from application.vex.models import VEX_Document, VEX_Statement
8+
from application.vex.services.vex_engine import apply_vex_statements_after_import
9+
from application.vex.types import (
10+
CycloneDX_Analysis_State,
11+
VEX_Document_Type,
12+
VEX_Status,
13+
)
14+
15+
16+
@dataclass
17+
class CycloneDX_Analysis:
18+
state: str = ""
19+
justification: str = ""
20+
response: list[str] = None
21+
detail: str = ""
22+
first_issued: str = ""
23+
last_updated: str = ""
24+
25+
def __post_init__(self):
26+
if self.response is None:
27+
self.response = []
28+
29+
30+
def parse_cyclonedx_data(data: dict) -> None:
31+
cyclonedx_document = _create_cyclonedx_document(data)
32+
33+
product_purls, vex_statements = _process_vex_statements(data, cyclonedx_document)
34+
35+
apply_vex_statements_after_import(product_purls, vex_statements)
36+
37+
38+
def _create_cyclonedx_document(data: dict) -> VEX_Document:
39+
document_id = data.get("serialNumber")
40+
if not document_id:
41+
raise ValidationError("serialNumber is missing")
42+
43+
version_value = data.get("version")
44+
if version_value is None:
45+
raise ValidationError("version is missing")
46+
version = str(version_value)
47+
48+
metadata = data.get("metadata", {})
49+
50+
timestamp = metadata.get("timestamp")
51+
if not timestamp:
52+
raise ValidationError("metadata/timestamp is missing")
53+
54+
author = None
55+
# Prefer authors list if available
56+
authors = metadata.get("authors")
57+
if authors and isinstance(authors, list) and len(authors) > 0:
58+
# Find the first author with a name set
59+
author = next((item.get("name") for item in authors
60+
if isinstance(item, dict) and item.get("name")), None)
61+
62+
# Fall back to manufacturer or supplier if no authors
63+
if not author:
64+
author = metadata.get("manufacturer", {}).get("name") or metadata.get("supplier", {}).get("name")
65+
66+
if not author:
67+
raise ValidationError("author is missing")
68+
69+
try:
70+
cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
71+
cyclonedx_document.delete()
72+
except VEX_Document.DoesNotExist:
73+
pass
74+
75+
cyclonedx_document = VEX_Document.objects.create(
76+
type=VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX,
77+
document_id=document_id,
78+
version=version,
79+
initial_release_date=timestamp,
80+
current_release_date=timestamp,
81+
author=author,
82+
role="",
83+
)
84+
85+
return cyclonedx_document
86+
87+
88+
def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tuple[set[str], set[VEX_Statement]]:
89+
vulnerabilities = data.get("vulnerabilities", [])
90+
if not vulnerabilities:
91+
raise ValidationError("CycloneDX document doesn't contain any vulnerabilities")
92+
if not isinstance(vulnerabilities, list):
93+
raise ValidationError("vulnerabilities is not a list")
94+
95+
components_map = _build_components_map(data)
96+
97+
product_purl = data.get("metadata", {}).get("component", {}).get("purl", "")
98+
if not product_purl:
99+
raise ValidationError("metadata/component/purl is missing")
100+
validate_purl(product_purl)
101+
102+
product_purls: set[str] = set()
103+
vex_statements: set[VEX_Statement] = set()
104+
105+
vulnerability_counter = 0
106+
for vulnerability in vulnerabilities:
107+
if not isinstance(vulnerability, dict):
108+
raise ValidationError(f"vulnerability[{vulnerability_counter}] is not a dictionary")
109+
110+
vulnerability_id = vulnerability.get("id")
111+
if not vulnerability_id:
112+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/id is missing")
113+
114+
analysis = vulnerability.get("analysis", {})
115+
if not analysis:
116+
# Skip vulnerabilities without analysis
117+
vulnerability_counter += 1
118+
continue
119+
120+
cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
121+
122+
vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
123+
if not vex_status:
124+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}")
125+
126+
description = vulnerability.get("description", "")
127+
detail = vulnerability.get("detail", "")
128+
if detail:
129+
description += f"\n\n{detail}"
130+
131+
remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
132+
133+
affects = vulnerability.get("affects", [])
134+
if not affects:
135+
# General statement for the product
136+
_create_vex_statement(
137+
cyclonedx_document,
138+
vulnerability_id,
139+
description,
140+
vex_status,
141+
cyclonedx_analysis.justification,
142+
cyclonedx_analysis.detail,
143+
remediation,
144+
product_purl,
145+
"",
146+
product_purls,
147+
vex_statements,
148+
)
149+
elif not isinstance(affects, list):
150+
raise ValidationError(f"affects[{vulnerability_counter}] is not a list")
151+
else:
152+
_process_affected_components(
153+
cyclonedx_document=cyclonedx_document,
154+
product_purls=product_purls,
155+
vex_statements=vex_statements,
156+
vulnerability_counter=vulnerability_counter,
157+
vulnerability_id=vulnerability_id,
158+
description=description,
159+
vex_status=vex_status,
160+
justification=cyclonedx_analysis.justification,
161+
impact=cyclonedx_analysis.detail,
162+
remediation=remediation,
163+
affects=affects,
164+
components_map=components_map,
165+
product_purl=product_purl,
166+
)
167+
168+
vulnerability_counter += 1
169+
170+
return product_purls, vex_statements
171+
172+
173+
def _build_components_map(data: dict) -> dict[str, dict]:
174+
components_map = {}
175+
176+
# Add root component from metadata
177+
metadata_component = data.get("metadata", {}).get("component")
178+
if metadata_component and metadata_component.get("bom-ref"):
179+
components_map[metadata_component["bom-ref"]] = metadata_component
180+
181+
# Add all components
182+
for component in data.get("components", []):
183+
if component.get("bom-ref"):
184+
components_map[component["bom-ref"]] = component
185+
186+
return components_map
187+
188+
189+
def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Analysis:
190+
state = analysis.get("state", "")
191+
if not state:
192+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is missing")
193+
194+
justification = analysis.get("justification", "")
195+
response = analysis.get("response", [])
196+
if not isinstance(response, list):
197+
response = []
198+
199+
detail = analysis.get("detail", "")
200+
first_issued = analysis.get("firstIssued", "")
201+
last_updated = analysis.get("lastUpdated", "")
202+
203+
return CycloneDX_Analysis(
204+
state=state,
205+
justification=justification,
206+
response=response,
207+
detail=detail,
208+
first_issued=first_issued,
209+
last_updated=last_updated,
210+
)
211+
212+
213+
def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
214+
mapping = {
215+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
216+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
217+
CycloneDX_Analysis_State.CYCLONEDX_STATE_EXPLOITABLE: VEX_Status.VEX_STATUS_AFFECTED,
218+
CycloneDX_Analysis_State.CYCLONEDX_STATE_IN_TRIAGE: VEX_Status.VEX_STATUS_UNDER_INVESTIGATION,
219+
CycloneDX_Analysis_State.CYCLONEDX_STATE_FALSE_POSITIVE: VEX_Status.VEX_STATUS_NOT_AFFECTED,
220+
CycloneDX_Analysis_State.CYCLONEDX_STATE_NOT_AFFECTED: VEX_Status.VEX_STATUS_NOT_AFFECTED,
221+
}
222+
return mapping.get(state)
223+
224+
225+
def _build_remediation_text(response: list[str], recommendation: str) -> str:
226+
remediation_parts = []
227+
228+
if response:
229+
response_text = ", ".join(response)
230+
remediation_parts.append(f"Response: {response_text}")
231+
232+
if recommendation:
233+
remediation_parts.append(f"Recommendation: {recommendation}")
234+
235+
return "; ".join(remediation_parts)
236+
237+
238+
def _process_affected_components(
239+
*,
240+
cyclonedx_document: VEX_Document,
241+
product_purls: set,
242+
vex_statements: set,
243+
vulnerability_counter: int,
244+
vulnerability_id: str,
245+
description: str,
246+
vex_status: str,
247+
justification: str,
248+
impact: str,
249+
remediation: str,
250+
affects: list,
251+
components_map: dict,
252+
product_purl: str,
253+
) -> None:
254+
affected_counter = 0
255+
for affected in affects:
256+
if not isinstance(affected, dict):
257+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}] is not a dictionary")
258+
259+
ref = affected.get("ref")
260+
if not ref:
261+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
262+
263+
component = components_map.get(ref)
264+
if not component:
265+
raise ValidationError(
266+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' not found in components"
267+
)
268+
269+
component_purl = component.get("purl", "")
270+
if not component_purl:
271+
raise ValidationError(
272+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' component is missing purl"
273+
)
274+
validate_purl(component_purl)
275+
276+
_create_vex_statement(
277+
cyclonedx_document,
278+
vulnerability_id,
279+
description,
280+
vex_status,
281+
justification,
282+
impact,
283+
remediation,
284+
product_purl,
285+
component_purl,
286+
product_purls,
287+
vex_statements,
288+
)
289+
290+
affected_counter += 1
291+
292+
293+
def _create_vex_statement(
294+
cyclonedx_document: VEX_Document,
295+
vulnerability_id: str,
296+
description: str,
297+
vex_status: str,
298+
justification: str,
299+
impact: str,
300+
remediation: str,
301+
product_purl: str,
302+
component_purl: str,
303+
product_purls: set,
304+
vex_statements: set,
305+
) -> None:
306+
vex_statement = VEX_Statement(
307+
document=cyclonedx_document,
308+
vulnerability_id=vulnerability_id,
309+
description=description,
310+
status=vex_status,
311+
justification=justification,
312+
impact=impact,
313+
remediation=remediation,
314+
product_purl=product_purl,
315+
component_purl=component_purl,
316+
)
317+
vex_statement.save()
318+
vex_statements.add(vex_statement)
319+
product_purls.add(product_purl)

backend/application/vex/services/openvex_parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def _process_vex_statements(data: dict, openvex_document: VEX_Document) -> tuple
8282
openvex_statement.vulnerability_id = statement.get("vulnerability", {}).get("name")
8383
if not openvex_statement.vulnerability_id:
8484
raise ValidationError(f"vulnerability[{statement_counter}]/name is missing")
85-
openvex_statement.description = statement.get("vulnerability", {}).get("description")
85+
openvex_statement.description = statement.get("vulnerability", {}).get("description", "")
8686
openvex_statement.status = statement.get("status", "")
8787
if not openvex_statement.status:
8888
raise ValidationError(f"status[{statement_counter}] is missing")

backend/application/vex/services/vex_import.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from rest_framework.exceptions import ValidationError
66

77
from application.vex.services.csaf_parser import parse_csaf_data
8+
from application.vex.services.cyclonedx_parser import parse_cyclonedx_data
89
from application.vex.services.openvex_parser import parse_openvex_data
910
from application.vex.types import VEX_Document_Type
1011

@@ -22,6 +23,8 @@ def import_vex(vex_file: File) -> None:
2223
parse_openvex_data(data)
2324
elif vex_type == VEX_Document_Type.VEX_DOCUMENT_TYPE_CSAF:
2425
parse_csaf_data(data)
26+
elif vex_type == VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX:
27+
parse_cyclonedx_data(data)
2528

2629

2730
def _get_json_data(vex_file: File) -> Optional[dict]:
@@ -40,4 +43,7 @@ def _get_vex_type(data: dict) -> Optional[str]:
4043
if data.get("document", {}).get("category") == "csaf_vex" and data.get("document", {}).get("csaf_version") == "2.0":
4144
return VEX_Document_Type.VEX_DOCUMENT_TYPE_CSAF
4245

46+
if data.get("bomFormat") == "CycloneDX":
47+
return VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX
48+
4349
return None

0 commit comments

Comments
 (0)