From 7bca355bb4364eda82e4e5b9da6a0316aef8f0b4 Mon Sep 17 00:00:00 2001 From: Phillip Wittrock Date: Tue, 9 Feb 2016 13:09:37 -0800 Subject: [PATCH] Spread pod volume metrics calc across calc period. Metrics are calculated independently. --- pkg/kubelet/metrics/metrics.go | 9 - .../server/stats/fs_resource_analyzer.go | 119 ++++-------- .../server/stats/fs_resource_analyzer_test.go | 179 ------------------ .../server/stats/volume_stat_caculator.go | 122 ++++++++++++ 4 files changed, 158 insertions(+), 271 deletions(-) delete mode 100644 pkg/kubelet/server/stats/fs_resource_analyzer_test.go create mode 100644 pkg/kubelet/server/stats/volume_stat_caculator.go diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 1463826aa9..4c8a521594 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -37,7 +37,6 @@ const ( PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds" PLEGRelistLatencyKey = "pleg_relist_latency_microseconds" PLEGRelistIntervalKey = "pleg_relist_interval_microseconds" - MetricsVolumeCalcLatencyKey = "metrics_volume_calc_microseconds" ) var ( @@ -122,13 +121,6 @@ var ( Help: "Interval in microseconds between relisting in PLEG.", }, ) - MetricsVolumeCalcLatency = prometheus.NewSummary( - prometheus.SummaryOpts{ - Subsystem: KubeletSubsystem, - Name: MetricsVolumeCalcLatencyKey, - Help: "Latency in microseconds for calculating volume metrics.", - }, - ) ) var registerMetrics sync.Once @@ -149,7 +141,6 @@ func Register(containerCache kubecontainer.RuntimeCache) { prometheus.MustRegister(newPodAndContainerCollector(containerCache)) prometheus.MustRegister(PLEGRelistLatency) prometheus.MustRegister(PLEGRelistInterval) - prometheus.MustRegister(MetricsVolumeCalcLatency) }) } diff --git a/pkg/kubelet/server/stats/fs_resource_analyzer.go b/pkg/kubelet/server/stats/fs_resource_analyzer.go index 75b8ea934c..c45e34694a 100644 --- a/pkg/kubelet/server/stats/fs_resource_analyzer.go +++ 
b/pkg/kubelet/server/stats/fs_resource_analyzer.go @@ -17,28 +17,19 @@ limitations under the License. package stats import ( + "sync" "sync/atomic" "time" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" - "k8s.io/kubernetes/pkg/kubelet/metrics" - "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/pkg/volume" "github.com/golang/glog" ) // Map to PodVolumeStats pointers since the addresses for map values are not constant and can cause pain // if we need ever to get a pointer to one of the values (e.g. you can't) -type Cache map[types.UID]*PodVolumeStats - -// PodVolumeStats encapsulates all VolumeStats for a pod -type PodVolumeStats struct { - Volumes []stats.VolumeStats -} +type Cache map[types.UID]*volumeStatCalculator // fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer type fsResourceAnalyzerInterface interface { @@ -48,107 +39,69 @@ type fsResourceAnalyzerInterface interface { // diskResourceAnalyzer provider stats about fs resource usage type fsResourceAnalyzer struct { statsProvider StatsProvider - calcVolumePeriod time.Duration + calcPeriod time.Duration cachedVolumeStats atomic.Value + startOnce sync.Once } var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{} // newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation func newFsResourceAnalyzer(statsProvider StatsProvider, calcVolumePeriod time.Duration) *fsResourceAnalyzer { - return &fsResourceAnalyzer{ - statsProvider: statsProvider, - calcVolumePeriod: calcVolumePeriod, + r := &fsResourceAnalyzer{ + statsProvider: statsProvider, + calcPeriod: calcVolumePeriod, } + r.cachedVolumeStats.Store(make(Cache)) + return r } // Start eager background caching of volume stats. 
func (s *fsResourceAnalyzer) Start() { - if s.calcVolumePeriod <= 0 { - glog.Info("Volume stats collection disabled.") - return - } - glog.Info("Starting FS ResourceAnalyzer") - go wait.Forever(func() { - startTime := time.Now() - s.updateCachedPodVolumeStats() - glog.V(3).Infof("Finished calculating volume stats in %v.", time.Now().Sub(startTime)) - metrics.MetricsVolumeCalcLatency.Observe(metrics.SinceInMicroseconds(startTime)) - }, s.calcVolumePeriod) + s.startOnce.Do(func() { + if s.calcPeriod <= 0 { + glog.Info("Volume stats collection disabled.") + return + } + glog.Info("Starting FS ResourceAnalyzer") + go wait.Forever(func() { s.updateCachedPodVolumeStats() }, s.calcPeriod) + }) } // updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet. func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() { - // Calculate the new volume stats map - pods := s.statsProvider.GetPods() + oldCache := s.cachedVolumeStats.Load().(Cache) newCache := make(Cache) - // TODO: Prevent 1 pod metrics hanging from blocking other pods. Schedule pods independently and spaced - // evenly across the period to prevent cpu spikes. Ideally resource collection consumes the resources - // allocated to the pod itself to isolate bad actors. 
- // See issue #20675 - for _, pod := range pods { - podUid := pod.GetUID() - stats, found := s.getPodVolumeStats(pod) - if !found { - glog.Warningf("Could not locate volumes for pod %s", format.Pod(pod)) - continue + + // Copy existing entries to new map, creating/starting new entries for pods missing from the cache + for _, pod := range s.statsProvider.GetPods() { + if value, found := oldCache[pod.GetUID()]; !found { + newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod).StartOnce() + } else { + newCache[pod.GetUID()] = value } - newCache[podUid] = &stats } + + // Stop entries for pods that have been deleted + for uid, entry := range oldCache { + if _, found := newCache[uid]; !found { + entry.StopOnce() + } + } + // Update the cache reference s.cachedVolumeStats.Store(newCache) } -// getPodVolumeStats calculates PodVolumeStats for a given pod and returns the result. -func (s *fsResourceAnalyzer) getPodVolumeStats(pod *api.Pod) (PodVolumeStats, bool) { - // Find all Volumes for the Pod - volumes, found := s.statsProvider.ListVolumesForPod(pod.UID) - if !found { - return PodVolumeStats{}, found - } - - // Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats - stats := make([]stats.VolumeStats, 0, len(volumes)) - for name, v := range volumes { - metric, err := v.GetMetrics() - if err != nil { - // Expected for Volumes that don't support Metrics - // TODO: Disambiguate unsupported from errors - // See issue #20676 - glog.V(4).Infof("Failed to calculate volume metrics for pod %s volume %s: %+v", - format.Pod(pod), name, err) - continue - } - stats = append(stats, s.parsePodVolumeStats(name, metric)) - } - return PodVolumeStats{Volumes: stats}, true -} - -func (s *fsResourceAnalyzer) parsePodVolumeStats(podName string, metric *volume.Metrics) stats.VolumeStats { - available := uint64(metric.Available.Value()) - capacity := uint64(metric.Capacity.Value()) - used := uint64((metric.Used.Value())) - return 
stats.VolumeStats{ - Name: podName, - FsStats: stats.FsStats{ - AvailableBytes: &available, - CapacityBytes: &capacity, - UsedBytes: &used}} -} - // GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that // is eagerly populated in the background, and never calculated on the fly. func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) { - // Cache hasn't been initialized yet - if s.cachedVolumeStats.Load() == nil { - return PodVolumeStats{}, false - } cache := s.cachedVolumeStats.Load().(Cache) - stats, f := cache[uid] - if !f { + if statCalc, found := cache[uid]; !found { // TODO: Differentiate between stats being empty // See issue #20679 return PodVolumeStats{}, false + } else { + return statCalc.GetLatest() } - return *stats, true } diff --git a/pkg/kubelet/server/stats/fs_resource_analyzer_test.go b/pkg/kubelet/server/stats/fs_resource_analyzer_test.go deleted file mode 100644 index b75d72939a..0000000000 --- a/pkg/kubelet/server/stats/fs_resource_analyzer_test.go +++ /dev/null @@ -1,179 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package stats - -import ( - "fmt" - "testing" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/resource" - "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" - "k8s.io/kubernetes/pkg/types" - "k8s.io/kubernetes/pkg/volume" - - "github.com/stretchr/testify/assert" -) - -// TestGetPodVolumeStats tests that GetPodVolumeStats reads from the cache and returns the value -func TestGetPodVolumeStats(t *testing.T) { - instance := newFsResourceAnalyzer(&MockStatsProvider{}, time.Minute*5) - vStats, found := instance.GetPodVolumeStats("testpod1") - assert.False(t, found) - assert.Equal(t, PodVolumeStats{}, vStats) - - instance.cachedVolumeStats.Store(make(Cache)) - vStats, found = instance.GetPodVolumeStats("testpod1") - assert.False(t, found) - assert.Equal(t, PodVolumeStats{}, vStats) - - available := uint64(100) - used := uint64(200) - capacity := uint64(400) - vs1 := stats.VolumeStats{ - Name: "vol1", - FsStats: stats.FsStats{ - AvailableBytes: &available, - UsedBytes: &used, - CapacityBytes: &capacity, - }, - } - pvs := &PodVolumeStats{ - Volumes: []stats.VolumeStats{vs1}, - } - - instance.cachedVolumeStats.Load().(Cache)["testpod1"] = pvs - vStats, found = instance.GetPodVolumeStats("testpod1") - assert.True(t, found) - assert.Equal(t, *pvs, vStats) -} - -// TestUpdateCachedPodVolumeStats tests that the cache is updated from the stats provider -func TestUpdateCachedPodVolumeStats(t *testing.T) { - statsPr := &MockStatsProvider{} - instance := newFsResourceAnalyzer(statsPr, time.Minute*5) - - // Mock retrieving pods - pods := []*api.Pod{ - {ObjectMeta: api.ObjectMeta{UID: "testpod1"}}, - {ObjectMeta: api.ObjectMeta{UID: "testpod2"}}, - } - statsPr.On("GetPods").Return(pods) - - // Mock volumes for pod1 - m1 := &volume.Metrics{ - Available: resource.NewQuantity(100, resource.DecimalSI), - Used: resource.NewQuantity(200, resource.DecimalSI), - Capacity: resource.NewQuantity(400, resource.DecimalSI), - } - v1 := &volume.MockVolume{} - 
v1.On("GetMetrics").Return(m1, nil) - - m2 := &volume.Metrics{ - Available: resource.NewQuantity(600, resource.DecimalSI), - Used: resource.NewQuantity(700, resource.DecimalSI), - Capacity: resource.NewQuantity(1400, resource.DecimalSI), - } - v2 := &volume.MockVolume{} - v2.On("GetMetrics").Return(m2, nil) - tp1Volumes := map[string]volume.Volume{ - "v1": v1, - "v2": v2, - } - statsPr.On("ListVolumesForPod", types.UID("testpod1")).Return(tp1Volumes, true) - - // Mock volumes for pod2 - m3 := &volume.Metrics{ - Available: resource.NewQuantity(800, resource.DecimalSI), - Used: resource.NewQuantity(900, resource.DecimalSI), - Capacity: resource.NewQuantity(1800, resource.DecimalSI), - } - v3 := &volume.MockVolume{} - v3.On("GetMetrics").Return(m3, nil) - v4 := &volume.MockVolume{} - v4.On("GetMetrics").Return(nil, fmt.Errorf("Error calculating stats")) - tp2Volumes := map[string]volume.Volume{ - "v3": v3, - "v4": v4, - } - statsPr.On("ListVolumesForPod", types.UID("testpod2")).Return(tp2Volumes, true) - - instance.updateCachedPodVolumeStats() - - actual1, found := instance.GetPodVolumeStats("testpod1") - assert.True(t, found) - assert.Len(t, actual1.Volumes, 2) - v1available := uint64(100) - v1used := uint64(200) - v1capacity := uint64(400) - assert.Contains(t, actual1.Volumes, stats.VolumeStats{ - Name: "v1", - FsStats: stats.FsStats{ - AvailableBytes: &v1available, - UsedBytes: &v1used, - CapacityBytes: &v1capacity, - }, - }) - - v2available := uint64(600) - v2used := uint64(700) - v2capacity := uint64(1400) - assert.Contains(t, actual1.Volumes, stats.VolumeStats{ - Name: "v2", - FsStats: stats.FsStats{ - AvailableBytes: &v2available, - UsedBytes: &v2used, - CapacityBytes: &v2capacity, - }, - }) - - v3available := uint64(800) - v3used := uint64(900) - v3capacity := uint64(1800) - actual2, found := instance.GetPodVolumeStats("testpod2") - assert.True(t, found) - assert.Len(t, actual2.Volumes, 1) - assert.Contains(t, actual2.Volumes, stats.VolumeStats{ - Name: "v3", 
- FsStats: stats.FsStats{ - AvailableBytes: &v3available, - UsedBytes: &v3used, - CapacityBytes: &v3capacity, - }, - }) - - // Make sure the cache gets updated. The mocking libraries have trouble - pods = []*api.Pod{ - {ObjectMeta: api.ObjectMeta{UID: "testpod3"}}, - } - statsPr.On("GetPods").Return(pods) - - // pod3 volumes - m1 = &volume.Metrics{ - Available: resource.NewQuantity(150, resource.DecimalSI), - Used: resource.NewQuantity(200, resource.DecimalSI), - Capacity: resource.NewQuantity(600, resource.DecimalSI), - } - v1 = &volume.MockVolume{} - v1.On("GetMetrics").Return(m1, nil) - - tp1Volumes = map[string]volume.Volume{ - "v1": v1, - } - statsPr.On("ListVolumesForPod", types.UID("testpod3")).Return(tp1Volumes, true) -} diff --git a/pkg/kubelet/server/stats/volume_stat_caculator.go b/pkg/kubelet/server/stats/volume_stat_caculator.go new file mode 100644 index 0000000000..cb15a0453b --- /dev/null +++ b/pkg/kubelet/server/stats/volume_stat_caculator.go @@ -0,0 +1,122 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package stats + +import ( + "sync" + "sync/atomic" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" + "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/volume" + + "github.com/golang/glog" +) + +// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result +type volumeStatCalculator struct { + statsProvider StatsProvider + jitterPeriod time.Duration + pod *api.Pod + stopChannel chan struct{} + startO sync.Once + stopO sync.Once + latest atomic.Value +} + +// PodVolumeStats encapsulates all VolumeStats for a pod +type PodVolumeStats struct { + Volumes []stats.VolumeStats +} + +// newVolumeStatCalculator creates a new, not yet started, volumeStatCalculator for the given pod +func newVolumeStatCalculator(statsProvider StatsProvider, jitterPeriod time.Duration, pod *api.Pod) *volumeStatCalculator { + return &volumeStatCalculator{ + statsProvider: statsProvider, + jitterPeriod: jitterPeriod, + pod: pod, + stopChannel: make(chan struct{}), + } +} + +// StartOnce starts pod volume calc that will occur periodically in the background until s.StopOnce is called +func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator { + s.startO.Do(func() { + go wait.JitterUntil(func() { + s.calcAndStoreStats() + }, s.jitterPeriod, 1.0, s.stopChannel) + }) + return s +} + +// StopOnce stops background pod volume calculation. Will not stop currently executing calculations until +// they complete their current iteration. 
+func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator { + s.stopO.Do(func() { + close(s.stopChannel) + }) + return s +} + +// GetLatest returns the most recently stored PodVolumeStats, or false if no stats have been stored yet +func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) { + if result := s.latest.Load(); result == nil { + return PodVolumeStats{}, false + } else { + return result.(PodVolumeStats), true + } +} + +// calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache. +func (s *volumeStatCalculator) calcAndStoreStats() { + // Find all Volumes for the Pod; if none are found, return and leave any previously stored stats untouched + volumes, found := s.statsProvider.ListVolumesForPod(s.pod.UID) + if !found { + return + } + + // Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats + stats := make([]stats.VolumeStats, 0, len(volumes)) + for name, v := range volumes { + metric, err := v.GetMetrics() + if err != nil { + // Expected for Volumes that don't support Metrics + // TODO: Disambiguate unsupported from errors + // See issue #20676 + glog.V(4).Infof("Failed to calculate volume metrics for pod %s volume %s: %+v", format.Pod(s.pod), name, err) + continue + } + stats = append(stats, s.parsePodVolumeStats(name, metric)) + } + + // Store the new stats + s.latest.Store(PodVolumeStats{Volumes: stats}) +} + +// parsePodVolumeStats converts (internal) volume.Metrics to (external) stats.VolumeStats structures; despite its name, podName is the volume name passed by calcAndStoreStats +func (s *volumeStatCalculator) parsePodVolumeStats(podName string, metric *volume.Metrics) stats.VolumeStats { + available := uint64(metric.Available.Value()) + capacity := uint64(metric.Capacity.Value()) + used := uint64((metric.Used.Value())) + return stats.VolumeStats{ + Name: podName, + FsStats: stats.FsStats{AvailableBytes: &available, CapacityBytes: &capacity, UsedBytes: &used}, + } +}