diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index c6956f5..45456c5 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -162,10 +162,9 @@ func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { } type MuxPackager struct { - Flag uint8 - Id int32 - Window uint32 - ReadLength uint32 + Flag uint8 + Id int32 + Window uint64 BasePackager } @@ -178,19 +177,8 @@ func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (e err = Self.BasePackager.NewPac(content...) //logs.Warn(Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: - // MUX_MSG_SEND_OK contains two data - switch content[0].(type) { - case int: - Self.Window = uint32(content[0].(int)) - case uint32: - Self.Window = content[0].(uint32) - } - switch content[1].(type) { - case int: - Self.ReadLength = uint32(content[1].(int)) - case uint32: - Self.ReadLength = content[1].(uint32) - } + // MUX_MSG_SEND_OK contains one data + Self.Window = content[0].(uint64) } return } @@ -210,10 +198,6 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { WindowBuff.Put(Self.Content) case MUX_MSG_SEND_OK: err = binary.Write(writer, binary.LittleEndian, Self.Window) - if err != nil { - return - } - err = binary.Write(writer, binary.LittleEndian, Self.ReadLength) } return } @@ -235,12 +219,7 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { //logs.Warn("unpack", Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: err = binary.Read(reader, binary.LittleEndian, &Self.Window) - if err != nil { - return - } - n += 4 // uint32 - err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) - n += 4 // uint32 + n += 8 // uint64 } n += 5 //uint8 int32 return @@ -251,7 +230,6 @@ func (Self *MuxPackager) reset() { Self.Flag = 0 Self.Length = 0 Self.Content = nil - Self.ReadLength = 0 Self.Window = 0 } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 52fb642..a2b2f5f 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -3,6 +3,7 @@ package mux import ( "errors" "github.com/astaxie/beego/logs" + "github.com/cnlh/nps/lib/common" "io" "math" "net" @@ -10,8 +11,6 @@ import ( "sync" "sync/atomic" "time" - - "github.com/cnlh/nps/lib/common" ) type conn struct { @@ -146,25 +145,44 @@ func (s *conn) SetWriteDeadline(t time.Time) error { } type window struct { - remainingWait uint64 // 64bit alignment - off uint32 - maxSize uint32 - closeOp bool - closeOpCh chan struct{} - mux *Mux + maxSizeDone uint64 + // 64bit alignment + // maxSizeDone contains 4 parts + // 1 31 1 31 + // wait maxSize useless done + // wait zero means false, one means true + off uint32 + closeOp bool + closeOpCh chan struct{} + mux *Mux } -func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) { - const mask = 1<> dequeueBits) & mask) - wait = uint32(ptrs & mask) +const windowBits = 31 +const waitBits = dequeueBits + windowBits +const mask1 = 1 +const mask31 = 1<> dequeueBits) & mask31) + done = uint32(ptrs & mask31) + //logs.Warn("unpack", maxSize, done) + if ((ptrs >> waitBits) & mask1) == 1 { + wait = true + return + } return } -func (Self *window) pack(remaining, wait uint32) uint64 { - const mask = 1< 0 { n = uint32(l) @@ -213,27 +233,33 @@ func (Self *ReceiveWindow) calcSize() { conns := Self.mux.connMap.Size() n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * Self.mux.bw.Get() / float64(conns)) + //logs.Warn(n) if n < common.MAXIMUM_SEGMENT_SIZE*10 { n = common.MAXIMUM_SEGMENT_SIZE * 10 } - //bufLen := Self.bufQueue.Len() - //if n < bufLen { - // n = bufLen - //} - if n < Self.maxSize/2 { - n = Self.maxSize / 2 + for { + ptrs := atomic.LoadUint64(&Self.maxSizeDone) + size, read, wait := Self.unpack(ptrs) + if n < size/2 { + n = size / 2 + // half reduce + } + // set the minimal size + if n > 2*size { + n = 2 * size + // twice grow + } + if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) { + //logs.Warn("window too large", n) + n = common.MAXIMUM_WINDOW_SIZE / uint32(conns) + } + // set the maximum size + //logs.Warn("n", n) + if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(n, read, wait)) { + // only change the maxSize + break + } } - // set the minimal size - if n > 2*Self.maxSize { - n = 2 * Self.maxSize - } - if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) { - logs.Warn("window too large", n) - n = common.MAXIMUM_WINDOW_SIZE / uint32(conns) - } - // set the maximum size - //logs.Warn("n", n) - atomic.StoreUint32(&Self.maxSize, n) Self.count = -10 } Self.count += 1 @@ -245,30 +271,40 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err return errors.New("conn.receiveWindow: write on closed window") } element, err := NewListElement(buf, l, part) - //logs.Warn("push the buf", len(buf), l, (&element).l) + //logs.Warn("push the buf", len(buf), l, element.L) if err != nil { return } Self.calcSize() // calculate the max window size - var wait uint32 + var wait bool + var maxSize, read uint32 start: - ptrs := atomic.LoadUint64(&Self.remainingWait) - _, wait = Self.unpack(ptrs) - newRemaining := Self.remainingSize(l) + ptrs := atomic.LoadUint64(&Self.maxSizeDone) + maxSize, read, wait = Self.unpack(ptrs) + remain := Self.remainingSize(maxSize, l) // calculate the remaining window size now, plus the element we will push - if newRemaining == 0 { + if remain == 0 && !wait { //logs.Warn("window full true", remaining) - wait = 1 - } - if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) { - goto start - // another goroutine change the status, make sure shall we need wait - } + wait = true + if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) { + // only change the wait status, not send the read size + goto start + // another goroutine change the status, make sure shall we need wait + } + //logs.Warn("receive window full") + } else if !wait { + if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, wait)) { + // reset read size here, and send the read size directly + goto start + // another goroutine change the status, make sure shall we need wait + } + } // maybe there are still some data received even if window is full, just keep the wait status + // and push into queue. when receive window read enough, send window will be acknowledged. Self.bufQueue.Push(element) // status check finish, now we can push the element into the queue - if wait == 0 { - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining) - // send the remaining window size, not including zero size + if !wait { + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false)) + // send the current status to send window } return nil } @@ -279,7 +315,7 @@ func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { } pOff := 0 l := 0 - //logs.Warn("receive window read off, element.l", Self.off, Self.element.l) + //logs.Warn("receive window read off, element.l", Self.off, Self.element.L) copyData: if Self.off == uint32(Self.element.L) { // on the first Read method invoked, Self.off and Self.element.l @@ -291,14 +327,13 @@ copyData: Self.element, err = Self.bufQueue.Pop() // if the queue is empty, Pop method will wait until one element push // into the queue successful, or timeout. - // timer start on timeout parameter is set up , - // reset to 60s if timeout and data still available + // timer start on timeout parameter is set up Self.off = 0 if err != nil { Self.CloseWindow() // also close the window, to avoid read twice return // queue receive stop or time out, break the loop and return } - //logs.Warn("pop element", Self.element.l, Self.element.part) + //logs.Warn("pop element", Self.element.L, Self.element.Part) } l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L]) pOff += l @@ -320,22 +355,41 @@ copyData: } func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { - var remaining, wait uint32 + var maxSize, read uint32 + var wait bool for { - ptrs := atomic.LoadUint64(&Self.remainingWait) - remaining, wait = Self.unpack(ptrs) - remaining += uint32(l) - if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) { - break + ptrs := atomic.LoadUint64(&Self.maxSizeDone) + maxSize, read, wait = Self.unpack(ptrs) + if read <= (read+uint32(l))&mask31 { + read += uint32(l) + remain := Self.remainingSize(maxSize, 0) + if wait && remain > 0 || remain == maxSize { + if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) { + // now we get the current window status success + // receive window free up some space we need acknowledge send window, also reset the read size + // still having a condition that receive window is empty and not send the status to send window + // so send the status here + //logs.Warn("receive window free up some space", remain) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false)) + break + } + } else { + if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) { + // receive window not into the wait status, or still not having any space now, + // just change the read size + break + } + } + } else { + //overflow + if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, uint32(l), wait)) { + // reset to l + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false)) + break + } } runtime.Gosched() // another goroutine change remaining or wait status, make sure - // we need acknowledge other side - } - // now we get the current window status success - if wait == 1 { - //logs.Warn("send the wait status", remaining) - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining) } return } @@ -380,12 +434,14 @@ type SendWindow struct { buf []byte setSizeCh chan struct{} timeout time.Time + // send window receive the receive window max size and read size + // done size store the size send window has send, send and read will be totally equal + // so send minus read, send window can get the current window size remaining } func (Self *SendWindow) New(mux *Mux) { Self.setSizeCh = make(chan struct{}) - Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10 - atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)< 0 { + return uint32(l) + } + return 0 +} + +func (Self *SendWindow) SetSize(currentMaxSizeDone uint64) (closed bool) { // set the window size from receive window defer func() { if recover() != nil { @@ -408,26 +472,34 @@ func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) { return true } //logs.Warn("set send window size to ", windowSize, newRemaining) - var remaining, wait, newWait uint32 + var maxsize, send uint32 + var wait, newWait bool + currentMaxSize, read, _ := Self.unpack(currentMaxSizeDone) for { - ptrs := atomic.LoadUint64(&Self.remainingWait) - remaining, wait = Self.unpack(ptrs) - if remaining == newRemaining { - //logs.Warn("waiting for another window size") - return false // waiting for receive another usable window size + ptrs := atomic.LoadUint64(&Self.maxSizeDone) + maxsize, send, wait = Self.unpack(ptrs) + if read > send { + logs.Error("read > send") + return } - if newRemaining == 0 && wait == 1 { - newWait = 1 // keep the wait status, - // also if newRemaining is not zero, change wait to 0 + if read == 0 && currentMaxSize == maxsize { + return } - if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) { + send -= read + remain := Self.remainingSize(currentMaxSize, send) + if remain == 0 && wait { + // just keep the wait status + newWait = true + } + // remain > 0, change wait to false. or remain == 0, wait is false, just keep it + if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(currentMaxSize, send, newWait)) { break } // anther goroutine change wait status or window size } - if wait == 1 { + if wait && !newWait { // send window into the wait status, need notice the channel - //logs.Warn("send window remaining size is 0") + //logs.Warn("send window allow") Self.allow() } // send window not into the wait status, so just do slide @@ -446,18 +518,20 @@ func (Self *SendWindow) allow() { } func (Self *SendWindow) sent(sentSize uint32) { - var remaining, wait uint32 + var maxSie, send uint32 + var wait bool for { - ptrs := atomic.LoadUint64(&Self.remainingWait) - remaining, wait = Self.unpack(ptrs) - if remaining >= sentSize { - atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)< common.MAXIMUM_SEGMENT_SIZE { sendSize = common.MAXIMUM_SEGMENT_SIZE //logs.Warn("cut buf by mss") } else { sendSize = uint32(len(Self.buf[Self.off:])) } - if remaining < sendSize { + if remain < sendSize { // usable window size is small than // window MAXIMUM_SEGMENT_SIZE or send buf left - sendSize = remaining + sendSize = remain //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) } //logs.Warn("send size", sendSize) diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 254945e..c0bcdd8 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -303,7 +303,7 @@ func (s *Mux) readSession() { if connection.isClose { continue } - connection.sendWindow.SetSize(pack.Window, pack.ReadLength) + connection.sendWindow.SetSize(pack.Window) continue case common.MUX_CONN_CLOSE: //close the connection connection.closeFlag = true diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index fb5adb2..0d71ee4 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/goroutine" - "github.com/xtaci/kcp-go" "io" "log" "net" @@ -34,8 +33,8 @@ func TestNewMux(t *testing.T) { //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false)) time.Sleep(time.Second * 3) go func() { - //m2 := NewMux(conn2, "tcp") - m2 := NewMux(conn2, "kcp") + m2 := NewMux(conn2, "tcp") + //m2 := NewMux(conn2, "kcp") for { //logs.Warn("npc starting accept") c, err := m2.Accept() @@ -84,8 +83,8 @@ func TestNewMux(t *testing.T) { }() go func() { - //m1 := NewMux(conn1, "tcp") - m1 := NewMux(conn1, "kcp") + m1 := NewMux(conn1, "tcp") + //m1 := NewMux(conn1, "kcp") l, err := net.Listen("tcp", "127.0.0.1:7777") if err != nil { logs.Warn(err) @@ -147,14 +146,14 @@ func TestNewMux(t *testing.T) { func server() { var err error - //l, err := net.Listen("tcp", "127.0.0.1:9999") - l, err := kcp.Listen("127.0.0.1:9999") + l, err := net.Listen("tcp", "127.0.0.1:9999") + //l, err := kcp.Listen("127.0.0.1:9999") if err != nil { logs.Warn(err) } go func() { conn1, err = l.Accept() - logs.Info("accept", conn1) + //logs.Info("accept", conn1) if err != nil { logs.Warn(err) } @@ -164,9 +163,9 @@ func server() { func client() { var err error - //conn2, err = net.Dial("tcp", "127.0.0.1:9999") - logs.Warn("dial") - conn2, err = kcp.Dial("127.0.0.1:9999") + conn2, err = net.Dial("tcp", "127.0.0.1:9999") + //logs.Warn("dial") + //conn2, err = kcp.Dial("127.0.0.1:9999") if err != nil { logs.Warn(err) }