mirror of https://github.com/k3s-io/k3s
377 lines
12 KiB
Go
377 lines
12 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
goruntime "runtime"
|
|
"strings"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apiserver/pkg/admission"
|
|
"k8s.io/apiserver/pkg/authorization/authorizer"
|
|
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
|
"k8s.io/apiserver/pkg/endpoints/metrics"
|
|
"k8s.io/apiserver/pkg/endpoints/request"
|
|
"k8s.io/apiserver/pkg/registry/rest"
|
|
"k8s.io/klog"
|
|
utiltrace "k8s.io/utils/trace"
|
|
)
|
|
|
|
// RequestScope encapsulates common fields across all RESTful handler methods.
|
|
type RequestScope struct {
|
|
Namer ScopeNamer
|
|
|
|
Serializer runtime.NegotiatedSerializer
|
|
runtime.ParameterCodec
|
|
|
|
Creater runtime.ObjectCreater
|
|
Convertor runtime.ObjectConvertor
|
|
Defaulter runtime.ObjectDefaulter
|
|
Typer runtime.ObjectTyper
|
|
UnsafeConvertor runtime.ObjectConvertor
|
|
Authorizer authorizer.Authorizer
|
|
Trace *utiltrace.Trace
|
|
|
|
TableConvertor rest.TableConvertor
|
|
|
|
Resource schema.GroupVersionResource
|
|
Kind schema.GroupVersionKind
|
|
Subresource string
|
|
|
|
MetaGroupVersion schema.GroupVersion
|
|
|
|
// HubGroupVersion indicates what version objects read from etcd or incoming requests should be converted to for in-memory handling.
|
|
HubGroupVersion schema.GroupVersion
|
|
|
|
MaxRequestBodyBytes int64
|
|
}
|
|
|
|
func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Request) {
|
|
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
|
|
}
|
|
|
|
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind) bool {
|
|
// TODO: this is temporary, replace with an abstraction calculated at endpoint installation time
|
|
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion {
|
|
switch gvk.Kind {
|
|
case "Table":
|
|
return scope.TableConvertor != nil
|
|
case "PartialObjectMetadata", "PartialObjectMetadataList":
|
|
// TODO: should delineate between lists and non-list endpoints
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (scope *RequestScope) AllowsServerVersion(version string) bool {
|
|
return version == scope.MetaGroupVersion.Version
|
|
}
|
|
|
|
func (scope *RequestScope) AllowsStreamSchema(s string) bool {
|
|
return s == "watch"
|
|
}
|
|
|
|
var _ admission.ObjectInterfaces = &RequestScope{}
|
|
|
|
func (r *RequestScope) GetObjectCreater() runtime.ObjectCreater { return r.Creater }
|
|
func (r *RequestScope) GetObjectTyper() runtime.ObjectTyper { return r.Typer }
|
|
func (r *RequestScope) GetObjectDefaulter() runtime.ObjectDefaulter { return r.Defaulter }
|
|
func (r *RequestScope) GetObjectConvertor() runtime.ObjectConvertor { return r.Convertor }
|
|
|
|
// ConnectResource returns a function that handles a connect request on a rest.Storage object.
|
|
func ConnectResource(connecter rest.Connecter, scope RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, req *http.Request) {
|
|
if isDryRun(req.URL) {
|
|
scope.err(errors.NewBadRequest("dryRun is not supported"), w, req)
|
|
return
|
|
}
|
|
|
|
namespace, name, err := scope.Namer.Name(req)
|
|
if err != nil {
|
|
scope.err(err, w, req)
|
|
return
|
|
}
|
|
ctx := req.Context()
|
|
ctx = request.WithNamespace(ctx, namespace)
|
|
ae := request.AuditEventFrom(ctx)
|
|
admit = admission.WithAudit(admit, ae)
|
|
|
|
opts, subpath, subpathKey := connecter.NewConnectOptions()
|
|
if err := getRequestOptions(req, scope, opts, subpath, subpathKey, isSubresource); err != nil {
|
|
err = errors.NewBadRequest(err.Error())
|
|
scope.err(err, w, req)
|
|
return
|
|
}
|
|
if admit != nil && admit.Handles(admission.Connect) {
|
|
userInfo, _ := request.UserFrom(ctx)
|
|
// TODO: remove the mutating admission here as soon as we have ported all plugin that handle CONNECT
|
|
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok {
|
|
err = mutatingAdmission.Admit(admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, false, userInfo), &scope)
|
|
if err != nil {
|
|
scope.err(err, w, req)
|
|
return
|
|
}
|
|
}
|
|
if validatingAdmission, ok := admit.(admission.ValidationInterface); ok {
|
|
err = validatingAdmission.Validate(admission.NewAttributesRecord(opts, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Connect, false, userInfo), &scope)
|
|
if err != nil {
|
|
scope.err(err, w, req)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
requestInfo, _ := request.RequestInfoFrom(ctx)
|
|
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
|
|
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
|
|
if err != nil {
|
|
scope.err(err, w, req)
|
|
return
|
|
}
|
|
handler.ServeHTTP(w, req)
|
|
})
|
|
}
|
|
}
|
|
|
|
// responder implements rest.Responder for assisting a connector in writing objects or errors.
|
|
type responder struct {
|
|
scope RequestScope
|
|
req *http.Request
|
|
w http.ResponseWriter
|
|
}
|
|
|
|
func (r *responder) Object(statusCode int, obj runtime.Object) {
|
|
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
|
|
}
|
|
|
|
func (r *responder) Error(err error) {
|
|
r.scope.err(err, r.w, r.req)
|
|
}
|
|
|
|
// resultFunc is a function that returns a rest result and can be run in a goroutine
|
|
type resultFunc func() (runtime.Object, error)
|
|
|
|
// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
|
|
// An api.Status object with status != success is considered an "error", which interrupts the normal response flow.
|
|
func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) {
|
|
// these channels need to be buffered to prevent the goroutine below from hanging indefinitely
|
|
// when the select statement reads something other than the one the goroutine sends on.
|
|
ch := make(chan runtime.Object, 1)
|
|
errCh := make(chan error, 1)
|
|
panicCh := make(chan interface{}, 1)
|
|
go func() {
|
|
// panics don't cross goroutine boundaries, so we have to handle ourselves
|
|
defer func() {
|
|
panicReason := recover()
|
|
if panicReason != nil {
|
|
const size = 64 << 10
|
|
buf := make([]byte, size)
|
|
buf = buf[:goruntime.Stack(buf, false)]
|
|
panicReason = strings.TrimSuffix(fmt.Sprintf("%v\n%s", panicReason, string(buf)), "\n")
|
|
// Propagate to parent goroutine
|
|
panicCh <- panicReason
|
|
}
|
|
}()
|
|
|
|
if result, err := fn(); err != nil {
|
|
errCh <- err
|
|
} else {
|
|
ch <- result
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case result = <-ch:
|
|
if status, ok := result.(*metav1.Status); ok {
|
|
if status.Status != metav1.StatusSuccess {
|
|
return nil, errors.FromObject(status)
|
|
}
|
|
}
|
|
return result, nil
|
|
case err = <-errCh:
|
|
return nil, err
|
|
case p := <-panicCh:
|
|
panic(p)
|
|
case <-time.After(timeout):
|
|
return nil, errors.NewTimeoutError(fmt.Sprintf("request did not complete within requested timeout %s", timeout), 0)
|
|
}
|
|
}
|
|
|
|
// transformDecodeError adds additional information when a decode fails.
|
|
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *schema.GroupVersionKind, body []byte) error {
|
|
objGVKs, _, err := typer.ObjectKinds(into)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
objGVK := objGVKs[0]
|
|
if gvk != nil && len(gvk.Kind) > 0 {
|
|
return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr))
|
|
}
|
|
summary := summarizeData(body, 30)
|
|
return errors.NewBadRequest(fmt.Sprintf("the object provided is unrecognized (must be of type %s): %v (%s)", objGVK.Kind, baseErr, summary))
|
|
}
|
|
|
|
// setSelfLink sets the self link of an object (or the child items in a list) to the base URL of the request
|
|
// plus the path and query generated by the provided linkFunc
|
|
func setSelfLink(obj runtime.Object, requestInfo *request.RequestInfo, namer ScopeNamer) error {
|
|
// TODO: SelfLink generation should return a full URL?
|
|
uri, err := namer.GenerateLink(requestInfo, obj)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return namer.SetSelfLink(obj, uri)
|
|
}
|
|
|
|
func hasUID(obj runtime.Object) (bool, error) {
|
|
if obj == nil {
|
|
return false, nil
|
|
}
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
return false, errors.NewInternalError(err)
|
|
}
|
|
if len(accessor.GetUID()) == 0 {
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// checkName checks the provided name against the request
|
|
func checkName(obj runtime.Object, name, namespace string, namer ScopeNamer) error {
|
|
objNamespace, objName, err := namer.ObjectName(obj)
|
|
if err != nil {
|
|
return errors.NewBadRequest(fmt.Sprintf(
|
|
"the name of the object (%s based on URL) was undeterminable: %v", name, err))
|
|
}
|
|
if objName != name {
|
|
return errors.NewBadRequest(fmt.Sprintf(
|
|
"the name of the object (%s) does not match the name on the URL (%s)", objName, name))
|
|
}
|
|
if len(namespace) > 0 {
|
|
if len(objNamespace) > 0 && objNamespace != namespace {
|
|
return errors.NewBadRequest(fmt.Sprintf(
|
|
"the namespace of the object (%s) does not match the namespace on the request (%s)", objNamespace, namespace))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// setObjectSelfLink sets the self link of an object as needed.
|
|
func setObjectSelfLink(ctx context.Context, obj runtime.Object, req *http.Request, namer ScopeNamer) error {
|
|
if !meta.IsListType(obj) {
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return fmt.Errorf("missing requestInfo")
|
|
}
|
|
return setSelfLink(obj, requestInfo, namer)
|
|
}
|
|
|
|
uri, err := namer.GenerateListLink(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := namer.SetSelfLink(obj, uri); err != nil {
|
|
klog.V(4).Infof("Unable to set self link on object: %v", err)
|
|
}
|
|
requestInfo, ok := request.RequestInfoFrom(ctx)
|
|
if !ok {
|
|
return fmt.Errorf("missing requestInfo")
|
|
}
|
|
|
|
count := 0
|
|
err = meta.EachListItem(obj, func(obj runtime.Object) error {
|
|
count++
|
|
return setSelfLink(obj, requestInfo, namer)
|
|
})
|
|
|
|
if count == 0 {
|
|
if err := meta.SetList(obj, []runtime.Object{}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func summarizeData(data []byte, maxLength int) string {
|
|
switch {
|
|
case len(data) == 0:
|
|
return "<empty>"
|
|
case data[0] == '{':
|
|
if len(data) > maxLength {
|
|
return string(data[:maxLength]) + " ..."
|
|
}
|
|
return string(data)
|
|
default:
|
|
if len(data) > maxLength {
|
|
return hex.EncodeToString(data[:maxLength]) + " ..."
|
|
}
|
|
return hex.EncodeToString(data)
|
|
}
|
|
}
|
|
|
|
func limitedReadBody(req *http.Request, limit int64) ([]byte, error) {
|
|
defer req.Body.Close()
|
|
if limit <= 0 {
|
|
return ioutil.ReadAll(req.Body)
|
|
}
|
|
lr := &io.LimitedReader{
|
|
R: req.Body,
|
|
N: limit + 1,
|
|
}
|
|
data, err := ioutil.ReadAll(lr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if lr.N <= 0 {
|
|
return nil, errors.NewRequestEntityTooLargeError(fmt.Sprintf("limit is %d", limit))
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
func parseTimeout(str string) time.Duration {
|
|
if str != "" {
|
|
timeout, err := time.ParseDuration(str)
|
|
if err == nil {
|
|
return timeout
|
|
}
|
|
klog.Errorf("Failed to parse %q: %v", str, err)
|
|
}
|
|
return 30 * time.Second
|
|
}
|
|
|
|
func isDryRun(url *url.URL) bool {
|
|
return len(url.Query()["dryRun"]) != 0
|
|
}
|