diff --git a/test/e2e/BUILD b/test/e2e/BUILD
index 32d6f8b601..55d8212c26 100644
--- a/test/e2e/BUILD
+++ b/test/e2e/BUILD
@@ -51,6 +51,7 @@ go_library(
         "federation-replicaset.go",
         "federation-util.go",
         "federation-util-14.go",
+        "firewall.go",
         "garbage_collector.go",
         "generated_clientset.go",
         "gke_local_ssd.go",
diff --git a/test/e2e/firewall.go b/test/e2e/firewall.go
new file mode 100644
index 0000000000..c2850b0883
--- /dev/null
+++ b/test/e2e/firewall.go
@@ -0,0 +1,164 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api/v1"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
+	"k8s.io/kubernetes/pkg/master/ports"
+	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	firewallTimeoutDefault = 3 * time.Minute
+	firewallTestTcpTimeout = 1 * time.Second
+	// Use ports outside of 30000-32767, 80 and 8080, which would otherwise be whitelisted by the e2e cluster
+	firewallTestHttpPort = int32(29999)
+	firewallTestUdpPort  = int32(29998)
+)
+
+var _ = framework.KubeDescribe("Firewall rule", func() {
+	var firewallTestName = "firewall-test"
+	f := framework.NewDefaultFramework(firewallTestName)
+
+	var cs clientset.Interface
+	var cloudConfig framework.CloudConfig
+	var gceCloud *gcecloud.GCECloud
+
+	BeforeEach(func() {
+		framework.SkipUnlessProviderIs("gce")
+		cs = f.ClientSet
+		cloudConfig = framework.TestContext.CloudConfig
+		gceCloud = cloudConfig.Provider.(*gcecloud.GCECloud)
+	})
+
+	// This test takes around 4 minutes to run
+	It("[Slow] [Serial] should create valid firewall rules for LoadBalancer type service", func() {
+		ns := f.Namespace.Name
+		// These source ranges are set only so we can verify that the LB firewall rule carries exactly the same ranges
+		firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"}
+		serviceName := "firewall-test-loadbalancer"
+
+		jig := NewServiceTestJig(cs, serviceName)
+		nodesNames := jig.GetNodesNames(maxNodesForEndpointsTests)
+		if len(nodesNames) == 0 {
+			framework.Failf("Expected at least 1 node, got: %v", nodesNames)
+		}
+		nodesSet := sets.NewString(nodesNames...)
+
+		// An OnlyLocal service is needed so we can tell exactly which nodes the GCE load balancer forwards requests to
+		By("Creating a LoadBalancer type service with onlyLocal annotation")
+		svc := jig.createOnlyLocalLoadBalancerService(ns, serviceName,
+			loadBalancerCreateTimeoutDefault, false, func(svc *v1.Service) {
+				svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: firewallTestHttpPort}}
+				svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges
+			})
+		defer func() {
+			jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
+				svc.Spec.Type = v1.ServiceTypeNodePort
+				svc.Spec.LoadBalancerSourceRanges = nil
+			})
+			Expect(cs.Core().Services(svc.Namespace).Delete(svc.Name, nil)).NotTo(HaveOccurred())
+		}()
+		svcExternalIP := svc.Status.LoadBalancer.Ingress[0].IP
+
+		By("Checking that the service's firewall rule is correct")
+		nodeTags := framework.GetInstanceTags(cloudConfig, nodesNames[0])
+		expFw := framework.ConstructFirewallForLBService(svc, nodeTags.Items)
+		fw, err := gceCloud.GetFirewall(expFw.Name)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
+
+		By(fmt.Sprintf("Creating netexec pods on at most %v nodes", maxNodesForEndpointsTests))
+		for i, nodeName := range nodesNames {
+			podName := fmt.Sprintf("netexec%v", i)
+			jig.LaunchNetexecPodOnNode(f, nodeName, podName, firewallTestHttpPort, firewallTestUdpPort, true)
+			defer func() {
+				framework.Logf("Cleaning up the netexec pod: %v", podName)
+				Expect(cs.Core().Pods(ns).Delete(podName, nil)).NotTo(HaveOccurred())
+			}()
+		}
+
+		// Send requests from outside of the cluster because internal traffic is whitelisted
+		By("Accessing the external service IP from outside; all non-master nodes should be reached")
+		Expect(testHitNodesFromOutside(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred())
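+		// Reaching this point means every node in nodesSet answered the LB's
+		// /hostname probe at least once, i.e. the firewall rule admitted
+		// external traffic to all tagged nodes.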
+
+		// Check whether there are overlapping tags on the firewall that extend beyond just the vms in our cluster
+		// by removing the tag on one vm and making sure it doesn't get any traffic. This is an imperfect
+		// simulation; we really want to check that traffic doesn't reach a vm outside the GKE cluster, but
+		// that's much harder to do in the current e2e framework.
+		By("Removing tags from one of the nodes")
+		nodesSet.Delete(nodesNames[0])
+		removedTags := framework.SetInstanceTags(cloudConfig, nodesNames[0], []string{})
+		defer func() {
+			By("Adding tags back to the node and waiting until the traffic is recovered")
+			nodesSet.Insert(nodesNames[0])
+			framework.SetInstanceTags(cloudConfig, nodesNames[0], removedTags)
+			// Make sure traffic is recovered before exit
+			Expect(testHitNodesFromOutside(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred())
+		}()
+
+		By("Accessing the service through the external IP and verifying that the node without tags gets no traffic")
+		Expect(testHitNodesFromOutsideWithCount(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet, 15)).NotTo(HaveOccurred())
+	})
+
+	It("should have correct firewall rules for e2e cluster", func() {
+		By("Gathering firewall related information")
+		masterTags := framework.GetInstanceTags(cloudConfig, cloudConfig.MasterName)
+		Expect(len(masterTags.Items)).Should(Equal(1))
+
+		nodes := framework.GetReadySchedulableNodesOrDie(cs)
+		if len(nodes.Items) == 0 {
+			framework.Failf("Expected at least 1 node, got: %v", len(nodes.Items))
+		}
+		nodeTags := framework.GetInstanceTags(cloudConfig, nodes.Items[0].Name)
+		Expect(len(nodeTags.Items)).Should(Equal(1))
+
+		By("Checking that the e2e firewall rules are correct")
+		for _, expFw := range framework.GetE2eFirewalls(cloudConfig.MasterName, masterTags.Items[0], nodeTags.Items[0], cloudConfig.Network) {
+			fw, err := gceCloud.GetFirewall(expFw.Name)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
+		}
+
+		By("Checking that well-known ports on master and nodes are not exposed externally")
+		nodeAddrs := framework.NodeAddresses(nodes, v1.NodeExternalIP)
+		Expect(len(nodeAddrs)).NotTo(BeZero())
+		masterAddr := framework.GetMasterAddress(cs)
+		flag, _ := testNotReachableHTTPTimeout(masterAddr, ports.ControllerManagerPort, firewallTestTcpTimeout)
+		Expect(flag).To(BeTrue())
+		flag, _ = testNotReachableHTTPTimeout(masterAddr, ports.SchedulerPort, firewallTestTcpTimeout)
+		Expect(flag).To(BeTrue())
+		flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletPort, firewallTestTcpTimeout)
+		Expect(flag).To(BeTrue())
+		flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletReadOnlyPort, firewallTestTcpTimeout)
+		Expect(flag).To(BeTrue())
+		flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.ProxyStatusPort, firewallTestTcpTimeout)
+		Expect(flag).To(BeTrue())
+	})
+})
diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index d90c9571bf..ef5ffa8aff 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -13,6 +13,7 @@ go_library(
         "cleanup.go",
         "exec_util.go",
         "federation_util.go",
+        "firewall_util.go",
         "framework.go",
         "get-kubemark-resource-usage.go",
         "google_compute.go",
@@ -100,6 +101,7 @@ go_library(
         "//vendor:github.com/spf13/viper",
         "//vendor:golang.org/x/crypto/ssh",
         "//vendor:golang.org/x/net/websocket",
+        "//vendor:google.golang.org/api/compute/v1",
         "//vendor:google.golang.org/api/googleapi",
         "//vendor:gopkg.in/yaml.v2",
         "//vendor:k8s.io/client-go/kubernetes",
diff --git a/test/e2e/framework/firewall_util.go b/test/e2e/framework/firewall_util.go
new file mode 100644
index 0000000000..059e0fa077
--- /dev/null
+++ b/test/e2e/framework/firewall_util.go
@@ -0,0 +1,325 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"k8s.io/kubernetes/pkg/api/v1"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+	gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
+	"k8s.io/kubernetes/pkg/util/sets"
+
+	. "github.com/onsi/gomega"
+	compute "google.golang.org/api/compute/v1"
+)
+
+// MakeFirewallNameForLBService returns the expected firewall name for a LB service.
+// This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce.go
+func MakeFirewallNameForLBService(name string) string {
+	return fmt.Sprintf("k8s-fw-%s", name)
+}
+
+// ConstructFirewallForLBService returns the expected GCE firewall rule for a loadbalancer type service
+func ConstructFirewallForLBService(svc *v1.Service, nodesTags []string) *compute.Firewall {
+	if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
+		Failf("cannot construct firewall rule for non-loadbalancer type service")
+	}
+	fw := compute.Firewall{}
+	fw.Name = MakeFirewallNameForLBService(cloudprovider.GetLoadBalancerName(svc))
+	fw.TargetTags = nodesTags
+	if svc.Spec.LoadBalancerSourceRanges == nil {
+		fw.SourceRanges = []string{"0.0.0.0/0"}
+	} else {
+		fw.SourceRanges = svc.Spec.LoadBalancerSourceRanges
+	}
+	for _, sp := range svc.Spec.Ports {
+		fw.Allowed = append(fw.Allowed, &compute.FirewallAllowed{
+			IPProtocol: strings.ToLower(string(sp.Protocol)),
+			Ports:      []string{strconv.Itoa(int(sp.Port))},
+		})
+	}
+	return &fw
+}
+
+// GetNodeTags gets tags from one of the Kubernetes nodes
+func GetNodeTags(c clientset.Interface, cloudConfig CloudConfig) *compute.Tags {
+	nodes := GetReadySchedulableNodesOrDie(c)
+	Expect(len(nodes.Items) > 0).Should(BeTrue())
+	nodeTags := GetInstanceTags(cloudConfig, nodes.Items[0].Name)
+	return nodeTags
+}
+
+// GetInstanceTags gets tags from the GCE instance with the given name.
+func GetInstanceTags(cloudConfig CloudConfig, instanceName string) *compute.Tags {
+	gceCloud := cloudConfig.Provider.(*gcecloud.GCECloud)
+	res, err := gceCloud.GetComputeService().Instances.Get(cloudConfig.ProjectID, cloudConfig.Zone,
+		instanceName).Do()
+	if err != nil {
+		Failf("Failed to get instance tags for %v: %v", instanceName, err)
+	}
+	return res.Tags
+}
+
+// SetInstanceTags sets tags on the GCE instance with the given name.
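+// The tags fingerprint is GCE's optimistic-concurrency token: a SetTags call
+// carrying a stale fingerprint is rejected, hence the re-read below.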
+func SetInstanceTags(cloudConfig CloudConfig, instanceName string, tags []string) []string {
+	gceCloud := cloudConfig.Provider.(*gcecloud.GCECloud)
+	// Re-get the instance every time because we need the latest fingerprint when updating the tags
+	resTags := GetInstanceTags(cloudConfig, instanceName)
+	_, err := gceCloud.GetComputeService().Instances.SetTags(
+		cloudConfig.ProjectID, cloudConfig.Zone, instanceName,
+		&compute.Tags{Fingerprint: resTags.Fingerprint, Items: tags}).Do()
+	if err != nil {
+		Failf("failed to set instance tags: %v", err)
+	}
+	Logf("Sent request to set tags %v on instance: %v", tags, instanceName)
+	return resTags.Items
+}
+
+// GetInstancePrefix returns the INSTANCE_PREFIX env we set for the e2e cluster.
+// From cluster/gce/config-test.sh, the master name is set up using the format below:
+// MASTER_NAME="${INSTANCE_PREFIX}-master"
+func GetInstancePrefix(masterName string) (string, error) {
+	if !strings.HasSuffix(masterName, "-master") {
+		return "", fmt.Errorf("unexpected master name format: %v", masterName)
+	}
+	return strings.TrimSuffix(masterName, "-master"), nil
+}
+
+// GetClusterName returns the CLUSTER_NAME env we set for the e2e cluster.
+// From cluster/gce/config-test.sh, the cluster name is set up using the format below:
+// CLUSTER_NAME="${CLUSTER_NAME:-${INSTANCE_PREFIX}}"
+func GetClusterName(instancePrefix string) string {
+	return instancePrefix
+}
+
+// GetClusterIpRange returns the CLUSTER_IP_RANGE env we set for the e2e cluster.
+// From cluster/gce/config-test.sh, the cluster IP range is set up using the line below:
+// CLUSTER_IP_RANGE="${CLUSTER_IP_RANGE:-10.180.0.0/14}"
+// Warning: this needs to be consistent with the CLUSTER_IP_RANGE in the startup scripts,
+// which is currently hardcoded.
+func GetClusterIpRange() string {
+	return "10.180.0.0/14"
+}
+
+// GetE2eFirewalls returns all firewall rules we create for an e2e cluster.
+// From cluster/gce/util.sh, all firewall rules should be consistent with the ones created by the startup scripts.
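+// Note that the master's "-default-internal-master" rule below deliberately skips
+// ports 2380-2381, leaving the master-tag-only "-etcd" rule as the sole path to etcd peers.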
+func GetE2eFirewalls(masterName, masterTag, nodeTag, network string) []*compute.Firewall {
+	instancePrefix, err := GetInstancePrefix(masterName)
+	Expect(err).NotTo(HaveOccurred())
+	clusterName := GetClusterName(instancePrefix)
+	clusterIpRange := GetClusterIpRange()
+
+	fws := []*compute.Firewall{}
+	fws = append(fws, &compute.Firewall{
+		Name:         clusterName + "-default-internal-master",
+		SourceRanges: []string{"10.0.0.0/8"},
+		TargetTags:   []string{masterTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"1-2379"},
+			},
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"2382-65535"},
+			},
+			{
+				IPProtocol: "udp",
+				Ports:      []string{"1-65535"},
+			},
+			{
+				IPProtocol: "icmp",
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:         clusterName + "-default-internal-node",
+		SourceRanges: []string{"10.0.0.0/8"},
+		TargetTags:   []string{nodeTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"1-65535"},
+			},
+			{
+				IPProtocol: "udp",
+				Ports:      []string{"1-65535"},
+			},
+			{
+				IPProtocol: "icmp",
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:         network + "-default-ssh",
+		SourceRanges: []string{"0.0.0.0/0"},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"22"},
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:       masterName + "-etcd",
+		SourceTags: []string{masterTag},
+		TargetTags: []string{masterTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"2380"},
+			},
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"2381"},
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:         masterName + "-https",
+		SourceRanges: []string{"0.0.0.0/0"},
+		TargetTags:   []string{masterTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"443"},
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:         nodeTag + "-all",
+		SourceRanges: []string{clusterIpRange},
+		TargetTags:   []string{nodeTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+			},
+			{
+				IPProtocol: "udp",
+			},
+			{
+				IPProtocol: "icmp",
+			},
+			{
+				IPProtocol: "esp",
+			},
+			{
+				IPProtocol: "ah",
+			},
+			{
+				IPProtocol: "sctp",
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:         nodeTag + "-" + instancePrefix + "-http-alt",
+		SourceRanges: []string{"0.0.0.0/0"},
+		TargetTags:   []string{nodeTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"80"},
+			},
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"8080"},
+			},
+		},
+	})
+	fws = append(fws, &compute.Firewall{
+		Name:         nodeTag + "-" + instancePrefix + "-nodeports",
+		SourceRanges: []string{"0.0.0.0/0"},
+		TargetTags:   []string{nodeTag},
+		Allowed: []*compute.FirewallAllowed{
+			{
+				IPProtocol: "tcp",
+				Ports:      []string{"30000-32767"},
+			},
+			{
+				IPProtocol: "udp",
+				Ports:      []string{"30000-32767"},
+			},
+		},
+	})
+	return fws
+}
+
+// PackProtocolsPortsFromFirewall packs protocols and ports in a unified way for verification.
+func PackProtocolsPortsFromFirewall(alloweds []*compute.FirewallAllowed) []string {
+	protocolPorts := []string{}
+	for _, allowed := range alloweds {
+		for _, port := range allowed.Ports {
+			protocolPorts = append(protocolPorts, strings.ToLower(allowed.IPProtocol+"/"+port))
+		}
+	}
+	return protocolPorts
+}
+
+// SameStringArray verifies whether two string arrays contain the same strings; it returns an error if not.
+// Order does not matter.
+// When `include` is set to true, it only verifies that `result` includes all elements from `expected`.
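+// For example, result=["tcp/80", "udp/53"] with expected=["tcp/80"] passes when
+// include=true (result is a superset) but fails when include=false (udp/53 differs).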
+func SameStringArray(result, expected []string, include bool) error {
+	res := sets.NewString(result...)
+	exp := sets.NewString(expected...)
+	if !include {
+		diff := res.Difference(exp).Union(exp.Difference(res))
+		if len(diff) != 0 {
+			return fmt.Errorf("found differences: %v", diff)
+		}
+	} else {
+		if !res.IsSuperset(exp) {
+			return fmt.Errorf("some elements are missing: expected %v, got %v", expected, result)
+		}
+	}
+	return nil
+}
+
+// VerifyFirewallRule verifies whether the result firewall is consistent with the expected firewall.
+// When `portsSubset` is false, the given ports must match exactly; otherwise, only check that the expected ports are included.
+func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset bool) error {
+	if res.Name != exp.Name {
+		return fmt.Errorf("incorrect name: %v, expected %v", res.Name, exp.Name)
+	}
+	// Sample Network value: https://www.googleapis.com/compute/v1/projects/{project-id}/global/networks/e2e
+	if !strings.HasSuffix(res.Network, "/"+network) {
+		return fmt.Errorf("incorrect network: %v, expected ends with: %v", res.Network, "/"+network)
+	}
+	if err := SameStringArray(PackProtocolsPortsFromFirewall(res.Allowed),
+		PackProtocolsPortsFromFirewall(exp.Allowed), portsSubset); err != nil {
+		return fmt.Errorf("incorrect allowed protocols ports: %v", err)
+	}
+	if err := SameStringArray(res.SourceRanges, exp.SourceRanges, false); err != nil {
+		return fmt.Errorf("incorrect source ranges %v, expected %v: %v", res.SourceRanges, exp.SourceRanges, err)
+	}
+	if err := SameStringArray(res.SourceTags, exp.SourceTags, false); err != nil {
+		return fmt.Errorf("incorrect source tags %v, expected %v: %v", res.SourceTags, exp.SourceTags, err)
+	}
+	if err := SameStringArray(res.TargetTags, exp.TargetTags, false); err != nil {
+		return fmt.Errorf("incorrect target tags %v, expected %v: %v", res.TargetTags, exp.TargetTags, err)
+	}
+	return nil
+}
diff --git a/test/e2e/ingress.go b/test/e2e/ingress.go
index ee3aaf4c86..0ad0b9eb49 100644
--- a/test/e2e/ingress.go
+++ b/test/e2e/ingress.go
@@ -24,6 +24,7 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 
 	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 )
 
 const (
@@ -119,7 +120,7 @@ var _ = framework.KubeDescribe("Loadbalancing: L7", func() {
 		}
 	})
 
-	It("shoud create ingress with given static-ip ", func() {
+	It("should create ingress with given static-ip", func() {
 		// ip released when the rest of lb resources are deleted in cleanupGCE
 		ip := gceController.createStaticIP(ns)
 		By(fmt.Sprintf("allocated static ip %v: %v through the GCE cloud provider", ns, ip))
@@ -136,6 +137,14 @@ var _ = framework.KubeDescribe("Loadbalancing: L7", func() {
 		By("should reject HTTP traffic")
 		framework.ExpectNoError(pollURL(fmt.Sprintf("http://%v/", ip), "", lbPollTimeout, jig.pollInterval, httpClient, true))
 
+		By("should have correct firewall rule for ingress")
+		fw := gceController.getFirewallRule()
+		expFw := jig.constructFirewallForIngress(gceController)
+		// The last argument is passed as `true` to verify that the backend ports are a subset
+		// of the allowed ports in the firewall rule, since there may be other existing
+		// ingress resources and backends we are not aware of.
+		Expect(framework.VerifyFirewallRule(fw, expFw, gceController.cloud.Network, true)).NotTo(HaveOccurred())
+
 		// TODO: uncomment the restart test once we have a way to synchronize
 		// and know that the controller has resumed watching. If we delete
 		// the ingress before the controller is ready we will leak.
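A note on the subset check above: VerifyFirewallRule with portsSubset=true only requires that the expected protocol/port pairs appear among the rule's allowed pairs. Below is a standalone sketch of that semantics, not part of the patch; the packing helper is re-implemented locally so the snippet runs without the e2e framework, and only the compute API types are real.

```go
package main

import (
	"fmt"
	"strings"

	compute "google.golang.org/api/compute/v1"
)

// pack flattens allowed protocol/port pairs into "protocol/port" strings,
// mirroring PackProtocolsPortsFromFirewall above.
func pack(alloweds []*compute.FirewallAllowed) map[string]bool {
	out := map[string]bool{}
	for _, a := range alloweds {
		for _, p := range a.Ports {
			out[strings.ToLower(a.IPProtocol+"/"+p)] = true
		}
	}
	return out
}

func main() {
	// What the cloud reports: the shared L7 firewall may already allow
	// other backends' nodePorts (port numbers here are made up).
	actual := pack([]*compute.FirewallAllowed{
		{IPProtocol: "tcp", Ports: []string{"30001", "31842", "32000"}},
	})
	// What this ingress needs: just its own backends' nodePorts.
	expected := pack([]*compute.FirewallAllowed{
		{IPProtocol: "tcp", Ports: []string{"31842"}},
	})

	// portsSubset=true semantics: every expected pair must be present in actual.
	for pp := range expected {
		if !actual[pp] {
			fmt.Println("missing:", pp)
			return
		}
	}
	fmt.Println("ok: expected ports are a subset of the rule's allowed ports")
}
```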
diff --git a/test/e2e/ingress_utils.go b/test/e2e/ingress_utils.go
index 4e44a5d7ea..5c20f5c49a 100644
--- a/test/e2e/ingress_utils.go
+++ b/test/e2e/ingress_utils.go
@@ -34,6 +34,7 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"time"
 
@@ -76,6 +77,12 @@ const (
 	// to split uid from other naming/metadata.
 	clusterDelimiter = "--"
 
+	// Name of the default HTTP backend service
+	defaultBackendName = "default-http-backend"
+
+	// IP src range from which the GCE L7 performs health checks.
+	GCEL7SrcRange = "130.211.0.0/22"
+
 	// Cloud resources created by the ingress controller older than this
 	// are automatically purged to prevent running out of quota.
 	// TODO(37335): write soak tests and bump this up to a week.
@@ -604,6 +611,18 @@ func (cont *GCEIngressController) canDelete(resourceName, creationTimestamp stri
 	return false
 }
 
+func (cont *GCEIngressController) getFirewallRuleName() string {
+	return fmt.Sprintf("%vfw-l7%v%v", k8sPrefix, clusterDelimiter, cont.UID)
+}
+
+func (cont *GCEIngressController) getFirewallRule() *compute.Firewall {
+	gceCloud := cont.cloud.Provider.(*gcecloud.GCECloud)
+	fwName := cont.getFirewallRuleName()
+	fw, err := gceCloud.GetFirewall(fwName)
+	Expect(err).NotTo(HaveOccurred())
+	return fw
+}
+
 func (cont *GCEIngressController) deleteFirewallRule(del bool) (msg string) {
 	fwList := []compute.Firewall{}
 	regex := fmt.Sprintf("%vfw-l7%v.*", k8sPrefix, clusterDelimiter)
@@ -897,6 +916,50 @@ func (j *testJig) curlServiceNodePort(ns, name string, port int) {
 	framework.ExpectNoError(pollURL(u, "", 30*time.Second, j.pollInterval, &http.Client{Timeout: reqTimeout}, false))
 }
 
+// getIngressNodePorts returns the nodePorts of all related backend services.
+// The current GCE ingress controller allows traffic to the default HTTP backend
+// by default, so its nodePort is retrieved as well.
+func (j *testJig) getIngressNodePorts() []string {
+	nodePorts := []string{}
+	defaultSvc, err := j.client.Core().Services(api.NamespaceSystem).Get(defaultBackendName, metav1.GetOptions{})
+	Expect(err).NotTo(HaveOccurred())
+	nodePorts = append(nodePorts, strconv.Itoa(int(defaultSvc.Spec.Ports[0].NodePort)))
+
+	backendSvcs := []string{}
+	if j.ing.Spec.Backend != nil {
+		backendSvcs = append(backendSvcs, j.ing.Spec.Backend.ServiceName)
+	}
+	for _, rule := range j.ing.Spec.Rules {
+		for _, ingPath := range rule.HTTP.Paths {
+			backendSvcs = append(backendSvcs, ingPath.Backend.ServiceName)
+		}
+	}
+	for _, svcName := range backendSvcs {
+		svc, err := j.client.Core().Services(j.ing.Namespace).Get(svcName, metav1.GetOptions{})
+		Expect(err).NotTo(HaveOccurred())
+		nodePorts = append(nodePorts, strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+	}
+	return nodePorts
+}
+
+// constructFirewallForIngress returns the expected GCE firewall rule for the ingress resource
+func (j *testJig) constructFirewallForIngress(gceController *GCEIngressController) *compute.Firewall {
+	nodeTags := framework.GetNodeTags(j.client, gceController.cloud)
+	nodePorts := j.getIngressNodePorts()
+
+	fw := compute.Firewall{}
+	fw.Name = gceController.getFirewallRuleName()
+	fw.SourceRanges = []string{GCEL7SrcRange}
+	fw.TargetTags = nodeTags.Items
+	fw.Allowed = []*compute.FirewallAllowed{
+		{
+			IPProtocol: "tcp",
+			Ports:      nodePorts,
+		},
+	}
+	return &fw
+}
+
 // ingFromManifest reads a .json/yaml file and returns the rc in it.
 func ingFromManifest(fileName string) *extensions.Ingress {
 	var ing extensions.Ingress
diff --git a/test/e2e/service.go b/test/e2e/service.go
index f0ad23c8d2..7fc108a65a 100644
--- a/test/e2e/service.go
+++ b/test/e2e/service.go
@@ -1207,7 +1207,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
 		serviceName := "external-local"
 		jig := NewServiceTestJig(cs, serviceName)
 
-		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
+		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true, nil)
 		serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
 		healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
 		if healthCheckNodePort == 0 {
@@ -1273,7 +1273,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
 		jig := NewServiceTestJig(cs, serviceName)
 		nodes := jig.getNodes(maxNodesForEndpointsTests)
 
-		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false)
+		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false, nil)
 		serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
 		defer func() {
 			jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
@@ -1333,7 +1333,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
 		jig := NewServiceTestJig(cs, serviceName)
 		nodes := jig.getNodes(maxNodesForEndpointsTests)
 
-		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
+		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true, nil)
 		serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
 		defer func() {
 			jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
@@ -1382,7 +1382,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
 			framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
 		}
 
-		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
+		svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true, nil)
 		serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
 		defer func() {
 			jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
@@ -1744,6 +1744,10 @@ func testReachableHTTP(ip string, port int, request string, expect string) (bool
 }
 
 func testReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
+	return testReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second)
+}
+
+func testReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) {
 	url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
 	if ip == "" {
 		framework.Failf("Got empty IP for reachability check (%s)", url)
@@ -1756,7 +1760,7 @@ func testReachableHTTPWithContent(ip string, port int, request string, expect st
 
 	framework.Logf("Testing HTTP reachability of %v", url)
 
-	resp, err := httpGetNoConnectionPool(url)
+	resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
 	if err != nil {
 		framework.Logf("Got error testing for reachability of %s: %v", url, err)
 		return false, nil
@@ -1809,6 +1813,10 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err
 }
 
 func testNotReachableHTTP(ip string, port int) (bool, error) {
+	return testNotReachableHTTPTimeout(ip, port, 5*time.Second)
+}
+
+func testNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) {
 	url := fmt.Sprintf("http://%s:%d", ip, port)
 	if ip == "" {
 		framework.Failf("Got empty IP for non-reachability check (%s)", url)
@@ -1821,7 +1829,7 @@ func testNotReachableHTTP(ip string, port int) (bool, error) {
 
 	framework.Logf("Testing HTTP non-reachability of %v", url)
 
-	resp, err := httpGetNoConnectionPool(url)
+	resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
 	if err != nil {
 		framework.Logf("Confirmed that %s is not reachable", url)
 		return true, nil
@@ -1914,6 +1922,45 @@ func testNotReachableUDP(ip string, port int, request string) (bool, error) {
 	return false, nil
 }
 
+func testHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error {
+	return testHitNodesFromOutsideWithCount(externalIP, httpPort, timeout, expectedHosts, 1)
+}
+
+func testHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String, countToSucceed int) error {
+	framework.Logf("Waiting up to %v to satisfy expectedHosts %v times", timeout, countToSucceed)
+	hitHosts := sets.NewString()
+	count := 0
+	condition := func() (bool, error) {
+		var respBody bytes.Buffer
+		reached, err := testReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody, 1*time.Second)
+		if err != nil || !reached {
+			return false, nil
+		}
+		hitHost := strings.TrimSpace(respBody.String())
+		if !expectedHosts.Has(hitHost) {
+			framework.Logf("Hit unexpected host: %v; resetting counter (was %v)", hitHost, count)
+			count = 0
+			return false, nil
+		}
+		if !hitHosts.Has(hitHost) {
+			hitHosts.Insert(hitHost)
+			framework.Logf("Missing %+v, got %+v", expectedHosts.Difference(hitHosts), hitHosts)
+		}
+		if hitHosts.Equal(expectedHosts) {
+			count++
+			if count >= countToSucceed {
+				return true, nil
+			}
+		}
+		return false, nil
+	}
+
+	if err := wait.Poll(time.Second, timeout, condition); err != nil {
+		return fmt.Errorf("error waiting for expectedHosts: %v, hitHosts: %v, count: %v, expected count: %v", expectedHosts, hitHosts, count, countToSucceed)
+	}
+	return nil
+}
+
 // Creates a replication controller that serves its hostname and a service on top of it.
 func startServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, port, replicas int) ([]string, string, error) {
 	podNames := make([]string, replicas)
@@ -2089,12 +2136,16 @@ func verifyServeHostnameServiceDown(c clientset.Interface, host string, serviceI
 // This masks problems where the iptables rule has changed, but we don't see it
 // This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout
 func httpGetNoConnectionPool(url string) (*http.Response, error) {
+	return httpGetNoConnectionPoolTimeout(url, 5*time.Second)
+}
+
+func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
 	tr := utilnet.SetTransportDefaults(&http.Transport{
 		DisableKeepAlives: true,
 	})
 	client := &http.Client{
 		Transport: tr,
-		Timeout:   5 * time.Second,
+		Timeout:   timeout,
 	}
 
 	return client.Get(url)
@@ -2226,7 +2277,8 @@ func (j *ServiceTestJig) createOnlyLocalNodePortService(namespace, serviceName s
 // createOnlyLocalLoadBalancerService creates a loadbalancer service and waits for it to
 // acquire an ingress IP. If createPod is true, it also creates an RC with 1
 // replica of the standard netexec container used everywhere in this test.
-func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool) *v1.Service {
+func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
+	tweak func(svc *v1.Service)) *v1.Service {
 	By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and annotation for local-traffic-only")
 	svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
 		svc.Spec.Type = v1.ServiceTypeLoadBalancer
@@ -2234,7 +2286,9 @@ func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceNa
 		svc.Spec.SessionAffinity = v1.ServiceAffinityNone
 		svc.ObjectMeta.Annotations = map[string]string{
 			service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
-		svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: 80}}
+		if tweak != nil {
+			tweak(svc)
+		}
 	})
 
 	if createPod {
@@ -2286,6 +2340,15 @@ func (j *ServiceTestJig) getNodes(maxNodesForTest int) (nodes *v1.NodeList) {
 	return nodes
 }
 
+func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
+	nodes := j.getNodes(maxNodesForTest)
+	nodesNames := []string{}
+	for _, node := range nodes.Items {
+		nodesNames = append(nodesNames, node.Name)
+	}
+	return nodesNames
+}
+
 func (j *ServiceTestJig) waitForEndpointOnNode(namespace, serviceName, nodeName string) {
 	err := wait.PollImmediate(framework.Poll, loadBalancerCreateTimeoutDefault, func() (bool, error) {
 		endpoints, err := j.Client.Core().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
@@ -2735,6 +2798,52 @@ func (t *ServiceTestFixture) Cleanup() []error {
 	return errs
 }
 
+// newNetexecPodSpec returns the pod spec of the netexec pod
+func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
+	pod := &v1.Pod{
+		ObjectMeta: v1.ObjectMeta{
+			Name: podName,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					Name:  "netexec",
+					Image: framework.NetexecImageName,
+					Command: []string{
+						"/netexec",
+						fmt.Sprintf("--http-port=%d", httpPort),
+						fmt.Sprintf("--udp-port=%d", udpPort),
+					},
+					Ports: []v1.ContainerPort{
+						{
+							Name:          "http",
+							ContainerPort: httpPort,
+						},
+						{
+							Name:          "udp",
+							ContainerPort: udpPort,
+						},
+					},
+				},
+			},
+			HostNetwork: hostNetwork,
+		},
+	}
+	return pod
+}
+
+func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *framework.Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
+	framework.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
+	pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
+	pod.Spec.NodeName = nodeName
+	pod.ObjectMeta.Labels = j.Labels
+	podClient := f.ClientSet.Core().Pods(f.Namespace.Name)
+	_, err := podClient.Create(pod)
+	framework.ExpectNoError(err)
+	framework.ExpectNoError(f.WaitForPodRunning(podName))
+	framework.Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
+}
+
 // newEchoServerPodSpec returns the pod spec of echo server pod
 func newEchoServerPodSpec(podName string) *v1.Pod {
 	port := 8080
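For context on what the new helpers probe: netexec serves GET /hostname with the host's name, and because LaunchNetexecPodOnNode runs the pod with hostNetwork=true, that name is the node name testHitNodesFromOutside matches against nodesSet. Below is a standalone sketch of a single probe, not part of the patch; the IP and port are placeholders, and the short client timeout mirrors the test's cheap per-probe requests.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"time"
)

// hitHostname performs one /hostname probe against a load balancer IP and
// returns the name of whichever backend node answered.
func hitHostname(ip string, port int32) (string, error) {
	client := &http.Client{Timeout: time.Second}
	resp, err := client.Get(fmt.Sprintf("http://%s:%d/hostname", ip, port))
	if err != nil {
		// Unreachable is an expected state while firewall rules propagate.
		return "", err
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	// With hostNetwork=true the reported hostname is the node's name.
	return strings.TrimSpace(string(body)), nil
}

func main() {
	host, err := hitHostname("203.0.113.10", 29999) // placeholder LB IP and HTTP port
	fmt.Println(host, err)
}
```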