Merge branch 'module' of https://github.com/cnlh/nps into module

This commit is contained in:
unknown 2019-12-21 17:22:54 +08:00
commit 621894ec58
34 changed files with 2415 additions and 637 deletions

38
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@ -0,0 +1,38 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: bug
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Opening '...'
2. Click on '....'
3. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots or logs**
Add screenshots or logs to help explain your problem.
**Server (please complete the following information):**
- OS: [e.g. Centos, Windows]
- ARCH: [e.g. Amd64, Arm]
- Tunnel [e.g. TCP, HTTP]
- Version [e.g. 0.24.0]
**Client (please complete the following information):**
- OS: [e.g. Centos, Windows]
- ARCH: [e.g. Amd64, Arm]
- Tunnel [e.g. TCP, HTTP]
- Version [e.g. 0.24.0]
**Additional context**
Add any other context about the problem here.

View File

@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: enhancement
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

10
Dockerfile.npc Executable file
View File

@ -0,0 +1,10 @@
FROM golang as builder
WORKDIR /go/src/github.com/cnlh/nps
COPY . .
RUN go get -d -v ./...
RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/npc/npc.go
FROM scratch
COPY --from=builder /go/src/github.com/cnlh/nps/npc /
VOLUME /conf
ENTRYPOINT ["/npc"]

11
Dockerfile.nps Executable file
View File

@ -0,0 +1,11 @@
FROM golang as builder
WORKDIR /go/src/github.com/cnlh/nps
COPY . .
RUN go get -d -v ./...
RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/nps/nps.go
FROM scratch
COPY --from=builder /go/src/github.com/cnlh/nps/nps /
COPY --from=builder /go/src/github.com/cnlh/nps/web /web
VOLUME /conf
CMD ["/nps"]

View File

