欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

redis中的Pipeline

程序员文章站 2022-07-13 16:47:00
...

遇到的问题

 

在使用redis的过程中,尤其是在做大数据“实时计算”的过程中,也许会经常遇到下列场景:比如网站每个页面的实时pv运算,使用storm(或者spark streaming)从kafka中消费实时点击流数据进行统计计算;并计算好的结果放入redis进行存储,如果redis中已经存在 需要先取出来与最新统计数据相加再放入redis;另外为了减少与redis的交互次数,降低redis的存取压力,一般不会消费一条数据后就立即放入redis,而是处理一批数据后再存入redis,大致流程如下:

 


redis中的Pipeline
            
    
    博客分类: redis大数据 redis中的pipelinejava redis pipeline 
 

 

网站实时pv计算中可能比这个图的处理要更复杂些,但大致流程也就这样了。这里我们重点看下redis的相关处理,java伪代码实现如下:

 

 

public static void main(String[] args) {
        Redis redis = null;//省略redis实例化过程
        //省略storm计算过程,这里假设已经计算好,并放入一个MAP中
        Map<String,Integer> pagesPv = new HashMap();
        pagesPv.put("page_index",4);
        pagesPv.put("page_product",2);
        pagesPv.put("page_cart",3);
        pagesPv.put("page_order",1);
       
        for(String key:pagesPv.keySet()){
            redis.incrBy(key,pagesPv.get(key));//incrBy是redis中的自增操作
        }
    }

 

 

首先说下redis中的incrBy方法,可以实现数字类型的自增操作。也可以用redisget方法先获取到老数据,然后加上新增的值,再调用redisset方法写入redis。但这就跟redis有两次操作,这里更推荐是用incrBy

 

咋一看上述代码其实没啥问题,这里Map里只有4条数据,也就是说上述的for循环需要与redis服务器之间做4次远程调用。但一个稍微知名一点的网站,每分钟可能会数万级别的访问量,对应到这个Map里可能会有数百个页面,也就是说这个真实的情况下,如果使用上述代码的话,这个for循环中与redis服务之间会有数百次远程调用操作,对redis服务来说这无疑是一笔较大的开销。讲到这里该是pipeline登场的时候了。

 

Pipeline是什么

 

Pipeline其实是redisjava客户端对redis的基本事务的调用封装。这里所谓的基本事务与传统的关系型数据库事务回滚不同,而是按照顺序执行一批命令,在redis中是使用MULTIEXEC命令实现的,而Pipeline只是对这两个命令的java客户端封装而已。

 

redis的客户端中执行事务分三步:首先执行MULTI命令;然后输入一批我们想要执行的其他操作(比如一批set操作),这批操作会被写到一个队列里;最后再执行EXEC命令,客户端会把这上述队列“一次性”的发生到服务端,并等待服务端返回。

 

再来看服务端:服务端接收到这个命令操作队列后,按照顺序一个一个执行,每个命令操作都会对应一个返回值,待队列中所有的命令都执行完成后“一次性”把这些返回值拼装成list返回给客户端。并且这个返回值list的数量和顺序与命令队列一致。整个pipeline的执行过程结束,在有redis需要命令批量执行的情况下,使用pipeline可以大大减少redis客户端与服务端交互次数,从而提升多命令的执行性能。

 

回到文章开头的场景,假设Map里有500个值,for循环中就是执行500incrBy操作,对应的与服务端的交互就需要500次。如果使用pipeline,只需要交互两次即可完成,java伪代码实现如下:

 

public static void main(String[] args) {
        Redis redis = null;//省略redis实例化过程
        Map<String,Integer> pagesPv = new HashMap();
        pagesPv.put("page_index",4);
        pagesPv.put("page_product",2);
        pagesPv.put("page_cart",3);
        pagesPv.put("page_order",1);
 
        Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例
        for(String key:pagesPv.keySet()){
            //注意这里调用Pipeline的incrBy方法,此时命令并没有执行
            pipeline.incrBy(key,pagesPv.get(key));
        }
        //执行sync方法,此时批量向服务端提交命令,并等待返回
        pipeline.sync();
}
 

 

整个过程大致分为三步:

1、通过redis.pipelined()方法,获取pipeline实例,与上述讲解中的MULTI命令对应。

2、以前调用的是redis. incrBy()方法,现在改为调用pipelineincrBy方法,注意此时不会向服务端发起调用请求,只是把命令写入队列。

3、执行pipeline.sync()方法,与上述流程中的EXEC命令对应,此时会把第2步中的命令队列一次性的提交给服务端,并等待服务端返回。

 

Pipeline是非线程安全的

 

Pipeline能提升性能,而且使用起来也非常方便,但使用的时候一定要注意一点“Pipeline是非线程安全的。也就是说多个线程如果公用一个Pipeline实例,会出现线程安全问题,典型的就是数据返回结果错乱。正确的用法是在每次需要用到Pipeline的地方,都新建一个实例即:

Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例

为什么在多线程下是线程不安全的,其实很好理解,看下图:


redis中的Pipeline
            
    
    博客分类: redis大数据 redis中的pipelinejava redis pipeline 
 

AB两个线程共用一个pipeline实例,同时向redis服务端提交5个命令,各自都期望收到5个返回值。但真实的结果是有一个收到10个结果,有一个会失败,这其实不是我们期望的。

 

在使用Spring框架的java程序中,redis的客户端对象是线程安全的,可以单例注入spring容器,在需要redis客户端的地方直接使用@Resource直接获取即可。

 
@Resource
private Redis redis;
public void method(){
redis.xxx;
}

 

pipeline是非线程安全的,每次都必须新建实例,如果你的代码中出现了下列代码,请注意会有线程安全问题,请及时修正:

 

//错误的写法,pipeline被单例注入了spring容器,全局复用
@Resource
private Pipeline pipeline;
public void method(){
pipeline.xxx;
}

SpringPipeline的正确用法:

 
@Resource
private Redis redis;
public void method(){
    Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例
    pipeline.xxx;//多个操作
    pipeline. sync();
}

 

 

最后提一点,Pipeline是一次提交一个队列给服务端,这个队列如果太大会占用更多内存,以及增加网络传输时间。所以 Pipeline里一次提交的命令数也不要太多,根据实际数据量大小  一般几百条还是可以的。

 

好了,关于redis中的pipeline就总结到这里。

 

出处

http://moon-walker.iteye.com/blog/2397962

 

  • redis中的Pipeline
            
    
    博客分类: redis大数据 redis中的pipelinejava redis pipeline 
  • 大小: 20.5 KB
  • redis中的Pipeline
            
    
    博客分类: redis大数据 redis中的pipelinejava redis pipeline 
  • 大小: 82.5 KB