Browse Source

fix "timeout" config not working issue.

fix pull stream error when url do not contain a port
修改了"timeout"配置不生效的bug
修改了拉流时如果url不含有端口,则不成功的bug
pull/132/head
macbookpro 6 years ago
parent
commit
09b65d1f61
  1. 25
      rtsp/rich-conn.go
  2. 18
      rtsp/rtsp-client.go
  3. 15
      rtsp/rtsp-server.go
  4. 15
      rtsp/rtsp-session.go

25
rtsp/rich-conn.go

@ -0,0 +1,25 @@
package rtsp
import (
"net"
"time"
)
type RichConn struct {
net.Conn
timeout time.Duration
}
func (conn *RichConn) Read(b []byte) (n int, err error) {
if conn.timeout > 0 {
conn.Conn.SetReadDeadline(time.Now().Add(conn.timeout))
}
return conn.Conn.Read(b)
}
func (conn *RichConn) Write(b []byte) (n int, err error) {
if conn.timeout > 0 {
conn.Conn.SetWriteDeadline(time.Now().Add(conn.timeout))
}
return conn.Conn.Write(b)
}

18
rtsp/rtsp-client.go

@ -5,6 +5,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/penggy/EasyGoLib/utils"
"io"
"log"
"net"
@ -88,13 +89,25 @@ func (client *RTSPClient) Start() observable.Observable {
if err != nil {
return err
}
conn, err := net.Dial("tcp", l.Hostname()+":"+l.Port())
port := l.Port()
if len(port) == 0 {
port = "554"
}
conn, err := net.Dial("tcp", l.Hostname()+":"+port)
if err != nil {
// handle error
return err
}
client.Conn = conn
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(conn, 10240), bufio.NewWriterSize(conn, 10240))
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576)
timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0)
timeoutConn := RichConn{
conn,
time.Duration(timeoutMillis) * time.Millisecond,
}
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer))
headers := make(map[string]string)
headers["Require"] = "implicit-play"
@ -311,6 +324,7 @@ func (client *RTSPClient) Start() observable.Observable {
}
}
go func() {
defer client.Stop()
r := requestStream()
source <- r
switch r.(type) {

15
rtsp/rtsp-server.go

@ -128,17 +128,20 @@ func (server *Server) Start() (err error) {
log.Println("rtsp server start on", server.TCPPort)
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576)
for !server.Stoped {
conn, err := server.TCPListener.AcceptTCP()
conn, err := server.TCPListener.Accept()
if err != nil {
log.Println(err)
continue
}
if err := conn.SetReadBuffer(networkBuffer); err != nil {
log.Printf("rtsp server conn set read buffer error, %v", err)
}
if err := conn.SetWriteBuffer(networkBuffer); err != nil {
log.Printf("rtsp server conn set write buffer error, %v", err)
if tcpConn, ok := conn.(*net.TCPConn); ok {
if err := tcpConn.SetReadBuffer(networkBuffer); err != nil {
log.Printf("rtsp server conn set read buffer error, %v", err)
}
if err := tcpConn.SetWriteBuffer(networkBuffer); err != nil {
log.Printf("rtsp server conn set write buffer error, %v", err)
}
}
session := NewSession(server, conn)
go session.Start()
}

15
rtsp/rtsp-session.go

@ -87,7 +87,7 @@ const UDP_BUF_SIZE = 1048576
type Session struct {
ID string
Server *Server
Conn *net.TCPConn
Conn net.Conn
connRW *bufio.ReadWriter
connWLock sync.RWMutex
Type SessionType
@ -127,13 +127,16 @@ func (session *Session) String() string {
return fmt.Sprintf("session[%v][%v][%s][%s]", session.Type, session.TransType, session.Path, session.ID)
}
func NewSession(server *Server, conn *net.TCPConn) *Session {
func NewSession(server *Server, conn net.Conn) *Session {
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576)
timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0)
timeoutTCPConn := RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond}
session := &Session{
ID: shortid.MustGenerate(),
Server: server,
Conn: conn,
connRW: bufio.NewReadWriter(bufio.NewReaderSize(conn, networkBuffer), bufio.NewWriterSize(conn, networkBuffer)),
connRW: bufio.NewReadWriter(bufio.NewReaderSize(&timeoutTCPConn, networkBuffer), bufio.NewWriterSize(&timeoutTCPConn, networkBuffer)),
StartAt: time.Now(),
Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0),
@ -263,9 +266,9 @@ func (session *Session) Start() {
}
func (session *Session) handleRequest(req *Request) {
if session.Timeout > 0 {
session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second))
}
//if session.Timeout > 0 {
// session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second))
//}
log.Println("<<<", req)
res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")

Loading…
Cancel
Save