Merge pull request #8269 from smarterclayton/add_proxier_error

Proxier should return typed errors
pull/6/head
Clayton Coleman 2015-05-18 10:00:44 -04:00
commit a2e4f95e32
3 changed files with 92 additions and 38 deletions

View File

@ -76,14 +76,14 @@ func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) {
func (s *ProxyServer) Run(_ []string) error {
// TODO(vmarmol): Use container config for this.
if err := util.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
glog.Info(err)
glog.V(2).Info(err)
}
// Run in its own container.
if err := util.RunInResourceContainer(s.ResourceContainer); err != nil {
glog.Warningf("Failed to start in resource-only container %q: %v", s.ResourceContainer, err)
} else {
glog.Infof("Running in resource-only container %q", s.ResourceContainer)
glog.V(2).Infof("Running in resource-only container %q", s.ResourceContainer)
}
serviceConfig := config.NewServiceConfig()
@ -94,9 +94,9 @@ func (s *ProxyServer) Run(_ []string) error {
protocol = iptables.ProtocolIpv6
}
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol))
if proxier == nil {
glog.Fatalf("failed to create proxier, aborting")
proxier, err := proxy.NewProxier(loadBalancer, net.IP(s.BindAddress), iptables.New(exec.New(), protocol))
if err != nil {
glog.Fatalf("Unable to create proxer: %v", err)
}
// Wire proxier to handle changes to services

View File

@ -67,38 +67,50 @@ type Proxier struct {
hostIP net.IP
}
var (
// ErrProxyOnLocalhost is returned by NewProxier if the user requests a proxier on
// the loopback address. May be checked for by callers of NewProxier to know whether
// the caller provided invalid input.
ErrProxyOnLocalhost = fmt.Errorf("cannot proxy on localhost")
)
// IsProxyLocked returns true if the proxy could not acquire the lock on iptables.
func IsProxyLocked(err error) bool {
return strings.Contains(err.Error(), "holding the xtables lock")
}
// NewProxier returns a new Proxier given a LoadBalancer and an address on
// which to listen. Because of the iptables logic, It is assumed that there
// is only a single Proxier active on a machine.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface) *Proxier {
// is only a single Proxier active on a machine. An error will be returned if
// the proxier cannot be started due to an invalid ListenIP (loopback) or
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
glog.Errorf("Can't proxy only on localhost - iptables can't do it")
return nil
return nil, ErrProxyOnLocalhost
}
hostIP, err := util.ChooseHostInterface()
if err != nil {
glog.Errorf("Failed to select a host interface: %v", err)
return nil
return nil, fmt.Errorf("failed to select a host interface: %v", err)
}
glog.Infof("Setting Proxy IP to %v", hostIP)
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP)
}
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP) *Proxier {
glog.Infof("Initializing iptables")
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP) (*Proxier, error) {
// Clean up old messes. Ignore erors.
iptablesDeleteOld(iptables)
// Set up the iptables foundations we need.
if err := iptablesInit(iptables); err != nil {
glog.Errorf("Failed to initialize iptables: %v", err)
return nil
return nil, fmt.Errorf("failed to initialize iptables: %v", err)
}
// Flush old iptables rules (since the bound ports will be invalid after a restart).
// When OnUpdate() is first called, the rules will be recreated.
if err := iptablesFlush(iptables); err != nil {
glog.Errorf("Failed to flush iptables: %v", err)
return nil
return nil, fmt.Errorf("failed to flush iptables: %v", err)
}
return &Proxier{
loadBalancer: loadBalancer,
@ -106,7 +118,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
}
}, nil
}
// The periodic interval for checking the state of things.
@ -118,7 +130,7 @@ func (proxier *Proxier) SyncLoop() {
defer t.Stop()
for {
<-t.C
glog.V(3).Infof("Periodic sync")
glog.V(6).Infof("Periodic sync")
if err := iptablesInit(proxier.iptables); err != nil {
glog.Errorf("Failed to ensure iptables: %v", err)
}
@ -201,7 +213,7 @@ func (proxier *Proxier) addServiceOnPort(service ServicePortName, protocol api.P
}
proxier.setServiceInfo(service, si)
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
glog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service ServicePortName, proxier *Proxier) {
defer util.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
@ -340,7 +352,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
return err
}
if !existed {
glog.Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
glog.V(3).Infof("Opened iptables from-containers portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
}
// Handle traffic from the host.
@ -351,7 +363,7 @@ func (proxier *Proxier) openOnePortal(portalIP net.IP, portalPort int, protocol
return err
}
if !existed {
glog.Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
glog.V(3).Infof("Opened iptables from-host portal for service %q on %s %s:%d", name, protocol, portalIP, portalPort)
}
return nil
}
@ -363,7 +375,7 @@ func (proxier *Proxier) closePortal(service ServicePortName, info *serviceInfo)
el = append(el, proxier.closeOnePortal(net.ParseIP(publicIP), info.portalPort, info.protocol, proxier.listenIP, info.proxyPort, service)...)
}
if len(el) == 0 {
glog.Infof("Closed iptables portals for service %q", service)
glog.V(3).Infof("Closed iptables portals for service %q", service)
} else {
glog.Errorf("Some errors closing iptables portals for service %q", service)
}

View File

@ -206,7 +206,10 @@ func TestTCPProxy(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -230,7 +233,10 @@ func TestUDPProxy(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -259,7 +265,10 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
@ -283,7 +292,10 @@ func TestMultiPortOnUpdate(t *testing.T) {
serviceQ := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
serviceX := ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "x"}
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
p.OnUpdate([]api.Service{{
@ -343,7 +355,10 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -378,7 +393,10 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -413,7 +431,10 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -447,7 +468,10 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -481,7 +505,10 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -530,7 +557,10 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -579,7 +609,10 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -624,7 +657,10 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -666,7 +702,10 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -715,7 +754,10 @@ func TestProxyUpdatePortal(t *testing.T) {
},
})
p := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)