Zach的博客

线程基础

线程标识

每一个线程有一个线程ID,线程ID不像进程ID一样在整个系统唯一,它只在进程的上下文中才有意义。线程ID用pthread_t结构表示,不同的实现下,pthread_t的内部表示不同,Linux用长整型表示,而Mac OS X下用指针来表示,所以不能简单的将其转换成长整型来比较。用以下接口来获得和比较线程ID:

1
2
3
4
5
#include <pthread.h>

int pthread_equal(pthread_t tid1, pthread_t tid2);

pthread_t pthread_self(void);

线程创建

创建线程的接口:

1
2
3
4
5
#include <pthread.h>

int pthread_create(pthread_t *restrict tidp,
const pthread_attr_t *restrict attr,
void *(start_rtn)(void *), void *restrict arg);

pthread函数在发生错误时通常会返回错误码,而不去依赖全局的errno

pthread_attr_t指定创建的线程的属性,如果只需要默认的属性,将其设置为NULL即可。

POSIX.1定义的线程属性如下:

  • detachstate:线程的分离状态属性
  • guardsize:线程栈尾的警戒缓冲区大小
  • stackaddr:线程栈的最低地址
  • stacksize:线程栈的最小长度

线程创建后默认状态是PTHREAD_CREATE_JOINABLE,其终止状态,即线程执行最后的返回参数或者调用pthread_exit时传递的参数会保留知道对该线程调用pthread_join。如果线程已经分离,那么线程的底层存储资源可以在线程终止时立即被收回。可以在创建时指定detachstatePTHREAD_CREATE_DETACHED或者创建之后调用pthread_detach函数来指定线程分离。在线程被分离之后,我们不能调用pthread_join来等待它的终止状态,这会产生未定义的行为。

线程属性guardsize控制线程栈末尾(栈顶上方)之后用于避免栈溢出的扩展内存大小。这个默认值和具体的实现相关,如果设置为0,那么该机制就无效了。同样的,如果修改了线程的stackaddr属性,那么系统就认为我们要自己管理线程栈,该缓冲区机制无效。在这种缓冲机制下,如果线程栈指针溢出到了警戒区域,那么应用程序就可能通过信号接收到出错信息。

stackaddrstacksize顾名思义就是用来管理新创建线程的栈空间的。stackaddr线程属性被定义为栈的最低内存地址,一般来说栈是向低地址增长的,所以stackaddr一般来说是栈顶位置。如果只是希望改变栈的大小,而不自己管理栈空间,可以只设置stacksize属性,该属性值不能小于PTHREAD_STACK_MIN

相关接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <pthread.h>

// 初始化属性,销毁属性对象
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t *attr);

// 分离状态
/*
* PTHREAD_CREATE_JOINABLE
* PTHREAD_CREATE_DETACHED
*/

int pthread_attr_getdetachstate(const pthread_attr_t *restrict attr,
int *detachstate)
;

int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);

int pthread_detach(pthread_t tid);

// 设置线程栈
int pthread_attr_getstack(const pthread_attr_t *restrict attr,
void **restrict stackaddr,
size_t *restrict stacksize)
;

int pthread_attr_setstack(pthread_attr_t *attr,
void *stackaddr,
size_t stacksize)
;

int pthread_attr_getstacksize(const pthread_attr_t *restrict attr,
size_t *restrict stacksize)
;

int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);

// 警戒区大小
int pthread_attr_getguardsize(const pthread_attr_t *restrict attr,
size_t *restrict guardsize)
;

int pthread_attr_setguardsize(pthread_attr_t *attr,
size_t guardsize)
;

一个设置线程分离状态的帮助函数例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <pthread.h>

int makethread(void *(*fn)(void *), void *arg) {
int err;
pthread_t tid;
pthread_attr_t attr;

err = pthread_attr_init(&attr);
if (err != 0)
return err;
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
if (err == 0) {
err = pthread_create(&tid, &attr, fn, arg);
}
pthread_attr_destroy(&attr);

return err;
}

线程终止

