Allow replacing job targets via HTTP API.

This roughly comprises the following changes:

- index target pools by job instead of scrape interval
- make targets within a pool exchangable while preserving existing
  health state for targets
- allow exchanging targets via HTTP API (PUT)
- show target lists in /status (experimental, for own debug use)
pull/86/head
Julius Volz 2013-02-22 21:07:35 +01:00
parent fc39a92a06
commit f1fc7d717a
12 changed files with 315 additions and 107 deletions

View File

@ -58,7 +58,7 @@ func (config *Config) AddJob(options map[string]string, targets []Targets) error
if len(targets) == 0 {
return fmt.Errorf("No targets configured for job '%v'", name)
}
job := &JobConfig{
job := JobConfig{
Targets: tmpJobTargets,
}
for option, value := range options {
@ -66,10 +66,20 @@ func (config *Config) AddJob(options map[string]string, targets []Targets) error
return err
}
}
config.Jobs = append(config.Jobs, *job)
config.Jobs = append(config.Jobs, job)
return nil
}
func (config *Config) GetJobByName(name string) (jobConfig *JobConfig) {
for _, job := range config.Jobs {
if job.Name == name {
jobConfig = &job
break
}
}
return
}
func (config *GlobalConfig) SetOption(option string, value string) (err error) {
switch option {
case "scrape_interval":

View File

@ -79,8 +79,12 @@ type Target interface {
// points in this interface, this one is the best candidate to change given
// the ways to express the endpoint.
Address() string
// How frequently queries occur.
Interval() time.Duration
// Return the target's base labels.
BaseLabels() model.LabelSet
// Merge a new externally supplied target definition (e.g. with changed base
// labels) into an old target definition for the same endpoint. Preserve
// remaining information - like health state - from the old target.
Merge(newTarget Target)
}
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
@ -94,19 +98,15 @@ type target struct {
// What is the deadline for the HTTP or HTTPS against this endpoint.
Deadline time.Duration
// Any base labels that are added to this target and its metrics.
BaseLabels model.LabelSet
// XXX: Move this to a field with the target manager initialization instead of here.
interval time.Duration
baseLabels model.LabelSet
}
// Furnish a reasonably configured target for querying.
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target {
func NewTarget(address string, deadline time.Duration, baseLabels model.LabelSet) Target {
target := &target{
address: address,
Deadline: deadline,
interval: interval,
BaseLabels: baseLabels,
baseLabels: baseLabels,
}
scheduler := &healthScheduler{
@ -155,7 +155,7 @@ func (t *target) Scrape(earliest time.Time, results chan format.Result) (err err
// XXX: This is a wart; we need to handle this more gracefully down the
// road, especially once we have service discovery support.
baseLabels := model.LabelSet{instance: model.LabelValue(t.Address())}
for baseLabel, baseValue := range t.BaseLabels {
for baseLabel, baseValue := range t.baseLabels {
baseLabels[baseLabel] = baseValue
}
@ -200,6 +200,16 @@ func (t target) Address() string {
return t.address
}
func (t target) Interval() time.Duration {
return t.interval
func (t target) BaseLabels() model.LabelSet {
return t.baseLabels
}
// Merge a new externally supplied target definition (e.g. with changed base
// labels) into an old target definition for the same endpoint. Preserve
// remaining information - like health state - from the old target.
func (t *target) Merge(newTarget Target) {
if t.Address() != newTarget.Address() {
panic("targets don't refer to the same endpoint")
}
t.baseLabels = newTarget.BaseLabels()
}

View File

@ -14,7 +14,6 @@
package retrieval
import (
"container/heap"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/retrieval/format"
@ -25,14 +24,16 @@ import (
type TargetManager interface {
acquire()
release()
Add(t Target)
AddTarget(job *config.JobConfig, t Target, defaultScrapeInterval time.Duration)
ReplaceTargets(job *config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration)
Remove(t Target)
AddTargetsFromConfig(config *config.Config)
Pools() map[string]*TargetPool
}
type targetManager struct {
requestAllowance chan bool
pools map[time.Duration]*TargetPool
poolsByJob map[string]*TargetPool
results chan format.Result
}
@ -40,7 +41,7 @@ func NewTargetManager(results chan format.Result, requestAllowance int) TargetMa
return &targetManager{
requestAllowance: make(chan bool, requestAllowance),
results: results,
pools: make(map[time.Duration]*TargetPool),
poolsByJob: make(map[string]*TargetPool),
}
}
@ -52,17 +53,32 @@ func (m *targetManager) release() {
<-m.requestAllowance
}
func (m *targetManager) Add(t Target) {
targetPool, ok := m.pools[t.Interval()]
func (m *targetManager) TargetPoolForJob(job *config.JobConfig, defaultScrapeInterval time.Duration) (targetPool *TargetPool) {
targetPool, ok := m.poolsByJob[job.Name]
if !ok {
targetPool = NewTargetPool(m)
log.Printf("Pool %s does not exist; creating and starting...", t.Interval())
go targetPool.Run(m.results, t.Interval())
}
log.Printf("Pool for job %s does not exist; creating and starting...", job.Name)
heap.Push(targetPool, t)
m.pools[t.Interval()] = targetPool
interval := job.ScrapeInterval
if interval == 0 {
interval = defaultScrapeInterval
}
m.poolsByJob[job.Name] = targetPool
go targetPool.Run(m.results, interval)
}
return
}
func (m *targetManager) AddTarget(job *config.JobConfig, t Target, defaultScrapeInterval time.Duration) {
targetPool := m.TargetPoolForJob(job, defaultScrapeInterval)
targetPool.AddTarget(t)
m.poolsByJob[job.Name] = targetPool
}
func (m *targetManager) ReplaceTargets(job *config.JobConfig, newTargets []Target, defaultScrapeInterval time.Duration) {
targetPool := m.TargetPoolForJob(job, defaultScrapeInterval)
targetPool.replaceTargets(newTargets)
}
func (m targetManager) Remove(t Target) {
@ -79,15 +95,15 @@ func (m *targetManager) AddTargetsFromConfig(config *config.Config) {
baseLabels[label] = value
}
interval := job.ScrapeInterval
if interval == 0 {
interval = config.Global.ScrapeInterval
}
for _, endpoint := range configTargets.Endpoints {
target := NewTarget(endpoint, interval, time.Second*5, baseLabels)
m.Add(target)
target := NewTarget(endpoint, time.Second*5, baseLabels)
m.AddTarget(&job, target, config.Global.ScrapeInterval)
}
}
}
}
// XXX: Not really thread-safe. Only used in /status page for now.
func (m *targetManager) Pools() map[string]*TargetPool {
return m.poolsByJob
}

View File

@ -14,6 +14,8 @@
package retrieval
import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/utility/test"
"testing"
@ -31,6 +33,10 @@ func (t fakeTarget) Address() string {
return "fake"
}
func (t fakeTarget) BaseLabels() model.LabelSet {
return model.LabelSet{}
}
func (t fakeTarget) Interval() time.Duration {
return t.interval
}
@ -52,9 +58,17 @@ func (t *fakeTarget) scheduledFor() (time time.Time) {
return
}
func (t *fakeTarget) Merge(newTarget Target) {}
func testTargetManager(t test.Tester) {
results := make(chan format.Result, 5)
targetManager := NewTargetManager(results, 3)
testJob1 := &config.JobConfig{
Name: "test_job1",
}
testJob2 := &config.JobConfig{
Name: "test_job2",
}
target1GroupA := &fakeTarget{
schedules: []time.Time{time.Now()},
@ -65,16 +79,15 @@ func testTargetManager(t test.Tester) {
interval: time.Minute,
}
targetManager.Add(target1GroupA)
targetManager.Add(target2GroupA)
targetManager.AddTarget(testJob1, target1GroupA, 0)
targetManager.AddTarget(testJob1, target2GroupA, 0)
target1GroupB := &fakeTarget{
schedules: []time.Time{time.Now()},
interval: time.Minute * 2,
}
targetManager.Add(target1GroupB)
targetManager.AddTarget(testJob2, target1GroupB, 0)
}
func TestTargetManager(t *testing.T) {

View File

@ -1,9 +1,22 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"container/heap"
"github.com/prometheus/prometheus/retrieval/format"
"log"
"sort"
"time"
)
@ -12,14 +25,18 @@ const (
)
type TargetPool struct {
done chan bool
manager TargetManager
targets []Target
done chan bool
manager TargetManager
targets []Target
addTargetQueue chan Target
replaceTargetsQueue chan []Target
}
func NewTargetPool(m TargetManager) (p *TargetPool) {
return &TargetPool{
manager: m,
manager: m,
addTargetQueue: make(chan Target),
replaceTargetsQueue: make(chan []Target),
}
}
@ -31,20 +48,6 @@ func (p TargetPool) Less(i, j int) bool {
return p.targets[i].scheduledFor().Before(p.targets[j].scheduledFor())
}
func (p *TargetPool) Pop() interface{} {
oldPool := p.targets
futureLength := p.Len() - 1
element := oldPool[futureLength]
futurePool := oldPool[0:futureLength]
p.targets = futurePool
return element
}
func (p *TargetPool) Push(element interface{}) {
p.targets = append(p.targets, element.(Target))
}
func (p TargetPool) Swap(i, j int) {
p.targets[i], p.targets[j] = p.targets[j], p.targets[i]
}
@ -56,6 +59,10 @@ func (p *TargetPool) Run(results chan format.Result, interval time.Duration) {
select {
case <-ticker:
p.runIteration(results, interval)
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
case <-p.done:
log.Printf("TargetPool exiting...")
break
@ -67,6 +74,33 @@ func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) AddTarget(target Target) {
p.addTargetQueue <- target
}
func (p *TargetPool) addTarget(target Target) {
p.targets = append(p.targets, target)
}
func (p *TargetPool) ReplaceTargets(newTargets []Target) {
p.replaceTargetsQueue <- newTargets
}
func (p *TargetPool) replaceTargets(newTargets []Target) {
// Replace old target list by new one, but reuse those targets from the old
// list of targets which are also in the new list (to preserve scheduling and
// health state).
for j, newTarget := range newTargets {
for _, oldTarget := range p.targets {
if oldTarget.Address() == newTarget.Address() {
oldTarget.Merge(newTargets[j])
newTargets[j] = oldTarget
}
}
}
p.targets = newTargets
}
func (p *TargetPool) runSingle(earliest time.Time, results chan format.Result, t Target) {
p.manager.acquire()
defer p.manager.release()
@ -80,33 +114,35 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
targetCount := p.Len()
finished := make(chan bool, targetCount)
for i := 0; i < targetCount; i++ {
target := heap.Pop(p).(Target)
if target == nil {
break
}
// Sort p.targets by next scheduling time so we can process the earliest
// targets first.
sort.Sort(p)
for _, target := range p.targets {
now := time.Now()
if target.scheduledFor().After(now) {
heap.Push(p, target)
// None of the remaining targets are ready to be scheduled. Signal that
// we're done processing them in this scrape iteration.
for j := i; j < targetCount; j++ {
finished <- true
}
break
finished <- true
continue
}
go func() {
p.runSingle(now, results, target)
heap.Push(p, target)
go func(t Target) {
p.runSingle(now, results, t)
finished <- true
}()
}(target)
}
for i := 0; i < targetCount; i++ {
<-finished
for i := 0; i < targetCount; {
select {
case <-finished:
i++
case newTarget := <-p.addTargetQueue:
p.addTarget(newTarget)
case newTargets := <-p.replaceTargetsQueue:
p.replaceTargets(newTargets)
}
}
close(finished)
@ -114,3 +150,8 @@ func (p *TargetPool) runIteration(results chan format.Result, interval time.Dura
duration := float64(time.Since(begin) / time.Millisecond)
retrievalDurations.Add(map[string]string{intervalKey: interval.String()}, duration)
}
// XXX: Not really thread-safe. Only used in /status page for now.
func (p *TargetPool) Targets() []Target {
return p.targets
}

View File

@ -14,9 +14,9 @@
package retrieval
import (
"container/heap"
"github.com/prometheus/prometheus/retrieval/format"
"github.com/prometheus/prometheus/utility/test"
"sort"
"testing"
"time"
)
@ -111,34 +111,20 @@ func testTargetPool(t test.Tester) {
scheduler: literalScheduler(input.scheduledFor),
}
heap.Push(&pool, &target)
pool.addTarget(&target)
}
targets := []Target{}
sort.Sort(pool)
if pool.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
} else {
for j, output := range scenario.outputs {
target := heap.Pop(&pool).(Target)
target := pool.targets[j]
if target.Address() != output.address {
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address())
}
targets = append(targets, target)
}
if pool.Len() != 0 {
t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len())
}
if len(targets) != len(scenario.outputs) {
t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets))
}
for _, target := range targets {
heap.Push(&pool, target)
}
if pool.Len() != len(scenario.outputs) {
@ -158,7 +144,7 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) {
address: "http://example.com/metrics.json",
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
}
pool.Push(target)
pool.addTarget(target)
done := make(chan bool)
go func() {
@ -174,6 +160,49 @@ func TestTargetPoolIterationWithUnhealthyTargetsFinishes(t *testing.T) {
}
}
func TestTargetPoolReplaceTargets(t *testing.T) {
pool := TargetPool{}
oldTarget1 := &target{
address: "http://example1.com/metrics.json",
scheduler: literalScheduler(time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)),
state: UNREACHABLE,
}
oldTarget2 := &target{
address: "http://example2.com/metrics.json",
scheduler: literalScheduler(time.Date(7500, 1, 1, 0, 0, 0, 0, time.UTC)),
state: UNREACHABLE,
}
newTarget1 := &target{
address: "http://example1.com/metrics.json",
scheduler: literalScheduler(time.Date(5000, 1, 1, 0, 0, 0, 0, time.UTC)),
state: ALIVE,
}
newTarget2 := &target{
address: "http://example3.com/metrics.json",
scheduler: literalScheduler(time.Date(2500, 1, 1, 0, 0, 0, 0, time.UTC)),
state: ALIVE,
}
pool.addTarget(oldTarget1)
pool.addTarget(oldTarget2)
pool.replaceTargets([]Target{newTarget1, newTarget2})
sort.Sort(pool)
if pool.Len() != 2 {
t.Errorf("Expected 2 elements in pool, had %d", pool.Len())
}
target1 := pool.targets[0].(*target)
if target1.state != newTarget1.state {
t.Errorf("Wrong first target returned from pool, expected %v, got %v", newTarget2, target1)
}
target2 := pool.targets[1].(*target)
if target2.state != oldTarget1.state {
t.Errorf("Wrong second target returned from pool, expected %v, got %v", oldTarget1, target2)
}
}
func BenchmarkTargetPool(b *testing.B) {
for i := 0; i < b.N; i++ {
testTargetPool(b)

View File

@ -1,8 +1,21 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"code.google.com/p/gorest"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/appstate"
"github.com/prometheus/prometheus/utility"
)
@ -13,12 +26,14 @@ type MetricsService struct {
queryRange gorest.EndPoint `method:"GET" path:"/query_range?{expr:string}&{end:int64}&{range:int64}&{step:int64}" output:"string"`
metrics gorest.EndPoint `method:"GET" path:"/metrics" output:"string"`
persistence metric.MetricPersistence
time utility.Time
setTargets gorest.EndPoint `method:"PUT" path:"/jobs/{jobName:string}/targets" postdata:"[]TargetGroup"`
appState *appstate.ApplicationState
time utility.Time
}
func NewMetricsService(persistence metric.MetricPersistence) *MetricsService {
func NewMetricsService(appState *appstate.ApplicationState) *MetricsService {
return &MetricsService{
persistence: persistence,
appState: appState,
}
}

View File

@ -1,3 +1,16 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
@ -70,7 +83,7 @@ func (serv MetricsService) QueryRange(Expr string, End int64, Range int64, Step
}
func (serv MetricsService) Metrics() string {
metricNames, err := serv.persistence.GetAllMetricNames()
metricNames, err := serv.appState.Persistence.GetAllMetricNames()
rb := serv.ResponseBuilder()
rb.SetContentType(gorest.Application_Json)
if err != nil {

48
web/api/targets.go Normal file
View File

@ -0,0 +1,48 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/retrieval"
"net/http"
"time"
)
type TargetGroup struct {
Endpoints []string `json:"endpoints"`
BaseLabels map[string]string `json:"baseLabels"`
}
func (serv MetricsService) SetTargets(targetGroups []TargetGroup, jobName string) {
if job := serv.appState.Config.GetJobByName(jobName); job == nil {
rb := serv.ResponseBuilder()
rb.SetResponseCode(http.StatusNotFound)
} else {
newTargets := []retrieval.Target{}
for _, targetGroup := range targetGroups {
// Do mandatory map type conversion due to Go shortcomings.
baseLabels := model.LabelSet{}
for label, value := range targetGroup.BaseLabels {
baseLabels[model.LabelName(label)] = model.LabelValue(value)
}
for _, endpoint := range targetGroup.Endpoints {
newTarget := retrieval.NewTarget(endpoint, time.Second*5, baseLabels)
newTargets = append(newTargets, newTarget)
}
}
serv.appState.TargetManager.ReplaceTargets(job, newTargets, serv.appState.Config.Global.ScrapeInterval)
}
}

View File

@ -15,15 +15,16 @@ package web
import (
"github.com/prometheus/prometheus/appstate"
"github.com/prometheus/prometheus/retrieval"
"html/template"
"net/http"
)
type PrometheusStatus struct {
Config string
Rules string
Status string
Targets string
Config string
Rules string
Status string
TargetPools map[string]*retrieval.TargetPool
}
type StatusHandler struct {
@ -32,10 +33,10 @@ type StatusHandler struct {
func (h *StatusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
status := &PrometheusStatus{
Config: h.appState.Config.ToString(0),
Rules: "TODO: list rules here",
Status: "TODO: add status information here",
Targets: "TODO: list targets here",
Config: h.appState.Config.ToString(0),
Rules: "TODO: list rules here",
Status: "TODO: add status information here",
TargetPools: h.appState.TargetManager.Pools(),
}
t, _ := template.ParseFiles("web/templates/status.html")
t.Execute(w, status)

View File

@ -26,8 +26,20 @@
</div>
<h2>Targets</h2>
<div class="grouping_box">
{{.Targets}}
<div class="grouping_box">
<ul>
{{range $job, $pool := .TargetPools}}
<li>{{$job}}
<ul>
{{range $pool.Targets}}
<li>
<a href="{{.Address}}">{{.Address}}</a> (State: {{.State}}, Base labels: {{.BaseLabels}})
</li>
{{end}}
</ul>
</li>
{{end}}
</ul>
</div>
</body>
</html>

View File

@ -29,7 +29,7 @@ var (
)
func StartServing(appState *appstate.ApplicationState) {
gorest.RegisterService(api.NewMetricsService(appState.Persistence))
gorest.RegisterService(api.NewMetricsService(appState))
exporter := registry.DefaultRegistry.YieldExporter()
http.Handle("/status", &StatusHandler{appState: appState})