v2ray-core/transport/internet/ws/wsconn.go

202 lines
3.8 KiB
Go
Raw Normal View History

2016-08-13 13:44:36 +00:00
package ws
import (
"bufio"
"io"
"net"
"sync"
"time"
"github.com/gorilla/websocket"
2016-12-04 08:10:47 +00:00
"v2ray.com/core/common/errors"
2016-08-20 18:55:45 +00:00
"v2ray.com/core/common/log"
2016-08-13 13:44:36 +00:00
)
type wsconn struct {
wsc *websocket.Conn
readBuffer *bufio.Reader
connClosing bool
reusable bool
rlock *sync.Mutex
wlock *sync.Mutex
2016-10-02 21:43:58 +00:00
config *Config
2016-08-13 13:44:36 +00:00
}
func (ws *wsconn) Read(b []byte) (n int, err error) {
2016-08-14 03:06:39 +00:00
ws.rlock.Lock()
2016-08-15 10:56:33 +00:00
n, err = ws.read(b)
ws.rlock.Unlock()
return n, err
}
func (ws *wsconn) read(b []byte) (n int, err error) {
2016-08-13 13:44:36 +00:00
if ws.connClosing {
return 0, io.EOF
}
2016-08-14 03:06:39 +00:00
2016-08-15 10:56:33 +00:00
n, err = ws.readNext(b)
return n, err
}
2016-08-13 13:44:36 +00:00
2016-08-15 10:56:33 +00:00
func (ws *wsconn) getNewReadBuffer() error {
_, r, err := ws.wsc.NextReader()
if err != nil {
2016-12-04 08:43:33 +00:00
log.Warning("WS transport: ws connection NewFrameReader return ", err)
2016-08-15 10:56:33 +00:00
ws.connClosing = true
ws.Close()
return err
}
ws.readBuffer = bufio.NewReader(r)
return nil
}
2016-08-13 13:44:36 +00:00
2016-08-15 10:56:33 +00:00
func (ws *wsconn) readNext(b []byte) (n int, err error) {
if ws.readBuffer == nil {
err = ws.getNewReadBuffer()
if err != nil {
return 0, err
2016-08-13 13:44:36 +00:00
}
2016-08-15 10:56:33 +00:00
}
2016-08-13 13:44:36 +00:00
2016-08-15 10:56:33 +00:00
n, err = ws.readBuffer.Read(b)
if err == nil {
2016-08-13 13:44:36 +00:00
return n, err
2016-08-15 10:56:33 +00:00
}
2016-08-13 13:44:36 +00:00
2016-12-04 08:10:47 +00:00
if errors.Cause(err) == io.EOF {
2016-08-15 10:56:33 +00:00
ws.readBuffer = nil
if n == 0 {
return ws.readNext(b)
}
return n, nil
2016-08-13 13:44:36 +00:00
}
return n, err
}
func (ws *wsconn) Write(b []byte) (n int, err error) {
2016-08-14 03:06:39 +00:00
ws.wlock.Lock()
2016-08-13 13:44:36 +00:00
if ws.connClosing {
2016-11-17 22:21:44 +00:00
return 0, io.ErrClosedPipe
2016-08-13 13:44:36 +00:00
}
2016-08-15 10:59:14 +00:00
n, err = ws.write(b)
2016-08-14 03:06:39 +00:00
ws.wlock.Unlock()
2016-08-13 13:44:36 +00:00
return n, err
}
2016-08-15 10:59:14 +00:00
func (ws *wsconn) write(b []byte) (n int, err error) {
wr, err := ws.wsc.NextWriter(websocket.BinaryMessage)
if err != nil {
2016-12-04 08:43:33 +00:00
log.Warning("WS transport: ws connection NewFrameReader return ", err)
2016-08-15 10:59:14 +00:00
ws.connClosing = true
ws.Close()
return 0, err
}
n, err = wr.Write(b)
if err != nil {
return 0, err
}
err = wr.Close()
if err != nil {
return 0, err
}
return n, err
}
2016-08-13 13:44:36 +00:00
func (ws *wsconn) Close() error {
ws.connClosing = true
2016-08-14 12:40:59 +00:00
ws.wlock.Lock()
2016-08-14 03:06:39 +00:00
ws.wsc.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add((time.Second * 5)))
2016-08-14 12:40:59 +00:00
ws.wlock.Unlock()
2016-08-13 13:44:36 +00:00
err := ws.wsc.Close()
return err
}
// LocalAddr reports the local address of the underlying websocket connection.
func (ws *wsconn) LocalAddr() net.Addr {
	conn := ws.wsc
	return conn.LocalAddr()
}
// RemoteAddr reports the remote address of the underlying websocket
// connection.
func (ws *wsconn) RemoteAddr() net.Addr {
	conn := ws.wsc
	return conn.RemoteAddr()
}
// SetDeadline applies t as both the read and the write deadline.
//
// Both deadlines are always attempted. If either call fails, the first
// non-nil error is returned (read error first). The previous condition used
// "||" and so reported success whenever only one of the two calls failed.
func (ws *wsconn) SetDeadline(t time.Time) error {
	errr := ws.SetReadDeadline(t)
	errw := ws.SetWriteDeadline(t)
	if errr != nil {
		return errr
	}
	return errw
}
// SetReadDeadline forwards the read deadline t to the underlying websocket
// connection.
func (ws *wsconn) SetReadDeadline(t time.Time) error {
	err := ws.wsc.SetReadDeadline(t)
	return err
}
// SetWriteDeadline forwards the write deadline t to the underlying websocket
// connection.
func (ws *wsconn) SetWriteDeadline(t time.Time) error {
	err := ws.wsc.SetWriteDeadline(t)
	return err
}
func (ws *wsconn) setup() {
ws.connClosing = false
2016-08-15 09:53:11 +00:00
/*
https://godoc.org/github.com/gorilla/websocket#Conn.NextReader
https://godoc.org/github.com/gorilla/websocket#Conn.NextWriter
2016-08-15 10:59:50 +00:00
Both Read and write access are both exclusive.
2016-08-15 09:53:11 +00:00
And in both case it will need a lock.
*/
2016-08-13 13:44:36 +00:00
ws.rlock = &sync.Mutex{}
ws.wlock = &sync.Mutex{}
2016-08-14 03:06:39 +00:00
ws.pingPong()
2016-08-13 13:44:36 +00:00
}
// Reusable reports whether the connection has been flagged reusable and is
// not in the process of closing.
func (ws *wsconn) Reusable() bool {
	if !ws.reusable {
		return false
	}
	return !ws.connClosing
}
func (ws *wsconn) SetReusable(reusable bool) {
2016-10-17 12:35:13 +00:00
if !ws.config.ConnectionReuse.IsEnabled() {
2016-08-13 13:44:36 +00:00
return
}
ws.reusable = reusable
}
func (ws *wsconn) pingPong() {
pongRcv := make(chan int, 0)
ws.wsc.SetPongHandler(func(data string) error {
pongRcv <- 0
return nil
})
go func() {
for !ws.connClosing {
2016-08-14 12:40:59 +00:00
ws.wlock.Lock()
2016-08-13 13:44:36 +00:00
ws.wsc.WriteMessage(websocket.PingMessage, nil)
2016-08-14 12:40:59 +00:00
ws.wlock.Unlock()
2016-08-15 11:13:18 +00:00
tick := time.After(time.Second * 3)
2016-08-13 13:44:36 +00:00
select {
case <-pongRcv:
break
2016-08-15 11:13:18 +00:00
case <-tick:
2016-08-15 04:23:15 +00:00
if !ws.connClosing {
log.Debug("WS:Closing as ping is not responded~" + ws.wsc.UnderlyingConn().LocalAddr().String() + "-" + ws.wsc.UnderlyingConn().RemoteAddr().String())
}
2016-08-13 13:44:36 +00:00
ws.Close()
}
2016-08-15 11:13:18 +00:00
<-time.After(time.Second * 27)
2016-08-13 13:44:36 +00:00
}
return
}()
}