抽取oracle数据到mysql数据库的实现过程
程序员文章站
2022-03-11 07:57:33
在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款...
在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款数据迁移程序,性能都不是很好的,于是自己动手写一个针对于oracle数据库数据迁移到mysql数据程序,其具体过程如下:
1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql
2、建立一个目录etl_dir
3、运行oracle数据库程序p_etl_ora_data,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql
4、导入mysql数据,文件内容如下
load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n"; load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n"; load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n"; load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";
附:数据库脚本p_etl_ora_data
create or replace procedure p_etl_ora_data ( p_ora_dir varchar2, p_data_path varchar2 ) is type t_rec is record( tbn varchar2(40), whr varchar2(4000)); type t_tabs is table of t_rec; v_tabs t_tabs := t_tabs(); v_etl_dir varchar2(40) := p_ora_dir; v_load_file utl_file.file_type; procedure etl_data ( p_sql_stmt varchar2, p_data_path varchar2, p_tb_name varchar2 ) is begin declare v_var_col varchar2(32767); v_num_col number; v_date_col date; v_tmz timestamp; v_cols number; v_cols_desc dbms_sql.desc_tab; v_row_str varchar2(32767); v_col_str varchar2(32767); v_sql_id number; v_sql_ref sys_refcursor; v_exp_file utl_file.file_type; v_data_path varchar2(200); begin v_data_path := p_data_path; if regexp_substr(v_data_path, '\\$') is null then v_data_path := v_data_path || '\'; end if; v_data_path := replace(v_data_path, '\', '\\'); open v_sql_ref for p_sql_stmt; v_sql_id := dbms_sql.to_cursor_number(v_sql_ref); dbms_sql.describe_columns(v_sql_id, v_cols, v_cols_desc); for i in v_cols_desc.first .. v_cols_desc.last loop case when v_cols_desc(i).col_type in (1, 9, 96) then dbms_sql.define_column(v_sql_id, i, v_var_col, 32767); when v_cols_desc(i).col_type = 2 then dbms_sql.define_column(v_sql_id, i, v_num_col); when v_cols_desc(i).col_type = 12 then dbms_sql.define_column(v_sql_id, i, v_date_col); when v_cols_desc(i).col_type = 180 then dbms_sql.define_column(v_sql_id, i, v_tmz); end case; end loop; declare v_flush_over pls_integer := 1; v_file_over pls_integer := 1; v_file_no pls_integer := 1; v_file_name varchar2(200); v_line varchar2(400); begin while dbms_sql.fetch_rows(v_sql_id) > 0 loop if v_file_over = 1 then v_file_name := p_tb_name || '_' || v_file_no || '.csv'; v_exp_file := utl_file.fopen(v_etl_dir, v_file_name, open_mode => 'w', max_linesize => 32767); end if; v_row_str := ''; for i in 1 .. v_cols loop v_col_str := '\n'; begin case when v_cols_desc(i).col_type in (1, 9, 96) then dbms_sql.column_value(v_sql_id, i, v_var_col); if v_var_col is not null then v_col_str := '^' || v_var_col || '^'; end if; when v_cols_desc(i).col_type = 2 then dbms_sql.column_value(v_sql_id, i, v_num_col); if v_num_col is not null then v_col_str := v_num_col; end if; when v_cols_desc(i).col_type = 12 then dbms_sql.column_value(v_sql_id, i, v_date_col); if v_date_col is not null then v_col_str := '^' || to_char(v_date_col, 'yyyy-mm-dd hh24:mi:ss') || '^'; end if; when v_cols_desc(i).col_type in (180, 181, 231) then dbms_sql.column_value(v_sql_id, i, v_tmz); if v_tmz is not null then v_col_str := '^' || to_char(v_tmz, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^'; end if; end case; if i = 1 then v_row_str := v_col_str; else v_row_str := v_row_str || ',' || v_col_str; end if; end; end loop; utl_file.put_line(v_exp_file, convert(v_row_str, 'utf8')); if v_file_over > 200000 /*每200000条记录就产生一个新的文件*/ then v_file_over := 1; v_flush_over := 1; v_file_no := v_file_no + 1; utl_file.fclose(v_exp_file); v_line := 'load data infile "' || v_data_path || v_file_name || '" into table ' || p_tb_name; v_line := v_line || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";'; utl_file.put_line(v_load_file, v_line); utl_file.fflush(v_load_file); continue; end if; v_file_over := v_file_over + 1; if v_flush_over > 2000 /*每2000条记录就刷新缓存,写到文件中 */ then utl_file.fflush(v_exp_file); v_flush_over := 1; else v_flush_over := v_flush_over + 1; end if; end loop; dbms_sql.close_cursor(v_sql_id); if utl_file.is_open(v_exp_file) then utl_file.fclose(v_exp_file); v_line := 'load data infile "' || v_data_path || v_file_name || '" into table ' || p_tb_name; v_line := v_line || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";'; utl_file.put_line(v_load_file, v_line); utl_file.fflush(v_load_file); end if; end; exception when others then if dbms_sql.is_open(v_sql_id) then dbms_sql.close_cursor(v_sql_id); end if; if utl_file.is_open(v_exp_file) then utl_file.fclose(v_exp_file); end if; dbms_output.put_line(sqlerrm); dbms_output.put_line(p_sql_stmt); end; end; begin begin execute immediate 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) '; exception when others then null; end; execute immediate 'truncate table mysql_etl_tbs'; declare v_ci pls_integer; v_cn varchar2(40); v_etl_cols varchar2(32767); v_tbn varchar2(30); v_etl_cfg varchar2(32767); v_cnf_file utl_file.file_type; v_from_pos pls_integer; begin v_cnf_file := utl_file.fopen(v_etl_dir, 'etl_tabs.cnf', 'r', 32767); loop utl_file.get_line(v_cnf_file, v_etl_cfg, 32767); v_from_pos := regexp_instr(v_etl_cfg, 'from', 1, 1, 0, 'i'); v_etl_cols := substr(v_etl_cfg, 1, v_from_pos - 1); v_etl_cols := regexp_substr(v_etl_cols, '(select)(.+)', 1, 1, 'i', 2); v_tbn := regexp_substr(v_etl_cfg, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2); v_tbn := upper(v_tbn); v_tabs.extend(); v_tabs(v_tabs.last).tbn := v_tbn; v_tabs(v_tabs.last).whr := regexp_substr(v_etl_cfg, '\s+where .+', 1, 1, 'i'); v_ci := 1; loop v_cn := regexp_substr(v_etl_cols, '\s+', 1, v_ci); exit when v_cn is null; v_cn := upper(v_cn); execute immediate 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)' using v_tbn, v_cn, v_ci; commit; v_ci := v_ci + 1; end loop; end loop; exception when utl_file.invalid_path then dbms_output.put_line('指定的目录:etl_dir"' || '"无效!'); return; when utl_file.invalid_filename then dbms_output.put_line('指定的文件:" etl_tabs.cnf' || '"无效!'); return; when no_data_found then utl_file.fclose(v_cnf_file); when others then dbms_output.put_line(sqlerrm); return; end; declare v_cur_match sys_refcursor; v_sql_smt varchar2(32767); v_tn varchar2(40); v_cn varchar2(40); v_ci pls_integer; v_column_name varchar2(40); v_etl_cols varchar2(32767); v_line varchar2(4000); v_tbn varchar2(40); begin v_load_file := utl_file.fopen(v_etl_dir, 'load_data.sql', open_mode => 'w', max_linesize => 32767); for t_ix in v_tabs.first .. v_tabs.last loop v_sql_smt := 'select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn='':tbn:'' ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci'; v_tbn := v_tabs(t_ix).tbn; v_sql_smt := replace(v_sql_smt, ':tbn:', v_tbn); v_etl_cols := null; open v_cur_match for v_sql_smt; loop fetch v_cur_match into v_tn, v_cn, v_column_name, v_ci; exit when v_cur_match%notfound; if v_ci > 1 then v_etl_cols := v_etl_cols || ' , '; end if; if v_column_name is null then v_etl_cols := v_etl_cols || ' cast(null as number) ' || v_cn; else v_etl_cols := v_etl_cols || v_cn; end if; end loop; close v_cur_match; v_tbn := lower(v_tbn); v_sql_smt := 'select ' || v_etl_cols || ' from ' || v_tbn || v_tabs(t_ix).whr; etl_data(v_sql_smt, p_data_path, v_tbn); end loop; if utl_file.is_open(v_load_file) then utl_file.fclose(v_load_file); end if; end; end p_etl_ora_data;
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。如果你想了解更多相关内容请查看下面相关链接
上一篇: 生活类app排名:“58同城上榜”,第二改变了我们的出行方式
下一篇: 域用户配置文件