简介

Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它可以控制同一时间内对资源的访问次数.

无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.

Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。

信号量上定义两种操作:

  • acquire(获取):当一个线程调用acquire操作时,它要么成功获取到信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时,Semaphore内部会维护一个等待队列用于存储这些被暂停的线程.

  • release(释放)实际上会将信号量的值+1,然后唤醒相应Sepmaphore实例的等待队列中的一个任意等待线程.

例子

5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位

	public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3);
<pre><code>	for (int i = 0; i &lt; 5; i++) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					semaphore.acquire();//申请资源
					System.out.println(Thread.currentThread().getName()+"抢到车位");
					ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
					System.out.println(Thread.currentThread().getName()+"归还车位");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally {
				    //释放资源
					semaphore.release();
				}

			}
		},"线程"+i).start();
	}
}

  • Semaphore.acquire()和Semaphore.release()总是配对使用的,这点需要由应用代码自身保证.

  • Semaphore.release()调用应该放在finally块中,已避免应用代码出现异常的情况下,当前线程所获得的信号量无法返还.

  • 如果Semaphore构造器中的参数permits值设置为1,所创建的Semaphore相当于一个互斥锁.与其他互斥锁不同的是,这种互斥锁允许一个线程释放另外一个线程所持有的锁.因为一个线程可以在未执行过Semaphore.acquire()的情况下执行相应的Semaphore.release().

  • 默认情况下,Semaphore采用的是非公平性调度策略.

原理

Sync类

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;</p>
<pre><code>Sync(int permits) {
    setState(permits);
}

final int getPermits() {
    return getState();
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining &lt; 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next &lt; current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next &gt; current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

}

NofairSync类

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;</p>
<pre><code>NonfairSync(int permits) {
    super(permits);
}

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

}

FairSync类

static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;</p>
<pre><code>FairSync(int permits) {
    super(permits);
}

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining &lt; 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

}

初始化方法

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}</p>
<p>public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

如上所示,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象.

true为公平策略,false为非公平策略

acquire方法

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//interrupted方法 判断当前线程是否中断,中断就抛出异常,不中断就往下走
if (Thread.interrupted())
throw new InterruptedException();
//tryAcquireShared方法最终回到了nonfairTryAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

这段代码是 Semaphore 内部 Sync 类的非公平尝试获取共享资源的方法 nonfairTryAcquireShared(int acquires) 的实现。

该方法使用一个无限循环 for (;;) 来不断尝试获取资源,直到成功为止。下面是代码的详细解释:

  1. int available = getState(); 获取当前可用的资源数量(状态变量的值)。

  2. int remaining = available - acquires; 尝试计算剩余的资源数量。将当前可用的资源数量减去要获取的资源数量,得到剩余的资源数量。

  3. if (remaining < 0 || compareAndSetState(available, remaining)) 判断是否满足获取资源的条件。如果剩余的资源数量小于 0(没有足够的资源可供获取),或者通过 compareAndSetState() 方法成功将状态变量的值从 available 修改为 remaining,则表示成功获取资源。

  4. return remaining; 返回剩余的资源数量。

该方法采用忙等待的方式进行尝试,通过不断循环和 CAS(比较并交换)操作来尝试更新状态变量的值。如果在循环过程中发生了竞争,即其他线程已经修改了状态变量的值,那么 CAS 操作会失败,循环会继续进行,直到成功获取到资源或满足获取条件为止。

需要注意的是,这段代码的实现是非公平的。在非公平模式下,尝试获取资源的线程不会考虑其他等待线程的顺序,而是直接尝试获取。这样可能导致新来的线程在竞争中获得了资源,而正在等待的线程被长时间地阻塞。相比之下,公平模式会按照线程的等待顺序进行获取。

上面是gpt生成的,我感觉没啥大问题


参考