Zach的博客

这个人很懒,都不知道说些什么 :(


  • 首页

  • 分类

  • 关于

  • 归档

  • 标签
Zach的博客

epoll入门

| 阅读次数

epoll

epoll是linux内核的可扩展I/O机制,旨在替代POSIX的select和poll函数,让需要大量文件操作符的程序拥有更佳的性能。

epoll接口

1
2
3
4
5
6
7
#include <sys/epoll.h>

int epoll_create(int size);

int epoll_ctl(int epollfd, int op, int fd, struct epoll_event *ev);

int epoll_wait(int epollfd, struct epoll_event *events, int maxevents, int timeout);
  • epoll_create创建一个epoll的句柄,参数size告知内核这个epoll需要监听的I/O事件的个数。函数返回一个描述符,在使用完epoll以后,我们需要手动关闭这个描述符,否则可能导致描述符耗尽。
  • epoll_ctl用来操纵epoll所监听的事件,参数op表示这一次操作,其值:

    • EPOLL_CTL_ADD:注册新的事件到epoll中
    • EPOLL_CTL_DEL:从epoll中删除一个事件
    • EPOLL_CTL_MOD:修改之前注册的一个事件

    epoll_event是对应的描述符的事件的数据结构,其结构如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
    } epoll_data_t;

    struct epoll_event {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };

    epoll_event中events是以下几个值的按位或的集合:

    • EPOLLIN:对应的描述符可读
    • EPOLLOUT:对应的描述符可写
    • EPOLLERR:对应的描述符发生错误
    • EPOLLRDHUP:TCP套接字对端被关闭或者用shutdown函数关闭了写半部。
    • EPOLLPRI:有紧急数据可读
    • EPOLLHUP:对应的文件描述符被挂断
    • EPOLLET:将EPOLL设为边缘触发
    • EPOLLONESHOT:只监听一次事件,当事件发生之后如果还要监听则需要再次把事件注册入队列。

    epoll的工作模式有两种:

    1. ET(edge trigger)模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理,如果不处理,下次调用epoll_wait时,不会再次向应用程序通知此事件。
    2. LT(level trigger)模式:当epoll_wait检测到描述符事件发生并通知此事件时,应用程序不需要立即处理,下次调用epoll_wait时,会再次通知此事件。

    ET模式很大程序上减少了epoll事件被重复触发的次数,因此效率较LT模式高。epoll工作在ET模式时必须使用非阻塞接口,以避免一个阻塞读/写操作把处理多个文件描述符的任务饿死。

  • epoll_wait等待事件的发生。maxevents参数告诉内核这次返回的事件最多有多少个,返回的事件存放在events参数对应的数组中,timeoout指定超时事件,若为-1则永久阻塞。

一个echo server/client的例子

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <strings.h>

#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8080
#define MAXSIZE 1024
#define LISTENQ 20
#define EPOLLEVENTS 100
#define FDSIZE 1024

static int socket_bind(const char* ip, int port);
static void do_epoll(int listenfd);
static void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
static void handle_accept(int epollfd, int listenfd);
static void do_read(int epollfd, int fd, char *buf);
static void do_write(int epollfd, int fd, char *buf);
static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);

int main(void) {
int listenfd = socket_bind(IPADDRESS, PORT);
listen(listenfd, LISTENQ);
do_epoll(listenfd);
return 0;
}

static int socket_bind(const char *ip, int port) {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
perror("socket create error");
exit(-1);
}

struct sockaddr_in server_addr;
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

if (bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind socket error");
exit(-1);
}
return listenfd;
}

static void do_epoll(int listenfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
char buf[MAXSIZE];
memset(buf, 0, sizeof(buf));

epollfd = epoll_create(FDSIZE); /* create a epoll fd which can handles FDSIZE fds */
if (epollfd < 0) {
perror("create epoll error");
exit(-1);
}
add_event(epollfd, listenfd, EPOLLIN);

int ret;
for (;;) {
ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
if (ret < 0) {
perror("epoll wait error");
exit(-1);
}
handle_events(epollfd, events, ret, listenfd, buf);
}
close(epollfd); /* must close epoll fd */
}

static void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf) {
int fd;
for (int i = 0; i<num; i++) {
fd = events[i].data.fd;
if (fd == listenfd && (events[i].events & EPOLLIN))
handle_accept(epollfd, listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
}

static void handle_accept(int epollfd, int listenfd) {
struct sockaddr_in cli_addr;
int cli_size = 0;
int clifd;
if ((clifd = accept(listenfd, (struct sockaddr *)&cli_addr, &cli_size)) < 0) {
perror("accept error");
} else {
add_event(epollfd, clifd, EPOLLIN);
}
}

static void do_read(int epollfd, int fd, char *buf) {
int nread = read(fd, buf, MAXSIZE);
if (nread == -1) {
perror("read error");
close(fd);
delete_event(epollfd, fd, EPOLLIN);
} else if (nread == 0) {
fprintf(stderr, "client close.\n");
close(fd);
delete_event(epollfd, fd, EPOLLIN);
} else {
printf("read message: %s", buf);
modify_event(epollfd, fd, EPOLLOUT);
}
}

static void do_write(int epollfd, int fd, char *buf) {
int nwrite = write(fd, buf, strlen(buf));
if (nwrite == -1) {
perror("write error");
close(fd);
delete_event(epollfd, fd, EPOLLOUT);
} else
modify_event(epollfd, fd, EPOLLIN);
memset(buf, 0, MAXSIZE);
}

static void add_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

static void delete_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

static void modify_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

客户端

客户端代码我们也用epoll来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <strings.h>

#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>

#define IPADDRESS "127.0.0.1"
#define PORT 8080
#define MAXSIZE 1024
#define LISTENQ 20
#define EPOLLEVENTS 100
#define FDSIZE 1024

static void handle_connection(int sockfd);
static void handle_events(int pollfd, struct epoll_event *events, int num, int sockfd, char *buf);
static void do_read(int epollfd, int fd, int sockfd, char *buf);
static void do_write(int epollfd, int fd, int sockfd, char *buf);
static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);

int conn_flag;

int main(void) {
int sockfd;
struct sockaddr_in serv_addr;

sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
perror("socket create error");
exit(-1);
}

bzero(&serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
if (inet_pton(AF_INET, IPADDRESS, &serv_addr.sin_addr) != 1) {
perror("inet_pton error");
exit(-1);
}
connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));

handle_connection(sockfd);
close(sockfd);
return 0;
}

static void handle_connection(int sockfd) {
int epollfd;
struct epoll_event events[EPOLLEVENTS];
char buf[MAXSIZE];

epollfd = epoll_create(FDSIZE);
if (epollfd < 0) {
perror("create epoll error");
exit(-1);
}
add_event(epollfd, STDIN_FILENO, EPOLLIN);

int ret;
for (;;) {
ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
handle_events(epollfd, events, ret, sockfd, buf);
}
}

static void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf) {
int fd;
for (int i=0; i<num; i++) {
fd = events[i].data.fd;
if (events[i].events & EPOLLIN)
do_read(epollfd, fd, sockfd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, sockfd, buf);
}
}

