A Detailed Analysis of BinlogReader and SnapshotReader
Debezium works in two main modes: a full snapshot and an incremental (binlog) subscription. The full snapshot is handled by SnapshotReader and the incremental subscription by BinlogReader; the former reads every row of the captured tables, while the latter applies the appropriate handling logic to each binlog event.
SnapshotReader
The most important method in this class is execute(); invoking it performs one snapshot run, and the full-snapshot work runs on a child thread. Let's walk through the method in the order it executes.
step 0
...
logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
mysql.setAutoCommit(false);
sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
mysql.execute(sql.get());
...
Disables autocommit and sets the MySQL transaction isolation level to REPEATABLE READ.
step 1
...
try {
    logger.info("Step 1: flush and obtain global read lock to prevent writes to database");
    sql.set("FLUSH TABLES WITH READ LOCK");
    mysql.execute(sql.get());
    lockAcquired = clock.currentTimeInMillis();
    metrics.globalLockAcquired();
    isLocked = true;
} catch (SQLException e) {
    logger.info("Step 1: unable to flush and acquire global read lock, will use table read locks after reading table names");
    // Continue anyway, since RDS (among others) don't allow setting a global lock
    assert !isLocked;
}
...
Flushes the tables and acquires a global read lock so that no writes can happen during the snapshot. Be aware of this if you run the snapshot against a production database. If the lock cannot be acquired (RDS, for example, does not allow it), the snapshot continues and falls back to table-level read locks later on.
step 2
logger.info("Step 2: start transaction with consistent snapshot"); sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT"); mysql.execute(sql.get()); isTxnStarted = true;
Starts a transaction with a consistent (MVCC) read of the storage engine. According to the MySQL documentation, WITH CONSISTENT SNAPSHOT only has an effect under the REPEATABLE READ isolation level; under any other isolation level it is ignored.
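Putting steps 0 through 2 together, the setup boils down to a short sequence of statements issued on a single connection. Below is a minimal JDBC sketch of that sequence; it is not Debezium's code, and the connection URL and credentials are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Minimal sketch of the snapshot setup (steps 0-2), assuming a reachable MySQL
// instance and sufficient privileges; not the actual SnapshotReader code.
public class SnapshotSetupSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/", "user", "password");
             Statement stmt = conn.createStatement()) {
            // Step 0: disable autocommit and use REPEATABLE READ
            conn.setAutoCommit(false);
            stmt.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            // Step 1: block writes while the binlog position is pinned
            stmt.execute("FLUSH TABLES WITH READ LOCK");
            // Step 2: open an MVCC snapshot; only effective under REPEATABLE READ
            stmt.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            // ... read the binlog position and the table data here ...
            stmt.execute("UNLOCK TABLES");
            conn.commit();
        }
    }
}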
step 3
step = 3;
if (isLocked) {
    // Obtain the binlog position and update the SourceInfo in the context. This means that all source records
    // generated as part of the snapshot will contain the binlog position of the snapshot.
    readBinlogPosition(step++, source, mysql, sql);
}

// -------------------
// READ DATABASE NAMES
// -------------------
// Get the list of databases ...
if (!isRunning()) return;
logger.info("Step {}: read list of available databases", step++);
final List<String> databaseNames = new ArrayList<>();
sql.set("SHOW DATABASES");
mysql.query(sql.get(), rs -> {
    while (rs.next()) {
        databaseNames.add(rs.getString(1));
    }
});
logger.info("\t list of available databases is: {}", databaseNames);
If the global read lock was acquired, the readBinlogPosition method is called. Here is that method:
protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection mysql, AtomicReference<String> sql) throws SQLException {
    logger.info("Step {}: read binlog position of MySQL master", step);
    String showMasterStmt = "SHOW MASTER STATUS";
    sql.set(showMasterStmt);
    mysql.query(sql.get(), rs -> {
        if (rs.next()) {
            String binlogFilename = rs.getString(1);
            long binlogPosition = rs.getLong(2);
            source.setBinlogStartPoint(binlogFilename, binlogPosition);
            if (rs.getMetaData().getColumnCount() > 4) {
                // This column exists only in MySQL 5.6.5 or later ...
                String gtidSet = rs.getString(5); // GTID set, may be null, blank, or contain a GTID set
                source.setCompletedGtidSet(gtidSet);
                logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, gtidSet);
            } else {
                logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
            }
            source.startSnapshot();
        } else {
            throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt
                    + "'. Make sure your server is correctly configured");
        }
    });
}
This method reads the current binlog file name and position (plus the executed GTID set on MySQL 5.6.5 and later) and stores them in the SourceInfo. Back in the rest of step 3, the reader then fetches the names of all available databases.
step 4
logger.info("Step {}: read list of available tables in each database", step++); List tableIds = new ArrayList<>(); final Map> tableIdsByDbName = new HashMap<>(); final Set readableDatabaseNames = new HashSet<>(); for (String dbName : databaseNames) { try { // MySQL sometimes considers some local files as databases (see DBZ-164), // so we will simply try each one and ignore the problematic ones ... sql.set("SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'"); mysql.query(sql.get(), rs -> { while (rs.next() && isRunning()) { TableId id = new TableId(dbName, null, rs.getString(1)); if (filters.tableFilter().test(id)) { tableIds.add(id); tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id); logger.info("\t including '{}'", id); } else { logger.info("\t '{}' is filtered out, discarding", id); } } }); readableDatabaseNames.add(dbName); } catch (SQLException e) { // We were unable to execute the query or process the results, so skip this ... logger.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage()); } } final Set includedDatabaseNames = readableDatabaseNames.stream().filter(filters.databaseFilter()).collect(Collectors.toSet()); logger.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);
Reads every available table in each database and then applies the configured filters, keeping only the tables the user wants to capture.
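The filtering itself is just a predicate over table identifiers. The sketch below shows the general idea of an include/exclude filter; the fully-qualified "db.table" strings and the regular expressions are illustrative stand-ins, not Debezium's actual Filters configuration:

import java.util.function.Predicate;
import java.util.regex.Pattern;

// Sketch of an include/exclude table filter, similar in spirit to filters.tableFilter().
public class TableFilterSketch {

    static Predicate<String> tableFilter(String includeRegex, String excludeRegex) {
        Pattern include = Pattern.compile(includeRegex);
        Pattern exclude = Pattern.compile(excludeRegex);
        return fullyQualifiedName -> include.matcher(fullyQualifiedName).matches()
                && !exclude.matcher(fullyQualifiedName).matches();
    }

    public static void main(String[] args) {
        Predicate<String> filter = tableFilter("inventory\\..*", "inventory\\.audit_.*");
        System.out.println(filter.test("inventory.customers")); // true  -> included
        System.out.println(filter.test("inventory.audit_log")); // false -> filtered out
    }
}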
step 5
if (!isLocked) {
    // ------------------------------------
    // LOCK TABLES and READ BINLOG POSITION
    // ------------------------------------
    // We were not able to acquire the global read lock, so instead we have to obtain a read lock on each table.
    // This requires different privileges than normal, and also means we can't unlock the tables without
    // implicitly committing our transaction ...
    if (!context.userHasPrivileges("LOCK TABLES")) {
        // We don't have the right privileges
        throw new ConnectException("User does not have the 'LOCK TABLES' privilege required to obtain a "
                + "consistent snapshot by preventing concurrent writes to tables.");
    }
    // We have the required privileges, so try to lock all of the tables we're interested in ...
    logger.info("Step {}: flush and obtain read lock for {} tables (preventing writes)", step++, tableIds.size());
    String tableList = tableIds.stream()
            .map(tid -> quote(tid))
            .reduce((r, element) -> r + "," + element)
            .orElse(null);
    if (tableList != null) {
        sql.set("FLUSH TABLES " + tableList + " WITH READ LOCK");
        mysql.execute(sql.get());
    }
    lockAcquired = clock.currentTimeInMillis();
    metrics.globalLockAcquired();
    isLocked = true;
    tableLocks = true;

    // Our tables are locked, so read the binlog position ...
    readBinlogPosition(step++, source, mysql, sql);
}
If the global read lock could not be acquired, the reader instead takes a read lock on every captured table (which requires the LOCK TABLES privilege) and then calls readBinlogPosition.
step 6
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);

// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
allTableIds.stream()
        .filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
        .forEach(tableId -> schema.applyDdl(source, tableId.schema(),
                "DROP TABLE IF EXISTS " + quote(tableId),
                this::enqueueSchemaChanges));

// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
        .filter(Predicates.not(readableDatabaseNames::contains))
        .filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
        .forEach(missingDbName -> schema.applyDdl(source, missingDbName,
                "DROP DATABASE IF EXISTS " + quote(missingDbName),
                this::enqueueSchemaChanges));

// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
    if (!isRunning()) break;
    String dbName = entry.getKey();
    // First drop, create, and then use the named database ...
    schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + quote(dbName), this::enqueueSchemaChanges);
    schema.applyDdl(source, dbName, "CREATE DATABASE " + quote(dbName), this::enqueueSchemaChanges);
    schema.applyDdl(source, dbName, "USE " + quote(dbName), this::enqueueSchemaChanges);
    for (TableId tableId : entry.getValue()) {
        if (!isRunning()) break;
        sql.set("SHOW CREATE TABLE " + quote(tableId));
        mysql.query(sql.get(), rs -> {
            if (rs.next()) {
                schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
            }
        });
    }
}
context.makeRecord().regenerate();
This step reconciles the connector's in-memory schema with what was actually found on the server. It applies DROP TABLE IF EXISTS for every table the connector knew about or just discovered, and DROP DATABASE IF EXISTS for databases the connector no longer sees; then, for each captured database, it applies DROP DATABASE / CREATE DATABASE / USE, runs SHOW CREATE TABLE for every captured table, and applies the returned CREATE TABLE statement. Every applied DDL statement is also passed to enqueueSchemaChanges, so schema-change events are emitted for it.
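The core of this step is simply reading the CREATE TABLE DDL through SHOW CREATE TABLE. As a standalone illustration (not Debezium's code; the connection URL, credentials, and table names are placeholders, and applying the DDL to an in-memory schema model is elided), collecting that DDL over JDBC looks like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the "SHOW CREATE TABLE" part of step 6: collect CREATE TABLE DDL per table.
public class ShowCreateTableSketch {
    public static void main(String[] args) throws Exception {
        List<String> tables = Arrays.asList("`inventory`.`customers`", "`inventory`.`orders`");
        Map<String, String> ddlByTable = new LinkedHashMap<>();
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/", "user", "password");
             Statement stmt = conn.createStatement()) {
            for (String table : tables) {
                try (ResultSet rs = stmt.executeQuery("SHOW CREATE TABLE " + table)) {
                    if (rs.next()) {
                        // Column 2 of the SHOW CREATE TABLE result holds the CREATE TABLE statement
                        ddlByTable.put(table, rs.getString(2));
                    }
                }
            }
        }
        ddlByTable.forEach((table, ddl) -> System.out.println("-- " + table + "\n" + ddl));
    }
}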
step 7
if (minimalBlocking && isLocked) {
    if (tableLocks) {
        // We could not acquire a global read lock and instead had to obtain individual table-level read locks
        // using 'FLUSH TABLE WITH READ LOCK'. However, if we were to do this, the 'UNLOCK TABLES'
        // would implicitly commit our active transaction, and this would break our consistent snapshot logic.
        // Therefore, we cannot unlock the tables here!
        // https://dev.mysql.com/doc/refman/5.7/en/flush.html
        logger.info("Step {}: tables were locked explicitly, but to get a consistent snapshot we cannot "
                + "release the locks until we've read all tables.", step++);
    } else {
        // We are doing minimal blocking via a global read lock, so we should release the global read lock now.
        // All subsequent SELECT should still use the MVCC snapshot obtained when we started our transaction
        // (since we started it "...with consistent snapshot"). So, since we're only doing very simple SELECT
        // without WHERE predicates, we can release the lock now ...
        logger.info("Step {}: releasing global read lock to enable MySQL writes", step);
        sql.set("UNLOCK TABLES");
        mysql.execute(sql.get());
        isLocked = false;
        long lockReleased = clock.currentTimeInMillis();
        metrics.globalLockReleased();
        logger.info("Step {}: blocked writes to MySQL for a total of {}", step++, Strings.duration(lockReleased - lockAcquired));
    }
}
Releases the database-level global read lock (UNLOCK TABLES) so writes can resume. If only table-level locks could be taken, they are not released here, because UNLOCK TABLES would implicitly commit the transaction and break the consistent snapshot.
step 8
Step 8 contains a lot of code, so let's go through it piece by piece.
BufferedBlockingConsumer<SourceRecord> bufferedRecordQueue = BufferedBlockingConsumer.bufferLast(super::enqueueRecord);
This wraps the enqueueRecord method. That method writes into the records queue, a BlockingQueue whose contents are drained when Kafka Connect calls the task's poll() method. A minimal sketch of this hand-off follows the code below.
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}
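The hand-off between the snapshot thread and the Kafka Connect task thread is a plain producer/consumer pattern around a bounded BlockingQueue. Below is a simplified sketch of that pattern; SimpleReader is a made-up name, String stands in for SourceRecord, and the real AbstractReader additionally handles batch limits, polling timeouts, and failure propagation:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Sketch of the hand-off: the reader thread enqueues records, poll() drains them.
public class SimpleReader {

    private final BlockingQueue<String> records = new LinkedBlockingDeque<>(2048);

    // Called from the reader thread for every produced record.
    protected void enqueueRecord(String record) throws InterruptedException {
        if (record != null) {
            records.put(record); // blocks when the queue is full, giving back-pressure
        }
    }

    // Called from the Kafka Connect task thread via SourceTask.poll().
    public List<String> poll() {
        List<String> batch = new ArrayList<>();
        records.drainTo(batch, 1024);
        return batch;
    }
}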
Iterator<TableId> tableIdIter = tableIds.iterator();
while (tableIdIter.hasNext()) {
    ...
}
Iterates over tableIds, the list of all tables that need to be snapshotted.
// 1.
RecordsForTable recordMaker = context.makeRecord().forTable(tableId, null, bufferedRecordQueue);
if (recordMaker != null) {
    // 2.
    // Switch to the table's database ...
    sql.set("USE " + quote(tableId.catalog()) + ";");
    mysql.execute(sql.get());

    AtomicLong numRows = new AtomicLong(-1);
    AtomicReference<String> rowCountStr = new AtomicReference<>("");
    // 3.
    StatementFactory statementFactory = this::createStatementWithLargeResultSet;
    if (largeTableCount > 0) {
        try {
            // Choose how we create statements based on the # of rows.
            // This is approximate and less accurate then COUNT(*),
            // but far more efficient for large InnoDB tables.
            sql.set("SHOW TABLE STATUS LIKE '" + tableId.table() + "';");
            mysql.query(sql.get(), rs -> {
                if (rs.next()) numRows.set(rs.getLong(5));
            });
            if (numRows.get() <= largeTableCount) {
                // 4.
                statementFactory = this::createStatement;
            }
            rowCountStr.set(numRows.toString());
        } catch (SQLException e) {
            // Log it, but otherwise just use large result set by default ...
            logger.debug("Error while getting number of rows in table {}: {}", tableId, e.getMessage(), e);
        }
    }

    // Scan the rows in the table ...
    long start = clock.currentTimeInMillis();
    logger.info("Step {}: - scanning table '{}' ({} of {} tables)", step, tableId, ++counter, tableIds.size());
    // 5.
    Map<TableId, String> selectOverrides = getSnapshotSelectOverridesByTable();
    String selectStatement = selectOverrides.getOrDefault(tableId, "SELECT * FROM " + quote(tableId));
    logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
    sql.set(selectStatement);
    // snapshot
    try {
        int stepNum = step;
        // 6.
        mysql.query(sql.get(), statementFactory, rs -> {
            long rowNum = 0;
            try {
                // The table is included in the connector's filters, so process all of the table records ...
                final Table table = schema.tableFor(tableId);
                final int numColumns = table.columns().size();
                final Object[] row = new Object[numColumns];
                while (rs.next()) {
                    for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
                        Column actualColumn = table.columns().get(i);
                        row[i] = readField(rs, j, actualColumn);
                    }
                    recorder.recordRow(recordMaker, row, ts); // has no row number!
                    ++rowNum;
                    if (rowNum % 100 == 0 && !isRunning()) {
                        // We've stopped running ...
                        break;
                    }
                    if (rowNum % 10_000 == 0) {
                        long stop = clock.currentTimeInMillis();
                        logger.info("Step {}: - {} of {} rows scanned from table '{}' after {}",
                                stepNum, rowNum, rowCountStr, tableId, Strings.duration(stop - start));
                    }
                }
                totalRowCount.addAndGet(rowNum);
                if (isRunning()) {
                    long stop = clock.currentTimeInMillis();
                    logger.info("Step {}: - Completed scanning a total of {} rows from table '{}' after {}",
                            stepNum, rowNum, tableId, Strings.duration(stop - start));
                }
            } catch (InterruptedException e) {
                Thread.interrupted();
                // We were not able to finish all rows in all tables ...
                logger.info("Step {}: Stopping the snapshot due to thread interruption", stepNum);
                interrupted.set(true);
            }
        });
    } finally {
        metrics.completeTable();
        if (interrupted.get()) break;
    }
}
++completedCounter;
Following the numbered markers in the code:

1. Obtain the record maker (recordMaker) for the table via context.makeRecord().forTable(...).
2. If recordMaker is not null, switch to the database that contains the table (USE ...).
3. Initialize statementFactory; the default factory creates statements suited to streaming very large result sets (createStatementWithLargeResultSet). A sketch of what such a factory looks like follows this list.
4. When a large-table threshold is configured, run SHOW TABLE STATUS LIKE '<table>' to get an approximate row count; if the count is at or below the threshold, switch statementFactory to the ordinary createStatement.
5. Build the snapshot SELECT statement. getSnapshotSelectOverridesByTable() lets the user supply a statement with filter conditions for specific tables; if no override is configured, the statement is "SELECT * FROM " + quote(tableId).
6. Execute the statement and, for every row, call recorder.recordRow(recordMaker, row, ts), which puts the resulting record into the BlockingQueue, i.e. the records field of AbstractReader.
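On the default path, createStatementWithLargeResultSet is meant to avoid buffering an entire table in memory. With MySQL Connector/J, that is typically achieved by creating a forward-only, read-only statement with a fetch size of Integer.MIN_VALUE. The sketch below shows that idea; it is an approximation of the two factories, not the actual Debezium implementation:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch of the two statement factories used when scanning tables.
public class StatementFactories {

    static Statement createStreamingStatement(Connection conn) throws SQLException {
        Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                              ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(Integer.MIN_VALUE); // Connector/J convention: stream rows one at a time
        return stmt;
    }

    static Statement createDefaultStatement(Connection conn) throws SQLException {
        return conn.createStatement(); // fine for small tables that fit in memory
    }
}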
step 9
if (isTxnStarted) {
    if (interrupted.get() || !isRunning()) {
        // We were interrupted or were stopped while reading the tables,
        // so roll back the transaction and return immediately ...
        logger.info("Step {}: rolling back transaction after abort", step++);
        sql.set("ROLLBACK");
        mysql.execute(sql.get());
        metrics.abortSnapshot();
        return;
    }
    // Otherwise, commit our transaction
    logger.info("Step {}: committing transaction", step++);
    sql.set("COMMIT");
    mysql.execute(sql.get());
    metrics.completeSnapshot();
}
Commits the transaction, or rolls it back if the snapshot was interrupted or stopped while reading the tables.
step 10
if (isLocked) {
    if (tableLocks) {
        logger.info("Step {}: releasing table read locks to enable MySQL writes", step++);
    } else {
        logger.info("Step {}: releasing global read lock to enable MySQL writes", step++);
    }
    sql.set("UNLOCK TABLES");
    mysql.execute(sql.get());
    isLocked = false;
    long lockReleased = clock.currentTimeInMillis();
    metrics.globalLockReleased();
    if (tableLocks) {
        logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
    } else {
        logger.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
    }
}
Releases whatever read locks are still held: the table-level locks, or the global lock if it was not released back in step 7.
That completes the walk-through of SnapshotReader's core code.
BinlogReader
Here we look at a few important parts of BinlogReader.
BinlogReader initialization
client = new BinaryLogClient(context.hostname(), context.port(), context.username(), context.password());
...
client.registerEventListener(context.bufferSizeForBinlogReader() == 0
        ? this::handleEvent
        : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);
...
Initializes a binlog client (BinaryLogClient) and registers how events are handled; if a buffer size is configured for the binlog reader, events are routed through an EventBuffer instead. Let's look at the handleEvent method:
...
eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
...
The essential part is this single line: when the client delivers an event, the handler registered in eventHandlers for that event type is invoked, falling back to ignoreEvent if no handler is registered. The handlers for the different event types are registered in doStart.
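The dispatch itself is nothing more than a map from event type to handler with an "ignore" fallback. Here is a minimal, self-contained sketch of that pattern; the EventType enum and the String payload below are simplified stand-ins, not the binlog client's actual classes:

import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of the dispatch pattern behind handleEvent.
public class EventDispatchSketch {

    enum EventType { WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS, ROTATE }

    private final Map<EventType, Consumer<String>> eventHandlers = new EnumMap<>(EventType.class);

    EventDispatchSketch() {
        eventHandlers.put(EventType.WRITE_ROWS, data -> System.out.println("insert: " + data));
        eventHandlers.put(EventType.UPDATE_ROWS, data -> System.out.println("update: " + data));
        eventHandlers.put(EventType.DELETE_ROWS, data -> System.out.println("delete: " + data));
    }

    void handleEvent(EventType type, String data) {
        // Unregistered event types fall through to a no-op, just like ignoreEvent
        eventHandlers.getOrDefault(type, ignored -> { }).accept(data);
    }

    public static void main(String[] args) {
        EventDispatchSketch dispatcher = new EventDispatchSketch();
        dispatcher.handleEvent(EventType.WRITE_ROWS, "row #1");      // handled
        dispatcher.handleEvent(EventType.ROTATE, "binlog.000002");   // ignored
    }
}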
EventDeserializer eventDeserializer = new EventDeserializer() {
    @Override
    public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
        try {
            // Delegate to the superclass ...
            Event event = super.nextEvent(inputStream);

            // We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
            if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
                TableMapEventData tableMapEvent = event.getData();
                tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
            }
            return event;
        }
        // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
        catch (EventDataDeserializationException edde) {
            EventHeaderV4 header = new EventHeaderV4();
            header.setEventType(EventType.INCIDENT);
            header.setTimestamp(edde.getEventHeader().getTimestamp());
            header.setServerId(edde.getEventHeader().getServerId());

            if (edde.getEventHeader() instanceof EventHeaderV4) {
                header.setEventLength(((EventHeaderV4) edde.getEventHeader()).getEventLength());
                header.setNextPosition(((EventHeaderV4) edde.getEventHeader()).getNextPosition());
                header.setFlags(((EventHeaderV4) edde.getEventHeader()).getFlags());
            }

            EventData data = new EventDataDeserializationExceptionData(edde);
            return new Event(header, data);
        }
    }
};

// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
        new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
        new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
        new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
        new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
        new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
        new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId).setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer);
Initializes the client's event deserializer. It keeps track of the most recent TableMapEventData per table id and, for the different binlog event types, registers custom data deserializers, for example to handle Date and Datetime values in row events.
BinlogReader's doStart method
eventHandlers.put(EventType.STOP, this::handleServerStop);
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
Registers a handler for each binlog event type. Delete, update, and insert changes are handled by handleDelete, handleUpdate, and handleInsert respectively; these three methods turn the row events into records and add them to AbstractReader's BlockingQueue (records). A standalone sketch of the same listen-and-enqueue pattern follows.
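To make the flow concrete, here is a standalone sketch that uses the same binlog client library (com.github.shyiko.mysql.binlog, the artifact this version of Debezium builds on) to listen for insert events and push the rows onto a queue, much like handleInsert ultimately feeds records. Connection details are placeholders, and offset tracking, schema handling, and error handling are all omitted:

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

// Sketch: a BinaryLogClient listener that turns insert events into queue entries,
// which another thread could drain (as the Kafka Connect task does via poll()).
public class BinlogListenerSketch {
    public static void main(String[] args) throws IOException {
        BlockingQueue<List<Serializable[]>> records = new LinkedBlockingQueue<>();
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "user", "password");
        client.registerEventListener(event -> {
            EventHeader header = event.getHeader();
            if (header.getEventType() == EventType.WRITE_ROWS
                    || header.getEventType() == EventType.EXT_WRITE_ROWS) {
                WriteRowsEventData data = event.getData();
                records.offer(data.getRows()); // hand the inserted rows to a consumer thread
            }
        });
        client.connect(); // blocks and streams binlog events until disconnected
    }
}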
Summary
At this point, the handling logic of Debezium's full snapshot and incremental (binlog) subscription should be quite clear.