Zach's Blog

A Brief Look at the libev Source Code

libev

libev is an efficient asynchronous I/O library built around an event-loop model. Users register the events they are interested in with libev, such as a file descriptor becoming readable, and when an event occurs, the callback supplied at registration time is invoked.

The events libev supports include:

  1. File descriptor events (readable, writable): ev_io
  2. The Linux inotify interface: ev_stat
  3. Signal events: ev_signal
  4. Timer events: ev_timer
  5. Periodic events: ev_periodic
  6. Child process state changes: ev_child
  7. Events of the event loop itself: ev_idle, ev_prepare, ev_check

An Example

Let's start with a very simple example, then use its execution flow to walk through the libev source.

#include <stdio.h>
#include <unistd.h>

#include <ev.h>

ev_io stdin_watcher;

static void stdin_cb(EV_P_ ev_io *w, int revents) {
    char buf[128];
    int n;
    if (revents & EV_READ) {
        n = read(w->fd, buf, sizeof(buf) - 1);
        if (n == 0) {
            fprintf(stderr, "End of File\n");
            ev_io_stop(EV_A_ w);
        } else if (n > 0) {
            buf[n] = '\0';   /* terminate before printing */
            printf("read: %s\n", buf);
        }
    }
}

int main(void) {
    struct ev_loop *loop = EV_DEFAULT;

    ev_io_init(&stdin_watcher, stdin_cb, 0, EV_READ);
    ev_io_start(loop, &stdin_watcher);

    ev_run(loop, 0);

    return 0;
}

In libev, every event is represented by a watcher, and watcher types follow the naming pattern ev_TYPE.

Every watcher has a corresponding initialization function ev_TYPE_init. Taking ev_io as an example, its initializer has the prototype:

void ev_io_init(ev_io *w, void (*cb)(EV_P_ ev_io *w, int revents), int fd, int events);

#define EV_P  struct ev_loop *loop
#define EV_P_ EV_P,

Note: in the libev implementation, ev_io_init is actually a macro, but thinking of it as a function makes little practical difference.

Internally, ev_io_init calls two functions:

void ev_init(ev_watcher *w, void (*cb)(EV_P_ ev_watcher *w, int revents));

void ev_io_set(ev_io *w, int fd, int events);
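
Put together, ev_io_init is simply a wrapper around those two calls. A rough sketch of the macros follows (simplified; the real definitions in ev.h add extra casts and version-specific flags):

/* simplified sketch of the macros in ev.h, not the exact source */
#define ev_init(ev, cb_) do {                       \
    ((ev_watcher *)(void *)(ev))->active  = 0;      \
    ((ev_watcher *)(void *)(ev))->pending = 0;      \
    ev_set_priority ((ev), 0);                      \
    ev_set_cb ((ev), cb_);                          \
  } while (0)

#define ev_io_set(ev, fd_, events_) do {            \
    (ev)->fd = (fd_); (ev)->events = (events_);     \
  } while (0)

#define ev_io_init(ev, cb, fd, events) do {         \
    ev_init ((ev), (cb));                           \
    ev_io_set ((ev), (fd), (events));               \
  } while (0)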

After initialization, ev_io_start is called to register the event with the loop, and finally ev_run starts the loop.

When standard input becomes readable, the callback stdin_cb registered on stdin_watcher is invoked. At that point we can read the data from standard input, and if we hit EOF we call ev_io_stop to stop watching this event.

Source Code Analysis

Data Structures

All the code below is taken from version 2.0.
Before diving into the source, let's first look at a few key data structures in libev.

EV_WATCHER

struct ev_watcher {
    int active;
    int pending;
    int priority;
    EV_COMMON                              /* void *data; */
    void (*cb)(EV_P_ struct ev_watcher *w, int revents);
};

ev_watcher acts as the "parent class" of all watchers: it contains the data common to every watcher. Take ev_io, whose structure looks like this:

struct ev_io {
    int active;
    int pending;
    int priority;
    EV_COMMON
    void (*cb)(EV_P_ struct ev_io *w, int revents);

    struct ev_watcher_list *next;

    int fd;
    int events;
};

As you can see, an ev_io pointer can be cast to an ev_watcher pointer, and the same works for every other watcher type, which is why ev_watcher effectively serves as the parent class of all watchers.
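
This "struct inheritance" is why libev's internal helpers can take a generic ev_watcher pointer and be handed any watcher type. A minimal illustration of the idea (illustration only, not libev source):

#include <ev.h>

/* illustration only: the common header lets any watcher be treated as an ev_watcher */
typedef ev_watcher *W;

static void mark_active (W w)
{
    w->active = 1;              /* touches only the shared header fields */
}

static void demo (ev_io *io, ev_timer *timer)
{
    mark_active ((W)io);        /* the same cast works for every watcher type */
    mark_active ((W)timer);
}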

ANFD

struct ANFD {
    WL head;
    unsigned char events;
    unsigned char reify;
};

typedef ev_watcher_list *WL;

An ANFD represents the events associated with one file descriptor. A file descriptor can have multiple watchers, organized as a linked list; head is the head of that list. The ev_loop structure contains an anfds array, where each element is the ANFD of the file descriptor equal to its array index.

ANPENDING

struct ANPENDING {
    W w;
    int events;
};

typedef ev_watcher *W;

An ANPENDING is one pending event. Inside ev_loop, pending events are organized as shown below:

pri_max |----|     |----|----|----|----|----|
        |  --|---> |    |    |    |    |    |
   .    |----|     |----|----|----|----|----|
   .    |    |          ANPENDINGs
   .    |----|
        |    |
        |----|
        |    |     pendings[w->priority][w->pending - 1] is the ANPENDING of watcher w
        |----|
        |    |
        |----|
        |    |
pri_min |----|

Every ANPENDING in ev_loop has a priority. Higher-priority events are handled first within one loop iteration, but lower-priority events are still guaranteed to run, just later.
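
The queuing side of this structure can be seen in ev_feed_event, which appends a watcher to the array for its priority and records the 1-based slot in w->pending. A simplified sketch (ABSPRI maps a watcher priority to a non-negative array index; the real code also grows the array with array_needsize):

/* simplified sketch of ev_feed_event, not the exact source */
void
ev_feed_event (EV_P_ void *w, int revents)
{
    W w_ = (W)w;

    if (w_->pending)
        /* already queued: just merge in the new event bits */
        pendings [ABSPRI (w_)][w_->pending - 1].events |= revents;
    else
    {
        w_->pending = ++pendingcnt [ABSPRI (w_)];
        pendings [ABSPRI (w_)][w_->pending - 1].w      = w_;
        pendings [ABSPRI (w_)][w_->pending - 1].events = revents;
    }
}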

EV_LOOP

struct ev_loop {
    ev_tstamp ev_rt_now;
    int activecnt;
    int loop_count;
    void (*backend_modify)(EV_P_ int fd, int oev, int nev);
    void (*backend_poll)(EV_P_ ev_tstamp timeout);
    int backend_fd;

    struct epoll_event *epoll_events; /* epoll for example */
    int epoll_eventmax;

    ANFD *anfds;
    int anfdmax;

    ANPENDING *pendings[NUMPRI];
    int pendingmax[NUMPRI];
    int pendingcnt[NUMPRI];

    int *fdchanges;
    int fdchangemax;
    int fdchangecnt;

    WT *timers;
    int timermax;
    int timercnt;

    WT *periodics;
    int periodicmax;
    int periodiccnt;

    ev_idle **idles[NUMPRI];
    int idlemax[NUMPRI];
    int idlecnt[NUMPRI];

    ev_prepare **prepares;
    int preparemax;
    int preparecnt;

    ev_check **checks;
    int checkmax;
    int checkcnt;

    /* ... */
};

ev_loop is clearly the most important structure in libev. Only some of its members are explained here; the rest will be covered when the corresponding source code is analyzed.

  • ev_rt_now: records the loop's notion of the current time. Timers in libev are based on real elapsed time: if you register a timeout event that fires in one hour and then set the system clock back to some time last year, the event will still fire after roughly one hour.
  • activecnt: watchers must keep the ev_loop alive so that their callbacks can run whenever an event occurs. To keep the loop alive, a watcher (via ev_start) calls ev_ref to increment activecnt; once activecnt drops to 0, the event loop stops after the current iteration.
  • loop_count: the number of iterations the event loop has performed.
  • backend_modify: the platform-specific interface through which ev_loop adds or modifies event subscriptions (a sketch of the epoll version follows this list). The backends libev supports are:
    • select
    • poll
    • epoll
    • kqueue
    • port
  • backend_poll: the interface through which ev_loop calls the platform-specific mechanism to wait for events.
  • backend_fd: taking epoll as the example, this is the file descriptor returned by epoll_create.
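
As a concrete example of a backend, the epoll implementation of backend_modify translates the old and new event masks into an epoll_ctl call, roughly like this (a simplified sketch of epoll_modify in ev_epoll.c; error handling and version-specific details omitted):

/* simplified sketch of the epoll backend_modify, not the exact source */
static void
epoll_modify (EV_P_ int fd, int oev, int nev)
{
    struct epoll_event ev;

    ev.data.u64 = fd;                    /* remember which fd this event belongs to */
    ev.events   = (nev & EV_READ  ? EPOLLIN  : 0)
                | (nev & EV_WRITE ? EPOLLOUT : 0);

    /* no old events -> ADD, no new events -> DEL, otherwise MOD */
    epoll_ctl (backend_fd,
               oev ? (nev ? EPOLL_CTL_MOD : EPOLL_CTL_DEL) : EPOLL_CTL_ADD,
               fd, &ev);
}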

ev_io

Let's first walk through the execution flow of ev_io.

ev_io_start

void noinline
ev_io_start (EV_P_ ev_io *w)
{
    int fd = w->fd;

    if (expect_false (ev_is_active (w)))
        return;

    assert (("ev_io_start called with negative fd", fd >= 0));

    ev_start (EV_A_ (W)w, 1);
    array_needsize (ANFD, anfds, anfdmax, fd + 1, anfds_init);
    wlist_add (&anfds[fd].head, (WL)w);

    fd_change (EV_A_ fd, w->events & EV_IOFDSET | 1);
    w->events &= ~EV_IOFDSET;
}

void inline_speed
ev_start (EV_P_ W w, int active)
{
    pri_adjust (EV_A_ w);
    w->active = active;
    ev_ref (EV_A);
}

void inline_size
wlist_add (WL *head, WL elem)
{
    elem->next = *head;
    *head = elem;
}

void inline_size
fd_change (EV_P_ int fd, int flags)
{
    unsigned char reify = anfds [fd].reify;
    anfds [fd].reify |= flags;

    if (expect_true (!reify))
    {
        ++fdchangecnt;
        array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);
        fdchanges [fdchangecnt - 1] = fd;
    }
}

The main steps are:

  1. Run ev_start: adjust the watcher's priority, set its active flag, and increment the loop's activecnt.
  2. Insert the ev_io into the watcher list of its file descriptor (see the snippet after this list for two watchers sharing one fd).
  3. Call fd_change, which bumps fdchangecnt and records the changed file descriptor so it can be processed during the event loop.
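
For example, two watchers can be started on the same descriptor; both end up on anfds[fd].head via wlist_add, while fd_change records the fd in fdchanges only once. A hypothetical snippet for illustration (read_cb, write_cb and sock_fd are made-up names):

/* hypothetical illustration: two watchers on the same socket fd */
ev_io read_watcher, write_watcher;

ev_io_init (&read_watcher,  read_cb,  sock_fd, EV_READ);
ev_io_init (&write_watcher, write_cb, sock_fd, EV_WRITE);

ev_io_start (loop, &read_watcher);   /* anfds[sock_fd].head -> read_watcher */
ev_io_start (loop, &write_watcher);  /* list grows: write_watcher -> read_watcher */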

ev_loop

In the latest versions the function corresponding to ev_loop is ev_run; since I'm reading version 2.0, I'll stick with ev_loop here.

ev_loop
void
ev_loop (EV_P_ int flags)
{
    loop_done = flags & (EVLOOP_ONESHOT | EVLOOP_NONBLOCK)
        ? EVUNLOOP_ONE
        : EVUNLOOP_CANCEL;

    call_pending (EV_A); /* in case we recurse, ensure ordering stays nice and clean */

    do
    {
#ifndef _WIN32
        if (expect_false (curpid)) /* penalise the forking check even more */
            if (expect_false (getpid () != curpid))
            {
                curpid = getpid ();
                postfork = 1;
            }
#endif

#if EV_FORK_ENABLE
        /* we might have forked, so queue fork handlers */
        if (expect_false (postfork))
            if (forkcnt)
            {
                queue_events (EV_A_ (W *)forks, forkcnt, EV_FORK);
                call_pending (EV_A);
            }
#endif

        /* queue prepare watchers (and execute them) */
        if (expect_false (preparecnt))
        {
            queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
            call_pending (EV_A);
        }

        if (expect_false (!activecnt)) /* A */
            break;

        /* we might have forked, so reify kernel state if necessary */
        if (expect_false (postfork))
            loop_fork (EV_A);

        /* update fd-related kernel structures */
        fd_reify (EV_A); /* B */

        /* calculate blocking time */
        {
            ev_tstamp block;

            if (expect_false (flags & EVLOOP_NONBLOCK || idleall || !activecnt))
                block = 0.; /* do not block at all */
            else
            {
                /* update time to cancel out callback processing overhead */
                time_update (EV_A_ 1e100);

                block = MAX_BLOCKTIME;

                if (timercnt)
                {
                    ev_tstamp to = ((WT)timers [0])->at - mn_now + backend_fudge;
                    if (block > to) block = to;
                }

#if EV_PERIODIC_ENABLE
                if (periodiccnt)
                {
                    ev_tstamp to = ((WT)periodics [0])->at - ev_rt_now + backend_fudge;
                    if (block > to) block = to;
                }
#endif

                if (expect_false (block < 0.)) block = 0.;
            }

            /* C */
            ++loop_count;
            backend_poll (EV_A_ block);

            /* update ev_rt_now, do magic */
            time_update (EV_A_ block);
        }

        /* queue pending timers and reschedule them */
        timers_reify (EV_A); /* relative timers called last */
#if EV_PERIODIC_ENABLE
        periodics_reify (EV_A); /* absolute timers called first */
#endif

#if EV_IDLE_ENABLE
        /* queue idle watchers unless other events are pending */
        idle_reify (EV_A);
#endif

        /* queue check watchers, to be executed first */
        if (expect_false (checkcnt))
            queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);

        call_pending (EV_A); /* E */
    }
    while (expect_true (activecnt && !loop_done));

    if (loop_done == EVUNLOOP_ONE)
        loop_done = EVUNLOOP_CANCEL;
}
fd_reify
void inline_size
fd_reify (EV_P)
{
    int i;

    for (i = 0; i < fdchangecnt; ++i)
    {
        int fd = fdchanges [i];
        ANFD *anfd = anfds + fd;
        ev_io *w;

        unsigned char events = 0;

        for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
            events |= (unsigned char)w->events;

#if EV_SELECT_IS_WINSOCKET
        if (events)
        {
            unsigned long argp;
            anfd->handle = _get_osfhandle (fd);
            assert (("libev only supports socket fds in this configuration", ioctlsocket (anfd->handle, FIONREAD, &argp) == 0));
        }
#endif

        {
            unsigned char o_events = anfd->events;
            unsigned char o_reify = anfd->reify;

            anfd->reify = 0;
            anfd->events = events;

            if (o_events != events || o_reify & EV_IOFDSET)
                backend_modify (EV_A_ fd, o_events, events);
        }
    }

    fdchangecnt = 0;
}
epoll_poll
static void
epoll_poll (EV_P_ ev_tstamp timeout)
{
    /* D */
    int i;
    int eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, (int)ceil (timeout * 1000.));

    if (expect_false (eventcnt < 0))
    {
        if (errno != EINTR)
            syserr ("(libev) epoll_wait");

        return;
    }

    for (i = 0; i < eventcnt; ++i)
    {
        struct epoll_event *ev = epoll_events + i;

        int fd = ev->data.u64;
        int got = (ev->events & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)
                | (ev->events & (EPOLLIN  | EPOLLERR | EPOLLHUP) ? EV_READ  : 0);
        int want = anfds [fd].events;

        if (expect_false (got & ~want))
        {
            /* we received an event but are not interested in it, try mod or del */
            ev->events = (want & EV_READ  ? EPOLLIN  : 0)
                       | (want & EV_WRITE ? EPOLLOUT : 0);

            epoll_ctl (backend_fd, want ? EPOLL_CTL_MOD : EPOLL_CTL_DEL, fd, ev);
        }

        fd_event (EV_A_ fd, got);
    }

    /* if the receive array was full, increase its size */
    if (expect_false (eventcnt == epoll_eventmax))
    {
        ev_free (epoll_events);
        epoll_eventmax = array_nextsize (sizeof (struct epoll_event), epoll_eventmax, epoll_eventmax + 1);
        epoll_events = (struct epoll_event *)ev_malloc (sizeof (struct epoll_event) * epoll_eventmax);
    }
}
call_pending
void inline_speed
call_pending (EV_P)
{
    int pri;

    for (pri = NUMPRI; pri--; )
        while (pendingcnt [pri])
        {
            ANPENDING *p = pendings [pri] + --pendingcnt [pri];

            if (expect_true (p->w))
            {
                /*assert (("non-pending watcher on pending list", p->w->pending));*/

                p->w->pending = 0;
                EV_CB_INVOKE (p->w, p->events);
            }
        }
}

The core logic lives inside the do {} while loop.

Let's skip the parts unrelated to ev_io for now and look only at the positions marked ([A-E]) in the code:

  • A: check whether activecnt is 0; if so, the loop breaks out of the event loop.
  • B: call fd_reify, which walks the fdchanges array; for every descriptor whose events changed (or that was newly added), it calls backend_modify to modify or add the descriptor's kernel-level event registration.
  • C: bump the loop's iteration count, then call backend_poll to wait for file descriptor events via the platform-specific interface.
  • D: taking epoll as the example, backend_poll is implemented by epoll_poll, which calls epoll_wait; the events that fired are stored in the epoll_events array. If an event arrives that we are not interested in, the descriptor's registration is modified or deleted. Then fd_event is called to add the descriptor's events to the loop's pendings lists (a sketch of fd_event follows this list). After fd_event, if eventcnt == epoll_eventmax, the epoll_events array is enlarged so that more events can be received next time.
  • E: call call_pending, which:
    1. iterates over the pendings array from highest to lowest priority;
    2. if pendingcnt[pri] is greater than 0, i.e. that priority has pending events, takes the elements of the corresponding ANPENDING list one by one;
    3. for each ANPENDING taken, invokes its watcher's callback via the EV_CB_INVOKE macro.
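
The fd_event call in step D walks the watcher list for that descriptor and queues every watcher whose registered events intersect the ones that fired, roughly like this (simplified from the source):

/* simplified sketch of fd_event: queue every interested watcher on this fd */
void inline_speed
fd_event (EV_P_ int fd, int revents)
{
    ANFD *anfd = anfds + fd;
    ev_io *w;

    for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next)
    {
        int ev = w->events & revents;

        if (ev)
            ev_feed_event (EV_A_ (W)w, ev);  /* ends up in pendings[pri] */
    }
}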

ev_io_stop

void noinline
ev_io_stop (EV_P_ ev_io *w)
{
    /* A */
    clear_pending (EV_A_ (W)w);
    if (expect_false (!ev_is_active (w)))
        return;

    assert (("ev_io_start called with illegal fd (must stay constant after start!)", w->fd >= 0 && w->fd < anfdmax));

    /* B */
    wlist_del (&anfds[w->fd].head, (WL)w);
    /* C */
    ev_stop (EV_A_ (W)w);

    /* D */
    fd_change (EV_A_ w->fd, 1);
}
clear_pending
void inline_speed
clear_pending (EV_P_ W w)
{
    if (w->pending)
    {
        pendings [ABSPRI (w)][w->pending - 1].w = 0;
        w->pending = 0;
    }
}
ev_stop
void inline_size
ev_stop (EV_P_ W w)
{
    ev_unref (EV_A);
    w->active = 0;
}
  • A: remove the watcher from the pendings list.
  • B: remove the watcher being stopped from the watcher list anfds[w->fd] of its file descriptor.
  • C: call ev_stop, which decrements the loop's activecnt (via ev_unref) and clears the watcher's active flag.
  • D: call fd_change to update the fdchanges array and fdchangecnt so that the descriptor's event registration is adjusted in the next loop iteration. If the descriptor no longer has any watched events, its epoll registration is removed inside epoll_poll:
if (expect_false (got & ~want))
{
    /* we received an event but are not interested in it, try mod or del */
    ev->events = (want & EV_READ  ? EPOLLIN  : 0)
               | (want & EV_WRITE ? EPOLLOUT : 0);

    epoll_ctl (backend_fd, want ? EPOLL_CTL_MOD : EPOLL_CTL_DEL, fd, ev);
}

ev_timer & ev_periodic

ev_timerev_periodic都可以用来设置超时和周期事件,不同的是,ev_periodic可以设置一个回调函数,在每一次周期完成后这个回调函数被调用并返回一个时间节点,该节点是下一次事件被触发的时间。

The ev_loop structure has two relevant members:

  • timers
  • periodics

timerperiodic的结构分别为:

timer
struct ev_timer {
    EV_WATCHER(ev_timer)
    ev_tstamp at;
    ev_tstamp repeat;   /* re-arm after this many seconds */
};
periodic
struct ev_periodic {
    EV_WATCHER(ev_periodic)
    ev_tstamp offset;
    ev_tstamp interval;
    ev_tstamp (*reschedule_cb)(struct ev_periodic *w, ev_tstamp now);
};

They store all the ev_timer and ev_periodic watchers registered with the ev_loop, respectively. Both are organized as min-heaps, with the timer event closest to now at the top. In every loop iteration, before backend_poll is called, the two heap tops are examined and the earlier of the two determines the timeout for this backend_poll call. After backend_poll returns, timers_reify and periodics_reify re-adjust the heaps and add the timeout and periodic events that have fired to pendings.

if (expect_false (flags & EVLOOP_NONBLOCK || idleall || !activecnt))
    block = 0.; /* do not block at all */
else
{
    /* update time to cancel out callback processing overhead */
    time_update (EV_A_ 1e100);

    block = MAX_BLOCKTIME;

    if (timercnt)
    {
        ev_tstamp to = ((WT)timers [0])->at - mn_now + backend_fudge;
        if (block > to) block = to;
    }

#if EV_PERIODIC_ENABLE
    if (periodiccnt)
    {
        ev_tstamp to = ((WT)periodics [0])->at - ev_rt_now + backend_fudge;
        if (block > to) block = to;
    }
#endif

    if (expect_false (block < 0.)) block = 0.;
}

++loop_count;
backend_poll (EV_A_ block);

/* update ev_rt_now, do magic */
time_update (EV_A_ block);

/* queue pending timers and reschedule them */
timers_reify (EV_A); /* relative timers called last */
#if EV_PERIODIC_ENABLE
periodics_reify (EV_A); /* absolute timers called first */
#endif
  • time_update: mainly used to update the loop's notion of the current time.
  • backend_fudge: a backend-specific fudge value that compensates for timing inaccuracy.

time_reifyperiodic_reify函数如下:

void inline_size
timers_reify (EV_P)
{
    while (timercnt && ((WT)timers [0])->at <= mn_now)
    {
        ev_timer *w = (ev_timer *)timers [0];

        /*assert (("inactive timer on timer heap detected", ev_is_active (w)));*/

        /* first reschedule or stop timer */
        if (w->repeat)
        {
            assert (("negative ev_timer repeat value found while processing timers", w->repeat > 0.));

            ((WT)w)->at += w->repeat;
            if (((WT)w)->at < mn_now)
                ((WT)w)->at = mn_now;

            downheap (timers, timercnt, 0);
        }
        else
            ev_timer_stop (EV_A_ w); /* nonrepeating: stop timer */

        ev_feed_event (EV_A_ (W)w, EV_TIMEOUT);
    }
}

void inline_size
periodics_reify (EV_P)
{
    /* A */
    while (periodiccnt && ((WT)periodics [0])->at <= ev_rt_now)
    {
        ev_periodic *w = (ev_periodic *)periodics [0];

        /*assert (("inactive timer on periodic heap detected", ev_is_active (w)));*/

        /* first reschedule or stop timer */
        /* B */
        if (w->reschedule_cb)
        {
            ((WT)w)->at = w->reschedule_cb (w, ev_rt_now + TIME_EPSILON);
            assert (("ev_periodic reschedule callback returned time in the past", ((WT)w)->at > ev_rt_now));
            downheap (periodics, periodiccnt, 0);
        }
        else if (w->interval)
        {
            ((WT)w)->at = w->offset + ceil ((ev_rt_now - w->offset) / w->interval) * w->interval;
            if (((WT)w)->at - ev_rt_now <= TIME_EPSILON) ((WT)w)->at += w->interval;
            assert (("ev_periodic timeout in the past detected while processing timers, negative interval?", ((WT)w)->at > ev_rt_now));
            downheap (periodics, periodiccnt, 0);
        }
        else
            ev_periodic_stop (EV_A_ w); /* nonrepeating: stop timer */

        ev_feed_event (EV_A_ (W)w, EV_PERIODIC);
    }
}

Taking periodics_reify as the example (timers_reify works similarly):

  1. If the time at the top of the heap is no later than the current time, take the heap top; otherwise the function returns.
  2. If it is a recurring event, i.e. reschedule_cb is non-NULL or interval is non-zero, compute the time of the next trigger and re-adjust the heap (a simplified downheap sketch follows this list); otherwise stop the watcher. For the interval case the next trigger is offset + ceil((ev_rt_now - offset) / interval) * interval; for example, with offset = 0, interval = 3600 and ev_rt_now = 10000, the next trigger is ceil(10000 / 3600) * 3600 = 10800, i.e. the next full hour.
  3. Add the event that just fired to pendings, where it waits for call_pending to invoke its callback.
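
Both reify functions restore the heap property with downheap after changing the root's ->at (WT is libev's pointer type for watchers that carry a timestamp). A simplified min-heap sift-down keyed on ->at; this is a sketch, not the exact libev code, which also maintains each watcher's heap index:

/* simplified sketch: sift the element at index k down until the heap property holds */
void inline_size
downheap (WT *heap, int N, int k)
{
    WT w = heap [k];

    for (;;)
    {
        int c = (k << 1) + 1;              /* left child */

        if (c >= N)
            break;

        /* pick the smaller of the two children */
        if (c + 1 < N && heap [c + 1]->at < heap [c]->at)
            ++c;

        if (w->at <= heap [c]->at)
            break;                          /* heap property restored */

        heap [k] = heap [c];
        k = c;
    }

    heap [k] = w;
}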

ev_signal

libev also supports signal events. When a signal is delivered, the callback is not invoked immediately the way a plain UNIX signal handler would be; instead it is handled in the next iteration of the event loop.

Let's first look at how signal events are organized in libev.

     |--------|     |----|----|----|----|----|
     | head   |---> |    |    |    |    |    |
.    |        |     |----|----|----|----|----|
.    | gotsig |			EV_WATCHERS
.    |--------|    
     |        |
     |        |
     |        |	
     |--------|
     |        |
     |        |
     |        |
     |--------|

The relevant data structure:

struct ANSIG {
    WL head;
    sig_atomic_t volatile gotsig;
};

When a signal fires, the callbacks of all the watchers on that signal's WL list are executed in turn.

libev implements asynchronous signal handling with a pipe. After a loop has been initialized by loop_init, siginit is called:

static void noinline
siginit (EV_P)
{
    fd_intern (sigpipe [0]);
    fd_intern (sigpipe [1]);

    ev_io_set (&sigev, sigpipe [0], EV_READ);
    ev_io_start (EV_A_ &sigev);
    ev_unref (EV_A); /* child watcher should not keep loop alive */
}

As you can see, siginit registers an ev_io with the loop to watch the read end of the pipe, sigpipe[0], for readability.

When registering a signal event, the user calls ev_signal_init to set the signal's callback, the signal number to watch, and so on, and then calls ev_signal_start:

...
ev_start (EV_A_ (W)w, 1);
wlist_add (&signals [w->signum - 1].head, (WL)w);

if (!((WL)w)->next)
{
#if _WIN32
    signal (w->signum, sighandler);
#else
    struct sigaction sa;
    sa.sa_handler = sighandler;
    sigfillset (&sa.sa_mask);
    sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
    sigaction (w->signum, &sa, 0);
#endif
}
...

First the watcher is added to the list for its signal. If it is the only entry in that list (its next pointer is NULL), the signal's handler has not yet been registered with the kernel, so a sigaction is set up to install it. sighandler looks like this:

static void
sighandler (int signum)
{
#if _WIN32
    signal (signum, sighandler);
#endif

    signals [signum - 1].gotsig = 1;

    if (!gotsig)
    {
        int old_errno = errno;
        gotsig = 1;
        write (sigpipe [1], &signum, 1);
        errno = old_errno;
    }
}

As you can see, when a signal arrives, libev sets that signal's gotsig flag to 1 and writes the signal number into the pipe. This wakes up the pipe-reading watcher sigev registered earlier, and its callback is invoked in the next loop iteration:

static void
sigcb (EV_P_ ev_io *iow, int revents)
{
    int signum;

    read (sigpipe [0], &revents, 1);
    gotsig = 0;

    for (signum = signalmax; signum--; )
        if (signals [signum].gotsig)
            ev_feed_signal_event (EV_A_ signum + 1);
}

After that, all the signal events are added to pendings, and when call_pending runs, the callbacks for those signal events get executed.
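
From application code, using a signal watcher looks just like using any other watcher (illustration only; sigint_cb is a made-up name, and ev_unloop/EVUNLOOP_ALL follow the old API used throughout this post):

/* illustration only: handle SIGINT through the event loop instead of inside the handler */
static ev_signal sigint_watcher;

static void
sigint_cb (EV_P_ ev_signal *w, int revents)
{
    ev_unloop (EV_A_ EVUNLOOP_ALL);   /* runs in the next loop iteration */
}

/* somewhere during setup: */
ev_signal_init (&sigint_watcher, sigint_cb, SIGINT);
ev_signal_start (loop, &sigint_watcher);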

ev_prepare & ev_check

ev_prepareev_checkev_loop事件循环自身的事件。ev_prepareev_loop收集事件前被调用;ev_check在收集完事件后被调用。他们都能唤醒和休眠任意个监视器,以实现一些特定的事件循环行为。

ev_prepareev_check的结构为:

struct ev_TYPE {
    EV_WATCHER(ev_TYPE)
};
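
A typical use is integrating another library's polling into the loop: the prepare callback runs right before libev blocks, and the check callback runs right after it wakes up. A short sketch (illustration only; prepare_cb and check_cb are made-up names):

/* illustration only */
static ev_prepare prepare_watcher;
static ev_check   check_watcher;

static void
prepare_cb (EV_P_ ev_prepare *w, int revents)
{
    /* e.g. flush buffers or register an embedded library's fds before polling */
}

static void
check_cb (EV_P_ ev_check *w, int revents)
{
    /* e.g. let the embedded library process whatever became ready */
}

/* somewhere during setup: */
ev_prepare_init (&prepare_watcher, prepare_cb);
ev_prepare_start (loop, &prepare_watcher);
ev_check_init (&check_watcher, check_cb);
ev_check_start (loop, &check_watcher);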

ev_stat

I haven't looked at the ev_stat interfaces yet, since I'm not very familiar with inotify or kqueue (*runs away*).