Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/scripts/before_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ legacy_component_name: "pulpcore"
component_name: "core"
component_version: "${COMPONENT_VERSION}"
pulp_env: {"PULP_CA_BUNDLE": "/etc/pulp/certs/pulp_webserver.crt"}
pulp_settings: {"allowed_export_paths": ["/tmp"], "allowed_import_paths": ["/tmp"], "api_root": "/pulp/", "content_path_prefix": "/somewhere/else/", "csrf_trusted_origins": ["https://pulp:443"], "orphan_protection_time": 0, "task_diagnostics": ["memory"], "task_protection_time": 10, "tmpfile_protection_time": 10, "upload_protection_time": 10}
pulp_settings: {"allowed_export_paths": ["/tmp"], "allowed_import_paths": ["/tmp"], "api_root": "/pulp/", "content_path_prefix": "/somewhere/else/", "csrf_trusted_origins": ["https://pulp:443"], "distributed_publication_retention_period": 3, "orphan_protection_time": 0, "task_diagnostics": ["memory"], "task_protection_time": 10, "tmpfile_protection_time": 10, "upload_protection_time": 10}
pulp_scheme: "https"
image:
name: "pulp"
Expand Down
1 change: 1 addition & 0 deletions CHANGES/7514.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for serving superseded publications for a configurable retention period.
9 changes: 9 additions & 0 deletions docs/admin/reference/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,15 @@ All installed plugins must be Domain compatible for Pulp to start.

Defaults to `False`.

### DISTRIBUTED\_PUBLICATION\_RETENTION\_PERIOD

When a distribution switches to a newer publication, the previously served (superseded) publication continues to be available for this many seconds.
This grace period prevents 404 errors for clients that began downloading from the old publication before the switch occurred.

Set to `0` to disable the grace period and serve only the latest publication immediately.

Defaults to `259200` seconds (3 days).

### ENABLED\_PLUGINS

