From 2d156502a1f1e29b156ca17ffe82436127932aef Mon Sep 17 00:00:00 2001 From: macbookpro Date: Sat, 22 Dec 2018 11:36:11 +0800 Subject: [PATCH] =?UTF-8?q?1=20tag=20log=20with=20session=20id=20=20?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=B8=AD=E9=99=84=E5=8A=A0=E4=BA=86session?= =?UTF-8?q?=20id=202=20when=20response=20error,=20close=20session.=20=20?= =?UTF-8?q?=E5=BD=93=E5=93=8D=E5=BA=94=E5=BC=82=E5=B8=B8=E6=97=B6=E5=85=B3?= =?UTF-8?q?=E9=97=ADsession?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 10 +++++---- rtsp/player.go | 9 +++++---- rtsp/pusher.go | 16 ++++++++++++--- rtsp/rtsp-client.go | 40 ++++++++++++++++++++---------------- rtsp/rtsp-server.go | 46 ++++++++++++++++++++++++------------------ rtsp/rtsp-session.go | 43 ++++++++++++++++++++++++++------------- rtsp/session-logger.go | 7 +++++++ rtsp/udp-client.go | 31 ++++++++++++++-------------- rtsp/udp-server.go | 45 +++++++++++++++++++++-------------------- 9 files changed, 148 insertions(+), 99 deletions(-) create mode 100644 rtsp/session-logger.go diff --git a/main.go b/main.go index 301857ff..dfdfeabd 100644 --- a/main.go +++ b/main.go @@ -103,10 +103,6 @@ func (p *program) Start(s service.Service) (err error) { } p.StartRTSP() p.StartHTTP() - if !utils.Debug { - log.Println("log files -->", utils.LogDir()) - log.SetOutput(utils.GetLogWriter()) - } go func() { for range routers.API.RestartChan { p.StopHTTP() @@ -132,8 +128,14 @@ func main() { flag.StringVar(&utils.FlagVarConfFile, "config", "", "configure file path") flag.Parse() tail := flag.Args() + // log log.SetPrefix("[EasyDarwin] ") log.SetFlags(log.Lshortfile | log.LstdFlags) + if !utils.Debug { + log.Println("log files -->", utils.LogDir()) + log.SetOutput(utils.GetLogWriter()) + } + sec := utils.Conf().Section("service") svcConfig := &service.Config{ Name: sec.Key("name").MustString("EasyDarwin_Service"), diff --git a/rtsp/player.go b/rtsp/player.go index dfcebf6b..ee9ad672 100644 --- a/rtsp/player.go +++ b/rtsp/player.go @@ -1,7 +1,6 @@ package rtsp import ( - "log" "sync" ) @@ -27,8 +26,9 @@ func NewPlayer(session *Session, pusher *Pusher) (player *Player) { } func (player *Player) QueueRTP(pack *RTPPack) *Player { + logger := player.logger if pack == nil { - log.Printf("player queue enter nil pack, drop it") + logger.Printf("player queue enter nil pack, drop it") return player } player.cond.L.Lock() @@ -39,6 +39,7 @@ func (player *Player) QueueRTP(pack *RTPPack) *Player { } func (player *Player) Start() { + logger := player.logger for !player.Stoped { var pack *RTPPack player.cond.L.Lock() @@ -52,12 +53,12 @@ func (player *Player) Start() { player.cond.L.Unlock() if pack == nil { if !player.Stoped { - log.Printf("player not stoped, but queue take out nil pack") + logger.Printf("player not stoped, but queue take out nil pack") } continue } if err := player.SendRTP(pack); err != nil { - log.Println(err) + logger.Println(err) } } } diff --git a/rtsp/pusher.go b/rtsp/pusher.go index 528047ba..7e96169e 100644 --- a/rtsp/pusher.go +++ b/rtsp/pusher.go @@ -68,6 +68,13 @@ func (pusher *Pusher) ID() string { return pusher.RTSPClient.ID } +func (pusher *Pusher) Logger() *log.Logger { + if pusher.Session != nil { + return pusher.Session.logger + } + return pusher.RTSPClient.logger +} + func (pusher *Pusher) VCodec() string { if pusher.Session != nil { return pusher.Session.VCodec @@ -203,6 +210,7 @@ func (pusher *Pusher) QueueRTP(pack *RTPPack) *Pusher { } func (pusher *Pusher) Start() { + logger := pusher.Logger() for !pusher.Stoped() { var pack *RTPPack pusher.cond.L.Lock() @@ -216,7 +224,7 @@ func (pusher *Pusher) Start() { pusher.cond.L.Unlock() if pack == nil { if !pusher.Stoped() { - log.Printf("pusher not stoped, but queue take out nil pack") + logger.Printf("pusher not stoped, but queue take out nil pack") } continue } @@ -268,6 +276,7 @@ func (pusher *Pusher) GetPlayers() (players map[string]*Player) { } func (pusher *Pusher) AddPlayer(player *Player) *Pusher { + logger := pusher.Logger() if pusher.gopCacheEnable { pusher.gopCacheLock.RLock() for _, pack := range pusher.gopCache { @@ -281,20 +290,21 @@ func (pusher *Pusher) AddPlayer(player *Player) *Pusher { if _, ok := pusher.players[player.ID]; !ok { pusher.players[player.ID] = player go player.Start() - log.Printf("%v start, now player size[%d]", player, len(pusher.players)) + logger.Printf("%v start, now player size[%d]", player, len(pusher.players)) } pusher.playersLock.Unlock() return pusher } func (pusher *Pusher) RemovePlayer(player *Player) *Pusher { + logger := pusher.Logger() pusher.playersLock.Lock() if len(pusher.players) == 0 { pusher.playersLock.Unlock() return pusher } delete(pusher.players, player.ID) - log.Printf("%v end, now player size[%d]\n", player, len(pusher.players)) + logger.Printf("%v end, now player size[%d]\n", player, len(pusher.players)) pusher.playersLock.Unlock() return pusher } diff --git a/rtsp/rtsp-client.go b/rtsp/rtsp-client.go index c6d4ba1a..3bc9ce55 100644 --- a/rtsp/rtsp-client.go +++ b/rtsp/rtsp-client.go @@ -10,6 +10,7 @@ import ( "log" "net" "net/url" + "os" "regexp" "strconv" "strings" @@ -23,7 +24,8 @@ import ( ) type RTSPClient struct { - Server *Server + Server *Server + SessionLogger Stoped bool Status string URL string @@ -80,6 +82,10 @@ func NewRTSPClient(server *Server, rawUrl string, sendOptionMillis int64) (clien OptionIntervalMillis: sendOptionMillis, StartAt: time.Now(), } + client.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", client.ID), log.LstdFlags|log.Lshortfile) + if !utils.Debug { + client.logger.SetOutput(utils.GetLogWriter()) + } return } @@ -157,7 +163,10 @@ func (client *RTSPClient) checkAuth(method string, resp *Response) (string, erro func (client *RTSPClient) Start(timeout time.Duration) error { //source := make(chan interface{}) - + logger := client.logger + if !utils.Debug { + logger.SetOutput(utils.GetLogWriter()) + } if timeout == 0 { timeoutMillis := utils.Conf().Section("rtsp").Key("timeout").MustInt(0) timeout = time.Duration(timeoutMillis) * time.Millisecond @@ -322,7 +331,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { b, err := client.connRW.ReadByte() if err != nil { if !client.Stoped { - log.Printf("client.connRW.ReadByte err:%v", err) + logger.Printf("client.connRW.ReadByte err:%v", err) } return } @@ -334,7 +343,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { if err != nil { if !client.Stoped { - log.Printf("io.ReadFull err:%v", err) + logger.Printf("io.ReadFull err:%v", err) } return } @@ -344,7 +353,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { _, err = io.ReadFull(client.connRW, content) if err != nil { if !client.Stoped { - log.Printf("io.ReadFull err:%v", err) + logger.Printf("io.ReadFull err:%v", err) } return } @@ -373,16 +382,16 @@ func (client *RTSPClient) Start(timeout time.Duration) error { Buffer: rtpBuf, } default: - log.Printf("unknow rtp pack type, channel:%v", channel) + logger.Printf("unknow rtp pack type, channel:%v", channel) continue } if pack == nil { - log.Printf("session tcp got nil rtp pack") + logger.Printf("session tcp got nil rtp pack") continue } elapsed := time.Now().Sub(loggerTime) if elapsed >= 10*time.Second { - log.Printf("%v read rtp frame.", client) + logger.Printf("%v read rtp frame.", client) loggerTime = time.Now() } client.InBytes += int(length + 4) @@ -398,7 +407,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { line, prefix, err := client.connRW.ReadLine() if err != nil { if !client.Stoped { - log.Printf("client.connRW.ReadLine err:%v", err) + logger.Printf("client.connRW.ReadLine err:%v", err) } return } @@ -414,8 +423,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { } builder.Write(content) } - log.Println("S->C <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") - log.Println(builder.String()) + logger.Printf("<<<\n%s", builder.String()) break } s := string(line) @@ -429,7 +437,7 @@ func (client *RTSPClient) Start(timeout time.Duration) error { contentLen, err = strconv.Atoi(strings.TrimSpace(splits[1])) if err != nil { if !client.Stoped { - log.Printf("strconv.Atoi err:%v, str:%v", err, splits[1]) + logger.Printf("strconv.Atoi err:%v, str:%v", err, splits[1]) } return } @@ -475,6 +483,7 @@ func (client *RTSPClient) Stop() { } func (client *RTSPClient) RequestWithPath(method string, path string, headers map[string]string, needResp bool) (resp *Response, err error) { + logger := client.logger headers["User-Agent"] = "EasyDarwinGo" if len(headers["Authorization"]) == 0 { if len(client.authLine) != 0 { @@ -497,8 +506,7 @@ func (client *RTSPClient) RequestWithPath(method string, path string, headers ma } builder.WriteString(fmt.Sprintf("\r\n")) s := builder.String() - log.Println(">>") - log.Println(s) + logger.Printf(">>>\n%s", s) _, err = client.connRW.WriteString(s) if err != nil { return @@ -535,9 +543,7 @@ func (client *RTSPClient) RequestWithPath(method string, path string, headers ma } resp = NewResponse(statusCode, status, strconv.Itoa(cseq), sid, body) resp.Header = respHeader - - log.Println("<<") - log.Println(builder.String()) + logger.Printf("<<\n%s", builder.String()) if !(statusCode >= 200 && statusCode <= 300) { err = fmt.Errorf("Response StatusCode is :%d", statusCode) diff --git a/rtsp/rtsp-server.go b/rtsp/rtsp-server.go index 8c401908..e9f07b95 100644 --- a/rtsp/rtsp-server.go +++ b/rtsp/rtsp-server.go @@ -16,6 +16,7 @@ import ( ) type Server struct { + SessionLogger TCPListener *net.TCPListener TCPPort int Stoped bool @@ -36,11 +37,13 @@ func GetServer() *Server { addPusherCh: make(chan *Pusher), removePusherCh: make(chan *Pusher), } + Instance.logger = log.New(os.Stdout, "[RTSPServer]", log.LstdFlags|log.Lshortfile) } return Instance } func (server *Server) Start() (err error) { + logger := server.logger addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", server.TCPPort)) if err != nil { return @@ -58,7 +61,7 @@ func (server *Server) Start() (err error) { if (len(ffmpeg) > 0) && localRecord > 0 && len(m3u8_dir_path) > 0 { err := utils.EnsureDir(m3u8_dir_path) if err != nil { - log.Printf("Create m3u8_dir_path[%s] err:%v.", m3u8_dir_path, err) + logger.Printf("Create m3u8_dir_path[%s] err:%v.", m3u8_dir_path, err) } else { SaveStreamToLocal = true } @@ -66,8 +69,8 @@ func (server *Server) Start() (err error) { go func() { // save to local. pusher2ffmpegMap := make(map[*Pusher]*exec.Cmd) if SaveStreamToLocal { - log.Printf("Prepare to save stream to local....") - defer log.Printf("End save stream to local....") + logger.Printf("Prepare to save stream to local....") + defer logger.Printf("End save stream to local....") } var pusher *Pusher addChnOk := true @@ -80,7 +83,7 @@ func (server *Server) Start() (err error) { dir := path.Join(m3u8_dir_path, pusher.Path(), time.Now().Format("20060102150405")) err := utils.EnsureDir(dir) if err != nil { - log.Printf("EnsureDir:[%s] err:%v.", dir, err) + logger.Printf("EnsureDir:[%s] err:%v.", dir, err) continue } m3u8path := path.Join(dir, fmt.Sprintf("out.m3u8")) @@ -94,12 +97,12 @@ func (server *Server) Start() (err error) { } err = cmd.Start() if err != nil { - log.Printf("Start ffmpeg err:%v", err) + logger.Printf("Start ffmpeg err:%v", err) } pusher2ffmpegMap[pusher] = cmd - log.Printf("add ffmpeg [%v] to pull stream from pusher[%v]", cmd, pusher) + logger.Printf("add ffmpeg [%v] to pull stream from pusher[%v]", cmd, pusher) } else { - log.Printf("addPusherChan closed") + logger.Printf("addPusherChan closed") } } case pusher, removeChnOk = <-server.removePusherCh: @@ -108,7 +111,7 @@ func (server *Server) Start() (err error) { cmd := pusher2ffmpegMap[pusher] proc := cmd.Process if proc != nil { - log.Printf("prepare to SIGTERM to process:%v", proc) + logger.Printf("prepare to SIGTERM to process:%v", proc) proc.Signal(syscall.SIGTERM) proc.Wait() // proc.Kill() @@ -116,22 +119,22 @@ func (server *Server) Start() (err error) { // see "Wait releases any resources associated with the Cmd." // if closer, ok := cmd.Stdout.(io.Closer); ok { // closer.Close() - // log.Printf("process:%v Stdout closed.", proc) + // logger.Printf("process:%v Stdout closed.", proc) // } - log.Printf("process:%v terminate.", proc) + logger.Printf("process:%v terminate.", proc) } delete(pusher2ffmpegMap, pusher) - log.Printf("delete ffmpeg from pull stream from pusher[%v]", pusher) + logger.Printf("delete ffmpeg from pull stream from pusher[%v]", pusher) } else { for _, cmd := range pusher2ffmpegMap { proc := cmd.Process if proc != nil { - log.Printf("prepare to SIGTERM to process:%v", proc) + logger.Printf("prepare to SIGTERM to process:%v", proc) proc.Signal(syscall.SIGTERM) } } pusher2ffmpegMap = make(map[*Pusher]*exec.Cmd) - log.Printf("removePusherChan closed") + logger.Printf("removePusherChan closed") } } } @@ -140,20 +143,20 @@ func (server *Server) Start() (err error) { server.Stoped = false server.TCPListener = listener - log.Println("rtsp server start on", server.TCPPort) + logger.Println("rtsp server start on", server.TCPPort) networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) for !server.Stoped { conn, err := server.TCPListener.Accept() if err != nil { - log.Println(err) + logger.Println(err) continue } if tcpConn, ok := conn.(*net.TCPConn); ok { if err := tcpConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("rtsp server conn set read buffer error, %v", err) + logger.Printf("rtsp server conn set read buffer error, %v", err) } if err := tcpConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("rtsp server conn set write buffer error, %v", err) + logger.Printf("rtsp server conn set write buffer error, %v", err) } } @@ -164,7 +167,8 @@ func (server *Server) Start() (err error) { } func (server *Server) Stop() { - log.Println("rtsp server stop on", server.TCPPort) + logger := server.logger + logger.Println("rtsp server stop on", server.TCPPort) server.Stoped = true if server.TCPListener != nil { server.TCPListener.Close() @@ -179,12 +183,13 @@ func (server *Server) Stop() { } func (server *Server) AddPusher(pusher *Pusher) { + logger := server.logger added := false server.pushersLock.Lock() if _, ok := server.pushers[pusher.Path()]; !ok { server.pushers[pusher.Path()] = pusher go pusher.Start() - log.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) + logger.Printf("%v start, now pusher size[%d]", pusher, len(server.pushers)) added = true } server.pushersLock.Unlock() @@ -194,11 +199,12 @@ func (server *Server) AddPusher(pusher *Pusher) { } func (server *Server) RemovePusher(pusher *Pusher) { + logger := server.logger removed := false server.pushersLock.Lock() if _pusher, ok := server.pushers[pusher.Path()]; ok && pusher.ID() == _pusher.ID() { delete(server.pushers, pusher.Path()) - log.Printf("%v end, now pusher size[%d]\n", pusher, len(server.pushers)) + logger.Printf("%v end, now pusher size[%d]\n", pusher, len(server.pushers)) removed = true } server.pushersLock.Unlock() diff --git a/rtsp/rtsp-session.go b/rtsp/rtsp-session.go index 0e9c4a45..c29c8a52 100644 --- a/rtsp/rtsp-session.go +++ b/rtsp/rtsp-session.go @@ -9,6 +9,7 @@ import ( "log" "net" "net/url" + "os" "regexp" "strconv" "strings" @@ -85,6 +86,7 @@ func (tt TransType) String() string { const UDP_BUF_SIZE = 1048576 type Session struct { + SessionLogger ID string Server *Server Conn *RichConn @@ -143,6 +145,11 @@ func NewSession(server *Server, conn net.Conn) *Session { RTPHandles: make([]func(*RTPPack), 0), StopHandles: make([]func(), 0), } + + session.logger = log.New(os.Stdout, fmt.Sprintf("[%s]", session.ID), log.LstdFlags|log.Lshortfile) + if !utils.Debug { + session.logger.SetOutput(utils.GetLogWriter()) + } return session } @@ -169,25 +176,26 @@ func (session *Session) Start() { defer session.Stop() buf1 := make([]byte, 1) buf2 := make([]byte, 2) + logger := session.logger for !session.Stoped { if _, err := io.ReadFull(session.connRW, buf1); err != nil { - log.Println(session, err) + logger.Println(session, err) return } if buf1[0] == 0x24 { //rtp data if _, err := io.ReadFull(session.connRW, buf1); err != nil { - log.Println(err) + logger.Println(err) return } if _, err := io.ReadFull(session.connRW, buf2); err != nil { - log.Println(err) + logger.Println(err) return } channel := int(buf1[0]) rtpLen := int(binary.BigEndian.Uint16(buf2)) rtpBytes := make([]byte, rtpLen) if _, err := io.ReadFull(session.connRW, rtpBytes); err != nil { - log.Println(err) + logger.Println(err) return } rtpBuf := bytes.NewBuffer(rtpBytes) @@ -214,11 +222,11 @@ func (session *Session) Start() { Buffer: rtpBuf, } default: - log.Printf("unknow rtp pack type, %v", pack.Type) + logger.Printf("unknow rtp pack type, %v", pack.Type) continue } if pack == nil { - log.Printf("session tcp got nil rtp pack") + logger.Printf("session tcp got nil rtp pack") continue } session.InBytes += rtpLen + 4 @@ -230,7 +238,7 @@ func (session *Session) Start() { reqBuf.Write(buf1) for !session.Stoped { if line, isPrefix, err := session.connRW.ReadLine(); err != nil { - log.Println(err) + logger.Println(err) return } else { reqBuf.Write(line) @@ -248,10 +256,10 @@ func (session *Session) Start() { if contentLen > 0 { bodyBuf := make([]byte, contentLen) if n, err := io.ReadFull(session.connRW, bodyBuf); err != nil { - log.Println(err) + logger.Println(err) return } else if n != contentLen { - log.Printf("read rtsp request body failed, expect size[%d], got size[%d]", contentLen, n) + logger.Printf("read rtsp request body failed, expect size[%d], got size[%d]", contentLen, n) return } req.Body = string(bodyBuf) @@ -269,15 +277,15 @@ func (session *Session) handleRequest(req *Request) { //if session.Timeout > 0 { // session.Conn.SetDeadline(time.Now().Add(time.Duration(session.Timeout) * time.Second)) //} - - log.Println("<<<", req) + logger := session.logger + logger.Printf("<<<\n%s", req) res := NewResponse(200, "OK", req.Header["CSeq"], session.ID, "") defer func() { if p := recover(); p != nil { res.StatusCode = 500 res.Status = fmt.Sprintf("Inner Server Error, %v", p) } - log.Println(">>>", res) + logger.Printf(">>>\n%s", res) outBytes := []byte(res.String()) session.connWLock.Lock() session.connRW.Write(outBytes) @@ -293,6 +301,13 @@ func (session *Session) handleRequest(req *Request) { session.Server.AddPusher(session.Pusher) } case "TEARDOWN": + { + session.Stop() + return + } + } + if res.StatusCode != 200 { + logger.Printf("Response request error[%d]. stop session.", res.StatusCode) session.Stop() } }() @@ -317,13 +332,13 @@ func (session *Session) handleRequest(req *Request) { if ok { session.AControl = sdp.Control session.ACodec = sdp.Codec - log.Printf("audio codec[%s]\n", session.ACodec) + logger.Printf("audio codec[%s]\n", session.ACodec) } sdp, ok = session.SDPMap["video"] if ok { session.VControl = sdp.Control session.VCodec = sdp.Codec - log.Printf("video codec[%s]\n", session.VCodec) + logger.Printf("video codec[%s]\n", session.VCodec) } session.Pusher = NewPusher(session) if session.Server.GetPusher(session.Path) == nil { diff --git a/rtsp/session-logger.go b/rtsp/session-logger.go new file mode 100644 index 00000000..a0aea631 --- /dev/null +++ b/rtsp/session-logger.go @@ -0,0 +1,7 @@ +package rtsp + +import "log" + +type SessionLogger struct { + logger *log.Logger +} diff --git a/rtsp/udp-client.go b/rtsp/udp-client.go index 1d97791d..15ea9d70 100644 --- a/rtsp/udp-client.go +++ b/rtsp/udp-client.go @@ -2,7 +2,6 @@ package rtsp import ( "fmt" - "log" "net" "strings" @@ -10,7 +9,7 @@ import ( ) type UDPClient struct { - Session *Session + *Session APort int AConn *net.UDPConn @@ -48,13 +47,14 @@ func (s *UDPClient) Stop() { } func (c *UDPClient) SetupAudio() (err error) { + logger := c.logger defer func() { if err != nil { - log.Println(err) + logger.Println(err) c.Stop() } }() - host := c.Session.Conn.RemoteAddr().String() + host := c.Conn.RemoteAddr().String() host = host[:strings.LastIndex(host, ":")] addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.APort)) if err != nil { @@ -66,10 +66,10 @@ func (c *UDPClient) SetupAudio() (err error) { } networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) if err := c.AConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp client audio conn set read buffer error, %v", err) + logger.Printf("udp client audio conn set read buffer error, %v", err) } if err := c.AConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp client audio conn set write buffer error, %v", err) + logger.Printf("udp client audio conn set write buffer error, %v", err) } addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.AControlPort)) @@ -81,22 +81,23 @@ func (c *UDPClient) SetupAudio() (err error) { return } if err := c.AControlConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp client audio control conn set read buffer error, %v", err) + logger.Printf("udp client audio control conn set read buffer error, %v", err) } if err := c.AControlConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp client audio control conn set write buffer error, %v", err) + logger.Printf("udp client audio control conn set write buffer error, %v", err) } return } func (c *UDPClient) SetupVideo() (err error) { + logger := c.logger defer func() { if err != nil { - log.Println(err) + logger.Println(err) c.Stop() } }() - host := c.Session.Conn.RemoteAddr().String() + host := c.Conn.RemoteAddr().String() host = host[:strings.LastIndex(host, ":")] addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VPort)) if err != nil { @@ -108,10 +109,10 @@ func (c *UDPClient) SetupVideo() (err error) { } networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) if err := c.VConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp client video conn set read buffer error, %v", err) + logger.Printf("udp client video conn set read buffer error, %v", err) } if err := c.VConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp client video conn set write buffer error, %v", err) + logger.Printf("udp client video conn set write buffer error, %v", err) } addr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, c.VControlPort)) @@ -123,10 +124,10 @@ func (c *UDPClient) SetupVideo() (err error) { return } if err := c.VControlConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp client video control conn set read buffer error, %v", err) + logger.Printf("udp client video control conn set read buffer error, %v", err) } if err := c.VControlConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp client video control conn set write buffer error, %v", err) + logger.Printf("udp client video control conn set write buffer error, %v", err) } return } @@ -159,7 +160,7 @@ func (c *UDPClient) SendRTP(pack *RTPPack) (err error) { err = fmt.Errorf("udp client write bytes error, %v", err) return } - // log.Printf("udp client write [%d/%d]", n, pack.Buffer.Len()) + // logger.Printf("udp client write [%d/%d]", n, pack.Buffer.Len()) c.Session.OutBytes += n return } diff --git a/rtsp/udp-server.go b/rtsp/udp-server.go index ef95bab7..5de7962f 100644 --- a/rtsp/udp-server.go +++ b/rtsp/udp-server.go @@ -2,7 +2,6 @@ package rtsp import ( "bytes" - "log" "net" "strconv" "strings" @@ -11,7 +10,7 @@ import ( ) type UDPServer struct { - Session *Session + *Session APort int AConn *net.UDPConn @@ -49,6 +48,7 @@ func (s *UDPServer) Stop() { } func (s *UDPServer) SetupAudio() (err error) { + logger := s.logger addr, err := net.ResolveUDPAddr("udp", ":0") if err != nil { return @@ -59,10 +59,10 @@ func (s *UDPServer) SetupAudio() (err error) { } networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) if err := s.AConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp server audio conn set read buffer error, %v", err) + logger.Printf("udp server audio conn set read buffer error, %v", err) } if err := s.AConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp server audio conn set write buffer error, %v", err) + logger.Printf("udp server audio conn set write buffer error, %v", err) } la := s.AConn.LocalAddr().String() strPort := la[strings.LastIndex(la, ":")+1:] @@ -72,8 +72,8 @@ func (s *UDPServer) SetupAudio() (err error) { } go func() { bufUDP := make([]byte, UDP_BUF_SIZE) - log.Printf("udp server start listen audio port[%d]", s.APort) - defer log.Printf("udp server stop listen audio port[%d]", s.APort) + logger.Printf("udp server start listen audio port[%d]", s.APort) + defer logger.Printf("udp server stop listen audio port[%d]", s.APort) for !s.Stoped { if n, _, err := s.AConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) @@ -87,7 +87,7 @@ func (s *UDPServer) SetupAudio() (err error) { h(pack) } } else { - log.Println("udp server read audio pack error", err) + logger.Println("udp server read audio pack error", err) continue } } @@ -101,10 +101,10 @@ func (s *UDPServer) SetupAudio() (err error) { return } if err := s.AControlConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp server audio control conn set read buffer error, %v", err) + logger.Printf("udp server audio control conn set read buffer error, %v", err) } if err := s.AControlConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp server audio control conn set write buffer error, %v", err) + logger.Printf("udp server audio control conn set write buffer error, %v", err) } la = s.AControlConn.LocalAddr().String() strPort = la[strings.LastIndex(la, ":")+1:] @@ -114,8 +114,8 @@ func (s *UDPServer) SetupAudio() (err error) { } go func() { bufUDP := make([]byte, UDP_BUF_SIZE) - log.Printf("udp server start listen audio control port[%d]", s.AControlPort) - defer log.Printf("udp server stop listen audio control port[%d]", s.AControlPort) + logger.Printf("udp server start listen audio control port[%d]", s.AControlPort) + defer logger.Printf("udp server stop listen audio control port[%d]", s.AControlPort) for !s.Stoped { if n, _, err := s.AControlConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) @@ -129,7 +129,7 @@ func (s *UDPServer) SetupAudio() (err error) { h(pack) } } else { - log.Println("udp server read audio control pack error", err) + logger.Println("udp server read audio control pack error", err) continue } } @@ -138,6 +138,7 @@ func (s *UDPServer) SetupAudio() (err error) { } func (s *UDPServer) SetupVideo() (err error) { + logger := s.logger addr, err := net.ResolveUDPAddr("udp", ":0") if err != nil { return @@ -148,10 +149,10 @@ func (s *UDPServer) SetupVideo() (err error) { } networkBuffer := utils.Conf().Section("rtsp").Key("network_buffer").MustInt(1048576) if err := s.VConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp server video conn set read buffer error, %v", err) + logger.Printf("udp server video conn set read buffer error, %v", err) } if err := s.VConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp server video conn set write buffer error, %v", err) + logger.Printf("udp server video conn set write buffer error, %v", err) } la := s.VConn.LocalAddr().String() strPort := la[strings.LastIndex(la, ":")+1:] @@ -161,8 +162,8 @@ func (s *UDPServer) SetupVideo() (err error) { } go func() { bufUDP := make([]byte, UDP_BUF_SIZE) - log.Printf("udp server start listen video port[%d]", s.VPort) - defer log.Printf("udp server stop listen video port[%d]", s.VPort) + logger.Printf("udp server start listen video port[%d]", s.VPort) + defer logger.Printf("udp server stop listen video port[%d]", s.VPort) for !s.Stoped { if n, _, err := s.VConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) @@ -176,7 +177,7 @@ func (s *UDPServer) SetupVideo() (err error) { h(pack) } } else { - log.Println("udp server read video pack error", err) + logger.Println("udp server read video pack error", err) continue } } @@ -191,10 +192,10 @@ func (s *UDPServer) SetupVideo() (err error) { return } if err := s.VControlConn.SetReadBuffer(networkBuffer); err != nil { - log.Printf("udp server video control conn set read buffer error, %v", err) + logger.Printf("udp server video control conn set read buffer error, %v", err) } if err := s.VControlConn.SetWriteBuffer(networkBuffer); err != nil { - log.Printf("udp server video control conn set write buffer error, %v", err) + logger.Printf("udp server video control conn set write buffer error, %v", err) } la = s.VControlConn.LocalAddr().String() strPort = la[strings.LastIndex(la, ":")+1:] @@ -204,8 +205,8 @@ func (s *UDPServer) SetupVideo() (err error) { } go func() { bufUDP := make([]byte, UDP_BUF_SIZE) - log.Printf("udp server start listen video control port[%d]", s.VControlPort) - defer log.Printf("udp server stop listen video control port[%d]", s.VControlPort) + logger.Printf("udp server start listen video control port[%d]", s.VControlPort) + defer logger.Printf("udp server stop listen video control port[%d]", s.VControlPort) for !s.Stoped { if n, _, err := s.VControlConn.ReadFromUDP(bufUDP); err == nil { rtpBytes := make([]byte, n) @@ -219,7 +220,7 @@ func (s *UDPServer) SetupVideo() (err error) { h(pack) } } else { - log.Println("udp server read video control pack error", err) + logger.Println("udp server read video control pack error", err) continue } }