Zach的博客

epoll入门

epoll

epoll是Linux内核的可扩展I/O事件通知机制,旨在替代POSIX的select/poll函数,让需要监视大量文件描述符的程序拥有更佳的性能。

epoll接口

1
2
3
4
5
6
7
#include <sys/epoll.h>

/* Create an epoll instance; the return value is a descriptor that must be closed after use. */
int epoll_create(int size);

/* Add, modify or delete (selected by op) the registration of fd on the epoll instance epollfd. */
int epoll_ctl(int epollfd, int op, int fd, struct epoll_event *ev);

/* Wait for events; at most maxevents of them are stored in events. A timeout of -1 blocks indefinitely. */
int epoll_wait(int epollfd, struct epoll_event *events, int maxevents, int timeout);
  • epoll_create创建一个epoll的句柄,参数size用于提示内核这个epoll需要监听的描述符个数(自Linux 2.6.8起该参数被忽略,但必须大于0)。函数返回一个描述符,在使用完epoll以后,我们需要手动关闭这个描述符,否则可能导致描述符耗尽。
  • epoll_ctl用来操纵epoll所监听的事件,参数op表示这一次操作,其值:

    • EPOLL_CTL_ADD:注册新的事件到epoll中
    • EPOLL_CTL_DEL:从epoll中删除一个事件
    • EPOLL_CTL_MOD:修改之前注册的一个事件

    epoll_event是对应的描述符的事件的数据结构,其结构如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /* User data attached to a registration. It is a union, so the members share storage. */
    typedef union epoll_data {
    void *ptr;
    int fd; /* the example programs in this article store the monitored descriptor here */
    uint32_t u32;
    uint64_t u64;
    } epoll_data_t;

    /* One event record: a bit mask of EPOLL* flags plus the user data above. */
    struct epoll_event {
    uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };

    epoll_event中events是以下几个值的按位或的集合:

    • EPOLLIN:对应的描述符可读
    • EPOLLOUT:对应的描述符可写
    • EPOLLERR:对应的描述符发生错误
    • EPOLLRDHUP:TCP套接字对端被关闭或者用shutdown函数关闭了写半部。
    • EPOLLPRI:有紧急数据可读
    • EPOLLHUP:对应的文件描述符被挂断
    • EPOLLET:将EPOLL设为边缘触发
    • EPOLLONESHOT:只监听一次事件,当事件发生之后如果还要监听则需要再次把事件注册入队列。

    epoll的工作模式有两种:

    1. ET(edge trigger)模式:只有当描述符的状态发生变化(例如有新数据到达)时,epoll_wait才会把此事件通知应用程序。应用程序必须立即把事件处理完(例如一直读/写到返回EAGAIN),如果不处理,下次调用epoll_wait时,不会再次向应用程序通知此事件,直到该描述符上再次发生新的事件。
    2. LT(level trigger)模式:当epoll_wait检测到描述符事件发生并通知此事件时,应用程序不需要立即处理,下次调用epoll_wait时,会再次通知此事件。

    ET模式很大程度上减少了epoll事件被重复触发的次数,因此效率较LT模式高。epoll工作在ET模式时必须使用非阻塞接口,以避免一个阻塞读/写操作把处理多个文件描述符的任务饿死。

  • epoll_wait等待事件的发生。maxevents参数告诉内核这次返回的事件最多有多少个(必须大于0),返回的事件存放在events参数对应的数组中,timeout指定超时时间(单位为毫秒),若为-1则永久阻塞,若为0则立即返回。

一个echo server/client的例子

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <strings.h>

#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <arpa/inet.h>

/* Address that main() passes to socket_bind(). */
#define IPADDRESS "127.0.0.1"
/* TCP port that main() passes to socket_bind(). */
#define PORT 8080
/* Size in bytes of the I/O buffer allocated in do_epoll(). */
#define MAXSIZE 1024
/* Backlog argument passed to listen(). */
#define LISTENQ 20
/* Capacity of the event array handed to epoll_wait(). */
#define EPOLLEVENTS 100
/* Size argument passed to epoll_create(). */
#define FDSIZE 1024

