Merge pull request #2363 from erikwilson/netpol-informers

Add event handlers to network policy controller
pull/2384/head
Erik Wilson 4 years ago committed by GitHub
commit 114b5ccad1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1683,6 +1683,10 @@ func NewNetworkPolicyController(
npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer() npInformer := informerFactory.Networking().V1().NetworkPolicies().Informer()
informerFactory.Start(stopCh) informerFactory.Start(stopCh)
if err := CacheSyncOrTimeout(informerFactory, stopCh, 1*time.Minute); err != nil {
return nil, errors.New("Failed to synchronize cache: " + err.Error())
}
// if config.MetricsEnabled { // if config.MetricsEnabled {
// //Register the metrics for this controller // //Register the metrics for this controller
// prometheus.MustRegister(metrics.ControllerIPtablesSyncTime) // prometheus.MustRegister(metrics.ControllerIPtablesSyncTime)
@ -1726,12 +1730,15 @@ func NewNetworkPolicyController(
npc.podLister = podInformer.GetIndexer() npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler() npc.PodEventHandler = npc.newPodEventHandler()
podInformer.AddEventHandler(npc.PodEventHandler)
npc.nsLister = nsInformer.GetIndexer() npc.nsLister = nsInformer.GetIndexer()
npc.NamespaceEventHandler = npc.newNamespaceEventHandler() npc.NamespaceEventHandler = npc.newNamespaceEventHandler()
nsInformer.AddEventHandler(npc.NamespaceEventHandler)
npc.npLister = npInformer.GetIndexer() npc.npLister = npInformer.GetIndexer()
npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler() npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)
return &npc, nil return &npc, nil
} }

@ -12,8 +12,10 @@ import (
"net" "net"
"os/exec" "os/exec"
"strings" "strings"
"time"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
) )
var ( var (
@ -537,3 +539,19 @@ func GetNodeIP(node *apiv1.Node) (net.IP, error) {
} }
return nil, errors.New("host IP unknown") return nil, errors.New("host IP unknown")
} }
// CacheSync performs cache synchronization under timeout limit
func CacheSyncOrTimeout(informerFactory informers.SharedInformerFactory, stopCh <-chan struct{}, cacheSyncTimeout time.Duration) error {
syncOverCh := make(chan struct{})
go func() {
informerFactory.WaitForCacheSync(stopCh)
close(syncOverCh)
}()
select {
case <-time.After(cacheSyncTimeout):
return errors.New(cacheSyncTimeout.String() + " timeout")
case <-syncOverCh:
return nil
}
}

Loading…
Cancel
Save