mirror of https://github.com/k3s-io/k3s
Fixing scheduling latency metrics
parent
d089901e46
commit
0e833bfc83
|
@ -21,6 +21,7 @@ go_library(
|
|||
"//pkg/scheduler/api:go_default_library",
|
||||
"//pkg/scheduler/api/latest:go_default_library",
|
||||
"//pkg/scheduler/factory:go_default_library",
|
||||
"//pkg/scheduler/metrics:go_default_library",
|
||||
"//pkg/util/configz:go_default_library",
|
||||
"//pkg/util/flag:go_default_library",
|
||||
"//pkg/version:go_default_library",
|
||||
|
|
|
@ -19,6 +19,7 @@ package app
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -51,6 +52,7 @@ import (
|
|||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||
latestschedulerapi "k8s.io/kubernetes/pkg/scheduler/api/latest"
|
||||
"k8s.io/kubernetes/pkg/scheduler/factory"
|
||||
"k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
utilflag "k8s.io/kubernetes/pkg/util/flag"
|
||||
"k8s.io/kubernetes/pkg/version"
|
||||
|
@ -221,11 +223,23 @@ func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz
|
|||
return handler
|
||||
}
|
||||
|
||||
func installMetricHandler(pathRecorderMux *mux.PathRecorderMux) {
|
||||
configz.InstallHandler(pathRecorderMux)
|
||||
defaultMetricsHandler := prometheus.Handler().ServeHTTP
|
||||
pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Method == "DELETE" {
|
||||
metrics.Reset()
|
||||
io.WriteString(w, "metrics reset\n")
|
||||
return
|
||||
}
|
||||
defaultMetricsHandler(w, req)
|
||||
})
|
||||
}
|
||||
|
||||
// newMetricsHandler builds a metrics server from the config.
|
||||
func newMetricsHandler(config *componentconfig.KubeSchedulerConfiguration) http.Handler {
|
||||
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
|
||||
configz.InstallHandler(pathRecorderMux)
|
||||
pathRecorderMux.Handle("/metrics", prometheus.Handler())
|
||||
installMetricHandler(pathRecorderMux)
|
||||
if config.EnableProfiling {
|
||||
routes.Profiling{}.Install(pathRecorderMux)
|
||||
if config.EnableContentionProfiling {
|
||||
|
@ -242,8 +256,7 @@ func newHealthzHandler(config *componentconfig.KubeSchedulerConfiguration, separ
|
|||
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
|
||||
healthz.InstallHandler(pathRecorderMux)
|
||||
if !separateMetrics {
|
||||
configz.InstallHandler(pathRecorderMux)
|
||||
pathRecorderMux.Handle("/metrics", prometheus.Handler())
|
||||
installMetricHandler(pathRecorderMux)
|
||||
}
|
||||
if config.EnableProfiling {
|
||||
routes.Profiling{}.Install(pathRecorderMux)
|
||||
|
|
|
@ -23,29 +23,38 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const schedulerSubsystem = "scheduler"
|
||||
const (
|
||||
// SchedulerSubsystem - subsystem name used by scheduler
|
||||
SchedulerSubsystem = "scheduler"
|
||||
// SchedulingLatencyName - scheduler latency metric name
|
||||
SchedulingLatencyName = "scheduling_latencies_summary"
|
||||
|
||||
// OperationLabel - operation label name
|
||||
OperationLabel = "operation"
|
||||
// Binding - binding operation label value
|
||||
Binding = "binding"
|
||||
// SchedulingAlgorithm - scheduling algorithm operation label value
|
||||
SchedulingAlgorithm = "scheduling_algorithm"
|
||||
// E2eScheduling - e2e scheduling operation label value
|
||||
E2eScheduling = "e2e_scheduling"
|
||||
)
|
||||
|
||||
// All the histogram based metrics have 1ms as size for the smallest bucket.
|
||||
var (
|
||||
E2eSchedulingLatency = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Name: "e2e_scheduling_latency_microseconds",
|
||||
Help: "E2e scheduling latency (scheduling algorithm + binding)",
|
||||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
},
|
||||
)
|
||||
SchedulingAlgorithmLatency = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Name: "scheduling_algorithm_latency_microseconds",
|
||||
Help: "Scheduling algorithm latency",
|
||||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
SchedulingLatency = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: SchedulingLatencyName,
|
||||
Help: "Scheduling latency in microseconds split by sub-parts of the scheduling operation",
|
||||
// Make the sliding window of 5h.
|
||||
// TODO: The value for this should be based on some SLI definition (long term).
|
||||
MaxAge: 5 * time.Hour,
|
||||
},
|
||||
[]string{OperationLabel},
|
||||
)
|
||||
SchedulingAlgorithmPredicateEvaluationDuration = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "scheduling_algorithm_predicate_evaluation",
|
||||
Help: "Scheduling algorithm predicate evaluation duration",
|
||||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
|
@ -53,7 +62,7 @@ var (
|
|||
)
|
||||
SchedulingAlgorithmPriorityEvaluationDuration = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "scheduling_algorithm_priority_evaluation",
|
||||
Help: "Scheduling algorithm priority evaluation duration",
|
||||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
|
@ -61,52 +70,50 @@ var (
|
|||
)
|
||||
SchedulingAlgorithmPremptionEvaluationDuration = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "scheduling_algorithm_preemption_evaluation",
|
||||
Help: "Scheduling algorithm preemption evaluation duration",
|
||||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
},
|
||||
)
|
||||
BindingLatency = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Name: "binding_latency_microseconds",
|
||||
Help: "Binding latency",
|
||||
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
|
||||
},
|
||||
)
|
||||
PreemptionVictims = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "pod_preemption_victims",
|
||||
Help: "Number of selected preemption victims",
|
||||
})
|
||||
PreemptionAttempts = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Subsystem: schedulerSubsystem,
|
||||
Subsystem: SchedulerSubsystem,
|
||||
Name: "total_preemption_attempts",
|
||||
Help: "Total preemption attempts in the cluster till now",
|
||||
})
|
||||
metricsList = []prometheus.Collector{
|
||||
SchedulingLatency,
|
||||
SchedulingAlgorithmPredicateEvaluationDuration,
|
||||
SchedulingAlgorithmPriorityEvaluationDuration,
|
||||
SchedulingAlgorithmPremptionEvaluationDuration,
|
||||
PreemptionVictims,
|
||||
PreemptionAttempts,
|
||||
}
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
|
||||
// Register all metrics.
|
||||
func Register() {
|
||||
// Register the metrics.
|
||||
registerMetrics.Do(func() {
|
||||
prometheus.MustRegister(E2eSchedulingLatency)
|
||||
prometheus.MustRegister(SchedulingAlgorithmLatency)
|
||||
prometheus.MustRegister(BindingLatency)
|
||||
|
||||
prometheus.MustRegister(SchedulingAlgorithmPredicateEvaluationDuration)
|
||||
prometheus.MustRegister(SchedulingAlgorithmPriorityEvaluationDuration)
|
||||
prometheus.MustRegister(SchedulingAlgorithmPremptionEvaluationDuration)
|
||||
prometheus.MustRegister(PreemptionVictims)
|
||||
prometheus.MustRegister(PreemptionAttempts)
|
||||
for _, metric := range metricsList {
|
||||
prometheus.MustRegister(metric)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Reset resets metrics
|
||||
func Reset() {
|
||||
SchedulingLatency.Reset()
|
||||
}
|
||||
|
||||
// SinceInMicroseconds gets the time since the specified start in microseconds.
|
||||
func SinceInMicroseconds(start time.Time) float64 {
|
||||
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
|
||||
|
|
|
@ -429,7 +429,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
|
|||
return err
|
||||
}
|
||||
|
||||
metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
|
||||
metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInMicroseconds(bindingStart))
|
||||
sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
|
||||
return nil
|
||||
}
|
||||
|
@ -461,7 +461,7 @@ func (sched *Scheduler) scheduleOne() {
|
|||
}
|
||||
return
|
||||
}
|
||||
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||
metrics.SchedulingLatency.WithLabelValues(metrics.SchedulingAlgorithm).Observe(metrics.SinceInMicroseconds(start))
|
||||
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
|
||||
// This allows us to keep scheduling without waiting on binding to occur.
|
||||
assumedPod := pod.DeepCopy()
|
||||
|
@ -496,7 +496,7 @@ func (sched *Scheduler) scheduleOne() {
|
|||
Name: suggestedHost,
|
||||
},
|
||||
})
|
||||
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||
metrics.SchedulingLatency.WithLabelValues(metrics.E2eScheduling).Observe(metrics.SinceInMicroseconds(start))
|
||||
if err != nil {
|
||||
glog.Errorf("Internal error binding pod: (%v)", err)
|
||||
}
|
||||
|
|
|
@ -72,6 +72,7 @@ go_library(
|
|||
"//pkg/kubemark:go_default_library",
|
||||
"//pkg/master/ports:go_default_library",
|
||||
"//pkg/scheduler/algorithm/predicates:go_default_library",
|
||||
"//pkg/scheduler/metrics:go_default_library",
|
||||
"//pkg/scheduler/schedulercache:go_default_library",
|
||||
"//pkg/security/podsecuritypolicy/seccomp:go_default_library",
|
||||
"//pkg/ssh:go_default_library",
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
schedulermetric "k8s.io/kubernetes/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/pkg/util/system"
|
||||
"k8s.io/kubernetes/test/e2e/framework/metrics"
|
||||
|
||||
|
@ -130,6 +131,8 @@ func (m *MetricsForE2E) SummaryKind() string {
|
|||
return "MetricsForE2E"
|
||||
}
|
||||
|
||||
var SchedulingLatencyMetricName = model.LabelValue(schedulermetric.SchedulerSubsystem + "_" + schedulermetric.SchedulingLatencyName)
|
||||
|
||||
var InterestingApiServerMetrics = []string{
|
||||
"apiserver_request_count",
|
||||
"apiserver_request_latencies_summary",
|
||||
|
@ -439,27 +442,29 @@ func getMetrics(c clientset.Interface) (string, error) {
|
|||
return string(body), nil
|
||||
}
|
||||
|
||||
// Retrieves scheduler latency metrics.
|
||||
func getSchedulingLatency(c clientset.Interface) (*SchedulingMetrics, error) {
|
||||
result := SchedulingMetrics{}
|
||||
// Sends REST request to kube scheduler metrics
|
||||
func sendRestRequestToScheduler(c clientset.Interface, op string) (string, error) {
|
||||
opUpper := strings.ToUpper(op)
|
||||
if opUpper != "GET" && opUpper != "DELETE" {
|
||||
return "", fmt.Errorf("Unknown REST request")
|
||||
}
|
||||
|
||||
// Check if master Node is registered
|
||||
nodes, err := c.CoreV1().Nodes().List(metav1.ListOptions{})
|
||||
ExpectNoError(err)
|
||||
|
||||
var data string
|
||||
var masterRegistered = false
|
||||
for _, node := range nodes.Items {
|
||||
if system.IsMasterNode(node.Name) {
|
||||
masterRegistered = true
|
||||
}
|
||||
}
|
||||
|
||||
var responseText string
|
||||
if masterRegistered {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
var rawData []byte
|
||||
rawData, err = c.CoreV1().RESTClient().Get().
|
||||
body, err := c.CoreV1().RESTClient().Verb(opUpper).
|
||||
Context(ctx).
|
||||
Namespace(metav1.NamespaceSystem).
|
||||
Resource("pods").
|
||||
|
@ -469,33 +474,46 @@ func getSchedulingLatency(c clientset.Interface) (*SchedulingMetrics, error) {
|
|||
Do().Raw()
|
||||
|
||||
ExpectNoError(err)
|
||||
data = string(rawData)
|
||||
responseText = string(body)
|
||||
} else {
|
||||
// If master is not registered fall back to old method of using SSH.
|
||||
if TestContext.Provider == "gke" {
|
||||
Logf("Not grabbing scheduler metrics through master SSH: unsupported for gke")
|
||||
return nil, nil
|
||||
return "", nil
|
||||
}
|
||||
cmd := "curl http://localhost:10251/metrics"
|
||||
|
||||
cmd := "curl -X " + opUpper + " http://localhost:10251/metrics"
|
||||
sshResult, err := SSH(cmd, GetMasterHost()+":22", TestContext.Provider)
|
||||
if err != nil || sshResult.Code != 0 {
|
||||
return &result, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
|
||||
return "", fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
|
||||
}
|
||||
data = sshResult.Stdout
|
||||
responseText = sshResult.Stdout
|
||||
}
|
||||
return responseText, nil
|
||||
}
|
||||
|
||||
// Retrieves scheduler latency metrics.
|
||||
func getSchedulingLatency(c clientset.Interface) (*SchedulingMetrics, error) {
|
||||
result := SchedulingMetrics{}
|
||||
data, err := sendRestRequestToScheduler(c, "GET")
|
||||
|
||||
samples, err := extractMetricSamples(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, sample := range samples {
|
||||
if sample.Metric[model.MetricNameLabel] != SchedulingLatencyMetricName {
|
||||
continue
|
||||
}
|
||||
|
||||
var metric *LatencyMetric = nil
|
||||
switch sample.Metric[model.MetricNameLabel] {
|
||||
case "scheduler_scheduling_algorithm_latency_microseconds":
|
||||
switch sample.Metric[schedulermetric.OperationLabel] {
|
||||
case schedulermetric.SchedulingAlgorithm:
|
||||
metric = &result.SchedulingLatency
|
||||
case "scheduler_binding_latency_microseconds":
|
||||
case schedulermetric.Binding:
|
||||
metric = &result.BindingLatency
|
||||
case "scheduler_e2e_scheduling_latency_microseconds":
|
||||
case schedulermetric.E2eScheduling:
|
||||
metric = &result.E2ELatency
|
||||
}
|
||||
if metric == nil {
|
||||
|
@ -507,7 +525,7 @@ func getSchedulingLatency(c clientset.Interface) (*SchedulingMetrics, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
setQuantile(metric, quantile, time.Duration(int64(latency))*time.Microsecond)
|
||||
setQuantile(metric, quantile, time.Duration(int64(latency)))
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
@ -521,6 +539,14 @@ func VerifySchedulerLatency(c clientset.Interface) (*SchedulingMetrics, error) {
|
|||
return latency, nil
|
||||
}
|
||||
|
||||
func ResetSchedulerMetrics(c clientset.Interface) error {
|
||||
responseText, err := sendRestRequestToScheduler(c, "DELETE")
|
||||
if err != nil || responseText != "metrics reset\n" {
|
||||
return fmt.Errorf("Unexpected response: %q", responseText)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func PrettyPrintJSON(metrics interface{}) string {
|
||||
output := &bytes.Buffer{}
|
||||
if err := json.NewEncoder(output).Encode(metrics); err != nil {
|
||||
|
|
|
@ -395,8 +395,6 @@ var _ = SIGDescribe("Density", func() {
|
|||
}
|
||||
|
||||
// Verify scheduler metrics.
|
||||
// TODO: Reset metrics at the beginning of the test.
|
||||
// We should do something similar to how we do it for APIserver.
|
||||
latency, err := framework.VerifySchedulerLatency(c)
|
||||
framework.ExpectNoError(err)
|
||||
if err == nil {
|
||||
|
@ -442,6 +440,7 @@ var _ = SIGDescribe("Density", func() {
|
|||
|
||||
uuid = string(utiluuid.NewUUID())
|
||||
|
||||
framework.ExpectNoError(framework.ResetSchedulerMetrics(c))
|
||||
framework.ExpectNoError(framework.ResetMetrics(c))
|
||||
framework.ExpectNoError(os.Mkdir(fmt.Sprintf(framework.TestContext.OutputDir+"/%s", uuid), 0777))
|
||||
|
||||
|
|
Loading…
Reference in New Issue