欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

分布式WebSocket实现(通过RabbitMQ)

程序员文章站 2022-03-19 16:03:47
分布式WebSocket实现(通过RabbitMQ)1、实现思路1、WebSocket接收用户或者接口传过来的数据时,统一发送到RabbitMQ2、每个服务器监听RabbitMQ数据并获取数据,通过判断数据中persons是否为空来判断是单发还是群发,若persons不为空有用户id,每个服务器对比自己session中是否有这个用户id,若没有则不操作,若有则推送给该用户消息{ "persons":["123","121"], "msg":""}3、使用websocket在线测试 (we...

分布式WebSocket实现(通过RabbitMQ)

1、实现思路

1、WebSocket接收用户或者接口传过来的数据时,统一发送到RabbitMQ

2、每个服务器监听RabbitMQ数据并获取数据,通过判断数据中persons是否为空来判断是单发还是群发,若persons不为空有用户id,每个服务器对比自己session中是否有这个用户id,若没有则不操作,若有则推送给该用户消息

{
 "persons":["123","121"],
 "msg":""
}

3、使用websocket在线测试 (websocket-test.com)测试时,可以发送JSON数据来指定发给那个用户,例子如下:

{
 "persons":["123","121"],
 "msg":""
}

若persons为空则为群发

2、效果截图

1、服务器单发消息(使用PostMan模拟)

分布式WebSocket实现(通过RabbitMQ)

分布式WebSocket实现(通过RabbitMQ)

2、服务器群发

分布式WebSocket实现(通过RabbitMQ)
分布式WebSocket实现(通过RabbitMQ)

3、服务器多发(用户121,123,124) 发送给121,123消息

分布式WebSocket实现(通过RabbitMQ)
分布式WebSocket实现(通过RabbitMQ)

分布式WebSocket实现(通过RabbitMQ)

4、用户群发消息

分布式WebSocket实现(通过RabbitMQ)

5、用户单发

分布式WebSocket实现(通过RabbitMQ)

6、用户多发同理,在persons中添加用户即可

7、定时推送,上面截图中已经有体现

3、代码实现

1、导入依赖

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

2、编写配置文件

spring:
  rabbitmq:
    port: 5672
    username: test
    password: test
    virtual-host: /test    

3、编写配置类

WebSocketConfig RabbitConfig

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public TaskScheduler taskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        return taskScheduler;
    }
}
@Configuration
public class RabbitConfig {
    public static final String FANOUT_EXCHANGE ="FanoutExchange";

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
    }
    
}

4、枚举类

@AllArgsConstructor
public enum ResultEnum {
    /**
     * 发送成功
     */
    SEND_SUCCESS(1,"发送成功"),
    /**
     *发送用户未指定,进行群发
     */
    PERSON_NULL_GROUP_SEND(3,"发送用户未指定,进行群发"),
    /**
     * "消息为空,不进行发送"
     */
    MSG_NULL_NOT_SEND(4,"消息为空,不进行发送"),
    /**
     * 发送的消息为空,不进行发送
     */

    SEND_NULL(5,"发送的消息为空,不进行发送");

    /**
     * 编码
     */
    private  int code;
    /**
     * 内容
     */
    private  String title;

    /**
     * 获取编码
     * @return int
     */
    public int getCode() {
        return code;
    }

    /**
     * 获取标题
     * @return String
     */
    public String getTitle() {
        return title;
    }
}

5、创建传输对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgDTO implements Serializable {
    /**
     * 用户组
     */
    private List<String> persons;
    /**
     * 消息
     */
    private String msg;
}

