Browse Source

server: wire up in-process Resource Service (#16978)

pull/16991/head
Dan Upton 2 years ago committed by GitHub
parent
commit
a37a441991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      acl/resolver/danger.go
  2. 71
      agent/consul/server.go
  3. 11
      agent/grpc-external/services/peerstream/subscription_view.go
  4. 84
      agent/grpc-internal/pipe.go
  5. 70
      agent/grpc-internal/pipe_test.go

15
acl/resolver/danger.go

@ -0,0 +1,15 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package resolver
import "github.com/hashicorp/consul/acl"
// DANGER_NO_AUTH implements an ACL resolver short-circuit authorization in
// cases where it is handled somewhere else or expressly not required.
type DANGER_NO_AUTH struct{}
// ResolveTokenAndDefaultMeta returns an authorizer with unfettered permissions.
func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (Result, error) {
return Result{Authorizer: acl.ManageAll()}, nil
}

71
agent/consul/server.go

@ -33,10 +33,12 @@ import (
"go.etcd.io/bbolt"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
"github.com/hashicorp/consul/agent/consul/fsm"
@ -67,11 +69,11 @@ import (
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/demo"
"github.com/hashicorp/consul/internal/storage"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
@ -424,6 +426,14 @@ type Server struct {
// routineManager is responsible for managing longer running go routines
// run by the Server
routineManager *routine.Manager
// typeRegistry contains Consul's registered resource types.
typeRegistry resource.Registry
// internalResourceServiceClient is a client that can be used to communicate
// with the Resource Service in-process (i.e. not via the network) without auth.
// It should only be used for purely-internal workloads, such as controllers.
internalResourceServiceClient pbresource.ResourceServiceClient
}
type connHandler interface {
@ -486,6 +496,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
publisher: flat.EventPublisher,
incomingRPCLimiter: incomingRPCLimiter,
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
typeRegistry: resource.NewRegistry(),
}
incomingRPCLimiter.Register(s)
@ -750,7 +761,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
// Initialize external gRPC server
s.setupExternalGRPC(config, s.raftStorageBackend, logger)
s.setupExternalGRPC(config, logger)
// Initialize internal gRPC server.
//
@ -767,6 +778,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
})
go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
if err := s.setupInternalResourceService(logger); err != nil {
return nil, err
}
// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
@ -803,6 +818,10 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return nil, err
}
if s.config.DevMode {
demo.Register(s.typeRegistry)
}
return s, nil
}
@ -1197,7 +1216,7 @@ func (s *Server) setupRPC() error {
}
// Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) {
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
@ -1262,20 +1281,50 @@ func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logg
})
s.peerStreamServer.Register(s.externalGRPCServer)
registry := resource.NewRegistry()
if s.config.DevMode {
demo.Register(registry)
}
resourcegrpc.NewServer(resourcegrpc.Config{
Registry: registry,
Backend: backend,
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: s.ACLResolver,
Logger: logger.Named("grpc-api.resource"),
}).Register(s.externalGRPCServer)
}
func (s *Server) setupInternalResourceService(logger hclog.Logger) error {
server := grpc.NewServer()
resourcegrpc.NewServer(resourcegrpc.Config{
Registry: s.typeRegistry,
Backend: s.raftStorageBackend,
ACLResolver: resolver.DANGER_NO_AUTH{},
Logger: logger.Named("grpc-api.resource"),
}).Register(server)
pipe := agentgrpc.NewPipeListener()
go server.Serve(pipe)
go func() {
<-s.shutdownCh
server.Stop()
}()
conn, err := grpc.Dial("",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(pipe.DialContext),
grpc.WithBlock(),
)
if err != nil {
server.Stop()
return err
}
go func() {
<-s.shutdownCh
conn.Close()
}()
s.internalResourceServiceClient = pbresource.NewResourceServiceClient(conn)
return nil
}
// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Info("shutting down server")

11
agent/grpc-external/services/peerstream/subscription_view.go vendored

