redis 源代码阅读与学习笔记(四)
接收发缓存区处理
本章关注 redis 5.0 如何处理接收发数据
相关数据结构
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
// ... 其他无关字段略 ...
sds querybuf; /* Buffer we use to accumulate client queries. */
// ... 其他无关字段略 ...
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
// ... 其他无关字段略 ...
list *reply; /* List of reply objects to send to the client. */
// ... 其他无关字段略 ...
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
字段名 | 说明 |
---|---|
querybuf | 接收缓存区 |
argc | 解析接收缓存区中命令时,记录当前命令参数个数 |
argv | 解析接收缓存区中命令时,记录当前命令参数 |
reply | 待发送消息列表 |
buf | 正在发送缓冲区 |
struct client
结构体中,还有一些字段,用于辅助记录当前处理中的位置信息、统计信息等,就不列上去了
避免分散注意力、或增加理解难度
struct redisServer {
// ... 其他无关字段略 ...
list *clients_pending_write; /* There is to write or install handler. */
// ... 其他无关字段略 ...
};
字段名 | 说明 |
---|---|
clients_pending_write | 有数据待发送客户端对象列表 |
接收缓存区处理过程
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
// ... 其他无关代码略 ...
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// ... 其他无关代码略 ...
processInputBufferAndReplicate(c);
}
-
c->querybuf
是 sds 简单动态字符串,就是一个可以动态扩展的 buff -
sdsMakeRoomFor
函数根据输入的readlen
,看情况动态扩展缓存区 -
read
从fd
中读取网络数据 -
processInputBufferAndReplicate
函数内,处理解析消息等
发送缓冲区处理过程
void beforeSleep(struct aeEventLoop *eventLoop) {
// ... 其他无关代码略 ...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
// ... 其他无关代码略 ...
}
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
// ... 其他无关代码略 ...
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyStringToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
- 服务器主循环每次
epoll
返回会调用beforeSleep
-
handleClientsWithPendingWrites
函数内,会遍历待发送数据的客户端列表clients_pending_write
-
writeToClient
是实际发送过程,代码有点长,没贴上去- 先
c->buf
中数据发送 - 再
c->reply
中也有数据,继续发送c->reply
中的
- 先
-
addReply
先丢进c->buf
消息合批,因为c->buf
大小固定,塞不下,就丢进c->reply
其他问题
sdsMakeRoomFor
函数是可能内存扩展的,但是readQueryFromClient
中未处理内存满时sdsMakeRoomFor
执行失败的情况;会 crash
官方 github 上,也有很多人遇到该问题,诸如:
-
https://github.com/antirez/redis/issues/4059
For some reason or another, your instance is using over 1 Gb of memory.
With only 2 Gb of main memory and your system requiring some of it as well, you simply run out of more to use for Redis.
Reduce memory usage, increase memory or set a proper memory limit for Redis.
即目前线上 redis 不能让其内存满负荷,满负荷的话,一定会 crash !!!
上一篇: Mac无法开机怎么办?苹果Mac重置SMC方法详细介绍
下一篇: day1_作业(账户登录检测)
推荐阅读
-
redis 源代码阅读与学习笔记(四)
-
redis 学习笔记 (四)持久化RDB
-
微服务架构实战学习笔记 第四章 Spring Cloud Netflix Ribbon与负载均衡
-
个人学习笔记--数据结构与算法--二叉树(四)
-
ASP.Net MVC开发基础学习笔记:四、校验、AJAX与过滤器
-
《Linux命令行与shell脚本编程大全》 第四章 学习笔记
-
《HTML5 与CSS3 权威指南》(第四版 上册)学习笔记
-
Docker学习笔记(四)-docker中的网络与存储
-
python学习笔记(四)之构造与析构函数(三)
-
微服务架构实战学习笔记 第四章 Spring Cloud Netflix Ribbon与负载均衡