Feat/long link #141

Open
wardseptember wants to merge 9 commits into trpc-group:master from wardseptember:feat/long_link

@wardseptember
Collaborator

No description provided.

@codecov

codecov Bot commented May 16, 2026

Codecov Report

❌ Patch coverage is 89.70976% with 39 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.34156%. Comparing base (063c0d2) to head (aeb0cc1).

Files with missing lines | Patch % | Lines
--- | --- | ---
.../trpc/transport/netty/NettyTcpClientTransport.java | 85.48387% | 9 Missing ⚠️
...t/trpc/core/transport/AbstractClientTransport.java | 83.78378% | 6 Missing ⚠️
...ent/trpc/core/cluster/RpcClusterClientManager.java | 92.59259% | 4 Missing ⚠️
...ncent/trpc/core/cluster/def/DefClusterInvoker.java | 55.55556% | 4 Missing ⚠️
...t/trpc/proto/http/client/Http2ConsumerInvoker.java | 73.33333% | 4 Missing ⚠️
...nt/trpc/proto/http/client/HttpConsumerInvoker.java | 71.42857% | 4 Missing ⚠️
.../transport/netty/NettyAbstractClientTransport.java | 93.10345% | 4 Missing ⚠️
.../trpc/transport/netty/NettyUdpClientTransport.java | 66.66667% | 3 Missing ⚠️
...m/tencent/trpc/core/transport/ClientTransport.java | 0.00000% | 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@                  Coverage Diff                  @@
##                master        #141         +/-   ##
=====================================================
+ Coverage     85.87630%   86.34156%   +0.46526%     
- Complexity        4328        4401         +73     
=====================================================
  Files              436         437          +1     
  Lines            14373       14621        +248     
  Branches          1287        1330         +43     
=====================================================
+ Hits             12343       12624        +281     
+ Misses            2030        1997         -33     
Files with missing lines | Coverage Δ
--- | ---
...n/java/com/tencent/trpc/core/common/Constants.java | 66.66667% <ø> (ø)
...tencent/trpc/core/common/config/BackendConfig.java | 97.17868% <100.00000%> (+0.02678%) ⬆️
...nt/trpc/core/common/config/BaseProtocolConfig.java | 100.00000% <100.00000%> (ø)
...tencent/trpc/proto/http/client/Http2RpcClient.java | 100.00000% <100.00000%> (+9.09090%) ⬆️
...encent/trpc/proto/http/client/Http2cRpcClient.java | 100.00000% <100.00000%> (+18.75000%) ⬆️
.../tencent/trpc/proto/http/client/HttpRpcClient.java | 100.00000% <100.00000%> (+15.78946%) ⬆️
...t/configuration/schema/AbstractProtocolSchema.java | 100.00000% <100.00000%> (ø)
...ncent/trpc/transport/netty/NettyClientHandler.java | 100.00000% <ø> (ø)
...ncent/trpc/transport/netty/NettyServerHandler.java | 100.00000% <ø> (ø)
.../trpc/transport/netty/NettyTcpServerTransport.java | 84.25926% <100.00000%> (-0.56218%) ⬇️
... and 9 more

... and 3 files with indirect coverage changes


@wardseptember
Collaborator Author

wardseptember commented May 16, 2026

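Following this round of HTTP long-connection fixes and coverage improvements, I have brought the design document up to date:


tRPC-Java long-connection redesign: detailed summary

1. Requirements and goals

1.1 Problems on the master branch

The master branch uses an "on-demand short connection + disconnect when idle" model:

  • RpcClusterClientManager has a built-in idle scanner: it periodically scans every cached RpcClient and calls client.close() on any client unused for longer than idleTimeout, tearing down the underlying EventLoopGroup with it
  • The Netty client pipeline registers IdleStateHandler(0, idleTimeout, 0) (WRITE_IDLE); on a half-dead connection the application keeps writing into the send buffer, so the handler never fires
  • The Netty server pipeline registers IdleStateHandler(0, 0, idleTimeout) (ALL_IDLE): when a client is simply idle, the server closes the connection from its side
  • NettyClientHandler / NettyServerHandler: userEventTriggered calls ctx.close() as soon as it receives an IdleStateEvent, so both ends disconnect on idle
  • SO_KEEPALIVE=true (OS default 7200s): a half-dead connection goes unnoticed for 2 hours
  • AbstractClientTransport.ensureChannelActive has no double-check inside the lock: at the moment of a disconnect, N concurrent requests each open their own physical connection (thundering herd)
  • The shared EventLoopGroup has a single NIO-only slot, so Linux users must choose between using epoll and sharing the IO thread pool
  • HTTP side: Http2cRpcClient / Http2RpcClient / HttpsRpcClient have no connection-pool configuration at all (a bare HttpAsyncClients.customHttp2().build()); Http2RpcClient.doOpen silently swallows exceptions in its catch block, leaving the client "falsely alive"; Http2ConsumerInvoker imports an internal autovalue.shaded package; all three classes create their logger with HttpRpcClient.class, so log routing is wrong

Consequences: with high-concurrency, short-interval RPC traffic, repeated connect / disconnect wastes three-way handshakes and TLS negotiation; on a half-dead connection, requests fail silently for 2 hours; a burst of requests at the moment of a disconnect triggers a storm of short connections; the HTTP/2 client pool is capped at 25 total / 5 per route, so requests queue and block under sudden high concurrency.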

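1.2 Goals

  1. True long-lived connections: once established, a connection stays up and neither end closes it proactively
  2. Fast recovery from half-dead connections: within 60s on Linux+epoll, within 180s on all platforms
  3. No memory leaks: when a client or server goes down abnormally, dead connection objects in the cache must be reclaimable
  4. No connection storms: N concurrent requests at the moment of a disconnect create only 1 new physical connection
  5. Keep the change simple and lightweight; keep the idleTimeout field for compatibility (its meaning is redefined as an application-layer fallback)
  6. Both the trpc protocol and the HTTP protocol must support long-lived connections
  7. Unit-test coverage of the changed code should be as high as possible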

2. Overall architecture

┌────────────────────────────────────────────────────────────────────────────────┐
│ Layer 6 Config       Constants / BaseProtocolConfig / AbstractProtocolSchema   │
│                      idleTimeout=180000 / tcpKeepAlive=30/10/3                 │
├────────────────────────────────────────────────────────────────────────────────┤
│ Layer 5 Resources    NettyAbstractClientTransport                              │
│                      NIO/Epoll dual-slot shared pool + reference counting      │
├────────────────────────────────────────────────────────────────────────────────┤
│ Layer 4 Kernel       Linux epoll: TCP_KEEPIDLE=30 / INTVL=10 / CNT=3           │
│                      half-dead link: kernel RST within ~60s -> channelInactive │
├────────────────────────────────────────────────────────────────────────────────┤
│ Layer 3 Application  NettyTcpClientTransport                                   │
│                      IdleStateHandler READ_IDLE=180s + IdleCloseHandler        │
│                      pipeline: encoder/decoder/idleState/idleClose/handler     │
├────────────────────────────────────────────────────────────────────────────────┤
│ Layer 2 Connections  AbstractClientTransport                                   │
│                      CopyOnWriteArrayList + in-lock double-check               │
│                      invalidateChannel atomically clears the slot              │
├────────────────────────────────────────────────────────────────────────────────┤
│ Layer 1 Cluster      RpcClusterClientManager / DefClusterInvoker               │
│                      health observer (observe only) + closeFuture CAS remove   │
└────────────────────────────────────────────────────────────────────────────────┘

HTTP side (two layers of protection):
  ├── Pool layer: PoolingHttpClientConnectionManager
  │       maxTotal/maxPerRoute=maxConns + validateAfterInactivity=2s
  │       evictExpired + evictIdle(60s) + connectionTimeToLive(10min)
  │       HTTP/1: KeepAliveStrategy 5min ceiling
  │       HTTP/2: SO_KEEPALIVE on IOReactor
  └── Instance layer: HttpRpcClient / Http2cRpcClient / Http2RpcClient
         lastUsedNanos + consecutiveFailures(50) → isAvailable()
         markUsed/markSuccess/markFailure are driven by the invoker
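3. Change details (grouped by layer)

3.1 Layer 1 Cluster: RpcClusterClientManager (the health observer only observes and never evicts)

This is the core of the trpc long-connection redesign.

Naming note (option A: keep the observer, drop the misleading "reconnect" names)

The scanner neither reconnects nor closes proactively; it only observes health and raises a one-time warning. To avoid ambiguity, everything is renamed to the health observer family:

Old name (deprecated) | New name (final code)
--- | ---
RECONNECT_CHECK_PERIOD_SECONDS | HEALTH_OBSERVE_PERIOD_SECONDS
MAX_RECONNECT_FAILURES | STUCK_UNAVAILABLE_THRESHOLD
reconnectCheckerFuture | healthObserverFuture
ensureReconnectCheckerStarted | ensureHealthObserverStarted
checkAndReconnect | observeHealth
warnedAtMaxFailures | warnedStuckUnavailable

Removed

// Removed: every N seconds, scan all clients and close those idle past the timeout
private static void scanIdleClient() { ... }

Added: a lightweight timer that observes and warns at a threshold (no proactive close, no proactive reconnect, no heartbeats)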

private static final int HEALTH_OBSERVE_PERIOD_SECONDS = 30;
private static final int STUCK_UNAVAILABLE_THRESHOLD = 5;
private static volatile ScheduledFuture<?> healthObserverFuture;

private static void ensureHealthObserverStarted() {
    if (healthObserverFuture != null || CLOSED_FLAG.get()) return;
    synchronized (RpcClusterClientManager.class) {
        if (healthObserverFuture != null || CLOSED_FLAG.get()) return;
        try {
            healthObserverFuture = WorkerPoolManager.getShareScheduler()
                    .scheduleAtFixedRate(
                        RpcClusterClientManager::observeHealth,
                        HEALTH_OBSERVE_PERIOD_SECONDS,
                        HEALTH_OBSERVE_PERIOD_SECONDS,
                        TimeUnit.SECONDS);
        } catch (Throwable ex) {
            logger.warn("Start long-connection health observer failed; falling back to "
                    + "lazy reconnect on the request path only", ex);
        }
    }
}

static void observeHealth() {
    if (CLOSED_FLAG.get()) return;
    CLUSTER_MAP.forEach((bConfig, clusterMap) -> clusterMap.forEach((key, proxy) -> {
        try {
            if (proxy.isAvailable()) {
                proxy.failureCount.set(0);
                proxy.warnedStuckUnavailable.set(false);
                return;
            }
            int fails = proxy.failureCount.updateAndGet(
                    cur -> cur >= STUCK_UNAVAILABLE_THRESHOLD ? STUCK_UNAVAILABLE_THRESHOLD : cur + 1);
            if (fails == STUCK_UNAVAILABLE_THRESHOLD
                    && proxy.warnedStuckUnavailable.compareAndSet(false, true)) {
                logger.warn("Health-observe: client {} unavailable for {} consecutive checks "
                        + "(~{}s); leaving the transport intact and relying on lazy reconnect on "
                        + "the request path",
                        proxy.getProtocolConfig().toSimpleString(), fails,
                        fails * HEALTH_OBSERVE_PERIOD_SECONDS);
            }
        } catch (Throwable ex) {
            logger.error("Health-observe on client {} threw", key, ex);
        }
    }));
}
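
Dimension | master scanner | This branch's health observer
--- | --- | ---
Behaviour | calls client.close() once idleTimeout is exceeded, tearing down the EventLoopGroup with it | only observes isAvailable(); never closes, never reconnects, never sends heartbeats
What drives reconnects | after the scanner tears things down, the next request rebuilds the whole transport + EventLoopGroup | the only drivers are the request path (ensureChannelActive) and channelInactive
Log noise | logs continuously while a client stays unavailable | the warnedStuckUnavailable CAS guarantees a single warning, cleared on recovery
Counter risk | uncapped | capped at STUCK_UNAVAILABLE_THRESHOLD = 5 to prevent int wrap-around

  • No heartbeats: relies on the underlying transport having already marked the channel invalid on channelInactive, which avoids the complexity of a heartbeat protocol
  • 5×30s = 150s tolerance window: avoids false positives from brief network jitter
  • Lazy start: the timer task is registered only when getOrCreateClient is actually called, so processes that never use an RPC client pay nothing
  • Option A trade-off: the scanner is kept because, when the caller is idle for a long time and no new request triggers a lazy reconnect, an independent observation channel is still needed to expose a stuck transport. The scanner should not be responsible for recovery, though (recovery should only be triggered by the request path or channelInactive), so only observation and a single warning remain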
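
For illustration only, here is a minimal JDK-only sketch of the capped counter plus warn-once latch described above. HealthObserverSketch is a hypothetical class that stands in for the per-proxy state (failureCount, warnedStuckUnavailable); the warnings counter exists only so the behaviour can be checked and is not part of the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class HealthObserverSketch {
    static final int STUCK_UNAVAILABLE_THRESHOLD = 5;

    final AtomicInteger failureCount = new AtomicInteger();
    final AtomicBoolean warnedStuckUnavailable = new AtomicBoolean();
    final AtomicInteger warnings = new AtomicInteger(); // illustration only: counts emitted warnings

    /** One observer tick for a single client. */
    void observe(boolean available) {
        if (available) {
            // Recovery clears both the counter and the warn-once latch.
            failureCount.set(0);
            warnedStuckUnavailable.set(false);
            return;
        }
        // Saturate at the threshold so the counter can never overflow.
        int fails = failureCount.updateAndGet(
                cur -> cur >= STUCK_UNAVAILABLE_THRESHOLD ? STUCK_UNAVAILABLE_THRESHOLD : cur + 1);
        // compareAndSet lets exactly one tick per outage emit the warning.
        if (fails == STUCK_UNAVAILABLE_THRESHOLD
                && warnedStuckUnavailable.compareAndSet(false, true)) {
            warnings.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        HealthObserverSketch client = new HealthObserverSketch();
        for (int tick = 0; tick < 20; tick++) {
            client.observe(false);
        }
        System.out.println("failureCount=" + client.failureCount.get()
                + " warnings=" + client.warnings.get());
    }
}
```

After 20 unavailable ticks the counter stays at 5 and only one warning has been recorded; a single available tick resets both, so a later outage warns again.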

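Protecting cache entries from accidental removal (CAS remove)

proxy.closeFuture().whenComplete((r, e) -> {
    Map<String, RpcClientProxy> clusterMap = CLUSTER_MAP.get(bConfig);
    if (clusterMap != null) {
        clusterMap.remove(k, proxy);   // ← key point: CAS remove(key, value)
    }
});

remove(k, proxy) rather than remove(k): when the old proxy's close hook fires asynchronously, a new proxy may already be in the cache (the user closes explicitly and a new client is built immediately). An unconditional remove(key) would delete the new object by mistake, causing a memory leak and a failure on the next request. The CAS semantics ensure the entry is removed only if the cache still holds the same old proxy.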
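
The difference can be reproduced with a plain ConcurrentHashMap. This is a sketch, not code from the PR: CasRemoveSketch and the string values "proxy-v1" / "proxy-v2" are made up to stand in for the old and new RpcClientProxy.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class CasRemoveSketch {
    /** Returns what is left in the cache after the old proxy's late close hook has run. */
    static String runLateCloseHook(boolean conditionalRemove) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        String oldProxy = "proxy-v1";
        String newProxy = "proxy-v2";
        cache.put("backend", oldProxy);

        // The user closes the old proxy and a replacement is cached right away.
        cache.put("backend", newProxy);

        // Only now does the old proxy's asynchronous close hook fire.
        if (conditionalRemove) {
            cache.remove("backend", oldProxy);   // no-op: the entry no longer maps to oldProxy
        } else {
            cache.remove("backend");             // unconditionally drops the new proxy
        }
        return cache.get("backend");
    }

    public static void main(String[] args) {
        System.out.println("remove(key, value) leaves: " + runLateCloseHook(true));
        System.out.println("remove(key) leaves:        " + runLateCloseHook(false));
    }
}
```

With the two-argument remove the new entry survives; with the one-argument form the cache ends up empty.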

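idleTimeout field kept

The field is kept only for backward compatibility and is no longer read inside RpcClusterClientManager. Its meaning is redefined as "application-layer READ_IDLE fallback", used by Layer 3.


3.2 Layer 1 ClusterInvoker: DefClusterInvoker

ConsumerInvokerProxy<T> created = new ConsumerInvokerProxy<>(...);
invokerCache.put(key, created);
// CAS remove: only delete the entry this invoker registered itself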
rpcClient.closeFuture().whenComplete((r, e) -> {
    boolean removed = invokerCache.remove(key, created);
    ...
});

3.3 Layer 2 Transport: AbstractClientTransport (atomic switch + thundering-herd protection)

this.channels = new CopyOnWriteArrayList<>();   // replaces ArrayList; gives volatile memory visibility

protected void ensureChannelActive(int chIndex) {
    ChannelFutureItem curChannelItem = channels.get(chIndex);
    if (!needsReconnect(curChannelItem)) return;
    connLock.lock();
    try {
        // Double-check inside the lock: another thread may already have replaced the slot
        ChannelFutureItem latest = channels.get(chIndex);
        if (!needsReconnect(latest)) return;
        channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
        latest.close();
    } finally {
        connLock.unlock();
    }
}

// isConnecting is treated as "someone else is connecting, wait for them"
private static boolean needsReconnect(ChannelFutureItem item) {
    if (item.isNotYetConnect()) return true;
    return !item.isAvailable() && !item.isConnecting();
}

@Override
public void invalidateChannel(Channel target) {
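    // Find the slot holding target → swap in an empty placeholder → then close the old channel asynchronously,
    // so requests arriving between "close enqueued" and "close completed" take the create-new branch immediately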
    for (int i = 0; i < channels.size(); i++) {
        ...
        connLock.lock();
        try {
            ChannelFutureItem latest = channels.get(i);
            if (latest != item) return;
            channels.set(i, new ChannelFutureItem(null, config));
            item.close();
        } finally {
            connLock.unlock();
        }
    }
}
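
To show why the in-lock double-check stops the thundering herd, here is a self-contained sketch under simplifying assumptions: Slot is a hypothetical stand-in for ChannelFutureItem that only tracks liveness, and connects counts how many "physical connections" a burst creates. It is not the transport code from the PR.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class DoubleCheckReconnectSketch {
    /** Hypothetical stand-in for ChannelFutureItem: only tracks liveness. */
    static final class Slot {
        final boolean alive;
        Slot(boolean alive) { this.alive = alive; }
    }

    private final List<Slot> channels = new CopyOnWriteArrayList<>();
    private final ReentrantLock connLock = new ReentrantLock();
    final AtomicInteger connects = new AtomicInteger();

    DoubleCheckReconnectSketch() {
        channels.add(new Slot(false));   // start with a dead slot, as right after a disconnect
    }

    void ensureChannelActive(int index) {
        if (channels.get(index).alive) return;       // fast path, no lock taken
        connLock.lock();
        try {
            if (channels.get(index).alive) return;   // double-check: another thread already reconnected
            connects.incrementAndGet();              // stands in for createChannel()
            channels.set(index, new Slot(true));
        } finally {
            connLock.unlock();
        }
    }

    /** Releases `threads` callers at once against one dead slot and returns how many connects happened. */
    static int simulateBurst(int threads) {
        DoubleCheckReconnectSketch transport = new DoubleCheckReconnectSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        try {
            for (int i = 0; i < threads; i++) {
                pool.execute(() -> {
                    try {
                        start.await();
                        transport.ensureChannelActive(0);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("burst did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return transport.connects.get();
    }

    public static void main(String[] args) {
        System.out.println("physical connects for 16 concurrent callers: " + simulateBurst(16));
    }
}
```

However the threads interleave, only the first one through the lock creates a connection; the rest see the replaced slot on the second check and return.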

3.4 Layer 3+4 Netty client: NettyTcpClientTransport

Removed master's reverse-close code

  • Client: removed the original WRITE_IDLE IdleStateHandler
  • Server: removed the ALL_IDLE IdleStateHandler
  • NettyClientHandler / NettyServerHandler.userEventTriggered: removed the now-dead ctx.close() on idle events

Layer 3: install READ_IDLE + IdleCloseHandler (the key bug fix)

// Wrong: installed after NettyClientHandler, so it never receives channelActive
p.addLast("idleState", new IdleStateHandler(idleTimeoutMills, 0, 0, MS));

// Correct: addBefore("handler", ...) installs it before NettyClientHandler
p.addBefore("handler", "idleState",
        new IdleStateHandler(idleTimeoutMills, 0L, 0L, TimeUnit.MILLISECONDS));
p.addBefore("handler", "idleClose", new IdleCloseHandler());

Root cause: NettyClientHandler.channelActive does not call super.channelActive(ctx) (it has its own adaptation logic). Netty's event propagation requires each handler to call fire* explicitly for an event to continue down the pipeline. If IdleStateHandler is installed after NettyClientHandler, it never receives channelActive, its timer never starts, and READ_IDLE never fires. The idle handlers must therefore be installed before the business handler with addBefore("handler", ...).
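
The ordering effect can be modelled without Netty. The sketch below is a deliberately simplified simulation, not the Netty API: Handler is a hypothetical stand-in for an inbound handler, and the loop mimics "an inbound event stops at the first handler that does not forward it".

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineOrderSketch {
    /** Hypothetical stand-in for a Netty inbound handler. */
    static final class Handler {
        final String name;
        final boolean forwardsChannelActive;
        boolean sawChannelActive;

        Handler(String name, boolean forwardsChannelActive) {
            this.name = name;
            this.forwardsChannelActive = forwardsChannelActive;
        }
    }

    /** Returns whether the idle handler sees channelActive, i.e. whether its timer would start. */
    static boolean idleTimerStarts(boolean idleBeforeBusiness) {
        Handler idleState = new Handler("idleState", true);
        // Models NettyClientHandler.channelActive not calling super.channelActive(ctx).
        Handler business = new Handler("handler", false);

        List<Handler> pipeline = new ArrayList<>();
        if (idleBeforeBusiness) {
            pipeline.add(idleState);   // like addBefore("handler", "idleState", ...)
            pipeline.add(business);
        } else {
            pipeline.add(business);
            pipeline.add(idleState);   // like addLast("idleState", ...)
        }

        // Inbound events travel head to tail and stop at the first handler that does not forward them.
        for (Handler h : pipeline) {
            h.sawChannelActive = true;
            if (!h.forwardsChannelActive) {
                break;
            }
        }
        return idleState.sawChannelActive;
    }

    public static void main(String[] args) {
        System.out.println("addLast:   idle timer starts = " + idleTimerStarts(false));
        System.out.println("addBefore: idle timer starts = " + idleTimerStarts(true));
    }
}
```

In this model the idle handler only observes channelActive when it sits ahead of the non-forwarding business handler, which matches the fix described above.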

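Trigger chain

new IdleStateHandler(idleTimeoutMills, 0L, 0L, MILLISECONDS)  // READ_IDLE
↓ fires
IdleCloseHandler.userEventTriggered:
   1) invalidateChannel(wrapper)   // invalidate the slot first
   2) ctx.close()                  // then close the channel asynchronously

Why READ_IDLE: on a half-dead connection the application can always write into the send buffer, so writes always "succeed"; ALL_IDLE / WRITE_IDLE never fire under "continuous requests + silent packet loss"; only "no reply can be read" is an observable signal of a half-dead connection.

IdleCloseHandler close log now includes caller and callee information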

logger.info("Long connection idle close: state={}, caller(local)={}, callee={}, remote={}, channelId={}",
        state, local, callee, remote, channelId);

Layer 4: Linux kernel TCP keepalive

boolean useEpoll = Epoll.isAvailable() && config.useEpoll();
if (useEpoll) {
    bootstrap.option(EpollChannelOption.TCP_KEEPIDLE,  30);
    bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, 10);
    bootstrap.option(EpollChannelOption.TCP_KEEPCNT,   3);
}

The two layers stack: ~60s kernel-level reclamation on Linux plus a 180s application-level fallback on all platforms.
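
The ~60s figure is simply idle + interval × probe count. A small sketch of that arithmetic, assuming the default values from this document (30 / 10 / 3 and a 180s READ_IDLE); KeepAliveBudgetSketch and its method names are hypothetical:

```java
final class KeepAliveBudgetSketch {
    /** Approximate seconds until the kernel gives up on a silent peer. */
    static int kernelDetectSeconds(int idleSec, int intvlSec, int probeCount) {
        return idleSec + intvlSec * probeCount;
    }

    /** Whichever layer fires first bounds the recovery time. */
    static int recoverySeconds(boolean epollKeepAlive, int readIdleSec) {
        int kernel = kernelDetectSeconds(30, 10, 3);
        return epollKeepAlive ? Math.min(kernel, readIdleSec) : readIdleSec;
    }

    public static void main(String[] args) {
        System.out.println("Linux + epoll:   ~" + recoverySeconds(true, 180) + "s");
        System.out.println("other platforms: ~" + recoverySeconds(false, 180) + "s");
    }
}
```

These are upper-bound estimates; actual detection time depends on when the last successful read happened.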

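Server-side safety

  • Normal close / process exit: four-way FIN handshake → channelInactive → cleanup
  • Physical network loss: falls back on OS TCP keepalive (default ~2h); the cost of roughly a few KB per connection is acceptable
  • The caller side relies on channelInactive, backed by the Layer 1 timer's 150s observation window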

3.5 Layer 5 Resources: NettyAbstractClientTransport (NIO/Epoll dual-slot, reference-counted shared pool)

Old: single slot SHARE_EVENT_LOOP_GROUP (NIO)
New: two slots
    ├── SHARE_NIO_GROUP   + SHARE_NIO_USED_NUMS   + NIO_LOCK
    └── SHARE_EPOLL_GROUP + SHARE_EPOLL_USED_NUMS + EPOLL_LOCK

public NettyAbstractClientTransport(...) {
    super(...);
    if (Boolean.TRUE.equals(config.isIoThreadGroupShare())) {
        this.sharedKind = wantsEpoll(config) ? SharedGroupKind.EPOLL : SharedGroupKind.NIO;
        acquireSharedGroup(this.sharedKind, ...);
        this.shareGroupAcquired = true;
    }
}

@Override protected void doClose() {
    ...
    if (shareGroupAcquired) {
        shareGroupAcquired = false;
        releaseSharedGroup(sharedKind);   // ref count -1; shutdownGracefully only when it reaches zero
    }
}
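
  • The old SHARE_EVENT_LOOP_GROUP field is kept as an alias of the NIO slot for backward compatibility
  • UDP trade-off: NettyUdpClientTransport only reuses the Layer 5 shared pool and does not take part in Layers 2/3/4 (UDP is connectionless: no read idle, no TCP keepalive); the javadoc states this explicitly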
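
A rough JDK-only sketch of the dual-slot reference counting, assuming an ExecutorService as a stand-in for EventLoopGroup and shutdown() as a stand-in for shutdownGracefully. SharedGroupSketch, Kind and Slot are hypothetical names, not the fields from the PR.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SharedGroupSketch {
    enum Kind { NIO, EPOLL }

    /** One slot per kind: the shared group plus the number of transports using it. */
    private static final class Slot {
        ExecutorService group;
        int users;
    }

    private final Map<Kind, Slot> slots = new EnumMap<>(Kind.class);

    SharedGroupSketch() {
        for (Kind kind : Kind.values()) {
            slots.put(kind, new Slot());
        }
    }

    /** The first user of a kind creates the group; later users share it. */
    ExecutorService acquire(Kind kind) {
        Slot slot = slots.get(kind);
        synchronized (slot) {
            if (slot.users == 0) {
                slot.group = Executors.newSingleThreadExecutor();
            }
            slot.users++;
            return slot.group;
        }
    }

    /** Returns true only when this release was the last one and shut the group down. */
    boolean release(Kind kind) {
        Slot slot = slots.get(kind);
        synchronized (slot) {
            if (slot.users == 0) {
                return false;            // nothing to release
            }
            slot.users--;
            if (slot.users > 0) {
                return false;            // still in use by another transport
            }
            slot.group.shutdown();
            slot.group = null;
            return true;
        }
    }

    public static void main(String[] args) {
        SharedGroupSketch pool = new SharedGroupSketch();
        ExecutorService a = pool.acquire(Kind.NIO);
        ExecutorService b = pool.acquire(Kind.NIO);
        System.out.println("same NIO group shared: " + (a == b));
        System.out.println("first release shuts down: " + pool.release(Kind.NIO));
        System.out.println("last release shuts down:  " + pool.release(Kind.NIO));
    }
}
```

Because each kind has its own slot and lock, NIO and epoll transports can coexist, and a group is shut down only when its last user releases it.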

3.6 Layer 6 Configuration

Field | Default | Layer
--- | --- | ---
idleTimeout | 180000 ms | Layer 3 application fallback
tcpKeepAliveIdle | 30 s | Layer 4 kernel
tcpKeepAliveIntvl | 10 s | Layer 4 kernel
tcpKeepAliveCnt | 3 | Layer 4 kernel
ioThreadGroupShare | true | Layer 5 shared pool

Spring Boot users get YAML fields (such as tcp_keep_alive_idle) bound automatically through @ConfigProperty + Jackson SNAKE_CASE + BinderUtils.


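3.7 HTTP long-lived connections (major refactor)

The HTTP side is built on Apache HttpClient: HTTP/1 uses the 4.x synchronous client, and HTTP/2(c)/HTTPS use the 5.x async client. Three classes of problems need solving:

  1. Pool layer: pool limits too small, NoHttpResponseException on idle connections, silent NAT reclamation
  2. Instance layer: HttpRpcClient instances themselves must be reclaimable (to avoid pile-up) and must be able to detect sustained failure
  3. Lifecycle: exceptions in H2/HTTPS doOpen were silently swallowed, so business code hit an NPE on a "falsely alive" client

Fix #1: H2/HTTPS connection-pool configuration (previously built completely bare)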

// Http2cRpcClient.doOpen
PoolingAsyncClientConnectionManager cm = PoolingAsyncClientConnectionManagerBuilder.create()
    .setMaxConnTotal(maxConns)                                    // ← default was 25; requests queued under bursts
    .setMaxConnPerRoute(maxConns)                                 // ← default was 5
    .setConnPoolPolicy(PoolReusePolicy.LIFO)
    .setValidateAfterInactivity(TimeValue.ofMilliseconds(2000))   // revalidate after 2s idle
    .setConnectionTimeToLive(TimeValue.ofMinutes(10))             // force rebuild after 10min (K8s pod drift)
    .build();

httpAsyncClient = HttpAsyncClients.custom()
    .setConnectionManager(cm)
    .setIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())
    .evictExpiredConnections()
    .evictIdleConnections(TimeValue.ofSeconds(60))
    .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
    .build();

Http2RpcClient (TLS / HTTP/2) uses the same pool configuration. HttpsRpcClient extends Http2RpcClient and only switches the version policy.

Fix #2: HTTP/1 KeepAliveStrategy + connectionTimeToLive

// HttpRpcClient.doOpen
.setKeepAliveStrategy(HttpRpcClient::resolveKeepAliveDuration)   // extracted as public static for testability
.setConnectionTimeToLive(10, TimeUnit.MINUTES)

// The extracted resolver
public static long resolveKeepAliveDuration(HttpResponse rsp, HttpContext ctx) {
    long fallbackMs = TimeUnit.MINUTES.toMillis(5);    // no longer than typical NAT idle timeouts (5–15min)
    Header h = rsp.getFirstHeader("Keep-Alive");
    if (h != null) {
        for (HeaderElement el : h.getElements()) {
            if ("timeout".equalsIgnoreCase(el.getName()) && el.getValue() != null) {
                try {
                    long server = Long.parseLong(el.getValue()) * 1000L;
                    return Math.min(server, fallbackMs);
                } catch (NumberFormatException ignore) {
                    // fall through
                }
            }
        }
    }
    return fallbackMs;
}

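Design notes:

  • If the server sends no Keep-Alive header, a 5min ceiling is enforced (no longer "infinite"), so the next request does not hit a dead connection after silent NAT reclamation
  • connectionTimeToLive=10min is a hard limit: even hot connections are rebuilt, which is the key to recovering from K8s pod IP drift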
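
The clamping rule can be exercised without Apache HttpClient. The sketch below is an approximation that parses the raw Keep-Alive header value by hand; KeepAliveCeilingSketch and resolveKeepAliveMillis are hypothetical names, and the real resolver works on HttpResponse / HeaderElement instead of a string.

```java
import java.util.concurrent.TimeUnit;

final class KeepAliveCeilingSketch {
    static final long CEILING_MS = TimeUnit.MINUTES.toMillis(5);

    /**
     * Takes the raw Keep-Alive header value (for example "timeout=5, max=100", or null when absent)
     * and returns the keep-alive duration in milliseconds, never above the 5min ceiling.
     */
    static long resolveKeepAliveMillis(String keepAliveHeader) {
        if (keepAliveHeader == null) {
            return CEILING_MS;
        }
        for (String element : keepAliveHeader.split(",")) {
            String[] kv = element.trim().split("=", 2);
            if (kv.length == 2 && "timeout".equalsIgnoreCase(kv[0].trim())) {
                try {
                    long serverMs = Long.parseLong(kv[1].trim()) * 1000L;
                    return Math.min(serverMs, CEILING_MS);   // clamp to the ceiling
                } catch (NumberFormatException ignore) {
                    // malformed timeout value: fall through to the ceiling
                }
            }
        }
        return CEILING_MS;
    }

    public static void main(String[] args) {
        System.out.println(resolveKeepAliveMillis(null));
        System.out.println(resolveKeepAliveMillis("timeout=5, max=100"));
        System.out.println(resolveKeepAliveMillis("timeout=600"));
    }
}
```

The three calls cover the no-header case, a server value below the ceiling, and a server value that gets clamped.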

Fix #3: instance-level failure counting (aligned with the trpc side's channelInactive → cluster cache eviction)

private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
private volatile long lastUsedNanos = System.nanoTime();

public void markUsed()    { lastUsedNanos = System.nanoTime(); }
public void markSuccess() { consecutiveFailures.set(0); }
public void markFailure() { consecutiveFailures.incrementAndGet(); }

@Override
public boolean isAvailable() {
    if (!super.isAvailable()) return false;
    if (consecutiveFailures.get() >= 50) return false;            // sustained failure → evict from cache
    return (System.nanoTime() - lastUsedNanos) <= MINUTES.toNanos(10);   // idle for 10min → treated as orphan
}

The invoker calls markUsed on entry, markSuccess on success, and markFailure on the exception path:

public Response send(Request request) throws Exception {
    httpRpcClient.markUsed();   // entry
    HttpPost httpPost;
    try { httpPost = buildRequest(request); }
    catch (Exception ex) { httpRpcClient.markFailure(); return RpcUtils.newResponse(request, null, ex); }

    try (CloseableHttpResponse rsp = httpClient.execute(httpPost)) {
        Response response = handleResponse(request, rsp);
        if (response.getException() == null) httpRpcClient.markSuccess();
        else                                  httpRpcClient.markFailure();
        return response;
    } catch (Exception ex) {
        httpRpcClient.markFailure();
        return RpcUtils.newResponse(request, null, ex);
    }
}

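Result: the HTTP side now has the same "sustained failure → cluster cache eviction → lazy rebuild" chain as the trpc protocol. Previously it could only be reclaimed passively after a 10min cool-down; now 50 consecutive failures mark it unavailable immediately.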
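
To make the three states easy to try out, here is a sketch of the same availability rule with an injected clock. InstanceHealthSketch and the LongSupplier clock are assumptions added for testability; the PR code uses System.nanoTime() directly and also checks super.isAvailable().

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

final class InstanceHealthSketch {
    static final int MAX_CONSECUTIVE_FAILURES = 50;
    static final long ORPHAN_NANOS = TimeUnit.MINUTES.toNanos(10);

    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private final LongSupplier nanoClock;   // injected so a test can move time forward
    private volatile long lastUsedNanos;

    InstanceHealthSketch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.lastUsedNanos = nanoClock.getAsLong();
    }

    void markUsed()    { lastUsedNanos = nanoClock.getAsLong(); }
    void markSuccess() { consecutiveFailures.set(0); }
    void markFailure() { consecutiveFailures.incrementAndGet(); }

    boolean isAvailable() {
        if (consecutiveFailures.get() >= MAX_CONSECUTIVE_FAILURES) {
            return false;                                               // sustained failure
        }
        return nanoClock.getAsLong() - lastUsedNanos <= ORPHAN_NANOS;   // not idle for too long
    }

    public static void main(String[] args) {
        long[] now = {0L};
        InstanceHealthSketch client = new InstanceHealthSketch(() -> now[0]);
        for (int i = 0; i < 50; i++) {
            client.markFailure();
        }
        System.out.println("after 50 failures:     " + client.isAvailable());
        client.markSuccess();
        System.out.println("after one success:     " + client.isAvailable());
        now[0] += TimeUnit.MINUTES.toNanos(11);
        System.out.println("after 11 idle minutes: " + client.isAvailable());
    }
}
```

A single success resets the failure streak, while the idle check is independent of it and only recovers once markUsed is called again.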

Fix #4: Http2cRpcClient / Http2RpcClient.doOpen propagates exceptions

Before:

} catch (Exception e) {
    logger.error("httpAsyncClient error: ", e);    // ← log only; open() pretends to succeed
}

After:

} catch (Exception e) {
    String desc = protocolConfig != null ? protocolConfig.toSimpleString() : "<null>";
    throw TRpcException.newFrameException(ErrorCode.TRPC_CLIENT_CONNECT_ERR,
            "open http2c client (" + desc + ") failed", e);
}

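This drives the lifecycle into FAILED for real → isAvailable() returns false → the cluster evicts the cache entry. The null-safe expression lets tests null out protocolConfig via reflection to reproduce the failure path.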

Fix #5: miscellaneous

  • Logger class names fixed: Http2cRpcClient / Http2RpcClient both used LoggerFactory.getLogger(HttpRpcClient.class) (a copy-paste error); each now uses its own class
  • Shaded dependency removed: Http2ConsumerInvoker no longer uses autovalue.shaded.com.google.common.common.base.Objects and uses java.util.Objects.equals instead
  • Dead code such as Http2RpcClient.evictIdleSeconds() was cleaned up as well

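3.8 Concurrency-safety analysis for HTTP disconnects under high concurrency

Is it safe when the caller or callee drops the connection while many requests are in flight? Case by case:

Scenario | Race point | Conclusion
--- | --- | ---
N business threads call httpClient.execute concurrently | HTTP/1 goes through the PoolingHttpClientConnectionManager global lock; HTTP/2 goes through a single multiplexed connection's selector | HttpClient locks internally, no race
markUsed / markFailure concurrent with isAvailable reads | volatile long + AtomicInteger | memory visibility is guaranteed, no race
Boundary: consecutiveFailures 49→50 | several threads momentarily see 50+ and all judge the client unavailable | one markSuccess resets to zero and restores it; eventually consistent
A business thread closes the client concurrently (shutdownBackendConfig) | thread A has an in-flight execute, thread B calls close() → A gets IllegalStateException("I/O reactor has been shut down") | A takes the catch path → markFailure + exception wrapped into Response.exception; the AtomicInteger is always writable, the business side handles the failed Response normally, no data corruption
Server RSTs a large number of connections at once (kill -9) | N in-flight requests each get an IOException | each calls markFailure → 50+ failures → cluster cache eviction → lazy rebuild
"Falsely alive" after a failed open (old bug) | previously httpAsyncClient = null while isAvailable=true, so business code hit an NPE | fixed: doOpen throws TRpcException → lifecycle FAILED

Conclusion: in none of the high-concurrency disconnect scenarios is there a data race or corruption. The only visible effect is the expected "business request failed", and the chain of failure counting + pool eviction + cluster CAS cache eviction recovers automatically.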


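4. Key advantages over master (at a glance)

Dimension | master | This branch
--- | --- | ---
Are long connections truly long-lived | ✗ IdleStateHandler on both sides closes proactively | ✓ closed only on explicit close
Half-dead connection recovery time | 7200s (OS default keepalive) | Linux+epoll ~60s / all platforms 180s
Cluster scanner behaviour | tears down the transport and the EventLoopGroup | observes + one-time warning only; no proactive close, reconnect, or heartbeat
Cache / transport consistency | unconditional remove(key) | closeFuture + CAS remove(key, value)
Memory visibility of channels | ArrayList | CopyOnWriteArrayList
Thundering-herd protection | none | in-lock double-check + yielding to isConnecting
Window at close time | requests may be written to a dying channel | invalidateChannel clears the slot first, then closes
Netty pipeline order | idle handler appended at the end; never fires in this project | addBefore("handler", ...) forces it ahead of the business handler
Shared pool vs epoll | mutually exclusive, pick one | dual-slot reference counting, both at once
Tunable parameters | idleTimeout (muddled semantics) | 4 parameters, all configurable in YAML + Spring schema
HTTP/2 pool configuration | built completely bare (25/5) | maxConns / validateAfterInactivity / TTL / SO_KEEPALIVE all set
HTTP keep-alive ceiling | none (relies on the server header) | 5min fallback ceiling (beats silent NAT reclamation)
HTTP sustained-failure detection | none | consecutiveFailures flips to unavailable at 50 + markSuccess resets
HTTP/2 doOpen failure | silently swallowed → "falsely alive" client | throws TRpcException → lifecycle FAILED
HTTP logger routing | all three classes use HttpRpcClient.class (wrong) | each uses its own class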

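5. Complete leak-prevention chain

Scenario A: callee (server) goes offline / IP drifts
   ↓
(A1) Normal FIN/RST: Netty on the caller side fires channelInactive immediately → NettyChannel cleaned up
(A2) Half-dead connection (Linux+epoll): kernel 30s + 10s×3 ≈ 60s → RST → channelInactive
(A3) Half-dead connection (other platforms): READ_IDLE fires at 180s → invalidateChannel + ctx.close → channelInactive
(A4) HTTP/1: pool layer evictIdle(60s) + connectionTTL(10min) + KeepAlive (5min ceiling), three ways to evict
(A5) HTTP/2: 50 consecutive markFailure calls → isAvailable=false → cluster evicts the cache entry
   ↓
Subsequent RpcClient.isAvailable() calls return false
   ↓
RpcClusterClientManager's 30s timer observes unavailable, failureCount++
   ↓
5 consecutive observations add up to ≈ 150s (with the trpc protocol, Layers 3/4 have already closed the channel and gone through closeFuture, which is faster)
   ↓
proxy.closeFuture() completes → CLUSTER_MAP.remove(k, proxy) evicts the entry via CAS
   ↓
DefClusterInvoker.invokerCache does the matching CAS remove
   ↓
Memory is reclaimed by GC

Scenario B: caller (client) goes offline abnormally
   ↓
Process exits → OS sends FIN to the callee
   ↓
Callee's Netty receives channelInactive → server-side channel cleaned up
   ↓
(Cable-pull case) falls back on OS keepalive, ~2h (the server side has no epoll keepalive configured; this is a known trade-off)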

6. Unit-test coverage

Test class | Purpose | Key technique
--- | --- | ---
RpcClusterClientManagerTest (enhanced) | Main flow: getOrCreateClient / close / observeHealth / Proxy delegate | Plain JUnit + stubs
DefClusterInvokerCloseFutureTest (new) | CAS remove semantics against accidental removal | Plain JUnit
RpcClusterLoggerLevelTest (new) | Covers the 3 logger.isDebugEnabled()==false branches | log4j2 LoggerContext.updateLoggers() to change the level temporarily
RpcClusterSchedulerRejectTest (new) | Covers the catch branch when the scheduler throws RejectedExecutionException | Reflection to replace WorkerPoolManager.shareScheduler (avoids PowerMock)
AbstractClientTransportTest (enhanced, +9 cases) | Thundering-herd double-check + atomic switch in invalidateChannel | Plain JUnit + stubs
NettyAbstractClientTransportTest (new, 6 cases) | NIO/Epoll dual-slot reference-counting semantics | Plain JUnit
NettyTcpClientTransportTest (new/enhanced, 6 cases) | doOpen installs the idle handlers in the right pipeline order; TCP_KEEPIDLE is passed through | Offline EmbeddedChannel + reflective call to ChannelInitializer
NettyTcpClientIdleCloseTest (new) | End to end: READ_IDLE triggers invalidateChannel + close | Real Netty server/client
MultiPortNamingUrlConcurrentTest#testIdleTimeoutChannelRecycle (new) | e2e long-connection idle recycling: 100 threads × 1000 requests × 5 connsPerAddr, verifying that all 50 channels fire and close | Multi-port server + multi-threaded concurrency
BaseProtocolConfigTest / AbstractProtocolSchemaTest (enhanced) | Parsing and defaults of the three tcp_keep_alive parameters | Plain JUnit
BindTest2 + application-bind-test2.yml (enhanced) | End-to-end Spring Boot YAML auto-binding | Spring Test
HttpRpcClientLongLinkTest (enhanced to 22 cases) | Full chain of markUsed / isAvailable / markSuccess / markFailure + 5 KeepAliveStrategy branches + doClose IOException + Http2c/Http2 doOpen failure propagation + AtomicInteger stress test with 32 threads × 1000 iterations | Mockito mocks + reflective call to doOpen
HttpMultiPortNamingUrlConcurrentTest (new) | HTTP: 100 threads × 1000 requests × 10 endpoints, multi-port concurrency + pool-level long-connection reuse | Plain JUnit

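Coverage of the HTTP long-connection classes (instruction level)

Class | Before | Final | Notes
--- | --- | --- | ---
HttpRpcClient | 68.8% | 100% | all branches testable once KeepAliveStrategy was extracted
Http2cRpcClient | 76.5% | 97.7% | only 4 branch details remain
Http2RpcClient | 70.5% | 100% | dead code removed + reflective doOpen call to test exception propagation
HttpsRpcClient | 100% | 100% |
HttpConsumerInvoker | 96.2% | 96.2% |
Http2ConsumerInvoker | 96.2% | 96.2% | FutureCallback inner class at 58.8% (already the case in the original code)
HttpRpcClientFactory | 100% | 100% |
AbstractConsumerInvoker | 91.4% | 91.4% |

All classes changed in this round are ≥ 90%, 4 of them at 100%.

RpcClusterClientManager line coverage: 93%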

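Key unit-test design trade-offs

  1. PowerMock crashes on JDK17 (MagicAccessorImpl restriction) → the project is pinned to JDK8, where PowerMock works in most cases; RpcClusterSchedulerRejectTest still uses reflection to replace the static field, to avoid PowerMock's JVM-wide side effects
  2. PowerMock JVM-level pollution (hit in NettyTcpClientTransportTest): the early implementation started a real TCP listen + connect; in a full test run, earlier failing cases that used PowerMock polluted EventLoop / classloader state and caused races with hung connections. The final version uses an offline EmbeddedChannel plus a reflective call to ChannelInitializer.initChannel(Channel), fully detached from a real EventLoop
  3. Logger branch coverage: Whitebox.setInternalState cannot modify static-final fields even with PowerMock 1.7.4 + JDK8 → the level is changed through the standard log4j2 API instead
  4. HTTP doOpen failure path: calling client.open() directly is stopped earlier by the lifecycle's Objects.requireNonNull(protocolConfig), so the catch branch is never reached → doOpen() is called reflectively to bypass the lifecycle and cover the catch specifically
  5. HTTP KeepAliveStrategy was untestable: the original lambda was private → extracted as public static long resolveKeepAliveDuration, with all 5 branches covered (no header, server value below the ceiling, server value above the ceiling and clamped, timeout=abc NumberFormatException, max=100 with no timeout key)
  6. e2e long-connection idle recycling test: with idleTimeout=10s, runs 100 threads × 1000 requests × 5 connsPerAddr for 50 channels in total; asserts that all 50 fire IdleCloseHandler, all close, and subsequent requests lazily reconnect successfully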

7. Changed files

Layer 1 Cluster

  • RpcClusterClientManager.java: scanner reduced to pure observation + closeFuture CAS remove + renaming per option A
  • DefClusterInvoker.java: invokerCache CAS remove fix

Layer 2 Transport

  • AbstractClientTransport.java: channels → CopyOnWriteArrayList + in-lock double-check + new invalidateChannel
  • ClientTransport.java: interface gains a default invalidateChannel

Layer 3+4 Netty

  • NettyTcpClientTransport.java: removes the old WRITE_IDLE; addBefore("handler", "idleState/idleClose", ...) installs READ_IDLE + IdleCloseHandler; epoll TCP keepalive; the IdleCloseHandler log adds caller/callee information (state, caller(local), callee, remote, channelId)
  • NettyTcpServerTransport.java: removes the ALL_IDLE handler
  • NettyClientHandler.java / NettyServerHandler.java: delete the dead idle-close code

Layer 5 Resources

  • NettyAbstractClientTransport.java: NIO/Epoll dual-slot, reference-counted shared pool
  • NettyUdpClientTransport.java: picks EpollDatagramChannel according to useEpoll + shared pool; javadoc states that UDP only reuses Layer 5

Layer 6 Configuration

  • Constants.java: adds DEFAULT_TCP_KEEPALIVE_IDLE/INTVL/CNT; DEFAULT_IDLE_TIMEOUT changed to 180000
  • BaseProtocolConfig.java / BackendConfig.java: pass the three keepalive fields through
  • AbstractProtocolSchema.java: Spring schema gains the same fields

HTTP long-lived connections (major refactor)

  • HttpRpcClient.java
    • evictExpired/evictIdle + validateAfterInactivity + connectionTimeToLive(10min) + KeepAliveStrategy 5min ceiling
    • lastUsedNanos + consecutiveFailures(50) + markUsed/markSuccess/markFailure + overridden isAvailable
    • resolveKeepAliveDuration extracted as public static for testability
  • Http2cRpcClient.java
    • Full pool configuration (previously bare): maxConnTotal/maxConnPerRoute/validateAfterInactivity/TTL/PoolReusePolicy/SO_KEEPALIVE/FORCE_HTTP_2
    • evictExpired + evictIdle(60s)
    • lastUsedNanos + consecutiveFailures, same as HttpRpcClient
    • doOpen propagates exceptions as TRpcException (no longer silently swallowed)
    • logger class fixed to Http2cRpcClient.class
  • Http2RpcClient.java
    • Full pool configuration + evictExpired/evictIdle/SO_KEEPALIVE
    • doOpen propagates exceptions as TRpcException
    • logger class fixed
    • dead code removed (evictIdleSeconds() / a redundant super.doClose() comment)
  • HttpsRpcClient.java: extends Http2RpcClient, so its behaviour follows automatically
  • HttpConsumerInvoker.java
    • markUsed moved to the entry point (aligned with the failure path)
    • markSuccess on success / markFailure on exception
  • Http2ConsumerInvoker.java
    • markUsed at the entry point
    • markSuccess/markFailure on the success / failure branches
    • removes autovalue.shaded.com.google.common.common.base.Objects in favour of java.util.Objects

Unit tests

  • RpcClusterClientManagerTest (enhanced)
  • DefClusterInvokerCloseFutureTest (new)
  • RpcClusterLoggerLevelTest (new)
  • RpcClusterSchedulerRejectTest (new, replaces the static field via reflection)
  • AbstractClientTransportTest (enhanced, +9 cases)
  • NettyAbstractClientTransportTest (new, 6 cases)
  • NettyTcpClientTransportTest (new/enhanced, 6 cases, offline EmbeddedChannel)
  • NettyTcpClientIdleCloseTest (new)
  • MultiPortNamingUrlConcurrentTest (new + testIdleTimeoutChannelRecycle)
  • HttpMultiPortNamingUrlConcurrentTest (new + @Before AbstractConsumerInvoker.reset() to fix HashedWheelTimer pollution across tests)
  • BaseProtocolConfigTest / AbstractProtocolSchemaTest (enhanced)
  • BindTest2 + application-bind-test2.yml (enhanced)
  • HttpRpcClientLongLinkTest (enhanced to 22 cases: failure counting + 32×1000 concurrent atomic-counter stress test + 5 KeepAliveStrategy branches + doClose IOException + doOpen failure propagation + Http2/Https inheritance)

8. Recommended production configuration

trpc:
  client:
    io_mode: epoll              # key: epoll must be on to get Layer 4's 60s kernel-level reclamation
    # the defaults below can be left as they are
    # io_thread_group_share: true
    # idle_timeout: 180000
    # tcp_keep_alive_idle: 30
    # tcp_keep_alive_intvl: 10
    # tcp_keep_alive_cnt: 3

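Benefits: ≈60s kernel-level recovery from half-dead connections + 180s application fallback + atomic switching + a shared thread pool with zero extra overhead + no more thundering herd + no pile-up of HTTP instances + an HTTP/2 connection pool that actually works + sustained HTTP failures can be detected.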


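9. Key revisions in this update to keep the design and the code in sync (relative to the earlier version)

  1. Scanner naming: reconnect / checkAndReconnect / MAX_RECONNECT_FAILURES → health observer / observeHealth / STUCK_UNAVAILABLE_THRESHOLD, so the names no longer suggest proactive reconnects
  2. Netty pipeline ordering (production bug): addBefore("handler", ...) is required, otherwise IdleStateHandler never starts its timer (in this project NettyClientHandler.channelActive does not call super)
  3. IdleCloseHandler log observability: state/caller/callee/remote/channelId are now all included
  4. e2e long-connection idle recycling test: 100 threads × 1000 requests × 5 connsPerAddr, covering the full lifecycle of 50 channels
  5. NettyTcpClientTransportTest made offline: uses EmbeddedChannel + a reflective call to ChannelInitializer, so it is no longer polluted by JVM state left behind by failing PowerMock cases
  6. RpcClusterSchedulerRejectTest without PowerMock: replaces WorkerPoolManager.shareScheduler via reflection
  7. HTTP/2 pool configuration filled in: Http2cRpcClient / Http2RpcClient now fully configure maxConns / validateAfterInactivity / connectionTimeToLive / evictIdle / SO_KEEPALIVE / VersionPolicy, from nothing to complete
  8. HTTP keep-alive ceiling: HttpRpcClient adds a KeepAliveStrategy 5min fallback + connectionTimeToLive(10min) to beat silent NAT reclamation
  9. HTTP instance-level failure counting: consecutiveFailures AtomicInteger + markSuccess/markFailure, aligned with the trpc side's cluster cache eviction
  10. HTTP/2 doOpen no longer swallows exceptions: Http2cRpcClient.doOpen / Http2RpcClient.doOpen throw TRpcException so the lifecycle enters FAILED; the catch block uses a null-safe expression (protocolConfig != null ? toSimpleString : "<null>") so tests can null the field via reflection to trigger it
  11. HTTP logger routing fixed: Http2cRpcClient / Http2RpcClient no longer use HttpRpcClient.class, so log channels no longer get mixed up
  12. HTTP shaded dependency removed: Http2ConsumerInvoker drops autovalue.shaded.Objects
  13. HTTP KeepAliveStrategy extracted as public static: HttpRpcClient.resolveKeepAliveDuration lets all 5 branches be covered by plain Mockito unit tests
  14. HTTP test isolation: HttpMultiPortNamingUrlConcurrentTest adds @Before AbstractConsumerInvoker.reset() to fix the cross-test "HashedWheelTimer cannot be started once stopped" pollution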
