news 2026/7/5 13:26:35

SynchronousQueue 源码

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SynchronousQueue 源码

构造方法

publicSynchronousQueue(){this(false);// 默认非公平 TransferStack}publicSynchronousQueue(booleanfair){transferer=fair?newTransferQueue<E>():newTransferStack<E>();}

TransferStack

核心变量

volatileSNodehead;// 栈顶staticfinalUnsafeUNSAFE;staticlongheadOffset;staticfinalintREQUEST=0;// 消费者结点(take/poll)staticfinalintDATA=1;// 生产者结点(put/offer)staticfinalintFULFILLING=2;// 匹配中标记位

SNode

staticfinalclassSNode{volatileSNodehead;// 栈头,所有阻塞线程压入栈顶volatileSNodenext;// 栈下一节点volatileSNodematch;// 匹配成功的对方节点volatileThreadwaiter;// 阻塞等待的线程Objectitem;// 数据:生产者存元素,消费者为nullintmode;// 节点类型:0未定义,1生产者(DATA),2消费者(REQUEST)// Unsafe内存偏移,自旋CAS必备privatestaticfinalsun.misc.UnsafeUNSAFE;privatestaticfinallongmatchOffset;privatestaticfinallongnextOffset;privatestaticfinallongitemOffset;static{try{UNSAFE=sun.misc.Unsafe.getUnsafe();Class<?>k=SNode.class;matchOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("match"));nextOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("next"));itemOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("item"));}catch(Exceptione){thrownewError(e);}}}

casHead

// CAS修改栈头booleancasHead(SNodeh,SNodenh){returnh==head&&UNSAFE.compareAndSwapObject(this,headOffset,h,nh);}

casMatch

booleancasMatch(SNodecmp,SNodeval){returnUNSAFE.compareAndSwapObject(this,matchOffset,cmp,val);}

casNext

// CAS 操作辅助方法booleancasNext(SNodecmp,SNodeval){returncmp==next&&UNSAFE.compareAndSwapObject(this,nextOffset,cmp,val);}

tryMatch

booleantryMatch(SNodes){if(match==null&&casMatch(null,s)){Threadw=waiter;if(w!=null){waiter=null;LockSupport.unpark(w);// 唤醒对方线程}returntrue;}returnfalse;}

tryCancel

// 取消节点,标记match为自身,表示取消等待booleantryCancel(){returncasMatch(null,this);}

isCanceled

booleanisCancelled(){returnmatch==this;}

transfer

  • 生产者调用:transfer(item, true, nanos) 存入数据,阻塞等待消费者取走
  • 消费者调用:transfer(null, true, nanos) 获取数据,阻塞等待生产者存入
  • 第二个参数 timed:是否限时等待;nanos 超时时间
Etransfer(Ee,booleantimed,longnanos){SNodes=null;intmode=(e==null)?REQUEST:DATA;for(;;){SNodeh=head;// 分支1:栈空 || 栈顶和当前线程同类型(全生产者/全消费者),入栈阻塞等待if(h==null||h.mode==mode){// 超时且没时间等待,直接返回nullif(timed&&nanos<=0){if(h!=null&&h.isCancelled())casHead(h,h.next);elsereturnnull;}// 能等待:创建节点压入栈顶elseif(casHead(h,s=snode(s,e,h,mode))){// 阻塞等待匹配SNodem=awaitFulfill(s,timed,nanos);// m==自身:等待被中断/超时取消,清理栈返回nullif(m==s){clean(s);returnnull;}// 辅助弹出已完成匹配的节点,减轻后续CAS竞争if((h=head)!=null&&h.next==s)casHead(h,s.next);// 返回数据:消费者拿生产者item,生产者返回自身数据return(E)((mode==REQUEST)?m.item:s.item);}}// 分支2:栈顶是互补节点,且不在交接中,主动发起匹配elseif(!isFulfilling(h.mode)){// 栈顶节点已取消,弹出重试if(h.isCancelled())casHead(h,h.next);// 压入一个带FULFILLING标记的交接节点elseif(casHead(h,s=snode(s,e,h,FULFILLING|mode))){for(;;){SNodem=s.next;// 等待节点消失,本次匹配失败,重置循环if(m==null){casHead(s,null);s=null;break;}SNodemn=m.next;// CAS绑定匹配关系,唤醒对方线程if(m.tryMatch(s)){// 一次性弹出交接节点+等待节点casHead(s,mn);return(E)((mode==REQUEST)?m.item:s.item);}else// 匹配竞争失败,辅助断开失效节点s.casNext(m,mn);}}}// 分支3:栈顶正在交接,当前线程协助完成匹配(加速清理栈)else{SNodem=h.next;if(m==null)casHead(h,null);else{SNodemn=m.next;if(m.tryMatch(h))casHead(h,mn);elseh.casNext(m,mn);}}}}

awaitFulfill

SNodeawaitFulfill(SNodes,booleantimed,longnanos){finallongdeadline=timed?System.nanoTime()+nanos:0L;Threadw=Thread.currentThread();// 自旋次数优化:栈顶就是自己时多自旋,减少park切换intspins=(head.next==s)?1:0;for(;;){SNodem=s.match;// 已经匹配成功,返回对方节点if(m!=null)returnm;// 线程中断 / 超时,标记取消if(w.isInterrupted()||(timed&&nanos<=0)){s.tryCancel();returns;}// 自旋超过阈值,设置waiter准备parkif(spins>32)s.waiter=w;elseif((head!=s||s.match!=null)&&++spins==0)spins=32;// 休眠让出CPUif(!timed)LockSupport.park(this);elseLockSupport.parkNanos(this,nanos);if(timed)nanos=deadline-System.nanoTime();}}

snode

// 构建/复用SNode,缓存失效节点减少创建开销staticSNodesnode(SNodes,Objecte,SNodenext,intmode){if(s==null)s=newSNode(e);s.mode=mode;s.next=next;s.item=e;s.match=null;s.waiter=null;returns;}

入队方法

put

publicvoidput(Ee)throwsInterruptedException{if(e==null)thrownewNullPointerException();// transfer 参数:e!=null 代表是存放数据的生产者;不限时阻塞// e == null:表示当前是消费者if(transferer.transfer(e,false,0)==null){Thread.interrupted();thrownewInterruptedException();}}

add

publicbooleanadd(Ee){if(offer(e))returntrue;elsethrownewIllegalStateException("Queue full");}

offer(不超时)

publicbooleanoffer(Ee){if(e==null)thrownewNullPointerException();// timed=true,nanos=0:自旋一次,不park,匹配不到直接返回returntransferer.transfer(e,true,0)!=null;}

offer(超时)

publicbooleanoffer(Ee,longtimeout,TimeUnitunit)throwsInterruptedException{if(e==null)thrownewNullPointerException();longnanos=unit.toNanos(timeout);Ex=transferer.transfer(e,true,nanos);if(x!=null)returntrue;if(Thread.interrupted())thrownewInterruptedException();returnfalse;}

出队方法

take

publicEtake()throwsInterruptedException{Ee=transferer.transfer(null,false,0);if(e!=null)returne;Thread.interrupted();thrownewInterruptedException();}

poll

publicEpoll(){returntransferer.transfer(null,true,0);}

poll(超时)

publicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{Ee=transferer.transfer(null,true,unit.toNanos(timeout));if(e!=null||!Thread.interrupted())returne;thrownewInterruptedException();}

总结

transfer 三大分支

  • 分支 1:if (h == null || h.mode == mode) 入栈阻塞
    • 触发场景:栈空,或者栈顶和当前线程是同一角色(一堆生产者 / 一堆消费者),没法直接匹配。
    • 执行步骤:
      1. 如果开启超时且剩余时间为 0,清理栈内取消节点直接返回 null;
      2. 调用snode()复用节点,CAS 替换 head 压栈;
      3. awaitFulfill()自旋 park 阻塞,等待其他互补线程来匹配自己;
      4. 先自旋一小段时间优化,减少线程切换开销;
      5. 自旋超过阈值,标记 waiter 并调用 LockSupport.park 休眠;
      6. 阻塞唤醒后判断:
        • 返回 m != 自身:匹配成功,清理栈辅助弹出节点,返回数据;
        • 返回 m == 自身:中断 / 超时取消,调用clean()清理无效节点,返回 null。
  • 分支 2:else if (!isFulfilling (h.mode)) 主动发起匹配
    • 触发场景:栈顶是互补角色,且栈顶节点没有正在交接。
    • 执行步骤:
      1. 栈顶节点已取消,弹出 head 重试循环;
      2. 创建带 FULFILLING|mode 的交接节点压入栈顶;
      3. 内层无限循环拿到栈下一个等待节点 m;
      4. m.tryMatch() CAS 绑定匹配关系,唤醒对方阻塞线程;
      5. CAS 一次性弹出交接节点 s 和等待节点 m,返回交付数据。
  • 分支 3:else 协助交接(性能优化分支)
    • 触发场景:栈顶节点已经带 FULFILLING 标记,正在和下方节点匹配。
    • 当前线程不做新的入栈、不发起新匹配,只做辅助工作:
      1. 帮忙执行tryMatch完成栈顶未做完的绑定;
      2. CAS 断开失效、已取消的节点;
      3. 弹出匹配完成的双节点,减少后续线程的 CAS 竞争,大幅提升高并发吞吐。

流程图解

先 put 再 take

初始状态

head = null,栈空

执行 put

  1. transfer(“Apple”, false, 0)
    mode = DATA,h=null 进入分支 1

  2. CAS 创建 S_p (DATA, item=“Apple”),压栈,head 指向 S_p

head ↓ [S_p | DATA | item=Apple | waiter=Tp | match=null] next=null
  1. 执行 awaitFulfill(S_p)
    自旋几次无匹配对象,设置 waiter=Tp,调用 LockSupport.park()
  2. Tp 阻塞挂起,停在 awaitFulfill,等待 match 被赋值

执行 take

  1. transfer(null, false, 0),mode=REQUEST
    读取栈顶 h=S_p,h.mode=DATA ≠ REQUEST,且 !isFulfilling(h.mode) → 分支 2
  2. 创建交接节点 S_f:mode = FULFILLING | REQUEST
  3. CAS 将 S_f 压入栈顶,head 更新为 S_f
head ↓ [S_f | FULFILLING|REQUEST | next=S_p] ↓ [S_p | DATA | item=Apple | waiter=Tp | match=null]
  1. 内层循环:m = S_f.next = S_p(待匹配生产者节点)
  2. 执行 S_p.tryMatch(S_f) 核心绑定:
    • CAS 把 S_p.match = S_f
    • 拿到 S_p.waiter = Tp,执行 LockSupport.unpark(Tp) 唤醒生产者
    • 返回 true,匹配绑定完成
  3. CAS 一次性弹出栈顶两个节点:casHead(S_f, S_p.next),S_p.next 是 null,执行后 head = null,栈清空
  4. 当前线程是 REQUEST 消费者,返回 m.item = S_p.item = “Apple”
  5. take () 方法拿到 “Apple”,消费者 Tc 执行完毕退出

put 线程被唤醒

Tp 从 park 处唤醒,回到 awaitFulfill 循环

  1. 读取 S_p.match = S_f,match 不为空,直接 return S_f(匹配节点)
  2. 回到 transfer 分支 1 收尾逻辑:辅助清理栈
  3. if ((h = head) != null && h.next == s) casHead(h, s.next);
  4. mode 是 DATA 生产者,返回 s.item = “Apple”
  5. put 方法 transfer 返回非 null,无中断,put 执行完成,生产者 Tp 退出

先 take 再 put

  1. Tc 先 take,栈空走分支 1,压入 S_c (REQUEST),Tc park 阻塞
head → [S_c | REQUEST | waiter=Tc]
  1. Tp 执行 put (“Banana”),栈顶 mode 互补,走分支 2
    创建 FULFILLING|DATA 交接节点 S_f 压栈
head → [S_f | FULFILLING|DATA] → [S_c | REQUEST]
  1. S_c.tryMatch(S_f),绑定 match,unpark (Tc)
  2. 弹出双节点,栈清空;生产者返回自己的 Banana,消费者唤醒后拿到数据
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/5 13:23:52

音视频原理

文章目录像素分辨率位深帧率一、基础概念二、常见帧率标准与用途三、帧率核心特点四、补充常识码率Stride&#xff08;Pitch 行跨度/行字节数&#xff09;像素 定义&#xff1a;像素是图像的基本组成单位&#xff0c;英文pixel由picture&#xff08;图片&#xff09;与element…

作者头像 李华
网站建设 2026/7/5 13:21:18

《唤醒你的AI同事:WorkBuddy从零上手》037:附录B 快捷键一览

本文是《唤醒你的 AI 同事——WorkBuddy 从零上手》系列 第 37 篇。 回顾总结:通过第 036 篇附录 A,我们整理了 WorkBuddy 最实用的指令模板——从报告撰写、合同审查到数据分析、代码生成等 10+ 个场景。你现在已经拥有了即拿即用的"武器库"。但光有模板还不够,手…

作者头像 李华
网站建设 2026/7/5 13:19:06

如何用m4s-converter将B站缓存视频永久保存为MP4格式?

如何用m4s-converter将B站缓存视频永久保存为MP4格式&#xff1f; 【免费下载链接】m4s-converter 一个跨平台小工具&#xff0c;将bilibili缓存的m4s格式音视频文件合并成mp4 项目地址: https://gitcode.com/gh_mirrors/m4/m4s-converter 你是否曾遇到过B站收藏的视频突…

作者头像 李华
网站建设 2026/7/5 13:18:36

影刀RPA深度教程:定时任务与企业通知实战

影刀RPA深度教程&#xff1a;定时任务与企业通知实战 流程写好了&#xff0c;总不能每天手动点"运行"吧&#xff1f; 这篇讲定时任务、企业微信通知、监控告警&#xff0c;让你的自动化真正"无人值守"。 先装好环境 www.yingdao.com 下载&#xff0c;社…

作者头像 李华
网站建设 2026/7/5 13:15:37

云计算虚拟化技术全解析:从理论到实践

云计算虚拟化技术深度解析——从理论到实践的全面指南 前言 在当今数字化转型的浪潮中&#xff0c;虚拟化技术已经成为IT基础设施的核心基石。作为一名云计算学习者&#xff0c;我通过系统学习《Linux系统管理-云计算虚拟化技术》课程&#xff0c;对虚拟化技术有了全面的认识。…

作者头像 李华
网站建设 2026/7/5 13:14:46

北京通州少儿美术机构专业排行榜出炉!优秀学画画之地究竟有哪些?

在北京通州&#xff0c;少儿学画画的需求日益增长&#xff0c;各类美术培训机构如雨后春笋般涌现。那么&#xff0c;哪些是优秀的学画画之地呢&#xff1f;下面为大家详细分析。北京通州学画画的行业现状行业报告显示&#xff0c;近年来北京通州少儿美术培训市场规模逐步扩大&a…

作者头像 李华