
[debezium source code analysis] How MySqlConnectorTask starts up and pulls data

Programmer Article Site (程序员文章站), 2024-03-11 17:28:13
...

The poll method of MySqlConnectorTask fetches the captured change data and returns it to Kafka Connect, which writes it into Kafka.

The start method

We begin with MySqlConnectorTask.start. Here is part of its code:

...
this.taskContext.start();
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
...

start() calls taskContext.start(). That method calls MySqlSchema.start(), which in turn calls DatabaseHistory.start(). The default DatabaseHistory implementation is KafkaDatabaseHistory, and its start() method creates a KafkaProducer instance.
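A few stand-in classes can model this delegation chain. The following is a hypothetical sketch that models only the call order. TaskContextStub, SchemaStub and KafkaHistoryStub are illustrative names, not Debezium classes. The producerCreated flag stands in for the KafkaProducer that the real KafkaDatabaseHistory.start() creates.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the task context -> schema -> database history chain.
// The real Debezium classes take configuration objects; these only record call order.
final class StartChainSketch {

    interface DatabaseHistory {
        void start();
    }

    // Models KafkaDatabaseHistory: its start() is where the producer would be created.
    static final class KafkaHistoryStub implements DatabaseHistory {
        private final List<String> calls;
        boolean producerCreated = false; // stands in for creating a KafkaProducer

        KafkaHistoryStub(List<String> calls) { this.calls = calls; }

        @Override
        public void start() {
            calls.add("DatabaseHistory.start");
            producerCreated = true;
        }
    }

    static final class SchemaStub {
        private final List<String> calls;
        private final DatabaseHistory history;

        SchemaStub(List<String> calls, DatabaseHistory history) {
            this.calls = calls;
            this.history = history;
        }

        void start() {
            calls.add("MySqlSchema.start");
            history.start(); // the schema delegates to the database history
        }
    }

    static final class TaskContextStub {
        private final List<String> calls;
        private final SchemaStub schema;

        TaskContextStub(List<String> calls, SchemaStub schema) {
            this.calls = calls;
            this.schema = schema;
        }

        void start() {
            calls.add("taskContext.start");
            schema.start(); // the task context delegates to the schema
        }
    }
}
```

Because each start() only forwards to the next component, one call to taskContext.start() initializes the whole chain down to the history producer.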

Continuing with MySqlConnectorTask.start:

// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
    // Set the position in our source info ...
    source.setOffset(offsets);
    logger.info("Found existing offset: {}", offsets);

    // Before anything else, recover the database history to the specified binlog coordinates ...
    taskContext.loadHistory(source);

    if (source.isSnapshotInEffect()) {
        // The last offset was an incomplete snapshot that we cannot recover from...
        if (taskContext.isSnapshotNeverAllowed()) {
            // No snapshots are allowed
            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
                    + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
            throw new ConnectException(msg);
        }
        // Otherwise, restart a new snapshot ...
        startWithSnapshot = true;
        logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
    } else {
        // No snapshot was in effect, so we should just start reading from the binlog ...
        startWithSnapshot = false;

        // But check to see if the server still has those binlog coordinates ...
        if (!isBinlogAvailable()) {
            if (!taskContext.isSnapshotAllowedWhenNeeded()) {
                String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
                        + "available on the server. Reconfigure the connector to use a snapshot when needed.";
                throw new ConnectException(msg);
            }
            startWithSnapshot = true;
        }
    }

} else {
    // We have no recorded offsets ...
    if (taskContext.isSnapshotNeverAllowed()) {
        // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
        // full history of the database.
        logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
        source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
        taskContext.initializeHistory();

        // Look to see what the first available binlog file is called, and whether it looks like binlog files have
        // been purged. If so, then output a warning ...
        String earliestBinlogFilename = earliestBinlogFilename();
        if (earliestBinlogFilename == null) {
            logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
        } else if (!earliestBinlogFilename.endsWith("00001")) {
            logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
        }
    } else {
        // We are allowed to use snapshots, and that is the best way to start ...
        startWithSnapshot = true;
        // The snapshot will determine if GTIDs are set
        logger.info("Found no existing offset, so preparing to perform a snapshot");
        // The snapshot will also initialize history ...
    }
}

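First, the offsets != null branch. The stored offset is copied into SourceInfo, and the database history is recovered up to those binlog coordinates. Suppose the last offset was recorded partway through a snapshot. That snapshot cannot be resumed. If snapshots are never allowed (snapshot.mode=never), a ConnectException is thrown; otherwise startWithSnapshot is set to true and a new snapshot is taken. If no snapshot was in progress, startWithSnapshot stays false and the connector continues reading from the binlog, provided the server still has the recorded binlog position. If that position is no longer available, the connector throws a ConnectException unless it is configured to take a snapshot when needed (snapshot.mode=when_needed). In that case startWithSnapshot is set to true.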
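The decision logic of this branch can be condensed into a pure function. This sketch reduces each check to a boolean. The parameters stand in for source.isSnapshotInEffect(), taskContext.isSnapshotNeverAllowed(), isBinlogAvailable() and taskContext.isSnapshotAllowedWhenNeeded(). IllegalStateException replaces Kafka Connect's ConnectException so the example has no dependencies. ExistingOffsetDecision is a hypothetical name.

```java
// A pure-function model of the "offsets != null" branch of MySqlConnectorTask.start.
// Returns the value of startWithSnapshot, or throws where the real code throws ConnectException.
final class ExistingOffsetDecision {

    static boolean startWithSnapshot(boolean snapshotInEffect,
                                     boolean snapshotNeverAllowed,
                                     boolean binlogAvailable,
                                     boolean snapshotAllowedWhenNeeded) {
        if (snapshotInEffect) {
            // An interrupted snapshot cannot be resumed; it must be redone if allowed.
            if (snapshotNeverAllowed) {
                throw new IllegalStateException("incomplete snapshot, but snapshots are never allowed");
            }
            return true;
        }
        // No snapshot in progress: keep reading the binlog if the position still exists.
        if (!binlogAvailable) {
            if (!snapshotAllowedWhenNeeded) {
                throw new IllegalStateException("binlog position no longer available on the server");
            }
            return true; // fall back to a new snapshot
        }
        return false;
    }
}
```

Writing the branch this way makes the four outcomes (resume binlog, redo snapshot, snapshot when needed, fail) easy to enumerate in a test.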

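Next, the offsets == null branch. A null offset means no offset has been stored for this connector: no earlier Debezium task with the same name has run. If the configuration does not allow snapshots, the connector assumes that the binlog contains the full history of the database. It starts reading at position 0 of the binlog and initializes the database history. It then calls earliestBinlogFilename() to get the name of the earliest binlog file on the server, so that it can warn when no binlog exists or when some binlogs appear to have been purged.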

protected String earliestBinlogFilename() {
    // Accumulate the available binlog filenames ...
    List<String> logNames = new ArrayList<>();
    try {
        logger.info("Checking all known binlogs from MySQL");
        taskContext.jdbc().query("SHOW BINARY LOGS", rs -> {
            while (rs.next()) {
                logNames.add(rs.getString(1));
            }
        });
    } catch (SQLException e) {
        throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
    }

    if (logNames.isEmpty()) return null;
    return logNames.get(0);
}
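The warning logic that uses this method's result can be tested without a MySQL server. In this hypothetical sketch, BinlogPurgeCheck and its Status enum are illustrative names. The input list plays the role of the first column of SHOW BINARY LOGS, in the order the server returns it. The check is only a heuristic based on the file-name suffix.

```java
import java.util.List;

// Models the warnings issued after earliestBinlogFilename() in the
// "no offsets, snapshots never allowed" branch.
final class BinlogPurgeCheck {

    enum Status { NO_BINLOG, POSSIBLY_PURGED, LOOKS_COMPLETE }

    static Status check(List<String> logNames) {
        // Same rule as earliestBinlogFilename(): the first row is treated as the earliest file.
        String earliest = logNames.isEmpty() ? null : logNames.get(0);
        if (earliest == null) {
            return Status.NO_BINLOG;        // "No binlog appears to be available..."
        }
        if (!earliest.endsWith("00001")) {
            return Status.POSSIBLY_PURGED;  // "It is possible the server has purged some binlogs..."
        }
        return Status.LOOKS_COMPLETE;
    }
}
```

For example, a server whose first file is mysql-bin.000007 would trigger the purge warning, while one starting at mysql-bin.000001 would not.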

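If snapshots are allowed, startWithSnapshot is set to true. The snapshot will then determine the GTID set and initialize the database history.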

Here is the rest of MySqlConnectorTask.start:

// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();

// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
    // We're supposed to start with a snapshot, so set that up ...
    SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
    snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
    if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
    readers.add(snapshotReader);

    if (taskContext.isInitialSnapshotOnly()) {
        logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
        readers.add(new BlockingReader("blocker"));
        readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
    } else {
        if (!rowBinlogEnabled) {
            throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
                    + "required for this connector to work properly. Change the MySQL configuration to use a "
                    + "row-level binlog and restart the connector.");
        }
        readers.add(binlogReader);
    }
} else {
    if (!rowBinlogEnabled) {
        throw new ConnectException(
                "The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
    }
    // We're going to start by reading the binlog ...
    readers.add(binlogReader);
}

// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();

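First, what the two main readers do: 1) BinlogReader streams the latest changes from the binlog; 2) SnapshotReader reads the full existing data set (the snapshot).
If startWithSnapshot is true and the connector is not in initial_only mode, a SnapshotReader followed by a BinlogReader is added to readers, so streaming begins once the snapshot completes. If startWithSnapshot is true in initial_only mode, a SnapshotReader followed by a BlockingReader is added instead. The BlockingReader produces no records and simply blocks after the snapshot finishes, because the user configured initial_only mode. If startWithSnapshot is false, only a BinlogReader is added. In every case that uses a BinlogReader, the MySQL server must have a row-level binlog enabled; otherwise a ConnectException is thrown.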
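The rules above can be summarized as a small function that returns the order of readers in the chain. In this sketch, ReaderPlan is a hypothetical name. The readers appear as the same names the real code passes to their constructors ("snapshot", "binlog", "blocker"), and IllegalStateException replaces ConnectException.

```java
import java.util.ArrayList;
import java.util.List;

// Models which readers MySqlConnectorTask.start adds to the ChainedReader, in order.
final class ReaderPlan {

    static List<String> readers(boolean startWithSnapshot,
                                boolean initialSnapshotOnly,
                                boolean rowBinlogEnabled) {
        List<String> chain = new ArrayList<>();
        if (startWithSnapshot) {
            chain.add("snapshot");
            if (initialSnapshotOnly) {
                // initial_only: block after the snapshot instead of streaming the binlog.
                chain.add("blocker");
                return chain;
            }
        }
        // Every remaining path streams the binlog, which requires a row-level binlog.
        if (!rowBinlogEnabled) {
            throw new IllegalStateException("row-level binlog is required");
        }
        chain.add("binlog");
        return chain;
    }
}
```

The initial_only path is the only one that skips the row-level binlog check, which matches the original code.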

The poll method

Here is the code of MySqlConnectorTask.poll:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    Reader currentReader = readers;
    if (currentReader == null) {
        return null;
    }
    PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
    try {
        logger.trace("Polling for events");
        return currentReader.poll();
    } finally {
        prevLoggingContext.restore();
    }
}
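Because readers is a ChainedReader, the poll() above goes through the chain rather than straight to a single reader. The code below is a simplified, hypothetical model of that delegation, not Debezium's actual ChainedReader (ChainSketch and SimpleReader are illustrative names). Each reader returns a batch, or null once it has finished, and the chain then moves on to the next reader. This convention mirrors AbstractReader.poll, which returns null after a successful completion.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Simplified model of a chained reader: poll() delegates to the current reader
// and advances to the next one when the current reader reports completion (null).
final class ChainSketch {

    interface SimpleReader {
        List<String> poll(); // null means "this reader is finished"
    }

    static final class Chain implements SimpleReader {
        private final Deque<SimpleReader> remaining;

        Chain(List<SimpleReader> readers) {
            this.remaining = new ArrayDeque<>(readers);
        }

        @Override
        public List<String> poll() {
            while (!remaining.isEmpty()) {
                List<String> batch = remaining.peekFirst().poll();
                if (batch != null) {
                    return batch;      // the current reader produced records
                }
                remaining.pollFirst(); // the current reader finished; advance the chain
            }
            return null;               // every reader in the chain has finished
        }
    }
}
```

With a snapshot reader followed by a binlog reader, the caller sees the snapshot batches first and then the binlog batches, without knowing when the switch happened.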

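currentReader is the ChainedReader built in start(), so currentReader.poll() delegates to the reader currently active in the chain. For SnapshotReader and BinlogReader, that call ends up in AbstractReader.poll: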

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Before we do anything else, determine if there was a failure and throw that exception ...
    failureException = this.failure.get();
    if (failureException != null) {
        // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
        // will then explicitly stop the connector task. Most likely, however, the reader that threw
        // the exception will have already stopped itself and will generate no additional records.
        // Regardless, there may be records on the queue that will never be consumed.
        throw failureException;
    }

    // this reader has been stopped before it reached the success or failed end state, so clean up and abort
    if (!running.get()) {
        cleanupResources();
        throw new InterruptedException( "Reader was stopped while polling" );
    }

    logger.trace("Polling for next batch of records");
    List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
    while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
        // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
        metronome.pause();

        // Check for failure after waking up ...
        failureException = this.failure.get();
        if (failureException != null) throw failureException;
    }

    if (batch.isEmpty() && success.get() && records.isEmpty()) {
        // We found no records but the operation completed successfully, so we're done
        this.running.set(false);
        cleanupResources();
        return null;
    }
    pollComplete(batch);
    logger.trace("Completed batch of {} records", batch.size());
    return batch;
}

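Note the batch variable: it holds the records that poll() eventually returns. The list is filled from records, a BlockingQueue, via drainTo, at most maxBatchSize records at a time. While the queue is empty and the reader has neither stopped nor completed, the loop pauses with metronome.pause() and then checks for failures again. The records queue itself is filled by enqueueRecord: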

protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}

This method is called by SnapshotReader and BinlogReader, the two readers that collect and parse the data. That completes the picture of how Kafka Connect calls poll to pull data.
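The hand-off between enqueueRecord and AbstractReader.poll is a classic producer/consumer queue. Below is a minimal sketch of that pattern under a few assumptions. Strings replace SourceRecords, a fixed Thread.sleep replaces metronome.pause(), and an ArrayBlockingQueue is chosen for illustration. QueueHandOff is a hypothetical name, and failure handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Producer/consumer hand-off modeled on enqueueRecord and AbstractReader.poll.
final class QueueHandOff {
    private final BlockingQueue<String> records;
    private final int maxBatchSize;
    private final AtomicBoolean success = new AtomicBoolean(false);

    QueueHandOff(int maxQueueSize, int maxBatchSize) {
        this.records = new ArrayBlockingQueue<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
    }

    // Producer side, like enqueueRecord: put() blocks while the queue is full.
    void enqueue(String record) throws InterruptedException {
        if (record != null) {
            records.put(record);
        }
    }

    // Called by the producer once it has nothing more to emit.
    void markCompleted() {
        success.set(true);
    }

    // Consumer side, like AbstractReader.poll: drain up to maxBatchSize records,
    // waiting while the queue is empty and the producer has not completed.
    List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>(maxBatchSize);
        while (records.drainTo(batch, maxBatchSize) == 0 && !success.get()) {
            Thread.sleep(10); // nothing available yet; wait briefly and retry
        }
        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            return null; // completed and fully drained
        }
        return batch;
    }
}
```

Because put() blocks when the bounded queue is full, a slow consumer naturally throttles the reader threads, and poll() hands out records in batches no larger than maxBatchSize.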

Tags: debezium