Merge pull request #1046 from hashicorp/f-event-acl

Event ACLs
pull/1081/head
Ryan Uber 2015-07-02 07:02:07 -07:00
commit e37b5ecb69
20 changed files with 526 additions and 102 deletions

View File

@ -52,6 +52,12 @@ type ACL interface {
// ServiceRead checks for permission to read a given service // ServiceRead checks for permission to read a given service
ServiceRead(string) bool ServiceRead(string) bool
// EventRead determines if a specific event can be queried.
EventRead(string) bool
// EventWrite determines if a specific event may be fired.
EventWrite(string) bool
// ACLList checks for permission to list all the ACLs // ACLList checks for permission to list all the ACLs
ACLList() bool ACLList() bool
@ -87,6 +93,14 @@ func (s *StaticACL) ServiceWrite(string) bool {
return s.defaultAllow return s.defaultAllow
} }
func (s *StaticACL) EventRead(string) bool {
return s.defaultAllow
}
func (s *StaticACL) EventWrite(string) bool {
return s.defaultAllow
}
func (s *StaticACL) ACLList() bool { func (s *StaticACL) ACLList() bool {
return s.allowManage return s.allowManage
} }
@ -136,6 +150,9 @@ type PolicyACL struct {
// serviceRules contains the service policies // serviceRules contains the service policies
serviceRules *radix.Tree serviceRules *radix.Tree
// eventRules contains the user event policies
eventRules *radix.Tree
} }
// New is used to construct a policy based ACL from a set of policies // New is used to construct a policy based ACL from a set of policies
@ -145,6 +162,7 @@ func New(parent ACL, policy *Policy) (*PolicyACL, error) {
parent: parent, parent: parent,
keyRules: radix.New(), keyRules: radix.New(),
serviceRules: radix.New(), serviceRules: radix.New(),
eventRules: radix.New(),
} }
// Load the key policy // Load the key policy
@ -156,6 +174,12 @@ func New(parent ACL, policy *Policy) (*PolicyACL, error) {
for _, sp := range policy.Services { for _, sp := range policy.Services {
p.serviceRules.Insert(sp.Name, sp.Policy) p.serviceRules.Insert(sp.Name, sp.Policy)
} }
// Load the event policy
for _, ep := range policy.Events {
p.eventRules.Insert(ep.Event, ep.Policy)
}
return p, nil return p, nil
} }
@ -266,6 +290,37 @@ func (p *PolicyACL) ServiceWrite(name string) bool {
return p.parent.ServiceWrite(name) return p.parent.ServiceWrite(name)
} }
// EventRead is used to determine if the policy allows for a
// specific user event to be read.
func (p *PolicyACL) EventRead(name string) bool {
// Longest-prefix match on event names
if _, rule, ok := p.eventRules.LongestPrefix(name); ok {
switch rule {
case EventPolicyRead:
return true
case EventPolicyWrite:
return true
default:
return false
}
}
// Nothing matched, use parent
return p.parent.EventRead(name)
}
// EventWrite is used to determine if new events can be created
// (fired) by the policy.
func (p *PolicyACL) EventWrite(name string) bool {
// Longest-prefix match event names
if _, rule, ok := p.eventRules.LongestPrefix(name); ok {
return rule == EventPolicyWrite
}
// No match, use parent
return p.parent.EventWrite(name)
}
// ACLList checks if listing of ACLs is allowed // ACLList checks if listing of ACLs is allowed
func (p *PolicyACL) ACLList() bool { func (p *PolicyACL) ACLList() bool {
return p.parent.ACLList() return p.parent.ACLList()

View File

@ -66,11 +66,23 @@ func TestStaticACL(t *testing.T) {
if none.ServiceWrite("foobar") { if none.ServiceWrite("foobar") {
t.Fatalf("should not allow") t.Fatalf("should not allow")
} }
if none.EventRead("foobar") {
t.Fatalf("should not allow")
}
if none.EventRead("") {
t.Fatalf("should not allow")
}
if none.EventWrite("foobar") {
t.Fatalf("should not allow")
}
if none.EventWrite("") {
t.Fatalf("should not allow")
}
if none.ACLList() { if none.ACLList() {
t.Fatalf("should not noneow") t.Fatalf("should not allow")
} }
if none.ACLModify() { if none.ACLModify() {
t.Fatalf("should not noneow") t.Fatalf("should not allow")
} }
if !manage.KeyRead("foobar") { if !manage.KeyRead("foobar") {
@ -132,6 +144,20 @@ func TestPolicyACL(t *testing.T) {
Policy: ServicePolicyWrite, Policy: ServicePolicyWrite,
}, },
}, },
Events: []*EventPolicy{
&EventPolicy{
Event: "",
Policy: EventPolicyRead,
},
&EventPolicy{
Event: "foo",
Policy: EventPolicyWrite,
},
&EventPolicy{
Event: "bar",
Policy: EventPolicyDeny,
},
},
} }
acl, err := New(all, policy) acl, err := New(all, policy)
if err != nil { if err != nil {
@ -188,6 +214,27 @@ func TestPolicyACL(t *testing.T) {
t.Fatalf("Write fail: %#v", c) t.Fatalf("Write fail: %#v", c)
} }
} }
type eventcase struct {
inp string
read bool
write bool
}
eventcases := []eventcase{
{"foo", true, true},
{"foobar", true, true},
{"bar", false, false},
{"barbaz", false, false},
{"baz", true, false},
}
for _, c := range eventcases {
if c.read != acl.EventRead(c.inp) {
t.Fatalf("Event fail: %#v", c)
}
if c.write != acl.EventWrite(c.inp) {
t.Fatalf("Event fail: %#v", c)
}
}
} }
func TestPolicyACL_Parent(t *testing.T) { func TestPolicyACL_Parent(t *testing.T) {

View File

@ -13,6 +13,9 @@ const (
ServicePolicyDeny = "deny" ServicePolicyDeny = "deny"
ServicePolicyRead = "read" ServicePolicyRead = "read"
ServicePolicyWrite = "write" ServicePolicyWrite = "write"
EventPolicyRead = "read"
EventPolicyWrite = "write"
EventPolicyDeny = "deny"
) )
// Policy is used to represent the policy specified by // Policy is used to represent the policy specified by
@ -21,6 +24,7 @@ type Policy struct {
ID string `hcl:"-"` ID string `hcl:"-"`
Keys []*KeyPolicy `hcl:"key,expand"` Keys []*KeyPolicy `hcl:"key,expand"`
Services []*ServicePolicy `hcl:"service,expand"` Services []*ServicePolicy `hcl:"service,expand"`
Events []*EventPolicy `hcl:"event,expand"`
} }
// KeyPolicy represents a policy for a key // KeyPolicy represents a policy for a key
@ -43,6 +47,16 @@ func (k *ServicePolicy) GoString() string {
return fmt.Sprintf("%#v", *k) return fmt.Sprintf("%#v", *k)
} }
// EventPolicy represents a user event policy.
type EventPolicy struct {
Event string `hcl:",key"`
Policy string
}
func (e *EventPolicy) GoString() string {
return fmt.Sprintf("%#v", *e)
}
// Parse is used to parse the specified ACL rules into an // Parse is used to parse the specified ACL rules into an
// intermediary set of policies, before being compiled into // intermediary set of policies, before being compiled into
// the ACL // the ACL
@ -80,5 +94,16 @@ func Parse(rules string) (*Policy, error) {
} }
} }
// Validate the user event policies
for _, ep := range p.Events {
switch ep.Policy {
case EventPolicyRead:
case EventPolicyWrite:
case EventPolicyDeny:
default:
return nil, fmt.Errorf("Invalid event policy: %#v", ep)
}
}
return p, nil return p, nil
} }

