maxwell + kafka + bireme: a real-time sync solution for a traditional data warehouse
1. Enable binlog in MySQL (MariaDB)
SHOW VARIABLES LIKE '%log_bin%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
-- OFF means binlog is not enabled; ON means it is already enabled
1.1 Enable binlog
Edit my.cnf: for MySQL it is usually at /etc/my.cnf, for MariaDB at /etc/my.cnf.d/server.cnf.
# Add the following under the [mysqld] section
[mysqld]
log-bin = mysql-bin    # enable binlog
binlog-format = ROW    # use ROW format
server_id = 1          # required for MySQL replication; must not collide with the slaveId of other binlog clients such as Canal
Restart the database after the change:
service mysql restart
Check that binlog is now enabled:
show global variables like "%log_bin%";
+---------------------------------+---------------------------------------+
| Variable_name | Value |
+---------------------------------+---------------------------------------+
| log_bin | ON |
| log_bin_basename | /usr/local/mysql/data/mysql-bin |
| log_bin_index | /usr/local/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+---------------------------------------+
SHOW VARIABLES LIKE '%log_bin%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
-- binlog enabled successfully
1.2 Create the user and table
-- Create a MySQL account with the privileges Maxwell needs
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
-- Create a test table
CREATE TABLE tb1(a int, b char(10), primary key(a));
2. Maxwell
Maxwell reads and parses the MySQL binlog directly (much like Canal does) and sends the changes to Kafka as JSON.
- Extract: tar xf maxwell.tar.gz
- Edit the config file: cp config.properties.example config.properties && vi config.properties
# Input comes from the MySQL binlog; output goes to Kafka.
# Configuration notes:
# 1) kafka_topic: can be templated, e.g. namespace_%{database}_%{table};
#    %{database} and %{table} are replaced with the actual values.
# 2) kafka_version: must match your Kafka broker version.
# 3) Extra producer settings: kafka.acks, kafka.compression.type, kafka.retries.
# 4) filter: exclude databases, tables, or individual rows; a snippet of
#    JavaScript can also be used to transform data flexibly.
#    e.g. exclude: test_maxwell.user_info.userid = 1 skips rows in the
#    test_maxwell.user_info table where userid is 1.
# 5) monitor: metrics can be exposed via JMX, HTTP, etc.
#    http_bind_address: IP the metrics server binds to
#    http_port: port the metrics server binds to
#metrics_type=http
#metrics_jvm=true
#http_bind_address=localhost
#http_port=8111
#http_path_prefix=maxwell
#http_diagnostic=true
#http_diagnostic_timeout=10000
# Health check for Maxwell:
#   http://localhost:8111/test_maxwell/healthcheck
# Metrics (messages sent to Kafka, send failures, rows processed from the
# binlog, binlog consumption rate, JVM state):
#   http://localhost:8111/test_maxwell/metrics
# Liveness check, returns the string "pong":
#   http://localhost:8111/test_maxwell/ping
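To make the kafka_topic templating concrete, here is a minimal Python sketch of the substitution behaviour described above. This is illustrative only; the function name and implementation are mine, not Maxwell's.

```python
import re

def expand_topic(template, event):
    """Replace each %{field} in a Maxwell-style topic template with the
    corresponding value from the change event (illustrative sketch)."""
    return re.sub(r"%\{(\w+)\}", lambda m: str(event[m.group(1)]), template)

event = {"database": "test_maxwell", "table": "user_info"}
print(expand_topic("namespace_%{database}_%{table}", event))
# namespace_test_maxwell_user_info
```

With this scheme every (database, table) pair gets its own topic, which is why the template placeholders must appear in a topic created with matching naming rules.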
Bootstrap: initializing existing data
For tables that already contain data, Maxwell provides the maxwell-bootstrap command-line tool to perform the initial load. maxwell-bootstrap does a full load via SELECT * FROM table, so it does not generate extra binlog events.
bin/maxwell-bootstrap --user maxwell --password maxwell --host 127.0.0.1 --database test --table tb2 --client_id maxwell
# Start maxwell with the matching client_id
bin/maxwell --client_id=maxwell
Alternatively, trigger a bootstrap manually by inserting a row into the maxwell.bootstrap table:
INSERT INTO maxwell.bootstrap (database_name, table_name,where_clause) VALUES ('test', 'tb2','1=1');
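Conceptually, a bootstrap does a full SELECT and emits one bootstrap-insert event per row, framed by start/complete markers. A rough Python sketch of that event stream (illustrative only, not Maxwell's code; the function name is mine):

```python
import json

def bootstrap_events(database, table, rows, ts=0):
    """Yield the event sequence a maxwell-bootstrap run produces:
    bootstrap-start, one bootstrap-insert per row, bootstrap-complete."""
    yield {"database": database, "table": table, "type": "bootstrap-start", "ts": ts, "data": {}}
    for row in rows:  # in Maxwell these rows come from SELECT * FROM table
        yield {"database": database, "table": table, "type": "bootstrap-insert", "ts": ts, "data": row}
    yield {"database": database, "table": table, "type": "bootstrap-complete", "ts": ts, "data": {}}

rows = [{"a": 1, "b": "a"}, {"a": 2, "b": "b"}]
events = list(bootstrap_events("test", "tb2", rows))
print(json.dumps(events[1]))
```

The bootstrap-complete marker lets downstream consumers know the full load for that table is finished before incremental binlog events resume.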
2.1 Output to stdout
# config.properties
log_level=info
producer=stdout
# mysql login info
host=localhost
port=3306
user=maxwell
password=maxwell
Start Maxwell:
bin/maxwell --config config.properties
# or: bin/maxwell --host='localhost' --port=3306 --user='maxwell' --password='maxwell' --producer=stdout --log_level=INFO
By default Maxwell creates a database named maxwell in the source instance to record its own metadata:
mysql> show tables;
+-------------------+
| Tables_in_maxwell |
+-------------------+
| bootstrap |
| columns |
| databases |
| heartbeats |
| positions |
| schemas |
| tables |
+-------------------+
Insert some rows:
insert into tb1 values(1,"a");
insert into tb1 values(2,"b");
Console output:
bin/maxwell --host='localhost' --port=3306 --user='root' --password='root' --producer=stdout
# not needed when triggering the bootstrap manually via maxwell-bootstrap with --client_id=maxwell
Using kafka version: 1.0.0
10:58:36,398 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
10:58:36,676 INFO Maxwell - Maxwell v1.24.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000007:956390], lastHeartbeat=1584672514512]
10:58:36,843 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000007:917665], lastHeartbeat=0])
10:58:36,906 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000007:956390
10:58:36,921 INFO BinaryLogClient - Connected to localhost:3306 at mysql-bin.000007/956390 (sid:6379, cid:79)
10:58:36,921 INFO BinlogConnectorLifecycleListener - Binlog connected.
## Full-load (bootstrap) output
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":10,"b":"10","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":11,"b":"100000000","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":12,"b":"1000000000","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":14,"b":"1000000000","dtm":null}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":20,"b":"123","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":21,"b":"123","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":22,"b":"123","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":23,"b":"1234","dtm":"2020-01-01 14:00:00"}}
{"database":"sampledata","table":"tb2","type":"bootstrap-complete","ts":1584716273,"data":{}}
22:57:53,010 INFO SynchronousBootstrapper - bootstrapping ended for #1 sampledata.tb2 WHERE 1=1
# Incremental output
{"database":"sampledata","table":"tb1","type":"insert","ts":1584672522,"xid":18003,"commit":true,"data":{"a":1,"b":"a"}}
{"database":"sampledata","table":"tb1","type":"insert","ts":1584672522,"xid":18004,"commit":true,"data":{"a":2,"b":"b"}}
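Each line Maxwell prints is a self-contained JSON document, so a downstream consumer only needs json.loads. A quick sketch that separates full-load events from incremental ones, using the type field shown in the output above (the variable names are mine):

```python
import json

lines = [
    '{"database":"sampledata","table":"tb2","type":"bootstrap-insert","ts":1584716272,"data":{"a":10,"b":"10","dtm":null}}',
    '{"database":"sampledata","table":"tb1","type":"insert","ts":1584672522,"xid":18003,"commit":true,"data":{"a":1,"b":"a"}}',
]

full_load, incremental = [], []
for line in lines:
    event = json.loads(line)
    # bootstrap-* events come from the full load; everything else is binlog traffic
    (full_load if event["type"].startswith("bootstrap") else incremental).append(event)

print(len(full_load), len(incremental))
# 1 1
```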
2.2 Output to Kafka
2.2.1 Kafka topic
# Create the topic
bin/kafka-topics.sh --create --zookeeper hdp-m-1.data.dc.zjft.com:2181,hdp-m-2.data.dc.zjft.com:2181,hdp-e-1.data.dc.zjft.com:2181 --replication-factor 1 --partitions 1 --topic maxwell
# List all topics
bin/kafka-topics.sh --list --zookeeper hdp-m-1.data.dc.zjft.com:2181,hdp-m-2.data.dc.zjft.com:2181,hdp-e-1.data.dc.zjft.com:2181
# Consumer
bin/kafka-console-consumer.sh --topic maxwell2 --bootstrap-server hdp-s-1.data.dc.zjft.com:6667
2.2.2 Configuration
# config.properties
log_level=info
producer=kafka
kafka_topic=maxwell
kafka.bootstrap.servers=hdp-s-1.data.dc.zjft.com:6667
# mysql login info
host=localhost
port=3306
user=maxwell
password=maxwell
Run:
bin/maxwell --config config.properties
# If the topic does not exist, this fails with: Error while fetching metadata with correlation id 379 : {topicname=INVALID_TOPIC_EXCEPTION}
# Otherwise the consumer receives data as soon as rows are inserted (or updated) in MySQL:
kafka-console-consumer.sh --topic maxwell1 --bootstrap-server hdp-s-1.data.dc.zjft.com:6667
{"database":"sampledata","table":"tb1","type":"insert","ts":1584673396,"xid":18339,"commit":true,"data":{"a":3,"b":"c"}}
{"database":"sampledata","table":"tb1","type":"update","ts":1584673750,"xid":18812,"commit":true,"data":{"a":3,"b":"d"},"old":{"b":"c"}}
{"database":"sampledata","table":"tb1","type":"update","ts":1584673774,"xid":18847,"commit":true,"data":{"a":1,"b":"d"},"old":{"b":"a"}}
{"database":"sampledata","table":"tb1","type":"delete","ts":1584673810,"xid":18899,"commit":true,"data":{"a":2,"b":"b"}}
{"database":"sampledata","table":"tb1","type":"delete","ts":1584673827,"xid":18925,"commit":true,"data":{"a":1,"b":"d"}}
{"database":"sampledata","table":"tb1","type":"insert","ts":1584673921,"xid":19052,"commit":true,"data":{"a":1,"b":"1"}}
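To see what a consumer like bireme does with this stream, here is a sketch that replays insert/update/delete events onto an in-memory table keyed by the primary key a. This is illustrative only: bireme actually batches these changes into bulk operations against Greenplum/PostgreSQL, and the function name is mine.

```python
def apply_event(table, event, pk="a"):
    """Maintain a {pk: row} map the way a sync target would."""
    row = event["data"]
    if event["type"] in ("insert", "bootstrap-insert", "update"):
        table[row[pk]] = row          # upsert: updates carry the full new row
    elif event["type"] == "delete":
        table.pop(row[pk], None)      # "data" holds the deleted row's values
    return table

events = [
    {"type": "insert", "data": {"a": 3, "b": "c"}},
    {"type": "update", "data": {"a": 3, "b": "d"}, "old": {"b": "c"}},
    {"type": "delete", "data": {"a": 2, "b": "b"}},
]
table = {2: {"a": 2, "b": "b"}}
for e in events:
    apply_event(table, e)
print(table)  # {3: {'a': 3, 'b': 'd'}}
```

Note that update events carry the full new row in data plus the changed columns' previous values in old, so applying them is a plain upsert.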
3. bireme
Environment needed to run bireme:
- JAVA_HOME
- yum install jsvc
Building this version of bireme from source can fail at startup with: Cannot find daemon loader org/apache/commons/daemon/support/DaemonLoader, because the bireme jar is not on the classpath. Download the pre-built bireme package instead.
- Download bireme
- Extract: tar xf bireme-2.0.0-alpha-1.tar.gz
- Edit the config files: vim etc/config.properties etc/maxwell1.properties
# etc/config.properties
target.url = jdbc:postgresql://10.34.11.141:2345/postgres
target.user = pentaho_user
target.passwd = cloud.Zijin
# data source name list, separated by comma.
data_source = maxwell1
# data source "maxwell1" type
maxwell1.type = maxwell
# kafka server which maxwell write binlog into.
maxwell1.kafka.server = hdp-s-1.data.dc.zjft.com:6667
# kafka topic which maxwell write binlog into.
maxwell1.kafka.topic = maxwell
# kafka groupid used for consumer.
maxwell1.kafka.groupid = bireme
# set the IP address for bireme state server.
state.server.addr = 0.0.0.0
# set the port for bireme state server.
state.server.port = 8080
The maxwell1 prefix in the maxwell1.properties filename must match the data_source name in bireme's config.properties.
# etc/maxwell1.properties
test.tb1 = public.tb1
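The table-mapping file is just mysql_db.table = postgres_schema.table pairs, one per line. A small sketch of parsing such a file (illustrative; the function name is mine, not bireme's):

```python
def parse_mapping(text):
    """Parse lines like "test.tb1 = public.tb1" into a source -> target dict,
    skipping blank lines and comments."""
    mapping = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        source, target = (part.strip() for part in line.split("=", 1))
        mapping[source] = target
    return mapping

print(parse_mapping("test.tb1 = public.tb1"))
# {'test.tb1': 'public.tb1'}
```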
If the target postgres table does not exist, bireme fails with: cn.hashdata.bireme.BiremeException: Greenplum table and MySQL table size are inconsistent!
- Create the table in postgres:
create table tb1(a int, b char(11), primary key(a));
-- column names must match MySQL's, otherwise a column-does-not-exist error is raised
-- column types must match as well, otherwise the column is silently ignored when syncing to postgres
- Start bireme: bin/bireme start
- Monitor: http://127.0.0.1:8080/pretty
- Tail the log: tail -f logs/bireme.out
- Test (query postgres):
select * from tb1;
a | b
----+------------
3 | d
6 | 6
7 | 7
4 | 4
10 | 10
1 | 1
5 | 5