Merge pull request #73966 from alculquicondor/fix/lint-kubelet-server

Fix lint on pkg/kubelet/server/...
pull/564/head
Kubernetes Prow Robot 2019-02-25 20:27:48 -08:00 committed by GitHub
commit 272d78f1d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 123 additions and 125 deletions

View File

@ -209,10 +209,6 @@ pkg/kubelet/prober/testing
pkg/kubelet/qos
pkg/kubelet/remote
pkg/kubelet/secret
pkg/kubelet/server
pkg/kubelet/server/portforward
pkg/kubelet/server/stats
pkg/kubelet/server/streaming
pkg/kubelet/stats
pkg/kubelet/status
pkg/kubelet/status/testing

View File

@ -107,7 +107,7 @@ func (ds *dockerService) ExecSync(_ context.Context, req *runtimeapi.ExecSyncReq
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("exec")
return nil, streaming.NewErrorStreamingDisabled("exec")
}
_, err := checkContainerStatus(ds.client, req.ContainerId)
if err != nil {
@ -119,7 +119,7 @@ func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("attach")
return nil, streaming.NewErrorStreamingDisabled("attach")
}
_, err := checkContainerStatus(ds.client, req.ContainerId)
if err != nil {
@ -131,7 +131,7 @@ func (ds *dockerService) Attach(_ context.Context, req *runtimeapi.AttachRequest
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (ds *dockerService) PortForward(_ context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("port forward")
return nil, streaming.NewErrorStreamingDisabled("port forward")
}
_, err := checkContainerStatus(ds.client, req.PodSandboxId)
if err != nil {

View File

@ -59,11 +59,11 @@ var (
)
type volumeStatsCollector struct {
statsProvider serverstats.StatsProvider
statsProvider serverstats.Provider
}
// NewVolumeStatsCollector creates a volume stats prometheus collector.
func NewVolumeStatsCollector(statsProvider serverstats.StatsProvider) prometheus.Collector {
func NewVolumeStatsCollector(statsProvider serverstats.Provider) prometheus.Collector {
return &volumeStatsCollector{statsProvider: statsProvider}
}

View File

@ -42,6 +42,7 @@ func NewKubeletAuth(authenticator authenticator.Request, authorizerAttributeGett
return &KubeletAuth{authenticator, authorizerAttributeGetter, authorizer}
}
// NewNodeAuthorizerAttributesGetter creates a new authorizer.RequestAttributesGetter for the node.
func NewNodeAuthorizerAttributesGetter(nodeName types.NodeName) authorizer.RequestAttributesGetter {
return nodeAuthorizerAttributesGetter{nodeName: nodeName}
}

View File

@ -14,10 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// package portforward contains server-side logic for handling port forwarding requests.
// Package portforward contains server-side logic for handling port forwarding requests.
package portforward
// The subprotocol "portforward.k8s.io" is used for port forwarding.
// ProtocolV1Name is the name of the subprotocol used for port forwarding.
const ProtocolV1Name = "portforward.k8s.io"
// SupportedProtocols are the supported port forwarding protocols.
var SupportedProtocols = []string{ProtocolV1Name}

View File

@ -33,7 +33,7 @@ import (
"k8s.io/klog"
)
func handleHttpStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
func handleHTTPStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
_, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
// negotiated protocol isn't currently used server side, but could be in the future
if err != nil {

View File

@ -63,7 +63,7 @@ func TestHTTPStreamReceived(t *testing.T) {
for name, test := range tests {
streams := make(chan httpstream.Stream, 1)
f := httpStreamReceived(streams)
stream := newFakeHttpStream()
stream := newFakeHTTPStream()
if len(test.port) > 0 {
stream.headers.Set("port", test.port)
}
@ -135,7 +135,7 @@ func TestGetStreamPair(t *testing.T) {
}
// removed via complete
dataStream := newFakeHttpStream()
dataStream := newFakeHTTPStream()
dataStream.headers.Set(api.StreamType, api.StreamTypeData)
complete, err := p.add(dataStream)
if err != nil {
@ -145,7 +145,7 @@ func TestGetStreamPair(t *testing.T) {
t.Fatalf("unexpected complete")
}
errorStream := newFakeHttpStream()
errorStream := newFakeHTTPStream()
errorStream.headers.Set(api.StreamType, api.StreamTypeError)
complete, err = p.add(errorStream)
if err != nil {
@ -188,7 +188,7 @@ func TestGetStreamPair(t *testing.T) {
func TestRequestID(t *testing.T) {
h := &httpStreamHandler{}
s := newFakeHttpStream()
s := newFakeHTTPStream()
s.headers.Set(api.StreamType, api.StreamTypeError)
s.id = 1
if e, a := "1", h.requestID(s); e != a {
@ -208,39 +208,39 @@ func TestRequestID(t *testing.T) {
}
}
type fakeHttpStream struct {
type fakeHTTPStream struct {
headers http.Header
id uint32
}
func newFakeHttpStream() *fakeHttpStream {
return &fakeHttpStream{
func newFakeHTTPStream() *fakeHTTPStream {
return &fakeHTTPStream{
headers: make(http.Header),
}
}
var _ httpstream.Stream = &fakeHttpStream{}
var _ httpstream.Stream = &fakeHTTPStream{}
func (s *fakeHttpStream) Read(data []byte) (int, error) {
func (s *fakeHTTPStream) Read(data []byte) (int, error) {
return 0, nil
}
func (s *fakeHttpStream) Write(data []byte) (int, error) {
func (s *fakeHTTPStream) Write(data []byte) (int, error) {
return 0, nil
}
func (s *fakeHttpStream) Close() error {
func (s *fakeHTTPStream) Close() error {
return nil
}
func (s *fakeHttpStream) Reset() error {
func (s *fakeHTTPStream) Reset() error {
return nil
}
func (s *fakeHttpStream) Headers() http.Header {
func (s *fakeHTTPStream) Headers() http.Header {
return s.headers
}
func (s *fakeHttpStream) Identifier() uint32 {
func (s *fakeHTTPStream) Identifier() uint32 {
return s.id
}

View File

@ -43,7 +43,7 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po
if wsstream.IsWebSocketRequest(req) {
err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
} else {
err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
}
if err != nil {

View File

@ -43,15 +43,15 @@ const (
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
)
// options contains details about which streams are required for
// port forwarding.
// V4Options contains details about which streams are required for port
// forwarding.
// All fields included in V4Options need to be expressed explicitly in the
// CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest.
type V4Options struct {
Ports []int32
}
// newOptions creates a new options from the Request.
// NewV4Options creates a new options from the Request.
func NewV4Options(req *http.Request) (*V4Options, error) {
if !wsstream.IsWebSocketRequest(req) {
return &V4Options{}, nil

View File

@ -88,6 +88,7 @@ type Server struct {
redirectContainerStreaming bool
}
// TLSOptions holds the TLS options.
type TLSOptions struct {
Config *tls.Config
CertFile string
@ -165,7 +166,7 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
klog.Fatal(server.ListenAndServe())
}
// ListenAndServePodResources initializes a grpc server to serve the PodResources service
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
server := grpc.NewServer()
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
@ -186,7 +187,7 @@ type AuthInterface interface {
// HostInterface contains all the kubelet methods required by the server.
// For testability.
type HostInterface interface {
stats.StatsProvider
stats.Provider
GetVersionInfo() (*cadvisorapi.VersionInfo, error)
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetRunningPods() ([]*v1.Pod, error)
@ -533,7 +534,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist\n", podID))
response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist", podID))
return
}
// Check if containerName is valid.
@ -553,12 +554,12 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
}
}
if !containerExists {
response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q\n", containerName, podID))
response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q", containerName, podID))
return
}
if _, ok := response.ResponseWriter.(http.Flusher); !ok {
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs\n", reflect.TypeOf(response)))
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
return
}
fw := flushwriter.Wrap(response.ResponseWriter)
@ -591,7 +592,7 @@ func (s *Server) getPods(request *restful.Request, response *restful.Response) {
response.WriteError(http.StatusInternalServerError, err)
return
}
writeJsonResponse(response, data)
writeJSONResponse(response, data)
}
// getRunningPods returns a list of pods running on Kubelet. The list is
@ -608,7 +609,7 @@ func (s *Server) getRunningPods(request *restful.Request, response *restful.Resp
response.WriteError(http.StatusInternalServerError, err)
return
}
writeJsonResponse(response, data)
writeJSONResponse(response, data)
}
// getLogs handles logs requests against the Kubelet.
@ -747,11 +748,11 @@ func (s *Server) getRun(request *restful.Request, response *restful.Response) {
response.WriteError(http.StatusInternalServerError, err)
return
}
writeJsonResponse(response, data)
writeJSONResponse(response, data)
}
// Derived from go-restful writeJSON.
func writeJsonResponse(response *restful.Response, data []byte) {
func writeJSONResponse(response *restful.Response, data []byte) {
if data == nil {
response.WriteHeader(http.StatusOK)
// do not write a nil representation
@ -834,7 +835,7 @@ func (a prometheusHostAdapter) GetMachineInfo() (*cadvisorapi.MachineInfo, error
return a.host.GetCachedMachineInfo()
}
func containerPrometheusLabelsFunc(s stats.StatsProvider) metrics.ContainerLabelsFunc {
func containerPrometheusLabelsFunc(s stats.Provider) metrics.ContainerLabelsFunc {
// containerPrometheusLabels maps cAdvisor labels to prometheus labels.
return func(c *cadvisorapi.ContainerInfo) map[string]string {
// Prometheus requires that all metrics in the same family have the same labels,

View File

@ -115,7 +115,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
return fk.machineInfoFunc()
}
func (_ *fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
func (*fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
return &cadvisorapi.VersionInfo{}, nil
}
@ -249,23 +249,23 @@ func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types
}
// Unused functions
func (_ *fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
func (_ *fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
func (_ *fakeKubelet) GetPodCgroupRoot() string { return "" }
func (_ *fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
func (*fakeKubelet) GetPodCgroupRoot() string { return "" }
func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
return map[string]volume.Volume{}, true
}
func (_ *fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
func (_ *fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (_ *fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (_ *fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
func (_ *fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) ListPodStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) { return nil, nil }
func (*fakeKubelet) ImageFsStats() (*statsapi.FsStats, error) { return nil, nil }
func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
func (*fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
return nil, nil, nil
}
func (_ *fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
func (*fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
return nil, nil
}
@ -744,7 +744,7 @@ func TestAuthFilters(t *testing.T) {
The kubelet API has likely registered a handler for a new path.
If the new path has a use case for partitioned authorization when requested from the kubelet API,
add a specific subresource for it in auth.go#GetRequestAttributes() and in TestAuthFilters().
Otherwise, add it to the expected list of paths that map to the "proxy" subresource in TestAuthFilters().`, path))
Otherwise, add it to the expected list of paths that map to the "proxy" subresource in TestAuthFilters()`, path))
}
}
attributesGetter := NewNodeAuthorizerAttributesGetter(types.NodeName("test"))
@ -1553,9 +1553,8 @@ func TestServePortForward(t *testing.T) {
if test.redirect {
assert.Equal(t, http.StatusFound, resp.StatusCode, "status code")
return
} else {
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
}
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
conn, err := upgradeRoundTripper.NewConnection(resp)
require.NoError(t, err, "creating streaming connection")

View File

@ -27,9 +27,7 @@ import (
"k8s.io/klog"
)
// Map to PodVolumeStats pointers since the addresses for map values are not constant and can cause pain
// if we need ever to get a pointer to one of the values (e.g. you can't)
type Cache map[types.UID]*volumeStatCalculator
type statCache map[types.UID]*volumeStatCalculator
// fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer
type fsResourceAnalyzerInterface interface {
@ -38,7 +36,7 @@ type fsResourceAnalyzerInterface interface {
// fsResourceAnalyzer provides stats about fs resource usage
type fsResourceAnalyzer struct {
statsProvider StatsProvider
statsProvider Provider
calcPeriod time.Duration
cachedVolumeStats atomic.Value
startOnce sync.Once
@ -47,12 +45,12 @@ type fsResourceAnalyzer struct {
var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
// newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
func newFsResourceAnalyzer(statsProvider StatsProvider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
r := &fsResourceAnalyzer{
statsProvider: statsProvider,
calcPeriod: calcVolumePeriod,
}
r.cachedVolumeStats.Store(make(Cache))
r.cachedVolumeStats.Store(make(statCache))
return r
}
@ -70,8 +68,8 @@ func (s *fsResourceAnalyzer) Start() {
// updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet.
func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
oldCache := s.cachedVolumeStats.Load().(Cache)
newCache := make(Cache)
oldCache := s.cachedVolumeStats.Load().(statCache)
newCache := make(statCache)
// Copy existing entries to new map, creating/starting new entries for pods missing from the cache
for _, pod := range s.statsProvider.GetPods() {
@ -96,12 +94,12 @@ func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
// GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that
// is eagerly populated in the background, and never calculated on the fly.
func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) {
cache := s.cachedVolumeStats.Load().(Cache)
if statCalc, found := cache[uid]; !found {
cache := s.cachedVolumeStats.Load().(statCache)
statCalc, found := cache[uid]
if !found {
// TODO: Differentiate between stats being empty
// See issue #20679
return PodVolumeStats{}, false
} else {
return statCalc.GetLatest()
}
return statCalc.GetLatest()
}

View File

@ -37,8 +37,8 @@ import (
"k8s.io/kubernetes/pkg/volume"
)
// Host methods required by stats handlers.
type StatsProvider interface {
// Provider hosts methods required by stats handlers.
type Provider interface {
// The following stats are provided by either CRI or cAdvisor.
//
// ListPodStats returns the stats of all the containers managed by pods.
@ -96,11 +96,12 @@ type StatsProvider interface {
}
type handler struct {
provider StatsProvider
provider Provider
summaryProvider SummaryProvider
}
func CreateHandlers(rootPath string, provider StatsProvider, summaryProvider SummaryProvider) *restful.WebService {
// CreateHandlers creates the REST handlers for the stats.
func CreateHandlers(rootPath string, provider Provider, summaryProvider SummaryProvider) *restful.WebService {
h := &handler{provider, summaryProvider}
ws := &restful.WebService{}
@ -130,7 +131,7 @@ func CreateHandlers(rootPath string, provider StatsProvider, summaryProvider Sum
return ws
}
type StatsRequest struct {
type statsRequest struct {
// The name of the container for which to request stats.
// Default: /
// +optional
@ -158,7 +159,7 @@ type StatsRequest struct {
Subcontainers bool `json:"subcontainers,omitempty"`
}
func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest {
func (r *statsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest {
return &cadvisorapi.ContainerInfoRequest{
NumStats: r.NumStats,
Start: r.Start,
@ -166,9 +167,9 @@ func (r *StatsRequest) cadvisorRequest() *cadvisorapi.ContainerInfoRequest {
}
}
func parseStatsRequest(request *restful.Request) (StatsRequest, error) {
func parseStatsRequest(request *restful.Request) (statsRequest, error) {
// Default request.
query := StatsRequest{
query := statsRequest{
NumStats: 60,
}

View File

@ -37,7 +37,7 @@ type resourceAnalyzer struct {
var _ ResourceAnalyzer = &resourceAnalyzer{}
// NewResourceAnalyzer returns a new ResourceAnalyzer
func NewResourceAnalyzer(statsProvider StatsProvider, calVolumeFrequency time.Duration) ResourceAnalyzer {
func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration) ResourceAnalyzer {
fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency)
summaryProvider := NewSummaryProvider(statsProvider)
return &resourceAnalyzer{fsAnalyzer, summaryProvider}

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util"
)
// SummaryProvider provides summaries of the stats from Kubelet.
type SummaryProvider interface {
// Get provides a new Summary with the stats from Kubelet,
// and will update some stats if updateStats is true
@ -41,14 +42,14 @@ type summaryProviderImpl struct {
// systemBootTime is the time at which the system was started
systemBootTime metav1.Time
provider StatsProvider
provider Provider
}
var _ SummaryProvider = &summaryProviderImpl{}
// NewSummaryProvider returns a SummaryProvider using the stats provided by the
// specified statsProvider.
func NewSummaryProvider(statsProvider StatsProvider) SummaryProvider {
func NewSummaryProvider(statsProvider Provider) SummaryProvider {
kubeletCreationTime := metav1.Now()
bootTime, err := util.GetBootTime()
if err != nil {

View File

@ -32,7 +32,7 @@ import (
// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
type volumeStatCalculator struct {
statsProvider StatsProvider
statsProvider Provider
jitterPeriod time.Duration
pod *v1.Pod
stopChannel chan struct{}
@ -49,7 +49,7 @@ type PodVolumeStats struct {
}
// newVolumeStatCalculator creates a new VolumeStatCalculator
func newVolumeStatCalculator(statsProvider StatsProvider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator {
func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod) *volumeStatCalculator {
return &volumeStatCalculator{
statsProvider: statsProvider,
jitterPeriod: jitterPeriod,
@ -79,11 +79,11 @@ func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator {
// getLatest returns the most recent PodVolumeStats from the cache
func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) {
if result := s.latest.Load(); result == nil {
result := s.latest.Load()
if result == nil {
return PodVolumeStats{}, false
} else {
return result.(PodVolumeStats), true
}
return result.(PodVolumeStats), true
}
// calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache.
@ -102,8 +102,8 @@ func (s *volumeStatCalculator) calcAndStoreStats() {
}
// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
ephemeralStats := []stats.VolumeStats{}
persistentStats := []stats.VolumeStats{}
var ephemeralStats []stats.VolumeStats
var persistentStats []stats.VolumeStats
for name, v := range volumes {
metric, err := v.GetMetrics()
if err != nil {

View File

@ -17,7 +17,6 @@ limitations under the License.
package streaming
import (
"fmt"
"net/http"
"strconv"
@ -26,16 +25,17 @@ import (
"google.golang.org/grpc/status"
)
func ErrorStreamingDisabled(method string) error {
return status.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method))
// NewErrorStreamingDisabled creates an error for disabled streaming method.
func NewErrorStreamingDisabled(method string) error {
return status.Errorf(codes.NotFound, "streaming method %s disabled", method)
}
// The error returned when the maximum number of in-flight requests is exceeded.
func ErrorTooManyInFlight() error {
return status.Errorf(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
// NewErrorTooManyInFlight creates an error for exceeding the maximum number of in-flight requests.
func NewErrorTooManyInFlight() error {
return status.Error(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
}
// Translates a CRI streaming error into an appropriate HTTP response.
// WriteError translates a CRI streaming error into an appropriate HTTP response.
func WriteError(err error, w http.ResponseWriter) error {
var status int
switch grpc.Code(err) {
@ -43,9 +43,9 @@ func WriteError(err error, w http.ResponseWriter) error {
status = http.StatusNotFound
case codes.ResourceExhausted:
// We only expect to hit this if there is a DoS, so we just wait the full TTL.
// If this is ever hit in steady-state operations, consider increasing the MaxInFlight requests,
// If this is ever hit in steady-state operations, consider increasing the maxInFlight requests,
// or plumbing through the time to next expiration.
w.Header().Set("Retry-After", strconv.Itoa(int(CacheTTL.Seconds())))
w.Header().Set("Retry-After", strconv.Itoa(int(cacheTTL.Seconds())))
status = http.StatusTooManyRequests
default:
status = http.StatusInternalServerError

View File

@ -29,12 +29,12 @@ import (
)
var (
// Timeout after which tokens become invalid.
CacheTTL = 1 * time.Minute
// The maximum number of in-flight requests to allow.
MaxInFlight = 1000
// Length of the random base64 encoded token identifying the request.
TokenLen = 8
// cacheTTL is the timeout after which tokens become invalid.
cacheTTL = 1 * time.Minute
// maxInFlight is the maximum number of in-flight requests to allow.
maxInFlight = 1000
// tokenLen is the length of the random base64 encoded token identifying the request.
tokenLen = 8
)
// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
@ -77,14 +77,14 @@ func (c *requestCache) Insert(req request) (token string, err error) {
// Remove expired entries.
c.gc()
// If the cache is full, reject the request.
if c.ll.Len() == MaxInFlight {
return "", ErrorTooManyInFlight()
if c.ll.Len() == maxInFlight {
return "", NewErrorTooManyInFlight()
}
token, err = c.uniqueToken()
if err != nil {
return "", err
}
ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(CacheTTL)})
ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(cacheTTL)})
c.tokens[token] = ele
return token, nil
@ -112,15 +112,15 @@ func (c *requestCache) Consume(token string) (req request, found bool) {
// uniqueToken generates a random URL-safe token and ensures uniqueness.
func (c *requestCache) uniqueToken() (string, error) {
const maxTries = 10
// Number of bytes to be TokenLen when base64 encoded.
tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
// Number of bytes to be tokenLen when base64 encoded.
tokenSize := math.Ceil(float64(tokenLen) * 6 / 8)
rawToken := make([]byte, int(tokenSize))
for i := 0; i < maxTries; i++ {
if _, err := rand.Read(rawToken); err != nil {
return "", err
}
encoded := base64.RawURLEncoding.EncodeToString(rawToken)
token := encoded[:TokenLen]
token := encoded[:tokenLen]
// If it's unique, return it. Otherwise retry.
if _, exists := c.tokens[encoded]; !exists {
return token, nil

View File

@ -35,22 +35,22 @@ func TestInsert(t *testing.T) {
// Insert normal
oldestTok, err := c.Insert(nextRequest())
require.NoError(t, err)
assert.Len(t, oldestTok, TokenLen)
assert.Len(t, oldestTok, tokenLen)
assertCacheSize(t, c, 1)
// Insert until full
for i := 0; i < MaxInFlight-2; i++ {
for i := 0; i < maxInFlight-2; i++ {
tok, err := c.Insert(nextRequest())
require.NoError(t, err)
assert.Len(t, tok, TokenLen)
assert.Len(t, tok, tokenLen)
}
assertCacheSize(t, c, MaxInFlight-1)
assertCacheSize(t, c, maxInFlight-1)
newestReq := nextRequest()
newestTok, err := c.Insert(newestReq)
require.NoError(t, err)
assert.Len(t, newestTok, TokenLen)
assertCacheSize(t, c, MaxInFlight)
assert.Len(t, newestTok, tokenLen)
assertCacheSize(t, c, maxInFlight)
require.Contains(t, c.tokens, oldestTok, "oldest request should still be cached")
// Consume newest token.
@ -62,8 +62,8 @@ func TestInsert(t *testing.T) {
// Insert again (still full)
tok, err := c.Insert(nextRequest())
require.NoError(t, err)
assert.Len(t, tok, TokenLen)
assertCacheSize(t, c, MaxInFlight)
assert.Len(t, tok, tokenLen)
assertCacheSize(t, c, maxInFlight)
// Insert again (should evict)
_, err = c.Insert(nextRequest())
@ -71,9 +71,9 @@ func TestInsert(t *testing.T) {
errResponse := httptest.NewRecorder()
require.NoError(t, WriteError(err, errResponse))
assert.Equal(t, errResponse.Code, http.StatusTooManyRequests)
assert.Equal(t, strconv.Itoa(int(CacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After"))
assert.Equal(t, strconv.Itoa(int(cacheTTL.Seconds())), errResponse.HeaderMap.Get("Retry-After"))
assertCacheSize(t, c, MaxInFlight)
assertCacheSize(t, c, maxInFlight)
_, ok = c.Consume(oldestTok)
assert.True(t, ok, "oldest request should be valid")
}
@ -142,7 +142,7 @@ func TestConsume(t *testing.T) {
require.NoError(t, err)
assertCacheSize(t, c, 1)
clock.Step(2 * CacheTTL)
clock.Step(2 * cacheTTL)
_, ok := c.Consume(tok)
assert.False(t, ok)
@ -167,7 +167,7 @@ func TestGC(t *testing.T) {
// expired: tok1, tok2
// non-expired: tok3, tok4
clock.Step(2 * CacheTTL)
clock.Step(2 * cacheTTL)
tok3, err := c.Insert(nextRequest())
require.NoError(t, err)
assertCacheSize(t, c, 1)
@ -186,14 +186,14 @@ func TestGC(t *testing.T) {
assert.True(t, ok)
// When full, nothing is expired.
for i := 0; i < MaxInFlight; i++ {
for i := 0; i < maxInFlight; i++ {
_, err := c.Insert(nextRequest())
require.NoError(t, err)
}
assertCacheSize(t, c, MaxInFlight)
assertCacheSize(t, c, maxInFlight)
// When everything is expired
clock.Step(2 * CacheTTL)
clock.Step(2 * cacheTTL)
_, err = c.Insert(nextRequest())
require.NoError(t, err)
assertCacheSize(t, c, 1)

View File

@ -39,7 +39,7 @@ import (
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)
// The library interface to serve the stream requests.
// Server is the library interface to serve the stream requests.
type Server interface {
http.Handler
@ -59,7 +59,7 @@ type Server interface {
Stop() error
}
// The interface to execute the commands and provide the streams.
// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
@ -103,6 +103,7 @@ var DefaultConfig = Config{
SupportedPortForwardProtocols: portforward.SupportedProtocols,
}
// NewServer creates a new Server for stream requests.
// TODO(tallclair): Add auth(n/z) interface & handling.
func NewServer(config Config, runtime Runtime) (Server, error) {
s := &server{
@ -243,9 +244,8 @@ func (s *server) Start(stayUp bool) error {
s.config.BaseURL.Host = listener.Addr().String()
if s.config.TLSConfig != nil {
return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
} else {
return s.server.Serve(listener)
}
return s.server.Serve(listener)
}
func (s *server) Stop() error {