|
|
|
@ -50,7 +50,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
|
|
|
|
|
// Instead, the lock-delay must be enforced before commit. This means that
|
|
|
|
|
// only the wall-time of the leader node is used, preventing any inconsistencies.
|
|
|
|
|
if args.Op == structs.KVSLock { |
|
|
|
|
state := k.srv.fsm.State() |
|
|
|
|
state := k.srv.fsm.StateNew() |
|
|
|
|
expires := state.KVSLockDelay(args.DirEnt.Key) |
|
|
|
|
if expires.After(time.Now()) { |
|
|
|
|
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v", |
|
|
|
@ -89,14 +89,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get the local state
|
|
|
|
|
state := k.srv.fsm.State() |
|
|
|
|
opts := blockingRPCOptions{ |
|
|
|
|
queryOpts: &args.QueryOptions, |
|
|
|
|
queryMeta: &reply.QueryMeta, |
|
|
|
|
kvWatch: true, |
|
|
|
|
kvPrefix: args.Key, |
|
|
|
|
run: func() error { |
|
|
|
|
index, ent, err := state.KVSGet(args.Key) |
|
|
|
|
state := k.srv.fsm.StateNew() |
|
|
|
|
return k.srv.blockingRPCNew( |
|
|
|
|
&args.QueryOptions, |
|
|
|
|
&reply.QueryMeta, |
|
|
|
|
state.GetKVSWatch(args.Key), |
|
|
|
|
func() error { |
|
|
|
|
ent, err := state.KVSGet(args.Key) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -106,20 +105,14 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
|
|
|
|
if ent == nil { |
|
|
|
|
// Must provide non-zero index to prevent blocking
|
|
|
|
|
// Index 1 is impossible anyways (due to Raft internals)
|
|
|
|
|
if index == 0 { |
|
|
|
|
reply.Index = 1 |
|
|
|
|
} else { |
|
|
|
|
reply.Index = index |
|
|
|
|
} |
|
|
|
|
reply.Entries = nil |
|
|
|
|
} else { |
|
|
|
|
reply.Index = ent.ModifyIndex |
|
|
|
|
reply.Entries = structs.DirEntries{ent} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
return k.srv.blockingRPCOpt(&opts) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// List is used to list all keys with a given prefix
|
|
|
|
@ -134,14 +127,13 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get the local state
|
|
|
|
|
state := k.srv.fsm.State() |
|
|
|
|
opts := blockingRPCOptions{ |
|
|
|
|
queryOpts: &args.QueryOptions, |
|
|
|
|
queryMeta: &reply.QueryMeta, |
|
|
|
|
kvWatch: true, |
|
|
|
|
kvPrefix: args.Key, |
|
|
|
|
run: func() error { |
|
|
|
|
tombIndex, index, ent, err := state.KVSList(args.Key) |
|
|
|
|
state := k.srv.fsm.StateNew() |
|
|
|
|
return k.srv.blockingRPCNew( |
|
|
|
|
&args.QueryOptions, |
|
|
|
|
&reply.QueryMeta, |
|
|
|
|
state.GetKVSWatch(args.Key), |
|
|
|
|
func() error { |
|
|
|
|
index, ent, err := state.KVSList(args.Key) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -158,25 +150,12 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
|
|
|
|
reply.Index = index |
|
|
|
|
} |
|
|
|
|
reply.Entries = nil |
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
// Determine the maximum affected index
|
|
|
|
|
var maxIndex uint64 |
|
|
|
|
for _, e := range ent { |
|
|
|
|
if e.ModifyIndex > maxIndex { |
|
|
|
|
maxIndex = e.ModifyIndex |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if tombIndex > maxIndex { |
|
|
|
|
maxIndex = tombIndex |
|
|
|
|
} |
|
|
|
|
reply.Index = maxIndex |
|
|
|
|
reply.Index = index |
|
|
|
|
reply.Entries = ent |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
return k.srv.blockingRPCOpt(&opts) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ListKeys is used to list all keys with a given prefix to a separator
|
|
|
|
@ -191,22 +170,29 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Get the local state
|
|
|
|
|
state := k.srv.fsm.State() |
|
|
|
|
opts := blockingRPCOptions{ |
|
|
|
|
queryOpts: &args.QueryOptions, |
|
|
|
|
queryMeta: &reply.QueryMeta, |
|
|
|
|
kvWatch: true, |
|
|
|
|
kvPrefix: args.Prefix, |
|
|
|
|
run: func() error { |
|
|
|
|
state := k.srv.fsm.StateNew() |
|
|
|
|
return k.srv.blockingRPCNew( |
|
|
|
|
&args.QueryOptions, |
|
|
|
|
&reply.QueryMeta, |
|
|
|
|
state.GetKVSWatch(args.Prefix), |
|
|
|
|
func() error { |
|
|
|
|
index, keys, err := state.KVSListKeys(args.Prefix, args.Seperator) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Must provide non-zero index to prevent blocking
|
|
|
|
|
// Index 1 is impossible anyways (due to Raft internals)
|
|
|
|
|
if index == 0 { |
|
|
|
|
reply.Index = 1 |
|
|
|
|
} else { |
|
|
|
|
reply.Index = index |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if acl != nil { |
|
|
|
|
keys = FilterKeys(acl, keys) |
|
|
|
|
} |
|
|
|
|
reply.Keys = keys |
|
|
|
|
return err |
|
|
|
|
|
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
return k.srv.blockingRPCOpt(&opts) |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|