Guide to the Fork/Join Framework in Java

Last modified: April 25, 2016

1. Overview

Java 7 introduced the fork/join framework. It provides tools to help speed up parallel processing by attempting to use all available processor cores. It accomplishes this through a divide and conquer approach.

In practice, this means that the framework first “forks,” recursively breaking the task into smaller independent subtasks until they are simple enough to run asynchronously.

After that, the “join” part begins. The results of all subtasks are recursively joined into a single result. For a task that returns void, the program simply waits until every subtask has completed.

To provide effective parallel execution, the fork/join framework uses a pool of threads called the ForkJoinPool. This pool manages worker threads of type ForkJoinWorkerThread.
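
A quick way to observe this is to ask a task which thread is running it. The sketch below is illustrative only (the class names WorkerThreadCheck and WhoRunsMe are hypothetical): it invokes a task on a dedicated pool and checks whether the executing thread is a ForkJoinWorkerThread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: reports which thread runs a task invoked on a dedicated pool.
class WorkerThreadCheck {

    // Trivial task that returns the thread executing its compute() method.
    static class WhoRunsMe extends RecursiveTask<Thread> {
        @Override
        protected Thread compute() {
            return Thread.currentThread();
        }
    }

    static Thread runInPool(ForkJoinPool pool) {
        // invoke() submits the task to the pool and waits for its result.
        return pool.invoke(new WhoRunsMe());
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Thread t = runInPool(pool);
            // Workers of a ForkJoinPool are ForkJoinWorkerThread instances.
            System.out.println(t.getName() + " is a ForkJoinWorkerThread: "
              + (t instanceof ForkJoinWorkerThread));
        } finally {
            pool.shutdown();
        }
    }
}
```

ForkJoinWorkerThread.getPool() can also be used to confirm which pool owns the worker.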

2. ForkJoinPool

The ForkJoinPool is the heart of the framework. It is an implementation of the ExecutorService that manages worker threads and provides us with tools to get information about the thread pool state and performance.
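
As a rough illustration of those monitoring tools, the following hypothetical PoolStats helper reads a few of ForkJoinPool's getters. Most of these values are documented as estimates, so they suit monitoring and tuning rather than program logic.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper that summarizes a few of ForkJoinPool's monitoring getters.
class PoolStats {

    static String describe(ForkJoinPool pool) {
        return "parallelism=" + pool.getParallelism()               // target number of active workers
          + ", poolSize=" + pool.getPoolSize()                       // workers started and not yet terminated
          + ", active=" + pool.getActiveThreadCount()                // estimate of threads stealing or running tasks
          + ", queuedTasks=" + pool.getQueuedTaskCount()             // estimate of tasks held in worker deques
          + ", queuedSubmissions=" + pool.getQueuedSubmissionCount() // estimate of submissions not yet started
          + ", steals=" + pool.getStealCount();                      // estimate of tasks stolen between workers
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        System.out.println(describe(pool));
        pool.shutdown();
    }
}
```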

Worker threads can execute only one task at a time, but the ForkJoinPool doesn’t create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue (or deque, pronounced “deck”) that stores tasks.

This architecture is vital for balancing the threads’ workload with the help of the work-stealing algorithm.

2.1. Work-Stealing Algorithm

Simply put, idle threads try to “steal” work from the deques of busy threads.

By default, a worker thread takes tasks from the head of its own deque. When its deque is empty, the thread takes a task from the tail of another busy thread’s deque, or from the global entry queue, since that is where the biggest pieces of work are likely to be located: the tail holds the oldest tasks, which were forked earliest and have not been split further yet.

This approach minimizes the possibility that threads will compete for tasks, because a deque’s owner and a thief work at opposite ends of it. It also reduces the number of times a thread has to go looking for work, since a thief grabs the biggest available chunk of work first.
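
The deque discipline can be pictured with a deliberately simplified, single-threaded model. This is not the JDK’s implementation (real ForkJoinPool deques are concurrent and lock-free); the hypothetical SimWorker class just uses an ArrayDeque whose owner pushes and pops at the head while a thief takes from the tail.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, single-threaded model of per-worker deques and work stealing.
// Not the JDK implementation: real ForkJoinPool deques are concurrent and lock-free.
class WorkStealingModel {

    static class SimWorker {
        final Deque<String> deque = new ArrayDeque<>();

        // The owner pushes newly forked subtasks onto the head of its deque...
        void push(String task) { deque.addFirst(task); }

        // ...and pops from the head too, so it works LIFO on its newest, smallest tasks.
        String popOwn() { return deque.pollFirst(); }

        // A thief takes from the tail of the victim's deque, where the oldest tasks sit.
        String stealFrom(SimWorker victim) { return victim.deque.pollLast(); }
    }

    public static void main(String[] args) {
        SimWorker busy = new SimWorker();
        SimWorker idle = new SimWorker();

        // The busy worker forks progressively smaller pieces of work.
        busy.push("size-1000");
        busy.push("size-500");
        busy.push("size-250");

        System.out.println("owner runs: " + busy.popOwn());          // size-250
        System.out.println("thief steals: " + idle.stealFrom(busy)); // size-1000
        System.out.println("left in deque: " + busy.deque);          // [size-500]
    }
}
```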

2.2. ForkJoinPool Instantiation

In Java 8, the most convenient way to get access to the instance of the ForkJoinPool is to use its static method commonPool(). This provides a reference to the common pool, which is the default thread pool for every ForkJoinTask that isn’t submitted to a specific pool.

According to Oracle’s documentation, using the predefined common pool reduces resource consumption since this discourages the creation of a separate thread pool per task.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
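
The following sketch, with a hypothetical CommonPoolDemo class and a trivial Answer task, checks that commonPool() always hands back the same shared instance and runs a task on it. The common pool’s target parallelism can be overridden at JVM startup with the java.util.concurrent.ForkJoinPool.common.parallelism system property.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: the common pool is one shared instance per JVM.
class CommonPoolDemo {

    // Minimal task used only to show running work on the common pool.
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool commonPool = ForkJoinPool.commonPool();

        // Every call returns the same shared pool instance.
        System.out.println(commonPool == ForkJoinPool.commonPool()); // true

        // Target parallelism, configurable via the system property
        // java.util.concurrent.ForkJoinPool.common.parallelism.
        System.out.println("parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        System.out.println(commonPool.invoke(new Answer())); // 42
    }
}
```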

We can achieve the same behavior in Java 7 by creating a ForkJoinPool and assigning it to a public static field of a utility class:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Now we can easily access it:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

With ForkJoinPool’s constructors, we can create a custom thread pool with a specific level of parallelism, thread factory, and exception handler. Here the pool has a parallelism level of 2, which means the pool tries to keep two worker threads actively executing tasks, rather than one per available processor core.
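
The snippets above assume a utility class named PoolUtil. A minimal sketch of it might look like the following. The customPool field is a hypothetical addition that uses the four-argument constructor ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode); the handler body is only an illustration, and the lambda requires Java 8 or later.

```java
import java.util.concurrent.ForkJoinPool;

// Utility class holding shared pools (forkJoinPool follows the article; customPool is illustrative).
class PoolUtil {

    // Shared pool with a target parallelism of two worker threads.
    public static final ForkJoinPool forkJoinPool = new ForkJoinPool(2);

    // Hypothetical fully customized pool:
    // - parallelism 2
    // - the JDK's default worker thread factory
    // - a handler for worker threads that die from unrecoverable errors
    //   (exceptions thrown by tasks are normally rethrown to the joining caller instead)
    // - asyncMode=false, the default LIFO processing of each worker's own forked tasks
    public static final ForkJoinPool customPool = new ForkJoinPool(
      2,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      (thread, error) -> System.err.println(thread.getName() + " failed: " + error),
      false);

    private PoolUtil() {
        // static holder, no instances
    }
}
```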

3. ForkJoinTask<V>

ForkJoinTask is the base type for tasks executed inside ForkJoinPool. In practice, one of its two subclasses should be extended: the RecursiveAction for void tasks and the RecursiveTask<V> for tasks that return a value. They both have an abstract method compute() in which the task’s logic is defined.
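
To make the difference between the two subclasses concrete, here is a minimal sketch with two hypothetical tasks: a RecursiveAction that works purely through side effects and a RecursiveTask<Integer> that returns a value. Neither splits its work; the point is only the shape of compute() and what invoke() hands back.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// Hypothetical minimal subclasses contrasting the two ForkJoinTask flavors.
class TaskFlavors {

    // RecursiveAction: compute() returns void; the work happens via side effects.
    static class FillAction extends RecursiveAction {
        private final int[] target;

        FillAction(int[] target) {
            this.target = target;
        }

        @Override
        protected void compute() {
            Arrays.fill(target, 7); // side effect only, nothing to return
        }
    }

    // RecursiveTask<V>: compute() returns the task's result.
    static class LengthTask extends RecursiveTask<Integer> {
        private final String text;

        LengthTask(String text) {
            this.text = text;
        }

        @Override
        protected Integer compute() {
            return text.length(); // the value handed back by invoke() or join()
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        int[] data = new int[3];

        Void nothing = pool.invoke(new FillAction(data)); // always null for a RecursiveAction
        Integer length = pool.invoke(new LengthTask("fork/join"));

        System.out.println(nothing + " " + Arrays.toString(data) + " " + length);
        // prints: null [7, 7, 7] 9
    }
}
```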

3.1. RecursiveAction

In the example below, we use a String called workload to represent the unit of work to be processed. For demonstration purposes, the task is a nonsensical one: It simply uppercases its input and logs it.

To demonstrate the forking behavior of the framework, the example splits the task using the createSubtasks() method if workload.length() is larger than a specified threshold.

The String is recursively divided into substrings, creating CustomRecursiveAction instances that are based on these substrings.

As a result, the method returns a List<CustomRecursiveAction>.

The list is submitted to the ForkJoinPool using the invokeAll() method:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static final Logger logger =
      Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        // Split workloads longer than the threshold; process short ones directly
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    // Divide the workload into two halves, one subtask per half
    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    // The actual "work": uppercase the string and log the thread that handled it
    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
          + Thread.currentThread().getName());
    }
}

We can use this pattern to develop our own RecursiveAction classes. To do this, we create an object that represents the total amount of work, choose a suitable threshold, define a method to divide the work, and define a method to do the work.
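
As an illustration of that recipe, the hypothetical SquareAction below squares the elements of an int array in place. The total work is the slice [from, to) of the array, the threshold is 4 elements, the divide step halves the slice, and the work step is a plain loop. Passing index bounds instead of copying substrings or sub-arrays is a design choice of this sketch, not part of the original example.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical RecursiveAction following the four-step pattern:
// the work is a slice [from, to) of an array, squared in place.
class SquareAction extends RecursiveAction {

    private static final int THRESHOLD = 4; // slices of at most 4 elements are processed directly

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    SquareAction(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from > THRESHOLD) {
            // Divide: split the slice in half and run both halves in parallel.
            int mid = from + (to - from) / 2;
            invokeAll(new SquareAction(data, from, mid), new SquareAction(data, mid, to));
        } else {
            // Do the work: square every element of the small slice.
            for (int i = from; i < to; i++) {
                data[i] = data[i] * data[i];
            }
        }
    }

    public static void main(String[] args) {
        int[] numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        ForkJoinPool.commonPool().invoke(new SquareAction(numbers, 0, numbers.length));
        System.out.println(Arrays.toString(numbers));
        // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    }
}
```

Because the halves never overlap, the subtasks can write to the shared array without further synchronization.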

3.2. RecursiveTask<V>

For tasks that return a value, the logic here is similar.

The difference is that the results of the subtasks are combined into a single result:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            // Run both halves in parallel, then add up their results
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }

    // Split the array into two copies, one for each half
    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    // The actual "work": keep values strictly between 10 and 27, scale them by 10, and sum
    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

In this example, we use an array stored in the arr field of the CustomRecursiveTask class to represent the work. The createSubtasks() method splits the array into two halves, and compute() keeps splitting recursively until each piece is no longer than the threshold. The invokeAll() method then forks the subtasks in the current pool (the common pool, if the top-level task was started there), waits for them to finish, and returns the same collection of tasks; each ForkJoinTask is also a Future.

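To collect the results, the join() method is called on each subtask. Because invokeAll() has already waited for the subtasks to complete, join() simply returns each computed value.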

We’ve accomplished this here using Java 8’s Stream API, with the sum() method combining the sub-results into the final result.
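
Copying each half with Arrays.copyOfRange() keeps the example simple but allocates a new array at every split. A common alternative, sketched below as a hypothetical RangeSumTask, passes index bounds over the shared array and combines the two halves with invokeAll() followed by join(). The per-element logic is copied from processing().

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variant of CustomRecursiveTask that splits by index range
// instead of copying sub-arrays with Arrays.copyOfRange().
class RangeSumTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 20;

    private final int[] arr;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSumTask(int[] arr, int from, int to) {
        this.arr = arr;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from > THRESHOLD) {
            int mid = from + (to - from) / 2;
            RangeSumTask left = new RangeSumTask(arr, from, mid);
            RangeSumTask right = new RangeSumTask(arr, mid, to);
            invokeAll(left, right);            // run both halves and wait for both
            return left.join() + right.join(); // combine the sub-results
        }
        // Same per-element logic as processing(): keep 10 < a < 27, scale by 10, sum.
        return Arrays.stream(arr, from, to)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }

    public static void main(String[] args) {
        int[] numbers = new int[100];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i;
        }
        int result = ForkJoinPool.commonPool().invoke(new RangeSumTask(numbers, 0, numbers.length));
        System.out.println(result); // 2960
    }
}
```

For the values 0 to 99, only 11 through 26 pass the filter; they sum to 296, so the result is 10 * 296 = 2960.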

4. Submitting Tasks to the ForkJoinPool

We can use a few approaches to submit tasks to the thread pool.

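Let’s start with the submit() and execute() methods. Both schedule a task for asynchronous execution; submit() additionally returns the task as a Future, whereas execute() returns nothing, so here we keep our own reference to the task and call join() on it: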

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();
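
For comparison, the following self-contained sketch runs the same kind of task once with execute() and once with submit(). The ExecuteVsSubmit and SquareTask classes are hypothetical; the only difference between the two helper methods is where the reference to the running task comes from.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo contrasting execute() and submit() on a ForkJoinPool.
class ExecuteVsSubmit {

    // Trivial task: squares one number.
    static class SquareTask extends RecursiveTask<Integer> {
        private final int n;

        SquareTask(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    static int viaExecute(ForkJoinPool pool, int n) {
        SquareTask task = new SquareTask(n);
        pool.execute(task);  // schedules the task and returns nothing
        return task.join();  // waits for completion and returns the result
    }

    static int viaSubmit(ForkJoinPool pool, int n) {
        ForkJoinTask<Integer> future = pool.submit(new SquareTask(n)); // returns the task itself
        return future.join(); // future.get() also works but throws checked exceptions
    }

    public static void main(String[] args) {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        System.out.println(viaExecute(pool, 12) + " " + viaSubmit(pool, 12)); // 144 144
    }
}
```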

The invoke() method forks the task and waits for the result, and doesn’t need any manual joining:

int result = forkJoinPool.invoke(customRecursiveTask);

The static ForkJoinTask.invokeAll() method is the most convenient way to submit a sequence of ForkJoinTasks to the ForkJoinPool from inside a running task. It takes tasks as parameters (two tasks, varargs, or a collection), forks them, and waits until all of them have completed. The collection variant returns the same collection of tasks, which are also Future objects, in their original order.
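
The three overloads can be seen side by side in the following hypothetical sketch. It calls invokeAll() from inside compute() so that the subtasks are forked into the pool running the enclosing task. Note that these are static methods of ForkJoinTask; ForkJoinPool separately inherits an invokeAll() for collections of Callable objects from ExecutorService, which is a different method.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of the three ForkJoinTask.invokeAll() overloads,
// called from inside compute() so they run in the current pool.
class InvokeAllDemo extends RecursiveTask<List<Integer>> {

    // Tiny leaf task that just returns its value.
    static class Value extends RecursiveTask<Integer> {
        private final int v;

        Value(int v) {
            this.v = v;
        }

        @Override
        protected Integer compute() {
            return v;
        }
    }

    @Override
    protected List<Integer> compute() {
        List<Integer> results = new ArrayList<>();

        // 1) Two tasks: returns void; read each result with join().
        Value a = new Value(1);
        Value b = new Value(2);
        invokeAll(a, b);
        results.add(a.join());
        results.add(b.join());

        // 2) Varargs: also returns void.
        Value c = new Value(3);
        Value d = new Value(4);
        Value e = new Value(5);
        invokeAll(c, d, e);
        results.add(c.join());
        results.add(d.join());
        results.add(e.join());

        // 3) Collection: returns the same collection, so results keep their order.
        List<Value> batch = new ArrayList<>();
        for (int i = 6; i <= 8; i++) {
            batch.add(new Value(i));
        }
        Collection<Value> done = invokeAll(batch);
        for (Value v : done) {
            results.add(v.join());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new InvokeAllDemo()));
        // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```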

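Alternatively, we can use the separate fork() and join() methods. The fork() method arranges for a task to run asynchronously in the current pool (or in the common pool when called from outside any pool), but it doesn’t wait for the task to finish. The join() method waits for the task to complete and returns its result.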

For a RecursiveAction, join() simply returns null; for a RecursiveTask<V>, it returns the result of the task’s execution:

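customRecursiveTaskFirst.fork();                        // schedule asynchronously
int resultLast = customRecursiveTaskLast.invoke();      // run in the current thread
result = resultLast + customRecursiveTaskFirst.join();  // wait for the forked task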

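In the examples in Section 3, we used the invokeAll() method to submit a sequence of subtasks to the pool. We can do the same job with fork() and join(), but then we are responsible for the order of the calls: it is usually more efficient to compute one subtask in the current thread and to join forked tasks in the reverse order in which they were forked.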
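
A minimal sketch of this pattern, modeled on the Fibonacci example in the RecursiveTask Javadoc, forks one subtask, computes the other in the current thread, and only then joins the forked one. The naive recursion is deliberately inefficient; a real task would fall back to sequential code below a threshold.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Manual fork()/join() example, intentionally naive to highlight the call order.
class Fibonacci extends RecursiveTask<Integer> {

    private final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci first = new Fibonacci(n - 1);
        first.fork();                           // run asynchronously; an idle worker may steal it
        Fibonacci second = new Fibonacci(n - 2);
        int secondResult = second.compute();    // do this half in the current thread
        return secondResult + first.join();     // then wait for the forked half
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool().invoke(new Fibonacci(20))); // 6765
    }
}
```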

To avoid confusion, it is generally a good idea to use the invokeAll() method to submit more than one task to the ForkJoinPool.

5. Conclusion

Using the fork/join framework can speed up processing of large tasks, but to achieve this outcome, we should follow some guidelines:

  • Use as few thread pools as possible. In most cases, the best decision is to use one thread pool per application or system.
  • Use the default common thread pool if no specific tuning is needed.
  • Use a reasonable threshold for splitting ForkJoinTask into subtasks.
  • Avoid any blocking in ForkJoinTasks.
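
When a task truly cannot avoid blocking, for example while waiting on a BlockingQueue, the ForkJoinPool.ManagedBlocker interface lets the pool compensate, possibly by activating a spare thread to keep the desired parallelism. The sketch below follows the QueueTaker pattern shown in the ManagedBlocker Javadoc; the static take() helper and the string payload are illustrative additions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

// Wraps BlockingQueue.take() in a ManagedBlocker so that a ForkJoinPool can
// compensate (for example by activating a spare worker) while the caller is blocked.
class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {

    private final BlockingQueue<E> queue;
    private volatile E item = null;

    QueueTaker(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    @Override
    public boolean block() throws InterruptedException {
        if (item == null) {
            item = queue.take(); // the actual blocking call
        }
        return true;             // no further blocking is needed
    }

    @Override
    public boolean isReleasable() {
        // Try a non-blocking poll first; if it succeeds, block() is never called.
        return item != null || (item = queue.poll()) != null;
    }

    E getItem() {
        return item;
    }

    // Hypothetical convenience helper; outside a pool, managedBlock() just runs the blocker.
    static <T> T take(BlockingQueue<T> queue) throws InterruptedException {
        QueueTaker<T> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("payload");
        System.out.println(take(queue)); // payload
    }
}
```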

The examples used in this article are available in the linked GitHub repository.
