欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

memcached java下性能测试报告、分析与问题讨论

程序员文章站 2024-01-10 14:16:13
...

我的项目原来使用静态HashMap来实现Key->Object的缓存,并且实现脏数据刷新.由于项目要改成集群部署.在单个jvm里运行的静态Hash结构已经无法处理脏数据问题.所以准备使用memcached做分布式缓存来解决.

从网上搜索到的资料来看 memcached能够接受较大量的请求.但其javaclient 由于大量使用同步语句、hashmap,读取流没有使用bufferedStream。造成性能并不好
为此在我的项目里我参考memcached的协议,自己实现一个客户端功能。经过测试后发现一些问题。

测试环境1:
windows xp home
迅驰T2250
1G内存
jdk1.5
server: memcached-1.2.1 nt版本
client: 自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器

测试方法
填充测试时填充10w个pojo。创建100个任务,每任务负责向memcached存储1000个pojo。
读取测试时读取10000个pojo 创建100个任务,每任务读取100个pojo。
平均值均按照线程内取平均值后,各线程再取平均值。
在进行上述测试的时候 cpu占用均在90%-100%,根据上述测试结果,开2个线程时效率最高。

thread count write avg/per obj write totall read avg/per obj read total
10 2.404991 ms 29s 1.709544 ms 2s
5 0.704780 ms 18s 1.333013 ms 2s
2 0.262194 ms 15s 0.414683 ms 2s






测试环境2:
AIX5.2
IBM P650  Power4 1.5G(64bit) *4
内存8G
jdk1.4
memcached-1.2.1
libevnet1.1b
client自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器

相同的测试方法。测试结果大跌眼睛  10线程时 读写速度为200ms/per object 比我的笔记本慢100倍。就没继续测试

测试环境3:
windows2000 server
xeon 1.5*4
内存8G
jdk1.5
测试时发现cpu占用不高于20%

thread count write avg/per obj write total read avg/per obj read total
20 10.266615ns 71s 23.21283ns 15s
10 4.341574ns 41s 13.30084ns 16s
5 1.298717ns 25s 9.33258ns 18s
2 1.298717ns 21s 4.02503ns 23s


 




初步测试到这里 发现的问题
1.是暂时没有达到网上宣传的1.5w个对象/s (目前测试cpu为瓶颈)
2.是aix下效率低的可怕,对aix不太熟悉。应该有些设置的参数,这点还要向大家请教。
3.超过2个线程效率就开始低。没有发挥多线程的优势,不知道大家在使用过程中有没有发现这一点,还是说我写的MemBufferedDriver有问题。

我的期望是读写速度均能稳定在0.1毫秒左右,至少 1w obj/s 这样才有可能不影响到现有系统的效率
另外大家在java下使用memcached的时候效率怎么样都是怎么用的呢

下面贴下我的client的实现
memcached的协议可以参考memcache包的doc/protocol.txt

主要是socket初次打开后不关闭,直接存放到ThreadLocal中,与memcached保持一个长练接。每次使用的时候判断连接可用还是需要重新连接。

其他存储和读取实现memcached协议。使用BufferedStream。
序列化采用实现Serializable,直接使用ObjectStream来实现。(由于都是pojo简单数据对象,尝试过实现Externalizable接口自己实现序列化和使用750个byte的String 来序列化,发现性能相差不多故放弃)

