基于spring-redis发布订阅模式的实现
程序员文章站
2022-03-12 09:17:45
...
redis配置:
监听器的实现:
消息发送接口的实现:
测试调用的方法:
注意:1、如果有多个项目同时使用此配置,只需要保留一个项目配置文件有关注项目;2、此配置没有考虑分布式部署的环境,如果要考虑从redis list和分布式锁的方向考虑。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:redis="http://www.springframework.org/schema/redis" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd" default-autowire="byName"> <context:property-placeholder location="classpath:redis.properties" /> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="${redis.maxIdle}" /> <property name="maxTotal" value="${redis.maxTotal}" /> <property name="maxWaitMillis" value="${redis.maxWaitMillis}" /> <property name="testOnBorrow" value="${redis.testOnBorrow}" /> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy"> <property name="poolConfig" ref="jedisPoolConfig"></property> <property name="hostName" value="${redis.host}"></property> <property name="port" value="${redis.port}"></property> <property name="password" value="${redis.pass}"></property> </bean> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory"></property> <property name="defaultSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> </property> </bean> <bean id="registerMessageListener" class="com.gc.biz.cache.listener.RegisterMessageListener"> <property name="redisTemplate" ref="redisTemplate"></property> </bean> <bean id="priDocMessageListener" class="com.gc.biz.cache.listener.PriDocRegActMsgListener"> <property name="redisTemplate" ref="redisTemplate"></property> </bean> <bean id="redisDAO" class="com.gc.biz.cache.impl.MessageDaoImpl"> <property name="redisTemplate" ref="redisTemplate" /> </bean> <bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy"> <property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="taskExecutor"> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> <property name="poolSize" value="3"></property> </bean> </property> <property name="messageListeners"> <map> <entry key-ref="registerMessageListener"> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="coupon|redenvelop|notify|points" /> </bean> </entry> <entry key-ref="priDocMessageListener"> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="YZM|BG" /> </bean> </entry> </map> </property> </bean> <bean id="springContext" class="com.gc.biz.cache.util.SpringContextHolder" /> <bean id="doctorDAO" class="com.gc.biz.cache.impl.DoctorDAOImpl" > <property name="redisTemplate" ref="redisTemplate" /> </bean> <bean id="remindDAO" class="com.gc.biz.cache.impl.RemindDAOImpl" /> <bean id="userDAO" class="com.gc.biz.cache.impl.UserDAOImpl" /> <bean id="userDataDAO" class="com.gc.biz.cache.impl.UserDataDAOImpl" /> </beans>
监听器的实现:
package com.gc.biz.cache.listener; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import org.apache.log4j.Logger; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import com.gc.apps.jsk.coupon.service.CouponService; import com.gc.apps.jsk.coupon.service.impl.CouponServiceImpl; import com.gc.apps.jsk.invitationcode.service.InvitationService; import com.gc.apps.jsk.invitationcode.service.impl.InvitationServiceImpl; import com.gc.apps.jsk.login.service.RegisterService; import com.gc.apps.jsk.login.service.impl.RegisterServiceImpl; import com.gc.apps.jsk.membership.service.MemberShipService; import com.gc.apps.jsk.membership.service.impl.MemberShipServiceImpl; import com.gc.biz.member.dbobj.MemberInfo; import com.gc.common.util.StrUtil; import com.gc.frame.core.db.DBTransaction; import com.gc.frame.core.misc.StringUtil; import com.google.gson.Gson; public class RegisterMessageListener implements MessageListener { private RedisTemplate<Serializable, Serializable> redisTemplate; private static Logger logger = Logger.getLogger(RegisterMessageListener.class); public void setRedisTemplate(RedisTemplate<Serializable, Serializable> redisTemplate) { this.redisTemplate = redisTemplate; } @Override public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody();// 请使用valueSerializer byte[] channel = message.getChannel(); // 请参考配置文件,本例中key,value的序列化方式均为string。 // 其中key必须为stringSerializer。和redisTemplate.convertAndSend对应 String msgContent = (String) redisTemplate.getValueSerializer().deserialize(body); String topic = (String) redisTemplate.getStringSerializer().deserialize(channel); System.out.println(topic + ":" + msgContent); Map<String, String> map = new Gson().fromJson(msgContent, Map.class); String from = map.get("from"); if ("wx".equals(from)) { doRegisterMsg_wx(topic, msgContent); } else if ("app".equals(from)) { doRegisterMsg(topic, msgContent); } }
消息发送接口的实现:
package com.gc.biz.cache.impl; import java.io.Serializable; import org.springframework.data.redis.core.RedisTemplate; import com.gc.biz.cache.dao.MessageDao; public class MessageDaoImpl implements MessageDao{ private RedisTemplate<String , Object> redisTemplate = null; public MessageDaoImpl() { } @Override public void sendMessage(String channel, Serializable message) { redisTemplate.convertAndSend(channel, message); } public RedisTemplate<String, Object> getRedisTemplate() { return redisTemplate; } public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } }
测试调用的方法:
MessageDao dao = SpringContextHolder.getBean("redisDAO"); Map<String,String> map = new HashMap<String,String>(); map.put("1", "11111"); map.put("2", "22222"); dao.sendMessage("coupon", new Gson().toJson(map)); dao.sendMessage("redenvelop", new Gson().toJson(map)); dao.sendMessage("notify", new Gson().toJson(map)); map.put("UserBagID", "1"); map.put("CreateDate", "2016-06-01 16:51:35"); dao.sendMessage("iphone|xiaomi", new Gson().toJson(map));
注意:1、如果有多个项目同时使用此配置,只需要保留一个项目配置文件有关注项目;2、此配置没有考虑分布式部署的环境,如果要考虑从redis list和分布式锁的方向考虑。