如果进程中有任何一个线程调用了exit_Exit或者_exit中任何一个函数,那么整个进程就会终止。同样的,如果发送到某一个线程的信号的默认动作是终止进程,那么整个进程也就会被终止。

单个线程可以通过3中方式退出而不终止整个进程:

  • 从线程的启动例程返回,返回值是退出码
  • 线程可以被同一个进程中的其他线程取消
  • 线程调用pthread_exit

如果一个线程未被分离,那么可以用pthread_join函数获得线程的退出状态码。

相关的接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
#include <pthread.h>

void pthread_exit(void *rval_ptr);

int pthread_join(pthread_t thread, void **rval_ptr);

// 取消线程
int pthread_cancel(pthread_t tid);

// 清理函数
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);

线程的退出的例子可以参考:pthread_exit

需要注意的一点是,如果线程从启动例程正常返回,那么rval_ptr就包含返回码;如果线程被取消,由rval_ptr指定的内存单元就设置为PTHREAD_CANCELED

线程可以像进程一样安排它退出时需要调用的线程清理程序。一个线程可以建立多个清理处理程序,程序的执行顺序和建立顺序相反。

当线程执行以下动作时,清理函数被调度执行:

  • 调用pthread_exit
  • 响应取消请求
  • 用非零execute参数调用pthread_cleanup_pop

如果execute参数设置为0,那么清理函数不会被调用,但是无论execute参数如何,pthread_cleanup_pop都将删除最近一次pthread_cleanup_push建立的清理处理程序。

这里需要注意一点,pthread_cleanup_pushpthread_cleanup_pop可能被实现为宏,所以必须在与线程相同的作用域中以匹配对的形式使用。

一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

void cleanup(void *arg) {
char *msg = (char *)arg;

printf("%s\n", msg);
}

void *thr_fn1(void *arg) {
printf("thread 1 start\n");
pthread_cleanup_push(cleanup, "thread1 first cleanup");
pthread_cleanup_push(cleanup, "thread1 second cleanup");

if (arg)
return (void *)1;

pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
return (void *)1;
}

void *thr_fn2(void *arg) {
printf("thread 2 start\n");

pthread_cleanup_push(cleanup, "thread2 first cleanup");
pthread_cleanup_push(cleanup, "thread2 second cleanup");

if (arg)
pthread_exit((void *)2);

pthread_cleanup_pop(0);
pthread_cleanup_pop(0);

pthread_exit((void *)2);
}

int main(void) {
int err;
pthread_t tid1, tid2;

err = pthread_create(&tid1, NULL, thr_fn1, (void *)1);
err = pthread_create(&tid2, NULL, thr_fn2, (void *)2);

void *tret;
err = pthread_join(tid1, &tret);
printf("thread 1 ended with status: %ld\n", (long)tret);
err = pthread_join(tid2, &tret);
printf("thread2 ended with status: %ld\n", (long)tret);

return 0;
}

线程1只是简单地从启动例程返回,而线程2调用了pthread_exit以结束启动例程。

结果输出:

1
$ ./a.out 
thread 1 start
thread 2 start
thread 1 ended with status: 1
thread2 second cleanup
thread2 first cleanup
thread2 ended with status: 2

可以看到只有线程2调用了清理程序,而且清理程序的执行顺序和建立顺序相反。

线程取消的一些细节

前面提到线程可以被另外一个线程取消,但是这种行为是可以被关闭的。可取消状态属性控制着线程的是否响应取消请求,这个属性用两个值表示:

  • PTHREAD_CANCEL_ENABLE
  • PTHREAD_CANCEL_DISABLE

可以通过以下接口来修改线程的这一状态属性:

1
2
3
#include <pthread.h>

int pthread_setcancelstate(int state, int *oldstate);

在默认的情况下,线程在取消请求被发出后仍然继续执行,只有到某一个取消点时,线程才响应取消请求,执行清理函数。取消点会在调用某一个函数时出现,如调用了sleep函数。当线程的状态为PTHREAD_CANCEL_DISABLE时,如果发出了一个取消请求,那么线程不会被杀死,该请求被挂起,在状态更改为PTHREAD_CANCEL_ENABLE后,线程就会在下一个取消点响应这个取消请求。用户也可以调用pthread_testcancel这个函数,添加自己的取消点。