@ -26,6 +26,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
* [安装](#安装) * [安装](#安装)
* [编译安装](#源码安装) * [编译安装](#源码安装)
* [release安装](#release安装) * [release安装](#release安装)
* [docker安装](#docker安装)
* [使用示例以web主控模式为主](#使用示例) * [使用示例以web主控模式为主](#使用示例)
* [统一准备工作](#统一准备工作(必做)) * [统一准备工作](#统一准备工作(必做))
* [http|https域名解析](#域名解析) * [http|https域名解析](#域名解析)
@ -111,6 +112,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
* [获取用户真实ip](#获取用户真实ip) * [获取用户真实ip](#获取用户真实ip)
* [客户端地址显示](#客户端地址显示) * [客户端地址显示](#客户端地址显示)
* [客户端与服务端版本对比](#客户端与服务端版本对比) * [客户端与服务端版本对比](#客户端与服务端版本对比)
* [Linux系统限制](#Linux系统限制)
* [webAPI](#webAPI) * [webAPI](#webAPI)
* [贡献](#贡献) * [贡献](#贡献)
* [支持nps发展](#捐赠) * [支持nps发展](#捐赠)
@ -120,7 +122,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
## 安装 ## 安装
### releases安装 ### release安装
> [releases](https://github.com/cnlh/nps/releases) > [releases](https://github.com/cnlh/nps/releases)
下载对应的系统版本即可,服务端和客户端是单独的 下载对应的系统版本即可,服务端和客户端是单独的
@ -133,6 +135,10 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
> go build cmd/npc/npc.go > go build cmd/npc/npc.go
### docker安装
> [server](https://hub.docker.com/r/ffdfgdfg/nps)
> [client](https://hub.docker.com/r/ffdfgdfg/npc)
## 使用示例 ## 使用示例
### 统一准备工作(必做) ### 统一准备工作(必做)
@ -144,6 +150,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
```shell ```shell
./npc -server=1.1.1.1:8284 -vkey=客户端的密钥 ./npc -server=1.1.1.1:8284 -vkey=客户端的密钥
``` ```
**注意:运行服务端后,请确保能从客户端设备上正常访问配置文件中所配置的`bridge_port`端口telnetnetcat这类的来检查**
### 域名解析 ### 域名解析
@ -197,6 +204,9 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务
- 在刚才创建的客户端隧道管理中添加一条socks5代理填写监听的端口8003保存。 - 在刚才创建的客户端隧道管理中添加一条socks5代理填写监听的端口8003保存。
- 在外网环境的本机配置socks5代理(例如使用proxifier进行全局代理)ip为公网服务器ip1.1.1.1),端口为填写的监听端口(8003),即可畅享内网了 - 在外网环境的本机配置socks5代理(例如使用proxifier进行全局代理)ip为公网服务器ip1.1.1.1),端口为填写的监听端口(8003),即可畅享内网了
**注意**
经过socks5代理当收到socks5数据包时socket已经是accept状态。表现是扫描端口全open建立连接后短时间关闭。若想同内网表现一致建议远程连接一台设备。
### http正向代理 ### http正向代理
**适用范围:** 在外网环境下使用http正向代理访问内网站点 **适用范围:** 在外网环境下使用http正向代理访问内网站点
@ -375,7 +385,13 @@ server {
``` ```
(./nps|nps.exe) install (./nps|nps.exe) install
``` ```
安装成功后对于linuxdarwin将会把配置文件和静态文件放置于/etc/nps/并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps安装成功后可在任何位置执行 安装成功后对于linuxdarwin将会把配置文件和静态文件放置于/etc/nps/并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps安装成功后可在任何位置执行同时也会添加systemd配置。
```
sudo systemctl enable|disable|start|stop|restart|status nps
```
systemd带有开机自启自动重启配置当进程结束后15秒会启动日志输出至/var/log/nps/nps.log。
建议采用此方式启动能够捕获panic信息便于排查问题。
``` ```
nps test|start|stop|restart|status nps test|start|stop|restart|status
@ -432,6 +448,27 @@ server_ip=xxx
``` ```
./npc -config=npc配置文件路径 ./npc -config=npc配置文件路径
``` ```
可自行添加systemd service例如`npc.service`
```
[Unit]
Description=npc - convenient proxy server client
Documentation=https://github.com/cnlh/nps/
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target
[Service]
Type=simple
KillMode=process
Restart=always
RestartSec=15s
StandardOutput=append:/var/log/nps/npc.log
ExecStartPre=/bin/echo 'Starting npc'
ExecStopPost=/bin/echo 'Stopping npc'
ExecStart=/absolutely path to/npc -server=ip:port -vkey=web界面中显示的密钥
[Install]
WantedBy=multi-user.target
```
#### 配置文件说明 #### 配置文件说明
[示例配置文件](https://github.com/cnlh/nps/tree/master/conf/npc.conf) [示例配置文件](https://github.com/cnlh/nps/tree/master/conf/npc.conf)
##### 全局配置 ##### 全局配置
@ -538,11 +575,13 @@ vkey=123
[socks5] [socks5]
mode=socks5 mode=socks5
server_port=9004 server_port=9004
multi_account=multi_account.conf
``` ```
项 | 含义 项 | 含义
---|--- ---|---
mode | socks5 mode | socks5
server_port | 在服务端的代理端口 server_port | 在服务端的代理端口
multi_account | socks5多账号配置文件可选),配置后使用basic_username和basic_password无法通过认证
##### 私密代理模式 ##### 私密代理模式
```ini ```ini
@ -792,6 +831,19 @@ nps支持对客户端的隧道数量进行限制该功能默认是关闭的
nps主要通信默认基于多路复用无需开启。 nps主要通信默认基于多路复用无需开启。
多路复用基于TCP滑动窗口原理设计动态计算延迟以及带宽来算出应该往网络管道中打入的流量。
由于主要通信大多采用TCP协议并无法探测其实时丢包情况对于产生丢包重传的情况采用较大的宽容度
5分钟的等待时间超时将会关闭当前隧道连接并重新建立这将会抛弃当前所有的连接。
在Linux上可以通过调节内核参数来适应不同应用场景。
对于需求大带宽又有一定的丢包的场景,可以保持默认参数不变,尽可能少抛弃连接
高并发下可根据[Linux系统限制](#Linux系统限制) 调整
对于延迟敏感而又有一定丢包的场景可以适当调整TCP重传次数
`tcp_syn_retries`, `tcp_retries1`, `tcp_retries2`
高并发同上
nps会在系统主动关闭连接的时候拿到报错进而重新建立隧道连接
### 环境变量渲染 ### 环境变量渲染
npc支持环境变量渲染以适应在某些特殊场景下的要求。 npc支持环境变量渲染以适应在某些特殊场景下的要求。
@ -900,6 +952,11 @@ LevelInformational->6 LevelDebug->7
### 客户端与服务端版本对比 ### 客户端与服务端版本对比
为了程序正常运行,客户端与服务端的核心版本必须一致,否则将导致客户端无法成功连接致服务端。 为了程序正常运行,客户端与服务端的核心版本必须一致,否则将导致客户端无法成功连接致服务端。
### Linux系统限制
默认情况下linux对连接数量有限制对于性能好的机器完全可以调整内核参数以处理更多的连接。
`tcp_max_syn_backlog` `somaxconn`
酌情调整参数,增强网络性能
## webAPI ## webAPI
### webAPI验证说明 ### webAPI验证说明

View File

@ -312,7 +312,7 @@ func (s *Bridge) GetConnByClientId(clientId int) (target net.Conn, err error) {
func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (target net.Conn, err error) { func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (target net.Conn, err error) {
//if the proxy type is local //if the proxy type is local
if link.LocalProxy { if link.LocalProxy {
target, err = net.Dial(link.ConnType, link.Host) target, err = net.Dial("tcp", link.Host)
return return
} }
if v, ok := s.Client.Load(clientId); ok { if v, ok := s.Client.Load(clientId); ok {
@ -504,6 +504,7 @@ loop:
tl.Password = t.Password tl.Password = t.Password
tl.LocalPath = t.LocalPath tl.LocalPath = t.LocalPath
tl.StripPre = t.StripPre tl.StripPre = t.StripPre
tl.MultiAccount = t.MultiAccount
if !client.HasTunnel(tl) { if !client.HasTunnel(tl) {
if err := file.GetDb().NewTask(tl); err != nil { if err := file.GetDb().NewTask(tl); err != nil {
logs.Notice("Add task error ", err.Error()) logs.Notice("Add task error ", err.Error())

View File

@ -49,6 +49,11 @@ retry:
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
goto retry goto retry
} }
if c == nil {
logs.Error("Error data from server, and will be reconnected in five seconds")
time.Sleep(time.Second * 5)
goto retry
}
logs.Info("Successful connection with server %s", s.svrAddr) logs.Info("Successful connection with server %s", s.svrAddr)
//monitor the connection //monitor the connection
go s.ping() go s.ping()

View File

@ -223,8 +223,13 @@ func NewConn(tp string, vkey string, server string, connType string, proxyUrl st
if _, err := c.Write([]byte(crypt.Md5(version.GetVersion()))); err != nil { if _, err := c.Write([]byte(crypt.Md5(version.GetVersion()))); err != nil {
return nil, err return nil, err
} }
if b, err := c.GetShortContent(32); err != nil || crypt.Md5(version.GetVersion()) != string(b) { b, err := c.GetShortContent(32)
logs.Error("The client does not match the server version. The current version of the client is", version.GetVersion()) if err != nil {
logs.Error(err)
return nil, err
}
if crypt.Md5(version.GetVersion()) != string(b) {
logs.Error("The client does not match the server version. The current core version of the client is", version.GetVersion())
return nil, err return nil, err
} }
if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil { if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil {

View File

@ -71,7 +71,11 @@ func check(t *file.Health) {
var rs *http.Response var rs *http.Response
for _, v := range arr { for _, v := range arr {
if t.HealthCheckType == "tcp" { if t.HealthCheckType == "tcp" {
_, err = net.DialTimeout("tcp", v, time.Duration(t.HealthCheckTimeout)*time.Second) var c net.Conn
c, err = net.DialTimeout("tcp", v, time.Duration(t.HealthCheckTimeout)*time.Second)
if err == nil {
c.Close()
}
} else { } else {
client := &http.Client{} client := &http.Client{}
client.Timeout = time.Duration(t.HealthCheckTimeout) * time.Second client.Timeout = time.Duration(t.HealthCheckTimeout) * time.Second

View File

@ -61,7 +61,7 @@ func main() {
logs.Error("Getting bridge_port error", err) logs.Error("Getting bridge_port error", err)
os.Exit(0) os.Exit(0)
} }
logs.Info("the version of server is %s ,allow client version to be %s", version.VERSION, version.GetVersion()) logs.Info("the version of server is %s ,allow client core version to be %s", version.VERSION, version.GetVersion())
connection.InitConnectionService() connection.InitConnectionService()
crypt.InitTls(filepath.Join(common.GetRunPath(), "conf", "server.pem"), filepath.Join(common.GetRunPath(), "conf", "server.key")) crypt.InitTls(filepath.Join(common.GetRunPath(), "conf", "server.pem"), filepath.Join(common.GetRunPath(), "conf", "server.key"))
tool.InitAllowPort() tool.InitAllowPort()

2
conf/multi_account.conf Normal file
View File

@ -0,0 +1,2 @@
# key -> user | value -> pwd
npc=npc.pwd

View File

@ -40,6 +40,7 @@ server_port=10000
[socks5] [socks5]
mode=socks5 mode=socks5
server_port=19009 server_port=19009
multi_account=multi_account.conf
[file] [file]
mode=file mode=file

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/klauspost/cpuid v1.2.1 // indirect github.com/klauspost/cpuid v1.2.1 // indirect
github.com/klauspost/reedsolomon v1.9.2 // indirect github.com/klauspost/reedsolomon v1.9.2 // indirect
github.com/onsi/gomega v1.5.0 // indirect github.com/onsi/gomega v1.5.0 // indirect
github.com/panjf2000/ants/v2 v2.2.2
github.com/pkg/errors v0.8.0 github.com/pkg/errors v0.8.0
github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
github.com/shirou/gopsutil v2.18.12+incompatible github.com/shirou/gopsutil v2.18.12+incompatible

2
go.sum
View File

@ -44,6 +44,8 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/panjf2000/ants/v2 v2.2.2 h1:TWzusBjq/IflXhy+/S6u5wmMLCBdJnB9tPIx9Zmhvok=
github.com/panjf2000/ants/v2 v2.2.2/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@ -42,9 +42,13 @@ const (
MUX_NEW_CONN_OK MUX_NEW_CONN_OK
MUX_NEW_CONN_Fail MUX_NEW_CONN_Fail
MUX_NEW_MSG MUX_NEW_MSG
MUX_NEW_MSG_PART
MUX_MSG_SEND_OK MUX_MSG_SEND_OK
MUX_NEW_CONN MUX_NEW_CONN
MUX_CONN_CLOSE MUX_CONN_CLOSE
MUX_PING_RETURN MUX_PING_RETURN
MUX_PING int32 = -1 MUX_PING int32 = -1
MAXIMUM_SEGMENT_SIZE = PoolSizeWindow
MAXIMUM_WINDOW_SIZE = 1 << 25 // 1<<31-1 TCP slide window size is very large,
// we use 32M, reduce memory usage
) )

View File

@ -15,7 +15,7 @@ type NetPackager interface {
} }
type BasePackager struct { type BasePackager struct {
Length uint32 Length uint16
Content []byte Content []byte
} }
@ -65,8 +65,9 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) {
//Unpack 会导致传入的数字类型转化成float64 //Unpack 会导致传入的数字类型转化成float64
//主要原因是json unmarshal并未传入正确的数据类型 //主要原因是json unmarshal并未传入正确的数据类型
func (Self *BasePackager) UnPack(reader io.Reader) (err error) { func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) {
Self.clean() Self.clean()
n += 2 // uint16
err = binary.Read(reader, binary.LittleEndian, &Self.Length) err = binary.Read(reader, binary.LittleEndian, &Self.Length)
if err != nil { if err != nil {
return return
@ -80,6 +81,7 @@ func (Self *BasePackager) UnPack(reader io.Reader) (err error) {
// err = io.ErrUnexpectedEOF // err = io.ErrUnexpectedEOF
//} //}
err = binary.Read(reader, binary.LittleEndian, Self.Content) err = binary.Read(reader, binary.LittleEndian, Self.Content)
n += Self.Length
return return
} }
@ -101,7 +103,7 @@ func (Self *BasePackager) Unmarshal(content interface{}) (err error) {
} }
func (Self *BasePackager) setLength() { func (Self *BasePackager) setLength() {
Self.Length = uint32(len(Self.Content)) Self.Length = uint16(len(Self.Content))
return return
} }
@ -137,35 +139,45 @@ func (Self *ConnPackager) Pack(writer io.Writer) (err error) {
return return
} }
func (Self *ConnPackager) UnPack(reader io.Reader) (err error) { func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) {
err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) err = binary.Read(reader, binary.LittleEndian, &Self.ConnType)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return return
} }
err = Self.BasePackager.UnPack(reader) n, err = Self.BasePackager.UnPack(reader)
n += 2
return return
} }
type MuxPackager struct { type MuxPackager struct {
Flag uint8 Flag uint8
Id int32 Id int32
Window uint16 Window uint32
ReadLength uint32
BasePackager BasePackager
} }
func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) {
Self.Flag = flag Self.Flag = flag
Self.Id = id Self.Id = id
if flag == MUX_NEW_MSG { switch flag {
case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART:
Self.Content = WindowBuff.Get()
err = Self.BasePackager.NewPac(content...) err = Self.BasePackager.NewPac(content...)
} //logs.Warn(Self.Length, string(Self.Content))
if flag == MUX_MSG_SEND_OK { case MUX_MSG_SEND_OK:
// MUX_MSG_SEND_OK only allows one data // MUX_MSG_SEND_OK contains two data
switch content[0].(type) { switch content[0].(type) {
case int: case int:
Self.Window = uint16(content[0].(int)) Self.Window = uint32(content[0].(int))
case uint16: case uint32:
Self.Window = content[0].(uint16) Self.Window = content[0].(uint32)
}
switch content[1].(type) {
case int:
Self.ReadLength = uint32(content[1].(int))
case uint32:
Self.ReadLength = content[1].(uint32)
} }
} }
return return
@ -180,17 +192,21 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
if err != nil { if err != nil {
return return
} }
if Self.Flag == MUX_NEW_MSG { switch Self.Flag {
case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
err = Self.BasePackager.Pack(writer) err = Self.BasePackager.Pack(writer)
} WindowBuff.Put(Self.Content)
if Self.Flag == MUX_MSG_SEND_OK { case MUX_MSG_SEND_OK:
err = binary.Write(writer, binary.LittleEndian, Self.Window) err = binary.Write(writer, binary.LittleEndian, Self.Window)
if err != nil {
return
}
err = binary.Write(writer, binary.LittleEndian, Self.ReadLength)
} }
return return
} }
func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) {
Self.BasePackager.clean() // also clean the content
err = binary.Read(reader, binary.LittleEndian, &Self.Flag) err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
if err != nil { if err != nil {
return return
@ -199,11 +215,21 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
if err != nil { if err != nil {
return return
} }
if Self.Flag == MUX_NEW_MSG { switch Self.Flag {
err = Self.BasePackager.UnPack(reader) case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
} Self.Content = WindowBuff.Get() // need get a window buf from pool
if Self.Flag == MUX_MSG_SEND_OK { Self.BasePackager.clean() // also clean the content
n, err = Self.BasePackager.UnPack(reader)
//logs.Warn("unpack", Self.Length, string(Self.Content))
case MUX_MSG_SEND_OK:
err = binary.Read(reader, binary.LittleEndian, &Self.Window) err = binary.Read(reader, binary.LittleEndian, &Self.Window)
if err != nil {
return
} }
n += 4 // uint32
err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength)
n += 4 // uint32
}
n += 5 //uint8 int32
return return
} }

View File

@ -9,7 +9,8 @@ const PoolSize = 64 * 1024
const PoolSizeSmall = 100 const PoolSizeSmall = 100
const PoolSizeUdp = 1472 const PoolSizeUdp = 1472
const PoolSizeCopy = 32 << 10 const PoolSizeCopy = 32 << 10
const PoolSizeWindow = 1<<16 - 1 const PoolSizeBuffer = 4096
const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1
var BufPool = sync.Pool{ var BufPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
@ -92,22 +93,18 @@ type windowBufferPool struct {
func (Self *windowBufferPool) New() { func (Self *windowBufferPool) New() {
Self.pool = sync.Pool{ Self.pool = sync.Pool{
New: func() interface{} { New: func() interface{} {
return make([]byte, 0, PoolSizeWindow) return make([]byte, PoolSizeWindow, PoolSizeWindow)
}, },
} }
} }
func (Self *windowBufferPool) Get() (buf []byte) { func (Self *windowBufferPool) Get() (buf []byte) {
buf = Self.pool.Get().([]byte) buf = Self.pool.Get().([]byte)
return buf[:0] return buf[:PoolSizeWindow]
} }
func (Self *windowBufferPool) Put(x []byte) { func (Self *windowBufferPool) Put(x []byte) {
if cap(x) == PoolSizeWindow {
Self.pool.Put(x[:PoolSizeWindow]) // make buf to full Self.pool.Put(x[:PoolSizeWindow]) // make buf to full
} else {
x = nil
}
} }
type bufferPool struct { type bufferPool struct {
@ -117,7 +114,7 @@ type bufferPool struct {
func (Self *bufferPool) New() { func (Self *bufferPool) New() {
Self.pool = sync.Pool{ Self.pool = sync.Pool{
New: func() interface{} { New: func() interface{} {
return new(bytes.Buffer) return bytes.NewBuffer(make([]byte, 0, PoolSizeBuffer))
}, },
} }
} }
@ -145,28 +142,56 @@ func (Self *muxPackagerPool) New() {
} }
func (Self *muxPackagerPool) Get() *MuxPackager { func (Self *muxPackagerPool) Get() *MuxPackager {
pack := Self.pool.Get().(*MuxPackager) return Self.pool.Get().(*MuxPackager)
buf := CopyBuff.Get()
pack.Content = buf
return pack
} }
func (Self *muxPackagerPool) Put(pack *MuxPackager) { func (Self *muxPackagerPool) Put(pack *MuxPackager) {
CopyBuff.Put(pack.Content)
Self.pool.Put(pack) Self.pool.Put(pack)
} }
type ListElement struct {
Buf []byte
L uint16
Part bool
}
type listElementPool struct {
pool sync.Pool
}
func (Self *listElementPool) New() {
Self.pool = sync.Pool{
New: func() interface{} {
element := ListElement{}
return &element
},
}
}
func (Self *listElementPool) Get() *ListElement {
return Self.pool.Get().(*ListElement)
}
func (Self *listElementPool) Put(element *ListElement) {
element.L = 0
element.Buf = nil
element.Part = false
Self.pool.Put(element)
}
var once = sync.Once{} var once = sync.Once{}
var BuffPool = bufferPool{} var BuffPool = bufferPool{}
var CopyBuff = copyBufferPool{} var CopyBuff = copyBufferPool{}
var MuxPack = muxPackagerPool{} var MuxPack = muxPackagerPool{}
var WindowBuff = windowBufferPool{} var WindowBuff = windowBufferPool{}
var ListElementPool = listElementPool{}
func newPool() { func newPool() {
BuffPool.New() BuffPool.New()
CopyBuff.New() CopyBuff.New()
MuxPack.New() MuxPack.New()
WindowBuff.New() WindowBuff.New()
ListElementPool.New()
} }
func init() { func init() {

View File

@ -108,6 +108,9 @@ func ChangeHostAndHeader(r *http.Request, host string, header string, addr strin
} }
} }
addr = strings.Split(addr, ":")[0] addr = strings.Split(addr, ":")[0]
if prior, ok := r.Header["X-Forwarded-For"]; ok {
addr = strings.Join(prior, ", ") + ", " + addr
}
r.Header.Set("X-Forwarded-For", addr) r.Header.Set("X-Forwarded-For", addr)
r.Header.Set("X-Real-IP", addr) r.Header.Set("X-Real-IP", addr)
} }
@ -263,11 +266,14 @@ func GetPortByAddr(addr string) int {
return p return p
} }
func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { func CopyBuffer(dst io.Writer, src io.Reader, label ...string) (written int64, err error) {
buf := CopyBuff.Get() buf := CopyBuff.Get()
defer CopyBuff.Put(buf) defer CopyBuff.Put(buf)
for { for {
nr, er := src.Read(buf) nr, er := src.Read(buf)
//if len(pr)>0 && pr[0] && nr > 50 {
// logs.Warn(string(buf[:50]))
//}
if nr > 0 { if nr > 0 {
nw, ew := dst.Write(buf[0:nr]) nw, ew := dst.Write(buf[0:nr])
if nw > 0 { if nw > 0 {

View File

@ -239,12 +239,37 @@ func dealTunnel(s string) *file.Tunnel {
t.LocalPath = item[1] t.LocalPath = item[1]
case "strip_pre": case "strip_pre":
t.StripPre = item[1] t.StripPre = item[1]
case "multi_account":
t.MultiAccount = &file.MultiAccount{}
if b, err := common.ReadAllFromFile(item[1]); err != nil {
panic(err)
} else {
if content, err := common.ParseStr(string(b)); err != nil {
panic(err)
} else {
t.MultiAccount.AccountMap = dealMultiUser(content)
}
}
} }
} }
return t return t
} }
func dealMultiUser(s string) map[string]string {
multiUserMap := make(map[string]string)
for _, v := range splitStr(s) {
item := strings.Split(v, "=")
if len(item) == 0 {
continue
} else if len(item) == 1 {
item = append(item, "")
}
multiUserMap[strings.TrimSpace(item[0])] = item[1]
}
return multiUserMap
}
func delLocalService(s string) *LocalServer { func delLocalService(s string) *LocalServer {
l := new(LocalServer) l := new(LocalServer)
for _, v := range splitStr(s) { for _, v := range splitStr(s) {

View File

@ -6,13 +6,14 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/astaxie/beego/logs"
"github.com/cnlh/nps/lib/goroutine"
"io" "io"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/common"
@ -350,25 +351,29 @@ func SetUdpSession(sess *kcp.UDPSession) {
//conn1 mux conn //conn1 mux conn
func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) { func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) {
var in, out int64 //var in, out int64
var wg sync.WaitGroup //var wg sync.WaitGroup
connHandle := GetConn(conn1, crypt, snappy, rate, isServer) connHandle := GetConn(conn1, crypt, snappy, rate, isServer)
if rb != nil { if rb != nil {
connHandle.Write(rb) connHandle.Write(rb)
} }
go func(in *int64) { //go func(in *int64) {
wg.Add(1) // wg.Add(1)
*in, _ = common.CopyBuffer(connHandle, conn2) // *in, _ = common.CopyBuffer(connHandle, conn2)
connHandle.Close() // connHandle.Close()
conn2.Close() // conn2.Close()
wg.Done() // wg.Done()
}(&in) //}(&in)
out, _ = common.CopyBuffer(conn2, connHandle) //out, _ = common.CopyBuffer(conn2, connHandle)
connHandle.Close() //connHandle.Close()
conn2.Close() //conn2.Close()
wg.Wait() //wg.Wait()
if flow != nil { //if flow != nil {
flow.Add(in, out) // flow.Add(in, out)
//}
err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow))
if err != nil {
logs.Error(err)
} }
} }

View File

@ -43,9 +43,16 @@ func Accept(l net.Listener, f func(c net.Conn)) {
if strings.Contains(err.Error(), "use of closed network connection") { if strings.Contains(err.Error(), "use of closed network connection") {
break break
} }
if strings.Contains(err.Error(), "the mux has closed") {
break
}
logs.Warn(err) logs.Warn(err)
continue continue
} }
if c == nil {
logs.Warn("nil connection")
break
}
go f(c) go f(c)
} }
} }

View File

@ -140,6 +140,7 @@ type Tunnel struct {
LocalPath string LocalPath string
StripPre string StripPre string
Target *Target Target *Target
MultiAccount *MultiAccount
Health Health
sync.RWMutex sync.RWMutex
} }
@ -184,6 +185,10 @@ type Target struct {
sync.RWMutex sync.RWMutex
} }
type MultiAccount struct {
AccountMap map[string]string // multi account and pwd
}
func (s *Target) GetRandomTarget() (string, error) { func (s *Target) GetRandomTarget() (string, error) {
if s.TargetArr == nil { if s.TargetArr == nil {
s.TargetArr = strings.Split(s.TargetStr, "\n") s.TargetArr = strings.Split(s.TargetStr, "\n")

73
lib/goroutine/pool.go Normal file
View File

@ -0,0 +1,73 @@
package goroutine
import (
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/file"
"github.com/panjf2000/ants/v2"
"io"
"net"
"sync"
)
type connGroup struct {
src io.ReadWriteCloser
dst io.ReadWriteCloser
wg *sync.WaitGroup
n *int64
}
func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup {
return connGroup{
src: src,
dst: dst,
wg: wg,
n: n,
}
}
func copyConnGroup(group interface{}) {
cg, ok := group.(connGroup)
if !ok {
return
}
var err error
*cg.n, err = common.CopyBuffer(cg.dst, cg.src)
if err != nil {
cg.src.Close()
cg.dst.Close()
//logs.Warn("close npc by copy from nps", err, c.connId)
}
cg.wg.Done()
}
type Conns struct {
conn1 io.ReadWriteCloser // mux connection
conn2 net.Conn // outside connection
flow *file.Flow
}
func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns {
return Conns{
conn1: c1,
conn2: c2,
flow: flow,
}
}
func copyConns(group interface{}) {
conns := group.(Conns)
wg := new(sync.WaitGroup)
wg.Add(2)
var in, out int64
_ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in))
// outside to mux : incoming
_ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out))
// mux to outside : outgoing
wg.Wait()
if conns.flow != nil {
conns.flow.Add(in, out)
}
}
var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false))
var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false))

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
@ -13,6 +14,23 @@ import (
) )
func InstallNps() { func InstallNps() {
unit := `[Unit]
Description=nps - convenient proxy server
Documentation=https://github.com/cnlh/nps/
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target`
service := `[Service]
Type=simple
KillMode=process
Restart=always
RestartSec=15s
StandardOutput=append:/var/log/nps/nps.log
ExecStartPre=/bin/echo 'Starting nps'
ExecStopPost=/bin/echo 'Stopping nps'
ExecStart=`
install := `[Install]
WantedBy=multi-user.target`
path := common.GetInstallPath() path := common.GetInstallPath()
if common.FileExists(path) { if common.FileExists(path) {
log.Fatalf("the path %s has exist, does not support install", path) log.Fatalf("the path %s has exist, does not support install", path)
@ -35,21 +53,35 @@ func InstallNps() {
log.Fatalln(err) log.Fatalln(err)
} else { } else {
os.Chmod("/usr/local/bin/nps", 0755) os.Chmod("/usr/local/bin/nps", 0755)
service += "/usr/local/bin/nps"
log.Println("Executable files have been copied to", "/usr/local/bin/nps") log.Println("Executable files have been copied to", "/usr/local/bin/nps")
} }
} else { } else {
os.Chmod("/usr/bin/nps", 0755) os.Chmod("/usr/bin/nps", 0755)
service += "/usr/bin/nps"
log.Println("Executable files have been copied to", "/usr/bin/nps") log.Println("Executable files have been copied to", "/usr/bin/nps")
} }
systemd := unit + "\n\n" + service + "\n\n" + install
_ = os.Remove("/usr/lib/systemd/system/nps.service")
err := ioutil.WriteFile("/usr/lib/systemd/system/nps.service", []byte(systemd), 0644)
if err != nil {
log.Println("Write systemd service err ", err)
}
_ = os.Mkdir("/var/log/nps", 644)
} }
log.Println("install ok!") log.Println("install ok!")
log.Println("Static files and configuration files in the current directory will be useless") log.Println("Static files and configuration files in the current directory will be useless")
log.Println("The new configuration file is located in", path, "you can edit them") log.Println("The new configuration file is located in", path, "you can edit them")
if !common.IsWindows() { if !common.IsWindows() {
log.Println("You can start with nps test|start|stop|restart|status anywhere") log.Println(`You can start with:
sudo systemctl enable|disable|start|stop|restart|status nps
or:
nps test|start|stop|restart|status
anywhere!`)
} else { } else {
log.Println("You can copy executable files to any directory and start working with nps.exe test|start|stop|restart|status") log.Println(`You can copy executable files to any directory and start working with:
nps.exe test|start|stop|restart|status
now!`)
} }
} }
func MkidrDirAll(path string, v ...string) { func MkidrDirAll(path string, v ...string) {

View File

@ -3,8 +3,11 @@ package mux
import ( import (
"errors" "errors"
"io" "io"
"math"
"net" "net"
"runtime"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/common"
@ -15,34 +18,36 @@ type conn struct {
getStatusCh chan struct{} getStatusCh chan struct{}
connStatusOkCh chan struct{} connStatusOkCh chan struct{}
connStatusFailCh chan struct{} connStatusFailCh chan struct{}
readTimeOut time.Time
writeTimeOut time.Time
connId int32 connId int32
isClose bool isClose bool
closeFlag bool // close conn flag closeFlag bool // close conn flag
receiveWindow *window receiveWindow *ReceiveWindow
sendWindow *window sendWindow *SendWindow
readCh waitingCh
writeCh waitingCh
mux *Mux
once sync.Once once sync.Once
//label string
} }
func NewConn(connId int32, mux *Mux) *conn { func NewConn(connId int32, mux *Mux, label ...string) *conn {
c := &conn{ c := &conn{
getStatusCh: make(chan struct{}), getStatusCh: make(chan struct{}),
connStatusOkCh: make(chan struct{}), connStatusOkCh: make(chan struct{}),
connStatusFailCh: make(chan struct{}), connStatusFailCh: make(chan struct{}),
connId: connId, connId: connId,
receiveWindow: new(window), receiveWindow: new(ReceiveWindow),
sendWindow: new(window), sendWindow: new(SendWindow),
mux: mux,
once: sync.Once{}, once: sync.Once{},
} }
c.receiveWindow.NewReceive() //if len(label) > 0 {
c.sendWindow.NewSend() // c.label = label[0]
c.readCh.new() //}
c.writeCh.new() c.receiveWindow.New(mux)
c.sendWindow.New(mux)
//logm := &connLog{
// startTime: time.Now(),
// isClose: false,
// logs: []string{c.label + "new conn success"},
//}
//setM(label[0], int(connId), logm)
return c return c
} }
@ -50,39 +55,28 @@ func (s *conn) Read(buf []byte) (n int, err error) {
if s.isClose || buf == nil { if s.isClose || buf == nil {
return 0, errors.New("the conn has closed") return 0, errors.New("the conn has closed")
} }
if len(buf) == 0 {
return 0, nil
}
// waiting for takeout from receive window finish or timeout // waiting for takeout from receive window finish or timeout
go s.readWindow(buf, s.readCh.nCh, s.readCh.errCh) //now := time.Now()
if t := s.readTimeOut.Sub(time.Now()); t > 0 { n, err = s.receiveWindow.Read(buf, s.connId)
timer := time.NewTimer(t) //t := time.Now().Sub(now)
defer timer.Stop() //if t.Seconds() > 0.5 {
select { //logs.Warn("conn read long", n, t.Seconds())
case <-timer.C: //}
return 0, errors.New("read timeout") //var errstr string
case n = <-s.readCh.nCh: //if err == nil {
err = <-s.readCh.errCh // errstr = "err:nil"
} //} else {
} else { // errstr = err.Error()
n = <-s.readCh.nCh //}
err = <-s.readCh.errCh //d := getM(s.label, int(s.connId))
} //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100]))
//setM(s.label, int(s.connId), d)
return return
} }
func (s *conn) readWindow(buf []byte, nCh chan int, errCh chan error) {
n, err := s.receiveWindow.Read(buf)
if s.receiveWindow.WindowFull {
if s.receiveWindow.Size() > 0 {
// window.Read may be invoked before window.Write, and WindowFull flag change to true
// so make sure that receiveWindow is free some space
s.receiveWindow.WindowFull = false
s.mux.sendInfo(common.MUX_MSG_SEND_OK, s.connId, s.receiveWindow.Size())
// acknowledge other side, have empty some receive window space
}
}
nCh <- n
errCh <- err
}
func (s *conn) Write(buf []byte) (n int, err error) { func (s *conn) Write(buf []byte) (n int, err error) {
if s.isClose { if s.isClose {
return 0, errors.New("the conn has closed") return 0, errors.New("the conn has closed")
@ -91,45 +85,18 @@ func (s *conn) Write(buf []byte) (n int, err error) {
//s.Close() //s.Close()
return 0, errors.New("io: write on closed conn") return 0, errors.New("io: write on closed conn")
} }
s.sendWindow.SetSendBuf(buf) // set the buf to send window if len(buf) == 0 {
go s.write(s.writeCh.nCh, s.writeCh.errCh) return 0, nil
// waiting for send to other side or timeout
if t := s.writeTimeOut.Sub(time.Now()); t > 0 {
timer := time.NewTimer(t)
defer timer.Stop()
select {
case <-timer.C:
return 0, errors.New("write timeout")
case n = <-s.writeCh.nCh:
err = <-s.writeCh.errCh
}
} else {
n = <-s.writeCh.nCh
err = <-s.writeCh.errCh
} }
//logs.Warn("write buf", len(buf))
//now := time.Now()
n, err = s.sendWindow.WriteFull(buf, s.connId)
//t := time.Now().Sub(now)
//if t.Seconds() > 0.5 {
// logs.Warn("conn write long", n, t.Seconds())
//}
return return
} }
func (s *conn) write(nCh chan int, errCh chan error) {
var n int
var err error
for {
buf, err := s.sendWindow.WriteTo()
// get the usable window size buf from send window
if buf == nil && err == io.EOF {
// send window is drain, break the loop
err = nil
break
}
if err != nil {
break
}
n += len(buf)
s.mux.sendInfo(common.MUX_NEW_MSG, s.connId, buf)
// send to other side, not send nil data to other side
}
nCh <- n
errCh <- err
}
func (s *conn) Close() (err error) { func (s *conn) Close() (err error) {
s.once.Do(s.closeProcess) s.once.Do(s.closeProcess)
@ -138,288 +105,531 @@ func (s *conn) Close() (err error) {
func (s *conn) closeProcess() { func (s *conn) closeProcess() {
s.isClose = true s.isClose = true
s.mux.connMap.Delete(s.connId) s.receiveWindow.mux.connMap.Delete(s.connId)
if !s.mux.IsClose { if !s.receiveWindow.mux.IsClose {
// if server or user close the conn while reading, will get a io.EOF // if server or user close the conn while reading, will get a io.EOF
// and this Close method will be invoke, send this signal to close other side // and this Close method will be invoke, send this signal to close other side
s.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil)
} }
s.sendWindow.CloseWindow() s.sendWindow.CloseWindow()
s.receiveWindow.CloseWindow() s.receiveWindow.CloseWindow()
//d := getM(s.label, int(s.connId))
//d.isClose = true
//d.logs = append(d.logs, s.label+"close "+time.Now().String())
//setM(s.label, int(s.connId), d)
return return
} }
func (s *conn) LocalAddr() net.Addr { func (s *conn) LocalAddr() net.Addr {
return s.mux.conn.LocalAddr() return s.receiveWindow.mux.conn.LocalAddr()
} }
func (s *conn) RemoteAddr() net.Addr { func (s *conn) RemoteAddr() net.Addr {
return s.mux.conn.RemoteAddr() return s.receiveWindow.mux.conn.RemoteAddr()
} }
func (s *conn) SetDeadline(t time.Time) error { func (s *conn) SetDeadline(t time.Time) error {
s.readTimeOut = t _ = s.SetReadDeadline(t)
s.writeTimeOut = t _ = s.SetWriteDeadline(t)
return nil return nil
} }
func (s *conn) SetReadDeadline(t time.Time) error { func (s *conn) SetReadDeadline(t time.Time) error {
s.readTimeOut = t s.receiveWindow.SetTimeOut(t)
return nil return nil
} }
func (s *conn) SetWriteDeadline(t time.Time) error { func (s *conn) SetWriteDeadline(t time.Time) error {
s.writeTimeOut = t s.sendWindow.SetTimeOut(t)
return nil return nil
} }
type window struct { type window struct {
windowBuff []byte remainingWait uint64 // 64bit alignment
off uint16 off uint32
readOp chan struct{} maxSize uint32
readWait bool
WindowFull bool
usableReceiveWindow chan uint16
WriteWg sync.WaitGroup
closeOp bool closeOp bool
closeOpCh chan struct{} closeOpCh chan struct{}
WriteEndOp chan struct{} mux *Mux
mutex sync.Mutex
} }
func (Self *window) NewReceive() { func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) {
const mask = 1<<dequeueBits - 1
remaining = uint32((ptrs >> dequeueBits) & mask)
wait = uint32(ptrs & mask)
return
}
func (Self *window) pack(remaining, wait uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(remaining) << dequeueBits) |
uint64(wait&mask)
}
func (Self *window) New() {
Self.closeOpCh = make(chan struct{}, 2)
}
func (Self *window) CloseWindow() {
if !Self.closeOp {
Self.closeOp = true
Self.closeOpCh <- struct{}{}
Self.closeOpCh <- struct{}{}
}
}
type ReceiveWindow struct {
window
bufQueue ReceiveWindowQueue
element *common.ListElement
count int8
once sync.Once
}
func (Self *ReceiveWindow) New(mux *Mux) {
// initial a window for receive // initial a window for receive
Self.windowBuff = common.WindowBuff.Get() Self.bufQueue.New()
Self.readOp = make(chan struct{}) Self.element = common.ListElementPool.Get()
Self.WriteEndOp = make(chan struct{}) Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
Self.closeOpCh = make(chan struct{}, 3) Self.mux = mux
Self.window.New()
} }
func (Self *window) NewSend() { func (Self *ReceiveWindow) remainingSize(delta uint16) (n uint32) {
// initial a window for send // receive window remaining
Self.usableReceiveWindow = make(chan uint16) l := int64(atomic.LoadUint32(&Self.maxSize)) - int64(Self.bufQueue.Len())
Self.closeOpCh = make(chan struct{}, 3) l -= int64(delta)
} if l > 0 {
n = uint32(l)
func (Self *window) SetSendBuf(buf []byte) { }
// send window buff from conn write method, set it to send window
Self.mutex.Lock()
Self.windowBuff = buf
Self.off = 0
Self.mutex.Unlock()
}
func (Self *window) fullSlide() {
// slide by allocate
newBuf := common.WindowBuff.Get()
Self.liteSlide()
n := copy(newBuf[:Self.len()], Self.windowBuff)
common.WindowBuff.Put(Self.windowBuff)
Self.windowBuff = newBuf[:n]
return return
} }
func (Self *window) liteSlide() { func (Self *ReceiveWindow) calcSize() {
// slide by re slice // calculating maximum receive window size
Self.windowBuff = Self.windowBuff[Self.off:] if Self.count == 0 {
Self.off = 0 //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get())
conns := Self.mux.connMap.Size()
n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) *
Self.mux.bw.Get() / float64(conns))
if n < common.MAXIMUM_SEGMENT_SIZE*10 {
n = common.MAXIMUM_SEGMENT_SIZE * 10
}
bufLen := Self.bufQueue.Len()
if n < bufLen {
n = bufLen
}
if n < Self.maxSize/2 {
n = Self.maxSize / 2
}
// set the minimal size
if n > 2*Self.maxSize {
n = 2 * Self.maxSize
}
if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) {
n = common.MAXIMUM_WINDOW_SIZE / uint32(conns)
}
// set the maximum size
//logs.Warn("n", n)
atomic.StoreUint32(&Self.maxSize, n)
Self.count = -10
}
Self.count += 1
return return
} }
func (Self *window) Size() (n int) { func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) {
// receive Window remaining
n = common.PoolSizeWindow - Self.len()
return
}
func (Self *window) len() (n int) {
n = len(Self.windowBuff[Self.off:])
return
}
func (Self *window) cap() (n int) {
n = cap(Self.windowBuff[Self.off:])
return
}
func (Self *window) grow(n int) {
Self.windowBuff = Self.windowBuff[:Self.len()+n]
}
func (Self *window) Write(p []byte) (n int, err error) {
if Self.closeOp { if Self.closeOp {
return 0, errors.New("conn.receiveWindow: write on closed window") return errors.New("conn.receiveWindow: write on closed window")
} }
if len(p) > Self.Size() { element, err := NewListElement(buf, l, part)
return 0, errors.New("conn.receiveWindow: write too large") //logs.Warn("push the buf", len(buf), l, (&element).l)
if err != nil {
return
} }
Self.mutex.Lock() Self.calcSize() // calculate the max window size
// slide the offset var wait uint32
if len(p) > Self.cap()-Self.len() { start:
// not enough space, need to allocate ptrs := atomic.LoadUint64(&Self.remainingWait)
Self.fullSlide() _, wait = Self.unpack(ptrs)
} else { newRemaining := Self.remainingSize(l)
// have enough space, re slice // calculate the remaining window size now, plus the element we will push
Self.liteSlide() if newRemaining == 0 {
//logs.Warn("window full true", remaining)
wait = 1
} }
length := Self.len() // length before grow if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) {
Self.grow(len(p)) // grow for copy goto start
n = copy(Self.windowBuff[length:], p) // must copy data before allow Read // another goroutine change the status, make sure shall we need wait
if Self.readWait {
// if there condition is length == 0 and
// Read method just take away all the windowBuff,
// this method will block until windowBuff is empty again
// allow continue read
defer Self.allowRead()
} }
Self.mutex.Unlock() Self.bufQueue.Push(element)
return n, nil // status check finish, now we can push the element into the queue
if wait == 0 {
Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining)
// send the remaining window size, not including zero size
}
return nil
} }
func (Self *window) allowRead() (closed bool) { func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) {
if Self.closeOp { if Self.closeOp {
close(Self.readOp)
return true
}
Self.mutex.Lock()
Self.readWait = false
Self.mutex.Unlock()
select {
case <-Self.closeOpCh:
close(Self.readOp)
return true
case Self.readOp <- struct{}{}:
return false
}
}
func (Self *window) Read(p []byte) (n int, err error) {
if Self.closeOp {
return 0, io.EOF // Write method receive close signal, returns eof
}
Self.mutex.Lock()
length := Self.len() // protect the length data, it invokes
// before Write lock and after Write unlock
if length == 0 {
// window is empty, waiting for Write method send a success readOp signal
// or get timeout or close
Self.readWait = true
Self.mutex.Unlock()
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
select {
case _, ok := <-Self.readOp:
if !ok {
return 0, errors.New("conn.receiveWindow: window closed")
}
case <-Self.WriteEndOp:
return 0, io.EOF // receive eof signal, returns eof
case <-ticker.C:
return 0, errors.New("conn.receiveWindow: read time out")
case <-Self.closeOpCh:
close(Self.readOp)
return 0, io.EOF // receive close signal, returns eof return 0, io.EOF // receive close signal, returns eof
} }
} else { pOff := 0
Self.mutex.Unlock() l := 0
//logs.Warn("receive window read off, element.l", Self.off, Self.element.l)
copyData:
if Self.off == uint32(Self.element.L) {
// on the first Read method invoked, Self.off and Self.element.l
// both zero value
common.ListElementPool.Put(Self.element)
if Self.closeOp {
return 0, io.EOF
} }
minCopy := 512 Self.element, err = Self.bufQueue.Pop()
// if the queue is empty, Pop method will wait until one element push
// into the queue successful, or timeout.
// timer start on timeout parameter is set up ,
// reset to 60s if timeout and data still available
Self.off = 0
if err != nil {
return // queue receive stop or time out, break the loop and return
}
//logs.Warn("pop element", Self.element.l, Self.element.part)
}
l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L])
pOff += l
Self.off += uint32(l)
//logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len())
n += l
l = 0
if Self.off == uint32(Self.element.L) {
//logs.Warn("put the element end ", string(Self.element.buf[:15]))
common.WindowBuff.Put(Self.element.Buf)
Self.sendStatus(id, Self.element.L)
// check the window full status
}
if pOff < len(p) && Self.element.Part {
// element is a part of the segments, trying to fill up buf p
goto copyData
}
return // buf p is full or all of segments in buf, return
}
func (Self *ReceiveWindow) sendStatus(id int32, l uint16) {
var remaining, wait uint32
for { for {
Self.mutex.Lock() ptrs := atomic.LoadUint64(&Self.remainingWait)
if len(p) == n || Self.len() == 0 { remaining, wait = Self.unpack(ptrs)
Self.mutex.Unlock() remaining += uint32(l)
if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) {
break break
} }
if n+minCopy > len(p) { runtime.Gosched()
minCopy = len(p) - n // another goroutine change remaining or wait status, make sure
// we need acknowledge other side
} }
i := copy(p[n:n+minCopy], Self.windowBuff[Self.off:]) // now we get the current window status success
Self.off += uint16(i) if wait == 1 {
n += i //logs.Warn("send the wait status", remaining)
Self.mutex.Unlock() Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining)
} }
p = p[:n]
return return
} }
func (Self *window) WriteTo() (p []byte, err error) { func (Self *ReceiveWindow) SetTimeOut(t time.Time) {
if Self.closeOp { // waiting for FIFO queue Pop method
return nil, errors.New("conn.writeWindow: window closed") Self.bufQueue.SetTimeOut(t)
}
if Self.len() == 0 {
return nil, io.EOF
// send window buff is drain, return eof and get another one
}
var windowSize uint16
var ok bool
waiting:
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
// waiting for receive usable window size, or timeout
select {
case windowSize, ok = <-Self.usableReceiveWindow:
if !ok {
return nil, errors.New("conn.writeWindow: window closed")
}
case <-ticker.C:
return nil, errors.New("conn.writeWindow: write to time out")
case <-Self.closeOpCh:
return nil, errors.New("conn.writeWindow: window closed")
}
if windowSize == 0 {
goto waiting // waiting for another usable window size
}
Self.mutex.Lock()
if windowSize > uint16(Self.len()) {
// usable window size is bigger than window buff size, send the full buff
windowSize = uint16(Self.len())
}
p = Self.windowBuff[Self.off : windowSize+Self.off]
Self.off += windowSize
Self.mutex.Unlock()
return
} }
func (Self *window) SetAllowSize(value uint16) (closed bool) { func (Self *ReceiveWindow) Stop() {
// queue has no more data to push, so unblock pop method
Self.once.Do(Self.bufQueue.Stop)
}
func (Self *ReceiveWindow) CloseWindow() {
Self.window.CloseWindow()
Self.Stop()
Self.release()
}
func (Self *ReceiveWindow) release() {
//if Self.element != nil {
// if Self.element.Buf != nil {
// common.WindowBuff.Put(Self.element.Buf)
// }
// common.ListElementPool.Put(Self.element)
//}
for {
Self.element = Self.bufQueue.TryPop()
if Self.element == nil {
return
}
if Self.element.Buf != nil {
common.WindowBuff.Put(Self.element.Buf)
}
common.ListElementPool.Put(Self.element)
} // release resource
}
type SendWindow struct {
window
buf []byte
setSizeCh chan struct{}
timeout time.Time
}
func (Self *SendWindow) New(mux *Mux) {
Self.setSizeCh = make(chan struct{})
Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10
atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)<<dequeueBits)
Self.mux = mux
Self.window.New()
}
func (Self *SendWindow) SetSendBuf(buf []byte) {
// send window buff from conn write method, set it to send window
Self.buf = buf
Self.off = 0
}
func (Self *SendWindow) SetSize(windowSize, newRemaining uint32) (closed bool) {
// set the window size from receive window
defer func() { defer func() {
if recover() != nil { if recover() != nil {
closed = true closed = true
} }
}() }()
if Self.closeOp { if Self.closeOp {
close(Self.usableReceiveWindow) close(Self.setSizeCh)
return true return true
} }
select { //logs.Warn("set send window size to ", windowSize, newRemaining)
case Self.usableReceiveWindow <- value: var remaining, wait, newWait uint32
for {
ptrs := atomic.LoadUint64(&Self.remainingWait)
remaining, wait = Self.unpack(ptrs)
if remaining == newRemaining {
//logs.Warn("waiting for another window size")
return false // waiting for receive another usable window size
}
if newRemaining == 0 && wait == 1 {
newWait = 1 // keep the wait status,
// also if newRemaining is not zero, change wait to 0
}
if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(newRemaining, newWait)) {
break
}
// anther goroutine change wait status or window size
}
if wait == 1 {
// send window into the wait status, need notice the channel
//logs.Warn("send window remaining size is 0")
Self.allow()
}
// send window not into the wait status, so just do slide
return false return false
}
func (Self *SendWindow) allow() {
select {
case Self.setSizeCh <- struct{}{}:
//logs.Warn("send window remaining size is 0 finish")
return
case <-Self.closeOpCh: case <-Self.closeOpCh:
close(Self.usableReceiveWindow) close(Self.setSizeCh)
return true return
} }
} }
func (Self *window) CloseWindow() { func (Self *SendWindow) sent(sentSize uint32) {
Self.closeOp = true atomic.AddUint64(&Self.remainingWait, ^(uint64(sentSize)<<dequeueBits - 1))
Self.closeOpCh <- struct{}{} }
Self.closeOpCh <- struct{}{}
Self.closeOpCh <- struct{}{} func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) {
close(Self.closeOpCh) // returns buf segments, return only one segments, need a loop outside
// until err = io.EOF
if Self.closeOp {
return nil, 0, false, errors.New("conn.writeWindow: window closed")
}
if Self.off == uint32(len(Self.buf)) {
return nil, 0, false, io.EOF
// send window buff is drain, return eof and get another one
}
var remaining uint32
start:
ptrs := atomic.LoadUint64(&Self.remainingWait)
remaining, _ = Self.unpack(ptrs)
if remaining == 0 {
if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, 1)) {
goto start // another goroutine change the window, try again
}
// into the wait status
//logs.Warn("send window into wait status")
err = Self.waitReceiveWindow()
if err != nil {
return nil, 0, false, err
}
//logs.Warn("rem into wait finish")
goto start
}
// there are still remaining window
//logs.Warn("rem", remaining)
if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE {
sendSize = common.MAXIMUM_SEGMENT_SIZE
//logs.Warn("cut buf by mss")
} else {
sendSize = uint32(len(Self.buf[Self.off:]))
}
if remaining < sendSize {
// usable window size is small than
// window MAXIMUM_SEGMENT_SIZE or send buf left
sendSize = remaining
//logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:]))
}
//logs.Warn("send size", sendSize)
if sendSize < uint32(len(Self.buf[Self.off:])) {
part = true
}
p = Self.buf[Self.off : sendSize+Self.off]
Self.off += sendSize
Self.sent(sendSize)
return return
} }
type waitingCh struct { func (Self *SendWindow) waitReceiveWindow() (err error) {
nCh chan int t := Self.timeout.Sub(time.Now())
errCh chan error if t < 0 {
t = time.Minute * 5
}
timer := time.NewTimer(t)
defer timer.Stop()
// waiting for receive usable window size, or timeout
select {
case _, ok := <-Self.setSizeCh:
if !ok {
return errors.New("conn.writeWindow: window closed")
}
return nil
case <-timer.C:
return errors.New("conn.writeWindow: write to time out")
case <-Self.closeOpCh:
return errors.New("conn.writeWindow: window closed")
}
} }
func (Self *waitingCh) new() { func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) {
Self.nCh = make(chan int) Self.SetSendBuf(buf) // set the buf to send window
Self.errCh = make(chan error) //logs.Warn("set the buf to send window")
var bufSeg []byte
var part bool
var l uint32
for {
bufSeg, l, part, err = Self.WriteTo()
//logs.Warn("buf seg", len(bufSeg), part, err)
// get the buf segments from send window
if bufSeg == nil && part == false && err == io.EOF {
// send window is drain, break the loop
err = nil
break
}
if err != nil {
break
}
n += int(l)
l = 0
if part {
Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg)
} else {
Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg)
//logs.Warn("buf seg sent", len(bufSeg), part, err)
}
// send to other side, not send nil data to other side
}
//logs.Warn("buf seg write success")
return
} }
func (Self *waitingCh) close() { func (Self *SendWindow) SetTimeOut(t time.Time) {
close(Self.nCh) // waiting for receive a receive window size
close(Self.errCh) Self.timeout = t
} }
//type bandwidth struct {
// readStart time.Time
// lastReadStart time.Time
// readEnd time.Time
// lastReadEnd time.Time
// bufLength int
// lastBufLength int
// count int8
// readBW float64
// writeBW float64
// readBandwidth float64
//}
//
//func (Self *bandwidth) StartRead() {
// Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
// if !Self.lastReadStart.IsZero() {
// if Self.count == -5 {
// Self.calcBandWidth()
// }
// }
//}
//
//func (Self *bandwidth) EndRead() {
// Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now()
// if Self.count == -5 {
// Self.calcWriteBandwidth()
// }
// if Self.count == 0 {
// Self.calcReadBandwidth()
// Self.count = -6
// }
// Self.count += 1
//}
//
//func (Self *bandwidth) SetCopySize(n int) {
// // must be invoke between StartRead and EndRead
// Self.lastBufLength, Self.bufLength = Self.bufLength, n
//}
//// calculating
//// start end start end
//// read read
//// write
//
//func (Self *bandwidth) calcBandWidth() {
// t := Self.readStart.Sub(Self.lastReadStart)
// if Self.lastBufLength >= 32768 {
// Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds()
// }
//}
//
//func (Self *bandwidth) calcReadBandwidth() {
// // Bandwidth between nps and npc
// readTime := Self.readEnd.Sub(Self.readStart)
// Self.readBW = float64(Self.bufLength) / readTime.Seconds()
// //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds())
//}
//
//func (Self *bandwidth) calcWriteBandwidth() {
// // Bandwidth between nps and user, npc and application
// writeTime := Self.readStart.Sub(Self.lastReadEnd)
// Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds()
// //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds())
//}
//
//func (Self *bandwidth) Get() (bw float64) {
// // The zero value, 0 for numeric types
// if Self.writeBW == 0 && Self.readBW == 0 {
// //logs.Warn("bw both 0")
// return 100
// }
// if Self.writeBW == 0 && Self.readBW != 0 {
// return Self.readBW
// }
// if Self.readBW == 0 && Self.writeBW != 0 {
// return Self.writeBW
// }
// return Self.readBandwidth
//}

