agent: initial pass threading through tokens for services/checks

pull/891/head
Ryan Uber 2015-04-27 18:26:23 -07:00
parent 205af6ba75
commit bfb27d18cd
4 changed files with 58 additions and 20 deletions

View File

@ -155,7 +155,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
Port: agent.config.Ports.Server, Port: agent.config.Ports.Server,
Tags: []string{}, Tags: []string{},
} }
agent.state.AddService(&consulService) agent.state.AddService(&consulService, "")
} else { } else {
err = agent.setupClient() err = agent.setupClient()
agent.state.SetIface(agent.client) agent.state.SetIface(agent.client)
@ -591,7 +591,8 @@ func (a *Agent) purgeCheck(checkID string) error {
// AddService is used to add a service entry. // AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, persist bool) error { func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes,
persist bool, token string) error {
if service.Service == "" { if service.Service == "" {
return fmt.Errorf("Service name missing") return fmt.Errorf("Service name missing")
} }
@ -621,7 +622,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
} }
// Add the service // Add the service
a.state.AddService(service) a.state.AddService(service, token)
// Persist the service to a file // Persist the service to a file
if persist { if persist {
@ -645,7 +646,7 @@ func (a *Agent) AddService(service *structs.NodeService, chkTypes CheckTypes, pe
ServiceID: service.ID, ServiceID: service.ID,
ServiceName: service.Service, ServiceName: service.Service,
} }
if err := a.AddCheck(check, chkType, persist); err != nil { if err := a.AddCheck(check, chkType, persist, token); err != nil {
return err return err
} }
} }
@ -696,7 +697,8 @@ func (a *Agent) RemoveService(serviceID string, persist bool) error {
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered. The Check may include a CheckType which // ensure it is registered. The Check may include a CheckType which
// is used to automatically update the check status // is used to automatically update the check status
func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist bool) error { func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType,
persist bool, token string) error {
if check.CheckID == "" { if check.CheckID == "" {
return fmt.Errorf("CheckID missing") return fmt.Errorf("CheckID missing")
} }
@ -775,7 +777,7 @@ func (a *Agent) AddCheck(check *structs.HealthCheck, chkType *CheckType, persist
} }
// Add to the local state for anti-entropy // Add to the local state for anti-entropy
a.state.AddCheck(check) a.state.AddCheck(check, token)
// Persist the check // Persist the check
if persist { if persist {
@ -920,7 +922,7 @@ func (a *Agent) loadServices(conf *Config) error {
for _, service := range conf.Services { for _, service := range conf.Services {
ns := service.NodeService() ns := service.NodeService()
chkTypes := service.CheckTypes() chkTypes := service.CheckTypes()
if err := a.AddService(ns, chkTypes, false); err != nil { if err := a.AddService(ns, chkTypes, false, service.Token); err != nil {
return fmt.Errorf("Failed to register service '%s': %v", service.ID, err) return fmt.Errorf("Failed to register service '%s': %v", service.ID, err)
} }
} }
@ -962,7 +964,7 @@ func (a *Agent) loadServices(conf *Config) error {
} else { } else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q", a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
svc.ID, filePath) svc.ID, filePath)
return a.AddService(svc, nil, false) return a.AddService(svc, nil, false, "")
} }
}) })
@ -991,7 +993,7 @@ func (a *Agent) loadChecks(conf *Config) error {
for _, check := range conf.Checks { for _, check := range conf.Checks {
health := check.HealthCheck(conf.NodeName) health := check.HealthCheck(conf.NodeName)
chkType := &check.CheckType chkType := &check.CheckType
if err := a.AddCheck(health, chkType, false); err != nil { if err := a.AddCheck(health, chkType, false, ""); err != nil {
return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check) return fmt.Errorf("Failed to register check '%s': %v %v", check.Name, err, check)
} }
} }
@ -1035,7 +1037,7 @@ func (a *Agent) loadChecks(conf *Config) error {
// services into the active pool // services into the active pool
p.Check.Status = structs.HealthCritical p.Check.Status = structs.HealthCritical
if err := a.AddCheck(p.Check, p.ChkType, false); err != nil { if err := a.AddCheck(p.Check, p.ChkType, false, ""); err != nil {
// Purge the check if it is unable to be restored. // Purge the check if it is unable to be restored.
a.logger.Printf("[WARN] agent: Failed to restore check %q: %s", a.logger.Printf("[WARN] agent: Failed to restore check %q: %s",
p.Check.CheckID, err) p.Check.CheckID, err)
@ -1112,7 +1114,7 @@ func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error {
ServiceName: service.Service, ServiceName: service.Service,
Status: structs.HealthCritical, Status: structs.HealthCritical,
} }
a.AddCheck(check, nil, true) a.AddCheck(check, nil, true, "")
a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID) a.logger.Printf("[INFO] agent: Service %q entered maintenance mode", serviceID)
return nil return nil
@ -1158,7 +1160,7 @@ func (a *Agent) EnableNodeMaintenance(reason string) {
Notes: reason, Notes: reason,
Status: structs.HealthCritical, Status: structs.HealthCritical,
} }
a.AddCheck(check, nil, true) a.AddCheck(check, nil, true, "")
a.logger.Printf("[INFO] agent: Node entered maintenance mode") a.logger.Printf("[INFO] agent: Node entered maintenance mode")
} }

