Merge pull request #70508 from dashpole/pod_resources_socket

Add socket-based kubelet pod resources API.
pull/58/head
k8s-ci-robot 2018-11-15 13:43:44 -08:00 committed by GitHub
commit 7b4d4bc8ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1871 additions and 0 deletions

View File

@ -1015,6 +1015,9 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT="$(cd "$(dirname "${BASH_SOURCE}")/../" && pwd -P)"
POD_RESOURCES_ALPHA="${KUBE_ROOT}/pkg/kubelet/apis/podresources/v1alpha1/"
source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::protoc::generate_proto ${POD_RESOURCES_ALPHA}

View File

@ -0,0 +1,27 @@
#!/usr/bin/env bash
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
# NOTE: All output from this script needs to be copied back to the calling
# source tree. This is managed in kube::build::copy_output in build/common.sh.
# If the output set is changed update that function.
${KUBE_ROOT}/build/run.sh hack/update-generated-pod-resources-dockerized.sh "$@"

View File

@ -0,0 +1,45 @@
#!/usr/bin/env bash
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o errexit
set -o nounset
set -o pipefail
KUBE_ROOT=$(dirname "${BASH_SOURCE}")/..
KUBE_REMOTE_RUNTIME_ROOT="${KUBE_ROOT}/pkg/kubelet/apis/podresources/v1alpha1"
source "${KUBE_ROOT}/hack/lib/init.sh"
kube::golang::setup_env
function cleanup {
rm -rf ${KUBE_REMOTE_RUNTIME_ROOT}/_tmp/
}
trap cleanup EXIT
mkdir -p ${KUBE_REMOTE_RUNTIME_ROOT}/_tmp
cp ${KUBE_REMOTE_RUNTIME_ROOT}/api.pb.go ${KUBE_REMOTE_RUNTIME_ROOT}/_tmp/
ret=0
KUBE_VERBOSE=3 "${KUBE_ROOT}/hack/update-generated-pod-resources.sh"
diff -I "gzipped FileDescriptorProto" -I "0x" -Naupr ${KUBE_REMOTE_RUNTIME_ROOT}/_tmp/api.pb.go ${KUBE_REMOTE_RUNTIME_ROOT}/api.pb.go || ret=$?
if [[ $ret -eq 0 ]]; then
echo "Generated pod resources api is up to date."
cp ${KUBE_REMOTE_RUNTIME_ROOT}/_tmp/api.pb.go ${KUBE_REMOTE_RUNTIME_ROOT}/
else
echo "Generated pod resources api is out of date. Please run hack/update-generated-pod-resources.sh"
exit 1
fi

View File

@ -386,6 +386,12 @@ const (
//
// Allow TTL controller to clean up Pods and Jobs after they finish.
TTLAfterFinished utilfeature.Feature = "TTLAfterFinished"
// owner: @dashpole
// alpha: v1.13
//
// Enables the kubelet's pod resources grpc endpoint
KubeletPodResources utilfeature.Feature = "KubeletPodResources"
)
func init() {
@ -453,6 +459,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
VolumeSnapshotDataSource: {Default: false, PreRelease: utilfeature.Alpha},
ProcMountType: {Default: false, PreRelease: utilfeature.Alpha},
TTLAfterFinished: {Default: false, PreRelease: utilfeature.Alpha},
KubeletPodResources: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -47,6 +47,7 @@ go_library(
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/apis/podresources:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",

View File

@ -42,6 +42,7 @@ filegroup(
"//pkg/kubelet/apis/pluginregistration/v1:all-srcs",
"//pkg/kubelet/apis/pluginregistration/v1alpha1:all-srcs",
"//pkg/kubelet/apis/pluginregistration/v1beta1:all-srcs",
"//pkg/kubelet/apis/podresources:all-srcs",
"//pkg/kubelet/apis/stats/v1alpha1:all-srcs",
],
tags = ["automanaged"],

View File

@ -0,0 +1,48 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"client.go",
"constants.go",
"server.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/podresources",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["server_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/github.com/stretchr/testify/mock:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/apis/podresources/v1alpha1:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,44 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
import (
"context"
"fmt"
"time"
"google.golang.org/grpc"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/util"
)
// GetClient returns a client for the PodResourcesLister grpc service
func GetClient(socket string, connectionTimeout time.Duration, maxMsgSize int) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) {
addr, dialer, err := util.GetAddressAndDialer(socket)
if err != nil {
return nil, nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if err != nil {
return nil, nil, fmt.Errorf("Error dialing socket %s: %v", socket, err)
}
return podresourcesapi.NewPodResourcesListerClient(conn), conn, nil
}

View File

@ -0,0 +1,22 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
const (
// Socket is the name of the podresources server socket
Socket = "kubelet"
)

View File

@ -0,0 +1,75 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
import (
"context"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
)
// DevicesProvider knows how to provide the devices used by the given container
type DevicesProvider interface {
GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices
}
// PodsProvider knows how to provide the pods admitted by the node
type PodsProvider interface {
GetPods() []*v1.Pod
}
// podResourcesServer implements PodResourcesListerServer
type podResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
}
// NewPodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewPodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1alpha1.PodResourcesListerServer {
return &podResourcesServer{
podsProvider: podsProvider,
devicesProvider: devicesProvider,
}
}
// List returns information about the resources assigned to pods on the node
func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest) (*v1alpha1.ListPodResourcesResponse, error) {
pods := p.podsProvider.GetPods()
podResources := make([]*v1alpha1.PodResources, len(pods))
for i, pod := range pods {
pRes := v1alpha1.PodResources{
Name: pod.Name,
Namespace: pod.Namespace,
Containers: make([]*v1alpha1.ContainerResources, len(pod.Spec.Containers)),
}
for j, container := range pod.Spec.Containers {
pRes.Containers[j] = &v1alpha1.ContainerResources{
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
}
}
podResources[i] = &pRes
}
return &v1alpha1.ListPodResourcesResponse{
PodResources: podResources,
}, nil
}

