Oracle在线及重做日志分析数据同步实例
sl_syncdata实时同步程序-稳定版
sl_syncdata配置:
--双向复制数据表实时同步 支持断点续传
-- 一、初始环境配置
--安装logminer
-- @d:\oracle\product\10.2.0\db_1\rdbms\admin\dbmslm.sql
-- @d:\oracle\product\10.2.0\db_1\rdbms\admin\dbmslmd.sql
--设置两端
--alter database archive;
--alter database force logging;
--alter database add supplemental log data;--不开启的话只能看到以sys用户登陆下的数据更新
--alter database add supplemental log data (primary key,unique,foreign key) columns;
alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss';
--1 日志目录
create directory logmnr as 'd:\oracle\product\10.2.0\logmnr';
create directory syncdataconfig as 'd:\oracle\product\10.2.0\logmnr\config';
--2 dml队列目录
create directory syncdata as 'd:\oracle\product\10.2.0\logmnr\syncdata';
--3 scn标志位初始化
-- 实例化启动进程时的scn到d:\oracle\product\10.2.0\logmnr\scn.txt, 标志为全局 g_scn
declare
fhandle utl_file.file_type;
v_scn number(30);
begin
v_scn:=dbms_flashback.get_system_change_number();
fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');
utl_file.put_line(fhandle , v_scn);
utl_file.fclose(fhandle);
end;
/
-- 4 archive log 标志位初始化
declare
fhandle utl_file.file_type;
v_recid v$archived_log.recid%type;
begin
select max(recid) into v_recid from v$archived_log order by 1;
fhandle := utl_file.fopen ('syncdataconfig','archivelog_ck.txt', 'w');
utl_file.put_line(fhandle ,to_char(v_recid));--初始化捕获进程前存在的归档不传送
utl_file.fclose(fhandle);
exception
when others then return;
end;
/
-- 5 dml队列初始化
declare
fhandle utl_file.file_type;
begin
fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'w');
utl_file.put_line(fhandle , 'q0000000000'); -- 初始化队列序列标志
utl_file.fclose(fhandle);
end;
/
-- 7 发送队列检查点标志初始化
declare
fhandle utl_file.file_type;
begin
fhandle := utl_file.fopen('syncdataconfig', 'send_ck.txt', 'w');
utl_file.fclose(fhandle);
end;
/
-- 8 xid事务标识集合初始化
declare
fhandle utl_file.file_type;
begin
fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'w');
utl_file.fclose(fhandle);
end;
/
-- 9 所有队列序号集合初始化
declare
fhandle utl_file.file_type;
begin
fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'w');
utl_file.fclose(fhandle);
end;
/
-- 10 捕获日志初始化
declare
fhandle utl_file.file_type;
begin
fhandle := utl_file.fopen('logmnr', 'capture_log.txt', 'w');
utl_file.fclose(fhandle);
end;
/
-- 11 发送日志初始化
declare
fhandle utl_file.file_type;
begin
fhandle := utl_file.fopen('logmnr', 'send_log.txt', 'w');
utl_file.fclose(fhandle);
end;
/
/*-- 9 数据库对照字典初始化
begin
dbms_logmnr_d.build(dictionary_filename=>'rac1emz_dict.ora',
dictionary_location=>'d:\oracle\product\10.2.0\logmnr\config');
end;
/*/
--10 初始化自定义类型
create or replace type mytabletype as table of varchar2 (255);
-- 二、进程
--1 源数据库捕获进程
--2 源数据库传播进程 --检查点记录,支持断点续传
--三、创建源数据库捕获进程任务调度
/* 创建可执行程序 */
begin
dbms_scheduler.create_program(
program_name => 'sys.proc_datacapture_archivelog',
program_action => 'sys.datacapture_archivelog',
program_type => 'stored_procedure',
number_of_arguments => 3,
comments => '数据同步程序',
enabled => false
);
end;
/
/* 设置可执行程序的输入参数 */
begin
dbms_scheduler.define_program_argument(
program_name => 'sys.proc_datacapture_archivelog',
argument_position => 1,
argument_type => 'varchar2',
default_value => ''
);
dbms_scheduler.define_program_argument(
program_name => 'sys.proc_datacapture_archivelog',
argument_position => 2,
argument_type => 'varchar2',
default_value => ''
);
dbms_scheduler.define_program_argument(
program_name => 'sys.proc_datacapture_archivelog',
argument_position => 3,
argument_type => 'varchar2',
default_value => ''
);
end;
/
/* 创建调度表 */
begin
dbms_scheduler.create_schedule(
schedule_name => 'sys.sch_datacapture_archivelog',
repeat_interval => 'freq=secondly;interval=5',
start_date => sysdate,
comments => '数据同步调度'
);
end;
/
/* 创建作业 */
begin
dbms_scheduler.create_job(
job_name => 'sys.job_demosync',
program_name => 'sys.proc_datacapture_archivelog',
schedule_name => 'sys.sch_datacapture_archivelog',
job_class => 'default_job_class',
comments => '数据同步作业',
auto_drop => false,
enabled => false
);
end;
/
/* 启动可执行程序 */
exec dbms_scheduler.enable('proc_datacapture_archivelog');
/* 启动作业 */
exec dbms_scheduler.enable('job_demosync');
/* 设置不同的作业参数 */
begin
dbms_scheduler.set_job_argument_value(
job_name => 'sys.job_demosync',
argument_position => 1,
argument_value => 'test'
);
dbms_scheduler.set_job_argument_value(
job_name => 'sys.job_demosync',
argument_position => 2,
argument_value => 'demo'
);
dbms_scheduler.set_job_argument_value(
job_name => 'sys.job_demosync',
argument_position => 3,
argument_value => 'rac2emz.test'
);
end;
/
----job管理-----------------------------------------------------------------------
/* 禁用job */
exec dbms_scheduler.disable('job_demosync');
/* 执行job */
exec dbms_scheduler.run_job('job_demosync');
/* 停止job */
exec dbms_scheduler.stop_job('job_demosync');
/* 删除job */
exec dbms_scheduler.drop_job('job_demosync');
exec dbms_scheduler.drop_schedule('sys.sch_datacapture_archivelog');
exec dbms_scheduler.drop_program('sys.proc_datacapture_archivelog');
------------------------------------------------
--三、创建源数据库发送进程任务调度
/* 创建可执行程序 */
begin
dbms_scheduler.create_program(
program_name => 'sys.proc_datareplicat',
program_action => 'sys.datareplicat',
program_type => 'stored_procedure',
number_of_arguments => 0,
comments => '数据同步程序-发送进程',
enabled => false
);
end;
/
/* 创建调度表 */
begin
dbms_scheduler.create_schedule(
schedule_name => 'sys.sch_datareplicat',
repeat_interval => 'freq=secondly;interval=2',
start_date => sysdate,
comments => '数据同步调度-发送进程'
);
end;
/
/* 创建作业 */
begin
dbms_scheduler.create_job(
job_name => 'sys.job_demosync2',
program_name => 'sys.proc_datareplicat',
schedule_name => 'sys.sch_datareplicat',
job_class => 'default_job_class',
comments => '数据同步作业-发送进程',
auto_drop => false,
enabled => false
);
end;
/
/* 启动可执行程序 */
exec dbms_scheduler.enable('proc_datareplicat');
/* 启动作业 */
exec dbms_scheduler.enable('job_demosync2');
----job管理-----------------------------------------------------------------------
/* 禁用job */
exec dbms_scheduler.disable('job_demosync2');
/* 执行job */
exec dbms_scheduler.run_job('job_demosync2');
/* 停止job */
exec dbms_scheduler.stop_job('job_demosync2');
/* 删除job */
exec dbms_scheduler.drop_job('job_demosync2');
exec dbms_scheduler.drop_schedule('sys.sch_datareplicat');
exec dbms_scheduler.drop_program('sys.proc_datareplicat');
--查看状态
select t.job_name,t.repeat_interval,t.state,t.start_date,t.last_start_date,t.next_run_date,t.comments
from dba_scheduler_jobs t
where t.job_name in('job_demosync','job_demosync2');
------------------------------------------------------
抓取归档日志——存储过程:datacapture_archivelog.prc
create or replace procedure datacapture_archivelog(tableowner varchar2,
tablename varchar2,
database_link_name varchar2) is
--作者:shl
--日期:2012-04-28
v_scn number(30);
fhandle utl_file.file_type; --oracle文件类型
fp_buffer varchar2(4000); --文件输出缓存
logfile utl_file.file_type; --oracle文件类型
logfile_buffer varchar2(4000); --文件输出缓存
v_recid number(10); --表archive log 序号
v_recidck number(10); --archivelog_ck 序号
sql_txt varchar2(4000); --待执行的 sql_redo
v_dblink_name varchar2(20) := '@' || database_link_name; --目的数据库联接
sqlwhere varchar2(1000); --待执行的 sql_redo 条件语句
v_tableowner varchar2(20); --待同步的用户
v_tablename varchar2(20); --待同步的表
v_queueck number(10); --队列序列scn
v_queuename varchar(20); --dml队列名称
ischeckredo number(1) := 0; --是已经否抽取redo日志
v_sqlcommitnum number(5) := 10000; --已经提交的事务dml 分批提交给目的数据库
v_sqlnum number := 0; --已经提交的事务dml当前个数
l_string long;
l_data mytabletype := mytabletype();
n number;
isddl number(1) := 0; --是否是dll操作 0不是 1是
iscommited number(1) := 0; --是否已经commit操作 0不是 1是
xid_str varchar2(8000); --事务序号数组
j number := 0; --tb_dml_xidsqn序列下标
type tb_xid_type is table of v$logmnr_contents.xid%type index by binary_integer; --定义待排除dll
tb_ddl_xid tb_xid_type; --定义待排除dll表
tb_dml_xid tb_xid_type; --定义待排除dll表
duplicat_xid number(1) := 0; -- 0 不重复 1 重复
cursor c_ddl is
select distinct xid
from v$logmnr_contents
where seg_name = v_tablename
and operation = 'ddl';
begin
--打开捕获日志
logfile := utl_file.fopen('logmnr', 'capture_log.txt', 'a');
--初始化schema
v_tableowner := tableowner;
v_tablename := tablename;
--捕获归档日志
--dbms_output.put_line();
utl_file.put_line(logfile, '');
utl_file.put_line(logfile, 'analyze archivelog...start:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'r');
utl_file.get_line(fhandle, fp_buffer);
v_scn := fp_buffer;
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile, '0====v_scn:' || v_scn);
select max(recid) into v_recid from v$archived_log order by 1;
/*exception
when no_data_found then
return;*/
--dbms_output.put_line();
utl_file.put_line(logfile, '1====1v_recid:' || v_recid);
fhandle := utl_file.fopen('syncdataconfig', 'archivelog_ck.txt', 'r');
utl_file.get_line(fhandle, fp_buffer);
v_recidck := fp_buffer;
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile, '1====2v_recid_ck:' || v_recidck);
if trim(v_recid) != trim(nvl(v_recidck, 0)) then
--dbms_output.put_line();
utl_file.put_line(logfile, '2====1存在未分析的archive log');
else
if ischeckredo = 0 then
utl_file.put_line(logfile, '2====1 no new archive log');
utl_file.put_line(logfile, '2====analyze archivelog...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
utl_file.fclose(logfile);
sys.datacapture_redo(tableowner, tablename, database_link_name);
ischeckredo := 1;
return;
end if;
end if;
--一次加载从archive_ck归档检查点到最新归档,及 当前 redo
for rs in (select name, recid from v$archived_log where recid >= v_recidck) loop
dbms_logmnr.add_logfile(options => dbms_logmnr.addfile,
logfilename => rs.name);
--dbms_output.put_line();
utl_file.put_line(logfile, '2====2 archive logfilename:' || rs.name);
end loop;
--dbms_output.put_line();
utl_file.put_line(logfile, '2====3补充redo log');
for rs in (select f.member
from v$logfile f, v$log l
where f.group# = l.group#
and l.status = 'current') loop
dbms_logmnr.add_logfile(options => dbms_logmnr.addfile,
logfilename => rs.member);
--dbms_output.put_line();
utl_file.put_line(logfile, '2====3 redo logfilename:' || rs.member);
end loop;
dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog);
--dbms_logmnr.start_logmnr(dictfilename => 'd:\oracle\product\10.2.0\logmnr\rac1emz_testdict.ora');
--待排除dll
open c_ddl;
fetch c_ddl bulk collect
into tb_ddl_xid;
close c_ddl;
--待查询的dml
for c_tranxid in (select distinct xid
from v$logmnr_contents
where seg_name = v_tablename) loop
isddl := 0;
duplicat_xid := 0;
iscommited := 0;
--判断是否dll操作
for i in 1 .. tb_ddl_xid.count loop
if c_tranxid.xid = tb_ddl_xid(i) then
isddl := 1;
--dbms_output.put_line();
utl_file.put_line(logfile, '3==1 ddl xid:' || c_tranxid.xid);
end if;
end loop;
--不是dll,添加dml xid
if isddl = 0 then
--是否已经commit
for c_commitxid in (select xid
from v$logmnr_contents
where xid = c_tranxid.xid
and operation = 'commit') loop
iscommited := 1;
--dbms_output.put_line();
utl_file.put_line(logfile, '3==1 commited xid:' || c_commitxid.xid);
end loop;
if iscommited = 1 then
--判断xid事务标识是否已经存在,如果存在不添加,防止重复分析xid
fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'r');
loop
begin
utl_file.get_line(fhandle, fp_buffer);
if rtrim(ltrim(fp_buffer)) = c_tranxid.xid then
duplicat_xid := 1;
--dbms_output.put_line(); --重复xid
utl_file.put_line(logfile, '3====2 已分析过xid :' || c_tranxid.xid);
end if;
exception
when no_data_found then
exit;
end;
end loop;
--判断没有重复的插入表tb_dml_xid
if duplicat_xid = 0 then
tb_dml_xid(j) := c_tranxid.xid;
j := j + 1;
end if;
utl_file.fclose(fhandle);
end if;
end if;
end loop;
--没有dml tran 退出程序
--dbms_output.put_line();
utl_file.put_line(logfile, '3====3 dml tran count :' || tb_dml_xid.count);
if tb_dml_xid.count = 0 then
dbms_logmnr.end_logmnr;
else
--打开xid事务标识集合,准备新增xid事务标识
fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'a');
--dbms_output.put_line();
utl_file.put_line(logfile, '5====打开xid事务标识集合');
--创建xid_str字符串
--dbms_output.put_line();
utl_file.put_line(logfile, '5====创建xid_str字符串');
for i in 1 .. tb_dml_xid.count loop
if tb_dml_xid.count != 1 then
if i = 1 then
--新增入一行xid事务标识
utl_file.put_line(fhandle, tb_dml_xid(i - 1));
--dbms_output.put_line();
utl_file.put_line(logfile, 'a1_' || tb_dml_xid(i - 1));
xid_str := tb_dml_xid(i - 1);
else
--新增入一行xid事务标识
utl_file.put_line(fhandle, tb_dml_xid(i - 1));
utl_file.put_line(logfile, 'a2_' || tb_dml_xid(i - 1));
xid_str := xid_str || ',' || tb_dml_xid(i - 1);
end if;
else
--新增入一行xid事务标识
utl_file.put_line(fhandle, tb_dml_xid(i - 1));
--dbms_output.put_line('a3_' || tb_dml_xid(i - 1));
utl_file.put_line(logfile, 'a3_' || tb_dml_xid(i - 1));
xid_str := tb_dml_xid(i - 1);
end if;
end loop;
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile, '5====关闭xid事务标识集合');
--dbms_output.put_line();
utl_file.put_line(logfile, '5====xid_str字符串: ' || xid_str);
l_string := xid_str || ',';
loop
exit when l_string is null;
n := instr(l_string, ',');
l_data.extend;
l_data(l_data.count) := ltrim(rtrim(substr(l_string, 1, n - 1)));
l_string := substr(l_string, n + 1);
end loop;
--队列
--读取队列开始序列
fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'r');
utl_file.get_line(fhandle, fp_buffer);
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile, '6====读取队列开始序列');
--更新队列
v_queueck := to_number(substr(fp_buffer, 2, 10));
v_queuename := 'q' || lpad((v_queueck + 1), 10, '0');
fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'w');
utl_file.put_line(fhandle, v_queuename);
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile, '7====更新队列队列序列');
----创建队列文件
fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'w');
utl_file.fclose(fhandle);
utl_file.put_line(logfile, '8====创建队列v_queuename:' || v_queuename);
--添加dml
fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'a');
for c_sql in (select sql_redo, operation
from v$logmnr_contents
where xid in (select *
from the (select cast(l_data as mytabletype)
from dual))
and operation in
('insert', 'delete', 'update', 'commit')
order by scn) loop
if c_sql.sql_redo is not null then
--判断如果是insert,不截取rowid
if c_sql.operation = 'insert' or c_sql.operation = 'commit' then
if c_sql.operation = 'insert' then
sql_txt := replace(c_sql.sql_redo,
'"' || v_tablename || '"',
'"' || v_tablename || '"' || v_dblink_name);
utl_file.put_line(fhandle, sql_txt);
utl_file.put_line(logfile, '9====sql_redo1:' || sql_txt);
else
utl_file.put_line(fhandle, c_sql.sql_redo);
utl_file.put_line(logfile, '9====sql_redo2:' || c_sql.sql_redo);
end if;
else
sql_txt := substr(c_sql.sql_redo,
0,
instr(c_sql.sql_redo, 'rowid') - 5);
sql_txt := replace(sql_txt,
'"' || v_tablename || '"',
'"' || v_tablename || '"' || v_dblink_name);
utl_file.put_line(fhandle, sql_txt);
utl_file.put_line(logfile, '9====sql_redo3:' || sql_txt);
end if;
end if;
end loop;
utl_file.fclose(fhandle);
utl_file.put_line(logfile, '10====');
--新增一行到 queue_arrary集合
fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'a');
utl_file.put_line(fhandle, v_queuename);
utl_file.fclose(fhandle);
--dbms_output.put_line('11====新增到queue_arrary集合的队列名称:' || v_queuename);
utl_file.put_line(logfile, '11====新增到queue_arrary集合的队列名称:' || v_queuename);
dbms_logmnr.end_logmnr;
--dbms_output.put_line('9====');
utl_file.put_line(logfile, '9====');
v_scn := dbms_flashback.get_system_change_number();
fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');
utl_file.put_line(fhandle, v_scn);
utl_file.fclose(fhandle);
--dbms_output.put_line('10====v_scn:' || v_scn);
utl_file.put_line(logfile, '10====v_scn:' || v_scn);
end if;
--结束后变更检查点
--归档检查点
fhandle := utl_file.fopen('syncdataconfig', 'archivelog_ck.txt', 'w');
utl_file.put_line(fhandle, v_recid);
utl_file.fclose(fhandle);
--dbms_output.put_line('11====v_recid_ck:' || v_recid);
utl_file.put_line(logfile, '11====v_recid_ck:' || v_recid);
v_scn := dbms_flashback.get_system_change_number();
fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');
utl_file.put_line(fhandle, v_scn);
utl_file.fclose(fhandle);
--dbms_output.put_line('12====v_scn:' || v_scn);
utl_file.put_line(logfile, '12====v_scn:' || v_scn);
utl_file.put_line(logfile, '2====analyze archivelog...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
utl_file.fclose(logfile);
if ischeckredo = 0 then
sys.datacapture_redo(tableowner, tablename, database_link_name);
end if;
exception
when others then
dbms_logmnr.end_logmnr;
utl_file.put_line(logfile, '2====analyze archivelog...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
--dbms_output.put_line(sqlerrm);
utl_file.put_line(logfile, '13====2sqlerrm: ' || sqlerrm);
utl_file.fclose(logfile);
return;
end datacapture_archivelog;
/
-----------------------------------------------------------------------------
抓取重做日志——存储过程:datacapture_redo.prc
create or replace procedure datacapture_redo(tableowner varchar2,
tablename varchar2,
database_link_name varchar2) is
--作者:shl
--日期:2012-04-28
v_scn number(30);
fhandle utl_file.file_type; --oracle文件类型
fp_buffer varchar2(4000); --文件输出缓存
logfile utl_file.file_type; --oracle文件类型
logfile_buffer varchar2(4000); --文件输出缓存
v_recid number(10); --表archive log 序号
v_recidck number(10); --archivelog_ck 序号
sql_txt varchar2(4000); --待执行的 sql_redo
v_dblink_name varchar2(20) := '@'||database_link_name; --目的数据库联接
sqlwhere varchar2(1000); --待执行的 sql_redo 条件语句
v_tableowner varchar2(20); --待同步的用户
v_tablename varchar2(20); --待同步的表
v_queueck number(10); --队列序列scn
v_queuename varchar(20); --dml队列名称
v_sqlcommitnum number(5) := 10000; --已经提交的事务dml 分批提交给目的数据库
v_sqlnum number:= 0; --已经提交的事务dml当前个数
l_string long;
l_data mytabletype := mytabletype();
n number;
isddl number(1) := 0; --是否是ddl操作 0不是 1是
iscommited number(1) := 0; --是否已经commit操作 0不是 1是
xid_str varchar2(8000); --事务序号数组
j number := 0; --tb_dml_xidsqn序列下标
type tb_xid_type is table of v$logmnr_contents.xid%type index by binary_integer; --定义待排除dll
tb_ddl_xid tb_xid_type; --定义待排除dll表
tb_dml_xid tb_xid_type; --定义待排除dll表
duplicat_xid number(1) := 0; -- 0 不重复 1 重复
cursor c_ddl is
select distinct xid
from v$logmnr_contents
where seg_name = v_tablename
and operation = 'ddl';
begin
--打开捕获日志
logfile := utl_file.fopen('logmnr', 'capture_log.txt', 'a');
--初始化schema
v_tableowner := tableowner;
v_tablename := tablename;
--捕获在线日志
--dbms_output.put_line('analyze redo...start');
utl_file.put_line(logfile, '');
utl_file.put_line(logfile, 'analyze redo...start :'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'r');
utl_file.get_line(fhandle, fp_buffer);
v_scn := fp_buffer;
utl_file.fclose(fhandle);
--dbms_output.put_line('1====v_scn:' || v_scn);
utl_file.put_line(logfile, '1====v_scn:' || v_scn);
for rs in (select f.member
from v$logfile f, v$log l
where f.group# = l.group#
and l.status = 'current') loop
dbms_logmnr.add_logfile(options => dbms_logmnr.addfile,
logfilename => rs.member);
dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog);
--dbms_logmnr.start_logmnr(dictfilename => 'd:\oracle\product\10.2.0\logmnr\rac1emz_testdict.ora');
--dbms_output.put_line('2====logfilename:' || rs.member);
utl_file.put_line(logfile, '2====logfilename:' || rs.member);
--待排除dll
open c_ddl;
fetch c_ddl bulk collect
into tb_ddl_xid;
close c_ddl;
--待查询的dml
for c_tranxid in (select distinct xid
from v$logmnr_contents
where seg_name = v_tablename) loop
isddl:= 0;
duplicat_xid := 0;
iscommited := 0;
--判断是否ddl操作
for i in 1 .. tb_ddl_xid.count loop
if c_tranxid.xid = tb_ddl_xid(i) then
isddl := 1;
--dbms_output.put_line('3==1 ddl xid:'||c_tranxid.xid);
utl_file.put_line(logfile, '3==1 ddl xid:'||c_tranxid.xid);
end if;
end loop;
--不是dll,添加dml xid
if isddl = 0 then
--dbms_output.put_line('3==1 dml xid:'||c_tranxid.xid);
utl_file.put_line(logfile, '3==1 dml xid:'||c_tranxid.xid);
--是否已经commit
for c_commit in (select xid from v$logmnr_contents
where xid = c_tranxid.xid
and operation = 'commit') loop
iscommited := 1;
--dbms_output.put_line('3==1 commited xid:'||c_commit.xid);
utl_file.put_line(logfile, '3==1 commited xid:'||c_commit.xid);
end loop;
if iscommited = 1 then
--判断xid事务标识是否已经存在,如果存在不添加,防止重复分析xid
fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'r');
loop
begin
utl_file.get_line(fhandle, fp_buffer);
if rtrim(ltrim(fp_buffer)) = c_tranxid.xid then
duplicat_xid := 1;
--dbms_output.put_line('3====2 已分析过xid :' || c_tranxid.xid); --重复xid
utl_file.put_line(logfile, '3====2 已分析过xid :' || c_tranxid.xid);
end if;
exception
when no_data_found then
exit;
end;
end loop;
--判断没有重复的插入表tb_dml_xid
if duplicat_xid = 0 then
tb_dml_xid(j) := c_tranxid.xid;
j := j + 1;
end if;
utl_file.fclose(fhandle);
end if;
end if;
end loop;
--没有dml tran 退出程序
--dbms_output.put_line('3====3 dml tran count :' || tb_dml_xid.count);
utl_file.put_line(logfile, '3====3 dml tran count :' || tb_dml_xid.count);
if tb_dml_xid.count = 0 then
dbms_logmnr.end_logmnr;
utl_file.put_line(logfile, 'analyze redo...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
utl_file.fclose(logfile);
return;
end if;
--打开xid事务标识集合,准备新增xid事务标识
fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'a');
--dbms_output.put_line('4====打开xid事务标识集合');
utl_file.put_line(logfile, '4====打开xid事务标识集合');
--创建xid_str字符串
--dbms_output.put_line('4====创建xid_str字符串');
utl_file.put_line(logfile, '4====创建xid_str字符串');
for i in 1 .. tb_dml_xid.count loop
if tb_dml_xid.count != 1 then
if i = 1 then
--新增入一行xid事务标识
utl_file.put_line(fhandle, tb_dml_xid(i - 1));
--dbms_output.put_line('a1_' || tb_dml_xid(i - 1));
utl_file.put_line(logfile, 'a1_' || tb_dml_xid(i - 1));
xid_str := tb_dml_xid(i - 1);
else
--新增入一行xid事务标识
utl_file.put_line(fhandle, tb_dml_xid(i - 1));
--dbms_output.put_line('a2_' || tb_dml_xid(i - 1));
utl_file.put_line(logfile, 'a2_' || tb_dml_xid(i - 1));
xid_str := xid_str || ',' || tb_dml_xid(i - 1);
end if;
else
--新增入一行xid事务标识
utl_file.put_line(fhandle, tb_dml_xid(i - 1));
--dbms_output.put_line('a3_' || tb_dml_xid(i - 1));
utl_file.put_line(logfile, 'a3_' || tb_dml_xid(i - 1));
xid_str := tb_dml_xid(i - 1);
end if;
end loop;
utl_file.fclose(fhandle);
--dbms_output.put_line('4====关闭xid事务标识集合');
utl_file.put_line(logfile, '4====关闭xid事务标识集合');
--dbms_output.put_line('4====xid_str字符串: ' || xid_str);
utl_file.put_line(logfile, '4====xid_str字符串: ' || xid_str);
l_string := xid_str || ',';
loop
exit when l_string is null;
n := instr(l_string, ',');
l_data.extend;
l_data(l_data.count) := ltrim(rtrim(substr(l_string, 1, n - 1)));
l_string := substr(l_string, n + 1);
end loop;
--队列
--读取队列开始序列
fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'r');
utl_file.get_line(fhandle, fp_buffer);
utl_file.fclose(fhandle);
--dbms_output.put_line('5====读取队列开始序列');
utl_file.put_line(logfile, '5====读取队列开始序列');
--更新队列
v_queueck := to_number(substr(fp_buffer, 2, 10));
v_queuename := 'q' || lpad((v_queueck + 1), 10, '0');
fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'w');
utl_file.put_line(fhandle, v_queuename);
utl_file.fclose(fhandle);
--dbms_output.put_line('6====更新队列队列序列');
utl_file.put_line(logfile, '6====更新队列队列序列');
----创建队列
fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'w');
utl_file.fclose(fhandle);
utl_file.put_line(logfile, '7====创建队列v_queuename:' || v_queuename);
--添加dml
fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'a');
for c_sql in (select sql_redo, operation
from v$logmnr_contents
where xid in (select *
from the (select cast(l_data as mytabletype)
from dual))
and operation in
('insert', 'delete', 'update', 'commit')
order by scn) loop
if c_sql.sql_redo is not null then
--判断如果是insert,不截取rowid
if c_sql.operation = 'insert' or c_sql.operation = 'commit' then
if c_sql.operation = 'insert' then
sql_txt := replace(c_sql.sql_redo,
'"' || v_tablename || '"',
'"' || v_tablename || '"' || v_dblink_name);
utl_file.put_line(fhandle, sql_txt);
--dbms_output.put_line('8====sql_redo1:' || sql_txt);
utl_file.put_line(logfile, '8====sql_redo1:' || sql_txt);
else
utl_file.put_line(fhandle, c_sql.sql_redo);
--dbms_output.put_line('8====sql_redo2:' || c_sql.sql_redo);
utl_file.put_line(logfile, '8====sql_redo2:' || c_sql.sql_redo);
end if;
else
sql_txt := substr(c_sql.sql_redo,
0,
instr(c_sql.sql_redo, 'rowid') - 5);
sql_txt := replace(sql_txt,
'"' || v_tablename || '"',
'"' || v_tablename || '"' || v_dblink_name);
utl_file.put_line(fhandle, sql_txt);
--dbms_output.put_line('8====sql_redo3:' || sql_txt);
utl_file.put_line(logfile, '8====sql_redo3:' || sql_txt);
end if;
end if;
end loop;
utl_file.fclose(fhandle);
--新增一行到 queue_arrary集合
fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'a');
utl_file.put_line(fhandle, v_queuename);
utl_file.fclose(fhandle);
--dbms_output.put_line('11====新增到queue_arrary集合的队列名称:' || v_queuename);
utl_file.put_line(logfile, '10====新增到queue_arrary集合的队列名称:' || v_queuename);
--dbms_output.put_line('9====fclose');
utl_file.put_line(logfile, '11====fclose');
dbms_logmnr.end_logmnr;
--dbms_output.put_line('10====end_logmnr');
utl_file.put_line(logfile, '10====end_logmnr');
end loop;
v_scn := dbms_flashback.get_system_change_number();
fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');
utl_file.put_line(fhandle, v_scn);
utl_file.fclose(fhandle);
--dbms_output.put_line('11====v_scn:' || v_scn);
utl_file.put_line(logfile, '12====v_scn:' || v_scn);
utl_file.put_line(logfile, 'analyze redo...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
utl_file.fclose(logfile);
exception
when others then
dbms_logmnr.end_logmnr;
--dbms_output.put_line(sqlerrm||': '||sqlcode);
utl_file.put_line(logfile, 'analyze redo...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
utl_file.put_line(logfile, sqlerrm||': '||sqlcode);
utl_file.fclose(logfile);
return;
end datacapture_redo;
/
-----------------------------------------------------
数据应用——存储过程:datareplicat.prc
create or replace procedure datareplicat is
--作者:shl
--日期:2012-04-28
fhandle utl_file.file_type; --oracle文件类型
fp_buffer varchar2(4000); --文件输出缓存
v_queuename varchar(20); --queue_arrary.txt中的dml队列名称
v_send_ck varchar(20); --send_ck.txt的dml队列名称
v_run_sql varchar(4000); --dml
logfile utl_file.file_type; --oracle文件类型
logfile_buffer varchar2(4000); --文件输出缓存
begin
--打开发送日志
logfile := utl_file.fopen('logmnr', 'send_log.txt', 'a');
utl_file.put_line(logfile, '');
utl_file.put_line(logfile, 'datareplicat......start:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
--读取send_ck.txt,获取检查点
fhandle := utl_file.fopen('syncdataconfig', 'send_ck.txt', 'r');
begin
utl_file.get_line(fhandle, fp_buffer);
v_send_ck := fp_buffer;
--dbms_output.put_line();
utl_file.put_line(logfile, '1====1v_send_ck:' || v_send_ck);
exception
when no_data_found then
v_send_ck := null;
--dbms_output.put_line();
utl_file.put_line(logfile, '1====1v_send_ck is null');
end;
utl_file.fclose(fhandle);
--如果send_ck.txt为空,从queue_array.txt读取第一个队列名
if v_send_ck is null then
fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'r');
begin
utl_file.get_line(fhandle, fp_buffer); --读取第一个队列名
v_queuename := fp_buffer;
--dbms_output.put_line();
utl_file.put_line(logfile, '2====1queue_array.txt head->v_queuename:' ||v_queuename);
v_send_ck := v_queuename;
exception
when no_data_found then
--queue_arrary.txt没有队列直接退出
--dbms_output.put_line();
utl_file.put_line(logfile,'2====2queue_array.txt->no queuename!');
--dbms_output.put_line();
utl_file.put_line(logfile,'2====2datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
return;
end;
utl_file.fclose(fhandle);
else
--如果send_ck.txt不为空,读取下一个队列
v_queuename := 'q' ||
lpad((to_number(substr(v_send_ck, 2, 10)) + 1), 10, '0');
--dbms_output.put_line();
utl_file.put_line(logfile,'2====3next queuename:' || v_queuename);
begin
fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'r');
utl_file.get_line(fhandle, fp_buffer);
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile,'2====4queue_array.txt->' || v_queuename);
v_send_ck := v_queuename;
exception
when others then
--没有队列直接退出
--dbms_output.put_line();
utl_file.put_line(logfile,'2====5queue_array.txt->no new queuename or dml in queue file!');
--dbms_output.put_line();
utl_file.put_line(logfile,'2====5datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
utl_file.fclose(logfile);
return;
end;
end if;
--读取syncdata队列 v_queuename,执行队列q***********.txt命令
fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'r');
loop
begin
utl_file.get_line(fhandle, fp_buffer);
v_run_sql := replace(fp_buffer, ';', null);
if v_run_sql != 'commit' then
execute immediate v_run_sql;
--dbms_output.put_line();
utl_file.put_line(logfile,'3====1run_sql:' || v_run_sql);
else
commit;
--dbms_output.put_line();
utl_file.put_line(logfile,'3====2commit');
end if;
exception
when no_data_found then
exit;
end;
end loop;
utl_file.fclose(fhandle);
--执行成功后 send_ck.txt记录该检查点
fhandle := utl_file.fopen('syncdataconfig', 'send_ck.txt', 'w');
utl_file.put_line(fhandle, v_send_ck);
utl_file.fclose(fhandle);
--dbms_output.put_line();
utl_file.put_line(logfile,'4====v_send_ck changed:' || v_send_ck);
--dbms_output.put_line();
utl_file.put_line(logfile,'4====datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
exception
when others then
--dbms_output.put_line();
utl_file.put_line(logfile,'datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));
--dbms_output.put_line();
utl_file.put_line(logfile,sqlerrm);
rollback;
utl_file.put_line(logfile,'rollback');
utl_file.fclose(logfile);
return;
end datareplicat;
/