From 41b3ca7da9c9c683bcdf7493e1092fddec4befca Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Fri, 6 Dec 2013 16:54:33 -0800 Subject: [PATCH] consul: refactor into more files --- consul/rpc.go | 46 ++++++++++++++++++++++++++++++++++ consul/serf.go | 25 +++++++++++++++++++ consul/server.go | 65 ------------------------------------------------ 3 files changed, 71 insertions(+), 65 deletions(-) create mode 100644 consul/rpc.go create mode 100644 consul/serf.go diff --git a/consul/rpc.go b/consul/rpc.go new file mode 100644 index 0000000000..09697d1ac2 --- /dev/null +++ b/consul/rpc.go @@ -0,0 +1,46 @@ +package consul + +import ( + "github.com/ugorji/go/codec" + "net" +) + +// listen is used to listen for incoming RPC connections +func (s *Server) listen() { + for { + // Accept a connection + conn, err := s.rpcListener.Accept() + if err != nil { + if s.shutdown { + return + } + s.logger.Printf("[ERR] Failed to accept RPC conn: %v", err) + continue + } + + // Track this client + s.rpcClientLock.Lock() + s.rpcClients[conn] = struct{}{} + s.rpcClientLock.Unlock() + + go s.handleConn(conn) + } +} + +// handleConn is used to service a single RPC connection +func (s *Server) handleConn(conn net.Conn) { + defer func() { + conn.Close() + s.rpcClientLock.Lock() + delete(s.rpcClients, conn) + s.rpcClientLock.Unlock() + }() + + rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) + for !s.shutdown { + if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { + s.logger.Printf("[ERR] RPC error: %v (%v)", err, conn) + return + } + } +} diff --git a/consul/serf.go b/consul/serf.go new file mode 100644 index 0000000000..6ef6a14ada --- /dev/null +++ b/consul/serf.go @@ -0,0 +1,25 @@ +package connsul + +// lanEventHandler is used to handle events from the lan Serf cluster +func (s *Server) lanEventHandler() { + for { + select { + case e := <-s.eventChLAN: + s.logger.Printf("[INFO] LAN Event: %v", e) + case <-s.shutdownCh: + return + } + } +} + +// wanEventHandler is used to handle events from the wan Serf cluster +func (s *Server) wanEventHandler() { + for { + select { + case e := <-s.eventChWAN: + s.logger.Printf("[INFO] WAN Event: %v", e) + case <-s.shutdownCh: + return + } + } +} diff --git a/consul/server.go b/consul/server.go index a0943d7e77..69e8262ce3 100644 --- a/consul/server.go +++ b/consul/server.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" - "github.com/ugorji/go/codec" "log" "net" "net/rpc" @@ -252,67 +251,3 @@ func (s *Server) Shutdown() error { return nil } - -// lanEventHandler is used to handle events from the lan Serf cluster -func (s *Server) lanEventHandler() { - for { - select { - case e := <-s.eventChLAN: - s.logger.Printf("[INFO] LAN Event: %v", e) - case <-s.shutdownCh: - return - } - } -} - -// wanEventHandler is used to handle events from the wan Serf cluster -func (s *Server) wanEventHandler() { - for { - select { - case e := <-s.eventChWAN: - s.logger.Printf("[INFO] WAN Event: %v", e) - case <-s.shutdownCh: - return - } - } -} - -// listen is used to listen for incoming RPC connections -func (s *Server) listen() { - for { - // Accept a connection - conn, err := s.rpcListener.Accept() - if err != nil { - if s.shutdown { - return - } - s.logger.Printf("[ERR] Failed to accept RPC conn: %v", err) - continue - } - - // Track this client - s.rpcClientLock.Lock() - s.rpcClients[conn] = struct{}{} - s.rpcClientLock.Unlock() - - go s.handleConn(conn) - } -} - -// handleConn is used to service a single RPC connection -func (s *Server) handleConn(conn net.Conn) { - defer func() { - conn.Close() - s.rpcClientLock.Lock() - delete(s.rpcClients, conn) - s.rpcClientLock.Unlock() - }() - - rpcCodec := codec.GoRpc.ServerCodec(conn, &codec.MsgpackHandle{}) - for !s.shutdown { - if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { - s.logger.Printf("[ERR] RPC error: %v (%v)", err, conn) - return - } - } -}