From ebfda68bef156d20e9dbfdd20ceeddf8a09d1edd Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 19 Dec 2013 14:48:14 -0800 Subject: [PATCH] Adding initial consul client --- consul/client.go | 220 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 consul/client.go diff --git a/consul/client.go b/consul/client.go new file mode 100644 index 0000000000..c7b73948c5 --- /dev/null +++ b/consul/client.go @@ -0,0 +1,220 @@ +package consul + +import ( + "fmt" + "github.com/hashicorp/serf/serf" + "log" + "net" + "os" + "path/filepath" + "sync" +) + +// Client is Consul client which uses RPC to communicate with the +// services for service discovery, health checking, and DC forwarding. +type Client struct { + config *Config + + // Connection pool to consul servers + connPool *ConnPool + + // consuls tracks the locally known servers + consuls []net.Addr + consulLock sync.RWMutex + + // eventCh is used to receive events from the + // serf cluster in the datacenter + eventCh chan serf.Event + + // Logger uses the provided LogOutput + logger *log.Logger + + // serf is the Serf cluster maintained inside the DC + // which contains all the DC nodes + serf *serf.Serf + + shutdown bool + shutdownCh chan struct{} + shutdownLock sync.Mutex +} + +// NewClient is used to construct a new Consul client from the +// configuration, potentially returning an error +func NewClient(config *Config) (*Client, error) { + // Check for a data directory! + if config.DataDir == "" { + return nil, fmt.Errorf("Config must provide a DataDir") + } + + // Ensure we have a log output + if config.LogOutput == nil { + config.LogOutput = os.Stderr + } + + // Create a logger + logger := log.New(config.LogOutput, "", log.LstdFlags) + + // Create server + c := &Client{ + config: config, + connPool: NewPool(5), + eventCh: make(chan serf.Event, 256), + logger: logger, + shutdownCh: make(chan struct{}), + } + + // Start the Serf listeners to prevent a deadlock + go c.lanEventHandler() + + // Initialize the lan Serf + var err error + c.serf, err = c.setupSerf(config.SerfLANConfig, + c.eventCh, serfLANSnapshot) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("Failed to start lan serf: %v", err) + } + return c, nil +} + +// setupSerf is used to setup and initialize a Serf +func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { + conf.NodeName = c.config.NodeName + conf.Role = fmt.Sprintf("node:%s", c.config.Datacenter) + conf.MemberlistConfig.LogOutput = c.config.LogOutput + conf.LogOutput = c.config.LogOutput + conf.EventCh = ch + conf.SnapshotPath = filepath.Join(c.config.DataDir, path) + if err := ensurePath(conf.SnapshotPath, false); err != nil { + return nil, err + } + return serf.Create(conf) +} + +// Shutdown is used to shutdown the client +func (c *Client) Shutdown() error { + c.logger.Printf("[INFO] Shutting down Consul client") + c.shutdownLock.Lock() + defer c.shutdownLock.Unlock() + + if c.shutdown { + return nil + } + + c.shutdown = true + close(c.shutdownCh) + + if c.serf != nil { + c.serf.Shutdown() + } + + // Close the connection pool + c.connPool.Shutdown() + return nil +} + +// Leave is used to prepare for a graceful shutdown +func (c *Client) Leave() error { + c.logger.Printf("[INFO] Consul client starting leave") + + // Leave the LAN pool + if c.serf != nil { + if err := c.serf.Leave(); err != nil { + c.logger.Printf("[ERR] Failed to leave LAN Serf cluster: %v", err) + } + } + return nil +} + +// JoinLAN is used to have Consul client join the inner-DC pool +// The target address should be another node inside the DC +// listening on the Serf LAN address +func (c *Client) JoinLAN(addr string) error { + _, err := c.serf.Join([]string{addr}, false) + return err +} + +// LANMembers is used to return the members of the LAN cluster +func (c *Client) LANMembers() []serf.Member { + return c.serf.Members() +} + +// lanEventHandler is used to handle events from the lan Serf cluster +func (c *Client) lanEventHandler() { + for { + select { + case e := <-c.eventCh: + switch e.EventType() { + case serf.EventMemberJoin: + c.nodeJoin(e.(serf.MemberEvent)) + case serf.EventMemberLeave: + fallthrough + case serf.EventMemberFailed: + c.nodeFail(e.(serf.MemberEvent)) + case serf.EventUser: + default: + c.logger.Printf("[WARN] Unhandled LAN Serf Event: %#v", e) + } + case <-c.shutdownCh: + return + } + } +} + +// nodeJoin is used to handle join events on the serf cluster +func (c *Client) nodeJoin(me serf.MemberEvent) { + for _, m := range me.Members { + ok, dc, port := isConsulServer(m) + if !ok { + continue + } + if dc != c.config.Datacenter { + c.logger.Printf("[WARN] Consul server %s for datacenter %s has joined wrong cluster", + m.Name, dc) + continue + } + + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} + c.logger.Printf("[INFO] Adding Consul server (Datacenter: %s) (Addr: %s)", dc, addr) + + // Check if this server is known + found := false + c.consulLock.Lock() + for _, c := range c.consuls { + if c.String() == addr.String() { + found = true + break + } + } + + // Add to the list if not known + if !found { + c.consuls = append(c.consuls, addr) + } + c.consulLock.Unlock() + } +} + +// nodeFail is used to handle fail events on the serf cluster +func (c *Client) nodeFail(me serf.MemberEvent) { + for _, m := range me.Members { + ok, dc, port := isConsulServer(m) + if !ok { + continue + } + var addr net.Addr = &net.TCPAddr{IP: m.Addr, Port: port} + c.logger.Printf("[INFO] Removing Consul server (Datacenter: %s) (Addr: %s)", dc, addr) + + // Remove the server if known + c.consulLock.Lock() + n := len(c.consuls) + for i := 0; i < n; i++ { + if c.consuls[i].String() == addr.String() { + c.consuls[i], c.consuls[n-1] = c.consuls[n-1], nil + c.consuls = c.consuls[:n-1] + break + } + } + c.consulLock.Unlock() + } +}