Include a content hash of the intention for use during replication

pull/6051/head
Matt Keeler 2019-06-26 12:28:09 -04:00
parent 747ae6bdf5
commit 4bc1277315
3 changed files with 69 additions and 4 deletions

View File

@ -141,6 +141,9 @@ func (s *Intention) Apply(
} }
} }
// make sure we set the hash prior to raft application
args.Intention.SetHash(true)
// Commit // Commit
resp, err := s.srv.raftApply(structs.IntentionRequestType, args) resp, err := s.srv.raftApply(structs.IntentionRequestType, args)
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package consul package consul
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -411,14 +412,14 @@ func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(er
// diffIntentions computes the difference between the local and remote intentions // diffIntentions computes the difference between the local and remote intentions
// and returns lists of deletes and updates. // and returns lists of deletes and updates.
func diffIntentions(local, remote structs.Intentions) (structs.Intentions, structs.Intentions) { func diffIntentions(local, remote structs.Intentions) (structs.Intentions, structs.Intentions) {
localIdx := make(map[string]uint64, len(local)) localIdx := make(map[string][]byte, len(local))
remoteIdx := make(map[string]struct{}, len(remote)) remoteIdx := make(map[string]struct{}, len(remote))
var deletes structs.Intentions var deletes structs.Intentions
var updates structs.Intentions var updates structs.Intentions
for _, intention := range local { for _, intention := range local {
localIdx[intention.ID] = intention.ModifyIndex localIdx[intention.ID] = intention.Hash
} }
for _, intention := range remote { for _, intention := range remote {
remoteIdx[intention.ID] = struct{}{} remoteIdx[intention.ID] = struct{}{}
@ -431,10 +432,10 @@ func diffIntentions(local, remote structs.Intentions) (structs.Intentions, struc
} }
for _, intention := range remote { for _, intention := range remote {
existingIdx, ok := localIdx[intention.ID] existingHash, ok := localIdx[intention.ID]
if !ok { if !ok {
updates = append(updates, intention) updates = append(updates, intention)
} else if existingIdx < intention.ModifyIndex { } else if bytes.Compare(existingHash, intention.Hash) != 0 {
updates = append(updates, intention) updates = append(updates, intention)
} }
} }

View File

@ -1,7 +1,9 @@
package structs package structs
import ( import (
"encoding/binary"
"fmt" "fmt"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -9,6 +11,8 @@ import (
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
"github.com/mitchellh/hashstructure" "github.com/mitchellh/hashstructure"
"golang.org/x/crypto/blake2b"
) )
const ( const (
@ -70,9 +74,66 @@ type Intention struct {
// or modified. // or modified.
CreatedAt, UpdatedAt time.Time `mapstructure:"-"` CreatedAt, UpdatedAt time.Time `mapstructure:"-"`
// Hash of the contents of the intention
//
// This is needed mainly for replication purposes. When replicating from
// one DC to another keeping the content Hash will allow us to detect
// content changes more efficiently than checking every single field
Hash []byte
RaftIndex RaftIndex
} }
func (x *Intention) SetHash(force bool) []byte {
if force || x.Hash == nil {
hash, err := blake2b.New256(nil)
if err != nil {
panic(err)
}
// Any non-immutable "content" fields should be involved with the
// overall hash. The IDs are immutable which is why they aren't here.
// The raft indices are metadata similar to the hash which is why they
// aren't incorporated. CreateTime is similarly immutable
//
// The Hash is really only used for replication to determine if a token
// has changed and should be updated locally.
// Write all the user set fields
hash.Write([]byte(x.ID))
hash.Write([]byte(x.Description))
hash.Write([]byte(x.SourceNS))
hash.Write([]byte(x.SourceName))
hash.Write([]byte(x.DestinationNS))
hash.Write([]byte(x.DestinationName))
hash.Write([]byte(x.SourceType))
hash.Write([]byte(x.Action))
hash.Write([]byte(x.DefaultAddr))
binary.Write(hash, binary.LittleEndian, x.DefaultPort)
binary.Write(hash, binary.LittleEndian, x.Precedence)
// hashing the metadata
var keys []string
for k := range x.Meta {
keys = append(keys, k)
}
// keep them sorted to ensure hash stability
sort.Strings(keys)
for _, k := range keys {
hash.Write([]byte(k))
hash.Write([]byte(x.Meta[k]))
}
// Finalize the hash
hashVal := hash.Sum(nil)
x.Hash = hashVal
}
return x.Hash
}
// Validate returns an error if the intention is invalid for inserting // Validate returns an error if the intention is invalid for inserting
// or updating. // or updating.
func (x *Intention) Validate() error { func (x *Intention) Validate() error {