View File

@ -97,8 +97,12 @@ func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Requ
return nil, nil return nil, nil
} }
// Get the provided token, if any
var token string
s.parseToken(req, &token)
// Add the check // Add the check
if err := s.agent.AddCheck(health, chkType, true); err != nil { if err := s.agent.AddCheck(health, chkType, true, token); err != nil {
return nil, err return nil, err
} }
s.syncChanges() s.syncChanges()
@ -199,8 +203,12 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
} }
} }
// Get the provided token, if any
var token string
s.parseToken(req, &token)
// Add the check // Add the check
if err := s.agent.AddService(ns, chkTypes, true); err != nil { if err := s.agent.AddService(ns, chkTypes, true, token); err != nil {
return nil, err return nil, err
} }
s.syncChanges() s.syncChanges()

View File

@ -48,10 +48,12 @@ type localState struct {
// Services tracks the local services // Services tracks the local services
services map[string]*structs.NodeService services map[string]*structs.NodeService
serviceStatus map[string]syncStatus serviceStatus map[string]syncStatus
serviceTokens map[string]string
// Checks tracks the local checks // Checks tracks the local checks
checks map[string]*structs.HealthCheck checks map[string]*structs.HealthCheck
checkStatus map[string]syncStatus checkStatus map[string]syncStatus
checkTokens map[string]string
// Used to track checks that are being deferred // Used to track checks that are being deferred
deferCheck map[string]*time.Timer deferCheck map[string]*time.Timer
@ -71,8 +73,10 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
l.logger = logger l.logger = logger
l.services = make(map[string]*structs.NodeService) l.services = make(map[string]*structs.NodeService)
l.serviceStatus = make(map[string]syncStatus) l.serviceStatus = make(map[string]syncStatus)
l.serviceTokens = make(map[string]string)
l.checks = make(map[string]*structs.HealthCheck) l.checks = make(map[string]*structs.HealthCheck)
l.checkStatus = make(map[string]syncStatus) l.checkStatus = make(map[string]syncStatus)
l.checkTokens = make(map[string]string)
l.deferCheck = make(map[string]*time.Timer) l.deferCheck = make(map[string]*time.Timer)
l.consulCh = make(chan struct{}, 1) l.consulCh = make(chan struct{}, 1)
l.triggerCh = make(chan struct{}, 1) l.triggerCh = make(chan struct{}, 1)
@ -122,7 +126,7 @@ func (l *localState) isPaused() bool {
// AddService is used to add a service entry to the local state. // AddService is used to add a service entry to the local state.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (l *localState) AddService(service *structs.NodeService) { func (l *localState) AddService(service *structs.NodeService, token string) {
// Assign the ID if none given // Assign the ID if none given
if service.ID == "" && service.Service != "" { if service.ID == "" && service.Service != "" {
service.ID = service.Service service.ID = service.Service
@ -133,6 +137,7 @@ func (l *localState) AddService(service *structs.NodeService) {
l.services[service.ID] = service l.services[service.ID] = service
l.serviceStatus[service.ID] = syncStatus{} l.serviceStatus[service.ID] = syncStatus{}
l.serviceTokens[service.ID] = token
l.changeMade() l.changeMade()
} }
@ -163,7 +168,7 @@ func (l *localState) Services() map[string]*structs.NodeService {
// AddCheck is used to add a health check to the local state. // AddCheck is used to add a health check to the local state.
// This entry is persistent and the agent will make a best effort to // This entry is persistent and the agent will make a best effort to
// ensure it is registered // ensure it is registered
func (l *localState) AddCheck(check *structs.HealthCheck) { func (l *localState) AddCheck(check *structs.HealthCheck, token string) {
// Set the node name // Set the node name
check.Node = l.config.NodeName check.Node = l.config.NodeName
@ -172,6 +177,7 @@ func (l *localState) AddCheck(check *structs.HealthCheck) {
l.checks[check.CheckID] = check l.checks[check.CheckID] = check
l.checkStatus[check.CheckID] = syncStatus{} l.checkStatus[check.CheckID] = syncStatus{}
l.checkTokens[check.CheckID] = token
l.changeMade() l.changeMade()
} }
@ -436,11 +442,16 @@ func (l *localState) deleteService(id string) error {
return fmt.Errorf("ServiceID missing") return fmt.Errorf("ServiceID missing")
} }
token := l.serviceTokens[id]
if token == "" {
token = l.config.ACLToken
}
req := structs.DeregisterRequest{ req := structs.DeregisterRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
Node: l.config.NodeName, Node: l.config.NodeName,
ServiceID: id, ServiceID: id,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, WriteRequest: structs.WriteRequest{Token: token},
} }
var out struct{} var out struct{}
err := l.iface.RPC("Catalog.Deregister", &req, &out) err := l.iface.RPC("Catalog.Deregister", &req, &out)
@ -457,11 +468,16 @@ func (l *localState) deleteCheck(id string) error {
return fmt.Errorf("CheckID missing") return fmt.Errorf("CheckID missing")
} }
token := l.checkTokens[id]
if token == "" {
token = l.config.ACLToken
}
req := structs.DeregisterRequest{ req := structs.DeregisterRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
Node: l.config.NodeName, Node: l.config.NodeName,
CheckID: id, CheckID: id,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, WriteRequest: structs.WriteRequest{Token: token},
} }
var out struct{} var out struct{}
err := l.iface.RPC("Catalog.Deregister", &req, &out) err := l.iface.RPC("Catalog.Deregister", &req, &out)
@ -474,12 +490,17 @@ func (l *localState) deleteCheck(id string) error {
// syncService is used to sync a service to the server // syncService is used to sync a service to the server
func (l *localState) syncService(id string) error { func (l *localState) syncService(id string) error {
token := l.serviceTokens[id]
if token == "" {
token = l.config.ACLToken
}
req := structs.RegisterRequest{ req := structs.RegisterRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
Node: l.config.NodeName, Node: l.config.NodeName,
Address: l.config.AdvertiseAddr, Address: l.config.AdvertiseAddr,
Service: l.services[id], Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, WriteRequest: structs.WriteRequest{Token: token},
} }
// If the service has associated checks that are out of sync, // If the service has associated checks that are out of sync,
@ -530,13 +551,19 @@ func (l *localState) syncCheck(id string) error {
service = serv service = serv
} }
} }
token := l.checkTokens[id]
if token == "" {
token = l.config.ACLToken
}
req := structs.RegisterRequest{ req := structs.RegisterRequest{
Datacenter: l.config.Datacenter, Datacenter: l.config.Datacenter,
Node: l.config.NodeName, Node: l.config.NodeName,
Address: l.config.AdvertiseAddr, Address: l.config.AdvertiseAddr,
Service: service, Service: service,
Check: l.checks[id], Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken}, WriteRequest: structs.WriteRequest{Token: token},
} }
var out struct{} var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out) err := l.iface.RPC("Catalog.Register", &req, &out)

View File

@ -13,6 +13,7 @@ type ServiceDefinition struct {
Port int Port int
Check CheckType Check CheckType
Checks CheckTypes Checks CheckTypes
Token string
} }
func (s *ServiceDefinition) NodeService() *structs.NodeService { func (s *ServiceDefinition) NodeService() *structs.NodeService {