6、WebSocket服务端

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketEndPoint {

    /**
     * 用于存放所有在线客户端
     */
    private static final Map<String, Session> SESSION_MAP = new ConcurrentHashMap<>();

    private static AmqpTemplate amqpTemplate;

    @Autowired
    public synchronized void setAmqpTemplate(AmqpTemplate amqpTemplate) {
        WebSocketEndPoint.amqpTemplate = amqpTemplate;
    }

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    public static Map<String, Session> getSessionMap(){
        return SESSION_MAP;
    }
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        log.info("有新的客户端上线: {}", session.getId());
        SESSION_MAP.put(userId, session);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        String sessionId = session.getId();
        log.info("有客户端离线: {}", sessionId);
        SESSION_MAP.remove(sessionId);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message)  {
        JSONObject object  = JSON.parseObject(message);
        MsgDTO msgDTO = JSON.toJavaObject(object, MsgDTO.class);
        if (ObjectUtils.isEmpty(msgDTO) || !StringUtils.hasText(msgDTO.getMsg())){
            return;
        }
        String msg = JSON.toJSONString(msgDTO);
        log.info("消息:{}",msg);
        msg = msg.replace("\\t","").replace("\\n","");
        amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",msg);
        log.info("{}","消息为空,不进行发送");
    }

    /**
     * @param session 消息
     * @param error 异常
     */
    @OnError
    public void onError(Session session, Throwable error) {
        String sessionId = session.getId();
        if (SESSION_MAP.get(sessionId) != null) {
            log.info("发生了错误,移除客户端: {}", sessionId);
            SESSION_MAP.remove(sessionId);
        }
        log.error("发生异常:",error);
    }

    /**
     * 指定对象发送消息
     *
     * @param message 消息对象
     */
     public static void sendToUser(List<String> persons, String message) {
         persons.forEach(userId -> send(userId,message));
     }

    /**
     * 单发 1对1
     * @param userId  用户id
     * @param message 消息
     */
     public  static void send(String userId,String message) {
         log.info("{}",userId);
         if (SESSION_MAP.containsKey(userId)) {
             SESSION_MAP.get(userId).getAsyncRemote().sendText(message);
             log.info("消息发送成功");
         }
         log.info("本服务器中无此用户session");
     }
    /**
     * 群发
     * @param message 消息
     */
    public static void batchSend(String message) {
        for(String sessionId : SESSION_MAP.keySet())
        {
            SESSION_MAP.get(sessionId).getAsyncRemote().sendText(message);
        }
    }
}

7、定时推送

@Component
@EnableScheduling
@Slf4j
public class ScheduleTask {
    @Scheduled(cron = "0/10 * *  * * ?")
    public void sendMessage(){
        String message = "定时报时间,现在:"+new Date();
        log.info("{}","**********定时任务执行***********");
        Map<String, Session> map = WebSocketEndPoint.getSessionMap();
        for (String userId : map.keySet()) {
            map.get(userId).getAsyncRemote().sendText(message);
        }
    }
}

8、发送消息服

SendMessageService SendMessageServiceImpl

public interface SendMessageService {

    /**
     *  发送消息到RabbitMQ
     * @param msgDTO 对象
     * @return String
     */
    String send(MsgDTO msgDTO);

    /**
     * 发送消息给用户
     * @param msgDTO 对象
     */
    void sendToUser(MsgDTO msgDTO);
}
@Service
@Slf4j
public class SendMessageServiceImpl implements SendMessageService {


    final
    AmqpTemplate amqpTemplate;

    public SendMessageServiceImpl(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }


    @Override
    public String send(MsgDTO msgDTO) {
        String message = msgDTO.getMsg();
        if (StringUtils.hasText(message)) {
            String msg = JSON.toJSONString(msgDTO);
            msg = msg.replace("\\t","").replace("\\n","");
            amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",msg);
            return ResultEnum.SEND_SUCCESS.getTitle();
        }
        return ResultEnum.SEND_NULL.getTitle();
    }
    /**
     *
     * @param msgDTO  实体类
     */
    @Override
    public void sendToUser(MsgDTO msgDTO) {
        List<String> persons = msgDTO.getPersons();
        String message = msgDTO.getMsg();
        if (!StringUtils.hasText(message)) {
            return;
        }
        if (ObjectUtils.isEmpty(persons)) {
            WebSocketEndPoint.batchSend(message);
            return;
        }
        WebSocketEndPoint.sendToUser(persons,message);
    }
}

9、RabbitMQ监听

@Slf4j
@Service
public class RabbitMsgListener {

    final SendMessageServiceImpl sendMessageService;

    public RabbitMsgListener(SendMessageServiceImpl sendMessageService) {
        this.sendMessageService = sendMessageService;
    }

    @RabbitListener(bindings={
            //@QueueBinding注解要完成队列和交换机的
            @QueueBinding(
                    //@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
                    value = @Queue(),
                    //创建一个交换机
                    exchange=@Exchange(name= RabbitConfig.FANOUT_EXCHANGE,type="fanout")
            )
    })
    public void fanoutReceive(String msg) {
        JSONObject object  = JSON.parseObject(msg);
        MsgDTO msgDTO = JSON.toJavaObject(object, MsgDTO.class);
        sendMessageService.sendToUser(msgDTO);
        log.info("{}","监听并推送消息");
    }
}

10、控制层

@RestController
@RequestMapping("/send")
public class PushMessageController {

    private final SendMessageService sendService;

    public PushMessageController(SendMessageService sendService) {
        this.sendService = sendService;
    }
    /**
     *
     * @param map 用户+消息  用户为空则为群发
     * @return String
     */
    @GetMapping("/sendToUser")
    public String sendToUser(@RequestBody Map<String, Object> map) {
        JSONObject param =new JSONObject(map);
        MsgDTO msgDTO = param.toJavaObject(MsgDTO.class);
        return sendService.send(msgDTO);
    }
}

本文地址:https://blog.csdn.net/qq_26018075/article/details/111847137