欢迎光临
我们一直在努力

C# 中基于消息通信的异步完成端口 TCP/IP 协议组件的实现(源代码)-.NET教程,C#语言

建站超值云服务器,限时71元/月

源代码:

客户端:

using system;

using system.io;

using system.componentmodel;

using system.collections;

using system.diagnostics;

using system.net;

using system.net.sockets;

using system.threading;

namespace mykj

{

?///

?/// mytcpipclient 提供在net tcp_ip 协议上基于消息的客户端

?///

?public class mytcpipclient : system.componentmodel.component

?{

??private int buffersize=2048;

??private string tcpipserverip="127.0.0.1";

??private int tcpipserverport=11000;

??private socket clientsocket=null;

??private manualresetevent connectdone = new manualresetevent(false);

??private manualresetevent senddone = new manualresetevent(false);

??

??private void connectcallback(iasyncresult ar)

??{

???try

???{

????socket client = (socket) ar.asyncstate;

????client.endconnect(ar);

????

???}

???catch (exception e)

???{

????onerrorevent(new erroreventargs(e));

???}

???finally

???{

????connectdone.set();

???}

??}

??private void sendcallback(iasyncresult ar)

??{

???try

???{

????socket client = (socket) ar.asyncstate;

????int bytessent = client.endsend(ar);

????//console.writeline(bytessent);

???}

???catch (exception e)

???{

????onerrorevent(new erroreventargs(e));

???}

???finally

???{

????senddone.set();

???}

??}

??private void receivecallback(iasyncresult ar)

??{

???socket handler=null;

???try

???{

????lock(ar)

????{

?????stateobject state = (stateobject) ar.asyncstate;

?????handler = state.worksocket;

?????

?????int bytesread = handler.endreceive(ar);

?????

?????if (bytesread > 0)

?????{

??????int readpiont=0;?

??????while(readpiont??????{?

???????if(state.cortrol==0 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<24)&0xff000000;

????????state.packsize=bi1;

????????readpiont++;

????????state.cortrol=1;

???????}

??????

???????if(state.cortrol==1 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<16)&0x00ff0000;

????????state.packsize=state.packsize+bi1;

????????readpiont++;

????????state.cortrol=2;

???????}

??????

???????if(state.cortrol==2 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<8)&0x0000ff00;

????????state.packsize=state.packsize+bi1;

????????readpiont++;

????????state.cortrol=3;

???????}

???????

???????if(state.cortrol==3 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=bi1&0xff;

????????state.packsize=state.packsize+bi1-4;

????????readpiont++;

????????state.cortrol=4;

???????}

???????

???????if(state.cortrol==4 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<24)&0xff000000;

????????state.residualsize=bi1;

????????readpiont++;

????????state.cortrol=5;

????????state.packsize-=1;

???????}

???????

???????if(state.cortrol==5 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<16)&0x00ff0000;

????????state.residualsize=state.residualsize+bi1;

????????readpiont++;

????????state.cortrol=6;

????????state.packsize-=1;

???????}

???????

???????if(state.cortrol==6 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<8)&0x0000ff00;

????????state.residualsize=state.residualsize+bi1;

????????readpiont++;

????????state.cortrol=7;

????????state.packsize-=1;

???????}

???????if(state.cortrol==7 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=bi1&0xff;

????????state.residualsize=state.residualsize+bi1;

????????state.datastream.setlength(0);

????????state.datastream.position=0;

????????

????????readpiont++;

????????state.cortrol=8;

????????state.packsize-=1;

???????}

???????

???????if(state.cortrol==8 && readpiont???????{

????????int bi1=bytesread-readpiont;

????????int bi2=(int)(state.residualsize-state.datastream.length);

????????if(bi1>=bi2)

????????{

?????????state.datastream.write(state.buffer,readpiont,bi2);

?????????readpiont+=bi2;

?????????oninceptevent(new incepteventargs(state.datastream,handler));

?????????state.cortrol=9;

?????????state.packsize-=bi2;

?????????}

????????else

????????{

?????????state.datastream.write(state.buffer,readpiont,bi1);

?????????readpiont+=bi1;

?????????state.packsize-=bi1;

????????}

???????}

???????if(state.cortrol==9 && readpiont???????{

????????int bi1=bytesread-readpiont;

????????if(bi1????????{

?????????state.packsize=state.packsize-bi1;

?????????readpiont+=bi1;

????????}?

????????else

????????{

?????????state.cortrol=0;

?????????readpiont+=(int)state.packsize;

????????}

???????}

??????}

?????}

?????else

?????{

??????throw(new exception("读入的数据小于1bit"));

?????}

?????if(handler.connected==true)

?????{

??????handler.beginreceive(state.buffer,0,buffersize,0,

???????new asynccallback(receivecallback), state);

?????}

????}

???}

???catch (exception e)

???{

????onerrorevent(new erroreventargs(e));

????

???}

??}

