multiple changes

This commit is contained in:
ffdfgdfg
2019-10-12 22:56:37 +08:00
parent 4c8d7b0738
commit 1f8e441090
14 changed files with 176 additions and 33 deletions

View File

@@ -49,4 +49,5 @@ const (
MUX_PING_RETURN
MUX_PING int32 = -1
MAXIMUM_SEGMENT_SIZE = PoolSizeWindow
MAXIMUM_WINDOW_SIZE = 1<<31 - 1
)

View File

@@ -158,8 +158,10 @@ func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (e
Self.Flag = flag
Self.Id = id
switch flag {
case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART:
Self.Content = WindowBuff.Get()
err = Self.BasePackager.NewPac(content...)
//logs.Warn(Self.Length, string(Self.Content))
case MUX_MSG_SEND_OK:
// MUX_MSG_SEND_OK contains two data
switch content[0].(type) {
@@ -190,6 +192,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
switch Self.Flag {
case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
err = Self.BasePackager.Pack(writer)
WindowBuff.Put(Self.Content)
case MUX_MSG_SEND_OK:
err = binary.Write(writer, binary.LittleEndian, Self.Window)
if err != nil {
@@ -201,7 +204,6 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) {
}
func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
Self.BasePackager.clean() // also clean the content
err = binary.Read(reader, binary.LittleEndian, &Self.Flag)
if err != nil {
return
@@ -212,7 +214,10 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) {
}
switch Self.Flag {
case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN:
Self.Content = WindowBuff.Get() // need get a window buf from pool
Self.BasePackager.clean() // also clean the content
err = Self.BasePackager.UnPack(reader)
//logs.Warn("unpack", Self.Length, string(Self.Content))
case MUX_MSG_SEND_OK:
err = binary.Read(reader, binary.LittleEndian, &Self.Window)
if err != nil {

View File

@@ -104,11 +104,7 @@ func (Self *windowBufferPool) Get() (buf []byte) {
}
func (Self *windowBufferPool) Put(x []byte) {
if len(x) == PoolSizeWindow {
Self.pool.Put(x[:PoolSizeWindow]) // make buf to full
} else {
x = nil
}
Self.pool.Put(x[:PoolSizeWindow]) // make buf to full
}
type bufferPool struct {
@@ -146,13 +142,10 @@ func (Self *muxPackagerPool) New() {
}
func (Self *muxPackagerPool) Get() *MuxPackager {
pack := Self.pool.Get().(*MuxPackager)
pack.Content = WindowBuff.Get()
return pack
return Self.pool.Get().(*MuxPackager)
}
func (Self *muxPackagerPool) Put(pack *MuxPackager) {
WindowBuff.Put(pack.Content)
Self.pool.Put(pack)
}

View File

@@ -268,6 +268,9 @@ func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
defer CopyBuff.Put(buf)
for {
nr, er := src.Read(buf)
//if len(pr)>0 && pr[0] && nr > 50 {
// logs.Warn(string(buf[:50]))
//}
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {

View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
@@ -13,6 +14,23 @@ import (
)
func InstallNps() {
unit := `[Unit]
Description=nps - convenient proxy server
Documentation=https://github.com/cnlh/nps/
After=network-online.target remote-fs.target nss-lookup.target
Wants=network-online.target`
service := `[Service]
Type=simple
KillMode=process
Restart=always
RestartSec=15s
StandardOutput=append:/var/log/nps/nps.log
ExecStartPre=/bin/echo 'Starting nps'
ExecStopPost=/bin/echo 'Stopping nps'
ExecStart=`
install := `[Install]
WantedBy=multi-user.target`
path := common.GetInstallPath()
if common.FileExists(path) {
log.Fatalf("the path %s has exist, does not support install", path)
@@ -35,21 +53,35 @@ func InstallNps() {
log.Fatalln(err)
} else {
os.Chmod("/usr/local/bin/nps", 0755)
service += "/usr/local/bin/nps"
log.Println("Executable files have been copied to", "/usr/local/bin/nps")
}
} else {
os.Chmod("/usr/bin/nps", 0755)
service += "/usr/bin/nps"
log.Println("Executable files have been copied to", "/usr/bin/nps")
}
systemd := unit + "\n\n" + service + "\n\n" + install
_ = os.Remove("/usr/lib/systemd/system/nps.service")
err := ioutil.WriteFile("/usr/lib/systemd/system/nps.service", []byte(systemd), 0644)
if err != nil {
log.Println("Write systemd service err ", err)
}
_ = os.Mkdir("/var/log/nps", 644)
}
log.Println("install ok!")
log.Println("Static files and configuration files in the current directory will be useless")
log.Println("The new configuration file is located in", path, "you can edit them")
if !common.IsWindows() {
log.Println("You can start with nps test|start|stop|restart|status anywhere")
log.Println(`You can start with:
sudo systemctl enable|disable|start|stop|restart|status nps
or:
nps test|start|stop|restart|status
anywhere!`)
} else {
log.Println("You can copy executable files to any directory and start working with nps.exe test|start|stop|restart|status")
log.Println(`You can copy executable files to any directory and start working with:
nps.exe test|start|stop|restart|status
now!`)
}
}
func MkidrDirAll(path string, v ...string) {

View File

@@ -183,6 +183,10 @@ func (Self *ReceiveWindow) CalcSize() {
n = Self.bufQueue.Len()
}
// set the minimal size
if n > common.MAXIMUM_WINDOW_SIZE {
n = common.MAXIMUM_WINDOW_SIZE
}
// set the maximum size
//logs.Warn("n", n)
Self.maxSize = n
Self.count = -5
@@ -248,6 +252,10 @@ copyData:
l = 0
Self.bw.EndRead()
Self.sendStatus(id)
if Self.off == uint32(Self.element.l) {
//logs.Warn("put the element end ", string(Self.element.buf[:15]))
common.WindowBuff.Put(Self.element.buf)
}
if pOff < len(p) && Self.element.part {
// element is a part of the segments, trying to fill up buf p
goto copyData

View File

@@ -25,6 +25,7 @@ type Mux struct {
pingOk int
latency float64
pingCh chan []byte
pingTimer *time.Timer
connType string
writeQueue PriorityQueue
bufCh chan *bytes.Buffer
@@ -42,6 +43,7 @@ func NewMux(c net.Conn, connType string) *Mux {
connType: connType,
bufCh: make(chan *bytes.Buffer),
pingCh: make(chan []byte),
pingTimer: time.NewTimer(15 * time.Second),
}
m.writeQueue.New()
//read session by flag
@@ -119,6 +121,7 @@ func (s *Mux) packBuf() {
common.BuffPool.Put(buffer)
break
}
//logs.Warn(buffer.String())
select {
case s.bufCh <- buffer:
case <-s.closeChan:
@@ -153,7 +156,7 @@ func (s *Mux) ping() {
now, _ := time.Now().UTC().MarshalText()
s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
// send the ping flag and get the latency first
ticker := time.NewTicker(time.Second * 15)
ticker := time.NewTicker(time.Second * 5)
for {
if s.IsClose {
ticker.Stop()
@@ -168,6 +171,10 @@ func (s *Mux) ping() {
}
now, _ := time.Now().UTC().MarshalText()
s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now)
if !s.pingTimer.Stop() {
<-s.pingTimer.C
}
s.pingTimer.Reset(15 * time.Second)
if s.pingOk > 10 && s.connType == "kcp" {
s.Close()
break
@@ -186,10 +193,15 @@ func (s *Mux) pingReturn() {
case data = <-s.pingCh:
case <-s.closeChan:
break
case <-s.pingTimer.C:
logs.Error("mux: ping time out")
s.Close()
break
}
_ = now.UnmarshalText(data)
s.latency = time.Now().UTC().Sub(now).Seconds() / 2
//logs.Warn("latency", s.latency)
logs.Warn("latency", s.latency)
common.WindowBuff.Put(data)
if s.latency <= 0 {
logs.Warn("latency err", s.latency)
}
@@ -218,6 +230,7 @@ func (s *Mux) readSession() {
continue
case common.MUX_PING_FLAG: //ping
s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content)
common.WindowBuff.Put(pack.Content)
continue
case common.MUX_PING_RETURN:
s.pingCh <- pack.Content

View File

@@ -1,8 +1,11 @@
package mux
import (
"bufio"
"fmt"
"net"
"net/http"
"net/http/httputil"
_ "net/http/pprof"
"sync"
"testing"
@@ -37,6 +40,7 @@ func TestNewMux(t *testing.T) {
c2, err := net.Dial("tcp", "127.0.0.1:80")
if err != nil {
logs.Warn(err)
c.Close()
continue
}
go func(c2 net.Conn, c net.Conn) {
@@ -107,6 +111,9 @@ func TestNewMux(t *testing.T) {
}
}()
time.Sleep(time.Second * 5)
//go test_request()
for {
time.Sleep(time.Second * 5)
}
@@ -135,6 +142,51 @@ func client() {
}
}
func test_request() {
conn, _ := net.Dial("tcp", "127.0.0.1:7777")
for {
conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1
Host: 127.0.0.1:7777
Connection: keep-alive
`))
r, err := http.ReadResponse(bufio.NewReader(conn), nil)
if err != nil {
logs.Warn("close by read response err", err)
break
}
logs.Warn("read response success", r)
b, err := httputil.DumpResponse(r, true)
if err != nil {
logs.Warn("close by dump response err", err)
break
}
fmt.Println(string(b[:20]), err)
time.Sleep(time.Second)
}
}
func test_raw() {
conn, _ := net.Dial("tcp", "127.0.0.1:7777")
for {
conn.Write([]byte(`GET /videojs5/test HTTP/1.1
Host: 127.0.0.1:7777
Connection: keep-alive
`))
buf := make([]byte, 1000000)
n, err := conn.Read(buf)
if err != nil {
logs.Warn("close by read response err", err)
break
}
logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n]))
time.Sleep(time.Second)
}
}
func TestNewConn(t *testing.T) {
buf := common.GetBufPoolCopy()
logs.Warn(len(buf), cap(buf))

View File

@@ -51,15 +51,23 @@ func (Self *PriorityQueue) New() {
func (Self *PriorityQueue) Push(packager *common.MuxPackager) {
Self.mutex.Lock()
if Self.popWait {
defer Self.allowPop()
}
if packager.Flag == common.MUX_CONN_CLOSE {
Self.insert(packager) // the close package may need priority,
// prevent wait too long to close
} else {
switch packager.Flag {
case common.MUX_PING_FLAG, common.MUX_PING_RETURN:
Self.list.PushFront(packager)
// the ping package need highest priority
// prevent ping calculation error
case common.MUX_CONN_CLOSE:
Self.insert(packager)
// the close package may need priority too, set second
// prevent wait too long to close conn
default:
Self.list.PushBack(packager)
}
if Self.popWait {
Self.mutex.Unlock()
Self.allowPop()
return
}
Self.mutex.Unlock()
return
}
@@ -68,7 +76,14 @@ func (Self *PriorityQueue) insert(packager *common.MuxPackager) {
element := Self.list.Back()
for {
if element == nil { // PriorityQueue dose not have any of msg package with this close package id
Self.list.PushFront(packager) // insert close package to first
element = Self.list.Front()
if element != nil {
Self.list.InsertAfter(packager, element)
// insert close package to second
} else {
Self.list.PushFront(packager)
// list is empty, push to front
}
break
}
if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG &&
@@ -136,11 +151,13 @@ func (Self *FIFOQueue) New() {
func (Self *FIFOQueue) Push(element *ListElement) {
Self.mutex.Lock()
if Self.popWait {
defer Self.allowPop()
}
Self.list = append(Self.list, element)
Self.length += uint32(element.l)
if Self.popWait {
Self.mutex.Unlock()
Self.allowPop()
return
}
Self.mutex.Unlock()
return
}

View File

@@ -1,8 +1,8 @@
package version
const VERSION = "0.23.2"
const VERSION = "0.23.3"
// Compulsory minimum version, Minimum downward compatibility to this version
func GetVersion() string {
return "0.21.0"
return "0.22.0"
}