欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

微服务调用链的 Druid SQL日志捕获 博客分类: architecturespring Druid微服务日志

程序员文章站 2024-03-17 22:21:58
...

在微服务的开发中, 全链路的请求跟踪是极重要的. 我们希望根据一个请求id(流水号), 跟踪到该请求整个调用链的信息, 包括每次数据库操作的SQL. 

 

日志信息通过ELK栈写入, 一次请求可能生成多条日志, 通过请求流水号能将所有日志找出来, 重现当时请求的全过程, 极大地方便了trouble shooting. 

 

对于一个给定HTTP请求, 我们需要相当多的日志信息, 包括但不限于:

  1. HTTP本身的信息: 如流水号, URL, 参数... 大大小小几十个参数. 总之, 通过请求参数能完全重现当时的请求.  如果你此时意识到可能会有隐私安全问题, 那是正常的, 我们后面会谈到.
  2. Service调用链的信息: 包括耗时数据, 异常错误信息等. 总之, 出错的时候要能快速定位到错误位置. 
  3. SQL信息: 抓取到每次数据库操作的实际SQL, 最好是可执行的SQL语句. 
  4. 其他信息: 比如JVM线程信息, 线程CPU信息等. 

 

此时要注意日志数据脱敏, 对于国内客户, 主要是密码相关信息. 对美国用户, 则最重要的是信用卡信息, 如CVV. 除Paypal之类的软件外, 要求用户提供社会安全号的情况是极其罕见的.  

如果用户提供了了社会安全号, 则必须高度重视脱敏. 如果做不到彻底脱敏, 则甚至可以放弃记录日志. 毕竟, 还是公司的股票要紧. 老外不好惹, 动不动就敢起诉总统...

已脱敏的日志会丢失部分数据, 增大了故障排查的困难; 不过相对于可能带来的安全风险而言, 这些困难的付出是值得的. 

 

终于到了正题, 本文只说Druid的日志捕获. 

Druid其实自带SQL日志, 而且工作得很好. 只是我们希望Druid不要把SQL写入单独的外部日志文件, 我们希望拿到SQL, 然后写入我们调用链日志中. 

 

此时需要实现一个自定的Filter. 

@Bean
@ConfigurationProperties("spring.datasource.druid.filter.slf4j")
@Primary
public Slf4jLogFilter slf4jLogFilter() {
    Slf4jLogFilter slf4jLogFilter = new CustomzedSlf4jLogFilter();
    slf4jLogFilter.setConnectionLogEnabled(false);
    slf4jLogFilter.setResultSetLogEnabled(false);
    slf4jLogFilter.setStatementExecutableSqlLogEnable(true);
    slf4jLogFilter.setStatementSqlFormatOption(new SQLUtils.FormatOption(true, true));
    return slf4jLogFilter;
}

 

该Filter关闭了冗余的日志信息, 并允许生成可执行的SQL. (此处要考虑脱敏).

 

CustomzedSlf4jLogFilter内容如下:

    /**
     * To integrate with trace log.
     *
     */
    class CustomzedSlf4jLogFilter extends Slf4jLogFilter {
        private Logger sqlLogger = LoggerFactory.getLogger("SQL");

        /**
         * Must be true, otherwise {@link StatementProxy#getLastExecuteTimeNano()} doesn't work.
         *
         * @see StatementProxy#getLastExecuteTimeNano()
         */
        @Override
        public boolean isStatementLogEnabled() {
            return true;
        }

        /**
         * Must be true, otherwise {@link #statement_executeErrorAfter} never get called.
         *
         * @see #statement_executeErrorAfter
         */
        @Override
        public boolean isStatementLogErrorEnabled() {
            return true;
        }

        @Override
        protected void statementExecuteAfter(StatementProxy statement, String sql, boolean firstResult) {
            super.statementExecuteAfter(statement, sql, firstResult);
            log(statement, sql);
        }

        @Override
        protected void statementExecuteBatchAfter(StatementProxy statement, int[] result) {
            super.statementExecuteBatchAfter(statement, result);
            String sql = statement instanceof PreparedStatementProxy ? ((PreparedStatementProxy) statement).getSql() : statement.getBatchSql();
            log(statement, sql);
        }

        @Override
        protected void statementExecuteQueryAfter(StatementProxy statement, String sql, ResultSetProxy resultSet) {
            super.statementExecuteQueryAfter(statement, sql, resultSet);
            log(statement, sql);
        }

        @Override
        protected void statementExecuteUpdateAfter(StatementProxy statement, String sql, int updateCount) {
            super.statementExecuteUpdateAfter(statement, sql, updateCount);
            log(statement, sql);
        }

        @Override
        protected void statement_executeErrorAfter(StatementProxy statement, String sql, Throwable error) {
            super.statement_executeErrorAfter(statement, sql, error);
            log(statement, sql);
        }

        private void log(StatementProxy statement, String rawSql) {
            int elapsed = (int) statement.getLastExecuteTimeNano() / (1000 * 1000);
            String sql = String.format("%s /**Elapsed: %d milliseconds**/", executableSql(statement, rawSql), elapsed);
            sqlLogger.info(sql);
            LogContext.getLogEntry().addSql(sql);
        }

        private String executableSql(StatementProxy statement, String sql) {
            if (!super.isStatementExecutableSqlLogEnable()) {
                return sql;
            }

            if (statement.getParametersSize() == 0) {
                return sql;
            }

            List<Object> parameters = new ArrayList<>(statement.getParametersSize());
            for (int i = 0; i < statement.getParametersSize(); ++i) {
                JdbcParameter jdbcParam = statement.getParameter(i);
                parameters.add(jdbcParam != null ? jdbcParam.getValue() : null);
            }

            String dbType = statement.getConnectionProxy().getDirectDataSource().getDbType();
            return SQLUtils.format(sql, dbType, parameters, super.getStatementSqlFormatOption());
        }
    }

 注意到LogContext.getLogEntry().addSql(sql)这一句, 这句的作用是把捕获到的SQL加入到自己定义的日志对象中. 后面就可以把这个带SQL的LogEntry写入ElasticSearch.

 

我们还把每条SQL执行耗时通过SQL注释的方式记录了下来. 如果你在某些公司经历过部门之间常常扯皮的痛苦, 你就会知道这么一个小小的改进会节约你多少时间.

 

你可能已经猜到了, LogContext.getLogEntry()其实是从ThreadLocal中拿出来的, 大多数情况下这样都能工作.

对于跨JVM的情况, 比如每个Service都运行在自己的kubernates pod中, 都会通过各自的ThreadLocal抓获各自的日志. 对于同一个调用链的每一个service调用日志, 他们的流水号是相同的, 所以可以通过搜索ElasticSearch全部重现整个调用链的日志. 

 

ThreadLocal在某些情况下会失效, 比如用到了异步的时候. 特别地, Spring 5 引入了Reactive Stream的实现Reactor, 大量使用了异步. 此时ThreadLocal大法就破产了.

幸好, Reactor是可以传送上下文的, 这是一个上下文在Reactor 的stream中传播的例子, 来自于Reactor官方. 可以把LogEntry之类的上下文数据通过类似Mono.subscriberContext()这种方式传播下去. 

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                                   .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));
 
StepVerifier.create(r)
            .expectNext("Hello Reactor")
            .verifyComplete();