欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式WebSocket架构

程序员文章站 2024-03-23 10:39:52
...

技术解决方案

  • 利用Redis的Pub/Sub
  • 大致流程
ClientServiceRedis订阅Topic发起WebSocket建立WebSocket连接,把用户相对应的Session存至内存。下单发送对应Topic的消息下单成功推送订阅此Top的消息根据消息体内容查找对应Session向其他用户推送下单成功提醒ClientServiceRedis
  • 具体代码Demo如下
@Configuration
public class RedisObserverConfig {

    public static final String TOPIC_ORDER_FOOD = "websocket:order_food";

    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    @Bean
    RedisMessageListenerContainer redisContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(jedisConnectionFactory);
        container.addMessageListener(messageListener(), orderFoodTopic());
        container.setTaskExecutor(Executors.newFixedThreadPool(4));
        return container;
    }

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(orderFoodListener());
    }

    @Bean
    ChannelTopic orderFoodTopic() {
        return new ChannelTopic(TOPIC_ORDER_FOOD);
    }

    @Bean
    OrderDishesListener orderFoodListener() {
        return new OrderDishesListener();
    }

}
<bean id="jedisConnectionFactory"
		class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
		destroy-method="destroy">
		<property name="hostName" value="${redis.host}" />
		<property name="port" value="${redis.port}" />
		<property name="timeout" value="${redis.timeout}" />
		<property name="database" value="${redis.database}" />
		<property name="password" value="${redis.password}" />
		<property name="usePool" value="true" />
		<property name="poolConfig" ref="jedisPoolConfig" />
	</bean>
  • SendMessage
@Service
public class OrderFoodWebSocketService {

	@Autowired
	private RedisTemplate redisTemplate;

	public void sendMessage() {
		redisTemplate.convertAndSend(RedisObserverConfig.TOPIC_ORDER_FOOD, "hello");
	}
}
  • MessageListener
public class OrderDishesListener implements MessageListener {

    @Autowired
    private RedisSerializer<Object> jsonRedisSerializer;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        Object object = jsonRedisSerializer.deserialize(message.getBody());
        System.out.println("orderDishesListener message body = " + JSON.toJSONString(object));
    }
}

注意点

nginx支持WebSocket需要添加以下配置

location /wsapp/ {
    proxy_pass http://wsbackend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
}

nginx会把Http请求升级至WebSocket

参考资料

相关标签: WebSocket