Redis 源码研究(一) - AE 事件库概述
简介
AE 事件库是 Redis 用来处理 IO 事件的库,根据系统的不同选用尽可能高效率的 IO 实现,在 ae.c
中使用条件编译根据系统支持的多路复用器(multiplexing)来选用相应的文件,其按优先级排序为:
- evport -
ae_evport.c
- epoll -
ae_epoll.c
- kqueue -
ae_kqueue.c
- select -
ae_select.c
在各个源文件中实现了 ae 库基本操作的统一接口,使得实际的实现代码得以相同。
在本次分享中,我准备先对 ae.h
进行阅读,也即大概了解 ae 事件库完成后所提供的接口,然后对 ae_epoll.c
的 api 实现进行阅读。
在文末我也简单总结了我个人从这两个文件所看出的事件库的设计。
ae.h
这里将通过直接附上原始代码并于代码内进行备注的形式进行阅读和记录,并对源文件中包含的英文注释进行翻译,一些作用显然的内容不再进行注释解释。
从 ae.h 所提供的接口来推测事件库所提供的大致功能为注册并管理事件以及执行事件循环,目前来看还比较模糊,具体的实现可能需要详细深入 ae.c
中的实现。
#ifndef __AE_H__
#define __AE_H__
#include <time.h>
#define AE_OK 0
#define AE_ERR -1
#define AE_NONE 0 /* 无事件注册 */
#define AE_READABLE 1 /* 当描述符可读时触发 */
#define AE_WRITABLE 2 /* 当描述符可写时触发 */
#define AE_BARRIER 4 /* 如果在同一次事件循环中可读事件已经触发,则可写事件将永远不会触发;当你需要在发送回复前想要将一些内容持久化地写入磁盘时会很有用 */
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_CALL_AFTER_SLEEP 8
#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1
/* Macros */
#define AE_NOTUSED(V) ((void) V)
struct aeEventLoop;
/* 类型定义与数据解构 */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
/* 文件事件 */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
/* 时间事件 */
typedef struct aeTimeEvent {
long long id; /* 时间事件标识符(id) */
long when_sec; /* 秒 */
long when_ms; /* 毫秒 */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
// 结构中有 prev 和 next 指针,如此看来应该是一个双向链表的结构
/* 一个已经触发的事件 */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
/* 一个事件驱动的程序状态 */
typedef struct aeEventLoop {
int maxfd; /* 当前已注册的最大描述符 */
int setsize; /* 追踪的文件描述符数量最大值 */
long long timeEventNextId;
time_t lastTime; /* 用于检测系统时钟周期(skew) */
aeFileEvent *events; /* 已注册的事件 */
aeFiredEvent *fired; /* 已触发的事件 */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* 用于处理 polling API 的数据 */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
/* 函数原型 */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
#endif
ae_epoll.c
redis 中会根据系统支持的多路复用驱动选择更优的方案,这里先对 epoll
封装的 API 进行阅读,主要为了摸清楚 ae 事件库所封装的一层操作。
#include <sys/epoll.h>
// 定义状态结构体 aeApiState,从名称来看是控制 aeApi 的状态的
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
// 创建控制结构
static int aeApiCreate(aeEventLoop *eventLoop) {
// 先分配 aeApiState 和 epoll_event 的空间,后者分配数量取决于参数传进来的 setsize(允许最大描述符数)
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 使用 epoll_create 创建 epoll 结构,用 epfd 跟踪
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
// 同时设置事件循环内的 polling apidata 为当前这个 apistate
// 值得注意的是 apidata 是 void* 类型,应该是根据不同的实现中使用不同的 aeApi,统一调取 aeEventLoop 的 api 结构,这样就能保证同一份文件的 aeapi 操作是没问题的了
eventLoop->apidata = state;
return 0;
}
// 重新设置事件循环允许跟踪的描述符数
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
aeApiState *state = eventLoop->apidata;
state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
aeApiState *state = eventLoop->apidata;
close(state->epfd);
zfree(state->events);
zfree(state);
}
// 添加事件方法
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* 避免 valgrind 工具提示的警告信息 */
// valgrind 经查询是一种可以用来检测内存泄漏的工具
/* 如果描述符已经被某些事件监听了的话,那么需要使用 EPOLL_CTL_MOD(修改)
* 否则使用 ADD 操作 */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// events 居然是用描述符直接作为偏移量的吗?难怪结构中有一个最大分配的文件描述符 maxfd
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* 合并旧事件 */
// 根据最终的 mask 决定在 epoll 事件上是否处理 IN/OUT
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 使用 epoll_ctl 将文件描述符以及监听的事件传递至内核
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
// 删除事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);
// 与上面添加同理,根据最终的 mask 决定如何修改 epoll 事件,即将 AE 库本身的一些描述转换为实际驱动的描述(这里是 epoll)
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 如果删了指定操作的最终 mask 是空的话,应该让 epoll 直接去删除事件,否则是修改事件
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* 注意,内核版本 < 2.6.1 时,即使是对于删除操作也要求传递非空的事件指针(ee) */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
// 实际的事件等待
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// 调用 epoll_wait 等待在给定的 fd 上发生的事件,等待的 timeout 为传进来的 tvp
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
// 对于 epoll 返回的每个就绪事件,将 EPOLL 事件转换为对应的 AE 事件状态,并且添加到事件循环结构的已触发数组中
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
// 返回使用的 API 类型,根据实际条件编译时引入的文件不同返回结果不同
static char *aeApiName(void) {
return "epoll";
}
总结
从上面的两个文件是能够较为清晰的看出 AE 事件库的大体设计情况了,首先是使用条件编译根据系统支持的底层 API 不同选择不同的文件(ae_select.c
这一些),在这些文件中提供的 AeApi
封装底层 API 提供统一的接口给 AE 主库使用,它们的关系大致如下图:
虽说 redis 是用 C 编写的,但其所设计也是面向对象的思想(各个控制结构的方法的调用都需要传递一个控制结构实例指针,其对应的就是面向对象中的 this
指针,只不过面向对象的编程语言隐藏了这一具体实现)
AE 的事件循环 AeEventLoop
结构中有一个空类型指针 void *apidata
,其本身并不操作这里的数据,而是交给 AeApiState
结构去创建并维护,原因是在不同系统平台下支持的多路复用IO接口不同,条件编译时选用的实现并不同,但是在各个实现下 AeApiState
均提供了统一的创建、控制、删除等接口,在调用 aeApiCreate
之后,相应的实现就会将需要使用的控制数据放到事件循环结构的 apidata
下以供其他方法使用。
这几个文件最主要的功能做的是将系统提供的多路复用IO接口的状态统一封装为 AE 事件库的状态,也即 AE_WRITABLE, AE_READABLE, AE_BARRIER
,以此来屏蔽底层的细节并对外提供统一的接口。
以上,如有错误欢迎指出,欢迎交流