Tomcat Source Code: JIoEndpoint (blog category: multithreading)
程序员文章站
2024-03-23 17:03:46
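Tomcat ships several endpoint implementations for handling sockets: JIoEndpoint, AprEndpoint, and NioEndpoint. Of these, JIoEndpoint is the most commonly used.
The threads in JIoEndpoint fall into three groups: the socket listener thread, the monitor thread, and the Executor.
Socket listener thread
This thread runs the inner class Acceptor, which implements Runnable. It listens on a port through the JDK's ServerSocket; when a socket connection arrives, it builds a task and hands it to the Executor.
Monitor thread
This thread runs AsyncTimeout, which also implements Runnable. It repeatedly walks the queue of waiting requests and, for every request that has timed out, builds a TIMEOUT task and hands it to the Executor.
Executor
The Executor supplies the worker threads that run the tasks built by the listener and monitor threads; each task produces the response to a request. If the user does not configure an Executor, JIoEndpoint creates an org.apache.tomcat.util.threads.ThreadPoolExecutor as the default.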
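The split between a listener thread and a pool of workers can be sketched outside Tomcat in a few lines. This is only an illustration of the pattern, not Tomcat code: the class name MiniBlockingEndpoint, the pool size of 4, and the fixed "hello" reply are all assumptions made for the example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical miniature of the acceptor/worker split; not Tomcat code. */
final class MiniBlockingEndpoint implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;

    MiniBlockingEndpoint() throws IOException {
        serverSocket = new ServerSocket(0); // port 0: let the OS pick a free port
        Thread acceptor = new Thread(this::acceptLoop, "mini-Acceptor-0");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int getPort() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (running) {
            try {
                Socket socket = serverSocket.accept();  // blocks until a client connects
                workers.execute(() -> handle(socket));  // hand off, then go back to accept()
            } catch (IOException e) {
                // accept() fails once the server socket is closed. A real endpoint
                // would back off and retry while running; this sketch simply stops.
                return;
            }
        }
    }

    private void handle(Socket socket) {
        try (Socket s = socket; OutputStream out = s.getOutputStream()) {
            out.write("hello\n".getBytes(StandardCharsets.US_ASCII));
        } catch (IOException ignored) {
            // the client went away; nothing to do in this sketch
        }
    }

    @Override
    public void close() throws IOException {
        running = false;
        serverSocket.close();
        workers.shutdown();
    }
}
```

The listener thread does nothing but accept and hand off, so a slow client only ever occupies a worker thread, never the thread that accepts new connections.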
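The parts are described below in the order in which they run:
Initialization: bind()
This method is called from init() in the parent class AbstractEndpoint. It initializes the ServerSocketFactory and the ServerSocket.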
@Override
public void bind() throws Exception {
    // Initialize thread count defaults for acceptor
    if (acceptorThreadCount == 0) {
        acceptorThreadCount = 1;
    }
    if (serverSocketFactory == null) {
        if (isSSLEnabled()) {
            serverSocketFactory = handler.getSslImplementation().getServerSocketFactory(this);
        } else {
            serverSocketFactory = new DefaultServerSocketFactory(this);
        }
    }
    if (serverSocket == null) {
        try {
            if (getAddress() == null) {
                serverSocket = serverSocketFactory.createSocket(getPort(),
                        getBacklog());
            } else {
                serverSocket = serverSocketFactory.createSocket(getPort(),
                        getBacklog(), getAddress());
            }
        } catch (BindException orig) {
            // Rethrow with the address and port appended to the message
            String msg;
            if (getAddress() == null)
                msg = orig.getMessage() + " <null>:" + getPort();
            else
                msg = orig.getMessage() + " " +
                        getAddress().toString() + ":" + getPort();
            BindException be = new BindException(msg);
            be.initCause(orig);
            throw be;
        }
    }
}
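The catch block in bind() rewraps the BindException so that the message names the address and port that could not be bound. The same idea, pulled out into a standalone helper, might look like the sketch below. The class BindErrors and the method withEndpoint are hypothetical names introduced for this example.

```java
import java.net.BindException;
import java.net.InetAddress;

/** Hypothetical helper mirroring the message rewrapping done in bind(). */
final class BindErrors {
    private BindErrors() {
    }

    /** Returns a new BindException whose message ends with "address:port". */
    static BindException withEndpoint(BindException orig, InetAddress address, int port) {
        // Same convention as the excerpt: a missing address is printed as <null>
        String where = (address == null ? "<null>" : address.toString()) + ":" + port;
        BindException be = new BindException(orig.getMessage() + " " + where);
        be.initCause(orig); // keep the original exception and its stack trace
        return be;
    }
}
```

Keeping the original exception as the cause means the operating system's own error text is still available to whoever reads the log.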
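Startup: startInternal()
This method is called from start() in the parent class AbstractEndpoint. It creates the Executor thread pool if none is configured, then starts the listener threads (acceptor) and the monitor thread (async timeout).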
@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;
        // Create worker collection
        if (getExecutor() == null) {
            createExecutor();
        }
        initializeConnectionLatch();
        // Start acceptor threads
        for (int i = 0; i < acceptorThreadCount; i++) {
            Thread acceptorThread = new Thread(new Acceptor(),
                    getName() + "-Acceptor-" + i);
            acceptorThread.setPriority(threadPriority);
            acceptorThread.setDaemon(getDaemon());
            acceptorThread.start();
        }
        // Start async timeout thread
        Thread timeoutThread = new Thread(new AsyncTimeout(),
                getName() + "-AsyncTimeout");
        timeoutThread.setPriority(threadPriority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }
}
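At this point JIoEndpoint is initialized and started, and it waits for clients to connect. From accept to completion, a connection mainly passes through the following Runnable implementations: Acceptor, AsyncTimeout, and SocketProcessor.
Acceptor inner class
Acceptor implements Runnable and is run by the listener thread. Its main job is to obtain a socket through ServerSocket.accept and pass it to JIoEndpoint's processSocket method. processSocket is asynchronous: it only submits a task to the Executor, so the Acceptor returns quickly and goes back to waiting in accept for the next socket. Before calling accept, it calls awaitConnection() to make sure the number of connections does not exceed the configured maximum; if the limit has been reached, the thread waits until a connection has finished processing. acceptSocket() is thread-safe, so no synchronized block is needed here.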
protected class Acceptor implements Runnable {
    @Override
    public void run() {
        int errorDelay = 0;
        // Loop until we receive a shutdown command
        while (running) {
            // Loop if endpoint is paused
            while (paused && running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            if (!running) {
                break;
            }
            try {
                //if we have reached max connections, wait
                awaitConnection();
                Socket socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = serverSocketFactory.acceptSocket(serverSocket);
                } catch (IOException ioe) {
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;
                // Configure the socket
                if (setSocketOptions(socket)) {
                    // Hand this socket off to an appropriate processor
                    if (!processSocket(socket)) {
                        // Close socket right away
                        try {
                            socket.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else {
                        countUpConnection();
                    }
                } else {
                    // Close socket right away
                    try {
                        socket.close();
                    } catch (IOException e) {
                        // Ignore
                    }
                }
            } catch (IOException x) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), x);
                }
            } catch (NullPointerException npe) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), npe);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
            // The processor will recycle itself when it finishes
        }
    }
}
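The excerpt calls awaitConnection(), countUpConnection(), and countDownConnection() but does not show how they are implemented. A minimal sketch of such a connection limit, assuming a plain monitor with wait/notifyAll, is given below. The class ConnectionLimiter and its current() method are hypothetical, and Tomcat's actual mechanism may well differ.

```java
/** Hypothetical connection limit in the spirit of awaitConnection()/countUpConnection(). */
final class ConnectionLimiter {
    private final int maxConnections;
    private int count;

    ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Blocks the calling (acceptor) thread while the limit is reached. */
    synchronized void awaitConnection() throws InterruptedException {
        while (count >= maxConnections) {
            wait(); // woken by countDownConnection()
        }
    }

    /** Called after a socket has been handed to a worker successfully. */
    synchronized void countUpConnection() {
        count++;
    }

    /** Called by a worker once a connection has been closed. */
    synchronized void countDownConnection() {
        count--;
        notifyAll(); // let a waiting acceptor try again
    }

    synchronized int current() {
        return count;
    }
}
```

Because waiting and counting up are separate calls, as they are in the excerpt, two acceptor threads could both pass the check before either counts up. With a single acceptor thread that cannot happen.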
The run() method calls JIoEndpoint's processSocket(), which mainly creates one SocketProcessor and hands it to the Executor.
protected boolean processSocket(Socket socket) {
    // Process the request from this socket
    try {
        SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
        wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
        // During shutdown, executor may be null - avoid NPE
        if (!running) {
            return false;
        }
        getExecutor().execute(new SocketProcessor(wrapper));
    } catch (RejectedExecutionException x) {
        log.warn("Socket processing request was rejected for:"+socket,x);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
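processSocket() reports a rejected task by returning false, which is what lets the Acceptor close the socket immediately. The sketch below shows that contract in isolation. It uses the JDK's java.util.concurrent.ThreadPoolExecutor as a stand-in for Tomcat's own pool, and the class TaskDispatcher is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical dispatcher: true if the task was accepted, false if the pool refused it. */
final class TaskDispatcher {
    private final Executor executor;

    TaskDispatcher(Executor executor) {
        this.executor = executor;
    }

    boolean dispatch(Runnable task) {
        try {
            executor.execute(task); // returns as soon as the task is queued or started
            return true;
        } catch (RejectedExecutionException e) {
            // Pool and queue are full (or the pool is shut down): the caller must clean up
            return false;
        }
    }

    /** One worker and a queue of one, so the third concurrent task is rejected. */
    static ThreadPoolExecutor tinyPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
    }
}
```

With tinyPool(), the first blocking task occupies the worker, the second waits in the queue, and a third call to dispatch() returns false.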
SocketProcessor inner class
This class also has a single run() method. It hands the request to the handler, and the real processing is done by Coyote.
protected class SocketProcessor implements Runnable {
    protected SocketWrapper<Socket> socket = null;
    protected SocketStatus status = null;

    public SocketProcessor(SocketWrapper<Socket> socket) {
        if (socket==null) throw new NullPointerException();
        this.socket = socket;
    }

    public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
        this(socket);
        this.status = status;
    }

    @Override
    public void run() {
        boolean launch = false;
        synchronized (socket) {
            try {
                SocketState state = SocketState.OPEN;
                try {
                    // SSL handshake
                    serverSocketFactory.handshake(socket.getSocket());
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("endpoint.err.handshake"), t);
                    }
                    // Tell to close the socket
                    state = SocketState.CLOSED;
                }
                if ( (state != SocketState.CLOSED) ) {
                    state = (status==null)?handler.process(socket):handler.process(socket,status);
                }
                if (state == SocketState.CLOSED) {
                    // Close socket
                    if (log.isTraceEnabled()) {
                        log.trace("Closing socket:"+socket);
                    }
                    countDownConnection();
                    try {
                        socket.getSocket().close();
                    } catch (IOException e) {
                        // Ignore
                    }
                } else if (state == SocketState.ASYNC_END ||
                        state == SocketState.OPEN){
                    socket.setKeptAlive(true);
                    socket.access();
                    launch = true;
                } else if (state == SocketState.LONG) {
                    socket.access();
                    waitingRequests.add(socket);
                }
            } finally {
                if (launch) {
                    try {
                        getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
                    } catch (NullPointerException npe) {
                        if (running) {
                            log.error(sm.getString("endpoint.launch.fail"),
                                    npe);
                        }
                    }
                }
            }
        }
        socket = null;
        // Finish up this request
    }
}
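The if/else chain at the end of run() is effectively a small state table: what happens to the socket depends only on the state the handler returned. Written out as a standalone function it might look like the sketch below. The enums State and Action and the class SocketStateDispatch are hypothetical, and only the four states that appear in the excerpt are modelled.

```java
/** Hypothetical restatement of the state handling at the end of SocketProcessor.run(). */
final class SocketStateDispatch {
    /** The four handler results that the excerpt distinguishes. */
    enum State { OPEN, ASYNC_END, LONG, CLOSED }

    /** What the processor does with the socket afterwards. */
    enum Action { CLOSE_AND_COUNT_DOWN, RELAUNCH, PARK_IN_WAITING_QUEUE, NONE }

    private SocketStateDispatch() {
    }

    static Action next(State state) {
        switch (state) {
            case CLOSED:
                return Action.CLOSE_AND_COUNT_DOWN;   // countDownConnection() + close()
            case OPEN:
            case ASYNC_END:
                return Action.RELAUNCH;               // submit a new SocketProcessor
            case LONG:
                return Action.PARK_IN_WAITING_QUEUE;  // waitingRequests.add(socket)
            default:
                return Action.NONE;                   // any other state is left alone
        }
    }
}
```

Note that a kept-alive connection is not served in a loop on the same worker. The processor submits a fresh task to the Executor, so the connection waits its turn in the pool's queue like any other task.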
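This completes the normal request-handling flow. JIoEndpoint has one more Runnable inner class. Its job is to find requests in the current waiting queue that have timed out, mark them with the TIMEOUT status, and hand them to the Executor.
AsyncTimeout inner class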
protected class AsyncTimeout implements Runnable {
    @Override
    public void run() {
        // Loop until we receive a shutdown command
        while (running) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore
            }
            long now = System.currentTimeMillis();
            Iterator<SocketWrapper<Socket>> sockets =
                    waitingRequests.iterator();
            while (sockets.hasNext()) {
                SocketWrapper<Socket> socket = sockets.next();
                long access = socket.getLastAccess();
                if ((now-access)>socket.getTimeout()) {
                    processSocketAsync(socket,SocketStatus.TIMEOUT);
                }
            }
            // Loop if endpoint is paused
            while (paused && running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
        }
    }
}
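The timeout check itself is a single comparison: a request has expired when now minus its last access time is greater than its timeout. The sketch below isolates that scan and takes the current time as a parameter so it can be exercised without sleeping. TimeoutScanner and Waiting are hypothetical names. Removing an entry from the queue before reporting it is an assumption of this sketch, made so that the same request is not reported twice; the excerpt does not show what processSocketAsync() does with the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, time-injected version of the scan performed by AsyncTimeout. */
final class TimeoutScanner {
    /** A parked request: when it was last touched and how long it may wait. */
    static final class Waiting {
        final String name;
        final long lastAccess;
        final long timeoutMillis;

        Waiting(String name, long lastAccess, long timeoutMillis) {
            this.name = name;
            this.lastAccess = lastAccess;
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final Queue<Waiting> waitingRequests = new ConcurrentLinkedQueue<Waiting>();

    void add(Waiting w) {
        waitingRequests.add(w);
    }

    /** Returns the requests that have expired at time 'now' and removes them from the queue. */
    List<Waiting> scan(long now) {
        List<Waiting> expired = new ArrayList<Waiting>();
        for (Waiting w : waitingRequests) {
            // Same test as the excerpt: (now - access) > timeout
            if (now - w.lastAccess > w.timeoutMillis && waitingRequests.remove(w)) {
                expired.add(w); // assumption: claim the entry so it is reported only once
            }
        }
        return expired;
    }
}
```

A monitor thread would call scan(System.currentTimeMillis()) once per second and submit a TIMEOUT task for every entry returned.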