Use separate client for node status loop

pull/6/head
Jordan Liggitt 2017-09-14 12:15:52 -04:00
parent 3c8fb4b90f
commit f8f57d8959
No known key found for this signature in database
GPG Key ID: 39928704103C7229
10 changed files with 102 additions and 16 deletions

View File

@ -158,6 +158,7 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err
ContainerManager: nil, ContainerManager: nil,
DockerClient: dockerClient, DockerClient: dockerClient,
KubeClient: nil, KubeClient: nil,
HeartbeatClient: nil,
ExternalKubeClient: nil, ExternalKubeClient: nil,
EventClient: nil, EventClient: nil,
Mounter: mounter, Mounter: mounter,
@ -319,11 +320,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
kubeDeps.KubeClient = nil kubeDeps.KubeClient = nil
kubeDeps.ExternalKubeClient = nil kubeDeps.ExternalKubeClient = nil
kubeDeps.EventClient = nil kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
glog.Warningf("standalone mode, no API client") glog.Warningf("standalone mode, no API client")
} else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil { } else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil {
// initialize clients if not standalone mode and any of the clients are not provided // initialize clients if not standalone mode and any of the clients are not provided
var kubeClient clientset.Interface var kubeClient clientset.Interface
var eventClient v1core.EventsGetter var eventClient v1core.EventsGetter
var heartbeatClient v1core.CoreV1Interface
var externalKubeClient clientgoclientset.Interface var externalKubeClient clientgoclientset.Interface
clientConfig, err := CreateAPIServerClientConfig(s) clientConfig, err := CreateAPIServerClientConfig(s)
@ -352,16 +355,24 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
if err != nil { if err != nil {
glog.Warningf("New kubeClient from clientConfig error: %v", err) glog.Warningf("New kubeClient from clientConfig error: %v", err)
} }
// make a separate client for events // make a separate client for events
eventClientConfig := *clientConfig eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS) eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst) eventClientConfig.Burst = int(s.EventBurst)
tmpClient, err := clientgoclientset.NewForConfig(&eventClientConfig) eventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil { if err != nil {
glog.Warningf("Failed to create API Server client for Events: %v", err) glog.Warningf("Failed to create API Server client for Events: %v", err)
} }
eventClient = tmpClient.CoreV1()
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
heartbeatClientConfig.QPS = float32(-1)
heartbeatClient, err = v1core.NewForConfig(&heartbeatClientConfig)
if err != nil {
glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
}
} else { } else {
switch { switch {
case s.RequireKubeConfig: case s.RequireKubeConfig:
@ -373,6 +384,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
kubeDeps.KubeClient = kubeClient kubeDeps.KubeClient = kubeClient
kubeDeps.ExternalKubeClient = externalKubeClient kubeDeps.ExternalKubeClient = externalKubeClient
if heartbeatClient != nil {
kubeDeps.HeartbeatClient = heartbeatClient
}
if eventClient != nil { if eventClient != nil {
kubeDeps.EventClient = eventClient kubeDeps.EventClient = eventClient
} }

View File

@ -183,7 +183,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud
if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) {
return return
} }
_, err = nodeutil.PatchNodeStatus(cnc.kubeClient, types.NodeName(node.Name), node, newNode) _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
if err != nil { if err != nil {
glog.Errorf("Error patching node with cloud ip addresses = [%v]", err) glog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
} }

View File

@ -230,6 +230,8 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",

View File

@ -236,6 +236,7 @@ type Dependencies struct {
ContainerManager cm.ContainerManager ContainerManager cm.ContainerManager
DockerClient libdocker.Interface DockerClient libdocker.Interface
EventClient v1core.EventsGetter EventClient v1core.EventsGetter
HeartbeatClient v1core.CoreV1Interface
KubeClient clientset.Interface KubeClient clientset.Interface
ExternalKubeClient clientgoclientset.Interface ExternalKubeClient clientgoclientset.Interface
Mounter mount.Interface Mounter mount.Interface
@ -451,6 +452,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
hostname: hostname, hostname: hostname,
nodeName: nodeName, nodeName: nodeName,
kubeClient: kubeDeps.KubeClient, kubeClient: kubeDeps.KubeClient,
heartbeatClient: kubeDeps.HeartbeatClient,
rootDirectory: rootDirectory, rootDirectory: rootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration, resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
@ -866,12 +868,13 @@ type serviceLister interface {
type Kubelet struct { type Kubelet struct {
kubeletConfiguration kubeletconfiginternal.KubeletConfiguration kubeletConfiguration kubeletconfiginternal.KubeletConfiguration
hostname string hostname string
nodeName types.NodeName nodeName types.NodeName
runtimeCache kubecontainer.RuntimeCache runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface kubeClient clientset.Interface
iptClient utilipt.Interface heartbeatClient v1core.CoreV1Interface
rootDirectory string iptClient utilipt.Interface
rootDirectory string
// podWorkers handle syncing Pods in response to events. // podWorkers handle syncing Pods in response to events.
podWorkers PodWorkers podWorkers PodWorkers

View File

@ -139,7 +139,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
if requiresUpdate { if requiresUpdate {
if _, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), if _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName),
originalNode, existingNode); err != nil { originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false return false
@ -367,7 +367,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
// It synchronizes node status to master, registering the kubelet first if // It synchronizes node status to master, registering the kubelet first if
// necessary. // necessary.
func (kl *Kubelet) syncNodeStatus() { func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil { if kl.kubeClient == nil || kl.heartbeatClient == nil {
return return
} }
if kl.registerNode { if kl.registerNode {
@ -404,7 +404,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
if tryNumber == 0 { if tryNumber == 0 {
util.FromApiserverCache(&opts) util.FromApiserverCache(&opts)
} }
node, err := kl.kubeClient.Core().Nodes().Get(string(kl.nodeName), opts) node, err := kl.heartbeatClient.Nodes().Get(string(kl.nodeName), opts)
if err != nil { if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
} }
@ -423,7 +423,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
kl.setNodeStatus(node) kl.setNodeStatus(node)
// Patch the current status on the API server // Patch the current status on the API server
updatedNode, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), originalNode, node) updatedNode, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node)
if err != nil { if err != nil {
return err return err
} }

View File

@ -23,6 +23,7 @@ import (
goruntime "runtime" goruntime "runtime"
"sort" "sort"
"strconv" "strconv"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -43,6 +44,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -133,6 +136,7 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.hostname = testKubeletHostname kubelet.hostname = testKubeletHostname
existingNode := v1.Node{ existingNode := v1.Node{
@ -215,6 +219,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
t, inputImageList, false /* controllerAttachDetachEnabled */) t, inputImageList, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{ kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(), ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{ allocatable: v1.ResourceList{
@ -339,6 +344,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{ kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(), ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{ allocatable: v1.ResourceList{
@ -525,10 +531,64 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode))
} }
func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
attempts := int64(0)
// set up a listener that hangs connections
ln, err := net.Listen("tcp", "127.0.0.1:0")
assert.NoError(t, err)
defer ln.Close()
go func() {
// accept connections and just let them hang
for {
_, err := ln.Accept()
if err != nil {
t.Log(err)
return
}
t.Log("accepted connection")
atomic.AddInt64(&attempts, 1)
}
}()
config := &rest.Config{
Host: "http://" + ln.Addr().String(),
QPS: -1,
Timeout: time.Second,
}
assert.NoError(t, err)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.heartbeatClient, err = v1core.NewForConfig(config)
kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
},
capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
},
}
// should return an error, but not hang
assert.Error(t, kubelet.updateNodeStatus())
// should have attempted multiple times
if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts != nodeStatusUpdateRetry {
t.Errorf("Expected %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts)
}
}
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{ kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(), ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{ allocatable: v1.ResourceList{
@ -737,6 +797,7 @@ func TestUpdateNodeStatusError(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
// No matching node for the kubelet // No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain
assert.Error(t, kubelet.updateNodeStatus()) assert.Error(t, kubelet.updateNodeStatus())
@ -999,6 +1060,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
t, inputImageList, false /* controllerAttachDetachEnabled */) t, inputImageList, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup() defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.containerManager = &localCM{ kubelet.containerManager = &localCM{
ContainerManager: cm.NewStubContainerManager(), ContainerManager: cm.NewStubContainerManager(),
allocatable: v1.ResourceList{ allocatable: v1.ResourceList{
@ -1059,6 +1121,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
func TestUpdateDefaultLabels(t *testing.T) { func TestUpdateDefaultLabels(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
cases := []struct { cases := []struct {
name string name string

View File

@ -159,6 +159,7 @@ func newTestKubeletWithImageList(
kubelet := &Kubelet{} kubelet := &Kubelet{}
kubelet.recorder = fakeRecorder kubelet.recorder = fakeRecorder
kubelet.kubeClient = fakeKubeClient kubelet.kubeClient = fakeKubeClient
kubelet.heartbeatClient = fakeKubeClient.CoreV1()
kubelet.os = &containertest.FakeOS{} kubelet.os = &containertest.FakeOS{}
kubelet.mounter = &mount.FakeMounter{} kubelet.mounter = &mount.FakeMounter{}

View File

@ -67,6 +67,7 @@ func NewHollowKubelet(
volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...) volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...)
d := &kubelet.Dependencies{ d := &kubelet.Dependencies{
KubeClient: client, KubeClient: client,
HeartbeatClient: client.CoreV1(),
DockerClient: dockerClient, DockerClient: dockerClient,
CAdvisorInterface: cadvisorInterface, CAdvisorInterface: cadvisorInterface,
Cloud: nil, Cloud: nil,

View File

@ -18,6 +18,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
], ],
) )

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
) )
@ -150,7 +151,7 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N
} }
// PatchNodeStatus patches node status. // PatchNodeStatus patches node status.
func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) {
oldData, err := json.Marshal(oldNode) oldData, err := json.Marshal(oldNode)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
@ -171,7 +172,7 @@ func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1
return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
} }
updatedNode, err := c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
} }