refine error handling in retry logic

pull/432/head
Darien Raymond 2017-02-10 11:41:50 +01:00
parent 51745e772c
commit 5e7fb6d0dd
No known key found for this signature in database
GPG Key ID: 7251FFA14BB18169
10 changed files with 33 additions and 35 deletions

View File

@ -66,10 +66,13 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H
func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) { func (h *Handler) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
err := h.proxy.Process(ctx, outboundRay, h) err := h.proxy.Process(ctx, outboundRay, h)
// Ensure outbound ray is properly closed. // Ensure outbound ray is properly closed.
if err != nil && errors.Cause(err) != io.EOF { if err != nil {
outboundRay.OutboundOutput().CloseError() log.Warning("Proxyman|OutboundHandler: Failed to process outbound traffic.")
} else { if errors.Cause(err) != io.EOF {
outboundRay.OutboundOutput().Close() outboundRay.OutboundOutput().CloseError()
} else {
outboundRay.OutboundOutput().Close()
}
} }
outboundRay.OutboundInput().CloseError() outboundRay.OutboundInput().CloseError()
} }

View File

@ -2,11 +2,12 @@ package retry
import ( import (
"time" "time"
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
) )
var ( var (
ErrRetryFailed = errors.New("All retry attempts failed.") ErrRetryFailed = errors.New("Retry: All retry attempts failed.")
) )
// Strategy is a way to retry on a specific function. // Strategy is a way to retry on a specific function.
@ -23,16 +24,21 @@ type retryer struct {
// On implements Strategy.On. // On implements Strategy.On.
func (r *retryer) On(method func() error) error { func (r *retryer) On(method func() error) error {
attempt := 0 attempt := 0
accumulatedError := make([]error, 0, r.totalAttempt)
for attempt < r.totalAttempt { for attempt < r.totalAttempt {
err := method() err := method()
if err == nil { if err == nil {
return nil return nil
} }
numErrors := len(accumulatedError)
if numErrors == 0 || err.Error() != accumulatedError[numErrors-1].Error() {
accumulatedError = append(accumulatedError, err)
}
delay := r.nextDelay() delay := r.nextDelay()
<-time.After(time.Duration(delay) * time.Millisecond) <-time.After(time.Duration(delay) * time.Millisecond)
attempt++ attempt++
} }
return ErrRetryFailed return errors.Base(ErrRetryFailed).Message(accumulatedError)
} }
// Timed returns a retry strategy with fixed interval. // Timed returns a retry strategy with fixed interval.

View File

@ -73,7 +73,7 @@ func TestRetryExhausted(t *testing.T) {
}) })
duration := time.Since(startTime) duration := time.Since(startTime)
assert.Error(err).Equals(ErrRetryFailed) assert.Error(errors.Cause(err)).Equals(ErrRetryFailed)
assert.Int64(int64(duration / time.Millisecond)).AtLeast(1900) assert.Int64(int64(duration / time.Millisecond)).AtLeast(1900)
} }
@ -88,6 +88,6 @@ func TestExponentialBackoff(t *testing.T) {
}) })
duration := time.Since(startTime) duration := time.Since(startTime)
assert.Error(err).Equals(ErrRetryFailed) assert.Error(errors.Cause(err)).Equals(ErrRetryFailed)
assert.Int64(int64(duration / time.Millisecond)).AtLeast(4000) assert.Int64(int64(duration / time.Millisecond)).AtLeast(4000)
} }

View File

@ -3,7 +3,6 @@ package blackhole
import ( import (
"context" "context"
"errors"
"time" "time"
"v2ray.com/core/common" "v2ray.com/core/common"
@ -11,10 +10,6 @@ import (
"v2ray.com/core/transport/ray" "v2ray.com/core/transport/ray"
) )
var (
errConnectionBlocked = errors.New("Blackhole: connection blocked.")
)
// Handler is an outbound connection that sliently swallow the entire payload. // Handler is an outbound connection that sliently swallow the entire payload.
type Handler struct { type Handler struct {
response ResponseConfig response ResponseConfig
@ -36,7 +31,9 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
v.response.WriteTo(outboundRay.OutboundOutput()) v.response.WriteTo(outboundRay.OutboundOutput())
// Sleep a little here to make sure the response is sent to client. // Sleep a little here to make sure the response is sent to client.
time.Sleep(time.Second) time.Sleep(time.Second)
return errConnectionBlocked outboundRay.OutboundOutput().Close()
outboundRay.OutboundInput().CloseError()
return nil
} }
func init() { func init() {

View File

@ -102,8 +102,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
return nil return nil
}) })
if err != nil { if err != nil {
log.Warning("Freedom: Failed to open connection to ", destination, ": ", err) return errors.Base(err).Message("Freedom: Failed to open connection to ", destination)
return err
} }
defer conn.Close() defer conn.Close()

