Redis命令处理过程源码解析

本文基于社区版redis 4.0.8

1、命令解析

redis服务器接收到的命令请求首先存储在客户端对象的querybuf输入缓冲区,然后解析命令请求的各个参数,并存储在客户端对象的argv和argc字段。

客户端解析命令请求的入口函数为readqueryfromclient,会读取socket数据存储到客户端对象的输入缓冲区,并调用函数processinputbuffer解析命令请求。

注:内联命令:使用telnet会话输入命令的方式

void processinputbuffer(client *c) {
    ......
    //循环遍历输入缓冲区,获取命令参数,调用processmultibulkbuffer解析命令参数和长度
    while(sdslen(c->querybuf)) {
        if (c->reqtype == proto_req_inline) {
            if (processinlinebuffer(c) != c_ok) break;//处理telnet方式的内联命令
        } else if (c->reqtype == proto_req_multibulk) {
            if (processmultibulkbuffer(c) != c_ok) break; //解析命令参数和长度暂存到客户端结构体中
        } else {
            serverpanic("unknown request type");
        }
    }    
}

//解析命令参数和长度暂存到客户端结构体中
int processmultibulkbuffer(client *c) {
    //定位到行尾
    newline = strchr(c->querybuf,'\r');
    //解析命令请求参数数目,并存储在客户端对象的c->multibulklen字段
    serverassertwithinfo(c,null,c->querybuf[0] == '*');
    ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
    c->multibulklen = ll;
    pos = (newline-c->querybuf)+2;//记录已解析命令的请求长度resp的长度
    /* setup argv array on client structure */
    //分配请求参数存储空间
    c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    
    // 开始循环解析每个请求参数
    while(c->multibulklen) {
        ......
        newline = strchr(c->querybuf+pos,'\r');
        if (c->querybuf[pos] != '$') {
            return c_err;
        ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
        pos += newline-(c->querybuf+pos)+2;
        c->bulklen = ll;//字符串参数长度暂存在客户端对象的bulklen字段
        
        //读取该长度的参数内容,并创建字符串对象,同时更新待解析参数multibulklen
        c->argv[c->argc++] =createstringobject(c->querybuf+pos,c->bulklen);
        pos += c->bulklen+2;
        c->multibulklen--;
    }

2、命令调用

当multibulklen的值更新为0时,表示参数解析完成,开始调用processcommand来处理命令,处理命令前有很多校验逻辑,如下:

void processinputbuffer(client *c) {
    
    ......
     //调用processcommand来处理命令
     if (processcommand(c) == c_ok) {
         ......
     }
}

//处理命令函数
int processcommand(client *c) {
    //校验是否是quit命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addreply(c,shared.ok);
        c->flags |= client_close_after_reply;
        return c_err;
    }
    //调用lookupcommand,查看该命令是否存在
    c->cmd = c->lastcmd = lookupcommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagtransaction(c);
        addreplyerrorformat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return c_ok;
    //检查用户权限
    if (server.requirepass && !c->authenticated && c->cmd->proc != authcommand)
    {
        addreply(c,shared.noautherr);
    //还有很多检查,不一一列举,比如集群/持久化/复制等
    /* 真正执行命令 */
    if (c->flags & client_multi &&
        c->cmd->proc != execcommand && c->cmd->proc != discardcommand &&
        c->cmd->proc != multicommand && c->cmd->proc != watchcommand)
        queuemulticommand(c);
        //将结果写入outbuffer
        addreply(c,shared.queued);
    } 
// 调用execcommand执行命令
void execcommand(client *c) {
    call(c,cmd_call_full);//调用call执行命令
//调用execcommand调用call执行命令
void call(client *c, int flags) {
    start = ustime();
    c->cmd->proc(c);//执行命令
    duration = ustime()-start;
    //如果是慢查询,记录慢查询
    if (flags & cmd_call_slowlog && c->cmd->proc != execcommand) {
        char *latency_event = (c->cmd->flags & cmd_fast) ?
                              "fast-command" : "command";
        latencyaddsampleifneeded(latency_event,duration/1000);
        //记录到慢日志中
        slowlogpushentryifneeded(c,c->argv,c->argc,duration);
    //更新统计信息:当前命令执行时间和调用次数
    if (flags & cmd_call_stats) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;

3、返回结果

redis返回结果并不是直接返回给客户端,而是先写入到输出缓冲区(buf字段)或者输出链表(reply字段)

int processcommand(client *c) {
    ......
    //将结果写入outbuffer
    addreply(c,shared.queued);
    ......
    
}
//将结果写入outbuffer
void addreply(client *c, robj *obj) {
    //调用listaddnodehead将客户端添加到服务端结构体的client_pending_write链表,以便后续能快速查找出哪些客户端有数据需要发送
    if (prepareclienttowrite(c) != c_ok) return;
    
    //然后添加字符串到输出缓冲区
    if (_addreplytobuffer(c,obj->ptr,sdslen(obj->ptr)) != c_ok)
        //如果添加失败,则添加到输出链表中
        _addreplyobjecttolist(c,obj); 
}

addreply函数只是将待发送给客户端的数据暂存在输出链表或者输出缓冲区,那么什么时候将这些数据发送给客户端呢?答案是开启事件循环时,调用的beforesleep函数,该函数专门执行一些不是很费时的操作,如过期键删除,向客户端返回命令回复等

void beforesleep(struct aeeventloop *eventloop) {
    ......
     /* handle writes with pending output buffers. */
    handleclientswithpendingwrites();
}

//回复客户端命令函数
int handleclientswithpendingwrites(void) {
    listiter li;
    listnode *ln;
    int processed = listlength(server.clients_pending_write);
    listrewind(server.clients_pending_write,&li);
    while((ln = listnext(&li))) {
        client *c = listnodevalue(ln);
        c->flags &= ~client_pending_write;
        listdelnode(server.clients_pending_write,ln);
        /* 发送客户端数据 */
        if (writetoclient(c->fd,c,0) == c_err) continue;
        /* if there is nothing left, do nothing. otherwise install
         * the write handler. */
         //如果数据量很大,一次性没有发送完成,则进行添加文件事件,监听当前客户端socket文件描述符的可写事件即可
        if (clienthaspendingreplies(c) &&
            aecreatefileevent(server.el, c->fd, ae_writable,
                sendreplytoclient, c) == ae_err)
        {
            freeclientasync(c);
        }
    }
    return processed;

到这里,命令请求才算真正处理完成了。

到此这篇关于redis命令处理过程源码解析的文章就介绍到这了,更多相关redis命令处理内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