k3s/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/proxier.go

1315 lines
42 KiB
Go
Raw Normal View History

2019-01-12 04:58:27 +00:00
// +build windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package winkernel
import (
"encoding/json"
"fmt"
"net"
"os"
2020-08-10 17:43:49 +00:00
"strconv"
"strings"
2019-01-12 04:58:27 +00:00
"sync"
"sync/atomic"
"time"
"github.com/Microsoft/hcsshim"
2019-04-07 17:07:55 +00:00
"github.com/Microsoft/hcsshim/hcn"
2020-08-10 17:43:49 +00:00
discovery "k8s.io/api/discovery/v1beta1"
2019-04-07 17:07:55 +00:00
2019-01-12 04:58:27 +00:00
"github.com/davecgh/go-spew/spew"
2020-08-10 17:43:49 +00:00
v1 "k8s.io/api/core/v1"
2019-01-12 04:58:27 +00:00
"k8s.io/apimachinery/pkg/types"
2020-03-26 21:07:15 +00:00
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
2019-01-12 04:58:27 +00:00
"k8s.io/apimachinery/pkg/util/wait"
2019-04-07 17:07:55 +00:00
utilfeature "k8s.io/apiserver/pkg/util/feature"
2019-01-12 04:58:27 +00:00
"k8s.io/client-go/tools/record"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
2020-08-10 17:43:49 +00:00
"k8s.io/kubernetes/pkg/features"
kubefeatures "k8s.io/kubernetes/pkg/features"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/proxy"
2019-04-07 17:07:55 +00:00
"k8s.io/kubernetes/pkg/proxy/apis/config"
2019-09-27 21:51:53 +00:00
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/proxy/healthcheck"
2020-08-10 17:43:49 +00:00
"k8s.io/kubernetes/pkg/proxy/metaproxier"
2020-03-26 21:07:15 +00:00
"k8s.io/kubernetes/pkg/proxy/metrics"
2019-01-12 04:58:27 +00:00
"k8s.io/kubernetes/pkg/util/async"
2020-08-10 17:43:49 +00:00
utilnet "k8s.io/utils/net"
2019-01-12 04:58:27 +00:00
)
// KernelCompatTester tests whether the required kernel capabilities are
// present to run the windows kernel proxier.
type KernelCompatTester interface {
// IsCompatible returns nil when the kernel proxier can be used and a
// non-nil error otherwise.
IsCompatible() error
}
// CanUseWinKernelProxier reports whether the kernel proxier should be used
// instead of the "classic" userspace proxier. The decision is delegated
// entirely to kcompat: a nil result from IsCompatible yields (true, nil) and
// any error yields (false, err).
func CanUseWinKernelProxier(kcompat KernelCompatTester) (bool, error) {
	err := kcompat.IsCompatible()
	return err == nil, err
}
// WindowsKernelCompatTester checks kernel-proxier compatibility by probing
// the Host Networking Service through hcsshim.
type WindowsKernelCompatTester struct{}

