From 2064f32662adcf8cb10f12fe41488225f70390cd Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 15 Apr 2014 01:02:15 +0200 Subject: [PATCH] Clean up quitting behavior and add quit trigger. The closing of Prometheus now uses a sync.Once wrapper to prevent any accidental multiple invocations of it, which could trigger corruption or a race condition. The shutdown process is made more verbose through logging. A web handler, not enabled by default, has been provided to trigger a remote shutdown if requested for debugging purposes. Change-Id: If4fee75196bbff1fb1e4a4ef7e1cfa53fef88f2e --- main.go | 57 +++++++++++++++++++++++++++++++++++++++++------------- web/web.go | 19 ++++++++++++++++++ 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index cef288e31..eaccf6bb7 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "flag" "os" "os/signal" + "sync" "syscall" "time" @@ -68,6 +69,8 @@ var ( concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.") printVersion = flag.Bool("version", false, "print version information") + + shutdownTimeout = flag.Duration("shutdownGracePeriod", 0*time.Second, "The amount of time Prometheus gives background services to finish running when shutdown is requested.") ) type prometheus struct { @@ -86,6 +89,8 @@ type prometheus struct { remoteTSDBQueue *remote.TSDBQueueManager curationState metric.CurationStateUpdater + + closeOnce sync.Once } func (p *prometheus) interruptHandler() { @@ -95,7 +100,9 @@ func (p *prometheus) interruptHandler() { <-notifier glog.Warning("Received SIGINT/SIGTERM; Exiting gracefully...") - p.close() + + p.Close() + os.Exit(0) } @@ -166,7 +173,23 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error { return curator.Run(olderThan, clientmodel.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, 
p.storage.DiskStorage.MetricHighWatermarks, p.curationState) } +func (p *prometheus) Close() { + p.closeOnce.Do(p.close) +} + func (p *prometheus) close() { + // The "Done" remarks are a misnomer for some subsystems due to lack of + // blocking and synchronization. + glog.Info("Shutdown has been requested; subsytems are closing:") + p.targetManager.Stop() + glog.Info("Remote Target Manager: Done") + p.ruleManager.Stop() + glog.Info("Rule Executor: Done") + + // Stop any currently active curation (deletion or compaction). + close(p.stopBackgroundOperations) + glog.Info("Current Curation Workers: Requested") + // Disallow further curation work. close(p.curationSema) @@ -177,21 +200,27 @@ func (p *prometheus) close() { if p.deletionTimer != nil { p.deletionTimer.Stop() } + glog.Info("Future Curation Workers: Done") - // Stop any currently active curation (deletion or compaction). - close(p.stopBackgroundOperations) + glog.Infof("Waiting %s for background systems to exit and flush before finalizing (DO NOT INTERRUPT THE PROCESS) ...", *shutdownTimeout) + + // Wart: We should have a concrete form of synchronization for this, not a + // hokey sleep statement. 
+ time.Sleep(*shutdownTimeout) - p.ruleManager.Stop() - p.targetManager.Stop() close(p.unwrittenSamples) p.storage.Close() + glog.Info("Local Storage: Done") if p.remoteTSDBQueue != nil { p.remoteTSDBQueue.Close() + glog.Info("Remote Storage: Done") } close(p.notifications) + glog.Info("Sundry Queues: Done") + glog.Info("See you next time!") } func main() { @@ -288,13 +317,6 @@ func main() { Storage: ts, } - webService := &web.WebService{ - StatusHandler: prometheusStatus, - MetricsHandler: metricsService, - DatabasesHandler: databasesHandler, - AlertsHandler: alertsHandler, - } - prometheus := &prometheus{ compactionTimer: compactionTimer, @@ -313,7 +335,16 @@ func main() { storage: ts, remoteTSDBQueue: remoteTSDBQueue, } - defer prometheus.close() + defer prometheus.Close() + + webService := &web.WebService{ + StatusHandler: prometheusStatus, + MetricsHandler: metricsService, + DatabasesHandler: databasesHandler, + AlertsHandler: alertsHandler, + + QuitDelegate: prometheus.Close, + } prometheus.curationSema <- struct{}{} diff --git a/web/web.go b/web/web.go index 914c6d8c4..b4d9530dc 100644 --- a/web/web.go +++ b/web/web.go @@ -38,6 +38,7 @@ var ( listenAddress = flag.String("listenAddress", ":9090", "Address to listen on for web interface.") useLocalAssets = flag.Bool("useLocalAssets", false, "Read assets/templates from file instead of binary.") userAssetsPath = flag.String("userAssets", "", "Path to static asset directory, available at /user") + enableQuit = flag.Bool("web.enableRemoteShutdown", false, "Enable remote service shutdown") ) type WebService struct { @@ -45,6 +46,8 @@ type WebService struct { DatabasesHandler *DatabasesHandler MetricsHandler *api.MetricsService AlertsHandler *AlertsHandler + + QuitDelegate func() } func (w WebService) ServeForever() error { @@ -77,11 +80,27 @@ func (w WebService) ServeForever() error { exp.Handle("/user/", http.StripPrefix("/user/", http.FileServer(http.Dir(*userAssetsPath)))) } + if *enableQuit { + 
exp.HandleFunc("/-/quit", w.quitHandler) + } + glog.Info("listening on ", *listenAddress) return http.ListenAndServe(*listenAddress, exp.DefaultCoarseMux) } +func (s WebService) quitHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.Header().Add("Allow", "POST") + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + fmt.Fprintf(w, "Requesting termination... Goodbye!") + + s.QuitDelegate() +} + func getLocalTemplate(name string) (*template.Template, error) { return template.ParseFiles( "web/templates/_base.html",