diff --git a/CHANGES/+optimize_resync.bugfix b/CHANGES/+optimize_resync.bugfix new file mode 100644 index 00000000000..281dc405e0b --- /dev/null +++ b/CHANGES/+optimize_resync.bugfix @@ -0,0 +1 @@ +Optimized resync with content caching in `QueryExistingContents`. diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index 78962e2bc1f..77278d1a8e5 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -30,8 +30,32 @@ class QueryExistingContents(Stage): This stage drains all available items from `self._in_q` and batches everything into one large call to the db for efficiency. + + When `repo_version` is provided, content from that version is cached in memory on first + access (per content type) so that repeat syncs can resolve most items without per-batch + database queries. + + Plugins with expensive fields that are not needed during sync can pass + `deferred_fields` - a mapping of content model class to a list of field names + to exclude from queries via Django's `defer()`. """ + def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._repo_version = repo_version + self._deferred_fields = deferred_fields or {} + self._content_cache = {} + + def _ensure_type_cache(self, model_type): + if model_type not in self._content_cache and self._repo_version is not None: + deferred = self._deferred_fields.get(model_type, ()) + cache = {} + for content in self._repo_version.get_content( + model_type.objects.defer(*deferred) + ).iterator(): + cache[content.natural_key()] = content + self._content_cache[model_type] = cache + async def run(self): """ The coroutine for this stage. @@ -42,25 +66,46 @@ async def run(self): async for batch in self.batches(): content_q_by_type = defaultdict(lambda: Q(pk__in=[])) d_content_by_nat_key = defaultdict(list) + cache_hits_by_type = defaultdict(set) + for d_content in batch: if d_content.content._state.adding: model_type = type(d_content.content) - unit_q = d_content.content.q() - content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q - d_content_by_nat_key[d_content.content.natural_key()].append(d_content) - + nat_key = d_content.content.natural_key() + await sync_to_async(self._ensure_type_cache)(model_type) + cached = self._content_cache.get(model_type, {}).get(nat_key) + if cached is not None: + d_content.content = cached + cache_hits_by_type[model_type].add(cached.pk) + else: + unit_q = d_content.content.q() + content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q + d_content_by_nat_key[nat_key].append(d_content) + + db_results_by_type = defaultdict(list) for model_type, content_q in content_q_by_type.items(): - try: - await sync_to_async(model_type.objects.filter(content_q).touch)() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) + deferred = self._deferred_fields.get(model_type, ()) async for result in sync_to_async_iterable( - model_type.objects.filter(content_q).iterator() + model_type.objects.filter(content_q).defer(*deferred).iterator() ): + db_results_by_type[model_type].append(result) + + all_types = set(cache_hits_by_type.keys()) | set(db_results_by_type.keys()) + for model_type in all_types: + pks = cache_hits_by_type.get(model_type, set()) + pks |= {r.pk for r in db_results_by_type.get(model_type, [])} + if pks: + try: + await sync_to_async(model_type.objects.filter(pk__in=pks).touch)() + except AttributeError: + raise TypeError( + "Plugins which declare custom ORM managers on their content classes " + "should have those managers inherit from " + "pulpcore.plugin.models.ContentManager." + ) + + for model_type, results in db_results_by_type.items(): + for result in results: for d_content in d_content_by_nat_key[result.natural_key()]: d_content.content = result diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 22cd54173ca..fce83223015 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -21,7 +21,7 @@ class DeclarativeVersion: - def __init__(self, first_stage, repository, mirror=False, acs=False): + def __init__(self, first_stage, repository, mirror=False, acs=False, deferred_fields=None): """ A pipeline that creates a new [pulpcore.plugin.models.RepositoryVersion][] from a stream of [pulpcore.plugin.stages.DeclarativeContent][] objects. @@ -107,12 +107,16 @@ async def run(self): 'False' is the default. acs (bool): When set to 'True' a new stage is added to look for Alternate Content Sources. + deferred_fields (dict): A mapping of content model class to a list of + field names to exclude from queries via Django's `defer()`. + Passed through to [pulpcore.plugin.stages.QueryExistingContents][]. """ self.first_stage = first_stage self.repository = repository self.mirror = mirror self.acs = acs + self.deferred_fields = deferred_fields def pipeline_stages(self, new_version): """ @@ -142,7 +146,7 @@ def pipeline_stages(self, new_version): [ ArtifactDownloader(resource_budget=resource_budget), ArtifactSaver(resource_budget=resource_budget), - QueryExistingContents(), + QueryExistingContents(new_version, self.deferred_fields), ContentSaver(), RemoteArtifactSaver(), ResolveContentFutures(),