
[Debezium source code analysis] BinlogReader and SnapshotReader


Debezium works in two main modes: a full snapshot and an incremental subscription (streaming). The snapshot is handled by SnapshotReader and the incremental subscription by BinlogReader; the former reads entire tables, the latter reacts to individual binlog events.
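Which of the two behaviours you get is driven by the connector configuration. A hedged sketch of that configuration (property names follow the Debezium MySQL connector of this era; host, credentials, and server id are placeholders):

import java.util.Properties;

// Hedged sketch: choosing snapshot vs. binlog-only behaviour via configuration.
Properties props = new Properties();
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "debezium");
props.setProperty("database.password", "secret");
props.setProperty("database.server.id", "184054");
props.setProperty("database.server.name", "dbserver1");
// "initial": run SnapshotReader first, then hand over to BinlogReader.
// "never":   skip the snapshot and start BinlogReader directly.
props.setProperty("snapshot.mode", "initial");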

SnapshotReader

The most important method in this class is execute(); calling it performs one snapshot run, and the full-table work is executed on a child thread. Let's walk through the method step by step.
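A hedged sketch of that overall structure (simplified stand-ins, not the actual AbstractReader/SnapshotReader code): start() hands the long-running execute() to a single-thread executor, while the records it produces are later handed to Kafka Connect through poll().

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hedged sketch of the reader skeleton; names and types are simplified stand-ins.
public class SnapshotReaderSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final BlockingQueue<Object> records = new LinkedBlockingQueue<>();

    public void start() {
        executor.execute(this::execute); // snapshot work runs on a child thread
    }

    private void execute() {
        // ... the step 0-10 logic analysed below, putting records into the queue
    }

    // Called from the Kafka Connect task's poll(); drains whatever has been produced so far.
    public List<Object> poll() {
        List<Object> batch = new ArrayList<>();
        records.drainTo(batch);
        return batch;
    }
}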

step 0

...
            logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
            mysql.setAutoCommit(false);
            sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            mysql.execute(sql.get());
...

Autocommit is disabled and the MySQL session's transaction isolation level is set to REPEATABLE READ, so the snapshot transaction that follows sees a single consistent view of the data.
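For reference, a plain-JDBC sketch of the same two settings, outside of Debezium (the URL and credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hedged sketch: the plain-JDBC equivalent of step 0; not Debezium code.
public class Step0Sketch {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://localhost:3306/inventory"; // placeholder
        try (Connection conn = DriverManager.getConnection(url, "user", "password")) {
            conn.setAutoCommit(false); // same effect as mysql.setAutoCommit(false)
            try (Statement stmt = conn.createStatement()) {
                // applies to the next transaction started on this session
                stmt.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            }
        }
    }
}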

step 1

...
try {
    logger.info("Step 1: flush and obtain global read lock to prevent writes to database");
    sql.set("FLUSH TABLES WITH READ LOCK");
    mysql.execute(sql.get());
    lockAcquired = clock.currentTimeInMillis();
    metrics.globalLockAcquired();
    isLocked = true;
} catch (SQLException e) {
    logger.info("Step 1: unable to flush and acquire global read lock, will use table read locks after reading table names");
    // Continue anyway, since RDS (among others) don't allow setting a global lock
    assert !isLocked;
}
...

A global read lock is taken on the database with FLUSH TABLES WITH READ LOCK, which blocks writes while it is held; be careful with this on a production instance. If the lock cannot be acquired (for example on RDS, which does not allow it), the reader continues anyway and falls back to table-level read locks in step 5.

step 2

logger.info("Step 2: start transaction with consistent snapshot");
sql.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
mysql.execute(sql.get());
isTxnStarted = true;

A consistent read is started in the storage engine. According to the MySQL documentation, WITH CONSISTENT SNAPSHOT only has an effect under the REPEATABLE READ isolation level; under other isolation levels it is ignored.

step 3

step = 3;
if (isLocked) {
    // Obtain the binlog position and update the SourceInfo in the context. This means that all source records
    // generated as part of the snapshot will contain the binlog position of the snapshot.
    readBinlogPosition(step++, source, mysql, sql);
}

// -------------------
// READ DATABASE NAMES
// -------------------
// Get the list of databases ...
if (!isRunning()) return;
logger.info("Step {}: read list of available databases", step++);
final List<String> databaseNames = new ArrayList<>();
sql.set("SHOW DATABASES");
mysql.query(sql.get(), rs -> {
    while (rs.next()) {
        databaseNames.add(rs.getString(1));
    }
});
logger.info("\t list of available databases is: {}", databaseNames);

If the global read lock was acquired successfully, readBinlogPosition is called. Here is that method:

protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection mysql, AtomicReference<String> sql) throws SQLException {
    logger.info("Step {}: read binlog position of MySQL master", step);
    String showMasterStmt = "SHOW MASTER STATUS";
    sql.set(showMasterStmt);
    mysql.query(sql.get(), rs -> {
        if (rs.next()) {
            String binlogFilename = rs.getString(1);
            long binlogPosition = rs.getLong(2);
            source.setBinlogStartPoint(binlogFilename, binlogPosition);
            if (rs.getMetaData().getColumnCount() > 4) {
                // This column exists only in MySQL 5.6.5 or later ...
                String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
                source.setCompletedGtidSet(gtidSet);
                logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
                            gtidSet);
            } else {
                logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
            }
            source.startSnapshot();
        } else {
            throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt
                    + "'. Make sure your server is correctly configured");
        }
    });
}

This method runs SHOW MASTER STATUS, reads the current binlog file name and position, and saves them into the SourceInfo via setBinlogStartPoint; if the result has more than four columns (MySQL 5.6.5 or later), the fifth column is the executed GTID set and is saved as well. Back in the rest of the step 3 code, the list of available databases is read with SHOW DATABASES.

step 4

logger.info("Step {}: read list of available tables in each database", step++);
List<TableId> tableIds = new ArrayList<>();
final Map<String, List<TableId>> tableIdsByDbName = new HashMap<>();
final Set<String> readableDatabaseNames = new HashSet<>();
for (String dbName : databaseNames) {
    try {
        // MySQL sometimes considers some local files as databases (see DBZ-164),
        // so we will simply try each one and ignore the problematic ones ...
        sql.set("SHOW FULL TABLES IN " + quote(dbName) + " where Table_Type = 'BASE TABLE'");
        mysql.query(sql.get(), rs -> {
            while (rs.next() && isRunning()) {
                TableId id = new TableId(dbName, null, rs.getString(1));
                if (filters.tableFilter().test(id)) {
                    tableIds.add(id);
                    tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
                    logger.info("\t including '{}'", id);
                } else {
                    logger.info("\t '{}' is filtered out, discarding", id);
                }
            }
        });
        readableDatabaseNames.add(dbName);
    } catch (SQLException e) {
        // We were unable to execute the query or process the results, so skip this ...
        logger.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage());
    }
}
final Set<String> includedDatabaseNames = readableDatabaseNames.stream().filter(filters.databaseFilter()).collect(Collectors.toSet());
logger.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);

All readable base tables in each database are listed and then run through the configured filters, leaving only the tables (and databases) the user asked for; a hedged sketch of such a filter follows.
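The actual filtering is done by filters.tableFilter() and filters.databaseFilter(), built from options such as table.whitelist / database.whitelist. A minimal sketch of the idea (the class below is a hypothetical stand-in, not Debezium's Filters):

import java.util.Set;
import java.util.function.Predicate;

// Hedged sketch of an include/exclude table filter keyed by "db.table" names.
public class TableFilterSketch {
    private final Set<String> included; // e.g. parsed from table.whitelist
    private final Set<String> excluded;

    public TableFilterSketch(Set<String> included, Set<String> excluded) {
        this.included = included;
        this.excluded = excluded;
    }

    public Predicate<String> tableFilter() {
        return fqn -> !excluded.contains(fqn)
                && (included.isEmpty() || included.contains(fqn));
    }

    public static void main(String[] args) {
        TableFilterSketch filters = new TableFilterSketch(
                Set.of("inventory.customers"), Set.of("inventory.audit_log"));
        System.out.println(filters.tableFilter().test("inventory.customers")); // true
        System.out.println(filters.tableFilter().test("inventory.audit_log")); // false
    }
}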

step 5

if (!isLocked) {
    // ------------------------------------
    // LOCK TABLES and READ BINLOG POSITION
    // ------------------------------------
    // We were not able to acquire the global read lock, so instead we have to obtain a read lock on each table.
    // This requires different privileges than normal, and also means we can't unlock the tables without
    // implicitly committing our transaction ...
    if (!context.userHasPrivileges("LOCK TABLES")) {
        // We don't have the right privileges
        throw new ConnectException("User does not have the 'LOCK TABLES' privilege required to obtain a "
                + "consistent snapshot by preventing concurrent writes to tables.");
    }
    // We have the required privileges, so try to lock all of the tables we're interested in ...
    logger.info("Step {}: flush and obtain read lock for {} tables (preventing writes)", step++, tableIds.size());
    String tableList = tableIds.stream()
            .map(tid -> quote(tid))
            .reduce((r, element) -> r+ "," + element)
            .orElse(null);
    if (tableList != null) {
        sql.set("FLUSH TABLES " + tableList + " WITH READ LOCK");
        mysql.execute(sql.get());
    }
    lockAcquired = clock.currentTimeInMillis();
    metrics.globalLockAcquired();
    isLocked = true;
    tableLocks = true;

    // Our tables are locked, so read the binlog position ...
    readBinlogPosition(step++, source, mysql, sql);
}

If the global read lock could not be acquired, a table-level read lock is taken on every included table instead (FLUSH TABLES <list> WITH READ LOCK, which requires the LOCK TABLES privilege), and only then is readBinlogPosition called.

step 6

schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);

// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
allTableIds.stream()
           .filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
           .forEach(tableId -> schema.applyDdl(source, tableId.schema(),
                                               "DROP TABLE IF EXISTS " + quote(tableId),
                                               this::enqueueSchemaChanges));

// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
      .filter(Predicates.not(readableDatabaseNames::contains))
      .filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
      .forEach(missingDbName -> schema.applyDdl(source, missingDbName,
                                                "DROP DATABASE IF EXISTS " + quote(missingDbName),
                                                this::enqueueSchemaChanges));

// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
    if (!isRunning()) break;
    String dbName = entry.getKey();
    // First drop, create, and then use the named database ...
    schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + quote(dbName), this::enqueueSchemaChanges);
    schema.applyDdl(source, dbName, "CREATE DATABASE " + quote(dbName), this::enqueueSchemaChanges);
    schema.applyDdl(source, dbName, "USE " + quote(dbName), this::enqueueSchemaChanges);
    for (TableId tableId : entry.getValue()) {
        if (!isRunning()) break;
        sql.set("SHOW CREATE TABLE " + quote(tableId));
        mysql.query(sql.get(), rs -> {
            if (rs.next()) {
                schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
            }
        });
    }
}
context.makeRecord().regenerate();

This step rebuilds the connector's in-memory schema and emits the corresponding schema-change events. First the captured SET system-variables statement is applied. Then a DROP TABLE IF EXISTS is applied for every table that was previously known or just discovered, and a DROP DATABASE IF EXISTS for every database that is no longer readable, so the schema history starts from a clean slate. After that, for each included database the reader applies DROP DATABASE / CREATE DATABASE / USE, runs SHOW CREATE TABLE for each included table, and feeds the returned DDL into schema.applyDdl, enqueuing schema-change records via enqueueSchemaChanges. Finally, context.makeRecord().regenerate() rebuilds the record makers against the freshly captured schema.

step 7

if (minimalBlocking && isLocked) {
    if (tableLocks) {
        // We could not acquire a global read lock and instead had to obtain individual table-level read locks
        // using 'FLUSH TABLE <tableName> WITH READ LOCK'. However, if we were to do this, the 'UNLOCK TABLES'
        // would implicitly commit our active transaction, and this would break our consistent snapshot logic.
        // Therefore, we cannot unlock the tables here!
        // https://dev.mysql.com/doc/refman/5.7/en/flush.html
        logger.info("Step {}: tables were locked explicitly, but to get a consistent snapshot we cannot "
                + "release the locks until we've read all tables.", step++);
    } else {
        // We are doing minimal blocking via a global read lock, so we should release the global read lock now.
        // All subsequent SELECT should still use the MVCC snapshot obtained when we started our transaction
        // (since we started it "...with consistent snapshot"). So, since we're only doing very simple SELECT
        // without WHERE predicates, we can release the lock now ...
        logger.info("Step {}: releasing global read lock to enable MySQL writes", step);
        sql.set("UNLOCK TABLES");
        mysql.execute(sql.get());
        isLocked = false;
        long lockReleased = clock.currentTimeInMillis();
        metrics.globalLockReleased();
        logger.info("Step {}: blocked writes to MySQL for a total of {}", step++,
                    Strings.duration(lockReleased - lockAcquired));
    }
}

If minimal blocking is enabled and a database-level global read lock is held, it is released here so that writes can resume; the MVCC snapshot started in step 2 keeps the subsequent SELECTs consistent. Table-level locks cannot be released yet, because UNLOCK TABLES would implicitly commit the transaction and break the consistent snapshot.

step 8

The code for step 8 is long, so let's go through it piece by piece.

BufferedBlockingConsumer<SourceRecord> bufferedRecordQueue = BufferedBlockingConsumer.bufferLast(super::enqueueRecord);

This wraps the enqueueRecord method. Internally there is a records queue, a BlockingQueue, whose contents are drained when Kafka Connect calls the task's poll() method.

protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}
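
The name bufferLast hints at the purpose: the most recent record is held back one step so it can still be adjusted (for example, marked as the final record of the snapshot) before it reaches the real consumer. A hedged sketch of that idea (not the actual BufferedBlockingConsumer implementation):

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hedged sketch of a "buffer the last element" consumer; Debezium's real class is more involved.
public class BufferLastSketch<T> {
    private final Consumer<T> delegate;          // e.g. super::enqueueRecord
    private final AtomicReference<T> buffered = new AtomicReference<>();

    public BufferLastSketch(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    // Accept a new element; flush the previously buffered one to the delegate.
    public void accept(T next) {
        T previous = buffered.getAndSet(next);
        if (previous != null) {
            delegate.accept(previous);
        }
    }

    // At the end of the snapshot, optionally rewrite the final element
    // (e.g. mark it as "last") and flush it.
    public void close(UnaryOperator<T> finishLast) {
        T last = buffered.getAndSet(null);
        if (last != null) {
            delegate.accept(finishLast.apply(last));
        }
    }
}

Back in execute(), the snapshot then iterates over every table to be captured: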
Iterator<TableId> tableIdIter = tableIds.iterator();
while (tableIdIter.hasNext()) {
    ...
}

The while loop walks through every entry in tableIds, the list of all tables that need a full snapshot; the loop body for each table is shown below.

//1.
RecordsForTable recordMaker = context.makeRecord().forTable(tableId, null, bufferedRecordQueue);
if (recordMaker != null) {

    //2.
    // Switch to the table's database ...
    sql.set("USE " + quote(tableId.catalog()) + ";");
    mysql.execute(sql.get());

    AtomicLong numRows = new AtomicLong(-1);
    AtomicReference<String> rowCountStr = new AtomicReference<>("<unknown>");
    //3.
    StatementFactory statementFactory = this::createStatementWithLargeResultSet;
    if (largeTableCount > 0) {
        try {
            // Choose how we create statements based on the # of rows.
            // This is approximate and less accurate then COUNT(*),
            // but far more efficient for large InnoDB tables.
            sql.set("SHOW TABLE STATUS LIKE '" + tableId.table() + "';");
            mysql.query(sql.get(), rs -> {
                if (rs.next()) numRows.set(rs.getLong(5));
            });
            if (numRows.get() <= largeTableCount) {
                //4.
                statementFactory = this::createStatement;
            }
            rowCountStr.set(numRows.toString());
        } catch (SQLException e) {
            // Log it, but otherwise just use large result set by default ...
            logger.debug("Error while getting number of rows in table {}: {}", tableId, e.getMessage(), e);
        }
    }

    // Scan the rows in the table ...
    long start = clock.currentTimeInMillis();
    logger.info("Step {}: - scanning table '{}' ({} of {} tables)", step, tableId, ++counter, tableIds.size());

    //5.
    Map<TableId, String> selectOverrides = getSnapshotSelectOverridesByTable();
    String selectStatement = selectOverrides.getOrDefault(tableId, "SELECT * FROM " + quote(tableId));
    logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
    sql.set(selectStatement);

    //snapshot
    try {
        int stepNum = step;
        //6.
        mysql.query(sql.get(), statementFactory, rs -> {
            long rowNum = 0;
            try {
                // The table is included in the connector's filters, so process all of the table records
                // ...
                final Table table = schema.tableFor(tableId);
                final int numColumns = table.columns().size();
                final Object[] row = new Object[numColumns];
                while (rs.next()) {
                    for (int i = 0, j = 1; i != numColumns; ++i, ++j) {
                        Column actualColumn = table.columns().get(i);
                        row[i] = readField(rs, j, actualColumn);
                    }
                    recorder.recordRow(recordMaker, row, ts); // has no row number!
                    ++rowNum;
                    if (rowNum % 100 == 0 && !isRunning()) {
                        // We've stopped running ...
                        break;
                    }
                    if (rowNum % 10_000 == 0) {
                        long stop = clock.currentTimeInMillis();
                        logger.info("Step {}: - {} of {} rows scanned from table '{}' after {}",
                                    stepNum, rowNum, rowCountStr, tableId, Strings.duration(stop - start));
                    }
                }

                totalRowCount.addAndGet(rowNum);
                if (isRunning()) {
                    long stop = clock.currentTimeInMillis();
                    logger.info("Step {}: - Completed scanning a total of {} rows from table '{}' after {}",
                                stepNum, rowNum, tableId, Strings.duration(stop - start));
                }
            } catch (InterruptedException e) {
                Thread.interrupted();
                // We were not able to finish all rows in all tables ...
                logger.info("Step {}: Stopping the snapshot due to thread interruption", stepNum);
                interrupted.set(true);
            }
        });
    } finally {
        metrics.completeTable();
        if (interrupted.get()) break;
    }
}
++completedCounter;
  1. First obtain the record maker (recordMaker) for the table; if recordMaker is not null, switch to the database the table belongs to with USE.
  2. Initialize statementFactory; by default it is createStatementWithLargeResultSet, a factory suited to streaming large result sets (see the sketch after this list).
  3. Run SHOW TABLE STATUS LIKE '<table>'; to get an approximate row count. If the count is no larger than the configured large-table threshold (largeTableCount), statementFactory is switched back to the plain createStatement factory.
  4. Build the snapshot SQL: getSnapshotSelectOverridesByTable() is consulted first, so a user-supplied SELECT with extra filter conditions is honoured here.
  5. If the user did not supply a statement, the query falls back to "SELECT * FROM " + quote(tableId).
  6. Execute the SQL. For every row, recorder.recordRow(recordMaker, row, ts) is called, which puts the row into the BlockingQueue, i.e. the records field of AbstractReader.
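createStatementWithLargeResultSet relies on the MySQL Connector/J convention for streaming result sets: a forward-only, read-only statement whose fetch size is Integer.MIN_VALUE, so rows are streamed instead of being buffered in memory. A minimal sketch of the two factories (not the exact Debezium code):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hedged sketch of the two statement factories used by the snapshot.
public class StatementFactorySketch {

    // Plain statement: fine for small tables, the driver buffers the whole result set.
    static Statement createStatement(Connection connection) throws SQLException {
        return connection.createStatement();
    }

    // Streaming statement: MySQL Connector/J streams rows one at a time when the
    // statement is forward-only, read-only, and the fetch size is Integer.MIN_VALUE.
    static Statement createStatementWithLargeResultSet(Connection connection) throws SQLException {
        Statement stmt = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                                    ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(Integer.MIN_VALUE);
        return stmt;
    }
}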

step 9

if (isTxnStarted) {
    if (interrupted.get() || !isRunning()) {
        // We were interrupted or were stopped while reading the tables,
        // so roll back the transaction and return immediately ...
        logger.info("Step {}: rolling back transaction after abort", step++);
        sql.set("ROLLBACK");
        mysql.execute(sql.get());
        metrics.abortSnapshot();
        return;
    }
    // Otherwise, commit our transaction
    logger.info("Step {}: committing transaction", step++);
    sql.set("COMMIT");
    mysql.execute(sql.get());
    metrics.completeSnapshot();
}

The transaction is committed, or rolled back if the snapshot was interrupted or the reader was stopped.

step 10

if (isLocked) {
    if (tableLocks) {
        logger.info("Step {}: releasing table read locks to enable MySQL writes", step++);
    } else {
        logger.info("Step {}: releasing global read lock to enable MySQL writes", step++);
    }
    sql.set("UNLOCK TABLES");
    mysql.execute(sql.get());
    isLocked = false;
    long lockReleased = clock.currentTimeInMillis();
    metrics.globalLockReleased();
    if (tableLocks) {
        logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
    } else {
        logger.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(lockReleased - lockAcquired));
    }
}

Any remaining read locks are released, whether they are the table-level locks from step 5 or a global read lock that is still held.

This completes the analysis of SnapshotReader's core code.


BinlogReader

Here we look at a few important parts of BinlogReader.

BinlogReader initialization

client = new BinaryLogClient(context.hostname(), context.port(), context.username(), context.password());
...
client.registerEventListener(context.bufferSizeForBinlogReader() == 0 ? this::handleEvent : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);
...

A binlog client (BinaryLogClient, from the mysql-binlog-connector-java library) is created and an event listener is registered on it: if no buffer is configured, events go straight to handleEvent, otherwise they pass through an EventBuffer first. The core of handleEvent is:

...
eventHandlers.getOrDefault(eventType, this::ignoreEvent).accept(event);
...

This one line is the key: when the client delivers an event, the handler registered in the eventHandlers map for that event type is invoked (falling back to ignoreEvent for unhandled types). The handlers themselves are registered in doStart, shown further below.
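For context, a minimal standalone usage sketch of BinaryLogClient itself (host and credentials are placeholders); Debezium wires considerably more around it:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;

// Hedged sketch: using BinaryLogClient directly, outside of Debezium.
public class BinlogClientSketch {
    public static void main(String[] args) throws Exception {
        BinaryLogClient client =
                new BinaryLogClient("localhost", 3306, "replicator", "secret");
        client.registerEventListener((Event event) ->
                System.out.println("received binlog event: " + event.getHeader().getEventType()));
        client.connect(); // blocks and keeps reading the binlog stream
    }
}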

EventDeserializer eventDeserializer = new EventDeserializer() {
    @Override
    public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
        try {
            // Delegate to the superclass ...
            Event event = super.nextEvent(inputStream);

            // We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
            if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
                TableMapEventData tableMapEvent = event.getData();
                tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
            }
            return event;
        }
        // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
        catch(EventDataDeserializationException edde) {
            EventHeaderV4 header = new EventHeaderV4();
            header.setEventType(EventType.INCIDENT);
            header.setTimestamp(edde.getEventHeader().getTimestamp());
            header.setServerId(edde.getEventHeader().getServerId());

            if(edde.getEventHeader() instanceof EventHeaderV4) {
                header.setEventLength(((EventHeaderV4)edde.getEventHeader()).getEventLength());
                header.setNextPosition(((EventHeaderV4)edde.getEventHeader()).getNextPosition());
                header.setFlags(((EventHeaderV4)edde.getEventHeader()).getFlags());
            }

            EventData data = new EventDataDeserializationExceptionData(edde);
            return new Event(header, data);
        }
    }
};
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
                                           new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
                                           new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
                                           new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
                                           new RowDeserializers.WriteRowsDeserializer(
                                                   tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
                                           new RowDeserializers.UpdateRowsDeserializer(
                                                   tableMapEventByTableId).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
                                           new RowDeserializers.DeleteRowsDeserializer(
                                                   tableMapEventByTableId).setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer);

The client's event deserializer is configured here. nextEvent is overridden so that the most recent TableMapEventData is remembered per table number (the custom row deserializers need it) and so that an event which cannot be deserialized is turned into a pseudo INCIDENT event instead of failing (DBZ-217). Then, for each binlog event type, a dedicated data deserializer is registered, for example the RowDeserializers that know how to decode column types such as Date and Datetime.

The BinlogReader doStart method

eventHandlers.put(EventType.STOP, this::handleServerStop);
eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);

A handler is registered for each binlog event type. Insert, update, and delete changes are handled by handleInsert, handleUpdate, and handleDelete respectively; these three methods turn the row images into records and add them to AbstractReader's BlockingQueue (records), just like the snapshot does. A hedged sketch of the idea follows.
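A hedged sketch of what such a handler does for a WRITE_ROWS event (the queue and the enqueued objects here are simplified stand-ins for Debezium's RecordMakers machinery):

import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

// Hedged sketch of an insert handler; Debezium's handleInsert maps each row
// through the table's RecordMaker before enqueuing, which is omitted here.
public class InsertHandlerSketch {
    private final BlockingQueue<Object> records = new LinkedBlockingQueue<>();

    void handleInsert(Event event) throws InterruptedException {
        WriteRowsEventData data = event.getData();
        long tableNumber = data.getTableId(); // resolved to a TableId via the last TABLE_MAP event
        for (Serializable[] row : data.getRows()) {
            // In Debezium, the row is converted into a SourceRecord here;
            // we simply enqueue the raw row for illustration.
            records.put(new Object[] { tableNumber, row });
        }
    }
}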

Summary

At this point the logic behind Debezium's full snapshot and incremental subscription should be clear.
