pull stream support UDP mode

pull/132/head
macbookpro 2019-01-05 01:19:17 +08:00
parent cd373b1ab2
commit f156df5683
3 changed files with 121 additions and 36 deletions

View File

@ -32,7 +32,7 @@ type RTSPClient struct {
Path string Path string
CustomPath string //custom path for pusher CustomPath string //custom path for pusher
ID string ID string
Conn net.Conn Conn *RichConn
Session string Session string
Seq int Seq int
connRW *bufio.ReadWriter connRW *bufio.ReadWriter
@ -57,6 +57,7 @@ type RTSPClient struct {
vRTPChannel int vRTPChannel int
vRTPControlChannel int vRTPControlChannel int
UDPServer *UDPServer
RTPHandles []func(*RTPPack) RTPHandles []func(*RTPPack)
StopHandles []func() StopHandles []func()
} }
@ -76,6 +77,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent
URL: rawUrl, URL: rawUrl,
ID: shortid.MustGenerate(), ID: shortid.MustGenerate(),
Path: url.Path, Path: url.Path,
TransType: TRANS_TYPE_UDP,
vRTPChannel: 0, vRTPChannel: 0,
vRTPControlChannel: 1, vRTPControlChannel: 1,
aRTPChannel: 2, aRTPChannel: 2,
@ -206,7 +208,6 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
// handle error // handle error
return err return err
} }
client.Conn = conn
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(204800) networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(204800)
@ -214,6 +215,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
conn, conn,
timeout, timeout,
} }
client.Conn = &timeoutConn
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer)) client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer))
headers := make(map[string]string) headers := make(map[string]string)
@ -280,7 +282,21 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
_url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/") _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/")
} }
headers = make(map[string]string) headers = make(map[string]string)
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel) if client.TransType == TRANS_TYPE_TCP {
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel)
} else {
if client.UDPServer == nil {
client.UDPServer = &UDPServer{RTSPClient: client}
}
//RTP/AVP;unicast;client_port=64864-64865
err = client.UDPServer.SetupVideo()
if err != nil {
logger.Printf("Setup video err.%v", err)
return err
}
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort)
client.Conn.timeout = 0 // UDP ignore timeout
}
if Session != "" { if Session != "" {
headers["Session"] = Session headers["Session"] = Session
} }
@ -300,7 +316,20 @@ func (client *RTSPClient) Start(timeout time.Duration) error {
_url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/") _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/")
} }
headers = make(map[string]string) headers = make(map[string]string)
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel) if client.TransType == TRANS_TYPE_TCP {
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel)
} else {
if client.UDPServer == nil {
client.UDPServer = &UDPServer{RTSPClient: client}
}
err = client.UDPServer.SetupAudio()
if err != nil {
logger.Printf("Setup audio err.%v", err)
return err
}
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort)
client.Conn.timeout = 0 // UDP ignore timeout
}
if Session != "" { if Session != "" {
headers["Session"] = Session headers["Session"] = Session
} }

View File

