欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

redis源代码分析23–VM(上)

程序员文章站 2022-04-08 15:34:40
...

VM是Redis2.0新增的一个功能。在没有VM之前,redis会把db中的所有数据放在内存中。随着redis的不断运行,所使用的内存会越来越大。但同时,client对某些数据的访问频度明显会比其他数据高。redis引入VM功能来试图解决这个问题。简言之,VM使得redis会把很少

VM是Redis2.0新增的一个功能。在没有VM之前,redis会把db中的所有数据放在内存中。随着redis的不断运行,所使用的内存会越来越大。但同时,client对某些数据的访问频度明显会比其他数据高。redis引入VM功能来试图解决这个问题。简言之,VM使得redis会把很少访问的value保存到磁盘中。但同时,所有value的key都放在内存中,这是为了让被换出的value的查找在启用VM前后性能差不多。

VM在redis中算是redis中最复杂的模块之一,我们分三节来介绍。这一节介绍redis的主要数据结构,下一节介绍非阻塞方式,最后一节介绍多线程方式。

我们先来看看redis中的通用对象结构redisObject :

// VM启用时, 对象所处位置
#define REDIS_VM_MEMORY 0       /* The object is on memory */
#define REDIS_VM_SWAPPED 1      /* The object is on disk */
#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */
/* The VM object structure */
struct redisObjectVM {
    off_t page;         /* the page at witch the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
} vm;
/* The actual Redis Object */
// 通用类型
// 对于key,需额外标志保存value的位置、类型等
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype; /* If this object is a key, and value is swapped out,
                          * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, this are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;

robj 中的type保存了对象的类型,如string、list、set等。storage保存了该key对象对应的value所处的位置:内存、磁盘、正在被换出到磁盘,正在加载。vtype表示该key对象所对应的value的类型。page和usedpages保存了该key对象所对应的 value,atime是value的最后一次访问时间。因此,当robj所表示的key对象的storage类型为REDIS_VM_SWAPPED 时,就表示该key的value已不在内存中,需从VM中page的位置加载该value,vaue的类型为vtype,大小为usedpages。

创建对象的时候,根据是否启用VM机制,来分配合适大小的robj对象大小。

static robj *createObject(int type, void *ptr) {
   ---
   else {
        if (server.vm_enabled) {
            pthread_mutex_unlock(&server.obj_freelist_mutex);
            o = zmalloc(sizeof(*o));
        } else {
            o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
        }
    }
    ---
    if (server.vm_enabled) {
        /* Note that this code may run in the context of an I/O thread
         * and accessing to server.unixtime in theory is an error
         * (no locks). But in practice this is safe, and even if we read
         * garbage Redis will not fail, as it's just a statistical info */
        o->vm.atime = server.unixtime;
        o->storage = REDIS_VM_MEMORY;
    }
    return o;
}

VM的所有相关结构保存在redisServer 的如下几个字段中。

 /* Global server state structure */
struct redisServer {
    ---
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page; /* Next probably empty page */
    off_t vm_near_pages; /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap; /* Bitmap of free/used pages */
    time_t unixtime;    /* Unix time sampled every second. */
    /* Virtual memory I/O threads stuff */
    /* An I/O thread process an element taken from the io_jobs queue and
     * put the result of the operation in the io_done list. While the
     * job is being processed, it's put on io_processing queue. */
    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
    list *io_processing; /* List of VM I/O jobs being processed */
    list *io_processed; /* List of VM I/O jobs already processed */
    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
    pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
    pthread_attr_t io_threads_attr; /* attributes for threads creation */
    int io_active_threads; /* Number of running I/O threads */
    int vm_max_threads; /* Max number of I/O threads running at the same time */
    /* Our main thread is blocked on the event loop, locking for sockets ready
     * to be read or written, so when a threaded I/O operation is ready to be
     * processed by the main thread, the I/O thread will use a unix pipe to
     * awake the main thread. The followings are the two pipe FDs. */
    int io_ready_pipe_read;
    int io_ready_pipe_write;
    /* Virtual memory stats */
    unsigned long long vm_stats_used_pages;
    unsigned long long vm_stats_swapped_objects;
    unsigned long long vm_stats_swapouts;
    unsigned long long vm_stats_swapins;
   ---
};

