目录

    Phoenix监控平台技术解析(十二):线程池信息采集——MonitoredExecutor 注册表与定时上报机制

    上一篇我们拆解了 JVM 信息采集——Phoenix 通过 MXBean 给每个 Java 进程做了一次“全身体检”。但 JVM 层面的线程数只是一个笼统的数字,它告诉你“有 200 个线程在跑”,却不告诉你“这 200 个线程分别属于哪个线程池,哪个池子快满了,哪个池子在疯狂拒绝任务”。本篇将深入 Phoenix 的线程池信息采集机制——它是如何把散落在系统各处的线程池“收编”进一个统一注册表,再定时上报给服务端的。


    一、为什么 JVM 线程数据“不够用”?

    JVM 的 ThreadMXBean 能告诉你当前有多少活跃线程、多少守护线程、历史峰值是多少。但对于一个真实的生产系统来说,这就像只知道“公司有 200 个员工在工位上”,却不知道哪个部门缺人、哪个部门排队等任务。

    线程池才是线程的“归属部门”。一个典型的 Java 应用可能同时运行着十几个线程池:有的负责处理 HTTP 请求,有的负责定时调度,有的负责消息消费,有的负责数据库监控……每个池子都有自己的核心线程数、最大线程数、任务队列、拒绝策略。线程池级别的监控,才是运维真正需要的粒度。

    问题是——JDK 没有提供“线程池注册表”这个概念。ThreadPoolExecutor 创建完就是一个孤零零的对象,JVM 不知道它的存在,你也没有任何标准 API 能一次性列出当前进程中所有的线程池。

    Phoenix 的解法很直接:既然 JDK 不提供,那我自己建一个。


    二、MonitoredExecutor:在构造的一瞬间完成“入编”

    Phoenix 定义了两个自定义线程池类:MonitoredThreadPoolExecutorMonitoredScheduledThreadPoolExecutor,分别对应普通线程池和定时调度线程池。它们的核心思路完全相同——在构造方法中自动向 ThreadPoolManager 注册自己

    MonitoredThreadPoolExecutor 为例:

    public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
    
        private final AtomicLong rejectedTaskCount = new AtomicLong(0);
    
        public MonitoredThreadPoolExecutor(int corePoolSize,
                                           int maximumPoolSize,
                                           long keepAliveTime,
                                           TimeUnit unit,
                                           BlockingQueue<Runnable> workQueue,
                                           ThreadFactory threadFactory,
                                           RejectedExecutionHandler handler,
                                           String threadPoolName,
                                           boolean needShutdown) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
                  workQueue, threadFactory, handler);
            // 构造完成的那一刻,就注册到全局注册表
            ThreadPoolManager.register(threadPoolName, this, needShutdown);
        }
    }
    

    两个关键参数:

    • threadPoolName:线程池的名字,全局唯一,是后续查询、上报、动态配置的唯一标识。
    • needShutdown:是否需要由 ThreadPoolManager 统一管理关闭。有些线程池(比如 Spring 容器管理的 Bean 线程池)有自己的生命周期,不需要额外托管。

    这个设计的妙处在于——对使用者来说,创建线程池的代码几乎不需要改变,只是把 new ThreadPoolExecutor(...) 换成 new MonitoredThreadPoolExecutor(...),多传一个名字和一个布尔值,线程池就自动“入编”了。没有注解、没有 AOP、没有配置文件——就是最朴素的构造器注入。

    拒绝计数:JDK 不给,那就自己记

    MonitoredThreadPoolExecutor 还做了一件 JDK 原生线程池做不到的事——精确记录被拒绝的任务数

    @Override
    public void execute(@NonNull Runnable command) {
        try {
            super.execute(command);
        } catch (RejectedExecutionException e) {
            this.handleRejection(command, e);
            throw e;
        }
    }
    
    private void handleRejection(Object task, RejectedExecutionException exp) {
        this.rejectedTaskCount.incrementAndGet();
        log.error("任务 {} 被线程池 {} 拒绝!", task, this, exp);
    }
    

    它覆写了 executesubmit 等所有任务提交方法,在捕获到 RejectedExecutionException 时递增原子计数器。JDK 的 ThreadPoolExecutor 只会抛异常,不会帮你记“到底拒绝了多少次”——但这个数字对线上告警至关重要。一个线程池如果 rejectedTaskCount 持续增长,说明它的容量已经成为瓶颈,需要立即扩容或优化。

    MonitoredScheduledThreadPoolExecutor 的设计与之如出一辙,只是额外覆写了 schedulescheduleAtFixedRatescheduleWithFixedDelay 等定时调度方法。


    三、ThreadPoolManager:全局注册表的三重职责

    ThreadPoolManager 是整个线程池监控的“中枢”。它用两个 ConcurrentMap 管理所有已注册的线程池:

    // 需要由 Manager 统一关闭的线程池
    private static final Map<String, ThreadPoolExecutor> NEED_SHUTDOWN_THREAD_POOLS 
        = Maps.newConcurrentMap();
    
    // 不需要 Manager 管理关闭的线程池(如 Spring Bean)
    private static final Map<String, ThreadPoolExecutor> UN_NEED_SHUTDOWN_THREAD_POOLS 
        = Maps.newConcurrentMap();
    

    为什么要分两个 Map?因为线程池的“生命周期管理”和“信息采集”是两件事。Spring 容器管理的线程池通过 @Bean(destroyMethod = "shutdown") 已经有了自己的关闭机制,如果 ThreadPoolManager 再关一次,就会出现双重关闭的问题。分开存储,关闭时只处理 NEED_SHUTDOWN 的那一半,采集信息时两个 Map 全部遍历。

    职责一:注册与防重

    public static void register(String threadPoolName, ThreadPoolExecutor executor, 
                                 boolean needShutdown) {
        if (needShutdown) {
            if (NEED_SHUTDOWN_THREAD_POOLS.putIfAbsent(threadPoolName, executor) != null) {
                throw new MonitoringUniversalException(
                    "线程池已经存在,无法注册,请修改线程池名字!");
            }
        } else {
            if (UN_NEED_SHUTDOWN_THREAD_POOLS.putIfAbsent(threadPoolName, executor) != null) {
                throw new MonitoringUniversalException(
                    "线程池已经存在,无法注册,请修改线程池名字!");
            }
        }
    }
    

    putIfAbsent 保证了原子性——同名线程池只能注册一次,重复注册直接抛异常。这是一个“宁可启动失败,也不允许数据混乱”的防御性设计。

    职责二:信息采集

    getAllThreadPoolInfo() 是上报链路的起点。它遍历两个 Map,为每个线程池提取一份“快照”:

    public static JavaThreadPool getAllThreadPoolInfo() {
        List<JavaThreadPool.ThreadPoolInfoDomain> threadPoolInfoDomains 
            = Lists.newArrayList();
        for (Map.Entry<String, ThreadPoolExecutor> entry 
            : NEED_SHUTDOWN_THREAD_POOLS.entrySet()) {
            threadPoolInfoDomains.add(wrapThreadPoolInfoDomain(entry));
        }
        for (Map.Entry<String, ThreadPoolExecutor> entry 
            : UN_NEED_SHUTDOWN_THREAD_POOLS.entrySet()) {
            threadPoolInfoDomains.add(wrapThreadPoolInfoDomain(entry));
        }
        return JavaThreadPool.builder()
            .threadPoolInfoDomains(threadPoolInfoDomains).build();
    }
    

    每个线程池被提取的指标多达 16 个维度:

    指标 含义 为什么重要
    activeCount 正在执行任务的线程数 反映实时负载
    poolSize 当前线程池中的线程总数 判断线程是否被回收
    corePoolSize 核心线程数 基准容量
    maximumPoolSize 最大线程数 容量上限
    largestPoolSize 历史峰值线程数 判断是否曾经打满
    taskCount 累计接收的总任务数 反映吞吐量
    completedTaskCount 已完成的任务数 与 taskCount 差值 = 积压量
    rejectedTaskCount 累计拒绝的任务数 最核心的告警指标
    queueSize 当前队列中等待的任务数 排队严重程度
    queueCapacity 队列总容量 队列大小上限
    queueRemainingCapacity 队列剩余容量 距离满队列还有多远
    queueType 队列类型 LinkedBlockingQueue / ArrayBlockingQueue 等
    rejectedExecutionHandlerName 拒绝策略名称 AbortPolicy / CallerRunsPolicy 等
    keepAliveTime 空闲线程回收时间(秒) 线程存活策略
    allowCoreThreadTimeOut 是否允许核心线程超时 核心线程是否会被回收
    utilizationRate 线程池利用率 activeCount / poolSize

    其中,队列容量的计算有一个值得注意的边界处理:

    if (remainingCapacity >= Integer.MAX_VALUE) {
        threadPoolInfoDomain.setQueueCapacity((long) Integer.MAX_VALUE);
    } else {
        threadPoolInfoDomain.setQueueCapacity(
            (long) queueSize + (long) remainingCapacity);
    }
    

    无界队列(如 LinkedBlockingQueue 无参构造、DelayedWorkQueue)的 remainingCapacity() 返回 Integer.MAX_VALUE,这只是一个“哨兵值”表示无限制。如果直接用 queueSize + remainingCapacity 计算容量,结果会溢出。Phoenix 在这里做了防御——遇到无界队列,直接将容量设为 Integer.MAX_VALUE,语义清晰,不会误导。

    职责三:优雅关闭

    public static void shutdownAllGracefullyAndUnregister() {
        List<Map.Entry<String, ThreadPoolExecutor>> entries 
            = Lists.newArrayList(NEED_SHUTDOWN_THREAD_POOLS.entrySet());
        for (Map.Entry<String, ThreadPoolExecutor> entry : entries) {
            String key = entry.getKey();
            ThreadPoolExecutor executor = entry.getValue();
            shutdownGracefully(executor, key);
            unregister(key);
        }
    }
    

    关闭流程遵循“先 shutdown() 等待 15 秒,再 shutdownNow() 强制中断”的两阶段策略。注意它只处理 NEED_SHUTDOWN 池子——那些由 Spring 管理的线程池自有其归宿。


    四、谁在用 MonitoredExecutor?

    Phoenix 内部大量使用了这两个自定义线程池。以客户端的 ThreadPoolAcquirer 为例,它通过双重检查锁懒加载了多个调度线程池:

    public static MonitoredScheduledThreadPoolExecutor 
        getInstanceScheduledThreadPoolExecutor() {
        if (instanceScheduledThreadPoolExecutor == null) {
            synchronized (ThreadPoolAcquirer.class) {
                if (instanceScheduledThreadPoolExecutor == null) {
                    instanceScheduledThreadPoolExecutor = 
                        new MonitoredScheduledThreadPoolExecutor(
                            (int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
                            new BasicThreadFactory.Builder()
                                .namingPattern(
                                    "phoenix-instance-scheduled-pool-thread-%d")
                                .daemon(true).build(),
                            new ThreadPoolExecutor.AbortPolicy(), 
                            "phoenix-instance-scheduled-pool", true);
                }
            }
        }
        return instanceScheduledThreadPoolExecutor;
    }
    

    服务端的 ThreadPoolConfig 则通过 Spring @Bean 的方式创建,传入 needShutdown = false(由 Spring 的 destroyMethod = "shutdown" 管理生命周期):

    @Bean(name = "instanceMonitorThreadPoolExecutor", destroyMethod = "shutdown")
    public MonitoredThreadPoolExecutor instanceMonitorThreadPoolExecutor() {
        return new MonitoredThreadPoolExecutor(
            ...,
            "phoenix-server-instance-monitor-pool", false);
    }
    

    无论是客户端还是服务端、无论是手动创建还是 Spring 托管——只要用了 MonitoredExecutor,就自动进入注册表,自动被采集上报。这就是“构造即注册”模式的威力:零遗漏,零遗忘


    五、定时上报:从采集到落库的完整链路

    5.1 调度器:JavaThreadPoolTaskScheduler

    和 JVM 信息采集一样,线程池信息的上报也由一个专属调度器驱动:

    public class JavaThreadPoolTaskScheduler {
    
        public static void run() {
            boolean javaThreadPoolInfoEnable = ConfigLoader
                .getMonitoringProperties().getJavaThreadPoolInfo().getEnable();
            if (javaThreadPoolInfoEnable) {
                long rate = ConfigLoader
                    .getMonitoringProperties().getJavaThreadPoolInfo().getRate();
                ThreadPoolAcquirer.getInstanceScheduledThreadPoolExecutor()
                    .scheduleWithFixedDelay(
                        new JavaThreadPoolThread(), 45, rate, TimeUnit.SECONDS);
            }
        }
    }
    

    几个关键设计与 JVM 调度器一脉相承:

    • 默认关闭monitoring.java-thread-pool-info.enable 默认 false,按需开启。
    • 频率下界 30 秒:配置加载阶段强校验,低于 30 秒直接抛异常。
    • 延迟 45 秒:和 JVM 包一样,确保心跳先行注册实例,避免服务端因找不到实例而拒绝数据包。
    • 默认频率 60 秒:未配置时每 60 秒上报一次,对大多数场景足够。

    它在 Monitor 入口类的启动序列中排在第 9 步,紧跟在 JVM 信息采集之后:

    6. 心跳 → 7. 服务器信息 → 8. JVM 信息 → 9. 线程池信息 → 10. 网络设备信息
    

    5.2 执行线程:JavaThreadPoolThread

    每次“闹钟响起”,JavaThreadPoolThreadrun() 方法被触发,执行的是经典的“采集 → 封装 → 发送”三步走:

    @Override
    public void run() {
        if (!DataExchanger.isReady()) {
            return;
        }
        TimeInterval timer = DateUtil.timer();
        try {
            // 1. 从注册表中采集所有线程池的快照
            JavaThreadPool javaThreadPoolInfo 
                = JavaThreadPoolUtils.getJavaThreadPoolInfo();
            // 2. 封装成 JavaThreadPoolPackage
            JavaThreadPoolPackage javaThreadPoolPackage 
                = this.clientPackageConstructor
                    .structureJavaThreadPoolPackage(javaThreadPoolInfo);
            // 3. 通过 WebSocket 发送
            WebSocketPackage requestPackage = new WebSocketPackage();
            requestPackage.setClassName(JavaThreadPoolPackage.class.getName());
            requestPackage.setPayload(javaThreadPoolPackage);
            DataExchanger.sendMessage(requestPackage);
        } catch (NetException e) {
            log.error("获取网络信息异常!", e);
        } catch (Exception e) {
            log.error("其它异常!", e);
        } finally {
            // 耗时超过5秒,打 warn 日志
            String betweenDay = timer.intervalPretty();
            if (timer.intervalSecond() > 5) {
                log.warn("构建+发送Java线程池信息包耗时:{}", betweenDay);
            }
        }
    }
    

    其中 JavaThreadPoolUtils.getJavaThreadPoolInfo() 只有一行——直接委托给 ThreadPoolManager.getAllThreadPoolInfo()。这层工具类看起来“多余”,实际上它提供了一个稳定的 API 边界:采集逻辑变了,只改 Utils 内部实现;线程调用端不受影响。

    DataExchanger.isReady() 检查确保 WebSocket 连接已就绪。如果连接断了,这次采集直接跳过——不会因为网络问题阻塞后续调度。

    5.3 服务端落库:双表并行写入

    服务端收到线程池信息包后,JavaThreadPoolServiceImpl.dealJavaThreadPoolPackage() 负责处理。它的核心逻辑是两步并行:

    CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        // 1. 更新实时表(有则更新,无则新增)
        CompletableFuture.runAsync(
            () -> selfProxy.operateMonitorJavaThreadPool(javaThreadPoolPackage), 
            this.instanceMonitorThreadPoolExecutor),
        // 2. 追加历史表(每次都新增一条记录)
        CompletableFuture.runAsync(
            () -> this.javaThreadPoolHistoryService
                .operateMonitorJavaThreadPoolHistory(javaThreadPoolPackage), 
            this.instanceMonitorThreadPoolExecutor)
    );
    allFutures.get(30, TimeUnit.SECONDS);
    

    实时表 MONITOR_JAVA_THREAD_POOL:以 (instanceId, name) 为唯一键,已存在则更新最新值,不存在则新增。这张表回答的问题是“现在线程池是什么状态”。

    历史表 MONITOR_JAVA_THREAD_POOL_HISTORY:每次上报都追加一条记录,用于趋势分析。这张表回答的问题是“线程池的状态随时间如何变化”。

    两个维度不加事务,@Retryable 兜底瞬时故障——和 JVM 信息落库完全一致的策略。监控数据天然容忍短暂丢失,不值得为一致性牺牲并发性能。


    六、数据流全景:从构造器到数据库

    把上面的所有环节串起来,线程池信息的完整数据流是这样的:

    new MonitoredThreadPoolExecutor(...)  // 构造即注册
            │
            ▼
    ThreadPoolManager 注册表
      ├── NEED_SHUTDOWN_THREAD_POOLS
      └── UN_NEED_SHUTDOWN_THREAD_POOLS
            │
            ▼  (定时触发)
    JavaThreadPoolTaskScheduler
      └── scheduleWithFixedDelay(JavaThreadPoolThread, 45s, rate)
            │
            ▼
    JavaThreadPoolThread.run()
      ├── ThreadPoolManager.getAllThreadPoolInfo()   // 快照采集
      ├── ClientPackageConstructor.structureJavaThreadPoolPackage()  // 封装
      └── DataExchanger.sendMessage()               // WebSocket 发送
            │
            ▼  (服务端接收)
    JavaThreadPoolServiceImpl.dealJavaThreadPoolPackage()
      ├── CompletableFuture → operateMonitorJavaThreadPool()    // 实时表
      └── CompletableFuture → operateMonitorJavaThreadPoolHistory()  // 历史表
    

    从线程池被 new 出来的那一刻起,它就进入了监控体系。不需要手动注册、不需要配置文件声明、不需要注解扫描——构造即纳管,存在即可观测


    七、小结

    线程池信息采集是 Phoenix 监控中“粒度最细”的定时任务之一。相比 JVM 信息的五大维度面面俱到,线程池监控聚焦在一个核心问题上:你的每一个线程池,此刻活得好不好?

    回顾整个设计,几个值得记住的要点:

    1. 构造即注册MonitoredExecutor 在构造方法中自动调用 ThreadPoolManager.register(),消除了“忘记注册”的风险。
    2. 拒绝计数:通过覆写 execute/submit 方法拦截 RejectedExecutionException,用 AtomicLong 精确记录被拒绝的任务数——这是 JDK 原生线程池不提供的关键指标。
    3. 双 Map 分治:将“需要托管关闭”和“不需要托管关闭”的线程池分开存储,采集时全量遍历,关闭时只处理前者,避免与 Spring 容器的生命周期管理冲突。
    4. 16 维快照:每次采集提取 16 个指标维度,从活跃线程数到队列剩余容量,给运维人员一张完整的“线程池体检报告”。
    5. 双表落库:实时表反映当前状态,历史表支撑趋势分析,CompletableFuture 并行写入,不加事务换取并发性能。

    下一篇,我们将换一个视角——从客户端内部的采集逻辑,转向客户端与 Spring Boot 的集成方式。Phoenix 是如何通过一个 @EnableMonitoring 注解,就让 Spring Boot 应用自动接入整套监控体系的?Spring Boot Starter 的自动配置魔法,@Import 与条件装配的组合拳,背后的原理值得细细拆解。敬请期待。


    项目地址
    https://gitee.com/pifeng/phoenix
    https://gitee.com/monitoring-platform/phoenix
    https://github.com/709343767/phoenix

    欢迎关注微信公众号获取更多技术干货
    微信公众号·披锋斩棘

    end
  1. 作者: 锋哥 (联系作者)
  2. 发表时间: 2026-04-06 09:06
  3. 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  4. 转载声明:如果是转载博主转载的文章,请附上原文链接
  5. 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  6. 评论

    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云