P2P
程序员文章站
2022-07-01 11:26:40
...
客户端
/**
*
*/
package com.netty.client;
import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollBar;
import javax.swing.JTextArea;
import javax.swing.JTextField;
import com.alibaba.fastjson.JSON;
import com.netty.server.MessageDataUtils;
import com.netty.server.MessageData;
/**
* @author HUANGLIAO322
*
*/
/**
 * Minimal Swing chat client that talks to the server over a plain blocking socket.
 *
 * <p>Wire format (same as {@code MessageDataUtils.encodeProtocol}): a 3-byte header
 * (flag, module, payload length as one unsigned byte) followed by a JSON payload
 * that deserializes to {@link MessageData}.
 *
 * <p>A background thread reads frames from the socket and appends them to the chat
 * area; the "send" button encodes the text field content and writes it to the socket.
 */
public class ClientP2P implements ActionListener {
    /**
     * Stop flag for the receiver thread: 0 = keep running, 1 = stop.
     * Volatile because it is written by the UI thread and read by the receiver thread.
     */
    public static volatile int STOP=0;
    public JFrame frame;
    public JTextArea info; // chat transcript
    public JTextField fromText ; // sender name (editable)
    public JTextField toText ; // receiver name (editable)
    public JTextField fromField; // read-only "from" label
    public JTextField toField; // read-only "to" label
    public JTextField msgText; // message to send
    public JButton sendButton; // send button
    public Socket socket; // connection to the server
    public OutputStream outputStream;
    public PrintWriter printWriter;
    InputStream is ;
    DataInputStream dataInputStream;

