diff --git a/rtsp/rich-conn.go b/rtsp/rich-conn.go new file mode 100644 index 00000000..c2297ed5 --- /dev/null +++ b/rtsp/rich-conn.go @@ -0,0 +1,25 @@ +package rtsp + +import ( + "net" + "time" +) + +type RichConn struct { + net.Conn + timeout time.Duration +} + +func (conn *RichConn) Read(b []byte) (n int, err error) { + if conn.timeout > 0 { + conn.Conn.SetReadDeadline(time.Now().Add(conn.timeout)) + } + return conn.Conn.Read(b) +} + +func (conn *RichConn) Write(b []byte) (n int, err error) { + if conn.timeout > 0 { + conn.Conn.SetWriteDeadline(time.Now().Add(conn.timeout)) + } + return conn.Conn.Write(b) +} diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index 1b9ee2cf..aeb8d8d5 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/binary" "fmt" + "github.com/penggy/EasyGoLib/utils" "io" "log" "net" @@ -88,13 +89,25 @@ func (client *RTSPClient) Start() observable.Observable { if err != nil { return err } - conn, err := net.Dial("tcp", l.Hostname()+":"+l.Port()) + port := l.Port() + if len(port) == 0 { + port = "554" + } + conn, err := net.Dial("tcp", l.Hostname()+":"+port) if err != nil { // handle error return err } client.Conn = conn - client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(conn, 10240), bufio.NewWriterSize(conn, 10240)) + + networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) + timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) + + timeoutConn := RichConn{ + conn, + time.Duration(timeoutMillis) * time.Millisecond, + } + client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer)) headers := make(map[string]string) headers["Require"] = "implicit-play" @@ -311,6 +324,7 @@ func (client *RTSPClient) Start() observable.Observable { } } go func() { + defer client.Stop() r := requestStream() source <- r switch r.(type) { diff --git a/rtsp/rtsp-server.go b/rtsp/rtsp-server.go index 2b977f0d..ea6ba50a 100644 --- a/rtsp/rtsp-server.go +++ b/rtsp/rtsp-server.go @@ -128,17 +128,20 @@ func (server *Server) Start() (err error) { log.Println("rtsp server start on", server.TCPPort) networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) for !server.Stoped { - conn, err := server.TCPListener.AcceptTCP() + conn, err := server.TCPListener.Accept() if err != nil { log.Println(err) continue } - if err := conn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("rtsp server conn set read buffer error, %v", err) - } - if err := conn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("rtsp server conn set write buffer error, %v", err) + if tcpConn, ok := conn.(*net.TCPConn); ok { + if err := tcpConn.SetReadBuffer(networkBuffer); err != nil { + log.Printf("rtsp server conn set read buffer error, %v", err) + } + if err := tcpConn.SetWriteBuffer(networkBuffer); err != nil { + log.Printf("rtsp server conn set write buffer error, %v", err) + } } + session := NewSession(server, conn) go session.Start() } diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index 1393d8a0..be75e0ac 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -87,7 +87,7 @@ const UDP_BUF_SIZE = 1048576 type Session struct { ID string Server *Server - Conn *net.TCPConn + Conn net.Conn connRW *bufio.ReadWriter connWLock sync.RWMutex Type SessionType @@ -127,13 +127,16 @@ func (session *Session) String() string { return fmt.Sprintf("session[%v][%v][%s][%s]", session.Type, session.TransType, session.Path, session.ID) } -func NewSession(server *Server, conn *net.TCPConn) *Session { +func NewSession(server *Server, conn net.Conn) *Session { networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) + timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) + timeoutTCPConn := RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond} + session := &Session{ ID: shortid.MustGenerate(), Server: server, Conn: conn, - connRW: bufio.NewReadWriter(bufio.NewReaderSize(conn, networkBuffer), bufio.NewWriterSize(conn, networkBuffer)), + connRW: bufio.NewReadWriter(bufio.NewReaderSize(&timeoutTCPConn, networkBuffer), bufio.NewWriterSize(&timeoutTCPConn, networkBuffer)), StartAt: time.Now(), Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0), @@ -263,9 +266,9 @@ func (session *Session) Start() { } func (session *Session) handleRequest(req *Request) { - if session.Timeout > 0 { - session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second)) - } + //if session.Timeout > 0 { + // session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second)) + //} log.Println("<<<", req) res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")