TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

epoll惊群

参考

再谈Linux epoll惊群问题的原因和解决方案


accept惊群问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
 * Walk a wait queue and invoke each entry's wake callback.
 *
 * wq_head:      wait queue to walk; the caller must hold wq_head->lock
 *               (asserted below).
 * mode, wake_flags, key: passed through unchanged to every callback.
 * nr_exclusive: number of WQ_FLAG_EXCLUSIVE entries that may be woken
 *               successfully before the walk stops. With a value of 0 the
 *               stop condition is never met, so every entry is visited.
 * bookmark:     optional marker entry used to pause a long walk and resume
 *               it on a later call; may be NULL.
 *
 * Returns the remaining nr_exclusive count.
 */
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key,
wait_queue_entry_t *bookmark)
{
wait_queue_entry_t *curr, *next;
int cnt = 0;

lockdep_assert_held(&wq_head->lock);

/*
 * Resuming a paused walk: continue with the entry after the bookmark and
 * take the bookmark off the list. Otherwise start from the first entry.
 */
if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
curr = list_next_entry(bookmark, entry);

list_del(&bookmark->entry);
bookmark->flags = 0;
} else
curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

/* The starting position is the list head itself: nothing to wake. */
if (&curr->entry == &wq_head->head)
return nr_exclusive;

list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
unsigned flags = curr->flags;
int ret;

/* Skip marker entries; they carry no waiter. */
if (flags & WQ_FLAG_BOOKMARK)
continue;

/* Invoke this entry's wake-up callback. */
ret = curr->func(curr, mode, wake_flags, key);
/* A negative return from the callback aborts the walk. */
if (ret < 0)
break;
/*
 * A non-zero return from an exclusive entry consumes one unit of
 * nr_exclusive; stop once it reaches zero. This is what turns
 * "wake many" into "wake one" for exclusive waiters.
 */
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;

/*
 * After more than WAITQUEUE_WALK_BREAK_CNT callbacks, and only if
 * entries remain, park the bookmark in front of the next entry and
 * stop so that a later call can resume from there.
 */
if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
(&next->entry != &wq_head->head)) {
bookmark->flags = WQ_FLAG_BOOKMARK;
list_add_tail(&bookmark->entry, &next->entry);
break;
}
}

return nr_exclusive;
}

WQ_FLAG_EXCLUSIVE标志可以确保只唤醒一个等待者(wake one),而不是唤醒所有等待者(wake many),从而解决accept惊群问题。

如果等待项带有WQ_FLAG_EXCLUSIVE标志且唤醒成功,则nr_exclusive减1,减到0时退出循环;nr_exclusive为1时,即唤醒一个就退出循环。

epoll惊群问题

linux 3.10.1 kernel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
 * Copy ready events from the list at "head" into the user-space buffer
 * described by priv.
 *
 * ep:   the eventpoll instance whose ready list (ep->rdllist) receives
 *       level-triggered items again after they have been reported.
 * head: list of ready epitems to process, linked through epi->rdllink.
 * priv: a struct ep_send_events_data carrying the user buffer (events) and
 *       its capacity (maxevents).
 *
 * Returns the number of events copied. If a copy to user space faults,
 * returns the number copied so far, or -EFAULT when none were copied.
 */
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
struct wakeup_source *ws;
poll_table pt;

/*
 * Poll table with a NULL callback.
 * NOTE(review): presumably so that polling below only reads the event mask
 * without queueing on any wait queue - confirm against init_poll_funcptr().
 */
init_poll_funcptr(&pt, NULL);

/* Stop when the list is drained or the user buffer is full. */
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);

/*
 * If the item's wakeup source is active, mark ep->ws as staying awake
 * before relaxing the item's own source.
 */
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}

/* First remove the item from the ready list being processed. */
list_del_init(&epi->rdllink);

/*
 * Ask the item for its event mask; items reporting nothing are dropped.
 * NOTE(review): ep_item_poll() is not shown in this excerpt.
 */
revents = ep_item_poll(epi, &pt);

if (revents) {
/*
 * Copy failed: put the item back at the front of "head" so the event
 * is not lost, then report what was delivered so far.
 */
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
/* One-shot: keep only the EP_PRIVATE_BITS of the requested mask. */
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
/*
 * Level-triggered (neither ONESHOT nor ET): the item goes straight
 * back onto the ready list ep->rdllist. Because that list is then
 * non-empty, the caller shown in the article wakes further waiters.
 */
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
}

return eventcnt;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
 * Detach the current ready list, hand it to the callback "sproc", then merge
 * events that arrived in the meantime and wake further waiters if anything is
 * still ready.
 *
 * ep:    the eventpoll instance to scan.
 * sproc: callback invoked as sproc(ep, &txlist, priv) on the detached list.
 * priv:  opaque pointer passed through to sproc.
 * depth: nesting level passed to mutex_lock_nested() for ep->mtx.
 *
 * Returns whatever sproc returned.
 *
 * NOTE(review): this excerpt is abridged (see the elision marker near the
 * end). "pwake" is only incremented in the lines shown, and no line here
 * moves items left on txlist back onto ep->rdllist - compare with the
 * upstream source before relying on this listing.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv,
int depth)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);

mutex_lock_nested(&ep->mtx, depth);

/*
 * Under ep->lock: move every ready item onto the private txlist and set
 * ep->ovflist to NULL.
 * NOTE(review): presumably a NULL ovflist tells the wakeup callback to chain
 * new events there while the lock is dropped - confirm against upstream.
 */
spin_lock_irqsave(&ep->lock, flags);
list_splice_init(&ep->rdllist, &txlist);
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);

error = (*sproc)(ep, &txlist, priv);

/*
 * For level-triggered items the callback (ep_send_events_proc in the
 * article) has already put them back on ep->rdllist.
 */
spin_lock_irqsave(&ep->lock, flags);

/*
 * Walk the ep->ovflist chain, unlinking each item from it and appending to
 * the ready list any item that is not already linked there.
 */
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {

if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
ep->ovflist = EP_UNACTIVE_PTR;

/*
 * The ready list is non-empty again (for example because a level-triggered
 * item was re-queued), so waiters on ep->wq are woken here. The article
 * identifies this wake-up as the source of the LT-mode thundering herd.
 */
if (!list_empty(&ep->rdllist)) {
/*
 * Wake up (if active) both the eventpoll wait list and
 * the ->poll() wait list (delayed after we release the lock).
 */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);

mutex_unlock(&ep->mtx);
// ... (remainder of the function elided in this excerpt)
return error;
}


LT模式下惊群的原因:如果是水平触发,epitem从就绪链表中取出并发送给用户空间后,又会被重新加入就绪链表;随后内核判断就绪链表非空,就会唤醒其他阻塞在epoll_wait上的进程/线程。这里的关键是epoll是怎么唤醒的。

复现

用《再谈Linux epoll惊群问题的原因和解决方案》一文作者提供的demo进行测试。

里面有个值需要改大一点才能复现出来,这个彩蛋先留着:本文只复制人家的demo,不做修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <time.h>
#include <signal.h>

/*
 * Number of worker processes forked by main(); also the size of pid[].
 * NOTE(review): with a single worker there is no second epoll_wait() caller
 * that could be woken spuriously; presumably this is the value the article
 * says must be raised to reproduce the problem - confirm.
 */
#define COUNT 1

/* Trigger mode from argv[1]: 0 = level-triggered, 1 = EPOLLET, 2 = EPOLLONESHOT. */
int mode = 0;
/* Seconds each worker sleeps between epoll_wait() and accept(); 0 disables the delay. */
int slp = 0;

/* Process ids of the forked workers, filled in by the parent in main(). */
int pid[COUNT] = {0};
/* Per-process count of connections this worker has accepted successfully. */
int count = 0;

/*
 * Worker loop run by every forked child: block in epoll_wait() on the shared
 * epoll instance and try to accept() one connection per wakeup.
 *
 * epfd: epoll descriptor created by the parent before fork(), so all workers
 *       wait on the same instance.
 *
 * Never returns. The process ends through exit(1) if the event buffer cannot
 * be allocated, or from a signal handler.
 */
void server(int epfd)
{
    enum { MAX_EVENTS = 64 };
    struct epoll_event *events;

    events = calloc(MAX_EVENTS, sizeof *events);
    if (events == NULL) {
        /*
         * Without this check a NULL buffer would make every epoll_wait()
         * fail and the loop below would spin forever.
         */
        perror("calloc");
        exit(1);
    }

    while (1) {
        struct sockaddr in_addr;
        socklen_t in_len = sizeof(in_addr);
        int num, sd, csd;

        num = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (num <= 0) {
            /* Interrupted by a signal or nothing reported: wait again. */
            continue;
        }
        /*
         * Optional experiment: a minimal delay before accept().
         *
        struct timespec ts;
        ts.tv_sec = 0;
        ts.tv_nsec = 1;
        if(nanosleep(&ts, NULL) != 0) {
            perror("nanosleep");
            exit(1);
        }
        */
        /* Used to test the case where events are lost in ET mode. */
        if (slp) {
            sleep(slp);
        }

        /* Only the listening socket is registered, so the first event is enough. */
        sd = events[0].data.fd;

        csd = accept(sd, &in_addr, &in_len);
        if (csd == -1) {
            /* Seeing this line means the epoll LT thundering herd struck. */
            printf("shit xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:%d\n", getpid());
            continue;
        }
        /* Total number of requests this process has handled successfully. */
        count++;
        printf("get client:%d\n", getpid());
        close(csd);
    }
}

/*
 * SIGUSR1 handler installed in each worker: print this process's id together
 * with the number of requests it handled, then terminate the process.
 *
 * NOTE: printf() and exit() are not async-signal-safe; this is tolerated
 * here only because the program is a throwaway demo.
 */
static void siguser_handler(int sig)
{
    const pid_t self = getpid();
    const int handled = count;

    (void)sig;

    /* When the parent is stopped with Ctrl-C, every child reports its total. */
    printf("pid:%d count:%d\n", self, handled);
    exit(0);
}