1
2
3
#include <pthread.h>

void pthread_testcancel(void);

一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

void cleanup(void *arg) {
char *msg = (char *)arg;

printf("%s\n", msg);
}

void *thr_fn1(void *arg) {
printf("waiting for canceling.\n");

/* pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); */

pthread_cleanup_push(cleanup, "first cleanup");
pthread_cleanup_push(cleanup, "second cleanup");
/* must put this after cleanup push so cleanup can be executed */
sleep(30);
/* pthread_testcancel(); */
pthread_exit((void *)0);

pthread_cleanup_pop(0);
pthread_cleanup_pop(0);
}

void *thr_fn2(void *arg) {
sleep(5);

pthread_t tid2cancel = (pthread_t)arg;

printf("cancel thread 1\n");
int err = pthread_cancel(tid2cancel);
if (err != 0) {
fprintf(stderr, "pthread_cancel error: %s\n", strerror(err));
}

return (void *)0;
}

int main(void) {
int err;
pthread_t tid1, tid2;
pthread_attr_t attr;

err = pthread_attr_init(&attr);
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

err = pthread_create(&tid1, NULL, thr_fn1, NULL);
err = pthread_create(&tid2, &attr, thr_fn2, (void *)tid1);

pthread_attr_destroy(&attr);

void *tret;
err = pthread_join(tid1, &tret);
if (tret == PTHREAD_CANCELED) {
printf("thread 1 is canceled\n");
} else {
printf("some error occurred\n");
}

return 0;
}

线程2在执行后马上睡眠,以便让线程1首先执行,在线程1建立完清理函数后,线程1睡眠,于是就有了一个取消点。线程2在睡眠结束后调用pthread_cancel 取消线程1,于是线程1响应这个请求,清理函数被执行。

结果如下:

1
$ ./a.out 
waiting for canceling.
cancel thread 1
second cleanup
first cleanup
thread 1 is canceled

我们也可以自己调用pthread_testcancel设置一个取消点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>

void cleanup(void *arg) {
char *msg = (char *)arg;

printf("%s\n", msg);
}

void *thr_fn1(void *arg) {
printf("waiting for canceling.\n");

/* pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); */

pthread_cleanup_push(cleanup, "first cleanup");
pthread_cleanup_push(cleanup, "second cleanup");
/* must put this after cleanup push so cleanup can be executed */
for (time_t t = time(NULL); t+5 > time(NULL); )
continue;
pthread_testcancel();
if (arg)
return (void *)0;

pthread_cleanup_pop(0);
pthread_cleanup_pop(0);

return (void *)0;
}

void *thr_fn2(void *arg) {
sleep(2);

pthread_t tid2cancel = (pthread_t)arg;

printf("cancel thread 1\n");
int err = pthread_cancel(tid2cancel);
if (err != 0) {
fprintf(stderr, "pthread_cancel error: %s\n", strerror(err));
}

return (void *)0;
}

int main(void) {
int err;
pthread_t tid1, tid2;
pthread_attr_t attr;

err = pthread_attr_init(&attr);
err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

err = pthread_create(&tid1, NULL, thr_fn1, NULL);
err = pthread_create(&tid2, &attr, thr_fn2, (void *)tid1);

pthread_attr_destroy(&attr);

void *tret;
err = pthread_join(tid1, &tret);
if (tret == PTHREAD_CANCELED) {
printf("thread 1 is canceled\n");
} else {
printf("some error occurred\n");
}

return 0;
}

线程1中,在设置了清理函数之后,我们让线程空转,以防止它过早的结束,否则线程2在调用pthread_cancel的时候就会出错。

运行结果如下:

1
$ ./a.out 
waiting for canceling.
cancel thread 1
second cleanup
first cleanup
thread 1 is canceled

线程同步

当有多个线程共享相同的内存时,需要确保每个线程看到一致的数据视图。当一个线程修改一个变量时,如果变量的修改时间多于一个存储器访问周期,那么就有可能出现不一致的情况。我们需要利用线程的同步机制来保证数据的一致。

