Merge pull request #20940 from pwittrock/volume-sync

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2016-02-25 02:29:31 -08:00
commit 82b0f0ff5e
4 changed files with 158 additions and 271 deletions

View File

@ -37,7 +37,6 @@ const (
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
MetricsVolumeCalcLatencyKey = "metrics_volume_calc_microseconds"
)
var (
@ -122,13 +121,6 @@ var (
Help: "Interval in microseconds between relisting in PLEG.",
},
)
MetricsVolumeCalcLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
Name: MetricsVolumeCalcLatencyKey,
Help: "Latency in microseconds for calculating volume metrics.",
},
)
)
var registerMetrics sync.Once
@ -149,7 +141,6 @@ func Register(containerCache kubecontainer.RuntimeCache) {
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
prometheus.MustRegister(PLEGRelistLatency)
prometheus.MustRegister(PLEGRelistInterval)
prometheus.MustRegister(MetricsVolumeCalcLatency)
})
}

View File

@ -17,28 +17,19 @@ limitations under the License.
package stats
import (
"sync"
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
)
// Map to PodVolumeStats pointers since the addresses for map values are not constant and can cause pain
// if we need ever to get a pointer to one of the values (e.g. you can't)
type Cache map[types.UID]*PodVolumeStats
// PodVolumeStats encapsulates all VolumeStats for a pod
type PodVolumeStats struct {
Volumes []stats.VolumeStats
}
type Cache map[types.UID]*volumeStatCalculator
// fsResourceAnalyzerInterface is for embedding fs functions into ResourceAnalyzer
type fsResourceAnalyzerInterface interface {
@ -48,107 +39,69 @@ type fsResourceAnalyzerInterface interface {
// diskResourceAnalyzer provider stats about fs resource usage
type fsResourceAnalyzer struct {
statsProvider StatsProvider
calcVolumePeriod time.Duration
calcPeriod time.Duration
cachedVolumeStats atomic.Value
startOnce sync.Once
}
var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
// newFsResourceAnalyzer returns a new fsResourceAnalyzer implementation
func newFsResourceAnalyzer(statsProvider StatsProvider, calcVolumePeriod time.Duration) *fsResourceAnalyzer {
return &fsResourceAnalyzer{
statsProvider: statsProvider,
calcVolumePeriod: calcVolumePeriod,
r := &fsResourceAnalyzer{
statsProvider: statsProvider,
calcPeriod: calcVolumePeriod,
}
r.cachedVolumeStats.Store(make(Cache))
return r
}
// Start eager background caching of volume stats.
func (s *fsResourceAnalyzer) Start() {
if s.calcVolumePeriod <= 0 {
glog.Info("Volume stats collection disabled.")
return
}
glog.Info("Starting FS ResourceAnalyzer")
go wait.Forever(func() {
startTime := time.Now()
s.updateCachedPodVolumeStats()
glog.V(3).Infof("Finished calculating volume stats in %v.", time.Now().Sub(startTime))
metrics.MetricsVolumeCalcLatency.Observe(metrics.SinceInMicroseconds(startTime))
}, s.calcVolumePeriod)
s.startOnce.Do(func() {
if s.calcPeriod <= 0 {
glog.Info("Volume stats collection disabled.")
return
}
glog.Info("Starting FS ResourceAnalyzer")
go wait.Forever(func() { s.updateCachedPodVolumeStats() }, s.calcPeriod)
})
}
// updateCachedPodVolumeStats calculates and caches the PodVolumeStats for every Pod known to the kubelet.
func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
// Calculate the new volume stats map
pods := s.statsProvider.GetPods()
oldCache := s.cachedVolumeStats.Load().(Cache)
newCache := make(Cache)
// TODO: Prevent 1 pod metrics hanging from blocking other pods. Schedule pods independently and spaced
// evenly across the period to prevent cpu spikes. Ideally resource collection consumes the resources
// allocated to the pod itself to isolate bad actors.
// See issue #20675
for _, pod := range pods {
podUid := pod.GetUID()
stats, found := s.getPodVolumeStats(pod)
if !found {
glog.Warningf("Could not locate volumes for pod %s", format.Pod(pod))
continue
// Copy existing entries to new map, creating/starting new entries for pods missing from the cache
for _, pod := range s.statsProvider.GetPods() {
if value, found := oldCache[pod.GetUID()]; !found {
newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod).StartOnce()
} else {
newCache[pod.GetUID()] = value
}
newCache[podUid] = &stats
}
// Stop entries for pods that have been deleted
for uid, entry := range oldCache {
if _, found := newCache[uid]; !found {
entry.StopOnce()
}
}
// Update the cache reference
s.cachedVolumeStats.Store(newCache)
}
// getPodVolumeStats calculates PodVolumeStats for a given pod and returns the result.
func (s *fsResourceAnalyzer) getPodVolumeStats(pod *api.Pod) (PodVolumeStats, bool) {
// Find all Volumes for the Pod
volumes, found := s.statsProvider.ListVolumesForPod(pod.UID)
if !found {
return PodVolumeStats{}, found
}
// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
stats := make([]stats.VolumeStats, 0, len(volumes))
for name, v := range volumes {
metric, err := v.GetMetrics()
if err != nil {
// Expected for Volumes that don't support Metrics
// TODO: Disambiguate unsupported from errors
// See issue #20676
glog.V(4).Infof("Failed to calculate volume metrics for pod %s volume %s: %+v",
format.Pod(pod), name, err)
continue
}
stats = append(stats, s.parsePodVolumeStats(name, metric))
}
return PodVolumeStats{Volumes: stats}, true
}
func (s *fsResourceAnalyzer) parsePodVolumeStats(podName string, metric *volume.Metrics) stats.VolumeStats {
available := uint64(metric.Available.Value())
capacity := uint64(metric.Capacity.Value())
used := uint64((metric.Used.Value()))
return stats.VolumeStats{
Name: podName,
FsStats: stats.FsStats{
AvailableBytes: &available,
CapacityBytes: &capacity,
UsedBytes: &used}}
}
// GetPodVolumeStats returns the PodVolumeStats for a given pod. Results are looked up from a cache that
// is eagerly populated in the background, and never calculated on the fly.
func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) {
// Cache hasn't been initialized yet
if s.cachedVolumeStats.Load() == nil {
return PodVolumeStats{}, false
}
cache := s.cachedVolumeStats.Load().(Cache)
stats, f := cache[uid]
if !f {
if statCalc, found := cache[uid]; !found {
// TODO: Differentiate between stats being empty
// See issue #20679
return PodVolumeStats{}, false
} else {
return statCalc.GetLatest()
}
return *stats, true
}

View File

@ -1,179 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/volume"
"github.com/stretchr/testify/assert"
)
// TestGetPodVolumeStats tests that GetPodVolumeStats reads from the cache and returns the value
func TestGetPodVolumeStats(t *testing.T) {
instance := newFsResourceAnalyzer(&MockStatsProvider{}, time.Minute*5)
vStats, found := instance.GetPodVolumeStats("testpod1")
assert.False(t, found)
assert.Equal(t, PodVolumeStats{}, vStats)
instance.cachedVolumeStats.Store(make(Cache))
vStats, found = instance.GetPodVolumeStats("testpod1")
assert.False(t, found)
assert.Equal(t, PodVolumeStats{}, vStats)
available := uint64(100)
used := uint64(200)
capacity := uint64(400)
vs1 := stats.VolumeStats{
Name: "vol1",
FsStats: stats.FsStats{
AvailableBytes: &available,
UsedBytes: &used,
CapacityBytes: &capacity,
},
}
pvs := &PodVolumeStats{
Volumes: []stats.VolumeStats{vs1},
}
instance.cachedVolumeStats.Load().(Cache)["testpod1"] = pvs
vStats, found = instance.GetPodVolumeStats("testpod1")
assert.True(t, found)
assert.Equal(t, *pvs, vStats)
}
// TestUpdateCachedPodVolumeStats tests that the cache is updated from the stats provider
func TestUpdateCachedPodVolumeStats(t *testing.T) {
statsPr := &MockStatsProvider{}
instance := newFsResourceAnalyzer(statsPr, time.Minute*5)
// Mock retrieving pods
pods := []*api.Pod{
{ObjectMeta: api.ObjectMeta{UID: "testpod1"}},
{ObjectMeta: api.ObjectMeta{UID: "testpod2"}},
}
statsPr.On("GetPods").Return(pods)
// Mock volumes for pod1
m1 := &volume.Metrics{
Available: resource.NewQuantity(100, resource.DecimalSI),
Used: resource.NewQuantity(200, resource.DecimalSI),
Capacity: resource.NewQuantity(400, resource.DecimalSI),
}
v1 := &volume.MockVolume{}
v1.On("GetMetrics").Return(m1, nil)
m2 := &volume.Metrics{
Available: resource.NewQuantity(600, resource.DecimalSI),
Used: resource.NewQuantity(700, resource.DecimalSI),
Capacity: resource.NewQuantity(1400, resource.DecimalSI),
}
v2 := &volume.MockVolume{}
v2.On("GetMetrics").Return(m2, nil)
tp1Volumes := map[string]volume.Volume{
"v1": v1,
"v2": v2,
}
statsPr.On("ListVolumesForPod", types.UID("testpod1")).Return(tp1Volumes, true)
// Mock volumes for pod2
m3 := &volume.Metrics{
Available: resource.NewQuantity(800, resource.DecimalSI),
Used: resource.NewQuantity(900, resource.DecimalSI),
Capacity: resource.NewQuantity(1800, resource.DecimalSI),
}
v3 := &volume.MockVolume{}
v3.On("GetMetrics").Return(m3, nil)
v4 := &volume.MockVolume{}
v4.On("GetMetrics").Return(nil, fmt.Errorf("Error calculating stats"))
tp2Volumes := map[string]volume.Volume{
"v3": v3,
"v4": v4,
}
statsPr.On("ListVolumesForPod", types.UID("testpod2")).Return(tp2Volumes, true)
instance.updateCachedPodVolumeStats()
actual1, found := instance.GetPodVolumeStats("testpod1")
assert.True(t, found)
assert.Len(t, actual1.Volumes, 2)
v1available := uint64(100)
v1used := uint64(200)
v1capacity := uint64(400)
assert.Contains(t, actual1.Volumes, stats.VolumeStats{
Name: "v1",
FsStats: stats.FsStats{
AvailableBytes: &v1available,
UsedBytes: &v1used,
CapacityBytes: &v1capacity,
},
})
v2available := uint64(600)
v2used := uint64(700)
v2capacity := uint64(1400)
assert.Contains(t, actual1.Volumes, stats.VolumeStats{
Name: "v2",
FsStats: stats.FsStats{
AvailableBytes: &v2available,
UsedBytes: &v2used,
CapacityBytes: &v2capacity,
},
})
v3available := uint64(800)
v3used := uint64(900)
v3capacity := uint64(1800)
actual2, found := instance.GetPodVolumeStats("testpod2")
assert.True(t, found)
assert.Len(t, actual2.Volumes, 1)
assert.Contains(t, actual2.Volumes, stats.VolumeStats{
Name: "v3",
FsStats: stats.FsStats{
AvailableBytes: &v3available,
UsedBytes: &v3used,
CapacityBytes: &v3capacity,
},
})
// Make sure the cache gets updated. The mocking libraries have trouble
pods = []*api.Pod{
{ObjectMeta: api.ObjectMeta{UID: "testpod3"}},
}
statsPr.On("GetPods").Return(pods)
// pod3 volumes
m1 = &volume.Metrics{
Available: resource.NewQuantity(150, resource.DecimalSI),
Used: resource.NewQuantity(200, resource.DecimalSI),
Capacity: resource.NewQuantity(600, resource.DecimalSI),
}
v1 = &volume.MockVolume{}
v1.On("GetMetrics").Return(m1, nil)
tp1Volumes = map[string]volume.Volume{
"v1": v1,
}
statsPr.On("ListVolumesForPod", types.UID("testpod3")).Return(tp1Volumes, true)
}

View File

@ -0,0 +1,122 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"sync"
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
"github.com/golang/glog"
)
// volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
type volumeStatCalculator struct {
statsProvider StatsProvider
jitterPeriod time.Duration
pod *api.Pod
stopChannel chan struct{}
startO sync.Once
stopO sync.Once
latest atomic.Value
}
// PodVolumeStats encapsulates all VolumeStats for a pod
type PodVolumeStats struct {
Volumes []stats.VolumeStats
}
// newVolumeStatCalculator creates a new VolumeStatCalculator
func newVolumeStatCalculator(statsProvider StatsProvider, jitterPeriod time.Duration, pod *api.Pod) *volumeStatCalculator {
return &volumeStatCalculator{
statsProvider: statsProvider,
jitterPeriod: jitterPeriod,
pod: pod,
stopChannel: make(chan struct{}),
}
}
// StartOnce starts pod volume calc that will occur periodically in the background until s.StopOnce is called
func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
s.startO.Do(func() {
go wait.JitterUntil(func() {
s.calcAndStoreStats()
}, s.jitterPeriod, 1.0, s.stopChannel)
})
return s
}
// StopOnce stops background pod volume calculation. Will not stop a currently executing calculations until
// they complete their current iteration.
func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator {
s.stopO.Do(func() {
close(s.stopChannel)
})
return s
}
// getLatest returns the most recent PodVolumeStats from the cache
func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) {
if result := s.latest.Load(); result == nil {
return PodVolumeStats{}, false
} else {
return result.(PodVolumeStats), true
}
}
// calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache.
func (s *volumeStatCalculator) calcAndStoreStats() {
// Find all Volumes for the Pod
volumes, found := s.statsProvider.ListVolumesForPod(s.pod.UID)
if !found {
return
}
// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
stats := make([]stats.VolumeStats, 0, len(volumes))
for name, v := range volumes {
metric, err := v.GetMetrics()
if err != nil {
// Expected for Volumes that don't support Metrics
// TODO: Disambiguate unsupported from errors
// See issue #20676
glog.V(4).Infof("Failed to calculate volume metrics for pod %s volume %s: %+v", format.Pod(s.pod), name, err)
continue
}
stats = append(stats, s.parsePodVolumeStats(name, metric))
}
// Store the new stats
s.latest.Store(PodVolumeStats{Volumes: stats})
}
// parsePodVolumeStats converts (internal) volume.Metrics to (external) stats.VolumeStats structures
func (s *volumeStatCalculator) parsePodVolumeStats(podName string, metric *volume.Metrics) stats.VolumeStats {
available := uint64(metric.Available.Value())
capacity := uint64(metric.Capacity.Value())
used := uint64((metric.Used.Value()))
return stats.VolumeStats{
Name: podName,
FsStats: stats.FsStats{AvailableBytes: &available, CapacityBytes: &capacity, UsedBytes: &used},
}
}