static void do_read(int epollfd, int fd, int sockfd, char *buf) {
int nread = read(fd, buf, MAXSIZE);
if (nread < 0) {
perror("read error");
exit(-1);
} else if (nread == 0) {
fprintf(stderr, "server close.\nbye\n");
exit(-1);
} else {
if (fd == STDIN_FILENO) {
add_event(epollfd, sockfd, EPOLLOUT);
} else {
delete_event(epollfd, sockfd, EPOLLIN);
add_event(epollfd, STDOUT_FILENO, EPOLLOUT);
}
}
}

static void do_write(int epollfd, int fd, int sockfd, char *buf) {
int nwrite = write(fd, buf, strlen(buf));
if (nwrite < 0) {
perror("write error");
exit(-1);
} else {
if (fd == STDOUT_FILENO) {
delete_event(epollfd, fd, EPOLLOUT);
} else {
modify_event(epollfd, fd, EPOLLIN);
}
}
memset(buf, 0, MAXSIZE);
}

static void add_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

static void delete_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
}

static void modify_event(int epollfd, int fd, int state) {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = state;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
}

See Also

文章内容和代码学习自IO多路复用之epoll总结

epoll manual

Zach的博客

互斥锁和条件变量

| 阅读次数

并发编程的问题

考虑这样一个问题,有两个线程,A和B,他们对同一个全局变量执行递减操作。假设C编译器将递减运算转换成3条机器指令:从内存装载到寄存器、递减寄存器、从寄存器存储到内存。可能会出现以下情形:

  1. 线程A运行,把变量的值装载到一个寄存器中
  2. 系统把运行线程从A切换到B运行。A的寄存器被保存,B的寄存器则恢复。
  3. 线程B执行递减的全部操作,把新值存放到变量的变量中
  4. 线程A被恢复执行,A的寄存器被恢复,于是A从原来离开的地方开始执行,此时寄存器中保留的变量值是线程B执行递减前的值,这时候就出错了。

可以看到这种并发的错误是因为递减操作不是原子操作而造成的,线程的切换会中断某些步骤,从而出现不预期的错误。

简单实例

我们通过一个简单的例子来说明如何利用互斥锁来解决并发。程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define NLOOP 10
#define MAXP 5

int counter;

void *doit(void *);

int main(int argc, char **argv) {
pthread_t tids[MAXP];
int error;

for (int i=0; i<MAXP; i++) {
int *iptr = malloc(sizeof(int));
if (iptr == NULL) {
perror("malloc error");
return -1;
}
*iptr = i;
if ((error = pthread_create(&tids[i], NULL, doit, iptr)) != 0) {
errno = error;
perror("pthread create error");
return -1;
}
}

for (int i=0; i<MAXP; i++) {
if ((error = pthread_join(tids[i], NULL)) != 0) {
errno = error;
perror("pthread join error");
return -1;
}
}

return 0;
}

void *doit(void *arg) {
int val;
int idx = *((int *)arg);
free(arg);

for (int i=0; i<NLOOP; i++) {
val = counter;
printf("%d: %d\n", idx, val+1);
counter = val+1;
}

return NULL;
}

部分结果如下:

1
...

2: 1
2: 2
2: 3
2: 4
2: 5
3: 5
3: 6
1: 1
1: 2
0: 1
0: 2
0: 3
0: 4
0: 5
0: 6
0: 7
0: 8
0: 9
0: 10

...

可以看到当线程切换时,counter的值明显是错误的。

解决这种多个线程共享一个变量的问题是使用互斥锁(mutex,mutual exclusion)保护这个共享变量。互斥锁的作用是在线程访问该变量前必须持有互斥锁,否则线程将进入睡眠知道互斥锁可用。对于能够函数原型如下:

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mptr);

int pthread_mutex_unlock(pthread_mutex_t *mptr);

/* 成功返回0,否则返回正的Exxx值*/

修改之后的程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define NLOOP 10
#define MAXP 5

int counter;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *);

void Pthread_mutex_lock(pthread_mutex_t *mptr);
void Pthread_mutex_unlock(pthread_mutex_t *mptr);

int main(int argc, char **argv) {
pthread_t tids[MAXP];
int error;

for (int i=0; i<MAXP; i++) {
int *iptr = malloc(sizeof(int));
if (iptr == NULL) {
perror("malloc error");
return -1;
}
*iptr = i;
if ((error = pthread_create(&tids[i], NULL, doit, iptr)) != 0) {
errno = error;
perror("pthread create error");
return -1;
}
}

for (int i=0; i<MAXP; i++) {
if ((error = pthread_join(tids[i], NULL)) != 0) {
errno = error;
perror("pthread join error");
return -1;
}
}

return 0;
}

void *doit(void *arg) {
int val;
int idx = *((int *)arg);
free(arg);

for (int i=0; i<NLOOP; i++) {
Pthread_mutex_lock(&counter_mutex);
val = counter;
printf("%d: %d\n", idx, val+1);
counter = val+1;
Pthread_mutex_unlock(&counter_mutex);
}

return NULL;
}

void Pthread_mutex_lock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_lock(mptr)) != 0) {
errno = error;
perror("pthread mutex lock error");
exit(-1);
}
}

void Pthread_mutex_unlock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_unlock(mptr)) != 0) {
errno = error;
perror("pthread mutex unlock error");
exit(-1);
}
}

运行结果如下:

1
0: 1
4: 2
4: 3
4: 4
4: 5
4: 6
4: 7
4: 8
4: 9
0: 10
4: 11
4: 12
0: 13
2: 14
3: 15
3: 16
3: 17
3: 18
1: 19
1: 20
1: 21
1: 22
1: 23
1: 24
1: 25
1: 26
1: 27
1: 28
0: 29
0: 30
0: 31
0: 32
0: 33
0: 34
0: 35
3: 36
3: 37
3: 38
3: 39
3: 40
3: 41
2: 42
2: 43
2: 44
2: 45
2: 46
2: 47
2: 48
2: 49
2: 50

结果是正确的。

条件变量

互斥锁适合于防止同时访问某个共享变量,但是有的时候我们需要另外某种在等待某个条件发生期间让我们的进入睡眠的东西,这个就是条件变量。这里有一个例子,pthread函数库并没有等待任意一个线程终止的接口,那么如何实现呢?我们可以利用条件变量来实现这一功能。关于条件变量的函数接口如下:

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);

int pthread_cond_signal(pthread_cond_t *cptr);

/* 成功返回0,否则返回正的Exxx值*/

先看完整的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

#define NLOOP 10
#define MAXP 5

int counter;
int t_done;
int flag[MAXP];
int nleft;

pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t tdone_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t tdone_cond = PTHREAD_COND_INITIALIZER;

void *doit(void *);

void Pthread_mutex_lock(pthread_mutex_t *mptr);
void Pthread_mutex_unlock(pthread_mutex_t *mptr);
void Pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
void Pthread_cond_signal(pthread_cond_t *cptr);
void Pthread_join(pthread_t tid, void **status);

