diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index ec2cb69..69ce96a 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -157,11 +157,11 @@ type MuxPackager struct { func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { Self.Flag = flag Self.Id = id - if flag == MUX_NEW_MSG || flag == MUX_NEW_MSG_PART || flag == MUX_PING_FLAG { + switch flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: err = Self.BasePackager.NewPac(content...) - } - if flag == MUX_MSG_SEND_OK { - // MUX_MSG_SEND_OK only allows one data + case MUX_MSG_SEND_OK: + // MUX_MSG_SEND_OK contains two data switch content[0].(type) { case int: Self.Window = uint32(content[0].(int)) @@ -187,10 +187,10 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { if err != nil { return } - if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG { + switch Self.Flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: err = Self.BasePackager.Pack(writer) - } - if Self.Flag == MUX_MSG_SEND_OK { + case MUX_MSG_SEND_OK: err = binary.Write(writer, binary.LittleEndian, Self.Window) if err != nil { return @@ -210,10 +210,10 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { if err != nil { return } - if Self.Flag == MUX_NEW_MSG || Self.Flag == MUX_NEW_MSG_PART || Self.Flag == MUX_PING_FLAG { + switch Self.Flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: err = Self.BasePackager.UnPack(reader) - } - if Self.Flag == MUX_MSG_SEND_OK { + case MUX_MSG_SEND_OK: err = binary.Read(reader, binary.LittleEndian, &Self.Window) if err != nil { return diff --git a/lib/mux/conn.go b/lib/mux/conn.go index f4d5396..5dd69ea 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -2,7 +2,6 @@ package mux import ( "errors" - "github.com/astaxie/beego/logs" "io" "math" "net" @@ -169,14 +168,13 @@ func (Self *ReceiveWindow) ReadSize() (n uint32) { n = Self.readLength Self.readLength = 0 Self.bufQueue.mutex.Unlock() - Self.count += 1 return } func (Self *ReceiveWindow) CalcSize() { // calculating maximum receive window size if Self.count == 0 { - logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) + //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) n := uint32(2 * Self.mux.latency * Self.bw.Get()) if n < 8192 { n = 8192 @@ -185,10 +183,11 @@ func (Self *ReceiveWindow) CalcSize() { n = Self.bufQueue.Len() } // set the minimal size - logs.Warn("n", n) + //logs.Warn("n", n) Self.maxSize = n Self.count = -5 } + Self.count += 1 } func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) { @@ -205,7 +204,7 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err //logs.Warn("read session calc size ", Self.maxSize) // calculating the receive window size Self.CalcSize() - logs.Warn("read session calc size finish", Self.maxSize) + //logs.Warn("read session calc size finish", Self.maxSize) if Self.RemainingSize() == 0 { Self.windowFull = true //logs.Warn("window full true", Self.windowFull) @@ -325,10 +324,10 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) { return true } if readLength == 0 && Self.maxSize == windowSize { - logs.Warn("waiting for another window size") + //logs.Warn("waiting for another window size") return false // waiting for receive another usable window size } - logs.Warn("set send window size to ", windowSize, readLength) + //logs.Warn("set send window size to ", windowSize, readLength) Self.mutex.Lock() Self.slide(windowSize, readLength) if Self.setSizeWait { @@ -513,7 +512,7 @@ func (Self *bandwidth) calcWriteBandwidth() { func (Self *bandwidth) Get() (bw float64) { // The zero value, 0 for numeric types if Self.writeBW == 0 && Self.readBW == 0 { - logs.Warn("bw both 0") + //logs.Warn("bw both 0") return 100 } if Self.writeBW == 0 && Self.readBW != 0 { diff --git a/lib/mux/mux.go b/lib/mux/mux.go index e6a9e67..9023b82 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -150,7 +150,7 @@ func (s *Mux) writeBuf() { func (s *Mux) ping() { go func() { - now, _ := time.Now().MarshalText() + now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) // send the ping flag and get the latency first ticker := time.NewTicker(time.Second * 15) @@ -166,7 +166,7 @@ func (s *Mux) ping() { if (math.MaxInt32 - s.id) < 10000 { s.id = 0 } - now, _ := time.Now().MarshalText() + now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) if s.pingOk > 10 && s.connType == "kcp" { s.Close() @@ -188,8 +188,11 @@ func (s *Mux) pingReturn() { break } _ = now.UnmarshalText(data) - s.latency = time.Since(now).Seconds() - s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) + s.latency = time.Now().UTC().Sub(now).Seconds() / 2 + //logs.Warn("latency", s.latency) + if s.latency <= 0 { + logs.Warn("latency err", s.latency) + } } }() } @@ -214,9 +217,10 @@ func (s *Mux) readSession() { s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) continue case common.MUX_PING_FLAG: //ping - s.pingCh <- pack.Content + s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) continue case common.MUX_PING_RETURN: + s.pingCh <- pack.Content continue } if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {