Merge pull request #4901 from vmarmol/mon-startup

Adding sync pod latency metric (again).
Rohit Jnagal 2015-02-27 10:08:25 -08:00
commit c6175facfb
4 changed files with 127 additions and 38 deletions

pkg/kubelet/kubelet.go

@@ -61,7 +61,10 @@ const podOomScoreAdj = -100
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]api.BoundPod) error
// Syncs current state to match the specified pods. SyncPodType specifies what
// type of sync is occurring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
}
type SourceReadyFn func(source string) bool
@@ -942,7 +945,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
start := time.Now()
defer func() {
metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()))
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
}()
if err := kl.dockerPuller.Pull(img); err != nil {
@@ -1270,7 +1273,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
@@ -1296,7 +1299,9 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
}
// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(*pod)
kl.podWorkers.UpdatePod(pod, func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
}
// Stop the workers for no-longer existing pods.
@@ -1416,19 +1421,21 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
unsyncedPod := false
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u)
kl.updatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u)
kl.updatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
@@ -1440,25 +1447,54 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Errorf("Failed to get bound pods.")
return
}
if err := handler.SyncPods(pods); err != nil {
if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
}
func (kl *Kubelet) updatePods(u PodUpdate) {
// Update the Kubelet's internal pods with those provided by the update.
// Records the sync type of new and updated pods in podSyncTypes.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")
// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for i := range u.Pods {
if _, ok := existingPods[u.Pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
}
}
kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")
// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}
kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}
// Mark all remaining pods as sync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[kl.pods[i].UID] = metrics.SyncPodSync
}
}
}
// Returns Docker version for this Kubelet.

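Taken together, the kubelet.go changes classify every pod once per sync-loop pass (pods not previously tracked become SyncPodCreate, pods arriving via UPDATE become SyncPodUpdate, everything else defaults to SyncPodSync) and hand that map plus the loop start time to SyncPods, which uses it to label the per-pod latency observation. A minimal, self-contained sketch of the classification step, using plain string keys instead of types.UID and an illustrative classify helper that is not part of the PR:

package main

import "fmt"

type syncPodType int

const (
    syncPodCreate syncPodType = iota
    syncPodUpdate
    syncPodSync
)

// classify mirrors the bookkeeping in updatePods for a SET operation: anything
// in incoming that is not already tracked is a create; the rest default to sync.
func classify(tracked, incoming []string) map[string]syncPodType {
    existing := make(map[string]struct{}, len(tracked))
    for _, uid := range tracked {
        existing[uid] = struct{}{}
    }

    types := make(map[string]syncPodType)
    for _, uid := range incoming {
        if _, ok := existing[uid]; !ok {
            types[uid] = syncPodCreate
        }
    }
    for _, uid := range incoming {
        if _, ok := types[uid]; !ok {
            types[uid] = syncPodSync
        }
    }
    return types
}

func main() {
    // "c" was never tracked, so it is a create; "b" was, so it is a plain sync.
    fmt.Println(classify([]string{"a", "b"}, []string{"b", "c"})) // map[b:2 c:0]
}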
pkg/kubelet/kubelet_test.go

@@ -34,6 +34,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -382,6 +383,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
return cr.list
}
var emptyPodUIDs map[types.UID]metrics.SyncPodType
func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
container := api.Container{Name: "bar"}
@@ -413,7 +416,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -444,7 +447,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -491,7 +494,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -542,7 +545,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -590,7 +593,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -645,7 +648,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -690,7 +693,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -728,7 +731,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@@ -736,7 +739,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -787,7 +790,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
@@ -795,7 +798,7 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
fakeDocker.ClearCalls()
ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
@@ -833,7 +836,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.BoundPod{})
err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -2091,7 +2094,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
})
}, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}

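One detail worth noting in the tests: emptyPodUIDs is declared but never initialized, so it is a nil map. That is safe here because SyncPods only reads from it, and in Go reading a missing key from a nil map returns the zero value (SyncPodType(0), i.e. SyncPodCreate) rather than panicking; only writes to a nil map panic. A tiny self-contained illustration:

package main

import "fmt"

func main() {
    // Like the tests' emptyPodUIDs: a nil map. Reads return the zero value.
    var podSyncTypes map[string]int

    fmt.Println(podSyncTypes["some-pod-uid"]) // 0
    fmt.Println(len(podSyncTypes))            // 0
}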
pkg/kubelet/metrics/metrics.go

@@ -18,6 +18,7 @@ package metrics
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
@@ -35,8 +36,16 @@ var (
Help: "Image pull latency in microseconds.",
},
)
// TODO(vmarmol): Break down by number of containers in pod?
SyncPodLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "sync_pod_latency_microseconds",
Help: "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
},
[]string{"operation_type"},
)
// TODO(vmarmol): Containers per pod
// TODO(vmarmol): Latency of pod startup
// TODO(vmarmol): Latency of SyncPods
)
@@ -47,10 +56,37 @@ func Register(containerCache dockertools.DockerCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency)
prometheus.MustRegister(SyncPodLatency)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
})
}
type SyncPodType int
const (
SyncPodCreate SyncPodType = iota
SyncPodUpdate
SyncPodSync
)
func (self SyncPodType) String() string {
switch self {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}
// Gets the time since the specified start in microseconds.
func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector {
return &podAndContainerCollector{
containerCache: containerCache,

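The new summary and the SinceInMicroseconds helper follow the usual client_golang timing pattern: capture a start time, then Observe the elapsed microseconds on the labeled summary, typically in a defer. A self-contained sketch of that pattern; the metric and label names mirror the ones added here, but the surrounding program is illustrative only:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

var syncPodLatency = prometheus.NewSummaryVec(
    prometheus.SummaryOpts{
        Subsystem: "kubelet",
        Name:      "sync_pod_latency_microseconds",
        Help:      "Latency in microseconds to sync a single pod.",
    },
    []string{"operation_type"},
)

// sinceInMicroseconds matches the SinceInMicroseconds helper added in this PR.
func sinceInMicroseconds(start time.Time) float64 {
    return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

func main() {
    prometheus.MustRegister(syncPodLatency)

    start := time.Now()
    defer func() {
        // Label by operation type ("create", "update", or "sync") and record.
        syncPodLatency.WithLabelValues("create").Observe(sinceInMicroseconds(start))
    }()
    // ... the work being timed would go here ...
}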
pkg/kubelet/pod_workers.go

@@ -26,7 +26,7 @@ import (
"github.com/golang/glog"
)
type syncPodFunType func(*api.BoundPod, dockertools.DockerContainers) error
type syncPodFnType func(*api.BoundPod, dockertools.DockerContainers) error
// TODO(wojtek-t) Add unit tests for this type.
type podWorkers struct {
@@ -35,7 +35,7 @@ type podWorkers struct {
// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan api.BoundPod
podUpdates map[types.UID]chan workUpdate
// Track the current state of per-pod goroutines.
// Currently all update request for a given pod coming when another
// update of this pod is being processed are ignored.
@@ -46,44 +46,55 @@ type podWorkers struct {
// This function is run to sync the desired state of the pod.
// NOTE: This function has to be thread-safe - it can be called for
// different pods at the same time.
syncPodFun syncPodFunType
syncPodFn syncPodFnType
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFun syncPodFunType) *podWorkers {
type workUpdate struct {
// The pod state to reflect.
pod *api.BoundPod
// Function to call when the update is complete.
updateCompleteFn func()
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan api.BoundPod{},
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
dockerCache: dockerCache,
syncPodFun: syncPodFun,
syncPodFn: syncPodFn,
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan api.BoundPod) {
for newPod := range podUpdates {
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
for newWork := range podUpdates {
// Since we use the docker cache, getting the current state shouldn't cause
// performance overhead on Docker. Moreover, as long as we run syncPod
// whether or not it changes anything, having an old version of "containers"
// can cause unneeded containers to be started.
func() {
defer p.setIsWorking(newPod.UID, false)
defer p.setIsWorking(newWork.pod.UID, false)
containers, err := p.dockerCache.RunningContainers()
if err != nil {
glog.Errorf("Error listing containers while syncing pod: %v", err)
return
}
err = p.syncPodFun(&newPod, containers)
err = p.syncPodFn(newWork.pod, containers)
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newPod.UID, err)
record.Eventf(&newPod, "failedSync", "Error syncing pod, skipping: %v", err)
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
record.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
return
}
newWork.updateCompleteFn()
}()
}
}
func (p *podWorkers) UpdatePod(pod api.BoundPod) {
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
uid := pod.UID
var podUpdates chan api.BoundPod
var podUpdates chan workUpdate
var exists bool
p.podLock.Lock()
@@ -91,7 +102,7 @@ func (p *podWorkers) UpdatePod(pod api.BoundPod) {
if podUpdates, exists = p.podUpdates[uid]; !exists {
// Currently all update request for a given pod coming when another
// update of this pod is being processed are ignored.
podUpdates = make(chan api.BoundPod, 1)
podUpdates = make(chan workUpdate, 1)
p.podUpdates[uid] = podUpdates
go p.managePodLoop(podUpdates)
}
@@ -109,7 +120,10 @@ func (p *podWorkers) UpdatePod(pod api.BoundPod) {
// be resent (because we don't drop it)
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- pod
podUpdates <- workUpdate{
pod: pod,
updateCompleteFn: updateComplete,
}
}
}
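The workUpdate change is an instance of a common Go pattern: a per-key worker goroutine drains a buffered channel of work items, and each item carries a callback to run once it has been processed, which is how the kubelet attaches the SyncPodLatency observation without the worker knowing about metrics. A stripped-down, self-contained sketch of that pattern with generic names (no Kubernetes types):

package main

import (
    "fmt"
    "time"
)

// workItem pairs the payload with a function to call once it has been handled,
// mirroring the workUpdate struct introduced here.
type workItem struct {
    payload    string
    onComplete func()
}

func worker(updates <-chan workItem) {
    for w := range updates {
        // ... process w.payload (syncPodFn in the real code) ...
        w.onComplete()
    }
}

func main() {
    updates := make(chan workItem, 1) // buffered, like podUpdates
    go worker(updates)

    start := time.Now()
    updates <- workItem{
        payload: "pod-1",
        onComplete: func() {
            fmt.Printf("synced in %v\n", time.Since(start))
        },
    }
    time.Sleep(10 * time.Millisecond) // give the worker time to run (demo only)
}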