vm_fp 和vm_fd指向磁盘上的vm文件,通过这两个指针来读写vm文件。vm_bitmap管理着vm文件中每一页的分配与释放情况(每一项为0表示该页空闲,为1表示已使用)。每一页的大小通过vm-page-size来配置,页数通过vm-pages来配置。值得一提的是,redis中的每一页最多只能放置一个对象,一个对象可以放在连续的多个页上。unixtime只是缓存时间值,这在计算value的最近使用频率时会用到。接下来的结构跟多线程方式换出/换进vlue有关。使用多线程方式时,换进/换出value被看成一个个的job,job的类型有如下几种:

/* VM threaded I/O request message */
#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;

类型为REDIS_IOJOB_LOAD的job用来加载某个value,类型为REDIS_IOJOB_DO_SWAP的job就用来换出某个 value,在换出value之前,需要创建类型为REDIS_IOJOB_PREPARE_SWAP的job来计算所需的交换页数。

无论是上述3种中的哪一种,新建的job都会使用queueIOJob放在io_newjobs队列中,而线程入口函数IOThreadEntryPoint 会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。

io_ready_clients保存了可以继续运行的client链表(之前因为等待value已阻塞),后面几个结构跟多线程的保护和全局的vm统计有关。

VM的初始化在vmInit中,主要做的工作就是上面介绍的几个结构的初始化。除此之外,最重要的工作就是设置管道的read事件的处理函数vmThreadedIOCompletedJob,该函数会在管道可读时运行,跟多线程的运行密切相关。

static void vmInit(void) {
    off_t totsize;
    int pipefds[2];
    size_t stacksize;
    struct flock fl;
    if (server.vm_max_threads != 0)
        zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
    redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
    /* Try to open the old swap file, otherwise create it */
    if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
        server.vm_fp = fopen(server.vm_swap_file,"w+b");
    }
    if (server.vm_fp == NULL) {
        redisLog(REDIS_WARNING,
            "Can't open the swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    }
    server.vm_fd = fileno(server.vm_fp);
    /* Lock the swap file for writing, this is useful in order to avoid
     * another instance to use the same swap file for a config error. */
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = fl.l_len = 0;
    if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) {
        redisLog(REDIS_WARNING,
            "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno));
        exit(1);
    }
    /* Initialize */
    server.vm_next_page = 0;
    server.vm_near_pages = 0;
    server.vm_stats_used_pages = 0;
    server.vm_stats_swapped_objects = 0;
    server.vm_stats_swapouts = 0;
    server.vm_stats_swapins = 0;
    totsize = server.vm_pages*server.vm_page_size;
    redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
    if (ftruncate(server.vm_fd,totsize) == -1) {
        redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    } else {
        redisLog(REDIS_NOTICE,"Swap file allocated with success");
    }
    server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
    redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
        (long long) (server.vm_pages+7)/8, server.vm_pages);
    memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
    /* Initialize threaded I/O (used by Virtual Memory) */
    server.io_newjobs = listCreate();
    server.io_processing = listCreate();
    server.io_processed = listCreate();
    server.io_ready_clients = listCreate();
    pthread_mutex_init(&server.io_mutex,NULL);
    pthread_mutex_init(&server.obj_freelist_mutex,NULL);
    pthread_mutex_init(&server.io_swapfile_mutex,NULL);
    server.io_active_threads = 0;
    if (pipe(pipefds) == -1) {
        redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting."
            ,strerror(errno));
        exit(1);
    }
    server.io_ready_pipe_read = pipefds[0];
    server.io_ready_pipe_write = pipefds[1];
    redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
    /* LZF requires a lot of stack */
    pthread_attr_init(&server.io_threads_attr);
    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
    /* Solaris may report a stacksize of 0, let's set it to 1 otherwise 115
     * multiplying it by 2 in the while loop later will not really help   */
    if (!stacksize) stacksize = 1;
    while (stacksize