Add event generation for autopilot state updates (#12626)

Whenever autopilot updates its state it notifies Consul. That notification will then trigger Consul to extract out the ready server information. If the ready servers have changed, then an event will be published to notify any subscribers of the full set of ready servers.

All these ready server event things are contained within an autopilotevents package instead of the consul package to make importing them into the grpc related packages possible
pull/12619/head
Matt Keeler 2022-04-19 13:03:03 -04:00 committed by GitHub
parent fb7022b763
commit cdad79bfc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1178 additions and 34 deletions

View File

@ -10,6 +10,7 @@ import (
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/consul/autopilotevents"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
@ -29,7 +30,8 @@ var AutopilotGauges = []prometheus.GaugeDefinition{
// AutopilotDelegate is a Consul delegate for autopilot operations.
type AutopilotDelegate struct {
server *Server
server *Server
readyServersPublisher *autopilotevents.ReadyServersEventPublisher
}
func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config {
@ -51,6 +53,8 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) {
} else {
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
d.readyServersPublisher.PublishReadyServersEvents(state)
}
func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) {
@ -63,7 +67,13 @@ func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) {
}
func (s *Server) initAutopilot(config *Config) {
apDelegate := &AutopilotDelegate{s}
apDelegate := &AutopilotDelegate{
server: s,
readyServersPublisher: autopilotevents.NewReadyServersEventPublisher(autopilotevents.Config{
Publisher: s.publisher,
GetStore: func() autopilotevents.StateStore { return s.fsm.State() },
}),
}
s.autopilot = autopilot.New(
s.raft,
@ -74,6 +84,9 @@ func (s *Server) initAutopilot(config *Config) {
autopilot.WithPromoter(s.autopilotPromoter()),
autopilot.WithReconciliationDisabled(),
)
// registers a snapshot handler for the event publisher to send as the first event for a new stream
s.publisher.RegisterHandler(autopilotevents.EventTopicReadyServers, apDelegate.readyServersPublisher.HandleSnapshot)
}
func (s *Server) autopilotServers() map[raft.ServerID]*autopilot.Server {

View File

@ -2,6 +2,7 @@ package consul
import (
"context"
"fmt"
"os"
"testing"
"time"
@ -10,6 +11,8 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/autopilotevents"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
@ -522,3 +525,99 @@ func TestAutopilot_MinQuorum(t *testing.T) {
}
})
}
func TestAutopilot_EventPublishing(t *testing.T) {
// This is really an integration level test. The general flow this test will follow is:
//
// 1. Start a 3 server cluster
// 2. Subscribe to the ready server events
// 3. Observe the first event which will be pretty immediately ready as it is the
// snapshot event.
// 4. Wait for multiple iterations of the autopilot state updater and ensure no
// other events are seen. The state update interval is 50ms for tests unless
// overridden.
// 5. Add a fouth server.
// 6. Wait for an event to be emitted containing 4 ready servers.
// 1. create the test cluster
cluster := newTestCluster(t, &testClusterConfig{
Servers: 3,
ServerConf: testServerACLConfig,
// We want to wait until each server has registered itself in the Catalog. Otherwise
// the first snapshot even we see might have no servers in it while things are being
// initialized. Doing this wait ensure that things are in the right state to start
// the subscription.
})
// 2. subscribe to ready server events
req := stream.SubscribeRequest{
Topic: autopilotevents.EventTopicReadyServers,
Subject: stream.SubjectNone,
Token: TestDefaultInitialManagementToken,
}
sub, err := cluster.Servers[0].publisher.Subscribe(&req)
require.NoError(t, err)
t.Cleanup(sub.Unsubscribe)
// 3. Observe that an event was generated which should be the snapshot event.
// As we have just bootstrapped the cluster with 3 servers we expect to
// see those 3 here.
validatePayload(t, 3, mustGetEventWithTimeout(t, sub, 50*time.Millisecond))
// TODO - its kind of annoying that the EventPublisher doesn't have a mode where
// it knows each event is a full state of the world. The ramifications are that
// we have to expect/ignore the framing events for EndOfSnapshot.
event := mustGetEventWithTimeout(t, sub, 10*time.Millisecond)
require.True(t, event.IsFramingEvent())
// 4. Wait for 3 iterations of the ServerHealthInterval to ensure no events
// are being published when the autopilot state is not changing.
eventNotEmitted(t, sub, 150*time.Millisecond)
// 5. Add a fourth server
_, srv := testServerWithConfig(t, testServerACLConfig, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 0
})
joinLAN(t, srv, cluster.Servers[0])
// 6. Now wait for the event for the fourth server being added. This may take a little
// while as the joinLAN operation above doesn't wait for the server to actually get
// added to Raft.
validatePayload(t, 4, mustGetEventWithTimeout(t, sub, time.Second))
}
// mustGetEventWithTimeout is a helper function for validating that a Subscription.Next call will return
// an event within the given time. It also validates that no error is returned.
func mustGetEventWithTimeout(t *testing.T, subscription *stream.Subscription, timeout time.Duration) stream.Event {
t.Helper()
event, err := getEventWithTimeout(t, subscription, timeout)
require.NoError(t, err)
return event
}
// getEventWithTimeout is a helper function for retrieving a Event from a Subscription within the specified timeout.
func getEventWithTimeout(t *testing.T, subscription *stream.Subscription, timeout time.Duration) (stream.Event, error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
event, err := subscription.Next(ctx)
return event, err
}
// eventNotEmitted is a helper to validate that no Event is emitted for the given Subscription
func eventNotEmitted(t *testing.T, subscription *stream.Subscription, timeout time.Duration) {
t.Helper()
var event stream.Event
var err error
event, err = getEventWithTimeout(t, subscription, timeout)
require.Equal(t, context.DeadlineExceeded, err, fmt.Sprintf("event:%v", event))
}
func validatePayload(t *testing.T, expectedNumServers int, event stream.Event) {
t.Helper()
require.Equal(t, autopilotevents.EventTopicReadyServers, event.Topic)
readyServers, ok := event.Payload.(autopilotevents.EventPayloadReadyServers)
require.True(t, ok)
require.Len(t, readyServers, expectedNumServers)
}

View File

@ -0,0 +1,18 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package autopilotevents
import (
stream "github.com/hashicorp/consul/agent/consul/stream"
mock "github.com/stretchr/testify/mock"
)
// MockPublisher is an autogenerated mock type for the Publisher type
type MockPublisher struct {
mock.Mock
}
// Publish provides a mock function with given fields: _a0
func (_m *MockPublisher) Publish(_a0 []stream.Event) {
_m.Called(_a0)
}

View File

@ -0,0 +1,47 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package autopilotevents
import (
acl "github.com/hashicorp/consul/acl"
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
types "github.com/hashicorp/consul/types"
)
// MockStateStore is an autogenerated mock type for the StateStore type
type MockStateStore struct {
mock.Mock
}
// GetNodeID provides a mock function with given fields: _a0, _a1
func (_m *MockStateStore) GetNodeID(_a0 types.NodeID, _a1 *acl.EnterpriseMeta) (uint64, *structs.Node, error) {
ret := _m.Called(_a0, _a1)
var r0 uint64
if rf, ok := ret.Get(0).(func(types.NodeID, *acl.EnterpriseMeta) uint64); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 *structs.Node
if rf, ok := ret.Get(1).(func(types.NodeID, *acl.EnterpriseMeta) *structs.Node); ok {
r1 = rf(_a0, _a1)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*structs.Node)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(types.NodeID, *acl.EnterpriseMeta) error); ok {
r2 = rf(_a0, _a1)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}

View File

@ -0,0 +1,28 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package autopilotevents
import (
time "time"
mock "github.com/stretchr/testify/mock"
)
// mockTimeProvider is an autogenerated mock type for the timeProvider type
type mockTimeProvider struct {
mock.Mock
}
// Now provides a mock function with given fields:
func (_m *mockTimeProvider) Now() time.Time {
ret := _m.Called()
var r0 time.Time
if rf, ok := ret.Get(0).(func() time.Time); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(time.Time)
}
return r0
}

