Merge pull request #348 from monnand/race-controller

Fix data races and turn on the race detector by default.
pull/6/head
brendandburns 2014-07-03 14:21:17 -07:00
commit 9a1053de7c
5 changed files with 22 additions and 24 deletions

View File

@ -1,7 +1,6 @@
language: go language: go
go: go:
- 1.2
- 1.3 - 1.3
- tip - tip

View File

@ -39,10 +39,10 @@ find_test_dirs() {
cd "${KUBE_TARGET}" cd "${KUBE_TARGET}"
if [ "$1" != "" ]; then if [ "$1" != "" ]; then
go test -cover -coverprofile="tmp.out" "$KUBE_GO_PACKAGE/$1" go test -race -cover -coverprofile="tmp.out" "$KUBE_GO_PACKAGE/$1"
exit 0 exit 0
fi fi
for package in $(find_test_dirs); do for package in $(find_test_dirs); do
go test -cover -coverprofile="tmp.out" "${KUBE_GO_PACKAGE}/${package}" go test -race -cover -coverprofile="tmp.out" "${KUBE_GO_PACKAGE}/${package}"
done done

View File

@ -689,11 +689,13 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha
return nil return nil
} }
type empty struct{}
// Sync the configured list of containers (desired state) with the host current state // Sync the configured list of containers (desired state) with the host current state
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config) glog.Infof("Desired: %+v", config)
var err error var err error
dockerIdsToKeep := map[DockerID]bool{} dockerIdsToKeep := map[DockerID]empty{}
keepChannel := make(chan DockerID) keepChannel := make(chan DockerID)
waitGroup := sync.WaitGroup{} waitGroup := sync.WaitGroup{}
@ -711,15 +713,18 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
} }
}(ix) }(ix)
} }
ch := make(chan bool)
go func() { go func() {
for id := range keepChannel { for id := range keepChannel {
dockerIdsToKeep[id] = true dockerIdsToKeep[id] = empty{}
} }
ch <- true
}() }()
if len(config) > 0 { if len(config) > 0 {
waitGroup.Wait() waitGroup.Wait()
close(keepChannel)
} }
close(keepChannel)
<-ch
// Kill any containers we don't need // Kill any containers we don't need
existingContainers, err := kl.getDockerContainers() existingContainers, err := kl.getDockerContainers()
@ -728,7 +733,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
return err return err
} }
for id, container := range existingContainers { for id, container := range existingContainers {
if !dockerIdsToKeep[id] { if _, ok := dockerIdsToKeep[id]; !ok {
glog.Infof("Killing: %s", id) glog.Infof("Killing: %s", id)
err = kl.killContainer(container) err = kl.killContainer(container)
if err != nil { if err != nil {

View File

@ -94,8 +94,8 @@ type ServiceConfig struct {
endpointsNotifyChannel chan string endpointsNotifyChannel chan string
} }
func NewServiceConfig() ServiceConfig { func NewServiceConfig() *ServiceConfig {
config := ServiceConfig{ config := &ServiceConfig{
serviceConfigSources: make(map[string]chan ServiceUpdate), serviceConfigSources: make(map[string]chan ServiceUpdate),
endpointsConfigSources: make(map[string]chan EndpointsUpdate), endpointsConfigSources: make(map[string]chan EndpointsUpdate),
serviceHandlers: make([]ServiceConfigHandler, 10), serviceHandlers: make([]ServiceConfigHandler, 10),
@ -130,6 +130,7 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c
for { for {
select { select {
case update := <-listenChannel: case update := <-listenChannel:
impl.configLock.Lock()
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.Infof("Adding new service from source %s : %v", source, update.Services) glog.Infof("Adding new service from source %s : %v", source, update.Services)
@ -152,7 +153,6 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c
glog.Infof("Received invalid update type: %v", update) glog.Infof("Received invalid update type: %v", update)
continue continue
} }
impl.configLock.Lock()
impl.serviceConfig[source] = serviceMap impl.serviceConfig[source] = serviceMap
impl.configLock.Unlock() impl.configLock.Unlock()
impl.serviceNotifyChannel <- source impl.serviceNotifyChannel <- source
@ -165,6 +165,7 @@ func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel
for { for {
select { select {
case update := <-listenChannel: case update := <-listenChannel:
impl.configLock.Lock()
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.Infof("Adding a new endpoint %v", update) glog.Infof("Adding a new endpoint %v", update)
@ -188,7 +189,6 @@ func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel
glog.Infof("Received invalid update type: %v", update) glog.Infof("Received invalid update type: %v", update)
continue continue
} }
impl.configLock.Lock()
impl.endpointConfig[source] = endpointMap impl.endpointConfig[source] = endpointMap
impl.configLock.Unlock() impl.configLock.Unlock()
impl.endpointsNotifyChannel <- source impl.endpointsNotifyChannel <- source

View File

@ -18,7 +18,6 @@ package tools
import ( import (
"fmt" "fmt"
"sync"
"testing" "testing"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -30,8 +29,7 @@ type EtcdResponseWithError struct {
} }
type FakeEtcdClient struct { type FakeEtcdClient struct {
condWatchCompleted *sync.Cond watchCompletedChan chan bool
condLock sync.Mutex
Data map[string]EtcdResponseWithError Data map[string]EtcdResponseWithError
DeletedKeys []string DeletedKeys []string
@ -59,12 +57,11 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
// They are only available when Watch() is called. If users of // They are only available when Watch() is called. If users of
// FakeEtcdClient want to use any of these channels, they have to call // FakeEtcdClient want to use any of these channels, they have to call
// WaitForWatchCompletion before any operation on these channels. // WaitForWatchCompletion before any operation on these channels.
// Internally, FakeEtcdClient use condWatchCompleted to indicate if the // Internally, FakeEtcdClient use watchCompletedChan to indicate if the
// Watch() method has been called. WaitForWatchCompletion() will wait // Watch() method has been called. WaitForWatchCompletion() will wait
// on condWatchCompleted. By the end of the Watch() method, it will // on this channel. WaitForWatchCompletion() will return only when
// call Broadcast() on condWatchCompleted, which will awake any // WatchResponse, WatchInjectError and WatchStop are ready to read/write.
// goroutine waiting on this condition. ret.watchCompletedChan = make(chan bool)
ret.condWatchCompleted = sync.NewCond(&ret.condLock)
return ret return ret
} }
@ -116,9 +113,7 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
} }
func (f *FakeEtcdClient) WaitForWatchCompletion() { func (f *FakeEtcdClient) WaitForWatchCompletion() {
f.condLock.Lock() <-f.watchCompletedChan
defer f.condLock.Unlock()
f.condWatchCompleted.Wait()
} }
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
@ -129,8 +124,7 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool,
defer close(injectedError) defer close(injectedError)
f.WatchInjectError = injectedError f.WatchInjectError = injectedError
f.condWatchCompleted.Broadcast() f.watchCompletedChan <- true
select { select {
case <-stop: case <-stop:
return nil, etcd.ErrWatchStoppedByUser return nil, etcd.ErrWatchStoppedByUser