AMS系统AMS系统12-11 15:25

redis 为什么可以如此的高并发

本文的讨论,暂时忽略redis数据结构和算法层面的东西。


目录

  • redis如此之快的原因
  • redis server启动流程分析
  • IO模型介绍与epoll与redis
  • 总结
  • 推荐资料

redis如此之快,整体来说原因如下

  • 绝大部分请求是纯粹的内存操作(非常快速)

  • 采用单线程,避免了不必要的上下文切换和竞争条件

  • 非阻塞IO 内部实现采用epoll,采用了epoll+自己实现的简单的事件框架。epoll中的读、写、关闭、连接都转化成了事件,然后利用epoll的多路复用特性,绝不在io上浪费一点时间

这3个条件不是相互独立的,特别是第一条,如果请求都是耗时的,采用单线程吞吐量及性能可想而知了。应该说redis为特殊的场景选择了合适的技术方案。


redis server启动流程

redis_startup.jpeg

main 函数

int main(int argc, char **argv) {
struct timeval tv;

/* We need to initialize our libraries, and the server configuration. */
// 初始化库
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
setlocale(LC_COLLATE,"");
zmalloc_enable_thread_safeness();
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());

// 检查服务器是否以 Sentinel 模式启动
server.sentinel_mode = checkForSentinelMode(argc,argv);

// 初始化服务器
initServerConfig();

/* We need to init sentinel right now as parsing the configuration file
 * in sentinel mode will have the effect of populating the sentinel
 * data structures with master nodes to monitor. */
// 如果服务器以 Sentinel 模式启动,那么进行 Sentinel 功能相关的初始化
// 并为要监视的主服务器创建一些相应的数据结构
if (server.sentinel_mode) {
    initSentinelConfig();
    initSentinel();
}

// 检查用户是否指定了配置文件,或者配置选项
if (argc >= 2) {
    int j = 1; /* First option to parse in argv[] */
    sds options = sdsempty();
    char *configfile = NULL;

    /* Handle special options --help and --version */
    // 处理特殊选项 -h 、-v 和 --test-memory
    if (strcmp(argv[1], "-v") == 0 ||
        strcmp(argv[1], "--version") == 0) version();
    if (strcmp(argv[1], "--help") == 0 ||
        strcmp(argv[1], "-h") == 0) usage();
    if (strcmp(argv[1], "--test-memory") == 0) {
        if (argc == 3) {
            memtest(atoi(argv[2]),50);
            exit(0);
        } else {
            fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
            fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
            exit(1);
        }
    }

    /* First argument is the config file name? */
    // 如果第一个参数(argv[1])不是以 "--" 开头
    // 那么它应该是一个配置文件
    if (argv[j][0] != '-' || argv[j][1] != '-')
        configfile = argv[j++];

    /* All the other options are parsed and conceptually appended to the
     * configuration file. For instance --port 6380 will generate the
     * string "port 6380\n" to be parsed after the actual file name
     * is parsed, if any. */
    // 对用户给定的其余选项进行分析,并将分析所得的字符串追加稍后载入的配置文件的内容之后
    // 比如 --port 6380 会被分析为 "port 6380\n"
    while(j != argc) {
        if (argv[j][0] == '-' && argv[j][1] == '-') {
            /* Option name */
            if (sdslen(options)) options = sdscat(options,"\n");
            options = sdscat(options,argv[j]+2);
            options = sdscat(options," ");
        } else {
            /* Option argument */
            options = sdscatrepr(options,argv[j],strlen(argv[j]));
            options = sdscat(options," ");
        }
        j++;
    }
    if (configfile) server.configfile = getAbsolutePath(configfile);
    // 重置保存条件
    resetServerSaveParams();

    // 载入配置文件, options 是前面分析出的给定选项
    loadServerConfig(configfile,options);
    sdsfree(options);

    // 获取配置文件的绝对路径
    if (configfile) server.configfile = getAbsolutePath(configfile);
} else {
    redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}

// 将服务器设置为守护进程
if (server.daemonize) daemonize();

// 创建并初始化服务器数据结构
initServer();

// 如果服务器是守护进程,那么创建 PID 文件
if (server.daemonize) createPidFile();

// 为服务器进程设置名字
redisSetProcTitle(argv[0]);

// 打印 ASCII LOGO
redisAsciiArt();