int main(int argc, char **argv) {
pthread_t tids[MAXP];
int error;

memset(flag, 0, sizeof(flag));
nleft = MAXP;

for (int i=0; i<MAXP; i++) {
int *iptr = malloc(sizeof(int));
if (iptr == NULL) {
perror("malloc error");
return -1;
}
*iptr = i;

if ((error = pthread_create(&tids[i], NULL, doit, iptr)) != 0) {
errno = error;
perror("pthread create error");
return -1;
}
}

while (nleft > 0) {
Pthread_mutex_lock(&tdone_mutex);
while (t_done == 0)
Pthread_cond_wait(&tdone_cond, &tdone_mutex);
for (int i=0; i<MAXP; i++) {
if (flag[i] == 1) { /* thread i already done */
printf("thread %d done\n", i);
Pthread_join(tids[i], NULL);
flag[i] = -1;
nleft--;
}
}
Pthread_mutex_unlock(&tdone_mutex);
}
return 0;
}

void *doit(void *arg) {
int val;
int idx = *((int *)arg);
free(arg);

for (int i=0; i<NLOOP; i++) {
Pthread_mutex_lock(&counter_mutex);
val = counter;
printf("%d: %d\n", idx, val+1);
counter = val+1;
Pthread_mutex_unlock(&counter_mutex);
}

Pthread_mutex_lock(&tdone_mutex);
flag[idx] = 1;
t_done++;
Pthread_cond_signal(&tdone_cond);
Pthread_mutex_unlock(&tdone_mutex);

return NULL;
}

void Pthread_mutex_lock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_lock(mptr)) != 0) {
errno = error;
perror("pthread mutex lock error");
exit(-1);
}
}

void Pthread_mutex_unlock(pthread_mutex_t *mptr) {
int error;

if ((error = pthread_mutex_unlock(mptr)) != 0) {
errno = error;
perror("pthread mutex unlock error");
exit(-1);
}
}

void Pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr) {
int error;

if((error = pthread_cond_wait(cptr, mptr)) != 0) {
errno = error;
perror("pthread condition wait error");
exit(-1);
}
}

void Pthread_cond_signal(pthread_cond_t *cptr) {
int error;

if ((error = pthread_cond_signal(cptr)) != 0) {
errno = error;
perror("pthread condition signal error");
exit(-1);
}
}

void Pthread_join(pthread_t tid, void **status) {
int error;

if ((error = pthread_join(tid, status)) != 0) {
errno = error;
perror("pthread join error");
exit(-1);
}
}
  1. 首先我们新建一个全局变量t_done表示当前终止的线程数量。
  2. 创建一个tdone_cond条件变量和与之相关联的互斥锁,通过持有该互斥锁期间递增该计数器并发送信号到该条件变量,一个线程通知朱循环自身即将终止。
  3. 主循环在持有条件变量相关联的互斥锁期间检查t_done,如果发现无事可做,那么主线程调用pthread_cond_wait等待信号,该函数把调用线程投入睡眠并释放调用线程持有的互斥锁,当调用线程后来从pthread_cond_wait返回时,
    线程再次只有该互斥锁。
  4. 我们用一个flag标志来记录线程的状态,1表示线程已经终止,但未被另外一个线程调用pthread_join,-1表示已经被调用了pthread_join。

每一个条件变量都需要关联一个互斥锁,因为条件通常是线程之间共享的某个变量的值。允许不同线程设置和测试该变量要求有一个与该变量相关联的互斥锁。举个栗子,如果之前的代码我们没有互斥锁,那么主循环的就是如下的形式:

1
2
while (t_done == 0)
Pthread_cond_wait(&tdone_cond, &tdone_mutex);

有这样的可能:主线程外的一个线程在主循环测试t_done == 0之后但是在调用pthread_cond_wait之前调用递增了t_done,那么这个信号就会永远的地丢失了(pthread_cond_wait一定要在pthread_cond_signal前调用)。同样的理由要求
pthread_cond_wait被调用时,其所关联的互斥锁是必须上锁的,该函数作为单个原子操作解锁该互斥锁并把调用线程投入睡眠也是出于这个理由。如果函数不先解锁,到返回是再给它上锁,调用线程不得不实现解锁事后上锁,测试t_done的循环就变成了

1
2
3
4
5
6
Pthread_mutex_lock(&tdone_mutex);
while (t_done == 0) {
Pthread_mutex_unlock(&tdone_mutex);
Pthread_cond_wait(&tdone_cond, &tdone_mutex);
Pthread_mutex_lock(&tdone_mutex);
}

然而会有这种情况:在Pthread_mutext_unlock与Pthread_cond_wait之间有另外的一个线程递增了t_done,那么这个信号也就永远消失了。

最后要说明的是,pthread_cond_signal通常唤醒等待相应条件上的单个线程,有时候一个线程需要唤醒多个等待条件的线程,这时候可以调用pthread_cond_broadcast,函数原型如下:

1
2
3
4
5
6
#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cptr);
int pthread_cond_timedwait(pthread_cond_t *cptr, pthread_mutext_t *mptr, const struct timespec *abstime);

/* 成功返回0,否则返回正的Exxx值 */
Zach的博客

用线程处理客户请求

| 阅读次数

线程

传统的UNIX模型中,如果需要异步地完成一个任务,通常我们只要fork一个进程就可以了,但是fork进程存在以下两个问题:

  1. fork的代价是昂贵的。fork需要把父进程的内存映像复制到子进程,并在子进程中复制所有的描述符,虽然现在的实现是写时复制,但是fork一样是昂贵的。
  2. fork返回之后父子进程通信需要利用IPC机制,比较费力。

线程被称为lightweight process,同一进程可以创建多个线程,这些线程共享进程内的全局内存,这使得线程通信变得容易。同时,线程创建的代码大大小于进程创建。但是线程也存在同步的问题。

同一进程内的所有线程共享全局变量外,还共享:

  • 进程指令
  • 大多数数据
  • 打开的文件(描述符)
  • 信号处理函数和信号变量
  • 当前工作目录
  • 用户ID和组ID

每个线程也有各自的:

  • 线程ID
  • 寄存器集合,包括程序计数器和栈指针
  • 栈,存放局部变量和返回地址
  • errno
  • 信号掩码
  • 优先级

线程操纵函数

创建线程

函数原型为:

1
2
3
4
5
#include <pthread.h>

int pthread_create(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *), void *arg);

/× 成功返回0,否则返回正的Exxx值×/

进程内的线程有一个线程ID标志,由tid返回。每个线程有许多属性:优先级、初始栈大小、是否应该成为一个守护进程等,若attr为NULL,则取默认值。线程的执行逻辑由func函数指针指定,函数接收一个void指针的
参数,第四个参数就是我们传递的参数,如果需要传递多个参数,把参数打包进一个结构指针即可。

等待线程结束

利用pthread_join等待一个给定线程终止(是的,不能等待任意一个线程终止,如果需要,必须利用其他手段)。函数原型为:

1
2
3
4
5
#include <pthread.h>

int pthread_join(pthread_t *tid, void **status);

/* 成功返回0,否则返回正的Exxx值 */

status通常是创建线程时func函数的返回值。

线程终止

让一个线程终止的方法之一是调用pthread_exit,函数原型如下:

1
2
3
#include <pthread.h>

void pthread_exit(void *status);

status不能局限于要终止的线程的对象,因为当线程终止时,对象也将消失。如果该线程未曾脱离(下面会说到),它的线程ID和退出状态将一直留存到调用进程内的某个其他线程对它调用pthread_join。

让一个线程终止的另外的两个方法:

  • 启动线程的函数可以返回,其返回值就是相应线程的终止状态。
  • 如果进程的main函数返回或者任何线程调用了exit,整个进程就终止,其中包括它的任何线程。