View File

@ -2,7 +2,6 @@ package shadowsocks
import ( import (
"context" "context"
"errors"
"runtime" "runtime"
"time" "time"
@ -10,6 +9,7 @@ import (
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio" "v2ray.com/core/common/bufio"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/retry" "v2ray.com/core/common/retry"
@ -61,8 +61,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
return nil return nil
}) })
if err != nil { if err != nil {
log.Warning("Shadowsocks|Client: Failed to find an available destination:", err) return errors.Base(err).Message("Shadowsocks|Client: Failed to find an available destination.")
return err
} }
log.Info("Shadowsocks|Client: Tunneling request to ", destination, " via ", server.Destination()) log.Info("Shadowsocks|Client: Tunneling request to ", destination, " via ", server.Destination())

View File

@ -2,13 +2,13 @@ package socks
import ( import (
"context" "context"
"errors"
"runtime" "runtime"
"time" "time"
"v2ray.com/core/app/log" "v2ray.com/core/app/log"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/buf" "v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/net" "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol" "v2ray.com/core/common/protocol"
"v2ray.com/core/common/retry" "v2ray.com/core/common/retry"
@ -56,8 +56,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
}) })
if err != nil { if err != nil {
log.Warning("Socks|Client: Failed to find an available destination.") return errors.Base(err).Message("Socks|Client: Failed to find an available destination.")
return err
} }
defer conn.Close() defer conn.Close()

View File

@ -51,7 +51,7 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb
var rec *protocol.ServerSpec var rec *protocol.ServerSpec
var conn internet.Connection var conn internet.Connection
err := retry.ExponentialBackoff(5, 100).On(func() error { err := retry.ExponentialBackoff(5, 200).On(func() error {
rec = v.serverPicker.PickServer() rec = v.serverPicker.PickServer()
rawConn, err := dialer.Dial(ctx, rec.Destination()) rawConn, err := dialer.Dial(ctx, rec.Destination())
if err != nil { if err != nil {
@ -62,8 +62,7 @@ func (v *VMessOutboundHandler) Process(ctx context.Context, outboundRay ray.Outb
return nil return nil
}) })
if err != nil { if err != nil {
log.Warning("VMess|Outbound: Failed to find an available destination:", err) return errors.Base(err).Message("VMess|Outbound: Failed to find an available destination.")
return err
} }
defer conn.Close() defer conn.Close()

View File

@ -5,7 +5,6 @@ import (
"sync" "sync"
"v2ray.com/core/common/errors" "v2ray.com/core/common/errors"
"v2ray.com/core/app/log"
v2net "v2ray.com/core/common/net" v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/retry" "v2ray.com/core/common/retry"
) )
@ -73,16 +72,13 @@ func (v *TCPHub) start() {
v.accepting = true v.accepting = true
for v.accepting { for v.accepting {
var newConn Connection var newConn Connection
err := retry.ExponentialBackoff(10, 200).On(func() error { err := retry.ExponentialBackoff(10, 500).On(func() error {
if !v.accepting { if !v.accepting {
return nil return nil
} }
conn, err := v.listener.Accept() conn, err := v.listener.Accept()
if err != nil { if err != nil {
if v.accepting { return errors.Base(err).Message("Internet|Listener: Failed to accept new TCP connection.")
log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
}
return err
} }
newConn = conn newConn = conn
return nil return nil

View File

@ -2,12 +2,12 @@ package websocket
import ( import (
"context" "context"
"io/ioutil"
"net" "net"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"v2ray.com/core/app/log" "v2ray.com/core/app/log"
"v2ray.com/core/common" "v2ray.com/core/common"
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net" v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet" "v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/internal" "v2ray.com/core/transport/internet/internal"
@ -78,11 +78,11 @@ func wsDial(ctx context.Context, dest v2net.Destination) (net.Conn, error) {
conn, resp, err := dialer.Dial(uri, nil) conn, resp, err := dialer.Dial(uri, nil)
if err != nil { if err != nil {
var reason string
if resp != nil { if resp != nil {
reason, reasonerr := ioutil.ReadAll(resp.Body) reason = resp.Status
log.Info(string(reason), reasonerr)
} }
return nil, err return nil, errors.Base(err).Format("WebSocket|Dialer: Failed to dial to (", uri, "): ", reason)
} }
return &wsconn{ return &wsconn{