欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

CAT的Server初始化

程序员文章站 2022-07-15 16:50:20
...

1. Server初始化从web.xml文件开始,作为一个war包项目,首先需要初始化Servlet,首先是CatServlet专门初始化cat相关的server程序,比如接受客户端传过来的数据等等,另一个servlet为MVC专门提供数据查询接口的普通的MVC功能,功能相当于缩小版的SpringMVC。过滤器有两个CatFilter最主要是做一些过滤处理,每次请求经过ENVIRONMENT,ID_SETUP,LOG_CLIENT_PAYLOAD,LOG_SPAN的handle方法,打印一些日志,统计一些信息。DomainFilter主要是对domain进行处理。

2. CatServlet初始化,首先执行父类AbstractContainerServlet的init方法,这是Servlet统一的初始化方法,初始化上下文IOC容器PlexusContainer,在CatServlet中开始准备初始化各模块,在上下文中保存客户端和服务端文件路径,后文会用到。

protected void initComponents(ServletConfig servletConfig) throws ServletException {
		try {
			ModuleContext ctx = new DefaultModuleContext(getContainer());
			ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
			File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
			File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");

			ctx.setAttribute("cat-client-config-file", clientXmlFile);
			ctx.setAttribute("cat-server-config-file", serverXmlFile);
			initializer.execute(ctx);
		} catch (Exception e) {
			m_exception = e;
			System.err.println(e);
			throw new ServletException(e);
		}
	}
执行DefaultModuleInitializer#execute方法,先执行各模块的setup方法,只有CatHomeModule实现了该方法
private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
      if (modules != null) {
         for (Module module : modules) {
            if (module != null && !all.contains(module)) {
               if (module instanceof AbstractModule) {
                  ((AbstractModule) module).setup(ctx);
               }

               expandAll(ctx, module.getDependencies(ctx), all);

               all.add(module);
            }
         }
      }
   }

判断各模块初始化状态,未初始化的进行初始化处理

 expandAll(ctx, modules, all);

         for (Module module : all) {
            if (!module.isInitialized()) {
               executeModule(ctx, module, m_index++);
            }
         }

初始化各个模块,设置初始化状态,AbstractModule#=>AbstractModule#execute,只有CatClientModule和CatHomeModule有具体的执行内容,因为CatClientModule的执行内容前文已经介绍过,可以参考前文。

private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {
      long start = System.currentTimeMillis();

      // set flag to avoid re-entrance
      module.setInitialized(true);

      info(ctx, index + " ------ " + module.getClass().getName());

      // execute itself after its dependencies
      module.initialize(ctx);

      long end = System.currentTimeMillis();
      info(ctx, index + " ------ " + module.getClass().getName() + " DONE in " + (end - start) + " ms.");
   }

3. 至此主要就是解析CatHomeModule类,首先是它的setup方法,根据之前设置到上下文的服务器文件来初始化ServerConfigManager,初始化TcpSocketReceiver主要用于和客户端之前进行消息通讯,也就是充当netty的服务端,设置服务结束时的关闭钩子,销毁netty的线程池通道,关闭通道等等。

protected void setup(ModuleContext ctx) throws Exception {
		if (!isInitialized()) {
			File serverConfigFile = ctx.getAttribute("cat-server-config-file");
			ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
			final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);

			serverConfigManager.initialize(serverConfigFile);
			messageReceiver.init();

			Runtime.getRuntime().addShutdownHook(new Thread() {
				@Override
				public void run() {
					messageReceiver.destory();
				}
			});
		}
	}

netty的服务端设置,端口号默认是2280,根据系统选择不同的线程执行方式,设置主线程池m_bossGroup也就是NIO中TCP连接器Selector的作用,保持和客户端之间的通道链接处理,m_workerGroup主要是处理具体的数据的线程池,具体可以参考之前关于Netty的相关文章。

public synchronized void startServer(int port) throws InterruptedException {
		boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
		int threads = 24;
		ServerBootstrap bootstrap = new ServerBootstrap();

		m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
		m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
		bootstrap.group(m_bossGroup, m_workerGroup);
		bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();

				pipeline.addLast("decode", new MessageDecoder());
			}
		});

		bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
		bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
		bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
		bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

		try {
			m_future = bootstrap.bind(port).sync();
			m_logger.info("start netty server!");
		} catch (Exception e) {
			m_logger.error("Started Netty Server Failed:" + port, e);
		}
	}

4. 当消息从客户端发送的时候需要对数据进行编码处理,m_codec.encode(tree, buf);内存块的前四个字节为消息的总长度,先获取当前可写的下标,然后写入长度占位数字0,开始给消息头编码,统计长度

public void encode(MessageTree tree, ByteBuf buf) {
		int count = 0;
		int index = buf.writerIndex();

		buf.writeInt(0); // place-holder
		count += encodeHeader(tree, buf);

		if (tree.getMessage() != null) {
			count += encodeMessage(tree.getMessage(), buf);
		}

		buf.setInt(index, count);
	}

给消息体编码,这里以主要的Transaction类型消息距离,当本次线程只发送一个消息时也就是子消息列表为空,会设置type为A,如果有两个消息的话就会一次设置t,A,T三个type,不同的type也对应不同的encodeLine处理,增加消息时间等等。最后就是统计长度,最后统一覆盖到之前的占位下标。(消息类型Transaction,Event,Trace,Metric,Heartbeat)

