news 2026/2/6 2:32:06

dubbo源码之一次RPC请求的生死之旅(基于Dubbo 2.7.8)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
dubbo源码之一次RPC请求的生死之旅(基于Dubbo 2.7.8)

1. 全景图:从宏观到微观

在钻入代码之前,我们需要先在脑海中建立一张全景图。一次同步的 RPC 调用,大致可以分为三个阶段:

  1. 消费端(Consumer):动态代理 -> 负载均衡 -> 封装请求 -> 编码发送 -> 同步阻塞等待。

  2. 服务端(Provider):解码接收 -> 线程池派发 -> 过滤器链 -> 反射调用 -> 封装响应 -> 编码发送。

  3. 消费端(Consumer):接收响应 -> 唤醒等待线程 -> 提取结果。

本文将略过配置加载和服务发现细节,通过核心链路代码将上述过程串联起来。

2. 第一阶段:消费端发起请求(Consumer)

当我们代码中执行 demoService.sayHello("world") 时,实际上是在调用 Dubbo 生成的代理对象。

2.1 动态代理入口

Dubbo 默认使用 Javassist 生成代理。所有的方法调用都会被转发到 InvokerInvocationHandler。

源码位置:org.apache.dubbo.rpc.proxy.InvokerInvocationHandler

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // ... 省略 Object 类方法的处理 // 将参数封装为 RpcInvocation RpcInvocation invocation = new RpcInvocation(method, serviceModel, args); invocation.setTargetServiceUniqueName(invoker.getUrl().getServiceKey()); // invoker 是一层层包装的,这里开始进入链式调用 return invoker.invoke(invocation).recreate(); }

2.2 集群容错与负载均衡 (Cluster)

这里的 invoker 对象通常是 MockClusterInvoker 包装下的 FailoverClusterInvoker(默认故障转移策略)。