??

??///

??/// 连接服务器

??///

??public void conn()

??{

???try

???{

????clientsocket=new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp);?

????ipaddress ipaddress = ipaddress.parse(tcpipserverip);

????ipendpoint remoteep = new ipendpoint(ipaddress, tcpipserverport);

????connectdone.reset();

????clientsocket.beginconnect(remoteep,new asynccallback(connectcallback),clientsocket);

????connectdone.waitone();

????stateobject state = new stateobject(buffersize,clientsocket);

????clientsocket.beginreceive(state.buffer,0,buffersize,0,

?????new asynccallback(receivecallback), state);?

???}

???catch(exception e)

???{

????onerrorevent(new erroreventargs(e));

???}

???

??}

??///

??/// 断开连接

??///

??public void close()

??{

???try

???{

????clientsocket.shutdown(socketshutdown.both);

????clientsocket.close();

???}

???catch(exception e)

???{

????onerrorevent(new erroreventargs(e));

???}

???

??}

??///

??/// 发送一个流数据

??///

??/// 数据流

??public void send(stream astream)

??{

???try

???{

????if(clientsocket.connected==false)

????{

?????throw(new exception("没有连接服务器不可以发送信息!"));

????}

????astream.position=0;

????byte[] bytedata=new byte[buffersize];

????int bi1=(int)((astream.length+8)/buffersize);

????int bi2=(int)astream.length;

????if(((astream.length+8)%buffersize)>0)

????{

?????bi1=bi1+1;

????}

????bi1=bi1*buffersize;

????

????bytedata[0]=system.convert.tobyte(bi1>>24);

????bytedata[1]=system.convert.tobyte((bi1&0x00ff0000)>>16);

????bytedata[2]=system.convert.tobyte((bi1&0x0000ff00)>>8);

????bytedata[3]=system.convert.tobyte((bi1&0x000000ff));

????

????bytedata[4]=system.convert.tobyte(bi2>>24);

????bytedata[5]=system.convert.tobyte((bi2&0x00ff0000)>>16);

????bytedata[6]=system.convert.tobyte((bi2&0x0000ff00)>>8);

????bytedata[7]=system.convert.tobyte((bi2&0x000000ff));

????

????int n = astream.read(bytedata, 8, bytedata.length-8);

????

????while (n>0)

????{

?????clientsocket.beginsend(bytedata, 0, bytedata.length, 0,?new asynccallback(sendcallback), clientsocket);

?????senddone.waitone();

?????bytedata=new byte[buffersize];

?????n = astream.read(bytedata,0,bytedata.length);

????}

???}

???catch (exception e)

???{

????onerrorevent(new erroreventargs(e));

???}

??}

??

??///

??/// 构造

??///

??/// 父控件

??public mytcpipclient(system.componentmodel.icontainer container)

??{

???container.add(this);

???initializecomponent();

???//

???// todo: 在 initializecomponent 调用后添加任何构造函数代码

???//

??}

??///

??/// 构造

??///

??public mytcpipclient()

??{

???initializecomponent();

???//

???// todo: 在 initializecomponent 调用后添加任何构造函数代码

???//

??}

