欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

MySQL半同步slave操作教程

程序员文章站 2022-07-02 20:03:09
salve的半同步操作涉及一下几个函数: semisync_slave_plugin.cc:: Binlog_relay_IO_observer relay_io_ob...

salve的半同步操作涉及一下几个函数:

semisync_slave_plugin.cc::  
Binlog_relay_IO_observer relay_io_observer = {  
  sizeof(Binlog_relay_IO_observer), // len  
  
  repl_semi_slave_io_start, // start  
  repl_semi_slave_io_end,   // stop  
  repl_semi_slave_request_dump, // request_transmit  
  repl_semi_slave_read_event,   // after_read_event  
  repl_semi_slave_queue_event,  // after_queue_event  
  repl_semi_reset_slave,    // reset  
}; 

下面分别解析:

Binlog_relay_IO_delegate::thread_start-->
int repl_semi_slave_io_start(Binlog_relay_IO_param *param)
{
  return repl_semisync.slaveStart(param);
}
int ReplSemiSyncSlave::slaveStart(Binlog_relay_IO_param *param)
{
  bool semi_sync= getSlaveEnabled();

  if (semi_sync && !rpl_semi_sync_slave_status)
    rpl_semi_sync_slave_status= 1;
  return 0;
}
int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
{
  if (rpl_semi_sync_slave_status)
    rpl_semi_sync_slave_status= 0;
  if (mysql_reply)
    mysql_close(mysql_reply);
  mysql_reply= 0;
  return 0;
}

1、thread_start的功能是:slave开启半同步情况下,将rpl_semi_sync_slave_status变量置成1

2、thread_stop功能正好相反,将rpl_semi_sync_slave_status置成0。断开与master的连接

int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
				 uint32 flags)
{
  MYSQL *mysql= param->mysql;
  MYSQL_RES *res= 0;
  MYSQL_ROW row;
  const char *query;
  /*
  1、半同步没有启用,返回
  */
  if (!repl_semisync.getSlaveEnabled())
    return 0;

  /* 
  2、向master发送SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'
     看master是否按照半同步插件,并启用master半同步。
     2.1 master不支持半同步,则将rpl_semi_sync_slave_status=0,执行异步复制
	 2.2 否则,向master发送SET @rpl_semi_sync_slave= 1,告诉master dump thread
	     slave想要半同步复制,并将rpl_semi_sync_slave_status=1
  */
  query= "SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'";
  if (mysql_real_query(mysql, query, strlen(query)) ||
      !(res= mysql_store_result(mysql)))
  {
    sql_print_error("Execution failed on master: %s", query);
    return 1;
  }
  row= mysql_fetch_row(res);
  if (!row)
  {
    /* Master does not support semi-sync */
    sql_print_warning("Master server does not support semi-sync, "
                      "fallback to asynchronous replication");
    rpl_semi_sync_slave_status= 0;
    mysql_free_result(res);
    return 0;
  }
  mysql_free_result(res);

  /*
    Tell master dump thread that we want to do semi-sync
    replication
  */
  query= "SET @rpl_semi_sync_slave= 1";
  if (mysql_real_query(mysql, query, strlen(query)))
  {
    sql_print_error("Set 'rpl_semi_sync_slave=1' on master failed");
    return 1;
  }
  mysql_free_result(mysql_store_result(mysql));
  rpl_semi_sync_slave_status= 1;
  return 0;
}
//主要功能是读取包头,看下接收到的event是否需要发送半同步ACK
int repl_semi_slave_read_event(Binlog_relay_IO_param *param,
			       const char *packet, unsigned long len,
			       const char **event_buf, unsigned long *event_len)
{
  if (rpl_semi_sync_slave_status)
    return repl_semisync.slaveReadSyncHeader(packet, len,
					     &semi_sync_need_reply,
					     event_buf, event_len);
  *event_buf= packet;
  *event_len= len;
  return 0;
}
int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
				const char *event_buf,
				unsigned long event_len,
				uint32 flags)
{
  //半同步下,由上面函数得到需要回复ACK
  if (rpl_semi_sync_slave_status && semi_sync_need_reply)
  {
    (void) repl_semisync.slaveReply(param->mysql,
                                    param->master_log_name,
                                    param->master_log_pos);
  }
  return 0;
}
//向master发送向主库发送数据包,包括当前的binlog文件名及偏移量信息
int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
                                 const char *binlog_filename,
                                 my_off_t binlog_filepos)
{
  const char *kWho = "ReplSemiSyncSlave::slaveReply";
  NET *net= &mysql->net;
  uchar reply_buffer[REPLY_MAGIC_NUM_LEN
                     + REPLY_BINLOG_POS_LEN
                     + REPLY_BINLOG_NAME_LEN];
  int  reply_res, name_len = strlen(binlog_filename);

  function_enter(kWho);

  /* Prepare the buffer of the reply. */
  reply_buffer[REPLY_MAGIC_NUM_OFFSET] = kPacketMagicNum;
  int8store(reply_buffer + REPLY_BINLOG_POS_OFFSET, binlog_filepos);
  memcpy(reply_buffer + REPLY_BINLOG_NAME_OFFSET,
         binlog_filename,
         name_len + 1 /* including trailing '\0' */);

  net_clear(net, 0);
  /* Send the reply. */
  reply_res = my_net_write(net, reply_buffer,
                           name_len + REPLY_BINLOG_NAME_OFFSET);
  if (!reply_res)
  {
    reply_res = net_flush(net);
    if (reply_res)
      sql_print_error("Semi-sync slave net_flush() reply failed");
  }
  else
  {
    sql_print_error("Semi-sync slave send reply failed: %s (%d)",
                    net->last_error, net->last_errno);
  }

  return function_exit(kWho, reply_res);
}