diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go new file mode 100644 index 0000000000..e7c06dbc21 --- /dev/null +++ b/consul/catalog_endpoint.go @@ -0,0 +1,49 @@ +package consul + +import ( + "github.com/hashicorp/consul/rpc" +) + +// Catalog endpoint is used to manipulate the service catalog +type Catalog struct { + *Server +} + +/* +* Register : Registers that a node provides a given service +* Deregister : Deregisters that a node provides a given service +* RemoveNode: Used to force remove a node + +* ListDatacenters: List the known datacenters +* ListServices : Lists the available services +* ListNodes : Lists the available nodes +* ServiceNodes: Returns the nodes that are part of a service +* NodeServices: Returns the services that a node is registered for + */ + +// Register is used register that a node is providing a given service. +// Returns true if the entry was added, false if it already exists, or +// an error is returned. +func (c *Catalog) Register(args *rpc.RegisterRequest, reply *bool) error { + if done, err := c.forward("Catalog.Register", args.Datacenter, args, reply); done { + return err + } + + // Run it through raft + resp, err := c.raftApply(rpc.RegisterRequestType, args) + if err != nil { + c.logger.Printf("[ERR] Register failed: %v", err) + return err + } + + // Set the response + *reply = resp.(bool) + return nil +} + +// Deregister is used to remove a service registration for a given node. +// Returns true if the entry was removed, false if it doesn't exist or +// an error is returned. +func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *bool) error { + return nil +} diff --git a/consul/endpoints.md b/consul/endpoints.md index ca19f2fbdd..60cf9dd8e7 100644 --- a/consul/endpoints.md +++ b/consul/endpoints.md @@ -31,10 +31,8 @@ The catalog service is used to manage service discovery and registration. Nodes can register the services they provide, and deregister them later. The service exposes the following methods: -* Register : Registers that a node provides a given service -* Deregister : Deregisters that a node provides a given service - -* RemoveFailedNode: Used to force remove a failed node +* Register : Registers a node, and potentially a node service +* Deregister : Deregisters a node, and potentially a node service * ListDatacenters: List the known datacenters * ListServices : Lists the available services diff --git a/consul/fsm.go b/consul/fsm.go index 696431ef71..c035fb3883 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -1,6 +1,8 @@ package consul import ( + "fmt" + "github.com/hashicorp/consul/rpc" "github.com/hashicorp/raft" "io" ) @@ -32,11 +34,22 @@ func NewFSM() (*consulFSM, error) { return fsm, nil } -func (c *consulFSM) Apply([]byte) interface{} { - // TODO: Decode +func (c *consulFSM) Apply(buf []byte) interface{} { + switch rpc.MessageType(buf[0]) { + case rpc.RegisterRequestType: + return c.applyRegister(buf[1:]) + default: + panic(fmt.Errorf("failed to apply request: %#v", buf)) + } +} - // TODO: Execute - return nil +func (c *consulFSM) applyRegister(buf []byte) interface{} { + var req rpc.RegisterRequest + if err := rpc.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + return true } func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { diff --git a/consul/rpc.go b/consul/rpc.go index fff6cf2b53..0941f766d2 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -1,6 +1,8 @@ package consul import ( + "fmt" + "github.com/hashicorp/consul/rpc" "github.com/ugorji/go/codec" "net" ) @@ -77,3 +79,51 @@ func (s *Server) handleConsulConn(conn net.Conn) { } } } + +// forward is used to forward to a remote DC or to forward to the local leader +// Returns a bool of if forwarding was performed, as well as any error +func (s *Server) forward(method, dc string, args interface{}, reply interface{}) (bool, error) { + // Handle DC forwarding + if dc != s.config.Datacenter { + err := s.forwardDC(method, dc, args, reply) + return true, err + } + + // Handle leader forwarding + if !s.IsLeader() { + err := s.forwardLeader(method, args, reply) + return true, err + } + return false, nil +} + +// forwardLeader is used to forward an RPC call to the leader, or fail if no leader +func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error { + leader := s.raft.Leader() + if leader == nil { + return rpc.ErrNoLeader + } + return s.connPool.RPC(leader, method, args, reply) +} + +// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers +func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error { + // TODO: Fix + return fmt.Errorf("DC forwarding not supported") +} + +// raftApply is used to encode a message, run it through raft, and return +// the FSM response along with any errors +func (s *Server) raftApply(t rpc.MessageType, msg interface{}) (interface{}, error) { + buf, err := rpc.Encode(t, msg) + if err != nil { + return nil, fmt.Errorf("Failed to encode request: %v", err) + } + + future := s.raft.Apply(buf, 0) + if err := future.Error(); err != nil { + return nil, err + } + + return future.Response(), nil +} diff --git a/consul/server.go b/consul/server.go index 51f523d3b5..6d9a50eb35 100644 --- a/consul/server.go +++ b/consul/server.go @@ -210,6 +210,7 @@ func (s *Server) setupRPC() error { // Register the handlers s.rpcServer.Register(&Status{server: s}) s.rpcServer.Register(&Raft{server: s}) + s.rpcServer.Register(&Catalog{s}) list, err := net.Listen("tcp", s.config.RPCAddr) if err != nil { @@ -352,3 +353,8 @@ func (s *Server) RemoveFailedNode(node string) error { } return nil } + +// IsLeader checks if this server is the cluster leader +func (s *Server) IsLeader() bool { + return s.raft.State() == raft.Leader +} diff --git a/rpc/structs.go b/rpc/structs.go new file mode 100644 index 0000000000..ce0291efbe --- /dev/null +++ b/rpc/structs.go @@ -0,0 +1,56 @@ +package rpc + +import ( + "bytes" + "fmt" + "github.com/ugorji/go/codec" +) + +var ( + ErrNoLeader = fmt.Errorf("No cluster leader") +) + +type MessageType uint8 + +const ( + RegisterRequestType MessageType = iota + DeregisterRequestType +) + +// RegisterRequest is used for the Catalog.Register endpoint +// to register a node as providing a service. If no service +// is provided, the node is registered. +type RegisterRequest struct { + Datacenter string + Node string + Address string + ServiceName string + ServicePort int + ServiceTag string +} + +// DeregisterRequest is used for the Catalog.Deregister endpoint +// to deregister a node as providing a service. If no service is +// provided the entire node is deregistered. +type DeregisterRequest struct { + Datacenter string + Node string + ServiceName string +} + +// Decode is used to decode a MsgPack encoded object +func Decode(buf []byte, out interface{}) error { + var handle codec.MsgpackHandle + return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out) +} + +// Encode is used to encode a MsgPack object with type prefix +func Encode(t MessageType, msg interface{}) ([]byte, error) { + buf := bytes.NewBuffer(nil) + buf.WriteByte(uint8(t)) + + handle := codec.MsgpackHandle{} + encoder := codec.NewEncoder(buf, &handle) + err := encoder.Encode(msg) + return buf.Bytes(), err +}