
elastic-job Source Code (1): Job Auto-Configuration

2023-04-26 19:37:38 Source: cnblogs (博客园)


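Version: 3.1.0-SNAPSHOT
Git repository: https://github.com/apache/shardingsphere-elasticjob
Maven coordinates:

    <dependency>
        <groupId>org.apache.shardingsphere.elasticjob</groupId>
        <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
        <version>${latest.version}</version>
    </dependency>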

spring.factories configuration:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

Once elasticjob-lite-spring-boot-starter has been added to the project, Spring Boot loads ElasticJobLiteAutoConfiguration automatically at startup. Let's look at what ElasticJobLiteAutoConfiguration does.

ElasticJobLiteAutoConfiguration.java

    /**
     * ElasticJob-Lite auto configuration.
     */
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureAfter(DataSourceAutoConfiguration.class)
    // ElasticJob switch: elasticjob.enabled is treated as true when it is not set.
    @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
    // Imports:
    //   ElasticJobRegistryCenterConfiguration: registry center configuration
    //   ElasticJobTracingConfiguration: job event tracing configuration
    //   ElasticJobSnapshotServiceConfiguration: snapshot service configuration
    @Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
    // Job-related configuration properties.
    @EnableConfigurationProperties(ElasticJobProperties.class)
    public class ElasticJobLiteAutoConfiguration {

        @Configuration(proxyBeanMethods = false)
        // ElasticJobBootstrapConfiguration: creates the job bootstrap beans and registers them in the Spring container.
        // ScheduleJobBootstrapStartupRunner: starts every job whose bootstrap is a ScheduleJobBootstrap.
        @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
        protected static class ElasticJobConfiguration {
        }
    }
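
To make the effect of the elasticjob.enabled switch concrete, here is a minimal sketch in plain Java of how a condition with havingValue = "true" and matchIfMissing = true can be evaluated. The class PropertyConditionSketch and its matches method are hypothetical names used only for illustration; this is a simplified model of the behaviour, not Spring Boot's implementation.

```java
import java.util.Map;

/** Simplified, hypothetical model of a havingValue / matchIfMissing property condition. */
final class PropertyConditionSketch {

    private PropertyConditionSketch() {
    }

    static boolean matches(final Map<String, String> properties, final String name,
                           final String havingValue, final boolean matchIfMissing) {
        String actual = properties.get(name);
        if (actual == null) {
            // Property absent: the outcome is decided by matchIfMissing alone.
            return matchIfMissing;
        }
        // Property present: it has to equal havingValue (compared case-insensitively here).
        return havingValue.equalsIgnoreCase(actual);
    }
}
```

Under this model, leaving elasticjob.enabled out of the configuration keeps the auto-configuration active, and only an explicit value other than true switches it off.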
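
ElasticJob relies on ZooKeeper to provide distributed jobs, so auto-configuration needs the ZooKeeper registry center configuration. Auto-configuration does four main things:
1. Configures the ZooKeeper client and connects to ZooKeeper.
2. Configures the event tracing database, which stores job execution records.
3. Parses every job configuration and registers a bootstrap bean for each job as a Spring singleton.
4. Identifies the job type, writes the job's node data to ZooKeeper, and runs the scheduled jobs.

The first thing: configure the ZooKeeper client and connect to ZooKeeper.

ZookeeperRegistryCenter.class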

    public void init() {
        log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                // ZooKeeper server addresses
                .connectString(zkConfig.getServerLists())
                // Retry policy
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
                // Namespace, used as the root ZooKeeper node for this client
                .namespace(zkConfig.getNamespace());
        // Session timeout, only when configured
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        // Connection timeout, only when configured
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        // Digest authentication and ACLs, only when a digest is configured
        if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
            builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                    .aclProvider(new ACLProvider() {

                        @Override
                        public List<ACL> getDefaultAcl() {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }

                        @Override
                        public List<ACL> getAclForPath(final String path) {
                            return ZooDefs.Ids.CREATOR_ALL_ACL;
                        }
                    });
        }
        client = builder.build();
        // Start the ZooKeeper client
        client.start();
        try {
            // Block until connected, for at most maxSleepTime * maxRetries milliseconds
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
            //CHECKSTYLE:OFF
        } catch (final Exception ex) {
            //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }
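
The arithmetic behind the connection wait in init() can be sketched in isolation. The class ZkConnectSketch and both method names below are hypothetical, and the numbers in the usage note are example values only. The backoff helper is an idealized capped doubling meant to show what "exponential backoff with a maximum sleep time" means; it is not Curator's exact formula, which as far as I know also randomizes the delay.

```java
/** Hypothetical helper that mirrors the timing arithmetic used around init(). */
final class ZkConnectSketch {

    private ZkConnectSketch() {
    }

    /** Upper bound passed to blockUntilConnected: maxSleepTime * maxRetries (computed as long to avoid int overflow). */
    static long connectWaitMillis(final int maxSleepTimeMillis, final int maxRetries) {
        return (long) maxSleepTimeMillis * maxRetries;
    }

    /** Idealized backoff: base * 2^retryCount, never above maxSleepTime. Assumption: no randomization. */
    static long idealBackoffMillis(final int baseSleepTimeMillis, final int maxSleepTimeMillis, final int retryCount) {
        // Clamp the shift so that odd inputs cannot overflow or shift by a negative distance.
        int shift = Math.max(0, Math.min(retryCount, 30));
        long uncapped = (long) baseSleepTimeMillis << shift;
        return Math.min(uncapped, maxSleepTimeMillis);
    }
}
```

With example values of a 1000 ms base sleep, a 3000 ms maximum sleep and 3 retries, connectWaitMillis(3000, 3) gives a 9000 ms wait budget, and the idealized delays for retries 0, 1 and 2 are 1000, 2000 and 3000 ms.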

The second thing: configure the event tracing database, which stores job execution records.

ElasticJobTracingConfiguration.java

    /**
     * Create a bean of tracing DataSource.
     *
     * @param tracingProperties tracing Properties
     * @return tracing DataSource
     */
    // Registers the job tracing database connection as a Spring bean named tracingDataSource.
    @Bean("tracingDataSource")
    public DataSource tracingDataSource(final TracingProperties tracingProperties) {
        // Read the elastic-job tracing database configuration.
        DataSourceProperties dataSource = tracingProperties.getDataSource();
        if (dataSource == null) {
            return null;
        }
        HikariDataSource tracingDataSource = new HikariDataSource();
        tracingDataSource.setJdbcUrl(dataSource.getUrl());
        BeanUtils.copyProperties(dataSource, tracingDataSource);
        return tracingDataSource;
    }

    /**
     * Create a bean of tracing configuration.
     *
     * @param dataSource required by constructor
     * @param tracingDataSource tracing DataSource
     * @return a bean of tracing configuration
     */
    @Bean
    @ConditionalOnBean(DataSource.class)
    @ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
    public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
        // dataSource is the business database; tracingDataSource is the job database.
        // With elasticjob.tracing.type = RDB, a separately configured job database is used
        // to record job execution traces; otherwise the business database is used.
        // Note: when both the business DataSource and the tracing DataSource are registered as beans,
        // mybatis-plus used together with @Table may well fail to find the correct DataSource.
        DataSource ds = tracingDataSource;
        if (ds == null) {
            ds = dataSource;
        }
        return new TracingConfiguration<>("RDB", ds);
    }

Setting elasticjob.tracing.type=RDB enables event tracing. The data source used for job event tracing can be configured separately from the business data source.
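
The fallback rule in tracingConfiguration (prefer the dedicated tracing data source, otherwise use the business one) boils down to a null-safe choice. Below is a minimal sketch of just that rule; TracingSourceSketch and choose are hypothetical names, and plain strings stand in for DataSource objects so the snippet has no dependencies.

```java
import java.util.Optional;

/** Hypothetical illustration of the "tracing source first, business source as fallback" rule. */
final class TracingSourceSketch {

    private TracingSourceSketch() {
    }

    static <T> T choose(final T tracingSource, final T businessSource) {
        // Use the dedicated tracing source when it exists, otherwise fall back to the business source.
        return Optional.ofNullable(tracingSource).orElse(businessSource);
    }
}
```

For example, choose("tracingDb", "businessDb") yields "tracingDb", while choose(null, "businessDb") yields "businessDb".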

The third thing: parse every job configuration.

ElasticJobBootstrapConfiguration.class

    public void createJobBootstrapBeans() {
        // Get the job configuration properties.
        ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
        // Get the singleton bean registry.
        SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
        // Get the registry center (the ZooKeeper client) from the container.
        CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
        // Get the job event tracing configuration.
        TracingConfiguration<?> tracingConfig = getTracingConfiguration();
        // Build the job bootstraps.
        constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
    }

The important method here is constructJobBootstraps. Let's take a look:

    private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                        final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
        // Iterate over every configured job.
        for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
            ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
            // Fail if both the job class and the job type are missing.
            Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                            || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                    "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
            // Fail if both the job class and the job type are set: they are mutually exclusive.
            Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                            || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                    "[elasticJobClass] and [elasticJobType] are mutually exclusive.");
            if (null != jobConfigurationProperties.getElasticJobClass()) {
                // Register the job by class.
                registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
            } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
                // Register the job by type.
                registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
            }
        }
    }
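
The two precondition checks amount to "exactly one of elasticJobClass and elasticJobType must be set". Here is a dependency-free sketch of that rule; JobKindSketch, Kind and resolve are hypothetical names, IllegalArgumentException is thrown directly where the original uses Guava's Preconditions, and the error messages are paraphrased.

```java
/** Hypothetical stand-alone version of the "class or type, but not both" validation. */
final class JobKindSketch {

    enum Kind { CLASSED, TYPED }

    private JobKindSketch() {
    }

    static Kind resolve(final Class<?> elasticJobClass, final String elasticJobType) {
        boolean hasClass = elasticJobClass != null;
        boolean hasType = elasticJobType != null && !elasticJobType.isEmpty();
        if (!hasClass && !hasType) {
            // Neither property is set: the job cannot be created.
            throw new IllegalArgumentException("Please specify [elasticJobClass] or [elasticJobType] under job configuration.");
        }
        if (hasClass && hasType) {
            // Both properties are set: the configuration is ambiguous.
            throw new IllegalArgumentException("[elasticJobClass] and [elasticJobType] are mutually exclusive.");
        }
        return hasClass ? Kind.CLASSED : Kind.TYPED;
    }
}
```

Calling resolve(Runnable.class, null) returns CLASSED and resolve(null, "HTTP") returns TYPED, while passing neither or both throws IllegalArgumentException. Runnable.class is only a placeholder for a real job class.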
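
A job can be registered in two ways. The first is by class: elasticJobClass is set to the job's fully qualified class name. The second is by type, through elasticJobType. Let's look at what registerClassedJob does:
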
    private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                    final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
        // Build the job configuration.
        JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
        // Attach the job event tracing configuration.
        jobExtraConfigurations(jobConfig, tracingConfig);
        // Look up the job bean by its class.
        ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
        // No cron expression: create a OneOffJobBootstrap, i.e. a one-off job.
        if (Strings.isNullOrEmpty(jobConfig.getCron())) {
            Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
            singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
        } else {
            // Cron expression present: create a ScheduleJobBootstrap, i.e. a scheduled job.
            // Choose the bean name.
            String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
            // Register the ScheduleJobBootstrap as a singleton.
            singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
        }
    }
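
The bean naming rule in registerClassedJob is easy to miss, so here it is in isolation: a one-off job must have an explicit jobBootstrapBeanName, while a scheduled job falls back to the job name plus the suffix ScheduleJobBootstrap. BootstrapNamingSketch and beanNameFor are hypothetical names, and the job names in the usage note are made up.

```java
/** Hypothetical extraction of the bean naming rule used when registering a job by class. */
final class BootstrapNamingSketch {

    private BootstrapNamingSketch() {
    }

    static String beanNameFor(final String jobName, final String cron, final String jobBootstrapBeanName) {
        boolean hasBeanName = jobBootstrapBeanName != null && !jobBootstrapBeanName.isEmpty();
        boolean hasCron = cron != null && !cron.isEmpty();
        if (!hasCron) {
            // One-off job: there is no default name, so the bean name must be configured.
            if (!hasBeanName) {
                throw new IllegalArgumentException("The property [jobBootstrapBeanName] is required for One-off job.");
            }
            return jobBootstrapBeanName;
        }
        // Scheduled job: use the configured name, or derive one from the job name.
        return hasBeanName ? jobBootstrapBeanName : jobName + "ScheduleJobBootstrap";
    }
}
```

For a job named orderJob with a cron expression and no explicit bean name, the result is orderJobScheduleJobBootstrap. The explicit name matters for one-off jobs because application code typically has to look the bootstrap bean up by name in order to trigger it.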
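
A job registered by class becomes one of two bootstrap types:
1. ScheduleJobBootstrap: a scheduled (cron) job.
2. OneOffJobBootstrap: a one-off job.

Let's look at what new ScheduleJobBootstrap(...) does. It creates a JobScheduler, whose constructor is:
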
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
        Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
        this.regCenter = regCenter;
        // Collect the job listeners.
        Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
        // Facade that bundles the services operating on ZooKeeper nodes, plus the job listeners.
        setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
        // Get the class name of the current job.
        String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
        // Store the job configuration in the ZooKeeper node /{namespace}/{jobName}/config.
        this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
        // Facade that bundles the services operating on ZooKeeper nodes.
        schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
        jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
        // Validate the job configuration.
        validateJobProperties();
        // Create the job executor.
        jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
        // Inject the GuaranteeService into the listeners.
        setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
        // Create the schedule controller; the job itself starts running only when schedule() is called.
        jobScheduleController = createJobScheduleController();
    }

Now look at createJobScheduleController:

    private JobScheduleController createJobScheduleController() {
        JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
        // Register the job's schedule controller in the JobRegistry.
        JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
        // Register the job's startup information.
        registerStartUpInfo();
        return result;
    }

Now look at registerStartUpInfo. The call above ends up in SetUpFacade.registerStartUpInfo, which takes an enabled flag:

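    public void registerStartUpInfo(final boolean enabled) {
        // Start all listeners.
        listenerManager.startAllListeners();
        // Elect a leader; the elected instance is stored in /{namespace}/{jobName}/leader/election/instance.
        leaderService.electLeader();
        // Persist the server node /{namespace}/{jobName}/servers/{ip} and apply the enabled flag.
        serverService.persistOnline(enabled);
        // Persist an ephemeral node under /{namespace}/{jobName}/instances with the running instance's information.
        instanceService.persistOnline();
        // Start the asynchronous reconcile service if it is not already running.
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }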
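
This method does the following:
1. Starts all listeners.
2. Elects a leader.
3. Persists the server and instance node data.
4. Starts the asynchronous reconcile service.

The fourth thing: identify the job type, write the job's node data to ZooKeeper, and run the scheduled jobs.

ScheduleJobBootstrapStartupRunner.java
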
    @Override
    public void run(final String... args) {
        log.info("Starting ElasticJob Bootstrap.");
        applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
        log.info("ElasticJob Bootstrap started.");
    }
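
The runner fetches every scheduled job (beans of type ScheduleJobBootstrap) and calls schedule() on each one. Underneath, the Quartz framework is what actually runs the scheduled jobs.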
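
The consequence of filtering by bean type is that only scheduled jobs start automatically; one-off bootstraps are left alone until something triggers them. The sketch below reproduces that selection without Spring. StartupRunnerSketch and its nested types are hypothetical stand-ins: a plain Map plays the role of the application context, and ScheduleJob and OneOffJob stand in for ScheduleJobBootstrap and OneOffJobBootstrap.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, Spring-free model of the startup runner's "schedule every ScheduleJobBootstrap" step. */
final class StartupRunnerSketch {

    /** Stand-in for ScheduleJobBootstrap. */
    static final class ScheduleJob {
        boolean scheduled;

        void schedule() {
            scheduled = true;
        }
    }

    /** Stand-in for OneOffJobBootstrap. */
    static final class OneOffJob {
        boolean executed;

        void execute() {
            executed = true;
        }
    }

    private StartupRunnerSketch() {
    }

    /** Rough analogue of getBeansOfType: keeps only the beans that are instances of the given type. */
    static <T> Map<String, T> beansOfType(final Map<String, Object> beans, final Class<T> type) {
        Map<String, T> result = new LinkedHashMap<>();
        beans.forEach((name, bean) -> {
            if (type.isInstance(bean)) {
                result.put(name, type.cast(bean));
            }
        });
        return result;
    }

    /** Schedules every ScheduleJob bean and returns how many were started. */
    static int scheduleAll(final Map<String, Object> beans) {
        Map<String, ScheduleJob> scheduleJobs = beansOfType(beans, ScheduleJob.class);
        scheduleJobs.values().forEach(ScheduleJob::schedule);
        return scheduleJobs.size();
    }
}
```

Given a map holding one ScheduleJob and one OneOffJob, scheduleAll starts exactly one job and the OneOffJob stays untouched, which matches the reason a one-off job needs an explicit bean name: the application has to fetch it and call execute() itself.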


Editor: hnmd004
