flume ExecSource 支持获取获取指定列数据 博客分类: flume flume ExecSource 支持获取获取指定列数据
程序员文章站
2024-03-12 19:10:08
...
需求描述:
flume使用 execSource 类型 实现截取数据行中指定列的数据(详见下图)
实现:
1.方案一: execSource接受的是linux命令,所以可以使用linux awk实现这个功能
命令:tail -F /root/test.log | awk -F ',' '{print $2;fflush()}' 注意:fflush()一定要加,否则不输出
完整的flume-exec.properties文件内容如下:
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' hd1.sources=s1 hd1.sources.s1.type=exec hd1.sources.s1.shell=/bin/bash -c hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}' hd1.channels=c1 hd1.channels.c1.type=memory hd1.channels.c1.capacity=1000 hd1.channels.c1.transcationCapacity=100 hd1.sinks=sk1 hd1.sinks.sk1.type=logger #把source 和 sink 关联到channel上 #1个source可以对应多个channel(重点) hd1.sources.s1.channels=c1 #一个sink只对应1个sink(重点) hd1.sinks.sk1.channel=c1
2.方案二:修改源码,扩展ExecSource
(1) 具体怎么改?
看ExecSource.java 的源码,可以知道 ExecSource是通过BufferedReader,读取InputStream,然后把读取出来的每行内容包装成event,发往channel,所以我们可以在包装成event之前,把内容替换成我们需要的
(2)具体实现:
修改ExecSource.java
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.flume.source; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.*; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SourceCounter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * <p> * A {@link Source} implementation that executes a Unix process and turns each * line of text into an event. * </p> * <p> * This source runs a given Unix command on start up and expects that process to * continuously produce data on standard out (stderr ignored by default). Unless * told to restart, if the process exits for any reason, the source also exits and * will produce no further data. This means configurations such as <tt>cat [named pipe]</tt> * or <tt>tail -F [file]</tt> are going to produce the desired results where as * <tt>date</tt> will probably not - the former two commands produce streams of * data where as the latter produces a single event and exits. * </p> * <p> * The <tt>ExecSource</tt> is meant for situations where one must integrate with * existing systems without modifying code. It is a compatibility gateway built * to allow simple, stop-gap integration and doesn't necessarily offer all of * the benefits or guarantees of native integration with Flume. If one has the * option of using the <tt>AvroSource</tt>, for instance, that would be greatly * preferred to this source as it (and similarly implemented sources) can * maintain the transactional guarantees that exec can not. * </p> * <p> * <i>Why doesn't <tt>ExecSource</tt> offer transactional guarantees?</i> * </p> * <p> * The problem with <tt>ExecSource</tt> and other asynchronous sources is that * the source can not guarantee that if there is a failure to put the event into * the {@link Channel} the client knows about it. As a for instance, one of the * most commonly requested features is the <tt>tail -F [file]</tt>-like use case * where an application writes to a log file on disk and Flume tails the file, * sending each line as an event. While this is possible, there's an obvious * problem; what happens if the channel fills up and Flume can't send an event? * Flume has no way of indicating to the application writing the log file that * it needs to retain the log or that the event hasn't been sent, for some * reason. If this doesn't make sense, you need only know this: <b>Your * application can never guarantee data has been received when using a * unidirectional asynchronous interface such as ExecSource!</b> As an extension * of this warning - and to be completely clear - there is absolutely zero * guarantee of event delivery when using this source. You have been warned. * </p> * <p> * <b>Configuration options</b> * </p> * <table> * <tr> * <th>Parameter</th> * <th>Description</th> * <th>Unit / Type</th> * <th>Default</th> * </tr> * <tr> * <td><tt>command</tt></td> * <td>The command to execute</td> * <td>String</td> * <td>none (required)</td> * </tr> * <tr> * <td><tt>restart</tt></td> * <td>Whether to restart the command when it exits</td> * <td>Boolean</td> * <td>false</td> * </tr> * <tr> * <td><tt>restartThrottle</tt></td> * <td>How long in milliseconds to wait before restarting the command</td> * <td>Long</td> * <td>10000</td> * </tr> * <tr> * <td><tt>logStderr</tt></td> * <td>Whether to log or discard the standard error stream of the command</td> * <td>Boolean</td> * <td>false</td> * </tr> * <tr> * <td><tt>batchSize</tt></td> * <td>The number of events to commit to channel at a time.</td> * <td>integer</td> * <td>20</td> * </tr> * <tr> * <td><tt>batchTimeout</tt></td> * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached, * before data is pushed downstream.</td> * <td>long</td> * <td>3000</td> * </tr> * </table> * <p> * <b>Metrics</b> * </p> * <p> * TODO * </p> */ public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable { private static final Logger logger = LoggerFactory.getLogger(ExecSource.class); private String shell; private String command; private SourceCounter sourceCounter; private ExecutorService executor; private Future<?> runnerFuture; private long restartThrottle; private boolean restart; private boolean logStderr; private Integer bufferCount; private long batchTimeout; private ExecRunnable runner; private Charset charset; //开关,是否做split private boolean customSplitSwitchOn; //split的分隔符 private String customSplitDelimiter; //split后获取的列 private Integer customFetchColId; @Override public void start() { logger.info("Exec source starting with command: {}", command); // Start the counter before starting any threads that may access it. sourceCounter.start(); executor = Executors.newSingleThreadExecutor(); //把自定义的三个参数,传给构造函数 runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset, customSplitSwitchOn, customSplitDelimiter, customFetchColId); // Start the runner thread. runnerFuture = executor.submit(runner); // Mark the Source as RUNNING. super.start(); logger.debug("Exec source started"); } @Override public void stop() { logger.info("Stopping exec source with command: {}", command); if (runner != null) { runner.setRestart(false); runner.kill(); } if (runnerFuture != null) { logger.debug("Stopping exec runner"); runnerFuture.cancel(true); logger.debug("Exec runner stopped"); } executor.shutdown(); while (!executor.isTerminated()) { logger.debug("Waiting for exec executor service to stop"); try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting."); Thread.currentThread().interrupt(); } } sourceCounter.stop(); super.stop(); logger.debug("Exec source with command:{} stopped. Metrics:{}", command, sourceCounter); } @Override public void configure(Context context) { command = context.getString("command"); Preconditions.checkState(command != null, "The parameter command must be specified"); restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE); restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART, ExecSourceConfigurationConstants.DEFAULT_RESTART); logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR, ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR); bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE); batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT); charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET, ExecSourceConfigurationConstants.DEFAULT_CHARSET)); shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null); // 获取split开关配置值 customSplitSwitchOn = context .getBoolean(ExecSourceConfigurationConstants.CUSTOM_SPLIT_SWITCH_ON, ExecSourceConfigurationConstants.DEFAULT_CUSTON_SWITCH_ON); //获取分隔符配置值 customSplitDelimiter = context .getString(ExecSourceConfigurationConstants.CUSTOM_SPLIT_DELIMITER, ExecSourceConfigurationConstants.DEFAULT_CUSTOM_SPLIT_DELIMITER); //获取split后的列 customFetchColId = context.getInteger(ExecSourceConfigurationConstants.CUSTOM_FETCH_COL, ExecSourceConfigurationConstants.DEFAULT_CUSTOM_FETCH_COL_ID); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } } private static class ExecRunnable implements Runnable { //构造函数加入三个参数 public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset, boolean customSplitSwitchOn, String customSplitDelimiter, Integer customFetchColId) { this.command = command; this.channelProcessor = channelProcessor; this.sourceCounter = sourceCounter; this.restartThrottle = restartThrottle; this.bufferCount = bufferCount; this.batchTimeout = batchTimeout; this.restart = restart; this.logStderr = logStderr; this.charset = charset; this.shell = shell; //custom属性 this.customSplitSwitchOn = customSplitSwitchOn; this.customSplitDelimiter = customSplitDelimiter; this.customFetchColId = customFetchColId; } private final String shell; private final String command; private final ChannelProcessor channelProcessor; private final SourceCounter sourceCounter; private volatile boolean restart; private final long restartThrottle; private final int bufferCount; private long batchTimeout; private final boolean logStderr; private final Charset charset; //split的分隔符 private String customSplitDelimiter; //开关(是否允许做split) private boolean customSplitSwitchOn; //split后需要获取的列id private int customFetchColId; private Process process = null; private SystemClock systemClock = new SystemClock(); private Long lastPushToChannel = systemClock.currentTimeMillis(); ScheduledExecutorService timedFlushService; ScheduledFuture<?> future; @Override public void run() { do { String exitCode = "unknown"; BufferedReader reader = null; String line = null; final List<Event> eventList = new ArrayList<Event>(); timedFlushService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat( "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); try { if (shell != null) { String[] commandArgs = formulateShellCommand(shell, command); process = Runtime.getRuntime().exec(commandArgs); } else { String[] commandArgs = command.split("\\s+"); process = new ProcessBuilder(commandArgs).start(); } reader = new BufferedReader( new InputStreamReader(process.getInputStream(), charset)); // StderrLogger dies as soon as the input stream is invalid StderrReader stderrReader = new StderrReader(new BufferedReader( new InputStreamReader(process.getErrorStream(), charset)), logStderr); stderrReader.setName("StderrReader-[" + command + "]"); stderrReader.setDaemon(true); stderrReader.start(); future = timedFlushService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { synchronized (eventList) { if (!eventList.isEmpty() && timeout()) { flushEventBatch(eventList); } } } catch (Exception e) { logger.error("Exception occurred when processing event batch", e); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } } }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); String splits[]; while ((line = reader.readLine()) != null) { sourceCounter.incrementEventReceivedCount(); synchronized (eventList) { //如果开启了split开关,那么将根据指定的分割符做split,并返回指定列的内容 if (customSplitSwitchOn) { try { splits = line.split(customSplitDelimiter); if (splits.length > customFetchColId) { line = splits[customFetchColId]; } else { logger.error("customColId is larger than " + splits.length); continue; } } catch (Exception e) { logger.error("Failed while split line: ", e); continue; } } eventList.add(EventBuilder.withBody(line.getBytes(charset))); if (eventList.size() >= bufferCount || timeout()) { flushEventBatch(eventList); } } } synchronized (eventList) { if (!eventList.isEmpty()) { flushEventBatch(eventList); } } } catch (Exception e) { logger.error("Failed while running command: " + command, e); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } } finally { if (reader != null) { try { reader.close(); } catch (IOException ex) { logger.error("Failed to close reader for exec source", ex); } } exitCode = String.valueOf(kill()); } if (restart) { logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode); try { Thread.sleep(restartThrottle); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { logger.info("Command [" + command + "] exited with " + exitCode); } } while (restart); } private void flushEventBatch(List<Event> eventList) { channelProcessor.processEventBatch(eventList); sourceCounter.addToEventAcceptedCount(eventList.size()); eventList.clear(); lastPushToChannel = systemClock.currentTimeMillis(); } private boolean timeout() { return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout; } private static String[] formulateShellCommand(String shell, String command) { String[] shellArgs = shell.split("\\s+"); String[] result = new String[shellArgs.length + 1]; System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); result[shellArgs.length] = command; return result; } public int kill() { if (process != null) { synchronized (process) { process.destroy(); try { int exitValue = process.waitFor(); // Stop the Thread that flushes periodically if (future != null) { future.cancel(true); } if (timedFlushService != null) { timedFlushService.shutdown(); while (!timedFlushService.isTerminated()) { try { timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug( "Interrupted while waiting for exec executor service " + "to stop. Just exiting."); Thread.currentThread().interrupt(); } } } return exitValue; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } return Integer.MIN_VALUE; } return Integer.MIN_VALUE / 2; } public void setRestart(boolean restart) { this.restart = restart; } } private static class StderrReader extends Thread { private BufferedReader input; private boolean logStderr; protected StderrReader(BufferedReader input, boolean logStderr) { this.input = input; this.logStderr = logStderr; } @Override public void run() { try { int i = 0; String line = null; while ((line = input.readLine()) != null) { if (logStderr) { // There is no need to read 'line' with a charset // as we do not to propagate it. // It is in UTF-16 and would be printed in UTF-8 format. logger.info("StderrLogger[{}] = '{}'", ++i, line); } } } catch (IOException e) { logger.info("StderrLogger exiting", e); } finally { try { if (input != null) { input.close(); } } catch (IOException ex) { logger.error("Failed to close stderr reader for exec source", ex); } } } } }
修改ExecSourceConfigurationConstants
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.flume.source; import com.google.common.annotations.VisibleForTesting; public class ExecSourceConfigurationConstants { /** * Should the exec'ed command restarted if it dies: : default false */ public static final String CONFIG_RESTART = "restart"; public static final boolean DEFAULT_RESTART = false; /** * Amount of time to wait before attempting a restart: : default 10000 ms */ public static final String CONFIG_RESTART_THROTTLE = "restartThrottle"; public static final long DEFAULT_RESTART_THROTTLE = 10000L; /** * Should stderr from the command be logged: default false */ public static final String CONFIG_LOG_STDERR = "logStdErr"; public static final boolean DEFAULT_LOG_STDERR = false; /** * Number of lines to read at a time */ public static final String CONFIG_BATCH_SIZE = "batchSize"; public static final int DEFAULT_BATCH_SIZE = 20; /** * Amount of time to wait, if the buffer size was not reached, before * to data is pushed downstream: : default 3000 ms */ public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout"; public static final long DEFAULT_BATCH_TIME_OUT = 3000L; /** * Charset for reading input */ public static final String CHARSET = "charset"; public static final String DEFAULT_CHARSET = "UTF-8"; /** * Optional shell/command processor used to run command */ public static final String CONFIG_SHELL = "shell"; /** * 自定义分隔符,默认使用逗号分割 */ public static final String CUSTOM_SPLIT_DELIMITER = "customSplitDelimiter"; public static final String DEFAULT_CUSTOM_SPLIT_DELIMITER = ","; /** * split的开关,默认关闭 */ public static final String CUSTOM_SPLIT_SWITCH_ON = "customSwitchOn"; public static final boolean DEFAULT_CUSTON_SWITCH_ON = false; /** * split后获取哪一列,从0开始,同数组下标 */ public static final String CUSTOM_FETCH_COL = "customFetchCol"; public static final int DEFAULT_CUSTOM_FETCH_COL_ID = 0; }
修改flume-custom.properties
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case called 'agent' hd1.sources=s1 hd1.sources.s1.type=exec hd1.sources.s1.shell=/bin/bash -c #方法一 #hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}' #方法二,改源码 hd1.sources.s1.command=tail -F /root/test.log hd1.sources.s1.customSwitchOn=true hd1.sources.s1.customFetchCol=1 hd1.channels=c1 hd1.channels.c1.type=memory hd1.channels.c1.capacity=1000 hd1.channels.c1.transcationCapacity=100 hd1.sinks=sk1 hd1.sinks.sk1.type=logger #把source 和 sink 关联到channel上 #1个source可以对应多个channel(重点) hd1.sources.s1.channels=c1 #一个sink只对应1个sink(重点) hd1.sinks.sk1.channel=c1
做完以上修改 ,重新打包 flume-ng-core模块,把打完的新包,替换掉服务器的apache-flume-1.8.0-bin/lib目录下的flume-ng-core-1.8.0.jar (这个包的版本号,根据你实际的版本去替换)即可,使用上面修改好的 flume-custom.properties启动flume测试即可
上一步改完源码重新打包,flume pom.xml中配置了代码风格检测的插件,需要注释掉这个插件,否则会因为编码风格不一致,打包报错
思考:
- linux的一些常用命令还是要好好掌握,遇到问题慢慢分析,例如:fflush()
- 不要局限在解决问题,要发散,多扩展思路
- 源码其实并没那么难改,有时候限制我们的,可能是我们自己的耐心,细心等,例如:源码编译过程比较耗时,各种找不到包等,很可能让你望而却步了
- 多看官网,多查阅,多和大牛交流