diff --git a/lib/common/const.go b/lib/common/const.go index 2fd5bb6..2f195f3 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -49,6 +49,6 @@ const ( MUX_PING_RETURN MUX_PING int32 = -1 MAXIMUM_SEGMENT_SIZE = PoolSizeWindow - MAXIMUM_WINDOW_SIZE = 1 << 25 // 1<<31-1 TCP slide window size is very large, - // we use 32M, reduce memory usage + MAXIMUM_WINDOW_SIZE = 1 << 27 // 1<<31-1 TCP slide window size is very large, + // we use 128M, reduce memory usage ) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index f6ea2ae..6d65f7c 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -202,6 +202,7 @@ type ReceiveWindow struct { bufQueue ReceiveWindowQueue element *common.ListElement count int8 + bw *bandwidth once sync.Once // receive window send the current max size and read size to send window // means done size actually store the size receive window has read @@ -211,9 +212,10 @@ func (Self *ReceiveWindow) New(mux *Mux) { // initial a window for receive Self.bufQueue.New() Self.element = common.ListElementPool.Get() - Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false) + Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false) Self.mux = mux Self.window.New() + Self.bw = NewBandwidth(nil) } func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) { @@ -232,10 +234,11 @@ func (Self *ReceiveWindow) calcSize() { //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) conns := Self.mux.connMap.Size() n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * - Self.mux.bw.Get() / float64(conns)) + (Self.mux.bw.Get() + Self.bw.Get())) //logs.Warn(n) - if n < common.MAXIMUM_SEGMENT_SIZE*10 { - n = common.MAXIMUM_SEGMENT_SIZE * 10 + if n < common.MAXIMUM_SEGMENT_SIZE*30 { + //logs.Warn("window small", n, Self.mux.bw.Get(), Self.bw.Get()) + n = common.MAXIMUM_SEGMENT_SIZE * 30 } for { ptrs := atomic.LoadUint64(&Self.maxSizeDone) @@ -313,6 +316,13 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { if Self.closeOp { return 0, io.EOF // receive close signal, returns eof } + Self.bw.StartRead() + n, err = Self.readFromQueue(p, id) + Self.bw.SetCopySize(uint16(n)) + return +} + +func (Self *ReceiveWindow) readFromQueue(p []byte, id int32) (n int, err error) { pOff := 0 l := 0 //logs.Warn("receive window read off, element.l", Self.off, Self.element.L) @@ -363,7 +373,7 @@ func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { if read <= (read+uint32(l))&mask31 { read += uint32(l) remain := Self.remainingSize(maxSize, 0) - if wait && remain > 0 || remain == maxSize { + if wait && remain > 0 || read >= maxSize/2 || remain == maxSize { if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) { // now we get the current window status success // receive window free up some space we need acknowledge send window, also reset the read size @@ -441,7 +451,7 @@ type SendWindow struct { func (Self *SendWindow) New(mux *Mux) { Self.setSizeCh = make(chan struct{}) - Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*10, 0, false) + Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false) Self.mux = mux Self.window.New() } @@ -654,82 +664,3 @@ func (Self *SendWindow) SetTimeOut(t time.Time) { // waiting for receive a receive window size Self.timeout = t } - -//type bandwidth struct { -// readStart time.Time -// lastReadStart time.Time -// readEnd time.Time -// lastReadEnd time.Time -// bufLength int -// lastBufLength int -// count int8 -// readBW float64 -// writeBW float64 -// readBandwidth float64 -//} -// -//func (Self *bandwidth) StartRead() { -// Self.lastReadStart, Self.readStart = Self.readStart, time.Now() -// if !Self.lastReadStart.IsZero() { -// if Self.count == -5 { -// Self.calcBandWidth() -// } -// } -//} -// -//func (Self *bandwidth) EndRead() { -// Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now() -// if Self.count == -5 { -// Self.calcWriteBandwidth() -// } -// if Self.count == 0 { -// Self.calcReadBandwidth() -// Self.count = -6 -// } -// Self.count += 1 -//} -// -//func (Self *bandwidth) SetCopySize(n int) { -// // must be invoke between StartRead and EndRead -// Self.lastBufLength, Self.bufLength = Self.bufLength, n -//} -//// calculating -//// start end start end -//// read read -//// write -// -//func (Self *bandwidth) calcBandWidth() { -// t := Self.readStart.Sub(Self.lastReadStart) -// if Self.lastBufLength >= 32768 { -// Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds() -// } -//} -// -//func (Self *bandwidth) calcReadBandwidth() { -// // Bandwidth between nps and npc -// readTime := Self.readEnd.Sub(Self.readStart) -// Self.readBW = float64(Self.bufLength) / readTime.Seconds() -// //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds()) -//} -// -//func (Self *bandwidth) calcWriteBandwidth() { -// // Bandwidth between nps and user, npc and application -// writeTime := Self.readStart.Sub(Self.lastReadEnd) -// Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds() -// //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds()) -//} -// -//func (Self *bandwidth) Get() (bw float64) { -// // The zero value, 0 for numeric types -// if Self.writeBW == 0 && Self.readBW == 0 { -// //logs.Warn("bw both 0") -// return 100 -// } -// if Self.writeBW == 0 && Self.readBW != 0 { -// return Self.readBW -// } -// if Self.readBW == 0 && Self.writeBW != 0 { -// return Self.writeBW -// } -// return Self.readBandwidth -//} diff --git a/lib/mux/mux.go b/lib/mux/mux.go index c0bcdd8..f89bbc4 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -220,7 +220,7 @@ func (s *Mux) pingReturn() { case data = <-s.pingCh: atomic.StoreUint32(&s.pingCheckTime, 0) case <-s.closeChan: - break + return } _ = now.UnmarshalText(data) latency := time.Now().UTC().Sub(now).Seconds() / 2 diff --git a/lib/mux/sysGetsock_nowindows.go b/lib/mux/sysGetsock_nowindows.go index 86c78ad..85d2197 100644 --- a/lib/mux/sysGetsock_nowindows.go +++ b/lib/mux/sysGetsock_nowindows.go @@ -14,7 +14,7 @@ func sysGetSock(fd *os.File) (bufferSize int, err error) { if fd != nil { return syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF) } else { - return 1400 * 320, nil + return 5 * 1024 * 1024, nil } } diff --git a/lib/mux/sysGetsock_windows.go b/lib/mux/sysGetsock_windows.go index 5cf213f..579d620 100644 --- a/lib/mux/sysGetsock_windows.go +++ b/lib/mux/sysGetsock_windows.go @@ -14,7 +14,7 @@ func sysGetSock(fd *os.File) (bufferSize int, err error) { // not support, WTF??? // Todo // return syscall.GetsockoptInt((syscall.Handle)(unsafe.Pointer(fd.Fd())), syscall.SOL_SOCKET, syscall.SO_RCVBUF) - bufferSize = 10 * 1024 * 1024 + bufferSize = 5 * 1024 * 1024 return }