|
|
|
@ -8,10 +8,11 @@ import (
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/memberlist"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/agent/pool"
|
|
|
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
|
|
|
"github.com/hashicorp/consul/tlsutil"
|
|
|
|
|
"github.com/hashicorp/memberlist"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
@ -29,9 +30,15 @@ const (
|
|
|
|
|
|
|
|
|
|
type MeshGatewayResolver func(datacenter string) string
|
|
|
|
|
|
|
|
|
|
type IngestionAwareTransport interface {
|
|
|
|
|
memberlist.NodeAwareTransport
|
|
|
|
|
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
|
|
|
|
|
IngestStream(conn net.Conn) error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewTransport(
|
|
|
|
|
tlsConfigurator *tlsutil.Configurator,
|
|
|
|
|
transport memberlist.NodeAwareTransport,
|
|
|
|
|
transport IngestionAwareTransport,
|
|
|
|
|
datacenter string,
|
|
|
|
|
gwResolver MeshGatewayResolver,
|
|
|
|
|
) (*Transport, error) {
|
|
|
|
@ -48,17 +55,17 @@ func NewTransport(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t := &Transport{
|
|
|
|
|
NodeAwareTransport: transport,
|
|
|
|
|
tlsConfigurator: tlsConfigurator,
|
|
|
|
|
datacenter: datacenter,
|
|
|
|
|
gwResolver: gwResolver,
|
|
|
|
|
pool: cp,
|
|
|
|
|
IngestionAwareTransport: transport,
|
|
|
|
|
tlsConfigurator: tlsConfigurator,
|
|
|
|
|
datacenter: datacenter,
|
|
|
|
|
gwResolver: gwResolver,
|
|
|
|
|
pool: cp,
|
|
|
|
|
}
|
|
|
|
|
return t, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Transport struct {
|
|
|
|
|
memberlist.NodeAwareTransport
|
|
|
|
|
IngestionAwareTransport
|
|
|
|
|
|
|
|
|
|
tlsConfigurator *tlsutil.Configurator
|
|
|
|
|
datacenter string
|
|
|
|
@ -71,7 +78,7 @@ var _ memberlist.NodeAwareTransport = (*Transport)(nil)
|
|
|
|
|
// Shutdown implements memberlist.Transport.
|
|
|
|
|
func (t *Transport) Shutdown() error {
|
|
|
|
|
err1 := t.pool.Close()
|
|
|
|
|
err2 := t.NodeAwareTransport.Shutdown()
|
|
|
|
|
err2 := t.IngestionAwareTransport.Shutdown()
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
// the more important error is err2
|
|
|
|
|
return err2
|
|
|
|
@ -118,7 +125,7 @@ func (t *Transport) WriteToAddress(b []byte, addr memberlist.Address) (time.Time
|
|
|
|
|
return time.Now(), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return t.NodeAwareTransport.WriteToAddress(b, addr)
|
|
|
|
|
return t.IngestionAwareTransport.WriteToAddress(b, addr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DialAddressTimeout implements memberlist.NodeAwareTransport.
|
|
|
|
@ -137,7 +144,7 @@ func (t *Transport) DialAddressTimeout(addr memberlist.Address, timeout time.Dur
|
|
|
|
|
return t.dial(dc, node, pool.ALPN_WANGossipStream, gwAddr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return t.NodeAwareTransport.DialAddressTimeout(addr, timeout)
|
|
|
|
|
return t.IngestionAwareTransport.DialAddressTimeout(addr, timeout)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NOTE: There is a close mirror of this method in agent/pool/pool.go:DialTimeoutWithRPCType
|
|
|
|
|