@ -392,6 +392,25 @@ func (session *Session) handleRequest(req *Request) {
setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host) setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host)
} }
setupPath := setupUrl.String() setupPath := setupUrl.String()
// 播放器可能直接从SETUP来不用DESCRIBE比如可能事先已经获取过了
if session.Pusher == nil {
session.Path = setupUrl.Path
pusher := session.Server.GetPusher(session.Path)
if pusher == nil {
res.StatusCode = 404
res.Status = "NOT FOUND"
return
}
session.Type = SESSEION_TYPE_PLAYER
session.Player = NewPlayer(session, pusher)
session.Pusher = pusher
session.AControl = pusher.AControl()
session.VControl = pusher.VControl()
session.ACodec = pusher.ACodec()
session.VCodec = pusher.VCodec()
session.Conn.timeout = 0
}
//setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:] //setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:]
vPath := "" vPath := ""
if strings.Index(strings.ToLower(session.VControl), "rtsp://") == 0 { if strings.Index(strings.ToLower(session.VControl), "rtsp://") == 0 {
@ -446,7 +465,7 @@ func (session *Session) handleRequest(req *Request) {
session.TransType = TRANS_TYPE_UDP session.TransType = TRANS_TYPE_UDP
// no need for tcp timeout. // no need for tcp timeout.
session.Conn.timeout = 0 session.Conn.timeout = 0
if session.UDPClient == nil { if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil {
session.UDPClient = &UDPClient{ session.UDPClient = &UDPClient{
Session: session, Session: session,
} }
@ -458,14 +477,15 @@ func (session *Session) handleRequest(req *Request) {
} }
logger.Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath) logger.Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) { if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) {
session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) if session.Type == SESSEION_TYPE_PLAYER {
session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
if err := session.UDPClient.SetupAudio(); err != nil { session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
res.StatusCode = 500 if err := session.UDPClient.SetupAudio(); err != nil {
res.Status = fmt.Sprintf("udp client setup audio error, %v", err) res.StatusCode = 500
return res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
return
}
} }
if session.Type == SESSION_TYPE_PUSHER { if session.Type == SESSION_TYPE_PUSHER {
if err := session.Pusher.UDPServer.SetupAudio(); err != nil { if err := session.Pusher.UDPServer.SetupAudio(); err != nil {
res.StatusCode = 500 res.StatusCode = 500
@ -485,12 +505,14 @@ func (session *Session) handleRequest(req *Request) {
ts = strings.Join(tss, ";") ts = strings.Join(tss, ";")
} }
} else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) { } else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) {
session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) if session.Type == SESSEION_TYPE_PLAYER {
session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
if err := session.UDPClient.SetupVideo(); err != nil { session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
res.StatusCode = 500 if err := session.UDPClient.SetupVideo(); err != nil {
res.Status = fmt.Sprintf("udp client setup video error, %v", err) res.StatusCode = 500
return res.Status = fmt.Sprintf("udp client setup video error, %v", err)
return
}
} }
if session.Type == SESSION_TYPE_PUSHER { if session.Type == SESSION_TYPE_PUSHER {

View File

@ -2,6 +2,8 @@ package rtsp
import ( import (
"bytes" "bytes"
"fmt"
"log"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@ -11,6 +13,7 @@ import (
type UDPServer struct { type UDPServer struct {
*Session *Session
*RTSPClient
APort int APort int
AConn *net.UDPConn AConn *net.UDPConn
@ -24,6 +27,45 @@ type UDPServer struct {
Stoped bool Stoped bool
} }
func (s* UDPServer)AddInputBytes(bytes int) {
if s.Session != nil {
s.Session.InBytes += bytes
return
}
if s.RTSPClient != nil {
s.RTSPClient.InBytes += bytes
return
}
panic(fmt.Errorf("session and RTSPClient both nil"))
}
func (s *UDPServer)HandleRTP(pack *RTPPack) {
if s.Session != nil {
for _, v := range s.Session.RTPHandles {
v(pack)
}
return
}
if s.RTSPClient != nil {
for _, v := range s.RTSPClient.RTPHandles {
v(pack)
}
return
}
panic(fmt.Errorf("session and RTSPClient both nil"))
}
func (s *UDPServer) Logger() *log.Logger {
if s.Session != nil {
return s.Session.logger
}
if s.RTSPClient != nil {
return s.RTSPClient.logger
}
panic(fmt.Errorf("session and RTSPClient both nil"))
}
func (s *UDPServer) Stop() { func (s *UDPServer) Stop() {
if s.Stoped { if s.Stoped {
return return
@ -48,7 +90,7 @@ func (s *UDPServer) Stop() {
} }
func (s *UDPServer) SetupAudio() (err error) { func (s *UDPServer) SetupAudio() (err error) {
logger := s.logger logger := s.Logger()
addr, err := net.ResolveUDPAddr("udp", ":0") addr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil { if err != nil {
return return
@ -77,15 +119,13 @@ func (s *UDPServer) SetupAudio() (err error) {
for !s.Stoped { for !s.Stoped {
if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil { if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n) rtpBytes := make([]byte, n)
s.Session.InBytes += n s.AddInputBytes(n)
copy(rtpBytes, bufUDP) copy(rtpBytes, bufUDP)
pack := &RTPPack{ pack := &RTPPack{
Type: RTP_TYPE_AUDIO, Type: RTP_TYPE_AUDIO,
Buffer: bytes.NewBuffer(rtpBytes), Buffer: bytes.NewBuffer(rtpBytes),
} }
for _, h := range s.Session.RTPHandles { s.HandleRTP(pack)
h(pack)
}
} else { } else {
logger.Println("udp server read audio pack error", err) logger.Println("udp server read audio pack error", err)
continue continue
@ -119,15 +159,13 @@ func (s *UDPServer) SetupAudio() (err error) {
for !s.Stoped { for !s.Stoped {
if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil { if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n) rtpBytes := make([]byte, n)
s.Session.InBytes += n s.AddInputBytes(n)
copy(rtpBytes, bufUDP) copy(rtpBytes, bufUDP)
pack := &RTPPack{ pack := &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL, Type: RTP_TYPE_AUDIOCONTROL,
Buffer: bytes.NewBuffer(rtpBytes), Buffer: bytes.NewBuffer(rtpBytes),
} }
for _, h := range s.Session.RTPHandles { s.HandleRTP(pack)
h(pack)
}
} else { } else {
logger.Println("udp server read audio control pack error", err) logger.Println("udp server read audio control pack error", err)
continue continue
@ -138,7 +176,7 @@ func (s *UDPServer) SetupAudio() (err error) {
} }
func (s *UDPServer) SetupVideo() (err error) { func (s *UDPServer) SetupVideo() (err error) {
logger := s.logger logger := s.Logger()
addr, err := net.ResolveUDPAddr("udp", ":0") addr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil { if err != nil {
return return
@ -167,15 +205,13 @@ func (s *UDPServer) SetupVideo() (err error) {
for !s.Stoped { for !s.Stoped {
if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil { if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n) rtpBytes := make([]byte, n)
s.Session.InBytes += n s.AddInputBytes(n)
copy(rtpBytes, bufUDP) copy(rtpBytes, bufUDP)
pack := &RTPPack{ pack := &RTPPack{
Type: RTP_TYPE_VIDEO, Type: RTP_TYPE_VIDEO,
Buffer: bytes.NewBuffer(rtpBytes), Buffer: bytes.NewBuffer(rtpBytes),
} }
for _, h := range s.Session.RTPHandles { s.HandleRTP(pack)
h(pack)
}
} else { } else {
logger.Println("udp server read video pack error", err) logger.Println("udp server read video pack error", err)
continue continue
@ -210,15 +246,13 @@ func (s *UDPServer) SetupVideo() (err error) {
for !s.Stoped { for !s.Stoped {
if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil { if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil {
rtpBytes := make([]byte, n) rtpBytes := make([]byte, n)
s.Session.InBytes += n s.AddInputBytes(n)
copy(rtpBytes, bufUDP) copy(rtpBytes, bufUDP)
pack := &RTPPack{ pack := &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL, Type: RTP_TYPE_VIDEOCONTROL,
Buffer: bytes.NewBuffer(rtpBytes), Buffer: bytes.NewBuffer(rtpBytes),
} }
for _, h := range s.Session.RTPHandles { s.HandleRTP(pack)
h(pack)
}
} else { } else {
logger.Println("udp server read video control pack error", err) logger.Println("udp server read video control pack error", err)
continue continue