diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index 1b1293d70c..622caf4226 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -39,7 +39,13 @@ type MaterializerDeps struct { Logger hclog.Logger } -// Fetch implements cache.Type +// Fetch service health from the materialized view. If no materialized view +// exists, create one and start it running in a goroutine. The goroutine will +// exit when the cache entry storing the result is expired, the cache will call +// Close on the result.State. +// +// Fetch implements part of the cache.Type interface, and assumes that the +// caller ensures that only a single call to Fetch is running at any time. func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { if opts.LastResult != nil && opts.LastResult.State != nil { return opts.LastResult.State.(*streamingHealthState).Fetch(opts) @@ -53,6 +59,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque Token: srvReq.Token, Datacenter: srvReq.Datacenter, Index: index, + // TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace } if srvReq.Connect { req.Topic = pbsubscribe.Topic_ServiceHealthConnect diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index d88b38dd3c..3e794611b6 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -486,6 +486,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { } func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() if !t.Run(name, fn) { t.FailNow() } diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 87f8729617..b02d018c68 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -386,6 +386,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin } func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() if !t.Run(name, fn) { t.FailNow() } diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index f6abd81274..0bed8bfc2e 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -117,10 +117,10 @@ func (m *Materializer) Run(ctx context.Context) { func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool { // temporary is an interface used by net and other std lib packages to // show error types represent temporary/recoverable errors. - _, ok := err.(interface { + temp, ok := err.(interface { Temporary() bool }) - return !ok || failures > 0 + return !ok || !temp.Temporary() || failures > 0 } // runSubscription opens a new subscribe streaming call to the servers and runs