Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/7333.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pulp Replicas now present all of their distribution updates together at the end of the replication process, rather than updating each repository-distribution pair individually as its syncs and publishes complete. Unfortunately, for users with existing replicated setups, the very first replication after upgrading will still exhibit the old behavior; subsequent replications should work as expected.
4 changes: 4 additions & 0 deletions pulp_file/app/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def url(self, upstream_distribution):
manifest = self.publication_ctx_cls(
self.pulp_ctx, upstream_distribution["publication"]
).entity["manifest"]
elif upstream_distribution.get("repository_version"):
# Extract repository href from repository_version href
repo_href = upstream_distribution["repository_version"].rsplit("versions/", 1)[0]
manifest = self.repository_ctx_cls(self.pulp_ctx, repo_href).entity["manifest"]
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tricky: it implies that other plugins may also have compatibility issues unless they are modified.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe pulp_python may also need a tweak

else:
# This distribution doesn't serve any content
return None
Expand Down
18 changes: 10 additions & 8 deletions pulpcore/app/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ def remote_extra_fields(self, upstream_distribution):
return {}

def create_or_update_remote(self, upstream_distribution):
if not upstream_distribution.get("repository") and not upstream_distribution.get(
"publication"
if (
not upstream_distribution.get("repository")
and not upstream_distribution.get("repository_version")
and not upstream_distribution.get("publication")
):
return None
url = self.url(upstream_distribution)
Expand Down Expand Up @@ -171,10 +173,11 @@ def create_or_update_repository(self, remote):
def distribution_extra_fields(self, repository, upstream_distribution):
"""
Return the fields that need to be updated/cleared on distributions for idempotence.

Note: repository, publication, and repository_version are NOT included here.
They are all updated atomically in finalize_replication after all syncs complete.
"""
return {
"repository": get_url(repository),
"publication": None,
Comment on lines -176 to -177
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay for simplicity!

"base_path": upstream_distribution["base_path"],
}

Expand All @@ -189,7 +192,6 @@ def create_or_update_distribution(self, repository, upstream_distribution):
return None
needs_update = self.needs_update(distribution_data, distro)
if needs_update:
# Update the distribution
dispatch(
ageneral_update,
task_group=self.task_group,
Expand All @@ -202,15 +204,15 @@ def create_or_update_distribution(self, repository, upstream_distribution):
},
)
except self.distribution_model_cls.DoesNotExist:
# Dispatch a task to create the distribution
distribution_data["name"] = upstream_distribution["name"]
create_data = dict(distribution_data)
create_data["name"] = upstream_distribution["name"]
dispatch(
general_create,
task_group=self.task_group,
shared_resources=[repository, self.server],
exclusive_resources=self.distros_uris,
args=(self.app_label, self.distribution_serializer_name),
kwargs={"data": distribution_data},
kwargs={"data": create_data},
)

def sync_params(self, repository, remote):
Expand Down
30 changes: 26 additions & 4 deletions pulpcore/app/tasks/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import sys
from tempfile import NamedTemporaryFile

from django.db import transaction
from django.db.models import Min

from pulpcore.constants import TASK_STATES
from pulpcore.app.apps import pulp_plugin_configs, PulpAppConfig
from pulpcore.app.models import UpstreamPulp, Task, TaskGroup
from pulpcore.app.models import Distribution, Repository, UpstreamPulp, Task, TaskGroup
from pulpcore.app.replica import ReplicaContext
from pulpcore.tasking.tasks import dispatch

Expand Down Expand Up @@ -77,6 +78,7 @@ def replicate_distributions(server_pk):
replicator = replicator_class(ctx, task_group, tls_settings, server)
supported_replicators.append(replicator)

distro_repo_pairs = []
for replicator in supported_replicators:
distros = replicator.upstream_distributions(q=server.q_select)
distro_names = []
Expand All @@ -90,7 +92,7 @@ def replicate_distributions(server_pk):
# Check if there is already a repository
repository = replicator.create_or_update_repository(remote=remote)
if not repository:
# No update occured because server.policy==LABELED and there was
# No update occurred because server.policy==LABELED and there was
# an already existing local repository with the same name
continue

Expand All @@ -103,24 +105,44 @@ def replicate_distributions(server_pk):

# Add name to the list of known distribution names
distro_names.append(distro["name"])
distro_repo_pairs.append((distro["name"], str(repository.pk)))

replicator.remove_missing(distro_names)

dispatch(
finalize_replication,
task_group=task_group,
exclusive_resources=[server],
args=[server.pk],
args=[server.pk, distro_repo_pairs],
)


def finalize_replication(server_pk):
def finalize_replication(server_pk, distro_repo_pairs):
    """
    Finish a replication run for the UpstreamPulp identified by ``server_pk``.

    Raises ``Exception("Replication failed.")`` if any other task in the current task group
    is not in the COMPLETED state. Otherwise, inside a single transaction, every distribution
    named in ``distro_repo_pairs`` (an iterable of ``(distribution name, repository pk)``
    pairs) is switched to its repository's latest version, with ``repository`` and
    ``publication`` cleared. Finally, the earliest ``started_at`` among the group's tasks is
    stored as the server's last replication timestamp.
    """
    current_task = Task.current()
    group = TaskGroup.current()
    upstream = UpstreamPulp.objects.get(pk=server_pk)

    # Every task in the group except this one must have completed.
    unfinished = group.tasks.exclude(pk=current_task.pk).exclude(state=TASK_STATES.COMPLETED)
    if unfinished.exists():
        raise Exception("Replication failed.")

    # One transaction for all distributions, so the switch-over is all-or-nothing.
    with transaction.atomic():
        for name, repository_pk in distro_repo_pairs:
            distribution = Distribution.objects.get(name=name, pulp_domain=upstream.pulp_domain)
            newest = Repository.objects.get(pk=repository_pk).latest_version()
            if not newest:
                continue

            is_stale = (
                distribution.repository_version != newest
                or distribution.repository is not None
                or distribution.publication is not None
            )
            if not is_stale:
                # Already serving exactly this version and nothing else; skip the write.
                continue

            distribution.repository = None
            distribution.publication = None
            distribution.repository_version = newest
            distribution.save(update_fields=["repository", "publication", "repository_version"])

    # Record timestamp of last successful replication.
    earliest_start = group.tasks.aggregate(Min("started_at"))["started_at__min"]
    upstream.set_last_replication_timestamp(earliest_start)
Loading
Loading