欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信

程序员文章站 2022-06-16 18:44:00
在学习此之前,我们要先明白iniparser库以及cJSON库,为什么要学习这个呢?首先iniparser库可以对ini文件进行解析、设置、删除等操作,我们知道当我们与阿里云进行通信时,需要很多参数,比如要知道阿里云的域名,端口、以及username、passwd、客户端id以及发布的主题、服务质量都需要进行配置,所以使用iniparsesr库可以解决很多配置的问题。如下图是我写的ini文件关于iniparser库的学习可以参考以下我之前写的博客,博客链接为:https://blog.csdn.net...

我们先看一下搭建mqtt的基本流程图吧(不是项目的流程图,我简单画的一个)。
基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信
关于调用mosquitto库中的函数的学习,可以参考一下我写的常见mosquitto库中函数的学习:https://blog.csdn.net/makunIT/article/details/107283260

在写代码的时候,我用到了iniparser库和cJSON库,我们要先明白iniparser库以及cJSON库,为什么要学习这个呢?首先iniparser库可以对ini文件进行解析、设置、删除等操作,我们知道当我们与阿里云进行通信时,需要很多参数,比如要知道阿里云的域名,端口、以及username、passwd、客户端id以及发布的主题、服务质量都需要进行配置,所以使用iniparsesr库可以解决很多配置的问题。如下图是我写的ini文件
基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信
关于iniparser库的学习可以参考以下我之前写的博客,博客链接为:
https://blog.csdn.net/makunIT/article/details/107209915

那么我们为什么还需要学习cJSON库那,cJSON是一个超轻巧,携带方便,单文件,简答的可以作为ANSI-C标准的解析器。而json是一种在服务器端和客户端交换数据的轻量级数据格式,而对于阿里云的上报会比较严格,还需要相同的json格式发布阿里云平台才能收到,而cJSON库的使用可以创建josn格式的数据。
关于cJSON的学习可以参考一下我的博客:https://blog.csdn.net/makunIT/article/details/107199000
基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信
以上是我创建的json格式的数据,我们将把这个消息发送给阿里云。

我们先来看一下我这段代码的作用,这段代码是我写的通过传参的形式来配置连接阿里云之间的通信的,为什么我们用ini文件配置了为什么还要传连接时候的参数那,因为这样可以实现代码的利用率,这样我们再阿里云再创建一个设备时,我们可以直接通过参数来连接阿里云,这样也会更加方便一点。

int set_mqtt_conf(char *path_ini, char *host, int port, char *clientid, char *user, char *passwd, char *topic)
{
    FILE        *fp = NULL;
    dictionary  *ini = NULL;
    char        * mqtt_port;
    char          mqtt_pot[16];

    if(port)
    {
        snprintf(mqtt_pot, sizeof(mqtt_pot), "%d" , port);
        mqtt_port = mqtt_pot;
    }

    ini = iniparser_load(path_ini);

    if(ini == NULL)
    {
        printf("iniparser failure \n");
        return -1;
    }
    /*配置参数*/
    iniparser_set(ini, "aly_address:host", host);
    iniparser_set(ini, "aly_address:port", mqtt_port);
    iniparser_set(ini, "clientId:id", clientid);
    iniparser_set(ini, "user_passwd:username", user);
    iniparser_set(ini, "user_passwd:passwd", passwd);
    iniparser_set(ini, "pub_topic:topic", topic);
    /*有些参数后面再配置*/

    fp = fopen(path_ini, "w");
    if(fp == NULL)
    {
        printf("stone: fopen error!\n");
        return -2;
    }
    
    iniparser_dump_ini(ini, fp);

    iniparser_freedict(ini);

    return 0;
}

基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信
我们再来看这段代码,这段代码的功能就是解析ini文件,实现连接阿里云,实现两者的通信,我们使用iniparser库获取ini文件的配置,然后填充结构体,此结构体的内容将会在发布的时候要用到。

