|  | 
| 24 | 24 | import jakarta.annotation.Nonnull; | 
| 25 | 25 | import jakarta.annotation.Nullable; | 
| 26 | 26 | import java.util.AbstractMap; | 
|  | 27 | +import java.util.ArrayList; | 
|  | 28 | +import java.util.HashMap; | 
|  | 29 | +import java.util.Iterator; | 
| 27 | 30 | import java.util.List; | 
|  | 31 | +import java.util.Map; | 
|  | 32 | +import java.util.Objects; | 
|  | 33 | +import java.util.Optional; | 
| 28 | 34 | import java.util.concurrent.ConcurrentHashMap; | 
| 29 | 35 | import java.util.concurrent.TimeUnit; | 
|  | 36 | +import java.util.function.Function; | 
|  | 37 | +import java.util.stream.Collectors; | 
| 30 | 38 | import org.apache.polaris.core.PolarisCallContext; | 
| 31 | 39 | import org.apache.polaris.core.PolarisDiagnostics; | 
| 32 | 40 | import org.apache.polaris.core.config.BehaviorChangeConfiguration; | 
| 33 | 41 | import org.apache.polaris.core.config.FeatureConfiguration; | 
| 34 | 42 | import org.apache.polaris.core.config.RealmConfig; | 
|  | 43 | +import org.apache.polaris.core.entity.EntityNameLookupRecord; | 
| 35 | 44 | import org.apache.polaris.core.entity.PolarisBaseEntity; | 
|  | 45 | +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; | 
|  | 46 | +import org.apache.polaris.core.entity.PolarisEntityId; | 
| 36 | 47 | import org.apache.polaris.core.entity.PolarisEntityType; | 
| 37 | 48 | import org.apache.polaris.core.entity.PolarisGrantRecord; | 
| 38 | 49 | import org.apache.polaris.core.persistence.PolarisMetaStoreManager; | 
| 39 | 50 | import org.apache.polaris.core.persistence.ResolvedPolarisEntity; | 
|  | 51 | +import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; | 
|  | 52 | +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; | 
| 40 | 53 | import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; | 
|  | 54 | +import org.slf4j.Logger; | 
|  | 55 | +import org.slf4j.LoggerFactory; | 
| 41 | 56 | 
 | 
