NIO例子
程序员文章站
2022-05-04 08:48:45
client 向 server 发送 questionId,server 随机得到 1 个答案,然后把 questionId 和答案发送给 client。AnswerServer:public class AnswerServer { private static AtomicLong atomicLong = new AtomicLong(); public static final String[] answers = {"A","B","C","D"}; public stat…
client向server发送questionId,server随机得到1个答案,然后把questionId和答案发送给client
AnswerServer
/**
 * Single-threaded NIO server for the question/answer demo.
 *
 * <p>Protocol as implemented here: after accepting a connection the server
 * sends {@link #RES_CONNECTION_OK}; for every chunk of bytes a client sends
 * (treated as a question id) it replies with
 * {@code "questionId=<id>,answerId=<A|B|C|D>"}.
 *
 * <p>All channel handling happens on the thread that calls {@link #start()}.
 */
public class AnswerServer {
    /** Counts how many times {@link #read} has been invoked (used for logging only). */
    private static final AtomicLong atomicLong = new AtomicLong();
    public static final String[] answers = {"A","B","C","D"};
    public static final String RES_CONNECTION_OK = "OK";
    /** Fixed seed: the sequence of chosen answers is the same on every run. */
    static Random random = new Random(3);
    private Selector selector;

    /**
     * Creates the server and binds the listening socket.
     *
     * @throws IllegalStateException if the selector or the listening socket
     *         cannot be set up (the original cause is preserved)
     */
    public AnswerServer(){
        try{
            this.init();
        }catch (Exception e){
            // Fail fast: continuing with a half-initialised server would only
            // surface later as a NullPointerException or a silent hang in start().
            throw new IllegalStateException("answer server failed to initialise", e);
        }
    }

    /**
     * Opens the selector, binds the non-blocking server socket and registers it
     * for {@code OP_ACCEPT}.
     *
     * @throws Exception if any of the channel operations fail; the channel and
     *         selector opened so far are closed before rethrowing
     */
    public void init() throws Exception{
        // Open the I/O multiplexer.
        selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.configureBlocking(false);
            // Bind the service port.
            serverSocketChannel.socket().bind(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
            System.out.println("answer server linten port="+Constant.SERVER.PORT);
            // A server must register OP_ACCEPT first so that it can respond to
            // incoming connection requests.
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            serverSocketChannel.close();
            selector.close();
            throw e;
        }
    }

    /**
     * Runs the select loop forever, dispatching accept and read events.
     *
     * <p>A failure while serving one client closes only that client's channel;
     * the listening key is never cancelled from here.
     *
     * @throws Exception if {@code select()} itself fails
     */
    public void start() throws Exception{
        while (true) {
            this.selector.select();
            System.out.println("the coming keys="+this.selector.selectedKeys());
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
            while(iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                SocketChannel socketChannel = null;
                int type = 0;
                try {
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        // Here key.channel() is the ServerSocketChannel.
                        type = SelectionKey.OP_ACCEPT;
                        this.accept(key);
                    } else if (key.isReadable()) {
                        type = SelectionKey.OP_READ;
                        socketChannel = (SocketChannel) key.channel();
                        this.read(socketChannel, key);
                    }
                }catch (Exception e) {
                    System.out.println("exception at type="+type);
                    e.printStackTrace();
                    // Only tear down a client connection. For accept failures
                    // socketChannel is null and the listening key must stay
                    // registered, otherwise no further client could connect.
                    if(socketChannel!=null){
                        key.cancel();
                        closeQuietly(socketChannel);
                    }
                }
            }
        }
    }

    /**
     * Accepts a pending connection, registers it for reads and sends the
     * connection acknowledgement.
     *
     * @param key the selection key of the listening channel
     * @throws Exception if accepting fails, or if setting up the new connection
     *         fails (in which case the new connection is closed first)
     */
    private void accept(SelectionKey key) throws Exception {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            System.out.println("开始注册读时间");
            socketChannel.register(this.selector,SelectionKey.OP_READ);
            this.sayAcceptOk(socketChannel);
        } catch (Exception e) {
            // Do not leak the half-initialised client connection.
            closeQuietly(socketChannel);
            throw e;
        }
    }

    /**
     * Sends {@link #RES_CONNECTION_OK} to a freshly accepted client.
     *
     * @param socketChannel the client channel to write to
     * @throws Exception if the write fails
     */
    private void sayAcceptOk(SocketChannel socketChannel) throws Exception{
        ByteBuffer byteBuffer = ByteBuffer.wrap(AnswerServer.RES_CONNECTION_OK.getBytes());
        System.out.println("AnswerServer发送连接成功!");
        writeFully(socketChannel, byteBuffer);
    }

    /**
     * Reads one question id from the client and writes back a randomly chosen answer.
     *
     * <p>If the client has closed the connection the key is cancelled and the
     * channel is closed; if no bytes are available the method simply returns
     * and waits for the next readiness event.
     *
     * @param channel the client channel that is ready for reading
     * @param key     the selection key associated with {@code channel}
     * @throws Exception if reading or writing fails
     */
    private void read(SocketChannel channel,SelectionKey key) throws Exception {
        System.out.println(atomicLong.incrementAndGet()+",开始读数据 key="+key);
        // Read the question first.
        ByteBuffer buffer = ByteBuffer.allocate(50);
        // A single non-blocking read: looping while read() returns 0 would
        // busy-spin the only server thread.
        int bytesRead = channel.read(buffer);
        if (bytesRead == -1) {
            // End of stream: the peer closed the connection. Without closing the
            // channel here the key would stay readable and select() would return
            // immediately forever.
            System.out.println("没有数据了-----------------------");
            key.cancel();
            closeQuietly(channel);
            return;
        }
        if (buffer.position() == 0) {
            System.out.println("没有数据了-----------------------");
            return; // nothing was read, skip the rest of the processing
        }
        System.out.println("数据读完了-----------------------position="+buffer.position());
        // ByteBufferUtil is a project helper not shown here; presumably it decodes
        // the bytes written into the buffer so far — TODO confirm.
        String questionId = ByteBufferUtil.toString(buffer);
        System.out.println("========read questionId :" + questionId);
        String answer = answers[random.nextInt(answers.length)];
        String response = "questionId="+questionId+",answerId="+answer;
        ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
        writeFully(channel, responseBuffer);
    }

    /**
     * Writes the whole buffer to the channel.
     *
     * <p>A non-blocking write may transfer only part of the buffer, so the call
     * is repeated until nothing remains. The messages of this demo are a few
     * bytes long, so this is expected to finish on the first attempt.
     */
    private static void writeFully(SocketChannel channel, ByteBuffer data) throws Exception {
        while (data.hasRemaining()) {
            channel.write(data);
        }
    }

    /** Closes the channel, ignoring any failure: the connection is being discarded anyway. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (Exception ignored) {
            // Nothing useful can be done if closing a dead connection fails.
        }
    }

    public static void main(String[] args) throws Exception{
        new AnswerServer().start();
    }
}
AnswerNIOClient
/**
 * Non-blocking (NIO) client for the question/answer demo.
 *
 * <p>After connecting it alternates between sending an increasing question id
 * and reading whatever the server sends back, all on the thread that calls
 * {@link #start()}. The loop ends when the connection is closed.
 */
