目录

    Phoenix监控平台技术解析(十):心跳机制——HeartbeatTaskScheduler 的设计与实现

    如果把监控系统比作一个人体,心跳就是那个你永远不会去主动关注、却一秒都不能停的东西。上一篇我们讲了配置加载的“双轨制”,配置加载完成后,Monitor.start() 的第六步就是调用 HeartbeatTaskScheduler.run()——心脏,开始跳动。本篇将从调度器、执行线程、数据包构造,一路追到服务端的存活判定与告警触发,完整拆解 Phoenix 的心跳机制。


    一、心跳:监控世界的“我还活着”

    在分布式系统中,一个节点是否存活,外界无法直接感知——你不能像看一台显示器那样“看到”一个 Java 进程是否还在运行。唯一的办法就是让它定期主动报到

    这就是心跳的本质:客户端每隔一段时间向服务端发送一个轻量级的数据包,告诉对方“我还在”。如果服务端在一定时间内没有收到心跳,就可以判定这个实例“失联”了——要么宕机了,要么网络断了,要么进程被 kill 了。

    Phoenix 的心跳机制有几个显著特征:

    • 无开关:心跳是唯一没有“enable”配置项的定时任务。服务器信息、JVM 信息、线程池信息都可以通过配置关闭采集,唯独心跳不行——因为它是实例存活的唯一证明。
    • 频率可调但有下界:默认 30 秒一次,用户可以配置更大的值,但不能低于 30 秒。配置加载阶段就会校验这个下界。
    • 延迟启动:首次心跳在应用启动 35 秒后才发出。这不是偷懒,而是刻意为之——给 WebSocket 连接的建立、其他组件的初始化留出充足的时间窗口。

    二、HeartbeatTaskScheduler:只有一个方法的调度器

    打开 HeartbeatTaskScheduler 的源码,你可能会惊讶于它的简洁:

    public class HeartbeatTaskScheduler {
    
        private HeartbeatTaskScheduler() {
        }
    
        public static void run() {
            // 心跳频率
            long rate = ConfigLoader.getMonitoringProperties().getHeartbeat().getRate();
            ThreadPoolAcquirer.getInstanceScheduledThreadPoolExecutor()
                .scheduleWithFixedDelay(new HeartbeatThread(), 35, rate, TimeUnit.SECONDS);
        }
    }
    

    整个类只有一个公开的静态方法 run(),加上一个私有构造方法防止实例化。但这短短几行代码里,藏着三个值得深究的设计决策。

    2.1 scheduleWithFixedDelay 而非 scheduleAtFixedRate

    这是一个经典的选择题。JUC 的 ScheduledThreadPoolExecutor 提供了两种周期调度方式:

    • scheduleAtFixedRate:从上一次开始时间算起,间隔固定时长后执行下一次。如果某次执行耗时超过间隔,下一次会“追赶”执行。
    • scheduleWithFixedDelay:从上一次结束时间算起,间隔固定时长后执行下一次。无论执行多慢,两次执行之间始终有固定的“休息时间”。

    Phoenix 选择了后者。为什么?因为心跳包的发送涉及网络 IO——构造数据包、序列化、加密、通过 WebSocket 发送。如果某次网络抖动导致发送耗时 10 秒,用 scheduleAtFixedRate 可能会导致任务堆积;而 scheduleWithFixedDelay 保证了“上一次完全结束后,再等 N 秒才开始下一次”,天然避免了任务积压。

    这就像约定“每次会议结束后休息 30 分钟再开下一场”,比“每 30 分钟开一场会”要合理得多——前者不会因为某场会议超时而让后面的会议连环追尾。

    2.2 ThreadPoolAcquirer:懒加载的线程池获取器

    心跳任务并不是跑在一个“全局公共线程池”里的,而是由 ThreadPoolAcquirer.getInstanceScheduledThreadPoolExecutor() 提供的专用调度线程池执行。这个线程池有几个特点:

    instanceScheduledThreadPoolExecutor = new MonitoredScheduledThreadPoolExecutor(
        // 线程数 = Ncpu /(1 - 阻塞系数),IO密集型阻塞系数相对较大
        (int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
        new BasicThreadFactory.Builder()
            .namingPattern("phoenix-instance-scheduled-pool-thread-%d")
            .daemon(true)
            .build(),
        new ThreadPoolExecutor.AbortPolicy(),
        "phoenix-instance-scheduled-pool", true);
    

    线程数量公式Ncpu / (1 - 0.8),即 CPU 核心数的 5 倍。这是 IO 密集型任务的经典线程数估算公式——阻塞系数 0.8 意味着线程大部分时间在等待 IO(网络请求),真正占用 CPU 的时间只有 20%。

    守护线程:设置为 daemon = true,意味着当 JVM 中所有非守护线程都结束时,这些心跳线程不会阻止 JVM 退出。这和关闭钩子(Shutdown Hook)配合使用——关闭钩子负责“善后”,守护线程负责“不拖后腿”。

    DCL 懒加载ThreadPoolAcquirer 使用双重检查锁定(Double-Checked Locking)模式创建线程池,确保在多线程环境下只初始化一次。用 volatile 修饰实例变量保证可见性——这是 Java 并发编程中的标准范式。

    自我监控:注意这里创建的不是普通的 ScheduledThreadPoolExecutor,而是 MonitoredScheduledThreadPoolExecutor——Phoenix 自己封装的可监控线程池。也就是说,用来发送心跳的线程池本身,也在被 Phoenix 监控着。监控系统监控自己——有点“套娃”的味道,但这恰恰体现了 Phoenix 的工程严谨性。

    2.3 延迟 35 秒的讲究

    五个定时任务的延迟启动时间分别是 35、40、45、45、50 秒。心跳排在最前面——它是最重要的,也是最轻量的。

    35 秒的延迟给了 WebSocket 连接建立足够的缓冲。回忆第八篇的内容:DataExchanger.run() 在第五步被调用,它在后台异步线程中执行 connectWithRetry()。WebSocket 连接的建立通常只需几秒,但在网络不佳时可能更久。35 秒的窗口足以覆盖绝大多数场景。

    如果 35 秒后 WebSocket 仍未连接成功呢?别担心——HeartbeatThread 在每次执行时都会检查 DataExchanger.isReady(),未就绪就静默跳过,等下一轮再试。


    三、HeartbeatThread:心跳包的构建与发送

    HeartbeatThread 实现了 Runnable 接口,是心跳调度器每次“唤醒”时实际执行的任务:

    @Slf4j
    public class HeartbeatThread implements Runnable {
    
        private final ClientPackageConstructor clientPackageConstructor 
            = ClientPackageConstructor.getInstance();
    
        @Override
        public void run() {
            // 数据交换器未就绪,静默跳过
            if (!DataExchanger.isReady()) {
                return;
            }
            // 计时器
            TimeInterval timer = DateUtil.timer();
            try {
                // 构建心跳数据包
                HeartbeatPackage heartbeatPackage 
                    = this.clientPackageConstructor.structureHeartbeatPackage();
                // 封装为 WebSocket 数据包
                WebSocketPackage requestPackage = new WebSocketPackage();
                requestPackage.setClassName(HeartbeatPackage.class.getName());
                requestPackage.setPayload(heartbeatPackage);
                // 发送
                DataExchanger.sendMessage(requestPackage);
            } catch (NetException e) {
                log.error("获取网络信息异常!", e);
            } catch (Exception e) {
                log.error("其它异常!", e);
            } finally {
                String betweenDay = timer.intervalPretty();
                int criticalValue = 5;
                if (timer.intervalSecond() > criticalValue) {
                    log.warn("构建+发送心跳包耗时:{}", betweenDay);
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("构建+发送心跳包耗时:{}", betweenDay);
                    }
                }
            }
        }
    }
    

    这段代码虽然不长,但每一处都经过精心设计。

    3.1 前置检查:isReady() 的宽容

    DataExchanger.isReady() 是一道柔性门禁。如果 WebSocket 连接尚未建立,心跳线程不会报错、不会抛异常、不会打一堆 ERROR 日志——它只是安静地 return,等待下一轮调度。

    这种“宽容式”设计避免了启动初期的日志污染。想象一下,如果每次检查不通过都打 ERROR 日志,前 35 秒内(WebSocket 还没连上)可能会刷出几十条无意义的异常信息,吓坏运维人员。

    3.2 耗时监控:5 秒阈值的预警

    finally 块里的计时逻辑非常务实:如果一次心跳包的“构建+发送”耗时超过 5 秒,打 WARN 日志;否则只在 DEBUG 级别记录。

    为什么是 5 秒?因为默认心跳频率是 30 秒。如果一次心跳耗时 5 秒,意味着六分之一的周期被消耗在发送上——这已经是一个值得关注的信号了。可能是网络拥塞、服务端响应慢、或者 GC 导致的停顿。WARN 级别确保运维人员能看到,但又不至于触发告警。

    3.3 WebSocketPackage:消息的信封

    心跳包不是直接丢进 WebSocket 通道的,而是先封装成 WebSocketPackage

    WebSocketPackage requestPackage = new WebSocketPackage();
    requestPackage.setClassName(HeartbeatPackage.class.getName());
    requestPackage.setPayload(heartbeatPackage);
    

    className 字段存储的是 HeartbeatPackage 类的全限定名(如 com.gitee.pifeng.monitoring.common.dto.HeartbeatPackage)。服务端的 WebSocketMessageDispatcher 收到消息后,会根据这个 className 查找注册表,将消息路由到对应的处理器。

    这种设计让所有类型的消息(心跳、JVM、服务器信息、告警……)都走同一个 WebSocket 通道,通过 className 做多路复用。


    四、HeartbeatPackage:心跳包里装了什么?

    心跳包的类继承链条是:

    AbstractSuperBean
      └── AbstractSuperPackage(实例端点、实例ID、实例名、IP、计算机名、链路信息)
            └── BaseRequestPackage(请求ID、请求时间、附加信息)
                  └── HeartbeatPackage(心跳频率、Arthas开关、VM指标开关、线程池指标开关)
    

    HeartbeatPackage 自身只有四个字段:

    public class HeartbeatPackage extends BaseRequestPackage {
        private long rate;                       // 心跳频率
        private boolean isEnableArthas = false;  // 是否启用Arthas
        private boolean isCollectVmMetrics = true;  // 是否收集JVM指标
        private boolean isCollectThreadPoolMetrics = false; // 是否收集线程池指标
    }
    

    但加上父类的字段,一个完整的心跳包实际上携带了十几项信息:实例端点、实例ID、实例名称、实例描述、程序语言、应用服务器类型、IP地址、计算机名、链路信息、请求ID、请求时间、心跳频率,以及三个开关标志。

    4.1 structureHeartbeatPackage:数据包的装配流水线

    ClientPackageConstructor.structureHeartbeatPackage() 负责组装完整的心跳包:

    public HeartbeatPackage structureHeartbeatPackage() throws NetException {
        HeartbeatPackage heartbeatPackage = new HeartbeatPackage();
        // 构造基础请求包数据(父类字段:端点、实例ID、IP、时间等)
        this.structureBaseRequestPackage(heartbeatPackage, null);
        // 心跳频率
        heartbeatPackage.setRate(ConfigLoader.getMonitoringProperties().getHeartbeat().getRate());
        // 是否启用Arthas
        heartbeatPackage.setEnableArthas(ConfigLoader.getMonitoringProperties().getArthas().getEnable());
        // 是否收集VM指标
        heartbeatPackage.setCollectVmMetrics(ConfigLoader.getMonitoringProperties().getJvmInfo().getEnable());
        // 是否收集线程池指标
        heartbeatPackage.setCollectThreadPoolMetrics(
            ConfigLoader.getMonitoringProperties().getJavaThreadPoolInfo().getEnable());
        return heartbeatPackage;
    }
    

    注意心跳包不仅仅是“我还活着”这么简单——它还同步了客户端的配置信息给服务端。服务端收到心跳包后,会更新该实例的连接频率、Arthas 状态、JVM 和线程池采集开关。

    这意味着,如果你修改了客户端的心跳频率配置并重启,服务端在收到下一个心跳包时就会自动更新这个实例的 connFrequency 字段——不需要额外的“配置同步”接口。心跳包就是天然的配置同步载体

    4.2 实例ID:这颗心属于谁?

    心跳包中最关键的标识字段是 instanceId,由 InstanceGenerator 生成。这个 ID 的生成算法值得一提:

    // 能获取到主板号
    instanceId = Md5Utils.encrypt(uuid + baseboardSerialNumber + order + instanceName);
    // 不能获取到主板号
    instanceId = Md5Utils.encrypt(uuid + mac + ip + order + instanceName);
    

    使用 UUID + 硬件信息 + 实例次序 + 实例名称的组合做 MD5,保证同一台机器上同一个应用的不同实例(通过 order 区分)有不同的 ID,而重启后 ID 保持不变(因为生成后会写入本地文件,下次启动优先从文件读取)。

    这个设计解决了一个现实问题:应用重启后,服务端能识别出“这还是那个实例”,而不是当作一个新实例来注册。


    五、服务端:收到心跳后做了什么?

    心跳包通过 WebSocket 抵达服务端后,经过解密、解压,被 WebSocketMessageDispatcher 根据 className 路由到心跳事件处理器,最终调用 HeartbeatServiceImpl.dealHeartbeatPackage()

    @Service
    public class HeartbeatServiceImpl implements IHeartbeatService {
    
        @Autowired
        private IInstanceService instanceService;
    
        @Override
        public Result dealHeartbeatPackage(HeartbeatPackage heartbeatPackage) {
            // 把应用实例添加或者更新到数据库
            this.instanceService.operateMonitorInstance(heartbeatPackage);
            return Result.builder().isSuccess(true).msg(ResultMsgConstants.SUCCESS).build();
        }
    }
    

    简洁到只有一行核心逻辑。真正的“脏活”交给了 InstanceServiceImpl.operateMonitorInstance()

    @Retryable
    @Transactional(rollbackFor = Throwable.class)
    public void operateMonitorInstance(HeartbeatPackage heartbeatPackage) {
        String instanceId = heartbeatPackage.getInstanceId();
        Date currentTime = new Date();
        // 查询数据库中有没有当前实例
        int selectCountDb = this.count(
            new LambdaQueryWrapper<MonitorInstance>()
                .eq(MonitorInstance::getInstanceId, instanceId));
    
        MonitorInstance entity = new MonitorInstance();
        entity.setInstanceId(instanceId);
        entity.setInstanceName(heartbeatPackage.getInstanceName());
        entity.setIp(heartbeatPackage.getIp());
        entity.setConnFrequency((int) heartbeatPackage.getRate());
        entity.setIsEnableArthas(heartbeatPackage.isEnableArthas());
        // ...填充其他字段...
        entity.setIsOfflineNotice(ZeroOrOneConstants.ZERO);
    
        if (selectCountDb == 0) {
            // 首次心跳 → 新增记录
            entity.setInsertTime(currentTime);
            entity.setOfflineCount(0);
            entity.setIsEnableMonitor(ZeroOrOneConstants.ONE);
            entity.setIsEnableAlarm(ZeroOrOneConstants.ONE);
            this.save(entity);
        } else {
            // 后续心跳 → 更新记录(关键:更新 updateTime)
            entity.setUpdateTime(currentTime);
            this.update(entity, new LambdaUpdateWrapper<MonitorInstance>()
                .eq(MonitorInstance::getInstanceId, instanceId));
        }
    }
    

    这里有几个细节值得注意:

    首次心跳 = 自动注册。客户端不需要提前在服务端“注册”自己——第一个心跳包到达时,服务端会自动在 MONITOR_INSTANCE 表中创建一条记录,并默认开启监控和告警。这就是 Phoenix 的“零配置接入”体验。

    updateTime 是存活的证据。每次心跳包到达,最关键的操作就是把 updateTime 更新为当前时间。后续的离线判定,就是拿这个时间和当前时间做比较。

    @Retryable 注解。数据库操作加了重试机制——如果遇到瞬时的数据库锁冲突或连接抖动,会自动重试,避免因一次偶发异常就丢失心跳。

    isOfflineNotice 重置。每次收到心跳,都会把 isOfflineNotice 重置为“0”(未通知离线)。这是为后续的在线/离线判定做准备——下一节会详细讲。


    六、离线判定:服务端的“审判官”

    心跳包解决了“上报存活状态”的问题,但谁来判定一个实例是否已经“死了”? 答案是 InstanceMonitorJob——一个基于 Quartz 的定时任务,定期扫描 MONITOR_INSTANCE 表中的所有实例。

    核心判定逻辑如下:

    // 允许的误差时间 = 心跳频率 × 容忍倍数
    int thresholdSecond = monitorInstance.getConnFrequency() 
        * this.monitoringConfigPropertiesLoader.getMonitoringProperties().getThreshold();
    // 最后一次心跳更新时间
    Date dateTime = monitorInstance.getUpdateTime() == null 
        ? monitorInstance.getInsertTime() : monitorInstance.getUpdateTime();
    // 判决时间 = 最后心跳时间 + 允许误差 + 30秒额外宽限
    DateTime judgeDateTime = new DateTime(dateTime)
        .plusSeconds(thresholdSecond).plusSeconds(30);
    // 判决
    if (judgeDateTime.isBeforeNow()) {
        // 离线!
        this.offLine(monitorInstance, false);
    } else {
        // 在线
        this.onLine(monitorInstance);
    }
    

    这个判定公式值得细品:

    判决时间 = 最后心跳时间 + 心跳频率 × 容忍倍数 + 30 秒

    假设心跳频率是 30 秒,容忍倍数是 3(可在服务端配置),那么判决时间就是最后心跳后的 30×3+30=120 秒。也就是说,一个实例需要连续 4 个心跳周期(约 2 分钟)没有消息,才会被判定为离线。

    为什么要这么“宽容”?因为现实中网络不是理想状态——偶尔的丢包、GC 停顿、网络抖动都可能导致一两个心跳包丢失。如果只要一次心跳超时就判定离线,告警系统会被“狼来了”淹没。额外的 30 秒宽限则是为了应对时钟不同步和调度延迟。

    6.1 离线告警

    当一个实例被判定为离线时,InstanceMonitorJob 会做两件事:

    1. 发送告警:构造一个 FATAL 级别的告警包,包含应用ID、名称、IP、环境、分组等详细信息,通过邮件/钉钉/企业微信/飞书推送给相关人员。
    2. 更新数据库:将 isOnline 设为“0”(离线),offlineCount 加 1。

    反过来,当一个之前离线的实例重新发来心跳,onLine() 方法会发送一条 INFO 级别的“应用程序上线”通知——让运维人员知道“它又回来了”。如果是一个全新的实例(isOnline 字段为空),还会发送“发现新应用程序”的通知。

    6.2 主动下线 vs 被动超时

    Phoenix 的离线判定有两条路径:

    路径 触发方式 场景
    被动超时 InstanceMonitorJob 定时扫描发现心跳超时 进程崩溃、网络断开、kill -9
    主动下线 客户端 Shutdown Hook 发送 OfflinePackage 正常停止、优雅关闭

    主动下线的好处是即时性——服务端不需要等待心跳超时周期,收到下线包后立刻将实例标记为离线。isOfflineNotice 字段正是用来区分这两种情况的:收到主动下线通知的实例,即使最后心跳时间还在判决时间范围内,也不会被 onLine() 方法“复活”。


    七、心跳的完整生命周期

    把以上所有环节串起来,一个心跳包从出生到“发挥作用”的完整旅程如下:

    客户端                                    服务端
      │                                        │
      │  T=35s  HeartbeatThread 首次触发         │
      │  ├── isReady()? ✅                      │
      │  ├── structureHeartbeatPackage()        │
      │  │   ├── 填充实例信息(ID/IP/名称...)     │
      │  │   ├── 填充心跳频率                    │
      │  │   └── 填充开关标志                    │
      │  ├── 封装为 WebSocketPackage             │
      │  └── DataExchanger.sendMessage()        │
      │         │                              │
      │         │  ──加密──压缩──WebSocket──▶    │
      │         │                              │
      │                        WebSocketMessageDispatcher
      │                        ├── className 路由
      │                        └── 发布 HeartbeatEvent
      │                              │
      │                        HeartbeatServiceImpl
      │                        └── operateMonitorInstance()
      │                              ├── 首次? → INSERT
      │                              └── 非首次? → UPDATE(updateTime)
      │                                        │
      │  T=65s  HeartbeatThread 再次触发         │
      │  └── 重复上述流程                        │
      │                                        │
      │                        InstanceMonitorJob(Quartz)
      │                        ├── 扫描所有实例
      │                        ├── 计算判决时间
      │                        ├── 超时? → 离线告警
      │                        └── 正常? → 保持/恢复在线
    

    八、设计亮点总结

    回顾整个心跳机制,有几个值得借鉴的设计思想:

    1. 心跳不止是“活着”的信号。 Phoenix 的心跳包同时携带了客户端的配置信息(频率、开关状态),让心跳成为一个天然的“配置同步通道”。这种“搭便车”的设计减少了额外的同步接口。

    2. 宽容的就绪检查。 DataExchanger.isReady() 返回 false 时,心跳线程不报错、不阻塞、不打扰——安静地等下一轮。这种设计避免了启动阶段的日志风暴,也让各组件之间的依赖关系变得更松散。

    3. 多层次的容错。 从客户端到服务端,每一层都有容错机制:客户端有 isReady() 检查和异常捕获;服务端有 @Retryable 重试和事务回滚;离线判定有容忍倍数和 30 秒宽限。层层兜底,不让单点故障导致整体失效。

    4. 主动下线 + 被动超时双保险。 正常关闭时通过 Shutdown Hook 主动通知,异常退出时靠心跳超时被动发现。两条路径覆盖了所有场景,确保没有实例能“悄无声息地消失”。

    5. scheduleWithFixedDelay 防止积压。 选择“固定延迟”而非“固定速率”,从根本上杜绝了网络抖动导致的任务堆积问题。对于涉及 IO 的周期性任务,这几乎是唯一正确的选择。


    九、小结

    心跳机制是 Phoenix 监控平台最基础也是最关键的组件。HeartbeatTaskScheduler 虽然只有一个方法,但它背后串联起了线程池管理、数据包构造、WebSocket 通信、服务端持久化、Quartz 定时扫描、告警触发等多个子系统。理解了心跳,你就理解了 Phoenix 客户端与服务端之间那根最细却最重要的“生命线”。

    下一篇,我们将从心跳上移到更“重”的采集任务——JVM 信息采集。内存、线程、GC、类加载,Phoenix 是如何在不侵入业务代码的前提下,把 JVM 的“体检报告”定期送到服务端的?敬请期待。


    项目地址
    https://gitee.com/pifeng/phoenix
    https://gitee.com/monitoring-platform/phoenix
    https://github.com/709343767/phoenix

    欢迎关注微信公众号获取更多技术干货 微信公众号·披锋斩棘

    end
  1. 作者: 锋哥 (联系作者)
  2. 发表时间: 2026-04-03 11:51
  3. 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  4. 转载声明:如果是转载博主转载的文章,请附上原文链接
  5. 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  6. 评论

    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云