// 如果服务器不是运行在 SENTINEL 模式,那么执行以下代码
if (!server.sentinel_mode) {
    /* Things not needed when running in Sentinel mode. */
    // 打印问候语
    redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
    // 打印内存警告
    linuxOvercommitMemoryWarning();
#endif
    // 从 AOF 文件或者 RDB 文件中载入数据
    loadDataFromDisk();
    // 启动集群?
    if (server.cluster_enabled) {
        if (verifyClusterConfigWithData() == REDIS_ERR) {
            redisLog(REDIS_WARNING,
                "You can't have keys in a DB different than DB 0 when in "
                "Cluster mode. Exiting.");
            exit(1);
        }
    }
    // 打印 TCP 端口
    if (server.ipfd_count > 0)
        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
    // 打印本地套接字端口
    if (server.sofd > 0)
        redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
    sentinelIsRunning();
}

/* Warning the user about suspicious maxmemory setting. */
// 检查不正常的 maxmemory 配置
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
    redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}

// 运行事件处理器,一直到服务器关闭为止
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);

// 服务器关闭,停止事件循环
aeDeleteEventLoop(server.el);

return 0;
}

initServer 函数,创建TCP Server,启动服务器,通过 listenToPort()完成 bind与listen,aeCreateFileEvent()注册事件监听,监听结果有acceptTcpHandler()函数处理

void initServer() {
int j;

// 设置信号处理函数
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();

// 设置 syslog
if (server.syslog_enabled) {
    openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
        server.syslog_facility);
}

// 初始化并创建数据结构
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;

// 创建共享对象
createSharedObjects();
adjustOpenFilesLimit();
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
server.db = zmalloc(sizeof(redisDb)*server.dbnum);

/* Open the TCP listening socket for the user commands. */
// 打开 TCP 监听端口,用于等待客户端的命令请求
if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
    exit(1);

/* Open the listening Unix domain socket. */
// 打开 UNIX 本地端口
if (server.unixsocket != NULL) {
    unlink(server.unixsocket); /* don't care if this fails */
    server.sofd = anetUnixServer(server.neterr,server.unixsocket,
        server.unixsocketperm, server.tcp_backlog);
    if (server.sofd == ANET_ERR) {
        redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
        exit(1);
    }
    anetNonBlock(NULL,server.sofd);
}

/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
    redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
    exit(1);
}

/* Create the Redis databases, and initialize other internal state. */
// 创建并初始化数据库结构
for (j = 0; j < server.dbnum; j++) {
    server.db[j].dict = dictCreate(&dbDictType,NULL);
    server.db[j].expires = dictCreate(&keyptrDictType,NULL);
    server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
    server.db[j].ready_keys = dictCreate(&setDictType,NULL);
    server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
    server.db[j].eviction_pool = evictionPoolAlloc();
    server.db[j].id = j;
    server.db[j].avg_ttl = 0;
}

// 创建 PUBSUB 相关结构
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);

server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.resident_set_size = 0;
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();

/* Create the serverCron() time event, that's our main way to process
 * background operations. */
// 为 serverCron() 创建时间事件
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    redisPanic("Can't create the serverCron time event.");
    exit(1);
}

/* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
// 为 TCP 连接关联连接应答(accept)处理器
// 用于接受并应答客户端的 connect() 调用
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
}

// 为本地套接字关联应答处理器
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");

/* Open the AOF file if needed. */
// 如果 AOF 持久化功能已经打开,那么打开或创建一个 AOF 文件
if (server.aof_state == REDIS_AOF_ON) {
    server.aof_fd = open(server.aof_filename,
                           O_WRONLY|O_APPEND|O_CREAT,0644);
    if (server.aof_fd == -1) {
        redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
            strerror(errno));
        exit(1);
    }
}

/* 32 bit instances are limited to 4GB of address space, so if there is
 * no explicit limit in the user provided configuration we set a limit
 * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
 * useless crashes of the Redis instance for out of memory. */
// 对于 32 位实例来说,默认将最大可用内存限制在 3 GB
if (server.arch_bits == 32 && server.maxmemory == 0) {
    redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
    server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
    server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
}

// 如果服务器以 cluster 模式打开,那么初始化 cluster
if (server.cluster_enabled) clusterInit();

// 初始化复制功能有关的脚本缓存
replicationScriptCacheInit();

// 初始化脚本系统
scriptingInit();

// 初始化慢查询功能
slowlogInit();

// 初始化 BIO 系统
bioInit();
}

acceptTcpHandler 函数,创建一个 TCP 连接处理器,accept 客户端请求

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);

