Skip to content
标签
定时任务
字数
6747 字
阅读时间
27 分钟

一、概述

使用quartz或spring-task时,会有两个痛点:

  • 不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
  • quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。

Elastic-job分布式任务调度可以解决上述两个痛点。

Elastic-Job是当当开源的一款非常好用的分布式任务调度框架,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。Elastic-Job-Lite这也是本次所要讲解和使用的子项目。使用zookeeper作为注册中心

主要功能

  • 分布式调度协调

    在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行。

  • 丰富调度策略

    基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。

  • 弹性扩容缩容

    当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例时,它所执行的任务能被转移到别的实例来执行。

  • 失效转移

    某实例在任务执行失败后,会被转移到其他实例执行。

  • 错过执行作业重触发

    若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业完成后自动触发。

  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

  • 自诊断并修复分布式不稳定造成的问题

  • 支持并行调度

    支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。

  • 支持作业生命周期操作

    可以动态对任务进行开启及停止操作。

  • 丰富的作业类型

    支持Simple、DataFlow、Script三种作业类型,后续会有详细介绍。

  • Spring整合以及命名空间提供

    对Spring支持良好的整合方式,支持spring自定义命名空间,支持占位符。

  • 运维平台

    提供运维界面,可以管理作业和注册中心。

官网**:**http://elasticjob.io

1.1 架构

uTools_1656506212803

App:应用程序,内部包含任务执行业务逻辑和Elastic-Job-Lite组件,其中执行任务需要实现ElasticJob接口完成 与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实 例。

Elastic-Job-Lite:Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服 务,此组件负责任务的调度,并产生日志及任务调度记录。

无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务器都是对等的,各个作业节点是自治的、平 等的、节点之间通过注册中心进行分布式协调。

Registry:以Zookeeper作为Elastic-Job的注册中心组件,存储了执行任务的相关信息。同时,Elastic-Job利用该 组件进行执行任务实例的选举。

Console:Elastic-Job提供了运维平台,它通过读取Zookeeper数据展现任务执行状态,或更新Zookeeper数据修 改全局配置。通过Elastic-Job-Lite组件产生的数据来查看任务执行历史记录。

应用程序在启动时,在其内嵌的Elastic-Job-Lite组件会向Zookeeper注册该实例的信息,并触发选举(此时可能已 经启动了该应用程序的其他实例),从众多实例中选举出一个Leader,让其执行任务。当到达任务执行时间时, Elastic-Job-Lite组件会调用由应用程序实现的任务业务逻辑,任务执行后会产生任务执行记录。当应用程序的某一 个实例宕机时,Zookeeper组件会感知到并重新触发leader选举。

1.2 相关概念

分片概念

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。

分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

个性化参数的适用场景

个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

分布式调度

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

作业高可用

Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。

一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

最大限度利用资源

Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

1.3 作业类型

Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

Simple类型作业

意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

Dataflow类型作业

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

Script类型作业

意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

二、集成Demo

2.1 与springboot集成

依赖

xml
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic‐job‐lite‐spring</artifactId>
    <version>2.1.5</version> 
</dependency>

配置

zookeeper配置

java
@Configuration 
public class ElasticJobRegistryCenterConfig { 
    private String registryServerList = "localhost:2181"; 
    private String registryNamespace = "elastic‐job‐example‐springboot"; 
    /**
    * 配置Zookeeper * @return 
    */ 
    @Bean(initMethod = "init") 
    public CoordinatorRegistryCenter createRegistryCenter() { 
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryServerList, registryNamespace);
        zkConfig.setSessionTimeoutMilliseconds(100);
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
        return regCenter;
    }
}

Job配置

java
@Configuration 
public class ElasticJobConfig { 
    @Autowired 
    private CoordinatorRegistryCenter registryCenter;
    @Autowired 
    private FileBackupJob fileBackupJob; 
    /**
    * 配置任务详细信息 
    * @param jobClass 任务执行类 
    * @param cron 执行策略 
    * @param shardingTotalCount 分片数量 
    * @param shardingItemParameters 分片个性化参数 
    * @return 
    */ 
    private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        // 定义作业核心配置 
        JobCoreConfiguration.Builder simpleCoreConfigBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount); 
        if(!StringUtils.isEmpty(shardingItemParameters)){
            simpleCoreConfigBuilder.shardingItemParameters(shardingItemParameters); 
        }
        JobCoreConfiguration jobCoreConfiguration =simpleCoreConfigBuilder.build(); 
        // 定义SIMPLE类型配置 
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
        // 定义Lite作业根配置
        JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        return (LiteJobConfiguration) simpleJobRootConfig;
    }
    /**
    * 任务启动 
    * @return 
    */ 
    @Bean(initMethod = "init") 
    public SpringJobScheduler initSimpleElasticJob() {
        SimpleJob job1 = fileBackupJob;
        SpringJobScheduler jobScheduler = new SpringJobScheduler(job1, registryCenter, createJobConfiguration(job1.getClass(),"0/3 * * * * ?",1,null));
        return jobScheduler;
    } 
}

