Task -> Pod part #3

pull/6/head
Brendan Burns 2014-06-08 22:38:45 -07:00
parent d05a3f1f8d
commit 6018497174
27 changed files with 475 additions and 475 deletions

View File

@ -2,7 +2,7 @@
"$schema": "http://json-schema.org/draft-03/schema",
"type": "object",
"required": false,
"description": "Task resource. A task corresponds to a colocated group of [Docker containers](http://docker.io).",
"description": "Pod resource. A pod corresponds to a co-located group of [Docker containers](http://docker.io).",
"properties": {
"kind": {
"type": "string",
@ -23,7 +23,7 @@
"desiredState": {
"type": "object",
"required": false,
"description": "The desired configuration of the task",
"description": "The desired configuration of the pod",
"properties": {
"manifest": {
"type": "object",
@ -55,7 +55,7 @@
"currentState": {
"type": "object",
"required": false,
"description": "The current configuration and status of the task. Fields in common with desiredState have the same meaning.",
"description": "The current configuration and status of the pod. Fields in common with desiredState have the same meaning.",
"properties": {
"manifest": {
"type": "object",

View File

@ -1,7 +1,7 @@
{
"items": [
{
"id": "my-task-1",
"id": "my-pod-1",
"labels": {
"name": "testRun",
"replicationController": "testRun"
@ -22,7 +22,7 @@
}
},
{
"id": "my-task-2",
"id": "my-pod-2",
"labels": {
"name": "testRun",
"replicationController": "testRun"

View File

@ -75,7 +75,7 @@ func main() {
}
storage := map[string]apiserver.RESTStorage{
"tasks": registry.MakeTaskRegistryStorage(podRegistry, containerInfo, registry.MakeFirstFitScheduler(machineList, podRegistry)),
"tasks": registry.MakePodRegistryStorage(podRegistry, containerInfo, registry.MakeFirstFitScheduler(machineList, podRegistry)),
"replicationControllers": registry.MakeControllerRegistryStorage(controllerRegistry),
"services": registry.MakeServiceRegistryStorage(serviceRegistry),
}

View File

@ -14,8 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// The controller manager is responsible for monitoring replication controllers, and creating corresponding
// tasks to achieve the desired state. It listens for new controllers in etcd, and it sends requests to the
// master to create/delete tasks.
// pods to achieve the desired state. It listens for new controllers in etcd, and it sends requests to the
// master to create/delete pods.
//
// TODO: Refactor the etcd watch code so that it is a pluggable interface.
package main

View File

@ -42,7 +42,7 @@ func main() {
reg := registry.MakeEtcdRegistry(etcdClient, machineList)
apiserver := apiserver.New(map[string]apiserver.RESTStorage{
"tasks": registry.MakeTaskRegistryStorage(reg, &kube_client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList)),
"tasks": registry.MakePodRegistryStorage(reg, &kube_client.FakeContainerInfo{}, registry.MakeRoundRobinScheduler(machineList)),
"replicationControllers": registry.MakeControllerRegistryStorage(reg),
}, "/api/v1beta1")
server := httptest.NewServer(apiserver)
@ -75,12 +75,12 @@ func main() {
if _, err = kubeClient.CreateReplicationController(controllerRequest); err != nil {
log.Fatalf("Unexpected error: %#v", err)
}
// Give the controllers some time to actually create the tasks
// Give the controllers some time to actually create the pods
time.Sleep(time.Second * 10)
// Validate that they're truly up.
tasks, err := kubeClient.ListTasks(nil)
if err != nil || len(tasks.Items) != 2 {
pods, err := kubeClient.ListPods(nil)
if err != nil || len(pods.Items) != 2 {
log.Fatal("FAILED")
}
log.Printf("OK")

View File

@ -10,7 +10,7 @@ This example assumes that you have forked the repository and [turned up a Kubern
### Step One: Turn up the redis master.
Create a file named `redis-master.json`, this file is describes a single task, which runs a redis key-value server in a container.
Create a file named `redis-master.json`, this file is describes a single pod, which runs a redis key-value server in a container.
```javascript
{
@ -33,27 +33,27 @@ Create a file named `redis-master.json`, this file is describes a single task, w
}
```
Once you have that task file, you can create the redis task in your Kubernetes cluster using the `cloudcfg` cli:
Once you have that pod file, you can create the redis pod in your Kubernetes cluster using the `cloudcfg` cli:
```shell
cluster/cloudcfg.sh -c redis-master.json create /tasks
cluster/cloudcfg.sh -c redis-master.json create /pods
```
Once that's up you can list the tasks in the cluster, to verify that the master is running:
Once that's up you can list the pods in the cluster, to verify that the master is running:
```shell
cluster/cloudcfg.sh list /tasks
cluster/cloudcfg.sh list /pods
```
You should see a single redis master task. It will also display the machine that the task is running on. If you ssh to that machine, you can run
You should see a single redis master pod. It will also display the machine that the pod is running on. If you ssh to that machine, you can run
```shell
sudo docker ps
```
And see the actual task. (Note that initial `docker pull` may take a few minutes, depending on network conditions.)
And see the actual pod. (Note that initial `docker pull` may take a few minutes, depending on network conditions.)
### Step Two: Turn up the master service.
A Kubernetes 'service' is a named load balancer that proxies traffic to one or more containers. The services in a Kubernetes cluster are discoverable inside other containers via environment variables. Services find the containers to load balance based on task labels. The task that you created in Step One has the label `name=redis-master`, so the corresponding service is defined by that label. Create a file named `redis-master-service.json` that contains:
A Kubernetes 'service' is a named load balancer that proxies traffic to one or more containers. The services in a Kubernetes cluster are discoverable inside other containers via environment variables. Services find the containers to load balance based on pod labels. The pod that you created in Step One has the label `name=redis-master`, so the corresponding service is defined by that label. Create a file named `redis-master-service.json` that contains:
```javascript
{
@ -73,8 +73,8 @@ cluster/cloudcfg.sh -c redis-master-service.json create /services
Once created, the service proxy on each minion is configured to set up a proxy on the specified port (in this case port 10000).
### Step Three: Turn up the replicated slave tasks.
Although the redis master is a single task, the redis read slaves are a 'replicated' task, in Kubernetes, a replication controller is responsible for managing multiple instances of a replicated task. Create a file named `redis-slave-controller.json` that contains:
### Step Three: Turn up the replicated slave pods.
Although the redis master is a single pod, the redis read slaves are a 'replicated' pod, in Kubernetes, a replication controller is responsible for managing multiple instances of a replicated pod. Create a file named `redis-slave-controller.json` that contains:
```javascript
{
@ -109,13 +109,13 @@ The redis slave configures itself by looking for the Kubernetes service environm
redis-server --slaveof $SERVICE_HOST $REDISMASTER_SERVICE_PORT
```
Once that's up you can list the tasks in the cluster, to verify that the master and slaves are running:
Once that's up you can list the pods in the cluster, to verify that the master and slaves are running:
```shell
cluster/cloudcfg.sh list /tasks
cluster/cloudcfg.sh list /pods
```
You should see a single redis master task, and two redis slave tasks.
You should see a single redis master pod, and two redis slave pods.
### Step Four: Create the redis slave service.
@ -139,7 +139,7 @@ Now that you have created the service specification, create it in your cluster w
cluster/cloudcfg.sh -c redis-slave-service.json create /services
```
### Step Five: Create the frontend task.
### Step Five: Create the frontend pod.
This is a simple PHP server that is configured to talk to both the slave and master services depdending on if the request is a read or a write. It exposes a simple AJAX interface, and serves an angular based U/X. Like the redis read slaves it is a replicated service instantiated by a replication controller. Create a file named `frontend-controller.json`:
@ -170,13 +170,13 @@ With this file, you can turn up your frontend with:
cluster/cloudcfg.sh -c frontend-controller.json create /replicationControllers
```
Once that's up you can list the tasks in the cluster, to verify that the master, slaves and frontends are running:
Once that's up you can list the pods in the cluster, to verify that the master, slaves and frontends are running:
```shell
cluster/cloudcfg.sh list /tasks
cluster/cloudcfg.sh list /pods
```
You should see a single redis master task, two redis slave and three frontend tasks.
You should see a single redis master pod, two redis slave and three frontend pods.
The code for the PHP service looks like this:
```php

View File

@ -138,16 +138,16 @@ func (server *ApiServer) handleREST(parts []string, url *url.URL, req *http.Requ
}
server.write(200, controllers, w)
case 2:
task, err := storage.Get(parts[1])
pod, err := storage.Get(parts[1])
if err != nil {
server.error(err, w)
return
}
if task == nil {
if pod == nil {
server.notFound(req, w)
return
}
server.write(200, task, w)
server.write(200, pod, w)
default:
server.notFound(req, w)
}

View File

@ -16,9 +16,9 @@ limitations under the License.
// A client for the Kubernetes cluster management API
// There are three fundamental objects
// Task - A single running container
// TaskForce - A set of co-scheduled Task(s)
// ReplicationController - A manager for replicating TaskForces
// Pod - A co-scheduled set of running containers
// ReplicationController - A manager for replicating Pods
// Service - A discoverable load balancer
package client
import (
@ -38,11 +38,11 @@ import (
// ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing
type ClientInterface interface {
ListTasks(labelQuery map[string]string) (api.PodList, error)
GetTask(name string) (api.Pod, error)
DeleteTask(name string) error
CreateTask(api.Pod) (api.Pod, error)
UpdateTask(api.Pod) (api.Pod, error)
ListPods(labelQuery map[string]string) (api.PodList, error)
GetPod(name string) (api.Pod, error)
DeletePod(name string) error
CreatePod(api.Pod) (api.Pod, error)
UpdatePod(api.Pod) (api.Pod, error)
GetReplicationController(name string) (api.ReplicationController, error)
CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
@ -142,8 +142,8 @@ func DecodeLabelQuery(labelQuery string) map[string]string {
return result
}
// ListTasks takes a label query, and returns the list of tasks that match that query
func (client Client) ListTasks(labelQuery map[string]string) (api.PodList, error) {
// ListPods takes a label query, and returns the list of pods that match that query
func (client Client) ListPods(labelQuery map[string]string) (api.PodList, error) {
path := "tasks"
if labelQuery != nil && len(labelQuery) > 0 {
path += "?labels=" + EncodeLabelQuery(labelQuery)
@ -153,35 +153,35 @@ func (client Client) ListTasks(labelQuery map[string]string) (api.PodList, error
return result, err
}
// GetTask takes the name of the task, and returns the corresponding Task object, and an error if it occurs
func (client Client) GetTask(name string) (api.Pod, error) {
// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
func (client Client) GetPod(name string) (api.Pod, error) {
var result api.Pod
_, err := client.rawRequest("GET", "tasks/"+name, nil, &result)
return result, err
}
// DeleteTask takes the name of the task, and returns an error if one occurs
func (client Client) DeleteTask(name string) error {
// DeletePod takes the name of the pod, and returns an error if one occurs
func (client Client) DeletePod(name string) error {
_, err := client.rawRequest("DELETE", "tasks/"+name, nil, nil)
return err
}
// CreateTask takes the representation of a task. Returns the server's representation of the task, and an error, if it occurs
func (client Client) CreateTask(task api.Pod) (api.Pod, error) {
// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs
func (client Client) CreatePod(pod api.Pod) (api.Pod, error) {
var result api.Pod
body, err := json.Marshal(task)
body, err := json.Marshal(pod)
if err == nil {
_, err = client.rawRequest("POST", "tasks", bytes.NewBuffer(body), &result)
}
return result, err
}
// UpdateTask takes the representation of a task to update. Returns the server's representation of the task, and an error, if it occurs
func (client Client) UpdateTask(task api.Pod) (api.Pod, error) {
// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs
func (client Client) UpdatePod(pod api.Pod) (api.Pod, error) {
var result api.Pod
body, err := json.Marshal(task)
body, err := json.Marshal(pod)
if err == nil {
_, err = client.rawRequest("PUT", "tasks/"+task.ID, bytes.NewBuffer(body), &result)
_, err = client.rawRequest("PUT", "tasks/"+pod.ID, bytes.NewBuffer(body), &result)
}
return result, err
}

View File

@ -40,7 +40,7 @@ func makeUrl(suffix string) string {
return apiPath + suffix
}
func TestListEmptyTasks(t *testing.T) {
func TestListEmptyPods(t *testing.T) {
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: `{ "items": []}`,
@ -49,19 +49,19 @@ func TestListEmptyTasks(t *testing.T) {
client := Client{
Host: testServer.URL,
}
taskList, err := client.ListTasks(nil)
podList, err := client.ListPods(nil)
fakeHandler.ValidateRequest(t, makeUrl("/tasks"), "GET", nil)
if err != nil {
t.Errorf("Unexpected error in listing tasks: %#v", err)
t.Errorf("Unexpected error in listing pods: %#v", err)
}
if len(taskList.Items) != 0 {
t.Errorf("Unexpected items in task list: %#v", taskList)
if len(podList.Items) != 0 {
t.Errorf("Unexpected items in pod list: %#v", podList)
}
testServer.Close()
}
func TestListTasks(t *testing.T) {
expectedTaskList := api.PodList{
func TestListPods(t *testing.T) {
expectedPodList := api.PodList{
Items: []api.Pod{
api.Pod{
CurrentState: api.PodState{
@ -74,7 +74,7 @@ func TestListTasks(t *testing.T) {
},
},
}
body, _ := json.Marshal(expectedTaskList)
body, _ := json.Marshal(expectedPodList)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -83,19 +83,19 @@ func TestListTasks(t *testing.T) {
client := Client{
Host: testServer.URL,
}
receivedTaskList, err := client.ListTasks(nil)
receivedPodList, err := client.ListPods(nil)
fakeHandler.ValidateRequest(t, makeUrl("/tasks"), "GET", nil)
if err != nil {
t.Errorf("Unexpected error in listing tasks: %#v", err)
t.Errorf("Unexpected error in listing pods: %#v", err)
}
if !reflect.DeepEqual(expectedTaskList, receivedTaskList) {
t.Errorf("Unexpected task list: %#v\nvs.\n%#v", receivedTaskList, expectedTaskList)
if !reflect.DeepEqual(expectedPodList, receivedPodList) {
t.Errorf("Unexpected pod list: %#v\nvs.\n%#v", receivedPodList, expectedPodList)
}
testServer.Close()
}
func TestListTasksLabels(t *testing.T) {
expectedTaskList := api.PodList{
func TestListPodsLabels(t *testing.T) {
expectedPodList := api.PodList{
Items: []api.Pod{
api.Pod{
CurrentState: api.PodState{
@ -108,7 +108,7 @@ func TestListTasksLabels(t *testing.T) {
},
},
}
body, _ := json.Marshal(expectedTaskList)
body, _ := json.Marshal(expectedPodList)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -118,7 +118,7 @@ func TestListTasksLabels(t *testing.T) {
Host: testServer.URL,
}
query := map[string]string{"foo": "bar", "name": "baz"}
receivedTaskList, err := client.ListTasks(query)
receivedPodList, err := client.ListPods(query)
fakeHandler.ValidateRequest(t, makeUrl("/tasks"), "GET", nil)
queryString := fakeHandler.RequestReceived.URL.Query().Get("labels")
queryString, _ = url.QueryUnescape(queryString)
@ -128,16 +128,16 @@ func TestListTasksLabels(t *testing.T) {
t.Errorf("Unexpected label query: %s", queryString)
}
if err != nil {
t.Errorf("Unexpected error in listing tasks: %#v", err)
t.Errorf("Unexpected error in listing pods: %#v", err)
}
if !reflect.DeepEqual(expectedTaskList, receivedTaskList) {
t.Errorf("Unexpected task list: %#v\nvs.\n%#v", receivedTaskList, expectedTaskList)
if !reflect.DeepEqual(expectedPodList, receivedPodList) {
t.Errorf("Unexpected pod list: %#v\nvs.\n%#v", receivedPodList, expectedPodList)
}
testServer.Close()
}
func TestGetTask(t *testing.T) {
expectedTask := api.Pod{
func TestGetPod(t *testing.T) {
expectedPod := api.Pod{
CurrentState: api.PodState{
Status: "Foobar",
},
@ -146,7 +146,7 @@ func TestGetTask(t *testing.T) {
"name": "baz",
},
}
body, _ := json.Marshal(expectedTask)
body, _ := json.Marshal(expectedPod)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -155,18 +155,18 @@ func TestGetTask(t *testing.T) {
client := Client{
Host: testServer.URL,
}
receivedTask, err := client.GetTask("foo")
receivedPod, err := client.GetPod("foo")
fakeHandler.ValidateRequest(t, makeUrl("/tasks/foo"), "GET", nil)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(expectedTask, receivedTask) {
t.Errorf("Received task: %#v\n doesn't match expected task: %#v", receivedTask, expectedTask)
if !reflect.DeepEqual(expectedPod, receivedPod) {
t.Errorf("Received pod: %#v\n doesn't match expected pod: %#v", receivedPod, expectedPod)
}
testServer.Close()
}
func TestDeleteTask(t *testing.T) {
func TestDeletePod(t *testing.T) {
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: `{"success": true}`,
@ -175,7 +175,7 @@ func TestDeleteTask(t *testing.T) {
client := Client{
Host: testServer.URL,
}
err := client.DeleteTask("foo")
err := client.DeletePod("foo")
fakeHandler.ValidateRequest(t, makeUrl("/tasks/foo"), "DELETE", nil)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
@ -183,8 +183,8 @@ func TestDeleteTask(t *testing.T) {
testServer.Close()
}
func TestCreateTask(t *testing.T) {
requestTask := api.Pod{
func TestCreatePod(t *testing.T) {
requestPod := api.Pod{
CurrentState: api.PodState{
Status: "Foobar",
},
@ -193,7 +193,7 @@ func TestCreateTask(t *testing.T) {
"name": "baz",
},
}
body, _ := json.Marshal(requestTask)
body, _ := json.Marshal(requestPod)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -202,19 +202,19 @@ func TestCreateTask(t *testing.T) {
client := Client{
Host: testServer.URL,
}
receivedTask, err := client.CreateTask(requestTask)
receivedPod, err := client.CreatePod(requestPod)
fakeHandler.ValidateRequest(t, makeUrl("/tasks"), "POST", nil)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !reflect.DeepEqual(requestTask, receivedTask) {
t.Errorf("Received task: %#v\n doesn't match expected task: %#v", receivedTask, requestTask)
if !reflect.DeepEqual(requestPod, receivedPod) {
t.Errorf("Received pod: %#v\n doesn't match expected pod: %#v", receivedPod, requestPod)
}
testServer.Close()
}
func TestUpdateTask(t *testing.T) {
requestTask := api.Pod{
func TestUpdatePod(t *testing.T) {
requestPod := api.Pod{
JSONBase: api.JSONBase{ID: "foo"},
CurrentState: api.PodState{
Status: "Foobar",
@ -224,7 +224,7 @@ func TestUpdateTask(t *testing.T) {
"name": "baz",
},
}
body, _ := json.Marshal(requestTask)
body, _ := json.Marshal(requestPod)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -233,12 +233,12 @@ func TestUpdateTask(t *testing.T) {
client := Client{
Host: testServer.URL,
}
receivedTask, err := client.UpdateTask(requestTask)
receivedPod, err := client.UpdatePod(requestPod)
fakeHandler.ValidateRequest(t, makeUrl("/tasks/foo"), "PUT", nil)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
expectEqual(t, requestTask, receivedTask)
expectEqual(t, requestPod, receivedPod)
testServer.Close()
}

View File

@ -63,10 +63,10 @@ func LoadAuthInfo(path string) (client.AuthInfo, error) {
return auth, err
}
// Perform a rolling update of a collection of tasks.
// Perform a rolling update of a collection of pods.
// 'name' points to a replication controller.
// 'client' is used for updating tasks.
// 'updatePeriod' is the time between task updates.
// 'client' is used for updating pods.
// 'updatePeriod' is the time between pod updates.
func Update(name string, client client.ClientInterface, updatePeriod time.Duration) error {
controller, err := client.GetReplicationController(name)
if err != nil {
@ -74,12 +74,12 @@ func Update(name string, client client.ClientInterface, updatePeriod time.Durati
}
labels := controller.DesiredState.ReplicasInSet
taskList, err := client.ListTasks(labels)
podList, err := client.ListPods(labels)
if err != nil {
return err
}
for _, task := range taskList.Items {
_, err = client.UpdateTask(task)
for _, pod := range podList.Items {
_, err = client.UpdatePod(pod)
if err != nil {
return err
}
@ -172,7 +172,7 @@ func makePorts(spec string) []api.Port {
return result
}
// RunController creates a new replication controller named 'name' which creates 'replicas' tasks running 'image'
// RunController creates a new replication controller named 'name' which creates 'replicas' pods running 'image'
func RunController(image, name string, replicas int, client client.ClientInterface, portSpec string, servicePort int) error {
controller := api.ReplicationController{
JSONBase: api.JSONBase{

View File

@ -40,32 +40,32 @@ type Action struct {
type FakeKubeClient struct {
actions []Action
tasks PodList
pods PodList
ctrl ReplicationController
}
func (client *FakeKubeClient) ListTasks(labelQuery map[string]string) (PodList, error) {
client.actions = append(client.actions, Action{action: "list-tasks"})
return client.tasks, nil
func (client *FakeKubeClient) ListPods(labelQuery map[string]string) (PodList, error) {
client.actions = append(client.actions, Action{action: "list-pods"})
return client.pods, nil
}
func (client *FakeKubeClient) GetTask(name string) (Pod, error) {
client.actions = append(client.actions, Action{action: "get-task", value: name})
func (client *FakeKubeClient) GetPod(name string) (Pod, error) {
client.actions = append(client.actions, Action{action: "get-pod", value: name})
return Pod{}, nil
}
func (client *FakeKubeClient) DeleteTask(name string) error {
client.actions = append(client.actions, Action{action: "delete-task", value: name})
func (client *FakeKubeClient) DeletePod(name string) error {
client.actions = append(client.actions, Action{action: "delete-pod", value: name})
return nil
}
func (client *FakeKubeClient) CreateTask(task Pod) (Pod, error) {
client.actions = append(client.actions, Action{action: "create-task"})
func (client *FakeKubeClient) CreatePod(pod Pod) (Pod, error) {
client.actions = append(client.actions, Action{action: "create-pod"})
return Pod{}, nil
}
func (client *FakeKubeClient) UpdateTask(task Pod) (Pod, error) {
client.actions = append(client.actions, Action{action: "update-task", value: task.ID})
func (client *FakeKubeClient) UpdatePod(pod Pod) (Pod, error) {
client.actions = append(client.actions, Action{action: "update-pod", value: pod.ID})
return Pod{}, nil
}
@ -115,12 +115,12 @@ func validateAction(expectedAction, actualAction Action, t *testing.T) {
}
}
func TestUpdateWithTasks(t *testing.T) {
func TestUpdateWithPods(t *testing.T) {
client := FakeKubeClient{
tasks: PodList{
pods: PodList{
Items: []Pod{
Pod{JSONBase: JSONBase{ID: "task-1"}},
Pod{JSONBase: JSONBase{ID: "task-2"}},
Pod{JSONBase: JSONBase{ID: "pod-1"}},
Pod{JSONBase: JSONBase{ID: "pod-2"}},
},
},
}
@ -129,19 +129,19 @@ func TestUpdateWithTasks(t *testing.T) {
t.Errorf("Unexpected action list %#v", client.actions)
}
validateAction(Action{action: "get-controller", value: "foo"}, client.actions[0], t)
validateAction(Action{action: "list-tasks"}, client.actions[1], t)
validateAction(Action{action: "update-task", value: "task-1"}, client.actions[2], t)
validateAction(Action{action: "update-task", value: "task-2"}, client.actions[3], t)
validateAction(Action{action: "list-pods"}, client.actions[1], t)
validateAction(Action{action: "update-pod", value: "pod-1"}, client.actions[2], t)
validateAction(Action{action: "update-pod", value: "pod-2"}, client.actions[3], t)
}
func TestUpdateNoTasks(t *testing.T) {
func TestUpdateNoPods(t *testing.T) {
client := FakeKubeClient{}
Update("foo", &client, 0)
if len(client.actions) != 2 {
t.Errorf("Unexpected action list %#v", client.actions)
}
validateAction(Action{action: "get-controller", value: "foo"}, client.actions[0], t)
validateAction(Action{action: "list-tasks"}, client.actions[1], t)
validateAction(Action{action: "list-pods"}, client.actions[1], t)
}
func TestDoRequest(t *testing.T) {

View File

@ -319,7 +319,7 @@ func (kl *Kubelet) KillContainer(name string) error {
return err
}
// Watch a file for changes to the set of tasks that should run on this Kubelet
// Watch a file for changes to the set of pods that should run on this Kubelet
// This function loops forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {
var lastData []byte
@ -344,7 +344,7 @@ func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerMani
}
}
// Watch an HTTP endpoint for changes to the set of tasks that should run on this Kubelet
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
// This function runs forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) {
var lastData []byte

View File

@ -62,7 +62,7 @@ func TestListControllersError(t *testing.T) {
t.Errorf("Expected %#v, Got %#v", mockRegistry.err, err)
}
if len(controllers.Items) != 0 {
t.Errorf("Unexpected non-zero task list: %#v", controllers)
t.Errorf("Unexpected non-zero ctrl list: %#v", controllers)
}
}
@ -74,7 +74,7 @@ func TestListEmptyControllerList(t *testing.T) {
controllers, err := storage.List(nil)
expectNoError(t, err)
if len(controllers.(ReplicationControllerList).Items) != 0 {
t.Errorf("Unexpected non-zero task list: %#v", controllers)
t.Errorf("Unexpected non-zero ctrl list: %#v", controllers)
}
}

View File

@ -22,16 +22,16 @@ import (
. "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func MakeEndpointController(serviceRegistry ServiceRegistry, taskRegistry PodRegistry) *EndpointController {
func MakeEndpointController(serviceRegistry ServiceRegistry, podRegistry PodRegistry) *EndpointController {
return &EndpointController{
serviceRegistry: serviceRegistry,
taskRegistry: taskRegistry,
podRegistry: podRegistry,
}
}
type EndpointController struct {
serviceRegistry ServiceRegistry
taskRegistry PodRegistry
podRegistry PodRegistry
}
func (e *EndpointController) SyncServiceEndpoints() error {
@ -41,16 +41,16 @@ func (e *EndpointController) SyncServiceEndpoints() error {
}
var resultErr error
for _, service := range services.Items {
tasks, err := e.taskRegistry.ListTasks(&service.Labels)
pods, err := e.podRegistry.ListPods(&service.Labels)
if err != nil {
log.Printf("Error syncing service: %#v, skipping.", service)
resultErr = err
continue
}
endpoints := make([]string, len(tasks))
for ix, task := range tasks {
endpoints := make([]string, len(pods))
for ix, pod := range pods {
// TODO: Use port names in the service object, don't just use port #0
endpoints[ix] = fmt.Sprintf("%s:%d", task.CurrentState.Host, task.DesiredState.Manifest.Containers[0].Ports[0].HostPort)
endpoints[ix] = fmt.Sprintf("%s:%d", pod.CurrentState.Host, pod.DesiredState.Manifest.Containers[0].Ports[0].HostPort)
}
err = e.serviceRegistry.UpdateEndpoints(Endpoints{
Name: service.ID,

View File

@ -24,9 +24,9 @@ import (
func TestSyncEndpointsEmpty(t *testing.T) {
serviceRegistry := MockServiceRegistry{}
taskRegistry := MockPodRegistry{}
podRegistry := MockPodRegistry{}
endpoints := MakeEndpointController(&serviceRegistry, &taskRegistry)
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry)
err := endpoints.SyncServiceEndpoints()
expectNoError(t, err)
}
@ -35,9 +35,9 @@ func TestSyncEndpointsError(t *testing.T) {
serviceRegistry := MockServiceRegistry{
err: fmt.Errorf("Test Error"),
}
taskRegistry := MockPodRegistry{}
podRegistry := MockPodRegistry{}
endpoints := MakeEndpointController(&serviceRegistry, &taskRegistry)
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry)
err := endpoints.SyncServiceEndpoints()
if err != serviceRegistry.err {
t.Errorf("Errors don't match: %#v %#v", err, serviceRegistry.err)
@ -56,7 +56,7 @@ func TestSyncEndpointsItems(t *testing.T) {
},
},
}
taskRegistry := MockPodRegistry{
podRegistry := MockPodRegistry{
pods: []Pod{
Pod{
DesiredState: PodState{
@ -76,7 +76,7 @@ func TestSyncEndpointsItems(t *testing.T) {
},
}
endpoints := MakeEndpointController(&serviceRegistry, &taskRegistry)
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry)
err := endpoints.SyncServiceEndpoints()
expectNoError(t, err)
if len(serviceRegistry.endpoints.Endpoints) != 1 {
@ -84,7 +84,7 @@ func TestSyncEndpointsItems(t *testing.T) {
}
}
func TestSyncEndpointsTaskError(t *testing.T) {
func TestSyncEndpointsPodError(t *testing.T) {
serviceRegistry := MockServiceRegistry{
list: ServiceList{
Items: []Service{
@ -96,11 +96,11 @@ func TestSyncEndpointsTaskError(t *testing.T) {
},
},
}
taskRegistry := MockPodRegistry{
podRegistry := MockPodRegistry{
err: fmt.Errorf("test error."),
}
endpoints := MakeEndpointController(&serviceRegistry, &taskRegistry)
endpoints := MakeEndpointController(&serviceRegistry, &podRegistry)
err := endpoints.SyncServiceEndpoints()
if err == nil {
t.Error("Unexpected non-error")

View File

@ -25,7 +25,7 @@ import (
. "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// TODO: Need to add a reconciler loop that makes sure that things in tasks are reflected into
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
// kubelet (and vice versa)
// EtcdClient is an injectable interface for testing.
@ -40,7 +40,7 @@ type EtcdClient interface {
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
// EtcdRegistry is an implementation of both ControllerRegistry and TaskRegistry which is backed with etcd.
// EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd.
type EtcdRegistry struct {
etcdClient EtcdClient
machines []string
@ -62,24 +62,24 @@ func MakeEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry {
return registry
}
func makeTaskKey(machine, taskID string) string {
return "/registry/hosts/" + machine + "/tasks/" + taskID
func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID
}
func (registry *EtcdRegistry) ListTasks(query *map[string]string) ([]Pod, error) {
tasks := []Pod{}
func (registry *EtcdRegistry) ListPods(query *map[string]string) ([]Pod, error) {
pods := []Pod{}
for _, machine := range registry.machines {
machineTasks, err := registry.listTasksForMachine(machine)
machinePods, err := registry.listPodsForMachine(machine)
if err != nil {
return tasks, err
return pods, err
}
for _, task := range machineTasks {
if LabelsMatch(task, query) {
tasks = append(tasks, task)
for _, pod := range machinePods {
if LabelsMatch(pod, query) {
pods = append(pods, pod)
}
}
}
return tasks, nil
return pods, nil
}
func (registry *EtcdRegistry) listEtcdNode(key string) ([]*etcd.Node, error) {
@ -95,25 +95,25 @@ func (registry *EtcdRegistry) listEtcdNode(key string) ([]*etcd.Node, error) {
return result.Node.Nodes, nil
}
func (registry *EtcdRegistry) listTasksForMachine(machine string) ([]Pod, error) {
tasks := []Pod{}
key := "/registry/hosts/" + machine + "/tasks"
func (registry *EtcdRegistry) listPodsForMachine(machine string) ([]Pod, error) {
pods := []Pod{}
key := "/registry/hosts/" + machine + "/pods"
nodes, err := registry.listEtcdNode(key)
for _, node := range nodes {
task := Pod{}
err = json.Unmarshal([]byte(node.Value), &task)
pod := Pod{}
err = json.Unmarshal([]byte(node.Value), &pod)
if err != nil {
return tasks, err
return pods, err
}
task.CurrentState.Host = machine
tasks = append(tasks, task)
pod.CurrentState.Host = machine
pods = append(pods, pod)
}
return tasks, err
return pods, err
}
func (registry *EtcdRegistry) GetTask(taskID string) (*Pod, error) {
task, _, err := registry.findTask(taskID)
return &task, err
func (registry *EtcdRegistry) GetPod(podID string) (*Pod, error) {
pod, _, err := registry.findPod(podID)
return &pod, err
}
func makeContainerKey(machine string) string {
@ -144,28 +144,28 @@ func (registry *EtcdRegistry) updateManifests(machine string, manifests []Contai
return err
}
func (registry *EtcdRegistry) CreateTask(machineIn string, task Pod) error {
taskOut, machine, err := registry.findTask(task.ID)
func (registry *EtcdRegistry) CreatePod(machineIn string, pod Pod) error {
podOut, machine, err := registry.findPod(pod.ID)
if err == nil {
return fmt.Errorf("A task named %s already exists on %s (%#v)", task.ID, machine, taskOut)
return fmt.Errorf("A pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
}
return registry.runTask(task, machineIn)
return registry.runPod(pod, machineIn)
}
func (registry *EtcdRegistry) runTask(task Pod, machine string) error {
func (registry *EtcdRegistry) runPod(pod Pod, machine string) error {
manifests, err := registry.loadManifests(machine)
if err != nil {
return err
}
key := makeTaskKey(machine, task.ID)
data, err := json.Marshal(task)
key := makePodKey(machine, pod.ID)
data, err := json.Marshal(pod)
if err != nil {
return err
}
_, err = registry.etcdClient.Create(key, string(data), 0)
manifest, err := registry.manifestFactory.MakeManifest(machine, task)
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
if err != nil {
return err
}
@ -173,19 +173,19 @@ func (registry *EtcdRegistry) runTask(task Pod, machine string) error {
return registry.updateManifests(machine, manifests)
}
func (registry *EtcdRegistry) UpdateTask(task Pod) error {
func (registry *EtcdRegistry) UpdatePod(pod Pod) error {
return fmt.Errorf("Unimplemented!")
}
func (registry *EtcdRegistry) DeleteTask(taskID string) error {
_, machine, err := registry.findTask(taskID)
func (registry *EtcdRegistry) DeletePod(podID string) error {
_, machine, err := registry.findPod(podID)
if err != nil {
return err
}
return registry.deleteTaskFromMachine(machine, taskID)
return registry.deletePodFromMachine(machine, podID)
}
func (registry *EtcdRegistry) deleteTaskFromMachine(machine, taskID string) error {
func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
manifests, err := registry.loadManifests(machine)
if err != nil {
return err
@ -193,7 +193,7 @@ func (registry *EtcdRegistry) deleteTaskFromMachine(machine, taskID string) erro
newManifests := make([]ContainerManifest, 0)
found := false
for _, manifest := range manifests {
if manifest.Id != taskID {
if manifest.Id != podID {
newManifests = append(newManifests, manifest)
} else {
found = true
@ -201,20 +201,20 @@ func (registry *EtcdRegistry) deleteTaskFromMachine(machine, taskID string) erro
}
if !found {
// This really shouldn't happen, it indicates something is broken, and likely
// there is a lost task somewhere.
// there is a lost pod somewhere.
// However it is "deleted" so log it and move on
log.Printf("Couldn't find: %s in %#v", taskID, manifests)
log.Printf("Couldn't find: %s in %#v", podID, manifests)
}
if err = registry.updateManifests(machine, newManifests); err != nil {
return err
}
key := makeTaskKey(machine, taskID)
key := makePodKey(machine, podID)
_, err = registry.etcdClient.Delete(key, true)
return err
}
func (registry *EtcdRegistry) getTaskForMachine(machine, taskID string) (Pod, error) {
key := makeTaskKey(machine, taskID)
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (Pod, error) {
key := makePodKey(machine, podID)
result, err := registry.etcdClient.Get(key, false, false)
if err != nil {
if isEtcdNotFound(err) {
@ -226,20 +226,20 @@ func (registry *EtcdRegistry) getTaskForMachine(machine, taskID string) (Pod, er
if result.Node == nil || len(result.Node.Value) == 0 {
return Pod{}, fmt.Errorf("no nodes field: %#v", result)
}
task := Pod{}
err = json.Unmarshal([]byte(result.Node.Value), &task)
task.CurrentState.Host = machine
return task, err
pod := Pod{}
err = json.Unmarshal([]byte(result.Node.Value), &pod)
pod.CurrentState.Host = machine
return pod, err
}
func (registry *EtcdRegistry) findTask(taskID string) (Pod, string, error) {
func (registry *EtcdRegistry) findPod(podID string) (Pod, string, error) {
for _, machine := range registry.machines {
task, err := registry.getTaskForMachine(machine, taskID)
pod, err := registry.getPodForMachine(machine, podID)
if err == nil {
return task, machine, nil
return pod, machine, nil
}
}
return Pod{}, "", fmt.Errorf("Task not found %s", taskID)
return Pod{}, "", fmt.Errorf("Pod not found %s", podID)
}
func isEtcdNotFound(err error) bool {

View File

@ -25,20 +25,20 @@ import (
"github.com/coreos/go-etcd/etcd"
)
func TestEtcdGetTask(t *testing.T) {
func TestEtcdGetPod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Set("/registry/hosts/machine/tasks/foo", util.MakeJSONString(Pod{JSONBase: JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(Pod{JSONBase: JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
task, err := registry.GetTask("foo")
pod, err := registry.GetPod("foo")
expectNoError(t, err)
if task.ID != "foo" {
t.Errorf("Unexpected task: %#v", task)
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v", pod)
}
}
func TestEtcdGetTaskNotFound(t *testing.T) {
func TestEtcdGetPodNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/tasks/foo"] = EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -47,15 +47,15 @@ func TestEtcdGetTaskNotFound(t *testing.T) {
},
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
_, err := registry.GetTask("foo")
_, err := registry.GetPod("foo")
if err == nil {
t.Errorf("Unexpected non-error.")
}
}
func TestEtcdCreateTask(t *testing.T) {
func TestEtcdCreatePod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/tasks/foo"] = EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -63,7 +63,7 @@ func TestEtcdCreateTask(t *testing.T) {
}
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]ContainerManifest{}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateTask("machine", Pod{
err := registry.CreatePod("machine", Pod{
JSONBase: JSONBase{
ID: "foo",
},
@ -78,13 +78,13 @@ func TestEtcdCreateTask(t *testing.T) {
},
})
expectNoError(t, err)
resp, err := fakeClient.Get("/registry/hosts/machine/tasks/foo", false, false)
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
expectNoError(t, err)
var task Pod
err = json.Unmarshal([]byte(resp.Node.Value), &task)
var pod Pod
err = json.Unmarshal([]byte(resp.Node.Value), &pod)
expectNoError(t, err)
if task.ID != "foo" {
t.Errorf("Unexpected task: %#v %s", task, resp.Node.Value)
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests []ContainerManifest
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
@ -95,9 +95,9 @@ func TestEtcdCreateTask(t *testing.T) {
}
}
func TestEtcdCreateTaskAlreadyExisting(t *testing.T) {
func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/tasks/foo"] = EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString(Pod{JSONBase: JSONBase{ID: "foo"}}),
@ -106,7 +106,7 @@ func TestEtcdCreateTaskAlreadyExisting(t *testing.T) {
E: nil,
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateTask("machine", Pod{
err := registry.CreatePod("machine", Pod{
JSONBase: JSONBase{
ID: "foo",
},
@ -116,9 +116,9 @@ func TestEtcdCreateTaskAlreadyExisting(t *testing.T) {
}
}
func TestEtcdCreateTaskWithContainersError(t *testing.T) {
func TestEtcdCreatePodWithContainersError(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/tasks/foo"] = EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -131,7 +131,7 @@ func TestEtcdCreateTaskWithContainersError(t *testing.T) {
E: &etcd.EtcdError{ErrorCode: 200},
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateTask("machine", Pod{
err := registry.CreatePod("machine", Pod{
JSONBase: JSONBase{
ID: "foo",
},
@ -139,7 +139,7 @@ func TestEtcdCreateTaskWithContainersError(t *testing.T) {
if err == nil {
t.Error("Unexpected non-error")
}
_, err = fakeClient.Get("/registry/hosts/machine/tasks/foo", false, false)
_, err = fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
if err == nil {
t.Error("Unexpected non-error")
}
@ -148,9 +148,9 @@ func TestEtcdCreateTaskWithContainersError(t *testing.T) {
}
}
func TestEtcdCreateTaskWithContainersNotFound(t *testing.T) {
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/tasks/foo"] = EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -163,7 +163,7 @@ func TestEtcdCreateTaskWithContainersNotFound(t *testing.T) {
E: &etcd.EtcdError{ErrorCode: 100},
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateTask("machine", Pod{
err := registry.CreatePod("machine", Pod{
JSONBase: JSONBase{
ID: "foo",
},
@ -179,13 +179,13 @@ func TestEtcdCreateTaskWithContainersNotFound(t *testing.T) {
},
})
expectNoError(t, err)
resp, err := fakeClient.Get("/registry/hosts/machine/tasks/foo", false, false)
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
expectNoError(t, err)
var task Pod
err = json.Unmarshal([]byte(resp.Node.Value), &task)
var pod Pod
err = json.Unmarshal([]byte(resp.Node.Value), &pod)
expectNoError(t, err)
if task.ID != "foo" {
t.Errorf("Unexpected task: %#v %s", task, resp.Node.Value)
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests []ContainerManifest
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
@ -196,9 +196,9 @@ func TestEtcdCreateTaskWithContainersNotFound(t *testing.T) {
}
}
func TestEtcdCreateTaskWithExistingContainers(t *testing.T) {
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/tasks/foo"] = EtcdResponseWithError{
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
@ -210,7 +210,7 @@ func TestEtcdCreateTaskWithExistingContainers(t *testing.T) {
},
}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateTask("machine", Pod{
err := registry.CreatePod("machine", Pod{
JSONBase: JSONBase{
ID: "foo",
},
@ -226,13 +226,13 @@ func TestEtcdCreateTaskWithExistingContainers(t *testing.T) {
},
})
expectNoError(t, err)
resp, err := fakeClient.Get("/registry/hosts/machine/tasks/foo", false, false)
resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false)
expectNoError(t, err)
var task Pod
err = json.Unmarshal([]byte(resp.Node.Value), &task)
var pod Pod
err = json.Unmarshal([]byte(resp.Node.Value), &pod)
expectNoError(t, err)
if task.ID != "foo" {
t.Errorf("Unexpected task: %#v %s", task, resp.Node.Value)
if pod.ID != "foo" {
t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value)
}
var manifests []ContainerManifest
resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
@ -243,9 +243,9 @@ func TestEtcdCreateTaskWithExistingContainers(t *testing.T) {
}
}
func TestEtcdDeleteTask(t *testing.T) {
func TestEtcdDeletePod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/tasks/foo"
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(Pod{JSONBase: JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]ContainerManifest{
ContainerManifest{
@ -253,7 +253,7 @@ func TestEtcdDeleteTask(t *testing.T) {
},
}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteTask("foo")
err := registry.DeletePod("foo")
expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys)
@ -267,16 +267,16 @@ func TestEtcdDeleteTask(t *testing.T) {
}
}
func TestEtcdDeleteTaskMultipleContainers(t *testing.T) {
func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/tasks/foo"
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(Pod{JSONBase: JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]ContainerManifest{
ContainerManifest{Id: "foo"},
ContainerManifest{Id: "bar"},
}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteTask("foo")
err := registry.DeletePod("foo")
expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys)
@ -295,9 +295,9 @@ func TestEtcdDeleteTaskMultipleContainers(t *testing.T) {
}
}
func TestEtcdEmptyListTasks(t *testing.T) {
func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/tasks"
key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -307,31 +307,31 @@ func TestEtcdEmptyListTasks(t *testing.T) {
E: nil,
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
tasks, err := registry.ListTasks(nil)
pods, err := registry.ListPods(nil)
expectNoError(t, err)
if len(tasks) != 0 {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestEtcdListTasksNotFound(t *testing.T) {
func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/tasks"
key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100},
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
tasks, err := registry.ListTasks(nil)
pods, err := registry.ListPods(nil)
expectNoError(t, err)
if len(tasks) != 0 {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestEtcdListTasks(t *testing.T) {
func TestEtcdListPods(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/tasks"
key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
@ -348,10 +348,10 @@ func TestEtcdListTasks(t *testing.T) {
E: nil,
}
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
tasks, err := registry.ListTasks(nil)
pods, err := registry.ListPods(nil)
expectNoError(t, err)
if len(tasks) != 2 || tasks[0].ID != "foo" || tasks[1].ID != "bar" {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 2 || pods[0].ID != "foo" || pods[1].ID != "bar" {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
@ -471,7 +471,7 @@ func TestEtcdCreateController(t *testing.T) {
err = json.Unmarshal([]byte(resp.Node.Value), &ctrl)
expectNoError(t, err)
if ctrl.ID != "foo" {
t.Errorf("Unexpected task: %#v %s", ctrl, resp.Node.Value)
t.Errorf("Unexpected pod: %#v %s", ctrl, resp.Node.Value)
}
}
@ -514,7 +514,7 @@ func TestEtcdListServices(t *testing.T) {
services, err := registry.ListServices()
expectNoError(t, err)
if len(services.Items) != 2 || services.Items[0].ID != "foo" || services.Items[1].ID != "bar" {
t.Errorf("Unexpected task list: %#v", services)
t.Errorf("Unexpected pod list: %#v", services)
}
}
@ -548,7 +548,7 @@ func TestEtcdGetService(t *testing.T) {
service, err := registry.GetService("foo")
expectNoError(t, err)
if service.ID != "foo" {
t.Errorf("Unexpected task: %#v", service)
t.Errorf("Unexpected pod: %#v", service)
}
}

View File

@ -19,19 +19,19 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// TaskRegistry is an interface implemented by things that know how to store Task objects
// PodRegistry is an interface implemented by things that know how to store Pod objects
type PodRegistry interface {
// ListTasks obtains a list of tasks that match query.
// Query may be nil in which case all tasks are returned.
ListTasks(query *map[string]string) ([]api.Pod, error)
// Get a specific task
GetTask(taskId string) (*api.Pod, error)
// Create a task based on a specification, schedule it onto a specific machine.
CreateTask(machine string, task api.Pod) error
// Update an existing task
UpdateTask(task api.Pod) error
// Delete an existing task
DeleteTask(taskId string) error
// ListPods obtains a list of pods that match query.
// Query may be nil in which case all pods are returned.
ListPods(query *map[string]string) ([]api.Pod, error)
// Get a specific pod
GetPod(podID string) (*api.Pod, error)
// Create a pod based on a specification, schedule it onto a specific machine.
CreatePod(machine string, pod api.Pod) error
// Update an existing pod
UpdatePod(pod api.Pod) error
// Delete an existing pod
DeletePod(podID string) error
}
// ControllerRegistry is an interface for things that know how to store Controllers

View File

@ -20,22 +20,22 @@ import (
)
type ManifestFactory interface {
// Make a container object for a given task, given the machine that the task is running on.
MakeManifest(machine string, task Pod) (ContainerManifest, error)
// Make a container object for a given pod, given the machine that the pod is running on.
MakeManifest(machine string, pod Pod) (ContainerManifest, error)
}
type BasicManifestFactory struct {
serviceRegistry ServiceRegistry
}
func (b *BasicManifestFactory) MakeManifest(machine string, task Pod) (ContainerManifest, error) {
func (b *BasicManifestFactory) MakeManifest(machine string, pod Pod) (ContainerManifest, error) {
envVars, err := GetServiceEnvironmentVariables(b.serviceRegistry, machine)
if err != nil {
return ContainerManifest{}, err
}
for ix, container := range task.DesiredState.Manifest.Containers {
task.DesiredState.Manifest.Id = task.ID
task.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...)
for ix, container := range pod.DesiredState.Manifest.Containers {
pod.DesiredState.Manifest.Id = pod.ID
pod.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...)
}
return task.DesiredState.Manifest, nil
return pod.DesiredState.Manifest, nil
}

View File

@ -19,25 +19,25 @@ import (
. "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// An implementation of TaskRegistry and ControllerRegistry that is backed by memory
// An implementation of PodRegistry and ControllerRegistry that is backed by memory
// Mainly used for testing.
type MemoryRegistry struct {
taskData map[string]Pod
podData map[string]Pod
controllerData map[string]ReplicationController
serviceData map[string]Service
}
func MakeMemoryRegistry() *MemoryRegistry {
return &MemoryRegistry{
taskData: map[string]Pod{},
podData: map[string]Pod{},
controllerData: map[string]ReplicationController{},
serviceData: map[string]Service{},
}
}
func (registry *MemoryRegistry) ListTasks(labelQuery *map[string]string) ([]Pod, error) {
func (registry *MemoryRegistry) ListPods(labelQuery *map[string]string) ([]Pod, error) {
result := []Pod{}
for _, value := range registry.taskData {
for _, value := range registry.podData {
if LabelsMatch(value, labelQuery) {
result = append(result, value)
}
@ -45,27 +45,27 @@ func (registry *MemoryRegistry) ListTasks(labelQuery *map[string]string) ([]Pod,
return result, nil
}
func (registry *MemoryRegistry) GetTask(taskID string) (*Pod, error) {
task, found := registry.taskData[taskID]
func (registry *MemoryRegistry) GetPod(podID string) (*Pod, error) {
pod, found := registry.podData[podID]
if found {
return &task, nil
return &pod, nil
} else {
return nil, nil
}
}
func (registry *MemoryRegistry) CreateTask(machine string, task Pod) error {
registry.taskData[task.ID] = task
func (registry *MemoryRegistry) CreatePod(machine string, pod Pod) error {
registry.podData[pod.ID] = pod
return nil
}
func (registry *MemoryRegistry) DeleteTask(taskID string) error {
delete(registry.taskData, taskID)
func (registry *MemoryRegistry) DeletePod(podID string) error {
delete(registry.podData, podID)
return nil
}
func (registry *MemoryRegistry) UpdateTask(task Pod) error {
registry.taskData[task.ID] = task
func (registry *MemoryRegistry) UpdatePod(pod Pod) error {
registry.podData[pod.ID] = pod
return nil
}

View File

@ -21,40 +21,40 @@ import (
. "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func TestListTasksEmpty(t *testing.T) {
func TestListPodsEmpty(t *testing.T) {
registry := MakeMemoryRegistry()
tasks, err := registry.ListTasks(nil)
pods, err := registry.ListPods(nil)
expectNoError(t, err)
if len(tasks) != 0 {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestMemoryListTasks(t *testing.T) {
func TestMemoryListPods(t *testing.T) {
registry := MakeMemoryRegistry()
registry.CreateTask("machine", Pod{JSONBase: JSONBase{ID: "foo"}})
tasks, err := registry.ListTasks(nil)
registry.CreatePod("machine", Pod{JSONBase: JSONBase{ID: "foo"}})
pods, err := registry.ListPods(nil)
expectNoError(t, err)
if len(tasks) != 1 || tasks[0].ID != "foo" {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 1 || pods[0].ID != "foo" {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestMemorySetGetTasks(t *testing.T) {
func TestMemorySetGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
expectedTask := Pod{JSONBase: JSONBase{ID: "foo"}}
registry.CreateTask("machine", expectedTask)
task, err := registry.GetTask("foo")
expectedPod := Pod{JSONBase: JSONBase{ID: "foo"}}
registry.CreatePod("machine", expectedPod)
pod, err := registry.GetPod("foo")
expectNoError(t, err)
if expectedTask.ID != task.ID {
t.Errorf("Unexpected task, expected %#v, actual %#v", expectedTask, task)
if expectedPod.ID != pod.ID {
t.Errorf("Unexpected pod, expected %#v, actual %#v", expectedPod, pod)
}
}
func TestMemorySetUpdateGetTasks(t *testing.T) {
func TestMemorySetUpdateGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
oldTask := Pod{JSONBase: JSONBase{ID: "foo"}}
expectedTask := Pod{
oldPod := Pod{JSONBase: JSONBase{ID: "foo"}}
expectedPod := Pod{
JSONBase: JSONBase{
ID: "foo",
},
@ -62,43 +62,43 @@ func TestMemorySetUpdateGetTasks(t *testing.T) {
Host: "foo.com",
},
}
registry.CreateTask("machine", oldTask)
registry.UpdateTask(expectedTask)
task, err := registry.GetTask("foo")
registry.CreatePod("machine", oldPod)
registry.UpdatePod(expectedPod)
pod, err := registry.GetPod("foo")
expectNoError(t, err)
if expectedTask.ID != task.ID || task.DesiredState.Host != expectedTask.DesiredState.Host {
t.Errorf("Unexpected task, expected %#v, actual %#v", expectedTask, task)
if expectedPod.ID != pod.ID || pod.DesiredState.Host != expectedPod.DesiredState.Host {
t.Errorf("Unexpected pod, expected %#v, actual %#v", expectedPod, pod)
}
}
func TestMemorySetDeleteGetTasks(t *testing.T) {
func TestMemorySetDeleteGetPods(t *testing.T) {
registry := MakeMemoryRegistry()
expectedTask := Pod{JSONBase: JSONBase{ID: "foo"}}
registry.CreateTask("machine", expectedTask)
registry.DeleteTask("foo")
task, err := registry.GetTask("foo")
expectedPod := Pod{JSONBase: JSONBase{ID: "foo"}}
registry.CreatePod("machine", expectedPod)
registry.DeletePod("foo")
pod, err := registry.GetPod("foo")
expectNoError(t, err)
if task != nil {
t.Errorf("Unexpected task: %#v", task)
if pod != nil {
t.Errorf("Unexpected pod: %#v", pod)
}
}
func TestListControllersEmpty(t *testing.T) {
registry := MakeMemoryRegistry()
tasks, err := registry.ListControllers()
pods, err := registry.ListControllers()
expectNoError(t, err)
if len(tasks) != 0 {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 0 {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
func TestMemoryListControllers(t *testing.T) {
registry := MakeMemoryRegistry()
registry.CreateController(ReplicationController{JSONBase: JSONBase{ID: "foo"}})
tasks, err := registry.ListControllers()
pods, err := registry.ListControllers()
expectNoError(t, err)
if len(tasks) != 1 || tasks[0].ID != "foo" {
t.Errorf("Unexpected task list: %#v", tasks)
if len(pods) != 1 || pods[0].ID != "foo" {
t.Errorf("Unexpected pod list: %#v", pods)
}
}
@ -106,10 +106,10 @@ func TestMemorySetGetControllers(t *testing.T) {
registry := MakeMemoryRegistry()
expectedController := ReplicationController{JSONBase: JSONBase{ID: "foo"}}
registry.CreateController(expectedController)
task, err := registry.GetController("foo")
pod, err := registry.GetController("foo")
expectNoError(t, err)
if expectedController.ID != task.ID {
t.Errorf("Unexpected task, expected %#v, actual %#v", expectedController, task)
if expectedController.ID != pod.ID {
t.Errorf("Unexpected pod, expected %#v, actual %#v", expectedController, pod)
}
}
@ -126,10 +126,10 @@ func TestMemorySetUpdateGetControllers(t *testing.T) {
}
registry.CreateController(oldController)
registry.UpdateController(expectedController)
task, err := registry.GetController("foo")
pod, err := registry.GetController("foo")
expectNoError(t, err)
if expectedController.ID != task.ID || task.DesiredState.Replicas != expectedController.DesiredState.Replicas {
t.Errorf("Unexpected task, expected %#v, actual %#v", expectedController, task)
if expectedController.ID != pod.ID || pod.DesiredState.Replicas != expectedController.DesiredState.Replicas {
t.Errorf("Unexpected pod, expected %#v, actual %#v", expectedController, pod)
}
}
@ -138,9 +138,9 @@ func TestMemorySetDeleteGetControllers(t *testing.T) {
expectedController := ReplicationController{JSONBase: JSONBase{ID: "foo"}}
registry.CreateController(expectedController)
registry.DeleteController("foo")
task, err := registry.GetController("foo")
pod, err := registry.GetController("foo")
expectNoError(t, err)
if task != nil {
t.Errorf("Unexpected task: %#v", task)
if pod != nil {
t.Errorf("Unexpected pod: %#v", pod)
}
}

View File

@ -25,24 +25,24 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)
// TaskRegistryStorage implements the RESTStorage interface in terms of a TaskRegistry
type TaskRegistryStorage struct {
// PodRegistryStorage implements the RESTStorage interface in terms of a PodRegistry
type PodRegistryStorage struct {
registry PodRegistry
containerInfo client.ContainerInfo
scheduler Scheduler
}
func MakeTaskRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler) apiserver.RESTStorage {
return &TaskRegistryStorage{
func MakePodRegistryStorage(registry PodRegistry, containerInfo client.ContainerInfo, scheduler Scheduler) apiserver.RESTStorage {
return &PodRegistryStorage{
registry: registry,
containerInfo: containerInfo,
scheduler: scheduler,
}
}
// LabelMatch tests to see if a Task's labels map contains 'key' mapping to 'value'
func LabelMatch(task Pod, queryKey, queryValue string) bool {
for key, value := range task.Labels {
// LabelMatch tests to see if a Pod's labels map contains 'key' mapping to 'value'
func LabelMatch(pod Pod, queryKey, queryValue string) bool {
for key, value := range pod.Labels {
if queryKey == key && queryValue == value {
return true
}
@ -50,72 +50,72 @@ func LabelMatch(task Pod, queryKey, queryValue string) bool {
return false
}
// LabelMatch tests to see if a Task's labels map contains all key/value pairs in 'labelQuery'
func LabelsMatch(task Pod, labelQuery *map[string]string) bool {
// LabelMatch tests to see if a Pod's labels map contains all key/value pairs in 'labelQuery'
func LabelsMatch(pod Pod, labelQuery *map[string]string) bool {
if labelQuery == nil {
return true
}
for key, value := range *labelQuery {
if !LabelMatch(task, key, value) {
if !LabelMatch(pod, key, value) {
return false
}
}
return true
}
func (storage *TaskRegistryStorage) List(url *url.URL) (interface{}, error) {
func (storage *PodRegistryStorage) List(url *url.URL) (interface{}, error) {
var result PodList
var query *map[string]string
if url != nil {
queryMap := client.DecodeLabelQuery(url.Query().Get("labels"))
query = &queryMap
}
tasks, err := storage.registry.ListTasks(query)
pods, err := storage.registry.ListPods(query)
if err == nil {
result = PodList{
Items: tasks,
Items: pods,
}
}
result.Kind = "cluster#taskList"
result.Kind = "cluster#podList"
return result, err
}
func (storage *TaskRegistryStorage) Get(id string) (interface{}, error) {
task, err := storage.registry.GetTask(id)
func (storage *PodRegistryStorage) Get(id string) (interface{}, error) {
pod, err := storage.registry.GetPod(id)
if err != nil {
return task, err
return pod, err
}
info, err := storage.containerInfo.GetContainerInfo(task.CurrentState.Host, id)
info, err := storage.containerInfo.GetContainerInfo(pod.CurrentState.Host, id)
if err != nil {
return task, err
return pod, err
}
task.CurrentState.Info = info
task.Kind = "cluster#task"
return task, err
pod.CurrentState.Info = info
pod.Kind = "cluster#pod"
return pod, err
}
func (storage *TaskRegistryStorage) Delete(id string) error {
return storage.registry.DeleteTask(id)
func (storage *PodRegistryStorage) Delete(id string) error {
return storage.registry.DeletePod(id)
}
func (storage *TaskRegistryStorage) Extract(body string) (interface{}, error) {
task := Pod{}
err := json.Unmarshal([]byte(body), &task)
return task, err
func (storage *PodRegistryStorage) Extract(body string) (interface{}, error) {
pod := Pod{}
err := json.Unmarshal([]byte(body), &pod)
return pod, err
}
func (storage *TaskRegistryStorage) Create(task interface{}) error {
taskObj := task.(Pod)
if len(taskObj.ID) == 0 {
return fmt.Errorf("ID is unspecified: %#v", task)
func (storage *PodRegistryStorage) Create(pod interface{}) error {
podObj := pod.(Pod)
if len(podObj.ID) == 0 {
return fmt.Errorf("ID is unspecified: %#v", pod)
}
machine, err := storage.scheduler.Schedule(taskObj)
machine, err := storage.scheduler.Schedule(podObj)
if err != nil {
return err
}
return storage.registry.CreateTask(machine, taskObj)
return storage.registry.CreatePod(machine, podObj)
}
func (storage *TaskRegistryStorage) Update(task interface{}) error {
return storage.registry.UpdateTask(task.(Pod))
func (storage *PodRegistryStorage) Update(pod interface{}) error {
return storage.registry.UpdatePod(pod.(Pod))
}

View File

@ -24,7 +24,7 @@ import (
)
type MockPodRegistry struct {
err error
err error
pods []Pod
}
@ -34,30 +34,30 @@ func expectNoError(t *testing.T, err error) {
}
}
func (registry *MockPodRegistry) ListTasks(*map[string]string) ([]Pod, error) {
func (registry *MockPodRegistry) ListPods(*map[string]string) ([]Pod, error) {
return registry.pods, registry.err
}
func (registry *MockPodRegistry) GetTask(podId string) (*Pod, error) {
func (registry *MockPodRegistry) GetPod(podId string) (*Pod, error) {
return &Pod{}, registry.err
}
func (registry *MockPodRegistry) CreateTask(machine string, pod Pod) error {
func (registry *MockPodRegistry) CreatePod(machine string, pod Pod) error {
return registry.err
}
func (registry *MockPodRegistry) UpdateTask(pod Pod) error {
func (registry *MockPodRegistry) UpdatePod(pod Pod) error {
return registry.err
}
func (registry *MockPodRegistry) DeleteTask(podId string) error {
func (registry *MockPodRegistry) DeletePod(podId string) error {
return registry.err
}
func TestListTasksError(t *testing.T) {
func TestListPodsError(t *testing.T) {
mockRegistry := MockPodRegistry{
err: fmt.Errorf("Test Error"),
}
storage := TaskRegistryStorage{
storage := PodRegistryStorage{
registry: &mockRegistry,
}
pods, err := storage.List(nil)
@ -69,9 +69,9 @@ func TestListTasksError(t *testing.T) {
}
}
func TestListEmptyTaskList(t *testing.T) {
func TestListEmptyPodList(t *testing.T) {
mockRegistry := MockPodRegistry{}
storage := TaskRegistryStorage{
storage := PodRegistryStorage{
registry: &mockRegistry,
}
pods, err := storage.List(nil)
@ -81,7 +81,7 @@ func TestListEmptyTaskList(t *testing.T) {
}
}
func TestListTaskList(t *testing.T) {
func TestListPodList(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []Pod{
Pod{
@ -96,7 +96,7 @@ func TestListTaskList(t *testing.T) {
},
},
}
storage := TaskRegistryStorage{
storage := PodRegistryStorage{
registry: &mockRegistry,
}
podsObj, err := storage.List(nil)
@ -115,7 +115,7 @@ func TestListTaskList(t *testing.T) {
func TestExtractJson(t *testing.T) {
mockRegistry := MockPodRegistry{}
storage := TaskRegistryStorage{
storage := PodRegistryStorage{
registry: &mockRegistry,
}
pod := Pod{

View File

@ -31,53 +31,53 @@ import (
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running tasks.
// with actual running pods.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
etcdClient *etcd.Client
kubeClient client.ClientInterface
taskControl TaskControlInterface
updateLock sync.Mutex
etcdClient *etcd.Client
kubeClient client.ClientInterface
podControl PodControlInterface
updateLock sync.Mutex
}
// An interface that knows how to add or delete tasks
// An interface that knows how to add or delete pods
// created as an interface to allow testing.
type TaskControlInterface interface {
type PodControlInterface interface {
createReplica(controllerSpec ReplicationController)
deleteTask(taskID string) error
deletePod(podID string) error
}
type RealTaskControl struct {
type RealPodControl struct {
kubeClient client.ClientInterface
}
func (r RealTaskControl) createReplica(controllerSpec ReplicationController) {
func (r RealPodControl) createReplica(controllerSpec ReplicationController) {
labels := controllerSpec.DesiredState.PodTemplate.Labels
if labels != nil {
labels["replicationController"] = controllerSpec.ID
}
task := Pod{
pod := Pod{
JSONBase: JSONBase{
ID: fmt.Sprintf("%x", rand.Int()),
},
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
Labels: controllerSpec.DesiredState.PodTemplate.Labels,
}
_, err := r.kubeClient.CreateTask(task)
_, err := r.kubeClient.CreatePod(pod)
if err != nil {
log.Printf("%#v\n", err)
}
}
func (r RealTaskControl) deleteTask(taskID string) error {
return r.kubeClient.DeleteTask(taskID)
func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID)
}
func MakeReplicationManager(etcdClient *etcd.Client, kubeClient client.ClientInterface) *ReplicationManager {
return &ReplicationManager{
kubeClient: kubeClient,
etcdClient: etcdClient,
taskControl: RealTaskControl{
podControl: RealPodControl{
kubeClient: kubeClient,
},
}
@ -118,9 +118,9 @@ func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*Rep
return nil, nil
}
func (rm *ReplicationManager) filterActiveTasks(tasks []Pod) []Pod {
func (rm *ReplicationManager) filterActivePods(pods []Pod) []Pod {
var result []Pod
for _, value := range tasks {
for _, value := range pods {
if strings.Index(value.CurrentState.Status, "Exit") == -1 {
result = append(result, value)
}
@ -130,23 +130,23 @@ func (rm *ReplicationManager) filterActiveTasks(tasks []Pod) []Pod {
func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {
rm.updateLock.Lock()
taskList, err := rm.kubeClient.ListTasks(controllerSpec.DesiredState.ReplicasInSet)
podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
if err != nil {
return err
}
filteredList := rm.filterActiveTasks(taskList.Items)
filteredList := rm.filterActivePods(podList.Items)
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
log.Printf("%#v", filteredList)
if diff < 0 {
diff *= -1
log.Printf("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ {
rm.taskControl.createReplica(controllerSpec)
rm.podControl.createReplica(controllerSpec)
}
} else if diff > 0 {
log.Print("Too many replicas, deleting")
for i := 0; i < diff; i++ {
rm.taskControl.deleteTask(filteredList[i].ID)
rm.podControl.deletePod(filteredList[i].ID)
}
}
rm.updateLock.Unlock()

View File

@ -35,17 +35,17 @@ func makeUrl(suffix string) string {
return apiPath + suffix
}
type FakeTaskControl struct {
type FakePodControl struct {
controllerSpec []ReplicationController
deleteTaskID []string
deletePodID []string
}
func (f *FakeTaskControl) createReplica(spec ReplicationController) {
func (f *FakePodControl) createReplica(spec ReplicationController) {
f.controllerSpec = append(f.controllerSpec, spec)
}
func (f *FakeTaskControl) deleteTask(taskID string) error {
f.deleteTaskID = append(f.deleteTaskID, taskID)
func (f *FakePodControl) deletePod(podID string) error {
f.deletePodID = append(f.deletePodID, podID)
return nil
}
@ -72,31 +72,31 @@ func makeReplicationController(replicas int) ReplicationController {
}
}
func makeTaskList(count int) PodList {
tasks := []Pod{}
func makePodList(count int) PodList {
pods := []Pod{}
for i := 0; i < count; i++ {
tasks = append(tasks, Pod{
pods = append(pods, Pod{
JSONBase: JSONBase{
ID: fmt.Sprintf("task%d", i),
ID: fmt.Sprintf("pod%d", i),
},
})
}
return PodList{
Items: tasks,
Items: pods,
}
}
func validateSyncReplication(t *testing.T, fakeTaskControl *FakeTaskControl, expectedCreates, expectedDeletes int) {
if len(fakeTaskControl.controllerSpec) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakeTaskControl.controllerSpec))
func validateSyncReplication(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
if len(fakePodControl.controllerSpec) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.controllerSpec))
}
if len(fakeTaskControl.deleteTaskID) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakeTaskControl.deleteTaskID))
if len(fakePodControl.deletePodID) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodID))
}
}
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
body, _ := json.Marshal(makeTaskList(2))
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -106,19 +106,19 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(2)
manager.syncReplicationController(controllerSpec)
validateSyncReplication(t, &fakeTaskControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0)
}
func TestSyncReplicationControllerDeletes(t *testing.T) {
body, _ := json.Marshal(makeTaskList(2))
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -128,15 +128,15 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(1)
manager.syncReplicationController(controllerSpec)
validateSyncReplication(t, &fakeTaskControl, 0, 1)
validateSyncReplication(t, &fakePodControl, 0, 1)
}
func TestSyncReplicationControllerCreates(t *testing.T) {
@ -150,15 +150,15 @@ func TestSyncReplicationControllerCreates(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
controllerSpec := makeReplicationController(2)
manager.syncReplicationController(controllerSpec)
validateSyncReplication(t, &fakeTaskControl, 2, 0)
validateSyncReplication(t, &fakePodControl, 2, 0)
}
func TestCreateReplica(t *testing.T) {
@ -172,7 +172,7 @@ func TestCreateReplica(t *testing.T) {
Host: testServer.URL,
}
taskControl := RealTaskControl{
podControl := RealPodControl{
kubeClient: client,
}
@ -196,9 +196,9 @@ func TestCreateReplica(t *testing.T) {
},
}
taskControl.createReplica(controllerSpec)
podControl.createReplica(controllerSpec)
//expectedTask := Task{
//expectedPod := Pod{
// Labels: controllerSpec.DesiredState.PodTemplate.Labels,
// DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
//}
@ -207,7 +207,7 @@ func TestCreateReplica(t *testing.T) {
}
func TestHandleWatchResponseNotSet(t *testing.T) {
body, _ := json.Marshal(makeTaskList(2))
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -217,10 +217,10 @@ func TestHandleWatchResponseNotSet(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "delete",
})
@ -228,7 +228,7 @@ func TestHandleWatchResponseNotSet(t *testing.T) {
}
func TestHandleWatchResponseNoNode(t *testing.T) {
body, _ := json.Marshal(makeTaskList(2))
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -238,10 +238,10 @@ func TestHandleWatchResponseNoNode(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
})
@ -251,7 +251,7 @@ func TestHandleWatchResponseNoNode(t *testing.T) {
}
func TestHandleWatchResponseBadData(t *testing.T) {
body, _ := json.Marshal(makeTaskList(2))
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -261,10 +261,10 @@ func TestHandleWatchResponseBadData(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
_, err := manager.handleWatchResponse(&etcd.Response{
Action: "set",
Node: &etcd.Node{
@ -277,7 +277,7 @@ func TestHandleWatchResponseBadData(t *testing.T) {
}
func TestHandleWatchResponse(t *testing.T) {
body, _ := json.Marshal(makeTaskList(2))
body, _ := json.Marshal(makePodList(2))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
@ -287,10 +287,10 @@ func TestHandleWatchResponse(t *testing.T) {
Host: testServer.URL,
}
fakeTaskControl := FakeTaskControl{}
fakePodControl := FakePodControl{}
manager := MakeReplicationManager(nil, &client)
manager.taskControl = &fakeTaskControl
manager.podControl = &fakePodControl
controller := makeReplicationController(2)

View File

@ -22,7 +22,7 @@ import (
. "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
// Scheduler is an interface implemented by things that know how to schedule tasks onto machines.
// Scheduler is an interface implemented by things that know how to schedule pods onto machines.
type Scheduler interface {
Schedule(Pod) (string, error)
}
@ -40,7 +40,7 @@ func MakeRandomScheduler(machines []string, random rand.Rand) Scheduler {
}
}
func (s *RandomScheduler) Schedule(task Pod) (string, error) {
func (s *RandomScheduler) Schedule(pod Pod) (string, error) {
return s.machines[s.random.Int()%len(s.machines)], nil
}
@ -57,7 +57,7 @@ func MakeRoundRobinScheduler(machines []string) Scheduler {
}
}
func (s *RoundRobinScheduler) Schedule(task Pod) (string, error) {
func (s *RoundRobinScheduler) Schedule(pod Pod) (string, error) {
result := s.machines[s.currentIndex]
s.currentIndex = (s.currentIndex + 1) % len(s.machines)
return result, nil
@ -75,10 +75,10 @@ func MakeFirstFitScheduler(machines []string, registry PodRegistry) Scheduler {
}
}
func (s *FirstFitScheduler) containsPort(task Pod, port Port) bool {
for _, container := range task.DesiredState.Manifest.Containers {
for _, taskPort := range container.Ports {
if taskPort.HostPort == port.HostPort {
func (s *FirstFitScheduler) containsPort(pod Pod, port Port) bool {
for _, container := range pod.DesiredState.Manifest.Containers {
for _, podPort := range container.Ports {
if podPort.HostPort == port.HostPort {
return true
}
}
@ -86,30 +86,30 @@ func (s *FirstFitScheduler) containsPort(task Pod, port Port) bool {
return false
}
func (s *FirstFitScheduler) Schedule(task Pod) (string, error) {
machineToTasks := map[string][]Pod{}
tasks, err := s.registry.ListTasks(nil)
func (s *FirstFitScheduler) Schedule(pod Pod) (string, error) {
machineToPods := map[string][]Pod{}
pods, err := s.registry.ListPods(nil)
if err != nil {
return "", err
}
for _, scheduledTask := range tasks {
host := scheduledTask.CurrentState.Host
machineToTasks[host] = append(machineToTasks[host], scheduledTask)
for _, scheduledPod := range pods {
host := scheduledPod.CurrentState.Host
machineToPods[host] = append(machineToPods[host], scheduledPod)
}
for _, machine := range s.machines {
taskFits := true
for _, scheduledTask := range machineToTasks[machine] {
for _, container := range task.DesiredState.Manifest.Containers {
podFits := true
for _, scheduledPod := range machineToPods[machine] {
for _, container := range pod.DesiredState.Manifest.Containers {
for _, port := range container.Ports {
if s.containsPort(scheduledTask, port) {
taskFits = false
if s.containsPort(scheduledPod, port) {
podFits = false
}
}
}
}
if taskFits {
if podFits {
return machine, nil
}
}
return "", fmt.Errorf("Failed to find fit for %#v", task)
return "", fmt.Errorf("Failed to find fit for %#v", pod)
}

View File

@ -22,8 +22,8 @@ import (
. "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
func expectSchedule(scheduler Scheduler, task Pod, expected string, t *testing.T) {
actual, err := scheduler.Schedule(task)
func expectSchedule(scheduler Scheduler, pod Pod, expected string, t *testing.T) {
actual, err := scheduler.Schedule(pod)
expectNoError(t, err)
if actual != expected {
t.Errorf("Unexpected scheduling value: %d, expected %d", actual, expected)
@ -51,7 +51,7 @@ func TestFirstFitSchedulerNothingScheduled(t *testing.T) {
expectSchedule(scheduler, Pod{}, "m1", t)
}
func makeTask(host string, hostPorts ...int) Pod {
func makePod(host string, hostPorts ...int) Pod {
networkPorts := []Port{}
for _, port := range hostPorts {
networkPorts = append(networkPorts, Port{HostPort: port})
@ -75,35 +75,35 @@ func makeTask(host string, hostPorts ...int) Pod {
func TestFirstFitSchedulerFirstScheduled(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []Pod{
makeTask("m1", 8080),
makePod("m1", 8080),
},
}
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry)
expectSchedule(scheduler, makeTask("", 8080), "m2", t)
expectSchedule(scheduler, makePod("", 8080), "m2", t)
}
func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []Pod{
makeTask("m1", 80, 8080),
makeTask("m2", 8081, 8082, 8083),
makeTask("m3", 80, 443, 8085),
makePod("m1", 80, 8080),
makePod("m2", 8081, 8082, 8083),
makePod("m3", 80, 443, 8085),
},
}
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry)
expectSchedule(scheduler, makeTask("", 8080, 8081), "m3", t)
expectSchedule(scheduler, makePod("", 8080, 8081), "m3", t)
}
func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) {
mockRegistry := MockPodRegistry{
pods: []Pod{
makeTask("m1", 8080),
makeTask("m2", 8081),
makeTask("m3", 8080),
makePod("m1", 8080),
makePod("m2", 8081),
makePod("m3", 8080),
},
}
scheduler := MakeFirstFitScheduler([]string{"m1", "m2", "m3"}, &mockRegistry)
_, err := scheduler.Schedule(makeTask("", 8080, 8081))
_, err := scheduler.Schedule(makePod("", 8080, 8081))
if err == nil {
t.Error("Unexpected non-error.")
}