package testutil // TestServer is a test helper. It uses a fork/exec model to create // a test Consul server instance in the background and initialize it // with some data and/or services. The test server can then be used // to run a unit test, and offers an easy API to tear itself down // when the test has completed. The only prerequisite is to have a consul // binary available on the $PATH. // // This package does not use Consul's official API client. This is // because we use TestServer to test the API client, which would // otherwise cause an import cycle. import ( "bytes" "encoding/base64" "encoding/json" "fmt" "io" "io/ioutil" "net" "net/http" "os" "os/exec" "strings" "sync/atomic" "testing" ) // offset is used to atomically increment the port numbers. var offset uint64 // TestPortConfig configures the various ports used for services // provided by the Consul server. type TestPortConfig struct { DNS int `json:"dns,omitempty"` HTTP int `json:"http,omitempty"` RPC int `json:"rpc,omitempty"` SerfLan int `json:"serf_lan,omitempty"` SerfWan int `json:"serf_wan,omitempty"` Server int `json:"server,omitempty"` } // TestAddressConfig contains the bind addresses for various // components of the Consul server. type TestAddressConfig struct { HTTP string `json:"http,omitempty"` } // TestServerConfig is the main server configuration struct. type TestServerConfig struct { NodeName string `json:"node_name"` Bootstrap bool `json:"bootstrap,omitempty"` Server bool `json:"server,omitempty"` DataDir string `json:"data_dir,omitempty"` Datacenter string `json:"datacenter,omitempty"` DisableCheckpoint bool `json:"disable_update_check"` LogLevel string `json:"log_level,omitempty"` Bind string `json:"bind_addr,omitempty"` Addresses *TestAddressConfig `json:"addresses,omitempty"` Ports *TestPortConfig `json:"ports,omitempty"` ACLMasterToken string `json:"acl_master_token,omitempty"` ACLDatacenter string `json:"acl_datacenter,omitempty"` ACLDefaultPolicy string `json:"acl_default_policy,omitempty"` Stdout, Stderr io.Writer `json:"-"` } // ServerConfigCallback is a function interface which can be // passed to NewTestServerConfig to modify the server config. type ServerConfigCallback func(c *TestServerConfig) // defaultServerConfig returns a new TestServerConfig struct // with all of the listen ports incremented by one. func defaultServerConfig() *TestServerConfig { idx := int(atomic.AddUint64(&offset, 1)) return &TestServerConfig{ NodeName: fmt.Sprintf("node%d", idx), DisableCheckpoint: true, Bootstrap: true, Server: true, LogLevel: "debug", Bind: "127.0.0.1", Addresses: &TestAddressConfig{}, Ports: &TestPortConfig{ DNS: 20000 + idx, HTTP: 21000 + idx, RPC: 22000 + idx, SerfLan: 23000 + idx, SerfWan: 24000 + idx, Server: 25000 + idx, }, } } // TestService is used to serialize a service definition. type TestService struct { ID string `json:",omitempty"` Name string `json:",omitempty"` Tags []string `json:",omitempty"` Address string `json:",omitempty"` Port int `json:",omitempty"` } // TestCheck is used to serialize a check definition. type TestCheck struct { ID string `json:",omitempty"` Name string `json:",omitempty"` ServiceID string `json:",omitempty"` TTL string `json:",omitempty"` } // TestKVResponse is what we use to decode KV data. type TestKVResponse struct { Value string } // TestServer is the main server wrapper struct. type TestServer struct { PID int Config *TestServerConfig t *testing.T HTTPAddr string LANAddr string WANAddr string HttpClient *http.Client } // NewTestServer is an easy helper method to create a new Consul // test server with the most basic configuration. func NewTestServer(t *testing.T) *TestServer { return NewTestServerConfig(t, nil) } // NewTestServerConfig creates a new TestServer, and makes a call to // an optional callback function to modify the configuration. func NewTestServerConfig(t *testing.T, cb ServerConfigCallback) *TestServer { if path, err := exec.LookPath("consul"); err != nil || path == "" { t.Skip("consul not found on $PATH, skipping") } dataDir, err := ioutil.TempDir("", "consul") if err != nil { t.Fatalf("err: %s", err) } configFile, err := ioutil.TempFile(dataDir, "config") if err != nil { defer os.RemoveAll(dataDir) t.Fatalf("err: %s", err) } consulConfig := defaultServerConfig() consulConfig.DataDir = dataDir if cb != nil { cb(consulConfig) } configContent, err := json.Marshal(consulConfig) if err != nil { t.Fatalf("err: %s", err) } if _, err := configFile.Write(configContent); err != nil { t.Fatalf("err: %s", err) } configFile.Close() stdout := io.Writer(os.Stdout) if consulConfig.Stdout != nil { stdout = consulConfig.Stdout } stderr := io.Writer(os.Stderr) if consulConfig.Stderr != nil { stderr = consulConfig.Stderr } // Start the server cmd := exec.Command("consul", "agent", "-config-file", configFile.Name()) cmd.Stdout = stdout cmd.Stderr = stderr if err := cmd.Start(); err != nil { t.Fatalf("err: %s", err) } var httpAddr string var client *http.Client if strings.HasPrefix(consulConfig.Addresses.HTTP, "unix://") { httpAddr = consulConfig.Addresses.HTTP client = &http.Client{ Transport: &http.Transport{ Dial: func(_, _ string) (net.Conn, error) { return net.Dial("unix", httpAddr[7:]) }, }, } } else { httpAddr = fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.HTTP) client = http.DefaultClient } server := &TestServer{ Config: consulConfig, PID: cmd.Process.Pid, t: t, HTTPAddr: httpAddr, LANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfLan), WANAddr: fmt.Sprintf("127.0.0.1:%d", consulConfig.Ports.SerfWan), HttpClient: client, } // Wait for the server to be ready if consulConfig.Bootstrap { server.waitForLeader() } else { server.waitForAPI() } return server } // Stop stops the test Consul server, and removes the Consul data // directory once we are done. func (s *TestServer) Stop() { defer os.RemoveAll(s.Config.DataDir) cmd := exec.Command("kill", "-9", fmt.Sprintf("%d", s.PID)) if err := cmd.Run(); err != nil { s.t.Errorf("err: %s", err) } } // waitForAPI waits for only the agent HTTP endpoint to start // responding. This is an indication that the agent has started, // but will likely return before a leader is elected. func (s *TestServer) waitForAPI() { WaitForResult(func() (bool, error) { resp, err := s.HttpClient.Get(s.url("/v1/agent/self")) if err != nil { return false, err } defer resp.Body.Close() if err := s.requireOK(resp); err != nil { return false, err } return true, nil }, func(err error) { defer s.Stop() s.t.Fatalf("err: %s", err) }) } // waitForLeader waits for the Consul server's HTTP API to become // available, and then waits for a known leader and an index of // 1 or more to be observed to confirm leader election is done. func (s *TestServer) waitForLeader() { WaitForResult(func() (bool, error) { // Query the API and check the status code resp, err := s.HttpClient.Get(s.url("/v1/catalog/nodes")) if err != nil { return false, err } defer resp.Body.Close() if err := s.requireOK(resp); err != nil { return false, err } // Ensure we have a leader and a node registeration if leader := resp.Header.Get("X-Consul-KnownLeader"); leader != "true" { fmt.Println(leader) return false, fmt.Errorf("Consul leader status: %#v", leader) } if resp.Header.Get("X-Consul-Index") == "0" { return false, fmt.Errorf("Consul index is 0") } return true, nil }, func(err error) { defer s.Stop() s.t.Fatalf("err: %s", err) }) } // url is a helper function which takes a relative URL and // makes it into a proper URL against the local Consul server. func (s *TestServer) url(path string) string { return fmt.Sprintf("http://127.0.0.1:%d%s", s.Config.Ports.HTTP, path) } // requireOK checks the HTTP response code and ensures it is acceptable. func (s *TestServer) requireOK(resp *http.Response) error { if resp.StatusCode != 200 { return fmt.Errorf("Bad status code: %d", resp.StatusCode) } return nil } // put performs a new HTTP PUT request. func (s *TestServer) put(path string, body io.Reader) *http.Response { req, err := http.NewRequest("PUT", s.url(path), body) if err != nil { s.t.Fatalf("err: %s", err) } resp, err := s.HttpClient.Do(req) if err != nil { s.t.Fatalf("err: %s", err) } if err := s.requireOK(resp); err != nil { defer resp.Body.Close() s.t.Fatal(err) } return resp } // get performs a new HTTP GET request. func (s *TestServer) get(path string) *http.Response { resp, err := s.HttpClient.Get(s.url(path)) if err != nil { s.t.Fatalf("err: %s", err) } if err := s.requireOK(resp); err != nil { defer resp.Body.Close() s.t.Fatal(err) } return resp } // encodePayload returns a new io.Reader wrapping the encoded contents // of the payload, suitable for passing directly to a new request. func (s *TestServer) encodePayload(payload interface{}) io.Reader { var encoded bytes.Buffer enc := json.NewEncoder(&encoded) if err := enc.Encode(payload); err != nil { s.t.Fatalf("err: %s", err) } return &encoded } // JoinLAN is used to join nodes within the same datacenter. func (s *TestServer) JoinLAN(addr string) { resp := s.get("/v1/agent/join/" + addr) resp.Body.Close() } // JoinWAN is used to join remote datacenters together. func (s *TestServer) JoinWAN(addr string) { resp := s.get("/v1/agent/join/" + addr + "?wan=1") resp.Body.Close() } // SetKV sets an individual key in the K/V store. func (s *TestServer) SetKV(key string, val []byte) { resp := s.put("/v1/kv/"+key, bytes.NewBuffer(val)) resp.Body.Close() } // GetKV retrieves a single key and returns its value func (s *TestServer) GetKV(key string) []byte { resp := s.get("/v1/kv/" + key) defer resp.Body.Close() raw, err := ioutil.ReadAll(resp.Body) if err != nil { s.t.Fatalf("err: %s", err) } var result []*TestKVResponse if err := json.Unmarshal(raw, &result); err != nil { s.t.Fatalf("err: %s", err) } if len(result) < 1 { s.t.Fatalf("key does not exist: %s", key) } v, err := base64.StdEncoding.DecodeString(result[0].Value) if err != nil { s.t.Fatalf("err: %s", err) } return []byte(v) } // PopulateKV fills the Consul KV with data from a generic map. func (s *TestServer) PopulateKV(data map[string][]byte) { for k, v := range data { s.SetKV(k, v) } } // ListKV returns a list of keys present in the KV store. This will list all // keys under the given prefix recursively and return them as a slice. func (s *TestServer) ListKV(prefix string) []string { resp := s.get("/v1/kv/" + prefix + "?keys") defer resp.Body.Close() raw, err := ioutil.ReadAll(resp.Body) if err != nil { s.t.Fatalf("err: %s", err) } var result []string if err := json.Unmarshal(raw, &result); err != nil { s.t.Fatalf("err: %s", err) } return result } // AddService adds a new service to the Consul instance. It also // automatically adds a health check with the given status, which // can be one of "passing", "warning", or "critical". func (s *TestServer) AddService(name, status string, tags []string) { svc := &TestService{ Name: name, Tags: tags, } payload := s.encodePayload(svc) s.put("/v1/agent/service/register", payload) chkName := "service:" + name chk := &TestCheck{ Name: chkName, ServiceID: name, TTL: "10m", } payload = s.encodePayload(chk) s.put("/v1/agent/check/register", payload) switch status { case "passing": s.put("/v1/agent/check/pass/"+chkName, nil) case "warning": s.put("/v1/agent/check/warn/"+chkName, nil) case "critical": s.put("/v1/agent/check/fail/"+chkName, nil) default: s.t.Fatalf("Unrecognized status: %s", status) } } // AddCheck adds a check to the Consul instance. If the serviceID is // left empty (""), then the check will be associated with the node. // The check status may be "passing", "warning", or "critical". func (s *TestServer) AddCheck(name, serviceID, status string) { chk := &TestCheck{ ID: name, Name: name, TTL: "10m", } if serviceID != "" { chk.ServiceID = serviceID } payload := s.encodePayload(chk) s.put("/v1/agent/check/register", payload) switch status { case "passing": s.put("/v1/agent/check/pass/"+name, nil) case "warning": s.put("/v1/agent/check/warn/"+name, nil) case "critical": s.put("/v1/agent/check/fail/"+name, nil) default: s.t.Fatalf("Unrecognized status: %s", status) } }