感谢学java的生生提供的视频,让我能够速成部分实践八股,弥补了我没有好好看和学习国外CS课程的缺憾
现在有一个Main函数,我们要实现里面scheduleService方法(就是实现@Schedule注解),让main函数正常实习
package tech.insight;
/**
* @author gongxuanzhangmeit@gmail.com
*/
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduleService scheduleService = new ScheduleService();
scheduleService.schedule(() -> {
System.out.println(1);
}, 1000);
Thread.sleep(1000);
System.out.println("添加一个每200毫秒打印一个2的定时任务");
scheduleService.schedule(() -> {
System.out.println(2);
}, 200);
}
}
package tech.insight;
/**
* @author gongxuanzhangmelt@gmail.com
*/
public class ScheduleService {
void schedule(Runnable task, long delay) {
// TODO
}
}
这是一个新的工程。我们创建了一个类,这个类有一个函数 `schedule`,它有两个参数:第一个是一个任务(task),第二个是一个延迟时间(delay),单位是毫秒。每隔这么多毫秒,我们会让这个任务自动执行一次。一般来说这里可以加一个 `TimeUnit`,但我们默认使用毫秒。
我们可以先写一个“慢函数”,这个慢函数直接调用某个方法,比如打印一个“1”。我们希望它每隔100毫秒打印一次“1”。当然,在主函数中,我们不能阻塞主线程。因此,我们在主函数里先睡眠一秒,然后添加一个每200毫秒打印一次“2”的定时任务。也就是说,我们在一秒之后,再添加一个每200毫秒打印“2”的任务。
接下来我们要实现 `schedule` 方法,使得这个“慢函数”能够正常运行。
首先思考一个问题:这个任务是由谁来执行的?我们可以随便定义一个线程池,无所谓。问题是,如何用线程池去执行一个带延迟的任务? 最直观的想法就是新建一个线程池线程,让他按照要求sleep那么几秒,但是有问题:下一次什么时候能执行?目前这个任务只能执行一次,也没有延迟。我们能不能简单粗暴地让线程先 `sleep(delay)` 毫秒,然后直接调用 `task.run()`?其实是可以的,但这会带来几个问题。
void schedule(Runnable task, long delay) {
ExecutorService executorService = Executors.newFixedThreadPool(6);
executorService.execute(() -> {
Thread.sleep(delay);
task.run();
});
}
第一个问题是:下一次什么时候执行?我们可以写一个死循环,在任务执行完后再次睡眠 `delay` 时间,然后继续执行。但这样又引出另一个问题:如果我们使用的线程池只有六个线程,那么最多只能同时处理六个任务。当用户第七次调用 `schedule` 时,线程池就会耗尽,无法提交新任务,显然这不符合我们的需求。
所以我们需要一个更好的设计思路。设想有一个组件,它可以在指定的时间点自动把任务提交到线程池中,而不是在调用 `schedule` 的时候立即提交。换句话说,如果有一个组件叫“触发器”(Trigger),它负责等待合适的时间,然后把任务扔进线程池,而线程池只负责执行任务本身,不负责调度时间,那是不是就能解决问题了?
所以这个定时任务的核心就在于这个“触发器”如何实现。
根据上面的分析,我们重新调整代码结构。首先,把真正执行任务的线程池作为成员变量或局部变量创建出来,线程池的具体类型可以根据需要配置。然后我们定义一个内部类,叫做 `Trigger`,它的职责非常单一:等待正确的执行时间,并将任务提交到线程池。
那么怎么实现这个 `Trigger` 呢?由于它需要持续监听任务并阻塞等待,但它本身不能阻塞主线程,所以 `Trigger` 内部必须有一个独立的线程来运行。
我们来思考这个线程要做什么。首先,它需要获取所有待执行的任务,并且准确知道每个任务的执行时间。为此,我们封装一个类叫 `Job`。这个 `Job` 是对任务的包装,包含两个关键信息:一是真正的任务(`Runnable task`),二是任务的下一次执行时间(`startTime`)。我们为它提供 getter 和 setter 方法。
当 `Trigger` 接收到一个 `Job` 时,它需要判断当前时间和 `Job` 的 `startTime` 的关系。用 `startTime - System.currentTimeMillis()` 得到一个差值,称为 `waitTime`。如果 `waitTime > 0`,说明还没到执行时间,需要等待;如果 `waitTime <= 0`,说明已经到了或错过了执行时间,应该立即执行任务。
这就是整个 `Trigger` 的基本思路。
public class Job {
private Runnable task;
private long startTime = now;
public Runnable getTask() {
return task;
}
public Job setTask(Runnable task) {
this.task = task;
return this;
}
public long getStartTime() {
return startTime;
}
public Job setStartTime(long startTime) {
this.startTime = startTime;
}
}
具体实现上,这个线程会从某个容器中取出一个 `Job`,判断是否需要等待,如果需要就等待对应时间,然后将任务提交给线程池执行。
我们可以先简单写一个死循环:
// 等待合适的时间,把对应的任务扔到线程池中
class Trigger{
List<Job> jobList = new ArrayList<>();
Thread thread = new Thread(() -> {
while (true) {
// 1 job wait 1s 2 job wait 500ms
for (Job job : jobList) {
long waitTime = job.getStartTime() - System.currentTimeMillis();
if (waitTime > 0) {
Thread.sleep(waitTime);
}
executorService.execute(job.getTask());
}
}
});
}
但这段代码存在两个明显问题:
第一,当 `jobList` 是空的时候,这个死循环会不断轮询,造成 CPU 资源浪费。
第二,假设 `jobList` 中有两个任务:一个需要等待 1 秒,另一个需要等待 500 毫秒。如果我们按顺序处理,仍然会先等 1 秒,导致第二个任务延迟执行。所以我们需要确保任务是有序的——先执行的任务排在前面。
因此,我们必须对 `Job` 实现排序逻辑,比如实现 `Comparable<Job>` 接口,比较依据就是 `startTime`。每次取任务前对列表排序,以保证最先执行的任务在最前面。
Collections.sort(jobList);
但这样做也有问题:每次排序的时间复杂度是 O(n log n),频繁插入任务会导致性能下降;而且在多线程环境下,排序过程中如果有新任务加入,会产生线程安全问题。
为了解决这两个问题,我们将容器换成一个**阻塞式的优先级队列**(`PriorityBlockingQueue`)。如果你不熟悉这个类,可以查阅相关文档。你可以把它理解为一个自动排序、线程安全、支持阻塞操作的优先队列。
我们将 `jobList` 替换为 `PriorityBlockingQueue<Job> queue`。这样我们不需要手动排序,每次取出的任务自然是最先要执行的那个。
修改后的逻辑如下:
class Trigger{
PriorityQueue<Job> queue = new PriorityBlockingQueue<>();
Thread thread = new Thread(() ->{
while (true){
if(!queue.isEmpty()){
Job job = queue.poll();
long waitTime = job.getStartTime() - System.currentTimeMillis();
if(waitTime > 0){
Thread.sleep(waitTime);
}
executorService.execute(job.getTask());
}
}
});
}
但是这里还有一个问题:如果我们正在等待一个 1 秒后执行的任务,此时用户又添加了一个 500 毫秒后执行的任务,原来的等待仍然会持续 1 秒,新任务无法提前触发。所以我们需要一个“唤醒机制”,让正在等待的线程能被及时唤醒,重新检查队列。
对于 `Thread.sleep()`,唯一的唤醒方式是调用 `interrupt()`,但这是 JDK 不推荐的做法,还需要捕获 `InterruptedException`,代码不优雅。
我们改用 `LockSupport.park()` 和 `LockSupport.unpark()`。`park()` 可以阻塞线程,`unpark(Thread t)` 可以唤醒指定线程。我们可以在 `Trigger` 中暴露一个 `wakeup()` 方法,用于唤醒自己。
于是我们修改逻辑:每当添加新任务时,就调用一次 `wakeup()`,确保触发器能重新检查队列。
但现在又出现一个问题:假设我们正在等待 1 秒,这时添加了一个 500 毫秒的任务并唤醒了线程,线程被唤醒后继续执行,但它仍然会去执行那个 1 秒的任务,而不是新的 500 毫秒任务。这说明我们的逻辑有问题。
正确的做法是:**每次被唤醒后,都应该重新从队列中获取最近的任务,重新判断是否需要等待**。
所以我们重构逻辑:
class Trigger{
PriorityQueue<Job> queue = new PriorityBlockingQueue<>();
Thread thread = new Thread(() ->{
while (true){
if (queue.isEmpty()) {
LockSupport.park();
}
Job latelyJob = queue.peek();
if (latelyJob.getStartTime() < System.currentTimeMillis()) {
latelyJob = queue.poll();
executorService.execute(latelyJob.getTask());
}else{
LockSupport.parkUntil(latelyJob.getStartTime());
}
}
});
if(!queue.isEmpty()){
// wait 1s
Job job = queue.poll();
long waitTime = job.getStartTime() - System.currentTimeMillis();
if(waitTime > 0){
LockSupport.park(1s);
}
executorService.execute(job.getTask());
}
}
注意这里使用了 `peek()` 先查看任务,再决定是否 `poll()`。因为在多线程环境下,`peek()` 返回的对象和 `poll()` 返回的对象可能不是同一个——比如在 `peek()` 之后、`poll()` 之前,又有更早的任务被插入。
但因为是优先队列,只要我们每次取“最近”的任务,就能保证正确性。
此外还有一个重要问题:**虚假唤醒**(spurious wakeup)。即使没有调用 `unpark()`,`park()` 也可能被随机唤醒。类似 `Object.wait()` 的推荐写法是使用 `while` 而不是 `if` 来判断条件。因此我们也应该把 `queue.peek()` 的判断放在 `while` 循环中。
最终修正版逻辑:
// 等待合适的时间,把对应的任务扔到线程池中
class Trigger{
PriorityQueue<Job> queue = new PriorityBlockingQueue<>();
Thread thread = new Thread(() ->{
while (true){
while (queue.isEmpty()) {
LockSupport.park();
}
Job latelyJob = queue.peek();
if (latelyJob.getStartTime() < System.currentTimeMillis()) {
latelyJob = queue.poll();
executorService.execute(latelyJob.getTask());
}else{
LockSupport.parkUntil(latelyJob.getStartTime());
}
}
});
void wakeUp(){
LockSupport.unpark(thread);
}
}
这样就解决了空队列阻塞、虚假唤醒、及时响应新任务等问题。
接下来我们完善 `schedule` 方法。每次调用 `schedule(task, delay)` 时,我们需要创建一个 `Job`,设置其 `task` 和 `startTime`(当前时间 + delay),然后放入队列,并调用 `trigger.wakeup()` 唤醒触发器线程。
void schedule(Runnable task, long delay) {
Job job = new Job();
job.setTask(task);
job.setStartTime(System.currentTimeMillis() + delay);
trigger.queue.offer(job);
trigger.wakeUp();
}
注意:`Trigger` 的线程必须在首次调用 `schedule` 前启动。我们可以在类初始化时就 `new Thread(triggerRunnable).start()`,或者在第一次添加任务时启动。也可以加一句日志:“触发器已启动”。
现在还有一个问题:目前的任务只执行一次。我们要实现周期性任务,就必须在任务执行后,计算下一次执行时间,生成新的 `Job` 并重新放入队列。
为此,我们在 `Job` 类中增加一个字段:`delay`,表示任务的执行间隔。在任务执行完毕后,创建一个新的 `Job`:
这样就能实现无限循环的定时任务。
// 等待合适的时间,把对应的任务扔到线程池中
class Trigger {
PriorityQueue<Job> queue = new PriorityBlockingQueue<>();
Thread thread = new Thread(() -> {
while (true) {
while (queue.isEmpty()) {
LockSupport.park();
}
Job latelyJob = queue.peek();
if (latelyJob.getStartTime() < System.currentTimeMillis()) {
latelyJob = queue.poll();
executorService.execute(latelyJob.getTask());
Job nextJob = new Job();
nextJob.setTask(latelyJob.getTask());
nextJob.setDelay(latelyJob.getDelay());
nextJob.setStartTime(System.currentTimeMillis() + latelyJob.getDelay());
queue.offer(nextJob);
} else {
LockSupport.parkUntil(latelyJob.getStartTime());
}
}
});
{
thread.start();
System.out.println("触发器启动了");
}
}
我们测试一下:主函数中先 schedule 一个每 100ms 打印“1”的任务,sleep 1秒后,再添加一个每 200ms 打印“2”的任务。打印内容加上时间戳格式化输出,例如:
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss.SSS")) + " - 这是100ms一次的任务");
观察输出,虽然时间间隔略有偏差(受线程调度影响),但基本符合预期。200ms 的任务也大致按周期执行。
总结一下,我们大约用了70行代码(去掉空行和注释后约50行),实现了一个轻量级的定时任务调度器,核心思想是:
- 使用独立的 `Trigger` 线程负责调度;
- 使用 `PriorityBlockingQueue` 管理任务,按执行时间排序;
- 使用 `LockSupport.parkUntil()` 实现精准阻塞;
- 支持周期性任务的自动重入队列;
- 对外提供简单的 `schedule(task, delay)` 接口。
最后留一个思考题:你能实现一个可以中断的 `Job` 吗?比如用户可以在运行时取消某个定时任务。另外,请思考:Chrome 中的 `setTimeout` / `setInterval` 和我们这种基于线程池+优先队列的定时任务实现有什么区别?
Comments NOTHING