View File

@ -0,0 +1,303 @@
package autopilotevents
import (
"fmt"
"net"
"sort"
"sync"
"time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/types"
autopilot "github.com/hashicorp/raft-autopilot"
)
const (
EventTopicReadyServers stream.StringTopic = "ready-servers"
)
// ReadyServerInfo includes information about a server that is ready
// to handle incoming requests.
type ReadyServerInfo struct {
ID string
Address string
TaggedAddresses map[string]string
Version string
}
func (info *ReadyServerInfo) Equal(other *ReadyServerInfo) bool {
if info.ID != other.ID {
return false
}
if info.Version != other.Version {
return false
}
if info.Address != other.Address {
return false
}
if len(info.TaggedAddresses) != len(other.TaggedAddresses) {
return false
}
for tag, infoAddr := range info.TaggedAddresses {
if otherAddr, ok := other.TaggedAddresses[tag]; !ok || infoAddr != otherAddr {
return false
}
}
return true
}
// EventPayloadReadyServers
type EventPayloadReadyServers []ReadyServerInfo
func (e EventPayloadReadyServers) Subject() stream.Subject { return stream.SubjectNone }
func (e EventPayloadReadyServers) HasReadPermission(authz acl.Authorizer) bool {
// Any service in the mesh will need access to where the servers live. Therefore
// we check if the authorizer grants permissions on any service and if so then
// we allow seeing where the servers are.
var authzContext acl.AuthorizerContext
structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier).
FillAuthzContext(&authzContext)
return authz.ServiceWriteAny(&authzContext) == acl.Allow
}
func ExtractEventPayload(event stream.Event) (EventPayloadReadyServers, error) {
if event.Topic != EventTopicReadyServers {
return nil, fmt.Errorf("unexpected topic (%q) for a %q event", event.Topic, EventTopicReadyServers)
}
if payload, ok := event.Payload.(EventPayloadReadyServers); ok {
return payload, nil
}
return nil, fmt.Errorf("unexpected payload type %T for %q event", event.Payload, EventTopicReadyServers)
}
type Config struct {
GetStore func() StateStore
Publisher Publisher
timeProvider timeProvider
}
// ReadyServersEventPublisher is capable to tracking changes to ready servers
// between consecutive calls to PublishReadyServersEvents. It will then publish
// "ready-servers" events as necessary.
type ReadyServersEventPublisher struct {
Config
previous EventPayloadReadyServers
snapshotLock sync.RWMutex
snapshot []stream.Event
}
func NewReadyServersEventPublisher(config Config) *ReadyServersEventPublisher {
return &ReadyServersEventPublisher{
Config: config,
snapshot: []stream.Event{
{
Topic: EventTopicReadyServers,
Index: 0,
Payload: EventPayloadReadyServers{},
},
},
}
}
//go:generate mockery -name StateStore -inpkg -testonly
type StateStore interface {
GetNodeID(types.NodeID, *acl.EnterpriseMeta) (uint64, *structs.Node, error)
}
//go:generate mockery -name Publisher -inpkg -testonly
type Publisher interface {
Publish([]stream.Event)
}
//go:generate mockery -name timeProvider -inpkg -testonly
type timeProvider interface {
Now() time.Time
}
// PublishReadyServersEvents will publish a "ready-servers" event if the list of
// ready servers has changed since the last time events were published.
func (r *ReadyServersEventPublisher) PublishReadyServersEvents(state *autopilot.State) {
if events, ok := r.readyServersEvents(state); ok {
// update the latest snapshot so that any new event subscription will see
// use the latest state.
r.snapshotLock.Lock()
r.snapshot = events
r.snapshotLock.Unlock()
// if the event publisher were to not be able to keep up with procesing events
// then its possible this blocks. It could cause autopilot to not update its
// state as often as it should. However if this blocks for over 10s then
// not updating the autopilot state as quickly is likely the least of our
// concerns. If we need to make this async then we probably need to single
// flight these to ensure proper event ordering.
r.Publisher.Publish(events)
}
}
func (r *ReadyServersEventPublisher) readyServersEvents(state *autopilot.State) ([]stream.Event, bool) {
// First, we need to pull all the ready servers out from the autopilot state.
servers := r.autopilotStateToReadyServers(state)
// Next we, sort the servers list to make comparison easier later on. We do
// this outside of the next length check conditional block to ensure that all
// values of previousReadyServers we store will be sorted and the future
// comparison's will remain valid.
sort.Slice(servers, func(i, j int) bool {
// no two servers can have the same id so this is sufficient
return servers[i].ID < servers[j].ID
})
// If the number of ready servers hasn't changed then we need to inspect individual
// servers to see if there are differences. If the number of servers has changed
// we know that an event should be generated and sent.
if len(r.previous) == len(servers) {
diff := false
// We are relying on the fact that both of the slices will be sorted and that
// we don't care what the actual differences are but instead just that they
// have differences.
for i := 0; i < len(servers); i++ {
if !r.previous[i].Equal(&servers[i]) {
diff = true
break
}
}
// The list of ready servers is identical to the previous ones. Therefore
// we will not send any event.
if !diff {
return nil, false
}
}
r.previous = servers
return []stream.Event{r.newReadyServersEvent(servers)}, true
}
// autopilotStateToReadyServers will iterate through all servers in the autopilot
// state and compile a list of servers which are "ready". Readiness means that
// they would be an acceptable target for stale queries.
func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers {
var servers EventPayloadReadyServers
for _, srv := range state.Servers {
// All healthy servers are caught up enough to be included in a ready servers.
// Servers with voting rights that are still healthy according to Serf are
// also included as they have likely just fallen behind the leader a little
// after initially replicating state. They are still acceptable targets
// for most stale queries and clients can bound the staleness if necessary.
// Including them is a means to prevent flapping the list of servers we
// advertise as ready and flooding the network with notifications to all
// dataplanes of server updates.
//
// TODO (agentless) for a non-voting server that is still alive but fell
// behind, should we cause it to be removed. For voters we know they were caught
// up at some point but for non-voters we cannot know the same thing.
if srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive) {
// autopilot information contains addresses in the <host>:<port> form. We only care about the
// the host so we parse it out here and discard the port.
host, err := extractHost(string(srv.Server.Address))
if err != nil || host == "" {
continue
}
servers = append(servers, ReadyServerInfo{
ID: string(srv.Server.ID),
Address: host,
Version: srv.Server.Version,
TaggedAddresses: r.getTaggedAddresses(srv),
})
}
}
return servers
}
// getTaggedAddresses will get the tagged addresses for the given server or return nil
// if it encounters an error or unregistered server.
func (r *ReadyServersEventPublisher) getTaggedAddresses(srv *autopilot.ServerState) map[string]string {
// we have no callback to lookup the tagged addresses so we can return early
if r.GetStore == nil {
return nil
}
// Assuming we have been provided a callback to get a state store implementation, then
// we will attempt to lookup the node for the autopilot server. We use this to get the
// tagged addresses so that consumers of these events will be able to distinguish LAN
// vs WAN addresses as well as IP protocol differentiation. At first I thought we may
// need to hook into catalog events so that if the tagged addresses change then
// we can synthesize new events. That would be pretty complex so this code does not
// deal with that. The reasoning why that is probably okay is that autopilot will
// send us the state at least once every 30s. That means that we will grab the nodes
// from the catalog at that often and publish the events. So while its not quite
// as responsive as actually watching for the Catalog changes, its MUCH simpler to
// code and reason about and having those addresses be updated within 30s is good enough.
_, node, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition())
if err != nil || node == nil {
// no catalog information means we should return a nil addres map
return nil
}
if len(node.TaggedAddresses) == 0 {
return nil
}
addrs := make(map[string]string)
for tag, address := range node.TaggedAddresses {
// just like for the Nodes main Address, we only care about the IPs and not the
// port so we parse the host out and discard the port.
host, err := extractHost(address)
if err != nil || host == "" {
continue
}
addrs[tag] = host
}
return addrs
}
// newReadyServersEvent will create a stream.Event with the provided ready server info.
func (r *ReadyServersEventPublisher) newReadyServersEvent(servers EventPayloadReadyServers) stream.Event {
now := time.Now()
if r.timeProvider != nil {
now = r.timeProvider.Now()
}
return stream.Event{
Topic: EventTopicReadyServers,
Index: uint64(now.UnixMicro()),
Payload: servers,
}
}
// HandleSnapshot is the EventPublisher callback to generate a snapshot for the "ready-servers" event streams.
func (r *ReadyServersEventPublisher) HandleSnapshot(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
r.snapshotLock.RLock()
defer r.snapshotLock.RUnlock()
buf.Append(r.snapshot)
return r.snapshot[0].Index, nil
}
// extractHost is a small convenience function to catch errors regarding
// missing ports from the net.SplitHostPort function.
func extractHost(addr string) (string, error) {
host, _, err := net.SplitHostPort(addr)
if err == nil {
return host, nil
}
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
return addr, nil
}
return "", err
}

