news 2026/3/29 12:08:08

SemaphoreCountDownlatchCyclicBarrier源码分析

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SemaphoreCountDownlatchCyclicBarrier源码分析

一、CountDownLatch:闭锁机制

1.1 基本原理与核心逻辑

CountDownLatch 让一个或多个线程等待其他线程执行完成后再执行。在创建 CountDownLatch 对象时,必须指定线程数 count,每当一个线程执行完成调用 countDown()方法,线程数 count 减 1,当 count 减到 0 时,await()方法就不再阻塞。

核心逻辑​:

  • 基于 AQS+CAS 实现
  • 初始时,count 为指定的线程数
  • 每个线程执行完成调用 countDown(),count 减 1
  • 当 count 减到 0 时,所有等待的线程被唤醒

1.2 源码分析

// 构造函数publicCountDownLatch(intcount){if(count<0)thrownewIllegalArgumentException("count < 0");this.sync=newSync(count);}// Sync内部类privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{Sync(intcount){setState(count);}intgetCount(){returngetState();}protectedinttryAcquireShared(intacquires){return(getState()==0)?1:-1;}protectedbooleantryReleaseShared(intreleases){for(;;){intc=getState();if(c==0)returnfalse;intnextc=c-1;if(compareAndSetState(c,nextc))returnnextc==0;}}}

关键点​:

  • CountDownLatch 的 state 属性表示剩余需要等待的线程数
  • tryAcquireShared:当 state 为 0 时,返回 1,表示等待完成
  • tryReleaseShared:每次 countDown(),state 减 1,当 state 为 0 时,返回 true,唤醒等待线程

1.3 使用场景与示例

电商场景​:商品价格计算完成后汇总

publicstaticvoidmain(String[]args)throwsInterruptedException{finalint[]products=getProductsByCategoryId();List<ProductPrice>list=Arrays.stream(products).mapToObj(ProductPrice::new).collect(Collectors.toList());finalCountDownLatchlatch=newCountDownLatch(products.length);list.forEach(pp->newThread(()->{System.out.println(pp.getProdID()+"->开始计算商品价格.");try{TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));if(pp.prodID%2==0){pp.setPrice(pp.prodID*0.9D);}else{pp.setPrice(pp.prodID*0.71D);}System.out.println(pp.getProdID()+"->价格计算完成.");}catch(InterruptedExceptione){e.printStackTrace();}finally{latch.countDown();}}).start());latch.await();System.out.println("所有价格计算完成.");list.forEach(System.out::println);}

1.4 重要结论

  1. CountDownLatch 是一次性的​:计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值
  2. 计数器为 0 时唤醒​:当 count 减到 0 时,所有等待的线程被唤醒
  3. 适用场景​:多任务完成后汇总、并行任务同步、资源初始化

二、Semaphore:信号量控制

2.1 基本原理与核心逻辑

Semaphore 基于 AQS+CAS 实现的,可用于在一个时刻允许多个线程对共享资源进行并行操作的场景。它维护了一个计数器,线程可以通过调用 acquire()方法获取 Semaphore 中的许可证,当计数器为 0 时,调用 acquire()的线程将被阻塞,直到有其他线程释放许可证;线程可以通过调用 release()方法释放 Semaphore 中的许可证。

核心逻辑​:

  • 维护一个计数器,表示可用的许可证数量
  • acquire():获取许可证,计数器减 1
  • release():释放许可证,计数器加 1

2.2 源码分析

// 构造函数publicSemaphore(intpermits){sync=newNonfairSync(permits);}publicSemaphore(intpermits,booleanfair){sync=fair?newFairSync(permits):newNonfairSync(permits);}// 非公平锁实现finalintnonfairTryAcquireShared(intacquires){for(;;){intavailable=getState();intremaining=available-acquires;if(remaining<0||compareAndSetState(available,remaining))returnremaining;}}// 释放许可证protectedfinalbooleantryReleaseShared(intreleases){for(;;){intcurrent=getState();intnext=current+releases;if(next<current)thrownewError("Maximum permit count exceeded");if(compareAndSetState(current,next))returntrue;}}

关键点​:

  • Semaphore 的 state 表示可用的许可证数量
  • 非公平锁:直接尝试获取许可证,不考虑等待队列
  • 公平锁:先检查等待队列是否有线程,如果有则优先唤醒

2.3 使用场景与示例

限流场景​:商品服务接口限流

publicclassSemaphoreDemo{privatestaticSemaphoresemaphore=newSemaphore(2);privatestaticExecutorexecutor=Executors.newFixedThreadPool(10);publicstaticvoidmain(String[]args){for(inti=0;i<10;i++){executor.execute(()->getProductInfo2());}}publicstaticStringgetProductInfo2(){if(!semaphore.tryAcquire()){log.error("请求被流控了");return"请求被流控了";}try{log.info("请求服务");Thread.sleep(2000);return"返回商品详情信息";}finally{semaphore.release();}}}

