Zach的博客

用线程处理客户请求

线程

传统的UNIX模型中,如果需要异步地完成一个任务,通常我们只要fork一个进程就可以了,但是fork进程存在以下两个问题:

  1. fork的代价是昂贵的。fork需要把父进程的内存映像复制到子进程,并在子进程中复制所有的描述符,虽然现在的实现是写时复制,但是fork一样是昂贵的。
  2. fork返回之后父子进程通信需要利用IPC机制,比较费力。

线程被称为lightweight process,同一进程可以创建多个线程,这些线程共享进程内的全局内存,这使得线程通信变得容易。同时,线程创建的代码大大小于进程创建。但是线程也存在同步的问题。

同一进程内的所有线程共享全局变量外,还共享:

  • 进程指令
  • 大多数数据
  • 打开的文件(描述符)
  • 信号处理函数和信号变量
  • 当前工作目录
  • 用户ID和组ID

每个线程也有各自的:

  • 线程ID
  • 寄存器集合,包括程序计数器和栈指针
  • 栈,存放局部变量和返回地址
  • errno
  • 信号掩码
  • 优先级

线程操纵函数

创建线程

函数原型为:

1
2
3
4
5
#include <pthread.h>

int pthread_create(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *), void *arg);

/× 成功返回0,否则返回正的Exxx值×/

进程内的线程有一个线程ID标志,由tid返回。每个线程有许多属性:优先级、初始栈大小、是否应该成为一个守护进程等,若attr为NULL,则取默认值。线程的执行逻辑由func函数指针指定,函数接收一个void指针的
参数,第四个参数就是我们传递的参数,如果需要传递多个参数,把参数打包进一个结构指针即可。

等待线程结束

利用pthread_join等待一个给定线程终止(是的,不能等待任意一个线程终止,如果需要,必须利用其他手段)。函数原型为:

1
2
3
4
5
#include <pthread.h>

int pthread_join(pthread_t *tid, void **status);

/* 成功返回0,否则返回正的Exxx值 */

status通常是创建线程时func函数的返回值。

线程终止

让一个线程终止的方法之一是调用pthread_exit,函数原型如下:

1
2
3
#include <pthread.h>

void pthread_exit(void *status);

status不能局限于要终止的线程的对象,因为当线程终止时,对象也将消失。如果该线程未曾脱离(下面会说到),它的线程ID和退出状态将一直留存到调用进程内的某个其他线程对它调用pthread_join。

让一个线程终止的另外的两个方法:

  • 启动线程的函数可以返回,其返回值就是相应线程的终止状态。
  • 如果进程的main函数返回或者任何线程调用了exit,整个进程就终止,其中包括它的任何线程。

pthread_self与pthread_detach函数

pthread_self返回进程内该线程的ID,pthread_detach后,线程像守护进程一样,当它们终止时,所有的资源都被释放,不需要某个其他进程调用pthread_join。函数原型如下:

1
2
3
4
5
#include <pthread.h>

pthread_t pthread_self(void);

int pthread_detach(pthread_t tid); /* 成功返回0,否则返回正的Exxx值 */

一般情况下,上述函数的调用情况是

1
pthread_detach(pthread_self());

以此,线程让自己脱离。

线程安全函数

当一个函数修改了全局变量,那么这个函数不是线程安全的,换句话说该函数不可重入。考虑以下的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main(int argc char **argv) {
int listenfd, connfd;

...

for (;;) {
len = addrlen;
connfd = Accept(listenfd, cliaddr, &len);
Pthread_create(&tid, NULL, doit, &connfd);
}
}
static void* doit(void *arg) {
int connfd;

connfd = *((int *)arg);
...
}

看起来上述代码是没有问题的,但是考虑这样一种情况:

  1. accept返回,主线程调用pthread_create创建一个新的线程
  2. 调度新创建的线程执行,但是并没有执行到给connfd赋值就被切换了。
  3. 另一个连接就绪,accept返回,返回的描述符存入connfd
  4. 先前切换的线程被调度执行,这时候它所得到的connfd却并不是第一个连接的描述符了。

POSIX要求许多函数是线程安全的,即它们都是可重入的,这个要求通过对我们透明的库函数内部执行某种形式的同步达到,线程安全函数见UNP P542。

线程特定数据

把一个未线程化的程序转换成使用线程的版本时,有时会碰到因其中有函数使用静态变量而引起的一个常见变成错误。解决这样的错误有许多方法,这里我们介绍使用线程特定数据的方法。

每个系统支持优先数量的线程特定数据,POSIX要求这个限制不小于128.系统为每个进程维护一个我们称之为Key的结构数据,如图所示:

key_tsd.png

flag表示该项是否被使用,析构函数用于线程终止时释放线程占用的数据内存,由编程人员指定。

这个Key是进程范围内的,即所有线程共享该数组。除此之外,系统还为每个线程维护特定于线程的Pthread结构,其部分内容是我们称之为pkey数据的128个元素的指针数组。如下所示:

pkey_tsd.png

pkey数组的所有元素都被初始化为空指针,这128个指针和进程内的128个可能的Key是逐一关联的。

举一个例子来说明线程特定的数据如何使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static pthread_key_t rl_key;
static pthread_once_t rl_once = PTHREAD_ONCE_INIT;

static void readline_destructor(void *ptr) {
free(ptr);
}

static void readline_once(void) {
Pthread_key_create(&rl_key, readline_destructor);
}

ssize_t thread_readline(int fd, void *vptr, size_t maxlen) {
...

pthread_once(&rl_once, readline_once);

if ((ptr = pthread_getspecific(rl_key)) == NULL) {
ptr = Malloc(...);
pthread_setspecific(rl_key, ptr);
}
...
/* use values pointed to by ptr */
}

当进程内第一个线程调用thread_readline时,一个Key被创建,同时注册对应的析构函数。之后进程内线程的状态如下:

thread_statues1.png

这时候如果有另外一个线程调用thread_readline,那么pthread_once之前执行过一次,Key已经创建(这两次析构函数是一样的),线程取该Key对应的内存指针,返回为空,于是线程创建一个新的内存区域。进程内线程的状态变为如下所示:

thread_statues2.png

可以看到每一个线程有特定于线程自身的缓冲区域,这样就不会发生同步的问题。

可以给出用到的函数原型:

1
2
3
4
5
6
7
8
9
#include d.h>

int pthread_once(pthread_once_t *onceptr, void (*init)(void));

int pthread_key_create(pthread_key_t *keyptr, void (*destructor) (void *));

void *pthread_getspecific(pthread_key_t key);

void *pthread_setspecific(pthread_key_t key, const void *value);

注意pthread_once_t类型的变量在传入pthread_once函数之前,要初始化为PTHREAD_ONCE_INIT。

使用线程特定数据的readline函数

这里给出一个具体的例子,该例子是修改过的readline函数,它是利用了线程的特定数据以做到线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include "unp.h"
#include "unpthread.h"

static pthread_key_t rl_key;
static pthread_once_t rl_once = PTHREAD_ONCE_INIT;

static void readline_destructor(void *ptr) {
free(ptr);
}

static void readline_once(void) {
Pthread_key_create(&rl_key, readline_destructor);
}

typedef struct {
int rl_cnt;
char *rl_bufptr;
char rl_buf[MAXLINE];
}Rline;

static ssize_t my_read(Rline *tsd, int fd, char *ptr) {
if (tsd -> rl_cnt <= 0) {
/* no data in buffer */
again:
if ((tsd -> rl_cnt = read(fd, tsd -> rl_buf, MAXLINE)) < 0) {
if (errno == EINTR)
goto again;
return -1;
}
if (tsd -> rl_cnt == 0) {
/* EOF */
return 0;
} else {
tsd -> rl_bufptr = tsd -> rl_buf;
}
}

tsd -> rl_cnt--;
*ptr = *tsd -> rl_bufptr++;
return 1;
}

ssize_t thread_readline(int fd, void *vptr, size_t maxlen) {
Rline *tsd;
size_t n, rc;
char c, *ptr;

Pthread_once(&rl_once, readline_once);

if ((tsd = pthread_getspecific(rl_key)) == NULL) {
/* alloc thread specific data */
tsd = Calloc(1, sizeof(Rline));
pthread_setspecific(rl_key, tsd);
}

ptr = vptr;
for (n = 1; n<maxlen; n++) {
if ((rc = my_read(tsd, fd, &c)) == 1) {
*ptr++ = c;
if (c == '\n')
break;
} else if (rc == 0) {
/* end of file */
*ptr = 0;
return n-1;
} else {
/* error */
return -1;
}
}

*ptr = 0;
return n;
}