diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..041b570 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,38 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: bug +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**To Reproduce** +Steps to reproduce the behavior: +1. Opening '...' +2. Click on '....' +3. See error + +**Expected behavior** +A clear and concise description of what you expected to happen. + +**Screenshots or logs** +Add screenshots or logs to help explain your problem. + +**Server (please complete the following information):** + - OS: [e.g. Centos, Windows] + - ARCH: [e.g. Amd64, Arm] + - Tunnel [e.g. TCP, HTTP] + - Version [e.g. 0.24.0] + +**Client (please complete the following information):** + - OS: [e.g. Centos, Windows] + - ARCH: [e.g. Amd64, Arm] + - Tunnel [e.g. TCP, HTTP] + - Version [e.g. 0.24.0] + +**Additional context** +Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..11fc491 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,20 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: enhancement +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Describe the solution you'd like** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. diff --git a/Dockerfile.npc b/Dockerfile.npc new file mode 100755 index 0000000..ae6715a --- /dev/null +++ b/Dockerfile.npc @@ -0,0 +1,10 @@ +FROM golang as builder +WORKDIR /go/src/github.com/cnlh/nps +COPY . . +RUN go get -d -v ./... +RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/npc/npc.go + +FROM scratch +COPY --from=builder /go/src/github.com/cnlh/nps/npc / +VOLUME /conf +ENTRYPOINT ["/npc"] diff --git a/Dockerfile.nps b/Dockerfile.nps new file mode 100755 index 0000000..698ced9 --- /dev/null +++ b/Dockerfile.nps @@ -0,0 +1,11 @@ +FROM golang as builder +WORKDIR /go/src/github.com/cnlh/nps +COPY . . +RUN go get -d -v ./... +RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/nps/nps.go + +FROM scratch +COPY --from=builder /go/src/github.com/cnlh/nps/nps / +COPY --from=builder /go/src/github.com/cnlh/nps/web /web +VOLUME /conf +CMD ["/nps"] diff --git a/README.md b/README.md index 1dd7f77..0f78bdb 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 * [安装](#安装) * [编译安装](#源码安装) * [release安装](#release安装) + * [docker安装](#docker安装) * [使用示例(以web主控模式为主)](#使用示例) * [统一准备工作](#统一准备工作(必做)) * [http|https域名解析](#域名解析) @@ -111,6 +112,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 * [获取用户真实ip](#获取用户真实ip) * [客户端地址显示](#客户端地址显示) * [客户端与服务端版本对比](#客户端与服务端版本对比) + * [Linux系统限制](#Linux系统限制) * [webAPI](#webAPI) * [贡献](#贡献) * [支持nps发展](#捐赠) @@ -120,7 +122,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 ## 安装 -### releases安装 +### release安装 > [releases](https://github.com/cnlh/nps/releases) 下载对应的系统版本即可,服务端和客户端是单独的 @@ -133,6 +135,10 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 > go build cmd/npc/npc.go +### docker安装 +> [server](https://hub.docker.com/r/ffdfgdfg/nps) +> [client](https://hub.docker.com/r/ffdfgdfg/npc) + ## 使用示例 ### 统一准备工作(必做) @@ -144,6 +150,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 ```shell ./npc -server=1.1.1.1:8284 -vkey=客户端的密钥 ``` +**注意:运行服务端后,请确保能从客户端设备上正常访问配置文件中所配置的`bridge_port`端口,telnet,netcat这类的来检查** ### 域名解析 @@ -197,6 +204,9 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 - 在刚才创建的客户端隧道管理中添加一条socks5代理,填写监听的端口(8003),保存。 - 在外网环境的本机配置socks5代理(例如使用proxifier进行全局代理),ip为公网服务器ip(1.1.1.1),端口为填写的监听端口(8003),即可畅享内网了 +**注意** +经过socks5代理,当收到socks5数据包时socket已经是accept状态。表现是扫描端口全open,建立连接后短时间关闭。若想同内网表现一致,建议远程连接一台设备。 + ### http正向代理 **适用范围:** 在外网环境下使用http正向代理访问内网站点 @@ -375,7 +385,13 @@ server { ``` (./nps|nps.exe) install ``` -安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行 +安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行,同时也会添加systemd配置。 + +``` +sudo systemctl enable|disable|start|stop|restart|status nps +``` +systemd,带有开机自启,自动重启配置,当进程结束后15秒会启动,日志输出至/var/log/nps/nps.log。 +建议采用此方式启动,能够捕获panic信息,便于排查问题。 ``` nps test|start|stop|restart|status @@ -432,6 +448,27 @@ server_ip=xxx ``` ./npc -config=npc配置文件路径 ``` +可自行添加systemd service,例如:`npc.service` +``` +[Unit] +Description=npc - convenient proxy server client +Documentation=https://github.com/cnlh/nps/ +After=network-online.target remote-fs.target nss-lookup.target +Wants=network-online.target + +[Service] +Type=simple +KillMode=process +Restart=always +RestartSec=15s +StandardOutput=append:/var/log/nps/npc.log +ExecStartPre=/bin/echo 'Starting npc' +ExecStopPost=/bin/echo 'Stopping npc' +ExecStart=/absolutely path to/npc -server=ip:port -vkey=web界面中显示的密钥 + +[Install] +WantedBy=multi-user.target +``` #### 配置文件说明 [示例配置文件](https://github.com/cnlh/nps/tree/master/conf/npc.conf) ##### 全局配置 @@ -538,11 +575,13 @@ vkey=123 [socks5] mode=socks5 server_port=9004 +multi_account=multi_account.conf ``` 项 | 含义 ---|--- mode | socks5 server_port | 在服务端的代理端口 +multi_account | socks5多账号配置文件(可选),配置后使用basic_username和basic_password无法通过认证 ##### 私密代理模式 ```ini @@ -792,6 +831,19 @@ nps支持对客户端的隧道数量进行限制,该功能默认是关闭的 nps主要通信默认基于多路复用,无需开启。 +多路复用基于TCP滑动窗口原理设计,动态计算延迟以及带宽来算出应该往网络管道中打入的流量。 +由于主要通信大多采用TCP协议,并无法探测其实时丢包情况,对于产生丢包重传的情况,采用较大的宽容度, +5分钟的等待时间,超时将会关闭当前隧道连接并重新建立,这将会抛弃当前所有的连接。 +在Linux上,可以通过调节内核参数来适应不同应用场景。 + +对于需求大带宽又有一定的丢包的场景,可以保持默认参数不变,尽可能少抛弃连接 +高并发下可根据[Linux系统限制](#Linux系统限制) 调整 + +对于延迟敏感而又有一定丢包的场景,可以适当调整TCP重传次数 +`tcp_syn_retries`, `tcp_retries1`, `tcp_retries2` +高并发同上 +nps会在系统主动关闭连接的时候拿到报错,进而重新建立隧道连接 + ### 环境变量渲染 npc支持环境变量渲染以适应在某些特殊场景下的要求。 @@ -900,6 +952,11 @@ LevelInformational->6 LevelDebug->7 ### 客户端与服务端版本对比 为了程序正常运行,客户端与服务端的核心版本必须一致,否则将导致客户端无法成功连接致服务端。 +### Linux系统限制 +默认情况下linux对连接数量有限制,对于性能好的机器完全可以调整内核参数以处理更多的连接。 +`tcp_max_syn_backlog` `somaxconn` +酌情调整参数,增强网络性能 + ## webAPI ### webAPI验证说明 diff --git a/bridge/bridge.go b/bridge/bridge.go index d61147c..facd916 100755 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -312,7 +312,7 @@ func (s *Bridge) GetConnByClientId(clientId int) (target net.Conn, err error) { func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (target net.Conn, err error) { //if the proxy type is local if link.LocalProxy { - target, err = net.Dial(link.ConnType, link.Host) + target, err = net.Dial("tcp", link.Host) return } if v, ok := s.Client.Load(clientId); ok { @@ -504,6 +504,7 @@ loop: tl.Password = t.Password tl.LocalPath = t.LocalPath tl.StripPre = t.StripPre + tl.MultiAccount = t.MultiAccount if !client.HasTunnel(tl) { if err := file.GetDb().NewTask(tl); err != nil { logs.Notice("Add task error ", err.Error()) diff --git a/client/client.go b/client/client.go index 52da907..5bd0b01 100755 --- a/client/client.go +++ b/client/client.go @@ -49,6 +49,11 @@ retry: time.Sleep(time.Second * 5) goto retry } + if c == nil { + logs.Error("Error data from server, and will be reconnected in five seconds") + time.Sleep(time.Second * 5) + goto retry + } logs.Info("Successful connection with server %s", s.svrAddr) //monitor the connection go s.ping() diff --git a/client/control.go b/client/control.go index 5673f14..3260113 100644 --- a/client/control.go +++ b/client/control.go @@ -223,8 +223,13 @@ func NewConn(tp string, vkey string, server string, connType string, proxyUrl st if _, err := c.Write([]byte(crypt.Md5(version.GetVersion()))); err != nil { return nil, err } - if b, err := c.GetShortContent(32); err != nil || crypt.Md5(version.GetVersion()) != string(b) { - logs.Error("The client does not match the server version. The current version of the client is", version.GetVersion()) + b, err := c.GetShortContent(32) + if err != nil { + logs.Error(err) + return nil, err + } + if crypt.Md5(version.GetVersion()) != string(b) { + logs.Error("The client does not match the server version. The current core version of the client is", version.GetVersion()) return nil, err } if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil { diff --git a/client/health.go b/client/health.go index 9e0760f..e804cf9 100644 --- a/client/health.go +++ b/client/health.go @@ -71,7 +71,11 @@ func check(t *file.Health) { var rs *http.Response for _, v := range arr { if t.HealthCheckType == "tcp" { - _, err = net.DialTimeout("tcp", v, time.Duration(t.HealthCheckTimeout)*time.Second) + var c net.Conn + c, err = net.DialTimeout("tcp", v, time.Duration(t.HealthCheckTimeout)*time.Second) + if err == nil { + c.Close() + } } else { client := &http.Client{} client.Timeout = time.Duration(t.HealthCheckTimeout) * time.Second diff --git a/cmd/nps/nps.go b/cmd/nps/nps.go index f66fe66..22835a2 100644 --- a/cmd/nps/nps.go +++ b/cmd/nps/nps.go @@ -61,7 +61,7 @@ func main() { logs.Error("Getting bridge_port error", err) os.Exit(0) } - logs.Info("the version of server is %s ,allow client version to be %s", version.VERSION, version.GetVersion()) + logs.Info("the version of server is %s ,allow client core version to be %s", version.VERSION, version.GetVersion()) connection.InitConnectionService() crypt.InitTls(filepath.Join(common.GetRunPath(), "conf", "server.pem"), filepath.Join(common.GetRunPath(), "conf", "server.key")) tool.InitAllowPort() diff --git a/conf/multi_account.conf b/conf/multi_account.conf new file mode 100644 index 0000000..e3cd792 --- /dev/null +++ b/conf/multi_account.conf @@ -0,0 +1,2 @@ +# key -> user | value -> pwd +npc=npc.pwd \ No newline at end of file diff --git a/conf/npc.conf b/conf/npc.conf index d4a31a8..b3dccdb 100644 --- a/conf/npc.conf +++ b/conf/npc.conf @@ -40,6 +40,7 @@ server_port=10000 [socks5] mode=socks5 server_port=19009 +multi_account=multi_account.conf [file] mode=file diff --git a/go.mod b/go.mod index 8a19eaf..a540dae 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/klauspost/cpuid v1.2.1 // indirect github.com/klauspost/reedsolomon v1.9.2 // indirect github.com/onsi/gomega v1.5.0 // indirect + github.com/panjf2000/ants/v2 v2.2.2 github.com/pkg/errors v0.8.0 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect github.com/shirou/gopsutil v2.18.12+incompatible diff --git a/go.sum b/go.sum index f3a17f4..8a74eb6 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/panjf2000/ants/v2 v2.2.2 h1:TWzusBjq/IflXhy+/S6u5wmMLCBdJnB9tPIx9Zmhvok= +github.com/panjf2000/ants/v2 v2.2.2/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -87,4 +89,4 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= -gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= \ No newline at end of file diff --git a/lib/common/const.go b/lib/common/const.go index d77f16b..2fd5bb6 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -42,9 +42,13 @@ const ( MUX_NEW_CONN_OK MUX_NEW_CONN_Fail MUX_NEW_MSG + MUX_NEW_MSG_PART MUX_MSG_SEND_OK MUX_NEW_CONN MUX_CONN_CLOSE MUX_PING_RETURN - MUX_PING int32 = -1 + MUX_PING int32 = -1 + MAXIMUM_SEGMENT_SIZE = PoolSizeWindow + MAXIMUM_WINDOW_SIZE = 1 << 25 // 1<<31-1 TCP slide window size is very large, + // we use 32M, reduce memory usage ) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 1129940..91eeb98 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -15,7 +15,7 @@ type NetPackager interface { } type BasePackager struct { - Length uint32 + Length uint16 Content []byte } @@ -65,8 +65,9 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) { //Unpack 会导致传入的数字类型转化成float64!! //主要原因是json unmarshal并未传入正确的数据类型 -func (Self *BasePackager) UnPack(reader io.Reader) (err error) { +func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) { Self.clean() + n += 2 // uint16 err = binary.Read(reader, binary.LittleEndian, &Self.Length) if err != nil { return @@ -80,6 +81,7 @@ func (Self *BasePackager) UnPack(reader io.Reader) (err error) { // err = io.ErrUnexpectedEOF //} err = binary.Read(reader, binary.LittleEndian, Self.Content) + n += Self.Length return } @@ -101,7 +103,7 @@ func (Self *BasePackager) Unmarshal(content interface{}) (err error) { } func (Self *BasePackager) setLength() { - Self.Length = uint32(len(Self.Content)) + Self.Length = uint16(len(Self.Content)) return } @@ -137,35 +139,45 @@ func (Self *ConnPackager) Pack(writer io.Writer) (err error) { return } -func (Self *ConnPackager) UnPack(reader io.Reader) (err error) { +func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) if err != nil && err != io.EOF { return } - err = Self.BasePackager.UnPack(reader) + n, err = Self.BasePackager.UnPack(reader) + n += 2 return } type MuxPackager struct { - Flag uint8 - Id int32 - Window uint16 + Flag uint8 + Id int32 + Window uint32 + ReadLength uint32 BasePackager } func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { Self.Flag = flag Self.Id = id - if flag == MUX_NEW_MSG { + switch flag { + case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART: + Self.Content = WindowBuff.Get() err = Self.BasePackager.NewPac(content...) - } - if flag == MUX_MSG_SEND_OK { - // MUX_MSG_SEND_OK only allows one data + //logs.Warn(Self.Length, string(Self.Content)) + case MUX_MSG_SEND_OK: + // MUX_MSG_SEND_OK contains two data switch content[0].(type) { case int: - Self.Window = uint16(content[0].(int)) - case uint16: - Self.Window = content[0].(uint16) + Self.Window = uint32(content[0].(int)) + case uint32: + Self.Window = content[0].(uint32) + } + switch content[1].(type) { + case int: + Self.ReadLength = uint32(content[1].(int)) + case uint32: + Self.ReadLength = content[1].(uint32) } } return @@ -180,17 +192,21 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { if err != nil { return } - if Self.Flag == MUX_NEW_MSG { + switch Self.Flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: err = Self.BasePackager.Pack(writer) - } - if Self.Flag == MUX_MSG_SEND_OK { + WindowBuff.Put(Self.Content) + case MUX_MSG_SEND_OK: err = binary.Write(writer, binary.LittleEndian, Self.Window) + if err != nil { + return + } + err = binary.Write(writer, binary.LittleEndian, Self.ReadLength) } return } -func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { - Self.BasePackager.clean() // also clean the content +func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { err = binary.Read(reader, binary.LittleEndian, &Self.Flag) if err != nil { return @@ -199,11 +215,21 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { if err != nil { return } - if Self.Flag == MUX_NEW_MSG { - err = Self.BasePackager.UnPack(reader) - } - if Self.Flag == MUX_MSG_SEND_OK { + switch Self.Flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: + Self.Content = WindowBuff.Get() // need get a window buf from pool + Self.BasePackager.clean() // also clean the content + n, err = Self.BasePackager.UnPack(reader) + //logs.Warn("unpack", Self.Length, string(Self.Content)) + case MUX_MSG_SEND_OK: err = binary.Read(reader, binary.LittleEndian, &Self.Window) + if err != nil { + return + } + n += 4 // uint32 + err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) + n += 4 // uint32 } + n += 5 //uint8 int32 return } diff --git a/lib/common/pool.go b/lib/common/pool.go index 98da5d3..1f7a47e 100644 --- a/lib/common/pool.go +++ b/lib/common/pool.go @@ -9,7 +9,8 @@ const PoolSize = 64 * 1024 const PoolSizeSmall = 100 const PoolSizeUdp = 1472 const PoolSizeCopy = 32 << 10 -const PoolSizeWindow = 1<<16 - 1 +const PoolSizeBuffer = 4096 +const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1 var BufPool = sync.Pool{ New: func() interface{} { @@ -92,22 +93,18 @@ type windowBufferPool struct { func (Self *windowBufferPool) New() { Self.pool = sync.Pool{ New: func() interface{} { - return make([]byte, 0, PoolSizeWindow) + return make([]byte, PoolSizeWindow, PoolSizeWindow) }, } } func (Self *windowBufferPool) Get() (buf []byte) { buf = Self.pool.Get().([]byte) - return buf[:0] + return buf[:PoolSizeWindow] } func (Self *windowBufferPool) Put(x []byte) { - if cap(x) == PoolSizeWindow { - Self.pool.Put(x[:PoolSizeWindow]) // make buf to full - } else { - x = nil - } + Self.pool.Put(x[:PoolSizeWindow]) // make buf to full } type bufferPool struct { @@ -117,7 +114,7 @@ type bufferPool struct { func (Self *bufferPool) New() { Self.pool = sync.Pool{ New: func() interface{} { - return new(bytes.Buffer) + return bytes.NewBuffer(make([]byte, 0, PoolSizeBuffer)) }, } } @@ -145,28 +142,56 @@ func (Self *muxPackagerPool) New() { } func (Self *muxPackagerPool) Get() *MuxPackager { - pack := Self.pool.Get().(*MuxPackager) - buf := CopyBuff.Get() - pack.Content = buf - return pack + return Self.pool.Get().(*MuxPackager) } func (Self *muxPackagerPool) Put(pack *MuxPackager) { - CopyBuff.Put(pack.Content) Self.pool.Put(pack) } +type ListElement struct { + Buf []byte + L uint16 + Part bool +} + +type listElementPool struct { + pool sync.Pool +} + +func (Self *listElementPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + element := ListElement{} + return &element + }, + } +} + +func (Self *listElementPool) Get() *ListElement { + return Self.pool.Get().(*ListElement) +} + +func (Self *listElementPool) Put(element *ListElement) { + element.L = 0 + element.Buf = nil + element.Part = false + Self.pool.Put(element) +} + var once = sync.Once{} var BuffPool = bufferPool{} var CopyBuff = copyBufferPool{} var MuxPack = muxPackagerPool{} var WindowBuff = windowBufferPool{} +var ListElementPool = listElementPool{} func newPool() { BuffPool.New() CopyBuff.New() MuxPack.New() WindowBuff.New() + ListElementPool.New() } func init() { diff --git a/lib/common/util.go b/lib/common/util.go index dc9afbe..9a60846 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -108,6 +108,9 @@ func ChangeHostAndHeader(r *http.Request, host string, header string, addr strin } } addr = strings.Split(addr, ":")[0] + if prior, ok := r.Header["X-Forwarded-For"]; ok { + addr = strings.Join(prior, ", ") + ", " + addr + } r.Header.Set("X-Forwarded-For", addr) r.Header.Set("X-Real-IP", addr) } @@ -263,11 +266,14 @@ func GetPortByAddr(addr string) int { return p } -func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { +func CopyBuffer(dst io.Writer, src io.Reader, label ...string) (written int64, err error) { buf := CopyBuff.Get() defer CopyBuff.Put(buf) for { nr, er := src.Read(buf) + //if len(pr)>0 && pr[0] && nr > 50 { + // logs.Warn(string(buf[:50])) + //} if nr > 0 { nw, ew := dst.Write(buf[0:nr]) if nw > 0 { diff --git a/lib/config/config.go b/lib/config/config.go index c35afb7..89a6bfd 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -239,12 +239,37 @@ func dealTunnel(s string) *file.Tunnel { t.LocalPath = item[1] case "strip_pre": t.StripPre = item[1] + case "multi_account": + t.MultiAccount = &file.MultiAccount{} + if b, err := common.ReadAllFromFile(item[1]); err != nil { + panic(err) + } else { + if content, err := common.ParseStr(string(b)); err != nil { + panic(err) + } else { + t.MultiAccount.AccountMap = dealMultiUser(content) + } + } } } return t } +func dealMultiUser(s string) map[string]string { + multiUserMap := make(map[string]string) + for _, v := range splitStr(s) { + item := strings.Split(v, "=") + if len(item) == 0 { + continue + } else if len(item) == 1 { + item = append(item, "") + } + multiUserMap[strings.TrimSpace(item[0])] = item[1] + } + return multiUserMap +} + func delLocalService(s string) *LocalServer { l := new(LocalServer) for _, v := range splitStr(s) { diff --git a/lib/conn/conn.go b/lib/conn/conn.go index 7946c0d..9f0c397 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -6,13 +6,14 @@ import ( "encoding/binary" "encoding/json" "errors" + "github.com/astaxie/beego/logs" + "github.com/cnlh/nps/lib/goroutine" "io" "net" "net/http" "net/url" "strconv" "strings" - "sync" "time" "github.com/cnlh/nps/lib/common" @@ -350,25 +351,29 @@ func SetUdpSession(sess *kcp.UDPSession) { //conn1 mux conn func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) { - var in, out int64 - var wg sync.WaitGroup + //var in, out int64 + //var wg sync.WaitGroup connHandle := GetConn(conn1, crypt, snappy, rate, isServer) if rb != nil { connHandle.Write(rb) } - go func(in *int64) { - wg.Add(1) - *in, _ = common.CopyBuffer(connHandle, conn2) - connHandle.Close() - conn2.Close() - wg.Done() - }(&in) - out, _ = common.CopyBuffer(conn2, connHandle) - connHandle.Close() - conn2.Close() - wg.Wait() - if flow != nil { - flow.Add(in, out) + //go func(in *int64) { + // wg.Add(1) + // *in, _ = common.CopyBuffer(connHandle, conn2) + // connHandle.Close() + // conn2.Close() + // wg.Done() + //}(&in) + //out, _ = common.CopyBuffer(conn2, connHandle) + //connHandle.Close() + //conn2.Close() + //wg.Wait() + //if flow != nil { + // flow.Add(in, out) + //} + err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow)) + if err != nil { + logs.Error(err) } } diff --git a/lib/conn/listener.go b/lib/conn/listener.go index f80e01d..bd8e443 100644 --- a/lib/conn/listener.go +++ b/lib/conn/listener.go @@ -43,9 +43,16 @@ func Accept(l net.Listener, f func(c net.Conn)) { if strings.Contains(err.Error(), "use of closed network connection") { break } + if strings.Contains(err.Error(), "the mux has closed") { + break + } logs.Warn(err) continue } + if c == nil { + logs.Warn("nil connection") + break + } go f(c) } } diff --git a/lib/file/obj.go b/lib/file/obj.go index d3a1fbe..15dea37 100644 --- a/lib/file/obj.go +++ b/lib/file/obj.go @@ -124,22 +124,23 @@ func (s *Client) HasHost(h *Host) bool { } type Tunnel struct { - Id int - Port int - ServerIp string - Mode string - Status bool - RunStatus bool - Client *Client - Ports string - Flow *Flow - Password string - Remark string - TargetAddr string - NoStore bool - LocalPath string - StripPre string - Target *Target + Id int + Port int + ServerIp string + Mode string + Status bool + RunStatus bool + Client *Client + Ports string + Flow *Flow + Password string + Remark string + TargetAddr string + NoStore bool + LocalPath string + StripPre string + Target *Target + MultiAccount *MultiAccount Health sync.RWMutex } @@ -184,6 +185,10 @@ type Target struct { sync.RWMutex } +type MultiAccount struct { + AccountMap map[string]string // multi account and pwd +} + func (s *Target) GetRandomTarget() (string, error) { if s.TargetArr == nil { s.TargetArr = strings.Split(s.TargetStr, "\n") diff --git a/lib/goroutine/pool.go b/lib/goroutine/pool.go new file mode 100644 index 0000000..287c711 --- /dev/null +++ b/lib/goroutine/pool.go @@ -0,0 +1,73 @@ +package goroutine + +import ( + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/file" + "github.com/panjf2000/ants/v2" + "io" + "net" + "sync" +) + +type connGroup struct { + src io.ReadWriteCloser + dst io.ReadWriteCloser + wg *sync.WaitGroup + n *int64 +} + +func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup { + return connGroup{ + src: src, + dst: dst, + wg: wg, + n: n, + } +} + +func copyConnGroup(group interface{}) { + cg, ok := group.(connGroup) + if !ok { + return + } + var err error + *cg.n, err = common.CopyBuffer(cg.dst, cg.src) + if err != nil { + cg.src.Close() + cg.dst.Close() + //logs.Warn("close npc by copy from nps", err, c.connId) + } + cg.wg.Done() +} + +type Conns struct { + conn1 io.ReadWriteCloser // mux connection + conn2 net.Conn // outside connection + flow *file.Flow +} + +func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns { + return Conns{ + conn1: c1, + conn2: c2, + flow: flow, + } +} + +func copyConns(group interface{}) { + conns := group.(Conns) + wg := new(sync.WaitGroup) + wg.Add(2) + var in, out int64 + _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in)) + // outside to mux : incoming + _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out)) + // mux to outside : outgoing + wg.Wait() + if conns.flow != nil { + conns.flow.Add(in, out) + } +} + +var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false)) +var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false)) diff --git a/lib/install/install.go b/lib/install/install.go index 56f3cc5..24af9b9 100644 --- a/lib/install/install.go +++ b/lib/install/install.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "os" "path/filepath" @@ -13,6 +14,23 @@ import ( ) func InstallNps() { + unit := `[Unit] +Description=nps - convenient proxy server +Documentation=https://github.com/cnlh/nps/ +After=network-online.target remote-fs.target nss-lookup.target +Wants=network-online.target` + service := `[Service] +Type=simple +KillMode=process +Restart=always +RestartSec=15s +StandardOutput=append:/var/log/nps/nps.log +ExecStartPre=/bin/echo 'Starting nps' +ExecStopPost=/bin/echo 'Stopping nps' +ExecStart=` + install := `[Install] +WantedBy=multi-user.target` + path := common.GetInstallPath() if common.FileExists(path) { log.Fatalf("the path %s has exist, does not support install", path) @@ -35,21 +53,35 @@ func InstallNps() { log.Fatalln(err) } else { os.Chmod("/usr/local/bin/nps", 0755) + service += "/usr/local/bin/nps" log.Println("Executable files have been copied to", "/usr/local/bin/nps") } } else { os.Chmod("/usr/bin/nps", 0755) + service += "/usr/bin/nps" log.Println("Executable files have been copied to", "/usr/bin/nps") } - + systemd := unit + "\n\n" + service + "\n\n" + install + _ = os.Remove("/usr/lib/systemd/system/nps.service") + err := ioutil.WriteFile("/usr/lib/systemd/system/nps.service", []byte(systemd), 0644) + if err != nil { + log.Println("Write systemd service err ", err) + } + _ = os.Mkdir("/var/log/nps", 644) } log.Println("install ok!") log.Println("Static files and configuration files in the current directory will be useless") log.Println("The new configuration file is located in", path, "you can edit them") if !common.IsWindows() { - log.Println("You can start with nps test|start|stop|restart|status anywhere") + log.Println(`You can start with: +sudo systemctl enable|disable|start|stop|restart|status nps +or: +nps test|start|stop|restart|status +anywhere!`) } else { - log.Println("You can copy executable files to any directory and start working with nps.exe test|start|stop|restart|status") + log.Println(`You can copy executable files to any directory and start working with: +nps.exe test|start|stop|restart|status +now!`) } } func MkidrDirAll(path string, v ...string) { diff --git a/lib/mux/conn.go b/lib/mux/conn.go index bf9e0d6..f665248 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -3,8 +3,11 @@ package mux import ( "errors" "io" + "math" "net" + "runtime" "sync" + "sync/atomic" "time" "github.com/cnlh/nps/lib/common" @@ -15,34 +18,36 @@ type conn struct { getStatusCh chan struct{} connStatusOkCh chan struct{} connStatusFailCh chan struct{} - readTimeOut time.Time - writeTimeOut time.Time connId int32 isClose bool closeFlag bool // close conn flag - receiveWindow *window - sendWindow *window - readCh waitingCh - writeCh waitingCh - mux *Mux + receiveWindow *ReceiveWindow + sendWindow *SendWindow once sync.Once + //label string } -func NewConn(connId int32, mux *Mux) *conn { +func NewConn(connId int32, mux *Mux, label ...string) *conn { c := &conn{ getStatusCh: make(chan struct{}), connStatusOkCh: make(chan struct{}), connStatusFailCh: make(chan struct{}), connId: connId, - receiveWindow: new(window), - sendWindow: new(window), - mux: mux, + receiveWindow: new(ReceiveWindow), + sendWindow: new(SendWindow), once: sync.Once{}, } - c.receiveWindow.NewReceive() - c.sendWindow.NewSend() - c.readCh.new() - c.writeCh.new() + //if len(label) > 0 { + // c.label = label[0] + //} + c.receiveWindow.New(mux) + c.sendWindow.New(mux) + //logm := &connLog{ + // startTime: time.Now(), + // isClose: false, + // logs: []string{c.label + "new conn success"}, + //} + //setM(label[0], int(connId), logm) return c } @@ -50,39 +55,28 @@ func (s *conn) Read(buf []byte) (n int, err error) { if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } + if len(buf) == 0 { + return 0, nil + } // waiting for takeout from receive window finish or timeout - go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh) - if t := s.readTimeOut.Sub(time.Now()); t > 0 { - timer := time.NewTimer(t) - defer timer.Stop() - select { - case <-timer.C: - return 0, errors.New("read timeout") - case n = <-s.readCh.nCh: - err = <-s.readCh.errCh - } - } else { - n = <-s.readCh.nCh - err = <-s.readCh.errCh - } + //now := time.Now() + n, err = s.receiveWindow.Read(buf, s.connId) + //t := time.Now().Sub(now) + //if t.Seconds() > 0.5 { + //logs.Warn("conn read long", n, t.Seconds()) + //} + //var errstr string + //if err == nil { + // errstr = "err:nil" + //} else { + // errstr = err.Error() + //} + //d := getM(s.label, int(s.connId)) + //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100])) + //setM(s.label, int(s.connId), d) return } -func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) { - n, err := s.receiveWindow.Read(buf) - if s.receiveWindow.WindowFull { - if s.receiveWindow.Size() > 0 { - // window.Read may be invoked before window.Write, and WindowFull flag change to true - // so make sure that receiveWindow is free some space - s.receiveWindow.WindowFull = false - s.mux.sendInfo(common.MUX_MSG_SEND_OK, s.connId, s.receiveWindow.Size()) - // acknowledge other side, have empty some receive window space - } - } - nCh <- n - errCh <- err -} - func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, errors.New("the conn has closed") @@ -91,45 +85,18 @@ func (s *conn) Write(buf []byte) (n int, err error) { //s.Close() return 0, errors.New("io: write on closed conn") } - s.sendWindow.SetSendBuf(buf) // set the buf to send window - go s.write(s.writeCh.nCh, s.writeCh.errCh) - // waiting for send to other side or timeout - if t := s.writeTimeOut.Sub(time.Now()); t > 0 { - timer := time.NewTimer(t) - defer timer.Stop() - select { - case <-timer.C: - return 0, errors.New("write timeout") - case n = <-s.writeCh.nCh: - err = <-s.writeCh.errCh - } - } else { - n = <-s.writeCh.nCh - err = <-s.writeCh.errCh + if len(buf) == 0 { + return 0, nil } + //logs.Warn("write buf", len(buf)) + //now := time.Now() + n, err = s.sendWindow.WriteFull(buf, s.connId) + //t := time.Now().Sub(now) + //if t.Seconds() > 0.5 { + // logs.Warn("conn write long", n, t.Seconds()) + //} return } -func (s *conn) write(nCh chan int, errCh chan error) { - var n int - var err error - for { - buf, err := s.sendWindow.WriteTo() - // get the usable window size buf from send window - if buf == nil && err == io.EOF { - // send window is drain, break the loop - err = nil - break - } - if err != nil { - break - } - n += len(buf) - s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf) - // send to other side, not send nil data to other side - } - nCh <- n - errCh <- err -} func (s *conn) Close() (err error) { s.once.Do(s.closeProcess) @@ -138,288 +105,531 @@ func (s *conn) Close() (err error) { func (s *conn) closeProcess() { s.isClose = true - s.mux.connMap.Delete(s.connId) - if !s.mux.IsClose { + s.receiveWindow.mux.connMap.Delete(s.connId) + if !s.receiveWindow.mux.IsClose { // if server or user close the conn while reading, will get a io.EOF // and this Close method will be invoke, send this signal to close other side - s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) + s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) } s.sendWindow.CloseWindow() s.receiveWindow.CloseWindow() + //d := getM(s.label, int(s.connId)) + //d.isClose = true + //d.logs = append(d.logs, s.label+"close "+time.Now().String()) + //setM(s.label, int(s.connId), d) return } func (s *conn) LocalAddr() net.Addr { - return s.mux.conn.LocalAddr() + return s.receiveWindow.mux.conn.LocalAddr() } func (s *conn) RemoteAddr() net.Addr { - return s.mux.conn.RemoteAddr() + return s.receiveWindow.mux.conn.RemoteAddr() } func (s *conn) SetDeadline(t time.Time) error { - s.readTimeOut = t - s.writeTimeOut = t + _ = s.SetReadDeadline(t) + _ = s.SetWriteDeadline(t) return nil } func (s *conn) SetReadDeadline(t time.Time) error { - s.readTimeOut = t + s.receiveWindow.SetTimeOut(t) return nil } func (s *conn) SetWriteDeadline(t time.Time) error { - s.writeTimeOut = t + s.sendWindow.SetTimeOut(t) return nil } type window struct { - windowBuff []byte - off uint16 - readOp chan struct{} - readWait bool - WindowFull bool - usableReceiveWindow chan uint16 - WriteWg sync.WaitGroup - closeOp bool - closeOpCh chan struct{} - WriteEndOp chan struct{} - mutex sync.Mutex + remainingWait uint64 // 64bit alignment + off uint32 + maxSize uint32 + closeOp bool + closeOpCh chan struct{} + mux *Mux } -func (Self *window) NewReceive() { +func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) { + const mask = 1<> dequeueBits) & mask) + wait = uint32(ptrs & mask) + return +} + +func (Self *window) pack(remaining, wait uint32) uint64 { + const mask = 1< 0 { + n = uint32(l) + } return } -func (Self *window) liteSlide() { - // slide by re slice - Self.windowBuff = Self.windowBuff[Self.off:] - Self.off = 0 - return -} - -func (Self *window) Size() (n int) { - // receive Window remaining - n = common.PoolSizeWindow - Self.len() - return -} - -func (Self *window) len() (n int) { - n = len(Self.windowBuff[Self.off:]) - return -} - -func (Self *window) cap() (n int) { - n = cap(Self.windowBuff[Self.off:]) - return -} - -func (Self *window) grow(n int) { - Self.windowBuff = Self.windowBuff[:Self.len()+n] -} - -func (Self *window) Write(p []byte) (n int, err error) { - if Self.closeOp { - return 0, errors.New("conn.receiveWindow: write on closed window") - } - if len(p) > Self.Size() { - return 0, errors.New("conn.receiveWindow: write too large") - } - Self.mutex.Lock() - // slide the offset - if len(p) > Self.cap()-Self.len() { - // not enough space, need to allocate - Self.fullSlide() - } else { - // have enough space, re slice - Self.liteSlide() - } - length := Self.len() // length before grow - Self.grow(len(p)) // grow for copy - n = copy(Self.windowBuff[length:], p) // must copy data before allow Read - if Self.readWait { - // if there condition is length == 0 and - // Read method just take away all the windowBuff, - // this method will block until windowBuff is empty again - - // allow continue read - defer Self.allowRead() - } - Self.mutex.Unlock() - return n, nil -} - -func (Self *window) allowRead() (closed bool) { - if Self.closeOp { - close(Self.readOp) - return true - } - Self.mutex.Lock() - Self.readWait = false - Self.mutex.Unlock() - select { - case <-Self.closeOpCh: - close(Self.readOp) - return true - case Self.readOp <- struct{}{}: - return false - } -} - -func (Self *window) Read(p []byte) (n int, err error) { - if Self.closeOp { - return 0, io.EOF // Write method receive close signal, returns eof - } - Self.mutex.Lock() - length := Self.len() // protect the length data, it invokes - // before Write lock and after Write unlock - if length == 0 { - // window is empty, waiting for Write method send a success readOp signal - // or get timeout or close - Self.readWait = true - Self.mutex.Unlock() - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - select { - case _, ok := <-Self.readOp: - if !ok { - return 0, errors.New("conn.receiveWindow: window closed") - } - case <-Self.WriteEndOp: - return 0, io.EOF // receive eof signal, returns eof - case <-ticker.C: - return 0, errors.New("conn.receiveWindow: read time out") - case <-Self.closeOpCh: - close(Self.readOp) - return 0, io.EOF // receive close signal, returns eof +func (Self *ReceiveWindow) calcSize() { + // calculating maximum receive window size + if Self.count == 0 { + //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) + conns := Self.mux.connMap.Size() + n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * + Self.mux.bw.Get() / float64(conns)) + if n < common.MAXIMUM_SEGMENT_SIZE*10 { + n = common.MAXIMUM_SEGMENT_SIZE * 10 } - } else { - Self.mutex.Unlock() + bufLen := Self.bufQueue.Len() + if n < bufLen { + n = bufLen + } + if n < Self.maxSize/2 { + n = Self.maxSize / 2 + } + // set the minimal size + if n > 2*Self.maxSize { + n = 2 * Self.maxSize + } + if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) { + n = common.MAXIMUM_WINDOW_SIZE / uint32(conns) + } + // set the maximum size + //logs.Warn("n", n) + atomic.StoreUint32(&Self.maxSize, n) + Self.count = -10 } - minCopy := 512 + Self.count += 1 + return +} + +func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) { + if Self.closeOp { + return errors.New("conn.receiveWindow: write on closed window") + } + element, err := NewListElement(buf, l, part) + //logs.Warn("push the buf", len(buf), l, (&element).l) + if err != nil { + return + } + Self.calcSize() // calculate the max window size + var wait uint32 +start: + ptrs := atomic.LoadUint64(&Self.remainingWait) + _, wait = Self.unpack(ptrs) + newRemaining := Self.remainingSize(l) + // calculate the remaining window size now, plus the element we will push + if newRemaining == 0 { + //logs.Warn("window full true", remaining) + wait = 1 + } + if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) { + goto start + // another goroutine change the status, make sure shall we need wait + } + Self.bufQueue.Push(element) + // status check finish, now we can push the element into the queue + if wait == 0 { + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining) + // send the remaining window size, not including zero size + } + return nil +} + +func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { + if Self.closeOp { + return 0, io.EOF // receive close signal, returns eof + } + pOff := 0 + l := 0 + //logs.Warn("receive window read off, element.l", Self.off, Self.element.l) +copyData: + if Self.off == uint32(Self.element.L) { + // on the first Read method invoked, Self.off and Self.element.l + // both zero value + common.ListElementPool.Put(Self.element) + if Self.closeOp { + return 0, io.EOF + } + Self.element, err = Self.bufQueue.Pop() + // if the queue is empty, Pop method will wait until one element push + // into the queue successful, or timeout. + // timer start on timeout parameter is set up , + // reset to 60s if timeout and data still available + Self.off = 0 + if err != nil { + return // queue receive stop or time out, break the loop and return + } + //logs.Warn("pop element", Self.element.l, Self.element.part) + } + l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L]) + pOff += l + Self.off += uint32(l) + //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len()) + n += l + l = 0 + if Self.off == uint32(Self.element.L) { + //logs.Warn("put the element end ", string(Self.element.buf[:15])) + common.WindowBuff.Put(Self.element.Buf) + Self.sendStatus(id, Self.element.L) + // check the window full status + } + if pOff < len(p) && Self.element.Part { + // element is a part of the segments, trying to fill up buf p + goto copyData + } + return // buf p is full or all of segments in buf, return +} + +func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { + var remaining, wait uint32 for { - Self.mutex.Lock() - if len(p) == n || Self.len() == 0 { - Self.mutex.Unlock() + ptrs := atomic.LoadUint64(&Self.remainingWait) + remaining, wait = Self.unpack(ptrs) + remaining += uint32(l) + if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) { break } - if n+minCopy > len(p) { - minCopy = len(p) - n - } - i := copy(p[n:n+minCopy], Self.windowBuff[Self.off:]) - Self.off += uint16(i) - n += i - Self.mutex.Unlock() + runtime.Gosched() + // another goroutine change remaining or wait status, make sure + // we need acknowledge other side + } + // now we get the current window status success + if wait == 1 { + //logs.Warn("send the wait status", remaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining) } - p = p[:n] return } -func (Self *window) WriteTo() (p []byte, err error) { - if Self.closeOp { - return nil, errors.New("conn.writeWindow: window closed") - } - if Self.len() == 0 { - return nil, io.EOF - // send window buff is drain, return eof and get another one - } - var windowSize uint16 - var ok bool -waiting: - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - // waiting for receive usable window size, or timeout - select { - case windowSize, ok = <-Self.usableReceiveWindow: - if !ok { - return nil, errors.New("conn.writeWindow: window closed") - } - case <-ticker.C: - return nil, errors.New("conn.writeWindow: write to time out") - case <-Self.closeOpCh: - return nil, errors.New("conn.writeWindow: window closed") - } - if windowSize == 0 { - goto waiting // waiting for another usable window size - } - Self.mutex.Lock() - if windowSize > uint16(Self.len()) { - // usable window size is bigger than window buff size, send the full buff - windowSize = uint16(Self.len()) - } - p = Self.windowBuff[Self.off : windowSize+Self.off] - Self.off += windowSize - Self.mutex.Unlock() - return +func (Self *ReceiveWindow) SetTimeOut(t time.Time) { + // waiting for FIFO queue Pop method + Self.bufQueue.SetTimeOut(t) } -func (Self *window) SetAllowSize(value uint16) (closed bool) { +func (Self *ReceiveWindow) Stop() { + // queue has no more data to push, so unblock pop method + Self.once.Do(Self.bufQueue.Stop) +} + +func (Self *ReceiveWindow) CloseWindow() { + Self.window.CloseWindow() + Self.Stop() + Self.release() +} + +func (Self *ReceiveWindow) release() { + //if Self.element != nil { + // if Self.element.Buf != nil { + // common.WindowBuff.Put(Self.element.Buf) + // } + // common.ListElementPool.Put(Self.element) + //} + for { + Self.element = Self.bufQueue.TryPop() + if Self.element == nil { + return + } + if Self.element.Buf != nil { + common.WindowBuff.Put(Self.element.Buf) + } + common.ListElementPool.Put(Self.element) + } // release resource +} + +type SendWindow struct { + window + buf []byte + setSizeCh chan struct{} + timeout time.Time +} + +func (Self *SendWindow) New(mux *Mux) { + Self.setSizeCh = make(chan struct{}) + Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10 + atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)< common.MAXIMUM_SEGMENT_SIZE { + sendSize = common.MAXIMUM_SEGMENT_SIZE + //logs.Warn("cut buf by mss") + } else { + sendSize = uint32(len(Self.buf[Self.off:])) + } + if remaining < sendSize { + // usable window size is small than + // window MAXIMUM_SEGMENT_SIZE or send buf left + sendSize = remaining + //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) + } + //logs.Warn("send size", sendSize) + if sendSize < uint32(len(Self.buf[Self.off:])) { + part = true + } + p = Self.buf[Self.off : sendSize+Self.off] + Self.off += sendSize + Self.sent(sendSize) return } -type waitingCh struct { - nCh chan int - errCh chan error +func (Self *SendWindow) waitReceiveWindow() (err error) { + t := Self.timeout.Sub(time.Now()) + if t < 0 { + t = time.Minute * 5 + } + timer := time.NewTimer(t) + defer timer.Stop() + // waiting for receive usable window size, or timeout + select { + case _, ok := <-Self.setSizeCh: + if !ok { + return errors.New("conn.writeWindow: window closed") + } + return nil + case <-timer.C: + return errors.New("conn.writeWindow: write to time out") + case <-Self.closeOpCh: + return errors.New("conn.writeWindow: window closed") + } } -func (Self *waitingCh) new() { - Self.nCh = make(chan int) - Self.errCh = make(chan error) +func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) { + Self.SetSendBuf(buf) // set the buf to send window + //logs.Warn("set the buf to send window") + var bufSeg []byte + var part bool + var l uint32 + for { + bufSeg, l, part, err = Self.WriteTo() + //logs.Warn("buf seg", len(bufSeg), part, err) + // get the buf segments from send window + if bufSeg == nil && part == false && err == io.EOF { + // send window is drain, break the loop + err = nil + break + } + if err != nil { + break + } + n += int(l) + l = 0 + if part { + Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg) + } else { + Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg) + //logs.Warn("buf seg sent", len(bufSeg), part, err) + } + // send to other side, not send nil data to other side + } + //logs.Warn("buf seg write success") + return } -func (Self *waitingCh) close() { - close(Self.nCh) - close(Self.errCh) +func (Self *SendWindow) SetTimeOut(t time.Time) { + // waiting for receive a receive window size + Self.timeout = t } + +//type bandwidth struct { +// readStart time.Time +// lastReadStart time.Time +// readEnd time.Time +// lastReadEnd time.Time +// bufLength int +// lastBufLength int +// count int8 +// readBW float64 +// writeBW float64 +// readBandwidth float64 +//} +// +//func (Self *bandwidth) StartRead() { +// Self.lastReadStart, Self.readStart = Self.readStart, time.Now() +// if !Self.lastReadStart.IsZero() { +// if Self.count == -5 { +// Self.calcBandWidth() +// } +// } +//} +// +//func (Self *bandwidth) EndRead() { +// Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now() +// if Self.count == -5 { +// Self.calcWriteBandwidth() +// } +// if Self.count == 0 { +// Self.calcReadBandwidth() +// Self.count = -6 +// } +// Self.count += 1 +//} +// +//func (Self *bandwidth) SetCopySize(n int) { +// // must be invoke between StartRead and EndRead +// Self.lastBufLength, Self.bufLength = Self.bufLength, n +//} +//// calculating +//// start end start end +//// read read +//// write +// +//func (Self *bandwidth) calcBandWidth() { +// t := Self.readStart.Sub(Self.lastReadStart) +// if Self.lastBufLength >= 32768 { +// Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds() +// } +//} +// +//func (Self *bandwidth) calcReadBandwidth() { +// // Bandwidth between nps and npc +// readTime := Self.readEnd.Sub(Self.readStart) +// Self.readBW = float64(Self.bufLength) / readTime.Seconds() +// //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds()) +//} +// +//func (Self *bandwidth) calcWriteBandwidth() { +// // Bandwidth between nps and user, npc and application +// writeTime := Self.readStart.Sub(Self.lastReadEnd) +// Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds() +// //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds()) +//} +// +//func (Self *bandwidth) Get() (bw float64) { +// // The zero value, 0 for numeric types +// if Self.writeBW == 0 && Self.readBW == 0 { +// //logs.Warn("bw both 0") +// return 100 +// } +// if Self.writeBW == 0 && Self.readBW != 0 { +// return Self.readBW +// } +// if Self.readBW == 0 && Self.writeBW != 0 { +// return Self.writeBW +// } +// return Self.readBandwidth +//} diff --git a/lib/mux/map.go b/lib/mux/map.go index 0801201..86d09b5 100644 --- a/lib/mux/map.go +++ b/lib/mux/map.go @@ -2,28 +2,35 @@ package mux import ( "sync" - "time" ) type connMap struct { connMap map[int32]*conn - closeCh chan struct{} + //closeCh chan struct{} sync.RWMutex } func NewConnMap() *connMap { connMap := &connMap{ connMap: make(map[int32]*conn), - closeCh: make(chan struct{}), + //closeCh: make(chan struct{}), } - go connMap.clean() + //go connMap.clean() return connMap } +func (s *connMap) Size() (n int) { + s.Lock() + n = len(s.connMap) + s.Unlock() + return +} + func (s *connMap) Get(id int32) (*conn, bool) { s.Lock() - defer s.Unlock() - if v, ok := s.connMap[id]; ok && v != nil { + v, ok := s.connMap[id] + s.Unlock() + if ok && v != nil { return v, true } return nil, false @@ -31,40 +38,38 @@ func (s *connMap) Get(id int32) (*conn, bool) { func (s *connMap) Set(id int32, v *conn) { s.Lock() - defer s.Unlock() s.connMap[id] = v + s.Unlock() } func (s *connMap) Close() { - s.Lock() - defer s.Unlock() + //s.closeCh <- struct{}{} // stop the clean goroutine first for _, v := range s.connMap { - v.isClose = true + v.Close() // close all the connections in the mux } - s.closeCh <- struct{}{} } func (s *connMap) Delete(id int32) { s.Lock() - defer s.Unlock() delete(s.connMap, id) + s.Unlock() } -func (s *connMap) clean() { - ticker := time.NewTimer(time.Minute * 1) - for { - select { - case <-ticker.C: - s.Lock() - for _, v := range s.connMap { - if v.isClose { - delete(s.connMap, v.connId) - } - } - s.Unlock() - case <-s.closeCh: - ticker.Stop() - return - } - } -} +//func (s *connMap) clean() { +// ticker := time.NewTimer(time.Minute * 1) +// for { +// select { +// case <-ticker.C: +// s.Lock() +// for _, v := range s.connMap { +// if v.isClose { +// delete(s.connMap, v.connId) +// } +// } +// s.Unlock() +// case <-s.closeCh: +// ticker.Stop() +// return +// } +// } +//} diff --git a/lib/mux/mux.go b/lib/mux/mux.go index a662ad0..a43510a 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -1,11 +1,10 @@ package mux import ( - "bytes" "errors" + "io" "math" "net" - "sync" "sync/atomic" "time" @@ -14,37 +13,47 @@ import ( ) type Mux struct { + latency uint64 // we store latency in bits, but it's float64 net.Listener - conn net.Conn - connMap *connMap - newConnCh chan *conn - id int32 - closeChan chan struct{} - IsClose bool - pingOk int - connType string - writeQueue Queue - bufCh chan *bytes.Buffer - sync.Mutex + conn net.Conn + connMap *connMap + newConnCh chan *conn + id int32 + closeChan chan struct{} + IsClose bool + pingOk uint32 + counter *latencyCounter + bw *bandwidth + pingCh chan []byte + pingCheckTime uint32 + connType string + writeQueue PriorityQueue + newConnQueue ConnQueue } func NewMux(c net.Conn, connType string) *Mux { + //c.(*net.TCPConn).SetReadBuffer(0) + //c.(*net.TCPConn).SetWriteBuffer(0) m := &Mux{ conn: c, connMap: NewConnMap(), id: 0, - closeChan: make(chan struct{}), + closeChan: make(chan struct{}, 1), newConnCh: make(chan *conn), + bw: new(bandwidth), IsClose: false, connType: connType, - bufCh: make(chan *bytes.Buffer), + pingCh: make(chan []byte), + counter: newLatencyCounter(), } m.writeQueue.New() + m.newConnQueue.New() //read session by flag - go m.readSession() + m.readSession() //ping - go m.ping() - go m.writeSession() + m.ping() + m.pingReturn() + m.writeSession() return m } @@ -52,7 +61,7 @@ func (s *Mux) NewConn() (*conn, error) { if s.IsClose { return nil, errors.New("the mux has closed") } - conn := NewConn(s.getId(), s) + conn := NewConn(s.getId(), s, "nps ") //it must be set before send s.connMap.Set(conn.connId, conn) s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil) @@ -83,12 +92,17 @@ func (s *Mux) Addr() net.Addr { return s.conn.LocalAddr() } -func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { +func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { + if s.IsClose { + return + } var err error pack := common.MuxPack.Get() - err = pack.NewPac(flag, id, data) + err = pack.NewPac(flag, id, data...) if err != nil { common.MuxPack.Put(pack) + logs.Error("mux: new pack err", err) + s.Close() return } s.writeQueue.Push(pack) @@ -97,123 +111,183 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { func (s *Mux) writeSession() { go s.packBuf() - go s.writeBuf() - <-s.closeChan + //go s.writeBuf() } func (s *Mux) packBuf() { + //buffer := common.BuffPool.Get() for { + if s.IsClose { + break + } + //buffer.Reset() pack := s.writeQueue.Pop() - buffer := common.BuffPool.Get() - err := pack.Pack(buffer) + if s.IsClose { + break + } + //buffer := common.BuffPool.Get() + err := pack.Pack(s.conn) common.MuxPack.Put(pack) if err != nil { - logs.Warn("pack err", err) - common.BuffPool.Put(buffer) + logs.Error("mux: pack err", err) + //common.BuffPool.Put(buffer) + s.Close() break } - select { - case s.bufCh <- buffer: - case <-s.closeChan: - break - } - + //logs.Warn(buffer.String()) + //s.bufQueue.Push(buffer) + //l := buffer.Len() + //n, err := buffer.WriteTo(s.conn) + //common.BuffPool.Put(buffer) + //if err != nil || int(n) != l { + // logs.Error("mux: close from write session fail ", err, n, l) + // s.Close() + // break + //} } } -func (s *Mux) writeBuf() { - for { - select { - case buffer := <-s.bufCh: - l := buffer.Len() - n, err := buffer.WriteTo(s.conn) - common.BuffPool.Put(buffer) - if err != nil || int(n) != l { - logs.Warn("close from write session fail ", err, n, l) - s.Close() - break - } - case <-s.closeChan: - break - } - } -} +//func (s *Mux) writeBuf() { +// for { +// if s.IsClose { +// break +// } +// buffer, err := s.bufQueue.Pop() +// if err != nil { +// break +// } +// l := buffer.Len() +// n, err := buffer.WriteTo(s.conn) +// common.BuffPool.Put(buffer) +// if err != nil || int(n) != l { +// logs.Warn("close from write session fail ", err, n, l) +// s.Close() +// break +// } +// } +//} func (s *Mux) ping() { go func() { - ticker := time.NewTicker(time.Second * 1) + now, _ := time.Now().UTC().MarshalText() + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) + // send the ping flag and get the latency first + ticker := time.NewTicker(time.Second * 5) for { + if s.IsClose { + ticker.Stop() + break + } select { case <-ticker.C: } - //Avoid going beyond the scope - if (math.MaxInt32 - s.id) < 10000 { - s.id = 0 + if atomic.LoadUint32(&s.pingCheckTime) >= 60 { + logs.Error("mux: ping time out") + s.Close() + // more than 5 minutes not receive the ping return package, + // mux conn is damaged, maybe a packet drop, close it + break } - s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil) - if s.pingOk > 10 && s.connType == "kcp" { + now, _ := time.Now().UTC().MarshalText() + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) + atomic.AddUint32(&s.pingCheckTime, 1) + if atomic.LoadUint32(&s.pingOk) > 10 && s.connType == "kcp" { + logs.Error("mux: kcp ping err") s.Close() break } - s.pingOk++ + atomic.AddUint32(&s.pingOk, 1) + } + }() +} + +func (s *Mux) pingReturn() { + go func() { + var now time.Time + var data []byte + for { + if s.IsClose { + break + } + select { + case data = <-s.pingCh: + atomic.StoreUint32(&s.pingCheckTime, 0) + case <-s.closeChan: + break + } + _ = now.UnmarshalText(data) + latency := time.Now().UTC().Sub(now).Seconds() / 2 + if latency > 0 { + atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency))) + // convert float64 to bits, store it atomic + } + //logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency))) + if cap(data) > 0 { + common.WindowBuff.Put(data) + } } }() - select { - case <-s.closeChan: - } } func (s *Mux) readSession() { go func() { - pack := common.MuxPack.Get() + var connection *conn for { - pack = common.MuxPack.Get() - if pack.UnPack(s.conn) != nil { + if s.IsClose { break } - s.pingOk = 0 + connection = s.newConnQueue.Pop() + if s.IsClose { + break // make sure that is closed + } + s.connMap.Set(connection.connId, connection) //it has been set before send ok + s.newConnCh <- connection + s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) + } + }() + go func() { + pack := common.MuxPack.Get() + var l uint16 + var err error + for { + if s.IsClose { + break + } + pack = common.MuxPack.Get() + s.bw.StartRead() + if l, err = pack.UnPack(s.conn); err != nil { + logs.Error("mux: read session unpack from connection err", err) + s.Close() + break + } + s.bw.SetCopySize(l) + atomic.StoreUint32(&s.pingOk, 0) switch pack.Flag { case common.MUX_NEW_CONN: //new connection connection := NewConn(pack.Id, s) - s.connMap.Set(pack.Id, connection) //it has been set before send ok - go func(connection *conn) { - connection.sendWindow.SetAllowSize(512) // set the initial receive window - }(connection) - s.newConnCh <- connection - s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) + s.newConnQueue.Push(connection) continue case common.MUX_PING_FLAG: //ping - go s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) + s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) + common.WindowBuff.Put(pack.Content) continue case common.MUX_PING_RETURN: + //go func(content []byte) { + s.pingCh <- pack.Content + //}(pack.Content) continue } if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { switch pack.Flag { - case common.MUX_NEW_MSG: //new msg from remote connection - //insert wait queue - if connection.isClose { - continue + case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection + err = s.newMsg(connection, pack) + if err != nil { + logs.Error("mux: read session connection new msg err", err) + connection.Close() } - connection.receiveWindow.WriteWg.Add(1) - go func(connection *conn, content []byte) { // do not block read session - _, err := connection.receiveWindow.Write(content) - if err != nil { - logs.Warn("mux new msg err close", err) - connection.Close() - } - size := connection.receiveWindow.Size() - if size == 0 { - connection.receiveWindow.WindowFull = true - } - s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size) - connection.receiveWindow.WriteWg.Done() - }(connection, pack.Content) continue case common.MUX_NEW_CONN_OK: //connection ok connection.connStatusOkCh <- struct{}{} - go connection.sendWindow.SetAllowSize(512) - // set the initial receive window both side continue case common.MUX_NEW_CONN_Fail: connection.connStatusFailCh <- struct{}{} @@ -222,15 +296,14 @@ func (s *Mux) readSession() { if connection.isClose { continue } - go connection.sendWindow.SetAllowSize(pack.Window) + connection.sendWindow.SetSize(pack.Window, pack.ReadLength) continue case common.MUX_CONN_CLOSE: //close the connection - s.connMap.Delete(pack.Id) connection.closeFlag = true - go func(connection *conn) { - connection.receiveWindow.WriteWg.Wait() - connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window - }(connection) + //s.connMap.Delete(pack.Id) + //go func(connection *conn) { + connection.receiveWindow.Stop() // close signal to receive window + //}(connection) continue } } else if pack.Flag == common.MUX_CONN_CLOSE { @@ -241,32 +314,195 @@ func (s *Mux) readSession() { common.MuxPack.Put(pack) s.Close() }() - select { - case <-s.closeChan: - } } -func (s *Mux) Close() error { +func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) { + if connection.isClose { + err = io.ErrClosedPipe + return + } + //logs.Warn("read session receive new msg", pack.Length) + //go func(connection *conn, pack *common.MuxPackager) { // do not block read session + //insert into queue + if pack.Flag == common.MUX_NEW_MSG_PART { + err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id) + } + if pack.Flag == common.MUX_NEW_MSG { + err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id) + } + //logs.Warn("read session write success", pack.Length) + return +} + +func (s *Mux) Close() (err error) { logs.Warn("close mux") if s.IsClose { return errors.New("the mux has closed") } s.IsClose = true s.connMap.Close() - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} + s.connMap = nil + //s.bufQueue.Stop() s.closeChan <- struct{}{} close(s.newConnCh) - return s.conn.Close() + err = s.conn.Close() + s.release() + return +} + +func (s *Mux) release() { + for { + pack := s.writeQueue.TryPop() + if pack == nil { + break + } + if pack.BasePackager.Content != nil { + common.WindowBuff.Put(pack.BasePackager.Content) + } + common.MuxPack.Put(pack) + } + for { + connection := s.newConnQueue.TryPop() + if connection == nil { + break + } + connection = nil + } + s.writeQueue.Stop() + s.newConnQueue.Stop() } //get new connId as unique flag func (s *Mux) getId() (id int32) { + //Avoid going beyond the scope + if (math.MaxInt32 - s.id) < 10000 { + atomic.StoreInt32(&s.id, 0) + } id = atomic.AddInt32(&s.id, 1) if _, ok := s.connMap.Get(id); ok { - s.getId() + return s.getId() } return } + +type bandwidth struct { + readBandwidth uint64 // store in bits, but it's float64 + readStart time.Time + lastReadStart time.Time + bufLength uint32 +} + +func (Self *bandwidth) StartRead() { + if Self.readStart.IsZero() { + Self.readStart = time.Now() + } + if Self.bufLength >= common.MAXIMUM_SEGMENT_SIZE*300 { + Self.lastReadStart, Self.readStart = Self.readStart, time.Now() + Self.calcBandWidth() + } +} + +func (Self *bandwidth) SetCopySize(n uint16) { + Self.bufLength += uint32(n) +} + +func (Self *bandwidth) calcBandWidth() { + t := Self.readStart.Sub(Self.lastReadStart) + atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds())) + Self.bufLength = 0 +} + +func (Self *bandwidth) Get() (bw float64) { + // The zero value, 0 for numeric types + bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth)) + if bw <= 0 { + bw = 100 + } + //logs.Warn(bw) + return +} + +const counterBits = 4 +const counterMask = 1<> counterBits) & counterMask) + // we set head is 4 bits + min = uint8(idxs & counterMask) + return +} + +func (Self *latencyCounter) pack(head, min uint8) uint8 { + return uint8(head< value { + min = head + } + head++ + Self.headMin = Self.pack(head, min) +} + +func (Self *latencyCounter) minimal() (min uint8) { + var val float64 + var i uint8 + for i = 0; i < counterMask; i++ { + if Self.buf[i] > 0 { + if val > Self.buf[i] { + val = Self.buf[i] + min = i + } + } + } + return +} + +func (Self *latencyCounter) Latency(value float64) (latency float64) { + Self.add(value) + _, min := Self.unpack(Self.headMin) + latency = Self.buf[min] * Self.countSuccess() + return +} + +const lossRatio = 1.6 + +func (Self *latencyCounter) countSuccess() (successRate float64) { + var success, loss, i uint8 + _, min := Self.unpack(Self.headMin) + for i = 0; i < counterMask; i++ { + if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 { + loss++ + } + if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 { + success++ + } + } + // counting all the data in the ring buf, except zero + successRate = float64(success) / float64(loss+success) + return +} diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index c7b10e0..151def1 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -1,15 +1,22 @@ package mux import ( + "bufio" + "fmt" + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/goroutine" + "io" + "log" "net" "net/http" + "net/http/httputil" _ "net/http/pprof" - "sync" + "strconv" "testing" "time" + "unsafe" "github.com/astaxie/beego/logs" - "github.com/cnlh/nps/lib/common" ) var conn1 net.Conn @@ -23,47 +30,54 @@ func TestNewMux(t *testing.T) { logs.SetLogFuncCallDepth(3) server() client() + //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false)) time.Sleep(time.Second * 3) go func() { m2 := NewMux(conn2, "tcp") for { - logs.Warn("npc starting accept") + //logs.Warn("npc starting accept") c, err := m2.Accept() if err != nil { logs.Warn(err) continue } - logs.Warn("npc accept success ") + //logs.Warn("npc accept success ") c2, err := net.Dial("tcp", "127.0.0.1:80") if err != nil { logs.Warn(err) + c.Close() continue } - go func(c2 net.Conn, c net.Conn) { - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - _, err = common.CopyBuffer(c2, c) - if err != nil { - c2.Close() - c.Close() - logs.Warn("close npc by copy from nps", err) - } - wg.Done() - }() - wg.Add(1) - go func() { - _, err = common.CopyBuffer(c, c2) - if err != nil { - c2.Close() - c.Close() - logs.Warn("close npc by copy from server", err) - } - wg.Done() - }() - logs.Warn("npc wait") - wg.Wait() - }(c2, c) + //c2.(*net.TCPConn).SetReadBuffer(0) + //c2.(*net.TCPConn).SetReadBuffer(0) + _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil)) + //go func(c2 net.Conn, c *conn) { + // wg := new(sync.WaitGroup) + // wg.Add(2) + // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg)) + // //go func() { + // // _, err = common.CopyBuffer(c2, c) + // // if err != nil { + // // c2.Close() + // // c.Close() + // // //logs.Warn("close npc by copy from nps", err, c.connId) + // // } + // // wg.Done() + // //}() + // //wg.Add(1) + // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg)) + // //go func() { + // // _, err = common.CopyBuffer(c, c2) + // // if err != nil { + // // c2.Close() + // // c.Close() + // // //logs.Warn("close npc by copy from server", err, c.connId) + // // } + // // wg.Done() + // //}() + // //logs.Warn("npc wait") + // wg.Wait() + //}(c2, c.(*conn)) } }() @@ -74,39 +88,55 @@ func TestNewMux(t *testing.T) { logs.Warn(err) } for { - logs.Warn("nps starting accept") - conn, err := l.Accept() + //logs.Warn("nps starting accept") + conns, err := l.Accept() if err != nil { logs.Warn(err) continue } - logs.Warn("nps accept success starting new conn") + //conns.(*net.TCPConn).SetReadBuffer(0) + //conns.(*net.TCPConn).SetReadBuffer(0) + //logs.Warn("nps accept success starting new conn") tmpCpnn, err := m1.NewConn() if err != nil { logs.Warn("nps new conn err ", err) continue } - logs.Warn("nps new conn success ", tmpCpnn.connId) - go func(tmpCpnn net.Conn, conn net.Conn) { - go func() { - _, err := common.CopyBuffer(tmpCpnn, conn) - if err != nil { - conn.Close() - tmpCpnn.Close() - logs.Warn("close nps by copy from user") - } - }() - //time.Sleep(time.Second) - _, err = common.CopyBuffer(conn, tmpCpnn) - if err != nil { - conn.Close() - tmpCpnn.Close() - logs.Warn("close nps by copy from npc ") - } - }(tmpCpnn, conn) + //logs.Warn("nps new conn success ", tmpCpnn.connId) + _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil)) + //go func(tmpCpnn *conn, conns net.Conn) { + // wg := new(sync.WaitGroup) + // wg.Add(2) + // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg)) + // //go func() { + // // _, err := common.CopyBuffer(tmpCpnn, conns) + // // if err != nil { + // // conns.Close() + // // tmpCpnn.Close() + // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err) + // // } + // //}() + // //wg.Add(1) + // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg)) + // //time.Sleep(time.Second) + // //_, err = common.CopyBuffer(conns, tmpCpnn) + // //if err != nil { + // // conns.Close() + // // tmpCpnn.Close() + // // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) + // //} + // wg.Wait() + //}(tmpCpnn, conns) } }() + //go NewLogServer() + time.Sleep(time.Second * 5) + //for i := 0; i < 1; i++ { + // go test_raw(i) + //} + //test_request() + for { time.Sleep(time.Second * 5) } @@ -135,6 +165,73 @@ func client() { } } +func test_request() { + conn, _ := net.Dial("tcp", "127.0.0.1:7777") + for i := 0; i < 1000; i++ { + conn.Write([]byte(`GET / HTTP/1.1 +Host: 127.0.0.1:7777 +Connection: keep-alive + + +`)) + r, err := http.ReadResponse(bufio.NewReader(conn), nil) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn("read response success", r) + b, err := httputil.DumpResponse(r, true) + if err != nil { + logs.Warn("close by dump response err", err) + break + } + fmt.Println(string(b[:20]), err) + //time.Sleep(time.Second) + } + logs.Warn("finish") +} + +func test_raw(k int) { + for i := 0; i < 1000; i++ { + ti := time.Now() + conn, err := net.Dial("tcp", "127.0.0.1:7777") + if err != nil { + logs.Warn("conn dial err", err) + } + tid := time.Now() + conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1 +Host: 127.0.0.1:7777 + + +`)) + tiw := time.Now() + buf := make([]byte, 3572) + n, err := io.ReadFull(conn, buf) + //n, err := conn.Read(buf) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) + //time.Sleep(time.Second) + err = conn.Close() + if err != nil { + logs.Warn("close conn err ", err) + } + now := time.Now() + du := now.Sub(ti).Seconds() + dud := now.Sub(tid).Seconds() + duw := now.Sub(tiw).Seconds() + if du > 1 { + logs.Warn("duration long", du, dud, duw, k, i) + } + if n != 3572 { + logs.Warn("n loss", n, string(buf)) + } + } + logs.Warn("finish") +} + func TestNewConn(t *testing.T) { buf := common.GetBufPoolCopy() logs.Warn(len(buf), cap(buf)) @@ -146,3 +243,205 @@ func TestNewConn(t *testing.T) { logs.Warn(copy(buf[:3], b), len(buf), cap(buf)) logs.Warn(len(buf), buf[0]) } + +func TestDQueue(t *testing.T) { + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + d := new(bufDequeue) + d.vals = make([]unsafe.Pointer, 8) + go func() { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + logs.Warn(i) + logs.Warn(d.popTail()) + } + }() + go func() { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + data := "test" + go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data))) + } + }() + time.Sleep(time.Second * 3) +} + +func TestChain(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + time.Sleep(time.Second * 5) + d := new(bufChain) + d.new(256) + go func() { + time.Sleep(time.Second) + for i := 0; i < 30000; i++ { + unsa, ok := d.popTail() + str := (*string)(unsa) + if ok { + fmt.Println(i, str, *str, ok) + //logs.Warn(i, str, *str, ok) + } else { + fmt.Println("nil", i, ok) + //logs.Warn("nil", i, ok) + } + } + }() + go func() { + time.Sleep(time.Second) + for i := 0; i < 3000; i++ { + go func(i int) { + for n := 0; n < 10; n++ { + data := "test " + strconv.Itoa(i) + strconv.Itoa(n) + fmt.Println(data, unsafe.Pointer(&data)) + //logs.Warn(data, unsafe.Pointer(&data)) + d.pushHead(unsafe.Pointer(&data)) + } + }(i) + } + }() + time.Sleep(time.Second * 100000) +} + +func TestFIFO(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + time.Sleep(time.Second * 5) + d := new(ReceiveWindowQueue) + d.New() + go func() { + time.Sleep(time.Second) + for i := 0; i < 1001; i++ { + data, err := d.Pop() + if err == nil { + //fmt.Println(i, string(data.buf), err) + logs.Warn(i, string(data.Buf), err) + common.ListElementPool.Put(data) + } else { + //fmt.Println("err", err) + logs.Warn("err", err) + } + //logs.Warn(d.Len()) + } + logs.Warn("pop finish") + }() + go func() { + time.Sleep(time.Second * 10) + for i := 0; i < 1000; i++ { + by := []byte("test " + strconv.Itoa(i) + " ") // + data, _ := NewListElement(by, uint16(len(by)), true) + //fmt.Println(string((*data).buf), data) + //logs.Warn(string((*data).buf), data) + d.Push(data) + } + }() + time.Sleep(time.Second * 100000) +} + +func TestPriority(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + time.Sleep(time.Second * 5) + d := new(PriorityQueue) + d.New() + go func() { + time.Sleep(time.Second) + for i := 0; i < 360050; i++ { + data := d.Pop() + //fmt.Println(i, string(data.buf), err) + logs.Warn(i, string(data.Content), data) + } + logs.Warn("pop finish") + }() + go func() { + time.Sleep(time.Second * 10) + for i := 0; i < 30000; i++ { + go func(i int) { + for n := 0; n < 10; n++ { + data := new(common.MuxPackager) + by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) + _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by) + //fmt.Println(string((*data).buf), data) + logs.Warn(string((*data).Content), data) + d.Push(data) + } + }(i) + go func(i int) { + data := new(common.MuxPackager) + _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil) + //fmt.Println(string((*data).buf), data) + logs.Warn(data) + d.Push(data) + }(i) + go func(i int) { + data := new(common.MuxPackager) + _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil) + //fmt.Println(string((*data).buf), data) + logs.Warn(data) + d.Push(data) + }(i) + } + }() + time.Sleep(time.Second * 100000) +} + +//func TestReceive(t *testing.T) { +// go func() { +// log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) +// }() +// logs.EnableFuncCallDepth(true) +// logs.SetLogFuncCallDepth(3) +// time.Sleep(time.Second * 5) +// mux := new(Mux) +// mux.bw.readBandwidth = float64(1*1024*1024) +// mux.latency = float64(1/1000) +// wind := new(ReceiveWindow) +// wind.New(mux) +// wind. +// go func() { +// time.Sleep(time.Second) +// for i := 0; i < 36000; i++ { +// data := d.Pop() +// //fmt.Println(i, string(data.buf), err) +// logs.Warn(i, string(data.Content), data) +// } +// }() +// go func() { +// time.Sleep(time.Second*10) +// for i := 0; i < 3000; i++ { +// go func(i int) { +// for n := 0; n < 10; n++{ +// data := new(common.MuxPackager) +// by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) +// _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by) +// //fmt.Println(string((*data).buf), data) +// logs.Warn(string((*data).Content), data) +// d.Push(data) +// } +// }(i) +// go func(i int) { +// data := new(common.MuxPackager) +// _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil) +// //fmt.Println(string((*data).buf), data) +// logs.Warn(data) +// d.Push(data) +// }(i) +// go func(i int) { +// data := new(common.MuxPackager) +// _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil) +// //fmt.Println(string((*data).buf), data) +// logs.Warn(data) +// d.Push(data) +// }(i) +// } +// }() +// time.Sleep(time.Second * 100000) +//} diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 081b2c9..6f14faa 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -1,95 +1,583 @@ package mux import ( - "container/list" + "errors" "github.com/cnlh/nps/lib/common" + "io" + "math" + "runtime" "sync" + "sync/atomic" + "time" + "unsafe" ) -type Queue struct { - list *list.List - readOp chan struct{} - cleanOp chan struct{} - popWait bool - mutex sync.Mutex +type PriorityQueue struct { + highestChain *bufChain + middleChain *bufChain + lowestChain *bufChain + starving uint8 + stop bool + cond *sync.Cond } -func (Self *Queue) New() { - Self.list = list.New() - Self.readOp = make(chan struct{}) - Self.cleanOp = make(chan struct{}, 2) +func (Self *PriorityQueue) New() { + Self.highestChain = new(bufChain) + Self.highestChain.new(4) + Self.middleChain = new(bufChain) + Self.middleChain.new(32) + Self.lowestChain = new(bufChain) + Self.lowestChain.new(256) + locker := new(sync.Mutex) + Self.cond = sync.NewCond(locker) } -func (Self *Queue) Push(packager *common.MuxPackager) { - Self.mutex.Lock() - if Self.popWait { - defer Self.allowPop() - } - if packager.Flag == common.MUX_CONN_CLOSE { - Self.insert(packager) // the close package may need priority, - // prevent wait too long to close - } else { - Self.list.PushBack(packager) - } - Self.mutex.Unlock() +func (Self *PriorityQueue) Push(packager *common.MuxPackager) { + //logs.Warn("push start") + Self.push(packager) + Self.cond.Broadcast() + //logs.Warn("push finish") return } -func (Self *Queue) allowPop() (closed bool) { - Self.mutex.Lock() - Self.popWait = false - Self.mutex.Unlock() +func (Self *PriorityQueue) push(packager *common.MuxPackager) { + switch packager.Flag { + case common.MUX_PING_FLAG, common.MUX_PING_RETURN: + Self.highestChain.pushHead(unsafe.Pointer(packager)) + // the ping package need highest priority + // prevent ping calculation error + case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail: + // the new conn package need some priority too + Self.middleChain.pushHead(unsafe.Pointer(packager)) + default: + Self.lowestChain.pushHead(unsafe.Pointer(packager)) + } +} + +const maxStarving uint8 = 8 + +func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { + var iter bool + for { + packager = Self.TryPop() + if packager != nil { + return + } + if Self.stop { + return + } + if iter { + break + // trying to pop twice + } + iter = true + runtime.Gosched() + } + Self.cond.L.Lock() + defer Self.cond.L.Unlock() + for packager = Self.TryPop(); packager == nil; { + if Self.stop { + return + } + //logs.Warn("queue into wait") + Self.cond.Wait() + // wait for it with no more iter + packager = Self.TryPop() + //logs.Warn("queue wait finish", packager) + } + return +} + +func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) { + ptr, ok := Self.highestChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + return + } + if Self.starving < maxStarving { + // not pop too much, lowestChain will wait too long + ptr, ok = Self.middleChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + Self.starving++ + return + } + } + ptr, ok = Self.lowestChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + if Self.starving > 0 { + Self.starving = uint8(Self.starving / 2) + } + return + } + if Self.starving > 0 { + ptr, ok = Self.middleChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + Self.starving++ + return + } + } + return +} + +func (Self *PriorityQueue) Stop() { + Self.stop = true + Self.cond.Broadcast() +} + +type ConnQueue struct { + chain *bufChain + starving uint8 + stop bool + cond *sync.Cond +} + +func (Self *ConnQueue) New() { + Self.chain = new(bufChain) + Self.chain.new(32) + locker := new(sync.Mutex) + Self.cond = sync.NewCond(locker) +} + +func (Self *ConnQueue) Push(connection *conn) { + Self.chain.pushHead(unsafe.Pointer(connection)) + Self.cond.Broadcast() + return +} + +func (Self *ConnQueue) Pop() (connection *conn) { + var iter bool + for { + connection = Self.TryPop() + if connection != nil { + return + } + if Self.stop { + return + } + if iter { + break + // trying to pop twice + } + iter = true + runtime.Gosched() + } + Self.cond.L.Lock() + defer Self.cond.L.Unlock() + for connection = Self.TryPop(); connection == nil; { + if Self.stop { + return + } + //logs.Warn("queue into wait") + Self.cond.Wait() + // wait for it with no more iter + connection = Self.TryPop() + //logs.Warn("queue wait finish", packager) + } + return +} + +func (Self *ConnQueue) TryPop() (connection *conn) { + ptr, ok := Self.chain.popTail() + if ok { + connection = (*conn)(ptr) + return + } + return +} + +func (Self *ConnQueue) Stop() { + Self.stop = true + Self.cond.Broadcast() +} + +func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) { + if uint16(len(buf)) != l { + err = errors.New("ListElement: buf length not match") + return + } + //if l == 0 { + // logs.Warn("push zero") + //} + element = common.ListElementPool.Get() + element.Buf = buf + element.L = l + element.Part = part + return +} + +type ReceiveWindowQueue struct { + chain *bufChain + stopOp chan struct{} + readOp chan struct{} + lengthWait uint64 // really strange ???? need put here + // https://golang.org/pkg/sync/atomic/#pkg-note-BUG + // On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. + // On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility + // to arrange for 64-bit alignment of 64-bit words accessed atomically. + // The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned. + timeout time.Time +} + +func (Self *ReceiveWindowQueue) New() { + Self.readOp = make(chan struct{}) + Self.chain = new(bufChain) + Self.chain.new(64) + Self.stopOp = make(chan struct{}, 2) +} + +func (Self *ReceiveWindowQueue) Push(element *common.ListElement) { + var length, wait uint32 + for { + ptrs := atomic.LoadUint64(&Self.lengthWait) + length, wait = Self.chain.head.unpack(ptrs) + length += uint32(element.L) + if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) { + break + } + // another goroutine change the length or into wait, make sure + } + //logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf)) + Self.chain.pushHead(unsafe.Pointer(element)) + //logs.Warn("window push", Self.Len()) + if wait == 1 { + Self.allowPop() + } + return +} + +func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) { + var length uint32 +startPop: + ptrs := atomic.LoadUint64(&Self.lengthWait) + length, _ = Self.chain.head.unpack(ptrs) + if length == 0 { + if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) { + goto startPop // another goroutine is pushing + } + err = Self.waitPush() + // there is no more data in queue, wait for it + if err != nil { + return + } + goto startPop // wait finish, trying to get the new status + } + // length is not zero, so try to pop + for { + element = Self.TryPop() + if element != nil { + return + } + runtime.Gosched() // another goroutine is still pushing + } +} + +func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) { + ptr, ok := Self.chain.popTail() + if ok { + //logs.Warn("window pop before", Self.Len()) + element = (*common.ListElement)(ptr) + atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *bufDequeue) pack(head, tail uint32) uint64 { + const mask = 1< 0 { + runtime.Gosched() + } + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<= 3 && atomic.LoadUint32(&d.starving) > 0 { + atomic.StoreUint32(&d.starving, 0) + } + break + } + starve++ + if starve >= 3 { + atomic.StoreUint32(&d.starving, 1) + } + } + // The head slot is free, so we own it. + *slot = val + return true +} + +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *bufDequeue) popTail() (unsafe.Pointer, bool) { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + slot := &d.vals[tail&uint32(len(d.vals)-1)] + var val unsafe.Pointer + for { + val = atomic.LoadPointer(slot) + if val != nil { + // We now own slot. + break + } + // Another goroutine is still pushing data on the tail. + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. + // + // We write to val first and then publish that we're done with + atomic.StorePointer(slot, nil) + // At this point pushHead owns the slot. + if tail < math.MaxUint32 { + atomic.AddUint64(&d.headTail, 1) + } else { + atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1)) + } + return val, true +} + +// bufChain is a dynamically-sized version of bufDequeue. +// +// This is implemented as a doubly-linked list queue of poolDequeues +// where each dequeue is double the size of the previous one. Once a +// dequeue fills up, this allocates a new one and only ever pushes to +// the latest dequeue. Pops happen from the other end of the list and +// once a dequeue is exhausted, it gets removed from the list. +type bufChain struct { + // head is the bufDequeue to push to. This is only accessed + // by the producer, so doesn't need to be synchronized. + head *bufChainElt + + // tail is the bufDequeue to popTail from. This is accessed + // by consumers, so reads and writes must be atomic. + tail *bufChainElt + newChain uint32 +} + +type bufChainElt struct { + bufDequeue + + // next and prev link to the adjacent poolChainElts in this + // bufChain. + // + // next is written atomically by the producer and read + // atomically by the consumer. It only transitions from nil to + // non-nil. + // + // prev is written atomically by the consumer and read + // atomically by the producer. It only transitions from + // non-nil to nil. + next, prev *bufChainElt +} + +func storePoolChainElt(pp **bufChainElt, v *bufChainElt) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) +} + +func loadPoolChainElt(pp **bufChainElt) *bufChainElt { + return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) +} + +func (c *bufChain) new(initSize int) { + // Initialize the chain. + // initSize must be a power of 2 + d := new(bufChainElt) + d.vals = make([]unsafe.Pointer, initSize) + storePoolChainElt(&c.head, d) + storePoolChainElt(&c.tail, d) +} + +func (c *bufChain) pushHead(val unsafe.Pointer) { +startPush: + for { + if atomic.LoadUint32(&c.newChain) > 0 { + runtime.Gosched() + } else { + break + } + } + + d := loadPoolChainElt(&c.head) + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) { + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &bufChainElt{prev: d} + d2.vals = make([]unsafe.Pointer, newSize) + d2.pushHead(val) + storePoolChainElt(&c.head, d2) + storePoolChainElt(&d.next, d2) + atomic.StoreUint32(&c.newChain, 0) + return + } + goto startPush +} + +func (c *bufChain) popTail() (unsafe.Pointer, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. In general, d may be + // transiently empty, but if next is non-nil before + // the TryPop and the TryPop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next TryPop doesn't have to look at the empty + // dequeue again. + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } } diff --git a/lib/mux/web.go b/lib/mux/web.go new file mode 100644 index 0000000..36b2017 --- /dev/null +++ b/lib/mux/web.go @@ -0,0 +1,154 @@ +package mux + +import ( + "fmt" + "github.com/astaxie/beego/logs" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +type connLog struct { + startTime time.Time + isClose bool + logs []string +} + +var logms map[int]*connLog +var logmc map[int]*connLog + +var copyMaps map[int]*connLog +var copyMapc map[int]*connLog +var stashTimeNow time.Time +var mutex sync.Mutex + +func deepCopyMaps() { + copyMaps = make(map[int]*connLog) + for k, v := range logms { + copyMaps[k] = &connLog{ + startTime: v.startTime, + isClose: v.isClose, + logs: v.logs, + } + } +} + +func deepCopyMapc() { + copyMapc = make(map[int]*connLog) + for k, v := range logmc { + copyMapc[k] = &connLog{ + startTime: v.startTime, + isClose: v.isClose, + logs: v.logs, + } + } +} + +func init() { + logms = make(map[int]*connLog) + logmc = make(map[int]*connLog) +} + +type IntSlice []int + +func (s IntSlice) Len() int { return len(s) } + +func (s IntSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s IntSlice) Less(i, j int) bool { return s[i] < s[j] } + +func NewLogServer() { + http.HandleFunc("/", index) + http.HandleFunc("/detail", detail) + http.HandleFunc("/stash", stash) + fmt.Println(http.ListenAndServe(":8899", nil)) +} + +func stash(w http.ResponseWriter, r *http.Request) { + stashTimeNow = time.Now() + deepCopyMaps() + deepCopyMapc() + w.Write([]byte("ok")) +} + +func getM(label string, id int) (cL *connLog) { + label = strings.TrimSpace(label) + mutex.Lock() + defer mutex.Unlock() + if label == "nps" { + cL = logms[id] + } + if label == "npc" { + cL = logmc[id] + } + return +} + +func setM(label string, id int, cL *connLog) { + label = strings.TrimSpace(label) + mutex.Lock() + defer mutex.Unlock() + if label == "nps" { + logms[id] = cL + } + if label == "npc" { + logmc[id] = cL + } +} + +func index(w http.ResponseWriter, r *http.Request) { + var keys []int + for k := range copyMaps { + keys = append(keys, k) + } + sort.Sort(IntSlice(keys)) + var s string + s += "

nps

" + for _, v := range keys { + connL := copyMaps[v] + s += "" + strconv.Itoa(v) + "----------" + s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------" + s += strconv.FormatBool(connL.isClose) + s += "
" + } + + keys = keys[:0] + s += "

npc

" + for k := range copyMapc { + keys = append(keys, k) + } + sort.Sort(IntSlice(keys)) + + for _, v := range keys { + connL := copyMapc[v] + s += "" + strconv.Itoa(v) + "----------" + s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------" + s += strconv.FormatBool(connL.isClose) + s += "
" + } + w.Write([]byte(s)) +} + +func detail(w http.ResponseWriter, r *http.Request) { + id := r.FormValue("id") + label := r.FormValue("label") + logs.Warn(label) + i, _ := strconv.Atoi(id) + var v *connLog + if label == "nps" { + v, _ = copyMaps[i] + } + if label == "npc" { + v, _ = copyMapc[i] + } + var s string + if v != nil { + for i, vv := range v.logs { + s += "

" + strconv.Itoa(i+1) + ":" + vv + "

" + } + } + w.Write([]byte(s)) +} diff --git a/lib/mux/web_test.go b/lib/mux/web_test.go new file mode 100644 index 0000000..91a0430 --- /dev/null +++ b/lib/mux/web_test.go @@ -0,0 +1,7 @@ +package mux + +import "testing" + +func TestWeb(t *testing.T) { + NewLogServer() +} diff --git a/lib/version/version.go b/lib/version/version.go index 902b30b..1f14ef1 100644 --- a/lib/version/version.go +++ b/lib/version/version.go @@ -1,8 +1,8 @@ package version -const VERSION = "0.23.2" +const VERSION = "0.24.0" // Compulsory minimum version, Minimum downward compatibility to this version func GetVersion() string { - return "0.21.0" + return "0.24.0" } diff --git a/server/proxy/socks5.go b/server/proxy/socks5.go index d555e9d..d79be72 100755 --- a/server/proxy/socks5.go +++ b/server/proxy/socks5.go @@ -199,8 +199,7 @@ func (s *Sock5ModeServer) handleConn(c net.Conn) { c.Close() return } - - if s.task.Client.Cnf.U != "" && s.task.Client.Cnf.P != "" { + if (s.task.Client.Cnf.U != "" && s.task.Client.Cnf.P != "") || (s.task.MultiAccount != nil && len(s.task.MultiAccount.AccountMap) > 0) { buf[1] = UserPassAuth c.Write(buf) if err := s.Auth(c); err != nil { @@ -237,7 +236,22 @@ func (s *Sock5ModeServer) Auth(c net.Conn) error { if _, err := io.ReadAtLeast(c, pass, passLen); err != nil { return err } - if string(user) == s.task.Client.Cnf.U && string(pass) == s.task.Client.Cnf.P { + + var U, P string + if s.task.MultiAccount != nil { + // enable multi user auth + U = string(user) + var ok bool + P, ok = s.task.MultiAccount.AccountMap[U] + if !ok { + return errors.New("验证不通过") + } + } else { + U = s.task.Client.Cnf.U + P = s.task.Client.Cnf.P + } + + if string(user) == U && string(pass) == P { if _, err := c.Write([]byte{userAuthVersion, authSuccess}); err != nil { return err } diff --git a/web/controllers/login.go b/web/controllers/login.go index c31e9a1..64873a4 100755 --- a/web/controllers/login.go +++ b/web/controllers/login.go @@ -22,7 +22,7 @@ func (self *LoginController) Verify() { if self.GetString("password") == beego.AppConfig.String("web_password") && self.GetString("username") == beego.AppConfig.String("web_username") { self.SetSession("isAdmin", true) auth = true - server.Bridge.Register.Store(common.GetIpByAddr(self.Ctx.Request.RemoteAddr), time.Now().Add(time.Hour*time.Duration(2))) + server.Bridge.Register.Store(common.GetIpByAddr(self.Ctx.Input.IP()), time.Now().Add(time.Hour*time.Duration(2))) } b, err := beego.AppConfig.Bool("allow_user_login") if err == nil && b && !auth {