From 51a3787708bf735d24b239f2b56f44af2f2e0ee7 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Sun, 1 Sep 2019 22:52:48 +0800 Subject: [PATCH] remove mux write queue, add connection close once --- lib/mux/conn.go | 46 +++++++++++++++-------------- lib/mux/mux.go | 77 ++++++++++++++++++++++++++++--------------------- 2 files changed, 69 insertions(+), 54 deletions(-) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 7430454..09cac16 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -6,6 +6,7 @@ import ( "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "io" "net" + "sync" "time" ) @@ -27,9 +28,10 @@ type conn struct { isClose bool readWait bool sendClose bool // MUX_CONN_CLOSE already send - writeClose bool // close conn Write + closeFlag bool // close conn flag hasWrite int mux *Mux + once sync.Once } func NewConn(connId int32, mux *Mux) *conn { @@ -41,6 +43,7 @@ func NewConn(connId int32, mux *Mux) *conn { readQueue: NewQueue(), connId: connId, mux: mux, + once: sync.Once{}, } return c } @@ -72,18 +75,14 @@ func (s *conn) Read(buf []byte) (n int, err error) { logs.Warn("conn close by read pop err", s.connId, err) s.Close() return 0, io.EOF + } else if node.val == nil { + s.sendClose = true + logs.Warn("conn close by read ", s.connId) + s.Close() } else { - if node.val == nil { - //close - s.sendClose = true - logs.Warn("conn close by read ", s.connId) - s.Close() - return 0, io.EOF - } else { - s.readBuffer = node.val - s.endRead = node.l - s.startRead = 0 - } + s.readBuffer = node.val + s.endRead = node.l + s.startRead = 0 } } if len(buf) < s.endRead-s.startRead { @@ -101,7 +100,7 @@ func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, errors.New("the conn has closed") } - if s.writeClose { + if s.closeFlag { s.sendClose = true logs.Warn("conn close by write ", s.connId) s.Close() @@ -131,11 +130,11 @@ func (s *conn) write(buf []byte, ch chan struct{}) { l := len(buf) for { if l-start > common.PoolSizeCopy { - logs.Warn("conn write > poolsizecopy") + //logs.Warn("conn write > poolsizecopy") s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:start+common.PoolSizeCopy]) start += common.PoolSizeCopy } else { - logs.Warn("conn write <= poolsizecopy, start, len", start, l) + //logs.Warn("conn write <= poolsizecopy, start, len", start, l) s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf[start:l]) break } @@ -144,20 +143,25 @@ func (s *conn) write(buf []byte, ch chan struct{}) { } func (s *conn) Close() (err error) { + s.once.Do(s.closeProcess) + return +} + +func (s *conn) closeProcess() { if s.isClose { - return errors.New("the conn has closed") + logs.Warn("has closed ", s.connId) + return } s.isClose = true + s.readWait = false s.mux.connMap.Delete(s.connId) common.CopyBuff.Put(s.readBuffer) - if s.readWait { - s.readCh <- struct{}{} - } + close(s.readCh) s.readQueue.Clear() if !s.mux.IsClose { if !s.sendClose { - logs.Warn("conn send close") - go s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) + logs.Warn("conn send close", s.connId) + s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) } } return diff --git a/lib/mux/mux.go b/lib/mux/mux.go index c40427d..1b90c60 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -41,7 +41,7 @@ func NewMux(c net.Conn, connType string) *Mux { go m.readSession() //ping go m.ping() - go m.writeSession() + //go m.writeSession() return m } @@ -68,7 +68,7 @@ func (s *Mux) NewConn() (*conn, error) { func (s *Mux) Accept() (net.Conn, error) { if s.IsClose { - return nil, errors.New("accpet error,the conn has closed") + return nil, errors.New("accpet error,the mux has closed") } conn := <-s.newConnCh if conn == nil { @@ -91,31 +91,29 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) { buf := common.BuffPool.Get() //defer pool.BuffPool.Put(buf) pack := common.MuxPack.Get() + defer common.MuxPack.Put(pack) err = pack.NewPac(flag, id, content) if err != nil { - s.Close() logs.Warn("new pack err", err) common.BuffPool.Put(buf) return } err = pack.Pack(buf) if err != nil { - s.Close() logs.Warn("pack err", err) common.BuffPool.Put(buf) return } - s.writeQueue <- buf - common.MuxPack.Put(pack) - //_, err = buf.WriteTo(s.conn) - //if err != nil { - // s.Close() - // logs.Warn("write err, close mux", err) - //} - //if flag == common.MUX_CONN_CLOSE { - //} - //if flag == common.MUX_NEW_MSG { - //} + if pack.Flag == common.MUX_NEW_CONN { + logs.Warn("sendinfo mux new conn, insert to write queue", pack.Id) + } + //s.writeQueue <- buf + _, err = buf.WriteTo(s.conn) + if err != nil { + s.Close() + logs.Warn("write err, close mux", err) + } + common.BuffPool.Put(buf) return } @@ -127,7 +125,7 @@ func (s *Mux) writeSession() { n, err := buf.WriteTo(s.conn) common.BuffPool.Put(buf) if err != nil || int(n) != l { - logs.Warn("close from write to ", err, n, l) + logs.Warn("close from write session fail ", err, n, l) s.Close() break } @@ -163,8 +161,8 @@ func (s *Mux) ping() { func (s *Mux) readSession() { go func() { + pack := common.MuxPack.Get() for { - pack := common.MuxPack.Get() if pack.UnPack(s.conn) != nil { break } @@ -173,10 +171,13 @@ func (s *Mux) readSession() { //logs.Warn(pack.Flag, pack.Id, pack.Length, string(pack.Content[:10])) } } + if pack.Flag == common.MUX_NEW_CONN { + logs.Warn("unpack mux new conn", pack.Id) + } s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new conn - logs.Warn("mux new conn", pack.Id) + logs.Warn("rec mux new conn", pack.Id) conn := NewConn(pack.Id, s) s.connMap.Set(pack.Id, conn) //it has been set before send ok s.newConnCh <- conn @@ -194,38 +195,48 @@ func (s *Mux) readSession() { switch pack.Flag { case common.MUX_NEW_MSG: //new msg from remote conn //insert wait queue - logs.Warn("mux new msg ", pack.Id) - conn.readQueue.Push(NewBufNode(pack.Content, int(pack.Length))) + buf := common.CopyBuff.Get() + buf = pack.Content + logs.Warn("rec mux new msg ", pack.Id, string(buf[0:15])) + conn.readQueue.Push(NewBufNode(buf, int(pack.Length))) //judge len if >xxx ,send stop if conn.readWait { conn.readWait = false conn.readCh <- struct{}{} } + continue case common.MUX_NEW_CONN_OK: //conn ok - logs.Warn("mux new conn ok ", pack.Id) + logs.Warn("rec mux new conn ok ", pack.Id) conn.connStatusOkCh <- struct{}{} + continue case common.MUX_NEW_CONN_Fail: - logs.Warn("mux new conn fail", pack.Id) + logs.Warn("rec mux new conn fail", pack.Id) conn.connStatusFailCh <- struct{}{} + continue case common.MUX_CONN_CLOSE: //close the connection - logs.Warn("mux conn close", pack.Id) + logs.Warn("rec mux conn close", pack.Id) s.connMap.Delete(pack.Id) - conn.writeClose = true - conn.readQueue.Push(NewBufNode(nil, 0)) - if conn.readWait { - logs.Warn("close read wait", pack.Id) - conn.readWait = false - conn.readCh <- struct{}{} + conn.closeFlag = true + conn.sendClose = true + if !conn.isClose { + conn.readQueue.Push(NewBufNode(nil, 0)) + if conn.readWait { + logs.Warn("mux conn close read wait", pack.Id) + conn.readWait = false + conn.readCh <- struct{}{} + logs.Warn("mux conn close read wait pass", pack.Id) + } } logs.Warn("receive mux conn close, finish", conn.connId) + continue } - } else if pack.Flag == common.MUX_NEW_MSG { - common.CopyBuff.Put(pack.Content) } else if pack.Flag == common.MUX_CONN_CLOSE { - logs.Warn("mux conn close no id ", pack.Id) + logs.Warn("rec mux conn close no id ", pack.Id) + continue } - common.MuxPack.Put(pack) } + common.MuxPack.Put(pack) + logs.Warn("read session put pack ", pack.Id) s.Close() }() select {