public class AnswerNIOClient implements MyRunnable {
    private Selector selector;
    SocketChannel socketChannel;
    /** Source of question ids; incremented once per request. */
    private AtomicLong atomicLong = new AtomicLong();

    /**
     * Creates the client and opens (but does not yet connect) its channel.
     *
     * @throws Exception if the selector or the channel cannot be opened
     */
    public AnswerNIOClient() throws Exception {
        this.init();
    }

    /**
     * Opens the selector and a non-blocking socket channel.
     *
     * @throws Exception if opening or configuring the channel fails
     */
    @Override
    public void init() throws Exception {
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
    }

    /**
     * Connects to the server and runs the select loop until the connection is closed.
     *
     * <p>If the connection cannot be initiated the process exits with status -1;
     * if the pending connection later fails it exits with status 1.
     *
     * @throws Exception if {@code select()} fails or closing the resources fails
     */
    @Override
    public void start() throws Exception {
        try {
            this.doConnect();
        }catch (Exception e){
            System.out.println("连接失败!");
            e.printStackTrace();
            System.exit(-1);
        }
        try {
            // Stop once the channel is closed; with no live channel select()
            // would block forever.
            while (socketChannel.isOpen()) {
                selector.select();
                Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        this.dispatch(key);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // The single connection is unusable after an I/O error;
                        // close it so the loop terminates instead of failing
                        // again on every iteration.
                        key.cancel();
                        socketChannel.close();
                    }
                }
            }
        } finally {
            socketChannel.close();
            selector.close();
        }
    }

    /** Routes one selected key to the connect, read or write handler. */
    private void dispatch(SelectionKey key) throws Exception {
        SocketChannel sc = (SocketChannel) key.channel();
        System.out.println("isTrue=" + (sc==socketChannel));
        System.out.println("socketChannel=" + socketChannel);
        if (!key.isValid()) {
            return;
        }
        if (key.isConnectable()) {
            this.completeConnect(sc);
        } else if (key.isReadable()) {
            this.doRead(sc, key);
        } else if (key.isWritable()) {
            this.doWrite(sc);
        }
    }

    /**
     * Finishes a pending non-blocking connect and sends the first question.
     *
     * <p>{@code finishConnect()} reports failure by throwing, in which case the
     * process exits with status 1. A {@code false} return only means the
     * connection is not complete yet, so the method waits for the next
     * {@code OP_CONNECT} event.
     */
    private void completeConnect(SocketChannel sc) throws Exception {
        boolean connected;
        try {
            connected = sc.finishConnect();
        } catch (Exception e) {
            // Connection failed: exit the process.
            System.out.println("连接失败!");
            e.printStackTrace();
            System.exit(1);
            return;
        }
        if (!connected) {
            return;
        }
        System.out.println("==========finishConnect==================================");
        sc.register(selector, SelectionKey.OP_READ);
        this.doWrite(sc);
    }

    /**
     * Starts connecting to the server.
     *
     * <p>Registers for {@code OP_READ} when the connection is established
     * immediately, otherwise for {@code OP_CONNECT} so the select loop can
     * finish the connect later.
     *
     * @throws Exception if the connection attempt cannot be initiated
     */
    private void doConnect() throws Exception {
        // Connect to the server by IP and port.
        boolean connected = this.socketChannel.connect(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
        System.out.println("doConnect="+connected);
        if(connected){
            // Register for readable events with the multiplexer.
            socketChannel.register(this.selector,SelectionKey.OP_READ);
        } else {
            // The connection is still in progress: register for the connect event.
            socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Reads and prints the server's response, then switches the channel to
     * {@code OP_WRITE} so the next question is sent.
     *
     * <p>If the server has closed the connection the channel is closed instead.
     *
     * @param socketChannel the channel that is ready for reading
     * @param key           the selection key associated with the channel
     * @throws Exception if reading fails
     */
    private void doRead(SocketChannel socketChannel,SelectionKey key) throws Exception {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int bytesRead = socketChannel.read(readBuffer);
        if (bytesRead == -1) {
            // End of stream: the server closed the connection.
            key.cancel();
            socketChannel.close();
            return;
        }
        if (bytesRead == 0) {
            // Nothing arrived; stay registered for OP_READ.
            return;
        }
        // ByteBufferUtil is a project helper not shown here; presumably it decodes
        // the bytes written into the buffer so far — TODO confirm.
        String content = ByteBufferUtil.toString(readBuffer);
        System.out.println("服务器响应! 问题及答案:"+content);
        socketChannel.register(this.selector,SelectionKey.OP_WRITE);
    }

    /**
     * Sends the next question id and switches the channel back to {@code OP_READ}.
     *
     * @param socketChannel the channel to write to
     * @throws Exception if writing fails or the pause in {@link #getQuestionId()} is interrupted
     */
    private void doWrite(SocketChannel socketChannel) throws Exception {
        Long questionId = getQuestionId();
        ByteBuffer byteBuffer = ByteBuffer.wrap(String.valueOf(questionId).getBytes());
        // A non-blocking write may be partial; repeat until the (tiny) payload is out.
        while (byteBuffer.hasRemaining()) {
            socketChannel.write(byteBuffer);
        }
        System.out.println("AnswerNIOClient 发送查询答案! questionId="+questionId);
        socketChannel.register(this.selector,SelectionKey.OP_READ);
    }

    /**
     * Returns the next question id after pausing for three seconds.
     *
     * <p>Note that the pause runs on the selector thread, so no other event is
     * processed while it sleeps.
     */
    private Long getQuestionId() throws Exception{
        TimeUnit.SECONDS.sleep(3);
        Long questionId = atomicLong.incrementAndGet();
        return questionId;
    }

    public static void main(String[] args) throws Exception{
        new AnswerNIOClient().start();
    }
}
AnswerBIOClient
/**
 * Blocking (BIO) client for the question/answer demo.
 *
 * <p>Uses two threads on one socket: a reader that waits for the server's
 * {@code AnswerServer.RES_CONNECTION_OK} handshake and then prints every
 * response, and a writer that sends an increasing question id every five
 * seconds once the handshake has been seen.
 */
public class AnswerBIOClient {
    Socket socket = new Socket();
    /** Source of question ids; incremented once per request. */
    private AtomicLong atomicLong = new AtomicLong();
    /**
     * Set by the reader thread once the handshake arrived, polled by the writer
     * thread; volatile so the writer is guaranteed to observe the update.
     */
    private volatile boolean isConnectionOk = false;

    /**
     * Creates the client and connects to the server.
     *
     * @throws IllegalStateException if the connection cannot be established
     *         (the original cause is preserved)
     */
    public AnswerBIOClient(){
        try{
            init();
        }catch (Exception e){
            // Fail fast: with an unconnected socket the reader thread would fail
            // immediately and the writer thread would wait forever.
            throw new IllegalStateException("AnswerBIOClient failed to connect", e);
        }
    }

    /**
     * Connects the socket to the configured server address.
     *
     * @throws Exception if the connection attempt fails
     */
    public void init() throws Exception{
        socket.connect(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
        if(socket.isConnected()) {
            System.out.println("AnswerBIOClient连接成功! 服务端端口="+socket.getPort()+" 客户端端口="+socket.getLocalPort());
        }
    }

    /**
     * Starts the reader and writer threads and returns immediately.
     *
     * @throws Exception declared for compatibility; thread failures are logged
     *         inside the respective thread
     */
    public void start() throws Exception{
        Runnable readRunnable = ()->{
            try {
                read();
            }catch (Exception e){
                System.out.println("readRunnable exception "+e.getMessage());
                e.printStackTrace();
            }
        };
        Runnable writeRunnable = ()->{
            try {
                write();
            }catch (Exception e){
                System.out.println("writeRunnable exception "+e.getMessage());
                e.printStackTrace();
            }
        };
        Thread readThread = new Thread(readRunnable,"readThread");
        Thread writeThread = new Thread(writeRunnable,"writeThread");
        readThread.start();
        writeThread.start();
    }

    /**
     * Line-based alternative to {@link #read()}; not called anywhere in this class.
     *
     * <p>Only works if the server terminates each message with {@code \n}.
     * Returns when the server closes the connection.
     */
    private void read1() throws Exception{
        // One reader for the whole connection: re-creating it per iteration
        // could lose bytes that the previous reader had already buffered.
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        System.out.println("等待读数据");
        String line = null;
        while((line=bufferedReader.readLine())!=null) {
            System.out.println("AnswerBIOClient read server data =" + line);
        }
    }

    /**
     * Reads server messages until the server closes the connection.
     *
     * <p>The first message is compared with the handshake constant; once the
     * handshake has been seen, every further message is printed as a response.
     * On end of stream the socket is closed, which also stops the writer loop.
     *
     * @throws Exception if reading from the socket fails
     */
    private void read() throws Exception{
        byte[] bytes = new byte[512];
        while(true) {
            System.out.println("等待读数据");
            int readLength = socket.getInputStream().read(bytes);
            System.out.println("readLength="+readLength);
            if (readLength == -1) {
                // End of stream: the server closed the connection.
                socket.close();
                return;
            }
            String result = new String(bytes,0,readLength);
            if(!isConnectionOk){
                isConnectionOk = result.equals(AnswerServer.RES_CONNECTION_OK);
                System.out.println("result="+result+" isConnectionOk="+isConnectionOk+" o:"+result.equals(AnswerServer.RES_CONNECTION_OK));
            } else {
                System.out.println("AnswerBIOClient read server data =" + result);
            }
        }
    }

    /**
     * Sends a new question id every five seconds once the handshake has been
     * received, until the socket is closed.
     *
     * @throws Exception if writing fails or the thread is interrupted while sleeping
     */
    private void write() throws Exception{
        while (!socket.isClosed()) {
            if (!isConnectionOk) {
                // Handshake not seen yet: back off briefly instead of spinning
                // at full CPU speed.
                TimeUnit.MILLISECONDS.sleep(50);
                continue;
            }
            TimeUnit.SECONDS.sleep(5);
            Long questionId = atomicLong.incrementAndGet();
            System.out.println("客户端开始发送 questionId=" + questionId);
            socket.getOutputStream().write(String.valueOf(questionId).getBytes());
            socket.getOutputStream().flush();
        }
    }

    public static void main(String[] args) throws Exception{
        new AnswerBIOClient().start();
    }
}
本文地址:https://blog.csdn.net/kq1983/article/details/107495341
上一篇: 鲲鹏解决方案 1.0vs1.5
下一篇: 2020.7.29(Map、快速排序)