欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

P2P

程序员文章站 2022-07-01 11:26:40
...

客户端

/**
 * 
 */
package com.netty.client;

import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollBar;
import javax.swing.JTextArea;
import javax.swing.JTextField;

import com.alibaba.fastjson.JSON;
import com.netty.server.MessageDataUtils;
import com.netty.server.MessageData;

/**
 * @author HUANGLIAO322
 * 
 */
public class ClientP2P implements ActionListener {
	
	 //是否停止  
    public static int STOP=0;  
	
    public JFrame frame;  
      
    public JTextArea info; 			//聊天信息  
    
    public JTextField fromText ; 	//发送方
    public JTextField toText ; 		//接受方
    
    public JTextField fromField;	
    public JTextField toField;
    
    public JTextField msgText; 	//发送消息  
      
    public JButton sendButton;  //发送按钮
    
    public Socket socket;		//
    
    public OutputStream outputStream;
    public PrintWriter printWriter;
    

	InputStream is  ;
	DataInputStream dataInputStream;
	
    
    public ClientP2P(Socket socket){
    	
    	this.socket = socket;
		try {
			outputStream = socket.getOutputStream();
			is = socket.getInputStream();
			dataInputStream = new DataInputStream(is);
		} catch (IOException e1) {
			e1.printStackTrace();
		}
    	
    	frame=new JFrame("P2P聊天");  
		frame.setSize(500, 400);
		
		info=new JTextArea(10,30);
        info.setLineWrap(true);		//**自动换行功能   
        info.setWrapStyleWord(true);
        info.setEditable(false);
		
        JScrollBar scroll=new JScrollBar();
        scroll.add(info); 
        
        JPanel infopanel=new JPanel();  
        infopanel.add(info,BorderLayout.WEST);
        fromField = new JTextField(10);
        fromField.setText("from");
        fromField.setEditable(false);
        fromText = new JTextField(30);
        toField = new JTextField(10);
        toField.setText("to");
        toField.setEditable(false);
        toText =  new JTextField(30);
        
        JPanel panelCenter = new JPanel();
        panelCenter.add(fromField,BorderLayout.NORTH);
        panelCenter.add(fromText, BorderLayout.NORTH);
        panelCenter.add(toField,BorderLayout.NORTH);
        panelCenter.add(toText,BorderLayout.NORTH);
        
        JPanel panel=new JPanel();
        
        msgText=new JTextField(30);

        sendButton=new JButton("发送");
        panel.add(msgText);
        panel.add(sendButton);
        frame.add(infopanel,BorderLayout.NORTH);
        frame.add(panelCenter, BorderLayout.CENTER);
        frame.add(panel,BorderLayout.SOUTH);
        frame.setVisible(true);
        
        sendButton.addActionListener(this);  
        
        new Thread(){
			@Override
			public void run() {

				while(true){
					System.out.println("11111");
					if(STOP==1){
						break;
					}
					try {
						System.out.println("22222");
						
						byte[] buffer = new byte[1024];
						int len = -1;
						int position = 0;
						while((len = dataInputStream.read(buffer))!=-1){
							position = position + len;
							if(position>=3){
								byte flag = buffer[0];
								byte module = buffer[1];
								byte size = buffer[2];
								String rs = new String(buffer);
								System.out.println("rs:"+rs);
								System.out.println(position);
								System.out.println(size);
								if(position-1==size+3){
									String result = new String(buffer,3,position);
									System.out.println("result:"+result);
									MessageData msgres = JSON.parseObject(result, MessageData.class);
									info.append(msgres.getFrom() + ":"+ msgres.getData());
				                    info.append("\n");
									break;
								}
							}
						}
						System.out.println("3333");
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
        }.start();
          
        frame.addWindowListener(new WindowAdapter(){
            public   void   windowClosing(WindowEvent e){
            	STOP = 1;
                System.exit(0);
            }
         });
    }
    
    
	public static void main(String[] args) throws Exception {
		
		String serverIP="127.0.0.1";  
        int port=8090;
		Socket socket = new Socket(serverIP,port);
		
		new ClientP2P(socket);
		
	}
	
	public void send(String msg){

		try {
			MessageData data = new MessageData();
			data.setFrom(this.fromText.getText());
			data.setTo(this.toText.getText());
			data.setData(msg);
			System.out.println("发送:"+JSON.toJSONString(data));
			this.outputStream.write(MessageDataUtils.encodeProtocol(JSON.toJSONString(data)));
			this.outputStream.flush();
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
	

	@Override
	public void actionPerformed(ActionEvent e) {
		  
        if(e.getSource()==this.sendButton)  
        {  
            try{  
                String msg=this.msgText.getText();  
                if(msg.length()>0)  
                {
//                    this.info.append("我说:"+msg);
                	this.info.append(this.fromText.getText() + ":"+ msg);
                    this.info.append("\n");
                    this.send(msg);
                    this.msgText.setText("");  
                }
            }  
            catch(Exception ee){}  
        }  
	}

}

 

 

服务器

/**
 * 
 */
package com.netty.server;

/**
 * @author HUANGLIAO322
 */
public class MessageData {
	
	private String from;
	
	private String to;
	
	//0失败回执,1成功回执,2登录,3消息
	private int cmd;
	
	private String data;
	
	public String getFrom() {
		return from;
	}
	public void setFrom(String from) {
		this.from = from;
	}
	public String getTo() {
		return to;
	}
	public void setTo(String to) {
		this.to = to;
	}
	public int getCmd() {
		return cmd;
	}
	public void setCmd(int cmd) {
		this.cmd = cmd;
	}
	public String getData() {
		return data;
	}
	public void setData(String data) {
		this.data = data;
	}

}
/**
 * 
 */
package com.netty.server;

/**
 * @author HUANGLIAO322
 *
 */
public class MessageDataUtils  {
	
	private static final byte FLAG = 'S';
	private static final byte MODULE = '1';
	public static final int HEADER_LEN = 3;
	
	/***
	 * 编码协议
	 * @param data
	 * @return
	 */
	public static byte[] encodeProtocol(String data) {
		
		byte[] bs = data.getBytes();
		byte lenght = (byte)data.getBytes().length;
		byte[] buffer = new byte[HEADER_LEN + lenght];
		buffer[0] = FLAG;
		buffer[1] = MODULE;
		buffer[2] = lenght;
		for(int i=3,j=0; i<buffer.length;i++,j++){
			buffer[i] = bs[j];
		}
		return buffer;
		
	}
	

	/***
	 * 解码协议
	 * @param buf
	 * @return
	 */
	public static String decodeProtocol(byte[] buffer) {
		
		byte[] buf = buffer;
		if(buf.length>=HEADER_LEN){
			if(buf[0]==FLAG){
				if(buf[1]==MODULE){
					int lenght = buf[2];
					if(buf.length==HEADER_LEN+lenght){
						byte[] msg = new byte[lenght];
						for(int i=0;i<lenght;i++){
							msg[i] = buf[3+i];
						}
						return new String(msg);
					}
				}
			}
		}
		return null;
		
	}

}
/**
 * 
 */
package com.netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import com.alibaba.fastjson.JSON;

/**
 * @author HUANGLIAO322
 *
 */
/**
 * 解码请求消息
 */
public class RequestDecoder extends ByteToMessageDecoder{
	
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
 
		System.out.println("ZukServerRequestDecoder.decode.解码请求消息");
		
		while(true){

			byte[] dst = new byte[buffer.readableBytes()];
			buffer.readBytes(dst);
			
			if(dst.length >= MessageDataUtils.HEADER_LEN){
				String message = MessageDataUtils.decodeProtocol(dst);
				if(message!=null){
//					System.out.println("message:"+message);
					MessageData msg = JSON.parseObject(message, MessageData.class);
					out.add(msg);
				}
			}
			else{
				break;
			}
		}
		
		//数据不完整,等待完整的数据包
		return ;
	
	}

}
/**
 * 
 */
package com.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author HUANGLIAO322
 *
 */
public class Server {


	public static void main(String[] args) throws Exception {
		
		ServerBootstrap server = new ServerBootstrap();
		
		EventLoopGroup parentGroup = new NioEventLoopGroup();
		EventLoopGroup childGroup = new NioEventLoopGroup();
		server.group(parentGroup, childGroup);
		
		server.channel(NioServerSocketChannel.class);
		
		server.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				//接受消息解码
				ch.pipeline().addLast(new RequestDecoder());
				//消息处理
				ch.pipeline().addLast(new ZukServerHandler());
				//返回消息编码
				ch.pipeline().addLast(new ZukServerResponseEncoder());
				
			}
		});
	
		server.option(ChannelOption.SO_BACKLOG, 2048);// 链接缓冲池队列大小
		server.bind(8090).sync();
		
	}

	
}
/**
 * 
 */
package com.netty.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

/**
 * @author HUANGLIAO322
 *
 */
public class ZukServerHandler  extends SimpleChannelInboundHandler<MessageData> {
	 
