概念
Semaphore(信号量)用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的事情后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池。
举例说明:比如一条马路上要限制流量,只允许同时有一百辆车在这条路上行驶,其他的必须在路口等待。后面的车要获取许可证才能驶入(表示当前已被阻塞),如果前一百辆车中有5辆已经离开了这条马路,那么后面就允许5辆车(这5辆车在所有等待的车辆中获取了前面离开马路之后释放的许可证)驶入这条马路。
构造方法
Semaphore内部包含公平锁(FairSync)和非公平锁(NonfairSync),继承内部类Sync,其中Sync继承AQS
// permits 设置许可证的数量
public Semaphore(int permits) {
// 默认非公平
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
如果初始化了一个许可为1的Semaphore,那么就相当于一个不可重入的互斥锁(Mutex).其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待
获取许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//父类AQS 中调用子类需要重写的tryAcquireShared方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断了,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//获取许可失败,将线程加入到等待队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//默认非公平的时候 调用 NonfairSync 下的 nonfairTryAcquireShared 方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前 state 值
// 也即是当前可用的许可数
int available = getState();
// 得到剩余许可数
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) //compareAndSetState 更新失败,说明有其他线程获取到许可
return remaining;
}
}
//公平的时候
protected int tryAcquireShared(int acquires) {
for (;;) {
//如果前面有线程再等待,直接返回-1
if (hasQueuedPredecessors())
return -1;
//后面与非公平一样
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
从上述代码中可以看到,公平模式下获取许可和非公平模式下基本类似,只是为了保证 FIFO ,添加了 hasQueuedPredecessors 判断限制。
释放许可
获取了许可,当用完之后就需要释放,Semaphore提供release()来释放许可。
public void release() {
sync.releaseShared(1);
}
//老套路 调用AQS的releaseShared方法
public final boolean releaseShared(int arg) {
//尝试去释放锁
if (tryReleaseShared(arg)) {
//如果释放成功就唤醒其他线程获取许可证
doReleaseShared();
return true;
}
return false;
}
//内部类Sync重写了tryReleaseShared方法 公平和非公平都调用这个
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS改变许可数量成功,返回true
if (compareAndSetState(current, next))
return true;
}
}
代码示例
public class SemaphoreExample {
public static void main(String[] args) {
int threadCount = 10; //线程数量
Semaphore semaphore = new Semaphore(3); //只允许3个线程并发
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire(); //获取一个许可证
System.out.println("当前可用的许可证数:" + semaphore.availablePermits());
semaphore.release(); //释放许可证
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
}
当前可用的许可证数:1
当前可用的许可证数:1
当前可用的许可证数:1
当前可用的许可证数:0
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
当前可用的许可证数:2
总结
Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程进入同步队列将会被挂起阻塞;而一旦有一个线程释放一个资源(许可证),那么就有可能重新唤醒等待队列中的线程继续执行,