Browse Source

consul: adding basic skeleton

pull/19/head
Armon Dadgar 11 years ago
parent
commit
7fbd8ba370
  1. 27
      Makefile
  2. 70
      consul/config.go
  3. 41
      consul/fsm.go
  4. 223
      consul/server.go
  5. 37
      consul/server_test.go

27
Makefile

@ -0,0 +1,27 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)
all: deps
@mkdir -p bin/
@bash --norc -i ./scripts/build.sh
cov:
gocov test ./... | gocov-html > /tmp/coverage.html
open /tmp/coverage.html
deps:
go get -d -v ./...
echo $(DEPS) | xargs -n1 go get -d
test: deps
go list ./... | xargs -n1 go test
integ:
go list ./... | INTEG_TESTS=yes xargs -n1 go test
web:
./scripts/website_run.sh
web-push:
./scripts/website_push.sh
.PNONY: all cov deps integ test web web-push

70
consul/config.go

@ -0,0 +1,70 @@
package consul
import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"io"
"os"
)
const (
DefaultRaftAddr = "0.0.0.0:8300"
DefaultLANSerfPort = 8301
DefaultWANSerfPort = 8302
)
// Config is used to configure the server
type Config struct {
// Datacenter is the datacenter this Consul server represents
Datacenter string
// DataDir is the directory to store our state in
DataDir string
// Node name is the name we use to advertise. Defaults to hostname.
NodeName string
// Bind address for Raft (TCP)
RaftBindAddr string
// RaftConfig is the configuration used for Raft in the local DC
RaftConfig *raft.Config
// SerfLocalConfig is the configuration for the local serf
SerfLocalConfig *serf.Config
// SerfRemoteConfig is the configuration for the remtoe serf
SerfRemoteConfig *serf.Config
// LogOutput is the location to write logs to. If this is not set,
// logs will go to stderr.
LogOutput io.Writer
}
// DefaultConfig is used to return a sane default configuration
func DefaultConfig() *Config {
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
conf := &Config{
Datacenter: "dc1",
NodeName: hostname,
RaftBindAddr: DefaultRaftAddr,
RaftConfig: raft.DefaultConfig(),
SerfLocalConfig: serf.DefaultConfig(),
SerfRemoteConfig: serf.DefaultConfig(),
}
// Remote Serf should use the WAN timing, since we are using it
// to communicate between DC's
conf.SerfRemoteConfig.MemberlistConfig = memberlist.DefaultWANConfig()
// Ensure we don't have port conflicts
conf.SerfLocalConfig.MemberlistConfig.Port = DefaultLANSerfPort
conf.SerfRemoteConfig.MemberlistConfig.Port = DefaultWANSerfPort
return conf
}

41
consul/fsm.go

@ -0,0 +1,41 @@
package consul
import (
"github.com/hashicorp/raft"
"io"
)
// consulFSM implements a finite state machine that is used
// along with Raft to provide strong consistency for various
// data that requires it. We implement this outside the Server
// to avoid exposing this outside the package.
type consulFSM struct {
server *Server
}
// consulSnapshot is used to provide a snapshot of the current
// state in a way that can be accessed concurrently with operations
// that may modify the live state.
type consulSnapshot struct {
fsm *consulFSM
}
func (c *consulFSM) Apply([]byte) interface{} {
return nil
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
snap := &consulSnapshot{fsm: c}
return snap, nil
}
func (c *consulFSM) Restore(io.ReadCloser) error {
return nil
}
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
return nil
}
func (s *consulSnapshot) Release() {
}

223
consul/server.go

