CAT Client Initialization

程序员文章站 2022-07-15 16:50:38

1. The client is mainly responsible for collecting the various kinds of messages. This article uses the official example:

  Transaction t = Cat.newTransaction("your transaction type", "your transaction name");
  try {
      yourBusinessOperation();
      // CAT APIs can be used on their own or combined, e.g. an Event or Metric nested inside a Transaction.
      Cat.logEvent("your event type", "your event name", Event.SUCCESS, "keyValuePairs");
      t.setStatus(Transaction.SUCCESS);
  } catch (Exception e) {
      Cat.logError(e); // record the exception via log4j so that it shows up in Logview
      t.setStatus(e);
      // Rethrow if the caller needs to know about the failure (the usual case);
      // drop the throw if the exception can safely be swallowed here.
      throw e;
  } finally {
      t.complete();
  }

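First the container has to be initialized. CAT uses PlexusContainer, which loads the configuration file /META-INF/plexus/plexus.xml. Once the container is ready, it loads the corresponding module, CatClientModule, together with the configuration in META-INF/plexus/components-cat-client.xml; the fields of each class are injected through the @Inject annotation.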

private static void checkAndInitialize() {
		if (!s_init) {
			synchronized (s_instance) {
				if (!s_init) {
					initialize(new File(getCatHome(), "client.xml"));
					log("WARN", "Cat is lazy initialized!");
					s_init = true;
				}
			}
		}
	}
// this should be called during application initialization time
	public static void initialize(File configFile) {
		PlexusContainer container = ContainerLoader.getDefaultContainer();

		initialize(container, configFile);
	}

	public static void initialize(PlexusContainer container, File configFile) {
		ModuleContext ctx = new DefaultModuleContext(container);
		Module module = ctx.lookup(Module.class, CatClientModule.ID);

		if (!module.isInitialized()) {
			ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);

			ctx.setAttribute("cat-client-config-file", configFile);
			initializer.execute(ctx, module);
		}
	}
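
The guard in checkAndInitialize is the double-checked locking pattern. The following is a minimal sketch of that pattern only: the class LazyInit and its counter are hypothetical and not part of CAT. The flag is declared volatile here because the pattern needs it for safe publication; whether CAT's s_init is volatile is not visible in the excerpt above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Cat class; only the locking pattern is illustrated.
class LazyInit {
    private static final Object LOCK = new Object();
    private static volatile boolean initialized = false;
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    static void checkAndInitialize() {
        if (!initialized) {                       // first check, without the lock
            synchronized (LOCK) {
                if (!initialized) {               // second check, under the lock
                    INIT_COUNT.incrementAndGet(); // stands in for initialize(...)
                    initialized = true;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyInit::checkAndInitialize);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("initialized " + INIT_COUNT.get() + " time(s)");
    }
}
```

However many threads race into checkAndInitialize, the body under the lock runs once.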

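Next, the module initializes the millisecond timer MilliSecondTimer, registers the thread listener CatThreadListener, and initializes DefaultMessageProducer, the PlainTextMessageCodec codec (the implementation is located through ID = "plain-text"), and the statistics class DefaultMessageStatistics.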

protected void execute(final ModuleContext ctx) throws Exception {
		ctx.info("Current working directory is " + System.getProperty("user.dir"));

		// initialize milli-second resolution level timer
		MilliSecondTimer.initialize();

		// tracking thread start/stop
		Threads.addListener(new CatThreadListener(ctx));

		// warm up Cat
		Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

		// bring up TransportManager
		ctx.lookup(TransportManager.class);

		ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
		
		if (clientConfigManager.isCatEnabled()) {
			// start status update task
			StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

			Threads.forGroup("cat").start(statusUpdateTask);
			LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

			// MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
			// Threads.forGroup("cat").start(mmapReaderTask);
		}
	}

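2. Initialize DefaultClientConfigManager. It loads the global client configuration from "/data/appdatas/cat/client.xml" and the project name from "/META-INF/app.properties", which it then uses to build the local configuration; if that is not available, it loads "/META-INF/cat/client.xml" instead. Finally it merges the two files, including their Server, Domain, and Property entries, and produces the resulting ClientConfig.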

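3. Initialize DefaultMessageManager. It stores the first configured domain along with the host name and IP, then initializes the ID generator MessageIdFactory, which creates a file holding the current message ID and timestamp and maps that file one-to-one with a MappedByteBuffer.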

public void initialize() throws InitializationException {
		m_domain = m_configManager.getDomain();
		m_hostName = NetworkInterfaceManager.INSTANCE.getLocalHostName();

		if (m_domain.getIp() == null) {
			m_domain.setIp(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
		}

		// initialize domain and IP address
		try {
			m_factory.initialize(m_domain.getId());
		} catch (IOException e) {
			throw new InitializationException("Error while initializing MessageIdFactory!", e);
		}
	}
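
To make the memory-mapped file idea concrete, here is a small sketch of persisting an index and a timestamp through a MappedByteBuffer. The class MarkFile, the temp-file name, and the 4-byte index plus 8-byte timestamp layout are assumptions made for illustration; the actual file layout used by MessageIdFactory is not shown in this article.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical helper; the layout (int index at 0, long timestamp at 4) is assumed.
class MarkFile {
    private final MappedByteBuffer buffer;

    private MarkFile(MappedByteBuffer buffer) {
        this.buffer = buffer;
    }

    static MarkFile open(File file) {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            // The mapping remains valid after the channel is closed.
            return new MarkFile(channel.map(FileChannel.MapMode.READ_WRITE, 0, 12));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void save(int index, long timestamp) {
        buffer.putInt(0, index);
        buffer.putLong(4, timestamp);
        buffer.force(); // flush the mapped region to disk
    }

    int index() { return buffer.getInt(0); }

    long timestamp() { return buffer.getLong(4); }

    // Writes through one mapping, then reads the values back through a fresh one.
    static long[] roundTrip(int index, long timestamp) {
        try {
            File file = File.createTempFile("cat-mark", ".bin");
            file.deleteOnExit();
            open(file).save(index, timestamp);
            MarkFile reopened = open(file);
            return new long[] { reopened.index(), reopened.timestamp() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] values = roundTrip(42, System.currentTimeMillis());
        System.out.println("index=" + values[0] + " timestamp=" + values[1]);
    }
}
```

Because the state lives in a mapped file, a restarted process can pick up the last saved index instead of starting from zero.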

4. Initialize DefaultTransportManager. It obtains the server address information, which is needed later for TCP communication.

m_tcpSocketSender.setServerAddresses(addresses);
				m_tcpSocketSender.initialize();

5. Initialize TcpSocketSender. It sets up two LinkedBlockingQueue-based queues: m_queue holds ordinary messages, and m_atomicTrees holds atomic messages, that is, messages whose type starts with "Cache." or is "SQL".

public void initialize() {
		int len = getQueueSize();

		m_queue = new DefaultMessageQueue(len);
		m_atomicTrees = new DefaultMessageQueue(len);

		m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

		Threads.forGroup("cat").start(this);
		Threads.forGroup("cat").start(m_manager);
		Threads.forGroup("cat").start(new MergeAtomicTask());
	}
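
The two-queue arrangement can be sketched as follows. This is a simplification built only from the description above: TwoQueueRouter is a hypothetical class, messages are reduced to their type string, and the real sender may apply further conditions before treating a message as atomic.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: routes a message type to one of two bounded queues.
class TwoQueueRouter {
    final BlockingQueue<String> queue;       // ordinary messages
    final BlockingQueue<String> atomicTrees; // "Cache.*" and "SQL" messages
    int overflowed;                          // messages rejected because a queue was full

    TwoQueueRouter(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        atomicTrees = new LinkedBlockingQueue<>(capacity);
    }

    static boolean isAtomic(String type) {
        return type.startsWith("Cache.") || "SQL".equals(type);
    }

    boolean send(String type) {
        BlockingQueue<String> target = isAtomic(type) ? atomicTrees : queue;
        boolean accepted = target.offer(type); // non-blocking; false when the queue is full
        if (!accepted) {
            overflowed++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        TwoQueueRouter router = new TwoQueueRouter(1);
        System.out.println(router.send("URL"));  // accepted
        System.out.println(router.send("URL"));  // rejected, queue is full
        System.out.println(router.send("SQL"));  // accepted by the atomic queue
        System.out.println("overflowed=" + router.overflowed);
    }
}
```

Using offer rather than put keeps the business thread from blocking when the collector falls behind; the cost is that messages are dropped and only counted.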

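Initialize the TCP communication manager ChannelManager. It configures the Netty client and fetches the server IP and port information; the serverAddresses passed in earlier are used only when the fetched information is empty.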

public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,
			ClientConfigManager configManager, MessageIdFactory idFactory) {
		m_logger = logger;
		m_queue = queue;
		m_configManager = configManager;
		m_idfactory = idFactory;

		EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread t = new Thread(r);
				t.setDaemon(true);
				return t;
			}
		});

		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(group).channel(NioSocketChannel.class);
		bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
		bootstrap.handler(new ChannelInitializer<Channel>() {
			@Override
			protected void initChannel(Channel ch) throws Exception {
			}
		});
		m_bootstrap = bootstrap;

		String serverConfig = loadServerConfig();

		if (StringUtils.isNotEmpty(serverConfig)) {
			List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);
			ChannelHolder holder = initChannel(configedAddresses, serverConfig);

			if (holder != null) {
				m_activeChannelHolder = holder;
			} else {
				m_activeChannelHolder = new ChannelHolder();
				m_activeChannelHolder.setServerAddresses(configedAddresses);
			}
		} else {
			ChannelHolder holder = initChannel(serverAddresses, null);

			if (holder != null) {
				m_activeChannelHolder = holder;
			} else {
				m_activeChannelHolder = new ChannelHolder();
				m_activeChannelHolder.setServerAddresses(serverAddresses);
				m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
			}
		}
	}

Initialize the TCP channel: create the ChannelFuture, store it in a ChannelHolder, and record the index in the server list, the host address, and related information.

private ChannelHolder initChannel(List<InetSocketAddress> addresses, String serverConfig) {
		try {
			int len = addresses.size();

			for (int i = 0; i < len; i++) {
				InetSocketAddress address = addresses.get(i);
				String hostAddress = address.getAddress().getHostAddress();
				ChannelHolder holder = null;

				if (m_activeChannelHolder != null && hostAddress.equals(m_activeChannelHolder.getIp())) {
					holder = new ChannelHolder();
					holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);
				} else {
					ChannelFuture future = createChannel(address);

					if (future != null) {
						holder = new ChannelHolder();
						holder.setActiveFuture(future).setConnectChanged(true);
					}
				}
				if (holder != null) {
					holder.setActiveIndex(i).setIp(hostAddress);
					holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);

					m_logger.info("success when init CAT server, new active holder" + holder.toString());
					return holder;
				}
			}
		} catch (Exception e) {
			m_logger.error(e.getMessage(), e);
		}

		try {
			StringBuilder sb = new StringBuilder();

			for (InetSocketAddress address : addresses) {
				sb.append(address.toString()).append(";");
			}
			m_logger.info("Error when init CAT server " + sb.toString());
		} catch (Exception e) {
			// ignore
		}
		return null;
	}

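6. Start three daemon threads. The first is ChannelManager, which runs every 10 s. It saves the message ID index to the file, checks whether the server list has changed, checks whether the current channel is still alive, and tries the servers in order. For example, if the current connection is to the second server, this run checks whether the first server can be connected; if so, it closes the previous connection and keeps the new one.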

public void run() {
		while (m_active) {
			// make save message id index asyc
			m_idfactory.saveMark();
			checkServerChanged();

			ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
			List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

			doubleCheckActiveServer(activeFuture);
			reconnectDefaultServer(activeFuture, serverAddresses);

			try {
				Thread.sleep(10 * 1000L); // check every 10 seconds
			} catch (InterruptedException e) {
				// ignore
			}
		}
	}
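
The ordering rule (connect to the first server that works, and later fall back to an earlier server once it becomes reachable again) can be sketched without Netty. ServerSelector and its method names are hypothetical, and the Predicate stands in for an actual connection attempt; the bodies of initChannel and reconnectDefaultServer are only partly shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of "servers earlier in the list are preferred".
class ServerSelector {
    // Initial selection: index of the first server that accepts a connection, or -1.
    static int firstReachable(List<String> servers, Predicate<String> canConnect) {
        for (int i = 0; i < servers.size(); i++) {
            if (canConnect.test(servers.get(i))) {
                return i;
            }
        }
        return -1;
    }

    // Periodic check: only servers ahead of the active one are tried.
    static int preferEarlier(int activeIndex, List<String> servers, Predicate<String> canConnect) {
        for (int i = 0; i < activeIndex; i++) {
            if (canConnect.test(servers.get(i))) {
                return i; // switch back to the higher-priority server
            }
        }
        return activeIndex; // nothing better is available, keep the current one
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:2280", "10.0.0.2:2280");
        int active = firstReachable(servers, s -> s.startsWith("10.0.0.2"));
        System.out.println("active=" + active);
        active = preferEarlier(active, servers, s -> true);
        System.out.println("active after recheck=" + active);
    }
}
```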

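The MergeAtomicTask thread polls every 5 ms (the code calls Thread.sleep(5)). When the oldest message in m_atomicTrees is more than 30 s old, or the queue holds more than 200 messages, those messages are merged into a single MessageTree, which is then put on m_queue, and the IDs of the merged messages are recycled. When the number of messages exceeds the queue capacity, the overflow is recorded, and an error log is printed once every 1000 occurrences.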

public void run() {
			while (true) {
				if (shouldMerge(m_atomicTrees)) {
					MessageTree tree = mergeTree(m_atomicTrees);
					boolean result = m_queue.offer(tree);

					if (!result) {
						logQueueFullInfo(tree);
					}
				} else {
					try {
						Thread.sleep(5);
					} catch (InterruptedException e) {
						break;
					}
				}
			}
		}
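
The body of shouldMerge is not reproduced in this article. Based on the description above, its decision might look like the sketch below; MergePolicy is a hypothetical class, messages are reduced to their timestamps, and the exact comparison at the 200-message boundary is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the merge trigger: age of the oldest entry, or queue length.
class MergePolicy {
    static final long MAX_AGE_MS = 30_000L; // 30 s, from the description above
    static final int MAX_ITEMS = 200;       // boundary handling (>=) is assumed

    static boolean shouldMerge(Queue<Long> timestamps, long nowMs) {
        Long oldest = timestamps.peek();    // head of the queue is the oldest message
        if (oldest == null) {
            return false;                   // nothing to merge
        }
        return nowMs - oldest > MAX_AGE_MS || timestamps.size() >= MAX_ITEMS;
    }

    public static void main(String[] args) {
        Queue<Long> timestamps = new ArrayDeque<>();
        long now = System.currentTimeMillis();
        timestamps.add(now - 31_000L);
        System.out.println(shouldMerge(timestamps, now)); // oldest entry is older than 30 s
    }
}
```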

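The TcpSocketSender sending thread checks whether the channel is writable, takes a message from m_queue, encodes it into a ByteBuf, writes and flushes it to the channel, and records the message size and count. When the channel is not writable, it inspects the creation time of the messages in the queue, discards those older than one hour, and records the overflow count.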

public void run() {
		m_active = true;

		try {
			while (m_active) {
				ChannelFuture channel = m_manager.channel();

				if (channel != null && checkWritable(channel)) {
					try {
						MessageTree tree = m_queue.poll();

						if (tree != null) {
							sendInternal(tree);
							tree.setMessage(null);
						}

					} catch (Throwable t) {
						m_logger.error("Error when sending message over TCP socket!", t);
					}
				} else {
					long current = System.currentTimeMillis();
					long oldTimestamp = current - HOUR;

					while (true) {
						try {
							MessageTree tree = m_queue.peek();

							if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
								MessageTree discradTree = m_queue.poll();

								if (discradTree != null) {
									m_statistics.onOverflowed(discradTree);
								}
							} else {
								break;
							}
						} catch (Exception e) {
							m_logger.error(e.getMessage(), e);
							break;
						}
					}

					TimeUnit.MILLISECONDS.sleep(5);
				}
			}
		} catch (InterruptedException e) {
			// ignore it
			m_active = false;
		}
	}
private void sendInternal(MessageTree tree) {
		ChannelFuture future = m_manager.channel();
		ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

		m_codec.encode(tree, buf);

		int size = buf.readableBytes();
		Channel channel = future.channel();

		channel.writeAndFlush(buf);
		if (m_statistics != null) {
			m_statistics.onBytes(size);
		}
	}
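
The clean-up branch taken when the channel is not writable can be isolated as below. StaleMessageCleaner is a hypothetical class and messages are reduced to their timestamps; as in the loop above, it stops at the first message that is still fresh, which relies on the queue being in arrival order.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: drop queued entries older than one hour, counting the discards.
class StaleMessageCleaner {
    static final long HOUR_MS = 60 * 60 * 1000L;

    static int discardOlderThanOneHour(Queue<Long> timestamps, long nowMs) {
        long cutoff = nowMs - HOUR_MS;
        int discarded = 0;
        Long head;
        // Peek first so that a fresh message is never removed from the queue.
        while ((head = timestamps.peek()) != null && head < cutoff) {
            timestamps.poll();
            discarded++; // the real sender reports this through m_statistics.onOverflowed
        }
        return discarded;
    }

    public static void main(String[] args) {
        Queue<Long> timestamps = new ArrayDeque<>();
        long now = System.currentTimeMillis();
        timestamps.add(now - 2 * HOUR_MS);
        timestamps.add(now);
        System.out.println("discarded=" + discardOlderThanOneHour(timestamps, now));
    }
}
```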

7. Start the daemon thread StatusUpdateTask, which periodically sends heartbeat messages together with local thread, memory, and JVM information.

public void run() {
		// try to wait cat client init success
		try {
			Thread.sleep(10 * 1000);
		} catch (InterruptedException e) {
			return;
		}

		while (true) {
			Calendar cal = Calendar.getInstance();
			int second = cal.get(Calendar.SECOND);

			// try to avoid send heartbeat at 59-01 second
			if (second < 2 || second > 58) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// ignore it
				}
			} else {
				break;
			}
		}

		try {
			buildClasspath();
		} catch (Exception e) {
			e.printStackTrace();
		}
		MessageProducer cat = Cat.getProducer();
		Transaction reboot = cat.newTransaction("System", "Reboot");

		reboot.setStatus(Message.SUCCESS);
		cat.logEvent("Reboot", NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), Message.SUCCESS, null);
		reboot.complete();

		while (m_active) {
			long start = MilliSecondTimer.currentTimeMillis();

			if (m_manager.isCatEnabled()) {
				Transaction t = cat.newTransaction("System", "Status");
				Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
				StatusInfo status = new StatusInfo();

				t.addData("dumpLocked", m_manager.isDumpLocked());
				try {
					StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);

					status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

					buildExtensionData(status);
					h.addData(status.toString());
					h.setStatus(Message.SUCCESS);
				} catch (Throwable e) {
					h.setStatus(e);
					cat.logError(e);
				} finally {
					h.complete();
				}
				t.setStatus(Message.SUCCESS);
				t.complete();
			}
			long elapsed = MilliSecondTimer.currentTimeMillis() - start;

			if (elapsed < m_interval) {
				try {
					Thread.sleep(m_interval - elapsed);
				} catch (InterruptedException e) {
					break;
				}
			}
		}
	}
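
Two timing rules are buried in this loop: the task does not start reporting while the clock is within seconds 59 to 01, and each round sleeps only for what is left of the interval. A sketch of both follows; HeartbeatTiming is a hypothetical class, and the 60 s interval used in main is an assumption, since the value of m_interval is not shown above.

```java
// Hypothetical sketch of the two timing rules in StatusUpdateTask.run().
class HeartbeatTiming {
    // True when the task should wait another second before starting (seconds 59, 0, 1).
    static boolean shouldDelayStart(int second) {
        return second < 2 || second > 58;
    }

    // Remaining sleep so that rounds start roughly intervalMs apart; never negative.
    static long sleepMillis(long intervalMs, long elapsedMs) {
        return Math.max(0L, intervalMs - elapsedMs);
    }

    public static void main(String[] args) {
        long intervalMs = 60_000L; // assumed value, for illustration only
        System.out.println(shouldDelayStart(59));
        System.out.println(sleepMillis(intervalMs, 250L));
    }
}
```

Subtracting the elapsed time keeps the heartbeat period steady even when collecting the status information takes a noticeable amount of time.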

 