互斥量

利用pthread提供的互斥接口,可以确保同一时间只有一个线程访问数据。互斥量是一把锁,线程在访问数据前必须获得这个锁,否则线程无法访问数据。互斥量用pthread_mutex_t表示,在使用该变量前,必须将其初始化,如果是静态分配的互斥量,那么可以把它设置为PTHREAD_MUTEX_INIITIALIZER,否则就需要调用pthread_mutex_init来初始化锁。

1
2
3
4
5
#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr)
;

int pthread_mutex_desctroy(pthread_mutex_t *mutex);

获得和释放互斥量的锁的接口如下:

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr)
;

int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex_lock会阻塞线程,知道它可以获得锁,如果线程不想被阻塞,可以调用pthread_mutex_trylock,如果互斥量未被加锁,那么线程获得互斥量的锁,否则,函数立即返回,返回的错误码为EBUSY

在使用互斥量的时候要注意避免死锁的情况:假设线程A拥有互斥量a的锁,线程B拥有互斥量b的锁,如果在某一个时刻,线程A请求b的锁,然后线程A被睡眠,此后线程B被唤醒,它请求a的锁,这个时候线程A和线程B被相互阻塞,出现了死锁。

为了避免这种情况,当我们用两个或者两个以上的互斥量的时候,需要注意请求互斥量的锁的顺序。比如在上述例子中,我们规定互斥量的a的锁必须在互斥量b的锁之前获取。这样一样,线程B在获得b的锁之前必须请求一次a的锁,这样的话线程B就会因线程A占有a的锁而被阻塞,线程A可以获得a和b的锁,当A执行完之后释放所有的锁,线程B也得到了执行。

互斥量的属性

在调用pthread_mutex_init时,可以看到函数还接受一个pthread_mutexattr_t变量参数,该参数指定了互斥量的属性,如果只需要默认的属性,那么将参数设置为NULL即可。

初始化和销毁接口:

1
2
3
4
#include <pthread.h>

int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);

互斥量有如下属性:

  • 进程共享属性:如果互斥量属于多个进程彼此共享的内存空间,那么该属性如果为PTHREAD_PROCESS_SHARED,那么不同进程可以利用这个互斥量进程同步;如果该属性为PTHREAD_PROCESS_PRIVATE,那么只有同一个进程的线程可以利用这个互斥量进程同步,这也是默认的行为
  • 健壮属性:不太懂
  • 一致性属性:不太懂
  • 类型属性:有四种类型的互斥量
    • PTHREAD_MUTEX_NORMAL:标准互斥量类型,不做任何特殊的错误检查和死锁检测
    • PTHREAD_MUTEX_ERRORCHECK:提供错误检查的互斥量
    • PTHREAD_MUTEX_RECURSIVE:递归互斥量类型,在获得互斥量的锁之后,允许再次获得锁,解锁时,必须调用相同次数的解锁操作。
    • PTHREAD_MUTEX_DEFAULT:提供默认特性和行为的互斥量,操作系统可以把这个类型映射到上述三种中的任意一个类型,可具体实现有关。

四种类型的锁的行为如下所示。不占用时解锁指:一个线程对被另外一个线程枷锁的互斥量进行解锁;在已解锁时解锁指:线程对一个已经解锁的互斥量再次解锁的情况。

互斥量类型 没有解锁时重新加锁 不占用时解锁 已解锁时解锁
PTHREAD_MUTEX_NORMAL 死锁 未定义 未定义
PTHREAD_MUTEX_ERRORCHECK 返回错误 返回错误 返回错误
PTHREAD_MUTEX_RECURSIVE 允许 返回错误 返回错误
PTHREAD_MUTEX_DEFAULT 未定义 未定义 未定义

设置类型的接口:

1
2
3
4
5
#include <pthread.h>

int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
int pthread_mutexattr_gettype(const pthread_mutexattr_t *restrict attr,
int *restrict type)
;

一个递归锁的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>

#include "pthread_functions.h"

struct to_info {
void *(*fn)(void *);
void *to_arg;
int to_wait;
};

