欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Oracle在线及重做日志分析数据同步实例

程序员文章站 2022-10-30 19:37:09
sl_syncdata实时同步程序-稳定版 sl_syncdata配置: --双向复制数据表实时同步 支持断点续传 -- 一、初始环境配置 --安装logminer -- @d:\oracle\pr...

sl_syncdata实时同步程序-稳定版

sl_syncdata配置:

--双向复制数据表实时同步 支持断点续传

-- 一、初始环境配置

--安装logminer

-- @d:\oracle\product\10.2.0\db_1\rdbms\admin\dbmslm.sql 

-- @d:\oracle\product\10.2.0\db_1\rdbms\admin\dbmslmd.sql 

--设置两端

--alter database archive;

--alter database force logging;

--alter database add supplemental  log data;--不开启的话只能看到以sys用户登陆下的数据更新

--alter database add supplemental log data (primary key,unique,foreign key) columns;

alter session set nls_date_format='yyyy-mm-dd hh24:mi:ss';

--1 日志目录

create directory logmnr as 'd:\oracle\product\10.2.0\logmnr';

create directory syncdataconfig as 'd:\oracle\product\10.2.0\logmnr\config';

--2 dml队列目录

create directory syncdata as 'd:\oracle\product\10.2.0\logmnr\syncdata';

--3 scn标志位初始化

-- 实例化启动进程时的scn到d:\oracle\product\10.2.0\logmnr\scn.txt, 标志为全局 g_scn

declare

 fhandle utl_file.file_type;

 v_scn number(30);

begin

  v_scn:=dbms_flashback.get_system_change_number();

  fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');

  utl_file.put_line(fhandle , v_scn);

  utl_file.fclose(fhandle);

end;

/

-- 4 archive log 标志位初始化

declare

 fhandle utl_file.file_type;

 v_recid v$archived_log.recid%type;

begin

    select max(recid) into v_recid  from v$archived_log order by 1;

    fhandle := utl_file.fopen ('syncdataconfig','archivelog_ck.txt', 'w');

    utl_file.put_line(fhandle ,to_char(v_recid));--初始化捕获进程前存在的归档不传送 

    utl_file.fclose(fhandle);

    exception 

     when others then return;

end;

/

-- 5 dml队列初始化

declare

 fhandle utl_file.file_type;

begin

  fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'w');

  utl_file.put_line(fhandle , 'q0000000000'); -- 初始化队列序列标志

  utl_file.fclose(fhandle);

end;

/

-- 7 发送队列检查点标志初始化

declare

 fhandle utl_file.file_type;

begin

  fhandle := utl_file.fopen('syncdataconfig', 'send_ck.txt', 'w');

  utl_file.fclose(fhandle);

end;

/

-- 8 xid事务标识集合初始化 

declare

 fhandle utl_file.file_type;

begin

  fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'w');

  utl_file.fclose(fhandle);

end;

/

-- 9 所有队列序号集合初始化 

declare

 fhandle utl_file.file_type;

begin

  fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'w');

  utl_file.fclose(fhandle);

end;

/

-- 10 捕获日志初始化

declare

 fhandle utl_file.file_type;

begin

  fhandle := utl_file.fopen('logmnr', 'capture_log.txt', 'w');

  utl_file.fclose(fhandle);

end;

/

-- 11 发送日志初始化

declare

 fhandle utl_file.file_type;

begin

  fhandle := utl_file.fopen('logmnr', 'send_log.txt', 'w');

  utl_file.fclose(fhandle);

end;

/

/*-- 9 数据库对照字典初始化

begin

 dbms_logmnr_d.build(dictionary_filename=>'rac1emz_dict.ora',

 dictionary_location=>'d:\oracle\product\10.2.0\logmnr\config');

end;

/*/

--10 初始化自定义类型

create or replace type mytabletype as table of varchar2 (255);

-- 二、进程

--1 源数据库捕获进程

--2 源数据库传播进程 --检查点记录,支持断点续传

--三、创建源数据库捕获进程任务调度

/* 创建可执行程序 */

begin

    dbms_scheduler.create_program(

        program_name        => 'sys.proc_datacapture_archivelog',

        program_action      => 'sys.datacapture_archivelog',

        program_type        => 'stored_procedure',

        number_of_arguments => 3,

        comments            => '数据同步程序',

        enabled             => false

    );

end;

/

/* 设置可执行程序的输入参数 */

begin

    dbms_scheduler.define_program_argument(

        program_name      => 'sys.proc_datacapture_archivelog',

        argument_position => 1,

        argument_type     => 'varchar2',

        default_value     => ''

    );

    dbms_scheduler.define_program_argument(

        program_name      => 'sys.proc_datacapture_archivelog',

        argument_position => 2,

        argument_type     => 'varchar2',

        default_value     => ''

    );

    dbms_scheduler.define_program_argument(

        program_name      => 'sys.proc_datacapture_archivelog',

        argument_position => 3,

        argument_type     => 'varchar2',

        default_value     => ''

    );

