diff --git a/lib/conn/conn.go b/lib/conn/conn.go index 7758e2c..5918315 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -416,10 +416,13 @@ func SetUdpSession(sess *kcp.UDPSession) { } //conn1 mux conn -func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool) { +func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) { var in, out int64 var wg sync.WaitGroup connHandle := GetConn(conn1, crypt, snappy, rate, isServer) + if rb != nil { + connHandle.Write(rb) + } go func(in *int64) { wg.Add(1) *in, _ = common.CopyBuffer(connHandle, conn2) diff --git a/server/proxy/base.go b/server/proxy/base.go index fedbf33..08d5108 100644 --- a/server/proxy/base.go +++ b/server/proxy/base.go @@ -75,21 +75,17 @@ func (s *BaseServer) CheckFlowAndConnNum(client *file.Client) error { } //与客户端建立通道 -func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string, f func()) error { +func (s *BaseServer) DealClient(c *conn.Conn, client *file.Client, addr string, rb []byte, tp string, f func(), flow *file.Flow) error { link := conn.NewLink(tp, addr, client.Cnf.Crypt, client.Cnf.Compress, c.Conn.RemoteAddr().String()) if target, err := s.bridge.SendLinkInfo(client.Id, link, c.Conn.RemoteAddr().String(), s.task); err != nil { logs.Warn("task id %d get connection from client id %d error %s", s.task.Id, client.Id, err.Error()) c.Close() return err } else { - if rb != nil { - //HTTP proxy crypt or compress - conn.GetConn(target, link.Crypt, link.Compress, client.Rate, true).Write(rb) - } if f != nil { f() } - conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, s.task.Flow, true) + conn.CopyWaitGroup(target, c.Conn, link.Crypt, link.Compress, client.Rate, flow, true, rb) } client.AddConn() return nil diff --git a/server/proxy/http.go b/server/proxy/http.go index 24f5105..07a0dd3 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -52,7 +52,7 @@ func NewHttp(bridge *bridge.Bridge, c *file.Tunnel) *httpServer { } func (s *httpServer) processHttps(c net.Conn) { - buf := make([]byte, 2<<10) + buf := make([]byte, 2048) n, err := c.Read(buf) if err != nil { return @@ -93,7 +93,7 @@ func (s *httpServer) processHttps(c net.Conn) { logs.Warn(err.Error()) } logs.Trace("new https connection,clientId %d,host %s,remote address %s", host.Client.Id, r.Host, c.RemoteAddr().String()) - s.DealClient(conn.NewConn(c), host.Client, targetAddr, buf[:n], common.CONN_TCP, nil) + s.DealClient(conn.NewConn(c), host.Client, targetAddr, buf[:n], common.CONN_TCP, nil, host.Flow) } func (s *httpServer) Start() error { diff --git a/server/proxy/socks5.go b/server/proxy/socks5.go index 0942193..2ad634f 100755 --- a/server/proxy/socks5.go +++ b/server/proxy/socks5.go @@ -142,7 +142,7 @@ func (s *Sock5ModeServer) doConnect(c net.Conn, command uint8) { } s.DealClient(conn.NewConn(c), s.task.Client, addr, nil, ltype, func() { s.sendReply(c, succeeded) - }) + }, s.task.Flow) return } diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 12c998f..1df447a 100755 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -93,7 +93,7 @@ func ProcessTunnel(c *conn.Conn, s *TunnelModeServer) error { logs.Warn("tcp port %d ,client id %d,task id %d connect error %s", s.task.Port, s.task.Client.Id, s.task.Id, err.Error()) return err } - return s.DealClient(c, s.task.Client, targetAddr, nil, common.CONN_TCP, nil) + return s.DealClient(c, s.task.Client, targetAddr, nil, common.CONN_TCP, nil, s.task.Flow) } //http代理模式 @@ -111,5 +111,5 @@ func ProcessHttp(c *conn.Conn, s *TunnelModeServer) error { if err := s.auth(r, c, s.task.Client.Cnf.U, s.task.Client.Cnf.P); err != nil { return err } - return s.DealClient(c, s.task.Client, addr, rb, common.CONN_TCP, nil) + return s.DealClient(c, s.task.Client, addr, rb, common.CONN_TCP, nil, s.task.Flow) } diff --git a/server/server.go b/server/server.go index 981005c..40e5cc3 100644 --- a/server/server.go +++ b/server/server.go @@ -68,7 +68,7 @@ func DealBridgeTask() { logs.Info("Connections exceed the current client %d limit", t.Client.Id) s.Conn.Close() } else if t.Status { - go proxy.NewBaseServer(Bridge, t).DealClient(s.Conn, t.Client, t.Target, nil, common.CONN_TCP, nil) + go proxy.NewBaseServer(Bridge, t).DealClient(s.Conn, t.Client, t.Target, nil, common.CONN_TCP, nil, t.Flow) } else { s.Conn.Close() logs.Trace("This key %s cannot be processed,status is close", s.Password)