2.2 与spring整合

依赖

xml
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

作业

java
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
/** 作业类 */
public class MyJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        // 获取分片总数
        int shardingTotalCount = context.getShardingTotalCount();
        // 获取分片项
        int shardingItem = context.getShardingItem();
        // 获取分片参数
        String shardingParameter = context.getShardingParameter();
        System.out.println("作业名称:" + context.getJobName());
        System.out.println("分片总数:" + shardingTotalCount);
        System.out.println("分片项:" + shardingItem);
        System.out.println("分片参数:" + shardingParameter);
        // 对不同的分片项进行处理
        switch (shardingItem){
            case 0:
                System.out.println("第1个分片参数:" + shardingParameter);
                break;
            case 1:
                System.out.println("第2个分片参数:" + shardingParameter);
                break;
            case 2:
                System.out.println("第3个分片参数:" + shardingParameter);
                break;
            case 3:
                System.out.println("第4个分片参数:" + shardingParameter);
                break;
            default:
                System.out.println("==default==");
        }
        System.out.println("============================");
    }
}

配置

xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.dangdang.com/schema/ddframe/reg
       http://www.dangdang.com/schema/ddframe/reg/reg.xsd
       http://www.dangdang.com/schema/ddframe/job
       http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <!-- 
        配置作业注册中心
        id: bean的id
        server-lists: 连接zookeeper服务器地址(host1:port,host2:port)
        namespace: zookeeper的命名空间
        base-sleep-time-milliseconds: 等待重试的时间毫秒数(初始值)
        max-sleep-time-milliseconds: 等待重试的最大时间毫秒数
        max-retries: 最大重试次数
    -->
    <reg:zookeeper id="regCenter"
                   server-lists="127.0.0.1:2181"
                   namespace="elastic-job"
                   base-sleep-time-milliseconds="1000"
                   max-sleep-time-milliseconds="3000"
                   max-retries="3"/>
    <!-- 
        配置作业
        registry-center-ref: 引用注册中心
        cron: 配置触发作业时间表达式
        sharding-total-count: 配置分片总数
        class: 配置作业类
        id: 作业名称
        sharding-item-parameters: 分片项对应的参数值, 分片序列号=参数
        多个之间用逗号隔开,分片序列号从0开始,不能大于分片总数
    -->
    <job:simple registry-center-ref="regCenter"
                cron="0/10 * * * * ?"
                sharding-total-count="4"
                class="cn.itcast.ej.job.MyJob"
                id="myJob"
                sharding-item-parameters="0=北京,1=上海,2=广州,3=深圳"/>
</beans>

启动

java
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MyJobTest {
    public static void main(String[] args){
       new ClassPathXmlApplicationContext("applicationContext-elasticjob.xml");
    }
}

三、zookeeper作用

Elastic-Job依赖ZooKeeper完成对执行任务信息的存储(如任务名称、任务参与实例、任务执行策略等);

Elastic-Job依赖ZooKeeper实现选举机制,在任务执行实例数量变化时(如在快速上手中的启动新实例或停止实例),会触发选举机制来决定让哪个实例去执行该任务。

3.1 任务信息保存

Elastic-Job使用ZooKeeper完成对任务信息的存取,任务执行实例作为ZooKeeper客户端对其znode操作,任务信息保存在znode中。 使用ZooInspector查看zookeeper节点

1、zookeeper图像化客户端工具的下载地址: https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip;

2、下载完后解压压缩包,双击地址为ZooInspector\build\zookeeper-dev-ZooInspector.jar的jar包

节点记录了任务的配置信息,包含执行类,cron表达式,分片算法类,分片数量,分片参数。默认状态下,如果 你修改了Job的配置比如cron表达式,分片数量等是不会更新到zookeeper上去的,需要把LiteJobConfiguration的 参数overwrite修改成true,或者删除zk的结点再启动作业重新创建。

