news 2026/4/17 4:50:08

供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

1、框架设计逻辑

核心组件

1、服务注册与发现(Redis)

  • 使用Redis作为服务注册中心
  • 以服务名称ApplicationName为key存储节点集群
  • 基于时间戳的心跳机制(10秒间隔)

2、任务调度系统

  • 数据库作为任务持久化存储
  • 守护线程轮询获取新任务
  • 基于负载的调度算法(选择负载最小节点)

3、双守护线程模型

  • 节点监控守护线程:维护节点健康状态
  • 任务发现守护线程:分配计算任务

具体简单时序图如下图所示

2、核心代码实现

2.1、框架核心实现

2.1.1、监听Spring应用启动事件

  • 事件驱动:利用 Spring 应用启动事件,在合适时机启动监控功能
  • 条件控制:通过配置控制功能是否启用,提高灵活性
  • 功能整合:同时启动监控线程和执行类型注册,完成进程监控的初始化

1、启动守护线程

    • ProcessDaemonServiceImpl 实现了 ApplicationListener 接口,监听 Spring 应用启动事件;
    • 在应用启动完成后启动守护线程,监控节点存活状态和进程状态;

2、注册中心注册

    • 获取应用上下文:从事件中获取 ApplicationContext
    • 执行注册服务:获取 ProcessTypeRegisterService 并调用 doRegister()
@Slf4j @Component public class ProcessDaemonServiceImpl implements ApplicationListener<ApplicationStartedEvent> { @Autowired ProcessProperties processProperties; @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { if(processProperties.isEnable() && processProperties.getBusinessKeys().size()!=0){ startDaemonThread(); ApplicationContext applicationContext = applicationStartedEvent.getApplicationContext(); applicationContext.getBean(ProcessTypeRegisterService.class).doRegister(); } } // 启动守护线程 线程优先级设置为10(最高优先级) private void startDaemonThread() { Thread daemonThread2 = new Thread(processUtilServiceImpl.nodeAliveWatcher, "nodeAliveWatcher"); daemonThread2.setDaemon(true); daemonThread2.setPriority(10); // 启动线程 daemonThread2.start(); log.info("{},守护线程启动",daemonThread2.getName()); Thread daemonThread = new Thread(processUtilServiceImpl.processStatusWatcher, "processStatusWatcher"); // 设置为守护线程 daemonThread.setDaemon(true); daemonThread.setPriority(10); // 启动线程 daemonThread.start(); log.info("{},守护线程启动",daemonThread.getName()); } }

2.1.2、监控器

2.1.2.1、节点保活监控器

无限循环运行的守护线程,负责监控节点的状态信息,分布式锁:使用 ALL_NODE_PROCESS_LOCK_KEY 确保集群中只有一个节点执行监控

策略:

    • 定期更新:每 8 秒更新一次节点状态
    • 分布式协调:通过分布式锁确保集群节点状态的一致性
    • 负载信息维护:更新当前节点的负载信息

重启检测

    • 重启标识:初始化时设置 isRestarted 为 true
    • 状态同步:向集群其他节点通知当前节点重启状态
@Slf4j @Component public class ProcessUtilServiceImpl implements ProcessUtilService { /*** * 节点保活监视器 **/ public final Runnable nodeAliveWatcher = () -> { StatusDTO statusDTO = new StatusDTO(); statusDTO.setIsRestarted(true); statusDTO.setWeight(null); while (true) { try { ThreadSleepUtil.parkSeconds(8); String timeSlot = MyDateUtils.getTimeSlot(); ALL_NODE_PROCESS_LOCK_KEY = String.format(ALL_NODE_PROCESS_LOCK_KEY, applicationName,timeSlot); redissonDistributeLock.dealWithLock(ALL_NODE_PROCESS_LOCK_KEY, null, nodeProcessLoadServiceImpl.updateThisNodeInfoFunc, (param) -> { log.warn("节点保活监视器无法正常获取锁,无法更新节点状态"); return null; }, statusDTO); } catch (Exception e) { log.error("节点保活监视器异常"); log.error(e.getMessage(), e); } } }; }

节点状态

@Data public class StatusDTO { private Random random = new Random(); private Boolean isRestarted = true; Long weight ; }

节点状态更新机制

基于redis缓存去更新

