Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

import jakarta.ws.rs.ServiceUnavailableException;
import jakarta.ws.rs.core.Response;
Expand All @@ -41,9 +42,16 @@
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.clientImpl.TabletInformationImpl;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.RowRange;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.process.thrift.ServerProcessService.Client;
import org.apache.accumulo.core.rpc.ThriftUtil;
Expand All @@ -64,6 +72,7 @@
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Suppliers;
import com.google.common.net.HostAndPort;

public class InformationFetcher implements RemovalListener<ServerId,MetricResponse>, Runnable {
Expand Down Expand Up @@ -139,32 +148,99 @@ public void run() {

}

private class TableInformationFetcher implements Runnable {
private final ServerContext ctx;
private final TableId tableId;
private final SystemInformation summary;

private TableInformationFetcher(ServerContext ctx, TableId tableId, SystemInformation summary) {
this.ctx = ctx;
this.tableId = tableId;
this.summary = summary;
}

@Override
public void run() {
public static Future<?> fetchTabletMetadata(ServerContext ctx, ExecutorService executor,
SystemInformation summary) {
return fetchTabletMetadata(ctx, tabletInformation -> {
final TableId tableId = tabletInformation.getTabletId().getTable();
try {
final String tableName = ctx.getQualifiedTableName(tableId);
try (Stream<TabletInformation> tablets =
this.ctx.tableOperations().getTabletInformation(tableName, List.of(RowRange.all()))) {
tablets.forEach(t -> summary.processTabletInformation(tableId, tableName, t));
}
summary.processTabletInformation(tableId, tableName, tabletInformation);
} catch (TableNotFoundException e) {
LOG.warn("TableNotFoundException thrown while trying to gather information for TableId: {}",
tableId, e);
if (SystemTables.containsTableId(tableId)) {
throw new IllegalStateException(e);
} else {
LOG.debug("Table name for table id : {}, assuming table deleted", tableId);
}
throw new RuntimeException(e);
}

}, executor);
}

public static Future<?> fetchTabletMetadata(ServerContext ctx,
Consumer<TabletInformation> tabletConsumer, ExecutorService executor) {

Supplier<Duration> currentTime = Suppliers.memoize(() -> {
try {
return ctx.instanceOperations().getManagerTime();
} catch (Exception e) {
LOG.warn("Interrupted while trying to gather information for TableId: {}", tableId, e);
throw new IllegalStateException(e);
}
}
});

// create an initial background task to read root tablet metadata from zookeeper
var rootStage = CompletableFuture.supplyAsync(() -> {
// read live tservers from zookeeper
Set<TServerInstance> liveTserverSet = TabletMetadata.getLiveTServers(ctx);
// read root tablet metadata from zookeeper
var rootTablet = ctx.getAmple().readTablet(RootTable.EXTENT);
var tabletSate = TabletState.compute(rootTablet, liveTserverSet);
tabletConsumer
.accept(new TabletInformationImpl(rootTablet, tabletSate::toString, currentTime));
if (tabletSate == TabletState.HOSTED) {
return liveTserverSet;
} else {
LOG.info("Not scanning root tablet because its state is {}",
TabletState.compute(rootTablet, liveTserverSet));
return null;
}
}, executor);

// runs a follow on task that scans the root tablet and creates a background task to scan each
// metadata table
var metaStage = rootStage.thenCompose(liveTserverSet -> {
if (liveTserverSet == null) {
// root stage was not successful
return null;
}
// TODO set an aggressive timeout on the metadata scan, if a failure happens after our checks
// it could cause the scan to hang.
try (var metaTablets =
ctx.getAmple().readTablets().forLevel(Ample.DataLevel.METADATA).build()) {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (var metaTablet : metaTablets) {
var tabletState = TabletState.compute(metaTablet, liveTserverSet);
tabletConsumer
.accept(new TabletInformationImpl(metaTablet, tabletState::toString, currentTime));
if (tabletState == TabletState.HOSTED) {
var range = MetadataSchema.TabletsSection.getRange()
.clip(metaTablet.getExtent().toDataRange(), true);
if (range != null) {
// spawn a task to scan this metadata tablet
var future = CompletableFuture.runAsync(() -> {
try (var userTablets =
ctx.getAmple().readTablets().scanMetadataTable().overRange(range).build()) {
for (var userTablet : userTablets) {
tabletConsumer.accept(new TabletInformationImpl(userTablet,
() -> TabletState.compute(userTablet, liveTserverSet).toString(),
currentTime));
}
}
});
futures.add(future);
}
} else {
LOG.info("Not scanning meta tablet {} because its state is {}", metaTablet.getExtent(),
tabletState);
}
}

// return a completable future that waits for all the scans of metadata tablets
return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
}
});

return metaStage;
}

private class RunningCompactionFetcher implements Runnable {
Expand Down Expand Up @@ -296,9 +372,8 @@ public void run() {
futures.add(this.pool.submit(new RunningCompactionFetcher(summary, pool)));

// Fetch Tablet / Tablet information from the metadata table
for (TableId tableId : this.ctx.createQualifiedTableNameToIdMap().values()) {
futures.add(this.pool.submit(new TableInformationFetcher(this.ctx, tableId, summary)));
}

futures.add(fetchTabletMetadata(ctx, pool, summary));

futures.add(this.pool.submit(() -> {
try {
Expand All @@ -324,20 +399,21 @@ public void run() {
tookToLong = true;
}

Iterator<Future<?>> iter = futures.iterator();
while (iter.hasNext()) {
Future<?> future = iter.next();
for (Future<?> future : futures) {
if (tookToLong && !future.isCancelled()) {
future.cancel(true);
} else if (future.isDone()) {
iter.remove();
try {
future.get();
} catch (CancellationException | InterruptedException | ExecutionException e) {
LOG.error("Error getting status from future", e);
}
}
}

// more efficient to do batch removal from array list than doing it one by one
futures.removeIf(Future::isDone);

if (!futures.isEmpty()) {
UtilWaitThread.sleep(3_000);
}
Expand Down