end;

/

/* 创建调度表 */

begin

    dbms_scheduler.create_schedule(

        schedule_name   => 'sys.sch_datacapture_archivelog',

        repeat_interval => 'freq=secondly;interval=5',

        start_date      => sysdate,

        comments        => '数据同步调度'

    );

end;

/

/* 创建作业 */

begin

    dbms_scheduler.create_job( 

        job_name      => 'sys.job_demosync',     

        program_name  => 'sys.proc_datacapture_archivelog',

        schedule_name => 'sys.sch_datacapture_archivelog',

        job_class     => 'default_job_class',            

        comments      => '数据同步作业',

        auto_drop     => false,

        enabled       => false

    );

end;

/

/* 启动可执行程序 */

exec dbms_scheduler.enable('proc_datacapture_archivelog');

/* 启动作业 */

exec dbms_scheduler.enable('job_demosync');

/* 设置不同的作业参数 */

begin

    dbms_scheduler.set_job_argument_value(

        job_name          => 'sys.job_demosync',

        argument_position => 1,

        argument_value    => 'test'

    );

    dbms_scheduler.set_job_argument_value(

        job_name          => 'sys.job_demosync',

        argument_position => 2,

        argument_value    => 'demo'

    );

    dbms_scheduler.set_job_argument_value(

        job_name          => 'sys.job_demosync',

        argument_position => 3,

        argument_value    => 'rac2emz.test'

    );

end;

/

----job管理-----------------------------------------------------------------------

/* 禁用job */

exec dbms_scheduler.disable('job_demosync');

/* 执行job */

exec dbms_scheduler.run_job('job_demosync'); 

/* 停止job */

exec dbms_scheduler.stop_job('job_demosync');

/* 删除job */

exec dbms_scheduler.drop_job('job_demosync');

exec dbms_scheduler.drop_schedule('sys.sch_datacapture_archivelog');

exec dbms_scheduler.drop_program('sys.proc_datacapture_archivelog');

------------------------------------------------

--三、创建源数据库发送进程任务调度

/* 创建可执行程序 */

begin

    dbms_scheduler.create_program(

        program_name        => 'sys.proc_datareplicat',

        program_action      => 'sys.datareplicat',

        program_type        => 'stored_procedure',

        number_of_arguments => 0,

        comments            => '数据同步程序-发送进程',

        enabled             => false

    );

end;

/

/* 创建调度表 */

begin

    dbms_scheduler.create_schedule(

        schedule_name   => 'sys.sch_datareplicat',

        repeat_interval => 'freq=secondly;interval=2',

        start_date      => sysdate,

        comments        => '数据同步调度-发送进程'

    );

end;

/

/* 创建作业 */

begin

    dbms_scheduler.create_job( 

        job_name      => 'sys.job_demosync2',     

        program_name  => 'sys.proc_datareplicat',

        schedule_name => 'sys.sch_datareplicat',

        job_class     => 'default_job_class',            

        comments      => '数据同步作业-发送进程',

        auto_drop     => false,

        enabled       => false

    );

end;

/

/* 启动可执行程序 */

exec dbms_scheduler.enable('proc_datareplicat');

/* 启动作业 */

exec dbms_scheduler.enable('job_demosync2');

----job管理-----------------------------------------------------------------------

/* 禁用job */

exec dbms_scheduler.disable('job_demosync2');

/* 执行job */

exec dbms_scheduler.run_job('job_demosync2'); 

/* 停止job */

exec dbms_scheduler.stop_job('job_demosync2');

/* 删除job */

exec dbms_scheduler.drop_job('job_demosync2');

exec dbms_scheduler.drop_schedule('sys.sch_datareplicat');

exec dbms_scheduler.drop_program('sys.proc_datareplicat');

--查看状态

select t.job_name,t.repeat_interval,t.state,t.start_date,t.last_start_date,t.next_run_date,t.comments 

from dba_scheduler_jobs t

where t.job_name in('job_demosync','job_demosync2');

------------------------------------------------------

抓取归档日志——存储过程:datacapture_archivelog.prc

create or replace procedure datacapture_archivelog(tableowner         varchar2,

                                                   tablename          varchar2,

                                                   database_link_name varchar2) is

--作者:shl

