support pull and distribute;支持拉流转发

support save stream to file;支持存储流到本地
pull/132/head
macbookpro 2018-11-17 10:23:00 +08:00
parent 82dd483566
commit 909ffa0f2b
6 changed files with 710 additions and 292 deletions

View File

@ -50,7 +50,7 @@ func (h *APIHandler) Pushers(c *gin.Context) {
hostname := utils.GetRequestHostname(c.Request)
pushers := make([]interface{}, 0)
for _, pusher := range rtsp.Instance.GetPushers() {
port := pusher.Server.TCPPort
port := pusher.Server().TCPPort
rtsp := fmt.Sprintf("rtsp://%s:%d%s", hostname, port, pusher.Path)
if port == 554 {
rtsp = fmt.Sprintf("rtsp://%s%s", hostname, pusher.Path)
@ -62,8 +62,8 @@ func (h *APIHandler) Pushers(c *gin.Context) {
"id": pusher.ID,
"path": rtsp,
"transType": pusher.TransType.String(),
"inBytes": pusher.InBytes,
"outBytes": pusher.OutBytes,
"inBytes": pusher.InBytes(),
"outBytes": pusher.OutBytes(),
"startAt": utils.DateTime(pusher.StartAt),
"onlines": len(pusher.GetPlayers()),
})

View File

@ -1,264 +0,0 @@
package rtsp
import (
"bufio"
"fmt"
"github.com/reactivex/rxgo/observable"
"io"
"net"
"net/url"
"strconv"
"strings"
)
type PlayerClient struct {
Stoped bool
Path string
Conn *net.Conn
AuthHeaders bool
Session *string
Seq int
connRW *bufio.ReadWriter
InBytes uint64
}
func NewPlayerClient(path string) *PlayerClient {
session := &PlayerClient{
Stoped: false,
Path: path,
}
return session
}
func (client *PlayerClient) Start() observable.Observable {
return observable.Start(func() interface{} {
l, err := url.Parse(client.Path)
if err != nil {
return err
}
conn, err := net.Dial("tcp", l.Hostname()+":"+l.Port())
if err != nil {
// handle error
return err
}
client.Conn = &conn
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(conn, 10240), bufio.NewWriterSize(conn, 10240))
headers := make(map[string]string)
headers["Require"] = "implicit-play"
// An OPTIONS request returns the request types the server will accept.
resp, err := client.Request("OPTIONS", headers)
if err != nil {
return err
}
fmt.Println("StatusCode:", resp.StatusCode)
// A DESCRIBE request includes an RTSP URL (rtsp://...), and the type of reply data that can be handled. This reply includes the presentation description,
// typically in Session Description Protocol (SDP) format. Among other things, the presentation description lists the media streams controlled with the aggregate URL.
// In the typical case, there is one media stream each for audio and video.
headers = make(map[string]string)
headers["Accept"] = "application/sdp"
resp, err = client.Request("DESCRIBE", headers)
if err != nil {
return err
}
//fmt.Println("StatusCode:",resp.StatusCode)
headers = make(map[string]string)
headers["Transport"] = "RTP/AVP;unicast;client_port=8000-8001"
resp, err = client.Request("SETUP", headers)
if err != nil {
return err
}
//fmt.Println("StatusCode:",resp.StatusCode)
////fmt.Fprintf(conn, "GET / HTTP/1.0\r\n\r\n")
////status, err := bufio.NewReader(conn).ReadString('\n')
////url.Host
////text, err := reader.ReadString('\n')
////if err != nil {
//// return err
////}
//return text
return 0
})
//return observable.Just(1)
}
func (client *PlayerClient) Request(method string, headers map[string]string) (resp *Response, err error) {
headers["User-Agent"] = "EasyDarwinGo"
if client.AuthHeaders {
//headers["Authorization"] = this.digest(method, _url);
}
if client.Session != nil {
headers["Session"] = *client.Session
}
client.Seq++
cseq := client.Seq
builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, client.Path))
builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", cseq))
for k, v := range headers {
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
}
builder.WriteString(fmt.Sprintf("\r\n"))
s := builder.String()
fmt.Println("C->S >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
fmt.Println(s)
_, err = client.connRW.WriteString(s)
if err != nil {
return
}
client.connRW.Flush()
lineCount := 0
statusCode := 200
status := ""
sid := ""
contentLen := 0
respHeader := make(map[string]string)
var line []byte
builder.Reset()
for !client.Stoped {
if line, _, err = client.connRW.ReadLine(); err != nil {
return
} else {
if len(line) == 0 {
fmt.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
fmt.Println(builder.String())
resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, "")
return
}
s := string(line)
builder.Write(line)
builder.WriteString("\r\n")
if lineCount == 0 {
splits := strings.Split(s, " ")
if len(splits) < 3 {
err = fmt.Errorf("StatusCode Line error:%s", s)
return
}
statusCode, err = strconv.Atoi(splits[1])
if err != nil {
return
}
if statusCode != 200 {
err = fmt.Errorf("Response StatusCode is :%d", statusCode)
return
}
status = splits[2]
}
lineCount++
splits := strings.Split(s, ":")
if len(splits) == 2 {
respHeader[splits[0]] = strings.TrimSpace(splits[1])
}
if strings.Index(s, "Session:") == 0 {
splits := strings.Split(s, ":")
sid = strings.TrimSpace(splits[1])
}
//if strings.Index(s, "CSeq:") == 0 {
// splits := strings.Split(s, ":")
// cseq, err = strconv.Atoi(strings.TrimSpace(splits[1]))
// if err != nil {
// err = fmt.Errorf("Atoi CSeq err. line:%s", s)
// return
// }
//}
if strings.Index(s, "Content-Length:") == 0 {
splits := strings.Split(s, ":")
contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1]))
if err != nil {
return
}
content := make([]byte, contentLen)
_, err = io.ReadFull(client.connRW, content)
if err != nil {
err = fmt.Errorf("Read content err.ContentLength:%d", contentLen)
return
}
body := string(content)
builder.Write(content)
resp = &Response{
Body: body,
Status: status,
StatusCode: statusCode,
}
fmt.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
fmt.Println(builder.String())
return
}
}
}
return
}
/*
request(method, headers, _url) {
_url = _url || `${this.origin}${this.pathname}${this.search}`;
headers = headers || {};
headers["User-Agent"] = "EasyDarwin";
if(this.authHeaders) {
headers["Authorization"] = this.digest(method, _url);
}
if(this.session) {
headers["Session"] = this.session;
}
var cseq = ++this.cseq;
var req = `${method} ${_url} RTSP/1.0\r\n`;
req += `CSeq: ${cseq}\r\n`;
req += Object.keys(headers).map(header => {
return `${header}: ${headers[header]}\r\n`
}).join("");
console.log(`[RTSP Client][${utils.formatDateTime()}] >>>>>> ${req}`);
this.socket.write(`${req}\r\n`);
return new Promise((resolve, reject) => {
var timer;
var reqHeaders = headers;
var listener = (statusLine, headers, body) => {
if(headers["CSeq"] != cseq) {
console.log(`Bad RTSP CSeq [${headers["CSeq"]}], want[${cseq}]!`);
return;
}
timer && clearTimeout(timer);
this.removeListener("response", listener);
var code = statusLine.split(/\s/)[1];
if(code == "200") {
resolve({
headers: headers,
body: body
})
return;
}
if(code == "401" && !this.authHeaders) {
var type = headers[WWW_AUTH].split(" ")[0];
this.authHeaders = {
type: type
};
var reg = new RegExp('([a-z]+)=\"([^,\s]+)\"',"g");
var matchs = reg.exec(headers[WWW_AUTH]);
while(matchs) {
this.authHeaders[matchs[1]] = matchs[2];
matchs = reg.exec(headers[WWW_AUTH]);
}
resolve(this.request(method, reqHeaders, _url));
return;
}
reject(new Error(`Bad RTSP status code ${code}!`));
return;
}
var timeout = utils.db.get("rtspClientTimeout").cloneDeep().value() || 5;
if(!isNaN(timeout)){
timer = setTimeout(() => {
this.removeListener("response", listener);
reject(new Error(`${method} timeout`));
}, timeout * 1000);
}
this.on("response", listener);
})
}
*/

