remove deprecated use of wait functions

Signed-off-by: Will <will7989@hotmail.com>
pull/10647/head
Will 2024-07-18 20:45:19 +01:00 committed by Brad Davidson
parent e514940020
commit e4f3cc7b54
12 changed files with 21 additions and 17 deletions
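
Most of the hunks below swap the deprecated poll helpers in k8s.io/apimachinery/pkg/util/wait (wait.PollImmediateUntilWithContext, wait.PollImmediateWithContext, wait.PollUntilWithContext, wait.PollImmediate) for the context-aware wait.PollUntilContextCancel and wait.PollUntilContextTimeout, which take an explicit immediate flag. A minimal sketch of the most common substitution, not taken from the diff itself:

```go
// Minimal illustration of the substitution applied throughout this commit.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx := context.Background()

	// Old (deprecated): ran the condition immediately, then every 5s, until it
	// returned true, returned an error, or ctx was cancelled:
	//   wait.PollImmediateUntilWithContext(ctx, 5*time.Second, condition)

	// New: the same behaviour with immediate=true. The returned error can be
	// checked, or discarded with `_ =` when the caller only cares about values
	// captured by the closure, as several hunks below do.
	err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
		fmt.Println("polling...")
		return true, nil // done
	})
	if err != nil {
		fmt.Println("poll failed:", err)
	}
}
```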

@@ -59,6 +59,8 @@ func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) (*config.Node
// does not support jittering, so we instead use wait.JitterUntilWithContext, and cancel
// the context on success.
+ctx, cancel := context.WithCancel(ctx)
+defer cancel()
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
agentConfig, err = get(ctx, &agent, proxy)
if err != nil {
@@ -78,7 +80,7 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
var disabled bool
var err error
-wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
disabled, err = getKubeProxyDisabled(ctx, node, proxy)
if err != nil {
logrus.Infof("Waiting to retrieve kube-proxy configuration; server is not ready: %v", err)
@@ -96,7 +98,7 @@ func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []str
var addresses []string
var err error
-wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy)
if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)

@@ -70,7 +70,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
// kube-router netpol requires addresses to be available in the node object.
// Wait until the uninitialized taint has been removed, at which point the addresses should be set.
// TODO: Replace with non-deprecated PollUntilContextTimeout when our and Kubernetes code migrate to it
-if err := wait.PollImmediateInfiniteWithContext(ctx, 2*time.Second, func(ctx context.Context) (bool, error) {
+if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
// Get the node object
node, err := client.CoreV1().Nodes().Get(ctx, nodeConfig.AgentConfig.NodeName, metav1.GetOptions{})
if err != nil {

@@ -181,7 +181,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
<-apiServerReady
-wait.PollImmediateWithContext(ctx, time.Second, util.DefaultAPIServerReadyTimeout, func(ctx context.Context) (bool, error) {
+wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
var readyTime metav1.Time
nodeName := os.Getenv("NODE_NAME")
node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
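
Where the deprecated call carried its own timeout (wait.PollImmediateWithContext), the replacement is wait.PollUntilContextTimeout with the same interval and timeout plus the explicit immediate flag. A sketch under assumed names (checkReady is hypothetical):

```go
package example

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/util/wait"
)

// checkReady is a hypothetical readiness probe standing in for the kubelet and
// apiserver checks in the hunks above and below.
func checkReady(ctx context.Context) (bool, error) {
	return true, nil
}

// waitReady polls checkReady once per second for up to 30 seconds.
func waitReady(ctx context.Context) error {
	// Old (deprecated):
	//   wait.PollImmediateWithContext(ctx, time.Second, 30*time.Second, cond)
	// New: the same interval and timeout, with immediate=true passed explicitly;
	// a non-nil error is returned if the condition never succeeds in time.
	return wait.PollUntilContextTimeout(ctx, time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
		ready, err := checkReady(ctx)
		if err != nil {
			logrus.Infof("not ready yet: %v", err)
			return false, nil // keep polling on transient errors
		}
		return ready, nil
	})
}
```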

@@ -123,7 +123,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
}
if !c.config.EtcdDisableSnapshots {
-wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+_ = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
err := c.managedDB.ReconcileSnapshotData(ctx)
if err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)

@@ -136,7 +136,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
return
}
// We use Poll here instead of Until because we want to wait the interval before running the function.
-go wait.PollUntilWithContext(ctx, 30*time.Second, func(ctx context.Context) (bool, error) {
+go wait.PollUntilContextCancel(ctx, 30*time.Second, true, func(ctx context.Context) (bool, error) {
clientURLs, err := c.managedDB.GetMembersClientURLs(ctx)
if err != nil {
logrus.Warnf("Failed to get etcd ClientURLs: %v", err)

@@ -39,7 +39,7 @@ func RotateBootstrapToken(ctx context.Context, config *config.Control, oldToken
tokenKey := storageKey(normalizedToken)
var bootstrapList []client.Value
-if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
if err != nil {
return false, err
@@ -198,7 +198,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error {
attempts := 0
tokenKey := storageKey(normalizedToken)
-return wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
attempts++
value, saveBootstrap, err := getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token)
c.saveBootstrap = saveBootstrap
@@ -258,7 +258,7 @@ func getBootstrapKeyFromStorage(ctx context.Context, storageClient client.Client
var bootstrapList []client.Value
var err error
-if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
+if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
if err != nil {
if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) {

@@ -482,7 +482,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for container runtime to become ready before joining etcd cluster")
case <-e.config.Runtime.ContainerRuntimeReady:
-if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
+if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
if err := e.join(ctx, clientAccessInfo); err != nil {
// Retry the join if waiting for another member to be promoted, or waiting for peers to connect after promotion
if errors.Is(err, rpctypes.ErrTooManyLearners) || errors.Is(err, rpctypes.ErrUnhealthy) {

@@ -198,8 +198,9 @@ func WriteEncryptionHashAnnotation(runtime *config.ControlRuntime, node *corev1.
// WaitForEncryptionConfigReload watches the metrics API, polling the latest time the encryption config was reloaded.
func WaitForEncryptionConfigReload(runtime *config.ControlRuntime, reloadSuccesses, reloadTime int64) error {
var lastFailure string
-err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
+ctx := context.Background()
+err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
newReloadTime, newReloadSuccess, err := GetEncryptionConfigMetrics(runtime, false)
if err != nil {
return true, err
@@ -238,7 +239,8 @@ func GetEncryptionConfigMetrics(runtime *config.ControlRuntime, initialMetrics b
// This is wrapped in a poller because on startup no metrics exist. Its only after the encryption config
// is modified and the first reload occurs that the metrics are available.
-err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
+ctx := context.Background()
+err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
data, err := restClient.Get().AbsPath("/metrics").DoRaw(context.TODO())
if err != nil {
return true, err
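
The two hunks above are the only spots where the old call (wait.PollImmediate) took no context at all, so the replacement introduces a context.Background() and the condition closure gains a ctx parameter. A sketch with scrapeMetrics as a hypothetical stand-in for GetEncryptionConfigMetrics:

```go
package example

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// scrapeMetrics is a hypothetical stand-in for GetEncryptionConfigMetrics.
func scrapeMetrics(ctx context.Context) (bool, error) {
	return true, nil
}

// waitForMetrics polls every 5 seconds for up to 60 seconds.
func waitForMetrics() error {
	// Old (deprecated, context-free):
	//   err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) { ... })
	// New: a context is required and the condition closure receives it, so a
	// background context is created where the caller has none to pass down.
	ctx := context.Background()
	return wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
		ok, err := scrapeMetrics(ctx)
		if err != nil {
			return true, err // a non-nil error stops the poll immediately
		}
		return ok, nil
	})
}
```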

@@ -502,7 +502,7 @@ func verifyNode(ctx context.Context, nodeClient coreclient.NodeController, node
func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) {
runtime := config.ControlConfig.Runtime
-wait.PollImmediateUntilWithContext(ctx, time.Second*5, func(ctx context.Context) (bool, error) {
+_ = wait.PollUntilContextCancel(ctx, time.Second*5, true, func(ctx context.Context) (bool, error) {
if runtime.Core != nil {
secretClient := runtime.Core.Core().V1().Secret()
// This is consistent with events attached to the node generated by the kubelet

@@ -103,7 +103,7 @@ func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper {
func (s *serverBootstrapper) Run(_ context.Context, id string) error {
s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) {
nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
-wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) {
+_ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil

@@ -236,7 +236,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {
// Wait up to 5 seconds for the p2p network to find peers. This will return
// immediately if the node is bootstrapping from itself.
-wait.PollImmediateWithContext(ctx, time.Second, resolveTimeout, func(_ context.Context) (bool, error) {
+_ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) {
return router.Ready()
})

@@ -80,7 +80,7 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout t
return err
}
-err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
+err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
healthStatus := 0
result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus)
if rerr := result.Error(); rerr != nil {
@@ -128,7 +128,7 @@ func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.D
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
}
-err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
+err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
status, rerr := reviewFunc(ctx)
if rerr != nil {
lastErr = rerr