int gain_mqtt_conf(char *path_ini, mqtt_ctx_t *mqtt)
{
    dictionary    *ini = NULL;
    const char    *hostname;
    int           port;
    const char    *username;
    const  char   *passwd;
    const  char   *clientid;
    const  char   *pubTopic;
    int           pubQos;
    int           keepalive;

    const   char *method;
    const   char *id;
    const   char *identifier;
    const   char *version;

    if(!path_ini || !mqtt)
    {   
        printf("Invalid input argument in %s!\n",__FUNCTION__);
        return -1; 
    }   

    /* 加载配置文件,将数据存于dictionary结构中*/
    ini = iniparser_load(path_ini);
    if(ini == NULL)
    {   
        printf("iniparser_load failure!\n");
        return -2; 
    }


    /*获取字符串,若未找到返回第三个参数的内容*/
    hostname = iniparser_getstring(ini,"aly_address:host", DEF_BORKER_HOSTNAME);
    port = iniparser_getint(ini,"aly_address:port", DEF_BORKER_PORT);

    username = iniparser_getstring(ini, "user_passwd:username", DEF_BORKER_USERNAME);
    passwd = iniparser_getstring(ini, "user_passwd:passwd", DEF_BORKER_PASSWD);

    clientid = iniparser_getstring(ini, "clientId:id",DEF_BORKER_CLIENTID );

    pubTopic = iniparser_getstring(ini, "pub_topic:topic", DEF_BORKER_PUBTOPIC);


    pubQos = iniparser_getint(ini, "aly_Qos:QoS", DEF_PUBQOS);

    keepalive = iniparser_getint(ini, "KEEP_ALIVE:alive",DEF_BROKER_KEEPALIVE);

    method = iniparser_getstring(ini, "aly_json:method",DEF_METHOD);
    id = iniparser_getstring(ini, "aly_json:id",DEF_ID);
    identifier = iniparser_getstring(ini, "aly_json:identifier",DEF_IDENTIFIER);
    version = iniparser_getstring(ini, "aly_json:version",DEF_VERSION);

    strncpy(mqtt->method, method, sizeof(mqtt->method));
    strncpy(mqtt->id, id, sizeof(mqtt->method));
    strncpy(mqtt->identifier, identifier,sizeof(mqtt->identifier));
    strncpy(mqtt->version, version, sizeof(mqtt->version));



    /*  Broker settings  */
    strncpy(mqtt->hostname, hostname, sizeof(mqtt->hostname) );
    mqtt->port = port;
    printf("Use default broker server [%s:%d]\n", mqtt->hostname, mqtt->port);

    strncpy(mqtt->username, username, sizeof(mqtt->username) );
    strncpy(mqtt->passwd , passwd, sizeof(mqtt->passwd) );
    printf("Use default broker author by [%s:%s]\n", mqtt->username, mqtt->passwd);

    mqtt->keepalive= keepalive;
    printf("Use default broker keepalive timeout [%d] seconds\n", mqtt->keepalive);

    /*  Publisher settings  */
    strncpy(mqtt->pubTopic, pubTopic, sizeof(mqtt->pubTopic));
    mqtt->pubQos = pubQos;
    printf( "Use default publisher topic \"%s\" with Qos[%d]\n",mqtt->pubTopic, mqtt->pubQos);

    strncpy(mqtt->clientid,  clientid, sizeof(mqtt->clientid));

    /*释放dictionary对象/内存*/
    iniparser_freedict(ini); 

    return 0;

}

以下代码是搭建客户端的主函数,此函数主要用mosquitto库里面的函数,连接阿里云然后发送JSON格式的数据,实现将客户端发布的数据,显示到阿里云平台上面。

/*********************************************************************************
 *      Copyright:  (C) 2020 makun<1394987689@qq.com>
 *                  All rights reserved.
 *
 *       Filename:  mosquitto_pub.c
 *    Description:  This file 
 *                 
 *        Version:  1.0.0(2020年07月08日)
 *         Author:  makun <1394987689@qq.com>
 *      ChangeLog:  1, Release initial version on "2020年07月08日 22时30分04秒"
 *                 
 ********************************************************************************/


#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <libgen.h>
#include <getopt.h>
#include <string.h>
#include <mosquitto.h>
#include <stdlib.h>

#include "conf.h"
#include "cJSON.h"


#define  PROG_VERSION "1.0.0"
#define  PATH_INT "./mqtt_aly_conf.ini"

void pub_conn_callback(struct mosquitto *mosq, void *obj, int rc);
static void  print_usage( char *progname)
{
    printf("Usage  %s [option]...\n", progname);
    printf("%s is makun studi MQTT daemon program running on RaspberryPi\n", progname);

    printf("-p (--port):   the port of the server you want to connect\n");
    printf("-h (--host):   the hostname of the server you want to connect\n");
    printf("-u (--user):   the username of the client\n");
    printf("-P (--passwd): the passwd of the client you\n");
    printf("-i (--clientid): the clientid of the user\n");
    printf("-t (--topic):  the topic of the client you want to pub\n");
    printf("-H (--help): Display this help information\n");
    printf("-v (--version): Display the program version\n");

    printf("%s  Version %s\n", progname, PROG_VERSION);
    return ;
}