An optional list of plugin names.
Expand Down
146 changes: 146 additions & 0 deletions pulp_file/tests/functional/api/test_distributed_publication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Tests for the distributed publication grace-period feature."""

from dataclasses import dataclass
from urllib.parse import urljoin

import pytest
import requests
from pulpcore.client.pulp_file import FileFilePublication, FileFileDistribution
from pulpcore.client.pulp_file import RepositorySyncURL

import time
from django.conf import settings


@dataclass
class DistributionPublicationContext:
    """Abstract harness bundling two publications plus distribution operations.

    A concrete subclass (built inside the ``ctx`` fixture) wires these hooks to the
    pulp_file bindings. ``pub_with_file`` contains a known file; ``pub_without_file``
    is the same repository after that file was removed.
    """

    pub_with_file: FileFilePublication
    pub_without_file: FileFilePublication

    def create_distribution(self, publication: FileFilePublication) -> FileFileDistribution:
        """Create a distribution serving ``publication``. Provided by subclasses."""
        raise NotImplementedError

    def update_distribution(self, dist: FileFileDistribution, publication: FileFilePublication):
        """Point ``dist`` at a different publication. Provided by subclasses."""
        raise NotImplementedError

    def get_file_url(self, distribution: FileFileDistribution) -> str:
        """Return the URL of the known file as served by ``distribution``."""
        raise NotImplementedError

    def clear_dist_cache(self, dist: FileFileDistribution) -> None:
        """Invalidate any content-app cache entries for ``dist``."""
        raise NotImplementedError

    def wait_retention_period(self):
        """Sleep until the configured retention period has certainly elapsed."""
        retention = settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD
        # Guard against accidentally running against a production-sized retention
        # window, which would stall the test run for days.
        assert retention <= 5, "DISTRIBUTED_PUBLICATION_RETENTION_PERIOD is too long for testing."
        time.sleep(retention + 1)


class TestDistributionPublicationRetention:
    """End-to-end checks for the superseded-publication grace period."""

    @pytest.mark.parallel
    def test_old_content_is_served_within_retention_period(
        self,
        ctx: DistributionPublicationContext,
    ):
        """Old content is still served immediately after switching to a new publication."""
        distribution = ctx.create_distribution(publication=ctx.pub_with_file)
        url = ctx.get_file_url(distribution)
        assert requests.get(url).status_code == 200

        # Switch to the publication that no longer contains the file; the superseded
        # publication must keep serving it during the grace period.
        ctx.update_distribution(distribution, publication=ctx.pub_without_file)
        assert requests.get(url).status_code == 200

    @pytest.mark.parallel
    def test_old_content_expires_after_retention_period(
        self,
        ctx: DistributionPublicationContext,
    ):
        """Old content becomes unavailable once the retention period expires."""
        distribution = ctx.create_distribution(publication=ctx.pub_with_file)
        url = ctx.get_file_url(distribution)

        ctx.update_distribution(distribution, publication=ctx.pub_without_file)
        ctx.wait_retention_period()
        ctx.clear_dist_cache(distribution)  # if redis is enabled it interferes with the assertion
        assert requests.get(url).status_code == 404

    @pytest.fixture
    def ctx(
        self,
        file_bindings,
        file_repository_factory,
        file_remote_ssl_factory,
        basic_manifest_path,
        file_publication_factory,
        file_distribution_factory,
        distribution_base_url,
        monitor_task,
    ) -> DistributionPublicationContext:
        """Set up two publications: one with a file, one without."""
        repository = file_repository_factory()
        remote = file_remote_ssl_factory(manifest_path=basic_manifest_path, policy="immediate")

        # Sync so the repository actually contains content.
        sync_body = RepositorySyncURL(remote=remote.pulp_href)
        monitor_task(
            file_bindings.RepositoriesFileApi.sync(repository.pulp_href, sync_body).task
        )
        repository = file_bindings.RepositoriesFileApi.read(repository.pulp_href)

        # First publication: still includes every synced file.
        pub_with_file = file_publication_factory(repository=repository.pulp_href)

        # Any unit present in the first publication works as the probe file.
        unit = file_bindings.ContentFilesApi.list(
            repository_version=repository.latest_version_href
        ).results[0]
        file_relative_path = unit.relative_path

        # Drop that unit so the next publication no longer contains it.
        monitor_task(
            file_bindings.RepositoriesFileApi.modify(
                repository.pulp_href, {"remove_content_units": [unit.pulp_href]}
            ).task
        )
        repository = file_bindings.RepositoriesFileApi.read(repository.pulp_href)

        # Second publication: the probe file is gone.
        pub_without_file = file_publication_factory(repository=repository.pulp_href)

        class _Context(DistributionPublicationContext):
            def create_distribution(
                self, publication: FileFilePublication
            ) -> FileFileDistribution:
                return file_distribution_factory(publication=publication.pulp_href)

            def update_distribution(
                self, dist: FileFileDistribution, publication: FileFilePublication
            ) -> None:
                monitor_task(
                    file_bindings.DistributionsFileApi.partial_update(
                        dist.pulp_href, {"publication": publication.pulp_href}
                    ).task
                )

            def get_file_url(self, distribution: FileFileDistribution) -> str:
                return urljoin(distribution_base_url(distribution.base_url), file_relative_path)

            def clear_dist_cache(self, dist: FileFileDistribution) -> None:
                # Flip base_path away and back: this evicts stale cache entries keyed
                # on the path without otherwise altering the distribution.
                original_base_path = dist.base_path
                monitor_task(
                    file_bindings.DistributionsFileApi.partial_update(
                        dist.pulp_href, {"base_path": original_base_path + "-tmp"}
                    ).task
                )
                monitor_task(
                    file_bindings.DistributionsFileApi.partial_update(
                        dist.pulp_href, {"base_path": original_base_path}
                    ).task
                )
                restored = file_bindings.DistributionsFileApi.read(dist.pulp_href)
                assert restored.base_path == original_base_path

        return _Context(pub_with_file=pub_with_file, pub_without_file=pub_without_file)
31 changes: 31 additions & 0 deletions pulpcore/app/migrations/0149_distributedpublication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Generated by Django 5.2.10 on 2026-03-30 18:48
#
# Introduces the DistributedPublication model: a history record of which
# publications a distribution has served, so that superseded publications can
# remain serveable for a grace period.
# NOTE: auto-generated migration — do not hand-edit field definitions.

import django.db.models.deletion
import django_lifecycle.mixins
import pulpcore.app.models.base
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('core', '0148_artifact_artifact_domain_size_index'),
    ]

    operations = [
        migrations.CreateModel(
            name='DistributedPublication',
            fields=[
                # Standard BaseModel fields: UUID primary key plus timestamps.
                ('pulp_id', models.UUIDField(default=pulpcore.app.models.base.pulp_uuid, editable=False, primary_key=True, serialize=False)),
                ('pulp_created', models.DateTimeField(auto_now_add=True)),
                ('pulp_last_updated', models.DateTimeField(auto_now=True, null=True)),
                # NULL while the publication is still the one currently served.
                ('expires_at', models.DateTimeField(null=True)),
                ('distribution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='core_distributedpublications', to='core.distribution')),
                ('publication', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='core_distributedpublications', to='core.publication')),
            ],
            options={
                'abstract': False,
            },
            bases=(django_lifecycle.mixins.LifecycleModelMixin, models.Model),
        ),
    ]
2 changes: 2 additions & 0 deletions pulpcore/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from .publication import (
ContentGuard,
Distribution,
DistributedPublication,
Publication,
PublishedArtifact,
PublishedMetadata,
Expand Down Expand Up @@ -138,6 +139,7 @@
"PulpImporter",
"ContentGuard",
"Distribution",
"DistributedPublication",
"Publication",
"PublishedArtifact",
"PublishedMetadata",
Expand Down
126 changes: 124 additions & 2 deletions pulpcore/app/models/publication.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from django.contrib.postgres.fields import HStoreField
from django.db import DatabaseError, IntegrityError, models, transaction
from django.utils import timezone
from django_lifecycle import hook, AFTER_UPDATE, BEFORE_DELETE
from django_lifecycle import hook, AFTER_CREATE, AFTER_UPDATE, BEFORE_DELETE

from .base import MasterModel, BaseModel
from .content import Artifact, Content, ContentArtifact
Expand All @@ -29,7 +29,7 @@
from rest_framework.exceptions import APIException
from pulpcore.app.models import AutoAddObjPermsMixin
from pulpcore.responses import ArtifactResponse
from pulpcore.app.util import get_domain_pk, cache_key, get_url
from pulpcore.app.util import get_domain_pk, cache_key, get_url, retain_distributed_pub_enabled

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -233,6 +233,18 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.delete()
raise

# Create distributed publication for repository auto-publish scenario
if retain_distributed_pub_enabled():
for distro in Distribution.objects.filter(repository=self.repository):
detail_distro = distro.cast()
if not detail_distro.SERVE_FROM_PUBLICATION:
continue
_, _, latest_repo_publication = (
detail_distro.get_repository_publication_and_version()
)
if self == latest_repo_publication:
DistributedPublication(distribution=distro, publication=self).save()

# Unmark old checkpoints if retention is configured
if self.checkpoint:
repository = self.repository_version.repository
Expand Down Expand Up @@ -731,6 +743,74 @@ def content_headers_for(self, path):
"""
return {}

def get_fallback_ca(self, path):
    """
    Return a ContentArtifact for path from the grace-period publication history, or None.

    Walks this distribution's DistributedPublication history newest-first and returns
    the first publication that can resolve ``path``. Pass-through publications are
    resolved directly against the repository version's content; all others go through
    their PublishedArtifact records.

    Returns None immediately when DISTRIBUTED_PUBLICATION_RETENTION_PERIOD is 0.
    """
    if not retain_distributed_pub_enabled():
        return None

    history = (
        DistributedPublication.get_non_expired()
        .filter(distribution=self)
        .order_by("-pulp_created")
        .select_related("publication__repository_version")
    )
    for entry in history.iterator():
        publication = entry.publication
        if publication.pass_through:
            # Pass-through: the repository version's content itself is served.
            content_artifact = (
                ContentArtifact.objects.select_related("artifact", "artifact__pulp_domain")
                .filter(
                    content__in=publication.repository_version.content, relative_path=path
                )
                .first()
            )
            if content_artifact is not None:
                return content_artifact
        else:
            # Otherwise the publication's PublishedArtifact table is authoritative.
            published = (
                publication.published_artifact.select_related(
                    "content_artifact__artifact",
                    "content_artifact__artifact__pulp_domain",
                )
                .filter(relative_path=path)
                .first()
            )
            if published is not None:
                return published.content_artifact
    return None

