
Java Socket Programming Examples (3): TCP Server Thread Pools

程序员文章站 2024-03-12 13:56:44

I. The server-side echo service class:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EchoProtocol implements Runnable {
  private static final int BUFSIZE = 32; // size (in bytes) of I/O buffer
  private Socket clientSocket; // socket connected to client
  private Logger logger; // server logger

  public EchoProtocol(Socket clientSocket, Logger logger) {
    this.clientSocket = clientSocket;
    this.logger = logger;
  }

  public static void handleEchoClient(Socket clientSocket, Logger logger) {
    try {
      // get the input and output I/O streams from the socket
      InputStream in = clientSocket.getInputStream();
      OutputStream out = clientSocket.getOutputStream();

      int recvMsgSize; // size of received message
      int totalBytesEchoed = 0; // bytes received from client
      byte[] echoBuffer = new byte[BUFSIZE]; // receive buffer
      // receive until the client closes the connection, indicated by -1
      while ((recvMsgSize = in.read(echoBuffer)) != -1) {
        out.write(echoBuffer, 0, recvMsgSize);
        totalBytesEchoed += recvMsgSize;
      }

      logger.info("Client " + clientSocket.getRemoteSocketAddress() + ", echoed " + totalBytesEchoed + " bytes.");

    } catch (IOException ex) {
      logger.log(Level.WARNING, "Exception in echo protocol", ex);
    } finally {
      try {
        clientSocket.close();
      } catch (IOException e) {
        // ignored: nothing useful can be done if close() fails
      }
    }
  }

  public void run() {
    handleEchoClient(this.clientSocket, this.logger);
  }
}
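To make the echo loop above easier to try out, here is a minimal, self-contained sketch of a round trip against it. It is not part of the original article: the class name `EchoRoundTrip` and its methods are hypothetical, the server side is a one-shot copy of the same read/write loop, and it binds to an ephemeral loopback port rather than port 5500. It assumes a short message that fits in the socket buffers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class EchoRoundTrip {

  // Same loop as the echo handler: copy bytes back until read() returns -1.
  public static void echo(Socket socket) throws IOException {
    try (Socket s = socket) {
      InputStream in = s.getInputStream();
      OutputStream out = s.getOutputStream();
      byte[] buffer = new byte[32];
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
    }
  }

  // Starts a one-shot echo server on an ephemeral loopback port,
  // sends msg, and returns whatever the server echoed back.
  public static String roundTrip(String msg) throws Exception {
    try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
      Thread serverThread = new Thread(() -> {
        try {
          echo(server.accept());
        } catch (IOException e) {
          e.printStackTrace();
        }
      });
      serverThread.start();

      ByteArrayOutputStream reply = new ByteArrayOutputStream();
      try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
        client.getOutputStream().write(msg.getBytes(StandardCharsets.UTF_8));
        client.shutdownOutput(); // half-close: the server's read() now returns -1
        InputStream in = client.getInputStream();
        byte[] buffer = new byte[32];
        int n;
        while ((n = in.read(buffer)) != -1) {
          reply.write(buffer, 0, n);
        }
      }
      serverThread.join();
      return new String(reply.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(roundTrip("hello, echo server")); // prints "hello, echo server"
  }
}
```

The `shutdownOutput()` call matters: the handler only stops reading when it sees end-of-stream, so a client that never closes or half-closes its side keeps the handler (and its thread) occupied.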

II. A TCP server that starts a new thread for every client request:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Logger;

public class TCPEchoServerThread {

  public static void main(String[] args) throws IOException {
    // create a server socket to accept client connection requests
    ServerSocket servSock = new ServerSocket(5500);

    Logger logger = Logger.getLogger("practical");

    // run forever, accepting and spawning a thread for each connection
    while (true) {
      Socket clntSock = servSock.accept(); // block waiting for connection
      // spawn thread to handle new connection
      Thread thread = new Thread(new EchoProtocol(clntSock, logger));
      thread.start();
      logger.info("Created and started Thread " + thread.getName());
    }
    /* NOT REACHED */
  }
}

III. A TCP server with a fixed number of threads:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TCPEchoServerPool {
  public static void main(String[] args) throws IOException {
    int threadPoolSize = 3; // fixed number of worker threads

    final ServerSocket servSock = new ServerSocket(5500);
    final Logger logger = Logger.getLogger("practical");

    // spawn a fixed number of threads to service clients
    for (int i = 0; i < threadPoolSize; i++) {
      Thread thread = new Thread() {
        public void run() {
          while (true) {
            try {
              Socket clntSock = servSock.accept(); // wait for a connection
              EchoProtocol.handleEchoClient(clntSock, logger); // handle it
            } catch (IOException ex) {
              logger.log(Level.WARNING, "Client accept failed", ex);
            }
          }
        }
      };
      thread.start();
      logger.info("Created and started Thread = " + thread.getName());
    }
  }
}
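The listing above manages its worker threads by hand. As a point of comparison, the sketch below uses `Executors.newFixedThreadPool` from `java.util.concurrent`, which is a different technique from the article's (one shared task queue instead of every thread calling `accept()`). The class name `FixedPoolSketch` is hypothetical, and the tasks are trivial stand-ins for connection handlers; the only thing it shows is that the number of worker threads never exceeds the configured pool size, however many tasks are submitted.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {

  // Runs taskCount trivial tasks on a pool of poolSize threads and
  // returns how many distinct worker threads actually ran them.
  public static int distinctWorkerThreads(int poolSize, int taskCount) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Set<String> threadNames = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < taskCount; i++) {
      // stand-in for handling one client connection
      pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
    }
    pool.shutdown(); // stop accepting tasks, let queued ones finish
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return threadNames.size();
  }

  public static void main(String[] args) throws InterruptedException {
    // The result depends on scheduling, but it is never more than 3.
    System.out.println("distinct worker threads: " + distinctWorkerThreads(3, 20));
  }
}
```

In a server built this way, a single thread would typically run the `accept()` loop and pass each accepted socket to `pool.execute(...)`; connections beyond the pool size wait in the executor's queue.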

IV. Using a thread pool (Spring's thread pool adds the notions of a task queue, a maximum thread count, a minimum thread count, and a timeout)
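The four concepts named in this heading (queue, maximum threads, minimum threads, timeout) can be illustrated with the JDK's own `java.util.concurrent.ThreadPoolExecutor`, which takes them as constructor arguments. The sketch below is an illustration under assumed values (2 core threads, 4 maximum, a queue of 2, 30 seconds keep-alive), not the article's code and not Spring's class; `PoolConceptsSketch` and `saturate` are hypothetical names. Every task blocks on a latch so that the pool's behavior under load is visible.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConceptsSketch {

  // Submits taskCount blocking tasks and returns
  // {pool size, queued tasks, rejected tasks} at the moment of saturation.
  public static int[] saturate(int taskCount) throws InterruptedException {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2,                            // core ("minimum") threads
        4,                            // maximum threads
        30, TimeUnit.SECONDS,         // idle timeout for threads above the core size
        new ArrayBlockingQueue<>(2)); // bounded task queue
    CountDownLatch release = new CountDownLatch(1);
    int rejected = 0;
    for (int i = 0; i < taskCount; i++) {
      try {
        pool.execute(() -> {
          try {
            release.await(); // keep the worker busy until released
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException e) {
        rejected++; // default policy: reject when queue and pool are both full
      }
    }
    int[] snapshot = {pool.getPoolSize(), pool.getQueue().size(), rejected};
    release.countDown(); // let all tasks finish
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return snapshot;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(Arrays.toString(saturate(7))); // prints [4, 2, 1]
  }
}
```

With seven tasks, the first two start the core threads, the next two wait in the queue, the fifth and sixth force the pool to grow to its maximum of four, and the seventh is rejected. Extra threads are created only once the queue is full, which is often surprising on first encounter.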

1. Thread pool utility class:

package demo.callable; // matches the import used by the server that calls this class

import java.util.concurrent.*;

/**
 * Task executor
 *
 * @author watson xu
 * @since 1.0.0 <p>2013-6-8 10:33:09 AM</p>
 */
public class ThreadPoolTaskExecutor {

  private ThreadPoolTaskExecutor() {

  }

  private static ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
    int count;

    /* The executor calls this method when it needs to run a task and no idle thread is available in the pool. Callable tasks have already been wrapped into a Runnable by the time they arrive here. */
    public Thread newThread(Runnable r) {
      count++;
      Thread invokeThread = new Thread(r);
      invokeThread.setName("Courser Thread-" + count);
      invokeThread.setDaemon(false); // non-daemon: the JVM will not exit while this thread is alive

      return invokeThread;
    }
  });

  public static void invoke(Runnable task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
    invoke(task, null, unit, timeout);
  }

  public static <T> T invoke(Runnable task, T result, TimeUnit unit, long timeout) throws TimeoutException,
      RuntimeException {
    Future<T> future = executor.submit(task, result);
    T t = null;
    try {
      t = future.get(timeout, unit);
    } catch (TimeoutException e) {
      throw new TimeoutException("Thread invoke timeout ...");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    return t;
  }

  public static <T> T invoke(Callable<T> task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
    // Submit the task to the executor; it starts running asynchronously from here.
    Future<T> future = executor.submit(task);
    // System.out.println("Task already in thread");
    T t = null;
    try {
      /*
       * Wait for the task to complete. As a result:
       * 1) the thread that called invoke() blocks until the task finishes or the timeout expires;
       * 2) the calling thread receives the result produced by the worker thread.
       */
      t = future.get(timeout, unit);
    } catch (TimeoutException e) {
      throw new TimeoutException("Thread invoke timeout ...");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    return t;
  }
}
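The core of the utility class is `Future.get(timeout, unit)`: the task runs on a pool thread while the caller waits for at most the given time. The following sketch isolates that behavior. The class `TimeoutSketch` and the method `callWithTimeout` are hypothetical, and the `future.cancel(true)` call on timeout is an addition that the utility class above does not make; without it, a timed-out task simply keeps running in the pool.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {

  // Runs a task that sleeps for taskMillis and waits at most timeoutMillis for it.
  public static String callWithTimeout(long taskMillis, long timeoutMillis) throws Exception {
    ExecutorService executor = Executors.newCachedThreadPool();
    // The lambda returns a value, so it is submitted as a Callable<String>.
    Future<String> future = executor.submit(() -> {
      Thread.sleep(taskMillis);
      return "done";
    });
    try {
      // Blocks the caller until the result is ready or the timeout expires.
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the worker instead of letting it run on
      return "timed out";
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(callWithTimeout(10, 2000));  // prints "done"
    System.out.println(callWithTimeout(2000, 50));  // prints "timed out"
  }
}
```

Note that cancellation works here because `Thread.sleep` responds to interruption. A thread blocked in a plain socket `read()` generally does not, so for an echo handler a read timeout set with `Socket.setSoTimeout` or closing the socket is the more reliable way to stop it.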

2. A scalable TCP server:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import demo.callable.ThreadPoolTaskExecutor;


public class TCPEchoServerExecutor {

  public static void main(String[] args) throws IOException {
    // create a server socket to accept client connection requests
    ServerSocket servSock = new ServerSocket(5500);

    Logger logger = Logger.getLogger("practical");

    // run forever, accepting and spawning threads to service each connection
    while (true) {
      Socket clntSock = servSock.accept(); // block waiting for connection
      //executorService.submit(new EchoProtocol(clntSock, logger));
      try {
        // invoke() blocks this loop until the handler finishes or 3 seconds elapse
        ThreadPoolTaskExecutor.invoke(new EchoProtocol(clntSock, logger), TimeUnit.SECONDS, 3);
      } catch (Exception e) {
        // a timeout or handler failure should not stop the accept loop
        logger.log(Level.WARNING, "Echo task timed out or failed", e);
      }
      //service.execute(new TimeLimitEchoProtocol(clntSock, logger));
    }
    /* NOT REACHED */
  }
}

That is all for this article. For more on Java syntax, see the Chinese edition of Thinking in Java and the JDK 1.7 (official English), JDK 1.6, and JDK 1.5 API reference manuals.