上一篇我们深入拆解了 WebSocket 服务端的
start()引导过程——从 SSL 证书加载到 Pipeline 七层链编排,再到连接健康巡检和优雅关闭。本篇将镜头转向客户端,走进WebsocketClient和DataExchanger的内部世界:CAS 无锁并发保护、CountDownLatch 连接同步、指数退避重连策略、重复连接识别、DCL 双重检查锁初始化以及 SPI 消息处理器注册——一场关于"如何在不确定的网络中建立可靠连接"的技术之旅。
一、回忆:为什么客户端不用 Netty?
第三篇博客中我们提到过一个设计决策——服务端用 Netty,客户端用 Tyrus(JSR-356)。这不是随性之举,而是一个深思熟虑的权衡。
服务端需要同时管理成百上千个客户端的长连接,对并发能力和内存效率有极高的要求——Netty 的 Reactor 模型和零拷贝正是为此而生。但客户端是一个 SDK,要嵌入到用户的 Java 应用中运行。如果客户端也引入 Netty,会带来两个问题:
- 依赖冲突:用户的项目可能已经使用了不同版本的 Netty。两个 Netty 版本共存,轻则方法签名不兼容(
NoSuchMethodError),重则类加载器冲突导致应用无法启动。 - 依赖膨胀:Netty 的完整依赖包括
netty-codec-http、netty-handler、netty-transport等一系列模块,对于一个只需要维护一条 WebSocket 连接的客户端来说,这些都是不必要的重量。
Phoenix 选择了 Tyrus——JSR-356(Java WebSocket API)的参考实现。JSR-356 是 Java EE 标准的一部分,API 极其简洁:用四个注解(@ClientEndpoint、@OnOpen、@OnMessage、@OnClose、@OnError)就能完成一个完整的 WebSocket 客户端。依赖轻量,和用户项目的技术栈几乎不会冲突。
这个选择体现了一个重要的工程哲学:SDK 的依赖越少越好。你不能假设用户的运行环境——他们可能用 Spring Boot 3.x,可能用传统的 Tomcat WAR 部署,甚至可能是一个没有任何框架的纯 Java 程序。用标准 API 而非特定框架,是对用户最大的尊重。
二、WebsocketClient 全景图
先从宏观视角看看 WebsocketClient 这个类的"装备清单":
@Slf4j
@ClientEndpoint
public class WebsocketClient {
// ─── 静态共享资源 ───
private static final ScheduledExecutorService GLOBAL_SCHEDULER = ...; // 全局重连调度器
private static final ClientManager SHARED_CLIENT = ClientManager.createClient(); // 全局 Tyrus 引擎
// ─── 构造时确定的不可变配置 ───
private final String serverUri; // 服务端地址
private final int connectTimeoutSeconds; // 连接超时(默认5秒)
private final int reconnectDelaySeconds; // 重连初始间隔(默认5秒)
private final boolean autoReconnect; // 是否自动重连(配置开关)
// ─── 运行时可变状态 ───
private volatile Session session; // 当前会话
private volatile boolean connected = false; // 连接状态
private volatile CountDownLatch connectLatch; // 连接同步门闩
private volatile Consumer<String> messageHandler; // 消息处理器
// ─── 并发控制 ───
private final AtomicBoolean connectionPending = new AtomicBoolean(false); // CAS 连接保护
private volatile boolean enableReconnect = true; // 运行时重连开关
}
十几个字段,从上到下可以分为四组:静态共享资源、不可变配置、运行时状态和并发控制。这种分组本身就是一种设计信号——静态的共享、final 的不变、volatile 的可变、AtomicBoolean 的并发保护——每个字段的修饰符都在告诉你它的生命周期和线程安全语义。
接下来,我们逐个拆解这些字段背后的设计考量。
三、静态共享资源:一次创建,全局复用
3.1 全局重连调度器:GLOBAL_SCHEDULER
private static final ScheduledExecutorService GLOBAL_SCHEDULER =
ThreadPoolAcquirer.getWebsocketClientReconnectScheduledThreadPoolExecutor();
重连任务需要延迟执行——"5 秒后重连""30 秒后重连""5 分钟后重连"。这就需要一个 ScheduledExecutorService。
Phoenix 没有让每个 WebsocketClient 实例各自创建调度器,而是通过 ThreadPoolAcquirer 获取一个全局共享的调度线程池。ThreadPoolAcquirer 内部使用 DCL(双重检查锁)保证线程池只初始化一次:
public static MonitoredScheduledThreadPoolExecutor getWebsocketClientReconnectScheduledThreadPoolExecutor() {
if (websocketClientReconnectScheduledThreadPoolExecutor == null) {
synchronized (ThreadPoolAcquirer.class) {
if (websocketClientReconnectScheduledThreadPoolExecutor == null) {
websocketClientReconnectScheduledThreadPoolExecutor = new MonitoredScheduledThreadPoolExecutor(
(int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
new BasicThreadFactory.Builder()
.namingPattern("phoenix-websocket-client-scheduled-pool-thread-%d")
.daemon(true)
.build(),
new ThreadPoolExecutor.AbortPolicy(),
"phoenix-websocket-client-scheduled-pool", true);
}
}
}
return websocketClientReconnectScheduledThreadPoolExecutor;
}
几个值得注意的细节:
- 线程数计算:
Ncpu / (1 - 0.8)——这是 I/O 密集型任务的经典公式。阻塞系数 0.8 表示线程大部分时间都在等待网络 I/O,所以需要更多线程来保持 CPU 利用率。 - 守护线程:
.daemon(true)确保这些线程不会阻止 JVM 退出。当用户的应用关闭时,重连线程会自动随主线程结束。 - MonitoredScheduledThreadPoolExecutor:这是 Phoenix 自研的"可监控线程池"——线程池本身也被纳入了监控体系,可以在 UI 上看到它的核心线程数、活跃线程、队列深度等指标。"监控系统自身也被监控",有点套娃的味道。
3.2 全局 Tyrus 客户端:SHARED_CLIENT
private static final ClientManager SHARED_CLIENT = ClientManager.createClient();
ClientManager 是 Tyrus 的客户端引擎,内部管理着连接所需的底层资源(线程池、ByteBuffer 池等)。如果每次 connect() 都创建一个新的 ClientManager,这些底层资源会被反复创建和销毁——既浪费内存,又增加 GC 压力。
用一个 static final 的 SHARED_CLIENT,所有 WebsocketClient 实例共享同一个 Tyrus 引擎,底层的线程和缓冲区都能被复用。这和我们在第二篇博客中看到的 HTTP 连接池是同样的理念——连接资源是昂贵的,能复用就复用。
四、构造器:Fail-Fast 的参数校验
WebsocketClient 提供了两个构造器——一个默认参数的简化版和一个全参数的完整版:
// 简化版:默认5秒超时、5秒重连间隔、开启自动重连
public WebsocketClient(String serverUri) {
this(serverUri, 5, 5, true);
}
// 全参数版
public WebsocketClient(String serverUri, int connectTimeoutSeconds,
int reconnectDelaySeconds, boolean autoReconnect) {
// 1. 地址严格校验
Objects.requireNonNull(serverUri, "serverUri不能为空!");
String trimmed = serverUri.trim();
try {
URI uri = new URI(trimmed);
String scheme = uri.getScheme();
if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
throw new IllegalArgumentException("WebSocket协议必须为ws或wss!");
}
if (uri.getHost() == null || uri.getHost().isEmpty()) {
throw new IllegalArgumentException("WebSocket地址缺少Host!");
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("WebSocket地址格式错误!", e);
}
this.serverUri = trimmed;
// 2. 参数校验并赋值
this.connectTimeoutSeconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 5;
this.reconnectDelaySeconds = reconnectDelaySeconds > 0 ? reconnectDelaySeconds : 5;
this.autoReconnect = autoReconnect;
}
构造器做了严格的 Fail-Fast 校验:
- 非空检查:
Objects.requireNonNull()是 JDK 提供的标准方式,如果传入 null 立即抛出NullPointerException,不留隐患。 - 协议校验:只接受
ws://和wss://两种 scheme。如果传了http://,在构造阶段就报错,而不是在连接阶段才发现协议不对。 - Host 校验:没有 Host 的 URI(如
ws:///path)在构造时就被拦下。 - 参数兜底:超时和重连间隔如果传了负数或零,默认回退到 5 秒——防御性编程,不信任调用方。
这种"在最早的时机暴露错误"的理念,是所有健壮代码的共同特征。比起在 connect() 时才发现 URI 格式不对,在构造时就报错的调用栈更短、更容易定位问题。
五、connect():一场精心编排的连接仪式
connect() 方法是 WebsocketClient 的核心。看起来只有 40 多行代码,但每一行都有它存在的理由。
5.1 快速返回:已连接就别折腾了
public void connect() throws IOException, DeploymentException, InterruptedException {
// 已连接则快速返回
if (this.isConnected()) {
if (log.isDebugEnabled()) {
log.debug("已连接到WebSocket服务端[URI:{}],跳过重复连接!", this.serverUri);
}
return;
}
第一步是短路检查——如果已经连上了,直接返回。isConnected() 方法做了三重判断:
public boolean isConnected() {
Session sess = this.session; // 快照引用,避免并发问题
return this.connected && sess != null && sess.isOpen();
}
注意这里先把 this.session 读到一个局部变量 sess 中,再做后续判断。为什么不直接用 this.session?因为 session 是 volatile 的,在多线程环境下可能在两次读取之间被另一个线程置为 null——先读成非 null 通过了判空,再读时已经是 null 了,session.isOpen() 就会抛 NullPointerException。读到局部变量后,后续操作都基于同一个快照值,就不会有这个问题。
这是一种经典的 volatile 快照读 模式,在并发编程中非常实用。
5.2 CAS 并发保护:只允许一个线程连接
// 防止并发连接:仅允许一个线程进入连接流程
if (!this.connectionPending.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("已有连接请求正在处理中,跳过重复连接[URI:{}]!", this.serverUri);
}
return;
}
想象一下:客户端启动时,心跳线程、JVM 采集线程、服务器采集线程几乎同时启动。如果它们都发现"还没连上",都去调用 connect()——三个线程同时发起三个 WebSocket 连接请求,最终服务端收到三个来自同一客户端的连接,这就乱套了。
AtomicBoolean.compareAndSet(false, true) 是一个原子操作——它做了两件事:检查当前值是否为 false,如果是则设为 true 并返回 true;如果当前值已经是 true(说明有人已经在连接了),直接返回 false。整个过程无锁、无阻塞。
为什么不用 synchronized?因为 synchronized 会让后来的线程排队等待——等前一个连接完成后,它们还是会依次发起连接请求。而 CAS 的语义是"发现有人在连了,我就不连了"——直接放弃,不浪费时间排队。对于"防止重复连接"这个场景,CAS 比 synchronized 更合适。
5.3 CountDownLatch:等待 onOpen 的"结业通知"
Session newSession = null;
try {
// 重置门闩:确保每次 connect 使用新的同步点
this.connectLatch = new CountDownLatch(1);
// 发起连接(Tyrus 内部处理)
newSession = SHARED_CLIENT.connectToServer(this, URI.create(this.serverUri));
// 等待 onOpen 确认(或超时)
boolean success = this.connectLatch.await(this.connectTimeoutSeconds, TimeUnit.SECONDS);
if (!success) {
this.safeCloseSession(newSession);
throw new IOException(String.format(
"连接WebSocket服务端[URI:%s]超时(%d秒)!", this.serverUri, this.connectTimeoutSeconds));
}
}
这里有一个容易被忽略的问题:connectToServer() 返回了,不代表 WebSocket 连接真的建立成功了。
connectToServer() 是 Tyrus 提供的连接方法。它在底层会完成 TCP 三次握手和 WebSocket 协议升级(HTTP 101 Switching Protocols),然后返回一个 Session 对象。但在 Tyrus 的实现中,Session 对象返回时,@OnOpen 回调可能还没有执行——毕竟回调是在 Tyrus 的 I/O 线程中异步调用的。
如果 connect() 方法在 connectToServer() 返回后就立即声明"连接成功",后续的 sendMessage() 可能在 @OnOpen 还没执行之前就尝试发送数据——此时 session 字段还是旧值,消息会发到一个过期的会话上。
CountDownLatch 解决了这个问题:
connect()调用connectToServer()后,立即调用connectLatch.await(),阻塞等待。- 当 Tyrus 在 I/O 线程中调用
@OnOpen回调时,回调方法执行connectLatch.countDown(),唤醒connect()。 connect()被唤醒后,可以确认session和connected已经在@OnOpen中被正确设置了。
如果等了 5 秒(connectTimeoutSeconds)@OnOpen 还没来,await() 返回 false——超时了。这时候要做两件事:关闭已经创建但没完成握手的 Session(防止资源泄漏),然后抛出异常。
为什么每次 connect() 都要创建一个新的 CountDownLatch?因为 CountDownLatch 是一次性的——计数到 0 之后就不能重置了。如果复用上一次的 Latch,它的计数已经是 0,await() 会立即返回而不等待。
5.4 异常处理与资源清理
} catch (IOException | DeploymentException | InterruptedException e) {
this.safeCloseSession(newSession);
throw e;
} catch (Exception e) {
this.safeCloseSession(newSession);
throw new IOException(String.format(
"连接WebSocket服务端[URI:%s]失败,原因:%s", this.serverUri, e.getMessage()), e);
} finally {
// 释放连接占位,允许下次连接尝试
this.connectionPending.set(false);
}
异常处理有两层:
- 已知异常(
IOException、DeploymentException、InterruptedException):关闭可能已创建的 Session,然后原封不动地抛出去——让调用方根据异常类型做不同的处理。 - 未知异常:同样关闭 Session,但包装成
IOException再抛出——统一异常接口,简化调用方的 catch 逻辑。
finally 块中的 connectionPending.set(false) 至关重要——无论连接成功还是失败,都要释放 CAS 占位。如果忘了这一步,connectionPending 永远停在 true,后续所有连接请求都会被直接跳过,客户端就"死"了。
safeCloseSession() 方法的实现也很防御性:
private void safeCloseSession(Session session) {
if (session != null && session.isOpen()) {
try {
session.close();
} catch (IOException e) {
log.error("关闭WebSocket连接[URI:{}]时发生异常:{}", this.serverUri, e.getMessage());
}
}
}
先判空、再判状态、最后 try-catch——不会因为 Session 已关闭就抛异常,也不会因为关闭过程出错就影响主流程。这种"安全关闭"的写法在网络编程中非常常见。
六、JSR-356 生命周期回调:四个注解,四段人生
WebsocketClient 用 @ClientEndpoint 标注,Tyrus 会通过反射发现它的四个生命周期方法,并在对应时机自动调用。
6.1 @OnOpen:连接成功的"入学典礼"
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.connected = true;
// 释放等待中的 connect() 调用
CountDownLatch latch = this.connectLatch;
if (latch != null) {
latch.countDown();
}
log.info("成功建立WebSocket连接[URI:{}]!", this.serverUri);
}
三件事:保存 Session、标记已连接、释放 CountDownLatch。
注意 connectLatch 的读取也用了局部快照——先读到 latch 变量,再判空和操作。虽然 connectLatch 是 volatile 的,但在极端并发下,close() 方法也可能访问它。用局部变量可以避免读到一个"半更新"的状态。
6.2 @OnMessage:收到消息的"传话"
@OnMessage
public void onMessage(String message) {
if (log.isDebugEnabled()) {
log.debug("从WebSocket服务端[URI:{}]收到消息:{}", this.serverUri, message);
}
Consumer<String> handler = this.messageHandler;
if (handler != null) {
try {
handler.accept(message);
} catch (Exception e) {
// 隔离异常,防止连接中断
log.error("处理来自WebSocket服务端[URI:{}]的消息时发生异常:{}", this.serverUri, e.getMessage());
}
}
}
客户端不仅发数据,也要接收服务端下发的指令——比如线程池调参命令。messageHandler 是一个 Consumer<String>,通过 setMessageHandler() 在外部设置。
这里有一个关键的设计决策:异常隔离。消息处理器的异常被 try-catch 包裹,不会向上层抛出。为什么?因为 @OnMessage 方法是在 Tyrus 的 I/O 线程中执行的——如果抛出未捕获异常,Tyrus 可能会认为这个连接出了问题,进而触发 @OnError 和 @OnClose,导致整个连接断开。一个消息处理器的 bug 不应该影响连接的存活——所以异常必须在这里被"吃掉"。
6.3 @OnClose:连接断开的"毕业仪式"
@OnClose
public void onClose(CloseReason reason) {
this.connected = false;
this.session = null;
String closeReasonStr = (reason != null) ? reason.getReasonPhrase() : "Unknown (no close handshake)";
String msg = StringUtils.isBlank(closeReasonStr)
? "WebSocket连接已关闭[URI:" + this.serverUri + "]!"
: "WebSocket连接已关闭[URI:" + this.serverUri + "],原因:" + closeReasonStr;
log.warn(msg);
// 重复连接,直接结束,不再重连
if (reason != null && reason.getCloseCode().getCode() == WebSocketCloseReasonEnums.DUPLICATE_CONNECTION.getCode()) {
log.warn("WebSocket服务端检测到已有活跃会话,拒绝重复连接!");
return;
}
// 满足条件则调度重连任务
if (this.enableReconnect && this.autoReconnect) {
log.info("WebSocket连接关闭,将在 {} 秒后尝试重新连接WebSocket服务端[URI:{}]...",
this.reconnectDelaySeconds, this.serverUri);
GLOBAL_SCHEDULER.schedule(() -> this.attemptReconnect(), this.reconnectDelaySeconds, TimeUnit.SECONDS);
}
}
@OnClose 是整个重连机制的触发入口。它做了三件事:
第一,清理状态:把 connected 设为 false,session 设为 null。
第二,重复连接识别:如果关闭原因的状态码是 4000(DUPLICATE_CONNECTION),说明服务端发现了这个客户端已经有另一个活跃连接——当前连接是"多余的"。这种情况下不应该重连,否则会陷入"连上→被踢→重连→又连上→又被踢"的死循环。
回忆一下第三篇博客中服务端 MonitoringFrameHandler 的重复连接处理逻辑:当同一个 instanceId 的客户端建立了新连接,服务端会向旧连接发送 CloseWebSocketFrame(4000, "Duplicate connection")。客户端在 @OnClose 中检测到这个特殊状态码,就知道"我是那个旧连接,我应该安静地退出,不要再折腾了"。
第三,调度重连:如果不是重复连接,并且重连开关是开着的(enableReconnect && autoReconnect),就用 GLOBAL_SCHEDULER 调度一个延迟任务——reconnectDelaySeconds 秒后执行 attemptReconnect()。
为什么不立即重连?因为服务端可能正在重启、网络可能正在波动。立即重连大概率还是失败,白白浪费资源。等几秒钟再试,给服务端和网络一个恢复的窗口。
6.4 @OnError:异常的"急诊室"
@OnError
public void onError(Throwable throwable) {
this.connected = false;
this.session = null;
log.error("WebSocket连接异常[URI:{}],原因:{}", this.serverUri, throwable.getMessage());
// 不在此处调度重连,onClose 会随后被调用并统一处理重连
}
注意最后一行注释:不在此处调度重连。为什么?
在 JSR-356 规范中,@OnError 被调用后,@OnClose 通常会紧接着被调用(因为底层 TCP 连接大概率已经断了)。如果在 @OnError 和 @OnClose 中都调度重连,就会出现两个重连任务同时执行的情况——两个线程同时发起连接,又回到了并发问题。
Phoenix 的策略是:@OnError 只负责清理状态和记录日志,重连统一由 @OnClose 处理。这样重连逻辑只有一个入口,简单可靠。
七、指数退避重连:不要在暴风雨中反复敲门
7.1 首次重连入口
private void attemptReconnect() {
this.attemptReconnect(1);
}
一个简单的转发——把重试次数初始化为 1,交给真正的重连方法。
7.2 核心重连逻辑
private void attemptReconnect(int attempt) {
// 三重防护:运行时开关 + 配置开关 + 当前状态
if (!this.enableReconnect || !this.autoReconnect || this.isConnected()) {
return;
}
try {
log.info("正在尝试重新连接WebSocket服务端[URI:{}],第{}次...", this.serverUri, attempt);
// 重用主连接逻辑
this.connect();
} catch (Exception e) {
log.error("重连WebSocket服务端[URI:{}] 第{}次 失败,原因:{}", this.serverUri, attempt, e.getMessage());
// 必须再次检查,避免禁用后仍无限重试
if (this.enableReconnect && this.autoReconnect) {
// 退避,上限5分钟
long delay = Math.min(300, this.reconnectDelaySeconds * (long) Math.pow(1.5, attempt - 1));
GLOBAL_SCHEDULER.schedule(() -> this.attemptReconnect(attempt + 1), delay, TimeUnit.SECONDS);
}
}
}
这段代码有几个精妙之处:
三重防护:进入方法的第一件事就是检查三个条件——运行时开关(enableReconnect)、配置开关(autoReconnect)、当前连接状态(isConnected())。只要任意一个不满足,就直接返回。这是一种"入口防御"——在做任何实际工作之前先确认前置条件。
重用 connect():重连时直接调用 connect() 方法,不重复实现连接逻辑。connect() 内部已经有 CAS 并发保护和 CountDownLatch 同步机制——这些防护在重连时同样有效。
指数退避算法:重连间隔的计算公式是:
delay = min(300, reconnectDelaySeconds × 1.5^(attempt-1))
以默认配置(reconnectDelaySeconds = 5)为例:
| 重试次数 | 计算公式 | 延迟(秒) | 实际延迟 |
|---|---|---|---|
| 第1次 | 5 × 1.5^0 | 5 | 5秒 |
| 第2次 | 5 × 1.5^1 | 7.5 | 7秒 |
| 第3次 | 5 × 1.5^2 | 11.25 | 11秒 |
| 第4次 | 5 × 1.5^3 | 16.87 | 16秒 |
| 第5次 | 5 × 1.5^4 | 25.31 | 25秒 |
| ... | ... | ... | ... |
| 第N次 | 5 × 1.5^(N-1) | ≥300 | 300秒(上限) |
为什么要指数退避?想象一下服务端突然宕机,同时有 100 个客户端在重连。如果每个客户端都固定 5 秒重连一次,服务端重启后的第 5 秒会同时收到 100 个连接请求——这可能直接把刚重启的服务端再次压垮。指数退避让重连间隔越来越长,100 个客户端的重连请求会被分散到不同的时间点,避免"重连风暴"。
为什么底数是 1.5 而不是 2?因为 2 的指数增长太快——第 10 次重试就要等 2560 秒(42 分钟),对于监控场景来说太久了。1.5 的增长更温和,到第 20 次左右才达到 5 分钟上限。
为什么上限是 5 分钟?因为恰好和服务端的 IdleStateHandler 读空闲超时(300 秒)一致——如果 5 分钟都没连上,说明问题不是暂时的,但也不能放弃,所以以 5 分钟为周期持续重试。
重连前的二次检查:注意在 catch 块中调度下一次重连之前,又检查了一次 enableReconnect && autoReconnect。为什么?因为在重连过程中,close() 方法可能被调用(比如应用正在关闭),enableReconnect 被设为 false。如果不检查就调度,会导致关闭后还在重连——违反了 close() 的语义。
7.3 connectWithRetry():启动时的首次连接
public void connectWithRetry() {
try {
this.connect();
} catch (Exception e) {
if (this.enableReconnect && this.autoReconnect) {
log.info("WebSocket连接失败,将在 {} 秒后尝试重新连接WebSocket服务端[URI:{}]...",
this.reconnectDelaySeconds, this.serverUri);
GLOBAL_SCHEDULER.schedule(() -> this.attemptReconnect(), this.reconnectDelaySeconds, TimeUnit.SECONDS);
}
}
}
connectWithRetry() 是对 connect() 的一层薄封装——首次连接失败时,不直接抛异常,而是启动重连流程。这个方法在 DataExchanger.run() 中被调用,用于客户端启动时的首次连接。它的语义是"尽力连,连不上就后台慢慢重试"——不会阻塞调用方。
八、sendMessage():发送消息的防御性编程
public void sendMessage(String message) {
Session sess = this.session;
if (sess == null || !this.connected || !sess.isOpen()) {
if (log.isDebugEnabled()) {
log.debug("未连接到WebSocket服务端[URI:{}],消息已丢弃!", this.serverUri);
}
return;
}
try {
sess.getBasicRemote().sendText(message);
if (log.isDebugEnabled()) {
log.debug("已向WebSocket服务端[URI:{}]发送消息:{}", this.serverUri, message);
}
} catch (IOException e) {
log.error("向WebSocket服务端[URI:{}]发送消息失败:{}", this.serverUri, e.getMessage());
this.connected = false;
this.safeCloseSession(sess);
}
}
又一次看到了 volatile 快照读——Session sess = this.session。在多线程环境下,this.session 可能在判空和 sendText() 之间被 @OnClose 置为 null。用局部变量锁住引用,就不会有这个问题。
发送失败时的处理也很讲究:不仅标记 connected = false,还主动关闭 Session。为什么要主动关闭?因为 IOException 通常意味着底层 TCP 连接已经出了问题。主动关闭可以触发 @OnClose 回调,进而触发重连流程——比被动等待 TCP 超时(可能几分钟后才感知到断开)快得多。
另外注意:sess.getBasicRemote().sendText(message) 使用的是 BasicRemote(同步发送),而不是 AsyncRemote(异步发送)。这意味着 sendText() 会阻塞直到消息完全写入底层缓冲区。对于监控数据来说,同步发送更安全——可以及时发现发送失败并处理。
九、close():温柔而坚定的告别
public void close() {
// 永久禁用重连(优先级高于 autoReconnect)
this.enableReconnect = false;
// 防止 connect() 永久阻塞
CountDownLatch latch = this.connectLatch;
if (latch != null) {
latch.countDown();
}
// 安全关闭会话
log.info("正在主动关闭WebSocket连接[URI:{}]...", this.serverUri);
this.safeCloseSession(this.session);
this.connected = false;
this.session = null;
}
close() 方法做了三件事:
第一,关闭重连总闸:this.enableReconnect = false。这是一个"不可逆操作"——一旦设为 false,后续所有重连检查都会被拦下,客户端再也不会自动重连。
第二,释放 CountDownLatch:如果 connect() 正在 connectLatch.await() 中阻塞等待,close() 调用 latch.countDown() 将其唤醒。被唤醒后 connect() 会发现 Session 已经被关了,走正常的失败处理流程。如果不释放 Latch,connect() 可能永远阻塞在那里——线程泄漏。
第三,安全关闭 Session:safeCloseSession() 会发送 WebSocket Close 帧给服务端,然后关闭底层 TCP 连接。这是一个"正常关闭"流程,服务端会收到 Close 帧并清理对应的客户端信息。
十、双层重连开关:配置与运行时的分工
WebsocketClient 设计了两个重连开关,它们的职责不同:
| 字段 | 类型 | 可变性 | 语义 |
|---|---|---|---|
autoReconnect |
final boolean |
构造时确定,不可变 | "这个客户端是否支持自动重连" |
enableReconnect |
volatile boolean |
运行时可变 | "当前是否允许自动重连" |
为什么需要两个开关?
autoReconnect 是一个配置级的开关——如果创建客户端时传了 autoReconnect = false,那这个客户端从始至终都不会自动重连。它是 final 的,不可修改。
enableReconnect 是一个运行时的开关——默认是 true。调用 close() 或 disableReconnect() 时会被设为 false。它的作用是"关闭正在运行的重连机制"——比如应用正在优雅停机,需要阻止重连。
两者是 AND 关系:
if (this.enableReconnect && this.autoReconnect) {
// 才会真正执行重连
}
只有两个开关都为 true 时才重连。这种"双层保险"的设计保证了:
- 不想要重连功能的客户端,从构造时就永远不会重连(
autoReconnect = false) - 需要运行时关闭重连的场景,可以随时关闭(
enableReconnect = false),不需要重建客户端
十一、DataExchanger:客户端的数据交换中枢
DataExchanger 是客户端与服务端之间的"桥梁"——它封装了 WebsocketClient 的初始化、消息处理器注册和加密发送的完整流程。
11.1 DCL 双重检查锁:只初始化一次
public static void run() {
// 双重检查锁定(DCL)风格,避免 synchronized 开销
if (started) {
if (log.isDebugEnabled()) {
log.debug("数据交换器已启动,跳过重复初始化!");
}
return;
}
String serverUri = ConfigLoader.getMonitoringProperties().getComm().getWebsocket().getUrl();
if (StringUtils.isBlank(serverUri)) {
return;
}
synchronized (OBJECT_LOCK) {
// 二次检查 + 设置 started
if (started) {
return;
}
// 1. 创建客户端
String endpoint = ConfigLoader.getMonitoringProperties().getInstance().getEndpoint();
String instanceId = InstanceGenerator.getInstanceId();
String uri = serverUri + "/websocket/relay/" + WebSocketBusinessTypeConstants.MONITORING
+ "?endpoint=" + endpoint + "&instanceId=" + instanceId;
wsClient = new WebsocketClient(uri);
// 标记为已启动
started = true;
// 捕获局部快照,防止 close() 并发将 wsClient 置 null 导致 NPE
WebsocketClient clientRef = wsClient;
ThreadPool.getCommonIoIntensiveThreadPoolExecutor().execute(() -> {
try {
// 2. 注册消息处理器
MulticastWebsocketMessageHandler dispatcher = new MulticastWebsocketMessageHandler();
ServiceLoader<IWebsocketMessageHandler> loader = ServiceLoader.load(IWebsocketMessageHandler.class);
for (IWebsocketMessageHandler handler : loader) {
dispatcher.registerHandler(handler);
}
clientRef.setMessageHandler(dispatcher::onRawMessage);
log.info("已注册 {}个 WebSocket消息处理器!", dispatcher.getHandlerCount());
// 3. 阻塞直到连接成功或超时
clientRef.connectWithRetry();
} catch (Exception e) {
log.error("运行数据交换器失败(将依赖WebsocketClient内部重连机制):{}", e.getMessage());
}
});
}
}
DCL(Double-Checked Locking)的结构我们在 ThreadPoolAcquirer 中已经见过了,但 DataExchanger 的实现有几个独特之处:
started 标志的设置时机:注意 started = true 是在创建完 WebsocketClient 之后、连接之前就设置了。这意味着即使连接失败,也不会再次进入初始化流程。为什么?因为 WebsocketClient 有自己的重连机制——创建成功后连接失败,交给它内部重试就好了,不需要重新走 DataExchanger.run() 的初始化流程。
局部快照 clientRef:把 wsClient 赋值给局部变量 clientRef,然后在 Lambda 中使用 clientRef 而不是 wsClient。为什么?因为异步任务执行时,close() 方法可能已经把 wsClient 置为 null 了。局部变量是 Lambda 捕获的"快照",不受外部修改影响。
异步初始化:连接和消息处理器注册都放在 ThreadPool.getCommonIoIntensiveThreadPoolExecutor().execute(...) 中异步执行。这意味着 DataExchanger.run() 调用后会立即返回,不会阻塞 Monitor 的启动流程。
11.2 SPI 消息处理器注册:自动发现的魔法
MulticastWebsocketMessageHandler dispatcher = new MulticastWebsocketMessageHandler();
ServiceLoader<IWebsocketMessageHandler> loader = ServiceLoader.load(IWebsocketMessageHandler.class);
for (IWebsocketMessageHandler handler : loader) {
dispatcher.registerHandler(handler);
}
clientRef.setMessageHandler(dispatcher::onRawMessage);
客户端不仅要发数据,也要接收服务端下发的指令。那么问题来了:客户端怎么知道要注册哪些消息处理器?
Phoenix 使用了 Java 标准的 SPI(Service Provider Interface) 机制。在 META-INF/services/ 目录下有一个以接口全限定名命名的文件:
# 文件路径:META-INF/services/com.gitee.pifeng.monitoring.plug.core.wsclient.inf.IWebsocketMessageHandler
# 客户端模块中的实现
com.gitee.pifeng.monitoring.plug.core.wsclient.JavaThreadPoolMessageHandler
ServiceLoader.load() 会自动扫描 classpath 下所有 JAR 包的 META-INF/services/ 目录,找到 IWebsocketMessageHandler 接口的所有实现类,并实例化它们。开发者只需要:
- 实现
IWebsocketMessageHandler接口 - 在
META-INF/services/中注册
不需要修改 DataExchanger 的任何代码,新的处理器就会被自动发现和注册。这就是 SPI 的魅力——约定优于配置。
目前 Phoenix 注册了两个消息处理器:
JavaThreadPoolMessageHandler(客户端模块):处理服务端下发的线程池调参指令DockerMessageHandler(代理端模块):处理服务端下发的 Docker 操作指令
以 JavaThreadPoolMessageHandler 为例:
public class JavaThreadPoolMessageHandler implements IWebsocketMessageHandler {
@Override
public void handleMessage(WebSocketPackage responsePackage) {
String className = responsePackage.getClassName();
if (!StringUtils.equals(JavaThreadPoolPackage.class.getName(), className)) {
return;
}
Object payload = responsePackage.getPayload();
JavaThreadPoolPackage javaThreadPoolPackage = (JavaThreadPoolPackage) payload;
JavaThreadPool threadPool = javaThreadPoolPackage.getJavaThreadPool();
List<JavaThreadPool.ThreadPoolInfoDomain> threadPoolInfoDomains = threadPool.getThreadPoolInfoDomains();
AtomicBoolean dynamicUpdateSuccess = new AtomicBoolean(false);
if (CollectionUtils.isNotEmpty(threadPoolInfoDomains)) {
threadPoolInfoDomains.forEach((threadPoolInfoDomain) -> {
boolean success = ThreadPoolManager.dynamicUpdateThreadPool(threadPoolInfoDomain);
if (success) {
dynamicUpdateSuccess.set(true);
}
});
}
// 存在成功,立即发送Java线程池信息
if (dynamicUpdateSuccess.get()) {
ThreadPoolAcquirer.getInstanceScheduledThreadPoolExecutor().execute(new JavaThreadPoolThread());
}
}
}
整个流程:收到服务端下发的 JavaThreadPoolPackage → 提取线程池配置 → 动态修改本地线程池参数 → 立即上报最新的线程池状态。这就是 WebSocket 双向通信的价值——运维人员在 UI 上调了参数,几秒钟内客户端就完成了调整并反馈了结果。
11.3 MulticastWebsocketMessageHandler:多播分发器
public class MulticastWebsocketMessageHandler {
private final List<IWebsocketMessageHandler> handlers = Lists.newArrayList();
@SneakyThrows
public void onRawMessage(String message) {
if (StringUtils.isBlank(message)) {
return;
}
// 解密 → 反序列化 → 白名单校验
WebSocketPackage pkg = WebSocketPackage.convert(message, DOWNSTREAM_ALLOWED_CLASS_NAMES);
for (IWebsocketMessageHandler handler : this.handlers) {
try {
handler.handleMessage(pkg);
} catch (Exception e) {
// 隔离异常,防止一个处理器失败影响其他处理器
log.error("Websocket消息处理器执行异常,消息:{}", message, e);
}
}
}
}
MulticastWebsocketMessageHandler 是一个经典的 广播器(Multicaster) 模式。它持有一个处理器列表,收到消息后依次调用每个处理器,异常相互隔离。
注意它使用的是 下行白名单 DOWNSTREAM_ALLOWED_CLASS_NAMES:
public static final Set<String> DOWNSTREAM_ALLOWED_CLASS_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
CommandPackage.class.getName(),
JavaThreadPoolPackage.class.getName()
)));
只有两种类型:命令包和线程池包。这和上行白名单(9 种类型)形成了鲜明对比——上行方向(客户端→服务端)允许的类型远多于下行方向(服务端→客户端)。这是因为客户端上报的数据种类繁多(心跳、JVM、服务器、Docker……),而服务端下发给客户端的指令种类有限。上行和下行分别维护独立的白名单,严格遵循最小权限原则。
11.4 sendMessage():TOCTOU 安全的消息发送
public static void sendMessage(WebSocketPackage requestPackage) {
String requestPackageJsonStr = requestPackage.toJsonString();
if (log.isDebugEnabled()) {
log.debug("发送数据包:{}", requestPackageJsonStr);
}
// 将 明文JSON字符串 转换成 密文JSON字符串
String encryptStr = MsgPayloadUtils.encryptPayload(requestPackageJsonStr);
WebsocketClient client = wsClient; // 读一次 volatile
if (client == null) {
log.warn("WebSocket客户端尚未初始化,消息已丢弃!");
return;
}
client.sendMessage(encryptStr);
}
这里又出现了我们的老朋友——volatile 快照读。WebsocketClient client = wsClient 把 volatile 变量读到局部变量 client 中,后续操作都基于这个快照。
这解决了一个经典的并发问题——TOCTOU(Time-of-Check-to-Time-of-Use)竞态条件。如果直接用 wsClient:
// 危险的写法!
if (wsClient != null) { // 时刻1:检查,不为 null
// 此时 close() 把 wsClient 置为 null
wsClient.sendMessage(...); // 时刻2:使用,NullPointerException!
}
在"检查"和"使用"之间的时间窗口内,另一个线程可能调用了 close() 把 wsClient 置为 null。读到局部变量后,即使另一个线程修改了 wsClient,client 变量仍然指向原来的对象——检查和使用的是同一个引用。
11.5 isReady():数据交换器的就绪检查
public static boolean isReady() {
WebsocketClient client = wsClient;
if (client == null) {
return false;
}
return client.isConnected();
}
所有的定时上报线程(心跳、JVM、服务器、线程池等)在执行任务前,都会先调用 DataExchanger.isReady() 检查连接是否就绪。如果还没连上,这次采集就直接跳过——不浪费 CPU 去采集数据,因为发不出去。
// HeartbeatThread 中的用法
@Override
public void run() {
if (!DataExchanger.isReady()) {
return; // 连接没好,这次不发
}
// 构建心跳包并发送...
}
11.6 close():关闭数据交换器
public static void close() {
synchronized (OBJECT_LOCK) {
if (wsClient != null) {
wsClient.close();
}
wsClient = null;
started = false;
}
}
close() 在 synchronized 块中执行——和 run() 使用同一个锁对象 OBJECT_LOCK,保证初始化和关闭不会同时执行。关闭时把 started 重置为 false,理论上允许 DataExchanger 被重新启动——虽然在当前 Phoenix 的生命周期中,这种情况不会发生。
十二、从 Monitor 启动看全链路
把所有组件串起来,看看客户端从启动到发送第一个心跳包的完整链路:
┌──────────────────┐
│ Monitor.start() │
└────────┬─────────┘
│
├── ① 初始化加解密配置
│
├── ② DataExchanger.run()
│ │
│ ├── 读取 WebSocket URL 配置
│ ├── 构建带 endpoint + instanceId 的完整 URI
│ ├── new WebsocketClient(uri) ──── 参数校验
│ ├── started = true
│ └── 异步执行 ──────────────────────────────────────┐
│ │
├── ③ HeartbeatTaskScheduler.run() │
│ └── 每30秒执行 HeartbeatThread │
│ └── if (!DataExchanger.isReady()) return; ←─┼── 连接未就绪时跳过
│ │
├── ④ JvmTaskScheduler.run() │
│ └── 每60秒执行 JvmThread │
│ │
├── ⑤ 其他定时任务... │
│ │
└── ⑥ ShutdownHook.addShutdownHook() │
│
┌────────────────────────────────────────────┘
│ 异步线程:
├── ServiceLoader 发现并注册消息处理器
├── clientRef.setMessageHandler(dispatcher::onRawMessage)
└── clientRef.connectWithRetry()
│
├── connect()
│ ├── CAS 占位
│ ├── new CountDownLatch(1)
│ ├── SHARED_CLIENT.connectToServer(this, uri)
│ ├── connectLatch.await(5, SECONDS)
│ │ │
│ │ │ ◀── onOpen() 回调
│ │ │ ├── session = session
│ │ │ ├── connected = true
│ │ │ └── connectLatch.countDown() ──── 唤醒
│ │ │
│ └── CAS 释放
│
└── 连接成功!DataExchanger.isReady() 返回 true
│
└── 定时任务开始正常发送数据 ✅
整个过程是异步的、非阻塞的。Monitor.start() 不需要等待 WebSocket 连接成功就能返回——定时任务先启动,但通过 isReady() 检查保证在连接就绪前不会发送数据。一旦连接成功,定时任务的下一次执行就会通过 isReady() 检查,开始正常发送数据。
十三、设计复盘:几个值得学习的模式
"CAS vs synchronized"——选择正确的并发工具
connect() 中使用 AtomicBoolean.compareAndSet() 而不是 synchronized,原因是语义不同:
synchronized的语义是"排队等待"——后来的线程会阻塞,等前面的线程完成后继续执行。CAS的语义是"发现有人在做了,我就不做了"——后来的线程直接放弃,不浪费时间。
对于"防止重复连接"这个场景,CAS 更合适——重复的连接请求不需要排队等待,直接跳过就好。
但在 DataExchanger.run() 中,使用的是 synchronized——因为初始化流程中有多步操作(创建客户端、设置标志、提交异步任务),需要原子性保证。CAS 只能保护一个布尔值的翻转,无法保护多步操作的原子性。
选择哪种工具,取决于你要保护什么。保护一个状态标志用 CAS,保护一段初始化逻辑用 synchronized——各得其所。
"volatile 快照读"——低成本的并发安全
在 isConnected()、sendMessage()、DataExchanger.sendMessage() 等方法中,反复出现一个模式:
Session sess = this.session; // 快照读
if (sess != null && sess.isOpen()) {
sess.sendText(message);
}
这是一种零开销的并发安全技巧——不需要锁,不需要 CAS,只需要一行赋值语句。它的原理是:volatile 保证了可见性,但不保证"两次读取得到相同值"。通过赋值到局部变量,把"多次读 volatile"变成"一次读 volatile + 多次读局部变量",消除了 TOCTOU 竞态。
"SPI + 广播器"——无侵入的扩展机制
DataExchanger 通过 SPI 发现消息处理器,MulticastWebsocketMessageHandler 广播消息给所有处理器。新增一种服务端下发指令的处理,只需要:
- 实现
IWebsocketMessageHandler接口 - 在
META-INF/services/中注册
不需要修改 DataExchanger、MulticastWebsocketMessageHandler 或任何现有代码。这种面向扩展开放、面向修改关闭的设计,让客户端 SDK 在不修改源码的情况下就能被扩展——对于一个要嵌入用户应用的 SDK 来说,这是非常重要的品质。
"双层开关"——配置与运行时的分离
autoReconnect(final,不可变)和 enableReconnect(volatile,可变)的组合设计,让"是否支持重连"和"是否允许重连"成为两个独立的关注点。构造时决定能力,运行时控制行为——清晰、灵活、不会混淆。
十四、小结
本篇深入拆解了 Phoenix WebSocket 客户端的连接与重连机制。回顾核心要点:
- 技术选型:客户端用 Tyrus(JSR-356)而非 Netty,避免依赖冲突和依赖膨胀
- 静态共享资源:全局调度器
GLOBAL_SCHEDULER和全局 Tyrus 引擎SHARED_CLIENT,复用底层资源 - 构造器:Fail-Fast 校验 URI 格式、协议、Host,在最早时机暴露错误
- connect() 方法:CAS 无锁并发保护(
connectionPending)→ CountDownLatch 连接同步(connectLatch)→ 异常处理与资源清理 - JSR-356 回调:
@OnOpen更新状态并释放 Latch,@OnMessage异常隔离保护连接,@OnClose识别重复连接(状态码 4000)并触发重连,@OnError只清理不重连 - 指数退避重连:初始 5 秒 → 1.5 倍递增 → 上限 5 分钟,三重防护(运行时开关 + 配置开关 + 连接状态)
- 双层重连开关:
autoReconnect(final,配置级)+enableReconnect(volatile,运行时级),AND 关系 - DataExchanger:DCL 双重检查锁初始化、SPI 自动发现消息处理器、volatile 快照读防 TOCTOU、异步初始化不阻塞启动
- 消息处理:
MulticastWebsocketMessageHandler广播器 +IWebsocketMessageHandlerSPI 扩展点,下行白名单独立于上行白名单
如果说服务端的 WebSocket 实现像一座精密的工厂——Boss/Worker 双线程池、7 层 Pipeline、定时健康巡检——那客户端就像一个坚韧的旅行者:它知道自己要去哪里(serverUri),懂得在路途中保护自己(CAS、CountDownLatch、异常隔离),遇到挫折不放弃但也不蛮干(指数退避),收到"你不该来"的信号时能体面地退出(状态码 4000),在安顿好之后也能听从总部的指挥(SPI 消息处理器)。
下一篇,我们将进入数据安全领域——深入拆解 Phoenix 的加解密体系:AES、DES、SM4 国密算法的统一抽象,看看监控数据是如何在传输过程中被加密保护的。
项目地址:
https://gitcode.com/monitoring-platform/phoenix
https://gitee.com/monitoring-platform/phoenix
https://github.com/709343767/phoenix
评论