|
24 | 24 | import jakarta.annotation.Nonnull; |
25 | 25 | import jakarta.annotation.Nullable; |
26 | 26 | import java.util.AbstractMap; |
| 27 | +import java.util.ArrayList; |
| 28 | +import java.util.HashMap; |
| 29 | +import java.util.Iterator; |
27 | 30 | import java.util.List; |
| 31 | +import java.util.Map; |
| 32 | +import java.util.Objects; |
| 33 | +import java.util.Optional; |
28 | 34 | import java.util.concurrent.ConcurrentHashMap; |
29 | 35 | import java.util.concurrent.TimeUnit; |
| 36 | +import java.util.function.Function; |
| 37 | +import java.util.stream.Collectors; |
30 | 38 | import org.apache.polaris.core.PolarisCallContext; |
31 | 39 | import org.apache.polaris.core.PolarisDiagnostics; |
32 | 40 | import org.apache.polaris.core.config.BehaviorChangeConfiguration; |
33 | 41 | import org.apache.polaris.core.config.FeatureConfiguration; |
34 | 42 | import org.apache.polaris.core.config.RealmConfig; |
| 43 | +import org.apache.polaris.core.entity.EntityNameLookupRecord; |
35 | 44 | import org.apache.polaris.core.entity.PolarisBaseEntity; |
| 45 | +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; |
| 46 | +import org.apache.polaris.core.entity.PolarisEntityId; |
36 | 47 | import org.apache.polaris.core.entity.PolarisEntityType; |
37 | 48 | import org.apache.polaris.core.entity.PolarisGrantRecord; |
38 | 49 | import org.apache.polaris.core.persistence.PolarisMetaStoreManager; |
39 | 50 | import org.apache.polaris.core.persistence.ResolvedPolarisEntity; |
| 51 | +import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; |
| 52 | +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; |
40 | 53 | import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; |
| 54 | +import org.slf4j.Logger; |
| 55 | +import org.slf4j.LoggerFactory; |
41 | 56 |
|
42 | 57 | /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */ |
43 | 58 | public class InMemoryEntityCache implements EntityCache { |
44 | | - |
| 59 | + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); |
45 | 60 | private EntityCacheMode cacheMode; |
46 | 61 | private final PolarisDiagnostics diagnostics; |
47 | 62 | private final PolarisMetaStoreManager polarisMetaStoreManager; |
@@ -473,4 +488,152 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) { |
473 | 488 | // return what we found |
474 | 489 | return new EntityCacheLookupResult(entry, cacheHit); |
475 | 490 | } |
| 491 | + |
| 492 | + @Override |
| 493 | + public List<EntityCacheLookupResult> getOrLoadResolvedEntities( |
| 494 | + @Nonnull PolarisCallContext callCtx, |
| 495 | + @Nonnull PolarisEntityType entityType, |
| 496 | + @Nonnull List<PolarisEntityId> entityIds) { |
| 497 | + // use a map to collect cached entries to avoid concurrency problems in case a second thread is |
| 498 | + // trying to populate |
| 499 | + // the cache from a different snapshot |
| 500 | + Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); |
| 501 | + for (int i = 0; i < 100; i++) { |
| 502 | + Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = |
| 503 | + idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, entityType, idsToLoad); |
| 504 | + if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { |
| 505 | + break; |
| 506 | + } |
| 507 | + } |
| 508 | + |
| 509 | + return entityIds.stream() |
| 510 | + .map( |
| 511 | + id -> { |
| 512 | + ResolvedPolarisEntity entity = resolvedEntities.get(id); |
| 513 | + return entity == null ? null : new EntityCacheLookupResult(entity, true); |
| 514 | + }) |
| 515 | + .collect(Collectors.toList()); |
| 516 | + } |
| 517 | + |
| 518 | + @Override |
| 519 | + public List<EntityCacheLookupResult> getOrLoadResolvedEntities( |
| 520 | + @Nonnull PolarisCallContext callCtx, @Nonnull List<EntityNameLookupRecord> lookupRecords) { |
| 521 | + Map<PolarisEntityId, EntityNameLookupRecord> entityIdMap = |
| 522 | + lookupRecords.stream() |
| 523 | + .collect( |
| 524 | + Collectors.toMap( |
| 525 | + e -> new PolarisEntityId(e.getCatalogId(), e.getId()), |
| 526 | + Function.identity(), |
| 527 | + (a, b) -> a)); |
| 528 | + Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = |
| 529 | + idsToLoad -> |
| 530 | + polarisMetaStoreManager.loadResolvedEntities( |
| 531 | + callCtx, idsToLoad.stream().map(entityIdMap::get).collect(Collectors.toList())); |
| 532 | + |
| 533 | + // use a map to collect cached entries to avoid concurrency problems in case a second thread is |
| 534 | + // trying to populate |
| 535 | + // the cache from a different snapshot |
| 536 | + Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); |
| 537 | + List<PolarisEntityId> entityIds = |
| 538 | + lookupRecords.stream() |
| 539 | + .map(e -> new PolarisEntityId(e.getCatalogId(), e.getId())) |
| 540 | + .collect(Collectors.toList()); |
| 541 | + for (int i = 0; i < 100; i++) { |
| 542 | + if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { |
| 543 | + break; |
| 544 | + } |
| 545 | + } |
| 546 | + |
| 547 | + return lookupRecords.stream() |
| 548 | + .map( |
| 549 | + lookupRecord -> { |
| 550 | + ResolvedPolarisEntity entity = |
| 551 | + resolvedEntities.get( |
| 552 | + new PolarisEntityId(lookupRecord.getCatalogId(), lookupRecord.getId())); |
| 553 | + return entity == null ? null : new EntityCacheLookupResult(entity, true); |
| 554 | + }) |
| 555 | + .collect(Collectors.toList()); |
| 556 | + } |
| 557 | + |
| 558 | + private boolean isCacheStateValid( |
| 559 | + @Nonnull PolarisCallContext callCtx, |
| 560 | + @Nonnull Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities, |
| 561 | + @Nonnull List<PolarisEntityId> entityIds, |
| 562 | + @Nonnull Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc) { |
| 563 | + ChangeTrackingResult changeTrackingResult = |
| 564 | + polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds); |
| 565 | + List<PolarisEntityId> idsToLoad = new ArrayList<>(); |
| 566 | + if (changeTrackingResult.isSuccess()) { |
| 567 | + idsToLoad.addAll(validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult)); |
| 568 | + } else { |
| 569 | + idsToLoad.addAll(entityIds); |
| 570 | + } |
| 571 | + if (!idsToLoad.isEmpty()) { |
| 572 | + ResolvedEntitiesResult resolvedEntitiesResult = loaderFunc.apply(idsToLoad); |
| 573 | + if (resolvedEntitiesResult.isSuccess()) { |
| 574 | + LOGGER.debug("Resolved entities - validating cache"); |
| 575 | + resolvedEntitiesResult.getResolvedEntities().stream() |
| 576 | + .filter(Objects::nonNull) |
| 577 | + .forEach( |
| 578 | + e -> { |
| 579 | + this.cacheNewEntry(e); |
| 580 | + resolvedEntities.put( |
| 581 | + new PolarisEntityId(e.getEntity().getCatalogId(), e.getEntity().getId()), e); |
| 582 | + }); |
| 583 | + } |
| 584 | + } |
| 585 | + |
| 586 | + // the loader function should always return a batch of results from the same "snapshot" of the |
| 587 | + // persistence, so |
| 588 | + // if the changeTracking call above failed, we should have loaded the entire batch in one shot. |
| 589 | + // There should be no |
| 590 | + // need to revalidate the entities. |
| 591 | + List<PolarisEntityId> idsToReload = |
| 592 | + changeTrackingResult.isSuccess() |
| 593 | + ? validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult) |
| 594 | + : List.of(); |
| 595 | + return idsToReload.isEmpty(); |
| 596 | + } |
| 597 | + |
| 598 | + private List<PolarisEntityId> validateCacheEntries( |
| 599 | + List<PolarisEntityId> entityIds, |
| 600 | + Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities, |
| 601 | + ChangeTrackingResult changeTrackingResult) { |
| 602 | + List<PolarisEntityId> idsToReload = new ArrayList<>(); |
| 603 | + Iterator<PolarisEntityId> idIterator = entityIds.iterator(); |
| 604 | + Iterator<PolarisChangeTrackingVersions> changeTrackingIterator = |
| 605 | + changeTrackingResult.getChangeTrackingVersions().iterator(); |
| 606 | + while (idIterator.hasNext() && changeTrackingIterator.hasNext()) { |
| 607 | + PolarisEntityId entityId = idIterator.next(); |
| 608 | + PolarisChangeTrackingVersions changeTrackingVersions = changeTrackingIterator.next(); |
| 609 | + if (changeTrackingVersions == null) { |
| 610 | + // entity has been purged |
| 611 | + ResolvedPolarisEntity cachedEntity = getEntityById(entityId.getId()); |
| 612 | + if (cachedEntity != null || resolvedEntities.containsKey(entityId)) { |
| 613 | + LOGGER.debug("Entity {} has been purged, removing from cache", entityId); |
| 614 | + Optional.ofNullable(cachedEntity).ifPresent(this::removeCacheEntry); |
| 615 | + resolvedEntities.remove(entityId); |
| 616 | + } |
| 617 | + continue; |
| 618 | + } |
| 619 | + // compare versions using equals rather than less than so we can use the same function to |
| 620 | + // validate that the cache |
| 621 | + // entries are consistent with a single call to the change tracking table, rather than some |
| 622 | + // grants ahead and some |
| 623 | + // grants behind |
| 624 | + ResolvedPolarisEntity cachedEntity = |
| 625 | + resolvedEntities.computeIfAbsent(entityId, id -> this.getEntityById(id.getId())); |
| 626 | + if (cachedEntity == null |
| 627 | + || cachedEntity.getEntity().getEntityVersion() |
| 628 | + != changeTrackingVersions.getEntityVersion() |
| 629 | + || cachedEntity.getEntity().getGrantRecordsVersion() |
| 630 | + != changeTrackingVersions.getGrantRecordsVersion()) { |
| 631 | + idsToReload.add(entityId); |
| 632 | + } else { |
| 633 | + resolvedEntities.put(entityId, cachedEntity); |
| 634 | + } |
| 635 | + } |
| 636 | + LOGGER.debug("Cache entries {} need to be reloaded", idsToReload); |
| 637 | + return idsToReload; |
| 638 | + } |
476 | 639 | } |
0 commit comments