  • 维护节点状态:更新当前节点的存活状态和负载信息
  • 权重管理:根据 StatusDTO 中的权重值调整节点负载
  • 节点清理:移除长时间未更新的节点信息
public Function<StatusDTO, Void> updateThisNodeInfoFunc = (statusDTO) -> { try { Long dealingWeight = statusDTO.getWeight(); RMap<String, NodeProcessStastic> nodeDatas = redissonClient.getMap(ALL_NODE_PROCESS_KEY); String localHostIp = IpAddressUtil.getHostIp(); NodeProcessStastic nodeProcessStastic = nodeDatas.getOrDefault(localHostIp, new NodeProcessStastic()); nodeProcessStastic.setTimestamp(DateUtil.now()); nodeProcessStastic.setIpAddress(localHostIp.replaceAll("\\.", "-")); nodeProcessStastic.setSupportBusinessKeys(processProperties.getBusinessKeys()); if (dealingWeight != null) { if ((dealingWeight < 0 && nodeProcessStastic.getDealingProcessWeightSum()>0)|| dealingWeight > 0) { log.info("该机器:{},增加权重得分:{}", localHostIp, dealingWeight); nodeProcessStastic.setDealingProcessWeightSum(nodeProcessStastic.getDealingProcessWeightSum() + dealingWeight); nodeProcessStastic.setLastWeightChangeTimestamp(DateUtil.now()); } }else{ if(statusDTO.getIsRestarted()){ nodeProcessStastic.setDealingProcessWeightSum(0); nodeProcessStastic.setLastWeightChangeTimestamp(null); } } nodeDatas.put(localHostIp, nodeProcessStastic); // 移除可能已经重启了的pod int oriSize = nodeDatas.size(); nodeDatas.values().removeIf(ele -> { long btwTime = DateUtil.between(DateUtil.parse(ele.g
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 15:50:40

cuda安装完成后测试样例:PyTorch-CUDA-v2.8自带验证脚本

PyTorch-CUDA-v2.8 镜像环境下的 GPU 验证实践 在深度学习项目启动的最初几分钟里&#xff0c;最令人沮丧的莫过于——明明装好了 CUDA 和 PyTorch&#xff0c;运行代码时却提示“CUDA not available”。这种看似配置完成、实则无法调用 GPU 的情况&#xff0c;在科研和工程实…

作者头像 李华
网站建设 2026/4/16 13:51:14

数字孪生是指什么?

数字孪生是指什么&#xff1f;数字孪生&#xff08;Digital Twin&#xff09;是指在虚拟空间中构建与物理实体或系统完全对应的动态数字镜像&#xff0c;通过实时数据采集、仿真分析和智能决策技术&#xff0c;打造虚实联动的监控、预测与优化闭环&#xff0c;其核心在于实时映…

作者头像 李华
网站建设 2026/4/18 0:34:43

diskinfo下载官网太慢?PyTorch-CUDA镜像已集成硬件监控工具

PyTorch-CUDA镜像已集成硬件监控工具&#xff1a;告别diskinfo下载慢的困扰 在深度学习项目开发中&#xff0c;最让人抓狂的往往不是模型调参&#xff0c;而是环境搭建阶段的各种“卡顿”——pip install torch 卡在 10%&#xff0c;CUDA 安装报错 libcudart.so 找不到&#xf…

作者头像 李华
网站建设 2026/4/15 19:48:19

华为云国际站代理商EDCM主要有什么作用呢?

华为云国际站代理商视角下&#xff0c;EDCM&#xff08;Edge Data Center Management&#xff0c;边缘数据中心管理&#xff09;是面向中小 / 边缘数据中心的云端统一监控运维系统&#xff0c;核心作用是集中远程管边缘、降本提效、合规留痕、赋能客户与伙伴增收&#xff0c;适…

作者头像 李华
网站建设 2026/4/18 2:33:42

PyTorch知识蒸馏实战:在CUDA-v2.8中训练小型化模型

PyTorch知识蒸馏实战&#xff1a;在CUDA-v2.8中训练小型化模型引言 技术背景 随着人工智能技术的快速发展&#xff0c;深度学习模型在计算机视觉、自然语言处理等领域的应用日益广泛。然而&#xff0c;大型神经网络虽然具备强大的表达能力&#xff0c;但也带来了高计算成本、高…

作者头像 李华
网站建设 2026/4/10 23:30:51

【思维模型】设计思维 ② ( 设计思维 有利于创新 | 创新形式 - 产品创新、技术创新、市场创新、资源配置创新、组织创新 | 同理心 | 观测法 | 采访法 | 体验法 )

文章目录一、设计思维 有利于创新1、传统问题、设计思维 解决方案2、创新形式 - 产品创新、技术创新、市场创新、资源配置创新、组织创新二、设计思维 步骤 - 同理心、定义、创想、原型制作、测试1、同理心① 观测法 - APOEM 工具② 采访法 - 5w1h 工具③ 体验法 - 共情工具一、…

作者头像 李华