欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

阿里云 IOT 使用【随笔】

程序员文章站 2022-05-15 14:14:02
...

Aliyun IOT 使用

服务端订阅

官网:https://help.aliyun.com/document_detail/142376.html?spm=a2c4g.11186623.6.622.46b92cf0vmZSwV

准备工作

一、首先开通并进入阿里云物联网平台,创建一个公共实例/企业版实例【公共实例可用于测试,生产最好用企业版实例】

阿里云 IOT 使用【随笔】

二、点击进入实例,创建产品,参数根据实际情况输入。

阿里云 IOT 使用【随笔】

三、产品创建完成后,添加设备

阿里云 IOT 使用【随笔】

四、若想测试发布订阅消息,还需要添加自定义的 topic 主题【当然也可以用阿里官方提供端】以及 创建服务端订阅

阿里云 IOT 使用【随笔】

阿里云 IOT 使用【随笔】

服务端订阅编码实现【Java】

工程下导入 pom 依赖

  <!--aliyun core-->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.6</version>
</dependency>
<!--aliyun Iot-->
<!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-iot -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-iot</artifactId>
    <version>7.16.0</version>
</dependency>
<!-- IOT用于监听阿里平台消息 -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>iot-client-message</artifactId>
    <version>1.1.5</version>
</dependency>
<!-- amqp 1.0 qpid client -->
<dependency>
    <groupId>org.apache.qpid</groupId>
    <artifactId>qpid-jms-client</artifactId>
    <version>0.47.0</version>
</dependency>
<!-- util for base64-->
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.10</version>
</dependency>

在项目目录下创建一个 config.properties 配置文件,具体参数如下:

## 阿里云**ID
iot.accessKeyID=*************
# 阿里云**
iot.accessKeySecret=*************
iot.uid=**********
# 要访问的iot的regionId 目前支持的 cn-shanghai(华东2)、ap-southeast-1(新加坡) 、us-west-1(美西)
iot.regionId=cn-shenzhen
#iot套件对应的产品code 保持不变即可
iot.productCode=Iot
#Iot api的服务地址 跟regionId对应 这是华东2的
iot.domain=iot.${iot.regionId}.aliyuncs.com
#Iot api 的版本
iot.version=2020-01-20
# 消费组ID
iot.consumerGroupId=DEFAULT_GROUP
# 企业版iot实例ID,公共版没有
iot.iotInstanceId=*************
# 签名方法:支持hmacmd5、hmacsha1和hmacsha256。
iot.signMethod=hmacsha1
# iot公网终端节点url,企业版
iot.amqp.connectionUrl=*************

参数介绍:

【accessKeyID 、accessKeySecret 、uid】 为阿里云用户信息;建议自己创建一个只具有 IOT 权限的用户步骤:创建用户,设置权限,创建 accessKeySecret

阿里云 IOT 使用【随笔】

阿里云 IOT 使用【随笔】

阿里云 IOT 使用【随笔】

【regionId、productCode、domain、version】regionId 为区域ID,productCode 固定为 iot,domain 为区域对应的访问地址,version 为版本号;这些参数都很好获取就不做介绍了。

【consumerGroupId、iotInstanceId、signMethod,connectionUrl】consumerGroupId 为分组id,在上面创建订阅时右边的就是;iotInstanceId:为企业版实例的ID,公共实例不需要;signMethod 为加密方式,固定为上面给出的即可;connectionUrl:为企业版实例 AMQP 接入方式的 url

阿里云 IOT 使用【随笔】

阿里云 IOT 使用【随笔】

阿里云 IOT 使用【随笔】

创建 IOT 连接工具类

import com.akieay.cloudprint.common.util.aliyun.iot.util.LogUtil;
import com.aliyun.openservices.iot.api.Profile;
import com.aliyun.openservices.iot.api.message.MessageClientFactory;
import com.aliyun.openservices.iot.api.message.api.MessageClient;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import org.apache.commons.codec.binary.Base64;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Properties;

/**
 * iot相关配置信息,以及client的生产
 * @author akieay
 */
public class IotConnectionUtil {
    /**
     * 阿里云
     */
	private static String accessKeyID;
	private static String accessKeySecret;
	private static String uid;
	private static String regionId;
	private static String product;
    private static String domain;
    private static String version;

    private static String consumerGroupId;
    private static String iotInstanceId;
    private static String signMethod;
    private static String amqpConnectionUrl;
    private static String CONNECTION_NAME= "SBCF";

