Skip to content

Quartz

1. Quartz介绍

Quartz是OpenSymphony开源的一个项目,是一个由Java编写的开源作业调度框架。官网:https://www.quartz-scheduler.org

2. 特点

  1. 支持分布式高可用, 相比使用@Scheduled方式会造成所有节点都执行一遍。
  2. 支持持久化,Quartz有专门的数据表来实现定时任务的持久化
  3. 支持多任务调度和管理,@Scheduled不支持任务的增删改查。

3. Quartz组成

  1. 任务:JobDetail
  2. 触发器:Trigger(分为SimpleTrigger和CronTrigger)
  3. 调度器:Scheduler

3.1 JobDetail

JobDetail主要由JobKey(job的名字name和分组group)、JobClass、JobDataMap(任务相关的数据)、JobBuilder组成。常用的是前几个。
Alt text JobDetail是一个接口,不能直接创建,创建方式如下:

java
Map<String,String> jobData = new HashMap<>();
		String jobName = "schedulerJob";
		String jobGroup = "schedulerGroup";
		jobData.put("key00", "value00");
        JobDetail jobDetail = JobBuilder.newJob(SchedulerJob.class)
                .withIdentity(jobName, jobGroup)
                .usingJobData("key01", "value01")
                .usingJobData(jobData)
                .storeDurably()
                .build();

3.2 Trigger

Trigger规定触发执行Job实现类,主要有SimpleTrigger和CronTrigger两个实现类。Trigger由以下部分组成:
Alt text

  1. TriggerKey(job的名字name和分组group)
  2. JobDataMap(Trigger相关的数据,同JobDetail中JobDataMap,存相同key,若value不同,会覆盖前者。)
  3. ScheduleBuilder(有CronScheduleBuilder、SimpleScheduleBuilder、 CalendarIntervalScheduleBuilder、DailyTimeIntervalScheduleBuilder常用前2种。)
    Trigger也是接口,创建方式如下:
java
//SimpleScheduleBuilder
String triggerName = "schedulerJob";
String triggerGroup = "schedulerGroup";
Trigger trigger = TriggerBuilder
    .newTrigger()
    .withIdentity(triggerName, triggerGroup)
    .withSchedule(SimpleScheduleBuilder)
    .repeatSecondlyForever(1)
    .withIntervalInSeconds(0)
    .withRepeatCount(0))
    .startNow()
    .build();

//CronScheduleBuilder
String triggerName2 = "schedulerJob2";
String triggerGroup2 = "schedulerGroup2";
String jobTime = "0 0 * * * ?";
Trigger trigger2 = TriggerBuilder
    .newTrigger()
    .withIdentity(triggerName2, triggerGroup2)
    .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime))
    .startNow()
    .build();

3.3 Scheduler

调度器就是为了读取触发器Trigger从而触发定时任务JobDetail。可以通过SchedulerFactory进行创建调度器,分为StdSchedulerFactory(常用)和DirectSchedulerFactory两种。

  1. StdSchedulerFactory使用一组属性(放在配置文件中)创建和初始化调度器,然后通过getScheduler()方法生成调度程序。
  2. DirectSchedulerFactory不常用,不支持持久化任务。 Scheduler是个接口,使用如下:
java
/建好jobDetail,trigger
... ...
//StdSchedulerFactory方式,用的多
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler schedulerStd = schedulerFactory.getScheduler();

//DirectSchedulerFactory方式
DirectSchedulerFactory directSchedulerFactory = DirectSchedulerFactory.getInstance();
Scheduler schedulerDir=directSchedulerFactory.getScheduler();

//执行调度
schedulerStd.scheduleJob(jobDetail, trigger);
schedulerStd.start();

4. Cron表达式

定时任务离不开Cron表达式设置具体执行时间或执行周期,Cron表达式是一个字符串,一般有两种表达:

  1. 秒 分 小时 日 月 星期 年
  2. 秒 分 小时 日 月 星期
    其中年份即为可选的,中间用空格分开。星期除了使用数字1-7数字表示,额外还可以使用英文缩写来表示,注意1表示的是星期日,7表示的星期六。
位置名称允许值允许的特殊字符
10-59, - * /
2分钟0-59, - * /
3小时0-23, - * /
4日期1-31, - * ? / L W C
5月份1-12, - * /
6星期1-7, - * ? / L C #
7年(可选)空值1970~2099, - * /