/* Create a TCP socket and bind it; returns the socket descriptor. */
static int socket_bind(const char* ip, int port);
/* Run the epoll event loop for the listening socket. */
static void do_epoll(int listenfd);
/* Dispatch the first num entries of events to accept/read/write handlers. */
static void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
/* Accept one connection on listenfd and register it for EPOLLIN. */
static void handle_accept(int epollfd, int listenfd);
/* Read from fd into buf, then switch fd to EPOLLOUT (or drop it on EOF/error). */
static void do_read(int epollfd, int fd, char *buf);
/* Write buf back to fd, then switch fd to EPOLLIN (or drop it on error). */
static void do_write(int epollfd, int fd, char *buf);
/* Thin wrappers around epoll_ctl() with EPOLL_CTL_ADD / _MOD / _DEL. */
static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);

/*
 * Server entry point: bind the listening socket, put it into the listening
 * state and run the epoll loop on it.
 *
 * Exits with -1 if listen() fails; otherwise control stays in do_epoll().
 */
int main(void) {
    int listenfd = socket_bind(IPADDRESS, PORT);

    /* Without this check a failed listen() would leave the loop waiting on a dead socket. */
    if (listen(listenfd, LISTENQ) < 0) {
        perror("listen error");
        close(listenfd);
        exit(-1);
    }

    do_epoll(listenfd);
    return 0;
}

/*
 * Create a TCP/IPv4 socket and bind it to ip:port.
 *
 * ip    IPv4 address in dotted-decimal text form; NULL binds to INADDR_ANY
 * port  port number in host byte order
 *
 * Returns the bound socket descriptor (listen() has not been called yet).
 * Every failure is reported on stderr and ends the process with exit(-1).
 */
static int socket_bind(const char *ip, int port) {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket create error");
        exit(-1);
    }

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);

    /* Honour the caller's address instead of always binding to every interface. */
    if (ip == NULL) {
        server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    } else if (inet_pton(AF_INET, ip, &server_addr.sin_addr) != 1) {
        /* A malformed address makes inet_pton() return 0 without setting errno,
         * so perror() would print a misleading message here. */
        fprintf(stderr, "inet_pton error: invalid IPv4 address: %s\n", ip);
        close(listenfd);
        exit(-1);
    }

    if (bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("bind socket error");
        close(listenfd);
        exit(-1);
    }
    return listenfd;
}

/*
 * Run the server's event loop.
 *
 * Creates an epoll instance, registers listenfd for EPOLLIN and then hands
 * every batch of ready events to handle_events().  One zero-filled buffer of
 * MAXSIZE bytes is created here and passed to every handler call.
 *
 * listenfd  socket that is already bound and listening
 *
 * Does not return: the loop only ends on a fatal epoll_wait() error, after
 * which the epoll descriptor is closed and the process exits with -1.
 */
static void do_epoll(int listenfd) {
    struct epoll_event events[EPOLLEVENTS];
    char buf[MAXSIZE];
    memset(buf, 0, sizeof(buf));

    int epollfd = epoll_create(FDSIZE);
    if (epollfd < 0) {
        perror("create epoll error");
        exit(-1);
    }
    add_event(epollfd, listenfd, EPOLLIN);

    for (;;) {
        int ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
        if (ret < 0) {
            /* A signal interrupting the wait is not a failure; just wait again. */
            if (errno == EINTR)
                continue;
            perror("epoll wait error");
            break;
        }
        handle_events(epollfd, events, ret, listenfd, buf);
    }

    /* The epoll descriptor must be closed explicitly once it is no longer used. */
    close(epollfd);
    exit(-1);
}

/*
 * Dispatch one batch of ready events.
 *
 * epollfd   epoll instance the events came from
 * events    array filled by epoll_wait()
 * num       number of valid entries in events
 * listenfd  listening socket; EPOLLIN on it means a connection is pending
 * buf       buffer passed through to do_read() / do_write()
 *
 * For each entry exactly one handler runs: accept for a readable listening
 * socket, otherwise read for EPOLLIN, otherwise write for EPOLLOUT.
 */
static void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf) {
    int idx = 0;

    while (idx < num) {
        const struct epoll_event *cur = &events[idx++];
        const int ready_fd = cur->data.fd;
        const int readable = (cur->events & EPOLLIN) != 0;

        if (readable && ready_fd == listenfd) {
            handle_accept(epollfd, listenfd);
            continue;
        }
        if (readable) {
            do_read(epollfd, ready_fd, buf);
            continue;
        }
        if (cur->events & EPOLLOUT) {
            do_write(epollfd, ready_fd, buf);
        }
    }
}

