diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index a27699fa7f..406dba293f 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -158,6 +158,7 @@ func UnsecuredDependencies(s *options.KubeletServer) (*kubelet.Dependencies, err ContainerManager: nil, DockerClient: dockerClient, KubeClient: nil, + HeartbeatClient: nil, ExternalKubeClient: nil, EventClient: nil, Mounter: mounter, @@ -319,11 +320,13 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { kubeDeps.KubeClient = nil kubeDeps.ExternalKubeClient = nil kubeDeps.EventClient = nil + kubeDeps.HeartbeatClient = nil glog.Warningf("standalone mode, no API client") - } else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil { + } else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil { // initialize clients if not standalone mode and any of the clients are not provided var kubeClient clientset.Interface var eventClient v1core.EventsGetter + var heartbeatClient v1core.CoreV1Interface var externalKubeClient clientgoclientset.Interface clientConfig, err := CreateAPIServerClientConfig(s) @@ -352,16 +355,24 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { if err != nil { glog.Warningf("New kubeClient from clientConfig error: %v", err) } + // make a separate client for events eventClientConfig := *clientConfig eventClientConfig.QPS = float32(s.EventRecordQPS) eventClientConfig.Burst = int(s.EventBurst) - tmpClient, err := clientgoclientset.NewForConfig(&eventClientConfig) + eventClient, err = v1core.NewForConfig(&eventClientConfig) if err != nil { glog.Warningf("Failed to create API Server client for Events: %v", err) } - eventClient = tmpClient.CoreV1() + // make a separate client for heartbeat with throttling disabled and a timeout attached + heartbeatClientConfig := *clientConfig + heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration + heartbeatClientConfig.QPS = float32(-1) + heartbeatClient, err = v1core.NewForConfig(&heartbeatClientConfig) + if err != nil { + glog.Warningf("Failed to create API Server client for heartbeat: %v", err) + } } else { switch { case s.RequireKubeConfig: @@ -373,6 +384,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { kubeDeps.KubeClient = kubeClient kubeDeps.ExternalKubeClient = externalKubeClient + if heartbeatClient != nil { + kubeDeps.HeartbeatClient = heartbeatClient + } if eventClient != nil { kubeDeps.EventClient = eventClient } diff --git a/pkg/controller/cloud/node_controller.go b/pkg/controller/cloud/node_controller.go index 23170181ba..154c09e8b2 100644 --- a/pkg/controller/cloud/node_controller.go +++ b/pkg/controller/cloud/node_controller.go @@ -183,7 +183,7 @@ func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloud if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) { return } - _, err = nodeutil.PatchNodeStatus(cnc.kubeClient, types.NodeName(node.Name), node, newNode) + _, err = nodeutil.PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode) if err != nil { glog.Errorf("Error patching node with cloud ip addresses = [%v]", err) } diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index cd8c207bd4..5b2ad44101 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -230,6 +230,8 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index bc633e4b6f..2b6c6cd95d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -236,6 +236,7 @@ type Dependencies struct { ContainerManager cm.ContainerManager DockerClient libdocker.Interface EventClient v1core.EventsGetter + HeartbeatClient v1core.CoreV1Interface KubeClient clientset.Interface ExternalKubeClient clientgoclientset.Interface Mounter mount.Interface @@ -451,6 +452,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, hostname: hostname, nodeName: nodeName, kubeClient: kubeDeps.KubeClient, + heartbeatClient: kubeDeps.HeartbeatClient, rootDirectory: rootDirectory, resyncInterval: kubeCfg.SyncFrequency.Duration, sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), @@ -866,12 +868,13 @@ type serviceLister interface { type Kubelet struct { kubeletConfiguration kubeletconfiginternal.KubeletConfiguration - hostname string - nodeName types.NodeName - runtimeCache kubecontainer.RuntimeCache - kubeClient clientset.Interface - iptClient utilipt.Interface - rootDirectory string + hostname string + nodeName types.NodeName + runtimeCache kubecontainer.RuntimeCache + kubeClient clientset.Interface + heartbeatClient v1core.CoreV1Interface + iptClient utilipt.Interface + rootDirectory string // podWorkers handle syncing Pods in response to events. podWorkers PodWorkers diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 59b77a4893..b7be57b1c3 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -139,7 +139,7 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate if requiresUpdate { - if _, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), + if _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil { glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) return false @@ -367,7 +367,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) { // It synchronizes node status to master, registering the kubelet first if // necessary. func (kl *Kubelet) syncNodeStatus() { - if kl.kubeClient == nil { + if kl.kubeClient == nil || kl.heartbeatClient == nil { return } if kl.registerNode { @@ -404,7 +404,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { if tryNumber == 0 { util.FromApiserverCache(&opts) } - node, err := kl.kubeClient.Core().Nodes().Get(string(kl.nodeName), opts) + node, err := kl.heartbeatClient.Nodes().Get(string(kl.nodeName), opts) if err != nil { return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) } @@ -423,7 +423,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error { kl.setNodeStatus(node) // Patch the current status on the API server - updatedNode, err := nodeutil.PatchNodeStatus(kl.kubeClient, types.NodeName(kl.nodeName), originalNode, node) + updatedNode, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node) if err != nil { return err } diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index e12ddc56d0..9721f3bef4 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -23,6 +23,7 @@ import ( goruntime "runtime" "sort" "strconv" + "sync/atomic" "testing" "time" @@ -43,6 +44,8 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/fake" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" core "k8s.io/client-go/testing" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -133,6 +136,7 @@ func TestNodeStatusWithCloudProviderNodeIP(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.hostname = testKubeletHostname existingNode := v1.Node{ @@ -215,6 +219,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { t, inputImageList, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -339,6 +344,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -525,10 +531,64 @@ func TestUpdateExistingNodeStatus(t *testing.T) { assert.True(t, apiequality.Semantic.DeepEqual(expectedNode, updatedNode), "%s", diff.ObjectDiff(expectedNode, updatedNode)) } +func TestUpdateExistingNodeStatusTimeout(t *testing.T) { + attempts := int64(0) + + // set up a listener that hangs connections + ln, err := net.Listen("tcp", "127.0.0.1:0") + assert.NoError(t, err) + defer ln.Close() + go func() { + // accept connections and just let them hang + for { + _, err := ln.Accept() + if err != nil { + t.Log(err) + return + } + t.Log("accepted connection") + atomic.AddInt64(&attempts, 1) + } + }() + + config := &rest.Config{ + Host: "http://" + ln.Addr().String(), + QPS: -1, + Timeout: time.Second, + } + assert.NoError(t, err) + + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used + kubelet.heartbeatClient, err = v1core.NewForConfig(config) + kubelet.containerManager = &localCM{ + ContainerManager: cm.NewStubContainerManager(), + allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI), + }, + capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI), + }, + } + + // should return an error, but not hang + assert.Error(t, kubelet.updateNodeStatus()) + + // should have attempted multiple times + if actualAttempts := atomic.LoadInt64(&attempts); actualAttempts != nodeStatusUpdateRetry { + t.Errorf("Expected %d attempts, got %d", nodeStatusUpdateRetry, actualAttempts) + } +} + func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -737,6 +797,7 @@ func TestUpdateNodeStatusError(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used // No matching node for the kubelet testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{}}).ReactionChain assert.Error(t, kubelet.updateNodeStatus()) @@ -999,6 +1060,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { t, inputImageList, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet + kubelet.kubeClient = nil // ensure only the heartbeat client is used kubelet.containerManager = &localCM{ ContainerManager: cm.NewStubContainerManager(), allocatable: v1.ResourceList{ @@ -1059,6 +1121,7 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { func TestUpdateDefaultLabels(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used cases := []struct { name string diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 7834ddb428..771e938b66 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -159,6 +159,7 @@ func newTestKubeletWithImageList( kubelet := &Kubelet{} kubelet.recorder = fakeRecorder kubelet.kubeClient = fakeKubeClient + kubelet.heartbeatClient = fakeKubeClient.CoreV1() kubelet.os = &containertest.FakeOS{} kubelet.mounter = &mount.FakeMounter{} diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index db52f2f088..5fe4d7b13a 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -67,6 +67,7 @@ func NewHollowKubelet( volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...) d := &kubelet.Dependencies{ KubeClient: client, + HeartbeatClient: client.CoreV1(), DockerClient: dockerClient, CAdvisorInterface: cadvisorInterface, Cloud: nil, diff --git a/pkg/util/node/BUILD b/pkg/util/node/BUILD index 0c1f2348f7..4c7f548787 100644 --- a/pkg/util/node/BUILD +++ b/pkg/util/node/BUILD @@ -18,6 +18,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", ], ) diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go index 166fb0cde3..0138a56b59 100644 --- a/pkg/util/node/node.go +++ b/pkg/util/node/node.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/api" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" ) @@ -150,7 +151,7 @@ func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.N } // PatchNodeStatus patches node status. -func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { +func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, error) { oldData, err := json.Marshal(oldNode) if err != nil { return nil, fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) @@ -171,7 +172,7 @@ func PatchNodeStatus(c clientset.Interface, nodeName types.NodeName, oldNode *v1 return nil, fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) } - updatedNode, err := c.Core().Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") + updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") if err != nil { return nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) }