CountDownLatch、CyclicBarrier 和 Semaphore

CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.     ——JDK 源码

CountDownLatch 是 Java 并发包中的一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。CountDownLatch 被初始化为一个给定的“计数”(count),该计数是线程需要等待完成的操作数量。调用 CountDownLatchawait() 方法会使当前线程等待,直到计数变为零。每次在另一个线程中调用 CountDownLatchcountDown() 方法,计数会减一。当计数达到零时,所有因调用 await() 而等待的线程将被释放。

下面是一个简单的示例代码,演示了如何使用 CountDownLatch

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemoWithLambda {

    public static void main(String[] args) throws InterruptedException {
        // 初始化CountDownLatch,设置计数为3
        CountDownLatch latch = new CountDownLatch(3);

        // 创建并启动线程,这里使用lambda表达式
        for (int i = 0; i < 3; i++) {
            Thread worker = new Thread(() -> {
                try {
                    // 模拟一些工作
                    System.out.println(Thread.currentThread().getName() + " is working");
                    Thread.sleep(1000); // 模拟耗时任务
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 完成工作后减少latch的计数
                    latch.countDown();
                    System.out.println(Thread.currentThread().getName() + " has finished and latch count is now: " + latch.getCount());
                }
            });
            worker.start();
        }

        // 主线程等待所有工作线程完成
        System.out.println("Waiting for workers to finish...");
        latch.await(); // 当计数为0时,主线程继续执行

        // 所有工作线程完成后,主线程继续执行
        System.out.println("All workers have finished.");
    }
}

CyclicBarrier 

  • 作用:循环栅栏,让一组线程相互等待,直到所有线程都到达屏障点后同时继续执行。
  • 特点
    • 可重用:达到屏障点后自动重置,可重复使用。
    • 固定参与者:初始需指定参与线程数,但无法动态调整。
    • 屏障动作:可指定 Runnable 在所有线程到达后执行一次操作。
  • 典型场景
    • 并行计算中分阶段处理数据(如分块计算后合并结果)。
    • 模拟多线程同时触发某个操作(如高并发压力测试)。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 1. 定义需要同步的线程数量(3个玩家)
        int numberOfPlayers = 3;

        // 2. 创建 CyclicBarrier,指定线程数量和屏障动作(游戏开始时的提示)
        CyclicBarrier startBarrier = new CyclicBarrier(
            numberOfPlayers,
            () -> System.out.println("所有玩家已就绪!游戏开始!")
        );

        // 3. 启动3个玩家线程
        for (int i = 1; i <= numberOfPlayers; i++) {
            int playerNumber = i;
            new Thread(() -> {
                try {
                    // 玩家准备阶段:模拟不同时间的准备过程
                    System.out.println("玩家" + playerNumber + "正在准备...");
                    Thread.sleep((long) (Math.random() * 2000)); // 随机延迟0-2秒
                    
                    // 到达屏障点:等待其他玩家也准备完毕
                    System.out.println("玩家" + playerNumber + "准备完毕,等待其他玩家...");
                    startBarrier.await(); // 关键:调用 await() 进入等待
                    
                    // 所有玩家就绪后,开始游戏
                    System.out.println("玩家" + playerNumber + "开始游戏!");
                } catch (InterruptedException e) {
                    System.out.println("玩家" + playerNumber + "被中断,无法继续游戏");
                } catch (BrokenBarrierException e) {
                    System.out.println("游戏启动失败,屏障被破坏");
                }
            }).start();
        }
    }
}

InterruptedException:表示当前线程在等待(如睡眠或 await())时被其他线程调用 Thread.interrupt() 中断。
BrokenBarrierException:某个线程在等待时被中断(InterruptedException);某个线程调用了 CyclicBarrier.reset() 方法

Semaphore

信号量(Semaphore)是用于控制对某一资源的访问数量的同步工具。它维护了一个许可集,线程可以通过获取许可来访问资源,并在完成后释放许可。这对于限制对某些资源(如数据库连接池或文件句柄)的并发访问非常有用。

在这个例子中,我们将模拟一个场景,其中有多个线程尝试访问有限数量的资源(比如数据库连接)。我们将创建一个 Semaphore 来表示这些资源的最大可用数量,并让每个线程尝试获取一个资源进行操作,然后释放资源。

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreExample {

    // 创建一个初始许可数为3的Semaphore,表示最多可以有3个线程同时访问资源
    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        // 创建10个线程模拟对资源的请求
        for (int i = 0; i < 10; i++) {
            int taskId = i + 1;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + taskId + " is waiting for a permit.");
                    semaphore.acquire(); // 请求一个许可
                    System.out.println("Thread " + taskId + " gets a permit and starts working.");

                    // 模拟工作时间
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Thread " + taskId + " was interrupted.");
                } finally {
                    System.out.println("Thread " + taskId + " releases the permit.");
                    semaphore.release(); // 完成后释放许可
                }
            }).start();
        }

        // 主线程等待所有子线程执行完毕
        try {
            TimeUnit.SECONDS.sleep(30); // 给予足够的时间让所有子线程完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

场景题:有5个人,在那赛跑,请你设计一个多线程的裁判程序给出他们赛跑的结果顺序,5个人的速度随机处理

package com.chstack.learning.concurrent.abc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class HorseRace {

    public static int RUNNER_NUM = 5;
    public static int RACE_LENGTH = 50;

    public static void main(String[] args) {

        CyclicBarrier barrier = new CyclicBarrier(RUNNER_NUM, () -> {
            System.out.println(RUNNER_NUM + "位选手均已准备就绪,开跑!");
        });

        for (int i = 0; i < RUNNER_NUM; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    barrier.await();
                    double speed = Math.random() * 5 + 5;
                    double usedTime = RACE_LENGTH / speed;
                    System.out.println("第" + index + "号选手的速度是" + speed + "米/秒");
                    // 修正:确保先计算 usedTime * 1000 再转为 long
                    Thread.sleep( (long) (usedTime * 1000) );
                    System.out.println("第" + index + "号选手用时" + usedTime + "秒");
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }
}

 

This article was updated on