    /**
     * Builds the UI, wires the listeners and starts the receiver thread.
     *
     * @param socket an already connected socket to the chat server
     */
    public ClientP2P(Socket socket){
        this.socket = socket;
        try {
            outputStream = socket.getOutputStream();
            is = socket.getInputStream();
            dataInputStream = new DataInputStream(is);
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        frame=new JFrame("P2P聊天");
        frame.setSize(500, 400);
        info=new JTextArea(10,30);
        info.setLineWrap(true); // wrap long lines
        info.setWrapStyleWord(true);
        info.setEditable(false);
        JPanel infopanel=new JPanel();
        // A JScrollPane (not a bare JScrollBar) is what actually makes the transcript scrollable.
        infopanel.add(new javax.swing.JScrollPane(info),BorderLayout.WEST);
        fromField = new JTextField(10);
        fromField.setText("from");
        fromField.setEditable(false);
        fromText = new JTextField(30);
        toField = new JTextField(10);
        toField.setText("to");
        toField.setEditable(false);
        toText = new JTextField(30);
        JPanel panelCenter = new JPanel();
        panelCenter.add(fromField,BorderLayout.NORTH);
        panelCenter.add(fromText, BorderLayout.NORTH);
        panelCenter.add(toField,BorderLayout.NORTH);
        panelCenter.add(toText,BorderLayout.NORTH);
        JPanel panel=new JPanel();
        msgText=new JTextField(30);
        sendButton=new JButton("发送");
        panel.add(msgText);
        panel.add(sendButton);
        frame.add(infopanel,BorderLayout.NORTH);
        frame.add(panelCenter, BorderLayout.CENTER);
        frame.add(panel,BorderLayout.SOUTH);
        frame.setVisible(true);
        sendButton.addActionListener(this);
        new Thread(){
            @Override
            public void run() {
                receiveLoop();
            }
        }.start();
        frame.addWindowListener(new WindowAdapter(){
            @Override
            public void windowClosing(WindowEvent e){
                STOP = 1;
                try {
                    // Closing the socket unblocks the receiver thread's pending read.
                    ClientP2P.this.socket.close();
                } catch (IOException closeError) {
                    closeError.printStackTrace();
                }
                System.exit(0);
            }
        });
    }

    /**
     * Reads frames from the server until the stream ends, an I/O error occurs or
     * {@link #STOP} is set. Each frame is read completely (header, then exactly the
     * announced number of payload bytes) before it is decoded, so frames split
     * across several TCP segments are reassembled correctly.
     */
    private void receiveLoop() {
        if (dataInputStream == null) {
            // The constructor failed to open the input stream; nothing to read from.
            return;
        }
        byte[] header = new byte[MessageDataUtils.HEADER_LEN];
        while (STOP != 1) {
            try {
                dataInputStream.readFully(header);
                // The length is a single byte on the wire; mask to read it as unsigned.
                int size = header[MessageDataUtils.HEADER_LEN - 1] & 0xFF;
                byte[] packet = new byte[header.length + size];
                System.arraycopy(header, 0, packet, 0, header.length);
                dataInputStream.readFully(packet, header.length, size);
                String result = MessageDataUtils.decodeProtocol(packet);
                if (result == null) {
                    // Header did not validate; the frame has been consumed, move on.
                    continue;
                }
                System.out.println("result:"+result);
                showIncoming(JSON.parseObject(result, MessageData.class));
            } catch (IOException e) {
                // EOF or a broken connection is not recoverable: leave the loop
                // instead of spinning on a dead stream.
                if (STOP != 1) {
                    e.printStackTrace();
                }
                break;
            } catch (RuntimeException e) {
                // Malformed payload: report it and keep the connection alive.
                e.printStackTrace();
            }
        }
    }

    /** Appends a received message to the transcript on the Swing event dispatch thread. */
    private void showIncoming(MessageData msgres) {
        if (msgres == null) {
            return;
        }
        final String line = msgres.getFrom() + ":"+ msgres.getData() + "\n";
        javax.swing.SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                info.append(line);
            }
        });
    }

    /**
     * Connects to the server on 127.0.0.1:8090 and opens the chat window.
     *
     * @param args unused
     * @throws Exception if the connection cannot be established
     */
    public static void main(String[] args) throws Exception {
        String serverIP="127.0.0.1";
        int port=8090;
        Socket socket = new Socket(serverIP,port);
        new ClientP2P(socket);
    }

    /**
     * Wraps {@code msg} in a {@link MessageData} addressed from/to the names typed in
     * the UI, encodes it and writes it to the server.
     *
     * @param msg message text to send
     */
    public void send(String msg){
        try {
            MessageData data = new MessageData();
            data.setFrom(this.fromText.getText());
            data.setTo(this.toText.getText());
            data.setData(msg);
            String json = JSON.toJSONString(data);
            System.out.println("发送:"+json);
            this.outputStream.write(MessageDataUtils.encodeProtocol(json));
            this.outputStream.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles a click on the send button: echoes the message into the transcript,
     * sends it and clears the input field. Empty input is ignored.
     */
    @Override
    public void actionPerformed(ActionEvent e) {
        if(e.getSource()!=this.sendButton) {
            return;
        }
        try{
            String msg=this.msgText.getText();
            if(msg.length()>0) {
                this.info.append(this.fromText.getText() + ":"+ msg);
                this.info.append("\n");
                this.send(msg);
                this.msgText.setText("");
            }
        }
        catch(Exception ee){
            // Report instead of silently swallowing (e.g. an oversized message or a missing stream).
            ee.printStackTrace();
        }
    }
}
服务器
/**
*
*/
package com.netty.server;
/**
* @author HUANGLIAO322
*/
/**
 * Plain bean carried between client and server as JSON.
 * All properties are mutable and default to {@code null} / {@code 0}.
 */
public class MessageData {

    /** Name of the sender. */
    private String from;

    /** Name of the receiver. */
    private String to;

    /** Command code: 0 = failure receipt, 1 = success receipt, 2 = login, 3 = message. */
    private int cmd;

    /** Message body. */
    private String data;

    /** @return the sender name */
    public String getFrom() {
        return this.from;
    }

    /** @param from the sender name */
    public void setFrom(String from) {
        this.from = from;
    }

    /** @return the receiver name */
    public String getTo() {
        return this.to;
    }

    /** @param to the receiver name */
    public void setTo(String to) {
        this.to = to;
    }

    /** @return the command code */
    public int getCmd() {
        return this.cmd;
    }

    /** @param cmd the command code */
    public void setCmd(int cmd) {
        this.cmd = cmd;
    }

    /** @return the message body */
    public String getData() {
        return this.data;
    }

    /** @param data the message body */
    public void setData(String data) {
        this.data = data;
    }
}
/**
*
*/
package com.netty.server;
/**
* @author HUANGLIAO322
*
*/
/**
 * Encoder/decoder for the chat wire format:
 *
 * <pre>
 * | flag 'S' (1 byte) | module '1' (1 byte) | payload length (1 unsigned byte) | payload |
 * </pre>
 *
 * The payload is the text converted with the platform default charset, so both
 * peers must run with the same default charset.
 */
public class MessageDataUtils {
    private static final byte FLAG = 'S';
    private static final byte MODULE = '1';
    public static final int HEADER_LEN = 3;
    /** Largest payload the single unsigned length byte can describe. */
    private static final int MAX_PAYLOAD_LEN = 0xFF;

    /**
     * Encodes {@code data} into one frame.
     *
     * @param data text to send; must not be {@code null}
     * @return header followed by the payload bytes
     * @throws IllegalArgumentException if the payload is longer than 255 bytes and
     *         therefore cannot be described by the one-byte length field
     */
    public static byte[] encodeProtocol(String data) {
        byte[] bs = data.getBytes();
        if (bs.length > MAX_PAYLOAD_LEN) {
            // Previously the length was cast to a signed byte, which corrupted or
            // truncated anything over 127 bytes; fail loudly instead.
            throw new IllegalArgumentException("payload is " + bs.length
                    + " bytes; the 1-byte length field allows at most " + MAX_PAYLOAD_LEN);
        }
        byte[] buffer = new byte[HEADER_LEN + bs.length];
        buffer[0] = FLAG;
        buffer[1] = MODULE;
        buffer[2] = (byte) bs.length; // stored as unsigned; read back with & 0xFF
        System.arraycopy(bs, 0, buffer, HEADER_LEN, bs.length);
        return buffer;
    }

    /**
     * Decodes exactly one frame.
     *
     * @param buffer a complete frame (header plus payload, nothing more)
     * @return the payload text, or {@code null} if {@code buffer} is {@code null},
     *         shorter than the header, has the wrong flag/module, or its size does
     *         not match the length announced in the header
     */
    public static String decodeProtocol(byte[] buffer) {
        if (buffer == null || buffer.length < HEADER_LEN) {
            return null;
        }
        if (buffer[0] != FLAG || buffer[1] != MODULE) {
            return null;
        }
        int length = buffer[2] & 0xFF; // length byte is unsigned on the wire
        if (buffer.length != HEADER_LEN + length) {
            return null;
        }
        return new String(buffer, HEADER_LEN, length);
    }
}
/**
*
*/
package com.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
import com.alibaba.fastjson.JSON;
/**
* @author HUANGLIAO322
*
*/
/**
* 解码请求消息
*/
/**
 * Splits the inbound byte stream into frames and turns each frame into a
 * {@link MessageData}.
 *
 * <p>A frame is a {@code MessageDataUtils.HEADER_LEN}-byte header whose last byte is
 * the unsigned payload length, followed by that many payload bytes. Bytes are only
 * consumed once a whole frame is available, so a frame that arrives in several TCP
 * segments is kept in the cumulation buffer until it is complete, and several frames
 * arriving together are decoded one after another.
 */
public class RequestDecoder extends ByteToMessageDecoder{
    /**
     * Decodes every complete frame currently in {@code buffer}.
     *
     * @param ctx    channel context (unused)
     * @param buffer cumulated inbound bytes
     * @param out    receives one {@link MessageData} per successfully decoded frame
     * @throws Exception if the JSON payload cannot be parsed
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        System.out.println("ZukServerRequestDecoder.decode.解码请求消息");
        while (buffer.readableBytes() >= MessageDataUtils.HEADER_LEN) {
            int start = buffer.readerIndex();
            // Peek at the length byte without moving the reader index.
            int payloadLen = buffer.getUnsignedByte(start + MessageDataUtils.HEADER_LEN - 1);
            int frameLen = MessageDataUtils.HEADER_LEN + payloadLen;
            if (buffer.readableBytes() < frameLen) {
                // Incomplete frame: leave the bytes in place and wait for more data.
                return;
            }
            byte[] dst = new byte[frameLen];
            buffer.readBytes(dst);
            String message = MessageDataUtils.decodeProtocol(dst);
            if (message == null) {
                // Header failed validation; the frame is dropped.
                continue;
            }
            MessageData msg = JSON.parseObject(message, MessageData.class);
            if (msg != null) {
                out.add(msg);
            }
        }
    }
}
/**
*
*/
package com.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author HUANGLIAO322
*
*/
/**
 * Entry point of the chat server: a Netty server listening on port 8090 whose
 * pipeline decodes requests, handles them and encodes responses.
 */
public class Server {
    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args unused
     * @throws Exception if binding fails or the thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(parentGroup, childGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // inbound: bytes -> MessageData
                    ch.pipeline().addLast(new RequestDecoder());
                    // message handling
                    ch.pipeline().addLast(new ZukServerHandler());
                    // outbound: MessageData -> bytes
                    ch.pipeline().addLast(new ZukServerResponseEncoder());
                }
            });
            server.option(ChannelOption.SO_BACKLOG, 2048);// size of the pending-connection queue
            // Wait for the bind, then park the main thread until the server channel closes.
            server.bind(8090).sync().channel().closeFuture().sync();
        } finally {
            // Without this the non-daemon event loop threads keep the JVM alive,
            // e.g. after a failed bind.
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }
    }
}
/**
*
*/
package com.netty.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
/**
* @author HUANGLIAO322
*
*/
/**
 * Routes chat messages between connected clients.
 *
 * <p>A client is registered under its {@code from} name the first time it sends a
 * message. A message is forwarded to the channel registered under {@code to}, and
 * the sender receives a receipt from "server" saying whether the receiver was found.
 * Registrations are dropped when the owning channel becomes inactive.
 */
public class ZukServerHandler extends SimpleChannelInboundHandler<MessageData> {
    private Logger logger = LoggerFactory.getLogger(ZukServerHandler.class);
    /** Remote addresses of the currently connected channels (value is unused). */
    public static Map<String,String> map = new ConcurrentHashMap<String, String>();
    /** All connected channels; closed channels are removed by the group itself. */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** User name -> channel that first sent a message under that name. */
    public static Map<String, Channel> mapChannels = new ConcurrentHashMap<String, Channel>();

    /** Records the new channel's remote address and adds the channel to the group. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        String address = incoming.remoteAddress().toString();
        System.out.println("handlerAdded:"+address);
        map.put(address, "");
        channels.add(incoming);
    }

    /**
     * Nothing to do here: a closed Channel is automatically removed from the
     * ChannelGroup, and name registrations are cleaned up in
     * {@link #channelInactive(ChannelHandlerContext)}.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }

    /**
     * Registers the sender if unknown, forwards the message to the receiver when it
     * is registered, and always sends a receipt back to the sender.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded message; {@code from} and {@code to} are expected to be set
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageData msg)
            throws Exception {
        System.out.println("msg--:"+JSON.toJSONString(msg));
        final Channel channel = ctx.channel();
        String from = msg.getFrom();
        String to = msg.getTo();
        if (from == null || to == null) {
            // ConcurrentHashMap rejects null keys; answer on the current channel instead of failing.
            reply(channel, from, "from and to are required");
            return;
        }
        Channel fromChannel = mapChannels.get(from);
        if(fromChannel==null){
            System.out.println("from "+from + " is not on line.");
            mapChannels.put(from, channel);
            fromChannel = channel;
        }
        Channel toChannel = mapChannels.get(to);
        if(toChannel==null){
            reply(fromChannel, from, to+" is not online");
        }
        else{
            // deliver to the receiver
            toChannel.writeAndFlush(msg);
            // receipt for the sender
            reply(fromChannel, from, to+" yes");
        }
    }

    /** Sends a receipt from "server" with the given text to {@code target}. */
    private void reply(Channel target, String to, String data) {
        MessageData res = new MessageData();
        res.setFrom("server");
        res.setTo(to);
        res.setData(data);
        target.writeAndFlush(res);
    }

    /**
     * Drops every registration that points at the disconnected channel so that
     * names do not stay bound to a dead connection and the static maps do not grow
     * without bound.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive断开连接");
        Channel closed = ctx.channel();
        // ConcurrentHashMap iterators tolerate removal during iteration.
        for (Map.Entry<String, Channel> entry : mapChannels.entrySet()) {
            if (entry.getValue() == closed) {
                mapChannels.remove(entry.getKey());
            }
        }
        if (closed.remoteAddress() != null) {
            map.remove(closed.remoteAddress().toString());
        }
    }
}
/**
*
*/
package com.netty.server;
import javax.jws.WebParam.Mode;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author HUANGLIAO322
*
*/
/**
 * Encodes an outgoing response into the chat wire format produced by
 * {@code MessageDataUtils.encodeProtocol}:
 *
 * +-----------------------------------------------------------------------
 * | flag (1 byte) | module (1 byte) | payload length (1 byte) | JSON ......
 * +-----------------------------------------------------------------------
 */
public class ZukServerResponseEncoder extends MessageToByteEncoder<MessageData>{
    /**
     * Serializes {@code response} to JSON, logs it, frames it with
     * {@code MessageDataUtils.encodeProtocol} and writes the frame to {@code buffer}.
     *
     * @param ctx      channel context (unused)
     * @param response message to send
     * @param buffer   outbound buffer that receives the encoded frame
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageData response, ByteBuf buffer)
            throws Exception {
        final String json = JSON.toJSONString(response);
        System.out.println("返回"+json);
        final byte[] framed = MessageDataUtils.encodeProtocol(json);
        buffer.writeBytes(framed);
    }
}