Merge pull request #19357 from timstclair/stats-refactor

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2016-01-13 23:54:48 -08:00
commit a385de1e43
4 changed files with 227 additions and 124 deletions

View File

@ -18,14 +18,12 @@ package server
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/pprof"
"path"
"strconv"
"strings"
"sync"
@ -49,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/httplog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flushwriter"
@ -222,7 +221,7 @@ func (s *Server) InstallDefaultHandlers() {
Operation("getPods"))
s.restfulCont.Add(ws)
s.restfulCont.Handle("/stats/", &httpHandler{f: s.handleStats})
s.restfulCont.Add(stats.CreateHandlers(s.host))
s.restfulCont.Handle("/metrics", prometheus.Handler())
ws = new(restful.WebService)
@ -357,13 +356,6 @@ func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.f(w, r)
}
// error serializes an error object into an HTTP response.
func (s *Server) error(w http.ResponseWriter, err error) {
msg := fmt.Sprintf("Internal Error: %v", err)
glog.Infof("HTTP InternalServerError: %s", msg)
http.Error(w, msg, http.StatusInternalServerError)
}
// Checks if kubelet's sync loop that updates containers is working.
func (s *Server) syncLoopHealthCheck(req *http.Request) error {
duration := s.host.ResyncInterval() * 2
@ -501,11 +493,6 @@ func (s *Server) getRunningPods(request *restful.Request, response *restful.Resp
response.Write(data)
}
// handleStats handles stats requests against the Kubelet.
func (s *Server) handleStats(w http.ResponseWriter, req *http.Request) {
s.serveStats(w, req)
}
// getLogs handles logs requests against the Kubelet.
func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
s.host.ServeLogs(response, request.Request)
@ -1061,107 +1048,3 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
).Log()
s.restfulCont.ServeHTTP(w, req)
}
type StatsRequest struct {
// The name of the container for which to request stats.
// Default: /
ContainerName string `json:"containerName,omitempty"`
// Max number of stats to return.
// If start and end time are specified this limit is ignored.
// Default: 60
NumStats int `json:"num_stats,omitempty"`
// Start time for which to query information.
// If omitted, the beginning of time is assumed.
Start time.Time `json:"start,omitempty"`
// End time for which to query information.
// If omitted, current time is assumed.
End time.Time `json:"end,omitempty"`
// Whether to also include information from subcontainers.
// Default: false.
Subcontainers bool `json:"subcontainers,omitempty"`
}
// serveStats implements stats logic.
func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// Stats requests are in the following forms:
//
// /stats/ : Root container stats
// /stats/container/ : Non-Kubernetes container stats (returns a map)
// /stats/<pod name>/<container name> : Stats for Kubernetes pod/container
// /stats/<namespace>/<pod name>/<uid>/<container name> : Stats for Kubernetes namespace/pod/uid/container
components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/")
var stats interface{}
var err error
var query StatsRequest
query.NumStats = 60
err = json.NewDecoder(req.Body).Decode(&query)
if err != nil && err != io.EOF {
s.error(w, err)
return
}
cadvisorRequest := cadvisorapi.ContainerInfoRequest{
NumStats: query.NumStats,
Start: query.Start,
End: query.End,
}
switch len(components) {
case 1:
// Root container stats.
var statsMap map[string]*cadvisorapi.ContainerInfo
statsMap, err = s.host.GetRawContainerInfo("/", &cadvisorRequest, false)
stats = statsMap["/"]
case 2:
// Non-Kubernetes container stats.
if components[1] != "container" {
http.Error(w, fmt.Sprintf("unknown stats request type %q", components[1]), http.StatusNotFound)
return
}
containerName := path.Join("/", query.ContainerName)
stats, err = s.host.GetRawContainerInfo(containerName, &cadvisorRequest, query.Subcontainers)
case 3:
// Backward compatibility without uid information, does not support namespace
pod, ok := s.host.GetPodByName(api.NamespaceDefault, components[1])
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stats, err = s.host.GetContainerInfo(kubecontainer.GetPodFullName(pod), "", components[2], &cadvisorRequest)
case 5:
pod, ok := s.host.GetPodByName(components[1], components[2])
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stats, err = s.host.GetContainerInfo(kubecontainer.GetPodFullName(pod), types.UID(components[3]), components[4], &cadvisorRequest)
default:
http.Error(w, fmt.Sprintf("Unknown resource: %v", components), http.StatusNotFound)
return
}
switch err {
case nil:
break
case kubecontainer.ErrContainerNotFound:
http.Error(w, err.Error(), http.StatusNotFound)
return
default:
s.error(w, err)
return
}
if stats == nil {
fmt.Fprint(w, "{}")
return
}
data, err := json.Marshal(stats)
if err != nil {
s.error(w, err)
return
}
w.Header().Add("Content-type", "application/json")
w.Write(data)
}

View File

