From 6bbe276b18a088a21125966ce38a380aacd2f2df Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Sat, 4 Jan 2020 20:49:17 +0800 Subject: [PATCH] change window calculation, bandwidth calculation --- lib/mux/conn.go | 72 +++++++++++++++++++++++++++++++++++++++++++------ lib/mux/mux.go | 2 +- 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 6d65f7c..85ba1ef 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -202,7 +202,7 @@ type ReceiveWindow struct { bufQueue ReceiveWindowQueue element *common.ListElement count int8 - bw *bandwidth + bw *writeBandwidth once sync.Once // receive window send the current max size and read size to send window // means done size actually store the size receive window has read @@ -215,7 +215,7 @@ func (Self *ReceiveWindow) New(mux *Mux) { Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false) Self.mux = mux Self.window.New() - Self.bw = NewBandwidth(nil) + Self.bw = NewWriteBandwidth() } func (Self *ReceiveWindow) remainingSize(maxSize uint32, delta uint16) (n uint32) { @@ -232,9 +232,15 @@ func (Self *ReceiveWindow) calcSize() { // calculating maximum receive window size if Self.count == 0 { //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) - conns := Self.mux.connMap.Size() - n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * - (Self.mux.bw.Get() + Self.bw.Get())) + //conns := Self.mux.connMap.Size() + muxBw := Self.mux.bw.Get() + connBw := Self.bw.Get() + //logs.Warn("muxbw connbw", muxBw, connBw) + var n uint32 + if connBw > 0 && muxBw > 0 { + n = uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * + (muxBw + connBw)) + } //logs.Warn(n) if n < common.MAXIMUM_SEGMENT_SIZE*30 { //logs.Warn("window small", n, Self.mux.bw.Get(), Self.bw.Get()) @@ -252,9 +258,12 @@ func (Self *ReceiveWindow) calcSize() { n = 2 * size // twice grow } - if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) { - logs.Warn("window too large, calculated:", n, "limit:", common.MAXIMUM_WINDOW_SIZE/uint32(conns)) - n = common.MAXIMUM_WINDOW_SIZE / uint32(conns) + if connBw > 0 && muxBw > 0 { + limit := uint32(common.MAXIMUM_WINDOW_SIZE * (connBw / (muxBw + connBw))) + if n > limit { + logs.Warn("window too large, calculated:", n, "limit:", limit, connBw, muxBw) + n = limit + } } // set the maximum size //logs.Warn("n", n) @@ -664,3 +673,50 @@ func (Self *SendWindow) SetTimeOut(t time.Time) { // waiting for receive a receive window size Self.timeout = t } + +type writeBandwidth struct { + writeBW uint64 // store in bits, but it's float64 + readEnd time.Time + duration float64 + bufLength uint32 +} + +const writeCalcThreshold uint32 = 5 * 1024 * 1024 + +func NewWriteBandwidth() *writeBandwidth { + return &writeBandwidth{} +} + +func (Self *writeBandwidth) StartRead() { + if Self.readEnd.IsZero() { + Self.readEnd = time.Now() + } + Self.duration += time.Now().Sub(Self.readEnd).Seconds() + if Self.bufLength >= writeCalcThreshold { + Self.calcBandWidth() + } +} + +func (Self *writeBandwidth) SetCopySize(n uint16) { + Self.bufLength += uint32(n) + Self.endRead() +} + +func (Self *writeBandwidth) endRead() { + Self.readEnd = time.Now() +} + +func (Self *writeBandwidth) calcBandWidth() { + atomic.StoreUint64(&Self.writeBW, math.Float64bits(float64(Self.bufLength)/Self.duration)) + Self.bufLength = 0 + Self.duration = 0 +} + +func (Self *writeBandwidth) Get() (bw float64) { + // The zero value, 0 for numeric types + bw = math.Float64frombits(atomic.LoadUint64(&Self.writeBW)) + if bw <= 0 { + bw = 0 + } + return +} diff --git a/lib/mux/mux.go b/lib/mux/mux.go index f89bbc4..52bd81c 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -443,7 +443,7 @@ func (Self *bandwidth) Get() (bw float64) { // The zero value, 0 for numeric types bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth)) if bw <= 0 { - bw = 100 + bw = 0 } //logs.Warn(bw) return