From 029b464432d41eea417d70df96d1b67f6bcd3c27 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Mon, 30 Mar 2026 15:46:16 -0300 Subject: [PATCH 1/4] Add support for serving superseded publications for a retention period Introduces a DistributedPublication model to track which publications are associated with each distribution. When a new publication becomes the latest for a repository, older ones are retained and served by the content handler for a configurable grace period, allowing clients to continue fetching from superseded publications without interruption. Handles publications with pass-through=True|False. The behavior is summarized: Given: - CA=ContentArtifact, - PA=PublishedArtifact - PU=Publication - RV=RepositoryVersion - DP=DistributedPublication - DT=Distribution Then CA is retrieved with: - pass-through: CA<-PA<-PU<-DP->DT - no-pass-through: CA<-RV<-PU<-DP->DT Closes: #7514 Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/scripts/before_install.sh | 2 +- CHANGES/7514.feature | 1 + .../api/test_distributed_publication.py | 138 +++++++++++ .../migrations/0149_distributedpublication.py | 31 +++ pulpcore/app/models/__init__.py | 2 + pulpcore/app/models/publication.py | 140 +++++++++++- pulpcore/app/settings.py | 4 + pulpcore/content/handler.py | 11 + .../api/using_plugin/test_content_cache.py | 4 +- pulpcore/tests/functional/utils.py | 9 + .../unit/models/test_publication_retention.py | 215 ++++++++++++++++++ template_config.yml | 1 + 12 files changed, 555 insertions(+), 3 deletions(-) create mode 100644 CHANGES/7514.feature create mode 100644 pulp_file/tests/functional/api/test_distributed_publication.py create mode 100644 pulpcore/app/migrations/0149_distributedpublication.py create mode 100644 pulpcore/tests/unit/models/test_publication_retention.py diff --git a/.github/workflows/scripts/before_install.sh b/.github/workflows/scripts/before_install.sh index 95a1ba503cb..60e2c8a9385 100755 --- a/.github/workflows/scripts/before_install.sh +++ 
b/.github/workflows/scripts/before_install.sh @@ -50,7 +50,7 @@ legacy_component_name: "pulpcore" component_name: "core" component_version: "${COMPONENT_VERSION}" pulp_env: {"PULP_CA_BUNDLE": "/etc/pulp/certs/pulp_webserver.crt"} -pulp_settings: {"allowed_export_paths": ["/tmp"], "allowed_import_paths": ["/tmp"], "api_root": "/pulp/", "content_path_prefix": "/somewhere/else/", "csrf_trusted_origins": ["https://pulp:443"], "orphan_protection_time": 0, "task_diagnostics": ["memory"], "task_protection_time": 10, "tmpfile_protection_time": 10, "upload_protection_time": 10} +pulp_settings: {"allowed_export_paths": ["/tmp"], "allowed_import_paths": ["/tmp"], "api_root": "/pulp/", "content_path_prefix": "/somewhere/else/", "csrf_trusted_origins": ["https://pulp:443"], "distributed_publication_retention_period": 3, "orphan_protection_time": 0, "task_diagnostics": ["memory"], "task_protection_time": 10, "tmpfile_protection_time": 10, "upload_protection_time": 10} pulp_scheme: "https" image: name: "pulp" diff --git a/CHANGES/7514.feature b/CHANGES/7514.feature new file mode 100644 index 00000000000..c0c939e9edb --- /dev/null +++ b/CHANGES/7514.feature @@ -0,0 +1 @@ +Added support for serving superseded publications for a configurable retention period. 
diff --git a/pulp_file/tests/functional/api/test_distributed_publication.py b/pulp_file/tests/functional/api/test_distributed_publication.py new file mode 100644 index 00000000000..386d3af2691 --- /dev/null +++ b/pulp_file/tests/functional/api/test_distributed_publication.py @@ -0,0 +1,138 @@ +"""Tests for the distributed publication grace-period feature.""" + +from dataclasses import dataclass +from urllib.parse import urljoin + +import pytest +import requests +from pulpcore.client.pulp_file import FileFilePublication, FileFileDistribution +from pulpcore.client.pulp_file import RepositorySyncURL +from pulpcore.tests.functional.utils import wait_distributed_publication_retention_period + + +@dataclass +class DistributionPublicationContext: + pub_with_file: FileFilePublication + pub_without_file: FileFilePublication + + def create_distribution(self, publication: FileFilePublication) -> FileFileDistribution: + raise NotImplementedError + + def update_distribution(self, dist: FileFileDistribution, publication: FileFilePublication): + raise NotImplementedError + + def get_file_url(self, distribution: FileFileDistribution) -> str: + raise NotImplementedError + + def clear_dist_cache(self, dist: FileFileDistribution) -> None: + raise NotImplementedError + + +class TestDistributionPublicationRetention: + @pytest.mark.parallel + def test_old_content_is_served_within_retention_period( + self, + ctx: DistributionPublicationContext, + ): + """Old content is still served immediately after switching to a new publication.""" + dist = ctx.create_distribution(publication=ctx.pub_with_file) + file_url = ctx.get_file_url(dist) + assert requests.get(file_url).status_code == 200 + + ctx.update_distribution(dist, publication=ctx.pub_without_file) + assert requests.get(file_url).status_code == 200 + + @pytest.mark.parallel + def test_old_content_expires_after_retention_period( + self, + ctx: DistributionPublicationContext, + ): + """Old content becomes unavailable once the retention 
period expires.""" + dist = ctx.create_distribution(publication=ctx.pub_with_file) + file_url = ctx.get_file_url(dist) + + ctx.update_distribution(dist, publication=ctx.pub_without_file) + wait_distributed_publication_retention_period() + ctx.clear_dist_cache(dist) # if redis is enabled it interferes with the assertion + assert requests.get(file_url).status_code == 404 + + @pytest.fixture + def ctx( + self, + file_bindings, + file_repository_factory, + file_remote_ssl_factory, + basic_manifest_path, + file_publication_factory, + file_distribution_factory, + distribution_base_url, + monitor_task, + ) -> DistributionPublicationContext: + """Set up two publications: one with a file, one without.""" + repo = file_repository_factory() + remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="immediate") + + # Sync to get files into the repo + monitor_task( + file_bindings.RepositoriesFileApi.sync( + repo.pulp_href, RepositorySyncURL(remote=remote.pulp_href) + ).task + ) + repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + + # Publish version 1 (has the file) + pub_with_file = file_publication_factory(repository=repo.pulp_href) + + # Pick a file that exists in pub_with_file + content = file_bindings.ContentFilesApi.list( + repository_version=repo.latest_version_href + ).results[0] + file_relative_path = content.relative_path + + # Remove the file from the repo + monitor_task( + file_bindings.RepositoriesFileApi.modify( + repo.pulp_href, {"remove_content_units": [content.pulp_href]} + ).task + ) + repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + + # Publish version 2 (does not have the file) + pub_without_file = file_publication_factory(repository=repo.pulp_href) + + class _DistributionPublicationContext(DistributionPublicationContext): + def create_distribution(self, publication: FileFilePublication) -> FileFileDistribution: + return file_distribution_factory(publication=publication.pulp_href) + + def update_distribution( + 
self, dist: FileFileDistribution, publication: FileFilePublication + ) -> None: + monitor_task( + file_bindings.DistributionsFileApi.partial_update( + dist.pulp_href, {"publication": publication.pulp_href} + ).task + ) + + def get_file_url(self, distribution: FileFileDistribution) -> str: + return urljoin(distribution_base_url(distribution.base_url), file_relative_path) + + def clear_dist_cache(self, dist: FileFileDistribution) -> None: + original_base_path = dist.base_path + tmp_base_path = original_base_path + "-tmp" + monitor_task( + file_bindings.DistributionsFileApi.partial_update( + dist.pulp_href, {"base_path": tmp_base_path} + ).task + ) + monitor_task( + file_bindings.DistributionsFileApi.partial_update( + dist.pulp_href, {"base_path": original_base_path} + ).task + ) + restored = file_bindings.DistributionsFileApi.read(dist.pulp_href) + assert restored.base_path == original_base_path + + return _DistributionPublicationContext( + pub_with_file=pub_with_file, + pub_without_file=pub_without_file, + ) diff --git a/pulpcore/app/migrations/0149_distributedpublication.py b/pulpcore/app/migrations/0149_distributedpublication.py new file mode 100644 index 00000000000..4022757d6b8 --- /dev/null +++ b/pulpcore/app/migrations/0149_distributedpublication.py @@ -0,0 +1,31 @@ +# Generated by Django 5.2.10 on 2026-03-30 18:48 + +import django.db.models.deletion +import django_lifecycle.mixins +import pulpcore.app.models.base +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0148_artifact_artifact_domain_size_index'), + ] + + operations = [ + migrations.CreateModel( + name='DistributedPublication', + fields=[ + ('pulp_id', models.UUIDField(default=pulpcore.app.models.base.pulp_uuid, editable=False, primary_key=True, serialize=False)), + ('pulp_created', models.DateTimeField(auto_now_add=True)), + ('pulp_last_updated', models.DateTimeField(auto_now=True, null=True)), + ('expires_at', 
models.DateTimeField(null=True)), + ('distribution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='core_distributedpublications', to='core.distribution')), + ('publication', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='core_distributedpublications', to='core.publication')), + ], + options={ + 'abstract': False, + }, + bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model), + ), + ] diff --git a/pulpcore/app/models/__init__.py b/pulpcore/app/models/__init__.py index 3ea9942d948..6f085af768b 100644 --- a/pulpcore/app/models/__init__.py +++ b/pulpcore/app/models/__init__.py @@ -51,6 +51,7 @@ from .publication import ( ContentGuard, Distribution, + DistributedPublication, Publication, PublishedArtifact, PublishedMetadata, @@ -138,6 +139,7 @@ "PulpImporter", "ContentGuard", "Distribution", + "DistributedPublication", "Publication", "PublishedArtifact", "PublishedMetadata", diff --git a/pulpcore/app/models/publication.py b/pulpcore/app/models/publication.py index 64635264042..5154ea2322b 100644 --- a/pulpcore/app/models/publication.py +++ b/pulpcore/app/models/publication.py @@ -7,6 +7,7 @@ from base64 import b64decode from binascii import Error as Base64DecodeError +from contextlib import suppress from datetime import timedelta from gettext import gettext as _ from url_normalize import url_normalize @@ -18,7 +19,7 @@ from django.contrib.postgres.fields import HStoreField from django.db import DatabaseError, IntegrityError, models, transaction from django.utils import timezone -from django_lifecycle import hook, AFTER_UPDATE, BEFORE_DELETE +from django_lifecycle import hook, AFTER_CREATE, AFTER_UPDATE, BEFORE_DELETE from .base import MasterModel, BaseModel from .content import Artifact, Content, ContentArtifact @@ -34,6 +35,19 @@ _logger = logging.getLogger(__name__) +def _latest_publication_for_repository(repository): + """Return the latest complete Publication for a repository, or None.""" + with 
suppress(Publication.DoesNotExist): + return ( + Publication.objects.filter( + repository_version__in=repository.versions.all(), complete=True + ) + .select_related("repository_version") + .latest("repository_version", "pulp_created") + ) + return None + + class PublicationQuerySet(models.QuerySet): """A queryset that provides publication filtering methods.""" @@ -233,6 +247,15 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.delete() raise + # Register with distributed publication tracking + for distro in Distribution.objects.filter(repository=self.repository): + detail_distro = distro.cast() + if not detail_distro.SERVE_FROM_PUBLICATION: + continue + latest_repo_publication = _latest_publication_for_repository(self.repository) + if self == latest_repo_publication: + DistributedPublication(distribution=distro, publication=self).save() + # Unmark old checkpoints if retention is configured if self.checkpoint: repository = self.repository_version.repository @@ -731,6 +754,76 @@ def content_headers_for(self, path): """ return {} + def get_fallback_ca(self, path): + """ + Return a ContentArtifact for path from the grace-period publication history, or None. + + Iterates DistributedPublication records for this distribution from newest to oldest, + trying each publication until the path is found. Handles both pass-through and + non-pass-through (PublishedArtifact) publications. 
+ """ + recent_dp = ( + self.core_distributedpublications.filter( + models.Q(expires_at__gte=timezone.now()) | models.Q(expires_at__isnull=True) + ) + .order_by("-pulp_created") + .select_related("publication__repository_version") + ) + for dp in recent_dp.iterator(): + pub = dp.publication + if pub.pass_through: + ca = ( + ContentArtifact.objects.select_related("artifact", "artifact__pulp_domain") + .filter(content__in=pub.repository_version.content, relative_path=path) + .first() + ) + if ca is not None: + return ca + else: + pa = ( + pub.published_artifact.select_related( + "content_artifact__artifact", + "content_artifact__artifact__pulp_domain", + ) + .filter(relative_path=path) + .first() + ) + if pa is not None: + return pa.content_artifact + return None + + @hook(AFTER_CREATE) + @hook(AFTER_UPDATE, when="publication", has_changed=True, is_not=None) + @hook(AFTER_UPDATE, when="repository", has_changed=True, is_not=None) + @hook(AFTER_UPDATE, when="repository_version", has_changed=True, is_not=None) + def set_distributed_publication(self): + """Track the publication being served when a distribution is created or changed.""" + if not self.cast().SERVE_FROM_PUBLICATION: + return + pub = None + if self.publication: + pub = self.publication + elif self.repository: + pub = _latest_publication_for_repository(self.repository) + elif self.repository_version: + with suppress(Publication.DoesNotExist): + pub = Publication.objects.filter( + repository_version=self.repository_version, complete=True + ).latest("pulp_created") + if pub is None: + return + DistributedPublication(distribution=self, publication=pub).save() + + @hook( + AFTER_UPDATE, + when_any=["publication", "repository", "repository_version"], + has_changed=True, + is_now=None, + ) + def clear_distributed_publication(self): + """Remove all DistributedPublication records when the distribution's source is cleared.""" + DistributedPublication.objects.filter(distribution=self).delete() + @hook(BEFORE_DELETE) 
@hook( AFTER_UPDATE, @@ -794,3 +887,48 @@ class Meta: permissions = [ ("manage_roles_artifactdistribution", "Can manage roles on artifact distributions"), ] + + +class DistributedPublication(BaseModel): + """ + Records the history of publications served by each Distribution. + + Keeps superseded publications alive and serveable for a configurable grace period + so clients mid-download don't receive 404 errors when a distribution switches publications. + + - ``expires_at=null`` — currently active + - ``expires_at=`` — superseded; still served until that time + """ + + distribution = models.ForeignKey( + Distribution, on_delete=models.CASCADE, related_name="core_distributedpublications" + ) + publication = models.ForeignKey( + Publication, on_delete=models.CASCADE, related_name="core_distributedpublications" + ) + expires_at = models.DateTimeField(null=True) + + @classmethod + def get_active(cls, distribution): + """Return records that are still being served (current or within grace period).""" + return cls.objects.filter(distribution=distribution).filter( + models.Q(expires_at__isnull=True) | models.Q(expires_at__gte=timezone.now()) + ) + + @classmethod + def get_expired(cls, distribution): + """Return records whose grace period has elapsed for a distribution.""" + return cls.objects.filter(distribution=distribution, expires_at__lt=timezone.now()) + + @hook(AFTER_CREATE) + def cleanup(self): + """Expire older active DistributedPublications; delete already-expired ones.""" + DistributedPublication.objects.filter(expires_at__lt=timezone.now()).delete() + DistributedPublication.objects.exclude(pk=self.pk).filter( + distribution=self.distribution, expires_at__isnull=True + ).update( + expires_at=( + timezone.now() + + timedelta(seconds=settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD) + ) + ) diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index aa9af431ae8..2f85b4e6bfa 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -333,6 
+333,10 @@ # The time in seconds a RemoteArtifact will be ignored after failure. REMOTE_CONTENT_FETCH_FAILURE_COOLDOWN = 5 * 60 # 5 minutes +# The time in seconds that a superseded publication will continue to be served for distributions +# that have switched to a newer publication. Prevents 404s for clients mid-download. +DISTRIBUTED_PUBLICATION_RETENTION_PERIOD = 3 * 24 * 60 * 60 # 3 days + SPECTACULAR_SETTINGS = { "OAS_VERSION": "3.1.1", "SERVE_URLCONF": ROOT_URLCONF, diff --git a/pulpcore/content/handler.py b/pulpcore/content/handler.py index d00a53cba7b..8a1c0e82f86 100644 --- a/pulpcore/content/handler.py +++ b/pulpcore/content/handler.py @@ -823,6 +823,17 @@ async def _match_and_stream(self, path, request): request, StreamResponse(headers=headers), ca ) + # Grace-period fallback: serve from a recently-superseded publication + if distro.SERVE_FROM_PUBLICATION: + ca = await sync_to_async(distro.get_fallback_ca)(original_rel_path) + if ca is not None: + if ca.artifact: + return await self._serve_content_artifact(ca, headers, request) + else: + return await self._stream_content_artifact( + request, StreamResponse(headers=headers), ca + ) + if repo_version and not publication and not distro.SERVE_FROM_PUBLICATION: # Look for index.html or list the directory index_path = "{}index.html".format(rel_path) diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_cache.py b/pulpcore/tests/functional/api/using_plugin/test_content_cache.py index c238b485d36..7d24399f2b5 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_cache.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_cache.py @@ -10,7 +10,9 @@ PatchedfileFileDistribution, ) -from pulpcore.tests.functional.utils import get_from_url +from pulpcore.tests.functional.utils import ( + get_from_url, +) @pytest.mark.parallel diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py index aeab1328ca0..74c8b97317d 100644 --- 
a/pulpcore/tests/functional/utils.py +++ b/pulpcore/tests/functional/utils.py @@ -5,12 +5,21 @@ import hashlib import os import random +import time from aiohttp import web from dataclasses import dataclass +from django.conf import settings from multidict import CIMultiDict +def wait_distributed_publication_retention_period(): + assert ( + settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD <= 5 + ), "DISTRIBUTED_PUBLICATION_RETENTION_PERIOD is too long for testing." + time.sleep(settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD + 1) + + async def get_response(url): async with aiohttp.ClientSession() as session: return await session.get(url) diff --git a/pulpcore/tests/unit/models/test_publication_retention.py b/pulpcore/tests/unit/models/test_publication_retention.py new file mode 100644 index 00000000000..cd6c95361eb --- /dev/null +++ b/pulpcore/tests/unit/models/test_publication_retention.py @@ -0,0 +1,215 @@ +import hashlib +import uuid + +import pytest + +from pulpcore.app.models import ( + Content, + ContentArtifact, + DistributedPublication, + PublishedArtifact, +) +from pulp_file.app.models import ( + FileContent, + FileDistribution, + FilePublication, + FileRepository, +) + +UNSET = object() + + +def pub_factory(repo_version=None, pass_through=False, create_pa=False): + if repo_version is None: + repo_version = FileRepository.objects.create( + name=f"repo-{uuid.uuid4().hex[:8]}" + ).versions.first() + pub = FilePublication.objects.create( + repository_version=repo_version, complete=True, pass_through=pass_through + ) + if create_pa: + for ca in ContentArtifact.objects.filter(content__in=repo_version.content.all()): + PublishedArtifact.objects.create( + publication=pub, content_artifact=ca, relative_path=ca.relative_path + ) + return pub + + +def dist_factory(repo=None, repover=None, pub=None, name=None): + assert [repo, repover, pub].count( + None + ) == 2, "Exactly one of repo, repover, or pub must be provided" + name = name or f"dist-{uuid.uuid4().hex[:8]}" 
+ return FileDistribution.objects.create( + name=name, base_path=name, repository=repo, repository_version=repover, publication=pub + ) + + +def update_dist(dist, repo=UNSET, repover=UNSET, pub=UNSET): + assert (repo, repover, pub).count( + UNSET + ) == 2, "Exactly one of repo, repover, or pub must be provided" + if repo is not UNSET: + dist.repository = repo + if repover is not UNSET: + dist.repository_version = repover + if pub is not UNSET: + dist.publication = pub + dist.save() + + +def create_version(repo, add=None, remove=None): + """ + Create a RepositoryVersion adding and/or removing content by path. + """ + assert add or remove, "at least one of add or remove must be specified" + with repo.new_version() as repo_version: + for path in add or []: + digest = hashlib.sha256(path.encode()).hexdigest() + content = FileContent.objects.create(relative_path=path, digest=digest) + repo_version.add_content(Content.objects.filter(pk=content.pk)) + ContentArtifact.objects.create(content=content, relative_path=path) + for path in remove or []: + ca = ContentArtifact.objects.get(relative_path=path) + repo_version.remove_content(Content.objects.filter(pk=ca.content_id)) + for path in add or []: + ca = ContentArtifact.objects.get(relative_path=path) + assert repo_version.content.filter( + pk=ca.content_id + ).exists(), f"{path!r} not found in repository version content" + for path in remove or []: + ca = ContentArtifact.objects.get(relative_path=path) + assert not repo_version.content.filter( + pk=ca.content_id + ).exists(), f"{path!r} should not be in repository version content after removal" + return repo_version + + +@pytest.mark.django_db +class TestDistributedPublication: + def test_created_when_publication_added_to_distribution(self, db): + pub = pub_factory() + dist = dist_factory(pub=pub) + active = DistributedPublication.get_active(dist) + assert active.count() == 1 + assert active.first().publication_id == pub.pk + assert active.first().expires_at is None + + def 
test_first_distributed_publication_is_active(self, db): + dist = dist_factory(pub=pub_factory()) + assert DistributedPublication.get_active(dist).count() == 1 + assert DistributedPublication.get_expired(dist).count() == 0 + + def test_switching_publication_expires_old_and_activates_new(self, db): + pub1 = pub_factory() + dist = dist_factory(pub=pub1) + + pub2 = pub_factory() + update_dist(dist, pub=pub2) + + active = DistributedPublication.get_active(dist) + assert active.count() == 2 + assert active.filter(expires_at__isnull=True).first().publication_id == pub2.pk + assert active.filter(expires_at__isnull=False).first().publication_id == pub1.pk + + +@pytest.mark.django_db +class TestClearDistributedPublication: + def test_deleting_publication_clears_dps(self, db): + pub = pub_factory() + dist = dist_factory(pub=pub) + assert DistributedPublication.objects.filter(distribution=dist).exists() + pub.delete() + assert not DistributedPublication.objects.filter(distribution=dist).exists() + + def test_deleting_older_repository_version_doesnt_clear_dps(self, db): + repo = FileRepository.objects.create(name="test-repo") + v1 = repo.latest_version() + v2 = create_version(repo, add=["some-file.txt"]) + pub = pub_factory(v2) + dist = dist_factory(pub=pub) + assert DistributedPublication.objects.filter(distribution=dist).count() == 1 + v1.delete() + assert DistributedPublication.objects.filter(distribution=dist).count() == 1 + + def test_deleting_repository_clears_dps(self, db): + pub = pub_factory() + dist = dist_factory(pub=pub) + repo = pub.repository_version.repository + assert DistributedPublication.objects.filter(distribution=dist).exists() + repo.delete() + assert not DistributedPublication.objects.filter(distribution=dist).exists() + + +@pytest.mark.django_db +class TestGetFallbackCa: + def test_returns_ca_when_content_in_publication(self, version_with_content, expected_ca): + pub_with_a = pub_factory(version_with_content, pass_through=True) + dist = 
dist_factory(pub=pub_with_a) + assert dist.get_fallback_ca(self.content_path) == expected_ca + + def test_returns_none_when_content_not_in_publication(self, version_without_content): + pub_without_a = pub_factory(version_without_content, pass_through=True) + dist = dist_factory(pub=pub_without_a) + assert dist.get_fallback_ca(self.content_path) is None + + def test_returns_none_when_no_published_artifact(self, version_with_content): + pub_with_a = pub_factory(version_with_content, pass_through=False, create_pa=False) + dist = dist_factory(pub=pub_with_a) + assert dist.get_fallback_ca(self.content_path) is None + + def test_returns_ca_via_published_artifact(self, version_with_content, expected_ca): + pub_with_a = pub_factory(version_with_content, pass_through=False, create_pa=True) + dist = dist_factory(pub=pub_with_a) + assert dist.get_fallback_ca(self.content_path) == expected_ca + + def test_returns_ca_from_superseded_publication( + self, version_with_content, version_without_content, expected_ca + ): + pub_with_a = pub_factory(version_with_content, pass_through=True) + dist = dist_factory(pub=pub_with_a) + pub_without_a = pub_factory(version_without_content, pass_through=True) + update_dist(dist, pub=pub_without_a) + assert dist.get_fallback_ca(self.content_path) == expected_ca + + def test_returns_none_after_unsetting_repository(self, version_with_content, expected_ca): + repo = version_with_content.repository + pub_factory(version_with_content, pass_through=True) + dist = dist_factory(repo=repo) + assert dist.get_fallback_ca(self.content_path) == expected_ca + + update_dist(dist, repo=None) + assert dist.get_fallback_ca(self.content_path) is None + + def test_returns_none_after_unsetting_repository_version( + self, version_with_content, expected_ca + ): + pub_factory(version_with_content, pass_through=True) + dist = dist_factory(repover=version_with_content) + assert dist.get_fallback_ca(self.content_path) == expected_ca + + update_dist(dist, repover=None) + 
assert dist.get_fallback_ca(self.content_path) is None + + def test_returns_none_after_removing_publication(self, version_with_content, expected_ca): + pub = pub_factory(version_with_content, pass_through=True) + dist = dist_factory(pub=pub) + assert dist.get_fallback_ca(self.content_path) == expected_ca + + update_dist(dist, pub=None) + assert dist.get_fallback_ca(self.content_path) is None + + @pytest.fixture + def version_with_content(self, db): + repo = FileRepository.objects.create(name="test-repo") + self.content_path = "test.txt" + return create_version(repo, add=["test.txt"]) + + @pytest.fixture + def version_without_content(self, version_with_content): + repo = version_with_content.repository + return create_version(repo, add=["other.txt"], remove=[self.content_path]) + + @pytest.fixture + def expected_ca(self, version_with_content): + return ContentArtifact.objects.get(relative_path=self.content_path) diff --git a/template_config.yml b/template_config.yml index e5f66093813..2fc54fc30bb 100644 --- a/template_config.yml +++ b/template_config.yml @@ -59,6 +59,7 @@ pulp_settings: content_path_prefix: "/somewhere/else/" csrf_trusted_origins: - "https://pulp:443" + distributed_publication_retention_period: 3 orphan_protection_time: 0 task_diagnostics: - "memory" From ca70237f1fc468629f73777aef2c38f73ffd7b52 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 1 Apr 2026 09:55:50 -0300 Subject: [PATCH 2/4] fixup: refactors --- .../api/test_distributed_publication.py | 12 +++++-- pulpcore/app/models/publication.py | 32 ++++--------------- pulpcore/tests/functional/utils.py | 9 ------ 3 files changed, 16 insertions(+), 37 deletions(-) diff --git a/pulp_file/tests/functional/api/test_distributed_publication.py b/pulp_file/tests/functional/api/test_distributed_publication.py index 386d3af2691..0911dcc7db9 100644 --- a/pulp_file/tests/functional/api/test_distributed_publication.py +++ b/pulp_file/tests/functional/api/test_distributed_publication.py @@ -7,7 +7,9 
@@ import requests from pulpcore.client.pulp_file import FileFilePublication, FileFileDistribution from pulpcore.client.pulp_file import RepositorySyncURL -from pulpcore.tests.functional.utils import wait_distributed_publication_retention_period + +import time +from django.conf import settings @dataclass @@ -27,6 +29,12 @@ def get_file_url(self, distribution: FileFileDistribution) -> str: def clear_dist_cache(self, dist: FileFileDistribution) -> None: raise NotImplementedError + def wait_retention_period(self): + assert ( + settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD <= 5 + ), "DISTRIBUTED_PUBLICATION_RETENTION_PERIOD is too long for testing." + time.sleep(settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD + 1) + class TestDistributionPublicationRetention: @pytest.mark.parallel @@ -52,7 +60,7 @@ def test_old_content_expires_after_retention_period( file_url = ctx.get_file_url(dist) ctx.update_distribution(dist, publication=ctx.pub_without_file) - wait_distributed_publication_retention_period() + ctx.wait_retention_period() ctx.clear_dist_cache(dist) # if redis is enabled it interferes with the assertion assert requests.get(file_url).status_code == 404 diff --git a/pulpcore/app/models/publication.py b/pulpcore/app/models/publication.py index 5154ea2322b..77083cac3a9 100644 --- a/pulpcore/app/models/publication.py +++ b/pulpcore/app/models/publication.py @@ -7,7 +7,6 @@ from base64 import b64decode from binascii import Error as Base64DecodeError -from contextlib import suppress from datetime import timedelta from gettext import gettext as _ from url_normalize import url_normalize @@ -35,19 +34,6 @@ _logger = logging.getLogger(__name__) -def _latest_publication_for_repository(repository): - """Return the latest complete Publication for a repository, or None.""" - with suppress(Publication.DoesNotExist): - return ( - Publication.objects.filter( - repository_version__in=repository.versions.all(), complete=True - ) - .select_related("repository_version") - 
.latest("repository_version", "pulp_created") - ) - return None - - class PublicationQuerySet(models.QuerySet): """A queryset that provides publication filtering methods.""" @@ -252,7 +238,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): detail_distro = distro.cast() if not detail_distro.SERVE_FROM_PUBLICATION: continue - latest_repo_publication = _latest_publication_for_repository(self.repository) + _, _, latest_repo_publication = ( + detail_distro.get_repository_publication_and_version() + ) if self == latest_repo_publication: DistributedPublication(distribution=distro, publication=self).save() @@ -798,18 +786,10 @@ def get_fallback_ca(self, path): @hook(AFTER_UPDATE, when="repository_version", has_changed=True, is_not=None) def set_distributed_publication(self): """Track the publication being served when a distribution is created or changed.""" - if not self.cast().SERVE_FROM_PUBLICATION: + detail = self.cast() + if not detail.SERVE_FROM_PUBLICATION: return - pub = None - if self.publication: - pub = self.publication - elif self.repository: - pub = _latest_publication_for_repository(self.repository) - elif self.repository_version: - with suppress(Publication.DoesNotExist): - pub = Publication.objects.filter( - repository_version=self.repository_version, complete=True - ).latest("pulp_created") + _, _, pub = detail.get_repository_publication_and_version() if pub is None: return DistributedPublication(distribution=self, publication=pub).save() diff --git a/pulpcore/tests/functional/utils.py b/pulpcore/tests/functional/utils.py index 74c8b97317d..aeab1328ca0 100644 --- a/pulpcore/tests/functional/utils.py +++ b/pulpcore/tests/functional/utils.py @@ -5,21 +5,12 @@ import hashlib import os import random -import time from aiohttp import web from dataclasses import dataclass -from django.conf import settings from multidict import CIMultiDict -def wait_distributed_publication_retention_period(): - assert ( - settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD <= 5 - ), 
"DISTRIBUTED_PUBLICATION_RETENTION_PERIOD is too long for testing." - time.sleep(settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD + 1) - - async def get_response(url): async with aiohttp.ClientSession() as session: return await session.get(url) From 48a886884665b14e35657f0a23546fd33f79ec82 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 1 Apr 2026 10:23:30 -0300 Subject: [PATCH 3/4] fixup: add feature flag --- docs/admin/reference/settings.md | 11 ++++ pulpcore/app/models/publication.py | 50 +++++++++++-------- pulpcore/app/settings.py | 11 ++++ pulpcore/app/util.py | 5 ++ .../api/using_plugin/test_content_cache.py | 4 +- 5 files changed, 57 insertions(+), 24 deletions(-) diff --git a/docs/admin/reference/settings.md b/docs/admin/reference/settings.md index cdb326e4f57..cc8bb1cadd1 100644 --- a/docs/admin/reference/settings.md +++ b/docs/admin/reference/settings.md @@ -293,6 +293,17 @@ All installed plugins must be Domain compatible for Pulp to start. Defaults to `False`. +### DISTRIBUTED\_PUBLICATION\_RETENTION\_PERIOD + +When a distribution switches to a newer publication, the previously served (superseded) publication +continues to be available for this many seconds. +This grace period prevents 404 errors for clients that began downloading from the old publication +before the switch occurred. + +Set to `0` to disable the grace period and serve only the latest publication immediately. + +Defaults to `259200` seconds (3 days). + ### ENABLED\_PLUGINS An optional list of plugin names. 
diff --git a/pulpcore/app/models/publication.py b/pulpcore/app/models/publication.py index 77083cac3a9..04bfcd00800 100644 --- a/pulpcore/app/models/publication.py +++ b/pulpcore/app/models/publication.py @@ -29,7 +29,7 @@ from rest_framework.exceptions import APIException from pulpcore.app.models import AutoAddObjPermsMixin from pulpcore.responses import ArtifactResponse -from pulpcore.app.util import get_domain_pk, cache_key, get_url +from pulpcore.app.util import get_domain_pk, cache_key, get_url, retain_distributed_pub_enabled _logger = logging.getLogger(__name__) @@ -233,16 +233,17 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.delete() raise - # Register with distributed publication tracking - for distro in Distribution.objects.filter(repository=self.repository): - detail_distro = distro.cast() - if not detail_distro.SERVE_FROM_PUBLICATION: - continue - _, _, latest_repo_publication = ( - detail_distro.get_repository_publication_and_version() - ) - if self == latest_repo_publication: - DistributedPublication(distribution=distro, publication=self).save() + # Create distributed publication for repository auto-publish scenario + if retain_distributed_pub_enabled(): + for distro in Distribution.objects.filter(repository=self.repository): + detail_distro = distro.cast() + if not detail_distro.SERVE_FROM_PUBLICATION: + continue + _, _, latest_repo_publication = ( + detail_distro.get_repository_publication_and_version() + ) + if self == latest_repo_publication: + DistributedPublication(distribution=distro, publication=self).save() # Unmark old checkpoints if retention is configured if self.checkpoint: @@ -749,7 +750,11 @@ def get_fallback_ca(self, path): Iterates DistributedPublication records for this distribution from newest to oldest, trying each publication until the path is found. Handles both pass-through and non-pass-through (PublishedArtifact) publications. + + Returns None immediately when DISTRIBUTED_PUBLICATION_RETENTION_PERIOD is 0. 
""" + if not retain_distributed_pub_enabled(): + return None recent_dp = ( self.core_distributedpublications.filter( models.Q(expires_at__gte=timezone.now()) | models.Q(expires_at__isnull=True) @@ -781,13 +786,16 @@ def get_fallback_ca(self, path): return None @hook(AFTER_CREATE) - @hook(AFTER_UPDATE, when="publication", has_changed=True, is_not=None) - @hook(AFTER_UPDATE, when="repository", has_changed=True, is_not=None) - @hook(AFTER_UPDATE, when="repository_version", has_changed=True, is_not=None) + @hook( + AFTER_UPDATE, + when_any=["publication", "repository", "repository_version"], + has_changed=True, + is_not=None, + ) def set_distributed_publication(self): """Track the publication being served when a distribution is created or changed.""" detail = self.cast() - if not detail.SERVE_FROM_PUBLICATION: + if not detail.SERVE_FROM_PUBLICATION or not retain_distributed_pub_enabled(): return _, _, pub = detail.get_repository_publication_and_version() if pub is None: @@ -904,11 +912,11 @@ def get_expired(cls, distribution): def cleanup(self): """Expire older active DistributedPublications; delete already-expired ones.""" DistributedPublication.objects.filter(expires_at__lt=timezone.now()).delete() - DistributedPublication.objects.exclude(pk=self.pk).filter( + superseded = DistributedPublication.objects.exclude(pk=self.pk).filter( distribution=self.distribution, expires_at__isnull=True - ).update( - expires_at=( - timezone.now() - + timedelta(seconds=settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD) - ) ) + if not retain_distributed_pub_enabled(): + superseded.delete() + else: + retention = settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD + superseded.update(expires_at=timezone.now() + timedelta(seconds=retention)) diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index 2f85b4e6bfa..c5c559306f9 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -534,6 +534,16 @@ }, ) +distributed_publication_retention_period_validator = 
Validator( + "DISTRIBUTED_PUBLICATION_RETENTION_PERIOD", + is_type_of=int, + gte=0, + messages={ + "is_type_of": "{name} must be an integer (number of seconds).", + "gte": "{name} must be a non-negative integer. Set to 0 to disable the grace period.", + }, +) + def otel_middleware_hook(settings): data = {"dynaconf_merge": True} @@ -561,6 +571,7 @@ def otel_middleware_hook(settings): json_header_auth_validator, authentication_json_header_openapi_security_scheme_validator, otel_pulp_api_histogram_buckets_validator, + distributed_publication_retention_period_validator, ], post_hooks=(otel_middleware_hook,), ) diff --git a/pulpcore/app/util.py b/pulpcore/app/util.py index 5bb669208c4..f94e8c5c2d3 100644 --- a/pulpcore/app/util.py +++ b/pulpcore/app/util.py @@ -37,6 +37,11 @@ STRIPPED_API_ROOT = settings.API_ROOT.strip("/") +@lru_cache(maxsize=None) +def retain_distributed_pub_enabled(): + return settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD > 0 + + def reverse(viewname, args=None, kwargs=None, request=None, relative_url=True, **extra): """ Customized reverse to handle Pulp specific parameters like domains and API_ROOT rewrite. 
diff --git a/pulpcore/tests/functional/api/using_plugin/test_content_cache.py b/pulpcore/tests/functional/api/using_plugin/test_content_cache.py index 7d24399f2b5..c238b485d36 100644 --- a/pulpcore/tests/functional/api/using_plugin/test_content_cache.py +++ b/pulpcore/tests/functional/api/using_plugin/test_content_cache.py @@ -10,9 +10,7 @@ PatchedfileFileDistribution, ) -from pulpcore.tests.functional.utils import ( - get_from_url, -) +from pulpcore.tests.functional.utils import get_from_url @pytest.mark.parallel From c87f525f6eee07dec505acfd9ca7289cf409bab2 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 1 Apr 2026 10:48:17 -0300 Subject: [PATCH 4/4] fixup: refactors --- docs/admin/reference/settings.md | 6 ++-- pulpcore/app/models/publication.py | 30 ++++++++----------- .../unit/models/test_publication_retention.py | 29 +++++++++++------- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/docs/admin/reference/settings.md b/docs/admin/reference/settings.md index cc8bb1cadd1..2c4acc3657e 100644 --- a/docs/admin/reference/settings.md +++ b/docs/admin/reference/settings.md @@ -295,10 +295,8 @@ Defaults to `False`. ### DISTRIBUTED\_PUBLICATION\_RETENTION\_PERIOD -When a distribution switches to a newer publication, the previously served (superseded) publication -continues to be available for this many seconds. -This grace period prevents 404 errors for clients that began downloading from the old publication -before the switch occurred. +When a distribution switches to a newer publication, the previously served (superseded) publication continues to be available for this many seconds. +This grace period prevents 404 errors for clients that began downloading from the old publication before the switch occurred. Set to `0` to disable the grace period and serve only the latest publication immediately. 
diff --git a/pulpcore/app/models/publication.py b/pulpcore/app/models/publication.py index 04bfcd00800..12e75d49b6a 100644 --- a/pulpcore/app/models/publication.py +++ b/pulpcore/app/models/publication.py @@ -756,9 +756,8 @@ def get_fallback_ca(self, path): if not retain_distributed_pub_enabled(): return None recent_dp = ( - self.core_distributedpublications.filter( - models.Q(expires_at__gte=timezone.now()) | models.Q(expires_at__isnull=True) - ) + DistributedPublication.get_non_expired() + .filter(distribution=self) .order_by("-pulp_created") .select_related("publication__repository_version") ) @@ -881,11 +880,8 @@ class DistributedPublication(BaseModel): """ Records the history of publications served by each Distribution. - Keeps superseded publications alive and serveable for a configurable grace period - so clients mid-download don't receive 404 errors when a distribution switches publications. - - - ``expires_at=null`` — currently active - - ``expires_at=&lt;timestamp&gt;`` — superseded; still served until that time + Keeps superseded publications alive and serveable for a configurable grace period. + Null `expires_at` means the publication was not superseded yet.
""" distribution = models.ForeignKey( @@ -897,21 +893,21 @@ class DistributedPublication(BaseModel): expires_at = models.DateTimeField(null=True) @classmethod - def get_active(cls, distribution): - """Return records that are still being served (current or within grace period).""" - return cls.objects.filter(distribution=distribution).filter( - models.Q(expires_at__isnull=True) | models.Q(expires_at__gte=timezone.now()) - ) + def get_non_expired(cls, include_current=True): + if include_current: + return cls.objects.filter( + models.Q(expires_at__isnull=True) | models.Q(expires_at__gte=timezone.now()) + ) + return cls.objects.filter(expires_at__gte=timezone.now()) @classmethod - def get_expired(cls, distribution): - """Return records whose grace period has elapsed for a distribution.""" - return cls.objects.filter(distribution=distribution, expires_at__lt=timezone.now()) + def get_expired(cls): + return cls.objects.filter(expires_at__lt=timezone.now()) @hook(AFTER_CREATE) def cleanup(self): """Expire older active DistributedPublications; delete already-expired ones.""" - DistributedPublication.objects.filter(expires_at__lt=timezone.now()).delete() + DistributedPublication.get_expired().delete() superseded = DistributedPublication.objects.exclude(pk=self.pk).filter( distribution=self.distribution, expires_at__isnull=True ) diff --git a/pulpcore/tests/unit/models/test_publication_retention.py b/pulpcore/tests/unit/models/test_publication_retention.py index cd6c95361eb..417d4857688 100644 --- a/pulpcore/tests/unit/models/test_publication_retention.py +++ b/pulpcore/tests/unit/models/test_publication_retention.py @@ -90,15 +90,22 @@ class TestDistributedPublication: def test_created_when_publication_added_to_distribution(self, db): pub = pub_factory() dist = dist_factory(pub=pub) - active = DistributedPublication.get_active(dist) - assert active.count() == 1 - assert active.first().publication_id == pub.pk - assert active.first().expires_at is None + non_expired = 
DistributedPublication.get_non_expired(include_current=True).filter( + distribution=dist + ) + assert non_expired.count() == 1 + assert non_expired.first().publication_id == pub.pk + assert non_expired.first().expires_at is None def test_first_distributed_publication_is_active(self, db): dist = dist_factory(pub=pub_factory()) - assert DistributedPublication.get_active(dist).count() == 1 - assert DistributedPublication.get_expired(dist).count() == 0 + assert ( + DistributedPublication.get_non_expired(include_current=True) + .filter(distribution=dist) + .count() + == 1 + ) + assert DistributedPublication.get_expired().filter(distribution=dist).count() == 0 def test_switching_publication_expires_old_and_activates_new(self, db): pub1 = pub_factory() @@ -107,10 +114,12 @@ def test_switching_publication_expires_old_and_activates_new(self, db): pub2 = pub_factory() update_dist(dist, pub=pub2) - active = DistributedPublication.get_active(dist) - assert active.count() == 2 - assert active.filter(expires_at__isnull=True).first().publication_id == pub2.pk - assert active.filter(expires_at__isnull=False).first().publication_id == pub1.pk + non_expired = DistributedPublication.get_non_expired(include_current=True).filter( + distribution=dist + ) + assert non_expired.count() == 2 + assert non_expired.filter(expires_at__isnull=True).first().publication_id == pub2.pk + assert non_expired.filter(expires_at__isnull=False).first().publication_id == pub1.pk @pytest.mark.django_db