--日期:2012-04-28

  v_scn         number(30);

  fhandle       utl_file.file_type; --oracle文件类型

  fp_buffer     varchar2(4000); --文件输出缓存

  logfile        utl_file.file_type; --oracle文件类型

  logfile_buffer varchar2(4000); --文件输出缓存

  v_recid       number(10); --表archive log 序号

  v_recidck     number(10); --archivelog_ck 序号

  sql_txt       varchar2(4000); --待执行的 sql_redo

  v_dblink_name varchar2(20) := '@' || database_link_name; --目的数据库联接

  sqlwhere      varchar2(1000); --待执行的 sql_redo 条件语句

  v_tableowner  varchar2(20); --待同步的用户

  v_tablename   varchar2(20); --待同步的表

  v_queueck     number(10); --队列序列scn

  v_queuename   varchar(20); --dml队列名称

  ischeckredo   number(1) := 0; --是已经否抽取redo日志

  v_sqlcommitnum number(5) := 10000; --已经提交的事务dml 分批提交给目的数据库

  v_sqlnum    number := 0; --已经提交的事务dml当前个数

  l_string long;

  l_data   mytabletype := mytabletype();

  n        number;

  isddl      number(1) := 0; --是否是dll操作 0不是 1是

  iscommited number(1) := 0; --是否已经commit操作 0不是 1是

  xid_str varchar2(8000); --事务序号数组

  j       number := 0; --tb_dml_xidsqn序列下标

  type tb_xid_type is table of v$logmnr_contents.xid%type index by binary_integer; --定义待排除dll

  tb_ddl_xid   tb_xid_type; --定义待排除dll表

  tb_dml_xid   tb_xid_type; --定义待排除dll表

  duplicat_xid number(1) := 0; -- 0  不重复 1 重复

  cursor c_ddl is

    select distinct xid

      from v$logmnr_contents

     where seg_name = v_tablename

       and operation = 'ddl';

begin

