Elastic Job Lite 开发指南
老李 Lv4

1. 作业开发

Elastic-Job-LiteElastic-Job-Cloud提供统一作业接口,开发者仅需对业务作业进行一次开发,之后可根据不同的配置以及部署至不同的LiteCloud环境。

Elastic-Job提供SimpleDataflowScript 3种作业类型。
方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

a. Simple类型作业

意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyElasticJob implements SimpleJob {

@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}

b. Dataflow类型作业

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyElasticJob implements DataflowJob<Foo> {

@Override
public List<Foo> fetchData(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
List<Foo> data = // get data from database by sharding item 0
return data;
case 1:
List<Foo> data = // get data from database by sharding item 1
return data;
case 2:
List<Foo> data = // get data from database by sharding item 2
return data;
// case n: ...
}
}

@Override
public void processData(ShardingContext shardingContext, List<Foo> data) {
// process data
// ...
}
}

流式处理

可通过DataflowJobConfiguration配置是否流式处理。

流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去;
非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。
流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

2. 作业配置

Elastic-Job配置分为3个层级,分别是Core, TypeRoot。每个层级使用相似于装饰者模式的方式装配。

Core对应JobCoreConfiguration,用于提供作业核心配置信息,如:作业名称、分片总数、CRON表达式等。

Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE, DATAFLOWSCRIPT类型作业,提供3种作业需要的不同配置,如:DATAFLOW类型是否流式处理或SCRIPT类型的命令行等。

Root对应JobRootConfiguration,有2个子类分别对应LiteCloud部署类型,提供不同部署类型所需的配置,如:Lite类型的是否需要覆盖本地配置或Cloud占用CPUMemory数量等。

a. 使用Java代码配置

通用作业配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
// 定义Lite作业根配置
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();

// 定义作业核心配置
JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build();
// 定义DATAFLOW类型配置
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true);
// 定义Lite作业根配置
JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();

// 定义作业核心配置配置
JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build();
// 定义SCRIPT类型配置
ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh");
// 定义Lite作业根配置
JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();

配置项详细说明请参见下文

3. 作业启动

a. Java启动方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class JobDemo {

public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}

private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
regCenter.init();
return regCenter;
}

private static LiteJobConfiguration createJobConfiguration() {
// 创建作业配置
...
}
}

Elastic-Job-Lite配置手册

1. Java Code配置

a. 注册中心配置

用于注册和协调作业分布式行为的组件,目前仅支持Zookeeper

ZookeeperConfiguration属性详细说明

属性名 类型 构造器注入 缺省值 描述
serverLists String 连接Zookeeper服务器的列表
包括IP地址和端口号
多个地址用逗号分隔
如: host1:2181,host2:2181
namespace String Zookeeper的命名空间
baseSleepTimeMilliseconds int 1000 等待重试的间隔时间的初始值
单位:毫秒
maxSleepTimeMilliseconds String 3000 等待重试的间隔时间的最大值
单位:毫秒
maxRetries String 3 最大重试次数
sessionTimeoutMilliseconds boolean 60000 会话超时时间
单位:毫秒
connectionTimeoutMilliseconds boolean 15000 连接超时时间
单位:毫秒
digest String 连接Zookeeper的权限令牌
缺省为不需要权限验证

b. 作业配置

作业配置分为3级,分别是JobCoreConfigurationJobTypeConfigurationLiteJobConfigurationLiteJobConfiguration使用JobTypeConfigurationJobTypeConfiguration使用JobCoreConfiguration,层层嵌套。
JobTypeConfiguration根据不同实现类型分为SimpleJobConfigurationDataflowJobConfigurationScriptJobConfiguration

JobCoreConfiguration属性详细说明

属性名 类型 构造器注入 缺省值 描述
jobName String 作业名称
cron String cron表达式,用于控制作业触发时间
shardingTotalCount int 作业分片总数
shardingItemParameters String 分片序列号和参数用等号分隔,多个键值对用逗号分隔
分片序列号从0开始,不可大于或等于作业分片总数
如:
0=a,1=b,2=c
jobParameter String 作业自定义参数
作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业
例:每次获取的数据量、作业实例从数据库读取的主键等
failover boolean false 是否开启失效转移
monitorExecution开启,失效转移才有效
misfire boolean true 是否开启错过任务重新执行
description String 作业描述信息
jobProperties Enum 配置jobProperties定义的枚举控制Elastic-Job的实现细节
JOB_EXCEPTION_HANDLER用于扩展异常处理类<br />EXECUTOR_SERVICE_HANDLER`用于扩展作业处理线程池类

SimpleJobConfiguration属性详细说明

属性名 类型 构造器注入 缺省值 描述
coreConfig JobCoreConfiguration
jobClass String 作业实现类,需实现ElasticJob接口

DataflowJobConfiguration属性详细说明

属性名 类型 构造器注入 缺省值 描述
coreConfig JobCoreConfiguration
jobClass String 作业实现类,需实现ElasticJob接口
streamingProcess boolean false 是否流式处理数据
如果流式处理数据, 则fetchData不返回空结果将持续执行作业
如果非流式处理数据, 则处理数据完成后作业结束

LiteJobConfiguration属性详细说明

属性名 类型 构造器注入 缺省值 描述
jobConfig JobTypeConfiguration
monitorExecution boolean true 监控作业运行时状态
每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。
每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。
monitorPort int -1 作业监控端口
建议配置作业监控端口, 方便开发者dump作业信息。
使用方法: echo “dump” | nc 127.0.0.1 9888
maxTimeDiffSeconds int -1 最大允许的本机与注册中心的时间误差秒数
如果时间误差超过配置秒数则作业启动时将抛异常
配置为-1表示不校验时间误差
jobShardingStrategyClass String -1 作业分片策略实现类全路径
默认使用平均分配策略
reconcileIntervalMinutes int 10 修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复
单位:分钟
eventTraceRdbDataSource String 作业事件追踪的数据源Bean引用