diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 7d2e351..23bc5d5 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -23,7 +23,7 @@ type conn struct { receiveWindow *ReceiveWindow sendWindow *SendWindow once sync.Once - label string + //label string } func NewConn(connId int32, mux *Mux, label ...string) *conn { @@ -36,9 +36,9 @@ func NewConn(connId int32, mux *Mux, label ...string) *conn { sendWindow: new(SendWindow), once: sync.Once{}, } - if len(label) > 0 { - c.label = label[0] - } + //if len(label) > 0 { + // c.label = label[0] + //} c.receiveWindow.New(mux) c.sendWindow.New(mux) //logm := &connLog{ diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 585c980..b64243f 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -14,21 +14,20 @@ import ( type Mux struct { net.Listener - conn net.Conn - connMap *connMap - newConnCh chan *conn - id int32 - closeChan chan struct{} - IsClose bool - pingOk int - latency float64 - bw *bandwidth - pingCh chan []byte - pingTimer *time.Timer - connType string - writeQueue PriorityQueue - //bufQueue BytesQueue - //sync.Mutex + conn net.Conn + connMap *connMap + newConnCh chan *conn + id int32 + closeChan chan struct{} + IsClose bool + pingOk int + latency float64 + bw *bandwidth + pingCh chan []byte + pingCheck bool + connType string + writeQueue PriorityQueue + newConnQueue ConnQueue } func NewMux(c net.Conn, connType string) *Mux { @@ -39,15 +38,14 @@ func NewMux(c net.Conn, connType string) *Mux { connMap: NewConnMap(), id: 0, closeChan: make(chan struct{}, 1), - newConnCh: make(chan *conn, 10), + newConnCh: make(chan *conn), bw: new(bandwidth), IsClose: false, connType: connType, pingCh: make(chan []byte), - pingTimer: time.NewTimer(15 * time.Second), } m.writeQueue.New() - //m.bufQueue.New() + m.newConnQueue.New() //read session by flag m.readSession() //ping @@ -101,6 +99,8 @@ func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { err = pack.NewPac(flag, id, data...) if err != nil { common.MuxPack.Put(pack) + logs.Error("mux: new pack err") + s.Close() return } s.writeQueue.Push(pack) @@ -124,7 +124,7 @@ func (s *Mux) packBuf() { err := pack.Pack(buffer) common.MuxPack.Put(pack) if err != nil { - logs.Warn("pack err", err) + logs.Error("mux: pack err", err) common.BuffPool.Put(buffer) break } @@ -134,7 +134,7 @@ func (s *Mux) packBuf() { n, err := buffer.WriteTo(s.conn) //common.BuffPool.Put(buffer) if err != nil || int(n) != l { - logs.Warn("close from write session fail ", err, n, l) + logs.Error("mux: close from write session fail ", err, n, l) s.Close() break } @@ -170,21 +170,23 @@ func (s *Mux) ping() { for { if s.IsClose { ticker.Stop() - if !s.pingTimer.Stop() { - <-s.pingTimer.C - } break } select { case <-ticker.C: } + if s.pingCheck { + logs.Error("mux: ping time out") + s.Close() + // more than 5 seconds not receive the ping return package, + // mux conn is damaged, maybe a packet drop, close it + break + } now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) - if !s.pingTimer.Stop() { - <-s.pingTimer.C - } - s.pingTimer.Reset(15 * time.Second) + s.pingCheck = true if s.pingOk > 10 && s.connType == "kcp" { + logs.Error("mux: kcp ping err") s.Close() break } @@ -203,12 +205,9 @@ func (s *Mux) pingReturn() { } select { case data = <-s.pingCh: + s.pingCheck = false case <-s.closeChan: break - case <-s.pingTimer.C: - logs.Error("mux: ping time out") - s.Close() - break } _ = now.UnmarshalText(data) latency := time.Now().UTC().Sub(now).Seconds() / 2 @@ -222,6 +221,15 @@ func (s *Mux) pingReturn() { } func (s *Mux) readSession() { + go func() { + var connection *conn + for { + connection = s.newConnQueue.Pop() + s.connMap.Set(connection.connId, connection) //it has been set before send ok + s.newConnCh <- connection + s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) + } + }() go func() { pack := common.MuxPack.Get() var l uint16 @@ -233,18 +241,16 @@ func (s *Mux) readSession() { pack = common.MuxPack.Get() s.bw.StartRead() if l, err = pack.UnPack(s.conn); err != nil { + logs.Error("mux: read session unpack from connection err") + s.Close() break } s.bw.SetCopySize(l) s.pingOk = 0 switch pack.Flag { case common.MUX_NEW_CONN: //new connection - connection := NewConn(pack.Id, s, "npc ") - s.connMap.Set(pack.Id, connection) //it has been set before send ok - //go func(connection *conn) { - s.newConnCh <- connection - s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) - //}(connection) + connection := NewConn(pack.Id, s) + s.newConnQueue.Push(connection) continue case common.MUX_PING_FLAG: //ping s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) @@ -261,6 +267,7 @@ func (s *Mux) readSession() { case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection err = s.newMsg(connection, pack) if err != nil { + logs.Error("mux: read session connection new msg err") connection.Close() } continue diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 4790779..0bfea18 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -33,6 +33,14 @@ func (Self *PriorityQueue) New() { } func (Self *PriorityQueue) Push(packager *common.MuxPackager) { + //logs.Warn("push start") + Self.push(packager) + Self.cond.Broadcast() + //logs.Warn("push finish") + return +} + +func (Self *PriorityQueue) push(packager *common.MuxPackager) { switch packager.Flag { case common.MUX_PING_FLAG, common.MUX_PING_RETURN: Self.highestChain.pushHead(unsafe.Pointer(packager)) @@ -44,8 +52,6 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) { default: Self.lowestChain.pushHead(unsafe.Pointer(packager)) } - Self.cond.Signal() - return } const maxStarving uint8 = 8 @@ -121,6 +127,72 @@ func (Self *PriorityQueue) Stop() { Self.cond.Broadcast() } +type ConnQueue struct { + chain *bufChain + starving uint8 + stop bool + cond *sync.Cond +} + +func (Self *ConnQueue) New() { + Self.chain = new(bufChain) + Self.chain.new(32) + locker := new(sync.Mutex) + Self.cond = sync.NewCond(locker) +} + +func (Self *ConnQueue) Push(connection *conn) { + Self.chain.pushHead(unsafe.Pointer(connection)) + Self.cond.Broadcast() + return +} + +func (Self *ConnQueue) Pop() (connection *conn) { + var iter bool + for { + connection = Self.pop() + if connection != nil { + return + } + if Self.stop { + return + } + if iter { + break + // trying to pop twice + } + iter = true + runtime.Gosched() + } + Self.cond.L.Lock() + defer Self.cond.L.Unlock() + for connection = Self.pop(); connection == nil; { + if Self.stop { + return + } + //logs.Warn("queue into wait") + Self.cond.Wait() + // wait for it with no more iter + connection = Self.pop() + //logs.Warn("queue wait finish", packager) + } + return +} + +func (Self *ConnQueue) pop() (connection *conn) { + ptr, ok := Self.chain.popTail() + if ok { + connection = (*conn)(ptr) + return + } + return +} + +func (Self *ConnQueue) Stop() { + Self.stop = true + Self.cond.Broadcast() +} + func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) { if uint16(len(buf)) != l { err = errors.New("ListElement: buf length not match") @@ -180,24 +252,12 @@ startPop: if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) { goto startPop // another goroutine is pushing } - t := Self.timeout.Sub(time.Now()) - if t <= 0 { - t = time.Minute - } - timer := time.NewTimer(t) - defer timer.Stop() - //logs.Warn("queue into wait") - select { - case <-Self.readOp: - //logs.Warn("queue wait finish") - goto startPop - case <-Self.stopOp: - err = io.EOF - return - case <-timer.C: - err = errors.New("mux.queue: read time out") + err = Self.waitPush() + // there is no more data in queue, wait for it + if err != nil { return } + goto startPop // wait finish, trying to get the new status } // length is not zero, so try to pop for { @@ -223,6 +283,29 @@ func (Self *ReceiveWindowQueue) allowPop() (closed bool) { } } +func (Self *ReceiveWindowQueue) waitPush() (err error) { + //logs.Warn("wait push") + //defer logs.Warn("wait push finish") + t := Self.timeout.Sub(time.Now()) + if t <= 0 { + t = time.Second * 10 + } + timer := time.NewTimer(t) + defer timer.Stop() + //logs.Warn("queue into wait") + select { + case <-Self.readOp: + //logs.Warn("queue wait finish") + return nil + case <-Self.stopOp: + err = io.EOF + return + case <-timer.C: + err = errors.New("mux.queue: read time out") + return + } +} + func (Self *ReceiveWindowQueue) Len() (n uint32) { ptrs := atomic.LoadUint64(&Self.lengthWait) n, _ = Self.chain.head.unpack(ptrs)