    static {
        Properties prop = new Properties();
        try {
            prop.load(Object.class.getResourceAsStream("/config.properties"));
            accessKeyID = prop.getProperty("iot.accessKeyID");
            accessKeySecret = prop.getProperty("iot.accessKeySecret");
            uid = prop.getProperty("iot.uid");
            regionId = prop.getProperty("iot.regionId");
            product = prop.getProperty("iot.productCode");
            domain = prop.getProperty("iot.domain");
            version = prop.getProperty("iot.version");

            consumerGroupId = prop.getProperty("iot.consumerGroupId");
            iotInstanceId = prop.getProperty("iot.iotInstanceId");
            signMethod = prop.getProperty("iot.signMethod");
            amqpConnectionUrl = prop.getProperty("iot.amqp.connectionUrl");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getAmqpConnectionUrl() {
        //企业版实例
//       return "failover:(amqps://"+amqpConnectionUrl+":5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";
        //公共实例
        return "failover:(amqps://"+uid+".iot-amqp."+regionId+".aliyuncs.com:5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";
    }

    public static Connection getConnectionByEnterpriseInstance( Hashtable<String, String> hashtable ) throws Exception {
        Connection connection = null;
        try {
            long timeStamp = System.currentTimeMillis();
            String userName = uid + "|authMode=aksign"
                    + ",signMethod=" + signMethod
                    + ",timestamp=" + timeStamp
                    + ",authId=" + accessKeyID
//                   //公共版不需要该参数,企业版必须填写该参数
//                    + ",iotInstanceId=" + iotInstanceId
                    + ",consumerGroupId=" + consumerGroupId
                    + "|";
            String signContent = "authId=" + accessKeyID + "&timestamp=" + timeStamp;
            String password = doSign(signContent,accessKeySecret, signMethod);

            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup(CONNECTION_NAME);
            connection = cf.createConnection(userName, password);
        } catch (Exception e) {
            LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());
        }
        return connection;
    }

    public static MessageClient getMessageClientByPublicInstance() {
        MessageClient messageClient = null;
        try {
            String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com";

            // 连接配置
            Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKeyID, accessKeySecret);
            // 构造客户端
            messageClient = MessageClientFactory.messageClient(profile);
        } catch (Exception e) {
            LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());
        }

        return messageClient;
    }

    /**
     * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
    }

    public static DefaultAcsClient getClient() {
		DefaultAcsClient client = null;
		try {
			IClientProfile profile = DefaultProfile.getProfile(regionId, accessKeyID, accessKeySecret);
			DefaultProfile.addEndpoint(regionId, product, domain);
			// 初始化client
			client = new DefaultAcsClient(profile);

		} catch (Exception e) {
			LogUtil.print("初始化client失败!exception:" + e.getMessage());
		}

		return client;
	}


    public static String getRegionId() {
        return regionId;
    }

    public static void setRegionId(String regionId) {
        IotConnectionUtil.regionId = regionId;
    }

    public static String getDomain() {
        return domain;
    }

    public static void setDomain(String domain) {
        IotConnectionUtil.domain = domain;
    }

    public static String getVersion() {
        return version;
    }

    public static void setVersion(String version) {
        IotConnectionUtil.version = version;
    }
}

创建服务端订阅业务类

import com.akieay.cloudprint.common.util.aliyun.iot.connection.IotConnectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author akieay
 * @Date: 2020/11/18 11:09
 */
