fix queue bug

ffdfgdfg committed 2019-10-27 23:25:12 +08:00
parent 442354db17
commit 5f35415849
4 changed files with 275 additions and 194 deletions

lib/mux/conn.go

@@ -4,7 +4,6 @@ import (
 	"errors"
 	"io"
 	"net"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -41,12 +40,12 @@ func NewConn(connId int32, mux *Mux, label ...string) *conn {
 	}
 	c.receiveWindow.New(mux)
 	c.sendWindow.New(mux)
-	logm := &connLog{
-		startTime: time.Now(),
-		isClose:   false,
-		logs:      []string{c.label + "new conn success"},
-	}
-	setM(label[0], int(connId), logm)
+	//logm := &connLog{
+	//	startTime: time.Now(),
+	//	isClose:   false,
+	//	logs:      []string{c.label + "new conn success"},
+	//}
+	//setM(label[0], int(connId), logm)
 	return c
 }
@@ -59,15 +58,15 @@ func (s *conn) Read(buf []byte) (n int, err error) {
 	}
 	// waiting for takeout from receive window finish or timeout
 	n, err = s.receiveWindow.Read(buf, s.connId)
-	var errstr string
-	if err == nil {
-		errstr = "err:nil"
-	} else {
-		errstr = err.Error()
-	}
-	d := getM(s.label, int(s.connId))
-	d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
-	setM(s.label, int(s.connId), d)
+	//var errstr string
+	//if err == nil {
+	//	errstr = "err:nil"
+	//} else {
+	//	errstr = err.Error()
+	//}
+	//d := getM(s.label, int(s.connId))
+	//d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
+	//setM(s.label, int(s.connId), d)
 	return
 }
@@ -102,10 +101,10 @@ func (s *conn) closeProcess() {
 	}
 	s.sendWindow.CloseWindow()
 	s.receiveWindow.CloseWindow()
-	d := getM(s.label, int(s.connId))
-	d.isClose = true
-	d.logs = append(d.logs, s.label+"close "+time.Now().String())
-	setM(s.label, int(s.connId), d)
+	//d := getM(s.label, int(s.connId))
+	//d.isClose = true
+	//d.logs = append(d.logs, s.label+"close "+time.Now().String())
+	//setM(s.label, int(s.connId), d)
 	return
 }
@@ -154,12 +153,12 @@ func (Self *window) CloseWindow() {
 }

 type ReceiveWindow struct {
-	bufQueue   FIFOQueue
+	bufQueue   ReceiveWindowQueue
 	element    *ListElement
 	readLength uint32
 	readOp     chan struct{}
 	readWait   bool
-	windowFull bool
+	windowFull uint32
 	count      int8
 	//bw         *bandwidth
 	once       sync.Once
@@ -179,7 +178,7 @@ func (Self *ReceiveWindow) New(mux *Mux) {

 func (Self *ReceiveWindow) remainingSize() (n uint32) {
 	// receive window remaining
-	return Self.maxSize - Self.bufQueue.Len()
+	return atomic.LoadUint32(&Self.maxSize) - Self.bufQueue.Len()
 }

 func (Self *ReceiveWindow) readSize() (n uint32) {
@@ -207,7 +206,7 @@ func (Self *ReceiveWindow) calcSize() {
 		}
 		// set the maximum size
 		//logs.Warn("n", n)
-		Self.maxSize = n
+		atomic.StoreUint32(&Self.maxSize, n)
 		Self.count = -10
 	}
 	Self.count += 1
@@ -229,7 +228,7 @@ func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err
 	Self.calcSize()
 	//logs.Warn("read session calc size finish", Self.maxSize)
 	if Self.remainingSize() == 0 {
-		Self.windowFull = true
+		atomic.StoreUint32(&Self.windowFull, 1)
 		//logs.Warn("window full true", Self.windowFull)
 	}
 	Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
@@ -259,7 +258,7 @@ copyData:
 		}
 		//logs.Warn("pop element", Self.element.l, Self.element.part)
 	}
-	l = copy(p[pOff:], Self.element.buf[Self.off:])
+	l = copy(p[pOff:], Self.element.buf[Self.off:Self.element.l])
 	//Self.bw.SetCopySize(l)
 	pOff += l
 	Self.off += uint32(l)
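Why the new upper bound in that copy matters: a ListElement's buf appears to come from a shared buffer pool, so the slice can be longer than the element's valid payload length l, and slicing from Self.off alone would let copy read stale bytes left over from a previous use of the buffer. A standalone illustration with made-up sizes:

    // el.buf is a pooled 4 KiB slice of which only el.l bytes are valid
    el := struct {
        buf []byte
        l   uint16
    }{buf: make([]byte, 4096), l: 10}
    dst := make([]byte, 64)
    n := copy(dst, el.buf[0:el.l]) // n == 10: stops at the valid payload
    // copy(dst, el.buf[0:]) would return 64, padding dst with stale pool bytes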
@@ -281,13 +280,16 @@ copyData:
 }

 func (Self *ReceiveWindow) sendStatus(id int32) {
-	if Self.windowFull || Self.bufQueue.Len() == 0 {
+	if Self.bufQueue.Len() == 0 {
 		// window is full before read or empty now
-		Self.windowFull = false
-		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, Self.readSize())
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize())
 		// acknowledge other side, have empty some receive window space
 		//}
 	}
+	if atomic.LoadUint32(&Self.windowFull) > 0 && Self.remainingSize() > 0 {
+		atomic.StoreUint32(&Self.windowFull, 0)
+		Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), Self.readSize())
+	}
 }

 func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
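This sendStatus rewrite is the receive-side half of the fix: windowFull is now a uint32 touched only through sync/atomic (Write sets it, sendStatus clears it from the other goroutine), and the flag is only cleared, with a MUX_MSG_SEND_OK acknowledgment sent, once remainingSize() is genuinely positive again, so the peer can no longer be told to resume while the window is still full. The pattern, distilled into a sketch where a hypothetical ack callback stands in for mux.sendInfo:

    var windowFull uint32 // shared; 1 means the receive window is exhausted

    // producer side, on exhausting the window:
    //   atomic.StoreUint32(&windowFull, 1)

    // consumer side, after handing data to the reader:
    func maybeAck(remaining uint32, ack func()) {
        if atomic.LoadUint32(&windowFull) > 0 && remaining > 0 {
            atomic.StoreUint32(&windowFull, 0) // clear only once space truly exists
            ack()                              // tell the peer it may send again
        }
    }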
@@ -309,8 +311,7 @@ type SendWindow struct {
 	buf         []byte
 	sentLength  uint32
 	setSizeCh   chan struct{}
-	setSizeWait int32
-	unSlide     uint32
+	setSizeWait uint32
 	timeout     time.Time
 	window
 }
@@ -352,10 +353,10 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {
 	if Self.RemainingSize() == 0 {
 		//logs.Warn("waiting for another window size after slide")
 		// keep the wait status
-		atomic.StoreInt32(&Self.setSizeWait, 1)
+		//atomic.StoreUint32(&Self.setSizeWait, 1)
 		return false
 	}
-	if atomic.CompareAndSwapInt32(&Self.setSizeWait, 1, 0) {
+	if atomic.CompareAndSwapUint32(&Self.setSizeWait, 1, 0) {
 		// send window into the wait status, need notice the channel
 		select {
 		case Self.setSizeCh <- struct{}{}:
@@ -372,7 +373,7 @@ func (Self *SendWindow) SetSize(windowSize, readLength uint32) (closed bool) {

 func (Self *SendWindow) slide(windowSize, readLength uint32) {
 	atomic.AddUint32(&Self.sentLength, ^readLength-1)
-	atomic.SwapUint32(&Self.maxSize, windowSize)
+	atomic.StoreUint32(&Self.maxSize, windowSize)
 }

 func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
@@ -386,7 +387,7 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err
 		// send window buff is drain, return eof and get another one
 	}
 	if Self.RemainingSize() == 0 {
-		atomic.StoreInt32(&Self.setSizeWait, 1)
+		atomic.StoreUint32(&Self.setSizeWait, 1)
 		// into the wait status
 		err = Self.waitReceiveWindow()
 		if err != nil {
@@ -395,20 +396,20 @@ func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err err
 	}
 	if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
 		sendSize = common.MAXIMUM_SEGMENT_SIZE
-		part = true
 		//logs.Warn("cut buf by mss")
 	} else {
 		sendSize = uint32(len(Self.buf[Self.off:]))
-		part = false
 	}
 	if Self.RemainingSize() < sendSize {
 		// usable window size is small than
 		// window MAXIMUM_SEGMENT_SIZE or send buf left
 		sendSize = Self.RemainingSize()
 		//logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
-		part = true
 	}
 	//logs.Warn("send size", sendSize)
+	if sendSize < uint32(len(Self.buf[Self.off:])) {
+		part = true
+	}
 	p = Self.buf[Self.off : sendSize+Self.off]
 	Self.off += sendSize
 	atomic.AddUint32(&Self.sentLength, sendSize)
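The WriteTo change untangles the part flag: it used to be assigned in every clamping branch, so the window clamp could mark the true tail of the buffer as partial; now sendSize is clamped first by common.MAXIMUM_SEGMENT_SIZE and then by the remaining window, and part is computed once at the end from whether bytes are actually left behind. The rule, restated as a self-contained sketch (nextSegment, window, and mss are illustrative names, not the codebase's API):

    // nextSegment picks how much of buf[off:] fits into one frame; part is
    // derived last, from the final sendSize, never from which clamp fired
    func nextSegment(buf []byte, off, window, mss uint32) (seg []byte, part bool) {
        left := uint32(len(buf)) - off
        sendSize := left
        if sendSize > mss {
            sendSize = mss // cut by maximum segment size
        }
        if window < sendSize {
            sendSize = window // cut by the peer's remaining receive window
        }
        part = sendSize < left // partial only if this frame doesn't finish the buffer
        return buf[off : off+sendSize], part
    }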

lib/mux/mux.go

@@ -1,7 +1,6 @@
 package mux

 import (
-	"bytes"
 	"errors"
 	"io"
 	"math"
@@ -29,7 +28,7 @@ type Mux struct {
 	pingTimer  *time.Timer
 	connType   string
 	writeQueue PriorityQueue
-	bufCh      chan *bytes.Buffer
+	//bufQueue BytesQueue
 	sync.Mutex
 }
@@ -40,16 +39,16 @@ func NewMux(c net.Conn, connType string) *Mux {
 		conn:      c,
 		connMap:   NewConnMap(),
 		id:        0,
-		closeChan: make(chan struct{}, 3),
+		closeChan: make(chan struct{}, 1),
 		newConnCh: make(chan *conn, 10),
 		bw:        new(bandwidth),
 		IsClose:   false,
 		connType:  connType,
-		bufCh:     make(chan *bytes.Buffer),
 		pingCh:    make(chan []byte),
 		pingTimer: time.NewTimer(15 * time.Second),
 	}
 	m.writeQueue.New()
+	//m.bufQueue.New()
 	//read session by flag
 	m.readSession()
 	//ping
@@ -111,16 +110,18 @@ func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {

 func (s *Mux) writeSession() {
 	go s.packBuf()
-	go s.writeBuf()
+	//go s.writeBuf()
 }

 func (s *Mux) packBuf() {
+	buffer := common.BuffPool.Get()
 	for {
 		if s.IsClose {
 			break
 		}
+		buffer.Reset()
 		pack := s.writeQueue.Pop()
-		buffer := common.BuffPool.Get()
+		//buffer := common.BuffPool.Get()
 		err := pack.Pack(buffer)
 		common.MuxPack.Put(pack)
 		if err != nil {
@@ -129,35 +130,38 @@ func (s *Mux) packBuf() {
 			break
 		}
 		//logs.Warn(buffer.String())
-		select {
-		case s.bufCh <- buffer:
-		case <-s.closeChan:
-			break
-		}
-	}
-}
-
-func (s *Mux) writeBuf() {
-	for {
-		if s.IsClose {
-			break
-		}
-		select {
-		case buffer := <-s.bufCh:
+		//s.bufQueue.Push(buffer)
 		l := buffer.Len()
 		n, err := buffer.WriteTo(s.conn)
-		common.BuffPool.Put(buffer)
+		//common.BuffPool.Put(buffer)
 		if err != nil || int(n) != l {
 			logs.Warn("close from write session fail ", err, n, l)
 			s.Close()
 			break
 		}
-		case <-s.closeChan:
-			break
-		}
 	}
 }
+
+//func (s *Mux) writeBuf() {
+//	for {
+//		if s.IsClose {
+//			break
+//		}
+//		buffer, err := s.bufQueue.Pop()
+//		if err != nil {
+//			break
+//		}
+//		l := buffer.Len()
+//		n, err := buffer.WriteTo(s.conn)
+//		common.BuffPool.Put(buffer)
+//		if err != nil || int(n) != l {
+//			logs.Warn("close from write session fail ", err, n, l)
+//			s.Close()
+//			break
+//		}
+//	}
+//}

 func (s *Mux) ping() {
 	go func() {
 		now, _ := time.Now().UTC().MarshalText()
@@ -310,8 +314,7 @@ func (s *Mux) Close() error {
 	}
 	s.IsClose = true
 	s.connMap.Close()
-	s.closeChan <- struct{}{}
-	s.closeChan <- struct{}{}
+	//s.bufQueue.Stop()
 	s.closeChan <- struct{}{}
 	close(s.newConnCh)
 	return s.conn.Close()
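Net effect in mux.go: the bufCh hand-off between packBuf and writeBuf is gone, so a single goroutine now packs each frame and writes it straight to the connection, reusing one pooled buffer via Reset() per iteration instead of a Get/Put round trip per packet; with writeBuf retired, Close() signals closeChan once and its capacity drops from 3 to 1. A minimal sketch of that single-writer loop, with a hypothetical packager interface standing in for common.MuxPackager and a plain buffer in place of common.BuffPool:

    import (
        "bytes"
        "net"
    )

    type packager interface {
        Pack(*bytes.Buffer) error // serialize one frame into the buffer
    }

    // one goroutine packs and writes; no channel hop, no per-frame allocation
    func writeLoop(conn net.Conn, pop func() packager, closed func() bool) {
        buffer := new(bytes.Buffer) // fetched once; stands in for BuffPool.Get()
        for !closed() {
            buffer.Reset()
            if err := pop().Pack(buffer); err != nil {
                return
            }
            if _, err := buffer.WriteTo(conn); err != nil {
                return // a short or failed write closes the session
            }
        }
    }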

lib/mux/mux_test.go

@@ -4,12 +4,14 @@ import (
 	"bufio"
 	"fmt"
 	"github.com/cnlh/nps/lib/common"
+	"io"
 	"log"
 	"net"
 	"net/http"
 	"net/http/httputil"
 	_ "net/http/pprof"
 	"strconv"
+	"sync"
 	"testing"
 	"time"
 	"unsafe"
@@ -48,42 +50,42 @@ func TestNewMux(t *testing.T) {
 			//c2.(*net.TCPConn).SetReadBuffer(0)
 			//c2.(*net.TCPConn).SetReadBuffer(0)
 			go func(c2 net.Conn, c *conn) {
-				//wg := sync.WaitGroup{}
-				//wg.Add(1)
-				//go func() {
-				//	_, err = common.CopyBuffer(c2, c)
-				//	if err != nil {
-				//		c2.Close()
-				//		c.Close()
-				//		logs.Warn("close npc by copy from nps", err, c.connId)
-				//	}
-				//	wg.Done()
-				//}()
-				//wg.Add(1)
-				//go func() {
-				//	_, err = common.CopyBuffer(c, c2)
-				//	if err != nil {
-				//		c2.Close()
-				//		c.Close()
-				//		logs.Warn("close npc by copy from server", err, c.connId)
-				//	}
-				//	wg.Done()
-				//}()
-				////logs.Warn("npc wait")
-				//wg.Wait()
+				wg := sync.WaitGroup{}
+				wg.Add(1)
+				go func() {
+					_, err = common.CopyBuffer(c2, c)
+					if err != nil {
+						c2.Close()
+						c.Close()
+						//logs.Warn("close npc by copy from nps", err, c.connId)
+					}
+					wg.Done()
+				}()
+				wg.Add(1)
+				go func() {
+					_, err = common.CopyBuffer(c, c2)
+					if err != nil {
+						c2.Close()
+						c.Close()
+						//logs.Warn("close npc by copy from server", err, c.connId)
+					}
+					wg.Done()
+				}()
+				//logs.Warn("npc wait")
+				wg.Wait()
 			}(c2, c.(*conn))
 		}
 	}()
 	go func() {
-		//m1 := NewMux(conn1, "tcp")
+		m1 := NewMux(conn1, "tcp")
 		l, err := net.Listen("tcp", "127.0.0.1:7777")
 		if err != nil {
 			logs.Warn(err)
 		}
 		for {
 			//logs.Warn("nps starting accept")
-			_, err := l.Accept()
+			conns, err := l.Accept()
 			if err != nil {
 				logs.Warn(err)
 				continue
@@ -91,37 +93,38 @@ func TestNewMux(t *testing.T) {
 			//conns.(*net.TCPConn).SetReadBuffer(0)
 			//conns.(*net.TCPConn).SetReadBuffer(0)
 			//logs.Warn("nps accept success starting new conn")
-			//tmpCpnn, err := m1.NewConn()
-			//if err != nil {
-			//	logs.Warn("nps new conn err ", err)
-			//	continue
-			//}
-			////logs.Warn("nps new conn success ", tmpCpnn.connId)
-			//go func(tmpCpnn *conn, conns net.Conn) {
-			//	//go func() {
-			//	//	_, err := common.CopyBuffer(tmpCpnn, conns)
-			//	//	if err != nil {
-			//	//		conns.Close()
-			//	//		tmpCpnn.Close()
-			//	//		logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
-			//	//	}
-			//	//}()
-			//	////time.Sleep(time.Second)
-			//	//_, err = common.CopyBuffer(conns, tmpCpnn)
-			//	//if err != nil {
-			//	//	conns.Close()
-			//	//	tmpCpnn.Close()
-			//	//	logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
-			//	//}
-			//}(tmpCpnn, conns)
+			tmpCpnn, err := m1.NewConn()
+			if err != nil {
+				logs.Warn("nps new conn err ", err)
+				continue
+			}
+			//logs.Warn("nps new conn success ", tmpCpnn.connId)
+			go func(tmpCpnn *conn, conns net.Conn) {
+				go func() {
+					_, err := common.CopyBuffer(tmpCpnn, conns)
+					if err != nil {
+						conns.Close()
+						tmpCpnn.Close()
+						//logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
+					}
+				}()
+				//time.Sleep(time.Second)
+				_, err = common.CopyBuffer(conns, tmpCpnn)
+				if err != nil {
+					conns.Close()
+					tmpCpnn.Close()
+					//logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
+				}
+			}(tmpCpnn, conns)
 		}
 	}()
-	go NewLogServer()
+	//go NewLogServer()
 	time.Sleep(time.Second * 5)
 	for i := 0; i < 1000; i++ {
 		go test_raw(i)
 	}
+	//test_request()

 	for {
 		time.Sleep(time.Second * 5)
@@ -154,7 +157,7 @@ func client() {

 func test_request() {
 	conn, _ := net.Dial("tcp", "127.0.0.1:7777")
 	for {
-		conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
+		conn.Write([]byte(`GET / HTTP/1.1
 Host: 127.0.0.1:7777
 Connection: keep-alive
@@ -177,39 +180,42 @@ Connection: keep-alive
 }

 func test_raw(k int) {
-	for i := 0; i < 1; i++ {
+	for i := 0; i < 10; i++ {
 		ti := time.Now()
-		_, _ = net.Dial("tcp", "127.0.0.1:7777")
+		conn, err := net.Dial("tcp", "127.0.0.1:7777")
+		if err != nil {
+			logs.Warn("conn dial err", err)
+		}
 		tid := time.Now()
-		//	conn.Write([]byte(`GET / HTTP/1.1
-		//Host: 127.0.0.1:7777
-		//
-		//
-		//`))
-		//	tiw := time.Now()
-		//buf := make([]byte, 3572)
-		//n, err := io.ReadFull(conn, buf)
-		////n, err := conn.Read(buf)
-		//if err != nil {
-		//	logs.Warn("close by read response err", err)
-		//	break
-		//}
-		////logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
-		////time.Sleep(time.Second)
-		//err = conn.Close()
-		//if err != nil {
-		//	logs.Warn("close conn err ", err)
-		//}
+		conn.Write([]byte(`GET / HTTP/1.1
+Host: 127.0.0.1:7777
+
+
+`))
+		tiw := time.Now()
+		buf := make([]byte, 3572)
+		n, err := io.ReadFull(conn, buf)
+		//n, err := conn.Read(buf)
+		if err != nil {
+			logs.Warn("close by read response err", err)
+			break
+		}
+		logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
+		//time.Sleep(time.Second)
+		err = conn.Close()
+		if err != nil {
+			logs.Warn("close conn err ", err)
+		}
 		now := time.Now()
 		du := now.Sub(ti).Seconds()
 		dud := now.Sub(tid).Seconds()
-		//duw := now.Sub(tiw).Seconds()
-		//if du > 1 {
-		logs.Warn("duration long", du, dud, k, i)
-		//}
-		//if n != 3572 {
-		//	logs.Warn("n loss", n, string(buf))
-		//}
+		duw := now.Sub(tiw).Seconds()
+		if du > 1 {
+			logs.Warn("duration long", du, dud, duw, k, i)
+		}
+		if n != 3572 {
+			logs.Warn("n loss", n, string(buf))
+		}
 	}
 }
@@ -293,11 +299,11 @@ func TestFIFO(t *testing.T) {
 	logs.EnableFuncCallDepth(true)
 	logs.SetLogFuncCallDepth(3)
 	time.Sleep(time.Second * 5)
-	d := new(FIFOQueue)
+	d := new(ReceiveWindowQueue)
 	d.New()
 	go func() {
 		time.Sleep(time.Second)
-		for i := 0; i < 30000; i++ {
+		for i := 0; i < 30010; i++ {
 			data, err := d.Pop()
 			if err == nil {
 				//fmt.Println(i, string(data.buf), err)
@@ -306,7 +312,9 @@ func TestFIFO(t *testing.T) {
 				//fmt.Println("err", err)
 				logs.Warn("err", err)
 			}
+			//logs.Warn(d.Len())
 		}
+		logs.Warn("pop finish")
 	}()
 	go func() {
 		time.Sleep(time.Second * 10)
@@ -314,10 +322,10 @@ func TestFIFO(t *testing.T) {
 			go func(i int) {
 				for n := 0; n < 10; n++ {
 					data := new(ListElement)
-					by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
+					by := []byte("test " + strconv.Itoa(i) + " " + strconv.Itoa(n)) //
 					_ = data.New(by, uint16(len(by)), true)
 					//fmt.Println(string((*data).buf), data)
-					logs.Warn(string((*data).buf), data)
+					//logs.Warn(string((*data).buf), data)
 					d.Push(data)
 				}
 			}(i)
@@ -337,11 +345,12 @@ func TestPriority(t *testing.T) {
 	d.New()
 	go func() {
 		time.Sleep(time.Second)
-		for i := 0; i < 36000; i++ {
+		for i := 0; i < 36005; i++ {
 			data := d.Pop()
 			//fmt.Println(i, string(data.buf), err)
 			logs.Warn(i, string(data.Content), data)
 		}
+		logs.Warn("pop finish")
 	}()
 	go func() {
 		time.Sleep(time.Second * 10)
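The test changes re-enable the full data path: the npc-side and nps-side CopyBuffer pumps run again, and test_raw now performs ten complete request/response round trips per goroutine, timing dial, write, and read separately and checking the expected 3572-byte response. Using io.ReadFull there is deliberate: a single conn.Read may legally return after the first segment, while ReadFull loops until the buffer is filled or the stream errors. A tiny standalone helper showing the same idea (readExactly is an illustrative name):

    // readExactly drains exactly want bytes from conn or reports an error;
    // a bare conn.Read could return early and under-count the response
    func readExactly(conn net.Conn, want int) ([]byte, error) {
        buf := make([]byte, want)
        _, err := io.ReadFull(conn, buf) // loops internally until len(buf) or error
        return buf, err
    }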

lib/mux/queue.go

@@ -1,10 +1,12 @@
 package mux

 import (
+	"bytes"
 	"errors"
 	"github.com/cnlh/nps/lib/common"
 	"io"
 	"math"
+	"runtime"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -13,7 +15,7 @@ import (
 type QueueOp struct {
 	readOp  chan struct{}
 	cleanOp chan struct{}
-	popWait int32
+	popWait uint32
 }

 func (Self *QueueOp) New() {
@@ -22,7 +24,7 @@ func (Self *QueueOp) New() {
 }

 func (Self *QueueOp) allowPop() (closed bool) {
-	if atomic.CompareAndSwapInt32(&Self.popWait, 1, 0) {
+	if atomic.CompareAndSwapUint32(&Self.popWait, 1, 0) {
 		select {
 		case Self.readOp <- struct{}{}:
 			return false
@@ -44,7 +46,7 @@ type PriorityQueue struct {
 	highestChain *bufChain
 	middleChain  *bufChain
 	lowestChain  *bufChain
-	hunger       uint8
+	starving     uint8
 }

 func (Self *PriorityQueue) New() {
@@ -73,7 +75,7 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
 	return
 }

-const maxHunger uint8 = 10
+const maxStarving uint8 = 8

 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
 startPop:
@@ -82,31 +84,32 @@ startPop:
 		packager = (*common.MuxPackager)(ptr)
 		return
 	}
-	if Self.hunger < maxHunger {
+	if Self.starving < maxStarving {
 		ptr, ok = Self.middleChain.popTail()
 		if ok {
 			packager = (*common.MuxPackager)(ptr)
-			Self.hunger++
+			Self.starving++
 			return
 		}
 	}
 	ptr, ok = Self.lowestChain.popTail()
 	if ok {
 		packager = (*common.MuxPackager)(ptr)
-		if Self.hunger > 0 {
-			Self.hunger = uint8(Self.hunger / 2)
+		if Self.starving > 0 {
+			Self.starving = uint8(Self.starving / 2)
 		}
 		return
 	}
-	if Self.hunger > 0 {
+	if Self.starving > 0 {
 		ptr, ok = Self.middleChain.popTail()
 		if ok {
 			packager = (*common.MuxPackager)(ptr)
+			Self.starving++
 			return
 		}
 	}
 	// PriorityQueue is empty, notice Push method
-	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
+	if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) {
 		select {
 		case <-Self.readOp:
 			goto startPop
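The renamed starving counter is the scheduling fix in PriorityQueue.Pop: serving the middle chain increments it, draining the lowest chain halves it, and once it reaches maxStarving (now 8, down from 10) the middle chain is skipped so low-priority frames get a turn; the late middle-chain fallback now also increments the counter instead of leaving it untouched. The policy restated as a self-contained sketch, with plain slices standing in for the lock-free bufChains:

    const maxStarving uint8 = 8

    type prioQueue struct {
        high, mid, low []string
        starving       uint8
    }

    func (q *prioQueue) pop() (v string, ok bool) {
        if len(q.high) > 0 {
            v, q.high = q.high[0], q.high[1:]
            return v, true
        }
        if q.starving < maxStarving && len(q.mid) > 0 {
            v, q.mid = q.mid[0], q.mid[1:]
            q.starving++ // each mid item served starves low a little more
            return v, true
        }
        if len(q.low) > 0 {
            v, q.low = q.low[0], q.low[1:]
            q.starving /= 2 // serving low relieves the pressure
            return v, true
        }
        if q.starving > 0 && len(q.mid) > 0 { // low empty: fall back to mid
            v, q.mid = q.mid[0], q.mid[1:]
            q.starving++
            return v, true
        }
        return "", false
    }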
@@ -133,7 +136,7 @@ func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) {
 	return nil
 }

-type FIFOQueue struct {
+type ReceiveWindowQueue struct {
 	QueueOp
 	chain  *bufChain
 	length uint32
@@ -141,21 +144,21 @@ type FIFOQueue struct {
 	timeout time.Time
 }

-func (Self *FIFOQueue) New() {
+func (Self *ReceiveWindowQueue) New() {
 	Self.QueueOp.New()
 	Self.chain = new(bufChain)
 	Self.chain.new(64)
 	Self.stopOp = make(chan struct{}, 1)
 }

-func (Self *FIFOQueue) Push(element *ListElement) {
+func (Self *ReceiveWindowQueue) Push(element *ListElement) {
 	Self.chain.pushHead(unsafe.Pointer(element))
 	atomic.AddUint32(&Self.length, uint32(element.l))
 	Self.allowPop()
 	return
 }

-func (Self *FIFOQueue) Pop() (element *ListElement, err error) {
+func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) {
 startPop:
 	ptr, ok := Self.chain.popTail()
 	if ok {
@@ -163,7 +166,7 @@ startPop:
 		atomic.AddUint32(&Self.length, ^uint32(element.l-1))
 		return
 	}
-	if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) {
+	if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) {
 		t := Self.timeout.Sub(time.Now())
 		if t <= 0 {
 			t = time.Minute
@@ -186,18 +189,62 @@ startPop:
 	goto startPop
 }

-func (Self *FIFOQueue) Len() (n uint32) {
+func (Self *ReceiveWindowQueue) Len() (n uint32) {
 	return atomic.LoadUint32(&Self.length)
 }

-func (Self *FIFOQueue) Stop() {
+func (Self *ReceiveWindowQueue) Stop() {
 	Self.stopOp <- struct{}{}
 }

-func (Self *FIFOQueue) SetTimeOut(t time.Time) {
+func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) {
 	Self.timeout = t
 }

+type BytesQueue struct {
+	QueueOp
+	chain  *bufChain
+	stopOp chan struct{}
+}
+
+func (Self *BytesQueue) New() {
+	Self.QueueOp.New()
+	Self.chain = new(bufChain)
+	Self.chain.new(8)
+	Self.stopOp = make(chan struct{}, 1)
+}
+
+func (Self *BytesQueue) Push(buf *bytes.Buffer) {
+	Self.chain.pushHead(unsafe.Pointer(buf))
+	Self.allowPop()
+	return
+}
+
+func (Self *BytesQueue) Pop() (buf *bytes.Buffer, err error) {
+startPop:
+	ptr, ok := Self.chain.popTail()
+	if ok {
+		buf = (*bytes.Buffer)(ptr)
+		return
+	}
+	if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) {
+		select {
+		case <-Self.readOp:
+			goto startPop
+		case <-Self.cleanOp:
+			return
+		case <-Self.stopOp:
+			err = io.EOF
+			return
+		}
+	}
+	goto startPop
+}
+
+func (Self *BytesQueue) Stop() {
+	Self.stopOp <- struct{}{}
+}
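BytesQueue is new but currently dormant: it mirrors ReceiveWindowQueue with *bytes.Buffer elements, a smaller initial chain (8 vs 64), and no length accounting, and all of its call sites in mux.go (bufQueue, writeBuf, Stop) remain commented out. Hypothetical usage if it were wired back in:

    var q BytesQueue
    q.New()
    go q.Push(bytes.NewBufferString("frame")) // wakes a waiting Pop via allowPop
    buf, err := q.Pop() // blocks on readOp until a Push; io.EOF after Stop()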
 // https://golang.org/src/sync/poolqueue.go
 type bufDequeue struct {
@@ -225,6 +272,7 @@ type bufDequeue struct {
 	// is set to nil atomically by the consumer and read
 	// atomically by the producer.
 	vals []unsafe.Pointer
+	starving uint32
 }

 const dequeueBits = 32
@@ -253,6 +301,10 @@ func (d *bufDequeue) pack(head, tail uint32) uint64 {
 // queue is full.
 func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
 	var slot *unsafe.Pointer
+	var starve uint8
+	if atomic.LoadUint32(&d.starving) > 0 {
+		runtime.Gosched()
+	}
 	for {
 		ptrs := atomic.LoadUint64(&d.headTail)
 		head, tail := d.unpack(ptrs)
@@ -263,8 +315,15 @@ func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
 		ptrs2 := d.pack(head+1, tail)
 		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
 			slot = &d.vals[head&uint32(len(d.vals)-1)]
+			if starve >= 3 && atomic.LoadUint32(&d.starving) > 0 {
+				atomic.StoreUint32(&d.starving, 0)
+			}
 			break
 		}
+		starve++
+		if starve >= 3 {
+			atomic.StoreUint32(&d.starving, 1)
+		}
 	}
 	// The head slot is free, so we own it.
 	*slot = val
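bufDequeue.pushHead now carries simple contention feedback: each producer counts its failed headTail CAS attempts in a local starve counter, raises the shared starving flag after three failures so that newly arriving producers runtime.Gosched() before joining the race, and lowers the flag once it finally wins. The same backoff shape over an arbitrary atomically-updated word (a sketch; state and next are illustrative names):

    var state uint64    // the contended word (headTail in the dequeue)
    var starving uint32 // shared flag: contention is high, newcomers yield

    func casWithBackoff(next func(uint64) uint64) {
        if atomic.LoadUint32(&starving) > 0 {
            runtime.Gosched() // let the current contenders drain first
        }
        var starve uint8
        for {
            old := atomic.LoadUint64(&state)
            if atomic.CompareAndSwapUint64(&state, old, next(old)) {
                if starve >= 3 && atomic.LoadUint32(&starving) > 0 {
                    atomic.StoreUint32(&starving, 0) // contention has subsided
                }
                return
            }
            starve++
            if starve >= 3 {
                atomic.StoreUint32(&starving, 1) // tell newcomers to back off
            }
        }
    }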
@@ -322,7 +381,7 @@ type bufChain struct {
 	// tail is the bufDequeue to popTail from. This is accessed
 	// by consumers, so reads and writes must be atomic.
 	tail *bufChainElt
-	chainStatus int32
+	newChain uint32
 }

 type bufChainElt struct {
@@ -359,7 +418,15 @@ func (c *bufChain) new(initSize int) {
 }

 func (c *bufChain) pushHead(val unsafe.Pointer) {
+startPush:
 	for {
+		if atomic.LoadUint32(&c.newChain) > 0 {
+			runtime.Gosched()
+		} else {
+			break
+		}
+	}

 	d := loadPoolChainElt(&c.head)
 	if d.pushHead(val) {
@@ -368,7 +435,7 @@ func (c *bufChain) pushHead(val unsafe.Pointer) {
 	// The current dequeue is full. Allocate a new one of twice
 	// the size.
-	if atomic.CompareAndSwapInt32(&c.chainStatus, 0, 1) {
+	if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) {
 		newSize := len(d.vals) * 2
 		if newSize >= dequeueLimit {
 			// Can't make it any bigger.
@@ -378,11 +445,12 @@ func (c *bufChain) pushHead(val unsafe.Pointer) {
 		d2 := &bufChainElt{prev: d}
 		d2.vals = make([]unsafe.Pointer, newSize)
 		d2.pushHead(val)
-		storePoolChainElt(&d.next, d2)
 		storePoolChainElt(&c.head, d2)
-		atomic.StoreInt32(&c.chainStatus, 0)
-	}
+		storePoolChainElt(&d.next, d2)
+		atomic.StoreUint32(&c.newChain, 0)
+		return
 	}
+	goto startPush
 }

 func (c *bufChain) popTail() (unsafe.Pointer, bool) {
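This last hunk looks like the "queue bug" of the commit title: the old grow path had no return, so after a producer grew the chain (having already pushed val into d2) the enclosing for loop ran again, found the new head, and could push the same value a second time; producers that lost the chainStatus CAS meanwhile retried against a head that was mid-replacement. The new code returns after growing, sends losers back to startPush only once the newChain flag clears (yielding with runtime.Gosched() while they wait), and publishes c.head before linking d.next. The protocol, reduced to a toy grow-or-retry skeleton (the mutex-based toyDequeue is a stand-in; the real code publishes pointers with atomic stores via storePoolChainElt):

    type toyDequeue struct {
        mu   sync.Mutex
        vals []int
        cap  int
    }

    func (d *toyDequeue) tryPush(v int) bool {
        d.mu.Lock()
        defer d.mu.Unlock()
        if len(d.vals) == d.cap {
            return false // full: caller must grow the chain
        }
        d.vals = append(d.vals, v)
        return true
    }

    type toyChain struct {
        head     *toyDequeue
        newChain uint32 // 1 while exactly one producer grows the chain
    }

    func (c *toyChain) pushHead(v int) {
    startPush:
        for atomic.LoadUint32(&c.newChain) > 0 {
            runtime.Gosched() // grow in flight: don't touch the stale head
        }
        if c.head.tryPush(v) {
            return
        }
        if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) { // sole grower
            d2 := &toyDequeue{cap: c.head.cap * 2}
            d2.tryPush(v) // the value rides along into the new dequeue
            c.head = d2   // publish the new head before releasing the flag
            atomic.StoreUint32(&c.newChain, 0)
            return // crucially: do not fall back into the push loop
        }
        goto startPush // lost the race: wait for the grower, then retry
    }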