memcached java下性能测试报告、分析与问题讨论
我的项目原来使用静态HashMap来实现Key->Object的缓存,并且实现脏数据刷新.由于项目要改成集群部署.在单个jvm里运行的静态Hash结构已经无法处理脏数据问题.所以准备使用memcached做分布式缓存来解决.
从网上搜索到的资料来看 memcached能够接受较大量的请求.但其javaclient 由于大量使用同步语句、hashmap,读取流没有使用bufferedStream。造成性能并不好
为此在我的项目里我参考memcached的协议,自己实现一个客户端功能。经过测试后发现一些问题。
测试环境1:
windows xp home
迅驰T2250
1G内存
jdk1.5
server: memcached-1.2.1 nt版本
client: 自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器
测试方法
填充测试时填充10w个pojo。创建100个任务,每任务负责向memcached存储1000个pojo。
读取测试时读取10000个pojo 创建100个任务,每任务读取100个pojo。
平均值均按照线程内取平均值后,各线程再取平均值。
在进行上述测试的时候 cpu占用均在90%-100%,根据上述测试结果,开2个线程时效率最高。
thread count | write avg/per obj | write totall | read avg/per obj | read total |
10 | 2.404991 ms | 29s | 1.709544 ms | 2s |
5 | 0.704780 ms | 18s | 1.333013 ms | 2s |
2 | 0.262194 ms | 15s | 0.414683 ms | 2s |
测试环境2:
AIX5.2
IBM P650 Power4 1.5G(64bit) *4
内存8G
jdk1.4
memcached-1.2.1
libevnet1.1b
client自己编写
用于传输的pojo 40个属性,序列化后为750个byte
测试client与server同台机器
相同的测试方法。测试结果大跌眼睛 10线程时 读写速度为200ms/per object 比我的笔记本慢100倍。就没继续测试
测试环境3:
windows2000 server
xeon 1.5*4
内存8G
jdk1.5
测试时发现cpu占用不高于20%
thread count | write avg/per obj | write total | read avg/per obj | read total |
20 | 10.266615ns | 71s | 23.21283ns | 15s |
10 | 4.341574ns | 41s | 13.30084ns | 16s |
5 | 1.298717ns | 25s | 9.33258ns | 18s |
2 | 1.298717ns | 21s | 4.02503ns | 23s |
初步测试到这里 发现的问题
1.是暂时没有达到网上宣传的1.5w个对象/s (目前测试cpu为瓶颈)
2.是aix下效率低的可怕,对aix不太熟悉。应该有些设置的参数,这点还要向大家请教。
3.超过2个线程效率就开始低。没有发挥多线程的优势,不知道大家在使用过程中有没有发现这一点,还是说我写的MemBufferedDriver有问题。
我的期望是读写速度均能稳定在0.1毫秒左右,至少 1w obj/s 这样才有可能不影响到现有系统的效率
另外大家在java下使用memcached的时候效率怎么样都是怎么用的呢
下面贴下我的client的实现
memcached的协议可以参考memcache包的doc/protocol.txt
主要是socket初次打开后不关闭,直接存放到ThreadLocal中,与memcached保持一个长练接。每次使用的时候判断连接可用还是需要重新连接。
其他存储和读取实现memcached协议。使用BufferedStream。
序列化采用实现Serializable,直接使用ObjectStream来实现。(由于都是pojo简单数据对象,尝试过实现Externalizable接口自己实现序列化和使用750个byte的String 来序列化,发现性能相差不多故放弃)
MemBufferedDriver 为client实现
- public class MemBufferedDriver {
- /**
- * 存放 连到cacheserver的socket连接
- */
- private final static ThreadLocal sockPool = new ThreadLocal();
- private static String serverAddress = "localhost:11211";
- public static final byte[] BYTE_GET = new byte[]{103, 101, 116, 32};
- public static final byte[] BYTE_SET = new byte[]{115, 101, 116, 32};
- public static final byte[] BYTE_DELETE = new byte[]{100, 101, 108, 101, 116, 101, 32};
- public static final byte[] BYTE_CRLF = new byte[]{13, 10};
- public static final byte[] BYTE_SPACE = new byte[]{32};
- public static final String SERVER_STATUS_DELETED = "DELETED";
- public static final String SERVER_STATUS_NOT_FOUND = "NOT_FOUND";
- public static final String SERVER_STATUS_STORED = "STORED";
- public static final String SERVER_STATUS_ERROR = "ERROR";
- public static final String SERVER_STATUS_END = "END";
- public static final String SERVER_STATUS_VALUE = "VALUE";
- public static final String ENCODING_TYPE = "UTF-8";
- public static Socket getSocket() throws UnknownHostException, IOException {
- Socket s = (Socket) MemBufferedDriver.sockPool.get();
- if (s == null || s.isClosed()) {
- s = MemBufferedDriver.reconnect();
- MemBufferedDriver.sockPool.set(s);
- }
- return s;
- }
- private static Socket reconnect() throws UnknownHostException, IOException {
- String[] ip = MemBufferedDriver.serverAddress.split(":");
- return new Socket(ip[0], Integer.parseInt(ip[1]));
- }
- public Map getMulti(String[] keys) {
- Map map = new HashMap();
- if (keys == null || keys.length <= 0) return map;
- for (int i = 0; i < keys.length; i++) {
- Object o = get(keys[i]);
- if (o != null) map.put(keys[i], o);
- }
- return map;
- }
- public Object[] getMultiArray(String[] keys) {
- if (keys == null || keys.length <= 0) return null;
- Object[] o = new Object[keys.length];
- for (int i = 0; i < keys.length; i++)
- o[i] = get(keys[i]);
- return o;
- }
- public boolean set(String key, Object obj) {
- try {
- if (obj == null || key == null || "".equals(key)) throw new Exception("对象和key 不能为空");
- Socket s = MemBufferedDriver.getSocket();
- BufferedInputStream in = new BufferedInputStream(s.getInputStream());
- BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream());
- key = encodeKey(key);
- int flag = 0;
- //序列化对象
- byte[] bs = object2Byte(obj);
- out.write(MemBufferedDriver.BYTE_SET); //write cmd
- out.write(key.getBytes()); //write key
- out.write(MemBufferedDriver.BYTE_SPACE);
- out.write(String.valueOf(flag).getBytes()); //write flag
- out.write(MemBufferedDriver.BYTE_SPACE);
- out.write("0".getBytes()); //write expire date
- out.write(MemBufferedDriver.BYTE_SPACE);
- out.write(String.valueOf(bs.length).getBytes()); //object length
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.write(bs);
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.flush();
- String ret = readLine(in);
- return MemBufferedDriver.SERVER_STATUS_STORED.equals(ret);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- return false;
- }
- }
- public Object get(String key) {
- try {
- Socket s = MemBufferedDriver.getSocket();
- InputStream in = s.getInputStream();
- OutputStream out = s.getOutputStream();
- key = encodeKey(key);
- out.write(MemBufferedDriver.BYTE_GET);
- out.write(key.getBytes());
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.flush();
- return getObjectFromStream(in, out);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- return null;
- }
- }
- public boolean delete(String key) {
- try {
- Socket s = MemBufferedDriver.getSocket();
- InputStream in = s.getInputStream();
- OutputStream out = s.getOutputStream();
- key = encodeKey(key);
- out.write(MemBufferedDriver.BYTE_DELETE);
- out.write(key.getBytes());
- out.write(MemBufferedDriver.BYTE_CRLF);
- out.flush();
- String ret = readLine(in);
- return MemBufferedDriver.SERVER_STATUS_DELETED.equals(ret) || MemBufferedDriver.SERVER_STATUS_NOT_FOUND.equals(ret);
- } catch (Exception e) {
- return false;
- }
- }
- private Object getObjectFromStream(InputStream in, OutputStream out) throws IOException, ClassNotFoundException {
- String cmd = readLine(in);
- if (cmd.startsWith(MemBufferedDriver.SERVER_STATUS_VALUE)) {
- //return object
- String[] part = cmd.split(" ");
- String para = part[2];
- int length = Integer.parseInt(part[3]);
- byte[] bs = new byte[length];
- int count = 0;
- while (count < bs.length) count += in.read(bs, count, (bs.length - count));
- if (count != bs.length)
- throw new IOException("读取数据长度错误");
- readLine(in);
- String endstr = readLine(in);
- if (MemBufferedDriver.SERVER_STATUS_END.equals(endstr))
- return this.byte2Object(bs);
- else
- System.out.println("结束标记错误");
- }
- return null;
- }
- private String encodeKey(String key) throws UnsupportedEncodingException {
- return URLEncoder.encode(key, MemBufferedDriver.ENCODING_TYPE);
- }
- private String readLine(InputStream in) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- boolean eol = false;
- byte[] b = new byte[1];
- while (in.read(b, 0, 1) != -1) {
- if (b[0] == 13) eol = true;
- else if (eol && b[0] == 10) break;
- else
- eol = false;
- bos.write(b, 0, 1);
- }
- if (bos.size() == 0) return null;
- return bos.toString().trim();
- }
- private byte[] object2Byte(Object o) throws IOException {
- ByteArrayOutputStream b = new ByteArrayOutputStream();
- new ObjectOutputStream(b).writeObject(o);
- return b.toByteArray();
- }
- private Object byte2Object(byte[] b) throws IOException, ClassNotFoundException {
- return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();
- }
- public static void main(String[] args) throws Exception {
- MemBufferedDriver m = new MemBufferedDriver();
- System.out.println(m.set("a", "DsSD"));
- System.out.println(m.get("a"));
- }
- public static void setServerAddress(String serverAddress) {
- MemBufferedDriver.serverAddress = serverAddress;
- }
- }
java 代码
写入测试类
- public class Fill2Server extends Thread {
- public static int THREAD_COUNT = 2;
- public static Queue queue = new Queue();
- MemBufferedDriver md = new MemBufferedDriver();
- public static void main(String[] args) throws Exception {
- int size ;
- if (args.length == 3 && args[0] != null && args[1] != null) {
- MemDriver.setServerAddress(args[0]);
- size = Integer.parseInt(args[1]);
- THREAD_COUNT = Integer.parseInt(args[2]);
- new Fill2Server().doFill(size);
- } else
- System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2填充数量,不能小于10000,参数3为使用的线程数");
- }
- private void doFill(int size) throws InterruptedException {
- int taskCount = size / 1000; //每个线程负责填充1000的对象
- for (int i = 0; i < taskCount; i++) {
- Task t = new Task();
- t.setTaskId(String.valueOf(i));
- queue.add(t);
- }
- long time = System.currentTimeMillis();
- Thread tr[] = new Thread[THREAD_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- FillThread ft = new FillThread();
- (tr[i] = new Thread(ft)).start();
- }
- //监控填充完成
- while (true) {
- boolean flag = true;
- for (int i = 0; i < THREAD_COUNT; i++)
- flag &= tr[i].isAlive();
- if (!flag) break;
- Thread.sleep(1000);
- }
- time = System.currentTimeMillis() - time;
- System.out.println("任务完成,共用" + (time / 1000) + "s");
- }
- class FillThread implements Runnable {
- public void run() {
- Task task;
- while (true) {
- task = (Task) queue.get();
- if (task == null) break;
- long time = System.nanoTime();
- for (int i = 0; i < 1000; i++) {
- TestBO b = new TestBO();
- md.set(task.getTaskId() + i, b);
- }
- time = System.nanoTime() - time;
- System.out.println(Thread.currentThread().getName() + " avg " + (time / 1000) + " ns ");
- }
- }
- }
- }
读取的测试方法
- public class GetFromServer extends Thread {
- public static int THREAD_COUNT = 2;
- public static Queue queue = new Queue();
- MemDriver md = new MemDriver();
- public static void main(String[] args) throws Exception {
- int size;
- if (args.length == 3 && args[0] != null && args[1] != null) {
- MemDriver.setServerAddress(args[0]);
- size = Integer.parseInt(args[1]);
- THREAD_COUNT = Integer.parseInt(args[2]);
- new GetFromServer().doFill(size);
- } else
- System.out.println("参数1 连接服务器地址 ipaddress:port ,参数2读取数量不能小于1000,参数3为使用的线程数");
- }
- private void doFill(int size) throws InterruptedException {
- int taskCount = size / 100; //每个线程负责填充1000的对象
- for (int i = 0; i < taskCount; i++) {
- Task t = new Task();
- t.setTaskId(String.valueOf(i));
- GetFromServer.queue.add(t);
- }
- long time = System.currentTimeMillis();
- Thread tr[] = new Thread[GetFromServer.THREAD_COUNT];
- for (int i = 0; i < GetFromServer.THREAD_COUNT; i++) {
- GetFromServer.FillThread ft = new GetFromServer.FillThread();
- (tr[i] = new Thread(ft)).start();
- }
- //监控填充完成
- while (true) {
- boolean flag = true;
- for (int i = 0; i < GetFromServer.THREAD_COUNT; i++)
- flag &= tr[i].isAlive();
- if (!flag) break;
- Thread.sleep(1000);
- }
- time = System.currentTimeMillis() - time;
- System.out.println("任务完成,共用" + (time / 1000) + "s");
- }
- class FillThread implements