欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

kettle如何3秒内写入100万条数据到Redis

程序员文章站 2024-03-19 18:35:28
...

kettle如何3秒内写入100万条数据到Redis

实现结果

先来看下实现结果,如下图,本地写入100万数据,耗时2.3s,每秒44万。接下来说说如何实现:
kettle如何3秒内写入100万条数据到Redis
数据存储结构样例:
kettle如何3秒内写入100万条数据到Redis

生成记录

用于生成测试数据:
kettle如何3秒内写入100万条数据到Redis

增加序列

用于生成redis的key值
kettle如何3秒内写入100万条数据到Redis

Json输出

用于将原始数据封装为一个json,存储到redis中:
kettle如何3秒内写入100万条数据到Redis
json输出:字段页签,用于说明json中包含的字段信息:
kettle如何3秒内写入100万条数据到Redis

Java 写入redis缓存

主要使用到了Pipeline类,实现批量提交:
kettle如何3秒内写入100万条数据到Redis
详细代码如下:

// etl-java-redis
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;

private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
	if (first) {
		first = false;
		// connect to redis server
		String redis_ip = getVariable("redis.ip", "127.0.0.1");
		String redis_port = getVariable("redis.port", "6379");
		String redis_password = getVariable("redis.password", "");
		cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
		logBasic(redis_ip+":"+redis_port);
		logBasic("redis_password:"+redis_password);

		// 连接池方式
		JedisPoolConfig config = new JedisPoolConfig();
		config.setMaxIdle(8);
		config.setMaxTotal(18);
		pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
		jedis = pool.getResource();		
		jedis.select(1);// 切换数据库
		pipe = jedis.pipelined(); // 创建pipeline 对象
		
		logBasic("Server is running: " + jedis.ping());
	}

	Object[] r = getRow();
	if (r == null) {
		setOutputDone();
		
		pipe.sync();
		jedis.close();
		pool.close();
		return false;
	}

	// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
	// enough to handle any new fields you are creating in this step.
	r = createOutputRow(r, data.outputRowMeta.size());

	/*
	Redis数据存储(Redis-String)
	key : KEY
	value : JsonData
	*/
	String key = get(Fields.In, "id").getString(r);
	String value = get(Fields.In, "JsonData").getString(r);
	logDebug(key + "\t" + value);

	// 写入缓存
	pipe.set(key, value);
	cur_size++;
	if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
		pipe.sync(); // 同步
		cur_size=0; // 复位
	}
			
	// Send the row on to the next step.
	putRow(data.outputRowMeta, r);

	return true;
}

命名参数

将可变参数存储到命名参数中,方便迁移:
kettle如何3秒内写入100万条数据到Redis

– 本文结束 –