diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/BUILD b/pkg/kubelet/apis/deviceplugin/v1beta1/BUILD index 8edf2d705c..6a314cd109 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/BUILD +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/BUILD @@ -33,9 +33,3 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) - -filegroup( - name = "go_default_library_protos", - srcs = ["api.proto"], - visibility = ["//visibility:public"], -) diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go b/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go index 666f91fa89..575e65e475 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go @@ -19,22 +19,25 @@ limitations under the License. // DO NOT EDIT! /* - Package v1beta1 is a generated protocol buffer package. + Package deviceplugin is a generated protocol buffer package. It is generated from these files: api.proto It has these top-level messages: + DevicePluginOptions RegisterRequest Empty ListAndWatchResponse Device + PreStartContainerRequest + PreStartContainerResponse AllocateRequest AllocateResponse Mount DeviceSpec */ -package v1beta1 +package deviceplugin import proto "github.com/gogo/protobuf/proto" import fmt "fmt" @@ -63,6 +66,22 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package +type DevicePluginOptions struct { + // Indicates if PreStartContainer call is required before each container start + PreStartRequired bool `protobuf:"varint,1,opt,name=pre_start_required,json=preStartRequired,proto3" json:"pre_start_required,omitempty"` +} + +func (m *DevicePluginOptions) Reset() { *m = DevicePluginOptions{} } +func (*DevicePluginOptions) ProtoMessage() {} +func (*DevicePluginOptions) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{0} } + +func (m *DevicePluginOptions) GetPreStartRequired() bool { + if m != nil { + return m.PreStartRequired + } + return false +} + type RegisterRequest struct { // Version of the API the Device Plugin was built against Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` @@ -71,11 +90,13 @@ type RegisterRequest struct { Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"` // Schedulable resource name. As of now it's expected to be a DNS Label ResourceName string `protobuf:"bytes,3,opt,name=resource_name,json=resourceName,proto3" json:"resource_name,omitempty"` + // Options to be communicated with Device Manager + Options *DevicePluginOptions `protobuf:"bytes,4,opt,name=options" json:"options,omitempty"` } func (m *RegisterRequest) Reset() { *m = RegisterRequest{} } func (*RegisterRequest) ProtoMessage() {} -func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{0} } +func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{1} } func (m *RegisterRequest) GetVersion() string { if m != nil { @@ -98,12 +119,19 @@ func (m *RegisterRequest) GetResourceName() string { return "" } +func (m *RegisterRequest) GetOptions() *DevicePluginOptions { + if m != nil { + return m.Options + } + return nil +} + type Empty struct { } func (m *Empty) Reset() { *m = Empty{} } func (*Empty) ProtoMessage() {} -func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{1} } +func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{2} } // ListAndWatch returns a stream of List of Devices // Whenever a Device state change or a Device disapears, ListAndWatch @@ -114,7 +142,7 @@ type ListAndWatchResponse struct { func (m *ListAndWatchResponse) Reset() { *m = ListAndWatchResponse{} } func (*ListAndWatchResponse) ProtoMessage() {} -func (*ListAndWatchResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{2} } +func (*ListAndWatchResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{3} } func (m *ListAndWatchResponse) GetDevices() []*Device { if m != nil { @@ -139,7 +167,7 @@ type Device struct { func (m *Device) Reset() { *m = Device{} } func (*Device) ProtoMessage() {} -func (*Device) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{3} } +func (*Device) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} } func (m *Device) GetID() string { if m != nil { @@ -155,6 +183,33 @@ func (m *Device) GetHealth() string { return "" } +// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase. +// - PreStartContainer allows kubelet to pass reinitialized devices to containers. +// - PreStartContainer allows Device Plugin to run device specific operations on +// the Devices requested +type PreStartContainerRequest struct { + DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"` +} + +func (m *PreStartContainerRequest) Reset() { *m = PreStartContainerRequest{} } +func (*PreStartContainerRequest) ProtoMessage() {} +func (*PreStartContainerRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{5} } + +func (m *PreStartContainerRequest) GetDevicesIDs() []string { + if m != nil { + return m.DevicesIDs + } + return nil +} + +// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest +type PreStartContainerResponse struct { +} + +func (m *PreStartContainerResponse) Reset() { *m = PreStartContainerResponse{} } +func (*PreStartContainerResponse) ProtoMessage() {} +func (*PreStartContainerResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{6} } + // - Allocate is expected to be called during pod creation since allocation // failures for any container would result in pod startup failure. // - Allocate allows kubelet to exposes additional artifacts in a pod's @@ -167,7 +222,7 @@ type AllocateRequest struct { func (m *AllocateRequest) Reset() { *m = AllocateRequest{} } func (*AllocateRequest) ProtoMessage() {} -func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{4} } +func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} } func (m *AllocateRequest) GetDevicesIDs() []string { if m != nil { @@ -197,7 +252,7 @@ type AllocateResponse struct { func (m *AllocateResponse) Reset() { *m = AllocateResponse{} } func (*AllocateResponse) ProtoMessage() {} -func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{5} } +func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{8} } func (m *AllocateResponse) GetEnvs() map[string]string { if m != nil { @@ -240,7 +295,7 @@ type Mount struct { func (m *Mount) Reset() { *m = Mount{} } func (*Mount) ProtoMessage() {} -func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{6} } +func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{9} } func (m *Mount) GetContainerPath() string { if m != nil { @@ -278,7 +333,7 @@ type DeviceSpec struct { func (m *DeviceSpec) Reset() { *m = DeviceSpec{} } func (*DeviceSpec) ProtoMessage() {} -func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} } +func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{10} } func (m *DeviceSpec) GetContainerPath() string { if m != nil { @@ -302,14 +357,17 @@ func (m *DeviceSpec) GetPermissions() string { } func init() { - proto.RegisterType((*RegisterRequest)(nil), "v1beta1.RegisterRequest") - proto.RegisterType((*Empty)(nil), "v1beta1.Empty") - proto.RegisterType((*ListAndWatchResponse)(nil), "v1beta1.ListAndWatchResponse") - proto.RegisterType((*Device)(nil), "v1beta1.Device") - proto.RegisterType((*AllocateRequest)(nil), "v1beta1.AllocateRequest") - proto.RegisterType((*AllocateResponse)(nil), "v1beta1.AllocateResponse") - proto.RegisterType((*Mount)(nil), "v1beta1.Mount") - proto.RegisterType((*DeviceSpec)(nil), "v1beta1.DeviceSpec") + proto.RegisterType((*DevicePluginOptions)(nil), "deviceplugin.DevicePluginOptions") + proto.RegisterType((*RegisterRequest)(nil), "deviceplugin.RegisterRequest") + proto.RegisterType((*Empty)(nil), "deviceplugin.Empty") + proto.RegisterType((*ListAndWatchResponse)(nil), "deviceplugin.ListAndWatchResponse") + proto.RegisterType((*Device)(nil), "deviceplugin.Device") + proto.RegisterType((*PreStartContainerRequest)(nil), "deviceplugin.PreStartContainerRequest") + proto.RegisterType((*PreStartContainerResponse)(nil), "deviceplugin.PreStartContainerResponse") + proto.RegisterType((*AllocateRequest)(nil), "deviceplugin.AllocateRequest") + proto.RegisterType((*AllocateResponse)(nil), "deviceplugin.AllocateResponse") + proto.RegisterType((*Mount)(nil), "deviceplugin.Mount") + proto.RegisterType((*DeviceSpec)(nil), "deviceplugin.DeviceSpec") } // Reference imports to suppress errors if they are not otherwise used. @@ -336,7 +394,7 @@ func NewRegistrationClient(cc *grpc.ClientConn) RegistrationClient { func (c *registrationClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*Empty, error) { out := new(Empty) - err := grpc.Invoke(ctx, "/v1beta1.Registration/Register", in, out, c.cc, opts...) + err := grpc.Invoke(ctx, "/deviceplugin.Registration/Register", in, out, c.cc, opts...) if err != nil { return nil, err } @@ -363,7 +421,7 @@ func _Registration_Register_Handler(srv interface{}, ctx context.Context, dec fu } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/v1beta1.Registration/Register", + FullMethod: "/deviceplugin.Registration/Register", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(RegistrationServer).Register(ctx, req.(*RegisterRequest)) @@ -372,7 +430,7 @@ func _Registration_Register_Handler(srv interface{}, ctx context.Context, dec fu } var _Registration_serviceDesc = grpc.ServiceDesc{ - ServiceName: "v1beta1.Registration", + ServiceName: "deviceplugin.Registration", HandlerType: (*RegistrationServer)(nil), Methods: []grpc.MethodDesc{ { @@ -395,6 +453,10 @@ type DevicePluginClient interface { // Plugin can run device specific operations and instruct Kubelet // of the steps to make the Device available in the container Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) (*AllocateResponse, error) + // PreStartContainer is called, if indicated by Device Plugin during registeration phase, + // before each container start. Device plugin can run device specific operations + // such as reseting the device before making devices available to the container + PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error) } type devicePluginClient struct { @@ -406,7 +468,7 @@ func NewDevicePluginClient(cc *grpc.ClientConn) DevicePluginClient { } func (c *devicePluginClient) ListAndWatch(ctx context.Context, in *Empty, opts ...grpc.CallOption) (DevicePlugin_ListAndWatchClient, error) { - stream, err := grpc.NewClientStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], c.cc, "/v1beta1.DevicePlugin/ListAndWatch", opts...) + stream, err := grpc.NewClientStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], c.cc, "/deviceplugin.DevicePlugin/ListAndWatch", opts...) if err != nil { return nil, err } @@ -439,7 +501,16 @@ func (x *devicePluginListAndWatchClient) Recv() (*ListAndWatchResponse, error) { func (c *devicePluginClient) Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) (*AllocateResponse, error) { out := new(AllocateResponse) - err := grpc.Invoke(ctx, "/v1beta1.DevicePlugin/Allocate", in, out, c.cc, opts...) + err := grpc.Invoke(ctx, "/deviceplugin.DevicePlugin/Allocate", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *devicePluginClient) PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption) (*PreStartContainerResponse, error) { + out := new(PreStartContainerResponse) + err := grpc.Invoke(ctx, "/deviceplugin.DevicePlugin/PreStartContainer", in, out, c.cc, opts...) if err != nil { return nil, err } @@ -457,6 +528,10 @@ type DevicePluginServer interface { // Plugin can run device specific operations and instruct Kubelet // of the steps to make the Device available in the container Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error) + // PreStartContainer is called, if indicated by Device Plugin during registeration phase, + // before each container start. Device plugin can run device specific operations + // such as reseting the device before making devices available to the container + PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error) } func RegisterDevicePluginServer(s *grpc.Server, srv DevicePluginServer) { @@ -494,7 +569,7 @@ func _DevicePlugin_Allocate_Handler(srv interface{}, ctx context.Context, dec fu } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/v1beta1.DevicePlugin/Allocate", + FullMethod: "/deviceplugin.DevicePlugin/Allocate", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(DevicePluginServer).Allocate(ctx, req.(*AllocateRequest)) @@ -502,14 +577,36 @@ func _DevicePlugin_Allocate_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _DevicePlugin_PreStartContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PreStartContainerRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DevicePluginServer).PreStartContainer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/deviceplugin.DevicePlugin/PreStartContainer", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DevicePluginServer).PreStartContainer(ctx, req.(*PreStartContainerRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _DevicePlugin_serviceDesc = grpc.ServiceDesc{ - ServiceName: "v1beta1.DevicePlugin", + ServiceName: "deviceplugin.DevicePlugin", HandlerType: (*DevicePluginServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Allocate", Handler: _DevicePlugin_Allocate_Handler, }, + { + MethodName: "PreStartContainer", + Handler: _DevicePlugin_PreStartContainer_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -521,6 +618,34 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{ Metadata: "api.proto", } +func (m *DevicePluginOptions) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DevicePluginOptions) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.PreStartRequired { + dAtA[i] = 0x8 + i++ + if m.PreStartRequired { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + return i, nil +} + func (m *RegisterRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -554,6 +679,16 @@ func (m *RegisterRequest) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintApi(dAtA, i, uint64(len(m.ResourceName))) i += copy(dAtA[i:], m.ResourceName) } + if m.Options != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintApi(dAtA, i, uint64(m.Options.Size())) + n1, err := m.Options.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } return i, nil } @@ -635,6 +770,57 @@ func (m *Device) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *PreStartContainerRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PreStartContainerRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.DevicesIDs) > 0 { + for _, s := range m.DevicesIDs { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func (m *PreStartContainerResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PreStartContainerResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + func (m *AllocateRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -847,6 +1033,15 @@ func encodeVarintApi(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *DevicePluginOptions) Size() (n int) { + var l int + _ = l + if m.PreStartRequired { + n += 2 + } + return n +} + func (m *RegisterRequest) Size() (n int) { var l int _ = l @@ -862,6 +1057,10 @@ func (m *RegisterRequest) Size() (n int) { if l > 0 { n += 1 + l + sovApi(uint64(l)) } + if m.Options != nil { + l = m.Options.Size() + n += 1 + l + sovApi(uint64(l)) + } return n } @@ -897,6 +1096,24 @@ func (m *Device) Size() (n int) { return n } +func (m *PreStartContainerRequest) Size() (n int) { + var l int + _ = l + if len(m.DevicesIDs) > 0 { + for _, s := range m.DevicesIDs { + l = len(s) + n += 1 + l + sovApi(uint64(l)) + } + } + return n +} + +func (m *PreStartContainerResponse) Size() (n int) { + var l int + _ = l + return n +} + func (m *AllocateRequest) Size() (n int) { var l int _ = l @@ -991,6 +1208,16 @@ func sovApi(x uint64) (n int) { func sozApi(x uint64) (n int) { return sovApi(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (this *DevicePluginOptions) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DevicePluginOptions{`, + `PreStartRequired:` + fmt.Sprintf("%v", this.PreStartRequired) + `,`, + `}`, + }, "") + return s +} func (this *RegisterRequest) String() string { if this == nil { return "nil" @@ -999,6 +1226,7 @@ func (this *RegisterRequest) String() string { `Version:` + fmt.Sprintf("%v", this.Version) + `,`, `Endpoint:` + fmt.Sprintf("%v", this.Endpoint) + `,`, `ResourceName:` + fmt.Sprintf("%v", this.ResourceName) + `,`, + `Options:` + strings.Replace(fmt.Sprintf("%v", this.Options), "DevicePluginOptions", "DevicePluginOptions", 1) + `,`, `}`, }, "") return s @@ -1033,6 +1261,25 @@ func (this *Device) String() string { }, "") return s } +func (this *PreStartContainerRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PreStartContainerRequest{`, + `DevicesIDs:` + fmt.Sprintf("%v", this.DevicesIDs) + `,`, + `}`, + }, "") + return s +} +func (this *PreStartContainerResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PreStartContainerResponse{`, + `}`, + }, "") + return s +} func (this *AllocateRequest) String() string { if this == nil { return "nil" @@ -1108,6 +1355,76 @@ func valueToStringApi(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } +func (m *DevicePluginOptions) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DevicePluginOptions: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DevicePluginOptions: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PreStartRequired", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.PreStartRequired = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *RegisterRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1224,6 +1541,39 @@ func (m *RegisterRequest) Unmarshal(dAtA []byte) error { } m.ResourceName = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Options", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Options == nil { + m.Options = &DevicePluginOptions{} + } + if err := m.Options.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipApi(dAtA[iNdEx:]) @@ -1484,6 +1834,135 @@ func (m *Device) Unmarshal(dAtA []byte) error { } return nil } +func (m *PreStartContainerRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PreStartContainerRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PreStartContainerRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DevicesIDs", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DevicesIDs = append(m.DevicesIDs, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PreStartContainerResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PreStartContainerResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PreStartContainerResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *AllocateRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2280,43 +2759,48 @@ var ( func init() { proto.RegisterFile("api.proto", fileDescriptorApi) } var fileDescriptorApi = []byte{ - // 594 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4b, 0x6f, 0xd3, 0x4c, - 0x14, 0x8d, 0x93, 0x36, 0x8f, 0xdb, 0xf4, 0xa1, 0xf9, 0xaa, 0x4f, 0xc6, 0x80, 0x55, 0xb9, 0x02, - 0x15, 0xa4, 0xa6, 0x0f, 0x24, 0x8a, 0x58, 0x20, 0x19, 0xa5, 0x48, 0x95, 0x0a, 0x54, 0x66, 0xc1, - 0x32, 0x9a, 0x38, 0x97, 0x78, 0x84, 0x3d, 0x63, 0x3c, 0x63, 0x4b, 0xd9, 0xf1, 0x13, 0x58, 0xf0, - 0xa3, 0xba, 0x64, 0xc9, 0x92, 0x86, 0x15, 0xff, 0x02, 0x65, 0xfc, 0x48, 0x64, 0xd1, 0x05, 0x12, - 0x3b, 0xdf, 0x73, 0xef, 0xb1, 0xcf, 0x9c, 0xb9, 0xc7, 0xd0, 0xa3, 0x31, 0x1b, 0xc4, 0x89, 0x50, - 0x82, 0x74, 0xb2, 0x93, 0x31, 0x2a, 0x7a, 0x62, 0x1d, 0x4e, 0x99, 0x0a, 0xd2, 0xf1, 0xc0, 0x17, - 0xd1, 0xd1, 0x54, 0x4c, 0xc5, 0x91, 0xee, 0x8f, 0xd3, 0x0f, 0xba, 0xd2, 0x85, 0x7e, 0xca, 0x79, - 0x4e, 0x08, 0xdb, 0x1e, 0x4e, 0x99, 0x54, 0x98, 0x78, 0xf8, 0x29, 0x45, 0xa9, 0x88, 0x09, 0x9d, - 0x0c, 0x13, 0xc9, 0x04, 0x37, 0x8d, 0x3d, 0xe3, 0xa0, 0xe7, 0x95, 0x25, 0xb1, 0xa0, 0x8b, 0x7c, - 0x12, 0x0b, 0xc6, 0x95, 0xd9, 0xd4, 0xad, 0xaa, 0x26, 0xfb, 0xb0, 0x99, 0xa0, 0x14, 0x69, 0xe2, - 0xe3, 0x88, 0xd3, 0x08, 0xcd, 0x96, 0x1e, 0xe8, 0x97, 0xe0, 0x1b, 0x1a, 0xa1, 0xd3, 0x81, 0xf5, - 0xf3, 0x28, 0x56, 0x33, 0xc7, 0x85, 0xdd, 0x4b, 0x26, 0x95, 0xcb, 0x27, 0xef, 0xa9, 0xf2, 0x03, - 0x0f, 0x65, 0x2c, 0xb8, 0x44, 0xf2, 0x08, 0x3a, 0x13, 0xcc, 0x98, 0x8f, 0xd2, 0x34, 0xf6, 0x5a, - 0x07, 0x1b, 0xa7, 0xdb, 0x83, 0xe2, 0x60, 0x83, 0xa1, 0xc6, 0xbd, 0xb2, 0xef, 0x1c, 0x43, 0x3b, - 0x87, 0xc8, 0x16, 0x34, 0x2f, 0x86, 0x85, 0xd6, 0x26, 0x1b, 0x92, 0xff, 0xa1, 0x1d, 0x20, 0x0d, - 0x55, 0x50, 0x88, 0x2c, 0x2a, 0xe7, 0x04, 0xb6, 0xdd, 0x30, 0x14, 0x3e, 0x55, 0x58, 0x9e, 0xd5, - 0x06, 0x28, 0xde, 0x77, 0x31, 0xcc, 0x3f, 0xd9, 0xf3, 0x56, 0x10, 0xe7, 0x57, 0x13, 0x76, 0x96, - 0x9c, 0x42, 0xe4, 0x19, 0xac, 0x21, 0xcf, 0x4a, 0x85, 0xfb, 0x95, 0xc2, 0xfa, 0xe0, 0xe0, 0x9c, - 0x67, 0xf2, 0x9c, 0xab, 0x64, 0xe6, 0x69, 0x02, 0x79, 0x08, 0xed, 0x48, 0xa4, 0x5c, 0x49, 0xb3, - 0xa9, 0xa9, 0x5b, 0x15, 0xf5, 0xf5, 0x02, 0xf6, 0x8a, 0x2e, 0x39, 0x5c, 0xba, 0xd0, 0xd2, 0x83, - 0xff, 0xd5, 0x5c, 0x78, 0x17, 0xa3, 0x5f, 0x39, 0x41, 0x2e, 0x61, 0x83, 0x72, 0x2e, 0x14, 0x55, - 0x4c, 0x70, 0x69, 0xae, 0x69, 0xca, 0xe3, 0xdb, 0x65, 0xb9, 0xcb, 0xe1, 0x5c, 0xdd, 0x2a, 0xdd, - 0x3a, 0x83, 0x5e, 0xa5, 0x9b, 0xec, 0x40, 0xeb, 0x23, 0xce, 0x0a, 0x6f, 0x17, 0x8f, 0x64, 0x17, - 0xd6, 0x33, 0x1a, 0xa6, 0x58, 0x78, 0x9b, 0x17, 0xcf, 0x9b, 0xcf, 0x0c, 0xeb, 0x05, 0xec, 0xd4, - 0xdf, 0xfc, 0x37, 0x7c, 0x27, 0x80, 0x75, 0x6d, 0x03, 0x79, 0x00, 0x5b, 0xbe, 0xe0, 0x8a, 0x32, - 0x8e, 0xc9, 0x28, 0xa6, 0x2a, 0x28, 0xf8, 0x9b, 0x15, 0x7a, 0x45, 0x55, 0x40, 0xee, 0x42, 0x2f, - 0x10, 0x52, 0xe5, 0x13, 0xc5, 0x3a, 0x2e, 0x80, 0xb2, 0x99, 0x20, 0x9d, 0x8c, 0x04, 0x0f, 0x67, - 0x7a, 0x15, 0xbb, 0x5e, 0x77, 0x01, 0xbc, 0xe5, 0xe1, 0xcc, 0x49, 0x00, 0x96, 0x3e, 0xfe, 0x93, - 0xcf, 0xed, 0xc1, 0x46, 0x8c, 0x49, 0xc4, 0xa4, 0xd4, 0x57, 0x90, 0xef, 0xfe, 0x2a, 0x74, 0xfa, - 0x0a, 0xfa, 0x79, 0xd0, 0x12, 0xed, 0x0f, 0x79, 0x0a, 0xdd, 0x32, 0x78, 0xc4, 0xac, 0xee, 0xaa, - 0x96, 0x45, 0x6b, 0xb9, 0x21, 0x79, 0x6e, 0x1a, 0xa7, 0x5f, 0x0d, 0xe8, 0xe7, 0xe2, 0xaf, 0xc2, - 0x74, 0xca, 0x38, 0x71, 0xa1, 0xbf, 0x1a, 0x25, 0x52, 0xa3, 0x58, 0xf7, 0xab, 0xfa, 0x4f, 0x89, - 0x73, 0x1a, 0xc7, 0x06, 0x71, 0xa1, 0x5b, 0x2e, 0xc9, 0x8a, 0x96, 0x5a, 0x56, 0xac, 0x3b, 0xb7, - 0x6e, 0x94, 0xd3, 0x78, 0x79, 0xef, 0xfa, 0xc6, 0x36, 0xbe, 0xdf, 0xd8, 0x8d, 0xcf, 0x73, 0xdb, - 0xb8, 0x9e, 0xdb, 0xc6, 0xb7, 0xb9, 0x6d, 0xfc, 0x98, 0xdb, 0xc6, 0x97, 0x9f, 0x76, 0x63, 0xdc, - 0xd6, 0x3f, 0x9b, 0x27, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x65, 0x2a, 0x0d, 0x36, 0xb1, 0x04, - 0x00, 0x00, + // 687 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0x8e, 0x93, 0x36, 0x3f, 0x93, 0xb4, 0x0d, 0xdb, 0x0a, 0x19, 0x17, 0xac, 0x60, 0x04, 0x44, + 0x02, 0xd2, 0x12, 0x0e, 0xa0, 0x82, 0x10, 0xa5, 0x29, 0x52, 0x55, 0xa0, 0xc1, 0x3d, 0x70, 0x8c, + 0xb6, 0xce, 0x36, 0xb6, 0x70, 0x76, 0xcd, 0xee, 0x3a, 0x52, 0x6e, 0x3c, 0x42, 0x5f, 0x83, 0x37, + 0xe9, 0x91, 0x23, 0x47, 0x1a, 0x9e, 0x03, 0x09, 0x79, 0x6d, 0xe7, 0x8f, 0x40, 0x41, 0xe2, 0xe6, + 0xf9, 0x76, 0xbe, 0xf1, 0xcc, 0xb7, 0xfb, 0x0d, 0x94, 0x70, 0xe0, 0x35, 0x02, 0xce, 0x24, 0x43, + 0x95, 0x2e, 0x19, 0x78, 0x0e, 0x09, 0xfc, 0xb0, 0xe7, 0x51, 0xe3, 0x41, 0xcf, 0x93, 0x6e, 0x78, + 0xd2, 0x70, 0x58, 0x7f, 0xab, 0xc7, 0x7a, 0x6c, 0x4b, 0x25, 0x9d, 0x84, 0xa7, 0x2a, 0x52, 0x81, + 0xfa, 0x8a, 0xc9, 0xd6, 0x1e, 0xac, 0xb7, 0x14, 0xbd, 0xad, 0xe8, 0x47, 0x81, 0xf4, 0x18, 0x15, + 0xe8, 0x3e, 0xa0, 0x80, 0x93, 0x8e, 0x90, 0x98, 0xcb, 0x0e, 0x27, 0x1f, 0x43, 0x8f, 0x93, 0xae, + 0xae, 0xd5, 0xb4, 0x7a, 0xd1, 0xae, 0x06, 0x9c, 0x1c, 0x47, 0x07, 0x76, 0x82, 0x5b, 0x9f, 0x35, + 0x58, 0xb3, 0x49, 0xcf, 0x13, 0x92, 0xf0, 0x08, 0x24, 0x42, 0x22, 0x1d, 0x0a, 0x03, 0xc2, 0x85, + 0xc7, 0xa8, 0xa2, 0x95, 0xec, 0x34, 0x44, 0x06, 0x14, 0x09, 0xed, 0x06, 0xcc, 0xa3, 0x52, 0xcf, + 0xaa, 0xa3, 0x71, 0x8c, 0x6e, 0xc1, 0x0a, 0x27, 0x82, 0x85, 0xdc, 0x21, 0x1d, 0x8a, 0xfb, 0x44, + 0xcf, 0xa9, 0x84, 0x4a, 0x0a, 0xbe, 0xc5, 0x7d, 0x82, 0x9e, 0x42, 0x81, 0xc5, 0x7d, 0xea, 0x4b, + 0x35, 0xad, 0x5e, 0x6e, 0xde, 0x6c, 0x4c, 0x4b, 0xd0, 0x58, 0x30, 0x90, 0x9d, 0x32, 0xac, 0x02, + 0x2c, 0xef, 0xf7, 0x03, 0x39, 0xb4, 0x5e, 0xc1, 0xc6, 0x6b, 0x4f, 0xc8, 0x5d, 0xda, 0x7d, 0x8f, + 0xa5, 0xe3, 0xda, 0x44, 0x04, 0x8c, 0x0a, 0x82, 0x1a, 0x50, 0x88, 0xab, 0x09, 0x5d, 0xab, 0xe5, + 0xea, 0xe5, 0xe6, 0xc6, 0xa2, 0xea, 0x76, 0x9a, 0x64, 0x6d, 0x43, 0x3e, 0x86, 0xd0, 0x2a, 0x64, + 0x0f, 0x5a, 0xc9, 0xb4, 0x59, 0xaf, 0x85, 0xae, 0x42, 0xde, 0x25, 0xd8, 0x97, 0x6e, 0x32, 0x66, + 0x12, 0x59, 0x3b, 0xa0, 0xb7, 0x13, 0x09, 0xf7, 0x18, 0x95, 0xd8, 0xa3, 0x13, 0xd9, 0x4c, 0x80, + 0xa4, 0xf0, 0x41, 0x2b, 0x6e, 0xa0, 0x64, 0x4f, 0x21, 0xd6, 0x26, 0x5c, 0x5b, 0xc0, 0x8d, 0x5b, + 0xb7, 0x1e, 0xc2, 0xda, 0xae, 0xef, 0x33, 0x07, 0x4b, 0xf2, 0xb7, 0xf5, 0x7e, 0x64, 0xa1, 0x3a, + 0xe1, 0x24, 0x12, 0x3c, 0x83, 0x25, 0x42, 0x07, 0xe9, 0xfc, 0xf5, 0xd9, 0xf9, 0xe7, 0xb3, 0x1b, + 0xfb, 0x74, 0x20, 0xf6, 0xa9, 0xe4, 0x43, 0x5b, 0xb1, 0xd0, 0x3d, 0xc8, 0xf7, 0x59, 0x48, 0xa5, + 0xd0, 0xb3, 0x8a, 0xbf, 0x3e, 0xcb, 0x7f, 0x13, 0x9d, 0xd9, 0x49, 0x0a, 0x6a, 0x4e, 0xd4, 0xce, + 0xa9, 0x6c, 0x7d, 0x91, 0xda, 0xc7, 0x01, 0x71, 0xc6, 0x8a, 0xa3, 0x77, 0x50, 0xc6, 0x94, 0x32, + 0x89, 0xd3, 0x37, 0x10, 0xf1, 0xb6, 0x2e, 0xe9, 0x72, 0x77, 0xc2, 0x88, 0x9b, 0x9d, 0xae, 0x61, + 0x3c, 0x86, 0xd2, 0x78, 0x0c, 0x54, 0x85, 0xdc, 0x07, 0x32, 0x4c, 0x2e, 0x32, 0xfa, 0x44, 0x1b, + 0xb0, 0x3c, 0xc0, 0x7e, 0x48, 0x92, 0x8b, 0x8c, 0x83, 0x9d, 0xec, 0x13, 0xcd, 0x78, 0x0e, 0xd5, + 0xf9, 0xca, 0xff, 0xc2, 0xb7, 0x5c, 0x58, 0x56, 0x82, 0xa0, 0xdb, 0xb0, 0xea, 0xa4, 0x17, 0xda, + 0x09, 0xb0, 0x74, 0x13, 0xfe, 0xca, 0x18, 0x6d, 0x63, 0xe9, 0xa2, 0x4d, 0x28, 0xb9, 0x4c, 0xc8, + 0x38, 0x23, 0x71, 0x4f, 0x04, 0xa4, 0x87, 0x9c, 0xe0, 0x6e, 0x87, 0x51, 0x7f, 0xa8, 0x9c, 0x53, + 0xb4, 0x8b, 0x11, 0x70, 0x44, 0xfd, 0xa1, 0xc5, 0x01, 0x26, 0x62, 0xfe, 0x97, 0xdf, 0xd5, 0xa0, + 0x1c, 0x10, 0xde, 0xf7, 0x84, 0x50, 0xf7, 0x10, 0x5b, 0x75, 0x1a, 0x6a, 0xb6, 0xa1, 0x12, 0xef, + 0x05, 0xae, 0xf4, 0x41, 0x2f, 0xa0, 0x98, 0xee, 0x09, 0x74, 0x63, 0xf6, 0xc2, 0xe6, 0xf6, 0x87, + 0x31, 0xf7, 0x6a, 0x62, 0xcf, 0x66, 0x9a, 0x67, 0x59, 0xa8, 0x4c, 0xfb, 0x1b, 0x1d, 0x42, 0x65, + 0xda, 0xc6, 0x68, 0x11, 0xcf, 0xb0, 0x66, 0xc1, 0x45, 0xbe, 0xb7, 0x32, 0xdb, 0x1a, 0x3a, 0x84, + 0x62, 0xfa, 0x70, 0xe6, 0xfb, 0x9b, 0x33, 0x96, 0x61, 0xfe, 0xf9, 0xbd, 0x59, 0x19, 0x74, 0x0a, + 0x57, 0x7e, 0xb1, 0x2a, 0xba, 0x33, 0x4b, 0xfb, 0xdd, 0x1e, 0x30, 0xee, 0x5e, 0x9a, 0x97, 0xfe, + 0xe7, 0xe5, 0xf5, 0xf3, 0x0b, 0x53, 0xfb, 0x7a, 0x61, 0x66, 0x3e, 0x8d, 0x4c, 0xed, 0x7c, 0x64, + 0x6a, 0x5f, 0x46, 0xa6, 0xf6, 0x6d, 0x64, 0x6a, 0x67, 0xdf, 0xcd, 0xcc, 0x49, 0x5e, 0xed, 0xf9, + 0x47, 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x06, 0x87, 0xf7, 0x82, 0x31, 0x06, 0x00, 0x00, } diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto b/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto index 1ae34f2e62..97c65e96ed 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto @@ -1,7 +1,7 @@ // To regenerate api.pb.go run hack/update-device-plugin.sh syntax = 'proto3'; -package v1beta1; +package deviceplugin; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; @@ -24,6 +24,11 @@ service Registration { rpc Register(RegisterRequest) returns (Empty) {} } +message DevicePluginOptions { + // Indicates if PreStartContainer call is required before each container start + bool pre_start_required = 1; +} + message RegisterRequest { // Version of the API the Device Plugin was built against string version = 1; @@ -32,6 +37,8 @@ message RegisterRequest { string endpoint = 2; // Schedulable resource name. As of now it's expected to be a DNS Label string resource_name = 3; + // Options to be communicated with Device Manager + DevicePluginOptions options = 4; } message Empty { @@ -48,6 +55,11 @@ service DevicePlugin { // Plugin can run device specific operations and instruct Kubelet // of the steps to make the Device available in the container rpc Allocate(AllocateRequest) returns (AllocateResponse) {} + + // PreStartContainer is called, if indicated by Device Plugin during registeration phase, + // before each container start. Device plugin can run device specific operations + // such as reseting the device before making devices available to the container + rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {} } // ListAndWatch returns a stream of List of Devices @@ -71,6 +83,18 @@ message Device { string health = 2; } +// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase. +// - PreStartContainer allows kubelet to pass reinitialized devices to containers. +// - PreStartContainer allows Device Plugin to run device specific operations on +// the Devices requested +message PreStartContainerRequest { + repeated string devicesIDs = 1; +} + +// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest +message PreStartContainerResponse { +} + // - Allocate is expected to be called during pod creation since allocation // failures for any container would result in pod startup failure. // - Allocate allows kubelet to exposes additional artifacts in a pod's diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go b/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go index 76d6e1319c..5e8cdaf5ab 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go @@ -30,4 +30,6 @@ const ( DevicePluginPath = "/var/lib/kubelet/device-plugins/" // KubeletSocket is the path of the Kubelet registry socket KubeletSocket = DevicePluginPath + "kubelet.sock" + // Timeout duration in secs for PreStartContainer RPC + KubeletPreStartContainerRPCTimeoutInSecs = 30 ) diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 340d58c65e..42ad3125bf 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -601,8 +601,10 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe opts := &kubecontainer.RunContainerOptions{} // Allocate should already be called during predicateAdmitHandler.Admit(), // just try to fetch device runtime information from cached state here - devOpts := cm.deviceManager.GetDeviceRunContainerOptions(pod, container) - if devOpts == nil { + devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container) + if err != nil { + return nil, err + } else if devOpts == nil { return opts, nil } opts.Devices = append(opts.Devices, devOpts.Devices...) diff --git a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go index 66e8042a94..aab00427dd 100644 --- a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go +++ b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go @@ -105,7 +105,7 @@ func (m *Stub) Stop() error { } // Register registers the device plugin for the given resourceName with Kubelet. -func (m *Stub) Register(kubeletEndpoint, resourceName string) error { +func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerFlag bool) error { conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(10*time.Second), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { @@ -120,6 +120,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error { Version: pluginapi.Version, Endpoint: path.Base(m.socket), ResourceName: resourceName, + Options: &pluginapi.DevicePluginOptions{PreStartRequired: preStartContainerFlag}, } _, err = client.Register(context.Background(), reqt) @@ -129,6 +130,12 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string) error { return nil } +// PreStartContainer resets the devices received +func (m *Stub) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + log.Printf("PreStartContainer, %+v", r) + return &pluginapi.PreStartContainerResponse{}, nil +} + // ListAndWatch lists devices and update that list according to the Update call func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { log.Println("ListAndWatch") diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go index cd1e662c30..6d4654588e 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint.go +++ b/pkg/kubelet/cm/devicemanager/endpoint.go @@ -36,6 +36,7 @@ type endpoint interface { run() stop() allocate(devs []string) (*pluginapi.AllocateResponse, error) + preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) getDevices() []pluginapi.Device callback(resourceName string, added, updated, deleted []pluginapi.Device) } @@ -65,8 +66,9 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina client: client, clientConn: c, - socketPath: socketPath, - resourceName: resourceName, + socketPath: socketPath, + resourceName: resourceName, + invokePreStartContainerBoolFlag: false, devices: devices, cb: callback, @@ -182,6 +184,15 @@ func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, err }) } +// preStartContainer issues PreStartContainer gRPC call to the device plugin. +func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second) + defer cancel() + return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{ + DevicesIDs: devs, + }) +} + func (e *endpointImpl) stop() { e.clientConn.Close() } diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 049e7df78f..2b0d7674ed 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -85,6 +85,7 @@ type ManagerImpl struct { // podDevices contains pod to allocated device mapping. podDevices podDevices store utilstore.Store + pluginOpts map[string]*pluginapi.DevicePluginOptions } type sourcesReadyStub struct{} @@ -112,6 +113,7 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { healthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), + pluginOpts: make(map[string]*pluginapi.DevicePluginOptions), podDevices: make(podDevices), } manager.callback = manager.genericDeviceUpdateCallback @@ -201,6 +203,7 @@ func (m *ManagerImpl) checkpointFile() string { // starts device plugin registration service. func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { glog.V(2).Infof("Starting Device Plugin manager") + fmt.Println("Starting Device Plugin manager") m.activePods = activePods m.sourcesReady = sourcesReady @@ -340,8 +343,10 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) return } - m.mutex.Lock() + if r.Options != nil { + m.pluginOpts[r.ResourceName] = r.Options + } // Check for potential re-registration during the initialization of new endpoint, // and skip updating if re-registration happens. // TODO: simplify the part once we have a better way to handle registered devices @@ -590,11 +595,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont resource := string(k) needed := int(v.Value()) glog.V(3).Infof("needs %d %s", needed, resource) - _, registeredResource := m.healthyDevices[resource] - _, allocatedResource := m.allocatedDevices[resource] - // Continues if this is neither an active device plugin resource nor - // a resource we have previously allocated. - if !registeredResource && !allocatedResource { + if !m.isDevicePluginResource(resource) { continue } // Updates allocatedDevices to garbage collect any stranded resources @@ -610,6 +611,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont if allocDevices == nil || len(allocDevices) <= 0 { continue } + startRPCTime := time.Now() // Manager.Allocate involves RPC calls to device plugin, which // could be heavy-weight. Therefore we want to perform this operation outside @@ -659,10 +661,60 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont // GetDeviceRunContainerOptions checks whether we have cached containerDevices // for the passed-in and returns its DeviceRunContainerOptions // for the found one. An empty struct is returned in case no cached state is found. -func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { +func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) { + podUID := string(pod.UID) + contName := container.Name + for k := range container.Resources.Limits { + resource := string(k) + if !m.isDevicePluginResource(resource) { + continue + } + err := m.callPreStartContainerIfNeeded(podUID, contName, resource) + if err != nil { + return nil, err + } + } m.mutex.Lock() defer m.mutex.Unlock() - return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name) + return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil +} + +func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error { + m.mutex.Lock() + opts, ok := m.pluginOpts[resource] + if !ok { + m.mutex.Unlock() + glog.V(4).Infof("Plugin options not found in cache for resource: %s. Skip PreStartContainer", resource) + return nil + } + + if !opts.PreStartRequired { + m.mutex.Unlock() + glog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource, %v", resource) + return nil + } + + devices := m.podDevices.containerDevices(podUID, contName, resource) + if devices == nil { + m.mutex.Unlock() + return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource) + } + + e, ok := m.endpoints[resource] + if !ok { + m.mutex.Unlock() + return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource) + } + + m.mutex.Unlock() + devs := devices.UnsortedList() + glog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID) + _, err := e.preStartContainer(devs) + if err != nil { + return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err) + } + // TODO: Add metrics support for init RPC + return nil } // sanitizeNodeAllocatable scans through allocatedDevices in the device manager @@ -692,3 +744,14 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) { node.SetAllocatableResource(newAllocatableResource) } } + +func (m *ManagerImpl) isDevicePluginResource(resource string) bool { + _, registeredResource := m.healthyDevices[resource] + _, allocatedResource := m.allocatedDevices[resource] + // Return true if this is either an active device plugin resource or + // a resource we have previously allocated. + if registeredResource || allocatedResource { + return true + } + return false +} diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index fc88b4fef0..b2df960a26 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -53,8 +53,8 @@ func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.P } // GetDeviceRunContainerOptions simply returns nil. -func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { - return nil +func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) { + return nil, nil } // GetCapacity simply returns nil capacity and empty removed resource list. diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index fbd295f125..10af39eed5 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -60,6 +60,7 @@ func TestNewManagerImpl(t *testing.T) { defer os.RemoveAll(socketDir) _, err = newManagerImpl(socketName) require.NoError(t, err) + os.RemoveAll(socketDir) } func TestNewManagerImplStart(t *testing.T) { @@ -84,67 +85,68 @@ func TestDevicePluginReRegistration(t *testing.T) { devsForRegistration := []*pluginapi.Device{ {ID: "Dev3", Health: pluginapi.Healthy}, } + for _, preStartContainerFlag := range []bool{false, true} { - expCallbackCount := int32(0) - callbackCount := int32(0) - callbackChan := make(chan int32) - callback := func(n string, a, u, r []pluginapi.Device) { - callbackCount++ - if callbackCount > atomic.LoadInt32(&expCallbackCount) { + expCallbackCount := int32(0) + callbackCount := int32(0) + callbackChan := make(chan int32) + callback := func(n string, a, u, r []pluginapi.Device) { + callbackCount++ + if callbackCount > atomic.LoadInt32(&expCallbackCount) { + t.FailNow() + } + callbackChan <- callbackCount + } + m, p1 := setup(t, devs, callback, socketName, pluginSocketName) + atomic.StoreInt32(&expCallbackCount, 1) + p1.Register(socketName, testResourceName, preStartContainerFlag) + // Wait for the first callback to be issued. + + select { + case <-callbackChan: + break + case <-time.After(time.Second): t.FailNow() } - callbackChan <- callbackCount + devices := m.Devices() + require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + + p2 := NewDevicePluginStub(devs, pluginSocketName+".new") + err = p2.Start() + require.NoError(t, err) + atomic.StoreInt32(&expCallbackCount, 2) + p2.Register(socketName, testResourceName, preStartContainerFlag) + // Wait for the second callback to be issued. + select { + case <-callbackChan: + break + case <-time.After(time.Second): + t.FailNow() + } + + devices2 := m.Devices() + require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + + // Test the scenario that a plugin re-registers with different devices. + p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third") + err = p3.Start() + require.NoError(t, err) + atomic.StoreInt32(&expCallbackCount, 3) + p3.Register(socketName, testResourceName, preStartContainerFlag) + // Wait for the second callback to be issued. + select { + case <-callbackChan: + break + case <-time.After(time.Second): + t.FailNow() + } + devices3 := m.Devices() + require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") + p2.Stop() + p3.Stop() + cleanup(t, m, p1) + close(callbackChan) } - m, p1 := setup(t, devs, callback, socketName, pluginSocketName) - atomic.StoreInt32(&expCallbackCount, 1) - p1.Register(socketName, testResourceName) - // Wait for the first callback to be issued. - - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - devices := m.Devices() - require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") - - p2 := NewDevicePluginStub(devs, pluginSocketName+".new") - err = p2.Start() - require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 2) - p2.Register(socketName, testResourceName) - // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - - devices2 := m.Devices() - require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") - - // Test the scenario that a plugin re-registers with different devices. - p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third") - err = p3.Start() - require.NoError(t, err) - atomic.StoreInt32(&expCallbackCount, 3) - p3.Register(socketName, testResourceName) - // Wait for the second callback to be issued. - select { - case <-callbackChan: - break - case <-time.After(time.Second): - t.FailNow() - } - - devices3 := m.Devices() - require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") - p2.Stop() - p3.Stop() - cleanup(t, m, p1) - close(callbackChan) } func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub) { @@ -359,9 +361,9 @@ func TestCheckpoint(t *testing.T) { for podUID, containerDevices := range expectedPodDevices { for conName, resources := range containerDevices { for resource := range resources { - as.True(reflect.DeepEqual( - expectedPodDevices.containerDevices(podUID, conName, resource), - testManager.podDevices.containerDevices(podUID, conName, resource))) + expDevices := expectedPodDevices.containerDevices(podUID, conName, resource) + testDevices := testManager.podDevices.containerDevices(podUID, conName, resource) + as.True(reflect.DeepEqual(expDevices, testDevices)) opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName) opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName) as.Equal(len(opts1.Envs), len(opts2.Envs)) @@ -388,6 +390,7 @@ func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) { type MockEndpoint struct { allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error) + initChan chan []string } func (m *MockEndpoint) stop() {} @@ -399,6 +402,11 @@ func (m *MockEndpoint) getDevices() []pluginapi.Device { func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {} +func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { + m.initChan <- devs + return &pluginapi.PreStartContainerResponse{}, nil +} + func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) { if m.allocateFunc != nil { return m.allocateFunc(devs) @@ -423,7 +431,7 @@ func makePod(limits v1.ResourceList) *v1.Pod { } } -func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) *ManagerImpl { +func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) *ManagerImpl { monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} testManager := &ManagerImpl{ socketdir: tmpDir, @@ -431,6 +439,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso healthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), endpoints: make(map[string]endpoint), + pluginOpts: opts, podDevices: make(podDevices), activePods: activePods, sourcesReady: &sourcesReadyStub{}, @@ -443,48 +452,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso } if res.resourceName == "domain1.com/resource1" { testManager.endpoints[res.resourceName] = &MockEndpoint{ - allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { - resp := new(pluginapi.AllocateResponse) - resp.Envs = make(map[string]string) - for _, dev := range devs { - switch dev { - case "dev1": - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ - ContainerPath: "/dev/aaa", - HostPath: "/dev/aaa", - Permissions: "mrw", - }) - - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ - ContainerPath: "/dev/bbb", - HostPath: "/dev/bbb", - Permissions: "mrw", - }) - - resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ - ContainerPath: "/container_dir1/file1", - HostPath: "host_dir1/file1", - ReadOnly: true, - }) - - case "dev2": - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ - ContainerPath: "/dev/ccc", - HostPath: "/dev/ccc", - Permissions: "mrw", - }) - - resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ - ContainerPath: "/container_dir1/file2", - HostPath: "host_dir1/file2", - ReadOnly: true, - }) - - resp.Envs["key1"] = "val1" - } - } - return resp, nil - }, + allocateFunc: allocateStubFunc(), } } if res.resourceName == "domain2.com/resource2" { @@ -549,7 +517,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) { as.Nil(err) defer os.RemoveAll(tmpDir) nodeInfo := getTestNodeInfo(v1.ResourceList{}) - testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources) + pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) + testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) testPods := []*v1.Pod{ makePod(v1.ResourceList{ @@ -604,7 +573,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) { t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v", testCase.description, testCase.expErr, err) } - runContainerOpts := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) + runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) + as.Nil(err) if testCase.expectedContainerOptsLen == nil { as.Nil(runContainerOpts) } else { @@ -642,7 +612,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) { tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) defer os.RemoveAll(tmpDir) - testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources) + pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) + testManager := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) podWithPluginResourcesInInitContainers := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -753,3 +724,104 @@ func TestSanitizeNodeAllocatable(t *testing.T) { // allocatable in nodeInfo is more than needed, should skip updating as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)])) } + +func TestDevicePreStartContainer(t *testing.T) { + // Ensures that if device manager is indicated to invoke `PreStartContainer` RPC + // by device plugin, then device manager invokes PreStartContainer at endpoint interface. + // Also verifies that final allocation of mounts, envs etc is same as expected. + res1 := TestResource{ + resourceName: "domain1.com/resource1", + resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), + devs: []string{"dev1", "dev2"}, + } + as := require.New(t) + podsStub := activePodsStub{ + activePods: []*v1.Pod{}, + } + tmpDir, err := ioutil.TempDir("", "checkpoint") + as.Nil(err) + defer os.RemoveAll(tmpDir) + nodeInfo := getTestNodeInfo(v1.ResourceList{}) + pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) + pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true} + + testManager := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) + + ch := make(chan []string, 1) + testManager.endpoints[res1.resourceName] = &MockEndpoint{ + initChan: ch, + allocateFunc: allocateStubFunc(), + } + + pod := makePod(v1.ResourceList{ + v1.ResourceName(res1.resourceName): res1.resourceQuantity}) + activePods := []*v1.Pod{} + activePods = append(activePods, pod) + podsStub.updateActivePods(activePods) + err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) + as.Nil(err) + runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) + as.Nil(err) + var initializedDevs []string + select { + case <-time.After(time.Second): + t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub") + case initializedDevs = <-ch: + break + } + + as.Contains(initializedDevs, "dev1") + as.Contains(initializedDevs, "dev2") + as.Equal(len(initializedDevs), len(res1.devs)) + + expectedResp, err := allocateStubFunc()([]string{"dev1", "dev2"}) + as.Nil(err) + as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices)) + as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts)) + as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs)) +} + +func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) { + return func(devs []string) (*pluginapi.AllocateResponse, error) { + resp := new(pluginapi.AllocateResponse) + resp.Envs = make(map[string]string) + for _, dev := range devs { + switch dev { + case "dev1": + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + ContainerPath: "/dev/aaa", + HostPath: "/dev/aaa", + Permissions: "mrw", + }) + + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + ContainerPath: "/dev/bbb", + HostPath: "/dev/bbb", + Permissions: "mrw", + }) + + resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + ContainerPath: "/container_dir1/file1", + HostPath: "host_dir1/file1", + ReadOnly: true, + }) + + case "dev2": + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + ContainerPath: "/dev/ccc", + HostPath: "/dev/ccc", + Permissions: "mrw", + }) + + resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + ContainerPath: "/container_dir1/file2", + HostPath: "host_dir1/file2", + ReadOnly: true, + }) + + resp.Envs["key1"] = "val1" + } + } + return resp, nil + } +} diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index aecb36855d..192f053913 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -117,7 +117,9 @@ func (pdev podDevices) devices() map[string]sets.String { if _, exists := ret[resource]; !exists { ret[resource] = sets.NewString() } - ret[resource] = ret[resource].Union(devices.deviceIds) + if devices.allocResp != nil { + ret[resource] = ret[resource].Union(devices.deviceIds) + } } } } diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index cfab0f2e65..973f456a34 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -51,7 +51,7 @@ type Manager interface { // GetDeviceRunContainerOptions checks whether we have cached containerDevices // for the passed-in and returns its DeviceRunContainerOptions // for the found one. An empty struct is returned in case no cached state is found. - GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions + GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable // and inactive device plugin resources previously registered on the node. diff --git a/test/e2e_node/device_plugin.go b/test/e2e_node/device_plugin.go index 5a5790755f..3d20fa74af 100644 --- a/test/e2e_node/device_plugin.go +++ b/test/e2e_node/device_plugin.go @@ -75,7 +75,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [D framework.ExpectNoError(err) By("Register resources") - err = dp1.Register(pluginapi.KubeletSocket, resourceName) + err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) framework.ExpectNoError(err) By("Waiting for the resource exported by the stub device plugin to become available on the local node") @@ -112,7 +112,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin] [Serial] [D err = dp1.Start() framework.ExpectNoError(err) - err = dp1.Register(pluginapi.KubeletSocket, resourceName) + err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) framework.ExpectNoError(err) By("Waiting for resource to become available on the local node after re-registration")