mirror of https://github.com/hashicorp/consul
submatview: Refactor MaterializeView
Replace InitFilter with Reset. Removes the need to store a fatalErr and the cache-type, and removes the need to recreate the filter each time. Pass dependencies into MaterializedView. Remove context from MaterializedView. Rename state to view. Rename MaterialziedView to Materialzier. Rename to NewMaterializer Pass in retry.Waiterpull/8809/head
parent
b576a2d3c7
commit
ed45957ffb
|
@ -1,11 +1,15 @@
|
||||||
package cachetype
|
package cachetype
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-bexpr"
|
"github.com/hashicorp/go-bexpr"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbservice"
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
|
@ -58,20 +62,56 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
||||||
r.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
r.Topic = pbsubscribe.Topic_ServiceHealthConnect
|
||||||
}
|
}
|
||||||
|
|
||||||
view := MaterializedViewFromFetch(c, opts, r)
|
view, err := c.getMaterializedView(opts, r)
|
||||||
|
if err != nil {
|
||||||
|
return cache.FetchResult{}, err
|
||||||
|
}
|
||||||
return view.Fetch(opts)
|
return view.Fetch(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *StreamingHealthServices) getMaterializedView(opts cache.FetchOptions, r Request) (*Materializer, error) {
|
||||||
|
if opts.LastResult != nil && opts.LastResult.State != nil {
|
||||||
|
return opts.LastResult.State.(*Materializer), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
state, err := newHealthViewState(r.Filter)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
view := NewMaterializer(ViewDeps{
|
||||||
|
State: state,
|
||||||
|
Client: c.client,
|
||||||
|
Logger: c.logger,
|
||||||
|
Waiter: &retry.Waiter{
|
||||||
|
MinFailures: 1,
|
||||||
|
MinWait: 0,
|
||||||
|
MaxWait: 60 * time.Second,
|
||||||
|
Jitter: retry.NewJitter(100),
|
||||||
|
},
|
||||||
|
Request: r,
|
||||||
|
Stop: cancel,
|
||||||
|
Done: ctx.Done(),
|
||||||
|
})
|
||||||
|
go view.run(ctx)
|
||||||
|
return view, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SupportsBlocking implements cache.Type
|
// SupportsBlocking implements cache.Type
|
||||||
func (c *StreamingHealthServices) SupportsBlocking() bool {
|
func (c *StreamingHealthServices) SupportsBlocking() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMaterializedView implements StreamingCacheType
|
func newHealthViewState(filterExpr string) (View, error) {
|
||||||
func (c *StreamingHealthServices) NewMaterializedViewState() MaterializedViewState {
|
s := &healthViewState{state: make(map[string]structs.CheckServiceNode)}
|
||||||
return &healthViewState{
|
|
||||||
state: make(map[string]structs.CheckServiceNode),
|
// We apply filtering to the raw CheckServiceNodes before we are done mutating
|
||||||
}
|
// state in Update to save from storing stuff in memory we'll only filter
|
||||||
|
// later. Because the state is just a map of those types, we can simply run
|
||||||
|
// that map through filter and it will remove any entries that don't match.
|
||||||
|
var err error
|
||||||
|
s.filter, err = bexpr.CreateFilter(filterExpr, nil, s.state)
|
||||||
|
return s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamingClient implements StreamingCacheType
|
// StreamingClient implements StreamingCacheType
|
||||||
|
@ -84,31 +124,18 @@ func (c *StreamingHealthServices) Logger() hclog.Logger {
|
||||||
return c.logger
|
return c.logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// healthViewState implements MaterializedViewState for storing the view state
|
// healthViewState implements View for storing the view state
|
||||||
// of a service health result. We store it as a map to make updates and
|
// of a service health result. We store it as a map to make updates and
|
||||||
// deletions a little easier but we could just store a result type
|
// deletions a little easier but we could just store a result type
|
||||||
// (IndexedCheckServiceNodes) and update it in place for each event - that
|
// (IndexedCheckServiceNodes) and update it in place for each event - that
|
||||||
// involves re-sorting each time etc. though.
|
// involves re-sorting each time etc. though.
|
||||||
type healthViewState struct {
|
type healthViewState struct {
|
||||||
state map[string]structs.CheckServiceNode
|
state map[string]structs.CheckServiceNode
|
||||||
|
// TODO: test case with filter
|
||||||
filter *bexpr.Filter
|
filter *bexpr.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitFilter implements MaterializedViewState
|
// Update implements View
|
||||||
func (s *healthViewState) InitFilter(expression string) error {
|
|
||||||
// We apply filtering to the raw CheckServiceNodes before we are done mutating
|
|
||||||
// state in Update to save from storing stuff in memory we'll only filter
|
|
||||||
// later. Because the state is just a map of those types, we can simply run
|
|
||||||
// that map through filter and it will remove any entries that don't match.
|
|
||||||
filter, err := bexpr.CreateFilter(expression, nil, s.state)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.filter = filter
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update implements MaterializedViewState
|
|
||||||
func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
|
func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
serviceHealth := event.GetServiceHealth()
|
serviceHealth := event.GetServiceHealth()
|
||||||
|
@ -127,6 +154,7 @@ func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
|
||||||
delete(s.state, id)
|
delete(s.state, id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// TODO: replace with a no-op filter instead of a conditional
|
||||||
if s.filter != nil {
|
if s.filter != nil {
|
||||||
filtered, err := s.filter.Execute(s.state)
|
filtered, err := s.filter.Execute(s.state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -137,10 +165,11 @@ func (s *healthViewState) Update(events []*pbsubscribe.Event) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Result implements MaterializedViewState
|
// Result implements View
|
||||||
func (s *healthViewState) Result(index uint64) (interface{}, error) {
|
func (s *healthViewState) Result(index uint64) (interface{}, error) {
|
||||||
var result structs.IndexedCheckServiceNodes
|
var result structs.IndexedCheckServiceNodes
|
||||||
// Avoid a nil slice if there are no results in the view
|
// Avoid a nil slice if there are no results in the view
|
||||||
|
// TODO: why this ^
|
||||||
result.Nodes = structs.CheckServiceNodes{}
|
result.Nodes = structs.CheckServiceNodes{}
|
||||||
for _, node := range s.state {
|
for _, node := range s.state {
|
||||||
result.Nodes = append(result.Nodes, node)
|
result.Nodes = append(result.Nodes, node)
|
||||||
|
@ -148,3 +177,7 @@ func (s *healthViewState) Result(index uint64) (interface{}, error) {
|
||||||
result.Index = index
|
result.Index = index
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *healthViewState) Reset() {
|
||||||
|
s.state = make(map[string]structs.CheckServiceNode)
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package cachetype
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -254,6 +255,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
for _, csn := range r.Nodes {
|
for _, csn := range r.Nodes {
|
||||||
nodes = append(nodes, csn.Node.Node)
|
nodes = append(nodes, csn.Node.Node)
|
||||||
}
|
}
|
||||||
|
sort.Strings(nodes)
|
||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,8 +293,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
"Fetch should have returned before the timeout")
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
require.Equal(t, uint64(20), result.Index)
|
require.Equal(t, uint64(20), result.Index)
|
||||||
require.ElementsMatch(t, []string{"node2", "node3"},
|
require.Equal(t, []string{"node2", "node3"}, gatherNodes(result.Value))
|
||||||
gatherNodes(result.Value))
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
opts.MinIndex = result.Index
|
||||||
opts.LastResult = &result
|
opts.LastResult = &result
|
||||||
|
@ -321,8 +322,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) {
|
||||||
"Fetch should have returned before the timeout")
|
"Fetch should have returned before the timeout")
|
||||||
|
|
||||||
require.Equal(t, uint64(50), result.Index)
|
require.Equal(t, uint64(50), result.Index)
|
||||||
require.ElementsMatch(t, []string{"node3", "node4", "node5"},
|
require.Equal(t, []string{"node3", "node4", "node5"}, gatherNodes(result.Value))
|
||||||
gatherNodes(result.Value))
|
|
||||||
|
|
||||||
opts.MinIndex = result.Index
|
opts.MinIndex = result.Index
|
||||||
opts.LastResult = &result
|
opts.LastResult = &result
|
||||||
|
|
|
@ -12,35 +12,13 @@ import (
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib/retry"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// View is the interface used to manage they type-specific
|
||||||
// SubscribeBackoffMax controls the range of exponential backoff when errors
|
|
||||||
// are returned from subscriptions.
|
|
||||||
SubscribeBackoffMax = 60 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
// StreamingClient is the interface we need from the gRPC client stub. Separate
|
|
||||||
// interface simplifies testing.
|
|
||||||
type StreamingClient interface {
|
|
||||||
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// MaterializedViewState is the interface used to manage they type-specific
|
|
||||||
// materialized view logic.
|
// materialized view logic.
|
||||||
type MaterializedViewState interface {
|
type View interface {
|
||||||
// InitFilter is called once when the view is constructed if the subscription
|
|
||||||
// has a non-empty Filter argument. The implementor is expected to create a
|
|
||||||
// *bexpr.Filter and store it locally so it can be used to filter events
|
|
||||||
// and/or results. Ideally filtering should occur inside `Update` calls such
|
|
||||||
// that we don't store objects in the view state that are just filtered when
|
|
||||||
// the result is returned, however in some cases it might not be possible and
|
|
||||||
// the type may choose to store the whole view and only apply filtering in the
|
|
||||||
// Result method just before returning a result.
|
|
||||||
InitFilter(expression string) error
|
|
||||||
|
|
||||||
// Update is called when one or more events are received. The first call will
|
// Update is called when one or more events are received. The first call will
|
||||||
// include _all_ events in the initial snapshot which may be an empty set.
|
// include _all_ events in the initial snapshot which may be an empty set.
|
||||||
// Subsequent calls will contain one or more update events in the order they
|
// Subsequent calls will contain one or more update events in the order they
|
||||||
|
@ -55,21 +33,13 @@ type MaterializedViewState interface {
|
||||||
// populating. This allows implementations to not worry about maintaining
|
// populating. This allows implementations to not worry about maintaining
|
||||||
// indexes seen during Update.
|
// indexes seen during Update.
|
||||||
Result(index uint64) (interface{}, error)
|
Result(index uint64) (interface{}, error)
|
||||||
|
|
||||||
|
// Reset the view to the zero state, done in preparation for receiving a new
|
||||||
|
// snapshot.
|
||||||
|
Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamingCacheType is the interface a cache-type needs to implement to make
|
type Filter func(seq interface{}) (interface{}, error)
|
||||||
// use of streaming as the transport for updates from the server.
|
|
||||||
type StreamingCacheType interface {
|
|
||||||
NewMaterializedViewState() MaterializedViewState
|
|
||||||
StreamingClient() StreamingClient
|
|
||||||
Logger() hclog.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// temporary is a private interface as used by net and other std lib packages to
|
|
||||||
// show error types represent temporary/recoverable errors.
|
|
||||||
type temporary interface {
|
|
||||||
Temporary() bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// resetErr represents a server request to reset the subscription, it's typed so
|
// resetErr represents a server request to reset the subscription, it's typed so
|
||||||
// we can mark it as temporary and so attempt to retry first time without
|
// we can mark it as temporary and so attempt to retry first time without
|
||||||
|
@ -90,98 +60,87 @@ type Request struct {
|
||||||
pbsubscribe.SubscribeRequest
|
pbsubscribe.SubscribeRequest
|
||||||
// Filter is a bexpr filter expression that is used to filter events on the
|
// Filter is a bexpr filter expression that is used to filter events on the
|
||||||
// client side.
|
// client side.
|
||||||
// TODO: is this used?
|
|
||||||
Filter string
|
Filter string
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaterializedView is a partial view of the state on servers, maintained via
|
// TODO: update godoc
|
||||||
|
// Materializer is a partial view of the state on servers, maintained via
|
||||||
// streaming subscriptions. It is specialized for different cache types by
|
// streaming subscriptions. It is specialized for different cache types by
|
||||||
// providing a MaterializedViewState that encapsulates the logic to update the
|
// providing a View that encapsulates the logic to update the
|
||||||
// state and format it as the correct result type.
|
// state and format it as the correct result type.
|
||||||
//
|
//
|
||||||
// The MaterializedView object becomes the cache.Result.State for a streaming
|
// The Materializer object becomes the cache.Result.State for a streaming
|
||||||
// cache type and manages the actual streaming RPC call to the servers behind
|
// cache type and manages the actual streaming RPC call to the servers behind
|
||||||
// the scenes until the cache result is discarded when TTL expires.
|
// the scenes until the cache result is discarded when TTL expires.
|
||||||
type MaterializedView struct {
|
type Materializer struct {
|
||||||
// Properties above the lock are immutable after the view is constructed in
|
// Properties above the lock are immutable after the view is constructed in
|
||||||
// MaterializedViewFromFetch and must not be modified.
|
// NewMaterializer and must not be modified.
|
||||||
typ StreamingCacheType
|
deps ViewDeps
|
||||||
client StreamingClient
|
|
||||||
logger hclog.Logger
|
|
||||||
req Request
|
|
||||||
ctx context.Context
|
|
||||||
cancelFunc func()
|
|
||||||
|
|
||||||
// l protects the mutable state - all fields below it must only be accessed
|
// l protects the mutable state - all fields below it must only be accessed
|
||||||
// while holding l.
|
// while holding l.
|
||||||
l sync.Mutex
|
l sync.Mutex
|
||||||
index uint64
|
index uint64
|
||||||
state MaterializedViewState
|
view View
|
||||||
snapshotDone bool
|
snapshotDone bool
|
||||||
updateCh chan struct{}
|
updateCh chan struct{}
|
||||||
retryWaiter *lib.RetryWaiter
|
retryWaiter *retry.Waiter
|
||||||
err error
|
err error
|
||||||
fatalErr error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaterializedViewFromFetch retrieves an existing view from the cache result
|
// TODO: rename
|
||||||
|
type ViewDeps struct {
|
||||||
|
State View
|
||||||
|
Client StreamingClient
|
||||||
|
Logger hclog.Logger
|
||||||
|
Waiter *retry.Waiter
|
||||||
|
Request Request
|
||||||
|
Stop func()
|
||||||
|
Done <-chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamingClient is the interface we need from the gRPC client stub. Separate
|
||||||
|
// interface simplifies testing.
|
||||||
|
type StreamingClient interface {
|
||||||
|
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMaterializer retrieves an existing view from the cache result
|
||||||
// state if one exists, otherwise creates a new one. Note that the returned view
|
// state if one exists, otherwise creates a new one. Note that the returned view
|
||||||
// MUST have Close called eventually to avoid leaking resources. Typically this
|
// MUST have Close called eventually to avoid leaking resources. Typically this
|
||||||
// is done automatically if the view is returned in a cache.Result.State when
|
// is done automatically if the view is returned in a cache.Result.State when
|
||||||
// the cache evicts the result. If the view is not returned in a result state
|
// the cache evicts the result. If the view is not returned in a result state
|
||||||
// though Close must be called some other way to avoid leaking the goroutine and
|
// though Close must be called some other way to avoid leaking the goroutine and
|
||||||
// memory.
|
// memory.
|
||||||
func MaterializedViewFromFetch(
|
func NewMaterializer(deps ViewDeps) *Materializer {
|
||||||
t StreamingCacheType,
|
v := &Materializer{
|
||||||
opts cache.FetchOptions,
|
deps: deps,
|
||||||
subReq Request,
|
view: deps.State,
|
||||||
) *MaterializedView {
|
retryWaiter: deps.Waiter,
|
||||||
if opts.LastResult == nil || opts.LastResult.State == nil {
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
v := &MaterializedView{
|
|
||||||
typ: t,
|
|
||||||
client: t.StreamingClient(),
|
|
||||||
logger: t.Logger(),
|
|
||||||
req: subReq,
|
|
||||||
ctx: ctx,
|
|
||||||
cancelFunc: cancel,
|
|
||||||
// Allow first retry without wait, this is important and we rely on it in
|
|
||||||
// tests.
|
|
||||||
retryWaiter: lib.NewRetryWaiter(1, 0, SubscribeBackoffMax,
|
|
||||||
lib.NewJitterRandomStagger(100)),
|
|
||||||
}
|
}
|
||||||
// Run init now otherwise there is a race between run() and a call to Fetch
|
|
||||||
// which expects a view state to exist.
|
|
||||||
v.reset()
|
v.reset()
|
||||||
go v.run()
|
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
return opts.LastResult.State.(*MaterializedView)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close implements io.Close and discards view state and stops background view
|
// Close implements io.Close and discards view state and stops background view
|
||||||
// maintenance.
|
// maintenance.
|
||||||
func (v *MaterializedView) Close() error {
|
func (v *Materializer) Close() error {
|
||||||
v.l.Lock()
|
v.l.Lock()
|
||||||
defer v.l.Unlock()
|
defer v.l.Unlock()
|
||||||
if v.cancelFunc != nil {
|
v.deps.Stop()
|
||||||
v.cancelFunc()
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *MaterializedView) run() {
|
func (v *Materializer) run(ctx context.Context) {
|
||||||
if v.ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Loop in case stream resets and we need to start over
|
// Loop in case stream resets and we need to start over
|
||||||
for {
|
for {
|
||||||
// Run a subscribe call until it fails
|
err := v.runSubscription(ctx)
|
||||||
err := v.runSubscription()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Check if the view closed
|
if ctx.Err() != nil {
|
||||||
if v.ctx.Err() != nil {
|
|
||||||
// Err doesn't matter and is likely just context cancelled
|
// Err doesn't matter and is likely just context cancelled
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -200,30 +159,36 @@ func (v *MaterializedView) run() {
|
||||||
failures := v.retryWaiter.Failures()
|
failures := v.retryWaiter.Failures()
|
||||||
v.l.Unlock()
|
v.l.Unlock()
|
||||||
|
|
||||||
// Exponential backoff to avoid hammering servers if they are closing
|
v.deps.Logger.Error("subscribe call failed",
|
||||||
// conns because of overload or resetting based on errors.
|
"err", err,
|
||||||
v.logger.Error("subscribe call failed", "err", err, "topic", v.req.Topic,
|
"topic", v.deps.Request.Topic,
|
||||||
"key", v.req.Key, "failure_count", failures)
|
"key", v.deps.Request.Key,
|
||||||
|
"failure_count", failures)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-v.ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-waitCh:
|
case <-waitCh:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Loop and keep trying to resume subscription after error
|
// Loop and keep trying to resume subscription after error
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// temporary is a private interface as used by net and other std lib packages to
|
||||||
|
// show error types represent temporary/recoverable errors.
|
||||||
|
type temporary interface {
|
||||||
|
Temporary() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// runSubscription opens a new subscribe streaming call to the servers and runs
|
// runSubscription opens a new subscribe streaming call to the servers and runs
|
||||||
// for it's lifetime or until the view is closed.
|
// for it's lifetime or until the view is closed.
|
||||||
func (v *MaterializedView) runSubscription() error {
|
func (v *Materializer) runSubscription(ctx context.Context) error {
|
||||||
ctx, cancel := context.WithCancel(v.ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Copy the request template
|
// Copy the request template
|
||||||
req := v.req
|
req := v.deps.Request
|
||||||
|
|
||||||
v.l.Lock()
|
v.l.Lock()
|
||||||
|
|
||||||
|
@ -238,7 +203,7 @@ func (v *MaterializedView) runSubscription() error {
|
||||||
|
|
||||||
v.l.Unlock()
|
v.l.Unlock()
|
||||||
|
|
||||||
s, err := v.client.Subscribe(ctx, &req.SubscribeRequest)
|
s, err := v.deps.Client.Subscribe(ctx, &req.SubscribeRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -261,7 +226,7 @@ func (v *MaterializedView) runSubscription() error {
|
||||||
v.l.Lock()
|
v.l.Lock()
|
||||||
|
|
||||||
// Deliver snapshot events to the View state
|
// Deliver snapshot events to the View state
|
||||||
if err := v.state.Update(snapshotEvents); err != nil {
|
if err := v.view.Update(snapshotEvents); err != nil {
|
||||||
v.l.Unlock()
|
v.l.Unlock()
|
||||||
// This error is kinda fatal to the view - we didn't apply some events
|
// This error is kinda fatal to the view - we didn't apply some events
|
||||||
// the server sent us which means our view is now not in sync. The only
|
// the server sent us which means our view is now not in sync. The only
|
||||||
|
@ -309,7 +274,7 @@ func (v *MaterializedView) runSubscription() error {
|
||||||
if snapshotDone {
|
if snapshotDone {
|
||||||
// We've already got a snapshot, this is an update, deliver it right away.
|
// We've already got a snapshot, this is an update, deliver it right away.
|
||||||
v.l.Lock()
|
v.l.Lock()
|
||||||
if err := v.state.Update(events); err != nil {
|
if err := v.view.Update(events); err != nil {
|
||||||
v.l.Unlock()
|
v.l.Unlock()
|
||||||
// This error is kinda fatal to the view - we didn't apply some events
|
// This error is kinda fatal to the view - we didn't apply some events
|
||||||
// the server sent us which means our view is now not in sync. The only
|
// the server sent us which means our view is now not in sync. The only
|
||||||
|
@ -336,25 +301,11 @@ func isGrpcStatus(err error, code codes.Code) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset clears the state ready to start a new stream from scratch.
|
// reset clears the state ready to start a new stream from scratch.
|
||||||
func (v *MaterializedView) reset() {
|
func (v *Materializer) reset() {
|
||||||
v.l.Lock()
|
v.l.Lock()
|
||||||
defer v.l.Unlock()
|
defer v.l.Unlock()
|
||||||
|
|
||||||
v.state = v.typ.NewMaterializedViewState()
|
v.view.Reset()
|
||||||
if v.req.Filter != "" {
|
|
||||||
if err := v.state.InitFilter(v.req.Filter); err != nil {
|
|
||||||
// If this errors we are stuck - it's fatal for the whole request as it
|
|
||||||
// means there was a bug or an invalid filter string we couldn't parse.
|
|
||||||
// Stop the whole view by closing it and cancelling context, but also set
|
|
||||||
// the error internally so that Fetch calls can return a meaningful error
|
|
||||||
// and not just "context cancelled".
|
|
||||||
v.fatalErr = err
|
|
||||||
if v.cancelFunc != nil {
|
|
||||||
v.cancelFunc()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
v.notifyUpdateLocked()
|
v.notifyUpdateLocked()
|
||||||
// Always start from zero when we have a new state so we load a snapshot from
|
// Always start from zero when we have a new state so we load a snapshot from
|
||||||
// the servers.
|
// the servers.
|
||||||
|
@ -366,7 +317,7 @@ func (v *MaterializedView) reset() {
|
||||||
|
|
||||||
// notifyUpdateLocked closes the current update channel and recreates a new
|
// notifyUpdateLocked closes the current update channel and recreates a new
|
||||||
// one. It must be called while holding the s.l lock.
|
// one. It must be called while holding the s.l lock.
|
||||||
func (v *MaterializedView) notifyUpdateLocked() {
|
func (v *Materializer) notifyUpdateLocked() {
|
||||||
if v.updateCh != nil {
|
if v.updateCh != nil {
|
||||||
close(v.updateCh)
|
close(v.updateCh)
|
||||||
}
|
}
|
||||||
|
@ -376,13 +327,13 @@ func (v *MaterializedView) notifyUpdateLocked() {
|
||||||
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
|
// Fetch implements the logic a StreamingCacheType will need during it's Fetch
|
||||||
// call. Cache types that use streaming should just be able to proxy to this
|
// call. Cache types that use streaming should just be able to proxy to this
|
||||||
// once they have a subscription object and return it's results directly.
|
// once they have a subscription object and return it's results directly.
|
||||||
func (v *MaterializedView) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
|
func (v *Materializer) Fetch(opts cache.FetchOptions) (cache.FetchResult, error) {
|
||||||
var result cache.FetchResult
|
var result cache.FetchResult
|
||||||
|
|
||||||
// Get current view Result and index
|
// Get current view Result and index
|
||||||
v.l.Lock()
|
v.l.Lock()
|
||||||
index := v.index
|
index := v.index
|
||||||
val, err := v.state.Result(v.index)
|
val, err := v.view.Result(v.index)
|
||||||
updateCh := v.updateCh
|
updateCh := v.updateCh
|
||||||
v.l.Unlock()
|
v.l.Unlock()
|
||||||
|
|
||||||
|
@ -420,7 +371,7 @@ func (v *MaterializedView) Fetch(opts cache.FetchOptions) (cache.FetchResult, er
|
||||||
if fetchErr == nil {
|
if fetchErr == nil {
|
||||||
// Only generate a new result if there was no error to avoid pointless
|
// Only generate a new result if there was no error to avoid pointless
|
||||||
// work potentially shuffling the same data around.
|
// work potentially shuffling the same data around.
|
||||||
result.Value, err = v.state.Result(v.index)
|
result.Value, err = v.view.Result(v.index)
|
||||||
}
|
}
|
||||||
v.l.Unlock()
|
v.l.Unlock()
|
||||||
|
|
||||||
|
@ -447,14 +398,8 @@ func (v *MaterializedView) Fetch(opts cache.FetchOptions) (cache.FetchResult, er
|
||||||
// Just return whatever we got originally, might still be empty
|
// Just return whatever we got originally, might still be empty
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|
||||||
case <-v.ctx.Done():
|
case <-v.deps.Done:
|
||||||
v.l.Lock()
|
return result, context.Canceled
|
||||||
err := v.fatalErr
|
|
||||||
v.l.Unlock()
|
|
||||||
if err != nil {
|
|
||||||
return result, err
|
|
||||||
}
|
|
||||||
return result, v.ctx.Err()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue