Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/tmp.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimized sync with content caching in `QueryExistingContents` and query fix in `QueryExistingArtifacts`.
71 changes: 58 additions & 13 deletions pulpcore/plugin/stages/content_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,32 @@ class QueryExistingContents(Stage):

This stage drains all available items from `self._in_q` and batches everything into one large
call to the db for efficiency.

When `repo_version` is provided, content from that version is cached in memory on first
access (per content type) so that repeat syncs can resolve most items without per-batch
database queries.

Plugins with expensive fields that are not needed during sync can pass
`deferred_fields` - a mapping of content model class to a list of field names
to exclude from queries via Django's `defer()`.
Comment on lines +39 to +40
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is deferred the right way? Models can have a lot of fields, but only a few are ever used during a sync. Wouldn't only be the more practical option?

Copy link
Copy Markdown
Contributor

@dralley dralley Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this discussion (hidden because it was marked resolved) #7754 (comment)

TL;DR yes .only() could be theoretically more efficient, but it could also easily cause a regression without plugin intervention. You have to keep track of which fields are being used rigorously.

OTOH .defer() would not cause a regression, and since it's really only a small number of fields on a small number of models that are a problem, it is less invasive.

"""

def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._repo_version = repo_version
self._deferred_fields = deferred_fields or {}
self._content_cache = {}

def _ensure_type_cache(self, model_type):
if model_type not in self._content_cache and self._repo_version is not None:
deferred = self._deferred_fields.get(model_type, ())
cache = {}
for content in self._repo_version.get_content(
model_type.objects.defer(*deferred)
).iterator():
cache[content.natural_key()] = content
self._content_cache[model_type] = cache

async def run(self):
"""
The coroutine for this stage.
Expand All @@ -42,25 +66,46 @@ async def run(self):
async for batch in self.batches():
content_q_by_type = defaultdict(lambda: Q(pk__in=[]))
d_content_by_nat_key = defaultdict(list)
cache_hits_by_type = defaultdict(set)

for d_content in batch:
if d_content.content._state.adding:
Copy link
Copy Markdown
Contributor

@dralley dralley Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically content could be passed through already saved, in which case I think we're probably not touch ing it. That might be an existing bug, though not a particularly serious one.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand this. Why do we need to call touch when content already exists? Is it because of orphan clean up?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it basically resets the orphan cleanup protection timer.

model_type = type(d_content.content)
unit_q = d_content.content.q()
content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
d_content_by_nat_key[d_content.content.natural_key()].append(d_content)

nat_key = d_content.content.natural_key()
await sync_to_async(self._ensure_type_cache)(model_type)
cached = self._content_cache.get(model_type, {}).get(nat_key)
if cached is not None:
d_content.content = cached
cache_hits_by_type[model_type].add(cached.pk)
else:
unit_q = d_content.content.q()
content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
d_content_by_nat_key[nat_key].append(d_content)

db_results_by_type = defaultdict(list)
for model_type, content_q in content_q_by_type.items():
try:
await sync_to_async(model_type.objects.filter(content_q).touch)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)
deferred = self._deferred_fields.get(model_type, ())
async for result in sync_to_async_iterable(
model_type.objects.filter(content_q).iterator()
model_type.objects.filter(content_q).defer(*deferred).iterator()
):
db_results_by_type[model_type].append(result)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
db_results_by_type[model_type].append(result)
db_results_by_type[model_type].append(result)
for d_content in d_content_by_nat_key[result.natural_key()]:
d_content.content = result

Move this here and delete the added loops on line 107-108.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this would cause "swap before touch" which we wanted to avoid.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to verify that it's OK to swap before touching. It MIGHT be ok, but IIRC the touch stuff was particularly subtle and the order it was done in might matter.


all_types = set(cache_hits_by_type.keys()) | set(db_results_by_type.keys())
for model_type in all_types:
pks = cache_hits_by_type.get(model_type, set())
pks = pks | {r.pk for r in db_results_by_type.get(model_type, [])}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pks = pks | {r.pk for r in db_results_by_type.get(model_type, [])}
pks |= {r.pk for r in db_results_by_type.get(model_type, [])}

if pks:
try:
await sync_to_async(model_type.objects.filter(pk__in=pks).touch)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)

for model_type, results in db_results_by_type.items():
for result in results:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little worried that the additional loops here would outweigh the reduced # of queries

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rework it, but that would result in "swap before touch". Alternatively, I can revert the last commit and keep the two queries. Which option would you prefer?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdellweg I feel like it was either you or Dennis that dealt with the touch issues last, do you know if touch need to happen before the existing model swap for correctness reasons?

for d_content in d_content_by_nat_key[result.natural_key()]:
d_content.content = result

Expand Down
8 changes: 6 additions & 2 deletions pulpcore/plugin/stages/declarative_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


class DeclarativeVersion:
def __init__(self, first_stage, repository, mirror=False, acs=False):
def __init__(self, first_stage, repository, mirror=False, acs=False, deferred_fields=None):
"""
A pipeline that creates a new [pulpcore.plugin.models.RepositoryVersion][] from a
stream of [pulpcore.plugin.stages.DeclarativeContent][] objects.
Expand Down Expand Up @@ -107,12 +107,16 @@ async def run(self):
'False' is the default.
acs (bool): When set to 'True' a new stage is added to look for
Alternate Content Sources.
deferred_fields (dict): A mapping of content model class to a list of
field names to exclude from queries via Django's `defer()`.
Passed through to [pulpcore.plugin.stages.QueryExistingContents][].

"""
self.first_stage = first_stage
self.repository = repository
self.mirror = mirror
self.acs = acs
self.deferred_fields = deferred_fields

def pipeline_stages(self, new_version):
"""
Expand Down Expand Up @@ -142,7 +146,7 @@ def pipeline_stages(self, new_version):
[
ArtifactDownloader(resource_budget=resource_budget),
ArtifactSaver(resource_budget=resource_budget),
QueryExistingContents(),
QueryExistingContents(new_version, self.deferred_fields),
ContentSaver(),
RemoteArtifactSaver(),
ResolveContentFutures(),
Expand Down
Loading