Fix golint failures of util/bandwidth/*.go

k3s-v1.15.3
viegasdom 2019-04-11 14:17:08 +01:00
parent c2254cf19a
commit 1c468bf2e2
30 changed files with 482 additions and 118 deletions

View File

@ -155,6 +155,7 @@ filename | sha512 hash
* [metadata-proxy addon] Bump prometheus-to-sd v0.5.0 to pick up security fixes.
* kube-proxy no longer automatically cleans up network rules created by running kube-proxy in other modes. If you are switching the mode that kube-proxy is running in (e.g. iptables to IPVS), you will need to run `kube-proxy --cleanup`, or restart the worker node (recommended), before restarting kube-proxy. ([#76109](https://github.com/kubernetes/kubernetes/pull/76109), [@vllry](https://github.com/vllry))
* If you are not switching kube-proxy between different modes, this change should not require any action.
* This fixes a bug where restarting the iptables proxier can cause connections to fail (https://github.com/kubernetes/kubernetes/issues/75360).
* kubeadm: fixes error when upgrading from v1.13 to v1.14 clusters created with kubeadm v1.12. Please note that it is required to upgrade etcd during the final v1.13 to v1.14 upgrade. ([#75956](https://github.com/kubernetes/kubernetes/pull/75956), [@fabriziopandini](https://github.com/fabriziopandini))
* Fixes a regression proxying responses from aggregated API servers which could cause watch requests to hang until the first event was received ([#75887](https://github.com/kubernetes/kubernetes/pull/75887), [@liggitt](https://github.com/liggitt))
* Increased verbose level for local openapi aggregation logs to avoid flooding the log during normal operation ([#75781](https://github.com/kubernetes/kubernetes/pull/75781), [@roycaihw](https://github.com/roycaihw))

View File

@ -26,13 +26,13 @@ _CNI_TARBALL_ARCH_SHA256 = {
"s390x": "415cdcf02c65c22f5b7e55b0ab61208a10f2b95a0c8310176c771d07a9f448cf",
}
CRI_TOOLS_VERSION = "1.12.0"
CRI_TOOLS_VERSION = "1.14.0"
_CRI_TARBALL_ARCH_SHA256 = {
"amd64": "e7d913bcce40bf54e37ab1d4b75013c823d0551e6bc088b217bc1893207b4844",
"arm": "ca6b4ac80278d32d9cc8b8b19de140fd1cc35640f088969f7068fea2df625490",
"arm64": "8466f08b59bf36d2eebcb9428c3d4e6e224c3065d800ead09ad730ce374da6fe",
"ppc64le": "ec6254f1f6ffa064ba41825aab5612b7b005c8171fbcdac2ca3927d4e393000f",
"s390x": "814aa9cd496be416612c2653097a1c9eb5784e38aa4889034b44ebf888709057",
"amd64": "483c90a9fe679590df4332ba807991c49232e8cd326c307c575ecef7fe22327b",
"arm": "9910cecfd6558239ba015323066c7233d8371af359b9ddd0b2a35d5223bcf945",
"arm64": "f76b3d00a272c8d210e9a45f77d07d3770bee310d99c4fd9a72d6f55278882e5",
"ppc64le": "1e2cd11a1e025ed9755521cf13bb1bda986afa0052597a9bb44d31e62583413b",
"s390x": "8b7b5749cba88ef337997ae90aa04380e3cab2c040b44b505b2fcd691c4935e4",
}
ETCD_VERSION = "3.3.10"

View File

@ -295,6 +295,9 @@ NODE_PROBLEM_DETECTOR_TAR_HASH="${NODE_PROBLEM_DETECTOR_TAR_HASH:-}"
NODE_PROBLEM_DETECTOR_RELEASE_PATH="${NODE_PROBLEM_DETECTOR_RELEASE_PATH:-}"
NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS="${NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS:-}"
CNI_VERSION="${CNI_VERSION:-}"
CNI_SHA1="${CNI_SHA1:-}"
# Optional: Create autoscaler for cluster's nodes.
ENABLE_CLUSTER_AUTOSCALER="${KUBE_ENABLE_CLUSTER_AUTOSCALER:-false}"
if [[ "${ENABLE_CLUSTER_AUTOSCALER}" == "true" ]]; then

View File

@ -307,6 +307,9 @@ NODE_PROBLEM_DETECTOR_TAR_HASH="${NODE_PROBLEM_DETECTOR_TAR_HASH:-}"
NODE_PROBLEM_DETECTOR_RELEASE_PATH="${NODE_PROBLEM_DETECTOR_RELEASE_PATH:-}"
NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS="${NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS:-}"
CNI_VERSION="${CNI_VERSION:-}"
CNI_SHA1="${CNI_SHA1:-}"
# Optional: Create autoscaler for cluster's nodes.
ENABLE_CLUSTER_AUTOSCALER="${KUBE_ENABLE_CLUSTER_AUTOSCALER:-false}"
if [[ "${ENABLE_CLUSTER_AUTOSCALER}" == "true" ]]; then

View File

@ -1150,19 +1150,19 @@ function create-master-etcd-apiserver-auth {
echo "${ETCD_APISERVER_CA_KEY}" | base64 --decode > "${ETCD_APISERVER_CA_KEY_PATH}"
ETCD_APISERVER_CA_CERT_PATH="${auth_dir}/etcd-apiserver-ca.crt"
echo "${ETCD_APISERVER_CA_CERT}" | base64 --decode | gunzip > "${auth_dir}/etcd-apiserver-ca.crt"
echo "${ETCD_APISERVER_CA_CERT}" | base64 --decode | gunzip > "${ETCD_APISERVER_CA_CERT_PATH}"
ETCD_APISERVER_SERVER_KEY_PATH="${auth_dir}/etcd-apiserver-server.key"
echo "${ETCD_APISERVER_SERVER_KEY}" | base64 --decode > "${ETCD_APISERVER_SERVER_KEY_PATH}"
ETCD_APISERVER_SERVER_CERT_PATH="${auth_dir}/etcd-apiserver-server.crt"
echo "${ETCD_APISERVER_SERVER_CERT}" | base64 --decode | gunzip > "${auth_dir}/etcd-apiserver-server.crt"
echo "${ETCD_APISERVER_SERVER_CERT}" | base64 --decode | gunzip > "${ETCD_APISERVER_SERVER_CERT_PATH}"
ETCD_APISERVER_CLIENT_KEY_PATH="${auth_dir}/etcd-apiserver-client.key"
echo "${ETCD_APISERVER_CLIENT_KEY}" | base64 --decode > "${auth_dir}/etcd-apiserver-client.key"
echo "${ETCD_APISERVER_CLIENT_KEY}" | base64 --decode > "${ETCD_APISERVER_CLIENT_KEY_PATH}"
ETCD_APISERVER_CLIENT_CERT_PATH="${auth_dir}/etcd-apiserver-client.crt"
echo "${ETCD_APISERVER_CLIENT_CERT}" | base64 --decode | gunzip > "${auth_dir}/etcd-apiserver-client.crt"
echo "${ETCD_APISERVER_CLIENT_CERT}" | base64 --decode | gunzip > "${ETCD_APISERVER_CLIENT_CERT_PATH}"
fi
}

View File

@ -28,8 +28,8 @@ DEFAULT_CNI_VERSION="v0.7.5"
DEFAULT_CNI_SHA1="52e9d2de8a5f927307d9397308735658ee44ab8d"
DEFAULT_NPD_VERSION="v0.6.3"
DEFAULT_NPD_SHA1="3a6ac56be6c121f1b94450bfd1a81ad28d532369"
DEFAULT_CRICTL_VERSION="v1.12.0"
DEFAULT_CRICTL_SHA1="82ef8b44849f9da0589c87e9865d4716573eec7f"
DEFAULT_CRICTL_VERSION="v1.14.0"
DEFAULT_CRICTL_SHA1="1f93c6183d0a4e186708efe7899da7a7bce9c736"
DEFAULT_MOUNTER_TAR_SHA="8003b798cf33c7f91320cd6ee5cec4fa22244571"
###
@ -235,8 +235,13 @@ function install-node-problem-detector {
}
function install-cni-binaries {
if [[ -n "${CNI_VERSION:-}" ]]; then
local -r cni_tar="cni-plugins-amd64-${CNI_VERSION}.tgz"
local -r cni_sha1="${CNI_SHA1}"
else
local -r cni_tar="cni-plugins-amd64-${DEFAULT_CNI_VERSION}.tgz"
local -r cni_sha1="${DEFAULT_CNI_SHA1}"
fi
if is-preloaded "${cni_tar}" "${cni_sha1}"; then
echo "${cni_tar} is preloaded."
return

View File

@ -1112,6 +1112,8 @@ NODE_PROBLEM_DETECTOR_VERSION: $(yaml-quote ${NODE_PROBLEM_DETECTOR_VERSION:-})
NODE_PROBLEM_DETECTOR_TAR_HASH: $(yaml-quote ${NODE_PROBLEM_DETECTOR_TAR_HASH:-})
NODE_PROBLEM_DETECTOR_RELEASE_PATH: $(yaml-quote ${NODE_PROBLEM_DETECTOR_RELEASE_PATH:-})
NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS: $(yaml-quote ${NODE_PROBLEM_DETECTOR_CUSTOM_FLAGS:-})
CNI_VERSION: $(yaml-quote ${CNI_VERSION:-})
CNI_SHA1: $(yaml-quote ${CNI_SHA1:-})
ENABLE_NODE_LOGGING: $(yaml-quote ${ENABLE_NODE_LOGGING:-false})
LOGGING_DESTINATION: $(yaml-quote ${LOGGING_DESTINATION:-})
ELASTICSEARCH_LOGGING_REPLICAS: $(yaml-quote ${ELASTICSEARCH_LOGGING_REPLICAS:-})

View File

@ -364,7 +364,7 @@ func TestWriteKeyFilesIfNotExist(t *testing.T) {
}
//TODO: check if there is a better method to compare keys
if resultingKey.D == key.D {
if resultingKey.D == test.expectedKey.D {
t.Error("created key does not match expected key")
}
}

View File

@ -319,7 +319,6 @@ pkg/security/podsecuritypolicy/util
pkg/securitycontext
pkg/serviceaccount
pkg/ssh
pkg/util/bandwidth
pkg/util/config
pkg/util/ebtables
pkg/util/env

View File

@ -51,7 +51,6 @@
./hack/update-gofmt.sh
./hack/update-openapi-spec.sh
./hack/update-translations.sh
./hack/update-vendor-licenses.sh
./hack/update-vendor.sh
./hack/verify-api-groups.sh
./hack/verify-boilerplate.sh

View File

@ -40,7 +40,6 @@ informergen=$(kube::util::find-binary "informer-gen")
GROUP_VERSIONS=(${KUBE_AVAILABLE_GROUP_VERSIONS})
GV_DIRS=()
INTERNAL_DIRS=()
for gv in "${GROUP_VERSIONS[@]}"; do
# add items, but strip off any leading apis/ you find to match command expectations
api_dir=$(kube::util::group-version-to-pkg-path "${gv}")
@ -55,19 +54,9 @@ for gv in "${GROUP_VERSIONS[@]}"; do
fi
GV_DIRS+=("${pkg_dir}")
# collect internal groups
int_group="${pkg_dir%/*}/"
if [[ "${pkg_dir}" = core/* ]]; then
int_group="api/"
fi
if ! [[ " ${INTERNAL_DIRS[@]:-} " =~ " ${int_group} " ]]; then
INTERNAL_DIRS+=("${int_group}")
fi
done
# delimit by commas for the command
GV_DIRS_CSV=$(IFS=',';echo "${GV_DIRS[*]// /,}";IFS=$)
INTERNAL_DIRS_CSV=$(IFS=',';echo "${INTERNAL_DIRS[*]// /,}";IFS=$)
# This can be called with one flag, --verify-only, so it works for both the
# update- and verify- scripts.

View File

@ -93,7 +93,8 @@ process_content () {
esac
# Find files - only root and package level
local_files=($(
local_files=()
IFS=" " read -r -a local_files <<< "$(
for dir_root in ${package} ${package_root}; do
[[ -d ${DEPS_DIR}/${dir_root} ]] || continue
@ -101,7 +102,7 @@ process_content () {
find "${DEPS_DIR}/${dir_root}" \
-xdev -follow -maxdepth ${find_maxdepth} \
-type f "${find_names[@]}"
done | sort -u))
done | sort -u)"
local index
local f
@ -126,13 +127,13 @@ process_content () {
#############################################################################
# MAIN
#############################################################################
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
KUBE_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
source "${KUBE_ROOT}/hack/lib/init.sh"
export GO111MODULE=on
# Check bash version
if ((${BASH_VERSINFO[0]}<4)); then
if (( BASH_VERSINFO[0] < 4 )); then
echo
echo "ERROR: Bash v4+ required."
# Extra help for OSX
@ -161,7 +162,7 @@ echo "= Kubernetes licensed under: ="
echo
cat "${LICENSE_ROOT}/LICENSE"
echo
echo "= LICENSE $(cat "${LICENSE_ROOT}/LICENSE" | md5sum | awk '{print $1}')"
echo "= LICENSE $(md5sum < "${LICENSE_ROOT}/LICENSE" | awk '{print $1}')"
echo "================================================================================"
) > ${TMP_LICENSE_FILE}
@ -210,7 +211,7 @@ __EOF__
cat "${file}"
echo
echo "= ${file} $(cat "${file}" | md5sum | awk '{print $1}')"
echo "= ${file} $(md5sum < "${file}" | awk '{print $1}')"
echo "================================================================================"
echo
done >> ${TMP_LICENSE_FILE}

View File

@ -15,9 +15,7 @@
"SelectorRegexp": "k8s[.]io/kubernetes",
"AllowedPrefixes": [
"k8s.io/kubernetes/pkg/cloudprovider/providers",
"k8s.io/kubernetes/pkg/credentialprovider",
"k8s.io/kubernetes/pkg/util/mount",
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/util/mount"
],
"ForbiddenPrefixes": []
}

View File

@ -1160,8 +1160,9 @@ func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedule
nomNodeName := pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
podPriority := util.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
// There is a terminating pod on the nominated node.
return false
}

View File

@ -245,8 +245,8 @@ func RegisterCustomFitPredicate(policy schedulerapi.PredicatePolicy) string {
// IsFitPredicateRegistered is useful for testing providers.
func IsFitPredicateRegistered(name string) bool {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
_, ok := fitPredicateMap[name]
return ok
}
@ -408,8 +408,8 @@ func GetAlgorithmProvider(name string) (*AlgorithmProviderConfig, error) {
}
func getFitPredicateFunctions(names sets.String, args PluginFactoryArgs) (map[string]predicates.FitPredicate, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
fitPredicates := map[string]predicates.FitPredicate{}
for _, name := range names.List() {
@ -451,8 +451,8 @@ func getPredicateMetadataProducer(args PluginFactoryArgs) (predicates.PredicateM
}
func getPriorityFunctionConfigs(names sets.String, args PluginFactoryArgs) ([]priorities.PriorityConfig, error) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
var configs []priorities.PriorityConfig
for _, name := range names.List() {
@ -538,8 +538,8 @@ func validatePriorityOrDie(priority schedulerapi.PriorityPolicy) {
// ListRegisteredFitPredicates returns the registered fit predicates.
func ListRegisteredFitPredicates() []string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
var names []string
for name := range fitPredicateMap {
@ -550,8 +550,8 @@ func ListRegisteredFitPredicates() []string {
// ListRegisteredPriorityFunctions returns the registered priority functions.
func ListRegisteredPriorityFunctions() []string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
var names []string
for name := range priorityFunctionMap {
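
The hunks above swap the exclusive `Lock` for `RLock` on the scheduler factory's mutex wherever the registry maps are only read. A minimal sketch of that read/write-lock split; `mu`, `fitPredicates`, `Register`, and `IsRegistered` are illustrative names, not the scheduler's real identifiers:

```go
package registry

import "sync"

// A sketch of the read/write-lock split adopted above: writers that mutate
// the map take the exclusive Lock, while read-only lookups take RLock so
// they can proceed concurrently.
var (
	mu            sync.RWMutex
	fitPredicates = map[string]func() bool{}
)

// Register mutates the map and therefore needs the exclusive lock.
func Register(name string, pred func() bool) {
	mu.Lock()
	defer mu.Unlock()
	fitPredicates[name] = pred
}

// IsRegistered only reads the map; the shared read lock is sufficient,
// mirroring the Lock-to-RLock change in IsFitPredicateRegistered above.
func IsRegistered(name string) bool {
	mu.RLock()
	defer mu.RUnlock()
	_, ok := fitPredicates[name]
	return ok
}
```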

View File

@ -127,6 +127,7 @@ func (nt *NodeTree) removeZone(zone string) {
for i, z := range nt.zones {
if z == zone {
nt.zones = append(nt.zones[:i], nt.zones[i+1:]...)
return
}
}
}
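
The added `return` stops the scan as soon as the zone has been spliced out, so the loop never keeps iterating over a slice it has just mutated. A small self-contained sketch of the same remove-and-return pattern, with illustrative names rather than NodeTree's:

```go
package main

import "fmt"

// removeFirst splices the first occurrence of target out of items and stops,
// mirroring the early return added to NodeTree.removeZone above: once append
// has shifted the backing array, continuing the range would inspect stale data.
func removeFirst(items []string, target string) []string {
	for i, s := range items {
		if s == target {
			return append(items[:i], items[i+1:]...)
		}
	}
	return items
}

func main() {
	zones := []string{"zone-a", "zone-b", "zone-c"}
	fmt.Println(removeFirst(zones, "zone-b")) // [zone-a zone-c]
}
```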

View File

@ -22,28 +22,35 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)
// FakeShaper provides an implementation of the bandwidth.Shaper interface.
// Beware: this implementation has no features besides Reset and GetCIDRs.
type FakeShaper struct {
CIDRs []string
ResetCIDRs []string
}
// Limit is not implemented
func (f *FakeShaper) Limit(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
// Reset appends a particular CIDR to the set of ResetCIDRs being managed by this shaper
func (f *FakeShaper) Reset(cidr string) error {
f.ResetCIDRs = append(f.ResetCIDRs, cidr)
return nil
}
// ReconcileInterface is not implemented
func (f *FakeShaper) ReconcileInterface() error {
return errors.New("unimplemented")
}
// ReconcileCIDR is not implemented
func (f *FakeShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
// GetCIDRs returns the set of CIDRs that are being managed by this shaper
func (f *FakeShaper) GetCIDRs() ([]string, error) {
return f.CIDRs, nil
}
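
As a usage illustration only (not part of the commit), here is how a test might lean on the fake, assuming the `k8s.io/kubernetes/pkg/util/bandwidth` import path of this tree; the test name and assertions are hypothetical:

```go
package bandwidth_test

import (
	"reflect"
	"testing"

	"k8s.io/kubernetes/pkg/util/bandwidth"
)

// A sketch of how the fake might back a test: the caller resets a few CIDRs
// and then asserts on ResetCIDRs, the only state the fake records besides
// the CIDRs it reports from GetCIDRs.
func TestResetRecordsCIDRs(t *testing.T) {
	fake := &bandwidth.FakeShaper{CIDRs: []string{"10.20.0.1/16"}}

	for _, cidr := range []string{"1.2.3.4/32", "10.20.0.1/16"} {
		if err := fake.Reset(cidr); err != nil {
			t.Fatalf("Reset(%q) returned unexpected error: %v", cidr, err)
		}
	}

	want := []string{"1.2.3.4/32", "10.20.0.1/16"}
	if !reflect.DeepEqual(fake.ResetCIDRs, want) {
		t.Errorf("ResetCIDRs = %v, want %v", fake.ResetCIDRs, want)
	}
}
```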

View File

@ -18,7 +18,9 @@ package bandwidth
import "k8s.io/apimachinery/pkg/api/resource"
type BandwidthShaper interface {
// Shaper is the interface that the bandwidth shaper implementations
// in this package are designed to satisfy.
type Shaper interface {
// Limit the bandwidth for a particular CIDR on a particular interface
// * ingress and egress are in bits/second
// * cidr is expected to be a valid network CIDR (e.g. '1.2.3.4/32' or '10.20.0.1/16')
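
The hunk above is cut off after the Limit doc comment. Reconstructed from the five methods the FakeShaper implements, the renamed interface presumably reads roughly as follows; treat this as an approximation, with paraphrased comments, rather than a verbatim copy of pkg/util/bandwidth:

```go
package bandwidth

import "k8s.io/apimachinery/pkg/api/resource"

// Shaper, as sketched here, is reconstructed from the methods on FakeShaper.
type Shaper interface {
	// Limit constrains egress/ingress bandwidth (bits/second) for a CIDR.
	Limit(cidr string, egress, ingress *resource.Quantity) error
	// Reset removes any bandwidth limits for the given CIDR.
	Reset(cidr string) error
	// ReconcileInterface brings the network interface's traffic-shaping
	// configuration in line with the desired state.
	ReconcileInterface() error
	// ReconcileCIDR brings a single CIDR's shaping rules in line with the
	// desired egress/ingress limits.
	ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error
	// GetCIDRs returns the CIDRs currently being shaped.
	GetCIDRs() ([]string, error)
}
```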

View File

@ -44,6 +44,7 @@ type tcShaper struct {
iface string
}
// NewTCShaper makes a new tcShaper for the given interface
func NewTCShaper(iface string) BandwidthShaper {
shaper := &tcShaper{
e: exec.New(),
@ -157,12 +158,11 @@ func (t *tcShaper) findCIDRClass(cidr string) (classAndHandleList [][]string, fo
// filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1
if len(parts) != 19 {
return classAndHandleList, false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts)
} else {
}
resultTmp := []string{parts[18], parts[9]}
classAndHandleList = append(classAndHandleList, resultTmp)
}
}
}
if len(classAndHandleList) > 0 {
return classAndHandleList, true, nil
}

View File

@ -27,7 +27,8 @@ import (
type unsupportedShaper struct {
}
func NewTCShaper(iface string) BandwidthShaper {
// NewTCShaper makes a new unsupportedShaper for the given interface
func NewTCShaper(iface string) Shaper {
return &unsupportedShaper{}
}

View File

@ -35,6 +35,7 @@ func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
return nil
}
// ExtractPodBandwidthResources extracts the ingress and egress from the given pod annotations
func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) {
if podAnnotations == nil {
return nil, nil, nil

View File

@ -234,6 +234,8 @@ type Cacher struct {
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config Config) *Cacher {
stopCh := make(chan struct{})
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@ -245,7 +247,6 @@ func NewCacherFromConfig(config Config) *Cacher {
panic("storage codec doesn't seem to match given type: " + err.Error())
}
stopCh := make(chan struct{})
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
@ -774,12 +775,9 @@ func (c *Cacher) isStopped() bool {
// Stop implements the graceful termination.
func (c *Cacher) Stop() {
// avoid stopping twice (note: cachers are shared with subresources)
if c.isStopped() {
return
}
c.stopLock.Lock()
if c.stopped {
// avoid stopping twice (note: cachers are shared with subresources)
c.stopLock.Unlock()
return
}
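
The Stop hunk drops the unlocked isStopped fast path and keeps only the check made under stopLock, so two concurrent callers cannot both observe stopped == false. A minimal, self-contained sketch of that stop-once pattern; the component type and its fields are illustrative stand-ins for the Cacher:

```go
package main

import "sync"

// component flips a flag and closes a channel exactly once; the
// "already stopped?" check happens under the same lock that flips the flag.
type component struct {
	stopLock sync.Mutex
	stopped  bool
	stopCh   chan struct{}
}

func (c *component) Stop() {
	c.stopLock.Lock()
	if c.stopped {
		// Avoid stopping twice (the channel is already closed).
		c.stopLock.Unlock()
		return
	}
	c.stopped = true
	c.stopLock.Unlock()
	close(c.stopCh)
}

func main() {
	c := &component{stopCh: make(chan struct{})}
	c.Stop()
	c.Stop() // no-op on the second call
}
```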

View File

@ -7,7 +7,7 @@ library install, don't mind getting HEAD (which may be less stable than a
particular release), then simply:
```sh
$ go get k8s.io/client-go@master
go get k8s.io/client-go@master
```
This will record a dependency on `k8s.io/client-go` in your go module.
@ -24,12 +24,12 @@ If you are using a version of go prior to 1.11, or do not wish to use
go modules, you can download `k8s.io/client-go` to your `$GOPATH` instead:
```sh
$ go get -u k8s.io/client-go/...
$ go get -u k8s.io/apimachinery/...
$ cd $GOPATH/src/k8s.io/client-go
$ git checkout v11.0.0
$ cd $GOPATH/src/k8s.io/apimachinery
$ git checkout kubernetes-1.14.0
go get -u k8s.io/client-go/...
go get -u k8s.io/apimachinery/...
cd $GOPATH/src/k8s.io/client-go
git checkout v11.0.0
cd $GOPATH/src/k8s.io/apimachinery
git checkout kubernetes-1.14.0
```
This downloads a version of `k8s.io/client-go` prior to v1.12.0,

View File

@ -37,7 +37,7 @@ kubectl create clusterrolebinding default-view --clusterrole=view --serviceaccou
Then, run the image in a Pod with a single instance Deployment:
$ kubectl run --rm -i demo --image=in-cluster --image-pull-policy=Never
kubectl run --rm -i demo --image=in-cluster --image-pull-policy=Never
There are 4 pods in the cluster
There are 4 pods in the cluster

View File

@ -8,13 +8,13 @@ Run the following three commands in separate terminals. Each terminal needs a un
```bash
# first terminal
$ go run *.go -kubeconfig=/my/config -logtostderr=true -id=1
go run *.go -kubeconfig=/my/config -logtostderr=true -id=1
# second terminal
$ go run *.go -kubeconfig=/my/config -logtostderr=true -id=2
go run *.go -kubeconfig=/my/config -logtostderr=true -id=2
# third terminal
$ go run *.go -kubeconfig=/my/config -logtostderr=true -id=3
go run *.go -kubeconfig=/my/config -logtostderr=true -id=3
```
> You can ignore the `-kubeconfig` flag if you are running these commands in the Kubernetes cluster.

View File

@ -22,7 +22,7 @@ Run this application with:
Running this application will use the kubeconfig file and then authenticate to the
cluster, and print the number of pods in the cluster every 10 seconds:
$ ./app
./app
There are 3 pods in the cluster
There are 3 pods in the cluster
There are 3 pods in the cluster
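
For reference, a condensed sketch of the flow this README describes: build a client config from the kubeconfig flag, create a clientset, and print the pod count every 10 seconds. It mirrors the upstream example only loosely; the flag wiring and error handling are simplified assumptions:

```go
package main

import (
	"flag"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	kubeconfig := flag.String("kubeconfig", "", "path to the kubeconfig file")
	flag.Parse()

	// Build a client config from the kubeconfig on disk, then a typed clientset.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Poll the API server and report the pod count, as the README describes.
	for {
		pods, err := clientset.CoreV1().Pods("").List(metav1.ListOptions{})
		if err != nil {
			panic(err)
		}
		fmt.Printf("There are %d pods in the cluster\n", len(pods.Items))
		time.Sleep(10 * time.Second)
	}
}
```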

View File

@ -17,6 +17,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
],
)

View File

@ -25,9 +25,11 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)
const defaultPageSize = 500
const defaultPageBufferSize = 10
// ListPageFunc returns a list object for the given list options.
type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
@ -48,6 +50,9 @@ type ListPager struct {
PageFn ListPageFunc
FullListIfExpired bool
// Number of pages to buffer
PageBufferSize int32
}
// New creates a new pager from the provided pager function using the default
@ -58,6 +63,7 @@ func New(fn ListPageFunc) *ListPager {
PageSize: defaultPageSize,
PageFn: fn,
FullListIfExpired: true,
PageBufferSize: defaultPageBufferSize,
}
}
@ -73,6 +79,12 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
}
var list *metainternalversion.List
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
obj, err := p.PageFn(ctx, options)
if err != nil {
if !errors.IsResourceExpired(err) || !p.FullListIfExpired {
@ -115,3 +127,105 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
options.Continue = m.GetContinue()
}
}
// EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
// fn returns an error, processing stops and that error is returned. If fn does not return an error,
// any error encountered while retrieving the list from the server is returned. If the context
// cancels or times out, the context error is returned. Since the list is retrieved in paginated
// chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
// requests exceed the expiration limit of the apiserver being called.
//
// Items are retrieved in chunks from the server to reduce the impact on the server with up to
// ListPager.PageBufferSize chunks buffered concurrently in the background.
func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
return meta.EachListItem(obj, fn)
})
}
// eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
// each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
// not return an error, any error encountered while retrieving the list from the server is
// returned. If the context cancels or times out, the context error is returned. Since the list is
// retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
// the pagination list requests exceed the expiration limit of the apiserver being called.
//
// Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
if p.PageBufferSize < 0 {
return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
}
// Ensure background goroutine is stopped if this call exits before all list items are
// processed. Cancelation error from this deferred cancel call is never returned to caller;
// either the list result has already been sent to bgResultC or the fn error is returned and
// the cancelation error is discarded.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
chunkC := make(chan runtime.Object, p.PageBufferSize)
bgResultC := make(chan error, 1)
go func() {
defer utilruntime.HandleCrash()
var err error
defer func() {
close(chunkC)
bgResultC <- err
}()
err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
select {
case chunkC <- chunk: // buffer the chunk, this can block
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}()
for o := range chunkC {
err := fn(o)
if err != nil {
return err // any fn error should be returned immediately
}
}
// promote the results of our background goroutine to the foreground
return <-bgResultC
}
// eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
// chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
// an error, any error encountered while retrieving the list from the server is returned. If the
// context cancels or times out, the context error is returned. Since the list is retrieved in
// paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
// pagination list requests exceed the expiration limit of the apiserver being called.
func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
if options.Limit == 0 {
options.Limit = p.PageSize
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
obj, err := p.PageFn(ctx, options)
if err != nil {
return err
}
m, err := meta.ListAccessor(obj)
if err != nil {
return fmt.Errorf("returned object must be a list: %v", err)
}
if err := fn(obj); err != nil {
return err
}
// if we have no more items, return.
if len(m.GetContinue()) == 0 {
return nil
}
// set the next loop up
options.Continue = m.GetContinue()
}
}
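
A hedged usage sketch of the new EachListItem: page Pods through SimplePageFunc and handle one object per callback. The in-cluster config, the all-namespaces listing, and the error handling are illustrative assumptions, not part of the commit:

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/pager"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Each page is fetched with the Limit/Continue machinery shown above and
	// buffered up to PageBufferSize pages ahead; fn sees one Pod at a time.
	p := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		return clientset.CoreV1().Pods(metav1.NamespaceAll).List(opts)
	}))

	err = p.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
		pod, ok := obj.(*v1.Pod)
		if !ok {
			return fmt.Errorf("unexpected object type %T", obj)
		}
		fmt.Println(pod.Namespace + "/" + pod.Name)
		return nil
	})
	if err != nil {
		panic(err)
	}
}
```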

View File

@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
@ -115,7 +116,6 @@ func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options
}
return p.PagedList(ctx, options)
}
func TestListPager_List(t *testing.T) {
type fields struct {
PageSize int64
@ -189,7 +189,11 @@ func TestListPager_List(t *testing.T) {
PageFn: tt.fields.PageFn,
FullListIfExpired: tt.fields.FullListIfExpired,
}
got, err := p.List(tt.args.ctx, tt.args.options)
ctx := tt.args.ctx
if ctx == nil {
ctx = context.Background()
}
got, err := p.List(ctx, tt.args.options)
if (err != nil) != tt.wantErr {
t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr)
return
@ -204,3 +208,237 @@ func TestListPager_List(t *testing.T) {
})
}
}
func TestListPager_EachListItem(t *testing.T) {
type fields struct {
PageSize int64
PageFn ListPageFunc
}
tests := []struct {
name string
fields fields
want runtime.Object
wantErr bool
wantPanic bool
isExpired bool
processorErrorOnItem int
processorPanicOnItem int
cancelContextOnItem int
}{
{
name: "empty page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList},
want: list(0, "rv:20"),
},
{
name: "one page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList},
want: list(9, "rv:20"),
},
{
name: "one full page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList},
want: list(10, "rv:20"),
},
{
name: "two pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
want: list(11, "rv:20"),
},
{
name: "three pages",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList},
want: list(21, "rv:20"),
},
{
name: "expires on second page",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage},
want: list(10, "rv:20"), // all items on the first page should have been visited
wantErr: true,
isExpired: true,
},
{
name: "error processing item",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
wantErr: true,
processorErrorOnItem: 3,
},
{
name: "cancel context while processing",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
wantErr: true,
cancelContextOnItem: 3,
},
{
name: "panic processing item",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 51, rv: "rv:20"}).PagedList},
want: list(3, "rv:20"), // all the items <= the one the processor returned an error on should have been visited
wantPanic: true,
processorPanicOnItem: 3,
},
}
processorErr := fmt.Errorf("processor error")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
p := &ListPager{
PageSize: tt.fields.PageSize,
PageFn: tt.fields.PageFn,
}
var items []runtime.Object
fn := func(obj runtime.Object) error {
items = append(items, obj)
if tt.processorErrorOnItem > 0 && len(items) == tt.processorErrorOnItem {
return processorErr
}
if tt.processorPanicOnItem > 0 && len(items) == tt.processorPanicOnItem {
panic(processorErr)
}
if tt.cancelContextOnItem > 0 && len(items) == tt.cancelContextOnItem {
cancel()
}
return nil
}
var err error
var panic interface{}
func() {
defer func() {
panic = recover()
}()
err = p.EachListItem(ctx, metav1.ListOptions{}, fn)
}()
if (panic != nil) && !tt.wantPanic {
t.Fatalf(".EachListItem() panic = %v, wantPanic %v", panic, tt.wantPanic)
}
if panic != nil {
return
}
if (err != nil) != tt.wantErr {
t.Errorf("ListPager.EachListItem() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.isExpired != errors.IsResourceExpired(err) {
t.Errorf("ListPager.EachListItem() error = %v, isExpired %v", err, tt.isExpired)
return
}
if tt.processorErrorOnItem > 0 && err != processorErr {
t.Errorf("ListPager.EachListItem() error = %v, processorErrorOnItem %d", err, tt.processorErrorOnItem)
return
}
l := tt.want.(*metainternalversion.List)
if !reflect.DeepEqual(items, l.Items) {
t.Errorf("ListPager.EachListItem() = %v, want %v", items, l.Items)
}
})
}
}
func TestListPager_eachListPageBuffered(t *testing.T) {
tests := []struct {
name string
totalPages int
pagesProcessed int
wantPageLists int
pageBufferSize int32
pageSize int
}{
{
name: "no buffer, one total page",
totalPages: 1,
pagesProcessed: 1,
wantPageLists: 1,
pageBufferSize: 0,
}, {
name: "no buffer, 1/5 pages processed",
totalPages: 5,
pagesProcessed: 1,
wantPageLists: 2, // 1 received for processing, 1 listed
pageBufferSize: 0,
},
{
name: "no buffer, 2/5 pages processed",
totalPages: 5,
pagesProcessed: 2,
wantPageLists: 3,
pageBufferSize: 0,
},
{
name: "no buffer, 5/5 pages processed",
totalPages: 5,
pagesProcessed: 5,
wantPageLists: 5,
pageBufferSize: 0,
},
{
name: "size 1 buffer, 1/5 pages processed",
totalPages: 5,
pagesProcessed: 1,
wantPageLists: 3,
pageBufferSize: 1,
},
{
name: "size 1 buffer, 5/5 pages processed",
totalPages: 5,
pagesProcessed: 5,
wantPageLists: 5,
pageBufferSize: 1,
},
{
name: "size 10 buffer, 1/5 page processed",
totalPages: 5,
pagesProcessed: 1,
wantPageLists: 5,
pageBufferSize: 10, // buffer is larger than list
},
}
processorErr := fmt.Errorf("processor error")
pageSize := 10
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pgr := &testPager{t: t, expectPage: int64(pageSize), remaining: tt.totalPages * pageSize, rv: "rv:20"}
pageLists := 0
wantedPageListsDone := make(chan struct{})
listFn := func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
pageLists++
if pageLists == tt.wantPageLists {
close(wantedPageListsDone)
}
return pgr.PagedList(ctx, options)
}
p := &ListPager{
PageSize: int64(pageSize),
PageBufferSize: tt.pageBufferSize,
PageFn: listFn,
}
pagesProcessed := 0
fn := func(obj runtime.Object) error {
pagesProcessed++
if tt.pagesProcessed == pagesProcessed && tt.wantPageLists > 0 {
// wait for buffering to catch up
select {
case <-time.After(time.Second):
return fmt.Errorf("Timed out waiting for %d page lists", tt.wantPageLists)
case <-wantedPageListsDone:
}
return processorErr
}
return nil
}
err := p.eachListChunkBuffered(context.Background(), metav1.ListOptions{}, fn)
if tt.pagesProcessed > 0 && err == processorErr {
// expected
} else if err != nil {
t.Fatal(err)
}
if tt.wantPageLists > 0 && pageLists != tt.wantPageLists {
t.Errorf("expected %d page lists, got %d", tt.wantPageLists, pageLists)
}
if pagesProcessed != tt.pagesProcessed {
t.Errorf("expected %d pages processed, got %d", tt.pagesProcessed, pagesProcessed)
}
})
}
}

View File

@ -43,7 +43,7 @@ import (
testutils "k8s.io/kubernetes/test/utils"
imageutils "k8s.io/kubernetes/test/utils/image"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo"
)
const (
@ -65,12 +65,13 @@ const (
// on AWS. A few minutes is typical, so use 10m.
LoadBalancerLagTimeoutAWS = 10 * time.Minute
// How long to wait for a load balancer to be created/modified.
//TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
// LoadBalancerCreateTimeoutDefault is the default time to wait for a load balancer to be created/modified.
// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
LoadBalancerCreateTimeoutDefault = 20 * time.Minute
// LoadBalancerCreateTimeoutLarge is the maximum time to wait for a load balancer to be created/modified.
LoadBalancerCreateTimeoutLarge = 2 * time.Hour
// Time required by the loadbalancer to cleanup, proportional to numApps/Ing.
// LoadBalancerCleanupTimeout is the time required by the loadbalancer to clean up, proportional to numApps/Ing.
// Bring the cleanup timeout back down to 5m once b/33588344 is resolved.
LoadBalancerCleanupTimeout = 15 * time.Minute
@ -97,10 +98,10 @@ const (
AffinityConfirmCount = 15
)
// This should match whatever the default/configured range is
// ServiceNodePortRange should match whatever the default/configured range is
var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
// A test jig to help service testing.
// ServiceTestJig is a test jig to help service testing.
type ServiceTestJig struct {
ID string
Name string
@ -255,7 +256,7 @@ func (j *ServiceTestJig) ChangeServiceType(namespace, name string, newType v1.Se
// If createPod is true, it also creates an RC with 1 replica of
// the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName string, createPod bool) *v1.Service {
By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=NodePort and ExternalTrafficPolicy=Local")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
@ -263,7 +264,7 @@ func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName s
})
if createPod {
By("creating a pod to be part of the service " + serviceName)
ginkgo.By("creating a pod to be part of the service " + serviceName)
j.RunOrFail(namespace, nil)
}
j.SanityCheckService(svc, v1.ServiceTypeNodePort)
@ -276,7 +277,7 @@ func (j *ServiceTestJig) CreateOnlyLocalNodePortService(namespace, serviceName s
// the standard netexec container used everywhere in this test.
func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
tweak func(svc *v1.Service)) *v1.Service {
By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and ExternalTrafficPolicy=Local")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
@ -288,10 +289,10 @@ func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceNa
})
if createPod {
By("creating a pod to be part of the service " + serviceName)
ginkgo.By("creating a pod to be part of the service " + serviceName)
j.RunOrFail(namespace, nil)
}
By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
return svc
@ -300,7 +301,7 @@ func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceNa
// CreateLoadBalancerService creates a loadbalancer service and waits
// for it to acquire an ingress IP.
func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
ginkgo.By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer")
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
@ -310,7 +311,7 @@ func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string
}
})
By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
ginkgo.By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer)
return svc
@ -402,7 +403,7 @@ func (j *ServiceTestJig) GetEndpointNodes(svc *v1.Service) map[string][]string {
return nodeMap
}
// getNodes returns the first maxNodesForTest nodes. Useful in large clusters
// GetNodes returns the first maxNodesForTest nodes. Useful in large clusters
// where we don't, e.g., want to create an endpoint per node.
func (j *ServiceTestJig) GetNodes(maxNodesForTest int) (nodes *v1.NodeList) {
nodes = GetReadySchedulableNodesOrDie(j.Client)
@ -1053,13 +1054,13 @@ func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, requ
return nil
}
// Simple helper class to avoid too much boilerplate in tests
// ServiceTestFixture is a simple helper class to avoid too much boilerplate in tests
type ServiceTestFixture struct {
ServiceName string
Namespace string
Client clientset.Interface
TestId string
TestID string
Labels map[string]string
rcs map[string]bool
@ -1073,9 +1074,9 @@ func NewServerTest(client clientset.Interface, namespace string, serviceName str
t.Client = client
t.Namespace = namespace
t.ServiceName = serviceName
t.TestId = t.ServiceName + "-" + string(uuid.NewUUID())
t.TestID = t.ServiceName + "-" + string(uuid.NewUUID())
t.Labels = map[string]string{
"testid": t.TestId,
"testid": t.TestID,
}
t.rcs = make(map[string]bool)
@ -1087,7 +1088,7 @@ func NewServerTest(client clientset.Interface, namespace string, serviceName str
return t
}
// Build default config for a service (which can then be changed)
// BuildServiceSpec builds default config for a service (which can then be changed)
func (t *ServiceTestFixture) BuildServiceSpec() *v1.Service {
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
@ -1128,7 +1129,7 @@ func (t *ServiceTestFixture) CreateRC(rc *v1.ReplicationController) (*v1.Replica
return rc, err
}
// Create a service, and record it for cleanup
// CreateService creates a service, and records it for cleanup
func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, error) {
result, err := t.Client.CoreV1().Services(t.Namespace).Create(service)
if err == nil {
@ -1137,7 +1138,7 @@ func (t *ServiceTestFixture) CreateService(service *v1.Service) (*v1.Service, er
return result, err
}
// Delete a service, and remove it from the cleanup list
// DeleteService deletes a service, and removes it from the cleanup list
func (t *ServiceTestFixture) DeleteService(serviceName string) error {
err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
if err == nil {
@ -1149,7 +1150,7 @@ func (t *ServiceTestFixture) DeleteService(serviceName string) error {
func (t *ServiceTestFixture) Cleanup() []error {
var errs []error
for rcName := range t.rcs {
By("stopping RC " + rcName + " in namespace " + t.Namespace)
ginkgo.By("stopping RC " + rcName + " in namespace " + t.Namespace)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// First, resize the RC to 0.
old, err := t.Client.CoreV1().ReplicationControllers(t.Namespace).Get(rcName, metav1.GetOptions{})
@ -1182,7 +1183,7 @@ func (t *ServiceTestFixture) Cleanup() []error {
}
for serviceName := range t.services {
By("deleting service " + serviceName + " in namespace " + t.Namespace)
ginkgo.By("deleting service " + serviceName + " in namespace " + t.Namespace)
err := t.Client.CoreV1().Services(t.Namespace).Delete(serviceName, nil)
if err != nil {
if !errors.IsNotFound(err) {
@ -1281,7 +1282,7 @@ func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUI
}
func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) {
By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
i := 1
for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) {
endpoints, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
@ -1323,7 +1324,7 @@ func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName strin
func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
podNames := make([]string, replicas)
name := svc.ObjectMeta.Name
By("creating service " + name + " in namespace " + ns)
ginkgo.By("creating service " + name + " in namespace " + ns)
_, err := c.CoreV1().Services(ns).Create(svc)
if err != nil {
return podNames, "", err
@ -1420,7 +1421,7 @@ func VerifyServeHostnameServiceUp(c clientset.Interface, ns, host string, expect
}
expectedEndpoints := sets.NewString(expectedPods...)
By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
for _, cmdFunc := range commands {
passed := false
gotEndpoints := sets.NewString()
@ -1567,9 +1568,9 @@ func checkAffinityFailed(tracker affinityTracker, err string) {
// number of same response observed in a row. If affinity is not expected, the
// test will keep observe until different responses observed. The function will
// return false only in case of unexpected errors.
func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, targetPort int, shouldHold bool) bool {
targetIpPort := net.JoinHostPort(targetIp, strconv.Itoa(targetPort))
cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIpPort)
func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIP string, targetPort int, shouldHold bool) bool {
targetIPPort := net.JoinHostPort(targetIP, strconv.Itoa(targetPort))
cmd := fmt.Sprintf(`wget -qO- http://%s/ -T 2`, targetIPPort)
timeout := ServiceTestTimeout
if execPod == nil {
timeout = LoadBalancerPollTimeout
@ -1577,14 +1578,14 @@ func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, target
var tracker affinityTracker
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
if execPod != nil {
if stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd); err != nil {
Logf("Failed to get response from %s. Retry until timeout", targetIpPort)
stdout, err := RunHostCmd(execPod.Namespace, execPod.Name, cmd)
if err != nil {
Logf("Failed to get response from %s. Retry until timeout", targetIPPort)
return false, nil
} else {
tracker.recordHost(stdout)
}
tracker.recordHost(stdout)
} else {
rawResponse := jig.GetHTTPContent(targetIp, targetPort, timeout, "")
rawResponse := jig.GetHTTPContent(targetIP, targetPort, timeout, "")
tracker.recordHost(rawResponse.String())
}
trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
@ -1600,9 +1601,9 @@ func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, target
if pollErr != wait.ErrWaitTimeout {
checkAffinityFailed(tracker, pollErr.Error())
return false
} else {
}
if !trackerFulfilled {
checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIpPort))
checkAffinityFailed(tracker, fmt.Sprintf("Connection to %s timed out or not enough responses.", targetIPPort))
}
if shouldHold {
checkAffinityFailed(tracker, "Affinity should hold but didn't.")
@ -1611,6 +1612,5 @@ func CheckAffinity(jig *ServiceTestJig, execPod *v1.Pod, targetIp string, target
}
return true
}
}
return true
}