news 2026/3/16 4:31:53

traceId 传递-MQ

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
traceId 传递-MQ

mq 发送消息

private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { if (StringUtils.isBlank(event.keys())) { throw new RuntimeException("keys是必填项"); } // 获取tag,默认使用类名 String tags = StringUtils.defaultString(event.tags(), event.getClass().getSimpleName()); // 构建消息 MessageBuilder<T> messageBuilder = MessageBuilder.withPayload(event) .setHeader(RocketMQHeaders.TAGS, tags) .setHeader(RocketMQHeaders.KEYS, event.keys()); String traceId = MDC.get(Constants.MDC_TRACE_ID); if (StringUtils.isNotBlank(traceId)) { messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } String env = RequestThread.getValue(Constants.ENV); if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } String desc = event.desc(); if (StringUtils.isNotBlank(desc)) { messageBuilder.setHeader(RocketMQConsts.Header.DESC, desc); } String producerApplicationName = environment.getProperty(Constants.SPRING_APPLICATION_NAME, DEFAULT_PRODUCER); messageBuilder.setHeader(RocketMQConsts.Header.PRODUCER, producerApplicationName); return messageBuilder; }

mq消费

public <T extends BaseEvent> void process(final String key, final Message<T> message, final Consumer<T> function) { String cacheKey = RedisKeyUtil.generate(REDIS_REPEAT_PREFIX_KEY, key); KeyInfo keyInfo = KeyInfo.builder() .prefix(REDIS_REPEAT_PREFIX_KEY) .keys(new String[]{key}) .waitTime(3) .timeUnit(TimeUnit.SECONDS) .build(); // 获取消息体 T event = message.getPayload(); try { // 获取链路追踪id String traceId = getHeaderValue(message, RocketMQConsts.Header.TRACE_ID); MDC.put(Constants.MDC_TRACE_ID, StringUtils.isBlank(traceId) ? key : traceId); // 设置调度tag环境 String env = getHeaderValue(message, RocketMQConsts.Header.ENV); RequestThread.addParam(Constants.ENV, env); //对相同key的消息加锁,保证相同的消息是串行消费 lockService.lock(keyInfo, () -> { //判断该消息是否已处理 if (redisCache.exists(cacheKey)) { log.info("[MQ消息-已处理]--[{}]:{}", key, ReflectUtils.processObject(event)); return null; } log.info("[MQ消息-开始处理]--[{}]:{}", key, ReflectUtils.processObject(event)); function.accept(event); log.info("[MQ消息-处理完成]--{}", key); //缓存处理结果一小时,一个小时内不重复消费 redisCache.setCacheObject(cacheKey, DateUtil.now(), 60, TimeUnit.MINUTES); return null; }); } catch (LockException e) { //加锁失败,说明有线程正在消费 log.warn("[MQ消息-正在处理]--[{}]:{}", key, ReflectUtils.processObject(event)); throw e; } catch (Exception e) { log.error("[MQ消息-处理异常]--[{}]:{}", key, event, e); throw e; } finally { MDC.clear(); RequestThread.clear(); } }

问题

为什么mq消费可以从mac中获取traceId 传递给消息请求头

这个代码展示了TraceId 如何从 HTTP 请求上下文传播到 MQ 消息的完整链路。让我详细解释为什么这里可以从 MDC 获取到 traceId:

🔄 完整的传播链路

🎯 为什么可以从 MDC 获取 traceId?

关键:在同一个线程内!

java

// 时间线分析(假设单线程同步处理): 1. HTTP请求进入 → HeaderFilter.doFilterInternal() ↓ 2. HeaderFilter设置MDC: String traceId = request.getHeader("traceId"); MDC.put("traceId", traceId); // 存入当前线程的ThreadLocal ↓ 3. 调用Controller → Service → MQ发送方法 ↓ 4. MQ发送方法执行: String traceId = MDC.get("traceId"); // 从同一个线程的ThreadLocal获取! ↓ 5. HeaderFilter清理: finally { MDC.clear(); } // 清理,但已经用过了

线程状态可视化

java

public class ThreadStateDemo { public void processOrder() { // 时间点1:HeaderFilter执行前 // 线程:http-nio-8080-exec-1 // MDC状态:{} // HeaderFilter执行 MDC.put("traceId", "TRACE-123"); // MDC状态:{"traceId": "TRACE-123"} // 时间点2:Controller执行 // 仍然是同一个线程:http-nio-8080-exec-1 String traceId1 = MDC.get("traceId"); // "TRACE-123" // 时间点3:Service执行,调用MQ发送 // 仍然是同一个线程:http-nio-8080-exec-1 sendMQEvent(); // sendMQEvent内部: String traceId2 = MDC.get("traceId"); // "TRACE-123" ✅ // 时间点4:HeaderFilter finally清理 MDC.clear(); // MDC状态:{} } private void sendMQEvent() { // 关键:这里和调用者在同一个线程! System.out.println("当前线程: " + Thread.currentThread().getName()); // 输出:http-nio-8080-exec-1 String traceId = MDC.get("traceId"); System.out.println("获取traceId: " + traceId); // "TRACE-123" } }

📊 完整的上下文传播表

阶段组件上下文存储位置如何获取是否同一线程
网关接收TraceGlobalFilter请求Headerrequest.getHeader()N/A
网关传递TraceGlobalFilter请求Headerheader.set()网关线程
业务服务接收HeaderFilter请求Headerrequest.getHeader()业务线程
业务服务存储HeaderFilterMDC (ThreadLocal)MDC.put()业务线程
Controller使用Controller方法MDCMDC.get()同一业务线程
Service使用Service方法MDCMDC.get()同一业务线程
MQ发送MQ发送器MDCMDC.get()同一业务线程
MQ消息头RocketMQ消息Message HeadersetHeader()同一业务线程

🔧 代码执行流程详解

1. 请求进入(设置MDC)

java

@Order(Ordered.HIGHEST_PRECEDENCE) public class HeaderFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, ...) { try { // 从HTTP header获取traceId String traceId = request.getHeader(Constants.MDC_TRACE_ID); // 设置到当前线程的MDC MDC.put(Constants.MDC_TRACE_ID, StringUtils.defaultString(traceId, String.valueOf(System.currentTimeMillis()))); // 继续处理请求 chain.doFilter(request, response); // → 进入Controller } finally { MDC.clear(); // 请求处理完后清理 } } }

2. 业务处理(使用MDC)

java

@Service public class OrderService { @Autowired private EventPublisher eventPublisher; public void createOrder(OrderRequest request) { // 这里可以直接使用MDC中的traceId String currentTraceId = MDC.get(Constants.MDC_TRACE_ID); log.info("创建订单,traceId: {}", currentTraceId); // 执行业务逻辑... Order order = saveOrder(request); // 发送MQ事件 OrderCreatedEvent event = new OrderCreatedEvent(order); eventPublisher.publish(event); // → 进入MQ发送代码 } }

3. MQ发送(从MDC获取)

java

public class EventPublisher { private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { // 关键:这里和Service方法在同一个线程 // 所以可以获取到MDC中的traceId String traceId = MDC.get(Constants.MDC_TRACE_ID); // ✅ 能获取到! if (StringUtils.isNotBlank(traceId)) { // 设置到MQ消息头 messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } // 同样获取其他上下文 String env = RequestThread.getValue(Constants.ENV); // ThreadLocal if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } return messageBuilder; } }

📝 总结:为什么能从MDC获取traceId
核心原因:MQ发送代码与HTTP请求处理在同一个Tomcat线程内执行。

线程连续性:HTTP请求线程 → Controller → Service → MQ发送,都在同一个线程

MDC的线程绑定:MDC基于ThreadLocal,线程内共享

生命周期匹配:在HeaderFilter清理MDC之前,MQ发送已经完成

设计意图:这正是MDC的设计目的——在请求处理链路中共享上下文

关键时间线:


所以你的MQ发送代码能正确获取traceId,这正是分布式追踪系统设计精妙之处:通过线程上下文在同步处理链路上自动传递追踪信息。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/15 11:33:20

traceId 传递-线程的变化

在整个链路中&#xff0c;网关、业务服务、调用其他服务&#xff0c;异步调用、或者发送mq是一个线程吗&#x1f4ca; 线程切换详解表场景线程是否变化线程示例MDC/TraceId传递网关→业务服务✅ 变化http-nio-8080-exec-1 → http-nio-8081-exec-3通过HTTP Header自动传递业务服…

作者头像 李华
网站建设 2026/3/15 15:44:42

Linux_1217_2

umaskchattr命令 功能说明&#xff1a;改变文件属性 a&#xff1a;系统只允许在这个文件之后追加数据&#xff0c;不允许任何进程覆盖或截断这个文件 i&#xff1a;不得任意改动文件或目录。任务4-3 使用文件访问控制列表 如果希望对某个指定的用户进行单独的权限控制&#xf…

作者头像 李华
网站建设 2026/3/15 15:43:02

22、Linux系统管理:RPM包管理与内核模块操作

Linux系统管理:RPM包管理与内核模块操作 1. RPM包管理概述 RPM(Red Hat Package Manager)最初常用于Linux系统,也可在其他Unix平台编译使用。它允许用户将源代码打包成源文件和二进制文件,方便程序的跟踪和重建。同时,RPM会创建并维护一个包和文件的数据库,用于验证包…

作者头像 李华
网站建设 2026/3/15 19:44:33

ProfiNet转DeviceNet工业智能网关让老旧传感器焕发新生

一、 项目背景 华东某智能物流装备公司新建一条“重载 AGV 装配检测线”&#xff0c;要求 AGV 在 30 m 行程内实现 1 mm 重复定位&#xff0c;并在运行过程中实时检测前方障碍物&#xff0c;实现“缓行→刹停→声光报警”三级安全策略。主控器为西门子 S7-1200 PLC&#xff08;…

作者头像 李华
网站建设 2026/3/15 19:44:35

Rod性能优化终极指南:5个技巧让你的爬虫速度提升3倍

想要让你的Web自动化脚本运行效率翻倍吗&#xff1f;Rod作为一款基于DevTools Protocol的高性能驱动工具&#xff0c;通过合理的优化策略能够显著提升爬虫任务的执行速度。本文将为你揭示Rod性能优化的核心秘密&#xff0c;让你的自动化任务飞起来。 【免费下载链接】rod A Dev…

作者头像 李华
网站建设 2026/3/15 19:44:33

AlphaPose实战宝典:从零掌握多人姿态估计核心技术

想要快速上手多人姿态估计技术&#xff1f;AlphaPose作为当前最先进的实时多人姿态估计与追踪系统&#xff0c;为你提供了一站式解决方案。无论是体育训练分析、安防监控升级&#xff0c;还是虚拟现实应用&#xff0c;AlphaPose都能帮你轻松应对复杂场景下的多人姿态识别挑战。…

作者头像 李华