Flume-ng 1.6 Startup Process: Source Code Analysis (Part 2)
程序员文章站
2022-05-12 10:13:38
...
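After reading the Flume source code, I found that Flume has two top-level interfaces:
1. ConfigurationProvider, which provides the getConfiguration() method for obtaining the configuration of the various components.
2. LifecycleAware, which provides three methods, start(), stop() and getLifecycleState(), used respectively to start a component, stop it, and report its current state in the lifecycle. This interface runs through the whole of Flume.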
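To make the LifecycleAware contract concrete, here is a minimal sketch that does not depend on Flume. LifecycleAware and LifecycleState below are simplified local stand-ins that only mirror the method names and the IDLE/START/STOP states used in this article (they are not the real Flume types), and DummyComponent is a hypothetical component invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {

    // Simplified local stand-in for Flume's LifecycleState (only the states used here).
    enum LifecycleState { IDLE, START, STOP }

    // Local copy of the three methods the article attributes to LifecycleAware.
    interface LifecycleAware {
        void start();
        void stop();
        LifecycleState getLifecycleState();
    }

    // Hypothetical component: records each transition and exposes its current state.
    static class DummyComponent implements LifecycleAware {
        private volatile LifecycleState state = LifecycleState.IDLE;
        private final String name;
        private final List<String> log;

        DummyComponent(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override public void start() { log.add(name + " started"); state = LifecycleState.START; }
        @Override public void stop()  { log.add(name + " stopped"); state = LifecycleState.STOP; }
        @Override public LifecycleState getLifecycleState() { return state; }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        DummyComponent c = new DummyComponent("source-1", log);
        System.out.println(c.getLifecycleState()); // IDLE
        c.start();
        System.out.println(c.getLifecycleState()); // START
        c.stop();
        System.out.println(c.getLifecycleState()); // STOP
        System.out.println(log); // [source-1 started, source-1 stopped]
    }
}
```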
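Continuing the analysis of the Flume-ng startup process: the entry class is org.apache.flume.node.Application, and once all components have been loaded, its start method is called. As the code below shows, start iterates over all components and calls the supervise method of LifecycleSupervisor on each one.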
public synchronized void start() {
    for (LifecycleAware component : components) {
        // supervisor is an instance of LifecycleSupervisor; call supervise on it
        supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }
}
What work is done when supervisor is instantiated as a LifecycleSupervisor object, that is, what does the constructor new LifecycleSupervisor() do?
public LifecycleSupervisor() {
    lifecycleState = LifecycleState.IDLE;
    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
    monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
    monitorService = new ScheduledThreadPoolExecutor(10,
        new ThreadFactoryBuilder().setNameFormat(
            "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
            .build());
    monitorService.setMaximumPoolSize(20);
    monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
    purger = new Purger();
    needToPurge = false;
}
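Its main job is to create the monitorService thread pool, a ScheduledThreadPoolExecutor with a core size of 10 threads and a maximum set to 20 (note that, according to the JDK documentation, maximumPoolSize has no useful effect on a ScheduledThreadPoolExecutor, which behaves as a fixed-size pool of corePoolSize threads with an unbounded queue), along with the maps that track the supervised components and their scheduled futures.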
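The constructor relies on Guava's ThreadFactoryBuilder to name the monitor threads. The sketch below reproduces the same pool settings with the JDK alone, swapping in a hand-written ThreadFactory; namedFactory and newMonitorService are hypothetical helpers, not part of Flume. It also makes the pool-size caveat visible: getMaximumPoolSize() reports 20, yet the pool only ever runs corePoolSize threads.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SupervisorPoolDemo {

    // Hypothetical JDK-only stand-in for Guava's
    // new ThreadFactoryBuilder().setNameFormat(prefix + "%d").build():
    // names threads prefix0, prefix1, ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(0);
        return r -> new Thread(r, prefix + counter.getAndIncrement());
    }

    // Same settings as the LifecycleSupervisor constructor quoted above.
    static ScheduledThreadPoolExecutor newMonitorService() {
        String prefix = "lifecycleSupervisor-" + Thread.currentThread().getId() + "-";
        ScheduledThreadPoolExecutor pool =
            new ScheduledThreadPoolExecutor(10, namedFactory(prefix));
        // Accepted and reported by getMaximumPoolSize(), but per the JDK docs it has
        // no useful effect: the pool stays at corePoolSize threads with an unbounded queue.
        pool.setMaximumPoolSize(20);
        pool.setKeepAliveTime(30, TimeUnit.SECONDS);
        return pool;
    }

    public static void main(String[] args) throws Exception {
        ScheduledThreadPoolExecutor pool = newMonitorService();
        System.out.println(pool.getCorePoolSize());    // 10
        System.out.println(pool.getMaximumPoolSize()); // 20
        // The first task triggers creation of the first worker thread.
        String name = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name); // lifecycleSupervisor-<id of main thread>-0
        pool.shutdown();
    }
}
```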
Now back to the supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) method mentioned earlier, which is what actually starts each component.
public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
    if (this.monitorService.isShutdown()
        || this.monitorService.isTerminated()
        || this.monitorService.isTerminating()) {
        throw new FlumeException("Supervise called on " + lifecycleAware + " " +
            "after shutdown has been initiated. " + lifecycleAware + " will not" +
            " be started");
    }

    Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
        "Refusing to supervise " + lifecycleAware + " more than once");

    if (logger.isDebugEnabled()) {
        logger.debug("Supervising service:{} policy:{} desiredState:{}",
            new Object[] { lifecycleAware, policy, desiredState });
    }

    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);
}
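First, the component's desired state desiredState (e.g. LifecycleState.START) and the policy (e.g. new SupervisorPolicy.AlwaysRestartPolicy()) are stored in an instance of the static inner class Supervisoree. The component lifecycleAware and this Supervisoree are then handed to a MonitorRunnable, which is scheduled on monitorService with scheduleWithFixedDelay: it runs immediately and then again 3 seconds after each run completes. The run method of MonitorRunnable, shown below, compares the component's current state with the desired state and, if they differ, switches on the desired state. Following the START path, it calls lifecycleAware.start(), which starts the component.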
@Override
public void run() {
    logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
        supervisoree);

    long now = System.currentTimeMillis();

    try {
        if (supervisoree.status.firstSeen == null) {
            logger.debug("first time seeing {}", lifecycleAware);
            supervisoree.status.firstSeen = now;
        }

        supervisoree.status.lastSeen = now;
        synchronized (lifecycleAware) {
            if (supervisoree.status.discard) {
                // Unsupervise has already been called on this.
                logger.info("Component has already been stopped {}",
                    lifecycleAware);
                return;
            } else if (supervisoree.status.error) {
                logger.info("Component {} is in error state, and Flume will not"
                    + "attempt to change its state", lifecycleAware);
                return;
            }

            supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

            if (!lifecycleAware.getLifecycleState().equals(
                supervisoree.status.desiredState)) {

                logger.debug("Want to transition {} from {} to {} (failures:{})",
                    new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                        supervisoree.status.desiredState,
                        supervisoree.status.failures });

                switch (supervisoree.status.desiredState) {
                    case START:
                        try {
                            lifecycleAware.start();
                        } catch (Throwable e) {
                            logger.error("Unable to start " + lifecycleAware
                                + " - Exception follows.", e);
                            if (e instanceof Error) {
                                // This component can never recover, shut it down.
                                supervisoree.status.desiredState = LifecycleState.STOP;
                                try {
                                    lifecycleAware.stop();
                                    logger.warn("Component {} stopped, since it could not be"
                                        + "successfully started due to missing dependencies",
                                        lifecycleAware);
                                } catch (Throwable e1) {
                                    logger.error("Unsuccessful attempt to "
                                        + "shutdown component: {} due to missing dependencies."
                                        + " Please shutdown the agent"
                                        + "or disable this component, or the agent will be"
                                        + "in an undefined state.", e1);
                                    supervisoree.status.error = true;
                                    if (e1 instanceof Error) {
                                        throw (Error) e1;
                                    }
                                    // Set the state to stop, so that the conf poller can
                                    // proceed.
                                }
                            }
                            supervisoree.status.failures++;
                        }
                        break;
                    case STOP:
                        try {
                            lifecycleAware.stop();
                        } catch (Throwable e) {
                            logger.error("Unable to stop " + lifecycleAware
                                + " - Exception follows.", e);
                            if (e instanceof Error) {
                                throw (Error) e;
                            }
                            supervisoree.status.failures++;
                        }
                        break;
                    default:
                        logger.warn("I refuse to acknowledge {} as a desired state",
                            supervisoree.status.desiredState);
                }

                if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
                    logger.error(
                        "Policy {} of {} has been violated - supervisor should exit!",
                        supervisoree.policy, lifecycleAware);
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Unexpected error", t);
    }
    logger.debug("Status check complete");
}
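The supervise/MonitorRunnable pair is essentially a periodic reconciliation loop: record a desired state once, then repeatedly compare it with the observed state and try to close the gap. The following simplified sketch shows that idea in isolation. All names here (MiniSupervisor, Component, State, Status, FlakyComponent) are hypothetical; the sketch omits policies, the discard/error flags and the special handling of java.lang.Error, and only catches RuntimeException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MiniSupervisor {

    enum State { IDLE, START, STOP }

    // Hypothetical minimal component contract (a simplified LifecycleAware).
    interface Component {
        void start();
        void stop();
        State getState();
    }

    // Per-component bookkeeping, loosely modeled on Supervisoree/Status.
    static final class Status {
        volatile State desired;
        volatile int failures;
        Status(State desired) { this.desired = desired; }
    }

    private final ScheduledExecutorService monitor = Executors.newScheduledThreadPool(2);
    private final Map<Component, Status> supervised = new ConcurrentHashMap<>();

    // One reconciliation pass: if the observed state differs from the desired one,
    // try to move the component there; a failure is counted and retried next pass.
    static void checkOnce(Component c, Status s) {
        synchronized (c) {
            if (c.getState() == s.desired) {
                return;
            }
            try {
                switch (s.desired) {
                    case START: c.start(); break;
                    case STOP:  c.stop();  break;
                    default:    break;
                }
            } catch (RuntimeException e) {
                s.failures++;
            }
        }
    }

    // Register once, then run checkOnce immediately and 3 s after each run ends.
    synchronized void supervise(Component c, State desired) {
        if (supervised.containsKey(c)) {
            throw new IllegalStateException("Refusing to supervise " + c + " more than once");
        }
        Status s = new Status(desired);
        supervised.put(c, s);
        monitor.scheduleWithFixedDelay(() -> checkOnce(c, s), 0, 3, TimeUnit.SECONDS);
    }

    void shutdown() {
        monitor.shutdownNow();
    }

    // Hypothetical component whose start() throws for the first `failFirst` calls.
    static final class FlakyComponent implements Component {
        private volatile State state = State.IDLE;
        private int remainingFailures;

        FlakyComponent(int failFirst) { this.remainingFailures = failFirst; }

        public void start() {
            if (remainingFailures-- > 0) {
                throw new IllegalStateException("dependency not ready");
            }
            state = State.START;
        }
        public void stop() { state = State.STOP; }
        public State getState() { return state; }
    }

    public static void main(String[] args) throws InterruptedException {
        FlakyComponent c = new FlakyComponent(1);
        Status s = new Status(State.START);
        checkOnce(c, s); // start() throws; failure counted
        checkOnce(c, s); // start() succeeds
        System.out.println(c.getState() + " failures=" + s.failures); // START failures=1

        MiniSupervisor sup = new MiniSupervisor();
        FlakyComponent d = new FlakyComponent(0);
        sup.supervise(d, State.START); // first check is scheduled with zero delay
        Thread.sleep(200);
        System.out.println(d.getState());
        sup.shutdown();
    }
}
```

Using a fixed delay means the next check always starts 3 seconds after the previous one finishes, so a slow start() never causes checks to bunch up.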
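Once lifecycleAware.start() is called, what happens next depends on the component, and the corresponding Runner class takes over. Take PollableSourceRunner on the Source side as an example. Flume offers two kinds of Source: EventDrivenSource, which is event-driven, and PollableSource, which works by polling. A PollableSource is started through PollableSourceRunner, which contains a nested class, PollingRunner, that does the actual polling. In its loop, PollingRunner calls source.process(), a method specific to PollableSource, and sleeps according to the returned status: on BACKOFF it sleeps for Math.min(consecutiveBackoffs * backoffSleepIncrement, maxBackoffSleep) milliseconds, while on any other status it resets the consecutive-backoff counter and polls again immediately.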
public void run() {
    logger.debug("Polling runner starting. Source:{}", source);

    while (!shouldStop.get()) {
        counterGroup.incrementAndGet("runner.polls");

        try {
            if (source.process().equals(PollableSource.Status.BACKOFF)) {
                counterGroup.incrementAndGet("runner.backoffs");

                Thread.sleep(Math.min(
                    counterGroup.incrementAndGet("runner.backoffs.consecutive")
                        * backoffSleepIncrement, maxBackoffSleep));
            } else {
                counterGroup.set("runner.backoffs.consecutive", 0L);
            }
        } catch (InterruptedException e) {
            logger.info("Source runner interrupted. Exiting");
            counterGroup.incrementAndGet("runner.interruptions");
        } catch (EventDeliveryException e) {
            logger.error("Unable to deliver event. Exception follows.", e);
            counterGroup.incrementAndGet("runner.deliveryErrors");
        } catch (Exception e) {
            counterGroup.incrementAndGet("runner.errors");
            logger.error("Unhandled exception, logging and sleeping for " +
                maxBackoffSleep + "ms", e);
            try {
                Thread.sleep(maxBackoffSleep);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}
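The backoff rule in PollingRunner can be checked in isolation. The sketch below replays a hypothetical sequence of process() results and computes the sleep that would follow each one, using the same Math.min formula as the loop above. Status is a local stand-in for PollableSource.Status, and the increment and cap values are arbitrary illustrative numbers, not Flume defaults.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BackoffDemo {

    // Local stand-in for PollableSource.Status.
    enum Status { READY, BACKOFF }

    // Same formula as PollingRunner: min(consecutive * increment, max), in ms.
    static long backoffSleep(long consecutiveBackoffs, long increment, long maxSleep) {
        return Math.min(consecutiveBackoffs * increment, maxSleep);
    }

    // Sleep (ms) that would follow each process() result; READY means no sleep (0).
    static List<Long> sleeps(List<Status> results, long increment, long maxSleep) {
        List<Long> out = new ArrayList<>();
        long consecutive = 0;
        for (Status r : results) {
            if (r == Status.BACKOFF) {
                consecutive++;   // like incrementAndGet("runner.backoffs.consecutive")
                out.add(backoffSleep(consecutive, increment, maxSleep));
            } else {
                consecutive = 0; // like set("runner.backoffs.consecutive", 0L)
                out.add(0L);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Status b = Status.BACKOFF;
        Status r = Status.READY;
        List<Status> results = Arrays.asList(b, b, b, b, b, b, b, r, b);
        // Hypothetical values: 1000 ms increment, 5000 ms cap.
        System.out.println(sleeps(results, 1000, 5000));
    }
}
```

With these values the program prints [1000, 2000, 3000, 4000, 5000, 5000, 5000, 0, 1000]: the sleep grows linearly with each consecutive BACKOFF, is capped at maxBackoffSleep, and starts over after a READY result.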