欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

使用阿里云消息队列

程序员文章站 2022-07-13 12:16:34
...

使用阿里云消息队列

控制台地址:http://ons.console.aliyun.com/#/home/topic
使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
 

(1)生成Producer ID

点击"申请发布"
使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
 示例代码:

package com.alibaba.ons.demo;

import java.util.Properties;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

public class ProducerClient {

    public static void main(String[] args) {
       Properties properties = new Properties();
       properties.put(PropertyKeyConst.ProducerId, "PID_whuang");
       properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
       properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
       Producer producer = ONSFactory.createProducer(properties);
           
       //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
       producer.start();
       Message msg = new Message(
            //Message Topic
            "com_hbjltv", 
            //Message Tag,
            //可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在ONS服务器过滤        
            "TagA",
            //Message Body
            //任何二进制形式的数据,ONS不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式
            "Hello ONS".getBytes()
        );
        
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过ONS Console查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey("ORDERID_100");
        
        //发送消息,只要不抛异常就是成功
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);

        // 在应用退出前,销毁Producer对象
        // 注意:如果不销毁也没有问题
        producer.shutdown();
    }
}

 

 

 

(2)生成Consumer ID

点击"申请订阅"
使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
 示例代码:

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");
        properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
        properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("com_hbjltv", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}   

 

(3) clientId 的限制

阿里云消息队列对clientId的名称有严格限制:

(a)必须以申请的Consumer ID 开头,后面跟@@@,接着跟用于区分客户端的标志,

例如:

CID_tv_mobile@@@86458fd 是合法的

CID_tv_mobile@@86458fd 是非法的,因为只有两个@

(b)总长度不能超过23个字符

例如

CID_tv_mobile@@@86458_A是合法的

CID_tv_mobile@@@86458_Ab是非法的,因为超过了23个字符


使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
 

 

(4)在手机端(客户端)增加订阅逻辑

 

package com.service;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.os.IBinder;

import com.common.util.SystemHWUtil;
import com.dict.Constants3;
import com.jianli.R;
import com.push.PushCallback;
import com.string.widget.util.ValueWidget;
import com.util.MacSignature;
import com.util.ShopUtil;

/**
 * @author Dominik Obermaier
 */
public class MQTTService extends Service {

	// public static final String BROKER_URL =
	// "tcp://broker.mqttdashboard.com:1883";
	// public static String BROKER_URL = "tcp://172.16.15.50:1883";
	public static String BROKER_URL_FORMAT = "tcp://%s:%s";
	// public static final String BROKER_URL = "tcp://test.mosquitto.org:1883";

	/*
	 * In a real application, you should get an Unique Client ID of the device
	 * and use this, see
	 * http://android-developers.blogspot.de/2011/03/identifying
	 * -app-installations.html
	 */
	public static String clientId = null;

	/**
	 * 不能含有英文句点,可以包含下划线
	 */
	public static final String TOPIC = "com_hbjltv";
	private MqttClient mqttClient;
//	private String ip="182.92.80.122";
	/***
	 * 是否连接上activeMQ
	 */
	private boolean online = false;
	boolean isAliyun=false;

	public IBinder onBind(Intent intent) {
		return null;
	}

	@Override
	public void onCreate() {
		super.onCreate();
	}