*:星号,表示每个字段对应的时间域的每一个,如在日中,就是表示每天。
?:问号,只能在日期和星期字段中使用,表示无意义的值,等价于点位符。
-:减号,表示一个范围,如在分钟中使用5-8,则表示5-8分钟,即5、6、7、8分钟。
,:逗号,表示一个列表值,如在星期中星期一和星期三使用MON,WED,也可以使用数字来表示:1,3
/:斜杠,使用x/y来表示一个等步长序列,x表示起始值,y表示步长值。如在秒字段中使用0/15,表示从0秒开始,每15秒增量,即0秒,15秒,30秒,45秒,这种就可以理解为每15秒执行任务。
L:只能在日期和星期字段中使用,表示Last。在日期中,L表示月份的最后一天,如1月中的31日;在星期中,L表示星期六(或数字7)。
W:只能在日期字段中使用,表示离该日期最近的工作期,不可以跨月。如10W,表示离该月10号最近的工作日,若10号为星期六,则匹配9号星期五;若10号为星期日,则匹配11号星期一;若10号为星期一,则匹配10号星期一。LW组合表示该月的最后一个工作日。
C:只能在日期和星期字段中使用,表示Calendar,即计划所关联的日期,若日期未被关联,则等价于关联所有日期。如日期中使用4C,表示日期4号以后的第一天;星期中使用1C,表示星期日后的第一天。
#:井号只能在星期字段中使用,表示当月某个工作日。如6#2表示当月的第二个星期五(其中,6表示星期五,#3表示当月的第二个)。

4.1 常见Cron示例

Cron表达式说明
0 0 * * * ?每小时0分0秒运行
0 0 1 * * ?每天01:00:00运行
0 0 1 * * ? *每天01:00:00运行,同上
0 0 1 * * ? 20212021年每天01:00:00运行
0 * 10 * * ?每天10点-11点之间每分钟运行一次,
开始于10:00:00,结束于10:59:00
0 0/5 10 * * ?每天10点-11点之间每5分钟运行一次,
开始于10:00:00,结束于10:59:00
0 0/5 10,15 * * ?每天10点-11点之间每5分钟运行一次,
每天15点-16点之间每5分钟运行一次
0 0-10 10 * * ?每天10:00-10:10之间每分钟运行
0 10 1 ? * MON-FRI每周一,二,三,四,五的1:10分运行
0 10 1 1 * ?每月1日的1:10分运行
0 10 1 L * ?每月最后一天1:10分运行
0 10 1 ? * 6L每月最后一个星期五1:10分运行
0 10 1 ? * 6#3每月第3个星期五1:10分运行

5. Quartz增删改查模板

java
public interface QuartzService {
    /**
     * 增加一个任务job
     * @param jobClass  任务job实现类
     * @param jobName   任务job名称(保证唯一性)
     * @param jobGroupName  任务job组名
     * @param jobTime   任务时间间隔(秒)
     * @param jobTimes  任务运行次数(若<0,则不限次数)
     * @param jobData   任务参数
     */
    void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
                int jobTimes, Map jobData);
    /**
     * 增加一个任务job
     * @param jobClass  任务job实现类
     * @param jobName   任务job名称(保证唯一性)
     * @param jobGroupName  任务job组名
     * @param jobTime   任务时间表达式
     * @param jobData   任务参数
     */
    void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData);
    /**
     * 修改一个任务job
     * @param jobName 任务名称
     * @param jobGroupName  任务组名
     * @param jobTime   cron时间表达式
     */
    void updateJob(String jobName, String jobGroupName, String jobTime);
    /**
     * 删除一个任务job
     * @param jobName
     * @param jobGroupName
     */
    void deleteJob(String jobName, String jobGroupName);
    /**
     * 暂停一个任务job
     * @param jobName
     * @param jobGroupName
     */
    void pauseJob(String jobName, String jobGroupName);
    /**
     * 恢复一个任务job
     * @param jobName
     * @param jobGroupName
     */
    void resumeJob(String jobName, String jobGroupName);
    /**
     * 立即执行一个任务job
     * @param jobName
     * @param jobGroupName
     */
    void runAJobNow(String jobName, String jobGroupName);
    /**
     * 获取所有任务job
     */
    List<Map<String, Object>> queryAllJob();
    /**
     * 获取正在运行的任务job
     */
    List<Map<String, Object>> queryRunJob();
}
java
@Slf4j
@Service
public class QuartzServiceImpl implements QuartzService {

    @Autowired
    private Scheduler scheduler;

