CAT的Server初始化
1. Server初始化从web.xml文件开始,作为一个war包项目,首先需要初始化Servlet,首先是CatServlet专门初始化cat相关的server程序,比如接受客户端传过来的数据等等,另一个servlet为MVC专门提供数据查询接口的普通的MVC功能,功能相当于缩小版的SpringMVC。过滤器有两个CatFilter最主要是做一些过滤处理,每次请求经过ENVIRONMENT,ID_SETUP,LOG_CLIENT_PAYLOAD,LOG_SPAN的handle方法,打印一些日志,统计一些信息。DomainFilter主要是对domain进行处理。
2. CatServlet初始化,首先执行父类AbstractContainerServlet的init方法,这是Servlet统一的初始化方法,初始化上下文IOC容器PlexusContainer,在CatServlet中开始准备初始化各模块,在上下文中保存客户端和服务端文件路径,后文会用到。
protected void initComponents(ServletConfig servletConfig) throws ServletException {
try {
ModuleContext ctx = new DefaultModuleContext(getContainer());
ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");
ctx.setAttribute("cat-client-config-file", clientXmlFile);
ctx.setAttribute("cat-server-config-file", serverXmlFile);
initializer.execute(ctx);
} catch (Exception e) {
m_exception = e;
System.err.println(e);
throw new ServletException(e);
}
}
执行DefaultModuleInitializer#execute方法,先执行各模块的setup方法,只有CatHomeModule实现了该方法
private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
if (modules != null) {
for (Module module : modules) {
if (module != null && !all.contains(module)) {
if (module instanceof AbstractModule) {
((AbstractModule) module).setup(ctx);
}
expandAll(ctx, module.getDependencies(ctx), all);
all.add(module);
}
}
}
}
判断各模块初始化状态,未初始化的进行初始化处理
expandAll(ctx, modules, all);
for (Module module : all) {
if (!module.isInitialized()) {
executeModule(ctx, module, m_index++);
}
}
初始化各个模块,设置初始化状态,AbstractModule#=>AbstractModule#execute,只有CatClientModule和CatHomeModule有具体的执行内容,因为CatClientModule的执行内容前文已经介绍过,可以参考前文。
private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {
long start = System.currentTimeMillis();
// set flag to avoid re-entrance
module.setInitialized(true);
info(ctx, index + " ------ " + module.getClass().getName());
// execute itself after its dependencies
module.initialize(ctx);
long end = System.currentTimeMillis();
info(ctx, index + " ------ " + module.getClass().getName() + " DONE in " + (end - start) + " ms.");
}
3. 至此主要就是解析CatHomeModule类,首先是它的setup方法,根据之前设置到上下文的服务器文件来初始化ServerConfigManager,初始化TcpSocketReceiver主要用于和客户端之前进行消息通讯,也就是充当netty的服务端,设置服务结束时的关闭钩子,销毁netty的线程池通道,关闭通道等等。
protected void setup(ModuleContext ctx) throws Exception {
if (!isInitialized()) {
File serverConfigFile = ctx.getAttribute("cat-server-config-file");
ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
serverConfigManager.initialize(serverConfigFile);
messageReceiver.init();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
messageReceiver.destory();
}
});
}
}
netty的服务端设置,端口号默认是2280,根据系统选择不同的线程执行方式,设置主线程池m_bossGroup也就是NIO中TCP连接器Selector的作用,保持和客户端之间的通道链接处理,m_workerGroup主要是处理具体的数据的线程池,具体可以参考之前关于Netty的相关文章。
public synchronized void startServer(int port) throws InterruptedException {
boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
int threads = 24;
ServerBootstrap bootstrap = new ServerBootstrap();
m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
bootstrap.group(m_bossGroup, m_workerGroup);
bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode", new MessageDecoder());
}
});
bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
try {
m_future = bootstrap.bind(port).sync();
m_logger.info("start netty server!");
} catch (Exception e) {
m_logger.error("Started Netty Server Failed:" + port, e);
}
}
4. 当消息从客户端发送的时候需要对数据进行编码处理,m_codec.encode(tree, buf);内存块的前四个字节为消息的总长度,先获取当前可写的下标,然后写入长度占位数字0,开始给消息头编码,统计长度
public void encode(MessageTree tree, ByteBuf buf) {
int count = 0;
int index = buf.writerIndex();
buf.writeInt(0); // place-holder
count += encodeHeader(tree, buf);
if (tree.getMessage() != null) {
count += encodeMessage(tree.getMessage(), buf);
}
buf.setInt(index, count);
}
给消息体编码,这里以主要的Transaction类型消息距离,当本次线程只发送一个消息时也就是子消息列表为空,会设置type为A,如果有两个消息的话就会一次设置t,A,T三个type,不同的type也对应不同的encodeLine处理,增加消息时间等等。最后就是统计长度,最后统一覆盖到之前的占位下标。(消息类型Transaction,Event,Trace,Metric,Heartbeat)
public int encodeMessage(Message message, ByteBuf buf) {
if (message instanceof Transaction) {
Transaction transaction = (Transaction) message;
List<Message> children = transaction.getChildren();
if (children.isEmpty()) {
return encodeLine(transaction, buf, 'A', Policy.WITH_DURATION);
} else {
int count = 0;
int len = children.size();
count += encodeLine(transaction, buf, 't', Policy.WITHOUT_STATUS);
for (int i = 0; i < len; i++) {
Message child = children.get(i);
if (child != null) {
count += encodeMessage(child, buf);
}
}
count += encodeLine(transaction, buf, 'T', Policy.WITH_DURATION);
return count;
}
} else if (message instanceof Event) {
return encodeLine(message, buf, 'E', Policy.DEFAULT);
} else if (message instanceof Trace) {
return encodeLine(message, buf, 'L', Policy.DEFAULT);
} else if (message instanceof Metric) {
return encodeLine(message, buf, 'M', Policy.DEFAULT);
} else if (message instanceof Heartbeat) {
return encodeLine(message, buf, 'H', Policy.DEFAULT);
} else {
throw new RuntimeException(String.format("Unsupported message type: %s.", message));
}
}
同样,数据流到达服务端的时候就需要进行解码处理,pipeline.addLast("decode", new MessageDecoder());解码操作在PlainTextMessageCodec#decode,先解码消息头,然后判断是否有可读字节,
public void decode(ByteBuf buf, MessageTree tree) {
Context ctx = m_ctx.get().setBuffer(buf);
decodeHeader(ctx, tree);
if (buf.readableBytes() > 0) {
decodeMessage(ctx, tree);
}
}
先解析出父消息,这个时候前面设置的type就会起到分支的作用,一个消息直接走'A',两个消息依次走t,A,T,刚好最开始放入栈中的消息为空case 't':stack.push(parent);这个刚好是退出while循环的条件,case 'A':parent.addChild(tran);case 'T':return stack.pop();同时把该设置的子消息设置到父消息中,解析m_durationInMicro时需要减2是因为编码的时候添加了"us"字符串。具体方法在PlainTextMessageCodec#decodeLine中。完整的消息DefaultMessageTree就这样解码完成。
protected void decodeMessage(Context ctx, MessageTree tree) {
Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
Message parent = decodeLine(ctx, null, stack);
tree.setMessage(parent);
while (ctx.getBuffer().readableBytes() > 0) {
Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);
if (message instanceof DefaultTransaction) {
parent = message;
} else {
break;
}
}
}
当然在解码之前,会对数据的完整性进行校验,因为每条消息都会有个四字节的长度,所以可读字节数不能小于4,mark暂存数据块的读下标,开始读取消息长度,读下标往后移动了四个位置,现在重置回读取之前的位置reset也就是往前移动了四位,继续校验可读字节数,最后读取消息内容数据块进行解码。解码完成后把数据块保存在DefaultMessageTree中,记录执行数量m_processCount以及其他的统计值,最后开始进行消息处理。
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
if (buffer.readableBytes() < 4) {
return;
}
buffer.markReaderIndex();
int length = buffer.readInt();
buffer.resetReaderIndex();
if (buffer.readableBytes() < length + 4) {
return;
}
try {
if (length > 0) {
ByteBuf readBytes = buffer.readBytes(length + 4);
readBytes.markReaderIndex();
readBytes.readInt();
DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);
readBytes.resetReaderIndex();
tree.setBuffer(readBytes);
m_handler.handle(tree);
m_processCount++;
long flag = m_processCount % CatConstants.SUCCESS_COUNT;
if (flag == 0) {
m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
}
} else {
// client message is error
buffer.readBytes(length);
}
} catch (Exception e) {
m_serverStateManager.addMessageTotalLoss(1);
m_logger.error(e.getMessage(), e);
}
}
5. 执行CatHomeModule#execute方法,初始化消息处理类RealtimeConsumer,初始化配置更新守护线程ConfigReloadTask并且进行启动,根据服务配置类ServerConfigManager的相关配置启动其他的几个守护线程。
public void run() {
boolean active = true;
while (active) {
try {
m_productLineConfigManager.refreshConfig();
} catch (Exception e) {
Cat.logError(e);
}
try {
m_metricConfigManager.refreshConfig();
} catch (Exception e) {
Cat.logError(e);
}
Transaction t = Cat.newTransaction("ReloadConfig", "router");
try {
m_routerConfigManager.refreshConfig();
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
Cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
try {
m_blackListManager.refreshConfig();
} catch (Exception e) {
Cat.logError(e);
}
try {
m_allTransactionConfigManager.refreshConfig();
} catch (Exception e) {
Cat.logError(e);
}
try {
Thread.sleep(TimeHelper.ONE_MINUTE);
} catch (InterruptedException e) {
active = false;
}
}
}
最后设置服务关闭钩子MessageConsumer#doCheckpoint做一些消息处理的结尾工作,在文件中保存消息数据等等。
推荐阅读
-
在Vultr VPS主机上安装使用Windows Server的教程及评测
-
服务器变量 $_SERVER 的深入解析
-
sql server 2005 在 windows7 下的安装教程
-
C#中序列化实现深拷贝,实现DataGridView初始化刷新的方法
-
SQL Server中的XML数据进行insert、update、delete操作实现代码
-
SQL Server 数据库清除日志的方法
-
SQL Server CROSS APPLY和OUTER APPLY的应用详解
-
SQL Server2005打开数据表中的XML内容时报错的解决办法
-
Sql Server 2005中查询用分隔符分割的内容中是否包含其中一个内容
-
如何控制SQL Server中的跟踪标记