Browse Source

Merge pull request #32 from matttproud/refactor/wrap-target-as-interface

Refactor Target into Interface to Enhance Testability
pull/35/head
juliusv 12 years ago
parent
commit
a7ed7cae91
  1. 7
      retrieval/interface_test.go
  2. 13
      retrieval/scheduler_test.go
  3. 86
      retrieval/target.go
  4. 16
      retrieval/targetmanager.go
  5. 8
      retrieval/targetpool.go
  6. 42
      retrieval/targetpool_test.go

7
retrieval/interface_test.go

@ -18,6 +18,9 @@ import (
)
func TestInterfaces(t *testing.T) {
var _ scheduler = &healthScheduler{}
var _ healthReporter = Target{}
var (
_ Target = &target{}
_ healthReporter = target{}
_ scheduler = &healthScheduler{}
)
}

13
retrieval/scheduler_test.go

@ -14,6 +14,7 @@
package retrieval
import (
"github.com/matttproud/prometheus/utility/test"
"testing"
"time"
)
@ -44,7 +45,7 @@ func (t *fakeTimeProvider) Now() (time time.Time) {
return
}
func TestHealthScheduler(t *testing.T) {
func testHealthScheduler(t test.Tester) {
now := time.Now()
var scenarios = []struct {
futureHealthState []TargetState
@ -109,3 +110,13 @@ func TestHealthScheduler(t *testing.T) {
}
}
}
func TestHealthScheduler(t *testing.T) {
testHealthScheduler(t)
}
func BenchmarkHealthScheduler(b *testing.B) {
for i := 0; i < b.N; i++ {
testHealthScheduler(b)
}
}

86
retrieval/target.go

@ -23,35 +23,87 @@ import (
"time"
)
// The state of the given Target.
type TargetState int
const (
// The Target has not been seen; we know nothing about it, except that it is
// on our docket for examination.
UNKNOWN TargetState = iota
// The Target has been found and successfully queried.
ALIVE
// The Target was either historically found or not found and then determined
// to be unhealthy by either not responding or disappearing.
UNREACHABLE
)
// A healthReporter is a type that can provide insight into its health state.
//
// It mainly exists for testability reasons to decouple the scheduler behaviors
// from fully-fledged Target and other types.
type healthReporter interface {
// Report the last-known health state for this target.
State() TargetState
}
// A Target represents an endpoint that should be interrogated for metrics.
//
// The protocol described by this type will likely change in future iterations,
// as it offers no good support for aggregated targets and fan out. Thusly,
// it is likely that the current Target and target uses will be
// wrapped with some resolver type.
//
// For the future, the Target protocol will abstract away the exact means that
// metrics are retrieved and deserialized from the given instance to which it
// refers.
type Target interface {
// Retrieve values from this target.
//
// earliest refers to the soonest available opportunity to reschedule the
// target for a future retrieval. It is up to the underlying scheduler type,
// alluded to in the scheduledFor function, to use this as it wants to. The
// current use case is to create a common batching time for scraping multiple
// Targets in the future through the TargetPool.
Scrape(earliest time.Time, results chan Result) error
// Fulfill the healthReporter interface.
State() TargetState
// Report the soonest time at which this Target may be scheduled for
// retrieval. This value needn't convey that the operation occurs at this
// time, but it should occur no sooner than it.
//
// Right now, this is used as the sorting key in TargetPool.
scheduledFor() time.Time
// The address to which the Target corresponds. Out of all of the available
// points in this interface, this one is the best candidate to change given
// the ways to express the endpoint.
Address() string
// How frequently queries occur.
Interval() time.Duration
}
type Target struct {
// target is a Target that refers to a singular HTTP or HTTPS endpoint.
type target struct {
// scheduler provides the scheduling strategy that is used to formulate what
// is returned in Target.scheduledFor.
scheduler scheduler
state TargetState
Address string
Deadline time.Duration
address string
// What is the deadline for the HTTP or HTTPS against this endpoint.
Deadline time.Duration
// Any base labels that are added to this target and its metrics.
BaseLabels model.LabelSet
// XXX: Move this to a field with the target manager initialization instead of here.
Interval time.Duration
interval time.Duration
}
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) *Target {
target := &Target{
Address: address,
// Furnish a reasonably configured target for querying.
func NewTarget(address string, interval, deadline time.Duration, baseLabels model.LabelSet) Target {
target := &target{
address: address,
Deadline: deadline,
Interval: interval,
interval: interval,
BaseLabels: baseLabels,
}
@ -69,7 +121,7 @@ type Result struct {
Target Target
}
func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
func (t *target) Scrape(earliest time.Time, results chan Result) (err error) {
result := Result{}
defer func() {
@ -92,7 +144,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
request := func() {
ti := time.Now()
resp, err := http.Get(t.Address)
resp, err := http.Get(t.Address())
if err != nil {
return
}
@ -110,7 +162,7 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
return
}
baseLabels := map[string]string{"instance": t.Address}
baseLabels := map[string]string{"instance": t.Address()}
for name, v := range intermediate {
asMap, ok := v.(map[string]interface{})
@ -199,10 +251,18 @@ func (t *Target) Scrape(earliest time.Time, results chan Result) (err error) {
return
}
func (t Target) State() TargetState {
func (t target) State() TargetState {
return t.state
}
func (t Target) scheduledFor() time.Time {
func (t target) scheduledFor() time.Time {
return t.scheduler.ScheduledFor()
}
func (t target) Address() string {
return t.address
}
func (t target) Interval() time.Duration {
return t.interval
}

16
retrieval/targetmanager.go

@ -24,8 +24,8 @@ import (
type TargetManager interface {
acquire()
release()
Add(t *Target)
Remove(t *Target)
Add(t Target)
Remove(t Target)
AddTargetsFromConfig(config *config.Config)
}
@ -51,20 +51,20 @@ func (m targetManager) release() {
<-m.requestAllowance
}
func (m targetManager) Add(t *Target) {
targetPool, ok := m.pools[t.Interval]
func (m targetManager) Add(t Target) {
targetPool, ok := m.pools[t.Interval()]
if !ok {
targetPool.manager = m
log.Printf("Pool %s does not exist; creating and starting...", t.Interval)
go targetPool.Run(m.results, t.Interval)
log.Printf("Pool %s does not exist; creating and starting...", t.Interval())
go targetPool.Run(m.results, t.Interval())
}
heap.Push(&targetPool, t)
m.pools[t.Interval] = targetPool
m.pools[t.Interval()] = targetPool
}
func (m targetManager) Remove(t *Target) {
func (m targetManager) Remove(t Target) {
panic("not implemented")
}

8
retrieval/targetpool.go

@ -8,7 +8,7 @@ import (
type TargetPool struct {
done chan bool
targets []*Target
targets []*target
manager TargetManager
}
@ -37,7 +37,7 @@ func (p *TargetPool) Pop() interface{} {
}
func (p *TargetPool) Push(element interface{}) {
p.targets = append(p.targets, element.(*Target))
p.targets = append(p.targets, element.(*target))
}
func (p TargetPool) Swap(i, j int) {
@ -62,7 +62,7 @@ func (p TargetPool) Stop() {
p.done <- true
}
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Target) {
func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *target) {
p.manager.acquire()
defer p.manager.release()
@ -71,7 +71,7 @@ func (p *TargetPool) runSingle(earliest time.Time, results chan Result, t *Targe
func (p *TargetPool) runIteration(results chan Result) {
for i := 0; i < p.Len(); i++ {
target := heap.Pop(p).(*Target)
target := heap.Pop(p).(*target)
if target == nil {
break
}

42
retrieval/targetpool_test.go

@ -15,6 +15,7 @@ package retrieval
import (
"container/heap"
"github.com/matttproud/prometheus/utility/test"
"testing"
"time"
)
@ -28,7 +29,7 @@ func (s literalScheduler) ScheduledFor() time.Time {
func (s literalScheduler) Reschedule(earliest time.Time, future TargetState) {
}
func TestTargetPool(t *testing.T) {
func testTargetPool(t test.Tester) {
type expectation struct {
size int
}
@ -113,25 +114,54 @@ func TestTargetPool(t *testing.T) {
pool := TargetPool{}
for _, input := range scenario.inputs {
target := Target{
Address: input.address,
target := target{
address: input.address,
scheduler: literalScheduler(input.scheduledFor),
}
heap.Push(&pool, &target)
}
targets := []Target{}
if pool.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected TargetPool size to be %d but was %d", scenario.name, i, len(scenario.outputs), pool.Len())
} else {
for j, output := range scenario.outputs {
target := heap.Pop(&pool).(*Target)
target := heap.Pop(&pool).(Target)
if target.Address != output.address {
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address)
if target.Address() != output.address {
t.Errorf("%s %d.%d. expected Target address to be %s but was %s", scenario.name, i, j, output.address, target.Address())
}
targets = append(targets, target)
}
if pool.Len() != 0 {
t.Errorf("%s %d. expected pool to be empty, had %d", scenario.name, i, pool.Len())
}
if len(targets) != len(scenario.outputs) {
t.Errorf("%s %d. expected to receive %d elements, got %d", scenario.name, i, len(scenario.outputs), len(targets))
}
for _, target := range targets {
heap.Push(&pool, target)
}
if pool.Len() != len(scenario.outputs) {
t.Errorf("%s %d. expected to repopulated with %d elements, got %d", scenario.name, i, len(scenario.outputs), pool.Len())
}
}
}
}
func TestTargetPool(t *testing.T) {
testTargetPool(t)
}
func BenchmarkTargetPool(b *testing.B) {
for i := 0; i < b.N; i++ {
testTargetPool(b)
}
}

Loading…
Cancel
Save