diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index ccaf16cf0c..1fa3e86678 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -378,9 +378,13 @@ public void deleteAllInCurrentTxn(@Nonnull PolarisCallContext callCtx) { @Override public @Nonnull List lookupEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, List entityIds) { - return this.store.lookupEntities(localSession.get(), entityIds).stream() - .map(ModelEntity::toEntity) - .toList(); + Map idMap = + this.store.lookupEntities(localSession.get(), entityIds).stream() + .map(ModelEntity::toEntity) + .collect( + Collectors.toMap( + e -> new PolarisEntityId(e.getCatalogId(), e.getId()), Function.identity())); + return entityIds.stream().map(idMap::get).collect(Collectors.toList()); } /** {@inheritDoc} */ diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 8d6201453f..9401df2dd0 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -31,6 +31,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -463,7 +464,12 @@ public List lookupEntities( PreparedQuery query = QueryGenerator.generateSelectQueryWithEntityIds(realmId, schemaVersion, entityIds); try { - return datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)); + Map idMap = + datasourceOperations.executeSelect(query, new ModelEntity(schemaVersion)).stream() + .collect( + Collectors.toMap( + e -> new PolarisEntityId(e.getCatalogId(), e.getId()), Function.identity())); + return entityIds.stream().map(idMap::get).collect(Collectors.toList()); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); @@ -476,6 +482,7 @@ public List lookupEntityVersions( @Nonnull PolarisCallContext callCtx, List entityIds) { Map idToEntityMap = lookupEntities(callCtx, entityIds).stream() + .filter(Objects::nonNull) .collect( Collectors.toMap( entry -> new PolarisEntityId(entry.getCatalogId(), entry.getId()), @@ -570,7 +577,7 @@ public Page listEntities( @Nonnull @Override - public Page loadEntities( + public Page listFullEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index e2c46c1515..cd5a032ae3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.config.FeatureConfiguration; @@ -67,6 +68,7 @@ import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult; import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; @@ -705,7 +707,7 @@ private void revokeGrantRecord( /** {@inheritDoc} */ @Override - public @Nonnull Page loadEntities( + public @Nonnull Page listFullEntities( @Nonnull PolarisCallContext callCtx, @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @@ -728,7 +730,7 @@ private void revokeGrantRecord( // with sensitive data; but the window of inconsistency is only the duration of a single // in-flight request (the cache-based resolution follows a different path entirely). - return ms.loadEntities( + return ms.listFullEntities( callCtx, catalogId, parentId, @@ -1203,7 +1205,7 @@ private void revokeGrantRecord( // get the list of catalog roles, at most 2 List catalogRoles = - ms.loadEntities( + ms.listFullEntities( callCtx, catalogId, catalogId, @@ -1523,7 +1525,7 @@ private void revokeGrantRecord( // find all available tasks Page availableTasks = - ms.loadEntities( + ms.listFullEntities( callCtx, PolarisEntityConstants.getRootEntityId(), PolarisEntityConstants.getRootEntityId(), @@ -1763,6 +1765,56 @@ private void revokeGrantRecord( return result; } + @Nonnull + @Override + public ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List entityIds) { + BasePersistence ms = callCtx.getMetaStore(); + return getResolvedEntitiesResult(callCtx, ms, entityIds, i -> entityType); + } + + private static ResolvedEntitiesResult getResolvedEntitiesResult( + PolarisCallContext callCtx, + BasePersistence ms, + List entityIds, + Function entityTypeForIndex) { + List entities = ms.lookupEntities(callCtx, entityIds); + // mimic the behavior of loadEntity above, return null if not found or type mismatch + List ret = + IntStream.range(0, entityIds.size()) + .mapToObj( + i -> { + if (entities.get(i) != null + && !entities.get(i).getType().equals(entityTypeForIndex.apply(i))) { + return null; + } else { + return entities.get(i); + } + }) + .map(e -> toResolvedPolarisEntity(callCtx, e, ms)) + .collect(Collectors.toList()); + return new ResolvedEntitiesResult(ret); + } + + private static ResolvedPolarisEntity toResolvedPolarisEntity( + PolarisCallContext callCtx, PolarisBaseEntity e, BasePersistence ms) { + if (e == null) { + return null; + } else { + // load the grant records + final List grantRecordsAsSecurable = + ms.loadAllGrantRecordsOnSecurable(callCtx, e.getCatalogId(), e.getId()); + final List grantRecordsAsGrantee = + e.getType().isGrantee() + ? ms.loadAllGrantRecordsOnGrantee(callCtx, e.getCatalogId(), e.getId()) + : List.of(); + return new ResolvedPolarisEntity( + PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable); + } + } + /** {@inheritDoc} */ @Override public @Nonnull ResolvedEntityResult refreshResolvedEntity( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java index 4fa60e8c5d..05aefa3922 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java @@ -279,7 +279,7 @@ List lookupEntityVersions( /** * List lightweight information of entities matching the given criteria with pagination. If all - * properties of the entity are required,use {@link #loadEntities} instead. + * properties of the entity are required,use {@link #listFullEntities} instead. * * @param callCtx call context * @param catalogId catalog id for that entity, NULL_ID if the entity is top-level @@ -314,7 +314,7 @@ Page listEntities( * @return the paged list of matching entities after transformation */ @Nonnull - Page loadEntities( + Page listFullEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index 566b10e644..2273718d05 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -47,6 +47,7 @@ import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult; import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; @@ -114,7 +115,7 @@ EntityResult readEntityByName( /** * List lightweight information about entities matching the given criteria. If all properties of - * the entity are required,use {@link #loadEntities} instead. + * the entity are required,use {@link #listFullEntities} instead. * * @param callCtx call context * @param catalogPath path inside a catalog. If null or empty, the entities to list are top-level, @@ -135,7 +136,7 @@ ListEntitiesResult listEntities( /** * Load full entities matching the given criteria with pagination. If only the entity name/id/type * is required, use {@link #listEntities} instead. If no pagination is required, use {@link - * #loadEntitiesAll} instead. + * #listFullEntitiesAll} instead. * * @param callCtx call context * @param catalogPath path inside a catalog. If null or empty, the entities to list are top-level, @@ -145,7 +146,7 @@ ListEntitiesResult listEntities( * @return paged list of matching entities */ @Nonnull - Page loadEntities( + Page listFullEntities( @Nonnull PolarisCallContext callCtx, @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @@ -154,7 +155,7 @@ Page loadEntities( /** * Load full entities matching the given criteria into an unpaged list. If pagination is required - * use {@link #loadEntities} instead. If only the entity name/id/type is required, use {@link + * use {@link #listFullEntities} instead. If only the entity name/id/type is required, use {@link * #listEntities} instead. * * @param callCtx call context @@ -164,12 +165,13 @@ Page loadEntities( * @param entitySubType subType of entities to list (or ANY_SUBTYPE) * @return list of all matching entities */ - default @Nonnull List loadEntitiesAll( + default @Nonnull List listFullEntitiesAll( @Nonnull PolarisCallContext callCtx, @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @Nonnull PolarisEntitySubType entitySubType) { - return loadEntities(callCtx, catalogPath, entityType, entitySubType, PageToken.readEverything()) + return listFullEntities( + callCtx, catalogPath, entityType, entitySubType, PageToken.readEverything()) .items(); } @@ -416,6 +418,23 @@ ResolvedEntityResult loadResolvedEntityByName( @Nonnull PolarisEntityType entityType, @Nonnull String entityName); + /** + * Load a batch of resolved entities of a specified entity type given their {@link + * PolarisEntityId}. Will return an empty list if the input list is empty. Order in that returned + * list is the same as the input list. Some elements might be NULL if the entity has been dropped. + * + * @param callCtx call context + * @param entityType the type of entities to load + * @param entityIds the list of entity ids to load + * @return a non-null list of entities corresponding to the lookup keys. Some elements might be + * NULL if the entity has been dropped. + */ + @Nonnull + ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List entityIds); + /** * Refresh a resolved entity from the backend store. Will return NULL if the entity does not * exist, i.e. has been purged or dropped. Else, will determine what has changed based on the diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 8729558935..99c1f81624 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -53,6 +53,7 @@ import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult; import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; @@ -133,7 +134,7 @@ public EntityResult readEntityByName( } @Override - public @Nonnull Page loadEntities( + public @Nonnull Page listFullEntities( @Nonnull PolarisCallContext callCtx, @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @@ -379,6 +380,16 @@ public ResolvedEntityResult loadResolvedEntityByName( return null; } + @Nonnull + @Override + public ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List entityIds) { + diagnostics.fail("illegal_method_in_transaction_workspace", "loadResolvedEntities"); + return null; + } + @Override public ResolvedEntityResult refreshResolvedEntity( @Nonnull PolarisCallContext callCtx, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java index cd438c9950..481fa7a9e3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java @@ -20,8 +20,10 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.util.List; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; @@ -80,4 +82,32 @@ EntityCacheLookupResult getOrLoadEntityById( @Nullable EntityCacheLookupResult getOrLoadEntityByName( @Nonnull PolarisCallContext callContext, @Nonnull EntityCacheByNameKey entityNameKey); + + /** + * Load multiple entities by id, returning those found in the cache and loading those not found. + * + *

Cached entity versions and grant versions must be verified against the versions returned by + * the {@link + * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext, + * List)} API to ensure the returned entities are consistent with the current state of the + * metastore. Cache implementations must never return a mix of stale entities and fresh entities, + * as authorization or table conflict decisions could be made based on inconsistent data. For + * example, a Principal may have a grant to a Principal Role in a cached entry, but that grant may + * be revoked prior to the Principal Role being granted a privilege on a Catalog. If the Principal + * record is stale, but the Principal Role is refreshed, the Principal may be incorrectly + * authorized to access the Catalog. + * + * @param callCtx the Polaris call context + * @param entityType the entity type + * @param entityIds the list of entity ids to load + * @return the list of resolved entities, in the same order as the requested entity ids. As in + * {@link + * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext, + * PolarisEntityType, List)}, elements in the returned list may be null if the corresponding + * entity id does not exist. + */ + List getOrLoadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List entityIds); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java index c30b996f14..cf5ad0c213 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java @@ -24,24 +24,39 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.config.BehaviorChangeConfiguration; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; +import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */ public class InMemoryEntityCache implements EntityCache { - + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); + public static final int MAX_CACHE_REFRESH_ATTEMPTS = 100; private final PolarisDiagnostics diagnostics; private final PolarisMetaStoreManager polarisMetaStoreManager; private final Cache byId; @@ -451,4 +466,121 @@ && isNewer(existingCacheEntry, existingCacheEntryByName)) { // return what we found return new EntityCacheLookupResult(entry, cacheHit); } + + @Override + public List getOrLoadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List entityIds) { + // use a map to collect cached entries to avoid concurrency problems in case a second thread is + // trying to populate + // the cache from a different snapshot + Map resolvedEntities = new HashMap<>(); + boolean stateResolved = false; + for (int i = 0; i < MAX_CACHE_REFRESH_ATTEMPTS; i++) { + Function, ResolvedEntitiesResult> loaderFunc = + idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, entityType, idsToLoad); + if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { + stateResolved = true; + break; + } + } + if (!stateResolved) { + LOGGER.warn( + "Unable to resolve entities in cache after multiple attempts {} - resolved: {}", + entityIds, + resolvedEntities); + diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve entities in cache"); + } + + return entityIds.stream() + .map( + id -> { + ResolvedPolarisEntity entity = resolvedEntities.get(id); + return entity == null ? null : new EntityCacheLookupResult(entity, true); + }) + .collect(Collectors.toList()); + } + + private boolean isCacheStateValid( + @Nonnull PolarisCallContext callCtx, + @Nonnull Map resolvedEntities, + @Nonnull List entityIds, + @Nonnull Function, ResolvedEntitiesResult> loaderFunc) { + ChangeTrackingResult changeTrackingResult = + polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds); + List idsToLoad = new ArrayList<>(); + if (changeTrackingResult.isSuccess()) { + idsToLoad.addAll(validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult)); + } else { + idsToLoad.addAll(entityIds); + } + if (!idsToLoad.isEmpty()) { + ResolvedEntitiesResult resolvedEntitiesResult = loaderFunc.apply(idsToLoad); + if (resolvedEntitiesResult.isSuccess()) { + LOGGER.debug("Resolved entities - validating cache"); + resolvedEntitiesResult.getResolvedEntities().stream() + .filter(Objects::nonNull) + .forEach( + e -> { + this.cacheNewEntry(e); + resolvedEntities.put( + new PolarisEntityId(e.getEntity().getCatalogId(), e.getEntity().getId()), e); + }); + } + } + + // the loader function should always return a batch of results from the same "snapshot" of the + // persistence, so + // if the changeTracking call above failed, we should have loaded the entire batch in one shot. + // There should be no + // need to revalidate the entities. + List idsToReload = + changeTrackingResult.isSuccess() + ? validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult) + : List.of(); + return idsToReload.isEmpty(); + } + + private List validateCacheEntries( + List entityIds, + Map resolvedEntities, + ChangeTrackingResult changeTrackingResult) { + List idsToReload = new ArrayList<>(); + Iterator idIterator = entityIds.iterator(); + Iterator changeTrackingIterator = + changeTrackingResult.getChangeTrackingVersions().iterator(); + while (idIterator.hasNext() && changeTrackingIterator.hasNext()) { + PolarisEntityId entityId = idIterator.next(); + PolarisChangeTrackingVersions changeTrackingVersions = changeTrackingIterator.next(); + if (changeTrackingVersions == null) { + // entity has been purged + ResolvedPolarisEntity cachedEntity = getEntityById(entityId.getId()); + if (cachedEntity != null || resolvedEntities.containsKey(entityId)) { + LOGGER.debug("Entity {} has been purged, removing from cache", entityId); + Optional.ofNullable(cachedEntity).ifPresent(this::removeCacheEntry); + resolvedEntities.remove(entityId); + } + continue; + } + // compare versions using equals rather than less than so we can use the same function to + // validate that the cache + // entries are consistent with a single call to the change tracking table, rather than some + // grants ahead and some + // grants behind + ResolvedPolarisEntity cachedEntity = + resolvedEntities.computeIfAbsent(entityId, id -> this.getEntityById(id.getId())); + if (cachedEntity == null + || cachedEntity.getEntity().getEntityVersion() + != changeTrackingVersions.getEntityVersion() + || cachedEntity.getEntity().getGrantRecordsVersion() + != changeTrackingVersions.getGrantRecordsVersion()) { + idsToReload.add(entityId); + } else { + resolvedEntities.put(entityId, cachedEntity); + } + } + LOGGER.debug("Cache entries {} need to be reloaded", idsToReload); + return idsToReload; + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java new file mode 100644 index 0000000000..61eb27da1f --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.persistence.dao.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.util.List; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; + +/** Response object for the loadResolvedEntities call. */ +public class ResolvedEntitiesResult extends BaseResult { + private final List resolvedEntities; + + public ResolvedEntitiesResult(List resolvedEntities) { + super(ReturnStatus.SUCCESS, null); + this.resolvedEntities = resolvedEntities; + } + + public ResolvedEntitiesResult( + @Nonnull ReturnStatus returnStatus, @Nullable String extraInformation) { + super(returnStatus, extraInformation); + this.resolvedEntities = null; + } + + @JsonCreator + private ResolvedEntitiesResult( + @JsonProperty("returnStatus") ReturnStatus returnStatus, + @JsonProperty("extraInformation") String extraInformation, + @JsonProperty("resolvedEntities") List resolvedEntities) { + super(returnStatus, extraInformation); + this.resolvedEntities = resolvedEntities; + } + + public List getResolvedEntities() { + return resolvedEntities; + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java index 848a8421e5..a4d5024b69 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java @@ -389,7 +389,7 @@ public Page listEntities( /** {@inheritDoc} */ @Override @Nonnull - public Page loadEntities( + public Page listFullEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 402cdc280e..db3ccd0f35 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.config.FeatureConfiguration; @@ -56,6 +57,7 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisObjectMapperUtil; import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; +import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; @@ -71,6 +73,7 @@ import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult; import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; @@ -723,10 +726,10 @@ private void bootstrapPolarisService( } /** - * See {@link PolarisMetaStoreManager#loadEntities(PolarisCallContext, List, PolarisEntityType, - * PolarisEntitySubType, PageToken)} + * See {@link PolarisMetaStoreManager#listFullEntities(PolarisCallContext, List, + * PolarisEntityType, PolarisEntitySubType, PageToken)} */ - private @Nonnull Page loadEntities( + private @Nonnull Page listFullEntities( @Nonnull PolarisCallContext callCtx, @Nonnull TransactionalPersistence ms, @Nullable List catalogPath, @@ -756,7 +759,7 @@ private void bootstrapPolarisService( /** {@inheritDoc} */ @Override - public @Nonnull Page loadEntities( + public @Nonnull Page listFullEntities( @Nonnull PolarisCallContext callCtx, @Nullable List catalogPath, @Nonnull PolarisEntityType entityType, @@ -768,7 +771,7 @@ private void bootstrapPolarisService( // run operation in a read transaction return ms.runInReadTransaction( callCtx, - () -> loadEntities(callCtx, ms, catalogPath, entityType, entitySubType, pageToken)); + () -> listFullEntities(callCtx, ms, catalogPath, entityType, entitySubType, pageToken)); } /** {@link #createPrincipal(PolarisCallContext, PrincipalEntity)} */ @@ -2293,6 +2296,57 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( return result; } + private static ResolvedEntitiesResult getResolvedEntitiesResult( + PolarisCallContext callCtx, + TransactionalPersistence ms, + List entityIds, + Function entityTypeForIndex) { + List entities = ms.lookupEntitiesInCurrentTxn(callCtx, entityIds); + // mimic the behavior of loadEntity above, return null if not found or type mismatch + List ret = + IntStream.range(0, entityIds.size()) + .mapToObj( + i -> { + if (entities.get(i) != null + && !entities.get(i).getType().equals(entityTypeForIndex.apply(i))) { + return null; + } else { + return entities.get(i); + } + }) + .map(e -> toResolvedPolarisEntity(callCtx, e, ms)) + .collect(Collectors.toList()); + return new ResolvedEntitiesResult(ret); + } + + private static ResolvedPolarisEntity toResolvedPolarisEntity( + PolarisCallContext callCtx, PolarisBaseEntity e, TransactionalPersistence ms) { + if (e == null) { + return null; + } else { + // load the grant records + final List grantRecordsAsSecurable = + ms.loadAllGrantRecordsOnSecurableInCurrentTxn(callCtx, e.getCatalogId(), e.getId()); + final List grantRecordsAsGrantee = + e.getType().isGrantee() + ? ms.loadAllGrantRecordsOnSecurableInCurrentTxn(callCtx, e.getCatalogId(), e.getId()) + : List.of(); + return new ResolvedPolarisEntity( + PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable); + } + } + + @Nonnull + @Override + public ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List entityIds) { + TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); + return ms.runInReadTransaction( + callCtx, () -> getResolvedEntitiesResult(callCtx, ms, entityIds, i -> entityType)); + } + /** {@inheritDoc} */ private @Nonnull ResolvedEntityResult refreshResolvedEntity( @Nonnull PolarisCallContext callCtx, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java index 3802908b82..dfcd5f9256 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java @@ -225,7 +225,7 @@ List lookupEntityVersionsInCurrentTxn( pageToken); } - /** See {@link org.apache.polaris.core.persistence.BasePersistence#loadEntities} */ + /** See {@link org.apache.polaris.core.persistence.BasePersistence#listFullEntities} */ @Nonnull Page loadEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java index ce34320504..95774364d9 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java @@ -19,16 +19,25 @@ package org.apache.polaris.core.persistence.cache; import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; +import static org.assertj.core.api.Assertions.assertThat; import java.time.Clock; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; @@ -37,6 +46,8 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; @@ -44,10 +55,14 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Unit testing of the entity cache */ public class InMemoryEntityCacheTest { + public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); private final PolarisDiagnostics diagServices; private final PolarisCallContext callCtx; private final PolarisTestMetaStoreManager tm; @@ -105,31 +120,31 @@ void testGetOrLoadEntityByName() { EntityCacheLookupResult lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isFalse(); - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isFalse(); + assertThat(lookup.getCacheEntry()).isNotNull(); // validate the cache entry PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); - Assertions.assertThat(catalog).isNotNull(); - Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); + assertThat(catalog).isNotNull(); + assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); // do it again, should be found in the cache lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isTrue(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isTrue(); // do it again by id, should be found in the cache lookup = cache.getOrLoadEntityById( this.callCtx, catalog.getCatalogId(), catalog.getId(), catalog.getType()); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isTrue(); - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); - Assertions.assertThat(lookup.getCacheEntry().getEntity()).isNotNull(); - Assertions.assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isTrue(); + assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup.getCacheEntry().getEntity()).isNotNull(); + assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull(); // get N1 PolarisBaseEntity N1 = @@ -140,128 +155,119 @@ void testGetOrLoadEntityByName() { new EntityCacheByNameKey( catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE, "N1"); ResolvedPolarisEntity cacheEntry = cache.getEntityByName(N1_name); - Assertions.assertThat(cacheEntry).isNull(); + assertThat(cacheEntry).isNull(); // try to find it in the cache by id. Should not be there, i.e. no cache hit lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), N1.getType()); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isFalse(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isFalse(); // should be there now, by name cacheEntry = cache.getEntityByName(N1_name); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); // should be there now, by id cacheEntry = cache.getEntityById(N1.getId()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); // lookup N1 ResolvedPolarisEntity N1_entry = cache.getEntityById(N1.getId()); - Assertions.assertThat(N1_entry).isNotNull(); - Assertions.assertThat(N1_entry.getEntity()).isNotNull(); - Assertions.assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(N1_entry).isNotNull(); + assertThat(N1_entry.getEntity()).isNotNull(); + assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull(); // negative tests, load an entity which does not exist lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 10000, N1.getType()); - Assertions.assertThat(lookup).isNull(); + assertThat(lookup).isNull(); lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "non_existant_catalog")); - Assertions.assertThat(lookup).isNull(); + assertThat(lookup).isNull(); // lookup N2 to validate grants EntityCacheByNameKey N2_name = new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2"); lookup = cache.getOrLoadEntityByName(callCtx, N2_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_N1 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_N1).isNotNull(); - Assertions.assertThat(cacheEntry_N1.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_N1).isNotNull(); + assertThat(cacheEntry_N1.getEntity()).isNotNull(); + assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull(); // lookup catalog role R1 EntityCacheByNameKey R1_name = new EntityCacheByNameKey( catalog.getId(), catalog.getId(), PolarisEntityType.CATALOG_ROLE, "R1"); lookup = cache.getOrLoadEntityByName(callCtx, R1_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_R1 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_R1).isNotNull(); - Assertions.assertThat(cacheEntry_R1.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull(); + assertThat(cacheEntry_R1).isNotNull(); + assertThat(cacheEntry_R1.getEntity()).isNotNull(); + assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull(); // we expect one TABLE_READ grant on that securable granted to the catalog role R1 - Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1); PolarisGrantRecord gr = cacheEntry_N1.getGrantRecordsAsSecurable().get(0); // securable is N1, grantee is R1 - Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); - Assertions.assertThat(gr.getGranteeCatalogId()) - .isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); - Assertions.assertThat(gr.getSecurableCatalogId()) - .isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getPrivilegeCode()) - .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); + assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); + assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); + assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); + assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); + assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); // R1 should have 4 privileges granted to it - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4); + assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4); List matchPriv = cacheEntry_R1.getGrantRecordsAsGrantee().stream() .filter( grantRecord -> grantRecord.getPrivilegeCode() == PolarisPrivilege.TABLE_READ_DATA.getCode()) .collect(Collectors.toList()); - Assertions.assertThat(matchPriv).hasSize(1); + assertThat(matchPriv).hasSize(1); gr = matchPriv.get(0); - Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); - Assertions.assertThat(gr.getGranteeCatalogId()) - .isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); - Assertions.assertThat(gr.getSecurableCatalogId()) - .isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getPrivilegeCode()) - .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); + assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); + assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); + assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); + assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); + assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); // lookup principal role PR1 EntityCacheByNameKey PR1_name = new EntityCacheByNameKey(PolarisEntityType.PRINCIPAL_ROLE, "PR1"); lookup = cache.getOrLoadEntityByName(callCtx, PR1_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_PR1 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_PR1).isNotNull(); - Assertions.assertThat(cacheEntry_PR1.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull(); + assertThat(cacheEntry_PR1).isNotNull(); + assertThat(cacheEntry_PR1.getEntity()).isNotNull(); + assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull(); // R1 should have 1 CATALOG_ROLE_USAGE privilege granted *on* it to PR1 - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1); gr = cacheEntry_R1.getGrantRecordsAsSecurable().get(0); - Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId()); - Assertions.assertThat(gr.getSecurableCatalogId()) - .isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId()); - Assertions.assertThat(gr.getGranteeCatalogId()) - .isEqualTo(cacheEntry_PR1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getPrivilegeCode()) - .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); + assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId()); + assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); + assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId()); + assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_PR1.getEntity().getCatalogId()); + assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); // PR1 should have 1 grant on it to P1. - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode()) + assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode()) .isEqualTo(PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode()); // PR1 should have 2 grants to it, on R1 and R2 - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode()) + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2); + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode()) .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode()) + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode()) .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); } @@ -274,14 +280,14 @@ void testRefresh() { EntityCacheLookupResult lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isFalse(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isFalse(); // the catalog - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup.getCacheEntry()).isNotNull(); PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); - Assertions.assertThat(catalog).isNotNull(); - Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); + assertThat(catalog).isNotNull(); + assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); // find table N5/N6/T6 PolarisBaseEntity N5 = @@ -294,23 +300,23 @@ void testRefresh() { PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE, "T6"); - Assertions.assertThat(T6v1).isNotNull(); + assertThat(T6v1).isNotNull(); // that table is not in the cache ResolvedPolarisEntity cacheEntry = cache.getEntityById(T6v1.getId()); - Assertions.assertThat(cacheEntry).isNull(); + assertThat(cacheEntry).isNull(); // now load that table in the cache cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, T6v1, T6v1.getEntityVersion(), T6v1.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); PolarisBaseEntity table = cacheEntry.getEntity(); - Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId()); - Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); - Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); + assertThat(table.getId()).isEqualTo(T6v1.getId()); + assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); + assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); // update the entity PolarisBaseEntity T6v2 = @@ -319,31 +325,31 @@ void testRefresh() { T6v1, "{\"v2_properties\": \"some value\"}", "{\"v2_internal_properties\": \"internal value\"}"); - Assertions.assertThat(T6v2).isNotNull(); + assertThat(T6v2).isNotNull(); // now refresh that entity. But because we don't change the versions, nothing should be reloaded cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, T6v1, T6v1.getEntityVersion(), T6v1.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); table = cacheEntry.getEntity(); - Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId()); - Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); - Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); + assertThat(table.getId()).isEqualTo(T6v1.getId()); + assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); + assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); // now refresh again, this time with the new versions. Should be reloaded cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, T6v2, T6v2.getEntityVersion(), T6v2.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); table = cacheEntry.getEntity(); - Assertions.assertThat(table.getId()).isEqualTo(T6v2.getId()); - Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); - Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion()); + assertThat(table.getId()).isEqualTo(T6v2.getId()); + assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); + assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion()); // update it again PolarisBaseEntity T6v3 = @@ -352,7 +358,7 @@ void testRefresh() { T6v2, "{\"v3_properties\": \"some value\"}", "{\"v3_internal_properties\": \"internal value\"}"); - Assertions.assertThat(T6v3).isNotNull(); + assertThat(T6v3).isNotNull(); // the two catalog roles PolarisBaseEntity R1 = @@ -368,9 +374,9 @@ void testRefresh() { this.callCtx, N2, N2.getEntityVersion(), N2.getGrantRecordsVersion()); // should have one single grant - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); // perform an additional grant to R1 this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, PolarisPrivilege.NAMESPACE_FULL_METADATA); @@ -380,8 +386,8 @@ void testRefresh() { this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); // same entity version but different grant records - Assertions.assertThat(N2v2).isNotNull(); - Assertions.assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion() + 1); + assertThat(N2v2).isNotNull(); + assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion() + 1); // the cache is outdated now lookup = @@ -389,24 +395,24 @@ void testRefresh() { this.callCtx, new EntityCacheByNameKey( catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); cacheEntry = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); - Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) .isEqualTo(N2.getGrantRecordsVersion()); // now refresh cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, N2, N2v2.getEntityVersion(), N2v2.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2); - Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2); + assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) .isEqualTo(N2v2.getGrantRecordsVersion()); } @@ -418,23 +424,23 @@ void testRenameAndCacheDestinationBeforeLoadingSource() { EntityCacheLookupResult lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup).isNotNull(); + assertThat(lookup.getCacheEntry()).isNotNull(); PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); PolarisBaseEntity N1 = this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), N1.getType()); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); EntityCacheByNameKey T4_name = new EntityCacheByNameKey(N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE, "T4"); lookup = cache.getOrLoadEntityByName(callCtx, T4_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_T4 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_T4).isNotNull(); - Assertions.assertThat(cacheEntry_T4.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_T4).isNotNull(); + assertThat(cacheEntry_T4.getEntity()).isNotNull(); + assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull(); PolarisBaseEntity T4 = cacheEntry_T4.getEntity(); @@ -445,18 +451,17 @@ void testRenameAndCacheDestinationBeforeLoadingSource() { new EntityCacheByNameKey( N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE, "T4_renamed"); lookup = cache.getOrLoadEntityByName(callCtx, T4_renamed); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_T4_renamed = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_T4_renamed).isNotNull(); + assertThat(cacheEntry_T4_renamed).isNotNull(); PolarisBaseEntity T4_renamed_entity = cacheEntry_T4_renamed.getEntity(); // new entry if lookup by id EntityCacheLookupResult lookupResult = cache.getOrLoadEntityById(callCtx, T4.getCatalogId(), T4.getId(), T4.getType()); - Assertions.assertThat(lookupResult).isNotNull(); - Assertions.assertThat(lookupResult.getCacheEntry()).isNotNull(); - Assertions.assertThat(lookupResult.getCacheEntry().getEntity().getName()) - .isEqualTo("T4_renamed"); + assertThat(lookupResult).isNotNull(); + assertThat(lookupResult.getCacheEntry()).isNotNull(); + assertThat(lookupResult.getCacheEntry().getEntity().getName()).isEqualTo("T4_renamed"); // old name is gone, replaced by new name // Assertions.assertNull(cache.getOrLoadEntityByName(callCtx, T4_name)); @@ -469,7 +474,7 @@ void testRenameAndCacheDestinationBeforeLoadingSource() { T4_renamed_entity.getGrantRecordsVersion()); // now the loading by the old name should return null - Assertions.assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull(); + assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull(); } /* Helper for `testEntityWeigher` */ @@ -495,7 +500,584 @@ void testEntityWeigher() { .setMetadataLocation("a".repeat(1000 * 1000)) .build(); - Assertions.assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity)); - Assertions.assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity)); + assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity)); + assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity)); + } + + @Test + public void testBatchLoadByEntityIds() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get some entities that exist in the test setup + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N5 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N5"); + PolarisBaseEntity N5_N6 = + this.tm.ensureExistsByName(List.of(catalog, N5), PolarisEntityType.NAMESPACE, "N6"); + + // Pre-load N1 into cache + cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), N1.getType()); + + // Create list of entity IDs - N1 is already cached, N5, N5_N6 are not + List entityIds = + List.of(getPolarisEntityId(N1), getPolarisEntityId(N5), getPolarisEntityId(N5_N6)); + + // Test batch loading by entity IDs (all are namespaces) + List results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + // Verify all entities were found + assertThat(results).hasSize(3); + assertThat(results.get(0)).isNotNull(); // N1 - was cached + assertThat(results.get(1)).isNotNull(); // N5 - was loaded + assertThat(results.get(2)).isNotNull(); // N5_N6 - was loaded + + // Verify the entities are correct + assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId()); + assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N5.getId()); + assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(N5_N6.getId()); + + // All should be cache hits now since they were loaded in the previous call + assertThat(results.get(0).isCacheHit()).isTrue(); + assertThat(results.get(1).isCacheHit()).isTrue(); + assertThat(results.get(2).isCacheHit()).isTrue(); + + // Test with a non-existent entity ID + List nonExistentIds = + List.of(new PolarisEntityId(catalog.getCatalogId(), 99999L)); + List nonExistentResults = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, nonExistentIds); + + assertThat(nonExistentResults).hasSize(1); + assertThat(nonExistentResults.get(0)).isNull(); + + // Test with table entities separately + PolarisBaseEntity T6 = + this.tm.ensureExistsByName( + List.of(catalog, N5, N5_N6), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + "T6"); + + List tableIds = List.of(getPolarisEntityId(T6)); + + List tableResults = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.TABLE_LIKE, tableIds); + + assertThat(tableResults).hasSize(1); + assertThat(tableResults.get(0)).isNotNull(); + assertThat(tableResults.get(0).getCacheEntry().getEntity().getId()).isEqualTo(T6.getId()); + } + + @Test + public void testBatchLoadWithStaleVersions() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get table T6 that we can update + PolarisBaseEntity N5 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N5"); + PolarisBaseEntity N5_N6 = + this.tm.ensureExistsByName(List.of(catalog, N5), PolarisEntityType.NAMESPACE, "N6"); + PolarisBaseEntity T6v1 = + this.tm.ensureExistsByName( + List.of(catalog, N5, N5_N6), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + "T6"); + + // Load T6 into cache initially + cache.getOrLoadEntityById(this.callCtx, T6v1.getCatalogId(), T6v1.getId(), T6v1.getType()); + + // Verify it's in cache with original version + ResolvedPolarisEntity cachedT6 = cache.getEntityById(T6v1.getId()); + assertThat(cachedT6).isNotNull(); + assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); + + // Update the entity to create a new version + PolarisBaseEntity T6v2 = + this.tm.updateEntity( + List.of(catalog, N5, N5_N6), + T6v1, + "{\"v2_properties\": \"some value\"}", + "{\"v2_internal_properties\": \"internal value\"}"); + assertThat(T6v2).isNotNull(); + assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion()); + + List results; + // Create entity ID list with the updated entity + List entityIds = List.of(getPolarisEntityId(T6v2)); + + // Call batch load - this should detect the stale version and reload + results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.TABLE_LIKE, entityIds); + + // Verify the entity was reloaded with the new version + assertThat(results).hasSize(1); + assertThat(results.get(0)).isNotNull(); + + ResolvedPolarisEntity reloadedT6 = results.get(0).getCacheEntry(); + assertThat(reloadedT6.getEntity().getId()).isEqualTo(T6v2.getId()); + assertThat(reloadedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); + + // Verify the cache now contains the updated version + cachedT6 = cache.getEntityById(T6v2.getId()); + assertThat(cachedT6).isNotNull(); + assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); + } + + @Test + public void testBatchLoadWithStaleGrantVersions() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get entities we'll work with + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + PolarisBaseEntity R1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.CATALOG_ROLE, "R1"); + + // Load N2 into cache initially + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); + + // Verify it's in cache with original grant version + ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId()); + assertThat(cachedN2).isNotNull(); + int originalGrantVersion = cachedN2.getEntity().getGrantRecordsVersion(); + + // Grant additional privilege to change grant version + this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, PolarisPrivilege.NAMESPACE_FULL_METADATA); + + // Get the updated entity with new grant version + PolarisBaseEntity N2Updated = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion); + + // Create entity ID list + List entityIds = List.of(getPolarisEntityId(N2Updated)); + + // Call batch load - this should detect the stale grant version and reload + List results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + // Verify the entity was reloaded with the new grant version + assertThat(results).hasSize(1); + assertThat(results.get(0)).isNotNull(); + + ResolvedPolarisEntity reloadedN2 = results.get(0).getCacheEntry(); + assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2Updated.getId()); + assertThat(reloadedN2.getEntity().getGrantRecordsVersion()) + .isEqualTo(N2Updated.getGrantRecordsVersion()); + + // Should now have more grant records + assertThat(reloadedN2.getGrantRecordsAsSecurable().size()).isGreaterThan(1); + + // Verify the cache now contains the updated grant version + cachedN2 = cache.getEntityById(N2Updated.getId()); + assertThat(cachedN2).isNotNull(); + assertThat(cachedN2.getEntity().getGrantRecordsVersion()) + .isEqualTo(N2Updated.getGrantRecordsVersion()); + } + + @Test + public void testBatchLoadVersionRetryLogic() { + // get a new cache + PolarisMetaStoreManager metaStoreManager = Mockito.spy(this.metaStoreManager); + InMemoryEntityCache cache = + new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), metaStoreManager); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get entities that we can work with + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + + // Load N2 into cache initially + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); + + // Verify it's in cache with original version + ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId()); + assertThat(cachedN2).isNotNull(); + int originalEntityVersion = cachedN2.getEntity().getEntityVersion(); + + // Update the entity multiple times to create version skew + PolarisBaseEntity N2v2 = + this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", null); + + // the first call should return the first version, then we call the real method to get the + // latest + Mockito.doReturn( + new ChangeTrackingResult( + List.of( + changeTrackingFor(catalog), changeTrackingFor(N1), changeTrackingFor(N2v2)))) + .when(metaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + Mockito.doCallRealMethod() + .when(metaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + + // update again to create v3, which isn't returned in the change tracking result + PolarisBaseEntity N2v3 = + this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": \"value\"}", null); + + // Verify versions increased + assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion); + assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion()); + + // Create entity ID list + List entityIds = + List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), getPolarisEntityId(N2)); + + // Call batch load - this should detect the stale versions and reload until consistent + List results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + // Verify the entity was reloaded with the latest version + assertThat(results).hasSize(3); + assertThat(results) + .doesNotContainNull() + .extracting(EntityCacheLookupResult::getCacheEntry) + .doesNotContainNull() + .extracting(e -> e.getEntity().getId()) + .containsExactly(catalog.getId(), N1.getId(), N2.getId()); + + ResolvedPolarisEntity reloadedN2 = results.get(2).getCacheEntry(); + assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2v3.getId()); + assertThat(reloadedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); + + // Verify the cache now contains the latest version + cachedN2 = cache.getEntityById(N2v3.getId()); + assertThat(cachedN2).isNotNull(); + assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); + } + + @Test + public void testBatchLoadVersionRetryFailsAfterMaxAttempts() { + // get a new cache + PolarisMetaStoreManager metaStoreManager = Mockito.spy(this.metaStoreManager); + InMemoryEntityCache cache = + new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), metaStoreManager); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get entities that we can work with + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + + // Load N2 into cache initially + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); + + // Verify it's in cache with original version + ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId()); + assertThat(cachedN2).isNotNull(); + int originalEntityVersion = cachedN2.getEntity().getEntityVersion(); + + // Update the entity multiple times to create version skew + PolarisBaseEntity N2v2 = + this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", null); + + // return v2 for the change tracking + Mockito.doReturn( + new ChangeTrackingResult( + List.of( + changeTrackingFor(catalog), changeTrackingFor(N1), changeTrackingFor(N2v2)))) + .when(metaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + + // update again to create v3, which isn't returned in the change tracking result + PolarisBaseEntity N2v3 = + this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": \"value\"}", null); + + // Verify versions increased + assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion); + assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion()); + + // Create entity ID list + List entityIds = + List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), getPolarisEntityId(N2)); + Assertions.assertThatThrownBy( + () -> + cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds)) + .isInstanceOf(RuntimeException.class); + } + + private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog) { + return new PolarisEntityId(catalog.getCatalogId(), catalog.getId()); + } + + @Test + public void testConcurrentClientLoadingBehavior() throws Exception { + // Load catalog into cache + EntityCacheLookupResult lookup = + allocateNewCache() + .getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get multiple entities to create a larger list for concurrent processing + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + PolarisBaseEntity N3 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N3"); + PolarisBaseEntity N5 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N5"); + PolarisBaseEntity N5_N6 = + this.tm.ensureExistsByName(List.of(catalog, N5), PolarisEntityType.NAMESPACE, "N6"); + + // Create entity IDs list for both clients + List entityIds = + List.of( + getPolarisEntityId(N1), + getPolarisEntityId(N2), + getPolarisEntityId(N3), + getPolarisEntityId(N5), + getPolarisEntityId(N5_N6)); + + // Update one of the entities to create version differences + PolarisBaseEntity N2v2 = + this.tm.updateEntity( + List.of(catalog, N1), N2, "{\"concurrent_test\": \"client1_version\"}", null); + PolarisBaseEntity N2v3 = + this.tm.updateEntity( + List.of(catalog, N1), N2v2, "{\"concurrent_test\": \"client2_version\"}", null); + + // Mock the metastore manager to control the timing of method calls + PolarisMetaStoreManager mockedMetaStoreManager = Mockito.spy(this.metaStoreManager); + + // Create caches with the mocked metastore manager + InMemoryEntityCache cache = + new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), mockedMetaStoreManager); + + // Synchronization primitives for controlling execution order + CountDownLatch client1ChangeTrackingResult = new CountDownLatch(1); + Semaphore client1ResolvedEntitiesBlocker = new Semaphore(0); + + // Atomic references to capture results from both threads + AtomicReference client1Exception = new AtomicReference<>(); + AtomicReference client2Exception = new AtomicReference<>(); + + // Configure mock behavior: + // 1. Allow both threads to call loadEntitiesChangeTracking() with different versions + // 2. Block client1's loadResolvedEntities() until client2's loadResolvedEntities() completes + + // Mock loadEntitiesChangeTracking to return different versions for each client + Mockito.doAnswer( + invocation -> { + // First call (client1) - returns older version for N2 + LOGGER.debug("Returning change tracking for client1"); + return new ChangeTrackingResult( + List.of( + changeTrackingFor(N1), + changeTrackingFor(N2v2), // older version + changeTrackingFor(N3), + changeTrackingFor(N5), + changeTrackingFor(N5_N6))); + }) + .doAnswer( + invocation -> { + // Second call (client2) - returns newer version for N2 + LOGGER.debug("Returning change tracking for client2"); + return new ChangeTrackingResult( + List.of( + changeTrackingFor(N1), + changeTrackingFor(N2v3), // newer version + changeTrackingFor(N3), + changeTrackingFor(N5), + changeTrackingFor(N5_N6))); + }) + .when(mockedMetaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + + // Mock loadResolvedEntities to control timing - client1 blocks until client2 completes + // client1 receives the older version of all entities, while client2 receives the newer version + // of N2 + Answer client1Answer = + invocation -> { + // This is client1's loadResolvedEntities call - block until client2 completes + try { + client1ChangeTrackingResult.countDown(); + LOGGER.debug("Awaiting client2 to complete resolved entities load"); + client1ResolvedEntitiesBlocker.acquire(); // Block until client2 signals completion + List resolvedEntities = + List.of( + getResolvedPolarisEntity(N1), + getResolvedPolarisEntity(N2v2), + getResolvedPolarisEntity(N3), + getResolvedPolarisEntity(N5), + getResolvedPolarisEntity(N5_N6)); + LOGGER.debug("Client1 returning results {}", resolvedEntities); + return new ResolvedEntitiesResult(resolvedEntities); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + Answer client2Answer = + invocation -> { + // This is client2's loadResolvedEntities call - execute normally and signal client1 + try { + LOGGER.debug("Client2 loading resolved entities"); + var result = + new ResolvedEntitiesResult( + List.of( + getResolvedPolarisEntity(N1), + getResolvedPolarisEntity(N2v3), + getResolvedPolarisEntity(N3), + getResolvedPolarisEntity(N5), + getResolvedPolarisEntity(N5_N6))); + client1ResolvedEntitiesBlocker.release(); // Allow client1 to proceed + LOGGER.debug("Client2 returning results {}", result.getResolvedEntities()); + return result; + } catch (Exception e) { + client1ResolvedEntitiesBlocker.release(); // Release in case of error + throw e; + } + }; + Mockito.doAnswer(client1Answer) + .doAnswer(client2Answer) + .when(mockedMetaStoreManager) + .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any()); + + // ExecutorService isn't AutoCloseable in JDK 11 :( + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + // Client 1 task - should get older version and be blocked during loadResolvedEntities + Future> client1Task = + executorService.submit( + () -> { + try { + // Client1 calls getOrLoadResolvedEntities + // - loadEntitiesChangeTracking returns older version for N2 + // - loadResolvedEntities will block until client2 completes + return cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + } catch (Exception e) { + client1Exception.set(e); + return null; + } + }); + + // Client 2 task - should get newer version and complete first + Future> client2Task = + executorService.submit( + () -> { + try { + // Client2 calls getOrLoadResolvedEntities + // - loadEntitiesChangeTracking returns newer version for N2 + // - loadResolvedEntities executes normally and signals client1 when done + client1ChangeTrackingResult.await(); + return cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + } catch (Exception e) { + client2Exception.set(e); + client1ResolvedEntitiesBlocker.release(); // Release in case of error + return null; + } + }); + + // Wait for both tasks to complete + List client1Results = client1Task.get(); + List client2Results = client2Task.get(); + + // Verify no exceptions occurred + assertThat(client1Exception.get()).isNull(); + assertThat(client2Exception.get()).isNull(); + + // Verify both clients got results + assertThat(client1Results).isNotNull(); + assertThat(client2Results).isNotNull(); + assertThat(client1Results).hasSize(5); + assertThat(client2Results).hasSize(5); + + // All entities should be found + assertThat(client1Results).doesNotContainNull(); + assertThat(client2Results).doesNotContainNull(); + + // Verify that client1 got the older version of N2 (index 1 in the list) + ResolvedPolarisEntity client1N2 = client1Results.get(1).getCacheEntry(); + assertThat(client1N2.getEntity().getId()).isEqualTo(N2.getId()); + assertThat(client1N2.getEntity().getEntityVersion()).isEqualTo(N2v2.getEntityVersion()); + + // Verify that client2 got the newer version of N2 + ResolvedPolarisEntity client2N2 = client2Results.get(1).getCacheEntry(); + assertThat(client2N2.getEntity().getId()).isEqualTo(N2.getId()); + assertThat(client2N2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); + + // Verify that both clients got consistent versions for other entities + for (int i = 0; i < 5; i++) { + if (i != 1) { // Skip N2 which we expect to be different + ResolvedPolarisEntity client1Entity = client1Results.get(i).getCacheEntry(); + ResolvedPolarisEntity client2Entity = client2Results.get(i).getCacheEntry(); + + assertThat(client1Entity.getEntity().getId()) + .isEqualTo(client2Entity.getEntity().getId()); + assertThat(client1Entity.getEntity().getEntityVersion()) + .isEqualTo(client2Entity.getEntity().getEntityVersion()); + assertThat(client1Entity.getEntity().getGrantRecordsVersion()) + .isEqualTo(client2Entity.getEntity().getGrantRecordsVersion()); + } + } + assertThat(entityIds).extracting(id -> cache.getEntityById(id.getId())).doesNotContainNull(); + } finally { + executorService.shutdown(); + } + } + + private static ResolvedPolarisEntity getResolvedPolarisEntity(PolarisBaseEntity catalog) { + return new ResolvedPolarisEntity(PolarisEntity.of(catalog), List.of(), List.of()); + } + + private static PolarisChangeTrackingVersions changeTrackingFor(PolarisBaseEntity entity) { + return new PolarisChangeTrackingVersions( + entity.getEntityVersion(), entity.getGrantRecordsVersion()); } } diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index 43a0c55815..f8816260a2 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -118,7 +118,7 @@ protected void testCreateEntities() { .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); List listedEntities = - metaStoreManager.loadEntitiesAll( + metaStoreManager.listFullEntitiesAll( polarisTestMetaStoreManager.polarisCallContext, null, PolarisEntityType.TASK, @@ -238,6 +238,12 @@ protected void testLookup() { polarisTestMetaStoreManager.testLookup(); } + /** test batch entity load */ + @Test + protected void testLoadResolvedEntitiesById() { + polarisTestMetaStoreManager.testLoadResolvedEntitiesById(); + } + /** Test the set of functions for the entity cache */ @Test protected void testEntityCache() { diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java index 514a14c30f..e79209aa7b 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.polaris.core.PolarisCallContext; @@ -48,6 +49,7 @@ import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult; import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult; import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -55,6 +57,7 @@ import org.apache.polaris.core.policy.PolicyType; import org.apache.polaris.core.policy.PredefinedPolicyTypes; import org.assertj.core.api.Assertions; +import org.assertj.core.api.InstanceOfAssertFactories; /** Test the Polaris persistence layer */ public class PolarisTestMetaStoreManager { @@ -655,6 +658,7 @@ PolarisBaseEntity createEntity( .parentId(parentId) .name(name) .propertiesAsMap(properties) + .internalPropertiesAsMap(Map.of()) .build(); PolarisBaseEntity entity = polarisMetaStoreManager @@ -2690,6 +2694,114 @@ public void testLookup() { this.ensureNotExistsById(catalog.getId(), T1.getId(), PolarisEntityType.NAMESPACE); } + public void testLoadResolvedEntitiesById() { + // load all principals + List principals = + polarisMetaStoreManager + .listEntities( + this.polarisCallContext, + null, + PolarisEntityType.PRINCIPAL, + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) + .getEntities(); + + // create new catalog + PolarisBaseEntity catalog = + new PolarisBaseEntity( + PolarisEntityConstants.getNullId(), + polarisMetaStoreManager.generateNewEntityId(this.polarisCallContext).getId(), + PolarisEntityType.CATALOG, + PolarisEntitySubType.NULL_SUBTYPE, + PolarisEntityConstants.getRootEntityId(), + "test"); + CreateCatalogResult catalogCreated = + polarisMetaStoreManager.createCatalog(this.polarisCallContext, catalog, List.of()); + Assertions.assertThat(catalogCreated).isNotNull(); + + // load the catalog again, since the grant versions are different + catalog = + polarisMetaStoreManager + .loadEntity( + polarisCallContext, + 0L, + catalogCreated.getCatalog().getId(), + PolarisEntityType.CATALOG) + .getEntity(); + + // now create all objects + PolarisBaseEntity N1 = this.createEntity(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N1_N2 = + this.createEntity(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + PolarisBaseEntity T1 = + this.createEntity( + List.of(catalog, N1, N1_N2), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + "T1"); + + // batch load all entities. They should all be present and non-null + ResolvedEntitiesResult entitiesResult = + polarisMetaStoreManager.loadResolvedEntities( + polarisCallContext, + PolarisEntityType.NAMESPACE, + List.of( + new PolarisEntityId(N1.getCatalogId(), N1.getId()), + new PolarisEntityId(N1_N2.getCatalogId(), N1_N2.getId()))); + Assertions.assertThat(entitiesResult) + .isNotNull() + .returns(BaseResult.ReturnStatus.SUCCESS, ResolvedEntitiesResult::getReturnStatus) + .extracting( + ResolvedEntitiesResult::getResolvedEntities, + InstanceOfAssertFactories.list(ResolvedPolarisEntity.class)) + .hasSize(2) + .allSatisfy(entity -> Assertions.assertThat(entity).isNotNull()) + .extracting(r -> getEntityCore(r.getEntity())) + .containsExactly(getEntityCore(N1), getEntityCore(N1_N2)); + + // try entities which do not exist + entitiesResult = + polarisMetaStoreManager.loadResolvedEntities( + polarisCallContext, + PolarisEntityType.CATALOG, + List.of( + new PolarisEntityId(catalog.getId(), 27), + new PolarisEntityId(catalog.getId(), 35))); + Assertions.assertThat(entitiesResult) + .isNotNull() + .returns(BaseResult.ReturnStatus.SUCCESS, ResolvedEntitiesResult::getReturnStatus) + .extracting( + ResolvedEntitiesResult::getResolvedEntities, + InstanceOfAssertFactories.list(ResolvedPolarisEntity.class)) + .hasSize(2) + .allSatisfy(entity -> Assertions.assertThat(entity).isNull()); + + // existing entities, some with wrong type + entitiesResult = + polarisMetaStoreManager.loadResolvedEntities( + polarisCallContext, + PolarisEntityType.NAMESPACE, + List.of( + new PolarisEntityId(catalog.getCatalogId(), catalog.getId()), + new PolarisEntityId(catalog.getId(), N1_N2.getId()), + new PolarisEntityId(catalog.getId(), T1.getId()))); + Assertions.assertThat(entitiesResult) + .isNotNull() + .returns(BaseResult.ReturnStatus.SUCCESS, ResolvedEntitiesResult::getReturnStatus) + .extracting( + ResolvedEntitiesResult::getResolvedEntities, + InstanceOfAssertFactories.list(ResolvedPolarisEntity.class)) + .hasSize(3) + .filteredOn(Objects::nonNull) + .hasSize(1) + .extracting(r -> getEntityCore(r.getEntity())) + .containsExactly(getEntityCore(N1_N2)); + } + + private static PolarisEntityCore getEntityCore(PolarisBaseEntity entity) { + return new PolarisEntityCore.Builder<>(entity).build(); + } + /** Test the set of functions for the entity cache */ public void testEntityCache() { // create test catalog diff --git a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java index 9c9074dba7..f3ff47851e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java @@ -979,7 +979,7 @@ public List listCatalogs() { /** List all catalogs without checking for permission. */ private Stream listCatalogsUnsafe() { return metaStoreManager - .loadEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), null, PolarisEntityType.CATALOG, @@ -1224,7 +1224,7 @@ public List listPrincipals() { authorizeBasicRootOperationOrThrow(op); return metaStoreManager - .loadEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL, @@ -1331,7 +1331,7 @@ public List listPrincipalRoles() { authorizeBasicRootOperationOrThrow(op); return metaStoreManager - .loadEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL_ROLE, @@ -1455,7 +1455,7 @@ public List listCatalogRoles(String catalogName) { CatalogEntity catalogEntity = getCatalogByName(resolutionManifest, catalogName); List catalogPath = PolarisEntity.toCoreList(List.of(catalogEntity)); return metaStoreManager - .loadEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), catalogPath, PolarisEntityType.CATALOG_ROLE, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java index 8e4063b87b..9b2f7c8a62 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java @@ -188,7 +188,7 @@ public List listPolicies(Namespace namespace, @Nullable Policy } // with a policyType filter we need to load the full PolicyEntity to apply the filter return metaStoreManager - .loadEntitiesAll( + .listFullEntitiesAll( callContext.getPolarisCallContext(), catalogPath, PolarisEntityType.POLICY,