pthread_self与pthread_detach函数

pthread_self返回进程内该线程的ID,pthread_detach后,线程像守护进程一样,当它们终止时,所有的资源都被释放,不需要某个其他进程调用pthread_join。函数原型如下:

1
2
3
4
5
#include <pthread.h>

pthread_t pthread_self(void);

int pthread_detach(pthread_t tid); /* 成功返回0,否则返回正的Exxx值 */

一般情况下,上述函数的调用情况是

1
pthread_detach(pthread_self());

以此,线程让自己脱离。

线程安全函数

当一个函数修改了全局变量,那么这个函数不是线程安全的,换句话说该函数不可重入。考虑以下的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main(int argc char **argv) {
int listenfd, connfd;

...

for (;;) {
len = addrlen;
connfd = Accept(listenfd, cliaddr, &len);
Pthread_create(&tid, NULL, doit, &connfd);
}
}
static void* doit(void *arg) {
int connfd;

connfd = *((int *)arg);
...
}

看起来上述代码是没有问题的,但是考虑这样一种情况:

  1. accept返回,主线程调用pthread_create创建一个新的线程
  2. 调度新创建的线程执行,但是并没有执行到给connfd赋值就被切换了。
  3. 另一个连接就绪,accept返回,返回的描述符存入connfd
  4. 先前切换的线程被调度执行,这时候它所得到的connfd却并不是第一个连接的描述符了。

POSIX要求许多函数是线程安全的,即它们都是可重入的,这个要求通过对我们透明的库函数内部执行某种形式的同步达到,线程安全函数见UNP P542。

线程特定数据

把一个未线程化的程序转换成使用线程的版本时,有时会碰到因其中有函数使用静态变量而引起的一个常见变成错误。解决这样的错误有许多方法,这里我们介绍使用线程特定数据的方法。

每个系统支持优先数量的线程特定数据,POSIX要求这个限制不小于128.系统为每个进程维护一个我们称之为Key的结构数据,如图所示:

key_tsd.png

flag表示该项是否被使用,析构函数用于线程终止时释放线程占用的数据内存,由编程人员指定。

这个Key是进程范围内的,即所有线程共享该数组。除此之外,系统还为每个线程维护特定于线程的Pthread结构,其部分内容是我们称之为pkey数据的128个元素的指针数组。如下所示:

pkey_tsd.png

pkey数组的所有元素都被初始化为空指针,这128个指针和进程内的128个可能的Key是逐一关联的。

举一个例子来说明线程特定的数据如何使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static pthread_key_t rl_key;
static pthread_once_t rl_once = PTHREAD_ONCE_INIT;

static void readline_destructor(void *ptr) {
free(ptr);
}

static void readline_once(void) {
Pthread_key_create(&rl_key, readline_destructor);
}

ssize_t thread_readline(int fd, void *vptr, size_t maxlen) {
...

pthread_once(&rl_once, readline_once);

if ((ptr = pthread_getspecific(rl_key)) == NULL) {
ptr = Malloc(...);
pthread_setspecific(rl_key, ptr);
}
...
/* use values pointed to by ptr */
}

当进程内第一个线程调用thread_readline时,一个Key被创建,同时注册对应的析构函数。之后进程内线程的状态如下:

thread_statues1.png

这时候如果有另外一个线程调用thread_readline,那么pthread_once之前执行过一次,Key已经创建(这两次析构函数是一样的),线程取该Key对应的内存指针,返回为空,于是线程创建一个新的内存区域。进程内线程的状态变为如下所示:

thread_statues2.png

可以看到每一个线程有特定于线程自身的缓冲区域,这样就不会发生同步的问题。

可以给出用到的函数原型:

1
2
3
4
5
6
7
8
9
#include d.h>

int pthread_once(pthread_once_t *onceptr, void (*init)(void));

int pthread_key_create(pthread_key_t *keyptr, void (*destructor) (void *));

void *pthread_getspecific(pthread_key_t key);

void *pthread_setspecific(pthread_key_t key, const void *value);

注意pthread_once_t类型的变量在传入pthread_once函数之前,要初始化为PTHREAD_ONCE_INIT。

使用线程特定数据的readline函数

这里给出一个具体的例子,该例子是修改过的readline函数,它是利用了线程的特定数据以做到线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include "unp.h"
#include "unpthread.h"

static pthread_key_t rl_key;
static pthread_once_t rl_once = PTHREAD_ONCE_INIT;

static void readline_destructor(void *ptr) {
free(ptr);
}

static void readline_once(void) {
Pthread_key_create(&rl_key, readline_destructor);
}

typedef struct {
int rl_cnt;
char *rl_bufptr;
char rl_buf[MAXLINE];
}Rline;

static ssize_t my_read(Rline *tsd, int fd, char *ptr) {
if (tsd -> rl_cnt <= 0) {
/* no data in buffer */
again:
if ((tsd -> rl_cnt = read(fd, tsd -> rl_buf, MAXLINE)) < 0) {
if (errno == EINTR)
goto again;
return -1;
}
if (tsd -> rl_cnt == 0) {
/* EOF */
return 0;
} else {
tsd -> rl_bufptr = tsd -> rl_buf;
}
}

tsd -> rl_cnt--;
*ptr = *tsd -> rl_bufptr++;
return 1;
}

ssize_t thread_readline(int fd, void *vptr, size_t maxlen) {
Rline *tsd;
size_t n, rc;
char c, *ptr;

Pthread_once(&rl_once, readline_once);

if ((tsd = pthread_getspecific(rl_key)) == NULL) {
/* alloc thread specific data */
tsd = Calloc(1, sizeof(Rline));
pthread_setspecific(rl_key, tsd);
}

ptr = vptr;
for (n = 1; n<maxlen; n++) {
if ((rc = my_read(tsd, fd, &c)) == 1) {
*ptr++ = c;
if (c == '\n')
break;
} else if (rc == 0) {
/* end of file */
*ptr = 0;
return n-1;
} else {
/* error */
return -1;
}
}

*ptr = 0;
return n;
}
Zach的博客

IPv4与IPv6的户操作性

| 阅读次数

IPv4映射的IPv6地址

IPv4映射的IPv6地址允许在因特网向IPv6过度时期让运行在双栈主机上的IPv6应用进程能够与只支持IPv4的主机通信。这些地址是IPv6应用进程查询某个只有IPv4地址的主机的IPv6地址时,DNS解析器按需自动创建的且不存在于任何DNS
数据文件中。在IPv6套接字上使用这种类型的地址导致往目的地IPv4主机发送IPv4数据报。

下图展示了一个IPv4映射的IPv6地址的格式:

mapped.png

IPv4客户与IPv6服务器

双栈主机的一个特性就是其上的IPv6服务器既可以处理IPv4客户,也可以处理IPv6客户。这是通过IPv4映射的IPv6地址实现的。下图是一个例子:

ipv4to6.png

IPv4客户发送一个IPv4的SYN分节以期和服务器建立连接。来自IPv4客户的SYN分节在以太网中表现为一个以太网首部、一个IPv4首部、一个TCP部以及TCP数据。以太网首部中包含的类型字段为0x0800,表示它是一个IPv4帧数据。

