From b82c028f77d85ac0be39b510ccdbc2783e43fdc7 Mon Sep 17 00:00:00 2001 From: Girish Kalele Date: Mon, 15 Aug 2016 11:22:44 -0700 Subject: [PATCH] GCE Cloud provider changes for ESIPP Add feature gate (ExternalTrafficLocalOnly) for alpha feature --- pkg/api/service/annotations.go | 61 ++++++ pkg/cloudprovider/providers/gce/gce.go | 115 ++++++++++- pkg/proxy/iptables/proxier.go | 257 ++++++++++++++++++++----- pkg/registry/service/rest.go | 40 ++++ pkg/registry/service/rest_test.go | 185 +++++++++++++++++- pkg/util/config/feature_gate.go | 15 +- test/images/netexec/Makefile | 2 +- test/images/netexec/netexec.go | 16 +- 8 files changed, 629 insertions(+), 62 deletions(-) diff --git a/pkg/api/service/annotations.go b/pkg/api/service/annotations.go index ee275d3c1e..bd13cc5166 100644 --- a/pkg/api/service/annotations.go +++ b/pkg/api/service/annotations.go @@ -16,6 +16,13 @@ limitations under the License. package service +import ( + "strconv" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" +) + const ( // AnnotationLoadBalancerSourceRangesKey is the key of the annotation on a service to set allowed ingress ranges on their LoadBalancers // @@ -25,4 +32,58 @@ const ( // // Not all cloud providers support this annotation, though AWS & GCE do. AnnotationLoadBalancerSourceRangesKey = "service.beta.kubernetes.io/load-balancer-source-ranges" + + // AnnotationExternalTraffic An annotation that denotes if this Service desires to route external traffic to local + // endpoints only. This preserves Source IP and avoids a second hop. + AnnotationExternalTraffic = "service.alpha.kubernetes.io/external-traffic" + // AnnotationValueExternalTrafficLocal Value of annotation to specify local endpoints behaviour + AnnotationValueExternalTrafficLocal = "OnlyLocal" + // AnnotationValueExternalTrafficGlobal Value of annotation to specify global (legacy) behaviour + AnnotationValueExternalTrafficGlobal = "Global" + // AnnotationHealthCheckNodePort Annotation specifying the healthcheck nodePort for the service + // If not specified, annotation is created by the service api backend with the allocated nodePort + // Will use user-specified nodePort value if specified by the client + AnnotationHealthCheckNodePort = "service.alpha.kubernetes.io/healthcheck-nodeport" ) + +// NeedsHealthCheck Check service for health check annotations +func NeedsHealthCheck(service *api.Service) bool { + if l, ok := service.Annotations[AnnotationExternalTraffic]; ok { + if l == AnnotationValueExternalTrafficLocal { + return true + } else if l == AnnotationValueExternalTrafficGlobal { + return false + } else { + glog.Errorf("Invalid value for annotation %v", AnnotationExternalTraffic) + return false + } + } + return false +} + +// GetServiceHealthCheckNodePort Return health check node port annotation for service, if one exists +func GetServiceHealthCheckNodePort(service *api.Service) int32 { + if NeedsHealthCheck(service) { + if l, ok := service.Annotations[AnnotationHealthCheckNodePort]; ok { + p, err := strconv.Atoi(l) + if err != nil { + glog.Errorf("Failed to parse annotation %v: %v", AnnotationHealthCheckNodePort, err) + return 0 + } + return int32(p) + } + } + return 0 +} + +// GetServiceHealthCheckPathPort Return the path and nodePort programmed into the Cloud LB Health Check +func GetServiceHealthCheckPathPort(service *api.Service) (string, int32) { + if !NeedsHealthCheck(service) { + return "", 0 + } + port := GetServiceHealthCheckNodePort(service) + if port == 0 { + return "", 0 + } + return "/healthz", port +} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index fc0eaf4178..be4b5bf04e 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -31,7 +31,7 @@ import ( "gopkg.in/gcfg.v1" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/service" + apiservice "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/types" @@ -70,8 +70,16 @@ const ( // were to continuously return a nextPageToken. maxPages = 25 - // Target Pool creation is limited to 200 instances. maxTargetPoolCreateInstances = 200 + + // HTTP Load Balancer parameters + // Configure 2 second period for external health checks. + gceHcCheckIntervalSeconds = int64(2) + gceHcTimeoutSeconds = int64(1) + // Start sending requests as soon as a pod is found on the node. + gceHcHealthyThreshold = int64(1) + // Defaults to 5 * 2 = 10 seconds before the LB will steer traffic away + gceHcUnhealthyThreshold = int64(5) ) // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. @@ -665,7 +673,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv // is because the forwarding rule is used as the indicator that the load // balancer is fully created - it's what getLoadBalancer checks for. // Check if user specified the allow source range - sourceRanges, err := service.GetLoadBalancerSourceRanges(apiService) + sourceRanges, err := apiservice.GetLoadBalancerSourceRanges(apiService) if err != nil { return nil, err } @@ -720,7 +728,18 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName) } if tpExists && tpNeedsUpdate { - if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil { + // Generate the list of health checks for this target pool to pass to deleteTargetPool + var hc *compute.HttpHealthCheck + if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { + var err error + hc, err = gce.GetHttpHealthCheck(loadBalancerName) + if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err) + } + } + + // Pass healthchecks to deleteTargetPool to cleanup health checks prior to cleaning up the target pool itself. + if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil { return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err) } glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName) @@ -734,7 +753,18 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv createInstances = createInstances[:maxTargetPoolCreateInstances] } - if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType); err != nil { + // Create health checks for this target pool to pass to createTargetPool for health check links + var hc *compute.HttpHealthCheck + if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { + glog.Infof("service %v needs health checks on :%d/%s)", apiService.Name, healthCheckNodePort, path) + var err error + hc, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort) + if err != nil { + return nil, fmt.Errorf("Failed to create health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err) + } + } + // Pass healthchecks to createTargetPool which needs them as health check links in the target pool + if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hc); err != nil { return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) } if len(hosts) <= maxTargetPoolCreateInstances { @@ -767,9 +797,53 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: ipAddress}} + return status, nil } +func makeHealthCheckDescription(serviceName string) string { + return fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName) +} + +func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) { + newHC := &compute.HttpHealthCheck{ + Name: name, + Port: int64(port), + RequestPath: path, + Host: "", + Description: makeHealthCheckDescription(name), + CheckIntervalSec: gceHcCheckIntervalSeconds, + TimeoutSec: gceHcTimeoutSeconds, + HealthyThreshold: gceHcHealthyThreshold, + UnhealthyThreshold: gceHcUnhealthyThreshold, + } + + hc, err = gce.GetHttpHealthCheck(name) + if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path) + if err = gce.CreateHttpHealthCheck(newHC); err != nil { + return nil, err + } + hc, err = gce.GetHttpHealthCheck(name) + if err != nil { + glog.Errorf("Failed to get http health check %v", err) + return nil, err + } + return hc, nil + } + // Validate health check fields + drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name) + drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds + drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold + if drift { + glog.Infof("Health check %v exists but parameters have drifted - updating", name) + if err := gce.UpdateHttpHealthCheck(newHC); err != nil { + return nil, err + } + } + return hc, nil +} + // Passing nil for requested IP is perfectly fine - it just means that no specific // IP is being requested. // Returns whether the forwarding rule exists, whether it needs to be updated, @@ -962,16 +1036,24 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s return nil } -func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error { +func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity, hc *compute.HttpHealthCheck) error { var instances []string for _, host := range hosts { instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name)) } + hcLinks := []string{} + if hc != nil { + hcLinks = append(hcLinks, hc.SelfLink) + } + if len(hcLinks) > 0 { + glog.Infof("Creating targetpool %v with healthchecking", name) + } pool := &compute.TargetPool{ Name: name, Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName), Instances: instances, SessionAffinity: translateAffinityType(affinityType), + HealthChecks: hcLinks, } op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do() if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { @@ -1279,6 +1361,16 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *api. glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName, gce.region) + var hc *compute.HttpHealthCheck + if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" { + var err error + hc, err = gce.GetHttpHealthCheck(loadBalancerName) + if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { + glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err) + return err + } + } + errs := utilerrors.AggregateGoroutines( func() error { return gce.deleteFirewall(loadBalancerName, gce.region) }, // Even though we don't hold on to static IPs for load balancers, it's @@ -1291,7 +1383,7 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *api. if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { return err } - if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil { + if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil { return err } return nil @@ -1319,7 +1411,14 @@ func (gce *GCECloud) deleteForwardingRule(name, region string) error { return nil } -func (gce *GCECloud) deleteTargetPool(name, region string) error { +func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error { + if hc != nil { + glog.Infof("Deleting health check %v", hc.Name) + if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil { + glog.Warningf("Failed to delete health check %v: %v", hc, err) + return err + } + } op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 04966da9f1..801ae7b129 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -36,8 +36,11 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + apiservice "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/types" + featuregate "k8s.io/kubernetes/pkg/util/config" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" "k8s.io/kubernetes/pkg/util/sets" @@ -137,6 +140,14 @@ type serviceInfo struct { stickyMaxAgeSeconds int externalIPs []string loadBalancerSourceRanges []string + onlyNodeLocalEndpoints bool + healthCheckNodePort int +} + +// internal struct for endpoints information +type endpointsInfo struct { + ip string + localEndpoint bool } // returns a new serviceInfo struct @@ -152,7 +163,7 @@ func newServiceInfo(service proxy.ServicePortName) *serviceInfo { type Proxier struct { mu sync.Mutex // protects the following fields serviceMap map[proxy.ServicePortName]*serviceInfo - endpointsMap map[proxy.ServicePortName][]string + endpointsMap map[proxy.ServicePortName][]*endpointsInfo portsMap map[localPort]closeable haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event @@ -215,9 +226,12 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod glog.Warningf("invalid nodeIP, initialize kube-proxy with 127.0.0.1 as nodeIP") nodeIP = net.ParseIP("127.0.0.1") } + + go healthcheck.Run() + return &Proxier{ serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - endpointsMap: make(map[proxy.ServicePortName][]string), + endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), portsMap: make(map[localPort]closeable), syncPeriod: syncPeriod, iptables: ipt, @@ -287,7 +301,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Hunt for service and endpoint chains. for chain := range existingNATChains { chainString := string(chain) - if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") { + if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { writeLine(natChains, existingNATChains[chain]) // flush writeLine(natRules, "-X", chainString) // delete } @@ -421,6 +435,18 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { info.loadBalancerStatus = *api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer) info.sessionAffinityType = service.Spec.SessionAffinity info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges + info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() + if info.onlyNodeLocalEndpoints { + p := apiservice.GetServiceHealthCheckNodePort(service) + if p == 0 { + glog.Errorf("Service does not contain necessary annotation %v", + apiservice.AnnotationHealthCheckNodePort) + } else { + info.healthCheckNodePort = int(p) + // Turn on healthcheck responder to listen on the health check nodePort + healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort) + } + } proxier.serviceMap[serviceName] = info glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info)) @@ -428,22 +454,61 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { } staleUDPServices := sets.NewString() - // Remove services missing from the update. - for name := range proxier.serviceMap { + // Remove serviceports missing from the update. + for name, info := range proxier.serviceMap { if !activeServices[name] { glog.V(1).Infof("Removing service %q", name) - if proxier.serviceMap[name].protocol == api.ProtocolUDP { - staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String()) + if info.protocol == api.ProtocolUDP { + staleUDPServices.Insert(info.clusterIP.String()) } delete(proxier.serviceMap, name) + if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { + // Remove ServiceListener health check nodePorts from the health checker + // TODO - Stats + healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort) + } } } - proxier.syncProxyRules() proxier.deleteServiceConnections(staleUDPServices.List()) } +// Generate a list of ip strings from the list of endpoint infos +func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { + var endpointIPs []string + for _, ep := range endPoints { + endpointIPs = append(endpointIPs, ep.ip) + } + return endpointIPs +} + +// Reconstruct the list of endpoint infos from the endpointIP list +// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos +// from the full []hostPortInfo slice. +// +// For e.g. if input is +// endpoints = []hostPortInfo{ {host="1.1.1.1", port=22, localEndpointOnly=}, {host="2.2.2.2", port=80, localEndpointOnly=} } +// endpointIPs = []string{ "2.2.2.2:80" } +// +// then output will be +// +// []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=} } +func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { + lookupSet := sets.NewString() + for _, ip := range endpointIPs { + lookupSet.Insert(ip) + } + var filteredEndpoints []*endpointsInfo + for _, hpp := range endPoints { + key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)) + if lookupSet.Has(key) { + filteredEndpoints = append(filteredEndpoints, &endpointsInfo{ip: key, localEndpoint: hpp.localEndpoint}) + } + } + return filteredEndpoints +} + // OnEndpointsUpdate takes in a slice of updated endpoints. func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { start := time.Now() @@ -457,6 +522,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set staleConnections := make(map[endpointServicePair]bool) + svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) // Update endpoints for services. for i := range allEndpoints { @@ -464,59 +530,85 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortPair{} + portsToEndpoints := map[string][]hostPortInfo{} for i := range svcEndpoints.Subsets { ss := &svcEndpoints.Subsets[i] for i := range ss.Ports { port := &ss.Ports[i] for i := range ss.Addresses { addr := &ss.Addresses[i] - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) + var isLocalEndpoint bool + if addr.NodeName != nil { + isLocalEndpoint = *addr.NodeName == proxier.hostname + isLocalEndpoint = featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && isLocalEndpoint + } + hostPortObject := hostPortInfo{ + host: addr.IP, + port: int(port.Port), + localEndpoint: isLocalEndpoint, + } + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) } } } - for portname := range portsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} + svcPortToInfoMap[svcPort] = portsToEndpoints[portname] curEndpoints := proxier.endpointsMap[svcPort] newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - - if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { - removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints) + // Flatten the list of current endpoint infos to just a list of ips as strings + curEndpointIPs := flattenEndpointsInfo(curEndpoints) + if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) { + removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints) for _, ep := range removedEndpoints { staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true } - glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - proxier.endpointsMap[svcPort] = newEndpoints + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) + // Once the set operations using the list of ips are complete, build the list of endpoint infos + proxier.endpointsMap[svcPort] = proxier.buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) } activeEndpoints[svcPort] = true } } - // Remove endpoints missing from the update. - for name := range proxier.endpointsMap { - if !activeEndpoints[name] { + for svcPort := range proxier.endpointsMap { + if !activeEndpoints[svcPort] { // record endpoints of unactive service to stale connections - for _, ep := range proxier.endpointsMap[name] { - staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true + for _, ep := range proxier.endpointsMap[svcPort] { + staleConnections[endpointServicePair{endpoint: ep.ip, servicePortName: svcPort}] = true } - glog.V(2).Infof("Removing endpoints for %q", name) - delete(proxier.endpointsMap, name) + glog.V(2).Infof("Removing endpoints for %q", svcPort) + delete(proxier.endpointsMap, svcPort) } - } + proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) + } proxier.syncProxyRules() proxier.deleteEndpointConnections(staleConnections) } -// used in OnEndpointsUpdate -type hostPortPair struct { - host string - port int +// updateHealthCheckEntries - send the new set of local endpoints to the health checker +func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { + // Use a set instead of a slice to provide deduplication + endpoints := sets.NewString() + for _, portInfo := range hostPorts { + if portInfo.localEndpoint { + // kube-proxy health check only needs local endpoints + endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) + } + } + healthcheck.UpdateEndpoints(name, endpoints) } -func isValidEndpoint(hpp *hostPortPair) bool { +// used in OnEndpointsUpdate +type hostPortInfo struct { + host string + port int + localEndpoint bool +} + +func isValidEndpoint(hpp *hostPortInfo) bool { return hpp.host != "" && hpp.port > 0 } @@ -531,7 +623,7 @@ func slicesEquiv(lhs, rhs []string) bool { return false } -func flattenValidEndpoints(endpoints []hostPortPair) []string { +func flattenValidEndpoints(endpoints []hostPortInfo) []string { // Convert Endpoint objects into strings for easier use later. var result []string for i := range endpoints { @@ -569,6 +661,15 @@ func serviceFirewallChainName(s proxy.ServicePortName, protocol string) utilipta return utiliptables.Chain("KUBE-FW-" + portProtoHash(s, protocol)) } +// serviceLBPortChainName takes the ServicePortName for a service and +// returns the associated iptables chain. This is computed by hashing (sha256) +// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do +// this because Iptables Chain Names must be <= 28 chars long, and the longer +// they are the harder they are to read. +func serviceLBChainName(s proxy.ServicePortName, protocol string) utiliptables.Chain { + return utiliptables.Chain("KUBE-XLB-" + portProtoHash(s, protocol)) +} + // This is the same as servicePortChainName but with the endpoint included. func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endpoint string) utiliptables.Chain { hash := sha256.Sum256([]byte(s.String() + protocol + endpoint)) @@ -784,6 +885,18 @@ func (proxier *Proxier) syncProxyRules() { } activeNATChains[svcChain] = true + svcXlbChain := serviceLBChainName(svcName, protocol) + if svcInfo.onlyNodeLocalEndpoints { + // Only for services with the externalTraffic annotation set to OnlyLocal + // create the per-service LB chain, retaining counters if possible. + if lbChain, ok := existingNATChains[svcXlbChain]; ok { + writeLine(natChains, lbChain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain)) + } + activeNATChains[svcXlbChain] = true + } + // Capture the clusterIP. args := []string{ "-A", string(kubeServicesChain), @@ -879,17 +992,24 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(fwChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), } - // We have to SNAT packets from external IPs. - writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) + + // Each source match rule in the FW chain may jump to either the SVC or the XLB chain + chosenChain := svcXlbChain + // If we are proxying globally, we need to masquerade in case we cross nodes. + // If we are proxying only locally, we can retain the source IP. + if !svcInfo.onlyNodeLocalEndpoints { + writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) + chosenChain = svcChain + } if len(svcInfo.loadBalancerSourceRanges) == 0 { - // allow all sources, so jump directly to KUBE-SVC chain - writeLine(natRules, append(args, "-j", string(svcChain))...) + // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain + writeLine(natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.loadBalancerSourceRanges { - writeLine(natRules, append(args, "-s", src, "-j", string(svcChain))...) + writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { @@ -900,7 +1020,7 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(svcChain))...) + writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) } } @@ -961,11 +1081,12 @@ func (proxier *Proxier) syncProxyRules() { // Generate the per-endpoint chains. We do this in multiple passes so we // can group rules together. - endpoints := make([]string, 0) + // These two slices parallel each other - keep in sync + endpoints := make([]*endpointsInfo, 0) endpointChains := make([]utiliptables.Chain, 0) for _, ep := range proxier.endpointsMap[svcName] { endpoints = append(endpoints, ep) - endpointChain := servicePortEndpointChainName(svcName, protocol, ep) + endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. @@ -1014,29 +1135,73 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", svcName.String(), } // Handle traffic that loops back to the originator with SNAT. - // Technically we only need to do this if the endpoint is on this - // host, but we don't have that information, so we just do this for - // all endpoints. - // TODO: if we grow logic to get this node's pod CIDR, we can use it. writeLine(natRules, append(args, - "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i], ":")[0]), + "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]), "-j", string(KubeMarkMasqChain))...) - // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. - args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i]) + args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip) writeLine(natRules, args...) } + + // The logic below this applies only if this service is marked as OnlyLocal + if !svcInfo.onlyNodeLocalEndpoints { + continue + } + + // Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation + // TODO - This logic may be combinable with the block above that creates the svc balancer chain + localEndpoints := make([]*endpointsInfo, 0) + localEndpointChains := make([]utiliptables.Chain, 0) + for i := range endpointChains { + if endpoints[i].localEndpoint { + // These slices parallel each other; must be kept in sync + localEndpoints = append(localEndpoints, endpoints[i]) + localEndpointChains = append(localEndpointChains, endpointChains[i]) + } + } + numLocalEndpoints := len(localEndpointChains) + if numLocalEndpoints == 0 { + // Blackhole all traffic since there are no local endpoints + args := []string{ + "-A", string(svcXlbChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()), + "-j", + string(KubeMarkDropChain), + } + writeLine(natRules, args...) + } else { + // Setup probability filter rules only over local endpoints + for i, endpointChain := range localEndpointChains { + // Balancing rules in the per-service chain. + args := []string{ + "-A", string(svcXlbChain), + "-m", "comment", "--comment", + fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()), + } + if i < (numLocalEndpoints - 1) { + // Each rule is a probabilistic match. + args = append(args, + "-m", "statistic", + "--mode", "random", + "--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i))) + } + // The final (or only if n == 1) rule is a guaranteed match. + args = append(args, "-j", string(endpointChain)) + writeLine(natRules, args...) + } + } } // Delete chains no longer in use. for chain := range existingNATChains { if !activeNATChains[chain] { chainString := string(chain) - if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") { + if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") { // Ignore chains that aren't ours. continue } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 77da71bbb0..4f00faf769 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -28,12 +28,14 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/rest" + apiservice "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/registry/endpoint" "k8s.io/kubernetes/pkg/registry/service/ipallocator" "k8s.io/kubernetes/pkg/registry/service/portallocator" "k8s.io/kubernetes/pkg/runtime" + featuregate "k8s.io/kubernetes/pkg/util/config" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/validation/field" "k8s.io/kubernetes/pkg/watch" @@ -133,6 +135,35 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err } } + if featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && shouldCheckOrAssignHealthCheckNodePort(service) { + var healthCheckNodePort int + var err error + if l, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok { + healthCheckNodePort, err = strconv.Atoi(l) + if err != nil || healthCheckNodePort <= 0 { + return nil, errors.NewInternalError(fmt.Errorf("Failed to parse annotation %v: %v", apiservice.AnnotationHealthCheckNodePort, err)) + } + } + if healthCheckNodePort > 0 { + // If the request has a health check nodePort in mind, attempt to reserve it + err := nodePortOp.Allocate(int(healthCheckNodePort)) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("Failed to allocate requested HealthCheck nodePort %v: %v", healthCheckNodePort, err)) + } + } else { + // If the request has no health check nodePort specified, allocate any + healthCheckNodePort, err = nodePortOp.AllocateNext() + if err != nil { + // TODO: what error should be returned here? It's not a + // field-level validation failure (the field is valid), and it's + // not really an internal error. + return nil, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err)) + } + // Insert the newly allocated health check port as an annotation (plan of record for Alpha) + service.Annotations[apiservice.AnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort) + } + } + out, err := rs.registry.CreateService(ctx, service) if err != nil { err = rest.CheckGeneratedNameError(Strategy, err, service) @@ -398,3 +429,12 @@ func shouldAssignNodePorts(service *api.Service) bool { return false } } + +func shouldCheckOrAssignHealthCheckNodePort(service *api.Service) bool { + if service.Spec.Type == api.ServiceTypeLoadBalancer { + // True if Service-type == LoadBalancer AND annotation AnnotationExternalTraffic present + return (featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && apiservice.NeedsHealthCheck(service)) + } + glog.V(4).Infof("Service type: %v does not need health check node port", service.Spec.Type) + return false +} diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index fec181b44a..e56e8bc5ef 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -24,14 +24,19 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/registry/service/ipallocator" "k8s.io/kubernetes/pkg/registry/service/portallocator" - utilnet "k8s.io/kubernetes/pkg/util/net" - + featuregate "k8s.io/kubernetes/pkg/util/config" "k8s.io/kubernetes/pkg/util/intstr" + utilnet "k8s.io/kubernetes/pkg/util/net" ) +func init() { + featuregate.DefaultFeatureGate.Set("AllowExtTrafficLocalEndpoints=true") +} + // TODO(wojtek-t): Cleanup this file. // It is now testing mostly the same things as other resources but // in a completely different way. We should unify it. @@ -821,3 +826,179 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { t.Errorf("Expected 'Service.Namespace does not match the provided context' error, got '%s'", err.Error()) } } + +// Validate allocation of a nodePort when the externalTraffic=OnlyLocal annotation is set +// and type is LoadBalancer +func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortAllocation(t *testing.T) { + ctx := api.NewDefaultContext() + storage, _ := NewTestREST(t, nil) + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", + Annotations: map[string]string{ + service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + }, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeLoadBalancer, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + } + created_svc, err := storage.Create(ctx, svc) + if created_svc == nil || err != nil { + t.Errorf("Unexpected failure creating service %v", err) + } + created_service := created_svc.(*api.Service) + if !service.NeedsHealthCheck(created_service) { + t.Errorf("Unexpected missing annotation %s", service.AnnotationExternalTraffic) + } + port := service.GetServiceHealthCheckNodePort(created_service) + if port == 0 { + t.Errorf("Failed to allocate and create the health check node port annotation %s", service.AnnotationHealthCheckNodePort) + } + +} + +// Validate using the user specified nodePort when the externalTraffic=OnlyLocal annotation is set +// and type is LoadBalancer +func TestServiceRegistryExternalTrafficAnnotationHealthCheckNodePortUserAllocation(t *testing.T) { + ctx := api.NewDefaultContext() + storage, _ := NewTestREST(t, nil) + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", + Annotations: map[string]string{ + service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + service.AnnotationHealthCheckNodePort: "30200", + }, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeLoadBalancer, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + } + created_svc, err := storage.Create(ctx, svc) + if created_svc == nil || err != nil { + t.Errorf("Unexpected failure creating service %v", err) + } + created_service := created_svc.(*api.Service) + if !service.NeedsHealthCheck(created_service) { + t.Errorf("Unexpected missing annotation %s", service.AnnotationExternalTraffic) + } + port := service.GetServiceHealthCheckNodePort(created_service) + if port == 0 { + t.Errorf("Failed to allocate and create the health check node port annotation %s", service.AnnotationHealthCheckNodePort) + } + if port != 30200 { + t.Errorf("Failed to allocate requested nodePort expected 30200, got %d", port) + } +} + +// Validate that the service creation fails when the requested port number is -1 +func TestServiceRegistryExternalTrafficAnnotationNegative(t *testing.T) { + ctx := api.NewDefaultContext() + storage, _ := NewTestREST(t, nil) + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", + Annotations: map[string]string{ + service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, + service.AnnotationHealthCheckNodePort: "-1", + }, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeLoadBalancer, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + } + created_svc, err := storage.Create(ctx, svc) + if created_svc == nil || err != nil { + return + } + t.Errorf("Unexpected creation of service with invalid healthCheckNodePort specified") +} + +// Validate that the health check nodePort is not allocated when the externalTraffic annotation is !"OnlyLocal" +func TestServiceRegistryExternalTrafficAnnotationGlobal(t *testing.T) { + ctx := api.NewDefaultContext() + storage, _ := NewTestREST(t, nil) + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", + Annotations: map[string]string{ + service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal, + }, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeLoadBalancer, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + } + created_svc, err := storage.Create(ctx, svc) + if created_svc == nil || err != nil { + t.Errorf("Unexpected failure creating service %v", err) + } + created_service := created_svc.(*api.Service) + // Make sure the service does not have the annotation + if service.NeedsHealthCheck(created_service) { + t.Errorf("Unexpected value for annotation %s", service.AnnotationExternalTraffic) + } + // Make sure the service does not have the health check node port allocated + port := service.GetServiceHealthCheckNodePort(created_service) + if port != 0 { + t.Errorf("Unexpected allocation of health check node port annotation %s", service.AnnotationHealthCheckNodePort) + } +} + +// Validate that the health check nodePort is not allocated when service type is ClusterIP +func TestServiceRegistryExternalTrafficAnnotationClusterIP(t *testing.T) { + ctx := api.NewDefaultContext() + storage, _ := NewTestREST(t, nil) + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{Name: "external-lb-esipp", + Annotations: map[string]string{ + service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficGlobal, + }, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + } + created_svc, err := storage.Create(ctx, svc) + if created_svc == nil || err != nil { + t.Errorf("Unexpected failure creating service %v", err) + } + created_service := created_svc.(*api.Service) + // Make sure that ClusterIP services do not have the health check node port allocated + port := service.GetServiceHealthCheckNodePort(created_service) + if port != 0 { + t.Errorf("Unexpected allocation of health check node port annotation %s", service.AnnotationHealthCheckNodePort) + } +} diff --git a/pkg/util/config/feature_gate.go b/pkg/util/config/feature_gate.go index f8777d58c9..af80923cbf 100644 --- a/pkg/util/config/feature_gate.go +++ b/pkg/util/config/feature_gate.go @@ -38,14 +38,16 @@ const ( // specification of gates. Examples: // AllAlpha=false,NewFeature=true will result in newFeature=true // AllAlpha=true,NewFeature=false will result in newFeature=false - allAlphaGate = "AllAlpha" + allAlphaGate = "AllAlpha" + externalTrafficLocalOnly = "AllowExtTrafficLocalEndpoints" ) var ( // Default values for recorded features. Every new feature gate should be // represented here. knownFeatures = map[string]featureSpec{ - allAlphaGate: {false, alpha}, + allAlphaGate: {false, alpha}, + externalTrafficLocalOnly: {false, alpha}, } // Special handling for a few gates. @@ -85,6 +87,10 @@ type FeatureGate interface { // // alpha: v1.4 // MyFeature() bool + // owner: @girishkalele + // alpha: v1.4 + ExternalTrafficLocalOnly() bool + // TODO: Define accessors for each non-API alpha feature. } @@ -154,6 +160,11 @@ func (f *featureGate) Type() string { return "mapStringBool" } +// ExternalTrafficLocalOnly returns value for AllowExtTrafficLocalEndpoints +func (f *featureGate) ExternalTrafficLocalOnly() bool { + return f.lookup(externalTrafficLocalOnly) +} + func (f *featureGate) lookup(key string) bool { defaultValue := f.known[key].enabled if f.enabled != nil { diff --git a/test/images/netexec/Makefile b/test/images/netexec/Makefile index e0d477c941..c2188a4356 100644 --- a/test/images/netexec/Makefile +++ b/test/images/netexec/Makefile @@ -14,7 +14,7 @@ .PHONY: all netexec image push clean -TAG = 1.5 +TAG = 1.6 PREFIX = gcr.io/google_containers diff --git a/test/images/netexec/netexec.go b/test/images/netexec/netexec.go index dd9a0a7206..9aae7fd1a5 100644 --- a/test/images/netexec/netexec.go +++ b/test/images/netexec/netexec.go @@ -80,6 +80,7 @@ func main() { func startHTTPServer(httpPort int) { http.HandleFunc("/", rootHandler) + http.HandleFunc("/clientip", clientIpHandler) http.HandleFunc("/echo", echoHandler) http.HandleFunc("/exit", exitHandler) http.HandleFunc("/hostname", hostnameHandler) @@ -102,6 +103,11 @@ func echoHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s", r.FormValue("msg")) } +func clientIpHandler(w http.ResponseWriter, r *http.Request) { + log.Printf("GET /clientip") + fmt.Fprintf(w, r.RemoteAddr) +} + func exitHandler(w http.ResponseWriter, r *http.Request) { log.Printf("GET /exit?code=%s", r.FormValue("code")) code, err := strconv.Atoi(r.FormValue("code")) @@ -345,7 +351,7 @@ func hostNameHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, getHostName()) } -// udp server only supports the hostName command. +// udp server supports the hostName, echo and clientIP commands. func startUDPServer(udpPort int) { serverAddress, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", udpPort)) assertNoError(err) @@ -363,8 +369,8 @@ func startUDPServer(udpPort int) { for { n, clientAddress, err := serverConn.ReadFromUDP(buf) assertNoError(err) - receivedText := strings.TrimSpace(string(buf[0:n])) - if receivedText == "hostName" || receivedText == "hostname" { + receivedText := strings.ToLower(strings.TrimSpace(string(buf[0:n]))) + if receivedText == "hostname" { log.Println("Sending udp hostName response") _, err = serverConn.WriteToUDP([]byte(getHostName()), clientAddress) assertNoError(err) @@ -377,6 +383,10 @@ func startUDPServer(udpPort int) { log.Printf("Echoing %v\n", resp) _, err = serverConn.WriteToUDP([]byte(resp), clientAddress) assertNoError(err) + } else if receivedText == "clientip" { + log.Printf("Sending back clientip to %s", clientAddress.String()) + _, err = serverConn.WriteToUDP([]byte(clientAddress.String()), clientAddress) + assertNoError(err) } else if len(receivedText) > 0 { log.Printf("Unknown udp command received: %v\n", receivedText) }