限制同时在线用户数量

publicclassSemaphoreDemo7{publicstaticvoidmain(String[]args){finalintMAX_PERMIT_LOGIN_ACCOUNT=10;finalLoginServiceloginService=newLoginService(MAX_PERMIT_LOGIN_ACCOUNT);IntStream.range(0,20).forEach(i->newThread(()->{booleanlogin=loginService.login();if(!login){System.out.println(Thread.currentThread()+" is refused due to exceed max online account.");return;}try{simulateWork();}finally{loginService.logout();}},"User-"+i).start());}privatestaticvoidsimulateWork(){try{TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));}catch(InterruptedExceptione){// ignore}}privatestaticclassLoginService{privatefinalSemaphoresemaphore;publicLoginService(intmaxPermitLoginAccount){this.semaphore=newSemaphore(maxPermitLoginAccount,true);}publicbooleanlogin(){returnsemaphore.tryAcquire();}publicvoidlogout(){semaphore.release();}}}

2.4 重要结论

  1. Semaphore 的许可证数量​:可用于控制在同一时间允许多少个线程对共享资源进行访问
  2. 非公平锁是默认​:Semaphore 默认使用非公平锁
  3. 适用场景​:限流、资源池、限制同时在线用户数量

三、CyclicBarrier:回环栅栏

3.1 基本原理与核心逻辑

CyclicBarrier(回环栅栏或循环屏障),是 Java 并发库中的一个同步工具,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。

核心逻辑​:

  • parties:屏障拦截的线程数量
  • 当指定数量的线程全部调用 await()方法时,这些线程不再阻塞
  • 可循环使用:当所有线程都被释放后,CyclicBarrier 可以被重用

3.2 源码分析

// 构造函数publicCyclicBarrier(intparties){this(parties,null);}publicCyclicBarrier(intparties,RunnablebarrierAction){if(parties<=0)thrownewIllegalArgumentException();this.parties=parties;this.count=parties;this.barrierCommand=barrierAction;}// await()方法publicintawait()throwsInterruptedException,BrokenBarrierException{try{returndowait(false,0L);}catch(TimeoutExceptiontoe){thrownewError(toe);}}privateintdowait(booleantimed,longnanos)throwsInterruptedException,BrokenBarrierException,TimeoutException{finalReentrantLocklock=this.lock;lock.lock();try{finalGenerationg=generation;if(g.broken)thrownewBrokenBarrierException();if(Thread.interrupted()){breakBarrier();thrownewInterruptedException();}intindex=--count;if(index==0){// 最后一个线程booleanranAction=false;try{finalRunnablecommand=barrierCommand;if(command!=null)command.run();ranAction=true;nextGeneration();return0;}finally{if(!ranAction)breakBarrier();}}// 等待其他线程到达屏障点for(;;){try{if(!timed)trip.await();elseif(nanos>0L)nanos=trip.awaitNanos(nanos);}catch(InterruptedExceptionie){if(g==generation&&!g.broken){breakBarrier();throwie;}else{Thread.currentThread().interrupt();}}if(g.broken)thrownewBrokenBarrierException();if(g!=generation)returnindex;if(timed&&nanos<=0L){breakBarrier();thrownewTimeoutException();}}}finally{lock.unlock();}}// 重置CyclicBarrierpublicvoidreset(){finalReentrantLocklock=this.lock;lock.lock();try{breakBarrier();nextGeneration();}finally{lock.unlock();}}

关键点​:

  • CyclicBarrier 内部结构:Generation,持有布尔类型的属性 broken
  • await():计数器递减,当 count 为 0 时,执行 barrierCommand,再唤醒等待线程
  • reset():重置 CyclicBarrier 的计数器和 generation 属性

3.3 使用场景与示例

模拟跟团旅游

publicclassCyclicBarrierDemo3{publicstaticvoidmain(String[]args)throwsBrokenBarrierException,InterruptedException{finalCyclicBarrierbarrier=newCyclicBarrier(11);for(inti=0;i<10;i++){newThread(newTourist(i,barrier)).start();}barrier.await();System.out.println("导游:所有的游客都上了车.");barrier.await();System.out.println("导游:所有的游客都下车了.");}privatestaticclassTouristimplementsRunnable{privatefinalinttouristID;privatefinalCyclicBarrierbarrier;privateTourist(inttouristID,CyclicBarrierbarrier){this.touristID=touristID;this.barrier=barrier;}@Overridepublicvoidrun(){System.out.printf("游客:%d乘坐旅游大巴\\n",touristID);spendSeveralSeconds();waitAndPrint("游客:%d上车,等别人上车.\\n");System.out.printf("游客:%d到达目的地\\n",touristID);spendSeveralSeconds();waitAndPrint("游客:%d下车,等别人下车.\\n");}privatevoidwaitAndPrint(Stringmessage){System.out.printf(message,touristID);try{barrier.await();}catch(InterruptedException|BrokenBarrierExceptione){// ignore}}privatevoidspendSeveralSeconds(){try{TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));}catch(InterruptedExceptione){// ignore}}}}

3.4 重要结论

  1. CyclicBarrier 是可循环使用的​:当所有等待线程都被释放后,CyclicBarrier 可以被重用
  2. 与 CountDownLatch 的区别​:
    • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
    • CountDownLatch 的 await 方法会等待计数器被 count down 到 0,而 CyclicBarrier 的 await 方法会等待其他线程到达 barrier point
  3. 适用场景​:多线程任务并行执行后同时触发后续操作、多线程数据处理

四、重要结论总结

工具类适用场景核心特点一次性使用可重用性
CountDownLatch多任务完成后汇总等待 N 个任务完成
Semaphore限制并发数量控制资源访问数量
CyclicBarrier多线程协同工作等待 N 个线程到达屏障点

关键总结​:

  1. CountDownLatch​:适合等待多个任务完成的场景,一次性使用
  2. Semaphore​:适合控制并发访问数量的场景,如限流、资源池
  3. CyclicBarrier​:适合多线程协同工作的场景,可循环使用,能保证所有线程同时执行

使用建议​:

  • 当需要等待多个任务完成后再继续执行时,使用 CountDownLatch
  • 当需要控制并发访问数量时,使用 Semaphore
  • 当需要多个线程协同工作,同时到达某个点后再继续执行时,使用 CyclicBarrier
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/25 13:18:18

5分钟部署Whisper语音识别:零基础搭建多语言转录服务

5分钟部署Whisper语音识别&#xff1a;零基础搭建多语言转录服务 引言&#xff1a;语音识别原来这么简单 你有没有遇到过这样的场景&#xff1f;会议录音需要整理成文字&#xff0c;外语视频需要翻译字幕&#xff0c;或者想给音频内容添加文字说明。传统方法要么手动打字费时…

作者头像 李华
网站建设 2026/3/15 11:08:37

VMD-SE-BiLSTM+Transformer多变量时序预测,MATLAB代码

一、研究背景 该模型针对复杂非线性时间序列预测问题&#xff0c;特别是具有多尺度、非平稳特性的时序数据。传统单一模型难以同时捕捉时序数据中的低频趋势和高频波动特征&#xff0c;因此采用分解-重构-混合建模 的策略&#xff0c;结合信号处理与深度学习技术提升预测精度。…

作者头像 李华
网站建设 2026/3/24 14:05:03

局域网中两台win电脑传输文件

文章目录1.方案一&#xff1a;Python 一行命令 HTTP 服务 (最接近 Linux 体验)1. 在发送方电脑 A 上操作2. 在接收方电脑 B 上操作2.方案二&#xff1a;Windows 共享文件夹 (适合频繁传输)3. Linux电脑向Win电脑传输文件总结✨✨✨学习的道路很枯燥&#xff0c;希望我们能并肩走…

作者头像 李华
网站建设 2026/3/28 8:59:37

Flink运行架构深度解析:从核心组件到实战提交

一、Flink运行架构概述Flink作为一个分布式流式计算引擎&#xff0c;其运行架构主要围绕 JobManager 和 TaskManager 两大核心组件展开。1. JobManager&#xff08;Master&#xff09;负责协调分布式任务的执行&#xff0c;包括任务调度、资源申请、检查点协调和故障恢复等。一…

作者头像 李华
网站建设 2026/3/28 8:08:44

如何选择高安全性CDN服务?2026年五大厂商深度横评指南

在数字化时代&#xff0c;CDN 作为业务内容分发的核心基础设施&#xff0c;其安全性直接决定了企业数据传输与业务运营的稳定性&#xff0c;选择一家高安全性的 CDN 服务公司成为企业数字化布局的关键。本文从合规资质、传输加密、访问控制、运维与服务四大核心维度&#xff0c…

作者头像 李华
网站建设 2026/3/15 15:26:02

数位差与数值和的构造

求解代码public static void main(String[] args) throws IOException {BufferedReader br new BufferedReader(new InputStreamReader(System.in));StringTokenizer in new StringTokenizer(br.readLine());PrintWriter out new PrintWriter(new OutputStreamWriter(System…

作者头像 李华