    @PostConstruct
    public void startScheduler() {
        try {
            scheduler.start();
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, int jobTime,
                       int jobTimes, Map jobData) {
        try {
            // 任务名称和组构成任务key
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
                    .build();
            // 设置job参数
            if(jobData!= null && jobData.size()>0){
                jobDetail.getJobDataMap().putAll(jobData);
            }
            // 使用simpleTrigger规则
            Trigger trigger = null;
            if (jobTimes < 0) {
                trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                        .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime))
                        .startNow().build();
            } else {
                trigger = TriggerBuilder
                        .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder
                                .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes))
                        .startNow().build();
            }
            log.info("jobDataMap: {}", jobDetail.getJobDataMap().getWrappedMap());
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("add job error!");
        }
    }

    @Override
    public void addJob(Class<? extends QuartzJobBean> jobClass, String jobName, String jobGroupName, String jobTime, Map jobData) {
        try {
            // 创建jobDetail实例,绑定Job实现类
            // 指明job的名称,所在组的名称,以及绑定job类
            // 任务名称和组构成任务key
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)
                    .build();
            // 设置job参数
            if(jobData!= null && jobData.size()>0){
                jobDetail.getJobDataMap().putAll(jobData);
            }
            // 定义调度触发规则
            // 使用cornTrigger规则
            // 触发器key
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName)
                    .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).startNow().build();
            // 把作业和触发器注册到任务调度中
            scheduler.scheduleJob(jobDetail, trigger);
            log.info("jobDataMap: {}", jobDetail.getJobDataMap());
        } catch (Exception e) {
            e.printStackTrace();
            throw new BaseException("add job error!");
        }
    }

    /**
     * 修改 一个job的 时间表达式
     *
     * @param jobName
     * @param jobGroupName
     * @param jobTime
     */
    @Override
    public void updateJob(String jobName, String jobGroupName, String jobTime) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            log.info("new jobTime: {}", jobTime);
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
                    .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build();
            // 重启触发器
            scheduler.rescheduleJob(triggerKey, trigger);
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("update job error!");
        }
    }

    @Override
    public void deleteJob(String jobName, String jobGroupName) {
        try {
            scheduler.deleteJob(new JobKey(jobName, jobGroupName));
        } catch (Exception e) {
            e.printStackTrace();
            throw new BaseException("delete job error!");
        }
    }

    @Override
    public void pauseJob(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.pauseJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("pause job error!");
        }
    }

    @Override
    public void resumeJob(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.resumeJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("resume job error!");
        }
    }

    /**
     * 立即执行一个job
     * @param jobName
     * @param jobGroupName
     */
    @Override
    public void runAJobNow(String jobName, String jobGroupName) {
        try {
            JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
            scheduler.triggerJob(jobKey);
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("run a job error!");
        }
    }

    /**
     * 获取所有计划中的任务列表
     */
    @Override
    public List<Map<String, Object>> queryAllJob() {
        List<Map<String, Object>> jobList = null;
        try {
            GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
            Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
            jobList = new ArrayList<Map<String, Object>>();
            for (JobKey jobKey : jobKeys) {
                log.info("maps: {}", scheduler.getJobDetail(jobKey).getJobDataMap().getWrappedMap());
                List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
                for (Trigger trigger : triggers) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("jobName", jobKey.getName());
                    map.put("jobGroupName", jobKey.getGroup());
                    map.put("description", "触发器:" + trigger.getKey());
                    Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                    map.put("jobStatus", triggerState.name());
                    if (trigger instanceof CronTrigger) {
                        CronTrigger cronTrigger = (CronTrigger) trigger;
                        String cronExpression = cronTrigger.getCronExpression();
                        map.put("jobTime", cronExpression);
                    }
                    jobList.add(map);
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("query all jobs error!");
        }
        return jobList;
    }

    /**
     * 获取所有正在运行的job
     */
    @Override
    public List<Map<String, Object>> queryRunJob() {
        List<Map<String, Object>> jobList = null;
        try {
            List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
            jobList = new ArrayList<Map<String, Object>>(executingJobs.size());
            for (JobExecutionContext executingJob : executingJobs) {
                Map<String, Object> map = new HashMap<String, Object>();
                JobDetail jobDetail = executingJob.getJobDetail();
                JobKey jobKey = jobDetail.getKey();
                Trigger trigger = executingJob.getTrigger();
                map.put("jobName", jobKey.getName());
                map.put("jobGroupName", jobKey.getGroup());
                map.put("description", "触发器:" + trigger.getKey());
                Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
                map.put("jobStatus", triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    map.put("jobTime", cronExpression);
                }
                jobList.add(map);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
            throw new BaseException("query run jobs error!");
        }
        return jobList;
    }
}

6. Quartz数据表

qrtz_blob_triggers : 以Blob类型存储的触发器。
qrtz_calendars:存放日历信息, quartz可配置一个日历来指定一个时间范围。
qrtz_cron_triggers:存放cron类型的触发器。
qrtz_fired_triggers:存放已触发的触发器。
qrtz_job_details:存放一个jobDetail信息。
qrtz_job_listeners:job监听器。
qrtz_locks:存储程序的悲观锁的信息(假如使用了悲观锁)。
qrtz_paused_trigger_graps:存放暂停掉的触发器。
qrtz_scheduler_state:调度器状态。
qrtz_simple_triggers:简单触发器的信息。
qrtz_trigger_listeners:触发器监听器。
qrtz_triggers:触发器的基本信息。