mirror of https://github.com/hashicorp/consul
agent: enforce event policy during event fire
parent
d777105c11
commit
6f309c355f
|
@ -36,6 +36,10 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Get the ACL token
|
||||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// Get the filters
|
||||
if filt := req.URL.Query().Get("node"); filt != "" {
|
||||
event.NodeFilter = filt
|
||||
|
@ -57,7 +61,7 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
|
|||
}
|
||||
|
||||
// Try to fire the event
|
||||
if err := s.agent.UserEvent(dc, event); err != nil {
|
||||
if err := s.agent.UserEvent(dc, token, event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@ import (
|
|||
|
||||
func TestEventFire(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
body := bytes.NewBuffer([]byte("test"))
|
||||
url := "/v1/event/fire/test?node=Node&service=foo&tag=bar"
|
||||
req, err := http.NewRequest("PUT", url, body)
|
||||
|
@ -53,8 +55,10 @@ func TestEventFire(t *testing.T) {
|
|||
|
||||
func TestEventList(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
p := &UserEvent{Name: "test"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -89,13 +93,15 @@ func TestEventList(t *testing.T) {
|
|||
|
||||
func TestEventList_Filter(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
p := &UserEvent{Name: "test"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
p = &UserEvent{Name: "foo"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -130,8 +136,10 @@ func TestEventList_Filter(t *testing.T) {
|
|||
|
||||
func TestEventList_Blocking(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
p := &UserEvent{Name: "test"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
|
@ -159,7 +167,7 @@ func TestEventList_Blocking(t *testing.T) {
|
|||
go func() {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
p := &UserEvent{Name: "second"}
|
||||
if err := srv.agent.UserEvent("", p); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
|
@ -192,6 +200,8 @@ func TestEventList_Blocking(t *testing.T) {
|
|||
|
||||
func TestEventList_EventBufOrder(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
// Fire some events in a non-sequential order
|
||||
expected := &UserEvent{Name: "foo"}
|
||||
|
||||
|
@ -202,7 +212,7 @@ func TestEventList_EventBufOrder(t *testing.T) {
|
|||
expected,
|
||||
&UserEvent{Name: "bar"},
|
||||
} {
|
||||
if err := srv.agent.UserEvent("", e); err != nil {
|
||||
if err := srv.agent.UserEvent("dc1", "root", e); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func validateUserEventParams(params *UserEvent) error {
|
|||
}
|
||||
|
||||
// UserEvent is used to fire an event via the Serf layer on the LAN
|
||||
func (a *Agent) UserEvent(dc string, params *UserEvent) error {
|
||||
func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
|
||||
// Validate the params
|
||||
if err := validateUserEventParams(params); err != nil {
|
||||
return err
|
||||
|
@ -85,28 +85,22 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error {
|
|||
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
||||
}
|
||||
|
||||
// Check if this is the local DC, fire locally
|
||||
if dc == "" || dc == a.config.Datacenter {
|
||||
if a.server != nil {
|
||||
return a.server.UserEvent(params.Name, payload)
|
||||
} else {
|
||||
return a.client.UserEvent(params.Name, payload)
|
||||
}
|
||||
} else {
|
||||
// Send an RPC to remote datacenter to service this
|
||||
// Send an RPC to service this
|
||||
args := structs.EventFireRequest{
|
||||
Datacenter: dc,
|
||||
Name: params.Name,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
// Pass along the ACL token, if any
|
||||
args.Token = token
|
||||
|
||||
// Any server can process in the remote DC, since the
|
||||
// gossip will take over anyways
|
||||
args.AllowStale = true
|
||||
var out structs.EventFireResponse
|
||||
return a.RPC("Internal.EventFire", &args, &out)
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvents is used to process incoming user events
|
||||
func (a *Agent) handleEvents() {
|
||||
|
|
|
@ -153,6 +153,8 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
srv1 := &structs.NodeService{
|
||||
ID: "mysql",
|
||||
Service: "mysql",
|
||||
|
@ -162,13 +164,13 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
agent.state.AddService(srv1, "")
|
||||
|
||||
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
|
||||
err := agent.UserEvent("", p1)
|
||||
err := agent.UserEvent("dc1", "root", p1)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
p2 := &UserEvent{Name: "deploy"}
|
||||
err = agent.UserEvent("", p2)
|
||||
err = agent.UserEvent("dc1", "root", p2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -186,3 +188,66 @@ func TestFireReceiveEvent(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", last)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUserEventToken(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
|
||||
// Set the default policies to deny
|
||||
conf.ACLDefaultPolicy = "deny"
|
||||
|
||||
dir, agent := makeAgent(t, conf)
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// Create an ACL token
|
||||
args := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: testEventPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token string
|
||||
if err := agent.RPC("ACL.Apply", &args, &token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
type tcase struct {
|
||||
name string
|
||||
expect bool
|
||||
}
|
||||
cases := []tcase{
|
||||
{"foo", false},
|
||||
{"bar", false},
|
||||
{"baz", true},
|
||||
{"zip", false},
|
||||
}
|
||||
for _, c := range cases {
|
||||
event := &UserEvent{Name: c.name}
|
||||
err := agent.UserEvent("dc1", token, event)
|
||||
allowed := false
|
||||
if err == nil || err.Error() != permissionDenied {
|
||||
allowed = true
|
||||
}
|
||||
if allowed != c.expect {
|
||||
t.Fatalf("bad: %#v result: %v", c, allowed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const testEventPolicy = `
|
||||
event "foo" {
|
||||
policy = "deny"
|
||||
}
|
||||
event "bar" {
|
||||
policy = "read"
|
||||
}
|
||||
event "baz" {
|
||||
policy = "write"
|
||||
}
|
||||
`
|
||||
|
|
Loading…
Reference in New Issue