技术交流28群

服务热线

135-6963-3175

微信服务号

ContinueProcessOperation流转源码解析 更新时间 2022-3-20 浏览3296次

怎么继续节点流转的呢?

先给出详细流程:

主要做的操作:
    当前是节点:continueThroughFlowNode
                    1.如果是起始节点,触发开始节点启动执行监听器
                           getListenerNotificationHelper().executeExecutionListeners(elementWithExecutionListeners, executionEntity, eventType);
                    2.如果是子流程:创建子执行实例execution入库并作为子流程返回。
                           通过当前实例创建子执行execution入库ACT_RU_EXECUTION返回并把scope设置为true(子流程);删除执行实例相关的数据。            
                    3.多实例则执行多实例同步操作;强制同步操作或者非异步(默认)则执行同步操作;否则异步操作。
                            若多实例:
                                1.执行"start"执行监听器
                                2.非补偿且是活动节点(非网关和事件),则执行补偿
                                3.创建子执行,并执行边界事件行为
                                4.执行活动节点行为。
                                5.执行节点行为(activityBehavior.execute(execution))。(此处执行的是多实例MultiInstanceActivityBehavior的行为,放多例子行为类章节介绍)
                            若是同步(默认):
                                   1.入库ACT_HI_ACTINST 记录历史节点行为实例
                                   2. 执行start类型事件监听器
                                   3.执行边界事件
                                   4.行为不为null则执行行为类(行为里最后出线),否则执行出线操作(planTakeOutgoingSequenceFlowsOperation(execution, true))。  
                            若是异步:
                                   创建异步job入库(ACT_RU_JOB)并进行调度,可参考任务调度章节。                                
    当前是线条:continueThroughSequenceFlow
            1、线条执行监听器不为null:则执行start\take\end监听器
            2、创建并调度一个线条流转事件:
                表明引擎已从源活动获取(即遵循)到目标活动的sequenceflow。
            3、获取target节点设置到execution,并继续进行流转
                Context.getAgenda().planContinueProcessOperation(execution);//也就是当前分析的类

说明:

1、当执行为节点时:出口为行为类(最终也是执行出线planTakeOutgoingSequenceFlowsOperation(节点))
                            或者 直接planTakeOutgoingSequenceFlowsOperation(节点)
2、当执行为线条时:出口为planContinueProcessOperation(节点(通过对execution set seq的targetNode))


流转类之ContinueProcessOperation

代码如下:

@Override
  public void run() {
    FlowElement currentFlowElement = getCurrentFlowElement(execution);//获取当前执行元素
    if (currentFlowElement instanceof FlowNode) {//当前是节点
      continueThroughFlowNode((FlowNode) currentFlowElement);
    } else if (currentFlowElement instanceof SequenceFlow) {//当前是线条
      continueThroughSequenceFlow((SequenceFlow) currentFlowElement);
    } else {
      throw new ActivitiException("Programmatic error: no current flow element found or invalid type: " + currentFlowElement + ". Halting.");
    }
  }

主要做的事情:

  1. 当执行实例的当前元素是节点时:继续通过节点

  2. 当是线条时候:继续通过线条


一、首先看方法continueThroughFlowNode:

protected void continueThroughFlowNode(FlowNode flowNode) {
    //检查它是否是初始流元素。如果是这样,我们也必须触发流程的执行监听器
    // Check if it's the initial flow element. If so, we must fire the execution listeners for the process too
    if (flowNode.getIncomingFlows() != null
        && flowNode.getIncomingFlows().size() == 0
        && flowNode.getSubProcess() == null) {//起始 开始节点:无入线和子流程
      executeProcessStartExecutionListeners();//为开始节点启动执行监听器
    }
     // For a subprocess, a new child execution is created that will visit the steps of the subprocess
     // The original execution that arrived here will wait until the subprocess is finished
     // and will then be used to continue the process instance.
    if (flowNode instanceof SubProcess) {//子流程
      createChildExecutionForSubProcess((SubProcess) flowNode);
    }
    //多实例
    if (flowNode instanceof Activity && ((Activity) flowNode).hasMultiInstanceLoopCharacteristics()) {
      // the multi instance execution will look at async
      executeMultiInstanceSynchronous(flowNode);
    } else if (forceSynchronousOperation || !flowNode.isAsynchronous()) {//强制同步操作或非异步(默认)
      executeSynchronous(flowNode);
    } else {//异步
      executeAsynchronous(flowNode);
    }
}

