Semaphore详解
简介
Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它可以控制同一时间内对资源的访问次数.
无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.
Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。
信号量上定义两种操作:
acquire(获取):当一个线程调用acquire操作时,它要么成功获取到信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时,Semaphore内部会维护一个等待队列用于存储这些被暂停的线程.
release(释放)实际上会将信号量的值+1,然后唤醒相应Sepmaphore实例的等待队列中的一个任意等待线程.
例子
5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
<pre><code> for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//申请资源
System.out.println(Thread.currentThread().getName()+"抢到车位");
ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
System.out.println(Thread.currentThread().getName()+"归还车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放资源
semaphore.release();
}
}
},"线程"+i).start();
}
}
Semaphore.acquire()和Semaphore.release()总是配对使用的,这点需要由应用代码自身保证.
Semaphore.release()调用应该放在finally块中,已避免应用代码出现异常的情况下,当前线程所获得的信号量无法返还.
如果Semaphore构造器中的参数permits值设置为1,所创建的Semaphore相当于一个互斥锁.与其他互斥锁不同的是,这种互斥锁允许一个线程释放另外一个线程所持有的锁.因为一个线程可以在未执行过Semaphore.acquire()的情况下执行相应的Semaphore.release().
默认情况下,Semaphore采用的是非公平性调度策略.
原理
Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;</p>
<pre><code>Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
NofairSync类
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;</p>
<pre><code>NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
FairSync类
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;</p>
<pre><code>FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
初始化方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}</p>
<p>public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
如上所示,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象.
true为公平策略,false为非公平策略
acquire方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//interrupted方法 判断当前线程是否中断,中断就抛出异常,不中断就往下走
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared方法最终回到了nonfairTryAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
这段代码是 Semaphore 内部 Sync 类的非公平尝试获取共享资源的方法 nonfairTryAcquireShared(int acquires) 的实现。
该方法使用一个无限循环 for (;;)
来不断尝试获取资源,直到成功为止。下面是代码的详细解释:
int available = getState();
获取当前可用的资源数量(状态变量的值)。int remaining = available - acquires;
尝试计算剩余的资源数量。将当前可用的资源数量减去要获取的资源数量,得到剩余的资源数量。if (remaining < 0 || compareAndSetState(available, remaining))
判断是否满足获取资源的条件。如果剩余的资源数量小于 0(没有足够的资源可供获取),或者通过 compareAndSetState() 方法成功将状态变量的值从 available 修改为 remaining,则表示成功获取资源。return remaining;
返回剩余的资源数量。
该方法采用忙等待的方式进行尝试,通过不断循环和 CAS(比较并交换)操作来尝试更新状态变量的值。如果在循环过程中发生了竞争,即其他线程已经修改了状态变量的值,那么 CAS 操作会失败,循环会继续进行,直到成功获取到资源或满足获取条件为止。
需要注意的是,这段代码的实现是非公平的。在非公平模式下,尝试获取资源的线程不会考虑其他等待线程的顺序,而是直接尝试获取。这样可能导致新来的线程在竞争中获得了资源,而正在等待的线程被长时间地阻塞。相比之下,公平模式会按照线程的等待顺序进行获取。
上面是gpt生成的,我感觉没啥大问题
参考
- 感谢你赐予我前进的力量