接收数据链路通过查看以太网类型字段把每个帧数据发送给对应的IP模块。IPv4模块结合其上的TCP模块检测到IPv4数据报的目的段端口对应一个IPv4套接字,于是该数据报Ipv4首部中的源IPv4地址被转换成IPv4映射的IPv6地址,当
accept系统调用把这个已经接收的IPv4客户连接返回被服务器进程的时候,这个映射后的地址作为客户的IPv6地址返回到服务器的IPv6套接字。该连接上的其余数据同样都是IPv4数据报。服务器进程完全不知道它是在与一个IPv4客户机通信。

IPv6客户与IPv4服务器

在一个双栈主机上运行一个IPv6客户程序,其于一个IPv4服务器程序交互的流程如下:

  1. 一个IPv4服务器在只支持IPv4的一个主机上启动后创建一个IPv4监听套接字。
  2. IPv6客户启动后调用getaddrinfo单纯查找IPv6地址(请求AF_INET6地址,hints结构中设置了AI_V4MAPPED标志),最后只有得到只支持IPv4的服务器主机的A记录,那么返回给客户的就是IPv4映射之后的IPv6地址。
  3. IPv6客户设置这个IPv4映射后的IPv6地址,调用connect,内核检测到这个映射的地址后自动发送一个IPv4的SYN分节到服务器。
  4. 服务器响应一个IPv4 SYN/ACK分节,连接于是用IPv4数据报建立。

以后在以太网上传递的是IPv4数据报。

Zach的博客

系统服务——daemon

| 阅读次数

daemon

什么是daemon

简单来说,系统为了某些功能必须提供一些服务,这个服务我们称之为service,但是service的提供需要进程的运行,所以实现这个service的程序我们称之为daemon。我们不必
区分daemon与service,或者说可以将他们视为等同的。因为没有daemon在后台运行就不会有这个serivce。daemon没有控制终端,所以当有事发生时,它们需要有消息输出的方法可用。
syslog函数是输出这些消息的标准方法,它把这些消息发送给syslogd守护进程。

syslogd的函数原型为:

1
2
3
#include <syslog.h>

void syslog(int priority, const char *message, ...);

priority由level和facility组成,具体可以查看UNP P288。

daemon的主要分类

如果按照启动与管理方式分类:

  • stand alone:这种类型的daemon不必通过其他机制来管理,可以自行启动,一旦启动就常驻内存。其最大的优点是由于一直在内存内持续的提供服务,因此对于客户的请求响应较快。常见的
    stand alone的daemon有ftp、httpd等。
  • super daemon:一个特殊的daemon来统一管理其他的daemon。这一种服务的启动方式通过同一个daemon来负责唤起服务。这个特殊的daemon被称为super daemon。早期的super daemon是inetd,现在
    Linux下是xinetd。当客户没有请求时,对应的服务未启动,只有当客户有对应的请求来到时,super daemon才会唤醒相应的服务,当请求完成之后,被唤醒的这个服务也会关闭并释放资源。该机制的优点在于:1)super daemon
    可以具有安全管控机制;2)服务在请求结束后就关闭,不会一直占用资源。缺点在于对于请求的响应较慢。

如果按照工作形态分类:

  • signal-control:这种daemon通过信号来管理,只要有任何客户端的请求进来,它就会立即启动取处理。
  • interval-control:这种daemon每隔一段时间就主动取执行某项工作。

将一个程序作为daemon运行

Linux提供了daemon函数将一个普通进程转变为守护进程运行。它的原理和这里给出的daemon_init函数大同小异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int daemon_init(const char *pname, int facility) {
pid_t pid;

if ((pid = Fork()) < 0)
return -1;
else if (pid > 0)
_exit(0);

/* child continues */

/* become session leader */
if (setsid() < 0)
return -1;

/* 必须忽略SIGHUP,否则当会话头进程终止时,会话中的所有进程都收到SIGHUP信号 */
Signal(SIGHUP, SIG_IGN);
/* 再次Fork,使得进程不再是会话头 */
if ((pid = Fork()) < 0)
return -1;
else if (pid)
_exit(0);

daemon_proc = 1;

chdir("/");

for (int i=0; i<MAXFD; i++)
close(i);

/* redirec stdin, stdout and stderr to '/dev/null' */
open("/dev/null", O_RDONLY);
open("/dev/null", O_RDWR);
open("/dev/null", O_RDWR);

openlog(pname, LOG_PID, facility);

return 0;
}
  1. 首先是fork,在fork结束之后,父进程终止,子进程自动在后台运行。另外,由于子进程继承了父进程的进程组ID,这保证了子进程不是一个进程组的头进程,这为下面的setsid做了准备。
  2. setsid,其用于创建一个新的会话,当前进程变为新的会话的会话头进程以及新进程组的进程组头进程,从而不再有控制终端。
  3. 忽略SIGHUP信号并再次fork,再次fork保证本守护进程不会是会话头进程,那么即使将来打开了一个终端设备,也不会自动获得控制终端。因为当没有控制终端的会话头进程打开一个终端设备时,该终端会自动成为
    这个会话头进程的控制终端。忽略SIGHUP信号是因为当会话头进程终止时,其会话中的所有进程(再次fork产生的子进程)都收到SIGHUP信号。
  4. 将stdin、stdout以及stderr重定向到/dev/null,打开这些描述符的理由在于,守护进程调用的那些假设能从这三个描述符读写的库函数不会因为这些描述符未打开而失败。
  5. 调用syslog处理函数。

既然守护进程在没有控制终端的环境下运行,那么它绝对不会收到来自内核的SIGHUP信号,许多守护进程因此把这个信号作为来自系统管理员的一个通知,表示其配置文件已经发生变化,守护进程应该重新读入配置文件。

Linux下daemon的启动脚本和启动方式

配置文件位置

通常daemon的启动脚本放在/etc/init.d/目录下,启动脚本可以进程环境检测、配置文件分析、PID文件放置以及相关重要交换文件的锁操作。

super daemon的配置文件放置在/etc/xinetd.d/目录和/etc/xinetd.conf中。

在/etc/目录下还有各自服务的配置文件

/var/lib/目录下是一些会产生数据的服务放置产生的数据库的位置,数据库管理系统MySQL的数据库默认写入/var/lib/mysql。

/var/run/目录下存放各服务程序的PID记录。

对于stand alone的daemon,我们即可执行/etc/init.d/目录下的脚本来启动它,

1
➜  ~ /etc/init.d/mysql start

也可以利用service命令(实际上它也是一个stand alone的服务)

1
➜  ~ service mysql start

super daemon配置文件

Linux下super daemon由xinetd这个进程实现,它不仅可以启动其他daemon,还可以进行安全性或者其他管理机制的控制。xinetd的默认配置存放在/etc/xinetd.conf中,由xinetd启动的服务程序的配置放置在/etc/xinetd.d目录下。
如果一个服务的配置文件没有xinetd.conf中指定的参数,那么该服务对应的参数就以xinetd.conf中的为准。具体的参数说明可以参加《鸟哥——基础篇》P559。一个简单的daytime服务配置如下:

1
service mydaytime
{
        disable         = no
        socket_type     = stream
        wait            = no
        user            = root
        server          = /home/zach41/Desktop/unp/unpv13e/inetd/daytimetcpsrv3
        log_on_failure  = USERID
}