void *timer_helper(void *arg) {
struct to_info *tip;

tip = (struct to_info *)arg;
printf("sleep for %d secs\n", tip -> to_wait);
sleep(tip -> to_wait);
(tip -> fn)(tip -> to_arg);
free(arg);

return (void *)0;
}

void timeout(const int when, void *(*fn)(void *), void *arg) {
struct to_info *tip;

tip = (struct to_info *)malloc(sizeof(struct to_info));
if (tip != NULL) {
tip -> to_wait = when;
tip -> fn = fn;
tip -> to_arg = arg;

pthread_t tid;
int err = makethread(timer_helper, (void *)tip, &tid);

if (err != 0) {
fprintf(stderr, "makethread error: %s\n", strerror(err));
free(tip);
exit(-1);
}
}
}

pthread_mutex_t lock;
pthread_mutexattr_t attr;

static volatile int flag = 0;

void *retry(void *arg) {
pthread_mutex_lock(&lock);

printf("Recursive lock\n");
flag = 1;
pthread_mutex_unlock(&lock);

return (void *)0;
}

int main(void) {
int err;

err = pthread_mutexattr_init(&attr);
if (err != 0) {
fprintf(stderr, "pthread_mutexattr_init error: %s\n", strerror(err));
exit(-1);
}

err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
if (err != 0) {
fprintf(stderr, "pthread_mutexattr_settype error: %s\n", strerror(err));
exit(-1);
}

err = pthread_mutex_init(&lock, &attr);
if (err != 0) {
fprintf(stderr, "pthread_mutex_init error: %s\n", strerror(err));
exit(-1);
}

pthread_mutex_lock(&lock);

timeout(4, retry, NULL);

pthread_mutex_unlock(&lock);

while (!flag)
;

return 0;
}

条件变量

条件变量时线程可用的另外一种同步机制。条件变量给多个线程提供了一个会和的场所。条件变量和互斥变量一起使用时,允许线程以无竞争的方式等待特定的条件发生。条件变量本身是被互斥量保护的,线程在改变条件之前,必须锁住互斥量。

条件变量以pthread_cond_t结构表示,同样的,静态分配的变量可以用PTHREAD_COND_INITIALIZER初始化,动态分配的必须使用pthread_cond_init函数来初始化变量。

相关接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <pthread.h>

int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr)
;


int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex)
;


int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr)
;


int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

传递给pthread_cond_wait的互斥量堆条件进行保护,调用者把锁住的互斥量传递给函数,函数然后自动把调用线程放到等待条件的线程列表中,对互斥量解锁。当获得互斥量之后,互斥量再一次被锁住。

pthread_cond_signalpthread_cond_broadcast用于通知线程条件已经得到满足,pthread_cond_signal可以唤醒一个等待条件的线程,pthread_cond_broadcast则用于唤醒所有等待该条件的线程。

一个条件变量的简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>

int quitflag;
sigset_t mask;

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t waitloc = PTHREAD_COND_INITIALIZER;

void *thr_fn(void *arg) {
int err, signo;

for (;;) {
err = sigwait(&mask, &signo);
if (err != 0) {
fprintf(stderr, "sigwait failed: %s\n", strerror(err));
exit(-1);
}

switch (signo) {
case SIGINT: {
printf("\ninterrupt\n");
break;
}
case SIGQUIT: {
pthread_mutex_lock(&lock);
quitflag = 1;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&waitloc);

return (void *)0;
}
default:
printf("unexpected signal: %d\n", signo);
exit(-1);
}
}
}

int main(void) {
int err;
sigset_t oldmask;
pthread_t tid;

/* sigemptyset(&mask); */
/* sigaddset(&mask, SIGQUIT); */
/* sigaddset(&mask, SIGINT); */
sigfillset(&mask);
if ((err = pthread_sigmask(SIG_BLOCK, &mask, &oldmask)) != 0) {
fprintf(stderr, "SIG_BLOCK error: %s\n", strerror(err));
exit(-1);
}

err = pthread_create(&tid, NULL, thr_fn, NULL);
if (err != 0) {
fprintf(stderr, "pthread_create error: %s\n", strerror(err));
exit(-1);
}

pthread_mutex_lock(&lock);
while (quitflag == 0) {
pthread_cond_wait(&waitloc, &lock);
}

printf("SIGQUIT received\n");

if (sigprocmask(SIG_SETMASK, &oldmask, NULL) != 0) {
fprintf(stderr, "SIG_SETMASK error\n");
exit(-1);
}
return 0;
}

