一、概述
使用quartz或spring-task时,会有两个痛点:
- 不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
- quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。
Elastic-job分布式任务调度可以解决上述两个痛点。
Elastic-Job是当当开源的一款非常好用的分布式任务调度框架,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。Elastic-Job-Lite这也是本次所要讲解和使用的子项目。使用zookeeper作为注册中心
主要功能
分布式调度协调
在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行。
丰富调度策略
基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。
弹性扩容缩容
当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行。
失效转移
某实例在任务执行失败后,会被转移到其他实例执行。
错过执行作业重触发
若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
自诊断并修复分布式不稳定造成的问题
支持并行调度
支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。
支持作业生命周期操作
可以动态对任务进行开启及停止操作。
丰富的作业类型
支持Simple、DataFlow、Script三种作业类型,后续会有详细介绍。
Spring整合以及命名空间提供
对Spring支持良好的整合方式,支持spring自定义命名空间,支持占位符。
运维平台
提供运维界面,可以管理作业和注册中心。
官网**:**http://elasticjob.io
1.1 架构

App:应用程序,内部包含任务执行业务逻辑和Elastic-Job-Lite组件,其中执行任务需要实现ElasticJob接口完成 与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实 例。
Elastic-Job-Lite:Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服 务,此组件负责任务的调度,并产生日志及任务调度记录。
无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务器都是对等的,各个作业节点是自治的、平 等的、节点之间通过注册中心进行分布式协调。
Registry:以Zookeeper作为Elastic-Job的注册中心组件,存储了执行任务的相关信息。同时,Elastic-Job利用该 组件进行执行任务实例的选举。
Console:Elastic-Job提供了运维平台,它通过读取Zookeeper数据展现任务执行状态,或更新Zookeeper数据修 改全局配置。通过Elastic-Job-Lite组件产生的数据来查看任务执行历史记录。
应用程序在启动时,在其内嵌的Elastic-Job-Lite组件会向Zookeeper注册该实例的信息,并触发选举(此时可能已 经启动了该应用程序的其他实例),从众多实例中选举出一个Leader,让其执行任务。当到达任务执行时间时, Elastic-Job-Lite组件会调用由应用程序实现的任务业务逻辑,任务执行后会产生任务执行记录。当应用程序的某一 个实例宕机时,Zookeeper组件会感知到并重新触发leader选举。
1.2 相关概念
分片概念
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。
分片项与业务处理解耦
Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
个性化参数的适用场景
个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。
分布式调度
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
作业高可用
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
最大限度利用资源
Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
1.3 作业类型
Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。
Simple类型作业
意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。
Dataflow类型作业
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
Script类型作业
意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
二、集成Demo
2.1 与springboot集成
依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic‐job‐lite‐spring</artifactId>
<version>2.1.5</version>
</dependency>配置
zookeeper配置
@Configuration
public class ElasticJobRegistryCenterConfig {
private String registryServerList = "localhost:2181";
private String registryNamespace = "elastic‐job‐example‐springboot";
/**
* 配置Zookeeper * @return
*/
@Bean(initMethod = "init")
public CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryServerList, registryNamespace);
zkConfig.setSessionTimeoutMilliseconds(100);
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
return regCenter;
}
}Job配置
@Configuration
public class ElasticJobConfig {
@Autowired
private CoordinatorRegistryCenter registryCenter;
@Autowired
private FileBackupJob fileBackupJob;
/**
* 配置任务详细信息
* @param jobClass 任务执行类
* @param cron 执行策略
* @param shardingTotalCount 分片数量
* @param shardingItemParameters 分片个性化参数
* @return
*/
private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
// 定义作业核心配置
JobCoreConfiguration.Builder simpleCoreConfigBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(!StringUtils.isEmpty(shardingItemParameters)){
simpleCoreConfigBuilder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration jobCoreConfiguration =simpleCoreConfigBuilder.build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
// 定义Lite作业根配置
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
return (LiteJobConfiguration) simpleJobRootConfig;
}
/**
* 任务启动
* @return
*/
@Bean(initMethod = "init")
public SpringJobScheduler initSimpleElasticJob() {
SimpleJob job1 = fileBackupJob;
SpringJobScheduler jobScheduler = new SpringJobScheduler(job1, registryCenter, createJobConfiguration(job1.getClass(),"0/3 * * * * ?",1,null));
return jobScheduler;
}
}2.2 与spring整合
依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>作业
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/** 作业类 */
public class MyJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
// 获取分片总数
int shardingTotalCount = context.getShardingTotalCount();
// 获取分片项
int shardingItem = context.getShardingItem();
// 获取分片参数
String shardingParameter = context.getShardingParameter();
System.out.println("作业名称:" + context.getJobName());
System.out.println("分片总数:" + shardingTotalCount);
System.out.println("分片项:" + shardingItem);
System.out.println("分片参数:" + shardingParameter);
// 对不同的分片项进行处理
switch (shardingItem){
case 0:
System.out.println("第1个分片参数:" + shardingParameter);
break;
case 1:
System.out.println("第2个分片参数:" + shardingParameter);
break;
case 2:
System.out.println("第3个分片参数:" + shardingParameter);
break;
case 3:
System.out.println("第4个分片参数:" + shardingParameter);
break;
default:
System.out.println("==default==");
}
System.out.println("============================");
}
}配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<!--
配置作业注册中心
id: bean的id
server-lists: 连接zookeeper服务器地址(host1:port,host2:port)
namespace: zookeeper的命名空间
base-sleep-time-milliseconds: 等待重试的时间毫秒数(初始值)
max-sleep-time-milliseconds: 等待重试的最大时间毫秒数
max-retries: 最大重试次数
-->
<reg:zookeeper id="regCenter"
server-lists="127.0.0.1:2181"
namespace="elastic-job"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="3000"
max-retries="3"/>
<!--
配置作业
registry-center-ref: 引用注册中心
cron: 配置触发作业时间表达式
sharding-total-count: 配置分片总数
class: 配置作业类
id: 作业名称
sharding-item-parameters: 分片项对应的参数值, 分片序列号=参数
多个之间用逗号隔开,分片序列号从0开始,不能大于分片总数
-->
<job:simple registry-center-ref="regCenter"
cron="0/10 * * * * ?"
sharding-total-count="4"
class="cn.itcast.ej.job.MyJob"
id="myJob"
sharding-item-parameters="0=北京,1=上海,2=广州,3=深圳"/>
</beans>启动
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MyJobTest {
public static void main(String[] args){
new ClassPathXmlApplicationContext("applicationContext-elasticjob.xml");
}
}三、zookeeper作用
Elastic-Job依赖ZooKeeper完成对执行任务信息的存储(如任务名称、任务参与实例、任务执行策略等);
Elastic-Job依赖ZooKeeper实现选举机制,在任务执行实例数量变化时(如在快速上手中的启动新实例或停止实例),会触发选举机制来决定让哪个实例去执行该任务。
3.1 任务信息保存
Elastic-Job使用ZooKeeper完成对任务信息的存取,任务执行实例作为ZooKeeper客户端对其znode操作,任务信息保存在znode中。 使用ZooInspector查看zookeeper节点
1、zookeeper图像化客户端工具的下载地址: https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip;
2、下载完后解压压缩包,双击地址为ZooInspector\build\zookeeper-dev-ZooInspector.jar的jar包
节点记录了任务的配置信息,包含执行类,cron表达式,分片算法类,分片数量,分片参数。默认状态下,如果 你修改了Job的配置比如cron表达式,分片数量等是不会更新到zookeeper上去的,需要把LiteJobConfiguration的 参数overwrite修改成true,或者删除zk的结点再启动作业重新创建。
instances节点: 同一个Job下的elastic-job的部署实例。一台机器上可以启动多个Job实例,也就是Jar包。instances的命名是 [IP+@-@+PID]。
leader节点: 任务实例的主节点信息,通过zookeeper的主节点选举,选出来的主节点信息。下面的子节点分为 election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。election下面的instance 节点显式了当前主节点的实例ID:jobInstanceId。latch节点也是一个永久节点用于选举时候的实现分布式锁。 sharding节点下面有一个临时节点necessary,是否需要重新分片的标记,如果分片总数变化或任务实例节点上下 线,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算。
sharding节点: 任务的分片信息,子节点是分片项序号,从零开始,至分片总数减一。从这个节点可以看出哪个 分片在哪个实例上运行
3.2 任务执行实例选举
Elastic-Job使用ZooKeeper实现任务执行实例选举,若要使用ZooKeeper完成选举,就需要了解ZooKeeper的 znode类型了,ZooKeeper有四种类型的znode,客户端在创建znode时可以指定:
PERSISTENT-持久化目录节点
客户端创建该类型znode,此客户端与ZooKeeper断开连接后该节点依旧存在,如果创建了重复的key,比 如/data,第二次创建会失败。
PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与ZooKeeper断开连接后该节点依旧存在,允许重复创建相同key,Zookeeper给该节点名称进行顺序 编号,如zk会在后面加一串数字比如 /data/data0000000001,如果重复创建,会创建一 个/data/data0000000002节点(一直往后加1)
EPHEMERAL-临时目录节点
客户端与ZooKeeper断开连接后,该节点被删除,不允许重复创建相同key。
EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与ZooKeeper断开连接后,该节点被删除,允许重复创建相同key,依然采取顺序编号机制。
选举过程分析
每个Elastic-Job的任务执行实例作为ZooKeeper的客户端来操作ZooKeeper的znode
1)任意一个实例启动时首先创建一个 /server 的PERSISTENT节点 2)多个实例同时创建 /server/leader EPHEMERAL子节点 3) /server/leader子节点只能创建一个,后创建的会失败。创建成功的实例被选为leader节 点 ,用来执行任务。 4)所有任务实例监听 /server/leader 的变化,一旦节点被删除,就重新进行选举,抢占式地 创建 /server/leader节点,谁创建成功谁就是leader。
四、作业配置
4.1 注册中心配置
| 属性名 | 类型 | 构造器注入 | 缺省值 | 描述 |
|---|---|---|---|---|
| serverLists | String | 是 | 连接Zookeeper服务器的列表 包括IP地址和端口号 多个地址用逗号分隔 如: host1:2181,host2:2181 | |
| namespace | String | 是 | Zookeeper的命名空间 | |
| baseSleepTimeMilliseconds | int | 否 | 1000 | 等待重试的间隔时间的初始值 单位:毫秒 |
| maxSleepTimeMilliseconds | String | 否 | 3000 | 等待重试的事件间隔的最大值 单位:毫秒 |
| maxRetries | String | 否 | 3 | 最大重试次数 |
| sessionTimeoutMilliseconds | boolean | 否 | 60000 | 会话超时时间 单位:毫秒 |
| connectionTimeoutMilliseconds | boolean | 否 | 15000 | 连接超时时间 单位:毫秒 |
| digest | String | 否 | 连接Zookeeper的权限令牌 缺省为不需要权限验证 |
4.2 作业配置
作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。 LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌 套。 JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和 ScriptJobConfiguration。
| 属性名 | 类型 | 构造器注入 | 缺省值 | 描述 |
|---|---|---|---|---|
| jobName | String | 是 | 作业名称 | |
| cron | String | 是 | cron表达式,用于控制作业触发时间 | |
| shardingTotalCount | int | 是 | 作业分片总数 | |
| shardingItemParameters | String | 否 | 分片序列号和参数用等号分隔,多个键值对用逗号分隔 分片序列号从0开 始,不可大于或等于作业分片总数 如:0=a,1=b,2=c | |
| jobParameter | String | 否 | 作业自定义参数 作业自定义参数,可通过传递该参数为作业调度的业务方法 传参,用于实现带参数的作业 例:每次获取的数据量、作业实例从数据库读取的主键等 | |
| failover | boolean | 否 | false | 是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机, 允许将该次未完成的任务在另一作业节点上补偿执行 |
| misfifire | boolean | 否 | true | 是否开启错过任务重新执行 |
| description | String | 否 | 作业描述信息 | |
| jobProperties | Enum | 否 | 配置jobProperties定义的枚举控制Elastic-Job的实现细节JOB_EXCEPTION_HANDLER用于扩展异常处理类EXECUTOR_SERVICE_HANDLER用于扩展作业处理线程池类 |
4.3 作业分片
作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项。
分片项与业务处理解耦
Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处 理分片项与真实数据的对应关系。
最大限度利用资源
将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项
作业分片策略
AverageAllocationJobShardingStrategy
全路径: com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy
策略说明:
基于平均分配算法的分片策略,也是默认的分片策略。
如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。如:
如果有3台服务器,分成9片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3台服务器,分成8片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
如果有3台服务器,分成10片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
OdevitySortByNameJobShardingStrategy
全路径: com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy
策略说明:
根据作业名的哈希值奇偶数决定IP升降序算法的分片策略。
作业名的哈希值为奇数则IP升序。
作业名的哈希值为偶数则IP降序。
用于不同的作业平均分配负载至不同的服务器。
AverageAllocationJobShardingStrategy的缺点是,一旦分片数小于作业服务器数,作业将永远分配至IP地址靠前 的服务器,导致IP地址靠后的服务器空闲。而OdevitySortByNameJobShardingStrategy则可以根据作业名称重新 分配服务器负载。如:
如果有3台服务器,分成2片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]
如果有3台服务器,分成2片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]
RotateServerByNameJobShardingStrategy
全路径: com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy
策略说明:
根据作业名的哈希值对服务器列表进行轮转的分片策略。 配置分片策略 与配置通常的作业属性相同,在spring命名空间或者JobConfiguration中配置jobShardingStrategyClass属性,属 性值是作业分片策略类的全路径。
<!-- 分片策略配置xml方式 -->
<job:simple id="hotelSimpleSpringJob"
class="com.chuanzhi.spiderhotel.job.SpiderJob"
registry‐ center‐ref="regCenter" cron="0/10 * * * * ?"
sharding‐total‐count="4"
sharding‐item‐ parameters="0=A,1=B,2=C,3=D"
monitor‐port="9888"
reconcile‐interval‐minutes="10"
job‐sharding‐strategy‐class="com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy"/>// 分片策略配置java方式
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass("com.dangdang.ddframe. job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy").build();五、Elastic-Job高级
5.1 Dataflow类型定时任务
Dataflow类型的定时任务需实现DataflowJob接口,该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处 理(processData)数据。咱们继续对例子进行改造。
Dataflow类型用于处理数据流,它和SimpleJob不同,它以数据流的方式执行,调用fetchData抓取数据,直到抓 取不到数据才停止作业。
/**
* 文件备份任务类
* 数据流模式
*/
@Componet
public class FileBackupDataFlowJob implements DataflowJob<FileCustom> {
@Autowired
FileService fileService;
/**
* 每次任务执行要备份的文件数目
*/
private final int FETCH_SIZE = 2;
//抓取数据
@Override
public List<File> fetchData(ShardingContext shardingContext) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//1.获取未备份文件
List<File> unBackupFiles = fileService.fetchUnBackupFiles(shardingContext.getShardingParameter(), FETCH_SIZE);
System.out.println(String.format("Time: %s | 线程 %d | 分片 %s | 已获取文件数据 %d 条",new SimpleDateFormat("HH:mm:ss").format(new Date()) ,Thread.currentThread().getId() ,shardingContext.getShardingParameter() ,unBackupFiles.size()));
return unBackupFiles;
}
//处理数据
@Override
public void processData(ShardingContext shardingContext, List<File> unBackupFiles) {
//1.执行文件备份操作
fileService.backupFiles(unBackupFiles);
}
}@Autowired
FileBackupDataFlowJob fileBackupDataFlowJob;
/**
* 配置数据流处理任务详细信息
* @param jobClass 任务执行类
* @param cron 执行策略
* @param shardingTotalCount 分片数量
* @param shardingItemParameters 分片个性化参数
* @return
*/
private LiteJobConfiguration createFlowJobConfiguration(final Class<? extends ElasticJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
// 定义作业核心配置
JobCoreConfiguration.Builder coreConfigBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
if(!StringUtils.isEmpty(shardingItemParameters)){
coreConfigBuilder.shardingItemParameters(shardingItemParameters);
}
JobCoreConfiguration coreConfig =coreConfigBuilder.build();
// 定义数据流类型任务配置
DataflowJobConfiguration jobConfig = new DataflowJobConfiguration(coreConfig, jobClass.getCanonicalName(),true);
// 定义Lite作业根配置
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).build();
return (LiteJobConfiguration) simpleJobRootConfig;
}
/**
* 启动数据流任务
* @return
*/
@Bean(initMethod = "init")
public SpringJobScheduler initFlowElasticJob() {
SpringJobScheduler jobScheduler = new SpringJobScheduler(fileBackupDataFlowJob, registryCenter, createFlowJobConfiguration(fileBackupDataFlowJob.getClass(),"0/10 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"));
return jobScheduler;
}5.2 事件追踪
Elastic-Job-Lite在配置中提供了JobEventConfiguration,支持数据库方式配置,会在数据库中自动创建 JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引,来记录作业的相关信息。
@Autowired
private DataSource dataSource;
//数据源已经存在,直接引入
@Bean(initMethod = "init")
public SpringJobScheduler initSimpleElasticJob() {
SimpleJob job1 = new FileBackupJob();
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
// 增加 任务事件追踪配置
SpringJobScheduler jobScheduler = new SpringJobScheduler(job1, registryCenter, createJobConfiguration(job1.getClass(),"0/10 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"), jobEventConfig);
return jobScheduler;
}JOB_EXECUTION_LOG记录每次作业的执行历史。分为两个步骤:
- 作业开始执行时向数据库插入数据,除failure_cause和complete_time外的其他字段均不为空。
- 作业完成执行时向数据库更新数据,更新is_success, complete_time和failure_cause(如果作业执行失败)。JOB_STATUS_TRACE_LOG记录作业状态变更痕迹表。可通过每次作业运行的task_id查询作业状态变化的生命周期 和运行轨迹。
5.3 dump命令
使用Elastic-Job-Lite过程中可能会碰到一些问题,导致作业运行不稳定。由于无法在生产环境调试,通过dump命 令可以把作业内部相关信息dump出来,方便开发者debug分析。
开启dump监控端口,并运行程序
修改中ElasticJobConfifig中的createJobConfifiguration方法里JobRootConfifiguration的配置,开启dump监控端口:
javaJobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig) .monitorPort(9888)//设置dump端口 .build();安装netcat(若操作系统中已经有nc命令,此步骤可略过)
执行dump命令
打开命令行工具,进入netcat-win32-1.12.zip的解压目录,执行以下命令:
shecho dump| nc 127.0.0.1 9888 > job_debug_dump.txt
5.4 运维
设计理念
本控制台和Elastic Job并无直接关系,是通过读取Elastic Job的注册中心数据展现作业状态,或更新注册中心 数据修改全局配置。
控制台只能控制作业本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。
主要功能
查看作业以及服务器状态
快捷的修改以及删除作业设置
启用和禁用作业
跨注册中心查看作业
查看作业运行轨迹和运行状态
不支持项
- 添加作业。因为作业都是在首次运行时自动添加,使用控制台添加作业并无必要。直接在作业服务器启动包含Elastic Job的作业进程即可
搭建
解压缩 elastic-job-lite-console-${version}.tar.gz 。
进入 bin目录 并执行:
打开浏览器访问 http://localhost:8899/ 即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。
elastic-job-lite-console-${version}.tar.gz 也可通过 elastic-job 源码用 mvn install编译获取。
提供两种账户,管理员及访客,管理员拥有全部操作权限,访客仅拥有察看权限。默认管理员用户名和密码是 root/root,访客用户名和密码是guest/guest,可通过conf\auth.properties修改管理员及访客用户名及密码。
配置及使用
配置注册中心地址,
先启动zookeeper 然后在注册中心配置界面点击添加注册中心。提交后点击连接
连接成功后,在作业维度下可以显示该命名空间下作业名称、分片数量及该作业的cron表达式等信息 ,在服务器维度可以查看服务器ip、当前运行的实例数、作业总数等信息。
配置事件追踪数据源
在事件追踪数据源配置页面点添加按钮,输入相关信息,提交后点击连接即可在作业历史下查看作业历史记录