欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Azkaban日志中文乱码问题解决教程

程序员文章站 2022-05-14 07:53:08
解决Azkaban日志中文乱码问题,Azkaban调度DataX或者Hive,获取日志中含有中文出现乱码问题解决,修改Azkaban源码解决日志乱码教程 ......

  Azkaban作为LinkedIn开源的任务流式管理工具,在工作中很大程度上被用到。但是,由于非国人开发,对中文的支持性很不好。大多数情况下,会出现几种乱码现象: - 执行内置脚本生成log乱码 - 直接command执行中文乱码 - 中文包名乱码等,其中对日常使用影响最大的就是日志乱码问题。不管是调度Hive、DataX还是Java程序,只要日志抛出来中文,中文都是乱码显示,摸索许久,决定从源码入手开始层层解惑。

  文中大部分内容从源码一步步进入解析,有经验的朋友可以跳至文末见具体解决方法。

  根据页面获取日志的接口可以知道方法在 azkaban-web-server项目下package azkaban.webapp.servlet 下的方法handleAJAXAction,如下图 请求参数是fetchExecJobLogs

Azkaban日志中文乱码问题解决教程

   对应的处理方法为  ajaxFetchJobLogs(req, resp, ret, session.getUser(), exFlow)和ajaxFetchExecFlowLogs(req, resp, ret, session.getUser(), exFlow)

 

Azkaban日志中文乱码问题解决教程

  进入该方法后可以发现返回的data为经过 StringEscapeUtils.escapeHtml格式化过的,这就是引发乱码的原因之一。

Azkaban日志中文乱码问题解决教程

  改用commons-lang3下的方法可以解决这个问题,增加如下依赖后更新gradle项目,将此处StringEscapeUtils.escapeHtml(data.getData())更改为 org.apache.commons.lang3.StringEscapeUtils.escapeHtml3(data.getData())

  // https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
  compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.4'

  修改后为 

Azkaban日志中文乱码问题解决教程

Azkaban日志中文乱码问题解决教程

  仔细研读代码可以发现其实这样还不够,读取日志的时候会先判断该任务是否正在执行,如果在执行的话就直接在服务器上暂存日志路径读取,否则是话就在mysql表读取,具体代码如下

Azkaban日志中文乱码问题解决教程

 

  一.本地文件读取的话是走http请求的,对应的为azkaban-exec-server下的包package azkaban.execapp下的ExecutorServlet 

Azkaban日志中文乱码问题解决教程

  进入方法handleFetchLogEvent,一个是FlowLogs 一个是JobLogs ,

Azkaban日志中文乱码问题解决教程

 

  进入这两个方法后发现最终都是调用的FileIOUtils.readUtf8File()这个方法

Azkaban日志中文乱码问题解决教程

  读取日志的核心方法就是这个了

Azkaban日志中文乱码问题解决教程

  BufferedInputStream读取的是字节byte,因为一个汉字占两个字节,而当中英文混合的时候,有的字符占一个字节,有的字符占两个字节,所以如果直接读字节,而数据比较长,没有一次读完的时候,很可能刚好读到一个汉字的前一个字节,这样,这个中文就成了乱码,后面的数据因为没有字节对齐,也都成了乱码.所以我们需要用BufferedReader来读取,
它读到的是字符,所以不会读到半个字符的情况,不会出现乱码。
  在这里修改方法后为一个新方法如下
Azkaban日志中文乱码问题解决教程
  public static LogData readUtf8FileEncode(final File file, final int fileOffset, final int length)
          throws IOException {
    final char[] chars = new char[length];
    final FileInputStream fileStream = new FileInputStream(file);

    final long skipped = fileStream.skip(fileOffset);
    if (skipped < fileOffset) {
      fileStream.close();
      return new LogData(fileOffset, 0, "");
    }

    BufferedReader reader = null;
    int read = 0;
    try {
      reader = new BufferedReader(new InputStreamReader(fileStream, "GBK"));
      read = reader.read(chars);
    } finally {
      IOUtils.closeQuietly(reader);
    }

    if (read <= 0) {
      return new LogData(fileOffset, 0, "");
    }
    final byte[] buffer = new String(chars).getBytes();
    final Pair<Integer, Integer> utf8Range = getUtf8Range(buffer, 0, read);
    final String outputString = new String(buffer, utf8Range.getFirst(), utf8Range.getSecond(),
            StandardCharsets.UTF_8);

    return new LogData(fileOffset + utf8Range.getFirst(), utf8Range.getSecond(), outputString);
  }
View Code
  在原来引用的地方替换为新的即可,如下图。

Azkaban日志中文乱码问题解决教程

Azkaban日志中文乱码问题解决教程

 

  二.对于运行完成的,日志会被写入数据库,而且是经过压缩的,方法为 package azkaban.executor下的ExecutionLogsDao 搜索找到fetchLogs,在方法体中实例化了一个FetchLogsHandler

Azkaban日志中文乱码问题解决教程

  在FetchLogsHandler中核心方法为handle,在这个方法中可以发现这块会有解压缩的过程,开始怀疑是不是这块出现问题,测试后否定了这一猜想。

 Azkaban日志中文乱码问题解决教程

  这里有解压,那在入库之前肯定会有压缩过程,再找到入库的方法(如何触发将本地文件写入到数据库在这里不讨论)uploadLogFile(ExecutionLogsDao方法中)

