consul/test/integration/consul-container/libs/cluster/cluster.go

384 lines
9.8 KiB
Go

package cluster
import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"testing"
"time"
"github.com/hashicorp/serf/serf"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/teris-io/shortid"
"github.com/testcontainers/testcontainers-go"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
libagent "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
)
// Cluster provides an interface for creating and controlling a Consul cluster
// in integration tests, with agents running in containers.
// These fields are public in the event someone might want to surgically
// craft a test case.
type Cluster struct {
Agents []libagent.Agent
BuildContext *libagent.BuildContext
CACert string
CAKey string
ID string
Index int
Network testcontainers.Network
NetworkName string
}
// New creates a Consul cluster. An agent will be started for each of the given
// configs and joined to the cluster.
//
// A cluster has its own docker network for DNS connectivity, but is also joined
func New(configs []libagent.Config) (*Cluster, error) {
id, err := shortid.Generate()
if err != nil {
return nil, errors.Wrap(err, "could not cluster id")
}
name := fmt.Sprintf("consul-int-cluster-%s", id)
network, err := createNetwork(name)
if err != nil {
return nil, errors.Wrap(err, "could not create cluster container network")
}
cluster := Cluster{
ID: id,
Network: network,
NetworkName: name,
}
if err := cluster.Add(configs); err != nil {
return nil, errors.Wrap(err, "could not start or join all agents")
}
return &cluster, nil
}
// Add starts an agent with the given configuration and joins it with the existing cluster
func (c *Cluster) Add(configs []libagent.Config) error {
agents := make([]libagent.Agent, len(configs))
for idx, conf := range configs {
n, err := libagent.NewConsulContainer(context.Background(), conf, c.NetworkName, c.Index)
if err != nil {
return errors.Wrapf(err, "could not add container index %d", idx)
}
agents[idx] = n
c.Index++
}
if err := c.Join(agents); err != nil {
return errors.Wrapf(err, "could not join agent")
}
return nil
}
// Join joins the given agent to the cluster.
func (c *Cluster) Join(agents []libagent.Agent) error {
var joinAddr string
if len(c.Agents) >= 1 {
joinAddr, _ = c.Agents[0].GetAddr()
} else if len(agents) >= 1 {
joinAddr, _ = agents[0].GetAddr()
}
for _, n := range agents {
err := n.GetClient().Agent().Join(joinAddr, false)
if err != nil {
return errors.Wrapf(err, "could not join agent %s to %s", n.GetName(), joinAddr)
}
c.Agents = append(c.Agents, n)
}
return nil
}
// Remove instructs the agent to leave the cluster then removes it
// from the cluster Agent list.
func (c *Cluster) Remove(n libagent.Agent) error {
err := n.GetClient().Agent().Leave()
if err != nil {
return errors.Wrapf(err, "could not remove agent %s", n.GetName())
}
foundIdx := -1
for idx, this := range c.Agents {
if this == n {
foundIdx = idx
break
}
}
if foundIdx == -1 {
return errors.New("could not find agent in cluster")
}
c.Agents = append(c.Agents[:foundIdx], c.Agents[foundIdx+1:]...)
return nil
}
// StandardUpgrade upgrades a running consul cluster following the steps from
//
// https://developer.hashicorp.com/consul/docs/upgrading#standard-upgrades
//
// - takes a snapshot
// - terminate and rejoin the pod of new version of consul
func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersion string) error {
retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 1 * time.Second}, t, func(r *retry.R) {
// NOTE: to suppress flakiness
execCode, err := c.Agents[0].Exec(context.Background(), []string{"consul", "snapshot", "save", "backup.snap"})
require.Equal(r, 0, execCode)
require.NoError(r, err)
})
// verify only the leader can take a snapshot
snapshotCount := 0
for _, agent := range c.Agents {
if !agent.IsServer() {
continue
}
files, err := ioutil.ReadDir(filepath.Join(agent.DataDir(), "raft", "snapshots"))
if err != nil {
return err
}
if len(files) >= 1 {
snapshotCount++
}
}
require.Equalf(t, 1, snapshotCount, "only leader agent can have a snapshot file")
// Upgrade individual agent to the target version in the following order
// 1. followers
// 2. leader
// 3. clients (TODO)
leader, err := c.Leader()
client := leader.GetClient()
require.NoError(t, err)
t.Log("Leader name:", leader.GetName())
followers, err := c.Followers()
require.NoError(t, err)
t.Log("The number of followers", len(followers))
for _, agent := range followers {
t.Log("Upgrade follower", agent.GetName())
config := agent.GetConfig()
config.Version = targetVersion
err = agent.Upgrade(context.Background(), config)
if err != nil {
return err
}
WaitForMembers(t, client, len(c.Agents))
break
}
if len(followers) > 0 {
client = followers[0].GetClient()
}
t.Log("Upgrade leader:", leader.GetName())
config := leader.GetConfig()
config.Version = targetVersion
err = leader.Upgrade(context.Background(), config)
if err != nil {
return err
}
WaitForMembers(t, client, len(c.Agents))
return nil
}
// Terminate will attempt to terminate all agents in the cluster and its network. If any agent
// termination fails, Terminate will abort and return an error.
func (c *Cluster) Terminate() error {
for _, n := range c.Agents {
err := n.Terminate()
if err != nil {
return err
}
}
// Testcontainers seems to clean this the network.
// Trigger it now will throw an error while the containers are still shutting down
//if err := c.Network.Remove(context.Background()); err != nil {
// return errors.Wrapf(err, "could not terminate cluster network %s", c.ID)
//}
return nil
}
// Leader returns the cluster leader agent, or an error if no leader is
// available.
func (c *Cluster) Leader() (libagent.Agent, error) {
if len(c.Agents) < 1 {
return nil, fmt.Errorf("no agent available")
}
n0 := c.Agents[0]
leaderAdd, err := getLeader(n0.GetClient())
if err != nil {
return nil, err
}
for _, n := range c.Agents {
addr, _ := n.GetAddr()
if strings.Contains(leaderAdd, addr) {
return n, nil
}
}
return nil, fmt.Errorf("leader not found")
}
// GetClient returns a consul API client to the node if node is provided.
// Otherwise, GetClient returns the API client to the first node of either
// server or client agent.
func (c *Cluster) GetClient(node libagent.Agent, isServer bool) (*api.Client, error) {
var err error
if node != nil {
return node.GetClient(), err
}
nodes, err := c.Clients()
if isServer {
nodes, err = c.Servers()
}
if err != nil {
return nil, fmt.Errorf("unable to get the api client: %s", err)
}
if len(nodes) <= 0 {
return nil, fmt.Errorf("not enough node: %d", len(nodes))
}
return nodes[0].GetClient(), err
}
func getLeader(client *api.Client) (string, error) {
leaderAdd, err := client.Status().Leader()
if err != nil {
return "", errors.Wrap(err, "could not query leader")
}
if leaderAdd == "" {
return "", errors.New("no leader available")
}
return leaderAdd, nil
}
// Followers returns the cluster following servers.
func (c *Cluster) Followers() ([]libagent.Agent, error) {
var followers []libagent.Agent
leader, err := c.Leader()
if err != nil {
return nil, fmt.Errorf("could not determine leader: %w", err)
}
for _, n := range c.Agents {
if n != leader && n.IsServer() {
followers = append(followers, n)
}
}
return followers, nil
}
// Servers returns the handle to server agents
func (c *Cluster) Servers() ([]libagent.Agent, error) {
var servers []libagent.Agent
for _, n := range c.Agents {
if n.IsServer() {
servers = append(servers, n)
}
}
return servers, nil
}
// Clients returns the handle to client agents
func (c *Cluster) Clients() ([]libagent.Agent, error) {
var clients []libagent.Agent
for _, n := range c.Agents {
if !n.IsServer() {
clients = append(clients, n)
}
}
return clients, nil
}
// PeerWithCluster establishes peering with the acceptor cluster
func (c *Cluster) PeerWithCluster(acceptingClient *api.Client, acceptingPeerName string, dialingPeerName string) error {
node := c.Agents[0]
dialingClient := node.GetClient()
generateReq := api.PeeringGenerateTokenRequest{
PeerName: acceptingPeerName,
}
generateRes, _, err := acceptingClient.Peerings().GenerateToken(context.Background(), generateReq, &api.WriteOptions{})
if err != nil {
return fmt.Errorf("error generate token: %v", err)
}
establishReq := api.PeeringEstablishRequest{
PeerName: dialingPeerName,
PeeringToken: generateRes.PeeringToken,
}
_, _, err = dialingClient.Peerings().Establish(context.Background(), establishReq, &api.WriteOptions{})
if err != nil {
return fmt.Errorf("error establish peering: %v", err)
}
return nil
}
const retryTimeout = 90 * time.Second
const retryFrequency = 500 * time.Millisecond
func LongFailer() *retry.Timer {
return &retry.Timer{Timeout: retryTimeout, Wait: retryFrequency}
}
func WaitForLeader(t *testing.T, cluster *Cluster, client *api.Client) {
retry.RunWith(LongFailer(), t, func(r *retry.R) {
leader, err := cluster.Leader()
require.NoError(r, err)
require.NotEmpty(r, leader)
})
if client != nil {
waitForLeaderFromClient(t, client)
}
}
func waitForLeaderFromClient(t *testing.T, client *api.Client) {
retry.RunWith(LongFailer(), t, func(r *retry.R) {
leader, err := getLeader(client)
require.NoError(r, err)
require.NotEmpty(r, leader)
})
}
func WaitForMembers(t *testing.T, client *api.Client, expectN int) {
retry.RunWith(LongFailer(), t, func(r *retry.R) {
members, err := client.Agent().Members(false)
var activeMembers int
for _, member := range members {
if serf.MemberStatus(member.Status) == serf.StatusAlive {
activeMembers++
}
}
require.NoError(r, err)
require.Equal(r, expectN, activeMembers)
})
}