k3s/pkg/etcd/etcdproxy.go

144 lines
3.6 KiB
Go
Raw Normal View History

package etcd
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"time"
"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
)
// Proxy is the surface exposed by the etcd proxy: it accepts updates to the
// set of etcd member addresses and reports the URL and addresses that
// clients should use to reach etcd.
type Proxy interface {
	// Update replaces the set of etcd member addresses the proxy balances across.
	Update(addresses []string)

	// ETCDURL returns the etcd client URL.
	ETCDURL() string
	// ETCDServerURL returns the etcd server URL. In the etcdproxy
	// implementation in this file it yields the same value as ETCDURL.
	ETCDServerURL() string

	// ETCDAddresses returns the etcd member addresses known to the proxy.
	ETCDAddresses() []string
}
// httpClient is the shared client used for the health-check pings issued by
// createHealthCheck. It sets no client-level timeout; the health check bounds
// each request with its own context deadline.
//
// NOTE(review): TLS certificate verification is disabled, so the identity of
// the pinged server is not authenticated. Presumably acceptable because only
// reachability and the status code are inspected — confirm.
var httpClient = &http.Client{
	Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	},
}
// NewETCDProxy initializes a new proxy structure that contains a load balancer
// which listens on port 2379 and proxies between etcd cluster members.
//
// etcdURL is parsed up front: its host becomes the fallback address returned
// by ETCDAddresses and its port is recorded as the etcd port. An error is
// returned if etcdURL cannot be parsed or if the load balancer cannot be
// created; in the latter case the load balancer's error is returned unwrapped.
func NewETCDProxy(ctx context.Context, supervisorPort int, dataDir, etcdURL string, isIPv6 bool) (Proxy, error) {
	parsed, err := url.Parse(etcdURL)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse etcd client URL")
	}

	balancer, err := loadbalancer.New(ctx, dataDir, loadbalancer.ETCDServerServiceName, etcdURL, 2379, isIPv6)
	if err != nil {
		return nil, err
	}

	// Build the proxy in one step now that every fallible operation has succeeded.
	return &etcdproxy{
		dataDir:             dataDir,
		initialETCDURL:      etcdURL,
		etcdURL:             etcdURL,
		supervisorPort:      supervisorPort,
		disconnect:          map[string]context.CancelFunc{},
		etcdLB:              balancer,
		etcdLBURL:           balancer.LoadBalancerServerURL(),
		fallbackETCDAddress: parsed.Host,
		etcdPort:            parsed.Port(),
	}, nil
}
// etcdproxy is the Proxy implementation created by NewETCDProxy. It wraps a
// load balancer over the etcd members and tracks one health-check goroutine
// per balanced address.
type etcdproxy struct {
	// Values captured from the NewETCDProxy arguments.
	dataDir        string
	supervisorPort int // port used to build the health-check ping URL
	initialETCDURL string

	// etcdURL is what ETCDURL and ETCDServerURL return; it starts out equal
	// to initialETCDURL.
	etcdURL string
	// etcdPort is the port component of the URL given at construction.
	etcdPort string
	// fallbackETCDAddress is the host component of the URL given at
	// construction; ETCDAddresses returns it when etcdAddresses is empty.
	fallbackETCDAddress string
	// etcdAddresses is returned by ETCDAddresses when non-empty. It is not
	// assigned anywhere in this file.
	etcdAddresses []string

	// etcdLB is the underlying load balancer; etcdLBURL is the server URL it
	// reported when it was created.
	etcdLB    *loadbalancer.LoadBalancer
	etcdLBURL string

	// disconnect maps each address that has a health check registered to the
	// cancel function that stops that health check's polling goroutine.
	disconnect map[string]context.CancelFunc
}
// Update hands the new address list to the load balancer and then reconciles
// the health checks against the load balancer's resulting server addresses:
// every address without a health check gets one registered, and the health
// check of every address that is no longer present is cancelled and forgotten.
func (e *etcdproxy) Update(addresses []string) {
	e.etcdLB.Update(addresses)

	// Set of addresses the load balancer currently knows about.
	active := map[string]struct{}{}
	for _, addr := range e.etcdLB.ServerAddresses {
		active[addr] = struct{}{}
		if _, tracked := e.disconnect[addr]; tracked {
			continue
		}
		// New address: start a health check and remember how to stop it.
		checkCtx, stop := context.WithCancel(context.Background())
		e.disconnect[addr] = stop
		e.etcdLB.SetHealthCheck(addr, e.createHealthCheck(checkCtx, addr))
	}

	// Stop the polling goroutines of addresses that have gone away.
	for addr, stop := range e.disconnect {
		if _, ok := active[addr]; ok {
			continue
		}
		stop()
		delete(e.disconnect, addr)
	}
}
// ETCDURL returns the proxy's etcd URL, which is initialised to the URL passed
// to NewETCDProxy.
func (e *etcdproxy) ETCDURL() string {
	return e.etcdURL
}
// ETCDAddresses returns the recorded etcd addresses, or a single-element slice
// holding the fallback address (the host of the URL given at construction)
// when none have been recorded.
func (e *etcdproxy) ETCDAddresses() []string {
	if len(e.etcdAddresses) == 0 {
		return []string{e.fallbackETCDAddress}
	}
	return e.etcdAddresses
}
// ETCDServerURL returns the etcd server URL. It reads the same field as
// ETCDURL, so both methods report the same value.
func (e *etcdproxy) ETCDServerURL() string {
	return e.etcdURL
}
// createHealthCheck starts a polling goroutine that makes periodic requests to
// the /ping endpoint on the etcd node's supervisor port, and returns a function
// reporting the outcome of the most recent request. If a request fails or
// returns a status other than 200 OK, the node is reported unhealthy until a
// later request succeeds.
//
// address is the etcd member address; only its host part is used, combined
// with the proxy's supervisor port. The goroutine runs until ctx is cancelled.
//
// NOTE(review): connected is written by the polling goroutine and read by the
// returned function without synchronization; if the load balancer calls the
// function from another goroutine this is a data race. Consider an atomic.
func (e etcdproxy) createHealthCheck(ctx context.Context, address string) func() bool {
	// Assume that the connection to the server will succeed, to avoid failing health checks while attempting to connect.
	// If we cannot connect, connected will be set to false when the initial connection attempt fails.
	connected := true

	host, _, err := net.SplitHostPort(address)
	if err != nil {
		// address is not in host:port form. Treat it as a bare host rather than
		// pinging an empty host, which would resolve to the local machine and
		// report on the wrong server.
		host = address
	}
	pingURL := fmt.Sprintf("https://%s/ping", net.JoinHostPort(host, strconv.Itoa(e.supervisorPort)))

	// Poll with a 5s base period and a jitter factor of 1.0 until ctx is done.
	go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
		// Bound each probe so a hung server cannot stall the polling loop.
		ctx, cancel := context.WithTimeout(ctx, time.Second)
		defer cancel()

		var statusCode int
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, pingURL, nil)
		if err == nil {
			// Only issue the request if it could be built; passing a nil
			// request to Do would panic.
			var resp *http.Response
			resp, err = httpClient.Do(req)
			if resp != nil {
				statusCode = resp.StatusCode
				// Release the response body so the underlying connection is
				// not leaked on every poll.
				resp.Body.Close()
			}
		}

		if err != nil || statusCode != http.StatusOK {
			logrus.Debugf("Health check %s failed: %v (StatusCode: %d)", address, err, statusCode)
			connected = false
		} else {
			connected = true
		}
	}, 5*time.Second, 1.0, true)

	return func() bool {
		return connected
	}
}