欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

rocketMq实战(3)-console和运维 博客分类: rocketmq rocketmq-console 

程序员文章站 2024-03-17 19:16:58
...


 
 rocketmq这么复杂的东西,没有运维工具可搞不定啊。

 

哈哈,别急,官方提供了一个WEB项目,可以查看rocketmq数据和执行一些操作。

而且我自己也添加了一些功能

官网:https://github.com/rocketmq/rocketmq-console

运行修改一下namesers的地址,注意多个地址用 分号 分割


rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
 

下面是成功页面,cluster查询broker集群的tps和出队入队情况。

topic查询生产和消费信息

connection查询生产则和消费者的连接信息。

其他的不讲了,多试试就明白了。

注意consumer这一项,可以查询消息积压,这是我们最关心的。

首先更正一个观念 对与broker是没有积压这个概念的,只有consumer有积压的概念。

 

 


rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
 看下图可以查询consumer的消费情况,下图是查询指定consumer的页面 每个queue对应的broker的生产和消费情况。

Diff Total 是总的挤压数


rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
 

 重点介绍我自己开发的几个功能

一 ,查询所有consumer的积压

如果想查询所有consumer的挤压情况,抱歉没有,只能自己开发。

这个功能的原理就是 通过ssh Java客户端去执行 命令 获取返回数据,在任何的broker执行都可以(如果童鞋们知道更好的实现方式 请告知)。

命令资料如下

 

 

rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
 通过sa账号执行 sh /opt/ali-rocketmq/devenv/bin/mqadmin consumerProgress -n \"10.103.16.77:9876;10.103.16.15:9876\"  报出

()

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

解决方法

 在bin/mqadmin 脚本写入 export JAVA_HOME=/usr/local/java .

 
rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
 

具体代码实现见附件

 

 

 二,定时查询某个topic的挤压数 ,报警功能

代码如下

/**
 * 单条consumer的定时提醒
 * @author chenchangqun
 *
 */
public class SingleNotifyTask {
    static final Logger logger = LoggerFactory.getLogger(SingleNotifyTask.class);
	private int delayMils;//延迟启动时间,单位 毫秒
	private int periodMils;//周期时间 间隔,单位 毫秒
	private 	String consumerName;
	   private INotifyInvoke notifyInvoke;//通知操作	
	@Resource(name="consumerService")
    private ConsumerService consumerService;
       private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQNotifyTaskScheduledThread");
        }
    });
	public void init(){
   scheduledExecutorService.scheduleAtFixedRate(new Runnable(){
	@Override
			public void run() {
				try {
					Table table = consumerService.consumerProgress(consumerName);
					String diffCountStr = table.getExtData().get("Diff Total:");
					if (StringUtils.isBlank(diffCountStr)) {
						logger.error("diff Total is null,consumer=" + consumerName);
						return;
					}
					int diffCount = Integer.parseInt(diffCountStr);
					notifyInvoke.invoke(diffCount,consumerName);
				

				} catch (Throwable e) {
					logger.error("queryConsumerState fail, consumer=" + consumerName, e);
				}

			}
	}, delayMils, periodMils, TimeUnit.MILLISECONDS);
	}
 
	public void setDelayMils(int delayMils) {
		this.delayMils = delayMils;
	}
	public void setPeriodMils(int periodMils) {
		this.periodMils = periodMils;
	}
	public void setConsumerName(String consumerName) {
		this.consumerName = consumerName;
	}

	public void setNotifyInvoke(INotifyInvoke notifyInvoke) {
		this.notifyInvoke = notifyInvoke;
	}
	
}

 通过 定时调用rocketmq的API查询挤压数,根据实现类的逻辑执行报警

/**
 * 通知操作 interface
 * @author chenchangqun
 *
 */
public interface INotifyInvoke {

/**
 * 执行通知的具体操作
 * @param diffCount
 */
public void  invoke(int diffCount,String consumerName);
}

 

/**
 * 操作通知具体实现类
 * @author chenchangqun
 *
 */
public class NotifyInvokeImpl implements INotifyInvoke {
private int thresholdValue;//积压阀值,用作是否发出提醒
	@Override
	public void invoke(int diffCount,String  consumerName) {
		if (diffCount > thresholdValue) {
			System.out.println("single check invoke ," + consumerName
					+ "  geater thean thresholdValue ,do something,thresholdValue=" + thresholdValue);
		}
	}
	public void setThresholdValue(int thresholdValue) {
		this.thresholdValue = thresholdValue;
	}
}

 

配置如下

 

  
<bean id="singleNotifyTask" class="com.alibaba.rocketmq.common.SingleNotifyTask" init-method="init">
   <property name="consumerName" value="firstSpringConsumer"></property>
   <property name="delayMils" value="1000"></property>
   <property name="periodMils" value="3000"></property>
   <property name="notifyInvoke" ref="notifyInvoke"></property> 
   </bean>
  <!-- 具体的通知动作 -->   
<bean id="notifyInvoke" class="com.alibaba.rocketmq.common.NotifyInvokeImpl" >
   <property name="thresholdValue" value="1"></property>
   </bean>
    

 

 

附件中给出具体代码,仅供参考

 

  • rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
  • 大小: 46.8 KB
  • rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
  • 大小: 35.5 KB
  • rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
  • 大小: 55.7 KB
  • rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
  • 大小: 41.1 KB
  • rocketMq实战(3)-console和运维
            
    
    博客分类: rocketmq rocketmq-console 
  • 大小: 7.5 KB
相关标签: rocketmq-console