diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index cf4b988..c6956f5 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -246,6 +246,15 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { return } +func (Self *MuxPackager) reset() { + Self.Id = 0 + Self.Flag = 0 + Self.Length = 0 + Self.Content = nil + Self.ReadLength = 0 + Self.Window = 0 +} + const ( ipV4 = 1 domainName = 3 diff --git a/lib/common/pool.go b/lib/common/pool.go index 3ed19dc..31931f9 100644 --- a/lib/common/pool.go +++ b/lib/common/pool.go @@ -93,18 +93,20 @@ type windowBufferPool struct { func (Self *windowBufferPool) New() { Self.pool = sync.Pool{ New: func() interface{} { - return make([]byte, PoolSizeWindow, PoolSizeWindow) + return make([]byte, PoolSizeWindow) }, } } func (Self *windowBufferPool) Get() (buf []byte) { buf = Self.pool.Get().([]byte) - return buf[:PoolSizeWindow] + buf = buf[:PoolSizeWindow] + return buf } func (Self *windowBufferPool) Put(x []byte) { - Self.pool.Put(x[:PoolSizeWindow]) // make buf to full + x = x[:0] // clean buf + Self.pool.Put(x) } type bufferPool struct { @@ -146,6 +148,7 @@ func (Self *muxPackagerPool) Get() *MuxPackager { } func (Self *muxPackagerPool) Put(pack *MuxPackager) { + pack.reset() Self.pool.Put(pack) } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 28ec1de..0f28acb 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -2,6 +2,7 @@ package mux import ( "errors" + "github.com/astaxie/beego/logs" "io" "math" "net" @@ -215,10 +216,10 @@ func (Self *ReceiveWindow) calcSize() { if n < common.MAXIMUM_SEGMENT_SIZE*10 { n = common.MAXIMUM_SEGMENT_SIZE * 10 } - bufLen := Self.bufQueue.Len() - if n < bufLen { - n = bufLen - } + //bufLen := Self.bufQueue.Len() + //if n < bufLen { + // n = bufLen + //} if n < Self.maxSize/2 { n = Self.maxSize / 2 } @@ -227,6 +228,7 @@ func (Self *ReceiveWindow) calcSize() { n = 2 * Self.maxSize } if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) { + logs.Warn("window too large", n) n = common.MAXIMUM_WINDOW_SIZE / uint32(conns) } // set the maximum size diff --git a/lib/mux/mux.go b/lib/mux/mux.go index ae9f335..ab02323 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -5,7 +5,9 @@ import ( "io" "math" "net" + "os" "sync/atomic" + "syscall" "time" "github.com/astaxie/beego/logs" @@ -35,13 +37,17 @@ func NewMux(c net.Conn, connType string) *Mux { //c.(*net.TCPConn).SetReadBuffer(0) //c.(*net.TCPConn).SetWriteBuffer(0) _ = c.SetDeadline(time.Time{}) + fd, err := getConnFd(c) + if err != nil { + logs.Warn(err) + } m := &Mux{ conn: c, connMap: NewConnMap(), id: 0, closeChan: make(chan struct{}, 1), newConnCh: make(chan *conn), - bw: new(bandwidth), + bw: NewBandwidth(fd), IsClose: false, connType: connType, pingCh: make(chan []byte), @@ -58,6 +64,26 @@ func NewMux(c net.Conn, connType string) *Mux { return m } +func getConnFd(c net.Conn) (fd *os.File, err error) { + switch c.(type) { + case *net.TCPConn: + fd, err = c.(*net.TCPConn).File() + if err != nil { + return + } + return + case *net.UDPConn: + fd, err = c.(*net.UDPConn).File() + if err != nil { + return + } + return + default: + err = errors.New("mux:unknown conn type, only tcp or kcp") + return + } +} + func (s *Mux) NewConn() (*conn, error) { if s.IsClose { return nil, errors.New("the mux has closed") @@ -392,13 +418,19 @@ type bandwidth struct { readStart time.Time lastReadStart time.Time bufLength uint32 + fd *os.File + calcThreshold uint32 +} + +func NewBandwidth(fd *os.File) *bandwidth { + return &bandwidth{fd: fd} } func (Self *bandwidth) StartRead() { if Self.readStart.IsZero() { Self.readStart = time.Now() } - if Self.bufLength >= common.MAXIMUM_SEGMENT_SIZE*300 { + if Self.bufLength >= Self.calcThreshold { Self.lastReadStart, Self.readStart = Self.readStart, time.Now() Self.calcBandWidth() } @@ -410,7 +442,21 @@ func (Self *bandwidth) SetCopySize(n uint16) { func (Self *bandwidth) calcBandWidth() { t := Self.readStart.Sub(Self.lastReadStart) - atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds())) + bufferSize, err := syscall.GetsockoptInt(int(Self.fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF) + //logs.Warn(bufferSize) + if err != nil { + logs.Warn(err) + Self.bufLength = 0 + return + } + if Self.bufLength >= uint32(bufferSize) { + atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds())) + // calculate the hole socket buffer, the time meaning to fill the buffer + //logs.Warn(Self.Get()) + } else { + Self.calcThreshold = uint32(bufferSize) + } + // socket buffer size is bigger than bufLength, so we don't calculate it Self.bufLength = 0 }