diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java index 026828c5795..2ca7a8af30b 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/InformationFetcher.java @@ -22,17 +22,18 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Stream; import jakarta.ws.rs.ServiceUnavailableException; import jakarta.ws.rs.core.Response; @@ -41,9 +42,16 @@ import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.admin.servers.ServerId.Type; +import org.apache.accumulo.core.clientImpl.TabletInformationImpl; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.RowRange; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.TabletState; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.process.thrift.MetricResponse; import org.apache.accumulo.core.process.thrift.ServerProcessService.Client; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -64,6 +72,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Suppliers; import com.google.common.net.HostAndPort; public class InformationFetcher implements RemovalListener, Runnable { @@ -139,32 +148,99 @@ public void run() { } - private class TableInformationFetcher implements Runnable { - private final ServerContext ctx; - private final TableId tableId; - private final SystemInformation summary; - - private TableInformationFetcher(ServerContext ctx, TableId tableId, SystemInformation summary) { - this.ctx = ctx; - this.tableId = tableId; - this.summary = summary; - } - - @Override - public void run() { + public static Future fetchTabletMetadata(ServerContext ctx, ExecutorService executor, + SystemInformation summary) { + return fetchTabletMetadata(ctx, tabletInformation -> { + final TableId tableId = tabletInformation.getTabletId().getTable(); try { final String tableName = ctx.getQualifiedTableName(tableId); - try (Stream tablets = - this.ctx.tableOperations().getTabletInformation(tableName, List.of(RowRange.all()))) { - tablets.forEach(t -> summary.processTabletInformation(tableId, tableName, t)); - } + summary.processTabletInformation(tableId, tableName, tabletInformation); } catch (TableNotFoundException e) { - LOG.warn("TableNotFoundException thrown while trying to gather information for TableId: {}", - tableId, e); + if (SystemTables.containsTableId(tableId)) { + throw new IllegalStateException(e); + } else { + LOG.debug("Table name for table id : {}, assuming table deleted", tableId); + } + throw new RuntimeException(e); + } + + }, executor); + } + + public static Future fetchTabletMetadata(ServerContext ctx, + Consumer tabletConsumer, ExecutorService executor) { + + Supplier currentTime = Suppliers.memoize(() -> { + try { + return ctx.instanceOperations().getManagerTime(); } catch (Exception e) { - LOG.warn("Interrupted while trying to gather information for TableId: {}", tableId, e); + throw new IllegalStateException(e); } - } + }); + + // create an initial background task to read root tablet metadata from zookeeper + var rootStage = CompletableFuture.supplyAsync(() -> { + // read live tservers from zookeeper + Set liveTserverSet = TabletMetadata.getLiveTServers(ctx); + // read root tablet metadata from zookeeper + var rootTablet = ctx.getAmple().readTablet(RootTable.EXTENT); + var tabletSate = TabletState.compute(rootTablet, liveTserverSet); + tabletConsumer + .accept(new TabletInformationImpl(rootTablet, tabletSate::toString, currentTime)); + if (tabletSate == TabletState.HOSTED) { + return liveTserverSet; + } else { + LOG.info("Not scanning root tablet because its state is {}", + TabletState.compute(rootTablet, liveTserverSet)); + return null; + } + }, executor); + + // runs a follow on task that scans the root tablet and creates a background task to scan each + // metadata table + var metaStage = rootStage.thenCompose(liveTserverSet -> { + if (liveTserverSet == null) { + // root stage was not successful + return null; + } + // TODO set an aggressive timeout on the metadata scan, if a failure happens after our checks + // it could cause the scan to hang. + try (var metaTablets = + ctx.getAmple().readTablets().forLevel(Ample.DataLevel.METADATA).build()) { + List> futures = new ArrayList<>(); + for (var metaTablet : metaTablets) { + var tabletState = TabletState.compute(metaTablet, liveTserverSet); + tabletConsumer + .accept(new TabletInformationImpl(metaTablet, tabletState::toString, currentTime)); + if (tabletState == TabletState.HOSTED) { + var range = MetadataSchema.TabletsSection.getRange() + .clip(metaTablet.getExtent().toDataRange(), true); + if (range != null) { + // spawn a task to scan this metadata tablet + var future = CompletableFuture.runAsync(() -> { + try (var userTablets = + ctx.getAmple().readTablets().scanMetadataTable().overRange(range).build()) { + for (var userTablet : userTablets) { + tabletConsumer.accept(new TabletInformationImpl(userTablet, + () -> TabletState.compute(userTablet, liveTserverSet).toString(), + currentTime)); + } + } + }); + futures.add(future); + } + } else { + LOG.info("Not scanning meta tablet {} because its state is {}", metaTablet.getExtent(), + tabletState); + } + } + + // return a completable future that waits for all the scans of metadata tablets + return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)); + } + }); + + return metaStage; } private class RunningCompactionFetcher implements Runnable { @@ -296,9 +372,8 @@ public void run() { futures.add(this.pool.submit(new RunningCompactionFetcher(summary, pool))); // Fetch Tablet / Tablet information from the metadata table - for (TableId tableId : this.ctx.createQualifiedTableNameToIdMap().values()) { - futures.add(this.pool.submit(new TableInformationFetcher(this.ctx, tableId, summary))); - } + + futures.add(fetchTabletMetadata(ctx, pool, summary)); futures.add(this.pool.submit(() -> { try { @@ -324,13 +399,10 @@ public void run() { tookToLong = true; } - Iterator> iter = futures.iterator(); - while (iter.hasNext()) { - Future future = iter.next(); + for (Future future : futures) { if (tookToLong && !future.isCancelled()) { future.cancel(true); } else if (future.isDone()) { - iter.remove(); try { future.get(); } catch (CancellationException | InterruptedException | ExecutionException e) { @@ -338,6 +410,10 @@ public void run() { } } } + + // more efficient to do batch removal from array list than doing it one by one + futures.removeIf(Future::isDone); + if (!futures.isEmpty()) { UtilWaitThread.sleep(3_000); }