Guide to Work Stealing in Java – Java中的偷工减料指南

最后修改: 2020年 1月 24日

中文/混合/英文(键盘快捷键:t)

1. Overview

1.概述

In this tutorial, we’ll look at the concept of work stealing in Java.

在本教程中,我们将研究Java中的偷工减料概念

2. What Is Work Stealing?

2.什么是偷工减料?

Work stealing was introduced in Java with the aim of reducing contention in multi-threaded applications. This is done using the fork/join framework.

偷工减料是在Java中引入的,目的是减少多线程应用程序中的争用。这是通过fork/join框架实现的。

2.1. Divide and Conquer Approach

2.1.分而治之的方法

In the fork/join framework, problems or tasks are recursively broken down into sub-tasks. The sub-tasks are then solved individually, with the sub-results combined to form the result:

在fork/join框架中,问题或任务被递归地分解为子任务。然后,这些子任务被单独解决,子任务的结果被结合起来,形成结果。

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

2.2. Worker Threads

2.2.工作线程

The broken-down task is solved with the help of worker threads provided by a thread pool. Each worker thread will have sub-tasks it’s responsible for. These are stored in double-ended queues (deques).

线程池提供的工作线程的帮助下,分解的任务得到解决。每个工人线程都会有它所负责的子任务。这些任务被存储在双端队列(deques)中。

Each worker thread gets sub-tasks from its deque by continuously popping a sub-task off the top of the deque. When a worker thread’s deque is empty, it means that all the sub-tasks have been popped off and completed.

每个工作线程通过不断地从deque的顶部弹出一个子任务,从其deque中获得子任务。当一个工作线程的deque为空时,意味着所有的子任务都已经被弹出并完成。

At this point, the worker thread randomly selects a peer thread-pool thread it can “steal” work from. It then uses the first-in, first-out approach (FIFO) to take sub-tasks from the tail end of the victim’s deque.

在这一点上,工人线程随机选择一个它可以 “偷 “工作的同行线程池线程。然后,它使用先进先出的方法(FIFO),从受害者的deque的尾部抽取子任务。

3. Fork/Join Framework Implementation

3.分叉/联合框架的实施

We can create a work-stealing thread pool using either the ForkJoinPool class or the Executors class:

我们可以使用ForkJoinPoolExecutors类创建一个偷工减料的线程池:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
ExecutorService workStealingPool = Executors.newWorkStealingPool();

The Executors class has an overloaded newWorkStealingPool method, which takes an integer argument representing the level of parallelism.

Executors类有一个重载的newWorkStealingPool方法,该方法需要一个整数参数,代表平行化水平

Executors.newWorkStealingPool is an abstraction of ForkJoinPool.commonPool. The only difference is that Executors.newWorkStealingPool creates a pool in asynchronous mode and ForkJoinPool.commonPool doesn’t.

Executors.newWorkStealingPoolForkJoinPool.commonPool的一个抽象概念。唯一的区别是,Executors.newWorkStealingPool在异步模式下创建一个池,而ForkJoinPool.commonPool则没有。

4. Synchronous vs Asynchronous Thread Pools

4.同步与异步线程池的关系

ForkJoinPool.commonPool uses a last-in, first-out (LIFO) queue configuration, whereas Executors.newWorkStealingPool uses first-in, first-out approach (FIFO) one.

ForkJoinPool.commonPool使用后进先出(LIFO)队列配置,而Executors.newWorkStealingPool使用先进先出(FIFO)方法。

According to Doug Lea, the FIFO approach has these advantages over LIFO:

根据Doug Lea的说法,先进先出的方法比后进先出的方法有这些优势。

  • It reduces contention by having stealers operate on the opposite side of the deque as owners
  • It exploits the property of recursive divide−and−conquer algorithms of generating “large” tasks early

The second point above means that it is possible to further break down an older stolen task by a thread that stole it.

上述第二点意味着有可能通过偷窃任务的线程来进一步分解一个较早的偷窃任务。

As per the Java documentation, setting asyncMode to true may be suitable for use with event-style tasks that are never joined.

根据Java文档,将asyncMode设置为true可能适合用于从未加入的事件式任务。

5. Working Example – Finding Prime Numbers

5.工作实例–寻找素数

We’ll use the example of finding prime numbers from a collection of numbers to show the computation time benefits of the work-stealing framework. We’ll also show the differences between using synchronous and asynchronous thread pools.

我们将使用从一个数字集合中寻找素数的例子来展示偷工减料框架的计算时间优势。我们还将展示使用同步和异步线程池的区别。

5.1. The Prime Numbers Problem

5.1.素数问题

Finding prime numbers from a collection of numbers can be a computationally expensive process. This is mainly due to the size of the collection of numbers.

从一个数字集合中寻找素数是一个计算上很昂贵的过程。这主要是由于数字集合的大小所决定的。

The PrimeNumbers class helps us find prime numbers:

PrimeNumbers类帮助我们找到素数。

public class PrimeNumbers extends RecursiveAction {

    private int lowerBound;
    private int upperBound;
    private int granularity;
    static final List<Integer> GRANULARITIES
      = Arrays.asList(1, 10, 100, 1000, 10000);
    private AtomicInteger noOfPrimeNumbers;

    PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.granularity = granularity;
        this.noOfPrimeNumbers = noOfPrimeNumbers;
    }

    // other constructors and methods

    private List<PrimeNumbers> subTasks() {
        List<PrimeNumbers> subTasks = new ArrayList<>();

        for (int i = 1; i <= this.upperBound / granularity; i++) {
            int upper = i * granularity;
            int lower = (upper - granularity) + 1;
            subTasks.add(new PrimeNumbers(lower, upper, noOfPrimeNumbers));
        }
        return subTasks;
    }

    @Override
    protected void compute() {
        if (((upperBound + 1) - lowerBound) > granularity) {
            ForkJoinTask.invokeAll(subTasks());
        } else {
            findPrimeNumbers();
        }
    }

    void findPrimeNumbers() {
        for (int num = lowerBound; num <= upperBound; num++) {
            if (isPrime(num)) {
                noOfPrimeNumbers.getAndIncrement();
            }
        }
    }

    public int noOfPrimeNumbers() {
        return noOfPrimeNumbers.intValue();
    }
}

A few important things to note about this class:

关于这个班级,有几件重要的事情需要注意。

  • It extends RecursiveAction, which allows us to implement the compute method used in computing tasks using a thread pool
  • It recursively breaks down tasks into sub-tasks based on the granularity value
  • The constructors take lower and upper bound values which control the range of numbers we want to determine prime numbers for
  • It enables us to determine prime numbers using either a work-stealing thread pool or a single thread

5.2. Solving the Problem Faster with Thread Pools

5.2.用线程池更快地解决这个问题

Let’s determine prime numbers in a single-threaded manner and also using work-stealing thread pools.

让我们以单线程的方式确定素数,同时使用偷工减料的线程池。

First, let’s see the single-threaded approach:

首先,让我们看看单线程方法

PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers();

And now, the ForkJoinPool.commonPool approach:

而现在,ForkJoinPool.commonPool方法

PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
pool.shutdown();

Finally, we’ll have a look at the Executors.newWorkStealingPool approach:

最后,我们来看看Executors.newWorkStealingPool的方法

PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();

We use the invoke method of the ForkJoinPool class to pass tasks to the thread pool. This method takes in instances of sub-classes of RecursiveAction. Using Java Microbench Harness, we benchmark these different approaches against each other in terms of the average time per operation:

我们使用ForkJoinPool类的invoke方法来将任务传递给线程池。这个方法接收RecursiveAction的子类的实例。使用Java Microbench Harness,我们以每个操作的平均时间为基准,对这些不同的方法进行比较。

# Run complete. Total time: 00:04:50

Benchmark                                                      Mode  Cnt    Score   Error  Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark           avgt   20  119.885 ± 9.917  ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark  avgt   20  119.791 ± 7.811  ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread                  avgt   20  475.964 ± 7.929  ms/op

It is clear that both ForkJoinPool.commonPool and Executors.newWorkStealingPool allow us to determine prime numbers faster than with a single-threaded approach.

很明显,ForkJoinPool.commonPoolExecutors.newWorkStealingPool都能让我们比单线程方法更快地确定素数。

The fork/join pool framework lets us break down the task into sub-tasks. We broke down the collection of 10,000 integers into batches of 1-100, 101-200, 201-300 and so on. We then determined prime numbers for each batch and made the total number of prime numbers available with our noOfPrimeNumbers method.

fork/join池框架让我们把任务分解成子任务。我们将10000个整数的集合分解为1-100、101-200、201-300等批次。然后,我们为每一批确定了质数,并通过noOfPrimeNumbers方法使质数的总数可用。

5.3. Stealing Work to Compute

5.3.偷工减料的计算

With a synchronous thread pool, ForkJoinPool.commonPool puts threads in the pool as long as the task is still in progress. As a result, the level of work stealing is not dependent on the level of task granularity.

在同步线程池中,ForkJoinPool.commonPool只要任务仍在进行中,就会将线程放入池中。 因此,偷工减料的程度并不取决于任务粒度的大小。

The asynchronous Executors.newWorkStealingPool is more managed, allowing the level of work stealing to be dependent on the level of task granularity.

异步的Executors.newWorkStealingPool更具管理性,允许工作偷窃的水平取决于任务粒度的水平。

We get the level of work stealing using the getStealCount of the ForkJoinPool class:

我们使用ForkJoinPool类的getStealCount获得偷工减料的水平。

long steals = forkJoinPool.getStealCount();

Determining the work-stealing count for Executors.newWorkStealingPool and ForkJoinPool.commonPool gives us dissimilar behavior:

确定Executors.newWorkStealingPoolForkJoinPool.commonPool的偷工减料数,给了我们不同的行为。

Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]

ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

When granularity changes from fine to coarse (1 to 10,000) for Executors.newWorkStealingPool, the level of work stealing decreases. Therefore, the steal count is one when the task is not broken down (granularity of 10,000).

当粒度从细到粗时(1到10,000)对于Executors.newWorkStealingPool,偷工的程度会降低。因此,当任务没有被分解时,偷窃次数为1(颗粒度为10,000)。

The ForkJoinPool.commonPool has a different behavior. The level of work stealing is always high and not influenced much by the change in task granularity.

ForkJoinPool.commonPool有不同的行为。偷工减料的水平总是很高,而且不怎么受任务粒度变化的影响。

Technically speaking, our prime numbers example is one that supports asynchronous processing of event-style tasks. This is because our implementation does not enforce the joining of results.

从技术上讲,我们的素数例子是一个支持事件式任务的异步处理的例子。这是因为我们的实现并不强制执行结果的连接。

A case can be made that Executors.newWorkStealingPool offers the best use of resources in solving the problem.

可以说,Executors.newWorkStealingPool提供了解决该问题的最佳资源利用。

6. Conclusion

6.结语

In this article, we looked at work stealing and how to apply it using the fork/join framework. We also looked at the examples of work stealing and how it can improve processing time and use of resources.

在这篇文章中,我们研究了偷工减料以及如何使用fork/join框架来应用它。我们还看了偷工减料的例子,以及它如何改善处理时间和资源的使用。

As always, the full source code of the example is available over on GitHub.

一如既往,该示例的完整源代码可在GitHub上获取