View File

@ -0,0 +1,635 @@
package autopilotevents
import (
"testing"
time "time"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
structs "github.com/hashicorp/consul/agent/structs"
types "github.com/hashicorp/consul/types"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
var testTime = time.Date(2022, 4, 14, 10, 56, 00, 0, time.UTC)
var exampleState = &autopilot.State{
Servers: map[raft.ServerID]*autopilot.ServerState{
"792ae13c-d765-470b-852c-e073fdb6e849": {
Health: autopilot.ServerHealth{
Healthy: true,
},
State: autopilot.RaftLeader,
Server: autopilot.Server{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2:8300",
Version: "v1.12.0",
NodeStatus: autopilot.NodeAlive,
},
},
"65e79ff4-bbce-467b-a9d6-725c709fa985": {
Health: autopilot.ServerHealth{
Healthy: true,
},
State: autopilot.RaftVoter,
Server: autopilot.Server{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3:8300",
Version: "v1.12.0",
NodeStatus: autopilot.NodeAlive,
},
},
// this server is up according to Serf but is unhealthy
// due to having an index that is behind
"db11f0ac-0cbe-4215-80cc-b4e843f4df1e": {
Health: autopilot.ServerHealth{
Healthy: false,
},
State: autopilot.RaftVoter,
Server: autopilot.Server{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4:8300",
Version: "v1.12.0",
NodeStatus: autopilot.NodeAlive,
},
},
// this server is up according to Serf but is unhealthy
// due to having an index that is behind. It is a non-voter
// and thus will be filtered out
"4c48a154-8176-4e14-ba5d-20bf1f784a7e": {
Health: autopilot.ServerHealth{
Healthy: false,
},
State: autopilot.RaftNonVoter,
Server: autopilot.Server{
ID: "4c48a154-8176-4e14-ba5d-20bf1f784a7e",
Address: "198.18.0.5:8300",
Version: "v1.12.0",
NodeStatus: autopilot.NodeAlive,
},
},
// this is a voter that has died
"7a22eec8-de85-43a6-a76e-00b427ef6627": {
Health: autopilot.ServerHealth{
Healthy: false,
},
State: autopilot.RaftVoter,
Server: autopilot.Server{
ID: "7a22eec8-de85-43a6-a76e-00b427ef6627",
Address: "198.18.0.6",
Version: "v1.12.0",
NodeStatus: autopilot.NodeFailed,
},
},
},
}
func TestEventPayloadReadyServers_HasReadPermission(t *testing.T) {
t.Run("no service:write", func(t *testing.T) {
hasRead := EventPayloadReadyServers{}.HasReadPermission(acl.DenyAll())
require.False(t, hasRead)
})
t.Run("has service:write", func(t *testing.T) {
policy, err := acl.NewPolicyFromSource(`
service "foo" {
policy = "write"
}
`, acl.SyntaxCurrent, nil, nil)
require.NoError(t, err)
authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
require.NoError(t, err)
hasRead := EventPayloadReadyServers{}.HasReadPermission(authz)
require.True(t, hasRead)
})
}
func TestAutopilotStateToReadyServers(t *testing.T) {
expected := EventPayloadReadyServers{
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
}
r := ReadyServersEventPublisher{}
actual := r.autopilotStateToReadyServers(exampleState)
require.ElementsMatch(t, expected, actual)
}
func TestAutopilotStateToReadyServersWithTaggedAddresses(t *testing.T) {
expected := EventPayloadReadyServers{
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
TaggedAddresses: map[string]string{"wan": "5.4.3.2"},
Version: "v1.12.0",
},
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
TaggedAddresses: map[string]string{"wan": "1.2.3.4"},
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
TaggedAddresses: map[string]string{"wan": "9.8.7.6"},
Version: "v1.12.0",
},
}
store := &MockStateStore{}
t.Cleanup(func() { store.AssertExpectations(t) })
store.On("GetNodeID",
types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"),
structs.NodeEnterpriseMetaInDefaultPartition(),
).Once().Return(
uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "5.4.3.2"}},
nil,
)
store.On("GetNodeID",
types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"),
structs.NodeEnterpriseMetaInDefaultPartition(),
).Once().Return(
uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "1.2.3.4"}},
nil,
)
store.On("GetNodeID",
types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"),
structs.NodeEnterpriseMetaInDefaultPartition(),
).Once().Return(
uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "9.8.7.6"}},
nil,
)
r := NewReadyServersEventPublisher(Config{
GetStore: func() StateStore { return store },
})
actual := r.autopilotStateToReadyServers(exampleState)
require.ElementsMatch(t, expected, actual)
}
func TestAutopilotReadyServersEvents(t *testing.T) {
// we have already tested the ReadyServerInfo extraction within the
// TestAutopilotStateToReadyServers test. Therefore this test is going
// to focus only on the change detection.
//
// * - added server
// * - removed server
// * - server with address changed
// * - upgraded server with version change
expectedServers := EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
}
type testCase struct {
// The elements of this slice must already be sorted
previous EventPayloadReadyServers
changeDetected bool
}
cases := map[string]testCase{
"no-change": {
previous: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
},
changeDetected: false,
},
"server-added": {
previous: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
// server with id db11f0ac-0cbe-4215-80cc-b4e843f4df1e will be added.
},
changeDetected: true,
},
"server-removed": {
previous: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
// this server isn't present in the state and will be removed
{
ID: "7e3235de-8a75-4c8d-9ec3-847ca87d07e8",
Address: "198.18.0.5",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
},
changeDetected: true,
},
"address-change": {
previous: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
// this value is different from the state and should
// cause an event to be generated
Address: "198.18.0.9",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
},
changeDetected: true,
},
"upgraded-version": {
previous: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
// This is v1.12.0 in the state and therefore an
// event should be generated
Version: "v1.11.4",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
},
changeDetected: true,
},
}
for name, tcase := range cases {
t.Run(name, func(t *testing.T) {
r := ReadyServersEventPublisher{
previous: tcase.previous,
}
events, changeDetected := r.readyServersEvents(exampleState)
require.Equal(t, tcase.changeDetected, changeDetected, "servers: %+v", events)
if tcase.changeDetected {
require.Len(t, events, 1)
require.Equal(t, EventTopicReadyServers, events[0].Topic)
payload, ok := events[0].Payload.(EventPayloadReadyServers)
require.True(t, ok)
require.ElementsMatch(t, expectedServers, payload)
} else {
require.Empty(t, events)
}
})
}
}
func TestAutopilotPublishReadyServersEvents(t *testing.T) {
t.Run("publish", func(t *testing.T) {
pub := &MockPublisher{}
pub.On("Publish", []stream.Event{
{
Topic: EventTopicReadyServers,
Index: uint64(testTime.UnixMicro()),
Payload: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
},
},
})
mtime := &mockTimeProvider{}
mtime.On("Now").Return(testTime).Once()
t.Cleanup(func() {
mtime.AssertExpectations(t)
pub.AssertExpectations(t)
})
r := NewReadyServersEventPublisher(Config{
Publisher: pub,
timeProvider: mtime,
})
r.PublishReadyServersEvents(exampleState)
})
t.Run("suppress", func(t *testing.T) {
pub := &MockPublisher{}
mtime := &mockTimeProvider{}
t.Cleanup(func() {
mtime.AssertExpectations(t)
pub.AssertExpectations(t)
})
r := NewReadyServersEventPublisher(Config{
Publisher: pub,
timeProvider: mtime,
})
r.previous = EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
Version: "v1.12.0",
},
}
r.PublishReadyServersEvents(exampleState)
})
}
type MockAppender struct {
mock.Mock
}
func (m *MockAppender) Append(events []stream.Event) {
m.Called(events)
}
func TestReadyServerEventsSnapshotHandler(t *testing.T) {
buf := MockAppender{}
buf.On("Append", []stream.Event{
{
Topic: EventTopicReadyServers,
Index: 0,
Payload: EventPayloadReadyServers{},
},
})
buf.On("Append", []stream.Event{
{
Topic: EventTopicReadyServers,
Index: 1649933760000000,
Payload: EventPayloadReadyServers{
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
TaggedAddresses: map[string]string{"wan": "1.2.3.4"},
Version: "v1.12.0",
},
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
TaggedAddresses: map[string]string{"wan": "5.4.3.2"},
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
TaggedAddresses: map[string]string{"wan": "9.8.7.6"},
Version: "v1.12.0",
},
},
},
}).Once()
mtime := mockTimeProvider{}
mtime.On("Now").Return(testTime).Once()
store := &MockStateStore{}
t.Cleanup(func() { store.AssertExpectations(t) })
store.On("GetNodeID",
types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"),
structs.NodeEnterpriseMetaInDefaultPartition(),
).Once().Return(
uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "5.4.3.2"}},
nil,
)
store.On("GetNodeID",
types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"),
structs.NodeEnterpriseMetaInDefaultPartition(),
).Once().Return(
uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "1.2.3.4"}},
nil,
)
store.On("GetNodeID",
types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"),
structs.NodeEnterpriseMetaInDefaultPartition(),
).Once().Return(
uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "9.8.7.6"}},
nil,
)
t.Cleanup(func() {
buf.AssertExpectations(t)
store.AssertExpectations(t)
mtime.AssertExpectations(t)
})
r := NewReadyServersEventPublisher(Config{
GetStore: func() StateStore { return store },
timeProvider: &mtime,
})
req := stream.SubscribeRequest{
Topic: EventTopicReadyServers,
Subject: stream.SubjectNone,
}
// get the first snapshot that should have the zero value event
_, err := r.HandleSnapshot(req, &buf)
require.NoError(t, err)
// setup the value to be returned by the snapshot handler
r.snapshot, _ = r.readyServersEvents(exampleState)
// now get the second snapshot which has actual servers
_, err = r.HandleSnapshot(req, &buf)
require.NoError(t, err)
}
type fakePayload struct{}
func (e fakePayload) Subject() stream.Subject { return stream.SubjectNone }
func (e fakePayload) HasReadPermission(authz acl.Authorizer) bool {
return false
}
func TestExtractEventPayload(t *testing.T) {
t.Run("wrong-topic", func(t *testing.T) {
payload, err := ExtractEventPayload(stream.NewCloseSubscriptionEvent([]string{"foo"}))
require.Nil(t, payload)
require.Error(t, err)
require.Contains(t, err.Error(), "unexpected topic")
})
t.Run("unexpected-payload", func(t *testing.T) {
payload, err := ExtractEventPayload(stream.Event{
Topic: EventTopicReadyServers,
Payload: fakePayload{},
})
require.Nil(t, payload)
require.Error(t, err)
require.Contains(t, err.Error(), "unexpected payload type")
})
t.Run("success", func(t *testing.T) {
expected := EventPayloadReadyServers{
{
ID: "a7c340ae-ce17-47da-895c-af2509767b3d",
Address: "198.18.0.1",
Version: "1.2.3",
},
}
actual, err := ExtractEventPayload(stream.Event{
Topic: EventTopicReadyServers,
Payload: expected,
})
require.NoError(t, err)
require.Equal(t, expected, actual)
})
}
func TestReadyServerInfo_Equal(t *testing.T) {
base := func() *ReadyServerInfo {
return &ReadyServerInfo{
ID: "0356e5ae-ed6b-4024-b953-e1b6a8f0f81b",
Version: "1.12.0",
Address: "198.18.0.1",
TaggedAddresses: map[string]string{
"wan": "1.2.3.4",
},
}
}
type testCase struct {
modify func(i *ReadyServerInfo)
equal bool
}
cases := map[string]testCase{
"unmodified": {
equal: true,
},
"id-mod": {
modify: func(i *ReadyServerInfo) { i.ID = "30f8f451-e54b-4c7e-a533-b55dddb51be6" },
},
"version-mod": {
modify: func(i *ReadyServerInfo) { i.Version = "1.12.1" },
},
"address-mod": {
modify: func(i *ReadyServerInfo) { i.Address = "198.18.0.2" },
},
"tagged-addresses-added": {
modify: func(i *ReadyServerInfo) { i.TaggedAddresses["wan_ipv4"] = "1.2.3.4" },
},
"tagged-addresses-mod": {
modify: func(i *ReadyServerInfo) { i.TaggedAddresses["wan"] = "4.3.2.1" },
},
}
for name, tcase := range cases {
t.Run(name, func(t *testing.T) {
original := base()
modified := base()
if tcase.modify != nil {
tcase.modify(modified)
}
require.Equal(t, tcase.equal, original.Equal(modified))
})
}
}

