Merge pull request #196 from prometheus/fix/timeout-target-scrapes

Target uses HTTP transport with deadlines
pull/203/head
Bernerd Schaefer 12 years ago
commit c98fc8a495

@ -0,0 +1,30 @@
package retrieval
import (
"net"
"net/http"
"time"
)
// NewDeadlineClient returns a new http.Client which will time out long running
// requests.
func NewDeadlineClient(timeout time.Duration) http.Client {
return http.Client{
Transport: &http.Transport{
// We need to disable keepalive, becasue we set a deadline on the
// underlying connection.
DisableKeepAlives: true,
Dial: func(netw, addr string) (c net.Conn, err error) {
start := time.Now()
c, err = net.DialTimeout(netw, addr, timeout)
if err == nil {
c.SetDeadline(start.Add(timeout))
}
return
},
},
}
}

@ -117,6 +117,7 @@ type target struct {
Deadline time.Duration
// Any base labels that are added to this target and its metrics.
baseLabels model.LabelSet
client http.Client
}
// Furnish a reasonably configured target for querying.
@ -125,6 +126,7 @@ func NewTarget(address string, deadline time.Duration, baseLabels model.LabelSet
address: address,
Deadline: deadline,
baseLabels: baseLabels,
client: NewDeadlineClient(deadline),
}
scheduler := &healthScheduler{
@ -162,75 +164,53 @@ func (t *target) recordScrapeHealth(results chan format.Result, timestamp time.T
func (t *target) Scrape(earliest time.Time, results chan format.Result) (err error) {
now := time.Now()
futureState := t.state
if err = t.scrape(now, results); err != nil {
t.recordScrapeHealth(results, now, false)
futureState = UNREACHABLE
} else {
t.recordScrapeHealth(results, now, true)
futureState = ALIVE
}
defer func() {
futureState := t.state
switch err {
case nil:
t.recordScrapeHealth(results, now, true)
futureState = ALIVE
default:
t.recordScrapeHealth(results, now, false)
futureState = UNREACHABLE
}
t.scheduler.Reschedule(earliest, futureState)
t.state = futureState
}()
done := make(chan bool)
go func(start time.Time) {
defer func() {
ms := float64(time.Since(start)) / float64(time.Millisecond)
labels := map[string]string{address: t.Address(), outcome: success}
if err != nil {
labels[outcome] = failure
}
targetOperationLatencies.Add(labels, ms)
targetOperations.Increment(labels)
}()
t.scheduler.Reschedule(earliest, futureState)
t.state = futureState
defer func() {
done <- true
}()
return
}
var resp *http.Response // Don't shadow "err" from the enclosing function.
resp, err = http.Get(t.Address())
func (t *target) scrape(timestamp time.Time, results chan format.Result) (err error) {
defer func(start time.Time) {
ms := float64(time.Since(start)) / float64(time.Millisecond)
labels := map[string]string{address: t.Address(), outcome: success}
if err != nil {
return
labels[outcome] = failure
}
defer resp.Body.Close()
processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
if err != nil {
return
}
targetOperationLatencies.Add(labels, ms)
targetOperations.Increment(labels)
}(time.Now())
// XXX: This is a wart; we need to handle this more gracefully down the
// road, especially once we have service discovery support.
baseLabels := model.LabelSet{model.InstanceLabel: model.LabelValue(t.Address())}
for baseLabel, baseValue := range t.baseLabels {
baseLabels[baseLabel] = baseValue
}
resp, err := t.client.Get(t.Address())
if err != nil {
return
}
defer resp.Body.Close()
err = processor.Process(resp.Body, now, baseLabels, results)
if err != nil {
return
}
}(time.Now())
processor, err := format.DefaultRegistry.ProcessorForRequestHeader(resp.Header)
if err != nil {
return
}
select {
case <-done:
break
case <-time.After(t.Deadline):
err = fmt.Errorf("Target %s exceeded %s deadline.", t, t.Deadline)
// XXX: This is a wart; we need to handle this more gracefully down the
// road, especially once we have service discovery support.
baseLabels := model.LabelSet{model.InstanceLabel: model.LabelValue(t.Address())}
for baseLabel, baseValue := range t.baseLabels {
baseLabels[baseLabel] = baseValue
}
return
return processor.Process(resp.Body, timestamp, baseLabels, results)
}
func (t target) State() TargetState {

@ -16,6 +16,8 @@ package retrieval
import (
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/retrieval/format"
"net/http"
"net/http/httptest"
"testing"
"time"
)
@ -63,3 +65,45 @@ func TestTargetRecordScrapeHealth(t *testing.T) {
t.Fatalf("Expected and actual samples not equal. Expected: %v, actual: %v", expected, actual)
}
}
func TestTargetScrapeTimeout(t *testing.T) {
signal := make(chan bool, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-signal
w.Header().Set("X-Prometheus-API-Version", "0.0.1")
w.Write([]byte(`[]`))
}))
defer server.Close()
testTarget := NewTarget(server.URL, 10*time.Millisecond, model.LabelSet{})
results := make(chan format.Result, 1024)
// scrape once without timeout
signal <- true
if err := testTarget.Scrape(time.Now(), results); err != nil {
t.Fatal(err)
}
// let the deadline lapse
time.Sleep(15*time.Millisecond)
// now scrape again
signal <- true
if err := testTarget.Scrape(time.Now(), results); err != nil {
t.Fatal(err)
}
// now timeout
if err := testTarget.Scrape(time.Now(), results); err == nil {
t.Fatal("expected scrape to timeout")
} else {
signal <- true // let handler continue
}
// now scrape again without timeout
signal <- true
if err := testTarget.Scrape(time.Now(), results); err != nil {
t.Fatal(err)
}
}

Loading…
Cancel
Save