欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

在linux下使用c语言实现MQTT(二.编程实现)

程序员文章站 2022-07-15 09:59:30
...

我是用的secureCRT登录的树莓派,要实现MQTT通信,就需要用到许多关于MQTT的函数,这里我用的是Paho.c库,所以首先下载库:
使用git下载Paho C库:git clone https://github.com/eclipse/paho.mqtt.c.git

cd paho.mqtt.c
make//编译
sudo make install//安装

执行make之后我们在build/output/下能看到以下动态库:

[email protected]_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_publish/paho.mqtt/build/output $ ls
libpaho-mqtt3a.so      libpaho-mqtt3as.so      libpaho-mqtt3c.so      libpaho-mqtt3cs.so      paho_c_version
libpaho-mqtt3a.so.1    libpaho-mqtt3as.so.1    libpaho-mqtt3c.so.1    libpaho-mqtt3cs.so.1    samples
libpaho-mqtt3a.so.1.0  libpaho-mqtt3as.so.1.0  libpaho-mqtt3c.so.1.0  libpaho-mqtt3cs.so.1.0  test

在这里说一下这里面的各个动态库的作用:
paho-mqtt3a : 一般实际开发中就是使用这个,a表示的是异步消息推送(asynchronous)。
paho-mqtt3as : as表示的是 异步+加密(asynchronous+OpenSSL)。
paho-mqtt3c : c 表示的是经典(classic)的同步(synchronous)客户端,采用发送+等待的阻塞模式,一般性能较差。
paho-mqtt3cs : cs表示的是同步+加密(synchronous+OpenSSL)。
一、发布端程序

get_temperature.c  get_temperature.h  get_time.c  get_time.h  opt_init.c  opt_init.h  publish.c  set_signal.c  set_signal.h

下面解释一下这些c代码:
1、get_temperature.c用于获取发布端要发布的消息内容,即实验室树莓派上DS18B20传感器采集的实时温度,其代码如下:

#include<stdio.h>
#include<unistd.h>
#include<dirent.h>
#include<string.h>
#include<sys/types.h>
#include<errno.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<stdlib.h>
#include"get_temperature.h"
/**
 * Read the current temperature from a DS18B20 sensor on the 1-Wire bus.
 *
 * Scans /sys/bus/w1/devices/ for an entry whose name contains "28-", opens
 * that entry's w1_slave file and parses the number that follows "t=". The
 * parsed number is divided by 1000 before it is stored.
 *
 * If several entries match, the last one returned by readdir() is used.
 *
 * @param temper  Output: receives the parsed value divided by 1000.
 * @return 0 on success, -1 on any failure (a message is printed to stdout,
 *         except for a NULL temper).
 */
int  get_temperature(float *temper)
{
        static const char w1_dir[]="/sys/bus/w1/devices/";
        char    path[128];
        char    chip[32]="";
        char    buf[128];
        char    *ptr;
        char    *end;
        DIR     *dirp;
        struct dirent *direntp;
        int     fd;
        int     n;
        ssize_t len;
        double  raw;

        if(temper==NULL)
        {
                return -1;
        }
        if((dirp=opendir(w1_dir))==NULL)
        {
                printf("Open %s failure:%s\n",w1_dir,strerror(errno));
                return -1;
        }
        while((direntp=readdir(dirp))!=NULL)
        {
                /* Ignore names that would not fit rather than overflow chip[]. */
                if(strstr(direntp->d_name,"28-")&&strlen(direntp->d_name)<sizeof(chip))
                {
                        strcpy(chip,direntp->d_name);
                }
        }
        closedir(dirp);
        if(chip[0]=='\0')
        {
                printf("Can not find ds18b20 in %s\n",w1_dir);
                return -1;
        }
        n=snprintf(path,sizeof(path),"%s%s/w1_slave",w1_dir,chip);
        if(n<0||(size_t)n>=sizeof(path))
        {
                printf("Sensor path under %s is too long\n",w1_dir);
                return -1;
        }
        if((fd=open(path,O_RDONLY))<0)
        {
                printf("Open %s failure:%s\n",path,strerror(errno));
                return -1;
        }
        /* Leave one byte free so the data can be NUL-terminated for strstr(). */
        len=read(fd,buf,sizeof(buf)-1);
        if(len<0)
        {
                printf("read data from %s failure:%s\n",path,strerror(errno));
                close(fd);
                return -1;
        }
        close(fd);
        buf[len]='\0';

        /* Check the search result BEFORE stepping past the "t=" marker. */
        ptr=strstr(buf,"t=");
        if(!ptr)
        {
                printf("ERROR:no \"t=\" field found in %s\n",path);
                return -1;
        }
        ptr+=2;
        raw=strtod(ptr,&end);
        if(end==ptr)
        {
                printf("ERROR:invalid temperature value in %s\n",path);
                return -1;
        }
        *temper=(float)(raw/1000);
        return 0;
}

2、get_time.c:获取当前时间,网上有很多,我的示例代码如下:

1 #include<time.h>
  2 #include<stdio.h>
  3 #include<string.h>
  4 #include"get_time.h"
  5 int get_time(char time_s[24])
  6 {
  7         time_t   timec;
  8         char *str;
  9         time(&timec);
 10         str=ctime(&timec);
 11         char *chr=strchr(str,'\n');
 12         *chr='\0';
 13         strcpy(time_s,str);
 14         return 0;
 15 
 16 }

3、opt_init.c:参数初始化函数。示例代码如下:

  1 #include<stdio.h>
  2 #include<getopt.h>
  3 #include<unistd.h>
  4 #include<string.h>
  5 #include<stdlib.h>
  6 #include"opt_init.h"
  7 void usage(char *arg)
  8 {
  9         printf("%s usage:\n",arg);
 10         puts("-p (--port)get the port");
 11         puts("-d (--daemon)run back");
 12         puts("-a (-address)the publish address");
 13         puts("-h (--help)get the help message");
 14         puts("-i (--id) get the publish id");
 15         puts("-t (--topic)the publish topic");
 16         return ;
 17 }
 18 
 19 int opt_init(int *port,char name[],char sub_id[],char topic[],int argc,char **argv)
 20 {
 21         struct option opts[]=
 22         {
 23                 {"port",required_argument,NULL,'p'},
 24                 {"daemon",no_argument,NULL,'d'},
 25                 {"topic",required_argument,NULL,'t'},
 26                 {"sub_id",required_argument,NULL,'i'},
 27                 {"address",required_argument,NULL,'a'},
 28                 {"help",no_argument,NULL,'h'},
 29                 {NULL,0,NULL,0}
 30         };
 31         int       rv; 32         while((rv=getopt_long(argc,argv,"d:p:a:i:t:h",opts,NULL))!=-1)
 33         {
 34                 switch(rv)
 35                 {
 36                     case 'd':
 37                     if(daemon(0,0)<0)
 38                 {
 39                     printf("daemon error\n");
 40                     return 0;
 41                 }
 42                     case 'p':
 43                         *port=atoi(optarg);
 44                         break;
 45                     case 'a':
 46                         strcpy(name,optarg);
 47                         break;
 48                     case 'i':
 49                         strcpy(sub_id,optarg);
 50                         break;
 51                     case 't':
 52                         strcpy(topic,optarg);
 53                         break;
 54                     case 'h':
 55                         usage(argv[0]);
 56                         return 0;
 57                         default:
 58                         break;
 59                 }
 60         }
 61         if(!*port||!name)
 62         {
 63                 usage(argv[0]);
 64                 return -1;
 65         }
 66 }

4、set_signal.c:设置信号函数,示例代码如下:

 1 #include<signal.h>
  2 #include<stdio.h>
  3 #include<stdlib.h>
  4 #include"set_signal.h"
  5 
  6 
  7 int g_stop=0;
  8 void signal_action(int signum)
  9 {
 10         if(signum==SIGKILL)
 11         {
 12                 g_stop=1;
 13                 printf("kill signal makes program ended\n");
 14         }
 15         else if(signum==SIGINT)
 16         {
 17                 g_stop=1;
 18                 printf("Ctrl+C signal makes program ended\n");
 19         }
 20         exit(0);
 21 }
 22 
 23 int set_signal(void)
 24 {
 25         struct sigaction        sigact;
 26         sigemptyset(&sigact.sa_mask);
 27         sigact.sa_flags=0;
 28         sigact.sa_handler=signal_action;
 29 
 30         sigaction(SIGKILL,&sigact,0);
 31         sigaction(SIGINT,&sigact,0);
 32         return 0;
 33 }

5、publish.c:发布端函数,示例代码如下:

 1 #include<stdio.h>
  2 #include<string.h>
  3 #include<stdlib.h>
  4 #include<errno.h>
  5 #include<unistd.h>
  6 #include"MQTTClient.h"
  7 #include"set_signal.h"
  8 #include"get_time.h"
  9 #include"get_temperature.h"
 10 #include"opt_init.h"
 11 //此发布端不使用回调函数
 12 int main(int argc,char **argv)
 13 {
 14     char                      address[128];
 15     int                       port=0;
 16     char                      topic[128];
 17     char                      pub_id[128];
 18     char                      buf[128]={'\0'};
 19     char                      date[128];
 20     char                      address_s[128];
 21     float                     temper;
 22     int                       rv;
 23     const int                 qos=1;
 24     const long                timeout=10000L;
 25     if(set_signal()<0)
 26     {
 27         printf("set_signal error:%s\n",strerror(errno));
 28         return -1;
 29     }
 30     if(opt_init(&port,address,pub_id,topic,argc,argv)<0)
 31     {
 32         printf("opt_init failure:%s\n",strerror(errno));
 33         return -1;
 34     }
 35     snprintf(address_s,sizeof(address_s),"tcp://%s:%d",address,port);
 36     MQTTClient client;
 37     MQTTClient_connectOptions conn_opts=MQTTClient_connectOptions_initializer;
 38     MQTTClient_message publish_msg=MQTTClient_message_initializer;
 39     MQTTClient_deliveryToken token;
 40     conn_opts.keepAliveInterval=60;
 41     conn_opts.cleansession=1;
 42     MQTTClient_create(&client,address_s,pub_id,MQTTCLIENT_PERSISTENCE_NONE,NULL);
  43     if((rv=MQTTClient_connect(client,&conn_opts))!=MQTTCLIENT_SUCCESS)
 44     {
 45         printf("MQTTClient_connect error:%s\n",strerror(errno));
 46         return -1;
 47     }
 48     publish_msg.qos=qos;
 49     publish_msg.retained=0;
 50     while(!g_stop)
 51     {
 52         if(get_time(date)<0)
 53         {
 54             printf("get_time error:%s\n",strerror(errno));
 55             return -1;
 56         }
 57         if(get_temperature(&temper)<0)
 58         {
 59             printf("get_temperature error:%s\n",strerror(errno));
 60             return -1;
 61         }
 62         snprintf(buf,sizeof(buf),"RPI0001/%s/%f",date,temper);
 63         publish_msg.payload=(void *)buf;
 64         publish_msg.payloadlen=strlen(buf);
 65         MQTTClient_publishMessage(client,topic,&publish_msg,&token);
 66         printf("Waiting for %d seconds for publication of---- %s---- on topic %s for subscriber with id:%s\n",timeout/1000,buf,topic,pub_id);
 67         rv=MQTTClient_waitForCompletion(client,token,timeout);
 68         printf("Message with delivery token %d delivered\n",rv);
 69         sleep(30);
 70     }
 71 }

二、订阅端程序:

 db_init.h    insert_db.h      opt_init.c  set_signal.c   string_break.c   subscribe.c
 db_init.c    insert_db.c      opt_init.h  set_signal.h   string_break.h

1、db_init.c:数据库初始化函数,示例代码如下:

 13 #include<stdio.h>
 14 #include<sqlite3.h>
 15 #include"db_init.h"
 16 #include<stdlib.h>
 17 int callback(void *Notused,int argc,char **argv,char **azColName)
 18 {
 19     int i=0;
 20     for(i=0;i<argc;i++)
 21     {
 22         printf("%s=%s\n",azColName[i],argv[i]?argv[i]:"NULL");
 23     }
 24     printf("\n");
 25     return 0;
 26 }
 27 
 28 int db_init(char *db_name,char *file_name)
 29 {
 30     sqlite3     *db;
 31     char        *zErrMsg=0;
 32     int         rc;
 33     char        sql[256];
 34     rc = sqlite3_open(db_name,&db);
 35     if(rc!=0)
 36     {
 37         fprintf(stderr,"can't open database:%s\n",sqlite3_errmsg(db));
 38         return -1;
 39     }
 40     else
 41     {
 42         fprintf(stdout,"open database ok\n");
 43     }
 44      snprintf(sql,256,"CREATE TABLE IF NOT EXISTS %s("
 45         "I INTEGER   PRIMARY KEY AUTOINCREMENT ,"
 46         "ID CHAR(32)                NOT NULL,"
 47         "DATE          CHAR(48)     NOT NULL,"
 48         "TEMPERATURE   CHAR(32)     NOT NULL);",file_name);
 49      rc = sqlite3_exec(db,sql,callback,0,&zErrMsg); 
 50      if(rc!=SQLITE_OK)
 51      {
 52         fprintf(stderr,"SQL error:%s\n",zErrMsg);
 53         sqlite3_free(zErrMsg);
 54         return -1;
 55      }
 56      else
 57      {
 58         fprintf(stdout,"table create ok\n");
 59      }
 60      sqlite3_close(db);
 61      return 0;
 62 }

2、insert_db.c:插入数据库。示例代码如下:

 13 #include<stdio.h>
 14 #include<stdlib.h>
 15 #include<sqlite3.h>
 16 #include"insert_db.h"
 17 int call_back(void *NotUsed,int argc , char **argv,char **azColName)
 18 {
 19     int i=0;
 20     for(i=0;i<argc;i++)
 21     {
 22         printf("%s=%s\n",azColName[i],argv[i]?argv[i]:"NULL");
 23     }
 24     printf("\n");
 25     return 0;
 26 }
 27 int insert_db(char *db_name,char *file_name,char *id,char *date,char *temper)
 28 {
 29     sqlite3     *db;
 30     char        *zErrMsg=0;
 31     int         rc ;
 32     char        sql[256];
 33     rc=sqlite3_open(db_name,&db);
 34     if(rc!=0)
 35     {
 36         fprintf(stderr,"can't open database:%s\n",sqlite3_errmsg(db));
 37         return -1;
 38     }
 39     else
 40     {
 41         fprintf(stderr,"open database ok\n");
 42     }
 43     snprintf(sql,256,"INSERT INTO %s(I,ID,DATE,TEMPERATURE)VALUES (NULL,'%s','%s','%s'); ",file_name,id,date,temper);
 44     rc=sqlite3_exec(db,sql,call_back,0,&zErrMsg);
  45     if(rc!=SQLITE_OK)
 46     {
 47         fprintf(stderr,"SQL error:%s\n",zErrMsg);
 48         sqlite3_free(zErrMsg);
 49         return -1;
 50     }
 51     else
 52     {
 53         fprintf(stdout,"records careated ok\n");
 54     }
 55     sqlite3_close(db);
 56     return 0;
 57 }

3、string_break.c:字符串分割,示例代码如下:

  1 #include "Rectemper_server.h"
  2 
  3 int string_break(char *buf, char *sn, char *time, char *temper)
  4 {
  5     char            *p_head, *p_end ;
  6 
  7     p_end = strstr(buf, "/") ;
  8     strncpy(sn , buf, (p_end - buf)) ;
  9     printf("sn: %s\n", sn) ;
 10 
 11     p_head = p_end + 1 ;        // p_head = "2019-1-23  14:17:27/15.375000C"
 12     p_end = strstr( (p_end + 1), "/") ; // p_end= "/15.375000C"
 13     strncpy(time, p_head, (p_end-p_head));
 14     printf("time: %s\n", time) ;
 15 
 16     p_head = p_end + 1 ; // p_head ="15.375000C"
 17     p_end = strstr(p_end, "C") ;
 18     strncpy(temper, p_head, (p_end - p_head)) ;
 19     printf("Temperature: %s\n", temper) ;
 20 
 21 }

4、subscribe.c:订阅端程序,示例代码如下:

 1 #include<stdio.h>
  2 #include<stdlib.h>
  3 #include<string.h>
  4 #include<errno.h>
  5 #include"MQTTClient.h"
  6 #include<unistd.h>
  7 #include"set_signal.h"
  8 #include"opt_init.h"
  9 #include"db_init.h"
 10 #include"insert_db.h"
 11 #include"string_break.h"
 12 volatile MQTTClient_deliveryToken deliveredtoken;
 13 
 14 void delivered(void *context,MQTTClient_deliveryToken dt)
 15 {
 16     printf("Message with token value %d delivery confirmed\n",dt);
 17     deliveredtoken=dt;
 18 }
 19 
 20 int msgarrvd(void *context,char *topicName,int topicLen,MQTTClient_message *message)
 21 {
 22     int i;
 23     char buf[128]={'\0'};
 24     char id[128];
 25     char date[128];
 26     char temper[128];
 27     char *payloadptr;
 28     payloadptr=message->payload;
 29     printf("Message arrived \n");
 30     printf("topic:%s\n",topicName);
 31     printf("Message:%s\n",message->payload);
 32     printf("message:");
 33     for(i=0;i<message->payloadlen;i++)
 34     {
 35         buf[i]=*payloadptr;
 36         putchar(*payloadptr++);
 37     }
 38     printf("%s\n",buf);
 39     putchar('\n');
  40     if(string_break(id,date,temper,buf)<0)
 41     {
 42         printf("string_break failure:%s\n",strerror(errno));
 43         return -1;
 44     }
 45     if(db_init("Messages","message")<0)
 46     {
 47         printf("db_init failure:%s\n",strerror(errno));
 48         return -1;
 49     }
 50     if(insert_db("Messages","message",id,date,temper)<0)
 51     {
 52         printf("insert_db failure:%s\n",strerror(errno));
 53         return -1;
 54     }
 55     MQTTClient_freeMessage(&message);
 56     MQTTClient_free(topicName);
 57     return 1;
 58 }
 59 
 60 void connlost(void *context,char *cause)
 61 {
 62     printf("Connection lost\n");
 63     printf("cause %s\n",cause);
 64 }
 65 
 66 
 67 int main(int argc,char **argv)
 68 {
 69     MQTTClient client;
 70     const int qos=1;
 71         const long timeout=10000L;
 72     char buf[128];
  73     int port=0;
 74     char address[128];
 75     char sub_id[128];
 76     char topic[128];
 77     MQTTClient_connectOptions conn_opts=MQTTClient_connectOptions_initializer;
 78     int rc,ch;
 79     if(set_signal()<0)
 80     {
 81         printf("set_signal failure:%s\n",strerror(errno));
 82         return -1;
 83     }
 84     if(opt_init(&port,address,sub_id,topic,argc,argv)<0)
 85     {
 86         printf("opt_init failure:%s\n",strerror(errno));
 87         return 1;
 88     }
 89     snprintf(buf,sizeof(buf),"tcp://%s:%d",address,port);
 90     MQTTClient_create(&client,buf,sub_id,MQTTCLIENT_PERSISTENCE_NONE,NULL);
 91     conn_opts.keepAliveInterval=20;
 92     conn_opts.cleansession=1;
 93     MQTTClient_setCallbacks(client,NULL,connlost,msgarrvd,delivered);
 94 
 95     if((rc=MQTTClient_connect(client,&conn_opts))!=MQTTCLIENT_SUCCESS)
 96     {
 97         printf("MQTTClient_connect failure:%s\n",strerror(errno));
 98         return -1;
 99     }
100     printf("Subscribe to topic %s for client %s using QOS %d\n",topic,sub_id,qos);
101     MQTTClient_subscribe(client,topic,qos);
102     do
103     {
104         ch=getchar();
105     }while(ch!='q'&&ch!='Q');
106     MQTTClient_disconnect(client,10000);
107     MQTTClient_destroy(&client);
108     return rc;
109 }

set_signal.c与opt_init.c与publish端代码一样。
三、发布端与订阅端实现通信
下面是发布端发布时间和温度:

[email protected]_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_publish $ ./a.out -p 1883 -a 127.0.0.1 -i 100 -t cwt 
Waiting for 10 seconds for publication of---- RPI0001/Mon Jul 22 16:36:48 2019/25.937000---- on topic cwt for subscriber with id:100
Message with delivery token 0 delivered
Waiting for 10 seconds for publication of---- RPI0001/Mon Jul 22 16:37:19 2019/26.000000---- on topic cwt for subscriber with id:100
Message with delivery token 0 delivered

下面是订阅端收到时间和温度:

[email protected]_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_subscribe $ ./a.out -p 1883 -a 127.0.0.1  -t cwt                 
Subscribe to topic cwt for client   using QOS 1
Message arrived 
topic:cwt
Message:RPI0001/Mon Jul 22 16:36:48 2019/25.937000
message:RPI0001/Mon Jul 22 16:36:48 2019/25.937000RPI0001/Mon Jul 22 16:36:48 2019/25.937000
open database ok
table create ok
open database ok
records careated ok
Message arrived 
topic:cwt
Message:RPI0001/Mon Jul 22 16:37:19 2019/26.000000
message:RPI0001/Mon Jul 22 16:37:19 2019/26.000000RPI0001/Mon Jul 22 16:37:19 2019/26.000000
open database ok
table create ok
open database ok
records careated ok

检查是否存入数据库中:

[email protected]_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_subscribe $ sqlite3 
SQLite version 3.16.2 2017-01-06 16:32:41
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
sqlite> .open Messages 
sqlite> .table
message
sqlite> select * from message;
1|RPI0001|Mon Jul 22 16:36:48 20195.937000
2|RPI0001|Mon Jul 22 16:37:19 20196.000000

上面的查询结果显示数据已经存入数据库中,可能由于显示问题,日期末尾与温度数字出现了重合,正确的年份是2019年,温度是25.937000。
如上,成功实现MQTT通信。