mirror of https://github.com/k3s-io/k3s
Adds e2e firewall tests.
For LoadBalancer type service: - Verifies corresponding firewall rule has correct sourceRanges, ports & protocols, target tags. - Verifies requests can reach all expected instances. - Verifies requests can not reach instances that are not included. For Ingress resrouce: - Verifies the ingress firewall rule has correct sourceRanges, target tags and tcp ports. For general e2e cluster: - Verifies all required firewall rules has correct sourceRange, ports & protocols, source tags and target tags. - Verifies well know ports on master and nodes are not exposed externallypull/6/head
parent
e3c6ab1c8f
commit
b43e2134a2
|
@ -51,6 +51,7 @@ go_library(
|
|||
"federation-replicaset.go",
|
||||
"federation-util.go",
|
||||
"federation-util-14.go",
|
||||
"firewall.go",
|
||||
"garbage_collector.go",
|
||||
"generated_clientset.go",
|
||||
"gke_local_ssd.go",
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
const (
|
||||
firewallTimeoutDefault = 3 * time.Minute
|
||||
firewallTestTcpTimeout = time.Duration(1 * time.Second)
|
||||
// Set ports outside of 30000-32767, 80 and 8080 to avoid being whitelisted by the e2e cluster
|
||||
firewallTestHttpPort = int32(29999)
|
||||
firewallTestUdpPort = int32(29998)
|
||||
)
|
||||
|
||||
var _ = framework.KubeDescribe("Firewall rule", func() {
|
||||
var firewall_test_name = "firewall-test"
|
||||
f := framework.NewDefaultFramework(firewall_test_name)
|
||||
|
||||
var cs clientset.Interface
|
||||
var cloudConfig framework.CloudConfig
|
||||
var gceCloud *gcecloud.GCECloud
|
||||
|
||||
BeforeEach(func() {
|
||||
framework.SkipUnlessProviderIs("gce")
|
||||
cs = f.ClientSet
|
||||
cloudConfig = framework.TestContext.CloudConfig
|
||||
gceCloud = cloudConfig.Provider.(*gcecloud.GCECloud)
|
||||
})
|
||||
|
||||
// This test takes around 4 minutes to run
|
||||
It("[Slow] [Serial] should create valid firewall rules for LoadBalancer type service", func() {
|
||||
ns := f.Namespace.Name
|
||||
// This source ranges is just used to examine we have exact same things on LB firewall rules
|
||||
firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"}
|
||||
serviceName := "firewall-test-loadbalancer"
|
||||
|
||||
jig := NewServiceTestJig(cs, serviceName)
|
||||
nodesNames := jig.GetNodesNames(maxNodesForEndpointsTests)
|
||||
if len(nodesNames) <= 0 {
|
||||
framework.Failf("Expect at least 1 node, got: %v", nodesNames)
|
||||
}
|
||||
nodesSet := sets.NewString(nodesNames...)
|
||||
|
||||
// OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE
|
||||
By("Creating a LoadBalancer type service with onlyLocal annotation")
|
||||
svc := jig.createOnlyLocalLoadBalancerService(ns, serviceName,
|
||||
loadBalancerCreateTimeoutDefault, false, func(svc *v1.Service) {
|
||||
svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: firewallTestHttpPort}}
|
||||
svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges
|
||||
})
|
||||
defer func() {
|
||||
jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
|
||||
svc.Spec.Type = v1.ServiceTypeNodePort
|
||||
svc.Spec.LoadBalancerSourceRanges = nil
|
||||
})
|
||||
Expect(cs.Core().Services(svc.Namespace).Delete(svc.Name, nil)).NotTo(HaveOccurred())
|
||||
}()
|
||||
svcExternalIP := svc.Status.LoadBalancer.Ingress[0].IP
|
||||
|
||||
By("Checking if service's firewall rules are correct")
|
||||
nodeTags := framework.GetInstanceTags(cloudConfig, nodesNames[0])
|
||||
expFw := framework.ConstructFirewallForLBService(svc, nodeTags.Items)
|
||||
fw, err := gceCloud.GetFirewall(expFw.Name)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Creating netexec pods on at most %v nodes", maxNodesForEndpointsTests))
|
||||
for i, nodeName := range nodesNames {
|
||||
podName := fmt.Sprintf("netexec%v", i)
|
||||
jig.LaunchNetexecPodOnNode(f, nodeName, podName, firewallTestHttpPort, firewallTestUdpPort, true)
|
||||
defer func() {
|
||||
framework.Logf("Cleaning up the netexec pod: %v", podName)
|
||||
Expect(cs.Core().Pods(ns).Delete(podName, nil)).NotTo(HaveOccurred())
|
||||
}()
|
||||
}
|
||||
|
||||
// Send requests from outside of the cluster because internal traffic is whitelisted
|
||||
By("Accessing the external service ip from outside, all non-master nodes should be reached")
|
||||
Expect(testHitNodesFromOutside(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred())
|
||||
|
||||
// Check if there are overlapping tags on the firewall that extend beyond just the vms in our cluster
|
||||
// by removing the tag on one vm and make sure it doesn't get any traffic. This is an imperfect
|
||||
// simulation, we really want to check that traffic doesn't reach a vm outside the GKE cluster, but
|
||||
// that's much harder to do in the current e2e framework.
|
||||
By("Removing tags from one of the nodes")
|
||||
nodesSet.Delete(nodesNames[0])
|
||||
removedTags := framework.SetInstanceTags(cloudConfig, nodesNames[0], []string{})
|
||||
defer func() {
|
||||
By("Adding tags back to the node and wait till the traffic is recovered")
|
||||
nodesSet.Insert(nodesNames[0])
|
||||
framework.SetInstanceTags(cloudConfig, nodesNames[0], removedTags)
|
||||
// Make sure traffic is recovered before exit
|
||||
Expect(testHitNodesFromOutside(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet)).NotTo(HaveOccurred())
|
||||
}()
|
||||
|
||||
By("Accessing serivce through the external ip and examine got no response from the node without tags")
|
||||
Expect(testHitNodesFromOutsideWithCount(svcExternalIP, firewallTestHttpPort, firewallTimeoutDefault, nodesSet, 15)).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should have correct firewall rules for e2e cluster", func() {
|
||||
By("Gathering firewall related information")
|
||||
masterTags := framework.GetInstanceTags(cloudConfig, cloudConfig.MasterName)
|
||||
Expect(len(masterTags.Items)).Should(Equal(1))
|
||||
|
||||
nodes := framework.GetReadySchedulableNodesOrDie(cs)
|
||||
if len(nodes.Items) <= 0 {
|
||||
framework.Failf("Expect at least 1 node, got: %v", len(nodes.Items))
|
||||
}
|
||||
nodeTags := framework.GetInstanceTags(cloudConfig, nodes.Items[0].Name)
|
||||
Expect(len(nodeTags.Items)).Should(Equal(1))
|
||||
|
||||
By("Checking if e2e firewall rules are correct")
|
||||
for _, expFw := range framework.GetE2eFirewalls(cloudConfig.MasterName, masterTags.Items[0], nodeTags.Items[0], cloudConfig.Network) {
|
||||
fw, err := gceCloud.GetFirewall(expFw.Name)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
By("Checking well known ports on master and nodes are not exposed externally")
|
||||
nodeAddrs := framework.NodeAddresses(nodes, v1.NodeExternalIP)
|
||||
Expect(len(nodeAddrs)).NotTo(BeZero())
|
||||
masterAddr := framework.GetMasterAddress(cs)
|
||||
flag, _ := testNotReachableHTTPTimeout(masterAddr, ports.ControllerManagerPort, firewallTestTcpTimeout)
|
||||
Expect(flag).To(BeTrue())
|
||||
flag, _ = testNotReachableHTTPTimeout(masterAddr, ports.SchedulerPort, firewallTestTcpTimeout)
|
||||
Expect(flag).To(BeTrue())
|
||||
flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletPort, firewallTestTcpTimeout)
|
||||
Expect(flag).To(BeTrue())
|
||||
flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.KubeletReadOnlyPort, firewallTestTcpTimeout)
|
||||
Expect(flag).To(BeTrue())
|
||||
flag, _ = testNotReachableHTTPTimeout(nodeAddrs[0], ports.ProxyStatusPort, firewallTestTcpTimeout)
|
||||
Expect(flag).To(BeTrue())
|
||||
})
|
||||
})
|
|
@ -13,6 +13,7 @@ go_library(
|
|||
"cleanup.go",
|
||||
"exec_util.go",
|
||||
"federation_util.go",
|
||||
"firewall_util.go",
|
||||
"framework.go",
|
||||
"get-kubemark-resource-usage.go",
|
||||
"google_compute.go",
|
||||
|
@ -100,6 +101,7 @@ go_library(
|
|||
"//vendor:github.com/spf13/viper",
|
||||
"//vendor:golang.org/x/crypto/ssh",
|
||||
"//vendor:golang.org/x/net/websocket",
|
||||
"//vendor:google.golang.org/api/compute/v1",
|
||||
"//vendor:google.golang.org/api/googleapi",
|
||||
"//vendor:gopkg.in/yaml.v2",
|
||||
"//vendor:k8s.io/client-go/kubernetes",
|
||||
|
|
|
@ -0,0 +1,319 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package framework
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
. "github.com/onsi/gomega"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
// MakeFirewallNameForLBService return the expected firewall name for a LB service.
|
||||
// This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce.go
|
||||
func MakeFirewallNameForLBService(name string) string {
|
||||
return fmt.Sprintf("k8s-fw-%s", name)
|
||||
}
|
||||
|
||||
// ConstructFirewallForLBService returns the expected GCE firewall rule for a loadbalancer type service
|
||||
func ConstructFirewallForLBService(svc *v1.Service, nodesTags []string) *compute.Firewall {
|
||||
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
|
||||
Failf("can not construct firewall rule for non-loadbalancer type service")
|
||||
}
|
||||
fw := compute.Firewall{}
|
||||
fw.Name = MakeFirewallNameForLBService(cloudprovider.GetLoadBalancerName(svc))
|
||||
fw.TargetTags = nodesTags
|
||||
if svc.Spec.LoadBalancerSourceRanges == nil {
|
||||
fw.SourceRanges = []string{"0.0.0.0/0"}
|
||||
} else {
|
||||
fw.SourceRanges = svc.Spec.LoadBalancerSourceRanges
|
||||
}
|
||||
for _, sp := range svc.Spec.Ports {
|
||||
fw.Allowed = append(fw.Allowed, &compute.FirewallAllowed{
|
||||
IPProtocol: strings.ToLower(string(sp.Protocol)),
|
||||
Ports: []string{strconv.Itoa(int(sp.Port))},
|
||||
})
|
||||
}
|
||||
return &fw
|
||||
}
|
||||
|
||||
// GetNodeTags gets tags from one of the Kubernetes nodes
|
||||
func GetNodeTags(c clientset.Interface, cloudConfig CloudConfig) *compute.Tags {
|
||||
nodes := GetReadySchedulableNodesOrDie(c)
|
||||
Expect(len(nodes.Items) > 0).Should(BeTrue())
|
||||
nodeTags := GetInstanceTags(cloudConfig, nodes.Items[0].Name)
|
||||
return nodeTags
|
||||
}
|
||||
|
||||
// GetInstanceTags gets tags from GCE instance with given name.
|
||||
func GetInstanceTags(cloudConfig CloudConfig, instanceName string) *compute.Tags {
|
||||
gceCloud := cloudConfig.Provider.(*gcecloud.GCECloud)
|
||||
res, err := gceCloud.GetComputeService().Instances.Get(cloudConfig.ProjectID, cloudConfig.Zone,
|
||||
instanceName).Do()
|
||||
if err != nil {
|
||||
Failf("Failed to get instance tags for %v: %v", instanceName, err)
|
||||
}
|
||||
return res.Tags
|
||||
}
|
||||
|
||||
// SetInstanceTags sets tags on GCE instance with given name.
|
||||
func SetInstanceTags(cloudConfig CloudConfig, instanceName string, tags []string) []string {
|
||||
gceCloud := cloudConfig.Provider.(*gcecloud.GCECloud)
|
||||
// Re-get instance everytime because we need the latest fingerprint for updating metadata
|
||||
resTags := GetInstanceTags(cloudConfig, instanceName)
|
||||
_, err := gceCloud.GetComputeService().Instances.SetTags(
|
||||
cloudConfig.ProjectID, cloudConfig.Zone, instanceName,
|
||||
&compute.Tags{Fingerprint: resTags.Fingerprint, Items: tags}).Do()
|
||||
if err != nil {
|
||||
Failf("failed to set instance tags: %v", err)
|
||||
}
|
||||
Logf("Sent request to set tags %v on instance: %v", tags, instanceName)
|
||||
return resTags.Items
|
||||
}
|
||||
|
||||
// GetInstancePrefix returns the INSTANCE_PREFIX env we set for e2e cluster.
|
||||
// From cluster/gce/config-test.sh, master name is set up using below format:
|
||||
// MASTER_NAME="${INSTANCE_PREFIX}-master"
|
||||
func GetInstancePrefix(masterName string) (string, error) {
|
||||
if !strings.HasSuffix(masterName, "-master") {
|
||||
return "", fmt.Errorf("unexpected master name format: %v", masterName)
|
||||
}
|
||||
return masterName[:len(masterName)-7], nil
|
||||
}
|
||||
|
||||
// GetClusterName returns the CLUSTER_NAME env we set for e2e cluster.
|
||||
// From cluster/gce/config-test.sh, cluster name is set up using below format:
|
||||
// CLUSTER_NAME="${CLUSTER_NAME:-${INSTANCE_PREFIX}}"
|
||||
func GetClusterName(instancePrefix string) string {
|
||||
return instancePrefix
|
||||
}
|
||||
|
||||
// GetClusterIpRange returns the CLUSTER_IP_RANGE env we set for e2e cluster.
|
||||
// From cluster/gce/config-test.sh, cluster ip range is set up using below command:
|
||||
// CLUSTER_IP_RANGE="${CLUSTER_IP_RANGE:-10.180.0.0/14}"
|
||||
// Warning: this need to be consistent with the CLUSTER_IP_RANGE in startup scripts,
|
||||
// which is hardcoded currently.
|
||||
func GetClusterIpRange() string {
|
||||
return "10.180.0.0/14"
|
||||
}
|
||||
|
||||
// GetE2eFirewalls returns all firewall rules we create for an e2e cluster.
|
||||
// From cluster/gce/util.sh, all firewall rules should be consistent with the ones created by startup scripts.
|
||||
func GetE2eFirewalls(masterName, masterTag, nodeTag, network string) []*compute.Firewall {
|
||||
instancePrefix, err := GetInstancePrefix(masterName)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
clusterName := GetClusterName(instancePrefix)
|
||||
clusterIpRange := GetClusterIpRange()
|
||||
|
||||
fws := []*compute.Firewall{}
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: clusterName + "-default-internal-master",
|
||||
SourceRanges: []string{"10.0.0.0/8"},
|
||||
TargetTags: []string{masterTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"1-2379"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"2382-65535"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "udp",
|
||||
Ports: []string{"1-65535"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "icmp",
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: clusterName + "-default-internal-node",
|
||||
SourceRanges: []string{"10.0.0.0/8"},
|
||||
TargetTags: []string{nodeTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"1-65535"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "udp",
|
||||
Ports: []string{"1-65535"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "icmp",
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: network + "-default-ssh",
|
||||
SourceRanges: []string{"0.0.0.0/0"},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"22"},
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: masterName + "-etcd",
|
||||
SourceTags: []string{masterTag},
|
||||
TargetTags: []string{masterTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"2380"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"2381"},
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: masterName + "-https",
|
||||
SourceRanges: []string{"0.0.0.0/0"},
|
||||
TargetTags: []string{masterTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"443"},
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: nodeTag + "-all",
|
||||
SourceRanges: []string{clusterIpRange},
|
||||
TargetTags: []string{nodeTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
},
|
||||
{
|
||||
IPProtocol: "udp",
|
||||
},
|
||||
{
|
||||
IPProtocol: "icmp",
|
||||
},
|
||||
{
|
||||
IPProtocol: "esp",
|
||||
},
|
||||
{
|
||||
IPProtocol: "ah",
|
||||
},
|
||||
{
|
||||
IPProtocol: "sctp",
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: nodeTag + "-" + instancePrefix + "-http-alt",
|
||||
SourceRanges: []string{"0.0.0.0/0"},
|
||||
TargetTags: []string{nodeTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"80"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"8080"},
|
||||
},
|
||||
},
|
||||
})
|
||||
fws = append(fws, &compute.Firewall{
|
||||
Name: nodeTag + "-" + instancePrefix + "-nodeports",
|
||||
SourceRanges: []string{"0.0.0.0/0"},
|
||||
TargetTags: []string{nodeTag},
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: []string{"30000-32767"},
|
||||
},
|
||||
{
|
||||
IPProtocol: "udp",
|
||||
Ports: []string{"30000-32767"},
|
||||
},
|
||||
},
|
||||
})
|
||||
return fws
|
||||
}
|
||||
|
||||
// PackProtocolsPortsFromFirewall packs protocols and ports in an unified way for verification.
|
||||
func PackProtocolsPortsFromFirewall(alloweds []*compute.FirewallAllowed) []string {
|
||||
protocolPorts := []string{}
|
||||
for _, allowed := range alloweds {
|
||||
for _, port := range allowed.Ports {
|
||||
protocolPorts = append(protocolPorts, strings.ToLower(allowed.IPProtocol+"/"+port))
|
||||
}
|
||||
}
|
||||
return protocolPorts
|
||||
}
|
||||
|
||||
// SameStringArray verifies whether two string arrays have the same strings, return error if not.
|
||||
// Order does not matter.
|
||||
// When `include` is set to true, verifies whether result includes all elements from expected.
|
||||
func SameStringArray(result, expected []string, include bool) error {
|
||||
res := sets.NewString(result...)
|
||||
exp := sets.NewString(expected...)
|
||||
if !include {
|
||||
diff := res.Difference(exp)
|
||||
if len(diff) != 0 {
|
||||
return fmt.Errorf("found differences: %v", diff)
|
||||
}
|
||||
} else {
|
||||
if !res.IsSuperset(exp) {
|
||||
return fmt.Errorf("some elements are missing: expected %v, got %v", expected, result)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// VerifyFirewallRule verifies whether the result firewall is consistent with the expected firewall.
|
||||
// When `portsSubset` is false, match given ports exactly. Otherwise, only check ports are included.
|
||||
func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset bool) error {
|
||||
if res.Name != exp.Name {
|
||||
return fmt.Errorf("incorrect name: %v, expected %v", res.Name, exp.Name)
|
||||
}
|
||||
// Sample Network value: https://www.googleapis.com/compute/v1/projects/{project-id}/global/networks/e2e
|
||||
if !strings.HasSuffix(res.Network, "/"+network) {
|
||||
return fmt.Errorf("incorrect network: %v, expected ends with: %v", res.Network, "/"+network)
|
||||
}
|
||||
if err := SameStringArray(PackProtocolsPortsFromFirewall(res.Allowed),
|
||||
PackProtocolsPortsFromFirewall(exp.Allowed), portsSubset); err != nil {
|
||||
return fmt.Errorf("incorrect allowed protocols ports: %v", err)
|
||||
}
|
||||
if err := SameStringArray(res.SourceRanges, exp.SourceRanges, false); err != nil {
|
||||
return fmt.Errorf("incorrect source ranges %v, expected %v: %v", res.SourceRanges, exp.SourceRanges, err)
|
||||
}
|
||||
if err := SameStringArray(res.SourceTags, exp.SourceTags, false); err != nil {
|
||||
return fmt.Errorf("incorrect source tags %v, expected %v: %v", res.SourceTags, exp.SourceTags, err)
|
||||
}
|
||||
if err := SameStringArray(res.TargetTags, exp.TargetTags, false); err != nil {
|
||||
return fmt.Errorf("incorrect target tags %v, expected %v: %v", res.TargetTags, exp.TargetTags, err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -24,6 +24,7 @@ import (
|
|||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -136,6 +137,14 @@ var _ = framework.KubeDescribe("Loadbalancing: L7", func() {
|
|||
By("should reject HTTP traffic")
|
||||
framework.ExpectNoError(pollURL(fmt.Sprintf("http://%v/", ip), "", lbPollTimeout, jig.pollInterval, httpClient, true))
|
||||
|
||||
By("should have correct firewall rule for ingress")
|
||||
fw := gceController.getFirewallRule()
|
||||
expFw := jig.constructFirewallForIngress(gceController)
|
||||
// Passed the last argument as `true` to verify the backend ports is a subset
|
||||
// of the allowed ports in firewall rule, given there may be other existing
|
||||
// ingress resources and backends we are not aware of.
|
||||
Expect(framework.VerifyFirewallRule(fw, expFw, gceController.cloud.Network, true)).NotTo(HaveOccurred())
|
||||
|
||||
// TODO: uncomment the restart test once we have a way to synchronize
|
||||
// and know that the controller has resumed watching. If we delete
|
||||
// the ingress before the controller is ready we will leak.
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -76,6 +77,12 @@ const (
|
|||
// to split uid from other naming/metadata.
|
||||
clusterDelimiter = "--"
|
||||
|
||||
// Name of the default http backend service
|
||||
defaultBackendName = "default-http-backend"
|
||||
|
||||
// IP src range from which the GCE L7 performs health checks.
|
||||
GCEL7SrcRange = "130.211.0.0/22"
|
||||
|
||||
// Cloud resources created by the ingress controller older than this
|
||||
// are automatically purged to prevent running out of quota.
|
||||
// TODO(37335): write soak tests and bump this up to a week.
|
||||
|
@ -604,6 +611,18 @@ func (cont *GCEIngressController) canDelete(resourceName, creationTimestamp stri
|
|||
return false
|
||||
}
|
||||
|
||||
func (cont *GCEIngressController) getFirewallRuleName() string {
|
||||
return fmt.Sprintf("%vfw-l7%v%v", k8sPrefix, clusterDelimiter, cont.UID)
|
||||
}
|
||||
|
||||
func (cont *GCEIngressController) getFirewallRule() *compute.Firewall {
|
||||
gceCloud := cont.cloud.Provider.(*gcecloud.GCECloud)
|
||||
fwName := cont.getFirewallRuleName()
|
||||
fw, err := gceCloud.GetFirewall(fwName)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
return fw
|
||||
}
|
||||
|
||||
func (cont *GCEIngressController) deleteFirewallRule(del bool) (msg string) {
|
||||
fwList := []compute.Firewall{}
|
||||
regex := fmt.Sprintf("%vfw-l7%v.*", k8sPrefix, clusterDelimiter)
|
||||
|
@ -897,6 +916,50 @@ func (j *testJig) curlServiceNodePort(ns, name string, port int) {
|
|||
framework.ExpectNoError(pollURL(u, "", 30*time.Second, j.pollInterval, &http.Client{Timeout: reqTimeout}, false))
|
||||
}
|
||||
|
||||
// getIngressNodePorts returns all related backend services' nodePorts.
|
||||
// Current GCE ingress controller allows traffic to the default HTTP backend
|
||||
// by default, so retrieve its nodePort as well.
|
||||
func (j *testJig) getIngressNodePorts() []string {
|
||||
nodePorts := []string{}
|
||||
defaultSvc, err := j.client.Core().Services(api.NamespaceSystem).Get(defaultBackendName, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
nodePorts = append(nodePorts, strconv.Itoa(int(defaultSvc.Spec.Ports[0].NodePort)))
|
||||
|
||||
backendSvcs := []string{}
|
||||
if j.ing.Spec.Backend != nil {
|
||||
backendSvcs = append(backendSvcs, j.ing.Spec.Backend.ServiceName)
|
||||
}
|
||||
for _, rule := range j.ing.Spec.Rules {
|
||||
for _, ingPath := range rule.HTTP.Paths {
|
||||
backendSvcs = append(backendSvcs, ingPath.Backend.ServiceName)
|
||||
}
|
||||
}
|
||||
for _, svcName := range backendSvcs {
|
||||
svc, err := j.client.Core().Services(j.ing.Namespace).Get(svcName, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
nodePorts = append(nodePorts, strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
|
||||
}
|
||||
return nodePorts
|
||||
}
|
||||
|
||||
// constructFirewallForIngress returns the expected GCE firewall rule for the ingress resource
|
||||
func (j *testJig) constructFirewallForIngress(gceController *GCEIngressController) *compute.Firewall {
|
||||
nodeTags := framework.GetNodeTags(j.client, gceController.cloud)
|
||||
nodePorts := j.getIngressNodePorts()
|
||||
|
||||
fw := compute.Firewall{}
|
||||
fw.Name = gceController.getFirewallRuleName()
|
||||
fw.SourceRanges = []string{GCEL7SrcRange}
|
||||
fw.TargetTags = nodeTags.Items
|
||||
fw.Allowed = []*compute.FirewallAllowed{
|
||||
{
|
||||
IPProtocol: "tcp",
|
||||
Ports: nodePorts,
|
||||
},
|
||||
}
|
||||
return &fw
|
||||
}
|
||||
|
||||
// ingFromManifest reads a .json/yaml file and returns the rc in it.
|
||||
func ingFromManifest(fileName string) *extensions.Ingress {
|
||||
var ing extensions.Ingress
|
||||
|
|
|
@ -1207,7 +1207,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
|
|||
serviceName := "external-local"
|
||||
jig := NewServiceTestJig(cs, serviceName)
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true, nil)
|
||||
serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
|
||||
healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
|
||||
if healthCheckNodePort == 0 {
|
||||
|
@ -1273,7 +1273,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
|
|||
jig := NewServiceTestJig(cs, serviceName)
|
||||
nodes := jig.getNodes(maxNodesForEndpointsTests)
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false)
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, false, nil)
|
||||
serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
|
||||
defer func() {
|
||||
jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
|
||||
|
@ -1333,7 +1333,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
|
|||
jig := NewServiceTestJig(cs, serviceName)
|
||||
nodes := jig.getNodes(maxNodesForEndpointsTests)
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true, nil)
|
||||
serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
|
||||
defer func() {
|
||||
jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
|
||||
|
@ -1382,7 +1382,7 @@ var _ = framework.KubeDescribe("ESIPP [Slow]", func() {
|
|||
framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint")
|
||||
}
|
||||
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true)
|
||||
svc := jig.createOnlyLocalLoadBalancerService(namespace, serviceName, loadBalancerCreateTimeout, true, nil)
|
||||
serviceLBNames = append(serviceLBNames, getLoadBalancerName(svc))
|
||||
defer func() {
|
||||
jig.ChangeServiceType(svc.Namespace, svc.Name, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout)
|
||||
|
@ -1744,6 +1744,10 @@ func testReachableHTTP(ip string, port int, request string, expect string) (bool
|
|||
}
|
||||
|
||||
func testReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
|
||||
return testReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second)
|
||||
}
|
||||
|
||||
func testReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) {
|
||||
url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
|
||||
if ip == "" {
|
||||
framework.Failf("Got empty IP for reachability check (%s)", url)
|
||||
|
@ -1756,7 +1760,7 @@ func testReachableHTTPWithContent(ip string, port int, request string, expect st
|
|||
|
||||
framework.Logf("Testing HTTP reachability of %v", url)
|
||||
|
||||
resp, err := httpGetNoConnectionPool(url)
|
||||
resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
|
||||
if err != nil {
|
||||
framework.Logf("Got error testing for reachability of %s: %v", url, err)
|
||||
return false, nil
|
||||
|
@ -1809,6 +1813,10 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err
|
|||
}
|
||||
|
||||
func testNotReachableHTTP(ip string, port int) (bool, error) {
|
||||
return testNotReachableHTTPTimeout(ip, port, 5*time.Second)
|
||||
}
|
||||
|
||||
func testNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) {
|
||||
url := fmt.Sprintf("http://%s:%d", ip, port)
|
||||
if ip == "" {
|
||||
framework.Failf("Got empty IP for non-reachability check (%s)", url)
|
||||
|
@ -1821,7 +1829,7 @@ func testNotReachableHTTP(ip string, port int) (bool, error) {
|
|||
|
||||
framework.Logf("Testing HTTP non-reachability of %v", url)
|
||||
|
||||
resp, err := httpGetNoConnectionPool(url)
|
||||
resp, err := httpGetNoConnectionPoolTimeout(url, timeout)
|
||||
if err != nil {
|
||||
framework.Logf("Confirmed that %s is not reachable", url)
|
||||
return true, nil
|
||||
|
@ -1914,6 +1922,45 @@ func testNotReachableUDP(ip string, port int, request string) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func testHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error {
|
||||
return testHitNodesFromOutsideWithCount(externalIP, httpPort, timeout, expectedHosts, 1)
|
||||
}
|
||||
|
||||
func testHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String, countToSucceed int) error {
|
||||
framework.Logf("Waiting up to %v for satisfying expectedHosts for %v times", timeout, countToSucceed)
|
||||
hittedHosts := sets.NewString()
|
||||
count := 0
|
||||
condition := func() (bool, error) {
|
||||
var respBody bytes.Buffer
|
||||
reached, err := testReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody, 1*time.Second)
|
||||
if err != nil || !reached {
|
||||
return false, nil
|
||||
}
|
||||
hittedHost := strings.TrimSpace(respBody.String())
|
||||
if !expectedHosts.Has(hittedHost) {
|
||||
framework.Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count)
|
||||
count = 0
|
||||
return false, nil
|
||||
}
|
||||
if !hittedHosts.Has(hittedHost) {
|
||||
hittedHosts.Insert(hittedHost)
|
||||
framework.Logf("Missing %+v, got %+v", expectedHosts.Difference(hittedHosts), hittedHosts)
|
||||
}
|
||||
if hittedHosts.Equal(expectedHosts) {
|
||||
count++
|
||||
if count >= countToSucceed {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err := wait.Poll(time.Second, timeout, condition); err != nil {
|
||||
return fmt.Errorf("error waiting for expectedHosts: %v, hittedHosts: %v, count: %v, expected count: %v", expectedHosts, hittedHosts, count, countToSucceed)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Creates a replication controller that serves its hostname and a service on top of it.
|
||||
func startServeHostnameService(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, port, replicas int) ([]string, string, error) {
|
||||
podNames := make([]string, replicas)
|
||||
|
@ -2089,12 +2136,16 @@ func verifyServeHostnameServiceDown(c clientset.Interface, host string, serviceI
|
|||
// This masks problems where the iptables rule has changed, but we don't see it
|
||||
// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout
|
||||
func httpGetNoConnectionPool(url string) (*http.Response, error) {
|
||||
return httpGetNoConnectionPoolTimeout(url, 5*time.Second)
|
||||
}
|
||||
|
||||
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
|
||||
tr := utilnet.SetTransportDefaults(&http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
})
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: 5 * time.Second,
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
return client.Get(url)
|
||||
|
@ -2226,7 +2277,8 @@ func (j *ServiceTestJig) createOnlyLocalNodePortService(namespace, serviceName s
|
|||
// createOnlyLocalLoadBalancerService creates a loadbalancer service and waits for it to
|
||||
// acquire an ingress IP. If createPod is true, it also creates an RC with 1
|
||||
// replica of the standard netexec container used everywhere in this test.
|
||||
func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool) *v1.Service {
|
||||
func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceName string, timeout time.Duration, createPod bool,
|
||||
tweak func(svc *v1.Service)) *v1.Service {
|
||||
By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer and annotation for local-traffic-only")
|
||||
svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) {
|
||||
svc.Spec.Type = v1.ServiceTypeLoadBalancer
|
||||
|
@ -2234,7 +2286,9 @@ func (j *ServiceTestJig) createOnlyLocalLoadBalancerService(namespace, serviceNa
|
|||
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
|
||||
svc.ObjectMeta.Annotations = map[string]string{
|
||||
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
|
||||
svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: 80}}
|
||||
if tweak != nil {
|
||||
tweak(svc)
|
||||
}
|
||||
})
|
||||
|
||||
if createPod {
|
||||
|
@ -2286,6 +2340,15 @@ func (j *ServiceTestJig) getNodes(maxNodesForTest int) (nodes *v1.NodeList) {
|
|||
return nodes
|
||||
}
|
||||
|
||||
func (j *ServiceTestJig) GetNodesNames(maxNodesForTest int) []string {
|
||||
nodes := j.getNodes(maxNodesForTest)
|
||||
nodesNames := []string{}
|
||||
for _, node := range nodes.Items {
|
||||
nodesNames = append(nodesNames, node.Name)
|
||||
}
|
||||
return nodesNames
|
||||
}
|
||||
|
||||
func (j *ServiceTestJig) waitForEndpointOnNode(namespace, serviceName, nodeName string) {
|
||||
err := wait.PollImmediate(framework.Poll, loadBalancerCreateTimeoutDefault, func() (bool, error) {
|
||||
endpoints, err := j.Client.Core().Endpoints(namespace).Get(serviceName, metav1.GetOptions{})
|
||||
|
@ -2735,6 +2798,52 @@ func (t *ServiceTestFixture) Cleanup() []error {
|
|||
return errs
|
||||
}
|
||||
|
||||
// newNetexecPodSpec returns the pod spec of netexec pod
|
||||
func newNetexecPodSpec(podName string, httpPort, udpPort int32, hostNetwork bool) *v1.Pod {
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: podName,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "netexec",
|
||||
Image: framework.NetexecImageName,
|
||||
Command: []string{
|
||||
"/netexec",
|
||||
fmt.Sprintf("--http-port=%d", httpPort),
|
||||
fmt.Sprintf("--udp-port=%d", udpPort),
|
||||
},
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: "http",
|
||||
ContainerPort: httpPort,
|
||||
},
|
||||
{
|
||||
Name: "udp",
|
||||
ContainerPort: udpPort,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
HostNetwork: hostNetwork,
|
||||
},
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
func (j *ServiceTestJig) LaunchNetexecPodOnNode(f *framework.Framework, nodeName, podName string, httpPort, udpPort int32, hostNetwork bool) {
|
||||
framework.Logf("Creating netexec pod %q on node %v in namespace %q", podName, nodeName, f.Namespace.Name)
|
||||
pod := newNetexecPodSpec(podName, httpPort, udpPort, hostNetwork)
|
||||
pod.Spec.NodeName = nodeName
|
||||
pod.ObjectMeta.Labels = j.Labels
|
||||
podClient := f.ClientSet.Core().Pods(f.Namespace.Name)
|
||||
_, err := podClient.Create(pod)
|
||||
framework.ExpectNoError(err)
|
||||
framework.ExpectNoError(f.WaitForPodRunning(podName))
|
||||
framework.Logf("Netexec pod %q in namespace %q running", pod.Name, f.Namespace.Name)
|
||||
}
|
||||
|
||||
// newEchoServerPodSpec returns the pod spec of echo server pod
|
||||
func newEchoServerPodSpec(podName string) *v1.Pod {
|
||||
port := 8080
|
||||
|
|
Loading…
Reference in New Issue