mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
230 lines
6.1 KiB
230 lines
6.1 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: MPL-2.0 |
|
|
|
package submatview |
|
|
|
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
|
|
|
// View receives events from, and return results to, Materializer. A view is |
|
// responsible for converting the pbsubscribe.Event.Payload into the local |
|
// type, and storing it so that it can be returned by Result(). |
|
type View interface { |
|
// Update is called when one or more events are received. The first call will |
|
// include _all_ events in the initial snapshot which may be an empty set. |
|
// Subsequent calls will contain one or more update events in the order they |
|
// are received. |
|
Update(events []*pbsubscribe.Event) error |
|
|
|
// Result returns the type-specific cache result based on the state. When no |
|
// events have been delivered yet the result should be an empty value type |
|
// suitable to return to clients in case there is an empty result on the |
|
// servers. The index the materialized view represents is maintained |
|
// separately and passed in in case the return type needs an Index field |
|
// populating. This allows implementations to not worry about maintaining |
|
// indexes seen during Update. |
|
Result(index uint64) interface{} |
|
|
|
// Reset the view to the zero state, done in preparation for receiving a new |
|
// snapshot. |
|
Reset() |
|
} |
|
|
|
// Result is what a query against the materialized view returns.
type Result struct {
	// Index is the index of the view at the time the result was captured.
	Index uint64

	// Value is whatever View.Result produced.
	Value interface{}

	// Cached is true if the requested value was already available locally.
	// When it is false, the request had to wait for an update before a value
	// could be returned.
	Cached bool
}
|
|
|
type Deps struct { |
|
View View |
|
Logger hclog.Logger |
|
Waiter *retry.Waiter |
|
Request func(index uint64) *pbsubscribe.SubscribeRequest |
|
} |
|
|
|
// materializer consumes the event stream, handling any framing events, and |
|
// allows for querying the materialized view. |
|
type materializer struct { |
|
retryWaiter *retry.Waiter |
|
logger hclog.Logger |
|
|
|
// lock protects the mutable state - all fields below it must only be accessed |
|
// while holding lock. |
|
lock sync.Mutex |
|
index uint64 |
|
view View |
|
updateCh chan struct{} |
|
err error |
|
} |
|
|
|
func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer { |
|
m := materializer{ |
|
view: view, |
|
retryWaiter: waiter, |
|
logger: logger, |
|
updateCh: make(chan struct{}), |
|
} |
|
if m.retryWaiter == nil { |
|
m.retryWaiter = defaultWaiter() |
|
} |
|
return &m |
|
} |
|
|
|
// Query blocks until the index of the View is greater than opts.MinIndex, |
|
// or the context is cancelled. |
|
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) { |
|
m.lock.Lock() |
|
|
|
result := Result{ |
|
Index: m.index, |
|
Value: m.view.Result(m.index), |
|
} |
|
|
|
updateCh := m.updateCh |
|
m.lock.Unlock() |
|
|
|
// If our index is > req.Index return right away. If index is zero then we |
|
// haven't loaded a snapshot at all yet which means we should wait for one on |
|
// the update chan. |
|
if result.Index > 0 && result.Index > minIndex { |
|
result.Cached = true |
|
return result, nil |
|
} |
|
|
|
for { |
|
select { |
|
case <-updateCh: |
|
// View updated, return the new result |
|
m.lock.Lock() |
|
result.Index = m.index |
|
|
|
switch { |
|
case m.err != nil: |
|
err := m.err |
|
m.lock.Unlock() |
|
return result, err |
|
case result.Index <= minIndex: |
|
// get a reference to the new updateCh, the previous one was closed |
|
updateCh = m.updateCh |
|
m.lock.Unlock() |
|
continue |
|
} |
|
|
|
result.Value = m.view.Result(m.index) |
|
m.lock.Unlock() |
|
return result, nil |
|
|
|
case <-ctx.Done(): |
|
// Update the result value to the latest because callers may still |
|
// use the value when the error is context.DeadlineExceeded |
|
m.lock.Lock() |
|
result.Value = m.view.Result(m.index) |
|
m.lock.Unlock() |
|
return result, ctx.Err() |
|
} |
|
} |
|
} |
|
|
|
func (m *materializer) currentIndex() uint64 { |
|
var resp uint64 |
|
|
|
m.lock.Lock() |
|
resp = m.index |
|
m.lock.Unlock() |
|
|
|
return resp |
|
} |
|
|
|
// notifyUpdateLocked closes the current update channel and recreates a new |
|
// one. It must be called while holding the m.lock lock. |
|
func (m *materializer) notifyUpdateLocked(err error) { |
|
m.err = err |
|
close(m.updateCh) |
|
m.updateCh = make(chan struct{}) |
|
} |
|
|
|
// reset clears the state ready to start a new stream from scratch. |
|
func (m *materializer) reset() { |
|
m.lock.Lock() |
|
defer m.lock.Unlock() |
|
|
|
m.view.Reset() |
|
m.index = 0 |
|
} |
|
|
|
// updateView updates the view from a sequence of events and stores |
|
// the corresponding Raft index. |
|
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error { |
|
m.lock.Lock() |
|
defer m.lock.Unlock() |
|
|
|
if err := m.view.Update(events); err != nil { |
|
return err |
|
} |
|
|
|
m.index = index |
|
m.notifyUpdateLocked(nil) |
|
m.retryWaiter.Reset() |
|
return nil |
|
} |
|
|
|
func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) { |
|
failures := m.retryWaiter.Failures() |
|
if isNonTemporaryOrConsecutiveFailure(err, failures) { |
|
m.lock.Lock() |
|
m.notifyUpdateLocked(err) |
|
m.lock.Unlock() |
|
} |
|
|
|
logger := m.logger.With( |
|
"err", err, |
|
"topic", req.Topic, |
|
"failure_count", failures+1, |
|
) |
|
|
|
if req.GetWildcardSubject() { |
|
logger = logger.With("wildcard_subject", true) |
|
} else if sub := req.GetNamedSubject(); sub != nil { |
|
logger = logger.With("key", sub.Key) |
|
} else { |
|
logger = logger.With("key", req.Key) // nolint:staticcheck // SA1019 intentional use of deprecated field |
|
} |
|
|
|
logger.Error("subscribe call failed") |
|
} |
|
|
|
// isNonTemporaryOrConsecutiveFailure returns true if err is not a temporary
// error, or if failures > 0.
//
// An error counts as temporary when it, or any error it wraps, has a
// Temporary() bool method that returns true. That method is the convention
// used by net and other standard library packages to mark recoverable errors.
// The chain is searched with errors.As so that wrapping a temporary error
// (for example with fmt.Errorf and %w) does not hide it. A nil err is reported
// as non-temporary.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	var temp interface {
		Temporary() bool
	}
	return !errors.As(err, &temp) || !temp.Temporary() || failures > 0
}
|
|
|
func defaultWaiter() *retry.Waiter { |
|
return &retry.Waiter{ |
|
MinFailures: 1, |
|
// Start backing off with small increments (200-400ms) which will double |
|
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000 |
|
// after that). (retry.Wait applies Max limit after jitter right now). |
|
Factor: 200 * time.Millisecond, |
|
MinWait: 0, |
|
MaxWait: 60 * time.Second, |
|
Jitter: retry.NewJitter(100), |
|
} |
|
}
|
|
|