| 42 | 57 | /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */ | 
| 43 | 58 | public class InMemoryEntityCache implements EntityCache { | 
| 44 |  | - | 
|  | 59 | +  private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); | 
| 45 | 60 |   private EntityCacheMode cacheMode; | 
| 46 | 61 |   private final PolarisDiagnostics diagnostics; | 
| 47 | 62 |   private final PolarisMetaStoreManager polarisMetaStoreManager; | 
| @@ -473,4 +488,152 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) { | 
| 473 | 488 |     // return what we found | 
| 474 | 489 |     return new EntityCacheLookupResult(entry, cacheHit); | 
| 475 | 490 |   } | 
|  | 491 | + | 
|  | 492 | +  @Override | 
|  | 493 | +  public List<EntityCacheLookupResult> getOrLoadResolvedEntities( | 
|  | 494 | +      @Nonnull PolarisCallContext callCtx, | 
|  | 495 | +      @Nonnull PolarisEntityType entityType, | 
|  | 496 | +      @Nonnull List<PolarisEntityId> entityIds) { | 
|  | 497 | +    // use a map to collect cached entries to avoid concurrency problems in case a second thread is | 
|  | 498 | +    // trying to populate | 
|  | 499 | +    // the cache from a different snapshot | 
|  | 500 | +    Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); | 
|  | 501 | +    for (int i = 0; i < 100; i++) { | 
|  | 502 | +      Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = | 
|  | 503 | +          idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, entityType, idsToLoad); | 
|  | 504 | +      if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { | 
|  | 505 | +        break; | 
|  | 506 | +      } | 
|  | 507 | +    } | 
|  | 508 | + | 
|  | 509 | +    return entityIds.stream() | 
|  | 510 | +        .map( | 
|  | 511 | +            id -> { | 
|  | 512 | +              ResolvedPolarisEntity entity = resolvedEntities.get(id); | 
|  | 513 | +              return entity == null ? null : new EntityCacheLookupResult(entity, true); | 
|  | 514 | +            }) | 
|  | 515 | +        .collect(Collectors.toList()); | 
|  | 516 | +  } | 
|  | 517 | + | 
|  | 518 | +  @Override | 
|  | 519 | +  public List<EntityCacheLookupResult> getOrLoadResolvedEntities( | 
|  | 520 | +      @Nonnull PolarisCallContext callCtx, @Nonnull List<EntityNameLookupRecord> lookupRecords) { | 
|  | 521 | +    Map<PolarisEntityId, EntityNameLookupRecord> entityIdMap = | 
|  | 522 | +        lookupRecords.stream() | 
|  | 523 | +            .collect( | 
|  | 524 | +                Collectors.toMap( | 
|  | 525 | +                    e -> new PolarisEntityId(e.getCatalogId(), e.getId()), | 
|  | 526 | +                    Function.identity(), | 
|  | 527 | +                    (a, b) -> a)); | 
|  | 528 | +    Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = | 
|  | 529 | +        idsToLoad -> | 
|  | 530 | +            polarisMetaStoreManager.loadResolvedEntities( | 
|  | 531 | +                callCtx, idsToLoad.stream().map(entityIdMap::get).collect(Collectors.toList())); | 
|  | 532 | + | 
|  | 533 | +    // use a map to collect cached entries to avoid concurrency problems in case a second thread is | 
|  | 534 | +    // trying to populate | 
|  | 535 | +    // the cache from a different snapshot | 
|  | 536 | +    Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); | 
|  | 537 | +    List<PolarisEntityId> entityIds = | 
|  | 538 | +        lookupRecords.stream() | 
|  | 539 | +            .map(e -> new PolarisEntityId(e.getCatalogId(), e.getId())) | 
|  | 540 | +            .collect(Collectors.toList()); | 
|  | 541 | +    for (int i = 0; i < 100; i++) { | 
|  | 542 | +      if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { | 
|  | 543 | +        break; | 
|  | 544 | +      } | 
|  | 545 | +    } | 
|  | 546 | + | 
|  | 547 | +    return lookupRecords.stream() | 
|  | 548 | +        .map( | 
|  | 549 | +            lookupRecord -> { | 
|  | 550 | +              ResolvedPolarisEntity entity = | 
|  | 551 | +                  resolvedEntities.get( | 
|  | 552 | +                      new PolarisEntityId(lookupRecord.getCatalogId(), lookupRecord.getId())); | 
|  | 553 | +              return entity == null ? null : new EntityCacheLookupResult(entity, true); | 
|  | 554 | +            }) | 
|  | 555 | +        .collect(Collectors.toList()); | 
|  | 556 | +  } | 
|  | 557 | + | 
|  | 558 | +  private boolean isCacheStateValid( | 
|  | 559 | +      @Nonnull PolarisCallContext callCtx, | 
|  | 560 | +      @Nonnull Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities, | 
|  | 561 | +      @Nonnull List<PolarisEntityId> entityIds, | 
|  | 562 | +      @Nonnull Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc) { | 
|  | 563 | +    ChangeTrackingResult changeTrackingResult = | 
|  | 564 | +        polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds); | 
|  | 565 | +    List<PolarisEntityId> idsToLoad = new ArrayList<>(); | 
|  | 566 | +    if (changeTrackingResult.isSuccess()) { | 
|  | 567 | +      idsToLoad.addAll(validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult)); | 
|  | 568 | +    } else { | 
|  | 569 | +      idsToLoad.addAll(entityIds); | 
|  | 570 | +    } | 
|  | 571 | +    if (!idsToLoad.isEmpty()) { | 
|  | 572 | +      ResolvedEntitiesResult resolvedEntitiesResult = loaderFunc.apply(idsToLoad); | 
|  | 573 | +      if (resolvedEntitiesResult.isSuccess()) { | 
|  | 574 | +        LOGGER.debug("Resolved entities - validating cache"); | 
|  | 575 | +        resolvedEntitiesResult.getResolvedEntities().stream() | 
|  | 576 | +            .filter(Objects::nonNull) | 
|  | 577 | +            .forEach( | 
|  | 578 | +                e -> { | 
|  | 579 | +                  this.cacheNewEntry(e); | 
|  | 580 | +                  resolvedEntities.put( | 
|  | 581 | +                      new PolarisEntityId(e.getEntity().getCatalogId(), e.getEntity().getId()), e); | 
|  | 582 | +                }); | 
|  | 583 | +      } | 
|  | 584 | +    } | 
|  | 585 | + | 
|  | 586 | +    // the loader function should always return a batch of results from the same "snapshot" of the | 
|  | 587 | +    // persistence, so | 
|  | 588 | +    // if the changeTracking call above failed, we should have loaded the entire batch in one shot. | 
|  | 589 | +    // There should be no | 
|  | 590 | +    // need to revalidate the entities. | 
|  | 591 | +    List<PolarisEntityId> idsToReload = | 
|  | 592 | +        changeTrackingResult.isSuccess() | 
|  | 593 | +            ? validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult) | 
|  | 594 | +            : List.of(); | 
|  | 595 | +    return idsToReload.isEmpty(); | 
|  | 596 | +  } | 
|  | 597 | + | 
|  | 598 | +  private List<PolarisEntityId> validateCacheEntries( | 
|  | 599 | +      List<PolarisEntityId> entityIds, | 
|  | 600 | +      Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities, | 
|  | 601 | +      ChangeTrackingResult changeTrackingResult) { | 
|  | 602 | +    List<PolarisEntityId> idsToReload = new ArrayList<>(); | 
|  | 603 | +    Iterator<PolarisEntityId> idIterator = entityIds.iterator(); | 
|  | 604 | +    Iterator<PolarisChangeTrackingVersions> changeTrackingIterator = | 
|  | 605 | +        changeTrackingResult.getChangeTrackingVersions().iterator(); | 
|  | 606 | +    while (idIterator.hasNext() && changeTrackingIterator.hasNext()) { | 
|  | 607 | +      PolarisEntityId entityId = idIterator.next(); | 
|  | 608 | +      PolarisChangeTrackingVersions changeTrackingVersions = changeTrackingIterator.next(); | 
|  | 609 | +      if (changeTrackingVersions == null) { | 
|  | 610 | +        // entity has been purged | 
|  | 611 | +        ResolvedPolarisEntity cachedEntity = getEntityById(entityId.getId()); | 
|  | 612 | +        if (cachedEntity != null || resolvedEntities.containsKey(entityId)) { | 
|  | 613 | +          LOGGER.debug("Entity {} has been purged, removing from cache", entityId); | 
|  | 614 | +          Optional.ofNullable(cachedEntity).ifPresent(this::removeCacheEntry); | 
|  | 615 | +          resolvedEntities.remove(entityId); | 
|  | 616 | +        } | 
|  | 617 | +        continue; | 
|  | 618 | +      } | 
|  | 619 | +      // compare versions using equals rather than less than so we can use the same function to | 
|  | 620 | +      // validate that the cache | 
|  | 621 | +      // entries are consistent with a single call to the change tracking table, rather than some | 
|  | 622 | +      // grants ahead and some | 
|  | 623 | +      // grants behind | 
|  | 624 | +      ResolvedPolarisEntity cachedEntity = | 
|  | 625 | +          resolvedEntities.computeIfAbsent(entityId, id -> this.getEntityById(id.getId())); | 
|  | 626 | +      if (cachedEntity == null | 
|  | 627 | +          || cachedEntity.getEntity().getEntityVersion() | 
|  | 628 | +              != changeTrackingVersions.getEntityVersion() | 
|  | 629 | +          || cachedEntity.getEntity().getGrantRecordsVersion() | 
|  | 630 | +              != changeTrackingVersions.getGrantRecordsVersion()) { | 
|  | 631 | +        idsToReload.add(entityId); | 
|  | 632 | +      } else { | 
|  | 633 | +        resolvedEntities.put(entityId, cachedEntity); | 
|  | 634 | +      } | 
|  | 635 | +    } | 
|  | 636 | +    LOGGER.debug("Cache entries {} need to be reloaded", idsToReload); | 
|  | 637 | +    return idsToReload; | 
|  | 638 | +  } | 
| 476 | 639 | } | 
0 commit comments