org.quartz-scheduler 代码分析

news/2024/7/3 11:41:48

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

Scheduler 

  1. 通过调度器工厂SchedulerFactory的实例对象StdSchedulerFactory构建Scheduler ;
    43e9c09df6d4043a2c5ce73dcbb09c5a9d8.jpg
     
    1. 从指定的文件初始化配置信息(默认文件名"quartz.properties", 系统变量为"org.quartz.properties")
      8dafcd2369bec7b39be40149ca5ad40fb06.jpg
      定义的默认执行QuartzSchedulerThread的线程池为:
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      
    2.  一般返回StdScheduler,但核心是QuartzScheduler!
      1ac74c784dc1b3b35b182186e81b608d1cc.jpg
      StdSchedulerFactory.instantiate() 很重要!

      初始化RAMJobStoreSimpleThreadPoolQuartzSchedulerJobFactorySchedulerPluginJobRunShellFactoryQuartzSchedulerResources 等对象,返回StdScheduler实例。

  2. QuartzScheduler核心: 线程对象QuartzSchedulerThread。随着Scheduler实例化而被创建并扔进线程池执行。
    该线程就是调度线程,主要任务就是不停的从JobStore中获取即将被触发的触发器来执行。
    2e38e4e470460451b0f5a1a845fef951eac.jpg

scheduleJob

  1. 构建JobDetail<JobDetailImpl>
    JobDetail result = JobBuilder.newJob(Class <? extends Job> jobClass).withIdentity(String name).build();
    
  2.  构建Trigger (有各种类型,由ScheduleBuilder来指定。 eg. SimpleScheduleBuilder -> SimpleTrigger<SimpleTriggerImpl>;  CronScheduleBuilder-> CronTrigger<CronTriggerImpl>)
    TriggerBuilder.newTrigger().withIdentity(String name).withSchedule(ScheduleBuilder schedBuilder).build()
  3. scheduler<QuartzScheduler>.scheduleJob(JobDetail jobDetail, Trigger trigger)
    2cb56316fd9d5d555b7c2b4892305421bec.jpg
    1. Trigger的JobKey一定与JobDetail的Key得相同, 否则异常!
    2. 计算出trigger在scheduler中能够第一次执行的时间,若无效则异常!(eg. CronCalendar
    3. 在JobStore中注册jobDetail和trigger;
    4. 唤醒QuartzSchedulerThread中的sigLock等待锁;并将trigger下一次要实行的时间NextFireTime通过SchedulerSignalerImpl传递到QuartzSchedulerThread
  4. scheduler<QuartzScheduler>.start(): 设置QuartzSchedulerThread中的paused为false,触发任务的执行。
    d80131dcbcccda3df9c48be5d882db2a7fb.jpg

SimpleThreadPool

根据count生成WorkerThread并保存在availWorkers 中;
当使用线程池时,通过synchronized 来控制并发。从availWorkers 移出第一个WorkerThread使用并保存到busyWorkers中 ;如果没有空闲线程则wait。

与JDK提供的线程池有很大的不同, 没有缓存队列、最大线程数拒绝策略等。 通过阻塞wait直到有空闲workers再执行。如果Shutdow后,还有任务被提交执行,则直接新实例化WorkerThread

public class SimpleThreadPool implements ThreadPool {

    private int count = -1; // 线程个数
    private List<WorkerThread> workers; //只是维护初始化的workerThread集合
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); //可用线程
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); //繁忙线程
       ......
    public void initialize() throws SchedulerConfigException {
       ......
        // create the worker threads and start them
        Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
        while(workerThreads.hasNext()) {
            WorkerThread wt = workerThreads.next();
            wt.start();
            availWorkers.add(wt);
        }
    }

 public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
            return false;
        }

        synchronized (nextRunnableLock) {
            handoffPending = true;
            // Wait until a worker thread is available
            while ((availWorkers.size() < 1) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }
            if (!isShutdown) {
                WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
                busyWorkers.add(wt);
                wt.run(runnable);
            } else {
                // If the thread pool is going down, execute the Runnable
                // within a new additional worker thread (no thread from the pool).
                WorkerThread wt = new WorkerThread(this, threadGroup,
                        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
                busyWorkers.add(wt);
                workers.add(wt);
                wt.start();
            }
            nextRunnableLock.notifyAll();
            handoffPending = false;
        }

        return true;
    }

WorkerThread

控制单个Runnable对象的执行过程。
e33683852fb0189743cd5c1128627a8756a.jpg

成员变量有个Object类型对象作为lock,WorkerThread实例化后run()过程中如果还没有注入runnable且执行标记run==false时循环调用:lock.wait(500);

当SimpleThreadPool.runThread(Runnable) 调用WorkerThread.run(Runnable)时,注入执行Runnable并 lock.notifyAll()
86d0ed3b8414322a89a1aaf8fc175243dcb.jpg

QuartzSchedulerThread

