欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

fio是如何运行的?

程序员文章站 2023-03-09 15:39:17
本文主要介绍fio是如何运行的,并且以单线程、单job为例 fio的入口在fio.c中的main函数,下面列出了main函数,此处只出示了一些调用的关键函数 在main函数中主要调用了两个关键函数,parse_options,顾名思义,就是分析options,也就是fio的参数,而fio_backe ......

本文主要介绍fio是如何运行的,并且以单线程、单job为例

fio的入口在fio.c中的main函数,下面列出了main函数,此处只出示了一些调用的关键函数

1 int main(int argc, char *argv[], char *envp[])
2 {
3     parse_options(argc, argv);
4     fio_backend();
5 }

在main函数中主要调用了两个关键函数,parse_options,顾名思义,就是分析options,也就是fio的参数,而fio_backend()函数则是fio进程的入口

fio_backend()函数在backend.c文件中

1 int fio_backend(void)
2 {
3     ......
4     run_threads();
5     ......
6 }

在fio_backend()函数中,初始化一些数据结构之后,调用了run_threads()(backend.c)函数,该函数是fio用来创建譬如i/o, verify线程等。

 

 1 /*
 2  * main function for kicking off and reaping jobs, as needed.
 3  */
 4 static void run_threads(void)
 5 {
 6     ......
 7     todo = thread_number;
 8     ......
 9     while (todo) {
10         if (td->o.use_thread) {
11             ......
12             ret = pthread_create(&td->thread, null,thread_main, td);
13             ret = pthread_detach(td->thread);
14             ......
15     }
16     ......
17 }

在这个函数中,创建了thread_main线程(backend.c),这个线程功能是,生成i/o,发送并完成i/o,记录数据等。

 1 /*
 2  * entry point for the thread based jobs. the process based jobs end up
 3  * here as well, after a little setup.
 4  */
 5 static void *thread_main(void *data)
 6 {
 7     ........
 8      /*
 9       * may alter parameters that init_io_u() will use, so we need to
10       * do this first.
11       * 下面两个函数的主要功能是生成读写的参数,offset,len
12        */
13     if (init_iolog(td))
14         goto err;
15 
16     if (init_io_u(td))
17         goto err;
18     ......
19     while (keep_running(td)) {
20         do_io(td);
21         do_verify(td, verify_bytes);//如果需要verification的话
22     }
23 }

如果不考虑verify的话,下面主要看do_io(backend.c)函数。

 1 /*
 2  * main io worker function. it retrieves io_u's to process and queues
 3  * and reaps them, checking for rate and errors along the way.
 4  *
 5  * returns number of bytes written and trimmed.
 6  */
 7 static uint64_t do_io(struct thread_data *td)
 8 {
 9     ......
10     //下面是do_io的主循环,判断条件是,io_log里有生成的pos信息,而且已经iuuse的数据小于总数据,
11     while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
12               (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
13               td->o.time_based) {
14         ......
15         io_u = get_io_u(td);// io_u,是一个io unit,是根据参数生成的io unit
16         ......
17         ret = td_io_queue(td, io_u); //将io_u提交到队列中
18         switch (ret) {
19         case fio_q_completed: //处理错误,以及同步的操作
20         ......
21         case fio_q_queued://成功入队
22             bytes_issued += io_u->xfer_buflen;
23         case fio_q_busy: //队伍满了,重新入队
24         .......
25     /*
26          * see if we need to complete some commands. note that we
27          * can get busy even without io queued, if the system is
28          * resource starved.
29          */
30         full = queue_full(td) || (ret == fio_q_busy && td->cur_depth);
31         if (full || !td->o.iodepth_batch_complete) {
32         min_evts = min(td->o.iodepth_batch_complete,td->cur_depth);
33         /*
34          * if the queue is full, we must reap at least 1 event
35          */
36         if (full && !min_evts)
37             min_evts = 1;
38         do {
39             ret = io_u_queued_complete(td, min_evts, bytes_done);
40         } while (full && (td->cur_depth > td->o.iodepth_low));
41     }
42 }

上面do_io函数的关键入队函数td_io_queue(ioengines.c)

 1 int td_io_queue(struct thread_data *td, struct io_u *io_u)
 2 {
 3     ......
 4     ret = td->io_ops->queue(td, io_u);
 5     ......
 6     else if (ret == fio_q_queued) {
 7         int r;
 8         if (ddir_rw(io_u->ddir)) {
 9             td->io_u_queued++;
10             td->ts.total_io_u[io_u->ddir]++;
11         }
12          if (td->io_u_queued >= td->o.iodepth_batch) {
13              r = td_io_commit(td);
14              if (r < 0)
15                  return r;
16          }
17     }
18 }
19 int td_io_commit(struct thread_data *td)
20 {
21     ......
22     int ret;
23     if (td->io_ops->commit) {
24         ret = td->io_ops->commit(td);
25     }
26     ......
27     return 0;
28 }

对于td->iops,它是在各个engines中定义了,以libaio(/engines/libaio.c)为例,调用的函数就是相应engines里对应的函数

 1 static struct ioengine_ops ioengine = {
 2     .name            = "libaio",
 3     .version        = fio_ioops_version,
 4     .init            = fio_libaio_init,
 5     .prep            = fio_libaio_prep,
 6     .queue            = fio_libaio_queue,
 7     .commit            = fio_libaio_commit,
 8     .cancel            = fio_libaio_cancel,
 9     .getevents        = fio_libaio_getevents,
10     .event            = fio_libaio_event,
11     .cleanup        = fio_libaio_cleanup,
12     .open_file        = generic_open_file,
13     .close_file        = generic_close_file,
14     .get_file_size        = generic_get_file_size,
15     .options        = options,
16     .option_struct_size    = sizeof(struct libaio_options),
17 };

对于reap流程里的io_u_queued_complete(io_u.c)函数

 1 /*
 2  * called to complete min_events number of io for the async engines.
 3  */
 4 int io_u_queued_complete(struct thread_data *td, int min_evts,
 5              uint64_t *bytes)
 6 {
 7     ......
 8 
 9     ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete, tvp);
10     ......
11 }
12 
13 int td_io_getevents(struct thread_data *td, unsigned int min, unsigned int max,
14             struct timespec *t)
15 {
16     int r = 0;
17 
18     if (min > 0 && td->io_ops->commit) {
19         r = td->io_ops->commit(td);
20         if (r < 0)
21             goto out;
22     }
23     if (max > td->cur_depth)
24         max = td->cur_depth;
25     if (min > max)
26         max = min;
27 
28     r = 0;
29     if (max && td->io_ops->getevents)
30         r = td->io_ops->getevents(td, min, max, t);
31 out:
32     ......
33     return r;
34 }    
这里调用的getevents也是各个engines里定义的函数