From f156df5683f85d4a0090a9ecc594e39dc3d306ce Mon Sep 17 00:00:00 2001 From: macbookpro Date: Sat, 5 Jan 2019 01:19:17 +0800 Subject: [PATCH] pull stream support UDP mode --- rtsp/rtsp-client.go | 37 ++++++++++++++++++++--- rtsp/rtsp-session.go | 50 ++++++++++++++++++++++--------- rtsp/udp-server.go | 70 ++++++++++++++++++++++++++++++++------------ 3 files changed, 121 insertions(+), 36 deletions(-) diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index c2bff09a..e2f731c7 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -32,7 +32,7 @@ type RTSPClient struct { Path string CustomPath string //custom path for pusher ID string - Conn net.Conn + Conn *RichConn Session string Seq int connRW *bufio.ReadWriter @@ -57,6 +57,7 @@ type RTSPClient struct { vRTPChannel int vRTPControlChannel int + UDPServer *UDPServer RTPHandles []func(*RTPPack) StopHandles []func() } @@ -76,6 +77,7 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64, agent URL: rawUrl, ID: shortid.MustGenerate(), Path: url.Path, + TransType: TRANS_TYPE_UDP, vRTPChannel: 0, vRTPControlChannel: 1, aRTPChannel: 2, @@ -206,7 +208,6 @@ func (client *RTSPClient) Start(timeout time.Duration) error { // handle error return err } - client.Conn = conn networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(204800) @@ -214,6 +215,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { conn, timeout, } + client.Conn = &timeoutConn client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(&timeoutConn, networkBuffer), bufio.NewWriterSize(&timeoutConn, networkBuffer)) headers := make(map[string]string) @@ -280,7 +282,21 @@ func (client *RTSPClient) Start(timeout time.Duration) error { _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/") } headers = make(map[string]string) - headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel) + if client.TransType == TRANS_TYPE_TCP { + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel) + } else { + if client.UDPServer == nil { + client.UDPServer = &UDPServer{RTSPClient: client} + } + //RTP/AVP;unicast;client_port=64864-64865 + err = client.UDPServer.SetupVideo() + if err != nil { + logger.Printf("Setup video err.%v", err) + return err + } + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.VPort, client.UDPServer.VControlPort) + client.Conn.timeout = 0 // UDP ignore timeout + } if Session != "" { headers["Session"] = Session } @@ -300,7 +316,20 @@ func (client *RTSPClient) Start(timeout time.Duration) error { _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/") } headers = make(map[string]string) - headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel) + if client.TransType == TRANS_TYPE_TCP { + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel) + } else { + if client.UDPServer == nil { + client.UDPServer = &UDPServer{RTSPClient: client} + } + err = client.UDPServer.SetupAudio() + if err != nil { + logger.Printf("Setup audio err.%v", err) + return err + } + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;client_port=%d-%d", client.UDPServer.APort, client.UDPServer.AControlPort) + client.Conn.timeout = 0 // UDP ignore timeout + } if Session != "" { headers["Session"] = Session } diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index 9c86a3da..90ec7d82 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -392,6 +392,25 @@ func (session *Session) handleRequest(req *Request) { setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host) } setupPath := setupUrl.String() + + // 播放器可能直接从SETUP来,不用DESCRIBE(比如可能事先已经获取过了) + if session.Pusher == nil { + session.Path = setupUrl.Path + pusher := session.Server.GetPusher(session.Path) + if pusher == nil { + res.StatusCode = 404 + res.Status = "NOT FOUND" + return + } + session.Type = SESSEION_TYPE_PLAYER + session.Player = NewPlayer(session, pusher) + session.Pusher = pusher + session.AControl = pusher.AControl() + session.VControl = pusher.VControl() + session.ACodec = pusher.ACodec() + session.VCodec = pusher.VCodec() + session.Conn.timeout = 0 + } //setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:] vPath := "" if strings.Index(strings.ToLower(session.VControl), "rtsp://") == 0 { @@ -446,7 +465,7 @@ func (session *Session) handleRequest(req *Request) { session.TransType = TRANS_TYPE_UDP // no need for tcp timeout. session.Conn.timeout = 0 - if session.UDPClient == nil { + if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil { session.UDPClient = &UDPClient{ Session: session, } @@ -458,14 +477,15 @@ func (session *Session) handleRequest(req *Request) { } logger.Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath) if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) { - session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) - session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) - if err := session.UDPClient.SetupAudio(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp client setup audio error, %v", err) - return + if session.Type == SESSEION_TYPE_PLAYER { + session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1]) + session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3]) + if err := session.UDPClient.SetupAudio(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp client setup audio error, %v", err) + return + } } - if session.Type == SESSION_TYPE_PUSHER { if err := session.Pusher.UDPServer.SetupAudio(); err != nil { res.StatusCode = 500 @@ -485,12 +505,14 @@ func (session *Session) handleRequest(req *Request) { ts = strings.Join(tss, ";") } } else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) { - session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) - session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) - if err := session.UDPClient.SetupVideo(); err != nil { - res.StatusCode = 500 - res.Status = fmt.Sprintf("udp client setup video error, %v", err) - return + if session.Type == SESSEION_TYPE_PLAYER { + session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1]) + session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3]) + if err := session.UDPClient.SetupVideo(); err != nil { + res.StatusCode = 500 + res.Status = fmt.Sprintf("udp client setup video error, %v", err) + return + } } if session.Type == SESSION_TYPE_PUSHER { diff --git a/rtsp/udp-server.go b/rtsp/udp-server.go index 5de7962f..a4a66c3f 100644 --- a/rtsp/udp-server.go +++ b/rtsp/udp-server.go @@ -2,6 +2,8 @@ package rtsp import ( "bytes" + "fmt" + "log" "net" "strconv" "strings" @@ -11,6 +13,7 @@ import ( type UDPServer struct { *Session + *RTSPClient APort int AConn *net.UDPConn @@ -24,6 +27,45 @@ type UDPServer struct { Stoped bool } +func (s* UDPServer)AddInputBytes(bytes int) { + if s.Session != nil { + s.Session.InBytes += bytes + return + } + if s.RTSPClient != nil { + s.RTSPClient.InBytes += bytes + return + } + panic(fmt.Errorf("session and RTSPClient both nil")) +} + +func (s *UDPServer)HandleRTP(pack *RTPPack) { + if s.Session != nil { + for _, v := range s.Session.RTPHandles { + v(pack) + } + return + } + + if s.RTSPClient != nil { + for _, v := range s.RTSPClient.RTPHandles { + v(pack) + } + return + } + panic(fmt.Errorf("session and RTSPClient both nil")) +} + +func (s *UDPServer) Logger() *log.Logger { + if s.Session != nil { + return s.Session.logger + } + if s.RTSPClient != nil { + return s.RTSPClient.logger + } + panic(fmt.Errorf("session and RTSPClient both nil")) +} + func (s *UDPServer) Stop() { if s.Stoped { return @@ -48,7 +90,7 @@ func (s *UDPServer) Stop() { } func (s *UDPServer) SetupAudio() (err error) { - logger := s.logger + logger := s.Logger() addr, err := net.ResolveUDPAddr("udp", ":0") if err != nil { return @@ -77,15 +119,13 @@ func (s *UDPServer) SetupAudio() (err error) { for !s.Stoped { if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) - s.Session.InBytes += n + s.AddInputBytes(n) copy(rtpBytes, bufUDP) pack := &RTPPack{ Type: RTP_TYPE_AUDIO, Buffer: bytes.NewBuffer(rtpBytes), } - for _, h := range s.Session.RTPHandles { - h(pack) - } + s.HandleRTP(pack) } else { logger.Println("udp server read audio pack error", err) continue @@ -119,15 +159,13 @@ func (s *UDPServer) SetupAudio() (err error) { for !s.Stoped { if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) - s.Session.InBytes += n + s.AddInputBytes(n) copy(rtpBytes, bufUDP) pack := &RTPPack{ Type: RTP_TYPE_AUDIOCONTROL, Buffer: bytes.NewBuffer(rtpBytes), } - for _, h := range s.Session.RTPHandles { - h(pack) - } + s.HandleRTP(pack) } else { logger.Println("udp server read audio control pack error", err) continue @@ -138,7 +176,7 @@ func (s *UDPServer) SetupAudio() (err error) { } func (s *UDPServer) SetupVideo() (err error) { - logger := s.logger + logger := s.Logger() addr, err := net.ResolveUDPAddr("udp", ":0") if err != nil { return @@ -167,15 +205,13 @@ func (s *UDPServer) SetupVideo() (err error) { for !s.Stoped { if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) - s.Session.InBytes += n + s.AddInputBytes(n) copy(rtpBytes, bufUDP) pack := &RTPPack{ Type: RTP_TYPE_VIDEO, Buffer: bytes.NewBuffer(rtpBytes), } - for _, h := range s.Session.RTPHandles { - h(pack) - } + s.HandleRTP(pack) } else { logger.Println("udp server read video pack error", err) continue @@ -210,15 +246,13 @@ func (s *UDPServer) SetupVideo() (err error) { for !s.Stoped { if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) - s.Session.InBytes += n + s.AddInputBytes(n) copy(rtpBytes, bufUDP) pack := &RTPPack{ Type: RTP_TYPE_VIDEOCONTROL, Buffer: bytes.NewBuffer(rtpBytes), } - for _, h := range s.Session.RTPHandles { - h(pack) - } + s.HandleRTP(pack) } else { logger.Println("udp server read video control pack error", err) continue