grpc for QueryStats

pull/1524/head^2 v3.30
Darien Raymond 2018-07-10 23:40:58 +02:00
parent 7efa7ee632
commit b288b3c773
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
5 changed files with 312 additions and 29 deletions

View File

@ -6,14 +6,22 @@ import (
"context"
grpc "google.golang.org/grpc"
"v2ray.com/core"
"v2ray.com/core/app/stats"
"v2ray.com/core/common"
"v2ray.com/core/common/strmatcher"
)
// statsServer is an implementation of StatsService.
type statsServer struct {
stats core.StatManager
}
func NewStatsServer(manager core.StatManager) StatsServiceServer {
return &statsServer{stats: manager}
}
func (s *statsServer) GetStats(ctx context.Context, request *GetStatsRequest) (*GetStatsResponse, error) {
c := s.stats.GetCounter(request.Name)
if c == nil {
@ -33,14 +41,44 @@ func (s *statsServer) GetStats(ctx context.Context, request *GetStatsRequest) (*
}, nil
}
func (s *statsServer) QueryStats(ctx context.Context, request *QueryStatsRequest) (*QueryStatsResponse, error) {
matcher, err := strmatcher.Substr.New(request.Pattern)
if err != nil {
return nil, err
}
response := &QueryStatsResponse{}
manager, ok := s.stats.(*stats.Manager)
if !ok {
return nil, newError("QueryStats only works its own stats.Manager.")
}
manager.Visit(func(name string, c core.StatCounter) bool {
if matcher.Match(name) {
var value int64
if request.Reset_ {
value = c.Set(0)
} else {
value = c.Value()
}
response.Stat = append(response.Stat, &Stat{
Name: name,
Value: value,
})
}
return true
})
return response, nil
}
type service struct {
v *core.Instance
}
func (s *service) Register(server *grpc.Server) {
RegisterStatsServiceServer(server, &statsServer{
stats: s.v.Stats(),
})
RegisterStatsServiceServer(server, NewStatsServer(s.v.Stats()))
}
func init() {

View File

@ -5,8 +5,7 @@ import fmt "fmt"
import math "math"
import (
"context"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
@ -35,7 +34,7 @@ func (m *GetStatsRequest) Reset() { *m = GetStatsRequest{} }
func (m *GetStatsRequest) String() string { return proto.CompactTextString(m) }
func (*GetStatsRequest) ProtoMessage() {}
func (*GetStatsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_command_a956deb495595f76, []int{0}
return fileDescriptor_command_d2105e779ee1253c, []int{0}
}
func (m *GetStatsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GetStatsRequest.Unmarshal(m, b)
@ -81,7 +80,7 @@ func (m *Stat) Reset() { *m = Stat{} }
func (m *Stat) String() string { return proto.CompactTextString(m) }
func (*Stat) ProtoMessage() {}
func (*Stat) Descriptor() ([]byte, []int) {
return fileDescriptor_command_a956deb495595f76, []int{1}
return fileDescriptor_command_d2105e779ee1253c, []int{1}
}
func (m *Stat) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Stat.Unmarshal(m, b)
@ -126,7 +125,7 @@ func (m *GetStatsResponse) Reset() { *m = GetStatsResponse{} }
func (m *GetStatsResponse) String() string { return proto.CompactTextString(m) }
func (*GetStatsResponse) ProtoMessage() {}
func (*GetStatsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_command_a956deb495595f76, []int{2}
return fileDescriptor_command_d2105e779ee1253c, []int{2}
}
func (m *GetStatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GetStatsResponse.Unmarshal(m, b)
@ -153,6 +152,90 @@ func (m *GetStatsResponse) GetStat() *Stat {
return nil
}
type QueryStatsRequest struct {
Pattern string `protobuf:"bytes,1,opt,name=pattern,proto3" json:"pattern,omitempty"`
Reset_ bool `protobuf:"varint,2,opt,name=reset,proto3" json:"reset,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *QueryStatsRequest) Reset() { *m = QueryStatsRequest{} }
func (m *QueryStatsRequest) String() string { return proto.CompactTextString(m) }
func (*QueryStatsRequest) ProtoMessage() {}
func (*QueryStatsRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_command_d2105e779ee1253c, []int{3}
}
func (m *QueryStatsRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryStatsRequest.Unmarshal(m, b)
}
func (m *QueryStatsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_QueryStatsRequest.Marshal(b, m, deterministic)
}
func (dst *QueryStatsRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_QueryStatsRequest.Merge(dst, src)
}
func (m *QueryStatsRequest) XXX_Size() int {
return xxx_messageInfo_QueryStatsRequest.Size(m)
}
func (m *QueryStatsRequest) XXX_DiscardUnknown() {
xxx_messageInfo_QueryStatsRequest.DiscardUnknown(m)
}
var xxx_messageInfo_QueryStatsRequest proto.InternalMessageInfo
func (m *QueryStatsRequest) GetPattern() string {
if m != nil {
return m.Pattern
}
return ""
}
func (m *QueryStatsRequest) GetReset_() bool {
if m != nil {
return m.Reset_
}
return false
}
type QueryStatsResponse struct {
Stat []*Stat `protobuf:"bytes,1,rep,name=stat,proto3" json:"stat,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *QueryStatsResponse) Reset() { *m = QueryStatsResponse{} }
func (m *QueryStatsResponse) String() string { return proto.CompactTextString(m) }
func (*QueryStatsResponse) ProtoMessage() {}
func (*QueryStatsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_command_d2105e779ee1253c, []int{4}
}
func (m *QueryStatsResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_QueryStatsResponse.Unmarshal(m, b)
}
func (m *QueryStatsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_QueryStatsResponse.Marshal(b, m, deterministic)
}
func (dst *QueryStatsResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_QueryStatsResponse.Merge(dst, src)
}
func (m *QueryStatsResponse) XXX_Size() int {
return xxx_messageInfo_QueryStatsResponse.Size(m)
}
func (m *QueryStatsResponse) XXX_DiscardUnknown() {
xxx_messageInfo_QueryStatsResponse.DiscardUnknown(m)
}
var xxx_messageInfo_QueryStatsResponse proto.InternalMessageInfo
func (m *QueryStatsResponse) GetStat() []*Stat {
if m != nil {
return m.Stat
}
return nil
}
type Config struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -163,7 +246,7 @@ func (m *Config) Reset() { *m = Config{} }
func (m *Config) String() string { return proto.CompactTextString(m) }
func (*Config) ProtoMessage() {}
func (*Config) Descriptor() ([]byte, []int) {
return fileDescriptor_command_a956deb495595f76, []int{3}
return fileDescriptor_command_d2105e779ee1253c, []int{5}
}
func (m *Config) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Config.Unmarshal(m, b)
@ -187,6 +270,8 @@ func init() {
proto.RegisterType((*GetStatsRequest)(nil), "v2ray.core.app.stats.command.GetStatsRequest")
proto.RegisterType((*Stat)(nil), "v2ray.core.app.stats.command.Stat")
proto.RegisterType((*GetStatsResponse)(nil), "v2ray.core.app.stats.command.GetStatsResponse")
proto.RegisterType((*QueryStatsRequest)(nil), "v2ray.core.app.stats.command.QueryStatsRequest")
proto.RegisterType((*QueryStatsResponse)(nil), "v2ray.core.app.stats.command.QueryStatsResponse")
proto.RegisterType((*Config)(nil), "v2ray.core.app.stats.command.Config")
}
@ -203,6 +288,7 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StatsServiceClient interface {
GetStats(ctx context.Context, in *GetStatsRequest, opts ...grpc.CallOption) (*GetStatsResponse, error)
QueryStats(ctx context.Context, in *QueryStatsRequest, opts ...grpc.CallOption) (*QueryStatsResponse, error)
}
type statsServiceClient struct {
@ -222,9 +308,19 @@ func (c *statsServiceClient) GetStats(ctx context.Context, in *GetStatsRequest,
return out, nil
}
func (c *statsServiceClient) QueryStats(ctx context.Context, in *QueryStatsRequest, opts ...grpc.CallOption) (*QueryStatsResponse, error) {
out := new(QueryStatsResponse)
err := c.cc.Invoke(ctx, "/v2ray.core.app.stats.command.StatsService/QueryStats", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// StatsServiceServer is the server API for StatsService service.
type StatsServiceServer interface {
GetStats(context.Context, *GetStatsRequest) (*GetStatsResponse, error)
QueryStats(context.Context, *QueryStatsRequest) (*QueryStatsResponse, error)
}
func RegisterStatsServiceServer(s *grpc.Server, srv StatsServiceServer) {
@ -249,6 +345,24 @@ func _StatsService_GetStats_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _StatsService_QueryStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(QueryStatsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StatsServiceServer).QueryStats(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v2ray.core.app.stats.command.StatsService/QueryStats",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StatsServiceServer).QueryStats(ctx, req.(*QueryStatsRequest))
}
return interceptor(ctx, in, info, handler)
}
var _StatsService_serviceDesc = grpc.ServiceDesc{
ServiceName: "v2ray.core.app.stats.command.StatsService",
HandlerType: (*StatsServiceServer)(nil),
@ -257,32 +371,40 @@ var _StatsService_serviceDesc = grpc.ServiceDesc{
MethodName: "GetStats",
Handler: _StatsService_GetStats_Handler,
},
{
MethodName: "QueryStats",
Handler: _StatsService_QueryStats_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "v2ray.com/core/app/stats/command/command.proto",
}
func init() {
proto.RegisterFile("v2ray.com/core/app/stats/command/command.proto", fileDescriptor_command_a956deb495595f76)
proto.RegisterFile("v2ray.com/core/app/stats/command/command.proto", fileDescriptor_command_d2105e779ee1253c)
}
var fileDescriptor_command_a956deb495595f76 = []byte{
// 267 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0x3f, 0x4b, 0x03, 0x31,
0x14, 0xc0, 0xbd, 0x5a, 0xeb, 0xf9, 0x14, 0x94, 0xe0, 0x50, 0xa4, 0xc3, 0x91, 0xa9, 0x8b, 0xef,
0xe4, 0x04, 0x17, 0x27, 0xbd, 0x41, 0x10, 0x07, 0x49, 0xc1, 0xc1, 0x2d, 0xc6, 0xa7, 0x14, 0xcd,
0x25, 0x26, 0xe9, 0x41, 0xf1, 0x1b, 0xf9, 0x29, 0x25, 0xb9, 0x1e, 0x82, 0xe0, 0xe1, 0x94, 0xf7,
0x92, 0xdf, 0xef, 0xfd, 0x21, 0x80, 0x6d, 0xe5, 0xe4, 0x1a, 0x95, 0xd1, 0xa5, 0x32, 0x8e, 0x4a,
0x69, 0x6d, 0xe9, 0x83, 0x0c, 0xbe, 0x54, 0x46, 0x6b, 0xd9, 0x3c, 0xf7, 0x27, 0x5a, 0x67, 0x82,
0x61, 0xb3, 0x9e, 0x77, 0x84, 0xd2, 0x5a, 0x4c, 0x2c, 0x6e, 0x18, 0x7e, 0x09, 0x87, 0x37, 0x14,
0x16, 0xf1, 0x4e, 0xd0, 0xc7, 0x8a, 0x7c, 0x60, 0x0c, 0xc6, 0x8d, 0xd4, 0x34, 0xcd, 0x8a, 0x6c,
0xbe, 0x27, 0x52, 0xcc, 0x8e, 0x61, 0xc7, 0x91, 0xa7, 0x30, 0x1d, 0x15, 0xd9, 0x3c, 0x17, 0x5d,
0xc2, 0xcf, 0x60, 0x1c, 0xcd, 0xbf, 0x8c, 0x56, 0xbe, 0xaf, 0x28, 0x19, 0xdb, 0xa2, 0x4b, 0xf8,
0x2d, 0x1c, 0xfd, 0xb4, 0xf3, 0xd6, 0x34, 0x9e, 0xd8, 0x05, 0x8c, 0xe3, 0x4c, 0xc9, 0xde, 0xaf,
0x38, 0x0e, 0xcd, 0x8b, 0x51, 0x15, 0x89, 0xe7, 0x39, 0x4c, 0x6a, 0xd3, 0xbc, 0x2c, 0x5f, 0xab,
0x4f, 0x38, 0x48, 0x25, 0x17, 0xe4, 0xda, 0xa5, 0x22, 0xf6, 0x06, 0x79, 0xdf, 0x85, 0x9d, 0x0e,
0xd7, 0xfb, 0xb5, 0xfc, 0x09, 0xfe, 0x17, 0xef, 0x86, 0xe7, 0x5b, 0xd7, 0x77, 0x50, 0x28, 0xa3,
0x07, 0xb5, 0xfb, 0xec, 0x71, 0x77, 0x13, 0x7e, 0x8d, 0x66, 0x0f, 0x95, 0x90, 0x6b, 0xac, 0x23,
0x79, 0x65, 0x6d, 0xda, 0xc8, 0x63, 0xdd, 0x3d, 0x3f, 0x4d, 0xd2, 0xa7, 0x9d, 0x7f, 0x07, 0x00,
0x00, 0xff, 0xff, 0x10, 0x3a, 0x8a, 0xf3, 0xe6, 0x01, 0x00, 0x00,
var fileDescriptor_command_d2105e779ee1253c = []byte{
// 321 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x92, 0xb1, 0x4e, 0xc3, 0x30,
0x10, 0x86, 0x49, 0x5b, 0xda, 0x72, 0x20, 0x01, 0x16, 0x43, 0x55, 0x75, 0x88, 0x3c, 0x75, 0xc1,
0xa9, 0x82, 0xc4, 0xc2, 0x04, 0x19, 0x90, 0x50, 0x07, 0x70, 0x25, 0x06, 0x36, 0x13, 0x0e, 0x54,
0x41, 0x62, 0xd7, 0x76, 0x22, 0xe5, 0x95, 0x78, 0x38, 0x9e, 0x01, 0xc5, 0x49, 0x54, 0xa0, 0x6a,
0x54, 0xa6, 0xdc, 0xc5, 0xff, 0x77, 0xf7, 0xdf, 0xd9, 0xc0, 0xf2, 0x50, 0x8b, 0x82, 0xc5, 0x32,
0x09, 0x62, 0xa9, 0x31, 0x10, 0x4a, 0x05, 0xc6, 0x0a, 0x6b, 0x82, 0x58, 0x26, 0x89, 0x48, 0x5f,
0x9a, 0x2f, 0x53, 0x5a, 0x5a, 0x49, 0x26, 0x8d, 0x5e, 0x23, 0x13, 0x4a, 0x31, 0xa7, 0x65, 0xb5,
0x86, 0x5e, 0xc1, 0xf1, 0x2d, 0xda, 0x45, 0xf9, 0x8f, 0xe3, 0x2a, 0x43, 0x63, 0x09, 0x81, 0x5e,
0x2a, 0x12, 0x1c, 0x79, 0xbe, 0x37, 0x3d, 0xe0, 0x2e, 0x26, 0x67, 0xb0, 0xaf, 0xd1, 0xa0, 0x1d,
0x75, 0x7c, 0x6f, 0x3a, 0xe4, 0x55, 0x42, 0x67, 0xd0, 0x2b, 0xc9, 0x6d, 0x44, 0x2e, 0x3e, 0x32,
0x74, 0x44, 0x97, 0x57, 0x09, 0xbd, 0x83, 0x93, 0x75, 0x3b, 0xa3, 0x64, 0x6a, 0x90, 0x5c, 0x42,
0xaf, 0xf4, 0xe4, 0xe8, 0xc3, 0x90, 0xb2, 0x36, 0xbf, 0xac, 0x44, 0xb9, 0xd3, 0xd3, 0x08, 0x4e,
0x1f, 0x32, 0xd4, 0xc5, 0x2f, 0xf3, 0x23, 0x18, 0x28, 0x61, 0x2d, 0xea, 0xb4, 0x76, 0xd3, 0xa4,
0x5b, 0x46, 0x98, 0x03, 0xf9, 0x59, 0x64, 0xc3, 0x52, 0xf7, 0x5f, 0x96, 0x86, 0xd0, 0x8f, 0x64,
0xfa, 0xba, 0x7c, 0x0b, 0xbf, 0x3c, 0x38, 0x72, 0x35, 0x17, 0xa8, 0xf3, 0x65, 0x8c, 0xe4, 0x1d,
0x86, 0xcd, 0xe4, 0xe4, 0xbc, 0xbd, 0xe0, 0x9f, 0x0b, 0x19, 0xb3, 0x5d, 0xe5, 0x95, 0x7b, 0xba,
0x47, 0x56, 0x00, 0xeb, 0xa9, 0x48, 0xd0, 0xce, 0x6f, 0x2c, 0x71, 0x3c, 0xdb, 0x1d, 0x68, 0x5a,
0xde, 0xcc, 0xc1, 0x8f, 0x65, 0xd2, 0x0a, 0xde, 0x7b, 0x4f, 0x83, 0x3a, 0xfc, 0xec, 0x4c, 0x1e,
0x43, 0x2e, 0x0a, 0x16, 0x95, 0xca, 0x6b, 0xa5, 0xdc, 0x16, 0x0d, 0x8b, 0xaa, 0xe3, 0xe7, 0xbe,
0x7b, 0xbb, 0x17, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xcc, 0x9e, 0xb8, 0xeb, 0xed, 0x02, 0x00,
0x00,
}

View File

@ -22,8 +22,18 @@ message GetStatsResponse {
Stat stat = 1;
}
message QueryStatsRequest {
string pattern = 1;
bool reset = 2;
}
message QueryStatsResponse {
repeated Stat stat = 1;
}
service StatsService {
rpc GetStats(GetStatsRequest) returns (GetStatsResponse) {}
rpc QueryStats(QueryStatsRequest) returns (QueryStatsResponse) {}
}
message Config {}

View File

@ -0,0 +1,102 @@
package command_test
import (
"testing"
context "golang.org/x/net/context"
"v2ray.com/core/app/stats"
. "v2ray.com/core/app/stats/command"
. "v2ray.com/ext/assert"
)
func TestGetStats(t *testing.T) {
assert := With(t)
m, err := stats.NewManager(context.Background(), &stats.Config{})
assert(err, IsNil)
sc, err := m.RegisterCounter("test_counter")
assert(err, IsNil)
sc.Set(1)
s := NewStatsServer(m)
testCases := []struct {
name string
reset bool
value int64
err bool
}{
{
name: "counterNotExist",
err: true,
},
{
name: "test_counter",
reset: true,
value: 1,
},
{
name: "test_counter",
value: 0,
},
}
for _, tc := range testCases {
resp, err := s.GetStats(context.Background(), &GetStatsRequest{
Name: tc.name,
Reset_: tc.reset,
})
if tc.err {
assert(err, IsNotNil)
} else {
assert(err, IsNil)
assert(resp.Stat.Name, Equals, tc.name)
assert(resp.Stat.Value, Equals, tc.value)
}
}
}
func TestQueryStats(t *testing.T) {
assert := With(t)
m, err := stats.NewManager(context.Background(), &stats.Config{})
assert(err, IsNil)
sc1, err := m.RegisterCounter("test_counter")
assert(err, IsNil)
sc1.Set(1)
sc2, err := m.RegisterCounter("test_counter_2")
assert(err, IsNil)
sc2.Set(2)
sc3, err := m.RegisterCounter("test_counter_3")
assert(err, IsNil)
sc3.Set(3)
s := NewStatsServer(m)
resp, err := s.QueryStats(context.Background(), &QueryStatsRequest{
Pattern: "counter_",
})
assert(err, IsNil)
assert(len(resp.Stat), Equals, 2)
v2 := false
v3 := false
for _, sc := range resp.Stat {
switch sc.Name {
case "test_counter_2":
assert(sc.Value, Equals, int64(2))
v2 = true
case "test_counter_3":
assert(sc.Value, Equals, int64(3))
v3 = true
default:
t.Error("unexpected stat name: ", sc.Name)
t.Fail()
}
}
assert(v2, IsTrue)
assert(v3, IsTrue)
}

View File

@ -74,6 +74,17 @@ func (m *Manager) GetCounter(name string) core.StatCounter {
return nil
}
func (m *Manager) Visit(visitor func(string, core.StatCounter) bool) {
m.access.RLock()
defer m.access.RUnlock()
for name, c := range m.counters {
if !visitor(name, c) {
break
}
}
}
// Start implements common.Runnable.
func (m *Manager) Start() error {
return nil