在这里例子中,我们创建一个线程专门用于接收信号,如果SIGQUIT信号到来,那么主线程等待的条件得到满足,调用pthread_cond_signal通知这一事件,于是主线程被唤醒得以继续执行。

输出:

1
$ ./a.out 
^C
interrupt
^C
interrupt
^\SIGQUIT received

读写锁

读写锁也成为共享互斥锁。一个读写锁在读模式被锁住时,其他线程仍然可以以读模式锁住读写锁,但是不能以写模式锁住读写锁;而当读写锁以写模式锁住时,无论如何其他线程都无法锁住这个读写锁。读写锁适用于对数据结构读的次数远大于写的情况。

如果是静态分配的读写锁,可以将PTHREAD_RWLOCK_INITIALIZER赋予给它以初始化,否则需要调用pthread_rwlock_init函数来初始化读写锁。

相关接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr)
;

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

// 非阻塞的读写锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

// 带超时的读写锁
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr)
;


int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr)
;

读写锁唯一支持的属性是进程共享属性,它和互斥量的进程共享属性是相同的,如果设置为PTHREAD_PROCESS_SHARED,那么多个进程可以利用它进行任务同步操作。

相关接口:

1
2
3
4
5
6
7
8
9
#include <pthread.h>

int pthread_rwlockattr_init(pthread_rwlockattr_t *attr);
int pthread_rwlockattr_destroy(pthread_rwlockattr_t *attr);

int pthread_rwlockattr_getpshared(const pthread_rwlockattr_t *restrict attr,
int *restrict pshared)
;

int pthread_rwlockattr_setpshared(pthread_rwlockattr_t *attr,
int pshared)
;

这里给出一个利用任务队列实现读写锁的例子,程序允许任务队列并发地搜索任务队列,但是要写入或者删除队列时,必须获得写模式的读写锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
struct job
{
int j_id;
struct job *j_prev;
struct job *j_next;
};

struct queue
{
pthread_rwlock_t q_lock;
struct job *q_head;
struct job *q_tail;
};

int queue_init(struct queue *qp) {

int err;

qp -> q_head = NULL;
qp -> q_tail = NULL;
err = pthread_rwlock_init(&qp -> q_lock, NULL);
if (err)
return err;
return 0;
}

void delay(int sec) {
time_t t;
for (t = time(NULL); t+sec > time(NULL); )
;
}

void job_insert(struct queue *qp, struct job *jp) {
// insert a job at head
pthread_rwlock_wrlock(&qp -> q_lock);
jp -> j_next = qp -> q_head;
jp -> j_prev = NULL;
if (qp -> q_head != NULL) {
qp -> q_head -> j_prev = jp;
} else {
// empty list
qp -> q_tail = jp;
}
qp -> q_head = jp;
pthread_rwlock_unlock(&qp -> q_lock);
}

void job_append(struct queue *qp, struct job *jp) {
// append a job to the list
pthread_rwlock_wrlock(&qp -> q_lock);
jp -> j_prev = qp -> q_tail;
jp -> j_next = NULL;
if (qp -> q_tail != NULL) {
qp -> q_tail -> j_next = jp;
} else {
qp -> q_head = jp;
}
qp -> q_tail = jp;
pthread_rwlock_unlock(&qp -> q_lock);
}

void job_remove(struct queue *qp, struct job *jp) {
pthread_rwlock_wrlock(&qp -> q_lock);
if (jp == qp -> q_head) {
qp -> q_head = jp -> j_next;
if (qp -> q_tail == jp) {
qp -> q_tail = NULL;
} else {
jp -> j_next -> j_prev = NULL;
}
} else if (jp == qp -> q_tail) {
qp -> q_tail = jp -> j_prev;
if (jp == qp -> q_head) {
qp -> q_head = NULL;
} else {
jp -> j_prev -> j_next = NULL;
}
} else {
jp -> j_prev -> j_next = jp -> j_next;
jp -> j_next -> j_prev = jp -> j_prev;
}
pthread_rwlock_unlock(&qp -> q_lock);
}

