原创 更新于 时间( 2020-09-04 17:03:38) ( 405) 次浏览 标签 : Redis
导读

Redis 的单线程设计、深度剖析IO多路复用的具体实现 ...

本文导读:

  • Redis 的单线程是否真的是单线程?为什么要采用单线程设计?
  • Redis 单线程为什么快
  • Redis 中事件驱动的实现


    源码阅读(3)
    源码版本: Redis 6.0.7
    主要文件: ae.c 、ae.h

1. Redis单线程

首先要确认的是, Redis 的单线程 指的是 Redis 网络 IO 和 键值对读写 是由一个 单独线程 来完成的,而其他的功能,持久化,异步删除,集群数据同步等,是由其他的线程执行。

既然 Redis 不用多线程去处理网络IO,那么就代表 多线程 并不是像想象中好用,使用多线程就可以增加系统吞吐率,增加处理效率,多线程之间共享资源的共享带来的互斥锁,等额外的开销也是不容忽视的,这是 Redis 使用单线程的一个愿意。

2. IO多路复用

为什么 Redis 单线程处理IO 还能在 查询和连接并发 有那么好的 表现呢?

你可能经常会看到这样的总结:

Redis 单线程 快的原因
1 内存
2 合理高效的数据结构
3 多路复用机制

前面两点都好理解,那么 第三点 多路复用机制 是什么呢?Redis 是如何实现 在单线程中,同时处理多个IO请求 的呢? 不要急,看完下面的就明白了。

这里对3种常见的IO模型进行对比:

  1. 同步阻塞IO(Blocking IO):传统的IO模型。 用户发起IO请求,等待内核将接受到的数据拷贝,返回给用户。

  2. 同步非阻塞IO (Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。 用户发起 IO 请求,等待数据,没有收到,间断的继续发送IO请求,直到数据返回。

  3. IO多路复用(IO Multiplexing):异步阻塞IO,例如Linux中的epoll 用户将 IO 请求交给 select , select 调用socket ,然后由select 监听返回,当soket 返回时,select返回,用户再次发起IO ,得到返回数据 ,这与同步阻塞IO 的区别在于,用户可以注册多个select IO请求,这样就实现了,在单线程中,同时处理多个IO请求 的目的。

同样的

  1. 同步阻塞IO(Blocking IO) 你想问女神七夕晚上是否愿意与你共度良宵,于是你发了条短信然后跑去她宿舍楼下等她下楼,期间你就一直站在楼下等一动不动的等待女神下楼。

  2. 同步非阻塞IO (Non-blocking IO) 你想问女神七夕晚上是否愿意与你共度良宵,于是你发了条短信然后跑去她宿舍楼下等她下楼,期间你就一直站在楼下等,并且不断的发消息询问:“下楼了没有” 。

  3. IO多路复用(IO Multiplexing) 用户将 IO 请求交给 select , select 调用socket ,然后由select 监听返回,当soket 返回时,select返回,用户再次发起你想问女神七夕晚上是否愿意与你共度良宵,于是你让女生宿舍大妈帮你监视女生宿舍,等到大妈看到女神下楼了,她通知你,你成功等到女神。

select epoll 最主要的的区别在于:
在linux 中,select 是有上限的,而且采取的是循环来检查 ,所以每次查询效率是 O(n),而 epoll 的查询效率是 O(1) ,无上限,适合 多连接,并且每个连接不是很活跃的情况,效率更高。

在 redis 单线程的情况下,IO多路复用机制 允许内核中,同时存在多个 监听套接字 和 已连接套接字 内核会一直监听这些套接字上的连接请求 或 数据请求,交给 redis 处理,实现一个 redis 线程 处理多个 IO 流的效果 ,redis 网络框架 通过调用 epoll 机制,让内核监听这些套接字,实现不会阻塞在一个 客户端IO请求上,实现多客户端、并发性 。

3. Redis 网络IO源码

  1. ae.h

/*定义了fd的读取状态*/
#define AE_NONE 0       /* No events registered. */
#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

/* State of an event based program 
aeEventLoop struct 
events
fired
*/
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events 注册的事件s*/
    aeFiredEvent *fired; /* Fired events 解除的事件s*/
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

  1. ae.c 根据系统性能 使用 evport->epoll->kqueue->select aeCreateEventLoop 创建 aeCreateEventLoop 对象 aeCreateFileEvent FD创建 aeMain 不停的调用 aeProcessEvents 来处理IO事件

ae.c

#include "ae.h"
#include "zmalloc.h"
#include "config.h"

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending.
 * 包括此 系统支持的 最佳 多路复用
 * 以下 按  性能  顺序降序排列
 * evport->epoll->kqueue->select
   环境对应:
    evport  Solaris 10(老旧的UNIX-SUN)   O(1) fd选择算法复杂度  10w 复用并发支持
    epoll   Linux的                      O(1)                 10w
    kqueue  OS X,FreeBSD的              O(1)                 10w
    select  通常安装在所有平台作为fallback  O(n)                  1024
 */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif


/*创建AE
* --maxclients 最大客户端连接数, CONFIG_FDSET_INCR 128
* setsize ->  : server.c 2846 | server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}



/*网络IO事件的删除*/
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Free the time events list. */
    aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
    while (te) {
        next_te = te->next;
        zfree(te);
        te = next_te;
    }
    zfree(eventLoop);
}

/*关闭 比如 性能测试 redis-benchmark 结束  freeClient 调用 */
void aeStop(aeEventLoop *eventLoop) {
    eventLoop->stop = 1;
}

/* fd IO 注册
 * eventLoop -> aeCreateEventLoop 创建的 变量
 * fd -> 文件描述符
 * mask -> AE_WRITABLE AE_BARRIER  AE_READABLE ae.h line 42 fd 读写状态
 * aeFileProc *proc -> 函数指针事件处理函数 writeHandler、rdbPipeReadHandler、redisAeReadEvent
 * clientData -> client
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        /*超出 可以监听的网络事件fd 返回一个范围错误*/
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}


/* 收到 client request 处理
 * Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 *
 * 根据 flags 标志 ,分别
 * If flags is 0, the function does nothing and returns.
    如果标志为0,则函数不执行任何操作并返回。
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
    则处理所有类型的事件。
 * if flags has AE_FILE_EVENTS set, file events are processed.
   则处理文件事件。
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
...
}



/*
不停的调用 aeProcessEvents, redis  不停的收到 client 的 request 处理
*/
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}


Leon0204

打杂后端程序猿~

讨论区

发表评论
昵称:
评论:
验证