instances节点: 同一个Job下的elastic-job的部署实例。一台机器上可以启动多个Job实例,也就是Jar包。instances的命名是 [IP+@-@+PID]。

leader节点: 任务实例的主节点信息,通过zookeeper的主节点选举,选出来的主节点信息。下面的子节点分为 election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。election下面的instance 节点显式了当前主节点的实例ID:jobInstanceId。latch节点也是一个永久节点用于选举时候的实现分布式锁。 sharding节点下面有一个临时节点necessary,是否需要重新分片的标记,如果分片总数变化或任务实例节点上下 线,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算。

sharding节点: 任务的分片信息,子节点是分片项序号,从零开始,至分片总数减一。从这个节点可以看出哪个 分片在哪个实例上运行

3.2 任务执行实例选举

Elastic-Job使用ZooKeeper实现任务执行实例选举,若要使用ZooKeeper完成选举,就需要了解ZooKeeper的 znode类型了,ZooKeeper有四种类型的znode,客户端在创建znode时可以指定:

  • PERSISTENT-持久化目录节点

    客户端创建该类型znode,此客户端与ZooKeeper断开连接后该节点依旧存在,如果创建了重复的key,比 如/data,第二次创建会失败。

  • PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点

    客户端与ZooKeeper断开连接后该节点依旧存在,允许重复创建相同key,Zookeeper给该节点名称进行顺序 编号,如zk会在后面加一串数字比如 /data/data0000000001,如果重复创建,会创建一 个/data/data0000000002节点(一直往后加1)

  • EPHEMERAL-临时目录节点

    客户端与ZooKeeper断开连接后,该节点被删除,不允许重复创建相同key。

  • EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点

    客户端与ZooKeeper断开连接后,该节点被删除,允许重复创建相同key,依然采取顺序编号机制。

选举过程分析

每个Elastic-Job的任务执行实例作为ZooKeeper的客户端来操作ZooKeeper的znode

1)任意一个实例启动时首先创建一个 /server 的PERSISTENT节点 2)多个实例同时创建 /server/leader EPHEMERAL子节点 3) /server/leader子节点只能创建一个,后创建的会失败。创建成功的实例被选为leader节 点 ,用来执行任务。 4)所有任务实例监听 /server/leader 的变化,一旦节点被删除,就重新进行选举,抢占式地 创建 /server/leader节点,谁创建成功谁就是leader。

四、作业配置

4.1 注册中心配置

属性名类型构造器注入缺省值描述
serverListsString连接Zookeeper服务器的列表 包括IP地址和端口号 多个地址用逗号分隔 如: host1:2181,host2:2181
namespaceStringZookeeper的命名空间
baseSleepTimeMillisecondsint1000等待重试的间隔时间的初始值 单位:毫秒
maxSleepTimeMillisecondsString3000等待重试的事件间隔的最大值 单位:毫秒
maxRetriesString3最大重试次数
sessionTimeoutMillisecondsboolean60000会话超时时间 单位:毫秒
connectionTimeoutMillisecondsboolean15000连接超时时间 单位:毫秒
digestString连接Zookeeper的权限令牌 缺省为不需要权限验证

4.2 作业配置

作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。 LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌 套。 JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和 ScriptJobConfiguration。

属性名类型构造器注入缺省值描述
jobNameString作业名称
cronStringcron表达式,用于控制作业触发时间
shardingTotalCountint作业分片总数
shardingItemParametersString分片序列号和参数用等号分隔,多个键值对用逗号分隔 分片序列号从0开 始,不可大于或等于作业分片总数 如:0=a,1=b,2=c
jobParameterString作业自定义参数 作业自定义参数,可通过传递该参数为作业调度的业务方法 传参,用于实现带参数的作业 例:每次获取的数据量、作业实例从数据库读取的主键等
failoverbooleanfalse是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机, 允许将该次未完成的任务在另一作业节点上补偿执行
misfifirebooleantrue是否开启错过任务重新执行
descriptionString作业描述信息
jobPropertiesEnum配置jobProperties定义的枚举控制Elastic-Job的实现细节JOB_EXCEPTION_HANDLER用于扩展异常处理类EXECUTOR_SERVICE_HANDLER用于扩展作业处理线程池类

4.3 作业分片

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项。

  • 分片项与业务处理解耦

    Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处 理分片项与真实数据的对应关系。

  • 最大限度利用资源

    将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项

