diff --git a/pkg/plugin/server/manager.go b/pkg/plugin/server/manager.go index dabfb46c..d623e45c 100644 --- a/pkg/plugin/server/manager.go +++ b/pkg/plugin/server/manager.go @@ -1,17 +1,3 @@ -// Copyright 2019 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package server import ( @@ -24,238 +10,126 @@ import ( "github.com/fatedier/frp/pkg/util/xlog" ) -type Manager struct { - loginPlugins []Plugin - newProxyPlugins []Plugin - closeProxyPlugins []Plugin - pingPlugins []Plugin - newWorkConnPlugins []Plugin - newUserConnPlugins []Plugin +type CompositeOperationPluginGateway struct { + router *PluginOpsRouter } -func NewManager() *Manager { - return &Manager{ - loginPlugins: make([]Plugin, 0), - newProxyPlugins: make([]Plugin, 0), - closeProxyPlugins: make([]Plugin, 0), - pingPlugins: make([]Plugin, 0), - newWorkConnPlugins: make([]Plugin, 0), - newUserConnPlugins: make([]Plugin, 0), +func NewManager() *CompositeOperationPluginGateway { + return &CompositeOperationPluginGateway{ + router: NewPluginOpsRouter(), } } -func (m *Manager) Register(p Plugin) { - if p.IsSupport(OpLogin) { - m.loginPlugins = append(m.loginPlugins, p) - } - if p.IsSupport(OpNewProxy) { - m.newProxyPlugins = append(m.newProxyPlugins, p) - } - if p.IsSupport(OpCloseProxy) { - m.closeProxyPlugins = append(m.closeProxyPlugins, p) - } - if p.IsSupport(OpPing) { - m.pingPlugins = append(m.pingPlugins, p) - } - if p.IsSupport(OpNewWorkConn) { - m.newWorkConnPlugins = append(m.newWorkConnPlugins, p) - } - if p.IsSupport(OpNewUserConn) { - m.newUserConnPlugins = append(m.newUserConnPlugins, p) +type PluginOpsRouter struct { + operations map[string][]Plugin +} + +func NewPluginOpsRouter() *PluginOpsRouter { + return &PluginOpsRouter{ + operations: make(map[string][]Plugin), } } -func (m *Manager) Login(content *LoginContent) (*LoginContent, error) { - if len(m.loginPlugins) == 0 { - return content, nil - } +func (r *PluginOpsRouter) AddPlugin(op string, p Plugin) { + r.operations[op] = append(r.operations[op], p) +} - var ( - res = &Response{ - Reject: false, - Unchange: true, +func (m *CompositeOperationPluginGateway) Register(p Plugin) { + for _, op := range []string{OpLogin, OpNewProxy, OpCloseProxy, OpPing, OpNewWorkConn, OpNewUserConn} { + if p.IsSupport(op) { + m.router.AddPlugin(op, p) } - retContent any - err error - ) + } +} + +func buildCtx() (context.Context, string) { reqid, _ := util.RandID() xl := xlog.New().AppendPrefix("reqid: " + reqid) ctx := xlog.NewContext(context.Background(), xl) ctx = NewReqidContext(ctx, reqid) - - for _, p := range m.loginPlugins { - res, retContent, err = p.Handle(ctx, OpLogin, *content) - if err != nil { - xl.Warnf("send Login request to plugin [%s] error: %v", p.Name(), err) - return nil, errors.New("send Login request to plugin error") - } - if res.Reject { - return nil, fmt.Errorf("%s", res.RejectReason) - } - if !res.Unchange { - content = retContent.(*LoginContent) - } - } - return content, nil + return ctx, reqid } -func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) { - if len(m.newProxyPlugins) == 0 { - return content, nil +func (m *CompositeOperationPluginGateway) doRequest(op string, input any) (any, error) { + ctx, _ := buildCtx() + plugins := m.router.operations[op] + if len(plugins) == 0 { + return input, nil } - var ( - res = &Response{ - Reject: false, - Unchange: true, - } - retContent any - err error - ) - reqid, _ := util.RandID() - xl := xlog.New().AppendPrefix("reqid: " + reqid) - ctx := xlog.NewContext(context.Background(), xl) - ctx = NewReqidContext(ctx, reqid) + response := &Response{ + Reject: false, + Unchange: true, + } - for _, p := range m.newProxyPlugins { - res, retContent, err = p.Handle(ctx, OpNewProxy, *content) + var retContent any + var err error + for _, p := range plugins { + response, retContent, err = p.Handle(ctx, op, input) if err != nil { - xl.Warnf("send NewProxy request to plugin [%s] error: %v", p.Name(), err) - return nil, errors.New("send NewProxy request to plugin error") + return nil, fmt.Errorf("plugin [%s] failed: %v", p.Name(), err) } - if res.Reject { - return nil, fmt.Errorf("%s", res.RejectReason) + if response.Reject { + return nil, fmt.Errorf("%s", response.RejectReason) } - if !res.Unchange { - content = retContent.(*NewProxyContent) + if !response.Unchange { + input = retContent } } - return content, nil + return input, nil } -func (m *Manager) CloseProxy(content *CloseProxyContent) error { - if len(m.closeProxyPlugins) == 0 { - return nil +func (m *CompositeOperationPluginGateway) Login(c *LoginContent) (*LoginContent, error) { + out, err := m.doRequest(OpLogin, *c) + if err != nil { + return nil, err } + return out.(*LoginContent), nil +} - errs := make([]string, 0) - reqid, _ := util.RandID() - xl := xlog.New().AppendPrefix("reqid: " + reqid) - ctx := xlog.NewContext(context.Background(), xl) - ctx = NewReqidContext(ctx, reqid) +func (m *CompositeOperationPluginGateway) NewProxy(c *NewProxyContent) (*NewProxyContent, error) { + out, err := m.doRequest(OpNewProxy, *c) + if err != nil { + return nil, err + } + return out.(*NewProxyContent), nil +} - for _, p := range m.closeProxyPlugins { - _, _, err := p.Handle(ctx, OpCloseProxy, *content) +func (m *CompositeOperationPluginGateway) Ping(c *PingContent) (*PingContent, error) { + out, err := m.doRequest(OpPing, *c) + if err != nil { + return nil, err + } + return out.(*PingContent), nil +} + +func (m *CompositeOperationPluginGateway) NewWorkConn(c *NewWorkConnContent) (*NewWorkConnContent, error) { + out, err := m.doRequest(OpNewWorkConn, *c) + if err != nil { + return nil, err + } + return out.(*NewWorkConnContent), nil +} + +func (m *CompositeOperationPluginGateway) NewUserConn(c *NewUserConnContent) (*NewUserConnContent, error) { + out, err := m.doRequest(OpNewUserConn, *c) + if err != nil { + return nil, err + } + return out.(*NewUserConnContent), nil +} + +func (m *CompositeOperationPluginGateway) CloseProxy(c *CloseProxyContent) error { + ctx, _ := buildCtx() + errs := []string{} + for _, p := range m.router.operations[OpCloseProxy] { + _, _, err := p.Handle(ctx, OpCloseProxy, *c) if err != nil { - xl.Warnf("send CloseProxy request to plugin [%s] error: %v", p.Name(), err) errs = append(errs, fmt.Sprintf("[%s]: %v", p.Name(), err)) } } - if len(errs) > 0 { - return fmt.Errorf("send CloseProxy request to plugin errors: %s", strings.Join(errs, "; ")) + return fmt.Errorf("plugin CloseProxy errors: %s", strings.Join(errs, "; ")) } return nil } - -func (m *Manager) Ping(content *PingContent) (*PingContent, error) { - if len(m.pingPlugins) == 0 { - return content, nil - } - - var ( - res = &Response{ - Reject: false, - Unchange: true, - } - retContent any - err error - ) - reqid, _ := util.RandID() - xl := xlog.New().AppendPrefix("reqid: " + reqid) - ctx := xlog.NewContext(context.Background(), xl) - ctx = NewReqidContext(ctx, reqid) - - for _, p := range m.pingPlugins { - res, retContent, err = p.Handle(ctx, OpPing, *content) - if err != nil { - xl.Warnf("send Ping request to plugin [%s] error: %v", p.Name(), err) - return nil, errors.New("send Ping request to plugin error") - } - if res.Reject { - return nil, fmt.Errorf("%s", res.RejectReason) - } - if !res.Unchange { - content = retContent.(*PingContent) - } - } - return content, nil -} - -func (m *Manager) NewWorkConn(content *NewWorkConnContent) (*NewWorkConnContent, error) { - if len(m.newWorkConnPlugins) == 0 { - return content, nil - } - - var ( - res = &Response{ - Reject: false, - Unchange: true, - } - retContent any - err error - ) - reqid, _ := util.RandID() - xl := xlog.New().AppendPrefix("reqid: " + reqid) - ctx := xlog.NewContext(context.Background(), xl) - ctx = NewReqidContext(ctx, reqid) - - for _, p := range m.newWorkConnPlugins { - res, retContent, err = p.Handle(ctx, OpNewWorkConn, *content) - if err != nil { - xl.Warnf("send NewWorkConn request to plugin [%s] error: %v", p.Name(), err) - return nil, errors.New("send NewWorkConn request to plugin error") - } - if res.Reject { - return nil, fmt.Errorf("%s", res.RejectReason) - } - if !res.Unchange { - content = retContent.(*NewWorkConnContent) - } - } - return content, nil -} - -func (m *Manager) NewUserConn(content *NewUserConnContent) (*NewUserConnContent, error) { - if len(m.newUserConnPlugins) == 0 { - return content, nil - } - - var ( - res = &Response{ - Reject: false, - Unchange: true, - } - retContent any - err error - ) - reqid, _ := util.RandID() - xl := xlog.New().AppendPrefix("reqid: " + reqid) - ctx := xlog.NewContext(context.Background(), xl) - ctx = NewReqidContext(ctx, reqid) - - for _, p := range m.newUserConnPlugins { - res, retContent, err = p.Handle(ctx, OpNewUserConn, *content) - if err != nil { - xl.Infof("send NewUserConn request to plugin [%s] error: %v", p.Name(), err) - return nil, errors.New("send NewUserConn request to plugin error") - } - if res.Reject { - return nil, fmt.Errorf("%s", res.RejectReason) - } - if !res.Unchange { - content = retContent.(*NewUserConnContent) - } - } - return content, nil -}