@ -0,0 +1,223 @@
package consul
import (
"fmt"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"log"
"os"
"path/filepath"
"sync"
"time"
)
const (
serfLocalSnapshot = "serf/local.snapshot"
serfRemoteSnapshot = "serf/remote.snapshot"
raftState = "raft/"
)
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
config *Config
// eventChLocal is used to receive events from the
// serfLocal cluster
eventChLocal chan serf.Event
// eventChRemote is used to receive events from the
// serfRemote cluster
eventChRemote chan serf.Event
// fsm is the state machine used with Raft to provide
// strong consistency.
fsm *consulFSM
// Logger uses the provided LogOutput
logger *log.Logger
// The raft instance is used among Consul nodes within the
// DC to protect operations that require strong consistency
raft *raft.Raft
raftStore *raft.SQLiteStore
raftTransport *raft.NetworkTransport
// serfLocal is the Serf cluster maintained inside the DC
// which contains all the DC nodes
serfLocal *serf.Serf
// serfRemote is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfRemote *serf.Serf
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
}
// NewServer is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
// Check for a data directory!
if config.DataDir == "" {
return nil, fmt.Errorf("Config must provide a DataDir")
}
// Ensure we have a log output
if config.LogOutput == nil {
config.LogOutput = os.Stderr
}
// Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create server
s := &Server{
config: config,
eventChLocal: make(chan serf.Event, 256),
eventChRemote: make(chan serf.Event, 256),
logger: logger,
shutdownCh: make(chan struct{}),
}
// Start the Serf listeners to prevent a deadlock
go s.localEventHandler()
go s.remoteEventHandler()
// Initialize the local Serf
var err error
s.serfLocal, err = s.setupSerf(config.SerfLocalConfig, s.eventChLocal, serfLocalSnapshot)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start local serf: %v", err)
}
// Initialize the remote Serf
s.serfRemote, err = s.setupSerf(config.SerfRemoteConfig, s.eventChRemote, serfRemoteSnapshot)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start remote serf: %v", err)
}
// Initialize the Raft server
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
return s, nil
}
// ensurePath is used to make sure a path exists
func (s *Server) ensurePath(path string, dir bool) error {
if !dir {
path = filepath.Dir(path)
}
return os.MkdirAll(path, 0755)
}
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.NodeName = s.config.NodeName
conf.Role = fmt.Sprintf("consul:%s", s.config.Datacenter)
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.EventCh = ch
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
if err := s.ensurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
// Create the base path
path := filepath.Join(s.config.DataDir, raftState)
if err := s.ensurePath(path, true); err != nil {
return err
}
// Create the SQLite store for logs and stable storage
store, err := raft.NewSQLiteStore(path)
if err != nil {
return err
}
// Create the snapshot store
snapshots, err := raft.NewFileSnapshotStore(path, 3)
if err != nil {
store.Close()
return err
}
// Create a transport layer
trans, err := raft.NewTCPTransport(s.config.RaftBindAddr, 3, 10*time.Second)
if err != nil {
store.Close()
return err
}
// Setup the peer store
peers := raft.NewJSONPeers(path, trans)
// Create the FSM
s.fsm = &consulFSM{server: s}
// Setup the Raft store
raft, err := raft.NewRaft(s.config.RaftConfig, s.fsm, store, store, snapshots,
peers, trans)
if err != nil {
store.Close()
trans.Close()
return err
}
s.raft = raft
s.raftStore = store
s.raftTransport = trans
return nil
}
// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
s.logger.Printf("[INFO] Shutting down Consul server")
s.shutdownLock.Lock()
defer s.shutdownLock.Unlock()
if s.shutdown {
return nil
}
s.shutdown = true
close(s.shutdownCh)
if s.serfLocal != nil {
s.serfLocal.Shutdown()
s.serfLocal = nil
}
if s.serfRemote != nil {
s.serfRemote.Shutdown()
s.serfRemote = nil
}
if s.raft != nil {
s.raft.Shutdown()
s.raftStore.Close()
s.raftTransport.Close()
s.raft = nil
s.raftStore = nil
s.raftTransport = nil
}
return nil
}
// localEventHandler is used to handle events from the local Serf cluster
func (s *Server) localEventHandler() {
}
// remoteEventHandler is used to handle events from the remote Serf cluster
func (s *Server) remoteEventHandler() {
}

37
consul/server_test.go

@ -0,0 +1,37 @@
package consul
import (
"io/ioutil"
"os"
"testing"
)
func tmpDir(t *testing.T) string {
dir, err := ioutil.TempDir("", "consul")
if err != nil {
t.Fatalf("err: %v", err)
}
return dir
}
func TestServer_StartStop(t *testing.T) {
dir := tmpDir(t)
defer os.RemoveAll(dir)
config := DefaultConfig()
config.DataDir = dir
server, err := NewServer(config)
if err != nil {
t.Fatalf("err: %v", err)
}
if err := server.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
// Idempotent
if err := server.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
}
Loading…
Cancel
Save