mirror of https://github.com/k3s-io/k3s
186 lines
7.3 KiB
Go
186 lines
7.3 KiB
Go
|
package grpc_prometheus
|
||
|
|
||
|
import (
|
||
|
prom "github.com/prometheus/client_golang/prometheus"
|
||
|
"golang.org/x/net/context"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/status"
|
||
|
)
|
||
|
|
||
|
// ServerMetrics represents a collection of metrics to be registered on a
|
||
|
// Prometheus metrics registry for a gRPC server.
|
||
|
type ServerMetrics struct {
|
||
|
serverStartedCounter *prom.CounterVec
|
||
|
serverHandledCounter *prom.CounterVec
|
||
|
serverStreamMsgReceived *prom.CounterVec
|
||
|
serverStreamMsgSent *prom.CounterVec
|
||
|
serverHandledHistogramEnabled bool
|
||
|
serverHandledHistogramOpts prom.HistogramOpts
|
||
|
serverHandledHistogram *prom.HistogramVec
|
||
|
}
|
||
|
|
||
|
// NewServerMetrics returns a ServerMetrics object. Use a new instance of
|
||
|
// ServerMetrics when not using the default Prometheus metrics registry, for
|
||
|
// example when wanting to control which metrics are added to a registry as
|
||
|
// opposed to automatically adding metrics via init functions.
|
||
|
func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
|
||
|
opts := counterOptions(counterOpts)
|
||
|
return &ServerMetrics{
|
||
|
serverStartedCounter: prom.NewCounterVec(
|
||
|
opts.apply(prom.CounterOpts{
|
||
|
Name: "grpc_server_started_total",
|
||
|
Help: "Total number of RPCs started on the server.",
|
||
|
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||
|
serverHandledCounter: prom.NewCounterVec(
|
||
|
opts.apply(prom.CounterOpts{
|
||
|
Name: "grpc_server_handled_total",
|
||
|
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
|
||
|
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
|
||
|
serverStreamMsgReceived: prom.NewCounterVec(
|
||
|
opts.apply(prom.CounterOpts{
|
||
|
Name: "grpc_server_msg_received_total",
|
||
|
Help: "Total number of RPC stream messages received on the server.",
|
||
|
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||
|
serverStreamMsgSent: prom.NewCounterVec(
|
||
|
opts.apply(prom.CounterOpts{
|
||
|
Name: "grpc_server_msg_sent_total",
|
||
|
Help: "Total number of gRPC stream messages sent by the server.",
|
||
|
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||
|
serverHandledHistogramEnabled: false,
|
||
|
serverHandledHistogramOpts: prom.HistogramOpts{
|
||
|
Name: "grpc_server_handling_seconds",
|
||
|
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
|
||
|
Buckets: prom.DefBuckets,
|
||
|
},
|
||
|
serverHandledHistogram: nil,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// EnableHandlingTimeHistogram enables histograms being registered when
|
||
|
// registering the ServerMetrics on a Prometheus registry. Histograms can be
|
||
|
// expensive on Prometheus servers. It takes options to configure histogram
|
||
|
// options such as the defined buckets.
|
||
|
func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
|
||
|
for _, o := range opts {
|
||
|
o(&m.serverHandledHistogramOpts)
|
||
|
}
|
||
|
if !m.serverHandledHistogramEnabled {
|
||
|
m.serverHandledHistogram = prom.NewHistogramVec(
|
||
|
m.serverHandledHistogramOpts,
|
||
|
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
||
|
)
|
||
|
}
|
||
|
m.serverHandledHistogramEnabled = true
|
||
|
}
|
||
|
|
||
|
// Describe sends the super-set of all possible descriptors of metrics
|
||
|
// collected by this Collector to the provided channel and returns once
|
||
|
// the last descriptor has been sent.
|
||
|
func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
|
||
|
m.serverStartedCounter.Describe(ch)
|
||
|
m.serverHandledCounter.Describe(ch)
|
||
|
m.serverStreamMsgReceived.Describe(ch)
|
||
|
m.serverStreamMsgSent.Describe(ch)
|
||
|
if m.serverHandledHistogramEnabled {
|
||
|
m.serverHandledHistogram.Describe(ch)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Collect is called by the Prometheus registry when collecting
|
||
|
// metrics. The implementation sends each collected metric via the
|
||
|
// provided channel and returns once the last metric has been sent.
|
||
|
func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
|
||
|
m.serverStartedCounter.Collect(ch)
|
||
|
m.serverHandledCounter.Collect(ch)
|
||
|
m.serverStreamMsgReceived.Collect(ch)
|
||
|
m.serverStreamMsgSent.Collect(ch)
|
||
|
if m.serverHandledHistogramEnabled {
|
||
|
m.serverHandledHistogram.Collect(ch)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||
|
func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||
|
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||
|
monitor := newServerReporter(m, Unary, info.FullMethod)
|
||
|
monitor.ReceivedMessage()
|
||
|
resp, err := handler(ctx, req)
|
||
|
st, _ := status.FromError(err)
|
||
|
monitor.Handled(st.Code())
|
||
|
if err == nil {
|
||
|
monitor.SentMessage()
|
||
|
}
|
||
|
return resp, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||
|
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||
|
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||
|
monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
|
||
|
err := handler(srv, &monitoredServerStream{ss, monitor})
|
||
|
st, _ := status.FromError(err)
|
||
|
monitor.Handled(st.Code())
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// InitializeMetrics initializes all metrics, with their appropriate null
|
||
|
// value, for all gRPC methods registered on a gRPC server. This is useful, to
|
||
|
// ensure that all metrics exist when collecting and querying.
|
||
|
func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
|
||
|
serviceInfo := server.GetServiceInfo()
|
||
|
for serviceName, info := range serviceInfo {
|
||
|
for _, mInfo := range info.Methods {
|
||
|
preRegisterMethod(m, serviceName, &mInfo)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func streamRPCType(info *grpc.StreamServerInfo) grpcType {
|
||
|
if info.IsClientStream && !info.IsServerStream {
|
||
|
return ClientStream
|
||
|
} else if !info.IsClientStream && info.IsServerStream {
|
||
|
return ServerStream
|
||
|
}
|
||
|
return BidiStream
|
||
|
}
|
||
|
|
||
|
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
|
||
|
type monitoredServerStream struct {
|
||
|
grpc.ServerStream
|
||
|
monitor *serverReporter
|
||
|
}
|
||
|
|
||
|
func (s *monitoredServerStream) SendMsg(m interface{}) error {
|
||
|
err := s.ServerStream.SendMsg(m)
|
||
|
if err == nil {
|
||
|
s.monitor.SentMessage()
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
|
||
|
err := s.ServerStream.RecvMsg(m)
|
||
|
if err == nil {
|
||
|
s.monitor.ReceivedMessage()
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
|
||
|
func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
|
||
|
methodName := mInfo.Name
|
||
|
methodType := string(typeFromMethodInfo(mInfo))
|
||
|
// These are just references (no increments), as just referencing will create the labels but not set values.
|
||
|
metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||
|
metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||
|
metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||
|
if metrics.serverHandledHistogramEnabled {
|
||
|
metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||
|
}
|
||
|
for _, code := range allCodes {
|
||
|
metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
|
||
|
}
|
||
|
}
|