public int encodeMessage(Message message, ByteBuf buf) {
		if (message instanceof Transaction) {
			Transaction transaction = (Transaction) message;
			List<Message> children = transaction.getChildren();

			if (children.isEmpty()) {
				return encodeLine(transaction, buf, 'A', Policy.WITH_DURATION);
			} else {
				int count = 0;
				int len = children.size();

				count += encodeLine(transaction, buf, 't', Policy.WITHOUT_STATUS);

				for (int i = 0; i < len; i++) {
					Message child = children.get(i);

					if (child != null) {
						count += encodeMessage(child, buf);
					}
				}

				count += encodeLine(transaction, buf, 'T', Policy.WITH_DURATION);

				return count;
			}
		} else if (message instanceof Event) {
			return encodeLine(message, buf, 'E', Policy.DEFAULT);
		} else if (message instanceof Trace) {
			return encodeLine(message, buf, 'L', Policy.DEFAULT);
		} else if (message instanceof Metric) {
			return encodeLine(message, buf, 'M', Policy.DEFAULT);
		} else if (message instanceof Heartbeat) {
			return encodeLine(message, buf, 'H', Policy.DEFAULT);
		} else {
			throw new RuntimeException(String.format("Unsupported message type: %s.", message));
		}
	}

同样,数据流到达服务端的时候就需要进行解码处理,pipeline.addLast("decode", new MessageDecoder());解码操作在PlainTextMessageCodec#decode,先解码消息头,然后判断是否有可读字节,

public void decode(ByteBuf buf, MessageTree tree) {
		Context ctx = m_ctx.get().setBuffer(buf);

		decodeHeader(ctx, tree);

		if (buf.readableBytes() > 0) {
			decodeMessage(ctx, tree);
		}
	}

先解析出父消息,这个时候前面设置的type就会起到分支的作用,一个消息直接走'A',两个消息依次走t,A,T,刚好最开始放入栈中的消息为空case 't':stack.push(parent);这个刚好是退出while循环的条件,case 'A':parent.addChild(tran);case 'T':return stack.pop();同时把该设置的子消息设置到父消息中,解析m_durationInMicro时需要减2是因为编码的时候添加了"us"字符串。具体方法在PlainTextMessageCodec#decodeLine中。完整的消息DefaultMessageTree就这样解码完成。

protected void decodeMessage(Context ctx, MessageTree tree) {
		Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
		Message parent = decodeLine(ctx, null, stack);

		tree.setMessage(parent);

		while (ctx.getBuffer().readableBytes() > 0) {
			Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);

			if (message instanceof DefaultTransaction) {
				parent = message;
			} else {
				break;
			}
		}
	}

当然在解码之前,会对数据的完整性进行校验,因为每条消息都会有个四字节的长度,所以可读字节数不能小于4,mark暂存数据块的读下标,开始读取消息长度,读下标往后移动了四个位置,现在重置回读取之前的位置reset也就是往前移动了四位,继续校验可读字节数,最后读取消息内容数据块进行解码。解码完成后把数据块保存在DefaultMessageTree中,记录执行数量m_processCount以及其他的统计值,最后开始进行消息处理。

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
			if (buffer.readableBytes() < 4) {
				return;
			}
			buffer.markReaderIndex();
			int length = buffer.readInt();
			buffer.resetReaderIndex();
			if (buffer.readableBytes() < length + 4) {
				return;
			}
			try {
				if (length > 0) {
					ByteBuf readBytes = buffer.readBytes(length + 4);
					readBytes.markReaderIndex();
					readBytes.readInt();

					DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);

					readBytes.resetReaderIndex();
					tree.setBuffer(readBytes);
					m_handler.handle(tree);
					m_processCount++;

					long flag = m_processCount % CatConstants.SUCCESS_COUNT;

					if (flag == 0) {
						m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
					}
				} else {
					// client message is error
					buffer.readBytes(length);
				}
			} catch (Exception e) {
				m_serverStateManager.addMessageTotalLoss(1);
				m_logger.error(e.getMessage(), e);
			}
		}

5. 执行CatHomeModule#execute方法,初始化消息处理类RealtimeConsumer,初始化配置更新守护线程ConfigReloadTask并且进行启动,根据服务配置类ServerConfigManager的相关配置启动其他的几个守护线程。

public void run() {
		boolean active = true;
		while (active) {
			try {
				m_productLineConfigManager.refreshConfig();
			} catch (Exception e) {
				Cat.logError(e);
			}
			try {
				m_metricConfigManager.refreshConfig();
			} catch (Exception e) {
				Cat.logError(e);
			}

			Transaction t = Cat.newTransaction("ReloadConfig", "router");
			try {
				m_routerConfigManager.refreshConfig();
				t.setStatus(Transaction.SUCCESS);
			} catch (Exception e) {
				Cat.logError(e);
				t.setStatus(e);
			} finally {
				t.complete();
			}

			try {
				m_blackListManager.refreshConfig();
			} catch (Exception e) {
				Cat.logError(e);
			}
			try {
				m_allTransactionConfigManager.refreshConfig();
			} catch (Exception e) {
				Cat.logError(e);
			}
			try {
				Thread.sleep(TimeHelper.ONE_MINUTE);
			} catch (InterruptedException e) {
				active = false;
			}
		}
	}

最后设置服务关闭钩子MessageConsumer#doCheckpoint做一些消息处理的结尾工作,在文件中保存消息数据等等。

 

相关标签: CAT