欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

redis通过pipeline提升吞吐量的方法

程序员文章站 2024-01-17 11:06:28
案例目标 简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。 案例背景 应用系统在数据推送或事件处理过程中...

案例目标

简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。

案例背景

应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元;

然而在某些服务中,数据操作对redis 是强依赖的,在最近的一次分析中发现:

一次数据推送会对 redis 产生近30次读写操作!

在数据推送业务中的性能压测中,以数据上报 -> 下发应答为一次事务;而对于这样的读写模型,redis 的操作过于频繁,很快便导致系统延时过高,吞吐量低下,无法满足目标;

优化过程 主要针对业务代码做的优化,其中redis 操作经过大量合并,最终降低到原来的1/5,而系统吞吐量也提升明显。

其中,redis pipeline(管道机制) 的应用是一个关键手段。

pipeline的解释

pipeline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。

管道技术使用广泛,例如许多pop3协议已经实现支持这个功能,大大加快了从服务器下载新邮件的过程。 redis很早就支持管道(pipeline)技术。(因此无论你运行的是什么版本,你都可以使用管道(pipelining)操作redis)

普通请求模型

redis通过pipeline提升吞吐量的方法

[图-pipeline1]

pipeline请求模型

redis通过pipeline提升吞吐量的方法

[图-pipeline2]

从两个图的对比中可看出,普通的请求模型是同步的,每次请求对应一次io操作等待;

而pipeline 化之后所有的请求合并为一次io,除了时延可以降低之外,还能大幅度提升系统吞吐量。

代码实例

说明

本地开启50个线程,每个线程完成1000个key的写入,对比pipeline开启及不开启两种场景下的性能表现。

相关常量

// 并发任务
private static final int taskcount = 50;
// pipeline大小
private static final int batchsize = 10;
// 每个任务处理命令数
private static final int cmdcount = 1000;

private static final boolean usepipeline = true;

初始化连接

jedispoolconfig poolconfig = new jedispoolconfig();
poolconfig.setmaxactive(200);
poolconfig.setmaxidle(100);
poolconfig.setmaxwait(2000);
poolconfig.settestonborrow(false);
poolconfig.settestonreturn(false);

jedispool = new jedispool(poolconfig, host, port);

并发启动任务,统计执行时间

public static void main(string[] args) throws interruptedexception {
  init();

  flushdb();

  long t1 = system.currenttimemillis();
  executorservice executor = executors.newcachedthreadpool();

  countdownlatch latch = new countdownlatch(taskcount);
  for (int i = 0; i < taskcount; i++) {
   executor.submit(new demotask(i, latch));
  }

  latch.await();
  executor.shutdownnow();

  long t2 = system.currenttimemillis();

  system.out.println("execution finish time(s):" + (t2 - t1) / 1000.0);

 }

demotask 封装了执行key写入的细节,区分不同场景

 public void run() {
   logger.info("task[{}] start.", id);
   try {
    if (usepipeline) {
     runwithpipeline();
    } else {
     runwithnonpipeline();
    }
   } finally {
    latch.countdown();
   }

   logger.info("task[{}] end.", id);
  }

不使用pipeline的场景比较简单,循环执行set操作

for (int i = 0; i < cmdcount; i++) {
    jedis jedis = get();
    try {
     jedis.set(key(i), uuid.randomuuid().tostring());
    } finally {
     if (jedis != null) {
      jedispool.returnresource(jedis);
     }
    }
    if (i % batchsize == 0) {
     logger.info("task[{}] process -- {}", id, i);
    }
   }

使用pipeline,需要处理分段,如10个作为一批命令执行

for (int i = 0; i < cmdcount;) {
    jedis jedis = get();

    try {
     pipeline pipeline = jedis.pipelined();
     int j;
     for (j = 0; j < batchsize; j++) {
      if (i + j < cmdcount) {
       pipeline.set(key(i + j), uuid.randomuuid().tostring());
      } else {
       break;
      }
     }
     pipeline.sync();
     logger.info("task[{}] pipeline -- {}", id, i + j);

     i += j;

    } finally {
     if (jedis != null) {
      jedispool.returnresource(jedis);
     }
    }

   }

运行结果

不使用pipeline,整体执行26s;而使用pipeline优化后的代码,执行时间仅需要3s!

nopipeline-stat redis通过pipeline提升吞吐量的方法

[图-nopipeline]

pipeline-stat

redis通过pipeline提升吞吐量的方法

[图-pipeline]

注意事项

pipeline机制可以优化吞吐量,但无法提供原子性/事务保障,而这个可以通过redis-multi等命令实现。

参考这里

部分读写操作存在相关依赖,无法使用pipeline实现,可利用script机制,但需要在可维护性方面做好取舍。

扩展阅读

官方文档-redis-pipelining

官方文档-redis-transaction

以上这篇redis通过pipeline提升吞吐量的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。