Merge pull request #15621 from mesosphere/sttts-send-initial-set-to-get-seen

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-10-16 08:04:29 -07:00
commit 4736adee8f
2 changed files with 32 additions and 5 deletions

View File

@ -152,7 +152,9 @@ func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock() s.updateLock.Lock()
defer s.updateLock.Unlock() defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes := s.merge(source, change) adds, updates, deletes := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications // deliver update notifications
switch s.mode { switch s.mode {
@ -160,7 +162,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
if len(deletes.Pods) > 0 { if len(deletes.Pods) > 0 {
s.updates <- *deletes s.updates <- *deletes
} }
if len(adds.Pods) > 0 { if len(adds.Pods) > 0 || firstSet {
s.updates <- *adds s.updates <- *adds
} }
if len(updates.Pods) > 0 { if len(updates.Pods) > 0 {
@ -168,15 +170,15 @@ func (s *podStorage) Merge(source string, change interface{}) error {
} }
case PodConfigNotificationSnapshotAndUpdates: case PodConfigNotificationSnapshotAndUpdates:
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 { if len(updates.Pods) > 0 {
s.updates <- *updates s.updates <- *updates
} }
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
}
case PodConfigNotificationSnapshot: case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source} s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
} }

View File

@ -267,6 +267,31 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
} }
func TestInitialEmptySet(t *testing.T) {
for _, test := range []struct {
mode PodConfigNotificationMode
op kubetypes.PodOperation
}{
{PodConfigNotificationIncremental, kubetypes.ADD},
{PodConfigNotificationSnapshot, kubetypes.SET},
{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
} {
channel, ch, _ := createPodConfigTester(test.mode)
// should register an empty PodUpdate operation
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource))
// should ignore following empty sets
podUpdate = CreatePodUpdate(kubetypes.SET, TestSource)
channel <- podUpdate
podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new")))
}
}
func TestPodUpdateAnnotations(t *testing.T) { func TestPodUpdateAnnotations(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)