// IsCompatible returns nil if the HNS policy lists can be queried, and an
// error wrapping the underlying failure otherwise.
func (lkct WindowsKernelCompatTester) IsCompatible() error {
	if _, err := hcsshim.HNSListPolicyListRequest(); err != nil {
		// Keep the cause so operators can see why the probe failed.
		return fmt.Errorf("Windows kernel is not compatible for Kernel mode: %w", err)
	}
	return nil
}
// externalIPInfo tracks one external IP of a service together with the HNS
// ID recorded for it. The ID is what gets passed to deleteLoadBalancer when
// the service's policies are removed.
type externalIPInfo struct {
ip string
hnsID string
}
// loadBalancerIngressInfo tracks one load-balancer ingress IP of a service
// together with the HNS ID recorded for it. The ID is what gets passed to
// deleteLoadBalancer when the service's policies are removed.
type loadBalancerIngressInfo struct {
ip string
hnsID string
}
2019-04-07 17:07:55 +00:00
// loadBalancerInfo holds the HNS ID of a load balancer.
// NOTE(review): its users are not visible in this part of the file.
type loadBalancerInfo struct {
hnsID string
}
2019-08-30 18:33:25 +00:00
// loadBalancerFlags groups the boolean options that describe a load balancer.
// NOTE(review): the code that reads these flags is not visible in this part
// of the file; confirm each flag's meaning against the HNS load balancer
// creation code.
type loadBalancerFlags struct {
	isILB           bool
	isDSR           bool
	localRoutedVIP  bool
	useMUX          bool
	preserveDIP     bool
	sessionAffinity bool
	isIPv6          bool
}
2019-01-12 04:58:27 +00:00
// internal struct for string service information
type serviceInfo struct {
2020-08-10 17:43:49 +00:00
*proxy.BaseServiceInfo
targetPort int
externalIPs []*externalIPInfo
loadBalancerIngressIPs []*loadBalancerIngressInfo
hnsID string
nodePorthnsID string
policyApplied bool
remoteEndpoint *endpointsInfo
hns HostNetworkService
preserveDIP bool
localTrafficDSR bool
2019-01-12 04:58:27 +00:00
}
type hnsNetworkInfo struct {
2019-04-07 17:07:55 +00:00
name string
id string
networkType string
remoteSubnets []*remoteSubnetInfo
}
// remoteSubnetInfo describes one remote subnet of an HNS network.
type remoteSubnetInfo struct {
	// destinationPrefix is parsed as a CIDR when looking up the provider
	// address for an IP.
	destinationPrefix string
	isolationID       uint16
	providerAddress   string
	drMacAddress      string
}
// NETWORK_TYPE_OVERLAY is the HNS network type string that identifies an
// overlay network. NewProxier compares it case-insensitively against the
// network's reported type.
const NETWORK_TYPE_OVERLAY = "overlay"
2019-01-12 04:58:27 +00:00
func Log(v interface{}, message string, level klog.Level) {
klog.V(level).InfoS("%s", message, "spewConfig", spewSdump(v))
2019-01-12 04:58:27 +00:00
}
func LogJson(interfaceName string, v interface{}, message string, level klog.Level) {
2019-01-12 04:58:27 +00:00
jsonString, err := json.Marshal(v)
if err == nil {
klog.V(level).InfoS("%s", message, interfaceName, string(jsonString))
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
// spewSdump renders v with a default spew configuration whose DisableMethods
// option is switched on, and returns the resulting dump as a string.
func spewSdump(v interface{}) string {
	dumper := spew.NewDefaultConfig()
	dumper.DisableMethods = true
	dump := dumper.Sdump(v)
	return dump
}
2019-01-12 04:58:27 +00:00
// internal struct for endpoints information
type endpointsInfo struct {
2019-04-07 17:07:55 +00:00
ip string
port uint16
isLocal bool
macAddress string
hnsID string
2020-08-10 17:43:49 +00:00
refCount *uint16
2019-04-07 17:07:55 +00:00
providerAddress string
hns HostNetworkService
// conditions
ready bool
serving bool
terminating bool
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// String renders the endpoint as "host:port" using net.JoinHostPort. It is
// part of the proxy.Endpoint interface.
func (info *endpointsInfo) String() string {
	portText := strconv.FormatUint(uint64(info.port), 10)
	return net.JoinHostPort(info.ip, portText)
}
// GetIsLocal reports whether the endpoint is flagged as local (info.isLocal).
// It is part of the proxy.Endpoint interface.
func (info *endpointsInfo) GetIsLocal() bool {
return info.isLocal
}
// IsReady returns the endpoint's recorded ready condition (info.ready).
func (info *endpointsInfo) IsReady() bool {
return info.ready
}
// IsServing returns the endpoint's recorded serving condition (info.serving).
// The terminating condition is tracked separately and is not consulted here.
func (info *endpointsInfo) IsServing() bool {
return info.serving
}
// IsTerminating returns the endpoint's recorded terminating condition
// (info.terminating).
func (info *endpointsInfo) IsTerminating() bool {
return info.terminating
}
2020-08-10 17:43:49 +00:00
// GetTopology returns the topology information of the endpoint. This
// implementation carries none and always returns nil.
func (info *endpointsInfo) GetTopology() map[string]string {
return nil
}
// GetZoneHints returns the zone hints for the endpoint. This implementation
// carries none and always returns an empty set.
func (info *endpointsInfo) GetZoneHints() sets.String {
return sets.String{}
}
2020-08-10 17:43:49 +00:00
// IP returns just the IP part of the endpoint. It is part of the
// proxy.Endpoint interface.
func (info *endpointsInfo) IP() string {
return info.ip
}
// Port returns just the port part of the endpoint, widened to int. The
// returned error is always nil.
func (info *endpointsInfo) Port() (int, error) {
return int(info.port), nil
}
// Equal reports whether other has the same String() form and the same
// locality flag as info. It is part of the proxy.Endpoint interface.
func (info *endpointsInfo) Equal(other proxy.Endpoint) bool {
return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal()
}
2019-01-12 04:58:27 +00:00
//Uses mac prefix and IPv4 address to return a mac address
//This ensures mac addresses are unique for proper load balancing
2020-08-10 17:43:49 +00:00
//There is a possibility of MAC collisions but this Mac address is used for remote endpoints only
//and not sent on the wire.
2019-01-12 04:58:27 +00:00
func conjureMac(macPrefix string, ip net.IP) string {
if ip4 := ip.To4(); ip4 != nil {
a, b, c, d := ip4[0], ip4[1], ip4[2], ip4[3]
return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
2020-08-10 17:43:49 +00:00
} else if ip6 := ip.To16(); ip6 != nil {
a, b, c, d := ip6[15], ip6[14], ip6[13], ip6[12]
return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
2019-01-12 04:58:27 +00:00
}
return "02-11-22-33-44-55"
}
2020-08-10 17:43:49 +00:00
// endpointsMapChange runs onEndpointsMapChange for every service port name
// present in the old endpoints map and then for every one present in the new
// map. A name present in both maps is therefore processed twice.
func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
	for name := range oldEndpointsMap {
		key := name
		proxier.onEndpointsMapChange(&key)
	}
	for name := range newEndpointsMap {
		key := name
		proxier.onEndpointsMapChange(&key)
	}
}
func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) {
svc, exists := proxier.serviceMap[*svcPortName]
if exists {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcPortName", svcPortName.String())
2020-08-10 17:43:49 +00:00
return
}
klog.V(3).InfoS("Endpoints are modified. Service is stale", "svcPortName", svcPortName.String())
2020-08-10 17:43:49 +00:00
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
} else {
// If no service exists, just cleanup the remote endpoints
klog.V(3).InfoS("Endpoints are orphaned. Cleaning up")
2020-08-10 17:43:49 +00:00
// Cleanup Endpoints references
epInfos, exists := proxier.endpointsMap[*svcPortName]
if exists {
// Cleanup Endpoints references
for _, ep := range epInfos {
epInfo, ok := ep.(*endpointsInfo)
if ok {
epInfo.Cleanup()
}
}
}
}
}
// serviceMapChange runs onServiceMapChange once for every service port name
// in current, and once for every name that appears only in previous.
func (proxier *Proxier) serviceMapChange(previous, current proxy.ServiceMap) {
	for name := range current {
		key := name
		proxier.onServiceMapChange(&key)
	}
	for name := range previous {
		if _, stillPresent := current[name]; !stillPresent {
			key := name
			proxier.onServiceMapChange(&key)
		}
	}
}
func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
svc, exists := proxier.serviceMap[*svcPortName]
if exists {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcPortName", svcPortName.String())
2020-08-10 17:43:49 +00:00
return
}
klog.V(3).InfoS("Updating existing service port", "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol())
2020-08-10 17:43:49 +00:00
svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName])
}
}
// returns a new proxy.Endpoint which abstracts a endpointsInfo
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
portNumber, err := baseInfo.Port()
if err != nil {
portNumber = 0
}
2019-01-12 04:58:27 +00:00
info := &endpointsInfo{
2020-08-10 17:43:49 +00:00
ip: baseInfo.IP(),
port: uint16(portNumber),
isLocal: baseInfo.GetIsLocal(),
macAddress: conjureMac("02-11", net.ParseIP(baseInfo.IP())),
refCount: new(uint16),
2019-01-12 04:58:27 +00:00
hnsID: "",
2020-08-10 17:43:49 +00:00
hns: proxier.hns,
ready: baseInfo.Ready,
serving: baseInfo.Serving,
terminating: baseInfo.Terminating,
2019-01-12 04:58:27 +00:00
}
return info
}
2019-08-30 18:33:25 +00:00
func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointsInfo, error) {
hnsEndpoint := &endpointsInfo{
ip: ip,
isLocal: true,
macAddress: mac,
providerAddress: providerAddress,
ready: true,
serving: true,
terminating: false,
2019-08-30 18:33:25 +00:00
}
ep, err := hns.createEndpoint(hnsEndpoint, network)
return ep, err
}
2019-01-12 04:58:27 +00:00
func (ep *endpointsInfo) Cleanup() {
Log(ep, "Endpoint Cleanup", 3)
if !ep.GetIsLocal() && ep.refCount != nil {
2020-08-10 17:43:49 +00:00
*ep.refCount--
// Remove the remote hns endpoint, if no service is referring it
// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
// Remove only remote endpoints created by this service
if *ep.refCount <= 0 && !ep.GetIsLocal() {
klog.V(4).InfoS("Removing endpoints, since no one is referencing it", "endpoint", ep.String())
err := ep.hns.deleteEndpoint(ep.hnsID)
if err == nil {
ep.hnsID = ""
} else {
klog.ErrorS(err, "Endpoint deletion failed", "ip", ep.IP())
}
2019-04-07 17:07:55 +00:00
}
ep.refCount = nil
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
refCount, exists := refCountMap[hnsID]
if !exists {
refCountMap[hnsID] = new(uint16)
refCount = refCountMap[hnsID]
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
return refCount
}
2019-08-30 18:33:25 +00:00
2020-08-10 17:43:49 +00:00
// returns a new proxy.ServicePort which abstracts a serviceInfo
func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
2019-08-30 18:33:25 +00:00
preserveDIP := service.Annotations["preserve-destination"] == "true"
localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
2019-08-30 18:33:25 +00:00
err := hcn.DSRSupported()
if err != nil {
preserveDIP = false
localTrafficDSR = false
2019-08-30 18:33:25 +00:00
}
2020-03-26 21:07:15 +00:00
// targetPort is zero if it is specified as a name in port.TargetPort.
// Its real value would be got later from endpoints.
targetPort := 0
if port.TargetPort.Type == intstr.Int {
targetPort = port.TargetPort.IntValue()
}
2020-08-10 17:43:49 +00:00
info.preserveDIP = preserveDIP
info.targetPort = targetPort
info.hns = proxier.hns
info.localTrafficDSR = localTrafficDSR
2020-08-10 17:43:49 +00:00
2019-01-12 04:58:27 +00:00
for _, eip := range service.Spec.ExternalIPs {
info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
}
2020-08-10 17:43:49 +00:00
2019-01-12 04:58:27 +00:00
for _, ingress := range service.Status.LoadBalancer.Ingress {
if net.ParseIP(ingress.IP) != nil {
info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
}
2019-01-12 04:58:27 +00:00
}
return info
}
2020-08-10 17:43:49 +00:00
func (network hnsNetworkInfo) findRemoteSubnetProviderAddress(ip string) string {
var providerAddress string
for _, rs := range network.remoteSubnets {
_, ipNet, err := net.ParseCIDR(rs.destinationPrefix)
if err != nil {
klog.ErrorS(err, "Failed to parse CIDR")
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
if ipNet.Contains(net.ParseIP(ip)) {
providerAddress = rs.providerAddress
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
if ip == rs.providerAddress {
providerAddress = rs.providerAddress
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
return providerAddress
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// endPointsReferenceCountMap maps an HNS endpoint ID to a shared reference
// counter; see getRefCount.
type endPointsReferenceCountMap map[string]*uint16
2019-01-12 04:58:27 +00:00
// Proxier is an hns based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
2019-12-12 01:27:03 +00:00
// TODO(imroc): implement node handler for winkernel proxier.
proxyconfig.NoopNodeHandler
2019-09-27 21:51:53 +00:00
2019-01-12 04:58:27 +00:00
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since policies were synced. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
2020-08-10 17:43:49 +00:00
endpointsChanges *proxy.EndpointChangeTracker
serviceChanges *proxy.ServiceChangeTracker
endPointsRefCount endPointsReferenceCountMap
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
2019-01-12 04:58:27 +00:00
// endpointsSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating hns policies
// with some partial data after kube-proxy restart.
2020-08-10 17:43:49 +00:00
endpointsSynced bool
endpointSlicesSynced bool
servicesSynced bool
isIPv6Mode bool
initialized int32
syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
2019-01-12 04:58:27 +00:00
// These are effectively const and do not need the mutex to be held.
masqueradeAll bool
masqueradeMark string
clusterCIDR string
hostname string
nodeIP net.IP
recorder record.EventRecorder
2019-12-12 01:27:03 +00:00
serviceHealthServer healthcheck.ServiceHealthServer
healthzServer healthcheck.ProxierHealthUpdater
2019-01-12 04:58:27 +00:00
// Since converting probabilities (floats) to strings is expensive
// and we are using only probabilities in the format of 1/n, we are
// precomputing some number of those and cache for future reuse.
precomputedProbabilities []string
2020-08-10 17:43:49 +00:00
hns HostNetworkService
network hnsNetworkInfo
sourceVip string
hostMac string
isDSR bool
supportedFeatures hcn.SupportedFeatures
2019-01-12 04:58:27 +00:00
}
// localPort describes a port on the local host: a free-form description, the
// IP it is bound to, the port number and the protocol name.
type localPort struct {
	desc     string
	ip       string
	port     int
	protocol string
}

// String formats the port as `"<desc>" (<ip>:<port>/<protocol>)`, with the
// description quoted.
func (lp *localPort) String() string {
	text := fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
	return text
}
// Enum maps a Kubernetes protocol to a numeric code: 6 for TCP, 17 for UDP
// and 132 for SCTP. Any other protocol maps to 0.
func Enum(p v1.Protocol) uint16 {
	switch p {
	case v1.ProtocolTCP:
		return 6
	case v1.ProtocolUDP:
		return 17
	case v1.ProtocolSCTP:
		return 132
	default:
		return 0
	}
}
// closeable is anything that can be closed, reporting an error on failure.
type closeable interface {
Close() error
}
2019-09-27 21:51:53 +00:00
// Compile-time assertion that *Proxier implements proxy.Provider.
var _ proxy.Provider = &Proxier{}
2019-01-12 04:58:27 +00:00
// NewProxier returns a new Proxier
func NewProxier(
syncPeriod time.Duration,
minSyncPeriod time.Duration,
masqueradeAll bool,
masqueradeBit int,
clusterCIDR string,
hostname string,
nodeIP net.IP,
recorder record.EventRecorder,
2019-12-12 01:27:03 +00:00
healthzServer healthcheck.ProxierHealthUpdater,
2019-04-07 17:07:55 +00:00
config config.KubeProxyWinkernelConfiguration,
2019-01-12 04:58:27 +00:00
) (*Proxier, error) {
masqueradeValue := 1 << uint(masqueradeBit)
masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)
if nodeIP == nil {
klog.InfoS("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
2019-01-12 04:58:27 +00:00
nodeIP = net.ParseIP("127.0.0.1")
}
if len(clusterCIDR) == 0 {
klog.InfoS("clusterCIDR not specified, unable to distinguish between internal and external traffic")
2019-01-12 04:58:27 +00:00
}
2019-12-12 01:27:03 +00:00
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
2019-04-07 17:07:55 +00:00
var hns HostNetworkService
hns = hnsV1{}
supportedFeatures := hcn.GetSupportedFeatures()
2019-08-30 18:33:25 +00:00
if supportedFeatures.Api.V2 {
2019-04-07 17:07:55 +00:00
hns = hnsV2{}
}
2019-01-12 04:58:27 +00:00
2019-04-07 17:07:55 +00:00
hnsNetworkName := config.NetworkName
2019-01-12 04:58:27 +00:00
if len(hnsNetworkName) == 0 {
klog.V(3).InfoS("network-name flag not set. Checking environment variable")
2019-04-07 17:07:55 +00:00
hnsNetworkName = os.Getenv("KUBE_NETWORK")
if len(hnsNetworkName) == 0 {
return nil, fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
}
2019-01-12 04:58:27 +00:00
}
2019-04-07 17:07:55 +00:00
klog.V(3).InfoS("Cleaning up old HNS policy lists")
2019-08-30 18:33:25 +00:00
deleteAllHnsLoadBalancerPolicy()
// Get HNS network information
2019-04-07 17:07:55 +00:00
hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
2019-08-30 18:33:25 +00:00
for err != nil {
klog.ErrorS(err, "Unable to find HNS Network specified. Please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
2019-08-30 18:33:25 +00:00
time.Sleep(1 * time.Second)
hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
2019-04-07 17:07:55 +00:00
}
2019-08-30 18:33:25 +00:00
// Network could have been detected before Remote Subnet Routes are applied or ManagementIP is updated
// Sleep and update the network to include new information
if strings.EqualFold(hnsNetworkInfo.networkType, NETWORK_TYPE_OVERLAY) {
2019-08-30 18:33:25 +00:00
time.Sleep(10 * time.Second)
hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
if err != nil {
return nil, fmt.Errorf("Could not find HNS network %s", hnsNetworkName)
}
}
klog.V(1).InfoS("Hns Network loaded", "hnsNetworkInfo", hnsNetworkInfo)
2019-04-07 17:07:55 +00:00
isDSR := config.EnableDSR
2020-08-10 17:43:49 +00:00
if isDSR && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinDSR) {
2019-04-07 17:07:55 +00:00
return nil, fmt.Errorf("WinDSR feature gate not enabled")
}
err = hcn.DSRSupported()
if isDSR && err != nil {
2019-01-12 04:58:27 +00:00
return nil, err
}
2019-04-07 17:07:55 +00:00
var sourceVip string
var hostMac string
if strings.EqualFold(hnsNetworkInfo.networkType, NETWORK_TYPE_OVERLAY) {
2020-08-10 17:43:49 +00:00
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) {
2019-04-07 17:07:55 +00:00
return nil, fmt.Errorf("WinOverlay feature gate not enabled")
}
err = hcn.RemoteSubnetSupported()
if err != nil {
return nil, err
}
sourceVip = config.SourceVip
if len(sourceVip) == 0 {
return nil, fmt.Errorf("source-vip flag not set")
}
interfaces, _ := net.Interfaces() //TODO create interfaces
for _, inter := range interfaces {
addresses, _ := inter.Addrs()
for _, addr := range addresses {
addrIP, _, _ := net.ParseCIDR(addr.String())
if addrIP.String() == nodeIP.String() {
klog.V(2).InfoS("record Host MAC address", "addr", inter.HardwareAddr.String())
2019-04-07 17:07:55 +00:00
hostMac = inter.HardwareAddr.String()
}
}
}
if len(hostMac) == 0 {
return nil, fmt.Errorf("Could not find host mac address for %s", nodeIP)
}
}
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
isIPv6 := utilnet.IsIPv6(nodeIP)
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying)
2019-01-12 04:58:27 +00:00
proxier := &Proxier{
2020-08-10 17:43:49 +00:00
endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap),
endpointsMap: make(proxy.EndpointsMap),
2019-12-12 01:27:03 +00:00
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
hns: hns,
network: *hnsNetworkInfo,
sourceVip: sourceVip,
hostMac: hostMac,
isDSR: isDSR,
2020-08-10 17:43:49 +00:00
supportedFeatures: supportedFeatures,
isIPv6Mode: isIPv6,
2019-01-12 04:58:27 +00:00
}
ipFamily := v1.IPv4Protocol
if isIPv6 {
ipFamily = v1.IPv6Protocol
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, endpointSlicesEnabled, proxier.endpointsMapChange)
2020-08-10 17:43:49 +00:00
proxier.endpointsChanges = endPointChangeTracker
proxier.serviceChanges = serviceChanges
2019-01-12 04:58:27 +00:00
burstSyncs := 2
klog.V(3).InfoS("record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
2019-01-12 04:58:27 +00:00
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
return proxier, nil
2020-08-10 17:43:49 +00:00
}
func NewDualStackProxier(
syncPeriod time.Duration,
minSyncPeriod time.Duration,
masqueradeAll bool,
masqueradeBit int,
clusterCIDR string,
hostname string,
nodeIP [2]net.IP,
recorder record.EventRecorder,
healthzServer healthcheck.ProxierHealthUpdater,
config config.KubeProxyWinkernelConfiguration,
) (proxy.Provider, error) {
// Create an ipv4 instance of the single-stack proxier
ipv4Proxier, err := NewProxier(syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
clusterCIDR, hostname, nodeIP[0], recorder, healthzServer, config)
if err != nil {
return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIP[0])
}
ipv6Proxier, err := NewProxier(syncPeriod, minSyncPeriod, masqueradeAll, masqueradeBit,
clusterCIDR, hostname, nodeIP[1], recorder, healthzServer, config)
if err != nil {
return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, clusterCIDR : %s, nodeIP:%v", err, hostname, clusterCIDR, nodeIP[1])
}
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
// Return a meta-proxier that dispatch calls between the two
// single-stack proxier instances
return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
2019-01-12 04:58:27 +00:00
}
// CleanupLeftovers removes the HNS load balancer policies created by the
// Proxier. The result is meant to signal whether an error was encountered,
// but nothing here ever sets it, so the function currently always returns
// false.
func CleanupLeftovers() (encounteredError bool) {
	// Delete all Hns Load Balancer Policies
	deleteAllHnsLoadBalancerPolicy()

	// TODO
	// Delete all Hns Remote endpoints
	return false
}
2020-08-10 17:43:49 +00:00
func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) {
2019-01-12 04:58:27 +00:00
Log(svcInfo, "Service Cleanup", 3)
// Skip the svcInfo.policyApplied check to remove all the policies
svcInfo.deleteAllHnsLoadBalancerPolicy()
// Cleanup Endpoints references
for _, ep := range endpoints {
2020-08-10 17:43:49 +00:00
epInfo, ok := ep.(*endpointsInfo)
if ok {
epInfo.Cleanup()
}
2019-01-12 04:58:27 +00:00
}
2019-04-07 17:07:55 +00:00
if svcInfo.remoteEndpoint != nil {
svcInfo.remoteEndpoint.Cleanup()
}
2019-01-12 04:58:27 +00:00
svcInfo.policyApplied = false
}
func (svcInfo *serviceInfo) deleteAllHnsLoadBalancerPolicy() {
// Remove the Hns Policy corresponding to this service
2019-04-07 17:07:55 +00:00
hns := svcInfo.hns
hns.deleteLoadBalancer(svcInfo.hnsID)
2019-01-12 04:58:27 +00:00
svcInfo.hnsID = ""
2019-04-07 17:07:55 +00:00
hns.deleteLoadBalancer(svcInfo.nodePorthnsID)
2019-01-12 04:58:27 +00:00
svcInfo.nodePorthnsID = ""
2019-08-30 18:33:25 +00:00
for _, externalIP := range svcInfo.externalIPs {
hns.deleteLoadBalancer(externalIP.hnsID)
externalIP.hnsID = ""
2019-01-12 04:58:27 +00:00
}
2019-08-30 18:33:25 +00:00
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
hns.deleteLoadBalancer(lbIngressIP.hnsID)
lbIngressIP.hnsID = ""
2019-01-12 04:58:27 +00:00
}
}
func deleteAllHnsLoadBalancerPolicy() {
plists, err := hcsshim.HNSListPolicyListRequest()
if err != nil {
return
}
for _, plist := range plists {
LogJson("policyList", plist, "Remove Policy", 3)
2019-01-12 04:58:27 +00:00
_, err = plist.Delete()
if err != nil {
klog.ErrorS(err, "Failed to delete policy list")
2019-01-12 04:58:27 +00:00
}
}
}
func getHnsNetworkInfo(hnsNetworkName string) (*hnsNetworkInfo, error) {
hnsnetwork, err := hcsshim.GetHNSNetworkByName(hnsNetworkName)
if err != nil {
klog.ErrorS(err, "Failed to get HNS Network by name")
2019-01-12 04:58:27 +00:00
return nil, err
}
return &hnsNetworkInfo{
2019-04-07 17:07:55 +00:00
id: hnsnetwork.Id,
name: hnsnetwork.Name,
networkType: hnsnetwork.Type,
2019-01-12 04:58:27 +00:00
}, nil
}
// Sync is called to synchronize the proxier state to hns as soon as possible.
func (proxier *Proxier) Sync() {
2019-12-12 01:27:03 +00:00
if proxier.healthzServer != nil {
proxier.healthzServer.QueuedUpdate()
}
2020-08-10 17:43:49 +00:00
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
2019-01-12 04:58:27 +00:00
proxier.syncRunner.Run()
}
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
2019-12-12 01:27:03 +00:00
proxier.healthzServer.Updated()
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// synthesize "last change queued" time as the informers are syncing.
metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
2019-01-12 04:58:27 +00:00
proxier.syncRunner.Loop(wait.NeverStop)
}
// setInitialized atomically stores 1 in proxier.initialized when value is
// true and 0 when it is false.
func (proxier *Proxier) setInitialized(value bool) {
	flag := int32(0)
	if value {
		flag = 1
	}
	atomic.StoreInt32(&proxier.initialized, flag)
}
// isInitialized atomically reads proxier.initialized and reports whether it
// holds a positive value.
func (proxier *Proxier) isInitialized() bool {
	flag := atomic.LoadInt32(&proxier.initialized)
	return flag >= 1
}
2020-08-10 17:43:49 +00:00
// OnServiceAdd is called whenever creation of new service object
// is observed.
2019-01-12 04:58:27 +00:00
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
2020-08-10 17:43:49 +00:00
proxier.OnServiceUpdate(nil, service)
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
2019-01-12 04:58:27 +00:00
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
2020-08-10 17:43:49 +00:00
if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
2019-12-12 01:27:03 +00:00
proxier.Sync()
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
2019-01-12 04:58:27 +00:00
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
2020-08-10 17:43:49 +00:00
proxier.OnServiceUpdate(service, nil)
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// OnServiceSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
2019-01-12 04:58:27 +00:00
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
2020-08-10 17:43:49 +00:00
if utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying) {
proxier.setInitialized(proxier.endpointSlicesSynced)
} else {
proxier.setInitialized(proxier.endpointsSynced)
}
2019-01-12 04:58:27 +00:00
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
// if ClusterIP is "None" or empty, skip proxying
if !helper.IsServiceIPSet(service) {
klog.V(3).InfoS("Skipping service due to clusterIP", "svcName", svcName.String(), "clusterIP", service.Spec.ClusterIP)
2019-01-12 04:58:27 +00:00
return true
}
// Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied
if service.Spec.Type == v1.ServiceTypeExternalName {
klog.V(3).InfoS("Skipping service due to Type=ExternalName", "svcName", svcName.String())
2019-01-12 04:58:27 +00:00
return true
}
return false
}
2020-08-10 17:43:49 +00:00
// OnEndpointsAdd is called whenever creation of new endpoints object
// is observed. It is handled as an update from no previous endpoints.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
proxier.OnEndpointsUpdate(nil, endpoints)
}
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
// OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.Sync()
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
}
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
// OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
proxier.OnEndpointsUpdate(endpoints, nil)
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache. The proxier
// becomes initialized only if services have synced as well.
func (proxier *Proxier) OnEndpointsSynced() {
	proxier.mu.Lock()
	proxier.endpointsSynced = true
	bothSynced := proxier.servicesSynced && proxier.endpointsSynced
	proxier.setInitialized(bothSynced)
	// Release the lock before syncing: syncProxyRules acquires it itself.
	proxier.mu.Unlock()

	// Sync unconditionally - this is called once per lifetime.
	proxier.syncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
2019-12-12 01:27:03 +00:00
proxier.Sync()
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
// slice object is observed.
func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
2019-12-12 01:27:03 +00:00
proxier.Sync()
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
// object is observed.
func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
2019-12-12 01:27:03 +00:00
proxier.Sync()
2019-01-12 04:58:27 +00:00
}
}
2020-08-10 17:43:49 +00:00
// OnEndpointSlicesSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointSlicesSynced() {
2019-01-12 04:58:27 +00:00
proxier.mu.Lock()
2020-08-10 17:43:49 +00:00
proxier.endpointSlicesSynced = true
proxier.setInitialized(proxier.servicesSynced)
2019-01-12 04:58:27 +00:00
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
2019-08-30 18:33:25 +00:00
func (proxier *Proxier) cleanupAllPolicies() {
2020-08-10 17:43:49 +00:00
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
2020-08-10 17:43:49 +00:00
continue
}
2019-08-30 18:33:25 +00:00
svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName])
}
}
// isNetworkNotFoundError reports whether err is a NetworkNotFoundError value
// from either the hcn or the hcsshim package. A nil error yields false.
func isNetworkNotFoundError(err error) bool {
	// Comma-ok assertions on a nil interface simply report false, so no
	// separate nil check is needed.
	_, fromHcn := err.(hcn.NetworkNotFoundError)
	_, fromHcsshim := err.(hcsshim.NetworkNotFoundError)
	return fromHcn || fromHcsshim
}
2019-01-12 04:58:27 +00:00
// This is where all of the hns save/restore calls happen.
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
start := time.Now()
defer func() {
2020-03-26 21:07:15 +00:00
SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
2019-01-12 04:58:27 +00:00
}()
// don't sync rules till we've received services and endpoints
2020-08-10 17:43:49 +00:00
if !proxier.isInitialized() {
klog.V(2).InfoS("Not syncing hns until Services and Endpoints have been received from master")
2019-01-12 04:58:27 +00:00
return
}
2019-08-30 18:33:25 +00:00
hnsNetworkName := proxier.network.name
hns := proxier.hns
prevNetworkID := proxier.network.id
updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
klog.InfoS("The HNS network %s is not present or has changed since the last sync. Please check the CNI deployment", "hnsNetworkName", hnsNetworkName)
2019-08-30 18:33:25 +00:00
proxier.cleanupAllPolicies()
if updatedNetwork != nil {
proxier.network = *updatedNetwork
}
return
}
2019-01-12 04:58:27 +00:00
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
2020-08-10 17:43:49 +00:00
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
staleServices := serviceUpdateResult.UDPStaleClusterIP
2019-01-12 04:58:27 +00:00
// merge stale services gathered from updateEndpointsMap
2020-08-10 17:43:49 +00:00
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Stale udp service", "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
2020-08-10 17:43:49 +00:00
staleServices.Insert(svcInfo.ClusterIP().String())
2019-01-12 04:58:27 +00:00
}
}
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
2019-08-30 18:33:25 +00:00
existingSourceVip, err := hns.getEndpointByIpAddress(proxier.sourceVip, hnsNetworkName)
if existingSourceVip == nil {
_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
}
if err != nil {
klog.ErrorS(err, "Source Vip endpoint creation failed")
2019-08-30 18:33:25 +00:00
return
}
}
klog.V(3).InfoS("Syncing Policies")
2019-01-12 04:58:27 +00:00
// Program HNS by adding corresponding policies for each service.
2020-08-10 17:43:49 +00:00
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
2020-08-10 17:43:49 +00:00
continue
}
2019-01-12 04:58:27 +00:00
if svcInfo.policyApplied {
klog.V(4).InfoS("Policy already applied", "spewConfig", spewSdump(svcInfo))
2019-01-12 04:58:27 +00:00
continue
}
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
2020-08-10 17:43:49 +00:00
serviceVipEndpoint, _ := hns.getEndpointByIpAddress(svcInfo.ClusterIP().String(), hnsNetworkName)
2019-04-07 17:07:55 +00:00
if serviceVipEndpoint == nil {
klog.V(4).InfoS("No existing remote endpoint", "ip", svcInfo.ClusterIP().String())
2019-04-07 17:07:55 +00:00
hnsEndpoint := &endpointsInfo{
2020-08-10 17:43:49 +00:00
ip: svcInfo.ClusterIP().String(),
2019-04-07 17:07:55 +00:00
isLocal: false,
macAddress: proxier.hostMac,
providerAddress: proxier.nodeIP.String(),
}
newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.ErrorS(err, "Remote endpoint creation failed for service VIP")
2019-04-07 17:07:55 +00:00
continue
}
2020-08-10 17:43:49 +00:00
newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
*newHnsEndpoint.refCount++
2019-04-07 17:07:55 +00:00
svcInfo.remoteEndpoint = newHnsEndpoint
}
}
var hnsEndpoints []endpointsInfo
2019-08-30 18:33:25 +00:00
var hnsLocalEndpoints []endpointsInfo
klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName.String())
2019-01-12 04:58:27 +00:00
// Create Remote endpoints for every endpoint, corresponding to the service
2019-04-07 17:07:55 +00:00
containsPublicIP := false
2020-03-26 21:07:15 +00:00
containsNodeIP := false
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
for _, epInfo := range proxier.endpointsMap[svcName] {
ep, ok := epInfo.(*endpointsInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo", "svcName", svcName.String())
continue
}
if !ep.IsReady() {
2020-08-10 17:43:49 +00:00
continue
}
2019-04-07 17:07:55 +00:00
var newHnsEndpoint *endpointsInfo
2019-01-12 04:58:27 +00:00
hnsNetworkName := proxier.network.name
var err error
// targetPort is zero if it is specified as a name in port.TargetPort, so the real port should be got from endpoints.
// Note that hcsshim.AddLoadBalancer() doesn't support endpoints with different ports, so only port from first endpoint is used.
// TODO(feiskyer): add support of different endpoint ports after hcsshim.AddLoadBalancer() add that.
if svcInfo.targetPort == 0 {
svcInfo.targetPort = int(ep.port)
}
if len(ep.hnsID) > 0 {
2019-04-07 17:07:55 +00:00
newHnsEndpoint, err = hns.getEndpointByID(ep.hnsID)
2019-01-12 04:58:27 +00:00
}
if newHnsEndpoint == nil {
// First check if an endpoint resource exists for this IP, on the current host
// A Local endpoint could exist here already
// A remote endpoint was already created and proxy was restarted
2020-08-10 17:43:49 +00:00
newHnsEndpoint, err = hns.getEndpointByIpAddress(ep.IP(), hnsNetworkName)
2019-01-12 04:58:27 +00:00
}
if newHnsEndpoint == nil {
2020-08-10 17:43:49 +00:00
if ep.GetIsLocal() {
klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
2019-01-12 04:58:27 +00:00
continue
}
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
klog.InfoS("Updating network to check for new remote subnet policies", "networkName", proxier.network.name)
2019-04-07 17:07:55 +00:00
networkName := proxier.network.name
updatedNetwork, err := hns.getNetworkByName(networkName)
if err != nil {
klog.ErrorS(err, "Unable to find HNS Network specified. Please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
2019-08-30 18:33:25 +00:00
proxier.cleanupAllPolicies()
return
2019-04-07 17:07:55 +00:00
}
proxier.network = *updatedNetwork
2020-08-10 17:43:49 +00:00
providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
2019-04-07 17:07:55 +00:00
if len(providerAddress) == 0 {
klog.InfoS("Could not find provider address. Assuming it is a public IP", "ip", ep.IP())
2019-04-07 17:07:55 +00:00
providerAddress = proxier.nodeIP.String()
}
2020-03-26 21:07:15 +00:00
2019-04-07 17:07:55 +00:00
hnsEndpoint := &endpointsInfo{
ip: ep.ip,
isLocal: false,
macAddress: conjureMac("02-11", net.ParseIP(ep.ip)),
providerAddress: providerAddress,
}
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.ErrorS(err, "Remote endpoint creation failed", "spewConfig", spewSdump(hnsEndpoint))
2019-04-07 17:07:55 +00:00
continue
}
} else {
2020-08-10 17:43:49 +00:00
2019-04-07 17:07:55 +00:00
hnsEndpoint := &endpointsInfo{
ip: ep.ip,
isLocal: false,
macAddress: ep.macAddress,
}
newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
if err != nil {
klog.ErrorS(err, "Remote endpoint creation failed")
2019-04-07 17:07:55 +00:00
continue
}
2019-01-12 04:58:27 +00:00
}
}
// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
// a) Source VIP configured on kube-proxy (or)
// b) Node IP of the current node
//
// For L2Bridge network the Source VIP is always the NodeIP of the current node and the same
// would be configured on kube-proxy as SourceVIP
//
// The logic for choosing the SourceVIP in Overlay networks is based on the backend endpoints:
// a) Endpoints are any IP's outside the cluster ==> Choose NodeIP as the SourceVIP
// b) Endpoints are IP addresses of a remote node => Choose NodeIP as the SourceVIP
// c) Everything else (Local POD's, Remote POD's, Node IP of current node) ==> Choose the configured SourceVIP
if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) && !ep.GetIsLocal() {
2020-08-10 17:43:49 +00:00
providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
isNodeIP := (ep.IP() == providerAddress)
isPublicIP := (len(providerAddress) == 0)
klog.InfoS("Endpoint on overlay network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName, "isNodeIP", isNodeIP, "isPublicIP", isPublicIP)
2020-08-10 17:43:49 +00:00
containsNodeIP = containsNodeIP || isNodeIP
containsPublicIP = containsPublicIP || isPublicIP
}
2019-01-12 04:58:27 +00:00
// Save the hnsId for reference
LogJson("endpointInfo", newHnsEndpoint, "Hns Endpoint resource", 1)
2019-01-12 04:58:27 +00:00
hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
2020-08-10 17:43:49 +00:00
if newHnsEndpoint.GetIsLocal() {
2019-08-30 18:33:25 +00:00
hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
2020-08-10 17:43:49 +00:00
} else {
// We only share the refCounts for remote endpoints
ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
*ep.refCount++
2019-08-30 18:33:25 +00:00
}
2020-08-10 17:43:49 +00:00
2019-04-07 17:07:55 +00:00
ep.hnsID = newHnsEndpoint.hnsID
2020-08-10 17:43:49 +00:00
2019-01-12 04:58:27 +00:00
Log(ep, "Endpoint resource found", 3)
}
klog.V(3).InfoS("Associated endpoints for service", "spewConfig", spewSdump(hnsEndpoints), "svcName", svcName.String())
2019-01-12 04:58:27 +00:00
if len(svcInfo.hnsID) > 0 {
// This should not happen
klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
2019-01-12 04:58:27 +00:00
}
if len(hnsEndpoints) == 0 {
klog.ErrorS(nil, "Endpoint information not available for service. Not applying any policy", "svcName", svcName.String())
2019-01-12 04:58:27 +00:00
continue
}
klog.V(4).Infof("Trying to Apply Policies for service", "spewConfig", spewSdump(svcInfo))
2019-04-07 17:07:55 +00:00
var hnsLoadBalancer *loadBalancerInfo
var sourceVip = proxier.sourceVip
2020-03-26 21:07:15 +00:00
if containsPublicIP || containsNodeIP {
2019-04-07 17:07:55 +00:00
sourceVip = proxier.nodeIP.String()
}
2020-08-10 17:43:49 +00:00
sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
klog.InfoS("Session Affinity is not supported on this version of Windows.")
2020-08-10 17:43:49 +00:00
}
2019-04-07 17:07:55 +00:00
hnsLoadBalancer, err := hns.getLoadBalancer(
2019-01-12 04:58:27 +00:00
hnsEndpoints,
2020-08-10 17:43:49 +00:00
loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP},
2019-04-07 17:07:55 +00:00
sourceVip,
2020-08-10 17:43:49 +00:00
svcInfo.ClusterIP().String(),
Enum(svcInfo.Protocol()),
2019-01-12 04:58:27 +00:00
uint16(svcInfo.targetPort),
2020-08-10 17:43:49 +00:00
uint16(svcInfo.Port()),
2019-01-12 04:58:27 +00:00
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
2019-01-12 04:58:27 +00:00
continue
}
2019-04-07 17:07:55 +00:00
svcInfo.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
2019-01-12 04:58:27 +00:00
// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
2020-08-10 17:43:49 +00:00
if svcInfo.NodePort() > 0 {
2020-03-26 21:07:15 +00:00
// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
// This means that health services can use Node Port without falsely getting results from a different node.
nodePortEndpoints := hnsEndpoints
if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
2020-03-26 21:07:15 +00:00
nodePortEndpoints = hnsLocalEndpoints
}
2019-04-07 17:07:55 +00:00
hnsLoadBalancer, err := hns.getLoadBalancer(
2020-03-26 21:07:15 +00:00
nodePortEndpoints,
loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
2019-04-07 17:07:55 +00:00
sourceVip,
"",
2020-08-10 17:43:49 +00:00
Enum(svcInfo.Protocol()),
2019-01-12 04:58:27 +00:00
uint16(svcInfo.targetPort),
2020-08-10 17:43:49 +00:00
uint16(svcInfo.NodePort()),
2019-01-12 04:58:27 +00:00
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
2019-01-12 04:58:27 +00:00
continue
}
2019-04-07 17:07:55 +00:00
svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
2019-01-12 04:58:27 +00:00
}
// Create a Load Balancer Policy for each external IP
2019-08-30 18:33:25 +00:00
for _, externalIP := range svcInfo.externalIPs {
// Disable routing mesh if ExternalTrafficPolicy is set to local
externalIPEndpoints := hnsEndpoints
if svcInfo.localTrafficDSR {
externalIPEndpoints = hnsLocalEndpoints
}
2019-01-12 04:58:27 +00:00
// Try loading existing policies, if already available
2019-04-07 17:07:55 +00:00
hnsLoadBalancer, err = hns.getLoadBalancer(
externalIPEndpoints,
loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
2019-04-07 17:07:55 +00:00
sourceVip,
2019-08-30 18:33:25 +00:00
externalIP.ip,
2020-08-10 17:43:49 +00:00
Enum(svcInfo.Protocol()),
2019-01-12 04:58:27 +00:00
uint16(svcInfo.targetPort),
2020-08-10 17:43:49 +00:00
uint16(svcInfo.Port()),
2019-01-12 04:58:27 +00:00
)
if err != nil {
klog.ErrorS(err, "Policy creation failed", err)
2019-01-12 04:58:27 +00:00
continue
}
2019-08-30 18:33:25 +00:00
externalIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
2019-01-12 04:58:27 +00:00
}
// Create a Load Balancer Policy for each loadbalancer ingress
2019-08-30 18:33:25 +00:00
for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
2019-01-12 04:58:27 +00:00
// Try loading existing policies, if already available
2019-08-30 18:33:25 +00:00
lbIngressEndpoints := hnsEndpoints
if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
2019-08-30 18:33:25 +00:00
lbIngressEndpoints = hnsLocalEndpoints
}
2019-04-07 17:07:55 +00:00
hnsLoadBalancer, err := hns.getLoadBalancer(
2019-08-30 18:33:25 +00:00
lbIngressEndpoints,
loadBalancerFlags{isDSR: svcInfo.preserveDIP || proxier.isDSR || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode},
2019-04-07 17:07:55 +00:00
sourceVip,
2019-08-30 18:33:25 +00:00
lbIngressIP.ip,
2020-08-10 17:43:49 +00:00
Enum(svcInfo.Protocol()),
2019-01-12 04:58:27 +00:00
uint16(svcInfo.targetPort),
2020-08-10 17:43:49 +00:00
uint16(svcInfo.Port()),
2019-01-12 04:58:27 +00:00
)
if err != nil {
klog.ErrorS(err, "Policy creation failed")
2019-01-12 04:58:27 +00:00
continue
}
2019-08-30 18:33:25 +00:00
lbIngressIP.hnsID = hnsLoadBalancer.hnsID
klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
2019-01-12 04:58:27 +00:00
}
svcInfo.policyApplied = true
Log(svcInfo, "+++Policy Successfully applied for service +++", 2)
}
if proxier.healthzServer != nil {
2019-12-12 01:27:03 +00:00
proxier.healthzServer.Updated()
2019-01-12 04:58:27 +00:00
}
2019-07-14 07:58:54 +00:00
SyncProxyRulesLastTimestamp.SetToCurrentTime()
2019-01-12 04:58:27 +00:00
2019-12-12 01:27:03 +00:00
// Update service healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the serviceHealthServer
2019-01-12 04:58:27 +00:00
// will just drop those endpoints.
2020-08-10 17:43:49 +00:00
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.ErrorS(err, "Error syncing healthcheck services")
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
2019-01-12 04:58:27 +00:00
}
// Finish housekeeping.
// TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() {
// TODO : Check if this is required to cleanup stale services here
klog.V(5).InfoS("Pending delete stale service IP connections", "ip", svcIP)
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
// remove stale endpoint refcount entries
for hnsID, referenceCount := range proxier.endPointsRefCount {
if *referenceCount <= 0 {
delete(proxier.endPointsRefCount, hnsID)
}
}
2019-01-12 04:58:27 +00:00
}