CAT Client Initialization
1. The client is mainly responsible for collecting messages. This article uses the official example:
Transaction t = Cat.newTransaction("your transaction type", "your transaction name");
try {
    yourBusinessOperation();
    Cat.logEvent("your event type", "your event name", Event.SUCCESS, "keyValuePairs");
    t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
    Cat.logError(e); // log the exception through log4j so that it shows up in Logview
    t.setStatus(e);
    // All CAT APIs can be used on their own or combined, e.g. an Event or Metric nested inside a Transaction.
    // Rethrow if the exception should keep propagating; usually it should, so that the caller knows about it.
    // If the exception can safely be swallowed here, there is no need to rethrow it.
    throw e;
} finally {
    t.complete();
}
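First the container has to be initialized. CAT uses PlexusContainer, which loads the configuration file /META-INF/plexus/plexus.xml. Once the container is ready, the CatClientModule module is loaded together with the configuration in META-INF/plexus/components-cat-client.xml. The fields of each class are injected through the @Inject annotation.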
private static void checkAndInitialize() {
    if (!s_init) {
        synchronized (s_instance) {
            if (!s_init) {
                initialize(new File(getCatHome(), "client.xml"));
                log("WARN", "Cat is lazy initialized!");
                s_init = true;
            }
        }
    }
}
// this should be called during application initialization time
public static void initialize(File configFile) {
    PlexusContainer container = ContainerLoader.getDefaultContainer();
    initialize(container, configFile);
}
public static void initialize(PlexusContainer container, File configFile) {
    ModuleContext ctx = new DefaultModuleContext(container);
    Module module = ctx.lookup(Module.class, CatClientModule.ID);
    if (!module.isInitialized()) {
        ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
        ctx.setAttribute("cat-client-config-file", configFile);
        initializer.execute(ctx, module);
    }
}
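The lazy initialization in checkAndInitialize follows the double-checked locking idiom. The sketch below isolates that idiom in a self-contained class. LazyClient, LOCK, and INIT_COUNT are hypothetical names used only for illustration, and the counter stands in for the real container and configuration loading. It assumes the flag is declared volatile, which the idiom needs in order to be safe.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a lazily initialized client facade; not part of CAT.
final class LazyClient {
    private static final Object LOCK = new Object();
    // volatile: a thread that reads true also sees everything written before it
    private static volatile boolean initialized = false;
    // Counts how many times the expensive initialization actually ran.
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    private LazyClient() {
    }

    static void checkAndInitialize() {
        if (!initialized) {                 // fast path without locking
            synchronized (LOCK) {
                if (!initialized) {         // re-check while holding the lock
                    INIT_COUNT.incrementAndGet(); // placeholder for the real setup work
                    initialized = true;
                }
            }
        }
    }
}
```

However many threads call checkAndInitialize concurrently, the placeholder setup runs once, and later calls only pay for a volatile read.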
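Module execution initializes the millisecond timer MilliSecondTimer, registers the thread listener CatThreadListener, and initializes DefaultMessageProducer, the PlainTextMessageCodec codec (the interface implementation is looked up by ID = "plain-text"), and the statistics class DefaultMessageStatistics.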
protected void execute(final ModuleContext ctx) throws Exception {
    ctx.info("Current working directory is " + System.getProperty("user.dir"));
    // initialize milli-second resolution level timer
    MilliSecondTimer.initialize();
    // tracking thread start/stop
    Threads.addListener(new CatThreadListener(ctx));
    // warm up Cat
    Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());
    // bring up TransportManager
    ctx.lookup(TransportManager.class);
    ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
    if (clientConfigManager.isCatEnabled()) {
        // start status update task
        StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
        Threads.forGroup("cat").start(statusUpdateTask);
        LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms
        // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
        // Threads.forGroup("cat").start(mmapReaderTask);
    }
}
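2. Initialize DefaultClientConfigManager. It loads the global client configuration from "/data/appdatas/cat/client.xml", reads the project name from "/META-INF/app.properties" and uses it to build the local configuration; if that is not available, it loads "/META-INF/cat/client.xml" instead. Finally the two configurations are merged, including their Server, Domain, and Property entries, to produce the resulting ClientConfig.
3. Initialize DefaultMessageManager. It stores the first configured domain together with the host name and IP, and initializes the ID generator MessageIdFactory, which creates a file holding the current message ID and timestamp and maps that file one-to-one with a MappedByteBuffer: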
public void initialize() throws InitializationException {
    m_domain = m_configManager.getDomain();
    m_hostName = NetworkInterfaceManager.INSTANCE.getLocalHostName();
    if (m_domain.getIp() == null) {
        m_domain.setIp(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
    }
    // initialize domain and IP address
    try {
        m_factory.initialize(m_domain.getId());
    } catch (IOException e) {
        throw new InitializationException("Error while initializing MessageIdFactory!", e);
    }
}
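To illustrate how a small "mark" file can be kept through a MappedByteBuffer, here is a minimal sketch. IdMarkFile and its layout (a 4-byte index followed by an 8-byte timestamp) are assumptions made for the example, not MessageIdFactory's actual file format.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: persists (index, timestamp) in a memory-mapped file.
final class IdMarkFile implements AutoCloseable {
    // Assumed layout: int index at offset 0, long timestamp at offset 4.
    private static final int SIZE = Integer.BYTES + Long.BYTES;
    private final RandomAccessFile file;
    private final MappedByteBuffer buffer;

    IdMarkFile(Path path) throws IOException {
        file = new RandomAccessFile(path.toFile(), "rw");
        // Mapping in READ_WRITE mode grows the file to SIZE bytes if it is shorter.
        buffer = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, SIZE);
    }

    void save(int index, long timestamp) {
        buffer.putInt(0, index);
        buffer.putLong(Integer.BYTES, timestamp);
        buffer.force(); // ask the OS to flush the mapped region to disk
    }

    int index() {
        return buffer.getInt(0);
    }

    long timestamp() {
        return buffer.getLong(Integer.BYTES);
    }

    @Override
    public void close() throws IOException {
        file.close();
    }

    // Writes a mark to a temporary file, reopens the file, and returns what was read back.
    static long[] roundTrip(int index, long timestamp) {
        try {
            Path path = Files.createTempFile("id-mark", ".dat");
            path.toFile().deleteOnExit();
            try (IdMarkFile writer = new IdMarkFile(path)) {
                writer.save(index, timestamp);
            }
            try (IdMarkFile reader = new IdMarkFile(path)) {
                return new long[] { reader.index(), reader.timestamp() };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the state lives in a mapped file, a restarted process can continue from the last saved index instead of starting over.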
4. Initialize DefaultTransportManager. It obtains the server addresses, which are needed later for the TCP exchange:
m_tcpSocketSender.setServerAddresses(addresses);
m_tcpSocketSender.initialize();
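5. Initialize TcpSocketSender. It sets up two blocking queues backed by LinkedBlockingQueue: m_queue holds ordinary messages, and m_atomicTrees holds atomic messages, i.e. messages whose type starts with "Cache." or is "SQL".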
public void initialize() {
    int len = getQueueSize();
    m_queue = new DefaultMessageQueue(len);
    m_atomicTrees = new DefaultMessageQueue(len);
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);
    Threads.forGroup("cat").start(this);
    Threads.forGroup("cat").start(m_manager);
    Threads.forGroup("cat").start(new MergeAtomicTask());
}
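The two-queue arrangement can be sketched as follows. MessageRouter is a hypothetical class that routes plain type strings instead of real message trees. The atomic-type rule is the one stated above, and a non-blocking offer is assumed, so a full queue is counted as an overflow rather than blocking the caller.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical router: sends atomic messages to one bounded queue, the rest to another.
final class MessageRouter {
    final BlockingQueue<String> queue;        // ordinary messages
    final BlockingQueue<String> atomicTrees;  // atomic messages waiting to be merged
    int overflowed;                           // messages rejected because a queue was full

    MessageRouter(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        atomicTrees = new LinkedBlockingQueue<>(capacity);
    }

    // Rule taken from the text: type starts with "Cache." or is exactly "SQL".
    static boolean isAtomic(String type) {
        return type.startsWith("Cache.") || "SQL".equals(type);
    }

    boolean send(String type) {
        BlockingQueue<String> target = isAtomic(type) ? atomicTrees : queue;
        boolean accepted = target.offer(type); // returns false instead of blocking when full
        if (!accepted) {
            overflowed++;
        }
        return accepted;
    }
}
```

Using offer rather than put keeps business threads from stalling when the sender falls behind, at the cost of dropping messages.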
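Initialize the TCP communication manager ChannelManager. It configures the Netty client and loads the server IP and port information; the serverAddresses passed in earlier are used only when that information is empty.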
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,
        ClientConfigManager configManager, MessageIdFactory idFactory) {
    m_logger = logger;
    m_queue = queue;
    m_configManager = configManager;
    m_idfactory = idFactory;
    EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
        }
    });
    m_bootstrap = bootstrap;
    String serverConfig = loadServerConfig();
    if (StringUtils.isNotEmpty(serverConfig)) {
        List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);
        ChannelHolder holder = initChannel(configedAddresses, serverConfig);
        if (holder != null) {
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(configedAddresses);
        }
    } else {
        ChannelHolder holder = initChannel(serverAddresses, null);
        if (holder != null) {
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(serverAddresses);
            m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
        }
    }
}
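Initialize the TCP channel: create the channel's ChannelFuture, store it in a ChannelHolder, and record the index in the server list, the host address, and related information.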
private ChannelHolder initChannel(List<InetSocketAddress> addresses, String serverConfig) {
    try {
        int len = addresses.size();
        for (int i = 0; i < len; i++) {
            InetSocketAddress address = addresses.get(i);
            String hostAddress = address.getAddress().getHostAddress();
            ChannelHolder holder = null;
            if (m_activeChannelHolder != null && hostAddress.equals(m_activeChannelHolder.getIp())) {
                holder = new ChannelHolder();
                holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);
            } else {
                ChannelFuture future = createChannel(address);
                if (future != null) {
                    holder = new ChannelHolder();
                    holder.setActiveFuture(future).setConnectChanged(true);
                }
            }
            if (holder != null) {
                holder.setActiveIndex(i).setIp(hostAddress);
                holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);
                m_logger.info("success when init CAT server, new active holder" + holder.toString());
                return holder;
            }
        }
    } catch (Exception e) {
        m_logger.error(e.getMessage(), e);
    }
    try {
        StringBuilder sb = new StringBuilder();
        for (InetSocketAddress address : addresses) {
            sb.append(address.toString()).append(";");
        }
        m_logger.info("Error when init CAT server " + sb.toString());
    } catch (Exception e) {
        // ignore
    }
    return null;
}
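6. Start three daemon threads. The first is ChannelManager, which runs every 10 s: it saves the message ID index to the file, checks whether the server list has changed, checks whether the current channel is still alive, and tries the servers in list order. For example, if the client is currently connected to the second server, this pass checks whether the first server can be reached; if so, the old connection is closed and the new one is kept.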
public void run() {
    while (m_active) {
        // make save message id index async
        m_idfactory.saveMark();
        checkServerChanged();
        ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
        List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();
        doubleCheckActiveServer(activeFuture);
        reconnectDefaultServer(activeFuture, serverAddresses);
        try {
            Thread.sleep(10 * 1000L); // check every 10 seconds
        } catch (InterruptedException e) {
            // ignore
        }
    }
}
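The "prefer the earlier server" behaviour described for the periodic check can be sketched as a pure function. ServerSelector and its reachable predicate are hypothetical; the predicate stands in for an actual connection attempt, and the real reconnectDefaultServer may differ in detail.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical selector: moves back to a higher-priority server when one is reachable.
final class ServerSelector {
    private ServerSelector() {
    }

    // Returns the index of the first reachable server that precedes the active one,
    // or the active index itself if no earlier server can be reached.
    static int select(List<String> servers, int activeIndex, Predicate<String> reachable) {
        for (int i = 0; i < activeIndex && i < servers.size(); i++) {
            if (reachable.test(servers.get(i))) {
                return i;
            }
        }
        return activeIndex;
    }
}
```

With servers a, b, c and the client on b (index 1), the function returns 0 as soon as a becomes reachable again, which matches the example in the text.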
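The second is the MergeAtomicTask thread, which sleeps 5 ms between checks when there is nothing to merge. When the oldest message in the m_atomicTrees queue is more than 30 s old, or the queue holds more than 200 messages, the messages are merged into a single MessageTree, which is then put on the message queue m_queue, and the IDs of the merged messages are reclaimed. When the number of messages exceeds the queue capacity, the overflow is recorded, and an error log is printed once every 1000 overflows.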
public void run() {
    while (true) {
        if (shouldMerge(m_atomicTrees)) {
            MessageTree tree = mergeTree(m_atomicTrees);
            boolean result = m_queue.offer(tree);
            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
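The merge condition can be written down as a small predicate. MergePolicy is a hypothetical class, and the queue holds only creation timestamps rather than message trees. The 30 s and 200 thresholds come from the text; treating both as strict "greater than" comparisons is an assumption.

```java
import java.util.Queue;

// Hypothetical policy: decides when queued atomic messages should be merged.
final class MergePolicy {
    static final long MAX_AGE_MILLIS = 30_000L; // oldest message older than 30 s
    static final int MAX_SIZE = 200;            // more than 200 messages queued

    private MergePolicy() {
    }

    // createdAt holds message creation times in milliseconds, oldest first.
    static boolean shouldMerge(Queue<Long> createdAt, long now) {
        Long oldest = createdAt.peek();
        if (oldest == null) {
            return false; // nothing to merge
        }
        return now - oldest > MAX_AGE_MILLIS || createdAt.size() > MAX_SIZE;
    }
}
```

Checking only the head of the queue is enough for the age test because messages are enqueued in creation order.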
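The third is the TcpSocketSender thread, which sends messages. It checks that the channel is healthy and writable, takes a message from the message queue m_queue, encodes it into a ByteBuf, writes and flushes it to the channel, and records the message size and count. When the channel is not writable, it inspects the creation time of the queued messages, discards those older than one hour, and records them as overflowed.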
public void run() {
    m_active = true;
    try {
        while (m_active) {
            ChannelFuture channel = m_manager.channel();
            if (channel != null && checkWritable(channel)) {
                try {
                    MessageTree tree = m_queue.poll();
                    if (tree != null) {
                        sendInternal(tree);
                        tree.setMessage(null);
                    }
                } catch (Throwable t) {
                    m_logger.error("Error when sending message over TCP socket!", t);
                }
            } else {
                long current = System.currentTimeMillis();
                long oldTimestamp = current - HOUR;
                while (true) {
                    try {
                        MessageTree tree = m_queue.peek();
                        if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                            MessageTree discradTree = m_queue.poll();
                            if (discradTree != null) {
                                m_statistics.onOverflowed(discradTree);
                            }
                        } else {
                            break;
                        }
                    } catch (Exception e) {
                        m_logger.error(e.getMessage(), e);
                        break;
                    }
                }
                TimeUnit.MILLISECONDS.sleep(5);
            }
        }
    } catch (InterruptedException e) {
        // ignore it
        m_active = false;
    }
}
private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
    m_codec.encode(tree, buf);
    int size = buf.readableBytes();
    Channel channel = future.channel();
    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}
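The clean-up branch taken when the channel is not writable can be isolated as below. ExpiryPruner is a hypothetical class working on bare timestamps; it mirrors the peek-then-poll loop above under the assumption that the queue is ordered oldest first.

```java
import java.util.Queue;

// Hypothetical pruner: drops queued entries that are more than one hour old.
final class ExpiryPruner {
    static final long HOUR = 60 * 60 * 1000L;

    private ExpiryPruner() {
    }

    // timestamps holds creation times in milliseconds, oldest first.
    // Returns how many entries were discarded (the "overflow" count).
    static int discardExpired(Queue<Long> timestamps, long now) {
        long oldestAllowed = now - HOUR;
        int discarded = 0;
        Long head;
        while ((head = timestamps.peek()) != null && head < oldestAllowed) {
            timestamps.poll();
            discarded++;
        }
        return discarded;
    }
}
```

The loop stops at the first entry that is still fresh, so a long queue of recent messages costs a single peek.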
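7. Start the daemon thread StatusUpdateTask, which periodically sends heartbeat messages carrying thread, memory, and JVM information for the local machine.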
public void run() {
    // try to wait cat client init success
    try {
        Thread.sleep(10 * 1000);
    } catch (InterruptedException e) {
        return;
    }
    while (true) {
        Calendar cal = Calendar.getInstance();
        int second = cal.get(Calendar.SECOND);
        // try to avoid send heartbeat at 59-01 second
        if (second < 2 || second > 58) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore it
            }
        } else {
            break;
        }
    }
    try {
        buildClasspath();
    } catch (Exception e) {
        e.printStackTrace();
    }
    MessageProducer cat = Cat.getProducer();
    Transaction reboot = cat.newTransaction("System", "Reboot");
    reboot.setStatus(Message.SUCCESS);
    cat.logEvent("Reboot", NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), Message.SUCCESS, null);
    reboot.complete();
    while (m_active) {
        long start = MilliSecondTimer.currentTimeMillis();
        if (m_manager.isCatEnabled()) {
            Transaction t = cat.newTransaction("System", "Status");
            Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
            StatusInfo status = new StatusInfo();
            t.addData("dumpLocked", m_manager.isDumpLocked());
            try {
                StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);
                status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));
                buildExtensionData(status);
                h.addData(status.toString());
                h.setStatus(Message.SUCCESS);
            } catch (Throwable e) {
                h.setStatus(e);
                cat.logError(e);
            } finally {
                h.complete();
            }
            t.setStatus(Message.SUCCESS);
            t.complete();
        }
        long elapsed = MilliSecondTimer.currentTimeMillis() - start;
        if (elapsed < m_interval) {
            try {
                Thread.sleep(m_interval - elapsed);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
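Two timing rules in this loop are easy to miss: the first heartbeat is delayed while the clock is within seconds 59 to 01, and each cycle sleeps only for the part of the interval that the work did not use. The sketch below restates both as pure functions. HeartbeatTiming is a hypothetical class, and since the value of m_interval is not shown above, the interval is passed in as a parameter.

```java
// Hypothetical helper restating the heartbeat timing rules as pure functions.
final class HeartbeatTiming {
    private HeartbeatTiming() {
    }

    // True for seconds 59, 0 and 1, where the task waits before sending its first heartbeat.
    static boolean nearMinuteBoundary(int second) {
        return second < 2 || second > 58;
    }

    // Time left to sleep so that each cycle takes intervalMillis in total;
    // zero when the work already took longer than the interval.
    static long remainingSleep(long intervalMillis, long elapsedMillis) {
        return Math.max(0L, intervalMillis - elapsedMillis);
    }
}
```

Subtracting the elapsed time keeps the heartbeat period steady instead of letting it drift by the cost of collecting the status data on every cycle.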