|
|
|
@ -298,7 +298,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|
|
|
|
// An anomaly with the series frontier is severe in the sense that some sort
|
|
|
|
|
// of an illegal state condition exists in the storage layer, which would
|
|
|
|
|
// probably signify an illegal disk frontier.
|
|
|
|
|
return &storage.OperatorError{err, false} |
|
|
|
|
return &storage.OperatorError{error: err, Continuable: false} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
curationState, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint) |
|
|
|
@ -307,7 +307,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|
|
|
|
// there was a decoding error with the entity and shouldn't be cause to stop
|
|
|
|
|
// work. The process will simply start from a pessimistic work time and
|
|
|
|
|
// work forward. With an idempotent processor, this is safe.
|
|
|
|
|
return &storage.OperatorError{err, true} |
|
|
|
|
return &storage.OperatorError{error: err, Continuable: true} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
startKey := model.SampleKey{ |
|
|
|
@ -318,12 +318,12 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|
|
|
|
prospectiveKey, err := coding.NewProtocolBuffer(startKey.ToDTO()).Encode() |
|
|
|
|
if err != nil { |
|
|
|
|
// An encoding failure of a key is no reason to stop.
|
|
|
|
|
return &storage.OperatorError{err, true} |
|
|
|
|
return &storage.OperatorError{error: err, Continuable: true} |
|
|
|
|
} |
|
|
|
|
if !w.sampleIterator.Seek(prospectiveKey) { |
|
|
|
|
// LevelDB is picky about the seek ranges. If an iterator was invalidated,
|
|
|
|
|
// no work may occur, and the iterator cannot be recovered.
|
|
|
|
|
return &storage.OperatorError{fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), false} |
|
|
|
|
return &storage.OperatorError{error: fmt.Errorf("Illegal Condition: Iterator invalidated due to seek range."), Continuable: false} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
newestAllowedSample := w.stopAt |
|
|
|
@ -335,7 +335,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|
|
|
|
if err != nil { |
|
|
|
|
// We can't divine the severity of a processor error without refactoring the
|
|
|
|
|
// interface.
|
|
|
|
|
return &storage.OperatorError{err, false} |
|
|
|
|
return &storage.OperatorError{error: err, Continuable: false} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err = w.refreshCurationRemark(fingerprint, lastTime) |
|
|
|
@ -343,7 +343,7 @@ func (w watermarkOperator) Operate(key, _ interface{}) (oErr *storage.OperatorEr
|
|
|
|
|
// Under the assumption that the processors are idempotent, they can be
|
|
|
|
|
// re-run; thusly, the commitment of the curation remark is no cause
|
|
|
|
|
// to cease further progress.
|
|
|
|
|
return &storage.OperatorError{err, true} |
|
|
|
|
return &storage.OperatorError{error: err, Continuable: true} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return |
|
|
|
|