@hook(AFTER_CREATE)
@hook(
    AFTER_UPDATE,
    when_any=["publication", "repository", "repository_version"],
    has_changed=True,
    is_not=None,
)
def set_distributed_publication(self):
    """Track the publication being served when a distribution is created or changed."""
    detail_distribution = self.cast()
    # Only publication-serving distribution types participate, and only while the
    # grace-period feature is enabled.
    if not detail_distribution.SERVE_FROM_PUBLICATION:
        return
    if not retain_distributed_pub_enabled():
        return
    _, _, publication = detail_distribution.get_repository_publication_and_version()
    if publication is not None:
        DistributedPublication(distribution=self, publication=publication).save()

@hook(
    AFTER_UPDATE,
    when_any=["publication", "repository", "repository_version"],
    has_changed=True,
    is_now=None,
)
def clear_distributed_publication(self):
    """Remove all DistributedPublication records when the distribution's source is cleared."""
    # With nothing left to serve from, the grace-period history is moot.
    DistributedPublication.objects.filter(distribution=self).delete()

@hook(BEFORE_DELETE)
@hook(
AFTER_UPDATE,
Expand Down Expand Up @@ -794,3 +874,45 @@ class Meta:
permissions = [
("manage_roles_artifactdistribution", "Can manage roles on artifact distributions"),
]


class DistributedPublication(BaseModel):
    """
    Records the history of publications served by each Distribution.

    Keeps superseded publications alive and serveable for a configurable grace
    period (DISTRIBUTED_PUBLICATION_RETENTION_PERIOD). A null ``expires_at``
    means the publication has not been superseded yet and is currently served.
    """

    # The distribution that served (or is serving) the publication.
    distribution = models.ForeignKey(
        Distribution, on_delete=models.CASCADE, related_name="core_distributedpublications"
    )
    publication = models.ForeignKey(
        Publication, on_delete=models.CASCADE, related_name="core_distributedpublications"
    )
    # When the superseded publication stops being served; NULL while current.
    expires_at = models.DateTimeField(null=True)

    @classmethod
    def get_non_expired(cls, include_current=True):
        """
        Return a queryset of records still within their grace period.

        Args:
            include_current: When True (default), also include records with a null
                ``expires_at``, i.e. the publication currently being served.
        """
        if include_current:
            return cls.objects.filter(
                models.Q(expires_at__isnull=True) | models.Q(expires_at__gte=timezone.now())
            )
        return cls.objects.filter(expires_at__gte=timezone.now())

    @classmethod
    def get_expired(cls):
        """Return a queryset of records whose grace period has already elapsed."""
        return cls.objects.filter(expires_at__lt=timezone.now())

    @hook(AFTER_CREATE)
    def cleanup(self):
        """
        Expire older active DistributedPublications; delete already-expired ones.

        NOTE: expired records are purged globally (across all distributions), so
        every creation doubles as an opportunistic garbage-collection pass.
        """
        DistributedPublication.get_expired().delete()
        superseded = DistributedPublication.objects.exclude(pk=self.pk).filter(
            distribution=self.distribution, expires_at__isnull=True
        )
        if not retain_distributed_pub_enabled():
            # Grace period disabled: drop superseded records immediately.
            superseded.delete()
        else:
            retention = settings.DISTRIBUTED_PUBLICATION_RETENTION_PERIOD
            superseded.update(expires_at=timezone.now() + timedelta(seconds=retention))
Loading
Loading