	private Logger logger = LoggerFactory.getLogger(ZukServerHandler.class);
	
	public static Map<String,String> map = new ConcurrentHashMap<String, String>();
	public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	
	public static Map<String, Channel> mapChannels = new ConcurrentHashMap<String, Channel>();
	
	@Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
		
		Channel incoming = ctx.channel();
		// Broadcast a message to multiple Channels
		// channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
		String address = incoming.remoteAddress().toString();
		System.out.println("handlerAdded:"+address);
		map.put(address, "");
		channels.add(incoming);
    }
	
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
		// Channel incoming = ctx.channel();
		
		// Broadcast a message to multiple Channels
		// channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");

		// A closed Channel is automatically removed from ChannelGroup,
		// so there is no need to do "channels.remove(ctx.channel());"
    }
	
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, MessageData msg)
			throws Exception {
		
		System.out.println("msg--:"+JSON.toJSONString(msg));
		
		final Channel channel = ctx.channel();
//		channel.writeAndFlush(msg);
//		IoSession session = ChannelUtils.getSessionBy(channel);
		
		String from = msg.getFrom();
		String to = msg.getTo();
		
		if(mapChannels.get(from)==null){
			System.out.println("from "+from + " is not on line.");
			mapChannels.put(from, channel);
		}
		if(mapChannels.get(to)==null){
			Channel fromChannel = mapChannels.get(from);
			MessageData res = new MessageData();
			res.setFrom("server");
			res.setTo(from);
			res.setData(to+" is not online");
			fromChannel.writeAndFlush(res);
		}
		else{
			
			//向接收端发送消息
			mapChannels.get(to).writeAndFlush(msg);
			
			//带消息回执
			Channel fromChannel = mapChannels.get(from);
			MessageData res = new MessageData();
			res.setFrom("server");
			res.setTo(from);
			res.setData(to+" yes");
			fromChannel.writeAndFlush(res);
		}
//		
//		for (Map.Entry<String, Channel>  entry: mapChannels.entrySet()) {
//			System.out.println("user:" + entry.getKey());
//			entry.getValue().writeAndFlush("hello.world.");
//		}
		
	}
	
	
	 
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("channelInactive断开连接");
//		Session session = new SessionImpl(ctx.channel());
//		Object object = session.getAttachment();
//		if(object != null){
//			Player player = (Player)object;
//			SessionManager.removeSession(player.getPlayerId());
//		}
	}
 

}
/**
 * 
 */