struct job *job_find(struct queue *qp, int id) {
struct job *jp;
if (pthread_rwlock_rdlock(&qp -> q_lock) != 0)
return NULL;

for (jp = qp -> q_head; jp != NULL; jp = jp -> j_next) {
if (jp -> j_id == id)
break;
}
pthread_rwlock_unlock(&qp -> q_lock);
return jp;
}

void traverse_jobs(struct queue *qp) {
struct job *jp;
int err;
if ((err = pthread_rwlock_rdlock(&qp -> q_lock)) != 0) {
fprintf(stderr, "pthread_rwlock_rdlock error: %s\n", strerror(err));
exit(-1);
}
printf("delay before traversing\n");
delay(5);
printf("Queue:");
for (jp = qp -> q_head; jp != NULL; jp = jp-> j_next) {
printf(" %d", jp -> j_id);
}
printf("\n");
pthread_rwlock_unlock(&qp -> q_lock);
}

#define NJOBS 20
struct queue *qp;
struct job* jp_arr[NJOBS];

void *thr_fn1(void *arg) {
for (int i=0; i<NJOBS; i++) {
printf("inserting\n");
struct job *jp = malloc(sizeof(struct job));
jp -> j_id = i;
if (i%2)
job_insert(qp, jp);
else
job_append(qp, jp);
jp_arr[i] = jp;
delay(1);
}

for (int i=0; i<NJOBS; i++) {
printf("deleting\n");
job_remove(qp, jp_arr[i]);
delay(1);
}
return (void *)0;
}

void *thr_fn2(void *arg) {
sleep(5);
while (qp -> q_head != qp -> q_tail) {
traverse_jobs(qp);
delay(3);
}
printf("Done\n");
return (void *)0;
}

int main(void) {
int err;
pthread_t tid1, tid2;
void *tret;

setbuf(stdout, NULL);

qp = (struct queue *)malloc(sizeof(struct queue));
err = queue_init(qp);
if (err != 0) {
fprintf(stderr, "error: %s\n", strerror(err));
exit(-1);
}

err = pthread_create(&tid1, NULL, thr_fn1, NULL);
if (err != 0) {
fprintf(stderr, "error: %s\n", strerror(err));
exit(-1);
}
err = pthread_create(&tid2, NULL, thr_fn2, NULL);
if (err != 0) {
fprintf(stderr, "error: %s\n", strerror(err));
exit(-1);
}
err = pthread_join(tid1, &tret);
err = pthread_join(tid2, &tret);

return 0;
}

屏障

屏障是用户协调多个线程并行工作的同步机制。它允许多个线程暂停等待,直到所有的合作线程都达到某一点,然后从该点继续执行。pthread_join就是一种屏障,它暂停当前线程的运行,直到指定的线程退出为止。

接口如下:

1
2
3
4
5
6
7
8
#include <pthread.h>

int pthread_barrier_init(pthread_barrier_t *restrict barriet,
const pthread_barrierattr_t *restrict attr,
unsigned int count)
;

int pthread_barrier_destroy(pthread_barrier_t *barrier);

int pthread_barrier_wait(pthread_barrier_t *barrier);

初始化屏障时,使用count参数指定,在允许所有线程继续运行之前,必须达到屏障的线程数目。对于任意一个线程,pthread_barrier_wait函数返回了PTHREAD_BARRIER_SERIAL_THREAD,剩下的函数看到的返回都是0,这使得一个线程可以作为主线程,它可以工作在其他所有线程已完成的工作结果上。

屏障目前定义的只有进程共享属性。

1
2
3
4
5
6
7
8
9
#include <pthread.h>

int pthread_barrierattr_init(pthread_barrierattr_t *attr);
int pthread_barrierattr_destroy(pthread_barrierattr_t *attr);

