上一篇我们拆解了 JVM 信息采集——Phoenix 通过 MXBean 给每个 Java 进程做了一次“全身体检”。但 JVM 层面的线程数只是一个笼统的数字,它告诉你“有 200 个线程在跑”,却不告诉你“这 200 个线程分别属于哪个线程池,哪个池子快满了,哪个池子在疯狂拒绝任务”。本篇将深入 Phoenix 的线程池信息采集机制——它是如何把散落在系统各处的线程池“收编”进一个统一注册表,再定时上报给服务端的。
一、为什么 JVM 线程数据“不够用”?
JVM 的 ThreadMXBean 能告诉你当前有多少活跃线程、多少守护线程、历史峰值是多少。但对于一个真实的生产系统来说,这就像只知道“公司有 200 个员工在工位上”,却不知道哪个部门缺人、哪个部门排队等任务。
线程池才是线程的“归属部门”。一个典型的 Java 应用可能同时运行着十几个线程池:有的负责处理 HTTP 请求,有的负责定时调度,有的负责消息消费,有的负责数据库监控……每个池子都有自己的核心线程数、最大线程数、任务队列、拒绝策略。线程池级别的监控,才是运维真正需要的粒度。
问题是——JDK 没有提供“线程池注册表”这个概念。ThreadPoolExecutor 创建完就是一个孤零零的对象,JVM 不知道它的存在,你也没有任何标准 API 能一次性列出当前进程中所有的线程池。
Phoenix 的解法很直接:既然 JDK 不提供,那我自己建一个。
二、MonitoredExecutor:在构造的一瞬间完成“入编”
Phoenix 定义了两个自定义线程池类:MonitoredThreadPoolExecutor 和 MonitoredScheduledThreadPoolExecutor,分别对应普通线程池和定时调度线程池。它们的核心思路完全相同——在构造方法中自动向 ThreadPoolManager 注册自己。
以 MonitoredThreadPoolExecutor 为例:
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicLong rejectedTaskCount = new AtomicLong(0);
public MonitoredThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler,
String threadPoolName,
boolean needShutdown) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
// 构造完成的那一刻,就注册到全局注册表
ThreadPoolManager.register(threadPoolName, this, needShutdown);
}
}
两个关键参数:
threadPoolName:线程池的名字,全局唯一,是后续查询、上报、动态配置的唯一标识。needShutdown:是否需要由ThreadPoolManager统一管理关闭。有些线程池(比如 Spring 容器管理的 Bean 线程池)有自己的生命周期,不需要额外托管。
这个设计的妙处在于——对使用者来说,创建线程池的代码几乎不需要改变,只是把 new ThreadPoolExecutor(...) 换成 new MonitoredThreadPoolExecutor(...),多传一个名字和一个布尔值,线程池就自动“入编”了。没有注解、没有 AOP、没有配置文件——就是最朴素的构造器注入。
拒绝计数:JDK 不给,那就自己记
MonitoredThreadPoolExecutor 还做了一件 JDK 原生线程池做不到的事——精确记录被拒绝的任务数。
@Override
public void execute(@NonNull Runnable command) {
try {
super.execute(command);
} catch (RejectedExecutionException e) {
this.handleRejection(command, e);
throw e;
}
}
private void handleRejection(Object task, RejectedExecutionException exp) {
this.rejectedTaskCount.incrementAndGet();
log.error("任务 {} 被线程池 {} 拒绝!", task, this, exp);
}
它覆写了 execute、submit 等所有任务提交方法,在捕获到 RejectedExecutionException 时递增原子计数器。JDK 的 ThreadPoolExecutor 只会抛异常,不会帮你记“到底拒绝了多少次”——但这个数字对线上告警至关重要。一个线程池如果 rejectedTaskCount 持续增长,说明它的容量已经成为瓶颈,需要立即扩容或优化。
MonitoredScheduledThreadPoolExecutor 的设计与之如出一辙,只是额外覆写了 schedule、scheduleAtFixedRate、scheduleWithFixedDelay 等定时调度方法。
三、ThreadPoolManager:全局注册表的三重职责
ThreadPoolManager 是整个线程池监控的“中枢”。它用两个 ConcurrentMap 管理所有已注册的线程池:
// 需要由 Manager 统一关闭的线程池
private static final Map<String, ThreadPoolExecutor> NEED_SHUTDOWN_THREAD_POOLS
= Maps.newConcurrentMap();
// 不需要 Manager 管理关闭的线程池(如 Spring Bean)
private static final Map<String, ThreadPoolExecutor> UN_NEED_SHUTDOWN_THREAD_POOLS
= Maps.newConcurrentMap();
为什么要分两个 Map?因为线程池的“生命周期管理”和“信息采集”是两件事。Spring 容器管理的线程池通过 @Bean(destroyMethod = "shutdown") 已经有了自己的关闭机制,如果 ThreadPoolManager 再关一次,就会出现双重关闭的问题。分开存储,关闭时只处理 NEED_SHUTDOWN 的那一半,采集信息时两个 Map 全部遍历。
职责一:注册与防重
public static void register(String threadPoolName, ThreadPoolExecutor executor,
boolean needShutdown) {
if (needShutdown) {
if (NEED_SHUTDOWN_THREAD_POOLS.putIfAbsent(threadPoolName, executor) != null) {
throw new MonitoringUniversalException(
"线程池已经存在,无法注册,请修改线程池名字!");
}
} else {
if (UN_NEED_SHUTDOWN_THREAD_POOLS.putIfAbsent(threadPoolName, executor) != null) {
throw new MonitoringUniversalException(
"线程池已经存在,无法注册,请修改线程池名字!");
}
}
}
putIfAbsent 保证了原子性——同名线程池只能注册一次,重复注册直接抛异常。这是一个“宁可启动失败,也不允许数据混乱”的防御性设计。
职责二:信息采集
getAllThreadPoolInfo() 是上报链路的起点。它遍历两个 Map,为每个线程池提取一份“快照”:
public static JavaThreadPool getAllThreadPoolInfo() {
List<JavaThreadPool.ThreadPoolInfoDomain> threadPoolInfoDomains
= Lists.newArrayList();
for (Map.Entry<String, ThreadPoolExecutor> entry
: NEED_SHUTDOWN_THREAD_POOLS.entrySet()) {
threadPoolInfoDomains.add(wrapThreadPoolInfoDomain(entry));
}
for (Map.Entry<String, ThreadPoolExecutor> entry
: UN_NEED_SHUTDOWN_THREAD_POOLS.entrySet()) {
threadPoolInfoDomains.add(wrapThreadPoolInfoDomain(entry));
}
return JavaThreadPool.builder()
.threadPoolInfoDomains(threadPoolInfoDomains).build();
}
每个线程池被提取的指标多达 16 个维度:
| 指标 | 含义 | 为什么重要 |
|---|---|---|
activeCount |
正在执行任务的线程数 | 反映实时负载 |
poolSize |
当前线程池中的线程总数 | 判断线程是否被回收 |
corePoolSize |
核心线程数 | 基准容量 |
maximumPoolSize |
最大线程数 | 容量上限 |
largestPoolSize |
历史峰值线程数 | 判断是否曾经打满 |
taskCount |
累计接收的总任务数 | 反映吞吐量 |
completedTaskCount |
已完成的任务数 | 与 taskCount 差值 = 积压量 |
rejectedTaskCount |
累计拒绝的任务数 | 最核心的告警指标 |
queueSize |
当前队列中等待的任务数 | 排队严重程度 |
queueCapacity |
队列总容量 | 队列大小上限 |
queueRemainingCapacity |
队列剩余容量 | 距离满队列还有多远 |
queueType |
队列类型 | LinkedBlockingQueue / ArrayBlockingQueue 等 |
rejectedExecutionHandlerName |
拒绝策略名称 | AbortPolicy / CallerRunsPolicy 等 |
keepAliveTime |
空闲线程回收时间(秒) | 线程存活策略 |
allowCoreThreadTimeOut |
是否允许核心线程超时 | 核心线程是否会被回收 |
utilizationRate |
线程池利用率 | activeCount / poolSize |
其中,队列容量的计算有一个值得注意的边界处理:
if (remainingCapacity >= Integer.MAX_VALUE) {
threadPoolInfoDomain.setQueueCapacity((long) Integer.MAX_VALUE);
} else {
threadPoolInfoDomain.setQueueCapacity(
(long) queueSize + (long) remainingCapacity);
}
无界队列(如 LinkedBlockingQueue 无参构造、DelayedWorkQueue)的 remainingCapacity() 返回 Integer.MAX_VALUE,这只是一个“哨兵值”表示无限制。如果直接用 queueSize + remainingCapacity 计算容量,结果会溢出。Phoenix 在这里做了防御——遇到无界队列,直接将容量设为 Integer.MAX_VALUE,语义清晰,不会误导。
职责三:优雅关闭
public static void shutdownAllGracefullyAndUnregister() {
List<Map.Entry<String, ThreadPoolExecutor>> entries
= Lists.newArrayList(NEED_SHUTDOWN_THREAD_POOLS.entrySet());
for (Map.Entry<String, ThreadPoolExecutor> entry : entries) {
String key = entry.getKey();
ThreadPoolExecutor executor = entry.getValue();
shutdownGracefully(executor, key);
unregister(key);
}
}
关闭流程遵循“先 shutdown() 等待 15 秒,再 shutdownNow() 强制中断”的两阶段策略。注意它只处理 NEED_SHUTDOWN 池子——那些由 Spring 管理的线程池自有其归宿。
四、谁在用 MonitoredExecutor?
Phoenix 内部大量使用了这两个自定义线程池。以客户端的 ThreadPoolAcquirer 为例,它通过双重检查锁懒加载了多个调度线程池:
public static MonitoredScheduledThreadPoolExecutor
getInstanceScheduledThreadPoolExecutor() {
if (instanceScheduledThreadPoolExecutor == null) {
synchronized (ThreadPoolAcquirer.class) {
if (instanceScheduledThreadPoolExecutor == null) {
instanceScheduledThreadPoolExecutor =
new MonitoredScheduledThreadPoolExecutor(
(int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
new BasicThreadFactory.Builder()
.namingPattern(
"phoenix-instance-scheduled-pool-thread-%d")
.daemon(true).build(),
new ThreadPoolExecutor.AbortPolicy(),
"phoenix-instance-scheduled-pool", true);
}
}
}
return instanceScheduledThreadPoolExecutor;
}
服务端的 ThreadPoolConfig 则通过 Spring @Bean 的方式创建,传入 needShutdown = false(由 Spring 的 destroyMethod = "shutdown" 管理生命周期):
@Bean(name = "instanceMonitorThreadPoolExecutor", destroyMethod = "shutdown")
public MonitoredThreadPoolExecutor instanceMonitorThreadPoolExecutor() {
return new MonitoredThreadPoolExecutor(
...,
"phoenix-server-instance-monitor-pool", false);
}
无论是客户端还是服务端、无论是手动创建还是 Spring 托管——只要用了 MonitoredExecutor,就自动进入注册表,自动被采集上报。这就是“构造即注册”模式的威力:零遗漏,零遗忘。
五、定时上报:从采集到落库的完整链路
5.1 调度器:JavaThreadPoolTaskScheduler
和 JVM 信息采集一样,线程池信息的上报也由一个专属调度器驱动:
public class JavaThreadPoolTaskScheduler {
public static void run() {
boolean javaThreadPoolInfoEnable = ConfigLoader
.getMonitoringProperties().getJavaThreadPoolInfo().getEnable();
if (javaThreadPoolInfoEnable) {
long rate = ConfigLoader
.getMonitoringProperties().getJavaThreadPoolInfo().getRate();
ThreadPoolAcquirer.getInstanceScheduledThreadPoolExecutor()
.scheduleWithFixedDelay(
new JavaThreadPoolThread(), 45, rate, TimeUnit.SECONDS);
}
}
}
几个关键设计与 JVM 调度器一脉相承:
- 默认关闭:
monitoring.java-thread-pool-info.enable默认false,按需开启。 - 频率下界 30 秒:配置加载阶段强校验,低于 30 秒直接抛异常。
- 延迟 45 秒:和 JVM 包一样,确保心跳先行注册实例,避免服务端因找不到实例而拒绝数据包。
- 默认频率 60 秒:未配置时每 60 秒上报一次,对大多数场景足够。
它在 Monitor 入口类的启动序列中排在第 9 步,紧跟在 JVM 信息采集之后:
6. 心跳 → 7. 服务器信息 → 8. JVM 信息 → 9. 线程池信息 → 10. 网络设备信息
5.2 执行线程:JavaThreadPoolThread
每次“闹钟响起”,JavaThreadPoolThread 的 run() 方法被触发,执行的是经典的“采集 → 封装 → 发送”三步走:
@Override
public void run() {
if (!DataExchanger.isReady()) {
return;
}
TimeInterval timer = DateUtil.timer();
try {
// 1. 从注册表中采集所有线程池的快照
JavaThreadPool javaThreadPoolInfo
= JavaThreadPoolUtils.getJavaThreadPoolInfo();
// 2. 封装成 JavaThreadPoolPackage
JavaThreadPoolPackage javaThreadPoolPackage
= this.clientPackageConstructor
.structureJavaThreadPoolPackage(javaThreadPoolInfo);
// 3. 通过 WebSocket 发送
WebSocketPackage requestPackage = new WebSocketPackage();
requestPackage.setClassName(JavaThreadPoolPackage.class.getName());
requestPackage.setPayload(javaThreadPoolPackage);
DataExchanger.sendMessage(requestPackage);
} catch (NetException e) {
log.error("获取网络信息异常!", e);
} catch (Exception e) {
log.error("其它异常!", e);
} finally {
// 耗时超过5秒,打 warn 日志
String betweenDay = timer.intervalPretty();
if (timer.intervalSecond() > 5) {
log.warn("构建+发送Java线程池信息包耗时:{}", betweenDay);
}
}
}
其中 JavaThreadPoolUtils.getJavaThreadPoolInfo() 只有一行——直接委托给 ThreadPoolManager.getAllThreadPoolInfo()。这层工具类看起来“多余”,实际上它提供了一个稳定的 API 边界:采集逻辑变了,只改 Utils 内部实现;线程调用端不受影响。
DataExchanger.isReady() 检查确保 WebSocket 连接已就绪。如果连接断了,这次采集直接跳过——不会因为网络问题阻塞后续调度。
5.3 服务端落库:双表并行写入
服务端收到线程池信息包后,JavaThreadPoolServiceImpl.dealJavaThreadPoolPackage() 负责处理。它的核心逻辑是两步并行:
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
// 1. 更新实时表(有则更新,无则新增)
CompletableFuture.runAsync(
() -> selfProxy.operateMonitorJavaThreadPool(javaThreadPoolPackage),
this.instanceMonitorThreadPoolExecutor),
// 2. 追加历史表(每次都新增一条记录)
CompletableFuture.runAsync(
() -> this.javaThreadPoolHistoryService
.operateMonitorJavaThreadPoolHistory(javaThreadPoolPackage),
this.instanceMonitorThreadPoolExecutor)
);
allFutures.get(30, TimeUnit.SECONDS);
实时表 MONITOR_JAVA_THREAD_POOL:以 (instanceId, name) 为唯一键,已存在则更新最新值,不存在则新增。这张表回答的问题是“现在线程池是什么状态”。
历史表 MONITOR_JAVA_THREAD_POOL_HISTORY:每次上报都追加一条记录,用于趋势分析。这张表回答的问题是“线程池的状态随时间如何变化”。
两个维度不加事务,@Retryable 兜底瞬时故障——和 JVM 信息落库完全一致的策略。监控数据天然容忍短暂丢失,不值得为一致性牺牲并发性能。
六、数据流全景:从构造器到数据库
把上面的所有环节串起来,线程池信息的完整数据流是这样的:
new MonitoredThreadPoolExecutor(...) // 构造即注册
│
▼
ThreadPoolManager 注册表
├── NEED_SHUTDOWN_THREAD_POOLS
└── UN_NEED_SHUTDOWN_THREAD_POOLS
│
▼ (定时触发)
JavaThreadPoolTaskScheduler
└── scheduleWithFixedDelay(JavaThreadPoolThread, 45s, rate)
│
▼
JavaThreadPoolThread.run()
├── ThreadPoolManager.getAllThreadPoolInfo() // 快照采集
├── ClientPackageConstructor.structureJavaThreadPoolPackage() // 封装
└── DataExchanger.sendMessage() // WebSocket 发送
│
▼ (服务端接收)
JavaThreadPoolServiceImpl.dealJavaThreadPoolPackage()
├── CompletableFuture → operateMonitorJavaThreadPool() // 实时表
└── CompletableFuture → operateMonitorJavaThreadPoolHistory() // 历史表
从线程池被 new 出来的那一刻起,它就进入了监控体系。不需要手动注册、不需要配置文件声明、不需要注解扫描——构造即纳管,存在即可观测。
七、小结
线程池信息采集是 Phoenix 监控中“粒度最细”的定时任务之一。相比 JVM 信息的五大维度面面俱到,线程池监控聚焦在一个核心问题上:你的每一个线程池,此刻活得好不好?
回顾整个设计,几个值得记住的要点:
- 构造即注册:
MonitoredExecutor在构造方法中自动调用ThreadPoolManager.register(),消除了“忘记注册”的风险。 - 拒绝计数:通过覆写
execute/submit方法拦截RejectedExecutionException,用AtomicLong精确记录被拒绝的任务数——这是 JDK 原生线程池不提供的关键指标。 - 双 Map 分治:将“需要托管关闭”和“不需要托管关闭”的线程池分开存储,采集时全量遍历,关闭时只处理前者,避免与 Spring 容器的生命周期管理冲突。
- 16 维快照:每次采集提取 16 个指标维度,从活跃线程数到队列剩余容量,给运维人员一张完整的“线程池体检报告”。
- 双表落库:实时表反映当前状态,历史表支撑趋势分析,
CompletableFuture并行写入,不加事务换取并发性能。
下一篇,我们将换一个视角——从客户端内部的采集逻辑,转向客户端与 Spring Boot 的集成方式。Phoenix 是如何通过一个 @EnableMonitoring 注解,就让 Spring Boot 应用自动接入整套监控体系的?Spring Boot Starter 的自动配置魔法,@Import 与条件装配的组合拳,背后的原理值得细细拆解。敬请期待。
项目地址:
https://gitee.com/pifeng/phoenix
https://gitee.com/monitoring-platform/phoenix
https://github.com/709343767/phoenix

评论