Modular 、Functional enhancement

This commit is contained in:
刘河
2019-04-08 17:01:08 +08:00
parent 0c87b4119a
commit 824b12a2f8
41 changed files with 754 additions and 242 deletions

View File

@@ -8,7 +8,6 @@ import (
"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
"net"
"os"
"time"
)
@@ -43,106 +42,47 @@ retry:
time.Sleep(time.Second * 5)
goto retry
}
logs.Info("Successful connection with server %s", s.svrAddr)
//monitor the connection
go s.ping()
s.processor(c)
}
//处理
func (s *TRPClient) processor(c *conn.Conn) {
s.signal = c
go s.dealChan()
//start a channel connection
go s.newChan()
//start health check if the it's open
if s.cnf != nil && len(s.cnf.Healths) > 0 {
go heathCheck(s.cnf.Healths, s.signal)
}
//msg connection, eg udp
s.handleMain()
}
//handle main connection
func (s *TRPClient) handleMain() {
for {
flags, err := c.ReadFlag()
flags, err := s.signal.ReadFlag()
if err != nil {
logs.Error("Accept server data error %s, end this service", err.Error())
break
}
switch flags {
case common.VERIFY_EER:
logs.Error("VKey:%s is incorrect, the server refuses to connect, please check", s.vKey)
os.Exit(0)
case common.RES_CLOSE:
logs.Error("The authentication key is connected by another client or the server closes the client.")
os.Exit(0)
case common.RES_MSG:
logs.Error("Server-side return error")
break
case common.NEW_UDP_CONN:
//读取服务端地址、密钥 继续做处理
if lAddr, err := c.GetShortLenContent(); err != nil {
//read server udp addr and password
if lAddr, err := s.signal.GetShortLenContent(); err != nil {
logs.Warn(err)
return
} else if pwd, err := c.GetShortLenContent(); err == nil {
} else if pwd, err := s.signal.GetShortLenContent(); err == nil {
go s.newUdpConn(string(lAddr), string(pwd))
}
default:
logs.Warn("The error could not be resolved")
break
}
}
c.Close()
s.Close()
}
func (s *TRPClient) newUdpConn(rAddr string, md5Password string) {
tmpConn, err := common.GetLocalUdpAddr()
if err != nil {
logs.Error(err)
return
}
localAddr, _ := net.ResolveUDPAddr("udp", tmpConn.LocalAddr().String())
localConn, err := net.ListenUDP("udp", localAddr)
if err != nil {
logs.Error(err)
return
}
localKcpConn, err := kcp.NewConn(rAddr, nil, 150, 3, localConn)
if err != nil {
logs.Error(err)
return
}
conn.SetUdpSession(localKcpConn)
localToolConn := conn.NewConn(localKcpConn)
//写入密钥、provider身份
if _, err := localToolConn.Write([]byte(md5Password)); err != nil {
logs.Error(err)
return
}
if _, err := localToolConn.Write([]byte(common.WORK_P2P_PROVIDER)); err != nil {
logs.Error(err)
return
}
//接收服务端传的visitor地址
var b []byte
if b, err = localToolConn.GetShortLenContent(); err != nil {
logs.Error(err)
return
}
//向visitor地址发送测试消息
visitorAddr, err := net.ResolveUDPAddr("udp", string(b))
if err != nil {
logs.Error(err)
return
}
//向目标IP发送探测包
if _, err := localConn.WriteTo([]byte("test"), visitorAddr); err != nil {
logs.Error(err)
return
}
//给服务端发反馈
if _, err := localToolConn.Write([]byte(common.VERIFY_SUCCESS)); err != nil {
logs.Error(err)
return
}
//关闭与服务端的连接
localConn.Close()
//关闭与服务端udp conn建立新的监听
if localConn, err = net.ListenUDP("udp", localAddr); err != nil {
var localConn net.PacketConn
var err error
var remoteAddress string
if remoteAddress, localConn, err = handleP2PUdp(rAddr, md5Password, common.WORK_P2P_PROVIDER); err != nil {
logs.Error(err)
return
}
@@ -151,6 +91,7 @@ func (s *TRPClient) newUdpConn(rAddr string, md5Password string) {
logs.Error(err)
return
}
logs.Trace("start local p2p udp listen, local address", localConn.LocalAddr().String())
//接收新的监听得到conn
for {
udpTunnel, err := l.AcceptKCP()
@@ -159,23 +100,24 @@ func (s *TRPClient) newUdpConn(rAddr string, md5Password string) {
l.Close()
return
}
if udpTunnel.RemoteAddr().String() == string(b) {
if udpTunnel.RemoteAddr().String() == string(remoteAddress) {
conn.SetUdpSession(udpTunnel)
//读取link,设置msgCh 设置msgConn消息回传响应机制
logs.Trace("successful connection with client ,address %s", udpTunnel.RemoteAddr().String())
//read link info from remote
l := mux.NewMux(udpTunnel, s.bridgeConnType)
for {
connMux, err := l.Accept()
if err != nil {
continue
}
go s.srcProcess(connMux)
go s.handleChan(connMux)
}
}
}
}
//mux tunnel
func (s *TRPClient) dealChan() {
func (s *TRPClient) newChan() {
tunnel, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
if err != nil {
logs.Error("connect to ", s.svrAddr, "error:", err)
@@ -189,11 +131,11 @@ func (s *TRPClient) dealChan() {
s.Close()
break
}
go s.srcProcess(src)
go s.handleChan(src)
}
}
func (s *TRPClient) srcProcess(src net.Conn) {
func (s *TRPClient) handleChan(src net.Conn) {
lk, err := conn.NewConn(src).GetLinkInfo()
if err != nil {
src.Close()
@@ -218,9 +160,8 @@ loop:
for {
select {
case <-s.ticker.C:
if s.tunnel.IsClose {
if s.tunnel != nil && s.tunnel.IsClose {
s.Close()
s.ticker.Stop()
break loop
}
}