这之后我们还需要修改/etc/services文件,分配一个端口来提供daytime服务。具体就是添加如下一行:

1
mydaytime    9999/tcp

这样配置之后,我们重启xinetd服务

1
➜  ~ service xinetd restart
➜  ~ sudo netstat -tnlp | grep 9999
[sudo] password for zach41: 
tcp        0      0 0.0.0.0:9999            0.0.0.0:*               LISTEN      15914/xinetd

可以看到xinetd进程在监听9999端口。

我们运行一个daytime客户端程序

1
➜  names git:(master) ✗ ./daytimetcpcli2 0.0.0.0 9999
trying 0.0.0.0:9999
Mon Sep 19 18:43:04 2016

daytime daemon程序

最后这里给出daytimeserver的代码,这是由xinetd作为守护进程启动的时间获取服务器程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include	"unp.h"
#include <time.h>

int
main(int argc, char **argv)
{

socklen_t len;
struct sockaddr *cliaddr;
char buff[MAXLINE];
time_t ticks;

daemon_inetd(argv[0], 0);

cliaddr = Malloc(sizeof(struct sockaddr_storage));
len = sizeof(struct sockaddr_storage);
Getpeername(0, cliaddr, &len);
err_msg("connection from %s", Sock_ntop(cliaddr, len));

ticks = time(NULL);
snprintf(buff, sizeof(buff), "%.24s\r\n", ctime(&ticks));
Write(0, buff, strlen(buff));

Close(0); /* close TCP connection */
exit(0);
}

可以看到所有套接字的创建代码(tcp_listen和accept的调用)都不见了,这些步骤都有xinetd执行,我们使用描述符0指代已由xinetd接受的TCP连接(套接字描述符被复制到描述符0, 1, 2)。daemon_init只是负责设置daemon_proc
标志以及调用openlog,从而发送日志信息给syslogd守护进程。

Zach的博客

套接字选项

| 阅读次数

修改套接字选项

有几种方法来修改和获取套接字的选项:

  • getsockopt & setsockopt
  • fcntl
  • ioctl

getsockopt & setsockopt

这两个函数仅仅用于套接字。函数原型如下:

1
2
3
4
5
#include <sys/socket.h>

int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen);

int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);

sockfd必须指向一个已经打开的套接字,level指向系统中解释选项的代码或为通用套接字代码,或为某个特定于协议的代码(如TCP、IPv4、IPv6或SCTP)。

optval指向某个变量,变量的大小由最后一个参数optlen指定,setsockopt从optval中取得待设置的新值,而getsockopt把获取到的值存入对应的地址中。

套接字选项粗分为两大基本类型:

  • 启用或禁止某个特性的标志选项(flag = 0禁止特性,flag = 1开启特性)
  • 取得并返回我们可以设置或检查的特定值选项,即值选项。

套接字选项可以参见UNP的P151

通用套接字选项(部分)

这里仅仅列出部分的通用套接字选项,其实也就是挑了一些自己看得懂的选项记录下。

SO_BROADCAST

开启或禁止进程发送广播消息的能力。只有数据报支持广播,并且还是在支持广播消息的网络上(以太网、令牌网络等)。

SO_DEBUG

仅由TCP支持,当给一个TCP套接字开启本选项时,内核将为TCP在该套接字发送和接收的所有分组保留详细的跟踪信息,这些信息保存在内核的某个环形缓冲区中,并可使用trpt程序进行检查。

SO_ERROR

当一个套接字上发生错误时,内核将套接字的名为so_error的变量设置为标准的Unix Exxx值中的一个,它被成为该套接字的待处理错误(pending error),内核以以下方式通知进程这个错误:

  1. 如果进程阻塞在套接字的select调用上,那么无论是检查可读条件还是可读可写条件,select均返回并设置其中一个或所有的条件。
  2. 如果进程使用信号驱动I/O模型,那么进程或进程组就会接收到内核产生的SIGIO信息。

进程在被内核通知之后,可以通过访问SO_ERROR套接字选项获得so_error的值,由getsockopt返回的整数即待处理错误,随后so_error由内核复位成0。

这个套接字选项可以获取,但是不能设置。

SO_KEEPALIVE

给一个套接字设置保持存活(keep-alive)选项后,如果2小时内在该套接字的任意一个方向上没有数据交换,那么TCP就会自动给对端发送一个保持存活的探测分节(keep-alive probe),这是一个对端必须响应的TCP分节,它
会导致以下三种情况之一:

  1. 对端以期望的ACK响应,一切正常
  2. 对端以RST响应,对端已经崩溃并重启,套接字的待处理错误被置为ECONNRESET,套接字本身被关闭。
  3. 对端对保持存活的探测分节没有响应,TCP间隔一段时间再次发送探测分节,多次之后若还是没有响应,则放弃套接字的待处理错误被置为ETIMEOUT,如果收到一个ICMP错误作为某个探测分节的响应,就返回相应的错误,套接字本身被关闭。

SO_LINGER

指定close函数对面向连接的协议(TCP和SCTP)如何操作。默认的行为是立即返回,如果这时候发送缓冲区有数据残留,系统会尝试把数据发送给对方。选项要求用户进程和内核间传递如下结构:

1
2
3
4
struct linger {
int l_onoff;
int l_linger;
}
  1. 如果l_onoff = 0,l_linger被忽略,选项关闭
  2. l_onoff非0,
    • l_linger = 0,那么TCP丢弃发送缓冲区内的任何数据,并发送一个RST给对端,不会再有四次挥手。
    • l_linger不为0,进程被阻塞,知道所有发送缓冲区的数据都发送完毕并被对方确认,或者是超过了l_linger指定的时间,如果超时,close返回EWOULDBLOCK错误。

假设客户在发送玩数据后调用了close函数,close可能在服务器读套接字接收缓冲区中的剩余数据之前就返回,在应用程序读数据之前服务器可能就会崩溃,而客户进程永远不会知道。
如果设置了SO_LINGER套接字,那么在调用close函数后,应用进程阻塞一段时间,等待数据全部发送并被对方确认(如图所示)。但是这里还有一个问题,延滞的时间可能不够,close仍然会返回EWOULDBLOCK错误,而且close的成功返回只是告诉我们先前发送的数据已经由对方确认,并不能告诉我们应用进程是否读取了数据。

so_linger.png

让客户进程知道服务器已经读取数据的一个方法是改用shutdown(设置SHUT_WR),改用之后的流程如下图:

shutdown_linger.png

另外一个方法是应用程序自己做确认。客户在向服务器发送完数据后,调用read来读取一个字节

1
2
3
4
char ack;

Write(sockfd, data, nbytes);
n = read(sockfd, &ack, 1);

服务器读取来自客户端的数据后发回一个字节的ACK。

1
2
3
nbytes = Read(sockfd, buff, sizeof(buff));

Write(sockfd, "", 1);

SO_RCVBUF & SO_SNDBUF

每一个套接字有一个发送和接收缓冲区,可以利用SO_RCVBUF和SO_SNDBUF选项来修改默认的缓冲区大小。需要注意的是,由于TCP的窗口规模是在建立连接时用SYN分节与对端呼唤a的都的。对于客户端,意味着
SO_RCVBUF必须在connect调用之前设置;对于服务端,该选项必须在调用Listen之前设置。

SO_RCVLOWAT & SO_SNDLOWAT

每一个套接字还有一个接收和发送低水位标志,由select使用,用SO_RCVLOWAT和SO_SNDLOWAT可以修改低水位标志的值。

接收低水位标志是让select返回可读时套接字接收缓冲区所需要的最少的数据量。
发送低水位标志是让select返回可写是套接字发送缓冲区可存入的最少数据量。

两个低水位的默认值都为1

SO_RCVTIMEO & SO_SNDTIMEO

这两个选项允许我们给套接字的接收和发送设置一个超时值。

Zach的博客

Linux文件与目录的默认、隐藏和特殊权限

| 阅读次数

默认权限

在Linux下创建一个文件或者目录的默认权限和umask相关,umask指定“当前用户在创建文件或者目录时的权限默认值”。用umask来查看文件或目录的默认权限值:

1
➜  ~ umask
002

需要注意的是umask指定文件或目录的默认值应该减去的数值。而对于一个文件,默认值是“-rw-rw-rw”,对一个目录,默认值是“drwxrwxrwx”,举例来说,如果当前用户创建了一个文件,那么文件的权限默认值就是

1
(-rw-rw-rw-) - (--------w-) => -rw-rw-r--

隐藏属性

可以利用chattr和lsattr来修改和查看文件或目录的隐藏属性。需要注意的是这两个命令只在Ext2/Ext3文件系统上有效。

命令使用方法:

1
chattr [+-=] [ASacdistu] files...

这里只说明两个参数,其余可以参见manual

  • i: 让一个文件不能被删除、改名、设置连接、写入或添加数据。只有root才能设置该属性。
  • a: 设置a之后,这个文件只能添加数据,而不能删除也不能修改数据,只有root才能设置该属性。

lsattr查看隐藏属性,用法简单,可以直接查看manual

文件特殊权限

Linux有三个特殊权限:

  • SUID
  • SGID
  • SBIT

SUID

当s标志出现在文件所有者的x权限上时,此时就被成为SET UID,即SUID的特殊权限。

SUID有这样的功能和限制:

  1. 仅仅对二进制程序有效
  2. 执行者对该程序具有科执行的权限
  3. 本权限仅仅在执行过程中有效
  4. 执行者将拥有该程序所有者的权限(仅在执行过程中)

举例来说:

1
➜  ~ ls -l /usr/bin/passwd 
-rwsr-xr-x 1 root root 54256 3月  29 17:25 /usr/bin/passwd

passwd是一个二进制程序,它的所有者是root,但是当我们用用户身份登入时一样可以执行该程序并修改自己的密码(这回修改/etc/shadow文件)。这是因为passwd拥有SUID权限。

SGID

当s标志在用户组的x时,成为SET GID,即SGID。SGID可以作用于一个文件,也可以作用于一个目录。

当作用于一个文件时:

  1. SGID对二进制程序有效
  2. 程序执行者对该程序有执行的权限
  3. 执行者在执行的过程中可以得到程序所在用户组的权限。

当作用于一个目录时:

  1. 用户若对此目录有r和x的权限,该用户能够进入此目录(前提)
  2. 用户在目录下的有效用户组将会变成该目录的用户组
  3. 若用户在此目录下具有w的权限,则用户所创建的文件将与此目录的用户组相同。

SBIT

SBIT(Sticky Bit)只对目录有效。它对目录的作用是:

  1. 当用户对此目录有w和x的权限,即具有写入权限时(前提)
  2. 当用户在该目录下创建文件或目录时,仅自己与root才有权限删除该文件。

如果一个目录D有SBIT权限,当用户A在D下只能对自己创建的文件或目录进程删除、重命名、移动等操作,而无法删除他人的文件。

Zach的博客

UIStackView初探

| 阅读次数

UIStackView

iOS9引入了UIStackView,Apple文档对UIStackView的定义如下:

1
The UIStackView class provides a streamlined interface for laying out a collection of views in either a column or a row. 
Stack views let you leverage the power of Auto Layout, creating user interfaces that can dynamically adapt to the device’s 
screen size, and any changes in the available space

即,利用UIStackView我们可以很方便地垂直或水平排列多个subview。

配置UIStackView

UIStackView有以下几个配置选项让我们来配置它:

  1. Axis:表示水平排列还是垂直排列
  2. Alignment:控制subview的对齐方式
  3. Distribution:控制subview的分布方式
  4. Spacing:subview间的最小距离

Alignment因Axis而异,用图示说明更加形象。

水平排列

align_h.png

垂直排列

align_v1.png align_v2.png

注意

UIStackView会被当成一个Container View,它不会像其他UIView一样被渲染,所以类似与设置背景或者是重载drawRect方法都是没有任何效果的。

管理Subview

UIStackView有两个属性

  • arrangedSubviews
  • subviews

如果我们想添加一个subview让StackView来管理,只要调用addArrangedSubview(_ view: UIView)或者insertArrangedSubview(_ view: UIView, at stackIndex: Int)即可,但是如果要删除一个被StackView管理的subview,那么我们需要主注意到
removeArrangedSubview(_ view: UIView)函数只是让对应的subview不让StackView来管理它的约束,它并没有从当前的视图层级从移除,如果要移除视图,我们需要显示调用removeFromSuperview()。

简单的例子

参考

  • iOS 9: Getting Started with UIStackView
  • UIStackView-Tutorial-Introducing-Stack-Views
Zach的博客

TCP连接建立和终止

| 阅读次数

TCP连接的建立和终止

TCP连接建立

建立一个TCP连接的过程图示如下(图片均来源于网络…):

shakehand3.jpg
  1. 服务器在连接建立之前必须准备好接收客户发来的连接,这一般通过创建套接字,绑定服务器地址、监听套接字来完成,即socket,bind,listen三个函数。这一个过程被成为被动打开。
  2. 客户通过connect函数发起主动打开,客户会发送一个SYN分节,它告诉服务器客户将在连接中发送的数据的初始序列号,通常该分节不带数据,其所在IP数据只含有一个IP首部、一个TCP首部以及可能的TCP选项。
  3. 服务器收到客户发来的SYN分节后,服务器必须确认该分节,同时服务器发送一个自己的SYN分节,它含有服务器将在该连接中发送的数据的初始序列号。
  4. 客户收到服务器发回的消息后,确认服务器的分节,连接建立。

从图中可以看到ACK是SYN分节的序列号加1,类似的,FIN分节的ACK也是FIN分节的序列号加1。

TCP连接终止

TCP终止一个连接需要4个分节。

终止一个连接的图示如下:

shakehand4.jpg
  1. 某个应用首先调用close,该端主动关闭,发送一个FIN分节
  2. 接收到FIN分节的对端执行被动关闭,这个FIN由TCP确认,FIN分节的接收也作为一个文件结束符(EOF)传递给接收端应用进程。
  3. 一段时间之后,接收到这个文件结束符的引用进程将调用close关闭它的套接字,这会导致它也发送一个FIN分节。
  4. 接收这个最终FIN的对端发送一个ACK确认这个FIN分节。

连接和终止

一个完整的连接和终止的过程示意:

full_process.jpg

一个完整的TCP状态转换图:

state.jpg

TIME_WAIT

