mirror of https://github.com/k3s-io/k3s
Scrub pod closes watch channel
parent
c5553af281
commit
4210a5dfd8
|
@ -82,7 +82,10 @@ func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient
|
|||
|
||||
defer scrubberClient.DeletePod(pod.Name, pod.Namespace)
|
||||
|
||||
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion)
|
||||
stopChannel := make(chan struct{})
|
||||
defer close(stopChannel)
|
||||
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel)
|
||||
|
||||
for {
|
||||
watchedPod := nextPod()
|
||||
if watchedPod.Status.Phase == api.PodSucceeded {
|
||||
|
@ -106,7 +109,7 @@ type scrubberClient interface {
|
|||
CreatePod(pod *api.Pod) (*api.Pod, error)
|
||||
GetPod(name, namespace string) (*api.Pod, error)
|
||||
DeletePod(name, namespace string) error
|
||||
WatchPod(name, namespace, resourceVersion string) func() *api.Pod
|
||||
WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod
|
||||
}
|
||||
|
||||
func newScrubberClient(client client.Interface) scrubberClient {
|
||||
|
@ -129,7 +132,10 @@ func (c *realScrubberClient) DeletePod(name, namespace string) error {
|
|||
return c.client.Pods(namespace).Delete(name, nil)
|
||||
}
|
||||
|
||||
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod {
|
||||
// WatchPod returns a ListWatch for watching a pod. The stopChannel is used
|
||||
// to close the reflector backing the watch. The caller is responsible for derring a close on the channel to
|
||||
// stop the reflector.
|
||||
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
|
||||
fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
|
||||
|
||||
podLW := &cache.ListWatch{
|
||||
|
@ -141,7 +147,7 @@ func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) f
|
|||
},
|
||||
}
|
||||
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
|
||||
cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).Run()
|
||||
cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)
|
||||
|
||||
return func() *api.Pod {
|
||||
obj := queue.Pop()
|
||||
|
|
|
@ -95,7 +95,7 @@ func (c *mockScrubberClient) DeletePod(name, namespace string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod {
|
||||
func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
|
||||
return func() *api.Pod {
|
||||
return c.pod
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue