2019独角兽企业重金招聘Python工程师标准>>>
Scheduler
- 通过调度器工厂SchedulerFactory的实例对象StdSchedulerFactory构建Scheduler ;
- 从指定的文件初始化配置信息(默认文件名"quartz.properties", 系统变量为"org.quartz.properties")
定义的默认执行QuartzSchedulerThread的线程池为:org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
- 一般返回StdScheduler,但核心是QuartzScheduler!
StdSchedulerFactory.instantiate() 很重要!初始化RAMJobStore、SimpleThreadPool、QuartzScheduler、JobFactory、SchedulerPlugin、JobRunShellFactory、QuartzSchedulerResources 等对象,返回StdScheduler实例。
- 从指定的文件初始化配置信息(默认文件名"quartz.properties", 系统变量为"org.quartz.properties")
-
QuartzScheduler核心: 线程对象QuartzSchedulerThread。随着Scheduler实例化而被创建并扔进线程池执行。
该线程就是调度线程,主要任务就是不停的从JobStore中获取即将被触发的触发器来执行。
scheduleJob
- 构建JobDetail<JobDetailImpl>
JobDetail result = JobBuilder.newJob(Class <? extends Job> jobClass).withIdentity(String name).build();
- 构建Trigger (有各种类型,由ScheduleBuilder来指定。 eg. SimpleScheduleBuilder -> SimpleTrigger<SimpleTriggerImpl>; CronScheduleBuilder-> CronTrigger<CronTriggerImpl>)
TriggerBuilder.newTrigger().withIdentity(String name).withSchedule(ScheduleBuilder schedBuilder).build()
- scheduler<QuartzScheduler>.scheduleJob(JobDetail jobDetail, Trigger trigger)
- Trigger的JobKey一定与JobDetail的Key得相同, 否则异常!
- 计算出trigger在scheduler中能够第一次执行的时间,若无效则异常!(eg. CronCalendar)
- 在JobStore中注册jobDetail和trigger;
- 唤醒QuartzSchedulerThread中的sigLock等待锁;并将trigger下一次要实行的时间NextFireTime通过SchedulerSignalerImpl传递到QuartzSchedulerThread
- scheduler<QuartzScheduler>.start(): 设置QuartzSchedulerThread中的paused为false,触发任务的执行。
SimpleThreadPool
根据count生成WorkerThread并保存在availWorkers 中;
当使用线程池时,通过synchronized 来控制并发。从availWorkers 移出第一个WorkerThread使用并保存到busyWorkers中 ;如果没有空闲线程则wait。
与JDK提供的线程池有很大的不同, 没有缓存队列、最大线程数拒绝策略等。 通过阻塞wait直到有空闲workers再执行。如果Shutdow后,还有任务被提交执行,则直接新实例化WorkerThread
public class SimpleThreadPool implements ThreadPool {
private int count = -1; // 线程个数
private List<WorkerThread> workers; //只是维护初始化的workerThread集合
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>(); //可用线程
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>(); //繁忙线程
......
public void initialize() throws SchedulerConfigException {
......
// create the worker threads and start them
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = workerThreads.next();
wt.start();
availWorkers.add(wt);
}
}
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}
synchronized (nextRunnableLock) {
handoffPending = true;
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
WorkerThread
控制单个Runnable对象的执行过程。
成员变量有个Object类型对象作为lock,WorkerThread实例化后run()过程中如果还没有注入runnable且执行标记run==false时循环调用:lock.wait(500);
当SimpleThreadPool.runThread(Runnable) 调用WorkerThread.run(Runnable)时,注入执行Runnable并 lock.notifyAll()。
QuartzSchedulerThread
quartz核心处理过程, 通过synchronized(sigLock ) 来控制线程并发。
run()
- 通过Object对象sigLock & paused & 原子整型 halted 来控制当先线程是否运行任务。
- sigLock: 方法signalSchedulingChange、togglePause、halt 都触发notifyAll()。
- paused: 初始化为true,默认阻塞线程执行。
-
执行会优先判定线程池<SimpleThreadPool>中是否有空闲有效的WorkerThread,没有则阻塞。
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
-
有空闲有效工作线程时从JobStore中获取指定时间内<默认30s>要执行的的Trigger列表。
triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
-
如果triggers中的第一个trigger的NextFireTime距离当前时间大于2ms, 则等待直到<2ms。
-
从JobStore中获取TriggerFiredResult列表(绑定了JobDetail和OperableTrigger<CronTrigger>间的关系)。
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
-
依每一个TriggerFiredResult初始化JobRunShell。PropertySettingJobFactory通过TriggerFiredBundle获取到JobDetail, 初始化Job实例对象 ,反射设置好属性 最后封装到执行上下文中JobExecutionContextImpl。
JobRunShell
最终执行实际任务的对象。
从JobExecutionContextImpl中获取必要的JobDetail、trigger、Job等信息,并执行:job.execute(jec) ---->自己的业务逻辑
当执行结束时trigger需要确定下一个状态码,在JobStore.triggeredJobComplete处依此来判定trigger的生命周期
AbstractTrigger.executionComplete
RAMJobStore.triggeredJobComplete