From 46683dd67d2d39c7b293ffde72f1fbc37e55ab04 Mon Sep 17 00:00:00 2001 From: Yecheng Fu Date: Tue, 10 Apr 2018 16:53:00 +0800 Subject: [PATCH] Simplify code. - Unified `send` function. - Pass InformerSynced functions to `cache.WaitForCacheSync`. - Use `Role\w+` constants instead of literal string. Signed-off-by: Yecheng Fu --- discovery/kubernetes/endpoints.go | 27 +++++---------------------- discovery/kubernetes/ingress.go | 17 ++++------------- discovery/kubernetes/kubernetes.go | 21 ++++++++++++++++----- discovery/kubernetes/node.go | 19 ++++--------------- discovery/kubernetes/pod.go | 20 ++++---------------- discovery/kubernetes/service.go | 16 ++++------------ 6 files changed, 37 insertions(+), 83 deletions(-) diff --git a/discovery/kubernetes/endpoints.go b/discovery/kubernetes/endpoints.go index a31845541..b60eef7ab 100644 --- a/discovery/kubernetes/endpoints.go +++ b/discovery/kubernetes/endpoints.go @@ -126,30 +126,13 @@ func (e *Endpoints) enqueue(obj interface{}) { func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { defer e.queue.ShutDown() - cacheSyncs := []cache.InformerSynced{ - e.endpointsInf.HasSynced, - e.serviceInf.HasSynced, - e.podInf.HasSynced, - } - if !cache.WaitForCacheSync(ctx.Done(), cacheSyncs...) { + if !cache.WaitForCacheSync(ctx.Done(), e.endpointsInf.HasSynced, e.serviceInf.HasSynced, e.podInf.HasSynced) { level.Error(e.logger).Log("msg", "endpoints informer unable to sync cache") return } - // Send target groups for pod updates. - send := func(tg *targetgroup.Group) { - if tg == nil { - return - } - level.Debug(e.logger).Log("msg", "endpoints update", "tg", fmt.Sprintf("%#v", tg)) - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } - } - go func() { - for e.process(send) { + for e.process(ctx, ch) { } }() @@ -157,7 +140,7 @@ func (e *Endpoints) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { <-ctx.Done() } -func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool { +func (e *Endpoints) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := e.queue.Get() if quit { return false @@ -177,7 +160,7 @@ func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool { return true } if !exists { - send(&targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)}) + send(ctx, e.logger, RoleEndpoint, ch, &targetgroup.Group{Source: endpointsSourceFromNamespaceAndName(namespace, name)}) return true } eps, err := convertToEndpoints(o) @@ -185,7 +168,7 @@ func (e *Endpoints) process(send func(tg *targetgroup.Group)) bool { level.Error(e.logger).Log("msg", "converting to Endpoints object failed", "err", err) return true } - send(e.buildEndpoints(eps)) + send(ctx, e.logger, RoleEndpoint, ch, e.buildEndpoints(eps)) return true } diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go index f15fd4eb8..c997890a6 100644 --- a/discovery/kubernetes/ingress.go +++ b/discovery/kubernetes/ingress.go @@ -73,16 +73,8 @@ func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return } - // Send target groups for ingress updates. - send := func(tg *targetgroup.Group) { - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } - } - go func() { - for s.process(send) { + for s.process(ctx, ch) { } }() @@ -90,8 +82,7 @@ func (s *Ingress) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { <-ctx.Done() } -func (s *Ingress) process(send func(tg *targetgroup.Group)) bool { - +func (s *Ingress) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := s.queue.Get() if quit { return false @@ -109,7 +100,7 @@ func (s *Ingress) process(send func(tg *targetgroup.Group)) bool { return true } if !exists { - send(&targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)}) + send(ctx, s.logger, RoleIngress, ch, &targetgroup.Group{Source: ingressSourceFromNamespaceAndName(namespace, name)}) return true } eps, err := convertToIngress(o) @@ -117,7 +108,7 @@ func (s *Ingress) process(send func(tg *targetgroup.Group)) bool { level.Error(s.logger).Log("msg", "converting to Ingress object failed", "err", err) return true } - send(s.buildIngress(eps)) + send(ctx, s.logger, RoleIngress, ch, s.buildIngress(eps)) return true } diff --git a/discovery/kubernetes/kubernetes.go b/discovery/kubernetes/kubernetes.go index 592eab217..a3aaf509c 100644 --- a/discovery/kubernetes/kubernetes.go +++ b/discovery/kubernetes/kubernetes.go @@ -262,7 +262,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { namespaces := d.getNamespaces() switch d.role { - case "endpoints": + case RoleEndpoint: for _, namespace := range namespaces { elw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -299,7 +299,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { go eps.serviceInf.Run(ctx.Done()) go eps.podInf.Run(ctx.Done()) } - case "pod": + case RolePod: for _, namespace := range namespaces { plw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -316,7 +316,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { d.discoverers = append(d.discoverers, pod) go pod.informer.Run(ctx.Done()) } - case "service": + case RoleService: for _, namespace := range namespaces { slw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -333,7 +333,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { d.discoverers = append(d.discoverers, svc) go svc.informer.Run(ctx.Done()) } - case "ingress": + case RoleIngress: for _, namespace := range namespaces { ilw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -350,7 +350,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { d.discoverers = append(d.discoverers, ingress) go ingress.informer.Run(ctx.Done()) } - case "node": + case RoleNode: nlw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return d.client.CoreV1().Nodes().List(options) @@ -385,3 +385,14 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { func lv(s string) model.LabelValue { return model.LabelValue(s) } + +func send(ctx context.Context, l log.Logger, role Role, ch chan<- []*targetgroup.Group, tg *targetgroup.Group) { + if tg == nil { + return + } + level.Debug(l).Log("msg", "kubernetes discovery update", "role", string(role), "tg", fmt.Sprintf("%#v", tg)) + select { + case <-ctx.Done(): + case ch <- []*targetgroup.Group{tg}: + } +} diff --git a/discovery/kubernetes/node.go b/discovery/kubernetes/node.go index 36e59e66f..d5372df44 100644 --- a/discovery/kubernetes/node.go +++ b/discovery/kubernetes/node.go @@ -81,19 +81,8 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return } - // Send target groups for service updates. - send := func(tg *targetgroup.Group) { - if tg == nil { - return - } - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } - } - go func() { - for n.process(send) { + for n.process(ctx, ch) { } }() @@ -101,7 +90,7 @@ func (n *Node) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { <-ctx.Done() } -func (n *Node) process(send func(tg *targetgroup.Group)) bool { +func (n *Node) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := n.queue.Get() if quit { return false @@ -119,7 +108,7 @@ func (n *Node) process(send func(tg *targetgroup.Group)) bool { return true } if !exists { - send(&targetgroup.Group{Source: nodeSourceFromName(name)}) + send(ctx, n.logger, RoleNode, ch, &targetgroup.Group{Source: nodeSourceFromName(name)}) return true } node, err := convertToNode(o) @@ -127,7 +116,7 @@ func (n *Node) process(send func(tg *targetgroup.Group)) bool { level.Error(n.logger).Log("msg", "converting to Node object failed", "err", err) return true } - send(n.buildNode(node)) + send(ctx, n.logger, RoleNode, ch, n.buildNode(node)) return true } diff --git a/discovery/kubernetes/pod.go b/discovery/kubernetes/pod.go index 096763425..9f9a2d73f 100644 --- a/discovery/kubernetes/pod.go +++ b/discovery/kubernetes/pod.go @@ -86,20 +86,8 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return } - // Send target groups for pod updates. - send := func(tg *targetgroup.Group) { - if tg == nil { - return - } - level.Debug(p.logger).Log("msg", "pod update", "tg", fmt.Sprintf("%#v", tg)) - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } - } - go func() { - for p.process(send) { + for p.process(ctx, ch) { } }() @@ -107,7 +95,7 @@ func (p *Pod) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { <-ctx.Done() } -func (p *Pod) process(send func(tg *targetgroup.Group)) bool { +func (p *Pod) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := p.queue.Get() if quit { return false @@ -125,7 +113,7 @@ func (p *Pod) process(send func(tg *targetgroup.Group)) bool { return true } if !exists { - send(&targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)}) + send(ctx, p.logger, RolePod, ch, &targetgroup.Group{Source: podSourceFromNamespaceAndName(namespace, name)}) return true } eps, err := convertToPod(o) @@ -133,7 +121,7 @@ func (p *Pod) process(send func(tg *targetgroup.Group)) bool { level.Error(p.logger).Log("msg", "converting to Pod object failed", "err", err) return true } - send(p.buildPod(eps)) + send(ctx, p.logger, RolePod, ch, p.buildPod(eps)) return true } diff --git a/discovery/kubernetes/service.go b/discovery/kubernetes/service.go index 625f2343c..ba85a7d6d 100644 --- a/discovery/kubernetes/service.go +++ b/discovery/kubernetes/service.go @@ -79,16 +79,8 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { return } - // Send target groups for service updates. - send := func(tg *targetgroup.Group) { - select { - case <-ctx.Done(): - case ch <- []*targetgroup.Group{tg}: - } - } - go func() { - for s.process(send) { + for s.process(ctx, ch) { } }() @@ -96,7 +88,7 @@ func (s *Service) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { <-ctx.Done() } -func (s *Service) process(send func(tg *targetgroup.Group)) bool { +func (s *Service) process(ctx context.Context, ch chan<- []*targetgroup.Group) bool { keyObj, quit := s.queue.Get() if quit { return false @@ -114,7 +106,7 @@ func (s *Service) process(send func(tg *targetgroup.Group)) bool { return true } if !exists { - send(&targetgroup.Group{Source: serviceSourceFromNamespaceAndName(namespace, name)}) + send(ctx, s.logger, RoleService, ch, &targetgroup.Group{Source: serviceSourceFromNamespaceAndName(namespace, name)}) return true } eps, err := convertToService(o) @@ -122,7 +114,7 @@ func (s *Service) process(send func(tg *targetgroup.Group)) bool { level.Error(s.logger).Log("msg", "converting to Service object failed", "err", err) return true } - send(s.buildService(eps)) + send(ctx, s.logger, RoleService, ch, s.buildService(eps)) return true }