	private MqttClient createMqttClient(String serverURL, String clientId) throws MqttException{
		return new MqttClient(serverURL, clientId,
				new MemoryPersistence());
	}
	/***
	 * 
	 * @param serverURL
	 * @param clientId
	 *            : 最大长度:23
	 * @param isAllowOffline
	 * @param username
	 * @param password
	 * @throws MqttException
	 */
	private void connectAndSubscribe(String serverURL, String clientId,
	/* String topicFilter, */boolean isAllowOffline, String username,
			String password) throws MqttException {
		
		if(isAliyun){
			if(!ShopUtil.validateClientId(getApplicationContext(), clientId)){
				return;
			}
		}
		mqttClient = createMqttClient(serverURL, clientId);
		MqttConnectOptions options = new MqttConnectOptions();
		options.setCleanSession(!isAllowOffline);// mqtt receive offline message
		if (ValueWidget.isNullOrEmpty(username)) {
			username = null;
		}
		String sign=null;
		if(isAliyun){
        	try {
				sign = MacSignature.macSignature(Constants3.CONSUMER_ID_TV, password);
				password=sign;
			} catch (InvalidKeyException e) {
				e.printStackTrace();
			} catch (NoSuchAlgorithmException e) {
				e.printStackTrace();
			}
        	
        }
		if (ValueWidget.isNullOrEmpty(password)) {
			password = null;
		} else {
			options.setPassword(password.toCharArray());
		}
		options.setUserName(username);
		options.setConnectionTimeout(10);
		options.setKeepAliveInterval(10);
		if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
			mqttClient = createMqttClient(serverURL, clientId);
		}
		mqttClient.setCallback(new PushCallback(this));
		boolean isSuccess=false;
		mqttClient.connect(options);
		isSuccess=true;
		// Subscribe to all subtopics of homeautomation
		// mqttClient.subscribe(topicFilter);
		if(null==mqttClient){//点击HOME键,过很长时间,再点击应用时,mqttClient为null
			mqttClient = createMqttClient(serverURL, clientId);
		}
		if(isAliyun){
    		final String p2ptopic = TOPIC+"/p2p/";
    		//同时订阅两个topic,一个是基于标准mqtt协议的发布订阅模式,一个是扩展的点对点推送模式
    		final String[] topicFilters=new String[]{TOPIC,p2ptopic};
    		mqttClient.subscribe(topicFilters);
    	}else{
    		mqttClient.subscribe(new String[] { TOPIC, clientId });
		}
	}

	@Override
	public void onStart(Intent intent, int startId) {
		final boolean isRestart=intent.getBooleanExtra("isRestart", false);
		ShopUtil.logger2("restart MQTT service:"+isRestart);
		// super.onStart(intent, startId);
//		if (intent == null) {//重启服务时intent 确实为空
		
//			Log.d(Constants.LOG_TAG, "intent is null");
//			return;
//		}
		Context context = getApplicationContext();
		clientId = ShopUtil.getIMEI(context);
		// Bundle bundle=intent.getExtras();
		// String ip=bundle.getString(Constants.ACTIVEMQ_IP);
//		final String ip = context.getString(R.string.pushserver_ip);
		SharedPreferences preferences = getApplicationContext()
				.getSharedPreferences(Constants3.SHAREDPREFERENCES_NAME,
						Context.MODE_PRIVATE);
		final String ip ="mqtt.ons.aliyun.com";// preferences.getString("pushserver_ip", context.getString(R.string.pushserver_ip));
		final String port = preferences.getString("pushserver_port", "1883");
		isAliyun=SystemHWUtil.parse2Boolean(preferences.getString("is_aliyun_mq_ONS", "false"));
		// String topic=bundle.getString(Constants.ACTIVEMQ_TOPIC);
		 System.out.println("push ip:"+ip);
		new Thread(new Runnable() {
			/****
			 * 尝试连接的次数,为什么要尝试连接几次那?
			 * (1)无wifi时启动,则肯定连接失败,所以尝试连接三次,只要在这个期间启动wifi就可以连接上activeMQ;<br />
			 * (2)之前连接上,然后断开wifi,然后又启动wifi,<br />
			 * 这时容易报 "Broker unavailable"异常,暂时不清楚原因,所以也需要继续尝试连接;<br />
			 * 
			 */
			private int tryTime = 5;

			@Override
			public void run() {
				System.out.println(tryTime+","+mqttClient+","+isOnline() );
				while (tryTime > 0
						&& (!isOnline() || mqttClient == null || (!mqttClient
								.isConnected())||isRestart)) {
					try {
						ShopUtil.logger2("start push service");
						ShopUtil.logger2("push server:"+ip);
						String prefix=Constants3.CONSUMER_ID_TV+"@@@";
						int remainingLength=23-prefix.length();
						String suffix=null;
						if(clientId.length()>remainingLength){
							suffix=clientId.substring(0,remainingLength);
						}else{
							suffix=clientId;
						}
						String clientId2=prefix+suffix;
						connectAndSubscribe(String.format(
								MQTTService.BROKER_URL_FORMAT, ip, port),
								clientId2, /* topic, */true, ""/*自己申请的access key*/, ""/*secret*/);
						 ShopUtil.logger2("clientId:" + clientId2);
						ShopUtil.logger2("succeed to connect to activeMQ");
						setOnline(true);
					} catch (MqttException e) {
						setOnline(false);
						mqttClient=null;
						 e.printStackTrace();
						ShopUtil.logger2("抛异常:"+e.getMessage());
						ShopUtil.logger2("ip:" + ip + " ,port:" + port);
						try {
							Thread.sleep(10000);
						} catch (InterruptedException e1) {
							e1.printStackTrace();
						}
					}
					tryTime--;
				}

			}
		}).start();
//		new Thread(new Runnable() {
//			@Override
//			public void run() {
//				System.out.println("start:"+System.currentTimeMillis());
//				try {
//					Thread.sleep(10000);
//				} catch (InterruptedException e) {
//					e.printStackTrace();
//				}
//				while(true){
//					try {
//						Thread.sleep(10000);
//						if(mqttClient!=null&& !mqttClient.isConnected()){
//							System.out.println("disConnected:"+System.currentTimeMillis());
//						}
//					} catch (InterruptedException e) {
//						e.printStackTrace();
//					}
//				}
//			}
//		}).start();
	}

	@Override
	public void onDestroy() {
		setOnline(false);
	     
		try {
			ShopUtil.logger2("MQTTService destory");
			mqttClient.disconnect(0);
		} catch (MqttException e) {
//			Toast.makeText(getApplicationContext(),
//					"Something went wrong!" + e.getMessage(), Toast.LENGTH_LONG)
//					.show();
			e.printStackTrace();
			
		}
		mqttClient = null;
		stopForeground(true);
		Intent intent = new Intent("com.dbjtech.waiqin.destroy");  
	    sendBroadcast(intent); 
	}

	

	public boolean isOnline() {
		return online;
	}

	public void setOnline(boolean online) {
		this.online = online;
	}
	@Override
	public int onStartCommand(Intent intent, int flags, int startId) {
		flags = START_STICKY;
		return super.onStartCommand(intent, flags, startId);
	}
}

 

源代码见githb:

https://github.com/whuanghkl/mqtt_client_swing.git

阿里云消息队列MQ 开发者手册见附件

  • 使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
  • 大小: 28.2 KB
  • 使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
  • 大小: 19 KB
  • 使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
  • 大小: 17.7 KB
  • 使用阿里云消息队列
            
    
    博客分类: Java mq消息队列MQ 
  • 大小: 267.3 KB