int main (int argc, char **argv)
{
    char         *host = NULL;
    int          port ;
    char         *clientid = NULL;
    char         *user = NULL;
    char         *passwd = NULL;
    char         *topic = NULL;
    int          rv = -1;
    int          opt= -1;
    char         *progname = NULL;
    bool         session = true;
    mqtt_ctx_t   mqtt;

    struct mosquitto *mosq = NULL;

    
    struct option long_options[]= {
        {"host", required_argument, NULL, 'h'},
        {"port", required_argument, NULL, 'p'},
        {"user", required_argument, NULL, 'u'},
        {"passwd",required_argument, NULL,'P'},
        {"topic", required_argument, NULL, 't'},
        {"clientid", required_argument, NULL, 'i'},
        {"help", no_argument, NULL, 'H'},
        {"version", no_argument, NULL, 'v'},
        {0, 0, 0, 0}
    };

    progname = (char *)basename(argv[0]);

    while( (opt = getopt_long(argc, argv,"h:p:u:P:i:tHv", long_options,NULL)) != -1)
    {
        switch (opt)
        {
            case 'h':
                host = optarg;
                break;
            case 'p':
                port = atoi(optarg);
                break;
            case 'u':
                user = optarg;
                break;
            case 'P':
                passwd = optarg;
                break;
            case 'i':
                clientid = optarg;
            case 't':
                topic = optarg;
                break;
            case 'v':
                printf("%s Version %s\n",progname, PROG_VERSION);
                return 0;
            case 'H':
                print_usage(progname);
                return 0;
            default:
                break;
        }
    }

    rv=set_mqtt_conf(PATH_INT, host, port, clientid, user,passwd, topic);
    if(rv < 0)
    {
        printf("set mqtt conf is failure %d\n", rv);
        return -1;
    }

    memset(&mqtt, 0, sizeof(mqtt));

    rv = gain_mqtt_conf(PATH_INT, &mqtt);
    if(rv < 0)
    {
        printf("gain mqtt conf failure %d\n", rv);
        return -2;
    }

    /*必须在任何其他mosquitto功能之前调用*/
    mosquitto_lib_init();

    /*创建一个新的mosquitto客户端实例,第二个参数为true,代理清除断开连接时的所有消息和订阅*/
    mosq = mosquitto_new(mqtt.clientid,session, (void *)&mqtt );
    if(!mosq)
    {
        printf("mosquitto new failure: %s\n", strerror(errno));
        goto cleanup;
    }

    printf("Create mosquitto successfuly\n");

    /*设置连接回调,当代理发送CONNACK消息以响应连接时,将调用此方法*/
    mosquitto_connect_callback_set(mosq, pub_conn_callback);


    /*配置mosquitto实例的用户名和密码*/
    if( mosquitto_username_pw_set(mosq, mqtt.username,mqtt.passwd) !=MOSQ_ERR_SUCCESS)
    {
        printf("mosquitto username and passwd failure:%s\n",strerror(errno));
        goto cleanup;
    }

    while(1)
    {
        /*连接MQTT代理*/
        if(mosquitto_connect(mosq, mqtt.hostname, mqtt.port, mqtt.keepalive) != MOSQ_ERR_SUCCESS )
        {
            printf("mosquitto connect server failure:%s\n",strerror(errno));
            continue;
            sleep(1);
        }

        /*无线阻塞循环调用loop*/
        mosquitto_loop_forever(mosq, -1, 1 );
        sleep(10);
    }

cleanup:
    printf("program will exit\n");
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return 0;

}


void pub_conn_callback(struct mosquitto *mosq, void *obj, int rc)
{
    mqtt_ctx_t  *mqtt;
    int mid;
    char    *msg;
    cJSON   *root = cJSON_CreateObject();
    cJSON   *item = cJSON_CreateObject();
    int     retain = 0;
    char    *payload = "moquitto test aly";

    if(!mosq ||!obj)
    {
        printf("invalid input argument\n");
        return ;
    }
    mqtt = (mqtt_ctx_t *)obj;

    cJSON_AddItemToObject(root, "method", cJSON_CreateString(mqtt->method));//根节点下添加
    cJSON_AddItemToObject(root, "id", cJSON_CreateString(mqtt->id));//根节点下添加
    cJSON_AddItemToObject(root, "params",item);
    cJSON_AddItemToObject(item, mqtt->identifier, cJSON_CreateString(payload));
    cJSON_AddItemToObject(root, "version", cJSON_CreateString(mqtt->version));
    
    msg = cJSON_Print(root);
    printf("%s\n", msg);

    if(!rc)
    {  
        if( mosquitto_publish(mosq,&mid,mqtt->pubTopic,strlen(msg)+1, msg, mqtt->pubQos, retain) != MOSQ_ERR_SUCCESS )  
        {   
            printf("Mosq_Publish() error: %s\n", strerror(errno));
            return ;
        }
        printf("pubilush topic:%s\n",mqtt->pubTopic) ;
    }
    mosquitto_disconnect(mosq) ;

}

我们打开阿里云平台上的在线调试。
基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信
基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信

基于mosquitto库搭建mqtt客户端发布实现与阿里云的通信
这样我们就成功实现了客户端到服务器端的通信,以上,只是我写的一个测试的例子,接下来,我将学习SHT20获取温度,然后通过mqtt客户端发布温度,阿里云能够实时监控到温度的一个学习,感兴趣的可以关注一下我的博客,我们一起学习。

编程源码链接:https://gitee.com/ma_kung/mqtt

本文地址:https://blog.csdn.net/makunIT/article/details/107269016