From 6c8a0248ba25372a38aaca32f75d496a9432ed40 Mon Sep 17 00:00:00 2001 From: FFMMM Date: Fri, 29 Apr 2022 12:55:34 -0700 Subject: [PATCH] add peering cahnges from ENT 1c549d814 f1aca2618 be63e99c3 Signed-off-by: FFMMM --- agent/http_register.go | 2 +- agent/peering_endpoint.go | 38 +++++- agent/peering_endpoint_oss_test.go | 31 +++++ agent/peering_endpoint_test.go | 105 +++++++++++++++++ api/api_test.go | 17 +++ api/peering.go | 179 +++++++++++++++++++++++++++++ api/peering_test.go | 93 +++++++++++++++ 7 files changed, 461 insertions(+), 4 deletions(-) create mode 100644 api/peering.go create mode 100644 api/peering_test.go diff --git a/agent/http_register.go b/agent/http_register.go index cbef7fa6cb..854f8dccb3 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -105,7 +105,7 @@ func init() { registerEndpoint("/v1/operator/autopilot/state", []string{"GET"}, (*HTTPHandlers).OperatorAutopilotState) registerEndpoint("/v1/peering/token", []string{"POST"}, (*HTTPHandlers).PeeringGenerateToken) registerEndpoint("/v1/peering/initiate", []string{"POST"}, (*HTTPHandlers).PeeringInitiate) - registerEndpoint("/v1/peering/", []string{"GET"}, (*HTTPHandlers).PeeringRead) + registerEndpoint("/v1/peering/", []string{"GET", "DELETE"}, (*HTTPHandlers).PeeringEndpoint) registerEndpoint("/v1/peerings", []string{"GET"}, (*HTTPHandlers).PeeringList) registerEndpoint("/v1/query", []string{"GET", "POST"}, (*HTTPHandlers).PreparedQueryGeneral) // specific prepared query endpoints have more complex rules for allowed methods, so diff --git a/agent/peering_endpoint.go b/agent/peering_endpoint.go index 2db1d52d30..3032422925 100644 --- a/agent/peering_endpoint.go +++ b/agent/peering_endpoint.go @@ -8,8 +8,8 @@ import ( "github.com/hashicorp/consul/proto/pbpeering" ) -// PeeringRead fetches a peering that matches the request parameters. -func (s *HTTPHandlers) PeeringRead(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +// PeeringEndpoint handles GET, DELETE on v1/peering/name +func (s *HTTPHandlers) PeeringEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) { name, err := getPathSuffixUnescaped(req.URL.Path, "/v1/peering/") if err != nil { return nil, err @@ -23,10 +23,24 @@ func (s *HTTPHandlers) PeeringRead(resp http.ResponseWriter, req *http.Request) return nil, err } + // Switch on the method + switch req.Method { + case "GET": + return s.peeringRead(resp, req, name, entMeta.PartitionOrEmpty()) + case "DELETE": + return s.peeringDelete(resp, req, name, entMeta.PartitionOrEmpty()) + default: + return nil, MethodNotAllowedError{req.Method, []string{"GET", "DELETE"}} + } +} + +// peeringRead fetches a peering that matches the name and partition. +// This assumes that the name and partition parameters are valid +func (s *HTTPHandlers) peeringRead(resp http.ResponseWriter, req *http.Request, name, partition string) (interface{}, error) { args := pbpeering.PeeringReadRequest{ Name: name, Datacenter: s.agent.config.Datacenter, - Partition: entMeta.PartitionOrEmpty(), // should be "" in OSS + Partition: partition, // should be "" in OSS } result, err := s.agent.rpcClientPeering.PeeringRead(req.Context(), &args) @@ -116,3 +130,21 @@ func (s *HTTPHandlers) PeeringInitiate(resp http.ResponseWriter, req *http.Reque return s.agent.rpcClientPeering.Initiate(req.Context(), &args) } + +// peeringDelete initiates a deletion for a peering that matches the name and partition. +// This assumes that the name and partition parameters are valid. +func (s *HTTPHandlers) peeringDelete(resp http.ResponseWriter, req *http.Request, name, partition string) (interface{}, error) { + args := pbpeering.PeeringDeleteRequest{ + Name: name, + Datacenter: s.agent.config.Datacenter, + Partition: partition, // should be "" in OSS + } + + result, err := s.agent.rpcClientPeering.PeeringDelete(req.Context(), &args) + if err != nil { + return nil, err + } + + // TODO(peering) -- today pbpeering.PeeringDeleteResponse is a {} so the result below is actually {} + return result, nil +} diff --git a/agent/peering_endpoint_oss_test.go b/agent/peering_endpoint_oss_test.go index 5a6fa1f286..5e00b05767 100644 --- a/agent/peering_endpoint_oss_test.go +++ b/agent/peering_endpoint_oss_test.go @@ -43,3 +43,34 @@ func TestHTTP_Peering_GenerateToken_OSS_Failure(t *testing.T) { require.Contains(t, string(body), "Partitions are a Consul Enterprise feature") }) } + +func TestHTTP_PeeringEndpoint_OSS_Failure(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + a := NewTestAgent(t, "") + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + t.Run("Doesn't allow partitions on PeeringEndpoint in OSS HTTP requests", func(t *testing.T) { + req, err := http.NewRequest("GET", "/v1/peering/foo?partition=foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + + require.Equal(t, http.StatusBadRequest, resp.Code) + body, _ := io.ReadAll(resp.Body) + require.Contains(t, string(body), "Partitions are a Consul Enterprise feature") + + req2, err2 := http.NewRequest("DELETE", "/v1/peering/foo?partition=foo", nil) + require.NoError(t, err2) + resp2 := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp2, req2) + + require.Equal(t, http.StatusBadRequest, resp2.Code) + body2, _ := io.ReadAll(resp2.Body) + require.Contains(t, string(body2), "Partitions are a Consul Enterprise feature") + }) +} diff --git a/agent/peering_endpoint_test.go b/agent/peering_endpoint_test.go index 0e1840cf0a..f44060a7a3 100644 --- a/agent/peering_endpoint_test.go +++ b/agent/peering_endpoint_test.go @@ -195,6 +195,41 @@ func TestHTTP_Peering_Initiate(t *testing.T) { }) } +func TestHTTP_Peering_MethodNotAllowed(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Insert peerings directly to state store. + // Note that the state store holds reference to the underlying + // variables; do not modify them after writing. + foo := &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "foo", + State: pbpeering.PeeringState_INITIAL, + PeerCAPems: nil, + PeerServerName: "fooservername", + PeerServerAddresses: []string{"addr1"}, + }, + } + _, err := a.rpcClientPeering.PeeringWrite(ctx, foo) + require.NoError(t, err) + + req, err := http.NewRequest("PUT", "/v1/peering/foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusMethodNotAllowed, resp.Code) +} + func TestHTTP_Peering_Read(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -257,6 +292,76 @@ func TestHTTP_Peering_Read(t *testing.T) { }) } +func TestHTTP_Peering_Delete(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Insert peerings directly to state store. + // Note that the state store holds reference to the underlying + // variables; do not modify them after writing. + foo := &pbpeering.PeeringWriteRequest{ + Peering: &pbpeering.Peering{ + Name: "foo", + State: pbpeering.PeeringState_INITIAL, + PeerCAPems: nil, + PeerServerName: "fooservername", + PeerServerAddresses: []string{"addr1"}, + }, + } + _, err := a.rpcClientPeering.PeeringWrite(ctx, foo) + require.NoError(t, err) + + t.Run("read existing token before attempting delete", func(t *testing.T) { + req, err := http.NewRequest("GET", "/v1/peering/foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + // TODO(peering): replace with API types + var pbresp pbpeering.Peering + require.NoError(t, json.NewDecoder(resp.Body).Decode(&pbresp)) + + require.Equal(t, foo.Peering.Name, pbresp.Name) + }) + + t.Run("delete the existing token we just read", func(t *testing.T) { + req, err := http.NewRequest("DELETE", "/v1/peering/foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + require.Equal(t, "{}", resp.Body.String()) + }) + + t.Run("now the token is deleted, a read should 404", func(t *testing.T) { + req, err := http.NewRequest("GET", "/v1/peering/foo", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + require.Equal(t, http.StatusNotFound, resp.Code) + }) + + t.Run("delete a token that does not exist", func(t *testing.T) { + req, err := http.NewRequest("DELETE", "/v1/peering/baz", nil) + require.NoError(t, err) + resp := httptest.NewRecorder() + a.srv.h.ServeHTTP(resp, req) + + // TODO(peering): it may be a security concern, but do we want to say 404 here? + require.Equal(t, http.StatusOK, resp.Code) + }) +} + func TestHTTP_Peering_List(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/api/api_test.go b/api/api_test.go index 3f4e4e3255..f06f4c6823 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -51,6 +51,23 @@ func makeACLClient(t *testing.T) (*Client, *testutil.TestServer) { }) } +func makeClientWithCA(t *testing.T) (*Client, *testutil.TestServer) { + return makeClientWithConfig(t, + func(c *Config) { + c.TLSConfig = TLSConfig{ + Address: "consul.test", + CAFile: "../test/client_certs/rootca.crt", + CertFile: "../test/client_certs/client.crt", + KeyFile: "../test/client_certs/client.key", + } + }, + func(c *testutil.TestServerConfig) { + c.CAFile = "../test/client_certs/rootca.crt" + c.CertFile = "../test/client_certs/server.crt" + c.KeyFile = "../test/client_certs/server.key" + }) +} + func makeClientWithConfig( t *testing.T, cb1 configCallback, diff --git a/api/peering.go b/api/peering.go new file mode 100644 index 0000000000..95a383fd71 --- /dev/null +++ b/api/peering.go @@ -0,0 +1,179 @@ +package api + +import ( + "context" + "fmt" +) + +// PeeringState enumerates all the states a peering can be in +type PeeringState int32 + +const ( + // Undefined represents an unset value for PeeringState during + // writes. + UNDEFINED PeeringState = 0 + // INITIAL Initial means a Peering has been initialized and is awaiting + // acknowledgement from a remote peer. + INITIAL PeeringState = 1 + // Active means that the peering connection is active and healthy. + // ACTIVE PeeringState = 2 +) + +type Peering struct { + // ID is a datacenter-scoped UUID for the peering. + ID string `json:"ID,omitempty"` + // Name is the local alias for the peering relationship. + Name string `json:"Name,omitempty"` + // Partition is the local partition connecting to the peer. + Partition string `json:"Partition,omitempty"` + // State is one of the valid PeeringState values to represent the status of + // peering relationship. + State PeeringState `json:"State,omitempty"` + // PeerID is the ID that our peer assigned to this peering. + // This ID is to be used when dialing the peer, so that it can know who dialed it. + PeerID string `json:"PeerID,omitempty"` + // PeerCAPems contains all the CA certificates for the remote peer. + PeerCAPems []string `json:"PeerCAPems,omitempty"` + // PeerServerName is the name of the remote server as it relates to TLS. + PeerServerName string `json:"PeerServerName,omitempty"` + // PeerServerAddresses contains all the connection addresses for the remote peer. + PeerServerAddresses []string `json:"PeerServerAddresses,omitempty"` + // CreateIndex is the Raft index at which the Peering was created. + CreateIndex uint64 `json:"CreateIndex,omitempty"` + // ModifyIndex is the latest Raft index at which the Peering. was modified. + ModifyIndex uint64 `json:"ModifyIndex,omitempty"` +} + +// PeeringRequest is used for Read and Delete HTTP calls. +// The PeeringReadRequest and PeeringDeleteRequest look the same, so we treat them the same for now +type PeeringRequest struct { + Name string `json:"Name,omitempty"` + Partition string `json:"Partition,omitempty"` + Datacenter string `json:"Datacenter,omitempty"` +} + +type PeeringReadResponse struct { + Peering *Peering `json:"Peering,omitempty"` +} + +type PeeringGenerateTokenRequest struct { + // PeerName is the name of the remote peer. + PeerName string `json:"PeerName,omitempty"` + // Partition to be peered. + Partition string `json:"Partition,omitempty"` + Datacenter string `json:"Datacenter,omitempty"` + Token string `json:"Token,omitempty"` +} + +type PeeringGenerateTokenResponse struct { + // PeeringToken is an opaque string provided to the remote peer for it to complete + // the peering initialization handshake. + PeeringToken string `json:"PeeringToken,omitempty"` +} + +type PeeringInitiateRequest struct { + // Name of the remote peer. + PeerName string `json:"PeerName,omitempty"` + // The peering token returned from the peer's GenerateToken endpoint. + PeeringToken string `json:"PeeringToken,omitempty"` + Datacenter string `json:"Datacenter,omitempty"` + Token string `json:"Token,omitempty"` +} + +type PeeringInitiateResponse struct { + Status uint32 `json:"Status,omitempty"` +} + +type Peerings struct { + c *Client +} + +// Peerings returns a handle to the operator endpoints. +func (c *Client) Peerings() *Peerings { + return &Peerings{c: c} +} + +func (p *Peerings) Read(ctx context.Context, name string, q *QueryOptions) (*Peering, *QueryMeta, error) { + if name == "" { + return nil, nil, fmt.Errorf("peering name cannot be empty") + } + + req := p.c.newRequest("GET", fmt.Sprintf("/v1/peering/%s", name)) + req.setQueryOptions(q) + req.ctx = ctx + + rtt, resp, err := p.c.doRequest(req) + if err != nil { + return nil, nil, err + } + defer closeResponseBody(resp) + if err := requireOK(resp); err != nil { + return nil, nil, err + } + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + var out Peering + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + + return &out, qm, nil +} + +func (p *Peerings) GenerateToken(ctx context.Context, g PeeringGenerateTokenRequest, wq *WriteOptions) (*PeeringGenerateTokenResponse, *WriteMeta, error) { + if g.PeerName == "" { + return nil, nil, fmt.Errorf("peer name cannot be empty") + } + + req := p.c.newRequest("POST", fmt.Sprint("/v1/peering/token")) + req.setWriteOptions(wq) + req.ctx = ctx + req.obj = g + + rtt, resp, err := p.c.doRequest(req) + if err != nil { + return nil, nil, err + } + defer closeResponseBody(resp) + if err := requireOK(resp); err != nil { + return nil, nil, err + } + + wm := &WriteMeta{RequestTime: rtt} + + var out PeeringGenerateTokenResponse + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + + return &out, wm, nil +} + +func (p *Peerings) Initiate(ctx context.Context, i PeeringInitiateRequest, wq *WriteOptions) (*PeeringInitiateResponse, *WriteMeta, error) { + + req := p.c.newRequest("POST", fmt.Sprint("/v1/peering/initiate")) + req.setWriteOptions(wq) + req.ctx = ctx + req.obj = i + + rtt, resp, err := p.c.doRequest(req) + if err != nil { + return nil, nil, err + } + defer closeResponseBody(resp) + if err := requireOK(resp); err != nil { + return nil, nil, err + } + + wm := &WriteMeta{RequestTime: rtt} + + var out PeeringInitiateResponse + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + + return &out, wm, nil +} diff --git a/api/peering_test.go b/api/peering_test.go new file mode 100644 index 0000000000..44c024aa6c --- /dev/null +++ b/api/peering_test.go @@ -0,0 +1,93 @@ +package api + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/sdk/testutil" +) + +// TODO(peering): cover the following test cases: bad/ malformed input, peering with wrong token, +// peering with the wrong PeerName + +// TestAPI_Peering_GenerateToken_Read_Initiate tests the following use case: +// a server creates a peering token, reads the token, then another server calls initiate peering +func TestAPI_Peering_GenerateToken_Read_Initiate(t *testing.T) { + t.Parallel() + c, s := makeClientWithCA(t) + defer s.Stop() + s.WaitForSerfCheck(t) + options := &WriteOptions{Datacenter: "dc1"} + ctx := context.Background() + peerings := c.Peerings() + + p1 := PeeringGenerateTokenRequest{ + PeerName: "peer1", + } + var token1 string + t.Run("Generate a token happy path", func(t *testing.T) { + resp, qq, err := peerings.GenerateToken(ctx, p1, options) + token1 = resp.PeeringToken + + require.NoError(t, err) + require.NotEmpty(t, qq) + require.NotEmpty(t, resp) + }) + + t.Run("Read token generated on \"server\"", func(t *testing.T) { + resp, qq, err := peerings.Read(ctx, "peer1", nil) + + // basic ok checking + require.NoError(t, err) + require.NotEmpty(t, qq) + require.NotEmpty(t, resp) + + // token specific assertions on the "server" + require.Equal(t, "peer1", resp.Name) + require.Equal(t, "default", resp.Partition) + require.Equal(t, INITIAL, resp.State) + + }) + + t.Run("Initiate peering", func(t *testing.T) { + // make a "client" server in second DC for peering + c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.Datacenter = "dc2" + }) + defer s2.Stop() + + i := PeeringInitiateRequest{ + Datacenter: c2.config.Datacenter, + PeerName: "peer1", + PeeringToken: token1, + } + + respi, wm, err := c2.Peerings().Initiate(ctx, i, options) + + // basic checks + require.NoError(t, err) + require.NotEmpty(t, wm) + + // at first the token will be undefined + require.Equal(t, UNDEFINED, PeeringState(respi.Status)) + + // wait for the peering backend to finish the peering connection + time.Sleep(2 * time.Second) + + respr, qq, err := c2.Peerings().Read(ctx, "peer1", nil) + + // basic ok checking + require.NoError(t, err) + require.NotEmpty(t, qq) + + // require that the peering state is not undefined + require.Equal(t, INITIAL, respr.State) + + // TODO(peering) -- let's go all the way and test in code either here or somewhere else that PeeringState does move to Active + // require.Equal(t, PeeringState_ACTIVE, respr.State) + }) + +}