news 2026/2/10 5:26:58

AQS、Condition

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AQS、Condition

目录

  • 一、AQS抽象类
    • 1.自定义AQS
    • 2.如何实现多个线程按序执行
    • 3.独占锁
      • 3.1 acquire()方法(ReentrantLock源码为例)
      • 3.2 release()方法(ReentrantLock源码为例)
    • 4.共享锁
      • 4.1 acquireShared()方法(Semaphore源码为例)
      • 4.2 releaseShared()方法(Semaphore源码为例)
  • 二、Condition

一、AQS抽象类

CLH锁获取锁失败时仍然会自旋,性能差。

AQS维护双向队列记录等待的线程,head指针持有锁的线程、tail指针指向队列尾部用于插入新的线程、state字段用来记录锁的使用情况,具体如何使用由实现类自定义ReentrantLock: state表示锁的重入次数、Semaphore: state表示可用许可数量、CountDownLatch: state表示剩余计数)。

线程首先CAS修改tail指针加入队列,然后分别CAS修改自己的前向指针和前结点的后向指针。与CAS不同的是,将获取锁失败的自旋操作替换为主动调用LockSupport.park()放弃CPU进入WATTING状态,head结点在释放锁后根据后向指针调用LockSupport.unpark()唤醒后继节点,后继结点获取锁后将自己的前向指针置空,便于垃圾回收器回收,然后开始获取任务,后向指针由后继结点负责连接。

上述结点和指针都要加volatile确保可见性。

1.自定义AQS

AQS是一个抽象类,定义了资源获取和释放的通用框架(双向队列+state+head、tail+等待唤醒机制),而具体的资源获取逻辑需要重写模板方法来实现。

需要定义的资源获取逻辑(重写的模版方法):

//独占方式。尝试获取资源,成功则返回true,失败则返回false。protectedbooleantryAcquire(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。protectedbooleantryRelease(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。protectedinttryAcquireShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。protectedbooleantryReleaseShared(int)//该线程是否正在独占资源。只有用到condition才需要去实现它。protectedbooleanisHeldExclusively()

自定义AQS感觉可以实现多个线程有序执行的业务,每个线程持有不同的id号,所有线程CAS自旋volatile state,仅当state为时获取锁,每个线程执行完修改state为下一个要执行的线程的state,然后释放锁。不过上述也没有用AQS,现在还想不出来怎么用AQS实现,主要是对这个抽象类的代码太陌生了。

2.如何实现多个线程按序执行

前两天看的面试题,今天突然就有思路了。

  1. CAS:实现多个线程有序执行的业务,每个线程持有不同的id号,所有线程CAS自旋volatile state,仅当state为时获取锁,每个线程执行完修改state为下一个要执行的线程的state,然后释放锁。
  2. AQS:因为CAS自旋太占CPU,将所有线程乱序入队head→T1→T3→T2,首先对首T1获取锁执行完,唤醒T3,但是T3CAS发现state跟自己序号不一样,所以实例化一个新的结点,将自己加入队尾,释放锁,唤醒T2,T2执行完唤醒T3。

3.独占锁

ReentrantLock就是AQS的独占锁实现类,定义state字段含义为锁的重入次数仅当state==0时表示当前没有线程持有锁state>0表示锁被占用,同时基于state实现了可重入机制,state--至0时释放锁

3.1 acquire()方法(ReentrantLock源码为例)

AQS本身不提供公平模式和非公平模式的实现,而是由实现类在模版方法中自行编写需要的模式,ReentrantLock实现了两种模式,下面介绍非公平模式:

  • 非公平模式:当一个线程尝试获取锁时,它会直接尝试获取,而不管等待队列中是否有其他线程在等待。如果获取失败,它才会加入等待队列

  • 公平模式:当一个线程尝试获取锁(或信号量)时,它会先检查等待队列中是否有其他线程在等待,如果有,那么它就会直接进入等待队列,而不是尝试获取。

该方法用于获取资源,会调用自定义的模版方法:

  • tryAcquire():首先获取state判断state==0,那么表示锁未被使用,那么尝试CAS修改state=state+1获取锁(仅非公平模式下执行,公平模式下直接进去等待队列)。
  • addWaiter():获取锁失败后,将当前线程封装为结点,CAS更新尾指针tail加入双向队列中。如果有前驱结点,还需要CAS修改前驱结点的next指针指向当前结点,指定前驱结点释放锁后要唤醒哪个结点
  • acquireQueued():当前线程加入队列之后,如果发现head指向当前节点,说明当前线程是队列中第一个等待的节点,于是调用tryAcquire()尝试获取锁。如果尝试获取锁失败或不是第一个节点,当前结点调用LockSupport.park()进入等待状态,等待被唤醒
publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}protectedbooleantryAcquire(intacquires){finalThreadcurrent=Thread.currentThread();// 1、获取 AQS 中的 state 状态intc=getState();// 2、如果 state 为 0,证明锁没有被其他线程占用if(c==0){// 2.1、通过 CAS 对 state 进行更新if(compareAndSetState(0,acquires)){// 2.2、如果 CAS 更新成功,就将锁的持有者设置为当前线程setExclusiveOwnerThread(current);returntrue;}}// 3、如果当前线程和锁的持有线程相同,说明发生了「锁的重入」elseif(current==getExclusiveOwnerThread()){intnextc=c+acquires;if(nextc<0)// overflowthrownewError("Maximum lock count exceeded");// 3.1、将锁的重入次数加 1setState(nextc);returntrue;}// 4、如果锁被其他线程占用,就返回 false,表示获取锁失败returnfalse;}privateNodeaddWaiter(Nodemode){// 1、将当前线程封装为 Node 节点。Nodenode=newNode(Thread.currentThread(),mode);Nodepred=tail;// 2、如果 pred != null,则证明 tail 节点已经被初始化,直接将 Node 节点加入队列即可。if(pred!=null){node.prev=pred;// 2.1、通过 CAS 控制并发安全。if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}// 3、初始化队列,并将新创建的 Node 节点加入队列。enq(node);returnnode;}finalbooleanacquireQueued(finalNodenode,intarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){// 1、尝试获取锁。finalNodep=node.predecessor();if(p==head&&tryAcquire(arg)){setHead(node);p.next=null;// help GCfailed=false;returninterrupted;}// 2、判断线程是否可以阻塞,如果可以,则阻塞当前线程。if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{// 3、如果获取锁失败,就会取消获取锁,将节点状态更新为 CANCELLED。if(failed)cancelAcquire(node);}}

3.2 release()方法(ReentrantLock源码为例)

该方法用于释放资源,会调用自定义的模版方法:

  • tryRelease()CAS更新state=0释放锁,修改持有锁的线程为null。
  • unparkSuccessor():调用LockSupport.unpark()唤醒当前结点的后向指针指向的结点。
publicfinalbooleanrelease(intarg){// 1、尝试释放锁if(tryRelease(arg)){Nodeh=head;// 2、唤醒后继节点if(h!=null&&h.waitStatus!=0)unparkSuccessor(h);returntrue;}returnfalse;}protectedfinalbooleantryRelease(intreleases){intc=getState()-releases;// 1、判断持有锁的线程是否为当前线程if(Thread.currentThread()!=getExclusiveOwnerThread())thrownewIllegalMonitorStateException();booleanfree=false;// 2、如果 state 为 0,则表明当前线程已经没有重入次数。因此将 free 更新为 true,表明该线程会释放锁。if(c==0){free=true;// 3、更新持有资源的线程为 nullsetExclusiveOwnerThread(null);}// 4、更新 state 值setState(c);returnfree;}// 这里的入参 node 为队列的头节点(虚拟头节点)privatevoidunparkSuccessor(Nodenode){intws=node.waitStatus;// 1、将头节点的状态进行清除,为后续的唤醒做准备。if(ws<0)compareAndSetWaitStatus(node,ws,0);Nodes=node.next;// 2、如果后继节点异常,则需要从 tail 向前遍历,找到正常状态的节点进行唤醒。if(s==null||s.waitStatus>0){s=null;for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}if(s!=null)// 3、唤醒后继节点LockSupport.unpark(s.thread);}

4.共享锁

Semaphore就是AQS的共享锁实现类,可实现多线程同时持有锁,定义state字段含义为锁的剩余容量仅当state==0时表示当前锁已没有余量state>0表示锁被占用,但仍有余量可申请。

4.1 acquireShared()方法(Semaphore源码为例)

该方法用于获取资源,会调用自定义的模版方法,默认使用自定义的非公平模式

  • tryAcquireShared():首先获取state判断state>0,那么表示锁还有剩余,那么尝试CAS修改state=state-1获取锁(仅非公平模式下执行,公平模式下直接进去等待队列)。
  • doAcquireShared():获取锁失败后,将当前线程封装为结点,CAS更新尾指针tail加入双向队列中。如果有前驱结点,还需要CAS修改前驱结点的next指针指向当前结点,指定前驱结点释放锁后要唤醒哪个结点。如果head指向当前节点,说明当前线程是队列中第一个等待的节点,于是调用tryAcquire()尝试获取锁,如果锁仍有余量那么同时唤醒后继结点如果尝试获取锁失败或不是第一个节点,当前结点调用LockSupport.park()进入等待状态,等待资源有余量被唤醒
publicfinalvoidacquireShared(intarg){if(tryAcquireShared(arg)<0)doAcquireShared(arg);}finalinttryAcquireShared(intacquires){for(;;){// 1、获取可用资源数量。intavailable=getState();// 2、计算剩余资源数量。intremaining=available-acquires;// 3、如果剩余资源数量 < 0,则说明资源不足,直接返回;如果 CAS 更新 state 成功,则说明当前线程获取到了共享资源,直接返回。if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}privatevoiddoAcquireShared(intarg){// 1、将当前线程加入到队列中等待。finalNodenode=addWaiter(Node.SHARED);booleanfailed=true;try{booleaninterrupted=false;for(;;){finalNodep=node.predecessor();if(p==head){// 2、如果当前线程是等待队列的第一个节点,则尝试获取资源。intr=tryAcquireShared(arg);if(r>=0){// 3、将当前线程节点移出等待队列,并唤醒后续线程节点。setHeadAndPropagate(node,r);p.next=null;// help GCif(interrupted)selfInterrupt();failed=false;return;}}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{// 3、如果获取资源失败,就会取消获取资源,将节点状态更新为 CANCELLED。if(failed)cancelAcquire(node);}}

4.2 releaseShared()方法(Semaphore源码为例)

该方法用于释放资源,会调用自定义的模版方法:

  • tryReleaseShared()CAS更新state++释放锁
  • doReleaseShared():调用LockSupport.unpark()唤醒当前结点的后向指针指向的结点。
publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){doReleaseShared();returntrue;}returnfalse;}protectedfinalbooleantryReleaseShared(intreleases){for(;;){intcurrent=getState();intnext=current+releases;if(next<current)// overflowthrownewError("Maximum permit count exceeded");if(compareAndSetState(current,next))returntrue;}}

二、Condition

Object类的wait()/notify()依赖于synchronized锁实现了线程间的等待和通知机制,必须在synchronized代码块内调用,且调用notify()会唤醒全部wait()在Object对象上的线程。

Condition依赖ReentrantLock锁,底层实现的是park/unpark机制,它的优点是对于每个ReentrantLock对象可以设置多个Condition,线程可以根据任务要求Condition.await()在ReentrantLock对象不同的Condition上,唤醒线程时调用对应Condition.signal()实现部分唤醒。

  • await():进入Condition等待队列等待被唤醒线程会唤醒后先获取ReentrantLock然后执行后的代码
  • signal()将Condition等待队列中的线程全部唤醒
publicclassReentrantLockDemo{staticReentrantLocklock=newReentrantLock();staticintstate=0;publicstaticvoidmain(String[]args){// 一个锁对象可以创建多个ConditionConditionnotEmpty=lock.newCondition();ConditionnotFull=lock.newCondition();newThread(()->{while(true){lock.lock();try{// await会自动释放锁,线程被唤醒并执行后续代码前会先获取锁if(state==0)notEmpty.await();state--;System.out.println("消费一次");notFull.signal();}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}).start();newThread(()->{while(true){lock.lock();try{// await会自动释放锁,线程被唤醒并执行后续代码前会先获取锁if(state==1)notFull.await();state++;System.out.println("生产一次");notEmpty.signal();}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}).start();}}
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/7 0:19:56

基因组+单细胞--弥漫性大B细胞淋巴瘤的生物学变异轴向

作者&#xff0c;Evil Genius分享文章之前&#xff0c;大家对基因组 单细胞的联合分析思路有了解了么&#xff1f;单细胞分析的CNV可以和WES的CNV分析相互对应么&#xff1f;比较维度WES-CNV (基于Bulk WES)scCNV (基于scRNA-seq&#xff0c;如inferCNV分析)互补与验证关系检测…

作者头像 李华
网站建设 2026/2/1 11:08:14

火山云豆包:重新定义AI交互,让智能触手可及

火山云豆包&#xff1a;重新定义AI交互&#xff0c;让智能触手可及在人工智能技术日新月异的今天&#xff0c;火山云豆包模型以其卓越的性能和人性化的交互体验&#xff0c;正逐渐成为用户心中智能助手的新标杆。这款由火山引擎自主研发的AI模型&#xff0c;凭借强大的自然语言…

作者头像 李华
网站建设 2026/2/5 3:23:43

小白程序员也能掌握的AI黑科技:本体驱动的零噪声GraphRAG,让知识图谱自我进化,告别“裸奔“数据垃圾场!

今天分享一个不错的开源项目trustgraph&#xff1a;AI 上下文图谱工厂&#xff0c;构建、管理并部署专为 AI 优化的上下文图谱 但是重点要分享的是其中关于本体驱动的零噪声GraphRAG部分 为什么要用本体Ontology尼&#xff0c;原因在于&#xff1a;构建一张会自我完善的知识图…

作者头像 李华
网站建设 2026/2/8 12:01:44

AI负载迅猛增加,隐性DevOps危机正在暴露

固守传统的DevOps团队将越来越难以满足AI时代下的数据需求。成功的团队必须提前布局全面可预测架构&#xff0c;帮助工程师们清晰洞察技术决策与业务成果之间的关联。曾经的运维很简单&#xff1a;选取技术栈中的特定组件&#xff0c;运行单元测试&#xff0c;隔离检查微服务&a…

作者头像 李华