diff --git a/retrieval/discovery/kubernetes/discovery.go b/retrieval/discovery/kubernetes/discovery.go index c2db40cb5..1b7e02ca1 100644 --- a/retrieval/discovery/kubernetes/discovery.go +++ b/retrieval/discovery/kubernetes/discovery.go @@ -14,16 +14,13 @@ package kubernetes import ( - "bytes" "encoding/json" "fmt" "io/ioutil" "net" "net/http" "os" - "sort" "strconv" - "strings" "sync" "time" @@ -113,10 +110,8 @@ type Discovery struct { nodes map[string]*Node services map[string]map[string]*Service // map of namespace to (map of pod name to pod) - pods map[string]map[string]*Pod nodesMu sync.RWMutex servicesMu sync.RWMutex - podsMu sync.RWMutex runDone chan struct{} } @@ -140,6 +135,19 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { log.Debugf("Kubernetes Discovery.Run beginning") defer close(ch) + var wg sync.WaitGroup + + pd := &podDiscovery{ + retryInterval: time.Duration(kd.Conf.RetryInterval), + pods: map[string]map[string]*Pod{}, + kd: kd, + } + wg.Add(1) + go func() { + pd.run(ctx, ch) + wg.Done() + }() + // Send an initial full view. // TODO(fabxc): this does not include all available services and service // endpoints yet. Service endpoints were also missing in the previous Sources() method. @@ -148,22 +156,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { all = append(all, kd.updateAPIServersTargetGroup()) all = append(all, kd.updateNodesTargetGroup()) - pods, _, err := kd.getPods() - if err != nil { - log.Errorf("Cannot initialize pods collection: %s", err) - return - } - kd.podsMu.Lock() - kd.pods = pods - kd.podsMu.Unlock() - - all = append(all, kd.updatePodsTargetGroup()) - for _, ns := range kd.pods { - for _, pod := range ns { - all = append(all, kd.updatePodTargetGroup(pod)) - } - } - select { case ch <- all: case <-ctx.Done(): @@ -176,7 +168,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { go kd.watchNodes(update, ctx.Done(), retryInterval) go kd.startServiceWatch(update, ctx.Done(), retryInterval) - go kd.watchPods(update, ctx.Done(), retryInterval) for { tg := []*config.TargetGroup{} @@ -195,13 +186,6 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case *endpointsEvent: log.Debugf("k8s discovery received endpoint event (EventType=%s, Endpoint Name=%s)", obj.EventType, obj.Endpoints.ObjectMeta.Name) tg = append(tg, kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)) - case *podEvent: - log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", obj.EventType, obj.Pod.ObjectMeta.Name) - // Update the per-pod target group - kd.updatePod(obj.Pod, obj.EventType) - tg = append(tg, kd.updatePodTargetGroup(obj.Pod)) - // ...and update the all pods target group - tg = append(tg, kd.updatePodsTargetGroup()) } } @@ -217,6 +201,8 @@ func (kd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } } } + + wg.Wait() } func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { @@ -227,6 +213,9 @@ func (kd *Discovery) queryAPIServerPath(path string) (*http.Response, error) { return kd.queryAPIServerReq(req) } +type client struct { +} + func (kd *Discovery) queryAPIServerReq(req *http.Request) (*http.Response, error) { // Lock in case we need to rotate API servers to request. kd.apiServersMu.Lock() @@ -824,243 +813,3 @@ func nodeAddresses(node *Node) (net.IP, map[NodeAddressType][]net.IP, error) { } return nil, nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) } - -func (kd *Discovery) updatePod(pod *Pod, eventType EventType) { - kd.podsMu.Lock() - defer kd.podsMu.Unlock() - - switch eventType { - case Deleted: - if _, ok := kd.pods[pod.ObjectMeta.Namespace]; ok { - delete(kd.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name) - if len(kd.pods[pod.ObjectMeta.Namespace]) == 0 { - delete(kd.pods, pod.ObjectMeta.Namespace) - } - } - case Added, Modified: - if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok { - kd.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{} - } - kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod - } -} - -func (kd *Discovery) getPods() (map[string]map[string]*Pod, string, error) { - res, err := kd.queryAPIServerPath(podsURL) - if err != nil { - return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) - } - - var pods PodList - if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { - body, _ := ioutil.ReadAll(res.Body) - return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body)) - } - - podMap := map[string]map[string]*Pod{} - for idx, pod := range pods.Items { - if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { - podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{} - } - log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) - podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx] - } - - return podMap, pods.ResourceVersion, nil -} - -func (kd *Discovery) watchPods(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) { - until(func() { - pods, resourceVersion, err := kd.getPods() - if err != nil { - log.Errorf("Cannot initialize pods collection: %s", err) - return - } - kd.podsMu.Lock() - kd.pods = pods - kd.podsMu.Unlock() - - req, err := http.NewRequest("GET", podsURL, nil) - if err != nil { - log.Errorf("Cannot create pods request: %s", err) - return - } - - values := req.URL.Query() - values.Add("watch", "true") - values.Add("resourceVersion", resourceVersion) - req.URL.RawQuery = values.Encode() - res, err := kd.queryAPIServerReq(req) - if err != nil { - log.Errorf("Failed to watch pods: %s", err) - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - log.Errorf("Failed to watch pods: %d", res.StatusCode) - return - } - - d := json.NewDecoder(res.Body) - - for { - var event podEvent - if err := d.Decode(&event); err != nil { - log.Errorf("Watch pods unexpectedly closed: %s", err) - return - } - - select { - case events <- &event: - case <-done: - } - } - }, retryInterval, done) -} - -func podSource(pod *Pod) string { - return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name -} - -type ByContainerPort []ContainerPort - -func (a ByContainerPort) Len() int { return len(a) } -func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } -func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -type ByContainerName []Container - -func (a ByContainerName) Len() int { return len(a) } -func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } -func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { - var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) - if pod.PodStatus.PodIP == "" { - log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name) - return targets - } - - if pod.PodStatus.Phase != "Running" { - log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name) - return targets - } - - ready := "unknown" - for _, cond := range pod.PodStatus.Conditions { - if strings.ToLower(cond.Type) == "ready" { - ready = strings.ToLower(cond.Status) - } - } - - sort.Sort(ByContainerName(pod.PodSpec.Containers)) - - for _, container := range pod.PodSpec.Containers { - // Collect a list of TCP ports - // Sort by port number, ascending - // Product a target pointed at the first port - // Include a label containing all ports (portName=port,PortName=port,...,) - var tcpPorts []ContainerPort - var portLabel *bytes.Buffer = bytes.NewBufferString(",") - - for _, port := range container.Ports { - if port.Protocol == "TCP" { - tcpPorts = append(tcpPorts, port) - } - } - - if len(tcpPorts) == 0 { - log.Debugf("skipping container %s with no TCP ports", container.Name) - continue - } - - sort.Sort(ByContainerPort(tcpPorts)) - - t := model.LabelSet{ - model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), - podNameLabel: model.LabelValue(pod.ObjectMeta.Name), - podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), - podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), - podContainerNameLabel: model.LabelValue(container.Name), - podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), - podReadyLabel: model.LabelValue(ready), - } - - for _, port := range tcpPorts { - portLabel.WriteString(port.Name) - portLabel.WriteString("=") - portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) - portLabel.WriteString(",") - t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) - } - - t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) - - for k, v := range pod.ObjectMeta.Labels { - labelName := strutil.SanitizeLabelName(podLabelPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - - for k, v := range pod.ObjectMeta.Annotations { - labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k) - t[model.LabelName(labelName)] = model.LabelValue(v) - } - - targets = append(targets, t) - - if !allContainers { - break - } - } - - if len(targets) == 0 { - log.Debugf("no targets for pod %s", pod.ObjectMeta.Name) - } - - return targets -} - -func (kd *Discovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup { - kd.podsMu.RLock() - defer kd.podsMu.RUnlock() - - tg := &config.TargetGroup{ - Source: podSource(pod), - } - - // If this pod doesn't exist, return an empty target group - if _, ok := kd.pods[pod.ObjectMeta.Namespace]; !ok { - return tg - } - if _, ok := kd.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { - return tg - } - - tg.Labels = model.LabelSet{ - roleLabel: model.LabelValue("container"), - } - tg.Targets = updatePodTargets(pod, true) - - return tg -} - -func (kd *Discovery) updatePodsTargetGroup() *config.TargetGroup { - tg := &config.TargetGroup{ - Source: podsTargetGroupName, - Labels: model.LabelSet{ - roleLabel: model.LabelValue("pod"), - }, - } - - for _, namespace := range kd.pods { - for _, pod := range namespace { - tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...) - } - } - - return tg -} diff --git a/retrieval/discovery/kubernetes/pod.go b/retrieval/discovery/kubernetes/pod.go new file mode 100644 index 000000000..8296188f8 --- /dev/null +++ b/retrieval/discovery/kubernetes/pod.go @@ -0,0 +1,332 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/util/strutil" + "golang.org/x/net/context" +) + +type podDiscovery struct { + mtx sync.RWMutex + pods map[string]map[string]*Pod + retryInterval time.Duration + kd *Discovery +} + +func (d *podDiscovery) run(ctx context.Context, ch chan<- []*config.TargetGroup) { + pods, _, err := d.getPods() + if err != nil { + log.Errorf("Cannot initialize pods collection: %s", err) + return + } + d.pods = pods + + initial := []*config.TargetGroup{d.updatePodsTargetGroup()} + for _, ns := range d.pods { + for _, pod := range ns { + initial = append(initial, d.updatePodTargetGroup(pod)) + } + } + + select { + case ch <- initial: + case <-ctx.Done(): + return + } + + update := make(chan *podEvent, 10) + go d.watchPods(update, ctx.Done(), d.retryInterval) + + for { + tgs := []*config.TargetGroup{} + select { + case <-ctx.Done(): + return + case e := <-update: + log.Debugf("k8s discovery received pod event (EventType=%s, Pod Name=%s)", e.EventType, e.Pod.ObjectMeta.Name) + // Update the per-pod target group + d.updatePod(e.Pod, e.EventType) + tgs = append(tgs, d.updatePodTargetGroup(e.Pod)) + // ...and update the all pods target group + tgs = append(tgs, d.updatePodsTargetGroup()) + } + if tgs == nil { + continue + } + + for _, tg := range tgs { + select { + case ch <- []*config.TargetGroup{tg}: + case <-ctx.Done(): + return + } + } + } +} + +func (d *podDiscovery) getPods() (map[string]map[string]*Pod, string, error) { + res, err := d.kd.queryAPIServerPath(podsURL) + if err != nil { + return nil, "", fmt.Errorf("unable to list Kubernetes pods: %s", err) + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response: %d %s", res.StatusCode, res.Status) + } + + var pods PodList + if err := json.NewDecoder(res.Body).Decode(&pods); err != nil { + body, _ := ioutil.ReadAll(res.Body) + return nil, "", fmt.Errorf("unable to list Kubernetes pods; unexpected response body: %s", string(body)) + } + + podMap := map[string]map[string]*Pod{} + for idx, pod := range pods.Items { + if _, ok := podMap[pod.ObjectMeta.Namespace]; !ok { + podMap[pod.ObjectMeta.Namespace] = map[string]*Pod{} + } + log.Debugf("Got pod %s in namespace %s", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) + podMap[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = &pods.Items[idx] + } + + return podMap, pods.ResourceVersion, nil +} + +func (d *podDiscovery) watchPods(events chan *podEvent, done <-chan struct{}, retryInterval time.Duration) { + until(func() { + pods, resourceVersion, err := d.getPods() + if err != nil { + log.Errorf("Cannot initialize pods collection: %s", err) + return + } + d.mtx.Lock() + d.pods = pods + d.mtx.Unlock() + + req, err := http.NewRequest("GET", podsURL, nil) + if err != nil { + log.Errorf("Cannot create pods request: %s", err) + return + } + + values := req.URL.Query() + values.Add("watch", "true") + values.Add("resourceVersion", resourceVersion) + req.URL.RawQuery = values.Encode() + res, err := d.kd.queryAPIServerReq(req) + if err != nil { + log.Errorf("Failed to watch pods: %s", err) + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + log.Errorf("Failed to watch pods: %d", res.StatusCode) + return + } + + d := json.NewDecoder(res.Body) + + for { + var event podEvent + if err := d.Decode(&event); err != nil { + log.Errorf("Watch pods unexpectedly closed: %s", err) + return + } + + select { + case events <- &event: + case <-done: + } + } + }, retryInterval, done) +} + +func (d *podDiscovery) updatePod(pod *Pod, eventType EventType) { + d.mtx.Lock() + defer d.mtx.Unlock() + + switch eventType { + case Deleted: + if _, ok := d.pods[pod.ObjectMeta.Namespace]; ok { + delete(d.pods[pod.ObjectMeta.Namespace], pod.ObjectMeta.Name) + if len(d.pods[pod.ObjectMeta.Namespace]) == 0 { + delete(d.pods, pod.ObjectMeta.Namespace) + } + } + case Added, Modified: + if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { + d.pods[pod.ObjectMeta.Namespace] = map[string]*Pod{} + } + d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name] = pod + } +} + +func (d *podDiscovery) updatePodTargetGroup(pod *Pod) *config.TargetGroup { + d.mtx.RLock() + defer d.mtx.RUnlock() + + tg := &config.TargetGroup{ + Source: podSource(pod), + } + + // If this pod doesn't exist, return an empty target group + if _, ok := d.pods[pod.ObjectMeta.Namespace]; !ok { + return tg + } + if _, ok := d.pods[pod.ObjectMeta.Namespace][pod.ObjectMeta.Name]; !ok { + return tg + } + + tg.Labels = model.LabelSet{ + roleLabel: model.LabelValue("container"), + } + tg.Targets = updatePodTargets(pod, true) + + return tg +} + +func (d *podDiscovery) updatePodsTargetGroup() *config.TargetGroup { + tg := &config.TargetGroup{ + Source: podsTargetGroupName, + Labels: model.LabelSet{ + roleLabel: model.LabelValue("pod"), + }, + } + + for _, namespace := range d.pods { + for _, pod := range namespace { + tg.Targets = append(tg.Targets, updatePodTargets(pod, false)...) + } + } + + return tg +} + +func podSource(pod *Pod) string { + return sourcePodPrefix + ":" + pod.ObjectMeta.Namespace + ":" + pod.ObjectMeta.Name +} + +func updatePodTargets(pod *Pod, allContainers bool) []model.LabelSet { + var targets []model.LabelSet = make([]model.LabelSet, 0, len(pod.PodSpec.Containers)) + if pod.PodStatus.PodIP == "" { + log.Debugf("skipping pod %s -- PodStatus.PodIP is empty", pod.ObjectMeta.Name) + return targets + } + + if pod.PodStatus.Phase != "Running" { + log.Debugf("skipping pod %s -- status is not `Running`", pod.ObjectMeta.Name) + return targets + } + + ready := "unknown" + for _, cond := range pod.PodStatus.Conditions { + if strings.ToLower(cond.Type) == "ready" { + ready = strings.ToLower(cond.Status) + } + } + + sort.Sort(ByContainerName(pod.PodSpec.Containers)) + + for _, container := range pod.PodSpec.Containers { + // Collect a list of TCP ports + // Sort by port number, ascending + // Product a target pointed at the first port + // Include a label containing all ports (portName=port,PortName=port,...,) + var tcpPorts []ContainerPort + var portLabel *bytes.Buffer = bytes.NewBufferString(",") + + for _, port := range container.Ports { + if port.Protocol == "TCP" { + tcpPorts = append(tcpPorts, port) + } + } + + if len(tcpPorts) == 0 { + log.Debugf("skipping container %s with no TCP ports", container.Name) + continue + } + + sort.Sort(ByContainerPort(tcpPorts)) + + t := model.LabelSet{ + model.AddressLabel: model.LabelValue(net.JoinHostPort(pod.PodIP, strconv.FormatInt(int64(tcpPorts[0].ContainerPort), 10))), + podNameLabel: model.LabelValue(pod.ObjectMeta.Name), + podAddressLabel: model.LabelValue(pod.PodStatus.PodIP), + podNamespaceLabel: model.LabelValue(pod.ObjectMeta.Namespace), + podContainerNameLabel: model.LabelValue(container.Name), + podContainerPortNameLabel: model.LabelValue(tcpPorts[0].Name), + podReadyLabel: model.LabelValue(ready), + } + + for _, port := range tcpPorts { + portLabel.WriteString(port.Name) + portLabel.WriteString("=") + portLabel.WriteString(strconv.FormatInt(int64(port.ContainerPort), 10)) + portLabel.WriteString(",") + t[model.LabelName(podContainerPortMapPrefix+port.Name)] = model.LabelValue(strconv.FormatInt(int64(port.ContainerPort), 10)) + } + + t[model.LabelName(podContainerPortListLabel)] = model.LabelValue(portLabel.String()) + + for k, v := range pod.ObjectMeta.Labels { + labelName := strutil.SanitizeLabelName(podLabelPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + + for k, v := range pod.ObjectMeta.Annotations { + labelName := strutil.SanitizeLabelName(podAnnotationPrefix + k) + t[model.LabelName(labelName)] = model.LabelValue(v) + } + + targets = append(targets, t) + + if !allContainers { + break + } + } + + if len(targets) == 0 { + log.Debugf("no targets for pod %s", pod.ObjectMeta.Name) + } + + return targets +} + +type ByContainerPort []ContainerPort + +func (a ByContainerPort) Len() int { return len(a) } +func (a ByContainerPort) Less(i, j int) bool { return a[i].ContainerPort < a[j].ContainerPort } +func (a ByContainerPort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +type ByContainerName []Container + +func (a ByContainerName) Len() int { return len(a) } +func (a ByContainerName) Less(i, j int) bool { return a[i].Name < a[j].Name } +func (a ByContainerName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }