From aa31d7ad0bdcc05ee0a4add6cfd0eacf04c241c5 Mon Sep 17 00:00:00 2001 From: Craig O'Donnell Date: Wed, 8 Feb 2023 11:38:36 -0500 Subject: [PATCH] support bandwidth_limit set by server plugin (#3271) * support bandwidth_limit set by server plugin * limiter at proxy level * bandwidth_limit_mode * updates tests for bandwidth_limit_mode default * bandwidth_limit_mode as string * add checkForSrv for bandwidth_limit_mode * bandwidth_limit flags for sub cmds * gci write --- README.md | 3 +++ client/proxy/proxy.go | 2 +- cmd/frpc/sub/http.go | 8 +++++++ cmd/frpc/sub/https.go | 8 +++++++ cmd/frpc/sub/root.go | 38 ++++++++++++++++--------------- cmd/frpc/sub/stcp.go | 8 +++++++ cmd/frpc/sub/sudp.go | 8 +++++++ cmd/frpc/sub/tcp.go | 8 +++++++ cmd/frpc/sub/tcpmux.go | 8 +++++++ cmd/frpc/sub/udp.go | 8 +++++++ cmd/frpc/sub/xtcp.go | 8 +++++++ conf/frpc_full.ini | 2 ++ doc/server_plugin.md | 2 ++ pkg/config/client_test.go | 35 ++++++++++++++++++++++------ pkg/config/proxy.go | 48 +++++++++++++++++++++++++++++++++++++++ pkg/config/proxy_test.go | 29 +++++++++++++++++------ pkg/config/types.go | 3 +++ pkg/msg/msg.go | 20 +++++++++------- server/proxy/http.go | 13 +++++++++++ server/proxy/https.go | 6 +++++ server/proxy/proxy.go | 19 ++++++++++++++++ server/proxy/stcp.go | 6 +++++ server/proxy/sudp.go | 6 +++++ server/proxy/tcp.go | 6 +++++ server/proxy/tcpmux.go | 6 +++++ server/proxy/udp.go | 12 ++++++++++ server/proxy/xtcp.go | 5 ++++ 27 files changed, 284 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index ff22141..73e48c7 100644 --- a/README.md +++ b/README.md @@ -713,10 +713,13 @@ type = tcp local_port = 22 remote_port = 6000 bandwidth_limit = 1MB +bandwidth_limit_mode = client ``` Set `bandwidth_limit` in each proxy's configure to enable this feature. Supported units are `MB` and `KB`. +Set `bandwidth_limit_mode` to `client` or `server` to limit bandwidth on the client or server side. Default is `client`. + ### TCP Stream Multiplexing frp supports tcp stream multiplexing since v0.10.0 like HTTP2 Multiplexing, in which case all logic connections to the same frpc are multiplexed into the same TCP connection. diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 158773b..98a8a67 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -54,7 +54,7 @@ type Proxy interface { func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) { var limiter *rate.Limiter limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes() - if limitBytes > 0 { + if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeClient { limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) } diff --git a/cmd/frpc/sub/http.go b/cmd/frpc/sub/http.go index 4e193b7..22eeefe 100644 --- a/cmd/frpc/sub/http.go +++ b/cmd/frpc/sub/http.go @@ -39,6 +39,8 @@ func init() { httpCmd.PersistentFlags().StringVarP(&hostHeaderRewrite, "host_header_rewrite", "", "", "host header rewrite") httpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") httpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + httpCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + httpCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(httpCmd) } @@ -70,6 +72,12 @@ var httpCmd = &cobra.Command{ cfg.HostHeaderRewrite = hostHeaderRewrite cfg.UseEncryption = useEncryption cfg.UseCompression = useCompression + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { diff --git a/cmd/frpc/sub/https.go b/cmd/frpc/sub/https.go index 8a14d39..187aa99 100644 --- a/cmd/frpc/sub/https.go +++ b/cmd/frpc/sub/https.go @@ -35,6 +35,8 @@ func init() { httpsCmd.PersistentFlags().StringVarP(&subDomain, "sd", "", "", "sub domain") httpsCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") httpsCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + httpsCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + httpsCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(httpsCmd) } @@ -62,6 +64,12 @@ var httpsCmd = &cobra.Command{ cfg.SubDomain = subDomain cfg.UseEncryption = useEncryption cfg.UseCompression = useCompression + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { diff --git a/cmd/frpc/sub/root.go b/cmd/frpc/sub/root.go index 19206c6..cd39d88 100644 --- a/cmd/frpc/sub/root.go +++ b/cmd/frpc/sub/root.go @@ -54,24 +54,26 @@ var ( logMaxDays int disableLogColor bool - proxyName string - localIP string - localPort int - remotePort int - useEncryption bool - useCompression bool - customDomains string - subDomain string - httpUser string - httpPwd string - locations string - hostHeaderRewrite string - role string - sk string - multiplexer string - serverName string - bindAddr string - bindPort int + proxyName string + localIP string + localPort int + remotePort int + useEncryption bool + useCompression bool + bandwidthLimit string + bandwidthLimitMode string + customDomains string + subDomain string + httpUser string + httpPwd string + locations string + hostHeaderRewrite string + role string + sk string + multiplexer string + serverName string + bindAddr string + bindPort int tlsEnable bool ) diff --git a/cmd/frpc/sub/stcp.go b/cmd/frpc/sub/stcp.go index d84b23b..ad0a57c 100644 --- a/cmd/frpc/sub/stcp.go +++ b/cmd/frpc/sub/stcp.go @@ -37,6 +37,8 @@ func init() { stcpCmd.PersistentFlags().IntVarP(&bindPort, "bind_port", "", 0, "bind port") stcpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") stcpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + stcpCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + stcpCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(stcpCmd) } @@ -70,6 +72,12 @@ var stcpCmd = &cobra.Command{ cfg.Sk = sk cfg.LocalIP = localIP cfg.LocalPort = localPort + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { fmt.Println(err) diff --git a/cmd/frpc/sub/sudp.go b/cmd/frpc/sub/sudp.go index f96a12e..0ae8498 100644 --- a/cmd/frpc/sub/sudp.go +++ b/cmd/frpc/sub/sudp.go @@ -37,6 +37,8 @@ func init() { sudpCmd.PersistentFlags().IntVarP(&bindPort, "bind_port", "", 0, "bind port") sudpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") sudpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + sudpCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + sudpCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(sudpCmd) } @@ -70,6 +72,12 @@ var sudpCmd = &cobra.Command{ cfg.Sk = sk cfg.LocalIP = localIP cfg.LocalPort = localPort + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { fmt.Println(err) diff --git a/cmd/frpc/sub/tcp.go b/cmd/frpc/sub/tcp.go index 7e86734..2c597f1 100644 --- a/cmd/frpc/sub/tcp.go +++ b/cmd/frpc/sub/tcp.go @@ -33,6 +33,8 @@ func init() { tcpCmd.PersistentFlags().IntVarP(&remotePort, "remote_port", "r", 0, "remote port") tcpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") tcpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + tcpCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + tcpCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(tcpCmd) } @@ -59,6 +61,12 @@ var tcpCmd = &cobra.Command{ cfg.RemotePort = remotePort cfg.UseEncryption = useEncryption cfg.UseCompression = useCompression + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { diff --git a/cmd/frpc/sub/tcpmux.go b/cmd/frpc/sub/tcpmux.go index cef845d..ecdd600 100644 --- a/cmd/frpc/sub/tcpmux.go +++ b/cmd/frpc/sub/tcpmux.go @@ -36,6 +36,8 @@ func init() { tcpMuxCmd.PersistentFlags().StringVarP(&multiplexer, "mux", "", "", "multiplexer") tcpMuxCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") tcpMuxCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + tcpMuxCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + tcpMuxCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(tcpMuxCmd) } @@ -64,6 +66,12 @@ var tcpMuxCmd = &cobra.Command{ cfg.Multiplexer = multiplexer cfg.UseEncryption = useEncryption cfg.UseCompression = useCompression + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { diff --git a/cmd/frpc/sub/udp.go b/cmd/frpc/sub/udp.go index 984ad06..f9dfa3f 100644 --- a/cmd/frpc/sub/udp.go +++ b/cmd/frpc/sub/udp.go @@ -33,6 +33,8 @@ func init() { udpCmd.PersistentFlags().IntVarP(&remotePort, "remote_port", "r", 0, "remote port") udpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") udpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + udpCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + udpCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(udpCmd) } @@ -59,6 +61,12 @@ var udpCmd = &cobra.Command{ cfg.RemotePort = remotePort cfg.UseEncryption = useEncryption cfg.UseCompression = useCompression + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { diff --git a/cmd/frpc/sub/xtcp.go b/cmd/frpc/sub/xtcp.go index b842698..ea201d5 100644 --- a/cmd/frpc/sub/xtcp.go +++ b/cmd/frpc/sub/xtcp.go @@ -37,6 +37,8 @@ func init() { xtcpCmd.PersistentFlags().IntVarP(&bindPort, "bind_port", "", 0, "bind port") xtcpCmd.PersistentFlags().BoolVarP(&useEncryption, "ue", "", false, "use encryption") xtcpCmd.PersistentFlags().BoolVarP(&useCompression, "uc", "", false, "use compression") + xtcpCmd.PersistentFlags().StringVarP(&bandwidthLimit, "bandwidth_limit", "", "", "bandwidth limit") + xtcpCmd.PersistentFlags().StringVarP(&bandwidthLimitMode, "bandwidth_limit_mode", "", config.BandwidthLimitModeClient, "bandwidth limit mode") rootCmd.AddCommand(xtcpCmd) } @@ -70,6 +72,12 @@ var xtcpCmd = &cobra.Command{ cfg.Sk = sk cfg.LocalIP = localIP cfg.LocalPort = localPort + cfg.BandwidthLimit, err = config.NewBandwidthQuantity(bandwidthLimit) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + cfg.BandwidthLimitMode = bandwidthLimitMode err = cfg.CheckForCli() if err != nil { fmt.Println(err) diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 8e39de1..29f6bca 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -154,6 +154,8 @@ local_ip = 127.0.0.1 local_port = 22 # limit bandwidth for this proxy, unit is KB and MB bandwidth_limit = 1MB +# where to limit bandwidth, can be 'client' or 'server', default is 'client' +bandwidth_limit_mode = client # true or false, if true, messages between frps and frpc will be encrypted, default is false use_encryption = false # if true, message will be compressed diff --git a/doc/server_plugin.md b/doc/server_plugin.md index d73d243..f4e377f 100644 --- a/doc/server_plugin.md +++ b/doc/server_plugin.md @@ -110,6 +110,8 @@ Create new proxy "proxy_type": , "use_encryption": , "use_compression": , + "bandwidth_limit": , + "bandwidth_limit_mode": , "group": , "group_key": , diff --git a/pkg/config/client_test.go b/pkg/config/client_test.go index 9cf8c80..79eef5f 100644 --- a/pkg/config/client_test.go +++ b/pkg/config/client_test.go @@ -74,6 +74,7 @@ var testClientBytesWithFull = []byte(` local_ip = 127.0.0.9 local_port = 29 bandwidth_limit = 19MB + bandwidth_limit_mode = server use_encryption use_compression remote_port = 6009 @@ -309,13 +310,14 @@ func Test_LoadClientBasicConf(t *testing.T) { proxyExpected := map[string]ProxyConf{ testUser + ".ssh": &TCPProxyConf{ BaseProxyConf: BaseProxyConf{ - ProxyName: testUser + ".ssh", - ProxyType: consts.TCPProxy, - UseCompression: true, - UseEncryption: true, - Group: "test_group", - GroupKey: "123456", - BandwidthLimit: MustBandwidthQuantity("19MB"), + ProxyName: testUser + ".ssh", + ProxyType: consts.TCPProxy, + UseCompression: true, + UseEncryption: true, + Group: "test_group", + GroupKey: "123456", + BandwidthLimit: MustBandwidthQuantity("19MB"), + BandwidthLimitMode: BandwidthLimitModeServer, Metas: map[string]string{ "var1": "123", "var2": "234", @@ -342,6 +344,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 29, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 9, }, @@ -353,6 +356,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 6010, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6010, }, @@ -364,6 +368,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 6011, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6011, }, @@ -375,6 +380,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 6019, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6019, }, @@ -388,6 +394,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 59, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6009, }, @@ -401,6 +408,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 6000, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6000, }, @@ -414,6 +422,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 6010, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6010, }, @@ -427,6 +436,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 6011, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6011, }, @@ -447,6 +457,7 @@ func Test_LoadClientBasicConf(t *testing.T) { HealthCheckIntervalS: 19, HealthCheckURL: "http://127.0.0.9:89/status", }, + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"web02.yourdomain.com"}, @@ -471,6 +482,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalPort: 8009, }, ProxyProtocolVersion: "v2", + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"web02.yourdomain.com"}, @@ -485,6 +497,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.1", LocalPort: 22, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, Role: "server", Sk: "abcdefg", @@ -497,6 +510,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.1", LocalPort: 22, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, Role: "server", Sk: "abcdefg", @@ -509,6 +523,7 @@ func Test_LoadClientBasicConf(t *testing.T) { LocalIP: "127.0.0.1", LocalPort: 10701, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"tunnel1"}, @@ -527,6 +542,7 @@ func Test_LoadClientBasicConf(t *testing.T) { "plugin_unix_path": "/var/run/docker.sock", }, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6003, }, @@ -542,6 +558,7 @@ func Test_LoadClientBasicConf(t *testing.T) { "plugin_http_passwd": "abc", }, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6004, }, @@ -557,6 +574,7 @@ func Test_LoadClientBasicConf(t *testing.T) { "plugin_passwd": "abc", }, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6005, }, @@ -574,6 +592,7 @@ func Test_LoadClientBasicConf(t *testing.T) { "plugin_http_passwd": "abc", }, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6006, }, @@ -592,6 +611,7 @@ func Test_LoadClientBasicConf(t *testing.T) { "plugin_header_X-From-Where": "frp", }, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"test.yourdomain.com"}, @@ -610,6 +630,7 @@ func Test_LoadClientBasicConf(t *testing.T) { "plugin_header_X-From-Where": "frp", }, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"test.yourdomain.com"}, diff --git a/pkg/config/proxy.go b/pkg/config/proxy.go index 92b499f..3419f48 100644 --- a/pkg/config/proxy.go +++ b/pkg/config/proxy.go @@ -141,6 +141,10 @@ type BaseProxyConf struct { // BandwidthLimit limit the bandwidth // 0 means no limit BandwidthLimit BandwidthQuantity `ini:"bandwidth_limit" json:"bandwidth_limit"` + // BandwidthLimitMode specifies whether to limit the bandwidth on the + // client or server side. Valid values include "client" and "server". + // By default, this value is "client". + BandwidthLimitMode string `ini:"bandwidth_limit_mode" json:"bandwidth_limit_mode"` // meta info for each proxy Metas map[string]string `ini:"-" json:"metas"` @@ -319,6 +323,7 @@ func defaultBaseProxyConf(proxyType string) BaseProxyConf { LocalSvrConf: LocalSvrConf{ LocalIP: "127.0.0.1", }, + BandwidthLimitMode: BandwidthLimitModeClient, } } @@ -335,6 +340,7 @@ func (cfg *BaseProxyConf) compare(cmp *BaseProxyConf) bool { cfg.GroupKey != cmp.GroupKey || cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion || !cfg.BandwidthLimit.Equal(&cmp.BandwidthLimit) || + cfg.BandwidthLimitMode != cmp.BandwidthLimitMode || !reflect.DeepEqual(cfg.Metas, cmp.Metas) { return false } @@ -389,6 +395,8 @@ func (cfg *BaseProxyConf) marshalToMsg(pMsg *msg.NewProxy) { pMsg.ProxyType = cfg.ProxyType pMsg.UseEncryption = cfg.UseEncryption pMsg.UseCompression = cfg.UseCompression + pMsg.BandwidthLimit = cfg.BandwidthLimit.String() + pMsg.BandwidthLimitMode = cfg.BandwidthLimitMode pMsg.Group = cfg.Group pMsg.GroupKey = cfg.GroupKey pMsg.Metas = cfg.Metas @@ -399,6 +407,8 @@ func (cfg *BaseProxyConf) unmarshalFromMsg(pMsg *msg.NewProxy) { cfg.ProxyType = pMsg.ProxyType cfg.UseEncryption = pMsg.UseEncryption cfg.UseCompression = pMsg.UseCompression + cfg.BandwidthLimit, _ = NewBandwidthQuantity(pMsg.BandwidthLimit) + cfg.BandwidthLimitMode = pMsg.BandwidthLimitMode cfg.Group = pMsg.Group cfg.GroupKey = pMsg.GroupKey cfg.Metas = pMsg.Metas @@ -411,6 +421,10 @@ func (cfg *BaseProxyConf) checkForCli() (err error) { } } + if cfg.BandwidthLimitMode != "client" && cfg.BandwidthLimitMode != "server" { + return fmt.Errorf("bandwidth_limit_mode should be client or server") + } + if err = cfg.LocalSvrConf.checkForCli(); err != nil { return } @@ -420,6 +434,13 @@ func (cfg *BaseProxyConf) checkForCli() (err error) { return nil } +func (cfg *BaseProxyConf) checkForSvr() (err error) { + if cfg.BandwidthLimitMode != "client" && cfg.BandwidthLimitMode != "server" { + return fmt.Errorf("bandwidth_limit_mode should be client or server") + } + return nil +} + // DomainConf func (cfg *DomainConf) check() (err error) { if len(cfg.CustomDomains) == 0 && cfg.SubDomain == "" { @@ -557,6 +578,9 @@ func (cfg *TCPProxyConf) CheckForCli() (err error) { } func (cfg *TCPProxyConf) CheckForSvr(serverCfg ServerCommonConf) error { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } return nil } @@ -632,6 +656,10 @@ func (cfg *TCPMuxProxyConf) CheckForCli() (err error) { } func (cfg *TCPMuxProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } + if cfg.Multiplexer != consts.HTTPConnectTCPMultiplexer { return fmt.Errorf("proxy [%s] incorrect multiplexer [%s]", cfg.ProxyName, cfg.Multiplexer) } @@ -703,6 +731,9 @@ func (cfg *UDPProxyConf) CheckForCli() (err error) { } func (cfg *UDPProxyConf) CheckForSvr(serverCfg ServerCommonConf) error { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } return nil } @@ -788,6 +819,10 @@ func (cfg *HTTPProxyConf) CheckForCli() (err error) { } func (cfg *HTTPProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } + if serverCfg.VhostHTTPPort == 0 { return fmt.Errorf("type [http] not support when vhost_http_port is not set") } @@ -860,6 +895,10 @@ func (cfg *HTTPSProxyConf) CheckForCli() (err error) { } func (cfg *HTTPSProxyConf) CheckForSvr(serverCfg ServerCommonConf) (err error) { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } + if serverCfg.VhostHTTPSPort == 0 { return fmt.Errorf("type [https] not support when vhost_https_port is not set") } @@ -932,6 +971,9 @@ func (cfg *SUDPProxyConf) CheckForCli() (err error) { } func (cfg *SUDPProxyConf) CheckForSvr(serverCfg ServerCommonConf) error { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } return nil } @@ -998,6 +1040,9 @@ func (cfg *STCPProxyConf) CheckForCli() (err error) { } func (cfg *STCPProxyConf) CheckForSvr(serverCfg ServerCommonConf) error { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } return nil } @@ -1064,5 +1109,8 @@ func (cfg *XTCPProxyConf) CheckForCli() (err error) { } func (cfg *XTCPProxyConf) CheckForSvr(serverCfg ServerCommonConf) error { + if err := cfg.BaseProxyConf.checkForSvr(); err != nil { + return err + } return nil } diff --git a/pkg/config/proxy_test.go b/pkg/config/proxy_test.go index c603dd7..894d6fd 100644 --- a/pkg/config/proxy_test.go +++ b/pkg/config/proxy_test.go @@ -58,6 +58,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { local_ip = 127.0.0.9 local_port = 29 bandwidth_limit = 19MB + bandwidth_limit_mode = server use_encryption use_compression remote_port = 6009 @@ -71,13 +72,14 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { meta_var2 = 234`), expected: &TCPProxyConf{ BaseProxyConf: BaseProxyConf{ - ProxyName: testProxyPrefix + "ssh", - ProxyType: consts.TCPProxy, - UseCompression: true, - UseEncryption: true, - Group: "test_group", - GroupKey: "123456", - BandwidthLimit: MustBandwidthQuantity("19MB"), + ProxyName: testProxyPrefix + "ssh", + ProxyType: consts.TCPProxy, + UseCompression: true, + UseEncryption: true, + Group: "test_group", + GroupKey: "123456", + BandwidthLimit: MustBandwidthQuantity("19MB"), + BandwidthLimitMode: BandwidthLimitModeServer, Metas: map[string]string{ "var1": "123", "var2": "234", @@ -114,6 +116,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 29, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 9, }, @@ -139,6 +142,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 59, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6009, }, @@ -182,6 +186,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { HealthCheckIntervalS: 19, HealthCheckURL: "http://127.0.0.9:89/status", }, + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"web02.yourdomain.com"}, @@ -220,6 +225,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { LocalPort: 8009, }, ProxyProtocolVersion: "v2", + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"web02.yourdomain.com"}, @@ -246,6 +252,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.1", LocalPort: 22, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, Role: "server", Sk: "abcdefg", @@ -270,6 +277,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.1", LocalPort: 22, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, Role: "server", Sk: "abcdefg", @@ -293,6 +301,7 @@ func Test_Proxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.1", LocalPort: 10701, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, DomainConf: DomainConf{ CustomDomains: []string{"tunnel1"}, @@ -347,6 +356,7 @@ func Test_RangeProxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 6010, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6010, }, @@ -358,6 +368,7 @@ func Test_RangeProxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 6011, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6011, }, @@ -369,6 +380,7 @@ func Test_RangeProxy_UnmarshalFromIni(t *testing.T) { LocalIP: "127.0.0.9", LocalPort: 6019, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6019, }, @@ -396,6 +408,7 @@ func Test_RangeProxy_UnmarshalFromIni(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 6000, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6000, }, @@ -409,6 +422,7 @@ func Test_RangeProxy_UnmarshalFromIni(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 6010, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6010, }, @@ -422,6 +436,7 @@ func Test_RangeProxy_UnmarshalFromIni(t *testing.T) { LocalIP: "114.114.114.114", LocalPort: 6011, }, + BandwidthLimitMode: BandwidthLimitModeClient, }, RemotePort: 6011, }, diff --git a/pkg/config/types.go b/pkg/config/types.go index 28a0e46..7aefee1 100644 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -24,6 +24,9 @@ import ( const ( MB = 1024 * 1024 KB = 1024 + + BandwidthLimitModeClient = "client" + BandwidthLimitModeServer = "server" ) type BandwidthQuantity struct { diff --git a/pkg/msg/msg.go b/pkg/msg/msg.go index 4b2823c..33e8fe5 100644 --- a/pkg/msg/msg.go +++ b/pkg/msg/msg.go @@ -14,7 +14,9 @@ package msg -import "net" +import ( + "net" +) const ( TypeLogin = 'o' @@ -83,13 +85,15 @@ type LoginResp struct { // When frpc login success, send this message to frps for running a new proxy. type NewProxy struct { - ProxyName string `json:"proxy_name,omitempty"` - ProxyType string `json:"proxy_type,omitempty"` - UseEncryption bool `json:"use_encryption,omitempty"` - UseCompression bool `json:"use_compression,omitempty"` - Group string `json:"group,omitempty"` - GroupKey string `json:"group_key,omitempty"` - Metas map[string]string `json:"metas,omitempty"` + ProxyName string `json:"proxy_name,omitempty"` + ProxyType string `json:"proxy_type,omitempty"` + UseEncryption bool `json:"use_encryption,omitempty"` + UseCompression bool `json:"use_compression,omitempty"` + BandwidthLimit string `json:"bandwidth_limit,omitempty"` + BandwidthLimitMode string `json:"bandwidth_limit_mode,omitempty"` + Group string `json:"group,omitempty"` + GroupKey string `json:"group_key,omitempty"` + Metas map[string]string `json:"metas,omitempty"` // tcp and udp only RemotePort int `json:"remote_port,omitempty"` diff --git a/server/proxy/http.go b/server/proxy/http.go index e70cb65..0f25c8c 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -20,8 +20,10 @@ import ( "strings" frpIo "github.com/fatedier/golib/io" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" + "github.com/fatedier/frp/pkg/util/limit" frpNet "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" @@ -135,6 +137,10 @@ func (pxy *HTTPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *HTTPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) { xl := pxy.xl rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr) @@ -160,6 +166,13 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err if pxy.cfg.UseCompression { rwc = frpIo.WithCompression(rwc) } + + if pxy.GetLimiter() != nil { + rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error { + return rwc.Close() + }) + } + workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn) workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn) metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType) diff --git a/server/proxy/https.go b/server/proxy/https.go index 42ecf35..1fb579e 100644 --- a/server/proxy/https.go +++ b/server/proxy/https.go @@ -17,6 +17,8 @@ package proxy import ( "strings" + "golang.org/x/time/rate" + "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/util/util" "github.com/fatedier/frp/pkg/util/vhost" @@ -74,6 +76,10 @@ func (pxy *HTTPSProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *HTTPSProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *HTTPSProxy) Close() { pxy.BaseProxy.Close() } diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 808d931..0681370 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -24,10 +24,12 @@ import ( "time" frpIo "github.com/fatedier/golib/io" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" plugin "github.com/fatedier/frp/pkg/plugin/server" + "github.com/fatedier/frp/pkg/util/limit" frpNet "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/pkg/util/xlog" "github.com/fatedier/frp/server/controller" @@ -45,6 +47,7 @@ type Proxy interface { GetUsedPortsNum() int GetResourceController() *controller.ResourceController GetUserInfo() plugin.UserInfo + GetLimiter() *rate.Limiter Close() } @@ -56,6 +59,7 @@ type BaseProxy struct { poolCount int getWorkConnFn GetWorkConnFn serverCfg config.ServerCommonConf + limiter *rate.Limiter userInfo plugin.UserInfo mu sync.RWMutex @@ -187,6 +191,13 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, ) (pxy Proxy, err error) { xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName) + + var limiter *rate.Limiter + limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes() + if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeServer { + limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) + } + basePxy := BaseProxy{ name: pxyConf.GetBaseInfo().ProxyName, rc: rc, @@ -194,6 +205,7 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso poolCount: poolCount, getWorkConnFn: getWorkConnFn, serverCfg: serverCfg, + limiter: limiter, xl: xl, ctx: xlog.NewContext(ctx, xl), userInfo: userInfo, @@ -287,6 +299,13 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv if cfg.UseCompression { local = frpIo.WithCompression(local) } + + if pxy.GetLimiter() != nil { + local = frpIo.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error { + return local.Close() + }) + } + xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) diff --git a/server/proxy/stcp.go b/server/proxy/stcp.go index 5ac47ea..2ece405 100644 --- a/server/proxy/stcp.go +++ b/server/proxy/stcp.go @@ -15,6 +15,8 @@ package proxy import ( + "golang.org/x/time/rate" + "github.com/fatedier/frp/pkg/config" ) @@ -41,6 +43,10 @@ func (pxy *STCPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *STCPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *STCPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.VisitorManager.CloseListener(pxy.GetName()) diff --git a/server/proxy/sudp.go b/server/proxy/sudp.go index c4dba6d..93707f2 100644 --- a/server/proxy/sudp.go +++ b/server/proxy/sudp.go @@ -15,6 +15,8 @@ package proxy import ( + "golang.org/x/time/rate" + "github.com/fatedier/frp/pkg/config" ) @@ -42,6 +44,10 @@ func (pxy *SUDPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *SUDPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *SUDPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.VisitorManager.CloseListener(pxy.GetName()) diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index 0cf9c5f..1ba0fb1 100644 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -19,6 +19,8 @@ import ( "net" "strconv" + "golang.org/x/time/rate" + "github.com/fatedier/frp/pkg/config" ) @@ -74,6 +76,10 @@ func (pxy *TCPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *TCPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *TCPProxy) Close() { pxy.BaseProxy.Close() if pxy.cfg.Group == "" { diff --git a/server/proxy/tcpmux.go b/server/proxy/tcpmux.go index b812e60..4b413c3 100644 --- a/server/proxy/tcpmux.go +++ b/server/proxy/tcpmux.go @@ -19,6 +19,8 @@ import ( "net" "strings" + "golang.org/x/time/rate" + "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/consts" "github.com/fatedier/frp/pkg/util/util" @@ -94,6 +96,10 @@ func (pxy *TCPMuxProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *TCPMuxProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *TCPMuxProxy) Close() { pxy.BaseProxy.Close() } diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 5386554..3a136c3 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -24,10 +24,12 @@ import ( "github.com/fatedier/golib/errors" frpIo "github.com/fatedier/golib/io" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" "github.com/fatedier/frp/pkg/proto/udp" + "github.com/fatedier/frp/pkg/util/limit" frpNet "github.com/fatedier/frp/pkg/util/net" "github.com/fatedier/frp/server/metrics" ) @@ -198,6 +200,12 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { rwc = frpIo.WithCompression(rwc) } + if pxy.GetLimiter() != nil { + rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error { + return rwc.Close() + }) + } + pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn) ctx, cancel := context.WithCancel(context.Background()) go workConnReaderFn(pxy.workConn) @@ -225,6 +233,10 @@ func (pxy *UDPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *UDPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *UDPProxy) Close() { pxy.mu.Lock() defer pxy.mu.Unlock() diff --git a/server/proxy/xtcp.go b/server/proxy/xtcp.go index a1b45d5..b6c7be3 100644 --- a/server/proxy/xtcp.go +++ b/server/proxy/xtcp.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/fatedier/golib/errors" + "golang.org/x/time/rate" "github.com/fatedier/frp/pkg/config" "github.com/fatedier/frp/pkg/msg" @@ -88,6 +89,10 @@ func (pxy *XTCPProxy) GetConf() config.ProxyConf { return pxy.cfg } +func (pxy *XTCPProxy) GetLimiter() *rate.Limiter { + return pxy.limiter +} + func (pxy *XTCPProxy) Close() { pxy.BaseProxy.Close() pxy.rc.NatHoleController.CloseClient(pxy.GetName())