yanf4j引入了客户端非阻塞API
yanf4j
发布一个0.50-beta2
版本,这个版本最重要的改进就是引入了客户端连接非阻塞AP
I,主要最近的工作要用到,所以添加了。两个核心类TCPConnectorController
和UDPConnectorController
分别用于TCP和UDP的客户端连接控制。例如,现在的UDP echo client可以写成:
class EchoClientHandler extends HandlerAdapter {
public void onReceive(Session udpSession, Object t) {
DatagramPacket datagramPacket = (DatagramPacket) t;
System.out.println( " recv: " + new String(datagramPacket.getData()));
}
@Override
public void onMessageSent(Session session, Object t) {
System.out.println( " send: " + new String(( byte []) t));
}
}
// 连接代码,并发送UDP包
UDPConnectorController connector = new UDPConnectorController();
connector.setSoTimeout( 1000 );
connector.setHandler( new EchoClientHandler());
connector.connect( new InetSocketAddress(InetAddress.getByName(host),
port));
for ( int i = 0 ; i < 10000 ; i ++ ) {
String s = " hello " + i;
DatagramPacket packet = new DatagramPacket(s.getBytes(), s.length());
connector.send(packet);
}
UDP不是面向连接的,因此connect方法仅仅是调用了底层DatagramChannel.connect方法,用来限制接收和发送的packet的远程端点。
再来看看TCPConnectorController的使用,同样看Echo Client的实现:
class EchoHandler extends HandlerAdapter < String > {
@Override
public void onConnected(Session session) {
try {
// 一连接就发送NUM个字符串
for ( int i = 0 ; i < NUM; i ++ )
session.send(generateString(i));
} catch (Exception e) {
}
}
public String generateString( int len) {
StringBuffer sb = new StringBuffer();
for ( int i = 0 ; i < MESSAGE_LEN; i ++ )
sb.append(i);
return sb.toString();
}
@Override
public void onReceive(Session session, String t) {
// 打印接收到字符串
if (DEBUG)
System.out.println( " recv: " + t);
}
}
// ...连接API,TCPConnectorController示例
Configuration configuration = new Configuration();
configuration.setTcpSessionReadBufferSize( 256 * 1024 ); // 设置读的缓冲区大小
TCPConnectorController connector = new TCPConnectorController(configuration,
new StringCodecFactory());
connector.setHandler( new EchoHandler());
connector.setCodecFactory( new StringCodecFactory());
try {
connector.Connect( new InetSocketAddress( " localhost " , 8080 ));
} catch (IOExceptione) {
e.printStackTrace();
}
注意,connect方法并不阻塞
,而是立即返回,连接是否建立可以通过TCPConnectorController.isConnected()方法来判断,因此通常你可能会这样使用:
connector.Connect( new InetSocketAddress( " localhost " , 8080 ));
while ( ! connector.isConnected())
;
} catch (Exception e) {
e.printStackTrace();
}
来强制确保后面对connector的使用是已经连接上的connector,然而更好的做法是在Handler的onConnected()回调方法中处理逻辑,因为这个方法仅仅在连接建立后才会被调用。
两个ConnectorController都有系列send方法,用于发送数据:
UDPConnectorController.send(DatagramPacket packet) throws InterruptedException
UDPConnectorController.send(SocketAddress targetAddr, Object msg) throws InterruptedException
0.50-beta2带来的另一个修改就是Session接口添加setReadBufferByteOrder
方法,用于设置session接收缓冲区的字节序,默认是网络字节序,也就是大端法。这个方法建议在Handler的onSessionStarted回调方法中调用。
在0.50-beta最重要的修改是引入了session发送队列缓冲区的流量控制选项
。默认情况下,session的发送缓冲队列是*的,队列的push和pop也全然不会阻塞。在设置了缓冲队列的高低水位选项后即引入了发送流量控制,规则如下:
a)当发送队列中的数据总量大于高水位标记(highWaterMark),Session.send将阻塞
b)在条件a的作用下,Session.send的阻塞将持续到发送队列中的数据总量小于于低水位标记(lowWaterMark)才解除。
缓冲队列高低水位的设置通过Controller的下列方法设置:
public void setSessionWriteQueueLowWaterMark( int lowWaterMark);
缓冲队列的流量控制想法来自ACE的ACE_Message_Queue,是通过com.google.code.yanf4j.util.MessageQueue类实现的。
0.50-beta还引入了Session.send(Object msg)的重载版本 Session.send(Object msg,long timeout),在超过timeout时间后send仍然阻塞时即终止send。注意,现在Session.send的这两个方法都返回一个bool值来表示send成功与否,并且都将响应中断(仅限启动了流量控制选项)抛出InterruptedException。