/*
 * Accept one pending connection on listenfd and register the new client
 * socket on epollfd for EPOLLIN.
 *
 * A failed accept() is reported with perror() and otherwise ignored, so the
 * event loop keeps running.
 */
static void handle_accept(int epollfd, int listenfd) {
    struct sockaddr_in cli_addr;
    /* accept() takes a socklen_t that holds the size of cli_addr on entry;
     * an int initialised to 0 is both the wrong type and the wrong value. */
    socklen_t cli_size = sizeof(cli_addr);

    int clifd = accept(listenfd, (struct sockaddr *)&cli_addr, &cli_size);
    if (clifd < 0) {
        perror("accept error");
        return;
    }
    add_event(epollfd, clifd, EPOLLIN);
}

/*
 * Read one chunk from a client socket into buf.
 *
 * epollfd  epoll instance fd is registered on
 * fd       client socket reported readable
 * buf      buffer of MAXSIZE bytes; holds a NUL-terminated string on success
 *
 * On data: prints it and switches fd to EPOLLOUT so it gets echoed back.
 * On EOF or error: removes fd from epollfd and closes it.
 */
static void do_read(int epollfd, int fd, char *buf) {
    /* Read at most MAXSIZE - 1 bytes: buf is later used with "%s" and strlen(),
     * so there must always be room for the terminator. */
    ssize_t nread = read(fd, buf, MAXSIZE - 1);

    if (nread <= 0) {
        if (nread < 0)
            perror("read error");
        else
            fprintf(stderr, "client close.\n");
        /* Deregister while the descriptor is still open; epoll_ctl() cannot
         * operate on a descriptor that has already been closed. */
        delete_event(epollfd, fd, EPOLLIN);
        close(fd);
        return;
    }

    buf[nread] = '\0';
    printf("read message: %s", buf);
    modify_event(epollfd, fd, EPOLLOUT);
}

/*
 * Echo the NUL-terminated string in buf back to a client socket.
 *
 * epollfd  epoll instance fd is registered on
 * fd       client socket reported writable
 * buf      buffer of MAXSIZE bytes; cleared before returning
 *
 * On success fd is switched back to EPOLLIN; on a write error fd is removed
 * from epollfd and closed.
 */
static void do_write(int epollfd, int fd, char *buf) {
    const size_t len = strlen(buf);
    size_t off = 0;
    int failed = 0;

    /* write() may accept fewer bytes than requested, so keep going until the
     * whole message is out. */
    while (off < len) {
        ssize_t n = write(fd, buf + off, len - off);
        if (n < 0) {
            if (errno == EINTR)
                continue; /* interrupted before anything was written: retry */
            perror("write error");
            failed = 1;
            break;
        }
        if (n == 0)
            break; /* no progress; give up rather than spin */
        off += (size_t)n;
    }

    if (failed) {
        /* Deregister before closing: epoll_ctl() needs an open descriptor. */
        delete_event(epollfd, fd, EPOLLOUT);
        close(fd);
    } else {
        modify_event(epollfd, fd, EPOLLIN);
    }
    memset(buf, 0, MAXSIZE);
}

/*
 * Register fd on epollfd for the events in state (EPOLL_CTL_ADD).
 *
 * The descriptor itself is stored as the event's user data, which is how the
 * dispatcher later learns which descriptor became ready.  A failing
 * epoll_ctl() is reported with perror(); the function still returns normally.
 */
static void add_event(int epollfd, int fd, int state) {
    struct epoll_event ev = {0};

    ev.events = (uint32_t)state;
    ev.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0)
        perror("epoll_ctl add error");
}

/*
 * Remove fd from the set watched by epollfd (EPOLL_CTL_DEL).
 *
 * An event record built from fd and state is still passed to epoll_ctl().
 * The result of epoll_ctl() is not inspected.
 */
static void delete_event(int epollfd, int fd, int state) {
    struct epoll_event ev_arg = { .events = state, .data = { .fd = fd } };

    (void)epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev_arg);
}

/*
 * Replace the event mask of an already registered fd with state
 * (EPOLL_CTL_MOD).
 *
 * A failing epoll_ctl() is reported with perror(); the function still
 * returns normally.
 */
