From 1f5e3b39349cc73ff8b824cae251bb4aafb6d7ed Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Fri, 29 May 2026 18:38:09 +0200 Subject: [PATCH 1/4] Optimize re-sync content Assisted By: Claude Opus 4.6 --- pulpcore/plugin/stages/content_stages.py | 49 +++++++++++++++++-- pulpcore/plugin/stages/declarative_version.py | 2 +- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index 78962e2bc1f..1a436a8dd01 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -32,6 +32,27 @@ class QueryExistingContents(Stage): call to the db for efficiency. """ + def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._repo_version = repo_version + self._deferred_fields = deferred_fields or {} + self._content_cache = {} + + def _fields_for_type(self, model_type): + base = ("pk",) + model_type._sanitized_natural_key_fields() + extra = self._deferred_fields.get(model_type, ()) + return base + tuple(f for f in extra if f not in base) + + def _ensure_type_cache(self, model_type): + if model_type not in self._content_cache and self._repo_version is not None: + fields = self._fields_for_type(model_type) + cache = {} + for content in self._repo_version.get_content( + model_type.objects.only(*fields) + ).iterator(): + cache[content.natural_key()] = content + self._content_cache[model_type] = cache + async def run(self): """ The coroutine for this stage. @@ -42,12 +63,31 @@ async def run(self): async for batch in self.batches(): content_q_by_type = defaultdict(lambda: Q(pk__in=[])) d_content_by_nat_key = defaultdict(list) + cache_hits_by_type = defaultdict(lambda: Q(pk__in=[])) + for d_content in batch: if d_content.content._state.adding: model_type = type(d_content.content) - unit_q = d_content.content.q() - content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q - d_content_by_nat_key[d_content.content.natural_key()].append(d_content) + nat_key = d_content.content.natural_key() + await sync_to_async(self._ensure_type_cache)(model_type) + cached = self._content_cache.get(model_type, {}).get(nat_key) + if cached is not None: + d_content.content = cached + cache_hits_by_type[model_type] |= Q(pk=cached.pk) + else: + unit_q = d_content.content.q() + content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q + d_content_by_nat_key[nat_key].append(d_content) + + for model_type, hit_q in cache_hits_by_type.items(): + try: + await sync_to_async(model_type.objects.filter(hit_q).touch)() + except AttributeError: + raise TypeError( + "Plugins which declare custom ORM managers on their content classes " + "should have those managers inherit from " + "pulpcore.plugin.models.ContentManager." + ) for model_type, content_q in content_q_by_type.items(): try: @@ -58,8 +98,9 @@ async def run(self): "should have those managers inherit from " "pulpcore.plugin.models.ContentManager." ) + fields = self._fields_for_type(model_type) async for result in sync_to_async_iterable( - model_type.objects.filter(content_q).iterator() + model_type.objects.filter(content_q).only(*fields).iterator() ): for d_content in d_content_by_nat_key[result.natural_key()]: d_content.content = result diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 22cd54173ca..0e91071ca47 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -142,7 +142,7 @@ def pipeline_stages(self, new_version): [ ArtifactDownloader(resource_budget=resource_budget), ArtifactSaver(resource_budget=resource_budget), - QueryExistingContents(), + QueryExistingContents(new_version), ContentSaver(), RemoteArtifactSaver(), ResolveContentFutures(), From 57a04c8a3d713da1c12e6a68e479bcee96397cbb Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Mon, 1 Jun 2026 13:20:41 +0200 Subject: [PATCH 2/4] wip - fix review findings --- pulpcore/plugin/stages/content_stages.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index 1a436a8dd01..bab7d28b5c8 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -30,17 +30,26 @@ class QueryExistingContents(Stage): This stage drains all available items from `self._in_q` and batches everything into one large call to the db for efficiency. + + When `repo_version` is provided, content from that version is cached in memory on first + access (per content type) so that repeat syncs can resolve most items without per-batch + database queries. + + By default, the cache and DB queries load only the natural key fields (plus `pk`). + Plugins that need additional fields in later pipeline stages can pass + `extra_fields` - a mapping of content model class to a list of extra field names + to include. """ - def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs): + def __init__(self, repo_version=None, extra_fields=None, *args, **kwargs): super().__init__(*args, **kwargs) self._repo_version = repo_version - self._deferred_fields = deferred_fields or {} + self._extra_fields = extra_fields or {} self._content_cache = {} def _fields_for_type(self, model_type): base = ("pk",) + model_type._sanitized_natural_key_fields() - extra = self._deferred_fields.get(model_type, ()) + extra = self._extra_fields.get(model_type, ()) return base + tuple(f for f in extra if f not in base) def _ensure_type_cache(self, model_type): @@ -63,7 +72,7 @@ async def run(self): async for batch in self.batches(): content_q_by_type = defaultdict(lambda: Q(pk__in=[])) d_content_by_nat_key = defaultdict(list) - cache_hits_by_type = defaultdict(lambda: Q(pk__in=[])) + cache_hits_by_type = defaultdict(set) for d_content in batch: if d_content.content._state.adding: @@ -73,15 +82,15 @@ async def run(self): cached = self._content_cache.get(model_type, {}).get(nat_key) if cached is not None: d_content.content = cached - cache_hits_by_type[model_type] |= Q(pk=cached.pk) + cache_hits_by_type[model_type].add(cached.pk) else: unit_q = d_content.content.q() content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q d_content_by_nat_key[nat_key].append(d_content) - for model_type, hit_q in cache_hits_by_type.items(): + for model_type, hit_pks in cache_hits_by_type.items(): try: - await sync_to_async(model_type.objects.filter(hit_q).touch)() + await sync_to_async(model_type.objects.filter(pk__in=hit_pks).touch)() except AttributeError: raise TypeError( "Plugins which declare custom ORM managers on their content classes " From 22fa096ea0b4c7d9c7c134b324be92dbda749274 Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Wed, 3 Jun 2026 15:03:14 +0200 Subject: [PATCH 3/4] wip - reduce redundant queries --- CHANGES/tmp.bugfix | 1 + pulpcore/plugin/stages/content_stages.py | 37 ++++++++++++------------ 2 files changed, 20 insertions(+), 18 deletions(-) create mode 100644 CHANGES/tmp.bugfix diff --git a/CHANGES/tmp.bugfix b/CHANGES/tmp.bugfix new file mode 100644 index 00000000000..0aa60de66ce --- /dev/null +++ b/CHANGES/tmp.bugfix @@ -0,0 +1 @@ +Optimized sync with content caching in `QueryExistingContents` and query fix in `QueryExistingArtifacts`. diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index bab7d28b5c8..d30df481f69 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -88,29 +88,30 @@ async def run(self): content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q d_content_by_nat_key[nat_key].append(d_content) - for model_type, hit_pks in cache_hits_by_type.items(): - try: - await sync_to_async(model_type.objects.filter(pk__in=hit_pks).touch)() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) - + db_results_by_type = defaultdict(list) for model_type, content_q in content_q_by_type.items(): - try: - await sync_to_async(model_type.objects.filter(content_q).touch)() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) fields = self._fields_for_type(model_type) async for result in sync_to_async_iterable( model_type.objects.filter(content_q).only(*fields).iterator() ): + db_results_by_type[model_type].append(result) + + all_types = set(cache_hits_by_type.keys()) | set(db_results_by_type.keys()) + for model_type in all_types: + pks = cache_hits_by_type.get(model_type, set()) + pks = pks | {r.pk for r in db_results_by_type.get(model_type, [])} + if pks: + try: + await sync_to_async(model_type.objects.filter(pk__in=pks).touch)() + except AttributeError: + raise TypeError( + "Plugins which declare custom ORM managers on their content classes " + "should have those managers inherit from " + "pulpcore.plugin.models.ContentManager." + ) + + for model_type, results in db_results_by_type.items(): + for result in results: for d_content in d_content_by_nat_key[result.natural_key()]: d_content.content = result From 37ce3a4809e8a5aefd4c688100e4f24bd8c49af2 Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Wed, 3 Jun 2026 15:06:46 +0200 Subject: [PATCH 4/4] Rework and expose deferred_fields --- pulpcore/plugin/stages/content_stages.py | 24 +++++++------------ pulpcore/plugin/stages/declarative_version.py | 8 +++++-- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index d30df481f69..ec1f675c39e 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -35,29 +35,23 @@ class QueryExistingContents(Stage): access (per content type) so that repeat syncs can resolve most items without per-batch database queries. - By default, the cache and DB queries load only the natural key fields (plus `pk`). - Plugins that need additional fields in later pipeline stages can pass - `extra_fields` - a mapping of content model class to a list of extra field names - to include. + Plugins with expensive fields that are not needed during sync can pass + `deferred_fields` - a mapping of content model class to a list of field names + to exclude from queries via Django's `defer()`. """ - def __init__(self, repo_version=None, extra_fields=None, *args, **kwargs): + def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs): super().__init__(*args, **kwargs) self._repo_version = repo_version - self._extra_fields = extra_fields or {} + self._deferred_fields = deferred_fields or {} self._content_cache = {} - def _fields_for_type(self, model_type): - base = ("pk",) + model_type._sanitized_natural_key_fields() - extra = self._extra_fields.get(model_type, ()) - return base + tuple(f for f in extra if f not in base) - def _ensure_type_cache(self, model_type): if model_type not in self._content_cache and self._repo_version is not None: - fields = self._fields_for_type(model_type) + deferred = self._deferred_fields.get(model_type, ()) cache = {} for content in self._repo_version.get_content( - model_type.objects.only(*fields) + model_type.objects.defer(*deferred) ).iterator(): cache[content.natural_key()] = content self._content_cache[model_type] = cache @@ -90,9 +84,9 @@ async def run(self): db_results_by_type = defaultdict(list) for model_type, content_q in content_q_by_type.items(): - fields = self._fields_for_type(model_type) + deferred = self._deferred_fields.get(model_type, ()) async for result in sync_to_async_iterable( - model_type.objects.filter(content_q).only(*fields).iterator() + model_type.objects.filter(content_q).defer(*deferred).iterator() ): db_results_by_type[model_type].append(result) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 0e91071ca47..fce83223015 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -21,7 +21,7 @@ class DeclarativeVersion: - def __init__(self, first_stage, repository, mirror=False, acs=False): + def __init__(self, first_stage, repository, mirror=False, acs=False, deferred_fields=None): """ A pipeline that creates a new [pulpcore.plugin.models.RepositoryVersion][] from a stream of [pulpcore.plugin.stages.DeclarativeContent][] objects. @@ -107,12 +107,16 @@ async def run(self): 'False' is the default. acs (bool): When set to 'True' a new stage is added to look for Alternate Content Sources. + deferred_fields (dict): A mapping of content model class to a list of + field names to exclude from queries via Django's `defer()`. + Passed through to [pulpcore.plugin.stages.QueryExistingContents][]. """ self.first_stage = first_stage self.repository = repository self.mirror = mirror self.acs = acs + self.deferred_fields = deferred_fields def pipeline_stages(self, new_version): """ @@ -142,7 +146,7 @@ def pipeline_stages(self, new_version): [ ArtifactDownloader(resource_budget=resource_budget), ArtifactSaver(resource_budget=resource_budget), - QueryExistingContents(new_version), + QueryExistingContents(new_version, self.deferred_fields), ContentSaver(), RemoteArtifactSaver(), ResolveContentFutures(),