1. Overview
1.概述
The java.util.concurrent package provides tools for creating concurrent applications.
java.util.concurrent包提供了创建并发应用程序的工具。
In this article, we will do an overview of the whole package.
在这篇文章中,我们将对整个软件包做一个概述。
2. Main Components
2.主要部件
The java.util.concurrent contains way too many features to discuss in a single write-up. In this article, we will mainly focus on some of the most useful utilities from this package like:
java.util.concurrent包含的功能太多,无法在一篇文章中讨论。在这篇文章中,我们将主要关注这个包中最有用的一些工具,比如。
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
You can also find many dedicated articles to individual classes here.
你还可以在这里找到许多关于个别班级的专门文章。
2.1. Executor
2.1.执行者
Executor is an interface that represents an object that executes provided tasks.
Executor是一个接口,代表一个执行所提供任务的对象。
It depends on the particular implementation (from where the invocation is initiated) if the task should be run on a new or current thread. Hence, using this interface, we can decouple the task execution flow from the actual task execution mechanism.
如果任务应该在新线程或当前线程上运行,这取决于特定的实现(从调用开始的地方)。因此,使用这个接口,我们可以将任务执行流程与实际的任务执行机制解耦。
One point to note here is that Executor does not strictly require the task execution to be asynchronous. In the simplest case, an executor can invoke the submitted task instantly in the invoking thread.
这里需要注意的一点是,Executor并不严格要求任务的执行是异步的。在最简单的情况下,执行者可以在调用的线程中即时调用已提交的任务。
We need to create an invoker to create the executor instance:
我们需要创建一个调用器来创建执行器实例。
public class Invoker implements Executor {
@Override
public void execute(Runnable r) {
r.run();
}
}
Now, we can use this invoker to execute the task.
现在,我们可以使用这个调用器来执行任务。
public void execute() {
Executor executor = new Invoker();
executor.execute( () -> {
// task to be performed
});
}
Point to note here is that if the executor can’t accept the task for execution, it will throw RejectedExecutionException.
这里需要注意的是,如果执行器不能接受任务的执行,它将抛出RejectedExecutionException。
2.2. ExecutorService
2.2.ExecutorService
ExecutorService is a complete solution for asynchronous processing. It manages an in-memory queue and schedules submitted tasks based on thread availability.
ExecutorService是一个用于异步处理的完整解决方案。它管理着一个内存队列,并根据线程的可用性来安排提交的任务。
To use ExecutorService, we need to create one Runnable class.
为了使用ExecutorService,我们需要创建一个Runnable类。
public class Task implements Runnable {
@Override
public void run() {
// task details
}
}
Now we can create the ExecutorService instance and assign this task. At the time of creation, we need to specify the thread-pool size.
现在我们可以创建ExecutorService实例并分配这个任务。在创建时,我们需要指定线程池的大小。
ExecutorService executor = Executors.newFixedThreadPool(10);
If we want to create a single-threaded ExecutorService instance, we can use newSingleThreadExecutor(ThreadFactory threadFactory) to create the instance.
如果我们想创建一个单线程的ExecutorService实例,我们可以使用newSingleThreadExecutor(ThreadFactory threadFactory)来创建实例。
Once the executor is created, we can use it to submit the task.
一旦创建了执行器,我们就可以用它来提交任务。
public void execute() {
executor.submit(new Task());
}
We can also create the Runnable instance while submitting the task.
我们也可以在提交任务时创建Runnable实例。
executor.submit(() -> {
new Task();
});
It also comes with two out-of-the-box execution termination methods. The first one is shutdown(); it waits until all the submitted tasks finish executing. The other method is shutdownNow() which attempts to terminate all actively executing tasks and halts the processing of waiting tasks.
它还配备了两个开箱即用的执行终止方法。第一个是shutdown();它一直等到所有提交的任务执行完毕。另一个方法是shutdownNow(),它试图终止所有正在执行的任务,并停止对等待任务的处理。
There is also another method awaitTermination(long timeout, TimeUnit unit) which forcefully blocks until all tasks have completed execution after a shutdown event triggered or execution-timeout occurred, or the execution thread itself is interrupted,
还有另一个方法awaitTermination(long timeout, TimeUnit unit),在关闭事件触发或执行超时发生后,或执行线程本身被中断后,强制阻塞,直到所有任务完成执行。
try {
executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
e.printStackTrace();
}
2.3. ScheduledExecutorService
2.3.ScheduledExecutorService
ScheduledExecutorService is a similar interface to ExecutorService, but it can perform tasks periodically.
ScheduledExecutorService是一个与ExecutorService类似的接口,但它可以定期执行任务。
Executor and ExecutorService‘s methods are scheduled on the spot without introducing any artificial delay. Zero or any negative value signifies that the request needs to be executed instantly.
Executor和ExecutorService的方法是当场安排的,没有引入任何人为的延迟。零或任何负值标志着请求需要立即执行。
We can use both Runnable and Callable interface to define the task.
我们可以使用Runnable和Callable接口来定义任务。
public void execute() {
ScheduledExecutorService executorService
= Executors.newSingleThreadScheduledExecutor();
Future<String> future = executorService.schedule(() -> {
// ...
return "Hello world";
}, 1, TimeUnit.SECONDS);
ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
// ...
}, 1, TimeUnit.SECONDS);
executorService.shutdown();
}
ScheduledExecutorService can also schedule the task after some given fixed delay:
ScheduledExecutorService也可以安排任务在某个给定的固定延迟之后。
executorService.scheduleAtFixedRate(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
executorService.scheduleWithFixedDelay(() -> {
// ...
}, 1, 10, TimeUnit.SECONDS);
Here, the scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and subsequently with the given period until the service instance shutdowns.
这里,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )方法创建并执行一个定期动作,该动作首先在提供的初始延迟后被调用,随后以给定的周期被调用,直到服务实例关闭。
The scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit ) method creates and executes a periodic action that is invoked firstly after the provided initial delay, and repeatedly with the given delay between the termination of the executing one and the invocation of the next one.
scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )方法创建并执行一个周期性动作,该动作首先在提供的初始延迟后被调用,并在终止执行一个动作和调用下一个动作之间重复给出延迟。
2.4. Future
2.4.未来
Future is used to represent the result of an asynchronous operation. It comes with methods for checking if the asynchronous operation is completed or not, getting the computed result, etc.
Future用于表示异步操作的结果。它带有检查异步操作是否完成、获取计算结果的方法,等等。
What’s more, the cancel(boolean mayInterruptIfRunning) API cancels the operation and releases the executing thread. If the value of mayInterruptIfRunning is true, the thread executing the task will be terminated instantly.
更重要的是,cancel(boolean mayInterruptIfRunning) API取消了操作并释放了执行的线程。如果mayInterruptIfRunning的值为真,执行任务的线程将被立即终止。
Otherwise, in-progress tasks will be allowed to complete.
否则,正在进行的任务将被允许完成。
We can use below code snippet to create a future instance:
我们可以使用下面的代码片断来创建一个未来的实例。
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}
We can use following code snippet to check if the future result is ready and fetch the data if the computation is done:
我们可以使用下面的代码片断来检查未来的结果是否准备好了,如果计算完成了,就取回数据。
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
We can also specify a timeout for a given operation. If the task takes more than this time, a TimeoutException is thrown:
我们还可以为一个给定的操作指定一个超时时间。如果任务花费的时间超过了这个时间,就会抛出一个TimeoutException。
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
2.5. CountDownLatch
2.5.CountDownLatch
CountDownLatch (introduced in JDK 5) is a utility class which blocks a set of threads until some operation completes.
CountDownLatch(在JDK 5中引入)是一个实用类,它可以阻塞一组线程,直到某些操作完成。
A CountDownLatch is initialized with a counter(Integer type); this counter decrements as the dependent threads complete execution. But once the counter reaches zero, other threads get released.
一个CountDownLatch被初始化为一个counter(Integer类型);这个计数器随着依赖线程完成执行而递减。但一旦计数器达到零,其他线程就会被释放。
You can learn more about CountDownLatch here.
您可以在CountDownLatch这里了解更多信息。
2.6. CyclicBarrier
2.6.CyclicBarrier
CyclicBarrier works almost the same as CountDownLatch except that we can reuse it. Unlike CountDownLatch, it allows multiple threads to wait for each other using await() method(known as barrier condition) before invoking the final task.
CyclicBarrier的工作原理与CountDownLatch几乎相同,只是我们可以重复使用它。与CountDownLatch不同的是,它允许多个线程在调用最终任务之前使用await()方法(被称为障碍条件)互相等待。
We need to create a Runnable task instance to initiate the barrier condition:
我们需要创建一个Runnable任务实例来启动障碍条件。
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
Now we can invoke some threads to race for the barrier condition:
现在我们可以调用一些线程来为障碍条件进行竞赛。
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
Here, the isBroken() method checks if any of the threads got interrupted during the execution time. We should always perform this check before performing the actual process.
这里,isBroken()方法检查是否有任何线程在执行期间被中断。我们应该在执行实际进程之前始终执行这一检查。
2.7. Semaphore
2.7.Semaphore
The Semaphore is used for blocking thread level access to some part of the physical or logical resource. A semaphore contains a set of permits; whenever a thread tries to enter the critical section, it needs to check the semaphore if a permit is available or not.
Semaphore用于阻止线程级别对物理或逻辑资源的某些部分的访问。一个semaphore包含一组许可;每当一个线程试图进入关键部分,它需要检查semaphore是否有许可。
If a permit is not available (via tryAcquire()), the thread is not allowed to jump into the critical section; however, if the permit is available the access is granted, and the permit counter decreases.
如果许可证不可用(通过tryAcquire()),线程不允许跳转到关键部分;然而,如果许可证可用,则允许访问,并且许可证计数器会减少。
Once the executing thread releases the critical section, again the permit counter increases (done by release() method).
一旦执行线程释放了关键部分,许可计数器再次增加(由release()方法完成)。
We can specify a timeout for acquiring access by using the tryAcquire(long timeout, TimeUnit unit) method.
我们可以通过使用tryAcquire(long timeout, TimeUnit unit)方法来指定获取访问的超时。
We can also check the number of available permits or the number of threads waiting to acquire the semaphore.
我们还可以检查可用的许可数量或等待获取信号的线程数量。。
Following code snippet can be used to implement a semaphore:
下面的代码片断可以用来实现一个信号。
static Semaphore semaphore = new Semaphore(10);
public void execute() throws InterruptedException {
LOG.info("Available permit : " + semaphore.availablePermits());
LOG.info("Number of threads waiting to acquire: " +
semaphore.getQueueLength());
if (semaphore.tryAcquire()) {
try {
// ...
}
finally {
semaphore.release();
}
}
}
We can implement a Mutex like data-structure using Semaphore. More details on this can be found here.
我们可以使用Semaphore实现类似于Mutex的数据结构。关于这个的更多细节可以在这里找到。
2.8. ThreadFactory
2.8.ThreadFactory
As the name suggests, ThreadFactory acts as a thread (non-existing) pool which creates a new thread on demand. It eliminates the need of a lot of boilerplate coding for implementing efficient thread creation mechanisms.
顾名思义,ThreadFactory就像一个线程(不存在)池,它可以根据需求创建一个新线程。它消除了实现高效线程创建机制所需的大量模板编码。
We can define a ThreadFactory:
我们可以定义一个ThreadFactory。
public class BaeldungThreadFactory implements ThreadFactory {
private int threadId;
private String name;
public BaeldungThreadFactory(String name) {
threadId = 1;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, name + "-Thread_" + threadId);
LOG.info("created new thread with id : " + threadId +
" and name : " + t.getName());
threadId++;
return t;
}
}
We can use this newThread(Runnable r) method to create a new thread at runtime:
我们可以使用这个newThread(Runnable r)方法来在运行时创建一个新的线程。
BaeldungThreadFactory factory = new BaeldungThreadFactory(
"BaeldungThreadFactory");
for (int i = 0; i < 10; i++) {
Thread t = factory.newThread(new Task());
t.start();
}
2.9. BlockingQueue
2.9.BlockingQueue
In asynchronous programming, one of the most common integration patterns is the producer-consumer pattern. The java.util.concurrent package comes with a data-structure know as BlockingQueue – which can be very useful in these async scenarios.
在异步编程中,最常见的集成模式之一是生产者-消费者模式>。java.util.concurrent包附带了一个被称为BlockingQueue的数据结构,它在这些异步场景中非常有用。
More information and a working example on this is available here.
有关这方面的更多信息和工作实例可在此处获得。
2.10. DelayQueue
2.10.延迟队列
DelayQueue is an infinite-size blocking queue of elements where an element can only be pulled if it’s expiration time (known as user defined delay) is completed. Hence, the topmost element (head) will have the most amount delay and it will be polled last.
DelayQueue是一个无限大的元素阻塞队列,只有当一个元素的过期时间(被称为用户定义的延迟)完成后才能被拉出。因此,最上面的元素(head)将有最多的延迟量,它将被最后轮询。
More information and a working example on this is available here.
关于这个问题的更多信息和工作实例可在这里获得。
2.11. Locks
2.11.锁
Not surprisingly, Lock is a utility for blocking other threads from accessing a certain segment of code, apart from the thread that’s executing it currently.
毫不奇怪,Lock是一个用于阻止其他线程访问某段代码的工具,除了目前正在执行的线程之外。
The main difference between a Lock and a Synchronized block is that synchronized block is fully contained in a method; however, we can have Lock API’s lock() and unlock() operation in separate methods.
锁和同步块的主要区别在于,同步块完全包含在一个方法中;但是,我们可以在不同的方法中进行Lock API的lock()和unlock()操作。
More information and a working example on this is available here.
关于这一点的更多信息和工作实例可在此处获得。
2.12. Phaser
2.12. Phaser
Phaser is a more flexible solution than CyclicBarrier and CountDownLatch – used to act as a reusable barrier on which the dynamic number of threads need to wait before continuing execution. We can coordinate multiple phases of execution, reusing a Phaser instance for each program phase.
Phaser是一个比CyclicBarrier和CountDownLatch更灵活的解决方案–用于充当一个可重复使用的屏障,在继续执行之前,动态数量的线程需要在上面等待。我们可以协调多个执行阶段,为每个程序阶段重复使用一个Phaser实例。
More information and a working example on this is available here.
有关这方面的更多信息和工作实例可在这里获得。
3. Conclusion
3.结论
In this high-level, overview article, we’ve focused on the different utilities available of java.util.concurrent package.
在这篇高层次的综述文章中,我们重点介绍了java.util.concurrent包中的不同实用工具。
As always, the full source code is available over on GitHub.
一如既往,完整的源代码可在GitHub上获得,。