/*
 * SIGINT handler: ask every forked worker (via SIGUSR1) to print how many
 * requests it handled.
 *
 * Only strictly positive entries of pid[] are signalled. kill() treats pid 0
 * as "every process in my process group" and pid -1 as "every process I am
 * allowed to signal", so a slot that is still zero (for example in a child
 * that inherited this handler together with a partially filled table) or
 * that holds a failed fork() result must never be passed to kill().
 */
static void sigint_handler(int sig)
{
    int i;

    (void)sig;

    for (i = 0; i < COUNT; i++) {
        if (pid[i] > 0) {
            kill(pid[i], SIGUSR1);
        }
    }
}

/*
 * Demo entry point: create a non-blocking listening socket, register it with
 * one epoll instance, then fork COUNT workers that all call epoll_wait() on
 * that shared instance.
 *
 * argv[1]: trigger mode - 0 = level-triggered, 1 = EPOLLET, 2 = EPOLLONESHOT
 * argv[2]: TCP port to listen on
 * argv[3]: seconds each worker sleeps before accept() (0 = no delay); a delay
 *          makes the problem easier to reproduce
 *
 * Returns 0 once the workers have exited. Setup failures print the failing
 * call and terminate the process (exit status 1, or abort() for the epoll
 * calls).
 */
int main (int argc, char *argv[])
{
    int ret = 0;
    int listener;
    int c = 0;
    struct sockaddr_in saddr = {0};   /* zeroed so the padding bytes are defined */
    int port;
    int status;
    int flags;
    int epfd;
    struct epoll_event event = {0};

    if (argc < 4) {
        fprintf(stderr,
                "usage: %s <mode: 0=LT 1=ET 2=ONESHOT> <port> <sleep-seconds>\n",
                argc > 0 ? argv[0] : "epoll_demo");
        exit(1);
    }

    /* 0 selects LT mode, 1 selects ET mode. */
    mode = atoi(argv[1]);
    port = atoi(argv[2]);
    /* Whether to stall before accept(); makes the problem easier to reproduce. */
    slp = atoi(argv[3]);

    signal(SIGINT, sigint_handler);

    listener = socket(PF_INET, SOCK_STREAM, 0);
    if (listener == -1) {
        perror("socket");
        exit(1);
    }

    saddr.sin_family = AF_INET;
    saddr.sin_port = htons(port);
    saddr.sin_addr.s_addr = INADDR_ANY;

    if (bind(listener, (struct sockaddr*)&saddr, sizeof(saddr)) == -1) {
        perror("bind");
        exit(1);
    }
    if (listen(listener, SOMAXCONN) == -1) {
        perror("listen");
        exit(1);
    }

    /* Non-blocking, so a worker that loses the race gets an error from accept(). */
    flags = fcntl (listener, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl(F_GETFL)");
        exit(1);
    }
    flags |= O_NONBLOCK;
    if (fcntl (listener, F_SETFL, flags) == -1) {
        perror("fcntl(F_SETFL)");
        exit(1);
    }

    epfd = epoll_create(64);
    if (epfd == -1) {
        perror("epoll_create");
        abort();
    }

    event.data.fd = listener;
    event.events = EPOLLIN;
    if (mode == 1) {
        event.events |= EPOLLET;
    } else if (mode == 2) {
        event.events |= EPOLLONESHOT;
    }

    ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &event);
    if (ret == -1) {
        perror("epoll_ctl");
        abort();
    }

    for (c = 0; c < COUNT; c++) {
        int child;

        /* Flush first so buffered output is not duplicated into the child. */
        fflush(stdout);

        child = fork();
        if (child == -1) {
            /*
             * Never store -1 in pid[]: the SIGINT handler passes these
             * values to kill(). Stop the workers already started and quit.
             */
            perror("fork");
            while (c-- > 0) {
                kill(pid[c], SIGTERM);
            }
            exit(1);
        }
        if (child == 0) {
            /*
             * The inherited SIGINT handler belongs to the parent and would
             * run here with an incomplete pid[] table; workers are stopped
             * through SIGUSR1 instead.
             */
            signal(SIGINT, SIG_IGN);
            /* Install the handler that prints this worker's count. */
            signal(SIGUSR1, siguser_handler);
            server(epfd);
            exit(0);   /* not reached: server() loops forever */
        }
        pid[c] = child;
        printf("server:%d pid:%d\n", c+1, child);
    }

    /* Reap workers until none are left (or the wait is interrupted). */
    while (wait(&status) > 0) {
    }

    close (epfd);
    close (listener);
    return 0;
}

遗留问题

在有的服务器上测试上面的代码可以复现(试了kernel 2.6版本和3.10版本),但在我的虚拟机(3.10版本)上死活复现不出来,不知道具体原因。按照上面csdn那篇文章的理解,是会出现这种情况的(前提是:一个epoll_wait释放了wait链后,其他epoll_wait的wait还是生效的;不过分析下来感觉应该是生效的,也可能跟监控的是进程还是线程有关系吧)。这个留作以后有时间了再研究吧。


ending

71671791_p0_master1200_lit.jpg

----------- ending -----------