Semaphores in Java – Java中的semaphores

最后修改: 2017年 7月 4日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this quick tutorial, we’ll explore the basics of semaphores and mutexes in Java.

在这个快速教程中,我们将探讨Java中semaphores和mutexes的基础知识。

2. Semaphore

2.Semaphore

We’ll start with java.util.concurrent.Semaphore. We can use semaphores to limit the number of concurrent threads accessing a specific resource.

我们将从java.util.concurrent.Semaphore开始。我们可以使用semaphores来限制访问特定资源的并发线程的数量。

In the following example, we will implement a simple login queue to limit the number of users in the system:

在下面的例子中,我们将实现一个简单的登录队列来限制系统中的用户数量。

class LoginQueueUsingSemaphore {

    private Semaphore semaphore;

    public LoginQueueUsingSemaphore(int slotLimit) {
        semaphore = new Semaphore(slotLimit);
    }

    boolean tryLogin() {
        return semaphore.tryAcquire();
    }

    void logout() {
        semaphore.release();
    }

    int availableSlots() {
        return semaphore.availablePermits();
    }

}

Notice how we used the following methods:

注意我们是如何使用以下方法的。

  • tryAcquire() – return true if a permit is available immediately and acquire it otherwise return false, but acquire() acquires a permit and blocking until one is available
  • release() – release a permit
  • availablePermits() – return number of current permits available

To test our login queue, we will first try to reach the limit and check if the next login attempt will be blocked:

为了测试我们的登录队列,我们将首先尝试达到极限,并检查下一次登录尝试是否会被阻止。

@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();

    assertEquals(0, loginQueue.availableSlots());
    assertFalse(loginQueue.tryLogin());
}

Next, we will see if any slots are available after a logout:

接下来,我们将看看在注销后是否有任何槽位可用。

@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
    int slots = 10;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(loginQueue::tryLogin));
    executorService.shutdown();
    assertEquals(0, loginQueue.availableSlots());
    loginQueue.logout();

    assertTrue(loginQueue.availableSlots() > 0);
    assertTrue(loginQueue.tryLogin());
}

3. Timed Semaphore

3.定时的Semaphore

Next, we will discuss Apache Commons TimedSemaphore. TimedSemaphore allows a number of permits as a simple Semaphore but in a given period of time, after this period the time reset and all permits are released.

接下来,我们将讨论Apache Commons TimedSemaphore。TimedSemaphore允许一些许可作为一个简单的Semaphore,但在一个给定的时间段内,在这个时间段之后,时间重置,所有的许可被释放。

We can use TimedSemaphore to build a simple delay queue as follows:

我们可以使用TimedSemaphore来建立一个简单的延迟队列,如下所示。

class DelayQueueUsingTimedSemaphore {

    private TimedSemaphore semaphore;

    DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
        semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
    }

    boolean tryAdd() {
        return semaphore.tryAcquire();
    }

    int availableSlots() {
        return semaphore.getAvailablePermits();
    }

}

When we use a delay queue with one second as time period and after using all the slots within one second, none should be available:

当我们使用一个以一秒为时间段的延迟队列时,在一秒内用完所有的槽位后,就不应该再有空闲。

public void givenDelayQueue_whenReachLimit_thenBlocked() {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue 
      = new DelayQueueUsingTimedSemaphore(1, slots);
    
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    assertFalse(delayQueue.tryAdd());
}

But after sleeping for some time, the semaphore should reset and release the permits:

但在睡眠一段时间后,信号灯应该重置并释放许可

@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
    int slots = 50;
    ExecutorService executorService = Executors.newFixedThreadPool(slots);
    DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
    IntStream.range(0, slots)
      .forEach(user -> executorService.execute(delayQueue::tryAdd));
    executorService.shutdown();

    assertEquals(0, delayQueue.availableSlots());
    Thread.sleep(1000);
    assertTrue(delayQueue.availableSlots() > 0);
    assertTrue(delayQueue.tryAdd());
}

4. Semaphore vs. Mutex

4.Semaphore与Mutex

Mutex acts similarly to a binary semaphore, we can use it to implement mutual exclusion.

Mutex的作用类似于二进制信号,我们可以用它来实现互斥。

In the following example, we’ll use a simple binary semaphore to build a counter:

在下面的例子中,我们将使用一个简单的二进制信号灯来建立一个计数器。

class CounterUsingMutex {

    private Semaphore mutex;
    private int count;

    CounterUsingMutex() {
        mutex = new Semaphore(1);
        count = 0;
    }

    void increase() throws InterruptedException {
        mutex.acquire();
        this.count = this.count + 1;
        Thread.sleep(1000);
        mutex.release();

    }

    int getCount() {
        return this.count;
    }

    boolean hasQueuedThreads() {
        return mutex.hasQueuedThreads();
    }
}

When a lot of threads try to access the counter at once, they’ll simply be blocked in a queue:

当很多线程试图同时访问计数器时,它们就会被挡在队列中

@Test
public void whenMutexAndMultipleThreads_thenBlocked()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
}

When we wait, all threads will access the counter and no threads left in the queue:

当我们等待时,所有的线程都会访问计数器,队列中没有线程留下。

@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
 throws InterruptedException {
    int count = 5;
    ExecutorService executorService
     = Executors.newFixedThreadPool(count);
    CounterUsingMutex counter = new CounterUsingMutex();
    IntStream.range(0, count)
      .forEach(user -> executorService.execute(() -> {
          try {
              counter.increase();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }));
    executorService.shutdown();

    assertTrue(counter.hasQueuedThreads());
    Thread.sleep(5000);
    assertFalse(counter.hasQueuedThreads());
    assertEquals(count, counter.getCount());
}

5. Conclusion

5.结论

In this article, we explored the basics of semaphores in Java.

在这篇文章中,我们探讨了Java中信号灯的基础知识。

As always, the full source code is available over on GitHub.

一如既往,完整的源代码可在GitHub上获得