package com.netty.server;

import javax.jws.WebParam.Mode;

import com.alibaba.fastjson.JSON;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * @author HUANGLIAO322
 *
 */
/**
 * 
 * +---------------------------------------------------------------------------
 * | 包头(int)4 | 模块(int)4  | 命令(int)4  |  结果码 (int)4   | 数据长度(int)4  | ......
 * +---------------------------------------------------------------------------
 * 
 * 编码返回
 */
public class ZukServerResponseEncoder extends MessageToByteEncoder<MessageData>{
 
	@Override
	protected void encode(ChannelHandlerContext ctx, MessageData response, ByteBuf buffer)
			throws Exception {
		
		System.out.println("返回"+JSON.toJSONString(response));
		byte[] bs = MessageDataUtils.encodeProtocol(JSON.toJSONString(response));
		buffer.writeBytes(bs);
		
		
//		System.out.println("返回请求:" + "module:" +response.getModule() +" cmd:" + response.getCmd() + " code:" + response.getCode());
//		
//		//包头
//		buffer.writeInt(123456789);
//		//module
//		buffer.writeInt(response.getModule());
//		//cmd
//		buffer.writeInt(response.getCmd());
//		//结果码
//		buffer.writeInt(200);
//		//长度
//		int lenth = response.getData()==null? 0 : response.getData().length;
//		if(lenth <= 0){
//			buffer.writeInt(lenth);
//		}else{
//			buffer.writeInt(lenth);
//			buffer.writeBytes(response.getData());
//		}
//	
		
	} 
	

}