Invoke PreStart RPC call before container start, if desired by plugin

Signed-off-by: vikaschoudhary16 <vichoudh@redhat.com>
pull/6/head
vikaschoudhary16 2018-01-15 05:40:52 -05:00 committed by vikaschoudhary16
parent 852e7f7bfa
commit defcab81d5
13 changed files with 857 additions and 196 deletions

View File

@ -33,9 +33,3 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
filegroup(
name = "go_default_library_protos",
srcs = ["api.proto"],
visibility = ["//visibility:public"],
)

View File

@ -19,22 +19,25 @@ limitations under the License.
// DO NOT EDIT!
/*
Package v1beta1 is a generated protocol buffer package.
Package deviceplugin is a generated protocol buffer package.
It is generated from these files:
api.proto
It has these top-level messages:
DevicePluginOptions
RegisterRequest
Empty
ListAndWatchResponse
Device
PreStartContainerRequest
PreStartContainerResponse
AllocateRequest
AllocateResponse
Mount
DeviceSpec
*/
package v1beta1
package deviceplugin
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
@ -63,6 +66,22 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type DevicePluginOptions struct {
// Indicates if PreStartContainer call is required before each container start
PreStartRequired bool `protobuf:"varint,1,opt,name=pre_start_required,json=preStartRequired,proto3" json:"pre_start_required,omitempty"`
}
func (m *DevicePluginOptions) Reset() { *m = DevicePluginOptions{} }
func (*DevicePluginOptions) ProtoMessage() {}
func (*DevicePluginOptions) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{0} }
func (m *DevicePluginOptions) GetPreStartRequired() bool {
if m != nil {
return m.PreStartRequired
}
return false
}
type RegisterRequest struct {
// Version of the API the Device Plugin was built against
Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"`
@ -71,11 +90,13 @@ type RegisterRequest struct {
Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
// Schedulable resource name. As of now it's expected to be a DNS Label
ResourceName string `protobuf:"bytes,3,opt,name=resource_name,json=resourceName,proto3" json:"resource_name,omitempty"`
// Options to be communicated with Device Manager
Options *DevicePluginOptions `protobuf:"bytes,4,opt,name=options" json:"options,omitempty"`
}
func (m *RegisterRequest) Reset() { *m = RegisterRequest{} }
func (*RegisterRequest) ProtoMessage() {}
func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{0} }
func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{1} }
func (m *RegisterRequest) GetVersion() string {
if m != nil {
@ -98,12 +119,19 @@ func (m *RegisterRequest) GetResourceName() string {
return ""
}
func (m *RegisterRequest) GetOptions() *DevicePluginOptions {
if m != nil {
return m.Options
}
return nil
}
type Empty struct {
}
func (m *Empty) Reset() { *m = Empty{} }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{1} }
func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{2} }
// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disapears, ListAndWatch
@ -114,7 +142,7 @@ type ListAndWatchResponse struct {
func (m *ListAndWatchResponse) Reset() { *m = ListAndWatchResponse{} }
func (*ListAndWatchResponse) ProtoMessage() {}
func (*ListAndWatchResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{2} }
func (*ListAndWatchResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{3} }
func (m *ListAndWatchResponse) GetDevices() []*Device {
if m != nil {
@ -139,7 +167,7 @@ type Device struct {
func (m *Device) Reset() { *m = Device{} }
func (*Device) ProtoMessage() {}
func (*Device) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{3} }
func (*Device) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} }
func (m *Device) GetID() string {
if m != nil {
@ -155,6 +183,33 @@ func (m *Device) GetHealth() string {
return ""
}
// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
type PreStartContainerRequest struct {
DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"`
}
func (m *PreStartContainerRequest) Reset() { *m = PreStartContainerRequest{} }
func (*PreStartContainerRequest) ProtoMessage() {}
func (*PreStartContainerRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{5} }
func (m *PreStartContainerRequest) GetDevicesIDs() []string {
if m != nil {
return m.DevicesIDs
}
return nil
}
// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
type PreStartContainerResponse struct {
}
func (m *PreStartContainerResponse) Reset() { *m = PreStartContainerResponse{} }
func (*PreStartContainerResponse) ProtoMessage() {}
func (*PreStartContainerResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{6} }
// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
@ -167,7 +222,7 @@ type AllocateRequest struct {
func (m *AllocateRequest) Reset() { *m = AllocateRequest{} }
func (*AllocateRequest) ProtoMessage() {}
func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} }
func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} }
func (m *AllocateRequest) GetDevicesIDs() []string {
if m != nil {
@ -197,7 +252,7 @@ type AllocateResponse struct {
func (m *AllocateResponse) Reset() { *m = AllocateResponse{} }
func (*AllocateResponse) ProtoMessage() {}
func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{5} }
func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{8} }
func (m *AllocateResponse) GetEnvs() map[string]string {
if m != nil {
@ -240,7 +295,7 @@ type Mount struct {
func (m *Mount) Reset() { *m = Mount{} }
func (*Mount) ProtoMessage() {}
func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{6} }
func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{9} }
func (m *Mount) GetContainerPath() string {
if m != nil {
@ -278,7 +333,7 @@ type DeviceSpec struct {
func (m *DeviceSpec) Reset() { *m = DeviceSpec{} }
func (*DeviceSpec) ProtoMessage() {}
func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} }
func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{10} }
func (m *DeviceSpec) GetContainerPath() string {
if m != nil {
@ -302,14 +357,17 @@ func (m *DeviceSpec) GetPermissions() string {
}
func init() {
proto.RegisterType((*RegisterRequest)(nil), "v1beta1.RegisterRequest")
proto.RegisterType((*Empty)(nil), "v1beta1.Empty")
proto.RegisterType((*ListAndWatchResponse)(nil), "v1beta1.ListAndWatchResponse")
proto.RegisterType((*Device)(nil), "v1beta1.Device")
proto.RegisterType((*AllocateRequest)(nil), "v1beta1.AllocateRequest")
proto.RegisterType((*AllocateResponse)(nil), "v1beta1.AllocateResponse")
proto.RegisterType((*Mount)(nil), "v1beta1.Mount")
proto.RegisterType((*DeviceSpec)(nil), "v1beta1.DeviceSpec")
proto.RegisterType((*DevicePluginOptions)(nil), "deviceplugin.DevicePluginOptions")
proto.RegisterType((*RegisterRequest)(nil), "deviceplugin.RegisterRequest")
proto.RegisterType((*Empty)(nil), "deviceplugin.Empty")
proto.RegisterType((*ListAndWatchResponse)(nil), "deviceplugin.ListAndWatchResponse")
proto.RegisterType((*Device)(nil), "deviceplugin.Device")
proto.RegisterType((*PreStartContainerRequest)(nil), "deviceplugin.PreStartContainerRequest")
proto.RegisterType((*PreStartContainerResponse)(nil), "deviceplugin.PreStartContainerResponse")
proto.RegisterType((*AllocateRequest)(nil), "deviceplugin.AllocateRequest")
proto.RegisterType((*AllocateResponse)(nil), "deviceplugin.AllocateResponse")
proto.RegisterType((*Mount)(nil), "deviceplugin.Mount")
proto.RegisterType((*DeviceSpec)(nil), "deviceplugin.DeviceSpec")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -336,7 +394,7 @@ func NewRegistrationClient(cc *grpc.ClientConn) RegistrationClient {
func (c *registrationClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := grpc.Invoke(ctx, "/v1beta1.Registration/Register", in, out, c.cc, opts...)
err := grpc.Invoke(ctx, "/deviceplugin.Registration/Register", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
@ -363,7 +421,7 @@ func _Registration_Register_Handler(srv interface{}, ctx context.Context, dec fu
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v1beta1.Registration/Register",
FullMethod: "/deviceplugin.Registration/Register",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RegistrationServer).Register(ctx, req.(*RegisterRequest))
@ -372,7 +430,7 @@ func _Registration_Register_Handler(srv interface{}, ctx context.Context, dec fu
}
var _Registration_serviceDesc = grpc.ServiceDesc{
ServiceName: "v1beta1.Registration",
ServiceName: "deviceplugin.Registration",
HandlerType: (*RegistrationServer)(nil),
Methods: []grpc.MethodDesc{
{
@ -395,6 +453,10 @@ type DevicePluginClient interface {
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) (*AllocateResponse, error)
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error)
}
type devicePluginClient struct {
@ -406,7 +468,7 @@ func NewDevicePluginClient(cc *grpc.ClientConn) DevicePluginClient {
}
func (c *devicePluginClient) ListAndWatch(ctx context.Context, in *Empty, opts ...grpc.CallOption) (DevicePlugin_ListAndWatchClient, error) {
stream, err := grpc.NewClientStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], c.cc, "/v1beta1.DevicePlugin/ListAndWatch", opts...)
stream, err := grpc.NewClientStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], c.cc, "/deviceplugin.DevicePlugin/ListAndWatch", opts...)
if err != nil {
return nil, err
}
@ -439,7 +501,16 @@ func (x *devicePluginListAndWatchClient) Recv() (*ListAndWatchResponse, error) {
func (c *devicePluginClient) Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) (*AllocateResponse, error) {
out := new(AllocateResponse)
err := grpc.Invoke(ctx, "/v1beta1.DevicePlugin/Allocate", in, out, c.cc, opts...)
err := grpc.Invoke(ctx, "/deviceplugin.DevicePlugin/Allocate", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *devicePluginClient) PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error) {
out := new(PreStartContainerResponse)
err := grpc.Invoke(ctx, "/deviceplugin.DevicePlugin/PreStartContainer", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
@ -457,6 +528,10 @@ type DevicePluginServer interface {
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}
func RegisterDevicePluginServer(s *grpc.Server, srv DevicePluginServer) {
@ -494,7 +569,7 @@ func _DevicePlugin_Allocate_Handler(srv interface{}, ctx context.Context, dec fu
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v1beta1.DevicePlugin/Allocate",
FullMethod: "/deviceplugin.DevicePlugin/Allocate",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DevicePluginServer).Allocate(ctx, req.(*AllocateRequest))
@ -502,14 +577,36 @@ func _DevicePlugin_Allocate_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _DevicePlugin_PreStartContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PreStartContainerRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DevicePluginServer).PreStartContainer(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/deviceplugin.DevicePlugin/PreStartContainer",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DevicePluginServer).PreStartContainer(ctx, req.(*PreStartContainerRequest))
}
return interceptor(ctx, in, info, handler)
}
var _DevicePlugin_serviceDesc = grpc.ServiceDesc{
ServiceName: "v1beta1.DevicePlugin",
ServiceName: "deviceplugin.DevicePlugin",
HandlerType: (*DevicePluginServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Allocate",
Handler: _DevicePlugin_Allocate_Handler,
},
{
MethodName: "PreStartContainer",
Handler: _DevicePlugin_PreStartContainer_Handler,
},
},
Streams: []grpc.StreamDesc{
{
@ -521,6 +618,34 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{
Metadata: "api.proto",
}
func (m *DevicePluginOptions) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *DevicePluginOptions) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.PreStartRequired {
dAtA[i] = 0x8
i++
if m.PreStartRequired {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
return i, nil
}
func (m *RegisterRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -554,6 +679,16 @@ func (m *RegisterRequest) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintApi(dAtA, i, uint64(len(m.ResourceName)))
i += copy(dAtA[i:], m.ResourceName)
}
if m.Options != nil {
dAtA[i] = 0x22
i++
i = encodeVarintApi(dAtA, i, uint64(m.Options.Size()))
n1, err := m.Options.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
return i, nil
}
@ -635,6 +770,57 @@ func (m *Device) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
func (m *PreStartContainerRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PreStartContainerRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.DevicesIDs) > 0 {
for _, s := range m.DevicesIDs {
dAtA[i] = 0xa
i++
l = len(s)
for l >= 1<<7 {
dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
l >>= 7
i++
}
dAtA[i] = uint8(l)
i++
i += copy(dAtA[i:], s)
}
}
return i, nil
}
func (m *PreStartContainerResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PreStartContainerResponse) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
return i, nil
}
func (m *AllocateRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -847,6 +1033,15 @@ func encodeVarintApi(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *DevicePluginOptions) Size() (n int) {
var l int
_ = l
if m.PreStartRequired {
n += 2
}
return n
}
func (m *RegisterRequest) Size() (n int) {
var l int
_ = l
@ -862,6 +1057,10 @@ func (m *RegisterRequest) Size() (n int) {
if l > 0 {
n += 1 + l + sovApi(uint64(l))
}
if m.Options != nil {
l = m.Options.Size()
n += 1 + l + sovApi(uint64(l))
}
return n
}
@ -897,6 +1096,24 @@ func (m *Device) Size() (n int) {
return n
}
func (m *PreStartContainerRequest) Size() (n int) {
var l int
_ = l
if len(m.DevicesIDs) > 0 {
for _, s := range m.DevicesIDs {
l = len(s)
n += 1 + l + sovApi(uint64(l))
}
}
return n
}
func (m *PreStartContainerResponse) Size() (n int) {
var l int
_ = l
return n
}
func (m *AllocateRequest) Size() (n int) {
var l int
_ = l
@ -991,6 +1208,16 @@ func sovApi(x uint64) (n int) {
func sozApi(x uint64) (n int) {
return sovApi(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *DevicePluginOptions) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&DevicePluginOptions{`,
`PreStartRequired:` + fmt.Sprintf("%v", this.PreStartRequired) + `,`,
`}`,
}, "")
return s
}
func (this *RegisterRequest) String() string {
if this == nil {
return "nil"
@ -999,6 +1226,7 @@ func (this *RegisterRequest) String() string {
`Version:` + fmt.Sprintf("%v", this.Version) + `,`,
`Endpoint:` + fmt.Sprintf("%v", this.Endpoint) + `,`,
`ResourceName:` + fmt.Sprintf("%v", this.ResourceName) + `,`,
`Options:` + strings.Replace(fmt.Sprintf("%v", this.Options), "DevicePluginOptions", "DevicePluginOptions", 1) + `,`,
`}`,
}, "")
return s
@ -1033,6 +1261,25 @@ func (this *Device) String() string {
}, "")
return s
}
func (this *PreStartContainerRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&PreStartContainerRequest{`,
`DevicesIDs:` + fmt.Sprintf("%v", this.DevicesIDs) + `,`,
`}`,
}, "")
return s
}
func (this *PreStartContainerResponse) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&PreStartContainerResponse{`,
`}`,
}, "")
return s
}
func (this *AllocateRequest) String() string {
if this == nil {
return "nil"
@ -1108,6 +1355,76 @@ func valueToStringApi(v interface{}) string {
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
func (m *DevicePluginOptions) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: DevicePluginOptions: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: DevicePluginOptions: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field PreStartRequired", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.PreStartRequired = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *RegisterRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -1224,6 +1541,39 @@ func (m *RegisterRequest) Unmarshal(dAtA []byte) error {
}
m.ResourceName = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Options", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Options == nil {
m.Options = &DevicePluginOptions{}
}
if err := m.Options.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
@ -1484,6 +1834,135 @@ func (m *Device) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *PreStartContainerRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PreStartContainerRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PreStartContainerRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field DevicesIDs", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.DevicesIDs = append(m.DevicesIDs, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PreStartContainerResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PreStartContainerResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PreStartContainerResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *AllocateRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -2280,43 +2759,48 @@ var (
func init() { proto.RegisterFile("api.proto", fileDescriptorApi) }
var fileDescriptorApi = []byte{
// 594 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4b, 0x6f, 0xd3, 0x4c,
0x14, 0x8d, 0x93, 0x36, 0x8f, 0xdb, 0xf4, 0xa1, 0xf9, 0xaa, 0x4f, 0xc6, 0x80, 0x55, 0xb9, 0x02,
0x15, 0xa4, 0xa6, 0x0f, 0x24, 0x8a, 0x58, 0x20, 0x19, 0xa5, 0x48, 0x95, 0x0a, 0x54, 0x66, 0xc1,
0x32, 0x9a, 0x38, 0x97, 0x78, 0x84, 0x3d, 0x63, 0x3c, 0x63, 0x4b, 0xd9, 0xf1, 0x13, 0x58, 0xf0,
0xa3, 0xba, 0x64, 0xc9, 0x92, 0x86, 0x15, 0xff, 0x02, 0x65, 0xfc, 0x48, 0x64, 0xd1, 0x05, 0x12,
0x3b, 0xdf, 0x73, 0xef, 0xb1, 0xcf, 0x9c, 0xb9, 0xc7, 0xd0, 0xa3, 0x31, 0x1b, 0xc4, 0x89, 0x50,
0x82, 0x74, 0xb2, 0x93, 0x31, 0x2a, 0x7a, 0x62, 0x1d, 0x4e, 0x99, 0x0a, 0xd2, 0xf1, 0xc0, 0x17,
0xd1, 0xd1, 0x54, 0x4c, 0xc5, 0x91, 0xee, 0x8f, 0xd3, 0x0f, 0xba, 0xd2, 0x85, 0x7e, 0xca, 0x79,
0x4e, 0x08, 0xdb, 0x1e, 0x4e, 0x99, 0x54, 0x98, 0x78, 0xf8, 0x29, 0x45, 0xa9, 0x88, 0x09, 0x9d,
0x0c, 0x13, 0xc9, 0x04, 0x37, 0x8d, 0x3d, 0xe3, 0xa0, 0xe7, 0x95, 0x25, 0xb1, 0xa0, 0x8b, 0x7c,
0x12, 0x0b, 0xc6, 0x95, 0xd9, 0xd4, 0xad, 0xaa, 0x26, 0xfb, 0xb0, 0x99, 0xa0, 0x14, 0x69, 0xe2,
0xe3, 0x88, 0xd3, 0x08, 0xcd, 0x96, 0x1e, 0xe8, 0x97, 0xe0, 0x1b, 0x1a, 0xa1, 0xd3, 0x81, 0xf5,
0xf3, 0x28, 0x56, 0x33, 0xc7, 0x85, 0xdd, 0x4b, 0x26, 0x95, 0xcb, 0x27, 0xef, 0xa9, 0xf2, 0x03,
0x0f, 0x65, 0x2c, 0xb8, 0x44, 0xf2, 0x08, 0x3a, 0x13, 0xcc, 0x98, 0x8f, 0xd2, 0x34, 0xf6, 0x5a,
0x07, 0x1b, 0xa7, 0xdb, 0x83, 0xe2, 0x60, 0x83, 0xa1, 0xc6, 0xbd, 0xb2, 0xef, 0x1c, 0x43, 0x3b,
0x87, 0xc8, 0x16, 0x34, 0x2f, 0x86, 0x85, 0xd6, 0x26, 0x1b, 0x92, 0xff, 0xa1, 0x1d, 0x20, 0x0d,
0x55, 0x50, 0x88, 0x2c, 0x2a, 0xe7, 0x04, 0xb6, 0xdd, 0x30, 0x14, 0x3e, 0x55, 0x58, 0x9e, 0xd5,
0x06, 0x28, 0xde, 0x77, 0x31, 0xcc, 0x3f, 0xd9, 0xf3, 0x56, 0x10, 0xe7, 0x57, 0x13, 0x76, 0x96,
0x9c, 0x42, 0xe4, 0x19, 0xac, 0x21, 0xcf, 0x4a, 0x85, 0xfb, 0x95, 0xc2, 0xfa, 0xe0, 0xe0, 0x9c,
0x67, 0xf2, 0x9c, 0xab, 0x64, 0xe6, 0x69, 0x02, 0x79, 0x08, 0xed, 0x48, 0xa4, 0x5c, 0x49, 0xb3,
0xa9, 0xa9, 0x5b, 0x15, 0xf5, 0xf5, 0x02, 0xf6, 0x8a, 0x2e, 0x39, 0x5c, 0xba, 0xd0, 0xd2, 0x83,
0xff, 0xd5, 0x5c, 0x78, 0x17, 0xa3, 0x5f, 0x39, 0x41, 0x2e, 0x61, 0x83, 0x72, 0x2e, 0x14, 0x55,
0x4c, 0x70, 0x69, 0xae, 0x69, 0xca, 0xe3, 0xdb, 0x65, 0xb9, 0xcb, 0xe1, 0x5c, 0xdd, 0x2a, 0xdd,
0x3a, 0x83, 0x5e, 0xa5, 0x9b, 0xec, 0x40, 0xeb, 0x23, 0xce, 0x0a, 0x6f, 0x17, 0x8f, 0x64, 0x17,
0xd6, 0x33, 0x1a, 0xa6, 0x58, 0x78, 0x9b, 0x17, 0xcf, 0x9b, 0xcf, 0x0c, 0xeb, 0x05, 0xec, 0xd4,
0xdf, 0xfc, 0x37, 0x7c, 0x27, 0x80, 0x75, 0x6d, 0x03, 0x79, 0x00, 0x5b, 0xbe, 0xe0, 0x8a, 0x32,
0x8e, 0xc9, 0x28, 0xa6, 0x2a, 0x28, 0xf8, 0x9b, 0x15, 0x7a, 0x45, 0x55, 0x40, 0xee, 0x42, 0x2f,
0x10, 0x52, 0xe5, 0x13, 0xc5, 0x3a, 0x2e, 0x80, 0xb2, 0x99, 0x20, 0x9d, 0x8c, 0x04, 0x0f, 0x67,
0x7a, 0x15, 0xbb, 0x5e, 0x77, 0x01, 0xbc, 0xe5, 0xe1, 0xcc, 0x49, 0x00, 0x96, 0x3e, 0xfe, 0x93,
0xcf, 0xed, 0xc1, 0x46, 0x8c, 0x49, 0xc4, 0xa4, 0xd4, 0x57, 0x90, 0xef, 0xfe, 0x2a, 0x74, 0xfa,
0x0a, 0xfa, 0x79, 0xd0, 0x12, 0xed, 0x0f, 0x79, 0x0a, 0xdd, 0x32, 0x78, 0xc4, 0xac, 0xee, 0xaa,
0x96, 0x45, 0x6b, 0xb9, 0x21, 0x79, 0x6e, 0x1a, 0xa7, 0x5f, 0x0d, 0xe8, 0xe7, 0xe2, 0xaf, 0xc2,
0x74, 0xca, 0x38, 0x71, 0xa1, 0xbf, 0x1a, 0x25, 0x52, 0xa3, 0x58, 0xf7, 0xab, 0xfa, 0x4f, 0x89,
0x73, 0x1a, 0xc7, 0x06, 0x71, 0xa1, 0x5b, 0x2e, 0xc9, 0x8a, 0x96, 0x5a, 0x56, 0xac, 0x3b, 0xb7,
0x6e, 0x94, 0xd3, 0x78, 0x79, 0xef, 0xfa, 0xc6, 0x36, 0xbe, 0xdf, 0xd8, 0x8d, 0xcf, 0x73, 0xdb,
0xb8, 0x9e, 0xdb, 0xc6, 0xb7, 0xb9, 0x6d, 0xfc, 0x98, 0xdb, 0xc6, 0x97, 0x9f, 0x76, 0x63, 0xdc,
0xd6, 0x3f, 0x9b, 0x27, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x65, 0x2a, 0x0d, 0x36, 0xb1, 0x04,
0x00, 0x00,
// 687 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0x8e, 0x93, 0x36, 0x3f, 0x93, 0xb4, 0x0d, 0xdb, 0x0a, 0x19, 0x17, 0xac, 0x60, 0x04, 0x44,
0x02, 0xd2, 0x12, 0x0e, 0xa0, 0x82, 0x10, 0xa5, 0x29, 0x52, 0x55, 0xa0, 0xc1, 0x3d, 0x70, 0x8c,
0xb6, 0xce, 0x36, 0xb6, 0x70, 0x76, 0xcd, 0xee, 0x3a, 0x52, 0x6e, 0x3c, 0x42, 0x5f, 0x83, 0x37,
0xe9, 0x91, 0x23, 0x47, 0x1a, 0x9e, 0x03, 0x09, 0x79, 0x6d, 0xe7, 0x8f, 0x40, 0x41, 0xe2, 0xe6,
0xf9, 0x76, 0xbe, 0xf1, 0xcc, 0xb7, 0xfb, 0x0d, 0x94, 0x70, 0xe0, 0x35, 0x02, 0xce, 0x24, 0x43,
0x95, 0x2e, 0x19, 0x78, 0x0e, 0x09, 0xfc, 0xb0, 0xe7, 0x51, 0xe3, 0x41, 0xcf, 0x93, 0x6e, 0x78,
0xd2, 0x70, 0x58, 0x7f, 0xab, 0xc7, 0x7a, 0x6c, 0x4b, 0x25, 0x9d, 0x84, 0xa7, 0x2a, 0x52, 0x81,
0xfa, 0x8a, 0xc9, 0xd6, 0x1e, 0xac, 0xb7, 0x14, 0xbd, 0xad, 0xe8, 0x47, 0x81, 0xf4, 0x18, 0x15,
0xe8, 0x3e, 0xa0, 0x80, 0x93, 0x8e, 0x90, 0x98, 0xcb, 0x0e, 0x27, 0x1f, 0x43, 0x8f, 0x93, 0xae,
0xae, 0xd5, 0xb4, 0x7a, 0xd1, 0xae, 0x06, 0x9c, 0x1c, 0x47, 0x07, 0x76, 0x82, 0x5b, 0x9f, 0x35,
0x58, 0xb3, 0x49, 0xcf, 0x13, 0x92, 0xf0, 0x08, 0x24, 0x42, 0x22, 0x1d, 0x0a, 0x03, 0xc2, 0x85,
0xc7, 0xa8, 0xa2, 0x95, 0xec, 0x34, 0x44, 0x06, 0x14, 0x09, 0xed, 0x06, 0xcc, 0xa3, 0x52, 0xcf,
0xaa, 0xa3, 0x71, 0x8c, 0x6e, 0xc1, 0x0a, 0x27, 0x82, 0x85, 0xdc, 0x21, 0x1d, 0x8a, 0xfb, 0x44,
0xcf, 0xa9, 0x84, 0x4a, 0x0a, 0xbe, 0xc5, 0x7d, 0x82, 0x9e, 0x42, 0x81, 0xc5, 0x7d, 0xea, 0x4b,
0x35, 0xad, 0x5e, 0x6e, 0xde, 0x6c, 0x4c, 0x4b, 0xd0, 0x58, 0x30, 0x90, 0x9d, 0x32, 0xac, 0x02,
0x2c, 0xef, 0xf7, 0x03, 0x39, 0xb4, 0x5e, 0xc1, 0xc6, 0x6b, 0x4f, 0xc8, 0x5d, 0xda, 0x7d, 0x8f,
0xa5, 0xe3, 0xda, 0x44, 0x04, 0x8c, 0x0a, 0x82, 0x1a, 0x50, 0x88, 0xab, 0x09, 0x5d, 0xab, 0xe5,
0xea, 0xe5, 0xe6, 0xc6, 0xa2, 0xea, 0x76, 0x9a, 0x64, 0x6d, 0x43, 0x3e, 0x86, 0xd0, 0x2a, 0x64,
0x0f, 0x5a, 0xc9, 0xb4, 0x59, 0xaf, 0x85, 0xae, 0x42, 0xde, 0x25, 0xd8, 0x97, 0x6e, 0x32, 0x66,
0x12, 0x59, 0x3b, 0xa0, 0xb7, 0x13, 0x09, 0xf7, 0x18, 0x95, 0xd8, 0xa3, 0x13, 0xd9, 0x4c, 0x80,
0xa4, 0xf0, 0x41, 0x2b, 0x6e, 0xa0, 0x64, 0x4f, 0x21, 0xd6, 0x26, 0x5c, 0x5b, 0xc0, 0x8d, 0x5b,
0xb7, 0x1e, 0xc2, 0xda, 0xae, 0xef, 0x33, 0x07, 0x4b, 0xf2, 0xb7, 0xf5, 0x7e, 0x64, 0xa1, 0x3a,
0xe1, 0x24, 0x12, 0x3c, 0x83, 0x25, 0x42, 0x07, 0xe9, 0xfc, 0xf5, 0xd9, 0xf9, 0xe7, 0xb3, 0x1b,
0xfb, 0x74, 0x20, 0xf6, 0xa9, 0xe4, 0x43, 0x5b, 0xb1, 0xd0, 0x3d, 0xc8, 0xf7, 0x59, 0x48, 0xa5,
0xd0, 0xb3, 0x8a, 0xbf, 0x3e, 0xcb, 0x7f, 0x13, 0x9d, 0xd9, 0x49, 0x0a, 0x6a, 0x4e, 0xd4, 0xce,
0xa9, 0x6c, 0x7d, 0x91, 0xda, 0xc7, 0x01, 0x71, 0xc6, 0x8a, 0xa3, 0x77, 0x50, 0xc6, 0x94, 0x32,
0x89, 0xd3, 0x37, 0x10, 0xf1, 0xb6, 0x2e, 0xe9, 0x72, 0x77, 0xc2, 0x88, 0x9b, 0x9d, 0xae, 0x61,
0x3c, 0x86, 0xd2, 0x78, 0x0c, 0x54, 0x85, 0xdc, 0x07, 0x32, 0x4c, 0x2e, 0x32, 0xfa, 0x44, 0x1b,
0xb0, 0x3c, 0xc0, 0x7e, 0x48, 0x92, 0x8b, 0x8c, 0x83, 0x9d, 0xec, 0x13, 0xcd, 0x78, 0x0e, 0xd5,
0xf9, 0xca, 0xff, 0xc2, 0xb7, 0x5c, 0x58, 0x56, 0x82, 0xa0, 0xdb, 0xb0, 0xea, 0xa4, 0x17, 0xda,
0x09, 0xb0, 0x74, 0x13, 0xfe, 0xca, 0x18, 0x6d, 0x63, 0xe9, 0xa2, 0x4d, 0x28, 0xb9, 0x4c, 0xc8,
0x38, 0x23, 0x71, 0x4f, 0x04, 0xa4, 0x87, 0x9c, 0xe0, 0x6e, 0x87, 0x51, 0x7f, 0xa8, 0x9c, 0x53,
0xb4, 0x8b, 0x11, 0x70, 0x44, 0xfd, 0xa1, 0xc5, 0x01, 0x26, 0x62, 0xfe, 0x97, 0xdf, 0xd5, 0xa0,
0x1c, 0x10, 0xde, 0xf7, 0x84, 0x50, 0xf7, 0x10, 0x5b, 0x75, 0x1a, 0x6a, 0xb6, 0xa1, 0x12, 0xef,
0x05, 0xae, 0xf4, 0x41, 0x2f, 0xa0, 0x98, 0xee, 0x09, 0x74, 0x63, 0xf6, 0xc2, 0xe6, 0xf6, 0x87,
0x31, 0xf7, 0x6a, 0x62, 0xcf, 0x66, 0x9a, 0x67, 0x59, 0xa8, 0x4c, 0xfb, 0x1b, 0x1d, 0x42, 0x65,
0xda, 0xc6, 0x68, 0x11, 0xcf, 0xb0, 0x66, 0xc1, 0x45, 0xbe, 0xb7, 0x32, 0xdb, 0x1a, 0x3a, 0x84,
0x62, 0xfa, 0x70, 0xe6, 0xfb, 0x9b, 0x33, 0x96, 0x61, 0xfe, 0xf9, 0xbd, 0x59, 0x19, 0x74, 0x0a,
0x57, 0x7e, 0xb1, 0x2a, 0xba, 0x33, 0x4b, 0xfb, 0xdd, 0x1e, 0x30, 0xee, 0x5e, 0x9a, 0x97, 0xfe,
0xe7, 0xe5, 0xf5, 0xf3, 0x0b, 0x53, 0xfb, 0x7a, 0x61, 0x66, 0x3e, 0x8d, 0x4c, 0xed, 0x7c, 0x64,
0x6a, 0x5f, 0x46, 0xa6, 0xf6, 0x6d, 0x64, 0x6a, 0x67, 0xdf, 0xcd, 0xcc, 0x49, 0x5e, 0xed, 0xf9,
0x47, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x06, 0x87, 0xf7, 0x82, 0x31, 0x06, 0x00, 0x00,
}

View File

@ -1,7 +1,7 @@
// To regenerate api.pb.go run hack/update-device-plugin.sh
syntax = 'proto3';
package v1beta1;
package deviceplugin;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
@ -24,6 +24,11 @@ service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}
message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
}
message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
@ -32,6 +37,8 @@ message RegisterRequest {
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}
message Empty {
@ -48,6 +55,11 @@ service DevicePlugin {
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}
// ListAndWatch returns a stream of List of Devices
@ -71,6 +83,18 @@ message Device {
string health = 2;
}
// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
repeated string devicesIDs = 1;
}
// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}
// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's

View File

@ -30,4 +30,6 @@ const (
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock"
// Timeout duration in secs for PreStartContainer RPC
KubeletPreStartContainerRPCTimeoutInSecs = 30
)

View File

@ -601,8 +601,10 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
opts := &kubecontainer.RunContainerOptions{}
// Allocate should already be called during predicateAdmitHandler.Admit(),
// just try to fetch device runtime information from cached state here
devOpts := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if devOpts == nil {
devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if err != nil {
return nil, err
} else if devOpts == nil {
return opts, nil
}
opts.Devices = append(opts.Devices, devOpts.Devices...)

View File

@ -105,7 +105,7 @@ func (m *Stub) Stop() error {
}
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerFlag bool) error {
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(10*time.Second),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
@ -120,6 +120,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
Options: &pluginapi.DevicePluginOptions{PreStartRequired: preStartContainerFlag},
}
_, err = client.Register(context.Background(), reqt)
@ -129,6 +130,12 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
return nil
}
// PreStartContainer resets the devices received
func (m *Stub) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
log.Printf("PreStartContainer, %+v", r)
return &pluginapi.PreStartContainerResponse{}, nil
}
// ListAndWatch lists devices and update that list according to the Update call
func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
log.Println("ListAndWatch")

View File

@ -36,6 +36,7 @@ type endpoint interface {
run()
stop()
allocate(devs []string) (*pluginapi.AllocateResponse, error)
preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
getDevices() []pluginapi.Device
callback(resourceName string, added, updated, deleted []pluginapi.Device)
}
@ -65,8 +66,9 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina
client: client,
clientConn: c,
socketPath: socketPath,
resourceName: resourceName,
socketPath: socketPath,
resourceName: resourceName,
invokePreStartContainerBoolFlag: false,
devices: devices,
cb: callback,
@ -182,6 +184,15 @@ func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, err
})
}
// preStartContainer issues PreStartContainer gRPC call to the device plugin.
func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second)
defer cancel()
return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
DevicesIDs: devs,
})
}
func (e *endpointImpl) stop() {
e.clientConn.Close()
}

View File

@ -85,6 +85,7 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping.
podDevices podDevices
store utilstore.Store
pluginOpts map[string]*pluginapi.DevicePluginOptions
}
type sourcesReadyStub struct{}
@ -112,6 +113,7 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
pluginOpts: make(map[string]*pluginapi.DevicePluginOptions),
podDevices: make(podDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
@ -201,6 +203,7 @@ func (m *ManagerImpl) checkpointFile() string {
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
glog.V(2).Infof("Starting Device Plugin manager")
fmt.Println("Starting Device Plugin manager")
m.activePods = activePods
m.sourcesReady = sourcesReady
@ -340,8 +343,10 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
m.mutex.Lock()
if r.Options != nil {
m.pluginOpts[r.ResourceName] = r.Options
}
// Check for potential re-registration during the initialization of new endpoint,
// and skip updating if re-registration happens.
// TODO: simplify the part once we have a better way to handle registered devices
@ -590,11 +595,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
resource := string(k)
needed := int(v.Value())
glog.V(3).Infof("needs %d %s", needed, resource)
_, registeredResource := m.healthyDevices[resource]
_, allocatedResource := m.allocatedDevices[resource]
// Continues if this is neither an active device plugin resource nor
// a resource we have previously allocated.
if !registeredResource && !allocatedResource {
if !m.isDevicePluginResource(resource) {
continue
}
// Updates allocatedDevices to garbage collect any stranded resources
@ -610,6 +611,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
if allocDevices == nil || len(allocDevices) <= 0 {
continue
}
startRPCTime := time.Now()
// Manager.Allocate involves RPC calls to device plugin, which
// could be heavy-weight. Therefore we want to perform this operation outside
@ -659,10 +661,60 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
podUID := string(pod.UID)
contName := container.Name
for k := range container.Resources.Limits {
resource := string(k)
if !m.isDevicePluginResource(resource) {
continue
}
err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
if err != nil {
return nil, err
}
}
m.mutex.Lock()
defer m.mutex.Unlock()
return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name)
return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}
func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
m.mutex.Lock()
opts, ok := m.pluginOpts[resource]
if !ok {
m.mutex.Unlock()
glog.V(4).Infof("Plugin options not found in cache for resource: %s. Skip PreStartContainer", resource)
return nil
}
if !opts.PreStartRequired {
m.mutex.Unlock()
glog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource, %v", resource)
return nil
}
devices := m.podDevices.containerDevices(podUID, contName, resource)
if devices == nil {
m.mutex.Unlock()
return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
}
e, ok := m.endpoints[resource]
if !ok {
m.mutex.Unlock()
return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
}
m.mutex.Unlock()
devs := devices.UnsortedList()
glog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
_, err := e.preStartContainer(devs)
if err != nil {
return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
}
// TODO: Add metrics support for init RPC
return nil
}
// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
@ -692,3 +744,14 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
node.SetAllocatableResource(newAllocatableResource)
}
}
func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
_, registeredResource := m.healthyDevices[resource]
_, allocatedResource := m.allocatedDevices[resource]
// Return true if this is either an active device plugin resource or
// a resource we have previously allocated.
if registeredResource || allocatedResource {
return true
}
return false
}

View File

@ -53,8 +53,8 @@ func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P
}
// GetDeviceRunContainerOptions simply returns nil.
func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
return nil
func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
return nil, nil
}
// GetCapacity simply returns nil capacity and empty removed resource list.

View File

@ -60,6 +60,7 @@ func TestNewManagerImpl(t *testing.T) {
defer os.RemoveAll(socketDir)
_, err = newManagerImpl(socketName)
require.NoError(t, err)
os.RemoveAll(socketDir)
}
func TestNewManagerImplStart(t *testing.T) {
@ -84,67 +85,68 @@ func TestDevicePluginReRegistration(t *testing.T) {
devsForRegistration := []*pluginapi.Device{
{ID: "Dev3", Health: pluginapi.Healthy},
}
for _, preStartContainerFlag := range []bool{false, true} {
expCallbackCount := int32(0)
callbackCount := int32(0)
callbackChan := make(chan int32)
callback := func(n string, a, u, r []pluginapi.Device) {
callbackCount++
if callbackCount > atomic.LoadInt32(&expCallbackCount) {
expCallbackCount := int32(0)
callbackCount := int32(0)
callbackChan := make(chan int32)
callback := func(n string, a, u, r []pluginapi.Device) {
callbackCount++
if callbackCount > atomic.LoadInt32(&expCallbackCount) {
t.FailNow()
}
callbackChan <- callbackCount
}
m, p1 := setup(t, devs, callback, socketName, pluginSocketName)
atomic.StoreInt32(&expCallbackCount, 1)
p1.Register(socketName, testResourceName, preStartContainerFlag)
// Wait for the first callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
callbackChan <- callbackCount
devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
err = p2.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 2)
p2.Register(socketName, testResourceName, preStartContainerFlag)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
err = p3.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 3)
p3.Register(socketName, testResourceName, preStartContainerFlag)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
close(callbackChan)
}
m, p1 := setup(t, devs, callback, socketName, pluginSocketName)
atomic.StoreInt32(&expCallbackCount, 1)
p1.Register(socketName, testResourceName)
// Wait for the first callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
err = p2.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 2)
p2.Register(socketName, testResourceName)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
err = p3.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 3)
p3.Register(socketName, testResourceName)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
close(callbackChan)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub) {
@ -359,9 +361,9 @@ func TestCheckpoint(t *testing.T) {
for podUID, containerDevices := range expectedPodDevices {
for conName, resources := range containerDevices {
for resource := range resources {
as.True(reflect.DeepEqual(
expectedPodDevices.containerDevices(podUID, conName, resource),
testManager.podDevices.containerDevices(podUID, conName, resource)))
expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
testDevices := testManager.podDevices.containerDevices(podUID, conName, resource)
as.True(reflect.DeepEqual(expDevices, testDevices))
opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
as.Equal(len(opts1.Envs), len(opts2.Envs))
@ -388,6 +390,7 @@ func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
type MockEndpoint struct {
allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
initChan chan []string
}
func (m *MockEndpoint) stop() {}
@ -399,6 +402,11 @@ func (m *MockEndpoint) getDevices() []pluginapi.Device {
func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}
func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
m.initChan <- devs
return &pluginapi.PreStartContainerResponse{}, nil
}
func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
if m.allocateFunc != nil {
return m.allocateFunc(devs)
@ -423,7 +431,7 @@ func makePod(limits v1.ResourceList) *v1.Pod {
}
}
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) *ManagerImpl {
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl {
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
testManager := &ManagerImpl{
socketdir: tmpDir,
@ -431,6 +439,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpoint),
pluginOpts: opts,
podDevices: make(podDevices),
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
@ -443,48 +452,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
}
if res.resourceName == "domain1.com/resource1" {
testManager.endpoints[res.resourceName] = &MockEndpoint{
allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.AllocateResponse)
resp.Envs = make(map[string]string)
for _, dev := range devs {
switch dev {
case "dev1":
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/aaa",
HostPath: "/dev/aaa",
Permissions: "mrw",
})
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/bbb",
HostPath: "/dev/bbb",
Permissions: "mrw",
})
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
ContainerPath: "/container_dir1/file1",
HostPath: "host_dir1/file1",
ReadOnly: true,
})
case "dev2":
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/ccc",
HostPath: "/dev/ccc",
Permissions: "mrw",
})
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
ContainerPath: "/container_dir1/file2",
HostPath: "host_dir1/file2",
ReadOnly: true,
})
resp.Envs["key1"] = "val1"
}
}
return resp, nil
},
allocateFunc: allocateStubFunc(),
}
}
if res.resourceName == "domain2.com/resource2" {
@ -549,7 +517,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
testPods := []*v1.Pod{
makePod(v1.ResourceList{
@ -604,7 +573,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err)
}
runContainerOpts := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
as.Nil(err)
if testCase.expectedContainerOptsLen == nil {
as.Nil(runContainerOpts)
} else {
@ -642,7 +612,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
podWithPluginResourcesInInitContainers := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -753,3 +724,104 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
// allocatable in nodeInfo is more than needed, should skip updating
as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
}
func TestDevicePreStartContainer(t *testing.T) {
// Ensures that if device manager is indicated to invoke `PreStartContainer` RPC
// by device plugin, then device manager invokes PreStartContainer at endpoint interface.
// Also verifies that final allocation of mounts, envs etc is same as expected.
res1 := TestResource{
resourceName: "domain1.com/resource1",
resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
devs: []string{"dev1", "dev2"},
}
as := require.New(t)
podsStub := activePodsStub{
activePods: []*v1.Pod{},
}
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true}
testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts)
ch := make(chan []string, 1)
testManager.endpoints[res1.resourceName] = &MockEndpoint{
initChan: ch,
allocateFunc: allocateStubFunc(),
}
pod := makePod(v1.ResourceList{
v1.ResourceName(res1.resourceName): res1.resourceQuantity})
activePods := []*v1.Pod{}
activePods = append(activePods, pod)
podsStub.updateActivePods(activePods)
err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
as.Nil(err)
runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
as.Nil(err)
var initializedDevs []string
select {
case <-time.After(time.Second):
t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub")
case initializedDevs = <-ch:
break
}
as.Contains(initializedDevs, "dev1")
as.Contains(initializedDevs, "dev2")
as.Equal(len(initializedDevs), len(res1.devs))
expectedResp, err := allocateStubFunc()([]string{"dev1", "dev2"})
as.Nil(err)
as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices))
as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts))
as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
}
func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
return func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.AllocateResponse)
resp.Envs = make(map[string]string)
for _, dev := range devs {
switch dev {
case "dev1":
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/aaa",
HostPath: "/dev/aaa",
Permissions: "mrw",
})
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/bbb",
HostPath: "/dev/bbb",
Permissions: "mrw",
})
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
ContainerPath: "/container_dir1/file1",
HostPath: "host_dir1/file1",
ReadOnly: true,
})
case "dev2":
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
ContainerPath: "/dev/ccc",
HostPath: "/dev/ccc",
Permissions: "mrw",
})
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
ContainerPath: "/container_dir1/file2",
HostPath: "host_dir1/file2",
ReadOnly: true,
})
resp.Envs["key1"] = "val1"
}
}
return resp, nil
}
}

View File

@ -117,7 +117,9 @@ func (pdev podDevices) devices() map[string]sets.String {
if _, exists := ret[resource]; !exists {
ret[resource] = sets.NewString()
}
ret[resource] = ret[resource].Union(devices.deviceIds)
if devices.allocResp != nil {
ret[resource] = ret[resource].Union(devices.deviceIds)
}
}
}
}

View File

@ -51,7 +51,7 @@ type Manager interface {
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error)
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.

View File

@ -75,7 +75,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [D
framework.ExpectNoError(err)
By("Register resources")
err = dp1.Register(pluginapi.KubeletSocket, resourceName)
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
framework.ExpectNoError(err)
By("Waiting for the resource exported by the stub device plugin to become available on the local node")
@ -112,7 +112,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [D
err = dp1.Start()
framework.ExpectNoError(err)
err = dp1.Register(pluginapi.KubeletSocket, resourceName)
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
framework.ExpectNoError(err)
By("Waiting for resource to become available on the local node after re-registration")