阿里云 IOT 使用【随笔】
Aliyun IOT 使用
服务端订阅
官网:https://help.aliyun.com/document_detail/142376.html?spm=a2c4g.11186623.6.622.46b92cf0vmZSwV
准备工作
一、首先开通并进入阿里云物联网平台,创建一个公共实例/企业版实例【公共实例可用于测试,生产最好用企业版实例】
二、点击进入实例,创建产品,参数根据实际情况输入。
三、产品创建完成后,添加设备
四、若想测试发布订阅消息,还需要添加自定义的 topic 主题【当然也可以用阿里官方提供的 topic】,并创建服务端订阅
服务端订阅编码实现【Java】
工程下导入 pom 依赖
<!-- Aliyun core SDK -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.6</version>
</dependency>
<!-- Aliyun IoT SDK -->
<!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-iot -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-iot</artifactId>
<version>7.16.0</version>
</dependency>
<!-- IoT message client, used to listen for messages from the Aliyun platform -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>iot-client-message</artifactId>
<version>1.1.5</version>
</dependency>
<!-- AMQP 1.0 client (Apache Qpid JMS) -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.47.0</version>
</dependency>
<!-- Base64 utility (commons-codec) -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
在项目的 resources 目录(即 classpath 根路径)下创建一个
config.properties
配置文件,具体参数如下:
# 阿里云 AccessKey ID
iot.accessKeyID=*************
# 阿里云 AccessKey Secret
iot.accessKeySecret=*************
iot.uid=**********
# 要访问的iot的regionId 目前支持的 cn-shanghai(华东2)、ap-southeast-1(新加坡) 、us-west-1(美西)
iot.regionId=cn-shenzhen
#iot套件对应的产品code 保持不变即可
iot.productCode=Iot
#Iot api的服务地址,需与regionId对应;注意 java.util.Properties 本身不会替换 ${...} 占位符
iot.domain=iot.${iot.regionId}.aliyuncs.com
#Iot api 的版本
iot.version=2020-01-20
# 消费组ID
iot.consumerGroupId=DEFAULT_GROUP
# 企业版iot实例ID,公共版没有
iot.iotInstanceId=*************
# 签名方法:支持hmacmd5、hmacsha1和hmacsha256。
iot.signMethod=hmacsha1
# iot公网终端节点url,企业版
iot.amqp.connectionUrl=*************
参数介绍:
【accessKeyID、accessKeySecret、uid】为阿里云用户信息;建议自己创建一个只具有 IOT 权限的用户。步骤:创建用户 → 设置权限 → 创建 AccessKey(得到 accessKeyID 与 accessKeySecret)
【regionId、productCode、domain、version】regionId 为区域ID,productCode 固定为 Iot(与上面配置文件中的取值一致),domain 为区域对应的访问地址,version 为 API 版本号;这些参数都很好获取就不做介绍了。
【consumerGroupId、iotInstanceId、signMethod、connectionUrl】consumerGroupId 为消费组ID,在上面创建订阅时右边显示的就是;iotInstanceId 为企业版实例的ID,公共实例不需要;signMethod 为签名算法(不是加密方式),取配置文件注释中列出的任意一种即可,示例使用 hmacsha1;connectionUrl 为企业版实例 AMQP 接入方式的 url
创建 IOT 连接工具类
import com.akieay.cloudprint.common.util.aliyun.iot.util.LogUtil;
import com.aliyun.openservices.iot.api.Profile;
import com.aliyun.openservices.iot.api.message.MessageClientFactory;
import com.aliyun.openservices.iot.api.message.api.MessageClient;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Properties;
/**
 * Holds the Aliyun IoT settings read from {@code /config.properties} on the classpath and
 * builds the clients/connections that use them: an AMQP (JMS) connection, an HTTP/2
 * {@link MessageClient}, and an OpenAPI {@link DefaultAcsClient}.
 *
 * <p>All state is static and is populated once by the static initializer. The setters only
 * replace the single value they name; they do not re-derive other values.
 *
 * @author akieay
 */
public class IotConnectionUtil {

    /** Classpath location of the settings file. */
    private static final String CONFIG_RESOURCE = "/config.properties";

