https 、客户端与服务端连接优化

This commit is contained in:
刘河
2019-02-01 02:06:30 +08:00
parent 717028e5f1
commit eccc221e67
32 changed files with 1106 additions and 1140 deletions

View File

@@ -5,47 +5,33 @@ import (
"github.com/cnlh/easyProxy/utils"
"log"
"net"
"strconv"
"sync"
"time"
)
type list struct {
connList chan *utils.Conn
}
func (l *list) Add(c *utils.Conn) {
l.connList <- c
}
func (l *list) Pop() *utils.Conn {
return <-l.connList
}
func (l *list) Len() int {
return len(l.connList)
}
func newList() *list {
l := new(list)
l.connList = make(chan *utils.Conn, 1000)
return l
type Client struct {
tunnel *utils.Conn
signal *utils.Conn
linkMap map[int]*utils.Link
linkStatusMap map[int]bool
stop chan bool
sync.RWMutex
}
type Bridge struct {
TunnelPort int //通信隧道端口
listener *net.TCPListener //server端监听
SignalList map[int]*list //通信
TunnelList map[int]*list //隧道
TunnelPort int //通信隧道端口
listener *net.TCPListener //server端监听
Client map[int]*Client
RunList map[int]interface{} //运行中的任务
lock sync.Mutex
tunnelLock sync.Mutex
clientLock sync.Mutex
}
func NewTunnel(tunnelPort int, runList map[int]interface{}) *Bridge {
t := new(Bridge)
t.TunnelPort = tunnelPort
t.SignalList = make(map[int]*list)
t.TunnelList = make(map[int]*list)
t.Client = make(map[int]*Client)
t.RunList = runList
return t
}
@@ -103,11 +89,12 @@ func (s *Bridge) cliProcess(c *utils.Conn) {
}
func (s *Bridge) closeClient(id int) {
if len(s.SignalList) > 0 {
s.SignalList[id].Pop().WriteClose()
s.clientLock.Lock()
defer s.clientLock.Unlock()
if v, ok := s.Client[id]; ok {
v.signal.WriteClose()
delete(s.Client, id)
}
s.DelClientSignal(id)
s.DelClientTunnel(id)
}
//tcp连接类型区分
@@ -115,113 +102,123 @@ func (s *Bridge) typeDeal(typeVal string, c *utils.Conn, id int) {
switch typeVal {
case utils.WORK_MAIN:
//客户端已经存在,下线
if _, ok := s.SignalList[id]; ok {
s.clientLock.Lock()
if _, ok := s.Client[id]; ok {
s.clientLock.Unlock()
s.closeClient(id)
} else {
s.clientLock.Unlock()
}
log.Println("客户端连接成功", c.Conn.RemoteAddr())
s.addList(s.SignalList, c, id)
s.clientLock.Lock()
s.Client[id] = &Client{
linkMap: make(map[int]*utils.Link),
stop: make(chan bool),
linkStatusMap: make(map[int]bool),
}
log.Printf("客户端%d连接成功,地址为:%s", id, c.Conn.RemoteAddr())
s.Client[id].signal = c
s.clientLock.Unlock()
go s.GetStatus(id)
case utils.WORK_CHAN:
s.addList(s.TunnelList, c, id)
s.clientLock.Lock()
if v, ok := s.Client[id]; ok {
s.clientLock.Unlock()
v.tunnel = c
} else {
s.clientLock.Unlock()
return
}
go s.clientCopy(id)
}
c.SetAlive()
return
}
//加到对应的list中
func (s *Bridge) addList(m map[int]*list, c *utils.Conn, id int) {
s.lock.Lock()
if v, ok := m[id]; ok {
v.Add(c)
} else {
l := newList()
l.Add(c)
m[id] = l
//等待
func (s *Bridge) waitStatus(clientId, id int) (bool) {
ticker := time.NewTicker(time.Millisecond * 100)
stop := time.After(time.Second * 10)
for {
select {
case <-ticker.C:
s.clientLock.Lock()
if v, ok := s.Client[clientId]; ok {
s.clientLock.Unlock()
v.Lock()
if vv, ok := v.linkStatusMap[id]; ok {
ticker.Stop()
v.Unlock()
return vv
}
v.Unlock()
} else {
s.clientLock.Unlock()
}
case <-stop:
return false
}
}
s.lock.Unlock()
return false
}
func (s *Bridge) SendLinkInfo(clientId int, link *utils.Link) (tunnel *utils.Conn, err error) {
s.clientLock.Lock()
if v, ok := s.Client[clientId]; ok {
s.clientLock.Unlock()
v.signal.SendLinkInfo(link)
if err != nil {
log.Println("send error:", err, link.Id)
s.DelClient(clientId)
return
}
if v.tunnel == nil {
err = errors.New("tunnel获取错误")
return
} else {
tunnel = v.tunnel
}
v.Lock()
v.linkMap[link.Id] = link
v.Unlock()
if !s.waitStatus(clientId, link.Id) {
err = errors.New("连接失败")
return
}
} else {
s.clientLock.Unlock()
err = errors.New("客户端未连接")
}
return
}
//得到一个tcp隧道
func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (c *utils.Conn, err error) {
retry:
if c, err = s.waitAndPop(s.TunnelList, id); err != nil {
return
func (s *Bridge) GetTunnel(id int, en, de int, crypt, mux bool) (conn *utils.Conn, err error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if v, ok := s.Client[id]; !ok {
err = errors.New("客户端未连接")
} else {
conn = v.tunnel
}
if _, err = c.WriteTest(); err != nil {
c.Close()
goto retry
}
c.WriteConnInfo(en, de, crypt, mux)
return
}
//得到一个通信通道
func (s *Bridge) GetSignal(id int) (err error, conn *utils.Conn) {
if v, ok := s.SignalList[id]; !ok || v.Len() == 0 {
func (s *Bridge) GetSignal(id int) (conn *utils.Conn, err error) {
s.clientLock.Lock()
defer s.clientLock.Unlock()
if v, ok := s.Client[id]; !ok {
err = errors.New("客户端未连接")
return
} else {
conn = v.signal
}
conn = s.SignalList[id].Pop()
return
}
//重回slice 复用
func (s *Bridge) ReturnSignal(conn *utils.Conn, id int) {
if v, ok := s.SignalList[id]; ok {
v.Add(conn)
}
}
//重回slice 复用
func (s *Bridge) ReturnTunnel(conn *utils.Conn, id int) {
if v, ok := s.TunnelList[id]; ok {
utils.FlushConn(conn.Conn)
v.Add(conn)
}
}
//删除通信通道
func (s *Bridge) DelClientSignal(id int) {
s.delClient(id, s.SignalList)
}
//删除隧道
func (s *Bridge) DelClientTunnel(id int) {
s.delClient(id, s.TunnelList)
}
func (s *Bridge) delClient(id int, l map[int]*list) {
if t := l[id]; t != nil {
for {
if t.Len() <= 0 {
break
}
t.Pop().Close()
}
delete(l, id)
}
}
//等待
func (s *Bridge) waitAndPop(m map[int]*list, id int) (c *utils.Conn, err error) {
ticker := time.NewTicker(time.Millisecond * 100)
stop := time.After(time.Second * 3)
for {
select {
case <-ticker.C:
s.lock.Lock()
if v, ok := m[id]; ok && v.Len() > 0 {
c = v.Pop()
ticker.Stop()
s.lock.Unlock()
return
}
s.lock.Unlock()
case <-stop:
err = errors.New("client id: " + strconv.Itoa(id) + ",err: get client conn timeout")
return
}
}
return
func (s *Bridge) DelClient(id int) {
s.closeClient(id)
}
func (s *Bridge) verify(id int) bool {
@@ -232,3 +229,66 @@ func (s *Bridge) verify(id int) bool {
}
return false
}
func (s *Bridge) GetStatus(clientId int) {
s.clientLock.Lock()
client := s.Client[clientId]
s.clientLock.Unlock()
if client == nil {
return
}
for {
if id, status, err := client.signal.GetConnStatus(); err != nil {
s.closeClient(clientId)
return
} else {
client.Lock()
client.linkStatusMap[id] = status
client.Unlock()
}
}
}
func (s *Bridge) clientCopy(clientId int) {
s.clientLock.Lock()
client := s.Client[clientId]
s.clientLock.Unlock()
for {
if id, err := client.tunnel.GetLen(); err != nil {
s.closeClient(clientId)
log.Println("读取msg id 错误", err, id)
break
} else {
client.Lock()
if link, ok := client.linkMap[id]; ok {
client.Unlock()
if content, err := client.tunnel.GetMsgContent(link); err != nil {
utils.PutBufPoolCopy(content)
s.closeClient(clientId)
log.Println("read msg content error", err, "close client")
break
} else {
if len(content) == len(utils.IO_EOF) && string(content) == utils.IO_EOF {
if link.Conn != nil {
link.Conn.Close()
}
} else {
if link.UdpListener != nil && link.UdpRemoteAddr != nil {
link.UdpListener.WriteToUDP(content, link.UdpRemoteAddr)
} else {
link.Conn.Write(content)
}
link.Flow.Add(0, len(content))
}
utils.PutBufPoolCopy(content)
}
} else {
client.Unlock()
continue
}
}
}
}