Zach的博客

互斥锁和条件变量

并发编程的问题

考虑这样一个问题,有两个线程,A和B,他们对同一个全局变量执行递减操作。假设C编译器将递减运算转换成3条机器指令:从内存装载到寄存器、递减寄存器、从寄存器存储到内存。可能会出现以下情形:

  1. 线程A运行,把变量的值装载到一个寄存器中
  2. 系统把运行线程从A切换到B运行。A的寄存器被保存,B的寄存器则恢复。
  3. 线程B执行递减的全部操作,把新值存放到变量的变量中
  4. 线程A被恢复执行,A的寄存器被恢复,于是A从原来离开的地方开始执行,此时寄存器中保留的变量值是线程B执行递减前的值,这时候就出错了。

可以看到这种并发的错误是因为递减操作不是原子操作而造成的,线程的切换会中断某些步骤,从而出现不预期的错误。

简单实例

我们通过一个简单的例子来说明如何利用互斥锁来解决并发。程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define NLOOP 10
#define MAXP 5

int counter;

void *doit(void *);

int main(int argc, char **argv) {
pthread_t tids[MAXP];
int error;

for (int i=0; i<MAXP; i++) {
int *iptr = malloc(sizeof(int));
if (iptr == NULL) {
perror("malloc error");
return -1;
}
*iptr = i;
if ((error = pthread_create(&tids[i], NULL, doit, iptr)) != 0) {
errno = error;
perror("pthread create error");
return -1;
}
}

for (int i=0; i<MAXP; i++) {
if ((error = pthread_join(tids[i], NULL)) != 0) {
errno = error;
perror("pthread join error");
return -1;
}
}

return 0;
}

void *doit(void *arg) {
int val;
int idx = *((int *)arg);
free(arg);

for (int i=0; i<NLOOP; i++) {
val = counter;
printf("%d: %d\n", idx, val+1);
counter = val+1;
}

return NULL;
}

部分结果如下:

1
...

2: 1
2: 2
2: 3
2: 4
2: 5
3: 5
3: 6
1: 1
1: 2
0: 1
0: 2
0: 3
0: 4
0: 5
0: 6
0: 7
0: 8
0: 9
0: 10

...

可以看到当线程切换时,counter的值明显是错误的。

解决这种多个线程共享一个变量的问题是使用互斥锁(mutex,mutual exclusion)保护这个共享变量。互斥锁的作用是在线程访问该变量前必须持有互斥锁,否则线程将进入睡眠知道互斥锁可用。对于能够函数原型如下:

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mptr);

int pthread_mutex_unlock(pthread_mutex_t *mptr);

/* 成功返回0,否则返回正的Exxx值*/

修改之后的程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define NLOOP 10
#define MAXP 5

int counter;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *);

void Pthread_mutex_lock(pthread_mutex_t *mptr);
void Pthread_mutex_unlock(pthread_mutex_t *mptr);

int main(int argc, char **argv) {
pthread_t tids[MAXP];
int error;

for (int i=0; i<MAXP; i++) {
int *iptr = malloc(sizeof(int));
if (iptr == NULL) {
perror("malloc error");
return -1;
}
*iptr = i;
if ((error = pthread_create(&tids[i], NULL, doit, iptr)) != 0) {
errno = error;
perror("pthread create error");
return -1;
}
}

for (int i=0; i<MAXP; i++) {
if ((error = pthread_join(tids[i], NULL)) != 0) {
errno = error;
perror("pthread join error");
return -1;
}
}

return 0;
}

void *doit(void *arg) {
int val;
int idx = *((int *)arg);
free(arg);

for (int i=0; i<NLOOP; i++) {
Pthread_mutex_lock(&counter_mutex);
val = counter;
printf("%d: %d\n", idx, val+1);
counter = val+1;
Pthread_mutex_unlock(&counter_mutex);
}

return NULL;
}

void Pthread_mutex_lock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_lock(mptr)) != 0) {
errno = error;
perror("pthread mutex lock error");
exit(-1);
}
}

void Pthread_mutex_unlock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_unlock(mptr)) != 0) {
errno = error;
perror("pthread mutex unlock error");
exit(-1);
}
}

运行结果如下:

1
0: 1
4: 2
4: 3
4: 4
4: 5
4: 6
4: 7
4: 8
4: 9
0: 10
4: 11
4: 12
0: 13
2: 14
3: 15
3: 16
3: 17
3: 18
1: 19
1: 20
1: 21
1: 22
1: 23
1: 24
1: 25
1: 26
1: 27
1: 28
0: 29
0: 30
0: 31
0: 32
0: 33
0: 34
0: 35
3: 36
3: 37
3: 38
3: 39
3: 40
3: 41
2: 42
2: 43
2: 44
2: 45
2: 46
2: 47
2: 48
2: 49
2: 50

结果是正确的。

条件变量

互斥锁适合于防止同时访问某个共享变量,但是有的时候我们需要另外某种在等待某个条件发生期间让我们的进入睡眠的东西,这个就是条件变量。这里有一个例子,pthread函数库并没有等待任意一个线程终止的接口,那么如何实现呢?我们可以利用条件变量来实现这一功能。关于条件变量的函数接口如下:

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);

int pthread_cond_signal(pthread_cond_t *cptr);

/* 成功返回0,否则返回正的Exxx值*/

先看完整的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

#define NLOOP 10
#define MAXP 5

int counter;
int t_done;
int flag[MAXP];
int nleft;

pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t tdone_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t tdone_cond = PTHREAD_COND_INITIALIZER;

void *doit(void *);

void Pthread_mutex_lock(pthread_mutex_t *mptr);
void Pthread_mutex_unlock(pthread_mutex_t *mptr);
void Pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
void Pthread_cond_signal(pthread_cond_t *cptr);
void Pthread_join(pthread_t tid, void **status);

int main(int argc, char **argv) {
pthread_t tids[MAXP];
int error;

memset(flag, 0, sizeof(flag));
nleft = MAXP;

for (int i=0; i<MAXP; i++) {
int *iptr = malloc(sizeof(int));
if (iptr == NULL) {
perror("malloc error");
return -1;
}
*iptr = i;

if ((error = pthread_create(&tids[i], NULL, doit, iptr)) != 0) {
errno = error;
perror("pthread create error");
return -1;
}
}

while (nleft > 0) {
Pthread_mutex_lock(&tdone_mutex);
while (t_done == 0)
Pthread_cond_wait(&tdone_cond, &tdone_mutex);
for (int i=0; i<MAXP; i++) {
if (flag[i] == 1) { /* thread i already done */
printf("thread %d done\n", i);
Pthread_join(tids[i], NULL);
flag[i] = -1;
nleft--;
}
}
Pthread_mutex_unlock(&tdone_mutex);
}
return 0;
}

void *doit(void *arg) {
int val;
int idx = *((int *)arg);
free(arg);

for (int i=0; i<NLOOP; i++) {
Pthread_mutex_lock(&counter_mutex);
val = counter;
printf("%d: %d\n", idx, val+1);
counter = val+1;
Pthread_mutex_unlock(&counter_mutex);
}

Pthread_mutex_lock(&tdone_mutex);
flag[idx] = 1;
t_done++;
Pthread_cond_signal(&tdone_cond);
Pthread_mutex_unlock(&tdone_mutex);

return NULL;
}

void Pthread_mutex_lock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_lock(mptr)) != 0) {
errno = error;
perror("pthread mutex lock error");
exit(-1);
}
}

void Pthread_mutex_unlock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_unlock(mptr)) != 0) {
errno = error;
perror("pthread mutex unlock error");
exit(-1);
}
}

void Pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr) {
int error;

if((error = pthread_cond_wait(cptr, mptr)) != 0) {
errno = error;
perror("pthread condition wait error");
exit(-1);
}
}

void Pthread_cond_signal(pthread_cond_t *cptr) {
int error;

if ((error = pthread_cond_signal(cptr)) != 0) {
errno = error;
perror("pthread condition signal error");
exit(-1);
}
}

void Pthread_join(pthread_t tid, void **status) {
int error;

if ((error = pthread_join(tid, status)) != 0) {
errno = error;
perror("pthread join error");
exit(-1);
}
}
  1. 首先我们新建一个全局变量t_done表示当前终止的线程数量。
  2. 创建一个tdone_cond条件变量和与之相关联的互斥锁,通过持有该互斥锁期间递增该计数器并发送信号到该条件变量,一个线程通知朱循环自身即将终止。
  3. 主循环在持有条件变量相关联的互斥锁期间检查t_done,如果发现无事可做,那么主线程调用pthread_cond_wait等待信号,该函数把调用线程投入睡眠并释放调用线程持有的互斥锁,当调用线程后来从pthread_cond_wait返回时,
    线程再次只有该互斥锁。
  4. 我们用一个flag标志来记录线程的状态,1表示线程已经终止,但未被另外一个线程调用pthread_join,-1表示已经被调用了pthread_join。

每一个条件变量都需要关联一个互斥锁,因为条件通常是线程之间共享的某个变量的值。允许不同线程设置和测试该变量要求有一个与该变量相关联的互斥锁。举个栗子,如果之前的代码我们没有互斥锁,那么主循环的就是如下的形式:

1
2
while (t_done == 0)
Pthread_cond_wait(&tdone_cond, &tdone_mutex);

有这样的可能:主线程外的一个线程在主循环测试t_done == 0之后但是在调用pthread_cond_wait之前调用递增了t_done,那么这个信号就会永远的地丢失了(pthread_cond_wait一定要在pthread_cond_signal前调用)。同样的理由要求
pthread_cond_wait被调用时,其所关联的互斥锁是必须上锁的,该函数作为单个原子操作解锁该互斥锁并把调用线程投入睡眠也是出于这个理由。如果函数不先解锁,到返回是再给它上锁,调用线程不得不实现解锁事后上锁,测试t_done的循环就变成了

1
2
3
4
5
6
Pthread_mutex_lock(&tdone_mutex);
while (t_done == 0) {
Pthread_mutex_unlock(&tdone_mutex);
Pthread_cond_wait(&tdone_cond, &tdone_mutex);
Pthread_mutex_lock(&tdone_mutex);
}

然而会有这种情况:在Pthread_mutext_unlock与Pthread_cond_wait之间有另外的一个线程递增了t_done,那么这个信号也就永远消失了。

最后要说明的是,pthread_cond_signal通常唤醒等待相应条件上的单个线程,有时候一个线程需要唤醒多个等待条件的线程,这时候可以调用pthread_cond_broadcast,函数原型如下:

1
2
3
4
5
6
#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cptr);
int pthread_cond_timedwait(pthread_cond_t *cptr, pthread_mutext_t *mptr, const struct timespec *abstime);

/* 成功返回0,否则返回正的Exxx值 */