From 6a1b36bd46e818e2f595c8a087375a63e1299c73 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 19 Dec 2013 15:08:55 -0800 Subject: [PATCH] Testing Client RPC to server --- consul/client.go | 20 ++++++++++++++++++++ consul/client_test.go | 40 +++++++++++++++++++++++++++++++++++++++ consul/structs/structs.go | 5 +++-- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/consul/client.go b/consul/client.go index c7b73948c5..e67fdb3557 100644 --- a/consul/client.go +++ b/consul/client.go @@ -2,8 +2,10 @@ package consul import ( "fmt" + "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/serf" "log" + "math/rand" "net" "os" "path/filepath" @@ -218,3 +220,21 @@ func (c *Client) nodeFail(me serf.MemberEvent) { c.consulLock.Unlock() } } + +// RPC is used to forward an RPC call to a consul server, or fail if no servers +func (c *Client) RPC(method string, args interface{}, reply interface{}) error { + // Bail if we can't find any servers + c.consulLock.RLock() + if len(c.consuls) == 0 { + c.consulLock.RUnlock() + return structs.ErrNoServers + } + + // Select a random addr + offset := rand.Int31() % int32(len(c.consuls)) + server := c.consuls[offset] + c.consulLock.RUnlock() + + // Forward to remote Consul + return c.connPool.RPC(server, method, args, reply) +} diff --git a/consul/client_test.go b/consul/client_test.go index 58f7d2551a..0a535ed124 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "github.com/hashicorp/consul/consul/structs" "os" "testing" "time" @@ -75,3 +76,42 @@ func TestClient_JoinLAN(t *testing.T) { t.Fatalf("expected consul server") } } + +func TestClient_RPC(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try an RPC + var out struct{} + if err := c1.RPC("Status.Ping", struct{}{}, &out); err != structs.ErrNoServers { + t.Fatalf("err: %v", err) + } + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.Port) + if err := c1.JoinLAN(addr); err != nil { + t.Fatalf("err: %v", err) + } + + // Check the members + if len(s1.LANMembers()) != 2 { + t.Fatalf("bad len") + } + + if len(c1.LANMembers()) != 2 { + t.Fatalf("bad len") + } + + time.Sleep(10 * time.Millisecond) + + // RPC shoudl succeed + if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil { + t.Fatalf("err: %v", err) + } +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 7839be2310..98bed54dca 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -7,8 +7,9 @@ import ( ) var ( - ErrNoLeader = fmt.Errorf("No cluster leader") - ErrNoDCPath = fmt.Errorf("No path to datacenter") + ErrNoLeader = fmt.Errorf("No cluster leader") + ErrNoDCPath = fmt.Errorf("No path to datacenter") + ErrNoServers = fmt.Errorf("No known Consul servers") ) type MessageType uint8