阿里云物联网平台最完全的使用教程
简介:包括内容如下(详细到每一个细节和步骤,如果还不清楚,可以进入阿里云控制台创建工单,请教阿里的工程师) 使用环境:(使用蜂窝网进行过测试,和WiFi直连并无差别,可以直接使用)
一、阿里云账号说明
1、基本功能说明
进入阿里云官网创建主账号
https://www.aliyun.com/?spm=a2c4g.11186623.amxosvpfn.2.15f5293ewZtPYC
创建完成之后,进入控制台并选择Access Key管理,如图所示
创建子账号,并将物联网平台的所有权限给予子账号,以后我们就用子账号进行各类操作,注意保存得到的三元组,这是接入物联网平台的关键之一
2、开通物联网服务
https://www.aliyun.com/product/iot?spm=5176.10695662.J_3717714080.2.1ce83318Gaytdw
选择开通即可,前两个月赠送的免费消息足够用了,选择进入管理控制台
https://iot.console.aliyun.com/lk/summary
指出来的这几个是需要用到的功能
二、物联网平台的基本使用
1、创建产品,如下图
2、添加设备
三、设备接入物联网平台
1、开发环境设置
https://help.aliyun.com/document_detail/98292.html?spm=a2c4g.11186623.6.683.7d5b1f19UYzxqv
环境win10,pycharm2020,python3.8(Ubuntu16 64-bit和Ubuntu18 arm架构同理)
(1)python3.8的安装和pycharm的安装略过(python3.8需要安装pip下载工具)
(2)环境配置
无需参考官方文档配置虚拟环境,直接用pycharm就好了
直接win+R进入win10命令行控制环境
输入:
pip install aliyun-iot-linkkit
2、连接
https://help.aliyun.com/document_detail/98293.html?spm=a2c4g.11186623.6.684.61c84912ccMTDp
使用一机一密方式进行
from linkkit import linkkit
import sys
#一机一密认证
lk = linkkit.LinkKit(
host_name="cn-shanghai",
product_key="aG*******k",
device_name="Test1",
device_secret="****************")
#连接上物联网平台后的回调,成功连接session_flag和rc返回0
def onconnect(sessionflag, rc, userdata):
print("onconnect:%d,rc:%d:" % (sessionflag, rc))
pass
#断开连接物联网平台后的回调,断开后rc返回1
def on_disconnect(rc, userdata):
print("on_disconnect:rc:%d:" % rc)
#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用
lk.onconnect = onconnect
lk.ondisconnect = ondisconnect
lk.connect_async() # 连接物联网平台
lk.startworkerloop() # 保持连接
print("connected")
注意
1、lk后面的是你创建产品设备对应的三元组|
2、注意所有的回调函数放在连接之前,程序会一直执行,只要出现相应的操作回调函数就会被调用,即只要连接上时,就输出rc=0,只要断开连接时,就输出rc=1 |
3、自定义MQTT通信
(1)创建自定义的Topic(注意:Topic的权限与代码中的函数要一一对应,例如权限为订阅,那么在通信时选择的应该是subscribe回调,可以接收到云端消息,发布同理)
Topic的名字是作为通信的凭证
(2)实现(首先需要连接上阿里云物联网平台,再构造逻辑进行相应操作,https://help.aliyun.com/document_detail/98295.html?spm=a2c4g.11186623.6.685.6d596dc9OWMDE9)
from linkkit import linkkit
import sys
import time
#一机一密认证
lk = linkkit.LinkKit(
host_name="cn-shanghai",
product_key="a*************Gk",
device_name="Test1",
device_secret="****************************")
#连接上物联网平台后的回调,成功连接session_flag和rc返回0
def onconnect(sessionflag, rc, userdata):
print("onconnect:%d,rc:%d:" % (sessionflag, rc))
pass
#断开连接物联网平台后的回调,断开后rc返回1
def on_disconnect(rc, userdata):
print("on_disconnect:rc:%d:" % rc)
#订阅topic回调
def onsubscribetopic(mid, granted_qos, userdata):
print("onsubscribetopic mid:%d, granted_qos:%s" %
(mid, str(','.join('%s' % it for it in granted_qos))))
pass
#取消订阅回调
def onunsubscribetopic(mid, userdata):
print("onunsubscribetopic mid:%d" % mid)
pass
#接收消息回调,调用函数时会一直执行print,效果为:如果调用该方法,发布消息的topic每次发布消息都会被输出到控制台并打印出来
def ontopicmessage(topic, payload, qos, userdata):
print("ontopicmessage:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
pass
#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用
lk.onconnect = onconnect
lk.ondisconnect = ondisconnect
lk.onsubscribetopic = onsubscribetopic # 订阅topic回调
lk.onunsubscribetopic = onunsubscribetopic # 取消订阅topic回调
lk.ontopicmessage = ontopicmessage # 接收topic消息回调
lk.connect_async() # 连接物联网平台
lk.startworkerloop() # 保持连接
print("connected")
#增加while循环的作用:保证物联网平台是连接上之后再进行通信的
while True:
try:
msg = input() # 获取从控制台的输入
except KeyboardInterrupt:
sys.exit()
else:
if msg == "1":
lk.disconnect()
elif msg == "2":
lk.connect_async()
elif msg == "3": # 输入为3时,订阅get这个topic,每个订阅的topic只需订阅一次即可,会在物联网平台的设备topic列表中显示
rc, mid = lk.subscribetopic(lk.tofull_topic("user/get")) # 注意topic只需要写成这样的格式即可,格式需要完全一致,全称会自动补全,不需要输入设备名
if rc == 0: # rc返回值为0时则表示订阅成功
print("subscribe topic success:%r, mid:%r" % (rc, mid))
else:
print("subscribe topic fail:%d" % rc)
elif msg == "4": # 取消订阅
rc, mid = lk.unsubscribetopic(lk.tofull_topic("user/get"))
if rc == 0:
print("unsubscribe topic success:%r, mid:%r" % (rc, mid))
else:
print("unsubscribe topic fail:%d" % rc)
elif msg == "5": # 发布消息“123”给自定义的reciver这个topic
rc, mid = lk.publishtopic(lk.tofull_topic("user/test"), "123")
if rc == 0:
print("publish topic success:%r, mid:%r" % (rc, mid))
else:
print("publish topic fail:%d" % rc)
elif msg == "6": # 同时订阅多个topic
rc, mid = lk.subscribetopic([(lk.tofull_topic("user/sender"), 1),
(lk.tofulltopic("user/get"), 1),
(lk.tofulltopic("user/test"), 1)])
if rc == 0:
print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))
else:
print("subscribe multiple topics fail:%d" % rc)
elif msg == "7": # 同时取消订阅多个topic
rc, mid = lk.unsubscribetopic([lk.tofulltopic("user/get"), lk.tofull_topic("user/test")])
if rc == 0:
print("unsubscribe multiple topics success:%r, mid:%r" % (rc, mid))
else:
print("unsubscribe multiple topics fail:%d" % rc)
elif msg == "8": # RRPC
lk.ontopicmessage = ontopicmessage
elif msg == "11": # 物模型通信,属性上报
prop_data = {
"Test001": "hh",
"memory_usage": 99
}
rc, requestid = lk.thingpostproperty(propdata)
if rc == 0:
print("propertydata post success:%r, requestid:%r" % (rc, request_id))
else:
print("property_data post fail:%d" % rc)
elif msg == "12": # 物模型通信,事件1上报
event_data = {
"Testdata001": 100
}
rc, requestid = lk.thingtriggerevent(("Test001event",event_data))
if rc == 0:
print("eventdata post success:%r, requestid:%r" % (rc, request_id))
else:
print("event_data post fail:%d" % rc)
elif msg == "13": # 物模型通信,事件2上报
event_data = {
"Testdata002": 1
}
rc, requestid = lk.thingtriggerevent(("Test002event",event_data))
if rc == 0:
print("eventdata post success:%r, requestid:%r" % (rc, request_id))
else:
print("event_data post fail:%d" % rc)
elif msg == "98": # 打印topic列表?
ret = lk.dumpusertopics()
print("user topics:%s", str(ret))
elif msg == "99": # 断开连接
lk.destruct()
print("destructed")
else:
sys.exit()
注意点
1、自行查看自定义MQTT通信的代码
2、接收消息回调,当云端发送消息到设备时发生作用,可将接收的数据输出到控制台(回调函数是程序执行过程中一直在执行的代码,只要满足相应的条件就会被运行)
3、注意Topic的格式为:“user/test”(只能是这样的,不需要输入完整的topic名称,sdk会自动补全名称,即如下图部分)
4、设备发送到云端消息查看,如下图:
4、物模型通信
(1)自定义物模型
点编辑草稿进行自定义物模型创建
自定义各项数据
点击生成设备端代码或者物模型TSL查看自定义的物模型的名称等信息(这个是作为物模型通信的凭证)
(2)实现(https://help.aliyun.com/document_detail/98370.html?spm=a2c4g.11186623.6.686.7e1352f7jxsvvk)
完整测试代码
from linkkit import linkkit
import sys
import time
#一机一密认证
lk = linkkit.LinkKit(
host_name="cn-shanghai",
product_key="a************Gk",
device_name="Test1",
device_secret="***************************")
#配置物模型文件
lk.thing_setup("tsl.json")
#物模型可用时回调函数
def onthingenable(self, userdata):
print("onthingenable")
def onthingenable(userdata):
print("onthingenable")
pass
#物模型不可用时回调函数
def onthingdisable(userdata):
print("onthingdisable")
#属性上报回调
def onthingproppost(self, requestid, code, data, message, userdata):
print("onthingproppost request id:%s, code:%d, data:%s message:%s" % (requestid, code, str(data), message))
def onthingproppost(requestid, code, data, message, userdata):
print("onthingprop_post request id:%s, code:%d, data:%s message:%s" %
(request_id, code, str(data), message))
#事件上报回调
def onthingeventpost(self, event, requestid, code, data, message, userdata):
print("onthingevent_post request id:%s, code:%d, data:%s message:%s" % (event, code, str(data), message))
def onthingeventpost(event, requestid, code, data, message, userdata):
print("onthingevent_post event:%s,request id:%s, code:%d, data:%s, message:%s" %
(event, request_id, code, str(data), message))
#RRPC请求回调
def ontopicrrpc_message(self, id, topic, payload, qos, userdata):
print("ontopicrrpc_message: id:%s, topic:%s, payload:%s" % (id, topic, payload))
self.linkkit.thinganswerrrpc(id, payload)
#service请求回调
def onthingcallservice(self, identifier, requestid, params, userdata):
print("onthingcallservice: identifier:%s, requestid:%s, params:%s" % (identifier, request_id, params))
self.linkkit.thinganswerservice(identifier, request_id, 200, {})
#连接上物联网平台后的回调,成功连接session_flag和rc返回0
def onconnect(sessionflag, rc, userdata):
print("onconnect:%d,rc:%d:" % (sessionflag, rc))
pass
#断开连接物联网平台后的回调,断开后rc返回1
def on_disconnect(rc, userdata):
print("on_disconnect:rc:%d:" % rc)
#订阅topic回调
def onsubscribetopic(mid, granted_qos, userdata):
print("onsubscribetopic mid:%d, granted_qos:%s" %
(mid, str(','.join('%s' % it for it in granted_qos))))
pass
#取消订阅回调
def onunsubscribetopic(mid, userdata):
print("onunsubscribetopic mid:%d" % mid)
pass
#接收消息回调,调用函数时会一直执行print,效果为:如果调用该方法,发布消息的topic每次发布消息都会被输出到控制台并打印出来
def ontopicmessage(topic, payload, qos, userdata):
print("ontopicmessage:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
pass
#当出现网络波动时,程序会自动循环调用连接,显示的效果为这两个回调函数会被一直调用
lk.onconnect = onconnect
lk.ondisconnect = ondisconnect
lk.onsubscribetopic = onsubscribetopic # 订阅topic回调
lk.onunsubscribetopic = onunsubscribetopic # 取消订阅topic回调
lk.ontopicmessage = ontopicmessage # 接收topic消息回调
linkkit.ontopicrrpcmessage = ontopicrrpcmessage # 接收RRPC请求回调
linkkit.onthingcallservice = onthingcallservice # 处理同步类型的service
lk.onthingenable = onthingenable # 物模型功能可用时回调
lk.onthingdisable = onthingdisable # 物模型功能不可用时回调
lk.onthingproppost = onthingproppost # 属性上报成功时回调
lk.onthingeventpost = onthingeventpost # 事件上报成功时回调
lk.connect_async() # 连接物联网平台
lk.startworkerloop() # 保持连接
print("connected")
#增加while循环的作用:保证物联网平台是连接上之后再进行通信的
while True:
try:
msg = input() # 获取从控制台的输入
except KeyboardInterrupt:
sys.exit()
else:
if msg == "1":
lk.disconnect()
elif msg == "2":
lk.connect_async()
elif msg == "3": # 输入为3时,订阅get这个topic,每个订阅的topic只需订阅一次即可,会在物联网平台的设备topic列表中显示
rc, mid = lk.subscribetopic(lk.tofull_topic("user/get")) # 注意topic只需要写成这样的格式即可,格式需要完全一致,全称会自动补全,不需要输入设备名
if rc == 0: # rc返回值为0时则表示订阅成功
print("subscribe topic success:%r, mid:%r" % (rc, mid))
else:
print("subscribe topic fail:%d" % rc)
elif msg == "4": # 取消订阅
rc, mid = lk.unsubscribetopic(lk.tofull_topic("user/get"))
if rc == 0:
print("unsubscribe topic success:%r, mid:%r" % (rc, mid))
else:
print("unsubscribe topic fail:%d" % rc)
elif msg == "5": # 发布消息“123”给自定义的reciver这个topic
rc, mid = lk.publishtopic(lk.tofull_topic("user/test"), "123")
if rc == 0:
print("publish topic success:%r, mid:%r" % (rc, mid))
else:
print("publish topic fail:%d" % rc)
elif msg == "6": # 同时订阅多个topic
rc, mid = lk.subscribetopic([(lk.tofull_topic("user/sender"), 1),
(lk.tofulltopic("user/get"), 1),
(lk.tofulltopic("user/test"), 1)])
if rc == 0:
print("subscribe multiple topics success:%r, mid:%r" % (rc, mid))
else:
print("subscribe multiple topics fail:%d" % rc)
elif msg == "7": # 同时取消订阅多个topic
rc, mid = lk.unsubscribetopic([lk.tofulltopic("user/get"), lk.tofull_topic("user/test")])
if rc == 0:
print("unsubscribe multiple topics success:%r, mid:%r" % (rc, mid))
else:
print("unsubscribe multiple topics fail:%d" % rc)
elif msg == "8": # RRPC
lk.ontopicmessage = ontopicmessage
elif msg == "11": # 物模型通信,属性上报
prop_data = {
"Test001": "hh",
"memory_usage": 99
}
rc, requestid = lk.thingpostproperty(propdata)
if rc == 0:
print("propertydata post success:%r, requestid:%r" % (rc, request_id))
else:
print("property_data post fail:%d" % rc)
elif msg == "12": # 物模型通信,事件1上报
event_data = {
"Testdata001": 100
}
rc, requestid = lk.thingtriggerevent(("Test001event",event_data))
if rc == 0:
print("eventdata post success:%r, requestid:%r" % (rc, request_id))
else:
print("event_data post fail:%d" % rc)
elif msg == "13": # 物模型通信,事件2上报
event_data = {
"Testdata002": 1
}
rc, requestid = lk.thingtriggerevent(("Test002event",event_data))
if rc == 0:
print("eventdata post success:%r, requestid:%r" % (rc, request_id))
else:
print("event_data post fail:%d" % rc)
elif msg == "98": # 打印topic列表?
ret = lk.dumpusertopics()
print("user topics:%s", str(ret))
elif msg == "99": # 断开连接
lk.destruct()
print("destructed")
else:
sys.exit()
RRPC代码
!/usr/bin/env python
coding=utf-8
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
client = AcsClient('LTAI************XR3', 'Dh******9fw', 'cn-shanghai')
request = CommonRequest()
request.setacceptformat('json')
request.set_domain('iot.cn-shanghai.aliyuncs.com')
request.set_method('POST')
request.setprotocoltype('https') # https | http
request.set_version('2018-01-20')
request.setactionname('RRpc')
request.addqueryparam('RegionId', "cn-shanghai")
request.addqueryparam('DeviceName', "Test1")
request.addqueryparam('Timeout', "5000")
request.addqueryparam('RequestBase64Byte', "dG*********Gxl")
request.addqueryparam('ProductKey', "a1V************Gk")
response = client.do_action(request)
python2: print(response)
print(str(response, encoding = 'utf-8'))
注意点
1、首先注意要配置物模型文件,即lk.thing_setup(“tsl.json”)
2、注意属性、事件和服务的上报方式各不相同
3、查看上报数据,如下图
4、RRPC并未使用成功,可忽略,也欢迎完善之后发回来,在此感谢(内部参数自己填写)
四、服务端开发
1、AMQP(https://help.aliyun.com/document_detail/142489.html?spm=a2c4g.11186623.6.623.3e36354e3xozA7)
(1)环境设置(Qpid Proton 0.29.0直接下载地址:下载网站)
1)Ubuntu18,Python2.7,Qpid Proton 0.29.0(当前测试环境,证实可用)
2)其他Linux系统下的python2.7和win10下的C++
3)测试proton是否可用
import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')
(2)实现(https://help.aliyun.com/document_detail/143597.html?spm=a2c4g.11186623.6.626.3800719ftRcJ40)
# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64
reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest())
class AmqpClient(MessagingHandler):
def __init__(self):
super(AmqpClient, self).__init__()
def on_start(self, event):
# 接入域名,请参见AMQP客户端接入说明文档。
url = "amqps://18************019.iot-amqp.cn-shanghai.aliyuncs.com:5671"
accessKey = "LTA*****************XR3"
accessSecret = "Dhc*************Q19fw"
consumerGroupId = "xoZ***********0100"
# iotInstanceId:购买的实例请填写实例ID,公共实例请填空字符串""。
iotInstanceId = ""
clientId = "test1"
# 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
signMethod = "hmacsha1"
timestamp = current_time_millis()
# userName组装方法,请参见AMQP客户端接入说明文档。
userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
self.receiver = event.container.create_receiver(conn)
# 当连接成功建立被调用。
def on_connection_opened(self, event):
logger.info("Connection established, remoteUrl: %s", event.connection.hostname)
# 当连接关闭时被调用。
def on_connection_closed(self, event):
logger.info("Connection closed: %s", self)
# 当远端因错误而关闭连接时被调用。
def on_connection_error(self, event):
logger.info("Connection error")
# 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。
def on_transport_error(self, event):
if event.transport.condition:
if event.transport.condition.info:
logger.error("%s: %s: %s" % (
event.transport.condition.name, event.transport.condition.description,
event.transport.condition.info))
else:
logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
else:
logging.error("Unspecified transport error")
# 当收到消息时被调用。
def on_message(self, event):
message = event.message
content = message.body.decode('utf-8')
topic = message.properties.get("topic")
message_id = message.properties.get("messageId")
print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
event.receiver.flow(1)
Container(AmqpClient()).run()
注意
1、文中格式一定需要保持一致,accessKey、accessSecret注意要保证该RAM账号具有物联网平台权限
url:
consumerGroupId:
2、MNS(https://help.aliyun.com/document_detail/32305.html?spm=a2c4g.11186623.6.624.20b33dc31he3Dc,https://help.aliyun.com/document_detail/68948.html?spm=a2c4g.11186623.6.629.7c8b5608a7Kq8s)
(收费较高不考虑)
下载对应SDK后就在根目录进行调试
若是能够调试出接收消息的代码,请发给我,感谢
#!/usr/bin/env python
#coding=utf8
# Copyright (C) 2015, Alibaba Cloud Computing
#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
import time
from mns.account import Account
from mns.queue import *
from mns.topic import *
from mns.subscription import *
try:
import configparser as ConfigParser
except ImportError:
import ConfigParser as ConfigParser
cfgFN = "sample.cfg"
required_ops = [("Base", "AccessKeyId"), ("Base", "AccessKeySecret"), ("Base", "Endpoint")]
optional_ops = [("Optional", "SecurityToken")]
parser = ConfigParser.ConfigParser()
parser.read(cfgFN)
for sec,op in required_ops:
if not parser.has_option(sec, op):
sys.stderr.write("ERROR: need (%s, %s) in %s.\n" % (sec,op,cfgFN))
sys.stderr.write("Read README to get help inforamtion.\n")
sys.exit(1)
#获取配置信息
## AccessKeyId 阿里云官网获取
## AccessKeySecret 阿里云官网获取
## Endpoint 阿里云消息和通知服务官网获取, Example: http://$AccountId.mns.cn-hangzhou.aliyuncs.com
## WARNING: Please do not hard code your accessId and accesskey in next line.(more information: https://yq.aliyun.com/articles/55947)
accessKeyId = parser.get("Base", "AccessKeyId")
accessKeySecret = parser.get("Base", "AccessKeySecret")
endpoint = parser.get("Base", "Endpoint")
securityToken = ""
if parser.has_option("Optional", "SecurityToken") and parser.get("Optional", "SecurityToken") != "$SecurityToken":
securityToken = parser.get("Optional", "SecurityToken")
#初始化my_account
my_account = Account(endpoint, accessKeyId, accessKeySecret, securityToken)
##############Queue 相关操作#####################
my_queue = my_account.get_queue("MyQueue-%s" % time.strftime("%y%m%d-%H%M%S", time.localtime()))
#创建队列
## message被receive后,持续不可消费的时间 100秒
## message body的最大长度 10240Byte
## message最长存活时间 3600秒
## 新message可消费的默认延迟时间 10秒
## receive message时,长轮询时间 20秒
queue_meta = QueueMeta()
queue_meta.set_visibilitytimeout(100)
queue_meta.set_maximum_message_size(10240)
queue_meta.set_message_retention_period(3600)
queue_meta.set_delay_seconds(10)
queue_meta.set_polling_wait_seconds(20)
queue_meta.set_logging_enabled(True)
try:
queue_url = my_queue.create(queue_meta)
sys.stdout.write("Create Queue Succeed!\nQueueURL:%s\n\n" % queue_url)
except MNSExceptionBase as e:
sys.stderr.write("Create Queue Fail!\nException:%s\n\n" % e)
sys.exit(1)
#修改队列属性
## message被receive后,持续不可消费的时间 50秒
## message body的最大长度 5120Byte
## message最长存活时间 1800秒
## 新message可消费的默认延迟时间 5秒
## receive message时,长轮询时间 10秒
queue_meta = QueueMeta()
queue_meta.set_visibilitytimeout(50)
queue_meta.set_maximum_message_size(5120)
queue_meta.set_message_retention_period(1800)
queue_meta.set_delay_seconds(5)
queue_meta.set_polling_wait_seconds(10)
try:
queue_url = my_queue.set_attributes(queue_meta)
sys.stdout.write("Set Queue Attributes Succeed!\n\n")
except MNSExceptionBase as e:
sys.stderr.write("Set Queue Attributes Fail!\nException:%s\n\n" % e)
sys.exit(1)
#获取队列属性
## 除可设置属性外,返回如下属性:
## ActiveMessages: 可消费消息数,近似值
## InactiveMessages: 正在被消费的消息数,近似值
## DelayMessages: 延迟消息数,近似值
## CreateTime: queue创建时间,单位:秒
## LastModifyTime: 修改queue属性的最近时间,单位:秒
try:
queue_meta = my_queue.get_attributes()
sys.stdout.write("Get Queue Attributes Succeed! \
\nQueueName: %s\nVisibilityTimeout: %s \
\nMaximumMessageSize: %s\nDelaySeconds: %s \
\nPollingWaitSeconds: %s\nActiveMessages: %s \
\nInactiveMessages: %s\nDelayMessages: %s \
\nCreateTime: %s\nLastModifyTime: %s\n\n" %
(queue_meta.queue_name, queue_meta.visibility_timeout,
queue_meta.maximum_message_size, queue_meta.delay_seconds,
queue_meta.polling_wait_seconds, queue_meta.active_messages,
queue_meta.inactive_messages, queue_meta.delay_messages,
queue_meta.create_time, queue_meta.last_modify_time))
except MNSExceptionBase as e:
sys.stderr.write("Get Queue Attributes Fail!\nException:%s\n\n" % e)
sys.exit(1)
#列出所有队列
## prefix 指定queue name前缀
## ret_number 单次list_queue最大返回队列个数
## marker list_queue的开始位置; 当一次list queue不能列出所有队列时,返回的next_marker作为下一次list queue的marker参数
try:
prefix = u""
ret_number = 10
marker = u""
total_qcount = 0
while(True):
queue_url_list, next_marker = my_account.list_queue(prefix, ret_number, marker)
total_qcount += len(queue_url_list)
for queue_url in queue_url_list:
sys.stdout.write("QueueURL:%s\n" % queue_url)
if(next_marker == ""):
break
marker = next_marker
sys.stdout.write("List Queue Succeed! Total Queue Count:%s!\n\n" % total_qcount)
except MNSExceptionBase as e:
sys.stderr.write("List Queue Fail!\nException:%s\n\n" % e)
sys.exit(1)
#发送消息
## set_delayseconds 设置消息的延迟时间,单位:秒
## set_priority 设置消息的优先级
## 返回如下属性:
## MessageId 消息编号
## MessageBodyMd5 消息正文的MD5值
msg_body = "I am test Message."
message = Message(msg_body)
message.set_delayseconds(0)
message.set_priority(10)
try:
send_msg = my_queue.send_message(message)
sys.stdout.write("Send Message Succeed.\nMessageBody:%s\nMessageId:%s\nMessageBodyMd5:%s\n\n" % (msg_body, send_msg.message_id, send_msg.message_body_md5))
except MNSExceptionBase as e:
sys.stderr.write("Send Message Fail!\nException:%s\n\n" % e)
sys.exit(1)
推荐阅读
-
阿里云物联网平台最完全的使用教程
-
【Python】使用Python模拟设备接入阿里云物联网的MQTT服务器
-
阿里云物联网平台边缘计算(LinkIotEdge)实例使用体验
-
《ServerSuperIO Designer IDE使用教程》- 6.增加与阿里云物联网(IOT)对接服务,实现数据交互。发布:v4.2.4 版本
-
支付宝小程序使用MQTT over WebSocket连接阿里云IoT物联网平台
-
《ServerSuperIO Designer IDE使用教程》- 6.增加与阿里云物联网(IOT)对接服务,实现数据交互。发布:v4.2.4 版本
-
【Python】使用Python模拟设备接入阿里云物联网的MQTT服务器
-
支付宝小程序使用MQTT over WebSocket连接阿里云IoT物联网平台