View File

@ -24,6 +24,15 @@ service "" {
} }
service "foo" { service "foo" {
policy = "read" policy = "read"
}
event "" {
policy = "read"
}
event "foo" {
policy = "write"
}
event "bar" {
policy = "deny"
} }
` `
exp := &Policy{ exp := &Policy{
@ -55,6 +64,20 @@ service "foo" {
Policy: ServicePolicyRead, Policy: ServicePolicyRead,
}, },
}, },
Events: []*EventPolicy{
&EventPolicy{
Event: "",
Policy: EventPolicyRead,
},
&EventPolicy{
Event: "foo",
Policy: EventPolicyWrite,
},
&EventPolicy{
Event: "bar",
Policy: EventPolicyDeny,
},
},
} }
out, err := Parse(inp) out, err := Parse(inp)
@ -90,6 +113,17 @@ func TestParse_JSON(t *testing.T) {
"foo": { "foo": {
"policy": "read" "policy": "read"
} }
},
"event": {
"": {
"policy": "read"
},
"foo": {
"policy": "write"
},
"bar": {
"policy": "deny"
}
} }
}` }`
exp := &Policy{ exp := &Policy{
@ -121,6 +155,20 @@ func TestParse_JSON(t *testing.T) {
Policy: ServicePolicyRead, Policy: ServicePolicyRead,
}, },
}, },
Events: []*EventPolicy{
&EventPolicy{
Event: "",
Policy: EventPolicyRead,
},
&EventPolicy{
Event: "foo",
Policy: EventPolicyWrite,
},
&EventPolicy{
Event: "bar",
Policy: EventPolicyDeny,
},
},
} }
out, err := Parse(inp) out, err := Parse(inp)

