diff --git a/command/agent/agent.go b/command/agent/agent.go index 289637adcc..418c5fef4c 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -47,6 +47,18 @@ type Agent struct { checkTTLs map[string]*CheckTTL checkLock sync.Mutex + // eventCh is used to receive user events + eventCh chan serf.UserEvent + + // eventBuf stores the most recent events in a ring buffer + // using eventIndex as the next index to insert into. This + // is guarded by eventLock. When an insert happens, the + // eventNotify group is notified. + eventBuf []*UserEvent + eventIndex int + eventLock sync.RWMutex + eventNotify consul.NotifyGroup + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -89,6 +101,8 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { logOutput: logOutput, checkMonitors: make(map[string]*CheckMonitor), checkTTLs: make(map[string]*CheckTTL), + eventCh: make(chan serf.UserEvent, 1024), + eventBuf: make([]*UserEvent, 256), shutdownCh: make(chan struct{}), } @@ -108,6 +122,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { return nil, err } + // Start handling events + go agent.handleEvents() + // Write out the PID file if necessary err = agent.storePid() if err != nil { @@ -219,6 +236,14 @@ func (a *Agent) consulConfig() *consul.Config { // Setup the ServerUp callback base.ServerUp = a.state.ConsulServerUp + // Setup the user event callback + base.UserEventHandler = func(e serf.UserEvent) { + select { + case a.eventCh <- e: + case <-a.shutdownCh: + } + } + // Setup the loggers base.LogOutput = a.logOutput return base diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go new file mode 100644 index 0000000000..bd3e90f6ef --- /dev/null +++ b/command/agent/event_endpoint.go @@ -0,0 +1,176 @@ +package agent + +import ( + "bytes" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/hashicorp/consul/consul/structs" +) + +const ( + // maxQueryTime is used to bound the limit of a blocking query + maxQueryTime = 600 * time.Second +) + +// EventFire is used to fire a new event +func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Mandate a PUT request + if req.Method != "PUT" { + resp.WriteHeader(405) + return nil, nil + } + + // Get the datacenter + var dc string + s.parseDC(req, &dc) + + event := &UserEvent{} + event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/") + if event.Name == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing name")) + return nil, nil + } + + // Get the filters + if filt := req.URL.Query().Get("node"); filt != "" { + event.NodeFilter = filt + } + if filt := req.URL.Query().Get("service"); filt != "" { + event.ServiceFilter = filt + } + if filt := req.URL.Query().Get("tag"); filt != "" { + event.TagFilter = filt + } + + // Get the payload + if req.ContentLength > 0 { + var buf bytes.Buffer + if _, err := io.Copy(&buf, req.Body); err != nil { + return nil, err + } + event.Payload = buf.Bytes() + } + + // Try to fire the event + if err := s.agent.UserEvent(dc, event); err != nil { + return nil, err + } + + // Return the event + return event, nil +} + +// EventList is used to retrieve the recent list of events +func (s *HTTPServer) EventList(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Parse the query options, since we simulate a blocking query + var b structs.QueryOptions + if parseWait(resp, req, &b) { + return nil, nil + } + + // Look for a name filter + var nameFilter string + if filt := req.URL.Query().Get("name"); filt != "" { + nameFilter = filt + } + + // Lots of this logic is borrowed from consul/rpc.go:blockingRPC + // However we cannot use that directly since this code has some + // slight semantics differences... + var timeout <-chan time.Time + var notifyCh chan struct{} + + // Fast path non-blocking + if b.MinQueryIndex == 0 { + goto RUN_QUERY + } + + // Restrict the max query time + if b.MaxQueryTime > maxQueryTime { + b.MaxQueryTime = maxQueryTime + } + + // Ensure a time limit is set if we have an index + if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 { + b.MaxQueryTime = maxQueryTime + } + + // Setup a query timeout + if b.MaxQueryTime > 0 { + timeout = time.After(b.MaxQueryTime) + } + + // Setup a notification channel for changes +SETUP_NOTIFY: + if b.MinQueryIndex > 0 { + notifyCh = make(chan struct{}, 1) + s.agent.eventNotify.Wait(notifyCh) + } + +RUN_QUERY: + // Get the recent events + events := s.agent.UserEvents() + + // Filter the events if necessary + if nameFilter != "" { + n := len(events) + for i := 0; i < n; i++ { + if events[i].Name == nameFilter { + continue + } + events[i], events[n-1] = events[n-1], nil + i-- + n-- + } + events = events[:n] + } + + // Determine the index + var index uint64 + if len(events) == 0 { + // Return a non-zero index to prevent a hot query loop. This + // can be caused by a watch for example when there is no matching + // events. + index = 1 + } else { + last := events[len(events)-1] + index = uuidToUint64(last.ID) + } + setIndex(resp, index) + + // Check for exactly match on the query value. Because + // the index value is not monotonic, we just ensure it is + // not an exact match. + if index > 0 && index == b.MinQueryIndex { + select { + case <-notifyCh: + goto SETUP_NOTIFY + case <-timeout: + } + } + return events, nil +} + +// uuidToUint64 is a bit of a hack to generate a 64bit Consul index. +// In effect, we take our random UUID, convert it to a 128 bit number, +// then XOR the high-order and low-order 64bit's together to get the +// output. This lets us generate an index which can be used to simulate +// the blocking behavior of other catalog endpoints. +func uuidToUint64(uuid string) uint64 { + lower := uuid[0:8] + uuid[9:13] + uuid[14:18] + upper := uuid[19:23] + uuid[24:36] + lowVal, err := strconv.ParseUint(lower, 16, 64) + if err != nil { + panic("Failed to convert " + lower) + } + highVal, err := strconv.ParseUint(upper, 16, 64) + if err != nil { + panic("Failed to convert " + upper) + } + return lowVal ^ highVal +} diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go new file mode 100644 index 0000000000..6740a3c574 --- /dev/null +++ b/command/agent/event_endpoint_test.go @@ -0,0 +1,200 @@ +package agent + +import ( + "bytes" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/hashicorp/consul/testutil" +) + +func TestEventFire(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + body := bytes.NewBuffer([]byte("test")) + url := "/v1/event/fire/test?node=Node&service=foo&tag=bar" + req, err := http.NewRequest("PUT", url, body) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := httptest.NewRecorder() + obj, err := srv.EventFire(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + event, ok := obj.(*UserEvent) + if !ok { + t.Fatalf("bad: %#v", obj) + } + + if event.ID == "" { + t.Fatalf("bad: %#v", event) + } + if event.Name != "test" { + t.Fatalf("bad: %#v", event) + } + if string(event.Payload) != "test" { + t.Fatalf("bad: %#v", event) + } + if event.NodeFilter != "Node" { + t.Fatalf("bad: %#v", event) + } + if event.ServiceFilter != "foo" { + t.Fatalf("bad: %#v", event) + } + if event.TagFilter != "bar" { + t.Fatalf("bad: %#v", event) + } + }) +} + +func TestEventList(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + p := &UserEvent{Name: "test"} + if err := srv.agent.UserEvent("", p); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + req, err := http.NewRequest("GET", "/v1/event/list", nil) + if err != nil { + return false, err + } + resp := httptest.NewRecorder() + obj, err := srv.EventList(resp, req) + if err != nil { + return false, err + } + + list, ok := obj.([]*UserEvent) + if !ok { + return false, fmt.Errorf("bad: %#v", obj) + } + if len(list) != 1 || list[0].Name != "test" { + return false, fmt.Errorf("bad: %#v", list) + } + header := resp.Header().Get("X-Consul-Index") + if header == "" || header == "0" { + return false, fmt.Errorf("bad: %#v", header) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + }) +} + +func TestEventList_Filter(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + p := &UserEvent{Name: "test"} + if err := srv.agent.UserEvent("", p); err != nil { + t.Fatalf("err: %v", err) + } + + p = &UserEvent{Name: "foo"} + if err := srv.agent.UserEvent("", p); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + req, err := http.NewRequest("GET", "/v1/event/list?name=foo", nil) + if err != nil { + return false, err + } + resp := httptest.NewRecorder() + obj, err := srv.EventList(resp, req) + if err != nil { + return false, err + } + + list, ok := obj.([]*UserEvent) + if !ok { + return false, fmt.Errorf("bad: %#v", obj) + } + if len(list) != 1 || list[0].Name != "foo" { + return false, fmt.Errorf("bad: %#v", list) + } + header := resp.Header().Get("X-Consul-Index") + if header == "" || header == "0" { + return false, fmt.Errorf("bad: %#v", header) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + }) +} + +func TestEventList_Blocking(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + p := &UserEvent{Name: "test"} + if err := srv.agent.UserEvent("", p); err != nil { + t.Fatalf("err: %v", err) + } + + var index string + testutil.WaitForResult(func() (bool, error) { + req, err := http.NewRequest("GET", "/v1/event/list", nil) + if err != nil { + return false, err + } + resp := httptest.NewRecorder() + _, err = srv.EventList(resp, req) + if err != nil { + return false, err + } + header := resp.Header().Get("X-Consul-Index") + if header == "" || header == "0" { + return false, fmt.Errorf("bad: %#v", header) + } + index = header + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + go func() { + time.Sleep(50 * time.Millisecond) + p := &UserEvent{Name: "second"} + if err := srv.agent.UserEvent("", p); err != nil { + t.Fatalf("err: %v", err) + } + }() + + testutil.WaitForResult(func() (bool, error) { + url := "/v1/event/list?index=" + index + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return false, err + } + resp := httptest.NewRecorder() + obj, err := srv.EventList(resp, req) + if err != nil { + return false, err + } + + list, ok := obj.([]*UserEvent) + if !ok { + return false, fmt.Errorf("bad: %#v", obj) + } + if len(list) != 2 || list[1].Name != "second" { + return false, fmt.Errorf("bad: %#v", list) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + }) +} + +func TestUUIDToUint64(t *testing.T) { + inp := "cb9a81ad-fff6-52ac-92a7-5f70687805ec" + + // Output value was computed using python + if uuidToUint64(inp) != 6430540886266763072 { + t.Fatalf("bad") + } +} diff --git a/command/agent/http.go b/command/agent/http.go index bfc67577c5..ad1a64bfd2 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -93,6 +93,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) s.mux.HandleFunc("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) + s.mux.HandleFunc("/v1/event/fire/", s.wrap(s.EventFire)) + s.mux.HandleFunc("/v1/event/list", s.wrap(s.EventList)) + s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint)) s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate)) diff --git a/command/agent/user_event.go b/command/agent/user_event.go new file mode 100644 index 0000000000..be1037d51d --- /dev/null +++ b/command/agent/user_event.go @@ -0,0 +1,267 @@ +package agent + +import ( + "bytes" + "fmt" + "regexp" + + "github.com/hashicorp/consul/consul/structs" + "github.com/ugorji/go/codec" +) + +const ( + // userEventMaxVersion is the maximum protocol version we understand + userEventMaxVersion = 1 +) + +// UserEventParam is used to parameterize a user event +type UserEvent struct { + // ID of the user event. Automatically generated. + ID string + + // Name of the event + Name string `codec:"n"` + + // Optional payload + Payload []byte `codec:"p,omitempty"` + + // NodeFilter is a regular expression to filter on nodes + NodeFilter string `codec:"nf,omitempty"` + + // ServiceFilter is a regular expression to filter on services + ServiceFilter string `codec:"sf,omitempty"` + + // TagFilter is a regular expression to filter on tags of a service, + // must be provided with ServiceFilter + TagFilter string `codec:"tf,omitempty"` + + // Version of the user event. Automatically generated. + Version int `codec:"v"` + + // LTime is the lamport time. Automatically generated. + LTime uint64 `codec:"-"` +} + +// validateUserEventParams is used to sanity check the inputs +func validateUserEventParams(params *UserEvent) error { + // Validate the inputs + if params.Name == "" { + return fmt.Errorf("User event missing name") + } + if params.TagFilter != "" && params.ServiceFilter == "" { + return fmt.Errorf("Cannot provide tag filter without service filter") + } + if params.NodeFilter != "" { + if _, err := regexp.Compile(params.NodeFilter); err != nil { + return fmt.Errorf("Invalid node filter: %v", err) + } + } + if params.ServiceFilter != "" { + if _, err := regexp.Compile(params.ServiceFilter); err != nil { + return fmt.Errorf("Invalid service filter: %v", err) + } + } + if params.TagFilter != "" { + if _, err := regexp.Compile(params.TagFilter); err != nil { + return fmt.Errorf("Invalid tag filter: %v", err) + } + } + return nil +} + +// UserEvent is used to fire an event via the Serf layer on the LAN +func (a *Agent) UserEvent(dc string, params *UserEvent) error { + // Validate the params + if err := validateUserEventParams(params); err != nil { + return err + } + + // Format message + params.ID = generateUUID() + params.Version = userEventMaxVersion + payload, err := encodeUserEvent(¶ms) + if err != nil { + return fmt.Errorf("UserEvent encoding failed: %v", err) + } + + // Check if this is the local DC, fire locally + if dc == "" || dc == a.config.Datacenter { + if a.server != nil { + return a.server.UserEvent(params.Name, payload) + } else { + return a.client.UserEvent(params.Name, payload) + } + } else { + // Send an RPC to remote datacenter to service this + args := structs.EventFireRequest{ + Datacenter: dc, + Name: params.Name, + Payload: payload, + } + + // Any server can process in the remote DC, since the + // gossip will take over anyways + args.AllowStale = true + var out structs.EventFireResponse + return a.RPC("Internal.EventFire", &args, &out) + } +} + +// handleEvents is used to process incoming user events +func (a *Agent) handleEvents() { + for { + select { + case e := <-a.eventCh: + // Decode the event + msg := new(UserEvent) + if err := decodeUserEvent(e.Payload, msg); err != nil { + a.logger.Printf("[ERR] agent: Failed to decode event: %v", err) + continue + } + msg.LTime = uint64(e.LTime) + + // Skip if we don't pass filtering + if !a.shouldProcessUserEvent(msg) { + continue + } + + // Ingest the event + a.ingestUserEvent(msg) + + case <-a.shutdownCh: + return + } + } +} + +// shouldProcessUserEvent checks if an event makes it through our filters +func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { + // Check the version + if msg.Version > userEventMaxVersion { + a.logger.Printf("[WARN] agent: Event version %d may have unsupported features (%s)", + msg.Version, msg.Name) + } + + // Apply the filters + if msg.NodeFilter != "" { + re, err := regexp.Compile(msg.NodeFilter) + if err != nil { + a.logger.Printf("[ERR] agent: Failed to parse node filter '%s' for event '%s': %v", + msg.NodeFilter, msg.Name, err) + return false + } + if !re.MatchString(a.config.NodeName) { + return false + } + } + + if msg.ServiceFilter != "" { + re, err := regexp.Compile(msg.ServiceFilter) + if err != nil { + a.logger.Printf("[ERR] agent: Failed to parse service filter '%s' for event '%s': %v", + msg.ServiceFilter, msg.Name, err) + return false + } + + var tagRe *regexp.Regexp + if msg.TagFilter != "" { + re, err := regexp.Compile(msg.TagFilter) + if err != nil { + a.logger.Printf("[ERR] agent: Failed to parse tag filter '%s' for event '%s': %v", + msg.TagFilter, msg.Name, err) + return false + } + tagRe = re + } + + // Scan for a match + services := a.state.Services() + found := false + OUTER: + for name, info := range services { + // Check the service name + if !re.MatchString(name) { + continue + } + if tagRe == nil { + found = true + break + } + + // Look for a matching tag + for _, tag := range info.Tags { + if !tagRe.MatchString(tag) { + continue + } + found = true + break OUTER + } + } + + // No matching services + if !found { + return false + } + } + return true +} + +// ingestUserEvent is used to process an event that passes filtering +func (a *Agent) ingestUserEvent(msg *UserEvent) { + a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID) + a.eventLock.Lock() + defer func() { + a.eventLock.Unlock() + a.eventNotify.Notify() + }() + + idx := a.eventIndex + a.eventBuf[idx] = msg + a.eventIndex = (idx + 1) % len(a.eventBuf) +} + +// UserEvents is used to return a slice of the most recent +// user events. +func (a *Agent) UserEvents() []*UserEvent { + n := len(a.eventBuf) + out := make([]*UserEvent, n) + a.eventLock.RLock() + defer a.eventLock.RUnlock() + + // Check if the buffer is full + if a.eventBuf[a.eventIndex] != nil { + if a.eventIndex == 0 { + copy(out, a.eventBuf) + } else { + copy(out, a.eventBuf[a.eventIndex:]) + copy(out[n-a.eventIndex:], a.eventBuf[:a.eventIndex]) + } + } else { + // We haven't filled the buffer yet + copy(out, a.eventBuf[:a.eventIndex]) + out = out[:a.eventIndex] + } + return out +} + +// LastUserEvent is used to return the lastest user event. +// This will return nil if there is no recent event. +func (a *Agent) LastUserEvent() *UserEvent { + a.eventLock.RLock() + defer a.eventLock.RUnlock() + n := len(a.eventBuf) + idx := (((a.eventIndex - 1) % n) + n) % n + return a.eventBuf[idx] +} + +// Decode is used to decode a MsgPack encoded object +func decodeUserEvent(buf []byte, out interface{}) error { + return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) +} + +// encodeUserEvent is used to encode user event +func encodeUserEvent(msg interface{}) ([]byte, error) { + var buf bytes.Buffer + err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) + return buf.Bytes(), err +} diff --git a/command/agent/user_event_test.go b/command/agent/user_event_test.go new file mode 100644 index 0000000000..3c2fcbe34e --- /dev/null +++ b/command/agent/user_event_test.go @@ -0,0 +1,188 @@ +package agent + +import ( + "os" + "strings" + "testing" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" +) + +func TestValidateUserEventParams(t *testing.T) { + p := &UserEvent{} + err := validateUserEventParams(p) + if err == nil || err.Error() != "User event missing name" { + t.Fatalf("err: %v", err) + } + p.Name = "foo" + + p.NodeFilter = "(" + err = validateUserEventParams(p) + if err == nil || !strings.Contains(err.Error(), "Invalid node filter") { + t.Fatalf("err: %v", err) + } + + p.NodeFilter = "" + p.ServiceFilter = "(" + err = validateUserEventParams(p) + if err == nil || !strings.Contains(err.Error(), "Invalid service filter") { + t.Fatalf("err: %v", err) + } + + p.ServiceFilter = "foo" + p.TagFilter = "(" + err = validateUserEventParams(p) + if err == nil || !strings.Contains(err.Error(), "Invalid tag filter") { + t.Fatalf("err: %v", err) + } + + p.ServiceFilter = "" + p.TagFilter = "foo" + err = validateUserEventParams(p) + if err == nil || !strings.Contains(err.Error(), "tag filter without service") { + t.Fatalf("err: %v", err) + } +} + +func TestShouldProcessUserEvent(t *testing.T) { + conf := nextConfig() + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + srv1 := &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Tags: []string{"test", "foo", "bar", "master"}, + Port: 5000, + } + agent.state.AddService(srv1) + + p := &UserEvent{} + if !agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } + + // Bad node name + p = &UserEvent{ + NodeFilter: "foobar", + } + if agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } + + // Good node name + p = &UserEvent{ + NodeFilter: "^Node", + } + if !agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } + + // Bad service name + p = &UserEvent{ + ServiceFilter: "foobar", + } + if agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } + + // Good service name + p = &UserEvent{ + ServiceFilter: ".*sql", + } + if !agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } + + // Bad tag name + p = &UserEvent{ + ServiceFilter: ".*sql", + TagFilter: "slave", + } + if agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } + + // Good service name + p = &UserEvent{ + ServiceFilter: ".*sql", + TagFilter: "master", + } + if !agent.shouldProcessUserEvent(p) { + t.Fatalf("bad") + } +} + +func TestIngestUserEvent(t *testing.T) { + conf := nextConfig() + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + for i := 0; i < 512; i++ { + msg := &UserEvent{LTime: uint64(i)} + agent.ingestUserEvent(msg) + if agent.LastUserEvent() != msg { + t.Fatalf("bad: %#v", msg) + } + events := agent.UserEvents() + + expectLen := 256 + if i < 256 { + expectLen = i + 1 + } + if len(events) != expectLen { + t.Fatalf("bad: %d %d %d", i, expectLen, len(events)) + } + + counter := i + for j := len(events) - 1; j >= 0; j-- { + if events[j].LTime != uint64(counter) { + t.Fatalf("bad: %#v", events) + } + counter-- + } + } +} + +func TestFireReceiveEvent(t *testing.T) { + conf := nextConfig() + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + srv1 := &structs.NodeService{ + ID: "mysql", + Service: "mysql", + Tags: []string{"test", "foo", "bar", "master"}, + Port: 5000, + } + agent.state.AddService(srv1) + + p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} + err := agent.UserEvent("", p1) + if err != nil { + t.Fatalf("err: %v", err) + } + + p2 := &UserEvent{Name: "deploy"} + err = agent.UserEvent("", p2) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult( + func() (bool, error) { + return len(agent.UserEvents()) == 1, nil + }, + func(err error) { + t.Fatalf("bad len") + }) + + last := agent.LastUserEvent() + if last.ID != p2.ID { + t.Fatalf("bad: %#v", last) + } +} diff --git a/command/agent/util.go b/command/agent/util.go index e2fdf44abd..b753505b50 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -1,6 +1,8 @@ package agent import ( + crand "crypto/rand" + "fmt" "math" "math/rand" "os" @@ -59,3 +61,18 @@ func ExecScript(script string) (*exec.Cmd, error) { cmd := exec.Command(shell, flag, script) return cmd, nil } + +// generateUUID is used to generate a random UUID +func generateUUID() string { + buf := make([]byte, 16) + if _, err := crand.Read(buf); err != nil { + panic(fmt.Errorf("failed to read random bytes: %v", err)) + } + + return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x", + buf[0:4], + buf[4:6], + buf[6:8], + buf[8:10], + buf[10:16]) +} diff --git a/command/event.go b/command/event.go new file mode 100644 index 0000000000..d0030d1e73 --- /dev/null +++ b/command/event.go @@ -0,0 +1,139 @@ +package command + +import ( + "flag" + "fmt" + "regexp" + "strings" + + "github.com/armon/consul-api" + "github.com/mitchellh/cli" +) + +// EventCommand is a Command implementation that is used to +// fire new events +type EventCommand struct { + Ui cli.Ui +} + +func (c *EventCommand) Help() string { + helpText := ` +Usage: consul event [options] [payload] + + Dispatches a custom user event across a datacenter. An event must provide + a name, but a payload is optional. Events support filtering using + regular expressions on node name, service, and tag definitions. + +Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -datacenter="" Datacenter to dispatch in. Defaults to that of agent. + -name="" Name of the event. + -node="" Regular expression to filter on node names + -service="" Regular expression to filter on service instances + -tag="" Regular expression to filter on service tags. Must be used + with -service. +` + return strings.TrimSpace(helpText) +} + +func (c *EventCommand) Run(args []string) int { + var datacenter, name, node, service, tag string + cmdFlags := flag.NewFlagSet("event", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.StringVar(&datacenter, "datacenter", "", "") + cmdFlags.StringVar(&name, "name", "", "") + cmdFlags.StringVar(&node, "node", "", "") + cmdFlags.StringVar(&service, "service", "", "") + cmdFlags.StringVar(&tag, "tag", "", "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + // Check for a name + if name == "" { + c.Ui.Error("Event name must be specified") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + // Validate the filters + if node != "" { + if _, err := regexp.Compile(node); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to compile node filter regexp: %v", err)) + return 1 + } + } + if service != "" { + if _, err := regexp.Compile(service); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to compile service filter regexp: %v", err)) + return 1 + } + } + if tag != "" { + if _, err := regexp.Compile(tag); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to compile tag filter regexp: %v", err)) + return 1 + } + } + if tag != "" && service == "" { + c.Ui.Error("Cannot provide tag filter without service filter.") + return 1 + } + + // Check for a payload + var payload []byte + args = cmdFlags.Args() + switch len(args) { + case 0: + case 1: + payload = []byte(args[0]) + default: + c.Ui.Error("Too many command line arguments.") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + // Create and test the HTTP client + client, err := HTTPClient(*httpAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + _, err = client.Agent().NodeName() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + + // Prepare the request + event := client.Event() + params := &consulapi.UserEvent{ + Name: name, + Payload: payload, + NodeFilter: node, + ServiceFilter: service, + TagFilter: tag, + } + opts := &consulapi.WriteOptions{ + Datacenter: datacenter, + } + + // Fire the event + id, _, err := event.Fire(params, opts) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error firing event: %s", err)) + return 1 + } + + // Write out the ID + c.Ui.Output(fmt.Sprintf("Event ID: %s", id)) + return 0 +} + +func (c *EventCommand) Synopsis() string { + return "Fire a new event" +} diff --git a/command/event_test.go b/command/event_test.go new file mode 100644 index 0000000000..ee4b929a03 --- /dev/null +++ b/command/event_test.go @@ -0,0 +1,29 @@ +package command + +import ( + "github.com/mitchellh/cli" + "strings" + "testing" +) + +func TestEventCommand_implements(t *testing.T) { + var _ cli.Command = &WatchCommand{} +} + +func TestEventCommandRun(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + + ui := new(cli.MockUi) + c := &EventCommand{Ui: ui} + args := []string{"-http-addr=" + a1.httpAddr, "-name=cmd"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), "Event ID: ") { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} diff --git a/command/watch.go b/command/watch.go index 30bb57fab6..0ad514b920 100644 --- a/command/watch.go +++ b/command/watch.go @@ -41,6 +41,7 @@ Options: Watch Specification: -key=val Specifies the key to watch. Only for 'key' type. + -name=val Specifies an event name to watch. Only for 'event' type. -passingonly=[true|false] Specifies if only hosts passing all checks are displayed. Optional for 'service' type. Defaults false. -prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type. @@ -50,13 +51,13 @@ Watch Specification: -tag=val Specifies the service tag to filter on. Optional for 'service' type. -type=val Specifies the watch type. One of key, keyprefix - services, nodes, service, or checks. + services, nodes, service, checks, or event. ` return strings.TrimSpace(helpText) } func (c *WatchCommand) Run(args []string) int { - var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state string + var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state, name string cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.StringVar(&watchType, "type", "", "") @@ -68,6 +69,7 @@ func (c *WatchCommand) Run(args []string) int { cmdFlags.StringVar(&tag, "tag", "", "") cmdFlags.StringVar(&passingOnly, "passingonly", "", "") cmdFlags.StringVar(&state, "state", "", "") + cmdFlags.StringVar(&name, "name", "", "") httpAddr := HTTPAddrFlag(cmdFlags) if err := cmdFlags.Parse(args); err != nil { return 1 @@ -110,6 +112,9 @@ func (c *WatchCommand) Run(args []string) int { if state != "" { params["state"] = state } + if name != "" { + params["name"] = name + } if passingOnly != "" { b, err := strconv.ParseBool(passingOnly) if err != nil { diff --git a/commands.go b/commands.go index 1b98e013af..f68cf20801 100644 --- a/commands.go +++ b/commands.go @@ -25,6 +25,12 @@ func init() { }, nil }, + "event": func() (cli.Command, error) { + return &command.EventCommand{ + Ui: ui, + }, nil + }, + "force-leave": func() (cli.Command, error) { return &command.ForceLeaveCommand{ Ui: ui, diff --git a/consul/client.go b/consul/client.go index 70626d2de2..050d147c2b 100644 --- a/consul/client.go +++ b/consul/client.go @@ -201,6 +201,11 @@ func (c *Client) RemoveFailedNode(node string) error { return c.serf.RemoveFailedNode(node) } +// UserEvent is used to fire an event via the Serf layer +func (c *Client) UserEvent(name string, payload []byte) error { + return c.serf.UserEvent(userEventName(name), payload, false) +} + // lanEventHandler is used to handle events from the lan Serf cluster func (c *Client) lanEventHandler() { for { @@ -295,14 +300,22 @@ func (c *Client) localEvent(event serf.UserEvent) { return } - switch event.Name { - case newLeaderEvent: + switch name := event.Name; { + case name == newLeaderEvent: c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload) // Trigger the callback if c.config.ServerUp != nil { c.config.ServerUp() } + case isUserEvent(name): + event.Name = rawUserEventName(name) + c.logger.Printf("[DEBUG] consul: user event: %s", event.Name) + + // Trigger the callback + if c.config.UserEventHandler != nil { + c.config.UserEventHandler(event) + } default: c.logger.Printf("[WARN] consul: Unhandled local event: %v", event) } diff --git a/consul/client_test.go b/consul/client_test.go index 59785b57b8..a783c3a52c 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -2,12 +2,14 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/testutil" "net" "os" "testing" "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/serf/serf" ) func testClientConfig(t *testing.T, NodeName string) (string, *Config) { @@ -44,6 +46,17 @@ func testClientDC(t *testing.T, dc string) (string, *Client) { return dir, client } +func testClientWithConfig(t *testing.T, cb func(c *Config)) (string, *Client) { + name := fmt.Sprintf("Client %d", getPort()) + dir, config := testClientConfig(t, name) + cb(config) + client, err := NewClient(config) + if err != nil { + t.Fatalf("err: %v", err) + } + return dir, client +} + func TestClient_StartStop(t *testing.T) { dir, client := testClient(t) defer os.RemoveAll(dir) @@ -178,3 +191,81 @@ func TestClient_RPC_TLS(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestClientServer_UserEvent(t *testing.T) { + clientOut := make(chan serf.UserEvent, 2) + dir1, c1 := testClientWithConfig(t, func(conf *Config) { + conf.UserEventHandler = func(e serf.UserEvent) { + clientOut <- e + } + }) + defer os.RemoveAll(dir1) + defer c1.Shutdown() + + serverOut := make(chan serf.UserEvent, 2) + dir2, s1 := testServerWithConfig(t, func(conf *Config) { + conf.UserEventHandler = func(e serf.UserEvent) { + serverOut <- e + } + }) + defer os.RemoveAll(dir2) + defer s1.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Check the members + testutil.WaitForResult(func() (bool, error) { + return len(c1.LANMembers()) == 2 && len(s1.LANMembers()) == 2, nil + }, func(err error) { + t.Fatalf("bad len") + }) + + // Fire the user event + err := c1.UserEvent("foo", []byte("bar")) + if err != nil { + t.Fatalf("err: %v", err) + } + + err = s1.UserEvent("bar", []byte("baz")) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for all the events + var serverFoo, serverBar, clientFoo, clientBar bool + for i := 0; i < 4; i++ { + select { + case e := <-clientOut: + switch e.Name { + case "foo": + clientFoo = true + case "bar": + clientBar = true + default: + t.Fatalf("Bad: %#v", e) + } + + case e := <-serverOut: + switch e.Name { + case "foo": + serverFoo = true + case "bar": + serverBar = true + default: + t.Fatalf("Bad: %#v", e) + } + + case <-time.After(10 * time.Second): + t.Fatalf("timeout") + } + } + + if !(serverFoo && serverBar && clientFoo && clientBar) { + t.Fatalf("missing events") + } +} diff --git a/consul/config.go b/consul/config.go index f49c8933a6..5404e71e6c 100644 --- a/consul/config.go +++ b/consul/config.go @@ -163,6 +163,10 @@ type Config struct { // ServerUp callback can be used to trigger a notification that // a Consul server is now up and known about. ServerUp func() + + // UserEventHandler callback can be used to handle incoming + // user events. This function should not block. + UserEventHandler func(serf.UserEvent) } // CheckVersion is used to check if the ProtocolVersion is valid diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index 53252e78ee..5a38b31a22 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -46,3 +46,19 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, return nil }) } + +// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC +// call to fire an event. The primary use case is to enable user events being +// triggered in a remote DC. +func (m *Internal) EventFire(args *structs.EventFireRequest, + reply *structs.EventFireResponse) error { + if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done { + return err + } + + // Set the query meta data + m.srv.setQueryMeta(&reply.QueryMeta) + + // Fire the event + return m.srv.UserEvent(args.Name, args.Payload) +} diff --git a/consul/serf.go b/consul/serf.go index 37aae27257..0a6dc18ca3 100644 --- a/consul/serf.go +++ b/consul/serf.go @@ -11,8 +11,26 @@ const ( // StatusReap is used to update the status of a node if we // are handling a EventMemberReap StatusReap = serf.MemberStatus(-1) + + // userEventPrefix is pre-pended to a user event to distinguish it + userEventPrefix = "consul:event:" ) +// userEventName computes the name of a user event +func userEventName(name string) string { + return userEventPrefix + name +} + +// isUserEvent checks if a serf event is a user event +func isUserEvent(name string) bool { + return strings.HasPrefix(name, userEventPrefix) +} + +// rawUserEventName is used to get the raw user event name +func rawUserEventName(name string) string { + return strings.TrimPrefix(name, userEventPrefix) +} + // lanEventHandler is used to handle events from the lan Serf cluster func (s *Server) lanEventHandler() { for { @@ -102,14 +120,22 @@ func (s *Server) localEvent(event serf.UserEvent) { return } - switch event.Name { - case newLeaderEvent: + switch name := event.Name; { + case name == newLeaderEvent: s.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload) // Trigger the callback if s.config.ServerUp != nil { s.config.ServerUp() } + case isUserEvent(name): + event.Name = rawUserEventName(name) + s.logger.Printf("[DEBUG] consul: user event: %s", event.Name) + + // Trigger the callback + if s.config.UserEventHandler != nil { + s.config.UserEventHandler(event) + } default: s.logger.Printf("[WARN] consul: Unhandled local event: %v", event) } diff --git a/consul/serf_test.go b/consul/serf_test.go new file mode 100644 index 0000000000..07225a7293 --- /dev/null +++ b/consul/serf_test.go @@ -0,0 +1,21 @@ +package consul + +import ( + "testing" +) + +func TestUserEventNames(t *testing.T) { + out := userEventName("foo") + if out != "consul:event:foo" { + t.Fatalf("bad: %v", out) + } + if !isUserEvent(out) { + t.Fatalf("bad") + } + if isUserEvent("foo") { + t.Fatalf("bad") + } + if raw := rawUserEventName(out); raw != "foo" { + t.Fatalf("bad: %v", raw) + } +} diff --git a/consul/server.go b/consul/server.go index d3fe6da924..a16b0e0be4 100644 --- a/consul/server.go +++ b/consul/server.go @@ -530,6 +530,11 @@ func (s *Server) RemoveFailedNode(node string) error { return nil } +// UserEvent is used to fire an event via the Serf layer on the LAN +func (s *Server) UserEvent(name string, payload []byte) error { + return s.serfLAN.UserEvent(userEventName(name), payload, false) +} + // IsLeader checks if this server is the cluster leader func (s *Server) IsLeader() bool { return s.raft.State() == raft.Leader diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 95f273f4fc..910a1d3e05 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -493,6 +493,29 @@ type ACLPolicy struct { QueryMeta } +// EventFireRequest is used to ask a server to fire +// a Serf event. It is a bit odd, since it doesn't depend on +// the catalog or leader. Any node can respond, so it's not quite +// like a standard write request. This is used only internally. +type EventFireRequest struct { + Datacenter string + Name string + Payload []byte + + // Not using WriteRequest so that any server can process + // the request. It is a bit unusual... + QueryOptions +} + +func (r *EventFireRequest) RequestDatacenter() string { + return r.Datacenter +} + +// EventFireResponse is used to respond to a fire request. +type EventFireResponse struct { + QueryMeta +} + // msgpackHandle is a shared handle for encoding/decoding of structs var msgpackHandle = &codec.MsgpackHandle{} diff --git a/watch/funcs.go b/watch/funcs.go index a6d1015964..bad10bec05 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -21,6 +21,7 @@ func init() { "nodes": nodesWatch, "service": serviceWatch, "checks": checksWatch, + "event": eventWatch, } } @@ -164,3 +165,30 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) { } return fn, nil } + +// eventWatch is used to watch for events, optionally filtering on name +func eventWatch(params map[string]interface{}) (WatchFunc, error) { + var name string + if err := assignValue(params, "name", &name); err != nil { + return nil, err + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + event := p.client.Event() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + events, meta, err := event.List(name, &opts) + if err != nil { + return 0, nil, err + } + + // Prune to only the new events + for i := 0; i < len(events); i++ { + if event.IDToIndex(events[i].ID) == p.lastIndex { + events = events[i+1:] + break + } + } + return meta.LastIndex, events, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index d0dbb0c8f2..fa0ff87741 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -392,3 +392,43 @@ func TestChecksWatch_Service(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestEventWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"event", "name": "foo"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.UserEvent) + if !ok || len(v) == 0 || string(v[len(v)-1].Name) != "foo" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + event := plan.client.Event() + params := &consulapi.UserEvent{Name: "foo"} + if _, _, err := event.Fire(params, nil); err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} diff --git a/website/source/docs/agent/http.html.markdown b/website/source/docs/agent/http.html.markdown index 2b59a2da98..17d21ec903 100644 --- a/website/source/docs/agent/http.html.markdown +++ b/website/source/docs/agent/http.html.markdown @@ -18,6 +18,7 @@ All endpoints fall into one of several categories: * health - Manages health checks * session - Session manipulation * acl - ACL creations and management +* event - User Events * status - Consul system status * internal - Internal APIs. Purposely undocumented, subject to change. @@ -1188,6 +1189,97 @@ It returns a JSON body like this: ... ] +## Event + +The Event endpoints are used to fire new events and to query the available +events. + +The following endpoints are supported: + +* /v1/event/fire/\: Fires a new user event +* /v1/event/list: Lists the most recent events an agent has seen. + +### /v1/event/fire/\ + +The fire endpoint is used to trigger a new user event. A user event +needs a name, and optionally takes a number of parameters. + +By default, the agent's local datacenter is used, but another datacenter +can be specified using the "?dc=" query parameter. + +The fire endpoint expects a PUT request, with an optional body. +The body contents are opaque to Consul, and become the "payload" +of the event. + +The `?node=`, `?service=`, and `?tag=` query parameters may optionally +be provided. They respectively provide a regular expression to filter +by node name, service, and service tags. + +The return code is 200 on success, along with a body like: + + { + "ID":"b54fe110-7af5-cafc-d1fb-afc8ba432b1c", + "Name":"deploy", + "Payload":null, + "NodeFilter":"", + "ServiceFilter":"", + "TagFilter":"", + "Version":1, + "LTime":0 + } + +This is used to provide the ID of the newly fired event. + +### /v1/event/list + +Thie endpoint is hit with a GET and returns the most recent +events known by the agent. As a consequence of how the +[event command](/docs/commands/event.html) works, each agent +may have a different view of the events. Events are broadcast using +the [gossip protocol](/docs/internals/gossip.html), which means +they have no total ordering, nor do they make a promise of delivery. + +Additionally, each node applies the node, service and tag filters +locally before storing the event. This means the events at each agent +may be different depending on their configuration. + +This endpoint does allow for filtering on events by name by providing +the `?name=` query parameter. + +Lastly, to support [watches](/docs/agent/watches.html), this endpoint +supports blocking queries. However, the semantics of this endpoint +is slightly different. Most blocking queries provide a monotonic index, +and block until a newer index is available. This can be supported as +a consequence of the total ordering of the [consensus protocol](/docs/internals/consensus.html). +With gossip, there is no ordering, and instead `X-Consul-Index` maps +to the newest event that matches the query. + +In practice, this means the index is only useful when used against a +single agent, and has no meaning globally. Because Consul defines +the index as being opaque, clients should not be expecting a natural +ordering either. + +Lastly, agent's only buffer the most recent entries. The number +of entries should not be depended upon, but currently defaults to +256. This value could change in the future. The buffer should be large +enough for most clients and watches. + +It returns a JSON body like this: + + [ + { + "ID": "b54fe110-7af5-cafc-d1fb-afc8ba432b1c", + "Name": "deploy", + "Payload": "MTYwOTAzMA=="", + "NodeFilter": "", + "ServiceFilter": "", + "TagFilter": "", + "Version": 1, + "LTime": 19 + }, + ... + ] + ## Status The Status endpoints are used to get information about the status diff --git a/website/source/docs/agent/watches.html.markdown b/website/source/docs/agent/watches.html.markdown index a3b03157a6..561e6594c3 100644 --- a/website/source/docs/agent/watches.html.markdown +++ b/website/source/docs/agent/watches.html.markdown @@ -62,6 +62,7 @@ The following types are supported, with more documentation below: * `nodes` - Watch the list of nodes * `service`- Watch the instances of a service * `checks` - Watch the value of health checks +* `event` - Watch for custom user events ### Type: key @@ -284,3 +285,45 @@ An example of the output of this command: } ] + +### Type: event + +The "event" watch type is used to monitor for custom user +events. These are fired using the [consul event](/docs/commands/event.html) command. +It takes only a single optional "name" parameter, which restricts +the watch to only events with the given name. + +This maps to the `v1/event/list` API internvally. + +Here is an example configuration: + + { + "type": "event", + "name": "web-deploy", + "handler": "/usr/bin/my-deploy-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type event -name web-deploy /usr/bin/my-deploy-handler.sh + +An example of the output of this command: + + [ + { + "ID": "f07f3fcc-4b7d-3a7c-6d1e-cf414039fcee", + "Name": "web-deploy", + "Payload": "MTYwOTAzMA==", + "NodeFilter": "", + "ServiceFilter": "", + "TagFilter": "", + "Version": 1, + "LTime": 18 + }, + ... + ] + +To fire a new `web-deploy` event the following could be used: + + $ consul event -name web-deploy 1609030 + diff --git a/website/source/docs/commands/event.html.markdown b/website/source/docs/commands/event.html.markdown new file mode 100644 index 0000000000..8b851f7980 --- /dev/null +++ b/website/source/docs/commands/event.html.markdown @@ -0,0 +1,56 @@ +--- +layout: "docs" +page_title: "Commands: Event" +sidebar_current: "docs-commands-event" +--- + +# Consul Event + +Command: `consul event` + +The event command provides a mechanism to fire a custom user event to an +entire datacenter. These events are opaque to Consul, but they can be used +to build scripting infrastructure to do automated deploys, restart services, +or perform any other orchestration action. Events can be handled by +[using a watch](/docs/agent/watches.html). + +Under the hood, events are propogated using the [gossip protocol](/docs/internals/gossip.html). +While the details are not important for using events, an understanding of +the semantics is useful. The gossip layer will make a best-effort to deliver +the event, but there is **no guarantee** delivery. Unlike most Consul data, which is +replicated using [consensus](/docs/internals/consensus.html), event data +is purely peer-to-peer over gossip. This means it is not persisted and does +not have a total ordering. In practice, this means you cannot rely on the +order of message delivery. An advantage however is that events can still +be used even in the absense of server nodes or during an outage. + +The underlying gossip also sets limits on the size of a user event +message. It is hard to give an exact number, as it depends on various +parameters of the event, but the payload should be kept very small +(< 100 bytes). Specifying too large of an event will return an error. + +## Usage + +Usage: `consul event [options] [payload]` + +The only required option is `-name` which specifies the event name. An optional +payload can be provided as the final argument. + +The list of available flags are: + +* `-http-addr` - Address to the HTTP server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8500" which is the default HTTP address of a Consul agent. + +* `-datacenter` - Datacenter to query. Defaults to that of agent. + +* `-name` - The name of the event. + +* `-node` - Regular expression to filter nodes which should evaluate the event. + +* `-service` - Regular expression to filter to only nodes which matching services. + +* `-tag` - Regular expression to filter to only nodes with a service that has + a matching tag. This must be used with `-service`. As an example, you may + do "-server mysql -tag slave". + diff --git a/website/source/docs/commands/watch.html.markdown b/website/source/docs/commands/watch.html.markdown index 7f565a3594..a0d53fc1d4 100644 --- a/website/source/docs/commands/watch.html.markdown +++ b/website/source/docs/commands/watch.html.markdown @@ -37,6 +37,8 @@ The list of available flags are: * `-key` - Key to watch. Only for `key` type. +* `-name`- Event name to watch. Only for `event` type. + * `-passingonly=[true|false]` - Should only passing entries be returned. Default false. only for `service` type. @@ -49,5 +51,5 @@ The list of available flags are: * `-tag` - Service tag to filter on. Optional for `service` type. * `-type` - Watch type. Required, one of "key", "keyprefix", "services", - "nodes", "services", or "checks". + "nodes", "services", "checks", or "event". diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 427a7be74c..04855527a0 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -57,6 +57,10 @@