1. Introduction
1.介绍
In a multi-threaded environment, sometimes we need to schedule tasks based on custom criteria instead of just the creation time.
在多线程环境中,有时我们需要根据自定义标准来安排任务,而不仅仅是创建时间。
Let’s see how we can achieve this in Java – using a PriorityBlockingQueue.
让我们看看如何在Java中实现这一点–使用一个PriorityBlockingQueue。
2. Overview
2.概述
Let us say we have jobs that we want to execute based on their priority:
假设我们有一些工作,我们想根据它们的优先级来执行。
public class Job implements Runnable {
private String jobName;
private JobPriority jobPriority;
@Override
public void run() {
System.out.println("Job:" + jobName +
" Priority:" + jobPriority);
Thread.sleep(1000); // to simulate actual execution time
}
// standard setters and getters
}
For demonstration purposes, we’re printing the job name and priority in the run() method.
为了演示,我们在run()方法中打印作业名称和优先级。
We also added sleep() so that we simulate a longer-running job; while the job is executing, more jobs will get accumulated in the priority queue.
我们还添加了sleep(),这样我们就模拟了一个运行时间较长的作业;在作业执行过程中,更多的作业将在优先级队列中得到积累。
Finally, JobPriority is a simple enum:
最后,JobPriority是一个简单的枚举。
public enum JobPriority {
HIGH,
MEDIUM,
LOW
}
3. Custom Comparator
3.自定义比较器
We need to write a comparator defining our custom criteria; and, in Java 8, it’s trivial:
我们需要编写一个定义我们的自定义标准的比较器;而且,在Java 8中,这很简单。
Comparator.comparing(Job::getJobPriority);
4. Priority Job Scheduler
4.优先作业调度器
With all the setup done, let’s now implement a simple job scheduler – which employs a single thread executor to look for jobs in the PriorityBlockingQueue and executes them:
所有的设置完成后,现在让我们实现一个简单的作业调度器–它采用一个单线程执行器在PriorityBlockingQueue中寻找作业并执行它们。
public class PriorityJobScheduler {
private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler
= Executors.newSingleThreadExecutor();
private PriorityBlockingQueue<Job> priorityQueue;
public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
priorityQueue = new PriorityBlockingQueue<Job>(
queueSize,
Comparator.comparing(Job::getJobPriority));
priorityJobScheduler.execute(() -> {
while (true) {
try {
priorityJobPoolExecutor.execute(priorityQueue.take());
} catch (InterruptedException e) {
// exception needs special handling
break;
}
}
});
}
public void scheduleJob(Job job) {
priorityQueue.add(job);
}
}
The key here is to create an instance of PriorityBlockingQueue of Job type with a custom comparator. The next job to execute is picked from the queue using take() method which retrieves and removes the head of the queue.
这里的关键是创建一个PriorityBlockingQueue的Job类型的实例,并带有一个自定义的比较器。使用take()方法从队列中挑选下一个要执行的作业,该方法检索并删除了队列的头部。
The client code now simply needs to call the scheduleJob() – which adds the job to the queue. The priorityQueue.add() queues the job at appropriate position as compared to existing jobs in the queue, using the JobExecutionComparator.
客户端代码现在只需要调用scheduleJob()–它将作业添加到队列中。priorityQueue.add()使用JobExecutionComparator,将作业排在适当的位置,与队列中的现有作业进行比较。
Note that the actual jobs are executed using a separate ExecutorService with a dedicated thread pool.
请注意,实际作业是使用一个单独的ExecutorService执行的,有一个专门的线程池。
5. Demo
5 演示
Finally, here’s a quick demonstration of the scheduler:
最后,这里有一个关于调度器的快速演示。
private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;
@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
Job job1 = new Job("Job1", JobPriority.LOW);
Job job2 = new Job("Job2", JobPriority.MEDIUM);
Job job3 = new Job("Job3", JobPriority.HIGH);
Job job4 = new Job("Job4", JobPriority.MEDIUM);
Job job5 = new Job("Job5", JobPriority.LOW);
Job job6 = new Job("Job6", JobPriority.HIGH);
PriorityJobScheduler pjs = new PriorityJobScheduler(
POOL_SIZE, QUEUE_SIZE);
pjs.scheduleJob(job1);
pjs.scheduleJob(job2);
pjs.scheduleJob(job3);
pjs.scheduleJob(job4);
pjs.scheduleJob(job5);
pjs.scheduleJob(job6);
// clean up
}
In order to demo that the jobs are executed in the order of priority, we’ve kept the POOL_SIZE as 1 even though the QUEUE_SIZE is 10. We provide jobs with varying priority to the scheduler.
为了演示作业按优先级执行,我们将POOL_SIZE保持为1,尽管QUEUE_SIZE是10。我们向调度器提供不同优先级的作业。
Here is a sample output we got for one of the runs:
下面是我们在其中一次运行中得到的输出样本。
Job:Job3 Priority:HIGH
Job:Job6 Priority:HIGH
Job:Job4 Priority:MEDIUM
Job:Job2 Priority:MEDIUM
Job:Job1 Priority:LOW
Job:Job5 Priority:LOW
The output could vary across runs. However, we should never have a case where a lower priority job is executed even when the queue contains a higher priority job.
在不同的运行中,输出可能有所不同。然而,我们不应该出现这样的情况:即使队列中包含了一个优先级较高的作业,也会执行一个较低的作业。
6. Conclusion
6.结论
In this quick tutorial, we saw how PriorityBlockingQueue can be used to execute jobs in a custom priority order.
在这个快速教程中,我们看到PriorityBlockingQueue如何被用来按自定义的优先级顺序执行作业。
As usual, source files can be found over on GitHub.
像往常一样,源文件可以在GitHub上找到超过。