Create chaosmonkey package, use ServiceTestJig for upgrade tests

pull/6/head
Isaac Hollander McCreery 2016-04-06 15:12:31 -07:00
parent f29d597d02
commit 4e5e1b8ac5
4 changed files with 362 additions and 274 deletions

View File

@ -0,0 +1,149 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package chaosmonkey
import . "github.com/onsi/ginkgo"
// Disruption is the type to construct a chaosmonkey with; see Do for more information.
type Disruption func()
// Test is the type to register with a chaosmonkey. A test will run asynchronously across the
// chaosmonkey's Disruption. A Test takes a Semaphore as an argument. It should call sem.Ready()
// once it's ready for the disruption to start and should then wait until sem.StopCh (which is a
// <-chan struct{}) is closed, which signals that the disruption is over. It should then clean up
// and return. See Do and Semaphore for more information.
type Test func(sem *Semaphore)
// Interface can be implemented if you prefer to define tests without dealing with a Semaphore. You
// may define a struct that implements Interface's three methods (Setup, Test, and Teardown) and
// RegisterInterface. See RegisterInterface for more information.
type Interface interface {
Setup()
Test(stopCh <-chan struct{})
Teardown()
}
type chaosmonkey struct {
disruption Disruption
tests []Test
}
// New creates and returns a chaosmonkey, with which the caller should register Tests and call Do.
// See Do for more information.
func New(disruption Disruption) *chaosmonkey {
return &chaosmonkey{
disruption,
[]Test{},
}
}
// Register registers the given Test with the chaosmonkey, so that the test will run over the
// Disruption.
func (cm *chaosmonkey) Register(test Test) {
cm.tests = append(cm.tests, test)
}
// RegisterInterface registers the given Interface with the chaosmonkey, so the chaosmonkey will
// call Setup, Test, and Teardown properly. Test can tell that the Disruption is finished when
// stopCh is closed.
func (cm *chaosmonkey) RegisterInterface(in Interface) {
cm.Register(func(sem *Semaphore) {
in.Setup()
sem.Ready()
in.Test(sem.StopCh)
in.Teardown()
})
}
// Do performs the Disruption while testing the registered Tests. Once the caller has registered
// all Tests with the chaosmonkey, they call Do. Do starts each registered test asynchronously and
// waits for each test to signal that it is ready by calling sem.Ready(). Do will then do the
// Disruption, and when it's complete, close sem.StopCh to signal to the registered Tests that the
// Disruption is over, and wait for all Tests to return.
func (cm *chaosmonkey) Do() {
sems := []*Semaphore{}
// All semaphores have the same StopCh.
stopCh := make(chan struct{})
for _, test := range cm.tests {
sem := newSemaphore(stopCh)
sems = append(sems, sem)
go func() {
defer GinkgoRecover()
defer sem.done()
test(sem)
}()
}
By("Waiting for all async tests to be ready")
for _, sem := range sems {
sem.waitForReadyOrDone()
}
By("Starting disruption")
cm.disruption()
By("Disruption complete; stopping async validations")
close(stopCh)
By("Waiting for async validations to complete")
for _, sem := range sems {
sem.waitForDone()
}
}
// Semaphore is taken by a Test and provides: Ready(), for the Test to call when it's ready for the
// disruption to start; and StopCh, the closure of which signals to the Test that the disruption is
// finished.
type Semaphore struct {
readyCh chan struct{}
StopCh <-chan struct{}
doneCh chan struct{}
}
func newSemaphore(stopCh <-chan struct{}) *Semaphore {
// We don't want to block on Ready() or done()
return &Semaphore {
make(chan struct{}, 1),
stopCh,
make(chan struct{}, 1),
}
}
// Ready is called by the Test to signal that the Test is ready for the disruption to start.
func (sem *Semaphore) Ready() {
close(sem.readyCh)
}
// done is an internal method for Go to defer, both to wait for all tests to return, but also to
// sense if a test panicked before calling Ready. See waitForReadyOrDone.
func (sem *Semaphore) done() {
close(sem.doneCh)
}
// We would like to just check if all tests are ready, but if they fail (which Ginkgo implements as
// a panic), they may not have called Ready(). We check done as well to see if the function has
// already returned; if it has, we don't care if it's ready, and just continue.
func (sem *Semaphore) waitForReadyOrDone() {
select {
case <-sem.readyCh:
case <-sem.doneCh:
}
}
// waitForDone is an internal method for Go to wait on all Tests returning.
func (sem *Semaphore) waitForDone() {
<-sem.doneCh
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package chaosmonkey
import (
"sync/atomic"
"testing"
)
func TestDoWithPanic(t *testing.T) {
var counter int64 = 0
cm := New(func() {})
tests := []Test{
// No panic
func(sem *Semaphore) {
defer atomic.AddInt64(&counter, 1)
sem.Ready()
},
// Panic after sem.Ready()
func(sem *Semaphore) {
defer atomic.AddInt64(&counter, 1)
sem.Ready()
panic("Panic after calling sem.Ready()")
},
// Panic before sem.Ready()
func(sem *Semaphore) {
defer atomic.AddInt64(&counter, 1)
panic("Panic before calling sem.Ready()")
},
}
for _, test := range tests {
cm.Register(test)
}
cm.Do()
// Check that all funcs in tests were called.
if int(counter) != len(tests) {
t.Errorf("Expected counter to be %v, but it was %v", len(tests), counter)
}
}

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2015 The Kubernetes Authors All rights reserved. Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -20,30 +20,118 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"net/http"
"os" "os"
"os/exec" "os/exec"
"path" "path"
"strings" "strings"
"sync"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/chaosmonkey"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
) )
// TODO(mikedanese): Add setup, validate, and teardown for:
// - secrets
// - volumes
// - persistent volumes
var _ = framework.KubeDescribe("Upgrade [Feature:Upgrade]", func() {
f := framework.NewDefaultFramework("cluster-upgrade")
framework.KubeDescribe("master upgrade", func() {
It("should maintain responsive services [Feature:MasterUpgrade]", func() {
cm := chaosmonkey.New(func() {
v, err := realVersion(framework.TestContext.UpgradeTarget)
framework.ExpectNoError(err)
framework.ExpectNoError(masterUpgrade(v))
framework.ExpectNoError(checkMasterVersion(f.Client, v))
})
cm.Register(func(sem *chaosmonkey.Semaphore) {
// Close over f.
testServiceRemainsUp(f, sem)
})
cm.Do()
})
})
framework.KubeDescribe("node upgrade", func() {
It("should maintain a functioning cluster [Feature:NodeUpgrade]", func() {
cm := chaosmonkey.New(func() {
v, err := realVersion(framework.TestContext.UpgradeTarget)
framework.ExpectNoError(err)
framework.ExpectNoError(nodeUpgrade(f, v))
framework.ExpectNoError(checkNodesVersions(f.Client, v))
})
cm.Register(func(sem *chaosmonkey.Semaphore) {
// Close over f.
testServiceUpBeforeAndAfter(f, sem)
})
cm.Do()
})
It("should maintain responsive services [Feature:ExperimentalNodeUpgrade]", func() {
cm := chaosmonkey.New(func() {
v, err := realVersion(framework.TestContext.UpgradeTarget)
framework.ExpectNoError(err)
framework.ExpectNoError(nodeUpgrade(f, v))
framework.ExpectNoError(checkNodesVersions(f.Client, v))
})
cm.Register(func(sem *chaosmonkey.Semaphore) {
// Close over f.
testServiceRemainsUp(f, sem)
})
cm.Do()
})
})
framework.KubeDescribe("cluster upgrade", func() {
It("should maintain a functioning cluster [Feature:ClusterUpgrade]", func() {
cm := chaosmonkey.New(func() {
v, err := realVersion(framework.TestContext.UpgradeTarget)
framework.ExpectNoError(err)
framework.ExpectNoError(masterUpgrade(v))
framework.ExpectNoError(checkMasterVersion(f.Client, v))
framework.ExpectNoError(nodeUpgrade(f, v))
framework.ExpectNoError(checkNodesVersions(f.Client, v))
})
cm.Register(func(sem *chaosmonkey.Semaphore) {
// Close over f.
testServiceUpBeforeAndAfter(f, sem)
})
cm.Do()
})
It("should maintain responsive services [Feature:ExperimentalClusterUpgrade]", func() {
cm := chaosmonkey.New(func() {
v, err := realVersion(framework.TestContext.UpgradeTarget)
framework.ExpectNoError(err)
framework.ExpectNoError(masterUpgrade(v))
framework.ExpectNoError(checkMasterVersion(f.Client, v))
framework.ExpectNoError(nodeUpgrade(f, v))
framework.ExpectNoError(checkNodesVersions(f.Client, v))
})
cm.Register(func(sem *chaosmonkey.Semaphore) {
// Close over f.
testServiceRemainsUp(f, sem)
})
cm.Do()
})
})
})
// realVersion turns a version constant s into a version string deployable on // realVersion turns a version constant s into a version string deployable on
// GKE. See hack/get-build.sh for more information. // GKE. See hack/get-build.sh for more information.
func realVersion(s string) (string, error) { func realVersion(s string) (string, error) {
framework.Logf(fmt.Sprintf("Getting real version for %q", s))
v, _, err := runCmd(path.Join(framework.TestContext.RepoRoot, "hack/get-build.sh"), "-v", s) v, _, err := runCmd(path.Join(framework.TestContext.RepoRoot, "hack/get-build.sh"), "-v", s)
if err != nil { if err != nil {
return v, err return v, err
} }
framework.Logf("Version for %q is %q", s, v)
return strings.TrimPrefix(strings.TrimSpace(v), "v"), nil return strings.TrimPrefix(strings.TrimSpace(v), "v"), nil
} }
@ -80,7 +168,7 @@ func masterUpgradeGKE(v string) error {
return err return err
} }
var nodeUpgrade = func(f *framework.Framework, replicas int32, v string) error { var nodeUpgrade = func(f *framework.Framework, v string) error {
// Perform the upgrade. // Perform the upgrade.
var err error var err error
switch framework.TestContext.Provider { switch framework.TestContext.Provider {
@ -95,7 +183,7 @@ var nodeUpgrade = func(f *framework.Framework, replicas int32, v string) error {
return err return err
} }
// Wait for it to complete and validate nodes and pods are healthy. // Wait for it to complete and validate nodes are healthy.
// //
// TODO(ihmccreery) We shouldn't have to wait for nodes to be ready in // TODO(ihmccreery) We shouldn't have to wait for nodes to be ready in
// GKE; the operation shouldn't return until they all are. // GKE; the operation shouldn't return until they all are.
@ -103,8 +191,7 @@ var nodeUpgrade = func(f *framework.Framework, replicas int32, v string) error {
if _, err := checkNodesReady(f.Client, restartNodeReadyAgainTimeout, framework.TestContext.CloudConfig.NumNodes); err != nil { if _, err := checkNodesReady(f.Client, restartNodeReadyAgainTimeout, framework.TestContext.CloudConfig.NumNodes); err != nil {
return err return err
} }
framework.Logf("Waiting up to %v for all pods to be running and ready after the upgrade", restartPodReadyAgainTimeout) return nil
return framework.WaitForPodsRunningReady(f.Namespace.Name, replicas, restartPodReadyAgainTimeout)
} }
func nodeUpgradeGCE(rawV string) error { func nodeUpgradeGCE(rawV string) error {
@ -170,191 +257,77 @@ func nodeUpgradeGKE(v string) error {
return err return err
} }
var _ = framework.KubeDescribe("Upgrade [Feature:Upgrade]", func() { func testServiceUpBeforeAndAfter(f *framework.Framework, sem *chaosmonkey.Semaphore) {
testService(f, sem, false)
}
svcName, replicas := "baz", int32(2) func testServiceRemainsUp(f *framework.Framework, sem *chaosmonkey.Semaphore) {
var rcName, ip, v string testService(f, sem, true)
var ingress api.LoadBalancerIngress }
BeforeEach(func() { // testService is a helper for testServiceUpBeforeAndAfter and testServiceRemainsUp with a flag for testDuringUpgrade
// The version is determined once at the beginning of the test so that //
// the master and nodes won't be skewed if the value changes during the // TODO(ihmccreery) remove this abstraction once testServiceUpBeforeAndAfter is no longer needed, because node upgrades
// test. // maintain a responsive service.
By(fmt.Sprintf("Getting real version for %q", framework.TestContext.UpgradeTarget)) func testService(f *framework.Framework, sem *chaosmonkey.Semaphore, testDuringUpgrade bool) {
var err error // Setup
v, err = realVersion(framework.TestContext.UpgradeTarget) serviceName := "service-test"
framework.ExpectNoError(err)
framework.Logf("Version for %q is %q", framework.TestContext.UpgradeTarget, v) jig := NewServiceTestJig(f.Client, serviceName)
// nodeIP := pickNodeIP(jig.Client) // for later
By("creating a TCP service " + serviceName + " with type=LoadBalancer in namespace " + f.Namespace.Name)
// TODO it's weird that we have to do this and then wait WaitForLoadBalancer which changes
// tcpService.
tcpService := jig.CreateTCPServiceOrFail(f.Namespace.Name, func(s *api.Service) {
s.Spec.Type = api.ServiceTypeLoadBalancer
}) })
tcpService = jig.WaitForLoadBalancerOrFail(f.Namespace.Name, tcpService.Name)
jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
f := framework.NewDefaultFramework("cluster-upgrade") // Get info to hit it with
var w *ServiceTestFixture tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
BeforeEach(func() { svcPort := tcpService.Spec.Ports[0].Port
By("Setting up the service, RC, and pods")
w = NewServerTest(f.Client, f.Namespace.Name, svcName)
rc := w.CreateWebserverRC(replicas)
rcName = rc.ObjectMeta.Name
svc := w.BuildServiceSpec()
svc.Spec.Type = api.ServiceTypeLoadBalancer
w.CreateService(svc)
By("Waiting for the service to become reachable") By("creating pod to be part of service " + serviceName)
result, err := waitForLoadBalancerIngress(f.Client, svcName, f.Namespace.Name) // TODO newRCTemplate only allows for the creation of one replica... that probably won't
Expect(err).NotTo(HaveOccurred()) // work so well.
ingresses := result.Status.LoadBalancer.Ingress jig.RunOrFail(f.Namespace.Name, nil)
if len(ingresses) != 1 {
framework.Failf("Was expecting only 1 ingress IP but got %d (%v): %v", len(ingresses), ingresses, result)
}
ingress = ingresses[0]
framework.Logf("Got load balancer ingress point %v", ingress)
ip = ingress.IP
if ip == "" {
ip = ingress.Hostname
}
testLoadBalancerReachable(ingress, 80)
// TODO(mikedanese): Add setup, validate, and teardown for: // Hit it once before considering ourselves ready
// - secrets By("hitting the pod through the service's LoadBalancer")
// - volumes jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeoutDefault)
// - persistent volumes
})
AfterEach(func() { sem.Ready()
w.Cleanup()
})
framework.KubeDescribe("master upgrade", func() { if testDuringUpgrade {
It("should maintain responsive services [Feature:MasterUpgrade]", func() { // Continuous validation
By("Validating cluster before master upgrade") wait.Until(func() {
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas)) By("hitting the pod through the service's LoadBalancer")
By("Performing a master upgrade") // TODO this is way too long of a timeout; make it shorter since we've already
testUpgrade(ip, v, masterUpgrade) // validated it's working.
By("Checking master version") jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeoutDefault)
framework.ExpectNoError(checkMasterVersion(f.Client, v)) }, 3*time.Second, sem.StopCh)
By("Validating cluster after master upgrade") } else {
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas)) // Block until chaosmonkey is done
}) By("waiting for upgrade to finish without checking if service remains up")
}) <-sem.StopCh
}
framework.KubeDescribe("node upgrade", func() { // TODO(ihmccreery) We maybe shouldn't have to wait for the pods to be running again. I
It("should maintain a functioning cluster [Feature:NodeUpgrade]", func() { // pulled this over from the NodeUpgrade test, but I'm not sure what the need for it there
By("Validating cluster before node upgrade") // was.
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas)) //
By("Performing a node upgrade") // Also 1 is a magic number from newRCTemplate.
// Circumnavigate testUpgrade, since services don't necessarily stay up. framework.Logf("Waiting up to %v for all pods to be running and ready after the upgrade", restartPodReadyAgainTimeout)
framework.Logf("Starting upgrade") framework.ExpectNoError(framework.WaitForPodsRunningReady(f.Namespace.Name, 1, restartPodReadyAgainTimeout))
framework.ExpectNoError(nodeUpgrade(f, replicas, v))
framework.Logf("Upgrade complete")
By("Checking node versions")
framework.ExpectNoError(checkNodesVersions(f.Client, v))
By("Validating cluster after node upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
})
It("should maintain responsive services [Feature:ExperimentalNodeUpgrade]", func() { // Validation
By("Validating cluster before node upgrade") jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Performing a node upgrade")
testUpgrade(ip, v, func(v string) error {
return nodeUpgrade(f, replicas, v)
})
By("Checking node versions")
framework.ExpectNoError(checkNodesVersions(f.Client, v))
By("Validating cluster after node upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
})
})
framework.KubeDescribe("cluster upgrade", func() {
It("should maintain responsive services [Feature:ClusterUpgrade]", func() {
By("Validating cluster before master upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Performing a master upgrade")
testUpgrade(ip, v, masterUpgrade)
By("Checking master version")
framework.ExpectNoError(checkMasterVersion(f.Client, v))
By("Validating cluster after master upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Validating cluster before node upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Performing a node upgrade")
// Circumnavigate testUpgrade, since services don't necessarily stay up.
framework.Logf("Starting upgrade")
framework.ExpectNoError(nodeUpgrade(f, replicas, v))
framework.Logf("Upgrade complete")
By("Checking node versions")
framework.ExpectNoError(checkNodesVersions(f.Client, v))
By("Validating cluster after node upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
})
It("should maintain responsive services [Feature:ExperimentalClusterUpgrade]", func() {
By("Validating cluster before master upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Performing a master upgrade")
testUpgrade(ip, v, masterUpgrade)
By("Checking master version")
framework.ExpectNoError(checkMasterVersion(f.Client, v))
By("Validating cluster after master upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Validating cluster before node upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
By("Performing a node upgrade")
testUpgrade(ip, v, func(v string) error {
return nodeUpgrade(f, replicas, v)
})
By("Checking node versions")
framework.ExpectNoError(checkNodesVersions(f.Client, v))
By("Validating cluster after node upgrade")
framework.ExpectNoError(validate(f, svcName, rcName, ingress, replicas))
})
})
})
func testUpgrade(ip, v string, upF func(v string) error) {
framework.Logf("Starting async validation")
httpClient := http.Client{Timeout: 2 * time.Second}
done := make(chan struct{}, 1)
// Let's make sure we've finished the heartbeat before shutting things down.
var wg sync.WaitGroup
go wait.Until(func() {
defer GinkgoRecover()
wg.Add(1)
defer wg.Done()
if err := wait.Poll(framework.Poll, framework.SingleCallTimeout, func() (bool, error) {
r, err := httpClient.Get("http://" + ip)
if err != nil {
framework.Logf("Error reaching %s: %v", ip, err)
return false, nil
}
if r.StatusCode < http.StatusOK || r.StatusCode >= http.StatusNotFound {
framework.Logf("Bad response; status: %d, response: %v", r.StatusCode, r)
return false, nil
}
return true, nil
}); err != nil {
// We log the error here because the test will fail at the very end
// because this validation runs in another goroutine. Without this,
// a failure is very confusing to track down because from the logs
// everything looks fine.
msg := fmt.Sprintf("Failed to contact service during upgrade: %v", err)
framework.Logf(msg)
framework.Failf(msg)
}
}, 200*time.Millisecond, done)
framework.Logf("Starting upgrade")
framework.ExpectNoError(upF(v))
done <- struct{}{}
framework.Logf("Stopping async validation")
wg.Wait()
framework.Logf("Upgrade complete")
} }
func checkMasterVersion(c *client.Client, want string) error { func checkMasterVersion(c *client.Client, want string) error {
framework.Logf("Checking master version")
v, err := c.Discovery().ServerVersion() v, err := c.Discovery().ServerVersion()
if err != nil { if err != nil {
return fmt.Errorf("checkMasterVersion() couldn't get the master version: %v", err) return fmt.Errorf("checkMasterVersion() couldn't get the master version: %v", err)
@ -418,6 +391,9 @@ func runCmd(command string, args ...string) (string, string, error) {
cmd := exec.Command(command, args...) cmd := exec.Command(command, args...)
// We also output to the OS stdout/stderr to aid in debugging in case cmd // We also output to the OS stdout/stderr to aid in debugging in case cmd
// hangs and never returns before the test gets killed. // hangs and never returns before the test gets killed.
//
// This creates some ugly output because gcloud doesn't always provide
// newlines.
cmd.Stdout = io.MultiWriter(os.Stdout, &bout) cmd.Stdout = io.MultiWriter(os.Stdout, &bout)
cmd.Stderr = io.MultiWriter(os.Stderr, &berr) cmd.Stderr = io.MultiWriter(os.Stderr, &berr)
err := cmd.Run() err := cmd.Run()
@ -429,51 +405,17 @@ func runCmd(command string, args ...string) (string, string, error) {
return stdout, stderr, nil return stdout, stderr, nil
} }
func validate(f *framework.Framework, svcNameWant, rcNameWant string, ingress api.LoadBalancerIngress, podsWant int32) error {
framework.Logf("Beginning cluster validation")
// Verify RC.
rcs, err := f.Client.ReplicationControllers(f.Namespace.Name).List(api.ListOptions{})
if err != nil {
return fmt.Errorf("error listing RCs: %v", err)
}
if len(rcs.Items) != 1 {
return fmt.Errorf("wanted 1 RC with name %s, got %d", rcNameWant, len(rcs.Items))
}
if got := rcs.Items[0].Name; got != rcNameWant {
return fmt.Errorf("wanted RC name %q, got %q", rcNameWant, got)
}
// Verify pods.
if err := framework.VerifyPods(f.Client, f.Namespace.Name, rcNameWant, false, podsWant); err != nil {
return fmt.Errorf("failed to find %d %q pods: %v", podsWant, rcNameWant, err)
}
// Verify service.
svc, err := f.Client.Services(f.Namespace.Name).Get(svcNameWant)
if err != nil {
return fmt.Errorf("error getting service %s: %v", svcNameWant, err)
}
if svcNameWant != svc.Name {
return fmt.Errorf("wanted service name %q, got %q", svcNameWant, svc.Name)
}
// TODO(mikedanese): Make testLoadBalancerReachable return an error.
testLoadBalancerReachable(ingress, 80)
framework.Logf("Cluster validation succeeded")
return nil
}
// migRollingUpdate starts a MIG rolling update, upgrading the nodes to a new // migRollingUpdate starts a MIG rolling update, upgrading the nodes to a new
// instance template named tmpl, and waits up to nt times the number of nodes // instance template named tmpl, and waits up to nt times the number of nodes
// for it to complete. // for it to complete.
func migRollingUpdate(tmpl string, nt time.Duration) error { func migRollingUpdate(tmpl string, nt time.Duration) error {
By(fmt.Sprintf("starting the MIG rolling update to %s", tmpl)) framework.Logf(fmt.Sprintf("starting the MIG rolling update to %s", tmpl))
id, err := migRollingUpdateStart(tmpl, nt) id, err := migRollingUpdateStart(tmpl, nt)
if err != nil { if err != nil {
return fmt.Errorf("couldn't start the MIG rolling update: %v", err) return fmt.Errorf("couldn't start the MIG rolling update: %v", err)
} }
By(fmt.Sprintf("polling the MIG rolling update (%s) until it completes", id)) framework.Logf(fmt.Sprintf("polling the MIG rolling update (%s) until it completes", id))
if err := migRollingUpdatePoll(id, nt); err != nil { if err := migRollingUpdatePoll(id, nt); err != nil {
return fmt.Errorf("err waiting until update completed: %v", err) return fmt.Errorf("err waiting until update completed: %v", err)
} }
@ -611,61 +553,3 @@ func migRollingUpdatePoll(id string, nt time.Duration) error {
framework.Logf("MIG rolling update complete after %v", time.Since(start)) framework.Logf("MIG rolling update complete after %v", time.Since(start))
return nil return nil
} }
func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool {
loadBalancerLagTimeout := loadBalancerLagTimeoutDefault
if framework.ProviderIs("aws") {
loadBalancerLagTimeout = loadBalancerLagTimeoutAWS
}
return testLoadBalancerReachableInTime(ingress, port, loadBalancerLagTimeout)
}
func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}
return testReachableInTime(conditionFuncDecorator(ip, port, testReachableHTTP, "/", "test-webserver"), timeout)
}
func conditionFuncDecorator(ip string, port int, fn func(string, int, string, string) (bool, error), request string, expect string) wait.ConditionFunc {
return func() (bool, error) {
return fn(ip, port, request, expect)
}
}
func testReachableInTime(testFunc wait.ConditionFunc, timeout time.Duration) bool {
By(fmt.Sprintf("Waiting up to %v", timeout))
err := wait.PollImmediate(framework.Poll, timeout, testFunc)
if err != nil {
Expect(err).NotTo(HaveOccurred(), "Error waiting")
return false
}
return true
}
func waitForLoadBalancerIngress(c *client.Client, serviceName, namespace string) (*api.Service, error) {
// TODO: once support ticket 21807001 is resolved, reduce this timeout
// back to something reasonable
const timeout = 20 * time.Minute
var service *api.Service
By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a LoadBalancer ingress point", timeout, serviceName, namespace))
i := 1
for start := time.Now(); time.Since(start) < timeout; time.Sleep(3 * time.Second) {
service, err := c.Services(namespace).Get(serviceName)
if err != nil {
framework.Logf("Get service failed, ignoring for 5s: %v", err)
continue
}
if len(service.Status.LoadBalancer.Ingress) > 0 {
return service, nil
}
if i%5 == 0 {
framework.Logf("Waiting for service %s in namespace %s to have a LoadBalancer ingress point (%v)", serviceName, namespace, time.Since(start))
}
i++
}
return service, fmt.Errorf("service %s in namespace %s doesn't have a LoadBalancer ingress point after %.2f seconds", serviceName, namespace, timeout.Seconds())
}

View File

@ -481,6 +481,8 @@ var _ = framework.KubeDescribe("Services", func() {
// Change the services to LoadBalancer. // Change the services to LoadBalancer.
// Here we test that LoadBalancers can receive static IP addresses. This isn't
// necessary, but is an additional feature this monolithic test checks.
requestedIP := "" requestedIP := ""
staticIPName := "" staticIPName := ""
if framework.ProviderIs("gce", "gke") { if framework.ProviderIs("gce", "gke") {