redo web UI |web close| client log |system info |p2p |max、ump optimization

This commit is contained in:
刘河
2019-03-01 17:23:14 +08:00
parent 534d428c6d
commit f526c56784
82 changed files with 15199 additions and 4561 deletions

View File

@@ -10,7 +10,6 @@ import (
type conn struct {
net.Conn
readMsgCh chan []byte
getStatusCh chan struct{}
connStatusOkCh chan struct{}
connStatusFailCh chan struct{}
@@ -18,8 +17,14 @@ type conn struct {
writeTimeOut time.Time
sendMsgCh chan *msg //mux
sendStatusCh chan int32 //mux
readBuffer []byte
startRead int //now read position
endRead int //now end read
readFlag bool
readCh chan struct{}
connId int32
isClose bool
readWait bool
mux *Mux
}
@@ -37,7 +42,8 @@ func NewMsg(connId int32, content []byte) *msg {
func NewConn(connId int32, mux *Mux, sendMsgCh chan *msg, sendStatusCh chan int32) *conn {
return &conn{
readMsgCh: make(chan []byte),
readCh: make(chan struct{}),
readBuffer: pool.BufPoolCopy.Get().([]byte),
getStatusCh: make(chan struct{}),
connStatusOkCh: make(chan struct{}),
connStatusFailCh: make(chan struct{}),
@@ -51,75 +57,92 @@ func NewConn(connId int32, mux *Mux, sendMsgCh chan *msg, sendStatusCh chan int3
}
}
func (s *conn) Read(buf []byte) (int, error) {
func (s *conn) Read(buf []byte) (n int, err error) {
if s.isClose {
return 0, errors.New("the conn has closed")
}
var b []byte
if t := s.readTimeOut.Sub(time.Now()); t > 0 {
timer := time.NewTimer(t)
select {
case <-timer.C:
s.Close()
return 0, errors.New("read timeout")
case b = <-s.readMsgCh:
if s.endRead-s.startRead == 0 {
s.readWait = true
if t := s.readTimeOut.Sub(time.Now()); t > 0 {
timer := time.NewTimer(t)
select {
case <-timer.C:
s.readWait = false
return 0, errors.New("read timeout")
case <-s.readCh:
}
} else {
<-s.readCh
}
} else {
b = <-s.readMsgCh
}
defer pool.PutBufPoolCopy(b)
s.readWait = false
if s.isClose {
return 0, io.EOF
}
s.sendStatusCh <- s.connId
return copy(buf, b), nil
if len(buf) < s.endRead-s.startRead {
n = copy(buf, s.readBuffer[s.startRead:s.startRead+len(buf)])
s.startRead += n
} else {
n = copy(buf, s.readBuffer[s.startRead:s.endRead])
s.startRead = 0
s.endRead = 0
s.sendStatusCh <- s.connId
}
return
}
func (s *conn) Write(buf []byte) (int, error) {
if s.isClose {
return 0, errors.New("the conn has closed")
}
ch := make(chan struct{})
go s.write(buf, ch)
if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
timer := time.NewTimer(t)
select {
case <-timer.C:
s.Close()
return 0, errors.New("write timeout")
case s.sendMsgCh <- NewMsg(s.connId, buf):
case <-ch:
}
} else {
s.sendMsgCh <- NewMsg(s.connId, buf)
<-ch
}
if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
timer := time.NewTimer(t)
select {
case <-timer.C:
s.Close()
return 0, errors.New("write timeout")
case <-s.getStatusCh:
}
} else {
<-s.getStatusCh
}
if s.isClose {
return 0, io.EOF
}
return len(buf), nil
}
func (s *conn) write(buf []byte, ch chan struct{}) {
start := 0
l := len(buf)
for {
if l-start > pool.PoolSizeCopy {
s.sendMsgCh <- NewMsg(s.connId, buf[start:start+pool.PoolSizeCopy])
start += pool.PoolSizeCopy
<-s.getStatusCh
} else {
s.sendMsgCh <- NewMsg(s.connId, buf[start:l])
<-s.getStatusCh
break
}
}
ch <- struct{}{}
}
func (s *conn) Close() error {
if s.isClose {
return errors.New("the conn has closed")
}
s.isClose = true
pool.PutBufPoolCopy(s.readBuffer)
close(s.getStatusCh)
close(s.readMsgCh)
close(s.connStatusOkCh)
close(s.connStatusFailCh)
s.sendMsgCh <- NewMsg(s.connId, nil)
close(s.readCh)
if !s.mux.isClose {
s.sendMsgCh <- NewMsg(s.connId, nil)
}
return nil
}

View File

@@ -23,7 +23,7 @@ func NewConnMap() *connMap {
func (s *connMap) Get(id int32) (*conn, bool) {
s.Lock()
defer s.Unlock()
if v, ok := s.connMap[id]; ok {
if v, ok := s.connMap[id]; ok && v != nil {
return v, true
}
return nil, false

View File

@@ -125,6 +125,9 @@ func (s *Mux) writeSession() {
raw.Reset()
select {
case msg := <-s.sendMsgCh:
if msg == nil {
break
}
if msg.content == nil { //close
binary.Write(raw, binary.LittleEndian, MUX_CONN_CLOSE)
binary.Write(raw, binary.LittleEndian, msg.connId)
@@ -152,8 +155,12 @@ func (s *Mux) writeSession() {
func (s *Mux) readSession() {
go func() {
raw := bytes.NewBuffer([]byte{})
buf := pool.BufPoolCopy.Get().([]byte)
defer pool.PutBufPoolCopy(buf)
for {
var flag, i int32
var n int
var err error
if binary.Read(s.conn, binary.LittleEndian, &flag) == nil {
if binary.Read(s.conn, binary.LittleEndian, &i) != nil {
break
@@ -170,19 +177,18 @@ func (s *Mux) readSession() {
continue
case MUX_PING_FLAG: //ping
continue
case MUX_NEW_MSG:
if n, err = ReadLenBytes(buf, s.conn); err != nil {
break
}
}
if conn, ok := s.connMap.Get(i); ok {
if conn, ok := s.connMap.Get(i); ok && !conn.isClose {
switch flag {
case MUX_NEW_MSG: //new msg from remote conn
buf := pool.BufPoolCopy.Get().([]byte)
if n, err := ReadLenBytes(buf, s.conn); err == nil {
if !conn.isClose {
conn.readMsgCh <- buf[:n]
} else {
pool.PutBufPoolCopy(buf)
}
} else { //read len bytes error,the mux has broken
break
copy(conn.readBuffer, buf[:n])
conn.endRead = n
if conn.readWait {
conn.readCh <- struct{}{}
}
case MUX_MSG_SEND_OK: //the remote has read
conn.getStatusCh <- struct{}{}

View File

@@ -2,6 +2,7 @@ package mux
import (
"github.com/cnlh/nps/lib/common"
conn3 "github.com/cnlh/nps/lib/conn"
"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
"log"
"net"
@@ -35,8 +36,8 @@ func TestNewMux(t *testing.T) {
if err != nil {
log.Fatalln(err)
}
go common.CopyBuffer(c2, c)
common.CopyBuffer(c, c2)
go common.CopyBuffer(c2, conn3.NewCryptConn(c, true, nil))
common.CopyBuffer(conn3.NewCryptConn(c, true, nil), c2)
c.Close()
c2.Close()
}(c)
@@ -59,8 +60,8 @@ func TestNewMux(t *testing.T) {
if err != nil {
log.Fatalln(err)
}
go common.CopyBuffer(tmpCpnn, conn)
common.CopyBuffer(conn, tmpCpnn)
go common.CopyBuffer(conn3.NewCryptConn(tmpCpnn, true, nil), conn)
common.CopyBuffer(conn, conn3.NewCryptConn(tmpCpnn, true, nil))
conn.Close()
tmpCpnn.Close()
}(conn)