Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@
import org.apache.accumulo.monitor.next.SystemInformation.TimeOrderedRunningCompactionSet;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
import org.apache.accumulo.monitor.next.ec.CompactorsSummary;
import org.apache.accumulo.monitor.next.views.ServersView;
import org.apache.accumulo.monitor.next.views.ServersView.Status;
import org.apache.accumulo.monitor.next.views.Status;
import org.apache.accumulo.monitor.next.views.TableData;
import org.apache.accumulo.monitor.next.views.TableDataFactory;

import io.micrometer.core.instrument.Meter.Id;
import io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary;
Expand Down Expand Up @@ -267,12 +268,12 @@ public Map<Id,CumulativeDistributionSummary> getScanServerAllMetricSummary() {
@GET
@Path("servers/view")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns a UI-ready table model for server process pages. Add ';table=<ServersView.ServerTable>' to URL")
public ServersView getServerProcessView(@MatrixParam("table") ServersView.ServerTable table) {
@Description("Returns a UI-ready table model for server process pages. Add ';table=<TableDataFactory.TableName>' to URL")
public TableData getServerProcessView(@MatrixParam("table") TableDataFactory.TableName table) {
if (table == null) {
throw new BadRequestException("A 'table' parameter is required");
}
ServersView view =
TableData view =
monitor.getInformationFetcher().getSummaryForEndpoint().getServerProcessView(table);
if (view == null) {
throw new NotFoundException("ServersView object for table " + table.name() + " not found");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@
import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
import org.apache.accumulo.monitor.next.views.ServersView;
import org.apache.accumulo.monitor.next.views.ServersView.Column;
import org.apache.accumulo.monitor.next.views.ServersView.ColumnFactory;
import org.apache.accumulo.monitor.next.views.ServersView.Status;
import org.apache.accumulo.monitor.next.views.ColumnFactory;
import org.apache.accumulo.monitor.next.views.Status;
import org.apache.accumulo.monitor.next.views.TableData;
import org.apache.accumulo.monitor.next.views.TableData.Column;
import org.apache.accumulo.monitor.next.views.TableDataFactory;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.metrics.MetricResponseWrapper;
Expand Down Expand Up @@ -431,8 +432,8 @@ public record CompactionGroupSummary(String groupId, long running) {
private final AtomicLong timestamp = new AtomicLong(0);
private final EnumMap<ServerId.Type,Status> componentStatuses =
new EnumMap<>(ServerId.Type.class);
private final EnumMap<ServersView.ServerTable,Supplier<ServersView>> serverMetricsView =
new EnumMap<>(ServersView.ServerTable.class);
private final EnumMap<TableDataFactory.TableName,Supplier<TableData>> serverMetricsView =
new EnumMap<>(TableDataFactory.TableName.class);
private DeploymentOverview deploymentOverview = new DeploymentOverview(0L, List.of());
private String managerGoalState;
private final int rgLongRunningCompactionSize;
Expand Down Expand Up @@ -512,18 +513,19 @@ private void updateAggregates(final MetricResponse response,

}

private ServersView createCompactionQueueSummary(final Set<ServerId> managers) {
private TableData createCompactionQueueSummary(final Set<ServerId> managers) {

final Column COMPACTION_QUEUE_COL =
new Column(ServersView.RG_COL_KEY, "Compaction Queue", "Compaction Queue", "");
new Column(TableDataFactory.RG_COL_KEY, "Compaction Queue", "Compaction Queue", "");

List<ColumnFactory> cols = ServersView.columnsFor(ServersView.ServerTable.COORDINATOR_QUEUES);
List<ColumnFactory> cols =
TableDataFactory.columnsFor(TableDataFactory.TableName.COORDINATOR_QUEUES);

// Remove the column mapping for the resource group and replace it so that
// the column header reads "Compaction Queue" instead of "Resource Group"
int rgIdx = -1;
for (int idx = 0; idx < cols.size(); idx++) {
if (cols.get(idx).getColumn().key().equals(ServersView.RG_COL_KEY)) {
if (cols.get(idx).getColumn().key().equals(TableDataFactory.RG_COL_KEY)) {
rgIdx = idx;
break;
}
Expand Down Expand Up @@ -583,9 +585,9 @@ public Object getRowData(ServerId sid, MetricResponse mr,

if (!qm.isEmpty()) {
// Create a ServersView object from the MetricResponse for each queue
return new ServersView(qm.keySet(), 0, qm, timestamp.get(), cols);
return TableDataFactory.forColumns(qm.keySet(), qm, timestamp.get(), cols);
}
return new ServersView(Set.of(), 0, Map.of(), timestamp.get(), cols);
return TableDataFactory.forColumns(Set.of(), Map.of(), timestamp.get(), cols);
}

public void processResponse(final ServerId server, final MetricResponse response) {
Expand Down Expand Up @@ -760,30 +762,30 @@ public void finish() {
switch (type) {
case COMPACTOR:
compactors.values().forEach(servers::addAll);
cacheServerProcessView(ServersView.ServerTable.COMPACTORS, servers);
cacheServerProcessView(TableDataFactory.TableName.COMPACTORS, servers);
break;
case GARBAGE_COLLECTOR:
servers.add(gc.get());
cacheServerProcessView(ServersView.ServerTable.GC_SUMMARY, servers);
cacheServerProcessView(ServersView.ServerTable.GC_FILES, servers);
cacheServerProcessView(ServersView.ServerTable.GC_WALS, servers);
cacheServerProcessView(TableDataFactory.TableName.GC_SUMMARY, servers);
cacheServerProcessView(TableDataFactory.TableName.GC_FILES, servers);
cacheServerProcessView(TableDataFactory.TableName.GC_WALS, servers);
break;
case MANAGER:
servers.addAll(managers);
cacheServerProcessView(ServersView.ServerTable.MANAGERS, servers);
cacheServerProcessView(ServersView.ServerTable.MANAGER_FATE, servers);
cacheServerProcessView(ServersView.ServerTable.MANAGER_COMPACTIONS, servers);
ServersView coordinatorQueues = createCompactionQueueSummary(servers);
serverMetricsView.put(ServersView.ServerTable.COORDINATOR_QUEUES,
cacheServerProcessView(TableDataFactory.TableName.MANAGERS, servers);
cacheServerProcessView(TableDataFactory.TableName.MANAGER_FATE, servers);
cacheServerProcessView(TableDataFactory.TableName.MANAGER_COMPACTIONS, servers);
TableData coordinatorQueues = createCompactionQueueSummary(servers);
serverMetricsView.put(TableDataFactory.TableName.COORDINATOR_QUEUES,
memoize(() -> coordinatorQueues));
break;
case SCAN_SERVER:
sservers.values().forEach(servers::addAll);
cacheServerProcessView(ServersView.ServerTable.SCAN_SERVERS, servers);
cacheServerProcessView(TableDataFactory.TableName.SCAN_SERVERS, servers);
break;
case TABLET_SERVER:
tservers.values().forEach(servers::addAll);
cacheServerProcessView(ServersView.ServerTable.TABLET_SERVERS, servers);
cacheServerProcessView(TableDataFactory.TableName.TABLET_SERVERS, servers);
break;
case MONITOR:
default:
Expand Down Expand Up @@ -822,14 +824,15 @@ private Status computeServerStatus(ServerId.Type type) {
long problemHostCount =
problemHosts.stream().filter(serverId -> serverId.getType() == type).count();
int missingMetricCount = (int) servers.stream()
.filter(serverId -> !ServersView.hasMetricData(allMetrics.getIfPresent(serverId))).count();
return ServersView.buildStatus(servers.size(), problemHostCount, missingMetricCount,
.filter(serverId -> !TableDataFactory.hasMetricData(allMetrics.getIfPresent(serverId)))
.count();
return Status.buildStatus(servers.size(), problemHostCount, missingMetricCount,
type == ServerId.Type.TABLET_SERVER);
}

private String computeManagerGoalState() {
Integer goalState = managers.stream().map(allMetrics::getIfPresent)
.map(response -> ServersView.metricValuesByName(response)
.map(response -> TableDataFactory.metricValuesByName(response)
.get(Metric.MANAGER_GOAL_STATE.getName()))
.filter(value -> value != null && !value.isEmpty())
.map(value -> value.stream().map(SystemInformation::getMetricValue).filter(Objects::nonNull)
Expand Down Expand Up @@ -949,15 +952,13 @@ public long getTimestamp() {
/**
* Cache a ServersView for the given table and set of servers.
*/
private void cacheServerProcessView(ServersView.ServerTable table, Set<ServerId> servers) {
long problemHostCount =
problemHosts.stream().filter(serverId -> servers.contains(serverId)).count();
serverMetricsView.put(table, memoize(() -> new ServersView(servers, problemHostCount,
allMetrics.asMap(), timestamp.get(), ServersView.columnsFor(table))));
private void cacheServerProcessView(TableDataFactory.TableName table, Set<ServerId> servers) {
serverMetricsView.put(table, memoize(
() -> TableDataFactory.forTable(table, servers, allMetrics.asMap(), timestamp.get())));
}

public ServersView getServerProcessView(ServersView.ServerTable table) {
Supplier<ServersView> view = this.serverMetricsView.get(table);
public TableData getServerProcessView(TableDataFactory.TableName table) {
Supplier<TableData> view = this.serverMetricsView.get(table);
if (view != null) {
return view.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.monitor.next.views;

import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.metrics.MonitorMeterRegistry;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.monitor.next.SystemInformation;
import org.apache.accumulo.monitor.next.views.TableData.Column;

public interface ColumnFactory {

default Number computeRate(Number sum) {
if (sum == null) {
return null;
}
return sum.doubleValue() / MonitorMeterRegistry.STEP.toSeconds();
}

default Number add(Number n1, Number n2) {
if (n1 == null && n2 == null) {
return null;
} else if (n1 == null) {
return n2;
} else if (n2 == null) {
return n1;
} else if (n1 instanceof Double || n2 instanceof Double) {
return n1.doubleValue() + n2.doubleValue();
} else if (n1 instanceof Long || n2 instanceof Long) {
return n1.longValue() + n2.longValue();
} else if (n1 instanceof Integer || n2 instanceof Integer) {
return n1.intValue() + n2.intValue();
} else {
throw new IllegalArgumentException(
"Unexpected value type: " + n1.getClass().getName() + " " + n2.getClass().getName());
}
}

default Number sum(List<FMetric> metrics) {
Number sum = null;
for (var metric : metrics) {
sum = add(sum, SystemInformation.getMetricValue(metric));
}
return sum;
}

Column getColumn();

Object getRowData(ServerId sid, MetricResponse mr, Map<String,List<FMetric>> serverMetrics);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.monitor.next.views;

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.metrics.flatbuffers.FTag;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.monitor.next.SystemInformation;
import org.apache.accumulo.monitor.next.views.TableData.Column;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorColumnFactory implements ColumnFactory {

private static final Logger log = LoggerFactory.getLogger(ExecutorColumnFactory.class);

private final Column column;
private final String metricName;
private final Predicate<String> tagPredicate;
private final Type type;

enum Type {
COMPLETED, QUEUED
}

String getMetricName(Type t) {
return switch (t) {
case COMPLETED -> Metric.EXECUTOR_COMPLETED.getName();
case QUEUED -> Metric.EXECUTOR_QUEUED.getName();
};
}

String getCssClass(Type t) {
return switch (t) {
case COMPLETED -> Metric.MonitorCssClass.RATE.getCssClass();
case QUEUED -> Metric.MonitorCssClass.NUMBER.getCssClass();
};
}

ExecutorColumnFactory(Type type, String theadPoolPrefix, String label, String description) {
this.type = type;
this.metricName = getMetricName(type);
this.tagPredicate = s -> s.startsWith(theadPoolPrefix);
this.column =
new Column(metricName + "-" + theadPoolPrefix, label, description, getCssClass(type));
}

ExecutorColumnFactory(Type type, String keySuffix, Predicate<String> tagPredicate, String label,
String description) {
this.type = type;
this.metricName = getMetricName(type);
this.tagPredicate = tagPredicate;
this.column = new Column(metricName + "-" + keySuffix, label, description, getCssClass(type));
}

@Override
public Column getColumn() {
return column;
}

@Override
public Object getRowData(ServerId sid, MetricResponse mr,
Map<String,List<FMetric>> serverMetrics) {

var metrics = serverMetrics.getOrDefault(metricName, List.of());

Number sum = null;

FTag ftag = new FTag();
for (var metric : metrics) {
boolean foundTag = false;
String tag = null;
for (int i = 0; i < metric.tagsLength(); i++) {
metric.tags(ftag, i);
var key = ftag.key();
var value = ftag.value();
if (key != null && value != null && key.equals("name") && tagPredicate.test(value)) {
foundTag = true;
tag = value;
break;
}
}

var metricStatistic = TableDataFactory.extractStatistic(metric);
if (foundTag && (metricStatistic == null || metricStatistic.equals("value")
|| metricStatistic.equals("count"))) {
var val = SystemInformation.getMetricValue(metric);
log.trace("adding {}+{} for {} {} {} {}", sum, val, metric.name(), tag,
sid.toHostPortString(), sid.getResourceGroup());
sum = add(sum, SystemInformation.getMetricValue(metric));

}
}

if (type == Type.COMPLETED) {
// Convert to a rate
return computeRate(sum);
}

return sum;
}

}
Loading