From a92f9e2816c044e373654e5a0947a9e3e5b04c53 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Wed, 18 Mar 2026 14:30:03 -0400 Subject: [PATCH 1/2] Replicas should atomically update their distributions It's not ideal for distributions to pick up replica changes at random time intervals as various tasks complete; ideally, the entire replica is presented as updated at once (or with the smallest possible window). closes #7333 Assisted-By: claude-opus-4.6 --- CHANGES/7333.bugfix | 1 + pulp_file/app/replica.py | 4 ++ pulpcore/app/replica.py | 37 +++++++++++++++---- pulpcore/app/tasks/replica.py | 22 +++++++++-- .../tests/functional/api/test_replication.py | 17 ++++++--- 5 files changed, 65 insertions(+), 16 deletions(-) create mode 100644 CHANGES/7333.bugfix diff --git a/CHANGES/7333.bugfix b/CHANGES/7333.bugfix new file mode 100644 index 00000000000..0724ba0df5b --- /dev/null +++ b/CHANGES/7333.bugfix @@ -0,0 +1 @@ +Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed. 
diff --git a/pulp_file/app/replica.py b/pulp_file/app/replica.py index aae1c2de585..6943ba954eb 100644 --- a/pulp_file/app/replica.py +++ b/pulp_file/app/replica.py @@ -33,6 +33,10 @@ def url(self, upstream_distribution): manifest = self.publication_ctx_cls( self.pulp_ctx, upstream_distribution["publication"] ).entity["manifest"] + elif upstream_distribution.get("repository_version"): + # Extract repository href from repository_version href + repo_href = upstream_distribution["repository_version"].rsplit("versions/", 1)[0] + manifest = self.repository_ctx_cls(self.pulp_ctx, repo_href).entity["manifest"] else: # This distribution doesn't serve any content return None diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index 73e2fc45855..df20d7eab2a 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -106,8 +106,10 @@ def remote_extra_fields(self, upstream_distribution): return {} def create_or_update_remote(self, upstream_distribution): - if not upstream_distribution.get("repository") and not upstream_distribution.get( - "publication" + if ( + not upstream_distribution.get("repository") + and not upstream_distribution.get("repository_version") + and not upstream_distribution.get("publication") ): return None url = self.url(upstream_distribution) @@ -171,9 +173,18 @@ def create_or_update_repository(self, remote): def distribution_extra_fields(self, repository, upstream_distribution): """ Return the fields that need to be updated/cleared on distributions for idempotence. + + Note: repository_version is computed here but filtered out for updates/creates. + It will be set atomically in finalize_replication after sync completes. 
""" + latest = repository.latest_version() + if latest: + repo_version_href = get_url(repository) + "versions/{}/".format(latest.number) + else: + repo_version_href = None return { - "repository": get_url(repository), + "repository": None, + "repository_version": repo_version_href, "publication": None, "base_path": upstream_distribution["base_path"], } @@ -187,7 +198,11 @@ def create_or_update_distribution(self, repository, upstream_distribution): ) if not self._is_managed(distro): return None - needs_update = self.needs_update(distribution_data, distro) + # Don't update repository_version here — that happens atomically in + # finalize_replication after all syncs complete. + # Do clear repository and publication so they don't conflict. + update_data = {k: v for k, v in distribution_data.items() if k != "repository_version"} + needs_update = self.needs_update(update_data, distro) if needs_update: # Update the distribution dispatch( @@ -197,20 +212,28 @@ def create_or_update_distribution(self, repository, upstream_distribution): exclusive_resources=self.distros_uris, args=(distro.pk, self.app_label, self.distribution_serializer_name), kwargs={ - "data": distribution_data, + "data": update_data, "partial": True, }, ) except self.distribution_model_cls.DoesNotExist: # Dispatch a task to create the distribution - distribution_data["name"] = upstream_distribution["name"] + # Don't set repository_version for new distributions - it will be set in + # finalize_replication after sync completes. 
+ create_data = { + k: v + for k, v in distribution_data.items() + if k not in ("repository_version", "repository", "publication") + } + create_data["name"] = upstream_distribution["name"] + create_data["pulp_labels"] = distribution_data["pulp_labels"] dispatch( general_create, task_group=self.task_group, shared_resources=[repository, self.server], exclusive_resources=self.distros_uris, args=(self.app_label, self.distribution_serializer_name), - kwargs={"data": distribution_data}, + kwargs={"data": create_data}, ) def sync_params(self, repository, remote): diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 137239a49f1..13adc0ffd3f 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -3,11 +3,12 @@ import sys from tempfile import NamedTemporaryFile +from django.db import transaction from django.db.models import Min from pulpcore.constants import TASK_STATES from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig -from pulpcore.app.models import UpstreamPulp, Task, TaskGroup +from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup from pulpcore.app.replica import ReplicaContext from pulpcore.tasking.tasks import dispatch @@ -77,6 +78,7 @@ def replicate_distributions(server_pk): replicator = replicator_class(ctx, task_group, tls_settings, server) supported_replicators.append(replicator) + distro_repo_pairs = [] for replicator in supported_replicators: distros = replicator.upstream_distributions(q=server.q_select) distro_names = [] @@ -90,7 +92,7 @@ def replicate_distributions(server_pk): # Check if there is already a repository repository = replicator.create_or_update_repository(remote=remote) if not repository: - # No update occured because server.policy==LABELED and there was + # No update occurred because server.policy==LABELED and there was # an already existing local repository with the same name continue @@ -103,6 +105,7 @@ def replicate_distributions(server_pk): 
# Add name to the list of known distribution names distro_names.append(distro["name"]) + distro_repo_pairs.append((distro["name"], str(repository.pk))) replicator.remove_missing(distro_names) @@ -110,17 +113,28 @@ def replicate_distributions(server_pk): finalize_replication, task_group=task_group, exclusive_resources=[server], - args=[server.pk], + args=[server.pk, distro_repo_pairs], ) -def finalize_replication(server_pk): +def finalize_replication(server_pk, distro_repo_pairs): task = Task.current() task_group = TaskGroup.current() server = UpstreamPulp.objects.get(pk=server_pk) if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists(): raise Exception("Replication failed.") + # Atomically update all managed distributions to point to their repo's latest version. + with transaction.atomic(): + for distro_name, repo_pk in distro_repo_pairs: + distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain) + repo = Repository.objects.get(pk=repo_pk) + latest_version = repo.latest_version() + if latest_version: + if distro.repository_version != latest_version: + distro.repository_version = latest_version + distro.save(update_fields=["repository_version"]) + # Record timestamp of last successful replication. 
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"] server.set_last_replication_timestamp(started_at) diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index c2694738852..385a6afcb4a 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -117,6 +117,13 @@ def test_replication_idempotence( assert "UpstreamPulp" in obj.pulp_labels assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"] + # Verify the replica distribution uses repository_version (not repository) + replica_distro = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results[0] + assert replica_distro.repository is None + assert replica_distro.repository_version is not None + # Now replicate backwards upstream_pulp_body = { @@ -147,7 +154,9 @@ def test_replication_idempotence( assert result.count == 1 new_distribution = result.results[0] assert new_distribution.pulp_href == distro.pulp_href - assert new_distribution.repository == new_repository.pulp_href + assert new_distribution.repository is None + assert new_distribution.repository_version is not None + assert new_distribution.repository_version.startswith(new_repository.pulp_href) assert new_distribution.publication is None assert "UpstreamPulp" in new_distribution.pulp_labels assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"] @@ -373,11 +382,9 @@ def _check_replication( assert upstream_pulp.last_replication > old_replication # check if the content was correctly replicated - local_version = file_bindings.RepositoriesFileApi.read( - distribution.repository - ).latest_version_href + assert distribution.repository_version is not None local_present = file_bindings.RepositoriesFileVersionsApi.read( - local_version + distribution.repository_version ).content_summary.present upstream_version = 
file_bindings.PublicationsFileApi.read( upstream_distribution.publication From 9e2158e363d18f316def6bbc5463dc590daa721d Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Tue, 31 Mar 2026 14:11:51 -0400 Subject: [PATCH 2/2] Add additional tests for replication Generated-By: claude-opus-4.6 --- CHANGES/7333.bugfix | 2 +- pulpcore/app/replica.py | 31 +-- pulpcore/app/tasks/replica.py | 14 +- .../tests/functional/api/test_replication.py | 194 +++++++++++++++++- 4 files changed, 204 insertions(+), 37 deletions(-) diff --git a/CHANGES/7333.bugfix b/CHANGES/7333.bugfix index 0724ba0df5b..9a514c8aae4 100644 --- a/CHANGES/7333.bugfix +++ b/CHANGES/7333.bugfix @@ -1 +1 @@ -Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed. +Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed. Unfortunately for users with existing replicated setups the very first replication post-upgrade will still have the old behavior - but afterwards it should work as expected. diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index df20d7eab2a..677515574b9 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -174,18 +174,10 @@ def distribution_extra_fields(self, repository, upstream_distribution): """ Return the fields that need to be updated/cleared on distributions for idempotence. - Note: repository_version is computed here but filtered out for updates/creates. - It will be set atomically in finalize_replication after sync completes. + Note: repository, publication, and repository_version are NOT included here. + They are all updated atomically in finalize_replication after all syncs complete. 
""" - latest = repository.latest_version() - if latest: - repo_version_href = get_url(repository) + "versions/{}/".format(latest.number) - else: - repo_version_href = None return { - "repository": None, - "repository_version": repo_version_href, - "publication": None, "base_path": upstream_distribution["base_path"], } @@ -198,13 +190,8 @@ def create_or_update_distribution(self, repository, upstream_distribution): ) if not self._is_managed(distro): return None - # Don't update repository_version here — that happens atomically in - # finalize_replication after all syncs complete. - # Do clear repository and publication so they don't conflict. - update_data = {k: v for k, v in distribution_data.items() if k != "repository_version"} - needs_update = self.needs_update(update_data, distro) + needs_update = self.needs_update(distribution_data, distro) if needs_update: - # Update the distribution dispatch( ageneral_update, task_group=self.task_group, @@ -212,21 +199,13 @@ def create_or_update_distribution(self, repository, upstream_distribution): exclusive_resources=self.distros_uris, args=(distro.pk, self.app_label, self.distribution_serializer_name), kwargs={ - "data": update_data, + "data": distribution_data, "partial": True, }, ) except self.distribution_model_cls.DoesNotExist: - # Dispatch a task to create the distribution - # Don't set repository_version for new distributions - it will be set in - # finalize_replication after sync completes. 
- create_data = { - k: v - for k, v in distribution_data.items() - if k not in ("repository_version", "repository", "publication") - } + create_data = dict(distribution_data) create_data["name"] = upstream_distribution["name"] - create_data["pulp_labels"] = distribution_data["pulp_labels"] dispatch( general_create, task_group=self.task_group, diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 13adc0ffd3f..e873c844ccb 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -124,16 +124,24 @@ def finalize_replication(server_pk, distro_repo_pairs): if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists(): raise Exception("Replication failed.") - # Atomically update all managed distributions to point to their repo's latest version. + # Atomically update all managed distributions to point to their repo's latest version, + # clearing any previous repository or publication references. with transaction.atomic(): for distro_name, repo_pk in distro_repo_pairs: distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain) repo = Repository.objects.get(pk=repo_pk) latest_version = repo.latest_version() if latest_version: - if distro.repository_version != latest_version: + needs_update = ( + distro.repository_version != latest_version + or distro.repository is not None + or distro.publication is not None + ) + if needs_update: + distro.repository = None + distro.publication = None distro.repository_version = latest_version - distro.save(update_fields=["repository_version"]) + distro.save(update_fields=["repository", "publication", "repository_version"]) # Record timestamp of last successful replication. 
started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"] diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index 385a6afcb4a..20cd235431c 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -13,8 +13,8 @@ def test_replication( pulp_settings, gen_object_with_cleanup, ): - # This test assures that an Upstream Pulp can be created in a non-default domain and that this - # Upstream Pulp configuration can be used to execute the replicate task. + """This test assures that an Upstream Pulp can be created in a non-default domain and that this + Upstream Pulp configuration can be used to execute the replicate task.""" # Create a non-default domain non_default_domain = domain_factory() @@ -57,8 +57,8 @@ def test_replication_idempotence( tmp_path, add_domain_objects_to_cleanup, ): - # This test assures that an Upstream Pulp can be created in a non-default domain and that this - # Upstream Pulp configuration can be used to execute the replicate task. 
+ """This test assures that an Upstream Pulp can be created in a non-default domain and that this + Upstream Pulp configuration can be used to execute the replicate task.""" # Create a domain to replicate from source_domain = domain_factory() @@ -168,6 +168,186 @@ def test_replication_idempotence( assert upstream_pulp2.prn.split(":")[-1] == new_remote.pulp_labels["UpstreamPulp"] +@pytest.mark.parallel +def test_replication_with_repo_based_distribution( + domain_factory, + bindings_cfg, + pulpcore_bindings, + file_bindings, + monitor_task, + monitor_task_group, + pulp_settings, + gen_object_with_cleanup, + file_distribution_factory, + file_repository_factory, + file_remote_factory, + basic_manifest_path, + add_domain_objects_to_cleanup, +): + """Test replication when upstream distribution uses repository (not publication).""" + source_domain = domain_factory() + add_domain_objects_to_cleanup(source_domain) + + # Create a repo with autopublish, sync it, and distribute via repository (not publication) + remote = file_remote_factory( + pulp_domain=source_domain.name, manifest_path=basic_manifest_path, policy="immediate" + ) + repo = file_repository_factory(pulp_domain=source_domain.name, autopublish=True) + sync_data = file_bindings.module.RepositorySyncURL(remote=remote.pulp_href, mirror=True) + monitor_task(file_bindings.RepositoriesFileApi.sync(repo.pulp_href, sync_data).task) + _ = file_distribution_factory(pulp_domain=source_domain.name, repository=repo.pulp_href) + + # Replicate + replica_domain = domain_factory() + add_domain_objects_to_cleanup(replica_domain) + upstream_pulp = gen_object_with_cleanup( + pulpcore_bindings.UpstreamPulpsApi, + { + "name": str(uuid.uuid4()), + "base_url": bindings_cfg.host, + "api_root": pulp_settings.API_ROOT, + "domain": source_domain.name, + "username": bindings_cfg.username, + "password": bindings_cfg.password, + }, + pulp_domain=replica_domain.name, + ) + response = 
pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + monitor_task_group(response.task_group) + + # Verify replica distribution uses repository_version, not repository or publication + replica_distro = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results[0] + assert replica_distro.repository is None + assert replica_distro.repository_version is not None + assert replica_distro.publication is None + + # Verify content was replicated + source_repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + source_present = file_bindings.RepositoriesFileVersionsApi.read( + source_repo.latest_version_href + ).content_summary.present + replica_present = file_bindings.RepositoriesFileVersionsApi.read( + replica_distro.repository_version + ).content_summary.present + assert source_present["file.file"]["count"] == replica_present["file.file"]["count"] + + +@pytest.mark.parallel +def test_replication_multi_distribution_content_update( + domain_factory, + bindings_cfg, + pulpcore_bindings, + file_bindings, + monitor_task, + monitor_task_group, + pulp_settings, + gen_object_with_cleanup, + file_distribution_factory, + file_repository_factory, + file_publication_factory, + add_domain_objects_to_cleanup, + tmp_path, +): + """Test that all distributions are updated after re-replication with new content.""" + source_domain = domain_factory() + add_domain_objects_to_cleanup(source_domain) + + # Create 3 repos with content and publication-based distributions + distros = [] + repos = [] + for i in range(3): + repo = file_repository_factory(pulp_domain=source_domain.name) + repos.append(repo) + file_path = tmp_path / f"file_{i}.txt" + file_path.write_text(f"content_{i}") + monitor_task( + file_bindings.ContentFilesApi.create( + file=str(file_path), + relative_path=f"file_{i}.txt", + repository=repo.pulp_href, + pulp_domain=source_domain.name, + ).task + ) + pub = file_publication_factory(pulp_domain=source_domain.name, 
repository=repo.pulp_href) + distros.append( + file_distribution_factory(pulp_domain=source_domain.name, publication=pub.pulp_href) + ) + + # Initial replication + replica_domain = domain_factory() + add_domain_objects_to_cleanup(replica_domain) + upstream_pulp = gen_object_with_cleanup( + pulpcore_bindings.UpstreamPulpsApi, + { + "name": str(uuid.uuid4()), + "base_url": bindings_cfg.host, + "api_root": pulp_settings.API_ROOT, + "domain": source_domain.name, + "username": bindings_cfg.username, + "password": bindings_cfg.password, + }, + pulp_domain=replica_domain.name, + ) + response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + monitor_task_group(response.task_group) + + # Record initial versions + replica_distros = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results + assert len(replica_distros) == 3 + initial_versions = {} + for rd in replica_distros: + assert rd.repository is None + assert rd.repository_version is not None + assert rd.publication is None + initial_versions[rd.name] = rd.repository_version + + # Add new content to all source repos and update publications + for i, repo in enumerate(repos): + file_path = tmp_path / f"file_{i}_v2.txt" + file_path.write_text(f"new_content_{i}") + monitor_task( + file_bindings.ContentFilesApi.create( + file=str(file_path), + relative_path=f"file_{i}_v2.txt", + repository=repo.pulp_href, + pulp_domain=source_domain.name, + ).task + ) + repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + pub = file_publication_factory( + pulp_domain=source_domain.name, + repository_version=repo.latest_version_href, + ) + monitor_task( + file_bindings.DistributionsFileApi.partial_update( + distros[i].pulp_href, {"publication": pub.pulp_href} + ).task + ) + + # Re-replicate + response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + monitor_task_group(response.task_group) + + # Verify all distributions were updated to new versions with 
new content + replica_distros = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results + assert len(replica_distros) == 3 + for rd in replica_distros: + assert rd.repository is None + assert rd.repository_version is not None + assert rd.publication is None + # Version should have changed + assert rd.repository_version != initial_versions[rd.name] + # Verify new content is present (original file + new file = 2) + version = file_bindings.RepositoriesFileVersionsApi.read(rd.repository_version) + assert version.content_summary.present["file.file"]["count"] == 2 + + @pytest.mark.parallel def test_replication_with_wrong_ca_cert( domain_factory, @@ -177,9 +357,9 @@ def test_replication_with_wrong_ca_cert( pulp_settings, gen_object_with_cleanup, ): - # This test assures that setting ca_cert on an Upstream Pulp causes that CA bundle to be used - # to verify the certificate presented by the Upstream Pulp's REST API. The replication tasks - # are expected to fail. + """This test assures that setting ca_cert on an Upstream Pulp causes that CA bundle to be used + to verify the certificate presented by the Upstream Pulp's REST API. The replication tasks + are expected to fail.""" if not bindings_cfg.host.startswith("https"): pytest.skip("HTTPS is not enabled for Pulp's API.")