mirror of https://github.com/hashicorp/consul
Merge pull request #10514 from hashicorp/dnephin/actually-enable-streaming
streaming: fix not being able to enable streamingpull/10526/head
parent
256cbe25cd
commit
2dbd8231d8
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
streaming: fix a bug that was preventing streaming from being enabled.
|
||||||
|
```
|
|
@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) {
|
||||||
Conn: conn,
|
Conn: conn,
|
||||||
Logger: bd.Logger.Named("rpcclient.health"),
|
Logger: bd.Logger.Named("rpcclient.health"),
|
||||||
},
|
},
|
||||||
|
UseStreamingBackend: a.config.UseStreamingBackend,
|
||||||
}
|
}
|
||||||
|
|
||||||
a.serviceManager = NewServiceManager(&a)
|
a.serviceManager = NewServiceManager(&a)
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,7 +29,6 @@ import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/time/rate"
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"gopkg.in/square/go-jose.v2/jwt"
|
"gopkg.in/square/go-jose.v2/jwt"
|
||||||
|
|
||||||
|
@ -937,110 +935,6 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCacheRateLimit(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
tests := []struct {
|
|
||||||
// count := number of updates performed (1 every 10ms)
|
|
||||||
count int
|
|
||||||
// rateLimit rate limiting of cache
|
|
||||||
rateLimit float64
|
|
||||||
// Minimum number of updates to see from a cache perspective
|
|
||||||
// We add a value with tolerance to work even on a loaded CI
|
|
||||||
minUpdates int
|
|
||||||
}{
|
|
||||||
// 250 => we have a test running for at least 2.5s
|
|
||||||
{250, 0.5, 1},
|
|
||||||
{250, 1, 1},
|
|
||||||
{300, 2, 2},
|
|
||||||
}
|
|
||||||
for _, currentTest := range tests {
|
|
||||||
t.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
|
|
||||||
tt := currentTest
|
|
||||||
t.Parallel()
|
|
||||||
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
|
|
||||||
defer a.Shutdown()
|
|
||||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
|
||||||
|
|
||||||
cfg := a.config
|
|
||||||
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
|
|
||||||
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
|
|
||||||
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
|
|
||||||
cfg.Cache.EntryFetchMaxBurst = 1
|
|
||||||
a.reloadConfigInternal(cfg)
|
|
||||||
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
|
|
||||||
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
stillProcessing := true
|
|
||||||
|
|
||||||
injectService := func(i int) {
|
|
||||||
srv := &structs.NodeService{
|
|
||||||
Service: "redis",
|
|
||||||
ID: "redis",
|
|
||||||
Port: 1024 + i,
|
|
||||||
Address: fmt.Sprintf("10.0.1.%d", i%255),
|
|
||||||
}
|
|
||||||
|
|
||||||
err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
|
||||||
require.Nil(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
runUpdates := func() {
|
|
||||||
wg.Add(tt.count)
|
|
||||||
for i := 0; i < tt.count; i++ {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
injectService(i)
|
|
||||||
wg.Done()
|
|
||||||
}
|
|
||||||
stillProcessing = false
|
|
||||||
}
|
|
||||||
|
|
||||||
getIndex := func(t *testing.T, oldIndex int) int {
|
|
||||||
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
resp := httptest.NewRecorder()
|
|
||||||
a.srv.handler(false).ServeHTTP(resp, req)
|
|
||||||
// Key doesn't actually exist so we should get 404
|
|
||||||
if got, want := resp.Code, http.StatusOK; got != want {
|
|
||||||
t.Fatalf("bad response code got %d want %d", got, want)
|
|
||||||
}
|
|
||||||
index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
|
|
||||||
require.NoError(t, err)
|
|
||||||
return index
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
start := time.Now()
|
|
||||||
injectService(0)
|
|
||||||
// Get the first index
|
|
||||||
index := getIndex(t, 0)
|
|
||||||
require.Greater(t, index, 2)
|
|
||||||
go runUpdates()
|
|
||||||
numberOfUpdates := 0
|
|
||||||
for stillProcessing {
|
|
||||||
oldIndex := index
|
|
||||||
index = getIndex(t, oldIndex)
|
|
||||||
require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
|
|
||||||
numberOfUpdates++
|
|
||||||
}
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
|
|
||||||
summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)
|
|
||||||
|
|
||||||
// We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock
|
|
||||||
require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
|
|
||||||
// We must have at least being notified a few times
|
|
||||||
require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
|
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -739,11 +739,16 @@ func TestHealthServiceNodes(t *testing.T) {
|
||||||
|
|
||||||
func TestHealthServiceNodes_Blocking(t *testing.T) {
|
func TestHealthServiceNodes_Blocking(t *testing.T) {
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
hcl string
|
hcl string
|
||||||
grpcMetrics bool
|
grpcMetrics bool
|
||||||
|
queryBackend string
|
||||||
}{
|
}{
|
||||||
{name: "no streaming"},
|
{
|
||||||
|
name: "no streaming",
|
||||||
|
queryBackend: "blocking-query",
|
||||||
|
hcl: `use_streaming_backend = false`,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
name: "streaming",
|
name: "streaming",
|
||||||
grpcMetrics: true,
|
grpcMetrics: true,
|
||||||
|
@ -751,6 +756,7 @@ func TestHealthServiceNodes_Blocking(t *testing.T) {
|
||||||
rpc { enable_streaming = true }
|
rpc { enable_streaming = true }
|
||||||
use_streaming_backend = true
|
use_streaming_backend = true
|
||||||
`,
|
`,
|
||||||
|
queryBackend: "streaming",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -856,6 +862,8 @@ use_streaming_backend = true
|
||||||
require.True(t, idx < newIdx, "index should have increased."+
|
require.True(t, idx < newIdx, "index should have increased."+
|
||||||
"idx=%d, newIdx=%d", idx, newIdx)
|
"idx=%d, newIdx=%d", idx, newIdx)
|
||||||
|
|
||||||
|
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
|
|
||||||
idx = newIdx
|
idx = newIdx
|
||||||
|
|
||||||
checkErrs()
|
checkErrs()
|
||||||
|
@ -882,6 +890,7 @@ use_streaming_backend = true
|
||||||
|
|
||||||
newIdx := getIndex(t, resp)
|
newIdx := getIndex(t, resp)
|
||||||
require.Equal(t, idx, newIdx)
|
require.Equal(t, idx, newIdx)
|
||||||
|
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
}
|
}
|
||||||
|
|
||||||
if tc.grpcMetrics {
|
if tc.grpcMetrics {
|
||||||
|
@ -905,16 +914,25 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
config string
|
config string
|
||||||
|
queryBackend string
|
||||||
}{
|
}{
|
||||||
{"normal", ""},
|
{
|
||||||
{"cache-with-streaming", `
|
name: "blocking-query",
|
||||||
|
config: `use_streaming_backend=false`,
|
||||||
|
queryBackend: "blocking-query",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "cache-with-streaming",
|
||||||
|
config: `
|
||||||
rpc{
|
rpc{
|
||||||
enable_streaming=true
|
enable_streaming=true
|
||||||
}
|
}
|
||||||
use_streaming_backend=true
|
use_streaming_backend=true
|
||||||
`},
|
`,
|
||||||
|
queryBackend: "streaming",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tst := range tests {
|
for _, tst := range tests {
|
||||||
t.Run(tst.name, func(t *testing.T) {
|
t.Run(tst.name, func(t *testing.T) {
|
||||||
|
@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
|
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
|
||||||
t.Fatalf("bad: %v", obj)
|
t.Fatalf("bad: %v", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
|
||||||
|
|
||||||
// Should be a cache miss
|
// Should be a cache miss
|
||||||
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
|
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
|
||||||
|
// always a blocking query, because the ingress endpoint does not yet support streaming.
|
||||||
|
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
}))
|
}))
|
||||||
|
|
||||||
require.True(t, t.Run("test caching hit", func(t *testing.T) {
|
require.True(t, t.Run("test caching hit", func(t *testing.T) {
|
||||||
|
@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
|
||||||
|
|
||||||
// Should be a cache HIT now!
|
// Should be a cache HIT now!
|
||||||
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
|
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
|
||||||
|
// always a blocking query, because the ingress endpoint does not yet support streaming.
|
||||||
|
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -723,6 +723,13 @@ func setMeta(resp http.ResponseWriter, m structs.QueryMetaCompat) {
|
||||||
setLastContact(resp, m.GetLastContact())
|
setLastContact(resp, m.GetLastContact())
|
||||||
setKnownLeader(resp, m.GetKnownLeader())
|
setKnownLeader(resp, m.GetKnownLeader())
|
||||||
setConsistency(resp, m.GetConsistencyLevel())
|
setConsistency(resp, m.GetConsistencyLevel())
|
||||||
|
setQueryBackend(resp, m.GetBackend())
|
||||||
|
}
|
||||||
|
|
||||||
|
func setQueryBackend(resp http.ResponseWriter, backend structs.QueryBackend) {
|
||||||
|
if b := backend.String(); b != "" {
|
||||||
|
resp.Header().Set("X-Consul-Query-Backend", b)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setCacheMeta sets http response headers to indicate cache status.
|
// setCacheMeta sets http response headers to indicate cache status.
|
||||||
|
|
|
@ -42,7 +42,8 @@ func (c *Client) ServiceNodes(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
|
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
|
||||||
}
|
}
|
||||||
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
|
meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached}
|
||||||
|
return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out, md, err := c.getServiceNodes(ctx, req)
|
out, md, err := c.getServiceNodes(ctx, req)
|
||||||
|
|
|
@ -171,7 +171,8 @@ func (s *healthView) Result(index uint64) interface{} {
|
||||||
result := structs.IndexedCheckServiceNodes{
|
result := structs.IndexedCheckServiceNodes{
|
||||||
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
|
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
|
||||||
QueryMeta: structs.QueryMeta{
|
QueryMeta: structs.QueryMeta{
|
||||||
Index: index,
|
Index: index,
|
||||||
|
Backend: structs.QueryBackendStreaming,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, node := range s.state {
|
for _, node := range s.state {
|
||||||
|
|
|
@ -101,7 +101,8 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
|
||||||
empty := &structs.IndexedCheckServiceNodes{
|
empty := &structs.IndexedCheckServiceNodes{
|
||||||
Nodes: structs.CheckServiceNodes{},
|
Nodes: structs.CheckServiceNodes{},
|
||||||
QueryMeta: structs.QueryMeta{
|
QueryMeta: structs.QueryMeta{
|
||||||
Index: 1,
|
Index: 1,
|
||||||
|
Backend: structs.QueryBackendStreaming,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
|
||||||
|
|
||||||
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
|
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
|
||||||
result := &structs.IndexedCheckServiceNodes{}
|
result := &structs.IndexedCheckServiceNodes{}
|
||||||
|
result.QueryMeta.Backend = structs.QueryBackendStreaming
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
|
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
|
||||||
Node: &structs.Node{Node: node},
|
Node: &structs.Node{Node: node},
|
||||||
|
|
|
@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) {
|
||||||
|
|
||||||
assertIndex(t, resp)
|
assertIndex(t, resp)
|
||||||
require.NotEmpty(t, resp.Header().Get("X-Consul-Index"))
|
require.NotEmpty(t, resp.Header().Get("X-Consul-Index"))
|
||||||
|
require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGRPCWithTLSConfigs(t *testing.T) {
|
func TestGRPCWithTLSConfigs(t *testing.T) {
|
||||||
|
|
|
@ -44,6 +44,7 @@ type QueryMetaCompat interface {
|
||||||
SetIndex(uint64)
|
SetIndex(uint64)
|
||||||
GetConsistencyLevel() string
|
GetConsistencyLevel() string
|
||||||
SetConsistencyLevel(string)
|
SetConsistencyLevel(string)
|
||||||
|
GetBackend() QueryBackend
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetToken helps implement the QueryOptionsCompat interface
|
// GetToken helps implement the QueryOptionsCompat interface
|
||||||
|
@ -269,3 +270,7 @@ func (q *QueryMeta) SetIndex(index uint64) {
|
||||||
func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) {
|
func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) {
|
||||||
q.ConsistencyLevel = consistencyLevel
|
q.ConsistencyLevel = consistencyLevel
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *QueryMeta) GetBackend() QueryBackend {
|
||||||
|
return q.Backend
|
||||||
|
}
|
||||||
|
|
|
@ -314,6 +314,24 @@ func (w *WriteRequest) SetTokenSecret(s string) {
|
||||||
w.Token = s
|
w.Token = s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type QueryBackend int
|
||||||
|
|
||||||
|
const (
|
||||||
|
QueryBackendBlocking QueryBackend = iota
|
||||||
|
QueryBackendStreaming
|
||||||
|
)
|
||||||
|
|
||||||
|
func (q QueryBackend) String() string {
|
||||||
|
switch q {
|
||||||
|
case QueryBackendBlocking:
|
||||||
|
return "blocking-query"
|
||||||
|
case QueryBackendStreaming:
|
||||||
|
return "streaming"
|
||||||
|
default:
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// QueryMeta allows a query response to include potentially
|
// QueryMeta allows a query response to include potentially
|
||||||
// useful metadata about a query
|
// useful metadata about a query
|
||||||
type QueryMeta struct {
|
type QueryMeta struct {
|
||||||
|
@ -338,6 +356,9 @@ type QueryMeta struct {
|
||||||
// When NotModified is true, the response will not contain the result of
|
// When NotModified is true, the response will not contain the result of
|
||||||
// the query.
|
// the query.
|
||||||
NotModified bool
|
NotModified bool
|
||||||
|
|
||||||
|
// Backend used to handle this query, either blocking-query or streaming.
|
||||||
|
Backend QueryBackend
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterRequest is used for the Catalog.Register endpoint
|
// RegisterRequest is used for the Catalog.Register endpoint
|
||||||
|
|
|
@ -215,9 +215,13 @@ func (m *Materializer) notifyUpdateLocked(err error) {
|
||||||
m.updateCh = make(chan struct{})
|
m.updateCh = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Result returned from the View.
|
||||||
type Result struct {
|
type Result struct {
|
||||||
Index uint64
|
Index uint64
|
||||||
Value interface{}
|
Value interface{}
|
||||||
|
// Cached is true if the requested value was already available locally. If
|
||||||
|
// the value is false, it indicates that getFromView had to wait for an update,
|
||||||
|
Cached bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// getFromView blocks until the index of the View is greater than opts.MinIndex,
|
// getFromView blocks until the index of the View is greater than opts.MinIndex,
|
||||||
|
@ -237,6 +241,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
|
||||||
// haven't loaded a snapshot at all yet which means we should wait for one on
|
// haven't loaded a snapshot at all yet which means we should wait for one on
|
||||||
// the update chan.
|
// the update chan.
|
||||||
if result.Index > 0 && result.Index > minIndex {
|
if result.Index > 0 && result.Index > minIndex {
|
||||||
|
result.Cached = true
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -171,7 +171,7 @@ func (s *Store) Notify(
|
||||||
u := cache.UpdateEvent{
|
u := cache.UpdateEvent{
|
||||||
CorrelationID: correlationID,
|
CorrelationID: correlationID,
|
||||||
Result: result.Value,
|
Result: result.Value,
|
||||||
Meta: cache.ResultMeta{Index: result.Index},
|
Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached},
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case updateCh <- u:
|
case updateCh <- u:
|
||||||
|
|
|
@ -2,6 +2,8 @@ package pbcommon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IsRead is always true for QueryOption
|
// IsRead is always true for QueryOption
|
||||||
|
@ -97,6 +99,10 @@ func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) {
|
||||||
q.ConsistencyLevel = consistencyLevel
|
q.ConsistencyLevel = consistencyLevel
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *QueryMeta) GetBackend() structs.QueryBackend {
|
||||||
|
return structs.QueryBackend(0)
|
||||||
|
}
|
||||||
|
|
||||||
// WriteRequest only applies to writes, always false
|
// WriteRequest only applies to writes, always false
|
||||||
func (w WriteRequest) IsRead() bool {
|
func (w WriteRequest) IsRead() bool {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -99,6 +99,9 @@ While streaming is a significant optimization over long polling, it will not pop
|
||||||
`X-Consul-LastContact` or `X-Consul-KnownLeader` response headers, because the required
|
`X-Consul-LastContact` or `X-Consul-KnownLeader` response headers, because the required
|
||||||
data is not available to the client.
|
data is not available to the client.
|
||||||
|
|
||||||
|
When the streaming backend is used, API responses will include the `X-Consul-Query-Backend`
|
||||||
|
header with a value of `streaming`.
|
||||||
|
|
||||||
|
|
||||||
## Hash-based Blocking Queries
|
## Hash-based Blocking Queries
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue