diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index e5b059a80f7..7dc37c35452 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -710,8 +710,8 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz int numQueryThreads) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchReader(this, requireNotOffline(getTableId(tableName), tableName), - tableName, authorizations, numQueryThreads); + return new TabletServerBatchReader(this, getTableId(tableName), tableName, authorizations, + numQueryThreads); } @Override @@ -796,8 +796,7 @@ public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - Scanner scanner = - new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations); + Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations); Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties()); if (batchSize != null) { scanner.setBatchSize(batchSize); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java new file mode 100644 index 00000000000..495e65e8854 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.util.Timer; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy.Eviction; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Preconditions; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; + +public class OfflineTabletLocatorImpl extends TabletLocator { + + private static final Logger LOG = LoggerFactory.getLogger(OfflineTabletLocatorImpl.class); + + public static class OfflineTabletLocation extends TabletLocation { + + public static final String SERVER = "offline_table_marker"; + + public OfflineTabletLocation(KeyExtent tablet_extent) { + super(tablet_extent, SERVER, SERVER); + } + + } + + private class OfflineTabletsCache implements RemovalListener { + + private final ClientContext context; + private final int maxCacheSize; + private final int prefetch; + private final Cache cache; + private final LinkedBlockingQueue evictions = new LinkedBlockingQueue<>(); + private final TreeSet extents = new TreeSet<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Timer scanTimer = Timer.startNew(); + private final AtomicInteger cacheCount = new AtomicInteger(0); + private final Eviction evictionPolicy; + + private OfflineTabletsCache(ClientContext context) { + this.context = context; + Properties clientProperties = context.getProperties(); + Duration cacheDuration = Duration.ofMillis( + ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties)); + maxCacheSize = + Integer.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getValue(clientProperties)); + prefetch = Integer + .parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties)); + + // This cache is used to evict KeyExtents from the extents TreeSet when + // they have not been accessed in cacheDuration. We are targeting to have + // maxCacheSize objects in the cache, but are not using the Cache's maximumSize + // to achieve this as the Cache will remove things from the Cache that were + // newly inserted and not yet used. This negates the pre-fetching feature + // that we have added into this TabletLocator for offline tables. Here we + // set the maximum size much larger than the property and use the cacheCount + // variable to manage the max size manually. + // @formatter:off + cache = Caffeine.newBuilder() + .expireAfterAccess(cacheDuration) + .initialCapacity(maxCacheSize) + .maximumSize(maxCacheSize * 2) + .removalListener(this) + .scheduler(Scheduler.systemScheduler()) + .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry, + OfflineTabletsCache.class.getSimpleName())) + .build(); + // @formatter:on + evictionPolicy = cache.policy().eviction().orElseThrow(); + } + + @Override + public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { + if (cause == RemovalCause.REPLACED) { + // Don't remove from `extents` if the object was replaced in the cache + return; + } + LOG.trace("Extent {} was evicted from cache for {} ", key, cause); + cacheCount.decrementAndGet(); + evictions.add(key); + try { + if (lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS)) { + try { + processRecentCacheEvictions(); + } finally { + lock.writeLock().unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting to acquire write lock", e); + } + } + + private void processRecentCacheEvictions() { + Preconditions.checkArgument(lock.writeLock().isHeldByCurrentThread()); + Set copy = new HashSet<>(); + evictions.drainTo(copy); + int numEvictions = copy.size(); + if (numEvictions > 0) { + LOG.trace("Processing {} prior evictions", numEvictions); + extents.removeAll(copy); + } + } + + private KeyExtent findOrLoadExtent(KeyExtent searchKey) { + lock.readLock().lock(); + try { + KeyExtent match = extents.ceiling(searchKey); + if (match != null && match.contains(searchKey.endRow())) { + // update access time in cache + @SuppressWarnings("unused") + var unused = cache.getIfPresent(match); + LOG.trace("Extent {} found in cache for start row {}", match, searchKey); + return match; + } + } finally { + lock.readLock().unlock(); + } + lock.writeLock().lock(); + // process prior evictions since we have the write lock + processRecentCacheEvictions(); + // The following block of code fixes an issue with + // the cache where recently pre-fetched extents + // will be evicted from the cache when it reaches + // the maxCacheSize. This is because from the cache's + // perspective they are the coldest objects. The code + // below manually removes the coldest extents that are + // before the searchKey.endRow to make room for the next + // batch of extents that we are going to load into the + // cache so that they are not immediately evicted. + if (cacheCount.get() + prefetch + 1 >= maxCacheSize) { + int evictionSize = prefetch * 2; + Set candidates = new HashSet<>(evictionPolicy.coldest(evictionSize).keySet()); + LOG.trace("Cache near max size, evaluating {} coldest entries", candidates); + candidates.removeIf(ke -> ke.contains(searchKey.endRow()) || ke.endRow() == null + || ke.endRow().compareTo(searchKey.endRow()) >= 0); + LOG.trace("Manually evicting coldest entries: {}", candidates); + cache.invalidateAll(candidates); + cache.cleanUp(); + } + // Load TabletMetadata + if (LOG.isDebugEnabled()) { + scanTimer.restart(); + } + int added = 0; + try (TabletsMetadata tm = + context.getAmple().readTablets().forTable(tid).overlapping(searchKey.endRow(), true, null) + .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { + Iterator iter = tm.iterator(); + for (int i = 0; i < prefetch && iter.hasNext(); i++) { + TabletMetadata t = iter.next(); + KeyExtent ke = t.getExtent(); + if (t.getLocation() != null) { + if (context.getTableState(tid) == TableState.ONLINE) { + throw new IllegalStateException( + "Cannot continue scan with OfflineTabletLocator, table is now online"); + } + throw new IllegalStateException( + "Extent " + ke + " has current or future location, but table is not online"); + } + LOG.trace("Caching extent: {}", ke); + cache.put(ke, ke); + cacheCount.incrementAndGet(); + TabletLocatorImpl.removeOverlapping(extents, ke); + extents.add(ke); + added++; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}", + scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid); + } + return extents.ceiling(searchKey); + } finally { + lock.writeLock().unlock(); + } + } + + private void invalidate(KeyExtent failedExtent) { + cache.invalidate(failedExtent); + } + + private void invalidate(Collection keySet) { + cache.invalidateAll(keySet); + } + + private void invalidateAll() { + cache.invalidateAll(); + } + + } + + private final TableId tid; + private final OfflineTabletsCache extentCache; + + public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) { + tid = tableId; + if (context.getTableState(tid) != TableState.OFFLINE) { + throw new IllegalStateException("Table " + tableId + " is not offline"); + } + extentCache = new OfflineTabletsCache(context); + } + + @Override + public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, + boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + + if (skipRow) { + row = new Text(row); + row.append(new byte[] {0}, 0, 1); + } + + Text metadataRow = new Text(tid.canonical()); + metadataRow.append(new byte[] {';'}, 0, 1); + metadataRow.append(row.getBytes(), 0, row.getLength()); + + LOG.trace("Locating offline tablet for row: {}", metadataRow); + KeyExtent searchKey = KeyExtent.fromMetaRow(metadataRow); + KeyExtent match = extentCache.findOrLoadExtent(searchKey); + if (match != null) { + if (match.prevEndRow() == null || match.prevEndRow().compareTo(row) < 0) { + LOG.trace("Found match for row: {}, extent = {}", row, match); + return new OfflineTabletLocation(match); + } + } + LOG.trace("Found no matching extent for row: {}", row); + return null; + } + + @Override + public List binRanges(ClientContext context, List ranges, + Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + + List tabletLocations = new ArrayList<>(ranges.size()); + List failures = new ArrayList<>(); + + l1: for (Range r : ranges) { + LOG.trace("Looking up locations for range: {}", r); + tabletLocations.clear(); + Text startRow; + + if (r.getStartKey() != null) { + startRow = r.getStartKey().getRow(); + } else { + startRow = new Text(); + } + + TabletLocation tl = this.locateTablet(context, startRow, false, false); + if (tl == null) { + LOG.trace("NOT FOUND first tablet in range: {}", r); + failures.add(r); + continue; + } + LOG.trace("Found first tablet in range: {}, extent: {}", r, tl.tablet_extent); + tabletLocations.add(tl); + + while (tl.tablet_extent.endRow() != null + && !r.afterEndKey(new Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) { + KeyExtent priorExtent = tl.tablet_extent; + tl = locateTablet(context, tl.tablet_extent.endRow(), true, false); + + if (tl == null) { + LOG.trace("NOT FOUND tablet following {} in range: {}", priorExtent, r); + failures.add(r); + continue l1; + } + LOG.trace("Found following tablet in range: {}, extent: {}", r, tl.tablet_extent); + tabletLocations.add(tl); + } + + // Ensure the extents found are non overlapping and have no holes. When reading some extents + // from the cache and other from the metadata table in the loop above we may end up with + // non-contiguous extents. This can happen when a subset of exents are placed in the cache and + // then after that merges and splits happen. + if (TabletLocatorImpl.isContiguous(tabletLocations)) { + for (TabletLocation tl2 : tabletLocations) { + TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, r); + } + } else { + LOG.trace("Found non-contiguous tablet in range: {}", r); + failures.add(r); + } + + } + return failures; + } + + @Override + public void binMutations(ClientContext context, List mutations, + Map> binnedMutations, List failures) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + throw new UnsupportedOperationException(); + } + + @Override + public void invalidateCache(KeyExtent failedExtent) { + extentCache.invalidate(failedExtent); + } + + @Override + public void invalidateCache(Collection keySet) { + extentCache.invalidate(keySet); + } + + @Override + public void invalidateCache() { + extentCache.invalidateAll(); + } + + @Override + public void invalidateCache(ClientContext context, String server) { + invalidateCache(); + } + +} diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java index b4dfaa6258d..0fef7ecaab0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; @@ -163,6 +164,15 @@ public synchronized int getBatchSize() { @Override public synchronized Iterator> iterator() { ensureOpen(); + if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { + try { + String tableName = context.getTableName(tableId); + context.requireNotOffline(tableId, tableName); + } catch (TableNotFoundException e) { + throw new RuntimeException("Table not found", e); + } + } + ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size, Duration.ofMillis(getTimeout(MILLISECONDS)), this, isolated, readaheadThreshold, new Reporter()); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java index c4c1dcdcd9f..b1a4557d7f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataLocationObtainer; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; @@ -110,6 +111,7 @@ public boolean equals(LocatorKey lk) { } private static final HashMap locators = new HashMap<>(); + private static final HashMap offlineLocators = new HashMap<>(); private static boolean enabled = true; public static synchronized void clearLocators() { @@ -117,6 +119,7 @@ public static synchronized void clearLocators() { locator.isValid = false; } locators.clear(); + offlineLocators.clear(); } static synchronized boolean isEnabled() { @@ -135,24 +138,31 @@ static synchronized void enable() { public static synchronized TabletLocator getLocator(ClientContext context, TableId tableId) { Preconditions.checkState(enabled, "The Accumulo singleton that that tracks tablet locations is " + "disabled. This is likely caused by all AccumuloClients being closed or garbage collected"); - LocatorKey key = new LocatorKey(context.getInstanceID(), tableId); - TabletLocator tl = locators.get(key); - if (tl == null) { - MetadataLocationObtainer mlo = new MetadataLocationObtainer(); - - if (RootTable.ID.equals(tableId)) { - tl = new RootTabletLocator(context.getTServerLockChecker()); - } else if (MetadataTable.ID.equals(tableId)) { - tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context, RootTable.ID), mlo, - context.getTServerLockChecker()); - } else { - tl = new TabletLocatorImpl(tableId, getLocator(context, MetadataTable.ID), mlo, - context.getTServerLockChecker()); + TableState state = context.getTableState(tableId); + if (state == TableState.OFFLINE) { + return offlineLocators.computeIfAbsent(tableId, + f -> new OfflineTabletLocatorImpl(context, tableId)); + } else { + offlineLocators.remove(tableId); + LocatorKey key = new LocatorKey(context.getInstanceID(), tableId); + TabletLocator tl = locators.get(key); + if (tl == null) { + MetadataLocationObtainer mlo = new MetadataLocationObtainer(); + + if (RootTable.ID.equals(tableId)) { + tl = new RootTabletLocator(context.getTServerLockChecker()); + } else if (MetadataTable.ID.equals(tableId)) { + tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context, RootTable.ID), mlo, + context.getTServerLockChecker()); + } else { + tl = new TabletLocatorImpl(tableId, getLocator(context, MetadataTable.ID), mlo, + context.getTServerLockChecker()); + } + locators.put(key, tl); } - locators.put(key, tl); + return tl; } - return tl; } static { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index 8b149da57fa..cb03b0b4d89 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -118,6 +118,10 @@ public Iterator> iterator() { throw new IllegalStateException("batch reader closed"); } + if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { + context.requireNotOffline(tableId, tableName); + } + return new TabletServerBatchReaderIterator(context, tableId, tableName, authorizations, ranges, numThreads, queryThreadPool, this, retryTimeout); } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java index 34780c8a06e..b7812c8b623 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -141,7 +141,23 @@ public enum ClientProperty { "A list of span receiver classes to send trace spans"), @Deprecated(since = "2.1.0", forRemoval = true) TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, PropertyType.PATH, - "The zookeeper node where tracers are registered", "2.0.0", false); + "The zookeeper node where tracers are registered", "2.0.0", false), + + /* + * For use with OfflineTabletLocatorImpl + */ + OFFLINE_LOCATOR_CACHE_DURATION("offline.locator.cache.duration", "10m", PropertyType.TIMEDURATION, + "Amount of time for which offline extent information should be cached in the client. The offline" + + " extent information is used when performing eventual scans on offline tables.", + "2.1.5", false), + OFFLINE_LOCATOR_CACHE_PREFETCH("offline.locator.cache.prefetch", "10", PropertyType.COUNT, + "The number of offline extents that should be pre-loaded into the cache. The offline" + + " extent information is used when performing eventual scans on offline tables.", + "2.1.5", false), + OFFLINE_LOCATOR_CACHE_SIZE("offline.locator.cache.size", "100", PropertyType.COUNT, + "The number of offline extents that should be cached in the client. The offline" + + " extent information is used when performing eventual scans on offline tables.", + "2.1.5", false); @Deprecated(since = "2.1.0", forRemoval = true) public static final String TRACE_SPAN_RECEIVER_PREFIX = "trace.span.receiver"; diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index 22275c9e119..f56d2a21a93 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -448,7 +449,8 @@ public Iterator> getReplicationNeededIterat } return Maps.immutableEntry(file, stat); }); - } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException e) { + } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException + | TableOfflineException e) { // No elements that we need to preclude return Collections.emptyIterator(); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 4d9c4e745a6..39aec22ca2a 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -448,7 +449,8 @@ protected int removeReplicationEntries(Map candidates) { candidates.remove(id); log.info("Ignore closed log " + id + " because it is being replicated"); } - } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException ex) { + } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException + | TableOfflineException ex) { return candidates.size(); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java index e14834cf483..a4039ca36a5 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java @@ -154,8 +154,8 @@ public static enum ScannerType { BATCH_SCANNER, SCANNER; } - private ScannerBase createScanner(AccumuloClient client, ScannerType stype, String tableName) - throws TableNotFoundException { + public static ScannerBase createScanner(AccumuloClient client, ScannerType stype, + String tableName) throws TableNotFoundException { switch (stype) { case BATCH_SCANNER: BatchScanner batchScanner = client.createBatchScanner(tableName, Authorizations.EMPTY); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 77cdc8c6478..b271ae22864 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.ClientProperty; @@ -146,24 +145,6 @@ public void testBatchScan() throws Exception { } } - @Test - public void testScanOfflineTable() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - String tableName = getUniqueNames(1)[0]; - - createTableAndIngest(client, tableName, null, 10, 10, "colf"); - client.tableOperations().offline(tableName, true); - - assertThrows(TableOfflineException.class, () -> { - try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(new Range()); - scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); - assertEquals(100, Iterables.size(scanner)); - } // when the scanner is closed, all open sessions should be closed - }); - } - } - @Test @Timeout(value = 20) public void testBatchScannerTimeout() throws Exception { @@ -232,7 +213,7 @@ public static int createTableAndIngest(AccumuloClient client, String tableName, */ public static int ingest(AccumuloClient client, String tableName, int rowCount, int colCount, int offset, String colf, boolean shouldFlush) throws Exception { - ReadWriteIT.ingest(client, colCount, rowCount, 50, offset, colf, tableName); + ReadWriteIT.ingest(client, rowCount, colCount, 50, offset, colf, tableName); final int ingestedEntriesCount = colCount * rowCount; diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java new file mode 100644 index 00000000000..868a5332c8e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.TabletLocator; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.ScanServerAllowedTablesIT.ScannerType; +import org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import com.google.common.collect.Iterables; + +public class ScanServerOfflineTableIT extends SharedMiniClusterBase { + + private static class ScanServerOfflineITConfiguration + implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, + org.apache.hadoop.conf.Configuration coreSite) { + cfg.setNumScanServers(1); + + // Timeout scan sessions after being idle for 3 seconds + cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s"); + + // Configure the scan server to only have 1 scan executor thread. This means + // that the scan server will run scans serially, not concurrently. + cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1"); + } + } + + @BeforeAll + public static void start() throws Exception { + ScanServerOfflineITConfiguration c = new ScanServerOfflineITConfiguration(); + SharedMiniClusterBase.startMiniClusterWithConfig(c); + SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, + "localhost"); + + String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); + ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter(); + String scanServerRoot = zooRoot + Constants.ZSSERVERS; + + while (zrw.getChildren(scanServerRoot).size() == 0) { + Thread.sleep(500); + } + } + + @AfterAll + public static void stop() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + @ParameterizedTest + @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class) + public void testSimpleScan(ScannerType stype) throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0] + stype.name(); + + final int ingestedEntryCount = + ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf"); + client.tableOperations().offline(tableName, true); + + // This isn't necessary, but will ensure that the TabletLocator is cleared + // Invalidate the TabletLocator for the offline table + TabletLocator.getLocator((ClientContext) client, + TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + } + } + + @ParameterizedTest + @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class) + public void testScan(ScannerType stype) throws Exception { + + final int rows = 1000; + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0] + stype.name(); + + final int ingestedEntryCount = + ScanServerIT.createTableAndIngest(client, tableName, null, rows, 10, "colf"); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The tablet server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verify(client, rows, 10, 50, 0, tableName); + + client.tableOperations().offline(tableName, true); + + // This isn't necessary, but will ensure that the TabletLocator is cleared + // Invalidate the TabletLocator for the offline table + TabletLocator.getLocator((ClientContext) client, + TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verifyEventual(client, rows, 10, 50, 0, tableName); + + client.tableOperations().online(tableName, true); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The tablet server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verify(client, rows, 10, 50, 0, tableName); + + // Add some splits to the table + SortedSet splits = new TreeSet<>(); + for (int i = 0; i < rows; i++) { + splits.add(new Text("row_" + String.format("%010d", i))); + } + client.tableOperations().addSplits(tableName, splits); + client.instanceOperations().waitForBalance(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The tablet server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verify(client, rows, 10, 50, 0, tableName); + + client.tableOperations().offline(tableName, true); + + // This isn't necessary, but will ensure that the TabletLocator is cleared + // Invalidate the TabletLocator for the offline table + TabletLocator.getLocator((ClientContext) client, + TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verifyEventual(client, rows, 10, 50, 0, tableName); + + } + + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java index 092ea14a750..527ac4a95f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.data.Key; @@ -109,6 +110,12 @@ public static void main(String[] args) throws Exception { public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + verifyIngest(accumuloClient, params, ConsistencyLevel.IMMEDIATE); + } + + public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params, + ConsistencyLevel cl) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { byte[][] bytevals = TestIngest.generateValues(params.dataSize); Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2"); @@ -136,6 +143,7 @@ public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams para Text colq = new Text("col_" + String.format("%07d", expectedCol)); try (Scanner scanner = accumuloClient.createScanner("test_ingest", labelAuths)) { + scanner.setConsistencyLevel(cl); scanner.setBatchSize(1); Key startKey = new Key(rowKey, colf, colq); Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL)); @@ -181,6 +189,7 @@ public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams para Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow))); try (Scanner scanner = accumuloClient.createScanner(params.tableName, labelAuths)) { + scanner.setConsistencyLevel(cl); scanner.setRange(new Range(startKey, endKey)); for (int j = 0; j < params.cols; j++) { scanner.fetchColumn(new Text(params.columnFamily), diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 44eb5b23743..c4cb9574a97 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -62,6 +62,7 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -211,18 +212,23 @@ public static void ingest(AccumuloClient accumuloClient, int rows, int cols, int public static void verify(AccumuloClient accumuloClient, int rows, int cols, int width, int offset, String tableName) throws Exception { - verify(accumuloClient, rows, cols, width, offset, COLF, tableName); + verify(accumuloClient, rows, cols, width, offset, COLF, tableName, ConsistencyLevel.IMMEDIATE); + } + + public static void verifyEventual(AccumuloClient accumuloClient, int rows, int cols, int width, + int offset, String tableName) throws Exception { + verify(accumuloClient, rows, cols, width, offset, COLF, tableName, ConsistencyLevel.EVENTUAL); } private static void verify(AccumuloClient accumuloClient, int rows, int cols, int width, - int offset, String colf, String tableName) throws Exception { + int offset, String colf, String tableName, ConsistencyLevel cl) throws Exception { VerifyParams params = new VerifyParams(accumuloClient.properties(), tableName, rows); params.rows = rows; params.dataSize = width; params.startRow = offset; params.columnFamily = colf; params.cols = cols; - VerifyIngest.verifyIngest(accumuloClient, params); + VerifyIngest.verifyIngest(accumuloClient, params, cl); } public static String[] args(String... args) { @@ -445,7 +451,7 @@ public void localityGroupChange() throws Exception { to.setLocalityGroups(table, getGroups(cfg)); to.flush(table, null, null, true); verify(accumuloClient, ROWS * i, 1, 50, 0, table); - verify(accumuloClient, ROWS * i, 1, 50, 0, "xyz", table); + verify(accumuloClient, ROWS * i, 1, 50, 0, "xyz", table, ConsistencyLevel.IMMEDIATE); i++; } }