Feat/long link#141
Conversation
|
基于本轮 HTTP 长链接修复 + 覆盖率提升,我把方案文档同步更新一遍: tRPC-Java 长链接改造方案详细总结一、需求与目标1.1 master 分支问题master 分支走的是 "按需短连接 + 空闲断开" 模式:
后果:高并发短间隔的 RPC 场景下反复 connect / disconnect 浪费三次握手 + TLS 协商;半死链场景 2 小时业务静默失败;连接断开瞬间 burst 请求引发短连风暴;HTTP/2 客户端连接池上限只有 25/5,高并发瞬间排队阻塞。 1.2 目标
二、整体架构三、改造细节(按层分组)3.1 Layer 1 Cluster:
|
| 旧名(已废弃) | 新名(最终代码) |
|---|---|
RECONNECT_CHECK_PERIOD_SECONDS |
HEALTH_OBSERVE_PERIOD_SECONDS |
MAX_RECONNECT_FAILURES |
STUCK_UNAVAILABLE_THRESHOLD |
reconnectCheckerFuture |
healthObserverFuture |
ensureReconnectCheckerStarted |
ensureHealthObserverStarted |
checkAndReconnect |
observeHealth |
warnedAtMaxFailures |
warnedStuckUnavailable |
移除部分
// 删除:每 N 秒扫描所有 client,关闭 idle 超时的客户端
private static void scanIdleClient() { ... }新增轻量定时器:观测 + 阈值告警(不主动 close、不主动重连、不发心跳)
private static final int HEALTH_OBSERVE_PERIOD_SECONDS = 30;
private static final int STUCK_UNAVAILABLE_THRESHOLD = 5;
private static volatile ScheduledFuture<?> healthObserverFuture;
private static void ensureHealthObserverStarted() {
if (healthObserverFuture != null || CLOSED_FLAG.get()) return;
synchronized (RpcClusterClientManager.class) {
if (healthObserverFuture != null || CLOSED_FLAG.get()) return;
try {
healthObserverFuture = WorkerPoolManager.getShareScheduler()
.scheduleAtFixedRate(
RpcClusterClientManager::observeHealth,
HEALTH_OBSERVE_PERIOD_SECONDS,
HEALTH_OBSERVE_PERIOD_SECONDS,
TimeUnit.SECONDS);
} catch (Throwable ex) {
logger.warn("Start long-connection health observer failed; falling back to "
+ "lazy reconnect on the request path only", ex);
}
}
}
static void observeHealth() {
if (CLOSED_FLAG.get()) return;
CLUSTER_MAP.forEach((bConfig, clusterMap) -> clusterMap.forEach((key, proxy) -> {
try {
if (proxy.isAvailable()) {
proxy.failureCount.set(0);
proxy.warnedStuckUnavailable.set(false);
return;
}
int fails = proxy.failureCount.updateAndGet(
cur -> cur >= STUCK_UNAVAILABLE_THRESHOLD ? STUCK_UNAVAILABLE_THRESHOLD : cur + 1);
if (fails == STUCK_UNAVAILABLE_THRESHOLD
&& proxy.warnedStuckUnavailable.compareAndSet(false, true)) {
logger.warn("Health-observe: client {} unavailable for {} consecutive checks "
+ "(~{}s); leaving the transport intact and relying on lazy reconnect on "
+ "the request path",
proxy.getProtocolConfig().toSimpleString(), fails,
fails * HEALTH_OBSERVE_PERIOD_SECONDS);
}
} catch (Throwable ex) {
logger.error("Health-observe on client {} threw", key, ex);
}
}));
}| 维度 | master scanner | 本分支 health observer |
|---|---|---|
| 行为 | 超 idleTimeout 即 client.close(),连同 EventLoopGroup 一起拆 |
仅观察 isAvailable(),绝不主动 close、绝不主动重连、绝不发心跳 |
| 重连驱动 | scanner 拆完,下次请求重建整套 transport+EventLoopGroup | 唯一驱动源是请求路径(ensureChannelActive)+ channelInactive |
| 日志噪音 | 长期不可用时持续刷 | warnedStuckUnavailable CAS 保证只告警一次,恢复即清零 |
| 计数风险 | 无封顶 | STUCK_UNAVAILABLE_THRESHOLD = 5 封顶防 int 回卷 |
- 不主动发心跳:依赖底层 transport 在
channelInactive时已经把 channel 标 invalid,避免引入心跳协议复杂度 - 5×30s = 150s 容忍窗:避免短暂网络抖动造成误判
- 懒启动:仅当真正调用
getOrCreateClient时才注册定时任务,未启用 RPC 客户端的进程零开销 - 方案 A 取舍:保留 scanner 的本质考虑是—在主调长期 idle、又没新请求触发 lazy reconnect 的场景下,仍需要一个独立观察通道暴露"transport 卡死"问题;但 scanner 不应承担恢复职责(恢复只应由请求路径或 channelInactive 触发),故只保留观察 + 单次告警
缓存条目防误删(CAS remove)
proxy.closeFuture().whenComplete((r, e) -> {
Map<String, RpcClientProxy> clusterMap = CLUSTER_MAP.get(bConfig);
if (clusterMap != null) {
clusterMap.remove(k, proxy); // ← 关键:CAS remove(key, value)
}
});remove(k, proxy) 而不是 remove(k):旧 proxy close 后异步触发钩子时,新 proxy 可能已经放进缓存(用户主动 close → 立即重建)。无条件 remove(key) 会误删新对象导致内存泄漏与下一次请求失败。CAS 语义保证只有当缓存里的还是同一个旧 proxy 时才移除。
idleTimeout 字段保留
仅保留字段做向后兼容;在 RpcClusterClientManager 内不再读取。语义重新定义为"应用层 READ_IDLE 兜底",由 Layer 3 使用。
3.2 Layer 1 ClusterInvoker:DefClusterInvoker
ConsumerInvokerProxy<T> created = new ConsumerInvokerProxy<>(...);
invokerCache.put(key, created);
// CAS remove:只删自己注册的那一份
rpcClient.closeFuture().whenComplete((r, e) -> {
boolean removed = invokerCache.remove(key, created);
...
});3.3 Layer 2 Transport:AbstractClientTransport —— 原子切换 + 雷鸣群防护
this.channels = new CopyOnWriteArrayList<>(); // 替代 ArrayList,volatile 内存可见性
protected void ensureChannelActive(int chIndex) {
ChannelFutureItem curChannelItem = channels.get(chIndex);
if (!needsReconnect(curChannelItem)) return;
connLock.lock();
try {
// 锁内 double-check:另一个线程可能已经替换好了
ChannelFutureItem latest = channels.get(chIndex);
if (!needsReconnect(latest)) return;
channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
latest.close();
} finally {
connLock.unlock();
}
}
// isConnecting 视为"别人在连,等他"
private static boolean needsReconnect(ChannelFutureItem item) {
if (item.isNotYetConnect()) return true;
return !item.isAvailable() && !item.isConnecting();
}
@Override
public void invalidateChannel(Channel target) {
// 找到承载 target 的 slot → 用空占位替换 → 再异步 close 旧 channel
// 让 close "入队 → 完成" 这段窗口里到达的请求立刻走新建分支
for (int i = 0; i < channels.size(); i++) {
...
connLock.lock();
try {
ChannelFutureItem latest = channels.get(i);
if (latest != item) return;
channels.set(i, new ChannelFutureItem(null, config));
item.close();
} finally {
connLock.unlock();
}
}
}3.4 Layer 3+4 Netty 客户端:NettyTcpClientTransport
移除 master 反向 close 代码
- 客户端去除原
WRITE_IDLEIdleStateHandler - 服务端去除
ALL_IDLEIdleStateHandler NettyClientHandler/NettyServerHandler.userEventTriggered中 idle 触发的ctx.close()死代码
Layer 3 — 装 READ_IDLE + IdleCloseHandler(关键 bug 修复点)
// 错误:装在 NettyClientHandler 之后,永远收不到 channelActive
p.addLast("idleState", new IdleStateHandler(idleTimeoutMills, 0, 0, MS));
// 正确:addBefore("handler", ...) 装在 NettyClientHandler 之前
p.addBefore("handler", "idleState",
new IdleStateHandler(idleTimeoutMills, 0L, 0L, TimeUnit.MILLISECONDS));
p.addBefore("handler", "idleClose", new IdleCloseHandler());根因:NettyClientHandler.channelActive 没有调用 super.channelActive(ctx)(业务自有适配逻辑)。Netty 事件传播规则要求每个 handler 主动 fire* 才向下;如果 IdleStateHandler 装在 NettyClientHandler 之后,永远收不到 channelActive → timer 永远不启动 → READ_IDLE 永远不触发。必须用 addBefore("handler", ...) 把 idle handler 装在业务 handler 之前。
触发链:
new IdleStateHandler(idleTimeoutMills, 0L, 0L, MILLISECONDS) // READ_IDLE
↓ 触发
IdleCloseHandler.userEventTriggered:
1) invalidateChannel(wrapper) // 先让 slot 失效
2) ctx.close() // 再异步关 channel
为什么是 READ_IDLE:半死链时应用一直能写入 send buffer,写永远"成功";ALL_IDLE / WRITE_IDLE 在"持续请求 + 静默丢包"场景永远不触发;只有"读不到回复"才是半死链可观测信号。
IdleCloseHandler 关闭日志增强主被调信息:
logger.info("Long connection idle close: state={}, caller(local)={}, callee={}, remote={}, channelId={}",
state, local, callee, remote, channelId);Layer 4 — Linux 内核层 TCP keepalive
boolean useEpoll = Epoll.isAvailable() && config.useEpoll();
if (useEpoll) {
bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, 30);
bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, 10);
bootstrap.option(EpollChannelOption.TCP_KEEPCNT, 3);
}两层叠加:Linux 60s 内核回收 + 全平台 180s 应用兜底。
服务端安全性
- 正常 close / 进程退出:四次挥手 →
channelInactive→ 清理 - 物理断网:依赖 OS TCP keepalive(默认 ~2h)兜底,每条 ~KB 级开销可接受
- 主调侧依赖
channelInactive配合 Layer 1 的 150s 定时器观察兜底
3.5 Layer 5 资源层:NettyAbstractClientTransport —— NIO/Epoll 双 slot 引用计数共享池
旧:单 slot SHARE_EVENT_LOOP_GROUP (NIO)
新:双 slot
├── SHARE_NIO_GROUP + SHARE_NIO_USED_NUMS + NIO_LOCK
└── SHARE_EPOLL_GROUP + SHARE_EPOLL_USED_NUMS + EPOLL_LOCK
public NettyAbstractClientTransport(...) {
super(...);
if (Boolean.TRUE.equals(config.isIoThreadGroupShare())) {
this.sharedKind = wantsEpoll(config) ? SharedGroupKind.EPOLL : SharedGroupKind.NIO;
acquireSharedGroup(this.sharedKind, ...);
this.shareGroupAcquired = true;
}
}
@Override protected void doClose() {
...
if (shareGroupAcquired) {
shareGroupAcquired = false;
releaseSharedGroup(sharedKind); // 引用计数 -1,归零才 shutdownGracefully
}
}- 保留
SHARE_EVENT_LOOP_GROUP旧字段做 NIO 别名向后兼容 - UDP 取舍:
NettyUdpClientTransport仅复用 Layer 5 共享池,不参与 Layer 2/3/4(UDP 无连接、无 read idle、无 TCP keepalive),javadoc 已显式说明
3.6 Layer 6 配置层
| 字段 | 默认值 | 作用层 |
|---|---|---|
idleTimeout |
180000 ms | Layer 3 应用兜底 |
tcpKeepAliveIdle |
30 s | Layer 4 内核层 |
tcpKeepAliveIntvl |
10 s | Layer 4 内核层 |
tcpKeepAliveCnt |
3 | Layer 4 内核层 |
ioThreadGroupShare |
true | Layer 5 共享池 |
Spring Boot 用户通过 @ConfigProperty + Jackson SNAKE_CASE + BinderUtils 自动绑定 yaml 字段(如 tcp_keep_alive_idle)。
3.7 HTTP 协议长链接(重大重构)
HTTP 侧底层是 Apache HttpClient,HTTP/1 走 4.x 同步、HTTP/2(c)/HTTPS 走 5.x async。我们要解决三大类问题:
- 池层:连接池上限太小、空闲连接
NoHttpResponseException、NAT 静默回收 - 实例层:
HttpRpcClient实例本身要能被回收(避免堆积),且要能感知"持续失败" - 生命周期:H2/HTTPS 的
doOpen异常静默吞 → "假活" client 被业务踩 NPE
修复 #1:H2/HTTPS 连接池配置(之前完全裸建)
// Http2cRpcClient.doOpen
PoolingAsyncClientConnectionManager cm = PoolingAsyncClientConnectionManagerBuilder.create()
.setMaxConnTotal(maxConns) // ← 之前默认 25,高并发瞬间排队
.setMaxConnPerRoute(maxConns) // ← 之前默认 5
.setConnPoolPolicy(PoolReusePolicy.LIFO)
.setValidateAfterInactivity(TimeValue.ofMilliseconds(2000)) // 2s idle 重新校验
.setConnectionTimeToLive(TimeValue.ofMinutes(10)) // 10min 强制重建(K8s pod drift)
.build();
httpAsyncClient = HttpAsyncClients.custom()
.setConnectionManager(cm)
.setIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())
.evictExpiredConnections()
.evictIdleConnections(TimeValue.ofSeconds(60))
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
.build();Http2RpcClient (TLS / HTTP/2) 用同样的池配置。HttpsRpcClient 继承 Http2RpcClient 只切版本策略。
修复 #2:HTTP/1 KeepAliveStrategy + connectionTimeToLive
// HttpRpcClient.doOpen
.setKeepAliveStrategy(HttpRpcClient::resolveKeepAliveDuration) // 提取为 public static 便于测试
.setConnectionTimeToLive(10, TimeUnit.MINUTES)
// 提取出的 resolver
public static long resolveKeepAliveDuration(HttpResponse rsp, HttpContext ctx) {
long fallbackMs = TimeUnit.MINUTES.toMillis(5); // 短于典型 NAT (5–15min)
Header h = rsp.getFirstHeader("Keep-Alive");
if (h != null) {
for (HeaderElement el : h.getElements()) {
if ("timeout".equalsIgnoreCase(el.getName()) && el.getValue() != null) {
try {
long server = Long.parseLong(el.getValue()) * 1000L;
return Math.min(server, fallbackMs);
} catch (NumberFormatException ignore) {
// fall through
}
}
}
}
return fallbackMs;
}设计要点:
- 服务端不返
Keep-Aliveheader → 强制 5min 上限(不再"无穷大"),避免 NAT 静默回收后下次请求踩到死连接 connectionTimeToLive=10min是硬上限,热连接也强制重建——这是回收 K8s pod IP 漂移的关键
修复 #3:实例层失败计数(与 trpc 协议侧 channelInactive→cluster 摘缓存对齐)
private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
private volatile long lastUsedNanos = System.nanoTime();
public void markUsed() { lastUsedNanos = System.nanoTime(); }
public void markSuccess() { consecutiveFailures.set(0); }
public void markFailure() { consecutiveFailures.incrementAndGet(); }
@Override
public boolean isAvailable() {
if (!super.isAvailable()) return false;
if (consecutiveFailures.get() >= 50) return false; // 持续失败 → 摘缓存
return (System.nanoTime() - lastUsedNanos) <= MINUTES.toNanos(10); // 10min idle 视为 orphan
}invoker 入口 markUsed,成功 markSuccess,异常路径 markFailure:
public Response send(Request request) throws Exception {
httpRpcClient.markUsed(); // 入口
HttpPost httpPost;
try { httpPost = buildRequest(request); }
catch (Exception ex) { httpRpcClient.markFailure(); return RpcUtils.newResponse(request, null, ex); }
try (CloseableHttpResponse rsp = httpClient.execute(httpPost)) {
Response response = handleResponse(request, rsp);
if (response.getException() == null) httpRpcClient.markSuccess();
else httpRpcClient.markFailure();
return response;
} catch (Exception ex) {
httpRpcClient.markFailure();
return RpcUtils.newResponse(request, null, ex);
}
}对齐效果:HTTP 侧也具备和 trpc 协议同样的"持续失败 → cluster 摘缓存 → lazy 重建"链路。原来只能靠"冷却 10min"被动回收,现在 50 次连续失败立即标 unavailable。
修复 #4:Http2cRpcClient / Http2RpcClient.doOpen 异常上抛
之前:
} catch (Exception e) {
logger.error("httpAsyncClient error: ", e); // ← 仅打日志,open() 假装成功
}修复后:
} catch (Exception e) {
String desc = protocolConfig != null ? protocolConfig.toSimpleString() : "<null>";
throw TRpcException.newFrameException(ErrorCode.TRPC_CLIENT_CONNECT_ERR,
"open http2c client (" + desc + ") failed", e);
}让 lifecycle 真正进入 FAILED → isAvailable() 返 false → cluster 摘缓存。null-safe 是为了让测试可以反射置空 protocolConfig 复现失败路径。
修复 #5:杂项
- logger 类名修正:
Http2cRpcClient/Http2RpcClient之前都是LoggerFactory.getLogger(HttpRpcClient.class)(拷错),改为各自类 - 删除 shaded 依赖:
Http2ConsumerInvoker删掉autovalue.shaded.com.google.common.common.base.Objects,改java.util.Objects.equals Http2RpcClient.evictIdleSeconds()这种 dead code 一并清理
3.8 HTTP 高并发断连场景的并发安全分析
主调或被调断开连接 + 高并发并发请求时是否安全?逐项分析:
| 场景 | race 点 | 结论 |
|---|---|---|
N 业务线程并发 httpClient.execute |
HTTP/1 走 PoolingHttpClientConnectionManager 全局锁、HTTP/2 走单连接多路复用 selector |
HttpClient 内部 lock,无 race |
markUsed / markFailure 与 isAvailable 并发读 |
volatile long + AtomicInteger |
内存可见性保证,无 race |
临界点 consecutiveFailures 49→50 |
多线程瞬时都看到 50+ → 都判 unavailable | 一次 markSuccess 归零即恢复,最终一致 |
业务线程并发 close client(shutdownBackendConfig) |
A 线程 in-flight execute、B 线程 close() → A 拿到 IllegalStateException("I/O reactor has been shut down") |
A 走 catch → markFailure + 包到 Response.exception。AtomicInteger 永远可写,业务侧拿到失败 Response 自然处理,无数据破坏 |
| 服务端瞬间 RST 大量连接(kill -9) | N 个 in-flight 各自 IOException | 各自 markFailure → 50+ 失败 → cluster 摘缓存 → lazy 重建 |
| open 失败假活(旧 bug) | 之前 httpAsyncClient = null 但 isAvailable=true,业务 NPE |
已修:doOpen 抛 TRpcException → lifecycle FAILED |
结论:所有高并发断连场景下没有数据竞争或破坏,只会出现"业务请求失败"的预期表现,并被失败计数 + 池 evict + cluster CAS 摘缓存的链路自动恢复。
四、对比 master 的核心优势(一表速览)
| 维度 | master | 本分支 |
|---|---|---|
| 长连接是否真正长存 | ✗ 双向 IdleStateHandler 主动断 | ✓ 显式 close 才断 |
| 半死链恢复时间 | 7200s(OS 默认 keepalive) | Linux+epoll ~60s / 全平台 180s |
| Cluster scanner 行为 | 拆 transport 拆 EventLoopGroup | 仅观察 + 一次性告警,不主动 close、不主动重连、不发心跳 |
| 缓存与 transport 一致性 | 无条件 remove(key) |
closeFuture + CAS remove(key, value) |
channels 内存可见性 |
ArrayList |
CopyOnWriteArrayList |
| 雷鸣群防护 | 无 | 锁内 double-check + isConnecting 让路 |
| 关闭瞬间窗口 | 请求可能写到 dying channel | invalidateChannel 先置空 slot 再 close |
| Netty pipeline 顺序 | idle handler 加在末尾,本项目场景永不触发 | addBefore("handler", ...) 强制装在业务 handler 之前 |
| 共享池 vs epoll | 互斥,二选一 | 双 slot 引用计数,两者兼得 |
| 可调参数 | 仅 idleTimeout(语义混乱) |
4 个参数全 yaml 可配 + Spring schema |
| HTTP/2 池配置 | 完全裸建(25/5) | maxConns / validateAfterInactivity / TTL / SO_KEEPALIVE 全配 |
| HTTP keep-alive ceiling | 无(依赖 server header) | 5min fallback 上限(破 NAT 静默回收) |
| HTTP 持续失败感知 | 无 | consecutiveFailures 50 次翻 unavailable + markSuccess 重置 |
| HTTP/2 doOpen 失败 | 静默吞 → "假活"client | 抛 TRpcException → lifecycle FAILED |
| HTTP logger 路由 | 三类全用 HttpRpcClient.class(错位) |
各自类,错位修正 |
五、完整防泄漏链路
场景 A:被调(服务端)下线/IP 漂移
↓
(A1)正常 FIN/RST:主调侧 Netty 立即 channelInactive → 清理 NettyChannel
(A2)半死链(Linux+epoll):内核 30s + 10s×3 ≈ 60s RST → channelInactive
(A3)半死链(其他平台):READ_IDLE 180s 触发 → invalidateChannel + ctx.close → channelInactive
(A4)HTTP/1:池层 evictIdle(60s) + connectionTTL(10min) + KeepAlive(5min ceiling) 三重剔除
(A5)HTTP/2:连续 50 次 markFailure → isAvailable=false → cluster 摘缓存
↓
后续 RpcClient.isAvailable() 返回 false
↓
RpcClusterClientManager 的 30s 定时器观察到 unavailable,failureCount++
↓
连续 5 次累计 ≈ 150s(trpc 协议下 Layer 3/4 已主动 close 走 closeFuture,更快)
↓
proxy.closeFuture() 完成 → CLUSTER_MAP.remove(k, proxy) CAS 摘缓存
↓
DefClusterInvoker.invokerCache 同步 CAS remove
↓
内存被 GC
场景 B:主调(客户端)异常下线
↓
进程退出 → OS 发 FIN 给被调
↓
被调 Netty 收到 channelInactive → 清理服务端 channel
↓
(拔网线场景)依赖 OS keepalive ~2h 兜底(服务端未配 epoll keepalive,是已知取舍)
六、单测覆盖
| 测试类 | 用途 | 关键技术 |
|---|---|---|
RpcClusterClientManagerTest(增强) |
主流程:getOrCreateClient / close / observeHealth / Proxy delegate | 纯 JUnit + Stub |
DefClusterInvokerCloseFutureTest(新建) |
CAS remove 防误删语义 | 纯 JUnit |
RpcClusterLoggerLevelTest(新建) |
覆盖 3 个 logger.isDebugEnabled()==false 分支 |
log4j2 LoggerContext.updateLoggers() 临时调级别 |
RpcClusterSchedulerRejectTest(新建) |
覆盖 scheduler 抛 RejectedExecutionException 的 catch 分支 |
反射替换 WorkerPoolManager.shareScheduler(避开 PowerMock) |
AbstractClientTransportTest(增强,+9 case) |
雷鸣群 double-check + invalidateChannel 原子切换 | 纯 JUnit + Stub |
NettyAbstractClientTransportTest(新建,6 case) |
NIO/Epoll 双 slot 引用计数语义 | 纯 JUnit |
NettyTcpClientTransportTest(新建/增强,6 case) |
doOpen 装 idle pipeline 顺序、TCP_KEEPIDLE 透传 | 离线 EmbeddedChannel + 反射调用 ChannelInitializer |
NettyTcpClientIdleCloseTest(新建) |
READ_IDLE 触发 invalidateChannel + close 端到端 | 真实 Netty server/client |
MultiPortNamingUrlConcurrentTest#testIdleTimeoutChannelRecycle(新建) |
e2e 长链接 idle 回收:100 线程 × 1000 请求 × 5 connsPerAddr 验证 50 条 channel 全部 fire/close | 多端口 server + 多线程并发 |
BaseProtocolConfigTest / AbstractProtocolSchemaTest(增强) |
tcp_keep_alive 三参数解析与默认值 | 纯 JUnit |
BindTest2 + application-bind-test2.yml(增强) |
Spring Boot yaml 自动绑定端到端 | Spring Test |
HttpRpcClientLongLinkTest(增强到 22 case) |
markUsed / isAvailable / markSuccess / markFailure 全链路 + KeepAliveStrategy 5 个分支 + doClose IOException + Http2c/Http2 doOpen 失败上抛 + 32 线程 × 1000 次 AtomicInteger 压测 | Mockito mock + 反射调 doOpen |
HttpMultiPortNamingUrlConcurrentTest(新建) |
HTTP 100 线程 × 1000 请求 × 10 endpoint 多端口并发 + 池层长连接复用 | 纯 JUnit |
HTTP 长链接相关类覆盖率(指令级)
| 类 | 修复前 | 最终 | 备注 |
|---|---|---|---|
HttpRpcClient |
68.8% | 100% | KeepAliveStrategy 提取后所有分支可测 |
Http2cRpcClient |
76.5% | 97.7% | 仅余 4 条 branch 细节 |
Http2RpcClient |
70.5% | 100% | 删除 dead code + 反射调 doOpen 测异常上抛 |
HttpsRpcClient |
100% | 100% | — |
HttpConsumerInvoker |
96.2% | 96.2% | — |
Http2ConsumerInvoker |
96.2% | 96.2% | FutureCallback inner 58.8%(原代码就有) |
HttpRpcClientFactory |
100% | 100% | — |
AbstractConsumerInvoker |
91.4% | 91.4% | — |
所有本次改动的类 ≥ 90%,其中 4 个 100%。
RpcClusterClientManager 行覆盖 93%。
关键单测设计取舍
- PowerMock 在 JDK17 下崩溃(
MagicAccessorImpl限制)→ 项目锁定 JDK8,PowerMock 大部分场景可用;但RpcClusterSchedulerRejectTest仍改用反射替换静态字段,避免 PowerMock 全 JVM 副作用 - PowerMock 的 JVM 级污染问题(
NettyTcpClientTransportTest踩坑):早期实现起真实 TCP listen + connect,全量测试时被前面用了 PowerMock 的失败 case 污染 EventLoop / classloader 状态导致挂连接 race。最终改为 离线 EmbeddedChannel + 反射调用ChannelInitializer.initChannel(Channel),完全脱离真实 EventLoop - logger 分支覆盖:
Whitebox.setInternalState在 PowerMock 1.7.4 + JDK8 下也修改不了 static-final → 改用 log4j2 标准 API 调级别 - HTTP doOpen 失败路径:直接
client.open()会被 lifecycle 的Objects.requireNonNull(protocolConfig)拦在前面拿不到 catch 分支 → 反射调doOpen()绕过 lifecycle,专门覆盖 catch - HTTP KeepAliveStrategy 不可测:原 lambda 私有 → 提取为
public static long resolveKeepAliveDuration,5 个分支全覆盖(无 header、server 小于上限、server 大于上限钳位、timeout=abcNumberFormatException、max=100无 timeout key) - e2e 长连接 idle 回收测试:用
idleTimeout=10s、跑 100 线程 × 1000 请求 × 5 connsPerAddr 共 50 条 channel;assert 50 条全部 fire IdleCloseHandler + 全部 close + 后续请求自动 lazy reconnect 成功
七、改动文件清单
Layer 1 Cluster
RpcClusterClientManager.java— scanner 改纯观察 + closeFuture CAS remove + 命名按方案 A 重命名DefClusterInvoker.java— invokerCache CAS remove 修复
Layer 2 Transport
AbstractClientTransport.java—channels改CopyOnWriteArrayList+ 锁内 double-check + 新增invalidateChannelClientTransport.java— 接口加default invalidateChannel
Layer 3+4 Netty
NettyTcpClientTransport.java— 移除旧 WRITE_IDLE,addBefore("handler", "idleState/idleClose", ...)装 READ_IDLE + IdleCloseHandler;epoll TCP keepalive;IdleCloseHandler 日志增加主被调信息(state、caller(local)、callee、remote、channelId)NettyTcpServerTransport.java— 移除 ALL_IDLE handlerNettyClientHandler.java/NettyServerHandler.java— 删除 idle close 死代码
Layer 5 资源层
NettyAbstractClientTransport.java— NIO/Epoll 双 slot 引用计数共享池NettyUdpClientTransport.java— 跟随 useEpoll 选 EpollDatagramChannel + 共享池;javadoc 明确 UDP 仅复用 Layer 5
Layer 6 配置
Constants.java— 新增DEFAULT_TCP_KEEPALIVE_IDLE/INTVL/CNT,DEFAULT_IDLE_TIMEOUT改 180000BaseProtocolConfig.java/BackendConfig.java— 三个 keepalive 字段透传AbstractProtocolSchema.java— Spring schema 同步加字段
HTTP 长链接(重大重构)
HttpRpcClient.javaevictExpired/evictIdle+validateAfterInactivity+connectionTimeToLive(10min)+KeepAliveStrategy5min ceilinglastUsedNanos+consecutiveFailures(50)+markUsed/markSuccess/markFailure+ 重写isAvailableresolveKeepAliveDuration提取为 public static 便于测试
Http2cRpcClient.java- 完整池配置(之前裸建):
maxConnTotal/maxConnPerRoute/validateAfterInactivity/TTL/PoolReusePolicy/SO_KEEPALIVE/FORCE_HTTP_2 evictExpired + evictIdle(60s)lastUsedNanos+consecutiveFailures同 HttpRpcClientdoOpen异常上抛 TRpcException(不再静默吞)- logger 类名修正为
Http2cRpcClient.class
- 完整池配置(之前裸建):
Http2RpcClient.java- 完整池配置 +
evictExpired/evictIdle/SO_KEEPALIVE doOpen异常上抛 TRpcException- logger 类名修正
- 删 dead code(
evictIdleSeconds()/ 多余的super.doClose()注释)
- 完整池配置 +
HttpsRpcClient.java— 继承Http2RpcClient,行为自动跟随HttpConsumerInvoker.javamarkUsed移到入口(与失败路径对齐)- 成功
markSuccess/ 异常markFailure
Http2ConsumerInvoker.javamarkUsed入口- 成功 / 失败分支 markSuccess/markFailure
- 删除
autovalue.shaded.com.google.common.common.base.Objects,改java.util.Objects
单测
RpcClusterClientManagerTest(增强)DefClusterInvokerCloseFutureTest(新建)RpcClusterLoggerLevelTest(新建)RpcClusterSchedulerRejectTest(新建,反射替换静态字段)AbstractClientTransportTest(增强,+9 case)NettyAbstractClientTransportTest(新建,6 case)NettyTcpClientTransportTest(新建/增强,6 case,离线 EmbeddedChannel)NettyTcpClientIdleCloseTest(新建)MultiPortNamingUrlConcurrentTest(新建 +testIdleTimeoutChannelRecycle)HttpMultiPortNamingUrlConcurrentTest(新建 +@Before AbstractConsumerInvoker.reset()解决 HashedWheelTimer 跨测试污染)BaseProtocolConfigTest/AbstractProtocolSchemaTest(增强)BindTest2+application-bind-test2.yml(增强)HttpRpcClientLongLinkTest(增强到 22 case:失败计数 + 32×1000 并发原子计数压测 + KeepAliveStrategy 5 分支 + doClose IOException + doOpen 失败上抛 + Http2/Https 继承)
八、生产推荐配置
trpc:
client:
io_mode: epoll # 关键:开 epoll 才能享受 Layer 4 的 60s 内核回收
# 以下保持默认即可
# io_thread_group_share: true
# idle_timeout: 180000
# tcp_keep_alive_idle: 30
# tcp_keep_alive_intvl: 10
# tcp_keep_alive_cnt: 3收益:内核 ≈60s 半死链恢复 + 应用 180s 兜底 + 切换原子化 + 共享线程池零额外开销 + 不再雷鸣群 + HTTP 实例不堆积 + HTTP/2 连接池真正可用 + HTTP 持续失败可被识别。
九、本次方案与代码同步的关键修订点(相对早期版本)
- scanner 命名:
reconnect/checkAndReconnect/MAX_RECONNECT_FAILURES→health observer / observeHealth / STUCK_UNAVAILABLE_THRESHOLD,避免名字暗示"会主动重连" - Netty pipeline 顺序生产 bug:必须
addBefore("handler", ...),否则IdleStateHandler永不启动 timer(项目里NettyClientHandler.channelActive不调 super) - IdleCloseHandler 日志可观测性:补全 state/caller/callee/remote/channelId
- e2e 长连接 idle 回收测试:100 线程 × 1000 请求 × 5 connsPerAddr,覆盖 50 条 channel 的全部生命周期
- NettyTcpClientTransportTest 离线化:用 EmbeddedChannel + 反射调用 ChannelInitializer,避免被 PowerMock 失败用例的 JVM 状态污染
- RpcClusterSchedulerRejectTest 去 PowerMock:改反射替换
WorkerPoolManager.shareScheduler - HTTP/2 池配置补齐:
Http2cRpcClient/Http2RpcClient完整配maxConns / validateAfterInactivity / connectionTimeToLive / evictIdle / SO_KEEPALIVE / VersionPolicy,从 0 到 1 - HTTP keep-alive ceiling:
HttpRpcClient加KeepAliveStrategy5min fallback +connectionTimeToLive(10min),破 NAT 静默回收 - HTTP 实例层失败计数:
consecutiveFailuresAtomicInteger +markSuccess/markFailure,与 trpc 协议侧 cluster 摘缓存机制对齐 - HTTP/2 doOpen 不再吞异常:
Http2cRpcClient.doOpen/Http2RpcClient.doOpen抛 TRpcException 让 lifecycle 进 FAILED;catch 块用null-safe表达式(protocolConfig != null ? toSimpleString : "<null>")让测试可反射置空触发 - HTTP logger 路由修正:
Http2cRpcClient/Http2RpcClient不再用HttpRpcClient.class,避免日志通道串 - HTTP shaded 依赖清除:
Http2ConsumerInvoker移除autovalue.shaded.Objects - HTTP
KeepAliveStrategy提取为 public static:HttpRpcClient.resolveKeepAliveDuration让 5 个分支可被纯 Mockito 单测覆盖 - HTTP 测试隔离:
HttpMultiPortNamingUrlConcurrentTest加@Before AbstractConsumerInvoker.reset()解决HashedWheelTimer cannot be started once stopped跨测试污染
No description provided.