欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java Socket学习笔记(三)- TCP服务端线程池

程序员文章站 2022-04-20 10:58:38
...

一、服务端回传服务类:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EchoProtocol implements Runnable {
    private static final int BUFSIZE = 32; // Size (in bytes) of I/O buffer
    private Socket clientSocket; // Socket connect to client
    private Logger logger; // Server logger

    public EchoProtocol(Socket clientSocket, Logger logger) {
        this.clientSocket = clientSocket;
        this.logger = logger;
    }

    public static void handleEchoClient(Socket clientSocket, Logger logger) {
        try {
            // Get the input and output I/O streams from socket
            InputStream in = clientSocket.getInputStream();
            OutputStream out = clientSocket.getOutputStream();

            int recvMsgSize; // Size of received message
            int totalBytesEchoed = 0; // Bytes received from client
            byte[] echoBuffer = new byte[BUFSIZE]; // Receive Buffer
            // Receive until client closes connection, indicated by -1
            while ((recvMsgSize = in.read(echoBuffer)) != -1) {
                out.write(echoBuffer, 0, recvMsgSize);
                totalBytesEchoed += recvMsgSize;
            }

            logger.info("Client " + clientSocket.getRemoteSocketAddress() + ", echoed " + totalBytesEchoed + " bytes.");
            
        } catch (IOException ex) {
            logger.log(Level.WARNING, "Exception in echo protocol", ex);
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
            }
        }
    }

    public void run() {
        handleEchoClient(this.clientSocket, this.logger);
    }
}

 

 

二、每个客户端请求都新启一个线程的Tcp服务端:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Logger;

public class TCPEchoServerThread {

    public static void main(String[] args) throws IOException {
        // Create a server socket to accept client connection requests
        ServerSocket servSock = new ServerSocket(5500);

        Logger logger = Logger.getLogger("practical");

        // Run forever, accepting and spawning a thread for each connection
        while (true) {
            Socket clntSock = servSock.accept(); // Block waiting for connection
            // Spawn thread to handle new connection
            Thread thread = new Thread(new EchoProtocol(clntSock, logger));
            thread.start();
            logger.info("Created and started Thread " + thread.getName());
        }
        /* NOT REACHED */
    }
}

 

三、固定线程数的Tcp服务端:

 

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TCPEchoServerPool {
    public static void main(String[] args) throws IOException {
        int threadPoolSize = 3; // Fixed ThreadPoolSize

        final ServerSocket servSock = new ServerSocket(5500);
        final Logger logger = Logger.getLogger("practical");

        // Spawn a fixed number of threads to service clients
        for (int i = 0; i < threadPoolSize; i++) {
            Thread thread = new Thread() {
                public void run() {
                    while (true) {
                        try {
                            Socket clntSock = servSock.accept(); // Wait for a connection
                            EchoProtocol.handleEchoClient(clntSock, logger); // Handle it
                        } catch (IOException ex) {
                            logger.log(Level.WARNING, "Client accept failed", ex);
                        }
                    }
                }
            };
            thread.start();
            logger.info("Created and started Thread = " + thread.getName());
        }
    }
}

 

 

四、使用线程池(使用Spring的线程次会有队列、最大线程数、最小线程数和超时时间的概念),

1.线程池工具类:

import java.util.concurrent.*;

/**
 * 任务执行者
 * 
 * @author Watson Xu
 * @since 1.0.0 <p>2013-6-8 上午10:33:09</p>
 */
public class ThreadPoolTaskExecutor {

    private ThreadPoolTaskExecutor() {

    }

    private static ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
        int count;

        /* 执行器会在需要自行任务而线程池中没有线程的时候来调用该程序。对于callable类型的调用通过封装以后转化为runnable */
        public Thread newThread(Runnable r) {
            count++;
            Thread invokeThread = new Thread(r);
            invokeThread.setName("Courser Thread-" + count);
            invokeThread.setDaemon(false);// //????????????

            return invokeThread;
        }
    });

    public static void invoke(Runnable task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
        invoke(task, null, unit, timeout);
    }

    public static <T> T invoke(Runnable task, T result, TimeUnit unit, long timeout) throws TimeoutException,
            RuntimeException {
        Future<T> future = executor.submit(task, result);
        T t = null;
        try {
            t = future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new TimeoutException("Thread invoke timeout ...");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return t;
    }

    public static <T> T invoke(Callable<T> task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
        // 这里将任务提交给执行器,任务已经启动,这里是异步的。
        Future<T> future = executor.submit(task);
        // System.out.println("Task aready in thread");
        T t = null;
        try {
            /*
             * 这里的操作是确认任务是否已经完成,有了这个操作以后 
             * 1)对invoke()的调用线程变成了等待任务完成状态
             * 2)主线程可以接收子线程的处理结果
             */
            t = future.get(timeout, unit);
        } catch (TimeoutException e) {
            throw new TimeoutException("Thread invoke timeout ...");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        return t;
    }
}

 

2.具有伸缩性的Tcp服务端:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import demo.callable.ThreadPoolTaskExecutor;


public class TCPEchoServerExecutor {

    public static void main(String[] args) throws IOException {
        // Create a server socket to accept client connection requests
        ServerSocket servSock = new ServerSocket(5500);

        Logger logger = Logger.getLogger("practical");
        
        // Run forever, accepting and spawning threads to service each connection
        while (true) {
            Socket clntSock = servSock.accept(); // Block waiting for connection
            //executorService.submit(new EchoProtocol(clntSock, logger));
            try {
                ThreadPoolTaskExecutor.invoke(new EchoProtocol(clntSock, logger), TimeUnit.SECONDS, 3);
            } catch (Exception e) {
            } 
            //service.execute(new TimelimitEchoProtocol(clntSock, logger));
        }
        /* NOT REACHED */
    }
}

 

 参考:

1.《Java TCP/IP Socket编程(原书第2版)》