diff --git a/config/config.go b/config/config.go index b23fff575..a94342d84 100644 --- a/config/config.go +++ b/config/config.go @@ -1195,6 +1195,7 @@ type RemoteWriteConfig struct { Name string `yaml:"name,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"` SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` + RoundRobinDNS bool `yaml:"round_robin_dns,omitempty"` // ProtobufMessage specifies the protobuf message to use against the remote // receiver as specified in https://prometheus.io/docs/specs/remote_write_spec_2_0/ ProtobufMessage RemoteWriteProtoMsg `yaml:"protobuf_message,omitempty"` diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 2093ed883..2a64d8c38 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -2797,6 +2797,12 @@ write_relabel_configs: # For the `io.prometheus.write.v2.Request` message, this option is noop (always true). [ send_native_histograms: <boolean> | default = false ] +# When enabled, remote-write will resolve the URL host name via DNS, choose one of the IP addresses at random, and connect to it. +# When disabled, remote-write relies on Go's standard behavior, which is to try to connect to each address in turn. +# The connection timeout applies to the whole operation, i.e. in the latter case it is spread over all attempts. +# This is an experimental feature, and its behavior might still change, or even get removed. +[ round_robin_dns: <boolean> | default = false ] + # Optionally configures AWS's Signature Verification 4 signing process to # sign requests. Cannot be set at the same time as basic_auth, authorization, oauth2, or azuread. # To use the default credentials from the AWS SDK, use `sigv4: {}`. 
diff --git a/storage/remote/client.go b/storage/remote/client.go index 98eb8d3d7..ad766be9b 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -145,6 +145,7 @@ type ClientConfig struct { RetryOnRateLimit bool WriteProtoMsg config.RemoteWriteProtoMsg ChunkedReadLimit uint64 + RoundRobinDNS bool } // ReadClient will request the STREAMED_XOR_CHUNKS method of remote read but can @@ -180,7 +181,11 @@ func NewReadClient(name string, conf *ClientConfig) (ReadClient, error) { // NewWriteClient creates a new client for remote write. func NewWriteClient(name string, conf *ClientConfig) (WriteClient, error) { - httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage_write_client") + var httpOpts []config_util.HTTPClientOption + if conf.RoundRobinDNS { + httpOpts = []config_util.HTTPClientOption{config_util.WithDialContextFunc(newDialContextWithRoundRobinDNS().dialContextFn())} + } + httpClient, err := config_util.NewClientFromConfig(conf.HTTPClientConfig, "remote_storage_write_client", httpOpts...) if err != nil { return nil, err } diff --git a/storage/remote/dial_context.go b/storage/remote/dial_context.go new file mode 100644 index 000000000..b842728e4 --- /dev/null +++ b/storage/remote/dial_context.go @@ -0,0 +1,62 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package remote + +import ( + "context" + "math/rand" + "net" + "net/http" + "time" + + "github.com/prometheus/common/config" +) + +type hostResolver interface { + LookupHost(context.Context, string) ([]string, error) +} + +type dialContextWithRoundRobinDNS struct { + dialContext config.DialContextFunc + resolver hostResolver + rand *rand.Rand +} + +// newDialContextWithRoundRobinDNS creates a new dialContextWithRoundRobinDNS. +// We discourage creating new instances of struct dialContextWithRoundRobinDNS by explicitly setting its members, +// except for testing purposes, and recommend using newDialContextWithRoundRobinDNS. +func newDialContextWithRoundRobinDNS() *dialContextWithRoundRobinDNS { + return &dialContextWithRoundRobinDNS{ + dialContext: http.DefaultTransport.(*http.Transport).DialContext, + resolver: net.DefaultResolver, + rand: rand.New(rand.NewSource(time.Now().Unix())), + } +} + +func (dc *dialContextWithRoundRobinDNS) dialContextFn() config.DialContextFunc { + return func(ctx context.Context, network, addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return dc.dialContext(ctx, network, addr) + } + + addrs, err := dc.resolver.LookupHost(ctx, host) + if err != nil || len(addrs) == 0 { + return dc.dialContext(ctx, network, addr) + } + + randomAddr := net.JoinHostPort(addrs[dc.rand.Intn(len(addrs))], port) + return dc.dialContext(ctx, network, randomAddr) + } +} diff --git a/storage/remote/dial_context_test.go b/storage/remote/dial_context_test.go new file mode 100644 index 000000000..d2716df53 --- /dev/null +++ b/storage/remote/dial_context_test.go @@ -0,0 +1,161 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "errors" + "math/rand" + "net" + "sync" + "testing" + "time" + + "github.com/prometheus/common/config" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +const ( + testNetwork = "tcp" + testAddrWithoutPort = "this-is-my-addr.without-port" + testAddrWithPort = "this-is-my-addr.without-port:123" + testPort = "123" + ip1 = "1.2.3.4" + ip2 = "5.6.7.8" + ip3 = "9.0.1.2" + randSeed int64 = 123456789 +) + +var ( + errMockLookupHost = errors.New("this is a mocked error") + testLookupResult = []string{ip1, ip2, ip3} + testLookupResultWithPort = []string{net.JoinHostPort(ip1, testPort), net.JoinHostPort(ip2, testPort), net.JoinHostPort(ip3, testPort)} +) + +type mockDialContext struct { + mock.Mock + + addrFrequencyMu sync.Mutex + addrFrequency map[string]int +} + +func newMockDialContext(acceptableAddresses []string) *mockDialContext { + m := &mockDialContext{ + addrFrequencyMu: sync.Mutex{}, + addrFrequency: make(map[string]int), + } + for _, acceptableAddr := range acceptableAddresses { + m.On("dialContext", mock.Anything, mock.Anything, acceptableAddr).Return(nil, nil) + } + return m +} + +func (dc *mockDialContext) dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + dc.addrFrequencyMu.Lock() + defer dc.addrFrequencyMu.Unlock() + args := dc.MethodCalled("dialContext", ctx, network, addr) + dc.addrFrequency[addr]++ + return nil, args.Error(1) +} + +func (dc *mockDialContext) getCount(addr string) int { + dc.addrFrequencyMu.Lock() + defer 
dc.addrFrequencyMu.Unlock() + return dc.addrFrequency[addr] +} + +type mockedLookupHost struct { + withErr bool + result []string +} + +func (lh *mockedLookupHost) LookupHost(context.Context, string) ([]string, error) { + if lh.withErr { + return nil, errMockLookupHost + } + return lh.result, nil +} + +func createDialContextWithRoundRobinDNS(dialContext config.DialContextFunc, resolver hostResolver, r *rand.Rand) dialContextWithRoundRobinDNS { + return dialContextWithRoundRobinDNS{ + dialContext: dialContext, + resolver: resolver, + rand: r, + } +} + +func TestDialContextWithRandomConnections(t *testing.T) { + numberOfRuns := 2 * len(testLookupResult) + var mdc *mockDialContext + testCases := map[string]struct { + addr string + setup func() dialContextWithRoundRobinDNS + check func() + }{ + "if address contains no port call default DealContext": { + addr: testAddrWithoutPort, + setup: func() dialContextWithRoundRobinDNS { + mdc = newMockDialContext([]string{testAddrWithoutPort}) + return createDialContextWithRoundRobinDNS(mdc.dialContext, &mockedLookupHost{withErr: false}, rand.New(rand.NewSource(time.Now().Unix()))) + }, + check: func() { + require.Equal(t, numberOfRuns, mdc.getCount(testAddrWithoutPort)) + }, + }, + "if lookup host returns error call default DealContext": { + addr: testAddrWithPort, + setup: func() dialContextWithRoundRobinDNS { + mdc = newMockDialContext([]string{testAddrWithPort}) + return createDialContextWithRoundRobinDNS(mdc.dialContext, &mockedLookupHost{withErr: true}, rand.New(rand.NewSource(time.Now().Unix()))) + }, + check: func() { + require.Equal(t, numberOfRuns, mdc.getCount(testAddrWithPort)) + }, + }, + "if lookup returns no addresses call default DealContext": { + addr: testAddrWithPort, + setup: func() dialContextWithRoundRobinDNS { + mdc = newMockDialContext([]string{testAddrWithPort}) + return createDialContextWithRoundRobinDNS(mdc.dialContext, &mockedLookupHost{}, rand.New(rand.NewSource(time.Now().Unix()))) + }, + check: 
func() { + require.Equal(t, numberOfRuns, mdc.getCount(testAddrWithPort)) + }, + }, + "if lookup host is successful, shuffle results": { + addr: testAddrWithPort, + setup: func() dialContextWithRoundRobinDNS { + mdc = newMockDialContext(testLookupResultWithPort) + return createDialContextWithRoundRobinDNS(mdc.dialContext, &mockedLookupHost{result: testLookupResult}, rand.New(rand.NewSource(randSeed))) + }, + check: func() { + // we ensure that not all runs will choose the first element of the lookup + require.NotEqual(t, numberOfRuns, mdc.getCount(testLookupResultWithPort[0])) + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + dc := tc.setup() + require.NotNil(t, dc) + for i := 0; i < numberOfRuns; i++ { + _, err := dc.dialContextFn()(context.Background(), testNetwork, tc.addr) + require.NoError(t, err) + } + tc.check() + }) + } +} diff --git a/storage/remote/write.go b/storage/remote/write.go index 639f34452..036309544 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -180,6 +180,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { GoogleIAMConfig: rwConf.GoogleIAMConfig, Headers: rwConf.Headers, RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit, + RoundRobinDNS: rwConf.RoundRobinDNS, }) if err != nil { return err