TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

linux-消息队列

参考

unp

进程间通信之POSIX消息队列

wikipedia

概念

在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。[1]

实现

实际上,消息队列常常保存在链表结构中。[2]拥有权限的进程可以向消息队列中写入或读取消息。

当前,有很多消息队列有很多开源的实现,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、RabbitMQ[3]、IBM MQ[4]、Apache Qpid[5]、Apache RocketMQ[6]和HTTPSQS。[7]

优缺点

消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议(HTTP/2之前)是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。

和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。[2]但消息队列仍然有大小限制。

消息队列除了可以当不同线程或进程间的缓冲外,更可以透过消息队列当前消息数量来侦测接收线程或进程性能是否有问题。

使用

数据结构

1
2
3
4
5
6
7
8
9
10
typedef int mqd_t;

struct mq_attr
{
long int mq_flags; /* Message queue flags. */
long int mq_maxmsg; /* Maximum number of messages. */
long int mq_msgsize; /* Maximum message size. */
long int mq_curmsgs; /* Number of messages currently queued. */
};

mq_open

1
2
3
4
5
6
7
8
9
/* Establish connection between a process and a message queue NAME and
return message queue descriptor or (mqd_t) -1 on error. OFLAG determines
the type of access used. If O_CREAT is on OFLAG, the third argument is
taken as a `mode_t', the mode of the created message queue, and the fourth
argument is taken as `struct mq_attr *', pointer to message queue
attributes. If the fourth argument is NULL, default attributes are
used. */
extern mqd_t mq_open (const char *__name, int __oflag, ...)
__THROW __nonnull ((1));

mq_close

1
2
3
/* Removes the association between message queue descriptor MQDES and its
message queue. */
extern int mq_close (mqd_t __mqdes) __THROW;

mq_getattr

1
2
3
/* Query status and attributes of message queue MQDES.  */
extern int mq_getattr (mqd_t __mqdes, struct mq_attr *__mqstat)
__THROW __nonnull ((2));

mq_setattr

1
2
3
4
5
6
/* Set attributes associated with message queue MQDES and if OMQSTAT is
not NULL also query its old attributes. */
extern int mq_setattr (mqd_t __mqdes,
const struct mq_attr *__restrict __mqstat,
struct mq_attr *__restrict __omqstat)
__THROW __nonnull ((2));
1
2
/* Remove message queue named NAME.  */
extern int mq_unlink (const char *__name) __THROW __nonnull ((1));

mq_notify

1
2
3
4
/* Register notification issued upon message arrival to an empty
message queue MQDES. */
extern int mq_notify (mqd_t __mqdes, const struct sigevent *__notification)
__THROW;

mq_receive

1
2
3
4
/* Receive the oldest from highest priority messages in message queue
MQDES. */
extern ssize_t mq_receive (mqd_t __mqdes, char *__msg_ptr, size_t __msg_len,
unsigned int *__msg_prio) __nonnull ((2));

mq_send

1
2
3
/* Add message pointed by MSG_PTR to message queue MQDES.  */
extern int mq_send (mqd_t __mqdes, const char *__msg_ptr, size_t __msg_len,
unsigned int __msg_prio) __nonnull ((2));

mq_timedreceive

1
2
3
4
5
6
7
8
#ifdef __USE_XOPEN2K
/* Receive the oldest from highest priority messages in message queue
MQDES, stop waiting if ABS_TIMEOUT expires. */
extern ssize_t mq_timedreceive (mqd_t __mqdes, char *__restrict __msg_ptr,
size_t __msg_len,
unsigned int *__restrict __msg_prio,
const struct timespec *__restrict __abs_timeout)
__nonnull ((2, 5));

mq_timedsend

1
2
3
4
5
6
7
/* Add message pointed by MSG_PTR to message queue MQDES, stop blocking
on full message queue if ABS_TIMEOUT expires. */
extern int mq_timedsend (mqd_t __mqdes, const char *__msg_ptr,
size_t __msg_len, unsigned int __msg_prio,
const struct timespec *__abs_timeout)
__nonnull ((2, 5));
#endif

DEMO

send.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>
#include <unistd.h>
#include <mqueue.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>
#include <time.h>
#include <string.h>

using namespace std;

int main()
{
mqd_t mq = mq_open("/mymq", O_RDWR | O_CREAT, 0644, NULL);
if (mq < 0)
cout << "open err!" << endl;

time_t tt;
char buf[100] = {0};
for (int i = 0; i < 20; i++)
{
time(&tt);
struct tm *ptm = nullptr;
ptm = localtime(&tt);

strftime(buf, sizeof(buf), "%Y %m %d %H %M %S", ptm);

cout << buf << endl;
mq_send(mq, buf, strlen(buf), 0);
sleep(1);
}

mq_close(mq);




return 0;
}

recv.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <iostream>
#include <unistd.h>
#include <mqueue.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>
#include <time.h>
#include <string.h>

using namespace std;

int main()
{
mqd_t mq = mq_open("/mymq", O_RDWR | O_CREAT, 0644, NULL);
if (mq < 0)
cout << "open err!" << endl;

mq_attr attr = {0};
ssize_t len = 0;
mq_getattr(mq, &attr);
len = attr.mq_msgsize;
char *psz = (char*)malloc(len);
while (true)
{
if (0 > mq_receive(mq, psz, len, NULL))
{
cout << "recv err!" << endl;
//free not use, just test
free(psz);
psz = NULL;
return -1;
}

cout << psz << endl;
}
free(psz);
psz = NULL;
mq_close(mq);




return 0;
}

OUTPUT

  [root@localhost mq]# ./send 
  2020 04 14 23 03 23
  2020 04 14 23 03 24
  2020 04 14 23 03 25
  2020 04 14 23 03 26
  2020 04 14 23 03 27
  2020 04 14 23 03 28
  2020 04 14 23 03 29
  2020 04 14 23 03 30
  2020 04 14 23 03 31
  2020 04 14 23 03 32
  2020 04 14 23 03 33
  2020 04 14 23 03 34
  2020 04 14 23 03 35
  2020 04 14 23 03 36
  2020 04 14 23 03 37
  2020 04 14 23 03 38
  2020 04 14 23 03 39
  2020 04 14 23 03 40
  2020 04 14 23 03 41
  2020 04 14 23 03 42

  [root@localhost mq]# ./recv 
  2020 04 14 23 03 23
  2020 04 14 23 03 24
  2020 04 14 23 03 25
  2020 04 14 23 03 26
  2020 04 14 23 03 27
  2020 04 14 23 03 28
  2020 04 14 23 03 29
  2020 04 14 23 03 30
  2020 04 14 23 03 31
  2020 04 14 23 03 32
  2020 04 14 23 03 33
  2020 04 14 23 03 34
  2020 04 14 23 03 35
  2020 04 14 23 03 36
  2020 04 14 23 03 37
  2020 04 14 23 03 38
  2020 04 14 23 03 39
  2020 04 14 23 03 40
  2020 04 14 23 03 41
  2020 04 14 23 03 42
  ^C

ending

71944339_p0_master1200.jpg

----------- ending -----------