@Slf4j
public class ServerSideSubscription {
    /**
     * 业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
     */
    private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));

    public static void main(String[] args) throws Exception {
        Hashtable<String, String> hashtable = new Hashtable<>();
        String connectionUrl = IotConnectionUtil.getAmqpConnectionUrl();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");

        Connection connection = IotConnectionUtil.getConnectionByEnterpriseInstance(hashtable);
        Context context = new InitialContext(hashtable);
        Destination queue = (Destination)context.lookup("QUEUE");

        ((JmsConnection) connection).addConnectionListener(jmsConnectionListener);
        // 创建会话。
        // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
        // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // 创建Receiver连接。
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    /**
     * 消息监听器
     */
    private static MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                //1.收到消息之后一定要ACK。
                // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
                // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
                // message.acknowledge();
                //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
                // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
                executorService.submit(() -> processMessage(message));
            } catch (Exception e) {
                log.error("submit task occurs exception ", e);
            }
        }
    };

    /**
     * 在这里处理您收到消息后的具体业务逻辑。
     */
    private static void processMessage(Message message) {
        try {
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            byte[] body = message.getBody(byte[].class);
            String content = null;
            if (null != body) {
                content = new String(body);
            }
            System.out.println("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
        } catch (Exception e) {
            log.error("processMessage occurs error ", e);
        }
    }

    /**
     * 连接状态监听器
     */
    private static JmsConnectionListener jmsConnectionListener = new JmsConnectionListener() {
        /**
         * 连接成功建立。
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            log.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * 尝试过最大重试次数之后,最终连接失败。
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            log.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * 连接中断。
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * 连接中断后又自动重连上。
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            log.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}
    };

}

设备端接入

以下介绍的设备端接入为 java 版,主要用于配合服务端订阅的调试
官网:https://help.aliyun.com/document_detail/97331.html?spm=a2c4g.11186623.6.675.7636277c2CMzcp

设备端接入编码实现

导入 pom 依赖

<!--aliyun client-->
<dependency>
    <groupId>com.aliyun.alink.linksdk</groupId>
    <artifactId>iot-linkkit-java</artifactId>
    <version>1.2.0.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.1</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.25</version>
    <scope>compile</scope>
</dependency>

客户端接入业务类

import com.akieay.cloudprint.common.util.aliyun.iot.constant.TopicConstant;
import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;
import lombok.extern.slf4j.Slf4j;

/**
 * @author akieay
 * @Date: 2020/11/18 16:59
 */
@Slf4j
public class ClientSideSubscription {

    public static void main(String[] args) {
        /**
         * 主要注意参数 channelHost,公共实例格式为:{productKey}+".iot-as-mqtt."+{regionId}+".aliyuncs.com:1883"
         * 企业版实例格式为:实现详情下的公网终端节点(Endpoint)中的 AMQP 的路径 + ":1883"
         */
        new ClientSideSubscription().init("产品Key", "设备deviceName",
                "设备deviceSecret", "你的channelHost");
    }

    public void init(String productKey, String deviceName, String deviceSecret, String channelHost){
        LinkKitInitParams params = new LinkKitInitParams();

        /**
         * 设置 Mqtt 初始化参数
         */
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        config.productKey = productKey;
        config.deviceName = deviceName;
        config.deviceSecret = deviceSecret;
        config.channelHost = channelHost;
        /**
         * 是否接受离线消息
         * 对应 mqtt 的 cleanSession 字段
         */
        config.receiveOfflineMsg = false;
        params.mqttClientConfig = config;

        /**
         * 设置初始化三元组信息,用户传入
         */
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = productKey;
        deviceInfo.deviceName = deviceName;
        deviceInfo.deviceSecret = deviceSecret;
        params.deviceInfo = deviceInfo;

        //初始化连接
        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
            @Override
            public void onError(AError aError) {
               log.info("Mqtt connect fail");
            }

            @Override
            public void onInitDone(InitResult initResult) {
                log.info("Mqtt connect success");

                // 订阅
                MqttSubscribeRequest request = new MqttSubscribeRequest();
                // topic 用户根据实际场景填写
                request.topic = "/" + productKey + "/" + deviceName + TopicConstant.TEST_TOPIC;
                request.isSubscribe = true;
                LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
                    @Override
                    public void onSuccess() {
                        // 订阅成功
                        log.info("subscribe topic " + request.topic + " success");

                        // 注册下行监听,包括长连接的状态和云端下行的数据
                        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
                            @Override
                            public void onNotify(String connectId, String topic, AMessage aMessage) {
                                // 云端下行数据回调
                                // connectId 连接类型 topic 下行 topic; aMessage 下行数据
                                String data = new String((byte[]) aMessage.data);
                                log.info("topic: " + topic + " \t data: " + data);
                                // pushData 示例  {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}
                                // method 服务类型; params 下推数据内容
                            }

                            @Override
                            public boolean shouldHandle(String connectId, String topic) {
                                // 选择是否不处理某个 topic 的下行数据
                                // 如果不处理某个topic,则onNotify不会收到对应topic的下行数据
                                return true; //TODO 根基实际情况设置
                            }

                            @Override
                            public void onConnectStateChange(String connectId, ConnectState connectState) {
                                log.info(connectId, connectState);
                                // 对应连接类型的连接状态变化回调,具体连接状态参考 SDK ConnectState
                            }
                        });
                    }

                    @Override
                    public void onFailure(AError aError) {
                        // 订阅失败
                        log.info("onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
                    }
                });
            }
        });
    }
}
相关标签: 阿里云