Merge pull request #66257 from mtaufen/node-lease

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Kubelet creates and manages node leases

This extends the Kubelet to create and periodically update leases in a
new kube-node-lease namespace. Based on [KEP-0009](https://github.com/kubernetes/community/blob/master/keps/sig-node/0009-node-heartbeat.md),
these leases can be used as a node health signal, and will allow us to
reduce the load caused by over-frequent node status reporting.

- add NodeLease feature gate
- add kube-node-lease system namespace for node leases
- add Kubelet option for lease duration
- add Kubelet-internal lease controller to create and update lease
- add e2e test for NodeLease feature

I would like to determine a standard policy for lease renewal frequency, based on the configured lease duration, so that we don't need to expose frequency as an additional knob. The renew interval is currently calculated as 1/3 of the lease duration.

```release-note
kubelet: Users can now enable the alpha NodeLease feature gate to have the Kubelet create and periodically renew a Lease in the kube-node-lease namespace. The lease duration defaults to 40s, and can be configured via the kubelet.config.k8s.io/v1beta1.KubeletConfiguration's NodeLeaseDurationSeconds field.
```

/cc @wojtek-t @liggitt
pull/8/head
Kubernetes Submit Queue 2018-08-26 22:32:16 -07:00 committed by GitHub
commit aec270256b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 883 additions and 21 deletions

View File

@ -307,6 +307,7 @@ func TestValidateKubeletConfiguration(t *testing.T) {
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: "promiscuous-bridge",
NodeLeaseDurationSeconds: 40,
},
},
},

View File

@ -127,6 +127,7 @@ ComponentConfigs:
MakeIPTablesUtilChains: true
MaxOpenFiles: 1000000
MaxPods: 110
NodeLeaseDurationSeconds: 40
NodeStatusUpdateFrequency: 10s
OOMScoreAdj: -999
PodCIDR: ""

View File

@ -138,6 +138,7 @@ kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -133,6 +133,7 @@ kubeAPIQPS: 5
makeIPTablesUtilChains: true
maxOpenFiles: 1000000
maxPods: 110
nodeLeaseDurationSeconds: 40
nodeStatusUpdateFrequency: 10s
oomScoreAdj: -999
podPidsLimit: -1

View File

@ -553,7 +553,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
// initialize clients if not standalone mode and any of the clients are not provided
var kubeClient clientset.Interface
var eventClient v1core.EventsGetter
var heartbeatClient v1core.CoreV1Interface
var heartbeatClient clientset.Interface
var externalKubeClient clientset.Interface
clientConfig, err := createAPIServerClientConfig(s)
@ -601,8 +601,15 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// if the NodeLease feature is enabled, the timeout is the minimum of the lease duration and status update frequency
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
}
heartbeatClientConfig.QPS = float32(-1)
heartbeatClient, err = v1core.NewForConfig(&heartbeatClientConfig)
heartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
glog.Warningf("Failed to create API Server client for heartbeat: %v", err)
}

View File

@ -35,6 +35,8 @@ const (
NamespaceSystem string = "kube-system"
// NamespacePublic is the namespace where we place public info (ConfigMaps)
NamespacePublic string = "kube-public"
// NamespaceNodeLease is the namespace where we place node lease objects (used for node heartbeats)
NamespaceNodeLease string = "kube-node-lease"
// TerminationMessagePathDefault means the default path to capture the application termination message running in a container
TerminationMessagePathDefault string = "/dev/termination-log"
)

View File

