Adjust Kubernetes SD to pipeline changes

pull/977/head
Fabian Reinartz 2015-08-14 13:04:23 +02:00
parent 4e84b86510
commit f269943950
1 changed files with 78 additions and 56 deletions

View File

@ -160,64 +160,62 @@ func (kd *KubernetesDiscovery) Sources() []string {
}
// Run implements the TargetProvider interface.
func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup) {
func (kd *KubernetesDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
defer close(ch)
kd.updateNodesTargetGroup(ch)
select {
case ch <- kd.updateNodesTargetGroup():
case <-done:
return
}
for _, ns := range kd.services {
for _, service := range ns {
kd.addService(service, ch)
select {
case ch <- kd.addService(service):
case <-done:
return
}
}
}
retryInterval := time.Duration(kd.Conf.RetryInterval)
update := make(chan interface{}, 10)
defer close(update)
go kd.watchNodes(update, retryInterval)
go kd.watchServices(update, retryInterval)
go kd.watchServiceEndpoints(update, retryInterval)
go kd.watchNodes(update, done, retryInterval)
go kd.watchServices(update, done, retryInterval)
go kd.watchServiceEndpoints(update, done, retryInterval)
var tg *config.TargetGroup
for {
select {
case <-kd.runDone:
case <-done:
return
case event := <-update:
switch obj := event.(type) {
case *nodeEvent:
kd.updateNode(obj.Node, obj.EventType)
kd.updateNodesTargetGroup(ch)
tg = kd.updateNodesTargetGroup()
case *serviceEvent:
kd.updateService(obj.Service, obj.EventType, ch)
tg = kd.updateService(obj.Service, obj.EventType)
case *endpointsEvent:
kd.updateServiceEndpoints(obj.Endpoints, obj.EventType, ch)
tg = kd.updateServiceEndpoints(obj.Endpoints, obj.EventType)
}
}
select {
case ch <- tg:
case <-done:
return
}
}
}
// Stop implements the TargetProvider interface.
func (kd *KubernetesDiscovery) Stop() {
log.Debugf("Stopping Kubernetes discovery for %s", kd.Conf.Server)
// The lock prevents Run from terminating while the watchers attempt
// to send on their channels.
func (kd *KubernetesDiscovery) updateNodesTargetGroup() *config.TargetGroup {
kd.nodesMu.Lock()
defer kd.nodesMu.Unlock()
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
// Terminate Run.
kd.runDone <- struct{}{}
log.Debugf("Kubernetes discovery for %s stopped.", kd.Conf.Server)
}
func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGroup) {
kd.nodesMu.Lock()
defer kd.nodesMu.Unlock()
tg := &config.TargetGroup{Source: nodesTargetGroupName}
// Now let's loop through the nodes & add them to the target group with appropriate labels.
@ -235,7 +233,7 @@ func (kd *KubernetesDiscovery) updateNodesTargetGroup(ch chan<- *config.TargetGr
tg.Targets = append(tg.Targets, t)
}
ch <- tg
return tg
}
func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) {
@ -253,7 +251,7 @@ func (kd *KubernetesDiscovery) updateNode(node *Node, eventType EventType) {
}
// watchNodes watches nodes as they come & go.
func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval time.Duration) {
func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
req, err := http.NewRequest("GET", kd.Conf.Server+nodesURL, nil)
if err != nil {
@ -283,13 +281,17 @@ func (kd *KubernetesDiscovery) watchNodes(events chan interface{}, retryInterval
return
}
kd.nodesResourceVersion = event.Node.ObjectMeta.ResourceVersion
events <- &event
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, kd.runDone)
}, retryInterval, done)
}
// watchServices watches services as they come & go.
func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInterval time.Duration) {
func (kd *KubernetesDiscovery) watchServices(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
req, err := http.NewRequest("GET", kd.Conf.Server+servicesURL, nil)
if err != nil {
@ -320,64 +322,77 @@ func (kd *KubernetesDiscovery) watchServices(events chan interface{}, retryInter
return
}
kd.servicesResourceVersion = event.Service.ObjectMeta.ResourceVersion
events <- &event
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, kd.runDone)
}, retryInterval, done)
}
func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType, ch chan<- *config.TargetGroup) {
func (kd *KubernetesDiscovery) updateService(service *Service, eventType EventType) *config.TargetGroup {
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
name := service.ObjectMeta.Name
namespace := service.ObjectMeta.Namespace
_, ok := kd.services[namespace][name]
var (
name = service.ObjectMeta.Name
namespace = service.ObjectMeta.Namespace
_, exists = kd.services[namespace][name]
)
switch eventType {
case deleted:
if ok {
kd.deleteService(service, ch)
if exists {
return kd.deleteService(service)
}
case added, modified:
kd.addService(service, ch)
return kd.addService(service)
}
return nil
}
func (kd *KubernetesDiscovery) deleteService(service *Service, ch chan<- *config.TargetGroup) {
func (kd *KubernetesDiscovery) deleteService(service *Service) *config.TargetGroup {
tg := &config.TargetGroup{Source: serviceSource(service)}
ch <- tg
delete(kd.services[service.ObjectMeta.Namespace], service.ObjectMeta.Name)
if len(kd.services[service.ObjectMeta.Namespace]) == 0 {
delete(kd.services, service.ObjectMeta.Namespace)
}
return tg
}
func (kd *KubernetesDiscovery) addService(service *Service, ch chan<- *config.TargetGroup) {
func (kd *KubernetesDiscovery) addService(service *Service) *config.TargetGroup {
namespace, ok := kd.services[service.ObjectMeta.Namespace]
if !ok {
namespace = map[string]*Service{}
kd.services[service.ObjectMeta.Namespace] = namespace
}
namespace[service.ObjectMeta.Name] = service
endpointURL := fmt.Sprintf(serviceEndpointsURL, service.ObjectMeta.Namespace, service.ObjectMeta.Name)
res, err := kd.client.Get(kd.Conf.Server + endpointURL)
if err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return
return nil
}
if res.StatusCode != http.StatusOK {
log.Errorf("Failed to get service endpoints: %s", res.StatusCode)
return
return nil
}
var endpoints Endpoints
if err := json.NewDecoder(res.Body).Decode(&endpoints); err != nil {
log.Errorf("Error getting service endpoints: %s", err)
return
return nil
}
kd.updateServiceTargetGroup(service, &endpoints, ch)
return kd.updateServiceTargetGroup(service, &endpoints)
}
func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints, ch chan<- *config.TargetGroup) {
func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoints *Endpoints) *config.TargetGroup {
tg := &config.TargetGroup{
Source: serviceSource(service),
Labels: clientmodel.LabelSet{
@ -413,11 +428,11 @@ func (kd *KubernetesDiscovery) updateServiceTargetGroup(service *Service, endpoi
}
}
ch <- tg
return tg
}
// watchServiceEndpoints watches service endpoints as they come & go.
func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, retryInterval time.Duration) {
func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, done <-chan struct{}, retryInterval time.Duration) {
until(func() {
req, err := http.NewRequest("GET", kd.Conf.Server+endpointsURL, nil)
if err != nil {
@ -448,19 +463,26 @@ func (kd *KubernetesDiscovery) watchServiceEndpoints(events chan interface{}, re
return
}
kd.servicesResourceVersion = event.Endpoints.ObjectMeta.ResourceVersion
events <- &event
select {
case events <- &event:
case <-done:
}
}
}, retryInterval, kd.runDone)
}, retryInterval, done)
}
func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType, ch chan<- *config.TargetGroup) {
func (kd *KubernetesDiscovery) updateServiceEndpoints(endpoints *Endpoints, eventType EventType) *config.TargetGroup {
kd.servicesMu.Lock()
defer kd.servicesMu.Unlock()
serviceNamespace := endpoints.ObjectMeta.Namespace
serviceName := endpoints.ObjectMeta.Name
if service, ok := kd.services[serviceNamespace][serviceName]; ok {
kd.updateServiceTargetGroup(service, endpoints, ch)
return kd.updateServiceTargetGroup(service, endpoints)
}
return nil
}
func newKubernetesHTTPClient(conf *config.KubernetesSDConfig) (*http.Client, error) {