redis中的Pipeline
遇到的问题
在使用redis的过程中,尤其是在做大数据“实时计算”的过程中,也许会经常遇到下列场景:比如网站每个页面的实时pv运算,使用storm(或者spark streaming)从kafka中消费实时点击流数据进行统计计算;并计算好的结果放入redis进行存储,如果redis中已经存在 需要先取出来与最新统计数据相加再放入redis;另外为了减少与redis的交互次数,降低redis的存取压力,一般不会消费一条数据后就立即放入redis,而是处理一批数据后再存入redis,大致流程如下:
网站实时pv计算中可能比这个图的处理要更复杂些,但大致流程也就这样了。这里我们重点看下redis的相关处理,java伪代码实现如下:
public static void main(String[] args) { Redis redis = null;//省略redis实例化过程 //省略storm计算过程,这里假设已经计算好,并放入一个MAP中 Map<String,Integer> pagesPv = new HashMap(); pagesPv.put("page_index",4); pagesPv.put("page_product",2); pagesPv.put("page_cart",3); pagesPv.put("page_order",1); for(String key:pagesPv.keySet()){ redis.incrBy(key,pagesPv.get(key));//incrBy是redis中的自增操作 } }
首先说下redis中的incrBy方法,可以实现数字类型的自增操作。也可以用redis的get方法先获取到老数据,然后加上新增的值,再调用redis的set方法写入redis。但这就跟redis有两次操作,这里更推荐是用incrBy。
咋一看上述代码其实没啥问题,这里Map里只有4条数据,也就是说上述的for循环需要与redis服务器之间做4次远程调用。但一个稍微知名一点的网站,每分钟可能会数万级别的访问量,对应到这个Map里可能会有数百个页面,也就是说这个真实的情况下,如果使用上述代码的话,这个for循环中与redis服务之间会有数百次远程调用操作,对redis服务来说这无疑是一笔较大的开销。讲到这里该是pipeline登场的时候了。
Pipeline是什么
Pipeline其实是redis的java客户端对redis的基本事务的调用封装。这里所谓的“基本事务”与传统的关系型数据库事务回滚不同,而是按照顺序执行一批命令,在redis中是使用MULTI和EXEC命令实现的,而Pipeline只是对这两个命令的java客户端封装而已。
在redis的客户端中执行事务分三步:首先执行MULTI命令;然后输入一批我们想要执行的其他操作(比如一批set操作),这批操作会被写到一个队列里;最后再执行EXEC命令,客户端会把这上述队列“一次性”的发生到服务端,并等待服务端返回。
再来看服务端:服务端接收到这个命令操作队列后,按照顺序一个一个执行,每个命令操作都会对应一个返回值,待队列中所有的命令都执行完成后“一次性”把这些返回值拼装成list返回给客户端。并且这个返回值list的数量和顺序与命令队列一致。整个pipeline的执行过程结束,在有redis需要命令批量执行的情况下,使用pipeline可以大大减少redis客户端与服务端交互次数,从而提升多命令的执行性能。
回到文章开头的场景,假设Map里有500个值,for循环中就是执行500次incrBy操作,对应的与服务端的交互就需要500次。如果使用pipeline,只需要交互两次即可完成,java伪代码实现如下:
public static void main(String[] args) { Redis redis = null;//省略redis实例化过程 Map<String,Integer> pagesPv = new HashMap(); pagesPv.put("page_index",4); pagesPv.put("page_product",2); pagesPv.put("page_cart",3); pagesPv.put("page_order",1); Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例 for(String key:pagesPv.keySet()){ //注意这里调用Pipeline的incrBy方法,此时命令并没有执行 pipeline.incrBy(key,pagesPv.get(key)); } //执行sync方法,此时批量向服务端提交命令,并等待返回 pipeline.sync(); }
整个过程大致分为三步:
1、通过redis.pipelined()方法,获取pipeline实例,与上述讲解中的MULTI命令对应。
2、以前调用的是redis. incrBy()方法,现在改为调用pipeline的incrBy方法,注意此时不会向服务端发起调用请求,只是把命令写入队列。
3、执行pipeline.sync()方法,与上述流程中的EXEC命令对应,此时会把第2步中的命令队列一次性的提交给服务端,并等待服务端返回。
Pipeline是非线程安全的
Pipeline能提升性能,而且使用起来也非常方便,但使用的时候一定要注意一点“Pipeline是非线程安全的”。也就是说多个线程如果公用一个Pipeline实例,会出现线程安全问题,典型的就是数据返回结果错乱。正确的用法是在每次需要用到Pipeline的地方,都新建一个实例即:
Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例
为什么在多线程下是线程不安全的,其实很好理解,看下图:
A、B两个线程共用一个pipeline实例,同时向redis服务端提交5个命令,各自都期望收到5个返回值。但真实的结果是有一个收到10个结果,有一个会失败,这其实不是我们期望的。
在使用Spring框架的java程序中,redis的客户端对象是线程安全的,可以单例注入spring容器,在需要redis客户端的地方直接使用@Resource直接获取即可。
@Resource private Redis redis; public void method(){ redis.xxx; }
但pipeline是非线程安全的,每次都必须新建实例,如果你的代码中出现了下列代码,请注意会有线程安全问题,请及时修正:
//错误的写法,pipeline被单例注入了spring容器,全局复用 @Resource private Pipeline pipeline; public void method(){ pipeline.xxx; }
Spring中Pipeline的正确用法:
@Resource private Redis redis; public void method(){ Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例 pipeline.xxx;//多个操作 pipeline. sync(); }
最后提一点,Pipeline是一次提交一个队列给服务端,这个队列如果太大会占用更多内存,以及增加网络传输时间。所以 Pipeline里一次提交的命令数也不要太多,根据实际数据量大小 一般几百条还是可以的。
好了,关于redis中的pipeline就总结到这里。
出处
http://moon-walker.iteye.com/blog/2397962
上一篇: java中的线程安全
推荐阅读