Author: 仲肥
redis的aof持久化本质上是一个redo log,把所有执行过的写命令追加到aof文件中。那么随着redis的运行,aof文件会不断膨胀,当触发收缩条件时就要做aofrewrite。
redis是通过fork子进程来做aofrewrite,同时为了保证aof的连续性,父进程把aofrewrite期间的写命令缓存起来,等收割完子进程之后再追加到新的aof文件。如果期间写入量较大的话收割时就要有大量的写磁盘操作,造成性能下降。
为了提高aofrewrite效率,redis通过在父子进程间建立管道,把aofrewrite期间的写命令通过管道同步给子进程,追加写盘的操作也就转交给了子进程。
上图是aofrewrite的流程,标注为基本的函数调用关系。
1 - 首先,通过命令或是事件触发aofrewrite,调用rewriteAppendOnlyFileBackground()函数
2 - 父进程记录子进程的pid并开始缓存写命令
3 - 子进程调用rewriteAppendOnlyFile(tmpfile)函数创建新的aof文件
4 - 子进程退出后父进程调用backgroundRewriteDoneHandler()来处理
在aofrewrite过程中,如果redis本身数据量较大子进程执行时间较长,或者写入流量较高,就会导致aof-rewrite-buffer积攒较多,父进程就要进行大量写磁盘操作,这对于redis来说显然是不够高效的。
为了提高aofrewrite效率,redis使用pipe来优化,下图中红色标注即为优化的部分:
优化点:
1 - 父进程建立管道
2 - 父进程向管道写数据
3 - 子进程从管道读数据
4 - 父子进程交互
细心的读者会发现,aofRewriteBufferAppend()和aofRewriteBufferWrite()这一对函数仍然保留,父进程还是要把aof-rewrite-buffer写盘吗?是的,这是因为父子进程是异步结构,父子间总会有那么一点代沟,aof-rewrite-buffer还是需要保留的,不过这个时候父进程写盘的数据量就很小了,几乎可以忽略。
命令的触发不必详述,主要来看下serverCron的触发:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
...
}
也就是说aof文件大小超过了server.aof_rewrite_min_size,并且增长率大于server.aof_rewrite_perc时就会触发(增长率计算的基数server.aof_rewrite_base_size是上次aofrewrite完之后aof文件的大小)。
目前云redis设置server.aof_rewrite_min_size为内存规格的1/4,server.aof_rewrite_perc为100。
aofrewrite触发之后进入rewriteAppendOnlyFileBackground()函数:
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
...
OK,重点来了,在fork之前调用了aofCreatePipes()函数来创建管道(openChildInfoPipe()函数只是用来收集子进程copy-on-write用到的内存,就不详细展开了):
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. 父进程向子进程写数据的管道*/
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. 子进程向父进程发起停止传输的控制管道*/
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. 父进程向子进程回复的控制管道*/
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
//注册读事件处理函数,负责处理子进程要求停止数据传输的消息
server.aof_pipe_write_data_to_child = fds[1]; //父进程向子进程写数据的fd
server.aof_pipe_read_data_from_parent = fds[0]; //子进程从父进程读数据的fd
server.aof_pipe_write_ack_to_parent = fds[3]; //子进程向父进程发起停止消息的fd
server.aof_pipe_read_ack_from_child = fds[2]; //父进程从子进程读取停止消息的fd
server.aof_pipe_write_ack_to_child = fds[5]; //父进程向子进程回复消息的fd
server.aof_pipe_read_ack_from_parent = fds[4]; //子进程从父进程读取回复消息的fd
server.aof_stop_sending_diff = 0; //是否停止管道传输标记位
return C_OK;
...
}
管道建立起来了我们再来看看fork之后父进程和子进程如何工作,首先看下父进程:
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
...
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.aof_rewrite_buf will start
* with a SELECT statement and it will be safe to merge. */
server.aof_selected_db = -1;
...
父进程这里做的事情并不多,主要是信息的记录和一些标记位设置
接下来就是缓存写命令和管道通信部分了,入口是在feedAppendOnlyFile():
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
...
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
...
}
server.aof_child_pid在这时就生效了,开始缓存写命令:
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
if (len) { /* First block to allocate, or need another block. */
int numblocks;
block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);
/* Log every time we cross more 10 or 100 blocks, respectively
* as a notice or warning. */
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
LL_NOTICE;
serverLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
redis用链表server.aof_rewrite_buf_blocks来缓存aofrewrite期间的写命令,链表的每个节点最大10MB;重点是在最后的写事件注册,当server.aof_pipe_write_data_to_child这个fd没有注册事件时,就注册写事件函数aofChildWriteDiffData:
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln;
aofrwblock *block;
ssize_t nwritten;
...
while(1) {
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;
if (server.aof_stop_sending_diff || !block) {
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}
if (block->used > 0) {
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
block->used -= nwritten;
block->free += nwritten;
}
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
}
}
每次事件循环都会把server.aof_rewrite_buf_blocks积攒的写命令全部同步给子进程,除非server.aof_stop_sending_diff被设置了停止标记。
接下来看下子进程:
...
/* Child */
char tmpfile[256];
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
...
exitFromChild(0);
} else {
exitFromChild(1);
}
...
子进程首先关闭监听端口,然后就进入rewriteAppendOnlyFile()函数:
int rewriteAppendOnlyFile(char *filename) {
...
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
...
server.aof_child_diff = sdsempty();
...
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
...
首先打开一个临时aof文件,并初始化server.aof_child_diff缓存准备从父进程读数据,然后就调用rewriteAppendOnlyFileRio()来写aof文件和读取管道中的数据:
int rewriteAppendOnlyFileRio(rio *aof) {
...
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
...
}
在遍历redis把key-value写入新aof文件过程中,新aof文件每增长10K就会调用aofReadDiffFromParent()从管道中读取数据追加到server.aof_child_diff:
ssize_t aofReadDiffFromParent(void) {
char buf[65536]; /* Default pipe buffer size on most Linux systems. */
ssize_t nread, total = 0;
while ((nread =
read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
total += nread;
}
return total;
}
子进程在遍历完redis生成好新的aof文件之后就要准备退出了,那么退出前要先告诉父进程停止管道传输,依然回到rewriteAppendOnlyFile()函数来看:
int rewriteAppendOnlyFile(char *filename) {
...
/* Ask the master to stop sending diffs. */
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
/* We read the ACK from the server using a 10 seconds timeout. Normally
* it should reply ASAP, but just in case we lose its reply, we are sure
* the child will eventually get terminated. */
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
/* Read the final diff if any. */
aofReadDiffFromParent();
/* Write the received diff to the file. */
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
...
}
这里写的就很直接了:
那么父进程是如何处理”!”的呢,还记得之前注册的读事件aofChildPipeReadable()吧,子进程向控制管道发送”!”就会激活:
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
char byte;
...
if (read(fd,&byte,1) == 1 && byte == '!') {
serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
server.aof_stop_sending_diff = 1;
if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
/* If we can't send the ack, inform the user, but don't try again
* since in the other side the children will use a timeout if the
* kernel can't buffer our write, or, the children was
* terminated. */
serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
strerror(errno));
}
}
/* Remove the handler since this can be called only one time during a
* rewrite. */
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}
很简单,标记server.aof_stop_sending_diff=1,给子进程回复”!”,并且把自己从事件循环删掉,自此父子进程间通信完成,剩下的就是父进程等待子进程退出进行收尾工作。
serverCron()中会调用wait3()来收割子进程:
/* Check if a background saving or AOF rewrite in progress terminated. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
if (pid == -1) {
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
} else if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
updateDictResizePolicy();
closeChildInfoPipe();
}
如果收割到的pid是server.aof_child_pid就进入backgroundRewriteDoneHandler():
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
...
/* Flush the differences accumulated by the parent to the
* rewritten AOF. */
latencyStartMonitor(latency);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
首先会打开子进程生成的新aof文件,并调用aofRewriteBufferWrite()把server.aof_rewrite_buf_blocks中剩余的数据追加到新aof文件。
/* Rename the temporary file. This will not unlink the target file if
* it exists, because we reference it with "oldfd". */
latencyStartMonitor(latency);
if (rename(tmpfile,server.aof_filename) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename",latency);
之后把新aof文件rename为server.aof_filename记录的文件名。
/* Asynchronously close the overwritten AOF. */
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
使用bio后台线程来close原来的aof文件。
cleanup:
aofClosePipes();
aofRewriteBufferReset();
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_rewrite_scheduled = 1;
最后是清理工作,包括关闭管道、重置aof-rewrite-buffer、复位server.aof_child_pid=-1等,自此aofrewrite完成。
本文介绍了redis的aofrewrite基础实现以及利用pipe的优化,云Redis4.0已经上线,欢迎使用。