CAT集成阿里RocketMQ产品
程序员文章站
2022-07-15 16:49:56
...
背景
在上面介绍了CAT如何做跨进程追踪的。公司项目采用业务垂直切分的方式开发、前后端也是分离方式。那么实时监控埋点的地方有如下几处:
- 前端和后端交互
- 后端业务逻辑
- 数据库存储
- 远程PRC调用
- 消息队列
消息队列介绍
这里介绍的是阿里云RocketMQ产品。由于公司现阶段运维实力不足,所以对于中间件采用的策略是直接购买产品。这样可以减少运维成本,同时中间件的稳定性也得到了保证。
使用的客户端版本是1.8.8Fianl。如果使用的版本是1.7.x 是没有spi这个包的。
集成思路
根据之前集成其他框架的思路。那么在集成消息队列的时候一样先去了解一下客户端,看看客户端是否已经提供的扩展的方式。然后在下手。
阿里云消息队列RocketMQC产品的客户端是openservices-client 这个jar. 打开jar包浏览一下。你们发现一个叫spi的包。嘿嘿,找到插入点了。spi是java中的一种扩展方式。打开这个包就会发现里面有ConsumerInterceptor 接口和ProducerInterceptor 接口。这两个接口继承了Interceptor接口。一看这个接口就知道它要做什么了。人如其名:
public interface Interceptor<T> {
/**
* Execute hook-in logic before executing business processing.
*
* <p>
* <strong>Exception Handling</strong> If this method raised an exception, the exception will be logged and
* interception chain will proceed to the next one.
* </p>
*
* @param invocationContext Invocation context
* @param instance client instance that is intercepted
* @return true if interceptor chain is allowed to proceed; false otherwise.
* @throws Exception If anything wrong is raised.
*/
boolean preHandle(InvocationContext invocationContext, T instance) throws Exception;
/**
* Execute hook-in logic after business processing. Potential execution result can be acquired through inspecting
* invocation context.
*
* @param invocationContext Invocation context
* @param instance Client instance being intercepted.
* @throws Exception If anything wrong raised.
*/
void postHandle(InvocationContext invocationContext, T instance) throws Exception;
}
至此我们找了集成的入口,那么接下来就是如何做集成了。
集成消息队列
既然是spi的方式,那么就按照spi的方式编写就好。以客户端为例,首先就是实现一个ConsumerInterceptor 接口的类。代码如下:
/**
* 测试使用阿里封装的消息队列客户端中的SPI方式扩展发送消息
* @author tengx
*/
@Slf4j
public class RocketMQProducerIntercept implements ProducerInterceptor {
@Override
public boolean preHandle(InvocationContext invocationContext, Admin instance) throws Exception {
Optional<List<Message>> message=invocationContext.getMessages();
List<Message> messages=message.get();
if (ObjectUtil.isNotNull(messages) && messages.size()>0){
for(Message msg:messages){
String topic=msg.getTopic();
String tag=msg.getTag();
byte[] body=msg.getBody();
Transaction t=Cat.newTransaction(CatRocketMQConstants.ROCKETMQ_PRODUCER_TOPIC,topic);
try {
Cat.Context ctx = new CatContext();
Cat.logRemoteCallClient(ctx,topic);
Properties properties=msg.getUserProperties();
if(ObjectUtil.isNull(properties)){
properties=new Properties();
}
properties.setProperty(Cat.Context.ROOT, ctx.getProperty(Cat.Context.ROOT));
properties.setProperty(Cat.Context.CHILD, ctx.getProperty(Cat.Context.CHILD));
properties.setProperty(Cat.Context.PARENT, ctx.getProperty(Cat.Context.PARENT));
properties.setProperty(CatRocketMQConstants.ROCKETMQ_PRODUCER_TIME, String.valueOf(System.currentTimeMillis()));
msg.setUserProperties(properties);
Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_IP, JsonUtils.toJSONString(NetUtil.localIpv4s()));
Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_TOPIC, topic);
Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_TAG, tag);
Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_BODY, new String(body));
Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_DATETIME, DateUtil.now());
t.setStatus(Transaction.SUCCESS);
}catch (Exception e){
t.setStatus(e);
Cat.logError(e);
}finally {
t.complete();
}
}
}
return true;
}
@Override
public void postHandle(InvocationContext invocationContext, Admin instance) throws Exception {
}
}
实现逻辑如下:
- 实现preHandler方式,也就是在发送消息前执行
- 进行 CAT一些指标的记录
- 将跨进程的一些参数放入到uerProperties 中进行传递
接下来就是在resources文件夹下创建META-INFI.services 文件夹,并创建接口对应的文件。在文件中写入实现类。消费端spi和生产者基本是一样的。
总结:
- 在做集成的时候,首先去看看框架的客户端是否提供了扩展方式
- java spi 是一种不错的扩展方式。以后自己在写客户端的时候可以加入spi扩展点。
推荐阅读