AnthonyZero's Bolg

JUC-Semaphore

Alt text

概念

Semaphore(信号量)用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的事情后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池。

举例说明:比如一条马路上要限制流量,只允许同时有一百辆车在这条路上行驶,其他的必须在路口等待。后面的车要获取许可证才能驶入(表示当前已被阻塞),如果前一百辆车中有5辆已经离开了这条马路,那么后面就允许5辆车(这5辆车在所有等待的车辆中获取了前面离开马路之后释放的许可证)驶入这条马路。

构造方法

Semaphore内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类Sync,其中Sync继承AQS

// permits 设置许可证的数量
public Semaphore(int permits) {
    // 默认非公平
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

如果初始化了一个许可为1的Semaphore,那么就相当于一个不可重入的互斥锁(Mutex).其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待

获取许可

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//父类AQS 中调用子类需要重写的tryAcquireShared方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //如果线程被中断了,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //获取许可失败,将线程加入到等待队列中
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
} 

//默认非公平的时候 调用 NonfairSync 下的 nonfairTryAcquireShared 方法 
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 获取当前 state 值
        // 也即是当前可用的许可数
        int available = getState();
        // 得到剩余许可数
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) //compareAndSetState 更新失败,说明有其他线程获取到许可
            return remaining;
    }
}

//公平的时候 
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //如果前面有线程再等待,直接返回-1
        if (hasQueuedPredecessors())
            return -1;
        //后面与非公平一样
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

从上述代码中可以看到,公平模式下获取许可和非公平模式下基本类似,只是为了保证 FIFO ,添加了 hasQueuedPredecessors 判断限制。

释放许可

获取了许可,当用完之后就需要释放,Semaphore提供release()来释放许可。

public void release() {
    sync.releaseShared(1);
}

//老套路 调用AQS的releaseShared方法 
public final boolean releaseShared(int arg) {
    //尝试去释放锁
    if (tryReleaseShared(arg)) {
        //如果释放成功就唤醒其他线程获取许可证
        doReleaseShared();
        return true;
    }
    return false;
}

//内部类Sync重写了tryReleaseShared方法 公平和非公平都调用这个
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        //信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS改变许可数量成功,返回true
        if (compareAndSetState(current, next))
            return true;
    }
}

代码示例

public class SemaphoreExample {
    public static void main(String[] args) {
        int threadCount = 10; //线程数量
        Semaphore semaphore = new Semaphore(3); //只允许3个线程并发
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < threadCount; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire(); //获取一个许可证
                        System.out.println("当前可用的许可证数:"  + semaphore.availablePermits());
                        semaphore.release(); //释放许可证
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
    }
}

当前可用的许可证数:1
当前可用的许可证数:1
当前可用的许可证数:1
当前可用的许可证数:0
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2

总结

Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程进入同步队列将会被挂起阻塞;而一旦有一个线程释放一个资源(许可证),那么就有可能重新唤醒等待队列中的线程继续执行,