Azkaban日志中文乱码问题解决教程

  进入方法体后可以发现和之前类似,也是采用BufferedInputStream读取文件

Azkaban日志中文乱码问题解决教程

  修改后的方法为如下

Azkaban日志中文乱码问题解决教程
    private void uploadLogFileEncode(final DatabaseTransOperator transOperator, final int execId, final String name,
            final int attempt, final File[] files, final EncodingType encType) throws SQLException {
        // 50K buffer... if logs are greater than this, we chunk.
        // However, we better prevent large log files from being uploaded somehow
        final char[] buffer = new char[50 * 1024];
        int pos = 0;
        int length = buffer.length;
        int startByte = 0;
        try {
            for (int i = 0; i < files.length; ++i) {
                final File file = files[i];
                // BufferedInputStream读取的是字节byte,因为一个汉字占两个字节,而当中英文混合的时候,有的字符占一个字节,
                // 有的字符占两个字节,所以如果直接读字节,而数据比较长,没有一次读完的时候,很可能刚好读到一个汉字的前一个字节,
                // 这样,这个中文就成了乱码,后面的数据因为没有字节对齐,也都成了乱码.所以我们需要用BufferedReader来读取,
                // 它读到的是字符,所以不会读到半个字符的情况,不会出现乱码
                BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "GBK"));
                try {
                    int size = reader.read(buffer, pos, length);
                    while (size >= 0) {
                        if (pos + size == buffer.length) {
                            // Flush here.
                            uploadLogPart(transOperator, execId, name, attempt, startByte, startByte + buffer.length,
                                    encType, buffer, buffer.length);

                            pos = 0;
                            length = buffer.length;
                            startByte += buffer.length;
                        } else {
                            // Usually end of file.
                            pos += size;
                            length = buffer.length - pos;
                        }
                        size = reader.read(buffer, pos, length);
                    }
                } finally {
                    IOUtils.closeQuietly(reader);
                }
            }

            // Final commit of buffer.
            if (pos > 0) {
                uploadLogPart(transOperator, execId, name, attempt, startByte, startByte + pos, encType, buffer, pos);
            }
        } catch (final SQLException e) {
            logger.error("Error writing log part.", e);
            throw new SQLException("Error writing log part", e);
        } catch (final IOException e) {
            logger.error("Error chunking.", e);
            throw new SQLException("Error chunking", e);
        }
    }

    private void uploadLogPart(final DatabaseTransOperator transOperator, final int execId, final String name,
            final int attempt, final int startByte, final int endByte, final EncodingType encType, final char[] buffer,
            final int length) throws SQLException, IOException {
        final String INSERT_EXECUTION_LOGS = "INSERT INTO execution_logs "
                + "(exec_id, name, attempt, enc_type, start_byte, end_byte, "
                + "log, upload_time) VALUES (?,?,?,?,?,?,?,?)";

        byte[] buf = new String(buffer).getBytes();
        if (encType == EncodingType.GZIP) {
            buf = GZIPUtils.gzipBytes(buf, 0, buf.length);
        } else if (length < buffer.length) {
            buf = new String(buffer, 0, length).getBytes();
        }

        transOperator.update(INSERT_EXECUTION_LOGS, execId, name, attempt, encType.getNumVal(), startByte,
                startByte + length, buf, DateTime.now().getMillis());
    
View Code

  以上代码直接放在ExecutionLogsDao内即可,修改引用的地方

 Azkaban日志中文乱码问题解决教程

   至此日志显示中文乱码问题即可解决。以上为代码分析过程,操作过程可直接总结如下:

1.添加commons-lang3依赖

// https://mvnrepository.com/artifact/org.apache.commons/commons-lang3
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.4'

2.azkaban-web-server -->package azkaban.webapp.servlet-->ExecutorServlet-->将方法ajaxFetchExecFlowLogs和ajaxFetchJobLogs中的StringEscapeUtils.escapeHtml替换为org.apache.commons.lang3.StringEscapeUtils.escapeHtml3

3.azkaban-common-->package azkaban.utils-->FileIOUtils-->增加方法readUtf8FileEncode (文中第一段代码)

4.azkaban-common-->package azkaban.execapp-->FlowRunnerManager-->分别将readFlowLogs和readJobLogs中readUtf8File替换为readUtf8FileEncode

5.azkaban-common-->package azkaban.executor-->ExecutionLogsDao-->增加方法uploadLogFileEncode和uploadLogPart(重载)(文中第二段代码),将uploadLogFile(在70行左右的这个方法public void uploadLogFile(final int execId, final String name, final int attempt, final File... files))中的uploadLogFile替换为uploadLogFileEncode

  代码修改就是这些,来张前后对比图(对于已经写入数据库的日志,中文乱码问题无法解决,因为写入数据库前读取文件的方法不支持中文,修改代码之后入数据库的日志可以支持中文)

Azkaban日志中文乱码问题解决教程

 

Azkaban日志中文乱码问题解决教程