Revendor serf to pull in keyring list truncation changes. (#5251)

pull/5256/head
Matt Keeler 6 years ago committed by GitHub
parent 0da4502740
commit 8f0d622a54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -123,8 +123,30 @@ func (q *Query) Deadline() time.Time {
return q.deadline
}
// Respond is used to send a response to the user query
func (q *Query) Respond(buf []byte) error {
func (q *Query) createResponse(buf []byte) messageQueryResponse {
// Create response
return messageQueryResponse{
LTime: q.LTime,
ID: q.id,
From: q.serf.config.NodeName,
Payload: buf,
}
}
// Check response size
func (q *Query) checkResponseSize(resp []byte) error {
if len(resp) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
return nil
}
func (q *Query) respondWithMessageAndResponse(raw []byte, resp messageQueryResponse) error {
// Check the size limit
if err := q.checkResponseSize(raw); err != nil {
return err
}
q.respLock.Lock()
defer q.respLock.Unlock()
@ -138,25 +160,6 @@ func (q *Query) Respond(buf []byte) error {
return fmt.Errorf("response is past the deadline")
}
// Create response
resp := messageQueryResponse{
LTime: q.LTime,
ID: q.id,
From: q.serf.config.NodeName,
Payload: buf,
}
// Send a direct response
raw, err := encodeMessage(messageQueryResponseType, &resp)
if err != nil {
return fmt.Errorf("failed to format response: %v", err)
}
// Check the size limit
if len(raw) > q.serf.config.QueryResponseSizeLimit {
return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
}
// Send the response directly to the originator
addr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
if err := q.serf.memberlist.SendTo(&addr, raw); err != nil {
@ -170,5 +173,24 @@ func (q *Query) Respond(buf []byte) error {
// Clear the deadline, responses sent
q.deadline = time.Time{}
return nil
}
// Respond is used to send a response to the user query
func (q *Query) Respond(buf []byte) error {
// Create response
resp := q.createResponse(buf)
// Encode response
raw, err := encodeMessage(messageQueryResponseType, resp)
if err != nil {
return fmt.Errorf("failed to format response: %v", err)
}
if err := q.respondWithMessageAndResponse(raw, resp); err != nil {
return fmt.Errorf("failed to respond to key query: %v", err)
}
return nil
}

@ -2,6 +2,7 @@ package serf
import (
"encoding/base64"
"fmt"
"log"
"strings"
)
@ -28,6 +29,13 @@ const (
// listKeysQuery is used to list all known keys in the cluster
listKeysQuery = "list-keys"
// minEncodedKeyLength is used to compute the max number of keys in a list key
// response. eg 1024/25 = 40. a message with max size of 1024 bytes cannot
// contain more than 40 keys. There is a test
// (TestSerfQueries_estimateMaxKeysInListKeyResponse) which does the
// computation and in case of changes, the value can be adjusted.
minEncodedKeyLength = 25
)
// internalQueryName is used to generate a query name for an internal query
@ -149,17 +157,62 @@ func (s *serfQueries) handleConflict(q *Query) {
}
}
// sendKeyResponse handles responding to key-related queries.
func (s *serfQueries) sendKeyResponse(q *Query, resp *nodeKeyResponse) {
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode key response: %v", err)
return
func (s *serfQueries) keyListResponseWithCorrectSize(q *Query, resp *nodeKeyResponse) ([]byte, messageQueryResponse, error) {
maxListKeys := q.serf.config.QueryResponseSizeLimit / minEncodedKeyLength
actual := len(resp.Keys)
for i := maxListKeys; i >= 0; i-- {
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
return nil, messageQueryResponse{}, err
}
// Create response
qresp := q.createResponse(buf)
// Encode response
raw, err := encodeMessage(messageQueryResponseType, qresp)
if err != nil {
return nil, messageQueryResponse{}, err
}
// Check the size limit
if err = q.checkResponseSize(raw); err != nil {
resp.Keys = resp.Keys[0:i]
resp.Message = fmt.Sprintf("truncated key list response, showing first %d of %d keys", i, actual)
continue
}
if actual > i {
s.logger.Printf("[WARN] serf: %s", resp.Message)
}
return raw, qresp, nil
}
return nil, messageQueryResponse{}, fmt.Errorf("Failed to truncate response so that it fits into message")
}
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
// sendKeyResponse handles responding to key-related queries.
func (s *serfQueries) sendKeyResponse(q *Query, resp *nodeKeyResponse) {
switch q.Name {
case internalQueryName(listKeysQuery):
raw, qresp, err := s.keyListResponseWithCorrectSize(q, resp)
if err != nil {
s.logger.Printf("[ERR] serf: %v", err)
return
}
if err := q.respondWithMessageAndResponse(raw, qresp); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
}
default:
buf, err := encodeMessage(messageKeyResponseType, resp)
if err != nil {
s.logger.Printf("[ERR] serf: Failed to encode key response: %v", err)
return
}
if err := q.Respond(buf); err != nil {
s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err)
return
}
}
}

@ -68,6 +68,11 @@ func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
resp.NumErr++
}
if nodeResponse.Result && len(nodeResponse.Message) > 0 {
resp.Messages[r.From] = nodeResponse.Message
k.serf.logger.Println("[WARN] serf:", nodeResponse.Message)
}
// Currently only used for key list queries, this adds keys to a counter
// and increments them for each node response which contains them.
for _, key := range nodeResponse.Keys {

@ -150,8 +150,8 @@
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","checksumSHA1":"qnlqWJYV81ENr61SZk9c65R1mDo=","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3","revisionTime":"2015-11-16T02:03:38Z"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"3U9bQLEMikE47n4TZP6uOdgXIyQ=","revision":"da92cfe76e0c1c9b94bbc9d884ec4b2b3b90b699","revisionTime":"2018-08-17T18:12:11Z","version":"master","versionExact":"master"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"c7f3bc96b40972e67dfbe007c1fa825cf59ac8c2","revisionTime":"2019-01-04T15:39:47Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"siLn7zwVHQk070rpd99BTktGfTs=","revision":"c7f3bc96b40972e67dfbe007c1fa825cf59ac8c2","revisionTime":"2019-01-04T15:39:47Z"},
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"65da6f27f6e551e03d99364ecc0607e91e526b00","revisionTime":"2019-01-22T20:12:06Z"},
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"zXQ+WC7vYZLkqqM38dtDQ2Erv0E=","revision":"65da6f27f6e551e03d99364ecc0607e91e526b00","revisionTime":"2019-01-22T20:12:06Z"},
{"path":"github.com/hashicorp/vault/api","checksumSHA1":"LYQZ+o7zJCda/6LibdN0spFco34=","revision":"533003e27840d9646cb4e7d23b3a113895da1dd0","revisionTime":"2018-06-20T14:55:40Z","version":"v0.10.3","versionExact":"v0.10.3"},
{"path":"github.com/hashicorp/vault/audit","checksumSHA1":"2JOC+Ur0S3U8Gqv2cfNB3zxgSBk=","revision":"c737968235c8673b872350f0a047877bee396342","revisionTime":"2018-06-20T16:45:32Z"},
{"path":"github.com/hashicorp/vault/builtin/logical/database/dbplugin","checksumSHA1":"RCwWixWwKG6j2vF9iVoxbCzo6p4=","revision":"c737968235c8673b872350f0a047877bee396342","revisionTime":"2018-06-20T16:45:32Z"},

Loading…
Cancel
Save