From 77e94cfd38c702f73eb1726ea8a727f3fbe2ae94 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 28 Aug 2014 13:42:07 -0700 Subject: [PATCH] agent: First pass at event endpoints --- command/agent/event_endpoint.go | 147 +++++++++++++++++++++++++++ command/agent/event_endpoint_test.go | 14 +++ command/agent/http.go | 3 + command/agent/user_event.go | 1 + 4 files changed, 165 insertions(+) create mode 100644 command/agent/event_endpoint.go create mode 100644 command/agent/event_endpoint_test.go diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go new file mode 100644 index 0000000000..d0ba9ed68c --- /dev/null +++ b/command/agent/event_endpoint.go @@ -0,0 +1,147 @@ +package agent + +import ( + "bytes" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/hashicorp/consul/consul/structs" +) + +const ( + // maxQueryTime is used to bound the limit of a blocking query + maxQueryTime = 600 * time.Second +) + +// EventFire is used to fire a new event +func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Mandate a PUT request + if req.Method != "PUT" { + resp.WriteHeader(405) + return nil, nil + } + + event := &UserEvent{} + event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/") + if event.Name == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing name")) + return nil, nil + } + + // Get the filters + if filt := req.URL.Query().Get("node"); filt != "" { + event.NodeFilter = filt + } + if filt := req.URL.Query().Get("service"); filt != "" { + event.ServiceFilter = filt + } + if filt := req.URL.Query().Get("tag"); filt != "" { + event.TagFilter = filt + } + + // Get the payload + if req.ContentLength > 0 { + var buf bytes.Buffer + if _, err := io.Copy(&buf, req.Body); err != nil { + return nil, err + } + event.Payload = buf.Bytes() + } + + // Try to fire the event + if err := s.agent.UserEvent(event); err != nil { + return nil, err + } + + // Return the event + return event, nil +} + +// EventList is used to retrieve the recent list of events +func (s *HTTPServer) EventList(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Parse the query options, since we simulate a blocking query + var b structs.QueryOptions + if parseWait(resp, req, &b) { + return nil, nil + } + + // Lots of this logic is borrowed from consul/rpc.go:blockingRPC + // However we cannot use that directly since this code has some + // slight semantics differences... + var timeout <-chan time.Time + var notifyCh chan struct{} + + // Fast path non-blocking + if b.MinQueryIndex == 0 { + goto RUN_QUERY + } + + // Restrict the max query time + if b.MaxQueryTime > maxQueryTime { + b.MaxQueryTime = maxQueryTime + } + + // Ensure a time limit is set if we have an index + if b.MinQueryIndex > 0 && b.MaxQueryTime == 0 { + b.MaxQueryTime = maxQueryTime + } + + // Setup a query timeout + if b.MaxQueryTime > 0 { + timeout = time.After(b.MaxQueryTime) + } + + // Setup a notification channel for changes +SETUP_NOTIFY: + if b.MinQueryIndex > 0 { + notifyCh = make(chan struct{}, 1) + s.agent.eventNotify.Wait(notifyCh) + } + +RUN_QUERY: + // Get the recent events + events := s.agent.UserEvents() + var index uint64 + if len(events) == 0 { + index = 0 + } else { + last := events[len(events)-1] + index = uuidToUint64(last.ID) + } + setIndex(resp, index) + + // Check for exactly match on the query value. Because + // the index value is not monotonic, we just ensure it is + // not an exact match. + if index > 0 && index == b.MinQueryIndex { + select { + case <-notifyCh: + goto SETUP_NOTIFY + case <-timeout: + } + } + return events, nil +} + +// uuidToUint64 is a bit of a hack to generate a 64bit Consul index. +// In effect, we take our random UUID, convert it to a 128 bit number, +// then XOR the high-order and low-order 64bit's together to get the +// output. This lets us generate an index which can be used to simulate +// the blocking behavior of other catalog endpoints. +func uuidToUint64(uuid string) uint64 { + lower := uuid[0:8] + uuid[9:13] + uuid[14:18] + upper := uuid[19:23] + uuid[24:36] + lowVal, err := strconv.ParseUint(lower, 16, 64) + if err != nil { + panic("Failed to convert " + lower) + } + highVal, err := strconv.ParseUint(upper, 16, 64) + if err != nil { + panic("Failed to convert " + upper) + } + return lowVal ^ highVal +} diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go new file mode 100644 index 0000000000..47a299a57b --- /dev/null +++ b/command/agent/event_endpoint_test.go @@ -0,0 +1,14 @@ +package agent + +import ( + "testing" +) + +func TestUUIDToUint64(t *testing.T) { + inp := "cb9a81ad-fff6-52ac-92a7-5f70687805ec" + + // Output value was computed using python + if uuidToUint64(inp) != 6430540886266763072 { + t.Fatalf("bad") + } +} diff --git a/command/agent/http.go b/command/agent/http.go index bfc67577c5..ad1a64bfd2 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -93,6 +93,9 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) s.mux.HandleFunc("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) + s.mux.HandleFunc("/v1/event/fire/", s.wrap(s.EventFire)) + s.mux.HandleFunc("/v1/event/list", s.wrap(s.EventList)) + s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint)) s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate)) diff --git a/command/agent/user_event.go b/command/agent/user_event.go index 393193885a..6004a1bda9 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -190,6 +190,7 @@ func (a *Agent) shouldProcessUserEvent(msg *UserEvent) bool { // ingestUserEvent is used to process an event that passes filtering func (a *Agent) ingestUserEvent(msg *UserEvent) { + a.logger.Printf("[DEBUG] agent: new event: %s (%s)", msg.Name, msg.ID) a.eventLock.Lock() defer func() { a.eventLock.Unlock()