socket通信例子
程序员文章站
2022-04-24 13:28:29
...
使用 NIO(同步非阻塞)方式实现的 socket 通信例子。
服务器端:
package com.kangzye.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import com.sun.corba.se.spi.ior.Writeable;
import io.netty.channel.kqueue.AcceptFilter;
/**
 * Minimal single-threaded NIO server.
 *
 * <p>It listens on a fixed TCP port, multiplexes all connections over one
 * {@link Selector}, and answers every received chunk with
 * {@code "<timestamp>::<received text>"}.
 */
public class NioTestServer {

    /** TCP port the server listens on. */
    private static final int PORT = 3300;

    /** Capacity, in bytes, of the buffer used for one read. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** How long a single {@code select} call may block, in milliseconds. */
    private static final long SELECT_TIMEOUT_MILLIS = 3000;

    /**
     * Starts the server and blocks forever.
     *
     * @param args ignored
     * @throws IOException if the server socket or selector cannot be set up, or accepting fails
     * @throws InterruptedException if the selector loop is interrupted while pausing
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        new NioTestServer().start();
    }

    /** Creates a server; nothing is opened until {@link #start()} is called. */
    public NioTestServer() { }

    /**
     * Binds the listening socket and runs the selector loop.
     *
     * <p>This method only returns by throwing; the selector and the server
     * channel are closed in that case.
     *
     * @throws IOException if binding, selecting or accepting fails
     * @throws InterruptedException if the thread is interrupted during the idle pause
     */
    public void start() throws IOException, InterruptedException {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // Pitfall: if the socket were bound to "localhost" only, a client connecting
            // through the machine's real LAN address would be refused; it would have to
            // use 127.0.0.1 (and run on the same machine). Binding to the wildcard
            // address avoids that.
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            // Watch for "connection ready to be accepted" events.
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // Poll the selector forever and dispatch whatever became ready.
            while (true) {
                int c = selector.select(SELECT_TIMEOUT_MILLIS);
                if (c <= 0) {
                    System.out.println("c is :" + c);
                    Thread.sleep(100);
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // The key must be removed from the selected set, otherwise the
                    // stale event is still there on the next select().
                    iterator.remove();
                    if (!key.isValid()) {
                        // Cancelled keys throw on the readiness checks below.
                        continue;
                    }
                    if (key.isAcceptable()) {
                        accept(key, selector);
                    } else if (key.isWritable()) {
                        System.out.println("--writeable");
                    } else if (key.isReadable()) {
                        read((SocketChannel) key.channel());
                    } else if (key.isConnectable()) {
                        System.out.println("--connectable");
                    }
                }
            }
        }
    }

    /**
     * Accepts a pending connection and registers it for read events.
     *
     * @param selectionKey key whose channel is the listening {@link ServerSocketChannel}
     * @param sel selector the new connection is registered with
     * @throws IOException if accepting or configuring the connection fails
     */
    private void accept(SelectionKey selectionKey, Selector sel) throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socket = channel.accept();
        if (socket == null) {
            // A non-blocking accept() returns null when no connection is pending.
            return;
        }
        socket.configureBlocking(false);
        // Register the new socket with the selector for readable events only.
        // Pitfall: OP_WRITE means "ready for writing". If it were registered, every
        // select() would report the key for as long as the connection is writable,
        // which is almost always, so the loop would spin for nothing.
        socket.register(sel, SelectionKey.OP_READ, "a");
    }

    /**
     * Reads one chunk from the channel, prints it, and sends the feedback message.
     *
     * <p>When the peer has closed the connection (read returns -1) or the
     * read/write fails, the channel is closed so that its key stops being
     * reported as readable. A read of zero bytes is ignored.
     *
     * @param socketChannel connected channel to read from
     */
    void read(SocketChannel socketChannel) {
        ByteBuffer dst = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int c;
        try {
            c = socketChannel.read(dst);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
            return;
        }
        if (c < 0) {
            // End of stream: the peer is gone, so release the channel and its key.
            closeQuietly(socketChannel);
            return;
        }
        if (c == 0) {
            return;
        }
        // flip() sets limit to the current position and position to 0, switching the
        // buffer from "being filled" to "being drained". get() reads at position and
        // advances it, so a buffer must be flipped after writing into it and
        // clear()ed (position = 0, limit = capacity) before writing into it again.
        dst.flip();
        System.out.printf("c:%s,postion:%s,limit:%s", c, dst.position(), dst.limit());
        String w = new String(dst.array(), dst.position(), dst.remaining());
        System.out.println(w);
        try {
            feedback(socketChannel, w);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /**
     * Sends {@code "<current time>::<w>"} back to the peer.
     *
     * @param socketchannel channel to write to
     * @param w text that was received
     * @throws IOException if writing fails
     */
    void feedback(SocketChannel socketchannel, String w) throws IOException {
        String d = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
        ByteBuffer out = ByteBuffer.wrap((d + "::" + w).getBytes());
        // A non-blocking write may accept only part of the buffer; keep going until
        // everything is handed over. The messages are tiny, so this does not spin
        // in practice.
        while (out.hasRemaining()) {
            socketchannel.write(out);
        }
    }

    /** Closes the channel, reporting but not propagating a failure to do so. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客户端:
package com.kangzye.nio;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Console client for the NIO demo server.
 *
 * <p>One worker thread polls a {@link Selector} and prints whatever the server
 * sends; a second worker thread reads chunks from standard input and writes
 * them to the server.
 */
public class TalkClient {

    /** Server address used by the no-argument constructor. */
    private static final String DEFAULT_HOST = "172.28.80.234";

    /** Server port used by the no-argument constructor. */
    private static final int DEFAULT_PORT = 3300;

    private final SocketChannel socketChannel;
    private final Selector sel;
    /** Wrapper around standard input; created on the first {@link #getInput()} call. */
    private BufferedInputStream bufferedInputStream;

    /**
     * Connects to the default server and starts both worker threads.
     *
     * <p>The workers are ordinary (non-daemon) threads started from the main
     * thread, so the JVM stays alive for as long as either of them runs.
     *
     * @param args ignored
     * @throws ClosedChannelException if the channel is closed before it can be registered
     * @throws InterruptedException never thrown; kept for signature compatibility
     */
    public static void main(String[] args) throws ClosedChannelException, InterruptedException {
        new TalkClient().start();
    }

    /**
     * Connects to the default server address.
     *
     * @throws java.io.UncheckedIOException if the connection cannot be established
     */
    public TalkClient() {
        this(DEFAULT_HOST, DEFAULT_PORT);
    }

    /**
     * Opens a selector and connects (in blocking mode) to the given server,
     * then switches the channel to non-blocking mode.
     *
     * @param host server host name or address
     * @param port server TCP port
     * @throws java.io.UncheckedIOException if the selector cannot be opened or the
     *         connection cannot be established; anything opened so far is closed
     */
    public TalkClient(String host, int port) {
        Selector selector = null;
        SocketChannel channel = null;
        try {
            selector = Selector.open();
            channel = SocketChannel.open(new InetSocketAddress(host, port));
            channel.configureBlocking(false);
        } catch (IOException e) {
            closeQuietly(channel);
            closeQuietly(selector);
            // Fail here instead of leaving a half-built client that would only
            // break later with a NullPointerException in start().
            throw new java.io.UncheckedIOException("cannot connect to " + host + ":" + port, e);
        }
        this.sel = selector;
        this.socketChannel = channel;
    }

    /**
     * Registers the channel for read events and starts the reader and writer threads.
     *
     * @throws ClosedChannelException if the channel has already been closed
     */
    public void start() throws ClosedChannelException {
        // The channel is already connected, so only OP_READ is of interest.
        // OP_WRITE is deliberately not registered: writes are issued directly.
        this.socketChannel.register(this.sel, SelectionKey.OP_READ);
        prepareRead();
        toWrite(this.socketChannel);
    }

    /**
     * Starts the thread that polls the selector and prints incoming data.
     * The thread closes the channel and the selector when it stops.
     */
    private void prepareRead() {
        new Thread() {
            @Override
            public void run() {
                try {
                    while (pollOnce()) {
                        // keep polling until the connection ends
                    }
                } finally {
                    closeQuietly(socketChannel);
                    closeQuietly(sel);
                }
            }
        }.start();
    }

    /**
     * Waits, runs one select round and handles every selected key.
     *
     * @return {@code false} when polling should stop (interrupted, end of
     *         stream, or read failure), {@code true} otherwise
     */
    private boolean pollOnce() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            int select = sel.select(2000);
            System.out.print("select:" + select);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Set<SelectionKey> selectedKeys = sel.selectedKeys();
        System.out.println("size:" + selectedKeys.size());
        for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();) {
            SelectionKey key = iterator.next();
            // Handled keys must be removed from the selected set.
            iterator.remove();
            System.out.println("---------------------------");
            System.out.println("Accesptable:" + key.isAcceptable());
            System.out.println("Connectable:" + key.isConnectable());
            System.out.println("Readable:" + key.isReadable());
            System.out.println("valid:" + key.isValid());
            System.out.println("writable:" + key.isWritable());
            System.out.println("---------------------------");
            if (key.isReadable()) {
                if (!printIncoming((SocketChannel) key.channel())) {
                    return false;
                }
            } else {
                System.out.println("not readable : " + key.readyOps());
            }
        }
        return true;
    }

    /**
     * Reads one chunk from the channel and prints exactly the bytes received.
     *
     * @param channel channel reported as readable
     * @return {@code false} if the server closed the connection or reading failed
     */
    private boolean printIncoming(SocketChannel channel) {
        ByteBuffer dst = ByteBuffer.allocate(2000);
        try {
            int count = channel.read(dst);
            if (count < 0) {
                // End of stream: the key would stay readable forever otherwise.
                return false;
            }
            // Only the first `count` bytes hold data; the rest of the array is padding.
            System.out.println(new String(dst.array(), 0, count));
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Starts the thread that forwards standard input to the server.
     * The thread stops when input ends, a write fails, or it is interrupted.
     *
     * @param socketChannel connected channel to write to
     */
    void toWrite(SocketChannel socketChannel) {
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("Input::");
                    ByteBuffer bb = getInput();
                    if (bb == null) {
                        // Standard input ended or failed; nothing more to send.
                        return;
                    }
                    System.out.println(new String(bb.array(), bb.position(), bb.remaining()));
                    try {
                        // A non-blocking write may take only part of the buffer.
                        while (bb.hasRemaining()) {
                            socketChannel.write(bb);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        return;
                    }
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }.start();
    }

    /**
     * Reads up to 200 bytes from standard input.
     *
     * @return a buffer limited to the bytes actually read, or {@code null} when
     *         standard input is at end of stream or reading failed
     */
    ByteBuffer getInput() {
        try {
            if (this.bufferedInputStream == null) {
                // One wrapper for the client's lifetime: a fresh wrapper per call
                // would throw away input buffered by the previous one.
                this.bufferedInputStream = new BufferedInputStream(System.in);
            }
            byte[] bs = new byte[200];
            int len = bufferedInputStream.read(bs);
            if (len < 0) {
                return null;
            }
            // Position and limit must be given, otherwise the unused tail of the
            // array would be sent as data.
            return ByteBuffer.wrap(bs, 0, len);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /** Closes the resource if it is non-null, reporting but not propagating failures. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
上一篇: 11. PHP接入微信企业付款功能
下一篇: json字符串传到前台input的方法