作业分片策略

  • AverageAllocationJobShardingStrategy

    全路径: com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy

    策略说明:

    基于平均分配算法的分片策略,也是默认的分片策略。

    如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。如:

    如果有3台服务器,分成9片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]

    如果有3台服务器,分成8片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]

    如果有3台服务器,分成10片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

  • OdevitySortByNameJobShardingStrategy

    全路径: com.dangdang.ddframe.job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy

    策略说明:

    根据作业名的哈希值奇偶数决定IP升降序算法的分片策略。

    作业名的哈希值为奇数则IP升序。

    作业名的哈希值为偶数则IP降序。

    用于不同的作业平均分配负载至不同的服务器。

    AverageAllocationJobShardingStrategy的缺点是,一旦分片数小于作业服务器数,作业将永远分配至IP地址靠前 的服务器,导致IP地址靠后的服务器空闲。而OdevitySortByNameJobShardingStrategy则可以根据作业名称重新 分配服务器负载。如:

    如果有3台服务器,分成2片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]

    如果有3台服务器,分成2片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]

  • RotateServerByNameJobShardingStrategy

    全路径: com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy

    策略说明:

    根据作业名的哈希值对服务器列表进行轮转的分片策略。 配置分片策略 与配置通常的作业属性相同,在spring命名空间或者JobConfiguration中配置jobShardingStrategyClass属性,属 性值是作业分片策略类的全路径。

xml
<!-- 分片策略配置xml方式 -->
<job:simple id="hotelSimpleSpringJob"
            class="com.chuanzhi.spiderhotel.job.SpiderJob" 
            registry‐ center‐ref="regCenter" cron="0/10 * * * * ?" 
            sharding‐total‐count="4" 
            sharding‐item‐ parameters="0=A,1=B,2=C,3=D" 
            monitor‐port="9888"
            reconcile‐interval‐minutes="10" 
            job‐sharding‐strategy‐class="com.dangdang.ddframe.job.lite.api.strategy.impl.RotateServerByNameJobShardingStrategy"/>
java
// 分片策略配置java方式
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass("com.dangdang.ddframe. job.lite.api.strategy.impl.OdevitySortByNameJobShardingStrategy").build();

五、Elastic-Job高级

5.1 Dataflow类型定时任务

Dataflow类型的定时任务需实现DataflowJob接口,该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处 理(processData)数据。咱们继续对例子进行改造。

Dataflow类型用于处理数据流,它和SimpleJob不同,它以数据流的方式执行,调用fetchData抓取数据,直到抓 取不到数据才停止作业。

java
/**
* 文件备份任务类 
* 数据流模式 
*/ 
@Componet 
public class FileBackupDataFlowJob implements DataflowJob<FileCustom> {
    @Autowired
    FileService fileService;
    /**
    * 每次任务执行要备份的文件数目 
    */ 
    private final int FETCH_SIZE = 2; 
    //抓取数据 
    @Override 
    public List<File> fetchData(ShardingContext shardingContext) {
        try {
            Thread.sleep(1000); 
        } catch (InterruptedException e) {
            e.printStackTrace(); 
        }
        //1.获取未备份文件 
        List<File> unBackupFiles = fileService.fetchUnBackupFiles(shardingContext.getShardingParameter(), FETCH_SIZE);
        System.out.println(String.format("Time: %s | 线程 %d | 分片 %s | 已获取文件数据 %d 条",new SimpleDateFormat("HH:mm:ss").format(new Date()) ,Thread.currentThread().getId() ,shardingContext.getShardingParameter() ,unBackupFiles.size()));
        return unBackupFiles;
    }
    //处理数据 
    @Override 
    public void processData(ShardingContext shardingContext, List<File> unBackupFiles) { 
        //1.执行文件备份操作 
        fileService.backupFiles(unBackupFiles);
    }
}
java
@Autowired 
FileBackupDataFlowJob fileBackupDataFlowJob; 
/** 
* 配置数据流处理任务详细信息 
* @param jobClass 任务执行类 
* @param cron 执行策略 
* @param shardingTotalCount 分片数量 
* @param shardingItemParameters 分片个性化参数 
* @return 
*/ 
private LiteJobConfiguration createFlowJobConfiguration(final Class<? extends ElasticJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
    // 定义作业核心配置 
    JobCoreConfiguration.Builder coreConfigBuilder = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount);
    if(!StringUtils.isEmpty(shardingItemParameters)){ 
        coreConfigBuilder.shardingItemParameters(shardingItemParameters);
    }
    JobCoreConfiguration coreConfig =coreConfigBuilder.build();
    // 定义数据流类型任务配置 
    DataflowJobConfiguration jobConfig = new DataflowJobConfiguration(coreConfig, jobClass.getCanonicalName(),true); 
    // 定义Lite作业根配置 
    JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).build(); 
    return (LiteJobConfiguration) simpleJobRootConfig; 
}

