使用阿里云消息队列
程序员文章站
2022-07-13 12:16:34
...
使用阿里云消息队列
控制台地址:http://ons.console.aliyun.com/#/home/topic
(1)生成Producer ID
点击"申请发布"
示例代码:
package com.alibaba.ons.demo;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;

/**
 * Minimal ONS producer example: starts a producer, publishes a single
 * message to the {@code com_hbjltv} topic, prints the send result and shuts
 * the producer down.
 */
public class ProducerClient {

    /**
     * Sends one "Hello ONS" message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ProducerId, "PID_whuang");
        properties.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
        properties.put(PropertyKeyConst.SecretKey, "请输入SecretKey");

        Producer producer = ONSFactory.createProducer(properties);
        // start() must be called once before any message is sent.
        producer.start();
        try {
            Message msg = new Message(
                    // Message topic.
                    "com_hbjltv",
                    // Message tag: comparable to a Gmail label; lets a consumer
                    // specify a filter that is applied on the ONS server.
                    "TagA",
                    // Message body: arbitrary binary data that ONS does not
                    // touch. Producer and consumer must agree on the
                    // serialization, so the charset is pinned explicitly
                    // instead of relying on the platform default.
                    "Hello ONS".getBytes(StandardCharsets.UTF_8));

            // Business key of the message; keep it as globally unique as
            // possible so the message can be looked up and re-sent from the
            // ONS console if it is not received. Optional: sending works
            // without it.
            msg.setKey("ORDERID_100");

            // The send is successful as long as no exception is thrown.
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        } finally {
            // Release the producer before the application exits, even when
            // send() failed.
            producer.shutdown();
        }
    }
}
(2)生成Consumer ID
点击"申请订阅"
示例代码:
/**
 * Minimal ONS consumer example: subscribes to every tag of the
 * {@code com_hbjltv} topic and prints each message it receives.
 */
public class ConsumerTest {

    /**
     * Creates the consumer, registers a printing listener and starts it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(PropertyKeyConst.ConsumerId, "CID_tv_mobile");
        config.put(PropertyKeyConst.AccessKey, "请输入AccessKey");
        config.put(PropertyKeyConst.SecretKey, "请输入SecretKey");

        Consumer subscriber = ONSFactory.createConsumer(config);

        // Listener that prints the message and commits it.
        MessageListener printingListener = new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        };

        // "*" is the tag expression passed to subscribe().
        subscriber.subscribe("com_hbjltv", "*", printingListener);
        subscriber.start();
        System.out.println("Consumer Started");
    }
}
(3) clientId 的限制
阿里云消息队列对clientId的名称有严格限制:
(a)必须以申请的Consumer ID 开头,后面跟@@@,接着跟用于区分客户端的标志,
例如:
CID_tv_mobile@@@86458fd 是合法的
CID_tv_mobile@@86458fd 是非法的,因为只有两个@
(b)总长度不能超过23个字符
例如
CID_tv_mobile@@@86458_A是合法的
CID_tv_mobile@@@86458_Ab是非法的,因为超过了23个字符
(4)在手机端(客户端)增加订阅逻辑
package com.service;

import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;

import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.content.SharedPreferences;
import android.os.IBinder;

import com.common.util.SystemHWUtil;
import com.dict.Constants3;
import com.jianli.R;
import com.push.PushCallback;
import com.string.widget.util.ValueWidget;
import com.util.MacSignature;
import com.util.ShopUtil;

/**
 * Android service that connects to an MQTT broker on a background thread and
 * subscribes to the push topic(s). Incoming messages are delivered to a
 * {@link PushCallback}.
 *
 * @author Dominik Obermaier
 */
public class MQTTService extends Service {

    /** Format of the broker URL; filled with host and port. */
    public static String BROKER_URL_FORMAT = "tcp://%s:%s";

    /*
     * In a real application, you should get an Unique Client ID of the device
     * and use this, see
     * http://android-developers.blogspot.de/2011/03/identifying-app-installations.html
     */
    public static String clientId = null;

    /** Topic name: must not contain a period, may contain underscores. */
    public static final String TOPIC = "com_hbjltv";

    /** Maximum total length of the client ID passed to the broker. */
    private static final int MAX_CLIENT_ID_LENGTH = 23;

    /** Number of connection attempts made by one {@link #onStart} call. */
    private static final int MAX_CONNECT_ATTEMPTS = 5;

