Merge pull request #9152 from hashicorp/dnephin/streaming-enable-connect

use streaming backend for connect service health
pull/9107/head
Daniel Nephin 2021-03-12 13:05:16 -05:00 committed by GitHub
commit 8f2171d26c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 24 deletions

View File

@ -377,8 +377,6 @@ func New(bd BaseDeps) (*Agent, error) {
Cache: bd.Cache,
NetRPC: &a,
CacheName: cacheName,
// Temporarily until streaming supports all connect events
CacheNameConnect: cachetype.HealthServicesName,
}
a.serviceManager = NewServiceManager(&a)
@ -540,6 +538,7 @@ func (a *Agent) Start(ctx context.Context) error {
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache,
Health: a.rpcClientHealth,
Logger: a.logger.Named(logging.ProxyConfig),
State: a.State,
Source: &structs.QuerySource{

View File

@ -202,9 +202,7 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers {
return stream.SnapshotHandlers{
topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth),
// The connect topic is temporarily disabled until the correct events are
// created for terminating gateway changes.
//topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth),
topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect),
}
}

View File

@ -4,11 +4,12 @@ import (
"errors"
"sync"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
)
var (
@ -58,6 +59,8 @@ type ManagerConfig struct {
// Cache is the agent's cache instance that can be used to retrieve, store and
// monitor state for the proxies.
Cache *cache.Cache
// Health provides service health updates on a notification channel.
Health Health
// state is the agent's local state to be watched for new proxy registrations.
State *local.State
// source describes the current agent's identity, it's used directly for
@ -195,6 +198,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
// Set the necessary dependencies
state.logger = m.Logger.With("service_id", sid.String())
state.cache = m.Cache
state.health = m.Health
state.source = m.Source
state.dnsConfig = m.DNSConfig
state.intentionDefaultAllow = m.IntentionDefaultAllow

View File

@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/sdk/testutil"
@ -342,7 +343,13 @@ func testManager_BasicLifecycle(
state.TriggerSyncChanges = func() {}
// Create manager
m, err := NewManager(ManagerConfig{c, state, source, DNSConfig{}, logger, nil, false})
m, err := NewManager(ManagerConfig{
Cache: c,
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
State: state,
Source: source,
Logger: logger,
})
require.NoError(err)
// And run it

View File

@ -9,13 +9,14 @@ import (
"strings"
"time"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"
"github.com/mitchellh/mapstructure"
)
type CacheNotifier interface {
@ -23,6 +24,10 @@ type CacheNotifier interface {
correlationID string, ch chan<- cache.UpdateEvent) error
}
type Health interface {
Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error
}
const (
coalesceTimeout = 200 * time.Millisecond
rootsWatchID = "roots"
@ -54,6 +59,7 @@ type state struct {
logger hclog.Logger
source *structs.QuerySource
cache CacheNotifier
health Health
dnsConfig DNSConfig
serverSNIFn ServerSNIFunc
intentionDefaultAllow bool
@ -155,6 +161,7 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
taggedAddresses: taggedAddresses,
proxyCfg: proxyCfg,
token: token,
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
// reasonable number of upstream watches to all deliver their initial
// messages in parallel without blocking the cache.Notify loops. It's not a
@ -225,7 +232,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
var finalMeta structs.EnterpriseMeta
finalMeta.Merge(entMeta)
return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
return s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: s.token,
@ -443,7 +450,7 @@ func (s *state) initWatchesMeshGateway() error {
return err
}
err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
err = s.health.Notify(s.ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
@ -969,7 +976,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
// Watch the health endpoint to discover endpoints for the service
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Service.Name,
@ -1267,7 +1274,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Name,

View File

@ -6,12 +6,14 @@ import (
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
)
func TestStateChanged(t *testing.T) {
@ -143,6 +145,10 @@ func (cn *testCacheNotifier) Notify(ctx context.Context, t string, r cache.Reque
return nil
}
func (cn *testCacheNotifier) Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error) {
panic("Get: not implemented")
}
func (cn *testCacheNotifier) getNotifierRequest(t testing.TB, correlationId string) testCacheNotifierRequest {
cn.lock.RLock()
req, ok := cn.notifiers[correlationId]
@ -1521,6 +1527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// setup a new testing cache notifier
cn := newTestCacheNotifier()
state.cache = cn
state.health = &health.Client{Cache: cn, CacheName: cachetype.HealthServicesName}
// setup the local datacenter information
state.source = &structs.QuerySource{

View File

@ -12,8 +12,6 @@ type Client struct {
Cache CacheGetter
// CacheName to use for service health.
CacheName string
// CacheNameConnect is the name of the cache to use for connect service health.
CacheNameConnect string
}
type NetRPC interface {
@ -22,6 +20,7 @@ type NetRPC interface {
type CacheGetter interface {
Get(ctx context.Context, t string, r cache.Request) (interface{}, cache.ResultMeta, error)
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
}
func (c *Client) ServiceNodes(
@ -54,12 +53,7 @@ func (c *Client) getServiceNodes(
return out, cache.ResultMeta{}, err
}
cacheName := c.CacheName
if req.Connect {
cacheName = c.CacheNameConnect
}
raw, md, err := c.Cache.Get(ctx, cacheName, &req)
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
if err != nil {
return out, md, err
}
@ -71,3 +65,12 @@ func (c *Client) getServiceNodes(
return *value, md, nil
}
func (c *Client) Notify(
ctx context.Context,
req structs.ServiceSpecificRequest,
correlationID string,
ch chan<- cache.UpdateEvent,
) error {
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
}