微服务调用链的 Druid SQL日志捕获 博客分类: architecturespring Druid微服务日志
在微服务的开发中, 全链路的请求跟踪是极重要的. 我们希望根据一个请求id(流水号), 跟踪到该请求整个调用链的信息, 包括每次数据库操作的SQL.
日志信息通过ELK栈写入, 一次请求可能生成多条日志, 通过请求流水号能将所有日志找出来, 重现当时请求的全过程, 极大地方便了trouble shooting.
对于一个给定HTTP请求, 我们需要相当多的日志信息, 包括但不限于:
- HTTP本身的信息: 如流水号, URL, 参数... 大大小小几十个参数. 总之, 通过请求参数能完全重现当时的请求. 如果你此时意识到可能会有隐私安全问题, 那是正常的, 我们后面会谈到.
- Service调用链的信息: 包括耗时数据, 异常错误信息等. 总之, 出错的时候要能快速定位到错误位置.
- SQL信息: 抓取到每次数据库操作的实际SQL, 最好是可执行的SQL语句.
- 其他信息: 比如JVM线程信息, 线程CPU信息等.
此时要注意日志数据脱敏, 对于国内客户, 主要是密码相关信息. 对美国用户, 则最重要的是信用卡信息, 如CVV. 除Paypal之类的软件外, 要求用户提供社会安全号的情况是极其罕见的.
如果用户提供了了社会安全号, 则必须高度重视脱敏. 如果做不到彻底脱敏, 则甚至可以放弃记录日志. 毕竟, 还是公司的股票要紧. 老外不好惹, 动不动就敢起诉总统...
已脱敏的日志会丢失部分数据, 增大了故障排查的困难; 不过相对于可能带来的安全风险而言, 这些困难的付出是值得的.
终于到了正题, 本文只说Druid的日志捕获.
Druid其实自带SQL日志, 而且工作得很好. 只是我们希望Druid不要把SQL写入单独的外部日志文件, 我们希望拿到SQL, 然后写入我们调用链日志中.
此时需要实现一个自定的Filter.
@Bean @ConfigurationProperties("spring.datasource.druid.filter.slf4j") @Primary public Slf4jLogFilter slf4jLogFilter() { Slf4jLogFilter slf4jLogFilter = new CustomzedSlf4jLogFilter(); slf4jLogFilter.setConnectionLogEnabled(false); slf4jLogFilter.setResultSetLogEnabled(false); slf4jLogFilter.setStatementExecutableSqlLogEnable(true); slf4jLogFilter.setStatementSqlFormatOption(new SQLUtils.FormatOption(true, true)); return slf4jLogFilter; }
该Filter关闭了冗余的日志信息, 并允许生成可执行的SQL. (此处要考虑脱敏).
CustomzedSlf4jLogFilter内容如下:
/** * To integrate with trace log. * */ class CustomzedSlf4jLogFilter extends Slf4jLogFilter { private Logger sqlLogger = LoggerFactory.getLogger("SQL"); /** * Must be true, otherwise {@link StatementProxy#getLastExecuteTimeNano()} doesn't work. * * @see StatementProxy#getLastExecuteTimeNano() */ @Override public boolean isStatementLogEnabled() { return true; } /** * Must be true, otherwise {@link #statement_executeErrorAfter} never get called. * * @see #statement_executeErrorAfter */ @Override public boolean isStatementLogErrorEnabled() { return true; } @Override protected void statementExecuteAfter(StatementProxy statement, String sql, boolean firstResult) { super.statementExecuteAfter(statement, sql, firstResult); log(statement, sql); } @Override protected void statementExecuteBatchAfter(StatementProxy statement, int[] result) { super.statementExecuteBatchAfter(statement, result); String sql = statement instanceof PreparedStatementProxy ? ((PreparedStatementProxy) statement).getSql() : statement.getBatchSql(); log(statement, sql); } @Override protected void statementExecuteQueryAfter(StatementProxy statement, String sql, ResultSetProxy resultSet) { super.statementExecuteQueryAfter(statement, sql, resultSet); log(statement, sql); } @Override protected void statementExecuteUpdateAfter(StatementProxy statement, String sql, int updateCount) { super.statementExecuteUpdateAfter(statement, sql, updateCount); log(statement, sql); } @Override protected void statement_executeErrorAfter(StatementProxy statement, String sql, Throwable error) { super.statement_executeErrorAfter(statement, sql, error); log(statement, sql); } private void log(StatementProxy statement, String rawSql) { int elapsed = (int) statement.getLastExecuteTimeNano() / (1000 * 1000); String sql = String.format("%s /**Elapsed: %d milliseconds**/", executableSql(statement, rawSql), elapsed); sqlLogger.info(sql); LogContext.getLogEntry().addSql(sql); } private String executableSql(StatementProxy statement, String sql) { if (!super.isStatementExecutableSqlLogEnable()) { return sql; } if (statement.getParametersSize() == 0) { return sql; } List<Object> parameters = new ArrayList<>(statement.getParametersSize()); for (int i = 0; i < statement.getParametersSize(); ++i) { JdbcParameter jdbcParam = statement.getParameter(i); parameters.add(jdbcParam != null ? jdbcParam.getValue() : null); } String dbType = statement.getConnectionProxy().getDirectDataSource().getDbType(); return SQLUtils.format(sql, dbType, parameters, super.getStatementSqlFormatOption()); } }
注意到LogContext.getLogEntry().addSql(sql)这一句, 这句的作用是把捕获到的SQL加入到自己定义的日志对象中. 后面就可以把这个带SQL的LogEntry写入ElasticSearch.
我们还把每条SQL执行耗时通过SQL注释的方式记录了下来. 如果你在某些公司经历过部门之间常常扯皮的痛苦, 你就会知道这么一个小小的改进会节约你多少时间.
你可能已经猜到了, LogContext.getLogEntry()其实是从ThreadLocal中拿出来的, 大多数情况下这样都能工作.
对于跨JVM的情况, 比如每个Service都运行在自己的kubernates pod中, 都会通过各自的ThreadLocal抓获各自的日志. 对于同一个调用链的每一个service调用日志, 他们的流水号是相同的, 所以可以通过搜索ElasticSearch全部重现整个调用链的日志.
ThreadLocal在某些情况下会失效, 比如用到了异步的时候. 特别地, Spring 5 引入了Reactive Stream的实现Reactor, 大量使用了异步. 此时ThreadLocal大法就破产了.
幸好, Reactor是可以传送上下文的, 这是一个上下文在Reactor 的stream中传播的例子, 来自于Reactor官方. 可以把LogEntry之类的上下文数据通过类似Mono.subscriberContext()这种方式传播下去.
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor") .verifyComplete();