欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

CAT集成阿里RocketMQ产品

程序员文章站 2022-07-15 16:49:56
...

背景

在上面介绍了CAT如何做跨进程追踪的。公司项目采用业务垂直切分的方式开发、前后端也是分离方式。那么实时监控埋点的地方有如下几处:

  • 前端和后端交互
  • 后端业务逻辑
  • 数据库存储
  • 远程PRC调用
  • 消息队列

 

消息队列介绍

这里介绍的是阿里云RocketMQ产品。由于公司现阶段运维实力不足,所以对于中间件采用的策略是直接购买产品。这样可以减少运维成本,同时中间件的稳定性也得到了保证。

使用的客户端版本是1.8.8Fianl。如果使用的版本是1.7.x 是没有spi这个包的。

 

集成思路

根据之前集成其他框架的思路。那么在集成消息队列的时候一样先去了解一下客户端,看看客户端是否已经提供的扩展的方式。然后在下手。

阿里云消息队列RocketMQC产品的客户端是openservices-client 这个jar. 打开jar包浏览一下。你们发现一个叫spi的包。嘿嘿,找到插入点了。spi是java中的一种扩展方式。打开这个包就会发现里面有ConsumerInterceptor 接口和ProducerInterceptor 接口。这两个接口继承了Interceptor接口。一看这个接口就知道它要做什么了。人如其名:

 

public interface Interceptor<T> {
    /**
     * Execute hook-in logic before executing business processing.
     *
     * <p>
     * <strong>Exception Handling</strong> If this method raised an exception, the exception will be logged and
     * interception chain will proceed to the next one.
     * </p>
     *
     * @param invocationContext Invocation context
     * @param instance client instance that is intercepted
     * @return true if interceptor chain is allowed to proceed; false otherwise.
     * @throws Exception If anything wrong is raised.
     */
    boolean preHandle(InvocationContext invocationContext, T instance) throws Exception;

    /**
     * Execute hook-in logic after business processing. Potential execution result can be acquired through inspecting
     * invocation context.
     *
     * @param invocationContext Invocation context
     * @param instance Client instance being intercepted.
     * @throws Exception If anything wrong raised.
     */
    void postHandle(InvocationContext invocationContext, T instance) throws Exception;
}

 

至此我们找了集成的入口,那么接下来就是如何做集成了。

 

集成消息队列

既然是spi的方式,那么就按照spi的方式编写就好。以客户端为例,首先就是实现一个ConsumerInterceptor 接口的类。代码如下:

/**
 * 测试使用阿里封装的消息队列客户端中的SPI方式扩展发送消息
 * @author tengx
 */
@Slf4j
public class RocketMQProducerIntercept implements ProducerInterceptor {

    @Override
    public boolean preHandle(InvocationContext invocationContext, Admin instance) throws Exception {
        Optional<List<Message>> message=invocationContext.getMessages();
        List<Message> messages=message.get();
        if (ObjectUtil.isNotNull(messages) && messages.size()>0){
            for(Message msg:messages){
                String topic=msg.getTopic();
                String tag=msg.getTag();
                byte[] body=msg.getBody();
                Transaction t=Cat.newTransaction(CatRocketMQConstants.ROCKETMQ_PRODUCER_TOPIC,topic);
                try {
                    Cat.Context ctx = new CatContext();
                    Cat.logRemoteCallClient(ctx,topic);
                    Properties properties=msg.getUserProperties();
                    if(ObjectUtil.isNull(properties)){
                        properties=new Properties();
                    }
                    properties.setProperty(Cat.Context.ROOT, ctx.getProperty(Cat.Context.ROOT));
                    properties.setProperty(Cat.Context.CHILD, ctx.getProperty(Cat.Context.CHILD));
                    properties.setProperty(Cat.Context.PARENT, ctx.getProperty(Cat.Context.PARENT));
                    properties.setProperty(CatRocketMQConstants.ROCKETMQ_PRODUCER_TIME, String.valueOf(System.currentTimeMillis()));
                    msg.setUserProperties(properties);
                    Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_IP, JsonUtils.toJSONString(NetUtil.localIpv4s()));
                    Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_TOPIC, topic);
                    Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_TAG, tag);
                    Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_BODY, new String(body));
                    Cat.logEvent(CatRocketMQConstants.ROCKETMQ_PRODUCER_DATETIME, DateUtil.now());

                    t.setStatus(Transaction.SUCCESS);
                }catch (Exception e){
                    t.setStatus(e);
                    Cat.logError(e);
                }finally {
                    t.complete();
                }

            }

        }
        return true;
    }

    @Override
    public void postHandle(InvocationContext invocationContext, Admin instance) throws Exception {

    }
}

 

实现逻辑如下:

  • 实现preHandler方式,也就是在发送消息前执行
  • 进行 CAT一些指标的记录
  • 将跨进程的一些参数放入到uerProperties 中进行传递

 

接下来就是在resources文件夹下创建META-INFI.services 文件夹,并创建接口对应的文件。在文件中写入实现类。消费端spi和生产者基本是一样的。

 

总结:

  • 在做集成的时候,首先去看看框架的客户端是否提供了扩展方式
  • java spi 是一种不错的扩展方式。以后自己在写客户端的时候可以加入spi扩展点。

 

相关标签: CAT