mirror of https://github.com/k3s-io/k3s
599 lines
16 KiB
Go
599 lines
16 KiB
Go
/**
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* to you under the Apache License, Version 2.0 (the
|
|
* "License"); you may not use this file except in compliance
|
|
* with the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package messenger
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/tls"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/golang/glog"
|
|
"github.com/mesos/mesos-go/upid"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
// Default timeouts used to seed the package-level ReadTimeout and
// WriteTimeout variables and the HTTP client/transport templates.
const (
	// DefaultReadTimeout is the initial value of ReadTimeout.
	DefaultReadTimeout = 10 * time.Second

	// DefaultWriteTimeout is the initial value of WriteTimeout.
	DefaultWriteTimeout = 10 * time.Second
)
|
|
|
|
var (
|
|
ReadTimeout = DefaultReadTimeout
|
|
WriteTimeout = DefaultWriteTimeout
|
|
|
|
discardOnStopError = fmt.Errorf("discarding message because transport is shutting down")
|
|
errNotStarted = errors.New("HTTP transport has not been started")
|
|
errTerminal = errors.New("HTTP transport is terminated")
|
|
errAlreadyRunning = errors.New("HTTP transport is already running")
|
|
|
|
httpTransport = http.Transport{
|
|
Dial: (&net.Dialer{
|
|
Timeout: 10 * time.Second,
|
|
KeepAlive: 30 * time.Second,
|
|
}).Dial,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
ResponseHeaderTimeout: DefaultReadTimeout,
|
|
}
|
|
|
|
// HttpClient is used for sending messages to remote processes
|
|
HttpClient = http.Client{
|
|
Timeout: DefaultReadTimeout,
|
|
}
|
|
)
|
|
|
|
// httpTransporter is a subset of the Transporter interface
|
|
type httpTransporter interface {
|
|
Send(ctx context.Context, msg *Message) error
|
|
Recv() (*Message, error)
|
|
Install(messageName string)
|
|
Start() (upid.UPID, <-chan error)
|
|
Stop(graceful bool) error
|
|
}
|
|
|
|
type notStartedState struct {
|
|
h *HTTPTransporter
|
|
}
|
|
|
|
type stoppedState struct{}
|
|
|
|
type runningState struct {
|
|
*notStartedState
|
|
}
|
|
|
|
/* -- not-started state */
|
|
|
|
func (s *notStartedState) Send(ctx context.Context, msg *Message) error { return errNotStarted }
|
|
func (s *notStartedState) Recv() (*Message, error) { return nil, errNotStarted }
|
|
func (s *notStartedState) Stop(graceful bool) error { return errNotStarted }
|
|
func (s *notStartedState) Install(messageName string) { s.h.install(messageName) }
|
|
func (s *notStartedState) Start() (upid.UPID, <-chan error) {
|
|
s.h.state = &runningState{s}
|
|
return s.h.start()
|
|
}
|
|
|
|
/* -- stopped state */
|
|
|
|
func (s *stoppedState) Send(ctx context.Context, msg *Message) error { return errTerminal }
|
|
func (s *stoppedState) Recv() (*Message, error) { return nil, errTerminal }
|
|
func (s *stoppedState) Stop(graceful bool) error { return errTerminal }
|
|
func (s *stoppedState) Install(messageName string) {}
|
|
func (s *stoppedState) Start() (upid.UPID, <-chan error) {
|
|
ch := make(chan error, 1)
|
|
ch <- errTerminal
|
|
return upid.UPID{}, ch
|
|
}
|
|
|
|
/* -- running state */
|
|
|
|
func (s *runningState) Send(ctx context.Context, msg *Message) error { return s.h.send(ctx, msg) }
|
|
func (s *runningState) Recv() (*Message, error) { return s.h.recv() }
|
|
func (s *runningState) Stop(graceful bool) error {
|
|
s.h.state = &stoppedState{}
|
|
return s.h.stop(graceful)
|
|
}
|
|
func (s *runningState) Start() (upid.UPID, <-chan error) {
|
|
ch := make(chan error, 1)
|
|
ch <- errAlreadyRunning
|
|
return upid.UPID{}, ch
|
|
}
|
|
|
|
// httpOpt is a functional option type
|
|
type httpOpt func(*HTTPTransporter)
|
|
|
|
// HTTPTransporter implements the interfaces of the Transporter.
|
|
type HTTPTransporter struct {
|
|
// If the host is empty("") then it will listen on localhost.
|
|
// If the port is empty("") then it will listen on random port.
|
|
upid upid.UPID
|
|
listener net.Listener // TODO(yifan): Change to TCPListener.
|
|
mux *http.ServeMux
|
|
tr *http.Transport
|
|
client *http.Client
|
|
messageQueue chan *Message
|
|
address net.IP // optional binding address
|
|
shouldQuit chan struct{}
|
|
stateLock sync.RWMutex // protect lifecycle (start/stop) funcs
|
|
state httpTransporter
|
|
server *http.Server
|
|
}
|
|
|
|
// NewHTTPTransporter creates a new http transporter with an optional binding address.
|
|
func NewHTTPTransporter(upid upid.UPID, address net.IP, opts ...httpOpt) *HTTPTransporter {
|
|
transport := httpTransport
|
|
client := HttpClient
|
|
client.Transport = &transport
|
|
mux := http.NewServeMux()
|
|
|
|
result := &HTTPTransporter{
|
|
upid: upid,
|
|
messageQueue: make(chan *Message, defaultQueueSize),
|
|
mux: mux,
|
|
client: &client,
|
|
tr: &transport,
|
|
address: address,
|
|
shouldQuit: make(chan struct{}),
|
|
server: &http.Server{
|
|
ReadTimeout: ReadTimeout,
|
|
WriteTimeout: WriteTimeout,
|
|
Handler: mux,
|
|
},
|
|
}
|
|
for _, f := range opts {
|
|
f(result)
|
|
}
|
|
result.state = ¬StartedState{result}
|
|
return result
|
|
}
|
|
|
|
func ServerTLSConfig(config *tls.Config, nextProto map[string]func(*http.Server, *tls.Conn, http.Handler)) httpOpt {
|
|
return func(transport *HTTPTransporter) {
|
|
transport.server.TLSConfig = config
|
|
transport.server.TLSNextProto = nextProto
|
|
}
|
|
}
|
|
|
|
func ClientTLSConfig(config *tls.Config, handshakeTimeout time.Duration) httpOpt {
|
|
return func(transport *HTTPTransporter) {
|
|
transport.tr.TLSClientConfig = config
|
|
transport.tr.TLSHandshakeTimeout = handshakeTimeout
|
|
}
|
|
}
|
|
|
|
func (t *HTTPTransporter) getState() httpTransporter {
|
|
t.stateLock.RLock()
|
|
defer t.stateLock.RUnlock()
|
|
return t.state
|
|
}
|
|
|
|
// Send sends the message to its specified upid.
|
|
func (t *HTTPTransporter) Send(ctx context.Context, msg *Message) (sendError error) {
|
|
return t.getState().Send(ctx, msg)
|
|
}
|
|
|
|
// mesosError describes a request that reached the remote process but was
// answered with an unexpected HTTP status.
type mesosError struct {
	errorCode int    // HTTP status code of the response
	upid      string // target process of the message
	uri       string // request URI of the message
	status    string // HTTP status line text of the response
}

