欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springboot集成socket通信并采用线程池处理多客户端请求

程序员文章站 2024-03-22 23:27:04
...

 

       首先推荐一篇不错的文章  https://www.cnblogs.com/yiwangzhibujian/p/7107785.html  这篇文章已经非常清晰的讲述了socket通信的基础和应用,本文在此基础上展示socket通信应用于springboot。

       本文默认读者已经对socket通信有所了解,建立在此基础上,开启springboot集成socket之旅!

0 背景

      HTTP连接是常用的C-S通信方式,按照“请求—响应”的方式,即客户端先向服务器发出请求,服务器端再回复数据。
      但某些场景下,需要服务器主动向客户端推送数据,保持客户端与服务器数据的实时与同步,这就用到socket接口来通信。

 

      以下所示代码实现功能为:客户端向服务器发送数据,服务器接收数据进行相应处理,并返回结果给客户端的过程。因为有多客户端请求,因此本文用到线程池对多个请求进行处理。因此,这里建议了解一下线程池的使用,包括:定义线程池的几种方法及区别、线程池如何关闭、以及线程池的拒绝策略。

1 新建SocketServer类

package io.transwarp.esb.socket;

import io.transwarp.esb.service.DataConversionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author 
 * @description 服务端socket
 * @date 2019/7/30 14:57
 */
@Service
public class SocketServer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
//    @Value("${port}")
    private Integer port;
    private boolean started;
    private ServerSocket serverSocket;
    //使用多线程,需要线程池,防止并发过高时创建过多线程耗尽资源
    private ExecutorService threadPool = Executors.newCachedThreadPool();
    
    public void start(){
        start(8090);
    }
    private void start(Integer port){
        try {
            serverSocket =  new ServerSocket(port == null ? this.port : port);
            started = true;
            logger.info("Socket服务已启动,占用端口: {}", serverSocket.getLocalPort());
        }catch (IOException e){
            logger.error("端口异常信息",e);
            System.exit(0);
        }
        while (started){
            try {
                Socket socket = serverSocket.accept();
                Runnable runnable = () -> {
                    try {
                        //接收客户端数据
                        StringBuilder xmlString = onMessage(socket);
                        //处理逻辑:xmlStringToEsb为处理结果 
                        //返回给客户端
                        sendMessage(socket,xmlStringToEsb);
                        socket.close();
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                };
                //接收线程返回结果
                Future future = threadPool.submit(runnable);
                logger.info(future.isDone()+"--------");
            }catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private static StringBuilder onMessage(Socket socket){
        byte[] bytes = new byte[1024];
        int len;
        try{
            // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
            InputStream inputStream = socket.getInputStream();
            StringBuilder sb = new StringBuilder();
            while ((len = inputStream.read(bytes)) != -1) {
                // 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
                sb.append(new String(bytes, 0, len, "UTF-8"));
            }
            //此处,需要关闭服务器的输出流,但不能使用inputStream.close().
            socket.shutdownInput();
            return sb;
        }catch (IOException e){
            e.printStackTrace();
        }
        return null;
    }
    private static void sendMessage(Socket socket,String message){
        try {
            //向客户端返回数据
            OutputStream outputStream = socket.getOutputStream();
            //首先需要计算得知消息的长度
            byte[] sendBytes = message.getBytes("UTF-8");
            //然后将消息的长度优先发送出去
            outputStream.write(sendBytes.length >> 8);
            outputStream.write(sendBytes.length);
            //然后将消息再次发送出去
            outputStream.write(sendBytes);
            outputStream.flush();
            outputStream.close();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

         这里注意:inputStream.close()表示关闭了输出流,相应的Socket也将关闭,和直接关闭Socket一个性质;而socket.shutdownInput()表示告知客户端这边已经写完了,那么客户端收到消息后,就能知道已经读取完消息,如果客户端有要返回给客户的消息那么就可以通过服务端的输出流发送给客户端,如果没有,直接关闭Socket。

 

        以下代码表示,采用长度+类型+数据模式的传输方式时的发送和接收数据方式;使用场景:客户端和服务器进行多次数据传输时*/

    //发送数据
    outputStream.write(sendBytes.length >> 8);
    outputStream.write(sendBytes.length);
    //然后将消息再次发送出去
    outputStream.write(sendBytes);


     //接收数据
     InputStream inputStream = socket.getInputStream();
     byte[] bytes;
     // 因为可以复用Socket且能判断长度,所以可以一个Socket用到底
     while (true) {
      // 首先读取两个字节表示的长度
      int first = inputStream.read();
      //如果读取的值为-1 说明到了流的末尾,Socket已经被关闭了,此时将不能再去读取
      if(first==-1){
        break;
      }
      int second = inputStream.read();
      int length = (first << 8) + second;
      // 然后构造一个指定长的byte数组
      bytes = new byte[length];
      // 然后读取指定长度的消息即可
      inputStream.read(bytes);
      System.out.println("get message from client: " + new String(bytes, "UTF-8"));

 

2 新建SocketClient类进行测试

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * @author wangyan
 * @description
 * @date 2019/7/30 18:56
 */
public class SocketClient {
    public static void main(String[] args) throws IOException {
        socketClient2F();
    }
    public void socketClient2F() throws IOException{
        final Logger logger = LoggerFactory.getLogger(SocketClient.class);

        String host = "127.0.0.1";
        int port = 8090;
        Socket socket = new Socket(host, port);
        // 建立连接后获得输出流
        OutputStream outputStream = socket.getOutputStream();
        String message = "<a>12</a>";
        //首先需要计算得知消息的长度
        byte[] sendBytes = message.getBytes("UTF-8");
        //然后将消息的长度优先发送出去
        outputStream.write(sendBytes.length >>8);
        outputStream.write(sendBytes.length);
        //然后将消息再次发送出去
        outputStream.write(sendBytes);
        outputStream.flush();
        socket.shutdownOutput();


        InputStream inputStream = socket.getInputStream();
        byte[] bytes = new byte[1024];
        int len;
        StringBuilder sb = new StringBuilder();
        while ((len = inputStream.read(bytes)) != -1) {
            // 注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
            sb.append(new String(bytes, 0, len, "UTF-8"));
        }
        System.out.println(sb);
        outputStream.close();
        inputStream.close();
        socket.close();
    }
}

 

3 启动类里添加使得SocketServer启动的方法

import io.transwarp.esb.socket.SocketServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class EsbApplication {
	public static void main(String[] args) {
		ApplicationContext applicationContext = SpringApplication.run(EsbApplication.class, args);
		applicationContext.getBean(SocketServer.class).start();
	}
}

     

        以上便可以通过启动springboot项目,启动SocketServer,同时使用SocketClient类测试!