技术交流28群

服务热线

135-6963-3175

微信服务号

activiti之命令拦截器责任链源码分析 更新时间 2022-3-15 浏览2600次

activiti之命令拦截器责任链源码分析

查看ProcessEngineConfigurationImpl类的init方法里调用了命令相关的初始化

public void initCommandExecutors() {
    initDefaultCommandConfig();//初始化命令默认配置(主要设置事务类型)
    initSchemaCommandConfig();//schema命令配置(设置schema操作不支持事务)
    initCommandInvoker();//命令拦截器(用于命令调用),初始化CommandInvoker
    initCommandInterceptors();//初始化拦截器列表
    initCommandExecutor();//初始化拦截器链关系及构建命令执行器
}

我们看一下拦截器链的构建

//拦截器初始化
  public void initCommandInterceptors() {
    if (commandInterceptors == null) {
      commandInterceptors = new ArrayList<CommandInterceptor>();
      if (customPreCommandInterceptors != null) {
          //自定义前置拦截器
        commandInterceptors.addAll(customPreCommandInterceptors);
      }
      //默认的拦截器
      commandInterceptors.addAll(getDefaultCommandInterceptors());
      if (customPostCommandInterceptors != null) {
          //后置拦截器
        commandInterceptors.addAll(customPostCommandInterceptors);
      }
      //加入命令调用拦截器
      commandInterceptors.add(commandInvoker);
    }
  }
  public Collection<? extends CommandInterceptor> getDefaultCommandInterceptors() {
    List<CommandInterceptor> interceptors = new ArrayList<CommandInterceptor>();
    //加入日志拦截器
    interceptors.add(new LogInterceptor());
    //加入事务拦截器
    CommandInterceptor transactionInterceptor = createTransactionInterceptor();
    if (transactionInterceptor != null) {
      interceptors.add(transactionInterceptor);
    }
    //加入命令上下文拦截器(用于命令上下文的环境依赖创建初始化工作)
    if (commandContextFactory != null) {
      interceptors.add(new CommandContextInterceptor(commandContextFactory, this));
    }
    //加入事务上下文拦截器(用于session事务的提交和回滚)
    if (transactionContextFactory != null) {
      interceptors.add(new TransactionContextInterceptor(transactionContextFactory));
    }
    return interceptors;
  }
  public void initCommandExecutor() {
    if (commandExecutor == null) {
        //初始化拦截器链关系并构建命令执行器(入口)
      CommandInterceptor first = initInterceptorChain(commandInterceptors);
      commandExecutor = new CommandExecutorImpl(getDefaultCommandConfig(), first);
    }
  }

在默认的拦截器链的前后可以加入我们自定义扩展的拦截器。

默认的拦截器主要有:

1、日志拦截器

LogInterceptor:每个命令类调用前后会输出日志

2、事务拦截器

(1)SpringTransactionInterceptor

  通过spring的 TransactionTemplate设置事务隔离级别并在事务里执行下一个命令操作

 如下:         

transactionTemplate.setPropagationBehavior(getPropagation(config));//设置事务隔离级别(基于commandConfig中的隔离级别)
T result = transactionTemplate.execute(new TransactionCallback<T>() {
   public T doInTransaction(TransactionStatus status) {
        return next.execute(config, command);
   }
});

 (2)JtaTransactionInterceptor(暂不解析jta)

3、命令上下文拦截器

CommandContextInterceptor

通过command创建命令上下文,包含:

   command命令,sessionFactory,agenda,failedJobCommandFactory失败任务命令工厂,

   closeListeners关闭命令监听器列表,involvedExecutions(当前命令上下文参与执行的执行对象),resultStack(结果栈)等。

并压入Context上下文栈中,若命令上下文不可重用(默认不可重用),则在finally执行命令上下文的关闭操作,并对当前上下文栈等进行回收。

如下代码:

} finally {
      try {
        if (!contextReused) {
          context.close();//当命令执行完可执行一些job等操作
        }
      } finally {
        
        // Pop from stack
        Context.removeCommandContext();//回收命令上下文
        Context.removeProcessEngineConfiguration();//回收配置引用
        Context.removeBpmnOverrideContext();//(流程定义id:ObjectNode)缓存
        Context.removeActiviti5CompatibilityHandler();
      }
    }

接下来看CommandContext的close操作:

public void close() {
    //此方法的目的是正确关闭所有资源,即使在会话或事务上下文的关闭或刷新方法中发生异常也是如此。
    // The intention of this method is that all resources are closed properly, even if exceptions occur
    // in close or flush methods of the sessions or the transaction context.
    try {
      try {
        try {
          executeCloseListenersClosing();
          if (exception == null) {
            flushSessions();//刷空session
          }
        } catch (Throwable exception) {
          exception(exception);//异常输出
        } finally {
          try {
            if (exception == null) {
              executeCloseListenersAfterSessionFlushed();//session清空后执行关闭后置监听器集合
            }
          } catch (Throwable exception) {
            exception(exception);//异常输出
          }
          if (exception != null) {
            logException();
            executeCloseListenersCloseFailure();//执行关闭失败后只监听器集合
          } else {
            executeCloseListenersClosed();//后置执行异步任务
          }
        }
      } catch (Throwable exception) {
        // Catch exceptions during rollback
        exception(exception);
      } finally {
        // Sessions need to be closed, regardless of exceptions/commit/rollback
        closeSessions();//关闭session
      }
    } catch (Throwable exception) {
      // Catch exceptions during session closing
      exception(exception);
    }
    if (exception != null) {
      rethrowExceptionIfNeeded();
    }
}

主要用于关闭前置closing监听器的执行,session的flush, 命令上下文关闭后置Closed监听器的执行,session的close关闭。


4、事务上下文拦截器

TransactionContextInterceptor

构建transactionContext传入listener用于事务的提交和回滚

相关代码:

if (transactionContextFactory != null && !isReused) {//不可重用
    TransactionContext transactionContext = transactionContextFactory.openTransactionContext(commandContext);
    Context.setTransactionContext(transactionContext);
    commandContext.addCloseListener(new TransactionCommandContextCloseListener(transactionContext));
}
return next.execute(config, command);

会往CommandContext加入addCloseListener,listener相关代码:在命令上下文的close操作时调用,用于事务的提交和回滚

@Override
  public void afterSessionsFlush(CommandContext commandContext) {
    transactionContext.commit();
  }
  @Override
  public void closed(CommandContext commandContext) {
    
  }
  @Override
  public void closeFailure(CommandContext commandContext) {
    transactionContext.rollback();
  }

5、CommandInvoker

execute用于命令的最终执行。(通过Agenda计划任务线程链表,可参考agenda源码解析章节

上面几个拦截器的执行步骤:

(1)log拦截器

(2)在事务拦截器里的doInTransaction调用next调用命令上下文拦截器。(保证后面的所有操作都在事务里操作)

(3)命令上下文拦截器用于命令上下文的创建及上下文压栈等操作

(4)然后调用next事务上下文拦截器构造事务listener后置操作(用于当前命令上下文 事务的提交和回滚)。

(5)CommanInvoker用于命令操作的最终执行调用。

(6)在步骤3命令上下文拦截器的后置finally操作里通过步骤4的listener进行事务的刷出缓冲,提交和回滚等。


通过initCommandExecutor()构建的命令执行器CommandExecutorImpl,在该类里进行了命令责任链的first及后续调用执行。在基本activiti提供的所有service方法接口里就是通过命令传入该命令执行器进行后续的执行和调用。