技术交流28群

服务热线

135-6963-3175

微信服务号

activiti之引擎计划ActivitiEngineAgenda源码分析 更新时间 2022-3-15 浏览2687次

activiti之引擎计划ActivitiEngineAgenda

我们知道activti6基本是基于责任链和命令模式实现,但底层是如何进行命令的执行和流程的驱动流转的呢?

我们首先看一下命令调用类CommandInvoker代码:

public class CommandInvoker extends AbstractCommandInterceptor {
  private static final Logger logger = LoggerFactory.getLogger(CommandInvoker.class);
  @Override
  @SuppressWarnings("unchecked")
  public <T> T execute(final CommandConfig config, final Command<T> command) {
    final CommandContext commandContext = Context.getCommandContext();
    //1、要执行的线程操作加入计划任务链表
    commandContext.getAgenda().planOperation(new Runnable() {
      @Override
      public void run() {
        commandContext.setResult(command.execute(commandContext));//execute且执行该命令的过程中可能会通过agenda往Operation压入很多计划线程操作
      }
    });
    //2、执行
    executeOperations(commandContext);
    // At the end, call the execution tree change listeners.
    // TODO: optimization: only do this when the tree has actually changed (ie check dbSqlSession).
    //3、若当前命令或者流转操作上下文有参与执行的执行对象,则进行执行
    if (commandContext.hasInvolvedExecutions()) {
      Context.getAgenda().planExecuteInactiveBehaviorsOperation();//针对包容网关
      executeOperations(commandContext);//类似上面步骤2
    }
    return (T) commandContext.getResult();
  }
  protected void executeOperations(final CommandContext commandContext) {
    while (!commandContext.getAgenda().isEmpty()) {
      //获取计划链表里的线程进行执行
      Runnable runnable = commandContext.getAgenda().getNextOperation();//弹出
      executeOperation(runnable);//执行
    }
  }
  //线程的真正启动
  public void executeOperation(Runnable runnable) {
    if (runnable instanceof AbstractOperation) {
      AbstractOperation operation = (AbstractOperation) runnable;
      // Execute the operation if the operation has no execution (i.e. it's an operation not working on a process instance)
      // or the operation has an execution and it is not ended
      if (operation.getExecution() == null || !operation.getExecution().isEnded()) {
        if (logger.isDebugEnabled()) {
          logger.debug("Executing operation {} ", operation.getClass());
        }
        runnable.run();
      }
    } else {
      runnable.run();
    }
  }
  @Override
  public CommandInterceptor getNext() {
    return null;
  }
  @Override
  public void setNext(CommandInterceptor next) {
    throw new UnsupportedOperationException("CommandInvoker must be the last interceptor in the chain");
  }
}

可以看出该类主要三个事情:

1、要执行的命令县城加入命令链表;

2、循环弹出并启动执行链表里的所有待执行的线程操作;

3、若当前命令或者流转操作上下文有参与执行的执行对象,则进行执行

4、返回命令执行结果

activit6底层的ActivitiEngineAgenda作为引擎节点流转的计划任务核心类,驱动流程的运转。

看一下接口

public interface ActivitiEngineAgenda extends Agenda {
    //执行流程操作
    void planContinueProcessOperation(ExecutionEntity execution);
    //执行流程同步操作
    void planContinueProcessSynchronousOperation(ExecutionEntity execution);
    //流程补偿
    void planContinueProcessInCompensation(ExecutionEntity execution);
    //多实例操作
    void planContinueMultiInstanceOperation(ExecutionEntity execution);
    //任务出线操作
    void planTakeOutgoingSequenceFlowsOperation(ExecutionEntity execution, boolean evaluateConditions);
    //结束execution操作
    void planEndExecutionOperation(ExecutionEntity execution);
    //触发execution执行操作
    void planTriggerExecutionOperation(ExecutionEntity execution);
    //执行有效范围销毁
    void planDestroyScopeOperation(ExecutionEntity execution);
    //执行未激活的行为
    void planExecuteInactiveBehaviorsOperation();
}
//相关的计划流转类:
ContinueProcessOperation
ContinueMultiInstanceOperation
DestroyScopeOperation
EndExecutionOperation
ExecuteInactiveBehaviorsOperation
TakeOutgoingSequenceFlowsOperation
TriggerExecutionOperation
(且以上类都实现了runnable接口)

可以看下ActivitiEngineAgenda接口默认的实现类的核心代码:

public class DefaultActivitiEngineAgenda implements ActivitiEngineAgenda {
    //保存了要执行的线程集合
    protected LinkedList<Runnable> operations = new LinkedList<Runnable>();
    protected CommandContext commandContext;
    @Override
    public boolean isEmpty() {
            return operations.isEmpty();
    }
    
   @Override
   public Runnable getNextOperation() {//弹出要执行的线程
        return operations.poll();
   } 
   /**
     * Generic method to plan a {@link Runnable}.
    */
   @Override
   public void planOperation(Runnable operation) {
      operations.add(operation);//加入线程集合
    
      if (operation instanceof AbstractOperation) {
           ExecutionEntity execution = ((AbstractOperation) operation).getExecution();
           if (execution != null) {//线程若有执行实例则加入关联的执行实例集合
               commandContext.addInvolvedExecution(execution);
           }
      }
    
      logger.debug("Operation {} added to agenda", operation.getClass());
   }
   //加入待执行的线程链表
   @Override
    public void planContinueProcessOperation(ExecutionEntity execution) {
        planOperation(new ContinueProcessOperation(commandContext, execution));
    }
   。。省略其他类型的加入操作
}

该类负责维护命令实际的线程任务集合和加入命令上下文执行实例加入集合。