Update winkernel proxy for overlay

pull/564/head
ksubrmnn 2019-01-11 14:46:46 -08:00
parent 164f79e2d4
commit b724bdb19a
6 changed files with 1665 additions and 182 deletions

View File

@ -1,8 +1,10 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"hnsV1.go",
"hnsV2.go",
"metrics.go",
"proxier.go",
],
@ -15,6 +17,7 @@ go_library(
"//pkg/api/v1/service:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/apis/config:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/util/async:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -23,6 +26,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/Microsoft/hcsshim:go_default_library",
"//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@ -43,3 +47,22 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = [
"hns_test.go",
"proxier_test.go",
],
embed = [":go_default_library"],
deps = select({
"@io_bazel_rules_go//go/platform:windows": [
"//pkg/proxy:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library",
],
"//conditions:default": [],
}),
)

View File

@ -0,0 +1,225 @@
// +build windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winkernel
import (
"encoding/json"
"fmt"
"github.com/Microsoft/hcsshim"
"k8s.io/klog"
"net"
"strings"
)
type HostNetworkService interface {
getNetworkByName(name string) (*hnsNetworkInfo, error)
getEndpointByID(id string) (*endpointsInfo, error)
getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error)
createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error)
deleteEndpoint(hnsID string) error
getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error)
deleteLoadBalancer(hnsID string) error
}
// V1 HNS API
type hnsV1 struct{}
func (hns hnsV1) getNetworkByName(name string) (*hnsNetworkInfo, error) {
hnsnetwork, err := hcsshim.GetHNSNetworkByName(name)
if err != nil {
klog.Errorf("%v", err)
return nil, err
}
return &hnsNetworkInfo{
id: hnsnetwork.Id,
name: hnsnetwork.Name,
networkType: hnsnetwork.Type,
}, nil
}
func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) {
hnsendpoint, err := hcsshim.GetHNSEndpointByID(id)
if err != nil {
klog.Errorf("%v", err)
return nil, err
}
return &endpointsInfo{
ip: hnsendpoint.IPAddress.String(),
isLocal: !hnsendpoint.IsRemoteEndpoint, //TODO: Change isLocal to isRemote
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
}, nil
}
func (hns hnsV1) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
hnsnetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
klog.Errorf("%v", err)
return nil, err
}
endpoints, err := hcsshim.HNSListEndpointRequest()
for _, endpoint := range endpoints {
equal := false
if endpoint.IPAddress != nil {
equal = endpoint.IPAddress.String() == ip
}
if equal && strings.EqualFold(endpoint.VirtualNetwork, hnsnetwork.Id) {
return &endpointsInfo{
ip: endpoint.IPAddress.String(),
isLocal: !endpoint.IsRemoteEndpoint,
macAddress: endpoint.MacAddress,
hnsID: endpoint.Id,
hns: hns,
}, nil
}
}
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
func (hns hnsV1) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
return nil, fmt.Errorf("Could not find network %s: %v", networkName, err)
}
hnsEndpoint := &hcsshim.HNSEndpoint{
MacAddress: ep.macAddress,
IPAddress: net.ParseIP(ep.ip),
}
var createdEndpoint *hcsshim.HNSEndpoint
if !ep.isLocal {
if len(ep.providerAddress) != 0 {
paPolicy := hcsshim.PaPolicy{
Type: hcsshim.PA,
PA: ep.providerAddress,
}
paPolicyJson, err := json.Marshal(paPolicy)
if err != nil {
return nil, fmt.Errorf("PA Policy creation failed: %v", err)
}
hnsEndpoint.Policies = append(hnsEndpoint.Policies, paPolicyJson)
}
createdEndpoint, err = hnsNetwork.CreateRemoteEndpoint(hnsEndpoint)
if err != nil {
return nil, fmt.Errorf("Remote endpoint creation failed: %v", err)
}
} else {
createdEndpoint, err = hnsNetwork.CreateEndpoint(hnsEndpoint)
if err != nil {
return nil, fmt.Errorf("Local endpoint creation failed: %v", err)
}
}
return &endpointsInfo{
ip: createdEndpoint.IPAddress.String(),
isLocal: createdEndpoint.IsRemoteEndpoint,
macAddress: createdEndpoint.MacAddress,
hnsID: createdEndpoint.Id,
providerAddress: ep.providerAddress, //TODO get from createdEndpoint
hns: hns,
}, nil
}
func (hns hnsV1) deleteEndpoint(hnsID string) error {
hnsendpoint, err := hcsshim.GetHNSEndpointByID(hnsID)
if err != nil {
return err
}
_, err = hnsendpoint.Delete()
if err == nil {
klog.V(3).Infof("Remote endpoint resource deleted id %s", hnsID)
}
return err
}
func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
plists, err := hcsshim.HNSListPolicyListRequest()
if err != nil {
return nil, err
}
if isDSR {
klog.V(3).Info("DSR is not supported in V1. Using non DSR instead")
}
for _, plist := range plists {
if len(plist.EndpointReferences) != len(endpoints) {
continue
}
// Validate if input meets any of the policy lists
elbPolicy := hcsshim.ELBPolicy{}
if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil {
continue
}
if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == isILB {
if len(vip) > 0 {
if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip {
continue
}
}
LogJson(plist, "Found existing Hns loadbalancer policy resource", 1)
return &loadBalancerInfo{
hnsID: plist.ID,
}, nil
}
}
var hnsEndpoints []hcsshim.HNSEndpoint
for _, ep := range endpoints {
endpoint, err := hcsshim.GetHNSEndpointByID(ep.hnsID)
if err != nil {
return nil, err
}
hnsEndpoints = append(hnsEndpoints, *endpoint)
}
lb, err := hcsshim.AddLoadBalancer(
hnsEndpoints,
isILB,
sourceVip,
vip,
protocol,
internalPort,
externalPort,
)
if err == nil {
LogJson(lb, "Hns loadbalancer policy resource", 1)
} else {
return nil, err
}
return &loadBalancerInfo{
hnsID: lb.ID,
}, err
}
func (hns hnsV1) deleteLoadBalancer(hnsID string) error {
if len(hnsID) == 0 {
// Return silently
return nil
}
// Cleanup HNS policies
hnsloadBalancer, err := hcsshim.GetPolicyListByID(hnsID)
if err != nil {
return err
}
LogJson(hnsloadBalancer, "Removing Policy", 2)
_, err = hnsloadBalancer.Delete()
return err
}