View File

@ -36,6 +36,10 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
return nil, nil return nil, nil
} }
// Get the ACL token
var token string
s.parseToken(req, &token)
// Get the filters // Get the filters
if filt := req.URL.Query().Get("node"); filt != "" { if filt := req.URL.Query().Get("node"); filt != "" {
event.NodeFilter = filt event.NodeFilter = filt
@ -57,7 +61,13 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
} }
// Try to fire the event // Try to fire the event
if err := s.agent.UserEvent(dc, event); err != nil { if err := s.agent.UserEvent(dc, token, event); err != nil {
if strings.Contains(err.Error(), permissionDenied) {
resp.WriteHeader(403)
resp.Write([]byte(permissionDenied))
return nil, nil
}
resp.WriteHeader(500)
return nil, err return nil, err
} }

View File

@ -5,9 +5,11 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"strings"
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
) )
@ -51,10 +53,72 @@ func TestEventFire(t *testing.T) {
}) })
} }
func TestEventFire_token(t *testing.T) {
httpTestWithConfig(t, func(srv *HTTPServer) {
// Create an ACL token
args := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testEventPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token string
if err := srv.agent.RPC("ACL.Apply", &args, &token); err != nil {
t.Fatalf("err: %v", err)
}
type tcase struct {
event string
allowed bool
}
tcases := []tcase{
{"foo", false},
{"bar", false},
{"baz", true},
}
for _, c := range tcases {
// Try to fire the event over the HTTP interface
url := fmt.Sprintf("/v1/event/fire/%s?token=%s", c.event, token)
req, err := http.NewRequest("PUT", url, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
resp := httptest.NewRecorder()
if _, err := srv.EventFire(resp, req); err != nil {
t.Fatalf("err: %s", err)
}
// Check the result
body := resp.Body.String()
if c.allowed {
if strings.Contains(body, permissionDenied) {
t.Fatalf("bad: %s", body)
}
if resp.Code != 200 {
t.Fatalf("bad: %d", resp.Code)
}
} else {
if !strings.Contains(body, permissionDenied) {
t.Fatalf("bad: %s", body)
}
if resp.Code != 403 {
t.Fatalf("bad: %d", resp.Code)
}
}
}
}, func(c *Config) {
c.ACLDefaultPolicy = "deny"
})
}
func TestEventList(t *testing.T) { func TestEventList(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"} p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent("", p); err != nil { if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -90,12 +154,12 @@ func TestEventList(t *testing.T) {
func TestEventList_Filter(t *testing.T) { func TestEventList_Filter(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"} p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent("", p); err != nil { if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
p = &UserEvent{Name: "foo"} p = &UserEvent{Name: "foo"}
if err := srv.agent.UserEvent("", p); err != nil { if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -131,7 +195,7 @@ func TestEventList_Filter(t *testing.T) {
func TestEventList_Blocking(t *testing.T) { func TestEventList_Blocking(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"} p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent("", p); err != nil { if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -159,7 +223,7 @@ func TestEventList_Blocking(t *testing.T) {
go func() { go func() {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
p := &UserEvent{Name: "second"} p := &UserEvent{Name: "second"}
if err := srv.agent.UserEvent("", p); err != nil { if err := srv.agent.UserEvent("dc1", "root", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
}() }()
@ -202,7 +266,7 @@ func TestEventList_EventBufOrder(t *testing.T) {
expected, expected,
&UserEvent{Name: "bar"}, &UserEvent{Name: "bar"},
} { } {
if err := srv.agent.UserEvent("", e); err != nil { if err := srv.agent.UserEvent("dc1", "root", e); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }

View File

@ -71,7 +71,7 @@ func validateUserEventParams(params *UserEvent) error {
} }
// UserEvent is used to fire an event via the Serf layer on the LAN // UserEvent is used to fire an event via the Serf layer on the LAN
func (a *Agent) UserEvent(dc string, params *UserEvent) error { func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
// Validate the params // Validate the params
if err := validateUserEventParams(params); err != nil { if err := validateUserEventParams(params); err != nil {
return err return err
@ -85,27 +85,20 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error {
return fmt.Errorf("UserEvent encoding failed: %v", err) return fmt.Errorf("UserEvent encoding failed: %v", err)
} }
// Check if this is the local DC, fire locally // Service the event fire over RPC. This ensures that we authorize
if dc == "" || dc == a.config.Datacenter { // the request against the token first.
if a.server != nil { args := structs.EventFireRequest{
return a.server.UserEvent(params.Name, payload) Datacenter: dc,
} else { Name: params.Name,
return a.client.UserEvent(params.Name, payload) Payload: payload,
} QueryOptions: structs.QueryOptions{Token: token},
} else {
// Send an RPC to remote datacenter to service this
args := structs.EventFireRequest{
Datacenter: dc,
Name: params.Name,
Payload: payload,
}
// Any server can process in the remote DC, since the
// gossip will take over anyways
args.AllowStale = true
var out structs.EventFireResponse
return a.RPC("Internal.EventFire", &args, &out)
} }
// Any server can process in the remote DC, since the
// gossip will take over anyways
args.AllowStale = true
var out structs.EventFireResponse
return a.RPC("Internal.EventFire", &args, &out)
} }
// handleEvents is used to process incoming user events // handleEvents is used to process incoming user events

View File

@ -153,6 +153,8 @@ func TestFireReceiveEvent(t *testing.T) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer agent.Shutdown() defer agent.Shutdown()
testutil.WaitForLeader(t, agent.RPC, "dc1")
srv1 := &structs.NodeService{ srv1 := &structs.NodeService{
ID: "mysql", ID: "mysql",
Service: "mysql", Service: "mysql",
@ -162,13 +164,13 @@ func TestFireReceiveEvent(t *testing.T) {
agent.state.AddService(srv1, "") agent.state.AddService(srv1, "")
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := agent.UserEvent("", p1) err := agent.UserEvent("dc1", "root", p1)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
p2 := &UserEvent{Name: "deploy"} p2 := &UserEvent{Name: "deploy"}
err = agent.UserEvent("", p2) err = agent.UserEvent("dc1", "root", p2)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -186,3 +188,66 @@ func TestFireReceiveEvent(t *testing.T) {
t.Fatalf("bad: %#v", last) t.Fatalf("bad: %#v", last)
} }
} }
func TestUserEventToken(t *testing.T) {
conf := nextConfig()
// Set the default policies to deny
conf.ACLDefaultPolicy = "deny"
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
testutil.WaitForLeader(t, agent.RPC, "dc1")
// Create an ACL token
args := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testEventPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var token string
if err := agent.RPC("ACL.Apply", &args, &token); err != nil {
t.Fatalf("err: %v", err)
}
type tcase struct {
name string
expect bool
}
cases := []tcase{
{"foo", false},
{"bar", false},
{"baz", true},
{"zip", false},
}
for _, c := range cases {
event := &UserEvent{Name: c.name}
err := agent.UserEvent("dc1", token, event)
allowed := false
if err == nil || err.Error() != permissionDenied {
allowed = true
}
if allowed != c.expect {
t.Fatalf("bad: %#v result: %v", c, allowed)
}
}
}
const testEventPolicy = `
event "foo" {
policy = "deny"
}
event "bar" {
policy = "read"
}
event "baz" {
policy = "write"
}
`

View File

@ -33,12 +33,14 @@ Options:
-service="" Regular expression to filter on service instances -service="" Regular expression to filter on service instances
-tag="" Regular expression to filter on service tags. Must be used -tag="" Regular expression to filter on service tags. Must be used
with -service. with -service.
-token="" ACL token to use during requests. Defaults to that
of the agent.
` `
return strings.TrimSpace(helpText) return strings.TrimSpace(helpText)
} }
func (c *EventCommand) Run(args []string) int { func (c *EventCommand) Run(args []string) int {
var datacenter, name, node, service, tag string var datacenter, name, node, service, tag, token string
cmdFlags := flag.NewFlagSet("event", flag.ContinueOnError) cmdFlags := flag.NewFlagSet("event", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
cmdFlags.StringVar(&datacenter, "datacenter", "", "") cmdFlags.StringVar(&datacenter, "datacenter", "", "")
@ -46,6 +48,7 @@ func (c *EventCommand) Run(args []string) int {
cmdFlags.StringVar(&node, "node", "", "") cmdFlags.StringVar(&node, "node", "", "")
cmdFlags.StringVar(&service, "service", "", "") cmdFlags.StringVar(&service, "service", "", "")
cmdFlags.StringVar(&tag, "tag", "", "") cmdFlags.StringVar(&tag, "tag", "", "")
cmdFlags.StringVar(&token, "token", "", "")
httpAddr := HTTPAddrFlag(cmdFlags) httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil { if err := cmdFlags.Parse(args); err != nil {
return 1 return 1
@ -120,6 +123,7 @@ func (c *EventCommand) Run(args []string) int {
} }
opts := &consulapi.WriteOptions{ opts := &consulapi.WriteOptions{
Datacenter: datacenter, Datacenter: datacenter,
Token: token,
} }
// Fire the event // Fire the event

View File

@ -55,6 +55,7 @@ const (
type rExecConf struct { type rExecConf struct {
datacenter string datacenter string
prefix string prefix string
token string
foreignDC bool foreignDC bool
localDC string localDC string
@ -136,6 +137,7 @@ func (c *ExecCommand) Run(args []string) int {
cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, "") cmdFlags.DurationVar(&c.conf.replWait, "wait-repl", rExecReplicationWait, "")
cmdFlags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "") cmdFlags.DurationVar(&c.conf.wait, "wait", rExecQuietWait, "")
cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "") cmdFlags.BoolVar(&c.conf.verbose, "verbose", false, "")
cmdFlags.StringVar(&c.conf.token, "token", "", "")
httpAddr := HTTPAddrFlag(cmdFlags) httpAddr := HTTPAddrFlag(cmdFlags)
if err := cmdFlags.Parse(args); err != nil { if err := cmdFlags.Parse(args); err != nil {
return 1 return 1
@ -173,7 +175,11 @@ func (c *ExecCommand) Run(args []string) int {
} }
// Create and test the HTTP client // Create and test the HTTP client
client, err := HTTPClientDC(*httpAddr, c.conf.datacenter) client, err := HTTPClientConfig(func(clientConf *consulapi.Config) {
clientConf.Address = *httpAddr
clientConf.Datacenter = c.conf.datacenter
clientConf.Token = c.conf.token
})
if err != nil { if err != nil {
c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err))
return 1 return 1
@ -625,6 +631,8 @@ Options:
-wait-repl=200ms Period to wait for replication before firing event. This is an -wait-repl=200ms Period to wait for replication before firing event. This is an
optimization to allow stale reads to be performed. optimization to allow stale reads to be performed.
-verbose Enables verbose output -verbose Enables verbose output
-token="" ACL token to use during requests. Defaults to that
of the agent.
` `
return strings.TrimSpace(helpText) return strings.TrimSpace(helpText)
} }

View File

@ -47,16 +47,15 @@ func HTTPAddrFlag(f *flag.FlagSet) *string {
// HTTPClient returns a new Consul HTTP client with the given address. // HTTPClient returns a new Consul HTTP client with the given address.
func HTTPClient(addr string) (*consulapi.Client, error) { func HTTPClient(addr string) (*consulapi.Client, error) {
return HTTPClientDC(addr, "") return HTTPClientConfig(func(c *consulapi.Config) {
c.Address = addr
})
} }
// HTTPClientDC returns a new Consul HTTP client with the given address and datacenter // HTTPClientConfig is used to return a new API client and modify its
func HTTPClientDC(addr, dc string) (*consulapi.Client, error) { // configuration by passing in a config modifier function.
func HTTPClientConfig(fn func(c *consulapi.Config)) (*consulapi.Client, error) {
conf := consulapi.DefaultConfig() conf := consulapi.DefaultConfig()
if envAddr := os.Getenv(HTTPAddrEnvName); addr == "" && envAddr != "" { fn(conf)
addr = envAddr
}
conf.Address = addr
conf.Datacenter = dc
return consulapi.NewClient(conf) return consulapi.NewClient(conf)
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/golang-lru"
) )
const ( const (
@ -30,6 +31,9 @@ const (
// anonymousToken is the token ID we re-write to if there // anonymousToken is the token ID we re-write to if there
// is no token ID provided // is no token ID provided
anonymousToken = "anonymous" anonymousToken = "anonymous"
// Maximum number of cached ACL entries
aclCacheSize = 256
) )
var ( var (
@ -89,15 +93,57 @@ func (s *Server) resolveToken(id string) (acl.ACL, error) {
} }
// Use our non-authoritative cache // Use our non-authoritative cache
return s.lookupACL(id, authDC) return s.aclCache.lookupACL(id, authDC)
}
// rpcFn is used to make an RPC call to the client or server.
type rpcFn func(string, interface{}, interface{}) error
// aclCache is used to cache ACL's and policies.
type aclCache struct {
config *Config
logger *log.Logger
// acls is a non-authoritative ACL cache
acls *lru.Cache
// aclPolicyCache is a policy cache
policies *lru.Cache
// The RPC function used to talk to the client/server
rpc rpcFn
}
// newAclCache returns a new cache layer for ACLs and policies
func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) {
var err error
cache := &aclCache{
config: conf,
logger: logger,
rpc: rpc,
}
// Initialize the non-authoritative ACL cache
cache.acls, err = lru.New(aclCacheSize)
if err != nil {
return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
}
// Initialize the ACL policy cache
cache.policies, err = lru.New(aclCacheSize)
if err != nil {
return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
}
return cache, nil
} }
// lookupACL is used when we are non-authoritative, and need // lookupACL is used when we are non-authoritative, and need
// to resolve an ACL // to resolve an ACL
func (s *Server) lookupACL(id, authDC string) (acl.ACL, error) { func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Check the cache for the ACL // Check the cache for the ACL
var cached *aclCacheEntry var cached *aclCacheEntry
raw, ok := s.aclCache.Get(id) raw, ok := c.acls.Get(id)
if ok { if ok {
cached = raw.(*aclCacheEntry) cached = raw.(*aclCacheEntry)
} }
@ -119,22 +165,22 @@ func (s *Server) lookupACL(id, authDC string) (acl.ACL, error) {
args.ETag = cached.ETag args.ETag = cached.ETag
} }
var out structs.ACLPolicy var out structs.ACLPolicy
err := s.RPC("ACL.GetPolicy", &args, &out) err := c.rpc("ACL.GetPolicy", &args, &out)
// Handle the happy path // Handle the happy path
if err == nil { if err == nil {
return s.useACLPolicy(id, authDC, cached, &out) return c.useACLPolicy(id, authDC, cached, &out)
} }
// Check for not-found // Check for not-found
if strings.Contains(err.Error(), aclNotFound) { if strings.Contains(err.Error(), aclNotFound) {
return nil, errors.New(aclNotFound) return nil, errors.New(aclNotFound)
} else { } else {
s.logger.Printf("[ERR] consul.acl: Failed to get policy for '%s': %v", id, err) c.logger.Printf("[ERR] consul.acl: Failed to get policy for '%s': %v", id, err)
} }
// Unable to refresh, apply the down policy // Unable to refresh, apply the down policy
switch s.config.ACLDownPolicy { switch c.config.ACLDownPolicy {
case "allow": case "allow":
return acl.AllowAll(), nil return acl.AllowAll(), nil
case "extend-cache": case "extend-cache":
@ -148,7 +194,7 @@ func (s *Server) lookupACL(id, authDC string) (acl.ACL, error) {
} }
// useACLPolicy handles an ACLPolicy response // useACLPolicy handles an ACLPolicy response
func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) { func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
// Check if we can used the cached policy // Check if we can used the cached policy
if cached != nil && cached.ETag == p.ETag { if cached != nil && cached.ETag == p.ETag {
if p.TTL > 0 { if p.TTL > 0 {
@ -159,7 +205,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
// Check for a cached compiled policy // Check for a cached compiled policy
var compiled acl.ACL var compiled acl.ACL
raw, ok := s.aclPolicyCache.Get(p.ETag) raw, ok := c.policies.Get(p.ETag)
if ok { if ok {
compiled = raw.(acl.ACL) compiled = raw.(acl.ACL)
} else { } else {
@ -167,7 +213,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
parent := acl.RootACL(p.Parent) parent := acl.RootACL(p.Parent)
if parent == nil { if parent == nil {
var err error var err error
parent, err = s.lookupACL(p.Parent, authDC) parent, err = c.lookupACL(p.Parent, authDC)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -180,7 +226,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
} }
// Cache the policy // Cache the policy
s.aclPolicyCache.Add(p.ETag, acl) c.policies.Add(p.ETag, acl)
compiled = acl compiled = acl
} }
@ -192,7 +238,7 @@ func (s *Server) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *struc
if p.TTL > 0 { if p.TTL > 0 {
cached.Expires = time.Now().Add(p.TTL) cached.Expires = time.Now().Add(p.TTL)
} }
s.aclCache.Add(id, cached) c.acls.Add(id, cached)
return compiled, nil return compiled, nil
} }

View File

@ -201,11 +201,6 @@ func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node) return c.serf.RemoveFailedNode(node)
} }
// UserEvent is used to fire an event via the Serf layer
func (c *Client) UserEvent(name string, payload []byte) error {
return c.serf.UserEvent(userEventName(name), payload, false)
}
// KeyManagerLAN returns the LAN Serf keyring manager // KeyManagerLAN returns the LAN Serf keyring manager
func (c *Client) KeyManagerLAN() *serf.KeyManager { func (c *Client) KeyManagerLAN() *serf.KeyManager {
return c.serf.KeyManager() return c.serf.KeyManager()

View File

@ -276,26 +276,18 @@ func TestClientServer_UserEvent(t *testing.T) {
}) })
// Fire the user event // Fire the user event
err := c1.UserEvent("foo", []byte("bar")) if err := s1.UserEvent("foo", []byte("baz")); err != nil {
if err != nil {
t.Fatalf("err: %v", err)
}
err = s1.UserEvent("bar", []byte("baz"))
if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
// Wait for all the events // Wait for all the events
var serverFoo, serverBar, clientFoo, clientBar bool var clientReceived, serverReceived bool
for i := 0; i < 4; i++ { for i := 0; i < 2; i++ {
select { select {
case e := <-clientOut: case e := <-clientOut:
switch e.Name { switch e.Name {
case "foo": case "foo":
clientFoo = true clientReceived = true
case "bar":
clientBar = true
default: default:
t.Fatalf("Bad: %#v", e) t.Fatalf("Bad: %#v", e)
} }
@ -303,9 +295,7 @@ func TestClientServer_UserEvent(t *testing.T) {
case e := <-serverOut: case e := <-serverOut:
switch e.Name { switch e.Name {
case "foo": case "foo":
serverFoo = true serverReceived = true
case "bar":
serverBar = true
default: default:
t.Fatalf("Bad: %#v", e) t.Fatalf("Bad: %#v", e)
} }
@ -315,7 +305,7 @@ func TestClientServer_UserEvent(t *testing.T) {
} }
} }
if !(serverFoo && serverBar && clientFoo && clientBar) { if !serverReceived || !clientReceived {
t.Fatalf("missing events") t.Fatalf("missing events")
} }
} }

View File

@ -57,11 +57,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
return err return err
} }
// Check ACLs
acl, err := m.srv.resolveToken(args.Token)
if err != nil {
return err
}
if acl != nil && !acl.EventWrite(args.Name) {
m.srv.logger.Printf("[WARN] consul: user event %q blocked by ACLs", args.Name)
return permissionDeniedErr
}
// Set the query meta data // Set the query meta data
m.srv.setQueryMeta(&reply.QueryMeta) m.srv.setQueryMeta(&reply.QueryMeta)
// Fire the event // Fire the event
return m.srv.UserEvent(args.Name, args.Payload) return m.srv.serfLAN.UserEvent(args.Name, args.Payload, false)
} }
// KeyringOperation will query the WAN and LAN gossip keyrings of all nodes. // KeyringOperation will query the WAN and LAN gossip keyrings of all nodes.

View File

@ -325,3 +325,37 @@ func TestInternal_NodeDump_FilterACL(t *testing.T) {
} }
} }
} }
func TestInternal_EventFire_Token(t *testing.T) {
dir, srv := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDownPolicy = "deny"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir)
defer srv.Shutdown()
client := rpcClient(t, srv)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// No token is rejected
event := structs.EventFireRequest{
Name: "foo",
Datacenter: "dc1",
Payload: []byte("nope"),
}
err := client.Call("Internal.EventFire", &event, nil)
if err == nil || err.Error() != permissionDenied {
t.Fatalf("bad: %s", err)
}
// Root token is allowed to fire
event.Token = "root"
err = client.Call("Internal.EventFire", &event, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
}

View File

@ -16,7 +16,6 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/golang-lru"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb" "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -45,9 +44,6 @@ const (
// open to a server // open to a server
serverMaxStreams = 64 serverMaxStreams = 64
// Maximum number of cached ACL entries
aclCacheSize = 256
// raftLogCacheSize is the maximum number of logs to cache in-memory. // raftLogCacheSize is the maximum number of logs to cache in-memory.
// This is used to reduce disk I/O for the recently commited entries. // This is used to reduce disk I/O for the recently commited entries.
raftLogCacheSize = 512 raftLogCacheSize = 512
@ -63,11 +59,8 @@ type Server struct {
// aclAuthCache is the authoritative ACL cache // aclAuthCache is the authoritative ACL cache
aclAuthCache *acl.Cache aclAuthCache *acl.Cache
// aclCache is a non-authoritative ACL cache // aclCache is the non-authoritative ACL cache.
aclCache *lru.Cache aclCache *aclCache
// aclPolicyCache is a policy cache
aclPolicyCache *lru.Cache
// Consul configuration // Consul configuration
config *Config config *Config
@ -228,18 +221,10 @@ func NewServer(config *Config) (*Server, error) {
return nil, fmt.Errorf("Failed to create ACL cache: %v", err) return nil, fmt.Errorf("Failed to create ACL cache: %v", err)
} }
// Initialize the non-authoritative ACL cache // Set up the non-authoritative ACL cache
s.aclCache, err = lru.New(aclCacheSize) if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil {
if err != nil {
s.Shutdown() s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL cache: %v", err) return nil, err
}
// Initialize the ACL policy cache
s.aclPolicyCache, err = lru.New(aclCacheSize)
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
} }
// Initialize the RPC layer // Initialize the RPC layer
@ -631,11 +616,6 @@ func (s *Server) RemoveFailedNode(node string) error {
return nil return nil
} }
// UserEvent is used to fire an event via the Serf layer on the LAN
func (s *Server) UserEvent(name string, payload []byte) error {
return s.serfLAN.UserEvent(userEventName(name), payload, false)
}
// IsLeader checks if this server is the cluster leader // IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool { func (s *Server) IsLeader() bool {
return s.raft.State() == raft.Leader return s.raft.State() == raft.Leader

View File

@ -56,3 +56,5 @@ The list of available flags are:
a matching tag. This must be used with `-service`. As an example, you may a matching tag. This must be used with `-service`. As an example, you may
do "-service mysql -tag slave". do "-service mysql -tag slave".
* `-token` - The ACL token to use when firing the event. This token must have
write-level privileges for the event specified. Defaults to that of the agent.

View File

@ -62,3 +62,6 @@ The list of available flags are:
* `-verbose` - Enables verbose output. * `-verbose` - Enables verbose output.
* `-token` - The ACL token to use during requests. This token must have access
to the prefix in the KV store as well as exec "write" access for the _rexec
event. Defaults to that of the agent.

View File

@ -19,7 +19,7 @@ on tokens to which fine grained rules can be applied. It is very similar to
When the ACL system was launched in Consul 0.4, it was only possible to specify When the ACL system was launched in Consul 0.4, it was only possible to specify
policies for the KV store. In Consul 0.5, ACL policies were extended to service policies for the KV store. In Consul 0.5, ACL policies were extended to service
registrations. In Consul 0.6, ACL's were further extended to restrict the registrations. In Consul 0.6, ACL's were further extended to restrict the
service discovery mechanisms. service discovery mechanisms and user events..
## ACL Design ## ACL Design
@ -126,6 +126,27 @@ The most secure way of handling service registration and discovery is to run
Consul 0.6+ and issue tokens with explicit access for the services or service Consul 0.6+ and issue tokens with explicit access for the services or service
prefixes which are expected to run on each agent. prefixes which are expected to run on each agent.
### Blacklist mode and Events
Similar to the above, if your
[`acl_default_policy`](/docs/agent/options.html#acl_default_policy) is set to
`deny`, the `anonymous` token will have no access to allow firing user events.
This deviates from pre-0.6.0 builds, where user events were completely
unrestricted.
Events have their own first-class expression in the ACL syntax. To restore
access to user events from arbitrary agents, configure an ACL rule like the
following for the `anonymous` token:
```
event "" {
policy = "write"
}
```
As always, the more secure way to handle user events is to explicitly grant
access to each API token based on the events they should be able to fire.
### Bootstrapping ACLs ### Bootstrapping ACLs
Bootstrapping the ACL system is done by providing an initial [`acl_master_token` Bootstrapping the ACL system is done by providing an initial [`acl_master_token`
@ -161,6 +182,12 @@ and ACLs can be found [below](#discovery_acls).
The policy for the "consul" service is always "write" as it is managed internally by Consul. The policy for the "consul" service is always "write" as it is managed internally by Consul.
User event policies are defined by coupling an event name prefix with a policy.
The rules are enforced using a longest-prefix match policy. The default rule,
applied to any user event without a matching policy, is provided by an empty
string. An event policy is one of "read", "write", or "deny". Currently, only
the "write" level is enforced during event firing. Events can always be read.
We make use of We make use of
the [HashiCorp Configuration Language (HCL)](https://github.com/hashicorp/hcl/) the [HashiCorp Configuration Language (HCL)](https://github.com/hashicorp/hcl/)
to specify policy. This language is human readable and interoperable to specify policy. This language is human readable and interoperable
@ -192,6 +219,16 @@ service "" {
service "secure-" { service "secure-" {
policy = "read" policy = "read"
} }
# Allow firing any user event by default.
event "" {
policy = "write"
}
# Deny firing events prefixed with "destroy-".
event "destroy-" {
policy = "deny"
}
``` ```
This is equivalent to the following JSON input: This is equivalent to the following JSON input:
@ -216,6 +253,14 @@ This is equivalent to the following JSON input:
"secure-": { "secure-": {
"policy": "read" "policy": "read"
} }
},
"event": {
"": {
"policy": "write"
},
"destroy-": {
"policy": "deny"
}
} }
} }
``` ```