diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 99b6a05..f3217d8 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -4,7 +4,6 @@ import ( "errors" "io" "net" - "strconv" "sync" "sync/atomic" "time" @@ -41,12 +40,12 @@ func NewConn(connId int32, mux *Mux, label ...string) *conn { } c.receiveWindow.New(mux) c.sendWindow.New(mux) - logm := &connLog{ - startTime: time.Now(), - isClose: false, - logs: []string{c.label + "new conn success"}, - } - setM(label[0], int(connId), logm) + //logm := &connLog{ + // startTime: time.Now(), + // isClose: false, + // logs: []string{c.label + "new conn success"}, + //} + //setM(label[0], int(connId), logm) return c } @@ -59,15 +58,15 @@ func (s *conn) Read(buf []byte) (n int, err error) { } // waiting for takeout from receive window finish or timeout n, err = s.receiveWindow.Read(buf, s.connId) - var errstr string - if err == nil { - errstr = "err:nil" - } else { - errstr = err.Error() - } - d := getM(s.label, int(s.connId)) - d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100])) - setM(s.label, int(s.connId), d) + //var errstr string + //if err == nil { + // errstr = "err:nil" + //} else { + // errstr = err.Error() + //} + //d := getM(s.label, int(s.connId)) + //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100])) + //setM(s.label, int(s.connId), d) return } @@ -102,10 +101,10 @@ func (s *conn) closeProcess() { } s.sendWindow.CloseWindow() s.receiveWindow.CloseWindow() - d := getM(s.label, int(s.connId)) - d.isClose = true - d.logs = append(d.logs, s.label+"close "+time.Now().String()) - setM(s.label, int(s.connId), d) + //d := getM(s.label, int(s.connId)) + //d.isClose = true + //d.logs = append(d.logs, s.label+"close "+time.Now().String()) + //setM(s.label, int(s.connId), d) return } @@ -154,12 +153,12 @@ func (Self *window) CloseWindow() { } type ReceiveWindow struct { - bufQueue FIFOQueue + bufQueue ReceiveWindowQueue element *ListElement readLength uint32 readOp chan struct{} readWait bool - windowFull bool + windowFull uint32 count int8 //bw *bandwidth once sync.Once @@ -179,7 +178,7 @@ func (Self *ReceiveWindow) New(mux *Mux) { func (Self *ReceiveWindow) remainingSize() (n uint32) { // receive window remaining - return Self.maxSize - Self.bufQueue.Len() + return atomic.LoadUint32(&Self.maxSize) - Self.bufQueue.Len() } func (Self *ReceiveWindow) readSize() (n uint32) { @@ -207,7 +206,7 @@ func (Self *ReceiveWindow) calcSize() { } // set the maximum size //logs.Warn("n", n) - Self.maxSize = n + atomic.StoreUint32(&Self.maxSize, n) Self.count = -10 } Self.count += 1 @@ -229,7 +228,7 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err Self.calcSize() //logs.Warn("read session calc size finish", Self.maxSize) if Self.remainingSize() == 0 { - Self.windowFull = true + atomic.StoreUint32(&Self.windowFull, 1) //logs.Warn("window full true", Self.windowFull) } Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize()) @@ -259,7 +258,7 @@ copyData: } //logs.Warn("pop element", Self.element.l, Self.element.part) } - l = copy(p[pOff:], Self.element.buf[Self.off:]) + l = copy(p[pOff:], Self.element.buf[Self.off:Self.element.l]) //Self.bw.SetCopySize(l) pOff += l Self.off += uint32(l) @@ -281,13 +280,16 @@ copyData: } func (Self *ReceiveWindow) sendStatus(id int32) { - if Self.windowFull || Self.bufQueue.Len() == 0 { + if Self.bufQueue.Len() == 0 { // window is full before read or empty now - Self.windowFull = false - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize()) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize()) // acknowledge other side, have empty some receive window space //} } + if atomic.LoadUint32(&Self.windowFull) > 0 && Self.remainingSize() > 0 { + atomic.StoreUint32(&Self.windowFull, 0) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize()) + } } func (Self *ReceiveWindow) SetTimeOut(t time.Time) { @@ -309,8 +311,7 @@ type SendWindow struct { buf []byte sentLength uint32 setSizeCh chan struct{} - setSizeWait int32 - unSlide uint32 + setSizeWait uint32 timeout time.Time window } @@ -352,10 +353,10 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) { if Self.RemainingSize() == 0 { //logs.Warn("waiting for another window size after slide") // keep the wait status - atomic.StoreInt32(&Self.setSizeWait, 1) + //atomic.StoreUint32(&Self.setSizeWait, 1) return false } - if atomic.CompareAndSwapInt32(&Self.setSizeWait, 1, 0) { + if atomic.CompareAndSwapUint32(&Self.setSizeWait, 1, 0) { // send window into the wait status, need notice the channel select { case Self.setSizeCh <- struct{}{}: @@ -372,7 +373,7 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) { func (Self *SendWindow) slide(windowSize, readLength uint32) { atomic.AddUint32(&Self.sentLength, ^readLength-1) - atomic.SwapUint32(&Self.maxSize, windowSize) + atomic.StoreUint32(&Self.maxSize, windowSize) } func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) { @@ -386,7 +387,7 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err // send window buff is drain, return eof and get another one } if Self.RemainingSize() == 0 { - atomic.StoreInt32(&Self.setSizeWait, 1) + atomic.StoreUint32(&Self.setSizeWait, 1) // into the wait status err = Self.waitReceiveWindow() if err != nil { @@ -395,20 +396,20 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err } if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE { sendSize = common.MAXIMUM_SEGMENT_SIZE - part = true //logs.Warn("cut buf by mss") } else { sendSize = uint32(len(Self.buf[Self.off:])) - part = false } if Self.RemainingSize() < sendSize { // usable window size is small than // window MAXIMUM_SEGMENT_SIZE or send buf left sendSize = Self.RemainingSize() //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) - part = true } //logs.Warn("send size", sendSize) + if sendSize < uint32(len(Self.buf[Self.off:])) { + part = true + } p = Self.buf[Self.off : sendSize+Self.off] Self.off += sendSize atomic.AddUint32(&Self.sentLength, sendSize) diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 8300977..8a6086c 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -1,7 +1,6 @@ package mux import ( - "bytes" "errors" "io" "math" @@ -29,7 +28,7 @@ type Mux struct { pingTimer *time.Timer connType string writeQueue PriorityQueue - bufCh chan *bytes.Buffer + //bufQueue BytesQueue sync.Mutex } @@ -40,16 +39,16 @@ func NewMux(c net.Conn, connType string) *Mux { conn: c, connMap: NewConnMap(), id: 0, - closeChan: make(chan struct{}, 3), + closeChan: make(chan struct{}, 1), newConnCh: make(chan *conn, 10), bw: new(bandwidth), IsClose: false, connType: connType, - bufCh: make(chan *bytes.Buffer), pingCh: make(chan []byte), pingTimer: time.NewTimer(15 * time.Second), } m.writeQueue.New() + //m.bufQueue.New() //read session by flag m.readSession() //ping @@ -111,16 +110,18 @@ func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { func (s *Mux) writeSession() { go s.packBuf() - go s.writeBuf() + //go s.writeBuf() } func (s *Mux) packBuf() { + buffer := common.BuffPool.Get() for { if s.IsClose { break } + buffer.Reset() pack := s.writeQueue.Pop() - buffer := common.BuffPool.Get() + //buffer := common.BuffPool.Get() err := pack.Pack(buffer) common.MuxPack.Put(pack) if err != nil { @@ -129,34 +130,37 @@ func (s *Mux) packBuf() { break } //logs.Warn(buffer.String()) - select { - case s.bufCh <- buffer: - case <-s.closeChan: + //s.bufQueue.Push(buffer) + l := buffer.Len() + n, err := buffer.WriteTo(s.conn) + //common.BuffPool.Put(buffer) + if err != nil || int(n) != l { + logs.Warn("close from write session fail ", err, n, l) + s.Close() break } } } -func (s *Mux) writeBuf() { - for { - if s.IsClose { - break - } - select { - case buffer := <-s.bufCh: - l := buffer.Len() - n, err := buffer.WriteTo(s.conn) - common.BuffPool.Put(buffer) - if err != nil || int(n) != l { - logs.Warn("close from write session fail ", err, n, l) - s.Close() - break - } - case <-s.closeChan: - break - } - } -} +//func (s *Mux) writeBuf() { +// for { +// if s.IsClose { +// break +// } +// buffer, err := s.bufQueue.Pop() +// if err != nil { +// break +// } +// l := buffer.Len() +// n, err := buffer.WriteTo(s.conn) +// common.BuffPool.Put(buffer) +// if err != nil || int(n) != l { +// logs.Warn("close from write session fail ", err, n, l) +// s.Close() +// break +// } +// } +//} func (s *Mux) ping() { go func() { @@ -310,8 +314,7 @@ func (s *Mux) Close() error { } s.IsClose = true s.connMap.Close() - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} + //s.bufQueue.Stop() s.closeChan <- struct{}{} close(s.newConnCh) return s.conn.Close() diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index 0ac54d5..e3f9dcc 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -4,12 +4,14 @@ import ( "bufio" "fmt" "github.com/cnlh/nps/lib/common" + "io" "log" "net" "net/http" "net/http/httputil" _ "net/http/pprof" "strconv" + "sync" "testing" "time" "unsafe" @@ -48,42 +50,42 @@ func TestNewMux(t *testing.T) { //c2.(*net.TCPConn).SetReadBuffer(0) //c2.(*net.TCPConn).SetReadBuffer(0) go func(c2 net.Conn, c *conn) { - //wg := sync.WaitGroup{} - //wg.Add(1) - //go func() { - // _, err = common.CopyBuffer(c2, c) - // if err != nil { - // c2.Close() - // c.Close() - // logs.Warn("close npc by copy from nps", err, c.connId) - // } - // wg.Done() - //}() - //wg.Add(1) - //go func() { - // _, err = common.CopyBuffer(c, c2) - // if err != nil { - // c2.Close() - // c.Close() - // logs.Warn("close npc by copy from server", err, c.connId) - // } - // wg.Done() - //}() - ////logs.Warn("npc wait") - //wg.Wait() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + _, err = common.CopyBuffer(c2, c) + if err != nil { + c2.Close() + c.Close() + //logs.Warn("close npc by copy from nps", err, c.connId) + } + wg.Done() + }() + wg.Add(1) + go func() { + _, err = common.CopyBuffer(c, c2) + if err != nil { + c2.Close() + c.Close() + //logs.Warn("close npc by copy from server", err, c.connId) + } + wg.Done() + }() + //logs.Warn("npc wait") + wg.Wait() }(c2, c.(*conn)) } }() go func() { - //m1 := NewMux(conn1, "tcp") + m1 := NewMux(conn1, "tcp") l, err := net.Listen("tcp", "127.0.0.1:7777") if err != nil { logs.Warn(err) } for { //logs.Warn("nps starting accept") - _, err := l.Accept() + conns, err := l.Accept() if err != nil { logs.Warn(err) continue @@ -91,37 +93,38 @@ func TestNewMux(t *testing.T) { //conns.(*net.TCPConn).SetReadBuffer(0) //conns.(*net.TCPConn).SetReadBuffer(0) //logs.Warn("nps accept success starting new conn") - //tmpCpnn, err := m1.NewConn() - //if err != nil { - // logs.Warn("nps new conn err ", err) - // continue - //} - ////logs.Warn("nps new conn success ", tmpCpnn.connId) - //go func(tmpCpnn *conn, conns net.Conn) { - // //go func() { - // // _, err := common.CopyBuffer(tmpCpnn, conns) - // // if err != nil { - // // conns.Close() - // // tmpCpnn.Close() - // // logs.Warn("close nps by copy from user", tmpCpnn.connId, err) - // // } - // //}() - // ////time.Sleep(time.Second) - // //_, err = common.CopyBuffer(conns, tmpCpnn) - // //if err != nil { - // // conns.Close() - // // tmpCpnn.Close() - // // logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) - // //} - //}(tmpCpnn, conns) + tmpCpnn, err := m1.NewConn() + if err != nil { + logs.Warn("nps new conn err ", err) + continue + } + //logs.Warn("nps new conn success ", tmpCpnn.connId) + go func(tmpCpnn *conn, conns net.Conn) { + go func() { + _, err := common.CopyBuffer(tmpCpnn, conns) + if err != nil { + conns.Close() + tmpCpnn.Close() + //logs.Warn("close nps by copy from user", tmpCpnn.connId, err) + } + }() + //time.Sleep(time.Second) + _, err = common.CopyBuffer(conns, tmpCpnn) + if err != nil { + conns.Close() + tmpCpnn.Close() + //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) + } + }(tmpCpnn, conns) } }() - go NewLogServer() + //go NewLogServer() time.Sleep(time.Second * 5) for i := 0; i < 1000; i++ { go test_raw(i) } + //test_request() for { time.Sleep(time.Second * 5) @@ -154,7 +157,7 @@ func client() { func test_request() { conn, _ := net.Dial("tcp", "127.0.0.1:7777") for { - conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1 + conn.Write([]byte(`GET / HTTP/1.1 Host: 127.0.0.1:7777 Connection: keep-alive @@ -177,39 +180,42 @@ Connection: keep-alive } func test_raw(k int) { - for i := 0; i < 1; i++ { + for i := 0; i < 10; i++ { ti := time.Now() - _, _ = net.Dial("tcp", "127.0.0.1:7777") + conn, err := net.Dial("tcp", "127.0.0.1:7777") + if err != nil { + logs.Warn("conn dial err", err) + } tid := time.Now() - // conn.Write([]byte(`GET / HTTP/1.1 - //Host: 127.0.0.1:7777 - // - // - //`)) - // tiw := time.Now() - //buf := make([]byte, 3572) - //n, err := io.ReadFull(conn, buf) - ////n, err := conn.Read(buf) - //if err != nil { - // logs.Warn("close by read response err", err) - // break - //} - ////logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) - ////time.Sleep(time.Second) - //err = conn.Close() - //if err != nil { - // logs.Warn("close conn err ", err) - //} + conn.Write([]byte(`GET / HTTP/1.1 +Host: 127.0.0.1:7777 + + +`)) + tiw := time.Now() + buf := make([]byte, 3572) + n, err := io.ReadFull(conn, buf) + //n, err := conn.Read(buf) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) + //time.Sleep(time.Second) + err = conn.Close() + if err != nil { + logs.Warn("close conn err ", err) + } now := time.Now() du := now.Sub(ti).Seconds() dud := now.Sub(tid).Seconds() - //duw := now.Sub(tiw).Seconds() - //if du > 1 { - logs.Warn("duration long", du, dud, k, i) - //} - //if n != 3572 { - // logs.Warn("n loss", n, string(buf)) - //} + duw := now.Sub(tiw).Seconds() + if du > 1 { + logs.Warn("duration long", du, dud, duw, k, i) + } + if n != 3572 { + logs.Warn("n loss", n, string(buf)) + } } } @@ -293,11 +299,11 @@ func TestFIFO(t *testing.T) { logs.EnableFuncCallDepth(true) logs.SetLogFuncCallDepth(3) time.Sleep(time.Second * 5) - d := new(FIFOQueue) + d := new(ReceiveWindowQueue) d.New() go func() { time.Sleep(time.Second) - for i := 0; i < 30000; i++ { + for i := 0; i < 30010; i++ { data, err := d.Pop() if err == nil { //fmt.Println(i, string(data.buf), err) @@ -306,7 +312,9 @@ func TestFIFO(t *testing.T) { //fmt.Println("err", err) logs.Warn("err", err) } + //logs.Warn(d.Len()) } + logs.Warn("pop finish") }() go func() { time.Sleep(time.Second * 10) @@ -314,10 +322,10 @@ func TestFIFO(t *testing.T) { go func(i int) { for n := 0; n < 10; n++ { data := new(ListElement) - by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) + by := []byte("test " + strconv.Itoa(i) + " " + strconv.Itoa(n)) // _ = data.New(by, uint16(len(by)), true) //fmt.Println(string((*data).buf), data) - logs.Warn(string((*data).buf), data) + //logs.Warn(string((*data).buf), data) d.Push(data) } }(i) @@ -337,11 +345,12 @@ func TestPriority(t *testing.T) { d.New() go func() { time.Sleep(time.Second) - for i := 0; i < 36000; i++ { + for i := 0; i < 36005; i++ { data := d.Pop() //fmt.Println(i, string(data.buf), err) logs.Warn(i, string(data.Content), data) } + logs.Warn("pop finish") }() go func() { time.Sleep(time.Second * 10) diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 488c616..6ed2dd6 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -1,10 +1,12 @@ package mux import ( + "bytes" "errors" "github.com/cnlh/nps/lib/common" "io" "math" + "runtime" "sync/atomic" "time" "unsafe" @@ -13,7 +15,7 @@ import ( type QueueOp struct { readOp chan struct{} cleanOp chan struct{} - popWait int32 + popWait uint32 } func (Self *QueueOp) New() { @@ -22,7 +24,7 @@ func (Self *QueueOp) New() { } func (Self *QueueOp) allowPop() (closed bool) { - if atomic.CompareAndSwapInt32(&Self.popWait, 1, 0) { + if atomic.CompareAndSwapUint32(&Self.popWait, 1, 0) { select { case Self.readOp <- struct{}{}: return false @@ -44,7 +46,7 @@ type PriorityQueue struct { highestChain *bufChain middleChain *bufChain lowestChain *bufChain - hunger uint8 + starving uint8 } func (Self *PriorityQueue) New() { @@ -73,7 +75,7 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) { return } -const maxHunger uint8 = 10 +const maxStarving uint8 = 8 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { startPop: @@ -82,31 +84,32 @@ startPop: packager = (*common.MuxPackager)(ptr) return } - if Self.hunger < maxHunger { + if Self.starving < maxStarving { ptr, ok = Self.middleChain.popTail() if ok { packager = (*common.MuxPackager)(ptr) - Self.hunger++ + Self.starving++ return } } ptr, ok = Self.lowestChain.popTail() if ok { packager = (*common.MuxPackager)(ptr) - if Self.hunger > 0 { - Self.hunger = uint8(Self.hunger / 2) + if Self.starving > 0 { + Self.starving = uint8(Self.starving / 2) } return } - if Self.hunger > 0 { + if Self.starving > 0 { ptr, ok = Self.middleChain.popTail() if ok { packager = (*common.MuxPackager)(ptr) + Self.starving++ return } } // PriorityQueue is empty, notice Push method - if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) { + if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { select { case <-Self.readOp: goto startPop @@ -133,7 +136,7 @@ func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) { return nil } -type FIFOQueue struct { +type ReceiveWindowQueue struct { QueueOp chain *bufChain length uint32 @@ -141,21 +144,21 @@ type FIFOQueue struct { timeout time.Time } -func (Self *FIFOQueue) New() { +func (Self *ReceiveWindowQueue) New() { Self.QueueOp.New() Self.chain = new(bufChain) Self.chain.new(64) Self.stopOp = make(chan struct{}, 1) } -func (Self *FIFOQueue) Push(element *ListElement) { +func (Self *ReceiveWindowQueue) Push(element *ListElement) { Self.chain.pushHead(unsafe.Pointer(element)) atomic.AddUint32(&Self.length, uint32(element.l)) Self.allowPop() return } -func (Self *FIFOQueue) Pop() (element *ListElement, err error) { +func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) { startPop: ptr, ok := Self.chain.popTail() if ok { @@ -163,7 +166,7 @@ startPop: atomic.AddUint32(&Self.length, ^uint32(element.l-1)) return } - if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) { + if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { t := Self.timeout.Sub(time.Now()) if t <= 0 { t = time.Minute @@ -186,18 +189,62 @@ startPop: goto startPop } -func (Self *FIFOQueue) Len() (n uint32) { +func (Self *ReceiveWindowQueue) Len() (n uint32) { return atomic.LoadUint32(&Self.length) } -func (Self *FIFOQueue) Stop() { +func (Self *ReceiveWindowQueue) Stop() { Self.stopOp <- struct{}{} } -func (Self *FIFOQueue) SetTimeOut(t time.Time) { +func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) { Self.timeout = t } +type BytesQueue struct { + QueueOp + chain *bufChain + stopOp chan struct{} +} + +func (Self *BytesQueue) New() { + Self.QueueOp.New() + Self.chain = new(bufChain) + Self.chain.new(8) + Self.stopOp = make(chan struct{}, 1) +} + +func (Self *BytesQueue) Push(buf *bytes.Buffer) { + Self.chain.pushHead(unsafe.Pointer(buf)) + Self.allowPop() + return +} + +func (Self *BytesQueue) Pop() (buf *bytes.Buffer, err error) { +startPop: + ptr, ok := Self.chain.popTail() + if ok { + buf = (*bytes.Buffer)(ptr) + return + } + if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { + select { + case <-Self.readOp: + goto startPop + case <-Self.cleanOp: + return + case <-Self.stopOp: + err = io.EOF + return + } + } + goto startPop +} + +func (Self *BytesQueue) Stop() { + Self.stopOp <- struct{}{} +} + // https://golang.org/src/sync/poolqueue.go type bufDequeue struct { @@ -224,7 +271,8 @@ type bufDequeue struct { // index has moved beyond it and typ has been set to nil. This // is set to nil atomically by the consumer and read // atomically by the producer. - vals []unsafe.Pointer + vals []unsafe.Pointer + starving uint32 } const dequeueBits = 32 @@ -253,6 +301,10 @@ func (d *bufDequeue) pack(head, tail uint32) uint64 { // queue is full. func (d *bufDequeue) pushHead(val unsafe.Pointer) bool { var slot *unsafe.Pointer + var starve uint8 + if atomic.LoadUint32(&d.starving) > 0 { + runtime.Gosched() + } for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) @@ -263,8 +315,15 @@ func (d *bufDequeue) pushHead(val unsafe.Pointer) bool { ptrs2 := d.pack(head+1, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { slot = &d.vals[head&uint32(len(d.vals)-1)] + if starve >= 3 && atomic.LoadUint32(&d.starving) > 0 { + atomic.StoreUint32(&d.starving, 0) + } break } + starve++ + if starve >= 3 { + atomic.StoreUint32(&d.starving, 1) + } } // The head slot is free, so we own it. *slot = val @@ -321,8 +380,8 @@ type bufChain struct { // tail is the bufDequeue to popTail from. This is accessed // by consumers, so reads and writes must be atomic. - tail *bufChainElt - chainStatus int32 + tail *bufChainElt + newChain uint32 } type bufChainElt struct { @@ -359,30 +418,39 @@ func (c *bufChain) new(initSize int) { } func (c *bufChain) pushHead(val unsafe.Pointer) { +startPush: for { - d := loadPoolChainElt(&c.head) - - if d.pushHead(val) { - return - } - - // The current dequeue is full. Allocate a new one of twice - // the size. - if atomic.CompareAndSwapInt32(&c.chainStatus, 0, 1) { - newSize := len(d.vals) * 2 - if newSize >= dequeueLimit { - // Can't make it any bigger. - newSize = dequeueLimit - } - - d2 := &bufChainElt{prev: d} - d2.vals = make([]unsafe.Pointer, newSize) - d2.pushHead(val) - storePoolChainElt(&d.next, d2) - storePoolChainElt(&c.head, d2) - atomic.StoreInt32(&c.chainStatus, 0) + if atomic.LoadUint32(&c.newChain) > 0 { + runtime.Gosched() + } else { + break } } + + d := loadPoolChainElt(&c.head) + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) { + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &bufChainElt{prev: d} + d2.vals = make([]unsafe.Pointer, newSize) + d2.pushHead(val) + storePoolChainElt(&c.head, d2) + storePoolChainElt(&d.next, d2) + atomic.StoreUint32(&c.newChain, 0) + return + } + goto startPush } func (c *bufChain) popTail() (unsafe.Pointer, bool) {