Merge pull request #76178 from humblec/endpoint

Create endpoint/service early to avoid unwanted create/delete volume transaction.
k3s-v1.15.3
Kubernetes Prow Robot 2019-05-22 09:58:09 -07:00 committed by GitHub
commit d5876954e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 21 deletions

View File

@ -804,6 +804,24 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
customVolumeName := ""
epServiceName := ""
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, 0, "", fmt.Errorf("failed to get kube client to update endpoint")
}
if len(p.provisionerConfig.customEpNamePrefix) == 0 {
epServiceName = string(p.options.PVC.UID)
} else {
epServiceName = p.provisionerConfig.customEpNamePrefix + "-" + string(p.options.PVC.UID)
}
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.createOrGetEndpointService(epNamespace, epServiceName, p.options.PVC)
if err != nil {
klog.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
return nil, 0, "", fmt.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
}
klog.V(3).Infof("dynamic endpoint %v and service %v ", endpoint, service)
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
// GlusterFS/heketi creates volumes in units of GiB.
@ -850,21 +868,42 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
return nil, 0, "", fmt.Errorf("failed to get cluster nodes for volume %s: %v", volume, err)
}
if len(p.provisionerConfig.customEpNamePrefix) == 0 {
epServiceName = string(p.options.PVC.UID)
} else {
epServiceName = p.provisionerConfig.customEpNamePrefix + "-" + string(p.options.PVC.UID)
addrlist := make([]v1.EndpointAddress, len(dynamicHostIps))
for i, v := range dynamicHostIps {
addrlist[i].IP = v
}
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC)
subset := make([]v1.EndpointSubset, 1)
ports := []v1.EndpointPort{{Port: 1, Protocol: "TCP"}}
endpoint.Subsets = subset
endpoint.Subsets[0].Addresses = addrlist
endpoint.Subsets[0].Ports = ports
_, err = kubeClient.CoreV1().Endpoints(epNamespace).Update(endpoint)
if err != nil {
deleteErr := cli.VolumeDelete(volume.Id)
if deleteErr != nil {
klog.Errorf("failed to delete volume: %v, manual deletion of the volume required", deleteErr)
}
return nil, 0, "", fmt.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
klog.V(3).Infof("failed to update endpoint, deleting %s", endpoint)
err = kubeClient.CoreV1().Services(epNamespace).Delete(epServiceName, nil)
if err != nil && errors.IsNotFound(err) {
klog.V(1).Infof("service %s does not exist in namespace %s", epServiceName, epNamespace)
err = nil
}
if err != nil {
klog.Errorf("failed to delete service %s/%s: %v", epNamespace, epServiceName, err)
}
klog.V(1).Infof("service/endpoint: %s/%s deleted successfully", epNamespace, epServiceName)
return nil, 0, "", fmt.Errorf("failed to update endpoint %s: %v", endpoint, err)
}
klog.V(3).Infof("dynamic endpoint %v and service %v ", endpoint, service)
klog.V(3).Infof("endpoint %s updated successfully", endpoint)
return &v1.GlusterfsPersistentVolumeSource{
EndpointsName: endpoint.Name,
EndpointsNamespace: &epNamespace,
@ -873,11 +912,11 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
}, sz, volID, nil
}
// createEndpointService() makes sure an endpoint and service
// exist for the given namespace, PVC name, endpoint name, and
// set of IPs. I.e. the endpoint or service is only created
// createOrGetEndpointService() makes sure an endpoint and service
// exist for the given namespace, PVC name, endpoint name
// I.e. the endpoint or service is only created
// if it does not exist yet.
func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvc *v1.PersistentVolumeClaim) (endpoint *v1.Endpoints, service *v1.Service, err error) {
func (p *glusterfsVolumeProvisioner) createOrGetEndpointService(namespace string, epServiceName string, pvc *v1.PersistentVolumeClaim) (endpoint *v1.Endpoints, service *v1.Service, err error) {
pvcNameOrID := ""
if len(pvc.Name) >= 63 {
pvcNameOrID = string(pvc.UID)
@ -885,10 +924,6 @@ func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epS
pvcNameOrID = pvc.Name
}
addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
@ -897,10 +932,6 @@ func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epS
"gluster.kubernetes.io/provisioned-for-pvc": pvcNameOrID,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {

View File

@ -230,7 +230,8 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
// glusterfs
rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
rbacv1helpers.NewRule("get", "create", "delete").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get", "create", "update", "delete").Groups(legacyGroup).Resources("endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get", "create", "delete").Groups(legacyGroup).Resources("services").RuleOrDie(),
rbacv1helpers.NewRule("get").Groups(legacyGroup).Resources("secrets").RuleOrDie(),
// openstack
rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(),

View File

@ -789,6 +789,14 @@ items:
- ""
resources:
- endpoints
verbs:
- create
- delete
- get
- update
- apiGroups:
- ""
resources:
- services
verbs:
- create