Redis6.0.8源码解析六之执行set指令时,做了啥?
服务端
建立服务器连接套接字
server.initServer
这个方法分别创建了ipv4/v6二个套接字,且设置套接字为非阻塞,并存储fds中
listenToPort
int listenToPort(int port, int *fds, int *count) {
int j;
/* Force binding of 0.0.0.0 if no bind address is specified, always
* entering the loop if j == 0. */
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
if (server.bindaddr[j] == NULL) {
int unsupported = 0;
/* Bind * for both IPv6 and IPv4, we enter here only if
* server.bindaddr_count == 0. */
//创建并绑定监听连接的fd,并记录到fds
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
server.tcp_backlog);
//若创建成功
if (fds[*count] != ANET_ERR) {
//将fd设置非阻塞
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
}
if (*count == 1 || unsupported) {
/* Bind the IPv4 address as well. */
//绑定ipv4
fds[*count] = anetTcpServer(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
//将fd设置非阻塞
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
}
}
/* Exit the loop if we were able to bind * on IPv4 and IPv6,
* otherwise fds[*count] will be ANET_ERR and we'll print an
* error and return to the caller with an error. */
if (*count + unsupported == 2) break;
} else if (strchr(server.bindaddr[j],':')) {
/* Bind IPv6 address. */
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
} else {
/* Bind IPv4 address. */
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
}
if (fds[*count] == ANET_ERR) {
serverLog(LL_WARNING,
"Could not create server TCP listening socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
port, server.neterr);
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT ||
errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
continue;
return C_ERR;
}
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return C_OK;
}
创建并绑定监听套接字连接
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}
/**
* 创建并绑定监听套接字连接
* @param err
* @param port
* @param bindaddr
* @param af
* @param backlog
* @return 接收连接的fd
*/
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
//将port按照%d格式,截取6个复制到port中
snprintf(_port,6,"%d",port);
//将hints置0
memset(&hints,0,sizeof(hints));
//设置ipv6地址协议
hints.ai_family = af;
//数据协议为流式协议
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
//处理名字到地址以及服务到端口这两种转换,返回一个addrinfo的结构指针,servinfo为传出参数
if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
//创建socket服务器套接字
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
//设置端口复用
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
//绑定并创建监听套接字
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
goto end;
}
if (p == NULL) {
anetSetError(err, "unable to bind socket, errno: %d", errno);
goto error;
}
error:
if (s != -1) close(s);
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
return s;
}
设置端口复用
/**
* 设置端口复用
* @param err
* @param fd
* @return
*/
static int anetSetReuseAddr(char *err, int fd) {
int yes = 1;
/* Make sure connection-intensive things like the redis benchmark
* will be able to close/open sockets a zillion of times */
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
绑定并创建监听套接字
/**
* 绑定并创建监听套接字
* @param err
* @param s
* @param sa
* @param len
* @param backlog
* @return
*/
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}
if (listen(s, backlog) == -1) {
anetSetError(err, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}
fd创建好后,会将fd设置非阻塞
anet.anetNonBlock
/**
* 将fd设置非阻塞
* @param err
* @param fd
* @return
*/
int anetNonBlock(char *err, int fd) {
return anetSetBlock(err,fd,1);
}
/**
* 设置fd是否非阻塞
* @param err
* @param fd
* @return
*/
int anetSetBlock(char *err, int fd, int non_block) {
int flags;
/* Set the socket blocking (if non_block is zero) or non-blocking.
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
//获取fd属性信息
if ((flags = fcntl(fd, F_GETFL)) == -1) {
anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
return ANET_ERR;
}
if (non_block)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) == -1) {
anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
紧接着就是创建ipv4的套接字,跟上述一样,只是绑定的ip协议不一样而已.还有一步是创建本地套接字,也跟上述一样.
设置监听文件事件,以及对应事件执行函数
socket创建好了,因为我在windows测试的,这里采用的是select模型,对应的需要为每个fd设置监听的事件,以及回调函数,select模型案例,大家可以参考我的这篇文章.点击这里
因为之前分别创建了ipv4/v6二个套接字文件描述符,所以这里会分别注册2次监听AE_READABLE,当事件到达执行acceptTcpHandler
server.initServer
ae.aeCreateFileEvent
这里需要注意的是
&eventLoop->events[fd]指向的数据全部都置0和NULL了
/*
* 根据 mask 参数的值,监听 fd 文件的状态,
* 当 fd 可用时,执行 proc 函数
*/
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
// 监听指定 fd 的指定事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 设置监听的文件事件类型
fe->mask |= mask;
//设置监听事件触发时执行的处理器
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
//更新最大fd,主要用于后面遍历0~maxfd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
监听指定 fd 的指定事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
return 0;
}
在这里需要注意这个aeApiState 结构,这个结构分别存储了读写文件描述符的监听集合,且_rfds和_wfds作为备份.
typedef struct aeApiState {
fd_set rfds, wfds;
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds;
} aeApiState;
接受客户端连接
ae.aeProcessEvents
ae_select
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
//分别备份,读写文件描述符的监听集合
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
//这里就是熟悉的select了
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
//说明有事件发生.
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
//将发生的事件的fd以及事件类型,记录就绪队列中
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
处理客户端连接
ae.aeProcessEvents
//处理事件到达
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0;
int invert = fe->mask & AE_BARRIER;
//处理读事件
if (!invert && fe->mask & mask & AE_READABLE) {
//调用设置的回调函数
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
//确保读写事件只能执行一次
fired++;
//再次获取fe,防止扩展大小,造成指针错误
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
//处理写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
执行处理连接的回调函数
networking
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
//客户端连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// 为客户端创建客户端状态(redisClient)
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
创建客户端连接文件描述符
anet
/**
* 客户端连接,返回fd
* @param err
* @param s
* @param ip
* @param ip_len
* @param port
* @return
*/
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;
//分别对ipv4/6ip以及port校验处理
if (sa.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&sa;
if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
if (port) *port = ntohs(s->sin_port);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
if (port) *port = ntohs(s->sin6_port);
}
return fd;
}
创建客户端
networking
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c; // 创建客户端
char conninfo[100];
UNUSED(ip);
if (connGetState(conn) != CONN_STATE_ACCEPTING) {
serverLog(LL_VERBOSE,
"Accepted client connection in error state: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn);
return;
}
/* Limit the number of connections we take at the same time.
*
* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation
* if rejected. */
// 如果新添加的客户端令服务器的最大客户端数量达到了
// 那么向新客户端写入错误信息,并关闭新客户端
// 先创建客户端,再进行数量检查是为了方便地进行错误信息写入
if (listLength(server.clients) + getClusterConnectionsCount()
>= server.maxclients)
{
char *err;
if (server.cluster_enabled)
err = "-ERR max number of clients + cluster "
"connections reached\r\n";
else
err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors.
* Note that for TLS connections, no handshake was done yet so nothing
* is written and the connection will just drop. */
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
// 更新拒绝连接数
server.stat_rejected_conn++;
//关闭客户端连接
connClose(conn);
return;
}
/* Create connection and client */
//创建客户端连接
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
/* Last chance to keep flags */
// 设置 FLAG
c->flags |= flags;
/* Initiate accept.
*
* Note that connAccept() is free to do two things here:
* 1. Call clientAcceptHandler() immediately;
* 2. Schedule a future call to clientAcceptHandler().
*
* Because of that, we must do nothing else afterwards.
*/
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
if (connGetState(conn) == CONN_STATE_ERROR)
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
freeClient(connGetPrivateData(conn));
return;
}
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
// 当 conn 不为 null 时,创建带网络连接的客户端
// 如果 conn 为 null ,那么创建无网络连接的伪客户端
// 因为 Redis 的命令必须在客户端的上下文中使用,所以在执行 Lua 环境中的命令时
// 需要用到这种伪终端
if (conn) {
// 非阻塞
connNonBlock(conn);
// 禁用 Nagle 算法
connEnableTcpNoDelay(conn);
// 设置 keep alive
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
// 绑定读事件到事件 loop (开始接收命令请求),以及设置回调函数readQueryFromClient
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
// 初始化各个属性
// 默认数据库
selectDb(c,0);
uint64_t client_id = ++server.next_client_id;
c->id = client_id;
c->resp = 2;
c->conn = conn;
// 名字
c->name = NULL;
// 回复缓冲区的偏移量
c->bufpos = 0;
c->qb_pos = 0;
// 查询缓冲区
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
// 查询缓冲区峰值
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->cmd = c->lastcmd = NULL;
c->user = DefaultUser;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
/* If the default user does not require authentication, the user is
* directly authenticated. */
c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
!(c->user->flags & USER_FLAG_DISABLED);
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
c->slave_ip[0] = '\0';
c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
c->client_cron_last_memory_usage = 0;
c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (conn) linkClient(c); //c添加到链表尾部
initClientMultiState(c);
// 返回客户端
return c;
}
为新建立连接的fd,绑定监听读事件,以及事件处理函数.主要用于客户端与服务端的交互
connection
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
//设置读事件发生时,处理函数
return conn->type->set_read_handler(conn, func);
}
也就是在这一步为客户端连接文件描述符监听读事件,以及设置了读事件到达时的回调函数.
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else //注册监听读事件
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
接收客户端发来命令
connection.connSocketEventHandler
本文地址:https://blog.csdn.net/Nuan_Feng/article/details/109010203
上一篇: r2p2之DPDK学习记录(2)
下一篇: Arduino基础学习笔记