@ -350,6 +350,13 @@ const (
//
// Enables RuntimeClass, for selecting between multiple runtimes to run a pod.
RuntimeClass utilfeature.Feature = "RuntimeClass"
// owner: @mtaufen
// alpha: v1.12
//
// Kubelet uses the new Lease API to report node heartbeats,
// (Kube) Node Lifecycle Controller uses these heartbeats as a node health signal.
NodeLease utilfeature.Feature = "NodeLease"
)
func init() {
@ -409,6 +416,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
ResourceQuotaScopeSelectors: {Default: true, PreRelease: utilfeature.Beta},
CSIBlockVolume: {Default: false, PreRelease: utilfeature.Alpha},
RuntimeClass: {Default: false, PreRelease: utilfeature.Alpha},
NodeLease: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -67,6 +67,7 @@ go_library(
"//pkg/kubelet/metrics/collectors:go_default_library",
"//pkg/kubelet/mountpod:go_default_library",
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/nodelease:go_default_library",
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pod:go_default_library",
@ -227,8 +228,8 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
@ -277,6 +278,7 @@ filegroup(
"//pkg/kubelet/metrics:all-srcs",
"//pkg/kubelet/mountpod:all-srcs",
"//pkg/kubelet/network:all-srcs",
"//pkg/kubelet/nodelease:all-srcs",
"//pkg/kubelet/nodestatus:all-srcs",
"//pkg/kubelet/pleg:all-srcs",
"//pkg/kubelet/pod:all-srcs",

View File

@ -61,6 +61,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.MaxPods = 110
obj.PodPidsLimit = -1
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
obj.NodeLeaseDurationSeconds = 40
obj.CPUManagerPolicy = "none"
obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency
obj.QOSReserved = map[string]string{
@ -95,6 +96,7 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
obj.StaticPodURLHeader = make(map[string][]string)
obj.ContainerLogMaxFiles = 5
obj.ContainerLogMaxSize = "10Mi"
obj.ConfigMapAndSecretChangeDetectionStrategy = "Watch"
},
}
}

View File

@ -196,6 +196,7 @@ var (
"MaxOpenFiles",
"MaxPods",
"NodeStatusUpdateFrequency.Duration",
"NodeLeaseDurationSeconds",
"OOMScoreAdj",
"PodCIDR",
"PodPidsLimit",

View File

@ -155,6 +155,8 @@ type KubeletConfiguration struct {
// status to master. Note: be cautious when changing the constant, it
// must work with nodeMonitorGracePeriod in nodecontroller.
NodeStatusUpdateFrequency metav1.Duration
// nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease.
NodeLeaseDurationSeconds int32
// imageMinimumGCAge is the minimum age for an unused image before it is
// garbage collected.
ImageMinimumGCAge metav1.Duration

View File

@ -106,6 +106,9 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.NodeStatusUpdateFrequency == zeroDuration {
obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second}
}
if obj.NodeLeaseDurationSeconds == 0 {
obj.NodeLeaseDurationSeconds = 40
}
if obj.ImageMinimumGCAge == zeroDuration {
obj.ImageMinimumGCAge = metav1.Duration{Duration: 2 * time.Minute}
}

View File

@ -303,6 +303,19 @@ type KubeletConfiguration struct {
// Default: "10s"
// +optional
NodeStatusUpdateFrequency metav1.Duration `json:"nodeStatusUpdateFrequency,omitempty"`
// nodeLeaseDurationSeconds is the duration the Kubelet will set on its corresponding Lease,
// when the NodeLease feature is enabled. This feature provides an indicator of node
// health by having the Kublet create and periodically renew a lease, named after the node,
// in the kube-node-lease namespace. If the lease expires, the node can be considered unhealthy.
// The lease is currently renewed every 10s, per KEP-0009. In the future, the lease renewal interval
// may be set based on the lease duration.
// Requires the NodeLease feature gate to be enabled.
// Dynamic Kubelet Config (beta): If dynamically updating this field, consider that
// decreasing the duration may reduce tolerance for issues that temporarily prevent
// the Kubelet from renewing the lease (e.g. a short-lived network issue).
// Default: 40
// +optional
NodeLeaseDurationSeconds int32 `json:"nodeLeaseDurationSeconds,omitempty"`
// imageMinimumGCAge is the minimum age for an unused image before it is
// garbage collected.
// Dynamic Kubelet Config (beta): If dynamically updating this field, consider that

View File

@ -250,6 +250,7 @@ func autoConvert_v1beta1_KubeletConfiguration_To_kubeletconfig_KubeletConfigurat
out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS))
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if err := v1.Convert_Pointer_int32_To_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil {
return err
@ -375,6 +376,7 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1beta1_KubeletConfigurat
out.ClusterDNS = *(*[]string)(unsafe.Pointer(&in.ClusterDNS))
out.StreamingConnectionIdleTimeout = in.StreamingConnectionIdleTimeout
out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency
out.NodeLeaseDurationSeconds = in.NodeLeaseDurationSeconds
out.ImageMinimumGCAge = in.ImageMinimumGCAge
if err := v1.Convert_int32_To_Pointer_int32(&in.ImageGCHighThresholdPercent, &out.ImageGCHighThresholdPercent, s); err != nil {
return err

View File

@ -36,6 +36,9 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error
localFeatureGate := utilfeature.DefaultFeatureGate.DeepCopy()
localFeatureGate.SetFromMap(kc.FeatureGates)
if kc.NodeLeaseDurationSeconds <= 0 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: NodeLeaseDurationSeconds must be greater than 0"))
}
if !kc.CgroupsPerQOS && len(kc.EnforceNodeAllocatable) > 0 {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: EnforceNodeAllocatable (--enforce-node-allocatable) is not supported unless CgroupsPerQOS (--cgroups-per-qos) feature is turned on"))
}

View File

@ -47,9 +47,10 @@ func TestValidateKubeletConfiguration(t *testing.T) {
RegistryBurst: 10,
RegistryPullQPS: 5,
HairpinMode: kubeletconfig.PromiscuousBridge,
NodeLeaseDurationSeconds: 1,
}
if allErrors := ValidateKubeletConfiguration(successCase); allErrors != nil {
t.Errorf("expect no errors got %v", allErrors)
t.Errorf("expect no errors, got %v", allErrors)
}
errorCase := &kubeletconfig.KubeletConfiguration{
@ -75,8 +76,10 @@ func TestValidateKubeletConfiguration(t *testing.T) {
RegistryBurst: -10,
RegistryPullQPS: -10,
HairpinMode: "foo",
NodeLeaseDurationSeconds: -1,
}
if allErrors := ValidateKubeletConfiguration(errorCase); len(allErrors.(utilerrors.Aggregate).Errors()) != 22 {
t.Errorf("expect 22 errors got %v", len(allErrors.(utilerrors.Aggregate).Errors()))
const numErrs = 23
if allErrors := ValidateKubeletConfiguration(errorCase); len(allErrors.(utilerrors.Aggregate).Errors()) != numErrs {
t.Errorf("expect %d errors, got %v", numErrs, len(allErrors.(utilerrors.Aggregate).Errors()))
}
}

View File

@ -79,6 +79,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
"k8s.io/kubernetes/pkg/kubelet/nodelease"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/preemption"
@ -239,7 +240,7 @@ type Dependencies struct {
ContainerManager cm.ContainerManager
DockerClientConfig *dockershim.ClientConfig
EventClient v1core.EventsGetter
HeartbeatClient v1core.CoreV1Interface
HeartbeatClient clientset.Interface
OnHeartbeatFailure func()
KubeClient clientset.Interface
ExternalKubeClient clientset.Interface
@ -843,6 +844,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, klet.onRepeatedHeartbeatFailure)
}
// Finally, put the most recent version of the config on the Kubelet, so
// people can see how it was configured.
klet.kubeletConfiguration = *kubeCfg
@ -866,7 +872,7 @@ type Kubelet struct {
nodeName types.NodeName
runtimeCache kubecontainer.RuntimeCache
kubeClient clientset.Interface
heartbeatClient v1core.CoreV1Interface
heartbeatClient clientset.Interface
iptClient utilipt.Interface
rootDirectory string
@ -1030,6 +1036,9 @@ type Kubelet struct {
// This lock is used by Kublet.updateRuntimeUp function and shouldn't be used anywhere else.
updateRuntimeMux sync.Mutex
// nodeLeaseController claims and renews the node lease for this Kubelet
nodeLeaseController nodelease.Controller
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator
@ -1367,6 +1376,11 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
go kl.nodeLeaseController.Run(wait.NeverStop)
}
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

View File

@ -394,7 +394,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
if tryNumber == 0 {
util.FromApiserverCache(&opts)
}
node, err := kl.heartbeatClient.Nodes().Get(string(kl.nodeName), opts)
node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
@ -412,7 +412,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
kl.setNodeStatus(node)
// Patch the current status on the API server
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient, types.NodeName(kl.nodeName), originalNode, node)
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
}

View File

@ -43,8 +43,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/features"
@ -570,7 +570,7 @@ func TestUpdateExistingNodeStatusTimeout(t *testing.T) {
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
kubelet.kubeClient = nil // ensure only the heartbeat client is used
kubelet.heartbeatClient, err = v1core.NewForConfig(config)
kubelet.heartbeatClient, err = clientset.NewForConfig(config)
kubelet.onRepeatedHeartbeatFailure = func() {
atomic.AddInt64(&failureCallbacks, 1)
}

View File

@ -172,7 +172,7 @@ func newTestKubeletWithImageList(
kubelet := &Kubelet{}
kubelet.recorder = fakeRecorder
kubelet.kubeClient = fakeKubeClient
kubelet.heartbeatClient = fakeKubeClient.CoreV1()
kubelet.heartbeatClient = fakeKubeClient
kubelet.os = &containertest.FakeOS{}
kubelet.mounter = &mount.FakeMounter{}

View File

@ -0,0 +1,48 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["controller.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/nodelease",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/coordination/v1beta1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/coordination/v1beta1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["controller_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/coordination/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,178 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodelease
import (
"time"
coordv1beta1 "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
"k8s.io/utils/pointer"
"github.com/golang/glog"
)
const (
// renewInterval is the interval at which the lease is renewed
// TODO(mtaufen): 10s was the decision in the KEP, to keep the behavior as close to the
// current default behavior as possible. In the future, we should determine a reasonable
// fraction of the lease duration at which to renew, and use that instead.
renewInterval = 10 * time.Second
// maxUpdateRetries is the number of immediate, successive retries the Kubelet will attempt
// when renewing the lease before it waits for the renewal interval before trying again,
// similar to what we do for node status retries
maxUpdateRetries = 5
// maxBackoff is the maximum sleep time during backoff (e.g. in backoffEnsureLease)
maxBackoff = 7 * time.Second
)
// Controller manages creating and renewing the lease for this Kubelet
type Controller interface {
Run(stopCh <-chan struct{})
}
type controller struct {
client coordclientset.LeaseInterface
holderIdentity string
leaseDurationSeconds int32
renewInterval time.Duration
clock clock.Clock
onRepeatedHeartbeatFailure func()
}
// NewController constructs and returns a controller
func NewController(clock clock.Clock, client clientset.Interface, holderIdentity string, leaseDurationSeconds int32, onRepeatedHeartbeatFailure func()) Controller {
var leaseClient coordclientset.LeaseInterface
if client != nil {
leaseClient = client.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
}
return &controller{
client: leaseClient,
holderIdentity: holderIdentity,
leaseDurationSeconds: leaseDurationSeconds,
renewInterval: renewInterval,
clock: clock,
onRepeatedHeartbeatFailure: onRepeatedHeartbeatFailure,
}
}
// Run runs the controller
func (c *controller) Run(stopCh <-chan struct{}) {
if c.client == nil {
glog.Infof("node lease controller has nil client, will not claim or renew leases")
return
}
wait.Until(c.sync, c.renewInterval, stopCh)
}
func (c *controller) sync() {
lease, created := c.backoffEnsureLease()
// we don't need to update the lease if we just created it
if !created {
c.retryUpdateLease(lease)
}
}
// backoffEnsureLease attempts to create the lease if it does not exist,
// and uses exponentially increasing waits to prevent overloading the API server
// with retries. Returns the lease, and true if this call created the lease,
// false otherwise.
func (c *controller) backoffEnsureLease() (*coordv1beta1.Lease, bool) {
var (
lease *coordv1beta1.Lease
created bool
err error
)
sleep := 100 * time.Millisecond
for {
lease, created, err = c.ensureLease()
if err == nil {
break
}
sleep = minDuration(2*sleep, maxBackoff)
glog.Errorf("failed to ensure node lease exists, will retry in %v, error: %v", sleep, err)
// backoff wait
c.clock.Sleep(sleep)
}
return lease, created
}
// ensureLease creates the lease if it does not exist. Returns the lease and
// a bool (true if this call created the lease), or any error that occurs.
func (c *controller) ensureLease() (*coordv1beta1.Lease, bool, error) {
lease, err := c.client.Get(c.holderIdentity, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
// lease does not exist, create it
lease, err := c.client.Create(c.newLease(nil))
if err != nil {
return nil, false, err
}
return lease, true, nil
} else if err != nil {
// unexpected error getting lease
return nil, false, err
}
// lease already existed
return lease, false, nil
}
// retryUpdateLease attempts to update the lease for maxUpdateRetries,
// call this once you're sure the lease has been created
func (c *controller) retryUpdateLease(base *coordv1beta1.Lease) {
for i := 0; i < maxUpdateRetries; i++ {
_, err := c.client.Update(c.newLease(base))
if err == nil {
return
}
glog.Errorf("failed to update node lease, error: %v", err)
if i > 0 && c.onRepeatedHeartbeatFailure != nil {
c.onRepeatedHeartbeatFailure()
}
}
glog.Errorf("failed %d attempts to update node lease, will retry after %v", maxUpdateRetries, c.renewInterval)
}
// newLease constructs a new lease if base is nil, or returns a copy of base
// with desired state asserted on the copy.
func (c *controller) newLease(base *coordv1beta1.Lease) *coordv1beta1.Lease {
var lease *coordv1beta1.Lease
if base == nil {
lease = &coordv1beta1.Lease{}
} else {
lease = base.DeepCopy()
}
// Use the bare minimum set of fields; other fields exist for debugging/legacy,
// but we don't need to make node heartbeats more complicated by using them.
lease.Name = c.holderIdentity
lease.Spec.HolderIdentity = pointer.StringPtr(c.holderIdentity)
lease.Spec.LeaseDurationSeconds = pointer.Int32Ptr(c.leaseDurationSeconds)
lease.Spec.RenewTime = &metav1.MicroTime{Time: c.clock.Now()}
return lease
}
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

View File

@ -0,0 +1,99 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodelease
import (
"testing"
"time"
coordv1beta1 "k8s.io/api/coordination/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/utils/pointer"
)
func TestNewLease(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
cases := []struct {
desc string
controller *controller
base *coordv1beta1.Lease
expect *coordv1beta1.Lease
}{
{
desc: "nil base",
controller: &controller{
holderIdentity: "foo",
leaseDurationSeconds: 10,
clock: fakeClock,
},
base: nil,
expect: &coordv1beta1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Spec: coordv1beta1.LeaseSpec{
HolderIdentity: pointer.StringPtr("foo"),
LeaseDurationSeconds: pointer.Int32Ptr(10),
RenewTime: &metav1.MicroTime{Time: fakeClock.Now()},
},
},
},
{
desc: "non-nil base renew time is updated",
controller: &controller{
holderIdentity: "foo",
leaseDurationSeconds: 10,
clock: fakeClock,
},
base: &coordv1beta1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Spec: coordv1beta1.LeaseSpec{
HolderIdentity: pointer.StringPtr("foo"),
LeaseDurationSeconds: pointer.Int32Ptr(10),
RenewTime: &metav1.MicroTime{Time: fakeClock.Now().Add(-10 * time.Second)},
},
},
expect: &coordv1beta1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Spec: coordv1beta1.LeaseSpec{
HolderIdentity: pointer.StringPtr("foo"),
LeaseDurationSeconds: pointer.Int32Ptr(10),
RenewTime: &metav1.MicroTime{Time: fakeClock.Now()},
},
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
newLease := tc.controller.newLease(tc.base)
if newLease == tc.base {
t.Fatalf("the new lease must be newly allocated, but got same address as base")
}
if !apiequality.Semantic.DeepEqual(tc.expect, newLease) {
t.Errorf("unexpected result from newLease: %s", diff.ObjectDiff(tc.expect, newLease))
}
})
}
}

View File

@ -66,7 +66,7 @@ func NewHollowKubelet(
volumePlugins = append(volumePlugins, secret.ProbeVolumePlugins()...)
d := &kubelet.Dependencies{
KubeClient: client,
HeartbeatClient: client.CoreV1(),
HeartbeatClient: client,
DockerClientConfig: dockerClientConfig,
CAdvisorInterface: cadvisorInterface,
Cloud: nil,

View File

@ -41,6 +41,7 @@ go_library(
"//pkg/apis/storage/install:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubeapiserver/options:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/master/reconcilers:go_default_library",
@ -113,6 +114,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@ -29,8 +30,10 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
@ -84,6 +87,11 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
glog.Fatalf("failed to get listener address: %v", err)
}
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic}
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
systemNamespaces = append(systemNamespaces, corev1.NamespaceNodeLease)
}
return &Controller{
ServiceClient: serviceClient,
NamespaceClient: nsClient,
@ -92,7 +100,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
SystemNamespaces: []string{metav1.NamespaceSystem, metav1.NamespacePublic},
SystemNamespaces: systemNamespaces,
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,

View File

@ -13,6 +13,7 @@ go_library(
deps = [
"//pkg/api/pod:go_default_library",
"//pkg/apis/authentication:go_default_library",
"//pkg/apis/coordination:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/auth/nodeidentifier:go_default_library",
@ -34,6 +35,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/apis/authentication:go_default_library",
"//pkg/apis/coordination:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/auth/nodeidentifier:go_default_library",
@ -46,6 +48,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -30,6 +30,7 @@ import (
corev1lister "k8s.io/client-go/listers/core/v1"
podutil "k8s.io/kubernetes/pkg/api/pod"
authenticationapi "k8s.io/kubernetes/pkg/apis/authentication"
coordapi "k8s.io/kubernetes/pkg/apis/coordination"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/auth/nodeidentifier"
@ -90,6 +91,7 @@ var (
nodeResource = api.Resource("nodes")
pvcResource = api.Resource("persistentvolumeclaims")
svcacctResource = api.Resource("serviceaccounts")
leaseResource = coordapi.Resource("leases")
)
func (c *nodePlugin) Admit(a admission.Attributes) error {
@ -135,6 +137,12 @@ func (c *nodePlugin) Admit(a admission.Attributes) error {
}
return nil
case leaseResource:
if c.features.Enabled(features.NodeLease) {
return c.admitLease(nodeName, a)
}
return admission.NewForbidden(a, fmt.Errorf("disabled by feature gate %s", features.NodeLease))
default:
return nil
}
@ -389,3 +397,28 @@ func (c *nodePlugin) admitServiceAccount(nodeName string, a admission.Attributes
return nil
}
func (r *nodePlugin) admitLease(nodeName string, a admission.Attributes) error {
// the request must be against the system namespace reserved for node leases
if a.GetNamespace() != api.NamespaceNodeLease {
return admission.NewForbidden(a, fmt.Errorf("can only access leases in the %q system namespace", api.NamespaceNodeLease))
}
// the request must come from a node with the same name as the lease
if a.GetOperation() == admission.Create {
// a.GetName() won't return the name on create, so we drill down to the proposed object
lease, ok := a.GetObject().(*coordapi.Lease)
if !ok {
return admission.NewForbidden(a, fmt.Errorf("unexpected type %T", a.GetObject()))
}
if lease.Name != nodeName {
return admission.NewForbidden(a, fmt.Errorf("can only access node lease with the same name as the requesting node"))
}
} else {
if a.GetName() != nodeName {
return admission.NewForbidden(a, fmt.Errorf("can only access node lease with the same name as the requesting node"))
}
}
return nil
}

View File

@ -19,6 +19,7 @@ package noderestriction
import (
"strings"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -29,15 +30,19 @@ import (
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
authenticationapi "k8s.io/kubernetes/pkg/apis/authentication"
"k8s.io/kubernetes/pkg/apis/coordination"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/auth/nodeidentifier"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
)
var (
trEnabledFeature = utilfeature.NewFeatureGate()
trDisabledFeature = utilfeature.NewFeatureGate()
trEnabledFeature = utilfeature.NewFeatureGate()
trDisabledFeature = utilfeature.NewFeatureGate()
leaseEnabledFeature = utilfeature.NewFeatureGate()
leaseDisabledFeature = utilfeature.NewFeatureGate()
)
func init() {
@ -47,6 +52,12 @@ func init() {
if err := trDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.TokenRequest: {Default: false}}); err != nil {
panic(err)
}
if err := leaseEnabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: true}}); err != nil {
panic(err)
}
if err := leaseDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: false}}); err != nil {
panic(err)
}
}
func makeTestPod(namespace, name, node string, mirror bool) (*api.Pod, *corev1.Pod) {
@ -146,6 +157,42 @@ func Test_nodePlugin_Admit(t *testing.T) {
svcacctResource = api.Resource("serviceaccounts").WithVersion("v1")
tokenrequestKind = api.Kind("TokenRequest").WithVersion("v1")
leaseResource = coordination.Resource("leases").WithVersion("v1beta1")
leaseKind = coordination.Kind("Lease").WithVersion("v1beta1")
lease = &coordination.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "mynode",
Namespace: api.NamespaceNodeLease,
},
Spec: coordination.LeaseSpec{
HolderIdentity: pointer.StringPtr("mynode"),
LeaseDurationSeconds: pointer.Int32Ptr(40),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
leaseWrongNS = &coordination.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "mynode",
Namespace: "foo",
},
Spec: coordination.LeaseSpec{
HolderIdentity: pointer.StringPtr("mynode"),
LeaseDurationSeconds: pointer.Int32Ptr(40),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
leaseWrongName = &coordination.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceNodeLease,
},
Spec: coordination.LeaseSpec{
HolderIdentity: pointer.StringPtr("mynode"),
LeaseDurationSeconds: pointer.Int32Ptr(40),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
noExistingPodsIndex = cache.NewIndexer(cache.MetaNamespaceKeyFunc, nil)
noExistingPods = corev1lister.NewPodLister(noExistingPodsIndex)
@ -847,6 +894,67 @@ func Test_nodePlugin_Admit(t *testing.T) {
attributes: admission.NewAttributesRecord(nil, nil, podKind, coreunboundpod.Namespace, coreunboundpod.Name, podResource, "status", admission.Delete, false, bob),
err: "",
},
// Node leases
{
name: "disallowed create lease - feature disabled",
attributes: admission.NewAttributesRecord(lease, nil, leaseKind, lease.Namespace, lease.Name, leaseResource, "", admission.Create, false, mynode),
features: leaseDisabledFeature,
err: "forbidden: disabled by feature gate NodeLease",
},
{
name: "disallowed create lease in namespace other than kube-node-lease - feature enabled",
attributes: admission.NewAttributesRecord(leaseWrongNS, nil, leaseKind, leaseWrongNS.Namespace, leaseWrongNS.Name, leaseResource, "", admission.Create, false, mynode),
features: leaseEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed update lease in namespace other than kube-node-lease - feature enabled",
attributes: admission.NewAttributesRecord(leaseWrongNS, leaseWrongNS, leaseKind, leaseWrongNS.Namespace, leaseWrongNS.Name, leaseResource, "", admission.Update, false, mynode),
features: leaseEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed delete lease in namespace other than kube-node-lease - feature enabled",
attributes: admission.NewAttributesRecord(nil, nil, leaseKind, leaseWrongNS.Namespace, leaseWrongNS.Name, leaseResource, "", admission.Delete, false, mynode),
features: leaseEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed create another node's lease - feature enabled",
attributes: admission.NewAttributesRecord(leaseWrongName, nil, leaseKind, leaseWrongName.Namespace, leaseWrongName.Name, leaseResource, "", admission.Create, false, mynode),
features: leaseEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed update another node's lease - feature enabled",
attributes: admission.NewAttributesRecord(leaseWrongName, leaseWrongName, leaseKind, leaseWrongName.Namespace, leaseWrongName.Name, leaseResource, "", admission.Update, false, mynode),
features: leaseEnabledFeature,
err: "forbidden: ",
},
{
name: "disallowed delete another node's lease - feature enabled",
attributes: admission.NewAttributesRecord(nil, nil, leaseKind, leaseWrongName.Namespace, leaseWrongName.Name, leaseResource, "", admission.Delete, false, mynode),
features: leaseEnabledFeature,
err: "forbidden: ",
},
{
name: "allowed create node lease - feature enabled",
attributes: admission.NewAttributesRecord(lease, nil, leaseKind, lease.Namespace, lease.Name, leaseResource, "", admission.Create, false, mynode),
features: leaseEnabledFeature,
err: "",
},
{
name: "allowed update node lease - feature enabled",
attributes: admission.NewAttributesRecord(lease, lease, leaseKind, lease.Namespace, lease.Name, leaseResource, "", admission.Update, false, mynode),
features: leaseEnabledFeature,
err: "",
},
{
name: "allowed delete node lease - feature enabled",
attributes: admission.NewAttributesRecord(nil, nil, leaseKind, lease.Namespace, lease.Name, leaseResource, "", admission.Delete, false, mynode),
features: leaseEnabledFeature,
err: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -41,6 +41,7 @@ go_library(
deps = [
"//pkg/api/v1/persistentvolume:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/coordination:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/storage:go_default_library",
"//pkg/auth/nodeidentifier:go_default_library",

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/authorization/authorizer"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coordapi "k8s.io/kubernetes/pkg/apis/coordination"
api "k8s.io/kubernetes/pkg/apis/core"
storageapi "k8s.io/kubernetes/pkg/apis/storage"
"k8s.io/kubernetes/pkg/auth/nodeidentifier"
@ -72,6 +73,7 @@ var (
pvResource = api.Resource("persistentvolumes")
vaResource = storageapi.Resource("volumeattachments")
svcAcctResource = api.Resource("serviceaccounts")
leaseResource = coordapi.Resource("leases")
)
func (r *NodeAuthorizer) Authorize(attrs authorizer.Attributes) (authorizer.Decision, string, error) {
@ -113,6 +115,11 @@ func (r *NodeAuthorizer) Authorize(attrs authorizer.Attributes) (authorizer.Deci
return r.authorizeCreateToken(nodeName, serviceAccountVertexType, attrs)
}
return authorizer.DecisionNoOpinion, fmt.Sprintf("disabled by feature gate %s", features.TokenRequest), nil
case leaseResource:
if r.features.Enabled(features.NodeLease) {
return r.authorizeLease(nodeName, attrs)
}
return authorizer.DecisionNoOpinion, fmt.Sprintf("disabled by feature gate %s", features.NodeLease), nil
}
}
@ -215,6 +222,36 @@ func (r *NodeAuthorizer) authorizeCreateToken(nodeName string, startingType vert
return authorizer.DecisionAllow, "", nil
}
// authorizeLease authorizes node requests to coordination.k8s.io/leases.
func (r *NodeAuthorizer) authorizeLease(nodeName string, attrs authorizer.Attributes) (authorizer.Decision, string, error) {
// allowed verbs: get, create, update, patch, delete
verb := attrs.GetVerb()
if verb != "get" &&
verb != "create" &&
verb != "update" &&
verb != "patch" &&
verb != "delete" {
glog.V(2).Infof("NODE DENY: %s %#v", nodeName, attrs)
return authorizer.DecisionNoOpinion, "can only get, create, update, patch, or delete a node lease", nil
}
// the request must be against the system namespace reserved for node leases
if attrs.GetNamespace() != api.NamespaceNodeLease {
glog.V(2).Infof("NODE DENY: %s %#v", nodeName, attrs)
return authorizer.DecisionNoOpinion, fmt.Sprintf("can only access leases in the %q system namespace", api.NamespaceNodeLease), nil
}
// the request must come from a node with the same name as the lease
// note we skip this check for create, since the authorizer doesn't know the name on create
// the noderestriction admission plugin is capable of performing this check at create time
if verb != "create" && attrs.GetName() != nodeName {
glog.V(2).Infof("NODE DENY: %s %#v", nodeName, attrs)
return authorizer.DecisionNoOpinion, "can only access node lease with the same name as the requesting node", nil
}
return authorizer.DecisionAllow, "", nil
}
// hasPathFrom returns true if there is a directed path from the specified type/namespace/name to the specified Node
func (r *NodeAuthorizer) hasPathFrom(nodeName string, startingType vertexType, startingNamespace, startingName string) (bool, error) {
r.graph.lock.RLock()

View File

@ -39,10 +39,12 @@ import (
)
var (
csiEnabledFeature = utilfeature.NewFeatureGate()
csiDisabledFeature = utilfeature.NewFeatureGate()
trEnabledFeature = utilfeature.NewFeatureGate()
trDisabledFeature = utilfeature.NewFeatureGate()
csiEnabledFeature = utilfeature.NewFeatureGate()
csiDisabledFeature = utilfeature.NewFeatureGate()
trEnabledFeature = utilfeature.NewFeatureGate()
trDisabledFeature = utilfeature.NewFeatureGate()
leaseEnabledFeature = utilfeature.NewFeatureGate()
leaseDisabledFeature = utilfeature.NewFeatureGate()
)
func init() {
@ -58,6 +60,12 @@ func init() {
if err := trDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.TokenRequest: {Default: false}}); err != nil {
panic(err)
}
if err := leaseEnabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: true}}); err != nil {
panic(err)
}
if err := leaseDisabledFeature.Add(map[utilfeature.Feature]utilfeature.FeatureSpec{features.NodeLease: {Default: false}}); err != nil {
panic(err)
}
}
func TestAuthorizer(t *testing.T) {
@ -228,6 +236,108 @@ func TestAuthorizer(t *testing.T) {
features: trEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed node lease - feature disabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: corev1.NamespaceNodeLease},
features: leaseDisabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed get lease in namespace other than kube-node-lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: "foo"},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed create lease in namespace other than kube-node-lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "create", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: "foo"},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed update lease in namespace other than kube-node-lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "update", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: "foo"},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed patch lease in namespace other than kube-node-lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "patch", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: "foo"},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed delete lease in namespace other than kube-node-lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "delete", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: "foo"},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed get another node's lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node1", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed update another node's lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "update", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node1", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed patch another node's lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "patch", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node1", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed delete another node's lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "delete", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node1", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed list node leases - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "list", Resource: "leases", APIGroup: "coordination.k8s.io", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "disallowed watch node leases - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "watch", Resource: "leases", APIGroup: "coordination.k8s.io", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionNoOpinion,
},
{
name: "allowed get node lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "get", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed create node lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "create", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed update node lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "update", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed patch node lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "patch", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionAllow,
},
{
name: "allowed delete node lease - feature enabled",
attrs: authorizer.AttributesRecord{User: node0, ResourceRequest: true, Verb: "delete", Resource: "leases", APIGroup: "coordination.k8s.io", Name: "node0", Namespace: corev1.NamespaceNodeLease},
features: leaseEnabledFeature,
expect: authorizer.DecisionAllow,
},
}
for _, tc := range tests {

View File

@ -160,6 +160,11 @@ func NodeRules() []rbacv1.PolicyRule {
volAttachRule := rbacv1helpers.NewRule("get").Groups(storageGroup).Resources("volumeattachments").RuleOrDie()
nodePolicyRules = append(nodePolicyRules, volAttachRule)
}
// Node leases
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
nodePolicyRules = append(nodePolicyRules, rbacv1helpers.NewRule("get", "create", "update", "patch", "delete").Groups("coordination.k8s.io").Resources("leases").RuleOrDie())
}
return nodePolicyRules
}

