1. Overview
1.概述
In this article, we’ll see how we can get a lock on a specific key to prevent concurrent actions on that key without impeding actions on other keys.
在这篇文章中,我们将看到如何在一个特定的键上获得一个锁,以防止该键上的并发操作,而不妨碍其他键上的操作。
In general, we’ll want to implement two methods and understand how to manipulate them:
一般来说,我们要实现两种方法,并了解如何操作它们。
- void lock(String key)
- void unlock(String key)
For the simplicity of the tutorial, we’ll always suppose that our keys are Strings. You can replace them with the type of objects you need under the lone condition that equals and hashCode methods are correctly defined because we’ll use them as HashMap keys.
为了简单起见,我们总是假设我们的键是字符串。在equals和hashCode方法被正确定义的唯一条件下,你可以用你需要的对象类型来替换它们,因为我们将把它们作为HashMap键使用。
2. A Simple Mutually Exclusive Lock
2.一个简单的互斥锁
First, let’s suppose we want to block any requested action if the corresponding key is already in use. Here, we’ll rather define a boolean tryLock(String key) method instead of the lock method we had imagined.
首先,让我们假设我们想要阻止任何请求的动作,如果相应的键已经在使用中。在这里,我们宁愿定义一个boolean tryLock(String key)方法而不是我们想象中的lock方法。
Concretely, we aim to maintain a Set of keys that we’ll fill with the keys in use at any moment. Thus, when a new action is requested on a key, we’ll just have to refuse it if we find out that the key is already used by another thread.
具体来说,我们的目标是维护一个键的Set,我们将用任何时候都在使用的键来填充。因此,当一个新的动作被请求在一个键上时,如果我们发现这个键已经被另一个线程使用,我们将不得不拒绝它。
The problem we face here is that there is no thread-safe implementation of Set. Hence, we’ll use a Set backed by a ConcurrentHashMap. Using ConcurrentHashMap guarantees us data coherency in a multi-threaded environment.
我们在这里面临的问题是,没有线程安全的Set实现。因此,我们将使用一个由ConcurrentHashMap支持的Set。使用ConcurrentHashMap可以保证我们在多线程环境下的数据一致性。
Let’s see this in action:
让我们看看这个行动。
public class SimpleExclusiveLockByKey {
private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
public boolean tryLock(String key) {
return usedKeys.add(key);
}
public void unlock(String key) {
usedKeys.remove(key);
}
}
Here’s how we would use this class:
下面是我们将如何使用这个类。
String key = "key";
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
try {
lockByKey.tryLock(key);
// insert the code that needs to be executed only if the key lock is available
} finally { // CRUCIAL
lockByKey.unlock(key);
}
Let’s insist on the presence of the finally block: It is crucial to call the unlock method inside it. This way, even if our code throws an Exception within the try brackets, we’ll unlock the key.
让我们坚持finally块的存在。在其中调用unlock方法是至关重要的。这样,即使我们的代码在try括号内抛出一个Exception,我们也能解锁钥匙。
3. Acquire and Release Locks by Keys
3.通过钥匙获取和释放锁具
Now, let’s dig further into the problem and say we don’t want to simply refuse simultaneous actions on the same keys, but we’d rather have new incoming actions wait until the current action on the key finishes.
现在,让我们进一步挖掘这个问题,说我们不想简单地拒绝在相同的键上同时进行的操作,但我们宁愿让新进入的操作等待,直到键上的当前操作结束。
The application flow will be:
申请流程将是。
- the first thread asks for a lock on a key: it acquires the lock on the key
- the second thread asks for a lock on the same key: thread 2 is told to wait
- the first thread releases the lock on the key
- the second thread acquires the lock on the key and can execute its action
3.1. Define a Lock With a Thread Counter
3.1.定义一个带有线程计数器的锁
In this case, it sounds natural to use a Lock. In brief, a Lock is an object used for thread synchronization that allows blocking threads until it can be acquired. Lock is an interface – we’ll use a ReentrantLock, the base implementation for it.
在这种情况下,使用Lock听起来很自然。简而言之,Lock是一个用于线程同步的对象,它允许阻断线程,直到可以获得它。Lock是一个接口–我们将使用ReentrantLock,这是它的基础实现。
Let’s start by wrapping our Lock in an inner class. This class will be able to track the number of threads currently waiting to lock the key. It will expose two methods, one to increment the thread counter and another one to decrement it:
让我们先把我们的Lock包裹在一个内部类中。这个类将能够跟踪当前等待锁定钥匙的线程数量。它将暴露两个方法,一个用来增加线程计数器,另一个用来减少它。
private static class LockWrapper {
private final Lock lock = new ReentrantLock();
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
private LockWrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}
private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}
3.2. Let the Lock Handle Queuing Threads
3.2.让锁处理排队线程
Furthermore, we’ll continue to use a ConcurrentHashMap. But instead of simply extracting the keys of the Map like we were doing before, we’ll use LockWrapper objects as values:
此外,我们将继续使用一个ConcurrentHashMap。但我们将使用LockWrapper对象作为值,而不是像之前那样简单地提取Map的键。
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
When a thread wants to acquire a lock on a key, we’ll need to see if a LockWrapper is already present for this key:
当一个线程想要获得一个键上的锁时,我们需要看看这个键是否已经有一个LockWrapper。
- if not, we’ll instantiate a new LockWrapper for the given key with a counter set at 1
- if so, we’ll return the existing LockWrapper and increment its associated counter
Let’s see how this is done:
让我们看看这是如何做到的。
public void lock(String key) {
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
lockWrapper.lock.lock();
}
The code is very concise due to the use of HashMap‘s compute method. Let’s give some details on the functioning of this method:
由于使用了HashMap的compute方法,该代码非常简洁。让我们来介绍一下这个方法的运作细节。
- the compute method is applied to the object locks with key as its first argument: the initial value corresponding to key in locks is retrieved
- the BiFunction given as the second argument of compute is applied to the key and the initial value: the result gives a new value
- the new value replaces the initial value for key key in locks
3.3. Unlock and Optionally Remove Map Entry
3.3.解锁并可选择删除地图条目
Additionally, when a thread releases a lock, we’ll decrement the number of threads associated with the LockWrapper. If the count is down to zero, then we’ll remove the key from the ConcurrentHashMap:
此外,当一个线程释放一个锁时,我们将递减与LockWrapper相关的线程数量。如果计数降至零,那么我们将从ConcurrentHashMap中删除该键。
public void unlock(String key) {
LockWrapper lockWrapper = locks.get(key);
lockWrapper.lock.unlock();
if (lockWrapper.removeThreadFromQueue() == 0) {
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
locks.remove(key, lockWrapper);
}
}
3.4. Summary
3.4.总结
In a nutshell, let’s see what our whole class finally looks like:
简而言之,让我们看看我们的整个班级最终是什么样子的。
public class LockByKey {
private static class LockWrapper {
private final Lock lock = new ReentrantLock();
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
private LockWrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}
private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
public void lock(String key) {
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
lockWrapper.lock.lock();
}
public void unlock(String key) {
LockWrapper lockWrapper = locks.get(key);
lockWrapper.lock.unlock();
if (lockWrapper.removeThreadFromQueue() == 0) {
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
locks.remove(key, lockWrapper);
}
}
}
The usage is quite similar to what we had before:
使用情况与我们以前的情况很相似。
String key = "key";
LockByKey lockByKey = new LockByKey();
try {
lockByKey.lock(key);
// insert your code here
} finally { // CRUCIAL
lockByKey.unlock(key);
}
4. Allow Multiple Actions at the Same Time
4.允许在同一时间进行多种操作
Last but not least, let’s consider another case: Instead of allowing only one thread to make an action for a given key at a time, we want to limit the number of threads allowed to act simultaneously on the same key to some integer n. To keep it simple, we’ll set n=2.
最后但同样重要的是,让我们考虑另一种情况。我们不想每次只允许一个线程对一个给定的键进行操作,而是想把允许同时对同一个键进行操作的线程数量限制在某个整数n。为了简单起见,我们将设定n=2。
Let’s describe our use case extensively:
让我们广泛地描述一下我们的用例。
- the first thread wants to acquire the lock on the key: it will be allowed to do so
- a second thread wants to acquire the same lock: it will be also be allowed
- a third thread requests a lock on the same key: it will have to queue until one of the first two threads releases its lock
Semaphores are made for this. A Semaphore is an object used to limit the number of threads simultaneously accessing a resource.
Semaphores就是为此而生的。Semaphore是一个用来限制同时访问一个资源的线程数量的对象。
The global functioning and the code look very similar to what we had with locks:
全局功能和代码看起来与我们的锁非常相似。
public class SimultaneousEntriesLockByKey {
private static final int ALLOWED_THREADS = 2;
private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
public void lock(String key) {
Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
semaphore.acquireUninterruptibly();
}
public void unlock(String key) {
Semaphore semaphore = semaphores.get(key);
semaphore.release();
if (semaphore.availablePermits() == ALLOWED_THREADS) {
semaphores.remove(key, semaphore);
}
}
}
The usage is identical:
使用方法是相同的。
String key = "key";
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
try {
lockByKey.lock(key);
// insert your code here
} finally { // CRUCIAL
lockByKey.unlock(key);
}
5. Conclusion
5.总结
In this article, we’ve seen how we could put locks on keys to either totally impede concurrent actions or limit the number of concurrent actions to one (using locks) or more (using semaphores).
在这篇文章中,我们已经看到了我们如何在钥匙上加锁,以完全阻碍并发操作,或者将并发操作的数量限制在一个(使用锁)或更多(使用信号灯)。
As always, the code is available over on GitHub.
像往常一样,代码可在GitHub上获得。