MySQL filesort source code analysis


The code used here is from the MySQL 8.0.20 branch; other branches may differ slightly.
filesort has plenty of auxiliary code before its core; we skip the auxiliary parts and focus on the core code.
Here is a call stack:

SELECT_LEX_UNIT::ExecuteIteratorQuery(SELECT_LEX_UNIT * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_union.cc:1165)
SELECT_LEX_UNIT::execute(SELECT_LEX_UNIT * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_union.cc:1235)
Sql_cmd_dml::execute_inner(Sql_cmd_dml * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_select.cc:945)
Sql_cmd_dml::execute(Sql_cmd_dml * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_select.cc:725)
mysql_execute_command(THD * thd, bool first_level) (/root/mysql-8.0.20/sql/sql_parse.cc:4489)
mysql_parse(THD * thd, Parser_state * parser_state) (/root/mysql-8.0.20/sql/sql_parse.cc:5306)
dispatch_command(THD * thd, const COM_DATA * com_data, enum_server_command command) (/root/mysql-8.0.20/sql/sql_parse.cc:1776)
do_command(THD * thd) (/root/mysql-8.0.20/sql/sql_parse.cc:1274)
handle_connection(void * arg) (/root/mysql-8.0.20/sql/conn_handler/connection_handler_per_thread.cc:302)
pfs_spawn_thread(void * arg) (/root/mysql-8.0.20/storage/perfschema/pfs.cc:2854)
libpthread.so.0!start_thread (Unknown Source:0)
libc.so.6!clone (Unknown Source:0)

We start from line 1165 of the ExecuteIteratorQuery() method; this is the code that does the actual sorting, fetches the results, and sends them to the client.

bool SELECT_LEX_UNIT::ExecuteIteratorQuery(THD *thd) {
  ......
  // The following is line 1165
  if (m_root_iterator->Init()) {
    return true;
  }

  {
    PFSBatchMode pfs_batch_mode(m_root_iterator.get());
    auto join_cleanup = create_scope_guard([this, thd] {
      for (SELECT_LEX *sl = first_select(); sl; sl = sl->next_select()) {
        JOIN *join = sl->join;
        join->join_free();
        thd->inc_examined_row_count(join->examined_rows);
      }
      if (fake_select_lex != nullptr) {
        thd->inc_examined_row_count(fake_select_lex->join->examined_rows);
      }
    });

    for (;;) {
      int error = m_root_iterator->Read();
      DBUG_EXECUTE_IF("bug13822652_1", thd->killed = THD::KILL_QUERY;);

      if (error > 0 || thd->is_error())  // Fatal error
        return true;
      else if (error < 0)
        break;
      else if (thd->killed)  // Aborted by user
      {
        thd->send_kill_message();
        return true;
      }

      ++*send_records_ptr;
      if (query_result->send_data(thd, *fields)) {
        return true;
      }
      thd->get_stmt_da()->inc_current_row_for_condition();
    }

    // NOTE: join_cleanup must be done before we send EOF, so that we get the
    // row counts right.
  }

First, an overview of the call flow inside the ExecuteIteratorQuery() method.

As you can see, ExecuteIteratorQuery() calls SortingIterator::Init(), which in turn calls DoSort(); the entire core sorting process happens inside it. SortingIterator::Init() then chooses one of several TableRowIterator subclasses, depending on which sort path was taken, to actually fetch the sorted result. m_root_iterator->Read() proxies the reads to whichever subclass was chosen, and finally query_result->send_data() sends the rows to the client.
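
The Read() side of SortingIterator is a thin proxy. Conceptually it looks like the line below (a simplified sketch, not a verbatim quote of the 8.0.20 source): once Init()/DoSort() have produced the sorted result and picked a result iterator, Read() simply forwards to it.

// Simplified sketch: after Init() has chosen m_result_iterator, Read() just delegates.
int SortingIterator::Read() { return m_result_iterator->Read(); }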

Now let's follow the code and look at the concrete implementation. The key parts are annotated with comments.

First, SortingIterator::Init():
 

bool SortingIterator::Init() {
  ReleaseBuffers();

  // Both empty result and error count as errors. (TODO: Why? This is a legacy
  // choice that doesn't always seem right to me, although it should nearly
  // never happen in practice.)
  // This is where the sort actually happens.
  if (DoSort() != 0) return true;

  // Prepare the result iterator for actually reading the data. Read()
  // will proxy to it.
  TABLE *table = m_filesort->table;
  /*
  * This branch is taken when m_sort_result.io_cache is initialized (its buffer has been
  * allocated), which only happens when a merge pass occurred.
  */
  if (m_sort_result.io_cache && my_b_inited(m_sort_result.io_cache)) {
    // Test if ref-records was used
    if (m_fs_info.using_addon_fields()) {
      DBUG_PRINT("info", ("using SortFileIterator"));
      if (m_fs_info.addon_fields->using_packed_addons())
          // m_result_iterator is a unique_ptr-style wrapper around a RowIterator;
          // m_result_iterator_holder.xxx is the preallocated storage used for placement new.
        m_result_iterator.reset(
            new (&m_result_iterator_holder.sort_file_packed_addons)
                SortFileIterator<true>(thd(), table, m_sort_result.io_cache,
                                       &m_fs_info, m_examined_rows));
      else
        m_result_iterator.reset(
            new (&m_result_iterator_holder.sort_file)
                SortFileIterator<false>(thd(), table, m_sort_result.io_cache,
                                        &m_fs_info, m_examined_rows));
    } else {
      /*
        m_fs_info->addon_field is checked because if we use addon fields,
        it doesn't make sense to use cache - we don't read from the table
        and m_fs_info->io_cache is read sequentially
      */
      bool request_cache = !m_fs_info.using_addon_fields();
      m_result_iterator.reset(
          new (&m_result_iterator_holder.sort_file_indirect)
              SortFileIndirectIterator(
                  thd(), table, m_sort_result.io_cache, request_cache,
                  /*ignore_not_found_rows=*/false, m_examined_rows));
    }
    m_sort_result.io_cache =
        nullptr;  // The result iterator has taken ownership.
  } else {
  /* m_sort_result.has_result_in_memory() is true only after a save_index() call,
  * i.e. only when the entire sort was done in memory.
  */
    DBUG_ASSERT(m_sort_result.has_result_in_memory());
    if (m_fs_info.using_addon_fields()) {
      DBUG_PRINT("info", ("using SortBufferIterator"));
      DBUG_ASSERT(m_sort_result.sorted_result_in_fsbuf);
      if (m_fs_info.addon_fields->using_packed_addons())
        m_result_iterator.reset(
            new (&m_result_iterator_holder.sort_buffer_packed_addons)
                SortBufferIterator<true>(thd(), table, &m_fs_info,
                                         &m_sort_result, m_examined_rows));
      else
        m_result_iterator.reset(
            new (&m_result_iterator_holder.sort_buffer)
                SortBufferIterator<false>(thd(), table, &m_fs_info,
                                          &m_sort_result, m_examined_rows));
    } else {
      // Reached when the sorted result (row IDs, no addon fields) is entirely in memory;
      // rows are then fetched from the base table by row ID.
      DBUG_PRINT("info", ("using SortBufferIndirectIterator (sort)"));
      m_result_iterator.reset(
          new (&m_result_iterator_holder.sort_buffer_indirect)
              SortBufferIndirectIterator(thd(), table, &m_sort_result,
                                         /*ignore_not_found_rows=*/false,
                                         m_examined_rows));
    }
  }

  return m_result_iterator->Init();
}
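
A quick side note on the placement-new calls into m_result_iterator_holder above: the holder is pre-reserved storage inside the iterator object, so selecting a result iterator does not cost a separate heap allocation. Below is a minimal, standalone sketch of the same idiom; all names in it are invented for illustration and are not MySQL's.

// Standalone illustration of the placement-new-into-a-holder idiom used above.
// The class names and the spilled_to_disk flag are made up for this example.
#include <cstdio>
#include <memory>
#include <new>

struct RowIter {
  virtual ~RowIter() = default;
  virtual int Read() = 0;
};

struct FileIter : RowIter {
  int Read() override { std::puts("read next row from a sorted temp file"); return 0; }
};

struct BufferIter : RowIter {
  int Read() override { std::puts("read next row from the in-memory sort buffer"); return 0; }
};

// Deleter that only runs the destructor: the storage itself is not heap-allocated.
struct DestroyOnly {
  void operator()(RowIter *p) const { p->~RowIter(); }
};

int main() {
  // Pre-reserved storage ("holder") large enough for either concrete iterator.
  union Holder {
    Holder() {}
    ~Holder() {}
    FileIter file;
    BufferIter buffer;
  } holder;

  std::unique_ptr<RowIter, DestroyOnly> result_iterator;
  const bool spilled_to_disk = true;  // pretend the sort had to merge on disk

  if (spilled_to_disk)
    result_iterator.reset(new (&holder.file) FileIter());
  else
    result_iterator.reset(new (&holder.buffer) BufferIter());

  return result_iterator->Read();
}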

Following the call order, the next stop is DoSort().

Let's look at the implementation of DoSort(); again, the key parts are annotated.

/*
  Do the actual sort, by calling filesort. The result will be left in one of
  several places depending on what sort strategy we chose; it is up to Init() to
  figure out what happened and create the appropriate iterator to read from it.

  RETURN VALUES
    0		ok
    -1		Some fatal error
    1		No records
*/

int SortingIterator::DoSort() {
  DBUG_ASSERT(m_sort_result.io_cache == nullptr);
  // Allocate and initialize the IO_CACHE that stores the sort result.
  m_sort_result.io_cache =
      (IO_CACHE *)my_malloc(key_memory_TABLE_sort_io_cache, sizeof(IO_CACHE),
                            MYF(MY_WME | MY_ZEROFILL));

  if (m_qep_tab != nullptr) {
    JOIN *join = m_qep_tab->join();
    if (join != nullptr && join->unit->root_iterator() == nullptr) {
      /* Fill schema tables with data before filesort if it's necessary */
      if ((join->select_lex->active_options() & OPTION_SCHEMA_TABLE) &&
          get_schema_tables_result(join, PROCESSED_BY_CREATE_SORT_INDEX))
        return -1;
    }
  }

  ha_rows found_rows;
  // This is where the filesort actually happens.
  bool error = filesort(thd(), m_filesort, m_source_iterator.get(), &m_fs_info,
                        &m_sort_result, &found_rows);
  if (m_qep_tab != nullptr) {
    m_qep_tab->set_records(found_rows);  // For SQL_CALC_FOUND_ROWS
  }
  m_filesort->table->set_keyread(false);  // Restore if we used indexes
  return error;
}

The filesort() implementation, with the key parts annotated:
 

/**
  Sort a table.
  Creates a set of pointers that can be used to read the rows
  in sorted order. This should be done with the functions
  in records.cc.

  The result set is stored in fs_info->io_cache or
  fs_info->sorted_result, or left in the main filesort buffer.

  @param      thd            Current thread
  @param      filesort       How to sort the table
  @param      source_iterator Where to read the rows to be sorted from.
  @param      fs_info        Owns the buffers for sort_result.
  @param      sort_result    Where to store the sort result.
  @param[out] found_rows     Store the number of found rows here.
                             This is the number of found rows after
                             applying WHERE condition.

  @note
    If we sort by position (like if sort_positions is 1) filesort() will
    call table->prepare_for_position().

  @returns   False if success, true if error
*/

bool filesort(THD *thd, Filesort *filesort, RowIterator *source_iterator,
              Filesort_info *fs_info, Sort_result *sort_result,
              ha_rows *found_rows) {
  int error;
  ulong memory_available = thd->variables.sortbuff_size;
  ha_rows num_rows_found = HA_POS_ERROR;
  ha_rows num_rows_estimate = HA_POS_ERROR;
  IO_CACHE tempfile;    // Temporary file for storing intermediate results.
  IO_CACHE chunk_file;  // For saving Merge_chunk structs.
  IO_CACHE *outfile;    // Contains the final, sorted result.
  Sort_param *param = &filesort->m_sort_param;
  TABLE *const table = filesort->table;
  ha_rows max_rows = filesort->limit;
  uint s_length = 0;

  DBUG_TRACE;
  // 'pushed_join' feature need to read the partial joined results directly
  // from the NDB API. First storing it into a temporary table, means that
  // any joined child results are effectively wasted, and we will have to
  // re-read them as non-pushed later.
  DBUG_ASSERT(!table->file->member_of_pushed_join());

  if (!(s_length = filesort->sort_order_length()))
    return true; /* purecov: inspected */

  DBUG_ASSERT(!table->reginfo.join_tab);

  DEBUG_SYNC(thd, "filesort_start");

  DBUG_ASSERT(sort_result->sorted_result == nullptr);
  sort_result->sorted_result_in_fsbuf = false;

  outfile = sort_result->io_cache;
  my_b_clear(&tempfile);
  my_b_clear(&chunk_file);
  error = 1;

  // Make sure the source iterator is initialized before init_for_filesort(),
  // since table->file (and in particular, ref_length) may not be initialized
  // before that.
  DBUG_EXECUTE_IF("bug14365043_1", DBUG_SET("+d,ha_rnd_init_fail"););
  if (source_iterator->Init()) {
    return HA_POS_ERROR;
  }

  /*
    We need a nameless wrapper, since we may be inside the "steps" of
    "join_execution".
  */
  Opt_trace_context *const trace = &thd->opt_trace;
  Opt_trace_object trace_wrapper(trace);
  trace_wrapper.add_alnum("sorting_table", table->alias);

  trace_filesort_information(trace, filesort->sortorder, s_length);
  /*
  * sortlength(thd, filesort->sortorder, s_length) computes the sort key length from the
  * items after ORDER BY. Everything uses a fixed length except JSON, which uses a
  * variable-length key.
  */
  param->init_for_filesort(filesort, make_array(filesort->sortorder, s_length),
                           sortlength(thd, filesort->sortorder, s_length),
                           table, max_rows, filesort->m_remove_duplicates);

  fs_info->addon_fields = param->addon_fields;
  // Increment the Sort_scan status variable.
  thd->inc_status_sort_scan();

  if (table->file->inited) {
    if (table->s->tmp_table)
      table->file->info(HA_STATUS_VARIABLE);  // Get record count
      // Estimate an upper bound on the row count. Roughly: (number of clustered-index
      // leaf pages * page size / minimum record size) * a growth factor of 2.
    num_rows_estimate = table->file->estimate_rows_upper_bound();
  } else {
    // If number of rows is not known, use as much of sort buffer as possible.
    num_rows_estimate = HA_POS_ERROR;
  }

  Bounded_queue<uchar *, uchar *, Sort_param, Mem_compare_queue_key> pq(
      param->max_record_length(),
      (Malloc_allocator<uchar *>(key_memory_Filesort_info_record_pointers)));

  // We would have liked to do this ahead-of-time so that we can show it
  // in EXPLAIN. However, we don't know the table->file->ref_length before
  // sorting time, which makes it hard to make the decision if we're row IDs.
  // (If we sort rows, we would know, but it's not much use knowing it
  // ahead-of-time _sometimes_.)
  //
  // However, do note this cannot change the addon fields status,
  // so that we at least know that when checking whether we can skip
  // in-between temporary tables (StreamingIterator).
  /*
  * The if-branch sets up a priority queue (PQ): when the query has a LIMIT and either all
  * rows fit in the sort buffer while LIMIT is below roughly one third of the estimated
  * rows, or the full set does not fit but LIMIT+1 rows do, the sort is done with a PQ.
  */
  if (check_if_pq_applicable(trace, param, fs_info, num_rows_estimate,
                             memory_available)) {
    DBUG_PRINT("info", ("filesort PQ is applicable"));
    /*
      For PQ queries (with limit) we know exactly how many pointers/records
      we have in the buffer, so to simplify things, we initialize
      all pointers here. (We cannot pack fields anyways, so there is no
      point in doing incremental allocation).
     */
    if (fs_info->preallocate_records(param->max_rows_per_buffer)) {
      my_error(ER_OUT_OF_SORTMEMORY, ME_FATALERROR);
      LogErr(ERROR_LEVEL, ER_SERVER_OUT_OF_SORTMEMORY);
      goto err;
    }
    if (pq.init(param->max_rows, param, fs_info->get_sort_keys())) {
      /*
       If we fail to init pq, we have to give up:
       out of memory means my_malloc() will call my_error().
      */
      DBUG_PRINT("info", ("failed to allocate PQ"));
      fs_info->free_sort_buffer();
      DBUG_ASSERT(thd->is_error());
      goto err;
    }
    filesort->using_pq = true;
    param->using_pq = true;
    param->m_addon_fields_status = Addon_fields_status::using_priority_queue;
  } else {
    DBUG_PRINT("info", ("filesort PQ is not applicable"));
    filesort->using_pq = false;
    param->using_pq = false;

    /*
      When sorting using priority queue, we cannot use packed addons.
      Without PQ, we can try.
    */
    /*
    * If the addons can be packed, a 4-byte length field is added. Packing is skipped
    * unless the potential savings (the packable length) are at least 14 bytes.
    */
    param->try_to_pack_addons();

    /*
      NOTE: param->max_rows_per_buffer is merely informative (for optimizer
      trace) in this case, not actually used.
    */
    // If the estimated row count is below MERGEBUFF2 (15), bump it up to 15.
    if (num_rows_estimate < MERGEBUFF2) num_rows_estimate = MERGEBUFF2;
    // keys = maximum available memory (sort_buffer_size) / (max record length + size of a record pointer)
    ha_rows keys =
        memory_available / (param->max_record_length() + sizeof(char *));
    // Work out how many records the buffer can hold.
    param->max_rows_per_buffer =
        min(num_rows_estimate > 0 ? num_rows_estimate : 1, keys);

    fs_info->set_max_size(memory_available, param->max_record_length());
  }

  param->sort_form = table;
  size_t longest_key, longest_addons;
  longest_addons = 0;

  // New scope, because subquery execution must be traced within an array.
  {
    Opt_trace_array ota(trace, "filesort_execution");
    /*
    * Read all rows to be sorted. Roughly: a 32KB buffer is allocated first to hold the
    * rows read so far. Starting at 32KB matters because malloc has a ~128KB threshold:
    * above it, memory comes from the file-mapping (mmap) region of the process address
    * space; below it, from the process heap, which is faster to allocate.
    * As rows keep filling the buffer, the next buffer is allocated at 1.5 times the size
    * of the previous one, and all allocated buffers are tracked together as one chunk.
    * Once the total buffer size approaches the sort memory limit (sort_buffer_size), the
    * records in all of the chunk's buffers are sorted (ultimately via the C++ standard
    * library, comparing raw bytes), the sorted records are written out to tempfile, and a
    * Merge_chunk object recording where this chunk's records start in tempfile and how
    * many there are is written to chunk_file.
    * This repeats until all rows have been read.
    * With a priority queue the sort happens directly in memory, which is simple enough
    * to skip here.
    */
    num_rows_found =
        read_all_rows(thd, param, fs_info, &chunk_file, &tempfile,
                      param->using_pq ? &pq : nullptr, source_iterator,
                      found_rows, &longest_key, &longest_addons);
    if (num_rows_found == HA_POS_ERROR) goto err;
  }

  size_t num_chunks, num_initial_chunks;
  if (my_b_inited(&chunk_file)) {
    // Work out how many chunks were written in total.
    num_chunks =
        static_cast<size_t>(my_b_tell(&chunk_file)) / sizeof(Merge_chunk);
  } else {
    num_chunks = 0;
  }

  num_initial_chunks = num_chunks;

  if (num_chunks == 0)  // The whole set is in memory
  {
    // If there are no chunks, all records fit in memory for storage and sorting;
    // save_index() sorts them in memory and leaves the result in sort_result.
    ha_rows rows_in_chunk =
        param->using_pq ? pq.num_elements() : num_rows_found;
    if (save_index(param, rows_in_chunk, fs_info, sort_result)) goto err;
  } else {
    // If deduplicating, we'll need to remember the previous key somehow.
    if (filesort->m_remove_duplicates) {
      param->m_last_key_seen =
          static_cast<uchar *>(thd->mem_root->Alloc(longest_key));
    }

    // We will need an extra buffer in SortFileIndirectIterator
    if (fs_info->addon_fields != nullptr &&
        !(fs_info->addon_fields->allocate_addon_buf(longest_addons)))
      goto err; /* purecov: inspected */

    fs_info->read_chunk_descriptors(&chunk_file, num_chunks);
    if (fs_info->merge_chunks.is_null()) goto err; /* purecov: inspected */

    close_cached_file(&chunk_file);

    /* Open cached file if it isn't open */
    if (!my_b_inited(outfile) &&
        open_cached_file(outfile, mysql_tmpdir, TEMP_PREFIX, READ_RECORD_BUFFER,
                         MYF(MY_WME)))
      goto err;
    if (reinit_io_cache(outfile, WRITE_CACHE, 0L, false, false)) goto err;

    param->max_rows_per_buffer = static_cast<uint>(
        fs_info->max_size_in_bytes() / param->max_record_length());
    /* fs_info->get_contiguous_buffer() allocates one contiguous sort buffer of the
    * maximum size in a single call. If sort_buffer_size is set very large, malloc may
    * fall back to mmap and the allocation gets slower.
    */
    Bounds_checked_array<uchar> merge_buf = fs_info->get_contiguous_buffer();
    if (merge_buf.array() == nullptr) {
      my_error(ER_OUT_OF_SORTMEMORY, ME_FATALERROR);
      LogErr(ERROR_LEVEL, ER_SERVER_OUT_OF_SORTMEMORY);
      goto err;
    }
    /* This is the heart of the multi-way merge. Roughly: all chunks, in the order they
    * were written, are taken in groups of 7 (MERGEBUFF) and read from tempfile. A slice
    * of each of the 7 chunks in a group is read into the sort buffer and a 7-way merge
    * is run with a priority queue; the merged output goes to t_file2, so the 7 chunks
    * become one large sorted chunk. The same is then done for the remaining groups.
    * After the first round, tempfile and t_file2 are swapped: input is read from t_file2
    * and the merged output goes to tempfile; further rounds proceed the same way,
    * swapping the input and output files after every round, until no more than 15
    * (MERGEBUFF2) sorted chunks remain.
    * merge_many_buff() calls merge_buffers(), which increments the Sort_merge_passes
    * status variable once for every 7-way merge in every round.
    */
    if (merge_many_buff(thd, param, merge_buf, fs_info->merge_chunks,
                        &num_chunks, &tempfile))
      goto err;
    /* IO_CACHE deserves a closer look: it is backed by a temporary file that the result
    * set is eventually written to, but it also keeps an in-memory buffer that batches
    * writes to speed them up. Before going any further we therefore have to flush that
    * buffer, and switch the cache type from WRITE_CACHE to READ_CACHE to prepare for the
    * reads that follow.
    */
    if (flush_io_cache(&tempfile) ||
        reinit_io_cache(&tempfile, READ_CACHE, 0L, false, false))
      goto err;
    /*
    * merge_many_buff() above leaves at most 15 sorted chunks. Here they go through one
    * final merge, the sort keys are stripped, and the addon field data is written out
    * to outfile.
    */
    if (merge_index(
            thd, param, merge_buf,
            Merge_chunk_array(fs_info->merge_chunks.begin(), num_chunks),
            &tempfile, outfile))
      goto err;

    sort_result->found_records = num_rows_found;
  }

  if (trace->is_started()) {
    char buffer[100];
    String sort_mode(buffer, sizeof(buffer), &my_charset_bin);
    sort_mode.length(0);
    sort_mode.append("<");
    if (param->using_varlen_keys())
      sort_mode.append("varlen_sort_key");
    else
      sort_mode.append("fixed_sort_key");
    sort_mode.append(", ");
    sort_mode.append(param->using_packed_addons()
                         ? "packed_additional_fields"
                         : param->using_addon_fields() ? "additional_fields"
                                                       : "rowid");
    sort_mode.append(">");

    const char *algo_text[] = {"none", "std::sort", "std::stable_sort"};

    Opt_trace_object filesort_summary(trace, "filesort_summary");
    filesort_summary.add("memory_available", memory_available)
        .add("key_size", param->max_compare_length())
        .add("row_size", param->max_record_length())
        .add("max_rows_per_buffer", param->max_rows_per_buffer)
        .add("num_rows_estimate", num_rows_estimate)
        .add("num_rows_found", num_rows_found)
        .add("num_initial_chunks_spilled_to_disk", num_initial_chunks)
        .add("peak_memory_used", fs_info->peak_memory_used())
        .add_alnum("sort_algorithm", algo_text[param->m_sort_algorithm]);
    if (!param->using_packed_addons())
      filesort_summary.add_alnum(
          "unpacked_addon_fields",
          addon_fields_text(param->m_addon_fields_status));
    filesort_summary.add_alnum("sort_mode", sort_mode.c_ptr());
  }

  if (num_rows_found > param->max_rows) {
    // If read_all_rows() produced more results than the query LIMIT.
    num_rows_found = param->max_rows;
  }
  error = 0;

err:
  if (!filesort->keep_buffers) {
    if (!sort_result->sorted_result_in_fsbuf) fs_info->free_sort_buffer();
    my_free(fs_info->merge_chunks.array());
    fs_info->merge_chunks = Merge_chunk_array(nullptr, 0);
  }
  close_cached_file(&tempfile);
  close_cached_file(&chunk_file);
  if (my_b_inited(outfile)) {
    if (flush_io_cache(outfile)) error = 1;
    {
      my_off_t save_pos = outfile->pos_in_file;
      /* For following reads */
      if (reinit_io_cache(outfile, READ_CACHE, 0L, false, false)) error = 1;
      outfile->end_of_file = save_pos;
    }
  }
  if (error) {
    DBUG_ASSERT(thd->is_error() || thd->killed);
  } else
    /*
    * The Sort_rows status variable is accumulated here. Note that this value is smaller
    * than the number of records that actually took part in the sort, because it has just
    * been capped at param->max_rows (the LIMIT).
    */
    thd->inc_status_sort_rows(num_rows_found);

  return error;
} /* filesort */
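
As a concrete illustration of the keys / max_rows_per_buffer arithmetic above, here is a tiny standalone example with made-up numbers (they are not taken from any real table):

// Illustrative only: the "how many records fit in the sort buffer" arithmetic from
// filesort() above, with invented numbers.
#include <algorithm>
#include <cstdio>

int main() {
  const unsigned long memory_available = 262144;   // sort_buffer_size = 256KB (example)
  const unsigned long max_record_length = 120;     // sort key + addon fields, example value
  const unsigned long num_rows_estimate = 100000;  // estimate_rows_upper_bound(), example

  // keys = available memory / (max record length + size of a record pointer)
  const unsigned long keys = memory_available / (max_record_length + sizeof(char *));
  const unsigned long max_rows_per_buffer = std::min(num_rows_estimate, keys);

  // With these numbers: keys = 262144 / 128 = 2048, so one buffer holds 2048 records.
  std::printf("keys=%lu max_rows_per_buffer=%lu\n", keys, max_rows_per_buffer);
  return 0;
}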

init_for_filesort():
 

void Sort_param::init_for_filesort(Filesort *file_sort,
                                   Bounds_checked_array<st_sort_field> sf_array,
                                   uint sortlen, TABLE *table, ha_rows maxrows,
                                   bool remove_duplicates) {
  m_fixed_sort_length = sortlen;
  m_force_stable_sort = file_sort->m_force_stable_sort;
  m_remove_duplicates = remove_duplicates;
  ref_length = table->file->ref_length;

  local_sortorder = sf_array;
  /* For the following reasons addon fields cannot be used; in every other case the
  * selected field values are placed directly after the sort key:
  * fulltext_searched,
  * keep_rowid,
  * row_not_packable,
  * row_contains_blob,
  * skip_heuristic,
  * using_priority_queue
  */
  decide_addon_fields(file_sort, table, file_sort->m_force_sort_positions);
  if (using_addon_fields()) {
    fixed_res_length = m_addon_length;
  } else {
    fixed_res_length = ref_length;
    /*
      The reference to the record is considered
      as an additional sorted field
    */
    /*
    * As the comment above says, when row IDs are used the row ID is counted as part of
    * the sort key. Why? Probably for sort stability: take two identical records A and B
    * under pagination. If one execution orders A before B and a later one orders B before
    * A, and the page boundary falls between them, A may show up on both pages while B
    * never shows up at all. Note, though, that the actual implementation later on does
    * not include the row ID in the comparison.
    */
    AddWithSaturate(ref_length, &m_fixed_sort_length);
  }

  m_num_varlen_keys = count_varlen_keys();
  m_num_json_keys = count_json_keys();
  if (using_varlen_keys()) {
    // If there are variable-length sort keys, add 4 bytes holding the total key length.
    AddWithSaturate(size_of_varlength_field, &m_fixed_sort_length);
  }
  /*
    Add hash at the end of sort key to order cut values correctly.
    Needed for GROUPing, rather than for ORDERing.
  */
  // If there is a JSON sort item, append 8 bytes at the end to hold a hash value.
  if (using_json_keys()) {
    use_hash = true;
    AddWithSaturate(sizeof(ulonglong), &m_fixed_sort_length);
  }
  // m_fixed_rec_length is the length of each record (sort key plus addon data).
  m_fixed_rec_length = AddWithSaturate(m_fixed_sort_length, m_addon_length);
  max_rows = maxrows;
}
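
A made-up worked example of the bookkeeping above: say sortlength() returns 16 bytes for the ORDER BY keys. With addon fields totalling m_addon_length = 40 bytes, m_fixed_rec_length = 16 + 40 = 56 bytes per sort record. Without addon fields and with a 6-byte row ID (ref_length = 6), the row ID is counted into the key, so m_fixed_sort_length = 16 + 6 = 22, fixed_res_length = 6, and m_fixed_rec_length = 22. These numbers are purely illustrative.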

Within init_for_filesort(), the call most readers will care about is decide_addon_fields(file_sort, table, file_sort->m_force_sort_positions); it decides whether the sort key is followed by addon fields or by a row ID:

void Sort_param::decide_addon_fields(Filesort *file_sort, TABLE *table,
                                     bool force_sort_positions) {
  if (m_addon_fields_status != Addon_fields_status::unknown_status) {
    // Already decided.
    return;
  }

  DBUG_EXECUTE_IF("filesort_force_sort_row_ids", {
    m_addon_fields_status = Addon_fields_status::keep_rowid;
    return;
  });

  // Generally, prefer using addon fields (ie., sorting rows instead of just
  // row IDs) if we can.
  //
  // NOTE: If the table is already in memory (e.g. the MEMORY engine; see the
  // HA_FAST_KEY_READ flag), it would normally be beneficial to sort row IDs
  // over rows to get smaller sort chunks. However, eliding the temporary table
  // entirely is even better than using row IDs, and only possible if we sort
  // rows. Furthermore, since we set up filesort at optimize time, we don't know
  // yet whether the source table would _remain_ in memory, or could be spilled
  // to disk using InnoDB. Thus, the only case that reasonably remains where
  // we'd want to use row IDs without being forced would be on user (ie.,
  // non-temporary) MEMORY tables, which doesn't seem reasonable to add
  // complexity for.
  // Full-text search cannot use addon fields.
  if (table->fulltext_searched) {
    // MATCH() (except in “boolean mode”) doesn't use the actual value, it just
    // goes and asks the handler directly for the current row. Thus, we need row
    // IDs, so that the row is positioned correctly.
    //
    // When sorting a join, table->fulltext_searched will be false, but items
    // (like Item_func_match) are materialized (by StreamingIterator or
    // MaterializeIterator) before the sort, so this is moot.
    m_addon_fields_status = Addon_fields_status::fulltext_searched;
  } else if (force_sort_positions) {
  /* If row IDs are forced, addon fields cannot be used either. When exactly are row IDs
  * forced? Good question; I have not fully worked that out yet.
  */
    m_addon_fields_status = Addon_fields_status::keep_rowid;
  } else {
    /*
      Get the descriptors of all fields whose values are appended
      to sorted fields and get its total length in m_addon_length.
    */
    // Judging from the code, addon fields can be used at this point.
    addon_fields = file_sort->get_addon_fields(
        table, &m_addon_fields_status, &m_addon_length, &m_packable_length);
  }
}
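
The outcome of this decision is visible in the optimizer trace: the filesort_summary.sort_mode string built in the filesort() code above combines the key kind with the payload kind. Two values you might see (illustrative, derived from the string-building code quoted earlier, not copied from a real trace):

"sort_mode": "<fixed_sort_key, packed_additional_fields>"   // addon fields are used, packed
"sort_mode": "<fixed_sort_key, rowid>"                      // row IDs; the table must be re-read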

Of course, besides the two reasons seen in the code, the enum below lists all the reasons addon fields (placing the selected field values directly after the sort key instead of a row ID) cannot be used. The third and later reasons will come up again further down.
enum class Addon_fields_status {
  unknown_status,
  using_addon_fields,

  // The remainder are reasons why we are _not_ using addon fields.
  fulltext_searched,
  keep_rowid,
  row_not_packable,
  row_contains_blob,
  skip_heuristic,
  using_priority_queue
};
check_if_pq_applicable() decides whether a priority queue can be used to sort in memory; this corresponds to the using_priority_queue reason above. It is simple enough that we skip it here.
Sort_param::try_to_pack_addons() is already commented in the code above, so we will not go into more detail.
read_all_rows() is the important one: it reads all rows to be sorted, and it is where the sort buffer grows step by step from an initial 32KB block up to a total of sort_buffer_size. The source contains a pseudocode description of its implementation, which I'll simply reproduce here:

 while (get_next_sortkey())
     {
       if (using priority queue)
         push sort key into queue
       else
       {
         try to put sort key into buffer;
         if (no free space in sort buffer)
         {
           do {
             allocate new, larger buffer;
             retry putting sort key into buffer;
           } until (record fits or no space for new buffer)
           if (no space for new buffer)
           {
             sort record pointers (all buffers);
             dump sorted sequence to 'tempfile';
             dump Merge_chunk describing sequence location into 'chunk_file';
           }
         }
         if (key was packed)
           tell sort buffer the actual number of bytes used;
       }
     }
     if (buffer has some elements && dumped at least once)
       sort-dump-dump as above;
     else
       don't sort, leave sort buffer to be sorted by caller.

After that, execution takes one of two branches. If all records to be sorted fit in the sort buffer, save_index() sorts them in memory; otherwise the merge-based file sort runs, with three important functions: merge_many_buff(), merge_index() and merge_buffers().
save_index() is simple enough to read on your own, so let's focus on the merge phase.
merge_many_buff() drives the merge rounds and calls merge_buffers() for each group of chunks (see the commented filesort() code above for the round-by-round logic).
merge_index() performs the final merge, strips the sort keys from the result, and writes it to the outfile result file.
The whole sort involves two important kinds of objects. The first is Filesort_info, which holds all the metadata for the sort and aggregates the Filesort_buffer class; Filesort_buffer allocates and manages the sort buffer memory. Keep in mind that the sort buffer starts out at 32KB and, as usage grows, is extended step by step towards the total sort_buffer_size; bool Filesort_buffer::allocate_block(size_t num_bytes) is responsible for allocating and growing it. A simplified sketch of that growth policy follows.
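
// Illustrative only: a simplified, standalone model of the growth policy described
// above (start at 32KB, grow each new block by 1.5x, stop at sort_buffer_size).
// The real logic lives in Filesort_buffer::allocate_block(); the names and the exact
// capping rule here are assumptions made for this example.
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  const size_t min_block = 32 * 1024;      // initial 32KB block
  const size_t sort_buffer_size = 262144;  // pretend sort_buffer_size = 256KB
  std::vector<size_t> blocks;

  size_t total = 0;
  size_t block = min_block;
  while (total + block <= sort_buffer_size) {  // stop once the budget would be exceeded
    blocks.push_back(block);
    total += block;
    block += block / 2;                        // next block is 1.5x the previous one
  }
  std::printf("allocated %zu blocks, %zu bytes in total\n", blocks.size(), total);
  return 0;
}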

The other important kind of object is IO_CACHE. It has two parts: an in-memory buffer and an actual backing file. On writes, data first goes into the buffer and is then flushed out to the file; on reads, data is filled into the buffer and then consumed record by record. Four such files are involved in the sort: tempfile stores the output of read_all_rows(); t_file2 is swapped back and forth with tempfile to carry out the successive merge rounds; chunk_file stores the chunk-group information used during the merge; and outfile stores the final sorted result set. All four are IO_CACHE structures. The next sketch shows the write-then-read-back pattern they follow.
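
// Sketch only (not standalone): the write-then-read-back pattern filesort() applies to
// tempfile/outfile, assuming the server's mysys environment (my_sys.h). The payload
// variables are hypothetical placeholders; error handling is reduced to early returns.
static bool roundtrip_through_io_cache(const uchar *payload, size_t payload_len,
                                       uchar *readback) {
  IO_CACHE cache;
  if (open_cached_file(&cache, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
    return true;  // could not set up the cache / backing temp file

  // Buffered write: bytes sit in the in-memory buffer and spill to the file only
  // when the buffer fills up.
  if (my_b_write(&cache, payload, payload_len)) return true;

  // Before reading anything back, flush the buffer and flip WRITE_CACHE -> READ_CACHE,
  // exactly as filesort() does with tempfile above.
  if (flush_io_cache(&cache) ||
      reinit_io_cache(&cache, READ_CACHE, 0L, false, false))
    return true;

  const bool err = my_b_read(&cache, readback, payload_len) != 0;
  close_cached_file(&cache);  // releases the buffer and removes the temporary file
  return err;
}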

The record formats stored in tempfile and t_file2 are described in sort_param.h; I'll paste the comment directly:

/**
  There are several record formats for sorting:
@verbatim
    |<key a><key b>...    | <rowid> |
    / m_fixed_sort_length / ref_len /
@endverbatim

  or with "addon fields"
@verbatim
    |<key a><key b>...    |<null bits>|<field a><field b>...|
    / m_fixed_sort_length /         addon_length            /
@endverbatim

  The packed format for "addon fields"
@verbatim
    |<key a><key b>...    |<length>|<null bits>|<field a><field b>...|
    / m_fixed_sort_length /         addon_length                     /
@endverbatim

  All the formats above have fixed-size keys, with appropriate padding.
  Fixed-size keys can be compared/sorted using memcmp().

  The packed (variable length) format for keys:
@verbatim
    |<keylen>|<varkey a><key b>...<hash>|<rowid>  or <addons>     |
    / 4 bytes/   keylen bytes           / ref_len or addon_length /
@endverbatim

  This format is currently only used if we are sorting JSON data.
  Variable-size keys must be compared piece-by-piece, using type information
  about each individual key part, @see cmp_varlen_keys.

  All the record formats consist of a (possibly composite) key,
  followed by a (possibly composite) payload.
  The key is used for sorting data. Once sorting is done, the payload is
  stored in some buffer, and read by some RowIterator.

  For fixed-size keys, with @<rowid@> payload, the @<rowid@> is also
  considered to be part of the key.

<dl>
<dt>@<key@>
          <dd>  Fields are fixed-size, specially encoded with
                Field::make_sort_key() so we can do byte-by-byte compare.
<dt>@<length@>
          <dd>  Contains the *actual* packed length (after packing) of
                everything after the sort keys.
                The size of the length field is 2 bytes,
                which should cover most use cases: addon data <= 65535 bytes.
                This is the same as max record size in MySQL.
<dt>@<null bits@>
          <dd>  One bit for each nullable field, indicating whether the field
                is null or not. May have size zero if no fields are nullable.
<dt>@<field xx@>
          <dd>  Are stored with field->pack(), and retrieved with
                field->unpack().
                Addon fields within a record are stored consecutively, with no
                "holes" or padding. They will have zero size for NULL values.
<dt>@<keylen@>
          <dd>  Contains the *actual* packed length of all the keys.
                We may have an arbitrary mix of fixed and variable-sized keys.
<dt>@<hash@>
          <dd>  Optional 8 byte hash, used for GROUPing of JSON values.
<dt>@<varkey@>
          <dd>  Used for JSON and variable-length string values, the format is:
</dl>
@verbatim
                |<null value>|<key length>|<sort key>        |
                / 1 byte     /   4 bytes  / key length bytes /
@endverbatim
<dl>
<dt>@<null value@>
          <dd>  0x00 for NULL. 0xff for NULL under DESC sort. 0x01 for NOT NULL.
<dt>@<key length@>
          <dd>  The length of the sort key, *including* the four bytes for the
                key length. Does not exist if the field is NULL.
</dl>
 */

At this point the sort itself is essentially finished and we move on to fetching the results. That code is the second half of SortingIterator::Init(); refer to the annotated SortingIterator::Init() listing above.

Finally, let's clear up a few points:
1. read_rnd_buffer_size plays no real part in the sort process.
The buffer memory is ultimately allocated in the init_io_cache_ext() method; the only place where read_rnd_buffer_size comes into play is this snippet:
  if (!cachesize && !(cachesize = my_default_record_cache_size))
    return 1; /* No cache requested */
Here my_default_record_cache_size corresponds to read_rnd_buffer_size. As you can see, my_default_record_cache_size is only used to set cachesize when no cachesize was requested, but every open_cached_file() call made during sorting passes an explicit cachesize:
sql/filesort.cc:
   544:         open_cached_file(outfile, mysql_tmpdir, TEMP_PREFIX, READ_RECORD_BUFFER,
  1065:       open_cached_file(chunk_file, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
  1070:       open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
sql/merge_many_buff.h:
  62:       open_cached_file(&t_file2, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
where:
#define READ_RECORD_BUFFER (uint)(IO_SIZE * 8) /* Pointer_buffer_size */
#define DISK_BUFFER_SIZE (uint)(IO_SIZE * 16)  /* Size of diskbuffer */
#define IO_SIZE 4096
So chunk_file, tempfile and t_file2 get a cachesize of 64KB, and outfile gets 32KB.

2. As for the sort key length: JSON always gets a variable-length key. For other variable-length types it depends on the collation: if the collation has Pad_attribute == NO_PAD, a variable-length key is used; if it has Pad_attribute == PAD_SPACE, then every field except JSON gets a fixed-length key. The length that takes part in the comparison is min(the field's maximum length, max_sort_length); see uint sortlength(THD *thd, st_sort_field *sortorder, uint s_length) for the details. So as far as sorting is concerned there is no difference between CHAR and VARCHAR, and a column declared far wider than its data wastes a great deal of sort buffer space (a small worked example follows the charset definition below). Incidentally, utf8mb4_general_ci has Pad_attribute == PAD_SPACE:
CHARSET_INFO my_charset_utf8mb4_general_ci = {
    45,
    0,
    0, /* number       */
    MY_CS_COMPILED | MY_CS_STRNXFRM | MY_CS_UNICODE |
        MY_CS_UNICODE_SUPPLEMENT, /* state  */
    MY_UTF8MB4,                   /* cs name      */
    MY_UTF8MB4_GENERAL_CI,        /* name       */
    "UTF-8 Unicode",              /* comment      */
    nullptr,                      /* tailoring    */
    nullptr,                      /* coll_param   */
    ctype_utf8mb4,                /* ctype        */
    to_lower_utf8mb4,             /* to_lower     */
    to_upper_utf8mb4,             /* to_upper     */
    to_upper_utf8mb4,             /* sort_order   */
    nullptr,                      /* uca          */
    nullptr,                      /* tab_to_uni   */
    nullptr,                      /* tab_from_uni */
    &my_unicase_default,          /* caseinfo     */
    nullptr,                      /* state_map    */
    nullptr,                      /* ident_map    */
    1,                            /* strxfrm_multiply */
    1,                            /* caseup_multiply  */
    1,                            /* casedn_multiply  */
    1,                            /* mbminlen     */
    4,                            /* mbmaxlen     */
    1,                            /* mbmaxlenlen  */
    0,                            /* min_sort_char */
    0xFFFF,                       /* max_sort_char */
    ' ',                          /* pad char      */
    false,                        /* escape_with_backslash_is_dangerous */
    1,                            /* levels_for_compare */
    &my_charset_utf8mb4_handler,
    &my_collation_utf8mb4_general_ci_handler,
    PAD_SPACE};
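
A made-up worked example of that min(field max length, max_sort_length) rule (it ignores collation-specific weight expansion, so treat the numbers as illustrative only):

// Illustrative only: how much space one over-wide key part can claim per sort record.
#include <algorithm>
#include <cstdio>

int main() {
  const unsigned mbmaxlen = 4;            // utf8mb4: up to 4 bytes per character
  const unsigned declared_chars = 255;    // hypothetical VARCHAR(255) column
  const unsigned max_sort_length = 1024;  // server default for max_sort_length

  const unsigned field_max_bytes = declared_chars * mbmaxlen;               // 1020
  const unsigned compare_len = std::min(field_max_bytes, max_sort_length);  // 1020
  std::printf("bytes reserved for this key part in every sort record: %u\n", compare_len);
  return 0;
}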

3. During read_all_rows() the sort buffer grows from small to large in 1.5x steps, but when a merge happens, a buffer of the full sort_buffer_size is allocated in one go.


One last gripe: CSDN is really awful. I spent ages writing this and one accidental Ctrl+Z wiped almost all of it out. Any sane editor would undo one step at a time; a single Ctrl+Z should not make most of the article disappear.