2014-06-06 23:40:48 +00:00
|
|
|
/*
|
2016-06-03 00:25:58 +00:00
|
|
|
Copyright 2014 The Kubernetes Authors.
|
2014-06-06 23:40:48 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package config
|
|
|
|
|
|
|
|
import (
|
2017-03-17 12:53:46 +00:00
|
|
|
"fmt"
|
|
|
|
"time"
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2015-08-05 22:05:17 +00:00
|
|
|
"github.com/golang/glog"
|
2017-03-17 12:53:46 +00:00
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
|
|
"k8s.io/client-go/tools/cache"
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api"
|
2017-03-17 12:53:46 +00:00
|
|
|
listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/util/config"
|
2014-06-06 23:40:48 +00:00
|
|
|
)
|
|
|
|
|
2014-07-15 13:03:08 +00:00
|
|
|
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
|
2014-06-06 23:40:48 +00:00
|
|
|
type ServiceConfigHandler interface {
|
2015-08-08 19:16:55 +00:00
|
|
|
// OnServiceUpdate gets called when a configuration has been changed by one of the sources.
|
2014-07-11 11:48:18 +00:00
|
|
|
// This is the union of all the configuration sources.
|
2015-08-08 19:16:55 +00:00
|
|
|
OnServiceUpdate(services []api.Service)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-07-15 13:03:08 +00:00
|
|
|
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
|
2014-06-06 23:40:48 +00:00
|
|
|
type EndpointsConfigHandler interface {
|
2015-08-08 19:16:55 +00:00
|
|
|
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
|
2014-06-06 23:40:48 +00:00
|
|
|
// service on any of the configuration sources. An example is when a new
|
|
|
|
// service comes up, or when containers come up or down for an existing service.
|
2017-03-09 15:42:45 +00:00
|
|
|
//
|
|
|
|
// NOTE: For efficiency, endpoints are being passed by reference, thus,
|
|
|
|
// OnEndpointsUpdate should NOT modify pointers of a given slice.
|
|
|
|
// Those endpoints objects are shared with other layers of the system and
|
|
|
|
// are guaranteed to be immutable with the assumption that are also
|
|
|
|
// not mutated by those handlers. Make a deep copy if you need to modify
|
|
|
|
// them in your code.
|
|
|
|
OnEndpointsUpdate(endpoints []*api.Endpoints)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2014-07-08 16:55:11 +00:00
|
|
|
// EndpointsConfig tracks a set of endpoints configurations.
|
|
|
|
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
|
|
|
|
type EndpointsConfig struct {
|
2017-03-17 12:53:46 +00:00
|
|
|
informer cache.Controller
|
|
|
|
lister listers.EndpointsLister
|
|
|
|
handlers []EndpointsConfigHandler
|
|
|
|
// updates channel is used to trigger registered handlers.
|
|
|
|
updates chan struct{}
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2014-09-02 10:00:28 +00:00
|
|
|
// NewEndpointsConfig creates a new EndpointsConfig.
|
2017-03-17 12:53:46 +00:00
|
|
|
func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig {
|
|
|
|
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
|
|
|
|
return newEndpointsConfig(endpointsLW, period)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig {
|
|
|
|
result := &EndpointsConfig{}
|
|
|
|
|
|
|
|
store, informer := cache.NewIndexerInformer(
|
|
|
|
lw,
|
|
|
|
&api.Endpoints{},
|
|
|
|
period,
|
|
|
|
cache.ResourceEventHandlerFuncs{
|
|
|
|
AddFunc: result.handleAddEndpoints,
|
|
|
|
UpdateFunc: result.handleUpdateEndpoints,
|
|
|
|
DeleteFunc: result.handleDeleteEndpoints,
|
|
|
|
},
|
|
|
|
cache.Indexers{},
|
|
|
|
)
|
|
|
|
result.informer = informer
|
|
|
|
result.lister = listers.NewEndpointsLister(store)
|
|
|
|
return result
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2017-02-27 16:38:59 +00:00
|
|
|
// RegisterHandler registers a handler which is called on every endpoints change.
|
2014-07-08 16:55:11 +00:00
|
|
|
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
|
2017-03-17 12:53:46 +00:00
|
|
|
c.handlers = append(c.handlers, handler)
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2017-03-17 14:29:44 +00:00
|
|
|
// Run starts the underlying informer and goroutine responsible for calling
|
|
|
|
// registered handlers.
|
2017-03-17 12:53:46 +00:00
|
|
|
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
|
|
|
|
// The updates channel is used to send interrupts to the Endpoints handler.
|
|
|
|
// It's buffered because we never want to block for as long as there is a
|
|
|
|
// pending interrupt, but don't want to drop them if the handler is doing
|
|
|
|
// work.
|
|
|
|
c.updates = make(chan struct{}, 1)
|
|
|
|
go c.informer.Run(stopCh)
|
|
|
|
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
|
|
|
|
utilruntime.HandleError(fmt.Errorf("endpoint controller not synced"))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// We have synced informers. Now we can start delivering updates
|
|
|
|
// to the registered handler.
|
2014-07-08 16:55:11 +00:00
|
|
|
go func() {
|
2017-03-17 12:53:46 +00:00
|
|
|
for range c.updates {
|
|
|
|
endpoints, err := c.lister.List(labels.Everything())
|
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error while listing endpoints from cache: %v", err)
|
|
|
|
// This will cause a retry (if there isn't any other trigger in-flight).
|
|
|
|
c.dispatchUpdate()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if endpoints == nil {
|
|
|
|
endpoints = []*api.Endpoints{}
|
|
|
|
}
|
|
|
|
for i := range c.handlers {
|
|
|
|
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
|
|
|
|
c.handlers[i].OnEndpointsUpdate(endpoints)
|
|
|
|
}
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
|
|
|
}()
|
2017-03-17 12:53:46 +00:00
|
|
|
// Close updates channel when stopCh is closed.
|
|
|
|
go func() {
|
|
|
|
<-stopCh
|
|
|
|
close(c.updates)
|
|
|
|
}()
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2017-03-17 12:53:46 +00:00
|
|
|
func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) {
|
|
|
|
c.dispatchUpdate()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 12:53:46 +00:00
|
|
|
func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) {
|
|
|
|
c.dispatchUpdate()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 12:53:46 +00:00
|
|
|
func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
|
|
|
|
c.dispatchUpdate()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 12:53:46 +00:00
|
|
|
func (c *EndpointsConfig) dispatchUpdate() {
|
|
|
|
select {
|
|
|
|
case c.updates <- struct{}{}:
|
|
|
|
default:
|
|
|
|
glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-07-08 16:55:11 +00:00
|
|
|
// ServiceConfig tracks a set of service configurations.
|
|
|
|
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
|
|
|
|
type ServiceConfig struct {
|
2017-03-17 14:15:51 +00:00
|
|
|
informer cache.Controller
|
|
|
|
lister listers.ServiceLister
|
|
|
|
handlers []ServiceConfigHandler
|
|
|
|
// updates channel is used to trigger registered handlers
|
|
|
|
updates chan struct{}
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2014-07-08 16:55:11 +00:00
|
|
|
// NewServiceConfig creates a new ServiceConfig.
|
2017-03-17 14:15:51 +00:00
|
|
|
func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig {
|
|
|
|
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
|
|
|
|
return newServiceConfig(servicesLW, period)
|
|
|
|
}
|
|
|
|
|
|
|
|
func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig {
|
|
|
|
result := &ServiceConfig{}
|
|
|
|
|
|
|
|
store, informer := cache.NewIndexerInformer(
|
|
|
|
lw,
|
|
|
|
&api.Service{},
|
|
|
|
period,
|
|
|
|
cache.ResourceEventHandlerFuncs{
|
|
|
|
AddFunc: result.handleAddService,
|
|
|
|
UpdateFunc: result.handleUpdateService,
|
|
|
|
DeleteFunc: result.handleDeleteService,
|
|
|
|
},
|
|
|
|
cache.Indexers{},
|
|
|
|
)
|
|
|
|
result.informer = informer
|
|
|
|
result.lister = listers.NewServiceLister(store)
|
|
|
|
return result
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
2014-06-06 23:40:48 +00:00
|
|
|
|
2017-02-27 16:38:59 +00:00
|
|
|
// RegisterHandler registers a handler which is called on every services change.
|
2014-07-08 16:55:11 +00:00
|
|
|
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
|
2017-03-17 14:15:51 +00:00
|
|
|
c.handlers = append(c.handlers, handler)
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 14:29:44 +00:00
|
|
|
// Run starts the underlying informer and goroutine responsible for calling
|
|
|
|
// registered handlers.
|
2017-03-17 14:15:51 +00:00
|
|
|
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
|
|
|
|
// The updates channel is used to send interrupts to the Services handler.
|
|
|
|
// It's buffered because we never want to block for as long as there is a
|
|
|
|
// pending interrupt, but don't want to drop them if the handler is doing
|
|
|
|
// work.
|
|
|
|
c.updates = make(chan struct{}, 1)
|
|
|
|
go c.informer.Run(stopCh)
|
|
|
|
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
|
|
|
|
utilruntime.HandleError(fmt.Errorf("service controller not synced"))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// We hanve synced informers. Now we can start delivering updates
|
|
|
|
// to the registered handler.
|
2014-07-08 16:55:11 +00:00
|
|
|
go func() {
|
2017-03-17 14:15:51 +00:00
|
|
|
for range c.updates {
|
|
|
|
services, err := c.lister.List(labels.Everything())
|
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error while listing services from cache: %v", err)
|
|
|
|
// This will cause a retry (if there isnt' any other trigger in-flight).
|
|
|
|
c.dispatchUpdate()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
svcs := make([]api.Service, 0, len(services))
|
|
|
|
for i := range services {
|
|
|
|
svcs = append(svcs, *services[i])
|
|
|
|
}
|
|
|
|
for i := range c.handlers {
|
|
|
|
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
|
|
|
|
c.handlers[i].OnServiceUpdate(svcs)
|
|
|
|
}
|
2014-07-08 16:55:11 +00:00
|
|
|
}
|
|
|
|
}()
|
2017-03-17 14:15:51 +00:00
|
|
|
// Close updates channel when stopCh is closed.
|
|
|
|
go func() {
|
|
|
|
<-stopCh
|
|
|
|
close(c.updates)
|
|
|
|
}()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 14:15:51 +00:00
|
|
|
func (c *ServiceConfig) handleAddService(_ interface{}) {
|
|
|
|
c.dispatchUpdate()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 14:15:51 +00:00
|
|
|
func (c *ServiceConfig) handleUpdateService(_, _ interface{}) {
|
|
|
|
c.dispatchUpdate()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 14:15:51 +00:00
|
|
|
func (c *ServiceConfig) handleDeleteService(_ interface{}) {
|
|
|
|
c.dispatchUpdate()
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
|
2017-03-17 14:15:51 +00:00
|
|
|
func (c *ServiceConfig) dispatchUpdate() {
|
|
|
|
select {
|
|
|
|
case c.updates <- struct{}{}:
|
|
|
|
default:
|
|
|
|
glog.V(4).Infof("Service handler alread has a pending interrupt.")
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-12-04 07:54:40 +00:00
|
|
|
// watchForUpdates invokes bcaster.Notify() with the latest version of an object
|
2014-07-08 16:55:11 +00:00
|
|
|
// when changes occur.
|
2014-12-04 07:54:40 +00:00
|
|
|
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
|
2015-02-20 17:30:33 +00:00
|
|
|
for true {
|
|
|
|
<-updates
|
2014-12-04 07:54:40 +00:00
|
|
|
bcaster.Notify(accessor.MergedState())
|
2014-06-06 23:40:48 +00:00
|
|
|
}
|
|
|
|
}
|