From a66b465046f9b978058d5b3268119081fcad37c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E6=B2=B3?= Date: Tue, 19 Mar 2019 22:41:40 +0800 Subject: [PATCH] Https kcp --- bridge/bridge.go | 26 +++++++------ client/client.go | 34 ++++++++--------- client/local.go | 4 +- conf/nps.conf | 3 +- lib/common/util.go | 9 +++++ lib/file/file.go | 2 +- lib/mux/mux.go | 19 +++++++--- lib/mux/pmux.go | 11 +----- server/connection/connection.go | 10 +---- server/proxy/base.go | 14 +++---- server/proxy/http.go | 65 ++++++++++++++++++++++++++++++--- server/proxy/tcp.go | 4 +- server/server.go | 2 +- 13 files changed, 129 insertions(+), 74 deletions(-) diff --git a/bridge/bridge.go b/bridge/bridge.go index dde208b..db6fe1e 100755 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -16,6 +16,7 @@ import ( "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "github.com/cnlh/nps/vender/github.com/xtaci/kcp" "net" + "os" "strconv" "strings" "sync" @@ -70,15 +71,14 @@ func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int func (s *Bridge) StartTunnel() error { go s.ping() - l, err := connection.GetBridgeListener(s.tunnelType) - if err != nil { - return err - } if s.tunnelType == "kcp" { - listener, ok := l.(*kcp.Listener) - if !ok { + listener, err := kcp.ListenWithOptions(beego.AppConfig.String("bridge_ip")+":"+beego.AppConfig.String("bridge_port"), nil, 150, 3) + if err != nil { + logs.Error(err) + os.Exit(0) return err } + logs.Info("server start, the bridge type is %s, the bridge port is %d", s.tunnelType, s.TunnelPort) go func() { for { c, err := listener.AcceptKCP() @@ -91,8 +91,10 @@ func (s *Bridge) StartTunnel() error { } }() } else { - listener, ok := l.(net.Listener) - if !ok { + listener, err := connection.GetBridgeListener(s.tunnelType) + if err != nil { + logs.Error(err) + os.Exit(0) return err } go func() { @@ -253,10 +255,10 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { if v, ok := s.Client[id]; ok { s.clientLock.Unlock() v.Lock() - v.tunnel = mux.NewMux(c.Conn) + v.tunnel = mux.NewMux(c.Conn, s.tunnelType) v.Unlock() } else { - s.Client[id] = NewClient(mux.NewMux(c.Conn), nil, nil) + s.Client[id] = NewClient(mux.NewMux(c.Conn, s.tunnelType), nil, nil) s.clientLock.Unlock() } case common.WORK_CONFIG: @@ -282,10 +284,10 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { if v, ok := s.Client[id]; ok { s.clientLock.Unlock() v.Lock() - v.file = mux.NewMux(c.Conn) + v.file = mux.NewMux(c.Conn, s.tunnelType) v.Unlock() } else { - s.Client[id] = NewClient(nil, mux.NewMux(c.Conn), nil) + s.Client[id] = NewClient(nil, mux.NewMux(c.Conn, s.tunnelType), nil) s.clientLock.Unlock() } case common.WORK_P2P: diff --git a/client/client.go b/client/client.go index a3fc3da..3b1a2cd 100755 --- a/client/client.go +++ b/client/client.go @@ -15,11 +15,11 @@ import ( type TRPClient struct { svrAddr string bridgeConnType string - stop chan bool proxyUrl string vKey string tunnel *mux.Mux signal *conn.Conn + ticker *time.Ticker cnf *config.Config } @@ -29,7 +29,6 @@ func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl st svrAddr: svraddr, vKey: vKey, bridgeConnType: bridgeConnType, - stop: make(chan bool), proxyUrl: proxyUrl, cnf: cnf, } @@ -51,8 +50,9 @@ retry: } func (s *TRPClient) Close() { - s.stop <- true + s.tunnel.Close() s.signal.Close() + s.ticker.Stop() } //处理 @@ -168,7 +168,7 @@ func (s *TRPClient) newUdpConn(rAddr string, md5Password string) { if udpTunnel.RemoteAddr().String() == string(b) { conn.SetUdpSession(udpTunnel) //读取link,设置msgCh 设置msgConn消息回传响应机制 - l := mux.NewMux(udpTunnel) + l := mux.NewMux(udpTunnel, s.bridgeConnType) for { connMux, err := l.Accept() if err != nil { @@ -187,18 +187,16 @@ func (s *TRPClient) dealChan() { logs.Error("connect to ", s.svrAddr, "error:", err) return } - go func() { - s.tunnel = mux.NewMux(tunnel.Conn) - for { - src, err := s.tunnel.Accept() - if err != nil { - logs.Warn(err) - break - } - go s.srcProcess(src) + s.tunnel = mux.NewMux(tunnel.Conn, s.bridgeConnType) + for { + src, err := s.tunnel.Accept() + if err != nil { + logs.Warn(err) + s.Close() + break } - }() - <-s.stop + go s.srcProcess(src) + } } func (s *TRPClient) srcProcess(src net.Conn) { @@ -221,14 +219,14 @@ func (s *TRPClient) srcProcess(src net.Conn) { } func (s *TRPClient) ping() { - ticker := time.NewTicker(time.Second * 5) + s.ticker = time.NewTicker(time.Second * 5) loop: for { select { - case <-ticker.C: + case <-s.ticker.C: if s.tunnel.IsClose { s.Close() - ticker.Stop() + s.ticker.Stop() break loop } } diff --git a/client/local.go b/client/local.go index d4ab35d..4e6f562 100644 --- a/client/local.go +++ b/client/local.go @@ -39,7 +39,7 @@ func startLocalFileServer(config *config.CommonConfig, t *file.Tunnel, vkey stri } logs.Info("start local file system, local path %s, strip prefix %s ,remote port %s ", t.LocalPath, t.StripPre, t.Ports) fileServer = append(fileServer, srv) - listener := mux.NewMux(remoteConn.Conn) + listener := mux.NewMux(remoteConn.Conn, common.CONN_TCP) logs.Warn(srv.Serve(listener)) } @@ -88,7 +88,7 @@ func processP2P(localTcpConn net.Conn, config *config.CommonConfig, l *config.Lo if udpConn == nil { return } - muxSession = mux.NewMux(udpConn) + muxSession = mux.NewMux(udpConn, "kcp") } nowConn, err := muxSession.NewConn() if err != nil { diff --git a/conf/nps.conf b/conf/nps.conf index b336632..409b1c4 100755 --- a/conf/nps.conf +++ b/conf/nps.conf @@ -6,7 +6,8 @@ runmode = dev #HTTP(S) proxy port, no startup if empty http_proxy_port=80 -#https_proxy_port=8024 +#https_proxy_port=443 +https_just_proxy=true #certFile absolute path pem_path=conf/server.pem #KeyFile absolute path diff --git a/lib/common/util.go b/lib/common/util.go index ce23ea6..4b7bf00 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -335,3 +335,12 @@ func RemoveArrVal(arr []string, val string) []string { } return arr } + +func BytesToNum(b []byte) int { + var str string + for i := 0; i < len(b); i++ { + str += strconv.Itoa(int(b[i])) + } + x, _ := strconv.Atoi(str) + return int(x) +} diff --git a/lib/file/file.go b/lib/file/file.go index 113f01c..e5de93f 100644 --- a/lib/file/file.go +++ b/lib/file/file.go @@ -561,7 +561,7 @@ func (s *Csv) GetInfoByHost(host string, r *http.Request) (h *Host, err error) { v.Location = "/" } if strings.Index(r.RequestURI, v.Location) == 0 { - if h == nil || (len(v.Location) > len(h.Location)) { + if h == nil || (len(v.Location) < len(h.Location)) { h = v } } diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 7be000c..32b6f43 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "github.com/cnlh/nps/lib/pool" - "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "math" "net" "sync" @@ -33,10 +32,12 @@ type Mux struct { id int32 closeChan chan struct{} IsClose bool + pingOk int + connType string sync.Mutex } -func NewMux(c net.Conn) *Mux { +func NewMux(c net.Conn, connType string) *Mux { m := &Mux{ conn: c, connMap: NewConnMap(), @@ -44,6 +45,7 @@ func NewMux(c net.Conn) *Mux { closeChan: make(chan struct{}), newConnCh: make(chan *conn), IsClose: false, + connType: connType, } //read session by flag go m.readSession() @@ -85,7 +87,11 @@ func (s *Mux) Accept() (net.Conn, error) { if s.IsClose { return nil, errors.New("accpet error,the conn has closed") } - return <-s.newConnCh, nil + conn := <-s.newConnCh + if conn == nil { + return nil, errors.New("accpet error,the conn has closed") + } + return conn, nil } func (s *Mux) Addr() net.Addr { @@ -118,11 +124,11 @@ func (s *Mux) ping() { if (math.MaxInt32 - s.id) < 10000 { s.id = 0 } - if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil { - logs.Error("ping error,close the connection") + if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") { s.Close() break } + s.pingOk++ } }() select { @@ -141,6 +147,7 @@ func (s *Mux) readSession() { if binary.Read(s.conn, binary.LittleEndian, &i) != nil { break } + s.pingOk = 0 switch flag { case MUX_NEW_CONN: //new conn conn := NewConn(i, s) @@ -187,7 +194,6 @@ func (s *Mux) readSession() { pool.PutBufPoolCopy(buf) } } else { - logs.Error("read or send error") break } } @@ -210,6 +216,7 @@ func (s *Mux) Close() error { select { case s.closeChan <- struct{}{}: } + close(s.newConnCh) return s.conn.Close() } diff --git a/lib/mux/pmux.go b/lib/mux/pmux.go index 498fd84..1609e26 100644 --- a/lib/mux/pmux.go +++ b/lib/mux/pmux.go @@ -88,7 +88,7 @@ func (pMux *PortMux) process(conn net.Conn) { var ch chan *PortConn var rs []byte var buffer bytes.Buffer - switch bytesToNum(buf) { + switch common.BytesToNum(buf) { case HTTP_CONNECT, HTTP_DELETE, HTTP_GET, HTTP_HEAD, HTTP_OPTIONS, HTTP_POST, HTTP_PUT, HTTP_TRACE: //http and manager buffer.Reset() r := bufio.NewReader(conn) @@ -161,12 +161,3 @@ func (pMux *PortMux) GetHttpsListener() net.Listener { func (pMux *PortMux) GetManagerListener() net.Listener { return NewPortListener(pMux.managerConn, pMux.Listener.Addr()) } - -func bytesToNum(b []byte) int { - var str string - for i := 0; i < len(b); i++ { - str += strconv.Itoa(int(b[i])) - } - x, _ := strconv.Atoi(str) - return int(x) -} diff --git a/server/connection/connection.go b/server/connection/connection.go index 79c144d..a0f223e 100644 --- a/server/connection/connection.go +++ b/server/connection/connection.go @@ -4,7 +4,6 @@ import ( "github.com/cnlh/nps/lib/mux" "github.com/cnlh/nps/vender/github.com/astaxie/beego" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" - "github.com/cnlh/nps/vender/github.com/xtaci/kcp" "net" "os" "strconv" @@ -32,7 +31,7 @@ func InitConnectionService() { } } -func GetBridgeListener(tp string) (interface{}, error) { +func GetBridgeListener(tp string) (net.Listener, error) { logs.Info("server start, the bridge type is %s, the bridge port is %s", tp, bridgePort) var p int var err error @@ -41,13 +40,6 @@ func GetBridgeListener(tp string) (interface{}, error) { } if pMux != nil { return pMux.GetClientListener(), nil - } else if tp == "udp" { - if p, err = beego.AppConfig.Int("bridge_port"); err != nil { - logs.Error(err) - os.Exit(0) - } else { - return kcp.ListenWithOptions(":"+strconv.Itoa(p), nil, 150, 3) - } } return net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP(beego.AppConfig.String("bridge_ip")), p, ""}) } diff --git a/server/proxy/base.go b/server/proxy/base.go index 6ec40ee..245ef9d 100644 --- a/server/proxy/base.go +++ b/server/proxy/base.go @@ -72,21 +72,21 @@ func (s *BaseServer) checkFlow() error { } //与客户端建立通道 -func (s *BaseServer) DealClient(c *conn.Conn, addr string, rb []byte, tp string) error { - link := conn.NewLink(tp, addr, s.task.Client.Cnf.Crypt, s.task.Client.Cnf.Compress, c.Conn.RemoteAddr().String()) +func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string) error { + link := conn.NewLink(tp, addr, client.Cnf.Crypt, client.Cnf.Compress, c.Conn.RemoteAddr().String()) - if target, err := s.bridge.SendLinkInfo(s.task.Client.Id, link, c.Conn.RemoteAddr().String(), s.task); err != nil { - logs.Warn("task id %d get connection from client id %d error %s", s.task.Id, s.task.Client.Id, err.Error()) + if target, err := s.bridge.SendLinkInfo(client.Id, link, c.Conn.RemoteAddr().String(), s.task); err != nil { + logs.Warn("task id %d get connection from client id %d error %s", s.task.Id, client.Id, err.Error()) c.Close() return err } else { if rb != nil { //HTTP proxy crypt or compress - conn.GetConn(target, link.Crypt, link.Compress, s.task.Client.Rate, true).Write(rb) + conn.GetConn(target, link.Crypt, link.Compress, client.Rate, true).Write(rb) } - conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, s.task.Client.Rate, s.task.Flow, true) + conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, s.task.Flow, true) } - s.task.Client.AddConn() + client.AddConn() return nil } diff --git a/server/proxy/http.go b/server/proxy/http.go index 07d851b..d7f5238 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -2,6 +2,7 @@ package proxy import ( "bufio" + "bytes" "crypto/tls" "github.com/cnlh/nps/bridge" "github.com/cnlh/nps/lib/common" @@ -14,6 +15,7 @@ import ( "net" "net/http" "net/http/httputil" + "net/url" "os" "path/filepath" "strconv" @@ -49,6 +51,49 @@ func NewHttp(bridge *bridge.Bridge, c *file.Tunnel) *httpServer { } } +func (s *httpServer) processHttps(c net.Conn) { + buf := make([]byte, 2<<10) + n, err := c.Read(buf) + if err != nil { + return + } + var host *file.Host + file.GetCsvDb().Lock() + for _, host = range file.GetCsvDb().Hosts { + if bytes.Index(buf[:n], []byte(host.Host)) >= 0 { + break + } + } + file.GetCsvDb().Unlock() + if host == nil { + logs.Error("new https connection can't be parsed!", c.RemoteAddr().String()) + c.Close() + return + } + var targetAddr string + r := new(http.Request) + r.RequestURI = "/" + r.URL = new(url.URL) + r.URL.Scheme = "https" + r.Host = host.Host + //read the host form connection + if !host.Client.GetConn() { //conn num limit + logs.Notice("connections exceed the current client %d limit %d ,now connection num %d", host.Client.Id, host.Client.MaxConn, host.Client.NowConn) + c.Close() + return + } + //流量限制 + if host.Client.Flow.FlowLimit > 0 && (host.Client.Flow.FlowLimit<<20) < (host.Client.Flow.ExportFlow+host.Client.Flow.InletFlow) { + logs.Warn("Traffic exceeded client id %s", host.Client.Id) + return + } + if targetAddr, err = host.GetRandomTarget(); err != nil { + logs.Warn(err.Error()) + } + logs.Trace("new https connection,clientId %d,host %s,remote address %s", host.Client.Id, r.Host, c.RemoteAddr().String()) + s.DealClient(conn.NewConn(c), host.Client, targetAddr, buf[:n], common.CONN_TCP) +} + func (s *httpServer) Start() error { var err error var httpSrv, httpsSrv *http.Server @@ -81,16 +126,26 @@ func (s *httpServer) Start() error { } httpsSrv = s.NewServer(s.httpsPort, "https") go func() { - logs.Info("Start https listener, port is", s.httpsPort) l, err := connection.GetHttpsListener() if err != nil { logs.Error(err) os.Exit(0) } - err = httpsSrv.ServeTLS(l, s.pemPath, s.keyPath) - if err != nil { - logs.Error(err) - os.Exit(0) + if b, err := beego.AppConfig.Bool("https_just_proxy"); err == nil && b { + for { + c, err := l.Accept() + if err != nil { + logs.Error(err) + break + } + go s.processHttps(c) + } + } else { + err = httpsSrv.ServeTLS(l, s.pemPath, s.keyPath) + if err != nil { + logs.Error(err) + os.Exit(0) + } } }() } diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index f27deef..d0fac85 100755 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -115,7 +115,7 @@ func ProcessTunnel(c *conn.Conn, s *TunnelModeServer) error { logs.Warn("tcp port %d ,client id %d,task id %d connect error %s", s.task.Port, s.task.Client.Id, s.task.Id, err.Error()) return err } - return s.DealClient(c, targetAddr, nil, common.CONN_TCP) + return s.DealClient(c, s.task.Client, targetAddr, nil, common.CONN_TCP) } //http代理模式 @@ -133,5 +133,5 @@ func ProcessHttp(c *conn.Conn, s *TunnelModeServer) error { if err := s.auth(r, c, s.task.Client.Cnf.U, s.task.Client.Cnf.P); err != nil { return err } - return s.DealClient(c, addr, rb, common.CONN_TCP) + return s.DealClient(c, s.task.Client, addr, rb, common.CONN_TCP) } diff --git a/server/server.go b/server/server.go index 6b9ec7f..5f6ae09 100644 --- a/server/server.go +++ b/server/server.go @@ -66,7 +66,7 @@ func DealBridgeTask() { logs.Info("Connections exceed the current client %d limit", t.Client.Id) s.Conn.Close() } else if t.Status { - go proxy.NewBaseServer(Bridge, t).DealClient(s.Conn, t.Target, nil, common.CONN_TCP) + go proxy.NewBaseServer(Bridge, t).DealClient(s.Conn, t.Client, t.Target, nil, common.CONN_TCP) } else { s.Conn.Close() logs.Trace("This key %s cannot be processed,status is close", s.Password)