欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员计算机编程知识。
您现在的位置是: 首页

TCP通信 服务端客户端实现

程序员文章站 2022-06-05 20:58:39
...

Socket编程

服务端TCPServer

服务端:监听端口上的连接,并把每个连接提交到线程池中并发处理。

package communication.tcp;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Minimal blocking TCP server.
 *
 * <p>Binds a {@link ServerSocket} to port 8200, accepts connections in a loop and submits a
 * {@code TcpServerListener} for every accepted socket to a shared fixed-size thread pool.
 * Call {@link #shutdown()} to close the listening socket and end the accept loop.
 *
 * @author wangying
 * @version $Id: TcpServer.java, v 0.1 2019-4-19 12:09:31 PM wangying Exp $
 */
public class TCPServer extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(TCPServer.class);

    /** Port the listening socket is bound to. */
    private static final int PORT = 8200;

    /**
     * Listening socket; {@code null} before binding and after {@link #shutdown()}.
     * Volatile because shutdown() is expected to be called from a thread other than the
     * one running the accept loop.
     */
    private volatile ServerSocket server = null;

    /** Set by {@link #shutdown()} so the accept loop can tell a deliberate close from a failure. */
    private volatile boolean closing = false;

    /** Thread pool used to handle each client connection. */
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    /**
     * Starts the server by submitting its accept loop to the shared pool.
     * The accept loop therefore occupies one of the pool's ten threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        pool.submit(new TCPServer());
    }

    /**
     * Binds the listening socket if necessary, then accepts connections until
     * {@link #shutdown()} is called. Each accepted socket is handed to the pool.
     */
    @Override
    public void run() {
        try {
            if (!isBound()) {
                tryToBind();
            }
        } catch (IOException e) {
            // Without a bound socket there is nothing to accept on; report why and stop.
            logger.error("failed to bind server socket to port " + PORT, e);
            return;
        }

        while (!closing) {
            // Work on a snapshot: shutdown() may set the field to null at any moment.
            ServerSocket listening = server;
            if (listening == null) {
                break;
            }
            try {
                Socket client = listening.accept();
                if (logger.isDebugEnabled()) {
                    logger.debug("server recieve message from {}", client.getInetAddress());
                }
                // Hand the connection to a listener task running on the pool.
                pool.submit(new TcpServerListener(client));
            } catch (IOException e) {
                if (closing) {
                    // accept() was interrupted by shutdown() closing the socket.
                    logger.warn("shutting down listen thread due to shutdown() call");
                    break;
                }
                logger.warn("control socket error: ", e);
            }
        }
    }

    /**
     * @return {@code true} if the listening socket exists and is bound
     */
    public boolean isBound() {
        ServerSocket listening = server;
        return listening != null && listening.isBound();
    }

    /**
     * Creates the listening socket and binds it to the service port.
     *
     * <p>SO_REUSEADDR is enabled <em>before</em> binding; changing the option on an
     * already-bound socket has no defined effect.
     *
     * @throws IOException if the socket cannot be created, configured or bound
     */
    public void tryToBind() throws IOException {
        ServerSocket socket = new ServerSocket();
        try {
            socket.setReuseAddress(true);
            socket.bind(new java.net.InetSocketAddress(PORT));
        } catch (IOException e) {
            // Do not leak the half-initialised socket when binding fails.
            try {
                socket.close();
            } catch (IOException ignored) {
                // Best effort only; the bind failure below is the error worth reporting.
            }
            throw e;
        }
        // Publish the socket only once it is fully usable.
        server = socket;
        logger.info("server socket bound to {}", socket.getLocalPort());
    }

    /**
     * Closes the TCP server socket and makes the accept loop terminate.
     * Safe to call when the server was never bound.
     */
    public void shutdown() {
        closing = true;
        ServerSocket listening = server;
        server = null;
        if (listening == null) {
            return;
        }
        try {
            listening.close();
        } catch (IOException e) {
            logger.error("shutdown tcp server failed", e);
        }
    }

}

连接处理TcpServerListener

