maxwell + kafka + bireme: a real-time sync solution for a traditional data warehouse

程序员文章站 2022-07-14 21:47:52
...

1. Enable binlog on MySQL (MariaDB)

SHOW VARIABLES LIKE '%log_bin%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF   |
+---------------+-------+
-- OFF means binlog is not enabled; ON means it is already enabled

1.1 Enable binlog

Edit my.cnf: for MySQL it is usually /etc/my.cnf; for MariaDB it is usually /etc/my.cnf.d/server.cnf.

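# Add the following to the [mysqld] section
[mysqld]
log-bin = mysql-bin #enable binlog
binlog-format = ROW #use row-based logging
server_id = 1 #required for replication; must differ from the server id used by any binlog client (e.g. canal's slaveId, or Maxwell's replica_server_id)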
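Before restarting, the [mysqld] settings above can be sanity-checked with a short script. This is a minimal sketch using Python's standard configparser; check_binlog_config is a hypothetical helper name, and it assumes a plain option file (it does not follow !include or !includedir directives).

```python
import configparser


def check_binlog_config(text: str) -> list:
    """Return problems found in the [mysqld] section (empty list means OK).

    Hypothetical helper: only checks the three options discussed above.
    """
    parser = configparser.ConfigParser(
        allow_no_value=True,             # a bare "log_bin" line is valid
        inline_comment_prefixes=("#",),  # "log-bin = mysql-bin #comment"
        interpolation=None,
        strict=False,
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as the same option.
    opts = {k.replace("-", "_"): (v or "") for k, v in parser.items("mysqld")}
    problems = []
    if "log_bin" not in opts:
        problems.append("log_bin is not set")
    if opts.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format must be ROW")
    if not opts.get("server_id"):
        problems.append("server_id is not set")
    return problems


sample = """
[mysqld]
log-bin = mysql-bin #enable binlog
binlog-format = ROW #row mode
server_id = 1
"""
print(check_binlog_config(sample))  # []
```

It only checks that the options are present and that the format is ROW; SHOW VARIABLES after the restart remains the authoritative check.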

Restart the database after the change:

service mysql restart

Verify that binlog is enabled:

show global variables like "%log_bin%";
+---------------------------------+---------------------------------------+
| Variable_name                   | Value                                 |
+---------------------------------+---------------------------------------+
| log_bin                         | ON                                    |
| log_bin_basename                | /usr/local/mysql/data/mysql-bin       |
| log_bin_index                   | /usr/local/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF                                   |
| log_bin_use_v1_row_events       | OFF                                   |
+---------------------------------+---------------------------------------+
SHOW VARIABLES LIKE '%log_bin%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
-- binlog is now enabled

1.2 Create the user and table

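-- Create a privileged MySQL account for Maxwell
-- (GRANT ... IDENTIFIED BY works on MySQL 5.x and MariaDB; on MySQL 8.0 run CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell'; first and drop IDENTIFIED BY from the GRANT)
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
-- Create a test table
CREATE TABLE tb1(a int, b char(10), primary key(a));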

2. Maxwell

Maxwell connects to MySQL as a replica, reads the binlog itself (it does not use canal), and sends each row change to Kafka as JSON.

  1. Maxwell download page

  2. Maxwell official documentation

  3. Extract: tar xf maxwell.tar.gz

  4. Edit the configuration file: cp config.properties.example config.properties && vi config.properties

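    # input: MySQL binlog
    # output: Kafka
    # notes on the settings
    1) kafka_topic
    # can be a template such as namespace_%{database}_%{table}; %{database} and %{table} are replaced with the actual names
    2) kafka_version
    # must match the version of your Kafka cluster
    3) additional Kafka producer settings
    # kafka.acks, kafka.compression.type, kafka.retries
    4) filter
    # can exclude databases or tables, or filter out specific rows; a JavaScript snippet can also be used for custom processing
    # e.g. exclude: test_maxwell.user_info.userid = 1 excludes rows with userid = 1 in table user_info of database test_maxwell
    5) monitor
    # monitoring can be exposed via JMX, HTTP, etc.
    # http_bind_address: IP address the monitoring endpoint binds to
    # http_port: port the monitoring endpoint binds to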
    #metrics_type=http
    #metrics_jvm=true
    #http_bind_address=localhost
    #http_port=8111
    #http_path_prefix=maxwell
    #http_diagnostic=true
    #http_diagnostic_timeout=10000
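    # check Maxwell's health (the URLs below use the path prefix test_maxwell; substitute your own http_path_prefix)
    http://localhost:8111/test_maxwell/healthcheck
    # messages successfully sent to Kafka, failed sends, rows processed from the binlog, binlog processing rate, JVM status
    http://localhost:8111/test_maxwell/metrics
    # returns the string pong if Maxwell is reachable
    http://localhost:8111/test_maxwell/ping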
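The kafka_topic placeholders are filled in per row event, which is what routes each table to its own topic. The sketch below models only that substitution; resolve_topic is a hypothetical name and not part of Maxwell, which does this internally.

```python
import re

# Matches the two placeholders described above: %{database} and %{table}.
_PLACEHOLDER = re.compile(r"%\{(database|table)\}")


def resolve_topic(template: str, event: dict) -> str:
    """Expand %{database} and %{table} in a kafka_topic template for one event."""
    return _PLACEHOLDER.sub(lambda m: str(event[m.group(1)]), template)


event = {"database": "sampledata", "table": "tb1", "type": "insert"}
print(resolve_topic("namespace_%{database}_%{table}", event))
# namespace_sampledata_tb1
```

A template without placeholders, such as kafka_topic=maxwell, sends every table to the same topic.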
    

  5. Bootstrap (initial full load)

Maxwell also supports initializing the tables to be synchronized. For this it provides the command-line tool maxwell-bootstrap, which performs the full load with SELECT * FROM table and therefore does not generate extra binlog.
bin/maxwell-bootstrap --user maxwell --password maxwell --host 127.0.0.1 --database test --table tb2 --client_id maxwell
# when starting Maxwell, pass the same client_id
bin/maxwell --client_id=maxwell

Alternatively, trigger a bootstrap manually by inserting a row into the maxwell.bootstrap table:

INSERT INTO maxwell.bootstrap (database_name, table_name,where_clause) VALUES ('test', 'tb2','1=1');

2.1 Output to the console

# config.properties
log_level=info
producer=stdout
# mysql login info
host=localhost
port=3306
user=maxwell
password=maxwell

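Start Maxwell:

bin/maxwell --config config.properties
# or: bin/maxwell --host='localhost' --port=3306 --user='maxwell' --password='maxwell' --producer=stdout --log_level=INFO

By default, Maxwell creates a database named maxwell on the source server to store its state (binlog positions, schemas, bootstrap tasks, heartbeats):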

mysql> show tables;
+-------------------+
| Tables_in_maxwell |
+-------------------+
| bootstrap         |
| columns           |
| databases         |
| heartbeats        |
| positions         |
| schemas           |
| tables            |
+-------------------+

Insert data:

insert into tb1 values(1, 'a');
insert into tb1 values(2, 'b');

Console output:

bin/maxwell --host='localhost' --port=3306 --user='root' --password='root' --producer=stdout 
# with maxwell-bootstrap, add --client_id=maxwell; not needed when the bootstrap is triggered manually via the maxwell.bootstrap table
Using kafka version: 1.0.0
10:58:36,398 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
10:58:36,676 INFO  Maxwell - Maxwell v1.24.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000007:956390], lastHeartbeat=1584672514512]
10:58:36,843 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000007:917665], lastHeartbeat=0])
10:58:36,906 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000007:956390
10:58:36,921 INFO  BinaryLogClient - Connected to localhost:3306 at mysql-bin.000007/956390 (sid:6379, cid:79)
10:58:36,921 INFO  BinlogConnectorLifecycleListener - Binlog connected.
## full-load (bootstrap) output
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":10,"b":"10","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":11,"b":"100000000","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":12,"b":"1000000000","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":14,"b":"1000000000","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":20,"b":"123","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":21,"b":"123","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":22,"b":"123","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":23,"b":"1234","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-complete","ts":1584716273,"data":{}}
22:57:53,010 INFO  SynchronousBootstrapper - bootstrapping ended for #1 sampledata.tb2 WHERE 1=1
# incremental output
{"database":"sampledata","table":"tb1","type":"insert","ts":1584672522,"xid":18003,"commit":true,"data":{"a":1,"b":"a"}}
{"database":"sampledata","table":"tb1","type":"insert","ts":1584672522,"xid":18004,"commit":true,"data":{"a":2,"b":"b"}}
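When the same stream carries both the bootstrap snapshot and live changes, a consumer can tell them apart by the type field. A minimal sketch, assuming the console output above is read line by line; split_events is a hypothetical name, and only the type values that appear in that output are handled.

```python
import json


def split_events(lines):
    """Split Maxwell output lines into full-load rows, incremental changes,
    and the tables whose bootstrap finished. Non-JSON log lines are skipped."""
    bootstrap_rows, changes, completed = [], [], []
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # e.g. "... INFO SynchronousBootstrapper - bootstrapping ended ..."
        ev = json.loads(line)
        kind = ev["type"]
        if kind == "bootstrap-insert":
            bootstrap_rows.append(ev["data"])
        elif kind == "bootstrap-complete":
            completed.append(f'{ev["database"]}.{ev["table"]}')
        elif kind in ("insert", "update", "delete"):
            changes.append((kind, ev["data"]))
    return bootstrap_rows, changes, completed
```

Keeping the two kinds apart lets the snapshot be loaded in bulk while incremental changes are applied in order.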

2.2 Output to Kafka

2.2.1 Kafka topic

# Create the topic (the --zookeeper option applies to Kafka versions before 3.0)
bin/kafka-topics.sh --create --zookeeper hdp-m-1.data.dc.zjft.com:2181,hdp-m-2.data.dc.zjft.com:2181,hdp-e-1.data.dc.zjft.com:2181 --replication-factor 1 --partitions 1 --topic maxwell
# List all topics
bin/kafka-topics.sh --list --zookeeper hdp-m-1.data.dc.zjft.com:2181,hdp-m-2.data.dc.zjft.com:2181,hdp-e-1.data.dc.zjft.com:2181
# Start a console consumer
bin/kafka-console-consumer.sh --topic maxwell --bootstrap-server hdp-s-1.data.dc.zjft.com:6667

2.2.2 Configuration

# config.properties
log_level=info

producer=kafka

kafka_topic=maxwell
kafka.bootstrap.servers=hdp-s-1.data.dc.zjft.com:6667

# mysql login info
host=localhost
port=3306
user=maxwell
password=maxwell

Run:

bin/maxwell --config config.properties
# if the topic does not exist, Maxwell reports: Error while fetching metadata with correlation id 379 : {<topic>=INVALID_TOPIC_EXCEPTION}
# if there is no error, the consumer receives the data
# once rows are inserted, updated or deleted in MySQL:
bin/kafka-console-consumer.sh --topic maxwell --bootstrap-server hdp-s-1.data.dc.zjft.com:6667
{"database":"sampledata","table":"tb1","type":"insert","ts":1584673396,"xid":18339,"commit":true,"data":{"a":3,"b":"c"}}
{"database":"sampledata","table":"tb1","type":"update","ts":1584673750,"xid":18812,"commit":true,"data":{"a":3,"b":"d"},"old":{"b":"c"}}
{"database":"sampledata","table":"tb1","type":"update","ts":1584673774,"xid":18847,"commit":true,"data":{"a":1,"b":"d"},"old":{"b":"a"}}
{"database":"sampledata","table":"tb1","type":"delete","ts":1584673810,"xid":18899,"commit":true,"data":{"a":2,"b":"b"}}
{"database":"sampledata","table":"tb1","type":"delete","ts":1584673827,"xid":18925,"commit":true,"data":{"a":1,"b":"d"}}
{"database":"sampledata","table":"tb1","type":"insert","ts":1584673921,"xid":19052,"commit":true,"data":{"a":1,"b":"1"}}
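A downstream consumer has to fold these insert, update and delete events into the target table. Bireme's README describes applying changes with DELETE + COPY rather than row-by-row INSERT/UPDATE/DELETE; the sketch below is a simplified, hypothetical model of that idea, not Bireme's code. It assumes column a is the primary key of tb1 and that each line is one Maxwell JSON event.

```python
import json


def merge_events(lines, pk=("a",)):
    """Fold Maxwell insert/update/delete events into one DELETE + COPY batch.

    Returns (keys_to_delete, rows_to_copy). `pk` names the primary-key
    columns (assumed here to be column "a" of tb1).
    """
    delete_keys = set()
    rows = {}  # primary key -> latest full row image
    for line in lines:
        ev = json.loads(line)
        kind, data = ev["type"], ev.get("data", {})
        if kind not in ("insert", "update", "delete"):
            continue  # bootstrap markers, DDL, etc. are out of scope here
        key = tuple(data[c] for c in pk)
        if kind == "update":
            # "old" only lists changed columns; if the key changed, delete the old key too.
            old = ev.get("old", {})
            old_key = tuple(old.get(c, data[c]) for c in pk)
            delete_keys.add(old_key)
            rows.pop(old_key, None)
        delete_keys.add(key)
        if kind == "delete":
            rows.pop(key, None)  # row is gone: delete it, copy nothing back
        else:
            rows[key] = data     # insert/update: delete old version, copy new one
    return delete_keys, list(rows.values())


events = [
    '{"type":"insert","data":{"a":3,"b":"c"}}',
    '{"type":"update","data":{"a":3,"b":"d"},"old":{"b":"c"}}',
    '{"type":"delete","data":{"a":2,"b":"b"}}',
]
keys, copy_rows = merge_events(events)
print(sorted(keys))  # [(2,), (3,)]
print(copy_rows)     # [{'a': 3, 'b': 'd'}]
```

Applied to the six events above, this yields keys 1, 2 and 3 to delete and rows (3, 'd') and (1, '1') to copy, which matches the 3 | d and 1 | 1 rows in the PostgreSQL query at the end.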

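3. Bireme

Requirements for running Bireme:

  • JAVA_HOME set
  • yum install jsvc

With this version of Bireme, startup fails with: Cannot find daemon loader org/apache/commons/daemon/support/DaemonLoader

The cause is that the Bireme jar is not on the classpath; the workaround is to download the prebuilt Bireme release directly.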

  1. 下载bireme

  2. Extract: tar xf bireme-2.0.0-alpha-1.tar.gz

  3. Edit the configuration files: vim etc/config.properties etc/maxwell1.properties

# etc/config.properties

target.url = jdbc:postgresql://10.34.11.141:2345/postgres
target.user = pentaho_user
target.passwd = cloud.Zijin

# data source name list, separated by comma.
data_source = maxwell1

# type of data source "maxwell1"
maxwell1.type = maxwell
# kafka server which maxwell write binlog into.
maxwell1.kafka.server = hdp-s-1.data.dc.zjft.com:6667
# kafka topic which maxwell write binlog into.
maxwell1.kafka.topic = maxwell
# kafka groupid used for consumer.
maxwell1.kafka.groupid = bireme

# set the IP address for bireme state server.
state.server.addr = 0.0.0.0
# set the port for bireme state server.
state.server.port = 8080

The name maxwell1 in maxwell1.properties must match the data_source name in Bireme's config.properties.

# etc/maxwell1.properties (maps MySQL database.table to PostgreSQL schema.table)
test.tb1 = public.tb1
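The two files have to agree: every name listed in data_source needs its own keys in config.properties and its own <name>.properties mapping file. The sketch below checks the keys and shows how a mapping line such as test.tb1 = public.tb1 resolves a Maxwell event to a target table. parse_properties, check_data_sources and target_table are hypothetical helpers, not Bireme APIs, and the reader handles only key = value lines, not the full Java .properties syntax.

```python
def parse_properties(text: str) -> dict:
    """Minimal reader for 'key = value' lines; skips blank and '#' lines.

    Sketch only: ignores ':' separators and line continuations.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_data_sources(config: dict) -> list:
    """List per-source keys (as used in config.properties above) that are missing."""
    names = [n.strip() for n in config.get("data_source", "").split(",") if n.strip()]
    if not names:
        return ["data_source is empty"]
    problems = []
    for name in names:
        for suffix in ("type", "kafka.server", "kafka.topic", "kafka.groupid"):
            if f"{name}.{suffix}" not in config:
                problems.append(f"missing {name}.{suffix}")
    return problems


def target_table(table_map: dict, event: dict):
    """PostgreSQL table for a Maxwell event, or None if '<db>.<table>' is unmapped."""
    return table_map.get(f'{event["database"]}.{event["table"]}')


config = parse_properties("""
data_source = maxwell1
maxwell1.type = maxwell
maxwell1.kafka.server = hdp-s-1.data.dc.zjft.com:6667
maxwell1.kafka.topic = maxwell
maxwell1.kafka.groupid = bireme
""")
table_map = parse_properties("test.tb1 = public.tb1")  # etc/maxwell1.properties
print(check_data_sources(config))                                          # []
print(target_table(table_map, {"database": "test", "table": "tb1"}))        # public.tb1
print(target_table(table_map, {"database": "sampledata", "table": "tb1"}))  # None
```

With this mapping, the sample events shown earlier, which come from database sampledata, would not match test.tb1; the left-hand side has to name the database that Maxwell actually reports.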

If the PostgreSQL table does not exist, Bireme fails with: cn.hashdata.bireme.BiremeException: Greenplum table and MySQL table size are inconsistent!
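This error, and the column rules in the steps below, come down to the source and target definitions disagreeing. One way to catch that before starting Bireme is to compare the column lists, for example as read from information_schema.columns on both sides. A minimal sketch; compare_columns and the small type-equivalence table are assumptions for illustration, not Bireme's own check.

```python
# Assumed equivalences between MySQL and PostgreSQL type spellings (extend as needed).
_EQUIVALENT = {"int": "integer", "int(11)": "integer"}


def _norm(type_name: str) -> str:
    t = type_name.strip().lower()
    return _EQUIVALENT.get(t, t)


def compare_columns(mysql_cols: dict, pg_cols: dict) -> list:
    """Compare {column: type} maps of the MySQL source and PostgreSQL target."""
    problems = []
    if len(mysql_cols) != len(pg_cols):
        problems.append(f"column count differs: {len(mysql_cols)} vs {len(pg_cols)}")
    for name, mysql_type in mysql_cols.items():
        pg_type = pg_cols.get(name)
        if pg_type is None:
            problems.append(f"column {name} missing in PostgreSQL")
        elif _norm(mysql_type) != _norm(pg_type):
            problems.append(f"column {name}: {mysql_type} vs {pg_type}")
    return problems


mysql_tb1 = {"a": "int", "b": "char(10)"}
print(compare_columns(mysql_tb1, {"a": "integer", "b": "char(10)"}))  # []
print(compare_columns(mysql_tb1, {"a": "integer"}))
# ['column count differs: 2 vs 1', 'column b missing in PostgreSQL']
```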

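  1. Create the table in PostgreSQL:
create table tb1(a int, b char(10), primary key(a));
-- column names must match MySQL, otherwise Bireme reports that the column does not exist
-- column types must match as well, otherwise the column is ignored when syncing to PostgreSQL
  2. Start Bireme: bin/bireme start

  3. Monitor: http://127.0.0.1:8080/pretty

  4. Follow the log: tail -f logs/bireme.out

  5. Test (query PostgreSQL):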

select * from tb1;
 a  | b
----+------------
  3 | d
  6 | 6
  7 | 7
  4 | 4
 10 | 10
  1 | 1
  5 | 5