@ -369,7 +369,7 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
// there was a decoding error with the entity and shouldn't be cause to stop
// there was a decoding error with the entity and shouldn't be cause to stop
// work. The process will simply start from a pessimistic work time and
// work. The process will simply start from a pessimistic work time and
// work forward. With an idempotent processor, this is safe.
// work forward. With an idempotent processor, this is safe.
return & storage . OperatorError { e rror: err , Continuable : true }
return & storage . OperatorError { E rror: err , Continuable : true }
}
}
keySet , _ := w . sampleKeys . Get ( )
keySet , _ := w . sampleKeys . Get ( )
@ -400,13 +400,13 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
if seeker . err != nil {
if seeker . err != nil {
glog . Warningf ( "Got error in state machine: %s" , seeker . err )
glog . Warningf ( "Got error in state machine: %s" , seeker . err )
return & storage . OperatorError { e rror: seeker . err , Continuable : ! seeker . iteratorInvalid }
return & storage . OperatorError { E rror: seeker . err , Continuable : ! seeker . iteratorInvalid }
}
}
if seeker . iteratorInvalid {
if seeker . iteratorInvalid {
glog . Warningf ( "Got illegal iterator in state machine: %s" , err )
glog . Warningf ( "Got illegal iterator in state machine: %s" , err )
return & storage . OperatorError { e rror: errIllegalIterator , Continuable : false }
return & storage . OperatorError { E rror: errIllegalIterator , Continuable : false }
}
}
if ! seeker . seriesOperable {
if ! seeker . seriesOperable {
@ -417,7 +417,7 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
if err != nil {
if err != nil {
// We can't divine the severity of a processor error without refactoring the
// We can't divine the severity of a processor error without refactoring the
// interface.
// interface.
return & storage . OperatorError { e rror: err , Continuable : false }
return & storage . OperatorError { E rror: err , Continuable : false }
}
}
if err = w . curationState . Update ( & curationKey {
if err = w . curationState . Update ( & curationKey {
@ -429,7 +429,7 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
// Under the assumption that the processors are idempotent, they can be
// Under the assumption that the processors are idempotent, they can be
// re-run; thusly, the commitment of the curation remark is no cause
// re-run; thusly, the commitment of the curation remark is no cause
// to cease further progress.
// to cease further progress.
return & storage . OperatorError { e rror: err , Continuable : true }
return & storage . OperatorError { E rror: err , Continuable : true }
}
}
return nil
return nil