Java 中的 Semaphore
1.概述
本文中,我们将探讨 Java 中信号量(semphore)和 mutex 的基础。
2. Semaphore
我们将从 java.util.concurrent.Semaphore
开始。我们可以使用信号量(semaphore)来限制访问特定资源的并发线程的数量。
在以下示例中,我们将实现一个简单的登录队列来限制系统中的用户数量:
class LoginQueueUsingSemaphore {
private Semaphore semaphore;
public LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}
boolean tryLogin() {
return semaphore.tryAcquire();
}
void logout() {
semaphore.release();
}
int availableSlots() {
return semaphore.availablePermits();
}
}
请注意,我们是如何使用下面方法的:
tryAcquire()
– 如果许可立即可用,则返回true
,否则返回false
,但acquire()
获取许可并阻塞直到许可可用。release()
– 释放许可availablePermits()
– 返回当前可用许可的数量
为了测试我们的登录队列,我们将首先尝试使之达到限制,并检查下一次登录尝试是否会被阻塞:
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}
接下来,我们将查看退出登录后是否有可用的插槽:
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();
assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}
3. 限时 Semaphore
接下来,我们将讨论 Apache Commons TimedSemaphore
。TimedSemaphor
e允许将多个许可作为一个简单的信号量,但该信号量在给定的时间段内有效,在该时间段之后,时间重置和所有许可都会被释放。
我们可以使用 TimedSemaphore
来构建一个简单的延迟队列,如下所示:
class DelayQueueUsingTimedSemaphore {
private TimedSemaphore semaphore;
DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}
boolean tryAdd() {
return semaphore.tryAcquire();
}
int availableSlots() {
return semaphore.getAvailablePermits();
}
}
当我们使用一秒钟为时间周期的延迟队列时,一秒钟内使用所有插槽后,应该没有可用的插槽:
public void givenDelayQueue_whenReachLimit_thenBlocked() {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue
= new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}
不过在休眠一段时间后,semaphore 应该重置并释放许可:
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
Thread.sleep(1000);
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}
4. Semaphore vs. Mutex
Mutex 作用类似于二进制信号量,我们可以使用它来实现互斥。
在下面的例子中,我们将使用一个简单的二进制信号量(semaphore)来构建一个计数器(counter):
class CounterUsingMutex {
private Semaphore mutex;
private int count;
CounterUsingMutex() {
mutex = new Semaphore(1);
count = 0;
}
void increase() throws InterruptedException {
mutex.acquire();
this.count = this.count + 1;
Thread.sleep(1000);
mutex.release();
}
int getCount() {
return this.count;
}
boolean hasQueuedThreads() {
return mutex.hasQueuedThreads();
}
}
当许多线程试图同时访问计数器时,它们只会被阻塞在队列中:
@Test
public void whenMutexAndMultipleThreads_thenBlocked()
throws InterruptedException {
int count = 5;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
}
当我们等待时,所有线程都将访问计数器,队列中没有剩余线程:
@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
throws InterruptedException {
int count = 5;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
Thread.sleep(5000);
assertFalse(counter.hasQueuedThreads());
assertEquals(count, counter.getCount());
}
5. 结论
本文中,我们探讨了 Java 中 semaphore 的基础。