while(max--) {
    // accept 客户端连接
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    if (cfd == ANET_ERR) {
        if (errno != EWOULDBLOCK)
            redisLog(REDIS_WARNING,
                "Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
    // 为客户端创建客户端状态(redisClient)
    acceptCommonHandler(cfd,0);
   }
}

acceptCommonHandler()函数,首先创建client客户端,然后是处理客户端的请求。新添加的客户端令服务器的最大客户端数量达到了,那么向新客户端写入错误信息,并关闭新客户端

 #define MAX_ACCEPTS_PER_CALL 1000
 static void acceptCommonHandler(int fd, int flags) {
// 创建客户端
redisClient *c;
if ((c = createClient(fd)) == NULL) {
    redisLog(REDIS_WARNING,
        "Error registering fd event for the new client: %s (fd=%d)",
        strerror(errno),fd);
    close(fd); /* May be already closed, just ignore errors */
    return;
}

/* If maxclient directive is set and this is one client more... close the
 * connection. Note that we create the client instead to check before
 * for this condition, since now the socket is already set in non-blocking
 * mode and we can send an error for free using the Kernel I/O */
// 如果新添加的客户端令服务器的最大客户端数量达到了
// 那么向新客户端写入错误信息,并关闭新客户端
// 先创建客户端,再进行数量检查是为了方便地进行错误信息写入
if (listLength(server.clients) > server.maxclients) {
    char *err = "-ERR max number of clients reached\r\n";

    /* That's a best effort error message, don't check write errors */
    if (write(c->fd,err,strlen(err)) == -1) {
        /* Nothing to do, Just to avoid the warning... */
    }
    // 更新拒绝连接数
    server.stat_rejected_conn++;
    freeClient(c);
    return;
}

// 更新连接次数
server.stat_numconnections++;

// 设置 FLAG
c->flags |= flags;
}

IO模型介绍与epoll在redis的使用

考虑这样一个问题:有10000个客户端需要连上一个服务器并保持TCP连接,客户端会不定时的发送请求给服务器,服务器收到请求后需及时处理并返回结果。我们应该怎么解决?

方案一:我们使用一个线程来监听,当一个新的客户端发起连接时,建立连接并new一个线程来处理这个新连接。

缺点:当客户端数量很多时,服务端线程数过多,即便不压垮服务器,由于CPU有限其性能也极其不理想。因此此方案不可用。

redis_io_model1.png

方案二:我们使用一个线程监听,当一个新的客户端发起连接时,建立连接并使用线程池处理该连接。

优点:客户端连接数量不会压垮服务端。

缺点:服务端处理能力受限于线程池的线程数,而且如果客户端连接中大部分处于空闲状态的话服务端的线程资源被浪费。

redis_io_model2.png

因此,一个线程仅仅处理一个客户端连接无论如何都是不可接受的。那能不能一个线程处理多个连接呢?该线程轮询每个连接,如果某个连接有请求则处理请求,没有请求则处理下一个连接,这样可以实现吗?

方案三:I/O 多路复用技术

现代的UNIX操作系统提供了select/poll/kqueue/epoll这样的系统调用,这些系统调用的功能是:你告知我一批套接字,当这些套接字的可读或可写事件发生时,我通知你这些事件信息。

select

UNIX everthing is a file,套接字也不例外,每一个套接字都有对应的fd(即文件描述符)。我们简单看看这几个系统调用的原型。

select(int nfds, fd_set *r, fd_set *w, fd_set *e, struct timeval *timeout)

对于select(),我们需要传3个集合,r,w和e。其中,r表示我们对哪些fd的可读事件感兴趣,w表示我们对哪些fd的可写事件感兴趣,e表示我们对错误事件感兴趣。每个集合其实是一个bitmap,通过0/1表示我们感兴趣的fd。例如,我们对于fd为6的可读事件感兴趣,那么r集合的第6个bit需要被设置为1。这个系统调用会阻塞,直到我们感兴趣的事件(至少一个)发生。调用返回时,内核同样使用这3个集合来存放fd实际发生的事件信息。也就是说,调用前这3个集合表示我们感兴趣的事件,调用后这3个集合表示实际发生的事件。

select为最早期的UNIX系统调用,它存在4个问题:

  • 这3个bitmap有大小限制(FD_SETSIZE,通常为1024);
  • 由于这3个集合在返回时会被内核修改,因此我们每次调用时都需要重新设置;
  • 我们在调用完成后需要扫描这3个集合才能知道哪些fd的读/写事件发生了,一般情况下全量集合比较大而实际发生读/写事件的fd比较少,效率比较低下;
  • 内核在每次调用都需要扫描这3个fd集合,然后查看哪些fd的事件实际发生,在读/写比较稀疏的情况下同样存在效率问题。

poll

由于存在这些问题,于是人们对select进行了改进,从而有了poll。

poll(struct pollfd *fds, int nfds, int timeout)

struct pollfd {

int fd;/* 文件描述符 */

short events; /*等待的事件 */

short revents; /* 实际发生的事件*/

 }

具体使用列子参考 下载(实现server和client)

poll调用需要传递的是一个pollfd结构的数组,调用返回时结果信息也存放在这个数组里面。 pollfd的结构中存放着fd、我们对该fd感兴趣的事件(events)以及该fd实际发生的事件(revents)。poll传递的不是固定大小的bitmap,因此select的问题1解决了;poll将感兴趣事件和实际发生事件分开了,因此select的问题2也解决了。但select的问题3和问题4仍然没有解决。

select问题3比较容易解决,只要系统调用返回的是实际发生相应事件的fd集合,我们便不需要扫描全量的fd集合。

对于select的问题4,我们为什么需要每次调用都传递全量的fd呢?内核可不可以在第一次调用的时候记录这些fd,然后我们在以后的调用中不需要再传这些fd呢?

问题的关键在于无状态。对于每一次系统调用,内核不会记录下任何信息,所以每次调用都需要重复传递相同信息。

上帝说要有状态,所以我们有了epoll和kqueue。

epoll和kqueue

epoll与kqueue类似,只不过kqueue是用在FreeBSD(FreeBSD是一种自由的类Unix操作系统)上面。
在redis里面也有类似的宏定义,在ae.c中

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
     #ifdef HAVE_EPOLL
     #include "ae_epoll.c"
  #else
      #ifdef HAVE_KQUEUE
      #include "ae_kqueue.c"
      #else
      #include "ae_select.c"
      #endif
   #endif
#endif

epoll主要使用以下三个函数

int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核需要监听的数目一共有多大。当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close() 关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll的事件注册函数,第一个参数是 epoll_create() 的返回值,第二个参数表示动作,使用如下三个宏来表示:

EPOLL_CTL_ADD    //注册新的fd到epfd中;
EPOLL_CTL_MOD    //修改已经注册的fd的监听事件;
EPOLL_CTL_DEL    //从epfd中删除一个fd;

第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event 结构如下:

typedef union epoll_data
{
    void        *ptr;
    int          fd;
    __uint32_t   u32;
    __uint64_t   u64;
} epoll_data_t;

struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

events 可以是以下几个宏的集合:

EPOLLIN     //表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT    //表示对应的文件描述符可以写;
EPOLLPRI    //表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR    //表示对应的文件描述符发生错误;
EPOLLHUP    //表示对应的文件描述符被挂断;
EPOLLET     //将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT//只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数events用来从内核得到事件的集合,maxevents 告之内核这个events有多大,这个 maxevents 的值不能大于创建 epoll_create() 时的size,
参数 timeout 是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

EPOLL事件有两种模型 Level Triggered (LT)Edge Triggered (ET)

LT(level triggered,水平触发模式)是缺省的工作方式,并且同时支持 block 和 non-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。

ET(edge-triggered,边缘触发模式)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,等到下次有新的数据进来的时候才会再次出发就绪事件。

一个例子见 下载

epoll的优点

  1. epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体数目可以 cat /proc/sys/fs/file-max 察看。

  2. 效率提升, Epoll 最大的优点就在于它只管你“活跃”的连接 ,而跟连接总数无关,因此在实际的网络环境中, Epoll 的效率就会远远高于 select 和 poll 。

  3. 内存拷贝, Epoll 在这点上使用了“共享内存 ”,这个内存拷贝也省略了

epoll与redis

我们下面来看下epoll在redis源码中的使用

redis_epoll.png

总结

通过上面,我们了解了redis的启动过程,以及常见的IO模型。redis使用了最简单的方式实现了最高的并发,极简原则。。

推荐资料

Redis的设计与实现

Redis 3.0代码注释(Redis的设计与实现作者实现)

《Redis Command Reference》全文的中文翻译版

官场书屋二维码

小额赞赏

000
评论