migrate mux to nps-mux

This commit is contained in:
ffdfgdfg
2020-01-09 22:59:24 +08:00
parent 211f15882a
commit 5fedde1475
24 changed files with 62 additions and 3045 deletions

View File

@@ -36,19 +36,3 @@ WWW-Authenticate: Basic realm="easyProxy"
`
)
const (
MUX_PING_FLAG uint8 = iota
MUX_NEW_CONN_OK
MUX_NEW_CONN_Fail
MUX_NEW_MSG
MUX_NEW_MSG_PART
MUX_MSG_SEND_OK
MUX_NEW_CONN
MUX_CONN_CLOSE
MUX_PING_RETURN
MUX_PING int32 = -1
MAXIMUM_SEGMENT_SIZE = PoolSizeWindow
MAXIMUM_WINDOW_SIZE = 1 << 27 // 1<<31-1 TCP slide window size is very large,
// we use 128M, reduce memory usage
)

View File

@@ -3,13 +3,11 @@ package common
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net"
"strconv"
"strings"
)
type NetPackager interface {
@@ -17,222 +15,6 @@ type NetPackager interface {
UnPack(reader io.Reader) (err error)
}
type BasePackager struct {
Length uint16
Content []byte
}
func (Self *BasePackager) NewPac(contents ...interface{}) (err error) {
Self.clean()
for _, content := range contents {
switch content.(type) {
case nil:
Self.Content = Self.Content[:0]
case []byte:
err = Self.appendByte(content.([]byte))
case string:
err = Self.appendByte([]byte(content.(string)))
if err != nil {
return
}
err = Self.appendByte([]byte(CONN_DATA_SEQ))
default:
err = Self.marshal(content)
}
}
Self.setLength()
if Self.Length > MAXIMUM_SEGMENT_SIZE {
err = errors.New("mux:packer: newpack content segment too large")
}
return
}
func (Self *BasePackager) appendByte(data []byte) (err error) {
m := len(Self.Content)
n := m + len(data)
if n <= cap(Self.Content) {
Self.Content = Self.Content[0:n] // grow the length for copy
copy(Self.Content[m:n], data)
return nil
} else {
return errors.New("pack content too large")
}
}
//似乎这里涉及到父类作用域问题当子类调用父类的方法时其struct仅仅为父类的
func (Self *BasePackager) Pack(writer io.Writer) (err error) {
err = binary.Write(writer, binary.LittleEndian, Self.Length)
if err != nil {
return
}
err = binary.Write(writer, binary.LittleEndian, Self.Content)
return
}
//Unpack 会导致传入的数字类型转化成float64
//主要原因是json unmarshal并未传入正确的数据类型
func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) {
Self.clean()
n += 2 // uint16
err = binary.Read(reader, binary.LittleEndian, &Self.Length)
if err != nil {
return
}
if int(Self.Length) > cap(Self.Content) {
err = errors.New("unpack err, content length too large")
return
}
if Self.Length > MAXIMUM_SEGMENT_SIZE {
err = errors.New("mux:packer: unpack content segment too large")
return
}
Self.Content = Self.Content[:int(Self.Length)]
//n, err := io.ReadFull(reader, Self.Content)
//if n != int(Self.Length) {
// err = io.ErrUnexpectedEOF
//}
err = binary.Read(reader, binary.LittleEndian, Self.Content)
n += Self.Length
return
}
func (Self *BasePackager) marshal(content interface{}) (err error) {
tmp, err := json.Marshal(content)
if err != nil {
return err
}
err = Self.appendByte(tmp)
return
}
func (Self *BasePackager) Unmarshal(content interface{}) (err error) {
err = json.Unmarshal(Self.Content, content)
if err != nil {
return err
}
return
}
func (Self *BasePackager) setLength() {
Self.Length = uint16(len(Self.Content))
return
}
func (Self *BasePackager) clean() {
Self.Length = 0
Self.Content = Self.Content[:0] // reset length
}
func (Self *BasePackager) Split() (strList []string) {
n := bytes.IndexByte(Self.Content, 0)
strList = strings.Split(string(Self.Content[:n]), CONN_DATA_SEQ)
strList = strList[0 : len(strList)-1]
return
}
type ConnPackager struct {
// Todo
ConnType uint8
BasePackager
}
func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) {
Self.ConnType = connType
err = Self.BasePackager.NewPac(content...)
return
}
func (Self *ConnPackager) Pack(writer io.Writer) (err error) {
err = binary.Write(writer, binary.LittleEndian, Self.ConnType)
if err != nil {
return
}
err = Self.BasePackager.Pack(writer)
return
}
func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
err = binary.Read(reader, binary.LittleEndian, &Self.ConnType)
if err != nil && err != io.EOF {
return
}
n, err = Self.BasePackager.UnPack(reader)
n += 2
return
}
type MuxPackager struct {
Flag uint8
Id int32
Window uint64
BasePackager
}
func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) {
Self.Flag = flag
Self.Id = id
switch flag {
case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART:
Self.Content = WindowBuff.Get()
err = Self.BasePackager.NewPac(content...)
//logs.Warn(Self.Length, string(Self.Content))
case MUX_MSG_SEND_OK:
// MUX_MSG_SEND_OK contains one data
Self.Window = content[0].(uint64)
}
return
}
func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
err = binary.Write(writer, binary.LittleEndian, Self.Flag)
if err != nil {
return
}
err = binary.Write(writer, binary.LittleEndian, Self.Id)
if err != nil {
return
}
switch Self.Flag {
case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
err = Self.BasePackager.Pack(writer)
WindowBuff.Put(Self.Content)
case MUX_MSG_SEND_OK:
err = binary.Write(writer, binary.LittleEndian, Self.Window)
}
return
}
func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
if err != nil {
return
}
err = binary.Read(reader, binary.LittleEndian, &Self.Id)
if err != nil {
return
}
switch Self.Flag {
case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
Self.Content = WindowBuff.Get() // need get a window buf from pool
Self.BasePackager.clean() // also clean the content
n, err = Self.BasePackager.UnPack(reader)
//logs.Warn("unpack", Self.Length, string(Self.Content))
case MUX_MSG_SEND_OK:
err = binary.Read(reader, binary.LittleEndian, &Self.Window)
n += 8 // uint64
}
n += 5 //uint8 int32
return
}
func (Self *MuxPackager) reset() {
Self.Id = 0
Self.Flag = 0
Self.Length = 0
Self.Content = nil
Self.Window = 0
}
const (
ipV4 = 1
domainName = 3

View File

@@ -1,7 +1,6 @@
package common
import (
"bytes"
"sync"
)
@@ -9,8 +8,6 @@ const PoolSize = 64 * 1024
const PoolSizeSmall = 100
const PoolSizeUdp = 1472 + 200
const PoolSizeCopy = 32 << 10
const PoolSizeBuffer = 4096
const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1
var BufPool = sync.Pool{
New: func() interface{} {
@@ -86,115 +83,11 @@ func (Self *copyBufferPool) Put(x []byte) {
}
}
type windowBufferPool struct {
pool sync.Pool
}
func (Self *windowBufferPool) New() {
Self.pool = sync.Pool{
New: func() interface{} {
return make([]byte, PoolSizeWindow)
},
}
}
func (Self *windowBufferPool) Get() (buf []byte) {
buf = Self.pool.Get().([]byte)
buf = buf[:PoolSizeWindow]
return buf
}
func (Self *windowBufferPool) Put(x []byte) {
x = x[:0] // clean buf
Self.pool.Put(x)
}
type bufferPool struct {
pool sync.Pool
}
func (Self *bufferPool) New() {
Self.pool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, PoolSizeBuffer))
},
}
}
func (Self *bufferPool) Get() *bytes.Buffer {
return Self.pool.Get().(*bytes.Buffer)
}
func (Self *bufferPool) Put(x *bytes.Buffer) {
x.Reset()
Self.pool.Put(x)
}
type muxPackagerPool struct {
pool sync.Pool
}
func (Self *muxPackagerPool) New() {
Self.pool = sync.Pool{
New: func() interface{} {
pack := MuxPackager{}
return &pack
},
}
}
func (Self *muxPackagerPool) Get() *MuxPackager {
return Self.pool.Get().(*MuxPackager)
}
func (Self *muxPackagerPool) Put(pack *MuxPackager) {
pack.reset()
Self.pool.Put(pack)
}
type ListElement struct {
Buf []byte
L uint16
Part bool
}
type listElementPool struct {
pool sync.Pool
}
func (Self *listElementPool) New() {
Self.pool = sync.Pool{
New: func() interface{} {
element := ListElement{}
return &element
},
}
}
func (Self *listElementPool) Get() *ListElement {
return Self.pool.Get().(*ListElement)
}
func (Self *listElementPool) Put(element *ListElement) {
element.L = 0
element.Buf = nil
element.Part = false
Self.pool.Put(element)
}
var once = sync.Once{}
var BuffPool = bufferPool{}
var CopyBuff = copyBufferPool{}
var MuxPack = muxPackagerPool{}
var WindowBuff = windowBufferPool{}
var ListElementPool = listElementPool{}
func newPool() {
BuffPool.New()
CopyBuff.New()
MuxPack.New()
WindowBuff.New()
ListElementPool.New()
}
func init() {