// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"io/ioutil"
"net/url"
"os"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/util/testutil"
)
var cfg = config . RemoteWriteConfig {
Name : "dev" ,
URL : & common_config . URL {
URL : & url . URL {
Scheme : "http" ,
Host : "localhost" ,
} ,
} ,
QueueConfig : config . DefaultQueueConfig ,
}
func TestNoDuplicateWriteConfigs ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestNoDuplicateWriteConfigs" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
cfg1 := config . RemoteWriteConfig {
Name : "write-1" ,
URL : & common_config . URL {
URL : & url . URL {
Scheme : "http" ,
Host : "localhost" ,
} ,
} ,
QueueConfig : config . DefaultQueueConfig ,
}
cfg2 := config . RemoteWriteConfig {
Name : "write-2" ,
URL : & common_config . URL {
URL : & url . URL {
Scheme : "http" ,
Host : "localhost" ,
} ,
} ,
QueueConfig : config . DefaultQueueConfig ,
}
cfg3 := config . RemoteWriteConfig {
URL : & common_config . URL {
URL : & url . URL {
Scheme : "http" ,
Host : "localhost" ,
} ,
} ,
QueueConfig : config . DefaultQueueConfig ,
}
type testcase struct {
cfgs [ ] * config . RemoteWriteConfig
err bool
}
cases := [ ] testcase {
{ // Two duplicates, we should get an error.
cfgs : [ ] * config . RemoteWriteConfig {
& cfg1 ,
& cfg1 ,
} ,
err : true ,
} ,
{ // Duplicates but with different names, we should not get an error.
cfgs : [ ] * config . RemoteWriteConfig {
& cfg1 ,
& cfg2 ,
} ,
err : false ,
} ,
{ // Duplicates but one with no name, we should not get an error.
cfgs : [ ] * config . RemoteWriteConfig {
& cfg1 ,
& cfg3 ,
} ,
err : false ,
} ,
{ // Duplicates both with no name, we should get an error.
cfgs : [ ] * config . RemoteWriteConfig {
& cfg3 ,
& cfg3 ,
} ,
err : true ,
} ,
}
for _ , tc := range cases {
s := NewWriteStorage ( nil , nil , dir , time . Millisecond )
conf := & config . Config {
GlobalConfig : config . DefaultGlobalConfig ,
RemoteWriteConfigs : tc . cfgs ,
}
err := s . ApplyConfig ( conf )
gotError := err != nil
testutil . Equals ( t , tc . err , gotError )
err = s . Close ( )
testutil . Ok ( t , err )
}
}
func TestRestartOnNameChange ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestRestartOnNameChange" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
hash , err := toHash ( cfg )
testutil . Ok ( t , err )
s := NewWriteStorage ( nil , nil , dir , time . Millisecond )
conf := & config . Config {
GlobalConfig : config . DefaultGlobalConfig ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig {
& cfg ,
} ,
}
testutil . Ok ( t , s . ApplyConfig ( conf ) )
testutil . Equals ( t , s . queues [ hash ] . client ( ) . Name ( ) , cfg . Name )
// Change the queues name, ensure the queue has been restarted.
conf . RemoteWriteConfigs [ 0 ] . Name = "dev-2"
testutil . Ok ( t , s . ApplyConfig ( conf ) )
hash , err = toHash ( cfg )
testutil . Ok ( t , err )
testutil . Equals ( t , s . queues [ hash ] . client ( ) . Name ( ) , conf . RemoteWriteConfigs [ 0 ] . Name )
err = s . Close ( )
testutil . Ok ( t , err )
}
func TestUpdateWithRegisterer ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestRestartWithRegisterer" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
s := NewWriteStorage ( nil , prometheus . NewRegistry ( ) , dir , time . Millisecond )
c1 := & config . RemoteWriteConfig {
Name : "named" ,
URL : & common_config . URL {
URL : & url . URL {
Scheme : "http" ,
Host : "localhost" ,
} ,
} ,
QueueConfig : config . DefaultQueueConfig ,
}
c2 := & config . RemoteWriteConfig {
URL : & common_config . URL {
URL : & url . URL {
Scheme : "http" ,
Host : "localhost" ,
} ,
} ,
QueueConfig : config . DefaultQueueConfig ,
}
conf := & config . Config {
GlobalConfig : config . DefaultGlobalConfig ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig { c1 , c2 } ,
}
testutil . Ok ( t , s . ApplyConfig ( conf ) )
c1 . QueueConfig . MaxShards = 10
c2 . QueueConfig . MaxShards = 10
testutil . Ok ( t , s . ApplyConfig ( conf ) )
for _ , queue := range s . queues {
testutil . Equals ( t , 10 , queue . cfg . MaxShards )
}
err = s . Close ( )
testutil . Ok ( t , err )
}
func TestWriteStorageLifecycle ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestWriteStorageLifecycle" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
s := NewWriteStorage ( nil , nil , dir , defaultFlushDeadline )
conf := & config . Config {
GlobalConfig : config . DefaultGlobalConfig ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig {
& config . DefaultRemoteWriteConfig ,
} ,
}
s . ApplyConfig ( conf )
testutil . Equals ( t , 1 , len ( s . queues ) )
err = s . Close ( )
testutil . Ok ( t , err )
}
func TestUpdateExternalLabels ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestUpdateExternalLabels" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
s := NewWriteStorage ( nil , prometheus . NewRegistry ( ) , dir , time . Second )
externalLabels := labels . FromStrings ( "external" , "true" )
conf := & config . Config {
GlobalConfig : config . GlobalConfig { } ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig {
& cfg ,
} ,
}
hash , err := toHash ( conf . RemoteWriteConfigs [ 0 ] )
testutil . Ok ( t , err )
s . ApplyConfig ( conf )
testutil . Equals ( t , 1 , len ( s . queues ) )
testutil . Equals ( t , labels . Labels ( nil ) , s . queues [ hash ] . externalLabels )
conf . GlobalConfig . ExternalLabels = externalLabels
hash , err = toHash ( conf . RemoteWriteConfigs [ 0 ] )
testutil . Ok ( t , err )
s . ApplyConfig ( conf )
testutil . Equals ( t , 1 , len ( s . queues ) )
testutil . Equals ( t , externalLabels , s . queues [ hash ] . externalLabels )
err = s . Close ( )
testutil . Ok ( t , err )
}
func TestWriteStorageApplyConfigsIdempotent ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestWriteStorageApplyConfigsIdempotent" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
s := NewWriteStorage ( nil , nil , dir , defaultFlushDeadline )
conf := & config . Config {
GlobalConfig : config . GlobalConfig { } ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig {
& config . DefaultRemoteWriteConfig ,
} ,
}
// We need to set URL's so that metric creation doesn't panic.
conf . RemoteWriteConfigs [ 0 ] . URL = & common_config . URL {
URL : & url . URL {
Host : "http://test-storage.com" ,
} ,
}
hash , err := toHash ( conf . RemoteWriteConfigs [ 0 ] )
testutil . Ok ( t , err )
s . ApplyConfig ( conf )
testutil . Equals ( t , 1 , len ( s . queues ) )
s . ApplyConfig ( conf )
testutil . Equals ( t , 1 , len ( s . queues ) )
_ , hashExists := s . queues [ hash ]
testutil . Assert ( t , hashExists , "Queue pointer should have remained the same" )
err = s . Close ( )
testutil . Ok ( t , err )
}
func TestWriteStorageApplyConfigsPartialUpdate ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "TestWriteStorageApplyConfigsPartialUpdate" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
s := NewWriteStorage ( nil , nil , dir , defaultFlushDeadline )
c0 := & config . RemoteWriteConfig {
RemoteTimeout : model . Duration ( 10 * time . Second ) ,
QueueConfig : config . DefaultQueueConfig ,
WriteRelabelConfigs : [ ] * relabel . Config {
{
Regex : relabel . MustNewRegexp ( ".+" ) ,
} ,
} ,
}
c1 := & config . RemoteWriteConfig {
RemoteTimeout : model . Duration ( 20 * time . Second ) ,
QueueConfig : config . DefaultQueueConfig ,
HTTPClientConfig : common_config . HTTPClientConfig {
BearerToken : "foo" ,
} ,
}
c2 := & config . RemoteWriteConfig {
RemoteTimeout : model . Duration ( 30 * time . Second ) ,
QueueConfig : config . DefaultQueueConfig ,
}
conf := & config . Config {
GlobalConfig : config . GlobalConfig { } ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig { c0 , c1 , c2 } ,
}
// We need to set URL's so that metric creation doesn't panic.
for i := range conf . RemoteWriteConfigs {
conf . RemoteWriteConfigs [ i ] . URL = & common_config . URL {
URL : & url . URL {
Host : "http://test-storage.com" ,
} ,
}
}
testutil . Ok ( t , s . ApplyConfig ( conf ) )
testutil . Equals ( t , 3 , len ( s . queues ) )
hashes := make ( [ ] string , len ( conf . RemoteWriteConfigs ) )
queues := make ( [ ] * QueueManager , len ( conf . RemoteWriteConfigs ) )
storeHashes := func ( ) {
for i := range conf . RemoteWriteConfigs {
hash , err := toHash ( conf . RemoteWriteConfigs [ i ] )
testutil . Ok ( t , err )
hashes [ i ] = hash
queues [ i ] = s . queues [ hash ]
}
}
storeHashes ( )
// Update c0 and c2.
c0 . WriteRelabelConfigs [ 0 ] = & relabel . Config { Regex : relabel . MustNewRegexp ( "foo" ) }
c2 . RemoteTimeout = model . Duration ( 50 * time . Second )
conf = & config . Config {
GlobalConfig : config . GlobalConfig { } ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig { c0 , c1 , c2 } ,
}
testutil . Ok ( t , s . ApplyConfig ( conf ) )
testutil . Equals ( t , 3 , len ( s . queues ) )
_ , hashExists := s . queues [ hashes [ 0 ] ]
testutil . Assert ( t , ! hashExists , "The queue for the first remote write configuration should have been restarted because the relabel configuration has changed." )
q , hashExists := s . queues [ hashes [ 1 ] ]
testutil . Assert ( t , hashExists , "Hash of unchanged queue should have remained the same" )
testutil . Assert ( t , q == queues [ 1 ] , "Pointer of unchanged queue should have remained the same" )
_ , hashExists = s . queues [ hashes [ 2 ] ]
testutil . Assert ( t , ! hashExists , "The queue for the third remote write configuration should have been restarted because the timeout has changed." )
storeHashes ( )
secondClient := s . queues [ hashes [ 1 ] ] . client ( )
// Update c1.
c1 . HTTPClientConfig . BearerToken = "bar"
err = s . ApplyConfig ( conf )
testutil . Ok ( t , err )
testutil . Equals ( t , 3 , len ( s . queues ) )
_ , hashExists = s . queues [ hashes [ 0 ] ]
testutil . Assert ( t , hashExists , "Pointer of unchanged queue should have remained the same" )
q , hashExists = s . queues [ hashes [ 1 ] ]
testutil . Assert ( t , hashExists , "Hash of queue with secret change should have remained the same" )
testutil . Assert ( t , secondClient != q . client ( ) , "Pointer of a client with a secret change should not be the same" )
_ , hashExists = s . queues [ hashes [ 2 ] ]
testutil . Assert ( t , hashExists , "Pointer of unchanged queue should have remained the same" )
storeHashes ( )
// Delete c0.
conf = & config . Config {
GlobalConfig : config . GlobalConfig { } ,
RemoteWriteConfigs : [ ] * config . RemoteWriteConfig { c1 , c2 } ,
}
s . ApplyConfig ( conf )
testutil . Equals ( t , 2 , len ( s . queues ) )
_ , hashExists = s . queues [ hashes [ 0 ] ]
testutil . Assert ( t , ! hashExists , "If a config is removed, the queue should be stopped and recreated." )
_ , hashExists = s . queues [ hashes [ 1 ] ]
testutil . Assert ( t , hashExists , "Pointer of unchanged queue should have remained the same" )
_ , hashExists = s . queues [ hashes [ 2 ] ]
testutil . Assert ( t , hashExists , "Pointer of unchanged queue should have remained the same" )
err = s . Close ( )
testutil . Ok ( t , err )
}