java 代码
MemBufferedDriver 为client实现
  1. public class MemBufferedDriver {   
  2.     /**  
  3.      * 存放 连到cacheserver的socket连接  
  4.      */  
  5.     private final static ThreadLocal sockPool = new ThreadLocal();   
  6.     private static String serverAddress = "localhost:11211";   
  7.   
  8.     public static final byte[] BYTE_GET = new byte[]{10310111632};   
  9.     public static final byte[] BYTE_SET = new byte[]{11510111632};   
  10.     public static final byte[] BYTE_DELETE = new byte[]{10010110810111610132};   
  11.     public static final byte[] BYTE_CRLF = new byte[]{1310};   
  12.     public static final byte[] BYTE_SPACE = new byte[]{32};   
  13.   
  14.     public static final String SERVER_STATUS_DELETED = "DELETED";   
  15.     public static final String SERVER_STATUS_NOT_FOUND = "NOT_FOUND";   
  16.     public static final String SERVER_STATUS_STORED = "STORED";   
  17.     public static final String SERVER_STATUS_ERROR = "ERROR";   
  18.     public static final String SERVER_STATUS_END = "END";   
  19.     public static final String SERVER_STATUS_VALUE = "VALUE";   
  20.   
  21.     public static final String ENCODING_TYPE = "UTF-8";   
  22.   
  23.   
  24.     public static Socket getSocket() throws UnknownHostException, IOException {   
  25.         Socket s = (Socket) MemBufferedDriver.sockPool.get();   
  26.         if (s == null || s.isClosed()) {   
  27.             s = MemBufferedDriver.reconnect();   
  28.             MemBufferedDriver.sockPool.set(s);   
  29.         }   
  30.         return s;   
  31.     }   
  32.   
  33.     private static Socket reconnect() throws UnknownHostException, IOException {   
  34.         String[] ip = MemBufferedDriver.serverAddress.split(":");   
  35.         return new Socket(ip[0], Integer.parseInt(ip[1]));   
  36.   
  37.     }   
  38.   
  39.   
  40.     public Map getMulti(String[] keys) {   
  41.         Map map = new HashMap();   
  42.         if (keys == null || keys.length <= 0return map;   
  43.   
  44.         for (int i = 0; i < keys.length; i++) {   
  45.             Object o = get(keys[i]);   
  46.             if (o != null) map.put(keys[i], o);   
  47.   
  48.         }   
  49.         return map;   
  50.     }   
  51.   
  52.     public Object[] getMultiArray(String[] keys) {   
  53.         if (keys == null || keys.length <= 0return null;   
  54.   
  55.         Object[] o = new Object[keys.length];   
  56.         for (int i = 0; i < keys.length; i++)   
  57.             o[i] = get(keys[i]);   
  58.   
  59.         return o;   
  60.     }   
  61.   
  62.     public boolean set(String key, Object obj) {   
  63.         try {   
  64.             if (obj == null || key == null || "".equals(key)) throw new Exception("对象和key 不能为空");   
  65.             Socket s = MemBufferedDriver.getSocket();   
  66.             BufferedInputStream in = new BufferedInputStream(s.getInputStream());   
  67.             BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream());   
  68.   
  69.             key = encodeKey(key);   
  70.             int flag = 0;   
  71.   
  72.             //序列化对象   
  73.             byte[] bs = object2Byte(obj);   
  74.   
  75.             out.write(MemBufferedDriver.BYTE_SET);           //write cmd   
  76.             out.write(key.getBytes());     //write key   
  77.             out.write(MemBufferedDriver.BYTE_SPACE);   
  78.             out.write(String.valueOf(flag).getBytes());     //write flag   
  79.             out.write(MemBufferedDriver.BYTE_SPACE);   
  80.             out.write("0".getBytes());     //write expire date   
  81.             out.write(MemBufferedDriver.BYTE_SPACE);   
  82.             out.write(String.valueOf(bs.length).getBytes());     //object length   
  83.             out.write(MemBufferedDriver.BYTE_CRLF);   
  84.   
  85.             out.write(bs);   
  86.             out.write(MemBufferedDriver.BYTE_CRLF);   
  87.             out.flush();   
  88.   
  89.             String ret = readLine(in);   
  90.             return MemBufferedDriver.SERVER_STATUS_STORED.equals(ret);   
  91.         } catch (Exception e) {   
  92.             System.out.println(e.getMessage());   
  93.             return false;   
  94.         }   
  95.     }   
  96.   
  97.     public Object get(String key) {   
  98.         try {   
  99.             Socket s = MemBufferedDriver.getSocket();   
  100.             InputStream in = s.getInputStream();   
  101.             OutputStream out = s.getOutputStream();   
  102.             key = encodeKey(key);   
  103.             out.write(MemBufferedDriver.BYTE_GET);   
  104.             out.write(key.getBytes());   
  105.             out.write(MemBufferedDriver.BYTE_CRLF);   
  106.             out.flush();   
  107.   
  108.             return getObjectFromStream(in, out);   
  109.         } catch (Exception e) {   
  110.             System.out.println(e.getMessage());   
  111.             return null;   
  112.         }   
  113.     }   
  114.   
  115.     public boolean delete(String key) {   
  116.         try {   
  117.             Socket s = MemBufferedDriver.getSocket();   
  118.             InputStream in = s.getInputStream();   
  119.             OutputStream out = s.getOutputStream();   
  120.             key = encodeKey(key);   
  121.             out.write(MemBufferedDriver.BYTE_DELETE);   
  122.             out.write(key.getBytes());   
  123.             out.write(MemBufferedDriver.BYTE_CRLF);   
  124.             out.flush();   
  125.   
  126.             String ret = readLine(in);   
  127.             return MemBufferedDriver.SERVER_STATUS_DELETED.equals(ret) || MemBufferedDriver.SERVER_STATUS_NOT_FOUND.equals(ret);   
  128.         } catch (Exception e) {   
  129.             return false;   
  130.         }   
  131.     }   
  132.   
  133.     private Object getObjectFromStream(InputStream in, OutputStream out) throws IOException, ClassNotFoundException {   
  134.         String cmd = readLine(in);   
  135.         if (cmd.startsWith(MemBufferedDriver.SERVER_STATUS_VALUE)) {   
  136.             //return object   
  137.             String[] part = cmd.split(" ");   
  138.             String para = part[2];   
  139.             int length = Integer.parseInt(part[3]);   
  140.   
  141.             byte[] bs = new byte[length];   
  142.   
  143.             int count = 0;   
  144.             while (count < bs.length) count += in.read(bs, count, (bs.length - count));   
  145.             if (count != bs.length)   
  146.                 throw new IOException("读取数据长度错误");   
  147.             readLine(in);   
  148.             String endstr = readLine(in);   
  149.             if (MemBufferedDriver.SERVER_STATUS_END.equals(endstr))   
  150.                 return this.byte2Object(bs);   
  151.             else  
  152.                 System.out.println("结束标记错误");   
  153.   
  154.         }   
  155.         return null;   
  156.     }   
  157.   
  158.     private String encodeKey(String key) throws UnsupportedEncodingException {   
  159.         return URLEncoder.encode(key, MemBufferedDriver.ENCODING_TYPE);   
  160.     }   
  161.   
  162.   
  163.     private String readLine(InputStream in) throws IOException {   
  164.         ByteArrayOutputStream bos = new ByteArrayOutputStream();   
  165.         boolean eol = false;   
  166.         byte[] b = new byte[1];   
  167.         while (in.read(b, 01) != -1) {   
  168.             if (b[0] == 13) eol = true;   
  169.             else if (eol && b[0] == 10break;   
  170.             else  
  171.                 eol = false;   
  172.   
  173.             bos.write(b, 01);   
  174.         }   
  175.   
  176.   
  177.         if (bos.size() == 0return null;   
  178.         return bos.toString().trim();   
  179.     }   
  180.   
  181.   
  182.     private byte[] object2Byte(Object o) throws IOException {   
  183.         ByteArrayOutputStream b = new ByteArrayOutputStream();   
  184.         new ObjectOutputStream(b).writeObject(o);   
  185.         return b.toByteArray();   
  186.     }   
  187.   
  188.     private Object byte2Object(byte[] b) throws IOException, ClassNotFoundException {   
  189.         return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();   
  190.     }   
  191.   
  192.   
  193.     public static void main(String[] args) throws Exception {   
  194.         MemBufferedDriver m = new MemBufferedDriver();   
  195.         System.out.println(m.set("a""DsSD"));   
  196.         System.out.println(m.get("a"));   
  197.     }   
  198.   
  199.     public static void setServerAddress(String serverAddress) {   
  200.         MemBufferedDriver.serverAddress = serverAddress;   
  201.     }   
  202. }  

 
java 代码
写入测试类

  1. public class Fill2Server extends Thread {   
  2.     public static int THREAD_COUNT = 2;   
  3.     public static Queue queue = new Queue();   
  4.     MemBufferedDriver md = new MemBufferedDriver();   
  5.   
  6.     public static void main(String[] args) throws Exception {   
  7.   
  8.         int size ;   
  9.         if (args.length == 3 && args[0] != null && args[1] != null) {   
  10.             MemDriver.setServerAddress(args[0]);   
  11.             size = Integer.parseInt(args[1]);   
  12.             THREAD_COUNT = Integer.parseInt(args[2]);   
  13.             new Fill2Server().doFill(size);   
  14.         } else  
  15.             System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2填充数量,不能小于10000,参数3为使用的线程数");   
  16.   
  17.     }   
  18.   
  19.     private void doFill(int size) throws InterruptedException {   
  20.         int taskCount = size / 1000;   //每个线程负责填充1000的对象   
  21.         for (int i = 0; i < taskCount; i++) {   
  22.             Task t = new Task();   
  23.             t.setTaskId(String.valueOf(i));   
  24.             queue.add(t);   
  25.         }   
  26.   
  27.         long time = System.currentTimeMillis();   
  28.         Thread tr[] = new Thread[THREAD_COUNT];   
  29.         for (int i = 0; i < THREAD_COUNT; i++) {   
  30.             FillThread ft = new FillThread();   
  31.             (tr[i] = new Thread(ft)).start();   
  32.         }   
  33.   
  34.         //监控填充完成   
  35.         while (true) {   
  36.             boolean flag = true;   
  37.             for (int i = 0; i < THREAD_COUNT; i++)   
  38.                 flag &= tr[i].isAlive();   
  39.   
  40.             if (!flag) break;   
  41.   
  42.             Thread.sleep(1000);   
  43.         }   
  44.   
  45.         time = System.currentTimeMillis() - time;   
  46.         System.out.println("任务完成,共用" + (time / 1000) + "s");   
  47.     }   
  48.   
  49.     class FillThread implements Runnable {   
  50.         public void run() {   
  51.             Task task;   
  52.             while (true) {   
  53.                 task = (Task) queue.get();   
  54.                 if (task == nullbreak;   
  55.                 long time = System.nanoTime();   
  56.                 for (int i = 0; i < 1000; i++) {   
  57.                     TestBO b = new TestBO();   
  58.                     md.set(task.getTaskId() + i, b);   
  59.                 }   
  60.                 time = System.nanoTime() - time;   
  61.                 System.out.println(Thread.currentThread().getName() + " avg " + (time / 1000) + " ns ");   
  62.             }   
  63.         }   
  64.     }   
  65. }  

 

java 代码
读取的测试方法
  1. public class GetFromServer extends Thread {   
  2.     public static int THREAD_COUNT = 2;   
  3.     public static Queue queue = new Queue();   
  4.     MemDriver md = new MemDriver();   
  5.   
  6.     public static void main(String[] args) throws Exception {   
  7.   
  8.         int size;   
  9.         if (args.length == 3 && args[0] != null && args[1] != null) {   
  10.             MemDriver.setServerAddress(args[0]);   
  11.             size = Integer.parseInt(args[1]);   
  12.             THREAD_COUNT = Integer.parseInt(args[2]);   
  13.             new GetFromServer().doFill(size);   
  14.         } else  
  15.             System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2读取数量不能小于1000,参数3为使用的线程数");   
  16.   
  17.     }   
  18.   
  19.     private void doFill(int size) throws InterruptedException {   
  20.         int taskCount = size / 100;   //每个线程负责填充1000的对象   
  21.         for (int i = 0; i < taskCount; i++) {   
  22.             Task t = new Task();   
  23.             t.setTaskId(String.valueOf(i));   
  24.             GetFromServer.queue.add(t);   
  25.         }   
  26.   
  27.         long time = System.currentTimeMillis();   
  28.         Thread tr[] = new Thread[GetFromServer.THREAD_COUNT];   
  29.         for (int i = 0; i < GetFromServer.THREAD_COUNT; i++) {   
  30.             GetFromServer.FillThread ft = new GetFromServer.FillThread();   
  31.             (tr[i] = new Thread(ft)).start();   
  32.         }   
  33.   
  34.         //监控填充完成   
  35.         while (true) {   
  36.             boolean flag = true;   
  37.             for (int i = 0; i < GetFromServer.THREAD_COUNT; i++)   
  38.                 flag &= tr[i].isAlive();   
  39.   
  40.             if (!flag) break;   
  41.   
  42.             Thread.sleep(1000);   
  43.         }   
  44.   
  45.   
  46.         time = System.currentTimeMillis() - time;   
  47.         System.out.println("任务完成,共用" + (time / 1000) + "s");   
  48.     }   
  49.   
  50.   
  51.     class FillThread implements