diff --git a/agent/cache-types/streaming_events_test.go b/agent/cache-types/streaming_events_test.go index 05bc3649fd..67d4257fee 100644 --- a/agent/cache-types/streaming_events_test.go +++ b/agent/cache-types/streaming_events_test.go @@ -9,17 +9,15 @@ import ( "github.com/hashicorp/consul/types" ) -func newEndOfSnapshotEvent(topic pbsubscribe.Topic, index uint64) *pbsubscribe.Event { +func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event { return &pbsubscribe.Event{ - Topic: topic, Index: index, Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, } } -func newNewSnapshotToFollowEvent(topic pbsubscribe.Topic) *pbsubscribe.Event { +func newNewSnapshotToFollowEvent() *pbsubscribe.Event { return &pbsubscribe.Event{ - Topic: topic, Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, } } @@ -37,7 +35,6 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) return &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, Key: svc, Index: index, Payload: &pbsubscribe.Event_ServiceHealth{ @@ -117,7 +114,6 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs node := fmt.Sprintf("node%d", nodeNum) return &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, Key: svc, Index: index, Payload: &pbsubscribe.Event_ServiceHealth{ @@ -164,7 +160,6 @@ func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event events[i+1] = evs[i] } return &pbsubscribe.Event{ - Topic: first.Topic, Index: first.Index, Payload: &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{Events: events}, diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 3e794611b6..5c963c7a8a 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -26,7 +26,7 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { // Initially there are no services registered. Server should send an // EndOfSnapshot message immediately with index of 1. - client.QueueEvents(newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 1)) + client.QueueEvents(newEndOfSnapshotEvent(1)) opts := cache.FetchOptions{ MinIndex: 0, @@ -230,7 +230,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { registerServiceWeb(5, 1), registerServiceWeb(5, 2), registerServiceWeb(5, 3), - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) + newEndOfSnapshotEvent(5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -301,7 +301,7 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { registerServiceWeb(50, 3), // overlap existing node registerServiceWeb(50, 4), registerServiceWeb(50, 5), - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) + newEndOfSnapshotEvent(50)) // Make another blocking query with THE SAME index. It should immediately // return the new snapshot. @@ -324,11 +324,11 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { client.QueueErr(tempError("temporary connection error")) client.QueueEvents( - newNewSnapshotToFollowEvent(pbsubscribe.Topic_ServiceHealth), + newNewSnapshotToFollowEvent(), registerServiceWeb(50, 3), // overlap existing node registerServiceWeb(50, 4), registerServiceWeb(50, 5), - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 50)) + newEndOfSnapshotEvent(50)) start := time.Now() opts.MinIndex = 49 @@ -358,7 +358,7 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { newEventServiceHealthRegister(5, 3, "web")) client.QueueEvents( batchEv, - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) + newEndOfSnapshotEvent(5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ @@ -428,7 +428,7 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { newEventServiceHealthRegister(5, 3, "web")) client.QueueEvents( batchEv, - newEndOfSnapshotEvent(pbsubscribe.Topic_ServiceHealth, 5)) + newEndOfSnapshotEvent(5)) // This contains the view state so important we share it between calls. opts := cache.FetchOptions{ diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index b7eda488e4..9fc1dc6532 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub } elog.Trace(event) - e := newEventFromStreamEvent(req.Topic, event) + e := newEventFromStreamEvent(event) if err := serverStream.Send(e); err != nil { return err } @@ -139,12 +139,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) return event.Filter(fn) } -func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event { - e := &pbsubscribe.Event{ - Topic: topic, - Key: event.Key, - Index: event.Index, - } +func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { + e := &pbsubscribe.Event{Key: event.Key, Index: event.Index} switch { case event.IsEndOfSnapshot(): e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true} diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 60d73b3367..bf91763b09 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -107,7 +107,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) { expected := []*pbsubscribe.Event{ { - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_ServiceHealth{ @@ -139,7 +138,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_ServiceHealth{ @@ -171,7 +169,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.For("reg3"), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, @@ -192,7 +189,6 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { event := getEvent(t, chEvents) expectedEvent := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ @@ -463,7 +459,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { runStep(t, "receive the initial snapshot of events", func(t *testing.T) { expected := []*pbsubscribe.Event{ { - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ @@ -495,7 +490,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ @@ -527,7 +521,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, }, { - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, @@ -548,7 +541,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { event := getEvent(t, chEvents) expectedEvent := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ @@ -905,11 +897,9 @@ func TestNewEventFromSteamEvent(t *testing.T) { expected pbsubscribe.Event } - testTopic := pbsubscribe.Topic_ServiceHealthConnect fn := func(t *testing.T, tc testCase) { expected := tc.expected - expected.Topic = testTopic - actual := newEventFromStreamEvent(testTopic, tc.event) + actual := newEventFromStreamEvent(tc.event) assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty()) } diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index 2b90c2bfa7..aae083dd92 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -6,14 +6,16 @@ package pbsubscribe import ( context "context" fmt "fmt" - proto "github.com/golang/protobuf/proto" - pbservice "github.com/hashicorp/consul/proto/pbservice" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" io "io" math "math" math_bits "math/bits" + + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + + pbservice "github.com/hashicorp/consul/proto/pbservice" ) // Reference imports to suppress errors if they are not otherwise used. @@ -198,16 +200,14 @@ func (m *SubscribeRequest) GetNamespace() string { // describe the current "snapshot" of the result as well as ongoing mutations to // that snapshot. type Event struct { - // Topic the event was published to - Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"` // Key is the logical identifier for the entity that was mutated. - Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"` + Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"` // Index is the raft index at which the mutation took place. At the top // level of a subscription there will always be at most one Event per index. // If multiple events are published to the same topic in a single raft // transaction then the batch of events will be encoded inside a single // top-level event to ensure they are delivered atomically to clients. - Index uint64 `protobuf:"varint,3,opt,name=Index,proto3" json:"Index,omitempty"` + Index uint64 `protobuf:"varint,2,opt,name=Index,proto3" json:"Index,omitempty"` // Payload is the actual event content. // // Types that are valid to be assigned to Payload: @@ -261,13 +261,13 @@ type isEvent_Payload interface { } type Event_EndOfSnapshot struct { - EndOfSnapshot bool `protobuf:"varint,5,opt,name=EndOfSnapshot,proto3,oneof"` + EndOfSnapshot bool `protobuf:"varint,3,opt,name=EndOfSnapshot,proto3,oneof"` } type Event_NewSnapshotToFollow struct { - NewSnapshotToFollow bool `protobuf:"varint,6,opt,name=NewSnapshotToFollow,proto3,oneof"` + NewSnapshotToFollow bool `protobuf:"varint,4,opt,name=NewSnapshotToFollow,proto3,oneof"` } type Event_EventBatch struct { - EventBatch *EventBatch `protobuf:"bytes,7,opt,name=EventBatch,proto3,oneof"` + EventBatch *EventBatch `protobuf:"bytes,5,opt,name=EventBatch,proto3,oneof"` } type Event_ServiceHealth struct { ServiceHealth *ServiceHealthUpdate `protobuf:"bytes,10,opt,name=ServiceHealth,proto3,oneof"` @@ -285,13 +285,6 @@ func (m *Event) GetPayload() isEvent_Payload { return nil } -func (m *Event) GetTopic() Topic { - if m != nil { - return m.Topic - } - return Topic_Unknown -} - func (m *Event) GetKey() string { if m != nil { return m.Key @@ -353,17 +346,17 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if x.EndOfSnapshot { t = 1 } - _ = b.EncodeVarint(5<<3 | proto.WireVarint) + _ = b.EncodeVarint(3<<3 | proto.WireVarint) _ = b.EncodeVarint(t) case *Event_NewSnapshotToFollow: t := uint64(0) if x.NewSnapshotToFollow { t = 1 } - _ = b.EncodeVarint(6<<3 | proto.WireVarint) + _ = b.EncodeVarint(4<<3 | proto.WireVarint) _ = b.EncodeVarint(t) case *Event_EventBatch: - _ = b.EncodeVarint(7<<3 | proto.WireBytes) + _ = b.EncodeVarint(5<<3 | proto.WireBytes) if err := b.EncodeMessage(x.EventBatch); err != nil { return err } @@ -382,21 +375,21 @@ func _Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { func _Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { m := msg.(*Event) switch tag { - case 5: // Payload.EndOfSnapshot + case 3: // Payload.EndOfSnapshot if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() m.Payload = &Event_EndOfSnapshot{x != 0} return true, err - case 6: // Payload.NewSnapshotToFollow + case 4: // Payload.NewSnapshotToFollow if wire != proto.WireVarint { return true, proto.ErrInternalBadWireType } x, err := b.DecodeVarint() m.Payload = &Event_NewSnapshotToFollow{x != 0} return true, err - case 7: // Payload.EventBatch + case 5: // Payload.EventBatch if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -558,41 +551,41 @@ func init() { func init() { proto.RegisterFile("proto/pbsubscribe/subscribe.proto", fileDescriptor_ab3eb8c810e315fb) } var fileDescriptor_ab3eb8c810e315fb = []byte{ - // 543 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x53, 0xcd, 0x6e, 0xd3, 0x4c, - 0x14, 0xf5, 0x24, 0xfd, 0xf3, 0xcd, 0xd7, 0xca, 0xdf, 0xb4, 0x08, 0x2b, 0x45, 0x56, 0x88, 0x50, - 0x65, 0x2a, 0x11, 0xa3, 0x20, 0xc1, 0x0e, 0x44, 0xd2, 0x96, 0x20, 0xa4, 0x04, 0x39, 0xed, 0x02, - 0x76, 0x13, 0xfb, 0x12, 0x5b, 0x71, 0x67, 0x8c, 0x3d, 0x69, 0xe8, 0x9e, 0x87, 0xe0, 0x49, 0xd8, - 0xb2, 0x65, 0xc9, 0x23, 0xa0, 0xf0, 0x22, 0x28, 0x13, 0xc7, 0x71, 0x92, 0xee, 0xd8, 0xcd, 0x3d, - 0x3f, 0xe3, 0xa3, 0x39, 0xbe, 0xf0, 0x30, 0x4e, 0x84, 0x14, 0x4e, 0x3c, 0x48, 0xc7, 0x83, 0xd4, - 0x4b, 0xc2, 0x01, 0x3a, 0xf9, 0xa9, 0xa1, 0x38, 0xaa, 0xe7, 0x40, 0xb5, 0x9a, 0xab, 0x31, 0xb9, - 0x09, 0x3d, 0x74, 0xb8, 0xf0, 0x33, 0x59, 0xfd, 0x3b, 0x01, 0xa3, 0xbf, 0x50, 0xba, 0xf8, 0x79, - 0x8c, 0xa9, 0xa4, 0x27, 0xb0, 0x7d, 0x29, 0xe2, 0xd0, 0x33, 0x49, 0x8d, 0xd8, 0x07, 0x4d, 0xa3, - 0xb1, 0xbc, 0x5c, 0xe1, 0xee, 0x9c, 0xa6, 0x06, 0x94, 0xdf, 0xe1, 0xad, 0x59, 0xaa, 0x11, 0x5b, - 0x77, 0x67, 0x47, 0x7a, 0x34, 0x73, 0x8e, 0x90, 0x9b, 0x65, 0x85, 0xcd, 0x87, 0x19, 0xfa, 0x96, - 0xfb, 0xf8, 0xc5, 0xdc, 0xaa, 0x11, 0x7b, 0xcb, 0x9d, 0x0f, 0xd4, 0x02, 0x38, 0x63, 0x92, 0x79, - 0xc8, 0x25, 0x26, 0xe6, 0xb6, 0x32, 0x14, 0x10, 0xfa, 0x00, 0xf4, 0x2e, 0xbb, 0xc6, 0x34, 0x66, - 0x1e, 0x9a, 0x3b, 0x8a, 0x5e, 0x02, 0xf5, 0x1f, 0x25, 0xd8, 0x3e, 0xbf, 0x41, 0xfe, 0x8f, 0x69, - 0xe7, 0xb9, 0xca, 0xc5, 0x5c, 0x27, 0xb0, 0x7f, 0xce, 0xfd, 0xde, 0xa7, 0x3e, 0x67, 0x71, 0x1a, - 0x08, 0xa9, 0xa2, 0xed, 0x75, 0x34, 0x77, 0x15, 0xa6, 0x4d, 0x38, 0xec, 0xe2, 0x64, 0x31, 0x5e, - 0x8a, 0x0b, 0x11, 0x45, 0x62, 0xa2, 0x92, 0xce, 0xd4, 0x77, 0x91, 0xf4, 0x05, 0x80, 0x0a, 0xdd, - 0x62, 0xd2, 0x0b, 0xcc, 0xdd, 0x1a, 0xb1, 0x2b, 0xcd, 0x7b, 0x85, 0xc0, 0x4b, 0xb2, 0xa3, 0xb9, - 0x05, 0x29, 0xbd, 0x80, 0xfd, 0xfe, 0xbc, 0xbd, 0x0e, 0xb2, 0x48, 0x06, 0x26, 0x28, 0xaf, 0x55, - 0xf0, 0xae, 0xf0, 0x57, 0xb1, 0xcf, 0x24, 0xce, 0x42, 0xaf, 0xc0, 0x2d, 0x1d, 0x76, 0xdf, 0xb3, - 0xdb, 0x48, 0x30, 0xbf, 0xfe, 0xbc, 0x98, 0x85, 0xda, 0xb0, 0xa3, 0xa6, 0xd4, 0x24, 0xb5, 0xb2, - 0x5d, 0x59, 0x79, 0x46, 0x45, 0xb8, 0x19, 0x5f, 0xff, 0x4a, 0xe0, 0xf0, 0x8e, 0x6f, 0xd1, 0x47, - 0x50, 0xea, 0xc5, 0x59, 0x09, 0x47, 0x05, 0x77, 0x9b, 0x49, 0x16, 0x89, 0x61, 0x2f, 0x76, 0x4b, - 0xbd, 0x98, 0xbe, 0x01, 0xa3, 0x1d, 0xa0, 0x37, 0xca, 0x6e, 0xe8, 0x0a, 0x1f, 0x55, 0x25, 0x95, - 0xe6, 0x71, 0x23, 0xff, 0x43, 0x1b, 0xeb, 0x12, 0x77, 0xc3, 0x74, 0xfa, 0x3a, 0xab, 0x9d, 0x56, - 0x60, 0xf7, 0x8a, 0x8f, 0xb8, 0x98, 0x70, 0x43, 0xa3, 0xff, 0xaf, 0xbd, 0x93, 0x41, 0xa8, 0x09, - 0x47, 0x2b, 0x50, 0x5b, 0x70, 0x8e, 0x9e, 0x34, 0x4a, 0xa7, 0x8f, 0x41, 0xcf, 0xc3, 0xd1, 0xff, - 0x60, 0xcf, 0xc5, 0x61, 0x98, 0x4a, 0x4c, 0x0c, 0x8d, 0x1e, 0x00, 0x9c, 0x61, 0xb2, 0x98, 0x49, - 0xf3, 0x03, 0xdc, 0xef, 0x4b, 0x26, 0xb1, 0x1d, 0x30, 0x3e, 0xc4, 0x6c, 0x63, 0x62, 0x19, 0x0a, - 0x4e, 0x5f, 0x82, 0x9e, 0x6f, 0x10, 0x3d, 0x2e, 0x16, 0xb2, 0xb6, 0x57, 0xd5, 0x8d, 0x37, 0xad, - 0x6b, 0x4f, 0x49, 0xeb, 0xd5, 0xcf, 0xa9, 0x45, 0x7e, 0x4d, 0x2d, 0xf2, 0x7b, 0x6a, 0x91, 0x6f, - 0x7f, 0x2c, 0xed, 0xe3, 0x93, 0x61, 0x28, 0x83, 0xf1, 0xa0, 0xe1, 0x89, 0x6b, 0x27, 0x60, 0x69, - 0x10, 0x7a, 0x22, 0x89, 0x1d, 0x4f, 0xf0, 0x74, 0x1c, 0x39, 0x1b, 0xab, 0x3f, 0xd8, 0x51, 0xd0, - 0xb3, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x45, 0xc8, 0x3f, 0xd7, 0x16, 0x04, 0x00, 0x00, + // 542 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xf6, 0xba, 0xbf, 0x9e, 0xd0, 0xca, 0x6c, 0x8b, 0xb0, 0x5a, 0x64, 0x85, 0x08, 0x55, 0xa1, + 0x12, 0x31, 0x0a, 0x12, 0xdc, 0x40, 0x24, 0x6d, 0x09, 0x42, 0x4a, 0x90, 0xd3, 0x1e, 0xe0, 0xb6, + 0xb1, 0x87, 0xd8, 0x8a, 0xbb, 0x6b, 0xec, 0x4d, 0x43, 0xef, 0xbc, 0x03, 0x3c, 0x09, 0xcf, 0xc0, + 0x91, 0x47, 0x40, 0xe1, 0x45, 0x50, 0x36, 0x8e, 0x63, 0x27, 0xbd, 0x79, 0xbe, 0x9f, 0xdd, 0xcf, + 0xb3, 0x33, 0xf0, 0x38, 0x4e, 0x84, 0x14, 0x4e, 0x3c, 0x48, 0xc7, 0x83, 0xd4, 0x4b, 0xc2, 0x01, + 0x3a, 0xf9, 0x57, 0x43, 0x71, 0xd4, 0xc8, 0x81, 0xa3, 0xa3, 0x5c, 0x8d, 0xc9, 0x4d, 0xe8, 0xa1, + 0xc3, 0x85, 0x9f, 0xc9, 0x6a, 0xbf, 0x08, 0x98, 0xfd, 0x85, 0xd2, 0xc5, 0xaf, 0x63, 0x4c, 0x25, + 0x3d, 0x81, 0xad, 0x4b, 0x11, 0x87, 0x9e, 0x45, 0xaa, 0xa4, 0xbe, 0xdf, 0x34, 0x1b, 0xcb, 0xc3, + 0x15, 0xee, 0xce, 0x69, 0x6a, 0xc2, 0xc6, 0x07, 0xbc, 0xb5, 0xf4, 0x2a, 0xa9, 0x1b, 0xee, 0xec, + 0x93, 0x1e, 0xce, 0x9c, 0x23, 0xe4, 0xd6, 0x86, 0xc2, 0xe6, 0xc5, 0x0c, 0x7d, 0xcf, 0x7d, 0xfc, + 0x66, 0x6d, 0x56, 0x49, 0x7d, 0xd3, 0x9d, 0x17, 0xd4, 0x06, 0x38, 0x63, 0x92, 0x79, 0xc8, 0x25, + 0x26, 0xd6, 0x96, 0x32, 0x14, 0x10, 0xfa, 0x08, 0x8c, 0x2e, 0xbb, 0xc6, 0x34, 0x66, 0x1e, 0x5a, + 0xdb, 0x8a, 0x5e, 0x02, 0xb5, 0x1f, 0x3a, 0x6c, 0x9d, 0xdf, 0x20, 0x97, 0x8b, 0x14, 0xa4, 0x94, + 0x62, 0x7e, 0x9f, 0x5e, 0xbc, 0xef, 0x04, 0xf6, 0xce, 0xb9, 0xdf, 0xfb, 0xd2, 0xe7, 0x2c, 0x4e, + 0x03, 0x21, 0x55, 0xc6, 0xdd, 0x8e, 0xe6, 0x96, 0x61, 0xda, 0x84, 0x83, 0x2e, 0x4e, 0x16, 0xe5, + 0xa5, 0xb8, 0x10, 0x51, 0x24, 0x26, 0x2a, 0xfb, 0x4c, 0x7d, 0x17, 0x49, 0x5f, 0x01, 0xa8, 0x30, + 0x2d, 0x26, 0xbd, 0x40, 0xfd, 0x4b, 0xa5, 0xf9, 0xa0, 0xd0, 0xb6, 0x25, 0xd9, 0xd1, 0xdc, 0x82, + 0x94, 0x5e, 0xc0, 0x5e, 0x7f, 0xfe, 0x2a, 0x1d, 0x64, 0x91, 0x0c, 0x2c, 0x50, 0x5e, 0xbb, 0xe0, + 0x2d, 0xf1, 0x57, 0xb1, 0xcf, 0x24, 0xce, 0x42, 0x97, 0xe0, 0x96, 0x01, 0x3b, 0x1f, 0xd9, 0x6d, + 0x24, 0x98, 0x5f, 0x7b, 0x59, 0xcc, 0x42, 0xeb, 0xb0, 0xad, 0xaa, 0xd4, 0x22, 0xd5, 0x8d, 0x7a, + 0xa5, 0xf4, 0x98, 0x8a, 0x70, 0x33, 0xbe, 0xf6, 0x9d, 0xc0, 0xc1, 0x1d, 0x77, 0xd1, 0x27, 0xa0, + 0xf7, 0xe2, 0x6c, 0x14, 0x0e, 0x0b, 0xee, 0x36, 0x93, 0x2c, 0x12, 0xc3, 0x5e, 0xec, 0xea, 0xbd, + 0x98, 0xbe, 0x03, 0xb3, 0x1d, 0xa0, 0x37, 0xca, 0x4e, 0xe8, 0x0a, 0x1f, 0x55, 0xfb, 0x2b, 0xcd, + 0xe3, 0x46, 0x3e, 0x79, 0x8d, 0x55, 0x89, 0xbb, 0x66, 0x3a, 0x7d, 0x9b, 0x0d, 0x1f, 0xad, 0xc0, + 0xce, 0x15, 0x1f, 0x71, 0x31, 0xe1, 0xa6, 0x46, 0xef, 0xaf, 0xf4, 0xc9, 0x24, 0xd4, 0x82, 0xc3, + 0x12, 0xd4, 0x16, 0x9c, 0xa3, 0x27, 0x4d, 0xfd, 0xf4, 0x29, 0x18, 0x79, 0x38, 0x7a, 0x0f, 0x76, + 0x5d, 0x1c, 0x86, 0xa9, 0xc4, 0xc4, 0xd4, 0xe8, 0x3e, 0xc0, 0x19, 0x26, 0x8b, 0x9a, 0x34, 0x3f, + 0xc1, 0xc3, 0xbe, 0x64, 0x12, 0xdb, 0x01, 0xe3, 0x43, 0xcc, 0x36, 0x21, 0x96, 0xa1, 0xe0, 0xf4, + 0x35, 0x18, 0xf9, 0x66, 0xd0, 0xe3, 0xe2, 0x83, 0xac, 0xec, 0xcb, 0xd1, 0x5a, 0x4f, 0x6b, 0xda, + 0x73, 0xd2, 0x7a, 0xf3, 0x7b, 0x6a, 0x93, 0x3f, 0x53, 0x9b, 0xfc, 0x9d, 0xda, 0xe4, 0xe7, 0x3f, + 0x5b, 0xfb, 0xfc, 0x6c, 0x18, 0xca, 0x60, 0x3c, 0x68, 0x78, 0xe2, 0xda, 0x09, 0x58, 0x1a, 0x84, + 0x9e, 0x48, 0x62, 0xc7, 0x13, 0x3c, 0x1d, 0x47, 0xce, 0xda, 0x4a, 0x0f, 0xb6, 0x15, 0xf4, 0xe2, + 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x20, 0xe5, 0xc9, 0x9d, 0xee, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -841,19 +834,14 @@ func (m *Event) MarshalToSizedBuffer(dAtA []byte) (int, error) { if m.Index != 0 { i = encodeVarintSubscribe(dAtA, i, uint64(m.Index)) i-- - dAtA[i] = 0x18 + dAtA[i] = 0x10 } if len(m.Key) > 0 { i -= len(m.Key) copy(dAtA[i:], m.Key) i = encodeVarintSubscribe(dAtA, i, uint64(len(m.Key))) i-- - dAtA[i] = 0x12 - } - if m.Topic != 0 { - i = encodeVarintSubscribe(dAtA, i, uint64(m.Topic)) - i-- - dAtA[i] = 0x8 + dAtA[i] = 0xa } return len(dAtA) - i, nil } @@ -871,7 +859,7 @@ func (m *Event_EndOfSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- - dAtA[i] = 0x28 + dAtA[i] = 0x18 return len(dAtA) - i, nil } func (m *Event_NewSnapshotToFollow) MarshalTo(dAtA []byte) (int, error) { @@ -887,7 +875,7 @@ func (m *Event_NewSnapshotToFollow) MarshalToSizedBuffer(dAtA []byte) (int, erro dAtA[i] = 0 } i-- - dAtA[i] = 0x30 + dAtA[i] = 0x20 return len(dAtA) - i, nil } func (m *Event_EventBatch) MarshalTo(dAtA []byte) (int, error) { @@ -906,7 +894,7 @@ func (m *Event_EventBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintSubscribe(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x3a + dAtA[i] = 0x2a } return len(dAtA) - i, nil } @@ -1066,9 +1054,6 @@ func (m *Event) Size() (n int) { } var l int _ = l - if m.Topic != 0 { - n += 1 + sovSubscribe(uint64(m.Topic)) - } l = len(m.Key) if l > 0 { n += 1 + l + sovSubscribe(uint64(l)) @@ -1420,25 +1405,6 @@ func (m *Event) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) - } - m.Topic = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowSubscribe - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Topic |= Topic(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) } @@ -1470,7 +1436,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } m.Key = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 3: + case 2: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType) } @@ -1489,7 +1455,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { break } } - case 5: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field EndOfSnapshot", wireType) } @@ -1510,7 +1476,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } b := bool(v != 0) m.Payload = &Event_EndOfSnapshot{b} - case 6: + case 4: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field NewSnapshotToFollow", wireType) } @@ -1531,7 +1497,7 @@ func (m *Event) Unmarshal(dAtA []byte) error { } b := bool(v != 0) m.Payload = &Event_NewSnapshotToFollow{b} - case 7: + case 5: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field EventBatch", wireType) } diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index d23ea7da80..9d1147922c 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -85,37 +85,34 @@ message SubscribeRequest { // describe the current "snapshot" of the result as well as ongoing mutations to // that snapshot. message Event { - // Topic the event was published to - Topic Topic = 1; - // Key is the logical identifier for the entity that was mutated. - string Key = 2; + string Key = 1; // Index is the raft index at which the mutation took place. At the top // level of a subscription there will always be at most one Event per index. // If multiple events are published to the same topic in a single raft // transaction then the batch of events will be encoded inside a single // top-level event to ensure they are delivered atomically to clients. - uint64 Index = 3; + uint64 Index = 2; // Payload is the actual event content. oneof Payload { // EndOfSnapshot indicates the event stream for the initial snapshot has // ended. Subsequent Events delivered will be mutations to that result. - bool EndOfSnapshot = 5; + bool EndOfSnapshot = 3; // NewSnapshotToFollow indicates that the client view is stale. The client // must reset its view before handing any more events. Subsequent events // in the stream will be for a new snapshot until an EndOfSnapshot event // is received. - bool NewSnapshotToFollow = 6; + bool NewSnapshotToFollow = 4; // EventBatch is a set of events. This is typically used as the payload // type where multiple events are emitted in a single topic and raft // index (e.g. transactional updates). In this case the Topic and Index // values of all events will match and the whole set should be delivered // and consumed atomically. - EventBatch EventBatch = 7; + EventBatch EventBatch = 5; // ServiceHealth is used for ServiceHealth and ServiceHealthConnect // topics.