mirror of https://github.com/hashicorp/consul
Adding support for cross-dc forwarding
parent
d4476e3df6
commit
15f045596b
|
@ -1,6 +1,7 @@
|
||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/rpc"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -37,6 +38,41 @@ func TestCatalogRegister(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCatalogRegister_ForwardDC(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client := rpcClient(t, s1)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServerDC(t, "dc2")
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
|
||||||
|
// Try to join
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfWANConfig.MemberlistConfig.Port)
|
||||||
|
if err := s2.JoinWAN(addr); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the leaders
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
arg := rpc.RegisterRequest{
|
||||||
|
Datacenter: "dc2", // SHould forward through s1
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
ServiceName: "db",
|
||||||
|
ServiceTag: "master",
|
||||||
|
ServicePort: 8000,
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := client.Call("Catalog.Register", &arg, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCatalogDeregister(t *testing.T) {
|
func TestCatalogDeregister(t *testing.T) {
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"github.com/hashicorp/consul/rpc"
|
"github.com/hashicorp/consul/rpc"
|
||||||
"github.com/ugorji/go/codec"
|
"github.com/ugorji/go/codec"
|
||||||
"io"
|
"io"
|
||||||
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -111,8 +112,21 @@ func (s *Server) forwardLeader(method string, args interface{}, reply interface{
|
||||||
|
|
||||||
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
|
// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
|
||||||
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
|
func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error {
|
||||||
// TODO: Fix
|
// Bail if we can't find any servers
|
||||||
return fmt.Errorf("DC forwarding not supported")
|
s.remoteLock.RLock()
|
||||||
|
servers := s.remoteConsuls[dc]
|
||||||
|
if len(servers) == 0 {
|
||||||
|
s.remoteLock.RUnlock()
|
||||||
|
return rpc.ErrNoDCPath
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select a random addr
|
||||||
|
offset := rand.Int31() % int32(len(servers))
|
||||||
|
server := servers[offset]
|
||||||
|
s.remoteLock.RUnlock()
|
||||||
|
|
||||||
|
// Forward to remote Consul
|
||||||
|
return s.connPool.RPC(server, method, args, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// raftApply is used to encode a message, run it through raft, and return
|
// raftApply is used to encode a message, run it through raft, and return
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrNoLeader = fmt.Errorf("No cluster leader")
|
ErrNoLeader = fmt.Errorf("No cluster leader")
|
||||||
|
ErrNoDCPath = fmt.Errorf("No path to datacenter")
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageType uint8
|
type MessageType uint8
|
||||||
|
|
Loading…
Reference in New Issue