技术交流28群

服务热线

135-6963-3175

微信服务号

activiti之包含网关行为类InclusiveGatewayActivityBehavior源码解析 更新时间 2022-3-17 浏览2487次

看该类源码:

public class InclusiveGatewayActivityBehavior extends GatewayActivityBehavior implements InactiveActivityBehavior {
  private static final long serialVersionUID = 1L;
  private static Logger logger = LoggerFactory.getLogger(InclusiveGatewayActivityBehavior.class.getName());
  @Override
  public void execute(DelegateExecution execution) {
    // The join in the inclusive gateway works as follows:
    // When an execution enters it, it is inactivated.
    // All the inactivated executions stay in the inclusive gateway
    // until ALL executions that CAN reach the inclusive gateway have reached it.
    //
    // This check is repeated on execution changes until the inactivated
    // executions leave the gateway.
    /**
     * 包容网关中的工作流程如下:
     * 当执行进入它时,它被停用。所有未激活的execution都保留在包含网关中,直到所有可以到达包含网关的执行都到达它为止。
     * 对execution更改重复此检查,直到未激活的execution离开网关。
     */
    execution.inactivate();//设置未激活 isActive = false;
    executeInclusiveGatewayLogic((ExecutionEntity) execution);
  }
  @Override
  public void executeInactive(ExecutionEntity executionEntity) {
    executeInclusiveGatewayLogic(executionEntity);
  }
  protected void executeInclusiveGatewayLogic(ExecutionEntity execution) {
    CommandContext commandContext = Context.getCommandContext();
    ExecutionEntityManager executionEntityManager = commandContext.getExecutionEntityManager();
    lockFirstParentScope(execution);//找execution的根parent
    //获取所有执行实例
    Collection<ExecutionEntity> allExecutions = executionEntityManager.findChildExecutionsByProcessInstanceId(execution.getProcessInstanceId());
    Iterator<ExecutionEntity> executionIterator = allExecutions.iterator();
    boolean oneExecutionCanReachGateway = false;
    while (!oneExecutionCanReachGateway && executionIterator.hasNext()) {
      ExecutionEntity executionEntity = executionIterator.next();
      if (!executionEntity.getActivityId().equals(execution.getCurrentActivityId())) {//非当前活动
        boolean canReachGateway = ExecutionGraphUtil.isReachable(execution.getProcessDefinitionId(),
                executionEntity.getActivityId(), execution.getCurrentActivityId());
        if (canReachGateway) {//可以到达网关
          oneExecutionCanReachGateway = true;
        }
      } else if (executionEntity.getActivityId().equals(execution.getCurrentActivityId()) && executionEntity.isActive()) {
        // Special case: the execution has reached the inc gw, but the operation hasn't been executed yet for that execution
        oneExecutionCanReachGateway = true;
      }
    }
    //如果没有执行可以到达网关,则网关激活并执行分叉行为(无论入口或者出口,说明已经全部到达了则可以继续执行了??)
    // If no execution can reach the gateway, the gateway activates and executes fork behavior
    if (!oneExecutionCanReachGateway) {
      //任何被激活的执行都无法访问包含网关
      logger.debug("Inclusive gateway cannot be reached by any execution and is activated");
      // Kill all executions here (except the incoming)
      Collection<ExecutionEntity> executionsInGateway = executionEntityManager
          .findInactiveExecutionsByActivityIdAndProcessInstanceId(execution.getCurrentActivityId(), execution.getProcessInstanceId());
      for (ExecutionEntity executionEntityInGateway : executionsInGateway) {
        if (!executionEntityInGateway.getId().equals(execution.getId())) {
          commandContext.getHistoryManager().recordActivityEnd(executionEntityInGateway, null);//记录结束时间等
          executionEntityManager.deleteExecutionAndRelatedData(executionEntityInGateway, null, false);//删除执行和相关数据
        }
      }
      // Leave
      commandContext.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution, true);
    }
  }
}