View File

@ -0,0 +1,239 @@
// +build windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winkernel
import (
"encoding/json"
"fmt"
"github.com/Microsoft/hcsshim/hcn"
"k8s.io/klog"
"strings"
)
type hnsV2 struct{}
func (hns hnsV2) getNetworkByName(name string) (*hnsNetworkInfo, error) {
hnsnetwork, err := hcn.GetNetworkByName(name)
if err != nil {
klog.Errorf("%v", err)
return nil, err
}
var remoteSubnets []*remoteSubnetInfo
for _, policy := range hnsnetwork.Policies {
if policy.Type == hcn.RemoteSubnetRoute {
policySettings := hcn.RemoteSubnetRoutePolicySetting{}
err = json.Unmarshal(policy.Settings, &policySettings)
if err != nil {
return nil, fmt.Errorf("Failed to unmarshal Remote Subnet policy settings")
}
rs := &remoteSubnetInfo{
destinationPrefix: policySettings.DestinationPrefix,
isolationId: policySettings.IsolationId,
providerAddress: policySettings.ProviderAddress,
drMacAddress: policySettings.DistributedRouterMacAddress,
}
remoteSubnets = append(remoteSubnets, rs)
}
}
return &hnsNetworkInfo{
id: hnsnetwork.Id,
name: hnsnetwork.Name,
networkType: string(hnsnetwork.Type),
remoteSubnets: remoteSubnets,
}, nil
}
func (hns hnsV2) getEndpointByID(id string) (*endpointsInfo, error) {
hnsendpoint, err := hcn.GetEndpointByID(id)
if err != nil {
return nil, err
}
return &endpointsInfo{ //TODO: fill out PA
ip: hnsendpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: hnsendpoint.MacAddress,
hnsID: hnsendpoint.Id,
hns: hns,
}, nil
}
func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
hnsnetwork, err := hcn.GetNetworkByName(networkName)
if err != nil {
klog.Errorf("%v", err)
return nil, err
}
endpoints, err := hcn.ListEndpoints()
for _, endpoint := range endpoints {
equal := false
if endpoint.IpConfigurations != nil && len(endpoint.IpConfigurations) > 0 {
equal = endpoint.IpConfigurations[0].IpAddress == ip
}
if equal && strings.EqualFold(endpoint.HostComputeNetwork, hnsnetwork.Id) {
return &endpointsInfo{
ip: endpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(endpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote
macAddress: endpoint.MacAddress,
hnsID: endpoint.Id,
hns: hns,
}, nil
}
}
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
func (hns hnsV2) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
hnsNetwork, err := hcn.GetNetworkByName(networkName)
if err != nil {
return nil, fmt.Errorf("Could not find network %s: %v", networkName, err)
}
var flags hcn.EndpointFlags
if !ep.isLocal {
flags |= hcn.EndpointFlagsRemoteEndpoint
}
ipConfig := &hcn.IpConfig{
IpAddress: ep.ip,
}
hnsEndpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: ep.macAddress,
Flags: flags,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
var createdEndpoint *hcn.HostComputeEndpoint
if !ep.isLocal {
if len(ep.providerAddress) != 0 {
policySettings := hcn.ProviderAddressEndpointPolicySetting{
ProviderAddress: ep.providerAddress,
}
policySettingsJson, err := json.Marshal(policySettings)
if err != nil {
return nil, fmt.Errorf("PA Policy creation failed: %v", err)
}
paPolicy := hcn.EndpointPolicy{
Type: hcn.NetworkProviderAddress,
Settings: policySettingsJson,
}
hnsEndpoint.Policies = append(hnsEndpoint.Policies, paPolicy)
}
createdEndpoint, err = hnsNetwork.CreateRemoteEndpoint(hnsEndpoint)
if err != nil {
return nil, fmt.Errorf("Remote endpoint creation failed: %v", err)
}
} else {
createdEndpoint, err = hnsNetwork.CreateEndpoint(hnsEndpoint)
if err != nil {
return nil, fmt.Errorf("Local endpoint creation failed: %v", err)
}
}
return &endpointsInfo{
ip: createdEndpoint.IpConfigurations[0].IpAddress,
isLocal: uint32(createdEndpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0,
macAddress: createdEndpoint.MacAddress,
hnsID: createdEndpoint.Id,
providerAddress: ep.providerAddress, //TODO get from createdEndpoint
hns: hns,
}, nil
}
func (hns hnsV2) deleteEndpoint(hnsID string) error {
hnsendpoint, err := hcn.GetEndpointByID(hnsID)
if err != nil {
return err
}
err = hnsendpoint.Delete()
if err == nil {
klog.V(3).Infof("Remote endpoint resource deleted id %s", hnsID)
}
return err
}
func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
plists, err := hcn.ListLoadBalancers()
if err != nil {
return nil, err
}
for _, plist := range plists {
if len(plist.HostComputeEndpoints) != len(endpoints) {
continue
}
// Validate if input meets any of the policy lists
lbPortMapping := plist.PortMappings[0]
if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == isILB {
if len(vip) > 0 {
if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip {
continue
}
}
LogJson(plist, "Found existing Hns loadbalancer policy resource", 1)
return &loadBalancerInfo{
hnsID: plist.Id,
}, nil
}
}
var hnsEndpoints []hcn.HostComputeEndpoint
for _, ep := range endpoints {
endpoint, err := hcn.GetEndpointByID(ep.hnsID)
if err != nil {
return nil, err
}
hnsEndpoints = append(hnsEndpoints, *endpoint)
}
vips := []string{}
if len(vip) > 0 {
vips = append(vips, vip)
}
lb, err := hcn.AddLoadBalancer(
hnsEndpoints,
isILB,
isDSR,
sourceVip,
vips,
protocol,
internalPort,
externalPort,
)
if err != nil {
return nil, err
}
LogJson(lb, "Hns loadbalancer policy resource", 1)
return &loadBalancerInfo{
hnsID: lb.Id,
}, err
}
func (hns hnsV2) deleteLoadBalancer(hnsID string) error {
lb, err := hcn.GetLoadBalancerByID(hnsID)
if err != nil {
// Return silently
return nil
}
err = lb.Delete()
return err
}

