mirror of https://github.com/k3s-io/k3s
add kubelet grpc server for pod-resources service
parent
288667f436
commit
630cb53f82
|
@ -1015,6 +1015,9 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele
|
|||
if kubeCfg.ReadOnlyPort > 0 {
|
||||
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
|
||||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
|
||||
go k.ListenAndServePodResources()
|
||||
}
|
||||
}
|
||||
|
||||
func CreateAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
||||
|
|
|
@ -47,6 +47,7 @@ go_library(
|
|||
"//pkg/kubelet/apis/cri:go_default_library",
|
||||
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources:go_default_library",
|
||||
"//pkg/kubelet/cadvisor:go_default_library",
|
||||
"//pkg/kubelet/certificate:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
|
|
|
@ -40,6 +40,7 @@ filegroup(
|
|||
"//pkg/kubelet/apis/pluginregistration/v1:all-srcs",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1alpha1:all-srcs",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1beta1:all-srcs",
|
||||
"//pkg/kubelet/apis/podresources:all-srcs",
|
||||
"//pkg/kubelet/apis/stats/v1alpha1:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"client.go",
|
||||
"constants.go",
|
||||
"server.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/kubelet/apis/podresources",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/google.golang.org/grpc:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["server_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/mock:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [
|
||||
":package-srcs",
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podresources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
)
|
||||
|
||||
// GetClient returns a client for the PodResourcesLister grpc service
|
||||
func GetClient(socket string, connectionTimeout time.Duration, maxMsgSize int) (podresourcesapi.PodResourcesListerClient, *grpc.ClientConn, error) {
|
||||
addr, dialer, err := util.GetAddressAndDialer(socket)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("Error dialing socket %s: %v", socket, err)
|
||||
}
|
||||
return podresourcesapi.NewPodResourcesListerClient(conn), conn, nil
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podresources
|
||||
|
||||
const (
|
||||
// Socket is the name of the podresources server socket
|
||||
Socket = "kubelet"
|
||||
)
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podresources
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
)
|
||||
|
||||
// DevicesProvider knows how to provide the devices used by the given container
|
||||
type DevicesProvider interface {
|
||||
GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices
|
||||
}
|
||||
|
||||
// PodsProvider knows how to provide the pods admitted by the node
|
||||
type PodsProvider interface {
|
||||
GetPods() []*v1.Pod
|
||||
}
|
||||
|
||||
// podResourcesServer implements PodResourcesListerServer
|
||||
type podResourcesServer struct {
|
||||
podsProvider PodsProvider
|
||||
devicesProvider DevicesProvider
|
||||
}
|
||||
|
||||
// NewPodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
|
||||
// with device information provided by the DevicesProvider
|
||||
func NewPodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1alpha1.PodResourcesListerServer {
|
||||
return &podResourcesServer{
|
||||
podsProvider: podsProvider,
|
||||
devicesProvider: devicesProvider,
|
||||
}
|
||||
}
|
||||
|
||||
// List returns information about the resources assigned to pods on the node
|
||||
func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest) (*v1alpha1.ListPodResourcesResponse, error) {
|
||||
pods := p.podsProvider.GetPods()
|
||||
podResources := make([]*v1alpha1.PodResources, len(pods))
|
||||
|
||||
for i, pod := range pods {
|
||||
pRes := v1alpha1.PodResources{
|
||||
Name: pod.Name,
|
||||
Namespace: pod.Namespace,
|
||||
Containers: make([]*v1alpha1.ContainerResources, len(pod.Spec.Containers)),
|
||||
}
|
||||
|
||||
for j, container := range pod.Spec.Containers {
|
||||
pRes.Containers[j] = &v1alpha1.ContainerResources{
|
||||
Name: container.Name,
|
||||
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
|
||||
}
|
||||
}
|
||||
podResources[i] = &pRes
|
||||
}
|
||||
|
||||
return &v1alpha1.ListPodResourcesResponse{
|
||||
PodResources: podResources,
|
||||
}, nil
|
||||
}
|
|
@ -0,0 +1,153 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package podresources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
)
|
||||
|
||||
type mockProvider struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockProvider) GetPods() []*v1.Pod {
|
||||
args := m.Called()
|
||||
return args.Get(0).([]*v1.Pod)
|
||||
}
|
||||
|
||||
func (m *mockProvider) GetDevices(podUID, containerName string) []*v1alpha1.ContainerDevices {
|
||||
args := m.Called(podUID, containerName)
|
||||
return args.Get(0).([]*v1alpha1.ContainerDevices)
|
||||
}
|
||||
|
||||
func TestListPodResources(t *testing.T) {
|
||||
podName := "pod-name"
|
||||
podNamespace := "pod-namespace"
|
||||
podUID := types.UID("pod-uid")
|
||||
containerName := "container-name"
|
||||
|
||||
devs := []*v1alpha1.ContainerDevices{
|
||||
{
|
||||
ResourceName: "resource",
|
||||
DeviceIds: []string{"dev0", "dev1"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
desc string
|
||||
pods []*v1.Pod
|
||||
devices []*v1alpha1.ContainerDevices
|
||||
expectedResponse *v1alpha1.ListPodResourcesResponse
|
||||
}{
|
||||
{
|
||||
desc: "no pods",
|
||||
pods: []*v1.Pod{},
|
||||
devices: []*v1alpha1.ContainerDevices{},
|
||||
expectedResponse: &v1alpha1.ListPodResourcesResponse{},
|
||||
},
|
||||
{
|
||||
desc: "pod without devices",
|
||||
pods: []*v1.Pod{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
Namespace: podNamespace,
|
||||
UID: podUID,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: containerName,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
devices: []*v1alpha1.ContainerDevices{},
|
||||
expectedResponse: &v1alpha1.ListPodResourcesResponse{
|
||||
PodResources: []*v1alpha1.PodResources{
|
||||
{
|
||||
Name: podName,
|
||||
Namespace: podNamespace,
|
||||
Containers: []*v1alpha1.ContainerResources{
|
||||
{
|
||||
Name: containerName,
|
||||
Devices: []*v1alpha1.ContainerDevices{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "pod with devices",
|
||||
pods: []*v1.Pod{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
Namespace: podNamespace,
|
||||
UID: podUID,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: containerName,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
devices: devs,
|
||||
expectedResponse: &v1alpha1.ListPodResourcesResponse{
|
||||
PodResources: []*v1alpha1.PodResources{
|
||||
{
|
||||
Name: podName,
|
||||
Namespace: podNamespace,
|
||||
Containers: []*v1alpha1.ContainerResources{
|
||||
{
|
||||
Name: containerName,
|
||||
Devices: devs,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
m := new(mockProvider)
|
||||
m.On("GetPods").Return(tc.pods)
|
||||
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
|
||||
server := NewPodResourcesServer(m, m)
|
||||
resp, err := server.List(context.TODO(), &v1alpha1.ListPodResourcesRequest{})
|
||||
if err != nil {
|
||||
t.Errorf("want err = %v, got %q", nil, err)
|
||||
}
|
||||
if tc.expectedResponse.String() != resp.String() {
|
||||
t.Errorf("want resp = %s, got %s", tc.expectedResponse.String(), resp.String())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -27,6 +27,7 @@ go_library(
|
|||
deps = [
|
||||
"//pkg/features:go_default_library",
|
||||
"//pkg/kubelet/apis/cri:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/cm/cpumanager:go_default_library",
|
||||
"//pkg/kubelet/config:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
// TODO: Migrate kubelet to either use its own internal objects or client library.
|
||||
"k8s.io/api/core/v1"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
|
||||
|
@ -100,6 +101,9 @@ type ContainerManager interface {
|
|||
// The pluginwatcher's Handlers allow to have a single module for handling
|
||||
// registration.
|
||||
GetPluginRegistrationHandler() pluginwatcher.PluginHandler
|
||||
|
||||
// GetDevices returns information about the devices assigned to pods and containers
|
||||
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
|
||||
}
|
||||
|
||||
type NodeConfig struct {
|
||||
|
|
|
@ -44,6 +44,7 @@ import (
|
|||
"k8s.io/client-go/tools/record"
|
||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
|
||||
|
@ -878,3 +879,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
|
|||
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
||||
return cm.deviceManager.GetCapacity()
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
|
||||
return cm.deviceManager.GetDevices(podUID, containerName)
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
|
@ -105,6 +106,10 @@ func (cm *containerManagerStub) GetPodCgroupRoot() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewStubContainerManager() ContainerManager {
|
||||
return &containerManagerStub{}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"k8s.io/klog"
|
||||
kubefeatures "k8s.io/kubernetes/pkg/features"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
|
@ -166,3 +167,7 @@ func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLi
|
|||
func (cm *containerManagerImpl) GetPodCgroupRoot() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ go_library(
|
|||
"//pkg/apis/core/v1/helper:go_default_library",
|
||||
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
|
||||
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
|
||||
|
|
|
@ -33,6 +33,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
|
||||
|
@ -802,3 +803,10 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetDevices returns the devices used by the specified container
|
||||
func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
return m.podDevices.getContainerDevices(podUID, containerName)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package devicemanager
|
|||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
|
@ -61,3 +62,8 @@ func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
|
|||
func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDevices returns nil
|
||||
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
)
|
||||
|
@ -271,3 +272,21 @@ func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *Devic
|
|||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
// getContainerDevices returns the devices assigned to the provided container for all ResourceNames
|
||||
func (pdev podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
|
||||
if _, podExists := pdev[podUID]; !podExists {
|
||||
return nil
|
||||
}
|
||||
if _, contExists := pdev[podUID][contName]; !contExists {
|
||||
return nil
|
||||
}
|
||||
cDev := []*podresourcesapi.ContainerDevices{}
|
||||
for resource, allocateInfo := range pdev[podUID][contName] {
|
||||
cDev = append(cDev, &podresourcesapi.ContainerDevices{
|
||||
ResourceName: resource,
|
||||
DeviceIds: allocateInfo.deviceIds.UnsortedList(),
|
||||
})
|
||||
}
|
||||
return cDev
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
|
@ -54,6 +55,9 @@ type Manager interface {
|
|||
// and inactive device plugin resources previously registered on the node.
|
||||
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
|
||||
GetWatcherHandler() watcher.PluginHandler
|
||||
|
||||
// GetDevices returns information about the devices assigned to pods and containers
|
||||
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
|
||||
}
|
||||
|
||||
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
|
||||
|
|
|
@ -23,4 +23,5 @@ const (
|
|||
DefaultKubeletPluginsDirName = "plugins"
|
||||
DefaultKubeletContainersDirName = "containers"
|
||||
DefaultKubeletPluginContainersDirName = "plugin-containers"
|
||||
DefaultKubeletPodResourcesDirName = "pod-resources"
|
||||
)
|
||||
|
|
|
@ -60,6 +60,7 @@ import (
|
|||
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
|
@ -97,6 +98,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/kubelet/sysctl"
|
||||
"k8s.io/kubernetes/pkg/kubelet/token"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
|
@ -192,6 +194,7 @@ type Bootstrap interface {
|
|||
StartGarbageCollection()
|
||||
ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool)
|
||||
ListenAndServeReadOnly(address net.IP, port uint)
|
||||
ListenAndServePodResources()
|
||||
Run(<-chan kubetypes.PodUpdate)
|
||||
RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
|
||||
}
|
||||
|
@ -1242,6 +1245,7 @@ func allGlobalUnicastIPs() ([]net.IP, error) {
|
|||
// 1. the root directory
|
||||
// 2. the pods directory
|
||||
// 3. the plugins directory
|
||||
// 4. the pod-resources directory
|
||||
func (kl *Kubelet) setupDataDirs() error {
|
||||
kl.rootDirectory = path.Clean(kl.rootDirectory)
|
||||
if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
|
||||
|
@ -1256,6 +1260,9 @@ func (kl *Kubelet) setupDataDirs() error {
|
|||
if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
|
||||
return fmt.Errorf("error creating plugins directory: %v", err)
|
||||
}
|
||||
if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
|
||||
return fmt.Errorf("error creating podresources directory: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2221,6 +2228,11 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
|
|||
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
|
||||
}
|
||||
|
||||
// ListenAndServePodResources runs the kubelet podresources grpc service
|
||||
func (kl *Kubelet) ListenAndServePodResources() {
|
||||
server.ListenAndServePodResources(util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket), kl.podManager, kl.containerManager)
|
||||
}
|
||||
|
||||
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
|
||||
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
|
||||
if podStatus, err := kl.podCache.Get(podID); err == nil {
|
||||
|
|
|
@ -139,6 +139,11 @@ func (kl *Kubelet) getPodContainerDir(podUID types.UID, ctrName string) string {
|
|||
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletContainersDirName, ctrName)
|
||||
}
|
||||
|
||||
// getPodResourcesSocket returns the full path to the directory containing the pod resources socket
|
||||
func (kl *Kubelet) getPodResourcesDir() string {
|
||||
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPodResourcesDirName)
|
||||
}
|
||||
|
||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||
// pods.
|
||||
func (kl *Kubelet) GetPods() []*v1.Pod {
|
||||
|
|
|
@ -18,6 +18,8 @@ go_library(
|
|||
"//pkg/api/legacyscheme:go_default_library",
|
||||
"//pkg/apis/core:go_default_library",
|
||||
"//pkg/apis/core/v1/validation:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/prober:go_default_library",
|
||||
"//pkg/kubelet/server/portforward:go_default_library",
|
||||
|
@ -25,6 +27,7 @@ go_library(
|
|||
"//pkg/kubelet/server/stats:go_default_library",
|
||||
"//pkg/kubelet/server/streaming:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
"//pkg/kubelet/util:go_default_library",
|
||||
"//pkg/util/configz:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
@ -47,6 +50,7 @@ go_library(
|
|||
"//vendor/github.com/google/cadvisor/metrics:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus/promhttp:go_default_library",
|
||||
"//vendor/google.golang.org/grpc:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/google/cadvisor/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
|
@ -56,6 +57,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/apis/core/v1/validation"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/prober"
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
|
||||
|
@ -63,6 +66,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/kubelet/server/stats"
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
)
|
||||
|
||||
|
@ -161,6 +165,17 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
|
|||
klog.Fatal(server.ListenAndServe())
|
||||
}
|
||||
|
||||
// ListenAndServePodResources initializes a grpc server to serve the PodResources service
|
||||
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) {
|
||||
server := grpc.NewServer()
|
||||
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewPodResourcesServer(podsProvider, devicesProvider))
|
||||
l, err := util.CreateListener(socket)
|
||||
if err != nil {
|
||||
klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)
|
||||
}
|
||||
klog.Fatal(server.Serve(l))
|
||||
}
|
||||
|
||||
// AuthInterface contains all methods required by the auth filters
|
||||
type AuthInterface interface {
|
||||
authenticator.Request
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
|
@ -99,3 +100,12 @@ func parseEndpoint(endpoint string) (string, string, error) {
|
|||
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
|
||||
}
|
||||
}
|
||||
|
||||
// LocalEndpoint returns the full path to a unix socket at the given endpoint
|
||||
func LocalEndpoint(path, file string) string {
|
||||
u := url.URL{
|
||||
Scheme: unixProtocol,
|
||||
Path: path,
|
||||
}
|
||||
return filepath.Join(u.String(), file+".sock")
|
||||
}
|
||||
|
|
|
@ -40,3 +40,8 @@ func LockAndCheckSubPath(volumePath, subPath string) ([]uintptr, error) {
|
|||
// UnlockPath empty implementation
|
||||
func UnlockPath(fileHandles []uintptr) {
|
||||
}
|
||||
|
||||
// LocalEndpoint empty implementation
|
||||
func LocalEndpoint(path, file string) string {
|
||||
return ""
|
||||
}
|
||||
|
|
|
@ -103,3 +103,12 @@ func parseEndpoint(endpoint string) (string, string, error) {
|
|||
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
|
||||
}
|
||||
}
|
||||
|
||||
// LocalEndpoint returns the full path to a windows named pipe
|
||||
func LocalEndpoint(path, file string) string {
|
||||
u := url.URL{
|
||||
Scheme: npipeProtocol,
|
||||
Path: path,
|
||||
}
|
||||
return u.String() + "//./pipe/" + file
|
||||
}
|
||||
|
|
|
@ -24,12 +24,15 @@ go_library(
|
|||
"//pkg/kubelet/apis/cri:go_default_library",
|
||||
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
|
||||
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources:go_default_library",
|
||||
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/cm:go_default_library",
|
||||
"//pkg/kubelet/cm/devicemanager:go_default_library",
|
||||
"//pkg/kubelet/kubeletconfig/util/codec:go_default_library",
|
||||
"//pkg/kubelet/metrics:go_default_library",
|
||||
"//pkg/kubelet/remote:go_default_library",
|
||||
"//pkg/kubelet/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
|
@ -49,6 +52,7 @@ go_library(
|
|||
"//vendor/github.com/onsi/ginkgo:go_default_library",
|
||||
"//vendor/github.com/onsi/gomega:go_default_library",
|
||||
"//vendor/github.com/prometheus/common/model:go_default_library",
|
||||
"//vendor/golang.org/x/net/context:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
] + select({
|
||||
"@io_bazel_rules_go//go/platform:linux": [
|
||||
|
|
|
@ -60,7 +60,11 @@ func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSo
|
|||
Context("DevicePlugin", func() {
|
||||
By("Enabling support for Kubelet Plugins Watcher")
|
||||
tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
|
||||
if initialConfig.FeatureGates == nil {
|
||||
initialConfig.FeatureGates = map[string]bool{}
|
||||
}
|
||||
initialConfig.FeatureGates[string(features.KubeletPluginsWatcher)] = enablePluginWatcher
|
||||
initialConfig.FeatureGates[string(features.KubeletPodResources)] = true
|
||||
})
|
||||
It("Verifies the Kubelet device plugin functionality.", func() {
|
||||
By("Start stub device plugin")
|
||||
|
@ -98,6 +102,17 @@ func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSo
|
|||
devId1 := parseLog(f, pod1.Name, pod1.Name, deviceIDRE)
|
||||
Expect(devId1).To(Not(Equal("")))
|
||||
|
||||
podResources, err := getNodeDevices()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(len(podResources.PodResources)).To(Equal(1))
|
||||
Expect(podResources.PodResources[0].Name).To(Equal(pod1.Name))
|
||||
Expect(podResources.PodResources[0].Namespace).To(Equal(pod1.Namespace))
|
||||
Expect(len(podResources.PodResources[0].Containers)).To(Equal(1))
|
||||
Expect(podResources.PodResources[0].Containers[0].Name).To(Equal(pod1.Spec.Containers[0].Name))
|
||||
Expect(len(podResources.PodResources[0].Containers[0].Devices)).To(Equal(1))
|
||||
Expect(podResources.PodResources[0].Containers[0].Devices[0].ResourceName).To(Equal(resourceName))
|
||||
Expect(len(podResources.PodResources[0].Containers[0].Devices[0].DeviceIds)).To(Equal(1))
|
||||
|
||||
pod1, err = f.PodClient().Get(pod1.Name, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/klog"
|
||||
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
|
@ -38,11 +39,14 @@ import (
|
|||
"k8s.io/kubernetes/pkg/features"
|
||||
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
|
||||
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
|
||||
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cm"
|
||||
kubeletconfigcodec "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/codec"
|
||||
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
"k8s.io/kubernetes/pkg/kubelet/remote"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
"k8s.io/kubernetes/test/e2e/framework/metrics"
|
||||
frameworkmetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
|
||||
|
@ -62,6 +66,10 @@ var busyboxImage = imageutils.GetE2EImage(imageutils.BusyBox)
|
|||
const (
|
||||
// Kubelet internal cgroup name for node allocatable cgroup.
|
||||
defaultNodeAllocatableCgroup = "kubepods"
|
||||
// defaultPodResourcesPath is the path to the local endpoint serving the podresources GRPC service.
|
||||
defaultPodResourcesPath = "/var/lib/kubelet/pod-resources"
|
||||
defaultPodResourcesTimeout = 10 * time.Second
|
||||
defaultPodResourcesMaxSize = 1024 * 1024 * 16 // 16 Mb
|
||||
)
|
||||
|
||||
func getNodeSummary() (*stats.Summary, error) {
|
||||
|
@ -92,6 +100,22 @@ func getNodeSummary() (*stats.Summary, error) {
|
|||
return &summary, nil
|
||||
}
|
||||
|
||||
func getNodeDevices() (*podresourcesapi.ListPodResourcesResponse, error) {
|
||||
endpoint := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
|
||||
client, conn, err := podresources.GetClient(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error getting grpc client: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v.Get(_) = _, %v", client, err)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Returns the current KubeletConfiguration
|
||||
func getCurrentKubeletConfig() (*kubeletconfig.KubeletConfiguration, error) {
|
||||
resp := pollConfigz(5*time.Minute, 5*time.Second)
|
||||
|
|
Loading…
Reference in New Issue