主要做了几件事情:

1、每个顺序流进来后被暂停,isActive = false;

2、获取所有其他执行实例,判断是否还有未到达的顺序流:

        若有的话,则不分叉。若没有了,则开始进行分叉(无论start入网关节点或者end出网关节点,全部到达才进行后续执行)。

可以看下ExecutionGraphUtil的isReachablef方法:

 /**
   * 验证具有给定源标识符的元素是否可以通过以下序列流到达具有目标标识符的元素。
   * Verifies if the element with the given source identifier can reach the element with the target identifier through following sequence flow.
   */
  public static boolean isReachable(String processDefinitionId, String sourceElementId, String targetElementId) {
    // Fetch source and target elements
    Process process = ProcessDefinitionUtil.getProcess(processDefinitionId);
    FlowElement sourceFlowElement = process.getFlowElement(sourceElementId, true);
    FlowNode sourceElement = null;
    if (sourceFlowElement instanceof FlowNode) {
      sourceElement = (FlowNode) sourceFlowElement;
    } else if (sourceFlowElement instanceof SequenceFlow) {
      sourceElement = (FlowNode) ((SequenceFlow) sourceFlowElement).getTargetFlowElement();
    }
    FlowElement targetFlowElement = process.getFlowElement(targetElementId, true);
    FlowNode targetElement = null;
    if (targetFlowElement instanceof FlowNode) {
      targetElement = (FlowNode) targetFlowElement;
    } else if (targetFlowElement instanceof SequenceFlow) {
      targetElement = (FlowNode) ((SequenceFlow) targetFlowElement).getTargetFlowElement();
    }
    if (sourceElement == null) {
      throw new ActivitiException("Invalid sourceElementId '" + sourceElementId + "': no element found for this id n process definition '" + processDefinitionId + "'");
    }
    if (targetElement == null) {
      throw new ActivitiException("Invalid targetElementId '" + targetElementId + "': no element found for this id n process definition '" + processDefinitionId + "'");
    }
    Set<String> visitedElements = new HashSet<String>();
    return isReachable(process, sourceElement, targetElement, visitedElements);
  }
  public static boolean isReachable(Process process, FlowNode sourceElement, FlowNode targetElement, Set<String> visitedElements) {
    //没有传出的seq流:可能是流程或嵌入式子流程的结尾。
    // No outgoing seq flow: could be the end of eg . the process or an embedded subprocess
    if (sourceElement.getOutgoingFlows().size() == 0) {
      visitedElements.add(sourceElement.getId());
      FlowElementsContainer parentElement = process.findParent(sourceElement);
      if (parentElement instanceof SubProcess) {//parent是子流程
        sourceElement = (SubProcess) parentElement;
      } else {
        return false;
      }
    }
    if (sourceElement.getId().equals(targetElement.getId())) {
      return true;
    }
    //为了避免无限循环,我们必须捕获我们访问的每个节点,如果我们已经访问过该节点,则在进一步查看之前检查。
    // To avoid infinite looping, we must capture every node we visit
    // and check before going further in the graph if we have already
    // visited the node.
    visitedElements.add(sourceElement.getId());
    List<SequenceFlow> sequenceFlows = sourceElement.getOutgoingFlows();
    if (sequenceFlows != null && sequenceFlows.size() > 0) {
      for (SequenceFlow sequenceFlow : sequenceFlows) {
        String targetRef = sequenceFlow.getTargetRef();
        FlowNode sequenceFlowTarget = (FlowNode) process.getFlowElement(targetRef, true);
        if (sequenceFlowTarget != null && !visitedElements.contains(sequenceFlowTarget.getId())) {
          boolean reachable = isReachable(process, sequenceFlowTarget, targetElement, visitedElements);
          if (reachable) {
            return true;
          }
        }
      }
    }
    return false;
  }

主要传入一个source和一个target,判断source是否能到达target。