View File

@ -2,28 +2,35 @@ package mux
import ( import (
"sync" "sync"
"time"
) )
type connMap struct { type connMap struct {
connMap map[int32]*conn connMap map[int32]*conn
closeCh chan struct{} //closeCh chan struct{}
sync.RWMutex sync.RWMutex
} }
func NewConnMap() *connMap { func NewConnMap() *connMap {
connMap := &connMap{ connMap := &connMap{
connMap: make(map[int32]*conn), connMap: make(map[int32]*conn),
closeCh: make(chan struct{}), //closeCh: make(chan struct{}),
} }
go connMap.clean() //go connMap.clean()
return connMap return connMap
} }
func (s *connMap) Size() (n int) {
s.Lock()
n = len(s.connMap)
s.Unlock()
return
}
func (s *connMap) Get(id int32) (*conn, bool) { func (s *connMap) Get(id int32) (*conn, bool) {
s.Lock() s.Lock()
defer s.Unlock() v, ok := s.connMap[id]
if v, ok := s.connMap[id]; ok && v != nil { s.Unlock()
if ok && v != nil {
return v, true return v, true
} }
return nil, false return nil, false
@ -31,40 +38,38 @@ func (s *connMap) Get(id int32) (*conn, bool) {
func (s *connMap) Set(id int32, v *conn) { func (s *connMap) Set(id int32, v *conn) {
s.Lock() s.Lock()
defer s.Unlock()
s.connMap[id] = v s.connMap[id] = v
s.Unlock()
} }
func (s *connMap) Close() { func (s *connMap) Close() {
s.Lock() //s.closeCh <- struct{}{} // stop the clean goroutine first
defer s.Unlock()
for _, v := range s.connMap { for _, v := range s.connMap {
v.isClose = true v.Close() // close all the connections in the mux
} }
s.closeCh <- struct{}{}
} }
func (s *connMap) Delete(id int32) { func (s *connMap) Delete(id int32) {
s.Lock() s.Lock()
defer s.Unlock()
delete(s.connMap, id) delete(s.connMap, id)
s.Unlock()
} }
func (s *connMap) clean() { //func (s *connMap) clean() {
ticker := time.NewTimer(time.Minute * 1) // ticker := time.NewTimer(time.Minute * 1)
for { // for {
select { // select {
case <-ticker.C: // case <-ticker.C:
s.Lock() // s.Lock()
for _, v := range s.connMap { // for _, v := range s.connMap {
if v.isClose { // if v.isClose {
delete(s.connMap, v.connId) // delete(s.connMap, v.connId)
} // }
} // }
s.Unlock() // s.Unlock()
case <-s.closeCh: // case <-s.closeCh:
ticker.Stop() // ticker.Stop()
return // return
} // }
} // }
} //}

View File

@ -1,11 +1,10 @@
package mux package mux
import ( import (
"bytes"
"errors" "errors"
"io"
"math" "math"
"net" "net"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -14,6 +13,7 @@ import (
) )
type Mux struct { type Mux struct {
latency uint64 // we store latency in bits, but it's float64
net.Listener net.Listener
conn net.Conn conn net.Conn
connMap *connMap connMap *connMap
@ -21,30 +21,39 @@ type Mux struct {
id int32 id int32
closeChan chan struct{} closeChan chan struct{}
IsClose bool IsClose bool
pingOk int pingOk uint32
counter *latencyCounter
bw *bandwidth
pingCh chan []byte
pingCheckTime uint32
connType string connType string
writeQueue Queue writeQueue PriorityQueue
bufCh chan *bytes.Buffer newConnQueue ConnQueue
sync.Mutex
} }
func NewMux(c net.Conn, connType string) *Mux { func NewMux(c net.Conn, connType string) *Mux {
//c.(*net.TCPConn).SetReadBuffer(0)
//c.(*net.TCPConn).SetWriteBuffer(0)
m := &Mux{ m := &Mux{
conn: c, conn: c,
connMap: NewConnMap(), connMap: NewConnMap(),
id: 0, id: 0,
closeChan: make(chan struct{}), closeChan: make(chan struct{}, 1),
newConnCh: make(chan *conn), newConnCh: make(chan *conn),
bw: new(bandwidth),
IsClose: false, IsClose: false,
connType: connType, connType: connType,
bufCh: make(chan *bytes.Buffer), pingCh: make(chan []byte),
counter: newLatencyCounter(),
} }
m.writeQueue.New() m.writeQueue.New()
m.newConnQueue.New()
//read session by flag //read session by flag
go m.readSession() m.readSession()
//ping //ping
go m.ping() m.ping()
go m.writeSession() m.pingReturn()
m.writeSession()
return m return m
} }
@ -52,7 +61,7 @@ func (s *Mux) NewConn() (*conn, error) {
if s.IsClose { if s.IsClose {
return nil, errors.New("the mux has closed") return nil, errors.New("the mux has closed")
} }
conn := NewConn(s.getId(), s) conn := NewConn(s.getId(), s, "nps ")
//it must be set before send //it must be set before send
s.connMap.Set(conn.connId, conn) s.connMap.Set(conn.connId, conn)
s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil) s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil)
@ -83,12 +92,17 @@ func (s *Mux) Addr() net.Addr {
return s.conn.LocalAddr() return s.conn.LocalAddr()
} }
func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) { func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) {
if s.IsClose {
return
}
var err error var err error
pack := common.MuxPack.Get() pack := common.MuxPack.Get()
err = pack.NewPac(flag, id, data) err = pack.NewPac(flag, id, data...)
if err != nil { if err != nil {
common.MuxPack.Put(pack) common.MuxPack.Put(pack)
logs.Error("mux: new pack err", err)
s.Close()
return return
} }
s.writeQueue.Push(pack) s.writeQueue.Push(pack)
@ -97,123 +111,183 @@ func (s *Mux) sendInfo(flag uint8, id int32, data interface{}) {
func (s *Mux) writeSession() { func (s *Mux) writeSession() {
go s.packBuf() go s.packBuf()
go s.writeBuf() //go s.writeBuf()
<-s.closeChan
} }
func (s *Mux) packBuf() { func (s *Mux) packBuf() {
//buffer := common.BuffPool.Get()
for { for {
if s.IsClose {
break
}
//buffer.Reset()
pack := s.writeQueue.Pop() pack := s.writeQueue.Pop()
buffer := common.BuffPool.Get() if s.IsClose {
err := pack.Pack(buffer) break
}
//buffer := common.BuffPool.Get()
err := pack.Pack(s.conn)
common.MuxPack.Put(pack) common.MuxPack.Put(pack)
if err != nil { if err != nil {
logs.Warn("pack err", err) logs.Error("mux: pack err", err)
common.BuffPool.Put(buffer) //common.BuffPool.Put(buffer)
break
}
select {
case s.bufCh <- buffer:
case <-s.closeChan:
break
}
}
}
func (s *Mux) writeBuf() {
for {
select {
case buffer := <-s.bufCh:
l := buffer.Len()
n, err := buffer.WriteTo(s.conn)
common.BuffPool.Put(buffer)
if err != nil || int(n) != l {
logs.Warn("close from write session fail ", err, n, l)
s.Close() s.Close()
break break
} }
case <-s.closeChan: //logs.Warn(buffer.String())
break //s.bufQueue.Push(buffer)
} //l := buffer.Len()
//n, err := buffer.WriteTo(s.conn)
//common.BuffPool.Put(buffer)
//if err != nil || int(n) != l {
// logs.Error("mux: close from write session fail ", err, n, l)
// s.Close()
// break
//}
} }
} }
//func (s *Mux) writeBuf() {
// for {
// if s.IsClose {
// break
// }
// buffer, err := s.bufQueue.Pop()
// if err != nil {
// break
// }
// l := buffer.Len()
// n, err := buffer.WriteTo(s.conn)
// common.BuffPool.Put(buffer)
// if err != nil || int(n) != l {
// logs.Warn("close from write session fail ", err, n, l)
// s.Close()
// break
// }
// }
//}
func (s *Mux) ping() { func (s *Mux) ping() {
go func() { go func() {
ticker := time.NewTicker(time.Second * 1) now, _ := time.Now().UTC().MarshalText()
s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
// send the ping flag and get the latency first
ticker := time.NewTicker(time.Second * 5)
for { for {
if s.IsClose {
ticker.Stop()
break
}
select { select {
case <-ticker.C: case <-ticker.C:
} }
//Avoid going beyond the scope if atomic.LoadUint32(&s.pingCheckTime) >= 60 {
if (math.MaxInt32 - s.id) < 10000 { logs.Error("mux: ping time out")
s.id = 0 s.Close()
// more than 5 minutes not receive the ping return package,
// mux conn is damaged, maybe a packet drop, close it
break
} }
s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, nil) now, _ := time.Now().UTC().MarshalText()
if s.pingOk > 10 && s.connType == "kcp" { s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
atomic.AddUint32(&s.pingCheckTime, 1)
if atomic.LoadUint32(&s.pingOk) > 10 && s.connType == "kcp" {
logs.Error("mux: kcp ping err")
s.Close() s.Close()
break break
} }
s.pingOk++ atomic.AddUint32(&s.pingOk, 1)
} }
}() }()
select { }
case <-s.closeChan:
func (s *Mux) pingReturn() {
go func() {
var now time.Time
var data []byte
for {
if s.IsClose {
break
} }
select {
case data = <-s.pingCh:
atomic.StoreUint32(&s.pingCheckTime, 0)
case <-s.closeChan:
break
}
_ = now.UnmarshalText(data)
latency := time.Now().UTC().Sub(now).Seconds() / 2
if latency > 0 {
atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency)))
// convert float64 to bits, store it atomic
}
//logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency)))
if cap(data) > 0 {
common.WindowBuff.Put(data)
}
}
}()
} }
func (s *Mux) readSession() { func (s *Mux) readSession() {
go func() { go func() {
pack := common.MuxPack.Get() var connection *conn
for { for {
pack = common.MuxPack.Get() if s.IsClose {
if pack.UnPack(s.conn) != nil {
break break
} }
s.pingOk = 0 connection = s.newConnQueue.Pop()
if s.IsClose {
break // make sure that is closed
}
s.connMap.Set(connection.connId, connection) //it has been set before send ok
s.newConnCh <- connection
s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
}
}()
go func() {
pack := common.MuxPack.Get()
var l uint16
var err error
for {
if s.IsClose {
break
}
pack = common.MuxPack.Get()
s.bw.StartRead()
if l, err = pack.UnPack(s.conn); err != nil {
logs.Error("mux: read session unpack from connection err", err)
s.Close()
break
}
s.bw.SetCopySize(l)
atomic.StoreUint32(&s.pingOk, 0)
switch pack.Flag { switch pack.Flag {
case common.MUX_NEW_CONN: //new connection case common.MUX_NEW_CONN: //new connection
connection := NewConn(pack.Id, s) connection := NewConn(pack.Id, s)
s.connMap.Set(pack.Id, connection) //it has been set before send ok s.newConnQueue.Push(connection)
go func(connection *conn) {
connection.sendWindow.SetAllowSize(512) // set the initial receive window
}(connection)
s.newConnCh <- connection
s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil)
continue continue
case common.MUX_PING_FLAG: //ping case common.MUX_PING_FLAG: //ping
go s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, nil) s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
common.WindowBuff.Put(pack.Content)
continue continue
case common.MUX_PING_RETURN: case common.MUX_PING_RETURN:
//go func(content []byte) {
s.pingCh <- pack.Content
//}(pack.Content)
continue continue
} }
if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose {
switch pack.Flag { switch pack.Flag {
case common.MUX_NEW_MSG: //new msg from remote connection case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection
//insert wait queue err = s.newMsg(connection, pack)
if connection.isClose {
continue
}
connection.receiveWindow.WriteWg.Add(1)
go func(connection *conn, content []byte) { // do not block read session
_, err := connection.receiveWindow.Write(content)
if err != nil { if err != nil {
logs.Warn("mux new msg err close", err) logs.Error("mux: read session connection new msg err", err)
connection.Close() connection.Close()
} }
size := connection.receiveWindow.Size()
if size == 0 {
connection.receiveWindow.WindowFull = true
}
s.sendInfo(common.MUX_MSG_SEND_OK, connection.connId, size)
connection.receiveWindow.WriteWg.Done()
}(connection, pack.Content)
continue continue
case common.MUX_NEW_CONN_OK: //connection ok case common.MUX_NEW_CONN_OK: //connection ok
connection.connStatusOkCh <- struct{}{} connection.connStatusOkCh <- struct{}{}
go connection.sendWindow.SetAllowSize(512)
// set the initial receive window both side
continue continue
case common.MUX_NEW_CONN_Fail: case common.MUX_NEW_CONN_Fail:
connection.connStatusFailCh <- struct{}{} connection.connStatusFailCh <- struct{}{}
@ -222,15 +296,14 @@ func (s *Mux) readSession() {
if connection.isClose { if connection.isClose {
continue continue
} }
go connection.sendWindow.SetAllowSize(pack.Window) connection.sendWindow.SetSize(pack.Window, pack.ReadLength)
continue continue
case common.MUX_CONN_CLOSE: //close the connection case common.MUX_CONN_CLOSE: //close the connection
s.connMap.Delete(pack.Id)
connection.closeFlag = true connection.closeFlag = true
go func(connection *conn) { //s.connMap.Delete(pack.Id)
connection.receiveWindow.WriteWg.Wait() //go func(connection *conn) {
connection.receiveWindow.WriteEndOp <- struct{}{} // close signal to receive window connection.receiveWindow.Stop() // close signal to receive window
}(connection) //}(connection)
continue continue
} }
} else if pack.Flag == common.MUX_CONN_CLOSE { } else if pack.Flag == common.MUX_CONN_CLOSE {
@ -241,32 +314,195 @@ func (s *Mux) readSession() {
common.MuxPack.Put(pack) common.MuxPack.Put(pack)
s.Close() s.Close()
}() }()
select {
case <-s.closeChan:
}
} }
func (s *Mux) Close() error { func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) {
if connection.isClose {
err = io.ErrClosedPipe
return
}
//logs.Warn("read session receive new msg", pack.Length)
//go func(connection *conn, pack *common.MuxPackager) { // do not block read session
//insert into queue
if pack.Flag == common.MUX_NEW_MSG_PART {
err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id)
}
if pack.Flag == common.MUX_NEW_MSG {
err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id)
}
//logs.Warn("read session write success", pack.Length)
return
}
func (s *Mux) Close() (err error) {
logs.Warn("close mux") logs.Warn("close mux")
if s.IsClose { if s.IsClose {
return errors.New("the mux has closed") return errors.New("the mux has closed")
} }
s.IsClose = true s.IsClose = true
s.connMap.Close() s.connMap.Close()
s.closeChan <- struct{}{} s.connMap = nil
s.closeChan <- struct{}{} //s.bufQueue.Stop()
s.closeChan <- struct{}{}
s.closeChan <- struct{}{}
s.closeChan <- struct{}{} s.closeChan <- struct{}{}
close(s.newConnCh) close(s.newConnCh)
return s.conn.Close() err = s.conn.Close()
s.release()
return
}
func (s *Mux) release() {
for {
pack := s.writeQueue.TryPop()
if pack == nil {
break
}
if pack.BasePackager.Content != nil {
common.WindowBuff.Put(pack.BasePackager.Content)
}
common.MuxPack.Put(pack)
}
for {
connection := s.newConnQueue.TryPop()
if connection == nil {
break
}
connection = nil
}
s.writeQueue.Stop()
s.newConnQueue.Stop()
} }
//get new connId as unique flag //get new connId as unique flag
func (s *Mux) getId() (id int32) { func (s *Mux) getId() (id int32) {
//Avoid going beyond the scope
if (math.MaxInt32 - s.id) < 10000 {
atomic.StoreInt32(&s.id, 0)
}
id = atomic.AddInt32(&s.id, 1) id = atomic.AddInt32(&s.id, 1)
if _, ok := s.connMap.Get(id); ok { if _, ok := s.connMap.Get(id); ok {
s.getId() return s.getId()
} }
return return
} }
type bandwidth struct {
readBandwidth uint64 // store in bits, but it's float64
readStart time.Time
lastReadStart time.Time
bufLength uint32
}
func (Self *bandwidth) StartRead() {
if Self.readStart.IsZero() {
Self.readStart = time.Now()
}
if Self.bufLength >= common.MAXIMUM_SEGMENT_SIZE*300 {
Self.lastReadStart, Self.readStart = Self.readStart, time.Now()
Self.calcBandWidth()
}
}
func (Self *bandwidth) SetCopySize(n uint16) {
Self.bufLength += uint32(n)
}
func (Self *bandwidth) calcBandWidth() {
t := Self.readStart.Sub(Self.lastReadStart)
atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds()))
Self.bufLength = 0
}
func (Self *bandwidth) Get() (bw float64) {
// The zero value, 0 for numeric types
bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth))
if bw <= 0 {
bw = 100
}
//logs.Warn(bw)
return
}
const counterBits = 4
const counterMask = 1<<counterBits - 1
func newLatencyCounter() *latencyCounter {
return &latencyCounter{
buf: make([]float64, 1<<counterBits, 1<<counterBits),
headMin: 0,
}
}
type latencyCounter struct {
buf []float64 //buf is a fixed length ring buffer,
// if buffer is full, new value will replace the oldest one.
headMin uint8 //head indicate the head in ring buffer,
// in meaning, slot in list will be replaced;
// min indicate this slot value is minimal in list.
}
func (Self *latencyCounter) unpack(idxs uint8) (head, min uint8) {
head = uint8((idxs >> counterBits) & counterMask)
// we set head is 4 bits
min = uint8(idxs & counterMask)
return
}
func (Self *latencyCounter) pack(head, min uint8) uint8 {
return uint8(head<<counterBits) |
uint8(min&counterMask)
}
func (Self *latencyCounter) add(value float64) {
head, min := Self.unpack(Self.headMin)
Self.buf[head] = value
if head == min {
min = Self.minimal()
//if head equals min, means the min slot already be replaced,
// so we need to find another minimal value in the list,
// and change the min indicator
}
if Self.buf[min] > value {
min = head
}
head++
Self.headMin = Self.pack(head, min)
}
func (Self *latencyCounter) minimal() (min uint8) {
var val float64
var i uint8
for i = 0; i < counterMask; i++ {
if Self.buf[i] > 0 {
if val > Self.buf[i] {
val = Self.buf[i]
min = i
}
}
}
return
}
func (Self *latencyCounter) Latency(value float64) (latency float64) {
Self.add(value)
_, min := Self.unpack(Self.headMin)
latency = Self.buf[min] * Self.countSuccess()
return
}
const lossRatio = 1.6
func (Self *latencyCounter) countSuccess() (successRate float64) {
var success, loss, i uint8
_, min := Self.unpack(Self.headMin)
for i = 0; i < counterMask; i++ {
if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 {
loss++
}
if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 {
success++
}
}
// counting all the data in the ring buf, except zero
successRate = float64(success) / float64(loss+success)
return
}