主要操作:

  1. 如果是起始节点,触发开始节点启动执行监听器

  2. 如果是子流程:创建子执行实例execution入库并作为子流程返回。

  3. 多实例则执行多实例同步操作;强制同步操作或者非异步(默认)则执行同步操作;否则异步操作。

首先看步骤1:

protected void executeProcessStartExecutionListeners() {
    org.activiti.bpmn.model.Process process = ProcessDefinitionUtil.getProcess(execution.getProcessDefinitionId());
    executeExecutionListeners(process, execution.getParent(), ExecutionListener.EVENTNAME_START);
}
protected void executeExecutionListeners(HasExecutionListeners elementWithExecutionListeners, String eventType) {
    executeExecutionListeners(elementWithExecutionListeners, execution, eventType);
}
protected void executeExecutionListeners(HasExecutionListeners elementWithExecutionListeners,
      ExecutionEntity executionEntity, String eventType) {
    commandContext.getProcessEngineConfiguration().getListenerNotificationHelper()
      .executeExecutionListeners(elementWithExecutionListeners, executionEntity, eventType);
}

getListenerNotificationHelper().executeExecutionListeners(elementWithExecutionListeners, executionEntity, eventType);
该操作参考监听器通知相关文章。

看步骤2:

protected void createChildExecutionForSubProcess(SubProcess subProcess) {
    ExecutionEntity parentScopeExecution = findFirstParentScopeExecution(execution);//找父实例,也就是isScope=true的
    // Create the sub process execution that can be used to set variables
    // We create a new execution and delete the incoming one to have a proper scope that
    // does not conflict anything with any existing scopes
    //创建可用于设置变量的子流程执行我们创建一个新的执行并删除传入的执行以具有适当的范围,该范围不会与任何现有范围发生冲突
    ExecutionEntity subProcessExecution = commandContext.getExecutionEntityManager().createChildExecution(parentScopeExecution);
    subProcessExecution.setCurrentFlowElement(subProcess);
    subProcessExecution.setScope(true);//非子执行为true
    commandContext.getExecutionEntityManager().deleteExecutionAndRelatedData(execution, null, false);
    execution = subProcessExecution;
  }

若是子流程,则通过当前实例创建子执行execution入库ACT_RU_EXECUTION返回并把scope设置为true(子流程);删除执行实例相关的数据。

步骤3:

3.1若是多实例:

protected void executeMultiInstanceSynchronous(FlowNode flowNode) {
    // Execution listener: event 'start'
    if (CollectionUtil.isNotEmpty(flowNode.getExecutionListeners())) {
      executeExecutionListeners(flowNode, ExecutionListener.EVENTNAME_START);//"start"
    }
    //执行任何边界事件,子流程边界事件将从活动行为中执行(只有活动(排除网关和事件)可以有边界事件)
    // Execute any boundary events, sub process boundary events will be executed from the activity behavior
    if (!inCompensation && flowNode instanceof Activity) { // Only activities can have boundary events
      List<BoundaryEvent> boundaryEvents = ((Activity) flowNode).getBoundaryEvents();
      if (CollectionUtil.isNotEmpty(boundaryEvents)) {
        executeBoundaryEvents(boundaryEvents, execution);
      }
    }
    // Execute the multi instance behavior
    ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();
    if (activityBehavior != null) {
      executeActivityBehavior(activityBehavior, flowNode);
    } else {
      throw new ActivitiException("Expected an activity behavior in flow node " + flowNode.getId());
    }
  }

