Merge pull request #2504 from prometheus/grobie/fix-discovery-naming

Follow golang naming conventions in discovery packages
pull/2503/merge
Fabian Reinartz 2017-03-17 08:01:48 +01:00 committed by GitHub
commit 0a7c8e9da1
14 changed files with 255 additions and 256 deletions

View File

@ -60,17 +60,17 @@ func init() {
prometheus.MustRegister(azureSDRefreshFailuresCount) prometheus.MustRegister(azureSDRefreshFailuresCount)
} }
// AzureDiscovery periodically performs Azure-SD requests. It implements // Discovery periodically performs Azure-SD requests. It implements
// the TargetProvider interface. // the TargetProvider interface.
type AzureDiscovery struct { type Discovery struct {
cfg *config.AzureSDConfig cfg *config.AzureSDConfig
interval time.Duration interval time.Duration
port int port int
} }
// NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets. // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery { func NewDiscovery(cfg *config.AzureSDConfig) *Discovery {
return &AzureDiscovery{ return &Discovery{
cfg: cfg, cfg: cfg,
interval: time.Duration(cfg.RefreshInterval), interval: time.Duration(cfg.RefreshInterval),
port: cfg.Port, port: cfg.Port,
@ -78,8 +78,8 @@ func NewDiscovery(cfg *config.AzureSDConfig) *AzureDiscovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
ticker := time.NewTicker(ad.interval) ticker := time.NewTicker(d.interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -89,7 +89,7 @@ func (ad *AzureDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGro
default: default:
} }
tg, err := ad.refresh() tg, err := d.refresh()
if err != nil { if err != nil {
log.Errorf("unable to refresh during Azure discovery: %s", err) log.Errorf("unable to refresh during Azure discovery: %s", err)
} else { } else {
@ -156,7 +156,7 @@ func newAzureResourceFromID(id string) (azureResource, error) {
}, nil }, nil
} }
func (ad *AzureDiscovery) refresh() (tg *config.TargetGroup, err error) { func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
t0 := time.Now() t0 := time.Now()
defer func() { defer func() {
azureSDRefreshDuration.Observe(time.Since(t0).Seconds()) azureSDRefreshDuration.Observe(time.Since(t0).Seconds())
@ -165,7 +165,7 @@ func (ad *AzureDiscovery) refresh() (tg *config.TargetGroup, err error) {
} }
}() }()
tg = &config.TargetGroup{} tg = &config.TargetGroup{}
client, err := createAzureClient(*ad.cfg) client, err := createAzureClient(*d.cfg)
if err != nil { if err != nil {
return tg, fmt.Errorf("could not create Azure client: %s", err) return tg, fmt.Errorf("could not create Azure client: %s", err)
} }
@ -246,7 +246,7 @@ func (ad *AzureDiscovery) refresh() (tg *config.TargetGroup, err error) {
for _, ip := range *networkInterface.Properties.IPConfigurations { for _, ip := range *networkInterface.Properties.IPConfigurations {
if ip.Properties.PrivateIPAddress != nil { if ip.Properties.PrivateIPAddress != nil {
labels[azureLabelMachinePrivateIP] = model.LabelValue(*ip.Properties.PrivateIPAddress) labels[azureLabelMachinePrivateIP] = model.LabelValue(*ip.Properties.PrivateIPAddress)
address := net.JoinHostPort(*ip.Properties.PrivateIPAddress, fmt.Sprintf("%d", ad.port)) address := net.JoinHostPort(*ip.Properties.PrivateIPAddress, fmt.Sprintf("%d", d.port))
labels[model.AddressLabel] = model.LabelValue(address) labels[model.AddressLabel] = model.LabelValue(address)
ch <- target{labelSet: labels, err: nil} ch <- target{labelSet: labels, err: nil}
return return

View File

@ -117,12 +117,12 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
} }
// shouldWatch returns whether the service of the given name should be watched. // shouldWatch returns whether the service of the given name should be watched.
func (cd *Discovery) shouldWatch(name string) bool { func (d *Discovery) shouldWatch(name string) bool {
// If there's no fixed set of watched services, we watch everything. // If there's no fixed set of watched services, we watch everything.
if len(cd.watchedServices) == 0 { if len(d.watchedServices) == 0 {
return true return true
} }
for _, sn := range cd.watchedServices { for _, sn := range d.watchedServices {
if sn == name { if sn == name {
return true return true
} }
@ -131,13 +131,13 @@ func (cd *Discovery) shouldWatch(name string) bool {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Watched services and their cancelation functions. // Watched services and their cancelation functions.
services := map[string]func(){} services := map[string]func(){}
var lastIndex uint64 var lastIndex uint64
for { for {
catalog := cd.client.Catalog() catalog := d.client.Catalog()
t0 := time.Now() t0 := time.Now()
srvs, meta, err := catalog.Services(&consul.QueryOptions{ srvs, meta, err := catalog.Services(&consul.QueryOptions{
WaitIndex: lastIndex, WaitIndex: lastIndex,
@ -167,19 +167,19 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// If the datacenter was not set from clientConf, let's get it from the local Consul agent // If the datacenter was not set from clientConf, let's get it from the local Consul agent
// (Consul default is to use local node's datacenter if one isn't given for a query). // (Consul default is to use local node's datacenter if one isn't given for a query).
if cd.clientDatacenter == "" { if d.clientDatacenter == "" {
info, err := cd.client.Agent().Self() info, err := d.client.Agent().Self()
if err != nil { if err != nil {
log.Errorf("Error retrieving datacenter name: %s", err) log.Errorf("Error retrieving datacenter name: %s", err)
time.Sleep(retryInterval) time.Sleep(retryInterval)
continue continue
} }
cd.clientDatacenter = info["Config"]["Datacenter"].(string) d.clientDatacenter = info["Config"]["Datacenter"].(string)
} }
// Check for new services. // Check for new services.
for name := range srvs { for name := range srvs {
if !cd.shouldWatch(name) { if !d.shouldWatch(name) {
continue continue
} }
if _, ok := services[name]; ok { if _, ok := services[name]; ok {
@ -187,13 +187,13 @@ func (cd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
srv := &consulService{ srv := &consulService{
client: cd.client, client: d.client,
name: name, name: name,
labels: model.LabelSet{ labels: model.LabelSet{
serviceLabel: model.LabelValue(name), serviceLabel: model.LabelValue(name),
datacenterLabel: model.LabelValue(cd.clientDatacenter), datacenterLabel: model.LabelValue(d.clientDatacenter),
}, },
tagSeparator: cd.tagSeparator, tagSeparator: d.tagSeparator,
} }
wctx, cancel := context.WithCancel(ctx) wctx, cancel := context.WithCancel(ctx)

View File

@ -64,7 +64,6 @@ type Discovery struct {
names []string names []string
interval time.Duration interval time.Duration
m sync.RWMutex
port int port int
qtype uint16 qtype uint16
} }
@ -89,30 +88,30 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (dd *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
ticker := time.NewTicker(dd.interval) ticker := time.NewTicker(d.interval)
defer ticker.Stop() defer ticker.Stop()
// Get an initial set right away. // Get an initial set right away.
dd.refreshAll(ctx, ch) d.refreshAll(ctx, ch)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
dd.refreshAll(ctx, ch) d.refreshAll(ctx, ch)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
} }
func (dd *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGroup) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(dd.names)) wg.Add(len(d.names))
for _, name := range dd.names { for _, name := range d.names {
go func(n string) { go func(n string) {
if err := dd.refresh(ctx, n, ch); err != nil { if err := d.refresh(ctx, n, ch); err != nil {
log.Errorf("Error refreshing DNS targets: %s", err) log.Errorf("Error refreshing DNS targets: %s", err)
} }
wg.Done() wg.Done()
@ -122,8 +121,8 @@ func (dd *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetG
wg.Wait() wg.Wait()
} }
func (dd *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error { func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype) response, err := lookupAll(name, d.qtype)
dnsSDLookupsCount.Inc() dnsSDLookupsCount.Inc()
if err != nil { if err != nil {
dnsSDLookupFailuresCount.Inc() dnsSDLookupFailuresCount.Inc()
@ -144,9 +143,9 @@ func (dd *Discovery) refresh(ctx context.Context, name string, ch chan<- []*conf
target = hostPort(addr.Target, int(addr.Port)) target = hostPort(addr.Target, int(addr.Port))
case *dns.A: case *dns.A:
target = hostPort(addr.A.String(), dd.port) target = hostPort(addr.A.String(), d.port)
case *dns.AAAA: case *dns.AAAA:
target = hostPort(addr.AAAA.String(), dd.port) target = hostPort(addr.AAAA.String(), d.port)
default: default:
log.Warnf("%q is not a valid SRV record", record) log.Warnf("%q is not a valid SRV record", record)
continue continue

View File

@ -65,9 +65,9 @@ func init() {
prometheus.MustRegister(ec2SDRefreshDuration) prometheus.MustRegister(ec2SDRefreshDuration)
} }
// EC2Discovery periodically performs EC2-SD requests. It implements // Discovery periodically performs EC2-SD requests. It implements
// the TargetProvider interface. // the TargetProvider interface.
type EC2Discovery struct { type Discovery struct {
aws *aws.Config aws *aws.Config
interval time.Duration interval time.Duration
profile string profile string
@ -75,12 +75,12 @@ type EC2Discovery struct {
} }
// NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery { func NewDiscovery(conf *config.EC2SDConfig) *Discovery {
creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "") creds := credentials.NewStaticCredentials(conf.AccessKey, conf.SecretKey, "")
if conf.AccessKey == "" && conf.SecretKey == "" { if conf.AccessKey == "" && conf.SecretKey == "" {
creds = nil creds = nil
} }
return &EC2Discovery{ return &Discovery{
aws: &aws.Config{ aws: &aws.Config{
Region: &conf.Region, Region: &conf.Region,
Credentials: creds, Credentials: creds,
@ -92,12 +92,12 @@ func NewDiscovery(conf *config.EC2SDConfig) *EC2Discovery {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
ticker := time.NewTicker(ed.interval) ticker := time.NewTicker(d.interval)
defer ticker.Stop() defer ticker.Stop()
// Get an initial set right away. // Get an initial set right away.
tg, err := ed.refresh() tg, err := d.refresh()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
@ -111,7 +111,7 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
tg, err := ed.refresh() tg, err := d.refresh()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
continue continue
@ -128,7 +128,7 @@ func (ed *EC2Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
} }
} }
func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) { func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
t0 := time.Now() t0 := time.Now()
defer func() { defer func() {
ec2SDRefreshDuration.Observe(time.Since(t0).Seconds()) ec2SDRefreshDuration.Observe(time.Since(t0).Seconds())
@ -138,8 +138,8 @@ func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) {
}() }()
sess, err := session.NewSessionWithOptions(session.Options{ sess, err := session.NewSessionWithOptions(session.Options{
Config: *ed.aws, Config: *d.aws,
Profile: ed.profile, Profile: d.profile,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create aws session: %s", err) return nil, fmt.Errorf("could not create aws session: %s", err)
@ -147,7 +147,7 @@ func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) {
ec2s := ec2.New(sess) ec2s := ec2.New(sess)
tg = &config.TargetGroup{ tg = &config.TargetGroup{
Source: *ed.aws.Region, Source: *d.aws.Region,
} }
if err = ec2s.DescribeInstancesPages(nil, func(p *ec2.DescribeInstancesOutput, lastPage bool) bool { if err = ec2s.DescribeInstancesPages(nil, func(p *ec2.DescribeInstancesOutput, lastPage bool) bool {
for _, r := range p.Reservations { for _, r := range p.Reservations {
@ -159,7 +159,7 @@ func (ed *EC2Discovery) refresh() (tg *config.TargetGroup, err error) {
ec2LabelInstanceID: model.LabelValue(*inst.InstanceId), ec2LabelInstanceID: model.LabelValue(*inst.InstanceId),
} }
labels[ec2LabelPrivateIP] = model.LabelValue(*inst.PrivateIpAddress) labels[ec2LabelPrivateIP] = model.LabelValue(*inst.PrivateIpAddress)
addr := net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", ed.port)) addr := net.JoinHostPort(*inst.PrivateIpAddress, fmt.Sprintf("%d", d.port))
labels[model.AddressLabel] = model.LabelValue(addr) labels[model.AddressLabel] = model.LabelValue(addr)
if inst.PublicIpAddress != nil { if inst.PublicIpAddress != nil {

View File

@ -51,10 +51,10 @@ func init() {
prometheus.MustRegister(fileSDReadErrorsCount) prometheus.MustRegister(fileSDReadErrorsCount)
} }
// FileDiscovery provides service discovery functionality based // Discovery provides service discovery functionality based
// on files that contain target groups in JSON or YAML format. Refreshing // on files that contain target groups in JSON or YAML format. Refreshing
// happens using file watches and periodic refreshes. // happens using file watches and periodic refreshes.
type FileDiscovery struct { type Discovery struct {
paths []string paths []string
watcher *fsnotify.Watcher watcher *fsnotify.Watcher
interval time.Duration interval time.Duration
@ -66,17 +66,17 @@ type FileDiscovery struct {
} }
// NewDiscovery returns a new file discovery for the given paths. // NewDiscovery returns a new file discovery for the given paths.
func NewDiscovery(conf *config.FileSDConfig) *FileDiscovery { func NewDiscovery(conf *config.FileSDConfig) *Discovery {
return &FileDiscovery{ return &Discovery{
paths: conf.Files, paths: conf.Files,
interval: time.Duration(conf.RefreshInterval), interval: time.Duration(conf.RefreshInterval),
} }
} }
// listFiles returns a list of all files that match the configured patterns. // listFiles returns a list of all files that match the configured patterns.
func (fd *FileDiscovery) listFiles() []string { func (d *Discovery) listFiles() []string {
var paths []string var paths []string
for _, p := range fd.paths { for _, p := range d.paths {
files, err := filepath.Glob(p) files, err := filepath.Glob(p)
if err != nil { if err != nil {
log.Errorf("Error expanding glob %q: %s", p, err) log.Errorf("Error expanding glob %q: %s", p, err)
@ -89,36 +89,36 @@ func (fd *FileDiscovery) listFiles() []string {
// watchFiles sets watches on all full paths or directories that were configured for // watchFiles sets watches on all full paths or directories that were configured for
// this file discovery. // this file discovery.
func (fd *FileDiscovery) watchFiles() { func (d *Discovery) watchFiles() {
if fd.watcher == nil { if d.watcher == nil {
panic("no watcher configured") panic("no watcher configured")
} }
for _, p := range fd.paths { for _, p := range d.paths {
if idx := strings.LastIndex(p, "/"); idx > -1 { if idx := strings.LastIndex(p, "/"); idx > -1 {
p = p[:idx] p = p[:idx]
} else { } else {
p = "./" p = "./"
} }
if err := fd.watcher.Add(p); err != nil { if err := d.watcher.Add(p); err != nil {
log.Errorf("Error adding file watch for %q: %s", p, err) log.Errorf("Error adding file watch for %q: %s", p, err)
} }
} }
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer fd.stop() defer d.stop()
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
log.Errorf("Error creating file watcher: %s", err) log.Errorf("Error creating file watcher: %s", err)
return return
} }
fd.watcher = watcher d.watcher = watcher
fd.refresh(ctx, ch) d.refresh(ctx, ch)
ticker := time.NewTicker(fd.interval) ticker := time.NewTicker(d.interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -126,7 +126,7 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou
case <-ctx.Done(): case <-ctx.Done():
return return
case event := <-fd.watcher.Events: case event := <-d.watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation. // fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out. // It's unclear what they are and why they are sent - filter them out.
if len(event.Name) == 0 { if len(event.Name) == 0 {
@ -140,14 +140,14 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou
// different combinations of operations. For all practical purposes // different combinations of operations. For all practical purposes
// this is inaccurate. // this is inaccurate.
// The most reliable solution is to reload everything if anything happens. // The most reliable solution is to reload everything if anything happens.
fd.refresh(ctx, ch) d.refresh(ctx, ch)
case <-ticker.C: case <-ticker.C:
// Setting a new watch after an update might fail. Make sure we don't lose // Setting a new watch after an update might fail. Make sure we don't lose
// those files forever. // those files forever.
fd.refresh(ctx, ch) d.refresh(ctx, ch)
case err := <-fd.watcher.Errors: case err := <-d.watcher.Errors:
if err != nil { if err != nil {
log.Errorf("Error on file watch: %s", err) log.Errorf("Error on file watch: %s", err)
} }
@ -156,8 +156,8 @@ func (fd *FileDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGrou
} }
// stop shuts down the file watcher. // stop shuts down the file watcher.
func (fd *FileDiscovery) stop() { func (d *Discovery) stop() {
log.Debugf("Stopping file discovery for %s...", fd.paths) log.Debugf("Stopping file discovery for %s...", d.paths)
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
@ -166,37 +166,37 @@ func (fd *FileDiscovery) stop() {
go func() { go func() {
for { for {
select { select {
case <-fd.watcher.Errors: case <-d.watcher.Errors:
case <-fd.watcher.Events: case <-d.watcher.Events:
// Drain all events and errors. // Drain all events and errors.
case <-done: case <-done:
return return
} }
} }
}() }()
if err := fd.watcher.Close(); err != nil { if err := d.watcher.Close(); err != nil {
log.Errorf("Error closing file watcher for %s: %s", fd.paths, err) log.Errorf("Error closing file watcher for %s: %s", d.paths, err)
} }
log.Debugf("File discovery for %s stopped.", fd.paths) log.Debugf("File discovery for %s stopped.", d.paths)
} }
// refresh reads all files matching the discovery's patterns and sends the respective // refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel. // updated target groups through the channel.
func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup) {
t0 := time.Now() t0 := time.Now()
defer func() { defer func() {
fileSDScanDuration.Observe(time.Since(t0).Seconds()) fileSDScanDuration.Observe(time.Since(t0).Seconds())
}() }()
ref := map[string]int{} ref := map[string]int{}
for _, p := range fd.listFiles() { for _, p := range d.listFiles() {
tgroups, err := readFile(p) tgroups, err := readFile(p)
if err != nil { if err != nil {
fileSDReadErrorsCount.Inc() fileSDReadErrorsCount.Inc()
log.Errorf("Error reading file %q: %s", p, err) log.Errorf("Error reading file %q: %s", p, err)
// Prevent deletion down below. // Prevent deletion down below.
ref[p] = fd.lastRefresh[p] ref[p] = d.lastRefresh[p]
continue continue
} }
select { select {
@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.Target
ref[p] = len(tgroups) ref[p] = len(tgroups)
} }
// Send empty updates for sources that disappeared. // Send empty updates for sources that disappeared.
for f, n := range fd.lastRefresh { for f, n := range d.lastRefresh {
m, ok := ref[f] m, ok := ref[f]
if !ok || n > m { if !ok || n > m {
for i := m; i < n; i++ { for i := m; i < n; i++ {
@ -220,9 +220,9 @@ func (fd *FileDiscovery) refresh(ctx context.Context, ch chan<- []*config.Target
} }
} }
} }
fd.lastRefresh = ref d.lastRefresh = ref
fd.watchFiles() d.watchFiles()
} }
// fileSource returns a source ID for the i-th target group in the file. // fileSource returns a source ID for the i-th target group in the file.

View File

@ -44,9 +44,6 @@ const (
gceLabelInstanceStatus = gceLabel + "instance_status" gceLabelInstanceStatus = gceLabel + "instance_status"
gceLabelTags = gceLabel + "tags" gceLabelTags = gceLabel + "tags"
gceLabelMetadata = gceLabel + "metadata_" gceLabelMetadata = gceLabel + "metadata_"
// Constants for instrumentation.
namespace = "prometheus"
) )
var ( var (
@ -67,9 +64,9 @@ func init() {
prometheus.MustRegister(gceSDRefreshDuration) prometheus.MustRegister(gceSDRefreshDuration)
} }
// GCEDiscovery periodically performs GCE-SD requests. It implements // Discovery periodically performs GCE-SD requests. It implements
// the TargetProvider interface. // the TargetProvider interface.
type GCEDiscovery struct { type Discovery struct {
project string project string
zone string zone string
filter string filter string
@ -81,9 +78,9 @@ type GCEDiscovery struct {
tagSeparator string tagSeparator string
} }
// NewGCEDiscovery returns a new GCEDiscovery which periodically refreshes its targets. // NewDiscovery returns a new Discovery which periodically refreshes its targets.
func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) { func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
gd := &GCEDiscovery{ gd := &Discovery{
project: conf.Project, project: conf.Project,
zone: conf.Zone, zone: conf.Zone,
filter: conf.Filter, filter: conf.Filter,
@ -105,9 +102,9 @@ func NewDiscovery(conf *config.GCESDConfig) (*GCEDiscovery, error) {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
// Get an initial set right away. // Get an initial set right away.
tg, err := gd.refresh() tg, err := d.refresh()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} else { } else {
@ -117,13 +114,13 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
} }
} }
ticker := time.NewTicker(gd.interval) ticker := time.NewTicker(d.interval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
tg, err := gd.refresh() tg, err := d.refresh()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
continue continue
@ -138,7 +135,7 @@ func (gd *GCEDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup
} }
} }
func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) { func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
t0 := time.Now() t0 := time.Now()
defer func() { defer func() {
gceSDRefreshDuration.Observe(time.Since(t0).Seconds()) gceSDRefreshDuration.Observe(time.Since(t0).Seconds())
@ -148,12 +145,12 @@ func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) {
}() }()
tg = &config.TargetGroup{ tg = &config.TargetGroup{
Source: fmt.Sprintf("GCE_%s_%s", gd.project, gd.zone), Source: fmt.Sprintf("GCE_%s_%s", d.project, d.zone),
} }
ilc := gd.isvc.List(gd.project, gd.zone) ilc := d.isvc.List(d.project, d.zone)
if len(gd.filter) > 0 { if len(d.filter) > 0 {
ilc = ilc.Filter(gd.filter) ilc = ilc.Filter(d.filter)
} }
err = ilc.Pages(nil, func(l *compute.InstanceList) error { err = ilc.Pages(nil, func(l *compute.InstanceList) error {
for _, inst := range l.Items { for _, inst := range l.Items {
@ -161,7 +158,7 @@ func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) {
continue continue
} }
labels := model.LabelSet{ labels := model.LabelSet{
gceLabelProject: model.LabelValue(gd.project), gceLabelProject: model.LabelValue(d.project),
gceLabelZone: model.LabelValue(inst.Zone), gceLabelZone: model.LabelValue(inst.Zone),
gceLabelInstanceName: model.LabelValue(inst.Name), gceLabelInstanceName: model.LabelValue(inst.Name),
gceLabelInstanceStatus: model.LabelValue(inst.Status), gceLabelInstanceStatus: model.LabelValue(inst.Status),
@ -170,14 +167,14 @@ func (gd *GCEDiscovery) refresh() (tg *config.TargetGroup, err error) {
labels[gceLabelNetwork] = model.LabelValue(priIface.Network) labels[gceLabelNetwork] = model.LabelValue(priIface.Network)
labels[gceLabelSubnetwork] = model.LabelValue(priIface.Subnetwork) labels[gceLabelSubnetwork] = model.LabelValue(priIface.Subnetwork)
labels[gceLabelPrivateIP] = model.LabelValue(priIface.NetworkIP) labels[gceLabelPrivateIP] = model.LabelValue(priIface.NetworkIP)
addr := fmt.Sprintf("%s:%d", priIface.NetworkIP, gd.port) addr := fmt.Sprintf("%s:%d", priIface.NetworkIP, d.port)
labels[model.AddressLabel] = model.LabelValue(addr) labels[model.AddressLabel] = model.LabelValue(addr)
// Tags in GCE are usually only used for networking rules. // Tags in GCE are usually only used for networking rules.
if inst.Tags != nil && len(inst.Tags.Items) > 0 { if inst.Tags != nil && len(inst.Tags.Items) > 0 {
// We surround the separated list with the separator as well. This way regular expressions // We surround the separated list with the separator as well. This way regular expressions
// in relabeling rules don't have to consider tag positions. // in relabeling rules don't have to consider tag positions.
tags := gd.tagSeparator + strings.Join(inst.Tags.Items, gd.tagSeparator) + gd.tagSeparator tags := d.tagSeparator + strings.Join(inst.Tags.Items, d.tagSeparator) + d.tagSeparator
labels[gceLabelTags] = model.LabelValue(tags) labels[gceLabelTags] = model.LabelValue(tags)
} }

View File

@ -45,33 +45,33 @@ func makeEndpoints() *v1.Endpoints {
Namespace: "default", Namespace: "default",
}, },
Subsets: []v1.EndpointSubset{ Subsets: []v1.EndpointSubset{
v1.EndpointSubset{ {
Addresses: []v1.EndpointAddress{ Addresses: []v1.EndpointAddress{
v1.EndpointAddress{ {
IP: "1.2.3.4", IP: "1.2.3.4",
}, },
}, },
Ports: []v1.EndpointPort{ Ports: []v1.EndpointPort{
v1.EndpointPort{ {
Name: "testport", Name: "testport",
Port: 9000, Port: 9000,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
}, },
}, },
}, },
v1.EndpointSubset{ {
Addresses: []v1.EndpointAddress{ Addresses: []v1.EndpointAddress{
v1.EndpointAddress{ {
IP: "2.3.4.5", IP: "2.3.4.5",
}, },
}, },
NotReadyAddresses: []v1.EndpointAddress{ NotReadyAddresses: []v1.EndpointAddress{
v1.EndpointAddress{ {
IP: "2.3.4.5", IP: "2.3.4.5",
}, },
}, },
Ports: []v1.EndpointPort{ Ports: []v1.EndpointPort{
v1.EndpointPort{ {
Name: "testport", Name: "testport",
Port: 9001, Port: 9001,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
@ -89,21 +89,21 @@ func TestEndpointsDiscoveryInitial(t *testing.T) {
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true", "__meta_kubernetes_endpoint_ready": "true",
}, },
model.LabelSet{ {
"__address__": "2.3.4.5:9001", "__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true", "__meta_kubernetes_endpoint_ready": "true",
}, },
model.LabelSet{ {
"__address__": "2.3.4.5:9001", "__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_port_protocol": "TCP",
@ -130,20 +130,20 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
Spec: v1.PodSpec{ Spec: v1.PodSpec{
NodeName: "testnode", NodeName: "testnode",
Containers: []v1.Container{ Containers: []v1.Container{
v1.Container{ {
Name: "c1", Name: "c1",
Ports: []v1.ContainerPort{ Ports: []v1.ContainerPort{
v1.ContainerPort{ {
Name: "mainport", Name: "mainport",
ContainerPort: 9000, ContainerPort: 9000,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
}, },
}, },
}, },
v1.Container{ {
Name: "c2", Name: "c2",
Ports: []v1.ContainerPort{ Ports: []v1.ContainerPort{
v1.ContainerPort{ {
Name: "sideport", Name: "sideport",
ContainerPort: 9001, ContainerPort: 9001,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
@ -169,9 +169,9 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
Namespace: "default", Namespace: "default",
}, },
Subsets: []v1.EndpointSubset{ Subsets: []v1.EndpointSubset{
v1.EndpointSubset{ {
Addresses: []v1.EndpointAddress{ Addresses: []v1.EndpointAddress{
v1.EndpointAddress{ {
IP: "4.3.2.1", IP: "4.3.2.1",
TargetRef: &v1.ObjectReference{ TargetRef: &v1.ObjectReference{
Kind: "Pod", Kind: "Pod",
@ -181,7 +181,7 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
}, },
}, },
Ports: []v1.EndpointPort{ Ports: []v1.EndpointPort{
v1.EndpointPort{ {
Name: "testport", Name: "testport",
Port: 9000, Port: 9000,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
@ -194,9 +194,9 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
}() }()
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "4.3.2.1:9000", "__address__": "4.3.2.1:9000",
"__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_port_protocol": "TCP",
@ -211,7 +211,7 @@ func TestEndpointsDiscoveryAdd(t *testing.T) {
"__meta_kubernetes_pod_container_port_number": "9000", "__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP", "__meta_kubernetes_pod_container_port_protocol": "TCP",
}, },
model.LabelSet{ {
"__address__": "1.2.3.4:9001", "__address__": "1.2.3.4:9001",
"__meta_kubernetes_pod_name": "testpod", "__meta_kubernetes_pod_name": "testpod",
"__meta_kubernetes_pod_ip": "1.2.3.4", "__meta_kubernetes_pod_ip": "1.2.3.4",
@ -242,7 +242,7 @@ func TestEndpointsDiscoveryDelete(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() }, afterStart: func() { go func() { eps.Delete(makeEndpoints()) }() },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "endpoints/default/testendpoints", Source: "endpoints/default/testendpoints",
}, },
}, },
@ -257,7 +257,7 @@ func TestEndpointsDiscoveryDeleteUnknownCacheState(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() }, afterStart: func() { go func() { eps.Delete(cache.DeletedFinalStateUnknown{Obj: makeEndpoints()}) }() },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "endpoints/default/testendpoints", Source: "endpoints/default/testendpoints",
}, },
}, },
@ -278,28 +278,28 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) {
Namespace: "default", Namespace: "default",
}, },
Subsets: []v1.EndpointSubset{ Subsets: []v1.EndpointSubset{
v1.EndpointSubset{ {
Addresses: []v1.EndpointAddress{ Addresses: []v1.EndpointAddress{
v1.EndpointAddress{ {
IP: "1.2.3.4", IP: "1.2.3.4",
}, },
}, },
Ports: []v1.EndpointPort{ Ports: []v1.EndpointPort{
v1.EndpointPort{ {
Name: "testport", Name: "testport",
Port: 9000, Port: 9000,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
}, },
}, },
}, },
v1.EndpointSubset{ {
Addresses: []v1.EndpointAddress{ Addresses: []v1.EndpointAddress{
v1.EndpointAddress{ {
IP: "2.3.4.5", IP: "2.3.4.5",
}, },
}, },
Ports: []v1.EndpointPort{ Ports: []v1.EndpointPort{
v1.EndpointPort{ {
Name: "testport", Name: "testport",
Port: 9001, Port: 9001,
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
@ -311,15 +311,15 @@ func TestEndpointsDiscoveryUpdate(t *testing.T) {
}() }()
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_port_protocol": "TCP",
"__meta_kubernetes_endpoint_ready": "true", "__meta_kubernetes_endpoint_ready": "true",
}, },
model.LabelSet{ {
"__address__": "2.3.4.5:9001", "__address__": "2.3.4.5:9001",
"__meta_kubernetes_endpoint_port_name": "testport", "__meta_kubernetes_endpoint_port_name": "testport",
"__meta_kubernetes_endpoint_port_protocol": "TCP", "__meta_kubernetes_endpoint_port_protocol": "TCP",

View File

@ -59,9 +59,9 @@ func init() {
} }
} }
// Kubernetes implements the TargetProvider interface for discovering // Discovery implements the TargetProvider interface for discovering
// targets from Kubernetes. // targets from Kubernetes.
type Kubernetes struct { type Discovery struct {
client kubernetes.Interface client kubernetes.Interface
role config.KubernetesRole role config.KubernetesRole
logger log.Logger logger log.Logger
@ -76,7 +76,7 @@ func init() {
} }
// New creates a new Kubernetes discovery for the given role. // New creates a new Kubernetes discovery for the given role.
func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) { func New(l log.Logger, conf *config.KubernetesSDConfig) (*Discovery, error) {
var ( var (
kcfg *rest.Config kcfg *rest.Config
err error err error
@ -136,7 +136,7 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Kubernetes{ return &Discovery{
client: c, client: c,
logger: l, logger: l,
role: conf.Role, role: conf.Role,
@ -146,16 +146,16 @@ func New(l log.Logger, conf *config.KubernetesSDConfig) (*Kubernetes, error) {
const resyncPeriod = 10 * time.Minute const resyncPeriod = 10 * time.Minute
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
rclient := k.client.Core().GetRESTClient() rclient := d.client.Core().GetRESTClient()
switch k.role { switch d.role {
case "endpoints": case "endpoints":
elw := cache.NewListWatchFromClient(rclient, "endpoints", api.NamespaceAll, nil) elw := cache.NewListWatchFromClient(rclient, "endpoints", api.NamespaceAll, nil)
slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil)
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
eps := NewEndpoints( eps := NewEndpoints(
k.logger.With("kubernetes_sd", "endpoint"), d.logger.With("kubernetes_sd", "endpoint"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod), cache.NewSharedInformer(elw, &apiv1.Endpoints{}, resyncPeriod),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
@ -178,7 +178,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case "pod": case "pod":
plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil) plw := cache.NewListWatchFromClient(rclient, "pods", api.NamespaceAll, nil)
pod := NewPod( pod := NewPod(
k.logger.With("kubernetes_sd", "pod"), d.logger.With("kubernetes_sd", "pod"),
cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod), cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncPeriod),
) )
go pod.informer.Run(ctx.Done()) go pod.informer.Run(ctx.Done())
@ -191,7 +191,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case "service": case "service":
slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil) slw := cache.NewListWatchFromClient(rclient, "services", api.NamespaceAll, nil)
svc := NewService( svc := NewService(
k.logger.With("kubernetes_sd", "service"), d.logger.With("kubernetes_sd", "service"),
cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod), cache.NewSharedInformer(slw, &apiv1.Service{}, resyncPeriod),
) )
go svc.informer.Run(ctx.Done()) go svc.informer.Run(ctx.Done())
@ -204,7 +204,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
case "node": case "node":
nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil) nlw := cache.NewListWatchFromClient(rclient, "nodes", api.NamespaceAll, nil)
node := NewNode( node := NewNode(
k.logger.With("kubernetes_sd", "node"), d.logger.With("kubernetes_sd", "node"),
cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod), cache.NewSharedInformer(nlw, &apiv1.Node{}, resyncPeriod),
) )
go node.informer.Run(ctx.Done()) go node.informer.Run(ctx.Done())
@ -215,7 +215,7 @@ func (k *Kubernetes) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
node.Run(ctx, ch) node.Run(ctx, ch)
default: default:
k.logger.Errorf("unknown Kubernetes discovery kind %q", k.role) d.logger.Errorf("unknown Kubernetes discovery kind %q", d.role)
} }
<-ctx.Done() <-ctx.Done()

View File

@ -167,7 +167,7 @@ func makeNode(name, address string, labels map[string]string, annotations map[st
}, },
Status: v1.NodeStatus{ Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{ Addresses: []v1.NodeAddress{
v1.NodeAddress{ {
Type: v1.NodeInternalIP, Type: v1.NodeInternalIP,
Address: address, Address: address,
}, },
@ -197,9 +197,9 @@ func TestNodeDiscoveryInitial(t *testing.T) {
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:10250", "__address__": "1.2.3.4:10250",
"instance": "test", "instance": "test",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
@ -223,9 +223,9 @@ func TestNodeDiscoveryAdd(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Add(makeEnumeratedNode(1)) }() }, afterStart: func() { go func() { i.Add(makeEnumeratedNode(1)) }() },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:10250", "__address__": "1.2.3.4:10250",
"instance": "test1", "instance": "test1",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
@ -248,9 +248,9 @@ func TestNodeDiscoveryDelete(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Delete(makeEnumeratedNode(0)) }() }, afterStart: func() { go func() { i.Delete(makeEnumeratedNode(0)) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:10250", "__address__": "1.2.3.4:10250",
"instance": "test0", "instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
@ -263,7 +263,7 @@ func TestNodeDiscoveryDelete(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "node/test0", Source: "node/test0",
}, },
}, },
@ -278,9 +278,9 @@ func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() }, afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeEnumeratedNode(0)}) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:10250", "__address__": "1.2.3.4:10250",
"instance": "test0", "instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
@ -293,7 +293,7 @@ func TestNodeDiscoveryDeleteUnknownCacheState(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "node/test0", Source: "node/test0",
}, },
}, },
@ -319,9 +319,9 @@ func TestNodeDiscoveryUpdate(t *testing.T) {
}() }()
}, },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:10250", "__address__": "1.2.3.4:10250",
"instance": "test0", "instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4",
@ -334,9 +334,9 @@ func TestNodeDiscoveryUpdate(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:10250", "__address__": "1.2.3.4:10250",
"instance": "test0", "instance": "test0",
"__meta_kubernetes_node_address_InternalIP": "1.2.3.4", "__meta_kubernetes_node_address_InternalIP": "1.2.3.4",

View File

@ -47,22 +47,22 @@ func makeMultiPortPod() *v1.Pod {
Spec: v1.PodSpec{ Spec: v1.PodSpec{
NodeName: "testnode", NodeName: "testnode",
Containers: []v1.Container{ Containers: []v1.Container{
v1.Container{ {
Name: "testcontainer0", Name: "testcontainer0",
Ports: []v1.ContainerPort{ Ports: []v1.ContainerPort{
v1.ContainerPort{ {
Name: "testport0", Name: "testport0",
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
ContainerPort: int32(9000), ContainerPort: int32(9000),
}, },
v1.ContainerPort{ {
Name: "testport1", Name: "testport1",
Protocol: v1.ProtocolUDP, Protocol: v1.ProtocolUDP,
ContainerPort: int32(9001), ContainerPort: int32(9001),
}, },
}, },
}, },
v1.Container{ {
Name: "testcontainer1", Name: "testcontainer1",
}, },
}, },
@ -71,7 +71,7 @@ func makeMultiPortPod() *v1.Pod {
PodIP: "1.2.3.4", PodIP: "1.2.3.4",
HostIP: "2.3.4.5", HostIP: "2.3.4.5",
Conditions: []v1.PodCondition{ Conditions: []v1.PodCondition{
v1.PodCondition{ {
Type: v1.PodReady, Type: v1.PodReady,
Status: v1.ConditionTrue, Status: v1.ConditionTrue,
}, },
@ -89,10 +89,10 @@ func makePod() *v1.Pod {
Spec: v1.PodSpec{ Spec: v1.PodSpec{
NodeName: "testnode", NodeName: "testnode",
Containers: []v1.Container{ Containers: []v1.Container{
v1.Container{ {
Name: "testcontainer", Name: "testcontainer",
Ports: []v1.ContainerPort{ Ports: []v1.ContainerPort{
v1.ContainerPort{ {
Name: "testport", Name: "testport",
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
ContainerPort: int32(9000), ContainerPort: int32(9000),
@ -105,7 +105,7 @@ func makePod() *v1.Pod {
PodIP: "1.2.3.4", PodIP: "1.2.3.4",
HostIP: "2.3.4.5", HostIP: "2.3.4.5",
Conditions: []v1.PodCondition{ Conditions: []v1.PodCondition{
v1.PodCondition{ {
Type: v1.PodReady, Type: v1.PodReady,
Status: v1.ConditionTrue, Status: v1.ConditionTrue,
}, },
@ -121,23 +121,23 @@ func TestPodDiscoveryInitial(t *testing.T) {
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer0", "__meta_kubernetes_pod_container_name": "testcontainer0",
"__meta_kubernetes_pod_container_port_name": "testport0", "__meta_kubernetes_pod_container_port_name": "testport0",
"__meta_kubernetes_pod_container_port_number": "9000", "__meta_kubernetes_pod_container_port_number": "9000",
"__meta_kubernetes_pod_container_port_protocol": "TCP", "__meta_kubernetes_pod_container_port_protocol": "TCP",
}, },
model.LabelSet{ {
"__address__": "1.2.3.4:9001", "__address__": "1.2.3.4:9001",
"__meta_kubernetes_pod_container_name": "testcontainer0", "__meta_kubernetes_pod_container_name": "testcontainer0",
"__meta_kubernetes_pod_container_port_name": "testport1", "__meta_kubernetes_pod_container_port_name": "testport1",
"__meta_kubernetes_pod_container_port_number": "9001", "__meta_kubernetes_pod_container_port_number": "9001",
"__meta_kubernetes_pod_container_port_protocol": "UDP", "__meta_kubernetes_pod_container_port_protocol": "UDP",
}, },
model.LabelSet{ {
"__address__": "1.2.3.4", "__address__": "1.2.3.4",
"__meta_kubernetes_pod_container_name": "testcontainer1", "__meta_kubernetes_pod_container_name": "testcontainer1",
}, },
@ -165,9 +165,9 @@ func TestPodDiscoveryAdd(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Add(makePod()) }() }, afterStart: func() { go func() { i.Add(makePod()) }() },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport", "__meta_kubernetes_pod_container_port_name": "testport",
@ -197,9 +197,9 @@ func TestPodDiscoveryDelete(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Delete(makePod()) }() }, afterStart: func() { go func() { i.Delete(makePod()) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport", "__meta_kubernetes_pod_container_port_name": "testport",
@ -219,7 +219,7 @@ func TestPodDiscoveryDelete(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "pod/default/testpod", Source: "pod/default/testpod",
}, },
}, },
@ -234,9 +234,9 @@ func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() }, afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makePod()}) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport", "__meta_kubernetes_pod_container_port_name": "testport",
@ -256,7 +256,7 @@ func TestPodDiscoveryDeleteUnknownCacheState(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "pod/default/testpod", Source: "pod/default/testpod",
}, },
}, },
@ -273,10 +273,10 @@ func TestPodDiscoveryUpdate(t *testing.T) {
Spec: v1.PodSpec{ Spec: v1.PodSpec{
NodeName: "testnode", NodeName: "testnode",
Containers: []v1.Container{ Containers: []v1.Container{
v1.Container{ {
Name: "testcontainer", Name: "testcontainer",
Ports: []v1.ContainerPort{ Ports: []v1.ContainerPort{
v1.ContainerPort{ {
Name: "testport", Name: "testport",
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
ContainerPort: int32(9000), ContainerPort: int32(9000),
@ -295,9 +295,9 @@ func TestPodDiscoveryUpdate(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Update(makePod()) }() }, afterStart: func() { go func() { i.Update(makePod()) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport", "__meta_kubernetes_pod_container_port_name": "testport",
@ -317,9 +317,9 @@ func TestPodDiscoveryUpdate(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__address__": "1.2.3.4:9000", "__address__": "1.2.3.4:9000",
"__meta_kubernetes_pod_container_name": "testcontainer", "__meta_kubernetes_pod_container_name": "testcontainer",
"__meta_kubernetes_pod_container_port_name": "testport", "__meta_kubernetes_pod_container_port_name": "testport",

View File

@ -47,12 +47,12 @@ func makeMultiPortService() *v1.Service {
}, },
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{ Ports: []v1.ServicePort{
v1.ServicePort{ {
Name: "testport0", Name: "testport0",
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
Port: int32(30900), Port: int32(30900),
}, },
v1.ServicePort{ {
Name: "testport1", Name: "testport1",
Protocol: v1.ProtocolUDP, Protocol: v1.ProtocolUDP,
Port: int32(30901), Port: int32(30901),
@ -70,7 +70,7 @@ func makeSuffixedService(suffix string) *v1.Service {
}, },
Spec: v1.ServiceSpec{ Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{ Ports: []v1.ServicePort{
v1.ServicePort{ {
Name: "testport", Name: "testport",
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
Port: int32(30900), Port: int32(30900),
@ -91,14 +91,14 @@ func TestServiceDiscoveryInitial(t *testing.T) {
k8sDiscoveryTest{ k8sDiscoveryTest{
discovery: n, discovery: n,
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "TCP", "__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900", "__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport0", "__meta_kubernetes_service_port_name": "testport0",
}, },
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "UDP", "__meta_kubernetes_service_port_protocol": "UDP",
"__address__": "testservice.default.svc:30901", "__address__": "testservice.default.svc:30901",
"__meta_kubernetes_service_port_name": "testport1", "__meta_kubernetes_service_port_name": "testport1",
@ -123,9 +123,9 @@ func TestServiceDiscoveryAdd(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Add(makeService()) }() }, afterStart: func() { go func() { i.Add(makeService()) }() },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "TCP", "__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900", "__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport", "__meta_kubernetes_service_port_name": "testport",
@ -149,9 +149,9 @@ func TestServiceDiscoveryDelete(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Delete(makeService()) }() }, afterStart: func() { go func() { i.Delete(makeService()) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "TCP", "__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900", "__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport", "__meta_kubernetes_service_port_name": "testport",
@ -165,7 +165,7 @@ func TestServiceDiscoveryDelete(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "svc/default/testservice", Source: "svc/default/testservice",
}, },
}, },
@ -180,9 +180,9 @@ func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() }, afterStart: func() { go func() { i.Delete(cache.DeletedFinalStateUnknown{Obj: makeService()}) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "TCP", "__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900", "__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport", "__meta_kubernetes_service_port_name": "testport",
@ -196,7 +196,7 @@ func TestServiceDiscoveryDeleteUnknownCacheState(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Source: "svc/default/testservice", Source: "svc/default/testservice",
}, },
}, },
@ -211,9 +211,9 @@ func TestServiceDiscoveryUpdate(t *testing.T) {
discovery: n, discovery: n,
afterStart: func() { go func() { i.Update(makeMultiPortService()) }() }, afterStart: func() { go func() { i.Update(makeMultiPortService()) }() },
expectedInitial: []*config.TargetGroup{ expectedInitial: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "TCP", "__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900", "__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport", "__meta_kubernetes_service_port_name": "testport",
@ -227,14 +227,14 @@ func TestServiceDiscoveryUpdate(t *testing.T) {
}, },
}, },
expectedRes: []*config.TargetGroup{ expectedRes: []*config.TargetGroup{
&config.TargetGroup{ {
Targets: []model.LabelSet{ Targets: []model.LabelSet{
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "TCP", "__meta_kubernetes_service_port_protocol": "TCP",
"__address__": "testservice.default.svc:30900", "__address__": "testservice.default.svc:30900",
"__meta_kubernetes_service_port_name": "testport0", "__meta_kubernetes_service_port_name": "testport0",
}, },
model.LabelSet{ {
"__meta_kubernetes_service_port_protocol": "UDP", "__meta_kubernetes_service_port_protocol": "UDP",
"__address__": "testservice.default.svc:30901", "__address__": "testservice.default.svc:30901",
"__meta_kubernetes_service_port_name": "testport1", "__meta_kubernetes_service_port_name": "testport1",

View File

@ -81,7 +81,7 @@ type Discovery struct {
token string token string
} }
// Initialize sets up the discovery for usage. // NewDiscovery returns a new Marathon Discovery.
func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) { func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
tls, err := httputil.NewTLSConfig(conf.TLSConfig) tls, err := httputil.NewTLSConfig(conf.TLSConfig)
if err != nil { if err != nil {
@ -114,13 +114,13 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-time.After(md.refreshInterval): case <-time.After(d.refreshInterval):
err := md.updateServices(ctx, ch) err := d.updateServices(ctx, ch)
if err != nil { if err != nil {
log.Errorf("Error while updating services: %s", err) log.Errorf("Error while updating services: %s", err)
} }
@ -128,7 +128,7 @@ func (md *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
} }
func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.TargetGroup) (err error) { func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.TargetGroup) (err error) {
t0 := time.Now() t0 := time.Now()
defer func() { defer func() {
refreshDuration.Observe(time.Since(t0).Seconds()) refreshDuration.Observe(time.Since(t0).Seconds())
@ -137,7 +137,7 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar
} }
}() }()
targetMap, err := md.fetchTargetGroups() targetMap, err := d.fetchTargetGroups()
if err != nil { if err != nil {
return err return err
} }
@ -154,7 +154,7 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar
} }
// Remove services which did disappear. // Remove services which did disappear.
for source := range md.lastRefresh { for source := range d.lastRefresh {
_, ok := targetMap[source] _, ok := targetMap[source]
if !ok { if !ok {
select { select {
@ -166,13 +166,13 @@ func (md *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Tar
} }
} }
md.lastRefresh = targetMap d.lastRefresh = targetMap
return nil return nil
} }
func (md *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) { func (d *Discovery) fetchTargetGroups() (map[string]*config.TargetGroup, error) {
url := RandomAppsURL(md.servers) url := RandomAppsURL(d.servers)
apps, err := md.appsClient(md.client, url, md.token) apps, err := d.appsClient(d.client, url, d.token)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -125,27 +125,19 @@ func TestMarathonSDSendGroup(t *testing.T) {
} }
func TestMarathonSDRemoveApp(t *testing.T) { func TestMarathonSDRemoveApp(t *testing.T) {
var ch = make(chan []*config.TargetGroup) var ch = make(chan []*config.TargetGroup, 1)
md, err := NewDiscovery(&conf) md, err := NewDiscovery(&conf)
if err != nil { if err != nil {
t.Fatalf("%s", err) t.Fatalf("%s", err)
} }
md.appsClient = func(client *http.Client, url, token string) (*AppList, error) { md.appsClient = func(client *http.Client, url, token string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 1), nil return marathonTestAppList(marathonValidLabel, 1), nil
} }
go func() {
up1 := (<-ch)[0]
up2 := (<-ch)[0]
if up2.Source != up1.Source {
t.Fatalf("Source is different: %s", up2)
if len(up2.Targets) > 0 {
t.Fatalf("Got a non-empty target set: %s", up2.Targets)
}
}
}()
if err := md.updateServices(context.Background(), ch); err != nil { if err := md.updateServices(context.Background(), ch); err != nil {
t.Fatalf("Got error on first update: %s", err) t.Fatalf("Got error on first update: %s", err)
} }
up1 := (<-ch)[0]
md.appsClient = func(client *http.Client, url, token string) (*AppList, error) { md.appsClient = func(client *http.Client, url, token string) (*AppList, error) {
return marathonTestAppList(marathonValidLabel, 0), nil return marathonTestAppList(marathonValidLabel, 0), nil
@ -153,6 +145,14 @@ func TestMarathonSDRemoveApp(t *testing.T) {
if err := md.updateServices(context.Background(), ch); err != nil { if err := md.updateServices(context.Background(), ch); err != nil {
t.Fatalf("Got error on second update: %s", err) t.Fatalf("Got error on second update: %s", err)
} }
up2 := (<-ch)[0]
if up2.Source != up1.Source {
t.Fatalf("Source is different: %s", up2)
if len(up2.Targets) > 0 {
t.Fatalf("Got a non-empty target set: %s", up2.Targets)
}
}
} }
func TestMarathonSDRunAndStop(t *testing.T) { func TestMarathonSDRunAndStop(t *testing.T) {
@ -160,6 +160,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
refreshInterval = model.Duration(time.Millisecond * 10) refreshInterval = model.Duration(time.Millisecond * 10)
conf = config.MarathonSDConfig{Servers: testServers, RefreshInterval: refreshInterval} conf = config.MarathonSDConfig{Servers: testServers, RefreshInterval: refreshInterval}
ch = make(chan []*config.TargetGroup) ch = make(chan []*config.TargetGroup)
doneCh = make(chan error)
) )
md, err := NewDiscovery(&conf) md, err := NewDiscovery(&conf)
if err != nil { if err != nil {
@ -171,21 +172,21 @@ func TestMarathonSDRunAndStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {
for { md.Run(ctx, ch)
select { close(doneCh)
case _, ok := <-ch:
if !ok {
return
}
cancel()
case <-time.After(md.refreshInterval * 3):
cancel()
t.Fatalf("Update took too long.")
}
}
}() }()
md.Run(ctx, ch) timeout := time.After(md.refreshInterval * 3)
for {
select {
case <-ch:
cancel()
case <-doneCh:
return
case <-timeout:
t.Fatalf("Update took too long.")
}
}
} }
func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *AppList { func marathonTestZeroTaskPortAppList(labels map[string]string, runningTasks int) *AppList {

View File

@ -29,7 +29,9 @@ import (
"github.com/prometheus/prometheus/util/treecache" "github.com/prometheus/prometheus/util/treecache"
) )
type ZookeeperDiscovery struct { // Discovery implements the TargetProvider interface for discovering
// targets from Zookeeper.
type Discovery struct {
conn *zk.Conn conn *zk.Conn
sources map[string]*config.TargetGroup sources map[string]*config.TargetGroup
@ -40,13 +42,13 @@ type ZookeeperDiscovery struct {
parse func(data []byte, path string) (model.LabelSet, error) parse func(data []byte, path string) (model.LabelSet, error)
} }
// NewNerveDiscovery returns a new NerveDiscovery for the given config. // NewNerveDiscovery returns a new Discovery for the given Nerve config.
func NewNerveDiscovery(conf *config.NerveSDConfig) *ZookeeperDiscovery { func NewNerveDiscovery(conf *config.NerveSDConfig) *Discovery {
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember) return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
} }
// NewServersetDiscovery returns a new ServersetDiscovery for the given config. // NewServersetDiscovery returns a new Discovery for the given serverset config.
func NewServersetDiscovery(conf *config.ServersetSDConfig) *ZookeeperDiscovery { func NewServersetDiscovery(conf *config.ServersetSDConfig) *Discovery {
return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember) return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
} }
@ -57,14 +59,14 @@ func NewDiscovery(
timeout time.Duration, timeout time.Duration,
paths []string, paths []string,
pf func(data []byte, path string) (model.LabelSet, error), pf func(data []byte, path string) (model.LabelSet, error),
) *ZookeeperDiscovery { ) *Discovery {
conn, _, err := zk.Connect(srvs, time.Duration(timeout)) conn, _, err := zk.Connect(srvs, timeout)
conn.SetLogger(treecache.ZookeeperLogger{}) conn.SetLogger(treecache.ZookeeperLogger{})
if err != nil { if err != nil {
return nil return nil
} }
updates := make(chan treecache.ZookeeperTreeCacheEvent) updates := make(chan treecache.ZookeeperTreeCacheEvent)
sd := &ZookeeperDiscovery{ sd := &Discovery{
conn: conn, conn: conn,
updates: updates, updates: updates,
sources: map[string]*config.TargetGroup{}, sources: map[string]*config.TargetGroup{},
@ -77,34 +79,34 @@ func NewDiscovery(
} }
// Run implements the TargetProvider interface. // Run implements the TargetProvider interface.
func (sd *ZookeeperDiscovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
defer func() { defer func() {
for _, tc := range sd.treeCaches { for _, tc := range d.treeCaches {
tc.Stop() tc.Stop()
} }
// Drain event channel in case the treecache leaks goroutines otherwise. // Drain event channel in case the treecache leaks goroutines otherwise.
for range sd.updates { for range d.updates {
} }
sd.conn.Close() d.conn.Close()
}() }()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
case event := <-sd.updates: case event := <-d.updates:
tg := &config.TargetGroup{ tg := &config.TargetGroup{
Source: event.Path, Source: event.Path,
} }
if event.Data != nil { if event.Data != nil {
labelSet, err := sd.parse(*event.Data, event.Path) labelSet, err := d.parse(*event.Data, event.Path)
if err == nil { if err == nil {
tg.Targets = []model.LabelSet{labelSet} tg.Targets = []model.LabelSet{labelSet}
sd.sources[event.Path] = tg d.sources[event.Path] = tg
} else { } else {
delete(sd.sources, event.Path) delete(d.sources, event.Path)
} }
} else { } else {
delete(sd.sources, event.Path) delete(d.sources, event.Path)
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():