欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

在Redis集群中使用pipeline批量插入的实现方法

程序员文章站 2022-06-24 23:23:30
由于项目中需要使用批量插入功能, 所以在网上查找到了redis 批量插入可以使用pipeline来高效的插入, 示例代码如下: string key = "key...

由于项目中需要使用批量插入功能, 所以在网上查找到了redis 批量插入可以使用pipeline来高效的插入, 示例代码如下:

string key = "key";
jedis jedis = new jedis("xx.xx.xx.xx");
pipeline p = jedis.pipelined();
list<string> mydata = .... //要插入的数据列表
for(string data: mydata){
  p.hset(key, data);
}
p.sync();
jedis.close();

但实际上遇到的问题是,项目上所用到的redis是集群,初始化的时候使用的类是jediscluster而不是jedis. 去查了jediscluster的文档, 并没有发现提供有像jedis一样的获取pipeline对象的 pipelined()方法.

google了一下, 发现了解决方案.

redis集群规范有说: redis 集群的键空间被分割为 16384 个槽(slot), 集群的最大节点数量也是 16384 个。每个主节点都负责处理 16384 个哈希槽的其中一部分。当我们说一个集群处于“稳定”(stable)状态时, 指的是集群没有在执行重配置(reconfiguration)操作, 每个哈希槽都只由一个节点进行处理。

所以我们可以根据要插入的key知道这个key所对应的槽的号码, 再通过这个槽的号码从集群中找到对应jedis. 具体实现如下

//初始化得到了jedis cluster, 如何获取hostandport集合代码就不写了

set<hostandport> nodes = .....

jediscluster jediscluster = new jediscluster(nodes);



map<string, jedispool> nodemap = jediscluster.getclusternodes();

string anyhost = nodemap.keyset().iterator().next();

//getslothostmap方法在下面有

treemap<long, string> slothostmap = getslothostmap(anyhost); 

  private static treemap<long, string> getslothostmap(string anyhostandportstr) {
    treemap<long, string> tree = new treemap<long, string>();
    string parts[] = anyhostandportstr.split(":");
    hostandport anyhostandport = new hostandport(parts[0], integer.parseint(parts[1]));
    try{
      jedis jedis = new jedis(anyhostandport.gethost(), anyhostandport.getport());
      list<object> list = jedis.clusterslots();
      for (object object : list) {
        list<object> list1 = (list<object>) object;
        list<object> master = (list<object>) list1.get(2);
        string hostandport = new string((byte[]) master.get(0)) + ":" + master.get(1);
        tree.put((long) list1.get(0), hostandport);
        tree.put((long) list1.get(1), hostandport);
      }
      jedis.close();
    }catch(exception e){
      
    }
    return tree;
  }

上面这几步可以在初始化的时候就完成. 不需要每次都调用, 把nodemap和slothostmap都定义为静态变量.

//获取槽号

int slot = jedisclustercrc16.getslot(key); 

//获取到对应的jedis对象

map.entry<long, string> entry = slothostmap.lowerentry(long.valueof(slot));

jedis jedis = nodemap.get(entry.getvalue()).getresource();

建议上面这步操作可以封装成一个静态方法, 比如命名为public static jedis getjedisbykey(string key) 之类的. 意思就是在集群中, 通过key获取到这个key所对应的jedis对象.

这样再通过上面的jedis.pipelined();来就可以进行批量插入了.

注:这个方法是从google上搜来的, 直到目前我使用起来还没发现什么问题. 如果哪位大神发现有什么不对的地方欢迎提出来.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。