??#region component designer generated code

??///

??/// 设计器支持所需的方法 – 不要使用代码编辑器修改

??/// 此方法的内容。

??///

??private void initializecomponent()

??{

??}

??#endregion

??///

??/// 要连接的服务器ip地址

??///

??public string tcpipserverip

??{

???get

???{

????return tcpipserverip;

???}

???set

???{

????tcpipserverip=value;

???}

??}

??///

??/// 要连接的服务器所使用的端口

??///

??public int tcpipserverport

??{

???get

???{

????return tcpipserverport;

???}

???set

???{

????tcpipserverport=value;

???}

??}

??///

??/// 缓冲器大小

??///

??public int buffersize

??{

???get

???{

????return buffersize;

???}

???set

???{

????buffersize=value;

???}

??}

??

??///

??/// 连接的活动状态

??///

??public bool activ

??{

???get

???{

????if(clientsocket==null)

????{

?????return false;

????}

????return clientsocket.connected;

???}

??}

??///

??/// 接收到数据引发的事件

??///

??public event inceptevent incept;

??///

??/// 引发接收数据事件

??///

??/// 接收数据

??protected virtual void oninceptevent(incepteventargs e)

??{

???if (incept != null)

???{

????incept(this, e);

???}

??}

??///

??/// 发生错误引发的事件

??///

??public event errorevent error;

??///

??/// 引发错误事件

??///

??/// 错误数据

??protected virtual void onerrorevent(erroreventargs e)

??{

???if (error != null)

???{

????error(this, e);

???}

??}

??

?}

?

?///

?/// 接收数据事件

?///

?public class incepteventargs : eventargs

?{?

??private readonly stream datastream;

??private readonly socket clientsocket;

??///

??/// 构造

??///

??/// 接收到的数据

??/// 接收的插座

??public incepteventargs(stream astream,socket clientsocket)

??{

???datastream=astream;

???clientsocket=clientsocket;

??}

??///

??/// 接受的数据流

??///

??public stream astream

??{????

???get { return datastream;}?????

??}

??///

??/// 接收的插座

??///

??public socket clientsocket

??{????

???get { return clientsocket;}?????

??}

?}?

?///

?/// 定义接收委托

?///

?public delegate void inceptevent(object sender, incepteventargs e);

?///

?/// 错处事件

?///

?public class erroreventargs : eventargs

?{?

??private readonly exception error;

??///

??/// 构造

??///

??/// 错误信息对象

??public erroreventargs(exception error)

??{

???error=error;

??}

??///

??/// 错误信息对象

??///

??public exception error

??{????

???get { return error;}?????

??}

?}

?///

?/// 错误委托

?///

?public delegate void errorevent(object sender, erroreventargs e);

?

?

?

}

服务器端:

using system;

using system.io;

using system.componentmodel;

using system.collections;

using system.diagnostics;

using system.net;

using system.net.sockets;

using system.threading;

namespace mykj

{

?///

