Oracle数据同步项目yugong
程序员文章站
2022-07-02 09:06:49
...
- 概述
1、给每个同步表建立物化视图,会消耗存储资源
2、java source部分代码可能需要依赖第三方包,需要在数据库服务器上加载大量外部jar包
3、每个物化视图上都需要建立触发器,监控数据变化(看了yugong后,在想是否直接在物化视图日志上建立触发器,但相对就会繁琐,需要根据日志表记录从主表拿数据,然后传递给java source,最后删除日志)
- yugong
1、虽然都是基于物化视图,但yugong项目仅使用物化视图日志,且创建物化视图日志时使用了参数PRIMARY KEY、SEQUENCE,这样日志中将包含主键列、操作序号。extractor会按照SEQUENCE$$顺序抽取变化数据,并把已抽取的变化数据从日志表中删除。而原来的方案使用的物化视图的快速刷新,commit后自动刷新到物化视图,日志被清空
2、yugong中数据抽取使用的是jdbc,即extractor部分(从源库抽取数据),根据日志表中的主键列从源表获取数据;而原方案使用触发器获取数据变化。
3、yugong的applier部分(更新到目标库),同样使用jdbc,直接把转化后的数据更新入目标库;而原方案利用java source把变化数据发送给外部程序处理。
4、yugong引入了Translator,用于异构数据转化;而原方案使用外部的consumer处理
yugong详细介绍,可以参考:
https://github.com/alibaba/yugong/wiki/AdminGuide
http://blog.csdn.net/sunnylinner/article/details/52064637
物化视图详细介绍,可以参考:
http://www.cnblogs.com/linjiqin/archive/2012/05/23/2514795.html
下面主要介绍下使用时遇到的问题:
1、SEQUENCE$$标识符无效,原因在于查询增量记录时以SEQUENCE$$进行排序,获取顺序操作记录。用于测试的表之前创建过物化视图日志,但是创建时未使用SEQUENCE
2、yugong.extractor.noupdate.thresold,需要注意该值的设置,小于等于0将一直处于增量状态。若大于0,处于追赶状态,执行增量次数超过该值将结束增量,释放资源给下一个表。需要与yugong.table.concurrent.size配合使用。比如:thresold=0、concurrent.size=1,同步两张表,只有一张表处于同步状态,另一张表处于等待状态。因为只有一个处理线程,而thresold=0线程一直得不到释放。同样,thresold=3、concurrent.size=5,还是同步两张表,同步线程不会因为并发线程数多而不释放。因此,如果需要持续同步大量表,就需要设置thresold=0、concurrent.size=n,n大于等于同步表数。n可能很大,也可以想办法让同步过的表再次加入同步队列。
3、大量表持续同步时开启并行模式,抽取、入目标库都使用多线程,因此应该独立部署。但前提与两个数据库都可直连
4、yugong每个表对应一个instance,负责表的迁移,包含extractor、translator、applier。extractor与translator、applier无法分离,独立部署,不适用目标库无法直连的情况
Clob字段类型无法正常同步的问题:Clob类型字段在数据抽取时被转换为String类型的值,但ColumnMeta.type没有改变,值与类型不匹配。在数据更新或插入目标库中时,ps.setObject(index,cv.getValue(),cv.getColumn().getType()),String类型无法转换为Clob类型,sql执行失败。解决方案:重新设置字段类型,与值保持一致,col.setType(Types.VARCHAR);
public abstract class AbstractOracleRecordExtractor extends AbstractRecordExtractor { protected ColumnValue getColumnValue(ResultSet rs, String encoding, ColumnMeta col) throws SQLException { if(){ ... }else if (YuGongUtils.isClobType(col.getType())) { value = rs.getString(col.getName()); col.setType(Types.VARCHAR); } ... } }
Blob字段类型同样无法正常同步:和Clob同样的问题,获取数据时被转换为byte[],ps.setObject时值与类型不一致。
注意事项:最初考虑取值时不转换,直接取Blob,这样值与类型就一致了,但同样失败:表或视图不存在。原因在于Blob使用LOCATOR(定位器)实现,指向数据库中SQL BLOB,不能把A库中的BLOB作为值直接作用于B库中的BLOB。BLOB可参考:http://blog.csdn.net/terryzero/article/details/3939014
解决方案:在Applier中调用ps.setObject(index,cv.getValue(),cv.getColumn().getType())时,进行数据类型判断,当类型为Types.BLOB时,执行ps.setBinaryStream(index, new ByteArrayInputStream((byte[])cv.getValue()));
if(cv.getColumn().getType()==Types.BLOB){ ps.setBinaryStream(index, new ByteArrayInputStream((byte[])cv.getValue())); }else{ ps.setObject(index, cv.getValue(), cv.getColumn().getType()); }
同样CLOB也可以用同样的解决方案:取值时仍然转换为String类型,占位符赋值时处理
if(cv.getColumn().getType()==Types.CLOB){ ps.setCharacterStream(index, new StringReader((String)cv.getValue())); }else{ ps.setObject(index, cv.getValue(), cv.getColumn().getType()); }
java.lang.AbstractMethodError:oracle.jdbc.driver.T4CPreparedStatement.setBlob(ILjava/io/InputStream:Oracle驱动版本的问题,数据库驱动改ojdbc6.jar 即可
全局schema问题:
1、源库:不指定表时,默认取当前连接schema下的所有表;指定表时,若不指定schema,有可能取到多个schema(一个表存在多个schema中),这种情况需要指定schema。源码详见:TableMetaGenerator.getTableMetasWithoutColumn
2、目标库:默认使用源库的schema,当源库和目标库不一致时,需要增加全局schema转换器,实现如下:
- 增加配置项:yugong.applier.table.schema
- 增加全局schema转换类:SchemaDataTranslator,代码如下:
public class SchemaDataTranslator extends AbstractDataTranslator implements DataTranslator { private String tableSchema; public SchemaDataTranslator(String tableSchema){ this.tableSchema=tableSchema; } public String translatorSchema() { return tableSchema; } public List<Record> translator(List<Record> records) { for (Record record : records) { String schema = translatorSchema(); if (schema != null) { record.setSchemaName(schema); } } return records; } }
- 初始化instance时引入Translator:yugong.applier.table.schema,修改YuGongController.buildTranslator代码如下:
private DataTranslator buildTranslator(String name) throws Exception { String tableName = YuGongUtils.toPascalCase(name); String translatorName = tableName + "DataTranslator"; String packageName = DataTranslator.class.getPackage().getName(); Class clazz = null; try { clazz = Class.forName(packageName + "." + translatorName); } catch (ClassNotFoundException e) { File file = new File(translatorDir, translatorName + ".java"); if (!file.exists()) { // 兼容下表名 file = new File(translatorDir, tableName + ".java"); if (!file.exists()) { String targetSchema=config.getString("yugong.applier.table.schema", null); if(StringUtils.isNotBlank(targetSchema)){ return new SchemaDataTranslator(targetSchema); } return null; } } String javaSource = StringUtils.join(IOUtils.readLines(new FileInputStream(file)), "\n"); clazz = compiler.compile(javaSource); } return (DataTranslator) clazz.newInstance(); }
上一篇: maven-dependency-plugin插件
下一篇: 基于oracle的增量数据采集实现总结
推荐阅读
-
Oracle在线及重做日志分析数据同步实例
-
MySQL/RDS数据如何同步到MaxCompute之实践讲解 cmysqlpostgresql项目管理
-
eclipse Java web项目数据库由oracle更改为mysql中遇到的问题(使用JPA注解)附上修改过程
-
跨Oracle数据库如何实现表级别的实时同步
-
java-Web项目部署步骤之Oracle数据库导入.dmp文件数据
-
Oracle数据同步项目yugong
-
Oracle数据同步项目yugong
-
Oracle数据同步解决方案之databus
-
Oracle数据同步解决方案之databus
-
oracle 触发器,当一个表更新或插入时将数据同步至另个库中的某个表中