View File

@ -28,6 +28,8 @@ const (
NamespaceDefault string = "default"
// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
NamespaceAll string = ""
// NamespaceNodeLease is the namespace where we place node lease objects (used for node heartbeats)
NamespaceNodeLease string = "kube-node-lease"
)
// Volume represents a named volume in a pod that may be accessed by any container in the pod.

View File

@ -24,6 +24,7 @@ go_library(
"kubelet_etc_hosts.go",
"lifecycle_hook.go",
"networking.go",
"node_lease.go",
"pods.go",
"privileged.go",
"projected.go",
@ -45,6 +46,7 @@ go_library(
"//pkg/security/apparmor:go_default_library",
"//pkg/util/version:go_default_library",
"//staging/src/k8s.io/api/autoscaling/v1:go_default_library",
"//staging/src/k8s.io/api/coordination/v1beta1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -0,0 +1,92 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"fmt"
"time"
coordv1beta1 "k8s.io/api/coordination/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = framework.KubeDescribe("[Feature:NodeLease][NodeAlphaFeature:NodeLease]", func() {
f := framework.NewDefaultFramework("node-lease-test")
Context("when the NodeLease feature is enabled", func() {
It("the Kubelet should create and update a lease in the kube-node-lease namespace", func() {
leaseClient := f.ClientSet.CoordinationV1beta1().Leases(corev1.NamespaceNodeLease)
var (
err error
lease *coordv1beta1.Lease
)
// check that lease for this Kubelet exists in the kube-node-lease namespace
Eventually(func() error {
lease, err = leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{})
if err != nil {
return err
}
return nil
}, 5*time.Minute, 5*time.Second).Should(BeNil())
// check basic expectations for the lease
Expect(expectLease(lease)).To(BeNil())
// ensure that at least one lease renewal happens within the
// lease duration by checking for a change to renew time
Eventually(func() error {
newLease, err := leaseClient.Get(framework.TestContext.NodeName, metav1.GetOptions{})
if err != nil {
return err
}
// check basic expectations for the latest lease
if err := expectLease(newLease); err != nil {
return err
}
// check that RenewTime has been updated on the latest lease
newTime := (*newLease.Spec.RenewTime).Time
oldTime := (*lease.Spec.RenewTime).Time
if !newTime.After(oldTime) {
return fmt.Errorf("new lease has time %v, which is not after old lease time %v", newTime, oldTime)
}
return nil
}, time.Duration(*lease.Spec.LeaseDurationSeconds)*time.Second,
time.Duration(*lease.Spec.LeaseDurationSeconds/3)*time.Second)
})
})
})
func expectLease(lease *coordv1beta1.Lease) error {
// expect values for HolderIdentity, LeaseDurationSeconds, and RenewTime
if lease.Spec.HolderIdentity == nil {
return fmt.Errorf("Spec.HolderIdentity should not be nil")
}
if lease.Spec.LeaseDurationSeconds == nil {
return fmt.Errorf("Spec.LeaseDurationSeconds should not be nil")
}
if lease.Spec.RenewTime == nil {
return fmt.Errorf("Spec.RenewTime should not be nil")
}
// ensure that the HolderIdentity matches the node name
if *lease.Spec.HolderIdentity != framework.TestContext.NodeName {
return fmt.Errorf("Spec.HolderIdentity (%v) should match the node name (%v)", *lease.Spec.HolderIdentity, framework.TestContext.NodeName)
}
return nil
}

