diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index e3e0919..96ebaec 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -22,34 +22,24 @@ type BasePackager struct { Content []byte } -func (Self *BasePackager) NewPac(content []byte) (err error) { +func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { Self.clean() - if content != nil { - n := len(content) - if n > MAXIMUM_SEGMENT_SIZE { - err = errors.New("mux:packer: newpack content segment too large") + for _, content := range contents { + switch content.(type) { + case nil: + Self.Content = Self.Content[:0] + case []byte: + err = Self.appendByte(content.([]byte)) + case string: + err = Self.appendByte([]byte(content.(string))) + if err != nil { + return + } + err = Self.appendByte([]byte(CONN_DATA_SEQ)) + default: + err = Self.marshal(content) } - Self.Content = Self.Content[:n] - copy(Self.Content, content) - } else { - Self.Content = Self.Content[:0] } - //for _, content := range contents { - // switch content.(type) { - // case nil: - // Self.Content = Self.Content[:0] - // case []byte: - // err = Self.appendByte(content.([]byte)) - // case string: - // err = Self.appendByte([]byte(content.(string))) - // if err != nil { - // return - // } - // err = Self.appendByte([]byte(CONN_DATA_SEQ)) - // default: - // err = Self.marshal(content) - // } - //} Self.setLength() return } @@ -88,9 +78,6 @@ func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) { if int(Self.Length) > cap(Self.Content) { err = errors.New("unpack err, content length too large") } - if Self.Length > MAXIMUM_SEGMENT_SIZE { - err = errors.New("mux:packer: unpack content segment too large") - } Self.Content = Self.Content[:int(Self.Length)] //n, err := io.ReadFull(reader, Self.Content) //if n != int(Self.Length) { @@ -141,49 +128,61 @@ type ConnPackager struct { BasePackager } -//func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) { -// Self.ConnType = connType -// err = Self.BasePackager.NewPac(content...) -// return -//} -// -//func (Self *ConnPackager) Pack(writer io.Writer) (err error) { -// err = binary.Write(writer, binary.LittleEndian, Self.ConnType) -// if err != nil { -// return -// } -// err = Self.BasePackager.Pack(writer) -// return -//} -// -//func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { -// err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) -// if err != nil && err != io.EOF { -// return -// } -// n, err = Self.BasePackager.UnPack(reader) -// n += 2 -// return -//} +func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) { + Self.ConnType = connType + err = Self.BasePackager.NewPac(content...) + return +} + +func (Self *ConnPackager) Pack(writer io.Writer) (err error) { + err = binary.Write(writer, binary.LittleEndian, Self.ConnType) + if err != nil { + return + } + err = Self.BasePackager.Pack(writer) + return +} + +func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { + err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) + if err != nil && err != io.EOF { + return + } + n, err = Self.BasePackager.UnPack(reader) + n += 2 + return +} type MuxPackager struct { - Flag uint8 - Id int32 - RemainLength uint32 + Flag uint8 + Id int32 + Window uint32 + ReadLength uint32 BasePackager } -func (Self *MuxPackager) NewPac(flag uint8, id int32, content interface{}) (err error) { +func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { Self.Flag = flag Self.Id = id switch flag { case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART: Self.Content = WindowBuff.Get() - err = Self.BasePackager.NewPac(content.([]byte)) + err = Self.BasePackager.NewPac(content...) //logs.Warn(Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: // MUX_MSG_SEND_OK contains two data - Self.RemainLength = content.(uint32) + switch content[0].(type) { + case int: + Self.Window = uint32(content[0].(int)) + case uint32: + Self.Window = content[0].(uint32) + } + switch content[1].(type) { + case int: + Self.ReadLength = uint32(content[1].(int)) + case uint32: + Self.ReadLength = content[1].(uint32) + } } return } @@ -202,7 +201,11 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { err = Self.BasePackager.Pack(writer) WindowBuff.Put(Self.Content) case MUX_MSG_SEND_OK: - err = binary.Write(writer, binary.LittleEndian, Self.RemainLength) + err = binary.Write(writer, binary.LittleEndian, Self.Window) + if err != nil { + return + } + err = binary.Write(writer, binary.LittleEndian, Self.ReadLength) } return } @@ -223,7 +226,12 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { n, err = Self.BasePackager.UnPack(reader) //logs.Warn("unpack", Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: - err = binary.Read(reader, binary.LittleEndian, &Self.RemainLength) + err = binary.Read(reader, binary.LittleEndian, &Self.Window) + if err != nil { + return + } + n += 4 // uint32 + err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) n += 4 // uint32 } n += 5 //uint8 int32 @@ -265,10 +273,10 @@ func (addr *Addr) Decode(b []byte) error { pos := 1 switch addr.Type { case ipV4: - addr.Host = net.IP(b[pos : pos+net.IPv4len]).String() + addr.Host = net.IP(b[pos:pos+net.IPv4len]).String() pos += net.IPv4len case ipV6: - addr.Host = net.IP(b[pos : pos+net.IPv6len]).String() + addr.Host = net.IP(b[pos:pos+net.IPv6len]).String() pos += net.IPv6len case domainName: addrlen := int(b[pos]) diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 96bb1a9..28ec1de 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -265,7 +265,7 @@ start: Self.bufQueue.Push(element) // status check finish, now we can push the element into the queue if wait == 0 { - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, newRemaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining) // send the remaining window size, not including zero size } return nil @@ -333,7 +333,7 @@ func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { // now we get the current window status success if wait == 1 { //logs.Warn("send the wait status", remaining) - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, remaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining) } return } @@ -394,7 +394,7 @@ func (Self *SendWindow) SetSendBuf(buf []byte) { Self.off = 0 } -func (Self *SendWindow) SetSize(newRemaining uint32) (closed bool) { +func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) { // set the window size from receive window defer func() { if recover() != nil { diff --git a/lib/mux/mux.go b/lib/mux/mux.go index eb75182..02b017a 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -92,13 +92,13 @@ func (s *Mux) Addr() net.Addr { return s.conn.LocalAddr() } -func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { +func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { if s.IsClose { return } var err error pack := common.MuxPack.Get() - err = pack.NewPac(flag, id, data) + err = pack.NewPac(flag, id, data...) if err != nil { common.MuxPack.Put(pack) logs.Error("mux: new pack err", err) @@ -173,7 +173,7 @@ func (s *Mux) ping() { s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) // send the ping flag and get the latency first ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() + defer ticker.Stop() for { if s.IsClose { break @@ -198,7 +198,7 @@ func (s *Mux) ping() { } atomic.AddUint32(&s.pingOk, 1) } - return + return }() } @@ -297,7 +297,7 @@ func (s *Mux) readSession() { if connection.isClose { continue } - connection.sendWindow.SetSize(pack.RemainLength) + connection.sendWindow.SetSize(pack.Window, pack.ReadLength) continue case common.MUX_CONN_CLOSE: //close the connection connection.closeFlag = true diff --git a/lib/version/version.go b/lib/version/version.go index 5206a72..5c80ede 100644 --- a/lib/version/version.go +++ b/lib/version/version.go @@ -4,5 +4,5 @@ const VERSION = "0.25.2" // Compulsory minimum version, Minimum downward compatibility to this version func GetVersion() string { - return "0.25.2" + return "0.25.0" }