static void modify_event(int epollfd, int fd, int state) {
    struct epoll_event ev = {0};

    ev.data.fd = fd;
    ev.events = (uint32_t)state;
    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) < 0)
        perror("epoll_ctl mod error");
}

客户端

客户端代码我们也用epoll来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <strings.h>

#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>

/* Server address that main() converts with inet_pton() and connects to. */
#define IPADDRESS "127.0.0.1"
/* Server TCP port used by main(). */
#define PORT 8080
/* Size in bytes of the I/O buffer allocated in handle_connection(). */
#define MAXSIZE 1024
/* Not referenced by the client code in this listing. */
#define LISTENQ 20
/* Capacity of the event array handed to epoll_wait(). */
#define EPOLLEVENTS 100
/* Size argument passed to epoll_create(). */
#define FDSIZE 1024

/* Run the client's epoll loop over stdin, stdout and the connected socket. */
static void handle_connection(int sockfd);
/* Dispatch the first num entries of events to do_read() / do_write(). */
static void handle_events(int pollfd, struct epoll_event *events, int num, int sockfd, char *buf);
/* Read from fd (stdin or the socket) into buf and arm the matching writer. */
static void do_read(int epollfd, int fd, int sockfd, char *buf);
/* Write buf to fd (the socket or stdout) and re-arm or remove fd. */
static void do_write(int epollfd, int fd, int sockfd, char *buf);
/* Thin wrappers around epoll_ctl() with EPOLL_CTL_ADD / _MOD / _DEL. */
static void add_event(int epollfd, int fd, int state);
static void modify_event(int epollfd, int fd, int state);
static void delete_event(int epollfd, int fd, int state);

/* Not referenced by any code in this listing. */
int conn_flag;

/*
 * Client entry point: connect to IPADDRESS:PORT over TCP and run the
 * interactive epoll loop on the connection.
 *
 * Exits with -1 if the socket cannot be created, the address cannot be
 * converted or the connection cannot be established.
 */
int main(void) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket create error");
        exit(-1);
    }

    struct sockaddr_in serv_addr;
    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, IPADDRESS, &serv_addr.sin_addr) != 1) {
        perror("inet_pton error");
        exit(-1);
    }

    /* Without this check the event loop would run on an unconnected socket. */
    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("connect error");
        close(sockfd);
        exit(-1);
    }

    handle_connection(sockfd);
    close(sockfd);
    return 0;
}

/*
 * Run the client's event loop.
 *
 * Creates an epoll instance, registers stdin for EPOLLIN and dispatches
 * every batch of ready events to handle_events(), which is given sockfd and
 * one buffer of MAXSIZE bytes.
 *
 * sockfd  socket already connected to the server
 *
 * Returns only after a fatal epoll_wait() error; the epoll descriptor is
 * closed before returning.
 */
static void handle_connection(int sockfd) {
    struct epoll_event events[EPOLLEVENTS];
    /* Zero-filled because do_write() measures the buffer with strlen(). */
    char buf[MAXSIZE] = {0};

    int epollfd = epoll_create(FDSIZE);
    if (epollfd < 0) {
        perror("create epoll error");
        exit(-1);
    }
    add_event(epollfd, STDIN_FILENO, EPOLLIN);

    for (;;) {
        int ret = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
        if (ret < 0) {
            /* A signal interrupting the wait is not a failure; just wait again. */
            if (errno == EINTR)
                continue;
            /* Any other error would repeat on every iteration, so stop here. */
            perror("epoll wait error");
            break;
        }
        handle_events(epollfd, events, ret, sockfd, buf);
    }

    close(epollfd);
}

/*
 * Dispatch one batch of ready events on the client side.
 *
 * epollfd  epoll instance the events came from
 * events   array filled by epoll_wait()
 * num      number of valid entries in events
 * sockfd   socket connected to the server, forwarded to the handlers
 * buf      buffer forwarded to the handlers
 *
 * Each entry triggers at most one handler: do_read() when EPOLLIN is set,
 * otherwise do_write() when EPOLLOUT is set.
 */
