基于TCP异步Socket模型的介绍
程序员文章站
2023-12-13 10:49:40
tcp异步socket模型c#的tcp异步socket模型是通过begin-end模式实现的。例如提供beginconnect、beginaccept、 beginsend...
tcp异步socket模型
c#的tcp异步socket模型是通过begin-end模式实现的。例如提供beginconnect、beginaccept、 beginsend 和 beginreceive等。
复制代码 代码如下:
iasyncresult beginaccept(asynccallback callback, object state);
asynccallback回调在函数执行完毕后执行。state对象被用于在执行函数和回调函数间传输信息。
复制代码 代码如下:
socket socket = new socket(
addressfamily.internetwork,
sockettype.stream,
protocoltype.tcp);
ipendpoint iep = new ipendpoint(ipaddress.any, 8888);
socket.bind(iep);
socket.listen(5);
socket.beginaccept (new asynccallback(callbackaccept), socket);
private void callbackaccept(iasyncresult iar)
{
socket server = (socket)iar.asyncstate;
socket client = server.endaccept(iar);
}
则在accept一个tcpclient,需要维护tcpclient列表。
复制代码 代码如下:
private list<tcpclientstate> clients;
异步tcp服务器完整实现
复制代码 代码如下:
/// <summary>
/// 异步tcp服务器
/// </summary>
public class asynctcpserver : idisposable
{
#region fields
private tcplistener listener;
private list<tcpclientstate> clients;
private bool disposed = false;
#endregion
#region ctors
/// <summary>
/// 异步tcp服务器
/// </summary>
/// <param name="listenport">监听的端口</param>
public asynctcpserver(int listenport)
: this(ipaddress.any, listenport)
{
}
/// <summary>
/// 异步tcp服务器
/// </summary>
/// <param name="localep">监听的终结点</param>
public asynctcpserver(ipendpoint localep)
: this(localep.address, localep.port)
{
}
/// <summary>
/// 异步tcp服务器
/// </summary>
/// <param name="localipaddress">监听的ip地址</param>
/// <param name="listenport">监听的端口</param>
public asynctcpserver(ipaddress localipaddress, int listenport)
{
address = localipaddress;
port = listenport;
this.encoding = encoding.default;
clients = new list<tcpclientstate>();
listener = new tcplistener(address, port);
listener.allownattraversal(true);
}
#endregion
#region properties
/// <summary>
/// 服务器是否正在运行
/// </summary>
public bool isrunning { get; private set; }
/// <summary>
/// 监听的ip地址
/// </summary>
public ipaddress address { get; private set; }
/// <summary>
/// 监听的端口
/// </summary>
public int port { get; private set; }
/// <summary>
/// 通信使用的编码
/// </summary>
public encoding encoding { get; set; }
#endregion
#region server
/// <summary>
/// 启动服务器
/// </summary>
/// <returns>异步tcp服务器</returns>
public asynctcpserver start()
{
if (!isrunning)
{
isrunning = true;
listener.start();
listener.beginaccepttcpclient(
new asynccallback(handletcpclientaccepted), listener);
}
return this;
}
/// <summary>
/// 启动服务器
/// </summary>
/// <param name="backlog">
/// 服务器所允许的挂起连接序列的最大长度
/// </param>
/// <returns>异步tcp服务器</returns>
public asynctcpserver start(int backlog)
{
if (!isrunning)
{
isrunning = true;
listener.start(backlog);
listener.beginaccepttcpclient(
new asynccallback(handletcpclientaccepted), listener);
}
return this;
}
/// <summary>
/// 停止服务器
/// </summary>
/// <returns>异步tcp服务器</returns>
public asynctcpserver stop()
{
if (isrunning)
{
isrunning = false;
listener.stop();
lock (this.clients)
{
for (int i = 0; i < this.clients.count; i++)
{
this.clients[i].tcpclient.client.disconnect(false);
}
this.clients.clear();
}
}
return this;
}
#endregion
#region receive
private void handletcpclientaccepted(iasyncresult ar)
{
if (isrunning)
{
tcplistener tcplistener = (tcplistener)ar.asyncstate;
tcpclient tcpclient = tcplistener.endaccepttcpclient(ar);
byte[] buffer = new byte[tcpclient.receivebuffersize];
tcpclientstate internalclient
= new tcpclientstate(tcpclient, buffer);
lock (this.clients)
{
this.clients.add(internalclient);
raiseclientconnected(tcpclient);
}
networkstream networkstream = internalclient.networkstream;
networkstream.beginread(
internalclient.buffer,
0,
internalclient.buffer.length,
handledatagramreceived,
internalclient);
tcplistener.beginaccepttcpclient(
new asynccallback(handletcpclientaccepted), ar.asyncstate);
}
}
private void handledatagramreceived(iasyncresult ar)
{
if (isrunning)
{
tcpclientstate internalclient = (tcpclientstate)ar.asyncstate;
networkstream networkstream = internalclient.networkstream;
int numberofreadbytes = 0;
try
{
numberofreadbytes = networkstream.endread(ar);
}
catch
{
numberofreadbytes = 0;
}
if (numberofreadbytes == 0)
{
// connection has been closed
lock (this.clients)
{
this.clients.remove(internalclient);
raiseclientdisconnected(internalclient.tcpclient);
return;
}
}
// received byte and trigger event notification
byte[] receivedbytes = new byte[numberofreadbytes];
buffer.blockcopy(
internalclient.buffer, 0,
receivedbytes, 0, numberofreadbytes);
raisedatagramreceived(internalclient.tcpclient, receivedbytes);
raiseplaintextreceived(internalclient.tcpclient, receivedbytes);
// continue listening for tcp datagram packets
networkstream.beginread(
internalclient.buffer,
0,
internalclient.buffer.length,
handledatagramreceived,
internalclient);
}
}
#endregion
#region events
/// <summary>
/// 接收到数据报文事件
/// </summary>
public event eventhandler<tcpdatagramreceivedeventargs<byte[]>> datagramreceived;
/// <summary>
/// 接收到数据报文明文事件
/// </summary>
public event eventhandler<tcpdatagramreceivedeventargs<string>> plaintextreceived;
private void raisedatagramreceived(tcpclient sender, byte[] datagram)
{
if (datagramreceived != null)
{
datagramreceived(this, new tcpdatagramreceivedeventargs<byte[]>(sender, datagram));
}
}
private void raiseplaintextreceived(tcpclient sender, byte[] datagram)
{
if (plaintextreceived != null)
{
plaintextreceived(this, new tcpdatagramreceivedeventargs<string>(
sender, this.encoding.getstring(datagram, 0, datagram.length)));
}
}
/// <summary>
/// 与客户端的连接已建立事件
/// </summary>
public event eventhandler<tcpclientconnectedeventargs> clientconnected;
/// <summary>
/// 与客户端的连接已断开事件
/// </summary>
public event eventhandler<tcpclientdisconnectedeventargs> clientdisconnected;
private void raiseclientconnected(tcpclient tcpclient)
{
if (clientconnected != null)
{
clientconnected(this, new tcpclientconnectedeventargs(tcpclient));
}
}
private void raiseclientdisconnected(tcpclient tcpclient)
{
if (clientdisconnected != null)
{
clientdisconnected(this, new tcpclientdisconnectedeventargs(tcpclient));
}
}
#endregion
#region send
/// <summary>
/// 发送报文至指定的客户端
/// </summary>
/// <param name="tcpclient">客户端</param>
/// <param name="datagram">报文</param>
public void send(tcpclient tcpclient, byte[] datagram)
{
if (!isrunning)
throw new invalidprogramexception("this tcp server has not been started.");
if (tcpclient == null)
throw new argumentnullexception("tcpclient");
if (datagram == null)
throw new argumentnullexception("datagram");
tcpclient.getstream().beginwrite(
datagram, 0, datagram.length, handledatagramwritten, tcpclient);
}
private void handledatagramwritten(iasyncresult ar)
{
((tcpclient)ar.asyncstate).getstream().endwrite(ar);
}
/// <summary>
/// 发送报文至指定的客户端
/// </summary>
/// <param name="tcpclient">客户端</param>
/// <param name="datagram">报文</param>
public void send(tcpclient tcpclient, string datagram)
{
send(tcpclient, this.encoding.getbytes(datagram));
}
/// <summary>
/// 发送报文至所有客户端
/// </summary>
/// <param name="datagram">报文</param>
public void sendall(byte[] datagram)
{
if (!isrunning)
throw new invalidprogramexception("this tcp server has not been started.");
for (int i = 0; i < this.clients.count; i++)
{
send(this.clients[i].tcpclient, datagram);
}
}
/// <summary>
/// 发送报文至所有客户端
/// </summary>
/// <param name="datagram">报文</param>
public void sendall(string datagram)
{
if (!isrunning)
throw new invalidprogramexception("this tcp server has not been started.");
sendall(this.encoding.getbytes(datagram));
}
#endregion
#region idisposable members
/// <summary>
/// performs application-defined tasks associated with freeing,
/// releasing, or resetting unmanaged resources.
/// </summary>
public void dispose()
{
dispose(true);
gc.suppressfinalize(this);
}
/// <summary>
/// releases unmanaged and - optionally - managed resources
/// </summary>
/// <param name="disposing"><c>true</c> to release
/// both managed and unmanaged resources; <c>false</c>
/// to release only unmanaged resources.</param>
protected virtual void dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
try
{
stop();
if (listener != null)
{
listener = null;
}
}
catch (socketexception ex)
{
exceptionhandler.handle(ex);
}
}
disposed = true;
}
}
#endregion
}
使用举例
复制代码 代码如下:
class program
{
static asynctcpserver server;
static void main(string[] args)
{
logfactory.assign(new consolelogfactory());
server = new asynctcpserver(9999);
server.encoding = encoding.utf8;
server.clientconnected +=
new eventhandler<tcpclientconnectedeventargs>(server_clientconnected);
server.clientdisconnected +=
new eventhandler<tcpclientdisconnectedeventargs>(server_clientdisconnected);
server.plaintextreceived +=
new eventhandler<tcpdatagramreceivedeventargs<string>>(server_plaintextreceived);
server.start();
console.writeline("tcp server has been started.");
console.writeline("type something to send to client...");
while (true)
{
string text = console.readline();
server.sendall(text);
}
}
static void server_clientconnected(object sender, tcpclientconnectedeventargs e)
{
logger.debug(string.format(cultureinfo.invariantculture,
"tcp client {0} has connected.",
e.tcpclient.client.remoteendpoint.tostring()));
}
static void server_clientdisconnected(object sender, tcpclientdisconnectedeventargs e)
{
logger.debug(string.format(cultureinfo.invariantculture,
"tcp client {0} has disconnected.",
e.tcpclient.client.remoteendpoint.tostring()));
}
static void server_plaintextreceived(object sender, tcpdatagramreceivedeventargs<string> e)
{
if (e.datagram != "received")
{
console.write(string.format("client : {0} --> ",
e.tcpclient.client.remoteendpoint.tostring()));
console.writeline(string.format("{0}", e.datagram));
server.send(e.tcpclient, "server has received you text : " + e.datagram);
}
}
}