mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
473 lines
13 KiB
473 lines
13 KiB
package testutil |
|
|
|
// TestServer is a test helper. It uses a fork/exec model to create |
|
// a test Consul server instance in the background and initialize it |
|
// with some data and/or services. The test server can then be used |
|
// to run a unit test, and offers an easy API to tear itself down |
|
// when the test has completed. The only prerequisite is to have a consul |
|
// binary available on the $PATH. |
|
// |
|
// This package does not use Consul's official API client. This is |
|
// because we use TestServer to test the API client, which would |
|
// otherwise cause an import cycle. |
|
|
|
import ( |
|
"bytes" |
|
"encoding/base64" |
|
"encoding/json" |
|
"fmt" |
|
"io" |
|
"io/ioutil" |
|
"net" |
|
"net/http" |
|
"os" |
|
"os/exec" |
|
"strings" |
|
"sync/atomic" |
|
"testing" |
|
|
|
"github.com/hashicorp/go-cleanhttp" |
|
) |
|
|
|
// offset is used to atomically increment the port numbers. |
|
var offset uint64 |
|
|
|
// TestPortConfig configures the various ports used for services |
|
// provided by the Consul server. |
|
type TestPortConfig struct { |
|
DNS int `json:"dns,omitempty"` |
|
HTTP int `json:"http,omitempty"` |
|
RPC int `json:"rpc,omitempty"` |
|
SerfLan int `json:"serf_lan,omitempty"` |
|
SerfWan int `json:"serf_wan,omitempty"` |
|
Server int `json:"server,omitempty"` |
|
} |
|
|
|
// TestAddressConfig contains the bind addresses for various |
|
// components of the Consul server. |
|
type TestAddressConfig struct { |
|
HTTP string `json:"http,omitempty"` |
|
} |
|
|
|
// TestServerConfig is the main server configuration struct. |
|
type TestServerConfig struct { |
|
NodeName string `json:"node_name"` |
|
Bootstrap bool `json:"bootstrap,omitempty"` |
|
Server bool `json:"server,omitempty"` |
|
DataDir string `json:"data_dir,omitempty"` |
|
Datacenter string `json:"datacenter,omitempty"` |
|
DisableCheckpoint bool `json:"disable_update_check"` |
|
LogLevel string `json:"log_level,omitempty"` |
|
Bind string `json:"bind_addr,omitempty"` |
|
Addresses *TestAddressConfig `json:"addresses,omitempty"` |
|
Ports *TestPortConfig `json:"ports,omitempty"` |
|
ACLMasterToken string `json:"acl_master_token,omitempty"` |
|
ACLDatacenter string `json:"acl_datacenter,omitempty"` |
|
ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` |
|
Stdout, Stderr io.Writer `json:"-"` |
|
} |
|
|
|
// ServerConfigCallback is a function interface which can be |
|
// passed to NewTestServerConfig to modify the server config. |
|
type ServerConfigCallback func(c *TestServerConfig) |
|
|
|
// defaultServerConfig returns a new TestServerConfig struct |
|
// with all of the listen ports incremented by one. |
|
func defaultServerConfig() *TestServerConfig { |
|
idx := int(atomic.AddUint64(&offset, 1)) |
|
|
|
return &TestServerConfig{ |
|
NodeName: fmt.Sprintf("node%d", idx), |
|
DisableCheckpoint: true, |
|
Bootstrap: true, |
|
Server: true, |
|
LogLevel: "debug", |
|
Bind: "127.0.0.1", |
|
Addresses: &TestAddressConfig{}, |
|
Ports: &TestPortConfig{ |
|
DNS: 20000 + idx, |
|
HTTP: 21000 + idx, |
|
RPC: 22000 + idx, |
|
SerfLan: 23000 + idx, |
|
SerfWan: 24000 + idx, |
|
Server: 25000 + idx, |
|
}, |
|
} |
|
} |
|
|
|
// TestService is used to serialize a service definition. |
|
type TestService struct { |
|
ID string `json:",omitempty"` |
|
Name string `json:",omitempty"` |
|
Tags []string `json:",omitempty"` |
|
Address string `json:",omitempty"` |
|
Port int `json:",omitempty"` |
|
} |
|
|
|
// TestCheck is used to serialize a check definition. |
|
type TestCheck struct { |
|
ID string `json:",omitempty"` |
|
Name string `json:",omitempty"` |
|
ServiceID string `json:",omitempty"` |
|
TTL string `json:",omitempty"` |
|
} |
|
|
|
// TestKVResponse is what we use to decode KV data. |
|
type TestKVResponse struct { |
|
Value string |
|
} |
|
|
|
// TestServer is the main server wrapper struct. |
|
type TestServer struct { |
|
PID int |
|
Config *TestServerConfig |
|
t *testing.T |
|
|
|
HTTPAddr string |
|
LANAddr string |
|
WANAddr string |
|
|
|
HttpClient *http.Client |
|
} |
|
|
|
// NewTestServer is an easy helper method to create a new Consul |
|
// test server with the most basic configuration. |
|
func NewTestServer(t *testing.T) *TestServer { |
|
return NewTestServerConfig(t, nil) |
|
} |
|
|
|
// NewTestServerConfig creates a new TestServer, and makes a call to |
|
// an optional callback function to modify the configuration. |
|
func NewTestServerConfig(t *testing.T, cb ServerConfigCallback) *TestServer { |
|
if path, err := exec.LookPath("consul"); err != nil || path == "" { |
|
t.Skip("consul not found on $PATH, skipping") |
|
} |
|
|
|
dataDir, err := ioutil.TempDir("", "consul") |
|
if err != nil { |
|
t.Fatalf("err: %s", err) |
|
} |
|
|
|
configFile, err := ioutil.TempFile(dataDir, "config") |
|
if err != nil { |
|
defer os.RemoveAll(dataDir) |
|
t.Fatalf("err: %s", err) |
|
} |
|
|
|
consulConfig := defaultServerConfig() |
|
consulConfig.DataDir = dataDir |
|
|
|
if cb != nil { |
|
cb(consulConfig) |
|
} |
|
|
|
configContent, err := json.Marshal(consulConfig) |
|
if err != nil { |
|
t.Fatalf("err: %s", err) |
|
} |
|
|
|
if _, err := configFile.Write(configContent); err != nil { |
|
t.Fatalf("err: %s", err) |
|
} |
|
configFile.Close() |
|
|
|
stdout := io.Writer(os.Stdout) |
|
if consulConfig.Stdout != nil { |
|
stdout = consulConfig.Stdout |
|
} |
|
|
|
stderr := io.Writer(os.Stderr) |
|
if consulConfig.Stderr != nil { |
|
stderr = consulConfig.Stderr |
|
} |
|
|
|
// Start the server |
|
cmd := exec.Command("consul", "agent", "-config-file", configFile.Name()) |
|
cmd.Stdout = stdout |
|
cmd.Stderr = stderr |
|
if err := cmd.Start(); err != nil { |
|
t.Fatalf("err: %s", err) |
|
} |
|
|
|
var httpAddr string |
|
var client *http.Client |
|
if strings.HasPrefix(consulConfig.Addresses.HTTP, "unix://") { |
|
httpAddr = consulConfig.Addresses.HTTP |
|
trans := cleanhttp.DefaultTransport() |
|
trans.Dial = func(_, _ string) (net.Conn, error) { |
|
return net.Dial("unix", httpAddr[7:]) |
|
} |
|
client = &http.Client{ |
|
Transport: trans, |
|
} |
|
} else { |
|
httpAddr = fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.HTTP) |
|
client = cleanhttp.DefaultClient() |
|
} |
|
|
|
server := &TestServer{ |
|
Config: consulConfig, |
|
PID: cmd.Process.Pid, |
|
t: t, |
|
|
|
HTTPAddr: httpAddr, |
|
LANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfLan), |
|
WANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfWan), |
|
|
|
HttpClient: client, |
|
} |
|
|
|
// Wait for the server to be ready |
|
if consulConfig.Bootstrap { |
|
server.waitForLeader() |
|
} else { |
|
server.waitForAPI() |
|
} |
|
|
|
return server |
|
} |
|
|
|
// Stop stops the test Consul server, and removes the Consul data |
|
// directory once we are done. |
|
func (s *TestServer) Stop() { |
|
defer os.RemoveAll(s.Config.DataDir) |
|
|
|
cmd := exec.Command("kill", "-9", fmt.Sprintf("%d", s.PID)) |
|
if err := cmd.Run(); err != nil { |
|
s.t.Errorf("err: %s", err) |
|
} |
|
} |
|
|
|
// waitForAPI waits for only the agent HTTP endpoint to start |
|
// responding. This is an indication that the agent has started, |
|
// but will likely return before a leader is elected. |
|
func (s *TestServer) waitForAPI() { |
|
WaitForResult(func() (bool, error) { |
|
resp, err := s.HttpClient.Get(s.url("/v1/agent/self")) |
|
if err != nil { |
|
return false, err |
|
} |
|
defer resp.Body.Close() |
|
if err := s.requireOK(resp); err != nil { |
|
return false, err |
|
} |
|
return true, nil |
|
}, func(err error) { |
|
defer s.Stop() |
|
s.t.Fatalf("err: %s", err) |
|
}) |
|
} |
|
|
|
// waitForLeader waits for the Consul server's HTTP API to become |
|
// available, and then waits for a known leader and an index of |
|
// 1 or more to be observed to confirm leader election is done. |
|
func (s *TestServer) waitForLeader() { |
|
WaitForResult(func() (bool, error) { |
|
// Query the API and check the status code |
|
resp, err := s.HttpClient.Get(s.url("/v1/catalog/nodes")) |
|
if err != nil { |
|
return false, err |
|
} |
|
defer resp.Body.Close() |
|
if err := s.requireOK(resp); err != nil { |
|
return false, err |
|
} |
|
|
|
// Ensure we have a leader and a node registration |
|
if leader := resp.Header.Get("X-Consul-KnownLeader"); leader != "true" { |
|
fmt.Println(leader) |
|
return false, fmt.Errorf("Consul leader status: %#v", leader) |
|
} |
|
if resp.Header.Get("X-Consul-Index") == "0" { |
|
return false, fmt.Errorf("Consul index is 0") |
|
} |
|
return true, nil |
|
}, func(err error) { |
|
defer s.Stop() |
|
s.t.Fatalf("err: %s", err) |
|
}) |
|
} |
|
|
|
// url is a helper function which takes a relative URL and |
|
// makes it into a proper URL against the local Consul server. |
|
func (s *TestServer) url(path string) string { |
|
return fmt.Sprintf("http://127.0.0.1:%d%s", s.Config.Ports.HTTP, path) |
|
} |
|
|
|
// requireOK checks the HTTP response code and ensures it is acceptable. |
|
func (s *TestServer) requireOK(resp *http.Response) error { |
|
if resp.StatusCode != 200 { |
|
return fmt.Errorf("Bad status code: %d", resp.StatusCode) |
|
} |
|
return nil |
|
} |
|
|
|
// put performs a new HTTP PUT request. |
|
func (s *TestServer) put(path string, body io.Reader) *http.Response { |
|
req, err := http.NewRequest("PUT", s.url(path), body) |
|
if err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
resp, err := s.HttpClient.Do(req) |
|
if err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
if err := s.requireOK(resp); err != nil { |
|
defer resp.Body.Close() |
|
s.t.Fatal(err) |
|
} |
|
return resp |
|
} |
|
|
|
// get performs a new HTTP GET request. |
|
func (s *TestServer) get(path string) *http.Response { |
|
resp, err := s.HttpClient.Get(s.url(path)) |
|
if err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
if err := s.requireOK(resp); err != nil { |
|
defer resp.Body.Close() |
|
s.t.Fatal(err) |
|
} |
|
return resp |
|
} |
|
|
|
// encodePayload returns a new io.Reader wrapping the encoded contents |
|
// of the payload, suitable for passing directly to a new request. |
|
func (s *TestServer) encodePayload(payload interface{}) io.Reader { |
|
var encoded bytes.Buffer |
|
enc := json.NewEncoder(&encoded) |
|
if err := enc.Encode(payload); err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
return &encoded |
|
} |
|
|
|
// JoinLAN is used to join nodes within the same datacenter. |
|
func (s *TestServer) JoinLAN(addr string) { |
|
resp := s.get("/v1/agent/join/" + addr) |
|
resp.Body.Close() |
|
} |
|
|
|
// JoinWAN is used to join remote datacenters together. |
|
func (s *TestServer) JoinWAN(addr string) { |
|
resp := s.get("/v1/agent/join/" + addr + "?wan=1") |
|
resp.Body.Close() |
|
} |
|
|
|
// SetKV sets an individual key in the K/V store. |
|
func (s *TestServer) SetKV(key string, val []byte) { |
|
resp := s.put("/v1/kv/"+key, bytes.NewBuffer(val)) |
|
resp.Body.Close() |
|
} |
|
|
|
// GetKV retrieves a single key and returns its value |
|
func (s *TestServer) GetKV(key string) []byte { |
|
resp := s.get("/v1/kv/" + key) |
|
defer resp.Body.Close() |
|
|
|
raw, err := ioutil.ReadAll(resp.Body) |
|
if err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
|
|
var result []*TestKVResponse |
|
if err := json.Unmarshal(raw, &result); err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
if len(result) < 1 { |
|
s.t.Fatalf("key does not exist: %s", key) |
|
} |
|
|
|
v, err := base64.StdEncoding.DecodeString(result[0].Value) |
|
if err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
|
|
return []byte(v) |
|
} |
|
|
|
// PopulateKV fills the Consul KV with data from a generic map. |
|
func (s *TestServer) PopulateKV(data map[string][]byte) { |
|
for k, v := range data { |
|
s.SetKV(k, v) |
|
} |
|
} |
|
|
|
// ListKV returns a list of keys present in the KV store. This will list all |
|
// keys under the given prefix recursively and return them as a slice. |
|
func (s *TestServer) ListKV(prefix string) []string { |
|
resp := s.get("/v1/kv/" + prefix + "?keys") |
|
defer resp.Body.Close() |
|
|
|
raw, err := ioutil.ReadAll(resp.Body) |
|
if err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
|
|
var result []string |
|
if err := json.Unmarshal(raw, &result); err != nil { |
|
s.t.Fatalf("err: %s", err) |
|
} |
|
return result |
|
} |
|
|
|
// AddService adds a new service to the Consul instance. It also |
|
// automatically adds a health check with the given status, which |
|
// can be one of "passing", "warning", or "critical". |
|
func (s *TestServer) AddService(name, status string, tags []string) { |
|
svc := &TestService{ |
|
Name: name, |
|
Tags: tags, |
|
} |
|
payload := s.encodePayload(svc) |
|
s.put("/v1/agent/service/register", payload) |
|
|
|
chkName := "service:" + name |
|
chk := &TestCheck{ |
|
Name: chkName, |
|
ServiceID: name, |
|
TTL: "10m", |
|
} |
|
payload = s.encodePayload(chk) |
|
s.put("/v1/agent/check/register", payload) |
|
|
|
switch status { |
|
case "passing": |
|
s.put("/v1/agent/check/pass/"+chkName, nil) |
|
case "warning": |
|
s.put("/v1/agent/check/warn/"+chkName, nil) |
|
case "critical": |
|
s.put("/v1/agent/check/fail/"+chkName, nil) |
|
default: |
|
s.t.Fatalf("Unrecognized status: %s", status) |
|
} |
|
} |
|
|
|
// AddCheck adds a check to the Consul instance. If the serviceID is |
|
// left empty (""), then the check will be associated with the node. |
|
// The check status may be "passing", "warning", or "critical". |
|
func (s *TestServer) AddCheck(name, serviceID, status string) { |
|
chk := &TestCheck{ |
|
ID: name, |
|
Name: name, |
|
TTL: "10m", |
|
} |
|
if serviceID != "" { |
|
chk.ServiceID = serviceID |
|
} |
|
|
|
payload := s.encodePayload(chk) |
|
s.put("/v1/agent/check/register", payload) |
|
|
|
switch status { |
|
case "passing": |
|
s.put("/v1/agent/check/pass/"+name, nil) |
|
case "warning": |
|
s.put("/v1/agent/check/warn/"+name, nil) |
|
case "critical": |
|
s.put("/v1/agent/check/fail/"+name, nil) |
|
default: |
|
s.t.Fatalf("Unrecognized status: %s", status) |
|
} |
|
}
|
|
|