mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
810 lines
22 KiB
810 lines
22 KiB
package peering |
|
|
|
import ( |
|
"context" |
|
"io" |
|
"testing" |
|
"time" |
|
|
|
"github.com/golang/protobuf/ptypes" |
|
"github.com/stretchr/testify/require" |
|
"google.golang.org/genproto/googleapis/rpc/code" |
|
"google.golang.org/grpc" |
|
"google.golang.org/grpc/codes" |
|
"google.golang.org/grpc/status" |
|
|
|
"github.com/hashicorp/consul/agent/consul/state" |
|
"github.com/hashicorp/consul/agent/consul/stream" |
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/consul/proto/pbpeering" |
|
"github.com/hashicorp/consul/proto/pbservice" |
|
"github.com/hashicorp/consul/proto/pbstatus" |
|
"github.com/hashicorp/consul/proto/prototest" |
|
"github.com/hashicorp/consul/sdk/testutil" |
|
"github.com/hashicorp/consul/sdk/testutil/retry" |
|
) |
|
|
|
func TestStreamResources_Server_FirstRequest(t *testing.T) { |
|
type testCase struct { |
|
name string |
|
input *pbpeering.ReplicationMessage |
|
wantErr error |
|
} |
|
|
|
run := func(t *testing.T, tc testCase) { |
|
srv := NewService(testutil.Logger(t), nil) |
|
client := newMockClient(context.Background()) |
|
|
|
errCh := make(chan error, 1) |
|
client.errCh = errCh |
|
|
|
go func() { |
|
// Pass errors from server handler into errCh so that they can be seen by the client on Recv(). |
|
// This matches gRPC's behavior when an error is returned by a server. |
|
err := srv.StreamResources(client.replicationStream) |
|
if err != nil { |
|
errCh <- err |
|
} |
|
}() |
|
|
|
err := client.Send(tc.input) |
|
require.NoError(t, err) |
|
|
|
msg, err := client.Recv() |
|
require.Nil(t, msg) |
|
require.Error(t, err) |
|
require.EqualError(t, err, tc.wantErr.Error()) |
|
} |
|
|
|
tt := []testCase{ |
|
{ |
|
name: "unexpected response", |
|
input: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Response_{ |
|
Response: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
ResourceID: "api-service", |
|
Nonce: "2", |
|
}, |
|
}, |
|
}, |
|
wantErr: status.Error(codes.InvalidArgument, "first message when initiating a peering must be a subscription request"), |
|
}, |
|
{ |
|
name: "missing peer id", |
|
input: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{}, |
|
}, |
|
}, |
|
wantErr: status.Error(codes.InvalidArgument, "initial subscription request must specify a PeerID"), |
|
}, |
|
{ |
|
name: "unexpected nonce", |
|
input: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: "63b60245-c475-426b-b314-4588d210859d", |
|
Nonce: "1", |
|
}, |
|
}, |
|
}, |
|
wantErr: status.Error(codes.InvalidArgument, "initial subscription request must not contain a nonce"), |
|
}, |
|
{ |
|
name: "unknown resource", |
|
input: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: "63b60245-c475-426b-b314-4588d210859d", |
|
ResourceURL: "nomad.Job", |
|
}, |
|
}, |
|
}, |
|
wantErr: status.Error(codes.InvalidArgument, "subscription request to unknown resource URL: nomad.Job"), |
|
}, |
|
} |
|
|
|
for _, tc := range tt { |
|
t.Run(tc.name, func(t *testing.T) { |
|
run(t, tc) |
|
}) |
|
} |
|
|
|
} |
|
|
|
func TestStreamResources_Server_Terminate(t *testing.T) { |
|
publisher := stream.NewEventPublisher(10 * time.Second) |
|
store := newStateStore(t, publisher) |
|
|
|
srv := NewService(testutil.Logger(t), &testStreamBackend{ |
|
store: store, |
|
pub: publisher, |
|
}) |
|
|
|
it := incrementalTime{ |
|
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), |
|
} |
|
srv.streams.timeNow = it.Now |
|
|
|
client := newMockClient(context.Background()) |
|
|
|
errCh := make(chan error, 1) |
|
client.errCh = errCh |
|
|
|
go func() { |
|
// Pass errors from server handler into errCh so that they can be seen by the client on Recv(). |
|
// This matches gRPC's behavior when an error is returned by a server. |
|
if err := srv.StreamResources(client.replicationStream); err != nil { |
|
errCh <- err |
|
} |
|
}() |
|
|
|
// Receive a subscription from a peer |
|
peerID := "63b60245-c475-426b-b314-4588d210859d" |
|
sub := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: peerID, |
|
ResourceURL: pbpeering.TypeURLService, |
|
}, |
|
}, |
|
} |
|
err := client.Send(sub) |
|
require.NoError(t, err) |
|
|
|
runStep(t, "new stream gets tracked", func(t *testing.T) { |
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.True(r, status.Connected) |
|
}) |
|
}) |
|
|
|
// Receive subscription to my-peer-B's resources |
|
receivedSub, err := client.Recv() |
|
require.NoError(t, err) |
|
|
|
expect := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
PeerID: peerID, |
|
}, |
|
}, |
|
} |
|
prototest.AssertDeepEqual(t, expect, receivedSub) |
|
|
|
runStep(t, "terminate the stream", func(t *testing.T) { |
|
done := srv.ConnectedStreams()[peerID] |
|
close(done) |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
_, ok := srv.StreamStatus(peerID) |
|
require.False(r, ok) |
|
}) |
|
}) |
|
|
|
receivedTerm, err := client.Recv() |
|
require.NoError(t, err) |
|
expect = &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Terminated_{ |
|
Terminated: &pbpeering.ReplicationMessage_Terminated{}, |
|
}, |
|
} |
|
prototest.AssertDeepEqual(t, expect, receivedTerm) |
|
} |
|
|
|
func TestStreamResources_Server_StreamTracker(t *testing.T) { |
|
publisher := stream.NewEventPublisher(10 * time.Second) |
|
store := newStateStore(t, publisher) |
|
|
|
srv := NewService(testutil.Logger(t), &testStreamBackend{ |
|
store: store, |
|
pub: publisher, |
|
}) |
|
|
|
it := incrementalTime{ |
|
base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), |
|
} |
|
srv.streams.timeNow = it.Now |
|
|
|
client := newMockClient(context.Background()) |
|
|
|
errCh := make(chan error, 1) |
|
go func() { |
|
errCh <- srv.StreamResources(client.replicationStream) |
|
}() |
|
|
|
peerID := "63b60245-c475-426b-b314-4588d210859d" |
|
sub := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: peerID, |
|
ResourceURL: pbpeering.TypeURLService, |
|
}, |
|
}, |
|
} |
|
err := client.Send(sub) |
|
require.NoError(t, err) |
|
|
|
runStep(t, "new stream gets tracked", func(t *testing.T) { |
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.True(r, status.Connected) |
|
}) |
|
}) |
|
|
|
runStep(t, "client receives initial subscription", func(t *testing.T) { |
|
ack, err := client.Recv() |
|
require.NoError(t, err) |
|
|
|
expectAck := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
PeerID: peerID, |
|
Nonce: "", |
|
}, |
|
}, |
|
} |
|
prototest.AssertDeepEqual(t, expectAck, ack) |
|
}) |
|
|
|
var sequence uint64 |
|
var lastSendSuccess time.Time |
|
|
|
runStep(t, "ack tracked as success", func(t *testing.T) { |
|
ack := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: peerID, |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
|
|
// Acks do not have an Error populated in the request |
|
}, |
|
}, |
|
} |
|
err := client.Send(ack) |
|
require.NoError(t, err) |
|
sequence++ |
|
|
|
lastSendSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() |
|
|
|
expect := StreamStatus{ |
|
Connected: true, |
|
LastAck: lastSendSuccess, |
|
} |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.Equal(r, expect, status) |
|
}) |
|
}) |
|
|
|
var lastNack time.Time |
|
var lastNackMsg string |
|
|
|
runStep(t, "nack tracked as error", func(t *testing.T) { |
|
nack := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: peerID, |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "2", |
|
Error: &pbstatus.Status{ |
|
Code: int32(code.Code_UNAVAILABLE), |
|
Message: "bad bad not good", |
|
}, |
|
}, |
|
}, |
|
} |
|
err := client.Send(nack) |
|
require.NoError(t, err) |
|
sequence++ |
|
|
|
lastNackMsg = "client peer was unable to apply resource: bad bad not good" |
|
lastNack = it.base.Add(time.Duration(sequence) * time.Second).UTC() |
|
|
|
expect := StreamStatus{ |
|
Connected: true, |
|
LastAck: lastSendSuccess, |
|
LastNack: lastNack, |
|
LastNackMessage: lastNackMsg, |
|
} |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.Equal(r, expect, status) |
|
}) |
|
}) |
|
|
|
var lastRecvSuccess time.Time |
|
|
|
runStep(t, "response applied locally", func(t *testing.T) { |
|
resp := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Response_{ |
|
Response: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
ResourceID: "api", |
|
Nonce: "21", |
|
Operation: pbpeering.ReplicationMessage_Response_UPSERT, |
|
}, |
|
}, |
|
} |
|
err := client.Send(resp) |
|
require.NoError(t, err) |
|
sequence++ |
|
|
|
ack, err := client.Recv() |
|
require.NoError(t, err) |
|
|
|
expectAck := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "21", |
|
}, |
|
}, |
|
} |
|
prototest.AssertDeepEqual(t, expectAck, ack) |
|
|
|
lastRecvSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() |
|
|
|
expect := StreamStatus{ |
|
Connected: true, |
|
LastAck: lastSendSuccess, |
|
LastNack: lastNack, |
|
LastNackMessage: lastNackMsg, |
|
LastReceiveSuccess: lastRecvSuccess, |
|
} |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.Equal(r, expect, status) |
|
}) |
|
}) |
|
|
|
var lastRecvError time.Time |
|
var lastRecvErrorMsg string |
|
|
|
runStep(t, "response fails to apply locally", func(t *testing.T) { |
|
resp := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Response_{ |
|
Response: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
ResourceID: "web", |
|
Nonce: "24", |
|
|
|
// Unknown operation gets NACKed |
|
Operation: pbpeering.ReplicationMessage_Response_Unknown, |
|
}, |
|
}, |
|
} |
|
err := client.Send(resp) |
|
require.NoError(t, err) |
|
sequence++ |
|
|
|
ack, err := client.Recv() |
|
require.NoError(t, err) |
|
|
|
expectNack := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "24", |
|
Error: &pbstatus.Status{ |
|
Code: int32(code.Code_INVALID_ARGUMENT), |
|
Message: `unsupported operation: "Unknown"`, |
|
}, |
|
}, |
|
}, |
|
} |
|
prototest.AssertDeepEqual(t, expectNack, ack) |
|
|
|
lastRecvError = it.base.Add(time.Duration(sequence) * time.Second).UTC() |
|
lastRecvErrorMsg = `unsupported operation: "Unknown"` |
|
|
|
expect := StreamStatus{ |
|
Connected: true, |
|
LastAck: lastSendSuccess, |
|
LastNack: lastNack, |
|
LastNackMessage: lastNackMsg, |
|
LastReceiveSuccess: lastRecvSuccess, |
|
LastReceiveError: lastRecvError, |
|
LastReceiveErrorMessage: lastRecvErrorMsg, |
|
} |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.Equal(r, expect, status) |
|
}) |
|
}) |
|
|
|
runStep(t, "client disconnect marks stream as disconnected", func(t *testing.T) { |
|
client.Close() |
|
|
|
sequence++ |
|
lastRecvError := it.base.Add(time.Duration(sequence) * time.Second).UTC() |
|
|
|
sequence++ |
|
disconnectTime := it.base.Add(time.Duration(sequence) * time.Second).UTC() |
|
|
|
expect := StreamStatus{ |
|
Connected: false, |
|
LastAck: lastSendSuccess, |
|
LastNack: lastNack, |
|
LastNackMessage: lastNackMsg, |
|
DisconnectTime: disconnectTime, |
|
LastReceiveSuccess: lastRecvSuccess, |
|
LastReceiveErrorMessage: io.EOF.Error(), |
|
LastReceiveError: lastRecvError, |
|
} |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
status, ok := srv.StreamStatus(peerID) |
|
require.True(r, ok) |
|
require.Equal(r, expect, status) |
|
}) |
|
}) |
|
|
|
select { |
|
case err := <-errCh: |
|
// Client disconnect is not an error, but should make the handler return. |
|
require.NoError(t, err) |
|
case <-time.After(50 * time.Millisecond): |
|
t.Fatalf("timed out waiting for handler to finish") |
|
} |
|
} |
|
|
|
func TestStreamResources_Server_ServiceUpdates(t *testing.T) { |
|
publisher := stream.NewEventPublisher(10 * time.Second) |
|
store := newStateStore(t, publisher) |
|
|
|
// Create a peering |
|
var lastIdx uint64 = 1 |
|
err := store.PeeringWrite(lastIdx, &pbpeering.Peering{ |
|
Name: "my-peering", |
|
}) |
|
require.NoError(t, err) |
|
|
|
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peering"}) |
|
require.NoError(t, err) |
|
require.NotNil(t, p) |
|
|
|
srv := NewService(testutil.Logger(t), &testStreamBackend{ |
|
store: store, |
|
pub: publisher, |
|
}) |
|
|
|
client := newMockClient(context.Background()) |
|
|
|
errCh := make(chan error, 1) |
|
client.errCh = errCh |
|
|
|
go func() { |
|
// Pass errors from server handler into errCh so that they can be seen by the client on Recv(). |
|
// This matches gRPC's behavior when an error is returned by a server. |
|
if err := srv.StreamResources(client.replicationStream); err != nil { |
|
errCh <- err |
|
} |
|
}() |
|
|
|
// Issue a services subscription to server |
|
init := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
PeerID: p.ID, |
|
ResourceURL: pbpeering.TypeURLService, |
|
}, |
|
}, |
|
} |
|
require.NoError(t, client.Send(init)) |
|
|
|
// Receive a services subscription from server |
|
receivedSub, err := client.Recv() |
|
require.NoError(t, err) |
|
|
|
expect := &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
PeerID: p.ID, |
|
}, |
|
}, |
|
} |
|
prototest.AssertDeepEqual(t, expect, receivedSub) |
|
|
|
// Register a service that is not yet exported |
|
mysql := &structs.CheckServiceNode{ |
|
Node: &structs.Node{Node: "foo", Address: "10.0.0.1"}, |
|
Service: &structs.NodeService{ID: "mysql-1", Service: "mysql", Port: 5000}, |
|
} |
|
|
|
lastIdx++ |
|
require.NoError(t, store.EnsureNode(lastIdx, mysql.Node)) |
|
|
|
lastIdx++ |
|
require.NoError(t, store.EnsureService(lastIdx, "foo", mysql.Service)) |
|
|
|
runStep(t, "exporting mysql leads to an UPSERT event", func(t *testing.T) { |
|
entry := &structs.ExportedServicesConfigEntry{ |
|
Name: "default", |
|
Services: []structs.ExportedService{ |
|
{ |
|
Name: "mysql", |
|
Consumers: []structs.ServiceConsumer{ |
|
{ |
|
PeerName: "my-peering", |
|
}, |
|
}, |
|
}, |
|
{ |
|
// Mongo does not get pushed because it does not have instances registered. |
|
Name: "mongo", |
|
Consumers: []structs.ServiceConsumer{ |
|
{ |
|
PeerName: "my-peering", |
|
}, |
|
}, |
|
}, |
|
}, |
|
} |
|
lastIdx++ |
|
err = store.EnsureConfigEntry(lastIdx, entry) |
|
require.NoError(t, err) |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
msg, err := client.RecvWithTimeout(100 * time.Millisecond) |
|
require.NoError(r, err) |
|
require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) |
|
require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) |
|
|
|
var nodes pbservice.IndexedCheckServiceNodes |
|
require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) |
|
require.Len(r, nodes.Nodes, 1) |
|
}) |
|
}) |
|
|
|
mongo := &structs.CheckServiceNode{ |
|
Node: &structs.Node{Node: "zip", Address: "10.0.0.3"}, |
|
Service: &structs.NodeService{ID: "mongo-1", Service: "mongo", Port: 5000}, |
|
} |
|
|
|
runStep(t, "registering mongo instance leads to an UPSERT event", func(t *testing.T) { |
|
lastIdx++ |
|
require.NoError(t, store.EnsureNode(lastIdx, mongo.Node)) |
|
|
|
lastIdx++ |
|
require.NoError(t, store.EnsureService(lastIdx, "zip", mongo.Service)) |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
msg, err := client.RecvWithTimeout(100 * time.Millisecond) |
|
require.NoError(r, err) |
|
require.Equal(r, pbpeering.ReplicationMessage_Response_UPSERT, msg.GetResponse().Operation) |
|
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) |
|
|
|
var nodes pbservice.IndexedCheckServiceNodes |
|
require.NoError(r, ptypes.UnmarshalAny(msg.GetResponse().Resource, &nodes)) |
|
require.Len(r, nodes.Nodes, 1) |
|
}) |
|
}) |
|
|
|
runStep(t, "un-exporting mysql leads to a DELETE event for mysql", func(t *testing.T) { |
|
entry := &structs.ExportedServicesConfigEntry{ |
|
Name: "default", |
|
Services: []structs.ExportedService{ |
|
{ |
|
Name: "mongo", |
|
Consumers: []structs.ServiceConsumer{ |
|
{ |
|
PeerName: "my-peering", |
|
}, |
|
}, |
|
}, |
|
}, |
|
} |
|
lastIdx++ |
|
err = store.EnsureConfigEntry(lastIdx, entry) |
|
require.NoError(t, err) |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
msg, err := client.RecvWithTimeout(100 * time.Millisecond) |
|
require.NoError(r, err) |
|
require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) |
|
require.Equal(r, mysql.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) |
|
require.Nil(r, msg.GetResponse().Resource) |
|
}) |
|
}) |
|
|
|
runStep(t, "deleting the config entry leads to a DELETE event for mongo", func(t *testing.T) { |
|
lastIdx++ |
|
err = store.DeleteConfigEntry(lastIdx, structs.ExportedServices, "default", nil) |
|
require.NoError(t, err) |
|
|
|
retry.Run(t, func(r *retry.R) { |
|
msg, err := client.RecvWithTimeout(100 * time.Millisecond) |
|
require.NoError(r, err) |
|
require.Equal(r, pbpeering.ReplicationMessage_Response_DELETE, msg.GetResponse().Operation) |
|
require.Equal(r, mongo.Service.CompoundServiceName().String(), msg.GetResponse().ResourceID) |
|
require.Nil(r, msg.GetResponse().Resource) |
|
}) |
|
}) |
|
} |
|
|
|
type testStreamBackend struct { |
|
pub state.EventPublisher |
|
store *state.Store |
|
} |
|
|
|
func (b *testStreamBackend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) { |
|
return b.pub.Subscribe(req) |
|
} |
|
|
|
func (b *testStreamBackend) Store() Store { |
|
return b.store |
|
} |
|
|
|
func (b *testStreamBackend) Forward(info structs.RPCInfo, f func(conn *grpc.ClientConn) error) (handled bool, err error) { |
|
return true, nil |
|
} |
|
|
|
func (b *testStreamBackend) GetAgentCACertificates() ([]string, error) { |
|
return []string{}, nil |
|
} |
|
|
|
func (b *testStreamBackend) GetServerAddresses() ([]string, error) { |
|
return []string{}, nil |
|
} |
|
|
|
func (b *testStreamBackend) GetServerName() string { |
|
return "" |
|
} |
|
|
|
func (b *testStreamBackend) EncodeToken(tok *structs.PeeringToken) ([]byte, error) { |
|
return nil, nil |
|
} |
|
|
|
func (b *testStreamBackend) DecodeToken([]byte) (*structs.PeeringToken, error) { |
|
return nil, nil |
|
} |
|
|
|
func (b *testStreamBackend) EnterpriseCheckPartitions(partition string) error { |
|
return nil |
|
} |
|
|
|
func (b *testStreamBackend) Apply() Apply { |
|
return nil |
|
} |
|
|
|
func Test_processResponse(t *testing.T) { |
|
type testCase struct { |
|
name string |
|
in *pbpeering.ReplicationMessage_Response |
|
expect *pbpeering.ReplicationMessage |
|
wantErr bool |
|
} |
|
|
|
run := func(t *testing.T, tc testCase) { |
|
reply, err := processResponse(tc.in) |
|
if tc.wantErr { |
|
require.Error(t, err) |
|
} else { |
|
require.NoError(t, err) |
|
} |
|
require.Equal(t, tc.expect, reply) |
|
} |
|
|
|
tt := []testCase{ |
|
{ |
|
name: "valid upsert", |
|
in: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
ResourceID: "api", |
|
Nonce: "1", |
|
Operation: pbpeering.ReplicationMessage_Response_UPSERT, |
|
}, |
|
expect: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
}, |
|
}, |
|
}, |
|
wantErr: false, |
|
}, |
|
{ |
|
name: "valid delete", |
|
in: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
ResourceID: "api", |
|
Nonce: "1", |
|
Operation: pbpeering.ReplicationMessage_Response_DELETE, |
|
}, |
|
expect: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
}, |
|
}, |
|
}, |
|
wantErr: false, |
|
}, |
|
{ |
|
name: "invalid resource url", |
|
in: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: "nomad.Job", |
|
Nonce: "1", |
|
Operation: pbpeering.ReplicationMessage_Response_Unknown, |
|
}, |
|
expect: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: "nomad.Job", |
|
Nonce: "1", |
|
Error: &pbstatus.Status{ |
|
Code: int32(code.Code_INVALID_ARGUMENT), |
|
Message: `received response for unknown resource type "nomad.Job"`, |
|
}, |
|
}, |
|
}, |
|
}, |
|
wantErr: true, |
|
}, |
|
{ |
|
name: "unknown operation", |
|
in: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
Operation: pbpeering.ReplicationMessage_Response_Unknown, |
|
}, |
|
expect: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
Error: &pbstatus.Status{ |
|
Code: int32(code.Code_INVALID_ARGUMENT), |
|
Message: `unsupported operation: "Unknown"`, |
|
}, |
|
}, |
|
}, |
|
}, |
|
wantErr: true, |
|
}, |
|
{ |
|
name: "out of range operation", |
|
in: &pbpeering.ReplicationMessage_Response{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
Operation: pbpeering.ReplicationMessage_Response_Operation(100000), |
|
}, |
|
expect: &pbpeering.ReplicationMessage{ |
|
Payload: &pbpeering.ReplicationMessage_Request_{ |
|
Request: &pbpeering.ReplicationMessage_Request{ |
|
ResourceURL: pbpeering.TypeURLService, |
|
Nonce: "1", |
|
Error: &pbstatus.Status{ |
|
Code: int32(code.Code_INVALID_ARGUMENT), |
|
Message: `unsupported operation: "100000"`, |
|
}, |
|
}, |
|
}, |
|
}, |
|
wantErr: true, |
|
}, |
|
} |
|
for _, tc := range tt { |
|
t.Run(tc.name, func(t *testing.T) { |
|
run(t, tc) |
|
}) |
|
} |
|
}
|
|
|