Conversation
Signed-off-by: Samk <sampurnapyne1710@gmail.com>
I have made the changes as per the suggestions. If any other changes are required, do let me know. Looking forward to your feedback!
@TG1999 @keshav-space |
keshav-space left a comment:
Thanks @Samk1710, see comments below.
Hi @keshav-space
keshav-space left a comment:
Thanks @Samk1710, see the comments below. Also, please explain why you need to break the request into individual months in backfill_from_year.
Thanks @keshav-space for the review and clarifications. I have implemented all the suggestions.
@Samk1710 here is the response to #1 (comment)
This does not explain the need for monthly iteration. In every EUVD advisory we get
Again, this is a lot of words that answers absolutely nothing. No, this does not make the logic simpler: you are breaking the response into months, but you are still using the API
@keshav-space Regarding datePublished: I went with fetching by dateUpdated because advisories can be updated even after publication. For example, the advisory above was published in 2009 and updated in 2017, so if we fetched by datePublished we would miss the 2017 update. Also, if we keep fetching by dateUpdated but organize by datePublished, we have to keep touching backfill data during daily syncs, and it adds an extra layer of parsing. Also, for fetching we can't do
Hence we can either do a yearly backfill or a monthly backfill, as both would be restartable and need a similar number of API calls. I chose monthly because, as I already mentioned, that is how most data sources I looked at were structured. If you have further questions or want changes to the code implementation, kindly let me know.
Exactly, the API response is already paginated, so there is no need to do double pagination like chunking the response by months, years, or anything else.
Use
If the concern is that the pipeline may fail due to a network call, then backfill and daily modes are not solving that problem: it can fail even during a daily run if we get a huge number of updates. Moreover, there is no built-in mechanism to automatically start from the last failure in the next run. Get rid of the backfill and daily modes. Instead, we should have something simpler, like this:

```python
class EUVDAdvisoryMirror(BasePipeline):
    url = "https://euvdservices.enisa.europa.eu/api/search"

    @classmethod
    def steps(cls):
        return (
            cls.load_checkpoint,
            cls.create_session,
            cls.collect_new_advisory,
            cls.save_checkpoint,
        )
```
This way, on the first run the pipeline will automatically collect all advisories, and subsequent runs will collect only incremental updates. In case of a pipeline failure, the next run will always collect data since the last successful run.
Thanks @keshav-space for the suggestions. I will update the implementation shortly.
Hey @keshav-space |
keshav-space left a comment:
Thanks @Samk1710, see some comments below.
sync_catalog.py
```python
euvd_id = advisory.get("id")
if not euvd_id:
    self.log(f"Advisory missing id, skipping: {advisory}")
    return

date_published = advisory.get("datePublished", "")
dir_path = self.advisory_dir(date_published)

if dir_path is None:
    dir_path = ADVISORY_PATH / "unpublished"
    self.has_unpublished = True

dir_path.mkdir(parents=True, exist_ok=True)

# If an existing unpublished advisory is published now, remove the stale advisory from unpublished directory.
if self.has_unpublished and dir_path != ADVISORY_PATH / "unpublished":
    stale_advisory = ADVISORY_PATH / "unpublished" / f"{euvd_id}.json"
    if stale_advisory.exists():
        stale_advisory.unlink()

# If old advisory is updated, the new data overwrites the existing file.
with (dir_path / f"{euvd_id}.json").open("w", encoding="utf-8") as f:
    json.dump(advisory, f, indent=2)
```
Let's keep this simple, and use python-dateutil to parse the date.
Suggested change (replacing the block above):

```python
destination = "unpublished"
euvd_id = advisory["id"]
if published := advisory.get("datePublished"):
    date = parse(published)
    destination = f"{date.year}/{date.month:02d}"

path = ADVISORIES_PATH / f"{destination}/{euvd_id}.json"
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
    json.dump(advisory, f, indent=2)
```
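For reference, dateutil's `parse` handles ISO-8601 timestamps without an explicit format string, which is what makes the suggestion above so compact. A small illustration (the sample timestamp is made up):

```python
from dateutil.parser import parse

# Parse an ISO-8601 timestamp such as one found in a datePublished field.
date = parse("2017-03-14T09:30:00Z")

# Build the year/month destination directory, zero-padding the month.
destination = f"{date.year}/{date.month:02d}"
print(destination)  # → 2017/03
```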
Co-authored-by: Keshav Priyadarshi <git@keshav.space> Signed-off-by: Samk <sampurnapyne1710@gmail.com>
Hey @keshav-space, here is a pipeline log excerpt:

```
python sync_catalog.py
2026-03-11 19:09:26.673 Pipeline [EUVDAdvisoryMirror] starting
2026-03-11 19:09:26.673 Step [load_checkpoint] starting
2026-03-11 19:09:26.673 Step [load_checkpoint] completed in 0 seconds
2026-03-11 19:09:26.673 Step [create_session] starting
2026-03-11 19:09:26.673 Step [create_session] completed in 0 seconds
2026-03-11 19:09:26.674 Step [collect_new_advisory] starting
2026-03-11 19:09:27.570 Collecting 337107 advisories across 3372 pages
2026-03-11 19:18:46.685 Progress: 10% (338/3372) ETA: 5032 seconds (1.4 hours)
2026-03-11 19:32:45.135 Progress: 20% (675/3372) ETA: 5590 seconds (1.6 hours)
2026-03-11 19:49:03.171 Progress: 30% (1012/3372) ETA: 5543 seconds (1.5 hours)
2026-03-11 20:07:26.941 Progress: 40% (1349/3372) ETA: 5219 seconds (1.4 hours)
2026-03-11 20:29:15.445 Progress: 50% (1686/3372) ETA: 4788 seconds (1.3 hours)
```
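ETA figures like those in the log can be derived from elapsed time and the completed-page count. A minimal sketch; the function name and the assumption of a roughly constant per-page rate are mine, not necessarily how the pipeline computes it:

```python
def eta_seconds(elapsed: float, done: int, total: int) -> int:
    """Estimate remaining seconds, assuming pages take roughly constant time."""
    return round(elapsed * (total - done) / done)


# If the first 338 of 3372 pages took about 560 seconds,
# roughly 5000 seconds remain -- close to the 5032s in the log above.
print(eta_seconds(560, 338, 3372))
```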
EUVD Mirror Pipeline
The script has two modes:
As of now the PR focuses on the code; once it is reviewed and approved, I can add the locally collected backfill data.