elastic-job 源码分析
分布式任务调度框架值的好好研究,之前项目中都是直接关联在项目中的用Spring直接调用,这有个坏处在大数据量的时候如果是分布式部署的时候 做了大量重复的劳动,而当当的elastic-job 这套能很好的解决该问题。
实例演示
只需要实现SimpleJob接口,该接口有execute方法,来看看他的接口实现:
1 | public class SpringSimpleJob implements SimpleJob { |
先来看看他的启动流程是怎么样的?笔者在git上下载了SpringBoot的demo,然后启动之,它的启动代码是这样的:
1 | @Bean(initMethod = "init") |
发现它是new一个SpringJobScheduler出来的,这个玩意是啥?发现它是继承自JobScheduler的,具体来看看obScheduler这个类,首先看看它的构造函数:
1 | private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) { |
可以发现构造里面多了SchedulerFacade类和LiteJobFacade这两个类以及JobRegistry这个类,通过查看JobRegistry这个类,发现它实现了单例模式。
接着看看init方法,为什么要看这个方法呢?因为在springboot方法上是调用该方法:
1 | /** |
可以发现,所有的操作都是围绕SchedulerFacade和JobFacade这几个类来操作的。
在schedulerFacade中有一个updateJobConfiguration方法,该方法主要用来更新作业配置
1 | /** |
它首先通过configService类去持久化liteJobConfig任务配置信息。
1 | /** |
首先检查zk下面是否存在config的节点,如果存在则抛出异常,否则则将该节点持久化到zk下面。节点名字也是config。数据通过Gson持久化。以下是我电脑上序列化出来Json字符串。
1 | {"jobName":"com.dangdang.ddframe.job.example.job.dataflow.SpringDataflowJob","jobClass":"com.dangdang.ddframe.job.example.job.dataflow.SpringDataflowJob","jobType":"DATAFLOW","cron":"0/5 * * * * ?","shardingTotalCount":8,"shardingItemParameters":"0\u003dBeijing,1\u003dShanghai,2\u003dGuangzhou","jobParameter":"","failover":false,"misfire":true,"description":"","jobProperties":{"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler","executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"},"streamingProcess":true,"monitorExecution":true,"maxTimeDiffSeconds":-1,"monitorPort":-1,"jobShardingStrategyClass":"","reconcileIntervalMinutes":10,"disabled":false,"overwrite":true} |
持久化完成后还需要从zk中拿出具体的配置:
1 | public LiteJobConfiguration load(boolean fromCache) { |
这里传入的默认为false,则没有走缓存拿,直接从zk中拿取的数据,拿到数据之后通过反序列化将数据转换成LiteJobConfiguration对象。
总结
本文简单的简单的介绍了elastic-job 局部启动流程,对elastic-job有了一个大概的了解,为接下来详细分析每个操作步骤奠定了基础。