[Debezium Source Code Analysis] BinlogReader and SnapshotReader
Debezium has two main working modes: full snapshot and incremental subscription. The snapshot side is handled by SnapshotReader and the incremental subscription by BinlogReader; the former reads entire tables, while the latter applies the appropriate handling logic to each binlog event.
SnapshotReader
The most important method in this class is execute(); calling it amounts to one snapshot run, and the full-table work runs on a child thread. Let's walk through the method step by step.
step 0
...
logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
mysql.setAutoCommit(false);
sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
mysql.execute(sql.get());
...
Disable autocommit and set the MySQL transaction isolation level to REPEATABLE READ.
step 1
...
try {
logger.info("Step 1: flush and obtain global read lock to prevent writes to database");
sql.set("FLUSH TABLES WITH READ LOCK");
mysql.execute(sql.get());
lockAcquired = clock.currentTimeInMillis();
metrics.globalLockAcquired();
isLocked = true;
} catch (SQLException e) {
logger.info("Step 1: unable to flush and acquire global read lock, will use table read locks after reading table names");
// Continue anyway, since RDS (among others) don't allow setting a global lock
assert !isLocked;
}
...
Acquire a global read lock on the database; this is worth paying attention to in a production environment. If the lock cannot be taken (RDS, for instance, does not allow it), execution continues and table-level read locks are used later instead.
step 2
logger.info("Step 2: start transaction with consistent snapshot");
sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
mysql.execute(sql.get());
isTxnStarted = true;
Start a transaction that takes a consistent read from the storage engine. According to the MySQL documentation, WITH CONSISTENT SNAPSHOT only takes effect under the REPEATABLE READ isolation level and is ignored under the other levels, which is why step 0 set that level.
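Putting steps 0 through 2 together, a minimal JDBC sketch of how such a snapshot transaction could be opened might look like the following (the URL and credentials are placeholders, not Debezium's actual wiring):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SnapshotTxDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder JDBC URL and credentials; Debezium builds its connection from the connector config.
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/inventory", "user", "password");
             Statement stmt = conn.createStatement()) {
            // Step 0: disable autocommit and use REPEATABLE READ.
            conn.setAutoCommit(false);
            stmt.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            // Step 2: open a transaction against a consistent InnoDB snapshot;
            // every SELECT issued later sees the data as of this point in time.
            stmt.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            // ... read the binlog position, scan the tables ...
            conn.commit();
        }
    }
}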
step 3
step = 3;
if (isLocked) {
// Obtain the binlog position and update the SourceInfo in the context. This means that all source records
// generated as part of the snapshot will contain the binlog position of the snapshot.
readBinlogPosition(step++, source, mysql, sql);
}
// -------------------
// READ DATABASE NAMES
// -------------------
// Get the list of databases ...
if (!isRunning()) return;
logger.info("Step {}: read list of available databases", step++);
final List<String> databaseNames = new ArrayList<>();
sql.set("SHOW DATABASES");
mysql.query(sql.get(), rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
});
logger.info("\t list of available databases is: {}", databaseNames);
If the global read lock was acquired successfully, readBinlogPosition is called here. Let's look at that method:
protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection mysql, AtomicReference<String> sql) throws SQLException {
logger.info("Step {}: read binlog position of MySQL master", step);
String showMasterStmt = "SHOW MASTER STATUS";
sql.set(showMasterStmt);
mysql.query(sql.get(), rs -> {
if (rs.next()) {
String binlogFilename = rs.getString(1);
long binlogPosition = rs.getLong(2);
source.setBinlogStartPoint(binlogFilename, binlogPosition);
if (rs.getMetaData().getColumnCount() > 4) {
// This column exists only in MySQL 5.6.5 or later ...
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
source.setCompletedGtidSet(gtidSet);
logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
gtidSet);
} else {
logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
}
source.startSnapshot();
} else {
throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt
+ "'. Make sure your server is correctly configured");
}
});
}
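For reference, SHOW MASTER STATUS returns a single row shaped roughly like the one below (the values are purely illustrative); that is why the lambda reads column 1 (File), column 2 (Position) and, when more than four columns are present, column 5 (Executed_Gtid_Set, which only exists on MySQL 5.6.5 or later):
File              Position  Binlog_Do_DB  Binlog_Ignore_DB  Executed_Gtid_Set
mysql-bin.000003       154                                  3E11FA47-71CA-11E1-9E33-C80AA9429562:1-33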
This method reads the current binlog filename and position (and, on MySQL 5.6.5 or later, the executed GTID set) and saves them, so that every record produced by the snapshot carries this position. Back in the remainder of step 3, the list of all available database names is fetched.
step 4
logger.info("Step {}: read list of available tables in each database", step++);
List<TableId> tableIds = new ArrayList<>();
final Map<String, List<TableId>> tableIdsByDbName = new HashMap<>();
final Set<String> readableDatabaseNames = new HashSet<>();
for (String dbName : databaseNames) {
try {
// MySQL sometimes considers some local files as databases (see DBZ-164),
// so we will simply try each one and ignore the problematic ones ...
sql.set("SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'");
mysql.query(sql.get(), rs -> {
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
if (filters.tableFilter().test(id)) {
tableIds.add(id);
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.info("\t including '{}'", id);
} else {
logger.info("\t '{}' is filtered out, discarding", id);
}
}
});
readableDatabaseNames.add(dbName);
} catch (SQLException e) {
// We were unable to execute the query or process the results, so skip this ...
logger.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage());
}
}
final Set<String> includedDatabaseNames = readableDatabaseNames.stream().filter(filters.databaseFilter()).collect(Collectors.toSet());
logger.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);
Fetch every available table in each database, then filter them so that only the tables the user wants are kept; a rough illustration of such a filter follows.
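The call filters.tableFilter().test(id) applies the connector's include/exclude configuration. As a rough, self-contained illustration of the idea (not Debezium's actual Filters implementation), a whitelist-style predicate over database.table names could look like this:
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class TableFilterDemo {
    // Builds a predicate that accepts "db.table" names matching any of the given regular expressions.
    static Predicate<String> includeList(Set<String> patterns) {
        return fullName -> patterns.stream()
                .map(Pattern::compile)
                .anyMatch(p -> p.matcher(fullName).matches());
    }

    public static void main(String[] args) {
        Predicate<String> filter = includeList(Set.of("inventory\\..*", "orders\\.order_line"));
        System.out.println(filter.test("inventory.customers")); // true  -> included in the snapshot
        System.out.println(filter.test("test.scratch"));        // false -> filtered out, discarded
    }
}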
step 5
if (!isLocked) {
// ------------------------------------
// LOCK TABLES and READ BINLOG POSITION
// ------------------------------------
// We were not able to acquire the global read lock, so instead we have to obtain a read lock on each table.
// This requires different privileges than normal, and also means we can't unlock the tables without
// implicitly committing our transaction ...
if (!context.userHasPrivileges("LOCK TABLES")) {
// We don't have the right privileges
throw new ConnectException("User does not have the 'LOCK TABLES' privilege required to obtain a "
+ "consistent snapshot by preventing concurrent writes to tables.");
}
// We have the required privileges, so try to lock all of the tables we're interested in ...
logger.info("Step {}: flush and obtain read lock for {} tables (preventing writes)", step++, tableIds.size());
String tableList = tableIds.stream()
.map(tid -> quote(tid))
.reduce((r, element) -> r+ "," + element)
.orElse(null);
if (tableList != null) {
sql.set("FLUSH TABLES " + tableList + " WITH READ LOCK");
mysql.execute(sql.get());
}
lockAcquired = clock.currentTimeInMillis();
metrics.globalLockAcquired();
isLocked = true;
tableLocks = true;
// Our tables are locked, so read the binlog position ...
readBinlogPosition(step++, source, mysql, sql);
}
If the database-wide global read lock could not be acquired, a table-level read lock is taken on each captured table instead (this requires the LOCK TABLES privilege), and readBinlogPosition is then called.
step 6
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
allTableIds.stream()
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
.forEach(tableId -> schema.applyDdl(source, tableId.schema(),
"DROP TABLE IF EXISTS " + quote(tableId),
this::enqueueSchemaChanges));
// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
.filter(Predicates.not(readableDatabaseNames::contains))
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
.forEach(missingDbName -> schema.applyDdl(source, missingDbName,
"DROP DATABASE IF EXISTS " + quote(missingDbName),
this::enqueueSchemaChanges));
// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
if (!isRunning()) break;
String dbName = entry.getKey();
// First drop, create, and then use the named database ...
schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + quote(dbName), this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "CREATE DATABASE " + quote(dbName), this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "USE " + quote(dbName), this::enqueueSchemaChanges);
for (TableId tableId : entry.getValue()) {
if (!isRunning()) break;
sql.set("SHOW CREATE TABLE " + quote(tableId));
mysql.query(sql.get(), rs -> {
if (rs.next()) {
schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
}
});
}
}
context.makeRecord().regenerate();
This step is the least obvious one. Reading the code, it rebuilds the connector's recorded schema: DROP TABLE / DROP DATABASE statements are applied for everything previously known, each captured database is re-created, and the output of SHOW CREATE TABLE is replayed for every captured table, so that the schema history matches the tables the snapshot is about to read.
step 7
if (minimalBlocking && isLocked) {
if (tableLocks) {
// We could not acquire a global read lock and instead had to obtain individual table-level read locks
// using 'FLUSH TABLE <tableName> WITH READ LOCK'. However, if we were to do this, the 'UNLOCK TABLES'
// would implicitly commit our active transaction, and this would break our consistent snapshot logic.
// Therefore, we cannot unlock the tables here!
// https://dev.mysql.com/doc/refman/5.7/en/flush.html
logger.info("Step {}: tables were locked explicitly, but to get a consistent snapshot we cannot "
+ "release the locks until we've read all tables.", step++);
} else {
// We are doing minimal blocking via a global read lock, so we should release the global read lock now.
// All subsequent SELECT should still use the MVCC snapshot obtained when we started our transaction
// (since we started it "...with consistent snapshot"). So, since we're only doing very simple SELECT
// without WHERE predicates, we can release the lock now ...
logger.info("Step {}: releasing global read lock to enable MySQL writes", step);
sql.set("UNLOCK TABLES");
mysql.execute(sql.get());
isLocked = false;
long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
logger.info("Step {}: blocked writes to MySQL for a total of {}", step++,
Strings.duration(lockReleased - lockAcquired));
}
}
Release the database-level global read lock early (when minimal blocking is enabled) so MySQL writes can resume. If table-level locks were taken instead, they cannot be released here, because UNLOCK TABLES would implicitly commit the transaction and break the consistent snapshot.
step 8
The step 8 code is long, so let's take it piece by piece.
BufferedBlockingConsumer<SourceRecord> bufferedRecordQueue = BufferedBlockingConsumer.bufferLast(super::enqueueRecord);
This wraps the enqueueRecord method. Inside, that method writes to a records queue, a BlockingQueue, whose contents are read when Kafka Connect calls the task's poll() method (a sketch of that consumption pattern follows the method below).
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
if (record != null) {
if (logger.isTraceEnabled()) {
logger.trace("Enqueuing source record: {}", record);
}
this.records.put(record);
}
}
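On the consuming side, Kafka Connect repeatedly calls the task's poll() method, which drains this queue. A simplified sketch of the producer/consumer pattern (using String in place of SourceRecord; not the actual MySqlConnectorTask code):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RecordQueueDemo {
    // Bounded queue: the snapshot/binlog reader blocks when it is full, giving natural back-pressure.
    private final BlockingQueue<String> records = new LinkedBlockingQueue<>(2048);

    // Reader side (equivalent to enqueueRecord): blocks until there is room in the queue.
    void enqueueRecord(String record) throws InterruptedException {
        if (record != null) {
            records.put(record);
        }
    }

    // Task side (the role poll() plays): waits briefly, then returns everything buffered so far.
    List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = records.poll(100, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            records.drainTo(batch);
        }
        return batch;
    }
}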
Iterator<TableId> tableIdIter = tableIds.iterator();
while (tableIdIter.hasNext()) {
...
}
Iterate over all entries in tableIds; this list contains every table that needs a full snapshot.
//1.
RecordsForTable recordMaker = context.makeRecord().forTable(tableId, null, bufferedRecordQueue);
if (recordMaker != null) {
//2.
// Switch to the table's database ...
sql.set("USE " + quote(tableId.catalog()) + ";");
mysql.execute(sql.get());
AtomicLong numRows = new AtomicLong(-1);
AtomicReference<String> rowCountStr = new AtomicReference<>("<unknown>");
//3.
StatementFactory statementFactory = this::createStatementWithLargeResultSet;
if (largeTableCount > 0) {
try {
// Choose how we create statements based on the # of rows.
// This is approximate and less accurate then COUNT(*),
// but far more efficient for large InnoDB tables.
sql.set("SHOW TABLE STATUS LIKE '" + tableId.table() + "';");
mysql.query(sql.get(), rs -> {
if (rs.next()) numRows.set(rs.getLong(5));
});
if (numRows.get() <= largeTableCount) {
//4.
statementFactory = this::createStatement;
}
rowCountStr.set(numRows.toString());
} catch (SQLException e) {
// Log it, but otherwise just use large result set by default ...
logger.debug("Error while getting number of rows in table {}: {}", tableId, e.getMessage(), e);
}
}
// Scan the rows in the table ...
long start = clock.currentTimeInMillis();
logger.info("Step {}: - scanning table '{}' ({} of {} tables)", step, tableId, ++counter, tableIds.size());
//5.
Map<TableId, String> selectOverrides = getSnapshotSelectOverridesByTable();
String selectStatement = selectOverrides.getOrDefault(tableId, "SELECT * FROM " + quote(tableId));
logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
sql.set(selectStatement);
//snapshot
try {
int stepNum = step;
//6.
mysql.query(sql.get(), statementFactory, rs -> {
long rowNum = 0;
try {
// The table is included in the connector's filters, so process all of the table records
// ...
final Table table = schema.tableFor(tableId);
final int numColumns = table.columns().size();
final Object[] row = new Object[numColumns];
while (rs.next()) {
for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
Column actualColumn = table.columns().get(i);
row[i] = readField(rs, j, actualColumn);
}
recorder.recordRow(recordMaker, row, ts); // has no row number!
++rowNum;
if (rowNum % 100 == 0 && !isRunning()) {
// We've stopped running ...
break;
}
if (rowNum % 10_000 == 0) {
long stop = clock.currentTimeInMillis();
logger.info("Step {}: - {} of {} rows scanned from table '{}' after {}",
stepNum, rowNum, rowCountStr, tableId, Strings.duration(stop - start));
}
}
totalRowCount.addAndGet(rowNum);
if (isRunning()) {
long stop = clock.currentTimeInMillis();
logger.info("Step {}: - Completed scanning a total of {} rows from table '{}' after {}",
stepNum, rowNum, tableId, Strings.duration(stop - start));
}
} catch (InterruptedException e) {
Thread.interrupted();
// We were not able to finish all rows in all tables ...
logger.info("Step {}: Stopping the snapshot due to thread interruption", stepNum);
interrupted.set(true);
}
});
} finally {
metrics.completeTable();
if (interrupted.get()) break;
}
}
++completedCounter;
- First get the table's record maker recordMaker; if recordMaker is not null, switch to the database that owns the table.
- Initialize statementFactory; the default is a factory that builds statements for large result sets (see the sketch after this list).
- Run SHOW TABLE STATUS LIKE '" + tableId.table() + "'; to get the table's approximate row count. If the count is no larger than the configured large-table threshold, switch statementFactory to the plain statement factory.
- Build the snapshot SQL. getSnapshotSelectOverridesByTable() is consulted first, so a user-configured statement with a filter condition is applied here.
- If the user has not configured their own SQL, the statement executed falls back to "SELECT * FROM " + quote(tableId).
- Execute the SQL. For each row, recorder.recordRow(recordMaker, row, ts) is called; it puts the row into a BlockingQueue, namely the records field of AbstractReader.
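The two statement factories differ in how the JDBC driver fetches rows. For large tables the reader avoids materializing the whole result set in memory; with MySQL Connector/J the usual way to do that is a forward-only, read-only statement whose fetch size is Integer.MIN_VALUE, which switches the driver to row-by-row streaming. A sketch of both factories under that assumption (the details of createStatementWithLargeResultSet may differ):
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class StatementFactoryDemo {
    // Plain statement: fine for small tables, but buffers the entire result set on the client.
    static Statement createStatement(Connection conn) throws SQLException {
        return conn.createStatement();
    }

    // Streaming statement for large tables: MySQL Connector/J streams rows one at a time when the
    // statement is forward-only and read-only and the fetch size is set to Integer.MIN_VALUE.
    static Statement createStatementWithLargeResultSet(Connection conn) throws SQLException {
        Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(Integer.MIN_VALUE);
        return stmt;
    }
}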
step 9
if (isTxnStarted) {
if (interrupted.get() || !isRunning()) {
// We were interrupted or were stopped while reading the tables,
// so roll back the transaction and return immediately ...
logger.info("Step {}: rolling back transaction after abort", step++);
sql.set("ROLLBACK");
mysql.execute(sql.get());
metrics.abortSnapshot();
return;
}
// Otherwise, commit our transaction
logger.info("Step {}: committing transaction", step++);
sql.set("COMMIT");
mysql.execute(sql.get());
metrics.completeSnapshot();
}
Commit the transaction, or roll it back if the snapshot was interrupted or stopped.
step 10
if (isLocked) {
if (tableLocks) {
logger.info("Step {}: releasing table read locks to enable MySQL writes", step++);
} else {
logger.info("Step {}: releasing global read lock to enable MySQL writes", step++);
}
sql.set("UNLOCK TABLES");
mysql.execute(sql.get());
isLocked = false;
long lockReleased = clock.currentTimeInMillis();
metrics.globalLockReleased();
if (tableLocks) {
logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
} else {
logger.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
}
}
Release whatever locks are still held: the table-level read locks if those were taken, otherwise the global read lock (if it was not already released in step 7).
That completes the analysis of the core SnapshotReader code.
BinlogReader
Here we look at a few important parts of the BinlogReader code.
BinlogReader initialization
client = new BinaryLogClient(context.hostname(), context.port(), context.username(), context.password());
...
client.registerEventListener(context.bufferSizeForBinlogReader() == 0 ? this::handleEvent : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);
...
Initialize a binlog-processing client and register how events are handled: directly via handleEvent, or through an EventBuffer when a buffer size is configured. Let's look at the handleEvent method:
...
eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
...
The key point is this one line: inside the client's event-handling logic, the handler registered in eventHandlers for that event type is invoked (falling back to ignoreEvent). The handlers for the different event types are registered in doStart.
EventDeserializer eventDeserializer = new EventDeserializer() {
@Override
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
try {
// Delegate to the superclass ...
Event event = super.nextEvent(inputStream);
// We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
}
return event;
}
// DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
catch(EventDataDeserializationException edde) {
EventHeaderV4 header = new EventHeaderV4();
header.setEventType(EventType.INCIDENT);
header.setTimestamp(edde.getEventHeader().getTimestamp());
header.setServerId(edde.getEventHeader().getServerId());
if(edde.getEventHeader() instanceof EventHeaderV4) {
header.setEventLength(((EventHeaderV4)edde.getEventHeader()).getEventLength());
header.setNextPosition(((EventHeaderV4)edde.getEventHeader()).getNextPosition());
header.setFlags(((EventHeaderV4)edde.getEventHeader()).getFlags());
}
EventData data = new EventDataDeserializationExceptionData(edde);
return new Event(header, data);
}
}
};
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer);
Initialize the client's event deserializer. For the different binlog event types, custom event-data deserializers are registered, for example to handle Date, Datetime and similar column types in row events.
The BinlogReader doStart method
eventHandlers.put(EventType.STOP, this::handleServerStop);
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
Register the appropriate handler for each binlog event type. Delete, update and insert changes are handled by handleDelete, handleUpdate and handleInsert respectively; all three add their resulting records to the AbstractReader BlockingQueue (records).
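Stripped of Debezium's surrounding machinery, the dispatch pattern above boils down to a mysql-binlog-connector-java BinaryLogClient plus a map from event type to handler. A minimal standalone sketch (host, credentials and the starting position are placeholders):
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;

import java.io.IOException;
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

public class BinlogDispatchDemo {
    public static void main(String[] args) throws IOException {
        Map<EventType, Consumer<Event>> handlers = new EnumMap<>(EventType.class);
        handlers.put(EventType.WRITE_ROWS, e -> System.out.println("insert: " + e));
        handlers.put(EventType.UPDATE_ROWS, e -> System.out.println("update: " + e));
        handlers.put(EventType.DELETE_ROWS, e -> System.out.println("delete: " + e));

        // Placeholder connection settings; Debezium takes these from the connector configuration.
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "user", "password");
        client.setBinlogFilename("mysql-bin.000003"); // e.g. the position recorded during the snapshot
        client.setBinlogPosition(154);
        client.registerEventListener(event -> {
            EventType type = event.getHeader().getEventType();
            handlers.getOrDefault(type, e -> { /* ignore events we have no handler for */ }).accept(event);
        });
        client.connect(); // blocks, reading the binlog and dispatching each event
    }
}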
Summary
At this point the processing logic of Debezium's full snapshot and incremental subscription should be quite clear.