View File

@ -10,7 +10,7 @@ import (
type Pusher struct {
*Session
*RTSPClient
players map[string]*Player //SessionID <-> Player
playersLock sync.RWMutex
gopCacheEnable bool
@ -22,9 +22,130 @@ type Pusher struct {
queue []*RTPPack
}
func (pusher *Pusher) String() string {
if pusher.Session != nil {
return pusher.Session.String()
}
return pusher.RTSPClient.String()
}
func (pusher *Pusher) Server() *Server {
if pusher.Session != nil {
return pusher.Session.Server
}
return pusher.RTSPClient.Server
}
func (pusher *Pusher) SDPRaw() string {
if pusher.Session != nil {
return pusher.Session.SDPRaw
}
return pusher.RTSPClient.SDPRaw
}
func (pusher *Pusher) Stoped() bool {
if pusher.Session != nil {
return pusher.Session.Stoped
}
return pusher.RTSPClient.Stoped
}
func (pusher *Pusher) Path() string {
if pusher.Session != nil {
return pusher.Session.Path
}
return pusher.RTSPClient.Path
}
func (pusher *Pusher) ID() string {
if pusher.Session != nil {
return pusher.Session.ID
}
return pusher.RTSPClient.ID
}
func (pusher *Pusher) VCodec() string {
if pusher.Session != nil {
return pusher.Session.VCodec
}
return pusher.RTSPClient.VCodec
}
func (pusher *Pusher) ACodec() string {
if pusher.Session != nil {
return pusher.Session.ACodec
}
return pusher.RTSPClient.ACodec
}
func (pusher *Pusher) AControl() string {
if pusher.Session != nil {
return pusher.Session.AControl
}
return pusher.RTSPClient.AControl
}
func (pusher *Pusher) VControl() string {
if pusher.Session != nil {
return pusher.Session.VControl
}
return pusher.RTSPClient.VControl
}
func (pusher *Pusher) URL() string {
if pusher.Session != nil {
return pusher.Session.URL
}
return pusher.RTSPClient.URL
}
func (pusher *Pusher) AddOutputBytes(size int) {
if pusher.Session != nil {
pusher.Session.OutBytes += size
return
}
pusher.RTSPClient.OutBytes += size
}
func (pusher *Pusher) InBytes() int {
if pusher.Session != nil {
return pusher.Session.InBytes
}
return pusher.RTSPClient.InBytes
}
func (pusher *Pusher) OutBytes() int {
if pusher.Session != nil {
return pusher.Session.OutBytes
}
return pusher.RTSPClient.OutBytes
}
func NewClientPusher(client *RTSPClient) (pusher *Pusher) {
pusher = &Pusher{
RTSPClient: client,
Session: nil,
players: make(map[string]*Player),
gopCacheEnable: utils.Conf().Section("rtsp").Key("gop_cache_enable").MustBool(true),
gopCache: make([]*RTPPack, 0),
cond: sync.NewCond(&sync.Mutex{}),
queue: make([]*RTPPack, 0),
}
client.RTPHandles = append(client.RTPHandles, func(pack *RTPPack) {
pusher.QueueRTP(pack)
})
client.StopHandles = append(client.StopHandles, func() {
pusher.Server().RemovePusher(pusher)
pusher.cond.Broadcast()
})
return
}
func NewPusher(session *Session) (pusher *Pusher) {
pusher = &Pusher{
Session: session,
RTSPClient: nil,
players: make(map[string]*Player),
gopCacheEnable: utils.Conf().Section("rtsp").Key("gop_cache_enable").MustBool(true),
gopCache: make([]*RTPPack, 0),
@ -36,7 +157,7 @@ func NewPusher(session *Session) (pusher *Pusher) {
pusher.QueueRTP(pack)
})
session.StopHandles = append(session.StopHandles, func() {
pusher.Server.RemovePusher(pusher)
pusher.Server().RemovePusher(pusher)
pusher.cond.Broadcast()
if pusher.UDPServer != nil {
pusher.UDPServer.Stop()
@ -55,7 +176,7 @@ func (pusher *Pusher) QueueRTP(pack *RTPPack) *Pusher {
}
func (pusher *Pusher) Start() {
for !pusher.Stoped {
for !pusher.Stoped() {
var pack *RTPPack
pusher.cond.L.Lock()
if len(pusher.queue) == 0 {
@ -67,7 +188,7 @@ func (pusher *Pusher) Start() {
}
pusher.cond.L.Unlock()
if pack == nil {
if !pusher.Stoped {
if !pusher.Stoped() {
log.Printf("pusher not stoped, but queue take out nil pack")
}
continue
@ -75,12 +196,12 @@ func (pusher *Pusher) Start() {
if pusher.gopCacheEnable {
pusher.gopCacheLock.Lock()
if strings.EqualFold(pusher.VCodec, "h264") {
if strings.EqualFold(pusher.VCodec(), "h264") {
if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && rtp.IsKeyframeStart() {
pusher.gopCache = make([]*RTPPack, 0)
}
pusher.gopCache = append(pusher.gopCache, pack)
} else if strings.EqualFold(pusher.VCodec, "h265") {
} else if strings.EqualFold(pusher.VCodec(), "h265") {
if rtp := ParseRTP(pack.Buffer.Bytes()); rtp != nil && rtp.IsKeyframeStartH265() {
pusher.gopCache = make([]*RTPPack, 0)
}
@ -96,7 +217,7 @@ func (pusher *Pusher) Start() {
func (pusher *Pusher) BroadcastRTP(pack *RTPPack) *Pusher {
for _, player := range pusher.GetPlayers() {
player.QueueRTP(pack)
pusher.OutBytes += pack.Buffer.Len()
pusher.AddOutputBytes(pack.Buffer.Len())
}
return pusher
}
@ -116,7 +237,7 @@ func (pusher *Pusher) AddPlayer(player *Player) *Pusher {
pusher.gopCacheLock.RLock()
for _, pack := range pusher.gopCache {
player.QueueRTP(pack)
pusher.OutBytes += pack.Buffer.Len()
pusher.AddOutputBytes(pack.Buffer.Len())
}
pusher.gopCacheLock.RUnlock()
}

465
rtsp/rtsp-client.go Normal file
View File

@ -0,0 +1,465 @@
package rtsp
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/pixelbender/go-sdp/sdp"
"github.com/reactivex/rxgo/observable"
)
type RTSPClient struct {
Server *Server
Stoped bool
Status string
URL string
Path string
ID string
Conn net.Conn
AuthHeaders bool
Session *string
Seq int
connRW *bufio.ReadWriter
InBytes int
OutBytes int
Sdp *sdp.Session
AControl string
VControl string
ACodec string
VCodec string
OptionIntervalMillis int64
SDPRaw string
//tcp channels
aRTPChannel int
aRTPControlChannel int
vRTPChannel int
vRTPControlChannel int
RTPHandles []func(*RTPPack)
StopHandles []func()
}
func (client *RTSPClient) String() string {
return fmt.Sprintf("client[%s]", client.URL)
}
func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) *RTSPClient {
url, err := url.Parse(rawUrl)
if err != nil {
return nil
}
client := &RTSPClient{
Server: server,
Stoped: false,
URL: rawUrl,
ID: url.Path,
Path: url.Path,
vRTPChannel: 0,
vRTPControlChannel: 1,
aRTPChannel: 2,
aRTPControlChannel: 3,
OptionIntervalMillis: sendOptionMillis,
}
return client
}
func (client *RTSPClient) Start() observable.Observable {
source := make(chan interface{})
requestStream := func() interface{} {
l, err := url.Parse(client.URL)
setStatus := func() {
if err != nil {
client.Status = "Error"
} else {
client.Status = "OK"
}
}
defer setStatus()
if err != nil {
return err
}
conn, err := net.Dial("tcp", l.Hostname()+":"+l.Port())
if err != nil {
// handle error
return err
}
client.Conn = conn
client.connRW = bufio.NewReadWriter(bufio.NewReaderSize(conn, 10240), bufio.NewWriterSize(conn, 10240))
headers := make(map[string]string)
headers["Require"] = "implicit-play"
// An OPTIONS request returns the request types the server will accept.
resp, err := client.Request("OPTIONS", headers)
if err != nil {
return err
}
// A DESCRIBE request includes an RTSP URL (rtsp://...), and the type of reply data that can be handled. This reply includes the presentation description,
// typically in Session Description Protocol (SDP) format. Among other things, the presentation description lists the media streams controlled with the aggregate URL.
// In the typical case, there is one media stream each for audio and video.
headers = make(map[string]string)
headers["Accept"] = "application/sdp"
resp, err = client.Request("DESCRIBE", headers)
if err != nil {
return err
}
sess, err := sdp.ParseString(resp.Body)
if err != nil {
return err
}
client.Sdp = sess
client.SDPRaw = resp.Body
for _, media := range sess.Media {
switch media.Type {
case "video":
client.VControl = media.Attributes.Get("control")
client.VCodec = media.Formats[0].Name
var _url = ""
if strings.Index(strings.ToLower(client.VControl), "rtsp://") == 0 {
_url = client.VControl
} else {
_url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.VControl, "/")
}
headers = make(map[string]string)
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.vRTPChannel, client.vRTPControlChannel)
resp, err = client.RequestWithPath("SETUP", _url, headers, true)
if err != nil {
return err
}
case "audio":
client.AControl = media.Attributes.Get("control")
client.VCodec = media.Formats[0].Name
var _url = ""
if strings.Index(strings.ToLower(client.AControl), "rtsp://") == 0 {
_url = client.AControl
} else {
_url = strings.TrimRight(client.URL, "/") + "/" + strings.TrimLeft(client.AControl, "/")
}
headers = make(map[string]string)
headers["Transport"] = fmt.Sprintf("RTP/AVP/TCP;unicast;interleaved=%d-%d", client.aRTPChannel, client.aRTPControlChannel)
resp, err = client.RequestWithPath("SETUP", _url, headers, true)
if err != nil {
return err
}
}
}
headers = make(map[string]string)
resp, err = client.Request("PLAY", headers)
if err != nil {
return err
}
return 0
}
stream := func(ch chan interface{}) {
OptionIntervalMillis := client.OptionIntervalMillis
startTime := time.Now()
loggerTime := time.Now().Add(-10 * time.Second)
defer func() {
if client.Stoped {
close(ch)
}
}()
for !client.Stoped {
if OptionIntervalMillis > 0 {
elapse := time.Now().Sub(startTime)
if elapse > time.Duration(OptionIntervalMillis*int64(time.Millisecond)) {
startTime = time.Now()
headers := make(map[string]string)
headers["Require"] = "implicit-play"
// An OPTIONS request returns the request types the server will accept.
if err := client.RequestNoResp("OPTIONS", headers); err != nil {
// ignore...
//ch <- err
//return
}
}
}
b, err := client.connRW.ReadByte()
if err != nil {
if !client.Stoped {
log.Printf("client.connRW.ReadByte err:%v", err)
ch <- err
}
return
}
switch b {
case 0x24: // rtp
header := make([]byte, 4)
header[0] = b
_, err := io.ReadFull(client.connRW, header[1:])
if err != nil {
if !client.Stoped {
ch <- err
log.Printf("io.ReadFull err:%v", err)
}
return
}
channel := int(header[1])
length := binary.BigEndian.Uint16(header[2:])
content := make([]byte, length)
_, err = io.ReadFull(client.connRW, content)
if err != nil {
if !client.Stoped {
ch <- err
log.Printf("io.ReadFull err:%v", err)
}
return
}
//ch <- append(header, content...)
rtpBuf := bytes.NewBuffer(content)
var pack *RTPPack
switch channel {
case client.aRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIO,
Buffer: rtpBuf,
}
case client.aRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL,
Buffer: rtpBuf,
}
case client.vRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEO,
Buffer: rtpBuf,
}
case client.vRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL,
Buffer: rtpBuf,
}
default:
log.Printf("unknow rtp pack type, channel:%v", channel)
continue
}
if pack == nil {
log.Printf("session tcp got nil rtp pack")
continue
}
elapsed := time.Now().Sub(loggerTime)
if elapsed >= 10*time.Second {
log.Printf("client[%v]read rtp frame.", client)
loggerTime = time.Now()
}
client.InBytes += int(length + 4)
for _, h := range client.RTPHandles {
h(pack)
}
default: // rtsp
builder := strings.Builder{}
builder.WriteByte(b)
contentLen := 0
for !client.Stoped {
line, prefix, err := client.connRW.ReadLine()
if err != nil {
if !client.Stoped {
ch <- err
log.Printf("client.connRW.ReadLine err:%v", err)
}
return
}
if len(line) == 0 {
if contentLen != 0 {
content := make([]byte, contentLen)
_, err = io.ReadFull(client.connRW, content)
if err != nil {
if !client.Stoped {
err = fmt.Errorf("Read content err.ContentLength:%d", contentLen)
ch <- err
}
return
}
builder.Write(content)
}
log.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
log.Println(builder.String())
break
}
s := string(line)
builder.Write(line)
if !prefix {
builder.WriteString("\r\n")
}
if strings.Index(s, "Content-Length:") == 0 {
splits := strings.Split(s, ":")
contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1]))
if err != nil {
if !client.Stoped {
ch <- err
log.Printf("strconv.Atoi err:%v, str:%v", err, splits[1])
}
return
}
}
}
}
}
}
go func() {
r := requestStream()
source <- r
switch r.(type) {
case error:
return
}
stream(source)
}()
return observable.Observable(source)
//return observable.Just(1)
}
func (client *RTSPClient) Stop() {
if client.Stoped {
return
}
client.Stoped = true
for _, h := range client.StopHandles {
h()
}
if client.Conn != nil {
client.connRW.Flush()
client.Conn.Close()
client.Conn = nil
}
}
func (client *RTSPClient) RequestWithPath(method string, path string, headers map[string]string, needResp bool) (resp *Response, err error) {
headers["User-Agent"] = "EasyDarwinGo"
if client.AuthHeaders {
//headers["Authorization"] = this.digest(method, _url);
}
if client.Session != nil {
headers["Session"] = *client.Session
}
client.Seq++
cseq := client.Seq
builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("%s %s RTSP/1.0\r\n", method, path))
builder.WriteString(fmt.Sprintf("CSeq: %d\r\n", cseq))
for k, v := range headers {
builder.WriteString(fmt.Sprintf("%s: %s\r\n", k, v))
}
builder.WriteString(fmt.Sprintf("\r\n"))
s := builder.String()
log.Println("C->S >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
log.Println(s)
_, err = client.connRW.WriteString(s)
if err != nil {
return
}
client.connRW.Flush()
if !needResp {
return nil, nil
}
lineCount := 0
statusCode := 200
status := ""
sid := ""
contentLen := 0
respHeader := make(map[string]string)
var line []byte
builder.Reset()
for !client.Stoped {
isPrefix := false
if line, isPrefix, err = client.connRW.ReadLine(); err != nil {
return
} else {
if len(line) == 0 {
body := ""
if contentLen > 0 {
content := make([]byte, contentLen)
_, err = io.ReadFull(client.connRW, content)
if err != nil {
err = fmt.Errorf("Read content err.ContentLength:%d", contentLen)
return
}
body = string(content)
builder.Write(content)
}
resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, body)
log.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
log.Println(builder.String())
return
}
s := string(line)
builder.Write(line)
if !isPrefix {
builder.WriteString("\r\n")
}
if lineCount == 0 {
splits := strings.Split(s, " ")
if len(splits) < 3 {
err = fmt.Errorf("StatusCode Line error:%s", s)
return
}
statusCode, err = strconv.Atoi(splits[1])
if err != nil {
return
}
if statusCode != 200 {
err = fmt.Errorf("Response StatusCode is :%d", statusCode)
return
}
status = splits[2]
}
lineCount++
splits := strings.Split(s, ":")
if len(splits) == 2 {
respHeader[splits[0]] = strings.TrimSpace(splits[1])
}
if strings.Index(s, "Session:") == 0 {
splits := strings.Split(s, ":")
sid = strings.TrimSpace(splits[1])
}
//if strings.Index(s, "CSeq:") == 0 {
// splits := strings.Split(s, ":")
// cseq, err = strconv.Atoi(strings.TrimSpace(splits[1]))
// if err != nil {
// err = fmt.Errorf("Atoi CSeq err. line:%s", s)
// return
// }
//}
if strings.Index(s, "Content-Length:") == 0 {
splits := strings.Split(s, ":")
contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1]))
if err != nil {
return
}
}
}
}
if client.Stoped {
err = fmt.Errorf("Client Stoped.")
}
return
}
func (client *RTSPClient) Request(method string, headers map[string]string) (resp *Response, err error) {
return client.RequestWithPath(method, client.URL, headers, true)
}
func (client *RTSPClient) RequestNoResp(method string, headers map[string]string) (err error) {
if _, err := client.RequestWithPath(method, client.URL, headers, false); err != nil {
return err
}
return nil
}

View File

@ -4,23 +4,32 @@ import (
"fmt"
"log"
"net"
"os"
"os/exec"
"path"
"sync"
"syscall"
"time"
"github.com/penggy/EasyGoLib/utils"
)
type Server struct {
TCPListener *net.TCPListener
TCPPort int
Stoped bool
pushers map[string]*Pusher // Path <-> Pusher
pushersLock sync.RWMutex
TCPListener *net.TCPListener
TCPPort int
Stoped bool
pushers map[string]*Pusher // Path <-> Pusher
pushersLock sync.RWMutex
addPusherCh chan *Pusher
removePusherCh chan *Pusher
}
var Instance *Server = &Server{
Stoped: true,
TCPPort: utils.Conf().Section("rtsp").Key("port").MustInt(554),
pushers: make(map[string]*Pusher),
Stoped: true,
TCPPort: utils.Conf().Section("rtsp").Key("port").MustInt(554),
pushers: make(map[string]*Pusher),
addPusherCh: make(chan *Pusher),
removePusherCh: make(chan *Pusher),
}
func GetServer() *Server {
@ -37,6 +46,80 @@ func (server *Server) Start() (err error) {
return
}
localRecord := utils.Conf().Section("rtsp").Key("save_stream_to_mp4").MustInt(0)
ffmpeg := utils.Conf().Section("rtsp").Key("ffmpeg_path").MustString("")
mp4Path := utils.Conf().Section("rtsp").Key("mp4_dir_path").MustString("")
SaveStreamToLocal := false
if (len(ffmpeg) > 0) && localRecord > 0 && len(mp4Path) > 0 {
err := utils.EnsureDir(mp4Path)
if err != nil {
log.Printf("Create mp4_dir_path[%s] err:%v.", mp4Path, err)
} else {
SaveStreamToLocal = true
}
}
go func() { // save to local.
pusher2ffmpegMap := make(map[*Pusher]*exec.Cmd)
if SaveStreamToLocal {
log.Printf("Prepare to save stream to local....")
defer log.Printf("End save stream to local....")
}
var pusher *Pusher
addChnOk := true
removeChnOk := true
for addChnOk || removeChnOk {
select {
case pusher, addChnOk = <-server.addPusherCh:
if SaveStreamToLocal {
if addChnOk {
dir := path.Join(mp4Path, pusher.Path())
err := utils.EnsureDir(dir)
if err != nil {
log.Printf("EnsureDir:[%s] err:%v.", dir, err)
continue
}
path := path.Join(dir, fmt.Sprintf("%s.mp4", time.Now().Format("20060102150405")))
cmd := exec.Command(ffmpeg, "-i", pusher.URL(), "-c:v", "copy", "-c:a", "copy", path)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err = cmd.Start()
if err != nil {
log.Printf("Start ffmpeg err:%v", err)
}
pusher2ffmpegMap[pusher] = cmd
log.Printf("add ffmpeg to pull stream from pusher[%v]", pusher)
} else {
log.Printf("addPusherChan closed")
}
}
case pusher, removeChnOk = <-server.removePusherCh:
if SaveStreamToLocal {
if removeChnOk {
cmd := pusher2ffmpegMap[pusher]
proc := cmd.Process
if proc != nil {
log.Printf("prepare to SIGTERM to process:%v", proc)
proc.Signal(syscall.SIGTERM)
// proc.Kill()
}
delete(pusher2ffmpegMap, pusher)
log.Printf("delete ffmpeg from pull stream from pusher[%v]", pusher)
} else {
for _, cmd := range pusher2ffmpegMap {
proc := cmd.Process
if proc != nil {
log.Printf("prepare to SIGTERM to process:%v", proc)
proc.Signal(syscall.SIGTERM)
}
}
pusher2ffmpegMap = make(map[*Pusher]*exec.Cmd)
log.Printf("removePusherChan closed")
}
}
}
}
}()
server.Stoped = false
server.TCPListener = listener
log.Println("rtsp server start on", server.TCPPort)
@ -69,25 +152,38 @@ func (server *Server) Stop() {
server.pushersLock.Lock()
server.pushers = make(map[string]*Pusher)
server.pushersLock.Unlock()
close(server.addPusherCh)
close(server.removePusherCh)
}
func (server *Server) AddPusher(pusher *Pusher) {
added := false
server.pushersLock.Lock()
if _, ok := server.pushers[pusher.Path]; !ok {
server.pushers[pusher.Path] = pusher
if _, ok := server.pushers[pusher.Path()]; !ok {
server.pushers[pusher.Path()] = pusher
go pusher.Start()
log.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers))
added = true
}
server.pushersLock.Unlock()
if added {
server.addPusherCh <- pusher
}
}
func (server *Server) RemovePusher(pusher *Pusher) {
removed := false
server.pushersLock.Lock()
if _pusher, ok := server.pushers[pusher.Path]; ok && pusher.ID == _pusher.ID {
delete(server.pushers, pusher.Path)
if _pusher, ok := server.pushers[pusher.Path()]; ok && pusher.ID() == _pusher.ID() {
delete(server.pushers, pusher.Path())
log.Printf("%v end, now pusher size[%d]\n", pusher, len(server.pushers))
removed = true
}
server.pushersLock.Unlock()
if removed {
server.removePusherCh <- pusher
}
}
func (server *Server) GetPusher(path string) (pusher *Pusher) {

View File

@ -349,11 +349,11 @@ func (session *Session) handleRequest(req *Request) {
}
session.Player = NewPlayer(session, pusher)
session.Pusher = pusher
session.AControl = pusher.AControl
session.VControl = pusher.VControl
session.ACodec = pusher.ACodec
session.VCodec = pusher.VCodec
res.SetBody(session.Pusher.SDPRaw)
session.AControl = pusher.AControl()
session.VControl = pusher.VControl()
session.ACodec = pusher.ACodec()
session.VCodec = pusher.VCodec()
res.SetBody(session.Pusher.SDPRaw())
case "SETUP":
ts := req.Header["Transport"]
control := req.URL[strings.LastIndex(req.URL, "/")+1:]