|
|
@ -15,10 +15,13 @@ package remote |
|
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"context" |
|
|
|
|
|
|
|
"crypto/md5" |
|
|
|
|
|
|
|
"encoding/json" |
|
|
|
"sync" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/go-kit/kit/log" |
|
|
|
"github.com/go-kit/kit/log" |
|
|
|
|
|
|
|
"github.com/go-kit/kit/log/level" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
"github.com/prometheus/common/model" |
|
|
|
"github.com/prometheus/common/model" |
|
|
@ -37,6 +40,8 @@ type Storage struct { |
|
|
|
logger log.Logger |
|
|
|
logger log.Logger |
|
|
|
mtx sync.Mutex |
|
|
|
mtx sync.Mutex |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
configHash [16]byte |
|
|
|
|
|
|
|
|
|
|
|
// For writes
|
|
|
|
// For writes
|
|
|
|
walDir string |
|
|
|
walDir string |
|
|
|
queues []*QueueManager |
|
|
|
queues []*QueueManager |
|
|
@ -77,6 +82,19 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { |
|
|
|
s.mtx.Lock() |
|
|
|
s.mtx.Lock() |
|
|
|
defer s.mtx.Unlock() |
|
|
|
defer s.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
hash := md5.Sum(cfgBytes) |
|
|
|
|
|
|
|
if hash == s.configHash { |
|
|
|
|
|
|
|
level.Debug(s.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") |
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
s.configHash = hash |
|
|
|
|
|
|
|
|
|
|
|
// Update write queues
|
|
|
|
// Update write queues
|
|
|
|
newQueues := []*QueueManager{} |
|
|
|
newQueues := []*QueueManager{} |
|
|
|
// TODO: we should only stop & recreate queues which have changes,
|
|
|
|
// TODO: we should only stop & recreate queues which have changes,
|
|
|
|