TCP终止中,执行了主动关闭的那端在接收到对端发来的FIN分节和发送对应的ACK之后就进入了TIME_WAIT状态,该状态的持续时间是2个MSL(maximum segment lifetime)。它的存在有两个理由:

  1. 可靠地实现TCP全双工连接的终止
  2. 允许老的重复分节在网络中消逝

对于第一个理由,如果最后的ACK丢失了,那么服务器需要重发FIN,因此客户必须维护状态信息,以允许它发送最终的那个ACK,如果客户不维护状态信息,那么客户在收到服务器重传的FIN后将直接相应RST,这个分节将会被服务器解释为一个错误。

对于第二个理由,假设在一个连接被终止了马上又发起来一个相同IP和端口上的连接,后一个连接成为前一个连接的化身,TCP必须防止老的连接的分组在老连接结束之后再现,以防止其被误解为新的连接的分组。TCP不会让处于TIME_WAIT状态的连接发起新的化身,既然TIME_WAIT的时间是MSL的2倍,这就足以让某个方向上的分组最多存活MSL时间即被丢弃,另外一个方向上的应答最多存活MSL时间也被丢弃。通过这种方式,老的重复分组在网络中都已经消逝。

Zach的博客

I/O模型

| 阅读次数

I/O模型的种类

Unix环境下可用的5种I/O模型分别为:

  • 阻塞式I/Ob
  • 非阻塞式I/O
  • I/O复用(select和poll)
  • 信号驱动I/O(SIGIO)
  • 异步I/O(POSIX的aio_系列函数)

一个输入操作的通常包括两个步骤:

  1. 等待数据准备好
  2. 从内核向进程复制数据

阻塞式I/O模型

io_block.png

在图示中,进程调用recvfrom,其系统调用直到数据到达且被复制到应用进程的缓冲区中或者发生错误才返回,最常见的错误就是被信号中断。

非阻塞I/O模型

io_nonblock.png

从图示中我们可以看出,当recvfrom没有数据可返回时,内核立即返回一个EWOULDBLOCK错误;如果有数据准备好了,那么s内核开始将数据复制到应用进程缓冲区,于是recvfrom成功返回。

这样一种对一个非阻塞描述符循环调用recvfrom的方式,我们称之为polling。

I/O复用模型

io_multiplexing.png

I/O复用模型中,我们的进程阻塞于select或者poll函数,而不是阻塞在真正的I/O系统调用上,当某一个描述符数据准备好时,我们的进程被唤醒,从而可以处理数据,通过I/O复用模型,我们可以让应用进程等待多个I/O的操作完成,而不是单单阻塞于一个I/O操作。

信号驱动的I/O模型

io_signal.png

信号驱动模型在数据就绪时通过发送SIGIO通知应用进程,应用进程收到信号后可以在信号处理函数中调用recvfrom读取数据。这种模式的优势在于等待数据报到达期间,进程不会被阻塞,主循环可继续执行。但是在内核复制数据到应用进程缓冲区期间,应用进程被阻塞。

异步I/O模型

POSIX定义同步和异步I/O操作如下:

  1. 同步I/O操作:导致请求进程阻塞,直到I/O操作完成(前面四个模型按照该定义都是同步I/O)
  2. 异步I/O操作:不导致请求进程阻塞。

异步I/O由POSIX规范定义,它告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到应用进程的缓冲区)完成后通知我们。它于信号驱动的I/O模型相比,区别在于异步模型由内核通知我们什么时候I/O操作完成,而信号驱动模型由内核通知我们什么时候可以启动一个I/O操作。

select函数

select函数允许进程指示等到多个时间中的任何一个发生,并只在有一个或者多个时间发生或经历一段指定时间后才唤醒它。

函数的原型为:

1
2
3
4
#include <sys/select.h>
#include <sys/time.h>

int select(int maxfd1, fd_set *readset, fd_set *writeset, fd_set *excepset, const struct timeval *timeout);

中断三个参数readset,writeset,exceptset指定要让内核测试读、写和异常条件的描述符。maxfd1参数指定待测试的描述符的个数,它的值是待测试描述符的最大描述符加1。在函数返回后,描述符集内任何与未就绪描述符对应的位都会被清为0,为此,每次重新调用select函数时,我们都得再次把所有需要测试的描述符对应位置位。

一般来说,为了提升性能而引入缓冲机制会增加网络应用程序的复杂性。比如用fgets读取文本行,这转而会使得一可用的文本行被读入到stdio缓冲区中,然而fgets只返回1行,其余仍然在缓冲区中,当fgets完成任务后,select函数会被再次调用以等待新的工作,它不管stdio缓冲区中是否还有数据,究其原因是select不知道stdio使用了缓冲区,它只是从read系统调用的角度指出是否有数据可用。所以在混合使用stdio和select时需要格外小心。

shutdown函数

终止网络连接的通常方法是调用close函数,但是close函数有两个限制:

  1. close把描述符的引用计数见减1,仅仅在计数变为0时关闭套接字。
  2. close终止读和写两个方向的数据传送。

利用shutdown函数,可以避免这两个限制,shutdown函数可以不管引用计数就激发TCP的正常终止序列,也可以在我们发送完数据后,只关闭写半部,仍然等待远端数据接收(反之亦可)。

函数原型为:

1
2
3
#include <sys/socket.h>

int shutdown(int sockfd, int howto)

函数的行为依赖于howto参数:

  • SHUT_RD:关闭读半部,套接字中不再有数据可以接收,而且套接字接收缓冲区中的现有数据都被丢弃。进程不能再对这样的套接字调用任何读函数,对一个TCP套接字这样调用shutdown函数后,由该套接字接收的来自对端的任何数据都被确认,然后悄然丢弃。
  • SHUT_WR:关闭写半部,对于TCP套接字这称为半关闭。当前留在套接字发送缓冲区中的数据将被发送掉,之后发送TCP的FIN分节,不管套接字描述符引用是否为0,写半部关闭都会执行,进程不能再对这样的套接字调用任何写函数。
  • SHUT_RDWR:连接的读半部和写半部都关闭,这和调用两次shutdown等效。

一个I/O复用的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void select_strcli(FILE *fp, int sockfd) {
int maxfdp1, stdineof, n;
fd_set rset;
char buf[MAXLINE];

stdineof = 0;
FD_ZERO(&rset);
for (;;) {
/* 每次select返回后,任何未就绪的描述符清0,所以每次都要置位*/
if (stdineof == 0)
FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
maxfdp1 = max(fileno(fp), sockfd) + 1;

n = select(maxfdp1, &rset, NULL, NULL, NULL);
if (n < 0)
err_sys("select error");

if (FD_ISSET(sockfd, &rset)) {
if ((n = Read(sockfd, buf, MAXLINE)) == 0) {
if (stdineof == 1)
return; /* terminated normally */
else
err_quit("str_cli: server terminated prematurelly");
}
Write(fileno(stdout), buf, n);
}

if (FD_ISSET(fileno(fp), &rset)) {
if ((n = Read(fileno(fp), buf, MAXLINE)) == 0) {
stdineof = 1;
Shutdown(sockfd, SHUT_WR); /* send FIN, still can read */
FD_CLR(fileno(fp), &rset);
continue;
}
Writen(sockfd, buf, n);
}
}
}
1234
Zach

Zach

38 日志
17 分类
47 标签
© 2017 Zach