欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

Mina源码阅读笔记(四)—Mina的连接IoConnector2

程序员文章站 2022-05-19 22:50:11
...

接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,,我们继续: AbstractIoAcceptor: 001 package org.apache.mina.core.rewrite.service; 002 003 import java.io.IOException; 004 import java.net.SocketAddress; 005 import java.util.ArrayList; 0

接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,,我们继续:

AbstractIoAcceptor:

001 package org.apache.mina.core.rewrite.service;

002

003 import java.io.IOException;

004 import java.net.SocketAddress;

005 import java.util.ArrayList;

006 import java.util.Collections;

007 import java.util.HashSet;

008 import java.util.List;

009 import java.util.Set;

010 import java.util.concurrent.Executor;

011

012 public abstract class AbstractIoAcceptor extends AbstractIoService implements

013 IoAcceptor {

014

015 private final List defaultLocalAddresses = new ArrayList();

016

017 private final List unmodifiableDeffaultLocalAddresses = Collections

018 .unmodifiableList(defaultLocalAddresses);

019

020 private final Set boundAddresses = new HashSet();

021

022 private boolean disconnectOnUnbind = true;

023

024 /** 这里不是很明白,为什么要用protected 而 不是private */

025 protected final Object bindLock = new Object();

026

027 /**

028 * 注意这个构造方法是一定要写的,否则编译不通过:抽象类继承时候,构造方法都要写,而且必须包含super

029 *

030 * @param param

031 * sessionConfig

032 * @param executor

033 */

034 protected AbstractIoAcceptor(Object param, Executor executor) {

035 super(param, executor);

036 defaultLocalAddresses.add(null);

037 }

038

039 @Override

040 public SocketAddress getLocalAddress() {

041

042 Set localAddresses = getLocalAddresses();

043 if (localAddresses.isEmpty()) {

044 return null;

045 }

046 return localAddresses.iterator().next();

047 }

048

049 @Override

050 public final Set getLocalAddresses() {

051 Set localAddresses = new HashSet();

052 synchronized (boundAddresses) {

053 localAddresses.addAll(boundAddresses);

054 }

055 return localAddresses;

056 }

057

058 @Override

059 public void bind(SocketAddress localAddress) throws IOException {

060 // TODO Auto-generated method stub

061

062 }

063

064 @Override

065 public void bind(Iterable extends SocketAddress> localAddresses)

066 throws IOException {

067 // TODO isDisposing()

068

069 if (localAddresses == null) {

070 throw new IllegalArgumentException("localAddresses");

071 }

072

073 List localAddressesCopy = new ArrayList();

074

075 for (SocketAddress a : localAddresses) {

076 // TODO check address type

077 localAddressesCopy.add(a);

078 }

079

080 if (localAddressesCopy.isEmpty()) {

081 throw new IllegalArgumentException("localAddresses is empty");

082 }

083

084 boolean active = false;

085

086 synchronized (bindLock) {

087 synchronized (boundAddresses) {

088 if (boundAddresses.isEmpty()) {

089 active = true;

090 }

091 }

092 }

093 /** implement in abstractIoService */

094 if (getHandler() == null) {

095 throw new IllegalArgumentException("handler is not set");

096 }

097

098 try {

099 Set addresses = bindInternal(localAddressesCopy);

100

101 synchronized (boundAddresses) {

102 boundAddresses.addAll(addresses);

103 }

104 } catch (IOException e) {

105 throw e;

106 } catch (RuntimeException e) {

107 throw e;

108 } catch (Throwable e) {

109 throw new RuntimeException("Filed ti bind");

110 }

111

112 if(active){

113 //do sth

114 }

115 }

116

117 protected abstract Set bindInternal(

118 List extends SocketAddress> localAddress) throws Exception;

119

120 @Override

121 public void unbind(SocketAddress localAddress) {

122 // TODO Auto-generated method stub

123

124 }

125 }
polling:

01 package org.apache.mina.core.rewrite.polling;

02

03 import java.net.SocketAddress;

04 import java.nio.channels.ServerSocketChannel;

05 import java.util.List;

06 import java.util.Set;

07 import java.util.concurrent.Executor;

08 import java.util.concurrent.Semaphore;

09 import java.util.concurrent.atomic.AtomicReference;

10

11 import org.apache.mina.core.rewrite.service.AbstractIoAcceptor;

12

