From 909ffa0f2b1a16fb00aff02f5817be8db7d20004 Mon Sep 17 00:00:00 2001 From: macbookpro Date: Sat, 17 Nov 2018 10:23:00 +0800 Subject: [PATCH] =?UTF-8?q?support=20pull=20and=20distribute;=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=8B=89=E6=B5=81=E8=BD=AC=E5=8F=91=20support=20save?= =?UTF-8?q?=20stream=20to=20file;=E6=94=AF=E6=8C=81=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E6=B5=81=E5=88=B0=E6=9C=AC=E5=9C=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- routers/stats.go | 6 +- rtsp/player-client.go | 264 ------------------------ rtsp/pusher.go | 137 ++++++++++++- rtsp/rtsp-client.go | 465 ++++++++++++++++++++++++++++++++++++++++++ rtsp/rtsp-server.go | 120 +++++++++-- rtsp/rtsp-session.go | 10 +- 6 files changed, 710 insertions(+), 292 deletions(-) delete mode 100644 rtsp/player-client.go create mode 100644 rtsp/rtsp-client.go diff --git a/routers/stats.go b/routers/stats.go index ab532a89..5ff7a4c9 100644 --- a/routers/stats.go +++ b/routers/stats.go @@ -50,7 +50,7 @@ func (h *APIHandler) Pushers(c *gin.Context) { hostname := utils.GetRequestHostname(c.Request) pushers := make([]interface{}, 0) for _, pusher := range rtsp.Instance.GetPushers() { - port := pusher.Server.TCPPort + port := pusher.Server().TCPPort rtsp := fmt.Sprintf("rtsp://%s:%d%s", hostname, port, pusher.Path) if port == 554 { rtsp = fmt.Sprintf("rtsp://%s%s", hostname, pusher.Path) @@ -62,8 +62,8 @@ func (h *APIHandler) Pushers(c *gin.Context) { "id": pusher.ID, "path": rtsp, "transType": pusher.TransType.String(), - "inBytes": pusher.InBytes, - "outBytes": pusher.OutBytes, + "inBytes": pusher.InBytes(), + "outBytes": pusher.OutBytes(), "startAt": utils.DateTime(pusher.StartAt), "onlines": len(pusher.GetPlayers()), }) diff --git a/rtsp/player-client.go b/rtsp/player-client.go deleted file mode 100644 index 5c636448..00000000 --- a/rtsp/player-client.go +++ /dev/null @@ -1,264 +0,0 @@ -package rtsp - -import ( - "bufio" - "fmt" - "github.com/reactivex/rxgo/observable" - "io" - "net" - "net/url" - "strconv" - "strings" -) - -type PlayerClient struct { - Stoped bool - Path string - Conn *net.Conn - AuthHeaders bool - Session *string - Seq int - connRW *bufio.ReadWriter - InBytes uint64 -} - -func NewPlayerClient(path string) *PlayerClient { - session := &PlayerClient{ - Stoped: false, - Path: path, - } - return session -} - -func (client *PlayerClient) Start() observable.Observable { - return observable.Start(func() interface{} { - l, err := url.Parse(client.Path) - if err != nil { - return err - } - conn, err := net.Dial("tcp", l.Hostname()+":"+l.Port()) - if err != nil { - // handle error - return err - } - client.Conn = &conn - client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(conn, 10240), bufio.NewWriterSize(conn, 10240)) - - headers := make(map[string]string) - headers["Require"] = "implicit-play" - // An OPTIONS request returns the request types the server will accept. - resp, err := client.Request("OPTIONS", headers) - if err != nil { - return err - } - fmt.Println("StatusCode:", resp.StatusCode) - - // A DESCRIBE request includes an RTSP URL (rtsp://...), and the type of reply data that can be handled. This reply includes the presentation description, - // typically in Session Description Protocol (SDP) format. Among other things, the presentation description lists the media streams controlled with the aggregate URL. - // In the typical case, there is one media stream each for audio and video. - headers = make(map[string]string) - headers["Accept"] = "application/sdp" - resp, err = client.Request("DESCRIBE", headers) - if err != nil { - return err - } - - //fmt.Println("StatusCode:",resp.StatusCode) - headers = make(map[string]string) - headers["Transport"] = "RTP/AVP;unicast;client_port=8000-8001" - resp, err = client.Request("SETUP", headers) - if err != nil { - return err - } - - //fmt.Println("StatusCode:",resp.StatusCode) - - ////fmt.Fprintf(conn, "GET / HTTP/1.0\r\n\r\n") - ////status, err := bufio.NewReader(conn).ReadString('\n') - ////url.Host - ////text, err := reader.ReadString('\n') - ////if err != nil { - //// return err - ////} - //return text - - return 0 - }) - //return observable.Just(1) -} - -func (client *PlayerClient) Request(method string, headers map[string]string) (resp *Response, err error) { - headers["User-Agent"] = "EasyDarwinGo" - if client.AuthHeaders { - //headers["Authorization"] = this.digest(method, _url); - } - if client.Session != nil { - headers["Session"] = *client.Session - } - client.Seq++ - cseq := client.Seq - builder := strings.Builder{} - builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, client.Path)) - builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", cseq)) - for k, v := range headers { - builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)) - } - builder.WriteString(fmt.Sprintf("\r\n")) - s := builder.String() - fmt.Println("C->S >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - fmt.Println(s) - _, err = client.connRW.WriteString(s) - if err != nil { - return - } - client.connRW.Flush() - lineCount := 0 - statusCode := 200 - status := "" - sid := "" - contentLen := 0 - respHeader := make(map[string]string) - var line []byte - builder.Reset() - for !client.Stoped { - if line, _, err = client.connRW.ReadLine(); err != nil { - return - } else { - if len(line) == 0 { - fmt.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") - fmt.Println(builder.String()) - resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, "") - return - } - s := string(line) - builder.Write(line) - builder.WriteString("\r\n") - - if lineCount == 0 { - splits := strings.Split(s, " ") - if len(splits) < 3 { - err = fmt.Errorf("StatusCode Line error:%s", s) - return - } - statusCode, err = strconv.Atoi(splits[1]) - if err != nil { - return - } - if statusCode != 200 { - err = fmt.Errorf("Response StatusCode is :%d", statusCode) - return - } - status = splits[2] - } - lineCount++ - splits := strings.Split(s, ":") - if len(splits) == 2 { - respHeader[splits[0]] = strings.TrimSpace(splits[1]) - } - if strings.Index(s, "Session:") == 0 { - splits := strings.Split(s, ":") - sid = strings.TrimSpace(splits[1]) - } - //if strings.Index(s, "CSeq:") == 0 { - // splits := strings.Split(s, ":") - // cseq, err = strconv.Atoi(strings.TrimSpace(splits[1])) - // if err != nil { - // err = fmt.Errorf("Atoi CSeq err. line:%s", s) - // return - // } - //} - if strings.Index(s, "Content-Length:") == 0 { - splits := strings.Split(s, ":") - contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) - if err != nil { - return - } - content := make([]byte, contentLen) - _, err = io.ReadFull(client.connRW, content) - if err != nil { - err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) - return - } - body := string(content) - builder.Write(content) - resp = &Response{ - Body: body, - Status: status, - StatusCode: statusCode, - } - - fmt.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") - fmt.Println(builder.String()) - return - } - } - } - return -} - -/* - request(method, headers, _url) { - _url = _url || `${this.origin}${this.pathname}${this.search}`; - headers = headers || {}; - headers["User-Agent"] = "EasyDarwin"; - if(this.authHeaders) { - headers["Authorization"] = this.digest(method, _url); - } - if(this.session) { - headers["Session"] = this.session; - } - var cseq = ++this.cseq; - var req = `${method} ${_url} RTSP/1.0\r\n`; - req += `CSeq: ${cseq}\r\n`; - req += Object.keys(headers).map(header => { - return `${header}: ${headers[header]}\r\n` - }).join(""); - console.log(`[RTSP Client][${utils.formatDateTime()}] >>>>>> ${req}`); - this.socket.write(`${req}\r\n`); - - return new Promise((resolve, reject) => { - var timer; - var reqHeaders = headers; - var listener = (statusLine, headers, body) => { - if(headers["CSeq"] != cseq) { - console.log(`Bad RTSP CSeq [${headers["CSeq"]}], want[${cseq}]!`); - return; - } - timer && clearTimeout(timer); - this.removeListener("response", listener); - var code = statusLine.split(/\s/)[1]; - if(code == "200") { - resolve({ - headers: headers, - body: body - }) - return; - } - if(code == "401" && !this.authHeaders) { - var type = headers[WWW_AUTH].split(" ")[0]; - this.authHeaders = { - type: type - }; - var reg = new RegExp('([a-z]+)=\"([^,\s]+)\"',"g"); - var matchs = reg.exec(headers[WWW_AUTH]); - while(matchs) { - this.authHeaders[matchs[1]] = matchs[2]; - matchs = reg.exec(headers[WWW_AUTH]); - } - resolve(this.request(method, reqHeaders, _url)); - return; - } - reject(new Error(`Bad RTSP status code ${code}!`)); - return; - } - var timeout = utils.db.get("rtspClientTimeout").cloneDeep().value() || 5; - if(!isNaN(timeout)){ - timer = setTimeout(() => { - this.removeListener("response", listener); - reject(new Error(`${method} timeout`)); - }, timeout * 1000); - } - this.on("response", listener); - }) - } -*/ diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 19192d2b..cfe0e766 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -10,7 +10,7 @@ import ( type Pusher struct { *Session - + *RTSPClient players map[string]*Player //SessionID <-> Player playersLock sync.RWMutex gopCacheEnable bool @@ -22,9 +22,130 @@ type Pusher struct { queue []*RTPPack } +func (pusher *Pusher) String() string { + if pusher.Session != nil { + return pusher.Session.String() + } + return pusher.RTSPClient.String() +} + +func (pusher *Pusher) Server() *Server { + if pusher.Session != nil { + return pusher.Session.Server + } + return pusher.RTSPClient.Server +} + +func (pusher *Pusher) SDPRaw() string { + if pusher.Session != nil { + return pusher.Session.SDPRaw + } + return pusher.RTSPClient.SDPRaw +} + +func (pusher *Pusher) Stoped() bool { + if pusher.Session != nil { + return pusher.Session.Stoped + } + return pusher.RTSPClient.Stoped +} + +func (pusher *Pusher) Path() string { + if pusher.Session != nil { + return pusher.Session.Path + } + return pusher.RTSPClient.Path +} + +func (pusher *Pusher) ID() string { + if pusher.Session != nil { + return pusher.Session.ID + } + return pusher.RTSPClient.ID +} + +func (pusher *Pusher) VCodec() string { + if pusher.Session != nil { + return pusher.Session.VCodec + } + return pusher.RTSPClient.VCodec +} + +func (pusher *Pusher) ACodec() string { + if pusher.Session != nil { + return pusher.Session.ACodec + } + return pusher.RTSPClient.ACodec +} + +func (pusher *Pusher) AControl() string { + if pusher.Session != nil { + return pusher.Session.AControl + } + return pusher.RTSPClient.AControl +} + +func (pusher *Pusher) VControl() string { + if pusher.Session != nil { + return pusher.Session.VControl + } + return pusher.RTSPClient.VControl +} + +func (pusher *Pusher) URL() string { + if pusher.Session != nil { + return pusher.Session.URL + } + return pusher.RTSPClient.URL +} + +func (pusher *Pusher) AddOutputBytes(size int) { + if pusher.Session != nil { + pusher.Session.OutBytes += size + return + } + pusher.RTSPClient.OutBytes += size +} + +func (pusher *Pusher) InBytes() int { + if pusher.Session != nil { + return pusher.Session.InBytes + } + return pusher.RTSPClient.InBytes +} + +func (pusher *Pusher) OutBytes() int { + if pusher.Session != nil { + return pusher.Session.OutBytes + } + return pusher.RTSPClient.OutBytes +} + +func NewClientPusher(client *RTSPClient) (pusher *Pusher) { + pusher = &Pusher{ + RTSPClient: client, + Session: nil, + players: make(map[string]*Player), + gopCacheEnable: utils.Conf().Section("rtsp").Key("gop_cache_enable").MustBool(true), + gopCache: make([]*RTPPack, 0), + + cond: sync.NewCond(&sync.Mutex{}), + queue: make([]*RTPPack, 0), + } + client.RTPHandles = append(client.RTPHandles, func(pack *RTPPack) { + pusher.QueueRTP(pack) + }) + client.StopHandles = append(client.StopHandles, func() { + pusher.Server().RemovePusher(pusher) + pusher.cond.Broadcast() + }) + return +} + func NewPusher(session *Session) (pusher *Pusher) { pusher = &Pusher{ Session: session, + RTSPClient: nil, players: make(map[string]*Player), gopCacheEnable: utils.Conf().Section("rtsp").Key("gop_cache_enable").MustBool(true), gopCache: make([]*RTPPack, 0), @@ -36,7 +157,7 @@ func NewPusher(session *Session) (pusher *Pusher) { pusher.QueueRTP(pack) }) session.StopHandles = append(session.StopHandles, func() { - pusher.Server.RemovePusher(pusher) + pusher.Server().RemovePusher(pusher) pusher.cond.Broadcast() if pusher.UDPServer != nil { pusher.UDPServer.Stop() @@ -55,7 +176,7 @@ func (pusher *Pusher) QueueRTP(pack *RTPPack) *Pusher { } func (pusher *Pusher) Start() { - for !pusher.Stoped { + for !pusher.Stoped() { var pack *RTPPack pusher.cond.L.Lock() if len(pusher.queue) == 0 { @@ -67,7 +188,7 @@ func (pusher *Pusher) Start() { } pusher.cond.L.Unlock() if pack == nil { - if !pusher.Stoped { + if !pusher.Stoped() { log.Printf("pusher not stoped, but queue take out nil pack") } continue @@ -75,12 +196,12 @@ func (pusher *Pusher) Start() { if pusher.gopCacheEnable { pusher.gopCacheLock.Lock() - if strings.EqualFold(pusher.VCodec, "h264") { + if strings.EqualFold(pusher.VCodec(), "h264") { if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && rtp.IsKeyframeStart() { pusher.gopCache = make([]*RTPPack, 0) } pusher.gopCache = append(pusher.gopCache, pack) - } else if strings.EqualFold(pusher.VCodec, "h265") { + } else if strings.EqualFold(pusher.VCodec(), "h265") { if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && rtp.IsKeyframeStartH265() { pusher.gopCache = make([]*RTPPack, 0) } @@ -96,7 +217,7 @@ func (pusher *Pusher) Start() { func (pusher *Pusher) BroadcastRTP(pack *RTPPack) *Pusher { for _, player := range pusher.GetPlayers() { player.QueueRTP(pack) - pusher.OutBytes += pack.Buffer.Len() + pusher.AddOutputBytes(pack.Buffer.Len()) } return pusher } @@ -116,7 +237,7 @@ func (pusher *Pusher) AddPlayer(player *Player) *Pusher { pusher.gopCacheLock.RLock() for _, pack := range pusher.gopCache { player.QueueRTP(pack) - pusher.OutBytes += pack.Buffer.Len() + pusher.AddOutputBytes(pack.Buffer.Len()) } pusher.gopCacheLock.RUnlock() } diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go new file mode 100644 index 00000000..1b9ee2cf --- /dev/null +++ b/rtsp/rtsp-client.go @@ -0,0 +1,465 @@ +package rtsp + +import ( + "bufio" + "bytes" + "encoding/binary" + "fmt" + "io" + "log" + "net" + "net/url" + "strconv" + "strings" + "time" + + "github.com/pixelbender/go-sdp/sdp" + "github.com/reactivex/rxgo/observable" +) + +type RTSPClient struct { + Server *Server + Stoped bool + Status string + URL string + Path string + ID string + Conn net.Conn + AuthHeaders bool + Session *string + Seq int + connRW *bufio.ReadWriter + InBytes int + OutBytes int + Sdp *sdp.Session + AControl string + VControl string + ACodec string + VCodec string + OptionIntervalMillis int64 + SDPRaw string + + //tcp channels + aRTPChannel int + aRTPControlChannel int + vRTPChannel int + vRTPControlChannel int + + RTPHandles []func(*RTPPack) + StopHandles []func() +} + +func (client *RTSPClient) String() string { + return fmt.Sprintf("client[%s]", client.URL) +} + +func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) *RTSPClient { + url, err := url.Parse(rawUrl) + if err != nil { + return nil + } + client := &RTSPClient{ + Server: server, + Stoped: false, + URL: rawUrl, + ID: url.Path, + Path: url.Path, + vRTPChannel: 0, + vRTPControlChannel: 1, + aRTPChannel: 2, + aRTPControlChannel: 3, + OptionIntervalMillis: sendOptionMillis, + } + return client +} + +func (client *RTSPClient) Start() observable.Observable { + source := make(chan interface{}) + requestStream := func() interface{} { + l, err := url.Parse(client.URL) + setStatus := func() { + if err != nil { + client.Status = "Error" + } else { + client.Status = "OK" + } + } + defer setStatus() + if err != nil { + return err + } + conn, err := net.Dial("tcp", l.Hostname()+":"+l.Port()) + if err != nil { + // handle error + return err + } + client.Conn = conn + client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(conn, 10240), bufio.NewWriterSize(conn, 10240)) + + headers := make(map[string]string) + headers["Require"] = "implicit-play" + // An OPTIONS request returns the request types the server will accept. + resp, err := client.Request("OPTIONS", headers) + if err != nil { + return err + } + + // A DESCRIBE request includes an RTSP URL (rtsp://...), and the type of reply data that can be handled. This reply includes the presentation description, + // typically in Session Description Protocol (SDP) format. Among other things, the presentation description lists the media streams controlled with the aggregate URL. + // In the typical case, there is one media stream each for audio and video. + headers = make(map[string]string) + headers["Accept"] = "application/sdp" + resp, err = client.Request("DESCRIBE", headers) + if err != nil { + return err + } + + sess, err := sdp.ParseString(resp.Body) + if err != nil { + return err + } + client.Sdp = sess + client.SDPRaw = resp.Body + for _, media := range sess.Media { + switch media.Type { + case "video": + client.VControl = media.Attributes.Get("control") + client.VCodec = media.Formats[0].Name + var _url = "" + if strings.Index(strings.ToLower(client.VControl), "rtsp://") == 0 { + _url = client.VControl + } else { + _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/") + } + headers = make(map[string]string) + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel) + resp, err = client.RequestWithPath("SETUP", _url, headers, true) + if err != nil { + return err + } + case "audio": + client.AControl = media.Attributes.Get("control") + client.VCodec = media.Formats[0].Name + var _url = "" + if strings.Index(strings.ToLower(client.AControl), "rtsp://") == 0 { + _url = client.AControl + } else { + _url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/") + } + headers = make(map[string]string) + headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel) + resp, err = client.RequestWithPath("SETUP", _url, headers, true) + if err != nil { + return err + } + } + } + headers = make(map[string]string) + resp, err = client.Request("PLAY", headers) + if err != nil { + return err + } + return 0 + } + stream := func(ch chan interface{}) { + OptionIntervalMillis := client.OptionIntervalMillis + startTime := time.Now() + loggerTime := time.Now().Add(-10 * time.Second) + defer func() { + if client.Stoped { + close(ch) + } + }() + for !client.Stoped { + if OptionIntervalMillis > 0 { + elapse := time.Now().Sub(startTime) + if elapse > time.Duration(OptionIntervalMillis*int64(time.Millisecond)) { + startTime = time.Now() + headers := make(map[string]string) + headers["Require"] = "implicit-play" + // An OPTIONS request returns the request types the server will accept. + if err := client.RequestNoResp("OPTIONS", headers); err != nil { + // ignore... + //ch <- err + //return + } + } + } + b, err := client.connRW.ReadByte() + if err != nil { + if !client.Stoped { + log.Printf("client.connRW.ReadByte err:%v", err) + ch <- err + } + return + } + switch b { + case 0x24: // rtp + header := make([]byte, 4) + header[0] = b + _, err := io.ReadFull(client.connRW, header[1:]) + if err != nil { + + if !client.Stoped { + ch <- err + log.Printf("io.ReadFull err:%v", err) + } + return + } + channel := int(header[1]) + length := binary.BigEndian.Uint16(header[2:]) + content := make([]byte, length) + _, err = io.ReadFull(client.connRW, content) + if err != nil { + if !client.Stoped { + ch <- err + log.Printf("io.ReadFull err:%v", err) + } + return + } + //ch <- append(header, content...) + rtpBuf := bytes.NewBuffer(content) + var pack *RTPPack + switch channel { + case client.aRTPChannel: + pack = &RTPPack{ + Type: RTP_TYPE_AUDIO, + Buffer: rtpBuf, + } + case client.aRTPControlChannel: + pack = &RTPPack{ + Type: RTP_TYPE_AUDIOCONTROL, + Buffer: rtpBuf, + } + case client.vRTPChannel: + pack = &RTPPack{ + Type: RTP_TYPE_VIDEO, + Buffer: rtpBuf, + } + case client.vRTPControlChannel: + pack = &RTPPack{ + Type: RTP_TYPE_VIDEOCONTROL, + Buffer: rtpBuf, + } + default: + log.Printf("unknow rtp pack type, channel:%v", channel) + continue + } + if pack == nil { + log.Printf("session tcp got nil rtp pack") + continue + } + elapsed := time.Now().Sub(loggerTime) + if elapsed >= 10*time.Second { + log.Printf("client[%v]read rtp frame.", client) + loggerTime = time.Now() + } + client.InBytes += int(length + 4) + for _, h := range client.RTPHandles { + h(pack) + } + + default: // rtsp + builder := strings.Builder{} + builder.WriteByte(b) + contentLen := 0 + for !client.Stoped { + line, prefix, err := client.connRW.ReadLine() + if err != nil { + if !client.Stoped { + ch <- err + log.Printf("client.connRW.ReadLine err:%v", err) + } + return + } + if len(line) == 0 { + if contentLen != 0 { + content := make([]byte, contentLen) + _, err = io.ReadFull(client.connRW, content) + if err != nil { + if !client.Stoped { + err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) + ch <- err + } + return + } + builder.Write(content) + } + log.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + log.Println(builder.String()) + break + } + s := string(line) + builder.Write(line) + if !prefix { + builder.WriteString("\r\n") + } + + if strings.Index(s, "Content-Length:") == 0 { + splits := strings.Split(s, ":") + contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) + if err != nil { + if !client.Stoped { + ch <- err + log.Printf("strconv.Atoi err:%v, str:%v", err, splits[1]) + } + return + } + } + } + } + } + } + go func() { + r := requestStream() + source <- r + switch r.(type) { + case error: + return + } + stream(source) + }() + return observable.Observable(source) + + //return observable.Just(1) +} + +func (client *RTSPClient) Stop() { + if client.Stoped { + return + } + client.Stoped = true + for _, h := range client.StopHandles { + h() + } + if client.Conn != nil { + client.connRW.Flush() + client.Conn.Close() + client.Conn = nil + } +} + +func (client *RTSPClient) RequestWithPath(method string, path string, headers map[string]string, needResp bool) (resp *Response, err error) { + headers["User-Agent"] = "EasyDarwinGo" + if client.AuthHeaders { + //headers["Authorization"] = this.digest(method, _url); + } + if client.Session != nil { + headers["Session"] = *client.Session + } + client.Seq++ + cseq := client.Seq + builder := strings.Builder{} + builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, path)) + builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", cseq)) + for k, v := range headers { + builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v)) + } + builder.WriteString(fmt.Sprintf("\r\n")) + s := builder.String() + log.Println("C->S >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + log.Println(s) + _, err = client.connRW.WriteString(s) + if err != nil { + return + } + client.connRW.Flush() + + if !needResp { + return nil, nil + } + lineCount := 0 + statusCode := 200 + status := "" + sid := "" + contentLen := 0 + respHeader := make(map[string]string) + var line []byte + builder.Reset() + for !client.Stoped { + isPrefix := false + if line, isPrefix, err = client.connRW.ReadLine(); err != nil { + return + } else { + if len(line) == 0 { + body := "" + if contentLen > 0 { + content := make([]byte, contentLen) + _, err = io.ReadFull(client.connRW, content) + if err != nil { + err = fmt.Errorf("Read content err.ContentLength:%d", contentLen) + return + } + body = string(content) + builder.Write(content) + } + resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, body) + + log.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + log.Println(builder.String()) + return + } + s := string(line) + builder.Write(line) + if !isPrefix { + builder.WriteString("\r\n") + } + + if lineCount == 0 { + splits := strings.Split(s, " ") + if len(splits) < 3 { + err = fmt.Errorf("StatusCode Line error:%s", s) + return + } + statusCode, err = strconv.Atoi(splits[1]) + if err != nil { + return + } + if statusCode != 200 { + err = fmt.Errorf("Response StatusCode is :%d", statusCode) + return + } + status = splits[2] + } + lineCount++ + splits := strings.Split(s, ":") + if len(splits) == 2 { + respHeader[splits[0]] = strings.TrimSpace(splits[1]) + } + if strings.Index(s, "Session:") == 0 { + splits := strings.Split(s, ":") + sid = strings.TrimSpace(splits[1]) + } + //if strings.Index(s, "CSeq:") == 0 { + // splits := strings.Split(s, ":") + // cseq, err = strconv.Atoi(strings.TrimSpace(splits[1])) + // if err != nil { + // err = fmt.Errorf("Atoi CSeq err. line:%s", s) + // return + // } + //} + if strings.Index(s, "Content-Length:") == 0 { + splits := strings.Split(s, ":") + contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) + if err != nil { + return + } + } + } + } + if client.Stoped { + err = fmt.Errorf("Client Stoped.") + } + return +} + +func (client *RTSPClient) Request(method string, headers map[string]string) (resp *Response, err error) { + return client.RequestWithPath(method, client.URL, headers, true) +} + +func (client *RTSPClient) RequestNoResp(method string, headers map[string]string) (err error) { + if _, err := client.RequestWithPath(method, client.URL, headers, false); err != nil { + return err + } + return nil +} diff --git a/rtsp/rtsp-server.go b/rtsp/rtsp-server.go index 6c7250f6..892b5740 100644 --- a/rtsp/rtsp-server.go +++ b/rtsp/rtsp-server.go @@ -4,23 +4,32 @@ import ( "fmt" "log" "net" + "os" + "os/exec" + "path" "sync" + "syscall" + "time" "github.com/penggy/EasyGoLib/utils" ) type Server struct { - TCPListener *net.TCPListener - TCPPort int - Stoped bool - pushers map[string]*Pusher // Path <-> Pusher - pushersLock sync.RWMutex + TCPListener *net.TCPListener + TCPPort int + Stoped bool + pushers map[string]*Pusher // Path <-> Pusher + pushersLock sync.RWMutex + addPusherCh chan *Pusher + removePusherCh chan *Pusher } var Instance *Server = &Server{ - Stoped: true, - TCPPort: utils.Conf().Section("rtsp").Key("port").MustInt(554), - pushers: make(map[string]*Pusher), + Stoped: true, + TCPPort: utils.Conf().Section("rtsp").Key("port").MustInt(554), + pushers: make(map[string]*Pusher), + addPusherCh: make(chan *Pusher), + removePusherCh: make(chan *Pusher), } func GetServer() *Server { @@ -37,6 +46,80 @@ func (server *Server) Start() (err error) { return } + localRecord := utils.Conf().Section("rtsp").Key("save_stream_to_mp4").MustInt(0) + ffmpeg := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("") + mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("") + SaveStreamToLocal := false + if (len(ffmpeg) > 0) && localRecord > 0 && len(mp4Path) > 0 { + err := utils.EnsureDir(mp4Path) + if err != nil { + log.Printf("Create mp4_dir_path[%s] err:%v.", mp4Path, err) + } else { + SaveStreamToLocal = true + } + } + go func() { // save to local. + pusher2ffmpegMap := make(map[*Pusher]*exec.Cmd) + if SaveStreamToLocal { + log.Printf("Prepare to save stream to local....") + defer log.Printf("End save stream to local....") + } + var pusher *Pusher + addChnOk := true + removeChnOk := true + for addChnOk || removeChnOk { + select { + case pusher, addChnOk = <-server.addPusherCh: + if SaveStreamToLocal { + if addChnOk { + dir := path.Join(mp4Path, pusher.Path()) + err := utils.EnsureDir(dir) + if err != nil { + log.Printf("EnsureDir:[%s] err:%v.", dir, err) + continue + } + path := path.Join(dir, fmt.Sprintf("%s.mp4", time.Now().Format("20060102150405"))) + cmd := exec.Command(ffmpeg, "-i", pusher.URL(), "-c:v", "copy", "-c:a", "copy", path) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err = cmd.Start() + if err != nil { + log.Printf("Start ffmpeg err:%v", err) + } + pusher2ffmpegMap[pusher] = cmd + log.Printf("add ffmpeg to pull stream from pusher[%v]", pusher) + } else { + log.Printf("addPusherChan closed") + } + } + case pusher, removeChnOk = <-server.removePusherCh: + if SaveStreamToLocal { + if removeChnOk { + cmd := pusher2ffmpegMap[pusher] + proc := cmd.Process + if proc != nil { + log.Printf("prepare to SIGTERM to process:%v", proc) + proc.Signal(syscall.SIGTERM) + // proc.Kill() + } + delete(pusher2ffmpegMap, pusher) + log.Printf("delete ffmpeg from pull stream from pusher[%v]", pusher) + } else { + for _, cmd := range pusher2ffmpegMap { + proc := cmd.Process + if proc != nil { + log.Printf("prepare to SIGTERM to process:%v", proc) + proc.Signal(syscall.SIGTERM) + } + } + pusher2ffmpegMap = make(map[*Pusher]*exec.Cmd) + log.Printf("removePusherChan closed") + } + } + } + } + }() + server.Stoped = false server.TCPListener = listener log.Println("rtsp server start on", server.TCPPort) @@ -69,25 +152,38 @@ func (server *Server) Stop() { server.pushersLock.Lock() server.pushers = make(map[string]*Pusher) server.pushersLock.Unlock() + + close(server.addPusherCh) + close(server.removePusherCh) } func (server *Server) AddPusher(pusher *Pusher) { + added := false server.pushersLock.Lock() - if _, ok := server.pushers[pusher.Path]; !ok { - server.pushers[pusher.Path] = pusher + if _, ok := server.pushers[pusher.Path()]; !ok { + server.pushers[pusher.Path()] = pusher go pusher.Start() log.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) + added = true } server.pushersLock.Unlock() + if added { + server.addPusherCh <- pusher + } } func (server *Server) RemovePusher(pusher *Pusher) { + removed := false server.pushersLock.Lock() - if _pusher, ok := server.pushers[pusher.Path]; ok && pusher.ID == _pusher.ID { - delete(server.pushers, pusher.Path) + if _pusher, ok := server.pushers[pusher.Path()]; ok && pusher.ID() == _pusher.ID() { + delete(server.pushers, pusher.Path()) log.Printf("%v end, now pusher size[%d]\n", pusher, len(server.pushers)) + removed = true } server.pushersLock.Unlock() + if removed { + server.removePusherCh <- pusher + } } func (server *Server) GetPusher(path string) (pusher *Pusher) { diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index e48803d7..1393d8a0 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -349,11 +349,11 @@ func (session *Session) handleRequest(req *Request) { } session.Player = NewPlayer(session, pusher) session.Pusher = pusher - session.AControl = pusher.AControl - session.VControl = pusher.VControl - session.ACodec = pusher.ACodec - session.VCodec = pusher.VCodec - res.SetBody(session.Pusher.SDPRaw) + session.AControl = pusher.AControl() + session.VControl = pusher.VControl() + session.ACodec = pusher.ACodec() + session.VCodec = pusher.VCodec() + res.SetBody(session.Pusher.SDPRaw()) case "SETUP": ts := req.Header["Transport"] control := req.URL[strings.LastIndex(req.URL, "/")+1:]