View File

@ -0,0 +1,153 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podresources
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
)
type mockProvider struct {
mock.Mock
}
func (m *mockProvider) GetPods() []*v1.Pod {
args := m.Called()
return args.Get(0).([]*v1.Pod)
}
func (m *mockProvider) GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices {
args := m.Called(podUID, containerName)
return args.Get(0).([]*v1alpha1.ContainerDevices)
}
func TestListPodResources(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"
podUID := types.UID("pod-uid")
containerName := "container-name"
devs := []*v1alpha1.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0", "dev1"},
},
}
for _, tc := range []struct {
desc string
pods []*v1.Pod
devices []*v1alpha1.ContainerDevices
expectedResponse *v1alpha1.ListPodResourcesResponse
}{
{
desc: "no pods",
pods: []*v1.Pod{},
devices: []*v1alpha1.ContainerDevices{},
expectedResponse: &v1alpha1.ListPodResourcesResponse{},
},
{
desc: "pod without devices",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: containerName,
},
},
},
},
},
devices: []*v1alpha1.ContainerDevices{},
expectedResponse: &v1alpha1.ListPodResourcesResponse{
PodResources: []*v1alpha1.PodResources{
{
Name: podName,
Namespace: podNamespace,
Containers: []*v1alpha1.ContainerResources{
{
Name: containerName,
Devices: []*v1alpha1.ContainerDevices{},
},
},
},
},
},
},
{
desc: "pod with devices",
pods: []*v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: podNamespace,
UID: podUID,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: containerName,
},
},
},
},
},
devices: devs,
expectedResponse: &v1alpha1.ListPodResourcesResponse{
PodResources: []*v1alpha1.PodResources{
{
Name: podName,
Namespace: podNamespace,
Containers: []*v1alpha1.ContainerResources{
{
Name: containerName,
Devices: devs,
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
m := new(mockProvider)
m.On("GetPods").Return(tc.pods)
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
server := NewPodResourcesServer(m, m)
resp, err := server.List(context.TODO(), &v1alpha1.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
if tc.expectedResponse.String() != resp.String() {
t.Errorf("want resp = %s, got %s", tc.expectedResponse.String(), resp.String())
}
})
}
}

View File

@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["api.pb.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/gogo/protobuf/gogoproto:go_default_library",
"//vendor/github.com/gogo/protobuf/proto:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,48 @@
// To regenerate api.pb.go run hack/update-generated-pod-resources.sh
syntax = 'proto3';
package v1alpha1;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.goproto_getters_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_unrecognized_all) = false;
// PodResourcesLister is a service provided by the kubelet that provides information about the
// node resources consumed by pods and containers on the node
service PodResourcesLister {
rpc List(ListPodResourcesRequest) returns (ListPodResourcesResponse) {}
}
// ListPodResourcesRequest is the request made to the PodResourcesLister service
message ListPodResourcesRequest {}
// ListPodResourcesResponse is the response returned by List function
message ListPodResourcesResponse {
repeated PodResources pod_resources = 1;
}
// PodResources contains information about the node resources assigned to a pod
message PodResources {
string name = 1;
string namespace = 2;
repeated ContainerResources containers = 3;
}
// ContainerResources contains information about the resources assigned to a container
message ContainerResources {
string name = 1;
repeated ContainerDevices devices = 2;
}
// ContainerDevices contains information about the devices assigned to a container
message ContainerDevices {
string resource_name = 1;
repeated string device_ids = 2;
}

View File

@ -27,6 +27,7 @@ go_library(
deps = [
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",

View File

@ -23,6 +23,7 @@ import (
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -100,6 +101,9 @@ type ContainerManager interface {
// The pluginwatcher's Handlers allow to have a single module for handling
// registration.
GetPluginRegistrationHandler() pluginwatcher.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
}
type NodeConfig struct {

View File

@ -44,6 +44,7 @@ import (
"k8s.io/client-go/tools/record"
kubefeatures "k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
@ -878,3 +879,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return cm.deviceManager.GetCapacity()
}
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return cm.deviceManager.GetDevices(podUID, containerName)
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -105,6 +106,10 @@ func (cm *containerManagerStub) GetPodCgroupRoot() string {
return ""
}
func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/klog"
kubefeatures "k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -166,3 +167,7 @@ func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLi
func (cm *containerManagerImpl) GetPodCgroupRoot() string {
return ""
}
func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}

View File

@ -16,6 +16,7 @@ go_library(
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
@ -802,3 +803,10 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
}
return false
}
// GetDevices returns the devices used by the specified container
func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.podDevices.getContainerDevices(podUID, containerName)
}

View File

@ -18,6 +18,7 @@ package devicemanager
import (
"k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
@ -61,3 +62,8 @@ func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
return nil
}
// GetDevices returns nil
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -271,3 +272,21 @@ func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *Devic
}
return opts
}
// getContainerDevices returns the devices assigned to the provided container for all ResourceNames
func (pdev podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
if _, podExists := pdev[podUID]; !podExists {
return nil
}
if _, contExists := pdev[podUID][contName]; !contExists {
return nil
}
cDev := []*podresourcesapi.ContainerDevices{}
for resource, allocateInfo := range pdev[podUID][contName] {
cDev = append(cDev, &podresourcesapi.ContainerDevices{
ResourceName: resource,
DeviceIds: allocateInfo.deviceIds.UnsortedList(),
})
}
return cDev
}

View File

@ -20,6 +20,7 @@ import (
"time"
"k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -54,6 +55,9 @@ type Manager interface {
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
GetWatcherHandler() watcher.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

View File

@ -23,4 +23,5 @@ const (
DefaultKubeletPluginsDirName = "plugins"
DefaultKubeletContainersDirName = "containers"
DefaultKubeletPluginContainersDirName = "plugin-containers"
DefaultKubeletPodResourcesDirName = "pod-resources"
)

View File

@ -60,6 +60,7 @@ import (
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@ -97,6 +98,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/token"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
@ -192,6 +194,7 @@ type Bootstrap interface {
StartGarbageCollection()
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool)
ListenAndServeReadOnly(address net.IP, port uint)
ListenAndServePodResources()
Run(<-chan kubetypes.PodUpdate)
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}
@ -1242,6 +1245,7 @@ func allGlobalUnicastIPs() ([]net.IP, error) {
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
// 4. the pod-resources directory
func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = path.Clean(kl.rootDirectory)
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
@ -1256,6 +1260,9 @@ func (kl *Kubelet) setupDataDirs() error {
if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
return fmt.Errorf("error creating plugins directory: %v", err)
}
if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
return fmt.Errorf("error creating podresources directory: %v", err)
}
return nil
}
@ -2221,6 +2228,11 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
}
// ListenAndServePodResources runs the kubelet podresources grpc service
func (kl *Kubelet) ListenAndServePodResources() {
server.ListenAndServePodResources(util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket), kl.podManager, kl.containerManager)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
if podStatus, err := kl.podCache.Get(podID); err == nil {

View File

@ -139,6 +139,11 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletContainersDirName, ctrName)
}
// getPodResourcesSocket returns the full path to the directory containing the pod resources socket
func (kl *Kubelet) getPodResourcesDir() string {
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPodResourcesDirName)
}
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pods.
func (kl *Kubelet) GetPods() []*v1.Pod {

View File

@ -18,6 +18,8 @@ go_library(
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1/validation:go_default_library",
"//pkg/kubelet/apis/podresources:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/prober:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
@ -25,6 +27,7 @@ go_library(
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util:go_default_library",
"//pkg/util/configz:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -47,6 +50,7 @@ go_library(
"//vendor/github.com/google/cadvisor/metrics:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -37,6 +37,7 @@ import (
"github.com/google/cadvisor/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"k8s.io/klog"
"k8s.io/api/core/v1"
@ -56,6 +57,8 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
@ -63,6 +66,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/util/configz"
)
@ -161,6 +165,17 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
klog.Fatal(server.ListenAndServe())
}
// ListenAndServePodResources initializes a grpc server to serve the PodResources service
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
server := grpc.NewServer()
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
l, err := util.CreateListener(socket)
if err != nil {
klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)
}
klog.Fatal(server.Serve(l))
}
// AuthInterface contains all methods required by the auth filters
type AuthInterface interface {
authenticator.Request

View File

@ -23,6 +23,7 @@ import (
"net"
"net/url"
"os"
"path/filepath"
"time"
"golang.org/x/sys/unix"
@ -99,3 +100,12 @@ func parseEndpoint(endpoint string) (string, string, error) {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
// LocalEndpoint returns the full path to a unix socket at the given endpoint
func LocalEndpoint(path, file string) string {
u := url.URL{
Scheme: unixProtocol,
Path: path,
}
return filepath.Join(u.String(), file+".sock")
}

View File

@ -40,3 +40,8 @@ func LockAndCheckSubPath(volumePath, subPath string) ([]uintptr, error) {
// UnlockPath empty implementation
func UnlockPath(fileHandles []uintptr) {
}
// LocalEndpoint empty implementation
func LocalEndpoint(path, file string) string {
return ""
}

View File

@ -103,3 +103,12 @@ func parseEndpoint(endpoint string) (string, string, error) {
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}
// LocalEndpoint returns the full path to a windows named pipe
func LocalEndpoint(path, file string) string {
u := url.URL{
Scheme: npipeProtocol,
Path: path,
}
return u.String() + "//./pipe/" + file
}

View File

@ -24,12 +24,15 @@ go_library(
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/podresources:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/remote:go_default_library",
"//pkg/kubelet/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -49,6 +52,7 @@ go_library(
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/prometheus/common/model:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux": [

View File

@ -60,7 +60,11 @@ func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSo
Context("DevicePlugin", func() {
By("Enabling support for Kubelet Plugins Watcher")
tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
if initialConfig.FeatureGates == nil {
initialConfig.FeatureGates = map[string]bool{}
}
initialConfig.FeatureGates[string(features.KubeletPluginsWatcher)] = enablePluginWatcher
initialConfig.FeatureGates[string(features.KubeletPodResources)] = true
})
It("Verifies the Kubelet device plugin functionality.", func() {
By("Start stub device plugin")
@ -98,6 +102,17 @@ func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSo
devId1 := parseLog(f, pod1.Name, pod1.Name, deviceIDRE)
Expect(devId1).To(Not(Equal("")))
podResources, err := getNodeDevices()
Expect(err).To(BeNil())
Expect(len(podResources.PodResources)).To(Equal(1))
Expect(podResources.PodResources[0].Name).To(Equal(pod1.Name))
Expect(podResources.PodResources[0].Namespace).To(Equal(pod1.Namespace))
Expect(len(podResources.PodResources[0].Containers)).To(Equal(1))
Expect(podResources.PodResources[0].Containers[0].Name).To(Equal(pod1.Spec.Containers[0].Name))
Expect(len(podResources.PodResources[0].Containers[0].Devices)).To(Equal(1))
Expect(podResources.PodResources[0].Containers[0].Devices[0].ResourceName).To(Equal(resourceName))
Expect(len(podResources.PodResources[0].Containers[0].Devices[0].DeviceIds)).To(Equal(1))
pod1, err = f.PodClient().Get(pod1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)

View File

@ -27,6 +27,7 @@ import (
"strings"
"time"
"golang.org/x/net/context"
"k8s.io/klog"
apiv1 "k8s.io/api/core/v1"
@ -38,11 +39,14 @@ import (
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/remote"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/metrics"
frameworkmetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
@ -62,6 +66,10 @@ var busyboxImage = imageutils.GetE2EImage(imageutils.BusyBox)
const (
// Kubelet internal cgroup name for node allocatable cgroup.
defaultNodeAllocatableCgroup = "kubepods"
// defaultPodResourcesPath is the path to the local endpoint serving the podresources GRPC service.
defaultPodResourcesPath = "/var/lib/kubelet/pod-resources"
defaultPodResourcesTimeout = 10 * time.Second
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
)
func getNodeSummary() (*stats.Summary, error) {
@ -92,6 +100,22 @@ func getNodeSummary() (*stats.Summary, error) {
return &summary, nil
}
func getNodeDevices() (*podresourcesapi.ListPodResourcesResponse, error) {
endpoint := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
client, conn, err := podresources.GetClient(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
if err != nil {
return nil, fmt.Errorf("Error getting grpc client: %v", err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
return nil, fmt.Errorf("%v.Get(_) = _, %v", client, err)
}
return resp, nil
}
// Returns the current KubeletConfiguration
func getCurrentKubeletConfig() (*kubeletconfig.KubeletConfiguration, error) {
resp := pollConfigz(5*time.Minute, 5*time.Second)