Java利用Redis实现消息队列的示例代码
程序员文章站
2022-06-29 20:09:58
本文介绍了java利用redis实现消息队列的示例代码,分享给大家,具体如下:
应用场景
为什么要用redis?
二进制存储、java序列化传输、io...
本文介绍了java利用redis实现消息队列的示例代码,分享给大家,具体如下:
应用场景
为什么要用redis?
二进制存储、java序列化传输、io连接数高、连接频繁
一、序列化
这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了bytearrayoutputstream和bytearrayinputstream; 注意:每个需要序列化的对象都要实现serializable接口;
其代码如下:
package utils; import java.io.*; /** * created by kinglf on 2016/10/17. */ public class objectutil { /** * 对象转byte[] * @param obj * @return * @throws ioexception */ public static byte[] object2bytes(object obj) throws ioexception{ bytearrayoutputstream bo=new bytearrayoutputstream(); objectoutputstream oo=new objectoutputstream(bo); oo.writeobject(obj); byte[] bytes=bo.tobytearray(); bo.close(); oo.close(); return bytes; } /** * byte[]转对象 * @param bytes * @return * @throws exception */ public static object bytes2object(byte[] bytes) throws exception{ bytearrayinputstream in=new bytearrayinputstream(bytes); objectinputstream sin=new objectinputstream(in); return sin.readobject(); } }
二、消息类(实现serializable接口)
package model; import java.io.serializable; /** * created by kinglf on 2016/10/17. */ public class message implements serializable { private static final long serialversionuid = -389326121047047723l; private int id; private string content; public message(int id, string content) { this.id = id; this.content = content; } public int getid() { return id; } public void setid(int id) { this.id = id; } public string getcontent() { return content; } public void setcontent(string content) { this.content = content; } }
三、redis的操作
利用redis做队列,我们采用的是redis中list的push和pop操作;
结合队列的特点:
只允许在一端插入新元素只能在队列的尾部fifo:先进先出原则
redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而redis中list药push或 pop的对象仅需要转换成byte[]即可
java采用jedis进行redis的存储和redis的连接池设置
上代码:
package utils; import redis.clients.jedis.jedis; import redis.clients.jedis.jedispool; import redis.clients.jedis.jedispoolconfig; import java.util.list; import java.util.map; import java.util.set; /** * created by kinglf on 2016/10/17. */ public class jedisutil { private static string jedis_ip; private static int jedis_port; private static string jedis_password; private static jedispool jedispool; static { //configuration自行写的配置文件解析类,继承自properties configuration conf=configuration.getinstance(); jedis_ip=conf.getstring("jedis.ip","127.0.0.1"); jedis_port=conf.getint("jedis.port",6379); jedis_password=conf.getstring("jedis.password",null); jedispoolconfig config=new jedispoolconfig(); config.setmaxactive(5000); config.setmaxidle(256); config.setmaxwait(5000l); config.settestonborrow(true); config.settestonreturn(true); config.settestwhileidle(true); config.setminevictableidletimemillis(60000l); config.settimebetweenevictionrunsmillis(3000l); config.setnumtestsperevictionrun(-1); jedispool=new jedispool(config,jedis_ip,jedis_port,60000); } /** * 获取数据 * @param key * @return */ public static string get(string key){ string value=null; jedis jedis=null; try{ jedis=jedispool.getresource(); value=jedis.get(key); }catch (exception e){ jedispool.returnbrokenresource(jedis); e.printstacktrace(); }finally { close(jedis); } return value; } private static void close(jedis jedis) { try{ jedispool.returnresource(jedis); }catch (exception e){ if(jedis.isconnected()){ jedis.quit(); jedis.disconnect(); } } } public static byte[] get(byte[] key){ byte[] value = null; jedis jedis = null; try { jedis = jedispool.getresource(); value = jedis.get(key); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return value; } public static void set(byte[] key, byte[] value) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.set(key, value); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } public static void set(byte[] key, byte[] value, int time) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.set(key, value); jedis.expire(key, time); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } public static void hset(byte[] key, byte[] field, byte[] value) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.hset(key, field, value); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } public static void hset(string key, string field, string value) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.hset(key, field, value); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } /** * 获取数据 * * @param key * @return */ public static string hget(string key, string field) { string value = null; jedis jedis = null; try { jedis = jedispool.getresource(); value = jedis.hget(key, field); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return value; } /** * 获取数据 * * @param key * @return */ public static byte[] hget(byte[] key, byte[] field) { byte[] value = null; jedis jedis = null; try { jedis = jedispool.getresource(); value = jedis.hget(key, field); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return value; } public static void hdel(byte[] key, byte[] field) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.hdel(key, field); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } /** * 存储redis队列 顺序存储 * @param key reids键名 * @param value 键值 */ public static void lpush(byte[] key, byte[] value) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.lpush(key, value); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } /** * 存储redis队列 反向存储 * @param key reids键名 * @param value 键值 */ public static void rpush(byte[] key, byte[] value) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.rpush(key, value); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } /** * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端 * @param key reids键名 * @param destination 键值 */ public static void rpoplpush(byte[] key, byte[] destination) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.rpoplpush(key, destination); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } /** * 获取队列数据 * @param key 键名 * @return */ public static list lpoplist(byte[] key) { list list = null; jedis jedis = null; try { jedis = jedispool.getresource(); list = jedis.lrange(key, 0, -1); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return list; } /** * 获取队列数据 * @param key 键名 * @return */ public static byte[] rpop(byte[] key) { byte[] bytes = null; jedis jedis = null; try { jedis = jedispool.getresource(); bytes = jedis.rpop(key); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return bytes; } public static void hmset(object key, map hash) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.hmset(key.tostring(), hash); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } public static void hmset(object key, map hash, int time) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.hmset(key.tostring(), hash); jedis.expire(key.tostring(), time); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } public static list hmget(object key, string... fields) { list result = null; jedis jedis = null; try { jedis = jedispool.getresource(); result = jedis.hmget(key.tostring(), fields); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return result; } public static set hkeys(string key) { set result = null; jedis jedis = null; try { jedis = jedispool.getresource(); result = jedis.hkeys(key); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return result; } public static list lrange(byte[] key, int from, int to) { list result = null; jedis jedis = null; try { jedis = jedispool.getresource(); result = jedis.lrange(key, from, to); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return result; } public static map hgetall(byte[] key) { map result = null; jedis jedis = null; try { jedis = jedispool.getresource(); result = jedis.hgetall(key); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return result; } public static void del(byte[] key) { jedis jedis = null; try { jedis = jedispool.getresource(); jedis.del(key); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } } public static long llen(byte[] key) { long len = 0; jedis jedis = null; try { jedis = jedispool.getresource(); jedis.llen(key); } catch (exception e) { //释放redis对象 jedispool.returnbrokenresource(jedis); e.printstacktrace(); } finally { //返还到连接池 close(jedis); } return len; } }
四、configuration主要用于读取redis的配置信息
package utils; import java.io.ioexception; import java.io.inputstream; import java.util.properties; /** * created by kinglf on 2016/10/17. */ public class configuration extends properties { private static final long serialversionuid = -2296275030489943706l; private static configuration instance = null; public static synchronized configuration getinstance() { if (instance == null) { instance = new configuration(); } return instance; } public string getproperty(string key, string defaultvalue) { string val = getproperty(key); return (val == null || val.isempty()) ? defaultvalue : val; } public string getstring(string name, string defaultvalue) { return this.getproperty(name, defaultvalue); } public int getint(string name, int defaultvalue) { string val = this.getproperty(name); return (val == null || val.isempty()) ? defaultvalue : integer.parseint(val); } public long getlong(string name, long defaultvalue) { string val = this.getproperty(name); return (val == null || val.isempty()) ? defaultvalue : integer.parseint(val); } public float getfloat(string name, float defaultvalue) { string val = this.getproperty(name); return (val == null || val.isempty()) ? defaultvalue : float.parsefloat(val); } public double getdouble(string name, double defaultvalue) { string val = this.getproperty(name); return (val == null || val.isempty()) ? defaultvalue : double.parsedouble(val); } public byte getbyte(string name, byte defaultvalue) { string val = this.getproperty(name); return (val == null || val.isempty()) ? defaultvalue : byte.parsebyte(val); } public configuration() { inputstream in = classloader.getsystemclassloader().getresourceasstream("config.xml"); try { this.loadfromxml(in); in.close(); } catch (ioexception ioe) { } } }
五、测试
import model.message; import utils.jedisutil; import utils.objectutil; import redis.clients.jedis.jedis; import java.io.ioexception; /** * created by kinglf on 2016/10/17. */ public class testredisqueue { public static byte[] rediskey = "key".getbytes(); static { try { init(); } catch (ioexception e) { e.printstacktrace(); } } private static void init() throws ioexception { for (int i = 0; i < 1000000; i++) { message message = new message(i, "这是第" + i + "个内容"); jedisutil.lpush(rediskey, objectutil.object2bytes(message)); } } public static void main(string[] args) { try { pop(); } catch (exception e) { e.printstacktrace(); } } private static void pop() throws exception { byte[] bytes = jedisutil.rpop(rediskey); message msg = (message) objectutil.bytes2object(bytes); if (msg != null) { system.out.println(msg.getid() + "----" + msg.getcontent()); } } }
每执行一次pop()方法,结果如下: <br>1----这是第1个内容 <br>2----这是第2个内容 <br>3----这是第3个内容 <br>4----这是第4个内容
总结
至此,整个redis消息队列的生产者和消费者代码已经完成
1.message 需要传送的实体类(需实现serializable接口)
2.configuration redis的配置读取类,继承自properties
3.objectutil 将对象和byte数组双向转换的工具类
4.jedis 通过消息队列的先进先出(fifo)的特点结合redis的list中的push和pop操作进行封装的工具类
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。