mirror of
https://github.com/ehang-io/nps.git
synced 2025-09-02 20:16:52 +00:00
结构调整、kcp支持
This commit is contained in:
172
bridge/bridge.go
172
bridge/bridge.go
@@ -2,71 +2,103 @@ package bridge
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/cnlh/nps/lib"
|
||||
"github.com/cnlh/nps/lib/conn"
|
||||
"github.com/cnlh/nps/lib/file"
|
||||
"github.com/cnlh/nps/lib/kcp"
|
||||
"github.com/cnlh/nps/lib/lg"
|
||||
"github.com/cnlh/nps/lib/pool"
|
||||
"github.com/cnlh/nps/lib/common"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
tunnel *lib.Conn
|
||||
signal *lib.Conn
|
||||
linkMap map[int]*lib.Link
|
||||
tunnel *conn.Conn
|
||||
signal *conn.Conn
|
||||
linkMap map[int]*conn.Link
|
||||
linkStatusMap map[int]bool
|
||||
stop chan bool
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
type Bridge struct {
|
||||
TunnelPort int //通信隧道端口
|
||||
listener *net.TCPListener //server端监听
|
||||
Client map[int]*Client
|
||||
RunList map[int]interface{} //运行中的任务
|
||||
lock sync.Mutex
|
||||
tunnelLock sync.Mutex
|
||||
clientLock sync.Mutex
|
||||
func NewClient(t *conn.Conn, s *conn.Conn) *Client {
|
||||
return &Client{
|
||||
linkMap: make(map[int]*conn.Link),
|
||||
stop: make(chan bool),
|
||||
linkStatusMap: make(map[int]bool),
|
||||
signal: s,
|
||||
tunnel: t,
|
||||
}
|
||||
}
|
||||
|
||||
func NewTunnel(tunnelPort int, runList map[int]interface{}) *Bridge {
|
||||
type Bridge struct {
|
||||
TunnelPort int //通信隧道端口
|
||||
tcpListener *net.TCPListener //server端监听
|
||||
kcpListener *kcp.Listener //server端监听
|
||||
Client map[int]*Client
|
||||
RunList map[int]interface{} //运行中的任务
|
||||
tunnelType string //bridge type kcp or tcp
|
||||
lock sync.Mutex
|
||||
tunnelLock sync.Mutex
|
||||
clientLock sync.RWMutex
|
||||
}
|
||||
|
||||
func NewTunnel(tunnelPort int, runList map[int]interface{}, tunnelType string) *Bridge {
|
||||
t := new(Bridge)
|
||||
t.TunnelPort = tunnelPort
|
||||
t.Client = make(map[int]*Client)
|
||||
t.RunList = runList
|
||||
t.tunnelType = tunnelType
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Bridge) StartTunnel() error {
|
||||
var err error
|
||||
s.listener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
|
||||
if err != nil {
|
||||
return err
|
||||
if s.tunnelType == "kcp" {
|
||||
s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
c, err := s.kcpListener.AcceptKCP()
|
||||
conn.SetUdpSession(c)
|
||||
if err != nil {
|
||||
lg.Println(err)
|
||||
continue
|
||||
}
|
||||
go s.cliProcess(conn.NewConn(c))
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
s.tcpListener, err = net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("0.0.0.0"), s.TunnelPort, ""})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
c, err := s.tcpListener.Accept()
|
||||
if err != nil {
|
||||
lg.Println(err)
|
||||
continue
|
||||
}
|
||||
go s.cliProcess(conn.NewConn(c))
|
||||
}
|
||||
}()
|
||||
}
|
||||
go s.tunnelProcess()
|
||||
return nil
|
||||
}
|
||||
|
||||
//tcp server
|
||||
func (s *Bridge) tunnelProcess() error {
|
||||
var err error
|
||||
for {
|
||||
conn, err := s.listener.Accept()
|
||||
if err != nil {
|
||||
lib.Println(err)
|
||||
continue
|
||||
}
|
||||
go s.cliProcess(lib.NewConn(conn))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
//验证失败,返回错误验证flag,并且关闭连接
|
||||
func (s *Bridge) verifyError(c *lib.Conn) {
|
||||
c.Write([]byte(lib.VERIFY_EER))
|
||||
func (s *Bridge) verifyError(c *conn.Conn) {
|
||||
c.Write([]byte(common.VERIFY_EER))
|
||||
c.Conn.Close()
|
||||
}
|
||||
|
||||
func (s *Bridge) cliProcess(c *lib.Conn) {
|
||||
c.SetReadDeadline(5)
|
||||
func (s *Bridge) cliProcess(c *conn.Conn) {
|
||||
c.SetReadDeadline(5, s.tunnelType)
|
||||
var buf []byte
|
||||
var err error
|
||||
if buf, err = c.ReadLen(32); err != nil {
|
||||
@@ -74,9 +106,9 @@ func (s *Bridge) cliProcess(c *lib.Conn) {
|
||||
return
|
||||
}
|
||||
//验证
|
||||
id, err := lib.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
|
||||
id, err := file.GetCsvDb().GetIdByVerifyKey(string(buf), c.Conn.RemoteAddr().String())
|
||||
if err != nil {
|
||||
lib.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
|
||||
lg.Println("当前客户端连接校验错误,关闭此客户端:", c.Conn.RemoteAddr())
|
||||
s.verifyError(c)
|
||||
return
|
||||
}
|
||||
@@ -97,40 +129,39 @@ func (s *Bridge) closeClient(id int) {
|
||||
}
|
||||
|
||||
//tcp连接类型区分
|
||||
func (s *Bridge) typeDeal(typeVal string, c *lib.Conn, id int) {
|
||||
func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) {
|
||||
switch typeVal {
|
||||
case lib.WORK_MAIN:
|
||||
case common.WORK_MAIN:
|
||||
//客户端已经存在,下线
|
||||
s.clientLock.Lock()
|
||||
if _, ok := s.Client[id]; ok {
|
||||
s.clientLock.Unlock()
|
||||
s.closeClient(id)
|
||||
} else {
|
||||
s.clientLock.Unlock()
|
||||
}
|
||||
s.clientLock.Lock()
|
||||
|
||||
s.Client[id] = &Client{
|
||||
linkMap: make(map[int]*lib.Link),
|
||||
stop: make(chan bool),
|
||||
linkStatusMap: make(map[int]bool),
|
||||
}
|
||||
lib.Printf("客户端%d连接成功,地址为:%s", id, c.Conn.RemoteAddr())
|
||||
s.Client[id].signal = c
|
||||
s.clientLock.Unlock()
|
||||
go s.GetStatus(id)
|
||||
case lib.WORK_CHAN:
|
||||
s.clientLock.Lock()
|
||||
if v, ok := s.Client[id]; ok {
|
||||
s.clientLock.Unlock()
|
||||
v.tunnel = c
|
||||
if v.signal != nil {
|
||||
v.signal.WriteClose()
|
||||
}
|
||||
v.Lock()
|
||||
v.signal = c
|
||||
v.Unlock()
|
||||
} else {
|
||||
s.Client[id] = NewClient(nil, c)
|
||||
s.clientLock.Unlock()
|
||||
}
|
||||
lg.Printf("客户端%d连接成功,地址为:%s", id, c.Conn.RemoteAddr())
|
||||
go s.GetStatus(id)
|
||||
case common.WORK_CHAN:
|
||||
s.clientLock.Lock()
|
||||
if v, ok := s.Client[id]; ok {
|
||||
s.clientLock.Unlock()
|
||||
v.Lock()
|
||||
v.tunnel = c
|
||||
v.Unlock()
|
||||
} else {
|
||||
s.Client[id] = NewClient(c, nil)
|
||||
s.clientLock.Unlock()
|
||||
return
|
||||
}
|
||||
go s.clientCopy(id)
|
||||
}
|
||||
c.SetAlive()
|
||||
c.SetAlive(s.tunnelType)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -161,13 +192,13 @@ func (s *Bridge) waitStatus(clientId, id int) (bool) {
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *Bridge) SendLinkInfo(clientId int, link *lib.Link) (tunnel *lib.Conn, err error) {
|
||||
func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link) (tunnel *conn.Conn, err error) {
|
||||
s.clientLock.Lock()
|
||||
if v, ok := s.Client[clientId]; ok {
|
||||
s.clientLock.Unlock()
|
||||
v.signal.SendLinkInfo(link)
|
||||
if err != nil {
|
||||
lib.Println("send error:", err, link.Id)
|
||||
lg.Println("send error:", err, link.Id)
|
||||
s.DelClient(clientId)
|
||||
return
|
||||
}
|
||||
@@ -192,7 +223,7 @@ func (s *Bridge) SendLinkInfo(clientId int, link *lib.Link) (tunnel *lib.Conn, e
|
||||
}
|
||||
|
||||
//得到一个tcp隧道
|
||||
func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *lib.Conn, err error) {
|
||||
func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *conn.Conn, err error) {
|
||||
s.clientLock.Lock()
|
||||
defer s.clientLock.Unlock()
|
||||
if v, ok := s.Client[id]; !ok {
|
||||
@@ -204,7 +235,7 @@ func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *lib.Conn,
|
||||
}
|
||||
|
||||
//得到一个通信通道
|
||||
func (s *Bridge) GetSignal(id int) (conn *lib.Conn, err error) {
|
||||
func (s *Bridge) GetSignal(id int) (conn *conn.Conn, err error) {
|
||||
s.clientLock.Lock()
|
||||
defer s.clientLock.Unlock()
|
||||
if v, ok := s.Client[id]; !ok {
|
||||
@@ -257,19 +288,19 @@ func (s *Bridge) clientCopy(clientId int) {
|
||||
for {
|
||||
if id, err := client.tunnel.GetLen(); err != nil {
|
||||
s.closeClient(clientId)
|
||||
lib.Println("读取msg id 错误", err, id)
|
||||
lg.Println("读取msg id 错误", err, id)
|
||||
break
|
||||
} else {
|
||||
client.Lock()
|
||||
if link, ok := client.linkMap[id]; ok {
|
||||
client.Unlock()
|
||||
if content, err := client.tunnel.GetMsgContent(link); err != nil {
|
||||
lib.PutBufPoolCopy(content)
|
||||
pool.PutBufPoolCopy(content)
|
||||
s.closeClient(clientId)
|
||||
lib.Println("read msg content error", err, "close client")
|
||||
lg.Println("read msg content error", err, "close client")
|
||||
break
|
||||
} else {
|
||||
if len(content) == len(lib.IO_EOF) && string(content) == lib.IO_EOF {
|
||||
if len(content) == len(common.IO_EOF) && string(content) == common.IO_EOF {
|
||||
if link.Conn != nil {
|
||||
link.Conn.Close()
|
||||
}
|
||||
@@ -281,7 +312,7 @@ func (s *Bridge) clientCopy(clientId int) {
|
||||
}
|
||||
link.Flow.Add(0, len(content))
|
||||
}
|
||||
lib.PutBufPoolCopy(content)
|
||||
pool.PutBufPoolCopy(content)
|
||||
}
|
||||
} else {
|
||||
client.Unlock()
|
||||
@@ -289,5 +320,4 @@ func (s *Bridge) clientCopy(clientId int) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user