View File

@ -23,6 +23,7 @@ go_test(
"//pkg/api/testapi:go_default_library",
"//pkg/apis/authorization:go_default_library",
"//pkg/apis/autoscaling:go_default_library",
"//pkg/apis/coordination:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/policy:go_default_library",
@ -84,6 +85,7 @@ go_test(
"//test/integration/framework:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/gopkg.in/square/go-jose.v2/jwt:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

View File

@ -38,6 +38,7 @@ import (
externalclientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/coordination"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/policy"
"k8s.io/kubernetes/pkg/auth/nodeidentifier"
@ -46,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer"
"k8s.io/kubernetes/plugin/pkg/admission/noderestriction"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)
func TestNodeAuthorizer(t *testing.T) {
@ -82,6 +84,9 @@ func TestNodeAuthorizer(t *testing.T) {
// Enable DynamicKubeletConfig feature so that Node.Spec.ConfigSource can be set
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicKubeletConfig, true)()
// Enable NodeLease feature so that nodes can create leases
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeLease, true)()
// Set up Node+RBAC authorizer
authorizerConfig := &authorizer.AuthorizationConfig{
AuthorizationModes: []string{"Node", "RBAC"},
@ -365,6 +370,54 @@ func TestNodeAuthorizer(t *testing.T) {
}
}
getNode1Lease := func(client clientset.Interface) func() error {
return func() error {
_, err := client.Coordination().Leases(api.NamespaceNodeLease).Get("node1", metav1.GetOptions{})
return err
}
}
node1LeaseDurationSeconds := int32(40)
createNode1Lease := func(client clientset.Interface) func() error {
return func() error {
lease := &coordination.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Spec: coordination.LeaseSpec{
HolderIdentity: pointer.StringPtr("node1"),
LeaseDurationSeconds: pointer.Int32Ptr(node1LeaseDurationSeconds),
RenewTime: &metav1.MicroTime{Time: time.Now()},
},
}
_, err := client.Coordination().Leases(api.NamespaceNodeLease).Create(lease)
return err
}
}
updateNode1Lease := func(client clientset.Interface) func() error {
return func() error {
lease, err := client.Coordination().Leases(api.NamespaceNodeLease).Get("node1", metav1.GetOptions{})
if err != nil {
return err
}
lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
_, err = client.Coordination().Leases(api.NamespaceNodeLease).Update(lease)
return err
}
}
patchNode1Lease := func(client clientset.Interface) func() error {
return func() error {
node1LeaseDurationSeconds++
bs := []byte(fmt.Sprintf(`{"spec": {"leaseDurationSeconds": %d}}`, node1LeaseDurationSeconds))
_, err := client.Coordination().Leases(api.NamespaceNodeLease).Patch("node1", types.StrategicMergePatchType, bs)
return err
}
}
deleteNode1Lease := func(client clientset.Interface) func() error {
return func() error {
return client.Coordination().Leases(api.NamespaceNodeLease).Delete("node1", &metav1.DeleteOptions{})
}
}
nodeanonClient, _ := clientsetForToken(tokenNodeUnknown, clientConfig)
node1Client, node1ClientExternal := clientsetForToken(tokenNode1, clientConfig)
node2Client, node2ClientExternal := clientsetForToken(tokenNode2, clientConfig)
@ -506,6 +559,19 @@ func TestNodeAuthorizer(t *testing.T) {
expectAllowed(t, deleteNode2(node2Client))
//TODO(mikedanese): integration test node restriction of TokenRequest
// node1 allowed to operate on its own lease
expectAllowed(t, createNode1Lease(node1Client))
expectAllowed(t, getNode1Lease(node1Client))
expectAllowed(t, updateNode1Lease(node1Client))
expectAllowed(t, patchNode1Lease(node1Client))
expectAllowed(t, deleteNode1Lease(node1Client))
// node2 not allowed to operate on another node's lease
expectForbidden(t, createNode1Lease(node2Client))
expectForbidden(t, getNode1Lease(node2Client))
expectForbidden(t, updateNode1Lease(node2Client))
expectForbidden(t, patchNode1Lease(node2Client))
expectForbidden(t, deleteNode1Lease(node2Client))
}
// expect executes a function a set number of times until it either returns the