static void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf) {
    int remaining = num;
    const struct epoll_event *cur = events;

    for (; remaining > 0; remaining--, cur++) {
        const uint32_t mask = cur->events;

        if (mask & EPOLLIN) {
            do_read(epollfd, cur->data.fd, sockfd, buf);
        } else if (mask & EPOLLOUT) {
            do_write(epollfd, cur->data.fd, sockfd, buf);
        }
    }
}

/*
 * Read one chunk from fd (stdin or the server socket) into buf.
 *
 * epollfd  epoll instance in use
 * fd       descriptor reported readable
 * sockfd   socket connected to the server
 * buf      buffer of MAXSIZE bytes; holds a NUL-terminated string on success
 *
 * Data from stdin arms sockfd for EPOLLOUT so the text is sent to the server.
 * Data from the socket removes sockfd from the set and arms stdout for
 * EPOLLOUT so the reply is printed.
 * A read error or end-of-file (on either descriptor) ends the process with
 * exit(-1).
 */
static void do_read(int epollfd, int fd, int sockfd, char *buf) {
    /* Read at most MAXSIZE - 1 bytes: do_write() uses strlen(buf), so there
     * must always be room for the terminator. */
    ssize_t nread = read(fd, buf, MAXSIZE - 1);

    if (nread < 0) {
        perror("read error");
        exit(-1);
    }
    if (nread == 0) {
        fprintf(stderr, "server close.\nbye\n");
        exit(-1);
    }

    buf[nread] = '\0';
    if (fd == STDIN_FILENO) {
        add_event(epollfd, sockfd, EPOLLOUT);
    } else {
        delete_event(epollfd, sockfd, EPOLLIN);
        add_event(epollfd, STDOUT_FILENO, EPOLLOUT);
    }
}

/*
 * Write the NUL-terminated string in buf to fd (the server socket or stdout).
 *
 * epollfd  epoll instance in use
 * fd       descriptor reported writable
 * sockfd   socket connected to the server (not used here)
 * buf      buffer of MAXSIZE bytes; cleared before returning
 *
 * After writing to stdout, stdout is removed from the epoll set; after
 * writing to any other descriptor, that descriptor is switched to EPOLLIN.
 * A write error ends the process with exit(-1).
 */
static void do_write(int epollfd, int fd, int sockfd, char *buf) {
    const size_t len = strlen(buf);
    size_t off = 0;

    (void)sockfd;

    /* write() may accept fewer bytes than requested, so keep going until the
     * whole message is out. */
    while (off < len) {
        ssize_t n = write(fd, buf + off, len - off);
        if (n < 0) {
            if (errno == EINTR)
                continue; /* interrupted before anything was written: retry */
            perror("write error");
            exit(-1);
        }
        if (n == 0)
            break; /* no progress; give up rather than spin */
        off += (size_t)n;
    }

    if (fd == STDOUT_FILENO) {
        delete_event(epollfd, fd, EPOLLOUT);
    } else {
        modify_event(epollfd, fd, EPOLLIN);
    }
    memset(buf, 0, MAXSIZE);
}

/*
 * Register fd on epollfd for the events in state (EPOLL_CTL_ADD).
 *
 * The descriptor is stored as the event's user data so the dispatcher can
 * tell which descriptor became ready.  A failing epoll_ctl() is reported
 * with perror(); the function still returns normally.
 */
static void add_event(int epollfd, int fd, int state) {
    struct epoll_event ev = {0};

    ev.events = (uint32_t)state;
    ev.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0)
        perror("epoll_ctl add error");
}

/*
 * Stop watching fd on epollfd (EPOLL_CTL_DEL).
 *
 * The event record handed to epoll_ctl() carries fd and state; the outcome
 * of the call is not examined.
 */
static void delete_event(int epollfd, int fd, int state) {
    struct epoll_event record = { .data = { .fd = fd }, .events = state };
    struct epoll_event *arg = &record;

    (void)epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, arg);
}

/*
 * Replace the event mask of an already registered fd with state
 * (EPOLL_CTL_MOD).
 *
 * A failing epoll_ctl() is reported with perror(); the function still
 * returns normally.
 */
static void modify_event(int epollfd, int fd, int state) {
    struct epoll_event ev = {0};

    ev.data.fd = fd;
    ev.events = (uint32_t)state;
    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) < 0)
        perror("epoll_ctl mod error");
}

See Also

文章内容和代码学习自IO多路复用之epoll总结

epoll manual