Update vendor

pull/13/head
Darren Shepherd 2019-01-22 13:53:35 -07:00
parent fa08d6076c
commit 2e250c308b
60 changed files with 2099 additions and 476 deletions

View File

@ -109,7 +109,7 @@ import:
- package: github.com/pkg/errors
version: v0.8.0
- package: github.com/rancher/norman
version: 29915f8336c0a242560a9fef1d11bbaf04660915
version: 628eb6b32906125ddd77e23f3ad3c9bdb6e93664
repo: https://github.com/ibuildthecloud/norman.git
- package: github.com/seccomp/libseccomp-golang
version: 32f571b70023028bd57d9288c20efbcb237f3ce0
@ -150,145 +150,145 @@ import:
- package: gopkg.in/yaml.v2
version: v2.2.1
- package: k8s.io/kubernetes
version: v1.13.1-k3s2
version: v1.13.2-k3s2
repo: https://github.com/ibuildthecloud/k3s.git
transitive: true
staging: true
- package: github.com/Azure/go-ansiterm
version: d6e3b3328b783f23731bc4d058875b0371ff8109
- package: github.com/imdario/mergo
version: v0.3.5
- package: github.com/mxk/go-flowrate
version: cca7078d478f8520f85629ad7c68962d31ed7682
- package: github.com/prometheus/client_model
version: model-0.0.2-12-gfa8ad6fec33561
- package: github.com/prometheus/procfs
version: 65c1f6f8f0fc1e2185eb9863a3bc751496404259
- package: github.com/russross/blackfriday
version: v1.4-2-g300106c228d52c
- package: k8s.io/gengo
version: 51747d6e00da1fc578d5a333a93bb2abcbce7a95
- package: github.com/mistifyio/go-zfs
version: v2.1.1-5-g1b4ae6fb4e77b0
- package: github.com/vishvananda/netns
version: be1fbeda19366dea804f00efff2dd73a1642fdcc
- package: k8s.io/heapster
version: v1.2.0-beta.1
- package: golang.org/x/tools
version: 2382e3994d48b1d22acc2c86bcad0a2aff028e32
- package: github.com/coreos/etcd
version: v3.3.10
- package: github.com/docker/docker
version: docs-v1.12.0-rc4-2016-07-15-9510-ga9fbbdc8dd8794
- package: github.com/docker/go-connections
version: v0.3.0
- package: github.com/spf13/pflag
version: v1.0.1
- package: github.com/vishvananda/netlink
version: b2de5d10e38ecce8607e6b438b6d174f389a004e
- package: github.com/evanphx/json-patch
version: v4.0.0-3-g36442dbdb58521
- package: github.com/renstrom/dedent
version: v1.0.0-3-g020d11c3b9c0c7
- package: github.com/google/certificate-transparency-go
version: v1.0.21
- package: github.com/daviddengcn/go-colortext
version: 511bcaf42ccd42c38aba7427b6673277bf19e2a1
- package: github.com/shurcooL/sanitized_anchor_name
version: 10ef21a441db47d8b13ebcc5fd2310f636973c77
- package: github.com/mattn/go-shellwords
version: v1.0.3-20-gf8471b0a71ded0
- package: github.com/coreos/pkg
version: v4
- package: github.com/gregjones/httpcache
version: 787624de3eb7bd915c329cba748687a3b22666a6
- package: github.com/JeffAshton/win_pdh
version: 76bb4ee9f0ab50f77826f2a2ee7fb9d3880d6ec2
- package: github.com/googleapis/gnostic
version: 0c5108395e2debce0d731cf0287ddf7242066aba
- package: github.com/euank/go-kmsg-parser
version: v2.0.0
- package: github.com/prometheus/common
version: 13ba4ddd0caa9c28ca7b7bffe1dfa9ed8d5ef207
- package: github.com/pborman/uuid
version: ca53cad383cad2479bbba7f7a1a05797ec1386e4
- package: github.com/ibuildthecloud/kvsql
version: 8dfe3deb0646c4817567e4a53ed1dea41ea5668f
- package: github.com/coreos/go-semver
version: v0.2.0-9-ge214231b295a8e
- package: github.com/sigma/go-inotify
version: c87b6cf5033d2c6486046f045eeebdc3d910fd38
- package: vbom.ml/util
version: db5cfe13f5cc80a4990d98e2e1b0707a4d1a5394
- package: github.com/inconshreveable/mousetrap
version: v1.0
- package: github.com/prometheus/client_golang
version: v0.8.0-83-ge7e903064f5e9e
- package: github.com/mitchellh/go-wordwrap
version: ad45545899c7b13c020ea92b2072220eefad42b8
- package: github.com/robfig/cron
version: v1-53-gdf38d32658d878
- package: github.com/chai2010/gettext-go
version: c6fed771bfd517099caf0f7a961671fa8ed08723
- package: github.com/cyphar/filepath-securejoin
version: v0.2.1-1-gae69057f2299fb
- package: gopkg.in/natefinch/lumberjack.v2
version: v1.0-16-g20b71e5b60d756
- package: k8s.io/klog
version: 8139d8cb77af419532b33dfa7dd09fbc5f1d344f
- package: github.com/fatih/camelcase
version: f6a740d52f961c60348ebb109adde9f4635d7540
- package: sigs.k8s.io/yaml
version: v1.1.0
- package: github.com/cloudflare/cfssl
version: 1.3.2-21-g56268a613adfed
- package: github.com/google/cadvisor
version: 91dab6eb91496ed68acbef68b02b34b3392ca754
repo: https://github.com/ibuildthecloud/cadvisor.git
- package: github.com/miekg/dns
version: 5d001d020961ae1c184f9f8152fdc73810481677
- package: github.com/mrunalp/fileutils
version: 4ee1cc9a80582a0c75febdd5cfa779ee4361cbca
- package: github.com/karrick/godirwalk
version: v1.7.5
- package: github.com/mindprince/gonvml
version: fee913ce8fb235edf54739d259ca0ecc226c7b8a
- package: github.com/armon/circbuf
version: bbbad097214e2918d8543d5201d12bfd7bca254d
- package: github.com/spf13/cobra
version: v0.0.1-34-gc439c4fa093711
- package: gopkg.in/square/go-jose.v2
version: v2.1.6-4-g89060dee6a84df
- package: github.com/jteeuwen/go-bindata
version: v3.0.7-72-ga0ff2567cfb709
- package: bitbucket.org/ww/goautoneg
version: a547fc61f48d567d5b4ec6f8aee5573d8efce11d
repo: https://github.com/rancher/goautoneg.git
- package: github.com/JeffAshton/win_pdh
version: 76bb4ee9f0ab50f77826f2a2ee7fb9d3880d6ec2
- package: github.com/MakeNowJust/heredoc
version: bb23615498cded5e105af4ce27de75b089cbe851
- package: github.com/mrunalp/fileutils
version: 4ee1cc9a80582a0c75febdd5cfa779ee4361cbca
- package: github.com/container-storage-interface/spec
version: v1.0.0
- package: github.com/mindprince/gonvml
version: fee913ce8fb235edf54739d259ca0ecc226c7b8a
- package: k8s.io/klog
version: 8139d8cb77af419532b33dfa7dd09fbc5f1d344f
- package: github.com/exponent-io/jsonpath
version: d6023ce2651d8eafb5c75bb0c7167536102ec9f5
- package: github.com/mattn/go-shellwords
version: v1.0.3-20-gf8471b0a71ded0
- package: sigs.k8s.io/yaml
version: v1.1.0
- package: vbom.ml/util
version: db5cfe13f5cc80a4990d98e2e1b0707a4d1a5394
- package: github.com/miekg/dns
version: 5d001d020961ae1c184f9f8152fdc73810481677
- package: github.com/fsnotify/fsnotify
version: v1.3.1-1-gf12c6236fe7b5c
- package: github.com/gregjones/httpcache
version: 787624de3eb7bd915c329cba748687a3b22666a6
- package: github.com/ibuildthecloud/kvsql
version: 6bb3d252056655760ed8ca6557d6d5e607b361d2
- package: github.com/cyphar/filepath-securejoin
version: v0.2.1-1-gae69057f2299fb
- package: github.com/jteeuwen/go-bindata
version: v3.0.7-72-ga0ff2567cfb709
- package: github.com/armon/circbuf
version: bbbad097214e2918d8543d5201d12bfd7bca254d
- package: github.com/coreos/pkg
version: v4
- package: github.com/docker/go-connections
version: v0.3.0
- package: github.com/peterbourgon/diskv
version: v2.0.1
- package: github.com/prometheus/client_model
version: model-0.0.2-12-gfa8ad6fec33561
- package: k8s.io/heapster
version: v1.2.0-beta.1
- package: github.com/vishvananda/netlink
version: b2de5d10e38ecce8607e6b438b6d174f389a004e
- package: github.com/chai2010/gettext-go
version: c6fed771bfd517099caf0f7a961671fa8ed08723
- package: github.com/google/certificate-transparency-go
version: v1.0.21
- package: github.com/docker/libnetwork
version: v0.8.0-dev.2-1265-ga9cd636e378982
- package: github.com/inconshreveable/mousetrap
version: v1.0
- package: github.com/mistifyio/go-zfs
version: v2.1.1-5-g1b4ae6fb4e77b0
- package: github.com/mitchellh/go-wordwrap
version: ad45545899c7b13c020ea92b2072220eefad42b8
- package: github.com/euank/go-kmsg-parser
version: v2.0.0
- package: github.com/coreos/go-semver
version: v0.2.0-9-ge214231b295a8e
- package: golang.org/x/sys
version: 95c6576299259db960f6c5b9b69ea52422860fce
- package: github.com/google/btree
version: 7d79101e329e5a3adf994758c578dab82b90c017
- package: github.com/sigma/go-inotify
version: c87b6cf5033d2c6486046f045eeebdc3d910fd38
- package: golang.org/x/tools
version: 2382e3994d48b1d22acc2c86bcad0a2aff028e32
- package: k8s.io/gengo
version: 51747d6e00da1fc578d5a333a93bb2abcbce7a95
- package: github.com/mxk/go-flowrate
version: cca7078d478f8520f85629ad7c68962d31ed7682
- package: github.com/Azure/go-ansiterm
version: d6e3b3328b783f23731bc4d058875b0371ff8109
- package: github.com/Nvveen/Gotty
version: cd527374f1e5bff4938207604a14f2e38a9cf512
- package: k8s.io/utils
version: 66066c83e385e385ccc3c964b44fd7dcd413d0ed
- package: github.com/googleapis/gnostic
version: 0c5108395e2debce0d731cf0287ddf7242066aba
- package: gopkg.in/natefinch/lumberjack.v2
version: v1.0-16-g20b71e5b60d756
- package: github.com/pborman/uuid
version: ca53cad383cad2479bbba7f7a1a05797ec1386e4
- package: github.com/prometheus/client_golang
version: v0.8.0-83-ge7e903064f5e9e
- package: github.com/docker/docker
version: docs-v1.12.0-rc4-2016-07-15-9510-ga9fbbdc8dd8794
- package: github.com/golang/groupcache
version: 02826c3e79038b59d737d3b1c0a1d937f71a4433
- package: github.com/robfig/cron
version: v1-53-gdf38d32658d878
- package: github.com/daviddengcn/go-colortext
version: 511bcaf42ccd42c38aba7427b6673277bf19e2a1
- package: github.com/ugorji/go
version: bdcc60b419d136a85cdf2e7cbcac34b3f1cd6e57
- package: github.com/prometheus/procfs
version: 65c1f6f8f0fc1e2185eb9863a3bc751496404259
- package: github.com/fatih/camelcase
version: f6a740d52f961c60348ebb109adde9f4635d7540
- package: github.com/renstrom/dedent
version: v1.0.0-3-g020d11c3b9c0c7
- package: github.com/spf13/pflag
version: v1.0.1
- package: github.com/coreos/etcd
version: v3.3.10
- package: github.com/shurcooL/sanitized_anchor_name
version: 10ef21a441db47d8b13ebcc5fd2310f636973c77
- package: gopkg.in/square/go-jose.v2
version: v2.1.6-4-g89060dee6a84df
- package: github.com/jonboulle/clockwork
version: 72f9bd7c4e0c2a40055ab3d0f09654f730cce982
- package: github.com/karrick/godirwalk
version: v1.7.5
- package: github.com/prometheus/common
version: 13ba4ddd0caa9c28ca7b7bffe1dfa9ed8d5ef207
- package: github.com/evanphx/json-patch
version: v4.0.0-3-g36442dbdb58521
- package: github.com/google/cadvisor
version: 91dab6eb91496ed68acbef68b02b34b3392ca754
repo: https://github.com/ibuildthecloud/cadvisor.git
- package: github.com/docker/libnetwork
version: v0.8.0-dev.2-1265-ga9cd636e378982
- package: k8s.io/utils
version: 66066c83e385e385ccc3c964b44fd7dcd413d0ed
- package: github.com/exponent-io/jsonpath
version: d6023ce2651d8eafb5c75bb0c7167536102ec9f5
- package: github.com/google/btree
version: 7d79101e329e5a3adf994758c578dab82b90c017
- package: github.com/hashicorp/golang-lru
version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4
- package: github.com/imdario/mergo
version: v0.3.5
- package: github.com/jonboulle/clockwork
version: 72f9bd7c4e0c2a40055ab3d0f09654f730cce982
- package: github.com/peterbourgon/diskv
version: v2.0.1
- package: golang.org/x/sys
version: 95c6576299259db960f6c5b9b69ea52422860fce
- package: github.com/MakeNowJust/heredoc
version: bb23615498cded5e105af4ce27de75b089cbe851
- package: github.com/golang/groupcache
version: 02826c3e79038b59d737d3b1c0a1d937f71a4433

View File

@ -9,9 +9,9 @@ package=github.com/opencontainers/runc/libcontainer/nsenter
package=github.com/opencontainers/runc/libcontainer/specconv
package=github.com/opencontainers/runc/contrib/cmd/recvtty
k8s.io/kubernetes v1.13.1-k3s2 https://github.com/ibuildthecloud/k3s.git transitive=true,staging=true
k8s.io/kubernetes v1.13.2-k3s2 https://github.com/ibuildthecloud/k3s.git transitive=true,staging=true
github.com/rancher/norman 29915f8336c0a242560a9fef1d11bbaf04660915 https://github.com/ibuildthecloud/norman.git
github.com/rancher/norman 628eb6b32906125ddd77e23f3ad3c9bdb6e93664 https://github.com/ibuildthecloud/norman.git
github.com/coreos/flannel 3d7cff78e2ca4cade87c6c7d44adf27fe3de2709 https://github.com/ibuildthecloud/flannel.git
github.com/natefinch/lumberjack aee4629129445bbdfb69aa565537dcfa16544311
github.com/gorilla/mux v1.6.2

View File

