Merge pull request #176 from lbbniu/fix/restore

修复任务过多导入失败问题
pull/185/head
QLeelulu 4 years ago committed by GitHub
commit 18ee99b593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -71,10 +71,23 @@ var (
valBuf = make([]byte, 1024)
)
func fixRead(r io.Reader, p []byte) (int, error) {
valLen, readLen := len(p), 0
for readLen != valLen {
n, err := r.Read(p[readLen:])
readLen += n
if err != nil {
return readLen, err
}
}
return readLen, nil
}
func restoreKvs(r io.Reader, keyPrefix string, storeChan chan *kv, wg *sync.WaitGroup) error {
for {
// read length of key
n, err := r.Read(keyLenBuf)
n, err := fixRead(r, keyLenBuf)
if err == io.EOF && n != 0 {
return fmt.Errorf("unexcepted data, the file may broken")
} else if err == io.EOF && n == 0 {
@ -82,28 +95,28 @@ func restoreKvs(r io.Reader, keyPrefix string, storeChan chan *kv, wg *sync.Wait
} else if err != nil {
return err
}
keylen := binary.LittleEndian.Uint16(keyLenBuf)
keyLen := binary.LittleEndian.Uint16(keyLenBuf)
// read key
if n, err = r.Read(keyBuf[:keylen]); err != nil {
if n, err = fixRead(r, keyBuf[:keyLen]); err != nil {
return err
}
key := keyBuf[:keylen]
key := keyBuf[:keyLen]
// read length of value
if n, err = r.Read(valLenBuf); err != nil {
if n, err = fixRead(r, valLenBuf); err != nil {
return err
}
vallen := binary.LittleEndian.Uint32(valLenBuf)
valLen := binary.LittleEndian.Uint32(valLenBuf)
// read value
if len(valBuf) < int(vallen) {
valBuf = make([]byte, vallen*2)
if len(valBuf) < int(valLen) {
valBuf = make([]byte, valLen*2)
}
if n, err = r.Read(valBuf[:vallen]); err != nil && err != io.EOF {
if n, err = fixRead(r, valBuf[:valLen]); err != nil && err != io.EOF {
return err
}
value := valBuf[:vallen]
value := valBuf[:valLen]
wg.Add(1)
storeChan <- &kv{

Loading…
Cancel
Save