View File

@ -1,15 +1,22 @@
package mux package mux
import ( import (
"bufio"
"fmt"
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/goroutine"
"io"
"log"
"net" "net"
"net/http" "net/http"
"net/http/httputil"
_ "net/http/pprof" _ "net/http/pprof"
"sync" "strconv"
"testing" "testing"
"time" "time"
"unsafe"
"github.com/astaxie/beego/logs" "github.com/astaxie/beego/logs"
"github.com/cnlh/nps/lib/common"
) )
var conn1 net.Conn var conn1 net.Conn
@ -23,47 +30,54 @@ func TestNewMux(t *testing.T) {
logs.SetLogFuncCallDepth(3) logs.SetLogFuncCallDepth(3)
server() server()
client() client()
//poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false))
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
go func() { go func() {
m2 := NewMux(conn2, "tcp") m2 := NewMux(conn2, "tcp")
for { for {
logs.Warn("npc starting accept") //logs.Warn("npc starting accept")
c, err := m2.Accept() c, err := m2.Accept()
if err != nil { if err != nil {
logs.Warn(err) logs.Warn(err)
continue continue
} }
logs.Warn("npc accept success ") //logs.Warn("npc accept success ")
c2, err := net.Dial("tcp", "127.0.0.1:80") c2, err := net.Dial("tcp", "127.0.0.1:80")
if err != nil { if err != nil {
logs.Warn(err) logs.Warn(err)
c.Close()
continue continue
} }
go func(c2 net.Conn, c net.Conn) { //c2.(*net.TCPConn).SetReadBuffer(0)
wg := sync.WaitGroup{} //c2.(*net.TCPConn).SetReadBuffer(0)
wg.Add(1) _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil))
go func() { //go func(c2 net.Conn, c *conn) {
_, err = common.CopyBuffer(c2, c) // wg := new(sync.WaitGroup)
if err != nil { // wg.Add(2)
c2.Close() // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg))
c.Close() // //go func() {
logs.Warn("close npc by copy from nps", err) // // _, err = common.CopyBuffer(c2, c)
} // // if err != nil {
wg.Done() // // c2.Close()
}() // // c.Close()
wg.Add(1) // // //logs.Warn("close npc by copy from nps", err, c.connId)
go func() { // // }
_, err = common.CopyBuffer(c, c2) // // wg.Done()
if err != nil { // //}()
c2.Close() // //wg.Add(1)
c.Close() // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg))
logs.Warn("close npc by copy from server", err) // //go func() {
} // // _, err = common.CopyBuffer(c, c2)
wg.Done() // // if err != nil {
}() // // c2.Close()
logs.Warn("npc wait") // // c.Close()
wg.Wait() // // //logs.Warn("close npc by copy from server", err, c.connId)
}(c2, c) // // }
// // wg.Done()
// //}()
// //logs.Warn("npc wait")
// wg.Wait()
//}(c2, c.(*conn))
} }
}() }()
@ -74,39 +88,55 @@ func TestNewMux(t *testing.T) {
logs.Warn(err) logs.Warn(err)
} }
for { for {
logs.Warn("nps starting accept") //logs.Warn("nps starting accept")
conn, err := l.Accept() conns, err := l.Accept()
if err != nil { if err != nil {
logs.Warn(err) logs.Warn(err)
continue continue
} }
logs.Warn("nps accept success starting new conn") //conns.(*net.TCPConn).SetReadBuffer(0)
//conns.(*net.TCPConn).SetReadBuffer(0)
//logs.Warn("nps accept success starting new conn")
tmpCpnn, err := m1.NewConn() tmpCpnn, err := m1.NewConn()
if err != nil { if err != nil {
logs.Warn("nps new conn err ", err) logs.Warn("nps new conn err ", err)
continue continue
} }
logs.Warn("nps new conn success ", tmpCpnn.connId) //logs.Warn("nps new conn success ", tmpCpnn.connId)
go func(tmpCpnn net.Conn, conn net.Conn) { _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil))
go func() { //go func(tmpCpnn *conn, conns net.Conn) {
_, err := common.CopyBuffer(tmpCpnn, conn) // wg := new(sync.WaitGroup)
if err != nil { // wg.Add(2)
conn.Close() // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg))
tmpCpnn.Close() // //go func() {
logs.Warn("close nps by copy from user") // // _, err := common.CopyBuffer(tmpCpnn, conns)
} // // if err != nil {
}() // // conns.Close()
//time.Sleep(time.Second) // // tmpCpnn.Close()
_, err = common.CopyBuffer(conn, tmpCpnn) // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err)
if err != nil { // // }
conn.Close() // //}()
tmpCpnn.Close() // //wg.Add(1)
logs.Warn("close nps by copy from npc ") // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg))
} // //time.Sleep(time.Second)
}(tmpCpnn, conn) // //_, err = common.CopyBuffer(conns, tmpCpnn)
// //if err != nil {
// // conns.Close()
// // tmpCpnn.Close()
// // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err)
// //}
// wg.Wait()
//}(tmpCpnn, conns)
} }
}() }()
//go NewLogServer()
time.Sleep(time.Second * 5)
//for i := 0; i < 1; i++ {
// go test_raw(i)
//}
//test_request()
for { for {
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
} }
@ -135,6 +165,73 @@ func client() {
} }
} }
func test_request() {
conn, _ := net.Dial("tcp", "127.0.0.1:7777")
for i := 0; i < 1000; i++ {
conn.Write([]byte(`GET / HTTP/1.1
Host: 127.0.0.1:7777
Connection: keep-alive
`))
r, err := http.ReadResponse(bufio.NewReader(conn), nil)
if err != nil {
logs.Warn("close by read response err", err)
break
}
logs.Warn("read response success", r)
b, err := httputil.DumpResponse(r, true)
if err != nil {
logs.Warn("close by dump response err", err)
break
}
fmt.Println(string(b[:20]), err)
//time.Sleep(time.Second)
}
logs.Warn("finish")
}
func test_raw(k int) {
for i := 0; i < 1000; i++ {
ti := time.Now()
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
logs.Warn("conn dial err", err)
}
tid := time.Now()
conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
Host: 127.0.0.1:7777
`))
tiw := time.Now()
buf := make([]byte, 3572)
n, err := io.ReadFull(conn, buf)
//n, err := conn.Read(buf)
if err != nil {
logs.Warn("close by read response err", err)
break
}
logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
//time.Sleep(time.Second)
err = conn.Close()
if err != nil {
logs.Warn("close conn err ", err)
}
now := time.Now()
du := now.Sub(ti).Seconds()
dud := now.Sub(tid).Seconds()
duw := now.Sub(tiw).Seconds()
if du > 1 {
logs.Warn("duration long", du, dud, duw, k, i)
}
if n != 3572 {
logs.Warn("n loss", n, string(buf))
}
}
logs.Warn("finish")
}
func TestNewConn(t *testing.T) { func TestNewConn(t *testing.T) {
buf := common.GetBufPoolCopy() buf := common.GetBufPoolCopy()
logs.Warn(len(buf), cap(buf)) logs.Warn(len(buf), cap(buf))
@ -146,3 +243,205 @@ func TestNewConn(t *testing.T) {
logs.Warn(copy(buf[:3], b), len(buf), cap(buf)) logs.Warn(copy(buf[:3], b), len(buf), cap(buf))
logs.Warn(len(buf), buf[0]) logs.Warn(len(buf), buf[0])
} }
func TestDQueue(t *testing.T) {
logs.EnableFuncCallDepth(true)
logs.SetLogFuncCallDepth(3)
d := new(bufDequeue)
d.vals = make([]unsafe.Pointer, 8)
go func() {
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
logs.Warn(i)
logs.Warn(d.popTail())
}
}()
go func() {
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
data := "test"
go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data)))
}
}()
time.Sleep(time.Second * 3)
}
func TestChain(t *testing.T) {
go func() {
log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
}()
logs.EnableFuncCallDepth(true)
logs.SetLogFuncCallDepth(3)
time.Sleep(time.Second * 5)
d := new(bufChain)
d.new(256)
go func() {
time.Sleep(time.Second)
for i := 0; i < 30000; i++ {
unsa, ok := d.popTail()
str := (*string)(unsa)
if ok {
fmt.Println(i, str, *str, ok)
//logs.Warn(i, str, *str, ok)
} else {
fmt.Println("nil", i, ok)
//logs.Warn("nil", i, ok)
}
}
}()
go func() {
time.Sleep(time.Second)
for i := 0; i < 3000; i++ {
go func(i int) {
for n := 0; n < 10; n++ {
data := "test " + strconv.Itoa(i) + strconv.Itoa(n)
fmt.Println(data, unsafe.Pointer(&data))
//logs.Warn(data, unsafe.Pointer(&data))
d.pushHead(unsafe.Pointer(&data))
}
}(i)
}
}()
time.Sleep(time.Second * 100000)
}
func TestFIFO(t *testing.T) {
go func() {
log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
}()
logs.EnableFuncCallDepth(true)
logs.SetLogFuncCallDepth(3)
time.Sleep(time.Second * 5)
d := new(ReceiveWindowQueue)
d.New()
go func() {
time.Sleep(time.Second)
for i := 0; i < 1001; i++ {
data, err := d.Pop()
if err == nil {
//fmt.Println(i, string(data.buf), err)
logs.Warn(i, string(data.Buf), err)
common.ListElementPool.Put(data)
} else {
//fmt.Println("err", err)
logs.Warn("err", err)
}
//logs.Warn(d.Len())
}
logs.Warn("pop finish")
}()
go func() {
time.Sleep(time.Second * 10)
for i := 0; i < 1000; i++ {
by := []byte("test " + strconv.Itoa(i) + " ") //
data, _ := NewListElement(by, uint16(len(by)), true)
//fmt.Println(string((*data).buf), data)
//logs.Warn(string((*data).buf), data)
d.Push(data)
}
}()
time.Sleep(time.Second * 100000)
}
func TestPriority(t *testing.T) {
go func() {
log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
}()
logs.EnableFuncCallDepth(true)
logs.SetLogFuncCallDepth(3)
time.Sleep(time.Second * 5)
d := new(PriorityQueue)
d.New()
go func() {
time.Sleep(time.Second)
for i := 0; i < 360050; i++ {
data := d.Pop()
//fmt.Println(i, string(data.buf), err)
logs.Warn(i, string(data.Content), data)
}
logs.Warn("pop finish")
}()
go func() {
time.Sleep(time.Second * 10)
for i := 0; i < 30000; i++ {
go func(i int) {
for n := 0; n < 10; n++ {
data := new(common.MuxPackager)
by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
_ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
//fmt.Println(string((*data).buf), data)
logs.Warn(string((*data).Content), data)
d.Push(data)
}
}(i)
go func(i int) {
data := new(common.MuxPackager)
_ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
//fmt.Println(string((*data).buf), data)
logs.Warn(data)
d.Push(data)
}(i)
go func(i int) {
data := new(common.MuxPackager)
_ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
//fmt.Println(string((*data).buf), data)
logs.Warn(data)
d.Push(data)
}(i)
}
}()
time.Sleep(time.Second * 100000)
}
//func TestReceive(t *testing.T) {
// go func() {
// log.Println(http.ListenAndServe("0.0.0.0:8889", nil))
// }()
// logs.EnableFuncCallDepth(true)
// logs.SetLogFuncCallDepth(3)
// time.Sleep(time.Second * 5)
// mux := new(Mux)
// mux.bw.readBandwidth = float64(1*1024*1024)
// mux.latency = float64(1/1000)
// wind := new(ReceiveWindow)
// wind.New(mux)
// wind.
// go func() {
// time.Sleep(time.Second)
// for i := 0; i < 36000; i++ {
// data := d.Pop()
// //fmt.Println(i, string(data.buf), err)
// logs.Warn(i, string(data.Content), data)
// }
// }()
// go func() {
// time.Sleep(time.Second*10)
// for i := 0; i < 3000; i++ {
// go func(i int) {
// for n := 0; n < 10; n++{
// data := new(common.MuxPackager)
// by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n))
// _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by)
// //fmt.Println(string((*data).buf), data)
// logs.Warn(string((*data).Content), data)
// d.Push(data)
// }
// }(i)
// go func(i int) {
// data := new(common.MuxPackager)
// _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil)
// //fmt.Println(string((*data).buf), data)
// logs.Warn(data)
// d.Push(data)
// }(i)
// go func(i int) {
// data := new(common.MuxPackager)
// _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil)
// //fmt.Println(string((*data).buf), data)
// logs.Warn(data)
// d.Push(data)
// }(i)
// }
// }()
// time.Sleep(time.Second * 100000)
//}

View File

@ -1,95 +1,583 @@
package mux package mux
import ( import (
"container/list" "errors"
"github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/common"
"io"
"math"
"runtime"
"sync" "sync"
"sync/atomic"
"time"
"unsafe"
) )
type Queue struct { type PriorityQueue struct {
list *list.List highestChain *bufChain
readOp chan struct{} middleChain *bufChain
cleanOp chan struct{} lowestChain *bufChain
popWait bool starving uint8
mutex sync.Mutex stop bool
cond *sync.Cond
} }
func (Self *Queue) New() { func (Self *PriorityQueue) New() {
Self.list = list.New() Self.highestChain = new(bufChain)
Self.readOp = make(chan struct{}) Self.highestChain.new(4)
Self.cleanOp = make(chan struct{}, 2) Self.middleChain = new(bufChain)
Self.middleChain.new(32)
Self.lowestChain = new(bufChain)
Self.lowestChain.new(256)
locker := new(sync.Mutex)
Self.cond = sync.NewCond(locker)
} }
func (Self *Queue) Push(packager *common.MuxPackager) { func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
Self.mutex.Lock() //logs.Warn("push start")
if Self.popWait { Self.push(packager)
defer Self.allowPop() Self.cond.Broadcast()
} //logs.Warn("push finish")
if packager.Flag == common.MUX_CONN_CLOSE {
Self.insert(packager) // the close package may need priority,
// prevent wait too long to close
} else {
Self.list.PushBack(packager)
}
Self.mutex.Unlock()
return return
} }
func (Self *Queue) allowPop() (closed bool) { func (Self *PriorityQueue) push(packager *common.MuxPackager) {
Self.mutex.Lock() switch packager.Flag {
Self.popWait = false case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
Self.mutex.Unlock() Self.highestChain.pushHead(unsafe.Pointer(packager))
// the ping package need highest priority
// prevent ping calculation error
case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail:
// the new conn package need some priority too
Self.middleChain.pushHead(unsafe.Pointer(packager))
default:
Self.lowestChain.pushHead(unsafe.Pointer(packager))
}
}
const maxStarving uint8 = 8
func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) {
var iter bool
for {
packager = Self.TryPop()
if packager != nil {
return
}
if Self.stop {
return
}
if iter {
break
// trying to pop twice
}
iter = true
runtime.Gosched()
}
Self.cond.L.Lock()
defer Self.cond.L.Unlock()
for packager = Self.TryPop(); packager == nil; {
if Self.stop {
return
}
//logs.Warn("queue into wait")
Self.cond.Wait()
// wait for it with no more iter
packager = Self.TryPop()
//logs.Warn("queue wait finish", packager)
}
return
}
func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) {
ptr, ok := Self.highestChain.popTail()
if ok {
packager = (*common.MuxPackager)(ptr)
return
}
if Self.starving < maxStarving {
// not pop too much, lowestChain will wait too long
ptr, ok = Self.middleChain.popTail()
if ok {
packager = (*common.MuxPackager)(ptr)
Self.starving++
return
}
}
ptr, ok = Self.lowestChain.popTail()
if ok {
packager = (*common.MuxPackager)(ptr)
if Self.starving > 0 {
Self.starving = uint8(Self.starving / 2)
}
return
}
if Self.starving > 0 {
ptr, ok = Self.middleChain.popTail()
if ok {
packager = (*common.MuxPackager)(ptr)
Self.starving++
return
}
}
return
}
func (Self *PriorityQueue) Stop() {
Self.stop = true
Self.cond.Broadcast()
}
type ConnQueue struct {
chain *bufChain
starving uint8
stop bool
cond *sync.Cond
}
func (Self *ConnQueue) New() {
Self.chain = new(bufChain)
Self.chain.new(32)
locker := new(sync.Mutex)
Self.cond = sync.NewCond(locker)
}
func (Self *ConnQueue) Push(connection *conn) {
Self.chain.pushHead(unsafe.Pointer(connection))
Self.cond.Broadcast()
return
}
func (Self *ConnQueue) Pop() (connection *conn) {
var iter bool
for {
connection = Self.TryPop()
if connection != nil {
return
}
if Self.stop {
return
}
if iter {
break
// trying to pop twice
}
iter = true
runtime.Gosched()
}
Self.cond.L.Lock()
defer Self.cond.L.Unlock()
for connection = Self.TryPop(); connection == nil; {
if Self.stop {
return
}
//logs.Warn("queue into wait")
Self.cond.Wait()
// wait for it with no more iter
connection = Self.TryPop()
//logs.Warn("queue wait finish", packager)
}
return
}
func (Self *ConnQueue) TryPop() (connection *conn) {
ptr, ok := Self.chain.popTail()
if ok {
connection = (*conn)(ptr)
return
}
return
}
func (Self *ConnQueue) Stop() {
Self.stop = true
Self.cond.Broadcast()
}
func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) {
if uint16(len(buf)) != l {
err = errors.New("ListElement: buf length not match")
return
}
//if l == 0 {
// logs.Warn("push zero")
//}
element = common.ListElementPool.Get()
element.Buf = buf
element.L = l
element.Part = part
return
}
type ReceiveWindowQueue struct {
chain *bufChain
stopOp chan struct{}
readOp chan struct{}
lengthWait uint64 // really strange ???? need put here
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core.
// On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility
// to arrange for 64-bit alignment of 64-bit words accessed atomically.
// The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.
timeout time.Time
}
func (Self *ReceiveWindowQueue) New() {
Self.readOp = make(chan struct{})
Self.chain = new(bufChain)
Self.chain.new(64)
Self.stopOp = make(chan struct{}, 2)
}
func (Self *ReceiveWindowQueue) Push(element *common.ListElement) {
var length, wait uint32
for {
ptrs := atomic.LoadUint64(&Self.lengthWait)
length, wait = Self.chain.head.unpack(ptrs)
length += uint32(element.L)
if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) {
break
}
// another goroutine change the length or into wait, make sure
}
//logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf))
Self.chain.pushHead(unsafe.Pointer(element))
//logs.Warn("window push", Self.Len())
if wait == 1 {
Self.allowPop()
}
return
}
func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) {
var length uint32
startPop:
ptrs := atomic.LoadUint64(&Self.lengthWait)
length, _ = Self.chain.head.unpack(ptrs)
if length == 0 {
if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) {
goto startPop // another goroutine is pushing
}
err = Self.waitPush()
// there is no more data in queue, wait for it
if err != nil {
return
}
goto startPop // wait finish, trying to get the new status
}
// length is not zero, so try to pop
for {
element = Self.TryPop()
if element != nil {
return
}
runtime.Gosched() // another goroutine is still pushing
}
}
func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) {
ptr, ok := Self.chain.popTail()
if ok {
//logs.Warn("window pop before", Self.Len())
element = (*common.ListElement)(ptr)
atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<<dequeueBits - 1))
//logs.Warn("window pop", Self.Len(), uint32(element.l))
return
}
return nil
}
func (Self *ReceiveWindowQueue) allowPop() (closed bool) {
//logs.Warn("allow pop", Self.Len())
select { select {
case Self.readOp <- struct{}{}: case Self.readOp <- struct{}{}:
return false return false
case <-Self.cleanOp: case <-Self.stopOp:
return true return true
} }
} }
func (Self *Queue) insert(packager *common.MuxPackager) { func (Self *ReceiveWindowQueue) waitPush() (err error) {
element := Self.list.Back() //logs.Warn("wait push")
for { //defer logs.Warn("wait push finish")
if element == nil { // Queue dose not have any of msg package with this close package id t := Self.timeout.Sub(time.Now())
Self.list.PushFront(packager) // insert close package to first if t <= 0 {
break t = time.Minute * 5
} }
if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG && timer := time.NewTimer(t)
element.Value.(*common.MuxPackager).Id == packager.Id { defer timer.Stop()
Self.list.InsertAfter(packager, element) // Queue has some msg package //logs.Warn("queue into wait")
// with this close package id, insert close package after last msg package
break
}
element = element.Prev()
}
}
func (Self *Queue) Pop() (packager *common.MuxPackager) {
Self.mutex.Lock()
element := Self.list.Front()
if element != nil {
packager = element.Value.(*common.MuxPackager)
Self.list.Remove(element)
Self.mutex.Unlock()
return
}
Self.popWait = true // Queue is empty, notice Push method
Self.mutex.Unlock()
select { select {
case <-Self.readOp: case <-Self.readOp:
return Self.Pop() //logs.Warn("queue wait finish")
case <-Self.cleanOp:
return nil return nil
case <-Self.stopOp:
err = io.EOF
return
case <-timer.C:
err = errors.New("mux.queue: read time out")
return
} }
} }
func (Self *Queue) Len() (n int) { func (Self *ReceiveWindowQueue) Len() (n uint32) {
n = Self.list.Len() ptrs := atomic.LoadUint64(&Self.lengthWait)
n, _ = Self.chain.head.unpack(ptrs)
return return
} }
func (Self *Queue) Clean() { func (Self *ReceiveWindowQueue) Stop() {
Self.cleanOp <- struct{}{} Self.stopOp <- struct{}{}
Self.cleanOp <- struct{}{} Self.stopOp <- struct{}{}
close(Self.cleanOp) }
func (Self *ReceiveWindowQueue) SetTimeOut(t time.Time) {
Self.timeout = t
}
// https://golang.org/src/sync/poolqueue.go
type bufDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
vals []unsafe.Pointer
starving uint32
}
const dequeueBits = 32
// dequeueLimit is the maximum size of a bufDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4
func (d *bufDequeue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
func (d *bufDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
// pushHead adds val at the head of the queue. It returns false if the
// queue is full.
func (d *bufDequeue) pushHead(val unsafe.Pointer) bool {
var slot *unsafe.Pointer
var starve uint8
if atomic.LoadUint32(&d.starving) > 0 {
runtime.Gosched()
}
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
ptrs2 := d.pack(head+1, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[head&uint32(len(d.vals)-1)]
if starve >= 3 && atomic.LoadUint32(&d.starving) > 0 {
atomic.StoreUint32(&d.starving, 0)
}
break
}
starve++
if starve >= 3 {
atomic.StoreUint32(&d.starving, 1)
}
}
// The head slot is free, so we own it.
*slot = val
return true
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *bufDequeue) popTail() (unsafe.Pointer, bool) {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
slot := &d.vals[tail&uint32(len(d.vals)-1)]
var val unsafe.Pointer
for {
val = atomic.LoadPointer(slot)
if val != nil {
// We now own slot.
break
}
// Another goroutine is still pushing data on the tail.
}
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
atomic.StorePointer(slot, nil)
// At this point pushHead owns the slot.
if tail < math.MaxUint32 {
atomic.AddUint64(&d.headTail, 1)
} else {
atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1))
}
return val, true
}
// bufChain is a dynamically-sized version of bufDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type bufChain struct {
// head is the bufDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *bufChainElt
// tail is the bufDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *bufChainElt
newChain uint32
}
type bufChainElt struct {
bufDequeue
// next and prev link to the adjacent poolChainElts in this
// bufChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *bufChainElt
}
func storePoolChainElt(pp **bufChainElt, v *bufChainElt) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}
func loadPoolChainElt(pp **bufChainElt) *bufChainElt {
return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}
func (c *bufChain) new(initSize int) {
// Initialize the chain.
// initSize must be a power of 2
d := new(bufChainElt)
d.vals = make([]unsafe.Pointer, initSize)
storePoolChainElt(&c.head, d)
storePoolChainElt(&c.tail, d)
}
func (c *bufChain) pushHead(val unsafe.Pointer) {
startPush:
for {
if atomic.LoadUint32(&c.newChain) > 0 {
runtime.Gosched()
} else {
break
}
}
d := loadPoolChainElt(&c.head)
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) {
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &bufChainElt{prev: d}
d2.vals = make([]unsafe.Pointer, newSize)
d2.pushHead(val)
storePoolChainElt(&c.head, d2)
storePoolChainElt(&d.next, d2)
atomic.StoreUint32(&c.newChain, 0)
return
}
goto startPush
}
func (c *bufChain) popTail() (unsafe.Pointer, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the TryPop and the TryPop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next TryPop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
} }

154
lib/mux/web.go Normal file
View File

@ -0,0 +1,154 @@
package mux
import (
"fmt"
"github.com/astaxie/beego/logs"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
)
type connLog struct {
startTime time.Time
isClose bool
logs []string
}
var logms map[int]*connLog
var logmc map[int]*connLog
var copyMaps map[int]*connLog
var copyMapc map[int]*connLog
var stashTimeNow time.Time
var mutex sync.Mutex
func deepCopyMaps() {
copyMaps = make(map[int]*connLog)
for k, v := range logms {
copyMaps[k] = &connLog{
startTime: v.startTime,
isClose: v.isClose,
logs: v.logs,
}
}
}
func deepCopyMapc() {
copyMapc = make(map[int]*connLog)
for k, v := range logmc {
copyMapc[k] = &connLog{
startTime: v.startTime,
isClose: v.isClose,
logs: v.logs,
}
}
}
func init() {
logms = make(map[int]*connLog)
logmc = make(map[int]*connLog)
}
type IntSlice []int
func (s IntSlice) Len() int { return len(s) }
func (s IntSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s IntSlice) Less(i, j int) bool { return s[i] < s[j] }
func NewLogServer() {
http.HandleFunc("/", index)
http.HandleFunc("/detail", detail)
http.HandleFunc("/stash", stash)
fmt.Println(http.ListenAndServe(":8899", nil))
}
func stash(w http.ResponseWriter, r *http.Request) {
stashTimeNow = time.Now()
deepCopyMaps()
deepCopyMapc()
w.Write([]byte("ok"))
}
func getM(label string, id int) (cL *connLog) {
label = strings.TrimSpace(label)
mutex.Lock()
defer mutex.Unlock()
if label == "nps" {
cL = logms[id]
}
if label == "npc" {
cL = logmc[id]
}
return
}
func setM(label string, id int, cL *connLog) {
label = strings.TrimSpace(label)
mutex.Lock()
defer mutex.Unlock()
if label == "nps" {
logms[id] = cL
}
if label == "npc" {
logmc[id] = cL
}
}
func index(w http.ResponseWriter, r *http.Request) {
var keys []int
for k := range copyMaps {
keys = append(keys, k)
}
sort.Sort(IntSlice(keys))
var s string
s += "<h1>nps</h1>"
for _, v := range keys {
connL := copyMaps[v]
s += "<a href='/detail?id=" + strconv.Itoa(v) + "&label=nps" + "'>" + strconv.Itoa(v) + "</a>----------"
s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------"
s += strconv.FormatBool(connL.isClose)
s += "<br>"
}
keys = keys[:0]
s += "<h1>npc</h1>"
for k := range copyMapc {
keys = append(keys, k)
}
sort.Sort(IntSlice(keys))
for _, v := range keys {
connL := copyMapc[v]
s += "<a href='/detail?id=" + strconv.Itoa(v) + "&label=npc" + "'>" + strconv.Itoa(v) + "</a>----------"
s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------"
s += strconv.FormatBool(connL.isClose)
s += "<br>"
}
w.Write([]byte(s))
}
func detail(w http.ResponseWriter, r *http.Request) {
id := r.FormValue("id")
label := r.FormValue("label")
logs.Warn(label)
i, _ := strconv.Atoi(id)
var v *connLog
if label == "nps" {
v, _ = copyMaps[i]
}
if label == "npc" {
v, _ = copyMapc[i]
}
var s string
if v != nil {
for i, vv := range v.logs {
s += "<p>" + strconv.Itoa(i+1) + ":" + vv + "</p>"
}
}
w.Write([]byte(s))
}

7
lib/mux/web_test.go Normal file
View File

@ -0,0 +1,7 @@
package mux
import "testing"
func TestWeb(t *testing.T) {
NewLogServer()
}

View File

@ -1,8 +1,8 @@
package version package version
const VERSION = "0.23.2" const VERSION = "0.24.0"
// Compulsory minimum version, Minimum downward compatibility to this version // Compulsory minimum version, Minimum downward compatibility to this version
func GetVersion() string { func GetVersion() string {
return "0.21.0" return "0.24.0"
} }

View File

@ -199,8 +199,7 @@ func (s *Sock5ModeServer) handleConn(c net.Conn) {
c.Close() c.Close()
return return
} }
if (s.task.Client.Cnf.U != "" && s.task.Client.Cnf.P != "") || (s.task.MultiAccount != nil && len(s.task.MultiAccount.AccountMap) > 0) {
if s.task.Client.Cnf.U != "" && s.task.Client.Cnf.P != "" {
buf[1] = UserPassAuth buf[1] = UserPassAuth
c.Write(buf) c.Write(buf)
if err := s.Auth(c); err != nil { if err := s.Auth(c); err != nil {
@ -237,7 +236,22 @@ func (s *Sock5ModeServer) Auth(c net.Conn) error {
if _, err := io.ReadAtLeast(c, pass, passLen); err != nil { if _, err := io.ReadAtLeast(c, pass, passLen); err != nil {
return err return err
} }
if string(user) == s.task.Client.Cnf.U && string(pass) == s.task.Client.Cnf.P {
var U, P string
if s.task.MultiAccount != nil {
// enable multi user auth
U = string(user)
var ok bool
P, ok = s.task.MultiAccount.AccountMap[U]
if !ok {
return errors.New("验证不通过")
}
} else {
U = s.task.Client.Cnf.U
P = s.task.Client.Cnf.P
}
if string(user) == U && string(pass) == P {
if _, err := c.Write([]byte{userAuthVersion, authSuccess}); err != nil { if _, err := c.Write([]byte{userAuthVersion, authSuccess}); err != nil {
return err return err
} }

View File

@ -22,7 +22,7 @@ func (self *LoginController) Verify() {
if self.GetString("password") == beego.AppConfig.String("web_password") && self.GetString("username") == beego.AppConfig.String("web_username") { if self.GetString("password") == beego.AppConfig.String("web_password") && self.GetString("username") == beego.AppConfig.String("web_username") {
self.SetSession("isAdmin", true) self.SetSession("isAdmin", true)
auth = true auth = true
server.Bridge.Register.Store(common.GetIpByAddr(self.Ctx.Request.RemoteAddr), time.Now().Add(time.Hour*time.Duration(2))) server.Bridge.Register.Store(common.GetIpByAddr(self.Ctx.Input.IP()), time.Now().Add(time.Hour*time.Duration(2)))
} }
b, err := beego.AppConfig.Bool("allow_user_login") b, err := beego.AppConfig.Bool("allow_user_login")
if err == nil && b && !auth { if err == nil && b && !auth {