李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
Java
正文
07.SpringBoot集成Quartz实现分布式任务调度
Leefs
2021-09-03 PM
1627℃
0条
# 07.SpringBoot集成Quartz实现分布式任务调度 ### 前言 **本篇内容包括** + SpringBoot整合Quartz + Quartz持久化 + 分布式任务调度 ### 一、介绍 #### 1.1 Quartz集群 Quartz集群中每个节点都是一个单独的Quartz应用,它又管理着其他的节点。这个集群需要每个节点单独的启动或停止;和我们的应用服务器集群不同,独立的Quratz节点之间是不需要通信的。不同节点之间是通过数据库表来感知另一个应用。只有使用持久的JobStore才能完成Quartz集群。 ![07.SpringBoot集成Quartz实现分布式任务调度01.png](https://lilinchao.com/usr/uploads/2021/09/2659836787.png) #### 1.2 Quartz持久化 Quartz持久化配置提供了两种存储器: | 类型 | 优点 | 缺点 | | ------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | | RAMJobStore | 不要外部数据库,配置容易,运行速度快 | 因为调度程序信息是存储在被分配给 JVM 的内存里面,所以,当应用程序停止运行时,所有调度信息将被丢失。另外因为存储到JVM内存里面,所以可以存储多少个 Job 和 Trigger 将会受到限制 | | JDBC 作业存储 | 支持集群,因为所有的任务信息都会保存到数据库中,可以控制事物,还有就是如果应用服务器关闭或者重启,任务信息都不会丢失,并且可以恢复因服务器关闭或者重启而导致执行失败的任务 | 运行速度的快慢取决与连接数据库的快慢 | ### 二、操作步骤 需要提前创建一个SpringBoot项目 #### 2.1 引入依赖包 ```xml
org.springframework.boot
spring-boot-starter-web
commons-lang
commons-lang
2.5
mysql
mysql-connector-java
5.1.39
com.mchange
c3p0
0.9.5.4
``` #### 2.2 配置文件 通过在application.yml配置文件中对quartz进行相关配置。 ```yaml server: port: 8090 spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=utf-8&useSSL=false username: root password: 123456 quartz: jdbc: initialize-schema: never # 是否自动使用 SQL 初始化 Quartz 表结构。always:总是,never:不需要 job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。 properties: org: quartz: scheduler: instanceName: QuartzScheduler # 调度标识名 集群中每一个实例都必须使用相同的名称 instanceId: AUTO # 定时任务的实例编号, 如果手动指定需要保证每个节点的唯一性 threadPool: class: org.quartz.simpl.SimpleThreadPool threadCount: 100 # 线程池大小。默认为 10 。 threadPriority: 5 # 线程优先级 jobStore: misfireThreshold: 120000 ``` *说明:application.yml文件中的配置相当与Quartz中quartz.properties配置文件* #### 2.3 创建数据库表 ##### Quartz持久化过程创建数据库表方式 **第一种** 将yml文件中配置项initialize-schema在第一次执行时指定为always,会自动在数据库中生成表信息,当表创建完成后在将参数改成never ``` initialize-schema: always ``` **第二种** 在官网中下载Quartz对应版本的安装包在`docs\dbTables`目录下找到`tables_mysql_innodb.sql`文件在数据库中执行SQL。 官网下载地址:http://www.quartz-scheduler.org/downloads/ ##### 数据库表结构说明 | 表名 | 说明 | | ------------------------- | ----------------------------------------------------- | | qrtz_blob_triggers | 以Blob 类型存储的触发器 | | qrtz_calendars | 存放日历信息, quartz可配置一个日历来指定一个时间范围 | | qrtz_cron_triggers | 存放cron类型的触发器 | | qrtz_fired_triggers | 存放已触发的触发器 | | qrtz_job_details | 存放一个jobDetail信息 | | qrtz_job_listeners | job监听器 | | qrtz_locks | 存储程序的悲观锁的信息(假如使用了悲观锁) | | qrtz_paused_trigger_graps | 存放暂停掉的触发器 | | qrtz_scheduler_state | 调度器状态 | | qrtz_simple_triggers | 简单触发器的信息 | | qrtz_trigger_listeners | 触发器监听器 | | qrtz_triggers | 触发器的基本信息 | #### 2.4 定义Job + QuartzJobFirst ```java import com.dataojo.quartz.util.SchedulerUtils; import org.quartz.*; import org.springframework.scheduling.quartz.QuartzJobBean; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * @author lilinchao * @date 2021/9/2 * @description 1.0 **/ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class QuartzJobFirst extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext jobExecutionContext){ SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); List
executionContexts; try { executionContexts = SchedulerUtils.getScheduler().getCurrentlyExecutingJobs(); } catch (SchedulerException e) { e.printStackTrace(); return; } for (JobExecutionContext executionContext : executionContexts){ JobKey jobKey = executionContext.getJobDetail().getKey(); Date fireTime = executionContext.getFireTime(); System.out.println(jobKey+",对应的执行时间是"+sf.format(fireTime)); } } } ``` + QuartzJobTwo ```java import org.quartz.*; import org.springframework.scheduling.quartz.QuartzJobBean; import java.text.SimpleDateFormat; import java.util.Date; /** * @author lilinchao * @date 2021/9/2 * @description 1.0 **/ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class QuartzJobTwo extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext jobExecutionContext){ SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("QuartzJobTwo执行时间是"+sf.format(new Date())); } } ``` **说明** + **QuartzJobBean** QuartzJobBean已经实现了job接口,并重写了接口中的execute()方法,在SpringBoot集成时候直接继承QuartzJobBean即可。执行逻辑在executeInternal()方法中。 + **@PersistJobDataAfterExecution** 告诉Quartz在成功执行了Job实现类的execute方法后(没有发生任何异常),更新JobDetail中JobDataMap的数据,使得该JobDetail实例在下一次执行的时候,JobDataMap中是更新后的数据,而不是更新前的旧数据。 + **@DisallowConcurrentExecution** 告诉Quartz不要并发地执行同一个JobDetail实例。 **总结** + 当某一个JobDetail实例到点运行之后,在其运行结束之前,不会再发起一次该JobDetail实例的调用,即使设置的该JobDetail实例的定时执行时间到了。 + JobDetail实例之间互不影响。 #### 2.5 实现ApplicationListener 完成动态调度 根据ApplicationListener的原理,其onApplicationEvent(ContextRefreshedEvent event) 方法会在初始化所有的bean之后被调用,因此我们可以在这里进行scheduler的创建、启动,以及注册trigger和job。 ```java import com.dataojo.quartz.job.QuartzJobFirst; import com.dataojo.quartz.job.QuartzJobTwo; import com.dataojo.quartz.util.SchedulerUtils; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; /** * @author lilinchao * @date 2021/9/2 * @description 1.0 **/ @Component public class StartApplicationListener implements ApplicationListener
{ @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { System.out.println("-------执行StartApplicationListener--------"); SchedulerUtils.scheduleCronJob(QuartzJobFirst.class, "*/5 * * * * ?"); SchedulerUtils.scheduleCronJob(QuartzJobTwo.class, "*/10 * * * * ?"); } } ``` #### 2.6 工具类 + **SchedulerUtils** ```java import org.apache.commons.lang.time.DateUtils; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.ParseException; import java.util.Date; /** * @author lilinchao * @date 2021/9/3 * @description 1.0 */ public class SchedulerUtils { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerUtils.class); private static Scheduler scheduler; private static String jobGroup = "group"; static { if (SpringContextUtils.getApplicationContext() != null) { scheduler = SpringContextUtils.getBean(Scheduler.class); } else { SchedulerFactory schedulerFactory = new StdSchedulerFactory(); try { scheduler = schedulerFactory.getScheduler(); scheduler.start(); } catch (SchedulerException e) { throw new RuntimeException(e); } } } private SchedulerUtils() { } public static Scheduler getScheduler() { return scheduler; } public static void scheduleCronJob(Class extends Job> jobClass, String cronExpression) { scheduleCronJob(jobClass, jobClass.getSimpleName(), cronExpression); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String cronExpression) { scheduleCronJob(jobClass, name, jobGroup, cronExpression); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String group, String cronExpression) { scheduleCronJob(jobClass, name, group, cronExpression, null); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String group, String cronExpression, JobDataMap jobDataMap) { scheduleCronJob(jobClass, name, group, cronExpression, jobDataMap, null, null); } public static void scheduleCronJob(Class extends Job> jobClass, Object name, String group, String cronExpression, JobDataMap jobDataMap, Date startDate, Date endDate) { try { JobKey jobKey = new JobKey(String.valueOf(name), group); if (!scheduler.checkExists(jobKey)) { JobBuilder jobBuilder = JobBuilder.newJob(jobClass); jobBuilder.withIdentity(jobKey); if (jobDataMap != null && !jobDataMap.isEmpty()) { jobBuilder.setJobData(jobDataMap); } JobDetail jobDetail = jobBuilder.build(); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); TriggerBuilder
triggerBuilder = TriggerBuilder.newTrigger().withSchedule(cronScheduleBuilder); if (startDate != null) { triggerBuilder.startAt(startDate); } else { triggerBuilder.startNow(); } if (endDate != null) { triggerBuilder.endAt(endDate); } CronTrigger trigger = triggerBuilder.build(); scheduler.scheduleJob(jobDetail, trigger); } } catch (Exception e) { LOGGER.error("Submit job error, name=" + name + " and group=" + group, e); } } /** * 默认立即执行且只执行一次 * @param jobClass */ public static void scheduleSimpleJob(Class extends Job> jobClass) { scheduleSimpleJob(jobClass, jobClass.getSimpleName()); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name) { scheduleSimpleJob(jobClass, name, 0, 0); } /** * @param jobClass * @param name * @param intervalInMilliseconds 执行间隔 * @param repeatCount 重复次数,小于0的时候重复执行 */ public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, long intervalInMilliseconds, int repeatCount) { scheduleSimpleJob(jobClass, name, jobGroup, intervalInMilliseconds, repeatCount); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, String group, long intervalInMilliseconds, int repeatCount) { scheduleSimpleJob(jobClass, name, group, intervalInMilliseconds, repeatCount, null); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, String group, long intervalInMilliseconds, int repeatCount, JobDataMap jobDataMap) { scheduleSimpleJob(jobClass, name, group, intervalInMilliseconds, repeatCount, jobDataMap, null, null); } public static void scheduleSimpleJob(Class extends Job> jobClass, Object name, String group, long intervalInMilliseconds, int repeatCount, JobDataMap jobDataMap, Date startDate, Date endDate) { try { JobKey jobKey = new JobKey(String.valueOf(name), group); if (!scheduler.checkExists(jobKey)) { JobBuilder jobBuilder = JobBuilder.newJob(jobClass); jobBuilder.withIdentity(jobKey); if (jobDataMap != null && !jobDataMap.isEmpty()) { jobBuilder.setJobData(jobDataMap); } JobDetail jobDetail = jobBuilder.build(); SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule(); simpleScheduleBuilder.withIntervalInMilliseconds(intervalInMilliseconds); if (repeatCount >= 0) { simpleScheduleBuilder.withRepeatCount(repeatCount); } else { simpleScheduleBuilder.repeatForever(); } TriggerBuilder
triggerBuilder = TriggerBuilder.newTrigger().withSchedule(simpleScheduleBuilder); if (startDate != null) { triggerBuilder.startAt(startDate); } else { triggerBuilder.startNow(); } if (endDate != null) { triggerBuilder.endAt(endDate); } SimpleTrigger trigger = triggerBuilder.build(); scheduler.scheduleJob(jobDetail, trigger); } } catch (Exception e) { LOGGER.error("Submit job error, name=" + name + " and group=" + group, e); } } public static void interrupt(Object name, String group) { JobKey jobKey = new JobKey(String.valueOf(name), group); try { if (scheduler.checkExists(jobKey)) { scheduler.interrupt(jobKey); } } catch (SchedulerException e) { LOGGER.warn("Interrupt job error, name=" + name + " and group=" + group, e); } } public static void deleteJob(Object name, String group) { JobKey jobKey = new JobKey(String.valueOf(name), group); try { if (scheduler.checkExists(jobKey)) { scheduler.deleteJob(jobKey); } } catch (SchedulerException e) { LOGGER.warn("Delete job error, name=" + name + " and group=" + group, e); } } public static boolean checkExists(Object name, String group) { JobKey jobKey = new JobKey(String.valueOf(name), group); try { return scheduler.checkExists(jobKey); } catch (SchedulerException e) { LOGGER.warn("CheckExists job error, name=" + name + " and group=" + group, e); } return false; } public static Date getNeedFireTime(String cron, Date startDate) { Date nextFireTime1 = getNextFireTime(cron, startDate); Date nextFireTime2 = getNextFireTime(cron, nextFireTime1); int intervals = (int) (nextFireTime2.getTime() - nextFireTime1.getTime()); Date cal1 = DateUtils.addMilliseconds(nextFireTime1, - intervals); Date cal2 = getNextFireTime(cron, cal1); Date cal3 = getNextFireTime(cron, cal2); while (!cal3.equals(nextFireTime1)) { cal1 = DateUtils.addMilliseconds(cal1, - intervals); cal2 = getNextFireTime(cron, cal1); cal3 = getNextFireTime(cron, cal2); if (cal3.before(nextFireTime1)) { intervals = -1000; } } return cal2; } public static Date getNextFireTime(String cron, Date startDate) { return getCronExpression(cron).getTimeAfter(startDate); } private static CronExpression getCronExpression(String cron) { try { return new CronExpression(cron); } catch (ParseException e) { throw new IllegalArgumentException(e); } } } ``` + **SpringContextUtils** 实现ApplicationContextAware的工具类,可以通过其它类引用它以操作spring容器及其中的Bean实例。 ```java import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author lilinchao * @date 2021/9/3 * @description 1.0 */ @Component public class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtils.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { return applicationContext; } public static Object getBean(String beanName) throws BeansException { return applicationContext.getBean(beanName); } public static
T getBean(String beanName, Class
clazz) throws BeansException { return applicationContext.getBean(beanName, clazz); } public static
T getBean(Class
clazz) throws BeansException { return applicationContext.getBean(clazz); } public static Object getBean(String beanName, Object... args) throws BeansException { return applicationContext.getBean(beanName, args); } public static
T getBean(Class
clazz, Object... args) throws BeansException { return applicationContext.getBean(clazz, args); } } ``` **说明** Spring容器会检测容器中的所有Bean,如果发现某个Bean实现了ApplicationContextAware接口,Spring容器会在创建该Bean之后,自动调用该Bean的setApplicationContextAware()方法,调用该方法时,会将容器本身作为参数传给该方法——该方法中的实现部分将Spring传入的参数(容器本身)赋给该类对象的applicationContext实例变量,因此接下来可以通过该applicationContext实例变量来访问容器本身。 #### 2.7 运行结果 ![07.SpringBoot集成Quartz实现分布式任务调度02.jpg](https://lilinchao.com/usr/uploads/2021/09/2287616831.jpg) ### 三、项目目录结构 ![07.SpringBoot集成Quartz实现分布式任务调度03.jpg](https://lilinchao.com/usr/uploads/2021/09/3691505737.jpg) *附参考文章链接:* *https://www.jianshu.com/p/ab438d944669* *https://www.cnblogs.com/summerday152/p/14193968.htm* *https://gitee.com/tqbx/springboot-samples-learn/tree/master/spring-boot-quartz*
标签:
Quartz
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://www.lilinchao.com/archives/1411.html
上一篇
【转载】06.Quartz配置quartz.properties详解
下一篇
01.Yarn基础架构
取消回复
评论啦~
提交评论
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
标签云
Flink
哈希表
工具
容器深入研究
排序
并发编程
Java编程思想
Azkaban
FastDFS
gorm
Quartz
MyBatisX
Docker
Map
MySQL
人工智能
Hbase
NIO
线程池
Golang基础
ajax
机器学习
Linux
Java工具类
Sentinel
FileBeat
微服务
国产数据库改造
序列化和反序列化
Eclipse
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