/**
* 启动数据流任务 
* @return 
*/ 
@Bean(initMethod = "init") 
public SpringJobScheduler initFlowElasticJob() {
    SpringJobScheduler jobScheduler = new SpringJobScheduler(fileBackupDataFlowJob, registryCenter, createFlowJobConfiguration(fileBackupDataFlowJob.getClass(),"0/10 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"));
    return jobScheduler; 
}

5.2 事件追踪

Elastic-Job-Lite在配置中提供了JobEventConfiguration,支持数据库方式配置,会在数据库中自动创建 JOB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表以及若干索引,来记录作业的相关信息。

java
@Autowired 
private DataSource dataSource; 
//数据源已经存在,直接引入 
@Bean(initMethod = "init")
public SpringJobScheduler initSimpleElasticJob() { 
    SimpleJob job1 = new FileBackupJob(); 
    JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
    // 增加 任务事件追踪配置 
    SpringJobScheduler jobScheduler = new SpringJobScheduler(job1, registryCenter, createJobConfiguration(job1.getClass(),"0/10 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"), jobEventConfig); 
    return jobScheduler;
}

JOB_EXECUTION_LOG记录每次作业的执行历史。分为两个步骤:

  1. 作业开始执行时向数据库插入数据,除failure_cause和complete_time外的其他字段均不为空。
  2. 作业完成执行时向数据库更新数据,更新is_success, complete_time和failure_cause(如果作业执行失败)。JOB_STATUS_TRACE_LOG记录作业状态变更痕迹表。可通过每次作业运行的task_id查询作业状态变化的生命周期 和运行轨迹。

5.3 dump命令

使用Elastic-Job-Lite过程中可能会碰到一些问题,导致作业运行不稳定。由于无法在生产环境调试,通过dump命 令可以把作业内部相关信息dump出来,方便开发者debug分析。

  1. 开启dump监控端口,并运行程序

    修改中ElasticJobConfifig中的createJobConfifiguration方法里JobRootConfifiguration的配置,开启dump监控端口:

    java
    JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig) .monitorPort(9888)//设置dump端口
    .build();
  2. 安装netcat(若操作系统中已经有nc命令,此步骤可略过)

  3. 执行dump命令

    打开命令行工具,进入netcat-win32-1.12.zip的解压目录,执行以下命令:

    sh
    echo dump| nc 127.0.0.1 9888 > job_debug_dump.txt

5.4 运维

设计理念

  1. 本控制台和Elastic Job并无直接关系,是通过读取Elastic Job的注册中心数据展现作业状态,或更新注册中心 数据修改全局配置。

  2. 控制台只能控制作业本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。

主要功能

  1. 查看作业以及服务器状态

  2. 快捷的修改以及删除作业设置

  3. 启用和禁用作业

  4. 跨注册中心查看作业

  5. 查看作业运行轨迹和运行状态

不支持项

  1. 添加作业。因为作业都是在首次运行时自动添加,使用控制台添加作业并无必要。直接在作业服务器启动包含Elastic Job的作业进程即可

搭建

下载地址

解压缩 elastic-job-lite-console-${version}.tar.gz 。

进入 bin目录 并执行:

打开浏览器访问 http://localhost:8899/ 即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。

elastic-job-lite-console-${version}.tar.gz 也可通过 elastic-job 源码用 mvn install编译获取。

提供两种账户,管理员及访客,管理员拥有全部操作权限,访客仅拥有察看权限。默认管理员用户名和密码是 root/root,访客用户名和密码是guest/guest,可通过conf\auth.properties修改管理员及访客用户名及密码。

配置及使用

  • 配置注册中心地址,

    先启动zookeeper 然后在注册中心配置界面点击添加注册中心。提交后点击连接

    连接成功后,在作业维度下可以显示该命名空间下作业名称、分片数量及该作业的cron表达式等信息 ,在服务器维度可以查看服务器ip、当前运行的实例数、作业总数等信息。

  • 配置事件追踪数据源

    在事件追踪数据源配置页面点添加按钮,输入相关信息,提交后点击连接即可在作业历史下查看作业历史记录