@ -15,6 +15,9 @@ import (
)
type Generic struct {
// revision must be first to ensure that this is properly aligned for atomic.LoadInt64
revision int64
db *sql.DB
CleanupSQL string
@ -27,7 +30,6 @@ type Generic struct {
GetRevisionSQL string
ToDeleteSQL string
DeleteOldSQL string
revision int64
changes chan *KeyValue
broadcaster broadcast.Broadcaster

View File

@ -2,6 +2,7 @@ package sqlite
import (
"database/sql"
"os"
"strings"
"github.com/ibuildthecloud/kvsql/clientv3/driver"
@ -63,7 +64,8 @@ func NewSQLite() *driver.Generic {
func Open(dataSourceName string) (*sql.DB, error) {
if dataSourceName == "" {
dataSourceName = "./state.db?_journal=WAL&cache=shared"
os.MkdirAll("./db", 700)
dataSourceName = "./db/state.db?_journal=WAL&cache=shared"
}
db, err := sql.Open("sqlite3", dataSourceName)
if err != nil {

View File

@ -185,6 +185,8 @@ func (c *Config) defaults(ctx context.Context, r *Runtime, opts Options) (contex
}
}
r.LocalConfig = c.Config
if c.ClientGetter == nil {
cg, err := proxy.NewClientGetterFromConfig(*c.Config)
if err != nil {

View File

@ -122,7 +122,7 @@ func ParseAndValidateToken(server, token string) (*Info, error) {
return nil, err
}
if len(cacerts) > 0 {
if len(cacerts) > 0 && len(parsedToken.caHash) > 0 {
if ok, hash, newHash := validateCACerts(cacerts, parsedToken.caHash); !ok {
return nil, fmt.Errorf("token does not match the server %s != %s", hash, newHash)
}

View File

@ -0,0 +1,49 @@
package objectset
import (
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/pkg/objectset/injectors"
"github.com/rancher/norman/types"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
)
type DesiredSet struct {
discoveredClients map[schema.GroupVersionKind]*objectclient.ObjectClient
discovery discovery.DiscoveryInterface
restConfig rest.Config
remove bool
setID string
objs *ObjectSet
codeVersion string
clients map[schema.GroupVersionKind]Client
owner runtime.Object
injectors []injectors.ConfigInjector
errs []error
}
func (o *DesiredSet) AddDiscoveredClient(gvk schema.GroupVersionKind, client *objectclient.ObjectClient) {
if o.discoveredClients == nil {
o.discoveredClients = map[schema.GroupVersionKind]*objectclient.ObjectClient{}
}
o.discoveredClients[gvk] = client
}
func (o *DesiredSet) DiscoveredClients() map[schema.GroupVersionKind]*objectclient.ObjectClient {
return o.discoveredClients
}
func (o *DesiredSet) AddInjector(inj injectors.ConfigInjector) {
o.injectors = append(o.injectors, inj)
}
func (o *DesiredSet) err(err error) error {
o.errs = append(o.errs, err)
return o.Err()
}
func (o *DesiredSet) Err() error {
return types.NewErrors(append(o.objs.errs, o.errs...)...)
}

View File

@ -0,0 +1,280 @@
package objectset
import (
"crypto/sha1"
"encoding/hex"
"fmt"
"sort"
"sync"
"github.com/pkg/errors"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/util/flowcontrol"
)
const (
LabelID = "objectset.rio.cattle.io/id"
LabelGVK = "objectset.rio.cattle.io/owner-gvk"
LabelName = "objectset.rio.cattle.io/owner-name"
LabelNamespace = "objectset.rio.cattle.io/owner-namespace"
LabelHash = "objectset.rio.cattle.io/hash"
)
var (
hashOrder = []string{
LabelID,
LabelGVK,
LabelName,
LabelNamespace,
}
rls = map[string]flowcontrol.RateLimiter{}
rlsLock sync.Mutex
)
func (o *DesiredSet) getRateLimit(inputID string) flowcontrol.RateLimiter {
var rl flowcontrol.RateLimiter
rlsLock.Lock()
defer rlsLock.Unlock()
if o.remove {
delete(rls, inputID)
} else {
rl = rls[inputID]
if rl == nil {
rl = flowcontrol.NewTokenBucketRateLimiter(4.0/60.0, 10)
rls[inputID] = rl
}
}
return rl
}
func (o *DesiredSet) Apply() error {
if err := o.Err(); err != nil {
return err
}
labelSet, annotationSet, err := o.getLabelsAndAnnotations()
if err != nil {
return o.err(err)
}
rl := o.getRateLimit(labelSet[LabelHash])
if rl != nil && !rl.TryAccept() {
return errors2.NewConflict(schema.GroupResource{}, o.setID, errors.New("delaying object set"))
}
inputID := o.inputID(labelSet[LabelHash])
objList, err := o.injectLabelsAndAnnotations(labelSet, annotationSet)
if err != nil {
return o.err(err)
}
objList, err = o.runInjectors(objList)
if err != nil {
return o.err(err)
}
objs := o.collect(objList)
debugID := o.debugID()
req, err := labels.NewRequirement(LabelHash, selection.Equals, []string{labelSet[LabelHash]})
if err != nil {
return o.err(err)
}
for _, gvk := range o.gvkOrder() {
o.process(inputID, debugID, labels.NewSelector().Add(*req), gvk, objs[gvk])
}
return o.Err()
}
func (o *DesiredSet) gvkOrder() []schema.GroupVersionKind {
seen := map[schema.GroupVersionKind]bool{}
var gvkOrder []schema.GroupVersionKind
for _, obj := range o.objs.order {
if seen[obj.GetObjectKind().GroupVersionKind()] {
continue
}
seen[obj.GetObjectKind().GroupVersionKind()] = true
gvkOrder = append(gvkOrder, obj.GetObjectKind().GroupVersionKind())
}
var rest []schema.GroupVersionKind
for gvk := range o.clients {
if seen[gvk] {
continue
}
seen[gvk] = true
rest = append(rest, gvk)
}
sort.Slice(rest, func(i, j int) bool {
return rest[i].String() < rest[j].String()
})
return append(gvkOrder, rest...)
}
func (o *DesiredSet) inputID(labelHash string) string {
sort.Slice(o.objs.inputs, func(i, j int) bool {
left, lErr := meta.Accessor(o.objs.inputs[i])
right, rErr := meta.Accessor(o.objs.inputs[j])
if lErr != nil || rErr != nil {
return true
}
lKey := o.objs.inputs[i].GetObjectKind().GroupVersionKind().String() + "/" + newObjectKey(left).String()
rKey := o.objs.inputs[j].GetObjectKind().GroupVersionKind().String() + "/" + newObjectKey(right).String()
return lKey < rKey
})
dig := sha1.New()
dig.Write([]byte(o.codeVersion))
dig.Write([]byte(labelHash))
inputs := o.objs.inputs
if o.owner != nil {
inputs = append([]runtime.Object{o.owner}, o.objs.inputs...)
}
for _, obj := range inputs {
metadata, err := meta.Accessor(obj)
if err != nil {
dig.Write([]byte(obj.GetObjectKind().GroupVersionKind().String()))
continue
}
key := newObjectKey(metadata)
dig.Write([]byte(key.String()))
dig.Write([]byte(metadata.GetResourceVersion()))
}
return hex.EncodeToString(dig.Sum(nil))
}
func (o *DesiredSet) debugID() string {
if o.owner == nil {
return o.setID
}
metadata, err := meta.Accessor(o.owner)
if err != nil {
return o.setID
}
return fmt.Sprintf("%s %s", o.setID, objectKey{
namespace: metadata.GetNamespace(),
name: metadata.GetName(),
})
}
func (o *DesiredSet) collect(objList []runtime.Object) objectCollection {
result := objectCollection{}
for _, obj := range objList {
result.add(obj)
}
return result
}
func (o *DesiredSet) runInjectors(objList []runtime.Object) ([]runtime.Object, error) {
var err error
for _, inj := range o.injectors {
if inj == nil {
continue
}
objList, err = inj(objList)
if err != nil {
return nil, err
}
}
return objList, nil
}
func (o *DesiredSet) getLabelsAndAnnotations() (map[string]string, map[string]string, error) {
annotations := map[string]string{
LabelID: o.setID,
}
if o.owner != nil {
annotations[LabelGVK] = o.owner.GetObjectKind().GroupVersionKind().String()
metadata, err := meta.Accessor(o.owner)
if err != nil {
return nil, nil, fmt.Errorf("failed to get metadata for %s", o.owner.GetObjectKind().GroupVersionKind())
}
annotations[LabelName] = metadata.GetName()
annotations[LabelNamespace] = metadata.GetNamespace()
}
labels := map[string]string{
LabelHash: objectSetHash(annotations),
}
return labels, annotations, nil
}
func (o *DesiredSet) injectLabelsAndAnnotations(labels, annotations map[string]string) ([]runtime.Object, error) {
var result []runtime.Object
for _, objMap := range o.objs.objects {
for key, obj := range objMap {
obj = obj.DeepCopyObject()
meta, err := meta.Accessor(obj)
if err != nil {
return nil, errors.Wrapf(err, "failed to get metadata for %s", key)
}
setLabels(meta, labels)
setAnnotations(meta, annotations)
result = append(result, obj)
}
}
return result, nil
}
func setAnnotations(meta metav1.Object, annotations map[string]string) {
objAnn := meta.GetAnnotations()
if objAnn == nil {
objAnn = map[string]string{}
}
delete(objAnn, LabelInputID)
delete(objAnn, LabelApplied)
for k, v := range annotations {
objAnn[k] = v
}
meta.SetAnnotations(objAnn)
}
func setLabels(meta metav1.Object, labels map[string]string) {
objLabels := meta.GetLabels()
if objLabels == nil {
objLabels = map[string]string{}
}
for k, v := range labels {
objLabels[k] = v
}
meta.SetLabels(objLabels)
}
func objectSetHash(labels map[string]string) string {
dig := sha1.New()
for _, key := range hashOrder {
dig.Write([]byte(labels[key]))
}
return hex.EncodeToString(dig.Sum(nil))
}

View File

@ -0,0 +1,332 @@
package objectset
import (
"bytes"
"compress/gzip"
"encoding/base64"
"io/ioutil"
"sync"
"github.com/pkg/errors"
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/types/convert"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"
)
const (
LabelApplied = "objectset.rio.cattle.io/applied"
LabelInputID = "objectset.rio.cattle.io/inputid"
)
var (
patchCache = map[schema.GroupVersionKind]patchCacheEntry{}
patchCacheLock = sync.Mutex{}
)
type patchCacheEntry struct {
patchType types.PatchType
lookup strategicpatch.LookupPatchMeta
}
func prepareObjectForCreate(inputID string, obj runtime.Object) (runtime.Object, error) {
serialized, err := json.Marshal(obj)
if err != nil {
return nil, err
}
obj = obj.DeepCopyObject()
meta, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
annotations := meta.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[LabelInputID] = inputID
annotations[LabelApplied] = appliedToAnnotation(serialized)
meta.SetAnnotations(annotations)
return obj, nil
}
func originalAndModifiedForInputID(inputID string, oldMetadata v1.Object, newObject runtime.Object) ([]byte, []byte, error) {
original, err := getOriginal(inputID, oldMetadata)
if err != nil {
return nil, nil, err
}
newObject, err = prepareObjectForCreate(inputID, newObject)
if err != nil {
return nil, nil, err
}
modified, err := json.Marshal(newObject)
return original, modified, err
}
func onlyKeys(data map[string]interface{}, keys ...string) bool {
for i, key := range keys {
if len(data) > 1 {
return false
} else if len(data) == 0 {
return true
}
value, ok := data[key]
if !ok {
return false
}
if i == len(keys)-1 {
return true
}
data = convert.ToMapInterface(value)
}
return false
}
func sanitizePatch(patch []byte) ([]byte, error) {
mod := false
data := map[string]interface{}{}
err := json.Unmarshal(patch, &data)
if err != nil {
return nil, err
}
if _, ok := data["kind"]; ok {
mod = true
delete(data, "kind")
}
if _, ok := data["apiVersion"]; ok {
mod = true
delete(data, "apiVersion")
}
if deleted := removeCreationTimestamp(data); deleted {
mod = true
}
if onlyKeys(data, "metadata", "annotations", LabelInputID) {
return []byte("{}"), nil
}
if !mod {
return patch, nil
}
return json.Marshal(data)
}
func applyPatch(client objectclient.GenericClient, debugID, inputID string, oldObject, newObject runtime.Object) (bool, error) {
gvk := client.GroupVersionKind()
oldMetadata, err := meta.Accessor(oldObject)
if err != nil {
return false, err
}
original, modified, err := originalAndModifiedForInputID(inputID, oldMetadata, newObject)
if err != nil {
return false, err
}
current, err := json.Marshal(oldObject)
if err != nil {
return false, err
}
patchType, patch, err := doPatch(gvk, original, modified, current)
if err != nil {
return false, errors.Wrap(err, "patch generation")
}
if string(patch) == "{}" {
return false, nil
}
patch, err = sanitizePatch(patch)
if err != nil {
return false, err
}
if string(patch) == "{}" {
return false, nil
}
logrus.Debugf("DesiredSet - Patch %s %s/%s for %s -- [%s, %s, %s, %s]", gvk, oldMetadata.GetNamespace(), oldMetadata.GetName(), debugID,
patch, original, modified, current)
logrus.Debugf("DesiredSet - Updated %s %s/%s for %s -- %s %s", gvk, oldMetadata.GetNamespace(), oldMetadata.GetName(), debugID, patchType, patch)
_, err = client.Patch(oldMetadata.GetName(), oldObject, patchType, patch)
return true, err
}
func (o *DesiredSet) compareObjects(client objectclient.GenericClient, debugID, inputID string, oldObject, newObject runtime.Object, force bool) error {
oldMetadata, err := meta.Accessor(oldObject)
if err != nil {
return err
}
gvk := client.GroupVersionKind()
if ran, err := applyPatch(client, debugID, inputID, oldObject, newObject); err != nil {
return err
} else if !ran {
logrus.Debugf("DesiredSet - No change(2) %s %s/%s for %s", gvk, oldMetadata.GetNamespace(), oldMetadata.GetName(), debugID)
}
return nil
}
func removeCreationTimestamp(data map[string]interface{}) bool {
metadata, ok := data["metadata"]
if !ok {
return false
}
data = convert.ToMapInterface(metadata)
if _, ok := data["creationTimestamp"]; ok {
delete(data, "creationTimestamp")
return true
}
return false
}
func getOriginal(inputID string, obj v1.Object) ([]byte, error) {
original := appliedFromAnnotation(obj.GetAnnotations()[LabelApplied])
if len(original) == 0 {
return []byte("{}"), nil
}
mapObj := &unstructured.Unstructured{}
err := json.Unmarshal(original, mapObj)
if err != nil {
return nil, err
}
removeCreationTimestamp(mapObj.Object)
objCopy, err := prepareObjectForCreate(inputID, mapObj)
if err != nil {
return nil, err
}
return json.Marshal(objCopy)
}
func appliedFromAnnotation(str string) []byte {
if len(str) == 0 || str[0] == '{' {
return []byte(str)
}
b, err := base64.RawStdEncoding.DecodeString(str)
if err != nil {
return nil
}
r, err := gzip.NewReader(bytes.NewBuffer(b))
if err != nil {
return nil
}
b, err = ioutil.ReadAll(r)
if err != nil {
return nil
}
return b
}
func appliedToAnnotation(b []byte) string {
if len(b) < 1024 {
return string(b)
}
buf := &bytes.Buffer{}
w := gzip.NewWriter(buf)
if _, err := w.Write(b); err != nil {
return string(b)
}
if err := w.Close(); err != nil {
return string(b)
}
return base64.RawStdEncoding.EncodeToString(buf.Bytes())
}
// doPatch is adapted from "kubectl apply"
func doPatch(gvk schema.GroupVersionKind, original, modified, current []byte) (types.PatchType, []byte, error) {
var patchType types.PatchType
var patch []byte
var lookupPatchMeta strategicpatch.LookupPatchMeta
patchType, lookupPatchMeta, err := getPatchStyle(gvk)
if err != nil {
return patchType, nil, err
}
if patchType == types.StrategicMergePatchType {
patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, true)
} else {
patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current)
}
if err != nil {
logrus.Errorf("Failed to calcuated patch: %v", err)
}
return patchType, patch, err
}
func getPatchStyle(gvk schema.GroupVersionKind) (types.PatchType, strategicpatch.LookupPatchMeta, error) {
var (
patchType types.PatchType
lookupPatchMeta strategicpatch.LookupPatchMeta
)
patchCacheLock.Lock()
entry, ok := patchCache[gvk]
patchCacheLock.Unlock()
if ok {
return entry.patchType, entry.lookup, nil
}
versionedObject, err := scheme.Scheme.New(gvk)
if runtime.IsNotRegisteredError(err) {
patchType = types.MergePatchType
} else if err != nil {
return patchType, nil, err
} else {
patchType = types.StrategicMergePatchType
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
return patchType, nil, err
}
}
patchCacheLock.Lock()
patchCache[gvk] = patchCacheEntry{
patchType: patchType,
lookup: lookupPatchMeta,
}
patchCacheLock.Unlock()
return patchType, lookupPatchMeta, nil
}

View File

@ -0,0 +1,223 @@
package objectset
import (
"fmt"
"sort"
"github.com/pkg/errors"
"github.com/rancher/norman/controller"
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/objectclient/dynamic"
"github.com/rancher/norman/restwatch"
"github.com/rancher/norman/types"
"github.com/sirupsen/logrus"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
var (
deletePolicy = v1.DeletePropagationBackground
)
func NewDiscoveredClient(gvk schema.GroupVersionKind, restConfig rest.Config, discovery discovery.DiscoveryInterface) (*objectclient.ObjectClient, error) {
resources, err := discovery.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return nil, err
}
for _, resource := range resources.APIResources {
if resource.Kind != gvk.Kind {
continue
}
if restConfig.NegotiatedSerializer == nil {
restConfig.NegotiatedSerializer = dynamic.NegotiatedSerializer
}
restClient, err := restwatch.UnversionedRESTClientFor(&restConfig)
if err != nil {
return nil, err
}
objectClient := objectclient.NewObjectClient("", restClient, &resource, gvk, &objectclient.UnstructuredObjectFactory{})
return objectClient, nil
}
return nil, fmt.Errorf("failed to discover client for %s", gvk)
}
func (o *DesiredSet) getControllerAndObjectClient(debugID string, gvk schema.GroupVersionKind) (controller.GenericController, *objectclient.ObjectClient, error) {
client, ok := o.clients[gvk]
if !ok && o.discovery == nil {
return nil, nil, fmt.Errorf("failed to find client for %s for %s", gvk, debugID)
}
if client != nil {
return client.Generic(), client.ObjectClient(), nil
}
objectClient := o.discoveredClients[gvk]
if objectClient != nil {
return nil, objectClient, nil
}
objectClient, err := NewDiscoveredClient(gvk, o.restConfig, o.discovery)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to find client for %s for %s", gvk, debugID)
}
o.AddDiscoveredClient(gvk, objectClient)
return nil, objectClient, nil
}
func (o *DesiredSet) process(inputID, debugID string, set labels.Selector, gvk schema.GroupVersionKind, objs map[objectKey]runtime.Object) {
controller, objectClient, err := o.getControllerAndObjectClient(debugID, gvk)
if err != nil {
o.err(err)
return
}
existing, err := list(controller, objectClient, set)
if err != nil {
o.err(fmt.Errorf("failed to list %s for %s", gvk, debugID))
return
}
toCreate, toDelete, toUpdate := compareSets(existing, objs)
for _, k := range toCreate {
obj := objs[k]
obj, err := prepareObjectForCreate(inputID, obj)
if err != nil {
o.err(errors.Wrapf(err, "failed to prepare create %s %s for %s", k, gvk, debugID))
continue
}
_, err = objectClient.Create(obj)
if errors2.IsAlreadyExists(err) {
// Taking over an object that wasn't previously managed by us
existingObj, err := objectClient.GetNamespaced(k.namespace, k.name, v1.GetOptions{})
if err == nil {
toUpdate = append(toUpdate, k)
existing[k] = existingObj
continue
}
}
if err != nil {
o.err(errors.Wrapf(err, "failed to create %s %s for %s", k, gvk, debugID))
continue
}
logrus.Debugf("DesiredSet - Created %s %s for %s", gvk, k, debugID)
}
for _, k := range toUpdate {
err := o.compareObjects(objectClient, debugID, inputID, existing[k], objs[k], len(toCreate) > 0 || len(toDelete) > 0)
if err != nil {
o.err(errors.Wrapf(err, "failed to update %s %s for %s", k, gvk, debugID))
continue
}
}
for _, k := range toDelete {
err := objectClient.DeleteNamespaced(k.namespace, k.name, &v1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
if err != nil {
o.err(errors.Wrapf(err, "failed to delete %s %s for %s", k, gvk, debugID))
continue
}
logrus.Debugf("DesiredSet - Delete %s %s for %s", gvk, k, debugID)
}
}
func compareSets(existingSet, newSet map[objectKey]runtime.Object) (toCreate, toDelete, toUpdate []objectKey) {
for k := range newSet {
if _, ok := existingSet[k]; ok {
toUpdate = append(toUpdate, k)
} else {
toCreate = append(toCreate, k)
}
}
for k := range existingSet {
if _, ok := newSet[k]; !ok {
toDelete = append(toDelete, k)
}
}
sortObjectKeys(toCreate)
sortObjectKeys(toDelete)
sortObjectKeys(toUpdate)
return
}
func sortObjectKeys(keys []objectKey) {
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})
}
func addObjectToMap(objs map[objectKey]runtime.Object, obj interface{}) error {
metadata, err := meta.Accessor(obj)
if err != nil {
return err
}
objs[objectKey{
namespace: metadata.GetNamespace(),
name: metadata.GetName(),
}] = obj.(runtime.Object)
return nil
}
func list(controller controller.GenericController, objectClient *objectclient.ObjectClient, selector labels.Selector) (map[objectKey]runtime.Object, error) {
var (
errs []error
objs = map[objectKey]runtime.Object{}
)
if controller == nil {
objList, err := objectClient.List(v1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return nil, err
}
list, ok := objList.(*unstructured.UnstructuredList)
if !ok {
return nil, fmt.Errorf("invalid list type %T", objList)
}
if err != nil {
return nil, err
}
for _, obj := range list.Items {
if err := addObjectToMap(objs, obj); err != nil {
errs = append(errs, err)
}
}
return objs, nil
}
err := cache.ListAllByNamespace(controller.Informer().GetIndexer(), "", selector, func(obj interface{}) {
if err := addObjectToMap(objs, obj); err != nil {
errs = append(errs, err)
}
})
if err != nil {
errs = append(errs, err)
}
return objs, types.NewErrors(errs...)
}

View File

@ -0,0 +1,55 @@
package injectors
import (
"bufio"
"bytes"
"io"
"github.com/ghodss/yaml"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
)
func ToBytes(objects []runtime.Object) ([]byte, error) {
if len(objects) == 0 {
return nil, nil
}
buffer := &bytes.Buffer{}
for i, obj := range objects {
if i > 0 {
buffer.WriteString("\n---\n")
}
bytes, err := yaml.Marshal(obj)
if err != nil {
return nil, errors.Wrapf(err, "failed to encode %s", obj.GetObjectKind().GroupVersionKind())
}
buffer.Write(bytes)
}
return buffer.Bytes(), nil
}
func FromBytes(content []byte) ([]runtime.Object, error) {
var result []runtime.Object
reader := yamlDecoder.NewYAMLReader(bufio.NewReader(bytes.NewBuffer(content)))
for {
raw, err := reader.Read()
if err == io.EOF {
break
}
data := map[string]interface{}{}
if err := yaml.Unmarshal(raw, &data); err != nil {
return nil, err
}
result = append(result, &unstructured.Unstructured{Object: data})
}
return result, nil
}

View File

@ -0,0 +1,21 @@
package injectors
import "k8s.io/apimachinery/pkg/runtime"
var (
injectors = map[string]ConfigInjector{}
order []string
)
type ConfigInjector func(config []runtime.Object) ([]runtime.Object, error)
func Register(name string, injector ConfigInjector) {
if _, ok := injectors[name]; !ok {
order = append(order, name)
}
injectors[name] = injector
}
func Get(name string) ConfigInjector {
return injectors[name]
}

View File

@ -0,0 +1,133 @@
package objectset
import (
"fmt"
"reflect"
"github.com/rancher/norman/types"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type objectKey struct {
name string
namespace string
}
func newObjectKey(obj v1.Object) objectKey {
return objectKey{
namespace: obj.GetNamespace(),
name: obj.GetName(),
}
}
func (o objectKey) String() string {
if o.namespace == "" {
return o.name
}
return fmt.Sprintf("%s/%s", o.namespace, o.name)
}
type objectCollection map[schema.GroupVersionKind]map[objectKey]runtime.Object
func (o objectCollection) add(obj runtime.Object) error {
metadata, err := meta.Accessor(obj)
if err != nil {
return err
}
objs := o[obj.GetObjectKind().GroupVersionKind()]
if objs == nil {
objs = map[objectKey]runtime.Object{}
o[obj.GetObjectKind().GroupVersionKind()] = objs
}
objs[objectKey{
namespace: metadata.GetNamespace(),
name: metadata.GetName(),
}] = obj
return nil
}
type ObjectSet struct {
errs []error
objects objectCollection
nsed map[schema.GroupVersionKind]bool
inputs []runtime.Object
order []runtime.Object
}
func NewObjectSet() *ObjectSet {
return &ObjectSet{
nsed: map[schema.GroupVersionKind]bool{},
objects: objectCollection{},
}
}
func (o *ObjectSet) AddInput(objs ...runtime.Object) *ObjectSet {
for _, obj := range objs {
if obj == nil || reflect.ValueOf(obj).IsNil() {
continue
}
o.inputs = append(o.inputs, obj)
}
return o
}
func (o *ObjectSet) Add(objs ...runtime.Object) *ObjectSet {
for _, obj := range objs {
o.add(obj)
}
return o
}
func (o *ObjectSet) add(obj runtime.Object) {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return
}
gvk := obj.GetObjectKind().GroupVersionKind()
metadata, err := meta.Accessor(obj)
if err != nil {
o.err(fmt.Errorf("failed to get metadata for %s", gvk))
return
}
name := metadata.GetName()
if name == "" {
o.err(fmt.Errorf("%s is missing name", gvk))
return
}
namespace := metadata.GetNamespace()
nsed, ok := o.nsed[gvk]
if ok && nsed != (namespace != "") {
o.err(fmt.Errorf("got %s objects that are both namespaced and not namespaced", gvk))
return
}
o.nsed[gvk] = namespace != ""
if err := o.objects.add(obj); err != nil {
o.err(fmt.Errorf("failed to get metadata for %s", gvk))
return
}
o.order = append(o.order, obj)
}
func (o *ObjectSet) err(err error) error {
o.errs = append(o.errs, err)
return o.Err()
}
func (o *ObjectSet) AddErr(err error) {
o.errs = append(o.errs, err)
}
func (o *ObjectSet) Err() error {
return types.NewErrors(o.errs...)
}

View File

@ -0,0 +1,82 @@
package objectset
import (
"github.com/rancher/norman/controller"
"github.com/rancher/norman/objectclient"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
)
type Client interface {
Generic() controller.GenericController
ObjectClient() *objectclient.ObjectClient
}
type Processor struct {
setID string
codeVersion string
discovery discovery.DiscoveryInterface
restConfig rest.Config
allowSlowPath string
slowClient rest.HTTPClient
clients map[schema.GroupVersionKind]Client
}
func NewProcessor(setID string) *Processor {
return &Processor{
setID: setID,
clients: map[schema.GroupVersionKind]Client{},
}
}
func (t *Processor) SetID() string {
return t.setID
}
func (t *Processor) CodeVersion(version string) *Processor {
t.codeVersion = version
return t
}
func (t *Processor) AllowDiscovery(discovery discovery.DiscoveryInterface, restConfig rest.Config) *Processor {
t.discovery = discovery
t.restConfig = restConfig
return t
}
func (t *Processor) Clients() map[schema.GroupVersionKind]Client {
return t.clients
}
func (t *Processor) Client(clients ...Client) *Processor {
// ensure cache is enabled
for _, client := range clients {
client.Generic()
t.clients[client.ObjectClient().GroupVersionKind()] = client
}
return t
}
func (t Processor) Remove(owner runtime.Object) error {
return t.NewDesiredSet(owner, nil).Apply()
}
func (t Processor) NewDesiredSet(owner runtime.Object, objs *ObjectSet) *DesiredSet {
remove := false
if objs == nil {
remove = true
objs = &ObjectSet{}
}
return &DesiredSet{
discovery: t.discovery,
restConfig: t.restConfig,
remove: remove,
objs: objs,
setID: t.setID,
codeVersion: t.codeVersion,
clients: t.clients,
owner: owner,
}
}

View File

@ -70,10 +70,11 @@ func (e *Embed) ModifySchema(schema *types.Schema, schemas *types.Schemas) error
}
deleteField := true
outer:
for name, field := range embeddedSchema.ResourceFields {
for _, ignore := range e.Ignore {
if ignore == name {
continue
continue outer
}
}

View File

@ -3,8 +3,8 @@ package version
var (
gitMajor = "1"
gitMinor = "12"
gitVersion = "v1.13.1-k3s2"
gitCommit = "2054ab40be790e5f209e480b18ab8a7015262796"
gitVersion = "v1.13.2-k3s2"
gitCommit = "149316edfe67e696f622608282a3fc4252ad92b3"
gitTreeState = "clean"
buildDate = "2019-01-09T20:07+00:00Z"
buildDate = "2019-01-22T20:48+00:00Z"
)

View File

@ -47,14 +47,14 @@ func TokenSourceWrapTransport(ts oauth2.TokenSource) func(http.RoundTripper) htt
func NewCachedFileTokenSource(path string) oauth2.TokenSource {
return &cachingTokenSource{
now: time.Now,
leeway: 1 * time.Minute,
leeway: 10 * time.Second,
base: &fileTokenSource{
path: path,
// This period was picked because it is half of the minimum validity
// duration for a token provisioned by they TokenRequest API. This is
// unsophisticated and should induce rotation at a frequency that should
// work with the token volume source.
period: 5 * time.Minute,
// This period was picked because it is half of the duration between when the kubelet
// refreshes a projected service account token and when the original token expires.
// Default token lifetime is 10 minutes, and the kubelet starts refreshing at 80% of lifetime.
// This should induce re-reading at a frequency that works with the token volume source.
period: time.Minute,
},
}
}

View File

@ -1,9 +1,16 @@
<!-- BEGIN MUNGE: GENERATED_TOC -->
- [v1.13.0](#v1130)
- [Downloads for v1.13.0](#downloads-for-v1130)
- [v1.13.1](#v1131)
- [Downloads for v1.13.1](#downloads-for-v1131)
- [Client Binaries](#client-binaries)
- [Server Binaries](#server-binaries)
- [Node Binaries](#node-binaries)
- [Changelog since v1.13.0](#changelog-since-v1130)
- [Other notable changes](#other-notable-changes)
- [v1.13.0](#v1130)
- [Downloads for v1.13.0](#downloads-for-v1130)
- [Client Binaries](#client-binaries-1)
- [Server Binaries](#server-binaries-1)
- [Node Binaries](#node-binaries-1)
- [Kubernetes 1.13 Release Notes](#kubernetes-113-release-notes)
- [Security Content](#security-content)
- [Urgent Upgrade Notes](#urgent-upgrade-notes)
@ -52,60 +59,134 @@
- [External Dependencies](#external-dependencies)
- [v1.13.0-rc.2](#v1130-rc2)
- [Downloads for v1.13.0-rc.2](#downloads-for-v1130-rc2)
- [Client Binaries](#client-binaries-1)
- [Server Binaries](#server-binaries-1)
- [Node Binaries](#node-binaries-1)
- [Changelog since v1.13.0-rc.1](#changelog-since-v1130-rc1)
- [Other notable changes](#other-notable-changes)
- [v1.13.0-rc.1](#v1130-rc1)
- [Downloads for v1.13.0-rc.1](#downloads-for-v1130-rc1)
- [Client Binaries](#client-binaries-2)
- [Server Binaries](#server-binaries-2)
- [Node Binaries](#node-binaries-2)
- [Changelog since v1.13.0-beta.2](#changelog-since-v1130-beta2)
- [Changelog since v1.13.0-rc.1](#changelog-since-v1130-rc1)
- [Other notable changes](#other-notable-changes-1)
- [v1.13.0-beta.2](#v1130-beta2)
- [Downloads for v1.13.0-beta.2](#downloads-for-v1130-beta2)
- [v1.13.0-rc.1](#v1130-rc1)
- [Downloads for v1.13.0-rc.1](#downloads-for-v1130-rc1)
- [Client Binaries](#client-binaries-3)
- [Server Binaries](#server-binaries-3)
- [Node Binaries](#node-binaries-3)
- [Changelog since v1.13.0-beta.1](#changelog-since-v1130-beta1)
- [Changelog since v1.13.0-beta.2](#changelog-since-v1130-beta2)
- [Other notable changes](#other-notable-changes-2)
- [v1.13.0-beta.1](#v1130-beta1)
- [Downloads for v1.13.0-beta.1](#downloads-for-v1130-beta1)
- [v1.13.0-beta.2](#v1130-beta2)
- [Downloads for v1.13.0-beta.2](#downloads-for-v1130-beta2)
- [Client Binaries](#client-binaries-4)
- [Server Binaries](#server-binaries-4)
- [Node Binaries](#node-binaries-4)
- [Changelog since v1.13.0-alpha.3](#changelog-since-v1130-alpha3)
- [Action Required](#action-required)
- [Changelog since v1.13.0-beta.1](#changelog-since-v1130-beta1)
- [Other notable changes](#other-notable-changes-3)
- [v1.13.0-alpha.3](#v1130-alpha3)
- [Downloads for v1.13.0-alpha.3](#downloads-for-v1130-alpha3)
- [v1.13.0-beta.1](#v1130-beta1)
- [Downloads for v1.13.0-beta.1](#downloads-for-v1130-beta1)
- [Client Binaries](#client-binaries-5)
- [Server Binaries](#server-binaries-5)
- [Node Binaries](#node-binaries-5)
- [Changelog since v1.13.0-alpha.2](#changelog-since-v1130-alpha2)
- [Changelog since v1.13.0-alpha.3](#changelog-since-v1130-alpha3)
- [Action Required](#action-required)
- [Other notable changes](#other-notable-changes-4)
- [v1.13.0-alpha.2](#v1130-alpha2)
- [Downloads for v1.13.0-alpha.2](#downloads-for-v1130-alpha2)
- [v1.13.0-alpha.3](#v1130-alpha3)
- [Downloads for v1.13.0-alpha.3](#downloads-for-v1130-alpha3)
- [Client Binaries](#client-binaries-6)
- [Server Binaries](#server-binaries-6)
- [Node Binaries](#node-binaries-6)
- [Changelog since v1.13.0-alpha.1](#changelog-since-v1130-alpha1)
- [Changelog since v1.13.0-alpha.2](#changelog-since-v1130-alpha2)
- [Other notable changes](#other-notable-changes-5)
- [v1.13.0-alpha.1](#v1130-alpha1)
- [Downloads for v1.13.0-alpha.1](#downloads-for-v1130-alpha1)
- [v1.13.0-alpha.2](#v1130-alpha2)
- [Downloads for v1.13.0-alpha.2](#downloads-for-v1130-alpha2)
- [Client Binaries](#client-binaries-7)
- [Server Binaries](#server-binaries-7)
- [Node Binaries](#node-binaries-7)
- [Changelog since v1.13.0-alpha.1](#changelog-since-v1130-alpha1)
- [Other notable changes](#other-notable-changes-6)
- [v1.13.0-alpha.1](#v1130-alpha1)
- [Downloads for v1.13.0-alpha.1](#downloads-for-v1130-alpha1)
- [Client Binaries](#client-binaries-8)
- [Server Binaries](#server-binaries-8)
- [Node Binaries](#node-binaries-8)
- [Changelog since v1.12.0](#changelog-since-v1120)
- [Action Required](#action-required-1)
- [Other notable changes](#other-notable-changes-6)
- [Other notable changes](#other-notable-changes-7)
<!-- END MUNGE: GENERATED_TOC -->
<!-- NEW RELEASE NOTES ENTRY -->
# v1.13.1
[Documentation](https://docs.k8s.io)
## Downloads for v1.13.1
filename | sha512 hash
-------- | -----------
[kubernetes.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes.tar.gz) | `de3858357b2b4444bccc0599c7d0edd3e6ec1a80267ef96883ebcfb06c518ce467dd8720b48084644677a42b8e3ffad9a7d4745b40170ce9dfe5b43310979be1`
[kubernetes-src.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-src.tar.gz) | `7f0a8dbd3c7397cc5a5bc0297eb24b8e734c3c7b78e48fc794c525377c3895f4fd84fd0a2fa70c5513cc47ee5a174c22bab54796abc5a8f2b30687642c819a68`
### Client Binaries
filename | sha512 hash
-------- | -----------
[kubernetes-client-darwin-386.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-darwin-386.tar.gz) | `371028dba7a28ec3c8f10b861448cb1574dce25d32d847af254b76b7f158aa4fcda695972e2a08440faa4e16077f8021b07115d0da897bef79c33e702f3be95e`
[kubernetes-client-darwin-amd64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-darwin-amd64.tar.gz) | `6aa7025308e9fb1eb4415e504e8aa9c7a0a20b09c500cb48df82bbd04443101664b2614fb284875b9670d4bb11e8f1a10190eaf1d54f81f3a9526053958b0802`
[kubernetes-client-linux-386.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-linux-386.tar.gz) | `6453670bb61b4f5f7fe8ae78804864ecd52682b32592f6956faf3d2220884a64fb22ae2e668b63f28ea8fd354c50aa90ce61c60be327fb0b5fcfe2c7835ef559`
[kubernetes-client-linux-amd64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-linux-amd64.tar.gz) | `ca00442f50b5d5627357dce97c90c17cb0126d746b887afdab2d4db9e0826532469fd1ee62f40eb6923761618f46752d10993578ca19c8b92c3a2aeb5102a318`
[kubernetes-client-linux-arm.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-linux-arm.tar.gz) | `5fa170cbe56b8f5d103f520e2493f911c5eb59b51a6afdbaa9c08196943f1235e533f0384ce7c01c73a020c6889cf8f03cc3642912d0953c74d1098e4b21f3a0`
[kubernetes-client-linux-arm64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-linux-arm64.tar.gz) | `710343ad067f0d642c43cd26871828275645b08b4f4c86bd555865318d8fe08b7f0a720174c04d58acffcb26faf563636dc13eef66a2813eac68bb8b994908f4`
[kubernetes-client-linux-ppc64le.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-linux-ppc64le.tar.gz) | `0fa7ab255f0cba3adc754337c6184e6ec464aa5a4d6dd4d38aad8a0e2430a0044f4ed1ffcd7cc7c863190d3cda6b84abd12ca7536139d665ad61fe7704e63d30`
[kubernetes-client-linux-s390x.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-linux-s390x.tar.gz) | `749a8dce5b81e2edbd315841acac64a0e5d17bb1ead8173560b6a4ccc28604bc8254051297ab51cb5df845495bd75a45137827b3386e3962295fec8601563eaa`
[kubernetes-client-windows-386.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-windows-386.tar.gz) | `cd4732fbe569009c426f963318d05ddcc7c63dc27ec9d2bf9c60d716195e3676aa5b0e6ccbde6298f621450d365d41a910ce3ced89bf2ae6d3e81ee2fed0bb16`
[kubernetes-client-windows-amd64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-client-windows-amd64.tar.gz) | `40f5b5d221b3a611511690d316539dc8fb3f4513e4f9eb141bffa17c9ddeee875a462f5bd45e62ce7c7535310fc3e48e3441614700ee9877584c5948ddbef19f`
### Server Binaries
filename | sha512 hash
-------- | -----------
[kubernetes-server-linux-amd64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-server-linux-amd64.tar.gz) | `e0e48825c5fe33a3f82b1b74847d9bfb8c5716c4313c5e4e6f46be0580e20a1e396a669b8ca446cfa581e3eb75698813249bbfcfc79c8a90793880eb5c177921`
[kubernetes-server-linux-arm.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-server-linux-arm.tar.gz) | `7ff4856e7959cf14eba0e1ab274c0bf0d3193391e7034a936697f0c4813e81d8dda4a019d3185677bee9d1345a6433db3fd6e55f644a0f73d076e0b2014ed172`
[kubernetes-server-linux-arm64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-server-linux-arm64.tar.gz) | `b8c2356002e675bd3de5ee9c2337a12e2a1bbfa2478f8e3b91065a578dfa8d50f596fd606d9f0232b06b8263867a7ca5cc7c04150718b8e40b49ae7d46001c30`
[kubernetes-server-linux-ppc64le.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-server-linux-ppc64le.tar.gz) | `5d3a15b1241d849d8954894aa7f3fb12606f9966f73fc36aa15152038fc385153b0f0e967cc0bf410a5d5894d0269e54eac581d8e79003904d7bc29b33e98684`
[kubernetes-server-linux-s390x.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-server-linux-s390x.tar.gz) | `78a9cccaf9d737b519db0866c2e80c472c7136bc723910d08649ece1c420ae7f6e56e610d65c436c56ccef8360c4da0f70e75d0cf47c0c8e739f5138cdc7b0d2`
### Node Binaries
filename | sha512 hash
-------- | -----------
[kubernetes-node-linux-amd64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-node-linux-amd64.tar.gz) | `3a7881a52885bebe5958f02dc54194cc8c330576b7cf5935189df4f0b754b958917b104e1d3358c0bc9277f13a8eef2176284548d664f27a36baa389fbcc7bea`
[kubernetes-node-linux-arm.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-node-linux-arm.tar.gz) | `d0bfcff3ef7c0aa36005e7b111685438ebd0ea61d48dc68a7bd06eea3782b6eb224f9b651d80c955afa162f766c8b682976db43238562c293d6552cdadf9e934`
[kubernetes-node-linux-arm64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-node-linux-arm64.tar.gz) | `2e23bd00661aceb30fa37e24ab71315755bd93dfcc5ff361d78445a8e9ff99e7b3a56641112af3184e8b107545fba6573a6368a82bd0ce475c81cb53fd44da3b`
[kubernetes-node-linux-ppc64le.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-node-linux-ppc64le.tar.gz) | `8d0fdb743c700d662886636fe67b52202cf9e6e57c2d7de5961b8189d8c03c91fda1d68c47033286efcc582e78be40846e2b1f5c589a0b94794fa2ce3c1ebfee`
[kubernetes-node-linux-s390x.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-node-linux-s390x.tar.gz) | `70445038b4db62c3fc99540f5ddbb881387018244242f182332b8eaa7159ce1aa8929145010ab2befd4e101d39c24c61e430928235434c7d7eb54f113860a83a`
[kubernetes-node-windows-amd64.tar.gz](https://dl.k8s.io/v1.13.1/kubernetes-node-windows-amd64.tar.gz) | `a87ad43f5a6b8f66d1bbd64f9c91e8bcbdf4adc8de0ec3cd559adaa8c14a6fe078ffdf090e52627c0522b79209fcc37bf822b323895dd47b18c20026cb25e9f5`
## Changelog since v1.13.0
### Other notable changes
* Fix overlapping filenames in diff if multiple resources have the same name. ([#71923](https://github.com/kubernetes/kubernetes/pull/71923), [@apelisse](https://github.com/apelisse))
* Disable proxy to loopback and linklocal ([#71980](https://github.com/kubernetes/kubernetes/pull/71980), [@micahhausler](https://github.com/micahhausler))
* kube-scheduler: restores ability to run without authentication configuration lookup permissions ([#71755](https://github.com/kubernetes/kubernetes/pull/71755), [@liggitt](https://github.com/liggitt))
* client-go: restores behavior of populating the BearerToken field in rest.Config objects constructed from kubeconfig files containing tokenFile config, or from in-cluster configuration. An additional BearerTokenFile field is now populated to enable constructed clients to periodically refresh tokens. ([#71713](https://github.com/kubernetes/kubernetes/pull/71713), [@liggitt](https://github.com/liggitt))
* apply: fix detection of non-dry-run enabled servers ([#71854](https://github.com/kubernetes/kubernetes/pull/71854), [@apelisse](https://github.com/apelisse))
* Scheduler only activates unschedulable pods if node's scheduling related properties change. ([#71551](https://github.com/kubernetes/kubernetes/pull/71551), [@mlmhl](https://github.com/mlmhl))
* Fixes pod deletion when cleaning old cronjobs ([#71802](https://github.com/kubernetes/kubernetes/pull/71802), [@soltysh](https://github.com/soltysh))
* fix issue: vm sku restriction policy does not work in azure disk attach/detach ([#71941](https://github.com/kubernetes/kubernetes/pull/71941), [@andyzhangx](https://github.com/andyzhangx))
* Include CRD for BGPConfigurations, needed for calico 2.x to 3.x upgrade. ([#71868](https://github.com/kubernetes/kubernetes/pull/71868), [@satyasm](https://github.com/satyasm))
* UDP connections now support graceful termination in IPVS mode ([#71515](https://github.com/kubernetes/kubernetes/pull/71515), [@lbernail](https://github.com/lbernail))
* kubeadm: use kubeconfig flag instead of kubeconfig-dir on init phase bootstrap-token ([#71803](https://github.com/kubernetes/kubernetes/pull/71803), [@yagonobre](https://github.com/yagonobre))
* On GCI, NPD starts to monitor kubelet, docker, containerd crashlooping, read-only filesystem and corrupt docker overlay2 issues. ([#71522](https://github.com/kubernetes/kubernetes/pull/71522), [@wangzhen127](https://github.com/wangzhen127))
* Fixes an issue where Portworx volumes cannot be mounted if 9001 port is already in use on the host and users remap 9001 to another port. ([#70392](https://github.com/kubernetes/kubernetes/pull/70392), [@harsh-px](https://github.com/harsh-px))
* Only use the first IP address got from instance metadata. This is because Azure CNI would set up a list of IP addresses in instance metadata, while only the first one is the Node's IP. ([#71736](https://github.com/kubernetes/kubernetes/pull/71736), [@feiskyer](https://github.com/feiskyer))
* kube-controller-manager: fixed issue display help for the deprecated insecure --port flag ([#71601](https://github.com/kubernetes/kubernetes/pull/71601), [@liggitt](https://github.com/liggitt))
* Update Cluster Autoscaler version in gce manifests to 1.13.1 (https://github.com/kubernetes/autoscaler/releases/tag/cluster-autoscaler-1.13.1) ([#71842](https://github.com/kubernetes/kubernetes/pull/71842), [@losipiuk](https://github.com/losipiuk))
* kubectl: fixes regression in --sort-by behavior ([#71805](https://github.com/kubernetes/kubernetes/pull/71805), [@liggitt](https://github.com/liggitt))
* Fixes apiserver nil pointer panics when requesting v2beta1 autoscaling object metrics ([#71744](https://github.com/kubernetes/kubernetes/pull/71744), [@yue9944882](https://github.com/yue9944882))
* Fix scheduling starvation of pods in cluster with large number of unschedulable pods. ([#71488](https://github.com/kubernetes/kubernetes/pull/71488), [@bsalamat](https://github.com/bsalamat))
# v1.13.0
[Documentation](https://docs.k8s.io)
@ -222,7 +303,7 @@ Before upgrading to Kubernetes 1.13, you must keep the following in mind:
### SIG API Machinery
For the 1.13 release, SIG API Machinery is happy to announce that the [dry-run functionality](https://kubernetes.io//docs/reference/using-api/api-concepts/#dry-run) is now beta.
For the 1.13 release, SIG API Machinery is happy to announce that the [dry-run functionality](https://kubernetes.io/docs/reference/using-api/api-concepts/#dry-run) is now beta.
### SIG Auth
@ -240,7 +321,7 @@ For detailed release notes on the three alpha features from SIG AWS, please refe
- [aws-alb-ingress-controller v1.0.0](https://github.com/kubernetes-sigs/aws-alb-ingress-controller/releases/tag/v1.0.0)
- [aws-ebs-csi-driver v0.1](https://github.com/kubernetes-sigs/aws-ebs-csi-driver/blob/master/CHANGELOG-0.1.md)
- [cloudprovider-aws external v0.1.0] (https://github.com/kubernetes/cloud-provider-aws/blob/master/changelogs/CHANGELOG-0.1.md)
- [cloudprovider-aws external v0.1.0](https://github.com/kubernetes/cloud-provider-aws/blob/master/changelogs/CHANGELOG-0.1.md)
### SIG Azure
@ -613,15 +694,6 @@ SIG Windows focused on improving reliability for Windows and Kubernetes support
- GLBC remains unchanged at v1.2.3 since Kubernetes 1.12 ([#66793](https://github.com/kubernetes/kubernetes/pull/66793))
- Ingress-gce remains unchanged at v1.2.3 since Kubernetes 1.12 ([#66793](https://github.com/kubernetes/kubernetes/pull/66793))
- ip-masq-agen remains unchanged at v2.1.1 since Kubernetes 1.12 ([#67916](https://github.com/kubernetes/kubernetes/pull/67916))
- [v1.13.0-rc.2](#v1130-rc2)
- [v1.13.0-rc.1](#v1130-rc1)
- [v1.13.0-beta.2](#v1130-beta2)
- [v1.13.0-beta.1](#v1130-beta1)
- [v1.13.0-alpha.3](#v1130-alpha3)
- [v1.13.0-alpha.2](#v1130-alpha2)
- [v1.13.0-alpha.1](#v1130-alpha1)
# v1.13.0-rc.2

View File

@ -412,7 +412,7 @@ func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clien
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 120*time.Second); err != nil {
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
}

2
vendor/k8s.io/kubernetes/deps.sh generated vendored
View File

@ -11,7 +11,7 @@ package=k8s.io/kubernetes
package=k8s.io/kubernetes/cmd/hyperkube
$(cat ./Godeps/Godeps.json | jq -r '(.Deps | .[] | "\(.ImportPath) \(.Comment) \(.Rev)\n")' | sed 's/null//' | awk '{print $1 " " $2}' | grep -Ev 'github.com/opencontainers/runc|bitbucket.org/ww/goautoneg|github.com/google/cadvisor' | sort -k2,1 | uniq -f1)
bitbucket.org/ww/goautoneg a547fc61f48d567d5b4ec6f8aee5573d8efce11d https://github.com/rancher/goautoneg.git
github.com/ibuildthecloud/kvsql 6bb3d252056655760ed8ca6557d6d5e607b361d2
github.com/ibuildthecloud/kvsql 8dfe3deb0646c4817567e4a53ed1dea41ea5668f
github.com/google/cadvisor 91dab6eb91496ed68acbef68b02b34b3392ca754 https://github.com/ibuildthecloud/cadvisor.git
github.com/opencontainers/runc 96ec2177ae841256168fcf76954f7177af9446eb
EOF

View File

@ -207,14 +207,18 @@ func (a *HorizontalController) processNextWorkItem() bool {
}
defer a.queue.Done(key)
err := a.reconcileKey(key.(string))
if err == nil {
// don't "forget" here because we want to only process a given HPA once per resync interval
return true
deleted, err := a.reconcileKey(key.(string))
if err != nil {
utilruntime.HandleError(err)
}
// Add request processing HPA after resync interval just in case last resync didn't insert
// request into the queue. Request is not inserted into queue by resync if previous one wasn't processed yet.
// This happens quite often because requests from previous resync are removed from the queue at the same moment
// as next resync inserts new requests.
if !deleted {
a.queue.AddRateLimited(key)
}
a.queue.AddRateLimited(key)
utilruntime.HandleError(err)
return true
}
@ -298,20 +302,20 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
return replicas, metric, statuses, timestamp, nil
}
func (a *HorizontalController) reconcileKey(key string) error {
func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
return true, err
}
hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
delete(a.recommendations, key)
return nil
return true, nil
}
return a.reconcileAutoscaler(hpa, key)
return false, a.reconcileAutoscaler(hpa, key)
}
// computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.

View File

@ -17,6 +17,7 @@ limitations under the License.
package versioned
import (
"encoding/base64"
"encoding/json"
"fmt"
@ -152,6 +153,7 @@ func handleDockerCfgJSONContent(username, password, email, server string) ([]byt
Username: username,
Password: password,
Email: email,
Auth: encodeDockerConfigFieldAuth(username, password),
}
dockerCfgJSON := DockerConfigJSON{
@ -161,6 +163,11 @@ func handleDockerCfgJSONContent(username, password, email, server string) ([]byt
return json.Marshal(dockerCfgJSON)
}
func encodeDockerConfigFieldAuth(username, password string) string {
fieldValue := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(fieldValue))
}
// DockerConfigJSON represents a local docker auth config file
// for pulling images.
type DockerConfigJSON struct {
@ -175,7 +182,8 @@ type DockerConfigJSON struct {
type DockerConfig map[string]DockerConfigEntry
type DockerConfigEntry struct {
Username string
Password string
Email string
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
Email string `json:"email,omitempty"`
Auth string `json:"auth,omitempty"`
}

View File

@ -165,6 +165,7 @@ go_test(
"kubelet_pods_windows_test.go",
"kubelet_resources_test.go",
"kubelet_test.go",
"kubelet_volumes_linux_test.go",
"kubelet_volumes_test.go",
"main_test.go",
"oom_watcher_test.go",

View File

@ -158,7 +158,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
return f, fmt.Errorf("%s - %v", localErr, err)
}
expectedCgroups := sets.NewString("cpu", "cpuacct", "cpuset", "memory")
expectedCgroups := sets.NewString("cpu", "cpuacct", "memory")
for _, mountPoint := range mountPoints {
if mountPoint.Type == cgroupMountType {
for _, opt := range mountPoint.Opts {

View File

@ -19,6 +19,7 @@ package config
const (
DefaultKubeletPodsDirName = "pods"
DefaultKubeletVolumesDirName = "volumes"
DefaultKubeletVolumeSubpathsDirName = "volume-subpaths"
DefaultKubeletVolumeDevicesDirName = "volumeDevices"
DefaultKubeletPluginsDirName = "plugins"
DefaultKubeletPluginsRegistrationDirName = "plugins_registry"

View File

@ -25,7 +25,7 @@ import (
cadvisorapiv1 "github.com/google/cadvisor/info/v1"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -99,6 +99,13 @@ func (kl *Kubelet) getPodDir(podUID types.UID) string {
return filepath.Join(kl.getPodsDir(), string(podUID))
}
// getPodVolumesSubpathsDir returns the full path to the per-pod subpaths directory under
// which subpath volumes are created for the specified pod. This directory may not
// exist if the pod does not exist or subpaths are not specified.
func (kl *Kubelet) getPodVolumeSubpathsDir(podUID types.UID) string {
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletVolumeSubpathsDirName)
}
// getPodVolumesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
@ -310,6 +317,19 @@ func (kl *Kubelet) getMountedVolumePathListFromDisk(podUID types.UID) ([]string,
return mountedVolumes, nil
}
// podVolumesSubpathsDirExists returns true if the pod volume-subpaths directory for
// a given pod exists
func (kl *Kubelet) podVolumeSubpathsDirExists(podUID types.UID) (bool, error) {
podVolDir := kl.getPodVolumeSubpathsDir(podUID)
if pathExists, pathErr := volumeutil.PathExists(podVolDir); pathErr != nil {
return true, fmt.Errorf("Error checking if path %q exists: %v", podVolDir, pathErr)
} else if !pathExists {
return false, nil
}
return true, nil
}
// GetVersionInfo returns information about the version of cAdvisor in use.
func (kl *Kubelet) GetVersionInfo() (*cadvisorapiv1.VersionInfo, error) {
return kl.cadvisor.VersionInfo()

View File

@ -19,7 +19,7 @@ package kubelet
import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
@ -114,6 +114,8 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
}
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
// TODO: getMountedVolumePathListFromDisk() call may be redundant with
// kl.getPodVolumePathListFromDisk(). Can this be cleaned up?
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
klog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up", uid)
continue
@ -128,6 +130,18 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but volume paths are still present on disk", uid))
continue
}
// If there are any volume-subpaths, do not cleanup directories
volumeSubpathExists, err := kl.podVolumeSubpathsDirExists(uid)
if err != nil {
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but error %v occurred during reading of volume-subpaths dir from disk", uid, err))
continue
}
if volumeSubpathExists {
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("Orphaned pod %q found, but volume subpaths are still present on disk", uid))
continue
}
klog.V(3).Infof("Orphaned pod %q found, removing", uid)
if err := removeall.RemoveAllOneFilesystem(kl.mounter, kl.getPodDir(uid)); err != nil {
klog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/util:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -19,7 +19,11 @@ package stats
import (
"fmt"
"k8s.io/klog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
type SummaryProvider interface {
@ -32,6 +36,11 @@ type SummaryProvider interface {
// summaryProviderImpl implements the SummaryProvider interface.
type summaryProviderImpl struct {
// kubeletCreationTime is the time at which the summaryProvider was created.
kubeletCreationTime metav1.Time
// systemBootTime is the time at which the system was started
systemBootTime metav1.Time
provider StatsProvider
}
@ -40,7 +49,18 @@ var _ SummaryProvider = &summaryProviderImpl{}
// NewSummaryProvider returns a SummaryProvider using the stats provided by the
// specified statsProvider.
func NewSummaryProvider(statsProvider StatsProvider) SummaryProvider {
return &summaryProviderImpl{statsProvider}
kubeletCreationTime := metav1.Now()
bootTime, err := util.GetBootTime()
if err != nil {
// bootTime will be zero if we encounter an error getting the boot time.
klog.Warningf("Error getting system boot time. Node metrics will have an incorrect start time: %v", err)
}
return &summaryProviderImpl{
kubeletCreationTime: kubeletCreationTime,
systemBootTime: metav1.NewTime(bootTime),
provider: statsProvider,
}
}
func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error) {
@ -77,7 +97,7 @@ func (sp *summaryProviderImpl) Get(updateStats bool) (*statsapi.Summary, error)
CPU: rootStats.CPU,
Memory: rootStats.Memory,
Network: networkStats,
StartTime: rootStats.StartTime,
StartTime: sp.systemBootTime,
Fs: rootFsStats,
Runtime: &statsapi.RuntimeStats{ImageFs: imageFsStats},
Rlimit: rlimit,

View File

@ -21,6 +21,7 @@ package stats
import (
"k8s.io/klog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
)
@ -29,11 +30,12 @@ func (sp *summaryProviderImpl) GetSystemContainersStats(nodeConfig cm.NodeConfig
systemContainers := map[string]struct {
name string
forceStatsUpdate bool
startTime metav1.Time
}{
statsapi.SystemContainerKubelet: {nodeConfig.KubeletCgroupsName, false},
statsapi.SystemContainerRuntime: {nodeConfig.RuntimeCgroupsName, false},
statsapi.SystemContainerMisc: {nodeConfig.SystemCgroupsName, false},
statsapi.SystemContainerPods: {sp.provider.GetPodCgroupRoot(), updateStats},
statsapi.SystemContainerKubelet: {name: nodeConfig.KubeletCgroupsName, forceStatsUpdate: false, startTime: sp.kubeletCreationTime},
statsapi.SystemContainerRuntime: {name: nodeConfig.RuntimeCgroupsName, forceStatsUpdate: false},
statsapi.SystemContainerMisc: {name: nodeConfig.SystemCgroupsName, forceStatsUpdate: false},
statsapi.SystemContainerPods: {name: sp.provider.GetPodCgroupRoot(), forceStatsUpdate: updateStats},
}
for sys, cont := range systemContainers {
// skip if cgroup name is undefined (not all system containers are required)
@ -48,6 +50,11 @@ func (sp *summaryProviderImpl) GetSystemContainersStats(nodeConfig cm.NodeConfig
// System containers don't have a filesystem associated with them.
s.Logs, s.Rootfs = nil, nil
s.Name = sys
// if we know the start time of a system container, use that instead of the start time provided by cAdvisor
if !cont.startTime.IsZero() {
s.StartTime = cont.startTime
}
stats = append(stats, *s)
}
@ -58,11 +65,12 @@ func (sp *summaryProviderImpl) GetSystemContainersCPUAndMemoryStats(nodeConfig c
systemContainers := map[string]struct {
name string
forceStatsUpdate bool
startTime metav1.Time
}{
statsapi.SystemContainerKubelet: {nodeConfig.KubeletCgroupsName, false},
statsapi.SystemContainerRuntime: {nodeConfig.RuntimeCgroupsName, false},
statsapi.SystemContainerMisc: {nodeConfig.SystemCgroupsName, false},
statsapi.SystemContainerPods: {sp.provider.GetPodCgroupRoot(), updateStats},
statsapi.SystemContainerKubelet: {name: nodeConfig.KubeletCgroupsName, forceStatsUpdate: false, startTime: sp.kubeletCreationTime},
statsapi.SystemContainerRuntime: {name: nodeConfig.RuntimeCgroupsName, forceStatsUpdate: false},
statsapi.SystemContainerMisc: {name: nodeConfig.SystemCgroupsName, forceStatsUpdate: false},
statsapi.SystemContainerPods: {name: sp.provider.GetPodCgroupRoot(), forceStatsUpdate: updateStats},
}
for sys, cont := range systemContainers {
// skip if cgroup name is undefined (not all system containers are required)
@ -75,6 +83,11 @@ func (sp *summaryProviderImpl) GetSystemContainersCPUAndMemoryStats(nodeConfig c
continue
}
s.Name = sys
// if we know the start time of a system container, use that instead of the start time provided by cAdvisor
if !cont.startTime.IsZero() {
s.StartTime = cont.startTime
}
stats = append(stats, *s)
}

View File

@ -34,6 +34,8 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"boottime_util_darwin.go",
"boottime_util_linux.go",
"doc.go",
"util.go",
"util_unix.go",

View File

@ -0,0 +1,44 @@
// +build darwin
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"syscall"
"time"
"unsafe"
"golang.org/x/sys/unix"
)
// GetBootTime returns the time at which the machine was started, truncated to the nearest second
func GetBootTime() (time.Time, error) {
output, err := unix.SysctlRaw("kern.boottime")
if err != nil {
return time.Time{}, err
}
var timeval syscall.Timeval
if len(output) != int(unsafe.Sizeof(timeval)) {
return time.Time{}, fmt.Errorf("unexpected output when calling syscall kern.bootime. Expected len(output) to be %v, but got %v",
int(unsafe.Sizeof(timeval)), len(output))
}
timeval = *(*syscall.Timeval)(unsafe.Pointer(&output[0]))
sec, nsec := timeval.Unix()
return time.Unix(sec, nsec).Truncate(time.Second), nil
}

View File

@ -0,0 +1,36 @@
// +build freebsd linux
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"time"
"golang.org/x/sys/unix"
)
// GetBootTime returns the time at which the machine was started, truncated to the nearest second
func GetBootTime() (time.Time, error) {
currentTime := time.Now()
var info unix.Sysinfo_t
if err := unix.Sysinfo(&info); err != nil {
return time.Time{}, fmt.Errorf("error getting system uptime: %s", err)
}
return currentTime.Add(-time.Duration(info.Uptime) * time.Second).Truncate(time.Second), nil
}

View File

@ -45,3 +45,8 @@ func UnlockPath(fileHandles []uintptr) {
func LocalEndpoint(path, file string) string {
return ""
}
// GetBootTime empty implementation
func GetBootTime() (time.Time, error) {
return time.Time{}, fmt.Errorf("GetBootTime is unsupported in this build")
}

View File

@ -23,6 +23,7 @@ import (
"net"
"net/url"
"strings"
"syscall"
"time"
"github.com/Microsoft/go-winio"
@ -112,3 +113,15 @@ func LocalEndpoint(path, file string) string {
}
return u.String() + "//./pipe/" + file
}
var tickCount = syscall.NewLazyDLL("kernel32.dll").NewProc("GetTickCount64")
// GetBootTime returns the time at which the machine was started, truncated to the nearest second
func GetBootTime() (time.Time, error) {
currentTime := time.Now()
output, _, err := tickCount.Call()
if errno, ok := err.(syscall.Errno); !ok || errno != 0 {
return time.Time{}, err
}
return currentTime.Add(-time.Duration(output) * time.Millisecond).Truncate(time.Second), nil
}

View File

@ -203,11 +203,12 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
var volumeName v1.UniqueVolumeName
// The unique volume name used depends on whether the volume is attachable
// The unique volume name used depends on whether the volume is attachable/device-mountable
// or not.
attachable := dsw.isAttachableVolume(volumeSpec)
if attachable {
// For attachable volumes, use the unique volume name as reported by
deviceMountable := dsw.isDeviceMountableVolume(volumeSpec)
if attachable || deviceMountable {
// For attachable/device-mountable volumes, use the unique volume name as reported by
// the plugin.
volumeName, err =
util.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
@ -219,13 +220,11 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
err)
}
} else {
// For non-attachable volumes, generate a unique name based on the pod
// For non-attachable and non-device-mountable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = util.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, volumeSpec)
volumeName = util.GetUniqueVolumeNameFromSpecWithPod(podName, volumePlugin, volumeSpec)
}
deviceMountable := dsw.isDeviceMountableVolume(volumeSpec)
if _, volumeExists := dsw.volumesToMount[volumeName]; !volumeExists {
dsw.volumesToMount[volumeName] = volumeToMount{
volumeName: volumeName,

View File

@ -439,6 +439,10 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
if err != nil {
return nil, err
}
deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginByName(volume.pluginName)
if err != nil {
return nil, err
}
// Create pod object
pod := &v1.Pod{
@ -464,13 +468,13 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
}
var uniqueVolumeName v1.UniqueVolumeName
if attachablePlugin != nil {
if attachablePlugin != nil || deviceMountablePlugin != nil {
uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
return nil, err
}
} else {
uniqueVolumeName = util.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec)
uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
}
// Check existence of mount point for filesystem volume or symbolic link for block volume
isExist, checkErr := rc.operationExecutor.CheckVolumeExistenceOperation(volumeSpec, volume.mountPath, volumeSpec.Name(), rc.mounter, uniqueVolumeName, volume.podName, pod.UID, attachablePlugin)

View File

@ -168,6 +168,7 @@ func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, e
// For UDP, ActiveConn is always 0
// For TCP, InactiveConn are connections not in ESTABLISHED state
if rs.ActiveConn+rs.InactiveConn != 0 {
klog.Infof("Not deleting, RS %v: %v ActiveConn, %v InactiveConn", rsToDelete.String(), rs.ActiveConn, rs.InactiveConn)
return false, nil
}
klog.Infof("Deleting rs: %s", rsToDelete.String())

View File

@ -162,6 +162,8 @@ const sysctlRouteLocalnet = "net/ipv4/conf/all/route_localnet"
const sysctlBridgeCallIPTables = "net/bridge/bridge-nf-call-iptables"
const sysctlVSConnTrack = "net/ipv4/vs/conntrack"
const sysctlConnReuse = "net/ipv4/vs/conn_reuse_mode"
const sysctlExpireNoDestConn = "net/ipv4/vs/expire_nodest_conn"
const sysctlExpireQuiescentTemplate = "net/ipv4/vs/expire_quiescent_template"
const sysctlForward = "net/ipv4/ip_forward"
const sysctlArpIgnore = "net/ipv4/conf/all/arp_ignore"
const sysctlArpAnnounce = "net/ipv4/conf/all/arp_announce"
@ -321,6 +323,20 @@ func NewProxier(ipt utiliptables.Interface,
}
}
// Set the expire_nodest_conn sysctl we need for
if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
}
}
// Set the expire_quiescent_template sysctl we need for
if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
}
}
// Set the ip_forward sysctl we need for
if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
@ -1190,7 +1206,15 @@ func (proxier *Proxier) syncProxyRules() {
}
proxier.portsMap = replacementPortsMap
// Clean up legacy IPVS services
// Get legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to get bind address, err: %v", err)
}
legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
// Clean up legacy IPVS services and unbind addresses
appliedSvcs, err := proxier.ipvs.GetVirtualServers()
if err == nil {
for _, appliedSvc := range appliedSvcs {
@ -1199,15 +1223,7 @@ func (proxier *Proxier) syncProxyRules() {
} else {
klog.Errorf("Failed to get ipvs service, err: %v", err)
}
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
// Clean up legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
if err != nil {
klog.Errorf("Failed to get bind address, err: %v", err)
}
proxier.cleanLegacyBindAddr(activeBindAddrs, currentBindAddrs)
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
// Update healthz timestamp
if proxier.healthzServer != nil {
@ -1605,29 +1621,38 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
if err != nil {
klog.Errorf("Failed to delete destination: %v, error: %v", delDest, err)
klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
continue
}
}
return nil
}
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
for cs := range currentServices {
svc := currentServices[cs]
if _, ok := activeServices[cs]; !ok {
// This service was not processed in the latest sync loop so before deleting it,
// make sure it does not fall within an excluded CIDR range.
okayToDelete := true
rsList, _ := proxier.ipvs.GetRealServers(svc)
// If we still have real servers graceful termination is not done
if len(rsList) > 0 {
okayToDelete = false
}
// Applying graceful termination to all real servers
for _, rs := range rsList {
uniqueRS := GetUniqueRSName(svc, rs)
// if there are in terminating real server in this service, then handle it later
// If RS is already in the graceful termination list, no need to add it again
if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
okayToDelete = false
break
continue
}
klog.V(5).Infof("Using graceful delete to delete: %v", uniqueRS)
if err := proxier.gracefuldeleteManager.GracefulDeleteRS(svc, rs); err != nil {
klog.Errorf("Failed to delete destination: %v, error: %v", uniqueRS, err)
}
}
// make sure it does not fall within an excluded CIDR range.
for _, excludedCIDR := range proxier.excludeCIDRs {
// Any validation of this CIDR already should have occurred.
_, n, _ := net.ParseCIDR(excludedCIDR)
@ -1637,26 +1662,33 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre
}
}
if okayToDelete {
klog.V(4).Infof("Delete service %s", svc.String())
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
klog.Errorf("Failed to delete service, error: %v", err)
klog.Errorf("Failed to delete service %s, error: %v", svc.String(), err)
}
addr := svc.Address.String()
if _, ok := legacyBindAddrs[addr]; ok {
klog.V(4).Infof("Unbinding address %s", addr)
if err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice); err != nil {
klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
} else {
// In case we delete a multi-port service, avoid trying to unbind multiple times
delete(legacyBindAddrs, addr)
}
}
}
}
}
}
func (proxier *Proxier) cleanLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) {
func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
legacyAddrs := make(map[string]bool)
for _, addr := range currentBindAddrs {
if _, ok := activeBindAddrs[addr]; !ok {
// This address was not processed in the latest sync loop
klog.V(4).Infof("Unbind addr %s", addr)
err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice)
// Ignore no such address error when try to unbind address
if err != nil {
klog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
}
legacyAddrs[addr] = true
}
}
return legacyAddrs
}
// Join all words with spaces, terminate with newline and write to buff.

View File

@ -89,7 +89,7 @@ func AddSystemPriorityClasses() genericapiserver.PostStartHookFunc {
} else {
// Unable to get the priority class for reasons other than "not found".
klog.Warningf("unable to get PriorityClass %v: %v. Retrying...", pc.Name, err)
return false, err
return false, nil
}
}
}

View File

@ -17,7 +17,6 @@ go_library(
deps = [
"//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/internal/cache:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",

View File

@ -23,7 +23,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
)
// NodeFieldSelectorKeys is a map that: the key are node field selector keys; the values are
@ -92,6 +91,9 @@ type NodeLister interface {
List() ([]*v1.Node, error)
}
// PodFilter is a function to filter a pod. If pod passed return true else return false.
type PodFilter func(*v1.Pod) bool
// PodLister interface represents anything that can list pods for a scheduler.
type PodLister interface {
// We explicitly return []*v1.Pod, instead of v1.PodList, to avoid
@ -99,7 +101,7 @@ type PodLister interface {
List(labels.Selector) ([]*v1.Pod, error)
// This is similar to "List()", but the returned slice does not
// contain pods that don't pass `podFilter`.
FilteredList(podFilter schedulerinternalcache.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
}
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.

View File

@ -337,7 +337,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
pods := g.schedulingQueue.WaitingPodsForNode(nodeName)
pods := g.schedulingQueue.NominatedPodsForNode(nodeName)
if len(pods) == 0 {
return nil
@ -475,7 +475,7 @@ func addNominatedPods(pod *v1.Pod, meta algorithm.PredicateMetadata,
// This may happen only in tests.
return false, meta, nodeInfo
}
nominatedPods := queue.WaitingPodsForNode(nodeInfo.Node().Name)
nominatedPods := queue.NominatedPodsForNode(nodeInfo.Node().Name)
if nominatedPods == nil || len(nominatedPods) == 0 {
return false, meta, nodeInfo
}

View File

@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/node:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/klog"
@ -143,7 +144,7 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
return cache.FilteredList(alwaysTrue, selector)
}
func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
func (cache *schedulerCache) FilteredList(podFilter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We

View File

@ -19,12 +19,10 @@ package cache
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)
// PodFilter is a function to filter a pod. If pod passed return true else return false.
type PodFilter func(*v1.Pod) bool
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events.
@ -106,7 +104,7 @@ type Cache interface {
List(labels.Selector) ([]*v1.Pod, error)
// FilteredList returns all cached pods that pass the filter.
FilteredList(filter PodFilter, selector labels.Selector) ([]*v1.Pod, error)
FilteredList(filter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot

View File

@ -12,6 +12,7 @@ go_library(
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -36,6 +36,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@ -62,11 +63,14 @@ type SchedulingQueue interface {
MoveAllToActiveQueue()
AssignedPodAdded(pod *v1.Pod)
AssignedPodUpdated(pod *v1.Pod)
WaitingPodsForNode(nodeName string) []*v1.Pod
NominatedPodsForNode(nodeName string) []*v1.Pod
WaitingPods() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
// updates it if it already exists.
UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
@ -150,9 +154,9 @@ func (f *FIFO) AssignedPodUpdated(pod *v1.Pod) {}
// MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.
func (f *FIFO) MoveAllToActiveQueue() {}
// WaitingPodsForNode returns pods that are nominated to run on the given node,
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but FIFO does not support it.
func (f *FIFO) WaitingPodsForNode(nodeName string) []*v1.Pod {
func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod {
return nil
}
@ -164,6 +168,9 @@ func (f *FIFO) Close() {
// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}
// UpdateNominatedPodForNode does nothing in FIFO.
func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {}
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
func (f *FIFO) NumUnschedulablePods() int {
return 0
@ -194,10 +201,9 @@ type PriorityQueue struct {
activeQ *Heap
// unschedulableQ holds pods that have been tried and determined unschedulable.
unschedulableQ *UnschedulablePodsMap
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// nominatedPods is a structures that stores pods which are nominated to run
// on nodes.
nominatedPods *nominatedPodMap
// receivedMoveRequest is set to true whenever we receive a request to move a
// pod from the unschedulableQ to the activeQ, and is set to false, when we pop
// a pod from the activeQ. It indicates if we received a move request when a
@ -239,52 +245,12 @@ func NewPriorityQueue() *PriorityQueue {
pq := &PriorityQueue{
activeQ: newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
unschedulableQ: newUnschedulablePodsMap(),
nominatedPods: map[string][]*v1.Pod{},
nominatedPods: newNominatedPodMap(),
}
pq.cond.L = &pq.lock
return pq
}
// addNominatedPodIfNeeded adds a pod to nominatedPods if it has a NominatedNodeName and it does not
// already exist in the map. Adding an existing pod is not going to update the pod.
func (p *PriorityQueue) addNominatedPodIfNeeded(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
for _, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", pod.Namespace, pod.Name)
return
}
}
p.nominatedPods[nnn] = append(p.nominatedPods[nnn], pod)
}
}
// deleteNominatedPodIfExists deletes a pod from the nominatedPods.
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) deleteNominatedPodIfExists(pod *v1.Pod) {
nnn := NominatedNodeName(pod)
if len(nnn) > 0 {
for i, np := range p.nominatedPods[nnn] {
if np.UID == pod.UID {
p.nominatedPods[nnn] = append(p.nominatedPods[nnn][:i], p.nominatedPods[nnn][i+1:]...)
if len(p.nominatedPods[nnn]) == 0 {
delete(p.nominatedPods, nnn)
}
break
}
}
}
}
// updateNominatedPod updates a pod in the nominatedPods.
func (p *PriorityQueue) updateNominatedPod(oldPod, newPod *v1.Pod) {
// Even if the nominated node name of the Pod is not changed, we must delete and add it again
// to ensure that its pointer is updated.
p.deleteNominatedPodIfExists(oldPod)
p.addNominatedPodIfNeeded(newPod)
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in either queue.
func (p *PriorityQueue) Add(pod *v1.Pod) error {
@ -296,10 +262,9 @@ func (p *PriorityQueue) Add(pod *v1.Pod) error {
} else {
if p.unschedulableQ.get(pod) != nil {
klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
p.deleteNominatedPodIfExists(pod)
p.unschedulableQ.delete(pod)
}
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
@ -320,7 +285,7 @@ func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error {
if err != nil {
klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
} else {
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
@ -345,12 +310,12 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod) error {
}
if !p.receivedMoveRequest && isPodUnschedulable(pod) {
p.unschedulableQ.addOrUpdate(pod)
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
return nil
}
err := p.activeQ.Add(pod)
if err == nil {
p.addNominatedPodIfNeeded(pod)
p.nominatedPods.add(pod, "")
p.cond.Broadcast()
}
return err
@ -401,13 +366,13 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
defer p.lock.Unlock()
// If the pod is already in the active queue, just update it there.
if _, exists, _ := p.activeQ.Get(newPod); exists {
p.updateNominatedPod(oldPod, newPod)
p.nominatedPods.update(oldPod, newPod)
err := p.activeQ.Update(newPod)
return err
}
// If the pod is in the unschedulable queue, updating it may make it schedulable.
if usPod := p.unschedulableQ.get(newPod); usPod != nil {
p.updateNominatedPod(oldPod, newPod)
p.nominatedPods.update(oldPod, newPod)
if isPodUpdated(oldPod, newPod) {
p.unschedulableQ.delete(usPod)
err := p.activeQ.Add(newPod)
@ -422,7 +387,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
// If pod is not in any of the two queue, we put it in the active queue.
err := p.activeQ.Add(newPod)
if err == nil {
p.addNominatedPodIfNeeded(newPod)
p.nominatedPods.add(newPod, "")
p.cond.Broadcast()
}
return err
@ -433,7 +398,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.deleteNominatedPodIfExists(pod)
p.nominatedPods.delete(pod)
err := p.activeQ.Delete(pod)
if err != nil { // The item was probably not found in the activeQ.
p.unschedulableQ.delete(pod)
@ -516,16 +481,13 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod
return podsToMove
}
// WaitingPodsForNode returns pods that are nominated to run on the given node,
// NominatedPodsForNode returns pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node before they
// can be actually scheduled.
func (p *PriorityQueue) WaitingPodsForNode(nodeName string) []*v1.Pod {
func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
if list, ok := p.nominatedPods[nodeName]; ok {
return list
}
return nil
return p.nominatedPods.podsForNode(nodeName)
}
// WaitingPods returns all the waiting pods in the queue.
@ -551,10 +513,20 @@ func (p *PriorityQueue) Close() {
p.cond.Broadcast()
}
// DeleteNominatedPodIfExists deletes pod from internal cache if it's a nominatedPod
// DeleteNominatedPodIfExists deletes pod nominatedPods.
func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod) {
p.lock.Lock()
p.deleteNominatedPodIfExists(pod)
p.nominatedPods.delete(pod)
p.lock.Unlock()
}
// UpdateNominatedPodForNode adds a pod to the nominated pods of the given node.
// This is called during the preemption process after a node is nominated to run
// the pod. We update the structure before sending a request to update the pod
// object to avoid races with the following scheduling cycles.
func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string) {
p.lock.Lock()
p.nominatedPods.add(pod, nodeName)
p.lock.Unlock()
}
@ -802,3 +774,77 @@ func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
},
}
}
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
type nominatedPodMap struct {
// nominatedPods is a map keyed by a node name and the value is a list of
// pods which are nominated to run on the node. These are pods which can be in
// the activeQ or unschedulableQ.
nominatedPods map[string][]*v1.Pod
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is
// nominated.
nominatedPodToNode map[ktypes.UID]string
}
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
// always delete the pod if it already exist, to ensure we never store more than
// one instance of the pod.
npm.delete(p)
nnn := nodeName
if len(nnn) == 0 {
nnn = NominatedNodeName(p)
if len(nnn) == 0 {
return
}
}
npm.nominatedPodToNode[p.UID] = nnn
for _, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
return
}
}
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}
func (npm *nominatedPodMap) delete(p *v1.Pod) {
nnn, ok := npm.nominatedPodToNode[p.UID]
if !ok {
return
}
for i, np := range npm.nominatedPods[nnn] {
if np.UID == p.UID {
npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
if len(npm.nominatedPods[nnn]) == 0 {
delete(npm.nominatedPods, nnn)
}
break
}
}
delete(npm.nominatedPodToNode, p.UID)
}
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPod, "")
}
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
if list, ok := npm.nominatedPods[nodeName]; ok {
return list
}
return nil
}
func newNominatedPodMap() *nominatedPodMap {
return &nominatedPodMap{
nominatedPods: make(map[string][]*v1.Pod),
nominatedPodToNode: make(map[ktypes.UID]string),
}
}

View File

@ -316,11 +316,19 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
var nodeName = ""
if node != nil {
nodeName = node.Name
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// Make a call to update nominated node name of the pod on the API server.
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)

View File

@ -23,6 +23,7 @@ import (
"fmt"
"net"
"strings"
"sync"
"syscall"
libipvs "github.com/docker/libnetwork/ipvs"
@ -34,6 +35,7 @@ import (
type runner struct {
exec utilexec.Interface
ipvsHandle *libipvs.Handle
mu sync.Mutex // Protect Netlink calls
}
// Protocol is the IPVS service protocol type
@ -58,6 +60,8 @@ func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
if err != nil {
return err
}
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.NewService(svc)
}
@ -67,6 +71,8 @@ func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
if err != nil {
return err
}
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.UpdateService(svc)
}
@ -76,6 +82,8 @@ func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
if err != nil {
return err
}
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.DelService(svc)
}
@ -85,7 +93,10 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error
if err != nil {
return nil, err
}
runner.mu.Lock()
ipvsSvc, err := runner.ipvsHandle.GetService(svc)
runner.mu.Unlock()
if err != nil {
return nil, err
}
@ -98,7 +109,9 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error
// GetVirtualServers is part of ipvs.Interface.
func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
runner.mu.Lock()
ipvsSvcs, err := runner.ipvsHandle.GetServices()
runner.mu.Unlock()
if err != nil {
return nil, err
}
@ -115,6 +128,8 @@ func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
// Flush is part of ipvs.Interface. Currently we delete IPVS services one by one
func (runner *runner) Flush() error {
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.Flush()
}
@ -128,6 +143,8 @@ func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
if err != nil {
return err
}
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.NewDestination(svc, dst)
}
@ -141,6 +158,8 @@ func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error
if err != nil {
return err
}
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.DelDestination(svc, dst)
}
@ -153,6 +172,8 @@ func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error
if err != nil {
return err
}
runner.mu.Lock()
defer runner.mu.Unlock()
return runner.ipvsHandle.UpdateDestination(svc, dst)
}
@ -162,7 +183,9 @@ func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
if err != nil {
return nil, err
}
runner.mu.Lock()
dsts, err := runner.ipvsHandle.GetDestinations(svc)
runner.mu.Unlock()
if err != nil {
return nil, err
}

View File

@ -9,6 +9,7 @@ go_library(
"exec_mount_unsupported.go",
"fake.go",
"mount.go",
"mount_helper.go",
"mount_linux.go",
"mount_unsupported.go",
"mount_windows.go",
@ -67,6 +68,7 @@ go_test(
name = "go_default_test",
srcs = [
"exec_mount_test.go",
"mount_helper_test.go",
"mount_linux_test.go",
"mount_test.go",
"mount_windows_test.go",

View File

@ -30,6 +30,8 @@ type FakeMounter struct {
MountPoints []MountPoint
Log []FakeAction
Filesystem map[string]FileType
// Error to return for a path when calling IsLikelyNotMountPoint
MountCheckErrors map[string]error
// Some tests run things in parallel, make sure the mounter does not produce
// any golang's DATA RACE warnings.
mutex sync.Mutex
@ -119,6 +121,7 @@ func (f *FakeMounter) Unmount(target string) error {
}
f.MountPoints = newMountpoints
f.Log = append(f.Log, FakeAction{Action: FakeActionUnmount, Target: absTarget})
delete(f.MountCheckErrors, target)
return nil
}
@ -141,7 +144,12 @@ func (f *FakeMounter) IsLikelyNotMountPoint(file string) (bool, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
_, err := os.Stat(file)
err := f.MountCheckErrors[file]
if err != nil {
return false, err
}
_, err = os.Stat(file)
if err != nil {
return true, err
}

124
vendor/k8s.io/kubernetes/pkg/util/mount/mount_helper.go generated vendored Normal file
View File

@ -0,0 +1,124 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mount
import (
"fmt"
"os"
"syscall"
"k8s.io/klog"
)
// CleanupMountPoint unmounts the given path and
// deletes the remaining directory if successful.
// if extensiveMountPointCheck is true
// IsNotMountPoint will be called instead of IsLikelyNotMountPoint.
// IsNotMountPoint is more expensive but properly handles bind mounts within the same fs.
func CleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool) error {
// mounter.ExistsPath cannot be used because for containerized kubelet, we need to check
// the path in the kubelet container, not on the host.
pathExists, pathErr := PathExists(mountPath)
if !pathExists {
klog.Warningf("Warning: Unmount skipped because path does not exist: %v", mountPath)
return nil
}
corruptedMnt := IsCorruptedMnt(pathErr)
if pathErr != nil && !corruptedMnt {
return fmt.Errorf("Error checking path: %v", pathErr)
}
return doCleanupMountPoint(mountPath, mounter, extensiveMountPointCheck, corruptedMnt)
}
// doCleanupMountPoint unmounts the given path and
// deletes the remaining directory if successful.
// if extensiveMountPointCheck is true
// IsNotMountPoint will be called instead of IsLikelyNotMountPoint.
// IsNotMountPoint is more expensive but properly handles bind mounts within the same fs.
// if corruptedMnt is true, it means that the mountPath is a corrupted mountpoint, and the mount point check
// will be skipped
func doCleanupMountPoint(mountPath string, mounter Interface, extensiveMountPointCheck bool, corruptedMnt bool) error {
if !corruptedMnt {
var notMnt bool
var err error
if extensiveMountPointCheck {
notMnt, err = IsNotMountPoint(mounter, mountPath)
} else {
notMnt, err = mounter.IsLikelyNotMountPoint(mountPath)
}
if err != nil {
return err
}
if notMnt {
klog.Warningf("Warning: %q is not a mountpoint, deleting", mountPath)
return os.Remove(mountPath)
}
}
// Unmount the mount path
klog.V(4).Infof("%q is a mountpoint, unmounting", mountPath)
if err := mounter.Unmount(mountPath); err != nil {
return err
}
notMnt, mntErr := mounter.IsLikelyNotMountPoint(mountPath)
if mntErr != nil {
return mntErr
}
if notMnt {
klog.V(4).Infof("%q is unmounted, deleting the directory", mountPath)
return os.Remove(mountPath)
}
return fmt.Errorf("Failed to unmount path %v", mountPath)
}
// TODO: clean this up to use pkg/util/file/FileExists
// PathExists returns true if the specified path exists.
func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
} else if os.IsNotExist(err) {
return false, nil
} else if IsCorruptedMnt(err) {
return true, err
} else {
return false, err
}
}
// IsCorruptedMnt return true if err is about corrupted mount point
func IsCorruptedMnt(err error) bool {
if err == nil {
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
underlyingError = pe.Err
}
return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO
}

View File

@ -55,6 +55,7 @@ const (
fsckErrorsUncorrected = 4
// place for subpath mounts
// TODO: pass in directory using kubelet_getters instead
containerSubPathDirectoryName = "volume-subpaths"
// syscall.Openat flags used to traverse directories not following symlinks
nofollowFlags = unix.O_RDONLY | unix.O_NOFOLLOW
@ -890,15 +891,22 @@ func doCleanSubPaths(mounter Interface, podDir string, volumeName string) error
// scan /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/*
fullContainerDirPath := filepath.Join(subPathDir, containerDir.Name())
subPaths, err := ioutil.ReadDir(fullContainerDirPath)
if err != nil {
return fmt.Errorf("error reading %s: %s", fullContainerDirPath, err)
}
for _, subPath := range subPaths {
if err = doCleanSubPath(mounter, fullContainerDirPath, subPath.Name()); err != nil {
err = filepath.Walk(fullContainerDirPath, func(path string, info os.FileInfo, err error) error {
if path == fullContainerDirPath {
// Skip top level directory
return nil
}
// pass through errors and let doCleanSubPath handle them
if err = doCleanSubPath(mounter, fullContainerDirPath, filepath.Base(path)); err != nil {
return err
}
return nil
})
if err != nil {
return fmt.Errorf("error processing %s: %s", fullContainerDirPath, err)
}
// Whole container has been processed, remove its directory.
if err := os.Remove(fullContainerDirPath); err != nil {
return fmt.Errorf("error deleting %s: %s", fullContainerDirPath, err)
@ -925,22 +933,12 @@ func doCleanSubPath(mounter Interface, fullContainerDirPath, subPathIndex string
// process /var/lib/kubelet/pods/<uid>/volume-subpaths/<volume>/<container name>/<subPathName>
klog.V(4).Infof("Cleaning up subpath mounts for subpath %v", subPathIndex)
fullSubPath := filepath.Join(fullContainerDirPath, subPathIndex)
notMnt, err := IsNotMountPoint(mounter, fullSubPath)
if err != nil {
return fmt.Errorf("error checking %s for mount: %s", fullSubPath, err)
if err := CleanupMountPoint(fullSubPath, mounter, true); err != nil {
return fmt.Errorf("error cleaning subpath mount %s: %s", fullSubPath, err)
}
// Unmount it
if !notMnt {
if err = mounter.Unmount(fullSubPath); err != nil {
return fmt.Errorf("error unmounting %s: %s", fullSubPath, err)
}
klog.V(5).Infof("Unmounted %s", fullSubPath)
}
// Remove it *non*-recursively, just in case there were some hiccups.
if err = os.Remove(fullSubPath); err != nil {
return fmt.Errorf("error deleting %s: %s", fullSubPath, err)
}
klog.V(5).Infof("Removed %s", fullSubPath)
klog.V(4).Infof("Successfully cleaned subpath directory %s", fullSubPath)
return nil
}

View File

@ -3,8 +3,8 @@ package version
var (
gitMajor = "1"
gitMinor = "12"
gitVersion = "v1.13.1-k3s2"
gitCommit = "2054ab40be790e5f209e480b18ab8a7015262796"
gitVersion = "v1.13.2-k3s2"
gitCommit = "149316edfe67e696f622608282a3fc4252ad92b3"
gitTreeState = "clean"
buildDate = "2019-01-09T20:07+00:00Z"
buildDate = "2019-01-22T20:48+00:00Z"
)

View File

@ -63,7 +63,6 @@ go_test(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/slice:go_default_library",
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -73,8 +72,12 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
],
] + select({
"@io_bazel_rules_go//go/platform:linux": [
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
],
"//conditions:default": [],
}),
)
filegroup(

View File

@ -725,7 +725,7 @@ func (oe *operationExecutor) MountVolume(
if fsVolume {
// Filesystem volume case
// Mount/remount a volume when a volume is attached
generatedOperations, err = oe.operationGenerator.GenerateMountVolumeFunc(
generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
} else {

View File

@ -40,6 +40,10 @@ import (
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
)
const (
unknownVolumePlugin string = "UnknownVolumePlugin"
)
var _ OperationGenerator = &operationGenerator{}
type operationGenerator struct {
@ -82,7 +86,7 @@ func NewOperationGenerator(kubeClient clientset.Interface,
// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
type OperationGenerator interface {
// Generates the MountVolume function needed to perform the mount of a volume plugin
GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) (volumetypes.GeneratedOperations, error)
GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations
// Generates the UnmountVolume function needed to perform the unmount of a volume plugin
GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error)
@ -436,61 +440,61 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
waitForAttachTimeout time.Duration,
volumeToMount VolumeToMount,
actualStateOfWorld ActualStateOfWorldMounterUpdater,
isRemount bool) (volumetypes.GeneratedOperations, error) {
isRemount bool) volumetypes.GeneratedOperations {
// Get mounter plugin
volumePluginName := unknownVolumePlugin
volumePlugin, err :=
og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
if err != nil || volumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err)
}
affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin)
if affinityErr != nil {
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
return volumetypes.GeneratedOperations{}, detailedErr
}
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod,
volume.VolumeOptions{})
if newMounterErr != nil {
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
return volumetypes.GeneratedOperations{}, detailedErr
}
mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
if mountCheckError != nil {
eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.UnsupportedMountOption, eventErr.Error())
return volumetypes.GeneratedOperations{}, detailedErr
}
// Get attacher, if possible
attachableVolumePlugin, _ :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
var volumeAttacher volume.Attacher
if attachableVolumePlugin != nil {
volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
}
// get deviceMounter, if possible
deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
var volumeDeviceMounter volume.DeviceMounter
if deviceMountableVolumePlugin != nil {
volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
}
var fsGroup *int64
if volumeToMount.Pod.Spec.SecurityContext != nil &&
volumeToMount.Pod.Spec.SecurityContext.FSGroup != nil {
fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup
if err == nil && volumePlugin != nil {
volumePluginName = volumePlugin.GetPluginName()
}
mountVolumeFunc := func() (error, error) {
if err != nil || volumePlugin == nil {
return volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
}
affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin)
if affinityErr != nil {
return volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
}
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod,
volume.VolumeOptions{})
if newMounterErr != nil {
return volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
}
mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
if mountCheckError != nil {
return volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
}
// Get attacher, if possible
attachableVolumePlugin, _ :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
var volumeAttacher volume.Attacher
if attachableVolumePlugin != nil {
volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
}
// get deviceMounter, if possible
deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
var volumeDeviceMounter volume.DeviceMounter
if deviceMountableVolumePlugin != nil {
volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
}
var fsGroup *int64
if volumeToMount.Pod.Spec.SecurityContext != nil &&
volumeToMount.Pod.Spec.SecurityContext.FSGroup != nil {
fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup
}
devicePath := volumeToMount.DevicePath
if volumeAttacher != nil {
// Wait for attachable volumes to finish attaching
@ -536,7 +540,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
// resizeFileSystem will resize the file system if user has requested a resize of
// underlying persistent volume and is allowed to do so.
resizeSimpleError, resizeDetailedError := og.resizeFileSystem(volumeToMount, devicePath, deviceMountPath, volumePlugin.GetPluginName())
resizeSimpleError, resizeDetailedError := og.resizeFileSystem(volumeToMount, devicePath, deviceMountPath, volumePluginName)
if resizeSimpleError != nil || resizeDetailedError != nil {
return resizeSimpleError, resizeDetailedError
@ -593,8 +597,8 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
return volumetypes.GeneratedOperations{
OperationFunc: mountVolumeFunc,
EventRecorderFunc: eventRecorderFunc,
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "volume_mount"),
}, nil
CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
}
}
func (og *operationGenerator) resizeFileSystem(volumeToMount VolumeToMount, devicePath, deviceMountPath, pluginName string) (simpleErr, detailedErr error) {

View File

@ -23,9 +23,8 @@ import (
"path"
"path/filepath"
"strings"
"syscall"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@ -128,8 +127,9 @@ func SetReady(dir string) {
// UnmountPath is a common unmount routine that unmounts the given path and
// deletes the remaining directory if successful.
// TODO: Remove this function and change callers to call mount pkg directly
func UnmountPath(mountPath string, mounter mount.Interface) error {
return UnmountMountPoint(mountPath, mounter, false /* extensiveMountPointCheck */)
return mount.CleanupMountPoint(mountPath, mounter, false /* extensiveMountPointCheck */)
}
// UnmountMountPoint is a common unmount routine that unmounts the given path and
@ -137,93 +137,21 @@ func UnmountPath(mountPath string, mounter mount.Interface) error {
// if extensiveMountPointCheck is true
// IsNotMountPoint will be called instead of IsLikelyNotMountPoint.
// IsNotMountPoint is more expensive but properly handles bind mounts.
// TODO: Change callers to call mount pkg directly
func UnmountMountPoint(mountPath string, mounter mount.Interface, extensiveMountPointCheck bool) error {
pathExists, pathErr := PathExists(mountPath)
if !pathExists {
klog.Warningf("Warning: Unmount skipped because path does not exist: %v", mountPath)
return nil
}
corruptedMnt := IsCorruptedMnt(pathErr)
if pathErr != nil && !corruptedMnt {
return fmt.Errorf("Error checking path: %v", pathErr)
}
return doUnmountMountPoint(mountPath, mounter, extensiveMountPointCheck, corruptedMnt)
}
// doUnmountMountPoint is a common unmount routine that unmounts the given path and
// deletes the remaining directory if successful.
// if extensiveMountPointCheck is true
// IsNotMountPoint will be called instead of IsLikelyNotMountPoint.
// IsNotMountPoint is more expensive but properly handles bind mounts.
// if corruptedMnt is true, it means that the mountPath is a corrupted mountpoint, Take it as an argument for convenience of testing
func doUnmountMountPoint(mountPath string, mounter mount.Interface, extensiveMountPointCheck bool, corruptedMnt bool) error {
if !corruptedMnt {
var notMnt bool
var err error
if extensiveMountPointCheck {
notMnt, err = mount.IsNotMountPoint(mounter, mountPath)
} else {
notMnt, err = mounter.IsLikelyNotMountPoint(mountPath)
}
if err != nil {
return err
}
if notMnt {
klog.Warningf("Warning: %q is not a mountpoint, deleting", mountPath)
return os.Remove(mountPath)
}
}
// Unmount the mount path
klog.V(4).Infof("%q is a mountpoint, unmounting", mountPath)
if err := mounter.Unmount(mountPath); err != nil {
return err
}
notMnt, mntErr := mounter.IsLikelyNotMountPoint(mountPath)
if mntErr != nil {
return mntErr
}
if notMnt {
klog.V(4).Infof("%q is unmounted, deleting the directory", mountPath)
return os.Remove(mountPath)
}
return fmt.Errorf("Failed to unmount path %v", mountPath)
return mount.CleanupMountPoint(mountPath, mounter, extensiveMountPointCheck)
}
// PathExists returns true if the specified path exists.
// TODO: Change callers to call mount pkg directly
func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
} else if os.IsNotExist(err) {
return false, nil
} else if IsCorruptedMnt(err) {
return true, err
} else {
return false, err
}
return mount.PathExists(path)
}
// IsCorruptedMnt return true if err is about corrupted mount point
// TODO: Change callers to call mount pkg directly
func IsCorruptedMnt(err error) bool {
if err == nil {
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
underlyingError = pe.Err
}
return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO
return mount.IsCorruptedMnt(err)
}
// GetSecretForPod locates secret by name in the pod's namespace and returns secret map
@ -825,9 +753,10 @@ func GetUniqueVolumeName(pluginName, volumeName string) v1.UniqueVolumeName {
return v1.UniqueVolumeName(fmt.Sprintf("%s/%s", pluginName, volumeName))
}
// GetUniqueVolumeNameForNonAttachableVolume returns the unique volume name
// for a non-attachable volume.
func GetUniqueVolumeNameForNonAttachableVolume(
// GetUniqueVolumeNameFromSpecWithPod returns a unique volume name with pod
// name included. This is useful to generate different names for different pods
// on same volume.
func GetUniqueVolumeNameFromSpecWithPod(
podName types.UniquePodName, volumePlugin volume.VolumePlugin, volumeSpec *volume.Spec) v1.UniqueVolumeName {
return v1.UniqueVolumeName(
fmt.Sprintf("%s/%v-%s", volumePlugin.GetPluginName(), podName, volumeSpec.Name()))

View File

@ -67,8 +67,11 @@ github.com/google/certificate-transparency-go v1.0.21
github.com/google/gofuzz 44d81051d367757e1c7c6a5a86423ece9afcf63c
github.com/google/uuid 0.2-15-g8c31c18f31ede9
github.com/go-openapi/analysis v0.17.2
github.com/go-openapi/jsonpointer v0.18.0
github.com/go-openapi/loads v0.17.2
github.com/go-openapi/strfmt v0.17.0
github.com/go-openapi/swag v0.17.2
github.com/go-openapi/validate v0.18.0
github.com/go-ozzo/ozzo-validation v3.5.0
github.com/gophercloud/gophercloud 781450b3c4fcb4f5182bcc5133adb4b2e4a09d1d
github.com/gorilla/websocket v1.2.0-9-g4201258b820c74
@ -174,6 +177,7 @@ github.com/vmware/photon-controller-go-sdk PROMOTED-488
github.com/xanzy/go-cloudstack v2.1.1-1-g1e2cbf647e57fa
github.com/xiang90/probing 0.0.1
golang.org/x/crypto de0752318171da717af4ce24d0a2e8626afaeb11
golang.org/x/lint 8f45f776aaf18cebc8d65861cc70c33c60471952
golang.org/x/net 0ed95abb35c445290478a5348a7b38bb154135fd
golang.org/x/oauth2 a6bd8cefa1811bd24b86f8902872e4e8225f74c4
golang.org/x/sys 95c6576299259db960f6c5b9b69ea52422860fce
@ -201,6 +205,6 @@ k8s.io/utils 66066c83e385e385ccc3c964b44fd7dcd413d0ed
sigs.k8s.io/yaml v1.1.0
vbom.ml/util db5cfe13f5cc80a4990d98e2e1b0707a4d1a5394
bitbucket.org/ww/goautoneg a547fc61f48d567d5b4ec6f8aee5573d8efce11d https://github.com/rancher/goautoneg.git
github.com/ibuildthecloud/kvsql 6bb3d252056655760ed8ca6557d6d5e607b361d2
github.com/ibuildthecloud/kvsql 8dfe3deb0646c4817567e4a53ed1dea41ea5668f
github.com/google/cadvisor 91dab6eb91496ed68acbef68b02b34b3392ca754 https://github.com/ibuildthecloud/cadvisor.git
github.com/opencontainers/runc 96ec2177ae841256168fcf76954f7177af9446eb