diff --git a/CHANGES/7333.bugfix b/CHANGES/7333.bugfix new file mode 100644 index 0000000000..9a514c8aae --- /dev/null +++ b/CHANGES/7333.bugfix @@ -0,0 +1 @@ +Pulp Replicas now present all of their distribution updates at the end of the replication process, rather than each individual repository-distribution pair being updated individually as syncs and publishes are completed. Unfortunately for users with existing replicated setups the very first replication post-upgrade will still have the old behavior - but afterwards it should work as expected. diff --git a/pulp_file/app/replica.py b/pulp_file/app/replica.py index aae1c2de58..6943ba954e 100644 --- a/pulp_file/app/replica.py +++ b/pulp_file/app/replica.py @@ -33,6 +33,10 @@ def url(self, upstream_distribution): manifest = self.publication_ctx_cls( self.pulp_ctx, upstream_distribution["publication"] ).entity["manifest"] + elif upstream_distribution.get("repository_version"): + # Extract repository href from repository_version href + repo_href = upstream_distribution["repository_version"].rsplit("versions/", 1)[0] + manifest = self.repository_ctx_cls(self.pulp_ctx, repo_href).entity["manifest"] else: # This distribution doesn't serve any content return None diff --git a/pulpcore/app/replica.py b/pulpcore/app/replica.py index 73e2fc4585..677515574b 100644 --- a/pulpcore/app/replica.py +++ b/pulpcore/app/replica.py @@ -106,8 +106,10 @@ def remote_extra_fields(self, upstream_distribution): return {} def create_or_update_remote(self, upstream_distribution): - if not upstream_distribution.get("repository") and not upstream_distribution.get( - "publication" + if ( + not upstream_distribution.get("repository") + and not upstream_distribution.get("repository_version") + and not upstream_distribution.get("publication") ): return None url = self.url(upstream_distribution) @@ -171,10 +173,11 @@ def create_or_update_repository(self, remote): def distribution_extra_fields(self, repository, upstream_distribution): """ Return the fields that need to be updated/cleared on distributions for idempotence. + + Note: repository, publication, and repository_version are NOT included here. + They are all updated atomically in finalize_replication after all syncs complete. """ return { - "repository": get_url(repository), - "publication": None, "base_path": upstream_distribution["base_path"], } @@ -189,7 +192,6 @@ def create_or_update_distribution(self, repository, upstream_distribution): return None needs_update = self.needs_update(distribution_data, distro) if needs_update: - # Update the distribution dispatch( ageneral_update, task_group=self.task_group, @@ -202,15 +204,15 @@ def create_or_update_distribution(self, repository, upstream_distribution): }, ) except self.distribution_model_cls.DoesNotExist: - # Dispatch a task to create the distribution - distribution_data["name"] = upstream_distribution["name"] + create_data = dict(distribution_data) + create_data["name"] = upstream_distribution["name"] dispatch( general_create, task_group=self.task_group, shared_resources=[repository, self.server], exclusive_resources=self.distros_uris, args=(self.app_label, self.distribution_serializer_name), - kwargs={"data": distribution_data}, + kwargs={"data": create_data}, ) def sync_params(self, repository, remote): diff --git a/pulpcore/app/tasks/replica.py b/pulpcore/app/tasks/replica.py index 137239a49f..e873c844cc 100644 --- a/pulpcore/app/tasks/replica.py +++ b/pulpcore/app/tasks/replica.py @@ -3,11 +3,12 @@ import sys from tempfile import NamedTemporaryFile +from django.db import transaction from django.db.models import Min from pulpcore.constants import TASK_STATES from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig -from pulpcore.app.models import UpstreamPulp, Task, TaskGroup +from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup from pulpcore.app.replica import ReplicaContext from pulpcore.tasking.tasks import dispatch @@ -77,6 +78,7 @@ def replicate_distributions(server_pk): replicator = replicator_class(ctx, task_group, tls_settings, server) supported_replicators.append(replicator) + distro_repo_pairs = [] for replicator in supported_replicators: distros = replicator.upstream_distributions(q=server.q_select) distro_names = [] @@ -90,7 +92,7 @@ def replicate_distributions(server_pk): # Check if there is already a repository repository = replicator.create_or_update_repository(remote=remote) if not repository: - # No update occured because server.policy==LABELED and there was + # No update occurred because server.policy==LABELED and there was # an already existing local repository with the same name continue @@ -103,6 +105,7 @@ def replicate_distributions(server_pk): # Add name to the list of known distribution names distro_names.append(distro["name"]) + distro_repo_pairs.append((distro["name"], str(repository.pk))) replicator.remove_missing(distro_names) @@ -110,17 +113,36 @@ def replicate_distributions(server_pk): finalize_replication, task_group=task_group, exclusive_resources=[server], - args=[server.pk], + args=[server.pk, distro_repo_pairs], ) -def finalize_replication(server_pk): +def finalize_replication(server_pk, distro_repo_pairs): task = Task.current() task_group = TaskGroup.current() server = UpstreamPulp.objects.get(pk=server_pk) if task_group.tasks.exclude(pk=task.pk).exclude(state=TASK_STATES.COMPLETED).exists(): raise Exception("Replication failed.") + # Atomically update all managed distributions to point to their repo's latest version, + # clearing any previous repository or publication references. + with transaction.atomic(): + for distro_name, repo_pk in distro_repo_pairs: + distro = Distribution.objects.get(name=distro_name, pulp_domain=server.pulp_domain) + repo = Repository.objects.get(pk=repo_pk) + latest_version = repo.latest_version() + if latest_version: + needs_update = ( + distro.repository_version != latest_version + or distro.repository is not None + or distro.publication is not None + ) + if needs_update: + distro.repository = None + distro.publication = None + distro.repository_version = latest_version + distro.save(update_fields=["repository", "publication", "repository_version"]) + # Record timestamp of last successful replication. started_at = task_group.tasks.aggregate(Min("started_at"))["started_at__min"] server.set_last_replication_timestamp(started_at) diff --git a/pulpcore/tests/functional/api/test_replication.py b/pulpcore/tests/functional/api/test_replication.py index c269473885..20cd235431 100644 --- a/pulpcore/tests/functional/api/test_replication.py +++ b/pulpcore/tests/functional/api/test_replication.py @@ -13,8 +13,8 @@ def test_replication( pulp_settings, gen_object_with_cleanup, ): - # This test assures that an Upstream Pulp can be created in a non-default domain and that this - # Upstream Pulp configuration can be used to execute the replicate task. + """This test assures that an Upstream Pulp can be created in a non-default domain and that this + Upstream Pulp configuration can be used to execute the replicate task.""" # Create a non-default domain non_default_domain = domain_factory() @@ -57,8 +57,8 @@ def test_replication_idempotence( tmp_path, add_domain_objects_to_cleanup, ): - # This test assures that an Upstream Pulp can be created in a non-default domain and that this - # Upstream Pulp configuration can be used to execute the replicate task. + """This test assures that an Upstream Pulp can be created in a non-default domain and that this + Upstream Pulp configuration can be used to execute the replicate task.""" # Create a domain to replicate from source_domain = domain_factory() @@ -117,6 +117,13 @@ def test_replication_idempotence( assert "UpstreamPulp" in obj.pulp_labels assert upstream_pulp.prn.split(":")[-1] == obj.pulp_labels["UpstreamPulp"] + # Verify the replica distribution uses repository_version (not repository) + replica_distro = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results[0] + assert replica_distro.repository is None + assert replica_distro.repository_version is not None + # Now replicate backwards upstream_pulp_body = { @@ -147,7 +154,9 @@ def test_replication_idempotence( assert result.count == 1 new_distribution = result.results[0] assert new_distribution.pulp_href == distro.pulp_href - assert new_distribution.repository == new_repository.pulp_href + assert new_distribution.repository is None + assert new_distribution.repository_version is not None + assert new_distribution.repository_version.startswith(new_repository.pulp_href) assert new_distribution.publication is None assert "UpstreamPulp" in new_distribution.pulp_labels assert upstream_pulp2.prn.split(":")[-1] == new_distribution.pulp_labels["UpstreamPulp"] @@ -159,6 +168,186 @@ def test_replication_idempotence( assert upstream_pulp2.prn.split(":")[-1] == new_remote.pulp_labels["UpstreamPulp"] +@pytest.mark.parallel +def test_replication_with_repo_based_distribution( + domain_factory, + bindings_cfg, + pulpcore_bindings, + file_bindings, + monitor_task, + monitor_task_group, + pulp_settings, + gen_object_with_cleanup, + file_distribution_factory, + file_repository_factory, + file_remote_factory, + basic_manifest_path, + add_domain_objects_to_cleanup, +): + """Test replication when upstream distribution uses repository (not publication).""" + source_domain = domain_factory() + add_domain_objects_to_cleanup(source_domain) + + # Create a repo with autopublish, sync it, and distribute via repository (not publication) + remote = file_remote_factory( + pulp_domain=source_domain.name, manifest_path=basic_manifest_path, policy="immediate" + ) + repo = file_repository_factory(pulp_domain=source_domain.name, autopublish=True) + sync_data = file_bindings.module.RepositorySyncURL(remote=remote.pulp_href, mirror=True) + monitor_task(file_bindings.RepositoriesFileApi.sync(repo.pulp_href, sync_data).task) + _ = file_distribution_factory(pulp_domain=source_domain.name, repository=repo.pulp_href) + + # Replicate + replica_domain = domain_factory() + add_domain_objects_to_cleanup(replica_domain) + upstream_pulp = gen_object_with_cleanup( + pulpcore_bindings.UpstreamPulpsApi, + { + "name": str(uuid.uuid4()), + "base_url": bindings_cfg.host, + "api_root": pulp_settings.API_ROOT, + "domain": source_domain.name, + "username": bindings_cfg.username, + "password": bindings_cfg.password, + }, + pulp_domain=replica_domain.name, + ) + response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + monitor_task_group(response.task_group) + + # Verify replica distribution uses repository_version, not repository or publication + replica_distro = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results[0] + assert replica_distro.repository is None + assert replica_distro.repository_version is not None + assert replica_distro.publication is None + + # Verify content was replicated + source_repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + source_present = file_bindings.RepositoriesFileVersionsApi.read( + source_repo.latest_version_href + ).content_summary.present + replica_present = file_bindings.RepositoriesFileVersionsApi.read( + replica_distro.repository_version + ).content_summary.present + assert source_present["file.file"]["count"] == replica_present["file.file"]["count"] + + +@pytest.mark.parallel +def test_replication_multi_distribution_content_update( + domain_factory, + bindings_cfg, + pulpcore_bindings, + file_bindings, + monitor_task, + monitor_task_group, + pulp_settings, + gen_object_with_cleanup, + file_distribution_factory, + file_repository_factory, + file_publication_factory, + add_domain_objects_to_cleanup, + tmp_path, +): + """Test that all distributions are updated after re-replication with new content.""" + source_domain = domain_factory() + add_domain_objects_to_cleanup(source_domain) + + # Create 3 repos with content and publication-based distributions + distros = [] + repos = [] + for i in range(3): + repo = file_repository_factory(pulp_domain=source_domain.name) + repos.append(repo) + file_path = tmp_path / f"file_{i}.txt" + file_path.write_text(f"content_{i}") + monitor_task( + file_bindings.ContentFilesApi.create( + file=str(file_path), + relative_path=f"file_{i}.txt", + repository=repo.pulp_href, + pulp_domain=source_domain.name, + ).task + ) + pub = file_publication_factory(pulp_domain=source_domain.name, repository=repo.pulp_href) + distros.append( + file_distribution_factory(pulp_domain=source_domain.name, publication=pub.pulp_href) + ) + + # Initial replication + replica_domain = domain_factory() + add_domain_objects_to_cleanup(replica_domain) + upstream_pulp = gen_object_with_cleanup( + pulpcore_bindings.UpstreamPulpsApi, + { + "name": str(uuid.uuid4()), + "base_url": bindings_cfg.host, + "api_root": pulp_settings.API_ROOT, + "domain": source_domain.name, + "username": bindings_cfg.username, + "password": bindings_cfg.password, + }, + pulp_domain=replica_domain.name, + ) + response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + monitor_task_group(response.task_group) + + # Record initial versions + replica_distros = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results + assert len(replica_distros) == 3 + initial_versions = {} + for rd in replica_distros: + assert rd.repository is None + assert rd.repository_version is not None + assert rd.publication is None + initial_versions[rd.name] = rd.repository_version + + # Add new content to all source repos and update publications + for i, repo in enumerate(repos): + file_path = tmp_path / f"file_{i}_v2.txt" + file_path.write_text(f"new_content_{i}") + monitor_task( + file_bindings.ContentFilesApi.create( + file=str(file_path), + relative_path=f"file_{i}_v2.txt", + repository=repo.pulp_href, + pulp_domain=source_domain.name, + ).task + ) + repo = file_bindings.RepositoriesFileApi.read(repo.pulp_href) + pub = file_publication_factory( + pulp_domain=source_domain.name, + repository_version=repo.latest_version_href, + ) + monitor_task( + file_bindings.DistributionsFileApi.partial_update( + distros[i].pulp_href, {"publication": pub.pulp_href} + ).task + ) + + # Re-replicate + response = pulpcore_bindings.UpstreamPulpsApi.replicate(upstream_pulp.pulp_href) + monitor_task_group(response.task_group) + + # Verify all distributions were updated to new versions with new content + replica_distros = file_bindings.DistributionsFileApi.list( + pulp_domain=replica_domain.name + ).results + assert len(replica_distros) == 3 + for rd in replica_distros: + assert rd.repository is None + assert rd.repository_version is not None + assert rd.publication is None + # Version should have changed + assert rd.repository_version != initial_versions[rd.name] + # Verify new content is present (original file + new file = 2) + version = file_bindings.RepositoriesFileVersionsApi.read(rd.repository_version) + assert version.content_summary.present["file.file"]["count"] == 2 + + @pytest.mark.parallel def test_replication_with_wrong_ca_cert( domain_factory, @@ -168,9 +357,9 @@ def test_replication_with_wrong_ca_cert( pulp_settings, gen_object_with_cleanup, ): - # This test assures that setting ca_cert on an Upstream Pulp causes that CA bundle to be used - # to verify the certificate presented by the Upstream Pulp's REST API. The replication tasks - # are expected to fail. + """This test assures that setting ca_cert on an Upstream Pulp causes that CA bundle to be used + to verify the certificate presented by the Upstream Pulp's REST API. The replication tasks + are expected to fail.""" if not bindings_cfg.host.startswith("https"): pytest.skip("HTTPS is not enabled for Pulp's API.") @@ -373,11 +562,9 @@ def _check_replication( assert upstream_pulp.last_replication > old_replication # check if the content was correctly replicated - local_version = file_bindings.RepositoriesFileApi.read( - distribution.repository - ).latest_version_href + assert distribution.repository_version is not None local_present = file_bindings.RepositoriesFileVersionsApi.read( - local_version + distribution.repository_version ).content_summary.present upstream_version = file_bindings.PublicationsFileApi.read( upstream_distribution.publication