    /** Pause between two failed connection attempts. */
    private static final long RETRY_DELAY_MILLIS = 10000L;

    private MqttClient mqttClient;

    /** Whether the service is currently connected to the broker. */
    private boolean online = false;

    boolean isAliyun = false;

    public IBinder onBind(Intent intent) {
        return null;
    }

    @Override
    public void onCreate() {
        super.onCreate();
    }

    private MqttClient createMqttClient(String serverURL, String clientId)
            throws MqttException {
        return new MqttClient(serverURL, clientId, new MemoryPersistence());
    }

    /**
     * Builds the broker client ID from a prefix and a device identifier,
     * truncating the device identifier so that the result never exceeds
     * {@link #MAX_CLIENT_ID_LENGTH} characters where the prefix allows it.
     *
     * @param prefix   consumer ID followed by the {@code @@@} separator
     * @param deviceId identifier that distinguishes this client
     * @return {@code prefix} followed by as much of {@code deviceId} as fits
     */
    private static String buildClientId(String prefix, String deviceId) {
        // Clamp at 0 so an over-long prefix cannot produce a negative
        // substring bound.
        int remaining = Math.max(0, MAX_CLIENT_ID_LENGTH - prefix.length());
        String suffix = deviceId.length() > remaining
                ? deviceId.substring(0, remaining)
                : deviceId;
        return prefix + suffix;
    }

    /**
     * Returns whether a client exists and reports itself as connected. The
     * field is read once into a local so it cannot change between the null
     * check and the call.
     */
    private boolean hasLiveConnection() {
        MqttClient current = mqttClient;
        return current != null && current.isConnected();
    }

