You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
EasyDarwin/rtsp/rtsp-session.go

767 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package rtsp
import (
"bufio"
"bytes"
"crypto/md5"
"encoding/binary"
"fmt"
"io"
"log"
"net"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/EasyDarwin/EasyDarwin/models"
"github.com/penggy/EasyGoLib/db"
"github.com/penggy/EasyGoLib/utils"
"github.com/teris-io/shortid"
)
type RTPPack struct {
Type RTPType
Buffer *bytes.Buffer
}
type SessionType int
const (
SESSION_TYPE_PUSHER SessionType = iota
SESSEION_TYPE_PLAYER
)
func (st SessionType) String() string {
switch st {
case SESSION_TYPE_PUSHER:
return "pusher"
case SESSEION_TYPE_PLAYER:
return "player"
}
return "unknow"
}
type RTPType int
const (
RTP_TYPE_AUDIO RTPType = iota
RTP_TYPE_VIDEO
RTP_TYPE_AUDIOCONTROL
RTP_TYPE_VIDEOCONTROL
)
func (rt RTPType) String() string {
switch rt {
case RTP_TYPE_AUDIO:
return "audio"
case RTP_TYPE_VIDEO:
return "video"
case RTP_TYPE_AUDIOCONTROL:
return "audio control"
case RTP_TYPE_VIDEOCONTROL:
return "video control"
}
return "unknow"
}
type TransType int
const (
TRANS_TYPE_TCP TransType = iota
TRANS_TYPE_UDP
)
func (tt TransType) String() string {
switch tt {
case TRANS_TYPE_TCP:
return "TCP"
case TRANS_TYPE_UDP:
return "UDP"
}
return "unknow"
}
const UDP_BUF_SIZE = 1048576
type Session struct {
SessionLogger
ID string
Server *Server
Conn *RichConn
connRW *bufio.ReadWriter
connWLock sync.RWMutex
Type SessionType
TransType TransType
Path string
URL string
SDPRaw string
SDPMap map[string]*SDPInfo
authorizationEnable bool
nonce string
closeOld bool
debugLogEnable bool
AControl string
VControl string
ACodec string
VCodec string
// stats info
InBytes int
OutBytes int
StartAt time.Time
Timeout int
Stoped bool
//tcp channels
aRTPChannel int
aRTPControlChannel int
vRTPChannel int
vRTPControlChannel int
Pusher *Pusher
Player *Player
UDPClient *UDPClient
RTPHandles []func(*RTPPack)
StopHandles []func()
}
func (session *Session) String() string {
return fmt.Sprintf("session[%v][%v][%s][%s][%s]", session.Type, session.TransType, session.Path, session.ID, session.Conn.RemoteAddr().String())
}
func NewSession(server *Server, conn net.Conn) *Session {
networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(204800)
timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0)
timeoutTCPConn := &RichConn{conn, time.Duration(timeoutMillis) * time.Millisecond}
authorizationEnable := utils.Conf().Section("rtsp").Key("authorization_enable").MustInt(0)
close_old := utils.Conf().Section("rtsp").Key("close_old").MustInt(0)
debugLogEnable := utils.Conf().Section("rtsp").Key("debug_log_enable").MustInt(0)
session := &Session{
ID: shortid.MustGenerate(),
Server: server,
Conn: timeoutTCPConn,
connRW: bufio.NewReadWriter(bufio.NewReaderSize(timeoutTCPConn, networkBuffer), bufio.NewWriterSize(timeoutTCPConn, networkBuffer)),
StartAt: time.Now(),
Timeout: utils.Conf().Section("rtsp").Key("timeout").MustInt(0),
authorizationEnable: authorizationEnable != 0,
debugLogEnable: debugLogEnable != 0,
RTPHandles: make([]func(*RTPPack), 0),
StopHandles: make([]func(), 0),
vRTPChannel: -1,
vRTPControlChannel: -1,
aRTPChannel: -1,
aRTPControlChannel: -1,
closeOld: close_old != 0,
}
session.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", session.ID), log.LstdFlags|log.Lshortfile)
if !utils.Debug {
session.logger.SetOutput(utils.GetLogWriter())
}
return session
}
func (session *Session) Stop() {
if session.Stoped {
return
}
session.Stoped = true
for _, h := range session.StopHandles {
h()
}
if session.Conn != nil {
session.connRW.Flush()
session.Conn.Close()
session.Conn = nil
}
if session.UDPClient != nil {
session.UDPClient.Stop()
session.UDPClient = nil
}
}
func (session *Session) Start() {
defer session.Stop()
buf1 := make([]byte, 1)
buf2 := make([]byte, 2)
logger := session.logger
timer := time.Unix(0, 0)
for !session.Stoped {
if _, err := io.ReadFull(session.connRW, buf1); err != nil {
logger.Println(session, err)
return
}
if buf1[0] == 0x24 { //rtp data
if _, err := io.ReadFull(session.connRW, buf1); err != nil {
logger.Println(err)
return
}
if _, err := io.ReadFull(session.connRW, buf2); err != nil {
logger.Println(err)
return
}
channel := int(buf1[0])
rtpLen := int(binary.BigEndian.Uint16(buf2))
rtpBytes := make([]byte, rtpLen)
if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil {
logger.Println(err)
return
}
rtpBuf := bytes.NewBuffer(rtpBytes)
var pack *RTPPack
switch channel {
case session.aRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIO,
Buffer: rtpBuf,
}
elapsed := time.Now().Sub(timer)
if elapsed >= 30*time.Second {
logger.Println("Recv an audio RTP package")
timer = time.Now()
}
case session.aRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_AUDIOCONTROL,
Buffer: rtpBuf,
}
case session.vRTPChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEO,
Buffer: rtpBuf,
}
elapsed := time.Now().Sub(timer)
if elapsed >= 30*time.Second {
logger.Println("Recv an video RTP package")
timer = time.Now()
}
case session.vRTPControlChannel:
pack = &RTPPack{
Type: RTP_TYPE_VIDEOCONTROL,
Buffer: rtpBuf,
}
default:
logger.Printf("unknow rtp pack type, %v", channel)
continue
}
session.InBytes += rtpLen + 4
for _, h := range session.RTPHandles {
h(pack)
}
} else { // rtsp cmd
reqBuf := bytes.NewBuffer(nil)
reqBuf.Write(buf1)
for !session.Stoped {
if line, isPrefix, err := session.connRW.ReadLine(); err != nil {
logger.Println(err)
return
} else {
reqBuf.Write(line)
if !isPrefix {
reqBuf.WriteString("\r\n")
}
if len(line) == 0 {
req := NewRequest(reqBuf.String())
if req == nil {
break
}
session.InBytes += reqBuf.Len()
contentLen := req.GetContentLength()
session.InBytes += contentLen
if contentLen > 0 {
bodyBuf := make([]byte, contentLen)
if n, err := io.ReadFull(session.connRW, bodyBuf); err != nil {
logger.Println(err)
return
} else if n != contentLen {
logger.Printf("read rtsp request body failed, expect size[%d], got size[%d]", contentLen, n)
return
}
req.Body = string(bodyBuf)
}
session.handleRequest(req)
break
}
}
}
}
}
}
func CheckAuth(authLine string, method string, sessionNonce string) error {
realmRex := regexp.MustCompile(`realm="(.*?)"`)
nonceRex := regexp.MustCompile(`nonce="(.*?)"`)
usernameRex := regexp.MustCompile(`username="(.*?)"`)
responseRex := regexp.MustCompile(`response="(.*?)"`)
uriRex := regexp.MustCompile(`uri="(.*?)"`)
realm := ""
nonce := ""
username := ""
response := ""
uri := ""
result1 := realmRex.FindStringSubmatch(authLine)
if len(result1) == 2 {
realm = result1[1]
} else {
return fmt.Errorf("CheckAuth error : no realm found")
}
result1 = nonceRex.FindStringSubmatch(authLine)
if len(result1) == 2 {
nonce = result1[1]
} else {
return fmt.Errorf("CheckAuth error : no nonce found")
}
if sessionNonce != nonce {
return fmt.Errorf("CheckAuth error : sessionNonce not same as nonce")
}
result1 = usernameRex.FindStringSubmatch(authLine)
if len(result1) == 2 {
username = result1[1]
} else {
return fmt.Errorf("CheckAuth error : username not found")
}
result1 = responseRex.FindStringSubmatch(authLine)
if len(result1) == 2 {
response = result1[1]
} else {
return fmt.Errorf("CheckAuth error : response not found")
}
result1 = uriRex.FindStringSubmatch(authLine)
if len(result1) == 2 {
uri = result1[1]
} else {
return fmt.Errorf("CheckAuth error : uri not found")
}
var user models.User
err := db.SQLite.Where("Username = ?", username).First(&user).Error
if err != nil {
return fmt.Errorf("CheckAuth error : user not exists")
}
md5UserRealmPwd := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", username, realm, user.Password))))
md5MethodURL := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s", method, uri))))
myResponse := fmt.Sprintf("%x", md5.Sum([]byte(fmt.Sprintf("%s:%s:%s", md5UserRealmPwd, nonce, md5MethodURL))))
if myResponse != response {
return fmt.Errorf("CheckAuth error : response not equal")
}
return nil
}
func (session *Session) handleRequest(req *Request) {
//if session.Timeout > 0 {
// session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second))
//}
logger := session.logger
logger.Printf("<<<\n%s", req)
res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "")
defer func() {
if p := recover(); p != nil {
logger.Printf("handleRequest err ocurs:%v", p)
res.StatusCode = 500
res.Status = fmt.Sprintf("Inner Server Error, %v", p)
}
logger.Printf(">>>\n%s", res)
outBytes := []byte(res.String())
session.connWLock.Lock()
session.connRW.Write(outBytes)
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += len(outBytes)
switch req.Method {
case "PLAY", "RECORD":
switch session.Type {
case SESSEION_TYPE_PLAYER:
if session.Pusher.HasPlayer(session.Player) {
session.Player.Pause(false)
} else {
session.Pusher.AddPlayer(session.Player)
}
// case SESSION_TYPE_PUSHER:
// session.Server.AddPusher(session.Pusher)
}
case "TEARDOWN":
{
session.Stop()
return
}
}
if res.StatusCode != 200 && res.StatusCode != 401 {
logger.Printf("Response request error[%d]. stop session.", res.StatusCode)
session.Stop()
}
}()
if req.Method != "OPTIONS" {
if session.authorizationEnable {
authLine := req.Header["Authorization"]
authFailed := true
if authLine != "" {
err := CheckAuth(authLine, req.Method, session.nonce)
if err == nil {
authFailed = false
} else {
logger.Printf("%v", err)
}
}
if authFailed {
res.StatusCode = 401
res.Status = "Unauthorized"
nonce := fmt.Sprintf("%x", md5.Sum([]byte(shortid.MustGenerate())))
session.nonce = nonce
res.Header["WWW-Authenticate"] = fmt.Sprintf(`Digest realm="EasyDarwin", nonce="%s", algorithm="MD5"`, nonce)
return
}
}
}
switch req.Method {
case "OPTIONS":
res.Header["Public"] = "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, OPTIONS, ANNOUNCE, RECORD"
case "ANNOUNCE":
session.Type = SESSION_TYPE_PUSHER
session.URL = req.URL
url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
session.Path = url.Path
session.SDPRaw = req.Body
session.SDPMap = ParseSDP(req.Body)
sdp, ok := session.SDPMap["audio"]
if ok {
session.AControl = sdp.Control
session.ACodec = sdp.Codec
logger.Printf("audio codec[%s]\n", session.ACodec)
}
sdp, ok = session.SDPMap["video"]
if ok {
session.VControl = sdp.Control
session.VCodec = sdp.Codec
logger.Printf("video codec[%s]\n", session.VCodec)
}
addPusher := false
if session.closeOld {
r, _ := session.Server.TryAttachToPusher(session)
if r < -1 {
logger.Printf("reject pusher.")
res.StatusCode = 406
res.Status = "Not Acceptable"
} else if r == 0 {
addPusher = true
} else {
logger.Printf("Attached to old pusher")
// 尝试发给客户端ANNOUCE
// players := pusher.GetPlayers()
// for _, v := range players {
// sess := v.Session
// hearers := make(map[string]string)
// hearers["Content-Type"] = "application/sdp"
// hearers["Session"] = sess.ID
// hearers["Content-Length"] = strconv.Itoa(len(v.SDPRaw))
// var req = Request{Method: ANNOUNCE, URL: v.URL, Version: "1.0", Header: hearers, Body: pusher.SDPRaw()}
// sess.connWLock.Lock()
// logger.Println(req.String())
// outBytes := []byte(req.String())
// sess.connRW.Write(outBytes)
// sess.connRW.Flush()
// sess.connWLock.Unlock()
// }
}
} else {
addPusher = true
}
if addPusher {
session.Pusher = NewPusher(session)
addedToServer := session.Server.AddPusher(session.Pusher)
if !addedToServer {
logger.Printf("reject pusher.")
res.StatusCode = 406
res.Status = "Not Acceptable"
}
}
case "DESCRIBE":
session.Type = SESSEION_TYPE_PLAYER
session.URL = req.URL
url, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
session.Path = url.Path
pusher := session.Server.GetPusher(session.Path)
if pusher == nil {
res.StatusCode = 404
res.Status = "NOT FOUND"
return
}
session.Player = NewPlayer(session, pusher)
session.Pusher = pusher
session.AControl = pusher.AControl()
session.VControl = pusher.VControl()
session.ACodec = pusher.ACodec()
session.VCodec = pusher.VCodec()
session.Conn.timeout = 0
res.SetBody(session.Pusher.SDPRaw())
case "SETUP":
ts := req.Header["Transport"]
// control字段可能是`stream=1`字样也可能是rtsp://...字样。即control可能是url的path也可能是整个url
// 例1
// a=control:streamid=1
// 例2
// a=control:rtsp://192.168.1.64/trackID=1
// 例3
// a=control:?ctype=video
setupUrl, err := url.Parse(req.URL)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid URL"
return
}
if setupUrl.Port() == "" {
setupUrl.Host = fmt.Sprintf("%s:554", setupUrl.Host)
}
setupPath := setupUrl.String()
// error status. SETUP without ANNOUNCE or DESCRIBE.
if session.Pusher == nil {
res.StatusCode = 500
res.Status = "Error Status"
return
}
//setupPath = setupPath[strings.LastIndex(setupPath, "/")+1:]
vPath := ""
if strings.Index(strings.ToLower(session.VControl), "rtsp://") == 0 {
vControlUrl, err := url.Parse(session.VControl)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid VControl"
return
}
if vControlUrl.Port() == "" {
vControlUrl.Host = fmt.Sprintf("%s:554", vControlUrl.Host)
}
vPath = vControlUrl.String()
} else {
vPath = session.VControl
}
aPath := ""
if strings.Index(strings.ToLower(session.AControl), "rtsp://") == 0 {
aControlUrl, err := url.Parse(session.AControl)
if err != nil {
res.StatusCode = 500
res.Status = "Invalid AControl"
return
}
if aControlUrl.Port() == "" {
aControlUrl.Host = fmt.Sprintf("%s:554", aControlUrl.Host)
}
aPath = aControlUrl.String()
} else {
aPath = session.AControl
}
mtcp := regexp.MustCompile("interleaved=(\\d+)(-(\\d+))?")
mudp := regexp.MustCompile("client_port=(\\d+)(-(\\d+))?")
if tcpMatchs := mtcp.FindStringSubmatch(ts); tcpMatchs != nil {
session.TransType = TRANS_TYPE_TCP
if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) {
session.aRTPChannel, _ = strconv.Atoi(tcpMatchs[1])
session.aRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3])
} else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) {
session.vRTPChannel, _ = strconv.Atoi(tcpMatchs[1])
session.vRTPControlChannel, _ = strconv.Atoi(tcpMatchs[3])
} else {
res.StatusCode = 500
res.Status = fmt.Sprintf("SETUP [TCP] got UnKown control:%s", setupPath)
logger.Printf("SETUP [TCP] got UnKown control:%s", setupPath)
}
logger.Printf("Parse SETUP req.TRANSPORT:TCP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
} else if udpMatchs := mudp.FindStringSubmatch(ts); udpMatchs != nil {
session.TransType = TRANS_TYPE_UDP
// no need for tcp timeout.
session.Conn.timeout = 0
if session.Type == SESSEION_TYPE_PLAYER && session.UDPClient == nil {
session.UDPClient = &UDPClient{
Session: session,
}
}
if session.Type == SESSION_TYPE_PUSHER && session.Pusher.UDPServer == nil {
session.Pusher.UDPServer = &UDPServer{
Session: session,
}
}
logger.Printf("Parse SETUP req.TRANSPORT:UDP.Session.Type:%d,control:%s, AControl:%s,VControl:%s", session.Type, setupPath, aPath, vPath)
if setupPath == aPath || aPath != "" && strings.LastIndex(setupPath, aPath) == len(setupPath)-len(aPath) {
if session.Type == SESSEION_TYPE_PLAYER {
session.UDPClient.APort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.AControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupAudio(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup audio error, %v", err)
return
}
}
if session.Type == SESSION_TYPE_PUSHER {
if err := session.Pusher.UDPServer.SetupAudio(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp server setup audio error, %v", err)
return
}
tss := strings.Split(ts, ";")
idx := -1
for i, val := range tss {
if val == udpMatchs[0] {
idx = i
}
}
tail := append([]string{}, tss[idx+1:]...)
tss = append(tss[:idx+1], fmt.Sprintf("server_port=%d-%d", session.Pusher.UDPServer.APort, session.Pusher.UDPServer.AControlPort))
tss = append(tss, tail...)
ts = strings.Join(tss, ";")
}
} else if setupPath == vPath || vPath != "" && strings.LastIndex(setupPath, vPath) == len(setupPath)-len(vPath) {
if session.Type == SESSEION_TYPE_PLAYER {
session.UDPClient.VPort, _ = strconv.Atoi(udpMatchs[1])
session.UDPClient.VControlPort, _ = strconv.Atoi(udpMatchs[3])
if err := session.UDPClient.SetupVideo(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp client setup video error, %v", err)
return
}
}
if session.Type == SESSION_TYPE_PUSHER {
if err := session.Pusher.UDPServer.SetupVideo(); err != nil {
res.StatusCode = 500
res.Status = fmt.Sprintf("udp server setup video error, %v", err)
return
}
tss := strings.Split(ts, ";")
idx := -1
for i, val := range tss {
if val == udpMatchs[0] {
idx = i
}
}
tail := append([]string{}, tss[idx+1:]...)
tss = append(tss[:idx+1], fmt.Sprintf("server_port=%d-%d", session.Pusher.UDPServer.VPort, session.Pusher.UDPServer.VControlPort))
tss = append(tss, tail...)
ts = strings.Join(tss, ";")
}
} else {
logger.Printf("SETUP [UDP] got UnKown control:%s", setupPath)
}
}
res.Header["Transport"] = ts
case "PLAY":
// error status. PLAY without ANNOUNCE or DESCRIBE.
if session.Pusher == nil {
res.StatusCode = 500
res.Status = "Error Status"
return
}
res.Header["Range"] = req.Header["Range"]
case "RECORD":
// error status. RECORD without ANNOUNCE or DESCRIBE.
if session.Pusher == nil {
res.StatusCode = 500
res.Status = "Error Status"
return
}
case "PAUSE":
if session.Player == nil {
res.StatusCode = 500
res.Status = "Error Status"
return
}
session.Player.Pause(true)
}
}
func (session *Session) SendRTP(pack *RTPPack) (err error) {
if pack == nil {
return fmt.Errorf("player send rtp got nil pack")
}
if session.TransType == TRANS_TYPE_UDP {
if session.UDPClient == nil {
return fmt.Errorf("player use udp transport but udp client not found")
}
return session.UDPClient.SendRTP(pack)
}
switch pack.Type {
case RTP_TYPE_AUDIO:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
bufChannel[1] = byte(session.aRTPChannel)
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(pack.Buffer.Len()))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer.Bytes())
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += pack.Buffer.Len() + 4
case RTP_TYPE_AUDIOCONTROL:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
bufChannel[1] = byte(session.aRTPControlChannel)
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(pack.Buffer.Len()))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer.Bytes())
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += pack.Buffer.Len() + 4
case RTP_TYPE_VIDEO:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
bufChannel[1] = byte(session.vRTPChannel)
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(pack.Buffer.Len()))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer.Bytes())
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += pack.Buffer.Len() + 4
case RTP_TYPE_VIDEOCONTROL:
bufChannel := make([]byte, 2)
bufChannel[0] = 0x24
bufChannel[1] = byte(session.vRTPControlChannel)
session.connWLock.Lock()
session.connRW.Write(bufChannel)
bufLen := make([]byte, 2)
binary.BigEndian.PutUint16(bufLen, uint16(pack.Buffer.Len()))
session.connRW.Write(bufLen)
session.connRW.Write(pack.Buffer.Bytes())
session.connRW.Flush()
session.connWLock.Unlock()
session.OutBytes += pack.Buffer.Len() + 4
default:
return fmt.Errorf("session tcp send rtp got unkown pack type[%v]", pack.Type)
}
return
}