diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 46bd7e1ff..420a2d11a 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -374,13 +374,17 @@ type QueueManager struct { highestRecvTimestamp *maxTimestamp } -// NewQueueManager builds a new QueueManager. +// NewQueueManager builds a new QueueManager and starts a new +// WAL watcher with queue manager as the WriteTo destination. +// The WAL watcher takes the dir parameter as the base directory +// for where the WAL shall be located. Note that the full path to +// the WAL directory will be constructed as /wal. func NewQueueManager( metrics *queueManagerMetrics, watcherMetrics *wal.WatcherMetrics, readerMetrics *wal.LiveReaderMetrics, logger log.Logger, - walDir string, + dir string, samplesIn *ewmaRate, cfg config.QueueConfig, mCfg config.MetadataConfig, @@ -426,7 +430,7 @@ func NewQueueManager( highestRecvTimestamp: highestRecvTimestamp, } - t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir, enableExemplarRemoteWrite) + t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite) if t.mcfg.Send { t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) } diff --git a/storage/remote/write.go b/storage/remote/write.go index bd330fe8b..a7399ee53 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -55,7 +55,7 @@ type WriteStorage struct { watcherMetrics *wal.WatcherMetrics liveReaderMetrics *wal.LiveReaderMetrics externalLabels labels.Labels - walDir string + dir string queues map[string]*QueueManager samplesIn *ewmaRate flushDeadline time.Duration @@ -68,7 +68,7 @@ type WriteStorage struct { } // NewWriteStorage creates and runs a WriteStorage. -func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage { +func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage { if logger == nil { logger = log.NewNopLogger() } @@ -80,7 +80,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string reg: reg, flushDeadline: flushDeadline, samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - walDir: walDir, + dir: dir, interner: newPool(), scraper: sm, quit: make(chan struct{}), @@ -175,7 +175,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.watcherMetrics, rws.liveReaderMetrics, rws.logger, - rws.walDir, + rws.dir, rws.samplesIn, rwConf.QueueConfig, rwConf.MetadataConfig, diff --git a/tsdb/wal/watcher.go b/tsdb/wal/watcher.go index cde9cfa5f..7b026b4ea 100644 --- a/tsdb/wal/watcher.go +++ b/tsdb/wal/watcher.go @@ -144,7 +144,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, walDir string, sendExemplars bool) *Watcher { +func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars bool) *Watcher { if logger == nil { logger = log.NewNopLogger() } @@ -153,7 +153,7 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge writer: writer, metrics: metrics, readerMetrics: readerMetrics, - walDir: path.Join(walDir, "wal"), + walDir: path.Join(dir, "wal"), name: name, sendExemplars: sendExemplars,