对每个连接的处理:此处为最简单的 echo(回声)实现,读取客户端发来的一行请求后予以回复。

package communication.tcp;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.net.SocketException;

/**
 * Handles a single accepted client connection.
 *
 * <p>Reads one line from the client, replies with {@code "i have received " + line} and then
 * closes the connection. The reply carries no line terminator; the connection is closed
 * right after it has been written.
 *
 * @author wangying
 * @version $Id: TcpServerListener.java, v 0.1 2019-4-19 5:23:18 PM wangying Exp $
 */
public class TcpServerListener extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(TcpServerListener.class);

    /** Character set used for both the request and the response. */
    private static final String CHARSET = "UTF-8";

    /** Read timeout applied to the client socket, in milliseconds. */
    private static final int SO_TIMEOUT_MILLIS = 1000;

    private final Socket client;

    /**
     * Creates a handler for the given connection, applies the read timeout and names the
     * thread after the remote address.
     *
     * @param client the accepted client socket; closed by {@link #run()}
     */
    TcpServerListener(Socket client) {
        this.client = client;
        try {
            client.setSoTimeout(SO_TIMEOUT_MILLIS);
        } catch (SocketException e) {
            logger.warn("Error while setting soTimeout to " + SO_TIMEOUT_MILLIS, e);
        }
        this.setName("TcpServerListener-" + client.getRemoteSocketAddress());
    }

    /**
     * Reads one request line, sends the reply and always closes the connection.
     */
    @Override
    public void run() {
        BufferedReader reader = null;
        Writer writer = null;
        try {
            // Full duplex: read the request first, then write the response on the same socket.
            reader = new BufferedReader(new InputStreamReader(client.getInputStream(), CHARSET));
            String reqMsg = reader.readLine();
            if (reqMsg == null) {
                // End of stream before any data: there is no request to answer.
                logger.info("client closed the connection without sending a request");
                return;
            }
            logger.info("server recive client message:" + reqMsg);
            // A plain buffered writer is used on purpose: unlike PrintWriter it reports
            // write failures as IOException instead of silently swallowing them.
            writer = new BufferedWriter(new OutputStreamWriter(client.getOutputStream(), CHARSET));
            handleRecvMessage(reqMsg, writer);
        } catch (IOException e) {
            logger.warn("a control connection broke", e);
        } finally {
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(client);
        }
    }

    /**
     * Builds the reply for a received message and writes it to the client.
     * Write failures are logged, not propagated.
     *
     * @param msg    the request line received from the client
     * @param writer writer connected to the client's output stream
     */
    private void handleRecvMessage(String msg, Writer writer) {
        String respMsg = "i have received " + msg;
        logger.info("server response message:" + respMsg);
        try {
            writer.write(respMsg);
            writer.flush();
        } catch (IOException e) {
            logger.error("write response to collector error!", e);
        }
    }
}

客户端TcpClient

package communication.tcp;