quartz核心处理过程, 通过synchronized(sigLock ) 来控制线程并发。

 run()

  1. 通过Object对象sigLock & paused & 原子整型 halted 来控制当先线程是否运行任务。
    1. sigLock: 方法signalSchedulingChange、togglePause、halt 都触发notifyAll()。
    2. paused:  初始化为true,默认阻塞线程执行。
  2. 执行会优先判定线程池<SimpleThreadPool>中是否有空闲有效的WorkerThread,没有则阻塞。

    int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
    
  3. 有空闲有效工作线程时从JobStore中获取指定时间内<默认30s>要执行的的Trigger列表。

     triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                    now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
  4. 如果triggers中的第一个trigger的NextFireTime距离当前时间大于2ms, 则等待直到<2ms。

  5. 从JobStore中获取TriggerFiredResult列表(绑定了JobDetail和OperableTrigger<CronTrigger>间的关系)。

    List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
    
  6. 依每一个TriggerFiredResult初始化JobRunShellPropertySettingJobFactory通过TriggerFiredBundle获取到JobDetail, 初始化Job实例对象 ,反射设置好属性 最后封装到执行上下文中JobExecutionContextImpl。
    72d4e22235f777185206004f2cf7513bf2e.jpg

JobRunShell

最终执行实际任务的对象。

JobExecutionContextImpl中获取必要的JobDetail、trigger、Job等信息,并执行:job.execute(jec)  ---->自己的业务逻辑

当执行结束时trigger需要确定下一个状态码,在JobStore.triggeredJobComplete处依此来判定trigger的生命周期

AbstractTrigger.executionComplete

feb613e43a0a4e734670f1ad9f62a3867a2.jpg

RAMJobStore.triggeredJobComplete

f3ac269cb6f4f3d69f8d31315510ea68aed.jpg

转载于:https://my.oschina.net/u/3434392/blog/3059872


http://www.niftyadmin.cn/n/2572980.html

相关文章

Spring Boot 整合 JPA

[toc] 前言&#xff1a;之前一直用的都是Mybatis&#xff0c;最近由于工作原因&#xff0c;要使用JPA&#xff0c;因此整理一下学习笔记防止忘记&#xff0c;也希望能够帮到需要使用这个技术的人 1. Spring Data JPA 概念 JPA(Java Persistence API&#xff0c;Java持久层api) …

iscsi 华为存储配置 上课内容

创建一个LUN,10G。再创建一个LUN组&#xff0c;命名为rhce&#xff0c;把LUN加到这个LUN组里面去。/etc/iscsi/initiatorname.iscsi [rootlocalhost iscsi]# cat initiatorname.iscsi InitiatorNameiqn.1994-05.com.redhat:3dfb14784b4b如果改了名字 需要重启iscsi服务iscsi连…

谈谈 GC:新的 Orinoco 垃圾收集器

过去这些年 V8 的垃圾回收器&#xff08;GC&#xff09;发生了很多的变化&#xff0c;Orinoco 项目采用了 stop-the-world 垃圾回收器&#xff0c;以使其变成了一个更加并行&#xff0c;并发和增量的垃圾回收器。 不论什么垃圾回收器都有一些定期需要去做的任务&#xff1a; 标…

新技术

亲们&#xff0c;大家好&#xff0c;大众创业服务平台服务器确定将剥离掉业务功能的nopCommerce&#xff08;http://www.nopcommerce.com&#xff0c;国外开源的成熟电商&#xff09;项目做为系统基本框架&#xff0c;在该框架的基础上开发OpenAPI引擎&#xff0c;未来在引擎上…

2012/3/28----堆排序

堆排序是一种基于二叉树的排序&#xff0c;利用二叉树的性质进行排序的算法。但是这里的二叉树并不是真实存在的&#xff0c;是我们利用数组的编号进行设计的特殊的二叉树。 而堆排序其本质是一个就地排序算法。 Java代码 /* * 堆排序算法 * version 1.0 2012/3/28 * au…

[转帖]Prometheus+Grafana监控Kubernetes

原博客的位置: https://blog.csdn.net/shenhonglei1234/article/details/80503353 感谢原作者 这里记录一下自己试验过程中遇到的问题: 1. 自己查看prometheus 里面的配置文件时 对mount的路径理解不清晰,以为是需要宿主机里面需要有目录才可以, 实际上不需要. 是k8s 将证书和t…

省时省力小技巧教你图片怎么转文字?

现在的这个时代是一个现代化的社会&#xff0c;越来越多省时省力的生活方式或工作学习的小技巧出现。大家知道为什么“懒人”是生活的最惬意的一种人吗&#xff1f;因为他们不喜欢付出很多的力气&#xff0c;于是他们就想方设法研究出了很多的省时省力的小技巧。今天小编带领大…

2012/3/29----快速排序

前面用到了分治算法所演变出来的一种排序---归并排序。这里&#xff0c;我们介绍另一种分治算法演变出来的排序算法---快速排序。 快速排序通过选取数组中的关键字&#xff0c;把一个A[n]数组划分为3部分&#xff1a;A[key]关键字&#xff0c;A[0...key-1]{比关键字小的元素}&…