2019-01-12 04:58:27 +00:00
/ *
Copyright 2017 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package proxy
import (
"fmt"
"net"
"reflect"
"strings"
"sync"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
2019-01-12 04:58:27 +00:00
2020-08-10 17:43:49 +00:00
v1 "k8s.io/api/core/v1"
2019-01-12 04:58:27 +00:00
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
2019-07-14 07:58:54 +00:00
"k8s.io/kubernetes/pkg/proxy/metrics"
2019-01-12 04:58:27 +00:00
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
)
// BaseServiceInfo contains base information that defines a service.
// This could be used directly by proxier while processing services,
// or can be used for constructing a more specific ServiceInfo struct
// defined by the proxier if needed.
type BaseServiceInfo struct {
2019-09-27 21:51:53 +00:00
clusterIP net . IP
port int
protocol v1 . Protocol
nodePort int
loadBalancerStatus v1 . LoadBalancerStatus
sessionAffinityType v1 . ServiceAffinity
stickyMaxAgeSeconds int
externalIPs [ ] string
loadBalancerSourceRanges [ ] string
healthCheckNodePort int
onlyNodeLocalEndpoints bool
2019-12-12 01:27:03 +00:00
topologyKeys [ ] string
2019-01-12 04:58:27 +00:00
}
var _ ServicePort = & BaseServiceInfo { }
// String is part of ServicePort interface.
func ( info * BaseServiceInfo ) String ( ) string {
2019-09-27 21:51:53 +00:00
return fmt . Sprintf ( "%s:%d/%s" , info . clusterIP , info . port , info . protocol )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// ClusterIP is part of ServicePort interface.
func ( info * BaseServiceInfo ) ClusterIP ( ) net . IP {
return info . clusterIP
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// Port is part of ServicePort interface.
func ( info * BaseServiceInfo ) Port ( ) int {
return info . port
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// SessionAffinityType is part of the ServicePort interface.
func ( info * BaseServiceInfo ) SessionAffinityType ( ) v1 . ServiceAffinity {
return info . sessionAffinityType
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// StickyMaxAgeSeconds is part of the ServicePort interface
func ( info * BaseServiceInfo ) StickyMaxAgeSeconds ( ) int {
return info . stickyMaxAgeSeconds
}
// Protocol is part of ServicePort interface.
func ( info * BaseServiceInfo ) Protocol ( ) v1 . Protocol {
return info . protocol
}
// LoadBalancerSourceRanges is part of ServicePort interface
func ( info * BaseServiceInfo ) LoadBalancerSourceRanges ( ) [ ] string {
return info . loadBalancerSourceRanges
}
// HealthCheckNodePort is part of ServicePort interface.
func ( info * BaseServiceInfo ) HealthCheckNodePort ( ) int {
return info . healthCheckNodePort
}
// NodePort is part of the ServicePort interface.
func ( info * BaseServiceInfo ) NodePort ( ) int {
return info . nodePort
2019-04-07 17:07:55 +00:00
}
// ExternalIPStrings is part of ServicePort interface.
func ( info * BaseServiceInfo ) ExternalIPStrings ( ) [ ] string {
2019-09-27 21:51:53 +00:00
return info . externalIPs
2019-04-07 17:07:55 +00:00
}
// LoadBalancerIPStrings is part of ServicePort interface.
func ( info * BaseServiceInfo ) LoadBalancerIPStrings ( ) [ ] string {
var ips [ ] string
2019-09-27 21:51:53 +00:00
for _ , ing := range info . loadBalancerStatus . Ingress {
2019-04-07 17:07:55 +00:00
ips = append ( ips , ing . IP )
}
return ips
}
2019-09-27 21:51:53 +00:00
// OnlyNodeLocalEndpoints is part of ServicePort interface.
func ( info * BaseServiceInfo ) OnlyNodeLocalEndpoints ( ) bool {
return info . onlyNodeLocalEndpoints
}
2019-12-12 01:27:03 +00:00
// TopologyKeys is part of ServicePort interface.
func ( info * BaseServiceInfo ) TopologyKeys ( ) [ ] string {
return info . topologyKeys
}
2019-01-12 04:58:27 +00:00
func ( sct * ServiceChangeTracker ) newBaseServiceInfo ( port * v1 . ServicePort , service * v1 . Service ) * BaseServiceInfo {
onlyNodeLocalEndpoints := false
if apiservice . RequestsOnlyLocalTraffic ( service ) {
onlyNodeLocalEndpoints = true
}
var stickyMaxAgeSeconds int
if service . Spec . SessionAffinity == v1 . ServiceAffinityClientIP {
// Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP
stickyMaxAgeSeconds = int ( * service . Spec . SessionAffinityConfig . ClientIP . TimeoutSeconds )
}
2020-12-01 01:06:26 +00:00
clusterIP := utilproxy . GetClusterIPByFamily ( sct . ipFamily , service )
2019-01-12 04:58:27 +00:00
info := & BaseServiceInfo {
2020-12-01 01:06:26 +00:00
clusterIP : net . ParseIP ( clusterIP ) ,
2020-03-13 19:24:50 +00:00
port : int ( port . Port ) ,
protocol : port . Protocol ,
nodePort : int ( port . NodePort ) ,
2019-09-27 21:51:53 +00:00
sessionAffinityType : service . Spec . SessionAffinity ,
stickyMaxAgeSeconds : stickyMaxAgeSeconds ,
onlyNodeLocalEndpoints : onlyNodeLocalEndpoints ,
2019-12-12 01:27:03 +00:00
topologyKeys : service . Spec . TopologyKeys ,
2019-01-12 04:58:27 +00:00
}
2020-11-14 08:06:46 +00:00
loadBalancerSourceRanges := make ( [ ] string , len ( service . Spec . LoadBalancerSourceRanges ) )
for i , sourceRange := range service . Spec . LoadBalancerSourceRanges {
loadBalancerSourceRanges [ i ] = strings . TrimSpace ( sourceRange )
}
2020-12-01 01:06:26 +00:00
// filter external ips, source ranges and ingress ips
// prior to dual stack services, this was considered an error, but with dual stack
// services, this is actually expected. Hence we downgraded from reporting by events
// to just log lines with high verbosity
var incorrectIPs [ ] string
info . externalIPs , incorrectIPs = utilproxy . FilterIncorrectIPVersion ( service . Spec . ExternalIPs , sct . ipFamily )
if len ( incorrectIPs ) > 0 {
klog . V ( 4 ) . Infof ( "service change tracker(%v) ignored the following external IPs(%s) for service %v/%v as they don't match IPFamily" , sct . ipFamily , strings . Join ( incorrectIPs , "," ) , service . Namespace , service . Name )
}
info . loadBalancerSourceRanges , incorrectIPs = utilproxy . FilterIncorrectCIDRVersion ( loadBalancerSourceRanges , sct . ipFamily )
if len ( incorrectIPs ) > 0 {
klog . V ( 4 ) . Infof ( "service change tracker(%v) ignored the following load balancer source ranges(%s) for service %v/%v as they don't match IPFamily" , sct . ipFamily , strings . Join ( incorrectIPs , "," ) , service . Namespace , service . Name )
}
// Obtain Load Balancer Ingress IPs
var ips [ ] string
for _ , ing := range service . Status . LoadBalancer . Ingress {
ips = append ( ips , ing . IP )
}
if len ( ips ) > 0 {
correctIPs , incorrectIPs := utilproxy . FilterIncorrectIPVersion ( ips , sct . ipFamily )
2020-11-14 08:06:46 +00:00
2019-01-12 04:58:27 +00:00
if len ( incorrectIPs ) > 0 {
2020-12-01 01:06:26 +00:00
klog . V ( 4 ) . Infof ( "service change tracker(%v) ignored the following load balancer(%s) ingress ips for service %v/%v as they don't match IPFamily" , sct . ipFamily , strings . Join ( incorrectIPs , "," ) , service . Namespace , service . Name )
2020-03-13 19:24:50 +00:00
}
2020-12-01 01:06:26 +00:00
// Create the LoadBalancerStatus with the filtered IPs
for _ , ip := range correctIPs {
info . loadBalancerStatus . Ingress = append ( info . loadBalancerStatus . Ingress , v1 . LoadBalancerIngress { IP : ip } )
2020-03-13 19:24:50 +00:00
}
2019-01-12 04:58:27 +00:00
}
if apiservice . NeedsHealthCheck ( service ) {
p := service . Spec . HealthCheckNodePort
if p == 0 {
klog . Errorf ( "Service %s/%s has no healthcheck nodeport" , service . Namespace , service . Name )
} else {
2019-09-27 21:51:53 +00:00
info . healthCheckNodePort = int ( p )
2019-01-12 04:58:27 +00:00
}
}
return info
}
type makeServicePortFunc func ( * v1 . ServicePort , * v1 . Service , * BaseServiceInfo ) ServicePort
2020-08-10 17:43:49 +00:00
// This handler is invoked by the apply function on every change. This function should not modify the
// ServiceMap's but just use the changes for any Proxier specific cleanup.
type processServiceMapChangeFunc func ( previous , current ServiceMap )
2019-01-12 04:58:27 +00:00
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
previous ServiceMap
current ServiceMap
}
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
type ServiceChangeTracker struct {
// lock protects items.
lock sync . Mutex
// items maps a service to its serviceChange.
items map [ types . NamespacedName ] * serviceChange
// makeServiceInfo allows proxier to inject customized information when processing service.
2020-08-10 17:43:49 +00:00
makeServiceInfo makeServicePortFunc
processServiceMapChange processServiceMapChangeFunc
2020-12-01 01:06:26 +00:00
ipFamily v1 . IPFamily
recorder record . EventRecorder
2019-01-12 04:58:27 +00:00
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
2020-12-01 01:06:26 +00:00
func NewServiceChangeTracker ( makeServiceInfo makeServicePortFunc , ipFamily v1 . IPFamily , recorder record . EventRecorder , processServiceMapChange processServiceMapChangeFunc ) * ServiceChangeTracker {
2019-01-12 04:58:27 +00:00
return & ServiceChangeTracker {
2020-08-10 17:43:49 +00:00
items : make ( map [ types . NamespacedName ] * serviceChange ) ,
makeServiceInfo : makeServiceInfo ,
recorder : recorder ,
2020-12-01 01:06:26 +00:00
ipFamily : ipFamily ,
2020-08-10 17:43:49 +00:00
processServiceMapChange : processServiceMapChange ,
2019-01-12 04:58:27 +00:00
}
}
// Update updates given service's change map based on the <previous, current> service pair. It returns true if items changed,
// otherwise return false. Update can be used to add/update/delete items of ServiceChangeMap. For example,
// Add item
// - pass <nil, service> as the <previous, current> pair.
// Update item
// - pass <oldService, service> as the <previous, current> pair.
// Delete item
// - pass <service, nil> as the <previous, current> pair.
func ( sct * ServiceChangeTracker ) Update ( previous , current * v1 . Service ) bool {
svc := current
if svc == nil {
svc = previous
}
// previous == nil && current == nil is unexpected, we should return false directly.
if svc == nil {
return false
}
2019-07-14 07:58:54 +00:00
metrics . ServiceChangesTotal . Inc ( )
2019-01-12 04:58:27 +00:00
namespacedName := types . NamespacedName { Namespace : svc . Namespace , Name : svc . Name }
sct . lock . Lock ( )
defer sct . lock . Unlock ( )
change , exists := sct . items [ namespacedName ]
if ! exists {
change = & serviceChange { }
change . previous = sct . serviceToServiceMap ( previous )
sct . items [ namespacedName ] = change
}
change . current = sct . serviceToServiceMap ( current )
// if change.previous equal to change.current, it means no change
if reflect . DeepEqual ( change . previous , change . current ) {
delete ( sct . items , namespacedName )
2020-08-10 17:43:49 +00:00
} else {
klog . V ( 2 ) . Infof ( "Service %s updated: %d ports" , namespacedName , len ( change . current ) )
2019-01-12 04:58:27 +00:00
}
2019-07-14 07:58:54 +00:00
metrics . ServiceChangesPending . Set ( float64 ( len ( sct . items ) ) )
2019-01-12 04:58:27 +00:00
return len ( sct . items ) > 0
}
// UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct {
// HCServiceNodePorts is a map of Service names to node port numbers which indicate the health of that Service on this Node.
// The value(uint16) of HCServices map is the service health check node port.
HCServiceNodePorts map [ types . NamespacedName ] uint16
// UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports.
// Callers can use this to abort timeout-waits or clear connection-tracking information.
UDPStaleClusterIP sets . String
}
// UpdateServiceMap updates ServiceMap based on the given changes.
func UpdateServiceMap ( serviceMap ServiceMap , changes * ServiceChangeTracker ) ( result UpdateServiceMapResult ) {
result . UDPStaleClusterIP = sets . NewString ( )
serviceMap . apply ( changes , result . UDPStaleClusterIP )
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
result . HCServiceNodePorts = make ( map [ types . NamespacedName ] uint16 )
for svcPortName , info := range serviceMap {
2019-09-27 21:51:53 +00:00
if info . HealthCheckNodePort ( ) != 0 {
result . HCServiceNodePorts [ svcPortName . NamespacedName ] = uint16 ( info . HealthCheckNodePort ( ) )
2019-01-12 04:58:27 +00:00
}
}
return result
}
// ServiceMap maps a service to its ServicePort.
type ServiceMap map [ ServicePortName ] ServicePort
// serviceToServiceMap translates a single Service object to a ServiceMap.
//
// NOTE: service object should NOT be modified.
func ( sct * ServiceChangeTracker ) serviceToServiceMap ( service * v1 . Service ) ServiceMap {
if service == nil {
return nil
}
2020-08-10 17:43:49 +00:00
if utilproxy . ShouldSkipService ( service ) {
2019-01-12 04:58:27 +00:00
return nil
}
2020-12-01 01:06:26 +00:00
clusterIP := utilproxy . GetClusterIPByFamily ( sct . ipFamily , service )
if clusterIP == "" {
return nil
2019-01-12 04:58:27 +00:00
}
serviceMap := make ( ServiceMap )
2020-08-10 17:43:49 +00:00
svcName := types . NamespacedName { Namespace : service . Namespace , Name : service . Name }
2019-01-12 04:58:27 +00:00
for i := range service . Spec . Ports {
servicePort := & service . Spec . Ports [ i ]
2019-12-12 01:27:03 +00:00
svcPortName := ServicePortName { NamespacedName : svcName , Port : servicePort . Name , Protocol : servicePort . Protocol }
2019-01-12 04:58:27 +00:00
baseSvcInfo := sct . newBaseServiceInfo ( servicePort , service )
if sct . makeServiceInfo != nil {
serviceMap [ svcPortName ] = sct . makeServiceInfo ( servicePort , service , baseSvcInfo )
} else {
serviceMap [ svcPortName ] = baseSvcInfo
}
}
return serviceMap
}
// apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
// udp protocol service cluster ip when service is deleted from the ServiceMap.
2020-08-10 17:43:49 +00:00
// apply triggers processServiceMapChange on every change.
2019-08-30 18:33:25 +00:00
func ( sm * ServiceMap ) apply ( changes * ServiceChangeTracker , UDPStaleClusterIP sets . String ) {
2019-01-12 04:58:27 +00:00
changes . lock . Lock ( )
defer changes . lock . Unlock ( )
for _ , change := range changes . items {
2020-08-10 17:43:49 +00:00
if changes . processServiceMapChange != nil {
changes . processServiceMapChange ( change . previous , change . current )
}
2019-08-30 18:33:25 +00:00
sm . merge ( change . current )
2019-01-12 04:58:27 +00:00
// filter out the Update event of current changes from previous changes before calling unmerge() so that can
// skip deleting the Update events.
change . previous . filter ( change . current )
2019-08-30 18:33:25 +00:00
sm . unmerge ( change . previous , UDPStaleClusterIP )
2019-01-12 04:58:27 +00:00
}
// clear changes after applying them to ServiceMap.
changes . items = make ( map [ types . NamespacedName ] * serviceChange )
2019-07-14 07:58:54 +00:00
metrics . ServiceChangesPending . Set ( 0 )
2019-01-12 04:58:27 +00:00
}
// merge adds other ServiceMap's elements to current ServiceMap.
// If collision, other ALWAYS win. Otherwise add the other to current.
// In other words, if some elements in current collisions with other, update the current by other.
// It returns a string type set which stores all the newly merged services' identifier, ServicePortName.String(), to help users
// tell if a service is deleted or updated.
// The returned value is one of the arguments of ServiceMap.unmerge().
// ServiceMap A Merge ServiceMap B will do following 2 things:
// * update ServiceMap A.
// * produce a string set which stores all other ServiceMap's ServicePortName.String().
// For example,
// - A{}
// - B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// - A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// - produce string set {"ns/cluster-ip:http"}
// - A{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 345, "UDP"}}
// - B{{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// - A updated to be {{"ns", "cluster-ip", "http"}: {"172.16.55.10", 1234, "TCP"}}
// - produce string set {"ns/cluster-ip:http"}
func ( sm * ServiceMap ) merge ( other ServiceMap ) sets . String {
// existingPorts is going to store all identifiers of all services in `other` ServiceMap.
existingPorts := sets . NewString ( )
for svcPortName , info := range other {
// Take ServicePortName.String() as the newly merged service's identifier and put it into existingPorts.
existingPorts . Insert ( svcPortName . String ( ) )
_ , exists := ( * sm ) [ svcPortName ]
if ! exists {
klog . V ( 1 ) . Infof ( "Adding new service port %q at %s" , svcPortName , info . String ( ) )
} else {
klog . V ( 1 ) . Infof ( "Updating existing service port %q at %s" , svcPortName , info . String ( ) )
}
( * sm ) [ svcPortName ] = info
}
return existingPorts
}
// filter filters out elements from ServiceMap base on given ports string sets.
func ( sm * ServiceMap ) filter ( other ServiceMap ) {
for svcPortName := range * sm {
// skip the delete for Update event.
if _ , ok := other [ svcPortName ] ; ok {
delete ( * sm , svcPortName )
}
}
}
// unmerge deletes all other ServiceMap's elements from current ServiceMap. We pass in the UDPStaleClusterIP strings sets
// for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later
func ( sm * ServiceMap ) unmerge ( other ServiceMap , UDPStaleClusterIP sets . String ) {
for svcPortName := range other {
info , exists := ( * sm ) [ svcPortName ]
if exists {
klog . V ( 1 ) . Infof ( "Removing service port %q" , svcPortName )
2019-09-27 21:51:53 +00:00
if info . Protocol ( ) == v1 . ProtocolUDP {
UDPStaleClusterIP . Insert ( info . ClusterIP ( ) . String ( ) )
2019-01-12 04:58:27 +00:00
}
delete ( * sm , svcPortName )
} else {
klog . Errorf ( "Service port %q doesn't exists" , svcPortName )
}
}
}