    /**
     * Creates a new MQTT client, connects it and subscribes to the push
     * topic(s).
     *
     * @param serverURL      broker URL
     * @param clientId       client ID, at most 23 characters
     * @param isAllowOffline when {@code true} the session is not cleaned, so
     *                       offline messages can be received
     * @param username       user name; null or empty means none
     * @param password       password; in Aliyun mode it is replaced by the
     *                       signature computed from it
     * @return {@code true} when the client connected and subscribed,
     *         {@code false} when Aliyun mode is on and the client ID was
     *         rejected by {@code ShopUtil.validateClientId} (nothing was
     *         connected in that case)
     * @throws MqttException if creating, connecting or subscribing fails
     */
    private boolean connectAndSubscribe(String serverURL, String clientId,
            boolean isAllowOffline, String username, String password)
            throws MqttException {
        if (isAliyun
                && !ShopUtil.validateClientId(getApplicationContext(), clientId)) {
            return false;
        }

        // Work on a local reference: the field may be set to null elsewhere
        // (see onDestroy and the retry loop) while this method is running.
        MqttClient client = createMqttClient(serverURL, clientId);
        mqttClient = client;

        MqttConnectOptions options = new MqttConnectOptions();
        // Keeping the session lets the client receive offline messages.
        options.setCleanSession(!isAllowOffline);

        if (ValueWidget.isNullOrEmpty(username)) {
            username = null;
        }
        if (isAliyun) {
            try {
                password = MacSignature.macSignature(
                        Constants3.CONSUMER_ID_TV, password);
            } catch (InvalidKeyException e) {
                // Signing failed: fall through with the unsigned password,
                // as the original code did.
                e.printStackTrace();
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
        }
        if (!ValueWidget.isNullOrEmpty(password)) {
            options.setPassword(password.toCharArray());
        }
        options.setUserName(username);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(10);

        client.setCallback(new PushCallback(this));
        client.connect(options);

        if (isAliyun) {
            // Subscribe to two topics: one for the standard MQTT
            // publish/subscribe mode, one for the extended point-to-point
            // push mode.
            client.subscribe(new String[] { TOPIC, TOPIC + "/p2p/" });
        } else {
            client.subscribe(new String[] { TOPIC, clientId });
        }
        return true;
    }

    /**
     * Reads the connection settings and starts a background thread that
     * tries to connect to the broker, retrying a limited number of times.
     *
     * @param intent  start intent; may be null (the original author noted
     *                that it is null when the service is restarted). The
     *                boolean extra {@code isRestart} forces a reconnect.
     * @param startId start request ID (unused)
     */
    @Override
    public void onStart(Intent intent, int startId) {
        // A null intent is treated as "not a forced restart" instead of
        // crashing with a NullPointerException.
        final boolean isRestart =
                intent != null && intent.getBooleanExtra("isRestart", false);
        ShopUtil.logger2("restart MQTT service:" + isRestart);

        Context context = getApplicationContext();
        clientId = ShopUtil.getIMEI(context);
        SharedPreferences preferences = context.getSharedPreferences(
                Constants3.SHAREDPREFERENCES_NAME, Context.MODE_PRIVATE);
        // The broker host is hard-coded; the original had a disabled lookup
        // of the "pushserver_ip" preference here.
        final String ip = "mqtt.ons.aliyun.com";
        final String port = preferences.getString("pushserver_port", "1883");
        isAliyun = SystemHWUtil.parse2Boolean(
                preferences.getString("is_aliyun_mq_ONS", "false"));
        System.out.println("push ip:" + ip);

        new Thread(new Runnable() {
            /*
             * Number of connection attempts left. Why retry at all?
             * (1) When started without wifi the first attempt is bound to
             *     fail; retrying lets the service connect if wifi comes up
             *     in the meantime.
             * (2) After a wifi drop and re-enable the client tends to throw
             *     "Broker unavailable" (cause unknown), so retrying is
             *     needed there as well.
             */
            private int tryTime = MAX_CONNECT_ATTEMPTS;

            @Override
            public void run() {
                System.out.println(tryTime + "," + mqttClient + "," + isOnline());
                // A restart forces one reconnect; the flag is cleared after
                // the first success so an established connection is not torn
                // down and rebuilt on every remaining attempt.
                boolean forceReconnect = isRestart;
                while (tryTime > 0
                        && (forceReconnect || !isOnline() || !hasLiveConnection())) {
                    try {
                        ShopUtil.logger2("start push service");
                        ShopUtil.logger2("push server:" + ip);
                        String clientId2 = buildClientId(
                                Constants3.CONSUMER_ID_TV + "@@@", clientId);
                        boolean connected = connectAndSubscribe(
                                String.format(MQTTService.BROKER_URL_FORMAT, ip, port),
                                clientId2, true,
                                "" /* your own access key */,
                                "" /* secret */);
                        ShopUtil.logger2("clientId:" + clientId2);
                        if (!connected) {
                            // Retrying with the same ID cannot succeed, and
                            // nothing was connected: do not report success.
                            ShopUtil.logger2("client id rejected:" + clientId2);
                            break;
                        }
                        ShopUtil.logger2("succeed to connect to activeMQ");
                        setOnline(true);
                        forceReconnect = false;
                    } catch (MqttException e) {
                        setOnline(false);
                        mqttClient = null;
                        e.printStackTrace();
                        ShopUtil.logger2("抛异常:" + e.getMessage());
                        ShopUtil.logger2("ip:" + ip + " ,port:" + port);
                        try {
                            Thread.sleep(RETRY_DELAY_MILLIS);
                        } catch (InterruptedException e1) {
                            // Preserve the interrupt and stop retrying.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    tryTime--;
                }
            }
        }).start();
    }

    /**
     * Disconnects from the broker if a client exists, leaves the foreground
     * state and broadcasts {@code com.dbjtech.waiqin.destroy}.
     */
    @Override
    public void onDestroy() {
        setOnline(false);
        // The retry loop sets the field to null after a failed attempt, so
        // it has to be checked before disconnecting.
        MqttClient client = mqttClient;
        mqttClient = null;
        try {
            ShopUtil.logger2("MQTTService destory");
            if (client != null) {
                client.disconnect(0);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        stopForeground(true);
        Intent intent = new Intent("com.dbjtech.waiqin.destroy");
        sendBroadcast(intent);
    }

    public boolean isOnline() {
        return online;
    }

    public void setOnline(boolean online) {
        this.online = online;
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        // NOTE(review): flags is overwritten with START_STICKY before
        // delegating, and the value returned is whatever the superclass
        // returns - confirm this yields the intended sticky behavior.
        flags = START_STICKY;
        return super.onStartCommand(intent, flags, startId);
    }
}
源代码见GitHub:
https://github.com/whuanghkl/mqtt_client_swing.git
阿里云消息队列MQ 开发者手册见附件
上一篇: Metrics:JVM的实时监控工具
下一篇: Spring事务:声明式事务管理