// Error renders the target, the request URI and the quoted status text.
func (e *mesosError) Error() string {
	const format = "master %s rejected %s, returned status %q"
	return fmt.Sprintf(format, e.upid, e.uri, e.status)
}
|
|
|
|
// networkError wraps a failure that happened at the network layer while
// talking to a remote process.
type networkError struct {
	cause error // underlying error; must be non-nil
}

// Error returns the message of the wrapped cause unchanged.
func (e *networkError) Error() string {
	underlying := e.cause
	return underlying.Error()
}
|
|
|
|
// send delivers a message to a mesos component via HTTP, returns a mesosError if the
|
|
// communication with the remote process was successful but rejected. A networkError
|
|
// error indicates that communication with the remote process failed at the network layer.
|
|
func (t *HTTPTransporter) send(ctx context.Context, msg *Message) (sendError error) {
|
|
log.V(2).Infof("Sending message to %v via http\n", msg.UPID)
|
|
req, err := t.makeLibprocessRequest(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return t.httpDo(ctx, req, func(resp *http.Response, err error) error {
|
|
if err != nil {
|
|
log.V(1).Infof("Failed to POST: %v\n", err)
|
|
return &networkError{err}
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
// ensure master acknowledgement.
|
|
if (resp.StatusCode != http.StatusOK) && (resp.StatusCode != http.StatusAccepted) {
|
|
return &mesosError{
|
|
errorCode: resp.StatusCode,
|
|
upid: msg.UPID.String(),
|
|
uri: msg.RequestURI(),
|
|
status: resp.Status,
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (t *HTTPTransporter) httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-t.shouldQuit:
|
|
return discardOnStopError
|
|
default: // continue
|
|
}
|
|
|
|
c := make(chan error, 1)
|
|
go func() { c <- f(t.client.Do(req)) }()
|
|
select {
|
|
case <-ctx.Done():
|
|
t.tr.CancelRequest(req)
|
|
<-c // Wait for f to return.
|
|
return ctx.Err()
|
|
case err := <-c:
|
|
return err
|
|
case <-t.shouldQuit:
|
|
t.tr.CancelRequest(req)
|
|
<-c // Wait for f to return.
|
|
return discardOnStopError
|
|
}
|
|
}
|
|
|
|
// Recv returns the message, one at a time.
|
|
func (t *HTTPTransporter) Recv() (*Message, error) {
|
|
return t.getState().Recv()
|
|
}
|
|
|
|
func (t *HTTPTransporter) recv() (*Message, error) {
|
|
select {
|
|
default:
|
|
select {
|
|
case msg := <-t.messageQueue:
|
|
return msg, nil
|
|
case <-t.shouldQuit:
|
|
}
|
|
case <-t.shouldQuit:
|
|
}
|
|
return nil, discardOnStopError
|
|
}
|
|
|
|
// Install the request URI according to the message's name.
|
|
func (t *HTTPTransporter) Install(msgName string) {
|
|
t.getState().Install(msgName)
|
|
}
|
|
|
|
func (t *HTTPTransporter) install(msgName string) {
|
|
requestURI := fmt.Sprintf("/%s/%s", t.upid.ID, msgName)
|
|
t.mux.HandleFunc(requestURI, t.messageDecoder)
|
|
}
|
|
|
|
// loggedListener wraps a net.Listener and logs what happens on it.
type loggedListener struct {
	// delegate is the wrapped listener that performs the real work.
	delegate net.Listener
	// done, once closed, suppresses error logging in Accept and Close.
	done <-chan struct{}
}
|
|
|
|
func (l *loggedListener) Accept() (c net.Conn, err error) {
|
|
c, err = l.delegate.Accept()
|
|
if c != nil {
|
|
log.Infoln("accepted connection from", c.RemoteAddr())
|
|
c = logConnection(c)
|
|
} else if err != nil {
|
|
select {
|
|
case <-l.done:
|
|
default:
|
|
log.Errorln("failed to accept connection:", err.Error())
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (l *loggedListener) Close() (err error) {
|
|
err = l.delegate.Close()
|
|
if err != nil {
|
|
select {
|
|
case <-l.done:
|
|
default:
|
|
log.Errorln("error closing listener:", err.Error())
|
|
}
|
|
} else {
|
|
log.Infoln("closed listener")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Addr returns the address of the wrapped listener.
func (l *loggedListener) Addr() net.Addr {
	return l.delegate.Addr()
}
|
|
|
|
func logConnection(c net.Conn) net.Conn {
|
|
w := hex.Dumper(os.Stdout)
|
|
r := io.TeeReader(c, w)
|
|
return &loggedConnection{
|
|
Conn: c,
|
|
reader: r,
|
|
}
|
|
}
|
|
|
|
// loggedConnection is a net.Conn whose reads are served by reader instead
// of the embedded connection; all other methods are promoted from Conn.
type loggedConnection struct {
	net.Conn
	reader io.Reader // source of all Read calls
}

// Read reads from the substitute reader rather than from Conn directly.
func (c *loggedConnection) Read(b []byte) (int, error) {
	n, err := c.reader.Read(b)
	return n, err
}
|
|
|
|
// Listen starts listen on UPID. If UPID is empty, the transporter
|
|
// will listen on a random port, and then fill the UPID with the
|
|
// host:port it is listening.
|
|
func (t *HTTPTransporter) listen() error {
|
|
var host string
|
|
if t.address != nil {
|
|
host = t.address.String()
|
|
} else {
|
|
host = t.upid.Host
|
|
}
|
|
|
|
var port string
|
|
if t.upid.Port != "" {
|
|
port = t.upid.Port
|
|
} else {
|
|
port = "0"
|
|
}
|
|
|
|
// NOTE: Explicitly specifies IPv4 because Libprocess
|
|
// only supports IPv4 for now.
|
|
ln, err := net.Listen("tcp4", net.JoinHostPort(host, port))
|
|
if err != nil {
|
|
log.Errorf("HTTPTransporter failed to listen: %v\n", err)
|
|
return err
|
|
}
|
|
// Save the host:port in case they are not specified in upid.
|
|
host, port, _ = net.SplitHostPort(ln.Addr().String())
|
|
log.Infoln("listening on", host, "port", port)
|
|
|
|
if len(t.upid.Host) == 0 {
|
|
t.upid.Host = host
|
|
}
|
|
|
|
if len(t.upid.Port) == 0 || t.upid.Port == "0" {
|
|
t.upid.Port = port
|
|
}
|
|
|
|
if log.V(3) {
|
|
t.listener = &loggedListener{delegate: ln, done: t.shouldQuit}
|
|
} else {
|
|
t.listener = ln
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Start starts the http transporter
|
|
func (t *HTTPTransporter) Start() (upid.UPID, <-chan error) {
|
|
t.stateLock.Lock()
|
|
defer t.stateLock.Unlock()
|
|
return t.state.Start()
|
|
}
|
|
|
|
// start expects to be guarded by stateLock
|
|
func (t *HTTPTransporter) start() (upid.UPID, <-chan error) {
|
|
ch := make(chan error, 1)
|
|
if err := t.listen(); err != nil {
|
|
ch <- err
|
|
return upid.UPID{}, ch
|
|
}
|
|
|
|
// TODO(yifan): Set read/write deadline.
|
|
go func() {
|
|
err := t.server.Serve(t.listener)
|
|
select {
|
|
case <-t.shouldQuit:
|
|
log.V(1).Infof("HTTP server stopped because of shutdown")
|
|
ch <- nil
|
|
default:
|
|
if err != nil && log.V(1) {
|
|
log.Errorln("HTTP server stopped with error", err.Error())
|
|
} else {
|
|
log.V(1).Infof("HTTP server stopped")
|
|
}
|
|
ch <- err
|
|
t.Stop(false)
|
|
}
|
|
}()
|
|
return t.upid, ch
|
|
}
|
|
|
|
// Stop stops the http transporter by closing the listener.
|
|
func (t *HTTPTransporter) Stop(graceful bool) error {
|
|
t.stateLock.Lock()
|
|
defer t.stateLock.Unlock()
|
|
return t.state.Stop(graceful)
|
|
}
|
|
|
|
// stop expects to be guarded by stateLock
|
|
func (t *HTTPTransporter) stop(graceful bool) error {
|
|
close(t.shouldQuit)
|
|
|
|
log.Info("stopping HTTP transport")
|
|
|
|
//TODO(jdef) if graceful, wait for pending requests to terminate
|
|
|
|
err := t.listener.Close()
|
|
return err
|
|
}
|
|
|
|
// UPID returns the upid of the transporter.
|
|
func (t *HTTPTransporter) UPID() upid.UPID {
|
|
t.stateLock.Lock()
|
|
defer t.stateLock.Unlock()
|
|
return t.upid
|
|
}
|
|
|
|
func (t *HTTPTransporter) messageDecoder(w http.ResponseWriter, r *http.Request) {
|
|
// Verify it's a libprocess request.
|
|
from, err := getLibprocessFrom(r)
|
|
if err != nil {
|
|
log.Errorf("Ignoring the request, because it's not a libprocess request: %v\n", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
decoder := DecodeHTTP(w, r)
|
|
defer decoder.Cancel(true)
|
|
|
|
t.processRequests(from, decoder.Requests())
|
|
|
|
// log an error if there's one waiting, otherwise move on
|
|
select {
|
|
case err, ok := <-decoder.Err():
|
|
if ok {
|
|
log.Errorf("failed to decode HTTP message: %v", err)
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (t *HTTPTransporter) processRequests(from *upid.UPID, incoming <-chan *Request) {
|
|
for {
|
|
select {
|
|
case r, ok := <-incoming:
|
|
if !ok || !t.processOneRequest(from, r) {
|
|
return
|
|
}
|
|
case <-t.shouldQuit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *HTTPTransporter) processOneRequest(from *upid.UPID, request *Request) (keepGoing bool) {
|
|
// regardless of whether we write a Response we must close this chan
|
|
defer close(request.response)
|
|
keepGoing = true
|
|
|
|
//TODO(jdef) this is probably inefficient given the current implementation of the
|
|
// decoder: no need to make another copy of data that's already competely buffered
|
|
data, err := ioutil.ReadAll(request.Body)
|
|
if err != nil {
|
|
// this is unlikely given the current implementation of the decoder:
|
|
// the body has been completely buffered in memory already
|
|
log.Errorf("failed to read HTTP body: %v", err)
|
|
return
|
|
}
|
|
log.V(2).Infof("Receiving %q %v from %v, length %v", request.Method, request.URL, from, len(data))
|
|
m := &Message{
|
|
UPID: from,
|
|
Name: extractNameFromRequestURI(request.RequestURI),
|
|
Bytes: data,
|
|
}
|
|
|
|
// deterministic behavior and output..
|
|
select {
|
|
case <-t.shouldQuit:
|
|
keepGoing = false
|
|
select {
|
|
case t.messageQueue <- m:
|
|
default:
|
|
}
|
|
case t.messageQueue <- m:
|
|
select {
|
|
case <-t.shouldQuit:
|
|
keepGoing = false
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Only send back an HTTP response if this isn't from libprocess
|
|
// (which we determine by looking at the User-Agent). This is
|
|
// necessary because older versions of libprocess would try and
|
|
// recv the data and parse it as an HTTP request which would
|
|
// fail thus causing the socket to get closed (but now
|
|
// libprocess will ignore responses, see ignore_data).
|
|
// see https://github.com/apache/mesos/blob/adecbfa6a216815bd7dc7d26e721c4c87e465c30/3rdparty/libprocess/src/process.cpp#L2192
|
|
if _, ok := parseLibprocessAgent(request.Header); !ok {
|
|
log.V(2).Infof("not libprocess agent, sending a 202")
|
|
request.response <- Response{
|
|
code: 202,
|
|
reason: "Accepted",
|
|
} // should never block
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *HTTPTransporter) makeLibprocessRequest(msg *Message) (*http.Request, error) {
|
|
if msg.UPID == nil {
|
|
panic(fmt.Sprintf("message is missing UPID: %+v", msg))
|
|
}
|
|
hostport := net.JoinHostPort(msg.UPID.Host, msg.UPID.Port)
|
|
targetURL := fmt.Sprintf("http://%s%s", hostport, msg.RequestURI())
|
|
log.V(2).Infof("libproc target URL %s", targetURL)
|
|
req, err := http.NewRequest("POST", targetURL, bytes.NewReader(msg.Bytes))
|
|
if err != nil {
|
|
log.V(1).Infof("Failed to create request: %v\n", err)
|
|
return nil, err
|
|
}
|
|
if !msg.isV1API() {
|
|
req.Header.Add("Libprocess-From", t.upid.String())
|
|
req.Header.Add("Connection", "Keep-Alive")
|
|
}
|
|
req.Header.Add("Content-Type", "application/x-protobuf")
|
|
|
|
return req, nil
|
|
}
|
|
|
|
func getLibprocessFrom(r *http.Request) (*upid.UPID, error) {
|
|
if r.Method != "POST" {
|
|
return nil, fmt.Errorf("Not a POST request")
|
|
}
|
|
if agent, ok := parseLibprocessAgent(r.Header); ok {
|
|
return upid.Parse(agent)
|
|
}
|
|
lf, ok := r.Header["Libprocess-From"]
|
|
if ok {
|
|
// TODO(yifan): Just take the first field for now.
|
|
return upid.Parse(lf[0])
|
|
}
|
|
return nil, fmt.Errorf("Cannot find 'User-Agent' or 'Libprocess-From'")
|
|
}
|
|
|
|
// parseLibprocessAgent looks for a User-Agent value that starts with
// "libprocess/" and returns the remainder of the first such value together
// with true. It returns "" and false when no value matches. The header is
// looked up under the exact key "User-Agent".
func parseLibprocessAgent(h http.Header) (string, bool) {
	const prefix = "libprocess/"
	for _, candidate := range h["User-Agent"] {
		if !strings.HasPrefix(candidate, prefix) {
			continue
		}
		return strings.TrimPrefix(candidate, prefix), true
	}
	return "", false
}
|