From 1c468bf2e2c3db880af7485310334ffbff39ffe3 Mon Sep 17 00:00:00 2001 From: viegasdom Date: Thu, 11 Apr 2019 14:17:08 +0100 Subject: [PATCH] Fix golint failures of util/bandwith/*.go --- CHANGELOG-1.14.md | 1 + build/workspace.bzl | 12 +- cluster/gce/config-default.sh | 3 + cluster/gce/config-test.sh | 3 + cluster/gce/gci/configure-helper.sh | 8 +- cluster/gce/gci/configure.sh | 13 +- cluster/gce/util.sh | 2 + cmd/kubeadm/app/phases/certs/certs_test.go | 2 +- hack/.golint_failures | 1 - hack/.shellcheck_failures | 1 - hack/update-codegen.sh | 11 - hack/update-vendor-licenses.sh | 13 +- .../providers/.import-restrictions | 4 +- pkg/scheduler/core/generic_scheduler.go | 3 +- pkg/scheduler/factory/plugins.go | 20 +- pkg/scheduler/internal/cache/node_tree.go | 1 + pkg/util/bandwidth/fake_shaper.go | 7 + pkg/util/bandwidth/interfaces.go | 4 +- pkg/util/bandwidth/linux.go | 6 +- pkg/util/bandwidth/unsupported.go | 3 +- pkg/util/bandwidth/utils.go | 1 + .../apiserver/pkg/storage/cacher/cacher.go | 8 +- staging/src/k8s.io/client-go/INSTALL.md | 14 +- .../in-cluster-client-configuration/README.md | 2 +- .../examples/leader-election/README.md | 8 +- .../README.md | 2 +- .../src/k8s.io/client-go/tools/pager/BUILD | 1 + .../src/k8s.io/client-go/tools/pager/pager.go | 114 +++++++++ .../client-go/tools/pager/pager_test.go | 242 +++++++++++++++++- test/e2e/framework/service_util.go | 90 +++---- 30 files changed, 482 insertions(+), 118 deletions(-) diff --git a/CHANGELOG-1.14.md b/CHANGELOG-1.14.md index f4eeef531e..d54038e304 100644 --- a/CHANGELOG-1.14.md +++ b/CHANGELOG-1.14.md @@ -155,6 +155,7 @@ filename | sha512 hash * [metatada-proxy addon] Bump prometheus-to-sd v0.5.0 to pick up security fixes. * kube-proxy no longer automatically cleans up network rules created by running kube-proxy in other modes. If you are switching the mode that kube-proxy is in running in (EG: iptables to IPVS), you will need to run `kube-proxy --cleanup`, or restart the worker node (recommended) before restarting kube-proxy. ([#76109](https://github.com/kubernetes/kubernetes/pull/76109), [@vllry](https://github.com/vllry)) * If you are not switching kube-proxy between different modes, this change should not require any action. + * This fixes a bug where restarting the iptables proxier can cause connections to fail (https://github.com/kubernetes/kubernetes/issues/75360). * kubeadm: fixes error when upgrading from v1.13 to v1.14 clusters created with kubeadm v1.12. Please note that it is required to upgrade etcd during the final v1.13 to v1.14 upgrade. ([#75956](https://github.com/kubernetes/kubernetes/pull/75956), [@fabriziopandini](https://github.com/fabriziopandini)) * Fixes a regression proxying responses from aggregated API servers which could cause watch requests to hang until the first event was received ([#75887](https://github.com/kubernetes/kubernetes/pull/75887), [@liggitt](https://github.com/liggitt)) * Increased verbose level for local openapi aggregation logs to avoid flooding the log during normal operation ([#75781](https://github.com/kubernetes/kubernetes/pull/75781), [@roycaihw](https://github.com/roycaihw)) diff --git a/build/workspace.bzl b/build/workspace.bzl index cffc0cb71a..b4cd5ab205 100644 --- a/build/workspace.bzl +++ b/build/workspace.bzl @@ -26,13 +26,13 @@ _CNI_TARBALL_ARCH_SHA256 = { "s390x": "415cdcf02c65c22f5b7e55b0ab61208a10f2b95a0c8310176c771d07a9f448cf", } -CRI_TOOLS_VERSION = "1.12.0" +CRI_TOOLS_VERSION = "1.14.0" _CRI_TARBALL_ARCH_SHA256 = { - "amd64": "e7d913bcce40bf54e37ab1d4b75013c823d0551e6bc088b217bc1893207b4844", - "arm": "ca6b4ac80278d32d9cc8b8b19de140fd1cc35640f088969f7068fea2df625490", - "arm64": "8466f08b59bf36d2eebcb9428c3d4e6e224c3065d800ead09ad730ce374da6fe", - "ppc64le": "ec6254f1f6ffa064ba41825aab5612b7b005c8171fbcdac2ca3927d4e393000f", - "s390x": "814aa9cd496be416612c2653097a1c9eb5784e38aa4889034b44ebf888709057", + "amd64": "483c90a9fe679590df4332ba807991c49232e8cd326c307c575ecef7fe22327b", + "arm": "9910cecfd6558239ba015323066c7233d8371af359b9ddd0b2a35d5223bcf945", + "arm64": "f76b3d00a272c8d210e9a45f77d07d3770bee310d99c4fd9a72d6f55278882e5", + "ppc64le": "1e2cd11a1e025ed9755521cf13bb1bda986afa0052597a9bb44d31e62583413b", + "s390x": "8b7b5749cba88ef337997ae90aa04380e3cab2c040b44b505b2fcd691c4935e4", } ETCD_VERSION = "3.3.10" diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index a8ef5c0a9c..abc8d0daa4 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -295,6 +295,9 @@ NODE_PROBLEM_DETECTOR_TAR_HASH="${NODE_PROBLEM_DETECTOR_TAR_HASH:-}" NODE_PROBLEM_DETECTOR_RELEASE_PATH="${NODE_PROBLEM_DETECTOR_RELEASE_PATH:-}" NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS="${NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS:-}" +CNI_VERSION="${CNI_VERSION:-}" +CNI_SHA1="${CNI_SHA1:-}" + # Optional: Create autoscaler for cluster's nodes. ENABLE_CLUSTER_AUTOSCALER="${KUBE_ENABLE_CLUSTER_AUTOSCALER:-false}" if [[ "${ENABLE_CLUSTER_AUTOSCALER}" == "true" ]]; then diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 3d5002c28d..28f0a68c19 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -307,6 +307,9 @@ NODE_PROBLEM_DETECTOR_TAR_HASH="${NODE_PROBLEM_DETECTOR_TAR_HASH:-}" NODE_PROBLEM_DETECTOR_RELEASE_PATH="${NODE_PROBLEM_DETECTOR_RELEASE_PATH:-}" NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS="${NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS:-}" +CNI_VERSION="${CNI_VERSION:-}" +CNI_SHA1="${CNI_SHA1:-}" + # Optional: Create autoscaler for cluster's nodes. ENABLE_CLUSTER_AUTOSCALER="${KUBE_ENABLE_CLUSTER_AUTOSCALER:-false}" if [[ "${ENABLE_CLUSTER_AUTOSCALER}" == "true" ]]; then diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh index b1aa6eb2ee..2adbbf4d87 100644 --- a/cluster/gce/gci/configure-helper.sh +++ b/cluster/gce/gci/configure-helper.sh @@ -1150,19 +1150,19 @@ function create-master-etcd-apiserver-auth { echo "${ETCD_APISERVER_CA_KEY}" | base64 --decode > "${ETCD_APISERVER_CA_KEY_PATH}" ETCD_APISERVER_CA_CERT_PATH="${auth_dir}/etcd-apiserver-ca.crt" - echo "${ETCD_APISERVER_CA_CERT}" | base64 --decode | gunzip > "${auth_dir}/etcd-apiserver-ca.crt" + echo "${ETCD_APISERVER_CA_CERT}" | base64 --decode | gunzip > "${ETCD_APISERVER_CA_CERT_PATH}" ETCD_APISERVER_SERVER_KEY_PATH="${auth_dir}/etcd-apiserver-server.key" echo "${ETCD_APISERVER_SERVER_KEY}" | base64 --decode > "${ETCD_APISERVER_SERVER_KEY_PATH}" ETCD_APISERVER_SERVER_CERT_PATH="${auth_dir}/etcd-apiserver-server.crt" - echo "${ETCD_APISERVER_SERVER_CERT}" | base64 --decode | gunzip > "${auth_dir}/etcd-apiserver-server.crt" + echo "${ETCD_APISERVER_SERVER_CERT}" | base64 --decode | gunzip > "${ETCD_APISERVER_SERVER_CERT_PATH}" ETCD_APISERVER_CLIENT_KEY_PATH="${auth_dir}/etcd-apiserver-client.key" - echo "${ETCD_APISERVER_CLIENT_KEY}" | base64 --decode > "${auth_dir}/etcd-apiserver-client.key" + echo "${ETCD_APISERVER_CLIENT_KEY}" | base64 --decode > "${ETCD_APISERVER_CLIENT_KEY_PATH}" ETCD_APISERVER_CLIENT_CERT_PATH="${auth_dir}/etcd-apiserver-client.crt" - echo "${ETCD_APISERVER_CLIENT_CERT}" | base64 --decode | gunzip > "${auth_dir}/etcd-apiserver-client.crt" + echo "${ETCD_APISERVER_CLIENT_CERT}" | base64 --decode | gunzip > "${ETCD_APISERVER_CLIENT_CERT_PATH}" fi } diff --git a/cluster/gce/gci/configure.sh b/cluster/gce/gci/configure.sh index 2bb9e6224f..f03914616c 100644 --- a/cluster/gce/gci/configure.sh +++ b/cluster/gce/gci/configure.sh @@ -28,8 +28,8 @@ DEFAULT_CNI_VERSION="v0.7.5" DEFAULT_CNI_SHA1="52e9d2de8a5f927307d9397308735658ee44ab8d" DEFAULT_NPD_VERSION="v0.6.3" DEFAULT_NPD_SHA1="3a6ac56be6c121f1b94450bfd1a81ad28d532369" -DEFAULT_CRICTL_VERSION="v1.12.0" -DEFAULT_CRICTL_SHA1="82ef8b44849f9da0589c87e9865d4716573eec7f" +DEFAULT_CRICTL_VERSION="v1.14.0" +DEFAULT_CRICTL_SHA1="1f93c6183d0a4e186708efe7899da7a7bce9c736" DEFAULT_MOUNTER_TAR_SHA="8003b798cf33c7f91320cd6ee5cec4fa22244571" ### @@ -235,8 +235,13 @@ function install-node-problem-detector { } function install-cni-binaries { - local -r cni_tar="cni-plugins-amd64-${DEFAULT_CNI_VERSION}.tgz" - local -r cni_sha1="${DEFAULT_CNI_SHA1}" + if [[ -n "${CNI_VERSION:-}" ]]; then + local -r cni_tar="cni-plugins-amd64-${CNI_VERSION}.tgz" + local -r cni_sha1="${CNI_SHA1}" + else + local -r cni_tar="cni-plugins-amd64-${DEFAULT_CNI_VERSION}.tgz" + local -r cni_sha1="${DEFAULT_CNI_SHA1}" + fi if is-preloaded "${cni_tar}" "${cni_sha1}"; then echo "${cni_tar} is preloaded." return diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh index 543d64f20d..337dee092e 100755 --- a/cluster/gce/util.sh +++ b/cluster/gce/util.sh @@ -1112,6 +1112,8 @@ NODE_PROBLEM_DETECTOR_VERSION: $(yaml-quote ${NODE_PROBLEM_DETECTOR_VERSION:-}) NODE_PROBLEM_DETECTOR_TAR_HASH: $(yaml-quote ${NODE_PROBLEM_DETECTOR_TAR_HASH:-}) NODE_PROBLEM_DETECTOR_RELEASE_PATH: $(yaml-quote ${NODE_PROBLEM_DETECTOR_RELEASE_PATH:-}) NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS: $(yaml-quote ${NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS:-}) +CNI_VERSION: $(yaml-quote ${CNI_VERSION:-}) +CNI_SHA1: $(yaml-quote ${CNI_SHA1:-}) ENABLE_NODE_LOGGING: $(yaml-quote ${ENABLE_NODE_LOGGING:-false}) LOGGING_DESTINATION: $(yaml-quote ${LOGGING_DESTINATION:-}) ELASTICSEARCH_LOGGING_REPLICAS: $(yaml-quote ${ELASTICSEARCH_LOGGING_REPLICAS:-}) diff --git a/cmd/kubeadm/app/phases/certs/certs_test.go b/cmd/kubeadm/app/phases/certs/certs_test.go index 20bf5d4a74..a77a33f95d 100644 --- a/cmd/kubeadm/app/phases/certs/certs_test.go +++ b/cmd/kubeadm/app/phases/certs/certs_test.go @@ -364,7 +364,7 @@ func TestWriteKeyFilesIfNotExist(t *testing.T) { } //TODO: check if there is a better method to compare keys - if resultingKey.D == key.D { + if resultingKey.D == test.expectedKey.D { t.Error("created key does not match expected key") } } diff --git a/hack/.golint_failures b/hack/.golint_failures index 09f0f40a4d..f8523d5e7d 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -319,7 +319,6 @@ pkg/security/podsecuritypolicy/util pkg/securitycontext pkg/serviceaccount pkg/ssh -pkg/util/bandwidth pkg/util/config pkg/util/ebtables pkg/util/env diff --git a/hack/.shellcheck_failures b/hack/.shellcheck_failures index 3fbdac45e6..842679db1d 100644 --- a/hack/.shellcheck_failures +++ b/hack/.shellcheck_failures @@ -51,7 +51,6 @@ ./hack/update-gofmt.sh ./hack/update-openapi-spec.sh ./hack/update-translations.sh -./hack/update-vendor-licenses.sh ./hack/update-vendor.sh ./hack/verify-api-groups.sh ./hack/verify-boilerplate.sh diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index add561e071..126db0dafd 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -40,7 +40,6 @@ informergen=$(kube::util::find-binary "informer-gen") GROUP_VERSIONS=(${KUBE_AVAILABLE_GROUP_VERSIONS}) GV_DIRS=() -INTERNAL_DIRS=() for gv in "${GROUP_VERSIONS[@]}"; do # add items, but strip off any leading apis/ you find to match command expectations api_dir=$(kube::util::group-version-to-pkg-path "${gv}") @@ -55,19 +54,9 @@ for gv in "${GROUP_VERSIONS[@]}"; do fi GV_DIRS+=("${pkg_dir}") - - # collect internal groups - int_group="${pkg_dir%/*}/" - if [[ "${pkg_dir}" = core/* ]]; then - int_group="api/" - fi - if ! [[ " ${INTERNAL_DIRS[@]:-} " =~ " ${int_group} " ]]; then - INTERNAL_DIRS+=("${int_group}") - fi done # delimit by commas for the command GV_DIRS_CSV=$(IFS=',';echo "${GV_DIRS[*]// /,}";IFS=$) -INTERNAL_DIRS_CSV=$(IFS=',';echo "${INTERNAL_DIRS[*]// /,}";IFS=$) # This can be called with one flag, --verify-only, so it works for both the # update- and verify- scripts. diff --git a/hack/update-vendor-licenses.sh b/hack/update-vendor-licenses.sh index b823cb8125..21d7d2f42c 100755 --- a/hack/update-vendor-licenses.sh +++ b/hack/update-vendor-licenses.sh @@ -93,7 +93,8 @@ process_content () { esac # Find files - only root and package level - local_files=($( + local_files=() + IFS=" " read -r -a local_files <<< "$( for dir_root in ${package} ${package_root}; do [[ -d ${DEPS_DIR}/${dir_root} ]] || continue @@ -101,7 +102,7 @@ process_content () { find "${DEPS_DIR}/${dir_root}" \ -xdev -follow -maxdepth ${find_maxdepth} \ -type f "${find_names[@]}" - done | sort -u)) + done | sort -u)" local index local f @@ -126,13 +127,13 @@ process_content () { ############################################################################# # MAIN ############################################################################# -KUBE_ROOT=$(dirname "${BASH_SOURCE}")/.. +KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. source "${KUBE_ROOT}/hack/lib/init.sh" export GO111MODULE=on # Check bash version -if ((${BASH_VERSINFO[0]}<4)); then +if (( BASH_VERSINFO[0] < 4 )); then echo echo "ERROR: Bash v4+ required." # Extra help for OSX @@ -161,7 +162,7 @@ echo "= Kubernetes licensed under: =" echo cat "${LICENSE_ROOT}/LICENSE" echo -echo "= LICENSE $(cat "${LICENSE_ROOT}/LICENSE" | md5sum | awk '{print $1}')" +echo "= LICENSE $(md5sum < "${LICENSE_ROOT}/LICENSE" | awk '{print $1}')" echo "================================================================================" ) > ${TMP_LICENSE_FILE} @@ -210,7 +211,7 @@ __EOF__ cat "${file}" echo - echo "= ${file} $(cat "${file}" | md5sum | awk '{print $1}')" + echo "= ${file} $(md5sum < "${file}" | awk '{print $1}')" echo "================================================================================" echo done >> ${TMP_LICENSE_FILE} diff --git a/pkg/cloudprovider/providers/.import-restrictions b/pkg/cloudprovider/providers/.import-restrictions index 2d0306a628..80b05fdd20 100644 --- a/pkg/cloudprovider/providers/.import-restrictions +++ b/pkg/cloudprovider/providers/.import-restrictions @@ -15,9 +15,7 @@ "SelectorRegexp": "k8s[.]io/kubernetes", "AllowedPrefixes": [ "k8s.io/kubernetes/pkg/cloudprovider/providers", - "k8s.io/kubernetes/pkg/credentialprovider", - "k8s.io/kubernetes/pkg/util/mount", - "k8s.io/kubernetes/pkg/version" + "k8s.io/kubernetes/pkg/util/mount" ], "ForbiddenPrefixes": [] } diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 116213f7e8..851daaa4bb 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -1160,8 +1160,9 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedule nomNodeName := pod.Status.NominatedNodeName if len(nomNodeName) > 0 { if nodeInfo, found := nodeNameToInfo[nomNodeName]; found { + podPriority := util.GetPodPriority(pod) for _, p := range nodeInfo.Pods() { - if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) { + if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority { // There is a terminating pod on the nominated node. return false } diff --git a/pkg/scheduler/factory/plugins.go b/pkg/scheduler/factory/plugins.go index 921f17410d..2fc64df6cf 100644 --- a/pkg/scheduler/factory/plugins.go +++ b/pkg/scheduler/factory/plugins.go @@ -245,8 +245,8 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string { // IsFitPredicateRegistered is useful for testing providers. func IsFitPredicateRegistered(name string) bool { - schedulerFactoryMutex.Lock() - defer schedulerFactoryMutex.Unlock() + schedulerFactoryMutex.RLock() + defer schedulerFactoryMutex.RUnlock() _, ok := fitPredicateMap[name] return ok } @@ -408,8 +408,8 @@ func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) { } func getFitPredicateFunctions(names sets.String, args PluginFactoryArgs) (map[string]predicates.FitPredicate, error) { - schedulerFactoryMutex.Lock() - defer schedulerFactoryMutex.Unlock() + schedulerFactoryMutex.RLock() + defer schedulerFactoryMutex.RUnlock() fitPredicates := map[string]predicates.FitPredicate{} for _, name := range names.List() { @@ -451,8 +451,8 @@ func getPredicateMetadataProducer(args PluginFactoryArgs) (predicates.PredicateM } func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]priorities.PriorityConfig, error) { - schedulerFactoryMutex.Lock() - defer schedulerFactoryMutex.Unlock() + schedulerFactoryMutex.RLock() + defer schedulerFactoryMutex.RUnlock() var configs []priorities.PriorityConfig for _, name := range names.List() { @@ -538,8 +538,8 @@ func validatePriorityOrDie(priority schedulerapi.PriorityPolicy) { // ListRegisteredFitPredicates returns the registered fit predicates. func ListRegisteredFitPredicates() []string { - schedulerFactoryMutex.Lock() - defer schedulerFactoryMutex.Unlock() + schedulerFactoryMutex.RLock() + defer schedulerFactoryMutex.RUnlock() var names []string for name := range fitPredicateMap { @@ -550,8 +550,8 @@ func ListRegisteredFitPredicates() []string { // ListRegisteredPriorityFunctions returns the registered priority functions. func ListRegisteredPriorityFunctions() []string { - schedulerFactoryMutex.Lock() - defer schedulerFactoryMutex.Unlock() + schedulerFactoryMutex.RLock() + defer schedulerFactoryMutex.RUnlock() var names []string for name := range priorityFunctionMap { diff --git a/pkg/scheduler/internal/cache/node_tree.go b/pkg/scheduler/internal/cache/node_tree.go index f29024d0ed..1c7ef2c6eb 100644 --- a/pkg/scheduler/internal/cache/node_tree.go +++ b/pkg/scheduler/internal/cache/node_tree.go @@ -127,6 +127,7 @@ func (nt *NodeTree) removeZone(zone string) { for i, z := range nt.zones { if z == zone { nt.zones = append(nt.zones[:i], nt.zones[i+1:]...) + return } } } diff --git a/pkg/util/bandwidth/fake_shaper.go b/pkg/util/bandwidth/fake_shaper.go index 8c95e3bb31..78577185d7 100644 --- a/pkg/util/bandwidth/fake_shaper.go +++ b/pkg/util/bandwidth/fake_shaper.go @@ -22,28 +22,35 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) +// FakeShaper provides an implementation of the bandwith.Shaper. +// Beware this is implementation has no features besides Reset and GetCIDRs. type FakeShaper struct { CIDRs []string ResetCIDRs []string } +// Limit is not implemented func (f *FakeShaper) Limit(cidr string, egress, ingress *resource.Quantity) error { return errors.New("unimplemented") } +// Reset appends a particular CIDR to the set of ResetCIDRs being managed by this shaper func (f *FakeShaper) Reset(cidr string) error { f.ResetCIDRs = append(f.ResetCIDRs, cidr) return nil } +// ReconcileInterface is not implemented func (f *FakeShaper) ReconcileInterface() error { return errors.New("unimplemented") } +// ReconcileCIDR is not implemented func (f *FakeShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error { return errors.New("unimplemented") } +// GetCIDRs returns the set of CIDRs that are being managed by this shaper func (f *FakeShaper) GetCIDRs() ([]string, error) { return f.CIDRs, nil } diff --git a/pkg/util/bandwidth/interfaces.go b/pkg/util/bandwidth/interfaces.go index 6b0e160aae..ec29d5d104 100644 --- a/pkg/util/bandwidth/interfaces.go +++ b/pkg/util/bandwidth/interfaces.go @@ -18,7 +18,9 @@ package bandwidth import "k8s.io/apimachinery/pkg/api/resource" -type BandwidthShaper interface { +// Shaper is designed so that the shaper structs created +// satisfy the Shaper interface. +type Shaper interface { // Limit the bandwidth for a particular CIDR on a particular interface // * ingress and egress are in bits/second // * cidr is expected to be a valid network CIDR (e.g. '1.2.3.4/32' or '10.20.0.1/16') diff --git a/pkg/util/bandwidth/linux.go b/pkg/util/bandwidth/linux.go index 7050b4f763..725c2557e8 100644 --- a/pkg/util/bandwidth/linux.go +++ b/pkg/util/bandwidth/linux.go @@ -44,6 +44,7 @@ type tcShaper struct { iface string } +// NewTCShaper makes a new tcShaper for the given interface func NewTCShaper(iface string) BandwidthShaper { shaper := &tcShaper{ e: exec.New(), @@ -157,10 +158,9 @@ func (t *tcShaper) findCIDRClass(cidr string) (classAndHandleList [][]string, fo // filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 if len(parts) != 19 { return classAndHandleList, false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts) - } else { - resultTmp := []string{parts[18], parts[9]} - classAndHandleList = append(classAndHandleList, resultTmp) } + resultTmp := []string{parts[18], parts[9]} + classAndHandleList = append(classAndHandleList, resultTmp) } } if len(classAndHandleList) > 0 { diff --git a/pkg/util/bandwidth/unsupported.go b/pkg/util/bandwidth/unsupported.go index 7d556fd64d..929f5e0584 100644 --- a/pkg/util/bandwidth/unsupported.go +++ b/pkg/util/bandwidth/unsupported.go @@ -27,7 +27,8 @@ import ( type unsupportedShaper struct { } -func NewTCShaper(iface string) BandwidthShaper { +// NewTCShaper makes a new unsupportedShapper for the given interface +func NewTCShaper(iface string) Shaper { return &unsupportedShaper{} } diff --git a/pkg/util/bandwidth/utils.go b/pkg/util/bandwidth/utils.go index 451ab68836..b29825bdfb 100644 --- a/pkg/util/bandwidth/utils.go +++ b/pkg/util/bandwidth/utils.go @@ -35,6 +35,7 @@ func validateBandwidthIsReasonable(rsrc *resource.Quantity) error { return nil } +// ExtractPodBandwidthResources extracts the ingress and egress from the given pod annotations func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) { if podAnnotations == nil { return nil, nil, nil diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index d538531835..0bb903f8c4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -234,6 +234,8 @@ type Cacher struct { // its internal cache and updating its cache in the background based on the // given configuration. func NewCacherFromConfig(config Config) *Cacher { + stopCh := make(chan struct{}) + watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix @@ -245,7 +247,6 @@ func NewCacherFromConfig(config Config) *Cacher { panic("storage codec doesn't seem to match given type: " + err.Error()) } - stopCh := make(chan struct{}) reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0) // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error. @@ -774,12 +775,9 @@ func (c *Cacher) isStopped() bool { // Stop implements the graceful termination. func (c *Cacher) Stop() { - // avoid stopping twice (note: cachers are shared with subresources) - if c.isStopped() { - return - } c.stopLock.Lock() if c.stopped { + // avoid stopping twice (note: cachers are shared with subresources) c.stopLock.Unlock() return } diff --git a/staging/src/k8s.io/client-go/INSTALL.md b/staging/src/k8s.io/client-go/INSTALL.md index 9236f334a3..db97efc515 100644 --- a/staging/src/k8s.io/client-go/INSTALL.md +++ b/staging/src/k8s.io/client-go/INSTALL.md @@ -7,7 +7,7 @@ library install, don't mind getting HEAD (which may be less stable than a particular release), then simply: ```sh -$ go get k8s.io/client-go@master +go get k8s.io/client-go@master ``` This will record a dependency on `k8s.io/client-go` in your go module. @@ -24,12 +24,12 @@ If you are using a version of go prior to 1.11, or do not wish to use go modules, you can download `k8s.io/client-go` to your `$GOPATH` instead: ```sh -$ go get -u k8s.io/client-go/... -$ go get -u k8s.io/apimachinery/... -$ cd $GOPATH/src/k8s.io/client-go -$ git checkout v11.0.0 -$ cd $GOPATH/src/k8s.io/apimachinery -$ git checkout kubernetes-1.14.0 +go get -u k8s.io/client-go/... +go get -u k8s.io/apimachinery/... +cd $GOPATH/src/k8s.io/client-go +git checkout v11.0.0 +cd $GOPATH/src/k8s.io/apimachinery +git checkout kubernetes-1.14.0 ``` This downloads a version of `k8s.io/client-go` prior to v1.12.0, diff --git a/staging/src/k8s.io/client-go/examples/in-cluster-client-configuration/README.md b/staging/src/k8s.io/client-go/examples/in-cluster-client-configuration/README.md index 6f42ca3575..5aaf495689 100644 --- a/staging/src/k8s.io/client-go/examples/in-cluster-client-configuration/README.md +++ b/staging/src/k8s.io/client-go/examples/in-cluster-client-configuration/README.md @@ -37,7 +37,7 @@ kubectl create clusterrolebinding default-view --clusterrole=view --serviceaccou Then, run the image in a Pod with a single instance Deployment: - $ kubectl run --rm -i demo --image=in-cluster --image-pull-policy=Never + kubectl run --rm -i demo --image=in-cluster --image-pull-policy=Never There are 4 pods in the cluster There are 4 pods in the cluster diff --git a/staging/src/k8s.io/client-go/examples/leader-election/README.md b/staging/src/k8s.io/client-go/examples/leader-election/README.md index cded1f63b3..7a182c6ecc 100644 --- a/staging/src/k8s.io/client-go/examples/leader-election/README.md +++ b/staging/src/k8s.io/client-go/examples/leader-election/README.md @@ -8,14 +8,14 @@ Run the following three commands in separate terminals. Each terminal needs a un ```bash # first terminal -$ go run *.go -kubeconfig=/my/config -logtostderr=true -id=1 +go run *.go -kubeconfig=/my/config -logtostderr=true -id=1 # second terminal -$ go run *.go -kubeconfig=/my/config -logtostderr=true -id=2 +go run *.go -kubeconfig=/my/config -logtostderr=true -id=2 # third terminal -$ go run *.go -kubeconfig=/my/config -logtostderr=true -id=3 +go run *.go -kubeconfig=/my/config -logtostderr=true -id=3 ``` > You can ignore the `-kubeconfig` flag if you are running these commands in the Kubernetes cluster. -Now kill the existing leader. You will see from the terminal outputs that one of the remaining two processes will be elected as the new leader. \ No newline at end of file +Now kill the existing leader. You will see from the terminal outputs that one of the remaining two processes will be elected as the new leader. diff --git a/staging/src/k8s.io/client-go/examples/out-of-cluster-client-configuration/README.md b/staging/src/k8s.io/client-go/examples/out-of-cluster-client-configuration/README.md index c2bccfb63b..b81e6be73a 100644 --- a/staging/src/k8s.io/client-go/examples/out-of-cluster-client-configuration/README.md +++ b/staging/src/k8s.io/client-go/examples/out-of-cluster-client-configuration/README.md @@ -22,7 +22,7 @@ Run this application with: Running this application will use the kubeconfig file and then authenticate to the cluster, and print the number of pods in the cluster every 10 seconds: - $ ./app + ./app There are 3 pods in the cluster There are 3 pods in the cluster There are 3 pods in the cluster diff --git a/staging/src/k8s.io/client-go/tools/pager/BUILD b/staging/src/k8s.io/client-go/tools/pager/BUILD index 304d5b6506..9cf8111a63 100644 --- a/staging/src/k8s.io/client-go/tools/pager/BUILD +++ b/staging/src/k8s.io/client-go/tools/pager/BUILD @@ -17,6 +17,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index 74ea3586ab..d265db7868 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -25,9 +25,11 @@ import ( metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" ) const defaultPageSize = 500 +const defaultPageBufferSize = 10 // ListPageFunc returns a list object for the given list options. type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) @@ -48,6 +50,9 @@ type ListPager struct { PageFn ListPageFunc FullListIfExpired bool + + // Number of pages to buffer + PageBufferSize int32 } // New creates a new pager from the provided pager function using the default @@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager { PageSize: defaultPageSize, PageFn: fn, FullListIfExpired: true, + PageBufferSize: defaultPageBufferSize, } } @@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti } var list *metainternalversion.List for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + obj, err := p.PageFn(ctx, options) if err != nil { if !errors.IsResourceExpired(err) || !p.FullListIfExpired { @@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti options.Continue = m.GetContinue() } } + +// EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If +// fn returns an error, processing stops and that error is returned. If fn does not return an error, +// any error encountered while retrieving the list from the server is returned. If the context +// cancels or times out, the context error is returned. Since the list is retrieved in paginated +// chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list +// requests exceed the expiration limit of the apiserver being called. +// +// Items are retrieved in chunks from the server to reduce the impact on the server with up to +// ListPager.PageBufferSize chunks buffered concurrently in the background. +func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { + return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error { + return meta.EachListItem(obj, fn) + }) +} + +// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on +// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does +// not return an error, any error encountered while retrieving the list from the server is +// returned. If the context cancels or times out, the context error is returned. Since the list is +// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if +// the pagination list requests exceed the expiration limit of the apiserver being called. +// +// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background. +func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { + if p.PageBufferSize < 0 { + return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize) + } + + // Ensure background goroutine is stopped if this call exits before all list items are + // processed. Cancelation error from this deferred cancel call is never returned to caller; + // either the list result has already been sent to bgResultC or the fn error is returned and + // the cancelation error is discarded. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + chunkC := make(chan runtime.Object, p.PageBufferSize) + bgResultC := make(chan error, 1) + go func() { + defer utilruntime.HandleCrash() + + var err error + defer func() { + close(chunkC) + bgResultC <- err + }() + err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error { + select { + case chunkC <- chunk: // buffer the chunk, this can block + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + }() + + for o := range chunkC { + err := fn(o) + if err != nil { + return err // any fn error should be returned immediately + } + } + // promote the results of our background goroutine to the foreground + return <-bgResultC +} + +// eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list +// chunk. If fn returns an error, processing stops and that error is returned. If fn does not return +// an error, any error encountered while retrieving the list from the server is returned. If the +// context cancels or times out, the context error is returned. Since the list is retrieved in +// paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the +// pagination list requests exceed the expiration limit of the apiserver being called. +func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error { + if options.Limit == 0 { + options.Limit = p.PageSize + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + obj, err := p.PageFn(ctx, options) + if err != nil { + return err + } + m, err := meta.ListAccessor(obj) + if err != nil { + return fmt.Errorf("returned object must be a list: %v", err) + } + if err := fn(obj); err != nil { + return err + } + // if we have no more items, return. + if len(m.GetContinue()) == 0 { + return nil + } + // set the next loop up + options.Continue = m.GetContinue() + } +} diff --git a/staging/src/k8s.io/client-go/tools/pager/pager_test.go b/staging/src/k8s.io/client-go/tools/pager/pager_test.go index ae517cab20..2332b53d78 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager_test.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager_test.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "testing" + "time" "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" @@ -115,7 +116,6 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options } return p.PagedList(ctx, options) } - func TestListPager_List(t *testing.T) { type fields struct { PageSize int64 @@ -189,7 +189,11 @@ func TestListPager_List(t *testing.T) { PageFn: tt.fields.PageFn, FullListIfExpired: tt.fields.FullListIfExpired, } - got, err := p.List(tt.args.ctx, tt.args.options) + ctx := tt.args.ctx + if ctx == nil { + ctx = context.Background() + } + got, err := p.List(ctx, tt.args.options) if (err != nil) != tt.wantErr { t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) return @@ -204,3 +208,237 @@ func TestListPager_List(t *testing.T) { }) } } + +func TestListPager_EachListItem(t *testing.T) { + type fields struct { + PageSize int64 + PageFn ListPageFunc + } + tests := []struct { + name string + fields fields + want runtime.Object + wantErr bool + wantPanic bool + isExpired bool + processorErrorOnItem int + processorPanicOnItem int + cancelContextOnItem int + }{ + { + name: "empty page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, + want: list(0, "rv:20"), + }, + { + name: "one page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, + want: list(9, "rv:20"), + }, + { + name: "one full page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, + want: list(10, "rv:20"), + }, + { + name: "two pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + want: list(11, "rv:20"), + }, + { + name: "three pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, + want: list(21, "rv:20"), + }, + { + name: "expires on second page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, + want: list(10, "rv:20"), // all items on the first page should have been visited + wantErr: true, + isExpired: true, + }, + { + name: "error processing item", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, + want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited + wantPanic: true, + processorPanicOnItem: 3, + }, + { + name: "cancel context while processing", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, + want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited + wantErr: true, + cancelContextOnItem: 3, + }, + { + name: "panic processing item", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList}, + want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited + wantPanic: true, + }, + } + + processorErr := fmt.Errorf("processor error") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + p := &ListPager{ + PageSize: tt.fields.PageSize, + PageFn: tt.fields.PageFn, + } + var items []runtime.Object + + fn := func(obj runtime.Object) error { + items = append(items, obj) + if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem { + return processorErr + } + if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem { + panic(processorErr) + } + if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem { + cancel() + } + return nil + } + var err error + var panic interface{} + func() { + defer func() { + panic = recover() + }() + err = p.EachListItem(ctx, metav1.ListOptions{}, fn) + }() + if (panic != nil) && !tt.wantPanic { + t.Fatalf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic) + } else { + return + } + if (err != nil) != tt.wantErr { + t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.isExpired != errors.IsResourceExpired(err) { + t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired) + return + } + if tt.processorErrorOnItem > 0 && err != processorErr { + t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem) + return + } + l := tt.want.(*metainternalversion.List) + if !reflect.DeepEqual(items, l.Items) { + t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items) + } + }) + } +} + +func TestListPager_eachListPageBuffered(t *testing.T) { + tests := []struct { + name string + totalPages int + pagesProcessed int + wantPageLists int + pageBufferSize int32 + pageSize int + }{ + { + name: "no buffer, one total page", + totalPages: 1, + pagesProcessed: 1, + wantPageLists: 1, + pageBufferSize: 0, + }, { + name: "no buffer, 1/5 pages processed", + totalPages: 5, + pagesProcessed: 1, + wantPageLists: 2, // 1 received for processing, 1 listed + pageBufferSize: 0, + }, + { + name: "no buffer, 2/5 pages processed", + totalPages: 5, + pagesProcessed: 2, + wantPageLists: 3, + pageBufferSize: 0, + }, + { + name: "no buffer, 5/5 pages processed", + totalPages: 5, + pagesProcessed: 5, + wantPageLists: 5, + pageBufferSize: 0, + }, + { + name: "size 1 buffer, 1/5 pages processed", + totalPages: 5, + pagesProcessed: 1, + wantPageLists: 3, + pageBufferSize: 1, + }, + { + name: "size 1 buffer, 5/5 pages processed", + totalPages: 5, + pagesProcessed: 5, + wantPageLists: 5, + pageBufferSize: 1, + }, + { + name: "size 10 buffer, 1/5 page processed", + totalPages: 5, + pagesProcessed: 1, + wantPageLists: 5, + pageBufferSize: 10, // buffer is larger than list + }, + } + processorErr := fmt.Errorf("processor error") + pageSize := 10 + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"} + pageLists := 0 + wantedPageListsDone := make(chan struct{}) + listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + pageLists++ + if pageLists == tt.wantPageLists { + close(wantedPageListsDone) + } + return pgr.PagedList(ctx, options) + } + p := &ListPager{ + PageSize: int64(pageSize), + PageBufferSize: tt.pageBufferSize, + PageFn: listFn, + } + + pagesProcessed := 0 + fn := func(obj runtime.Object) error { + pagesProcessed++ + if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 { + // wait for buffering to catch up + select { + case <-time.After(time.Second): + return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists) + case <-wantedPageListsDone: + } + return processorErr + } + return nil + } + err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn) + if tt.pagesProcessed > 0 && err == processorErr { + // expected + } else if err != nil { + t.Fatal(err) + } + if tt.wantPageLists > 0 && pageLists != tt.wantPageLists { + t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists) + } + if pagesProcessed != tt.pagesProcessed { + t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed) + } + }) + } +} diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 28bf1749da..bfd756beb7 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -43,7 +43,7 @@ import ( testutils "k8s.io/kubernetes/test/utils" imageutils "k8s.io/kubernetes/test/utils/image" - . "github.com/onsi/ginkgo" + "github.com/onsi/ginkgo" ) const ( @@ -65,12 +65,13 @@ const ( // on AWS. A few minutes is typical, so use 10m. LoadBalancerLagTimeoutAWS = 10 * time.Minute - // How long to wait for a load balancer to be created/modified. - //TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable + // LoadBalancerCreateTimeoutDefault is the default time to wait for a load balancer to be created/modified. + // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable LoadBalancerCreateTimeoutDefault = 20 * time.Minute - LoadBalancerCreateTimeoutLarge = 2 * time.Hour + // LoadBalancerCreateTimeoutLarge is the maximum time to wait for a load balancer to be created/modified. + LoadBalancerCreateTimeoutLarge = 2 * time.Hour - // Time required by the loadbalancer to cleanup, proportional to numApps/Ing. + // LoadBalancerCleanupTimeout is the time required by the loadbalancer to cleanup, proportional to numApps/Ing. // Bring the cleanup timeout back down to 5m once b/33588344 is resolved. LoadBalancerCleanupTimeout = 15 * time.Minute @@ -97,10 +98,10 @@ const ( AffinityConfirmCount = 15 ) -// This should match whatever the default/configured range is +// ServiceNodePortRange should match whatever the default/configured range is var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768} -// A test jig to help service testing. +// ServiceTestJig is a test jig to help service testing. type ServiceTestJig struct { ID string Name string @@ -255,7 +256,7 @@ func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.Se // If createPod is true, it also creates an RC with 1 replica of // the standard netexec container used everywhere in this test. func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service { - By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local") + ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local") svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal @@ -263,7 +264,7 @@ func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName s }) if createPod { - By("creating a pod to be part of the service " + serviceName) + ginkgo.By("creating a pod to be part of the service " + serviceName) j.RunOrFail(namespace, nil) } j.SanityCheckService(svc, v1.ServiceTypeNodePort) @@ -276,7 +277,7 @@ func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName s // the standard netexec container used everywhere in this test. func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool, tweak func(svc *v1.Service)) *v1.Service { - By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local") + ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local") svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer // We need to turn affinity off for our LB distribution tests @@ -288,10 +289,10 @@ func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceNa }) if createPod { - By("creating a pod to be part of the service " + serviceName) + ginkgo.By("creating a pod to be part of the service " + serviceName) j.RunOrFail(namespace, nil) } - By("waiting for loadbalancer for service " + namespace + "/" + serviceName) + ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName) svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout) j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer) return svc @@ -300,7 +301,7 @@ func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceNa // CreateLoadBalancerService creates a loadbalancer service and waits // for it to acquire an ingress IP. func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service { - By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer") + ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer") svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeLoadBalancer // We need to turn affinity off for our LB distribution tests @@ -310,7 +311,7 @@ func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string } }) - By("waiting for loadbalancer for service " + namespace + "/" + serviceName) + ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName) svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout) j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer) return svc @@ -402,7 +403,7 @@ func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string { return nodeMap } -// getNodes returns the first maxNodesForTest nodes. Useful in large clusters +// GetNodes returns the first maxNodesForTest nodes. Useful in large clusters // where we don't eg: want to create an endpoint per node. func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) { nodes = GetReadySchedulableNodesOrDie(j.Client) @@ -1053,13 +1054,13 @@ func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, requ return nil } -// Simple helper class to avoid too much boilerplate in tests +// ServiceTestFixture is a simple helper class to avoid too much boilerplate in tests type ServiceTestFixture struct { ServiceName string Namespace string Client clientset.Interface - TestId string + TestID string Labels map[string]string rcs map[string]bool @@ -1073,9 +1074,9 @@ func NewServerTest(client clientset.Interface, namespace string, serviceName str t.Client = client t.Namespace = namespace t.ServiceName = serviceName - t.TestId = t.ServiceName + "-" + string(uuid.NewUUID()) + t.TestID = t.ServiceName + "-" + string(uuid.NewUUID()) t.Labels = map[string]string{ - "testid": t.TestId, + "testid": t.TestID, } t.rcs = make(map[string]bool) @@ -1087,7 +1088,7 @@ func NewServerTest(client clientset.Interface, namespace string, serviceName str return t } -// Build default config for a service (which can then be changed) +// BuildServiceSpec builds default config for a service (which can then be changed) func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service { service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -1128,7 +1129,7 @@ func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.Replica return rc, err } -// Create a service, and record it for cleanup +// CreateService creates a service, and record it for cleanup func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) { result, err := t.Client.CoreV1().Services(t.Namespace).Create(service) if err == nil { @@ -1137,7 +1138,7 @@ func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, er return result, err } -// Delete a service, and remove it from the cleanup list +// DeleteService deletes a service, and remove it from the cleanup list func (t *ServiceTestFixture) DeleteService(serviceName string) error { err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil) if err == nil { @@ -1149,7 +1150,7 @@ func (t *ServiceTestFixture) DeleteService(serviceName string) error { func (t *ServiceTestFixture) Cleanup() []error { var errs []error for rcName := range t.rcs { - By("stopping RC " + rcName + " in namespace " + t.Namespace) + ginkgo.By("stopping RC " + rcName + " in namespace " + t.Namespace) err := retry.RetryOnConflict(retry.DefaultRetry, func() error { // First, resize the RC to 0. old, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{}) @@ -1182,7 +1183,7 @@ func (t *ServiceTestFixture) Cleanup() []error { } for serviceName := range t.services { - By("deleting service " + serviceName + " in namespace " + t.Namespace) + ginkgo.By("deleting service " + serviceName + " in namespace " + t.Namespace) err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil) if err != nil { if !errors.IsNotFound(err) { @@ -1281,7 +1282,7 @@ func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUI } func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) { - By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints)) + ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints)) i := 1 for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) { endpoints, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{}) @@ -1323,7 +1324,7 @@ func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName strin func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) { podNames := make([]string, replicas) name := svc.ObjectMeta.Name - By("creating service " + name + " in namespace " + ns) + ginkgo.By("creating service " + name + " in namespace " + ns) _, err := c.CoreV1().Services(ns).Create(svc) if err != nil { return podNames, "", err @@ -1420,7 +1421,7 @@ func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expect } expectedEndpoints := sets.NewString(expectedPods...) - By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods))) + ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods))) for _, cmdFunc := range commands { passed := false gotEndpoints := sets.NewString() @@ -1567,9 +1568,9 @@ func checkAffinityFailed(tracker affinityTracker, err string) { // number of same response observed in a row. If affinity is not expected, the // test will keep observe until different responses observed. The function will // return false only in case of unexpected errors. -func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, targetPort int, shouldHold bool) bool { - targetIpPort := net.JoinHostPort(targetIp, strconv.Itoa(targetPort)) - cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIpPort) +func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool { + targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort)) + cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort) timeout := ServiceTestTimeout if execPod == nil { timeout = LoadBalancerPollTimeout @@ -1577,14 +1578,14 @@ func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, target var tracker affinityTracker if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { if execPod != nil { - if stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil { - Logf("Failed to get response from %s. Retry until timeout", targetIpPort) + stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err != nil { + Logf("Failed to get response from %s. Retry until timeout", targetIPPort) return false, nil - } else { - tracker.recordHost(stdout) } + tracker.recordHost(stdout) } else { - rawResponse := jig.GetHTTPContent(targetIp, targetPort, timeout, "") + rawResponse := jig.GetHTTPContent(targetIP, targetPort, timeout, "") tracker.recordHost(rawResponse.String()) } trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount) @@ -1600,17 +1601,16 @@ func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, target if pollErr != wait.ErrWaitTimeout { checkAffinityFailed(tracker, pollErr.Error()) return false - } else { - if !trackerFulfilled { - checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIpPort)) - } - if shouldHold { - checkAffinityFailed(tracker, "Affinity should hold but didn't.") - } else { - checkAffinityFailed(tracker, "Affinity shouldn't hold but did.") - } - return true } + if !trackerFulfilled { + checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIPPort)) + } + if shouldHold { + checkAffinityFailed(tracker, "Affinity should hold but didn't.") + } else { + checkAffinityFailed(tracker, "Affinity shouldn't hold but did.") + } + return true } return true }