高性能tcpserver – 1.网络通信协议

高性能tcpserver – 2.创建高性能socket服务器socketasynceventargs的实现(iocp)

高性能tcpserver – 3.命令通道(处理:掉包,粘包,垃圾包)

高性能tcpserver – 4.文件通道(处理:文件分包,支持断点续传)

高性能tcpserver – 5.客户端管理

高性能tcpserver – 6.代码下载

 

socketasynceventargs对象管理 — 用于checkout/checkin socketasynceventargs对象

socketargspool socketargspool = new socketargspool(max_clientcount);

this.m_eventargs = this.m_socketargspool.checkout();// 初始化对象

this.m_bufferpool.checkin(m_eventargs);// 回收对象

 

socketargsbufferpool对象管理 — 用于checkout/checkin socketasynceventargs的buffer

socketargsbufferpool bufferpool = new socketargsbufferpool(max_clientcount, max_clientbuffersize);

this.m_bufferpool.checkout(this.m_eventargs);// 设置setbuffer

this.m_bufferpool.checkin(m_eventargs);// 回收对象

 

socketentitypool对象管理 — 用于checkout/checkin socketentity

socketentitypool socketentitypool = new socketentitypool(max_clientcount, max_clientbuffersize);// 初始化

m_socketentity = this.m_socketentitypool.checkout();

m_socketentity.socketclient = socket;

m_bufferrecv = m_socketentity.bufferrecv; m_bufferrecv.clear();// 每个client的接收缓冲区

m_handle = m_socketentity.protocolhandle;// 每个client的处理类

m_analysis = m_socketentity.protocolanalysis;// 每个client的解析类

this.m_socketentitypool.checkin(socketentity);// 回收对象

 

部分代码

服务器监听和接收客户端连接

public void start(int port)

        {

            ipendpoint ipep = new ipendpoint(ipaddress.any, port);

            this.m_listenersocket = new socket(addressfamily.internetwork, sockettype.stream, protocoltype.tcp);

            this.m_listenersocket.bind(ipep);

            this.m_listenersocket.listen(100);

            listenforconnection(m_listenerargs);

        }

 

        void listenforconnection(socketasynceventargs args)

        {

            lock (this)

            {

                args.acceptsocket = null;

                m_listenersocket.invokeasyncmethod(new socketasyncmethod(m_listenersocket.acceptasync), acceptasynccompleted, args);

            }

        }

 

        void acceptasynccompleted(object sender, socketasynceventargs e)

        {

            if (e.socketerror == socketerror.operationaborted)

            {

                cloghelp.appendlog(“[error] acceptasynccompleted:socketerror.operationaborted”);

                return; //server was stopped

            }

 

            if (e.socketerror == socketerror.success)

            {

                socket acceptsocket = e.acceptsocket;

                if (acceptsocket != null)

                {

                    if (connections + 1 <= max_clientcount)

                    {

                        ipendpoint clientep = (ipendpoint)acceptsocket.remoteendpoint;

                        sn = string.format(“{0}:{1}”, clientep.address.tostring(), clientep.port);

                        lock (lockindex)

                        {

                            connections = interlocked.increment(ref connections);

                            program.addmessage(“已连接,sn:” + sn + “,当前连接数:” + cserverintance.connections.tostring());

                        }

                        csocketdao socketdao = new csocketdao(socketargspool, bufferpool, socketentitypool, acceptsocket, sn);

                        csingleton<cclientmgr>.getinstance().addonlineclient(socketdao);

                    }

                    else

                    {

                        program.addmessage(“超过最大连接数:” + max_clientcount.tostring() + “,拒接连接”);

                    }

                }

            }

            

            //continue to accept!

            listenforconnection(e);

        }

服务器数据处理

 void receiveasynccompleted(object sender, socketasynceventargs e)

        {

            if (!this.m_connected) return;

 

            try

            {

                m_eventargs = e;

 

                if (m_eventargs.bytestransferred == 0)

                {

                    socketcatcherror(“bytestransferred=0”); //graceful disconnect

                    return;

                }

                if (m_eventargs.socketerror != socketerror.success)

                {

                    socketcatcherror(“socketerror=” + (e.socketerror).tostring()); //not graceful disconnect

                    return;

                }

 

                //数据存储

                recvtime = datetime.now;

                m_bufferrecv.put(e);

                m_analysis.bagstatus = cprotocolanalysis.ebagstatus.bagnone;

 

                // 粘包处理

                while (m_bufferrecv.hasremaining())

                {

                    // 掉包处理

                    if (cprotocolanalysis.ebagstatus.baglost == m_analysis.bagstatus) break;

 

                    m_handle.process(m_bufferrecv, m_analysis, m_strsn);// 数据解析(垃圾包处理)

 

                    if (string.isnullorempty(m_struid))

                    {

                        if (!string.isnullorempty(m_analysis.uid))

                        {

                            m_struid = m_analysis.uid;

                            csingleton<cclientmgr>.getinstance().addclientuid(m_struid, m_strsn, this);

                        }

                    }

 

                    if (m_analysis.whethertosend)

                    {

                        string data = cprotocolbase.getprotocol(m_analysis);

                        sendrealtime(data);

                    }

                }

   

                listenfordata(e);

            }

            catch (exception ex)

            {

                cloghelp.appendlog(“[error] receiveasynccompleted,errmsg:” + ex.message);

            }

        }