@ -9,7 +9,6 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/stream"
@ -78,7 +77,7 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
}
deps := submatview.LocalMaterializerDeps{
Backend: e.sub,
ACLResolver: DANGER_NO_AUTH{},
ACLResolver: resolver.DANGER_NO_AUTH{},
Deps: submatview.Deps{
View: newExportedServicesView(),
Logger: e.logger,
@ -88,14 +87,6 @@ func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, err
return submatview.NewLocalMaterializer(deps), nil
}
// DANGER_NO_AUTH implements submatview.ACLResolver to short-circuit authorization
// in cases where it is handled somewhere else (e.g. in an RPC handler).
type DANGER_NO_AUTH struct{}
func (DANGER_NO_AUTH) ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error) {
return resolver.Result{Authorizer: acl.ManageAll()}, nil
}
// Type implements submatview.Request
func (e *exportedServiceRequest) Type() string {
return "leader.peering.stream.exportedServiceRequest"

84
agent/grpc-internal/pipe.go

@ -0,0 +1,84 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package internal
import (
"context"
"errors"
"net"
"sync/atomic"
)
// ErrPipeClosed is returned when calling Accept or DialContext on a closed
// PipeListener.
var ErrPipeClosed = errors.New("pipe listener has been closed")
// PipeListener implements the net.Listener interface using a net.Pipe so that
// you can interact with a gRPC service in the same process without going over
// the network.
type PipeListener struct {
conns chan net.Conn
closed atomic.Bool
done chan struct{}
}
var _ net.Listener = (*PipeListener)(nil)
// NewPipeListener creates a new PipeListener.
func NewPipeListener() *PipeListener {
return &PipeListener{
conns: make(chan net.Conn),
done: make(chan struct{}),
}
}
// Accept a connection.
func (p *PipeListener) Accept() (net.Conn, error) {
select {
case conn := <-p.conns:
return conn, nil
case <-p.done:
return nil, ErrPipeClosed
}
}
// Close the listener.
func (p *PipeListener) Close() error {
if p.closed.CompareAndSwap(false, true) {
close(p.done)
}
return nil
}
// DialContext dials the server over an in-process pipe.
func (p *PipeListener) DialContext(ctx context.Context, _ string) (net.Conn, error) {
if p.closed.Load() {
return nil, ErrPipeClosed
}
serverConn, clientConn := net.Pipe()
select {
// Send the server connection to whatever is accepting connections from the
// PipeListener. This will block until something has accepted the conn.
case p.conns <- serverConn:
return clientConn, nil
case <-ctx.Done():
serverConn.Close()
clientConn.Close()
return nil, ctx.Err()
case <-p.done:
serverConn.Close()
clientConn.Close()
return nil, ErrPipeClosed
}
}
// Add returns the listener's address.
func (*PipeListener) Addr() net.Addr { return pipeAddr{} }
type pipeAddr struct{}
func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string { return "pipe" }

70
agent/grpc-internal/pipe_test.go

@ -0,0 +1,70 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package internal
import (
"bufio"
"context"
"net"
"testing"
"github.com/stretchr/testify/require"
)
func TestPipeListener_RoundTrip(t *testing.T) {
lis := NewPipeListener()
t.Cleanup(func() { _ = lis.Close() })
go echoServer(lis)
conn, err := lis.DialContext(context.Background(), "")
require.NoError(t, err)
t.Cleanup(func() { _ = conn.Close() })
input := []byte("Hello World\n")
_, err = conn.Write(input)
require.NoError(t, err)
output := make([]byte, len(input))
_, err = conn.Read(output)
require.NoError(t, err)
require.Equal(t, string(input), string(output))
}
func TestPipeListener_Closed(t *testing.T) {
lis := NewPipeListener()
require.NoError(t, lis.Close())
_, err := lis.Accept()
require.ErrorIs(t, err, ErrPipeClosed)
_, err = lis.DialContext(context.Background(), "")
require.ErrorIs(t, err, ErrPipeClosed)
}
func echoServer(lis net.Listener) {
handleConn := func(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
msg, err := reader.ReadBytes('\n')
if err != nil {
return
}
if _, err := conn.Write(msg); err != nil {
return
}
}
}
for {
conn, err := lis.Accept()
if err != nil {
return
}
go handleConn(conn)
}
}
Loading…
Cancel
Save