mirror of https://github.com/shunfei/cronsun
156 lines
2.9 KiB
Go
156 lines
2.9 KiB
Go
package cmd
|
|
|
|
import (
|
|
"archive/zip"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/spf13/cobra"
|
|
|
|
"github.com/shunfei/cronsun"
|
|
"github.com/shunfei/cronsun/conf"
|
|
)
|
|
|
|
var restoreFile string
|
|
|
|
func init() {
|
|
RestoreCmd.Flags().StringVarP(&restoreFile, "file", "f", "", "the backup zip file")
|
|
}
|
|
|
|
var RestoreCmd = &cobra.Command{
|
|
Use: "restore",
|
|
Short: "restore job & group data",
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
var err error
|
|
var ea = NewExitAction()
|
|
|
|
restoreFile = strings.TrimSpace(restoreFile)
|
|
if len(restoreFile) == 0 {
|
|
ea.Exit("backup file is required")
|
|
}
|
|
|
|
r, err := zip.OpenReader(restoreFile)
|
|
ea.ExitOnErr(err)
|
|
ea.Defer = func() {
|
|
r.Close()
|
|
}
|
|
|
|
restoreChan, wg := startRestoreProcess()
|
|
for _, f := range r.File {
|
|
var keyPrefix string
|
|
switch f.Name {
|
|
case "job":
|
|
keyPrefix = conf.Config.Cmd
|
|
case "node_group":
|
|
keyPrefix = conf.Config.Group
|
|
}
|
|
|
|
rc, err := f.Open()
|
|
ea.ExitOnErr(err)
|
|
|
|
ea.ExitOnErr(restoreKvs(rc, keyPrefix, restoreChan, wg))
|
|
rc.Close()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(restoreChan)
|
|
},
|
|
}
|
|
|
|
type kv struct {
|
|
k, v string
|
|
}
|
|
|
|
var (
|
|
keyLenBuf = make([]byte, 2)
|
|
valLenBuf = make([]byte, 4)
|
|
keyBuf = make([]byte, 256)
|
|
valBuf = make([]byte, 1024)
|
|
)
|
|
|
|
func fixRead(r io.Reader, p []byte) (int, error) {
|
|
valLen, readLen := len(p), 0
|
|
for readLen != valLen {
|
|
n, err := r.Read(p[readLen:])
|
|
readLen += n
|
|
if err != nil {
|
|
return readLen, err
|
|
}
|
|
}
|
|
return readLen, nil
|
|
}
|
|
|
|
|
|
func restoreKvs(r io.Reader, keyPrefix string, storeChan chan *kv, wg *sync.WaitGroup) error {
|
|
for {
|
|
// read length of key
|
|
n, err := fixRead(r, keyLenBuf)
|
|
if err == io.EOF && n != 0 {
|
|
return fmt.Errorf("unexcepted data, the file may broken")
|
|
} else if err == io.EOF && n == 0 {
|
|
break
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
keyLen := binary.LittleEndian.Uint16(keyLenBuf)
|
|
|
|
// read key
|
|
if n, err = fixRead(r, keyBuf[:keyLen]); err != nil {
|
|
return err
|
|
}
|
|
key := keyBuf[:keyLen]
|
|
|
|
// read length of value
|
|
if n, err = fixRead(r, valLenBuf); err != nil {
|
|
return err
|
|
}
|
|
valLen := binary.LittleEndian.Uint32(valLenBuf)
|
|
|
|
// read value
|
|
if len(valBuf) < int(valLen) {
|
|
valBuf = make([]byte, valLen*2)
|
|
}
|
|
if n, err = fixRead(r, valBuf[:valLen]); err != nil && err != io.EOF {
|
|
return err
|
|
}
|
|
value := valBuf[:valLen]
|
|
|
|
wg.Add(1)
|
|
storeChan <- &kv{
|
|
k: keyPrefix + string(key),
|
|
v: string(value),
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func startRestoreProcess() (chan *kv, *sync.WaitGroup) {
|
|
c := make(chan *kv, 0)
|
|
wg := &sync.WaitGroup{}
|
|
|
|
const maxResries = 3
|
|
go func() {
|
|
for _kv := range c {
|
|
for retries := 1; retries <= maxResries; retries++ {
|
|
_, err := cronsun.DefalutClient.Put(_kv.k, _kv.v)
|
|
if err != nil {
|
|
if retries == maxResries {
|
|
fmt.Println("[Error] restore err:", err)
|
|
fmt.Println("\tKey: ", string(_kv.k))
|
|
fmt.Println("\tValue: ", string(_kv.v))
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
|
|
wg.Done()
|
|
}
|
|
}()
|
|
|
|
return c, wg
|
|
}
|