前言:
Java 并发编程远不止 `synchronized` 和 `volatile`。在 `java.util.concurrent` 包中,有一组专为多线程协作设计的工具类,它们各自解决不同场景下的线程同步问题。本文将围绕以下核心类展开:
第一梯队(基础协作类):CountDownLatch、CyclicBarrier、Semaphore、Exchanger
第二梯队(进阶工具类):Phaser、CompletableFuture、LockSupport
关联扩展:BlockingQueue、StampedLock、ForkJoinPool、Semaphore-based RateLimiter、Exchanger 变体等
一、CountDownLatch——倒计时门闩
1.1 核心思想
CountDownLatch 是一把一次性的倒计时锁。它允许一个或多个线程等待其他线程完成操作。内部维护一个计数器,每次 `countDown()` 减 1,减到 0 时,所有在 `await()` 上阻塞的线程被释放。
关键特性:不可重置,用完即废。
1.2 典型场景
场景一:主线程等待所有子任务完成
微服务启动时,需要同时加载配置、预热缓存、连接数据库,全部完成后才对外提供服务:
public class ServiceBootstrap {
private static final CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(() -> {
loadConfig();
latch.countDown();
System.out.println("配置加载完成");
});
executor.submit(() -> {
warmUpCache();
latch.countDown();
System.out.println("缓存预热完成");
});
executor.submit(() -> {
connectDatabase();
latch.countDown();
System.out.println("数据库连接完成");
});
// 主线程等待所有初始化完成
latch.await();
System.out.println("所有初始化完成,服务开始对外提供");
executor.shutdown();
}
}
场景二:并发测试——模拟 N 个用户同时发起请求
压测中需要让所有线程同时出发,而不是陆续到达:
public class ConcurrentStressTest {
public static void main(String[] args) throws InterruptedException {
int threadCount = 100;
CountDownLatch startGate = new CountDownLatch(1); // 统一起跑线
CountDownLatch endGate = new CountDownLatch(threadCount); // 终点计数
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
startGate.await(); // 所有线程在此等待
// 同时开始执行业务
doHttpRequest();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endGate.countDown();
}
}).start();
}
long start = System.currentTimeMillis();
startGate.countDown(); // 一声令下,所有线程同时出发
endGate.await(); // 等待所有线程完成
long cost = System.currentTimeMillis() - start;
System.out.println(threadCount + " 个请求总耗时: " + cost + "ms");
}
}
1.3 超时等待——await(long timeout, TimeUnit unit)
实际生产中绝不应该无限制 `await()`,必须设置超时,防止某个子任务异常导致主线程永远卡死:
public class ServiceBootstrapWithTimeout {
private static final CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(() -> { loadConfig(); latch.countDown(); });
executor.submit(() -> { warmUpCache(); latch.countDown(); });
executor.submit(() -> {
connectDatabase(); // 假设这个可能超时
latch.countDown();
});
// 最多等待 10 秒,超时后继续走降级逻辑
boolean allDone = latch.await(10, TimeUnit.SECONDS);
if (allDone) {
System.out.println("所有初始化完成,服务正常启动");
} else {
System.out.println("初始化超时,剩余未完成: " + latch.getCount()
+ ",以降级模式启动服务");
// 降级:关闭非核心功能,对外返回部分可用状态
}
executor.shutdown();
}
}
`await()` 返回 `boolean` 值——`true` 表示计数器归零正常释放,`false` 表示超时。这是判断初始化是否全部成功的关键依据。
1.4 重要特性
- 计数器归零后,再调用 `await()` 会立即返回(不阻塞)。因为 CountDownLatch 的语义是"事件已经发生",后续线程不需要再等。这与 CyclicBarrier 不同——CyclicBarrier 会重置进入下一阶段。
- `countDown()` 调用次数必须 >= 初始计数值,否则 `await()` 会永远阻塞。建议在 `finally` 块中调用。
- 不能重复使用,计数器归零后无法复原。
二、CyclicBarrier——循环屏障
2.1 核心思想
CyclicBarrier 让一组线程互相等待,全部到达屏障点后,再一起继续执行。与 CountDownLatch 最大的区别在于 Cyclic(循环)——它可以被重置后重复使用。
CyclicBarrier 还支持一个可选的 barrierAction,在所有线程到达后、释放前执行,适合做汇总操作。
2.2 典型场景
场景一:多线程分片计算后汇总
大表数据按用户 ID 分片,每片由一个线程处理,全部处理完后合并结果:
public class ParallelDataProcessor {
private static final int SHARD_COUNT = 4;
private static final ConcurrentHashMap<String, Long> mergedResult = new ConcurrentHashMap<>();
private static final CyclicBarrier barrier = new CyclicBarrier(SHARD_COUNT, () -> {
// barrierAction:所有分片完成后执行汇总
System.out.println("所有分片处理完毕,开始合并结果,共 " + mergedResult.size() + " 条");
// 这里可以写入数据库或发送到下游
});
public static void main(String[] args) {
for (int shard = 0; shard < SHARD_COUNT; shard++) {
final int shardId = shard;
new Thread(() -> {
try {
// 模拟处理本分片数据
Map<String, Long> partialResult = processShard(shardId);
mergedResult.putAll(partialResult);
System.out.println("分片 " + shardId + " 处理完成");
barrier.await(); // 等待其他分片
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
场景二:等待所有玩家加载完毕
多人对战游戏中,服务器需要等待所有玩家客户端加载资源完毕后,同时开始游戏:
public class GameRoom {
private final CyclicBarrier allReadyBarrier;
private final int playerCount;
public GameRoom(int playerCount) {
this.playerCount = playerCount;
this.allReadyBarrier = new CyclicBarrier(playerCount, this::startGame);
}
public void playerReady(String playerName) {
new Thread(() -> {
System.out.println(playerName + " 正在加载资源...");
loadResources(); // 模拟加载耗时