View File

@ -12,13 +12,7 @@ import (
//
// Note: topics are ordinarily defined in subscribe.proto, but this one isn't
// currently available via the Subscribe endpoint.
const EventTopicCARoots stringer = "CARoots"
// stringer is a convenience type to turn a regular string into a fmt.Stringer
// so that it can be used as a stream.Topic or stream.Subject.
type stringer string
func (s stringer) String() string { return string(s) }
const EventTopicCARoots stream.StringTopic = "CARoots"
type EventPayloadCARoots struct {
CARoots structs.CARoots

View File

@ -26,7 +26,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@ -73,7 +73,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
sub2, err := publisher.Subscribe(subscription2)
@ -114,7 +114,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@ -165,7 +165,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
sub, err = publisher.Subscribe(subscription2)
@ -194,7 +194,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Register another subscription.
subscription3 := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
sub, err = publisher.Subscribe(subscription3)
@ -236,7 +236,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
// Register the subscription.
subscription := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
@ -282,7 +282,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
sub, err = publisher.Subscribe(subscription2)
@ -431,7 +431,7 @@ func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
}
func (p nodePayload) Subject() stream.Subject {
return stringer(p.key)
return stream.StringSubject(p.key)
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
@ -459,7 +459,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo
// continuing...
req := &stream.SubscribeRequest{
Topic: topicService,
Subject: stringer("nope"),
Subject: stream.StringSubject("nope"),
Token: token.SecretID,
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

View File

@ -22,11 +22,7 @@ type Subject fmt.Stringer
// SubjectNone is used when all events on a given topic are "global" and not
// further partitioned by subject. For example: the "CA Roots" topic which is
// used to notify subscribers when the global set CA root certificates changes.
const SubjectNone stringer = "none"
type stringer string
func (s stringer) String() string { return string(s) }
const SubjectNone StringSubject = "none"
// Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers.

View File

@ -22,7 +22,7 @@ var testTopic Topic = intTopic(999)
func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -82,7 +82,7 @@ func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
return !p.noReadPerm
}
func (p simplePayload) Subject() Subject { return stringer(p.key) }
func (p simplePayload) Subject() Subject { return StringSubject(p.key) }
func registerTestSnapshotHandlers(t *testing.T, publisher *EventPublisher) {
t.Helper()
@ -188,7 +188,7 @@ func consumeSub(ctx context.Context, sub *Subscription) error {
func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -234,7 +234,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -288,7 +288,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -345,7 +345,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@ -414,7 +414,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
Index: 1,
}
@ -499,7 +499,7 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@ -522,7 +522,7 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Subject: stringer("sub-key"),
Subject: StringSubject("sub-key"),
}
publisher := NewEventPublisher(time.Second)

View File

@ -0,0 +1,11 @@
package stream
// StringSubject can be used as a Subject for Events sent to the EventPublisher
type StringSubject string
func (s StringSubject) String() string { return string(s) }
// StringTopic can be used as a Topic for Events sent to the EventPublisher
type StringTopic string
func (s StringTopic) String() string { return string(s) }

View File

@ -29,7 +29,7 @@ func TestSubscription(t *testing.T) {
req := SubscribeRequest{
Topic: testTopic,
Subject: stringer("test"),
Subject: StringSubject("test"),
}
sub := newSubscription(req, startHead, noopUnSub)
@ -103,7 +103,7 @@ func TestSubscription_Close(t *testing.T) {
req := SubscribeRequest{
Topic: testTopic,
Subject: stringer("test"),
Subject: StringSubject("test"),
}
sub := newSubscription(req, startHead, noopUnSub)