    /// mytcpipserver 提供在 .NET TCP/IP 协议上基于消息的服务端

?///

?public class mytcpipserver : system.componentmodel.component

?{

??private int buffersize=2048;

??private string tcpipserverip="";

??private int tcpipserverport=11000;

??private socket listener=null;

??private manualresetevent alldone = new manualresetevent(false);

??private manualresetevent senddone = new manualresetevent(false);

??private thread thread=null;

??

??private void startlistening()

??{

???try

???{

????listener = new socket(addressfamily.internetwork,

?????sockettype.stream, protocoltype.tcp);

?????

????ipaddress ipaddress;

????if(tcpipserverip.trim()=="")

????{

?????ipaddress=ipaddress.any;?

????}

????else

????{

?????ipaddress=ipaddress.parse(tcpipserverip);

????}

????ipendpoint localendpoint = new ipendpoint(ipaddress, tcpipserverport);

????

????listener.bind(localendpoint);

????listener.listen(10);

????while (true)

????{

?????alldone.reset();

?????listener.beginaccept(new asynccallback(acceptcallback),listener);

?????alldone.waitone();

????}

???}

???catch (exception e)

???{

????onerrorserverevent(new errorservereventargs(e,listener));?

???}

??}

??

??private void readcallback(iasyncresult ar)

??{

???socket handler=null;

???try

???{

????lock(ar)

????{

?????stateobject state = (stateobject) ar.asyncstate;

?????handler = state.worksocket;

?????

?????int bytesread = handler.endreceive(ar);

?????

?????if (bytesread > 0)

?????{

??????int readpiont=0;?

??????while(readpiont??????{?

???????if(state.cortrol==0 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<24)&0xff000000;

????????state.packsize=bi1;

????????readpiont++;

????????state.cortrol=1;

???????}

??????

???????if(state.cortrol==1 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<16)&0x00ff0000;

????????state.packsize=state.packsize+bi1;

????????readpiont++;

????????state.cortrol=2;

???????}

??????

???????if(state.cortrol==2 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<8)&0x0000ff00;

????????state.packsize=state.packsize+bi1;

????????readpiont++;

????????state.cortrol=3;

???????}

???????

???????if(state.cortrol==3 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=bi1&0xff;

????????state.packsize=state.packsize+bi1-4;

????????readpiont++;

????????state.cortrol=4;

???????}

???????

???????if(state.cortrol==4 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<24)&0xff000000;

????????state.residualsize=bi1;

????????readpiont++;

????????state.cortrol=5;

????????state.packsize-=1;

???????}

???????

???????if(state.cortrol==5 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<16)&0x00ff0000;

????????state.residualsize=state.residualsize+bi1;

????????readpiont++;

????????state.cortrol=6;

????????state.packsize-=1;

???????}

???????

???????if(state.cortrol==6 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=(bi1<<8)&0x0000ff00;

????????state.residualsize=state.residualsize+bi1;

????????readpiont++;

????????state.cortrol=7;

????????state.packsize-=1;

???????}

???????if(state.cortrol==7 && readpiont???????{

????????long bi1=state.buffer[readpiont];

????????bi1=bi1&0xff;

????????state.residualsize=state.residualsize+bi1;

????????state.datastream.setlength(0);

????????state.datastream.position=0;

????????

????????readpiont++;

????????state.cortrol=8;

????????state.packsize-=1;

???????}

???????

???????if(state.cortrol==8 && readpiont???????{

????????int bi1=bytesread-readpiont;

????????int bi2=(int)(state.residualsize-state.datastream.length);

????????if(bi1>=bi2)

????????{

?????????state.datastream.write(state.buffer,readpiont,bi2);

?????????readpiont+=bi2;

?????????oninceptserverevent(new inceptservereventargs(state.datastream,state.worksocket,this));

?????????state.cortrol=9;

?????????state.packsize-=bi2;

?????????

?????????

????????}

????????else

????????{

?????????state.datastream.write(state.buffer,readpiont,bi1);

?????????readpiont+=bi1;

?????????state.packsize-=bi1;

????????}

???????}

???????if(state.cortrol==9 && readpiont???????{

????????int bi1=bytesread-readpiont;

????????if(bi1????????{

?????????state.packsize=state.packsize-bi1;

?????????readpiont+=bi1;

????????}?

????????else

????????{

?????????state.cortrol=0;

?????????readpiont+=(int)state.packsize;

????????}

???????}

??????}

??????if(handler.connected==true)

??????{

???????handler.beginreceive(state.buffer,0,buffersize,0,

????????new asynccallback(readcallback), state);

??????}

?????}

?????else

?????{

??????handler.shutdown(socketshutdown.both);

??????handler.close();

??????//throw(new exception("读入的数据小于1bit"));

?????}

????}

???}

???catch (exception e)

???{

????onerrorserverevent(new errorservereventargs(e,handler));

????

???}

??}

??

??private void sendcallback(iasyncresult ar)

??{

???socket client = (socket) ar.asyncstate;

???try

???{

????int bytessent = client.endsend(ar);

???}

???catch (exception e)

???{

????onerrorserverevent(new errorservereventargs(e,client));

???}

???finally

???{

????senddone.set();

???}

??}

??

??private void acceptcallback(iasyncresult ar)

??{

???socket handler=null;

???try

???{

????socket listener = (socket) ar.asyncstate;

????handler= listener.endaccept(ar);

????stateobject state = new stateobject(buffersize,handler);

????state.worksocket = handler;

????handler.beginreceive(state.buffer,0,buffersize,0,

?????new asynccallback(readcallback), state);

???}

???catch (exception e)

???{

????onerrorserverevent(new errorservereventargs(e,handler));

???}

???finally

???{

????alldone.set();

???}

??}

??

??///

??/// 析构

??///

??/// 不知道

??protected override void dispose(bool disposing)

??{

???abort();

??}

??

??///

??/// 引发接收事件

??///

??/// 数据

??protected virtual void oninceptserverevent(inceptservereventargs e)

??{

???if (inceptserver != null)

???{

????inceptserver(this, e);

???}

??}

??///

??/// 引发错误事件

??///

??/// 数据

??protected virtual void onerrorserverevent(errorservereventargs e)

??{

???if (errorserver != null)

???{

????errorserver(this, e);

???}

??}

??

??///

??/// 开始监听访问

??///

??public void listening()

??{

???//startlistening();

???thread=new thread(new threadstart(startlistening));

???thread.name="mytcpipserver.listening";

???thread.start();

??}

??///

??/// 异常中止服务

??///

??public void abort()

??{

???if(thread!=null)

???{

????thread.abort();

????listener.close();

???}

??}

??

??///

??///构造

??///

??/// 父控件

??public mytcpipserver(system.componentmodel.icontainer container)

??{

???container.add(this);

???initializecomponent();

???//

???// todo: 在 initializecomponent 调用后添加任何构造函数代码

???//

??}

??///

??/// 构造

??///

??public mytcpipserver()

??{

???initializecomponent();

???//

???// todo: 在 initializecomponent 调用后添加任何构造函数代码

???//

??}

??#region component designer generated code

??///

??/// 设计器支持所需的方法 – 不要使用代码编辑器修改

??/// 此方法的内容。

??///

??private void initializecomponent()

??{

??}

??#endregion

??///

??/// 要连接的服务器ip地址

??///

??public string tcpipserverip

??{

???get

???{

????return tcpipserverip;

???}

???set

???{

????tcpipserverip=value;

???}

??}

??///

??/// 要连接的服务器所使用的端口

??///

??public int tcpipserverport

??{

???get

???{

????return tcpipserverport;

???}

???set

???{

????tcpipserverport=value;

???}

??}

??///

??/// 缓冲器大小

??///

??public int buffersize

??{

???get

???{

????return buffersize;

???}

???set

???{

????buffersize=value;

???}

??}

??

??///

??/// 连接的活动状态

??///

??public bool activ

??{

???get

???{

????return listener.connected;

???}

???//set

???//{

???//?activ=value;

???//}

??}

??///

??/// 发送一个流数据

??///

??public void send(socket clientsocket,stream astream)

??{

???try

???{

????if(clientsocket.connected==false)

????{

?????throw(new exception("没有连接客户端不可以发送信息!"));

????}

????astream.position=0;

????byte[] bytedata=new byte[buffersize];

????int bi1=(int)((astream.length+8)/buffersize);

????int bi2=(int)astream.length;

????if(((astream.length+8)%buffersize)>0)

????{

?????bi1=bi1+1;

????}

????bi1=bi1*buffersize;

????

????bytedata[0]=system.convert.tobyte(bi1>>24);

????bytedata[1]=system.convert.tobyte((bi1&0x00ff0000)>>16);

????bytedata[2]=system.convert.tobyte((bi1&0x0000ff00)>>8);

????bytedata[3]=system.convert.tobyte((bi1&0x000000ff));

????

????bytedata[4]=system.convert.tobyte(bi2>>24);

????bytedata[5]=system.convert.tobyte((bi2&0x00ff0000)>>16);

????bytedata[6]=system.convert.tobyte((bi2&0x0000ff00)>>8);

????bytedata[7]=system.convert.tobyte((bi2&0x000000ff));

????

????int n = astream.read(bytedata, 8, bytedata.length-8);

????

????while (n>0)

????{

?????clientsocket.beginsend(bytedata, 0, bytedata.length, 0,?new asynccallback(sendcallback), clientsocket);

?????senddone.waitone();

?????bytedata=new byte[buffersize];

?????n = astream.read(bytedata,0,bytedata.length);

????}

???}

???catch (exception e)

???{

????onerrorserverevent(new errorservereventargs(e,clientsocket));

???}

??}

??

??///

??/// 接收到数据事件

??///

??public event inceptserverevent inceptserver;

??///

??/// 发生错误事件

??///

??public event errorserverevent errorserver;

?}

?///

?/// 状态对象

?///

?public class stateobject

?{

??///

??/// 构造

??///

??/// 缓存

??/// 工作的插座

??public stateobject(int buffersize,socket worksocket)

??{

???buffer = new byte[buffersize];

???worksocket=worksocket;

??}

??///

??/// 缓存

??///

??public byte[] buffer = null;

??///

??/// 工作插座

??///

??public socket worksocket = null;?????????????

??///

??/// 数据流

??///

??public stream datastream=new memorystream();

??///

??/// 剩余大小

??///

??public long residualsize=0;

??///

??/// 数据包大小

??///

??public long packsize=0;

??///

??/// 计数器

??///

??public int cortrol=0;

?}

?

?///

?/// 接收事件

?///

?public class inceptservereventargs : eventargs

?{?

??private readonly stream datastream;

??private readonly socket serversocket;

??private readonly mytcpipserver tcpipserver;

??///

??/// 构造

??///

??/// 数据

??/// 工作插座

??/// 提供服务的tcp/ip对象

??public inceptservereventargs(stream astream,socket serversocket,mytcpipserver tcpipserver)

??{

???datastream=astream;

???serversocket=serversocket;

???tcpipserver=tcpipserver;

??}

??? ///

??? /// 数据

??? ///

??public stream astream

??{????

???get { return datastream;}?????

??}

??///

??/// 工作插座

??///

??public socket serversocket

??{????

???get { return serversocket;}?????

??}

??///

??/// 提供tcp/ip服务的服务器对象.

??///

??public mytcpipserver tcpipserver

??{????

???get { return tcpipserver;}?????

??}

?}

?///

?/// 接收数据委托

?///

? public delegate void inceptserverevent(object sender, inceptservereventargs e);

?///

?/// 错误事件委托

?///

?public class errorservereventargs : eventargs

?{?

??private readonly exception error;

??private readonly socket serversocket;

??///

??/// 构造

??///

??/// 数据

??/// 问题插座

??public errorservereventargs(exception error,socket serversocket)

??{

???error=error;

???serversocket=serversocket;

??}

??? ///

??? /// 数据

??? ///

??public exception error

??{????

???get { return error;}?????

??}

??///

??/// 问题插座

??///

??public socket serversocket

??{????

???get { return serversocket;}?????

??}

?}

?///

?///错误事件委托

?///

? public delegate void errorserverevent(object sender, errorservereventargs e);

}

完成端口方式是 NT 平台目前最好的一种通信方式,在大吞吐量、大量连接的情况下,有着其他方式无法比拟的效率和性能。由于 C# 的 Socket 在内部使用的就是完成端口方式,因此程序实现起来非常简单,请大家体会。

?

?

赞(0)
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com 特别注意:本站所有转载文章言论不代表本站观点! 本站所提供的图片等素材,版权归原作者所有,如需使用,请与原作者联系。未经允许不得转载:IDC资讯中心 » c#中异步基于消息通信的完成端口的TCP/IP协议的组件实现(源代码)-.NET教程,C#语言
分享到: 更多 (0)

相关推荐

  • 暂无文章