欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Oracle数据同步项目yugong

程序员文章站 2022-07-02 09:06:49
...
  • 概述
之前尝试了基于物化视图+java source的oracle数据同步方案,为了把物化视图的变化信息传递给java source发送给外部程序,需要使用触发器、存储过程/函数。触发器用于监控物化视图的数据变化,调用存储过程从而间接调用java source(存储过程可以指向一个java source)。该方案的缺点如下:
1、给每个同步表建立物化视图,会消耗存储资源
2、java source部分代码可能需要依赖第三方包,需要在数据库服务器上加载大量外部jar包
3、每个物化视图上都需要建立触发器,监控数据变化(看了yugong后,在想是否直接在物化视图日志上建立触发器,但相对就会繁琐,需要根据日志表记录从主表拿数据,然后传递给java source,最后删除日志)

  • yugong
最近看了阿里基于oracle的数据库迁移项目yugong,其也是基于物化视图进行的实现。差异在于:
1、虽然都是基于物化视图,但yugong项目仅使用物化视图日志,且创建物化视图日志时使用了参数PRIMARY KEY、SEQUENCE,这样日志中将包含主键列、操作序号。extractor会按照SEQUENCE$$顺序抽取变化数据,并把已抽取的变化数据从日志表中删除。而原来的方案使用的物化视图的快速刷新,commit后自动刷新到物化视图,日志被清空
2、yugong中数据抽取使用的是jdbc,即extractor部分(从源库抽取数据),根据日志表中的主键列从源表获取数据;而原方案使用触发器获取数据变化。
3、yugong的applier部分(更新到目标库),同样使用jdbc,直接把转化后的数据更新入目标库;而原方案利用java source把变化数据发送给外部程序处理。
4、yugong引入了Translator,用于异构数据转化;而原方案使用外部的consumer处理

yugong详细介绍,可以参考:
https://github.com/alibaba/yugong/wiki/AdminGuide
http://blog.csdn.net/sunnylinner/article/details/52064637
物化视图详细介绍,可以参考:
http://www.cnblogs.com/linjiqin/archive/2012/05/23/2514795.html

下面主要介绍下使用时遇到的问题:
1、SEQUENCE$$标识符无效,原因在于查询增量记录时以SEQUENCE$$进行排序,获取顺序操作记录。用于测试的表之前创建过物化视图日志,但是创建时未使用SEQUENCE
2、yugong.extractor.noupdate.thresold,需要注意该值的设置,小于等于0将一直处于增量状态。若大于0,处于追赶状态,执行增量次数超过该值将结束增量,释放资源给下一个表。需要与yugong.table.concurrent.size配合使用。比如:thresold=0、concurrent.size=1,同步两张表,只有一张表处于同步状态,另一张表处于等待状态。因为只有一个处理线程,而thresold=0线程一直得不到释放。同样,thresold=3、concurrent.size=5,还是同步两张表,同步线程不会因为并发线程数多而不释放。因此,如果需要持续同步大量表,就需要设置thresold=0、concurrent.size=n,n大于等于同步表数。n可能很大,也可以想办法让同步过的表再次加入同步队列。
3、大量表持续同步时开启并行模式,抽取、入目标库都使用多线程,因此应该独立部署。但前提与两个数据库都可直连
4、yugong每个表对应一个instance,负责表的迁移,包含extractor、translator、applier。extractor与translator、applier无法分离,独立部署,不适用目标库无法直连的情况

Clob字段类型无法正常同步的问题:Clob类型字段在数据抽取时被转换为String类型的值,但ColumnMeta.type没有改变,值与类型不匹配。在数据更新或插入目标库中时,ps.setObject(index,cv.getValue(),cv.getColumn().getType()),String类型无法转换为Clob类型,sql执行失败。解决方案:重新设置字段类型,与值保持一致,col.setType(Types.VARCHAR);
public abstract class AbstractOracleRecordExtractor extends AbstractRecordExtractor {
	
	protected ColumnValue getColumnValue(ResultSet rs, String encoding, ColumnMeta col) throws SQLException {
		if(){
			...
		}else if (YuGongUtils.isClobType(col.getType())) {
			value = rs.getString(col.getName());
			col.setType(Types.VARCHAR);
		}
		...
	}

}

Blob字段类型同样无法正常同步:和Clob同样的问题,获取数据时被转换为byte[],ps.setObject时值与类型不一致。

注意事项:最初考虑取值时不转换,直接取Blob,这样值与类型就一致了,但同样失败:表或视图不存在。原因在于Blob使用LOCATOR(定位器)实现,指向数据库中SQL BLOB,不能把A库中的BLOB作为值直接作用于B库中的BLOB。BLOB可参考:http://blog.csdn.net/terryzero/article/details/3939014

解决方案:在Applier中调用ps.setObject(index,cv.getValue(),cv.getColumn().getType())时,进行数据类型判断,当类型为Types.BLOB时,执行ps.setBinaryStream(index, new ByteArrayInputStream((byte[])cv.getValue()));
if(cv.getColumn().getType()==Types.BLOB){
	ps.setBinaryStream(index, new ByteArrayInputStream((byte[])cv.getValue()));
}else{
	ps.setObject(index, cv.getValue(), cv.getColumn().getType());
}

同样CLOB也可以用同样的解决方案:取值时仍然转换为String类型,占位符赋值时处理
if(cv.getColumn().getType()==Types.CLOB){
	ps.setCharacterStream(index, new StringReader((String)cv.getValue()));
}else{
	ps.setObject(index, cv.getValue(), cv.getColumn().getType());
}

java.lang.AbstractMethodError:oracle.jdbc.driver.T4CPreparedStatement.setBlob(ILjava/io/InputStream:Oracle驱动版本的问题,数据库驱动改ojdbc6.jar  即可

全局schema问题
1、源库:不指定表时,默认取当前连接schema下的所有表;指定表时,若不指定schema,有可能取到多个schema(一个表存在多个schema中),这种情况需要指定schema。源码详见:TableMetaGenerator.getTableMetasWithoutColumn
2、目标库:默认使用源库的schema,当源库和目标库不一致时,需要增加全局schema转换器,实现如下:
  • 增加配置项:yugong.applier.table.schema
  • 增加全局schema转换类:SchemaDataTranslator,代码如下:
public class SchemaDataTranslator extends AbstractDataTranslator implements DataTranslator {
	
	private String tableSchema;
	
	public SchemaDataTranslator(String tableSchema){
		this.tableSchema=tableSchema;
	}
	
	public String translatorSchema() {
        return tableSchema;
    }

	 public List<Record> translator(List<Record> records) {
		 for (Record record : records) {
			 String schema = translatorSchema();
            if (schema != null) {
                record.setSchemaName(schema);
            }
        }
        return records;
	}
}
  • 初始化instance时引入Translator:yugong.applier.table.schema,修改YuGongController.buildTranslator代码如下:
private DataTranslator buildTranslator(String name) throws Exception {
        String tableName = YuGongUtils.toPascalCase(name);
        String translatorName = tableName + "DataTranslator";
        String packageName = DataTranslator.class.getPackage().getName();
        Class clazz = null;
        try {
            clazz = Class.forName(packageName + "." + translatorName);
        } catch (ClassNotFoundException e) {
            File file = new File(translatorDir, translatorName + ".java");
            if (!file.exists()) {
                // 兼容下表名
                file = new File(translatorDir, tableName + ".java");
                if (!file.exists()) {
                	String targetSchema=config.getString("yugong.applier.table.schema", null);
                	if(StringUtils.isNotBlank(targetSchema)){
                		return new SchemaDataTranslator(targetSchema);
                	}
                    return null;
                }
            }

            String javaSource = StringUtils.join(IOUtils.readLines(new FileInputStream(file)), "\n");
            clazz = compiler.compile(javaSource);
        }

        return (DataTranslator) clazz.newInstance();
    }
相关标签: oracle