diff --git a/bridge/bridge.go b/bridge/bridge.go index b3d5bc0..dde208b 100755 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -324,7 +324,7 @@ func (s *Bridge) register(c *conn.Conn) { var hour int32 if err := binary.Read(c, binary.LittleEndian, &hour); err == nil { s.registerLock.Lock() - s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Minute * time.Duration(hour)) + s.Register[common.GetIpByAddr(c.Conn.RemoteAddr().String())] = time.Now().Add(time.Hour * time.Duration(hour)) s.registerLock.Unlock() } } diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 6fd81b8..80e7518 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -26,6 +26,7 @@ type conn struct { connId int32 isClose bool readWait bool + hasWrite int mux *Mux } @@ -83,9 +84,7 @@ func (s *conn) Read(buf []byte) (n int, err error) { } else { n = copy(buf, s.readBuffer[s.startRead:s.endRead]) s.startRead += n - if s.waitQueue.Size() < s.mux.waitQueueSize/2 { - s.mux.sendInfo(MUX_MSG_SEND_OK, s.connId, nil) - } + s.mux.sendInfo(MUX_MSG_SEND_OK, s.connId, nil) } return } @@ -116,9 +115,10 @@ func (s *conn) write(buf []byte, ch chan struct{}) { start := 0 l := len(buf) for { - if s.stopWrite { + if s.hasWrite > 10 { <-s.getStatusCh } + s.hasWrite++ if l-start > pool.PoolSizeCopy { s.mux.sendInfo(MUX_NEW_MSG, s.connId, buf[start:start+pool.PoolSizeCopy]) start += pool.PoolSizeCopy diff --git a/lib/mux/mux.go b/lib/mux/mux.go index cfbac6d..7be000c 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "github.com/cnlh/nps/lib/pool" + "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "math" "net" "sync" @@ -22,32 +23,27 @@ const ( MUX_PING MUX_CONN_CLOSE MUX_PING_RETURN - MUX_STOP_WRITE - RETRY_TIME = 2 //Heart beat allowed fault tolerance times ) type Mux struct { net.Listener - conn net.Conn - connMap *connMap - newConnCh chan *conn - id int32 - closeChan chan struct{} - IsClose bool - pingOk int - waitQueueSize int + conn net.Conn + connMap *connMap + newConnCh chan *conn + id int32 + closeChan chan struct{} + IsClose bool sync.Mutex } func NewMux(c net.Conn) *Mux { m := &Mux{ - conn: c, - connMap: NewConnMap(), - id: 0, - closeChan: make(chan struct{}), - newConnCh: make(chan *conn), - IsClose: false, - waitQueueSize: 10, //TODO :In order to be more efficient, this value can be dynamically generated according to the delay algorithm. + conn: c, + connMap: NewConnMap(), + id: 0, + closeChan: make(chan struct{}), + newConnCh: make(chan *conn), + IsClose: false, } //read session by flag go m.readSession() @@ -104,7 +100,7 @@ func (s *Mux) sendInfo(flag int32, id int32, content []byte) error { binary.Write(raw, binary.LittleEndian, int32(len(content))) binary.Write(raw, binary.LittleEndian, content) } - if _, err := s.conn.Write(raw.Bytes()); err != nil || s.pingOk > RETRY_TIME { + if _, err := s.conn.Write(raw.Bytes()); err != nil { s.Close() return err } @@ -113,7 +109,7 @@ func (s *Mux) sendInfo(flag int32, id int32, content []byte) error { func (s *Mux) ping() { go func() { - ticker := time.NewTicker(time.Second * 5) + ticker := time.NewTicker(time.Second * 1) for { select { case <-ticker.C: @@ -122,10 +118,11 @@ func (s *Mux) ping() { if (math.MaxInt32 - s.id) < 10000 { s.id = 0 } - if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || s.pingOk > RETRY_TIME { + if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil { + logs.Error("ping error,close the connection") + s.Close() break } - s.pingOk += 1 } }() select { @@ -155,7 +152,6 @@ func (s *Mux) readSession() { s.sendInfo(MUX_PING_RETURN, MUX_PING, nil) continue case MUX_PING_RETURN: - s.pingOk -= 1 continue case MUX_NEW_MSG: buf = pool.GetBufPoolCopy() @@ -173,19 +169,12 @@ func (s *Mux) readSession() { conn.readWait = false conn.readCh <- struct{}{} } - if conn.waitQueue.Size() > s.waitQueueSize { - s.sendInfo(MUX_STOP_WRITE, conn.connId, nil) - } - case MUX_STOP_WRITE: - conn.stopWrite = true case MUX_MSG_SEND_OK: //the remote has read - if conn.stopWrite { - conn.stopWrite = false - select { - case conn.getStatusCh <- struct{}{}: - default: - } + select { + case conn.getStatusCh <- struct{}{}: + default: } + conn.hasWrite -- case MUX_NEW_CONN_OK: //conn ok conn.connStatusOkCh <- struct{}{} case MUX_NEW_CONN_Fail: @@ -198,6 +187,7 @@ func (s *Mux) readSession() { pool.PutBufPoolCopy(buf) } } else { + logs.Error("read or send error") break } } @@ -214,9 +204,12 @@ func (s *Mux) Close() error { } s.IsClose = true s.connMap.Close() - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} - s.closeChan <- struct{}{} + select { + case s.closeChan <- struct{}{}: + } + select { + case s.closeChan <- struct{}{}: + } return s.conn.Close() } diff --git a/server/proxy/http.go b/server/proxy/http.go index 6c78283..07d851b 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -211,14 +211,18 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { } //根据设定,修改header和host common.ChangeHostAndHeader(r, host.HostChange, host.HeaderChange, c.Conn.RemoteAddr().String()) - b, err := httputil.DumpRequest(r, true) + b, err := httputil.DumpRequest(r, false) if err != nil { break } - host.Flow.Add(int64(len(b)), 0) logs.Trace("%s request, method %s, host %s, url %s, remote address %s, target %s", r.URL.Scheme, r.Method, r.Host, r.RequestURI, r.RemoteAddr, lk.Host) //write connClient.Write(b) + if bodyLen, err := common.CopyBuffer(connClient, r.Body); err != nil { + break + } else { + host.Flow.Add(int64(len(b))+bodyLen, 0) + } } end: if isConn { diff --git a/server/server.go b/server/server.go index f6038d9..6b9ec7f 100644 --- a/server/server.go +++ b/server/server.go @@ -91,6 +91,7 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) { go proxy.NewP2PServer(p).Start() } go DealBridgeTask() + go dealClientFlow() if svr := NewMode(Bridge, cnf); svr != nil { if err := svr.Start(); err != nil { logs.Error(err) @@ -101,6 +102,16 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) { } } +func dealClientFlow() { + ticker := time.NewTicker(time.Minute) + for { + select { + case <-ticker.C: + dealClientData(file.GetCsvDb().Clients) + } + } +} + //new a server by mode name func NewMode(Bridge *bridge.Bridge, c *file.Tunnel) proxy.Service { var service proxy.Service