import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Demo TCP client.
 *
 * <p>{@link #main(String[])} starts ten request loops. Each loop repeatedly sends one line to
 * {@code localhost:8200}, logs the single-line response and sleeps for a second. The actual
 * socket work runs as a {@code SendCall} task on a shared thread pool.
 *
 * @author wangying
 * @version $Id: TcpClient.java, v 0.1 2014-12-04 7:57:56 PM songfei01 Exp $
 */
public class TcpClient extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TcpClient.class);

    /** Pool that executes the individual send tasks. */
    private static final ExecutorService pool = Executors.newFixedThreadPool(10);

    /** Number of request loops started by {@link #main(String[])}. */
    private static final int CLIENT_COUNT = 10;

    /** Connect timeout in milliseconds. */
    private static final int CONNECT_TIMEOUT_MILLIS = 5000;

    /** Read timeout in milliseconds. */
    private static final int READ_TIMEOUT_MILLIS = 10000;

    /**
     * Starts the request loops.
     *
     * <p>The loops run on their own executor. Running them on {@code pool} would occupy
     * every pool thread with a loop that blocks waiting for a send task queued behind it
     * on the same pool, so no request would ever be sent.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService loopPool = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT; i++) {
            loopPool.submit(new TcpClient());
        }
    }

    /**
     * Sends a request every second until the running thread is interrupted.
     */
    @Override
    public void run() {
        String host = "localhost";
        String port = "8200";
        while (!Thread.currentThread().isInterrupted()) {
            String request = "我是" + Thread.currentThread().getName() + "客户端";
            String resp = post(host, port, request, "UTF-8");
            logger.info("收到响应:" + resp);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the signal to end the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Sends a request and waits for the response.
     *
     * @param host    server host name or address
     * @param port    server port as a decimal string
     * @param request the line to send
     * @param charset character set name used for the request and the response
     * @return the response line, {@code null} if the send task could not obtain one, or
     *         {@code "发送失败:" + host} if the port is invalid or the wait failed
     */
    public String post(String host, String port, String request, String charset) {
        try {
            // Send the request and block until the response is available.
            return send(request, charset, host, Integer.parseInt(port)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("interrupted while waiting for response from " + host + ":" + port, e);
            return "发送失败:" + host;
        } catch (Exception e) {
            logger.warn("request to " + host + ":" + port + " failed", e);
            return "发送失败:" + host;
        }
    }

    /**
     * Queues a send task on the pool.
     *
     * @param requestMsg the line to send
     * @param charset    character set name used for the request and the response
     * @param ip         server host name or address
     * @param port       server port
     * @return a future that yields the result of the send task
     */
    public Future<String> send(String requestMsg, String charset, String ip, int port) {
        return pool.submit(new SendCall(requestMsg, charset, ip, port));
    }

    /**
     * Opens a connection, writes {@code msg} followed by a newline and reads one response line.
     *
     * @param msg     the line to send
     * @param charset character set name used for the request and the response
     * @param host    server host name or address
     * @param port    server port
     * @return the response line, or {@code null} if an I/O error occurred or the stream ended
     *         before any data was received
     */
    public String sendMessage(String msg, String charset, String host, int port) {
        BufferedReader reader = null;
        BufferedWriter writer = null;
        Socket socket = null;
        try {
            // Connect explicitly so the connect timeout actually applies.
            socket = new Socket();
            socket.connect(new java.net.InetSocketAddress(host, port), CONNECT_TIMEOUT_MILLIS);
            // Limit how long a read may block.
            socket.setSoTimeout(READ_TIMEOUT_MILLIS);
            // Write the request.
            writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), charset));
            logger.info("send data to {}:{}", host, port);
            logger.info("发送 REQUEST:{}", msg);
            writer.write(msg + "\n");
            writer.flush();
            // Read the response.
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), charset));
            return reader.readLine();
        } catch (IOException e) {
            logger.error("发送消息失败", e);
        } finally {
            // Close everything without letting close failures mask the result.
            IOUtils.closeQuietly(reader);
            IOUtils.closeQuietly(writer);
            IOUtils.closeQuietly(socket);
        }
        return null;
    }

    /**
     * Send task: performs one {@link #sendMessage} call on the enclosing client.
     */
    private class SendCall implements Callable<String> {
        private final String requestMsg;
        private final String charset;
        private final String ip;
        private final int port;

        /**
         * @param requestMsg the line to send
         * @param charset    character set name used for the request and the response
         * @param ip         server host name or address
         * @param port       server port
         */
        public SendCall(String requestMsg, String charset, String ip, int port) {
            super();
            this.requestMsg = requestMsg;
            this.charset = charset;
            this.ip = ip;
            this.port = port;
        }

        /**
         * Runs the send.
         *
         * @return the response line, or a failure message if an unexpected runtime error occurs
         * @see Callable#call()
         */
        @Override
        public String call() throws Exception {
            try {
                return sendMessage(requestMsg, charset, ip, port);
            } catch (RuntimeException e) {
                logger.error("send task failed", e);
                return "发送失败,ip:" + this.ip + "port:" + this.port;
            }
        }
    }


}