@ -534,7 +534,7 @@ func TestAuthFilters(t *testing.T) {
// This is a sanity check that the Handle->HandleWithFilter() delegation is working
// Ideally, these would move to registered web services and this list would get shorter
expectedPaths := []string{"/healthz", "/stats/", "/metrics"}
expectedPaths := []string{"/healthz", "/metrics"}
paths := sets.NewString(fw.serverUnderTest.restfulCont.RegisteredHandlePaths()...)
for _, expectedPath := range expectedPaths {
if !paths.Has(expectedPath) {

View File

@ -0,0 +1,220 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"time"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/types"
)
// Host methods required by stats handlers.
type StatsProvider interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
}
type handler struct {
provider StatsProvider
}
func CreateHandlers(provider StatsProvider) *restful.WebService {
h := &handler{provider}
ws := &restful.WebService{}
ws.Path("/stats/").
Produces(restful.MIME_JSON)
endpoints := []struct {
path string
handler restful.RouteFunction
}{
{"", h.handleStats},
{"/container", h.handleSystemContainer},
{"/{podName}/{containerName}", h.handlePodContainer},
{"/{namespace}/{podName}/{uid}/{containerName}", h.handlePodContainer},
}
for _, e := range endpoints {
for _, method := range []string{"GET", "POST"} {
ws.Route(ws.
Method(method).
Path(e.path).
To(e.handler))
}
}
return ws
}
type StatsRequest struct {
// The name of the container for which to request stats.
// Default: /
ContainerName string `json:"containerName,omitempty"`
// Max number of stats to return.
// If start and end time are specified this limit is ignored.
// Default: 60
NumStats int `json:"num_stats,omitempty"`
// Start time for which to query information.
// If omitted, the beginning of time is assumed.
Start time.Time `json:"start,omitempty"`
// End time for which to query information.
// If omitted, current time is assumed.
End time.Time `json:"end,omitempty"`
// Whether to also include information from subcontainers.
// Default: false.
Subcontainers bool `json:"subcontainers,omitempty"`
}
func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest {
return &cadvisorapi.ContainerInfoRequest{
NumStats: r.NumStats,
Start: r.Start,
End: r.End,
}
}
func parseStatsRequest(request *restful.Request) (StatsRequest, error) {
// Default request.
query := StatsRequest{
NumStats: 60,
}
err := json.NewDecoder(request.Request.Body).Decode(&query)
if err != nil && err != io.EOF {
return query, err
}
return query, nil
}
// Handles root container stats requests to /stats
func (h *handler) handleStats(request *restful.Request, response *restful.Response) {
query, err := parseStatsRequest(request)
if err != nil {
handleError(response, err)
return
}
// Root container stats.
statsMap, err := h.provider.GetRawContainerInfo("/", query.cadvisorRequest(), false)
if err != nil {
handleError(response, err)
return
}
writeResponse(response, statsMap["/"])
}
// Handles non-kubernetes container stats requests to /stats/container/
func (h *handler) handleSystemContainer(request *restful.Request, response *restful.Response) {
query, err := parseStatsRequest(request)
if err != nil {
handleError(response, err)
return
}
// Non-Kubernetes container stats.
containerName := path.Join("/", query.ContainerName)
stats, err := h.provider.GetRawContainerInfo(
containerName, query.cadvisorRequest(), query.Subcontainers)
if err != nil {
handleError(response, err)
return
}
writeResponse(response, stats)
}
// Handles kubernetes pod/container stats requests to:
// /stats/<pod name>/<container name>
// /stats/<namespace>/<pod name>/<uid>/<container name>
func (h *handler) handlePodContainer(request *restful.Request, response *restful.Response) {
query, err := parseStatsRequest(request)
if err != nil {
handleError(response, err)
return
}
// Default parameters.
params := map[string]string{
"namespace": api.NamespaceDefault,
"uid": "",
}
for k, v := range request.PathParameters() {
params[k] = v
}
if params["podName"] == "" || params["containerName"] == "" {
response.WriteErrorString(http.StatusBadRequest,
fmt.Sprintf("Invalid pod container request: %v", params))
return
}
pod, ok := h.provider.GetPodByName(params["namespace"], params["podName"])
if !ok {
glog.V(4).Infof("Container not found: %v", params)
handleError(response, kubecontainer.ErrContainerNotFound)
return
}
stats, err := h.provider.GetContainerInfo(
kubecontainer.GetPodFullName(pod),
types.UID(params["uid"]),
params["containerName"],
query.cadvisorRequest())
if err != nil {
handleError(response, err)
return
}
writeResponse(response, stats)
}
func writeResponse(response *restful.Response, stats interface{}) {
if stats == nil {
return
}
err := response.WriteAsJson(stats)
if err != nil {
handleError(response, err)
}
}
// handleError serializes an error object into an HTTP response.
func handleError(response *restful.Response, err error) {
switch err {
case kubecontainer.ErrContainerNotFound:
response.WriteError(http.StatusNotFound, err)
default:
msg := fmt.Sprintf("Internal Error: %v", err)
glog.Infof("HTTP InternalServerError: %s", msg)
response.WriteErrorString(http.StatusInternalServerError, msg)
}
}

View File

@ -34,7 +34,7 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
@ -147,7 +147,7 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod
// getContainerInfo contacts kubelet for the container information. The "Stats"
// in the returned ContainerInfo is subject to the requirements in statsRequest.
func getContainerInfo(c *client.Client, nodeName string, req *server.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
func getContainerInfo(c *client.Client, nodeName string, req *stats.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
reqBody, err := json.Marshal(req)
if err != nil {
return nil, err
@ -240,7 +240,7 @@ func getOneTimeResourceUsageOnNode(
return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
}
// Get information of all containers on the node.
containerInfos, err := getContainerInfo(c, nodeName, &server.StatsRequest{
containerInfos, err := getContainerInfo(c, nodeName, &stats.StatsRequest{
ContainerName: "/",
NumStats: numStats,
Subcontainers: true,
@ -408,7 +408,7 @@ func (r *resourceCollector) Stop() {
// collectStats gets the latest stats from kubelet's /stats/container, computes
// the resource usage, and pushes it to the buffer.
func (r *resourceCollector) collectStats(oldStats map[string]*cadvisorapi.ContainerStats) {
infos, err := getContainerInfo(r.client, r.node, &server.StatsRequest{
infos, err := getContainerInfo(r.client, r.node, &stats.StatsRequest{
ContainerName: "/",
NumStats: 1,
Subcontainers: true,