13 public abstract class AbstractPollingIoAcceptor extends AbstractIoAcceptor {

14

15 private final Semaphore lock = new Semaphore(1);

16

17 private volatile boolean selectable;

18

19 private AtomicReference acceptorRef = new AtomicReference();

20

21 /**

22 * define the num of sockets that can wait to be accepted.

23 */

24 protected int backlog = 50;

25

26 /**

27 * 一样的,这个构造方法也要写

28 *

29 * @param param

30 * @param executor

31 */

32 protected AbstractPollingIoAcceptor(Object param, Executor executor) {

33 super(param, executor);

34 // TODO Auto-generated constructor stub

35 }

36

37 /**

38 * init the polling system. will be called at construction time

39 *

40 * @throws Exception

41 */

42 protected abstract void init() throws Exception;

43

44 protected abstract void destory() throws Exception;

45

46 protected abstract int select() throws Exception;

47 /**这里有点儿变动*/

48 protected abstract ServerSocketChannel open(SocketAddress localAddress) throws Exception;

49

50 @Override

51 protected Set bindInternal(

52 List extends SocketAddress> localAddress) throws Exception {

53 // ...

54 try {

55 lock.acquire();

56 Thread.sleep(10);

57 } finally {

58 lock.release();

59 }

60 // ...

61 return null;

62 }

63

64 /**

65 * this class is called by startupAcceptor() method it's a thread accepting

66 * incoming connections from client

67 *

68 * @author ChenHui

69 *

70 */

71 private class Acceptor implements Runnable {

72 @Override

73 public void run() {

74 assert (acceptorRef.get() == this);

75

76 int nHandles = 0;

77

78 lock.release();

79

80 while (selectable) {

81 try {

82 int selected = select();

83

84 // nHandles+=registerHandles();

85

86 if (nHandles == 0) {

87 acceptorRef.set(null);

88 // ...

89 }

90 } catch (Exception e) {

91

92 }

93 }

94 }

95 }

96 }
好了最后看NioSoeketAcceptor:

001 package org.apache.mina.rewrite.transport.socket.nio;

002

003 import java.net.InetSocketAddress;

004 import java.net.ServerSocket;

005 import java.net.SocketAddress;

006 import java.nio.channels.SelectionKey;

007 import java.nio.channels.Selector;

008 import java.nio.channels.ServerSocketChannel;

009 import java.util.concurrent.Executor;

010

011 import org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor;

012 import org.apache.mina.rewrite.transport.socket.SocketAcceptor;

013

014 public final class NioSocketAcceptor extends AbstractPollingIoAcceptor

015 implements SocketAcceptor {

016

017 private volatile Selector selector;

018

019 protected NioSocketAcceptor(Object param, Executor executor) {

020 super(param, executor);

021 // TODO Auto-generated constructor stub

022 }

023

024 @Override

025 public int getManagedSessionCount() {

026 // TODO Auto-generated method stub

027 return 0;

028 }

029

030 /**

031 * 这个方法继承自AbstractIoAcceptor

032 *

033 * The type NioSocketAcceptor must implement the inherited abstract method

034 * SocketAcceptor.getLocalAddress() to override

035 * AbstractIoAcceptor.getLocalAddress()

036 */

037 @Override

038 public InetSocketAddress getLocalAddress() {

039 // TODO Auto-generated method stub

040 return null;

041 }

042

043 @Override

044 public void setDefaultLocalAddress(InetSocketAddress localAddress) {

045 // TODO Auto-generated method stub

046

047 }

048

049 @Override

050 public boolean isReuseAddress() {

051 // TODO Auto-generated method stub

052 return false;

053 }

054

055 @Override

056 protected void init() throws Exception {

057 selector = Selector.open();

058 }

059

060 @Override

061 protected void destory() throws Exception {

062 if (selector != null) {

063 selector.close();

064 }

065 }

066

067 @Override

068 protected int select() throws Exception {

069 return selector.select();

070 }

071

072 @Override

073 protected void dispose0() throws Exception {

074 // TODO Auto-generated method stub

075

076 }

077

078 protected ServerSocketChannel open(SocketAddress localAddress)

079 throws Exception {

080 ServerSocketChannel channel =ServerSocketChannel.open();

081

082 boolean success=false;

083

084 try{

085 channel.configureBlocking(false);

086

087 ServerSocket socket=channel.socket();

088

089 socket.setReuseAddress(isReuseAddress());

090

091 socket.bind(localAddress);

092

093 channel.register(selector, SelectionKey.OP_ACCEPT);

094

095 success=true;

096 }finally{

097 if(!success){

098 //close(channel);

099 }

100 }

101 return channel;

102 }

103

104 @Override

105 public boolean isActive() {

106 // TODO Auto-generated method stub

107 return false;

108 }

109

110 }
------------------------------------------------------

到此为止将连接部分都写完了,在连接部分还有些零碎的东西,比如handlerpollingsession了。