From 9f03c2f6eb3614424d77c63358985bf25df25463 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E6=B2=B3?= Date: Sun, 17 Feb 2019 19:36:48 +0800 Subject: [PATCH] bug --- bridge/bridge.go | 73 +++++++++++++++++++++++++++++++----------- client/client.go | 46 +++++++++++++++++++------- cmd/nps/nps.go | 5 +++ conf/npc.conf | 9 ++---- lib/common/const.go | 29 +++++++++-------- lib/conn/conn.go | 20 ++++++------ lib/conn/link.go | 37 +++++++++++++++++++++ lib/pool/pool.go | 18 +++++++++-- server/proxy/base.go | 1 + server/proxy/http.go | 7 ++-- server/proxy/socks5.go | 1 + server/proxy/tcp.go | 1 + server/proxy/udp.go | 1 + 13 files changed, 183 insertions(+), 65 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index 63dfa5e..4a5b964 100755 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -22,19 +22,21 @@ import ( type Client struct { tunnel *conn.Conn signal *conn.Conn + msg *conn.Conn linkMap map[int]*conn.Link linkStatusMap map[int]bool stop chan bool sync.RWMutex } -func NewClient(t *conn.Conn, s *conn.Conn) *Client { +func NewClient(t *conn.Conn, s *conn.Conn, m *conn.Conn) *Client { return &Client{ linkMap: make(map[int]*conn.Link), stop: make(chan bool), linkStatusMap: make(map[int]bool), signal: s, tunnel: t, + msg: m, } } @@ -150,6 +152,17 @@ func (s *Bridge) closeClient(id int) { delete(s.Client, id) } } +func (s *Bridge) delClient(id int) { + s.clientLock.Lock() + defer s.clientLock.Unlock() + if v, ok := s.Client[id]; ok { + if c, err := file.GetCsvDb().GetClient(id); err == nil && c.NoStore { + s.CloseClient <- c.Id + } + v.signal.Close() + delete(s.Client, id) + } +} //tcp连接类型区分 func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { @@ -166,7 +179,7 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { v.signal = c v.Unlock() } else { - s.Client[id] = NewClient(nil, c) + s.Client[id] = NewClient(nil, c, nil) s.clientLock.Unlock() } lg.Printf("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr()) @@ -179,7 +192,7 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { v.tunnel = c v.Unlock() } else { - s.Client[id] = NewClient(c, nil) + s.Client[id] = NewClient(c, nil, nil) s.clientLock.Unlock() } go s.clientCopy(id) @@ -187,11 +200,44 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { go s.GetConfig(c) case common.WORK_REGISTER: go s.register(c) + case common.WORK_SEND_STATUS: + s.clientLock.Lock() + if v, ok := s.Client[id]; ok { + s.clientLock.Unlock() + v.Lock() + v.msg = c + v.Unlock() + } else { + s.Client[id] = NewClient(nil, nil, c) + s.clientLock.Unlock() + } + go s.getMsgStatus(id) } c.SetAlive(s.tunnelType) return } +func (s *Bridge) getMsgStatus(clientId int) { + s.clientLock.Lock() + client := s.Client[clientId] + s.clientLock.Unlock() + + if client == nil { + return + } + for { + if id, err := client.msg.GetLen(); err != nil { + s.closeClient(clientId) + return + } else { + client.Lock() + if v, ok := client.linkMap[id]; ok { + v.StatusCh <- true + } + client.Unlock() + } + } +} func (s *Bridge) register(c *conn.Conn) { var hour int32 if err := binary.Read(c, binary.LittleEndian, &hour); err == nil { @@ -251,12 +297,14 @@ func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, linkAddr string) (t s.DelClient(clientId) return } + if v.tunnel == nil { err = errors.New("get tunnel connection error") return } else { tunnel = v.tunnel } + link.MsgConn = v.msg v.Lock() v.linkMap[link.Id] = link v.Unlock() @@ -412,7 +460,8 @@ func (s *Bridge) clientCopy(clientId int) { for { if id, err := client.tunnel.GetLen(); err != nil { - s.closeClient(clientId) + lg.Println("read msg content length error close client") + s.delClient(clientId) break } else { client.Lock() @@ -420,23 +469,11 @@ func (s *Bridge) clientCopy(clientId int) { client.Unlock() if content, err := client.tunnel.GetMsgContent(link); err != nil { pool.PutBufPoolCopy(content) - s.closeClient(clientId) + s.delClient(clientId) lg.Println("read msg content error", err, "close client") break } else { - if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF { - if link.Conn != nil { - link.Conn.Close() - } - } else { - if link.UdpListener != nil && link.UdpRemoteAddr != nil { - link.UdpListener.WriteToUDP(content, link.UdpRemoteAddr) - } else { - link.Conn.Write(content) - } - link.Flow.Add(0, len(content)) - } - pool.PutBufPoolCopy(content) + link.MsgCh <- content } } else { client.Unlock() diff --git a/client/client.go b/client/client.go index ebb8b46..8027dfb 100755 --- a/client/client.go +++ b/client/client.go @@ -21,6 +21,7 @@ type TRPClient struct { svrAddr string linkMap map[int]*conn.Link tunnel *conn.Conn + msgTunnel *conn.Conn bridgeConnType string stop chan bool proxyUrl string @@ -67,6 +68,7 @@ func (s *TRPClient) Close() { //处理 func (s *TRPClient) processor(c *conn.Conn) { go s.dealChan() + go s.getMsgStatus() for { flags, err := c.ReadFlag() if err != nil { @@ -83,7 +85,9 @@ func (s *TRPClient) processor(c *conn.Conn) { s.Lock() s.linkMap[link.Id] = link s.Unlock() + link.MsgConn = s.msgTunnel go s.linkProcess(link, c) + link.Run(false) } case common.RES_CLOSE: lg.Fatalln("The authentication key is connected by another client or the server closes the client.") @@ -109,9 +113,7 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) { lg.Println("connect to ", link.Host, "error:", err) return } - c.WriteSuccess(link.Id) - link.Conn = conn.NewConn(server) buf := pool.BufPoolCopy.Get().([]byte) for { @@ -123,8 +125,11 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) { c.Close() break } - lg.Println("send ok", link.Id) + if link.ConnType == common.CONN_UDP { + break + } } + <-link.StatusCh } pool.PutBufPoolCopy(buf) s.Lock() @@ -132,6 +137,31 @@ func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) { s.Unlock() } +func (s *TRPClient) getMsgStatus() { + var err error + s.msgTunnel, err = NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_SEND_STATUS, s.proxyUrl) + if err != nil { + lg.Println("connect to ", s.svrAddr, "error:", err) + return + } + go func() { + for { + if id, err := s.msgTunnel.GetLen(); err != nil { + break + } else { + s.Lock() + if v, ok := s.linkMap[id]; ok { + s.Unlock() + v.StatusCh <- true + } else { + s.Unlock() + } + } + } + }() + <-s.stop +} + //隧道模式处理 func (s *TRPClient) dealChan() { var err error @@ -140,26 +170,20 @@ func (s *TRPClient) dealChan() { lg.Println("connect to ", s.svrAddr, "error:", err) return } - go func() { for { if id, err := s.tunnel.GetLen(); err != nil { + lg.Println("get id error", err, id) break } else { s.Lock() if v, ok := s.linkMap[id]; ok { s.Unlock() if content, err := s.tunnel.GetMsgContent(v); err != nil { - lg.Println("get msg content error:", err, id) pool.PutBufPoolCopy(content) break } else { - if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF { - v.Conn.Close() - } else if v.Conn != nil { - v.Conn.Write(content) - } - pool.PutBufPoolCopy(content) + v.MsgCh <- content } } else { s.Unlock() diff --git a/cmd/nps/nps.go b/cmd/nps/nps.go index 18f1600..8a17719 100644 --- a/cmd/nps/nps.go +++ b/cmd/nps/nps.go @@ -12,6 +12,8 @@ import ( "github.com/cnlh/nps/vender/github.com/astaxie/beego" _ "github.com/cnlh/nps/web/routers" "log" + "net/http" + _ "net/http/pprof" "os" "path/filepath" ) @@ -38,6 +40,9 @@ func main() { return } } + go func() { + http.ListenAndServe("0.0.0.0:8899", nil) + }() if *logType == "stdout" { lg.InitLogFile("nps", true, common.GetLogPath()) } else { diff --git a/conf/npc.conf b/conf/npc.conf index 074dc70..325d32f 100644 --- a/conf/npc.conf +++ b/conf/npc.conf @@ -2,11 +2,8 @@ server=127.0.0.1:8284 tp=tcp vkey=123 -compress=snappy -crypt=true auto_reconnection=true -username=111 -password=222 + [web1] host=a.o.com host_change=www.proxy.com @@ -21,7 +18,7 @@ header_set_proxy=nps [tcp] mode=tcpServer -target=8001-8005,8006 +target=8001-8005,8080 port=9001-9005,9006 [socks5] @@ -34,5 +31,5 @@ port=9008 [udp] mode=udpServer -port=9009 +port=53 target=114.114.114.114:53 diff --git a/lib/common/const.go b/lib/common/const.go index aaa463d..b9d1b50 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -6,20 +6,21 @@ const ( COMPRESS_NONE_DECODE COMPRESS_SNAPY_ENCODE COMPRESS_SNAPY_DECODE - VERIFY_EER = "vkey" - VERIFY_SUCCESS = "sucs" - WORK_MAIN = "main" - WORK_CHAN = "chan" - WORK_CONFIG = "conf" - WORK_REGISTER = "rgst" - WORK_STATUS = "stus" - RES_SIGN = "sign" - RES_MSG = "msg0" - RES_CLOSE = "clse" - NEW_CONN = "conn" //新连接标志 - NEW_TASK = "task" //新连接标志 - NEW_CONF = "conf" //新连接标志 - NEW_HOST = "host" //新连接标志 + VERIFY_EER = "vkey" + VERIFY_SUCCESS = "sucs" + WORK_MAIN = "main" + WORK_CHAN = "chan" + WORK_SEND_STATUS = "sdst" + WORK_CONFIG = "conf" + WORK_REGISTER = "rgst" + WORK_STATUS = "stus" + RES_SIGN = "sign" + RES_MSG = "msg0" + RES_CLOSE = "clse" + NEW_CONN = "conn" //新连接标志 + NEW_TASK = "task" //新连接标志 + NEW_CONF = "conf" //新连接标志 + NEW_HOST = "host" //新连接标志 CONN_TCP = "tcp" CONN_UDP = "udp" diff --git a/lib/conn/conn.go b/lib/conn/conn.go index 5e31e7e..f142af1 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -74,12 +74,12 @@ func (s *Conn) ReadLen(cLen int) ([]byte, error) { return nil, errors.New("长度错误" + strconv.Itoa(cLen)) } var buf []byte - if cLen <= pool.PoolSizeSmall { + if cLen < pool.PoolSizeSmall { buf = pool.BufPoolSmall.Get().([]byte)[:cLen] - defer pool.BufPoolSmall.Put(buf) + defer pool.PutBufPoolSmall(buf) } else { buf = pool.BufPoolMax.Get().([]byte)[:cLen] - defer pool.BufPoolMax.Put(buf) + defer pool.PutBufPoolMax(buf) } if n, err := io.ReadFull(s, buf); err != nil || n != cLen { return buf, errors.New("Error reading specified length " + err.Error()) @@ -190,14 +190,10 @@ func (s *Conn) SendMsg(content []byte, link *Link) (n int, err error) { */ s.Lock() defer s.Unlock() - raw := bytes.NewBuffer([]byte{}) - binary.Write(raw, binary.LittleEndian, int32(link.Id)) - if n, err = s.Write(raw.Bytes()); err != nil { + if err = binary.Write(s.Conn, binary.LittleEndian, int32(link.Id)); err != nil { return } - raw.Reset() - binary.Write(raw, binary.LittleEndian, content) - n, err = s.WriteTo(raw.Bytes(), link.En, link.Crypt, link.Rate) + n, err = s.WriteTo(content, link.En, link.Crypt, link.Rate) return } @@ -260,6 +256,8 @@ func (s *Conn) GetLinkInfo() (lk *Link, err error) { lk.En = common.GetIntNoErrByStr(string(buf[11+hostLen])) lk.De = common.GetIntNoErrByStr(string(buf[12+hostLen])) lk.Crypt = common.GetBoolByStr(string(buf[13+hostLen])) + lk.MsgCh = make(chan []byte) + lk.StatusCh = make(chan bool) } return } @@ -399,6 +397,10 @@ func (s *Conn) GetTaskInfo() (t *file.Tunnel, err error) { return } +func (s *Conn) WriteWriteSuccess(id int) error { + return binary.Write(s.Conn, binary.LittleEndian, int32(id)) +} + //write connect success func (s *Conn) WriteSuccess(id int) (int, error) { raw := bytes.NewBuffer([]byte{}) diff --git a/lib/conn/link.go b/lib/conn/link.go index 9f50681..f0005b4 100644 --- a/lib/conn/link.go +++ b/lib/conn/link.go @@ -1,7 +1,9 @@ package conn import ( + "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/file" + "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/lib/rate" "net" ) @@ -18,6 +20,9 @@ type Link struct { UdpListener *net.UDPConn Rate *rate.Rate UdpRemoteAddr *net.UDPAddr + MsgCh chan []byte + MsgConn *Conn + StatusCh chan bool } func NewLink(id int, connType string, host string, en, de int, crypt bool, c *Conn, flow *file.Flow, udpListener *net.UDPConn, rate *rate.Rate, UdpRemoteAddr *net.UDPAddr) *Link { @@ -33,5 +38,37 @@ func NewLink(id int, connType string, host string, en, de int, crypt bool, c *Co UdpListener: udpListener, Rate: rate, UdpRemoteAddr: UdpRemoteAddr, + MsgCh: make(chan []byte), + StatusCh: make(chan bool), } } + +func (s *Link) Run(flow bool) { + go func() { + for { + select { + case content := <-s.MsgCh: + if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF { + if s.Conn != nil { + s.Conn.Close() + } + return + } else { + if s.UdpListener != nil && s.UdpRemoteAddr != nil { + s.UdpListener.WriteToUDP(content, s.UdpRemoteAddr) + } else { + s.Conn.Write(content) + } + if flow { + s.Flow.Add(0, len(content)) + } + if s.ConnType == common.CONN_UDP { + return + } + s.MsgConn.WriteWriteSuccess(s.Id) + pool.PutBufPoolCopy(content) + } + } + } + }() +} diff --git a/lib/pool/pool.go b/lib/pool/pool.go index 7c42c2c..6fbb77d 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -36,14 +36,26 @@ var BufPoolCopy = sync.Pool{ }, } +func PutBufPoolUdp(buf []byte) { + if cap(buf) == PoolSizeUdp { + BufPoolUdp.Put(buf[:PoolSizeUdp]) + } +} + func PutBufPoolCopy(buf []byte) { if cap(buf) == PoolSizeCopy { BufPoolCopy.Put(buf[:PoolSizeCopy]) } } -func PutBufPoolUdp(buf []byte) { - if cap(buf) == PoolSizeUdp { - BufPoolUdp.Put(buf[:PoolSizeUdp]) +func PutBufPoolSmall(buf []byte) { + if cap(buf) == PoolSizeSmall { + BufPoolSmall.Put(buf[:PoolSizeSmall]) + } +} + +func PutBufPoolMax(buf []byte) { + if cap(buf) == PoolSize { + BufPoolMax.Put(buf[:PoolSize]) } } diff --git a/server/proxy/base.go b/server/proxy/base.go index 9471260..4fcf3f2 100644 --- a/server/proxy/base.go +++ b/server/proxy/base.go @@ -60,6 +60,7 @@ func (s *server) linkCopy(link *conn.Link, c *conn.Conn, rb []byte, tunnel *conn } flow.Add(n, 0) } + <-link.StatusCh } pool.PutBufPoolCopy(buf) } diff --git a/server/proxy/http.go b/server/proxy/http.go index 3d11694..6ec783d 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -9,7 +9,6 @@ import ( "github.com/cnlh/nps/lib/file" "github.com/cnlh/nps/lib/lg" "github.com/cnlh/nps/vender/github.com/astaxie/beego" - "log" "net/http" "net/http/httputil" "path/filepath" @@ -137,9 +136,10 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { } lk = conn.NewLink(host.Client.GetId(), common.CONN_TCP, host.GetRandomTarget(), host.Client.Cnf.CompressEncode, host.Client.Cnf.CompressDecode, host.Client.Cnf.Crypt, c, host.Flow, nil, host.Client.Rate, nil) if tunnel, err = s.bridge.SendLinkInfo(host.Client.Id, lk, c.Conn.RemoteAddr().String()); err != nil { - log.Println(err) + lg.Println(err) break } + lk.Run(true) isConn = false } else { r, err = http.ReadRequest(bufio.NewReader(c)) @@ -166,6 +166,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { c.Close() break } + <-lk.StatusCh } end: if isConn { @@ -173,9 +174,7 @@ end: } else { tunnel.SendMsg([]byte(common.IO_EOF), lk) } - c.Close() - } func (s *httpServer) NewServer(port int) *http.Server { diff --git a/server/proxy/socks5.go b/server/proxy/socks5.go index ee3c121..eaafc4e 100755 --- a/server/proxy/socks5.go +++ b/server/proxy/socks5.go @@ -148,6 +148,7 @@ func (s *Sock5ModeServer) doConnect(c net.Conn, command uint8) { return } else { s.sendReply(c, succeeded) + link.Run(true) s.linkCopy(link, conn.NewConn(c), nil, tunnel, s.task.Flow) } return diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 0d259f3..5455395 100755 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -57,6 +57,7 @@ func (s *TunnelModeServer) dealClient(c *conn.Conn, cnf *file.Config, addr strin c.Close() return err } else { + link.Run(true) s.linkCopy(link, c, rb, tunnel, s.task.Flow) } return nil diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 1c65f01..70b26f6 100755 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -56,6 +56,7 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) { s.task.Flow.Add(len(data), 0) tunnel.SendMsg(data, link) pool.PutBufPoolUdp(data) + link.Run(true) } }