分析代码:做了如下操作:

  1. 执行"start"执行监听器

  2. 非补偿且是活动节点(非网关和事件),则执行补偿

    1. 创建子执行,并执行边界事件行为

  3. 执行活动节点行为。

    1. 执行节点行为(activityBehavior.execute(execution))。(此处执行的是多实例MultiInstanceActivityBehavior的行为,放多例子行为类章节介绍

3.2若是同步操作(默认

protected void executeSynchronous(FlowNode flowNode) {
    ////入库ACT_HI_ACTINST 记录历史节点行为实例
    commandContext.getHistoryManager().recordActivityStart(execution);
    // Execution listener: event 'start'  执行start类型事件监听器
    if (CollectionUtil.isNotEmpty(flowNode.getExecutionListeners())) {
      executeExecutionListeners(flowNode, ExecutionListener.EVENTNAME_START);
    }
    // Execute any boundary events, sub process boundary events will be executed from the activity behavior
    if (!inCompensation && flowNode instanceof Activity) { // Only activities can have boundary events 执行边界事件
      List<BoundaryEvent> boundaryEvents = ((Activity) flowNode).getBoundaryEvents();
      if (CollectionUtil.isNotEmpty(boundaryEvents)) {
        executeBoundaryEvents(boundaryEvents, execution);
      }
    }
    // Execute actual behavior
    ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();
    if (activityBehavior != null) {
      executeActivityBehavior(activityBehavior, flowNode);
    } else {
      logger.debug("No activityBehavior on activity '{}' with execution {}", flowNode.getId(), execution.getId());
      Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
    }
  }

   1.入库ACT_HI_ACTINST 记录历史节点行为实例

   2. 执行start类型事件监听器

   3.执行边界事件

   4.行为不为null则执行行为类(行为里最后出线),否则执行出线操作。

3.3若是异步操作 

protected void executeAsynchronous(FlowNode flowNode) {
    JobEntity job = commandContext.getJobManager().createAsyncJob(execution, flowNode.isExclusive());
    commandContext.getJobManager().scheduleAsyncJob(job);
}

创建异步job入库(ACT_RU_JOB)并进行调度,可参考任务调度章节。


二、接下来我们分析第二个方法continueThroughSequenceFlow继续通过线条:

protected void continueThroughSequenceFlow(SequenceFlow sequenceFlow) {
    // Execution listener. Sequenceflow only 'take' makes sense ... but we've supported all three since the beginning
    if (CollectionUtil.isNotEmpty(sequenceFlow.getExecutionListeners())) {
      executeExecutionListeners(sequenceFlow, ExecutionListener.EVENTNAME_START);//start 开始
      executeExecutionListeners(sequenceFlow, ExecutionListener.EVENTNAME_TAKE);//take 经过
      executeExecutionListeners(sequenceFlow, ExecutionListener.EVENTNAME_END);//end 结束
    }
    // Firing event that transition is being taken
    if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
      FlowElement sourceFlowElement = sequenceFlow.getSourceFlowElement();
      FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement();
      Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
        ActivitiEventBuilder.createSequenceFlowTakenEvent(
            (ExecutionEntity) execution,
            ActivitiEventType.SEQUENCEFLOW_TAKEN,
            sequenceFlow.getId(),
            sourceFlowElement != null ? sourceFlowElement.getId() : null,
            sourceFlowElement != null ? (String) sourceFlowElement.getName() : null,
            sourceFlowElement != null ? sourceFlowElement.getClass().getName() : null,
            sourceFlowElement != null ? ((FlowNode) sourceFlowElement).getBehavior(): null,
            targetFlowElement != null ? targetFlowElement.getId() : null,
            targetFlowElement != null ? targetFlowElement.getName() : null,
            targetFlowElement != null ? targetFlowElement.getClass().getName() : null,
            targetFlowElement != null ? ((FlowNode) targetFlowElement).getBehavior(): null));
    }
    FlowElement targetFlowElement = sequenceFlow.getTargetFlowElement();
    execution.setCurrentFlowElement(targetFlowElement);//该出线对应的target节点
    logger.debug("Sequence flow '{}' encountered. Continuing process by following it using execution {}", sequenceFlow.getId(), execution.getId());
    Context.getAgenda().planContinueProcessOperation(execution);
}

分析,主要执行以下操作:

1、线条执行监听器不为null:则执行start\take\end监听器

2、创建并调度一个线条流转事件:

    表明引擎已从源活动获取(即遵循)到目标活动的sequenceflow。

3、获取target节点设置到execution,并继续进行流转

    Context.getAgenda().planContinueProcessOperation(execution);//也就是当前分析的类


综上:

1、当执行为节点时:出口为行为类(最终也是执行出线planTakeOutgoingSequenceFlowsOperation(节点)

                            或者 直接planTakeOutgoingSequenceFlowsOperation(节点)

2、当执行为线条时:出口为planContinueProcessOperation(节点)


注意:

1、在子流程行为类(SubProcessActivityBehavior)里会自动找startEvent节点进行planContinueProcessOperation(startEvent)流转

2、在startEvent节点行为类(NoneStartEventActivityBehavior通过父类去leave)会自动找出线leave(planTakeOutgoingSequenceFlowsOperation)