k3s/pkg/controller/replication_controller.go

190 lines
5.2 KiB
Go
Raw Normal View History

2014-06-06 23:40:48 +00:00
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
2014-06-06 23:40:48 +00:00
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"strings"
"time"
2014-06-12 20:17:34 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
2014-06-06 23:40:48 +00:00
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
)
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
2014-06-09 05:38:45 +00:00
// with actual running pods.
2014-06-06 23:40:48 +00:00
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
etcdClient util.EtcdClient
2014-06-09 05:38:45 +00:00
kubeClient client.ClientInterface
podControl PodControlInterface
2014-06-18 00:56:18 +00:00
syncTime <-chan time.Time
2014-06-06 23:40:48 +00:00
}
2014-06-09 05:38:45 +00:00
// An interface that knows how to add or delete pods
2014-06-06 23:40:48 +00:00
// created as an interface to allow testing.
2014-06-09 05:38:45 +00:00
type PodControlInterface interface {
2014-06-12 20:17:34 +00:00
createReplica(controllerSpec api.ReplicationController)
2014-06-09 05:38:45 +00:00
deletePod(podID string) error
2014-06-06 23:40:48 +00:00
}
2014-06-09 05:38:45 +00:00
type RealPodControl struct {
2014-06-06 23:40:48 +00:00
kubeClient client.ClientInterface
}
2014-06-12 20:17:34 +00:00
func (r RealPodControl) createReplica(controllerSpec api.ReplicationController) {
2014-06-09 04:39:57 +00:00
labels := controllerSpec.DesiredState.PodTemplate.Labels
2014-06-06 23:40:48 +00:00
if labels != nil {
labels["replicationController"] = controllerSpec.ID
}
2014-06-12 20:17:34 +00:00
pod := api.Pod{
JSONBase: api.JSONBase{
2014-06-06 23:40:48 +00:00
ID: fmt.Sprintf("%x", rand.Int()),
},
2014-06-09 04:39:57 +00:00
DesiredState: controllerSpec.DesiredState.PodTemplate.DesiredState,
Labels: controllerSpec.DesiredState.PodTemplate.Labels,
2014-06-06 23:40:48 +00:00
}
2014-06-09 05:38:45 +00:00
_, err := r.kubeClient.CreatePod(pod)
2014-06-06 23:40:48 +00:00
if err != nil {
log.Printf("%#v\n", err)
}
}
2014-06-09 05:38:45 +00:00
func (r RealPodControl) deletePod(podID string) error {
return r.kubeClient.DeletePod(podID)
2014-06-06 23:40:48 +00:00
}
func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager {
2014-06-06 23:40:48 +00:00
return &ReplicationManager{
kubeClient: kubeClient,
etcdClient: etcdClient,
2014-06-09 05:38:45 +00:00
podControl: RealPodControl{
2014-06-06 23:40:48 +00:00
kubeClient: kubeClient,
},
}
}
2014-06-17 23:42:29 +00:00
// Begin watching and syncing.
func (rm *ReplicationManager) Run(period time.Duration) {
2014-06-18 00:56:18 +00:00
rm.syncTime = time.Tick(period)
2014-06-17 23:42:29 +00:00
go util.Forever(func() { rm.watchControllers() }, period)
}
func (rm *ReplicationManager) watchControllers() {
2014-06-06 23:40:48 +00:00
watchChannel := make(chan *etcd.Response)
2014-06-14 01:11:32 +00:00
go func() {
defer util.HandleCrash()
defer func() {
close(watchChannel)
}()
rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil)
}()
2014-06-06 23:40:48 +00:00
for {
2014-06-18 00:56:18 +00:00
select {
case <-rm.syncTime:
rm.synchronize()
case watchResponse, ok := <-watchChannel:
if !ok {
// watchChannel has been closed. Let the util.Forever() that
// called us call us again.
return
}
if watchResponse == nil {
time.Sleep(time.Second * 10)
continue
}
log.Printf("Got watch: %#v", watchResponse)
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
log.Printf("Error handling data: %#v, %#v", err, watchResponse)
continue
}
rm.syncReplicationController(*controller)
2014-06-06 23:40:48 +00:00
}
}
}
2014-06-12 20:17:34 +00:00
func (rm *ReplicationManager) handleWatchResponse(response *etcd.Response) (*api.ReplicationController, error) {
2014-06-06 23:40:48 +00:00
if response.Action == "set" {
if response.Node != nil {
2014-06-12 20:17:34 +00:00
var controllerSpec api.ReplicationController
2014-06-06 23:40:48 +00:00
err := json.Unmarshal([]byte(response.Node.Value), &controllerSpec)
if err != nil {
return nil, err
}
return &controllerSpec, nil
} else {
return nil, fmt.Errorf("response node is null %#v", response)
2014-06-06 23:40:48 +00:00
}
}
return nil, nil
}
2014-06-12 20:17:34 +00:00
func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod {
var result []api.Pod
2014-06-09 05:38:45 +00:00
for _, value := range pods {
2014-06-06 23:40:48 +00:00
if strings.Index(value.CurrentState.Status, "Exit") == -1 {
result = append(result, value)
}
}
return result
}
2014-06-12 20:17:34 +00:00
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
2014-06-09 05:38:45 +00:00
podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
2014-06-06 23:40:48 +00:00
if err != nil {
return err
}
2014-06-09 05:38:45 +00:00
filteredList := rm.filterActivePods(podList.Items)
2014-06-06 23:40:48 +00:00
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
log.Printf("%#v", filteredList)
if diff < 0 {
diff *= -1
log.Printf("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ {
2014-06-09 05:38:45 +00:00
rm.podControl.createReplica(controllerSpec)
2014-06-06 23:40:48 +00:00
}
} else if diff > 0 {
log.Print("Too many replicas, deleting")
for i := 0; i < diff; i++ {
2014-06-09 05:38:45 +00:00
rm.podControl.deletePod(filteredList[i].ID)
2014-06-06 23:40:48 +00:00
}
}
return nil
}
2014-06-17 23:42:29 +00:00
func (rm *ReplicationManager) synchronize() {
2014-06-18 00:56:18 +00:00
var controllerSpecs []api.ReplicationController
helper := util.EtcdHelper{rm.etcdClient}
err := helper.ExtractList("/registry/controllers", &controllerSpecs)
2014-06-17 23:42:29 +00:00
if err != nil {
2014-06-18 00:56:18 +00:00
log.Printf("Synchronization error: %v (%#v)", err, err)
2014-06-17 23:42:29 +00:00
}
2014-06-18 00:56:18 +00:00
for _, controllerSpec := range controllerSpecs {
err = rm.syncReplicationController(controllerSpec)
if err != nil {
log.Printf("Error synchronizing: %#v", err)
2014-06-06 23:40:48 +00:00
}
}
}