From 5f58c34c8bd53c584f832ace882fe7d175508994 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Sat, 2 Nov 2019 22:59:52 +0800 Subject: [PATCH] perf test --- go.mod | 1 + go.sum | 2 + lib/common/pool.go | 53 ++++++++++++++ lib/mux/conn.go | 4 +- lib/mux/mux.go | 11 ++- lib/mux/mux_test.go | 122 ++++++++++++++++--------------- lib/mux/queue.go | 172 ++++++++++++++++++++------------------------ 7 files changed, 207 insertions(+), 158 deletions(-) diff --git a/go.mod b/go.mod index 8a19eaf..a540dae 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/klauspost/cpuid v1.2.1 // indirect github.com/klauspost/reedsolomon v1.9.2 // indirect github.com/onsi/gomega v1.5.0 // indirect + github.com/panjf2000/ants/v2 v2.2.2 github.com/pkg/errors v0.8.0 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect github.com/shirou/gopsutil v2.18.12+incompatible diff --git a/go.sum b/go.sum index f3a17f4..f0c4d7f 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/panjf2000/ants/v2 v2.2.2 h1:TWzusBjq/IflXhy+/S6u5wmMLCBdJnB9tPIx9Zmhvok= +github.com/panjf2000/ants/v2 v2.2.2/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/lib/common/pool.go b/lib/common/pool.go index 240f7f9..5010012 100644 --- a/lib/common/pool.go +++ b/lib/common/pool.go @@ -2,6 +2,8 @@ package common import ( "bytes" + "github.com/panjf2000/ants/v2" + "net" "sync" ) @@ -149,11 +151,62 @@ func (Self *muxPackagerPool) Put(pack *MuxPackager) { Self.pool.Put(pack) } +type connGroup struct { + src net.Conn + dst net.Conn + wg *sync.WaitGroup +} + +func newConnGroup(src net.Conn, dst net.Conn, wg *sync.WaitGroup) connGroup { + return connGroup{ + src: src, + dst: dst, + wg: wg, + } +} + +func copyConnGroup(group interface{}) { + cg, ok := group.(connGroup) + if !ok { + return + } + _, err := CopyBuffer(cg.src, cg.dst) + if err != nil { + cg.src.Close() + cg.dst.Close() + //logs.Warn("close npc by copy from nps", err, c.connId) + } + cg.wg.Done() +} + +type Conns struct { + conn1 net.Conn + conn2 net.Conn +} + +func NewConns(c1 net.Conn, c2 net.Conn) Conns { + return Conns{ + conn1: c1, + conn2: c2, + } +} + +func copyConns(group interface{}) { + conns := group.(Conns) + wg := new(sync.WaitGroup) + wg.Add(2) + _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg)) + _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg)) + wg.Wait() +} + var once = sync.Once{} var BuffPool = bufferPool{} var CopyBuff = copyBufferPool{} var MuxPack = muxPackagerPool{} var WindowBuff = windowBufferPool{} +var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false)) +var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false)) func newPool() { BuffPool.New() diff --git a/lib/mux/conn.go b/lib/mux/conn.go index f3217d8..0ba6f90 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -156,7 +156,7 @@ type ReceiveWindow struct { bufQueue ReceiveWindowQueue element *ListElement readLength uint32 - readOp chan struct{} + //readOp chan struct{} readWait bool windowFull uint32 count int8 @@ -167,7 +167,7 @@ type ReceiveWindow struct { func (Self *ReceiveWindow) New(mux *Mux) { // initial a window for receive - Self.readOp = make(chan struct{}) + //Self.readOp = make(chan struct{}) Self.bufQueue.New() //Self.bw = new(bandwidth) Self.element = new(ListElement) diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 8a6086c..6a2d6b6 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -5,7 +5,6 @@ import ( "io" "math" "net" - "sync" "sync/atomic" "time" @@ -29,7 +28,7 @@ type Mux struct { connType string writeQueue PriorityQueue //bufQueue BytesQueue - sync.Mutex + //sync.Mutex } func NewMux(c net.Conn, connType string) *Mux { @@ -216,7 +215,7 @@ func (s *Mux) pingReturn() { if latency < 0.5 && latency > 0 { s.latency = latency } - //logs.Warn("latency", s.latency) + logs.Warn("latency", s.latency) common.WindowBuff.Put(data) } }() @@ -242,15 +241,19 @@ func (s *Mux) readSession() { case common.MUX_NEW_CONN: //new connection connection := NewConn(pack.Id, s, "npc ") s.connMap.Set(pack.Id, connection) //it has been set before send ok + //go func(connection *conn) { s.newConnCh <- connection s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) + //}(connection) continue case common.MUX_PING_FLAG: //ping s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) common.WindowBuff.Put(pack.Content) continue case common.MUX_PING_RETURN: + //go func(content []byte) { s.pingCh <- pack.Content + //}(pack.Content) continue } if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { @@ -275,8 +278,10 @@ func (s *Mux) readSession() { continue case common.MUX_CONN_CLOSE: //close the connection s.connMap.Delete(pack.Id) + //go func(connection *conn) { connection.closeFlag = true connection.receiveWindow.Stop() // close signal to receive window + //}(connection) continue } } else if pack.Flag == common.MUX_CONN_CLOSE { diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index e3f9dcc..d3061ab 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -11,7 +11,6 @@ import ( "net/http/httputil" _ "net/http/pprof" "strconv" - "sync" "testing" "time" "unsafe" @@ -30,6 +29,7 @@ func TestNewMux(t *testing.T) { logs.SetLogFuncCallDepth(3) server() client() + //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false)) time.Sleep(time.Second * 3) go func() { m2 := NewMux(conn2, "tcp") @@ -49,31 +49,34 @@ func TestNewMux(t *testing.T) { } //c2.(*net.TCPConn).SetReadBuffer(0) //c2.(*net.TCPConn).SetReadBuffer(0) - go func(c2 net.Conn, c *conn) { - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - _, err = common.CopyBuffer(c2, c) - if err != nil { - c2.Close() - c.Close() - //logs.Warn("close npc by copy from nps", err, c.connId) - } - wg.Done() - }() - wg.Add(1) - go func() { - _, err = common.CopyBuffer(c, c2) - if err != nil { - c2.Close() - c.Close() - //logs.Warn("close npc by copy from server", err, c.connId) - } - wg.Done() - }() - //logs.Warn("npc wait") - wg.Wait() - }(c2, c.(*conn)) + _ = common.CopyConnsPool.Invoke(common.NewConns(c2, c)) + //go func(c2 net.Conn, c *conn) { + // wg := new(sync.WaitGroup) + // wg.Add(2) + // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg)) + // //go func() { + // // _, err = common.CopyBuffer(c2, c) + // // if err != nil { + // // c2.Close() + // // c.Close() + // // //logs.Warn("close npc by copy from nps", err, c.connId) + // // } + // // wg.Done() + // //}() + // //wg.Add(1) + // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg)) + // //go func() { + // // _, err = common.CopyBuffer(c, c2) + // // if err != nil { + // // c2.Close() + // // c.Close() + // // //logs.Warn("close npc by copy from server", err, c.connId) + // // } + // // wg.Done() + // //}() + // //logs.Warn("npc wait") + // wg.Wait() + //}(c2, c.(*conn)) } }() @@ -99,23 +102,30 @@ func TestNewMux(t *testing.T) { continue } //logs.Warn("nps new conn success ", tmpCpnn.connId) - go func(tmpCpnn *conn, conns net.Conn) { - go func() { - _, err := common.CopyBuffer(tmpCpnn, conns) - if err != nil { - conns.Close() - tmpCpnn.Close() - //logs.Warn("close nps by copy from user", tmpCpnn.connId, err) - } - }() - //time.Sleep(time.Second) - _, err = common.CopyBuffer(conns, tmpCpnn) - if err != nil { - conns.Close() - tmpCpnn.Close() - //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) - } - }(tmpCpnn, conns) + _ = common.CopyConnsPool.Invoke(common.NewConns(tmpCpnn, conns)) + //go func(tmpCpnn *conn, conns net.Conn) { + // wg := new(sync.WaitGroup) + // wg.Add(2) + // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg)) + // //go func() { + // // _, err := common.CopyBuffer(tmpCpnn, conns) + // // if err != nil { + // // conns.Close() + // // tmpCpnn.Close() + // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err) + // // } + // //}() + // //wg.Add(1) + // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg)) + // //time.Sleep(time.Second) + // //_, err = common.CopyBuffer(conns, tmpCpnn) + // //if err != nil { + // // conns.Close() + // // tmpCpnn.Close() + // // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) + // //} + // wg.Wait() + //}(tmpCpnn, conns) } }() @@ -180,7 +190,7 @@ Connection: keep-alive } func test_raw(k int) { - for i := 0; i < 10; i++ { + for i := 0; i < 1; i++ { ti := time.Now() conn, err := net.Dial("tcp", "127.0.0.1:7777") if err != nil { @@ -303,7 +313,7 @@ func TestFIFO(t *testing.T) { d.New() go func() { time.Sleep(time.Second) - for i := 0; i < 30010; i++ { + for i := 0; i < 300100; i++ { data, err := d.Pop() if err == nil { //fmt.Println(i, string(data.buf), err) @@ -318,17 +328,13 @@ func TestFIFO(t *testing.T) { }() go func() { time.Sleep(time.Second * 10) - for i := 0; i < 3000; i++ { - go func(i int) { - for n := 0; n < 10; n++ { - data := new(ListElement) - by := []byte("test " + strconv.Itoa(i) + " " + strconv.Itoa(n)) // - _ = data.New(by, uint16(len(by)), true) - //fmt.Println(string((*data).buf), data) - //logs.Warn(string((*data).buf), data) - d.Push(data) - } - }(i) + for i := 0; i < 300000; i++ { + data := new(ListElement) + by := []byte("test " + strconv.Itoa(i) + " ") // + _ = data.New(by, uint16(len(by)), true) + //fmt.Println(string((*data).buf), data) + //logs.Warn(string((*data).buf), data) + d.Push(data) } }() time.Sleep(time.Second * 100000) @@ -345,7 +351,7 @@ func TestPriority(t *testing.T) { d.New() go func() { time.Sleep(time.Second) - for i := 0; i < 36005; i++ { + for i := 0; i < 360050; i++ { data := d.Pop() //fmt.Println(i, string(data.buf), err) logs.Warn(i, string(data.Content), data) @@ -354,7 +360,7 @@ func TestPriority(t *testing.T) { }() go func() { time.Sleep(time.Second * 10) - for i := 0; i < 3000; i++ { + for i := 0; i < 30000; i++ { go func(i int) { for n := 0; n < 10; n++ { data := new(common.MuxPackager) diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 6ed2dd6..e3c39a1 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -1,52 +1,24 @@ package mux import ( - "bytes" "errors" "github.com/cnlh/nps/lib/common" "io" "math" "runtime" + "sync" "sync/atomic" "time" "unsafe" ) -type QueueOp struct { - readOp chan struct{} - cleanOp chan struct{} - popWait uint32 -} - -func (Self *QueueOp) New() { - Self.readOp = make(chan struct{}) - Self.cleanOp = make(chan struct{}, 2) -} - -func (Self *QueueOp) allowPop() (closed bool) { - if atomic.CompareAndSwapUint32(&Self.popWait, 1, 0) { - select { - case Self.readOp <- struct{}{}: - return false - case <-Self.cleanOp: - return true - } - } - return -} - -func (Self *QueueOp) Clean() { - Self.cleanOp <- struct{}{} - Self.cleanOp <- struct{}{} - close(Self.cleanOp) -} - type PriorityQueue struct { - QueueOp highestChain *bufChain middleChain *bufChain lowestChain *bufChain starving uint8 + stop bool + cond *sync.Cond } func (Self *PriorityQueue) New() { @@ -56,7 +28,8 @@ func (Self *PriorityQueue) New() { Self.middleChain.new(32) Self.lowestChain = new(bufChain) Self.lowestChain.new(256) - Self.QueueOp.New() + locker := new(sync.Mutex) + Self.cond = sync.NewCond(locker) } func (Self *PriorityQueue) Push(packager *common.MuxPackager) { @@ -71,14 +44,44 @@ func (Self *PriorityQueue) Push(packager *common.MuxPackager) { default: Self.lowestChain.pushHead(unsafe.Pointer(packager)) } - Self.allowPop() + //atomic.AddUint32(&Self.count, 1) + Self.cond.Signal() return } const maxStarving uint8 = 8 func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { -startPop: + // PriorityQueue is empty, notice Push method + var iter bool + for { + packager = Self.pop() + if packager != nil { + return + } + if Self.stop { + return + } + if iter { + break + } + iter = true + runtime.Gosched() + } + Self.cond.L.Lock() + defer Self.cond.L.Unlock() + for packager = Self.pop(); packager == nil; { + if Self.stop { + return + } + Self.cond.Wait() + packager = Self.pop() + } + //atomic.AddUint32(&Self.count, ^uint32(0)) + return +} + +func (Self *PriorityQueue) pop() (packager *common.MuxPackager) { ptr, ok := Self.highestChain.popTail() if ok { packager = (*common.MuxPackager)(ptr) @@ -108,16 +111,12 @@ startPop: return } } - // PriorityQueue is empty, notice Push method - if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { - select { - case <-Self.readOp: - goto startPop - case <-Self.cleanOp: - return nil - } - } - goto startPop + return +} + +func (Self *PriorityQueue) Stop() { + Self.stop = true + Self.cond.Broadcast() } type ListElement struct { @@ -137,18 +136,19 @@ func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) { } type ReceiveWindowQueue struct { - QueueOp chain *bufChain length uint32 stopOp chan struct{} + readOp chan struct{} + popWait uint32 timeout time.Time } func (Self *ReceiveWindowQueue) New() { - Self.QueueOp.New() + Self.readOp = make(chan struct{}) Self.chain = new(bufChain) Self.chain.new(64) - Self.stopOp = make(chan struct{}, 1) + Self.stopOp = make(chan struct{}, 2) } func (Self *ReceiveWindowQueue) Push(element *ListElement) { @@ -158,15 +158,30 @@ func (Self *ReceiveWindowQueue) Push(element *ListElement) { return } -func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) { -startPop: +func (Self *ReceiveWindowQueue) pop() (element *ListElement) { ptr, ok := Self.chain.popTail() if ok { element = (*ListElement)(ptr) atomic.AddUint32(&Self.length, ^uint32(element.l-1)) return } + return +} + +func (Self *ReceiveWindowQueue) Pop() (element *ListElement, err error) { + var iter bool +startPop: + element = Self.pop() + if element != nil { + return + } + if !iter { + iter = true + runtime.Gosched() + goto startPop + } if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { + iter = false t := Self.timeout.Sub(time.Now()) if t <= 0 { t = time.Minute @@ -176,8 +191,6 @@ startPop: select { case <-Self.readOp: goto startPop - case <-Self.cleanOp: - return case <-Self.stopOp: err = io.EOF return @@ -189,62 +202,31 @@ startPop: goto startPop } +func (Self *ReceiveWindowQueue) allowPop() (closed bool) { + if atomic.CompareAndSwapUint32(&Self.popWait, 1, 0) { + select { + case Self.readOp <- struct{}{}: + return false + case <-Self.stopOp: + return true + } + } + return +} + func (Self *ReceiveWindowQueue) Len() (n uint32) { return atomic.LoadUint32(&Self.length) } func (Self *ReceiveWindowQueue) Stop() { Self.stopOp <- struct{}{} + Self.stopOp <- struct{}{} } func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) { Self.timeout = t } -type BytesQueue struct { - QueueOp - chain *bufChain - stopOp chan struct{} -} - -func (Self *BytesQueue) New() { - Self.QueueOp.New() - Self.chain = new(bufChain) - Self.chain.new(8) - Self.stopOp = make(chan struct{}, 1) -} - -func (Self *BytesQueue) Push(buf *bytes.Buffer) { - Self.chain.pushHead(unsafe.Pointer(buf)) - Self.allowPop() - return -} - -func (Self *BytesQueue) Pop() (buf *bytes.Buffer, err error) { -startPop: - ptr, ok := Self.chain.popTail() - if ok { - buf = (*bytes.Buffer)(ptr) - return - } - if atomic.CompareAndSwapUint32(&Self.popWait, 0, 1) { - select { - case <-Self.readOp: - goto startPop - case <-Self.cleanOp: - return - case <-Self.stopOp: - err = io.EOF - return - } - } - goto startPop -} - -func (Self *BytesQueue) Stop() { - Self.stopOp <- struct{}{} -} - // https://golang.org/src/sync/poolqueue.go type bufDequeue struct {