no message

This commit is contained in:
刘河 2019-08-26 21:42:20 +08:00
parent f35a73f734
commit 4bb7e33b16
6 changed files with 32 additions and 52 deletions

View File

@ -160,6 +160,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
} }
func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
Self.Length=0
err = binary.Read(reader, binary.LittleEndian, &Self.Flag) err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
if err != nil { if err != nil {
return return

View File

@ -264,15 +264,14 @@ func GetPortByAddr(addr string) int {
return p return p
} }
func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { func CopyBuffer(dst io.Writer, src io.Reader,connId int32) (written int64, err error) {
buf := pool.GetBufPoolCopy() buf := pool.GetBufPoolCopy()
defer pool.PutBufPoolCopy(buf) defer pool.PutBufPoolCopy(buf)
for { for {
nr, er := src.Read(buf) nr, er := src.Read(buf)
logs.Warn("read finish", nr, er)
if nr > 0 { if nr > 0 {
logs.Warn("write",connId, nr, string(buf[0:10]))
nw, ew := dst.Write(buf[0:nr]) nw, ew := dst.Write(buf[0:nr])
logs.Warn("write finish", nw, ew)
if nw > 0 { if nw > 0 {
written += int64(nw) written += int64(nw)
} }

View File

@ -46,7 +46,6 @@ func NewConn(connId int32, mux *Mux) *conn {
} }
func (s *conn) Read(buf []byte) (n int, err error) { func (s *conn) Read(buf []byte) (n int, err error) {
logs.Warn("starting read ", s.connId)
if s.isClose || buf == nil { if s.isClose || buf == nil {
return 0, errors.New("the conn has closed") return 0, errors.New("the conn has closed")
} }
@ -73,18 +72,16 @@ func (s *conn) Read(buf []byte) (n int, err error) {
s.Close() s.Close()
return 0, io.EOF return 0, io.EOF
} else { } else {
pool.PutBufPoolCopy(s.readBuffer) //pool.PutBufPoolCopy(s.readBuffer)
if node.val == nil { if node.val == nil {
//close //close
s.sendClose = true s.sendClose = true
s.Close() s.Close()
logs.Warn("close from read msg ", s.connId)
return 0, io.EOF return 0, io.EOF
} else { } else {
s.readBuffer = node.val s.readBuffer = node.val
s.endRead = node.l s.endRead = node.l
s.startRead = 0 s.startRead = 0
logs.Warn("get a new data buffer ", s.connId)
} }
} }
} }
@ -95,12 +92,10 @@ func (s *conn) Read(buf []byte) (n int, err error) {
n = copy(buf, s.readBuffer[s.startRead:s.endRead]) n = copy(buf, s.readBuffer[s.startRead:s.endRead])
s.startRead += n s.startRead += n
} }
logs.Warn("end read ", s.connId)
return return
} }
func (s *conn) Write(buf []byte) (n int, err error) { func (s *conn) Write(buf []byte) (n int, err error) {
logs.Warn("trying write", s.connId)
if s.isClose { if s.isClose {
return 0, errors.New("the conn has closed") return 0, errors.New("the conn has closed")
} }
@ -120,7 +115,6 @@ func (s *conn) Write(buf []byte) (n int, err error) {
if s.isClose { if s.isClose {
return 0, io.EOF return 0, io.EOF
} }
logs.Warn("write success ", s.connId)
return len(buf), nil return len(buf), nil
} }
func (s *conn) write(buf []byte, ch chan struct{}) { func (s *conn) write(buf []byte, ch chan struct{}) {
@ -139,9 +133,7 @@ func (s *conn) write(buf []byte, ch chan struct{}) {
} }
func (s *conn) Close() (err error) { func (s *conn) Close() (err error) {
logs.Warn("start closing ", s.connId)
if s.isClose { if s.isClose {
logs.Warn("already closed", s.connId)
return errors.New("the conn has closed") return errors.New("the conn has closed")
} }
s.isClose = true s.isClose = true
@ -153,7 +145,6 @@ func (s *conn) Close() (err error) {
s.mux.connMap.Delete(s.connId) s.mux.connMap.Delete(s.connId)
if !s.mux.IsClose { if !s.mux.IsClose {
if !s.sendClose { if !s.sendClose {
logs.Warn("start send closing msg", s.connId)
err = s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) err = s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
logs.Warn("send closing msg ok ", s.connId) logs.Warn("send closing msg ok ", s.connId)
if err != nil { if err != nil {
@ -161,7 +152,6 @@ func (s *conn) Close() (err error) {
return return
} }
} else { } else {
logs.Warn("send mux conn close pass ", s.connId)
} }
} }
return return

View File

@ -85,7 +85,6 @@ func (s *Mux) Addr() net.Addr {
func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) { func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) {
if flag == common.MUX_NEW_MSG { if flag == common.MUX_NEW_MSG {
logs.Warn("trying write to mux new msg", id)
} }
buf := pool.BuffPool.Get() buf := pool.BuffPool.Get()
pack := common.MuxPackager{} pack := common.MuxPackager{}
@ -109,10 +108,8 @@ func (s *Mux) sendInfo(flag uint8, id int32, content []byte) (err error) {
} }
pool.BuffPool.Put(buf) pool.BuffPool.Put(buf)
if flag == common.MUX_CONN_CLOSE { if flag == common.MUX_CONN_CLOSE {
logs.Warn("write to mux conn close success", id)
} }
if flag == common.MUX_NEW_MSG { if flag == common.MUX_NEW_MSG {
logs.Warn("write to mux new msg success", id)
} }
return return
} }
@ -164,6 +161,11 @@ func (s *Mux) readSession() {
if pack.UnPack(s.conn) != nil { if pack.UnPack(s.conn) != nil {
break break
} }
if pack.Flag != 0 && pack.Flag != 7 {
if pack.Length>10 {
logs.Warn(pack.Flag, pack.Id, pack.Length,string(pack.Content[:10]))
}
}
s.pingOk = 0 s.pingOk = 0
switch pack.Flag { switch pack.Flag {
case common.MUX_NEW_CONN: //new conn case common.MUX_NEW_CONN: //new conn
@ -180,7 +182,6 @@ func (s *Mux) readSession() {
continue continue
} }
if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose { if conn, ok := s.connMap.Get(pack.Id); ok && !conn.isClose {
logs.Warn("read session flag id", pack.Flag, pack.Id)
switch pack.Flag { switch pack.Flag {
case common.MUX_NEW_MSG: //new msg from remote conn case common.MUX_NEW_MSG: //new msg from remote conn
//insert wait queue //insert wait queue
@ -190,7 +191,6 @@ func (s *Mux) readSession() {
conn.readWait = false conn.readWait = false
conn.readCh <- struct{}{} conn.readCh <- struct{}{}
} }
logs.Warn("push a read buffer ", conn.connId, pack.Id)
case common.MUX_NEW_CONN_OK: //conn ok case common.MUX_NEW_CONN_OK: //conn ok
conn.connStatusOkCh <- struct{}{} conn.connStatusOkCh <- struct{}{}
case common.MUX_NEW_CONN_Fail: case common.MUX_NEW_CONN_Fail:
@ -203,7 +203,6 @@ func (s *Mux) readSession() {
conn.readCh <- struct{}{} conn.readCh <- struct{}{}
} }
s.connMap.Delete(pack.Id) s.connMap.Delete(pack.Id)
logs.Warn("read session mux conn close finish", pack.Id)
} }
} else if pack.Flag == common.MUX_NEW_MSG { } else if pack.Flag == common.MUX_NEW_MSG {
pool.PutBufPoolCopy(pack.Content) pool.PutBufPoolCopy(pack.Content)

View File

@ -26,25 +26,20 @@ func TestNewMux(t *testing.T) {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
go func() { go func() {
m2 := NewMux(conn2, "tcp") m2 := NewMux(conn2, "tcp")
connCh := make(chan bool, 1)
for { for {
c, err := m2.Accept() c, err := m2.Accept()
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
connCh <- true c2, err := net.Dial("tcp", "127.0.0.1:8080")
go func(c net.Conn, ch chan bool) { if err != nil {
c2, err := net.Dial("tcp", "127.0.0.1:80") log.Fatalln(err)
if err != nil { }
log.Fatalln(err) go common.CopyBuffer(c2, c,0)
} common.CopyBuffer(c, c2,0)
go common.CopyBuffer(c2, c) c2.Close()
common.CopyBuffer(c, c2) c.Close()
c2.Close() logs.Warn("close npc")
c.Close()
logs.Warn("close npc")
<-ch
}(c, connCh)
} }
}() }()
@ -54,25 +49,22 @@ func TestNewMux(t *testing.T) {
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
connCh := make(chan bool, 1)
for { for {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
connCh <- true
go func(conn net.Conn, ch chan bool) { tmpCpnn, err := m1.NewConn()
tmpCpnn, err := m1.NewConn() if err != nil {
if err != nil { log.Fatalln(err)
log.Fatalln(err) }
} go common.CopyBuffer(tmpCpnn, conn,tmpCpnn.connId)
go common.CopyBuffer(tmpCpnn, conn) _, err = common.CopyBuffer(conn, tmpCpnn,tmpCpnn.connId)
common.CopyBuffer(conn, tmpCpnn) logs.Warn(err, tmpCpnn.connId)
conn.Close() conn.Close()
//tmpCpnn.Close() tmpCpnn.Close()
logs.Warn("close from out nps ", tmpCpnn.connId) logs.Warn("close from out nps ", tmpCpnn.connId)
<-ch
}(conn, connCh)
} }
}() }()

View File

@ -33,8 +33,7 @@ var BufPoolSmall = sync.Pool{
} }
var BufPoolCopy = sync.Pool{ var BufPoolCopy = sync.Pool{
New: func() interface{} { New: func() interface{} {
buf := make([]byte, PoolSizeCopy) return make([]byte, PoolSizeCopy)
return &buf
}, },
} }
@ -46,12 +45,12 @@ func PutBufPoolUdp(buf []byte) {
func PutBufPoolCopy(buf []byte) { func PutBufPoolCopy(buf []byte) {
if cap(buf) == PoolSizeCopy { if cap(buf) == PoolSizeCopy {
BufPoolCopy.Put(&buf) BufPoolCopy.Put(buf[:PoolSizeCopy])
} }
} }
func GetBufPoolCopy() []byte { func GetBufPoolCopy() []byte {
return (*BufPoolCopy.Get().(*[]byte))[:PoolSizeCopy] return (BufPoolCopy.Get().([]byte))[:PoolSizeCopy]
} }
func PutBufPoolMax(buf []byte) { func PutBufPoolMax(buf []byte) {