GCE Cloud provider changes for ESIPP

Add feature gate (ExternalTrafficLocalOnly) for alpha feature
pull/6/head
Girish Kalele 2016-08-15 11:22:44 -07:00
parent 4b55492570
commit b82c028f77
8 changed files with 629 additions and 62 deletions
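For orientation, the alpha behaviour added by this change is doubly opt-in: the cluster operator must enable the AllowExtTrafficLocalEndpoints feature gate, and the Service must carry the service.alpha.kubernetes.io/external-traffic annotation set to OnlyLocal. The following is an illustrative sketch (not part of the diff) of how the two switches combine, using the helpers and gate accessor introduced below:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	apiservice "k8s.io/kubernetes/pkg/api/service"
	featuregate "k8s.io/kubernetes/pkg/util/config"
)

func main() {
	// Operator-side switch: the alpha gate defaults to off.
	featuregate.DefaultFeatureGate.Set("AllowExtTrafficLocalEndpoints=true")

	// Service-side switch: request OnlyLocal handling of external traffic.
	svc := &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name: "example",
			Annotations: map[string]string{
				apiservice.AnnotationExternalTraffic: apiservice.AnnotationValueExternalTrafficLocal,
			},
		},
	}

	// Both must be true before kube-proxy programs KUBE-XLB- chains and the
	// cloud provider provisions an HTTP health check for the service.
	onlyLocal := featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() &&
		apiservice.NeedsHealthCheck(svc)
	fmt.Println("route external traffic to local endpoints only:", onlyLocal)
}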

View File

@@ -16,6 +16,13 @@ limitations under the License.
package service
import (
"strconv"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
)
const (
// AnnotationLoadBalancerSourceRangesKey is the key of the annotation on a service to set allowed ingress ranges on their LoadBalancers
//
@@ -25,4 +32,58 @@ const (
//
// Not all cloud providers support this annotation, though AWS & GCE do.
AnnotationLoadBalancerSourceRangesKey = "service.beta.kubernetes.io/load-balancer-source-ranges"
// AnnotationExternalTraffic is an annotation that denotes whether this Service desires to route
// external traffic to node-local endpoints only. This preserves the source IP and avoids a second hop.
AnnotationExternalTraffic = "service.alpha.kubernetes.io/external-traffic"
// AnnotationValueExternalTrafficLocal is the annotation value that requests node-local endpoints behaviour.
AnnotationValueExternalTrafficLocal = "OnlyLocal"
// AnnotationValueExternalTrafficGlobal is the annotation value that requests global (legacy) behaviour.
AnnotationValueExternalTrafficGlobal = "Global"
// AnnotationHealthCheckNodePort is the annotation specifying the healthcheck nodePort for the service.
// If not specified, the annotation is created by the service API backend with an allocated nodePort.
// A user-specified nodePort value is honored if the client provides one.
AnnotationHealthCheckNodePort = "service.alpha.kubernetes.io/healthcheck-nodeport"
)
// NeedsHealthCheck checks whether the service's external-traffic annotation requests node-local (OnlyLocal) endpoints and therefore a health check.
func NeedsHealthCheck(service *api.Service) bool {
if l, ok := service.Annotations[AnnotationExternalTraffic]; ok {
if l == AnnotationValueExternalTrafficLocal {
return true
} else if l == AnnotationValueExternalTrafficGlobal {
return false
} else {
glog.Errorf("Invalid value for annotation %v", AnnotationExternalTraffic)
return false
}
}
return false
}
// GetServiceHealthCheckNodePort returns the health check nodePort annotation for the service, or 0 if none is set.
func GetServiceHealthCheckNodePort(service *api.Service) int32 {
if NeedsHealthCheck(service) {
if l, ok := service.Annotations[AnnotationHealthCheckNodePort]; ok {
p, err := strconv.Atoi(l)
if err != nil {
glog.Errorf("Failed to parse annotation %v: %v", AnnotationHealthCheckNodePort, err)
return 0
}
return int32(p)
}
}
return 0
}
// GetServiceHealthCheckPathPort returns the path and nodePort programmed into the cloud LB health check.
func GetServiceHealthCheckPathPort(service *api.Service) (string, int32) {
if !NeedsHealthCheck(service) {
return "", 0
}
port := GetServiceHealthCheckNodePort(service)
if port == 0 {
return "", 0
}
return "/healthz", port
}
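A brief usage sketch (illustrative, not part of this commit) of how a caller such as a cloud provider consumes the helpers above, written as a hypothetical Go example function in this package; it assumes only the identifiers defined or imported in this file, and the nodePort value is made up:

func ExampleGetServiceHealthCheckPathPort() {
	svc := &api.Service{
		ObjectMeta: api.ObjectMeta{
			Annotations: map[string]string{
				AnnotationExternalTraffic:     AnnotationValueExternalTrafficLocal,
				AnnotationHealthCheckNodePort: "30200", // example nodePort
			},
		},
	}
	if path, port := GetServiceHealthCheckPathPort(svc); path != "" {
		// With the values above: path == "/healthz", port == 30200.
		glog.Infof("program the cloud LB health check on :%d%s", port, path)
	}
}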

View File

@@ -31,7 +31,7 @@ import (
"gopkg.in/gcfg.v1"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
@@ -70,8 +70,16 @@ const (
// were to continuously return a nextPageToken.
maxPages = 25
// Target Pool creation is limited to 200 instances.
maxTargetPoolCreateInstances = 200
// HTTP health check parameters for the external load balancer.
// Configure a 2-second period for external health checks.
gceHcCheckIntervalSeconds = int64(2)
gceHcTimeoutSeconds = int64(1)
// Start sending requests as soon as a pod is found on the node.
gceHcHealthyThreshold = int64(1)
// Defaults to 5 * 2 = 10 seconds before the LB will steer traffic away
gceHcUnhealthyThreshold = int64(5)
)
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
@@ -665,7 +673,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getLoadBalancer checks for.
// Check if user specified the allow source range
sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService)
sourceRanges, err := apiservice.GetLoadBalancerSourceRanges(apiService)
if err != nil {
return nil, err
}
@@ -720,7 +728,18 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
// Look up the existing health check for this target pool so deleteTargetPool can clean it up.
var hc *compute.HttpHealthCheck
if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
var err error
hc, err = gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
}
}
// Pass the health check to deleteTargetPool so it is cleaned up prior to the target pool itself.
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
@@ -734,7 +753,18 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
createInstances = createInstances[:maxTargetPoolCreateInstances]
}
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType); err != nil {
// Create the health check for this target pool so createTargetPool can reference it as a health check link.
var hc *compute.HttpHealthCheck
if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
glog.Infof("service %v needs health checks on :%d/%s)", apiService.Name, healthCheckNodePort, path)
var err error
hc, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
if err != nil {
return nil, fmt.Errorf("Failed to create health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err)
}
}
// Pass the health check to createTargetPool, which references it as a health check link in the target pool.
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hc); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if len(hosts) <= maxTargetPoolCreateInstances {
@@ -767,9 +797,53 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: ipAddress}}
return status, nil
}
func makeHealthCheckDescription(serviceName string) string {
return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName)
}
func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) {
newHC := &compute.HttpHealthCheck{
Name: name,
Port: int64(port),
RequestPath: path,
Host: "",
Description: makeHealthCheckDescription(name),
CheckIntervalSec: gceHcCheckIntervalSeconds,
TimeoutSec: gceHcTimeoutSeconds,
HealthyThreshold: gceHcHealthyThreshold,
UnhealthyThreshold: gceHcUnhealthyThreshold,
}
hc, err = gce.GetHttpHealthCheck(name)
if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path)
if err = gce.CreateHttpHealthCheck(newHC); err != nil {
return nil, err
}
hc, err = gce.GetHttpHealthCheck(name)
if err != nil {
glog.Errorf("Failed to get http health check %v", err)
return nil, err
}
return hc, nil
}
// Validate health check fields
drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name)
drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds
drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold
if drift {
glog.Infof("Health check %v exists but parameters have drifted - updating", name)
if err := gce.UpdateHttpHealthCheck(newHC); err != nil {
return nil, err
}
}
return hc, nil
}
// Passing nil for requested IP is perfectly fine - it just means that no specific
// IP is being requested.
// Returns whether the forwarding rule exists, whether it needs to be updated,
@@ -962,16 +1036,24 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
return nil
}
func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity, hc *compute.HttpHealthCheck) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
hcLinks := []string{}
if hc != nil {
hcLinks = append(hcLinks, hc.SelfLink)
}
if len(hcLinks) > 0 {
glog.Infof("Creating targetpool %v with healthchecking", name)
}
pool := &compute.TargetPool{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Instances: instances,
SessionAffinity: translateAffinityType(affinityType),
HealthChecks: hcLinks,
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
@@ -1279,6 +1361,16 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *api.
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName,
gce.region)
var hc *compute.HttpHealthCheck
if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" {
var err error
hc, err = gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
return err
}
}
errs := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(loadBalancerName, gce.region) },
// Even though we don't hold on to static IPs for load balancers, it's
@@ -1291,7 +1383,7 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *api.
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return err
}
if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil {
return err
}
return nil
@@ -1319,7 +1411,14 @@ func (gce *GCECloud) deleteForwardingRule(name, region string) error {
return nil
}
func (gce *GCECloud) deleteTargetPool(name, region string) error {
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error {
if hc != nil {
glog.Infof("Deleting health check %v", hc.Name)
if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil {
glog.Warningf("Failed to delete health check %v: %v", hc, err)
return err
}
}
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)

View File

@@ -36,8 +36,11 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/types"
featuregate "k8s.io/kubernetes/pkg/util/config"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/sets"
@@ -137,6 +140,14 @@ type serviceInfo struct {
stickyMaxAgeSeconds int
externalIPs []string
loadBalancerSourceRanges []string
onlyNodeLocalEndpoints bool
healthCheckNodePort int
}
// internal struct for endpoints information
type endpointsInfo struct {
ip string
localEndpoint bool
}
// returns a new serviceInfo struct
@@ -152,7 +163,7 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo {
type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap map[proxy.ServicePortName]*serviceInfo
endpointsMap map[proxy.ServicePortName][]string
endpointsMap map[proxy.ServicePortName][]*endpointsInfo
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
@@ -215,9 +226,12 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
glog.Warningf("invalid nodeIP, initialize kube-proxy with 127.0.0.1 as nodeIP")
nodeIP = net.ParseIP("127.0.0.1")
}
go healthcheck.Run()
return &Proxier{
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
endpointsMap: make(map[proxy.ServicePortName][]string),
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
iptables: ipt,
@@ -287,7 +301,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Hunt for service and endpoint chains.
for chain := range existingNATChains {
chainString := string(chain)
if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") {
if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") {
writeLine(natChains, existingNATChains[chain]) // flush
writeLine(natRules, "-X", chainString) // delete
}
@@ -421,6 +435,18 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
info.sessionAffinityType = service.Spec.SessionAffinity
info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly()
if info.onlyNodeLocalEndpoints {
p := apiservice.GetServiceHealthCheckNodePort(service)
if p == 0 {
glog.Errorf("Service does not contain necessary annotation %v",
apiservice.AnnotationHealthCheckNodePort)
} else {
info.healthCheckNodePort = int(p)
// Turn on healthcheck responder to listen on the health check nodePort
healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort)
}
}
proxier.serviceMap[serviceName] = info
glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
@@ -428,22 +454,61 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
}
staleUDPServices := sets.NewString()
// Remove services missing from the update.
for name := range proxier.serviceMap {
// Remove service ports missing from the update.
for name, info := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Removing service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String())
if info.protocol == api.ProtocolUDP {
staleUDPServices.Insert(info.clusterIP.String())
}
delete(proxier.serviceMap, name)
if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
// Remove ServiceListener health check nodePorts from the health checker
// TODO - Stats
healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort)
}
}
}
proxier.syncProxyRules()
proxier.deleteServiceConnections(staleUDPServices.List())
}
// Generate a list of ip strings from the list of endpoint infos
func flattenEndpointsInfo(endPoints []*endpointsInfo) []string {
var endpointIPs []string
for _, ep := range endPoints {
endpointIPs = append(endpointIPs, ep.ip)
}
return endpointIPs
}
// Reconstruct the list of endpoint infos from the endpointIP list.
// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpoint} infos
// from the full []hostPortInfo slice.
//
// For example, if the input is
// endpoints = []hostPortInfo{ {host="1.1.1.1", port=22, localEndpoint=<bool>}, {host="2.2.2.2", port=80, localEndpoint=<bool>} }
// endpointIPs = []string{ "2.2.2.2:80" }
//
// then the output will be
//
// []endpointsInfo{ {ip: "2.2.2.2:80", localEndpoint: <bool>} }
func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo {
lookupSet := sets.NewString()
for _, ip := range endpointIPs {
lookupSet.Insert(ip)
}
var filteredEndpoints []*endpointsInfo
for _, hpp := range endPoints {
key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))
if lookupSet.Has(key) {
filteredEndpoints = append(filteredEndpoints, &endpointsInfo{ip: key, localEndpoint: hpp.localEndpoint})
}
}
return filteredEndpoints
}
// OnEndpointsUpdate takes in a slice of updated endpoints.
func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
start := time.Now()
@@ -457,6 +522,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
staleConnections := make(map[endpointServicePair]bool)
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
// Update endpoints for services.
for i := range allEndpoints {
@@ -464,59 +530,85 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
portsToEndpoints := map[string][]hostPortPair{}
portsToEndpoints := map[string][]hostPortInfo{}
for i := range svcEndpoints.Subsets {
ss := &svcEndpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)})
var isLocalEndpoint bool
if addr.NodeName != nil {
isLocalEndpoint = *addr.NodeName == proxier.hostname
isLocalEndpoint = featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && isLocalEndpoint
}
hostPortObject := hostPortInfo{
host: addr.IP,
port: int(port.Port),
localEndpoint: isLocalEndpoint,
}
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject)
}
}
}
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
svcPortToInfoMap[svcPort] = portsToEndpoints[portname]
curEndpoints := proxier.endpointsMap[svcPort]
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints)
// Flatten the list of current endpoint infos to just a list of ips as strings
curEndpointIPs := flattenEndpointsInfo(curEndpoints)
if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) {
removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints)
for _, ep := range removedEndpoints {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
}
glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
proxier.endpointsMap[svcPort] = newEndpoints
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
// Once the set operations using the list of ips are complete, build the list of endpoint infos
proxier.endpointsMap[svcPort] = proxier.buildEndpointInfoList(portsToEndpoints[portname], newEndpoints)
}
activeEndpoints[svcPort] = true
}
}
// Remove endpoints missing from the update.
for name := range proxier.endpointsMap {
if !activeEndpoints[name] {
for svcPort := range proxier.endpointsMap {
if !activeEndpoints[svcPort] {
// record endpoints of inactive services as stale connections
for _, ep := range proxier.endpointsMap[name] {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true
for _, ep := range proxier.endpointsMap[svcPort] {
staleConnections[endpointServicePair{endpoint: ep.ip, servicePortName: svcPort}] = true
}
glog.V(2).Infof("Removing endpoints for %q", name)
delete(proxier.endpointsMap, name)
glog.V(2).Infof("Removing endpoints for %q", svcPort)
delete(proxier.endpointsMap, svcPort)
}
}
proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort])
}
proxier.syncProxyRules()
proxier.deleteEndpointConnections(staleConnections)
}
// used in OnEndpointsUpdate
type hostPortPair struct {
host string
port int
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) {
// Use a set instead of a slice to provide deduplication
endpoints := sets.NewString()
for _, portInfo := range hostPorts {
if portInfo.localEndpoint {
// kube-proxy health check only needs local endpoints
endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthcheck.UpdateEndpoints(name, endpoints)
}
func isValidEndpoint(hpp *hostPortPair) bool {
// used in OnEndpointsUpdate
type hostPortInfo struct {
host string
port int
localEndpoint bool
}
func isValidEndpoint(hpp *hostPortInfo) bool {
return hpp.host != "" && hpp.port > 0
}
@@ -531,7 +623,7 @@ func slicesEquiv(lhs, rhs []string) bool {
return false
}
func flattenValidEndpoints(endpoints []hostPortPair) []string {
func flattenValidEndpoints(endpoints []hostPortInfo) []string {
// Convert Endpoint objects into strings for easier use later.
var result []string
for i := range endpoints {
@@ -569,6 +661,15 @@ func serviceFirewallChainName(s proxy.ServicePortName, protocol string) utilipta
return utiliptables.Chain("KUBE-FW-" + portProtoHash(s, protocol))
}
// serviceLBChainName takes the ServicePortName for a service and
// returns the associated iptables chain. This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do
// this because Iptables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read.
func serviceLBChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain {
return utiliptables.Chain("KUBE-XLB-" + portProtoHash(s, protocol))
}
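The KUBE-XLB- prefix plus a 16-character hash keeps the chain name inside iptables' 28-character limit (9 + 16 = 25). A standalone sketch of that computation (illustrative only, assuming portProtoHash follows the sha256-then-base32 scheme the comment above describes):

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
)

// xlbChainName mirrors the naming scheme described above; the real helper
// in this file is portProtoHash.
func xlbChainName(servicePort, protocol string) string {
	hash := sha256.Sum256([]byte(servicePort + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return "KUBE-XLB-" + encoded[:16] // 25 chars total, under the 28-char cap
}

func main() {
	fmt.Println(xlbChainName("default/my-service:http", "tcp"))
}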
// This is the same as servicePortChainName but with the endpoint included.
func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain {
hash := sha256.Sum256([]byte(s.String() + protocol + endpoint))
@@ -784,6 +885,18 @@ func (proxier *Proxier) syncProxyRules() {
}
activeNATChains[svcChain] = true
svcXlbChain := serviceLBChainName(svcName, protocol)
if svcInfo.onlyNodeLocalEndpoints {
// Only for services with the externalTraffic annotation set to OnlyLocal
// create the per-service LB chain, retaining counters if possible.
if lbChain, ok := existingNATChains[svcXlbChain]; ok {
writeLine(natChains, lbChain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain))
}
activeNATChains[svcXlbChain] = true
}
// Capture the clusterIP.
args := []string{
"-A", string(kubeServicesChain),
@@ -879,17 +992,24 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
}
// We have to SNAT packets from external IPs.
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// Each source match rule in the FW chain may jump to either the SVC or the XLB chain
chosenChain := svcXlbChain
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if !svcInfo.onlyNodeLocalEndpoints {
writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
chosenChain = svcChain
}
if len(svcInfo.loadBalancerSourceRanges) == 0 {
// allow all sources, so jump directly to KUBE-SVC chain
writeLine(natRules, append(args, "-j", string(svcChain))...)
// allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
writeLine(natRules, append(args, "-j", string(chosenChain))...)
} else {
// firewall filter based on each source range
allowFromNode := false
for _, src := range svcInfo.loadBalancerSourceRanges {
writeLine(natRules, append(args, "-s", src, "-j", string(svcChain))...)
writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...)
// ignore error because it has been validated
_, cidr, _ := net.ParseCIDR(src)
if cidr.Contains(proxier.nodeIP) {
@@ -900,7 +1020,7 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host.
if allowFromNode {
writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(svcChain))...)
writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
}
}
@@ -961,11 +1081,12 @@ func (proxier *Proxier) syncProxyRules() {
// Generate the per-endpoint chains. We do this in multiple passes so we
// can group rules together.
endpoints := make([]string, 0)
// These two slices parallel each other - keep in sync
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcName, protocol, ep)
endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip)
endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible.
@@ -1014,29 +1135,73 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", svcName.String(),
}
// Handle traffic that loops back to the originator with SNAT.
// Technically we only need to do this if the endpoint is on this
// host, but we don't have that information, so we just do this for
// all endpoints.
// TODO: if we grow logic to get this node's pod CIDR, we can use it.
writeLine(natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]),
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]),
"-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i])
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip)
writeLine(natRules, args...)
}
// The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.onlyNodeLocalEndpoints {
continue
}
// Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation
// TODO - This logic may be combinable with the block above that creates the svc balancer chain
localEndpoints := make([]*endpointsInfo, 0)
localEndpointChains := make([]utiliptables.Chain, 0)
for i := range endpointChains {
if endpoints[i].localEndpoint {
// These slices parallel each other; must be kept in sync
localEndpoints = append(localEndpoints, endpoints[i])
localEndpointChains = append(localEndpointChains, endpointChains[i])
}
}
numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints
args := []string{
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()),
"-j",
string(KubeMarkDropChain),
}
writeLine(natRules, args...)
} else {
// Set up probability filter rules only over local endpoints
for i, endpointChain := range localEndpointChains {
// Balancing rules in the per-service chain.
args := []string{
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()),
}
if i < (numLocalEndpoints - 1) {
// Each rule is a probabilistic match.
args = append(args,
"-m", "statistic",
"--mode", "random",
"--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i)))
}
// The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain))
writeLine(natRules, args...)
}
}
}
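The statistic-mode rules just above distribute traffic uniformly: with n local endpoint chains, rule i (0-based) matches with probability 1/(n-i) and the final rule matches unconditionally, so each chain ends up with an effective 1/n share. A small standalone check of that arithmetic (illustrative, not part of the change), here for n = 3:

package main

import "fmt"

func main() {
	const n = 3 // number of local endpoint chains
	remaining := 1.0
	for i := 0; i < n; i++ {
		p := 1.0 / float64(n-i) // per-rule match probability, as emitted above
		fmt.Printf("rule %d: matches %0.5f of remaining traffic, %0.5f overall\n",
			i, p, remaining*p)
		remaining *= 1 - p
	}
	// Prints an overall share of 0.33333 for each of the three chains.
}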
// Delete chains no longer in use.
for chain := range existingNATChains {
if !activeNATChains[chain] {
chainString := string(chain)
if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") {
if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
// Ignore chains that aren't ours.
continue
}

View File

@@ -28,12 +28,14 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/registry/endpoint"
"k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/service/portallocator"
"k8s.io/kubernetes/pkg/runtime"
featuregate "k8s.io/kubernetes/pkg/util/config"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/watch"
@@ -133,6 +135,35 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
}
}
if featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && shouldCheckOrAssignHealthCheckNodePort(service) {
var healthCheckNodePort int
var err error
if l, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok {
healthCheckNodePort, err = strconv.Atoi(l)
if err != nil || healthCheckNodePort <= 0 {
return nil, errors.NewInternalError(fmt.Errorf("Failed to parse annotation %v: %v", apiservice.AnnotationHealthCheckNodePort, err))
}
}
if healthCheckNodePort > 0 {
// If the request has a health check nodePort in mind, attempt to reserve it
err := nodePortOp.Allocate(int(healthCheckNodePort))
if err != nil {
return nil, errors.NewInternalError(fmt.Errorf("Failed to allocate requested HealthCheck nodePort %v: %v", healthCheckNodePort, err))
}
} else {
// If the request has no health check nodePort specified, allocate any
healthCheckNodePort, err = nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
// Insert the newly allocated health check port as an annotation (plan of record for Alpha)
service.Annotations[apiservice.AnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort)
}
}
out, err := rs.registry.CreateService(ctx, service)
if err != nil {
err = rest.CheckGeneratedNameError(Strategy, err, service)
@@ -398,3 +429,12 @@ func shouldAssignNodePorts(service *api.Service) bool {
return false
}
}
func shouldCheckOrAssignHealthCheckNodePort(service *api.Service) bool {
if service.Spec.Type == api.ServiceTypeLoadBalancer {
// True if Service-type == LoadBalancer AND annotation AnnotationExternalTraffic present
return (featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && apiservice.NeedsHealthCheck(service))
}
glog.V(4).Infof("Service type: %v does not need health check node port", service.Spec.Type)
return false
}

View File

@@ -24,14 +24,19 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/registry/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/service/portallocator"
utilnet "k8s.io/kubernetes/pkg/util/net"
featuregate "k8s.io/kubernetes/pkg/util/config"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
func init() {
featuregate.DefaultFeatureGate.Set("AllowExtTrafficLocalEndpoints=true")
}
// TODO(wojtek-t): Cleanup this file.
// It is now testing mostly the same things as other resources but
// in a completely different way. We should unify it.
@@ -821,3 +826,179 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) {
t.Errorf("Expected 'Service.Namespace does not match the provided context' error, got '%s'", err.Error())
}
}
// Validate allocation of a nodePort when the externalTraffic=OnlyLocal annotation is set
// and type is LoadBalancer
func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortAllocation(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _ := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp",
Annotations: map[string]string{
service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeLoadBalancer,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
}
created_svc, err := storage.Create(ctx, svc)
if created_svc == nil || err != nil {
t.Errorf("Unexpected failure creating service %v", err)
}
created_service := created_svc.(*api.Service)
if !service.NeedsHealthCheck(created_service) {
t.Errorf("Unexpected missing annotation %s", service.AnnotationExternalTraffic)
}
port := service.GetServiceHealthCheckNodePort(created_service)
if port == 0 {
t.Errorf("Failed to allocate and create the health check node port annotation %s", service.AnnotationHealthCheckNodePort)
}
}
// Validate using the user specified nodePort when the externalTraffic=OnlyLocal annotation is set
// and type is LoadBalancer
func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortUserAllocation(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _ := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp",
Annotations: map[string]string{
service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
service.AnnotationHealthCheckNodePort: "30200",
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeLoadBalancer,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
}
created_svc, err := storage.Create(ctx, svc)
if created_svc == nil || err != nil {
t.Errorf("Unexpected failure creating service %v", err)
}
created_service := created_svc.(*api.Service)
if !service.NeedsHealthCheck(created_service) {
t.Errorf("Unexpected missing annotation %s", service.AnnotationExternalTraffic)
}
port := service.GetServiceHealthCheckNodePort(created_service)
if port == 0 {
t.Errorf("Failed to allocate and create the health check node port annotation %s", service.AnnotationHealthCheckNodePort)
}
if port != 30200 {
t.Errorf("Failed to allocate requested nodePort expected 30200, got %d", port)
}
}
// Validate that the service creation fails when the requested port number is -1
func TestServiceRegistryExternalTrafficAnnotationNegative(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _ := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp",
Annotations: map[string]string{
service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
service.AnnotationHealthCheckNodePort: "-1",
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeLoadBalancer,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
}
created_svc, err := storage.Create(ctx, svc)
if created_svc == nil || err != nil {
return
}
t.Errorf("Unexpected creation of service with invalid healthCheckNodePort specified")
}
// Validate that the health check nodePort is not allocated when the externalTraffic annotation is not "OnlyLocal"
func TestServiceRegistryExternalTrafficAnnotationGlobal(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _ := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp",
Annotations: map[string]string{
service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal,
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeLoadBalancer,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
}
created_svc, err := storage.Create(ctx, svc)
if created_svc == nil || err != nil {
t.Errorf("Unexpected failure creating service %v", err)
}
created_service := created_svc.(*api.Service)
// Make sure the service does not have the annotation
if service.NeedsHealthCheck(created_service) {
t.Errorf("Unexpected value for annotation %s", service.AnnotationExternalTraffic)
}
// Make sure the service does not have the health check node port allocated
port := service.GetServiceHealthCheckNodePort(created_service)
if port != 0 {
t.Errorf("Unexpected allocation of health check node port annotation %s", service.AnnotationHealthCheckNodePort)
}
}
// Validate that the health check nodePort is not allocated when service type is ClusterIP
func TestServiceRegistryExternalTrafficAnnotationClusterIP(t *testing.T) {
ctx := api.NewDefaultContext()
storage, _ := NewTestREST(t, nil)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp",
Annotations: map[string]string{
service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal,
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
}
created_svc, err := storage.Create(ctx, svc)
if created_svc == nil || err != nil {
t.Errorf("Unexpected failure creating service %v", err)
}
created_service := created_svc.(*api.Service)
// Make sure that ClusterIP services do not have the health check node port allocated
port := service.GetServiceHealthCheckNodePort(created_service)
if port != 0 {
t.Errorf("Unexpected allocation of health check node port annotation %s", service.AnnotationHealthCheckNodePort)
}
}

View File

@@ -38,14 +38,16 @@ const (
// specification of gates. Examples:
// AllAlpha=false,NewFeature=true will result in newFeature=true
// AllAlpha=true,NewFeature=false will result in newFeature=false
allAlphaGate = "AllAlpha"
allAlphaGate = "AllAlpha"
externalTrafficLocalOnly = "AllowExtTrafficLocalEndpoints"
)
var (
// Default values for recorded features. Every new feature gate should be
// represented here.
knownFeatures = map[string]featureSpec{
allAlphaGate: {false, alpha},
allAlphaGate: {false, alpha},
externalTrafficLocalOnly: {false, alpha},
}
// Special handling for a few gates.
@@ -85,6 +87,10 @@ type FeatureGate interface {
// // alpha: v1.4
// MyFeature() bool
// owner: @girishkalele
// alpha: v1.4
ExternalTrafficLocalOnly() bool
// TODO: Define accessors for each non-API alpha feature.
}
@@ -154,6 +160,11 @@ func (f *featureGate) Type() string {
return "mapStringBool"
}
// ExternalTrafficLocalOnly returns the value of the AllowExtTrafficLocalEndpoints feature gate.
func (f *featureGate) ExternalTrafficLocalOnly() bool {
return f.lookup(externalTrafficLocalOnly)
}
func (f *featureGate) lookup(key string) bool {
defaultValue := f.known[key].enabled
if f.enabled != nil {

View File

@@ -14,7 +14,7 @@
.PHONY: all netexec image push clean
TAG = 1.5
TAG = 1.6
PREFIX = gcr.io/google_containers

View File

@@ -80,6 +80,7 @@ func main() {
func startHTTPServer(httpPort int) {
http.HandleFunc("/", rootHandler)
http.HandleFunc("/clientip", clientIpHandler)
http.HandleFunc("/echo", echoHandler)
http.HandleFunc("/exit", exitHandler)
http.HandleFunc("/hostname", hostnameHandler)
@@ -102,6 +103,11 @@ func echoHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s", r.FormValue("msg"))
}
func clientIpHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("GET /clientip")
fmt.Fprintf(w, r.RemoteAddr)
}
func exitHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("GET /exit?code=%s", r.FormValue("code"))
code, err := strconv.Atoi(r.FormValue("code"))
@@ -345,7 +351,7 @@ func hostNameHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, getHostName())
}
// udp server only supports the hostName command.
// udp server supports the hostname, echo and clientip commands.
func startUDPServer(udpPort int) {
serverAddress, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", udpPort))
assertNoError(err)
@@ -363,8 +369,8 @@ func startUDPServer(udpPort int) {
for {
n, clientAddress, err := serverConn.ReadFromUDP(buf)
assertNoError(err)
receivedText := strings.TrimSpace(string(buf[0:n]))
if receivedText == "hostName" || receivedText == "hostname" {
receivedText := strings.ToLower(strings.TrimSpace(string(buf[0:n])))
if receivedText == "hostname" {
log.Println("Sending udp hostName response")
_, err = serverConn.WriteToUDP([]byte(getHostName()), clientAddress)
assertNoError(err)
@@ -377,6 +383,10 @@ func startUDPServer(udpPort int) {
log.Printf("Echoing %v\n", resp)
_, err = serverConn.WriteToUDP([]byte(resp), clientAddress)
assertNoError(err)
} else if receivedText == "clientip" {
log.Printf("Sending back clientip to %s", clientAddress.String())
_, err = serverConn.WriteToUDP([]byte(clientAddress.String()), clientAddress)
assertNoError(err)
} else if len(receivedText) > 0 {
log.Printf("Unknown udp command received: %v\n", receivedText)
}
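The UDP commands above (hostname, echo, clientip) are what the source-IP e2e tests drive against netexec. A minimal, illustrative client for the new clientip command (not part of this commit; the server address is a placeholder, since the tests reach netexec through a service nodePort or LB VIP):

package main

import (
	"fmt"
	"log"
	"net"
	"time"
)

func main() {
	// Placeholder address for the netexec UDP listener.
	conn, err := net.Dial("udp", "127.0.0.1:8081")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Ask netexec to report the client address it observed.
	if _, err := conn.Write([]byte("clientip")); err != nil {
		log.Fatal(err)
	}
	conn.SetReadDeadline(time.Now().Add(3 * time.Second))
	buf := make([]byte, 256)
	n, err := conn.Read(buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("server saw client address:", string(buf[:n]))
}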