consul: Thread Tombstone GC through

pull/577/head
Armon Dadgar 2014-12-10 23:49:44 -08:00
parent 103112b591
commit 10604a6fa8
4 changed files with 33 additions and 20 deletions

View File

@ -21,6 +21,7 @@ type consulFSM struct {
logger *log.Logger
path string
state *StateStore
gc *TombstoneGC
}
// consulSnapshot is used to provide a snapshot of the current
@ -38,7 +39,7 @@ type snapshotHeader struct {
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
func NewFSM(gc *TombstoneGC, path string, logOutput io.Writer) (*consulFSM, error) {
// Create a temporary path for the state store
tmpPath, err := ioutil.TempDir(path, "state")
if err != nil {
@ -46,7 +47,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
}
// Create a state store
state, err := NewStateStorePath(tmpPath, logOutput)
state, err := NewStateStorePath(gc, tmpPath, logOutput)
if err != nil {
return nil, err
}
@ -56,6 +57,7 @@ func NewFSM(path string, logOutput io.Writer) (*consulFSM, error) {
logger: log.New(logOutput, "", log.LstdFlags),
path: path,
state: state,
gc: gc,
}
return fsm, nil
}
@ -236,7 +238,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}
// Create a new state store
state, err := NewStateStorePath(tmpPath, c.logOutput)
state, err := NewStateStorePath(c.gc, tmpPath, c.logOutput)
if err != nil {
return err
}

View File

@ -42,7 +42,7 @@ func TestFSM_RegisterNode(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -82,7 +82,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -139,7 +139,7 @@ func TestFSM_DeregisterService(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -198,7 +198,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -257,7 +257,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -328,7 +328,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -372,7 +372,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Try to restore on a new FSM
fsm2, err := NewFSM(path, os.Stderr)
fsm2, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -453,7 +453,7 @@ func TestFSM_KVSSet(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -492,7 +492,7 @@ func TestFSM_KVSDelete(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -542,7 +542,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -593,7 +593,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -654,7 +654,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -738,7 +738,7 @@ func TestFSM_KVSLock(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -787,7 +787,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -854,7 +854,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -15,7 +15,7 @@ func TestHealthCheckRace(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
fsm, err := NewFSM(path, os.Stderr)
fsm, err := NewFSM(nil, path, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -134,6 +134,10 @@ type Server struct {
sessionTimers map[string]*time.Timer
sessionTimersLock sync.Mutex
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *TombstoneGC
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@ -189,6 +193,12 @@ func NewServer(config *Config) (*Server, error) {
// Create a logger
logger := log.New(config.LogOutput, "", log.LstdFlags)
// Create the tombstone GC
gc, err := NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
if err != nil {
return nil, err
}
// Create server
s := &Server{
config: config,
@ -201,6 +211,7 @@ func NewServer(config *Config) (*Server, error) {
remoteConsuls: make(map[string][]*serverParts),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
}
@ -320,7 +331,7 @@ func (s *Server) setupRaft() error {
// Create the FSM
var err error
s.fsm, err = NewFSM(statePath, s.config.LogOutput)
s.fsm, err = NewFSM(s.tombstoneGC, statePath, s.config.LogOutput)
if err != nil {
return err
}