Browse Source

scrape pool doesn't rely on context as Stop() needs to be blocking to prevent Scrape loops trying to write to a closed TSDB storage.

pull/3362/head
Krasi Georgiev 7 years ago
parent
commit
9c61f0e8a0
  1. 80
      cmd/prometheus/main.go
  2. 1
      discovery/manager.go
  3. 25
      retrieval/manager.go
  4. 14
      retrieval/scrape.go
  5. 4
      retrieval/scrape_test.go

80
cmd/prometheus/main.go

@ -231,16 +231,24 @@ func main() {
cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
var (
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background())
discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager"))
ctxScrape, cancelScrape = context.WithCancel(context.Background())
scrapeManager = retrieval.NewScrapeManager(ctxScrape, log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ctxWeb, cancelWeb = context.WithCancel(context.Background())
webHandler = web.New(log.With(logger, "component", "web"), &cfg.web)
ctxDiscovery, cancelDiscovery = context.WithCancel(context.Background())
ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewDiscoveryManager(ctxDiscovery, log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{Appendable: fanoutStorage,
Notifier: notifier,
QueryEngine: queryEngine,
Context: ctxRule,
ExternalURL: cfg.web.ExternalURL,
Logger: log.With(logger, "component", "rule manager"),
})
)
<<<<<<< HEAD
ctx := context.Background()
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
@ -253,6 +261,9 @@ func main() {
})
cfg.web.Context = ctx
=======
cfg.web.Context = ctxWeb
>>>>>>> 95b1dec3... scrape pool doesn't rely on context as Stop() needs to be blocking to prevent Scrape loops trying to write to a closed TSDB storage.
cfg.web.TSDB = localStorage.Get
cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine
@ -274,6 +285,9 @@ func main() {
cfg.web.Flags[f.Name] = f.Value.String()
}
// Depend on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
// Monitor outgoing connections on default transport with conntrack.
http.DefaultTransport.(*http.Transport).DialContext = conntrack.NewDialContextFunc(
conntrack.DialWithTracing(),
@ -310,17 +324,6 @@ func main() {
var g group.Group
{
g.Add(
func() error {
err := discoveryManager.Run()
level.Info(logger).Log("msg", "Discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
},
)
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
cancel := make(chan struct{})
@ -341,6 +344,34 @@ func main() {
},
)
}
{
g.Add(
func() error {
err := discoveryManager.Run()
level.Info(logger).Log("msg", "Discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
},
)
}
{
g.Add(
func() error {
err := scrapeManager.Run(discoveryManager.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
// Scrape manager needs to be stopped before closing the local TSDB
// so that it doesn't try to write samples to a closed storage.
level.Info(logger).Log("msg", "Stopping scrape manager...")
scrapeManager.Stop()
},
)
}
{
// Make sure that sighup handler is registered with a redirect to the channel before the potentially
// long and synchronous tsdb init.
@ -482,19 +513,6 @@ func main() {
},
)
}
{
g.Add(
func() error {
err := scrapeManager.Run(discoveryManager.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping scrape manager...")
cancelScrape()
},
)
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
}

1
discovery/manager.go

@ -87,7 +87,6 @@ func (m *DiscoveryManager) Run() error {
return m.ctx.Err()
}
}
}
// SyncCh returns a read only channel used by all DiscoveryProviders targetSet updates

25
retrieval/manager.go

@ -14,7 +14,6 @@
package retrieval
import (
"context"
"fmt"
"github.com/go-kit/kit/log"
@ -30,27 +29,27 @@ type Appendable interface {
}
// NewScrapeManager is the ScrapeManager constructor
func NewScrapeManager(ctx context.Context, logger log.Logger, app Appendable) *ScrapeManager {
func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
return &ScrapeManager{
ctx: ctx,
append: app,
logger: logger,
actionCh: make(chan func()),
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
}
}
// ScrapeManager maintains a set of scrape pools and manages start/stop cicles
// when receiving new target groups form the discovery manager.
type ScrapeManager struct {
ctx context.Context
logger log.Logger
append Appendable
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
actionCh chan func()
graceShut chan struct{}
}
// Run starts background processing to handle target updates and reload the scraping loops.
@ -63,12 +62,20 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*config.TargetGroup) error
f()
case ts := <-tsets:
m.reload(ts)
case <-m.ctx.Done():
return m.ctx.Err()
case <-m.graceShut:
return nil
}
}
}
// Stop cancels all running scrape pools and blocks until all have exited.
func (m *ScrapeManager) Stop() {
for _, sp := range m.scrapePools {
sp.stop()
}
close(m.graceShut)
}
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
done := make(chan struct{})
@ -123,10 +130,10 @@ func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error {
return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
}
// scrape pool doesn't exist so start a new one
// Scrape pool doesn't exist so start a new one.
existing, ok := m.scrapePools[tsetName]
if !ok {
sp := newScrapePool(m.ctx, scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
m.scrapePools[tsetName] = sp
sp.Sync(tgroup)
@ -134,7 +141,7 @@ func (m *ScrapeManager) reload(t map[string][]*config.TargetGroup) error {
existing.Sync(tgroup)
}
// cleanup - check config and cancel the scrape loops if it don't exist in the scrape config
// Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config.
jobs := make(map[string]struct{})
for k := range m.scrapeConfigs {

14
retrieval/scrape.go

@ -117,7 +117,6 @@ func init() {
type scrapePool struct {
appendable Appendable
logger log.Logger
ctx context.Context
mtx sync.RWMutex
config *config.ScrapeConfig
@ -136,7 +135,7 @@ const maxAheadTime = 10 * time.Minute
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
if logger == nil {
logger = log.NewNopLogger()
}
@ -152,14 +151,15 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
sp := &scrapePool{
appendable: app,
config: cfg,
ctx: ctx,
client: client,
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
logger: logger,
}
sp.newLoop = func(t *Target, s scraper) loop {
return newScrapeLoop(sp.ctx, s,
return newScrapeLoop(
context.Background(),
s,
log.With(logger, "target", t),
buffers,
func(l labels.Labels) labels.Labels { return sp.mutateSampleLabels(l, t) },
@ -189,7 +189,6 @@ func (sp *scrapePool) stop() {
delete(sp.loops, fp)
delete(sp.targets, fp)
}
wg.Wait()
}
@ -582,8 +581,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
}
}
func newScrapeLoop(
ctx context.Context,
func newScrapeLoop(ctx context.Context,
sc scraper,
l log.Logger,
buffers *pool.BytesPool,
@ -605,8 +603,8 @@ func newScrapeLoop(
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
ctx: ctx,
l: l,
ctx: ctx,
}
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)

4
retrieval/scrape_test.go

@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(context.Background(), cfg, app, nil)
sp = newScrapePool(cfg, app, nil)
)
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@ -231,7 +231,7 @@ func TestScrapePoolReload(t *testing.T) {
func TestScrapePoolAppender(t *testing.T) {
cfg := &config.ScrapeConfig{}
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app, nil)
sp := newScrapePool(cfg, app, nil)
wrapped := sp.appender()

Loading…
Cancel
Save