From cd9ac1bac71c4089b0a0ca8d6fa8449052356a76 Mon Sep 17 00:00:00 2001 From: Vigilans Date: Thu, 24 Sep 2020 10:24:26 +0800 Subject: [PATCH] API: Implements RoutingService --- app/router/command/command.go | 90 +++++ app/router/command/command.pb.go | 525 +++++++++++++++++++++++++ app/router/command/command.proto | 59 +++ app/router/command/command_grpc.pb.go | 154 ++++++++ app/router/command/command_test.go | 334 ++++++++++++++++ app/router/command/config.go | 94 +++++ app/router/command/errors.generated.go | 9 + 7 files changed, 1265 insertions(+) create mode 100644 app/router/command/command.go create mode 100644 app/router/command/command.pb.go create mode 100644 app/router/command/command.proto create mode 100644 app/router/command/command_grpc.pb.go create mode 100644 app/router/command/command_test.go create mode 100644 app/router/command/config.go create mode 100644 app/router/command/errors.generated.go diff --git a/app/router/command/command.go b/app/router/command/command.go new file mode 100644 index 00000000..6add0441 --- /dev/null +++ b/app/router/command/command.go @@ -0,0 +1,90 @@ +// +build !confonly + +package command + +//go:generate errorgen + +import ( + "context" + + "google.golang.org/grpc" + + "v2ray.com/core" + "v2ray.com/core/common" + "v2ray.com/core/features/routing" + "v2ray.com/core/features/stats" +) + +// routingServer is an implementation of RoutingService. +type routingServer struct { + router routing.Router + routingStats stats.Channel +} + +// NewRoutingServer creates a statistics service with statistics manager. +func NewRoutingServer(router routing.Router, routingStats stats.Channel) RoutingServiceServer { + return &routingServer{ + router: router, + routingStats: routingStats, + } +} + +func (s *routingServer) TestRoute(ctx context.Context, request *TestRouteRequest) (*RoutingContext, error) { + if request.RoutingContext == nil { + return nil, newError("Invalid routing request.") + } + route, err := s.router.PickRoute(AsRoutingContext(request.RoutingContext)) + if err != nil { + return nil, err + } + if request.PublishResult && s.routingStats != nil { + s.routingStats.Publish(route) + } + return AsProtobufMessage(request.FieldSelectors)(route), nil +} + +func (s *routingServer) SubscribeRoutingStats(request *SubscribeRoutingStatsRequest, stream RoutingService_SubscribeRoutingStatsServer) error { + if s.routingStats == nil { + return newError("Routing statistics not enabled.") + } + genMessage := AsProtobufMessage(request.FieldSelectors) + subscriber, err := stats.SubscribeRunnableChannel(s.routingStats) + if err != nil { + return err + } + defer stats.UnsubscribeClosableChannel(s.routingStats, subscriber) // nolint: errcheck + for { + select { + case value, received := <-subscriber: + route, ok := value.(routing.Route) + if !(received && ok) { + return newError("Receiving upstream statistics failed.") + } + err := stream.Send(genMessage(route)) + if err != nil { + return err + } + case <-stream.Context().Done(): + return stream.Context().Err() + } + } +} + +func (s *routingServer) mustEmbedUnimplementedRoutingServiceServer() {} + +type service struct { + v *core.Instance +} + +func (s *service) Register(server *grpc.Server) { + common.Must(s.v.RequireFeatures(func(router routing.Router, stats stats.Manager) { + RegisterRoutingServiceServer(server, NewRoutingServer(router, nil)) + })) +} + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) { + s := core.MustFromContext(ctx) + return &service{v: s}, nil + })) +} diff --git a/app/router/command/command.pb.go b/app/router/command/command.pb.go new file mode 100644 index 00000000..2c3691b2 --- /dev/null +++ b/app/router/command/command.pb.go @@ -0,0 +1,525 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.13.0 +// source: app/router/command/command.proto + +package command + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + net "v2ray.com/core/common/net" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +// RoutingContext is the context with information relative to routing process. +// It conforms to the structure of v2ray.core.features.routing.Context and v2ray.core.features.routing.Route. +type RoutingContext struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InboundTag string `protobuf:"bytes,1,opt,name=InboundTag,proto3" json:"InboundTag,omitempty"` + Network net.Network `protobuf:"varint,2,opt,name=Network,proto3,enum=v2ray.core.common.net.Network" json:"Network,omitempty"` + SourceIPs [][]byte `protobuf:"bytes,3,rep,name=SourceIPs,proto3" json:"SourceIPs,omitempty"` + TargetIPs [][]byte `protobuf:"bytes,4,rep,name=TargetIPs,proto3" json:"TargetIPs,omitempty"` + SourcePort uint32 `protobuf:"varint,5,opt,name=SourcePort,proto3" json:"SourcePort,omitempty"` + TargetPort uint32 `protobuf:"varint,6,opt,name=TargetPort,proto3" json:"TargetPort,omitempty"` + TargetDomain string `protobuf:"bytes,7,opt,name=TargetDomain,proto3" json:"TargetDomain,omitempty"` + Protocol string `protobuf:"bytes,8,opt,name=Protocol,proto3" json:"Protocol,omitempty"` + User string `protobuf:"bytes,9,opt,name=User,proto3" json:"User,omitempty"` + Attributes map[string]string `protobuf:"bytes,10,rep,name=Attributes,proto3" json:"Attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + OutboundGroupTags []string `protobuf:"bytes,11,rep,name=OutboundGroupTags,proto3" json:"OutboundGroupTags,omitempty"` + OutboundTag string `protobuf:"bytes,12,opt,name=OutboundTag,proto3" json:"OutboundTag,omitempty"` +} + +func (x *RoutingContext) Reset() { + *x = RoutingContext{} + if protoimpl.UnsafeEnabled { + mi := &file_app_router_command_command_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RoutingContext) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RoutingContext) ProtoMessage() {} + +func (x *RoutingContext) ProtoReflect() protoreflect.Message { + mi := &file_app_router_command_command_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RoutingContext.ProtoReflect.Descriptor instead. +func (*RoutingContext) Descriptor() ([]byte, []int) { + return file_app_router_command_command_proto_rawDescGZIP(), []int{0} +} + +func (x *RoutingContext) GetInboundTag() string { + if x != nil { + return x.InboundTag + } + return "" +} + +func (x *RoutingContext) GetNetwork() net.Network { + if x != nil { + return x.Network + } + return net.Network_Unknown +} + +func (x *RoutingContext) GetSourceIPs() [][]byte { + if x != nil { + return x.SourceIPs + } + return nil +} + +func (x *RoutingContext) GetTargetIPs() [][]byte { + if x != nil { + return x.TargetIPs + } + return nil +} + +func (x *RoutingContext) GetSourcePort() uint32 { + if x != nil { + return x.SourcePort + } + return 0 +} + +func (x *RoutingContext) GetTargetPort() uint32 { + if x != nil { + return x.TargetPort + } + return 0 +} + +func (x *RoutingContext) GetTargetDomain() string { + if x != nil { + return x.TargetDomain + } + return "" +} + +func (x *RoutingContext) GetProtocol() string { + if x != nil { + return x.Protocol + } + return "" +} + +func (x *RoutingContext) GetUser() string { + if x != nil { + return x.User + } + return "" +} + +func (x *RoutingContext) GetAttributes() map[string]string { + if x != nil { + return x.Attributes + } + return nil +} + +func (x *RoutingContext) GetOutboundGroupTags() []string { + if x != nil { + return x.OutboundGroupTags + } + return nil +} + +func (x *RoutingContext) GetOutboundTag() string { + if x != nil { + return x.OutboundTag + } + return "" +} + +// SubscribeRoutingStatsRequest subscribes to routing statistics channel if opened by v2ray-core. +// * FieldSelectors selects a subset of fields in routing statistics to return. Valid selectors: +// - inbound: Selects connection's inbound tag. +// - network: Selects connection's network. +// - ip: Equivalent as "ip_source" and "ip_target", selects both source and target IP. +// - port: Equivalent as "port_source" and "port_target", selects both source and target port. +// - domain: Selects target domain. +// - protocol: Select connection's protocol. +// - user: Select connection's inbound user email. +// - attributes: Select connection's additional attributes. +// - outbound: Equivalent as "outbound" and "outbound_group", select both outbound tag and outbound group tags. +// * If FieldSelectors is left empty, all fields will be returned. +type SubscribeRoutingStatsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FieldSelectors []string `protobuf:"bytes,1,rep,name=FieldSelectors,proto3" json:"FieldSelectors,omitempty"` +} + +func (x *SubscribeRoutingStatsRequest) Reset() { + *x = SubscribeRoutingStatsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_app_router_command_command_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRoutingStatsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRoutingStatsRequest) ProtoMessage() {} + +func (x *SubscribeRoutingStatsRequest) ProtoReflect() protoreflect.Message { + mi := &file_app_router_command_command_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRoutingStatsRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRoutingStatsRequest) Descriptor() ([]byte, []int) { + return file_app_router_command_command_proto_rawDescGZIP(), []int{1} +} + +func (x *SubscribeRoutingStatsRequest) GetFieldSelectors() []string { + if x != nil { + return x.FieldSelectors + } + return nil +} + +// TestRouteRequest manually tests a routing result according to the routing context message. +// * RoutingContext is the routing message without outbound information. +// * FieldSelectors selects the fields to return in the routing result. All fields are returned if left empty. +// * PublishResult broadcasts the routing result to routing statistics channel if set true. +type TestRouteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + RoutingContext *RoutingContext `protobuf:"bytes,1,opt,name=RoutingContext,proto3" json:"RoutingContext,omitempty"` + FieldSelectors []string `protobuf:"bytes,2,rep,name=FieldSelectors,proto3" json:"FieldSelectors,omitempty"` + PublishResult bool `protobuf:"varint,3,opt,name=PublishResult,proto3" json:"PublishResult,omitempty"` +} + +func (x *TestRouteRequest) Reset() { + *x = TestRouteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_app_router_command_command_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestRouteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestRouteRequest) ProtoMessage() {} + +func (x *TestRouteRequest) ProtoReflect() protoreflect.Message { + mi := &file_app_router_command_command_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestRouteRequest.ProtoReflect.Descriptor instead. +func (*TestRouteRequest) Descriptor() ([]byte, []int) { + return file_app_router_command_command_proto_rawDescGZIP(), []int{2} +} + +func (x *TestRouteRequest) GetRoutingContext() *RoutingContext { + if x != nil { + return x.RoutingContext + } + return nil +} + +func (x *TestRouteRequest) GetFieldSelectors() []string { + if x != nil { + return x.FieldSelectors + } + return nil +} + +func (x *TestRouteRequest) GetPublishResult() bool { + if x != nil { + return x.PublishResult + } + return false +} + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_app_router_command_command_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_app_router_command_command_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_app_router_command_command_proto_rawDescGZIP(), []int{3} +} + +var File_app_router_command_command_proto protoreflect.FileDescriptor + +var file_app_router_command_command_proto_rawDesc = []byte{ + 0x0a, 0x20, 0x61, 0x70, 0x70, 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x1d, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, + 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x1a, 0x18, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x6e, 0x65, 0x74, 0x2f, 0x6e, 0x65, + 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa8, 0x04, 0x0a, 0x0e, + 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x1e, + 0x0a, 0x0a, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0a, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x12, 0x38, + 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x1e, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x63, 0x6f, 0x6d, + 0x6d, 0x6f, 0x6e, 0x2e, 0x6e, 0x65, 0x74, 0x2e, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x52, + 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x1c, 0x0a, 0x09, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x49, 0x50, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x09, 0x53, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x49, 0x50, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, + 0x49, 0x50, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x09, 0x54, 0x61, 0x72, 0x67, 0x65, + 0x74, 0x49, 0x50, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x6f, + 0x72, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x50, 0x6f, + 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, + 0x50, 0x6f, 0x72, 0x74, 0x12, 0x22, 0x0a, 0x0c, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x44, 0x6f, + 0x6d, 0x61, 0x69, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x54, 0x61, 0x72, 0x67, + 0x65, 0x74, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x55, 0x73, 0x65, 0x72, 0x18, 0x09, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x5d, 0x0a, 0x0a, 0x41, 0x74, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x76, + 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, 0x75, + 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x2e, 0x41, 0x74, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x41, 0x74, 0x74, + 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x11, 0x4f, 0x75, 0x74, 0x62, 0x6f, + 0x75, 0x6e, 0x64, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x54, 0x61, 0x67, 0x73, 0x18, 0x0b, 0x20, 0x03, + 0x28, 0x09, 0x52, 0x11, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x47, 0x72, 0x6f, 0x75, + 0x70, 0x54, 0x61, 0x67, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, + 0x64, 0x54, 0x61, 0x67, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x4f, 0x75, 0x74, 0x62, + 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x1a, 0x3d, 0x0a, 0x0f, 0x41, 0x74, 0x74, 0x72, 0x69, + 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x46, 0x0a, 0x1c, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x53, + 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, + 0x46, 0x69, 0x65, 0x6c, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x22, 0xb7, + 0x01, 0x0a, 0x10, 0x54, 0x65, 0x73, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x55, 0x0a, 0x0e, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, + 0x6e, 0x74, 0x65, 0x78, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x76, 0x32, + 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, 0x75, 0x74, + 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x0e, 0x52, 0x6f, 0x75, 0x74, + 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x46, 0x69, + 0x65, 0x6c, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x09, 0x52, 0x0e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, + 0x72, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x73, 0x68, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x08, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x32, 0x89, 0x02, 0x0a, 0x0e, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x87, 0x01, 0x0a, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, + 0x3b, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, + 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, + 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x76, + 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, 0x75, + 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, + 0x6d, 0x0a, 0x09, 0x54, 0x65, 0x73, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x12, 0x2f, 0x2e, 0x76, + 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x65, 0x73, + 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, + 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, + 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x52, 0x6f, + 0x75, 0x74, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x22, 0x00, 0x42, 0x68, + 0x0a, 0x21, 0x63, 0x6f, 0x6d, 0x2e, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, + 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x50, 0x01, 0x5a, 0x21, 0x76, 0x32, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, + 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0xaa, 0x02, 0x1d, 0x56, 0x32, 0x52, 0x61, 0x79, + 0x2e, 0x43, 0x6f, 0x72, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_app_router_command_command_proto_rawDescOnce sync.Once + file_app_router_command_command_proto_rawDescData = file_app_router_command_command_proto_rawDesc +) + +func file_app_router_command_command_proto_rawDescGZIP() []byte { + file_app_router_command_command_proto_rawDescOnce.Do(func() { + file_app_router_command_command_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_router_command_command_proto_rawDescData) + }) + return file_app_router_command_command_proto_rawDescData +} + +var file_app_router_command_command_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_app_router_command_command_proto_goTypes = []interface{}{ + (*RoutingContext)(nil), // 0: v2ray.core.app.router.command.RoutingContext + (*SubscribeRoutingStatsRequest)(nil), // 1: v2ray.core.app.router.command.SubscribeRoutingStatsRequest + (*TestRouteRequest)(nil), // 2: v2ray.core.app.router.command.TestRouteRequest + (*Config)(nil), // 3: v2ray.core.app.router.command.Config + nil, // 4: v2ray.core.app.router.command.RoutingContext.AttributesEntry + (net.Network)(0), // 5: v2ray.core.common.net.Network +} +var file_app_router_command_command_proto_depIdxs = []int32{ + 5, // 0: v2ray.core.app.router.command.RoutingContext.Network:type_name -> v2ray.core.common.net.Network + 4, // 1: v2ray.core.app.router.command.RoutingContext.Attributes:type_name -> v2ray.core.app.router.command.RoutingContext.AttributesEntry + 0, // 2: v2ray.core.app.router.command.TestRouteRequest.RoutingContext:type_name -> v2ray.core.app.router.command.RoutingContext + 1, // 3: v2ray.core.app.router.command.RoutingService.SubscribeRoutingStats:input_type -> v2ray.core.app.router.command.SubscribeRoutingStatsRequest + 2, // 4: v2ray.core.app.router.command.RoutingService.TestRoute:input_type -> v2ray.core.app.router.command.TestRouteRequest + 0, // 5: v2ray.core.app.router.command.RoutingService.SubscribeRoutingStats:output_type -> v2ray.core.app.router.command.RoutingContext + 0, // 6: v2ray.core.app.router.command.RoutingService.TestRoute:output_type -> v2ray.core.app.router.command.RoutingContext + 5, // [5:7] is the sub-list for method output_type + 3, // [3:5] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_app_router_command_command_proto_init() } +func file_app_router_command_command_proto_init() { + if File_app_router_command_command_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_app_router_command_command_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RoutingContext); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_router_command_command_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRoutingStatsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_router_command_command_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestRouteRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_router_command_command_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_app_router_command_command_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_app_router_command_command_proto_goTypes, + DependencyIndexes: file_app_router_command_command_proto_depIdxs, + MessageInfos: file_app_router_command_command_proto_msgTypes, + }.Build() + File_app_router_command_command_proto = out.File + file_app_router_command_command_proto_rawDesc = nil + file_app_router_command_command_proto_goTypes = nil + file_app_router_command_command_proto_depIdxs = nil +} diff --git a/app/router/command/command.proto b/app/router/command/command.proto new file mode 100644 index 00000000..84210a2e --- /dev/null +++ b/app/router/command/command.proto @@ -0,0 +1,59 @@ +syntax = "proto3"; + +package v2ray.core.app.router.command; +option csharp_namespace = "V2Ray.Core.App.Router.Command"; +option go_package = "v2ray.com/core/app/router/command"; +option java_package = "com.v2ray.core.app.router.command"; +option java_multiple_files = true; + +import "common/net/network.proto"; + +// RoutingContext is the context with information relative to routing process. +// It conforms to the structure of v2ray.core.features.routing.Context and v2ray.core.features.routing.Route. +message RoutingContext { + string InboundTag = 1; + v2ray.core.common.net.Network Network = 2; + repeated bytes SourceIPs = 3; + repeated bytes TargetIPs = 4; + uint32 SourcePort = 5; + uint32 TargetPort = 6; + string TargetDomain = 7; + string Protocol = 8; + string User = 9; + map Attributes = 10; + repeated string OutboundGroupTags = 11; + string OutboundTag = 12; +} + +// SubscribeRoutingStatsRequest subscribes to routing statistics channel if opened by v2ray-core. +// * FieldSelectors selects a subset of fields in routing statistics to return. Valid selectors: +// - inbound: Selects connection's inbound tag. +// - network: Selects connection's network. +// - ip: Equivalent as "ip_source" and "ip_target", selects both source and target IP. +// - port: Equivalent as "port_source" and "port_target", selects both source and target port. +// - domain: Selects target domain. +// - protocol: Select connection's protocol. +// - user: Select connection's inbound user email. +// - attributes: Select connection's additional attributes. +// - outbound: Equivalent as "outbound" and "outbound_group", select both outbound tag and outbound group tags. +// * If FieldSelectors is left empty, all fields will be returned. +message SubscribeRoutingStatsRequest { + repeated string FieldSelectors = 1; +} + +// TestRouteRequest manually tests a routing result according to the routing context message. +// * RoutingContext is the routing message without outbound information. +// * FieldSelectors selects the fields to return in the routing result. All fields are returned if left empty. +// * PublishResult broadcasts the routing result to routing statistics channel if set true. +message TestRouteRequest { + RoutingContext RoutingContext = 1; + repeated string FieldSelectors = 2; + bool PublishResult = 3; +} + +service RoutingService { + rpc SubscribeRoutingStats(SubscribeRoutingStatsRequest) returns (stream RoutingContext) {} + rpc TestRoute(TestRouteRequest) returns (RoutingContext) {} +} + +message Config {} diff --git a/app/router/command/command_grpc.pb.go b/app/router/command/command_grpc.pb.go new file mode 100644 index 00000000..7b51b2cc --- /dev/null +++ b/app/router/command/command_grpc.pb.go @@ -0,0 +1,154 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package command + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// RoutingServiceClient is the client API for RoutingService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RoutingServiceClient interface { + SubscribeRoutingStats(ctx context.Context, in *SubscribeRoutingStatsRequest, opts ...grpc.CallOption) (RoutingService_SubscribeRoutingStatsClient, error) + TestRoute(ctx context.Context, in *TestRouteRequest, opts ...grpc.CallOption) (*RoutingContext, error) +} + +type routingServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewRoutingServiceClient(cc grpc.ClientConnInterface) RoutingServiceClient { + return &routingServiceClient{cc} +} + +func (c *routingServiceClient) SubscribeRoutingStats(ctx context.Context, in *SubscribeRoutingStatsRequest, opts ...grpc.CallOption) (RoutingService_SubscribeRoutingStatsClient, error) { + stream, err := c.cc.NewStream(ctx, &_RoutingService_serviceDesc.Streams[0], "/v2ray.core.app.router.command.RoutingService/SubscribeRoutingStats", opts...) + if err != nil { + return nil, err + } + x := &routingServiceSubscribeRoutingStatsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type RoutingService_SubscribeRoutingStatsClient interface { + Recv() (*RoutingContext, error) + grpc.ClientStream +} + +type routingServiceSubscribeRoutingStatsClient struct { + grpc.ClientStream +} + +func (x *routingServiceSubscribeRoutingStatsClient) Recv() (*RoutingContext, error) { + m := new(RoutingContext) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *routingServiceClient) TestRoute(ctx context.Context, in *TestRouteRequest, opts ...grpc.CallOption) (*RoutingContext, error) { + out := new(RoutingContext) + err := c.cc.Invoke(ctx, "/v2ray.core.app.router.command.RoutingService/TestRoute", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RoutingServiceServer is the server API for RoutingService service. +// All implementations must embed UnimplementedRoutingServiceServer +// for forward compatibility +type RoutingServiceServer interface { + SubscribeRoutingStats(*SubscribeRoutingStatsRequest, RoutingService_SubscribeRoutingStatsServer) error + TestRoute(context.Context, *TestRouteRequest) (*RoutingContext, error) + mustEmbedUnimplementedRoutingServiceServer() +} + +// UnimplementedRoutingServiceServer must be embedded to have forward compatible implementations. +type UnimplementedRoutingServiceServer struct { +} + +func (*UnimplementedRoutingServiceServer) SubscribeRoutingStats(*SubscribeRoutingStatsRequest, RoutingService_SubscribeRoutingStatsServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeRoutingStats not implemented") +} +func (*UnimplementedRoutingServiceServer) TestRoute(context.Context, *TestRouteRequest) (*RoutingContext, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestRoute not implemented") +} +func (*UnimplementedRoutingServiceServer) mustEmbedUnimplementedRoutingServiceServer() {} + +func RegisterRoutingServiceServer(s *grpc.Server, srv RoutingServiceServer) { + s.RegisterService(&_RoutingService_serviceDesc, srv) +} + +func _RoutingService_SubscribeRoutingStats_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRoutingStatsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(RoutingServiceServer).SubscribeRoutingStats(m, &routingServiceSubscribeRoutingStatsServer{stream}) +} + +type RoutingService_SubscribeRoutingStatsServer interface { + Send(*RoutingContext) error + grpc.ServerStream +} + +type routingServiceSubscribeRoutingStatsServer struct { + grpc.ServerStream +} + +func (x *routingServiceSubscribeRoutingStatsServer) Send(m *RoutingContext) error { + return x.ServerStream.SendMsg(m) +} + +func _RoutingService_TestRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TestRouteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RoutingServiceServer).TestRoute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v2ray.core.app.router.command.RoutingService/TestRoute", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RoutingServiceServer).TestRoute(ctx, req.(*TestRouteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _RoutingService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "v2ray.core.app.router.command.RoutingService", + HandlerType: (*RoutingServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "TestRoute", + Handler: _RoutingService_TestRoute_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "SubscribeRoutingStats", + Handler: _RoutingService_SubscribeRoutingStats_Handler, + ServerStreams: true, + }, + }, + Metadata: "app/router/command/command.proto", +} diff --git a/app/router/command/command_test.go b/app/router/command/command_test.go new file mode 100644 index 00000000..d9fcf585 --- /dev/null +++ b/app/router/command/command_test.go @@ -0,0 +1,334 @@ +package command_test + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + "v2ray.com/core/app/router" + . "v2ray.com/core/app/router/command" + "v2ray.com/core/app/stats" + "v2ray.com/core/common" + "v2ray.com/core/common/net" + "v2ray.com/core/features/routing" + "v2ray.com/core/testing/mocks" +) + +func TestServiceSubscribeRoutingStats(t *testing.T) { + c := stats.NewChannel(&stats.ChannelConfig{ + SubscriberLimit: 1, + BufferSize: 16, + BroadcastTimeout: 100, + }) + common.Must(c.Start()) + defer c.Close() + + lis := bufconn.Listen(1024 * 1024) + bufDialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + testCases := []*RoutingContext{ + {InboundTag: "in", OutboundTag: "out"}, + {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"}, + {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"}, + {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"}, + {Network: net.Network_UDP, OutboundGroupTags: []string{"outergroup", "innergroup"}, OutboundTag: "out"}, + {Protocol: "bittorrent", OutboundTag: "blocked"}, + {User: "example@v2fly.org", OutboundTag: "out"}, + {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"}, + } + errCh := make(chan error) + nextPub := make(chan struct{}) + + // Server goroutine + go func() { + server := grpc.NewServer() + RegisterRoutingServiceServer(server, NewRoutingServer(nil, c)) + errCh <- server.Serve(lis) + }() + + // Publisher goroutine + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + for { // Wait until there's one subscriber in routing stats channel + if len(c.Subscribers()) > 0 { + break + } + if ctx.Err() != nil { + errCh <- ctx.Err() + } + } + for _, tc := range testCases { + c.Publish(AsRoutingRoute(tc)) + } + + // Wait for next round of publishing + <-nextPub + + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + for { // Wait until there's one subscriber in routing stats channel + if len(c.Subscribers()) > 0 { + break + } + if ctx.Err() != nil { + errCh <- ctx.Err() + } + } + for _, tc := range testCases { + c.Publish(AsRoutingRoute(tc)) + } + }() + + // Client goroutine + go func() { + conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + if err != nil { + errCh <- err + } + defer lis.Close() + defer conn.Close() + client := NewRoutingServiceClient(conn) + + // Test retrieving all fields + streamCtx, streamClose := context.WithCancel(context.Background()) + stream, err := client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{}) + if err != nil { + errCh <- err + } + + for _, tc := range testCases { + msg, err := stream.Recv() + if err != nil { + errCh <- err + } + if r := cmp.Diff(msg, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { + t.Error(r) + } + } + + // Test that double subscription will fail + errStream, err := client.SubscribeRoutingStats(context.Background(), &SubscribeRoutingStatsRequest{ + FieldSelectors: []string{"ip", "port", "domain", "outbound"}, + }) + if err != nil { + errCh <- err + } + if _, err := errStream.Recv(); err == nil { + t.Error("unexpected successful subscription") + } + + // Test the unsubscription of stream works well + streamClose() + timeOutCtx, timeout := context.WithTimeout(context.Background(), time.Second) + defer timeout() + for { // Wait until there's no subscriber in routing stats channel + if len(c.Subscribers()) == 0 { + break + } + if timeOutCtx.Err() != nil { + t.Error("unexpected subscribers not decreased in channel") + errCh <- timeOutCtx.Err() + } + } + + // Test retrieving only a subset of fields + streamCtx, streamClose = context.WithCancel(context.Background()) + stream, err = client.SubscribeRoutingStats(streamCtx, &SubscribeRoutingStatsRequest{ + FieldSelectors: []string{"ip", "port", "domain", "outbound"}, + }) + if err != nil { + errCh <- err + } + + close(nextPub) // Send nextPub signal to start next round of publishing + for _, tc := range testCases { + msg, err := stream.Recv() + stat := &RoutingContext{ // Only a subset of stats is retrieved + SourceIPs: tc.SourceIPs, + TargetIPs: tc.TargetIPs, + SourcePort: tc.SourcePort, + TargetPort: tc.TargetPort, + TargetDomain: tc.TargetDomain, + OutboundGroupTags: tc.OutboundGroupTags, + OutboundTag: tc.OutboundTag, + } + if err != nil { + errCh <- err + } + if r := cmp.Diff(msg, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { + t.Error(r) + } + } + streamClose() + + // Client passed all tests successfully + errCh <- nil + }() + + // Wait for goroutines to complete + select { + case <-time.After(2 * time.Second): + t.Fatal("Test timeout after 2s") + case err := <-errCh: + if err != nil { + t.Fatal(err) + } + } +} + +func TestSerivceTestRoute(t *testing.T) { + c := stats.NewChannel(&stats.ChannelConfig{ + SubscriberLimit: 1, + BufferSize: 16, + BroadcastTimeout: 100, + }) + common.Must(c.Start()) + defer c.Close() + + r := new(router.Router) + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + common.Must(r.Init(&router.Config{ + Rule: []*router.RoutingRule{ + { + InboundTag: []string{"in"}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + { + Protocol: []string{"bittorrent"}, + TargetTag: &router.RoutingRule_Tag{Tag: "blocked"}, + }, + { + PortList: &net.PortList{Range: []*net.PortRange{{From: 8080, To: 8080}}}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + { + SourcePortList: &net.PortList{Range: []*net.PortRange{{From: 9999, To: 9999}}}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + { + Domain: []*router.Domain{{Type: router.Domain_Domain, Value: "com"}}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + { + SourceGeoip: []*router.GeoIP{{CountryCode: "private", Cidr: []*router.CIDR{{Ip: []byte{127, 0, 0, 0}, Prefix: 8}}}}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + { + UserEmail: []string{"example@v2fly.org"}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + { + Networks: []net.Network{net.Network_UDP, net.Network_TCP}, + TargetTag: &router.RoutingRule_Tag{Tag: "out"}, + }, + }, + }, mocks.NewDNSClient(mockCtl), mocks.NewOutboundManager(mockCtl))) + + lis := bufconn.Listen(1024 * 1024) + bufDialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + errCh := make(chan error) + + // Server goroutine + go func() { + server := grpc.NewServer() + RegisterRoutingServiceServer(server, NewRoutingServer(r, c)) + errCh <- server.Serve(lis) + }() + + // Client goroutine + go func() { + conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + if err != nil { + errCh <- err + } + defer lis.Close() + defer conn.Close() + client := NewRoutingServiceClient(conn) + + testCases := []*RoutingContext{ + {InboundTag: "in", OutboundTag: "out"}, + {TargetIPs: [][]byte{{1, 2, 3, 4}}, TargetPort: 8080, OutboundTag: "out"}, + {TargetDomain: "example.com", TargetPort: 443, OutboundTag: "out"}, + {SourcePort: 9999, TargetPort: 9999, OutboundTag: "out"}, + {Network: net.Network_UDP, Protocol: "bittorrent", OutboundTag: "blocked"}, + {User: "example@v2fly.org", OutboundTag: "out"}, + {SourceIPs: [][]byte{{127, 0, 0, 1}}, Attributes: map[string]string{"attr": "value"}, OutboundTag: "out"}, + } + + // Test simple TestRoute + for _, tc := range testCases { + route, err := client.TestRoute(context.Background(), &TestRouteRequest{RoutingContext: tc}) + if err != nil { + errCh <- err + } + if r := cmp.Diff(route, tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { + t.Error(r) + } + } + + // Test TestRoute with special options + sub, err := c.Subscribe() + if err != nil { + errCh <- err + } + for _, tc := range testCases { + route, err := client.TestRoute(context.Background(), &TestRouteRequest{ + RoutingContext: tc, + FieldSelectors: []string{"ip", "port", "domain", "outbound"}, + PublishResult: true, + }) + stat := &RoutingContext{ // Only a subset of stats is retrieved + SourceIPs: tc.SourceIPs, + TargetIPs: tc.TargetIPs, + SourcePort: tc.SourcePort, + TargetPort: tc.TargetPort, + TargetDomain: tc.TargetDomain, + OutboundGroupTags: tc.OutboundGroupTags, + OutboundTag: tc.OutboundTag, + } + if err != nil { + errCh <- err + } + if r := cmp.Diff(route, stat, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { + t.Error(r) + } + select { // Check that routing result has been published to statistics channel + case msg, received := <-sub: + if route, ok := msg.(routing.Route); received && ok { + if r := cmp.Diff(AsProtobufMessage(nil)(route), tc, cmpopts.IgnoreUnexported(RoutingContext{})); r != "" { + t.Error(r) + } + } else { + t.Error("unexpected failure in receiving published routing result") + } + case <-time.After(100 * time.Millisecond): + t.Error("unexpected failure in receiving published routing result") + } + } + + // Client passed all tests successfully + errCh <- nil + }() + + // Wait for goroutines to complete + select { + case <-time.After(2 * time.Second): + t.Fatal("Test timeout after 2s") + case err := <-errCh: + if err != nil { + t.Fatal(err) + } + } +} diff --git a/app/router/command/config.go b/app/router/command/config.go new file mode 100644 index 00000000..1385f296 --- /dev/null +++ b/app/router/command/config.go @@ -0,0 +1,94 @@ +package command + +import ( + "strings" + + "v2ray.com/core/common/net" + "v2ray.com/core/features/routing" +) + +// routingContext is an wrapper of protobuf RoutingContext as implementation of routing.Context and routing.Route. +type routingContext struct { + *RoutingContext +} + +func (c routingContext) GetSourceIPs() []net.IP { + return mapBytesToIPs(c.RoutingContext.GetSourceIPs()) +} + +func (c routingContext) GetSourcePort() net.Port { + return net.Port(c.RoutingContext.GetSourcePort()) +} + +func (c routingContext) GetTargetIPs() []net.IP { + return mapBytesToIPs(c.RoutingContext.GetTargetIPs()) +} + +func (c routingContext) GetTargetPort() net.Port { + return net.Port(c.RoutingContext.GetTargetPort()) +} + +// AsRoutingContext converts a protobuf RoutingContext into an implementation of routing.Context. +func AsRoutingContext(r *RoutingContext) routing.Context { + return routingContext{r} +} + +// AsRoutingRoute converts a protobuf RoutingContext into an implementation of routing.Route. +func AsRoutingRoute(r *RoutingContext) routing.Route { + return routingContext{r} +} + +var fieldMap = map[string]func(*RoutingContext, routing.Route){ + "inbound": func(s *RoutingContext, r routing.Route) { s.InboundTag = r.GetInboundTag() }, + "network": func(s *RoutingContext, r routing.Route) { s.Network = r.GetNetwork() }, + "ip_source": func(s *RoutingContext, r routing.Route) { s.SourceIPs = mapIPsToBytes(r.GetSourceIPs()) }, + "ip_target": func(s *RoutingContext, r routing.Route) { s.TargetIPs = mapIPsToBytes(r.GetTargetIPs()) }, + "port_source": func(s *RoutingContext, r routing.Route) { s.SourcePort = uint32(r.GetSourcePort()) }, + "port_target": func(s *RoutingContext, r routing.Route) { s.TargetPort = uint32(r.GetTargetPort()) }, + "domain": func(s *RoutingContext, r routing.Route) { s.TargetDomain = r.GetTargetDomain() }, + "protocol": func(s *RoutingContext, r routing.Route) { s.Protocol = r.GetProtocol() }, + "user": func(s *RoutingContext, r routing.Route) { s.User = r.GetUser() }, + "attributes": func(s *RoutingContext, r routing.Route) { s.Attributes = r.GetAttributes() }, + "outbound_group": func(s *RoutingContext, r routing.Route) { s.OutboundGroupTags = r.GetOutboundGroupTags() }, + "outbound": func(s *RoutingContext, r routing.Route) { s.OutboundTag = r.GetOutboundTag() }, +} + +// AsProtobufMessage takes selectors of fields and returns a function to convert routing.Route to protobuf RoutingContext. +func AsProtobufMessage(fieldSelectors []string) func(routing.Route) *RoutingContext { + initializers := []func(*RoutingContext, routing.Route){} + for field, init := range fieldMap { + if len(fieldSelectors) == 0 { // If selectors not set, retrieve all fields + initializers = append(initializers, init) + continue + } + for _, selector := range fieldSelectors { + if strings.HasPrefix(field, selector) { + initializers = append(initializers, init) + break + } + } + } + return func(ctx routing.Route) *RoutingContext { + message := new(RoutingContext) + for _, init := range initializers { + init(message, ctx) + } + return message + } +} + +func mapBytesToIPs(bytes [][]byte) []net.IP { + var ips []net.IP + for _, rawIP := range bytes { + ips = append(ips, net.IP(rawIP)) + } + return ips +} + +func mapIPsToBytes(ips []net.IP) [][]byte { + var bytes [][]byte + for _, ip := range ips { + bytes = append(bytes, []byte(ip)) + } + return bytes +} diff --git a/app/router/command/errors.generated.go b/app/router/command/errors.generated.go new file mode 100644 index 00000000..66f78051 --- /dev/null +++ b/app/router/command/errors.generated.go @@ -0,0 +1,9 @@ +package command + +import "v2ray.com/core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +}