add e2e test on the hostport predicates

pull/6/head
chenxingyu 2017-10-11 11:45:25 +08:00
parent 2d44ef9dfa
commit 954c97fe6d
9 changed files with 277 additions and 46 deletions

View File

@@ -903,42 +903,9 @@ func PodFitsHostPorts(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s
existingPorts := nodeInfo.UsedPorts()
// try to see whether two hostPorts will conflict or not
for existingPort := range existingPorts {
existingHostPortInfo := decode(existingPort)
if existingHostPortInfo.hostIP == "0.0.0.0" {
// loop through all the want hostPort to see if there exists a conflict
for wantPort := range wantPorts {
wantHostPortInfo := decode(wantPort)
// if an existing hostPort with hostIP 0.0.0.0 has the same protocol and port as a wanted hostPort, the wanted hostPort will not fit
if wantHostPortInfo.hostPort == existingHostPortInfo.hostPort && wantHostPortInfo.protocol == existingHostPortInfo.protocol {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
}
}
}
for wantPort := range wantPorts {
wantHostPortInfo := decode(wantPort)
if wantHostPortInfo.hostIP == "0.0.0.0" {
// loop through all the existing hostPort to see if there exists a conflict
for existingPort := range existingPorts {
existingHostPortInfo := decode(existingPort)
// if any existing hostPort (whose hostIP may be e.g. 127.0.0.1) has the same protocol and port, a hostPort that wants the 0.0.0.0 hostIP will not fit
if wantHostPortInfo.hostPort == existingHostPortInfo.hostPort && wantHostPortInfo.protocol == existingHostPortInfo.protocol {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
}
} else {
// general hostPort conflict check for hostIPs other than 0.0.0.0
if wantPort != "" && existingPorts[wantPort] {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
}
// check whether existingPorts and wantPorts conflict with each other
if portsConflict(existingPorts, wantPorts) {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
}
return true, nil, nil

View File

@@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
schedutil "k8s.io/kubernetes/plugin/pkg/scheduler/util"
)
// FindLabelsInSet gets as many key/value pairs as possible out of a label set.
@@ -98,7 +99,7 @@ type hostPortInfo struct {
hostPort string
}
// decode a string ("protocol/hostIP/hostPort") to *hostPortInfo object
// decode decodes a string ("protocol/hostIP/hostPort") into a *hostPortInfo object.
func decode(info string) *hostPortInfo {
hostPortInfoSlice := strings.Split(info, "/")
@@ -112,3 +113,48 @@ func decode(info string) *hostPortInfo {
hostPort: hostPort,
}
}
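As a reference for the key format, here is a minimal standalone sketch of the encode/decode round trip (the middle of the real decode body is elided by the hunk above; field names follow the hostPortInfo struct, and the encoding matches the fmt.Sprintf call in updateUsedPorts further down):

package main

import (
	"fmt"
	"strings"
)

type hostPortInfo struct {
	protocol string
	hostIP   string
	hostPort string
}

// decode splits a "protocol/hostIP/hostPort" key back into its three parts.
func decode(info string) *hostPortInfo {
	parts := strings.Split(info, "/")
	return &hostPortInfo{protocol: parts[0], hostIP: parts[1], hostPort: parts[2]}
}

func main() {
	key := fmt.Sprintf("%s/%s/%d", "TCP", "0.0.0.0", 8080)
	info := decode(key)
	fmt.Println(info.protocol, info.hostIP, info.hostPort) // TCP 0.0.0.0 8080
}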
// specialPortConflictCheck detects whether specialHostPort (whose hostIP is 0.0.0.0) conflicts with any of otherHostPorts.
// It returns true if there is a conflict.
func specialPortConflictCheck(specialHostPort string, otherHostPorts map[string]bool) bool {
specialHostPortInfo := decode(specialHostPort)
if specialHostPortInfo.hostIP == schedutil.DefaultBindAllHostIP {
// loop through all the otherHostPorts to see if there is a conflict
for hostPortItem := range otherHostPorts {
hostPortInfo := decode(hostPortItem)
// any hostPortItem with the same hostPort and protocol as specialHostPort causes a conflict
if specialHostPortInfo.hostPort == hostPortInfo.hostPort && specialHostPortInfo.protocol == hostPortInfo.protocol {
return true
}
}
}
return false
}
// portsConflict checks whether existingPorts and wantPorts conflict with each other.
// It returns true if there is a conflict.
func portsConflict(existingPorts, wantPorts map[string]bool) bool {
for existingPort := range existingPorts {
if specialPortConflictCheck(existingPort, wantPorts) {
return true
}
}
for wantPort := range wantPorts {
if specialPortConflictCheck(wantPort, existingPorts) {
return true
}
// general hostPort conflict check for hostIPs other than 0.0.0.0
if existingPorts[wantPort] {
return true
}
}
return false
}
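To make the wildcard rule concrete, a short illustration (a sketch only; it assumes it sits in the same package as portsConflict, since the helper is unexported):

existing := map[string]bool{"TCP/0.0.0.0/80": true}

// 0.0.0.0 binds every interface, so any hostIP with the same protocol and port conflicts.
fmt.Println(portsConflict(existing, map[string]bool{"TCP/127.0.0.1/80": true})) // true

// A different protocol (or port) does not conflict, even against the wildcard IP.
fmt.Println(portsConflict(existing, map[string]bool{"UDP/127.0.0.1/80": true})) // false

// Two distinct specific hostIPs can share the same protocol and port.
fmt.Println(portsConflict(
	map[string]bool{"TCP/127.0.0.1/80": true},
	map[string]bool{"TCP/127.0.0.2/80": true},
)) // false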

View File

@@ -113,3 +113,151 @@ func Test_decode(t *testing.T) {
}
}
func Test_specialPortConflictCheck(t *testing.T) {
type args struct {
specialHostPort string
otherHostPorts map[string]bool
}
tests := []struct {
name string
args args
want bool
}{
{
name: "test-1",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"TCP/127.0.0.2/8080": true,
"TCP/127.0.0.1/80": true,
"UDP/127.0.0.2/8080": true,
},
},
want: true,
},
{
name: "test-2",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"TCP/127.0.0.2/8080": true,
"UDP/127.0.0.1/80": true,
"UDP/127.0.0.2/8080": true,
},
},
want: false,
},
{
name: "test-3",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"TCP/127.0.0.2/8080": true,
"TCP/127.0.0.1/8090": true,
"UDP/127.0.0.2/8080": true,
},
},
want: false,
},
{
name: "test-4",
args: args{
specialHostPort: "TCP/0.0.0.0/80",
otherHostPorts: map[string]bool{
"UDP/127.0.0.2/8080": true,
"UDP/127.0.0.1/8090": true,
"TCP/127.0.0.2/8080": true,
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := specialPortConflictCheck(tt.args.specialHostPort, tt.args.otherHostPorts); got != tt.want {
t.Errorf("specialPortConflictCheck() = %v, want %v", got, tt.want)
}
})
}
}
func Test_portsConflict(t *testing.T) {
type args struct {
existingPorts map[string]bool
wantPorts map[string]bool
}
tests := []struct {
name string
args args
want bool
}{
{
name: "test1",
args: args{
existingPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
wantPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
},
want: true,
},
{
name: "test2",
args: args{
existingPorts: map[string]bool{
"UDP/127.0.0.2/8080": true,
},
wantPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
},
want: false,
},
{
name: "test3",
args: args{
existingPorts: map[string]bool{
"TCP/127.0.0.1/8080": true,
},
wantPorts: map[string]bool{
"UDP/127.0.0.1/8080": true,
},
},
want: false,
},
{
name: "test4",
args: args{
existingPorts: map[string]bool{
"TCP/0.0.0.0/8080": true,
},
wantPorts: map[string]bool{
"TCP/127.0.0.1/8080": true,
},
},
want: true,
},
{
name: "test5",
args: args{
existingPorts: map[string]bool{
"TCP/127.0.0.1/8080": true,
},
wantPorts: map[string]bool{
"TCP/0.0.0.0/8080": true,
},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := portsConflict(tt.args.existingPorts, tt.args.wantPorts); got != tt.want {
t.Errorf("portsConflict() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -330,7 +330,6 @@ func (sched *Scheduler) scheduleOne() {
if err != nil {
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
err := sched.bind(&assumedPod, &v1.Binding{

View File

@@ -256,7 +256,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
case <-waitPodExpireChan:
case <-time.After(wait.ForeverTestTimeout):
close(timeout)
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout)
}
// We use conflicted pod ports to incur fit predicate failure if first pod not removed.
@@ -273,7 +273,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
}
}
@@ -307,7 +307,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
t.Errorf("err want=%v, get=%v", expectErr, err)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
}
// We mimic the workflow of cache behavior when a pod is removed by user.
@@ -334,7 +334,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
t.Errorf("binding want=%v, get=%v", expectBinding, b)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
}
}

View File

@@ -228,7 +228,7 @@ func TestExpirePod(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[1]},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": false, "TCP/127.0.0.1/8080": true},
usedPorts: map[string]bool{"TCP/127.0.0.1/8080": true},
},
}}
@@ -277,7 +277,7 @@ func TestAddPodWillConfirm(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{testPods[0]},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true, "TCP/127.0.0.1/8080": false},
usedPorts: map[string]bool{"TCP/127.0.0.1/80": true},
},
}}
@@ -332,7 +332,7 @@ func TestAddPodWillReplaceAssumed(t *testing.T) {
},
allocatableResource: &Resource{},
pods: []*v1.Pod{updatedPod.DeepCopy()},
usedPorts: map[int]bool{90: true},
usedPorts: map[string]bool{"TCP/0.0.0.0/90": true},
},
},
}}

View File

@@ -417,7 +417,7 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, used bool) {
// if the user does not explicitly set hostIP, it defaults to 0.0.0.0
portHostIP := podPort.HostIP
if podPort.HostIP == "" {
portHostIP = "0.0.0.0"
portHostIP = util.DefaultBindAllHostIP
}
str := fmt.Sprintf("%s/%s/%d", portProtocol, portHostIP, podPort.HostPort)
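For example, a container port that leaves HostIP unset ends up keyed under the wildcard IP. A standalone sketch (the surrounding loop over the pod's containers is elided by the hunk):

package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/plugin/pkg/scheduler/util"
)

func main() {
	podPort := v1.ContainerPort{Protocol: v1.ProtocolTCP, HostPort: 8080} // HostIP left unset
	portHostIP := podPort.HostIP
	if portHostIP == "" {
		portHostIP = util.DefaultBindAllHostIP
	}
	fmt.Printf("%s/%s/%d\n", podPort.Protocol, portHostIP, podPort.HostPort) // TCP/0.0.0.0/8080
}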

View File

@@ -24,6 +24,8 @@ import (
"k8s.io/kubernetes/pkg/apis/scheduling"
)
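// DefaultBindAllHostIP is the default hostIP, 0.0.0.0, which binds a hostPort on all of the host's IPs.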
const DefaultBindAllHostIP = "0.0.0.0"
// GetUsedPorts returns the used host ports of the given Pods: if 'port' is used, a 'port:true'
// pair will be in the result; it does not resolve port conflicts.
func GetUsedPorts(pods ...*v1.Pod) map[string]bool {
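The function body is elided by the hunk; a minimal sketch consistent with the signature and the key encoding used elsewhere in this commit might look as follows (skipping ports without a HostPort is an assumption here):

func GetUsedPorts(pods ...*v1.Pod) map[string]bool {
	ports := make(map[string]bool)
	for _, pod := range pods {
		for _, container := range pod.Spec.Containers {
			for _, podPort := range container.Ports {
				// only host ports can conflict; skip ports that do not request one (assumption)
				if podPort.HostPort == 0 {
					continue
				}
				hostIP := podPort.HostIP
				if hostIP == "" {
					hostIP = DefaultBindAllHostIP
				}
				ports[fmt.Sprintf("%s/%s/%d", podPort.Protocol, hostIP, podPort.HostPort)] = true
			}
		}
	}
	return ports
}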

View File

@@ -592,6 +592,54 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
WaitForSchedulerAfterAction(f, removeTaintFromNodeAction(cs, nodeName, testTaint), podNameNoTolerations, true)
verifyResult(cs, 1, 0, ns)
})
It("validates that there is no conflict between pods with same hostPort but different hostIP and protocol", func() {
nodeName := GetNodeThatCanRunPod(f)
// use a nodeSelector to make sure the test pods land on the same node, so we can explicitly verify whether a conflict occurs
By("Trying to apply a random label on the found node.")
k := fmt.Sprintf("kubernetes.io/e2e-%s", string(uuid.NewUUID()))
v := "90"
nodeSelector := make(map[string]string)
nodeSelector[k] = v
framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
defer framework.RemoveLabelOffNode(cs, nodeName, k)
By("Trying to create a pod(pod1) with hostport 80 and hostIP 127.0.0.1 and expect scheduled")
creatHostPortPodOnNode(f, "pod1", ns, "127.0.0.1", v1.ProtocolTCP, nodeSelector, true)
By("Trying to create another pod(pod2) with hostport 80 but hostIP 127.0.0.2 on the node which pod1 resides and expect scheduled")
creatHostPortPodOnNode(f, "pod2", ns, "127.0.0.2", v1.ProtocolTCP, nodeSelector, true)
By("Trying to create a third pod(pod3) with hostport 80, hostIP 127.0.0.2 but use UDP protocol on the node which pod2 resides")
creatHostPortPodOnNode(f, "pod3", ns, "127.0.0.2", v1.ProtocolUDP, nodeSelector, true)
})
It("validates that there exists conflict between pods with same hostPort and protocol but one using 0.0.0.0 hostIP", func() {
nodeName := GetNodeThatCanRunPod(f)
// use a nodeSelector to make sure the test pods land on the same node, so we can explicitly verify whether a conflict occurs
By("Trying to apply a random label on the found node.")
k := fmt.Sprintf("kubernetes.io/e2e-%s", string(uuid.NewUUID()))
v := "95"
nodeSelector := make(map[string]string)
nodeSelector[k] = v
framework.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
framework.ExpectNodeHasLabel(cs, nodeName, k, v)
defer framework.RemoveLabelOffNode(cs, nodeName, k)
By("Trying to create a pod(pod4) with hostport 80 and hostIP 0.0.0.0(empty string here) and expect scheduled")
creatHostPortPodOnNode(f, "pod4", ns, "", v1.ProtocolTCP, nodeSelector, true)
By("Trying to create another pod(pod5) with hostport 80 but hostIP 127.0.0.1 on the node which pod4 resides and expect not scheduled")
creatHostPortPodOnNode(f, "pod5", ns, "127.0.0.1", v1.ProtocolTCP, nodeSelector, false)
})
})
func initPausePod(f *framework.Framework, conf pausePodConfig) *v1.Pod {
@@ -782,3 +830,24 @@ func CreateHostPortPods(f *framework.Framework, id string, replicas int, expectR
framework.ExpectNoError(err)
}
}
// creatHostPortPodOnNode creates a pod that uses a hostPort on the specified node, selected via the given nodeSelector
func creatHostPortPodOnNode(f *framework.Framework, podName, ns, hostIP string, protocol v1.Protocol, nodeSelector map[string]string, expectScheduled bool) {
createPausePod(f, pausePodConfig{
Name: podName,
Ports: []v1.ContainerPort{
{
HostPort: 80,
ContainerPort: 80,
Protocol: protocol,
HostIP: hostIP,
},
},
NodeSelector: nodeSelector,
})
err := framework.WaitForPodNotPending(f.ClientSet, ns, podName)
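// when expectScheduled is false, the pod should stay Pending and the wait above simply times out; that error is deliberately ignored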
if expectScheduled {
framework.ExpectNoError(err)
}
}