How to Sync Data from Oracle to Kafka
Environment preparation
Software
- CentOS Linux 7.6.1810 (two hosts: host A and host B)
- Oracle 11.2.0.4 (installed on host A)
- Kafka 2.13-2.6.0 (installed on host B)
- kafka-connect-oracle-master (installed on host B; an open-source program for syncing Oracle data to Kafka)
- Apache Maven 3.6.3 (installed on host B; used to build kafka-connect-oracle-master)
- jdk-8u261-linux-x64.rpm (installed on host B)
Downloads
- CentOS, Oracle, and the JDK can be downloaded from their official sites
- Kafka: http://kafka.apache.org/downloads
- kafka-connect-oracle: https://github.com/tianxiancode/kafka-connect-oracle
- Apache Maven 3.6.3: http://maven.apache.org/download.cgi
Implementation
Configuring the Oracle host (A)
Oracle instance configuration steps:
- Enable archive log mode
- Enable supplemental logging
- Create the connection user for kafka-connect-oracle-master
- Create the test user and the test tables that generate data
```sql
-- Enable archive log mode
sqlplus / as sysdba
SQL> shutdown immediate
SQL> startup mount
SQL> alter database archivelog;
SQL> alter database open;

-- Enable supplemental logging
SQL> alter database add supplemental log data (all) columns;

-- Create the connection user for kafka-connect-oracle-master
create role logmnr_role;
grant create session to logmnr_role;
grant execute_catalog_role, select any transaction, select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant logmnr_role to kminer;
alter user kminer quota unlimited on users;

-- Create the test user and tablespace
create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf'
  size 100m autoextend on next 10m;
create user whtest identified by whtest default tablespace test_date;
grant connect, resource to whtest;
grant execute on dbms_scheduler to whtest;
grant execute on dbms_random to whtest;
grant create job to whtest;

-- Connect as whtest and create the test tables
create table t1 (
  id         int,
  name       char(10),
  createtime date default sysdate
);
alter table whtest.t1 add constraint pk_id_t1 primary key (id) using index tablespace test_date;

create table t2 (
  id         int,
  name       char(10),
  createtime date default sysdate
);
alter table whtest.t2 add constraint pk_id_t2 primary key (id) using index tablespace test_date;

create table t3 (
  id         int,
  name       char(10),
  createtime date default sysdate
);
alter table whtest.t3 add constraint pk_id_t3 primary key (id) using index tablespace test_date;

-- Create three jobs, each inserting ten random rows every 5 seconds
begin
  dbms_scheduler.create_job(
    job_name        => 't1_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
                          v_id   int;
                          v_name char(10);
                        begin
                          for i in 1..10 loop
                            v_id   := round(dbms_random.value(1,1000000000));
                            v_name := round(dbms_random.value(1,1000000000));
                            insert into whtest.t1 (id,name) values (v_id,v_name);
                          end loop;
                        end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t1 every 5 sec');
end;
/

begin
  dbms_scheduler.create_job(
    job_name        => 't2_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
                          v_id   int;
                          v_name char(10);
                        begin
                          for i in 1..10 loop
                            v_id   := round(dbms_random.value(1,1000000000));
                            v_name := round(dbms_random.value(1,1000000000));
                            insert into whtest.t2 (id,name) values (v_id,v_name);
                          end loop;
                        end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t2 every 5 sec');
end;
/

begin
  dbms_scheduler.create_job(
    job_name        => 't3_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
                          v_id   int;
                          v_name char(10);
                        begin
                          for i in 1..10 loop
                            v_id   := round(dbms_random.value(1,1000000000));
                            v_name := round(dbms_random.value(1,1000000000));
                            insert into whtest.t3 (id,name) values (v_id,v_name);
                          end loop;
                        end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t3 every 5 sec');
end;
/

-- After creating the jobs, disable them for now
exec dbms_scheduler.disable('t1_job');
exec dbms_scheduler.disable('t2_job');
exec dbms_scheduler.disable('t3_job');

-- Run these later, once the Kafka side is configured:
exec dbms_scheduler.enable('t1_job');
exec dbms_scheduler.enable('t2_job');
exec dbms_scheduler.enable('t3_job');
```
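The three scheduler jobs above differ only in the target table. As an aside, the repetitive PL/SQL can be generated by a small script; the sketch below is illustrative only (the `job_ddl` helper is not part of the original setup):

```python
# Sketch: generate the near-identical DBMS_SCHEDULER job definitions for t1..t3.
# The job body mirrors the PL/SQL used above; the table name is the only variable.

JOB_TEMPLATE = """begin
  dbms_scheduler.create_job(
    job_name        => '{table}_job',
    job_type        => 'plsql_block',
    job_action      => 'declare
        v_id int; v_name char(10);
      begin
        for i in 1..10 loop
          v_id   := round(dbms_random.value(1,1000000000));
          v_name := round(dbms_random.value(1,1000000000));
          insert into whtest.{table} (id,name) values (v_id,v_name);
        end loop;
      end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into {table} every 5 sec');
end;
/"""

def job_ddl(table: str) -> str:
    """Return the CREATE_JOB block for one test table."""
    return JOB_TEMPLATE.format(table=table)

if __name__ == "__main__":
    for t in ("t1", "t2", "t3"):
        print(job_ddl(t))
```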
Configuring the Kafka host (B)
Upload the downloaded Kafka 2.13-2.6.0, kafka-connect-oracle-master, Apache Maven 3.6.3, and JDK 1.8.0 packages to the /soft directory on host B.
Add host name resolution to the hosts file
```shell
[root@softdelvily ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.20.44 softdelvily localhost
```
Install the JDK
```shell
[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm
warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
Preparing...                          ################################# [100%]
Updating / installing...
   1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]
Unpacking JAR files...
        tools.jar...
        plugin.jar...
        javaws.jar...
        deploy.jar...
        rt.jar...
        jsse.jar...
        charsets.jar...
        localedata.jar...
```
Configure Apache Maven
Extract apache-maven-3.6.3-bin.tar.gz to /usr/local and set the corresponding environment variables in /etc/profile.
```shell
[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
apache-maven-3.6.3/README.txt
apache-maven-3.6.3/LICENSE
.....
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 2 root root  6 Apr 11  2018 bin
.....
[root@softdelvily local]# vi /etc/profile
.......
## add the following environment variables
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
[root@softdelvily local]# source /etc/profile
[root@softdelvily local]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/apache-maven-3.6.3
Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
```
Configure Kafka 2.13-2.6.0
Extract Kafka 2.13-2.6.0 to /usr/local.
```shell
[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
kafka_2.13-2.6.0/
kafka_2.13-2.6.0/LICENSE
......
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
.....
```
Start Kafka and create the topic that will receive the synchronized data
```shell
--1. Session 1: start ZooKeeper
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties
.......
[2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

--2. Session 2: start Kafka
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties

--3. Session 3: create the cdczztar topic
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
Created topic cdczztar.
[root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
cdczztar
```
Configure kafka-connect-oracle-master
Extract kafka-connect-oracle-master to /soft, edit its config file, then build the program with Maven.
```shell
--Unzip the package
[root@softdelvily soft]# unzip kafka-connect-oracle-master.zip
[root@softdelvily soft]# ll
total 201180
-rw-r--r--. 1 root root   9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
-rw-r--r--. 1 root root 127431820 Sep  8 10:43 jdk-8u261-linux-x64.rpm
-rw-r--r--. 1 root root  65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
drwxr-xr-x. 5 root root       107 Sep  8 15:48 kafka-connect-oracle-master
-rw-r--r--. 1 root root   3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
[root@softdelvily soft]# cd kafka-connect-oracle-master/config/
[root@softdelvily config]# ll
total 4
-rw-r--r--. 1 root root 1135 Sep  8 15:48 OracleSourceConnector.properties

--Adjust the properties file: set db.name.alias, topic, db.name, db.hostname,
--db.user, db.user.password, table.whitelist, and table.blacklist for your
--environment; see README.md for details
[root@softdelvily config]# vi OracleSourceConnector.properties
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=whtest.t1,whtest.t2
table.blacklist=whtest.t3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false

--Build the program
[root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
[root@softdelvily kafka-connect-oracle-master]# mvn clean package
[INFO] Scanning for projects...
.......
[INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  94.03 s
[INFO] Finished at: 2020-09-23T10:25:52+08:00
[INFO] ------------------------------------------------------------------------
```
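The whitelist and blacklist entries are easy to get wrong. The following is a minimal, illustrative sketch (the `parse_properties` and `overlapping_tables` helpers are assumptions, not part of kafka-connect-oracle) that parses a Java-style properties file and flags any table listed on both sides before the connector is started:

```python
# Sketch: sanity-check table.whitelist / table.blacklist in a connector
# properties file. Illustrative only; the real connector does its own parsing.

def parse_properties(text: str) -> dict:
    """Parse key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def overlapping_tables(props: dict) -> set:
    """Return tables that appear in both the whitelist and the blacklist."""
    white = {t.strip().lower() for t in props.get("table.whitelist", "").split(",") if t.strip()}
    black = {t.strip().lower() for t in props.get("table.blacklist", "").split(",") if t.strip()}
    return white & black

if __name__ == "__main__":
    sample = """topic=cdczztar
table.whitelist=whtest.t1,whtest.t2
table.blacklist=whtest.t3
"""
    conflicts = overlapping_tables(parse_properties(sample))
    print("conflicts:", conflicts)  # an empty set means the two lists are consistent
```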
Copy the following files into the Kafka working directory:
- Copy kafka-connect-oracle-1.0.68.jar and lib/ojdbc7.jar to $KAFKA_HOME/libs
- Copy config/OracleSourceConnector.properties to $KAFKA_HOME/config
```shell
[root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
[root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily target]# cd /soft/kafka-connect-oracle-master/lib
[root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
[root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/
```
Start kafka-connect-oracle
```shell
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
......
(com.ecer.kafka.connect.oracle.OracleSourceTask:187)
[2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)
```
Start a Kafka console consumer
```shell
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar
```
Start the database jobs
```shell
[oracle@oracle01 ~]$ sqlplus / as sysdba

SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020

Copyright (c) 1982, 2011, Oracle.  All rights reserved.

Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

SQL> set pagesize 999
SQL> conn whtest/whtest
Connected.
SQL> exec dbms_scheduler.enable('t1_job');

PL/SQL procedure successfully completed.
```
Kafka consumer output
Records like the following show that the sync is working; the data is emitted as key:value pairs.
```json
{
  "schema": {
    "type": "struct",
    "name": "zztar.whtest.t1.row",
    "optional": false,
    "fields": [
      {"type": "int64", "optional": false, "field": "scn"},
      {"type": "string", "optional": false, "field": "seg_owner"},
      {"type": "string", "optional": false, "field": "table_name"},
      {"type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.timestamp", "version": 1, "field": "timestamp"},
      {"type": "string", "optional": false, "field": "sql_redo"},
      {"type": "string", "optional": false, "field": "operation"},
      {"type": "struct", "optional": true, "name": "value", "field": "data", "fields": [
        {"type": "double", "optional": false, "field": "id"},
        {"type": "string", "optional": true, "field": "name"},
        {"type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.timestamp", "version": 1, "field": "createtime"}]},
      {"type": "struct", "optional": true, "name": "value", "field": "before", "fields": [
        {"type": "double", "optional": false, "field": "id"},
        {"type": "string", "optional": true, "field": "name"},
        {"type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.timestamp", "version": 1, "field": "createtime"}]}
    ]
  },
  "payload": {
    "scn": 2847668,
    "seg_owner": "whtest",
    "table_name": "t1",
    "timestamp": 1600829206000,
    "sql_redo": "insert into \"whtest\".\"t1\"(\"id\",\"name\",\"createtime\") values (557005146,'533888119 ',timestamp ' 2020-09-23 10:46:46')",
    "operation": "insert",
    "data": {"id": 557005146, "name": "533888119", "createtime": 1600829206000},
    "before": null
  }
}
```
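A downstream consumer usually needs only a few fields from each record. The sketch below is a minimal, hypothetical example (the `summarize` helper is not part of the connector) of pulling the operation, table, SCN, and row data out of one message value shaped like the sample above:

```python
import json

# Sketch: extract the useful parts of one connector message value
# (schema + payload, as in the sample record above). Illustrative only.

def summarize(message_value: str) -> dict:
    """Return operation, owner.table, SCN, and row data from one record."""
    record = json.loads(message_value)
    payload = record["payload"]
    return {
        "operation": payload["operation"],
        "table": f'{payload["seg_owner"]}.{payload["table_name"]}',
        "scn": payload["scn"],
        "data": payload["data"],
    }

if __name__ == "__main__":
    sample = json.dumps({
        "schema": {"type": "struct"},  # trimmed for brevity
        "payload": {
            "scn": 2847668,
            "seg_owner": "whtest",
            "table_name": "t1",
            "operation": "insert",
            "data": {"id": 557005146, "name": "533888119"},
            "before": None,
        },
    })
    print(summarize(sample))
```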
This concludes the walkthrough of syncing data from Oracle to Kafka.