1. 概述
Phoenix 提供了一套完整的动态线程池管理方案,支持在应用运行时对线程池参数进行热更新,无需重启服务。核心能力包括:
- 统一注册管理:所有线程池通过
ThreadPoolManager统一注册、查询、关闭 - 运行时指标采集:活跃线程数、任务数、利用率、拒绝任务数等
- 动态参数热更新:核心线程数、最大线程数、队列类型/容量、拒绝策略等均可在线修改
- Web UI 可视化配置:通过前端表单页面直观地配置线程池参数
- WebSocket 下发:服务端通过 WebSocket 将配置变更推送至客户端应用,实时生效
2. 核心架构
2.1 核心类一览
| 类名 | 所属模块 | 职责 |
|---|---|---|
ThreadPoolManager |
phoenix-common-core | 线程池注册表 + 动态配置入口 |
MonitoredThreadPoolExecutor |
phoenix-common-core | 可监控的普通线程池(自动注册 + 拒绝计数) |
MonitoredScheduledThreadPoolExecutor |
phoenix-common-core | 可监控的定时线程池(自动注册 + 拒绝计数) |
ThreadPool |
phoenix-common-core | 公共线程池工厂(CPU/IO 密集型) |
JavaThreadPool.ThreadPoolInfoDomain |
phoenix-common-core | 线程池信息数据模型 |
QueueTypeEnum |
phoenix-common-core | 队列类型枚举 + 工厂方法 |
RejectedPolicyTypeEnum |
phoenix-common-core | 拒绝策略枚举 + SPI 扩展 |
ResizableLinkedBlockingQueue |
phoenix-common-core | 支持动态修改容量的阻塞队列 |
JavaThreadPoolMessageHandler |
phoenix-client-core | WebSocket 消息处理器,接收并应用线程池配置变更 |
2.2 数据流
Phoenix UI(Web页面)
│ 用户修改线程池参数并提交
▼
Phoenix Server(服务端)
│ 封装为 JavaThreadPoolPackage
│ 通过 WebSocket 下发
▼
Phoenix Client(客户端应用)
│ JavaThreadPoolMessageHandler 接收消息
│ 调用 ThreadPoolManager.dynamicUpdateThreadPool()
▼
ThreadPoolManager
│ 查找注册表中对应的 ThreadPoolExecutor
│ 依次设置各项参数
▼
线程池配置生效(无需重启)
│ 配置成功后立即触发一次线程池信息上报
▼
Phoenix Server 更新监控数据
3. 线程池注册
3.1 自动注册(推荐)
使用 MonitoredThreadPoolExecutor 或 MonitoredScheduledThreadPoolExecutor 创建线程池时,构造方法会自动调用
ThreadPoolManager.register() 完成注册。
普通线程池
MonitoredThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 空闲回收时间
TimeUnit.SECONDS, // 时间单位
workQueue, // 工作队列
threadFactory, // 线程工厂
rejectedHandler, // 拒绝策略
"my-thread-pool", // 线程池名称(全局唯一)
true // 是否需要管理关闭
);
定时线程池
MonitoredScheduledThreadPoolExecutor scheduledExecutor = new MonitoredScheduledThreadPoolExecutor(
corePoolSize, // 核心线程数
threadFactory, // 线程工厂
rejectedHandler, // 拒绝策略
"my-scheduled-pool",// 线程池名称(全局唯一)
false // 是否需要管理关闭
);
参数说明
| 参数 | 说明 |
|---|---|
threadPoolName |
线程池名称,全局唯一,重复注册会抛出 MonitoringUniversalException |
needShutdown |
true:注册到 NEED_SHUTDOWN_THREAD_POOLS,应用关闭时由 ThreadPoolManager 统一优雅关闭false:注册到 UN_NEED_SHUTDOWN_THREAD_POOLS,仅做监控,不参与统一关闭(适合 Spring Bean 自行管理生命周期的场景) |
3.2 手动注册
对于已有的 ThreadPoolExecutor 实例,可以手动注册:
ThreadPoolManager.register("legacy-pool",existingExecutor, false);
3.3 取消注册
ThreadPoolManager.unregister("my-thread-pool");
4. 项目中的使用示例
4.1 公共线程池(ThreadPool 工厂)
ThreadPool 类采用双重检查锁 + volatile 的单例模式,提供 4 种公共线程池:
| 方法 | 线程池名称 | 类型 |
|---|---|---|
getCommonCpuIntensiveThreadPoolExecutor() |
phoenix-common-cpu-intensive-pool |
CPU 密集型普通线程池 |
getCommonIoIntensiveThreadPoolExecutor() |
phoenix-common-io-intensive-pool |
IO 密集型普通线程池 |
getCommonCpuIntensiveScheduledThreadPoolExecutor() |
phoenix-common-cpu-intensive-scheduled-pool |
CPU 密集型定时线程池 |
getCommonIoIntensiveScheduledThreadPoolExecutor() |
phoenix-common-io-intensive-scheduled-pool |
IO 密集型定时线程池 |
线程数计算公式:Ncpu / (1 - 阻塞系数),CPU 密集型阻塞系数 = 0.2,IO 密集型阻塞系数 = 0.8。
4.2 Spring Bean 方式(ThreadPoolConfig)
在 phoenix-server 和 phoenix-agent 中,通过 @Bean + @Lazy 注册线程池:
@Lazy
@Bean(name = "instanceMonitorThreadPoolExecutor", destroyMethod = "shutdown")
public MonitoredThreadPoolExecutor instanceMonitorThreadPoolExecutor() {
return new MonitoredThreadPoolExecutor(
(int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
(int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
1L, TimeUnit.HOURS,
new LinkedBlockingQueue<>(Integer.MAX_VALUE),
new BasicThreadFactory.Builder()
.namingPattern("phoenix-server-instance-monitor-pool-thread-%d")
.daemon(true).build(),
new ThreadPoolExecutor.AbortPolicy(),
"phoenix-server-instance-monitor-pool", // 线程池名称
false); // needShutdown=false,由Spring容器管理关闭
}
5. 动态配置详解
5.1 可配置参数
通过 ThreadPoolManager.dynamicUpdateThreadPool() 方法,支持以下参数的运行时热更新:
| 参数 | 字段名 | 类型 | 说明 | 约束 |
|---|---|---|---|---|
| 核心线程数 | corePoolSize |
Integer |
线程池始终保持的最小线程数 | ≥ 0,且 ≤ 最大线程数 |
| 最大线程数 | maximumPoolSize |
Integer |
线程池允许创建的最大线程数 | ≥ 1,且 ≥ 核心线程数;ScheduledThreadPoolExecutor 不支持修改(DelayedWorkQueue 无界) |
| 队列类型 | queueType |
String |
工作队列的实现类型 | 见 队列类型;ScheduledThreadPoolExecutor 不支持变更 |
| 队列容量 | queueCapacity |
Long |
工作队列的最大容量 | 仅 ResizableLinkedBlockingQueue 支持在不切换队列类型时动态修改 |
| 空闲回收时间 | keepAliveTime |
Long |
空闲线程的存活时间(秒) | ≥ 0 |
| 核心线程空闲回收 | allowCoreThreadTimeOut |
Boolean |
是否允许核心线程在空闲超时后被回收 | 开启前 keepAliveTime 必须 > 0,否则跳过设置 |
| 拒绝策略 | rejectedExecutionHandlerName |
String |
线程池满时的拒绝处理方式 | 见 拒绝策略 |
5.2 支持的队列类型
队列通过 QueueTypeEnum 枚举管理,默认容量为 1024。
| 队列类型 | 说明 | 是否支持动态修改容量 | 备注 |
|---|---|---|---|
ArrayBlockingQueue |
基于数组的有界阻塞队列,FIFO | 仅切换队列类型时 | 创建后容量不可变(JDK 限制) |
LinkedBlockingQueue |
基于链表的有界阻塞队列 | 仅切换队列类型时 | JDK 实现的 capacity 为 final |
LinkedBlockingDeque |
基于链表的双端阻塞队列 | 仅切换队列类型时 | 支持头尾双向操作 |
SynchronousQueue |
不存储元素的同步传递队列 | 不支持 | 容量恒为 0,每个 put 必须等待 take |
LinkedTransferQueue |
无界链表传输队列 | 不支持 | 无容量概念 |
PriorityBlockingQueue |
支持优先级排序的无界阻塞队列 | 仅切换队列类型时 | 任务需实现 Comparable |
ResizableLinkedBlockingQueue |
可动态调整容量的链表阻塞队列 | 支持 | 推荐使用,调用 setCapacity() 即可热更新容量 |
DelayedWorkQueue |
ScheduledThreadPoolExecutor 内部私有队列 |
不支持 | 仅展示,不可配置 |
推荐:如果需要在不切换队列类型的情况下动态修改队列容量,请使用
ResizableLinkedBlockingQueue。
队列类型变更机制
当队列类型发生变更时,ThreadPoolManager 会执行以下步骤:
- 通过
QueueTypeEnum.createBlockingQueue()创建目标类型的新队列 - 调用
oldQueue.drainTo(tasks)将旧队列中的待执行任务迁移到新队列 - 通过反射替换
ThreadPoolExecutor的private final workQueue字段
当队列类型未变更,仅修改容量时:
- 如果是
ResizableLinkedBlockingQueue:直接调用setCapacity(int)动态修改 - 如果是其他类型:因 JDK 队列的 capacity 是
final的,日志提示跳过
5.3 支持的拒绝策略
拒绝策略通过 RejectedPolicyTypeEnum 枚举管理,并支持 SPI 扩展。
| 策略名称 | 说明 |
|---|---|
AbortPolicy |
中止策略(JDK 默认):直接抛出 RejectedExecutionException |
CallerRunsPolicy |
调用者运行策略:由提交任务的线程直接执行被拒绝的任务,起到反压效果 |
DiscardOldestPolicy |
丢弃最老任务策略:丢弃队列头部(最早入队)的任务,然后重新提交被拒绝的任务 |
DiscardPolicy |
丢弃策略:直接丢弃被拒绝的任务,不做任何处理 |
RunsOldestTaskPolicy |
执行最老任务策略(自定义):从队列中取出最早的任务并执行它,再将新任务放入队列 |
SyncPutQueuePolicy |
同步阻塞入队策略(自定义):通过 queue.put() 阻塞等待直到队列有空间 |
CustomRejectedPolicy |
自定义 SPI 策略:通过 Java SPI 机制加载用户自定义的拒绝策略 |
自定义拒绝策略(SPI 扩展)
- 实现
CustomRejectedExecutionHandler接口:
public class MyCustomRejectedHandler implements CustomRejectedExecutionHandler {
@Override
public String getName() {
return "MyCustomRejectedHandler";
}
@Override
public RejectedExecutionHandler generateRejected() {
return (r, executor) -> {
// 自定义拒绝逻辑,例如记录日志、持久化任务等
log.warn("任务被拒绝: {}", r);
};
}
}
- 在
META-INF/services/下创建 SPI 配置文件:
文件名: com.gitee.pifeng.monitoring.common.threadpool.rejected.CustomRejectedExecutionHandler
内容: com.example.MyCustomRejectedHandler
- 在 Web UI 配置页面中选择
CustomRejectedPolicy(自定义SPI策略)即可使用。
6. 监控指标
ThreadPoolManager.getAllThreadPoolInfo() 方法采集所有注册线程池的运行时指标:
| 指标 | 字段名 | 类型 | 说明 |
|---|---|---|---|
| 线程池名称 | name |
String | 注册时指定的唯一名称 |
| 活跃线程数 | activeCount |
Integer | 当前正在执行任务的线程数 |
| 当前线程数 | poolSize |
Integer | 线程池中当前存活的线程总数 |
| 核心线程数 | corePoolSize |
Integer | 配置的核心线程数 |
| 最大线程数 | maximumPoolSize |
Integer | 配置的最大线程数 |
| 历史峰值线程数 | largestPoolSize |
Integer | 线程池曾达到的最大线程数 |
| 总任务数 | taskCount |
Long | 累计接收的任务数(含已完成、执行中、等待中) |
| 已完成任务数 | completedTaskCount |
Long | 已执行完毕的任务数 |
| 拒绝任务数 | rejectedTaskCount |
Long | 被拒绝的任务累计数量(仅 Monitored 类型线程池支持) |
| 队列类型 | queueType |
String | 当前使用的阻塞队列类型 |
| 队列大小 | queueSize |
Integer | 队列中当前等待的任务数 |
| 队列容量 | queueCapacity |
Long | 队列的总容量 |
| 队列剩余容量 | queueRemainingCapacity |
Integer | 队列还能容纳的任务数 |
| 拒绝策略 | rejectedExecutionHandlerName |
String | 当前使用的拒绝策略名称 |
| 核心线程空闲回收 | allowCoreThreadTimeOut |
Boolean | 核心线程是否在空闲时被回收 |
| 空闲回收时间 | keepAliveTime |
Long | 空闲线程存活时间(秒) |
| 利用率 | utilizationRate |
Double | activeCount / poolSize,保留 4 位小数 |
无界队列说明:
DelayedWorkQueue、无参构造的LinkedBlockingQueue等无界队列的remainingCapacity()返回Integer.MAX_VALUE,此时queueCapacity统一设为Integer.MAX_VALUE(2147483647),表示"无限制"。
7. 线程池生命周期管理
7.1 优雅关闭
ThreadPoolManager.shutdownGracefully() 方法实现了优雅关闭逻辑:
- 调用
shutdown()阻止新任务提交 - 等待 15 秒 让已提交任务完成
- 超时未完成则调用
shutdownNow()强制中断 - 再等待 15 秒 确认关闭完成
- 若线程被中断,调用
shutdownNow()并恢复中断状态
7.2 统一关闭
调用 ThreadPoolManager.shutdownAllGracefullyAndUnregister() 会遍历所有 needShutdown=true 的线程池,依次执行优雅关闭并取消注册。
8. Web UI 配置页面
配置页面 setup-instance-java-thread-pool.html 基于 Layui 框架,提供以下交互能力:
8.1 表单字段与交互规则
| 字段 | 控件类型 | 可编辑条件 | 校验规则 |
|---|---|---|---|
| 核心线程数 | number 输入框 | 始终可编辑 | 必填、数字、不能大于最大线程数 |
| 最大线程数 | number 输入框 | DelayedWorkQueue 时只读 |
必填、数字、不能小于核心线程数 |
| 队列类型 | 下拉选择框 | DelayedWorkQueue 时禁用 |
必选 |
| 队列容量 | number 输入框 | DelayedWorkQueue/SynchronousQueue/LinkedTransferQueue 时只读 |
必填、数字 |
| 核心线程空闲回收 | 开关组件 | 始终可编辑 | - |
| 空闲回收时间(秒) | number 输入框 | 始终可编辑 | 必填、数字 |
| 拒绝策略 | 下拉选择框 | 始终可编辑 | 必选 |
8.2 联动逻辑
- 队列类型切换时:选择
SynchronousQueue或LinkedTransferQueue后,队列容量自动设为 0 并变为只读 - ScheduledThreadPoolExecutor 限制:当队列类型为
DelayedWorkQueue时,最大线程数和队列类型/容量均为只读,因为ScheduledThreadPoolExecutor的DelayedWorkQueue是内部私有队列,不支持变更
9. 注意事项与最佳实践
9.1 ScheduledThreadPoolExecutor 的限制
ScheduledThreadPoolExecutor 使用内部私有的 DelayedWorkQueue(无界队列),因此:
- 最大线程数修改无意义(由 DelayedWorkQueue 的无界特性决定,仅核心线程数生效)
- 队列类型和容量不支持变更
9.2 allowCoreThreadTimeOut 前置条件
开启 allowCoreThreadTimeOut 前,keepAliveTime 必须 > 0。否则 JDK 会抛出 IllegalArgumentException。ThreadPoolManager
已内置此校验,不满足条件时会跳过设置并输出警告日志。
9.3 corePoolSize 与 maximumPoolSize 设置顺序
动态修改时,若新的 corePoolSize > 当前 maximumPoolSize,需先设置 maximumPoolSize 再设置 corePoolSize,否则 JDK 会抛出
IllegalArgumentException。ThreadPoolManager 已自动处理此问题。
9.4 队列容量动态修改建议
- 如果有动态调整队列容量的需求,建议创建线程池时直接使用
ResizableLinkedBlockingQueue作为工作队列 - 其他 JDK 内置队列的
capacity为final字段,无法在不切换队列类型的情况下修改容量 - 切换队列类型涉及反射替换
ThreadPoolExecutor.workQueue字段,存在一定风险,建议在低峰期操作
9.5 线程池命名规范
- 线程池名称全局唯一,重复注册会抛出异常
- 建议使用
模块名-功能-pool的命名规范,例如:phoenix-server-instance-monitor-pool - 线程命名建议使用
模块名-功能-pool-thread-%d的格式,便于排查问题