diff --git a/pkg/proxy/winkernel/BUILD b/pkg/proxy/winkernel/BUILD index 4d6b5e0f88..3c8c31a02b 100644 --- a/pkg/proxy/winkernel/BUILD +++ b/pkg/proxy/winkernel/BUILD @@ -1,8 +1,10 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "hnsV1.go", + "hnsV2.go", "metrics.go", "proxier.go", ], @@ -15,6 +17,7 @@ go_library( "//pkg/api/v1/service:go_default_library", "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/apis/config:go_default_library", "//pkg/proxy/healthcheck:go_default_library", "//pkg/util/async:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -23,6 +26,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//vendor/github.com/Microsoft/hcsshim:go_default_library", + "//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library", "//vendor/github.com/davecgh/go-spew/spew:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -43,3 +47,22 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = [ + "hns_test.go", + "proxier_test.go", + ], + embed = [":go_default_library"], + deps = select({ + "@io_bazel_rules_go//go/platform:windows": [ + "//pkg/proxy:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library", + ], + "//conditions:default": [], + }), +) diff --git a/pkg/proxy/winkernel/hnsV1.go b/pkg/proxy/winkernel/hnsV1.go new file mode 100644 index 0000000000..19aa4ecd0b --- /dev/null +++ b/pkg/proxy/winkernel/hnsV1.go @@ -0,0 +1,225 @@ +// +build windows + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winkernel + +import ( + "encoding/json" + "fmt" + "github.com/Microsoft/hcsshim" + "k8s.io/klog" + "net" + "strings" +) + +type HostNetworkService interface { + getNetworkByName(name string) (*hnsNetworkInfo, error) + getEndpointByID(id string) (*endpointsInfo, error) + getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) + createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) + deleteEndpoint(hnsID string) error + getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) + deleteLoadBalancer(hnsID string) error +} + +// V1 HNS API +type hnsV1 struct{} + +func (hns hnsV1) getNetworkByName(name string) (*hnsNetworkInfo, error) { + hnsnetwork, err := hcsshim.GetHNSNetworkByName(name) + if err != nil { + klog.Errorf("%v", err) + return nil, err + } + + return &hnsNetworkInfo{ + id: hnsnetwork.Id, + name: hnsnetwork.Name, + networkType: hnsnetwork.Type, + }, nil +} +func (hns hnsV1) getEndpointByID(id string) (*endpointsInfo, error) { + hnsendpoint, err := hcsshim.GetHNSEndpointByID(id) + if err != nil { + klog.Errorf("%v", err) + return nil, err + } + return &endpointsInfo{ + ip: hnsendpoint.IPAddress.String(), + isLocal: !hnsendpoint.IsRemoteEndpoint, //TODO: Change isLocal to isRemote + macAddress: hnsendpoint.MacAddress, + hnsID: hnsendpoint.Id, + hns: hns, + }, nil +} +func (hns hnsV1) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) { + hnsnetwork, err := hcsshim.GetHNSNetworkByName(networkName) + if err != nil { + klog.Errorf("%v", err) + return nil, err + } + + endpoints, err := hcsshim.HNSListEndpointRequest() + for _, endpoint := range endpoints { + equal := false + if endpoint.IPAddress != nil { + equal = endpoint.IPAddress.String() == ip + } + if equal && strings.EqualFold(endpoint.VirtualNetwork, hnsnetwork.Id) { + return &endpointsInfo{ + ip: endpoint.IPAddress.String(), + isLocal: !endpoint.IsRemoteEndpoint, + macAddress: endpoint.MacAddress, + hnsID: endpoint.Id, + hns: hns, + }, nil + } + } + + return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName) +} +func (hns hnsV1) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) { + hnsNetwork, err := hcsshim.GetHNSNetworkByName(networkName) + if err != nil { + return nil, fmt.Errorf("Could not find network %s: %v", networkName, err) + } + hnsEndpoint := &hcsshim.HNSEndpoint{ + MacAddress: ep.macAddress, + IPAddress: net.ParseIP(ep.ip), + } + + var createdEndpoint *hcsshim.HNSEndpoint + if !ep.isLocal { + if len(ep.providerAddress) != 0 { + paPolicy := hcsshim.PaPolicy{ + Type: hcsshim.PA, + PA: ep.providerAddress, + } + paPolicyJson, err := json.Marshal(paPolicy) + if err != nil { + return nil, fmt.Errorf("PA Policy creation failed: %v", err) + } + hnsEndpoint.Policies = append(hnsEndpoint.Policies, paPolicyJson) + } + createdEndpoint, err = hnsNetwork.CreateRemoteEndpoint(hnsEndpoint) + if err != nil { + return nil, fmt.Errorf("Remote endpoint creation failed: %v", err) + } + + } else { + createdEndpoint, err = hnsNetwork.CreateEndpoint(hnsEndpoint) + if err != nil { + return nil, fmt.Errorf("Local endpoint creation failed: %v", err) + } + } + return &endpointsInfo{ + ip: createdEndpoint.IPAddress.String(), + isLocal: createdEndpoint.IsRemoteEndpoint, + macAddress: createdEndpoint.MacAddress, + hnsID: createdEndpoint.Id, + providerAddress: ep.providerAddress, //TODO get from createdEndpoint + hns: hns, + }, nil +} +func (hns hnsV1) deleteEndpoint(hnsID string) error { + hnsendpoint, err := hcsshim.GetHNSEndpointByID(hnsID) + if err != nil { + return err + } + _, err = hnsendpoint.Delete() + if err == nil { + klog.V(3).Infof("Remote endpoint resource deleted id %s", hnsID) + } + return err +} + +func (hns hnsV1) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { + plists, err := hcsshim.HNSListPolicyListRequest() + if err != nil { + return nil, err + } + + if isDSR { + klog.V(3).Info("DSR is not supported in V1. Using non DSR instead") + } + + for _, plist := range plists { + if len(plist.EndpointReferences) != len(endpoints) { + continue + } + // Validate if input meets any of the policy lists + elbPolicy := hcsshim.ELBPolicy{} + if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil { + continue + } + if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == isILB { + if len(vip) > 0 { + if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip { + continue + } + } + LogJson(plist, "Found existing Hns loadbalancer policy resource", 1) + return &loadBalancerInfo{ + hnsID: plist.ID, + }, nil + } + } + + var hnsEndpoints []hcsshim.HNSEndpoint + for _, ep := range endpoints { + endpoint, err := hcsshim.GetHNSEndpointByID(ep.hnsID) + if err != nil { + return nil, err + } + hnsEndpoints = append(hnsEndpoints, *endpoint) + } + lb, err := hcsshim.AddLoadBalancer( + hnsEndpoints, + isILB, + sourceVip, + vip, + protocol, + internalPort, + externalPort, + ) + + if err == nil { + LogJson(lb, "Hns loadbalancer policy resource", 1) + } else { + return nil, err + } + return &loadBalancerInfo{ + hnsID: lb.ID, + }, err +} +func (hns hnsV1) deleteLoadBalancer(hnsID string) error { + if len(hnsID) == 0 { + // Return silently + return nil + } + + // Cleanup HNS policies + hnsloadBalancer, err := hcsshim.GetPolicyListByID(hnsID) + if err != nil { + return err + } + LogJson(hnsloadBalancer, "Removing Policy", 2) + + _, err = hnsloadBalancer.Delete() + return err +} diff --git a/pkg/proxy/winkernel/hnsV2.go b/pkg/proxy/winkernel/hnsV2.go new file mode 100644 index 0000000000..43872095d2 --- /dev/null +++ b/pkg/proxy/winkernel/hnsV2.go @@ -0,0 +1,239 @@ +// +build windows + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winkernel + +import ( + "encoding/json" + "fmt" + + "github.com/Microsoft/hcsshim/hcn" + "k8s.io/klog" + + "strings" +) + +type hnsV2 struct{} + +func (hns hnsV2) getNetworkByName(name string) (*hnsNetworkInfo, error) { + hnsnetwork, err := hcn.GetNetworkByName(name) + if err != nil { + klog.Errorf("%v", err) + return nil, err + } + + var remoteSubnets []*remoteSubnetInfo + for _, policy := range hnsnetwork.Policies { + if policy.Type == hcn.RemoteSubnetRoute { + policySettings := hcn.RemoteSubnetRoutePolicySetting{} + err = json.Unmarshal(policy.Settings, &policySettings) + if err != nil { + return nil, fmt.Errorf("Failed to unmarshal Remote Subnet policy settings") + } + rs := &remoteSubnetInfo{ + destinationPrefix: policySettings.DestinationPrefix, + isolationId: policySettings.IsolationId, + providerAddress: policySettings.ProviderAddress, + drMacAddress: policySettings.DistributedRouterMacAddress, + } + remoteSubnets = append(remoteSubnets, rs) + } + } + + return &hnsNetworkInfo{ + id: hnsnetwork.Id, + name: hnsnetwork.Name, + networkType: string(hnsnetwork.Type), + remoteSubnets: remoteSubnets, + }, nil +} +func (hns hnsV2) getEndpointByID(id string) (*endpointsInfo, error) { + hnsendpoint, err := hcn.GetEndpointByID(id) + if err != nil { + return nil, err + } + return &endpointsInfo{ //TODO: fill out PA + ip: hnsendpoint.IpConfigurations[0].IpAddress, + isLocal: uint32(hnsendpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote + macAddress: hnsendpoint.MacAddress, + hnsID: hnsendpoint.Id, + hns: hns, + }, nil +} +func (hns hnsV2) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) { + hnsnetwork, err := hcn.GetNetworkByName(networkName) + if err != nil { + klog.Errorf("%v", err) + return nil, err + } + + endpoints, err := hcn.ListEndpoints() + for _, endpoint := range endpoints { + equal := false + if endpoint.IpConfigurations != nil && len(endpoint.IpConfigurations) > 0 { + equal = endpoint.IpConfigurations[0].IpAddress == ip + } + if equal && strings.EqualFold(endpoint.HostComputeNetwork, hnsnetwork.Id) { + return &endpointsInfo{ + ip: endpoint.IpConfigurations[0].IpAddress, + isLocal: uint32(endpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, //TODO: Change isLocal to isRemote + macAddress: endpoint.MacAddress, + hnsID: endpoint.Id, + hns: hns, + }, nil + } + } + + return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName) +} +func (hns hnsV2) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) { + hnsNetwork, err := hcn.GetNetworkByName(networkName) + if err != nil { + return nil, fmt.Errorf("Could not find network %s: %v", networkName, err) + } + var flags hcn.EndpointFlags + if !ep.isLocal { + flags |= hcn.EndpointFlagsRemoteEndpoint + } + ipConfig := &hcn.IpConfig{ + IpAddress: ep.ip, + } + hnsEndpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: ep.macAddress, + Flags: flags, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + + var createdEndpoint *hcn.HostComputeEndpoint + if !ep.isLocal { + if len(ep.providerAddress) != 0 { + policySettings := hcn.ProviderAddressEndpointPolicySetting{ + ProviderAddress: ep.providerAddress, + } + policySettingsJson, err := json.Marshal(policySettings) + if err != nil { + return nil, fmt.Errorf("PA Policy creation failed: %v", err) + } + paPolicy := hcn.EndpointPolicy{ + Type: hcn.NetworkProviderAddress, + Settings: policySettingsJson, + } + hnsEndpoint.Policies = append(hnsEndpoint.Policies, paPolicy) + } + createdEndpoint, err = hnsNetwork.CreateRemoteEndpoint(hnsEndpoint) + if err != nil { + return nil, fmt.Errorf("Remote endpoint creation failed: %v", err) + } + } else { + createdEndpoint, err = hnsNetwork.CreateEndpoint(hnsEndpoint) + if err != nil { + return nil, fmt.Errorf("Local endpoint creation failed: %v", err) + } + } + return &endpointsInfo{ + ip: createdEndpoint.IpConfigurations[0].IpAddress, + isLocal: uint32(createdEndpoint.Flags&hcn.EndpointFlagsRemoteEndpoint) == 0, + macAddress: createdEndpoint.MacAddress, + hnsID: createdEndpoint.Id, + providerAddress: ep.providerAddress, //TODO get from createdEndpoint + hns: hns, + }, nil +} +func (hns hnsV2) deleteEndpoint(hnsID string) error { + hnsendpoint, err := hcn.GetEndpointByID(hnsID) + if err != nil { + return err + } + err = hnsendpoint.Delete() + if err == nil { + klog.V(3).Infof("Remote endpoint resource deleted id %s", hnsID) + } + return err +} +func (hns hnsV2) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { + plists, err := hcn.ListLoadBalancers() + if err != nil { + return nil, err + } + + for _, plist := range plists { + if len(plist.HostComputeEndpoints) != len(endpoints) { + continue + } + // Validate if input meets any of the policy lists + lbPortMapping := plist.PortMappings[0] + if lbPortMapping.Protocol == uint32(protocol) && lbPortMapping.InternalPort == internalPort && lbPortMapping.ExternalPort == externalPort && (lbPortMapping.Flags&1 != 0) == isILB { + if len(vip) > 0 { + if len(plist.FrontendVIPs) == 0 || plist.FrontendVIPs[0] != vip { + continue + } + } + LogJson(plist, "Found existing Hns loadbalancer policy resource", 1) + return &loadBalancerInfo{ + hnsID: plist.Id, + }, nil + } + } + + var hnsEndpoints []hcn.HostComputeEndpoint + for _, ep := range endpoints { + endpoint, err := hcn.GetEndpointByID(ep.hnsID) + if err != nil { + return nil, err + } + hnsEndpoints = append(hnsEndpoints, *endpoint) + } + + vips := []string{} + if len(vip) > 0 { + vips = append(vips, vip) + } + lb, err := hcn.AddLoadBalancer( + hnsEndpoints, + isILB, + isDSR, + sourceVip, + vips, + protocol, + internalPort, + externalPort, + ) + if err != nil { + return nil, err + } + + LogJson(lb, "Hns loadbalancer policy resource", 1) + + return &loadBalancerInfo{ + hnsID: lb.Id, + }, err +} +func (hns hnsV2) deleteLoadBalancer(hnsID string) error { + lb, err := hcn.GetLoadBalancerByID(hnsID) + if err != nil { + // Return silently + return nil + } + + err = lb.Delete() + return err +} diff --git a/pkg/proxy/winkernel/hns_test.go b/pkg/proxy/winkernel/hns_test.go new file mode 100644 index 0000000000..0829291015 --- /dev/null +++ b/pkg/proxy/winkernel/hns_test.go @@ -0,0 +1,557 @@ +// +build windows + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winkernel + +import ( + "encoding/json" + + "github.com/Microsoft/hcsshim/hcn" + + "strings" + "testing" +) + +const sourceVip = "192.168.1.2" +const serviceVip = "11.0.0.1" +const addressPrefix = "192.168.1.0/24" +const gatewayAddress = "192.168.1.1" +const epMacAddress = "00-11-22-33-44-55" +const epIpAddress = "192.168.1.3" +const epIpAddressRemote = "192.168.2.3" +const epPaAddress = "10.0.0.3" +const protocol = 6 +const internalPort = 80 +const externalPort = 32440 + +func TestGetNetworkByName(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testGetNetworkByName(t, hnsV1) + testGetNetworkByName(t, hnsV2) +} +func TestGetEndpointByID(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testGetEndpointByID(t, hnsV1) + testGetEndpointByID(t, hnsV2) +} +func TestGetEndpointByIpAddress(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testGetEndpointByIpAddress(t, hnsV1) + testGetEndpointByIpAddress(t, hnsV2) +} +func TestCreateEndpointLocal(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testCreateEndpointLocal(t, hnsV1) + testCreateEndpointLocal(t, hnsV2) +} +func TestCreateEndpointRemotePA(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testCreateEndpointRemote(t, hnsV1, epPaAddress) + testCreateEndpointRemote(t, hnsV2, epPaAddress) +} +func TestCreateEndpointRemoteNoPA(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testCreateEndpointRemote(t, hnsV1, "") + testCreateEndpointRemote(t, hnsV2, "") +} +func TestDeleteEndpoint(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testDeleteEndpoint(t, hnsV1) + testDeleteEndpoint(t, hnsV2) +} +func TestGetLoadBalancerExisting(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testGetLoadBalancerExisting(t, hnsV1) + testGetLoadBalancerExisting(t, hnsV2) +} +func TestGetLoadBalancerNew(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testGetLoadBalancerNew(t, hnsV1) + testGetLoadBalancerNew(t, hnsV2) +} +func TestDeleteLoadBalancer(t *testing.T) { + hnsV1 := hnsV1{} + hnsV2 := hnsV2{} + + testDeleteLoadBalancer(t, hnsV1) + testDeleteLoadBalancer(t, hnsV2) +} +func testGetNetworkByName(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + network, err := hns.getNetworkByName(Network.Name) + if err != nil { + t.Error(err) + } + + if !strings.EqualFold(network.id, Network.Id) { + t.Errorf("%v does not match %v", network.id, Network.Id) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testGetEndpointByID(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + ipConfig := &hcn.IpConfig{ + IpAddress: epIpAddress, + } + Endpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: epMacAddress, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + + Endpoint, err = Network.CreateEndpoint(Endpoint) + if err != nil { + t.Error(err) + } + + endpoint, err := hns.getEndpointByID(Endpoint.Id) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) { + t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id) + } + + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testGetEndpointByIpAddress(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + ipConfig := &hcn.IpConfig{ + IpAddress: epIpAddress, + } + Endpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: epMacAddress, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + Endpoint, err = Network.CreateEndpoint(Endpoint) + if err != nil { + t.Error(err) + } + + endpoint, err := hns.getEndpointByIpAddress(Endpoint.IpConfigurations[0].IpAddress, Network.Name) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) { + t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id) + } + if endpoint.ip != Endpoint.IpConfigurations[0].IpAddress { + t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress) + } + + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testCreateEndpointLocal(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + endpoint := &endpointsInfo{ + ip: epIpAddress, + macAddress: epMacAddress, + isLocal: true, + } + + endpoint, err = hns.createEndpoint(endpoint, Network.Name) + if err != nil { + t.Error(err) + } + Endpoint, err := hcn.GetEndpointByID(endpoint.hnsID) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) { + t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id) + } + if endpoint.ip != Endpoint.IpConfigurations[0].IpAddress { + t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress) + } + if endpoint.macAddress != Endpoint.MacAddress { + t.Errorf("%v does not match %v", endpoint.macAddress, Endpoint.MacAddress) + } + + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testCreateEndpointRemote(t *testing.T, hns HostNetworkService, providerAddress string) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + endpoint := &endpointsInfo{ + ip: epIpAddressRemote, + macAddress: epMacAddress, + isLocal: false, + providerAddress: providerAddress, + } + + endpoint, err = hns.createEndpoint(endpoint, Network.Name) + if err != nil { + t.Error(err) + } + Endpoint, err := hcn.GetEndpointByID(endpoint.hnsID) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(endpoint.hnsID, Endpoint.Id) { + t.Errorf("%v does not match %v", endpoint.hnsID, Endpoint.Id) + } + if endpoint.ip != Endpoint.IpConfigurations[0].IpAddress { + t.Errorf("%v does not match %v", endpoint.ip, Endpoint.IpConfigurations[0].IpAddress) + } + if endpoint.macAddress != Endpoint.MacAddress { + t.Errorf("%v does not match %v", endpoint.macAddress, Endpoint.MacAddress) + } + if len(providerAddress) != 0 && endpoint.providerAddress != epPaAddress { + t.Errorf("%v does not match %v", endpoint.providerAddress, providerAddress) + } + + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testDeleteEndpoint(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + ipConfig := &hcn.IpConfig{ + IpAddress: epIpAddress, + } + Endpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: epMacAddress, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + Endpoint, err = Network.CreateEndpoint(Endpoint) + if err != nil { + t.Error(err) + } + err = hns.deleteEndpoint(Endpoint.Id) + if err != nil { + t.Error(err) + } + // Endpoint should no longer exist so this should fail + Endpoint, err = hcn.GetEndpointByID(Endpoint.Id) + if err == nil { + t.Error(err) + } + + err = Network.Delete() + if err != nil { + t.Error(err) + } +} + +func testGetLoadBalancerExisting(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + ipConfig := &hcn.IpConfig{ + IpAddress: epIpAddress, + } + Endpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: epMacAddress, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + Endpoint, err = Network.CreateEndpoint(Endpoint) + if err != nil { + t.Error(err) + } + + Endpoints := []hcn.HostComputeEndpoint{*Endpoint} + LoadBalancer, err := hcn.AddLoadBalancer( + Endpoints, + false, + false, + sourceVip, + []string{serviceVip}, + protocol, + internalPort, + externalPort, + ) + if err != nil { + t.Error(err) + } + + endpoint := &endpointsInfo{ + ip: Endpoint.IpConfigurations[0].IpAddress, + hnsID: Endpoint.Id, + } + endpoints := []endpointsInfo{*endpoint} + lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort) + if err != nil { + t.Error(err) + } + + if !strings.EqualFold(lb.hnsID, LoadBalancer.Id) { + t.Errorf("%v does not match %v", lb.hnsID, LoadBalancer.Id) + } + + err = LoadBalancer.Delete() + if err != nil { + t.Error(err) + } + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testGetLoadBalancerNew(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + ipConfig := &hcn.IpConfig{ + IpAddress: epIpAddress, + } + Endpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: epMacAddress, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + Endpoint, err = Network.CreateEndpoint(Endpoint) + if err != nil { + t.Error(err) + } + endpoint := &endpointsInfo{ + ip: Endpoint.IpConfigurations[0].IpAddress, + hnsID: Endpoint.Id, + } + endpoints := []endpointsInfo{*endpoint} + lb, err := hns.getLoadBalancer(endpoints, false, false, sourceVip, serviceVip, protocol, internalPort, externalPort) + if err != nil { + t.Error(err) + } + LoadBalancer, err := hcn.GetLoadBalancerByID(lb.hnsID) + if err != nil { + t.Error(err) + } + if !strings.EqualFold(lb.hnsID, LoadBalancer.Id) { + t.Errorf("%v does not match %v", lb.hnsID, LoadBalancer.Id) + } + err = LoadBalancer.Delete() + if err != nil { + t.Error(err) + } + + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func testDeleteLoadBalancer(t *testing.T, hns HostNetworkService) { + Network, err := createTestNetwork() + if err != nil { + t.Error(err) + } + + ipConfig := &hcn.IpConfig{ + IpAddress: epIpAddress, + } + Endpoint := &hcn.HostComputeEndpoint{ + IpConfigurations: []hcn.IpConfig{*ipConfig}, + MacAddress: epMacAddress, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + Endpoint, err = Network.CreateEndpoint(Endpoint) + if err != nil { + t.Error(err) + } + + Endpoints := []hcn.HostComputeEndpoint{*Endpoint} + LoadBalancer, err := hcn.AddLoadBalancer( + Endpoints, + false, + false, + sourceVip, + []string{serviceVip}, + protocol, + internalPort, + externalPort, + ) + if err != nil { + t.Error(err) + } + err = hns.deleteLoadBalancer(LoadBalancer.Id) + if err != nil { + t.Error(err) + } + // Load balancer should not longer exist + LoadBalancer, err = hcn.GetLoadBalancerByID(LoadBalancer.Id) + if err == nil { + t.Error(err) + } + + err = Endpoint.Delete() + if err != nil { + t.Error(err) + } + err = Network.Delete() + if err != nil { + t.Error(err) + } +} +func createTestNetwork() (*hcn.HostComputeNetwork, error) { + network := &hcn.HostComputeNetwork{ + Type: "Overlay", + Name: "TestOverlay", + MacPool: hcn.MacPool{ + Ranges: []hcn.MacRange{ + { + StartMacAddress: "00-15-5D-52-C0-00", + EndMacAddress: "00-15-5D-52-CF-FF", + }, + }, + }, + Ipams: []hcn.Ipam{ + { + Type: "Static", + Subnets: []hcn.Subnet{ + { + IpAddressPrefix: addressPrefix, + Routes: []hcn.Route{ + { + NextHop: gatewayAddress, + DestinationPrefix: "0.0.0.0/0", + }, + }, + }, + }, + }, + }, + SchemaVersion: hcn.SchemaVersion{ + Major: 2, + Minor: 0, + }, + } + + vsid := &hcn.VsidPolicySetting{ + IsolationId: 5000, + } + vsidJson, err := json.Marshal(vsid) + if err != nil { + return nil, err + } + + sp := &hcn.SubnetPolicy{ + Type: hcn.VSID, + } + sp.Settings = vsidJson + + spJson, err := json.Marshal(sp) + if err != nil { + return nil, err + } + + network.Ipams[0].Subnets[0].Policies = append(network.Ipams[0].Subnets[0].Policies, spJson) + + return network.Create() +} diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 83d3a92fe5..e4e15f399e 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -29,6 +29,8 @@ import ( "time" "github.com/Microsoft/hcsshim" + "github.com/Microsoft/hcsshim/hcn" + "github.com/davecgh/go-spew/spew" "k8s.io/klog" @@ -40,6 +42,7 @@ import ( apiservice "k8s.io/kubernetes/pkg/api/v1/service" "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/util/async" ) @@ -82,6 +85,10 @@ type loadBalancerIngressInfo struct { hnsID string } +type loadBalancerInfo struct { + hnsID string +} + // internal struct for string service information type serviceInfo struct { clusterIP net.IP @@ -100,11 +107,22 @@ type serviceInfo struct { hnsID string nodePorthnsID string policyApplied bool + remoteEndpoint *endpointsInfo + hns HostNetworkService } type hnsNetworkInfo struct { - name string - id string + name string + id string + networkType string + remoteSubnets []*remoteSubnetInfo +} + +type remoteSubnetInfo struct { + destinationPrefix string + isolationId uint16 + providerAddress string + drMacAddress string } func Log(v interface{}, message string, level klog.Level) { @@ -120,12 +138,14 @@ func LogJson(v interface{}, message string, level klog.Level) { // internal struct for endpoints information type endpointsInfo struct { - ip string - port uint16 - isLocal bool - macAddress string - hnsID string - refCount uint16 + ip string + port uint16 + isLocal bool + macAddress string + hnsID string + refCount uint16 + providerAddress string + hns HostNetworkService } //Uses mac prefix and IPv4 address to return a mac address @@ -139,7 +159,7 @@ func conjureMac(macPrefix string, ip net.IP) string { return "02-11-22-33-44-55" } -func newEndpointInfo(ip string, port uint16, isLocal bool) *endpointsInfo { +func newEndpointInfo(ip string, port uint16, isLocal bool, hns HostNetworkService) *endpointsInfo { info := &endpointsInfo{ ip: ip, port: port, @@ -147,6 +167,7 @@ func newEndpointInfo(ip string, port uint16, isLocal bool) *endpointsInfo { macAddress: conjureMac("02-11", net.ParseIP(ip)), refCount: 0, hnsID: "", + hns: hns, } return info @@ -160,14 +181,17 @@ func (ep *endpointsInfo) Cleanup() { // Remove only remote endpoints created by this service if ep.refCount <= 0 && !ep.isLocal { klog.V(4).Infof("Removing endpoints for %v, since no one is referencing it", ep) - deleteHnsEndpoint(ep.hnsID) - ep.hnsID = "" + err := ep.hns.deleteEndpoint(ep.hnsID) + if err == nil { + ep.hnsID = "" + } else { + klog.Errorf("Endpoint deletion failed for %v: %v", ep.ip, err) + } } - } // returns a new serviceInfo struct -func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service) *serviceInfo { +func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, service *v1.Service, hns HostNetworkService) *serviceInfo { onlyNodeLocalEndpoints := false if apiservice.RequestsOnlyLocalTraffic(service) { onlyNodeLocalEndpoints = true @@ -175,8 +199,7 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser // set default session sticky max age 180min=10800s stickyMaxAgeSeconds := 10800 - if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP { - // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP + if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP && service.Spec.SessionAffinityConfig != nil { stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) } info := &serviceInfo{ @@ -193,6 +216,7 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *v1.ServicePort, ser stickyMaxAgeSeconds: stickyMaxAgeSeconds, loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)), onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, + hns: hns, } copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) @@ -256,17 +280,17 @@ func newEndpointsChangeMap(hostname string) endpointsChangeMap { } } -func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints) bool { +func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints, hns HostNetworkService) bool { ecm.lock.Lock() defer ecm.lock.Unlock() change, exists := ecm.items[*namespacedName] if !exists { change = &endpointsChange{} - change.previous = endpointsToEndpointsMap(previous, ecm.hostname) + change.previous = endpointsToEndpointsMap(previous, ecm.hostname, hns) ecm.items[*namespacedName] = change } - change.current = endpointsToEndpointsMap(current, ecm.hostname) + change.current = endpointsToEndpointsMap(current, ecm.hostname, hns) if reflect.DeepEqual(change.previous, change.current) { delete(ecm.items, *namespacedName) } @@ -279,7 +303,7 @@ func newServiceChangeMap() serviceChangeMap { } } -func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service) bool { +func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service, hns HostNetworkService) bool { scm.lock.Lock() defer scm.lock.Unlock() @@ -287,10 +311,10 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo if !exists { // Service is Added change = &serviceChange{} - change.previous = serviceToServiceMap(previous) + change.previous = serviceToServiceMap(previous, hns) scm.items[*namespacedName] = change } - change.current = serviceToServiceMap(current) + change.current = serviceToServiceMap(current, hns) if reflect.DeepEqual(change.previous, change.current) { delete(scm.items, *namespacedName) } @@ -420,7 +444,11 @@ type Proxier struct { // precomputing some number of those and cache for future reuse. precomputedProbabilities []string - network hnsNetworkInfo + hns HostNetworkService + network hnsNetworkInfo + sourceVip string + hostMac string + isDSR bool } type localPort struct { @@ -465,6 +493,7 @@ func NewProxier( nodeIP net.IP, recorder record.EventRecorder, healthzServer healthcheck.HealthzUpdater, + config config.KubeProxyWinkernelConfiguration, ) (*Proxier, error) { masqueradeValue := 1 << uint(masqueradeBit) masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue) @@ -479,19 +508,75 @@ func NewProxier( } healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps - - // TODO : Make this a param - hnsNetworkName := os.Getenv("KUBE_NETWORK") - if len(hnsNetworkName) == 0 { - return nil, fmt.Errorf("Environment variable KUBE_NETWORK not initialized") + var hns HostNetworkService + hns = hnsV1{} + supportedFeatures := hcn.GetSupportedFeatures() + if supportedFeatures.Api.V2 { + hns = hnsV2{} } - hnsNetwork, err := getHnsNetworkInfo(hnsNetworkName) + + hnsNetworkName := config.NetworkName + if len(hnsNetworkName) == 0 { + klog.V(3).Infof("network-name flag not set. Checking environment variable") + hnsNetworkName = os.Getenv("KUBE_NETWORK") + if len(hnsNetworkName) == 0 { + return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized") + } + } + + hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName) if err != nil { - klog.Fatalf("Unable to find Hns Network specified by %s. Please check environment variable KUBE_NETWORK", hnsNetworkName) + klog.Errorf("Unable to find Hns Network specified by %s. Please check environment variable KUBE_NETWORK or network-name flag", hnsNetworkName) + return nil, err + } + klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetworkInfo) + isDSR := config.EnableDSR + err = hcn.DSRSupported() + if isDSR && err != nil { return nil, err } - klog.V(1).Infof("Hns Network loaded with info = %v", hnsNetwork) + var sourceVip string + var hostMac string + if hnsNetworkInfo.networkType == "Overlay" { + err = hcn.RemoteSubnetSupported() + if err != nil { + return nil, err + } + sourceVip = config.SourceVip + if len(sourceVip) == 0 { + return nil, fmt.Errorf("source-vip flag not set") + } + + interfaces, _ := net.Interfaces() //TODO create interfaces + for _, inter := range interfaces { + addresses, _ := inter.Addrs() + for _, addr := range addresses { + addrIP, _, _ := net.ParseCIDR(addr.String()) + if addrIP.String() == nodeIP.String() { + klog.V(2).Infof("Host MAC address is %s", inter.HardwareAddr.String()) + hostMac = inter.HardwareAddr.String() + } + } + } + if len(hostMac) == 0 { + return nil, fmt.Errorf("Could not find host mac address for %s", nodeIP) + } + + existingSourceVip, _ := hns.getEndpointByIpAddress(sourceVip, hnsNetworkName) + if existingSourceVip == nil { + hnsEndpoint := &endpointsInfo{ + ip: sourceVip, + isLocal: true, + macAddress: hostMac, + providerAddress: nodeIP.String(), + } + _, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName) + if err != nil { + return nil, fmt.Errorf("Source Vip endpoint creation failed: %v", err) + } + } + } proxier := &Proxier{ portsMap: make(map[localPort]closeable), @@ -507,7 +592,11 @@ func NewProxier( recorder: recorder, healthChecker: healthChecker, healthzServer: healthzServer, - network: *hnsNetwork, + hns: hns, + network: *hnsNetworkInfo, + sourceVip: sourceVip, + hostMac: hostMac, + isDSR: isDSR, } burstSyncs := 2 @@ -536,27 +625,30 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []*endpointsInfo) { for _, ep := range endpoints { ep.Cleanup() } + if svcInfo.remoteEndpoint != nil { + svcInfo.remoteEndpoint.Cleanup() + } svcInfo.policyApplied = false } func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() { // Remove the Hns Policy corresponding to this service - deleteHnsLoadBalancerPolicy(svcInfo.hnsID) + hns := svcInfo.hns + hns.deleteLoadBalancer(svcInfo.hnsID) svcInfo.hnsID = "" - deleteHnsLoadBalancerPolicy(svcInfo.nodePorthnsID) + hns.deleteLoadBalancer(svcInfo.nodePorthnsID) svcInfo.nodePorthnsID = "" for _, externalIp := range svcInfo.externalIPs { - deleteHnsLoadBalancerPolicy(externalIp.hnsID) + hns.deleteLoadBalancer(externalIp.hnsID) externalIp.hnsID = "" } for _, lbIngressIp := range svcInfo.loadBalancerIngressIPs { - deleteHnsLoadBalancerPolicy(lbIngressIp.hnsID) + hns.deleteLoadBalancer(lbIngressIp.hnsID) lbIngressIp.hnsID = "" } - } func deleteAllHnsLoadBalancerPolicy() { @@ -574,87 +666,6 @@ func deleteAllHnsLoadBalancerPolicy() { } -// getHnsLoadBalancer returns the LoadBalancer policy resource, if already found. -// If not, it would create one and return -func getHnsLoadBalancer(endpoints []hcsshim.HNSEndpoint, isILB bool, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*hcsshim.PolicyList, error) { - plists, err := hcsshim.HNSListPolicyListRequest() - if err != nil { - return nil, err - } - - for _, plist := range plists { - if len(plist.EndpointReferences) != len(endpoints) { - continue - } - // Validate if input meets any of the policy lists - elbPolicy := hcsshim.ELBPolicy{} - if err = json.Unmarshal(plist.Policies[0], &elbPolicy); err != nil { - continue - } - if elbPolicy.Protocol == protocol && elbPolicy.InternalPort == internalPort && elbPolicy.ExternalPort == externalPort && elbPolicy.ILB == isILB { - if len(vip) > 0 { - if len(elbPolicy.VIPs) == 0 || elbPolicy.VIPs[0] != vip { - continue - } - } - LogJson(plist, "Found existing Hns loadbalancer policy resource", 1) - return &plist, nil - - } - } - //TODO: sourceVip is not used. If required, expose this as a param - var sourceVip string - lb, err := hcsshim.AddLoadBalancer( - endpoints, - isILB, - sourceVip, - vip, - protocol, - internalPort, - externalPort, - ) - - if err == nil { - LogJson(lb, "Hns loadbalancer policy resource", 1) - } - return lb, err -} - -func deleteHnsLoadBalancerPolicy(hnsID string) { - if len(hnsID) == 0 { - // Return silently - return - } - - // Cleanup HNS policies - hnsloadBalancer, err := hcsshim.GetPolicyListByID(hnsID) - if err != nil { - klog.Errorf("%v", err) - return - } - LogJson(hnsloadBalancer, "Removing Policy", 2) - - _, err = hnsloadBalancer.Delete() - if err != nil { - klog.Errorf("%v", err) - } -} - -func deleteHnsEndpoint(hnsID string) { - hnsendpoint, err := hcsshim.GetHNSEndpointByID(hnsID) - if err != nil { - klog.Errorf("%v", err) - return - } - - _, err = hnsendpoint.Delete() - if err != nil { - klog.Errorf("%v", err) - } - - klog.V(3).Infof("Remote endpoint resource deleted id %s", hnsID) -} - func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) { hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName) if err != nil { @@ -663,29 +674,12 @@ func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) { } return &hnsNetworkInfo{ - id: hnsnetwork.Id, - name: hnsnetwork.Name, + id: hnsnetwork.Id, + name: hnsnetwork.Name, + networkType: hnsnetwork.Type, }, nil } -func getHnsEndpointByIpAddress(ip net.IP, networkName string) (*hcsshim.HNSEndpoint, error) { - hnsnetwork, err := hcsshim.GetHNSNetworkByName(networkName) - if err != nil { - klog.Errorf("%v", err) - return nil, err - } - - endpoints, err := hcsshim.HNSListEndpointRequest() - for _, endpoint := range endpoints { - equal := reflect.DeepEqual(endpoint.IPAddress, ip) - if equal && endpoint.VirtualNetwork == hnsnetwork.Id { - return &endpoint, nil - } - } - - return nil, fmt.Errorf("Endpoint %v not found on network %s", ip, networkName) -} - // Sync is called to synchronize the proxier state to hns as soon as possible. func (proxier *Proxier) Sync() { proxier.syncRunner.Run() @@ -714,21 +708,21 @@ func (proxier *Proxier) isInitialized() bool { func (proxier *Proxier) OnServiceAdd(service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() { + if proxier.serviceChanges.update(&namespacedName, nil, service, proxier.hns) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() { + if proxier.serviceChanges.update(&namespacedName, oldService, service, proxier.hns) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnServiceDelete(service *v1.Service) { namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() { + if proxier.serviceChanges.update(&namespacedName, service, nil, proxier.hns) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -789,21 +783,21 @@ func (proxier *Proxier) updateServiceMap() (result updateServiceMapResult) { func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() { + if proxier.endpointsChanges.update(&namespacedName, nil, endpoints, proxier.hns) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() { + if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints, proxier.hns) && proxier.isInitialized() { proxier.syncRunner.Run() } } func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} - if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() { + if proxier.endpointsChanges.update(&namespacedName, endpoints, nil, proxier.hns) && proxier.isInitialized() { proxier.syncRunner.Run() } } @@ -867,7 +861,7 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S // This function is used for incremental updated of endpointsMap. // // NOTE: endpoints object should NOT be modified. -func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndpointsMap { +func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string, hns HostNetworkService) proxyEndpointsMap { if endpoints == nil { return nil } @@ -880,7 +874,7 @@ func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndp for i := range ss.Ports { port := &ss.Ports[i] if port.Port == 0 { - klog.Warningf("ignoring invalid endpoint port %s", port.Name) + klog.Warningf("Ignoring invalid endpoint port %s", port.Name) continue } svcPortName := proxy.ServicePortName{ @@ -890,11 +884,11 @@ func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndp for i := range ss.Addresses { addr := &ss.Addresses[i] if addr.IP == "" { - klog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) + klog.Warningf("Ignoring invalid endpoint port %s with empty host", port.Name) continue } isLocal := addr.NodeName != nil && *addr.NodeName == hostname - epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal) + epInfo := newEndpointInfo(addr.IP, uint16(port.Port), isLocal, hns) endpointsMap[svcPortName] = append(endpointsMap[svcPortName], epInfo) } if klog.V(3) { @@ -912,7 +906,7 @@ func endpointsToEndpointsMap(endpoints *v1.Endpoints, hostname string) proxyEndp // Translates single Service object to proxyServiceMap. // // NOTE: service object should NOT be modified. -func serviceToServiceMap(service *v1.Service) proxyServiceMap { +func serviceToServiceMap(service *v1.Service, hns HostNetworkService) proxyServiceMap { if service == nil { return nil } @@ -925,7 +919,7 @@ func serviceToServiceMap(service *v1.Service) proxyServiceMap { for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} - serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service) + serviceMap[svcPortName] = newServiceInfo(svcPortName, servicePort, service, hns) } return serviceMap } @@ -972,12 +966,36 @@ func (proxier *Proxier) syncProxyRules() { continue } - var hnsEndpoints []hcsshim.HNSEndpoint + hnsNetworkName := proxier.network.name + hns := proxier.hns + if proxier.network.networkType == "Overlay" { + serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.clusterIP.String(), hnsNetworkName) + if serviceVipEndpoint == nil { + klog.V(4).Infof("No existing remote endpoint for service VIP %v", svcInfo.clusterIP.String()) + hnsEndpoint := &endpointsInfo{ + ip: svcInfo.clusterIP.String(), + isLocal: false, + macAddress: proxier.hostMac, + providerAddress: proxier.nodeIP.String(), + } + + newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName) + if err != nil { + klog.Errorf("Remote endpoint creation failed for service VIP: %v", err) + continue + } + + newHnsEndpoint.refCount++ + svcInfo.remoteEndpoint = newHnsEndpoint + } + } + + var hnsEndpoints []endpointsInfo klog.V(4).Infof("====Applying Policy for %s====", svcName) // Create Remote endpoints for every endpoint, corresponding to the service for _, ep := range proxier.endpointsMap[svcName] { - var newHnsEndpoint *hcsshim.HNSEndpoint + var newHnsEndpoint *endpointsInfo hnsNetworkName := proxier.network.name var err error @@ -989,14 +1007,14 @@ func (proxier *Proxier) syncProxyRules() { } if len(ep.hnsID) > 0 { - newHnsEndpoint, err = hcsshim.GetHNSEndpointByID(ep.hnsID) + newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID) } if newHnsEndpoint == nil { // First check if an endpoint resource exists for this IP, on the current host // A Local endpoint could exist here already // A remote endpoint was already created and proxy was restarted - newHnsEndpoint, err = getHnsEndpointByIpAddress(net.ParseIP(ep.ip), hnsNetworkName) + newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.ip, hnsNetworkName) } if newHnsEndpoint == nil { @@ -1004,29 +1022,63 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Local endpoint not found for %v: err: %v on network %s", ep.ip, err, hnsNetworkName) continue } - // hns Endpoint resource was not found, create one - hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName) - if err != nil { - klog.Errorf("%v", err) - continue - } - hnsEndpoint := &hcsshim.HNSEndpoint{ - MacAddress: ep.macAddress, - IPAddress: net.ParseIP(ep.ip), - } + if proxier.network.networkType == "Overlay" { + klog.Infof("Updating network %v to check for new remote subnet policies", proxier.network.name) + networkName := proxier.network.name + updatedNetwork, err := hns.getNetworkByName(networkName) + if err != nil { + klog.Fatalf("Failed to get network %v: %v", networkName, err) + } + proxier.network = *updatedNetwork + var providerAddress string + for _, rs := range proxier.network.remoteSubnets { + _, ipNet, err := net.ParseCIDR(rs.destinationPrefix) + if err != nil { + klog.Fatalf("%v", err) + } + if ipNet.Contains(net.ParseIP(ep.ip)) { + providerAddress = rs.providerAddress + } + if ep.ip == rs.providerAddress { + providerAddress = rs.providerAddress + } + } + if len(providerAddress) == 0 { + klog.Errorf("Could not find provider address for %s", ep.ip) + continue + } + hnsEndpoint := &endpointsInfo{ + ip: ep.ip, + isLocal: false, + macAddress: conjureMac("02-11", net.ParseIP(ep.ip)), + providerAddress: providerAddress, + } - newHnsEndpoint, err = hnsnetwork.CreateRemoteEndpoint(hnsEndpoint) - if err != nil { - klog.Errorf("Remote endpoint creation failed: %v", err) - continue + newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName) + if err != nil { + klog.Errorf("Remote endpoint creation failed: %v, %s", err, spew.Sdump(hnsEndpoint)) + continue + } + } else { + hnsEndpoint := &endpointsInfo{ + ip: ep.ip, + isLocal: false, + macAddress: ep.macAddress, + } + + newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName) + if err != nil { + klog.Errorf("Remote endpoint creation failed: %v", err) + continue + } } } // Save the hnsId for reference LogJson(newHnsEndpoint, "Hns Endpoint resource", 1) hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint) - ep.hnsID = newHnsEndpoint.Id + ep.hnsID = newHnsEndpoint.hnsID ep.refCount++ Log(ep, "Endpoint resource found", 3) } @@ -1044,11 +1096,13 @@ func (proxier *Proxier) syncProxyRules() { } klog.V(4).Infof("Trying to Apply Policies for service %s", spew.Sdump(svcInfo)) - var hnsLoadBalancer *hcsshim.PolicyList + var hnsLoadBalancer *loadBalancerInfo - hnsLoadBalancer, err := getHnsLoadBalancer( + hnsLoadBalancer, err := hns.getLoadBalancer( hnsEndpoints, false, + proxier.isDSR, + proxier.sourceVip, svcInfo.clusterIP.String(), Enum(svcInfo.protocol), uint16(svcInfo.targetPort), @@ -1059,15 +1113,17 @@ func (proxier *Proxier) syncProxyRules() { continue } - svcInfo.hnsID = hnsLoadBalancer.ID - klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.ID) + svcInfo.hnsID = hnsLoadBalancer.hnsID + klog.V(3).Infof("Hns LoadBalancer resource created for cluster ip resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID) // If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints if svcInfo.nodePort > 0 { - hnsLoadBalancer, err := getHnsLoadBalancer( + hnsLoadBalancer, err := hns.getLoadBalancer( hnsEndpoints, false, - "", // VIP has to be empty to automatically select the nodeIP + false, + proxier.sourceVip, + "", Enum(svcInfo.protocol), uint16(svcInfo.targetPort), uint16(svcInfo.nodePort), @@ -1077,16 +1133,18 @@ func (proxier *Proxier) syncProxyRules() { continue } - svcInfo.nodePorthnsID = hnsLoadBalancer.ID - klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.ID) + svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID + klog.V(3).Infof("Hns LoadBalancer resource created for nodePort resources %v, Id [%s]", svcInfo.clusterIP, hnsLoadBalancer.hnsID) } // Create a Load Balancer Policy for each external IP for _, externalIp := range svcInfo.externalIPs { // Try loading existing policies, if already available - hnsLoadBalancer, err := getHnsLoadBalancer( + hnsLoadBalancer, err = hns.getLoadBalancer( hnsEndpoints, false, + false, + proxier.sourceVip, externalIp.ip, Enum(svcInfo.protocol), uint16(svcInfo.targetPort), @@ -1096,15 +1154,17 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Policy creation failed: %v", err) continue } - externalIp.hnsID = hnsLoadBalancer.ID - klog.V(3).Infof("Hns LoadBalancer resource created for externalIp resources %v, Id[%s]", externalIp, hnsLoadBalancer.ID) + externalIp.hnsID = hnsLoadBalancer.hnsID + klog.V(3).Infof("Hns LoadBalancer resource created for externalIp resources %v, Id[%s]", externalIp, hnsLoadBalancer.hnsID) } // Create a Load Balancer Policy for each loadbalancer ingress for _, lbIngressIp := range svcInfo.loadBalancerIngressIPs { // Try loading existing policies, if already available - hnsLoadBalancer, err := getHnsLoadBalancer( + hnsLoadBalancer, err := hns.getLoadBalancer( hnsEndpoints, false, + false, + proxier.sourceVip, lbIngressIp.ip, Enum(svcInfo.protocol), uint16(svcInfo.targetPort), @@ -1114,7 +1174,7 @@ func (proxier *Proxier) syncProxyRules() { klog.Errorf("Policy creation failed: %v", err) continue } - lbIngressIp.hnsID = hnsLoadBalancer.ID + lbIngressIp.hnsID = hnsLoadBalancer.hnsID klog.V(3).Infof("Hns LoadBalancer resource created for loadBalancer Ingress resources %v", lbIngressIp) } svcInfo.policyApplied = true diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go new file mode 100644 index 0000000000..8c6cc3b747 --- /dev/null +++ b/pkg/proxy/winkernel/proxier_test.go @@ -0,0 +1,379 @@ +// +build windows + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package winkernel + +import ( + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/proxy" + + "net" + "strings" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const testHostName = "test-hostname" +const macAddress = "00-11-22-33-44-55" +const clusterCIDR = "192.168.1.0/24" +const destinationPrefix = "192.168.2.0/24" +const providerAddress = "10.0.0.3" +const guid = "123ABC" + +type fakeHealthChecker struct { + services map[types.NamespacedName]uint16 + endpoints map[types.NamespacedName]int +} + +func newFakeHealthChecker() *fakeHealthChecker { + return &fakeHealthChecker{ + services: map[types.NamespacedName]uint16{}, + endpoints: map[types.NamespacedName]int{}, + } +} +func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error { + fake.services = newServices + return nil +} + +func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { + fake.endpoints = newEndpoints + return nil +} + +type fakeHNS struct{} + +func newFakeHNS() *fakeHNS { + return &fakeHNS{} +} +func (hns fakeHNS) getNetworkByName(name string) (*hnsNetworkInfo, error) { + var remoteSubnets []*remoteSubnetInfo + rs := &remoteSubnetInfo{ + destinationPrefix: destinationPrefix, + isolationId: 4096, + providerAddress: providerAddress, + drMacAddress: macAddress, + } + remoteSubnets = append(remoteSubnets, rs) + return &hnsNetworkInfo{ + id: strings.ToUpper(guid), + name: name, + networkType: "Overlay", + remoteSubnets: remoteSubnets, + }, nil +} +func (hns fakeHNS) getEndpointByID(id string) (*endpointsInfo, error) { + return nil, nil +} +func (hns fakeHNS) getEndpointByIpAddress(ip string, networkName string) (*endpointsInfo, error) { + _, ipNet, _ := net.ParseCIDR(destinationPrefix) + + if ipNet.Contains(net.ParseIP(ip)) { + return &endpointsInfo{ + ip: ip, + isLocal: true, + macAddress: macAddress, + hnsID: guid, + hns: hns, + }, nil + } + return nil, nil + +} +func (hns fakeHNS) createEndpoint(ep *endpointsInfo, networkName string) (*endpointsInfo, error) { + return &endpointsInfo{ + ip: ep.ip, + isLocal: ep.isLocal, + macAddress: ep.macAddress, + hnsID: guid, + hns: hns, + }, nil +} +func (hns fakeHNS) deleteEndpoint(hnsID string) error { + return nil +} +func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, isILB bool, isDSR bool, sourceVip string, vip string, protocol uint16, internalPort uint16, externalPort uint16) (*loadBalancerInfo, error) { + return &loadBalancerInfo{ + hnsID: guid, + }, nil +} +func (hns fakeHNS) deleteLoadBalancer(hnsID string) error { + return nil +} +func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier { + sourceVip := "192.168.1.2" + hnsNetworkInfo := &hnsNetworkInfo{ + name: "TestNetwork", + networkType: networkType, + } + proxier := &Proxier{ + portsMap: make(map[localPort]closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + clusterCIDR: clusterCIDR, + hostname: testHostName, + nodeIP: nodeIP, + healthChecker: newFakeHealthChecker(), + network: *hnsNetworkInfo, + sourceVip: sourceVip, + hostMac: macAddress, + isDSR: false, + hns: newFakeHNS(), + } + return proxier +} + +func TestCreateServiceVip(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") + if proxier == nil { + t.Error() + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcExternalIPs := "50.60.70.81" + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + timeoutSeconds := v1.DefaultClientIPServiceAffinitySeconds + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.ExternalIPs = []string{svcExternalIPs} + svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP + svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{ + ClientIP: &v1.ClientIPConfig{ + TimeoutSeconds: &timeoutSeconds, + }, + } + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + makeEndpointsMap(proxier) + + proxier.syncProxyRules() + + if proxier.serviceMap[svcPortName].remoteEndpoint == nil { + t.Error() + } + if proxier.serviceMap[svcPortName].remoteEndpoint.ip != svcIP { + t.Error() + } +} +func TestCreateRemoteEndpointOverlay(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") + if proxier == nil { + t.Error() + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + makeEndpointsMap(proxier, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + }}, + }} + }), + ) + + proxier.syncProxyRules() + + if proxier.endpointsMap[svcPortName][0].hnsID != guid { + t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid) + } +} +func TestCreateRemoteEndpointL2Bridge(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge") + if proxier == nil { + t.Error() + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + makeEndpointsMap(proxier, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + }}, + }} + }), + ) + + proxier.syncProxyRules() + + if proxier.endpointsMap[svcPortName][0].hnsID != guid { + t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid) + } +} +func TestCreateLoadBalancer(t *testing.T) { + syncPeriod := 30 * time.Second + proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") + if proxier == nil { + t.Error() + } + + svcIP := "10.20.30.41" + svcPort := 80 + svcNodePort := 3001 + svcPortName := proxy.ServicePortName{ + NamespacedName: makeNSN("ns1", "svc1"), + Port: "p80", + } + + makeServiceMap(proxier, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { + svc.Spec.Type = "NodePort" + svc.Spec.ClusterIP = svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: v1.ProtocolTCP, + NodePort: int32(svcNodePort), + }} + }), + ) + makeEndpointsMap(proxier, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { + ept.Subsets = []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{ + IP: epIpAddressRemote, + }}, + Ports: []v1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + }}, + }} + }), + ) + + proxier.syncProxyRules() + + if proxier.serviceMap[svcPortName].hnsID != guid { + t.Errorf("%v does not match %v", proxier.serviceMap[svcPortName].hnsID, guid) + } +} +func makeNSN(namespace, name string) types.NamespacedName { + return types.NamespacedName{Namespace: namespace, Name: name} +} +func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) { + for i := range allServices { + proxier.OnServiceAdd(allServices[i]) + } + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.servicesSynced = true +} +func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{}, + }, + Spec: v1.ServiceSpec{}, + Status: v1.ServiceStatus{}, + } + svcFunc(svc) + return svc +} + +func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) { + for i := range allEndpoints { + proxier.OnEndpointsAdd(allEndpoints[i]) + } + + proxier.mu.Lock() + defer proxier.mu.Unlock() + proxier.endpointsSynced = true +} + +func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints { + ept := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + eptFunc(ept) + return ept +}