MySQL filesort source code analysis
The code discussed here is from the mysql-8.0.20 branch; other branches may differ slightly.
filesort is preceded by a fair amount of auxiliary code; we will skip the auxiliary parts and focus on the core code.
Here is a call stack:
SELECT_LEX_UNIT::ExecuteIteratorQuery(SELECT_LEX_UNIT * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_union.cc:1165)
SELECT_LEX_UNIT::execute(SELECT_LEX_UNIT * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_union.cc:1235)
Sql_cmd_dml::execute_inner(Sql_cmd_dml * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_select.cc:945)
Sql_cmd_dml::execute(Sql_cmd_dml * const this, THD * thd) (/root/mysql-8.0.20/sql/sql_select.cc:725)
mysql_execute_command(THD * thd, bool first_level) (/root/mysql-8.0.20/sql/sql_parse.cc:4489)
mysql_parse(THD * thd, Parser_state * parser_state) (/root/mysql-8.0.20/sql/sql_parse.cc:5306)
dispatch_command(THD * thd, const COM_DATA * com_data, enum_server_command command) (/root/mysql-8.0.20/sql/sql_parse.cc:1776)
do_command(THD * thd) (/root/mysql-8.0.20/sql/sql_parse.cc:1274)
handle_connection(void * arg) (/root/mysql-8.0.20/sql/conn_handler/connection_handler_per_thread.cc:302)
pfs_spawn_thread(void * arg) (/root/mysql-8.0.20/storage/perfschema/pfs.cc:2854)
libpthread.so.0!start_thread (Unknown Source:0)
libc.so.6!clone (Unknown Source:0)
We start from line 1165 of ExecuteIteratorQuery(); this is the code that does the actual sorting, fetches the result, and sends it to the client.
bool SELECT_LEX_UNIT::ExecuteIteratorQuery(THD *thd) {
// ...
// The following code starts at line 1165
if (m_root_iterator->Init()) {
return true;
}
{
PFSBatchMode pfs_batch_mode(m_root_iterator.get());
auto join_cleanup = create_scope_guard([this, thd] {
for (SELECT_LEX *sl = first_select(); sl; sl = sl->next_select()) {
JOIN *join = sl->join;
join->join_free();
thd->inc_examined_row_count(join->examined_rows);
}
if (fake_select_lex != nullptr) {
thd->inc_examined_row_count(fake_select_lex->join->examined_rows);
}
});
for (;;) {
int error = m_root_iterator->Read();
DBUG_EXECUTE_IF("bug13822652_1", thd->killed = THD::KILL_QUERY;);
if (error > 0 || thd->is_error()) // Fatal error
return true;
else if (error < 0)
break;
else if (thd->killed) // Aborted by user
{
thd->send_kill_message();
return true;
}
++*send_records_ptr;
if (query_result->send_data(thd, *fields)) {
return true;
}
thd->get_stmt_da()->inc_current_row_for_condition();
}
// NOTE: join_cleanup must be done before we send EOF, so that we get the
// row counts right.
}
First, an overview of the call flow inside ExecuteIteratorQuery():
As you can see, ExecuteIteratorQuery() calls SortingIterator::Init(), which in turn calls DoSort(); the whole core sorting process happens there. SortingIterator::Init() then picks one of several TableRowIterator subclasses, depending on the sort path taken, to actually fetch the sorted result. m_root_iterator->Read() proxies to the chosen subclass, and finally query_result->send_data() sends each row to the client.
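To make that shape concrete, here is a minimal, self-contained sketch of the Init()/Read() contract the loop above relies on. The class names (IteratorSketch, BufferResultIterator, SortingIteratorSketch) are invented for illustration and are not MySQL types; the real SortingIterator does far more, but the idea is the same: sort everything in Init(), then have Read() proxy to the chosen result iterator.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
// Illustrative stand-in for the RowIterator interface.
struct IteratorSketch {
  virtual ~IteratorSketch() = default;
  virtual bool Init() = 0;  // true means error
  virtual int Read() = 0;   // 0 = row available, -1 = end of data, >0 = error
};
// Stand-in for the result iterator chosen by SortingIterator::Init().
struct BufferResultIterator : IteratorSketch {
  explicit BufferResultIterator(std::vector<std::string> rows)
      : rows_(std::move(rows)) {}
  bool Init() override { pos_ = 0; return false; }
  int Read() override {
    if (pos_ >= rows_.size()) return -1;
    std::cout << rows_[pos_++] << '\n';  // stand-in for send_data()
    return 0;
  }
  std::vector<std::string> rows_;
  std::size_t pos_ = 0;
};
// Stand-in for SortingIterator: sort in Init(), proxy Read() afterwards.
struct SortingIteratorSketch : IteratorSketch {
  explicit SortingIteratorSketch(std::vector<std::string> input)
      : input_(std::move(input)) {}
  bool Init() override {
    std::sort(input_.begin(), input_.end());  // plays the role of DoSort()
    result_ = std::make_unique<BufferResultIterator>(input_);
    return result_->Init();
  }
  int Read() override { return result_->Read(); }  // proxies to the result
  std::vector<std::string> input_;
  std::unique_ptr<BufferResultIterator> result_;
};
int main() {
  SortingIteratorSketch it({"b", "c", "a"});
  if (it.Init()) return 1;
  while (it.Read() == 0) {}  // same shape as the loop in ExecuteIteratorQuery
}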
Now let's follow the code and look at the concrete implementation; the key parts carry comments.
First, SortingIterator::Init():
bool SortingIterator::Init() {
ReleaseBuffers();
// Both empty result and error count as errors. (TODO: Why? This is a legacy
// choice that doesn't always seem right to me, although it should nearly
// never happen in practice.)
//This is where the sort actually happens
if (DoSort() != 0) return true;
// Prepare the result iterator for actually reading the data. Read()
// will proxy to it.
TABLE *table = m_filesort->table;
/*
 * This branch is taken when m_sort_result.io_cache exists and has been initialized
 * (its buffer allocated), which only happens when a merge pass was needed.
 */
if (m_sort_result.io_cache && my_b_inited(m_sort_result.io_cache)) {
// Test if ref-records was used
if (m_fs_info.using_addon_fields()) {
DBUG_PRINT("info", ("using SortFileIterator"));
if (m_fs_info.addon_fields->using_packed_addons())
//m_result_iterator is a std::unique_ptr wrapping a RowIterator object;
//m_result_iterator_holder.xxx provides the placement-new storage
m_result_iterator.reset(
new (&m_result_iterator_holder.sort_file_packed_addons)
SortFileIterator<true>(thd(), table, m_sort_result.io_cache,
&m_fs_info, m_examined_rows));
else
m_result_iterator.reset(
new (&m_result_iterator_holder.sort_file)
SortFileIterator<false>(thd(), table, m_sort_result.io_cache,
&m_fs_info, m_examined_rows));
} else {
/*
m_fs_info->addon_field is checked because if we use addon fields,
it doesn't make sense to use cache - we don't read from the table
and m_fs_info->io_cache is read sequentially
*/
bool request_cache = !m_fs_info.using_addon_fields();
m_result_iterator.reset(
new (&m_result_iterator_holder.sort_file_indirect)
SortFileIndirectIterator(
thd(), table, m_sort_result.io_cache, request_cache,
/*ignore_not_found_rows=*/false, m_examined_rows));
}
m_sort_result.io_cache =
nullptr; // The result iterator has taken ownership.
} else {
/*m_sort_result.has_result_in_memory() is true only when save_index() was called,
 * i.e. only when the whole sort completed in memory.
 */
DBUG_ASSERT(m_sort_result.has_result_in_memory());
if (m_fs_info.using_addon_fields()) {
DBUG_PRINT("info", ("using SortBufferIterator"));
DBUG_ASSERT(m_sort_result.sorted_result_in_fsbuf);
if (m_fs_info.addon_fields->using_packed_addons())
m_result_iterator.reset(
new (&m_result_iterator_holder.sort_buffer_packed_addons)
SortBufferIterator<true>(thd(), table, &m_fs_info,
&m_sort_result, m_examined_rows));
else
m_result_iterator.reset(
new (&m_result_iterator_holder.sort_buffer)
SortBufferIterator<false>(thd(), table, &m_fs_info,
&m_sort_result, m_examined_rows));
} else {
//Reached when the sort completed in memory without addon fields (row IDs were sorted), so rows are fetched back from the table by row ID.
DBUG_PRINT("info", ("using SortBufferIndirectIterator (sort)"));
m_result_iterator.reset(
new (&m_result_iterator_holder.sort_buffer_indirect)
SortBufferIndirectIterator(thd(), table, &m_sort_result,
/*ignore_not_found_rows=*/false,
m_examined_rows));
}
}
return m_result_iterator->Init();
}
Following the call order, the next method is DoSort().
Let's look at the DoSort() implementation; as before, the key parts carry comments:
/*
Do the actual sort, by calling filesort. The result will be left in one of
several places depending on what sort strategy we chose; it is up to Init() to
figure out what happened and create the appropriate iterator to read from it.
RETURN VALUES
0 ok
-1 Some fatal error
1 No records
*/
int SortingIterator::DoSort() {
DBUG_ASSERT(m_sort_result.io_cache == nullptr);
//Initialize the io_cache that will hold the sort result
m_sort_result.io_cache =
(IO_CACHE *)my_malloc(key_memory_TABLE_sort_io_cache, sizeof(IO_CACHE),
MYF(MY_WME | MY_ZEROFILL));
if (m_qep_tab != nullptr) {
JOIN *join = m_qep_tab->join();
if (join != nullptr && join->unit->root_iterator() == nullptr) {
/* Fill schema tables with data before filesort if it's necessary */
if ((join->select_lex->active_options() & OPTION_SCHEMA_TABLE) &&
get_schema_tables_result(join, PROCESSED_BY_CREATE_SORT_INDEX))
return -1;
}
}
ha_rows found_rows;
//This is where the filesort actually takes place
bool error = filesort(thd(), m_filesort, m_source_iterator.get(), &m_fs_info,
&m_sort_result, &found_rows);
if (m_qep_tab != nullptr) {
m_qep_tab->set_records(found_rows); // For SQL_CALC_FOUND_ROWS
}
m_filesort->table->set_keyread(false); // Restore if we used indexes
return error;
}
The filesort() implementation, with comments on the key parts:
/**
Sort a table.
Creates a set of pointers that can be used to read the rows
in sorted order. This should be done with the functions
in records.cc.
The result set is stored in fs_info->io_cache or
fs_info->sorted_result, or left in the main filesort buffer.
@param thd Current thread
@param filesort How to sort the table
@param source_iterator Where to read the rows to be sorted from.
@param fs_info Owns the buffers for sort_result.
@param sort_result Where to store the sort result.
@param[out] found_rows Store the number of found rows here.
This is the number of found rows after
applying WHERE condition.
@note
If we sort by position (like if sort_positions is 1) filesort() will
call table->prepare_for_position().
@returns False if success, true if error
*/
bool filesort(THD *thd, Filesort *filesort, RowIterator *source_iterator,
Filesort_info *fs_info, Sort_result *sort_result,
ha_rows *found_rows) {
int error;
ulong memory_available = thd->variables.sortbuff_size;
ha_rows num_rows_found = HA_POS_ERROR;
ha_rows num_rows_estimate = HA_POS_ERROR;
IO_CACHE tempfile; // Temporary file for storing intermediate results.
IO_CACHE chunk_file; // For saving Merge_chunk structs.
IO_CACHE *outfile; // Contains the final, sorted result.
Sort_param *param = &filesort->m_sort_param;
TABLE *const table = filesort->table;
ha_rows max_rows = filesort->limit;
uint s_length = 0;
DBUG_TRACE;
// 'pushed_join' feature need to read the partial joined results directly
// from the NDB API. First storing it into a temporary table, means that
// any joined child results are effectively wasted, and we will have to
// re-read them as non-pushed later.
DBUG_ASSERT(!table->file->member_of_pushed_join());
if (!(s_length = filesort->sort_order_length()))
return true; /* purecov: inspected */
DBUG_ASSERT(!table->reginfo.join_tab);
DEBUG_SYNC(thd, "filesort_start");
DBUG_ASSERT(sort_result->sorted_result == nullptr);
sort_result->sorted_result_in_fsbuf = false;
outfile = sort_result->io_cache;
my_b_clear(&tempfile);
my_b_clear(&chunk_file);
error = 1;
// Make sure the source iterator is initialized before init_for_filesort(),
// since table->file (and in particular, ref_length) may not be initialized
// before that.
DBUG_EXECUTE_IF("bug14365043_1", DBUG_SET("+d,ha_rnd_init_fail"););
if (source_iterator->Init()) {
return HA_POS_ERROR;
}
/*
We need a nameless wrapper, since we may be inside the "steps" of
"join_execution".
*/
Opt_trace_context *const trace = &thd->opt_trace;
Opt_trace_object trace_wrapper(trace);
trace_wrapper.add_alnum("sorting_table", table->alias);
trace_filesort_information(trace, filesort->sortorder, s_length);
/*
 *sortlength(thd, filesort->sortorder, s_length) computes the sort key length from the ORDER BY items. All types except JSON use a fixed length; JSON uses a variable length.
 */
param->init_for_filesort(filesort, make_array(filesort->sortorder, s_length),
sortlength(thd, filesort->sortorder, s_length),
table, max_rows, filesort->m_remove_duplicates);
fs_info->addon_fields = param->addon_fields;
//Increments the Sort_scan status variable.
thd->inc_status_sort_scan();
if (table->file->inited) {
if (table->s->tmp_table)
table->file->info(HA_STATUS_VARIABLE); // Get record count
//Estimate the table's maximum row count. Roughly: (number of clustered-index leaf pages * page size / minimum record size) * an expansion factor of 2.
num_rows_estimate = table->file->estimate_rows_upper_bound();
} else {
// If number of rows is not known, use as much of sort buffer as possible.
num_rows_estimate = HA_POS_ERROR;
}
Bounded_queue<uchar *, uchar *, Sort_param, Mem_compare_queue_key> pq(
param->max_record_length(),
(Malloc_allocator<uchar *>(key_memory_Filesort_info_record_pointers)));
// We would have liked to do this ahead-of-time so that we can show it
// in EXPLAIN. However, we don't know the table->file->ref_length before
// sorting time, which makes it hard to make the decision if we're row IDs.
// (If we sort rows, we would know, but it's not much use knowing it
// ahead-of-time _sometimes_.)
//
// However, do note this cannot change the addon fields status,
// so that we at least know that when checking whether we can skip
// in-between temporary tables (StreamingIterator).
/*
 * The if block decides whether a priority queue can be used: roughly, if "max rows / 3"
 * records can be sorted entirely within the sort_buffer_size memory, the sort is done with a priority queue.
 */
if (check_if_pq_applicable(trace, param, fs_info, num_rows_estimate,
memory_available)) {
DBUG_PRINT("info", ("filesort PQ is applicable"));
/*
For PQ queries (with limit) we know exactly how many pointers/records
we have in the buffer, so to simplify things, we initialize
all pointers here. (We cannot pack fields anyways, so there is no
point in doing incremental allocation).
*/
if (fs_info->preallocate_records(param->max_rows_per_buffer)) {
my_error(ER_OUT_OF_SORTMEMORY, ME_FATALERROR);
LogErr(ERROR_LEVEL, ER_SERVER_OUT_OF_SORTMEMORY);
goto err;
}
if (pq.init(param->max_rows, param, fs_info->get_sort_keys())) {
/*
If we fail to init pq, we have to give up:
out of memory means my_malloc() will call my_error().
*/
DBUG_PRINT("info", ("failed to allocate PQ"));
fs_info->free_sort_buffer();
DBUG_ASSERT(thd->is_error());
goto err;
}
filesort->using_pq = true;
param->using_pq = true;
param->m_addon_fields_status = Addon_fields_status::using_priority_queue;
} else {
DBUG_PRINT("info", ("filesort PQ is not applicable"));
filesort->using_pq = false;
param->using_pq = false;
/*
When sorting using priority queue, we cannot use packed addons.
Without PQ, we can try.
*/
/*
 * If the addons can be packed, a 4-byte length field is added. Packing is only attempted when the unpacked length is not less than 14 bytes.
 */
param->try_to_pack_addons();
/*
NOTE: param->max_rows_per_buffer is merely informative (for optimizer
trace) in this case, not actually used.
*/
//If the estimated row count is fewer than 15 (MERGEBUFF2), raise it to 15
if (num_rows_estimate < MERGEBUFF2) num_rows_estimate = MERGEBUFF2;
//keys = maximum available memory (defined by sort_buffer_size) / (maximum record length + record pointer size)
ha_rows keys =
memory_available / (param->max_record_length() + sizeof(char *));
//Compute how many records the buffer can hold
param->max_rows_per_buffer =
min(num_rows_estimate > 0 ? num_rows_estimate : 1, keys);
fs_info->set_max_size(memory_available, param->max_record_length());
}
param->sort_form = table;
size_t longest_key, longest_addons;
longest_addons = 0;
// New scope, because subquery execution must be traced within an array.
{
Opt_trace_array ota(trace, "filesort_execution");
/*
 * Read all the rows to be sorted. Roughly: a 32 KB buffer is first allocated to hold the rows read.
 * Starting at 32 KB matters because malloc has a 128 KB threshold: above it, memory comes from the
 * process's file-mapping region (mmap); below it, from the process heap, which is faster to allocate.
 * As rows keep coming in and fill the buffer, the next buffer, 1.5 times the size of the previous one,
 * is allocated; all allocated buffers belong to one structure called a chunk.
 * As more buffers are allocated, their total size approaches the sort memory limit (defined by
 * sort_buffer_size). At that point all records held in the chunk's buffers are sorted (the final sort
 * uses the C++ standard library, comparing raw memory bytes), the sorted records are written out to
 * tempfile, and a Merge_chunk object describing where this chunk's records start in tempfile and how
 * many records it contains is written to chunk_file.
 * This repeats until all rows have been read.
 * If a priority queue is used instead, everything is sorted directly in memory, which is simpler and
 * not covered here.
 */
num_rows_found =
read_all_rows(thd, param, fs_info, &chunk_file, &tempfile,
param->using_pq ? &pq : nullptr, source_iterator,
found_rows, &longest_key, &longest_addons);
if (num_rows_found == HA_POS_ERROR) goto err;
}
size_t num_chunks, num_initial_chunks;
if (my_b_inited(&chunk_file)) {
//Compute how many chunks were used in the end.
num_chunks =
static_cast<size_t>(my_b_tell(&chunk_file)) / sizeof(Merge_chunk);
} else {
num_chunks = 0;
}
num_initial_chunks = num_chunks;
if (num_chunks == 0) // The whole set is in memory
{
//If the number of chunks is 0, all records could be stored and sorted in memory; save_index() sorts them in memory and leaves the result in sort_result.
ha_rows rows_in_chunk =
param->using_pq ? pq.num_elements() : num_rows_found;
if (save_index(param, rows_in_chunk, fs_info, sort_result)) goto err;
} else {
// If deduplicating, we'll need to remember the previous key somehow.
if (filesort->m_remove_duplicates) {
param->m_last_key_seen =
static_cast<uchar *>(thd->mem_root->Alloc(longest_key));
}
// We will need an extra buffer in SortFileIndirectIterator
if (fs_info->addon_fields != nullptr &&
!(fs_info->addon_fields->allocate_addon_buf(longest_addons)))
goto err; /* purecov: inspected */
fs_info->read_chunk_descriptors(&chunk_file, num_chunks);
if (fs_info->merge_chunks.is_null()) goto err; /* purecov: inspected */
close_cached_file(&chunk_file);
/* Open cached file if it isn't open */
if (!my_b_inited(outfile) &&
open_cached_file(outfile, mysql_tmpdir, TEMP_PREFIX, READ_RECORD_BUFFER,
MYF(MY_WME)))
goto err;
if (reinit_io_cache(outfile, WRITE_CACHE, 0L, false, false)) goto err;
param->max_rows_per_buffer = static_cast<uint>(
fs_info->max_size_in_bytes() / param->max_record_length());
/*fs_info->get_contiguous_buffer() allocates a single sort buffer of the maximum size in one call.
 * If sort_buffer_size is set too large, malloc may fall back to mmap and allocation becomes slower.
 */
Bounds_checked_array<uchar> merge_buf = fs_info->get_contiguous_buffer();
if (merge_buf.array() == nullptr) {
my_error(ER_OUT_OF_SORTMEMORY, ME_FATALERROR);
LogErr(ERROR_LEVEL, ER_SERVER_OUT_OF_SORTMEMORY);
goto err;
}
/*This is the heart of the multi-way merge. Roughly: the chunks are taken in the order they were written,
 * in groups of 7, and read back from tempfile. An equal portion of each of the 7 chunks in a group is read
 * into the sort buffer and merged 7 ways using a priority-queue algorithm; the merged output is written to
 * to_file2, so those 7 chunks become one large sorted chunk. The same step is repeated for the remaining
 * chunk groups. After the first round, tempfile and to_file2 are swapped: the next round reads from to_file2
 * and writes its merge output to tempfile, and the rounds continue, swapping input and output each time,
 * until the final round leaves no more than 15 sorted chunk groups.
 * merge_many_buff() calls merge_buffers(), which accumulates the Sort_merge_passes status variable,
 * incrementing it by 1 for every 7-way merge in every round.
 */
if (merge_many_buff(thd, param, merge_buf, fs_info->merge_chunks,
&num_chunks, &tempfile))
goto err;
/*io_cache is another part worth a closer look. It wraps a tmp file, and the final result set is written
 * out to that file; but the io_cache also keeps a 16 KB/32 KB in-memory buffer that caches written data to
 * speed up writes. So before moving on, the buffer has to be flushed, and the io_cache type changed from
 * WRITE_CACHE to READ_CACHE to prepare for the reads that follow.
 */
if (flush_io_cache(&tempfile) ||
reinit_io_cache(&tempfile, READ_CACHE, 0L, false, false))
goto err;
/*
 * merge_many_buff() above leaves no more than 15 sorted chunks. Here they are merged one last time,
 * the sort keys are stripped, and the addon_field data is written out to outfile.
 */
if (merge_index(
thd, param, merge_buf,
Merge_chunk_array(fs_info->merge_chunks.begin(), num_chunks),
&tempfile, outfile))
goto err;
sort_result->found_records = num_rows_found;
}
if (trace->is_started()) {
char buffer[100];
String sort_mode(buffer, sizeof(buffer), &my_charset_bin);
sort_mode.length(0);
sort_mode.append("<");
if (param->using_varlen_keys())
sort_mode.append("varlen_sort_key");
else
sort_mode.append("fixed_sort_key");
sort_mode.append(", ");
sort_mode.append(param->using_packed_addons()
? "packed_additional_fields"
: param->using_addon_fields() ? "additional_fields"
: "rowid");
sort_mode.append(">");
const char *algo_text[] = {"none", "std::sort", "std::stable_sort"};
Opt_trace_object filesort_summary(trace, "filesort_summary");
filesort_summary.add("memory_available", memory_available)
.add("key_size", param->max_compare_length())
.add("row_size", param->max_record_length())
.add("max_rows_per_buffer", param->max_rows_per_buffer)
.add("num_rows_estimate", num_rows_estimate)
.add("num_rows_found", num_rows_found)
.add("num_initial_chunks_spilled_to_disk", num_initial_chunks)
.add("peak_memory_used", fs_info->peak_memory_used())
.add_alnum("sort_algorithm", algo_text[param->m_sort_algorithm]);
if (!param->using_packed_addons())
filesort_summary.add_alnum(
"unpacked_addon_fields",
addon_fields_text(param->m_addon_fields_status));
filesort_summary.add_alnum("sort_mode", sort_mode.c_ptr());
}
if (num_rows_found > param->max_rows) {
// If read_all_rows() produced more results than the query LIMIT.
num_rows_found = param->max_rows;
}
error = 0;
err:
if (!filesort->keep_buffers) {
if (!sort_result->sorted_result_in_fsbuf) fs_info->free_sort_buffer();
my_free(fs_info->merge_chunks.array());
fs_info->merge_chunks = Merge_chunk_array(nullptr, 0);
}
close_cached_file(&tempfile);
close_cached_file(&chunk_file);
if (my_b_inited(outfile)) {
if (flush_io_cache(outfile)) error = 1;
{
my_off_t save_pos = outfile->pos_in_file;
/* For following reads */
if (reinit_io_cache(outfile, READ_CACHE, 0L, false, false)) error = 1;
outfile->end_of_file = save_pos;
}
}
if (error) {
DBUG_ASSERT(thd->is_error() || thd->killed);
} else
/*
 * The Sort_rows status variable is incremented here. Note that this value is smaller than the number of records that actually took part in the sort.
 */
thd->inc_status_sort_rows(num_rows_found);
return error;
} /* filesort */
init_for_filesort():
void Sort_param::init_for_filesort(Filesort *file_sort,
Bounds_checked_array<st_sort_field> sf_array,
uint sortlen, TABLE *table, ha_rows maxrows,
bool remove_duplicates) {
m_fixed_sort_length = sortlen;
m_force_stable_sort = file_sort->m_force_stable_sort;
m_remove_duplicates = remove_duplicates;
ref_length = table->file->ref_length;
local_sortorder = sf_array;
/*addon_fields cannot be used for the reasons listed below; in every other case the field values are placed directly after the sort key.
 * fulltext_searched,
 * keep_rowid,
 * row_not_packable,
 * row_contains_blob,
 * skip_heuristic,
 * using_priority_queue
 */
decide_addon_fields(file_sort, table, file_sort->m_force_sort_positions);
if (using_addon_fields()) {
fixed_res_length = m_addon_length;
} else {
fixed_res_length = ref_length;
/*
The reference to the record is considered
as an additional sorted field
*/
/*
 * As the comment above says, when row IDs are used, the rowid has to become part of the sort key. Why?
 * Probably for sort stability: consider two identical records A and B in a paged query. If one sort returns
 * A before B and a later sort returns B before A, and the page boundary falls between them, A may show up
 * on both pages while B never appears. That said, the actual implementation later on does not include the
 * rowid in the comparison.
 */
AddWithSaturate(ref_length, &m_fixed_sort_length);
}
m_num_varlen_keys = count_varlen_keys();
m_num_json_keys = count_json_keys();
if (using_varlen_keys()) {
//If there are variable-length key fields, add the 4-byte varlength field (size_of_varlength_field) recording the length of those keys
AddWithSaturate(size_of_varlength_field, &m_fixed_sort_length);
}
/*
Add hash at the end of sort key to order cut values correctly.
Needed for GROUPing, rather than for ORDERing.
*/
//For JSON sort keys, append 8 bytes at the end to hold a hash value.
if (using_json_keys()) {
use_hash = true;
AddWithSaturate(sizeof(ulonglong), &m_fixed_sort_length);
}
//m_fixed_rec_length is the length of each record.
m_fixed_rec_length = AddWithSaturate(m_fixed_sort_length, m_addon_length);
max_rows = maxrows;
}
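To see how those AddWithSaturate() calls add up, here is a tiny worked example with assumed numbers (the 12/40/6-byte figures are made up; 6 bytes is merely a typical InnoDB hidden row-id length), mirroring the arithmetic above: the sort key length absorbs ref_length only when addon fields are not used, and the record length is the sort key length plus the addon length.
#include <cstdint>
#include <cstdio>
int main() {
  uint32_t sort_key_len = 12;    // from sortlength(): e.g. two small key parts
  uint32_t addon_len = 40;       // packed SELECT columns (addon fields)
  uint32_t ref_len = 6;          // row-id length when addon fields are not used
  bool use_addons = true;
  bool has_varlen_keys = false;  // would add the 4-byte varlength field
  bool has_json_keys = false;    // would add the 8-byte hash
  // m_fixed_sort_length: key bytes, plus ref_length only in the rowid case.
  uint32_t fixed_sort_len = sort_key_len + (use_addons ? 0 : ref_len) +
                            (has_varlen_keys ? 4 : 0) + (has_json_keys ? 8 : 0);
  // m_fixed_rec_length: sort key plus addon payload (zero in the rowid case).
  uint32_t rec_len = fixed_sort_len + (use_addons ? addon_len : 0);
  std::printf("sort key bytes: %u, record bytes: %u\n", fixed_sort_len, rec_len);
}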
Within init_for_filesort(), the call most readers will care about is decide_addon_fields(file_sort, table, file_sort->m_force_sort_positions), which decides whether the sort key is followed by addon fields or by a row ID:
void Sort_param::decide_addon_fields(Filesort *file_sort, TABLE *table,
bool force_sort_positions) {
if (m_addon_fields_status != Addon_fields_status::unknown_status) {
// Already decided.
return;
}
DBUG_EXECUTE_IF("filesort_force_sort_row_ids", {
m_addon_fields_status = Addon_fields_status::keep_rowid;
return;
});
// Generally, prefer using addon fields (ie., sorting rows instead of just
// row IDs) if we can.
//
// NOTE: If the table is already in memory (e.g. the MEMORY engine; see the
// HA_FAST_KEY_READ flag), it would normally be beneficial to sort row IDs
// over rows to get smaller sort chunks. However, eliding the temporary table
// entirely is even better than using row IDs, and only possible if we sort
// rows. Furthermore, since we set up filesort at optimize time, we don't know
// yet whether the source table would _remain_ in memory, or could be spilled
// to disk using InnoDB. Thus, the only case that reasonably remains where
// we'd want to use row IDs without being forced would be on user (ie.,
// non-temporary) MEMORY tables, which doesn't seem reasonable to add
// complexity for.
//Full-text search cannot use addon_fields
if (table->fulltext_searched) {
// MATCH() (except in “boolean mode”) doesn't use the actual value, it just
// goes and asks the handler directly for the current row. Thus, we need row
// IDs, so that the row is positioned correctly.
//
// When sorting a join, table->fulltext_searched will be false, but items
// (like Item_func_match) are materialized (by StreamingIterator or
// MaterializeIterator) before the sort, so this is moot.
m_addon_fields_status = Addon_fields_status::fulltext_searched;
} else if (force_sort_positions) {
/*If row IDs are forced, addon_fields cannot be used either. When are row IDs forced?
 * Good question; I have not fully worked that out yet.
 */
m_addon_fields_status = Addon_fields_status::keep_rowid;
} else {
/*
Get the descriptors of all fields whose values are appended
to sorted fields and get its total length in m_addon_length.
*/
//Judging from the code, addon_fields can be used in this case
addon_fields = file_sort->get_addon_fields(
table, &m_addon_fields_status, &m_addon_length, &m_packable_length);
}
}
Besides the two reasons shown in the code above, the enum below lists the remaining reasons addon_fields (placing the selected column values directly after the sort key instead of a row ID) cannot be used. The third and later reasons will come up again further below.
enum class Addon_fields_status {
unknown_status,
using_addon_fields,
// The remainder are reasons why we are _not_ using addon fields.
fulltext_searched,
keep_rowid,
row_not_packable,
row_contains_blob,
skip_heuristic,
using_priority_queue
};
check_if_pq_applicable() decides whether a priority queue can be used to sort in memory; this corresponds to the using_priority_queue reason above. It is fairly simple, so I will not walk through it.
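Even though the source of check_if_pq_applicable() is skipped here, the idea behind the priority-queue path is easy to show. The sketch below is the concept only, not the MySQL Bounded_queue class, and top_n_sorted() is an invented name: for ORDER BY ... LIMIT n, keep at most n rows in a max-heap so that only the current best n candidates ever occupy memory.
#include <algorithm>
#include <iostream>
#include <queue>
#include <string>
#include <vector>
std::vector<std::string> top_n_sorted(const std::vector<std::string> &rows,
                                      std::size_t limit) {
  std::priority_queue<std::string> heap;  // max-heap on the sort key
  for (const auto &row : rows) {
    if (heap.size() < limit) {
      heap.push(row);
    } else if (row < heap.top()) {        // better than the current worst
      heap.pop();
      heap.push(row);
    }
  }
  std::vector<std::string> result;
  while (!heap.empty()) { result.push_back(heap.top()); heap.pop(); }
  std::reverse(result.begin(), result.end());  // ascending order
  return result;
}
int main() {
  std::vector<std::string> rows = {"pear", "apple", "fig", "kiwi", "plum"};
  for (const auto &r : top_n_sorted(rows, 3)) std::cout << r << '\n';
}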
Sort_param::try_to_pack_addons() is already commented in the code above, so I will not go into detail.
read_all_rows() is more important: it reads all the rows to be sorted, and shows how the sort buffer grows from an initial 32 KB block, step by step, up to the full sort_buffer_size. The source contains a pseudocode description of this method, which I will simply reproduce here:
while (get_next_sortkey())
{
if (using priority queue)
push sort key into queue
else
{
try to put sort key into buffer;
if (no free space in sort buffer)
{
do {
allocate new, larger buffer;
retry putting sort key into buffer;
} until (record fits or no space for new buffer)
if (no space for new buffer)
{
sort record pointers (all buffers);
dump sorted sequence to 'tempfile';
dump Merge_chunk describing sequence location into 'chunk_file';
}
}
if (key was packed)
tell sort buffer the actual number of bytes used;
}
}
if (buffer has some elements && dumped at least once)
sort-dump-dump as above;
else
don't sort, leave sort buffer to be sorted by caller.
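To make the buffer-growth half of that pseudocode concrete, here is a small self-contained sketch; it is not MySQL code, and GrowingSortBuffer and its methods are invented names. It follows the policy described in the comments earlier: start with a 32 KB block, make each new block 1.5 times larger, and once no further block fits under the sort_buffer_size cap, sort what is buffered and spill it as one chunk.
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>
class GrowingSortBuffer {
 public:
  explicit GrowingSortBuffer(std::size_t max_total_bytes)
      : max_total_bytes_(max_total_bytes) {}
  // Try to buffer one record; false means "no room even for a new block",
  // i.e. the caller should sort and spill the current contents first.
  bool add(const std::string &record) {
    if (used_ + record.size() > allocated_) {
      std::size_t next_block = std::max(kMinBlock, last_block_ * 3 / 2);
      if (allocated_ + next_block > max_total_bytes_) return false;
      allocated_ += next_block;       // grow: each new block is 1.5x larger
      last_block_ = next_block;
    }
    records_.push_back(record);
    used_ += record.size();
    return true;
  }
  // Sort the buffered records and hand them out as one "chunk"
  // (filesort would write them to tempfile and describe them in chunk_file).
  void sort_and_spill(std::vector<std::vector<std::string>> *chunks) {
    std::sort(records_.begin(), records_.end());  // plain byte-wise compare
    chunks->push_back(records_);
    records_.clear();
    used_ = allocated_ = last_block_ = 0;  // simplification: drop the memory
  }
  bool empty() const { return records_.empty(); }
 private:
  static constexpr std::size_t kMinBlock = 32 * 1024;  // first block: 32 KB
  std::size_t max_total_bytes_;
  std::size_t allocated_ = 0, used_ = 0, last_block_ = 0;
  std::vector<std::string> records_;
};
int main() {
  GrowingSortBuffer buf(/*max_total_bytes=*/256 * 1024);  // "sort_buffer_size"
  std::vector<std::vector<std::string>> chunks;
  for (int i = 0; i < 100000; ++i) {
    std::string rec = "key-" + std::to_string(std::rand() % 100000);
    if (!buf.add(rec)) {            // buffer is full: spill a sorted chunk
      buf.sort_and_spill(&chunks);
      buf.add(rec);
    }
  }
  if (!buf.empty()) buf.sort_and_spill(&chunks);  // the final partial chunk
  std::cout << "chunks spilled: " << chunks.size() << '\n';
}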
After that, there are two branches. If all the records to be sorted fit in the sort buffer, save_index() sorts them in memory; otherwise we enter the merge-to-file sort, which involves three important methods: merge_many_buff(), merge_index() and merge_buffers().
save_index() is straightforward, so I will skip it; just read the code. Let's focus on the merge.
merge_many_buff() drives the merge passes: it repeatedly calls merge_buffers() to merge groups of up to 7 chunks, swapping the input and output files after every pass, until no more than 15 sorted chunks remain.
merge_index() then performs one final merge, strips the sort keys from the records, and writes the result to the outfile result file.
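The k-way merge idea behind merge_buffers() can be sketched in a few lines. This is the concept only (chunks live in memory instead of being read from tempfile, and merge_chunks() is an invented name): put the head record of every sorted chunk into a min-heap keyed on the sort key, pop the smallest, and refill the heap from the chunk that record came from; MySQL does this for up to 7 chunks per call.
#include <cstddef>
#include <iostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>
using Chunk = std::vector<std::string>;  // one sorted run (kept in memory here)
std::vector<std::string> merge_chunks(const std::vector<Chunk> &chunks) {
  // Heap entry: (current key, (chunk index, position inside that chunk)).
  using Head = std::pair<std::string, std::pair<std::size_t, std::size_t>>;
  auto greater = [](const Head &a, const Head &b) { return a.first > b.first; };
  std::priority_queue<Head, std::vector<Head>, decltype(greater)> heap(greater);
  for (std::size_t i = 0; i < chunks.size(); ++i)
    if (!chunks[i].empty()) heap.push({chunks[i][0], {i, 0}});
  std::vector<std::string> merged;
  while (!heap.empty()) {
    Head head = heap.top();
    heap.pop();
    merged.push_back(head.first);                 // emit the smallest key
    std::size_t ci = head.second.first, ri = head.second.second;
    if (ri + 1 < chunks[ci].size())               // refill from that chunk
      heap.push({chunks[ci][ri + 1], {ci, ri + 1}});
  }
  return merged;
}
int main() {
  std::vector<Chunk> chunks = {{"a", "d", "g"}, {"b", "e"}, {"c", "f", "h"}};
  for (const auto &key : merge_chunks(chunks)) std::cout << key << ' ';
  std::cout << '\n';  // a b c d e f g h
}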
The whole sort involves two important kinds of objects. The first is Filesort_info, which holds all the metadata for the sort and aggregates a Filesort_buffer; Filesort_buffer allocates and manages the sort buffer memory. Note that the sort buffer starts out at 32 KB and grows with use until it reaches the total sort_buffer_size. bool Filesort_buffer::allocate_block(size_t num_bytes) is responsible for allocating and growing the sort buffer.
The other important kind of object is IO_CACHE. It consists of two parts: an in-memory buffer and the actual file behind it. On writes and reads, data first goes into the buffer, and is then flushed out to the file or read back from the buffer record by record. Four files take part in the sort: tempfile stores the output of read_all_rows(); t_file2 is swapped back and forth with tempfile to carry out the successive merge rounds; chunk_file stores the chunk group information used during merging; and outfile stores the final sorted result set. All four are IO_CACHE structures.
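The write-then-reinit-then-read pattern filesort applies to these IO_CACHE objects (flush_io_cache() followed by reinit_io_cache(..., READ_CACHE, ...)) can be illustrated with a small stand-alone analogy. This is not the mysys implementation; TinyIoCache and its methods are invented names, and the 16 KB buffer size simply mirrors the figure mentioned above.
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <string>
class TinyIoCache {
 public:
  TinyIoCache() : file_(std::tmpfile()) {}
  ~TinyIoCache() { if (file_) std::fclose(file_); }
  // WRITE_CACHE mode: buffer writes, spill to the temp file when full.
  void write(const std::string &rec) {
    if (buf_.size() + rec.size() + 1 > kBufSize) flush();
    buf_ += rec + '\n';
  }
  void flush() {
    if (!buf_.empty()) {
      std::fwrite(buf_.data(), 1, buf_.size(), file_);
      buf_.clear();
    }
  }
  // Analogue of reinit_io_cache(..., READ_CACHE, 0, ...): flush pending data
  // and rewind so the same file can now be read sequentially.
  void reinit_for_read() {
    flush();
    std::rewind(file_);
  }
  bool read(std::string *rec) {
    char line[256];
    if (!std::fgets(line, sizeof(line), file_)) return false;
    line[std::strcspn(line, "\n")] = '\0';
    *rec = line;
    return true;
  }
 private:
  static constexpr std::size_t kBufSize = 16 * 1024;  // 16 KB write buffer
  std::FILE *file_;
  std::string buf_;
};
int main() {
  TinyIoCache cache;
  for (int i = 0; i < 5; ++i) cache.write("row-" + std::to_string(i));
  cache.reinit_for_read();          // switch from writing to reading
  std::string rec;
  while (cache.read(&rec)) std::printf("%s\n", rec.c_str());
}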
The record formats stored in tempfile and t_file2 are described in sort_param.h; I will paste that description directly:
/**
There are several record formats for sorting:
@verbatim
|<key a><key b>... | <rowid> |
/ m_fixed_sort_length / ref_len /
@endverbatim
or with "addon fields"
@verbatim
|<key a><key b>... |<null bits>|<field a><field b>...|
/ m_fixed_sort_length / addon_length /
@endverbatim
The packed format for "addon fields"
@verbatim
|<key a><key b>... |<length>|<null bits>|<field a><field b>...|
/ m_fixed_sort_length / addon_length /
@endverbatim
All the formats above have fixed-size keys, with appropriate padding.
Fixed-size keys can be compared/sorted using memcmp().
The packed (variable length) format for keys:
@verbatim
|<keylen>|<varkey a><key b>...<hash>|<rowid> or <addons> |
/ 4 bytes/ keylen bytes / ref_len or addon_length /
@endverbatim
This format is currently only used if we are sorting JSON data.
Variable-size keys must be compared piece-by-piece, using type information
about each individual key part, @see cmp_varlen_keys.
All the record formats consist of a (possibly composite) key,
followed by a (possibly composite) payload.
The key is used for sorting data. Once sorting is done, the payload is
stored in some buffer, and read by some RowIterator.
For fixed-size keys, with a <rowid> payload, the <rowid> is also
considered to be part of the key.
<dl>
<dt><sort_key>
<dd> Fields are fixed-size, specially encoded with
Field::make_sort_key() so we can do byte-by-byte compare.
<dt><length>
<dd> Contains the *actual* packed length (after packing) of
everything after the sort keys.
The size of the length field is 2 bytes,
which should cover most use cases: addon data <= 65535 bytes.
This is the same as max record size in MySQL.
<dt><null bits>
<dd> One bit for each nullable field, indicating whether the field
is null or not. May have size zero if no fields are nullable.
<dt><field xx>
<dd> Are stored with field->pack(), and retrieved with
field->unpack().
Addon fields within a record are stored consecutively, with no
"holes" or padding. They will have zero size for NULL values.
<dt><keylen>
<dd> Contains the *actual* packed length of all the keys.
We may have an arbitrary mix of fixed and variable-sized keys.
<dt><hash>
<dd> Optional 8 byte hash, used for GROUPing of JSON values.
<dt><varkey>
<dd> Used for JSON and variable-length string values, the format is:
</dl>
@verbatim
|<null value>|<key length>|<sort key> |
/ 1 byte / 4 bytes / key length bytes /
@endverbatim
<dl>
<dt><null value>
<dd> 0x00 for NULL. 0xff for NULL under DESC sort. 0x01 for NOT NULL.
<dt><key length>
<dd> The length of the sort key, *including* the four bytes for the
key length. Does not exist if the field is NULL.
</dl>
*/
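The note above that fixed-size keys can be sorted with memcmp() works because Field::make_sort_key() encodes values so that byte order equals sort order. The snippet below illustrates the idea only and is not the MySQL encoding routine: a signed integer becomes big-endian bytes with the sign bit flipped, and a string key part is space-padded to a fixed width, in the spirit of PAD_SPACE collations.
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
// Encode a signed 32-bit integer into 4 memcmp-ordered bytes.
void encode_int32(int32_t v, unsigned char *out) {
  uint32_t u = static_cast<uint32_t>(v) ^ 0x80000000u;  // flip the sign bit
  out[0] = u >> 24; out[1] = u >> 16; out[2] = u >> 8; out[3] = u;
}
// Pad a string key part with spaces to a fixed width.
std::string encode_padded(const std::string &s, std::size_t width) {
  std::string key = s.substr(0, width);
  key.resize(width, ' ');
  return key;
}
int main() {
  unsigned char a[4], b[4];
  encode_int32(-5, a);
  encode_int32(3, b);
  std::cout << "(-5 < 3) by memcmp: "
            << (std::memcmp(a, b, 4) < 0) << '\n';  // prints 1
  std::string k1 = encode_padded("abc", 8);
  std::string k2 = encode_padded("abcd", 8);
  std::cout << "(abc < abcd) by memcmp: "
            << (std::memcmp(k1.data(), k2.data(), 8) < 0) << '\n';  // prints 1
}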
At this point the sort itself is essentially finished and we move on to fetching the results; that code is the second half of SortingIterator::Init(), so just refer to the comments in the Init() listing above.
Finally, let's clear up a few points:
1. read_rnd_buffer_size plays no real part in the sort. Following the IO_CACHE buffer allocation call chain, the buffer memory is ultimately allocated in init_io_cache_ext(), and read_rnd_buffer_size only comes into play in this snippet:
if (!cachesize && !(cachesize = my_default_record_cache_size))
return 1; /* No cache requested */
Here my_default_record_cache_size corresponds to read_rnd_buffer_size. As you can see, my_default_record_cache_size is used to set cachesize only when no cachesize was supplied. But every call to open_cached_file during the sort passes an explicit cachesize:
sql/filesort.cc:
544: open_cached_file(outfile, mysql_tmpdir, TEMP_PREFIX, READ_RECORD_BUFFER,
1065: open_cached_file(chunk_file, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
1070: open_cached_file(tempfile, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
sql/merge_many_buff.h:
62: open_cached_file(&t_file2, mysql_tmpdir, TEMP_PREFIX, DISK_BUFFER_SIZE,
where:
#define READ_RECORD_BUFFER (uint)(IO_SIZE * 8) /* Pointer_buffer_size */
#define DISK_BUFFER_SIZE (uint)(IO_SIZE * 16) /* Size of diskbuffer */
#define IO_SIZE 4096
As you can see, chunk_file, tempfile and t_file2 get a 64 KB cachesize, and outfile a 32 KB one.
2. Regarding the sort key length: JSON columns use a variable-length key. For other variable-length types, a variable-length key is used only when the collation has Pad_attribute == NO_PAD; with a PAD_SPACE collation, every field except JSON gets a fixed-length key. The length that takes part in the comparison is min(the field's maximum length, max_sort_length); see uint sortlength(THD *thd, st_sort_field *sortorder, uint s_length) for the details. So as far as sorting goes, CHAR and VARCHAR behave the same, and a column defined far wider than necessary wastes a great deal of sort buffer space. Incidentally, utf8mb4_general_ci, shown below, has Pad_attribute == PAD_SPACE.
CHARSET_INFO my_charset_utf8mb4_general_ci = {
45,
0,
0, /* number */
MY_CS_COMPILED | MY_CS_STRNXFRM | MY_CS_UNICODE |
MY_CS_UNICODE_SUPPLEMENT, /* state */
MY_UTF8MB4, /* cs name */
MY_UTF8MB4_GENERAL_CI, /* name */
"UTF-8 Unicode", /* comment */
nullptr, /* tailoring */
nullptr, /* coll_param */
ctype_utf8mb4, /* ctype */
to_lower_utf8mb4, /* to_lower */
to_upper_utf8mb4, /* to_upper */
to_upper_utf8mb4, /* sort_order */
nullptr, /* uca */
nullptr, /* tab_to_uni */
nullptr, /* tab_from_uni */
&my_unicase_default, /* caseinfo */
nullptr, /* state_map */
nullptr, /* ident_map */
1, /* strxfrm_multiply */
1, /* caseup_multiply */
1, /* casedn_multiply */
1, /* mbminlen */
4, /* mbmaxlen */
1, /* mbmaxlenlen */
0, /* min_sort_char */
0xFFFF, /* max_sort_char */
' ', /* pad char */
false, /* escape_with_backslash_is_dangerous */
1, /* levels_for_compare */
&my_charset_utf8mb4_handler,
&my_collation_utf8mb4_general_ci_handler,
PAD_SPACE};
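To make the space cost concrete, here is a tiny calculation with assumed numbers (a hypothetical VARCHAR(3000) utf8mb4 column; 1024 is the default max_sort_length): under a PAD_SPACE collation each fixed-size key part reserves min(the column's maximum byte length, max_sort_length) in every record, no matter how short the actual values are.
#include <algorithm>
#include <cstdio>
int main() {
  const unsigned max_sort_length = 1024;  // default value of the system variable
  const unsigned mbmaxlen = 4;            // utf8mb4: up to 4 bytes per character
  const unsigned varchar_chars = 3000;    // e.g. a VARCHAR(3000) column
  unsigned key_part_bytes =
      std::min(varchar_chars * mbmaxlen, max_sort_length);
  std::printf("bytes reserved per row for this key part: %u\n",
              key_part_bytes);            // 1024 bytes even for short values
}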
3. During read_all_rows() the sort buffer grows from small to large, by a factor of 1.5 each time; but when a merge happens, a buffer of the full sort_buffer_size is allocated in one go.
One last remark: CSDN is really awful. Half a day of writing was lost to one accidental Ctrl+Z. Any decent editor would undo one step at a time rather than wipe out most of the article on a single Ctrl+Z.
Linux-6.5下基于cmake28来编译安装mysql服务配置解析_MySQL