From 2dbd8231d8172b9aaaeda098a1f5a93ad1ea586b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 28 Jun 2021 18:52:03 -0400 Subject: [PATCH] Merge pull request #10514 from hashicorp/dnephin/actually-enable-streaming streaming: fix not being able to enable streaming --- .changelog/10514.txt | 3 + agent/agent.go | 1 + agent/agent_test.go | 106 ------------------ agent/health_endpoint_test.go | 42 +++++-- agent/http.go | 7 ++ agent/rpcclient/health/health.go | 3 +- agent/rpcclient/health/view.go | 3 +- agent/rpcclient/health/view_test.go | 4 +- agent/streaming_test.go | 1 + agent/structs/protobuf_compat.go | 5 + agent/structs/structs.go | 21 ++++ agent/submatview/materializer.go | 5 + agent/submatview/store.go | 2 +- proto/pbcommon/common.go | 6 + .../content/api-docs/features/blocking.mdx | 3 + 15 files changed, 93 insertions(+), 119 deletions(-) create mode 100644 .changelog/10514.txt diff --git a/.changelog/10514.txt b/.changelog/10514.txt new file mode 100644 index 0000000000..f427fdcc26 --- /dev/null +++ b/.changelog/10514.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: fix a bug that was preventing streaming from being enabled. 
+``` diff --git a/agent/agent.go b/agent/agent.go index 14b474ef08..73d22894c8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) { Conn: conn, Logger: bd.Logger.Named("rpcclient.health"), }, + UseStreamingBackend: a.config.UseStreamingBackend, } a.serviceManager = NewServiceManager(&a) diff --git a/agent/agent_test.go b/agent/agent_test.go index 3726edbc83..4982ae39a1 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -18,7 +18,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "testing" "time" @@ -30,7 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "google.golang.org/grpc" "gopkg.in/square/go-jose.v2/jwt" @@ -937,110 +935,6 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) { } } -func TestCacheRateLimit(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - tests := []struct { - // count := number of updates performed (1 every 10ms) - count int - // rateLimit rate limiting of cache - rateLimit float64 - // Minimum number of updates to see from a cache perspective - // We add a value with tolerance to work even on a loaded CI - minUpdates int - }{ - // 250 => we have a test running for at least 2.5s - {250, 0.5, 1}, - {250, 1, 1}, - {300, 2, 2}, - } - for _, currentTest := range tests { - t.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) { - tt := currentTest - t.Parallel() - a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }") - defer a.Shutdown() - testrpc.WaitForTestAgent(t, a.RPC, "dc1") - - cfg := a.config - require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate) - require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst) - cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit) - cfg.Cache.EntryFetchMaxBurst = 1 - a.reloadConfigInternal(cfg) - 
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate) - require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst) - var wg sync.WaitGroup - stillProcessing := true - - injectService := func(i int) { - srv := &structs.NodeService{ - Service: "redis", - ID: "redis", - Port: 1024 + i, - Address: fmt.Sprintf("10.0.1.%d", i%255), - } - - err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote) - require.Nil(t, err) - } - - runUpdates := func() { - wg.Add(tt.count) - for i := 0; i < tt.count; i++ { - time.Sleep(10 * time.Millisecond) - injectService(i) - wg.Done() - } - stillProcessing = false - } - - getIndex := func(t *testing.T, oldIndex int) int { - req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil) - require.NoError(t, err) - - resp := httptest.NewRecorder() - a.srv.handler(false).ServeHTTP(resp, req) - // Key doesn't actually exist so we should get 404 - if got, want := resp.Code, http.StatusOK; got != want { - t.Fatalf("bad response code got %d want %d", got, want) - } - index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index")) - require.NoError(t, err) - return index - } - - { - start := time.Now() - injectService(0) - // Get the first index - index := getIndex(t, 0) - require.Greater(t, index, 2) - go runUpdates() - numberOfUpdates := 0 - for stillProcessing { - oldIndex := index - index = getIndex(t, oldIndex) - require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only") - numberOfUpdates++ - } - elapsed := time.Since(start) - qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed) - summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit) - - // We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock - require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it 
should never get more requests than ratelimit, had: %s", summary)) - // We must have at least being notified a few times - require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary)) - } - wg.Wait() - }) - } -} - func TestAddServiceIPv4TaggedDefault(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index b36c23fa43..075849eb80 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -739,11 +739,16 @@ func TestHealthServiceNodes(t *testing.T) { func TestHealthServiceNodes_Blocking(t *testing.T) { cases := []struct { - name string - hcl string - grpcMetrics bool + name string + hcl string + grpcMetrics bool + queryBackend string }{ - {name: "no streaming"}, + { + name: "no streaming", + queryBackend: "blocking-query", + hcl: `use_streaming_backend = false`, + }, { name: "streaming", grpcMetrics: true, @@ -751,6 +756,7 @@ func TestHealthServiceNodes_Blocking(t *testing.T) { rpc { enable_streaming = true } use_streaming_backend = true `, + queryBackend: "streaming", }, } @@ -856,6 +862,8 @@ use_streaming_backend = true require.True(t, idx < newIdx, "index should have increased."+ "idx=%d, newIdx=%d", idx, newIdx) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) + idx = newIdx checkErrs() @@ -882,6 +890,7 @@ use_streaming_backend = true newIdx := getIndex(t, resp) require.Equal(t, idx, newIdx) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) } if tc.grpcMetrics { @@ -905,16 +914,25 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { t.Parallel() tests := []struct { - name string - config string + name string + config string + queryBackend string }{ - {"normal", ""}, - {"cache-with-streaming", ` + { + name: "blocking-query", + config: `use_streaming_backend=false`, + queryBackend: 
"blocking-query", + }, + { + name: "cache-with-streaming", + config: ` rpc{ enable_streaming=true } use_streaming_backend=true - `}, + `, + queryBackend: "streaming", + }, } for _, tst := range tests { t.Run(tst.name, func(t *testing.T) { @@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { t.Fatalf("bad: %v", obj) } + + require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) }) } } @@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { // Should be a cache miss require.Equal(t, "MISS", resp.Header().Get("X-Cache")) + // always a blocking query, because the ingress endpoint does not yet support streaming. + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) })) require.True(t, t.Run("test caching hit", func(t *testing.T) { @@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { // Should be a cache HIT now! require.Equal(t, "HIT", resp.Header().Get("X-Cache")) + // always a blocking query, because the ingress endpoint does not yet support streaming. + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) })) } diff --git a/agent/http.go b/agent/http.go index f6a8b448e4..ad2bbe9779 100644 --- a/agent/http.go +++ b/agent/http.go @@ -723,6 +723,13 @@ func setMeta(resp http.ResponseWriter, m structs.QueryMetaCompat) { setLastContact(resp, m.GetLastContact()) setKnownLeader(resp, m.GetKnownLeader()) setConsistency(resp, m.GetConsistencyLevel()) + setQueryBackend(resp, m.GetBackend()) +} + +func setQueryBackend(resp http.ResponseWriter, backend structs.QueryBackend) { + if b := backend.String(); b != "" { + resp.Header().Set("X-Consul-Query-Backend", b) + } } // setCacheMeta sets http response headers to indicate cache status. 
diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 4cd7b0f4d1..9d20f3caa8 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -42,7 +42,8 @@ func (c *Client) ServiceNodes( if err != nil { return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err } - return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err + meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached} + return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err } out, md, err := c.getServiceNodes(ctx, req) diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index a648686c45..2f31dde210 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -171,7 +171,8 @@ func (s *healthView) Result(index uint64) interface{} { result := structs.IndexedCheckServiceNodes{ Nodes: make(structs.CheckServiceNodes, 0, len(s.state)), QueryMeta: structs.QueryMeta{ - Index: index, + Index: index, + Backend: structs.QueryBackendStreaming, }, } for _, node := range s.state { diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index bdc59ad520..beba03d880 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -101,7 +101,8 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { empty := &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{}, QueryMeta: structs.QueryMeta{ - Index: 1, + Index: 1, + Backend: structs.QueryBackendStreaming, }, } @@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { result := &structs.IndexedCheckServiceNodes{} + result.QueryMeta.Backend = structs.QueryBackendStreaming for _, node := range nodes { result.Nodes = append(result.Nodes, structs.CheckServiceNode{ Node: &structs.Node{Node: 
node}, diff --git a/agent/streaming_test.go b/agent/streaming_test.go index 0f45ad9ed4..5fa4dd4c0d 100644 --- a/agent/streaming_test.go +++ b/agent/streaming_test.go @@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) { assertIndex(t, resp) require.NotEmpty(t, resp.Header().Get("X-Consul-Index")) + require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend")) } func TestGRPCWithTLSConfigs(t *testing.T) { diff --git a/agent/structs/protobuf_compat.go b/agent/structs/protobuf_compat.go index 5322288264..667443c9e9 100644 --- a/agent/structs/protobuf_compat.go +++ b/agent/structs/protobuf_compat.go @@ -44,6 +44,7 @@ type QueryMetaCompat interface { SetIndex(uint64) GetConsistencyLevel() string SetConsistencyLevel(string) + GetBackend() QueryBackend } // GetToken helps implement the QueryOptionsCompat interface @@ -269,3 +270,7 @@ func (q *QueryMeta) SetIndex(index uint64) { func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) { q.ConsistencyLevel = consistencyLevel } + +func (q *QueryMeta) GetBackend() QueryBackend { + return q.Backend +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 337a35e2f7..c46395f692 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -314,6 +314,24 @@ func (w *WriteRequest) SetTokenSecret(s string) { w.Token = s } +type QueryBackend int + +const ( + QueryBackendBlocking QueryBackend = iota + QueryBackendStreaming +) + +func (q QueryBackend) String() string { + switch q { + case QueryBackendBlocking: + return "blocking-query" + case QueryBackendStreaming: + return "streaming" + default: + return "" + } +} + // QueryMeta allows a query response to include potentially // useful metadata about a query type QueryMeta struct { @@ -338,6 +356,9 @@ type QueryMeta struct { // When NotModified is true, the response will not contain the result of // the query. NotModified bool + + // Backend used to handle this query, either blocking-query or streaming. 
+ Backend QueryBackend } // RegisterRequest is used for the Catalog.Register endpoint diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 51402987dc..b830689e69 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -215,9 +215,13 @@ func (m *Materializer) notifyUpdateLocked(err error) { m.updateCh = make(chan struct{}) } +// Result returned from the View. type Result struct { Index uint64 Value interface{} + // Cached is true if the requested value was already available locally. If + // the value is false, it indicates that getFromView had to wait for an update, + Cached bool } // getFromView blocks until the index of the View is greater than opts.MinIndex, @@ -237,6 +241,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result // haven't loaded a snapshot at all yet which means we should wait for one on // the update chan. if result.Index > 0 && result.Index > minIndex { + result.Cached = true return result, nil } diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 58acf5db33..07363f7403 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -171,7 +171,7 @@ func (s *Store) Notify( u := cache.UpdateEvent{ CorrelationID: correlationID, Result: result.Value, - Meta: cache.ResultMeta{Index: result.Index}, + Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached}, } select { case updateCh <- u: diff --git a/proto/pbcommon/common.go b/proto/pbcommon/common.go index e6e981ccfa..eb396ae58e 100644 --- a/proto/pbcommon/common.go +++ b/proto/pbcommon/common.go @@ -2,6 +2,8 @@ package pbcommon import ( "time" + + "github.com/hashicorp/consul/agent/structs" ) // IsRead is always true for QueryOption @@ -97,6 +99,10 @@ func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) { q.ConsistencyLevel = consistencyLevel } +func (q *QueryMeta) GetBackend() structs.QueryBackend { + return structs.QueryBackend(0) +} + // WriteRequest 
only applies to writes, always false func (w WriteRequest) IsRead() bool { return false diff --git a/website/content/api-docs/features/blocking.mdx b/website/content/api-docs/features/blocking.mdx index 2b7a9ae860..4087c64d55 100644 --- a/website/content/api-docs/features/blocking.mdx +++ b/website/content/api-docs/features/blocking.mdx @@ -99,6 +99,9 @@ While streaming is a significant optimization over long polling, it will not populate `X-Consul-LastContact` or `X-Consul-KnownLeader` response headers, because the required data is not available to the client. +When the streaming backend is used, API responses will include the `X-Consul-Query-Backend` +header with a value of `streaming`. + ## Hash-based Blocking Queries