From 07beac600469a2551ba92f08c2cd20ea15d02f96 Mon Sep 17 00:00:00 2001 From: Jiaying Zhang Date: Fri, 23 Feb 2018 10:54:43 -0800 Subject: [PATCH] Made a couple API changes to deviceplugin/v1beta1 to avoid future incompatible changes: - Add GetDevicePluginOptions rpc call. This is needed when we switch from Registration service to probe-based plugin watcher. - Change AllocateRequest and AllocateResponse to allow device requests from multiple containers in a pod. Currently only made mechanical change on the devicemanager and test code to cope with the API but still issues an Allocate call per container. We can modify the devicemanager in 1.11 to issue a single Allocate call per pod. The change will also facilitate incremental API change to communicate pod level information through Allocate rpc if there is such future need. --- .../apis/deviceplugin/v1beta1/api.pb.go | 454 +++++++++++++++--- .../apis/deviceplugin/v1beta1/api.proto | 12 + .../apis/deviceplugin/v1beta1/constants.go | 2 +- .../cm/devicemanager/device_plugin_stub.go | 5 + pkg/kubelet/cm/devicemanager/endpoint.go | 4 +- pkg/kubelet/cm/devicemanager/endpoint_test.go | 9 +- pkg/kubelet/cm/devicemanager/manager.go | 4 +- pkg/kubelet/cm/devicemanager/manager_test.go | 20 +- pkg/kubelet/cm/devicemanager/pod_devices.go | 6 +- test/e2e_node/device_plugin.go | 58 +-- 10 files changed, 475 insertions(+), 99 deletions(-) diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go b/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go index ebc275a3c9..dc6d68efdc 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/api.pb.go @@ -33,7 +33,9 @@ limitations under the License. PreStartContainerRequest PreStartContainerResponse AllocateRequest + ContainerAllocateRequest AllocateResponse + ContainerAllocateResponse Mount DeviceSpec */ @@ -217,14 +219,29 @@ func (*PreStartContainerResponse) Descriptor() ([]byte, []int) { return fileDesc // - Allocate allows Device Plugin to run device specific operations on // the Devices requested type AllocateRequest struct { - DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"` + ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests" json:"container_requests,omitempty"` } func (m *AllocateRequest) Reset() { *m = AllocateRequest{} } func (*AllocateRequest) ProtoMessage() {} func (*AllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{7} } -func (m *AllocateRequest) GetDevicesIDs() []string { +func (m *AllocateRequest) GetContainerRequests() []*ContainerAllocateRequest { + if m != nil { + return m.ContainerRequests + } + return nil +} + +type ContainerAllocateRequest struct { + DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"` +} + +func (m *ContainerAllocateRequest) Reset() { *m = ContainerAllocateRequest{} } +func (*ContainerAllocateRequest) ProtoMessage() {} +func (*ContainerAllocateRequest) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{8} } + +func (m *ContainerAllocateRequest) GetDevicesIDs() []string { if m != nil { return m.DevicesIDs } @@ -240,6 +257,21 @@ func (m *AllocateRequest) GetDevicesIDs() []string { // The Device plugin should send a ListAndWatch update and fail the // Allocation request type AllocateResponse struct { + ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses" json:"container_responses,omitempty"` +} + +func (m *AllocateResponse) Reset() { *m = AllocateResponse{} } +func (*AllocateResponse) ProtoMessage() {} +func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{9} } + +func (m *AllocateResponse) GetContainerResponses() []*ContainerAllocateResponse { + if m != nil { + return m.ContainerResponses + } + return nil +} + +type ContainerAllocateResponse struct { // List of environment variable to be set in the container to access one of more devices. Envs map[string]string `protobuf:"bytes,1,rep,name=envs" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // Mounts for the container. @@ -250,32 +282,32 @@ type AllocateResponse struct { Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } -func (m *AllocateResponse) Reset() { *m = AllocateResponse{} } -func (*AllocateResponse) ProtoMessage() {} -func (*AllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{8} } +func (m *ContainerAllocateResponse) Reset() { *m = ContainerAllocateResponse{} } +func (*ContainerAllocateResponse) ProtoMessage() {} +func (*ContainerAllocateResponse) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{10} } -func (m *AllocateResponse) GetEnvs() map[string]string { +func (m *ContainerAllocateResponse) GetEnvs() map[string]string { if m != nil { return m.Envs } return nil } -func (m *AllocateResponse) GetMounts() []*Mount { +func (m *ContainerAllocateResponse) GetMounts() []*Mount { if m != nil { return m.Mounts } return nil } -func (m *AllocateResponse) GetDevices() []*DeviceSpec { +func (m *ContainerAllocateResponse) GetDevices() []*DeviceSpec { if m != nil { return m.Devices } return nil } -func (m *AllocateResponse) GetAnnotations() map[string]string { +func (m *ContainerAllocateResponse) GetAnnotations() map[string]string { if m != nil { return m.Annotations } @@ -295,7 +327,7 @@ type Mount struct { func (m *Mount) Reset() { *m = Mount{} } func (*Mount) ProtoMessage() {} -func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{9} } +func (*Mount) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{11} } func (m *Mount) GetContainerPath() string { if m != nil { @@ -333,7 +365,7 @@ type DeviceSpec struct { func (m *DeviceSpec) Reset() { *m = DeviceSpec{} } func (*DeviceSpec) ProtoMessage() {} -func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{10} } +func (*DeviceSpec) Descriptor() ([]byte, []int) { return fileDescriptorApi, []int{12} } func (m *DeviceSpec) GetContainerPath() string { if m != nil { @@ -365,7 +397,9 @@ func init() { proto.RegisterType((*PreStartContainerRequest)(nil), "v1beta1.PreStartContainerRequest") proto.RegisterType((*PreStartContainerResponse)(nil), "v1beta1.PreStartContainerResponse") proto.RegisterType((*AllocateRequest)(nil), "v1beta1.AllocateRequest") + proto.RegisterType((*ContainerAllocateRequest)(nil), "v1beta1.ContainerAllocateRequest") proto.RegisterType((*AllocateResponse)(nil), "v1beta1.AllocateResponse") + proto.RegisterType((*ContainerAllocateResponse)(nil), "v1beta1.ContainerAllocateResponse") proto.RegisterType((*Mount)(nil), "v1beta1.Mount") proto.RegisterType((*DeviceSpec)(nil), "v1beta1.DeviceSpec") } @@ -445,6 +479,9 @@ var _Registration_serviceDesc = grpc.ServiceDesc{ // Client API for DevicePlugin service type DevicePluginClient interface { + // GetDevicePluginOptions returns options to be communicated with Device + // Manager + GetDevicePluginOptions(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*DevicePluginOptions, error) // ListAndWatch returns a stream of List of Devices // Whenever a Device state change or a Device disapears, ListAndWatch // returns the new list @@ -467,6 +504,15 @@ func NewDevicePluginClient(cc *grpc.ClientConn) DevicePluginClient { return &devicePluginClient{cc} } +func (c *devicePluginClient) GetDevicePluginOptions(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*DevicePluginOptions, error) { + out := new(DevicePluginOptions) + err := grpc.Invoke(ctx, "/v1beta1.DevicePlugin/GetDevicePluginOptions", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *devicePluginClient) ListAndWatch(ctx context.Context, in *Empty, opts ...grpc.CallOption) (DevicePlugin_ListAndWatchClient, error) { stream, err := grpc.NewClientStream(ctx, &_DevicePlugin_serviceDesc.Streams[0], c.cc, "/v1beta1.DevicePlugin/ListAndWatch", opts...) if err != nil { @@ -520,6 +566,9 @@ func (c *devicePluginClient) PreStartContainer(ctx context.Context, in *PreStart // Server API for DevicePlugin service type DevicePluginServer interface { + // GetDevicePluginOptions returns options to be communicated with Device + // Manager + GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error) // ListAndWatch returns a stream of List of Devices // Whenever a Device state change or a Device disapears, ListAndWatch // returns the new list @@ -538,6 +587,24 @@ func RegisterDevicePluginServer(s *grpc.Server, srv DevicePluginServer) { s.RegisterService(&_DevicePlugin_serviceDesc, srv) } +func _DevicePlugin_GetDevicePluginOptions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DevicePluginServer).GetDevicePluginOptions(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v1beta1.DevicePlugin/GetDevicePluginOptions", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DevicePluginServer).GetDevicePluginOptions(ctx, req.(*Empty)) + } + return interceptor(ctx, in, info, handler) +} + func _DevicePlugin_ListAndWatch_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(Empty) if err := stream.RecvMsg(m); err != nil { @@ -599,6 +666,10 @@ var _DevicePlugin_serviceDesc = grpc.ServiceDesc{ ServiceName: "v1beta1.DevicePlugin", HandlerType: (*DevicePluginServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "GetDevicePluginOptions", + Handler: _DevicePlugin_GetDevicePluginOptions_Handler, + }, { MethodName: "Allocate", Handler: _DevicePlugin_Allocate_Handler, @@ -832,6 +903,36 @@ func (m *AllocateRequest) Marshal() (dAtA []byte, err error) { } func (m *AllocateRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ContainerRequests) > 0 { + for _, msg := range m.ContainerRequests { + dAtA[i] = 0xa + i++ + i = encodeVarintApi(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *ContainerAllocateRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ContainerAllocateRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -865,6 +966,36 @@ func (m *AllocateResponse) Marshal() (dAtA []byte, err error) { } func (m *AllocateResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.ContainerResponses) > 0 { + for _, msg := range m.ContainerResponses { + dAtA[i] = 0xa + i++ + i = encodeVarintApi(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *ContainerAllocateResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ContainerAllocateResponse) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -1115,6 +1246,18 @@ func (m *PreStartContainerResponse) Size() (n int) { } func (m *AllocateRequest) Size() (n int) { + var l int + _ = l + if len(m.ContainerRequests) > 0 { + for _, e := range m.ContainerRequests { + l = e.Size() + n += 1 + l + sovApi(uint64(l)) + } + } + return n +} + +func (m *ContainerAllocateRequest) Size() (n int) { var l int _ = l if len(m.DevicesIDs) > 0 { @@ -1127,6 +1270,18 @@ func (m *AllocateRequest) Size() (n int) { } func (m *AllocateResponse) Size() (n int) { + var l int + _ = l + if len(m.ContainerResponses) > 0 { + for _, e := range m.ContainerResponses { + l = e.Size() + n += 1 + l + sovApi(uint64(l)) + } + } + return n +} + +func (m *ContainerAllocateResponse) Size() (n int) { var l int _ = l if len(m.Envs) > 0 { @@ -1285,12 +1440,32 @@ func (this *AllocateRequest) String() string { return "nil" } s := strings.Join([]string{`&AllocateRequest{`, + `ContainerRequests:` + strings.Replace(fmt.Sprintf("%v", this.ContainerRequests), "ContainerAllocateRequest", "ContainerAllocateRequest", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ContainerAllocateRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ContainerAllocateRequest{`, `DevicesIDs:` + fmt.Sprintf("%v", this.DevicesIDs) + `,`, `}`, }, "") return s } func (this *AllocateResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AllocateResponse{`, + `ContainerResponses:` + strings.Replace(fmt.Sprintf("%v", this.ContainerResponses), "ContainerAllocateResponse", "ContainerAllocateResponse", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ContainerAllocateResponse) String() string { if this == nil { return "nil" } @@ -1314,7 +1489,7 @@ func (this *AllocateResponse) String() string { mapStringForAnnotations += fmt.Sprintf("%v: %v,", k, this.Annotations[k]) } mapStringForAnnotations += "}" - s := strings.Join([]string{`&AllocateResponse{`, + s := strings.Join([]string{`&ContainerAllocateResponse{`, `Envs:` + mapStringForEnvs + `,`, `Mounts:` + strings.Replace(fmt.Sprintf("%v", this.Mounts), "Mount", "Mount", 1) + `,`, `Devices:` + strings.Replace(fmt.Sprintf("%v", this.Devices), "DeviceSpec", "DeviceSpec", 1) + `,`, @@ -1992,6 +2167,87 @@ func (m *AllocateRequest) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: AllocateRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ContainerRequests", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ContainerRequests = append(m.ContainerRequests, &ContainerAllocateRequest{}) + if err := m.ContainerRequests[len(m.ContainerRequests)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ContainerAllocateRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ContainerAllocateRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ContainerAllocateRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field DevicesIDs", wireType) @@ -2071,6 +2327,87 @@ func (m *AllocateResponse) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: AllocateResponse: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ContainerResponses", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ContainerResponses = append(m.ContainerResponses, &ContainerAllocateResponse{}) + if err := m.ContainerResponses[len(m.ContainerResponses)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ContainerAllocateResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ContainerAllocateResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ContainerAllocateResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Envs", wireType) @@ -2759,48 +3096,53 @@ var ( func init() { proto.RegisterFile("api.proto", fileDescriptorApi) } var fileDescriptorApi = []byte{ - // 688 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4f, 0x4f, 0x13, 0x41, - 0x14, 0xef, 0xb6, 0xd0, 0x3f, 0xaf, 0x05, 0xea, 0x40, 0xcc, 0xb2, 0xe0, 0xa6, 0x2e, 0xd1, 0xa0, - 0x91, 0x02, 0x35, 0x01, 0xc3, 0xc1, 0xa4, 0x52, 0x4c, 0x48, 0x50, 0xc8, 0x72, 0xf0, 0x62, 0xd2, - 0x4c, 0xdb, 0xb1, 0xbb, 0x71, 0x3b, 0xb3, 0xce, 0xcc, 0x36, 0xe9, 0xcd, 0x8f, 0xe0, 0x87, 0xf0, - 0xc3, 0x70, 0xf4, 0xe8, 0x51, 0xea, 0xc9, 0xab, 0x9f, 0xc0, 0xec, 0xec, 0x9f, 0x36, 0x2b, 0x18, - 0x4d, 0xbc, 0xcd, 0xfb, 0xbd, 0xf7, 0x7b, 0xf3, 0xfe, 0xcc, 0x6f, 0xa0, 0x82, 0x7d, 0xb7, 0xe9, - 0x73, 0x26, 0x19, 0x2a, 0x8d, 0xf7, 0x7b, 0x44, 0xe2, 0x7d, 0x63, 0x67, 0xe8, 0x4a, 0x27, 0xe8, - 0x35, 0xfb, 0x6c, 0xb4, 0x3b, 0x64, 0x43, 0xb6, 0xab, 0xfc, 0xbd, 0xe0, 0x9d, 0xb2, 0x94, 0xa1, - 0x4e, 0x11, 0xcf, 0x3a, 0x86, 0xd5, 0x0e, 0x19, 0xbb, 0x7d, 0x72, 0xe1, 0x05, 0x43, 0x97, 0x9e, - 0xfb, 0xd2, 0x65, 0x54, 0xa0, 0x27, 0x80, 0x7c, 0x4e, 0xba, 0x42, 0x62, 0x2e, 0xbb, 0x9c, 0x7c, - 0x08, 0x5c, 0x4e, 0x06, 0xba, 0xd6, 0xd0, 0xb6, 0xcb, 0x76, 0xdd, 0xe7, 0xe4, 0x32, 0x74, 0xd8, - 0x31, 0x6e, 0x7d, 0xd6, 0x60, 0xc5, 0x26, 0x43, 0x57, 0x48, 0xc2, 0x43, 0x90, 0x08, 0x89, 0x74, - 0x28, 0x8d, 0x09, 0x17, 0x2e, 0xa3, 0x8a, 0x56, 0xb1, 0x13, 0x13, 0x19, 0x50, 0x26, 0x74, 0xe0, - 0x33, 0x97, 0x4a, 0x3d, 0xaf, 0x5c, 0xa9, 0x8d, 0xb6, 0x60, 0x89, 0x13, 0xc1, 0x02, 0xde, 0x27, - 0x5d, 0x8a, 0x47, 0x44, 0x2f, 0xa8, 0x80, 0x5a, 0x02, 0xbe, 0xc6, 0x23, 0x82, 0x0e, 0xa0, 0xc4, - 0xa2, 0x3a, 0xf5, 0x85, 0x86, 0xb6, 0x5d, 0x6d, 0x6d, 0x36, 0xe3, 0xee, 0x9b, 0x37, 0xf4, 0x62, - 0x27, 0xc1, 0x56, 0x09, 0x16, 0x4f, 0x46, 0xbe, 0x9c, 0x58, 0x6d, 0x58, 0x3b, 0x73, 0x85, 0x6c, - 0xd3, 0xc1, 0x1b, 0x2c, 0xfb, 0x8e, 0x4d, 0x84, 0xcf, 0xa8, 0x20, 0xe8, 0x11, 0x94, 0x06, 0x2a, - 0x81, 0xd0, 0xb5, 0x46, 0x61, 0xbb, 0xda, 0x5a, 0xc9, 0x24, 0xb6, 0x13, 0xbf, 0xb5, 0x07, 0xc5, - 0x08, 0x42, 0xcb, 0x90, 0x3f, 0xed, 0xc4, 0x3d, 0xe6, 0xdd, 0x0e, 0xba, 0x0b, 0x45, 0x87, 0x60, - 0x4f, 0x3a, 0x71, 0x73, 0xb1, 0x65, 0x1d, 0x81, 0x7e, 0x11, 0x0f, 0xee, 0x98, 0x51, 0x89, 0x5d, - 0x3a, 0x1b, 0x96, 0x09, 0x10, 0x27, 0x3e, 0xed, 0x44, 0x77, 0x57, 0xec, 0x39, 0xc4, 0xda, 0x80, - 0xf5, 0x1b, 0xb8, 0x51, 0xd5, 0xd6, 0x3e, 0xac, 0xb4, 0x3d, 0x8f, 0xf5, 0xb1, 0x24, 0x7f, 0x9b, - 0xef, 0x47, 0x1e, 0xea, 0x33, 0x4e, 0xdc, 0xfd, 0x21, 0x2c, 0x10, 0x3a, 0x4e, 0x5a, 0xdf, 0x4a, - 0x5b, 0xcf, 0x06, 0x36, 0x4f, 0xe8, 0x58, 0x9c, 0x50, 0xc9, 0x27, 0xb6, 0x22, 0xa0, 0x87, 0x50, - 0x1c, 0xb1, 0x80, 0x4a, 0xa1, 0xe7, 0x15, 0x75, 0x39, 0xa5, 0xbe, 0x0a, 0x61, 0x3b, 0xf6, 0xa2, - 0x9d, 0xd9, 0x78, 0x0b, 0x2a, 0x70, 0x35, 0x33, 0xde, 0x4b, 0x9f, 0xf4, 0xd3, 0x11, 0xa3, 0x33, - 0xa8, 0x62, 0x4a, 0x99, 0xc4, 0xc9, 0xaa, 0x43, 0xca, 0xe3, 0xdb, 0xcb, 0x6a, 0xcf, 0x82, 0xa3, - 0xea, 0xe6, 0xe9, 0xc6, 0x21, 0x54, 0xd2, 0xba, 0x51, 0x1d, 0x0a, 0xef, 0xc9, 0x24, 0x5e, 0x5a, - 0x78, 0x44, 0x6b, 0xb0, 0x38, 0xc6, 0x5e, 0x40, 0xe2, 0xa5, 0x45, 0xc6, 0x51, 0xfe, 0x99, 0x66, - 0x3c, 0x87, 0x7a, 0x36, 0xf3, 0xbf, 0xf0, 0x2d, 0x07, 0x16, 0xd5, 0x18, 0xd0, 0x03, 0x58, 0xee, - 0x27, 0xcb, 0xeb, 0xfa, 0x58, 0x3a, 0x31, 0x7f, 0x29, 0x45, 0x2f, 0xb0, 0x74, 0xd0, 0x06, 0x54, - 0x1c, 0x26, 0x64, 0x14, 0x11, 0xeb, 0x23, 0x04, 0x12, 0x27, 0x27, 0x78, 0xd0, 0x65, 0xd4, 0x9b, - 0x28, 0x6d, 0x94, 0xed, 0x72, 0x08, 0x9c, 0x53, 0x6f, 0x62, 0x71, 0x80, 0xd9, 0x1c, 0xff, 0xcb, - 0x75, 0x0d, 0xa8, 0xfa, 0x84, 0x8f, 0x5c, 0x21, 0xd4, 0x0a, 0x22, 0x31, 0xce, 0x43, 0xad, 0x97, - 0x50, 0x8b, 0x94, 0xcf, 0xd5, 0x7c, 0xd0, 0x01, 0x94, 0x93, 0x9f, 0x00, 0xe9, 0xe9, 0xae, 0x32, - 0x9f, 0x83, 0x31, 0x7b, 0x21, 0x91, 0x20, 0x73, 0xad, 0x9f, 0x1a, 0xd4, 0xe6, 0xc5, 0x8b, 0xda, - 0x50, 0x9b, 0xd7, 0x28, 0xca, 0x50, 0x8c, 0x7b, 0xa9, 0x7d, 0x93, 0x94, 0xad, 0xdc, 0x9e, 0x86, - 0xda, 0x50, 0x4e, 0x1e, 0xc9, 0x5c, 0x2d, 0x19, 0xad, 0x18, 0xeb, 0xb7, 0xbe, 0x28, 0x2b, 0x87, - 0xde, 0xc2, 0x9d, 0xdf, 0x84, 0x87, 0xee, 0xa7, 0x8c, 0xdb, 0x04, 0x6d, 0x58, 0x7f, 0x0a, 0x49, - 0xb2, 0xbf, 0xd8, 0xbc, 0xba, 0x36, 0xb5, 0xaf, 0xd7, 0x66, 0xee, 0xe3, 0xd4, 0xd4, 0xae, 0xa6, - 0xa6, 0xf6, 0x65, 0x6a, 0x6a, 0xdf, 0xa6, 0xa6, 0xf6, 0xe9, 0xbb, 0x99, 0xeb, 0x15, 0xd5, 0x0f, - 0xfd, 0xf4, 0x57, 0x00, 0x00, 0x00, 0xff, 0xff, 0xad, 0x28, 0x13, 0x30, 0xe6, 0x05, 0x00, 0x00, + // 760 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xcd, 0x4e, 0xdb, 0x4a, + 0x14, 0x8e, 0x13, 0xc8, 0xcf, 0x49, 0x80, 0x30, 0x20, 0x64, 0x0c, 0xd7, 0xca, 0xf5, 0xd5, 0xbd, + 0xe2, 0x4a, 0x10, 0x20, 0x48, 0xdc, 0x2b, 0x16, 0x55, 0x53, 0x42, 0x5b, 0xa4, 0xb6, 0x44, 0x46, + 0x55, 0x37, 0x95, 0x22, 0xc7, 0x99, 0xc6, 0x56, 0x93, 0x19, 0xd7, 0x33, 0x89, 0x94, 0x5d, 0x17, + 0x7d, 0x80, 0x3e, 0x44, 0x1f, 0xa3, 0x0f, 0xc0, 0xb2, 0xcb, 0x2e, 0x4b, 0xfa, 0x22, 0x95, 0xc7, + 0x1e, 0x3b, 0x32, 0x01, 0x5a, 0xa9, 0x3b, 0xcf, 0x77, 0xce, 0xf7, 0xcd, 0x39, 0x67, 0x4e, 0xbe, + 0x40, 0xc9, 0xf2, 0xdc, 0xba, 0xe7, 0x53, 0x4e, 0x51, 0x61, 0x7c, 0xd8, 0xc5, 0xdc, 0x3a, 0xd4, + 0xf6, 0xfa, 0x2e, 0x77, 0x46, 0xdd, 0xba, 0x4d, 0x87, 0xfb, 0x7d, 0xda, 0xa7, 0xfb, 0x22, 0xde, + 0x1d, 0xbd, 0x11, 0x27, 0x71, 0x10, 0x5f, 0x21, 0xcf, 0x38, 0x85, 0xb5, 0x16, 0x1e, 0xbb, 0x36, + 0x6e, 0x0f, 0x46, 0x7d, 0x97, 0x5c, 0x78, 0xdc, 0xa5, 0x84, 0xa1, 0x5d, 0x40, 0x9e, 0x8f, 0x3b, + 0x8c, 0x5b, 0x3e, 0xef, 0xf8, 0xf8, 0xdd, 0xc8, 0xf5, 0x71, 0x4f, 0x55, 0x6a, 0xca, 0x4e, 0xd1, + 0xac, 0x7a, 0x3e, 0xbe, 0x0c, 0x02, 0x66, 0x84, 0x1b, 0x9f, 0x14, 0x58, 0x31, 0x71, 0xdf, 0x65, + 0x1c, 0xfb, 0x01, 0x88, 0x19, 0x47, 0x2a, 0x14, 0xc6, 0xd8, 0x67, 0x2e, 0x25, 0x82, 0x56, 0x32, + 0xe5, 0x11, 0x69, 0x50, 0xc4, 0xa4, 0xe7, 0x51, 0x97, 0x70, 0x35, 0x2b, 0x42, 0xf1, 0x19, 0xfd, + 0x05, 0x4b, 0x3e, 0x66, 0x74, 0xe4, 0xdb, 0xb8, 0x43, 0xac, 0x21, 0x56, 0x73, 0x22, 0xa1, 0x22, + 0xc1, 0x17, 0xd6, 0x10, 0xa3, 0x63, 0x28, 0xd0, 0xb0, 0x4e, 0x75, 0xa1, 0xa6, 0xec, 0x94, 0x1b, + 0xdb, 0xf5, 0xa8, 0xfb, 0xfa, 0x9c, 0x5e, 0x4c, 0x99, 0x6c, 0x14, 0x60, 0xf1, 0x6c, 0xe8, 0xf1, + 0x89, 0xd1, 0x84, 0xf5, 0x67, 0x2e, 0xe3, 0x4d, 0xd2, 0x7b, 0x65, 0x71, 0xdb, 0x31, 0x31, 0xf3, + 0x28, 0x61, 0x18, 0xfd, 0x0b, 0x85, 0x9e, 0x10, 0x60, 0xaa, 0x52, 0xcb, 0xed, 0x94, 0x1b, 0x2b, + 0x29, 0x61, 0x53, 0xc6, 0x8d, 0x03, 0xc8, 0x87, 0x10, 0x5a, 0x86, 0xec, 0x79, 0x2b, 0xea, 0x31, + 0xeb, 0xb6, 0xd0, 0x06, 0xe4, 0x1d, 0x6c, 0x0d, 0xb8, 0x13, 0x35, 0x17, 0x9d, 0x8c, 0x13, 0x50, + 0xdb, 0xd1, 0xe0, 0x4e, 0x29, 0xe1, 0x96, 0x4b, 0x92, 0x61, 0xe9, 0x00, 0x91, 0xf0, 0x79, 0x2b, + 0xbc, 0xbb, 0x64, 0xce, 0x20, 0xc6, 0x16, 0x6c, 0xce, 0xe1, 0x86, 0x55, 0x1b, 0x36, 0xac, 0x34, + 0x07, 0x03, 0x6a, 0x5b, 0x1c, 0x4b, 0xbd, 0x36, 0x20, 0x5b, 0xe6, 0x89, 0xe7, 0xc3, 0x8c, 0xcb, + 0x9e, 0xfe, 0x8c, 0x7b, 0x8a, 0xa5, 0x52, 0x74, 0x73, 0xd5, 0x4e, 0x15, 0xc8, 0x82, 0xea, 0x6f, + 0x4b, 0xbf, 0xb7, 0xfa, 0x3e, 0x54, 0x13, 0x4a, 0x34, 0xea, 0x4b, 0x58, 0x9b, 0xad, 0x30, 0x44, + 0x65, 0x89, 0xc6, 0x5d, 0x25, 0x86, 0xa9, 0x26, 0xb2, 0xd3, 0x83, 0x60, 0xc6, 0x87, 0x1c, 0x6c, + 0xde, 0xca, 0x40, 0x0f, 0x61, 0x01, 0x93, 0xb1, 0xbc, 0x63, 0xf7, 0xfe, 0x3b, 0xea, 0x67, 0x64, + 0xcc, 0xce, 0x08, 0xf7, 0x27, 0xa6, 0x60, 0xa2, 0x7f, 0x20, 0x3f, 0xa4, 0x23, 0xc2, 0x99, 0x9a, + 0x15, 0x1a, 0xcb, 0xb1, 0xc6, 0xf3, 0x00, 0x36, 0xa3, 0x28, 0xda, 0x4b, 0xf6, 0x28, 0x27, 0x12, + 0xd7, 0x52, 0x7b, 0x74, 0xe9, 0x61, 0x3b, 0xde, 0x25, 0xf4, 0x12, 0xca, 0x16, 0x21, 0x94, 0x5b, + 0x72, 0xa7, 0x03, 0xca, 0xd1, 0x4f, 0xd4, 0xd7, 0x4c, 0x58, 0x61, 0x99, 0xb3, 0x3a, 0xda, 0x7f, + 0x50, 0x8a, 0x1b, 0x40, 0x55, 0xc8, 0xbd, 0xc5, 0x93, 0x68, 0x4d, 0x83, 0x4f, 0xb4, 0x0e, 0x8b, + 0x63, 0x6b, 0x30, 0xc2, 0xd1, 0x9a, 0x86, 0x87, 0x93, 0xec, 0xff, 0x8a, 0xf6, 0x00, 0xaa, 0x69, + 0xe5, 0x5f, 0xe1, 0x1b, 0x0e, 0x2c, 0x8a, 0x79, 0xa0, 0xbf, 0x61, 0x39, 0x79, 0x64, 0xcf, 0xe2, + 0x4e, 0xc4, 0x5f, 0x8a, 0xd1, 0xb6, 0xc5, 0x1d, 0xb4, 0x05, 0x25, 0x87, 0x32, 0x1e, 0x66, 0x44, + 0x8e, 0x10, 0x00, 0x32, 0xe8, 0x63, 0xab, 0xd7, 0xa1, 0x64, 0x30, 0x11, 0x6e, 0x50, 0x34, 0x8b, + 0x01, 0x70, 0x41, 0x06, 0x13, 0xc3, 0x07, 0x48, 0x06, 0xfa, 0x5b, 0xae, 0xab, 0x41, 0xd9, 0xc3, + 0xfe, 0xd0, 0x65, 0x4c, 0xbc, 0x45, 0x68, 0x3f, 0xb3, 0x50, 0xe3, 0x31, 0x54, 0x42, 0xaf, 0xf3, + 0xc5, 0x7c, 0xd0, 0x31, 0x14, 0xa5, 0xf7, 0x21, 0x35, 0x7e, 0xb4, 0x94, 0x1d, 0x6a, 0xc9, 0xaa, + 0x84, 0x16, 0x94, 0x69, 0x7c, 0xce, 0x42, 0x65, 0xd6, 0xae, 0xd0, 0x53, 0xd8, 0x78, 0x82, 0xf9, + 0x3c, 0x37, 0x4e, 0x91, 0xb5, 0x3b, 0xfd, 0xce, 0xc8, 0xa0, 0x26, 0x54, 0x66, 0xfd, 0xed, 0x06, + 0xff, 0x8f, 0xf8, 0x3c, 0xcf, 0x06, 0x8d, 0xcc, 0x81, 0x82, 0x9a, 0x50, 0x94, 0xeb, 0x36, 0xd3, + 0x55, 0xea, 0x97, 0xaf, 0x6d, 0xce, 0x89, 0x48, 0x11, 0xf4, 0x1a, 0x56, 0x6f, 0x98, 0x16, 0x4a, + 0xdc, 0xe7, 0x36, 0x33, 0xd4, 0x8c, 0xbb, 0x52, 0xa4, 0xfa, 0xa3, 0xed, 0xab, 0x6b, 0x5d, 0xf9, + 0x7a, 0xad, 0x67, 0xde, 0x4f, 0x75, 0xe5, 0x6a, 0xaa, 0x2b, 0x5f, 0xa6, 0xba, 0xf2, 0x6d, 0xaa, + 0x2b, 0x1f, 0xbf, 0xeb, 0x99, 0x6e, 0x5e, 0xfc, 0xbb, 0x1d, 0xfd, 0x08, 0x00, 0x00, 0xff, 0xff, + 0xb9, 0xc2, 0x87, 0x92, 0x22, 0x07, 0x00, 0x00, } diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto b/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto index fe4b892fb0..efbd72c133 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto @@ -46,6 +46,10 @@ message Empty { // DevicePlugin is the service advertised by Device Plugins service DevicePlugin { + // GetDevicePluginOptions returns options to be communicated with Device + // Manager + rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {} + // ListAndWatch returns a stream of List of Devices // Whenever a Device state change or a Device disapears, ListAndWatch // returns the new list @@ -102,6 +106,10 @@ message PreStartContainerResponse { // - Allocate allows Device Plugin to run device specific operations on // the Devices requested message AllocateRequest { + repeated ContainerAllocateRequest container_requests = 1; +} + +message ContainerAllocateRequest { repeated string devicesIDs = 1; } @@ -114,6 +122,10 @@ message AllocateRequest { // The Device plugin should send a ListAndWatch update and fail the // Allocation request message AllocateResponse { + repeated ContainerAllocateResponse container_responses = 1; +} + +message ContainerAllocateResponse { // List of environment variable to be set in the container to access one of more devices. map envs = 1; // Mounts for the container. diff --git a/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go b/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go index d2f8e21162..13c9bdd101 100644 --- a/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go +++ b/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go @@ -19,7 +19,7 @@ package v1beta1 const ( // Healthy means that the device is healty Healthy = "Healthy" - // UnHealthy means that the device is unhealty + // UnHealthy means that the device is unhealthy Unhealthy = "Unhealthy" // Current version of the API supported by kubelet diff --git a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go index 3b5f81ec96..a7c3fd38bb 100644 --- a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go +++ b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go @@ -130,6 +130,11 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF return nil } +// GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin. +func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{}, nil +} + // PreStartContainer resets the devices received func (m *Stub) PreStartContainer(ctx context.Context, r *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { log.Printf("PreStartContainer, %+v", r) diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go index ca9d69639a..6f57a482ef 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint.go +++ b/pkg/kubelet/cm/devicemanager/endpoint.go @@ -179,7 +179,9 @@ func (e *endpointImpl) run() { // allocate issues Allocate gRPC call to the device plugin. func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) { return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{ - DevicesIDs: devs, + ContainerRequests: []*pluginapi.ContainerAllocateRequest{ + {DevicesIDs: devs}, + }, }) } diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index 9774dd2e13..59edf6baf2 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -130,24 +130,27 @@ func TestAllocate(t *testing.T) { defer ecleanup(t, p, e) resp := new(pluginapi.AllocateResponse) - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + contResp := new(pluginapi.ContainerAllocateResponse) + contResp.Devices = append(contResp.Devices, &pluginapi.DeviceSpec{ ContainerPath: "/dev/aaa", HostPath: "/dev/aaa", Permissions: "mrw", }) - resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + contResp.Devices = append(contResp.Devices, &pluginapi.DeviceSpec{ ContainerPath: "/dev/bbb", HostPath: "/dev/bbb", Permissions: "mrw", }) - resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + contResp.Mounts = append(contResp.Mounts, &pluginapi.Mount{ ContainerPath: "/container_dir1/file1", HostPath: "host_dir1/file1", ReadOnly: true, }) + resp.ContainerResponses = append(resp.ContainerResponses, contResp) + p.SetAllocFunc(func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) { return resp, nil }) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index a535824982..ce2093edf8 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -643,6 +643,8 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont } devs := allocDevices.UnsortedList() + // TODO: refactor this part of code to just append a ContainerAllocationRequest + // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod. glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource) resp, err := e.allocate(devs) metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime)) @@ -657,7 +659,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont // Update internal cached podDevices state. m.mutex.Lock() - m.podDevices.insert(podUID, contName, resource, allocDevices, resp) + m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0]) m.mutex.Unlock() } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 0dcf7042a6..643b965387 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -279,8 +279,8 @@ func constructDevices(devices []string) sets.String { return ret } -func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse { - resp := &pluginapi.AllocateResponse{} +func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.ContainerAllocateResponse { + resp := &pluginapi.ContainerAllocateResponse{} for k, v := range devices { resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ HostPath: k, @@ -458,7 +458,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso if res.resourceName == "domain2.com/resource2" { testManager.endpoints[res.resourceName] = &MockEndpoint{ allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { - resp := new(pluginapi.AllocateResponse) + resp := new(pluginapi.ContainerAllocateResponse) resp.Envs = make(map[string]string) for _, dev := range devs { switch dev { @@ -469,7 +469,9 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso resp.Envs["key2"] = "val3" } } - return resp, nil + resps := new(pluginapi.AllocateResponse) + resps.ContainerResponses = append(resps.ContainerResponses, resp) + return resps, nil }, } } @@ -774,8 +776,10 @@ func TestDevicePreStartContainer(t *testing.T) { as.Contains(initializedDevs, "dev2") as.Equal(len(initializedDevs), len(res1.devs)) - expectedResp, err := allocateStubFunc()([]string{"dev1", "dev2"}) + expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"}) as.Nil(err) + as.Equal(1, len(expectedResps.ContainerResponses)) + expectedResp := expectedResps.ContainerResponses[0] as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices)) as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts)) as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs)) @@ -783,7 +787,7 @@ func TestDevicePreStartContainer(t *testing.T) { func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) { return func(devs []string) (*pluginapi.AllocateResponse, error) { - resp := new(pluginapi.AllocateResponse) + resp := new(pluginapi.ContainerAllocateResponse) resp.Envs = make(map[string]string) for _, dev := range devs { switch dev { @@ -822,6 +826,8 @@ func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) resp.Envs["key1"] = "val1" } } - return resp, nil + resps := new(pluginapi.AllocateResponse) + resps.ContainerResponses = append(resps.ContainerResponses, resp) + return resps, nil } } diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 89b03353a2..eb20dc0a6e 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -28,7 +28,7 @@ type deviceAllocateInfo struct { // deviceIds contains device Ids allocated to this container for the given resourceName. deviceIds sets.String // allocResp contains cached rpc AllocateResponse. - allocResp *pluginapi.AllocateResponse + allocResp *pluginapi.ContainerAllocateResponse } type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName. @@ -43,7 +43,7 @@ func (pdev podDevices) pods() sets.String { return ret } -func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.AllocateResponse) { +func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) { if _, podExists := pdev[podUID]; !podExists { pdev[podUID] = make(containerDevices) } @@ -168,7 +168,7 @@ func (pdev podDevices) fromCheckpointData(data []podDevicesCheckpointEntry) { for _, devID := range entry.DeviceIDs { devIDs.Insert(devID) } - allocResp := &pluginapi.AllocateResponse{} + allocResp := &pluginapi.ContainerAllocateResponse{} err := allocResp.Unmarshal(entry.AllocResp) if err != nil { glog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err) diff --git a/test/e2e_node/device_plugin.go b/test/e2e_node/device_plugin.go index 7f10cd379d..7ae7003598 100644 --- a/test/e2e_node/device_plugin.go +++ b/test/e2e_node/device_plugin.go @@ -229,34 +229,38 @@ func numberOfDevices(node *v1.Node, resourceName string) int64 { // stubAllocFunc will pass to stub device plugin func stubAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) { - var response pluginapi.AllocateResponse - for _, requestID := range r.DevicesIDs { - dev, ok := devs[requestID] - if !ok { - return nil, fmt.Errorf("invalid allocation request with non-existing device %s", requestID) + var responses pluginapi.AllocateResponse + for _, req := range r.ContainerRequests { + response := &pluginapi.ContainerAllocateResponse{} + for _, requestID := range req.DevicesIDs { + dev, ok := devs[requestID] + if !ok { + return nil, fmt.Errorf("invalid allocation request with non-existing device %s", requestID) + } + + if dev.Health != pluginapi.Healthy { + return nil, fmt.Errorf("invalid allocation request with unhealthy device: %s", requestID) + } + + // create fake device file + fpath := filepath.Join("/tmp", dev.ID) + + // clean first + os.RemoveAll(fpath) + f, err := os.Create(fpath) + if err != nil && !os.IsExist(err) { + return nil, fmt.Errorf("failed to create fake device file: %s", err) + } + + f.Close() + + response.Mounts = append(response.Mounts, &pluginapi.Mount{ + ContainerPath: fpath, + HostPath: fpath, + }) } - - if dev.Health != pluginapi.Healthy { - return nil, fmt.Errorf("invalid allocation request with unhealthy device: %s", requestID) - } - - // create fake device file - fpath := filepath.Join("/tmp", dev.ID) - - // clean first - os.RemoveAll(fpath) - f, err := os.Create(fpath) - if err != nil && !os.IsExist(err) { - return nil, fmt.Errorf("failed to create fake device file: %s", err) - } - - f.Close() - - response.Mounts = append(response.Mounts, &pluginapi.Mount{ - ContainerPath: fpath, - HostPath: fpath, - }) + responses.ContainerResponses = append(responses.ContainerResponses, response) } - return &response, nil + return &responses, nil }