欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员的计算机编程知识。
您现在的位置是: 首页

socket通信例子

程序员文章站 2022-04-24 13:28:29
...

 下面是一个基于 NIO(同步非阻塞方式)的 socket 通信例子。

服务器端:

package com.kangzye.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

import com.sun.corba.se.spi.ior.Writeable;

import io.netty.channel.kqueue.AcceptFilter;

/**
 * Minimal single-threaded NIO (selector-based, non-blocking) echo server.
 *
 * <p>Listens on TCP port 3300. Every chunk of bytes read from a client is printed to
 * standard output and sent back to that client as {@code <timestamp>::<payload>}, where the
 * timestamp uses the pattern {@code yyyy-MM-dd HH:mm:ss.SSS}. Bytes and text are converted
 * with the platform default charset.
 */
public class NioTestServer {
	public static void main(String[] args) throws IOException, InterruptedException {
		new NioTestServer().start();
	}

	public NioTestServer() { }

	/**
	 * Runs the accept/read event loop. This method only leaves the loop by throwing.
	 *
	 * @throws IOException if the server channel or selector cannot be opened, bound or
	 *         polled, or if a freshly accepted connection cannot be registered
	 * @throws InterruptedException declared for source compatibility with existing callers;
	 *         the loop itself no longer sleeps
	 */
	public void start() throws IOException, InterruptedException {
		// try-with-resources releases the listening port and the selector if the loop fails.
		try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
				Selector selector = Selector.open()) {
			// Pitfall: if the server is bound to "localhost" instead, a client that connects
			// through the machine's real LAN address is refused and has to use 127.0.0.1
			// (assuming client and server run on the same machine). Binding with only a
			// port uses the wildcard address.
			serverSocketChannel.bind(new InetSocketAddress(3300));
			serverSocketChannel.configureBlocking(false);
			// Ask the selector to report incoming connections.
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

			// Poll the selector forever and dispatch whatever became ready.
			while (true) {
				int c = selector.select(3000);
				if (c <= 0) {
					// Timed out (or was woken up) with nothing ready. select() already
					// waited, so no additional sleep is needed before polling again.
					System.out.println("c is :" + c);
					continue;
				}
				Set<SelectionKey> selectedKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectedKeys.iterator();
				while (iterator.hasNext()) {
					SelectionKey key = iterator.next();
					// Selected keys must be removed by hand; otherwise the stale key is
					// still present in the set after the next select().
					iterator.remove();
					if (!key.isValid()) {
						// The channel was closed in the meantime; querying ready ops on
						// a cancelled key would throw CancelledKeyException.
						continue;
					}
					if (key.isAcceptable()) {
						accept(key, selector);
					} else if (key.isWritable()) {
						System.out.println("--writeable");
					} else if (key.isReadable()) {
						read((SocketChannel) key.channel());
					} else if (key.isConnectable()) {
						System.out.println("--connectable");
					}
				}
			}
		}
	}

	/**
	 * Accepts a pending connection and registers it with the selector for read events.
	 *
	 * @param selectionKey the ready key of the listening channel
	 * @param sel the selector the new connection is registered with
	 * @throws IOException if accepting or registering the connection fails
	 */
	private void accept(SelectionKey selectionKey, Selector sel) throws IOException {
		ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
		SocketChannel socket = channel.accept();
		if (socket == null) {
			// A non-blocking accept() returns null when no connection is pending.
			return;
		}
		try {
			socket.configureBlocking(false);
			// Only OP_READ is registered. Pitfall: OP_WRITE means "ready for writing", which
			// is true almost all the time for a connected channel, so registering it would
			// make every select() return that key for no benefit.
			socket.register(sel, SelectionKey.OP_READ, "a");
		} catch (IOException e) {
			closeQuietly(socket);
			throw e;
		}
	}

	/**
	 * Reads one chunk (at most 1024 bytes) from the client, prints it and echoes it back
	 * through {@link #feedback}. The channel is closed when the peer has disconnected or
	 * when reading or replying fails, so the selector stops reporting it.
	 *
	 * @param socketChannel the client connection that was reported readable
	 */
	void read(SocketChannel socketChannel) {
		ByteBuffer dst = ByteBuffer.allocate(1024);
		int c;
		try {
			c = socketChannel.read(dst);
		} catch (IOException e) {
			e.printStackTrace();
			closeQuietly(socketChannel);
			return;
		}
		if (c < 0) {
			// End of stream: the client closed the connection. Without closing our side
			// the key would be reported readable on every select() from now on.
			closeQuietly(socketChannel);
			return;
		}
		// flip() sets limit = position and position = 0, switching the buffer from being
		// filled to being drained. (Before filling it again, clear() would reset
		// position = 0 and limit = capacity.)
		dst.flip();
		System.out.printf("c:%s,postion:%s,limit:%s", c, dst.position(), dst.limit());
		String w = decode(dst);
		System.out.println(w);
		try {
			feedback(socketChannel, w);
		} catch (IOException e) {
			e.printStackTrace();
			closeQuietly(socketChannel);
		}
	}

	/**
	 * Sends {@code <timestamp>::<w>} back to the client.
	 *
	 * @param socketchannel the client connection to reply on
	 * @param w the text received from the client
	 * @throws IOException if writing to the channel fails
	 */
	void feedback(SocketChannel socketchannel, String w) throws IOException {
		String d = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
		// NOTE(review): a non-blocking write may accept only part of the buffer, in which
		// case the rest of the reply is dropped. Handling that without stalling the event
		// loop needs a pending-write queue plus OP_WRITE, which this demo does not have.
		socketchannel.write(ByteBuffer.wrap((d + "::" + w).getBytes()));
	}

	/**
	 * Decodes the bytes between the buffer's position and limit using the platform default
	 * charset. The buffer's position and limit are left unchanged.
	 *
	 * @param buffer an array-backed buffer that is ready for reading
	 * @return the decoded text; empty if the buffer has no remaining bytes
	 */
	static String decode(ByteBuffer buffer) {
		// The String constructor takes (offset, length), not (position, limit).
		return new String(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
	}

	/** Closes the channel; a failure to close is only logged. */
	private static void closeQuietly(SocketChannel channel) {
		try {
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

 

客户端:

package com.kangzye.nio;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Console client for the NIO echo demo.
 *
 * <p>Connects to the server, then runs two threads: one polls a selector and prints
 * whatever the server sends, the other reads chunks from standard input and writes them to
 * the server. Received bytes are printed using the platform default charset.
 */
public class TalkClient {
	private static final String DEFAULT_HOST = "172.28.80.234";
	private static final int DEFAULT_PORT = 3300;

	private final SocketChannel socketChannel;
	private final Selector sel;
	// Created once: a fresh BufferedInputStream per read would throw away any input the
	// previous wrapper had already pulled from System.in into its internal buffer.
	private final BufferedInputStream bufferedInputStream = new BufferedInputStream(System.in);

	public static void main(String[] args) throws ClosedChannelException, InterruptedException {
		// The reader and writer threads are non-daemon, so they keep the JVM alive on
		// their own; the main thread does not need to sleep.
		new TalkClient().start();
	}

	/**
	 * Connects to the default server endpoint.
	 *
	 * @throws java.io.UncheckedIOException if the selector cannot be opened or the
	 *         connection cannot be established
	 */
	public TalkClient() {
		this(DEFAULT_HOST, DEFAULT_PORT);
	}

	/**
	 * Connects to the given server endpoint.
	 *
	 * @param host server host name or IP address
	 * @param port server TCP port
	 * @throws java.io.UncheckedIOException if the selector cannot be opened or the
	 *         connection cannot be established
	 */
	public TalkClient(String host, int port) {
		Selector selector = null;
		SocketChannel channel = null;
		try {
			selector = Selector.open();
			// SocketChannel.open(address) connects in blocking mode, so the channel is
			// already connected when it is switched to non-blocking below.
			channel = SocketChannel.open(new InetSocketAddress(host, port));
			channel.configureBlocking(false);
		} catch (IOException e) {
			closeQuietly(channel);
			closeQuietly(selector);
			// Fail here with the real cause instead of leaving null fields behind.
			throw new java.io.UncheckedIOException("cannot connect to " + host + ":" + port, e);
		}
		this.sel = selector;
		this.socketChannel = channel;
	}

	/**
	 * Registers the channel for read events and starts the reader and writer threads.
	 *
	 * @throws ClosedChannelException if the channel has already been closed
	 */
	public void start() throws ClosedChannelException {
		// Only OP_READ is needed: the channel is already connected, so OP_CONNECT never
		// fires, and registering again with the same selector replaces the interest set
		// anyway. Write readiness does not need to be monitored either.
		this.socketChannel.register(this.sel, SelectionKey.OP_READ);
		prepareRead();
		toWrite(this.socketChannel);
	}

	/** Starts the thread that polls the selector and prints data sent by the server. */
	private void prepareRead() {
		new Thread(this::readLoop, "talk-client-reader").start();
	}

	/** Polls the selector until the connection ends or polling fails. */
	private void readLoop() {
		while (true) {
			int select;
			try {
				// select(2000) blocks for up to two seconds, so no extra sleep is needed.
				select = sel.select(2000);
				System.out.print("select:" + select);
			} catch (IOException e) {
				e.printStackTrace();
				return;
			}
			Set<SelectionKey> selectedKeys = sel.selectedKeys();
			System.out.println("size:" + selectedKeys.size());

			for (Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();) {
				SelectionKey key = iterator.next();
				// Selected keys have to be removed by hand once they are handled.
				iterator.remove();
				if (!key.isValid()) {
					// Ready-op queries on a cancelled key throw CancelledKeyException.
					continue;
				}
				System.out.println("---------------------------");
				System.out.println("Accesptable:" + key.isAcceptable());
				System.out.println("Connectable:" + key.isConnectable());
				System.out.println("Readable:" + key.isReadable());
				System.out.println("valid:" + key.isValid());
				System.out.println("writable:" + key.isWritable());
				System.out.println("---------------------------");
				if (key.isReadable()) {
					if (!printReply((SocketChannel) key.channel())) {
						closeQuietly(socketChannel);
						closeQuietly(sel);
						return;
					}
				} else {
					System.out.println("not readable : " + key.readyOps());
				}
			}
		}
	}

	/**
	 * Reads one chunk (at most 2000 bytes) from the server and prints it.
	 *
	 * @return {@code false} if the server closed the connection or reading failed
	 */
	private boolean printReply(SocketChannel channel) {
		ByteBuffer dst = ByteBuffer.allocate(2000);
		try {
			int count = channel.read(dst);
			if (count < 0) {
				System.out.println("connection closed by server");
				return false;
			}
			// Decode only the bytes actually received, not the whole zero-filled array.
			System.out.println(new String(dst.array(), 0, count));
			return true;
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * Starts the thread that forwards standard input to the server.
	 *
	 * @param socketChannel the connected channel to write to
	 */
	void toWrite(SocketChannel socketChannel) {
		new Thread(() -> writeLoop(socketChannel), "talk-client-writer").start();
	}

	/** Prompts for input and sends it until input ends, writing fails or the thread is interrupted. */
	private void writeLoop(SocketChannel channel) {
		while (true) {
			System.out.println("Input::");
			ByteBuffer bb = getInput();
			if (bb == null) {
				// Standard input is exhausted or unreadable; nothing more to send.
				return;
			}
			System.out.println(new String(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()));
			try {
				// A non-blocking write may take only part of the buffer. Messages are at
				// most 200 bytes, so retrying on this dedicated thread is acceptable.
				while (bb.hasRemaining()) {
					channel.write(bb);
				}
			} catch (IOException e) {
				e.printStackTrace();
				return;
			}
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	/**
	 * Reads the next chunk (at most 200 bytes) from standard input.
	 *
	 * @return a buffer containing exactly the bytes read, or {@code null} at end of input
	 *         or if reading failed
	 */
	ByteBuffer getInput() {
		return readChunk(this.bufferedInputStream);
	}

	/**
	 * Reads at most 200 bytes from {@code in} with a single {@code read} call.
	 *
	 * @param in the stream to read from
	 * @return a buffer whose position and limit delimit exactly the bytes read (so unused
	 *         array slots are not treated as data), or {@code null} at end of stream or if
	 *         reading failed
	 */
	static ByteBuffer readChunk(java.io.InputStream in) {
		try {
			byte[] bs = new byte[200];
			int len = in.read(bs);
			if (len < 0) {
				// End of stream: wrap(bs, 0, -1) would throw IndexOutOfBoundsException.
				return null;
			}
			return ByteBuffer.wrap(bs, 0, len);
		} catch (IOException e) {
			e.printStackTrace();
			return null;
		}
	}

	/** Closes the resource if it is non-null; a failure to close is only logged. */
	private static void closeQuietly(java.io.Closeable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

 

 

相关标签: socket nio