--打开捕获日志

  logfile := utl_file.fopen('logmnr', 'capture_log.txt', 'a');

  --初始化schema

  v_tableowner := tableowner;

  v_tablename  := tablename;

  --捕获归档日志

  --dbms_output.put_line();

  utl_file.put_line(logfile, '');

  utl_file.put_line(logfile, 'analyze archivelog...start:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

  fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'r');

  utl_file.get_line(fhandle, fp_buffer);

  v_scn := fp_buffer;

  utl_file.fclose(fhandle);

  --dbms_output.put_line();

  utl_file.put_line(logfile, '0====v_scn:' || v_scn);

  select max(recid) into v_recid from v$archived_log order by 1;

  /*exception

  when no_data_found then

    return;*/

  --dbms_output.put_line();

  utl_file.put_line(logfile, '1====1v_recid:' || v_recid);

  fhandle := utl_file.fopen('syncdataconfig', 'archivelog_ck.txt', 'r');

  utl_file.get_line(fhandle, fp_buffer);

  v_recidck := fp_buffer;

  utl_file.fclose(fhandle);

  --dbms_output.put_line();

  utl_file.put_line(logfile, '1====2v_recid_ck:' || v_recidck);

  if trim(v_recid) != trim(nvl(v_recidck, 0)) then

    --dbms_output.put_line();

    utl_file.put_line(logfile, '2====1存在未分析的archive log');

  else

    if ischeckredo = 0 then

      utl_file.put_line(logfile, '2====1 no new archive log');

      utl_file.put_line(logfile, '2====analyze archivelog...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

      utl_file.fclose(logfile);

      sys.datacapture_redo(tableowner, tablename, database_link_name);

      ischeckredo := 1;

      return;

    end if;

  end if;

  --一次加载从archive_ck归档检查点到最新归档,及 当前 redo

  for rs in (select name, recid from v$archived_log where recid >= v_recidck) loop

    dbms_logmnr.add_logfile(options     => dbms_logmnr.addfile,

                            logfilename => rs.name);

    --dbms_output.put_line();

    utl_file.put_line(logfile, '2====2 archive logfilename:' || rs.name);

  end loop;

  --dbms_output.put_line();

  utl_file.put_line(logfile, '2====3补充redo log');

  for rs in (select f.member

               from v$logfile f, v$log l

              where f.group# = l.group#

                and l.status = 'current') loop

    dbms_logmnr.add_logfile(options     => dbms_logmnr.addfile,

                            logfilename => rs.member);

    --dbms_output.put_line();

    utl_file.put_line(logfile, '2====3 redo logfilename:' || rs.member);

  end loop;

  dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog);

  --dbms_logmnr.start_logmnr(dictfilename => 'd:\oracle\product\10.2.0\logmnr\rac1emz_testdict.ora');

  --待排除dll

  open c_ddl;

  fetch c_ddl bulk collect

    into tb_ddl_xid;

  close c_ddl;

  --待查询的dml

  for c_tranxid in (select distinct xid

                      from v$logmnr_contents

                     where seg_name = v_tablename) loop

    isddl        := 0;

    duplicat_xid := 0;

    iscommited   := 0;

    --判断是否dll操作

    for i in 1 .. tb_ddl_xid.count loop

      if c_tranxid.xid = tb_ddl_xid(i) then

        isddl := 1;

        --dbms_output.put_line();

        utl_file.put_line(logfile, '3==1 ddl xid:' || c_tranxid.xid);

      end if;

    end loop;

    --不是dll,添加dml xid

    if isddl = 0 then

      --是否已经commit

      for c_commitxid in (select xid

                            from v$logmnr_contents

                           where xid = c_tranxid.xid

                             and operation = 'commit') loop

        iscommited := 1;

        --dbms_output.put_line();

        utl_file.put_line(logfile, '3==1 commited xid:' || c_commitxid.xid);

      end loop;

      if iscommited = 1 then

        --判断xid事务标识是否已经存在,如果存在不添加,防止重复分析xid

        fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'r');

        loop

          begin

            utl_file.get_line(fhandle, fp_buffer);

            if rtrim(ltrim(fp_buffer)) = c_tranxid.xid then

              duplicat_xid := 1;

              --dbms_output.put_line(); --重复xid

              utl_file.put_line(logfile, '3====2 已分析过xid :' || c_tranxid.xid);

            end if;

          exception

            when no_data_found then

              exit;

          end;

        end loop;

        --判断没有重复的插入表tb_dml_xid

        if duplicat_xid = 0 then

          tb_dml_xid(j) := c_tranxid.xid;

          j := j + 1;

        end if;

        utl_file.fclose(fhandle);

      end if;

    end if;

  end loop;

  --没有dml  tran 退出程序

  --dbms_output.put_line();

  utl_file.put_line(logfile, '3====3 dml  tran count :' || tb_dml_xid.count);

  if tb_dml_xid.count = 0 then

    dbms_logmnr.end_logmnr;

  else

    --打开xid事务标识集合,准备新增xid事务标识

    fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'a');

    --dbms_output.put_line();

    utl_file.put_line(logfile, '5====打开xid事务标识集合');

    --创建xid_str字符串

    --dbms_output.put_line();

    utl_file.put_line(logfile, '5====创建xid_str字符串');

    for i in 1 .. tb_dml_xid.count loop

      if tb_dml_xid.count != 1 then

        if i = 1 then

          --新增入一行xid事务标识

          utl_file.put_line(fhandle, tb_dml_xid(i - 1));

          --dbms_output.put_line();

          utl_file.put_line(logfile, 'a1_' || tb_dml_xid(i - 1));

          xid_str := tb_dml_xid(i - 1);

        else

          --新增入一行xid事务标识

          utl_file.put_line(fhandle, tb_dml_xid(i - 1));

          utl_file.put_line(logfile, 'a2_' || tb_dml_xid(i - 1));

          xid_str := xid_str || ',' || tb_dml_xid(i - 1);

        end if;

      else

        --新增入一行xid事务标识

        utl_file.put_line(fhandle, tb_dml_xid(i - 1));

        --dbms_output.put_line('a3_' || tb_dml_xid(i - 1));

        utl_file.put_line(logfile, 'a3_' || tb_dml_xid(i - 1));

        xid_str := tb_dml_xid(i - 1);

      end if;

    end loop;

    utl_file.fclose(fhandle);

    --dbms_output.put_line();

    utl_file.put_line(logfile, '5====关闭xid事务标识集合');

    --dbms_output.put_line();

    utl_file.put_line(logfile, '5====xid_str字符串: ' || xid_str);

  

    l_string := xid_str || ',';

    loop

      exit when l_string is null;

      n := instr(l_string, ',');

      l_data.extend;

      l_data(l_data.count) := ltrim(rtrim(substr(l_string, 1, n - 1)));

      l_string := substr(l_string, n + 1);

    end loop;

  

    --队列

    --读取队列开始序列

    fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'r');

    utl_file.get_line(fhandle, fp_buffer);

    utl_file.fclose(fhandle);

    --dbms_output.put_line();

    utl_file.put_line(logfile, '6====读取队列开始序列');

  

    --更新队列

    v_queueck   := to_number(substr(fp_buffer, 2, 10));

    v_queuename := 'q' || lpad((v_queueck + 1), 10, '0');

    fhandle     := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'w');

    utl_file.put_line(fhandle, v_queuename);

    utl_file.fclose(fhandle);

    --dbms_output.put_line();

    utl_file.put_line(logfile, '7====更新队列队列序列');

  

    ----创建队列文件

    fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'w');

    utl_file.fclose(fhandle);

    utl_file.put_line(logfile, '8====创建队列v_queuename:' || v_queuename);

    --添加dml

    fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'a');

    for c_sql in (select sql_redo, operation

                    from v$logmnr_contents

                   where xid in (select *

                                   from the (select cast(l_data as mytabletype)

                                               from dual))

                     and operation in

                         ('insert', 'delete', 'update', 'commit')

                   order by scn) loop

      if c_sql.sql_redo is not null then

        --判断如果是insert,不截取rowid

        if c_sql.operation = 'insert' or c_sql.operation = 'commit' then

          if c_sql.operation = 'insert' then

            sql_txt := replace(c_sql.sql_redo,

                               '"' || v_tablename || '"',

                               '"' || v_tablename || '"' || v_dblink_name);

            utl_file.put_line(fhandle, sql_txt);

            utl_file.put_line(logfile, '9====sql_redo1:' || sql_txt);

          else

            utl_file.put_line(fhandle, c_sql.sql_redo);

            utl_file.put_line(logfile, '9====sql_redo2:' || c_sql.sql_redo);

          end if;

        else

          sql_txt := substr(c_sql.sql_redo,

                            0,

                            instr(c_sql.sql_redo, 'rowid') - 5);

          sql_txt := replace(sql_txt,

                             '"' || v_tablename || '"',

                             '"' || v_tablename || '"' || v_dblink_name);

          utl_file.put_line(fhandle, sql_txt);

          utl_file.put_line(logfile, '9====sql_redo3:' || sql_txt);

        end if;

      end if;

    end loop;

    utl_file.fclose(fhandle);

    utl_file.put_line(logfile, '10====');

  

    --新增一行到 queue_arrary集合

    fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'a');

    utl_file.put_line(fhandle, v_queuename);

    utl_file.fclose(fhandle);

    --dbms_output.put_line('11====新增到queue_arrary集合的队列名称:' || v_queuename);

    utl_file.put_line(logfile, '11====新增到queue_arrary集合的队列名称:' || v_queuename);

  

    dbms_logmnr.end_logmnr;

    --dbms_output.put_line('9====');

    utl_file.put_line(logfile, '9====');

    v_scn   := dbms_flashback.get_system_change_number();

    fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');

    utl_file.put_line(fhandle, v_scn);

    utl_file.fclose(fhandle);

    --dbms_output.put_line('10====v_scn:' || v_scn);

    utl_file.put_line(logfile, '10====v_scn:' || v_scn);

  end if;

  --结束后变更检查点

  --归档检查点

  fhandle := utl_file.fopen('syncdataconfig', 'archivelog_ck.txt', 'w');

  utl_file.put_line(fhandle, v_recid);

  utl_file.fclose(fhandle);

  --dbms_output.put_line('11====v_recid_ck:' || v_recid);

  utl_file.put_line(logfile, '11====v_recid_ck:' || v_recid);

  v_scn   := dbms_flashback.get_system_change_number();

  fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');

  utl_file.put_line(fhandle, v_scn);

  utl_file.fclose(fhandle);

  --dbms_output.put_line('12====v_scn:' || v_scn);

  utl_file.put_line(logfile, '12====v_scn:' || v_scn);

  

  utl_file.put_line(logfile, '2====analyze archivelog...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

  utl_file.fclose(logfile);

  

  if ischeckredo = 0 then

    sys.datacapture_redo(tableowner, tablename, database_link_name);

  end if;

  

exception

  when others then

    dbms_logmnr.end_logmnr;

    utl_file.put_line(logfile, '2====analyze archivelog...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

    --dbms_output.put_line(sqlerrm);

    utl_file.put_line(logfile, '13====2sqlerrm: ' || sqlerrm);

    utl_file.fclose(logfile);

    return;

end datacapture_archivelog;

/

-----------------------------------------------------------------------------

抓取重做日志——存储过程:datacapture_redo.prc

create or replace procedure datacapture_redo(tableowner         varchar2,

                                             tablename          varchar2,

                                             database_link_name varchar2) is

--作者:shl

--日期:2012-04-28

  v_scn         number(30);

  fhandle       utl_file.file_type; --oracle文件类型

  fp_buffer     varchar2(4000); --文件输出缓存

  logfile        utl_file.file_type; --oracle文件类型

  logfile_buffer varchar2(4000); --文件输出缓存

  v_recid       number(10); --表archive log 序号

  v_recidck     number(10); --archivelog_ck 序号

  sql_txt       varchar2(4000); --待执行的 sql_redo

  v_dblink_name varchar2(20) := '@'||database_link_name; --目的数据库联接

  sqlwhere      varchar2(1000); --待执行的 sql_redo 条件语句

  v_tableowner  varchar2(20); --待同步的用户

  v_tablename   varchar2(20); --待同步的表

  v_queueck     number(10); --队列序列scn

  v_queuename   varchar(20); --dml队列名称

  v_sqlcommitnum number(5) := 10000; --已经提交的事务dml 分批提交给目的数据库

  v_sqlnum    number:= 0; --已经提交的事务dml当前个数

  l_string long;

  l_data   mytabletype := mytabletype();

  n        number;

  isddl      number(1) := 0; --是否是ddl操作 0不是 1是

  iscommited number(1) := 0; --是否已经commit操作 0不是 1是

  xid_str varchar2(8000); --事务序号数组

  j       number := 0; --tb_dml_xidsqn序列下标

  type tb_xid_type is table of v$logmnr_contents.xid%type index by binary_integer; --定义待排除dll

  tb_ddl_xid   tb_xid_type; --定义待排除dll表

  tb_dml_xid   tb_xid_type; --定义待排除dll表

  duplicat_xid number(1) := 0; -- 0  不重复 1 重复

  cursor c_ddl is

    select distinct xid

      from v$logmnr_contents

     where seg_name = v_tablename

       and operation = 'ddl';

begin

--打开捕获日志

  logfile := utl_file.fopen('logmnr', 'capture_log.txt', 'a');

  --初始化schema

  v_tableowner := tableowner;

  v_tablename  := tablename;

  --捕获在线日志

  --dbms_output.put_line('analyze redo...start');

  utl_file.put_line(logfile, '');

  utl_file.put_line(logfile, 'analyze redo...start :'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

  fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'r');

  utl_file.get_line(fhandle, fp_buffer);

  v_scn := fp_buffer;

  utl_file.fclose(fhandle);

  --dbms_output.put_line('1====v_scn:' || v_scn);

  utl_file.put_line(logfile, '1====v_scn:' || v_scn);

  

  for rs in (select f.member

               from v$logfile f, v$log l

              where f.group# = l.group#

                and l.status = 'current') loop

    dbms_logmnr.add_logfile(options     => dbms_logmnr.addfile,

                            logfilename => rs.member);

    dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog);

    --dbms_logmnr.start_logmnr(dictfilename => 'd:\oracle\product\10.2.0\logmnr\rac1emz_testdict.ora');

    --dbms_output.put_line('2====logfilename:' || rs.member);

    utl_file.put_line(logfile, '2====logfilename:' || rs.member);

  

    --待排除dll

    open c_ddl;

    fetch c_ddl bulk collect

      into tb_ddl_xid;

    close c_ddl;

  

    --待查询的dml

    for c_tranxid in (select distinct xid

                        from v$logmnr_contents

                       where seg_name = v_tablename) loop

      isddl:= 0;

      duplicat_xid := 0;

      iscommited   := 0;

      --判断是否ddl操作

      for i in 1 .. tb_ddl_xid.count loop

        if c_tranxid.xid = tb_ddl_xid(i) then

          isddl := 1;

          --dbms_output.put_line('3==1 ddl xid:'||c_tranxid.xid);

          utl_file.put_line(logfile, '3==1 ddl xid:'||c_tranxid.xid);

        end if;

      end loop;

      

      --不是dll,添加dml xid

      if isddl = 0 then

      --dbms_output.put_line('3==1 dml xid:'||c_tranxid.xid);

      utl_file.put_line(logfile, '3==1 dml xid:'||c_tranxid.xid);

      

        --是否已经commit

        for  c_commit in (select xid from v$logmnr_contents

                             where xid = c_tranxid.xid

                               and operation = 'commit') loop

            iscommited := 1;

            --dbms_output.put_line('3==1 commited xid:'||c_commit.xid);

            utl_file.put_line(logfile, '3==1 commited xid:'||c_commit.xid);

        end loop;

        

        if iscommited = 1 then

          --判断xid事务标识是否已经存在,如果存在不添加,防止重复分析xid

          fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'r');

          loop

            begin

              utl_file.get_line(fhandle, fp_buffer);

              if rtrim(ltrim(fp_buffer)) = c_tranxid.xid then

                duplicat_xid := 1;

                --dbms_output.put_line('3====2 已分析过xid :' || c_tranxid.xid); --重复xid

                utl_file.put_line(logfile, '3====2 已分析过xid :' || c_tranxid.xid);

              end if;

            exception

              when no_data_found then

                exit;

            end;

          end loop;

          --判断没有重复的插入表tb_dml_xid

          if duplicat_xid = 0 then

            tb_dml_xid(j) := c_tranxid.xid;

            j := j + 1;

          end if;

          utl_file.fclose(fhandle);

        end if;

      end if;

    end loop;

  

    --没有dml tran 退出程序

    --dbms_output.put_line('3====3 dml tran count :' || tb_dml_xid.count);

    utl_file.put_line(logfile, '3====3 dml tran count :' || tb_dml_xid.count);

    

    if tb_dml_xid.count = 0 then

      dbms_logmnr.end_logmnr;

      utl_file.put_line(logfile, 'analyze redo...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

      utl_file.fclose(logfile);

      return;

    end if;

  

    --打开xid事务标识集合,准备新增xid事务标识

    fhandle := utl_file.fopen('syncdataconfig', 'xid_array.txt', 'a');

    --dbms_output.put_line('4====打开xid事务标识集合');

    utl_file.put_line(logfile, '4====打开xid事务标识集合');

    --创建xid_str字符串

    --dbms_output.put_line('4====创建xid_str字符串');

    utl_file.put_line(logfile, '4====创建xid_str字符串');

    

    for i in 1 .. tb_dml_xid.count loop

      if tb_dml_xid.count != 1 then

        if i = 1 then

          --新增入一行xid事务标识

          utl_file.put_line(fhandle, tb_dml_xid(i - 1));

          --dbms_output.put_line('a1_' || tb_dml_xid(i - 1));

          utl_file.put_line(logfile, 'a1_' || tb_dml_xid(i - 1));

          xid_str := tb_dml_xid(i - 1);

        else

          --新增入一行xid事务标识

          utl_file.put_line(fhandle, tb_dml_xid(i - 1));

          --dbms_output.put_line('a2_' || tb_dml_xid(i - 1));

          utl_file.put_line(logfile, 'a2_' || tb_dml_xid(i - 1));

          xid_str := xid_str || ',' || tb_dml_xid(i - 1);

        end if;

      else

        --新增入一行xid事务标识

        utl_file.put_line(fhandle, tb_dml_xid(i - 1));

        --dbms_output.put_line('a3_' || tb_dml_xid(i - 1));

        utl_file.put_line(logfile, 'a3_' || tb_dml_xid(i - 1));

        xid_str := tb_dml_xid(i - 1);

      end if;

    end loop;

    utl_file.fclose(fhandle);

    --dbms_output.put_line('4====关闭xid事务标识集合');

    utl_file.put_line(logfile, '4====关闭xid事务标识集合');

    --dbms_output.put_line('4====xid_str字符串: ' || xid_str);

    utl_file.put_line(logfile, '4====xid_str字符串: ' || xid_str);

    l_string := xid_str || ',';

    loop

      exit when l_string is null;

      n := instr(l_string, ',');

      l_data.extend;

      l_data(l_data.count) := ltrim(rtrim(substr(l_string, 1, n - 1)));

      l_string := substr(l_string, n + 1);

    end loop;

  

    --队列

    --读取队列开始序列

    fhandle := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'r');

    utl_file.get_line(fhandle, fp_buffer);

    utl_file.fclose(fhandle);

    --dbms_output.put_line('5====读取队列开始序列');

    utl_file.put_line(logfile, '5====读取队列开始序列');

  

    --更新队列

    v_queueck   := to_number(substr(fp_buffer, 2, 10));

    v_queuename := 'q' || lpad((v_queueck + 1), 10, '0');

    fhandle     := utl_file.fopen('syncdataconfig', 'queue_ck.txt', 'w');

    utl_file.put_line(fhandle, v_queuename);

    utl_file.fclose(fhandle);

    --dbms_output.put_line('6====更新队列队列序列');

    utl_file.put_line(logfile, '6====更新队列队列序列');

  

    ----创建队列

    fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'w');

    utl_file.fclose(fhandle);

    utl_file.put_line(logfile, '7====创建队列v_queuename:' || v_queuename);

    --添加dml

    fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'a');

    for c_sql in (select sql_redo, operation

                    from v$logmnr_contents

                   where xid in (select *

                                   from the (select cast(l_data as mytabletype)

                                               from dual))

                     and operation in

                         ('insert', 'delete', 'update', 'commit')

                   order by scn) loop

      if c_sql.sql_redo is not null then

        --判断如果是insert,不截取rowid

        if c_sql.operation = 'insert' or c_sql.operation = 'commit' then

          if c_sql.operation = 'insert' then

            sql_txt := replace(c_sql.sql_redo,

                               '"' || v_tablename || '"',

                               '"' || v_tablename || '"' || v_dblink_name);

            utl_file.put_line(fhandle, sql_txt);

            --dbms_output.put_line('8====sql_redo1:' || sql_txt);

            utl_file.put_line(logfile, '8====sql_redo1:' || sql_txt);

          else

            utl_file.put_line(fhandle, c_sql.sql_redo);

            --dbms_output.put_line('8====sql_redo2:' || c_sql.sql_redo);

            utl_file.put_line(logfile, '8====sql_redo2:' || c_sql.sql_redo);

          end if;

        else

          sql_txt := substr(c_sql.sql_redo,

                            0,

                            instr(c_sql.sql_redo, 'rowid') - 5);

          sql_txt := replace(sql_txt,

                             '"' || v_tablename || '"',

                             '"' || v_tablename || '"' || v_dblink_name);

          utl_file.put_line(fhandle, sql_txt);

          --dbms_output.put_line('8====sql_redo3:' || sql_txt);

          utl_file.put_line(logfile, '8====sql_redo3:' || sql_txt);

        end if;

      end if;

    end loop;

    utl_file.fclose(fhandle);

  

    --新增一行到 queue_arrary集合

    fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'a');

    utl_file.put_line(fhandle, v_queuename);

    utl_file.fclose(fhandle);

    --dbms_output.put_line('11====新增到queue_arrary集合的队列名称:' || v_queuename);

    utl_file.put_line(logfile, '10====新增到queue_arrary集合的队列名称:' || v_queuename);

  

    --dbms_output.put_line('9====fclose');

    utl_file.put_line(logfile, '11====fclose');

    dbms_logmnr.end_logmnr;

    --dbms_output.put_line('10====end_logmnr');

    utl_file.put_line(logfile, '10====end_logmnr');

  end loop;

  v_scn   := dbms_flashback.get_system_change_number();

  fhandle := utl_file.fopen('syncdataconfig', 'scn_ck.txt', 'w');

  utl_file.put_line(fhandle, v_scn);

  utl_file.fclose(fhandle);

  --dbms_output.put_line('11====v_scn:' || v_scn);

  utl_file.put_line(logfile, '12====v_scn:' || v_scn);

  

  utl_file.put_line(logfile, 'analyze redo...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

  utl_file.fclose(logfile);

  

exception

  when others then

    dbms_logmnr.end_logmnr;

    --dbms_output.put_line(sqlerrm||': '||sqlcode);

    utl_file.put_line(logfile, 'analyze redo...end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

    utl_file.put_line(logfile, sqlerrm||': '||sqlcode);

    utl_file.fclose(logfile);

    return;

  

end datacapture_redo;

/

-----------------------------------------------------

数据应用——存储过程:datareplicat.prc

create or replace procedure datareplicat is

--作者:shl

--日期:2012-04-28

  fhandle     utl_file.file_type; --oracle文件类型

  fp_buffer   varchar2(4000); --文件输出缓存

  v_queuename varchar(20); --queue_arrary.txt中的dml队列名称

  v_send_ck   varchar(20); --send_ck.txt的dml队列名称

  v_run_sql   varchar(4000); --dml

  logfile        utl_file.file_type; --oracle文件类型

  logfile_buffer varchar2(4000); --文件输出缓存

begin

  --打开发送日志

  logfile := utl_file.fopen('logmnr', 'send_log.txt', 'a');

  utl_file.put_line(logfile, '');

  utl_file.put_line(logfile, 'datareplicat......start:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

  --读取send_ck.txt,获取检查点

  fhandle := utl_file.fopen('syncdataconfig', 'send_ck.txt', 'r');

  begin

    utl_file.get_line(fhandle, fp_buffer);

    v_send_ck := fp_buffer;

    --dbms_output.put_line();

    utl_file.put_line(logfile, '1====1v_send_ck:' || v_send_ck);

  exception

    when no_data_found then

      v_send_ck := null;

      --dbms_output.put_line();

      utl_file.put_line(logfile, '1====1v_send_ck is null');

  end;

  utl_file.fclose(fhandle);

  --如果send_ck.txt为空,从queue_array.txt读取第一个队列名

  if v_send_ck is null then

    fhandle := utl_file.fopen('syncdataconfig', 'queue_array.txt', 'r');

    begin

      utl_file.get_line(fhandle, fp_buffer); --读取第一个队列名

      v_queuename := fp_buffer;

      --dbms_output.put_line();

      utl_file.put_line(logfile, '2====1queue_array.txt head->v_queuename:' ||v_queuename);

      v_send_ck := v_queuename;

    exception

      when no_data_found then

        --queue_arrary.txt没有队列直接退出

        --dbms_output.put_line();

        utl_file.put_line(logfile,'2====2queue_array.txt->no queuename!');

        --dbms_output.put_line();

        utl_file.put_line(logfile,'2====2datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

        return;

    end;

    utl_file.fclose(fhandle);

  else

    --如果send_ck.txt不为空,读取下一个队列

    v_queuename := 'q' ||

                   lpad((to_number(substr(v_send_ck, 2, 10)) + 1), 10, '0');

    --dbms_output.put_line();

    utl_file.put_line(logfile,'2====3next queuename:' || v_queuename);

    begin

      fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'r');

      utl_file.get_line(fhandle, fp_buffer);

      utl_file.fclose(fhandle);

      --dbms_output.put_line();

      utl_file.put_line(logfile,'2====4queue_array.txt->' || v_queuename);

      v_send_ck := v_queuename;

    exception

      when others then

        --没有队列直接退出

        --dbms_output.put_line();

        utl_file.put_line(logfile,'2====5queue_array.txt->no new queuename or dml in queue file!');

        --dbms_output.put_line();

        utl_file.put_line(logfile,'2====5datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

        utl_file.fclose(logfile);

        return;

    end;

  end if;

  --读取syncdata队列 v_queuename,执行队列q***********.txt命令

  fhandle := utl_file.fopen('syncdata', v_queuename || '.txt', 'r');

  loop

    begin

      utl_file.get_line(fhandle, fp_buffer);

      v_run_sql := replace(fp_buffer, ';', null);

      if v_run_sql != 'commit' then

        execute immediate v_run_sql;

        --dbms_output.put_line();

        utl_file.put_line(logfile,'3====1run_sql:' || v_run_sql);

      else

        commit;

        --dbms_output.put_line();

        utl_file.put_line(logfile,'3====2commit');

      end if;

    exception

      when no_data_found then

        exit;

    end;

  end loop;

  utl_file.fclose(fhandle);

  --执行成功后 send_ck.txt记录该检查点

  fhandle := utl_file.fopen('syncdataconfig', 'send_ck.txt', 'w');

  utl_file.put_line(fhandle, v_send_ck);

  utl_file.fclose(fhandle);

  --dbms_output.put_line();

  utl_file.put_line(logfile,'4====v_send_ck changed:' || v_send_ck);

  --dbms_output.put_line();

  utl_file.put_line(logfile,'4====datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

exception

  when others then

    --dbms_output.put_line();

    utl_file.put_line(logfile,'datareplicat......end:'||to_char(sysdate,'yyyy-mm-dd hh24:mi:ss'));

    --dbms_output.put_line();

    utl_file.put_line(logfile,sqlerrm);

    rollback;

    utl_file.put_line(logfile,'rollback');

    utl_file.fclose(logfile);

    return;

end datareplicat;

/