|
|
|
@ -583,6 +583,8 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *state) run() { |
|
|
|
|
logger := s.logger.Named(logging.ProxyConfig) |
|
|
|
|
|
|
|
|
|
// Close the channel we return from Watch when we stop so consumers can stop
|
|
|
|
|
// watching and clean up their goroutines. It's important we do this here and
|
|
|
|
|
// not in Close since this routine sends on this chan and so might panic if it
|
|
|
|
@ -603,10 +605,13 @@ func (s *state) run() {
|
|
|
|
|
case <-s.ctx.Done(): |
|
|
|
|
return |
|
|
|
|
case u := <-s.ch: |
|
|
|
|
logger.Trace("A blocking query returned; handling snapshot update", |
|
|
|
|
"proxy-id", s.proxyID.String(), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if err := s.handleUpdate(u, &snap); err != nil { |
|
|
|
|
s.logger.Error("watch error", |
|
|
|
|
"id", u.CorrelationID, |
|
|
|
|
"error", err, |
|
|
|
|
logger.Error("Failed to handle update from watch", |
|
|
|
|
"id", u.CorrelationID, "error", err, |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
@ -616,18 +621,24 @@ func (s *state) run() {
|
|
|
|
|
// etc on future updates.
|
|
|
|
|
snapCopy, err := snap.Clone() |
|
|
|
|
if err != nil { |
|
|
|
|
s.logger.Error("Failed to copy config snapshot for proxy", |
|
|
|
|
"proxy", s.proxyID, |
|
|
|
|
"error", err, |
|
|
|
|
logger.Error("Failed to copy config snapshot for proxy", |
|
|
|
|
"proxy-id", s.proxyID.String(), "error", err, |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
// try to send
|
|
|
|
|
case s.snapCh <- *snapCopy: |
|
|
|
|
// try to send
|
|
|
|
|
logger.Trace("Delivered new snapshot to proxy config watchers", |
|
|
|
|
"proxy-id", s.proxyID.String(), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// avoid blocking if a snapshot is already buffered
|
|
|
|
|
default: |
|
|
|
|
// avoid blocking if a snapshot is already buffered
|
|
|
|
|
logger.Trace("Failed to deliver new snapshot to proxy config watchers", |
|
|
|
|
"proxy-id", s.proxyID.String(), |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Allow the next change to trigger a send
|
|
|
|
@ -638,18 +649,25 @@ func (s *state) run() {
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
case replyCh := <-s.reqCh: |
|
|
|
|
logger.Trace("A proxy config snapshot was requested", |
|
|
|
|
"proxy-id", s.proxyID.String(), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if !snap.Valid() { |
|
|
|
|
// Not valid yet just respond with nil and move on to next task.
|
|
|
|
|
replyCh <- nil |
|
|
|
|
|
|
|
|
|
logger.Trace("The proxy's config snapshot is not valid yet", |
|
|
|
|
"proxy-id", s.proxyID.String(), |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Make a deep copy of snap so we don't mutate any of the embedded structs
|
|
|
|
|
// etc on future updates.
|
|
|
|
|
snapCopy, err := snap.Clone() |
|
|
|
|
if err != nil { |
|
|
|
|
s.logger.Error("Failed to copy config snapshot for proxy", |
|
|
|
|
"proxy", s.proxyID, |
|
|
|
|
"error", err, |
|
|
|
|
logger.Error("Failed to copy config snapshot for proxy", |
|
|
|
|
"proxy-id", s.proxyID.String(), "error", err, |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|