欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息队列

程序员文章站 2022-05-18 10:46:44
...

1、消息队列结构

  • 创建一个新的消息队列或者打开一个已有的消息队列,使用msgget函数
  • 发送一个消息到消息队列,使用msgsnd函数
  • 从消息队列获取一则消息,使用msgrcv函数

        其它:消息队列自然占用内核资源,因此消息队列个数以及单个消息的最大字节个数有限制,设计时应当注意这点

消息队列

2、消息队列编程

2.1 创建消息队列

源码演示的是一个服务器进程,它负责:

  • 创建IPC对象,删除IPC对象
  • 同客户进程之间约定:使用相同的key或者是直接使用相同的IPC 标识符(msgid)
extern "C"
{
       #include <stdlib.h>
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
       #include <stdio.h>


      /*
	 int msgget(key_t key, int msgflg);
	 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

	 POSIX.1-2001, POSIX.1-2008.
	 key_t ftok(char *pathname, char proj_id);
	*/

}

int main()
{
	/*方法1:只有内核知道该IPC对象的key
	用户得到IPC的标识 msgid才是目的,key用户并不关心*/
	int msgid = msgget(IPC_PRIVATE,0);
	if(-1 == msgid)
	{
		cout << "msg queue[key private]created failed,exiting" << endl;
		return -1;
	}

	/*存储msgid,这样客户进程读取它就可以通信了*/
	char save_msgid[100];
	sprintf(save_msgid,"echo %d > /home/jianleya/keys/msgid_A_01",msgid);
	system(save_msgid);

	/*消息队列需要用户主动删除掉,它并不随着进程退出被删除*/
	int res = msgctl(msgid,IPC_RMID,NULL);
	if(-1 ==  res)
	{
		cout << "delete msg queue[key private] failed,exiting" << endl;
		return -1;
	}

	
	/*方法2:默认一个key 666 ,创建一个IPC对象。客户进程使用同样的key 666将会得到一个相同的msgid*/
	int key = 666;
	msgid = msgget(key,IPC_CREAT|IPC_EXCL);
	if(-1==msgid)
	{
		cout << "msg queue [key 666] created,exiting" << endl;
		return -1;
	}
	res = msgctl(msgid,IPC_RMID,NULL);
	if(-1 ==  res)
	{
		cout << "delete msg queue[key 666] failed,exiting" << endl;
		return -1;
	}

	/*方法3:使用ftok得到一个key,客户进程使用同样的方法产生相同的key*/
	key = ftok("/home/jianleya/fifo/fifo_example",100);
	if(-1 == key)
	{
		cout << "generate key from ftok failed,exiting" << endl;
		return -1;
	}
	msgid = msgget(key,IPC_CREAT|IPC_EXCL);
	if(-1==msgid)
	{
		cout << "msg queue [key ftok] created,exiting" << endl;
		return -1;
	}
	res = msgctl(msgid,IPC_RMID,NULL);
	if(-1 ==  res)
	{
		cout << "delete msg queue[key ftok] failed,exiting" << endl;
		return -1;
	}
	
	return 0;
}

2.2 发送与接收消息

设计目标
功能:
进程A发送消息给进程B,进程B每当接收到一条消息则打印该信息
约束:
进程A发送消息时,如果内核无法存放,进程A阻塞直到发送完毕返回
进程B接收消息时,如果内核没有数据,进程B阻塞直到有消息为止返回
进程A发来的消息类型可能有0x01,0x02,0x03,如果有其它类型,进程B认为这条消息是错误的!

消息队列进程A负责创建IPC 对象,销毁IPC对象

#include <iostream>
using namespace std;

extern "C"
{
       #include <stdlib.h>
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
       #include <stdio.h>
       #include <unistd.h>
       #include <string.h>

      /*
	 int msgget(key_t key, int msgflg);
	 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

	POSIX.1-2001, POSIX.1-2008, SVr4
	int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
	*/

}

#define MSG_SIZE (512)
#define SND_CNT (40)

char const *L0_text = "L0" ;
char const *L1_text = "L1" ;
char const *L2_text = "L2" ;

struct my_msg
{
	long type;
	char text[MSG_SIZE];
};

int main()
{
	/**/
	int key = 666;
	int msgid = msgget(key,0666|IPC_CREAT|IPC_EXCL);
	if(-1==msgid)
	{
		cout << "msg queue [key 666] created,exiting" << endl;
		return -1;
	}

	my_msg type0_msg,type1_msg,type2_msg;
	type0_msg.type = 1;
	type1_msg.type = 2;
	type2_msg.type = 3;

	int cnt = 1;
	while(1)
	{
		usleep(1000);
		sprintf(type0_msg.text,"%d %s",cnt,L0_text);
		sprintf(type1_msg.text,"%d %s",cnt,L1_text);
		sprintf(type2_msg.text,"%d %s",cnt,L2_text);
		
		
		if(cnt > SND_CNT)
		break;

		msgsnd(msgid,&type0_msg,MSG_SIZE,0);
		msgsnd(msgid,&type1_msg,MSG_SIZE,0);
		msgsnd(msgid,&type2_msg,MSG_SIZE,0);
		cnt++;
	}
	/*这里需要进程同步
	服务器进程休眠30s,并不能保证客户端进程一定可以处理完毕所有的消息*/
	sleep(30);
	int res = msgctl(msgid,IPC_RMID,NULL);
	if(-1 ==  res)
	{
		cout << "delete msg queue[key 666] failed,exiting" << endl;
		return -1;
	}
	
	return 0;
}

进程B检测到IPC对象被删除,进程B退出

#include <iostream>
using namespace std;

extern "C"
{
       #include <stdlib.h>
       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
       #include <stdio.h>
       #include <unistd.h>
       #include <errno.h>

      /*
	 int msgget(key_t key, int msgflg);
	 int msgctl(int msqid, int cmd, struct msqid_ds *buf);

	POSIX.1-2001, POSIX.1-2008, SVr4
	ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                      int msgflg);

	*/

}

#define MSG_SIZE (512)


struct recv_msg
{
	long type;
	char text[MSG_SIZE];
};

recv_msg msg;
int msgid;

int msg_handler(void)
{

		int res = msgrcv(msgid,&msg,MSG_SIZE,0,0);
		if(-1 == res)
		{
			perror("msgrcv");
			return errno;
		}
		else
		{
			cout << msg.text << endl;
		}
	
}

int main()
{
	/**/
	int key = 666;
	msgid = msgget(key,0666);
	if(-1==msgid)
	{
		cout << "msg queue [key 666] created,exiting" << endl;
		return -1;
	}



	while(1)
	{
		int res = msg_handler();
		if(EIDRM ==  res)
		{
			cout << "exiting process" << endl;
			return 0;
		}
		else if(EINTR ==  res)
		{
			continue;
		}
		
		
	}
	

	
	return 0;
}
相关标签: 进程间通信