源码位置:org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) { // 1. 获取重试次数,默认 retries="2" (共调3次) int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; for (int i = 0; i < len; i++) { // 2. 负载均衡选择一个 Invoker (例如 DubboInvoker) Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); try { // 3. 执行调用 Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { // 发生异常,循环继续,即“重试” } } }

2.3 过滤器链 (Filter Chain)

在选定具体的 DubboInvoker 之前,请求会经过一系列 Filter(如 ConsumerContextFilter、MonitorFilter)。这是通过 ProtocolFilterWrapper 构建的责任链模式。

2.4 协议层发送 (Protocol)

请求最终到达 DubboInvoker,这里是 Dubbo 协议的核心。

源码位置:org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker

protected Result doInvoke(final Invocation invocation) throws Throwable { // 获取 ExchangeClient (封装了 Netty Client) ExchangeClient currentClient = clients[index.getAndIncrement() % clients.length]; // 区分单向调用、异步调用、同步调用 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); if (isOneway) { // 单向调用,只发不回 currentClient.send(inv, getUrl().getMethodParameter(methodName, SENT_KEY, false)); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else if (isAsync) { // 异步调用 ResponseFuture future = currentClient.request(inv, timeout); // ... 返回 Future } else { // 【核心重点】同步调用 (默认) RpcContext.getContext().setFuture(null); // 发送请求,获得 DefaultFuture return (Result) currentClient.request(inv, timeout).get(); } }

2.5 交换层与同步等待 (Exchange)

currentClient.request 会调用 HeaderExchangeChannel.request。这是实现“同步转异步”的关键。

源码位置:org.apache.dubbo.remoting.exchange.support.DefaultFuture

// 发送请求 public ResponseFuture request(Object request, int timeout) throws RemotingException { // 1. 创建请求对象,自动生成全局唯一 Request ID Request req = new Request(); req.setData(request); // 2. 创建 DefaultFuture,映射关系:Request ID -> Future DefaultFuture future = newDefaultFuture(channel, req, timeout); // 3. 通过 Netty 发送数据 channel.send(req); return future; }

紧接着,DubboInvoker 调用了 future.get(),线程在此阻塞

// DefaultFuture.java public Object get(int timeout) throws RemotingException { // 使用 Condition.await 进行阻塞,等待服务端响应唤醒 if (!done) { long start = System.currentTimeMillis(); lock.lock(); try { while (!done) { done.await(timeout, TimeUnit.MILLISECONDS); // 超时检查逻辑... } } finally { lock.unlock(); } } return returnFromResponse(); }

3. 第二阶段:服务端处理请求(Provider)

网络报文经过 TCP 传输到达服务端,Netty 接收到字节流。

3.1 线程派发 (Thread Model)

Netty 的 IO 线程(Worker Group)负责解码,解码后的消息会经过 AllChannelHandler(默认策略),将请求派发到 Dubbo 的业务线程池中去执行,避免阻塞 IO 线程。

源码位置:org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler

public void received(Channel channel, Object message) throws RemotingException { // 获取业务线程池 ExecutorService executor = getExecutorService(); try { // 将请求包装成 ChannelEventRunnable 丢给线程池执行 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // 线程池满的拒绝策略(报错) } }

3.2 交换层处理 (Exchange)

业务线程拿到请求后,层层传递,到达 HeaderExchangeHandler.received。它区分这是请求(Request)还是响应(Response)。

源码位置:org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler

public void received(Channel channel, Object message) { if (message instanceof Request) { handleRequest(channel, (Request) message); } else if (message instanceof Response) { handleResponse(channel, (Response) message); } } void handleRequest(Channel channel, Request req) { Response res = new Response(req.getId()); // 继续调用后续 Handler (DubboProtocol) Object result = handler.reply(channel, req.getData()); res.setResult(result); // 发送响应回客户端 channel.send(res); }

3.3 协议层与反射调用 (Protocol)

handler.reply 最终会调用到 DubboProtocol.requestHandler。在这里,根据 ServiceKey 找到服务端暴露的 Exporter。

源码位置:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

// 匿名内部类 requestHandler public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) { Invocation inv = (Invocation) message; // 查找 Exporter Invoker<?> invoker = getInvoker(channel, inv); // 执行调用链 (Filter -> 实现类) return invoker.invoke(inv); }

最终,JavassistProxyFactory 生成的 Wrapper 类会直接通过方法名调用你写的 ServiceImpl 代码。

4. 第三阶段:响应返回与唤醒(Consumer)

服务端 channel.send(res) 将结果发回给消费端。

4.1 响应接收

消费端的 Netty IO 线程收到响应报文,同样经过解码,最终到达 HeaderExchangeHandler.received。这次走的是 handleResponse 分支。

static void handleResponse(Channel channel, Response response) { // 核心:调用 DefaultFuture.received DefaultFuture.received(channel, response); }

4.2 唤醒线程

Dubbo 怎么知道这个响应对应哪个请求?Request ID

源码位置:org.apache.dubbo.remoting.exchange.support.DefaultFuture

public static void received(Channel channel, Response response) { // 1. 根据 Response 中的 ID 从 Map 中移除并获取对应的 Future DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { // 2. 触发唤醒逻辑 future.doReceived(response); } } private void doReceived(Response res) { lock.lock(); try { response = res; done = condition.signal(); // 唤醒之前阻塞在 get() 方法的线程 } finally { lock.unlock(); } }

4.3 结果返回

被唤醒的消费端线程从 get() 方法中苏醒,拿到 response.getResult(),经过动态代理层层返回,最终你的 demoService.sayHello 拿到了返回值。

5. 架构师总结

回顾整个流程,Dubbo 2.7.8 的核心设计精髓在于:

  1. 分层架构:每一层(Proxy, Cluster, Protocol, Exchange, Transport)职责清晰,互不干扰。

  2. 异步转同步:利用 DefaultFuture 和 Request ID 机制,在 Netty 异步通讯的基础上实现了对上层业务的同步阻塞假象,降低了开发复杂度。

  3. 线程模型:IO 线程与业务线程分离(AllDispatcher),保证了高并发下 Netty IO 的吞吐量,防止业务逻辑阻塞网络读写。

面试与调优建议:

  • 超时问题:往往发生在 DefaultFuture.get() 等不到 signal,或者服务端线程池满导致无法及时处理 Request。

  • 线程池满:重点关注 Provider 端的 AllChannelHandler 派发逻辑。

  • 序列化:发生在 Netty 的 Codec 阶段,是 CPU 密集型操作。

欢迎关注、一起交流、一起进步~

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/29 20:11:33

区块链相关知识

一、区块链的简介 区块链可视为一种特殊的分布式数据库。 首先,区块链的主要作用是存储信息,任何需要保存的信息,都可以写入区块链,也可以从中读取信息,所以视它为数据库。 其次,任何人都可以架设服务器,加入区块链网络,成为一个节点。区块链的世界中没有中心节点,…

作者头像 李华
网站建设 2026/2/3 10:25:34

【完整源码+数据集+部署教程】食品分类2检测系统源码分享[一条龙教学YOLOV8标注好的数据集一键训练_70+全套改进创新点发刊_Web前端展示]

一、背景意义 随着全球人口的不断增长和生活水平的提高&#xff0c;食品安全与营养健康问题日益受到关注。食品种类繁多&#xff0c;消费者在选择食品时不仅关注其营养成分&#xff0c;还对食品的来源、品质和安全性提出了更高的要求。在此背景下&#xff0c;食品分类与检测技术…

作者头像 李华
网站建设 2026/1/30 5:18:41

【英飞凌 CY8CKIT-062S2-AI评测】-开发环境搭建与开发

过21IC网&#xff0c;申请到了英飞凌 CY8CKIT-062S2-AI开发板&#xff0c;该开发板是英飞凌的PSOC6系列的人工智能评估套件&#xff0c;它有一套创新工具用来原型制作和收集真实数据&#xff0c;以快速构建机器学习模型。硬件尺寸很小巧35mm*45mm&#xff0c;基于它可以建构边缘…

作者头像 李华
网站建设 2026/1/30 0:27:47

基于SpringBoot2+Vue2的企业合作与活动管理平台

企业合作与活动管理平台 演示视频 https://www.bilibili.com/video/BV1E4qpB9E8b/ 角色 管理员、普通用户、企业用户 技术 后端&#xff1a;Spring Boot 2、MySQL 前端&#xff1a;Vue.js 核心功能 本系统是一个企业合作与活动管理平台&#xff0c;旨在连接企业和普通用…

作者头像 李华
网站建设 2026/1/30 16:21:28

稀土抑烟剂在PVC材料中的防火与抑烟作用

PVC&#xff08;聚氯乙烯&#xff09;因耐用、易加工、成本低&#xff0c;被广泛应用于建筑管材、电线护套、地板和卷材等领域。但在火灾条件下&#xff0c;PVC燃烧容易产生大量烟雾和刺激性气体&#xff0c;不仅影响逃生&#xff0c;也增加了火灾危害。一、什么是稀土抑烟剂&a…

作者头像 李华
网站建设 2026/2/4 4:09:26

别让AI抢了你的饭碗:学会让它替你打工,才是未来的生存法则

朋友们&#xff0c;你有没有想过&#xff0c;未来的世界可能会被简单地分成两种人&#xff1f;一种是让AI替自己干活的人&#xff0c;另一种是活被AI抢走的人。这句话听起来有点残酷&#xff0c;但趋势已经摆在我们眼前。从写报告、做设计&#xff0c;到分析数据、客服应答&…

作者头像 李华