服务热线
135-6963-3175
主要涉及类:一系列JobHandler用于job的处理、JobManager用于job的入库调度激活挂起等操作、
asyncExecutor用于job的异步执行(通过线程池实现、和3个死循环的线程用于扫描job表操作调度job)
首先查看引擎配置初始化类ProcessEngineConfigurationImpl的init方法:
其中调用了 //------------作业处理器相关--------------- initJobHandlers();//作业处理器集合(异步任务、定时触发任务、开始事件、定时挂起激活流程事件等、自定义等处理器) initJobManager();//job任务管理器(用于创建或调度各种job的操作) initAsyncExecutor();//异步执行器(基于ThreadPoolExecutor线程池和BlockingQueue阻塞队列)
第一个initJobHandlers(),用于各种任务处理器放入一个map中,方便使用时候通过key类型去获取。
第二个initJobManager(),进行JobManager的初始化,用于异步job、timerJob的创建和调度、挂起等操作。
异步job通过异步执行器去执行。
第三个initAsyncExecutor(),初始化异步执行器(通过线程池实现)
public void initAsyncExecutor() { if (asyncExecutor == null) { DefaultAsyncJobExecutor defaultAsyncExecutor = new DefaultAsyncJobExecutor(); // Message queue mode 队列模式 默认false defaultAsyncExecutor.setMessageQueueMode(asyncExecutorMessageQueueMode); // Thread pool config defaultAsyncExecutor.setCorePoolSize(asyncExecutorCorePoolSize);//核心线程数 defaultAsyncExecutor.setMaxPoolSize(asyncExecutorMaxPoolSize);//最大线程数 defaultAsyncExecutor.setKeepAliveTime(asyncExecutorThreadKeepAliveTime);//保持存活时间 // Threadpool queue if (asyncExecutorThreadPoolQueue != null) { defaultAsyncExecutor.setThreadPoolQueue(asyncExecutorThreadPoolQueue);//阻塞队列 } defaultAsyncExecutor.setQueueSize(asyncExecutorThreadPoolQueueSize);//阻塞队列大小 // Acquisition wait time defaultAsyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(asyncExecutorDefaultTimerJobAcquireWaitTime);//设置作业计时器查询间隔默认的等待时间10s defaultAsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(asyncExecutorDefaultAsyncJobAcquireWaitTime);//异步作业查询间隔默认等待时间,默认10s // Queue full wait time defaultAsyncExecutor.setDefaultQueueSizeFullWaitTimeInMillis(asyncExecutorDefaultQueueSizeFullWaitTime);//队列已满时的等待时间默认0s // Job locking defaultAsyncExecutor.setTimerLockTimeInMillis(asyncExecutorTimerLockTimeInMillis);//定时执行器锁时间,默认5分钟 defaultAsyncExecutor.setAsyncJobLockTimeInMillis(asyncExecutorAsyncJobLockTimeInMillis);//异步作业执行器锁时间,默认5分钟 if (asyncExecutorLockOwner != null) {//执行器锁所有者 defaultAsyncExecutor.setLockOwner(asyncExecutorLockOwner); } // Reset expired defaultAsyncExecutor.setResetExpiredJobsInterval(asyncExecutorResetExpiredJobsInterval);//重制过期任务间隔 defaultAsyncExecutor.setResetExpiredJobsPageSize(asyncExecutorResetExpiredJobsPageSize);//重置过期任务查询条数 // Shutdown defaultAsyncExecutor.setSecondsToWaitOnShutdown(asyncExecutorSecondsToWaitOnShutdown);//关闭前等待时间 asyncExecutor = defaultAsyncExecutor; } asyncExecutor.setProcessEngineConfiguration(this); asyncExecutor.setAutoActivate(asyncExecutorActivate);//是否自动激活,默认true }
且在ProcessEngineImpl的构造方法里进行了asyncExecutor线程的启动:
public ProcessEngineImpl(ProcessEngineConfigurationImpl processEngineConfiguration) { //服务及配置注入 this.processEngineConfiguration = processEngineConfiguration; this.name = processEngineConfiguration.getProcessEngineName();//默认为:default this.repositoryService = processEngineConfiguration.getRepositoryService(); this.runtimeService = processEngineConfiguration.getRuntimeService(); this.historicDataService = processEngineConfiguration.getHistoryService(); this.identityService = processEngineConfiguration.getIdentityService(); this.taskService = processEngineConfiguration.getTaskService(); this.formService = processEngineConfiguration.getFormService(); this.managementService = processEngineConfiguration.getManagementService(); this.dynamicBpmnService = processEngineConfiguration.getDynamicBpmnService(); this.asyncExecutor = processEngineConfiguration.getAsyncExecutor(); this.commandExecutor = processEngineConfiguration.getCommandExecutor(); this.sessionFactories = processEngineConfiguration.getSessionFactories(); this.transactionContextFactory = processEngineConfiguration.getTransactionContextFactory(); this.formEngineRepositoryService = processEngineConfiguration.getFormEngineRepositoryService(); this.formEngineFormService = processEngineConfiguration.getFormEngineFormService(); if (processEngineConfiguration.isUsingRelationalDatabase() && processEngineConfiguration.getDatabaseSchemaUpdate() != null) { //数据库脚本检查,根据配置的schema策略 commandExecutor.execute(processEngineConfiguration.getSchemaCommandConfig(), new SchemaOperationsProcessEngineBuild()); } if (name == null) { log.info("default activiti ProcessEngine created"); } else { log.info("ProcessEngine {} created", name); } ProcessEngines.registerProcessEngine(this);//放入引擎缓存map(name:engine实例) if (asyncExecutor != null && asyncExecutor.isAutoActivate()) {//异步执行器状态激活,默认true asyncExecutor.start();//启动各种线程池和线程 } if (processEngineConfiguration.getProcessEngineLifecycleListener() != null) {//引擎被创建监听器触发 processEngineConfiguration.getProcessEngineLifecycleListener().onProcessEngineBuilt(this); } //更新缓存:实例表覆盖原始实例表bpmnModel缓存 initModel(managementService); processEngineConfiguration.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createGlobalEvent(ActivitiEventType.ENGINE_CREATED)); }
接下来我们查看DefaultAsyncJobExecutor的start及相关方法源码:
public boolean executeAsyncJob(final Job job) { if (isMessageQueueMode) {//使用基于消息队列的作业执行器运行时,这里不执行作业。 // When running with a message queue based job executor, // the job is not executed here. return true; } Runnable runnable = null; if (isActive) { //基于job创建线程 runnable = createRunnableForJob(job); try { executorService.execute(runnable);//放入线程池执行 } catch (RejectedExecutionException e) { ...... } } public void start() { if (isActive) {//默认false未激活 return; } log.info("Starting up the default async job executor [{}].", getClass().getName()); if (timerJobRunnable == null) {//timerJob线程(负责查询最新(<=now)一条的timerJob表数据并放入job表(所有线程都起来后也就是isActive=true后会涉及加锁set锁过期时间操作),删除timerJob表) timerJobRunnable = new AcquireTimerJobsRunnable(this, processEngineConfiguration.getJobManager()); } if (resetExpiredJobsRunnable == null) {//new 重置过期job线程(查持有锁时间过期的job,并进行锁撤销: // 6版本:删除旧的job,插入新的job(新的不设置lock过期时间和id,lock持有者等);5版本:直接更新) resetExpiredJobsRunnable = new ResetExpiredJobsRunnable(this); } if (!isMessageQueueMode && asyncJobsDueRunnable == null) { asyncJobsDueRunnable = new AcquireAsyncJobsDueRunnable(this); } if (!isMessageQueueMode) {//非消息队列模式 initAsyncJobExecutionThreadPool();//初始化异步线程池 startJobAcquisitionThread();//启动异步可执行作业线程(开始真正的执行) } startTimerAcquisitionThread();//启动timerJob作业获取线程(作业搬运到job) startResetExpiredJobsThread();//启动重置过期job线程 isActive = true;//设置为异步作业执行器为已激活 executeTemporaryJobs();//执行临时作业 } //初始化一个线程池 protected void initAsyncJobExecutionThreadPool() { if (threadPoolQueue == null) { log.info("Creating thread pool queue of size {}", queueSize); threadPoolQueue = new ArrayBlockingQueue<Runnable>(queueSize); } if (executorService == null) { log.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime); BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("activiti-async-job-executor-thread-%d").build(); executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, threadPoolQueue, threadFactory); } } //启动异步线程池 protected void startJobAcquisitionThread() { if (asyncJobAcquisitionThread == null) { asyncJobAcquisitionThread = new Thread(asyncJobsDueRunnable); } asyncJobAcquisitionThread.start(); } //启动timerJob线程 protected void startTimerAcquisitionThread() { if (timerJobAcquisitionThread == null) { timerJobAcquisitionThread = new Thread(timerJobRunnable); } timerJobAcquisitionThread.start(); } //启动重置过期job线程 protected void startResetExpiredJobsThread() { if (resetExpiredJobThread == null) { resetExpiredJobThread = new Thread(resetExpiredJobsRunnable); } resetExpiredJobThread.start(); }
该类主要职责:
1、进行了线程池的创建和启动、线程放入线程池执行
2、启动AcquireTimerJobsRunnable 线程(timerJob作业搬运到job):
timerJob线程(负责查询最新(<=now())一条的timerJob表数据并放入job表(所有线程都起来后也就是isActive=true后会涉及加锁set锁过期时间操作),删除timerJob表)
3、启动ResetExpiredJobsRunnable重置过期job线程
重置过期job线程:(查持有锁时间过期的job,并进行锁撤销:
6版本:删除旧的job,插入新的job(新的不设置lock过期时间和id,lock持有者等);
5版本:直接更新)
4、启动AcquireAsyncJobsDueRunnable该线程里开始job真正的执行,通过步骤1异步处理器的线程池去执行线程
:该步骤在线程池里最终调用了JobManager().execute(job);方法
该方法如下:
public void execute(Job job) { if (job instanceof JobEntity) { if (Job.JOB_TYPE_MESSAGE.equals(job.getJobType())) {//message executeMessageJob((JobEntity) job); } else if (Job.JOB_TYPE_TIMER.equals(job.getJobType())) {//timer executeTimerJob((JobEntity) job); } } else { throw new ActivitiException("Only jobs with type JobEntity are supported to be executed"); } }
之后通过作业处理器集合取到相应的处理器进行了job的执行 。
如下代码:
Map<String, JobHandler> jobHandlers = processEngineConfiguration.getJobHandlers(); JobHandler jobHandler = jobHandlers.get(jobEntity.getJobHandlerType()); jobHandler.execute(jobEntity, jobEntity.getJobHandlerConfiguration(), execution, getCommandContext());
综上:
1、init用于jobHandlerMap、jobManager、asyncExecutor异步处理器的初始化
2、jobManager用于job的创建、挂起、数据库操作等。
3、AsyncExecutor异步处理器通过线程池和多个线程去扫表并进行job的调度,通过线程池去执行,并最终通过jobManager的的execute调用job对应的具体jobHandler去处理执行job。
说明:异步处理器中线程的启动在ProcessEngineImpl构造方法。
待补充....