Create endpoint/service early to avoid an unwanted create/delete volume transaction.

At times, endpoint/service creation can fail in a setup for various reasons. Because
we currently create the endpoint/service after volume creation, we later need to
roll back the volume transaction if endpoint/service creation fails. Considering
that endpoint/service creation is lightweight, this patch promotes endpoint/service
creation to an earlier stage.

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
k3s-v1.15.3
Humble Chirammal 2019-04-10 18:10:36 +05:30
parent aa74064600
commit 7544b53693
3 changed files with 46 additions and 25 deletions

View File

@ -809,6 +809,19 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
customVolumeName := ""
epServiceName := ""
if len(p.provisionerConfig.customEpNamePrefix) == 0 {
epServiceName = string(p.options.PVC.UID)
} else {
epServiceName = p.provisionerConfig.customEpNamePrefix + "-" + string(p.options.PVC.UID)
}
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, p.options.PVC.Name)
if err != nil {
klog.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
return nil, 0, "", fmt.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
}
klog.V(3).Infof("dynamic endpoint %v and service %v ", endpoint, service)
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
// GlusterFS/heketi creates volumes in units of GiB.
@ -859,22 +872,28 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
return nil, 0, "", fmt.Errorf("failed to get cluster nodes for volume %s: %v", volume, err)
}
if len(p.provisionerConfig.customEpNamePrefix) == 0 {
epServiceName = string(p.options.PVC.UID)
} else {
epServiceName = p.provisionerConfig.customEpNamePrefix + "-" + string(p.options.PVC.UID)
addrlist := make([]v1.EndpointAddress, len(dynamicHostIps))
for i, v := range dynamicHostIps {
addrlist[i].IP = v
}
epNamespace := p.options.PVC.Namespace
endpoint, service, err := p.createEndpointService(epNamespace, epServiceName, dynamicHostIps, p.options.PVC)
subset := make([]v1.EndpointSubset, 1)
ports := []v1.EndpointPort{{Port: 1, Protocol: "TCP"}}
endpoint.Subsets = subset
endpoint.Subsets[0].Addresses = addrlist
endpoint.Subsets[0].Ports = ports
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {
return nil, 0, "", fmt.Errorf("failed to get kube client to update endpoint")
}
_, err = kubeClient.CoreV1().Endpoints(epNamespace).Update(endpoint)
if err != nil {
klog.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
deleteErr := cli.VolumeDelete(volume.Id)
if deleteErr != nil {
klog.Errorf("failed to delete volume: %v, manual deletion of the volume required", deleteErr)
}
return nil, 0, "", fmt.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
return nil, 0, "", fmt.Errorf("failed to update endpoint %s: %v", endpoint, err)
}
klog.V(3).Infof("dynamic endpoint %v and service %v ", endpoint, service)
klog.V(3).Infof("endpoint %s updated successfully", endpoint)
return &v1.GlusterfsPersistentVolumeSource{
EndpointsName: endpoint.Name,
EndpointsNamespace: &epNamespace,
@ -884,10 +903,11 @@ func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersi
}
// createEndpointService() makes sure an endpoint and service
// exist for the given namespace, PVC name, endpoint name, and
// set of IPs. I.e. the endpoint or service is only created
// exist for the given namespace, PVC name, endpoint name
// I.e. the endpoint or service is only created
// if it does not exist yet.
func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, hostips []string, pvc *v1.PersistentVolumeClaim) (endpoint *v1.Endpoints, service *v1.Service, err error) {
func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epServiceName string, pvcname string) (endpoint *v1.Endpoints, service *v1.Service, err error) {
pvcNameOrID := ""
if len(pvc.Name) >= 63 {
pvcNameOrID = string(pvc.UID)
@ -895,10 +915,6 @@ func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epS
pvcNameOrID = pvc.Name
}
addrlist := make([]v1.EndpointAddress, len(hostips))
for i, v := range hostips {
addrlist[i].IP = v
}
endpoint = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
@ -907,10 +923,6 @@ func (p *glusterfsVolumeProvisioner) createEndpointService(namespace string, epS
"gluster.kubernetes.io/provisioned-for-pvc": pvcNameOrID,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: addrlist,
Ports: []v1.EndpointPort{{Port: 1, Protocol: "TCP"}},
}},
}
kubeClient := p.plugin.host.GetKubeClient()
if kubeClient == nil {

View File

@ -230,7 +230,8 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
// glusterfs
rbacv1helpers.NewRule("get", "list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
rbacv1helpers.NewRule("get", "create", "delete").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get", "create", "update", "delete").Groups(legacyGroup).Resources("endpoints").RuleOrDie(),
rbacv1helpers.NewRule("get", "create", "delete").Groups(legacyGroup).Resources("services").RuleOrDie(),
rbacv1helpers.NewRule("get").Groups(legacyGroup).Resources("secrets").RuleOrDie(),
// openstack
rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(),

View File

@ -789,6 +789,14 @@ items:
- ""
resources:
- endpoints
verbs:
- create
- delete
- get
- update
- apiGroups:
- ""
resources:
- services
verbs:
- create