Priority-based Job Scheduling in Java – Java中基于优先级的工作调度

最后修改: 2018年 2月 12日

中文/混合/英文(键盘快捷键:t)

1. Introduction

1.介绍

In a multi-threaded environment, sometimes we need to schedule tasks based on custom criteria instead of just the creation time.

在多线程环境中,有时我们需要根据自定义标准来安排任务,而不仅仅是创建时间。

Let’s see how we can achieve this in Java – using a PriorityBlockingQueue.

让我们看看如何在Java中实现这一点–使用一个PriorityBlockingQueue

2. Overview

2.概述

Let us say we have jobs that we want to execute based on their priority:

假设我们有一些工作,我们想根据它们的优先级来执行。

public class Job implements Runnable {
    private String jobName;
    private JobPriority jobPriority;
    
    @Override
    public void run() {
        System.out.println("Job:" + jobName +
          " Priority:" + jobPriority);
        Thread.sleep(1000); // to simulate actual execution time
    }

    // standard setters and getters
}

For demonstration purposes, we’re printing the job name and priority in the run() method.

为了演示,我们在run()方法中打印作业名称和优先级。

We also added sleep() so that we simulate a longer-running job; while the job is executing, more jobs will get accumulated in the priority queue.

我们还添加了sleep(),这样我们就模拟了一个运行时间较长的作业;在作业执行过程中,更多的作业将在优先级队列中得到积累。

Finally, JobPriority is a simple enum:

最后,JobPriority是一个简单的枚举。

public enum JobPriority {
    HIGH,
    MEDIUM,
    LOW
}

3. Custom Comparator

3.自定义比较器

We need to write a comparator defining our custom criteria; and, in Java 8, it’s trivial:

我们需要编写一个定义我们的自定义标准的比较器;而且,在Java 8中,这很简单

Comparator.comparing(Job::getJobPriority);

4. Priority Job Scheduler

4.优先作业调度器

With all the setup done, let’s now implement a simple job scheduler – which employs a single thread executor to look for jobs in the PriorityBlockingQueue and executes them:

所有的设置完成后,现在让我们实现一个简单的作业调度器–它采用一个单线程执行器在PriorityBlockingQueue中寻找作业并执行它们。

public class PriorityJobScheduler {

    private ExecutorService priorityJobPoolExecutor;
    private ExecutorService priorityJobScheduler 
      = Executors.newSingleThreadExecutor();
    private PriorityBlockingQueue<Job> priorityQueue;

    public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
        priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
        priorityQueue = new PriorityBlockingQueue<Job>(
          queueSize, 
          Comparator.comparing(Job::getJobPriority));
        priorityJobScheduler.execute(() -> {
            while (true) {
                try {
                    priorityJobPoolExecutor.execute(priorityQueue.take());
                } catch (InterruptedException e) {
                    // exception needs special handling
                    break;
                }
            }
        });
    }

    public void scheduleJob(Job job) {
        priorityQueue.add(job);
    }
}

The key here is to create an instance of PriorityBlockingQueue of Job type with a custom comparator. The next job to execute is picked from the queue using take() method which retrieves and removes the head of the queue.

这里的关键是创建一个PriorityBlockingQueueJob类型的实例,并带有一个自定义的比较器。使用take()方法从队列中挑选下一个要执行的作业,该方法检索并删除了队列的头部。

The client code now simply needs to call the scheduleJob() – which adds the job to the queue. The priorityQueue.add() queues the job at appropriate position as compared to existing jobs in the queue, using the JobExecutionComparator.

客户端代码现在只需要调用scheduleJob()–它将作业添加到队列中。priorityQueue.add()使用JobExecutionComparator,将作业排在适当的位置,与队列中的现有作业进行比较。

Note that the actual jobs are executed using a separate ExecutorService with a dedicated thread pool.

请注意,实际作业是使用一个单独的ExecutorService执行的,有一个专门的线程池。

5. Demo

5 演示

Finally, here’s a quick demonstration of the scheduler:

最后,这里有一个关于调度器的快速演示。

private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;

@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
    Job job1 = new Job("Job1", JobPriority.LOW);
    Job job2 = new Job("Job2", JobPriority.MEDIUM);
    Job job3 = new Job("Job3", JobPriority.HIGH);
    Job job4 = new Job("Job4", JobPriority.MEDIUM);
    Job job5 = new Job("Job5", JobPriority.LOW);
    Job job6 = new Job("Job6", JobPriority.HIGH);
    
    PriorityJobScheduler pjs = new PriorityJobScheduler(
      POOL_SIZE, QUEUE_SIZE);
    
    pjs.scheduleJob(job1);
    pjs.scheduleJob(job2);
    pjs.scheduleJob(job3);
    pjs.scheduleJob(job4);
    pjs.scheduleJob(job5);
    pjs.scheduleJob(job6);

    // clean up
}

In order to demo that the jobs are executed in the order of priority, we’ve kept the POOL_SIZE as 1 even though the QUEUE_SIZE is 10. We provide jobs with varying priority to the scheduler.

为了演示作业按优先级执行,我们将POOL_SIZE保持为1,尽管QUEUE_SIZE是10。我们向调度器提供不同优先级的作业。

Here is a sample output we got for one of the runs:

下面是我们在其中一次运行中得到的输出样本。

Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW

The output could vary across runs. However, we should never have a case where a lower priority job is executed even when the queue contains a higher priority job.

在不同的运行中,输出可能有所不同。然而,我们不应该出现这样的情况:即使队列中包含了一个优先级较高的作业,也会执行一个较低的作业。

6. Conclusion

6.结论

In this quick tutorial, we saw how PriorityBlockingQueue can be used to execute jobs in a custom priority order.

在这个快速教程中,我们看到PriorityBlockingQueue如何被用来按自定义的优先级顺序执行作业。

As usual, source files can be found over on GitHub.

像往常一样,源文件可以在GitHub上找到超过