计算机是学什么内容的/山西网络营销seo
kettle如何3秒内写入100万条数据到Redis
1.实现结果
先来看下实现结果,如下图,本地写入100万数据,耗时2.3s,每秒44万。接下来说说如何实现:
数据存储结构样例:
2.添加redis包放到kettle的lib目录
jedis-3.1.0.jar
3.生成记录步骤
用于生成测试数据:
4.增加序列步骤
用于生成redis的key值
5.Json输出
用于将原始数据封装为一个json,存储到redis中:
json输出:字段页签,用于说明json中包含的字段信息:
6.Java 写入redis缓存
主要使用到了Pipeline类,实现批量提交:
详细代码如下:
// etl-java-redis
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {if (first) {first = false;// connect to redis serverString redis_ip = getVariable("redis.ip", "127.0.0.1");String redis_port = getVariable("redis.port", "6379");String redis_password = getVariable("redis.password", "");cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));logBasic(redis_ip+":"+redis_port);logBasic("redis_password:"+redis_password);// 连接池方式JedisPoolConfig config = new JedisPoolConfig();config.setMaxIdle(8);config.setMaxTotal(18);pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);jedis = pool.getResource(); jedis.select(1);// 切换数据库pipe = jedis.pipelined(); // 创建pipeline 对象logBasic("Server is running: " + jedis.ping());}Object[] r = getRow();if (r == null) {setOutputDone();pipe.sync();jedis.close();pool.close();return false;}// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large// enough to handle any new fields you are creating in this step.r = createOutputRow(r, data.outputRowMeta.size());/*Redis数据存储(Redis-String)key : KEYvalue : JsonData*/String key = get(Fields.In, "id").getString(r);String value = get(Fields.In, "JsonData").getString(r);logDebug(key + "\t" + value);// 写入缓存pipe.set(key, value);cur_size++;if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交pipe.sync(); // 同步cur_size=0; // 复位}// Send the row on to the next step.putRow(data.outputRowMeta, r);return true;
}
7.命名参数
将可变参数存储到命名参数中,方便迁移:
– 本文结束 –