
linux - mutexes and condition variables

References

POSIX Thread Programming Guide (3)

A brief analysis of pthread_mutex_lock and pthread_mutex_unlock in the glibc NPTL library

Concepts

Mutexes and condition variables come from the POSIX.1 threads standard. They can always be used to synchronize the threads within a single process, and if a mutex or condition variable is placed in a memory region shared by multiple processes, POSIX also allows it to be used for synchronization between those processes.

Posix mutex

A mutex (from "mutual exclusion") is a mechanism used in multithreaded programming to prevent two threads from reading and writing the same shared resource (such as a global variable) at the same time. This is achieved by dividing the code into critical sections: a critical section is a piece of code that accesses a shared resource, not a mechanism or algorithm in itself. A program, process, or thread can have multiple critical sections, and a mutex is not necessarily the way they are protected.
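
A minimal, hypothetical counter example (not from the referenced articles; compile with gcc -pthread) showing a mutex guarding a critical section:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static long counter = 0;

static void *
worker (void *arg)
{
  for (int i = 0; i < 100000; ++i)
    {
      pthread_mutex_lock (&lock);    /* enter the critical section */
      ++counter;                     /* access the shared resource */
      pthread_mutex_unlock (&lock);  /* leave the critical section */
    }
  return NULL;
}

int
main (void)
{
  pthread_t t1, t2;
  pthread_create (&t1, NULL, worker, NULL);
  pthread_create (&t2, NULL, worker, NULL);
  pthread_join (t1, NULL);
  pthread_join (t2, NULL);
  printf ("counter = %ld\n", counter);  /* always 200000 with the mutex */
  return 0;
}

Without the mutex the two threads race on counter and the final value is unpredictable.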

Condition variables in POSIX threads

In pthreads, a condition variable is effectively a queue of blocked threads sleeping on it. Each condition variable must be used together with a mutex (and it is recommended that it be exactly one mutex). A thread first acquires the mutex, then checks or modifies the "condition"; if the condition does not hold, it calls pthread_cond_wait(), which performs three operations in sequence:

1 Unlock the current mutex (so that other threads can access or modify the "condition")

2 Block and suspend the calling thread on the condition variable's wait queue

3 After being woken from the wait queue by another thread's signal, try to re-lock the associated mutex; if the lock cannot be acquired immediately, block on the mutex. pthread_cond_wait() does not return until the thread holds the associated mutex.

When a thread leaves the "condition" critical section, it must unlock the current mutex.
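
A minimal sketch of the pattern just described (hypothetical names; compile with -pthread). The predicate is re-tested in a loop, because pthread_cond_wait() may wake spuriously and the "condition" is only meaningful while the mutex is held:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;                  /* the "condition" */

static void *
waiter (void *arg)
{
  pthread_mutex_lock (&lock);          /* lock before inspecting the condition */
  while (!ready)                       /* re-test: wakeups may be spurious */
    pthread_cond_wait (&cond, &lock);  /* steps 1-3: unlock, sleep, relock */
  printf ("condition is now true\n");
  pthread_mutex_unlock (&lock);        /* leave the "condition" critical section */
  return NULL;
}

int
main (void)
{
  pthread_t t;
  pthread_create (&t, NULL, waiter, NULL);

  pthread_mutex_lock (&lock);
  ready = 1;                           /* modify the condition under the lock */
  pthread_cond_signal (&cond);         /* wake one waiter */
  pthread_mutex_unlock (&lock);

  pthread_join (t, NULL);
  return 0;
}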

NPTL

Native POSIX Thread Library

The Native POSIX Thread Library (NPTL) is the library that implements the POSIX Threads standard on the Linux kernel.

Before Linux kernel 2.6, the process was the (smallest) schedulable unit, and Linux had no true thread support. However, the kernel had a system call, clone(), which creates a copy of the calling process that shares the caller's address space. The LinuxThreads project used this system call to provide kernel-level thread support. But this solution had a number of incompatibilities with the actual POSIX standard, especially in signal handling, scheduling, and inter-process synchronization primitives.

It was clear that improving LinuxThreads' performance would require kernel support and a rewritten thread library. Two competing projects emerged to address this: one from an IBM team, called NGPT (Next Generation POSIX Threads), and another from a team of Red Hat developers. NGPT was abandoned in mid-2003, at almost the same time NPTL was published.

NPTL first shipped with Red Hat Linux 9. Previously, the old-style Linux POSIX threads implementation would occasionally fail to create threads, because the system did not take the opportunity to preempt when a new thread started; Windows systems at the time handled this problem better. Red Hat published an article about Java on Red Hat Linux 9 stating that NPTL fixed this problem [3].

NPTL has been part of Red Hat Enterprise Linux since version 3, and of the Linux kernel since 2.6. Today it is fully integrated into the GNU C Library.

NPTL takes an approach similar to LinuxThreads: the primary abstraction the kernel sees is still a process, and new threads are created with the clone() system call. However, NPTL requires special kernel support for the contended case of synchronization primitives, where threads must be able to sleep and wake up again. The primitive used for this is called a futex.

NPTL is a so-called 1:1 threading library: threads created by the user map one-to-one onto schedulable objects in the kernel. This is the simplest of all threading schemes.

Mutexes

GNU C Library (GNU libc) stable release version 2.17, by Roland McGrath et al.


pthread_mutex_t

/* Data structures for mutex handling.  The structure of the attribute
   type is deliberately not exposed.  */
typedef union
{
  struct __pthread_mutex_s
  {
    int __lock;
    unsigned int __count;
    int __owner;
    unsigned int __nusers;
    /* KIND must stay at this position in the structure to maintain
       binary compatibility.  */
    int __kind;
    int __spins;
    __pthread_list_t __list;
#define __PTHREAD_MUTEX_HAVE_PREV 1
  } __data;
  char __size[__SIZEOF_PTHREAD_MUTEX_T];
  long int __align;
} pthread_mutex_t;

PTHREAD_MUTEX_INITIALIZER

Macro for static initialization

static pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER; 
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

/* Mutex initializers. */
#ifdef __PTHREAD_MUTEX_HAVE_PREV
# define PTHREAD_MUTEX_INITIALIZER \
{ { 0, 0, 0, 0, 0, 0, { 0, 0 } } }

pthread_mutex_init

Function for dynamic initialization (for mutexes allocated with malloc or placed in shared memory)

int
__pthread_mutex_init (mutex, mutexattr)
pthread_mutex_t *mutex;
const pthread_mutexattr_t *mutexattr;
{
const struct pthread_mutexattr *imutexattr;

assert (sizeof (pthread_mutex_t) <= __SIZEOF_PTHREAD_MUTEX_T);

imutexattr = (const struct pthread_mutexattr *) mutexattr ?: &default_attr;

/* Sanity checks. */
switch (__builtin_expect (imutexattr->mutexkind
& PTHREAD_MUTEXATTR_PROTOCOL_MASK,
PTHREAD_PRIO_NONE
<< PTHREAD_MUTEXATTR_PROTOCOL_SHIFT))
{
case PTHREAD_PRIO_NONE << PTHREAD_MUTEXATTR_PROTOCOL_SHIFT:
break;

case PTHREAD_PRIO_INHERIT << PTHREAD_MUTEXATTR_PROTOCOL_SHIFT:
#ifndef __ASSUME_FUTEX_LOCK_PI
if (__builtin_expect (tpi_supported == 0, 0))
{
int lock = 0;
INTERNAL_SYSCALL_DECL (err);
int ret = INTERNAL_SYSCALL (futex, err, 4, &lock, FUTEX_UNLOCK_PI,
0, 0);
assert (INTERNAL_SYSCALL_ERROR_P (ret, err));
tpi_supported = INTERNAL_SYSCALL_ERRNO (ret, err) == ENOSYS ? -1 : 1;
}
if (__builtin_expect (tpi_supported < 0, 0))
return ENOTSUP;
#endif
break;

default:
/* XXX: For now we don't support robust priority protected mutexes. */
if (imutexattr->mutexkind & PTHREAD_MUTEXATTR_FLAG_ROBUST)
return ENOTSUP;
break;
}

/* Clear the whole variable. */
memset (mutex, '\0', __SIZEOF_PTHREAD_MUTEX_T);

/* Copy the values from the attribute. */
mutex->__data.__kind = imutexattr->mutexkind & ~PTHREAD_MUTEXATTR_FLAG_BITS;

if ((imutexattr->mutexkind & PTHREAD_MUTEXATTR_FLAG_ROBUST) != 0)
{
#ifndef __ASSUME_SET_ROBUST_LIST
if ((imutexattr->mutexkind & PTHREAD_MUTEXATTR_FLAG_PSHARED) != 0
&& __set_robust_list_avail < 0)
return ENOTSUP;
#endif

mutex->__data.__kind |= PTHREAD_MUTEX_ROBUST_NORMAL_NP;
}

switch (imutexattr->mutexkind & PTHREAD_MUTEXATTR_PROTOCOL_MASK)
{
case PTHREAD_PRIO_INHERIT << PTHREAD_MUTEXATTR_PROTOCOL_SHIFT:
mutex->__data.__kind |= PTHREAD_MUTEX_PRIO_INHERIT_NP;
break;

case PTHREAD_PRIO_PROTECT << PTHREAD_MUTEXATTR_PROTOCOL_SHIFT:
mutex->__data.__kind |= PTHREAD_MUTEX_PRIO_PROTECT_NP;

int ceiling = (imutexattr->mutexkind
& PTHREAD_MUTEXATTR_PRIO_CEILING_MASK)
>> PTHREAD_MUTEXATTR_PRIO_CEILING_SHIFT;
if (! ceiling)
{
if (__sched_fifo_min_prio == -1)
__init_sched_fifo_prio ();
if (ceiling < __sched_fifo_min_prio)
ceiling = __sched_fifo_min_prio;
}
mutex->__data.__lock = ceiling << PTHREAD_MUTEX_PRIO_CEILING_SHIFT;
break;

default:
break;
}

/* The kernel when waking robust mutexes on exit never uses
FUTEX_PRIVATE_FLAG FUTEX_WAKE. */
if ((imutexattr->mutexkind & (PTHREAD_MUTEXATTR_FLAG_PSHARED
| PTHREAD_MUTEXATTR_FLAG_ROBUST)) != 0)
mutex->__data.__kind |= PTHREAD_MUTEX_PSHARED_BIT;

/* Default values: mutex not used yet. */
// mutex->__count = 0; already done by memset
// mutex->__owner = 0; already done by memset
// mutex->__nusers = 0; already done by memset
// mutex->__spins = 0; already done by memset
// mutex->__next = NULL; already done by memset

LIBC_PROBE (mutex_init, 1, mutex);

return 0;
}

Fairly involved; not examined further here.

From UNP (UNIX Network Programming):

You may come across code that omits the initialization step, because the implementation in question defines the initializer constant as 0 (and statically allocated variables are automatically initialized to 0); however, such code is incorrect.
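
For the shared-memory case mentioned above, a sketch of dynamic initialization with a process-shared attribute (a hypothetical Linux example using an anonymous shared mapping; most error handling omitted):

#include <pthread.h>
#include <stdio.h>
#include <sys/mman.h>

int
main (void)
{
  /* Place the mutex in memory that can be shared across processes.  */
  pthread_mutex_t *m = mmap (NULL, sizeof (pthread_mutex_t),
                             PROT_READ | PROT_WRITE,
                             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
  if (m == MAP_FAILED)
    return 1;

  pthread_mutexattr_t attr;
  pthread_mutexattr_init (&attr);
  pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED);
  pthread_mutex_init (m, &attr);   /* dynamic init; the static macro cannot set attributes */
  pthread_mutexattr_destroy (&attr);

  /* A child created with fork() after this point could lock m as well.  */
  pthread_mutex_lock (m);
  pthread_mutex_unlock (m);

  pthread_mutex_destroy (m);
  munmap (m, sizeof (pthread_mutex_t));
  return 0;
}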


__pthread_mutex_lock

//pthread_mutex_lock.c
int
__pthread_mutex_lock (mutex)
pthread_mutex_t *mutex;
{
assert (sizeof (mutex->__size) >= sizeof (mutex->__data));

unsigned int type = PTHREAD_MUTEX_TYPE (mutex);

LIBC_PROBE (mutex_entry, 1, mutex);

if (__builtin_expect (type & ~PTHREAD_MUTEX_KIND_MASK_NP, 0))
return __pthread_mutex_lock_full (mutex);

pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
== PTHREAD_MUTEX_TIMED_NP)
{
simple:
/* Normal mutex. */
LLL_MUTEX_LOCK (mutex);
assert (mutex->__data.__owner == 0);
}
else if (__builtin_expect (type == PTHREAD_MUTEX_RECURSIVE_NP, 1))
{
// recursive mutex
/* Recursive mutex. */

/* Check whether we already hold the mutex. */
if (mutex->__data.__owner == id)
{
/* Just bump the counter. */
if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
/* Overflow of the counter. */
return EAGAIN;

++mutex->__data.__count;

return 0;
}

/* We have to get the mutex. */
LLL_MUTEX_LOCK (mutex);

assert (mutex->__data.__owner == 0);
mutex->__data.__count = 1;
}
else if (__builtin_expect (type == PTHREAD_MUTEX_ADAPTIVE_NP, 1))
{
if (! __is_smp)
goto simple;

if (LLL_MUTEX_TRYLOCK (mutex) != 0)
{
int cnt = 0;
int max_cnt = MIN (MAX_ADAPTIVE_COUNT,
mutex->__data.__spins * 2 + 10);
do
{
if (cnt++ >= max_cnt)
{
LLL_MUTEX_LOCK (mutex);
break;
}

#ifdef BUSY_WAIT_NOP
BUSY_WAIT_NOP;
#endif
}
while (LLL_MUTEX_TRYLOCK (mutex) != 0);

mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
}
assert (mutex->__data.__owner == 0);
}
else
{
assert (type == PTHREAD_MUTEX_ERRORCHECK_NP);
/* Check whether we already hold the mutex. */
if (__builtin_expect (mutex->__data.__owner == id, 0))
return EDEADLK;
goto simple;
}

/* Record the ownership. */
mutex->__data.__owner = id;
#ifndef NO_INCR
++mutex->__data.__nusers;
#endif

LIBC_PROBE (mutex_acquired, 1, mutex);

return 0;
}

  • For a normal mutex, LLL_MUTEX_LOCK performs a CAS (compare-and-swap); if that fails, it falls back to the sys_futex system call and traps into the kernel (a simplified sketch of this fast/slow path follows this list)
  • For a recursive mutex, it first checks whether the current process (thread) already holds the lock and, if so, increments the count; if the incremented count wraps around to 0, it returns EAGAIN
  • For an adaptive mutex, it spins on the non-blocking LLL_MUTEX_TRYLOCK up to a maximum spin count, then falls back to LLL_MUTEX_LOCK
  • For an error-checking mutex, it checks whether the current process (thread) is locking a mutex it already holds, and returns EDEADLK in that case
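
To make the fast/slow path concrete, here is a deliberately simplified toy lock in the spirit of the glibc low-level lock, not the actual implementation (toy_lock and toy_unlock are hypothetical names; the protocol is 0 = unlocked, 1 = locked, 2 = locked with possible waiters):

#include <linux/futex.h>
#include <stdatomic.h>
#include <sys/syscall.h>
#include <unistd.h>

static void
toy_lock (atomic_int *lock)
{
  int expected = 0;
  /* Fast path: one CAS in user space, no system call.  */
  if (atomic_compare_exchange_strong (lock, &expected, 1))
    return;
  /* Slow path: mark the lock contended and sleep in the kernel.  */
  do
    {
      if (expected == 2
          || atomic_compare_exchange_strong (lock, &expected, 2))
        syscall (SYS_futex, lock, FUTEX_WAIT, 2, NULL, NULL, 0);
      expected = 0;
    }
  while (!atomic_compare_exchange_strong (lock, &expected, 2));
}

static void
toy_unlock (atomic_int *lock)
{
  /* If the old value was 2, a waiter may be asleep: wake one thread.  */
  if (atomic_exchange (lock, 0) == 2)
    syscall (SYS_futex, lock, FUTEX_WAKE, 1, NULL, NULL, 0);
}

int
main (void)
{
  atomic_int lock = 0;
  toy_lock (&lock);
  toy_unlock (&lock);
  return 0;
}

The uncontended path costs a single CAS in user space; only under contention does the thread enter the kernel via the futex system call, which is exactly the division of labor described in the list above.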

__pthread_mutex_unlock

int
__pthread_mutex_unlock (mutex)
     pthread_mutex_t *mutex;
{
  return __pthread_mutex_unlock_usercnt (mutex, 1);
}

int
internal_function attribute_hidden
__pthread_mutex_unlock_usercnt (mutex, decr)
     pthread_mutex_t *mutex;
     int decr;
{
  int type = PTHREAD_MUTEX_TYPE (mutex);
  if (__builtin_expect (type & ~PTHREAD_MUTEX_KIND_MASK_NP, 0))
    return __pthread_mutex_unlock_full (mutex, decr);

  if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
      == PTHREAD_MUTEX_TIMED_NP)
    {
      /* Always reset the owner field.  */
    normal:
      mutex->__data.__owner = 0;
      if (decr)
        /* One less user.  */
        --mutex->__data.__nusers;

      /* Unlock.  */
      lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex));

      LIBC_PROBE (mutex_release, 1, mutex);

      return 0;
    }
  else if (__builtin_expect (type == PTHREAD_MUTEX_RECURSIVE_NP, 1))
    {
      /* Recursive mutex.  */
      if (mutex->__data.__owner != THREAD_GETMEM (THREAD_SELF, tid))
        return EPERM;

      if (--mutex->__data.__count != 0)
        /* We still hold the mutex.  */
        return 0;
      goto normal;
    }
  else if (__builtin_expect (type == PTHREAD_MUTEX_ADAPTIVE_NP, 1))
    goto normal;
  else
    {
      /* Error checking mutex.  */
      assert (type == PTHREAD_MUTEX_ERRORCHECK_NP);
      if (mutex->__data.__owner != THREAD_GETMEM (THREAD_SELF, tid)
          || ! lll_islocked (mutex->__data.__lock))
        return EPERM;
      goto normal;
    }
}
  • For a normal mutex, call lll_unlock
  • For a recursive mutex, check that the current process (thread) holds the lock (otherwise return EPERM); decrement the count, and if it is still non-zero, return while keeping the lock held
  • For an adaptive mutex, call lll_unlock
  • For an error-checking mutex, check that the current process (thread) holds the lock and that it is actually locked; if so, call lll_unlock, otherwise return EPERM

__pthread_mutex_trylock

int
__pthread_mutex_trylock (mutex)
pthread_mutex_t *mutex;
{
int oldval;
pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex),
PTHREAD_MUTEX_TIMED_NP))
{
/* Recursive mutex. */
case PTHREAD_MUTEX_RECURSIVE_NP:
/* Check whether we already hold the mutex. */
if (mutex->__data.__owner == id)
{
/* Just bump the counter. */
if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
/* Overflow of the counter. */
return EAGAIN;

++mutex->__data.__count;
return 0;
}

if (lll_trylock (mutex->__data.__lock) == 0)
{
/* Record the ownership. */
mutex->__data.__owner = id;
mutex->__data.__count = 1;
++mutex->__data.__nusers;
return 0;
}
break;

case PTHREAD_MUTEX_ERRORCHECK_NP:
case PTHREAD_MUTEX_TIMED_NP:
case PTHREAD_MUTEX_ADAPTIVE_NP:
/* Normal mutex. */
if (lll_trylock (mutex->__data.__lock) != 0)
break;

/* Record the ownership. */
mutex->__data.__owner = id;
++mutex->__data.__nusers;

return 0;

case PTHREAD_MUTEX_ROBUST_RECURSIVE_NP:
case PTHREAD_MUTEX_ROBUST_ERRORCHECK_NP:
case PTHREAD_MUTEX_ROBUST_NORMAL_NP:
case PTHREAD_MUTEX_ROBUST_ADAPTIVE_NP:
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
&mutex->__data.__list.__next);

oldval = mutex->__data.__lock;
do
{
again:
if ((oldval & FUTEX_OWNER_DIED) != 0)
{
/* The previous owner died. Try locking the mutex. */
int newval = id | (oldval & FUTEX_WAITERS);

newval
= atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
newval, oldval);

if (newval != oldval)
{
oldval = newval;
goto again;
}

/* We got the mutex. */
mutex->__data.__count = 1;
/* But it is inconsistent unless marked otherwise. */
mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;

ENQUEUE_MUTEX (mutex);
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

/* Note that we deliberately exist here. If we fall
through to the end of the function __nusers would be
incremented which is not correct because the old
owner has to be discounted. */
return EOWNERDEAD;
}

/* Check whether we already hold the mutex. */
if (__builtin_expect ((oldval & FUTEX_TID_MASK) == id, 0))
{
int kind = PTHREAD_MUTEX_TYPE (mutex);
if (kind == PTHREAD_MUTEX_ROBUST_ERRORCHECK_NP)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
NULL);
return EDEADLK;
}

if (kind == PTHREAD_MUTEX_ROBUST_RECURSIVE_NP)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
NULL);

/* Just bump the counter. */
if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
/* Overflow of the counter. */
return EAGAIN;

++mutex->__data.__count;

return 0;
}
}

oldval = lll_robust_trylock (mutex->__data.__lock, id);
if (oldval != 0 && (oldval & FUTEX_OWNER_DIED) == 0)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

return EBUSY;
}

if (__builtin_expect (mutex->__data.__owner
== PTHREAD_MUTEX_NOTRECOVERABLE, 0))
{
/* This mutex is now not recoverable. */
mutex->__data.__count = 0;
if (oldval == id)
lll_unlock (mutex->__data.__lock,
PTHREAD_ROBUST_MUTEX_PSHARED (mutex));
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
return ENOTRECOVERABLE;
}
}
while ((oldval & FUTEX_OWNER_DIED) != 0);

ENQUEUE_MUTEX (mutex);
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

mutex->__data.__owner = id;
++mutex->__data.__nusers;
mutex->__data.__count = 1;

return 0;

case PTHREAD_MUTEX_PI_RECURSIVE_NP:
case PTHREAD_MUTEX_PI_ERRORCHECK_NP:
case PTHREAD_MUTEX_PI_NORMAL_NP:
case PTHREAD_MUTEX_PI_ADAPTIVE_NP:
case PTHREAD_MUTEX_PI_ROBUST_RECURSIVE_NP:
case PTHREAD_MUTEX_PI_ROBUST_ERRORCHECK_NP:
case PTHREAD_MUTEX_PI_ROBUST_NORMAL_NP:
case PTHREAD_MUTEX_PI_ROBUST_ADAPTIVE_NP:
{
int kind = mutex->__data.__kind & PTHREAD_MUTEX_KIND_MASK_NP;
int robust = mutex->__data.__kind & PTHREAD_MUTEX_ROBUST_NORMAL_NP;

if (robust)
/* Note: robust PI futexes are signaled by setting bit 0. */
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending,
(void *) (((uintptr_t) &mutex->__data.__list.__next)
| 1));

oldval = mutex->__data.__lock;

/* Check whether we already hold the mutex. */
if (__builtin_expect ((oldval & FUTEX_TID_MASK) == id, 0))
{
if (kind == PTHREAD_MUTEX_ERRORCHECK_NP)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
return EDEADLK;
}

if (kind == PTHREAD_MUTEX_RECURSIVE_NP)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

/* Just bump the counter. */
if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
/* Overflow of the counter. */
return EAGAIN;

++mutex->__data.__count;

return 0;
}
}

oldval
= atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
id, 0);

if (oldval != 0)
{
if ((oldval & FUTEX_OWNER_DIED) == 0)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

return EBUSY;
}

assert (robust);

/* The mutex owner died. The kernel will now take care of
everything. */
int private = (robust
? PTHREAD_ROBUST_MUTEX_PSHARED (mutex)
: PTHREAD_MUTEX_PSHARED (mutex));
INTERNAL_SYSCALL_DECL (__err);
int e = INTERNAL_SYSCALL (futex, __err, 4, &mutex->__data.__lock,
__lll_private_flag (FUTEX_TRYLOCK_PI,
private), 0, 0);

if (INTERNAL_SYSCALL_ERROR_P (e, __err)
&& INTERNAL_SYSCALL_ERRNO (e, __err) == EWOULDBLOCK)
{
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

return EBUSY;
}

oldval = mutex->__data.__lock;
}

if (__builtin_expect (oldval & FUTEX_OWNER_DIED, 0))
{
atomic_and (&mutex->__data.__lock, ~FUTEX_OWNER_DIED);

/* We got the mutex. */
mutex->__data.__count = 1;
/* But it is inconsistent unless marked otherwise. */
mutex->__data.__owner = PTHREAD_MUTEX_INCONSISTENT;

ENQUEUE_MUTEX (mutex);
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);

/* Note that we deliberately exit here. If we fall
through to the end of the function __nusers would be
incremented which is not correct because the old owner
has to be discounted. */
return EOWNERDEAD;
}

if (robust
&& __builtin_expect (mutex->__data.__owner
== PTHREAD_MUTEX_NOTRECOVERABLE, 0))
{
/* This mutex is now not recoverable. */
mutex->__data.__count = 0;

INTERNAL_SYSCALL_DECL (__err);
INTERNAL_SYSCALL (futex, __err, 4, &mutex->__data.__lock,
__lll_private_flag (FUTEX_UNLOCK_PI,
PTHREAD_ROBUST_MUTEX_PSHARED (mutex)),
0, 0);

THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
return ENOTRECOVERABLE;
}

if (robust)
{
ENQUEUE_MUTEX_PI (mutex);
THREAD_SETMEM (THREAD_SELF, robust_head.list_op_pending, NULL);
}

mutex->__data.__owner = id;
++mutex->__data.__nusers;
mutex->__data.__count = 1;

return 0;
}

case PTHREAD_MUTEX_PP_RECURSIVE_NP:
case PTHREAD_MUTEX_PP_ERRORCHECK_NP:
case PTHREAD_MUTEX_PP_NORMAL_NP:
case PTHREAD_MUTEX_PP_ADAPTIVE_NP:
{
int kind = mutex->__data.__kind & PTHREAD_MUTEX_KIND_MASK_NP;

oldval = mutex->__data.__lock;

/* Check whether we already hold the mutex. */
if (mutex->__data.__owner == id)
{
if (kind == PTHREAD_MUTEX_ERRORCHECK_NP)
return EDEADLK;

if (kind == PTHREAD_MUTEX_RECURSIVE_NP)
{
/* Just bump the counter. */
if (__builtin_expect (mutex->__data.__count + 1 == 0, 0))
/* Overflow of the counter. */
return EAGAIN;

++mutex->__data.__count;

return 0;
}
}

int oldprio = -1, ceilval;
do
{
int ceiling = (oldval & PTHREAD_MUTEX_PRIO_CEILING_MASK)
>> PTHREAD_MUTEX_PRIO_CEILING_SHIFT;

if (__pthread_current_priority () > ceiling)
{
if (oldprio != -1)
__pthread_tpp_change_priority (oldprio, -1);
return EINVAL;
}

int retval = __pthread_tpp_change_priority (oldprio, ceiling);
if (retval)
return retval;

ceilval = ceiling << PTHREAD_MUTEX_PRIO_CEILING_SHIFT;
oldprio = ceiling;

oldval
= atomic_compare_and_exchange_val_acq (&mutex->__data.__lock,
ceilval | 1, ceilval);

if (oldval == ceilval)
break;
}
while ((oldval & PTHREAD_MUTEX_PRIO_CEILING_MASK) != ceilval);

if (oldval != ceilval)
{
__pthread_tpp_change_priority (oldprio, -1);
break;
}

assert (mutex->__data.__owner == 0);
/* Record the ownership. */
mutex->__data.__owner = id;
++mutex->__data.__nusers;
mutex->__data.__count = 1;

return 0;
}
break;

default:
/* Correct code cannot set any other type. */
return EINVAL;
}

return EBUSY;
}

Non-blocking lock acquisition; not analyzed in detail here.
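
The user-side behavior is simple, though; a minimal, hypothetical usage sketch in which pthread_mutex_trylock() returns 0 on success and EBUSY, without blocking, when the lock is already held:

#include <errno.h>
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static void
try_optional_work (void)
{
  int rc = pthread_mutex_trylock (&lock);
  if (rc == 0)
    {
      /* ... do the optional work ... */
      pthread_mutex_unlock (&lock);
    }
  else if (rc == EBUSY)
    printf ("lock held elsewhere, skipping\n");  /* did not block */
  else
    printf ("trylock failed: %d\n", rc);  /* e.g. EOWNERDEAD for robust mutexes */
}

int
main (void)
{
  try_optional_work ();
  return 0;
}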


__pthread_mutex_destroy

int
__pthread_mutex_destroy (mutex)
     pthread_mutex_t *mutex;
{
  LIBC_PROBE (mutex_destroy, 1, mutex);

  if ((mutex->__data.__kind & PTHREAD_MUTEX_ROBUST_NORMAL_NP) == 0
      && mutex->__data.__nusers != 0)
    return EBUSY;

  /* Set to an invalid value.  */
  mutex->__data.__kind = -1;

  return 0;
}

Destroys the mutex.


The four lock type attributes

POSIX Thread Programming Guide (3)

Mutex attributes are specified when the lock is created. The LinuxThreads implementation has only one attribute, the lock type, which determines how the lock behaves when a thread tries to lock a mutex that is already locked. Currently (glibc 2.2.3, linuxthreads 0.9) four values are available:

  • PTHREAD_MUTEX_TIMED_NP, the default, i.e. a normal lock. When one thread holds the lock, the other requesting threads form a wait queue and acquire the lock by priority after it is unlocked. This locking policy ensures fairness of resource allocation

  • PTHREAD_MUTEX_RECURSIVE_NP, a recursive (nested) lock, which allows the same thread to acquire the same lock multiple times and release it with the matching number of unlocks. If a different thread requests it, it competes for the lock when the holding thread unlocks (see the sketch after this list).

  • PTHREAD_MUTEX_ADAPTIVE_NP, an adaptive lock, the simplest type in behavior: it merely waits for the unlock and then competes again. The thread spins until it reaches a maximum spin count or acquires the lock (https://stackoverflow.com/questions/19863734/what-is-pthread-mutex-adaptive-np)

  • PTHREAD_MUTEX_ERRORCHECK_NP, an error-checking lock. If a thread requests a lock it already holds, EDEADLK is returned; otherwise it behaves the same as PTHREAD_MUTEX_TIMED_NP. This guarantees that the simplest kind of deadlock cannot occur when multiple locking is not allowed.
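
As mentioned in the recursive bullet above, the lock type is selected through a mutex attribute; a minimal sketch (hypothetical example; PTHREAD_MUTEX_RECURSIVE is the portable spelling of the *_NP type):

#include <pthread.h>

int
main (void)
{
  pthread_mutexattr_t attr;
  pthread_mutex_t m;

  pthread_mutexattr_init (&attr);
  pthread_mutexattr_settype (&attr, PTHREAD_MUTEX_RECURSIVE);
  pthread_mutex_init (&m, &attr);
  pthread_mutexattr_destroy (&attr);

  pthread_mutex_lock (&m);
  pthread_mutex_lock (&m);     /* same thread may lock again: count becomes 2 */
  pthread_mutex_unlock (&m);
  pthread_mutex_unlock (&m);   /* released only after the matching unlocks */

  pthread_mutex_destroy (&m);
  return 0;
}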

Condition variables

pthread_cond_t

/* Data structure for conditional variable handling.  The structure of
   the attribute type is deliberately not exposed.  */
typedef union
{
  struct
  {
    int __lock;
    unsigned int __futex;
    __extension__ unsigned long long int __total_seq;
    __extension__ unsigned long long int __wakeup_seq;
    __extension__ unsigned long long int __woken_seq;
    void *__mutex;
    unsigned int __nwaiters;
    unsigned int __broadcast_seq;
  } __data;
  char __size[__SIZEOF_PTHREAD_COND_T];
  __extension__ long long int __align;
} pthread_cond_t;


PTHREAD_COND_INITIALIZER

/* Conditional variable handling.  */
#define PTHREAD_COND_INITIALIZER { { 0, 0, 0, 0, 0, (void *) 0, 0, 0 } }

Static initialization


__pthread_cond_init

int
__pthread_cond_init (cond, cond_attr)
     pthread_cond_t *cond;
     const pthread_condattr_t *cond_attr;
{
  struct pthread_condattr *icond_attr = (struct pthread_condattr *) cond_attr;

  cond->__data.__lock = LLL_LOCK_INITIALIZER;
  cond->__data.__futex = 0;
  cond->__data.__nwaiters = (icond_attr != NULL
                             ? ((icond_attr->value >> 1)
                                & ((1 << COND_NWAITERS_SHIFT) - 1))
                             : CLOCK_REALTIME);
  cond->__data.__total_seq = 0;
  cond->__data.__wakeup_seq = 0;
  cond->__data.__woken_seq = 0;
  cond->__data.__mutex = (icond_attr == NULL || (icond_attr->value & 1) == 0
                          ? NULL : (void *) ~0l);
  cond->__data.__broadcast_seq = 0;

  LIBC_PROBE (cond_init, 2, cond, cond_attr);

  return 0;
}

Dynamic initialization
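
Note that __pthread_cond_init above stores a clock ID in the low bits of __nwaiters (CLOCK_REALTIME by default). Selecting a different clock for timed waits is a common reason to initialize dynamically; a minimal sketch (hypothetical example):

#include <pthread.h>
#include <time.h>

int
main (void)
{
  pthread_condattr_t attr;
  pthread_cond_t cond;

  pthread_condattr_init (&attr);
  /* Measure pthread_cond_timedwait deadlines against the monotonic clock,
     so that wall-clock adjustments do not affect the wait.  */
  pthread_condattr_setclock (&attr, CLOCK_MONOTONIC);
  pthread_cond_init (&cond, &attr);
  pthread_condattr_destroy (&attr);

  /* ... use cond, building abstime from clock_gettime (CLOCK_MONOTONIC, ...) ... */

  pthread_cond_destroy (&cond);
  return 0;
}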


__pthread_cond_wait

int
__pthread_cond_wait (cond, mutex)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int err;
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;

LIBC_PROBE (cond_wait, 2, cond, mutex);

/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);

/* Now we can release the mutex. */
err = __pthread_mutex_unlock_usercnt (mutex, 0);
if (__builtin_expect (err, 0))
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}

/* We have one new user of the condvar. */
++cond->__data.__total_seq;
++cond->__data.__futex;
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;

/* Remember the mutex we are using here. If there is already a
different address store this is a bad user bug. Do not store
anything for pshared condvars. */
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;

/* Prepare structure passed to cancellation handler. */
cbuffer.cond = cond;
cbuffer.mutex = mutex;

/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler. */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);

/* The current values of the wakeup counter. The "woken" counter
must exceed this value. */
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter. */
cbuffer.bc_seq = cond->__data.__broadcast_seq;

do
{
unsigned int futex_val = cond->__data.__futex;

/* Prepare to wait. Release the condvar futex. */
lll_unlock (cond->__data.__lock, pshared);

/* Enable asynchronous cancellation. Required by the standard. */
cbuffer.oldtype = __pthread_enable_asynccancel ();

/* Wait until woken by signal or broadcast. */
lll_futex_wait (&cond->__data.__futex, futex_val, pshared);

/* Disable asynchronous cancellation. */
__pthread_disable_asynccancel (cbuffer.oldtype);

/* We are going to look at shared data again, so get the lock. */
lll_lock (cond->__data.__lock, pshared);

/* If a broadcast happened, we are done. */
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;

/* Check whether we are eligible for wakeup. */
val = cond->__data.__wakeup_seq;
}
while (val == seq || cond->__data.__woken_seq == val);

/* Another thread woken up. */
++cond->__data.__woken_seq;

bc_out:

cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;

/* If pthread_cond_destroy was called on this varaible already,
notify the pthread_cond_destroy caller all waiters have left
and it can be successfully destroyed. */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);

/* We are done with the condvar. */
lll_unlock (cond->__data.__lock, pshared);

/* The cancellation handling is back to normal, remove the handler. */
__pthread_cleanup_pop (&buffer, 0);

/* Get the mutex before returning. */
return __pthread_mutex_cond_lock (mutex);
}

pthread_cond_wait() performs an unconditional wait. It must be used together with a mutex, to avoid the race condition that arises when multiple threads request pthread_cond_wait() (or pthread_cond_timedwait(), likewise below) at the same time. The mutex must be a normal lock (PTHREAD_MUTEX_TIMED_NP) or an adaptive lock (PTHREAD_MUTEX_ADAPTIVE_NP), and must be locked by the calling thread (pthread_mutex_lock()) before pthread_cond_wait() is called. The mutex stays locked until the condition wait queue has been updated, and is unlocked before the thread suspends and starts waiting. Before pthread_cond_wait() returns because the condition was satisfied, the mutex is locked again, mirroring the lock operation performed before entering pthread_cond_wait().

There are two ways to signal the condition: pthread_cond_signal() wakes one thread waiting on the condition (when several threads are waiting, one is woken in queue order), while pthread_cond_broadcast() wakes all waiting threads.


__pthread_cond_timedwait

int
__pthread_cond_timedwait (cond, mutex, abstime)
pthread_cond_t *cond;
pthread_mutex_t *mutex;
const struct timespec *abstime;
{
struct _pthread_cleanup_buffer buffer;
struct _condvar_cleanup_buffer cbuffer;
int result = 0;

/* Catch invalid parameters. */
if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
return EINVAL;

int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;

/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);

/* Now we can release the mutex. */
int err = __pthread_mutex_unlock_usercnt (mutex, 0);
if (err)
{
lll_unlock (cond->__data.__lock, pshared);
return err;
}

/* We have one new user of the condvar. */
++cond->__data.__total_seq;
++cond->__data.__futex;
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;

/* Work around the fact that the kernel rejects negative timeout values
despite them being valid. */
if (__builtin_expect (abstime->tv_sec < 0, 0))
goto timeout;

/* Remember the mutex we are using here. If there is already a
different address store this is a bad user bug. Do not store
anything for pshared condvars. */
if (cond->__data.__mutex != (void *) ~0l)
cond->__data.__mutex = mutex;

/* Prepare structure passed to cancellation handler. */
cbuffer.cond = cond;
cbuffer.mutex = mutex;

/* Before we block we enable cancellation. Therefore we have to
install a cancellation handler. */
__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);

/* The current values of the wakeup counter. The "woken" counter
must exceed this value. */
unsigned long long int val;
unsigned long long int seq;
val = seq = cond->__data.__wakeup_seq;
/* Remember the broadcast counter. */
cbuffer.bc_seq = cond->__data.__broadcast_seq;

while (1)
{
#if (!defined __ASSUME_FUTEX_CLOCK_REALTIME \
|| !defined lll_futex_timed_wait_bitset)
struct timespec rt;
{
# ifdef __NR_clock_gettime
INTERNAL_SYSCALL_DECL (err);
(void) INTERNAL_VSYSCALL (clock_gettime, err, 2,
(cond->__data.__nwaiters
& ((1 << COND_NWAITERS_SHIFT) - 1)),
&rt);
/* Convert the absolute timeout value to a relative timeout. */
rt.tv_sec = abstime->tv_sec - rt.tv_sec;
rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec;
# else
/* Get the current time. So far we support only one clock. */
struct timeval tv;
(void) gettimeofday (&tv, NULL);

/* Convert the absolute timeout value to a relative timeout. */
rt.tv_sec = abstime->tv_sec - tv.tv_sec;
rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
# endif
}
if (rt.tv_nsec < 0)
{
rt.tv_nsec += 1000000000;
--rt.tv_sec;
}
/* Did we already time out? */
if (__builtin_expect (rt.tv_sec < 0, 0))
{
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;

goto timeout;
}
#endif

unsigned int futex_val = cond->__data.__futex;

/* Prepare to wait. Release the condvar futex. */
lll_unlock (cond->__data.__lock, pshared);

/* Enable asynchronous cancellation. Required by the standard. */
cbuffer.oldtype = __pthread_enable_asynccancel ();

#if (!defined __ASSUME_FUTEX_CLOCK_REALTIME \
|| !defined lll_futex_timed_wait_bitset)
/* Wait until woken by signal or broadcast. */
err = lll_futex_timed_wait (&cond->__data.__futex,
futex_val, &rt, pshared);
#else
unsigned int clockbit = (cond->__data.__nwaiters & 1
? 0 : FUTEX_CLOCK_REALTIME);
err = lll_futex_timed_wait_bitset (&cond->__data.__futex, futex_val,
abstime, clockbit, pshared);
#endif

/* Disable asynchronous cancellation. */
__pthread_disable_asynccancel (cbuffer.oldtype);

/* We are going to look at shared data again, so get the lock. */
lll_lock (cond->__data.__lock, pshared);

/* If a broadcast happened, we are done. */
if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;

/* Check whether we are eligible for wakeup. */
val = cond->__data.__wakeup_seq;
if (val != seq && cond->__data.__woken_seq != val)
break;

/* Not woken yet. Maybe the time expired? */
if (__builtin_expect (err == -ETIMEDOUT, 0))
{
timeout:
/* Yep. Adjust the counters. */
++cond->__data.__wakeup_seq;
++cond->__data.__futex;

/* The error value. */
result = ETIMEDOUT;
break;
}
}

/* Another thread woken up. */
++cond->__data.__woken_seq;

bc_out:

cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;

/* If pthread_cond_destroy was called on this variable already,
notify the pthread_cond_destroy caller all waiters have left
and it can be successfully destroyed. */
if (cond->__data.__total_seq == -1ULL
&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);

/* We are done with the condvar. */
lll_unlock (cond->__data.__lock, pshared);

/* The cancellation handling is back to normal, remove the handler. */
__pthread_cleanup_pop (&buffer, 0);

/* Get the mutex before returning. */
err = __pthread_mutex_cond_lock (mutex);

return err ?: result;
}

__pthread_cond_timedwait() performs a timed wait and follows the same rules as pthread_cond_wait() above: it must be paired with a normal (PTHREAD_MUTEX_TIMED_NP) or adaptive (PTHREAD_MUTEX_ADAPTIVE_NP) mutex locked by the calling thread, the mutex is released while the thread sleeps, and it is re-acquired before the call returns. In addition, abstime is an absolute deadline; if it passes before the thread is woken, the call re-acquires the mutex and returns ETIMEDOUT.

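A minimal usage sketch (hypothetical example). abstime is an absolute deadline, so it is built by adding the timeout to the current time on the condvar's clock (CLOCK_REALTIME for a default-initialized condvar):

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <time.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;

int
main (void)
{
  struct timespec abstime;
  clock_gettime (CLOCK_REALTIME, &abstime);  /* default condvar clock */
  abstime.tv_sec += 2;                       /* absolute deadline: now + 2 s */

  pthread_mutex_lock (&lock);
  while (!ready)
    {
      int rc = pthread_cond_timedwait (&cond, &lock, &abstime);
      if (rc == ETIMEDOUT)
        {
          printf ("timed out, predicate still false\n");
          break;                             /* mutex is held again here */
        }
    }
  pthread_mutex_unlock (&lock);
  return 0;
}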


__pthread_cond_signal

int
__pthread_cond_signal (cond)
     pthread_cond_t *cond;
{
  int pshared = (cond->__data.__mutex == (void *) ~0l)
                ? LLL_SHARED : LLL_PRIVATE;

  LIBC_PROBE (cond_signal, 1, cond);

  /* Make sure we are alone.  */
  lll_lock (cond->__data.__lock, pshared);

  /* Are there any waiters to be woken?  */
  if (cond->__data.__total_seq > cond->__data.__wakeup_seq)
    {
      /* Yes.  Mark one of them as woken.  */
      ++cond->__data.__wakeup_seq;
      ++cond->__data.__futex;

      /* Wake one.  */
      if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,
                                                     1, &cond->__data.__lock,
                                                     pshared), 0))
        return 0;

      lll_futex_wake (&cond->__data.__futex, 1, pshared);
    }

  /* We are done.  */
  lll_unlock (cond->__data.__lock, pshared);

  return 0;
}

Wakes a single waiting thread via lll_futex_wake (&cond->__data.__futex, 1, pshared);


__pthread_cond_broadcast

int
__pthread_cond_broadcast (cond)
pthread_cond_t *cond;
{
LIBC_PROBE (cond_broadcast, 1, cond);

int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;
/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);

/* Are there any waiters to be woken? */
if (cond->__data.__total_seq > cond->__data.__wakeup_seq)
{
/* Yes. Mark them all as woken. */
cond->__data.__wakeup_seq = cond->__data.__total_seq;
cond->__data.__woken_seq = cond->__data.__total_seq;
cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;
int futex_val = cond->__data.__futex;
/* Signal that a broadcast happened. */
++cond->__data.__broadcast_seq;

/* We are done. */
lll_unlock (cond->__data.__lock, pshared);

/* Do not use requeue for pshared condvars. */
if (cond->__data.__mutex == (void *) ~0l)
goto wake_all;

/* Wake everybody. */
pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;

/* XXX: Kernel so far doesn't support requeue to PI futex. */
/* XXX: Kernel so far can only requeue to the same type of futex,
in this case private (we don't requeue for pshared condvars). */
if (__builtin_expect (mut->__data.__kind
& (PTHREAD_MUTEX_PRIO_INHERIT_NP
| PTHREAD_MUTEX_PSHARED_BIT), 0))
goto wake_all;

/* lll_futex_requeue returns 0 for success and non-zero
for errors. */
if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,
INT_MAX, &mut->__data.__lock,
futex_val, LLL_PRIVATE), 0))
{
/* The requeue functionality is not available. */
wake_all:
// INT_MAX wakes all waiting threads
lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);
}

/* That's all. */
return 0;
}

/* We are done. */
lll_unlock (cond->__data.__lock, pshared);

return 0;
}

Wakes all waiting threads via lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared); when possible, waiters are first requeued onto the mutex with lll_futex_requeue instead of all being woken at once.


pthread_cond_destroy

int
__pthread_cond_destroy (cond)
pthread_cond_t *cond;
{
int pshared = (cond->__data.__mutex == (void *) ~0l)
? LLL_SHARED : LLL_PRIVATE;

LIBC_PROBE (cond_destroy, 1, cond);

/* Make sure we are alone. */
lll_lock (cond->__data.__lock, pshared);

if (cond->__data.__total_seq > cond->__data.__wakeup_seq)
{
/* If there are still some waiters which have not been
woken up, this is an application bug. */
lll_unlock (cond->__data.__lock, pshared);
return EBUSY;
}

/* Tell pthread_cond_*wait that this condvar is being destroyed. */
cond->__data.__total_seq = -1ULL;

/* If there are waiters which have been already signalled or
broadcasted, but still are using the pthread_cond_t structure,
pthread_cond_destroy needs to wait for them. */
unsigned int nwaiters = cond->__data.__nwaiters;

if (nwaiters >= (1 << COND_NWAITERS_SHIFT))
{
/* Wake everybody on the associated mutex in case there are
threads that have been requeued to it.
Without this, pthread_cond_destroy could block potentially
for a long time or forever, as it would depend on other
thread's using the mutex.
When all threads waiting on the mutex are woken up, pthread_cond_wait
only waits for threads to acquire and release the internal
condvar lock. */
if (cond->__data.__mutex != NULL
&& cond->__data.__mutex != (void *) ~0l)
{
pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;
lll_futex_wake (&mut->__data.__lock, INT_MAX,
PTHREAD_MUTEX_PSHARED (mut));
}

do
{
lll_unlock (cond->__data.__lock, pshared);

lll_futex_wait (&cond->__data.__nwaiters, nwaiters, pshared);

lll_lock (cond->__data.__lock, pshared);

nwaiters = cond->__data.__nwaiters;
}
while (nwaiters >= (1 << COND_NWAITERS_SHIFT));
}

return 0;
}

A condition variable must be destroyed before it is freed or abandoned.

Summary

  • pthread_cleanup_push and pthread_cleanup_pop install a cancellation callback that runs inside the wait functions, so that a cancelled thread does not exit while still holding the lock, which would deadlock other threads (see the sketch after this list)
  • The condition variable mechanism is not async-signal safe: calling pthread_cond_signal() or pthread_cond_broadcast() from a signal handler can easily cause a deadlock
  • The wait functions unlock first and then join the sleep queue, so there is no busy-polling cost; after being "woken" by another thread, they re-acquire the lock
  • Before a wait function is called, the current thread must already hold the mutex, and the mutex must be a normal or adaptive lock
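
As referenced in the first bullet, a sketch of the same cleanup idea on the user side (hypothetical example). If the thread is cancelled while blocked in pthread_cond_wait(), the mutex is re-acquired before cancellation cleanup runs, so the handler must release it:

#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static void
unlock_on_cancel (void *arg)
{
  /* Runs on cancellation; pthread_cond_wait holds the mutex again here.  */
  pthread_mutex_unlock ((pthread_mutex_t *) arg);
}

static void *
waiter (void *arg)
{
  pthread_mutex_lock (&lock);
  pthread_cleanup_push (unlock_on_cancel, &lock);
  while (1)                       /* predicate intentionally never satisfied */
    pthread_cond_wait (&cond, &lock);
  pthread_cleanup_pop (0);        /* pairs with the push; not reached here */
  pthread_mutex_unlock (&lock);
  return NULL;
}

int
main (void)
{
  pthread_t t;
  pthread_create (&t, NULL, waiter, NULL);
  sleep (1);                      /* let the waiter block */
  pthread_cancel (t);             /* cleanup handler releases the mutex */
  pthread_join (t, NULL);
  return 0;
}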

ending