int pthread_barrierattr_getpshared(const pthread_barrierattr_t *restrict attr,
int *restrict pshared)
;

int pthread_barrierattr_setpshared(pthread_barrierattr_t *attr,
int pshared)
;

一个利用屏障来排序一个庞大的数组的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
#include <limits.h>
#include <sys/time.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

#define NTHR 8
#define NUMNUM 8000000L
#define TNUM (NUMNUM / NTHR)

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t barrier;

int cmplong(const void *arg1, const void *arg2) {
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;

if (l1 == l2)
return 0;
else if (l1 < l2)
return -1;
else
return 1;
}

void *thr_fn(void *arg) {
long idx = (long)arg;

qsort(&nums[idx], TNUM, sizeof(long), cmplong);
pthread_barrier_wait(&barrier);

return (void *)0;
}

void merge() {
long idx[NTHR];
long minidx, num;

for (int i=0; i<NTHR; i++)
idx[i] = i*TNUM;
for (int sidx = 0; sidx<NUMNUM; sidx++) {
num = LONG_MAX;
for (int i=0; i<NTHR; i++) {
if (idx[i] < (i+1)*TNUM && num > nums[idx[i]]) {
num = nums[idx[i]];
minidx = i;
}
}

snums[sidx] = num;
idx[minidx] += 1;
}
}

int main(void) {
int err;
pthread_t tid;
struct timeval start, end;
long long startusec, endusec;
double elapsed;

srandom(1);
for (long long i=0; i<NUMNUM; i++) {
nums[i] = random();
}

gettimeofday(&start, NULL);
pthread_barrier_init(&barrier, NULL, NTHR+1);
for (int i=0; i<NTHR; i++) {
err = pthread_create(&tid, NULL, thr_fn, (void *)(i*TNUM));
if (err != 0) {
fprintf(stderr, "pthread_create error: %s\n", strerror(err));
exit(-1);
}
}
pthread_barrier_wait(&barrier);
merge();
gettimeofday(&end, NULL);

startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;
elapsed = (endusec - startusec) / 1000000.0;
printf("sort took %.4f seconds\n", elapsed);
for (int i=0; i<NUMNUM; i++) {
fprintf(stderr, "%ld\n", snums[i]);
}
return 0;
}

程序利用多个线程来并行地排序数组的某一个部分,然后主线程来合并各个线程的排序结果,我们比较一下它和单线程排序所花去的时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#define NUMNUM 8000000L

long nums[NUMNUM];

int cmplong(const void *arg1, const void *arg2) {
long l1 = *(long *)arg1;
long l2 = *(long *)arg2;

if (l1 == l2)
return 0;
else if (l1 < l2)
return -1;
else
return 1;
}

int main(void) {
struct timeval start, end;
long long startusec, endusec;

srandom(1);
for (long long i=0; i<NUMNUM; i++) {
nums[i] = random();
}

gettimeofday(&start, NULL);

qsort(nums, NUMNUM, sizeof(long), cmplong);

gettimeofday(&end, NULL);

startusec = start.tv_sec * 1000000 + start.tv_usec;
endusec = end.tv_sec * 1000000 + end.tv_usec;

double elapsed = (endusec - startusec) / 1000000.0;
printf("normal sort time: %.4f\n", elapsed);

return 0;
}
1
$ ./barrier_sort 2>/dev/null
sort took 0.7819 seconds

# zach41 @ zach41-H81M-S1 in ~/Desktop/UNIX-Demos/pthread on git:master x [14:02:50] 
$ ./norm_sort 2>/dev/null
normal sort time: 1.7947

可以看到多线程排序所有时间明显低于单线程排序。

线程特定数据

线程特定数据,也称为线程私有数据,是存储和查询某个特定线程相关数据的一种机制。一个线程可以访问所属进程的整个地址空间,除了使用寄存器以外,一个线程没有办法阻止另一个线程访问它的数据,线程特定数据也不例外。虽然底层的实现部分并不能阻止这种访问能力,但是管理线程特定数据的函数可以提高线程间数据独立性,使得线程不太容易访问到其他线程的线程特定数据。

具体的可以看用线程处理客户请求中的线程特定数据