|
|
|
@ -8,15 +8,107 @@ import (
|
|
|
|
|
"github.com/google/go-cmp/cmp/cmpopts"
|
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/proto/pbcommon"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/agent/consul/stream"
|
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
|
|
|
"github.com/hashicorp/consul/proto/pbcommon"
|
|
|
|
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
|
|
|
"github.com/hashicorp/consul/types"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func TestServiceHealthSnapshot(t *testing.T) {
|
|
|
|
|
store := NewStateStore(nil)
|
|
|
|
|
|
|
|
|
|
counter := newIndexCounter()
|
|
|
|
|
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth)
|
|
|
|
|
buf := &snapshotAppender{}
|
|
|
|
|
req := stream.SubscribeRequest{Key: "web"}
|
|
|
|
|
|
|
|
|
|
idx, err := fn(req, buf)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.Equal(t, counter.Last(), idx)
|
|
|
|
|
|
|
|
|
|
expected := [][]stream.Event{
|
|
|
|
|
{
|
|
|
|
|
testServiceHealthEvent(t, "web", func(e *stream.Event) error {
|
|
|
|
|
e.Index = counter.Last()
|
|
|
|
|
csn := getPayloadCheckServiceNode(e.Payload)
|
|
|
|
|
csn.Node.CreateIndex = 1
|
|
|
|
|
csn.Node.ModifyIndex = 1
|
|
|
|
|
csn.Service.CreateIndex = 2
|
|
|
|
|
csn.Service.ModifyIndex = 2
|
|
|
|
|
csn.Checks[0].CreateIndex = 1
|
|
|
|
|
csn.Checks[0].ModifyIndex = 1
|
|
|
|
|
csn.Checks[1].CreateIndex = 2
|
|
|
|
|
csn.Checks[1].ModifyIndex = 2
|
|
|
|
|
return nil
|
|
|
|
|
}),
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
testServiceHealthEvent(t, "web", evNode2, func(e *stream.Event) error {
|
|
|
|
|
e.Index = counter.Last()
|
|
|
|
|
csn := getPayloadCheckServiceNode(e.Payload)
|
|
|
|
|
csn.Node.CreateIndex = 3
|
|
|
|
|
csn.Node.ModifyIndex = 3
|
|
|
|
|
csn.Service.CreateIndex = 3
|
|
|
|
|
csn.Service.ModifyIndex = 3
|
|
|
|
|
for i := range csn.Checks {
|
|
|
|
|
csn.Checks[i].CreateIndex = 3
|
|
|
|
|
csn.Checks[i].ModifyIndex = 3
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}),
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
assertDeepEqual(t, expected, buf.events, cmpEvents)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type snapshotAppender struct {
|
|
|
|
|
events [][]stream.Event
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *snapshotAppender) Append(events []stream.Event) {
|
|
|
|
|
s.events = append(s.events, events)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type indexCounter struct {
|
|
|
|
|
value uint64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *indexCounter) Next() uint64 {
|
|
|
|
|
c.value++
|
|
|
|
|
return c.value
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *indexCounter) Last() uint64 {
|
|
|
|
|
return c.value
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newIndexCounter() *indexCounter {
|
|
|
|
|
return &indexCounter{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var _ stream.SnapshotAppender = (*snapshotAppender)(nil)
|
|
|
|
|
|
|
|
|
|
func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
|
|
|
|
|
return func(e *stream.Event) error {
|
|
|
|
|
e.Index = idx
|
|
|
|
|
csn := getPayloadCheckServiceNode(e.Payload)
|
|
|
|
|
csn.Node.CreateIndex = create
|
|
|
|
|
csn.Node.ModifyIndex = modify
|
|
|
|
|
csn.Service.CreateIndex = create
|
|
|
|
|
csn.Service.ModifyIndex = modify
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestServiceHealthEventsFromChanges(t *testing.T) {
|
|
|
|
|
cases := []struct {
|
|
|
|
|
Name string
|
|
|
|
|