From a4b105dedb14e9235d88264f47f5cc523da53718 Mon Sep 17 00:00:00 2001 From: Guy Lewin Date: Tue, 17 Mar 2020 13:52:44 -0400 Subject: [PATCH] [Feature] Server Plugin - Ping and NewWorkConn RPC (#1702) --- doc/server_plugin.md | 41 ++++++++++++++++- models/plugin/server/manager.go | 78 +++++++++++++++++++++++++++++++-- models/plugin/server/plugin.go | 6 ++- models/plugin/server/types.go | 10 +++++ server/control.go | 17 ++++++- server/service.go | 24 +++++++--- 6 files changed, 161 insertions(+), 15 deletions(-) diff --git a/doc/server_plugin.md b/doc/server_plugin.md index fa511b3a..cacb0a42 100644 --- a/doc/server_plugin.md +++ b/doc/server_plugin.md @@ -70,7 +70,7 @@ The response can look like any of the following: ### Operation -Currently `Login` and `NewProxy` operations are supported. +Currently `Login`, `NewProxy`, `Ping` and `NewWorkConn` operations are supported. #### Login @@ -135,6 +135,43 @@ Create new proxy } ``` +#### Ping + +Heartbeat from frpc + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "timestamp": , + "privilege_key": + } +} +``` + +#### NewWorkConn + +New work connection received from frpc (RPC sent after `run_id` is matched with an existing frp connection) + +``` +{ + "content": { + "user": { + "user": , + "metas": mapstring + "run_id": + }, + "run_id": + "timestamp": , + "privilege_key": + } +} +``` + ### Server Plugin Configuration ```ini @@ -155,7 +192,7 @@ ops = NewProxy addr: the address where the external RPC service listens on. path: http request url path for the POST request. -ops: operations plugin needs to handle (e.g. "Login", "NewProxy"). +ops: operations plugin needs to handle (e.g. "Login", "NewProxy", ...). ### Metadata diff --git a/models/plugin/server/manager.go b/models/plugin/server/manager.go index 94642932..62992c86 100644 --- a/models/plugin/server/manager.go +++ b/models/plugin/server/manager.go @@ -24,14 +24,18 @@ import ( ) type Manager struct { - loginPlugins []Plugin - newProxyPlugins []Plugin + loginPlugins []Plugin + newProxyPlugins []Plugin + pingPlugins []Plugin + newWorkConnPlugins []Plugin } func NewManager() *Manager { return &Manager{ - loginPlugins: make([]Plugin, 0), - newProxyPlugins: make([]Plugin, 0), + loginPlugins: make([]Plugin, 0), + newProxyPlugins: make([]Plugin, 0), + pingPlugins: make([]Plugin, 0), + newWorkConnPlugins: make([]Plugin, 0), } } @@ -42,6 +46,12 @@ func (m *Manager) Register(p Plugin) { if p.IsSupport(OpNewProxy) { m.newProxyPlugins = append(m.newProxyPlugins, p) } + if p.IsSupport(OpPing) { + m.pingPlugins = append(m.pingPlugins, p) + } + if p.IsSupport(OpNewWorkConn) { + m.pingPlugins = append(m.pingPlugins, p) + } } func (m *Manager) Login(content *LoginContent) (*LoginContent, error) { @@ -103,3 +113,63 @@ func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) { } return content, nil } + +func (m *Manager) Ping(content *PingContent) (*PingContent, error) { + var ( + res = &Response{ + Reject: false, + Unchange: true, + } + retContent interface{} + err error + ) + reqid, _ := util.RandId() + xl := xlog.New().AppendPrefix("reqid: " + reqid) + ctx := xlog.NewContext(context.Background(), xl) + ctx = NewReqidContext(ctx, reqid) + + for _, p := range m.pingPlugins { + res, retContent, err = p.Handle(ctx, OpPing, *content) + if err != nil { + xl.Warn("send Ping request to plugin [%s] error: %v", p.Name(), err) + return nil, errors.New("send Ping request to plugin error") + } + if res.Reject { + return nil, fmt.Errorf("%s", res.RejectReason) + } + if !res.Unchange { + content = retContent.(*PingContent) + } + } + return content, nil +} + +func (m *Manager) NewWorkConn(content *NewWorkConnContent) (*NewWorkConnContent, error) { + var ( + res = &Response{ + Reject: false, + Unchange: true, + } + retContent interface{} + err error + ) + reqid, _ := util.RandId() + xl := xlog.New().AppendPrefix("reqid: " + reqid) + ctx := xlog.NewContext(context.Background(), xl) + ctx = NewReqidContext(ctx, reqid) + + for _, p := range m.pingPlugins { + res, retContent, err = p.Handle(ctx, OpPing, *content) + if err != nil { + xl.Warn("send NewWorkConn request to plugin [%s] error: %v", p.Name(), err) + return nil, errors.New("send NewWorkConn request to plugin error") + } + if res.Reject { + return nil, fmt.Errorf("%s", res.RejectReason) + } + if !res.Unchange { + content = retContent.(*NewWorkConnContent) + } + } + return content, nil +} diff --git a/models/plugin/server/plugin.go b/models/plugin/server/plugin.go index fd16b145..a89a16b0 100644 --- a/models/plugin/server/plugin.go +++ b/models/plugin/server/plugin.go @@ -21,8 +21,10 @@ import ( const ( APIVersion = "0.1.0" - OpLogin = "Login" - OpNewProxy = "NewProxy" + OpLogin = "Login" + OpNewProxy = "NewProxy" + OpPing = "Ping" + OpNewWorkConn = "NewWorkConn" ) type Plugin interface { diff --git a/models/plugin/server/types.go b/models/plugin/server/types.go index 5d9c695e..017236d0 100644 --- a/models/plugin/server/types.go +++ b/models/plugin/server/types.go @@ -45,3 +45,13 @@ type NewProxyContent struct { User UserInfo `json:"user"` msg.NewProxy } + +type PingContent struct { + User UserInfo `json:"user"` + msg.Ping +} + +type NewWorkConnContent struct { + User UserInfo `json:"user"` + msg.NewWorkConn +} diff --git a/server/control.go b/server/control.go index e832ea90..4d7529e3 100644 --- a/server/control.go +++ b/server/control.go @@ -450,10 +450,23 @@ func (ctl *Control) manager() { ctl.CloseProxy(m) xl.Info("close proxy [%s] success", m.ProxyName) case *msg.Ping: - if err := ctl.authVerifier.VerifyPing(m); err != nil { + content := &plugin.PingContent{ + User: plugin.UserInfo{ + User: ctl.loginMsg.User, + Metas: ctl.loginMsg.Metas, + RunId: ctl.loginMsg.RunId, + }, + Ping: *m, + } + retContent, err := ctl.pluginManager.Ping(content) + if err == nil { + m = &retContent.Ping + err = ctl.authVerifier.VerifyPing(m) + } + if err != nil { xl.Warn("received invalid ping: %v", err) ctl.sendCh <- &msg.Pong{ - Error: "invalid authentication in ping", + Error: util.GenerateResponseErrorString("invalid ping", err, ctl.serverCfg.DetailedErrorsToClient), } return } diff --git a/server/service.go b/server/service.go index 43870733..d3c31699 100644 --- a/server/service.go +++ b/server/service.go @@ -457,13 +457,27 @@ func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) xl.Warn("No client control found for run id [%s]", newMsg.RunId) return fmt.Errorf("no client control found for run id [%s]", newMsg.RunId) } - // Check auth. - if err := svr.authVerifier.VerifyNewWorkConn(newMsg); err != nil { - xl.Warn("Invalid authentication in NewWorkConn message on run id [%s]", newMsg.RunId) + // server plugin hook + content := &plugin.NewWorkConnContent{ + User: plugin.UserInfo{ + User: ctl.loginMsg.User, + Metas: ctl.loginMsg.Metas, + RunId: ctl.loginMsg.RunId, + }, + NewWorkConn: *newMsg, + } + retContent, err := svr.pluginManager.NewWorkConn(content) + if err == nil { + newMsg = &retContent.NewWorkConn + // Check auth. + err = svr.authVerifier.VerifyNewWorkConn(newMsg) + } + if err != nil { + xl.Warn("invalid NewWorkConn with run id [%s]", newMsg.RunId) msg.WriteMsg(workConn, &msg.StartWorkConn{ - Error: "invalid authentication in NewWorkConn", + Error: util.GenerateResponseErrorString("invalid NewWorkConn", err, ctl.serverCfg.DetailedErrorsToClient), }) - return fmt.Errorf("invalid authentication in NewWorkConn message on run id [%s]", newMsg.RunId) + return fmt.Errorf("invalid NewWorkConn with run id [%s]", newMsg.RunId) } return ctl.RegisterWorkConn(workConn) }