View File

@ -0,0 +1,557 @@
// +build windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winkernel
import (
"encoding/json"
"github.com/Microsoft/hcsshim/hcn"
"strings"
"testing"
)
const sourceVip = "192.168.1.2"
const serviceVip = "11.0.0.1"
const addressPrefix = "192.168.1.0/24"
const gatewayAddress = "192.168.1.1"
const epMacAddress = "00-11-22-33-44-55"
const epIpAddress = "192.168.1.3"
const epIpAddressRemote = "192.168.2.3"
const epPaAddress = "10.0.0.3"
const protocol = 6
const internalPort = 80
const externalPort = 32440
func TestGetNetworkByName(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testGetNetworkByName(t, hnsV1)
testGetNetworkByName(t, hnsV2)
}
func TestGetEndpointByID(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testGetEndpointByID(t, hnsV1)
testGetEndpointByID(t, hnsV2)
}
func TestGetEndpointByIpAddress(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testGetEndpointByIpAddress(t, hnsV1)
testGetEndpointByIpAddress(t, hnsV2)
}
func TestCreateEndpointLocal(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testCreateEndpointLocal(t, hnsV1)
testCreateEndpointLocal(t, hnsV2)
}
func TestCreateEndpointRemotePA(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testCreateEndpointRemote(t, hnsV1, epPaAddress)
testCreateEndpointRemote(t, hnsV2, epPaAddress)
}
func TestCreateEndpointRemoteNoPA(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testCreateEndpointRemote(t, hnsV1, "")
testCreateEndpointRemote(t, hnsV2, "")
}
func TestDeleteEndpoint(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testDeleteEndpoint(t, hnsV1)
testDeleteEndpoint(t, hnsV2)
}
func TestGetLoadBalancerExisting(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testGetLoadBalancerExisting(t, hnsV1)
testGetLoadBalancerExisting(t, hnsV2)
}
func TestGetLoadBalancerNew(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testGetLoadBalancerNew(t, hnsV1)
testGetLoadBalancerNew(t, hnsV2)
}
func TestDeleteLoadBalancer(t *testing.T) {
hnsV1 := hnsV1{}
hnsV2 := hnsV2{}
testDeleteLoadBalancer(t, hnsV1)
testDeleteLoadBalancer(t, hnsV2)
}
func testGetNetworkByName(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
network, err := hns.getNetworkByName(Network.Name)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(network.id, Network.Id) {
t.Errorf("%v does not match %v", network.id, Network.Id)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testGetEndpointByID(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
}
Endpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: epMacAddress,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
Endpoint, err = Network.CreateEndpoint(Endpoint)
if err != nil {
t.Error(err)
}
endpoint, err := hns.getEndpointByID(Endpoint.Id)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) {
t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
}
Endpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: epMacAddress,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
Endpoint, err = Network.CreateEndpoint(Endpoint)
if err != nil {
t.Error(err)
}
endpoint, err := hns.getEndpointByIpAddress(Endpoint.IpConfigurations[0].IpAddress, Network.Name)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) {
t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id)
}
if endpoint.ip != Endpoint.IpConfigurations[0].IpAddress {
t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testCreateEndpointLocal(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
endpoint := &endpointsInfo{
ip: epIpAddress,
macAddress: epMacAddress,
isLocal: true,
}
endpoint, err = hns.createEndpoint(endpoint, Network.Name)
if err != nil {
t.Error(err)
}
Endpoint, err := hcn.GetEndpointByID(endpoint.hnsID)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) {
t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id)
}
if endpoint.ip != Endpoint.IpConfigurations[0].IpAddress {
t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress)
}
if endpoint.macAddress != Endpoint.MacAddress {
t.Errorf("%v does not match %v", endpoint.macAddress, Endpoint.MacAddress)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testCreateEndpointRemote(t *testing.T, hns HostNetworkService, providerAddress string) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
endpoint := &endpointsInfo{
ip: epIpAddressRemote,
macAddress: epMacAddress,
isLocal: false,
providerAddress: providerAddress,
}
endpoint, err = hns.createEndpoint(endpoint, Network.Name)
if err != nil {
t.Error(err)
}
Endpoint, err := hcn.GetEndpointByID(endpoint.hnsID)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) {
t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id)
}
if endpoint.ip != Endpoint.IpConfigurations[0].IpAddress {
t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress)
}
if endpoint.macAddress != Endpoint.MacAddress {
t.Errorf("%v does not match %v", endpoint.macAddress, Endpoint.MacAddress)
}
if len(providerAddress) != 0 && endpoint.providerAddress != epPaAddress {
t.Errorf("%v does not match %v", endpoint.providerAddress, providerAddress)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testDeleteEndpoint(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
}
Endpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: epMacAddress,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
Endpoint, err = Network.CreateEndpoint(Endpoint)
if err != nil {
t.Error(err)
}
err = hns.deleteEndpoint(Endpoint.Id)
if err != nil {
t.Error(err)
}
// Endpoint should no longer exist so this should fail
Endpoint, err = hcn.GetEndpointByID(Endpoint.Id)
if err == nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
}
Endpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: epMacAddress,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
Endpoint, err = Network.CreateEndpoint(Endpoint)
if err != nil {
t.Error(err)
}
Endpoints := []hcn.HostComputeEndpoint{*Endpoint}
LoadBalancer, err := hcn.AddLoadBalancer(
Endpoints,
false,
false,
sourceVip,
[]string{serviceVip},
protocol,
internalPort,
externalPort,
)
if err != nil {
t.Error(err)
}
endpoint := &endpointsInfo{
ip: Endpoint.IpConfigurations[0].IpAddress,
hnsID: Endpoint.Id,
}
endpoints := []endpointsInfo{*endpoint}
lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(lb.hnsID, LoadBalancer.Id) {
t.Errorf("%v does not match %v", lb.hnsID, LoadBalancer.Id)
}
err = LoadBalancer.Delete()
if err != nil {
t.Error(err)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
}
Endpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: epMacAddress,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
Endpoint, err = Network.CreateEndpoint(Endpoint)
if err != nil {
t.Error(err)
}
endpoint := &endpointsInfo{
ip: Endpoint.IpConfigurations[0].IpAddress,
hnsID: Endpoint.Id,
}
endpoints := []endpointsInfo{*endpoint}
lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort)
if err != nil {
t.Error(err)
}
LoadBalancer, err := hcn.GetLoadBalancerByID(lb.hnsID)
if err != nil {
t.Error(err)
}
if !strings.EqualFold(lb.hnsID, LoadBalancer.Id) {
t.Errorf("%v does not match %v", lb.hnsID, LoadBalancer.Id)
}
err = LoadBalancer.Delete()
if err != nil {
t.Error(err)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func testDeleteLoadBalancer(t *testing.T, hns HostNetworkService) {
Network, err := createTestNetwork()
if err != nil {
t.Error(err)
}
ipConfig := &hcn.IpConfig{
IpAddress: epIpAddress,
}
Endpoint := &hcn.HostComputeEndpoint{
IpConfigurations: []hcn.IpConfig{*ipConfig},
MacAddress: epMacAddress,
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
Endpoint, err = Network.CreateEndpoint(Endpoint)
if err != nil {
t.Error(err)
}
Endpoints := []hcn.HostComputeEndpoint{*Endpoint}
LoadBalancer, err := hcn.AddLoadBalancer(
Endpoints,
false,
false,
sourceVip,
[]string{serviceVip},
protocol,
internalPort,
externalPort,
)
if err != nil {
t.Error(err)
}
err = hns.deleteLoadBalancer(LoadBalancer.Id)
if err != nil {
t.Error(err)
}
// Load balancer should not longer exist
LoadBalancer, err = hcn.GetLoadBalancerByID(LoadBalancer.Id)
if err == nil {
t.Error(err)
}
err = Endpoint.Delete()
if err != nil {
t.Error(err)
}
err = Network.Delete()
if err != nil {
t.Error(err)
}
}
func createTestNetwork() (*hcn.HostComputeNetwork, error) {
network := &hcn.HostComputeNetwork{
Type: "Overlay",
Name: "TestOverlay",
MacPool: hcn.MacPool{
Ranges: []hcn.MacRange{
{
StartMacAddress: "00-15-5D-52-C0-00",
EndMacAddress: "00-15-5D-52-CF-FF",
},
},
},
Ipams: []hcn.Ipam{
{
Type: "Static",
Subnets: []hcn.Subnet{
{
IpAddressPrefix: addressPrefix,
Routes: []hcn.Route{
{
NextHop: gatewayAddress,
DestinationPrefix: "0.0.0.0/0",
},
},
},
},
},
},
SchemaVersion: hcn.SchemaVersion{
Major: 2,
Minor: 0,
},
}
vsid := &hcn.VsidPolicySetting{
IsolationId: 5000,
}
vsidJson, err := json.Marshal(vsid)
if err != nil {
return nil, err
}
sp := &hcn.SubnetPolicy{
Type: hcn.VSID,
}
sp.Settings = vsidJson
spJson, err := json.Marshal(sp)
if err != nil {
return nil, err
}
network.Ipams[0].Subnets[0].Policies = append(network.Ipams[0].Subnets[0].Policies, spJson)
return network.Create()
}

View File

@ -29,6 +29,8 @@ import (
"time"
"github.com/Microsoft/hcsshim"
"github.com/Microsoft/hcsshim/hcn"
"github.com/davecgh/go-spew/spew"
"k8s.io/klog"
@ -40,6 +42,7 @@ import (
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/util/async"
)
@ -82,6 +85,10 @@ type loadBalancerIngressInfo struct {
hnsID string
}
type loadBalancerInfo struct {
hnsID string
}
// internal struct for string service information
type serviceInfo struct {
clusterIP net.IP
@ -100,11 +107,22 @@ type serviceInfo struct {
hnsID string
nodePorthnsID string
policyApplied bool
remoteEndpoint *endpointsInfo
hns HostNetworkService
}
type hnsNetworkInfo struct {
name string
id string
name string
id string
networkType string
remoteSubnets []*remoteSubnetInfo
}
type remoteSubnetInfo struct {
destinationPrefix string
isolationId uint16
providerAddress string
drMacAddress string
}
func Log(v interface{}, message string, level klog.Level) {
@ -120,12 +138,14 @@ func LogJson(v interface{}, message string, level klog.Level) {
// internal struct for endpoints information
type endpointsInfo struct {
ip string
port uint16
isLocal bool
macAddress string
hnsID string
refCount uint16
ip string
port uint16
isLocal bool
macAddress string
hnsID string
refCount uint16
providerAddress string
hns HostNetworkService
}
//Uses mac prefix and IPv4 address to return a mac address
@ -139,7 +159,7 @@ func conjureMac(macPrefix string, ip net.IP) string {
return "02-11-22-33-44-55"
}
func newEndpointInfo(ip string, port uint16, isLocal bool) *endpointsInfo {
func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkService) *endpointsInfo {
info := &endpointsInfo{
ip: ip,
port: port,
@ -147,6 +167,7 @@ func newEndpointInfo(ip string, port uint16, isLocal bool) *endpointsInfo {
macAddress: conjureMac("02-11", net.ParseIP(ip)),
refCount: 0,
hnsID: "",
hns: hns,
}
return info
@ -160,14 +181,17 @@ func (ep *endpointsInfo) Cleanup() {
// Remove only remote endpoints created by this service
if ep.refCount <= 0 && !ep.isLocal {
klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep)
deleteHnsEndpoint(ep.hnsID)
ep.hnsID = ""
err := ep.hns.deleteEndpoint(ep.hnsID)
if err == nil {
ep.hnsID = ""
} else {
klog.Errorf("Endpoint deletion failed for %v: %v", ep.ip, err)
}
}
}
// returns a new serviceInfo struct
func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service) *serviceInfo {
func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo {
onlyNodeLocalEndpoints := false
if apiservice.RequestsOnlyLocalTraffic(service) {
onlyNodeLocalEndpoints = true
@ -175,8 +199,7 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser
// set default session sticky max age 180min=10800s
stickyMaxAgeSeconds := 10800
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil {
stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds)
}
info := &serviceInfo{
@ -193,6 +216,7 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser
stickyMaxAgeSeconds: stickyMaxAgeSeconds,
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
hns: hns,
}
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
@ -256,17 +280,17 @@ func newEndpointsChangeMap(hostname string) endpointsChangeMap {
}
}
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints) bool {
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints, hns HostNetworkService) bool {
ecm.lock.Lock()
defer ecm.lock.Unlock()
change, exists := ecm.items[*namespacedName]
if !exists {
change = &endpointsChange{}
change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
change.previous = endpointsToEndpointsMap(previous, ecm.hostname, hns)
ecm.items[*namespacedName] = change
}
change.current = endpointsToEndpointsMap(current, ecm.hostname)
change.current = endpointsToEndpointsMap(current, ecm.hostname, hns)
if reflect.DeepEqual(change.previous, change.current) {
delete(ecm.items, *namespacedName)
}
@ -279,7 +303,7 @@ func newServiceChangeMap() serviceChangeMap {
}
}
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service) bool {
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service, hns HostNetworkService) bool {
scm.lock.Lock()
defer scm.lock.Unlock()
@ -287,10 +311,10 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
if !exists {
// Service is Added
change = &serviceChange{}
change.previous = serviceToServiceMap(previous)
change.previous = serviceToServiceMap(previous, hns)
scm.items[*namespacedName] = change
}
change.current = serviceToServiceMap(current)
change.current = serviceToServiceMap(current, hns)
if reflect.DeepEqual(change.previous, change.current) {
delete(scm.items, *namespacedName)
}
@ -420,7 +444,11 @@ type Proxier struct {
// precomputing some number of those and cache for future reuse.
precomputedProbabilities []string
network hnsNetworkInfo
hns HostNetworkService
network hnsNetworkInfo
sourceVip string
hostMac string
isDSR bool
}
type localPort struct {
@ -465,6 +493,7 @@ func NewProxier(
nodeIP net.IP,
recorder record.EventRecorder,
healthzServer healthcheck.HealthzUpdater,
config config.KubeProxyWinkernelConfiguration,
) (*Proxier, error) {
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
@ -479,19 +508,75 @@ func NewProxier(
}
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
// TODO : Make this a param
hnsNetworkName := os.Getenv("KUBE_NETWORK")
if len(hnsNetworkName) == 0 {
return nil, fmt.Errorf("Environment variable KUBE_NETWORK not initialized")
var hns HostNetworkService
hns = hnsV1{}
supportedFeatures := hcn.GetSupportedFeatures()
if supportedFeatures.Api.V2 {
hns = hnsV2{}
}
hnsNetwork, err := getHnsNetworkInfo(hnsNetworkName)
hnsNetworkName := config.NetworkName
if len(hnsNetworkName) == 0 {
klog.V(3).Infof("network-name flag not set. Checking environment variable")
hnsNetworkName = os.Getenv("KUBE_NETWORK")
if len(hnsNetworkName) == 0 {
return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
}
}
hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
if err != nil {
klog.Fatalf("Unable to find Hns Network specified by %s. Please check environment variable KUBE_NETWORK", hnsNetworkName)
klog.Errorf("Unable to find Hns Network specified by %s. Please check environment variable KUBE_NETWORK or network-name flag", hnsNetworkName)
return nil, err
}
klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetworkInfo)
isDSR := config.EnableDSR
err = hcn.DSRSupported()
if isDSR && err != nil {
return nil, err
}
klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetwork)
var sourceVip string
var hostMac string
if hnsNetworkInfo.networkType == "Overlay" {
err = hcn.RemoteSubnetSupported()
if err != nil {
return nil, err
}
sourceVip = config.SourceVip
if len(sourceVip) == 0 {
return nil, fmt.Errorf("source-vip flag not set")
}
interfaces, _ := net.Interfaces() //TODO create interfaces
for _, inter := range interfaces {
addresses, _ := inter.Addrs()
for _, addr := range addresses {
addrIP, _, _ := net.ParseCIDR(addr.String())
if addrIP.String() == nodeIP.String() {
klog.V(2).Infof("Host MAC address is %s", inter.HardwareAddr.String())
hostMac = inter.HardwareAddr.String()
}
}
}
if len(hostMac) == 0 {
return nil, fmt.Errorf("Could not find host mac address for %s", nodeIP)
}
existingSourceVip, _ := hns.getEndpointByIpAddress(sourceVip, hnsNetworkName)
if existingSourceVip == nil {
hnsEndpoint := &endpointsInfo{
ip: sourceVip,
isLocal: true,
macAddress: hostMac,
providerAddress: nodeIP.String(),
}
_, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
return nil, fmt.Errorf("Source Vip endpoint creation failed: %v", err)
}
}
}
proxier := &Proxier{
portsMap: make(map[localPort]closeable),
@ -507,7 +592,11 @@ func NewProxier(
recorder: recorder,
healthChecker: healthChecker,
healthzServer: healthzServer,
network: *hnsNetwork,
hns: hns,
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: hostMac,
isDSR: isDSR,
}
burstSyncs := 2
@ -536,27 +625,30 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []*endpointsInfo) {
for _, ep := range endpoints {
ep.Cleanup()
}
if svcInfo.remoteEndpoint != nil {
svcInfo.remoteEndpoint.Cleanup()
}
svcInfo.policyApplied = false
}
func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
// Remove the Hns Policy corresponding to this service
deleteHnsLoadBalancerPolicy(svcInfo.hnsID)
hns := svcInfo.hns
hns.deleteLoadBalancer(svcInfo.hnsID)
svcInfo.hnsID = ""
deleteHnsLoadBalancerPolicy(svcInfo.nodePorthnsID)
hns.deleteLoadBalancer(svcInfo.nodePorthnsID)
svcInfo.nodePorthnsID = ""
for _, externalIp := range svcInfo.externalIPs {
deleteHnsLoadBalancerPolicy(externalIp.hnsID)
hns.deleteLoadBalancer(externalIp.hnsID)
externalIp.hnsID = ""
}
for _, lbIngressIp := range svcInfo.loadBalancerIngressIPs {
deleteHnsLoadBalancerPolicy(lbIngressIp.hnsID)
hns.deleteLoadBalancer(lbIngressIp.hnsID)
lbIngressIp.hnsID = ""
}
}
func deleteAllHnsLoadBalancerPolicy() {
@ -574,87 +666,6 @@ func deleteAllHnsLoadBalancerPolicy() {
}
// getHnsLoadBalancer returns the LoadBalancer policy resource, if already found.
// If not, it would create one and return
func getHnsLoadBalancer(endpoints []hcsshim.HNSEndpoint, isILB bool, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*hcsshim.PolicyList, error) {
plists, err := hcsshim.HNSListPolicyListRequest()
if err != nil {
return nil, err
}
for _, plist := range plists {
if len(plist.EndpointReferences) != len(endpoints) {
continue
}
// Validate if input meets any of the policy lists
elbPolicy := hcsshim.ELBPolicy{}
if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil {
continue
}
if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == isILB {
if len(vip) > 0 {
if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip {
continue
}
}
LogJson(plist, "Found existing Hns loadbalancer policy resource", 1)
return &plist, nil
}
}
//TODO: sourceVip is not used. If required, expose this as a param
var sourceVip string
lb, err := hcsshim.AddLoadBalancer(
endpoints,
isILB,
sourceVip,
vip,
protocol,
internalPort,
externalPort,
)
if err == nil {
LogJson(lb, "Hns loadbalancer policy resource", 1)
}
return lb, err
}
func deleteHnsLoadBalancerPolicy(hnsID string) {
if len(hnsID) == 0 {
// Return silently
return
}
// Cleanup HNS policies
hnsloadBalancer, err := hcsshim.GetPolicyListByID(hnsID)
if err != nil {
klog.Errorf("%v", err)
return
}
LogJson(hnsloadBalancer, "Removing Policy", 2)
_, err = hnsloadBalancer.Delete()
if err != nil {
klog.Errorf("%v", err)
}
}
func deleteHnsEndpoint(hnsID string) {
hnsendpoint, err := hcsshim.GetHNSEndpointByID(hnsID)
if err != nil {
klog.Errorf("%v", err)
return
}
_, err = hnsendpoint.Delete()
if err != nil {
klog.Errorf("%v", err)
}
klog.V(3).Infof("Remote endpoint resource deleted id %s", hnsID)
}
func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
if err != nil {
@ -663,29 +674,12 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
}
return &hnsNetworkInfo{
id: hnsnetwork.Id,
name: hnsnetwork.Name,
id: hnsnetwork.Id,
name: hnsnetwork.Name,
networkType: hnsnetwork.Type,
}, nil
}
func getHnsEndpointByIpAddress(ip net.IP, networkName string) (*hcsshim.HNSEndpoint, error) {
hnsnetwork, err := hcsshim.GetHNSNetworkByName(networkName)
if err != nil {
klog.Errorf("%v", err)
return nil, err
}
endpoints, err := hcsshim.HNSListEndpointRequest()
for _, endpoint := range endpoints {
equal := reflect.DeepEqual(endpoint.IPAddress, ip)
if equal && endpoint.VirtualNetwork == hnsnetwork.Id {
return &endpoint, nil
}
}
return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName)
}
// Sync is called to synchronize the proxier state to hns as soon as possible.
func (proxier *Proxier) Sync() {
proxier.syncRunner.Run()
@ -714,21 +708,21 @@ func (proxier *Proxier) isInitialized() bool {
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
@ -789,21 +783,21 @@ func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) {
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
@ -867,7 +861,7 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S
// This function is used for incremental updated of endpointsMap.
//
// NOTE: endpoints object should NOT be modified.
func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndpointsMap {
func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string, hns HostNetworkService) proxyEndpointsMap {
if endpoints == nil {
return nil
}
@ -880,7 +874,7 @@ func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndp
for i := range ss.Ports {
port := &ss.Ports[i]
if port.Port == 0 {
klog.Warningf("ignoring invalid endpoint port %s", port.Name)
klog.Warningf("Ignoring invalid endpoint port %s", port.Name)
continue
}
svcPortName := proxy.ServicePortName{
@ -890,11 +884,11 @@ func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndp
for i := range ss.Addresses {
addr := &ss.Addresses[i]
if addr.IP == "" {
klog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name)
klog.Warningf("Ignoring invalid endpoint port %s with empty host", port.Name)
continue
}
isLocal := addr.NodeName != nil && *addr.NodeName == hostname
epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal)
epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal, hns)
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo)
}
if klog.V(3) {
@ -912,7 +906,7 @@ func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndp
// Translates single Service object to proxyServiceMap.
//
// NOTE: service object should NOT be modified.
func serviceToServiceMap(service *v1.Service) proxyServiceMap {
func serviceToServiceMap(service *v1.Service, hns HostNetworkService) proxyServiceMap {
if service == nil {
return nil
}
@ -925,7 +919,7 @@ func serviceToServiceMap(service *v1.Service) proxyServiceMap {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service)
serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service, hns)
}
return serviceMap
}
@ -972,12 +966,36 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
var hnsEndpoints []hcsshim.HNSEndpoint
hnsNetworkName := proxier.network.name
hns := proxier.hns
if proxier.network.networkType == "Overlay" {
serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.clusterIP.String(), hnsNetworkName)
if serviceVipEndpoint == nil {
klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.clusterIP.String())
hnsEndpoint := &endpointsInfo{
ip: svcInfo.clusterIP.String(),
isLocal: false,
macAddress: proxier.hostMac,
providerAddress: proxier.nodeIP.String(),
}
newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.Errorf("Remote endpoint creation failed for service VIP: %v", err)
continue
}
newHnsEndpoint.refCount++
svcInfo.remoteEndpoint = newHnsEndpoint
}
}
var hnsEndpoints []endpointsInfo
klog.V(4).Infof("====Applying Policy for %s====", svcName)
// Create Remote endpoints for every endpoint, corresponding to the service
for _, ep := range proxier.endpointsMap[svcName] {
var newHnsEndpoint *hcsshim.HNSEndpoint
var newHnsEndpoint *endpointsInfo
hnsNetworkName := proxier.network.name
var err error
@ -989,14 +1007,14 @@ func (proxier *Proxier) syncProxyRules() {
}
if len(ep.hnsID) > 0 {
newHnsEndpoint, err = hcsshim.GetHNSEndpointByID(ep.hnsID)
newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID)
}
if newHnsEndpoint == nil {
// First check if an endpoint resource exists for this IP, on the current host
// A Local endpoint could exist here already
// A remote endpoint was already created and proxy was restarted
newHnsEndpoint, err = getHnsEndpointByIpAddress(net.ParseIP(ep.ip), hnsNetworkName)
newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.ip, hnsNetworkName)
}
if newHnsEndpoint == nil {
@ -1004,29 +1022,63 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.ip, err, hnsNetworkName)
continue
}
// hns Endpoint resource was not found, create one
hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
if err != nil {
klog.Errorf("%v", err)
continue
}
hnsEndpoint := &hcsshim.HNSEndpoint{
MacAddress: ep.macAddress,
IPAddress: net.ParseIP(ep.ip),
}
if proxier.network.networkType == "Overlay" {
klog.Infof("Updating network %v to check for new remote subnet policies", proxier.network.name)
networkName := proxier.network.name
updatedNetwork, err := hns.getNetworkByName(networkName)
if err != nil {
klog.Fatalf("Failed to get network %v: %v", networkName, err)
}
proxier.network = *updatedNetwork
var providerAddress string
for _, rs := range proxier.network.remoteSubnets {
_, ipNet, err := net.ParseCIDR(rs.destinationPrefix)
if err != nil {
klog.Fatalf("%v", err)
}
if ipNet.Contains(net.ParseIP(ep.ip)) {
providerAddress = rs.providerAddress
}
if ep.ip == rs.providerAddress {
providerAddress = rs.providerAddress
}
}
if len(providerAddress) == 0 {
klog.Errorf("Could not find provider address for %s", ep.ip)
continue
}
hnsEndpoint := &endpointsInfo{
ip: ep.ip,
isLocal: false,
macAddress: conjureMac("02-11", net.ParseIP(ep.ip)),
providerAddress: providerAddress,
}
newHnsEndpoint, err = hnsnetwork.CreateRemoteEndpoint(hnsEndpoint)
if err != nil {
klog.Errorf("Remote endpoint creation failed: %v", err)
continue
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.Errorf("Remote endpoint creation failed: %v, %s", err, spew.Sdump(hnsEndpoint))
continue
}
} else {
hnsEndpoint := &endpointsInfo{
ip: ep.ip,
isLocal: false,
macAddress: ep.macAddress,
}
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.Errorf("Remote endpoint creation failed: %v", err)
continue
}
}
}
// Save the hnsId for reference
LogJson(newHnsEndpoint, "Hns Endpoint resource", 1)
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
ep.hnsID = newHnsEndpoint.Id
ep.hnsID = newHnsEndpoint.hnsID
ep.refCount++
Log(ep, "Endpoint resource found", 3)
}
@ -1044,11 +1096,13 @@ func (proxier *Proxier) syncProxyRules() {
}
klog.V(4).Infof("Trying to Apply Policies for service %s", spew.Sdump(svcInfo))
var hnsLoadBalancer *hcsshim.PolicyList
var hnsLoadBalancer *loadBalancerInfo
hnsLoadBalancer, err := getHnsLoadBalancer(
hnsLoadBalancer, err := hns.getLoadBalancer(
hnsEndpoints,
false,
proxier.isDSR,
proxier.sourceVip,
svcInfo.clusterIP.String(),
Enum(svcInfo.protocol),
uint16(svcInfo.targetPort),
@ -1059,15 +1113,17 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
svcInfo.hnsID = hnsLoadBalancer.ID
klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.ID)
svcInfo.hnsID = hnsLoadBalancer.hnsID
klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID)
// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
if svcInfo.nodePort > 0 {
hnsLoadBalancer, err := getHnsLoadBalancer(
hnsLoadBalancer, err := hns.getLoadBalancer(
hnsEndpoints,
false,
"", // VIP has to be empty to automatically select the nodeIP
false,
proxier.sourceVip,
"",
Enum(svcInfo.protocol),
uint16(svcInfo.targetPort),
uint16(svcInfo.nodePort),
@ -1077,16 +1133,18 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
svcInfo.nodePorthnsID = hnsLoadBalancer.ID
klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.ID)
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID)
}
// Create a Load Balancer Policy for each external IP
for _, externalIp := range svcInfo.externalIPs {
// Try loading existing policies, if already available
hnsLoadBalancer, err := getHnsLoadBalancer(
hnsLoadBalancer, err = hns.getLoadBalancer(
hnsEndpoints,
false,
false,
proxier.sourceVip,
externalIp.ip,
Enum(svcInfo.protocol),
uint16(svcInfo.targetPort),
@ -1096,15 +1154,17 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Policy creation failed: %v", err)
continue
}
externalIp.hnsID = hnsLoadBalancer.ID
klog.V(3).Infof("Hns LoadBalancer resource created for externalIp resources %v, Id[%s]", externalIp, hnsLoadBalancer.ID)
externalIp.hnsID = hnsLoadBalancer.hnsID
klog.V(3).Infof("Hns LoadBalancer resource created for externalIp resources %v, Id[%s]", externalIp, hnsLoadBalancer.hnsID)
}
// Create a Load Balancer Policy for each loadbalancer ingress
for _, lbIngressIp := range svcInfo.loadBalancerIngressIPs {
// Try loading existing policies, if already available
hnsLoadBalancer, err := getHnsLoadBalancer(
hnsLoadBalancer, err := hns.getLoadBalancer(
hnsEndpoints,
false,
false,
proxier.sourceVip,
lbIngressIp.ip,
Enum(svcInfo.protocol),
uint16(svcInfo.targetPort),
@ -1114,7 +1174,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.Errorf("Policy creation failed: %v", err)
continue
}
lbIngressIp.hnsID = hnsLoadBalancer.ID
lbIngressIp.hnsID = hnsLoadBalancer.hnsID
klog.V(3).Infof("Hns LoadBalancer resource created for loadBalancer Ingress resources %v", lbIngressIp)
}
svcInfo.policyApplied = true

View File

@ -0,0 +1,379 @@
// +build windows
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winkernel
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/proxy"
"net"
"strings"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const testHostName = "test-hostname"
const macAddress = "00-11-22-33-44-55"
const clusterCIDR = "192.168.1.0/24"
const destinationPrefix = "192.168.2.0/24"
const providerAddress = "10.0.0.3"
const guid = "123ABC"
type fakeHealthChecker struct {
services map[types.NamespacedName]uint16
endpoints map[types.NamespacedName]int
}
func newFakeHealthChecker() *fakeHealthChecker {
return &fakeHealthChecker{
services: map[types.NamespacedName]uint16{},
endpoints: map[types.NamespacedName]int{},
}
}
func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error {
fake.services = newServices
return nil
}
func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
fake.endpoints = newEndpoints
return nil
}
type fakeHNS struct{}
func newFakeHNS() *fakeHNS {
return &fakeHNS{}
}
func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) {
var remoteSubnets []*remoteSubnetInfo
rs := &remoteSubnetInfo{
destinationPrefix: destinationPrefix,
isolationId: 4096,
providerAddress: providerAddress,
drMacAddress: macAddress,
}
remoteSubnets = append(remoteSubnets, rs)
return &hnsNetworkInfo{
id: strings.ToUpper(guid),
name: name,
networkType: "Overlay",
remoteSubnets: remoteSubnets,
}, nil
}
func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) {
return nil, nil
}
func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) {
_, ipNet, _ := net.ParseCIDR(destinationPrefix)
if ipNet.Contains(net.ParseIP(ip)) {
return &endpointsInfo{
ip: ip,
isLocal: true,
macAddress: macAddress,
hnsID: guid,
hns: hns,
}, nil
}
return nil, nil
}
func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) {
return &endpointsInfo{
ip: ep.ip,
isLocal: ep.isLocal,
macAddress: ep.macAddress,
hnsID: guid,
hns: hns,
}, nil
}
func (hns fakeHNS) deleteEndpoint(hnsID string) error {
return nil
}
func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) {
return &loadBalancerInfo{
hnsID: guid,
}, nil
}
func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
return nil
}
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier {
sourceVip := "192.168.1.2"
hnsNetworkInfo := &hnsNetworkInfo{
name: "TestNetwork",
networkType: networkType,
}
proxier := &Proxier{
portsMap: make(map[localPort]closeable),
serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname),
clusterCIDR: clusterCIDR,
hostname: testHostName,
nodeIP: nodeIP,
healthChecker: newFakeHealthChecker(),
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: macAddress,
isDSR: false,
hns: newFakeHNS(),
}
return proxier
}
func TestCreateServiceVip(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay")
if proxier == nil {
t.Error()
}
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcExternalIPs := "50.60.70.81"
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
timeoutSeconds := v1.DefaultClientIPServiceAffinitySeconds
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.ExternalIPs = []string{svcExternalIPs}
svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
ClientIP: &v1.ClientIPConfig{
TimeoutSeconds: &timeoutSeconds,
},
}
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
makeEndpointsMap(proxier)
proxier.syncProxyRules()
if proxier.serviceMap[svcPortName].remoteEndpoint == nil {
t.Error()
}
if proxier.serviceMap[svcPortName].remoteEndpoint.ip != svcIP {
t.Error()
}
}
func TestCreateRemoteEndpointOverlay(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay")
if proxier == nil {
t.Error()
}
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
}},
}}
}),
)
proxier.syncProxyRules()
if proxier.endpointsMap[svcPortName][0].hnsID != guid {
t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid)
}
}
func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil {
t.Error()
}
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
}},
}}
}),
)
proxier.syncProxyRules()
if proxier.endpointsMap[svcPortName][0].hnsID != guid {
t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid)
}
}
func TestCreateLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay")
if proxier == nil {
t.Error()
}
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
}
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
NodePort: int32(svcNodePort),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
}},
}}
}),
)
proxier.syncProxyRules()
if proxier.serviceMap[svcPortName].hnsID != guid {
t.Errorf("%v does not match %v", proxier.serviceMap[svcPortName].hnsID, guid)
}
}
func makeNSN(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name}
}
func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
for i := range allServices {
proxier.OnServiceAdd(allServices[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.servicesSynced = true
}
func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service {
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: map[string]string{},
},
Spec: v1.ServiceSpec{},
Status: v1.ServiceStatus{},
}
svcFunc(svc)
return svc
}
func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
for i := range allEndpoints {
proxier.OnEndpointsAdd(allEndpoints[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
}
func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
ept := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
eptFunc(ept)
return ept
}