    /**
     * JNDI name used to look up the connection factory. The caller must register the factory
     * under the matching {@code connectionfactory.SBCF} key in the environment it passes in.
     */
    private static final String CONNECTION_NAME = "SBCF";

    /** Placeholder that the sample configuration uses inside {@code iot.domain}. */
    private static final String REGION_PLACEHOLDER = "${iot.regionId}";

    /** Aliyun account credentials and IoT settings, loaded from the properties file. */
    private static String accessKeyID;
    private static String accessKeySecret;
    private static String uid;
    private static String regionId;
    private static String product;
    private static String domain;
    private static String version;
    private static String consumerGroupId;
    private static String iotInstanceId;
    private static String signMethod;
    private static String amqpConnectionUrl;

    static {
        Properties prop = new Properties();
        // Resolve the resource through this class rather than Object.class: Object lives in
        // the platform module/bootstrap loader, which does not see application resources on
        // modular JDKs. try-with-resources also closes the stream, which was leaked before.
        try (java.io.InputStream in = IotConnectionUtil.class.getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                // Previously this surfaced as an anonymous NullPointerException from load(null).
                throw new IllegalStateException("classpath 下未找到配置文件 " + CONFIG_RESOURCE);
            }
            prop.load(in);
        } catch (IOException e) {
            // Best effort, as before: report the failure and leave the settings unset.
            LogUtil.print("加载配置文件 " + CONFIG_RESOURCE + " 失败!exception:" + e.getMessage());
        }
        accessKeyID = prop.getProperty("iot.accessKeyID");
        accessKeySecret = prop.getProperty("iot.accessKeySecret");
        uid = prop.getProperty("iot.uid");
        regionId = prop.getProperty("iot.regionId");
        product = prop.getProperty("iot.productCode");
        // java.util.Properties performs no ${...} substitution, so expand the region
        // placeholder here; values without the placeholder pass through unchanged.
        domain = resolveRegionPlaceholder(prop.getProperty("iot.domain"), regionId);
        version = prop.getProperty("iot.version");
        consumerGroupId = prop.getProperty("iot.consumerGroupId");
        iotInstanceId = prop.getProperty("iot.iotInstanceId");
        signMethod = prop.getProperty("iot.signMethod");
        amqpConnectionUrl = prop.getProperty("iot.amqp.connectionUrl");
    }

    /**
     * Replaces the {@code ${iot.regionId}} placeholder in a configured value.
     *
     * @param value  the raw property value, may be {@code null}
     * @param region the region id to substitute, may be {@code null}
     * @return {@code value} with the placeholder expanded, or {@code value} unchanged when
     *         either argument is {@code null}
     */
    private static String resolveRegionPlaceholder(String value, String region) {
        if (value == null || region == null) {
            return value;
        }
        return value.replace(REGION_PLACEHOLDER, region);
    }

    /**
     * Builds the qpid failover URL for the AMQP endpoint of a public instance, derived from
     * {@code uid} and {@code regionId}.
     *
     * <p>For an enterprise instance the host part is the configured
     * {@code iot.amqp.connectionUrl} instead, i.e.
     * {@code "failover:(amqps://" + amqpConnectionUrl + ":5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30"}.
     *
     * @return the connection URL to register as the JNDI connection factory
     */
    public static String getAmqpConnectionUrl() {
        return "failover:(amqps://" + uid + ".iot-amqp." + regionId
                + ".aliyuncs.com:5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";
    }

    /**
     * Creates a JMS connection authenticated with an access-key signature.
     *
     * <p>Despite its name, the user name built here omits {@code iotInstanceId}, which is the
     * form used for a public instance; an enterprise instance additionally needs
     * {@code ",iotInstanceId=" + iotInstanceId} in the user name.
     *
     * @param hashtable JNDI environment; must define the {@code connectionfactory.SBCF} entry
     *                  and the initial context factory
     * @return the created connection, or {@code null} when anything fails (the failure is
     *         logged, not rethrown), so callers must check for {@code null}
     * @throws Exception declared for compatibility; failures are currently reported via the
     *                   {@code null} return value
     */
    public static Connection getConnectionByEnterpriseInstance(Hashtable<String, String> hashtable) throws Exception {
        Connection connection = null;
        try {
            long timeStamp = System.currentTimeMillis();
            String userName = uid + "|authMode=aksign"
                    + ",signMethod=" + signMethod
                    + ",timestamp=" + timeStamp
                    + ",authId=" + accessKeyID
                    + ",consumerGroupId=" + consumerGroupId
                    + "|";
            // The separator must be "&timestamp=". The previous literal "×tamp=" was an
            // HTML-entity corruption of it and produced a signature over the wrong content.
            String signContent = "authId=" + accessKeyID + "&timestamp=" + timeStamp;
            String password = doSign(signContent, accessKeySecret, signMethod);
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory) context.lookup(CONNECTION_NAME);
            connection = cf.createConnection(userName, password);
        } catch (Exception e) {
            LogUtil.print("初始化AMQP连接失败!exception:" + e.getMessage());
        }
        return connection;
    }

    /**
     * Creates an HTTP/2 message client for a public instance, using the endpoint derived from
     * {@code uid} and {@code regionId}.
     *
     * @return the message client, or {@code null} when construction fails (the failure is logged)
     */
    public static MessageClient getMessageClientByPublicInstance() {
        MessageClient messageClient = null;
        try {
            String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com";
            // Connection settings.
            Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKeyID, accessKeySecret);
            // Build the client.
            messageClient = MessageClientFactory.messageClient(profile);
        } catch (Exception e) {
            LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());
        }
        return messageClient;
    }

    /**
     * Computes the Base64-encoded HMAC of {@code toSignString}; the result is used as the
     * AMQP password.
     *
     * @param toSignString content to sign
     * @param secret       HMAC key (the access key secret)
     * @param signMethod   JCA MAC algorithm name, e.g. {@code hmacsha1}
     * @return the Base64 text of the raw MAC
     * @throws Exception if the algorithm is unknown or the key is rejected
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        // Fix the charset so the signature does not depend on the platform default encoding.
        java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(utf8), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes(utf8));
        return Base64.encodeBase64String(rawHmac);
    }

    /**
     * Creates the OpenAPI client for the configured region and registers the configured
     * endpoint for the product.
     *
     * @return the client, or {@code null} when construction fails (the failure is logged)
     */
    public static DefaultAcsClient getClient() {
        DefaultAcsClient client = null;
        try {
            IClientProfile profile = DefaultProfile.getProfile(regionId, accessKeyID, accessKeySecret);
            DefaultProfile.addEndpoint(regionId, product, domain);
            // Initialize the client.
            client = new DefaultAcsClient(profile);
        } catch (Exception e) {
            LogUtil.print("初始化client失败!exception:" + e.getMessage());
        }
        return client;
    }

    /** @return the configured region id */
    public static String getRegionId() {
        return regionId;
    }

    /** @param regionId the region id to use from now on; {@code domain} is not recomputed */
    public static void setRegionId(String regionId) {
        IotConnectionUtil.regionId = regionId;
    }

    /** @return the OpenAPI endpoint domain */
    public static String getDomain() {
        return domain;
    }

    /** @param domain the OpenAPI endpoint domain to use from now on */
    public static void setDomain(String domain) {
        IotConnectionUtil.domain = domain;
    }

    /** @return the configured API version */
    public static String getVersion() {
        return version;
    }

    /** @param version the API version to use from now on */
    public static void setVersion(String version) {
        IotConnectionUtil.version = version;
    }
}
创建服务端订阅业务类
import com.akieay.cloudprint.common.util.aliyun.iot.connection.IotConnectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author akieay
* @Date: 2020/11/18 11:09
*/
@Slf4j
public class ServerSideSubscription {
/**
* 业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
*/
private final static ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50000));
public static void main(String[] args) throws Exception {
Hashtable<String, String> hashtable = new Hashtable<>();
String connectionUrl = IotConnectionUtil.getAmqpConnectionUrl();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Connection connection = IotConnectionUtil.getConnectionByEnterpriseInstance(hashtable);
Context context = new InitialContext(hashtable);
Destination queue = (Destination)context.lookup("QUEUE");
((JmsConnection) connection).addConnectionListener(jmsConnectionListener);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
/**
* 消息监听器
*/
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
log.error("submit task occurs exception ", e);
}
}
};
/**
* 在这里处理您收到消息后的具体业务逻辑。
*/
private static void processMessage(Message message) {
try {
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
byte[] body = message.getBody(byte[].class);
String content = null;
if (null != body) {
content = new String(body);
}
System.out.println("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
log.error("processMessage occurs error ", e);
}
}
/**
* 连接状态监听器
*/
private static JmsConnectionListener jmsConnectionListener = new JmsConnectionListener() {
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
log.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error) {
log.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
log.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
}
设备端接入
以下介绍的设备端接入为 java 版,主要用于配合服务端订阅的调试
官网:https://help.aliyun.com/document_detail/97331.html?spm=a2c4g.11186623.6.675.7636277c2CMzcp
设备端接入编码实现
导入 pom 依赖
<!-- Aliyun device-side client (LinkKit Java SDK) -->
<dependency>
<groupId>com.aliyun.alink.linksdk</groupId>
<artifactId>iot-linkkit-java</artifactId>
<version>1.2.0.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.1</version>
<scope>compile</scope>
</dependency>
<!-- NOTE(review): fastjson 1.2.25 is an old release; check the published fastjson security advisories and confirm whether a newer version can be used -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.25</version>
<scope>compile</scope>
</dependency>
客户端接入业务类
import com.akieay.cloudprint.common.util.aliyun.iot.constant.TopicConstant;
import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;
import lombok.extern.slf4j.Slf4j;
/**
* @author akieay
* @Date: 2020/11/18 16:59
*/
@Slf4j
public class ClientSideSubscription {
public static void main(String[] args) {
/**
* 主要注意参数 channelHost,公共实例格式为:{productKey}+".iot-as-mqtt."+{regionId}+".aliyuncs.com:1883"
* 企业版实例格式为:实现详情下的公网终端节点(Endpoint)中的 AMQP 的路径 + ":1883"
*/
new ClientSideSubscription().init("产品Key", "设备deviceName",
"设备deviceSecret", "你的channelHost");
}
public void init(String productKey, String deviceName, String deviceSecret, String channelHost){
LinkKitInitParams params = new LinkKitInitParams();
/**
* 设置 Mqtt 初始化参数
*/
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = productKey;
config.deviceName = deviceName;
config.deviceSecret = deviceSecret;
config.channelHost = channelHost;
/**
* 是否接受离线消息
* 对应 mqtt 的 cleanSession 字段
*/
config.receiveOfflineMsg = false;
params.mqttClientConfig = config;
/**
* 设置初始化三元组信息,用户传入
*/
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = productKey;
deviceInfo.deviceName = deviceName;
deviceInfo.deviceSecret = deviceSecret;
params.deviceInfo = deviceInfo;
//初始化连接
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
@Override
public void onError(AError aError) {
log.info("Mqtt connect fail");
}
@Override
public void onInitDone(InitResult initResult) {
log.info("Mqtt connect success");
// 订阅
MqttSubscribeRequest request = new MqttSubscribeRequest();
// topic 用户根据实际场景填写
request.topic = "/" + productKey + "/" + deviceName + TopicConstant.TEST_TOPIC;
request.isSubscribe = true;
LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
// 订阅成功
log.info("subscribe topic " + request.topic + " success");
// 注册下行监听,包括长连接的状态和云端下行的数据
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 云端下行数据回调
// connectId 连接类型 topic 下行 topic; aMessage 下行数据
String data = new String((byte[]) aMessage.data);
log.info("topic: " + topic + " \t data: " + data);
// pushData 示例 {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}
// method 服务类型; params 下推数据内容
}
@Override
public boolean shouldHandle(String connectId, String topic) {
// 选择是否不处理某个 topic 的下行数据
// 如果不处理某个topic,则onNotify不会收到对应topic的下行数据
return true; //TODO 根基实际情况设置
}
@Override
public void onConnectStateChange(String connectId, ConnectState connectState) {
log.info(connectId, connectState);
// 对应连接类型的连接状态变化回调,具体连接状态参考 SDK ConnectState
}
});
}
@Override
public void onFailure(AError aError) {
// 订阅失败
log.info("onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
}
});
}
});
}
}
下一篇: 2020-11-12